PyQSでお気軽Task Queue構築

PythonでQueue制御をする場合、Celeryなどが定番ですが、Amazon SQSを使ったPyQSというのを見つけたのでちょっと使ってみました。READMEとは微妙に挙動が異なるので苦労しましたが、非常にお気軽なTask Queue構築が可能なのはありがたいです。

設置

設置は簡単でpipコマンド一発です。
pip install pyqs

環境変数

以下の環境変数が必要です。

  • AWS_ACCESS_KEY_ID AWSS_SECRET_ACCESS_KEY…Amazon SQSへのRead/Write権限を備えていること
  • PYTHONPATH…Queueで実行するPythonスクリプトを参照できるようにしておきます

タスク

pyqsからデコレーターtaskをインポートして、Queueに登録したい関数に載せます。

qqq/tasks.py
from pyqs import task


@task('queue0')
def another_task(message):
    print "another_task: message={}".format(message)


@task('queue0')
def send_email(subject):
    print "send_email: subject={}".format(subject)

Queueへの登録

add_queue.py
from qqq.tasks import another_task, send_email
from settings import config

for i in range(0, 100):
    send_email.delay(subject='hogehoge')
    another_task.delay(message='hogehogehoge')

Workerの起動

run_queue.sh
#! /bin/bash

export PYTHONPATH=`pwd`
export QUEUE='queue0'
pyqs $QUEUE

GithubのREADMEによればQueue名は「queue0.tasks.send_email」とか「queue.tasks.another_task」になるはずですが、実際のSQSはQueue名にピリオドは許してくれないので、@taskデコレータで宣言したqueue0がQueue名になります。ここを理解するまでに2時間くらいソース追いかけましたよ。

まとめ

自力でメッセージブローカーを設定しなくても、Amazon SQSへのアクセス権があれば即Task Queueを作れるのは魅力的です。あとSQSはタダ同然なのもありがたいですね。

 Github

ソースは こちら

続きを読む

JAWS DAYS 2017に参加してきた話(後編)

JAWS DAYS 2017に参加してきた!

2017/03/11(土)にTOC五反田メッセで開催されたJAWS DAYS 2017
ボランティアスタッフとして参加してきました。

JAWS DAYS 2017にスカラーシップで参加してきた話の後編です。
前編はコチラになります。

JAWS DAYS 2017当日

セッションはどれも超満員!
F01_8258(1).jpg

私はハンズオンセッションの受付にいることが多かったので
あまり他のセッションを見て歩くことは出来ませんでしたが
合間合間に他のスタッフの方と話すだけでも、ものすごい刺激を受けます。
ただ歩き回ってるだけでも楽しくて仕方ない。

思い思いのメッセージを残してみたり
P_20170311_151917.jpg

AWSドンジャラに興じる人もいれば・・・
P8680606.jpg

AWSカルタに汗を流す人もいます。
P8680605.jpg

もちろん私もスカラーシップの責務、LTちゃんとやってきましたよ!
ASCII.jp様が良い感じにまとめてくれていたので
興味がある方は、コチラの記事をお読みください。
何か自分のLTのこと紹介するのも照れくさいですしね。(笑)

圧巻だったのは懇親会ですかね。
「何人いるんだよ」というくらいの人が参加しており
マジでもう懇親会のレベルじゃねぇ。
いちいちスケールがでかくて笑えます。

P8680622.jpg

20170311-P3110544.jpg

P8680631.jpg

ちなみに来場者は過去最高の1,200人オーバーだそうです。
すげー。

懇親会の後は、スタッフでの打ち上げ。
前日は、LT準備のため参加出来なかった私も参加してきました。
が、困ったことに疲労とお酒で何を話してたかあまり覚えてないんです。。。
楽しかったことだけは確かなんですが(笑)

20170311-DSC03070.jpg

≪JAWS DAYS当日の感想≫
参加する人、運営する人、各々でそれぞれの楽しみ方をしている姿がとても印象的でした。
セッションの内容も大真面目なものから、ネタに走ったものまで色々で
「学びに来る」というよりも「刺激を受けに来る」という方が合ってるかもしれません。

「クラウドとかあまり知らないし行っても仕方ないんじゃ・・・」
という人こそ、是非来て欲しいイベントだなーと感じました。
1年間の週末のうち、たった一日を割くだけの価値は必ず見つかりますよ!

JAWS DAYS 2017翌日(JAWS-UG総会)

JAWS DAYSの翌日は、pre-dayが行われたアマゾン ジャパン本社で
JAWS-UGの総会が行われ、代表の引継ぎ、各エリアでの現況報告、課題の洗い出しなどが行われました。
3日間の参加で、この日が唯一のノンアルDayでした。

20170312-P3120009.jpg

20170312-P3120025.jpg

20170312-P3120028.jpg

私は函館支部をRebootしたばかりで、JAWS-UG支部の運営に関して勝手のわからないことばかりだったので
各支部を運営してきた先人の方達に色々教えて頂く良い機会になりました。

≪3日間のDAYS参加の感想≫
JAWS DAYSはスタッフはおろか、参加者としても初めての参加でしたので
本当にあっという間に終わってしまった感がありますが
これだけのイベントの運営に関われる機会もそうそうありません。
良い経験になったと思います。

今回はスカラーシッププログラムというキッカケがあり、私は参加出来ました。
私のように地方に住んでいる人間からすると非常に有難い制度だと思います。
JAWS-UGの運営予算がどのようになっているのか、正直分からない部分は多いですが
今後も是非続けて欲しい取り組みです。

また、この記事を読んでJAWS-UGというものに興味を持った方がいれば
ほんの少しだけ思い切って飛び込んで来て欲しいです。
どの支部の人達も、新たに加わる仲間にはとても優しい人たちばかりでした。

最後になりますが、推薦してくれたJAWS-UG札幌支部のKさん、良い機会をありがとうございました!
来年は自腹で参加するぞー

最後にスカラーシップ参加者で記念撮影
20170312-P3120042.jpg
またお会いしましょうね!!

続きを読む

JAWS DAYS 2017に参加してきた話(前編)

JAWS DAYS 2017に参加してきた!

2017/03/11(土)にTOC五反田メッセで開催されたJAWS DAYS 2017
ボランティアスタッフとして参加してきました。

≪はじめに≫
セッションの内容やスライドの紹介は、他の方もたくさん記事にされているので
そういった内容をお求めの方はコチラから辿ると良いかと。
講演で使われたスライドはコチラにまとめられていました。

≪スカラーシップでの参加≫
参加の経緯は、以前から交流のあったJAWS-UG札幌支部の方から
「今年からスカラーシッププログラムを始めるのだけど、推薦するから北海道の支部選抜として参加しない?」
と、声を掛けて頂いたことがキッカケでした。

なんでもスカラーシッププログラムで参加すると
交通費・宿泊費・当日の参加費をJAWS-UGで負担して頂けるとのこと!
参加条件は

  • 九州、沖縄、中国四国、北陸、東北、北海道のいずれかの支部に所属していること
  • 事前、当日スタッフとしてJAWS DAYSを盛り上げること
  • 3/10~3/12に必ずイベントに参加できること
  • JAWS-UGの支部運営メンバーによる推薦があること
  • 会社の経費等ではなく、あくまでプライベートで参加する方
  • イベント参加後にブログレポートを書くこと (これがソレです)

等々、他にも細々とした条件がありますが
私のように地方に住んでいて、東京のイベントまでなかなか行くことの出来ない人には願ってもない条件です。
えぇ、飛び付きましたとも。

スカラーシッププログラムの目的は

  • JAWS-UGを運営する次世代のヒーローを全国から発掘する
  • JAWS-UGが開催する最大規模のラーニングイベントの熱量を一人でも多くのメンバーと共有する仕組みを作り、コミュニティの成長の可能性を広げる

とのことですが、「次世代のヒーロー」と呼ばれるには結構年いってんだけど大丈夫かなという不安を抱きつつも
JAWS DAYS自体参加したことが無かったこともあり、誘惑に負けて申込書に記入してしまいました。
今年の一枠はオッサンにください。ゴメンナサイ。

他の地域からの選抜メンバーはコチラで紹介されてました。
私の他にも東北、中国・四国、九州からもスカラーシップ参加者がいるようです。
お会いするのが楽しみです。

で、スカラーシップでの参加が決まってから言われたのですが
「当日4分のLTよろしくね☆」とのこと。
まじかよ聞いてねーよとも言えず、大人しく準備をすることにします。
(LTあまり得意じゃないんです。。。)

JAWS DAYS 2017前日(pre-day)

そんなこんながありつつも前日(3/10)を迎えます。
この日は、JAWS DAYSを前日に控えてアマゾン ジャパン本社のセミナールームに
各支部のメンバー(参加出来る方のみ)が集まりました。

明日の打ち合わせとかするんだろうなぁと思ってたんですが
そんなことは全くなく普通に飲み会でした。
マジか。
真面目な私はメモ帳とか用意してたんですが・・・(笑)

このpre-dayには台湾、韓国、シンガポールのUGの方々も参加した交流会を兼ねたもので
ビールやピザを片手に和やかなムードで場は進みます。

20170310-P3100008.jpg

この中でスカラーシップ参加者の自己紹介のターンがあったのですが
海外UGの方々も参加していることもあり、「英語でお願いします」と無茶振り・・・
そこはスカラーシップ最年長参加者のこの私、軽やかなスルーで思い切り日本語で自己紹介してしまいます。
ホントごめんなさい(私信)

最後は皆で記念撮影!
P8680577.jpg

この後、二次会もあったのですが
翌日のLTの準備が終わっていない私は泣く泣く離脱・・・

≪pre-dayの感想≫
Amazonのエレベーターかっけー(オイ)
Image_09cae74.jpg

JAWS DAYS 2017当日

P_20170311_102606.jpg

当日は朝8時に会場に集合し、会場設営の準備から始まります。
JAWS DAYSの参加自体が初めてな私は、勝手がわからず右往左往してしまうこともありましたが
何とか最低限のお手伝いは出来たかな?

DSC_9134.jpg

F01_8226.jpg

IMG_0698.jpg

このJAWS DAYSですが、全国のJAWS-UG支部からボランティアでメンバーが集まり運営されているイベントで
ボランティアスタッフだけでも100名を超えるとのこと。
改めてすごいイベントだなぁと思います。
国内で他に無い規模じゃないですかね?(少なくとも私は聞いたことないです)

個人的にイイネ!と思ったのは託児ルームでしたね。
私も小さい子供がいるので、東京に住んでたら利用してたと思います。
しかもちゃんとプロの方が常駐して対応してくれるのです。

P_20170311_102829.jpg

これも今年からの試みだったそうですが、是非続けて欲しいなと思いました。

長くなってきたので、いったん切ります。
後編はコチラをご覧ください!

続きを読む

R で Amazon Athena を活用する | Amazon Web Services ブログ

Amazon Web Services ブログ R で Amazon Athena を活用する by AWS Japan Staff | on 22 MAR 2017 | in Amazon Athena | Permalink データサイエンティストはしばしば、R から SQL クエリを投げるときに、その裏側のビッグデータ基盤のインフラ管理を気に掛けなければなりません。 Amazon Athen… 続きを読む

DynamoDB StreamをLambda(Python)で処理する時の共通事前処理を考える

AWSリソース同士の連携でLambdaをつかうのは便利だけどeventが毎度複雑でたいへん。
今回はDynamoDB StreamをPythonで拾うとき、主要な処理以外をなんとか簡潔に書けるようにしたいチャレンジ。

DynamoDB ストリーム と AWS Lambda のトリガー – Amazon DynamoDB

共通処理の仕様

このあたりの処理を使いまわせば楽になりそう。

  • 対応する関数(lambda含む)を登録して、レコードごとに実行したい

    • 呼ばれる側はコンテキスト決め打ちでよいので、分岐とか不要
  • Itemのデータを取り出しやすくする
    • ついでにDynamoおなじみの型付きでくるので、PythonのDictに変換する
  • (Option) 例外処理

で、エントリーポイントは、こんな風に書けたらよいかなと。

handler(予定)
def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name)) # lambda OK
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2) #複数の処理 OK
    ds.on_modify.append(after_modify)


    ds.dispatch()
    return True

ハンドラを登録してdispatchする感じ。

ざっくり作ってみる

とりあえず当初にきめたlambda_handlerに書きたい内容を実装してみた。

lambda_function.py
from __future__ import print_function

import json
from boto3.dynamodb.types import TypeDeserializer
deser = TypeDeserializer()

print('Loading function')


class DeRecord:
    ## Itemをデシリアライズしたもの
    def __init__(self, rec):
        self.event_name = rec['eventName']
        self.old = self._desi(rec['dynamodb'].get('OldImage'))
        self.new = self._desi(rec['dynamodb'].get('NewImage'))

    def _desi(self, image):
        d = {}
        if image:
            for key in image:
                d[key] = deser.deserialize(image[key])
        return d


class DynamoStreamDispatcher:
    def __init__(self, event):
        self.on_insert = []
        self.on_remove = []
        self.on_modify = []
        self.records   = []
        for r in event['Records']:
            # レコードはdictに加工しちゃう。
            self.records.append(DeRecord(r))

        self.raw = event

    def dispatch(self):
        """
        synced dispatcher
        """
        results = []
        for r in self.records:
            try:
                for runner in getattr(self, 'on_' + r.event_name.lower()):
                    results.append(runner(r))
            except AttributeError:
                print("Unknown event " + r.event_name)
                continue

        return results


## ここから、個別Lambda用処理の関数サンプル。引数は加工済みのレコード
def after_remove1(rec):
    print("deleted")
    return None

def after_remove2(rec):
    print(rec.old)
    return None


def after_modify(rec):
    print("key updated...")
    print(rec.old['Message'])
    print(rec.new['Message'])
    return None


def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name))
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2)
    ds.on_modify.append(after_modify)

    ds.dispatch()
    return True

Sample event templateからDynamoDB Update(※末尾に付属)を流してみる。

START RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3 Version: $LATEST
INSERT
key updated...
New item!
This item has changed
deleted
{u'Message': u'This item has changed', u'Id': Decimal('101')}
END RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3

登録した関数がそれぞれ実行されてOK。

ほぼ自分用ライブラリだけど、PyPIに似たようなのがなければ登録して使いまわそうかな。
あとは差分とかをうまいこと格納したいね。


付録: DynamoDB Updateのサンプルイベント

{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "3",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 38,
        "SequenceNumber": "333",
        "OldImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "REMOVE",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    }
  ]
}

参考:

続きを読む

AWS メンテナンス対象を抽出するスクリプト

  • redmine 貼り付け用
  • region は適宜
#! /bin/bash
echo "|_. Name|_. ID|_. Event|_. Start|_. End|";aws ec2 describe-instance-status --region us-east-1 --filters "Name=event.code,Values=*" --query 'sort_by(InstanceStatuses[].[InstanceId,Events[0].Code,Events[0].NotBefore,Events[0].NotAfter],&[2])' --output text|xargs -I{} bash -c 'line="{}";id=$(echo $line|cut -d" " -f1);name=$(aws ec2 describe-instances --instance-ids $id --query "Reservations[].Instances[].[Tags[?Key==`Name`].Value|[0]]" --output text);echo -e "|$name|${line//     /|}|"'

続きを読む