PyQSでお気軽Task Queue構築

PythonでQueue制御をする場合、Celeryなどが定番ですが、Amazon SQSを使ったPyQSというのを見つけたのでちょっと使ってみました。READMEとは微妙に挙動が異なるので苦労しましたが、非常にお気軽なTask Queue構築が可能なのはありがたいです。

設置

設置は簡単でpipコマンド一発です。
pip install pyqs

環境変数

以下の環境変数が必要です。

  • AWS_ACCESS_KEY_ID AWSS_SECRET_ACCESS_KEY…Amazon SQSへのRead/Write権限を備えていること
  • PYTHONPATH…Queueで実行するPythonスクリプトを参照できるようにしておきます

タスク

pyqsからデコレーターtaskをインポートして、Queueに登録したい関数に載せます。

qqq/tasks.py
from pyqs import task


@task('queue0')
def another_task(message):
    print "another_task: message={}".format(message)


@task('queue0')
def send_email(subject):
    print "send_email: subject={}".format(subject)

Queueへの登録

add_queue.py
from qqq.tasks import another_task, send_email
from settings import config

for i in range(0, 100):
    send_email.delay(subject='hogehoge')
    another_task.delay(message='hogehogehoge')

Workerの起動

run_queue.sh
#! /bin/bash

export PYTHONPATH=`pwd`
export QUEUE='queue0'
pyqs $QUEUE

GithubのREADMEによればQueue名は「queue0.tasks.send_email」とか「queue.tasks.another_task」になるはずですが、実際のSQSはQueue名にピリオドは許してくれないので、@taskデコレータで宣言したqueue0がQueue名になります。ここを理解するまでに2時間くらいソース追いかけましたよ。

まとめ

自力でメッセージブローカーを設定しなくても、Amazon SQSへのアクセス権があれば即Task Queueを作れるのは魅力的です。あとSQSはタダ同然なのもありがたいですね。

 Github

ソースは こちら

続きを読む

JAWS DAYS 2017に参加してきた話(前編)

JAWS DAYS 2017に参加してきた!

2017/03/11(土)にTOC五反田メッセで開催されたJAWS DAYS 2017
ボランティアスタッフとして参加してきました。

≪はじめに≫
セッションの内容やスライドの紹介は、他の方もたくさん記事にされているので
そういった内容をお求めの方はコチラから辿ると良いかと。
講演で使われたスライドはコチラにまとめられていました。

≪スカラーシップでの参加≫
参加の経緯は、以前から交流のあったJAWS-UG札幌支部の方から
「今年からスカラーシッププログラムを始めるのだけど、推薦するから北海道の支部選抜として参加しない?」
と、声を掛けて頂いたことがキッカケでした。

なんでもスカラーシッププログラムで参加すると
交通費・宿泊費・当日の参加費をJAWS-UGで負担して頂けるとのこと!
参加条件は

  • 九州、沖縄、中国四国、北陸、東北、北海道のいずれかの支部に所属していること
  • 事前、当日スタッフとしてJAWS DAYSを盛り上げること
  • 3/10~3/12に必ずイベントに参加できること
  • JAWS-UGの支部運営メンバーによる推薦があること
  • 会社の経費等ではなく、あくまでプライベートで参加する方
  • イベント参加後にブログレポートを書くこと (これがソレです)

等々、他にも細々とした条件がありますが
私のように地方に住んでいて、東京のイベントまでなかなか行くことの出来ない人には願ってもない条件です。
えぇ、飛び付きましたとも。

スカラーシッププログラムの目的は

  • JAWS-UGを運営する次世代のヒーローを全国から発掘する
  • JAWS-UGが開催する最大規模のラーニングイベントの熱量を一人でも多くのメンバーと共有する仕組みを作り、コミュニティの成長の可能性を広げる

とのことですが、「次世代のヒーロー」と呼ばれるには結構年いってんだけど大丈夫かなという不安を抱きつつも
JAWS DAYS自体参加したことが無かったこともあり、誘惑に負けて申込書に記入してしまいました。
今年の一枠はオッサンにください。ゴメンナサイ。

他の地域からの選抜メンバーはコチラで紹介されてました。
私の他にも東北、中国・四国、九州からもスカラーシップ参加者がいるようです。
お会いするのが楽しみです。

で、スカラーシップでの参加が決まってから言われたのですが
「当日4分のLTよろしくね☆」とのこと。
まじかよ聞いてねーよとも言えず、大人しく準備をすることにします。
(LTあまり得意じゃないんです。。。)

JAWS DAYS 2017前日(pre-day)

そんなこんながありつつも前日(3/10)を迎えます。
この日は、JAWS DAYSを前日に控えてアマゾン ジャパン本社のセミナールームに
各支部のメンバー(参加出来る方のみ)が集まりました。

明日の打ち合わせとかするんだろうなぁと思ってたんですが
そんなことは全くなく普通に飲み会でした。
マジか。
真面目な私はメモ帳とか用意してたんですが・・・(笑)

このpre-dayには台湾、韓国、シンガポールのUGの方々も参加した交流会を兼ねたもので
ビールやピザを片手に和やかなムードで場は進みます。

20170310-P3100008.jpg

この中でスカラーシップ参加者の自己紹介のターンがあったのですが
海外UGの方々も参加していることもあり、「英語でお願いします」と無茶振り・・・
そこはスカラーシップ最年長参加者のこの私、軽やかなスルーで思い切り日本語で自己紹介してしまいます。
ホントごめんなさい(私信)

最後は皆で記念撮影!
P8680577.jpg

この後、二次会もあったのですが
翌日のLTの準備が終わっていない私は泣く泣く離脱・・・

≪pre-dayの感想≫
Amazonのエレベーターかっけー(オイ)
Image_09cae74.jpg

JAWS DAYS 2017当日

P_20170311_102606.jpg

当日は朝8時に会場に集合し、会場設営の準備から始まります。
JAWS DAYSの参加自体が初めてな私は、勝手がわからず右往左往してしまうこともありましたが
何とか最低限のお手伝いは出来たかな?

DSC_9134.jpg

F01_8226.jpg

IMG_0698.jpg

このJAWS DAYSですが、全国のJAWS-UG支部からボランティアでメンバーが集まり運営されているイベントで
ボランティアスタッフだけでも100名を超えるとのこと。
改めてすごいイベントだなぁと思います。
国内で他に無い規模じゃないですかね?(少なくとも私は聞いたことないです)

個人的にイイネ!と思ったのは託児ルームでしたね。
私も小さい子供がいるので、東京に住んでたら利用してたと思います。
しかもちゃんとプロの方が常駐して対応してくれるのです。

P_20170311_102829.jpg

これも今年からの試みだったそうですが、是非続けて欲しいなと思いました。

長くなってきたので、いったん切ります。
後編はコチラをご覧ください!

続きを読む

DynamoDB StreamをLambda(Python)で処理する時の共通事前処理を考える

AWSリソース同士の連携でLambdaをつかうのは便利だけどeventが毎度複雑でたいへん。
今回はDynamoDB StreamをPythonで拾うとき、主要な処理以外をなんとか簡潔に書けるようにしたいチャレンジ。

DynamoDB ストリーム と AWS Lambda のトリガー – Amazon DynamoDB

共通処理の仕様

このあたりの処理を使いまわせば楽になりそう。

  • 対応する関数(lambda含む)を登録して、レコードごとに実行したい

    • 呼ばれる側はコンテキスト決め打ちでよいので、分岐とか不要
  • Itemのデータを取り出しやすくする
    • ついでにDynamoおなじみの型付きでくるので、PythonのDictに変換する
  • (Option) 例外処理

で、エントリーポイントは、こんな風に書けたらよいかなと。

handler(予定)
def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name)) # lambda OK
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2) #複数の処理 OK
    ds.on_modify.append(after_modify)


    ds.dispatch()
    return True

ハンドラを登録してdispatchする感じ。

ざっくり作ってみる

とりあえず当初にきめたlambda_handlerに書きたい内容を実装してみた。

lambda_function.py
from __future__ import print_function

import json
from boto3.dynamodb.types import TypeDeserializer
deser = TypeDeserializer()

print('Loading function')


class DeRecord:
    ## Itemをデシリアライズしたもの
    def __init__(self, rec):
        self.event_name = rec['eventName']
        self.old = self._desi(rec['dynamodb'].get('OldImage'))
        self.new = self._desi(rec['dynamodb'].get('NewImage'))

    def _desi(self, image):
        d = {}
        if image:
            for key in image:
                d[key] = deser.deserialize(image[key])
        return d


class DynamoStreamDispatcher:
    def __init__(self, event):
        self.on_insert = []
        self.on_remove = []
        self.on_modify = []
        self.records   = []
        for r in event['Records']:
            # レコードはdictに加工しちゃう。
            self.records.append(DeRecord(r))

        self.raw = event

    def dispatch(self):
        """
        synced dispatcher
        """
        results = []
        for r in self.records:
            try:
                for runner in getattr(self, 'on_' + r.event_name.lower()):
                    results.append(runner(r))
            except AttributeError:
                print("Unknown event " + r.event_name)
                continue

        return results


## ここから、個別Lambda用処理の関数サンプル。引数は加工済みのレコード
def after_remove1(rec):
    print("deleted")
    return None

def after_remove2(rec):
    print(rec.old)
    return None


def after_modify(rec):
    print("key updated...")
    print(rec.old['Message'])
    print(rec.new['Message'])
    return None


def lambda_handler(event, context):
    ds = DynamoStreamDispatcher(event)
    ds.on_insert.append(lambda rec: print(rec.event_name))
    ds.on_remove.append(after_remove1)
    ds.on_remove.append(after_remove2)
    ds.on_modify.append(after_modify)

    ds.dispatch()
    return True

Sample event templateからDynamoDB Update(※末尾に付属)を流してみる。

START RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3 Version: $LATEST
INSERT
key updated...
New item!
This item has changed
deleted
{u'Message': u'This item has changed', u'Id': Decimal('101')}
END RequestId: 6ed79996-0ecc-11e7-8985-db0ca21254c3

登録した関数がそれぞれ実行されてOK。

ほぼ自分用ライブラリだけど、PyPIに似たようなのがなければ登録して使いまわそうかな。
あとは差分とかをうまいこと格納したいね。


付録: DynamoDB Updateのサンプルイベント

{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "3",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 38,
        "SequenceNumber": "333",
        "OldImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "REMOVE",
      "eventSourceARN": "arn:aws:dynamodb:us-west-2:account-id:table/ExampleTableWithStream/stream/2015-06-27T00:48:05.899",
      "eventSource": "aws:dynamodb"
    }
  ]
}

参考:

続きを読む

[JAWS-UG CLI] StepFunctions #6 ステートマシンの作成 (RetryStateMachine)

前提条件

StepFunctionsへの権限

StepFunctionsに対してフル権限があること。

AWS CLIのバージョン

以下のバージョンで動作確認済

  • AWS CLI 1.11.63
コマンド
aws --version

結果(例):

  aws-cli/1.11.63 Python/2.7.10 Darwin/15.6.0 botocore/1.5.26

バージョンが古い場合は最新版に更新しましょう。

コマンド
sudo -H pip install -U awscli

0. 準備

まず変数の確認をします。

変数の確認
cat << ETX

        AWS_DEFAULT_PROFILE: (0.1) ${AWS_DEFAULT_PROFILE}
        AWS_DEFAULT_REGION:  (0.2) ${AWS_DEFAULT_REGION}
        DIR_CONF:            (0.3) ${DIR_CONF}
        IAM_ROLE_ARN:        (0.4) ${IAM_ROLE_ARN}
        LAMBDA_FUNC_ARN:     (0.5) ${LAMBDA_FUNC_ARN}

ETX

結果(例):

  AWS_DEFAULT_PROFILE: (0.1) stepfunctionsas_full-prjZ-mbp13
  AWS_DEFAULT_REGION:  (0.2) ap-northeast-1
  DIR_CONF:            (0.3) conf-stepfunctions
  IAM_ROLE_ARN:        (0.4) arn:aws:iam::XXXXXXXXXXXX:role/states-lambda-role
  LAMBDA_FUNC_ARN:     (0.5) arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:FailFunction

変数が入っていない、適切でない場合は、それぞれの手順番号について作業を
行います。

0.1. プロファイルの指定

プロファイルの一覧を確認します。

コマンド
cat ~/.aws/credentials \
       | grep '\[' \
       | sed 's/\[//g' | sed 's/\]//g'

結果(例):

  iamFull-prjz-mbpr13
  stepfunctionsas_full-prjZ-mbp13
変数の設定
export AWS_DEFAULT_PROFILE='stepfunctionsas_full-prjZ-mbp13'

0.2. リージョンの指定

変数の設定
export AWS_DEFAULT_REGION='ap-northeast-1'

0.3. 設定ファイル用ディレクトリの指定

変数の設定
DIR_CONF='conf-stepfunctions'
コマンド
mkdir -p ${DIR_CONF}

0.4. IAMロールの指定

IAMロールを指定します。

変数の設定
IAM_ROLE_NAME='states-lambda-role'

ARNを取得します。

コマンド
IAM_ROLE_ARN=$( \
        aws iam get-role \
          --role-name ${IAM_ROLE_NAME} \
          --query 'Role.Arn' \
          --output text \
) \
        && echo ${IAM_ROLE_ARN}

結果(例):

  arn:aws:iam::XXXXXXXXXXXX:role/states-lambda-role

0.5. Lambda関数の指定

Lambda関数を指定します。

変数の設定
LAMBDA_FUNC_NAME='FailFunction'

ARNを取得します。

コマンド
LAMBDA_FUNC_ARN=$( \
        aws lambda get-function \
          --function-name ${LAMBDA_FUNC_NAME} \
          --query 'Configuration.FunctionArn' \
          --output text \
) \
        && echo ${LAMBDA_FUNC_ARN}

結果(例):

  arn:aws:lambda:ap-northeast-1:XXXXXXXXXXXX:function:FailFunction

1. 事前作業

1.1. ステートマシン名の決定

変数の設定
STEPF_STATEM_NAME='RetryStateMachine'

同名のステートマシンが存在しないことを確認します。

コマンド
aws stepfunctions list-state-machines \
        --query "stateMachines[?name == \`${STEPF_STATEM_NAME}\`]"

結果(例):

  []

1.2. ステートマシン定義ファイルの作成

変数の設定
FILE_INPUT="${STEPF_STATEM_NAME}.json" \
        && echo ${FILE_INPUT}
コマンド
cat << EOF > ${FILE_INPUT}
{
         "Comment": "A Catch example of the Amazon States Language using an AWS Lambda Function",
         "StartAt": "CreateAccount",
         "States": {
            "CreateAccount": {
               "Type": "Task",
               "Resource": "${LAMBDA_FUNC_ARN}",
               "Catch": [ {
                  "ErrorEquals": ["AccountAlreadyExistsError"],
                  "Next": "CustomErrorFallback"
               }, {
                  "ErrorEquals": ["States.TaskFailed"],
                  "Next": "ReservedTypeFallback"
               }, {
                  "ErrorEquals": ["States.ALL"],
                  "Next": "CatchAllFallback"
               } ],
              "End": true
            },
            "CustomErrorFallback": {
               "Type": "Pass",
               "Result": "This is a fallback from a custom Lambda function exception",
               "End": true
            },
            "ReservedTypeFallback": {
               "Type": "Pass",
               "Result": "This is a fallback from a reserved error code",
               "End": true
            },
            "CatchAllFallback": {
               "Type": "Pass",
               "Result": "This is a fallback from any error code",
               "End": true
            }
         }
}
EOF

cat ${FILE_INPUT}

JSONファイルを作成したら、フォーマットが壊れてないか必ず確認します。

コマンド
jsonlint -q ${FILE_INPUT}

エラーが出力されなければOKです。

2. ステートマシンの作成

変数の確認
cat << ETX

        STEPF_STATEM_NAME: ${STEPF_STATEM_NAME}
        FILE_INPUT:        ${FILE_INPUT}
        IAM_ROLE_ARN:      ${IAM_ROLE_ARN}

ETX
コマンド
aws stepfunctions create-state-machine \
        --name ${STEPF_STATEM_NAME} \
        --definition file://${FILE_INPUT} \
        --role-arn ${IAM_ROLE_ARN}

結果():

      {
        "creationDate": 1489740033.593,
        "stateMachineArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:stateMachine:RetryStateMachine"
      }

3. 事後作業

3.1. ARNの取得

コマンド
STEPF_STATEM_ARN=$( \
        aws stepfunctions list-state-machines \
          --query "stateMachines[?name == \`${STEPF_STATEM_NAME}\`]".stateMachineArn \
          --output text \
) \
        && echo ${STEPF_STATEM_ARN}

結果(例):

  arn:aws:states:ap-northeast-1:XXXXXXXXXXXX:stateMachine:RetryStateMachine

3.2. ステートマシンのステータス確認

コマンド
STEPF_STATEM_STATUS=$( \
        aws stepfunctions describe-state-machine \
          --state-machine-arn ${STEPF_STATEM_ARN} \
          --query "status" \
          --output text \
) \
        && echo ${STEPF_STATEM_STATUS}

結果(例):

  ACTIVE

3.3. ステートマシンの確認

コマンド
aws stepfunctions describe-state-machine \
        --state-machine-arn ${STEPF_STATEM_ARN}

結果(例):

  (yet)

完了

続きを読む