DynamoDBの予約語一覧を無理やり動的に取得してみた

DynamoDBの予約語を対処するために一覧が欲しくなったのですが、いちいちプログラム内で定義したくなかったので、AWSの予約語一覧ページから一覧を取得するサービスを作りました。

前提

  1. AWSが公開しているDynamoDB予約語一覧のWebページ( http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ReservedWords.html )をスクレイピングして予約語一覧を抽出します。
  2. 実装はAWS Lambda (Python3)、Webサービスとして動作させるためにAPI Gatewayを利用します。

結果

https://github.com/kojiisd/dynamodb-reserved-words

DynamoDB予約語一覧ページの確認

今回は以下のページをParseしたいと思います。

http://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/ReservedWords.html

HTMLを見てみると、Parseしたい箇所はcodeタグで囲まれているようで、しかもこのページ内でcodeタグが出現するのは1度だけのようです。

スクリーンショット 2017-10-15 18.03.18.png

これであればすぐにパースできそうです。

HTMLの取得とParse

BeautifulSoupを利用すれば、簡単に実装ができます。BeautifulSoupのインストールは他のページでいくらでも紹介されているので、ここでは省略します。

import sys
import os


sys.path.append(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'lib'))
from bs4 import BeautifulSoup
import requests

URL = os.environ['TARGET_URL']

def run(event, context):

    response = requests.get(URL)
    soup = BeautifulSoup(response.content, "html.parser")

    result_tmp = soup.find("code")
    if result_tmp == None:
        return "No parsing target"

    result = result_tmp.string

    return result.split('n');

とりあえず申し訳程度にパースできない場合(AWSのページがcodeタグを使わなくなった時)を想定してハンドリングしています。

API Gatewayを定義してサービス化

コードが書けてしまえばAPI Gatewayを定義してサービス化するだけです。

設定は非常に単純なので割愛で。

スクリーンショット 2017-10-15 18.12.11_deco.png

アクセスしてみる

実際にアクセスしてみます。

$ curl -X GET https://xxxxxxxxxxx.execute-api.us-east-1.amazonaws.com/prod
["", "ABORT", "ABSOLUTE", "ACTION", "ADD", "AFTER", "AGENT", "AGGREGATE", "ALL", "ALLOCATE", "ALTER", "ANALYZE", "AND", "ANY", "ARCHIVE", "ARE", "ARRAY", "AS", "ASC", "ASCII", "ASENSITIVE", "ASSERTION", "ASYMMETRIC", "AT", "ATOMIC", "ATTACH", "ATTRIBUTE", "AUTH", "AUTHORIZATION", "AUTHORIZE", "AUTO", "AVG", "BACK", "BACKUP", "BASE", "BATCH", "BEFORE", "BEGIN", "BETWEEN", "BIGINT", "BINARY", "BIT", "BLOB", "BLOCK", "BOOLEAN", "BOTH", "BREADTH", "BUCKET", "BULK", "BY", "BYTE", "CALL", "CALLED", "CALLING", "CAPACITY", "CASCADE", "CASCADED", "CASE", "CAST", "CATALOG", "CHAR", "CHARACTER", "CHECK", "CLASS", "CLOB", "CLOSE", "CLUSTER", "CLUSTERED", "CLUSTERING", "CLUSTERS", "COALESCE", "COLLATE", "COLLATION", "COLLECTION", "COLUMN", "COLUMNS", "COMBINE", "COMMENT", "COMMIT", "COMPACT", "COMPILE", "COMPRESS", "CONDITION", "CONFLICT", "CONNECT", "CONNECTION", "CONSISTENCY", "CONSISTENT", "CONSTRAINT", "CONSTRAINTS", "CONSTRUCTOR", "CONSUMED", "CONTINUE", "CONVERT", "COPY", "CORRESPONDING", "COUNT", "COUNTER", "CREATE", "CROSS", "CUBE", "CURRENT", "CURSOR", "CYCLE", "DATA", "DATABASE", "DATE", "DATETIME", "DAY", "DEALLOCATE", "DEC", "DECIMAL", "DECLARE", "DEFAULT", "DEFERRABLE", "DEFERRED", "DEFINE", "DEFINED", "DEFINITION", "DELETE", "DELIMITED", "DEPTH", "DEREF", "DESC", "DESCRIBE", "DESCRIPTOR", "DETACH", "DETERMINISTIC", "DIAGNOSTICS", "DIRECTORIES", "DISABLE", "DISCONNECT", "DISTINCT", "DISTRIBUTE", "DO", "DOMAIN", "DOUBLE", "DROP", "DUMP", "DURATION", "DYNAMIC", "EACH", "ELEMENT", "ELSE", "ELSEIF", "EMPTY", "ENABLE", "END", "EQUAL", "EQUALS", "ERROR", "ESCAPE", "ESCAPED", "EVAL", "EVALUATE", "EXCEEDED", "EXCEPT", "EXCEPTION", "EXCEPTIONS", "EXCLUSIVE", "EXEC", "EXECUTE", "EXISTS", "EXIT", "EXPLAIN", "EXPLODE", "EXPORT", "EXPRESSION", "EXTENDED", "EXTERNAL", "EXTRACT", "FAIL", "FALSE", "FAMILY", "FETCH", "FIELDS", "FILE", "FILTER", "FILTERING", "FINAL", "FINISH", "FIRST", "FIXED", "FLATTERN", "FLOAT", "FOR", "FORCE", "FOREIGN", "FORMAT", "FORWARD", "FOUND", "FREE", "FROM", "FULL", "FUNCTION", "FUNCTIONS", "GENERAL", "GENERATE", "GET", "GLOB", "GLOBAL", "GO", "GOTO", "GRANT", "GREATER", "GROUP", "GROUPING", "HANDLER", "HASH", "HAVE", "HAVING", "HEAP", "HIDDEN", "HOLD", "HOUR", "IDENTIFIED", "IDENTITY", "IF", "IGNORE", "IMMEDIATE", "IMPORT", "IN", "INCLUDING", "INCLUSIVE", "INCREMENT", "INCREMENTAL", "INDEX", "INDEXED", "INDEXES", "INDICATOR", "INFINITE", "INITIALLY", "INLINE", "INNER", "INNTER", "INOUT", "INPUT", "INSENSITIVE", "INSERT", "INSTEAD", "INT", "INTEGER", "INTERSECT", "INTERVAL", "INTO", "INVALIDATE", "IS", "ISOLATION", "ITEM", "ITEMS", "ITERATE", "JOIN", "KEY", "KEYS", "LAG", "LANGUAGE", "LARGE", "LAST", "LATERAL", "LEAD", "LEADING", "LEAVE", "LEFT", "LENGTH", "LESS", "LEVEL", "LIKE", "LIMIT", "LIMITED", "LINES", "LIST", "LOAD", "LOCAL", "LOCALTIME", "LOCALTIMESTAMP", "LOCATION", "LOCATOR", "LOCK", "LOCKS", "LOG", "LOGED", "LONG", "LOOP", "LOWER", "MAP", "MATCH", "MATERIALIZED", "MAX", "MAXLEN", "MEMBER", "MERGE", "METHOD", "METRICS", "MIN", "MINUS", "MINUTE", "MISSING", "MOD", "MODE", "MODIFIES", "MODIFY", "MODULE", "MONTH", "MULTI", "MULTISET", "NAME", "NAMES", "NATIONAL", "NATURAL", "NCHAR", "NCLOB", "NEW", "NEXT", "NO", "NONE", "NOT", "NULL", "NULLIF", "NUMBER", "NUMERIC", "OBJECT", "OF", "OFFLINE", "OFFSET", "OLD", "ON", "ONLINE", "ONLY", "OPAQUE", "OPEN", "OPERATOR", "OPTION", "OR", "ORDER", "ORDINALITY", "OTHER", "OTHERS", "OUT", "OUTER", "OUTPUT", "OVER", "OVERLAPS", "OVERRIDE", "OWNER", "PAD", "PARALLEL", "PARAMETER", "PARAMETERS", "PARTIAL", "PARTITION", "PARTITIONED", "PARTITIONS", "PATH", "PERCENT", "PERCENTILE", "PERMISSION", "PERMISSIONS", "PIPE", "PIPELINED", "PLAN", "POOL", "POSITION", "PRECISION", "PREPARE", "PRESERVE", "PRIMARY", "PRIOR", "PRIVATE", "PRIVILEGES", "PROCEDURE", "PROCESSED", "PROJECT", "PROJECTION", "PROPERTY", "PROVISIONING", "PUBLIC", "PUT", "QUERY", "QUIT", "QUORUM", "RAISE", "RANDOM", "RANGE", "RANK", "RAW", "READ", "READS", "REAL", "REBUILD", "RECORD", "RECURSIVE", "REDUCE", "REF", "REFERENCE", "REFERENCES", "REFERENCING", "REGEXP", "REGION", "REINDEX", "RELATIVE", "RELEASE", "REMAINDER", "RENAME", "REPEAT", "REPLACE", "REQUEST", "RESET", "RESIGNAL", "RESOURCE", "RESPONSE", "RESTORE", "RESTRICT", "RESULT", "RETURN", "RETURNING", "RETURNS", "REVERSE", "REVOKE", "RIGHT", "ROLE", "ROLES", "ROLLBACK", "ROLLUP", "ROUTINE", "ROW", "ROWS", "RULE", "RULES", "SAMPLE", "SATISFIES", "SAVE", "SAVEPOINT", "SCAN", "SCHEMA", "SCOPE", "SCROLL", "SEARCH", "SECOND", "SECTION", "SEGMENT", "SEGMENTS", "SELECT", "SELF", "SEMI", "SENSITIVE", "SEPARATE", "SEQUENCE", "SERIALIZABLE", "SESSION", "SET", "SETS", "SHARD", "SHARE", "SHARED", "SHORT", "SHOW", "SIGNAL", "SIMILAR", "SIZE", "SKEWED", "SMALLINT", "SNAPSHOT", "SOME", "SOURCE", "SPACE", "SPACES", "SPARSE", "SPECIFIC", "SPECIFICTYPE", "SPLIT", "SQL", "SQLCODE", "SQLERROR", "SQLEXCEPTION", "SQLSTATE", "SQLWARNING", "START", "STATE", "STATIC", "STATUS", "STORAGE", "STORE", "STORED", "STREAM", "STRING", "STRUCT", "STYLE", "SUB", "SUBMULTISET", "SUBPARTITION", "SUBSTRING", "SUBTYPE", "SUM", "SUPER", "SYMMETRIC", "SYNONYM", "SYSTEM", "TABLE", "TABLESAMPLE", "TEMP", "TEMPORARY", "TERMINATED", "TEXT", "THAN", "THEN", "THROUGHPUT", "TIME", "TIMESTAMP", "TIMEZONE", "TINYINT", "TO", "TOKEN", "TOTAL", "TOUCH", "TRAILING", "TRANSACTION", "TRANSFORM", "TRANSLATE", "TRANSLATION", "TREAT", "TRIGGER", "TRIM", "TRUE", "TRUNCATE", "TTL", "TUPLE", "TYPE", "UNDER", "UNDO", "UNION", "UNIQUE", "UNIT", "UNKNOWN", "UNLOGGED", "UNNEST", "UNPROCESSED", "UNSIGNED", "UNTIL", "UPDATE", "UPPER", "URL", "USAGE", "USE", "USER", "USERS", "USING", "UUID", "VACUUM", "VALUE", "VALUED", "VALUES", "VARCHAR", "VARIABLE", "VARIANCE", "VARINT", "VARYING", "VIEW", "VIEWS", "VIRTUAL", "VOID", "WAIT", "WHEN", "WHENEVER", "WHERE", "WHILE", "WINDOW", "WITH", "WITHIN", "WITHOUT", "WORK", "WRAPPED", "WRITE", "YEAR", "ZONE "]

うまくできました。

まとめ

思いつきで作成してみましたが、AWSのページが閉鎖されたりHTML構造が変更されない限り、仕様変更などは気にせずに使えそうです。

あとBeautifulSoup初めて使いましたが、かなり便利。

続きを読む

サーバ死活監視→メール送付をLambda、AmazonSNSで実現する

AWSを使ってのサーバの死活監視ってよくやると思うんですが、大体問題があった時の通知の飛ばし先はSlackだったりします。(無料だし)

とはいえ仕事で使おうとすると、Slackは見れない状況もあるかもしれないと思い、メールで通知を飛ばす、という観点でAWSを利用し実現してみました。

最終的に利用できるようにしたものは以下に登録してあります。

https://github.com/kojiisd/aws-server-monitor

サーバ監視からのメール送付までの仕組み

公開されているサービスに対してREST通信を行い、そのステータスコードを見るだけのシンプルな死活監視を行います。

ただし何も考えずに実装すると、エラーが起きてから回復するまで、定期実行のたびに何度もエラーメールを投げてしまう仕組みとなるため、管理する側としてはうっとおしくもあり(何回もメールが来る)コスト的にも嬉しくありません。またエラー発生時だけメールを飛ばす仕組みとすると、復旧したかどうかがわからず、管理者はヒヤヒヤすることになります。

そこで、現在のステータスをDynamoDBで管理するようにし、状態が変更されたタイミングでメールを送る(異常発生/復旧)という仕組みにしてみました。

監視対象のサーバ情報はS3から取得してくるようにします。

まずはサーバ死活監視とメール送付部分の作成

状態の監視は後ほど加えるとして、まずはメインとなるサーバ死活監視とメール送付のLambda実装です。Python3.6で実装しています。

サーバ死活監視処理

S3から監視対象のデータを持ってくるところと、サーバ監視を実施してエラーが発生したサーバの一覧を作成する処理です。Lambda実行の際の権限の割り振り忘れに注意。

def get_target_servers():
    s3 = boto3.resource('s3')
    obj = s3.Object(BUCKET_NAME, OBJECT_NAME)
    response = obj.get()
    body = response['Body'].read()
    return body.decode('utf-8')

def check_target_servers(target_json):
    data = json.loads(target_json)
    servers = data['servers']

    error_servers = []

    for server in servers:
        name = server['name']
        url = server['url']
        try:
            res = requests.get(url)
            if res.status_code != 200:
                error_servers.append(server)
        except Exception:
            error_servers.append(server)

    if len(error_servers) == 0:
        print("Successful finished servers checking")
    else:
        response = send_error(name, url, error_servers)
        print("Error occured:")
        print(response)
        print(error_servers)

S3上には以下のようなjsonデータを配置するようにします。

{
    "servers": [
      { "name": "googlea", "url": "http://www.google.coma" },
      { "name": "googleb", "url": "http://www.google.comb" },
      { "name": "google", "url": "http://www.google.com" }
    ]
}

上記を監視対象とすると、最後以外がアクセス失敗するので、エラーメールが飛ぶことになります。
以下の値は環境変数として定義しており、Lambda実行前に定義が必要です。

変数名 内容
S3_BUCKET_NAME S3の対象バケット名
S3_OBJECT_NAME S3の対象オブジェクト名
SNS_TOPICS_NAME SNSの送付対象トピック名
DDB_TABLE_NAME DynamoDBのテーブル名

メール送付処理

とりあえずこんな感じで書けばSNSは呼び出せるので、これをカスタマイズします。

import json
import boto3

sns = boto3.client('sns')

def lambda_handler(event, context):
    sns_message = "Test email"

    topic = 'arn:aws:sns:us-east-1:<ACCOUNT_ID>:<TOPICS_NAME>'
    subject = 'Test-email'
    response = sns.publish(
        TopicArn=topic,
        Message=sns_message,
        Subject=subject
    )
    return 'Success'

カスタマイズをしてこんな感じでメソッドにして組み込みました。

def send_error(name, url, error_servers):
    sns = boto3.client('sns')
    sns_message = "Error happens:nn" + json.dumps(error_servers, indent=4, separators=(',', ': '))

    subject = '[ServerMonitor] Error happens'
    response = sns.publish(
        TopicArn=SNS_TOPICS_NAME,
        Message=sns_message,
        Subject=subject
    )

    return response

死活監視でエラーが発生すると以下のようなメールを受け取れます。内容は質素ですが、とりあえずこれで良しとします。

Error happens:

[
    {
        "name": "googlea",
        "url": "http://www.google.coma"
    },
    {
        "name": "googleb",
        "url": "http://www.google.comb"
    }
]

DynamoDBを使った状態管理

さて、このままでも監視はできますが、エラー発生と復旧がわかった方が良いので、DynamoDBを使って状態管理をし、変化があった場合にメールを送付するように改修します。

「url」をプライマリキー、「name」をソートキー(レンジキー)として「server-monitor」というテーブルを作成します。
取得と実装はこんな感じになります。これをカスタマイズして今回の処理に適用します。

import boto3
import json
from boto3.dynamodb.conditions import Key, Attr


dynamodb = boto3.resource('dynamodb')
table    = dynamodb.Table('server-monitor')

def lambda_handler(event, context):
    #add_server()
    #get_server()

    return 'Finish operation'

def get_server():

    items = table.get_item(
            Key={
                 "url": "http://www.google.com",
                 "name": "google"
            }
        )

    print(items['Item'])

def add_server():
    table.put_item(
            Item={
                 "url": "http://www.google.com",
                 "name": "google",
                 "status": True
            }
        )    

カスタマイズして組み込んだ結果はこちらになります。

def check_status(url, name):
    status_ok = True
    try:
        items = dynamodb.Table(DDB_TABLE_NAME).get_item(
                Key={
                    "url": url,
                    "name": name
                }
            )
        status_ok = items['Item']['status']
    except:
        status_ok = None
    return status_ok

def add_server(url, name, status):
    dynamodb.Table(DDB_TABLE_NAME).put_item(
        Item={
                "url": url,
                "name": name,
                "status": status
        }
    )

死活監視後のエラー判定部分も、DynamoDBから現在の状況を確認してメール通知をする処理の追加が必要になります。こんな感じです。

def check_target_servers(target_json):
    data = json.loads(target_json)
    servers = data['servers']

    status_changed_servers = []

    for server in servers:
        name = server['name']
        url = server['url']
        status_ok = check_status(url, name)
        try:
            res = requests.get(url)
            if res.status_code != 200:
                if status_ok != False:
                    server['status'] = "Error"
                    status_changed_servers.append(server)
                add_server(url, name, False)
            else:
                if status_ok == False:
                    server['status'] = "Recover"
                    status_changed_servers.append(server)
                add_server(url, name, True)
        except Exception:
            if status_ok != False:
                server['status'] = "Error"
                status_changed_servers.append(server)
            add_server(url, name, False)

    if len(status_changed_servers) == 0:
        print("Successful finished servers checking")
    else:
        response = send_error(name, url, status_changed_servers)
        print("Error occured:")
        print(response)
        print(status_changed_servers)

動作確認

今までの実装でエラーが発生した時には「Error」と、復旧した時には「Recover」という内容のメールが送信されるようになっているはずです。動作確認をしてみます。

以前作成したWebページがS3上にあるので、アクセス権限を変更してテストしてみます。

Three.jsのかっこいいサンプルとAWSを連携させてみた

OKパターンで実行してみる。。。

メールは送信されず、DynamoDBにデータだけ追加されました。

スクリーンショット 2017-09-24 19.17.32.png

さて、アクセス権限を変更して、アクセスできなくしてから再度実行してみます。無事エラーが発生してメールが飛んで来ました。状態は「Error」です。

Server Status Changed happens:

[
    {
        "name": "s3-test",
        "url": "https://s3.amazonaws.com/xxxxxx/xxxx/css3d_periodictable.html",
        "status": "Error"
    }
]

DynamoDBのデータもエラーを表す「false」にstatusカラムが変更されています。

スクリーンショット 2017-09-24 19.21.23.png

もう一度実行しても、エラーメールは飛んでこないようです。無事状態判定をしてくれています。DynamoDBの値も変化なしです。(キャプチャだけだと何もわかりませんがw)

スクリーンショット 2017-09-24 19.23.43.png

それではページを復旧してみます。またアクセス権限を元に戻してアクセス可能にし、サーバ死活監視処理を実行してみます。

Server Status Changed happens:

[
    {
        "name": "s3-test",
        "url": "https://s3.amazonaws.com/xxxxxx/xxxx/css3d_periodictable.html",
        "status": "Recover"
    }
]

無事復旧メールが届きました。これで完成です。

まとめ

サーバの死活監視を、状態管理しながら実施してみました。今回はAWSのサービスを中心に実現しましたが、おかげでトータル数時間レベルで実現できました。かかるコストも抑えつつ、実際に使えそうなものがこれくらいスピーディーに作れてしまうのは、さすがAWSのマネージドサービス、といったところです。

[おまけ]ライブラリの管理

Lambdaでデプロイパッケージを作成する際、普通に実装すると外部ライブラリをプロジェクトのルートディレクトリに置くことになるので、あまり綺麗なパッケージ構成になりません。

そこで今回、こちらのページを参考にさせてもらいながら、別ディレクトリにパッケージを配置してく方法をとりました。

# pip install -U requests -t ./lib

で、Pythonにはこういう処理を追加する。

sys.path.append(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'lib'))

import requests

こうするだけでlib配下のパッケージを見に行ってくれるようになりました。これはディレクトリ管理することを考えるとかなり助かりました。実際serverlessコマンドでデプロイしても、正しく動きました。

続きを読む

Three.jsのかっこいいサンプルとAWSを連携させてみた

かっこいい画面が作りたかったので、やってみた備忘録。

やりたいこと

Three.jsがとてもかっこいいんですが、値の設定が静的なので、何とか動的に値を変えつつ画面に反映できないかな、、、と。このサンプルを使ってみます。

スクリーンショット 2017-05-14 19.08.58.png

イメージとしては、この画像で異常が検出されたところを赤くアラート表示したりできないかな、と。

実際の流れの整理

Githubからサンプルを持って来て、必要なものだけ抜き出します。

異常があったら表示する、という感じにしたいので、以前投稿したロジックを使います。

AWS LambdaでDynamoDBから取得した値に任意の集計をかける(グルーピング処理追加)

最新値を取得する中で、異常があったら対象の項目に変化をつけたいと思います。

とりあえずサンプルをAWS上で動かす

AWS上で動作させたいので、とりあえずこのサンプルをAWS上に乗せる必要があります。

  1. 必要なファイルの抽出(抽出結果などはこちらに。https://github.com/kojiisd/aws-threejs
  2. 適当な名前のバケットをS3に作成(今回は「aws-three」という名前にしました)
  3. ファイルのアップロード
  4. パーミッションの適用

パーミッションの適用は、とりあえず以下のコマンドで一括でつけました。実際の運用を考えると、もっと慎重にならないといけないところだとは思います。こちらのサイトを参考にさせてもらいました。

$ aws s3 ls --recursive s3://aws-three/ | awk '{print $4}' | xargs -I{} aws s3api put-object-acl --acl public-read --bucket aws-three --key "{}"

これで一旦サンプルがS3上で動くようになりました。

データの中身を理解する

実際にソースを読んで見ると、一つ一つのオブジェクトをどの様に格納しているかがわかります。

var table = [
    "H", "Hydrogen", "1.00794", 1, 1,
    "He", "Helium", "4.002602", 18, 1,
    "Li", "Lithium", "6.941", 1, 2,
    "Be", "Beryllium", "9.012182", 2, 2,
    :
    :

5つの要素で1セットとしてオブジェクトを表示している様です。元の実装がいいかどうかは置いておいて(^^;とりあえずこれを動的に変更できるようにします。

DBからの値をtableに反映する

データの構造自体はとても単純なので、1レコード5カラムのデータを持つテーブルを作成すれば、後々データの内容全てをDBから持ってくることも可能になりそうです。

DynamoDBで下記の様な単純なテーブルを作成します。もちろんAWS Consoleから作成可能ですが、Localで同じスキーマを作成したい場合はこんな感じで定義を流し込めばOK。

var params = {
    TableName: "test-three",
    KeySchema: [
        {
            AttributeName: "id",
            KeyType: "HASH"
        },
        {
            AttributeName: "timestamp",
            KeyType: "RANGE"
        }
    ],
    AttributeDefinitions: [
        {
            AttributeName: "id",
            AttributeType: "S"
        },
        {
            AttributeName: "timestamp",
            AttributeType: "S"
        }
    ],
    ProvisionedThroughput: {
        ReadCapacityUnits: 1,
        WriteCapacityUnits: 1
    }
};

dynamodb.createTable(params, function(err, data) {
    if (err) ppJson(err);
    else ppJson(data);
});

はい、無事出来ました。

スクリーンショット 2017-07-09 16.12.16.png

データの準備と投入

次にデータを流し込みます。こんな感じでpythonスクリプトでサクッと。実行する前に、クライアントのAWS認証設定が正しいことを確認してください。

data-insert.py
args = sys.argv

if len(args) < 4:
    print "Usage: python data-insert.py <FileName> <Region> <Table>"
    sys.exit()

dynamodb = boto3.resource('dynamodb', region_name=args[2])
table = dynamodb.Table(args[3])

if __name__ == "__main__":
    print "Data insert start."
    target = pandas.read_csv(args[1])
    for rowIndex, row in target.iterrows():
        itemDict = {}
        for col in target:
            if row[col] != None and type(row[col]) == float and math.isnan(float(row[col])) and row[col] != float('nan'):
                continue
            elif row[col] == float('inf') or row[col] == float('-inf'):
                continue
            elif type(row[col]) == float:
                itemDict[col] = Decimal(str(row[col]))
            else:
                itemDict[col] = row[col]
        print itemDict

        table.put_item(Item=itemDict)

    print "Data insert finish."

とりあえずこんなデータを用意しました。先ほどのスクリプトで投入します。

id,score,timestamp
"H",0,"2017-07-23T16:00:00"
"H",0,"2017-07-23T16:01:00"
"H",0,"2017-07-23T16:02:00"
"H",0,"2017-07-23T16:03:00"
"H",0,"2017-07-23T16:04:00"
"H",1,"2017-07-23T16:05:00"
"H",0,"2017-07-23T16:06:00"

無事投入できました。

$ python data-insert.py sample-data.csv us-east-1 test-three
Data insert start.
{'timestamp': '2017-07-23T16:00:00', 'score': 0, 'id': 'H'}
{'timestamp': '2017-07-23T16:01:00', 'score': 0, 'id': 'H'}
{'timestamp': '2017-07-23T16:02:00', 'score': 0, 'id': 'H'}
{'timestamp': '2017-07-23T16:03:00', 'score': 0, 'id': 'H'}
{'timestamp': '2017-07-23T16:04:00', 'score': 0, 'id': 'H'}
{'timestamp': '2017-07-23T16:05:00', 'score': 1, 'id': 'H'}
{'timestamp': '2017-07-23T16:06:00', 'score': 0, 'id': 'H'}
Data insert finish.

スクリーンショット 2017-07-23 17.33.30.png

API Gatewayの設定

絞り込み結果を得るためのLambdaを実行するRESTの口を設けます。API Gatewayを設定してPOST通信でデータが来た場合に該当のLambdaを実行するようにします。

スクリーンショット 2017-07-23 18.13.07_dec.png

画面からDynamoDBのデータを取得する

定期的に画面からDynamoDBのデータを取得します。今回はCognitoによるセキュアな通信は実装しません。他のページで色々と実装されている方がいるので、そちらを参考にして見てください。

処理の流れとしては、以下のような感じ。(S3からの取得処理が定期的に実行される)

S3 → API Gateway → Lambda → DynamoDB

API GatewayでSDK生成

API Gatewayの設定を以下にし、SDKを生成します。

  1. HTTPメソッドはPOSTを指定
  2. とりあえず認証などは今回はかけない

で、作成されたSDKを、本家のページを参考に埋め込みます。

API Gateway で生成した JavaScript SDK を使用する

以下のコードをScriptタグ部分に貼り付けました。

        <script type="text/javascript" src="../extralib/axios/dist/axios.standalone.js"></script>
        <script type="text/javascript" src="../extralib/CryptoJS/rollups/hmac-sha256.js"></script>
        <script type="text/javascript" src="../extralib/CryptoJS/rollups/sha256.js"></script>
        <script type="text/javascript" src="../extralib/CryptoJS/components/hmac.js"></script>
        <script type="text/javascript" src="../extralib/CryptoJS/components/enc-base64.js"></script>
        <script type="text/javascript" src="../extralib/url-template/url-template.js"></script>
        <script type="text/javascript" src="../extralib/apiGatewayCore/sigV4Client.js"></script>
        <script type="text/javascript" src="../extralib/apiGatewayCore/apiGatewayClient.js"></script>
        <script type="text/javascript" src="../extralib/apiGatewayCore/simpleHttpClient.js"></script>
        <script type="text/javascript" src="../extralib/apiGatewayCore/utils.js"></script>
        <script type="text/javascript" src="../extralib/apigClient.js"></script>

実際にAPI Gatewayにアクセスするスクリプトはこんな感じになります。

SDKを用いてAPI Gatewayにブラウザからアクセスしてみる

var apigClient = apigClientFactory.newClient();

var params = {
    // This is where any modeled request parameters should be added.
    // The key is the parameter name, as it is defined in the API in API Gateway.
    "Content-Type": "application/x-www-form-urlencoded"
};

var body = {

    "label_id": "id",
    "label_range": "timestamp",
    "id": [
    "H"
    ],
    "aggregator": "latest",
    "time_from": "2017-07-23T16:00:00.000",
    "time_to": "2017-07-23T16:06:00.000",
    "params": {
    "range": "timestamp"
    }
};


  var additionalParams = {
    // If there are any unmodeled query parameters or headers that must be
    //   sent with the request, add them here.
    headers: {
    },
    queryParams: {
    }
  };

  apigClient.rootPost(params, body, additionalParams)
      .then(function(result){
        // Add success callback code here.
        alert("Success");
        alert(JSON.stringify(result));
      }).catch( function(result){
        // Add error callback code here.
      });

パラメータ(body部)には「AWS LambdaでDynamoDBから取得した値に任意の集計をかける(グルーピング処理追加)」で必要になるJSONを渡します。

API GatewayにアクセスするにはCORSの設定が必要なので、AWS Consoleから設定します。この際Resourcesで変更したものはStaging環境に再デプロイをしないと追加で設定した項目は反映されないので注意(ハマった。。。)

「Enable CORS」から設定できます。

スクリーンショット 2017-09-10 10.56.00.png

実際に画面にアクセスすると、サーバから値が取れたことがわかります。(今回はAPI KEYなどはOFFにしていますが、実際に運用するとなったら、もちろん考慮が必要です。)

スクリーンショット 2017-09-10 11.03.04.png

スクリーンショット 2017-09-10 11.03.11_deco.png

無事値が取れました。ここまでくればもう一息です。

取得した値を元に画面に変更を加える

さて、先ほど取得できた値には「score」というものが入っています。この「score」値を元に画面のコンポーネントに変化を加えます。

持ってきたサンプルコードの中で色に影響を与えているのは以下のコードになります。

for ( var i = 0; i < table.length; i += 5 ) {

    var element = document.createElement( 'div' );
    element.className = 'element';
    element.style.backgroundColor = 'rgba(0,127,127,' + ( Math.random() * 0.5 + 0.25 ) + ')';

全てのコンポーネントに対してrgbaで値を与えているので、ここを変更します。とはいえ、そのまま変更しようとしてもinit()メソッドを実行すると追加でコンポーネントが描画されてしまうので、描画後に変更が可能なように以下のようなidの埋め込みを行います。

var element = document.createElement( 'div' );
element.className = 'element';
element.id = table[ i ];                        // ここが追加したところ
element.style.backgroundColor = 'rgba(0,127,127,' + ( Math.random() * 0.5 + 0.25 ) + ')';

これらの準備を前提として、処理の流れは以下とします。

  1. DynamoDBから取ってきたjson値をオブジェクトに変換。
  2. 一旦オブジェクト配列としてidとscore値を持たせるようにする。
  3. オブジェクト配列でループ処理を実装し、その中でscore値が「0」ではないidに対して、色の変更を実施する。

今回の準備を踏まえると、上記で対象となるのは「id: H」となります。DynamoDBからデータを取得するために、rootPost呼び出し後のコールバック処理を以下のように変更します。今回色変更の処理をシュッと実装するために、jQueryを読み込ませています。

apigClient.rootPost(params, body, additionalParams)
    .then(function(result){
        var resultObjArray= new Array();
        // Add success callback code here.
        var resultJson = JSON.parse(result.data);
        for (var index = 0; index < resultJson.length; index++) {
            var resultObj = new Object();
            resultObj.id = resultJson[index].id;
            resultObj.score = resultJson[index].score
            resultObjArray[index] = resultObj;
        }

        for (var index = 0; index < resultObjArray.length; index++) {
            var resultObj = resultObjArray[index];
            if (resultObj.score != 0) {
                $('#' + resultObj.id).css('background-color', 'rgba(255,127,127,' + ( Math.random() * 0.5 + 0.25 ) + ')')
            }
        }

    }).catch( function(result){
        // Add error callback code here.
        alert("Failed");
        alert(JSON.stringify(result));
    });

DynamoDBに格納されている値(検索期間の最新)を「1」に変更して画面を更新すると、無事赤くなりました。

スクリーンショット 2017-09-10 18.58.41.png

実行タイミングを任意にするためにボタンからのクリックイベントとして実装する

このままでもいいのですが、今のままだと最初のアニメーションが終わる前に色が変わるので、色が変わった感がありません。ですので一つボタンを追加してクリックしたら画面に値を反映する、という手法を取ろうと思います。

先ほどのrootPostを呼び出すプログラムを、画面に配置したボタンのイベントとして処理させます。

<button id="getData">Get Data from DynamoDB</button>

画面にはこんな感じでボタンを配置し、jQueryでイベントを登録します。

$(document).ready(function(){
    $("#getData").click(function() {
        apigClient.rootPost(params, body, additionalParams)
        .then(function(result){
            var resultObjArray= new Array();
            // Add success callback code here.
            var resultJson = JSON.parse(result.data);
            for (var index = 0; index < resultJson.length; index++) {
                var resultObj = new Object();
                resultObj.id = resultJson[index].id;
                resultObj.score = resultJson[index].score
                resultObjArray[index] = resultObj;
            }

            for (var index = 0; index < resultObjArray.length; index++) {
                var resultObj = resultObjArray[index];
                if (resultObj.score != 0) {
                    $('#' + resultObj.id).css('background-color', 'rgba(255,127,127,' + ( Math.random() * 0.5 + 0.25 ) + ')')
                }
            }

        }).catch( function(result){
            // Add error callback code here.
            alert("Failed");
            alert(JSON.stringify(result));
        });
    });
});

これで無事画面から操作することが可能になりました。ボタンを押すといい感じに「H」の要素が赤く光っています。

スクリーンショット 2017-09-10 23.08.28.png

まとめ

もともとかっこいいと思っていたThree.jsの画面をカスタマイズして監視画面(IoTとかのデバイス監視画面)のような機能にできないものか、と実装してみましたが、なんとかここまでたどり着けました。

DynamoDBからの取得間隔やIDの指定方法など、まだまだハードコーディングな部分はありますが、土台は出来上がったのであとはちょっとのカスタマイズで実際に使えそうなところまではイメージが持てました。元々の実装の仕組みも理解できたので、条件が合えば文言を変えるなども実現できそうです。

やっぱり業務で使うものもかっこいい画面でないとね、と思うこの頃です。
(GithubのコードはAPI GatewayのSDKさえ埋め込めば動くような作りにしています)

https://github.com/kojiisd/aws-threejs

続きを読む

AWS LambdaでDynamoDBから取得した値に任意の集計をかける(グルーピング処理追加)

以前の投稿の更新版です。前の仕組みでは一つのデータに対しての集計しかけられなかったのですが、それでは実運用上あまりに不便、と思ったので、指定した値でグルーピングできるようにしてみました。

AWS LambdaでDynamoDBから取得した値に任意の集計をかける

インプットデータのフォーマット変更

ほとんど使い方は以前のものと一緒ですが、以下の点だけ変えました。

  1. IDは配列形式([“sensor1”, “sensor2”])で指定するように仕様変更した。
  2. テーブル名を環境変数から取得するようにした。

IDは配列形式([“sensor1”, “sensor2”])で指定するように仕様変更した

こんな感じが最新のフォーマットです。

{
  "label_id": "id",
  "label_range": "timestamp",
  "id": [
    "sensor1",
    "sensor2"
  ],
  "aggregator": "latest",
  "time_from": "2017-04-30T22:00:00.000",
  "time_to": "2017-04-30T22:06:00.000",
  "params": {
    "range": "timestamp"
  }
}

IDの部分を配列にしました。こうすることで指定したIDの最新値を取得することが可能になります。
戻り値はこんな感じになります。

"[{"timestamp": "2017-04-30T22:05:00.000", "score": 0.0, "id": "sensor1"}, {"timestamp": "2017-04-30T22:06:00.000", "score": 1.0, "id": "sensor2"}]"

ちなみにDynamoDBにはこんな感じの値を用意していました。

スクリーンショット 2017-07-19 18.12.43.png

テーブル名を環境変数から取得するようにした

そのまんまです。 handler.pyの os.environ['TABLE'] の部分です。Lambda実行時に環境変数をこんな感じで指定してください。

スクリーンショット 2017-07-19 18.13.46.png

handler.py
import sys
import boto3
import json
import decimal
import os
from boto3.dynamodb.conditions import Key

from aggregator.lambda_aggregator import LambdaAggregator
from aggregator.latest_aggregator import LatestAggregator
from aggregator.max_aggregator import MaxAggregator
from aggregator.min_aggregator import MinAggregator
from aggregator.sum_aggregator import SumAggregator
from aggregator.avg_aggregator import AvgAggregator
from aggregator.count_aggregator import CountAggregator

dynamodb = boto3.resource('dynamodb')
table    = dynamodb.Table(os.environ['TABLE'])

aggregator_map = {}
aggregator_map['latest'] = LatestAggregator()
aggregator_map['max'] = MaxAggregator()
aggregator_map['min'] = MinAggregator()
aggregator_map['sum'] = SumAggregator()
aggregator_map['avg'] = AvgAggregator()
aggregator_map['count'] = CountAggregator()

def run(event, context):
    check_params(event)
    result = []

    for id in event['id']:
        res = table.query(
                KeyConditionExpression=Key(event['label_id']).eq(id) & Key(event['label_range']).between(event['time_from'], event['time_to']),
                ScanIndexForward=False
            )

        return_response = aggregator_map[event['aggregator']].aggregate(res['Items'], event['params'])
        result.append(return_response)

    return json.dumps(result, default=decimal_default)

def decimal_default(obj):
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    raise TypeError

def check_params(params):
    if 'label_id' not in params or 'label_range' not in params or 'id' not in params or 'aggregator' not in params or 'time_from' not in params or 'time_to' not in params or 'params' not in params:
        sys.stderr.write("Parameters for label_id, label_range, id, aggregator, time_from, time_to and params are needed.")
        sys.exit()

ソース

こちらにコミットしています。(以前のものを更新)

https://github.com/kojiisd/lambda-dynamodb-aggregator

続きを読む

AWS LambdaからKintoneアプリケーションのレコードを更新してみる

AWS LambdaからKintoneのアプリケーションレコードを更新してみたのでその備忘録。

前提

  1. AWS Lambda Pythonを使います。
  2. LambdaはVPC内には配置しません。
  3. Kitoneのアプリケーションにはデフォルトで用意されている「備品在庫管理」を利用します。

Kintone側の設定

スクリーンショット 2017-05-28 17.05.06.png

手動で登録したこのレコード(ボールペン)をAWS Lambdaから変更したいと思います。

基本的にはこちらのサイトを参考にさせてもらいました。とてもわかりやすかったです。バージョンの違いなのか若干書き方が異なるところがあったりしましたが、なんとかなりました。

http://www.yamamanx.com/kintone-python-query-check/

Lambdaの実装

以下のような実装になります。一応メソッドによって処理を変更できるような作りにはしてはみましたが、実運用で使うならきちんと設計したほうが良いですね。

def kintone_operator(event, context):
    KINTONE_URL = "https://{kintone_domain}/k/v1/record.json"
    url = KINTONE_URL.format(
        kintone_domain=os.environ['KINTONE_DOMAIN'],
        kintone_app=os.environ['KINTONE_APP']
    )
    headers_key = os.environ['KINTONE_HEADERS_KEY']
    api_key = os.environ['KINTONE_API_KEY']
    basic_headers_key = os.environ['KINTONE_BASIC_HEADERS_KEY']
    basic_headers_value =os.environ['KINTONE_BASIC_HEADERS_VALUE']

    headers = {headers_key: api_key}
    headers["Content-Type"] = "application/json"
    if basic_headers_value != '':
        headers[basic_headers_key] = basic_headers_value

    if event['api'] == 'PUT':
        result = put_record(headers, url, event['data'])
    else:
        result = 'NO KEY'

    return result

def put_record(headers, url, data):
    response_record = requests.put(url, json=data, headers=headers)
    record_data = json.loads(response_record.text)

    return record_data

全て大文字の変数は環境変数としてLambdaで定義しました。API Keyなどを公開しなくて済むので非常に楽です。

実行時には以下のようなデータを送付しました。

{
  "api": "PUT",
  "data": {
      "app": 498,
      "id": 1,
      "record": {
          "文字列__1行_": {
              "value": "シャープペンシル"
          }
      }
  }
}

で、実行してみたところ、無事更新されたようです。

スクリーンショット 2017-06-06 19.45.48.png

まとめ

いろいろなサイトをベースにさせてもらい、更新機能を作ってみました。うまくAPI Gatewayやら他の機能やらと連携できれば、KintoneとAWSでどんどんとできることが増えてきますね。

続きを読む

AWS LambdaでDynamoDBから取得した値に任意の集計をかける

前回書いた記事「AWS LambdaでDynamoDBから取得した値の最新レコードを取得する」を、以下の集計にも対応させてみました。

  • 最新値 (latest)
  • 最大値 (max)
  • 最小値 (min)
  • 平均値 (avg)
  • 合計値 (sum)
  • 件数 (count)

前提&仕様

以下を前提&仕様として作成しました。

  1. DynamoDBのハッシュキーにID(文字列)、レンジキーに時刻(文字列)を指定しているものとする。
  2. データは時系列に格納されているものとする。
  3. 指定したカラムの集計を取るようにする。
  4. 集計の時間を指定できるようにする。
  5. 集計の種類は指定可能とする。
  6. ID、集計期間、集計対象のカラム、集計の種類はそれぞれJSONのインプットで指定可能とする。

結果

こちらに登録してあります。

https://github.com/kojiisd/lambda-dynamodb-aggregator

サンプルデータ

以下のデータをDynamoDBに入れました。これについて色々と操作をしようと思います。

スクリーンショット 2017-05-03 11.49.57.png

使い方

インプットデータの準備

以下のインプットデータが必要になります。

カラム名 内容
label_id DynamoDBで指定しているハッシュキー名
label_range DynamoDBで指定しているレンジキー名
id 集計したいIDの値
aggregator 集計種別。種別は最初に書いた6種類に対応
time_from 集計対象期間(開始)
time_to 集計対象期間(終了)
params [個別]集計時必要になるパラメータ

インプットデータの準備(個別部)

上述の「params」に該当する部分は以下の様なパラメータの準備が必要になります。

集計種別 必要な値
最新値(latest) range: レンジキー名(共通部で指定していますが、一応他に体裁を合わせました)
最大値(max) score: 集計対象のカラム名
最小値(min) score: 集計対象のカラム名
平均値(avg) score: 集計対象のカラム名
合計値(sum) score: 集計対象のカラム名
件数(count) score: 集計対象のカラム名

こんな感じで指定すれば実行可能になります。

とりあえず実行してみる

とりあえず動かしてみたい人は以下の様な感じで実行すれば結果が見れます。今回はSERVERLESS FRAMEWORK上で動かしています。

「sensor1」に対して指定した時間の中での:

最新値(latest)
$ sls invoke local -f run -d '{"label_id": "id", "label_range": "timestamp", "id": "sensor1", "aggregator": "latest", "time_from": "2017-04-30T22:00:00.000", "time_to": "2017-04-30T22:05:00.000", "params": {"range": "timestamp"}}'
"{"timestamp": "2017-04-30T22:05:00.000", "score": 0.0, "id": "sensor1"}"
最大値(max)
$ sls invoke local -f run -d '{"label_id": "id", "label_range": "timestamp", "id": "sensor1", "aggregator": "max", "time_from": "2017-04-30T22:00:00.000", "time_to": "2017-04-30T22:05:00.000", "params": {"score": "score"}}'
"{"timestamp": "2017-04-30T22:04:00.000", "score": 1.0, "id": "sensor1"}"

(同じ値がヒットした場合は新しい方のデータを取得する様にしています。)

最小値(min)
$ sls invoke local -f run -d '{"label_id": "id", "label_range": "timestamp", "id": "sensor1", "aggregator": "min", "time_from": "2017-04-30T22:00:00.000", "time_to": "2017-04-30T22:05:00.000", "params": {"score": "score"}}'
"{"timestamp": "2017-04-30T22:05:00.000", "score": 0.0, "id": "sensor1"}"
平均値(avg)
$ sls invoke local -f run -d '{"label_id": "id", "label_range": "timestamp", "id": "sensor1", "aggregator": "avg", "time_from": "2017-04-30T22:00:00.000", "time_to": "2017-04-30T22:05:00.000", "params": {"score": "score"}}'
"0.3333333333333333"
合計値(sum)
$ sls invoke local -f run -d '{"label_id": "id", "label_range": "timestamp", "id": "sensor1", "aggregator": "sum", "time_from": "2017-04-30T22:00:00.000", "time_to": "2017-04-30T22:05:00.000", "params": {"score": "score"}}'
"2.0"
件数(count)
$ sls invoke local -f run -d '{"label_id": "id", "label_range": "timestamp", "id": "sensor1", "aggregator": "count", "time_from": "2017-04-30T22:00:00.000", "time_to": "2017-04-30T22:05:00.000", "params": {"score": "score"}}'
"6"

仕組み

最新値

以前の記事と同様、指定したレンジキーの最大値を取るようにしただけです。

max(data, key=(lambda x:x[params['range']]))

最大値 / 最小値

max関数とmin関数が使えるようにこんな実装にしています。

max_aggregator.py
max(data, key=(lambda x:x[params['score']]))
min_aggregator.py
min(data, key=lambda x: x[params['score']])

平均値

後述の合計値と件数を使っての計算になります。

sum(map(lambda x: x['score'], data)) / len(map(lambda x: x['score'], data))

合計値

sum関数使ってサクッと計算したかったのでこんな感じです。

sum(map(lambda x: x['score'], data))

件数

対象カラムの長さをとっただけです。

len(map(lambda x: x['score'], data))

AWS Lambdaでも動くのか?

現状のserverless.ymlには権限周りの設定が足りなかったようなのですが、とりあえずデプロイ後に手動で権限を正しくセットしたらうまくいきました。

とりあえずデプロイは何も問題なく完了。

$ sls deploy 
Serverless: Packaging service...
Serverless: Creating Stack...
Serverless: Checking Stack create progress...
.....
Serverless: Stack create finished...
Serverless: Uploading CloudFormation file to S3...
Serverless: Uploading artifacts...
Serverless: Uploading service .zip file to S3 (10.19 KB)...
Serverless: Updating Stack...
Serverless: Checking Stack update progress...
...............
Serverless: Stack update finished...
Service Information
service: lambda-dynamodb-aggregator
stage: dev
region: us-east-1
api keys:
  None
endpoints:
  None
functions:
  run: lambda-dynamodb-aggregator-dev-run

スクリーンショット 2017-05-03 12.20.13.png

動きはしました。(件数を取得しています)

スクリーンショット 2017-05-03 12.38.32.png

続きを読む

AWS LambdaでDynamoDBから取得した値の最新レコードを取得する

DynamoDBに値が入っている前提で、

  • とある時間の
  • あるIDにおける
  • 集計値

を取ろうする際の実装。(AWS Lambda Python 2.7を使っています)

前提

以下の前提とします。

  1. DynamoDBのハッシュキーにID(文字列)、レンジキーに時刻(文字列)を指定
  2. データは時系列に格納されているものとする。
  3. 指定したカラムの集計を取るようにする。
  4. とりあえず指定したIDで全件取得(時間による絞り込みはしない)

こんな感じのデータを想定。

id score timestamp
sensor1 0 2017-04-30T22:00:00.000
sensor1 0 2017-04-30T22:00:01.000
sensor1 1 2017-04-30T22:00:02.000
sensor1 0 2017-04-30T22:00:03.000
sensor2 1 2017-04-30T22:00:04.000

このうちsensor1の最新値を取るための実装をしてみます。結果としてscore:0が取得できるロジックを期待します。

実装

実行環境にはSERVERLESS FRAMEWORKを利用しました。こんな感じの実装でできました。

import boto3
import json
import decimal
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table    = dynamodb.Table('test')

def run(event, context):

    res = table.query(
            KeyConditionExpression=Key('id').eq("sensor1")
        )

    return_response = max(res["Items"], key=(lambda x:x["timestamp"]))

    return json.dumps(return_response, default=decimal_default)

def decimal_default(obj):
    if isinstance(obj, decimal.Decimal):
        return float(obj)
    raise TypeError

まずは最新値を求めるというところで手抜き感満載の実装ですが、Aggregate部分を別クラスにしてしまって少しインテリジェンスに選択できるようにすれば、一気に用途が広がる気がします。

実行結果は以下の通りです。無事取得できてますね。

$ sls invoke local -f run
"{"timestamp": "2017-04-30T22:03:00.000", "score": 0.0, "id": "sensor1"}"

続きを読む

AWS X-RayでLambda→Athenaのアクセスを可視化してみた

以前こんなものを作りましたが、これをAWS X-Rayで可視化してみたら、何がわかるのか、実験してみました。

Amazon AthenaをAWS Lambdaから操作できるようにしてみた

AWS X-Ray デーモンの実行

AWS X-Ray SDK は、AWS X-Ray に Trace データを直接送信しないらしいので、送付用のEC2インスタンスを作成します。ユーザデータとして以下を登録してインスタンスを生成するだけなので、簡単です。

#!/bin/bash
curl https://s3.dualstack.us-east-1.amazonaws.com/aws-xray-assets.us-east-1/xray-daemon/aws-xray-daemon-2.x.rpm -o /home/ec2-user/xray.rpm
yum install -y /home/ec2-user/xray.rpm

システムログにxrayのインストールログが出力されていたのでOKでしょう。

Examining /home/ec2-user/xray.rpm: xray-2.0.0-1.x86_64
Marking /home/ec2-user/xray.rpm to be installed
Resolving Dependencies
--> Running transaction check
---> Package xray.x86_64 0:2.0.0-1 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

================================================================================
 Package         Arch              Version               Repository        Size
================================================================================
Installing:
 xray            x86_64            2.0.0-1               /xray            6.6 M

Transaction Summary
================================================================================
Install  1 Package

Total size: 6.6 M
Installed size: 6.6 M
Downloading packages:
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
  Installing : xray-2.0.0-1.x86_64                                          1/1 
xray start/running, process 2576
  Verifying  : xray-2.0.0-1.x86_64                                          1/1 

Installed:
  xray.x86_64 0:2.0.0-1                                                         

Complete!

Lambdaアプリ側の準備

今回Javaアプリケーションを動かすわけですが、LambdaアプリケーションをX-Rayで監視したい場合は、Lambdaアプリケーションの「設定」タブの中で以下のチェックボックスをONにするだけで良いようです。

スクリーンショット 2017-04-23 22.21.44.png

参考:http://docs.aws.amazon.com/ja_jp/xray/latest/devguide/xray-services.html

またX-Rayを操作するための権限をIAMで設定する必要もあります。今回は試験的な運用だったため「AWSXrayFullAccess」をつけてしまいましたが、実際の運用に合わせてこの辺りは慎重に選びたいですね。

アプリを起動して可視化してみる

ここまでできれば、普通にLambdaアプリを動かしてみてX-Rayでどのように見えるのか確認ができます。今回Lambdaアプリケーションには以下のJSONをインプットとして与えるようにしました。以前の記事でサンプルとしてAthenaのテーブルからデータを取得するようにした際の入力値です。

{
  "region": "us-east-1",
  "s3Path": "s3://ishida-athena-staging-dir/",
  "sql": "SELECT elbname, requestip,  requestport, backendip, backendport, requestprocessingtime, backendprocessingtime, timestamp FROM sampledb.elb_logs order by timestamp desc limit 10",
  "columnListStr": "elbname, requestip,  requestport, backendip, backendport, requestprocessingtime, backendprocessingtime,  timestamp"
}

実行後1分ほど待つと、以下のような表示がX-Rayで確認できました。無事可視化ができたようです。

スクリーンショット 2017-04-23 22.56.40.png

X-Rayの中身を確認してみる

表示されたService Mapの右側のオブジェクトをクリックすると以下のような表示がされました。
スクリーンショット 2017-04-23 22.56.51.png

それぞれの処理にどの程度時間がかかってレスポンスとして何を返しているのかが一覧でわかります。
表示されているIDをクリックすると、そのTraceの詳細が確認できました。

スクリーンショット 2017-04-23 22.56.58.png

これをみる限り、Lambdaアプリの初期化に230ms程度、実際のAthena接続部分に約3秒程度かかっている、という風にみればいいんですかね。この処理全体としては4.6秒かかっているので、実際にAthenaにアクセスするため以外に1.5秒ほどは時間が取られている、と理解すればいいんでしょうか。この辺はもっと勉強が必要だ(^^;

ちなみにエラーが出ている場合は、その例外の中身も確認することができるようです。

まとめ

それぞれの処理がどの程度時間にかかっていて、さらに呼び出し関係までこれほど簡単にセットアップしつつ可視化ができるのは強力ですね。これからMicroservicesなどで分散して処理をさせることが当たり前になることを考えると、必須の技術と言えると思います。Springで言えばZipkinとSleuthをAWS上で実現しているような感じですね。

続きを読む

AtlassianのLocalStackを使ってみてなんとなく理解するまでのお話

Atlassianが「LocalStack」なんてとても便利そうなものを出していたけど、なかなか使い方を解説しているページが見つからなかったので、とりあえず使いながらなんとなく中身を理解するまでのお話。

https://github.com/atlassian/localstack
スクリーンショット 2017-04-23 17.53.59.png

起動

いくつかGithubで利用方法が紹介されていますが、今回はdockerでの利用をしてみます。

$ docker run -it -p 4567-4578:4567-4578 -p 8080:8080 atlassianlabs/localstack
2017-04-23 08:50:15,876 INFO supervisord started with pid 1
2017-04-23 08:50:16,879 INFO spawned: 'dashboard' with pid 7
2017-04-23 08:50:16,885 INFO spawned: 'infra' with pid 8
(. .venv/bin/activate; bin/localstack web --port=8080)
. .venv/bin/activate; exec localstack/mock/infra.py
Starting local dev environment. CTRL-C to quit.
 * Running on http://0.0.0.0:8080/ (Press CTRL+C to quit)
 * Restarting with stat
Starting local Elasticsearch (port 4571)...
Starting mock ES service (port 4578)...
Starting mock S3 server (port 4572)...
Starting mock SNS server (port 4575)...
Starting mock SQS server (port 4576)...
Starting mock API Gateway (port 4567)...
Starting mock DynamoDB (port 4569)...
Starting mock DynamoDB Streams (port 4570)...
Starting mock Firehose (port 4573)...
Starting mock Lambda (port 4574)...
Starting mock Kinesis (port 4568)...
Starting mock Redshift server (port 4577)...
 * Debugger is active!
2017-04-23 08:50:18,537 INFO success: dashboard entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
2017-04-23 08:50:18,538 INFO success: infra entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)
 * Debugger PIN: 844-652-544
Ready.

とりあえず起動はしたみたい。で、http://localhost:8080/にアクセスしてみたけど、こんな感じで何も表示されず。

スクリーンショット 2017-04-23 17.59.04.png

使ってみてわかりましたが、要は本当に各種サービスが以下のアドレスで利用できるようにしてくれたもののようです。

全部試すのもアレなので、とりあえず馴染み深いDynamoDBとS3を使ってみる。

DynamoDBのテーブル作成

全然関係ないですが、http://localhost:4569/にアクセスすると以下のレスポンスをもらえます。デフォルトはus-east-1で動いている想定のようで。(Githubページにも書いてあった。バインドすればいくつか設定を変更できるようですね。)

healthy: dynamodb.us-east-1.amazonaws.com 

では早速テーブルをCLIから作ってみます。一応作成前にlist-tablesをしてみますが、もちろん何も登録されていません。

$ aws --endpoint-url=http://localhost:4569 dynamodb list-tables
{
    "TableNames": []
}

こちらを参考にさせていただいて、create-tableコマンドを発行します。

$ aws --endpoint-url=http://localhost:4569 dynamodb create-table --table-name test --attribute-definitions AttributeName=testId,AttributeType=S --key-schema AttributeName=testId,KeyType=HASH --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=1
{
    "TableDescription": {
        "TableArn": "arn:aws:dynamodb:us-east-1:000000000000:table/test", 
        "AttributeDefinitions": [
            {
                "AttributeName": "testId", 
                "AttributeType": "S"
            }
        ], 
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0, 
            "WriteCapacityUnits": 1, 
            "ReadCapacityUnits": 1
        }, 
        "TableSizeBytes": 0, 
        "TableName": "test", 
        "TableStatus": "CREATING", 
        "KeySchema": [
            {
                "KeyType": "HASH", 
                "AttributeName": "testId"
            }
        ], 
        "ItemCount": 0, 
        "CreationDateTime": 1492937089.534
    }
}

なんか作られたっぽいですね。もう一度list-tablesをしてみます。

$ aws --endpoint-url=http://localhost:4569 dynamodb list-tables
{
    "TableNames": [
        "test"
    ]
}

出来上がってますね。なるほど、本当にAWS上でやる操作をEndpointURLを変更するだけで操作できてしまうようです。これは思っていたよりも便利かも。

S3のバケットを作成してみる

S3のバケットも作成できるのか試してみます。まずバケットのリストを見てみますが、当然何もありません。

$ aws --endpoint-url=http://localhost:4572 s3 ls
(何も表示されず)

では作成してみます。

$ aws --endpoint-url=http://localhost:4572 s3 mb s3://kojiisd-test/
make_bucket: s3://kojiisd-test/
$ aws --endpoint-url=http://localhost:4572 s3 ls
2006-02-04 01:45:09 kojiisd-test

まじか、これは便利。ただdockerでサービス起動したら停止時に中身が消えてしまうから、できれば作成したものが残るような起動方法の方が色々試そうとしたら適していそうですね。
(作成時間がはちゃめちゃですが、とりあえずそこまで問題にはならないかな)

ちなみにDynamoDBのテーブルとS3のバケットを作成してから気づきましたが、http://localhost:8080/にアクセスしたら、作成したものが表示されていました。なるほど、そのためのDashBoardだったのか。素敵。

スクリーンショット 2017-04-23 18.20.02.png

まとめ

どれくらいどこまで何ができるのかは気になりますが、一般的なことであればだいたいローカルでできるような気がします。しかもEndpointURLを変更するだけで良さそうなので、これはかなり便利かも。今度作成したアプリを全てLocalStackで動かしてみるとかやってみようかな。

とりあえず、もっとこれは知られるべきと、思いました。

続きを読む

DynamoDB Local Viewerに機能追加(スキーマ情報取得、ソート・フィルタ、データ削除)

以前作成したDynamoDBのLocal Viewerですが、現状の機能だと色々と不足してきたため更新をかけました。以前の記事はこちら。

DynamoDB Local用のViewerをSpring Bootベースで作ってみた

当時はScanメソッドでとりあえずデータを取ってましたが、今回は以下に対応させました。

  • テーブルの詳細情報確認
  • 指定したテーブルの中身の削除(制限あり)
  • 指定したテーブルの削除
  • テーブルデータのソートと絞り込み

結果はこちらに登録してあります。

https://github.com/kojiisd/dynamodb-local-view

テーブルの詳細情報確認

テーブル名をクリックした際にスキーマ情報を取得できるようにしました。

スクリーンショット 2017-04-21 7.10.54.png

こんな感じのダイアログを出すようにしています。

スクリーンショット 2017-04-21 7.11.21.png

指定したテーブルの中身の削除(制限あり)

個人的には一番欲しかった機能です。ローカルでDynamoDBを使った動作確認をしている際に、いちいちテーブルをドロップしてから作り直すのをスクリプトを組んで実施するのが面倒でした。なので「Clear」をクリックすれば中身をからにしてくれる機能を作りました。

スクリーンショット 2017-04-21 7.12.01.png

以下は削除後のスキーマ情報です。tableSizeBytesやitemCountが0になっているのがわかります。
ただいくつか制限があり、以下の実装のようにいくつかテーブル情報を引き継いでいません。この辺りうまい方法を知っている人がいればぜひ知りたいです。

CreateTableRequest createRequest = new CreateTableRequest().withTableName(tableName)
        .withAttributeDefinitions(describeResult.getTable().getAttributeDefinitions())
        .withKeySchema(describeResult.getTable().getKeySchema())
        .withProvisionedThroughput(new ProvisionedThroughput()
                .withReadCapacityUnits(describeResult.getTable().getProvisionedThroughput().getReadCapacityUnits())
                .withWriteCapacityUnits(describeResult.getTable().getProvisionedThroughput().getWriteCapacityUnits()));

スクリーンショット 2017-04-21 7.11.49.png

どんな仕組みにしたかはこちらに書きました。

Local DynamoDBのテーブルデータを削除したいときの実装(テーブル削除→作成)

指定したテーブルの削除

スクリーンショット 2017-04-21 7.13.16.png

スクリーンショット 2017-04-21 7.13.40.png

テーブルデータのソートと絞り込み

Scanのページには、各ヘッダで並び替えができるように、また検索結果にフィルタをかけられるように簡易のフィルタ機能をつけました。

スクリーンショット 2017-04-21 9.06.23.png

まとめ

そろそろQuery機能もつけて、実際のAWS ConsoleでのDynamoDBのユーザビリティを再現したいと思います。

続きを読む