AmazonAthenaをCLIから使う

よーやくCLIから使えるようになりました。
早速試しましょう

前提条件

  • S3にQuery用のダミーデータが入っているものとします
  • S3にAthenaの実行結果保存用ディレクトリが作成されているものとします
  • Athenaは us-east-1 を使用して実行しています

実行環境

コマンド
aws --version
結果
aws-cli/1.11.89 Python/3.5.0 Darwin/16.5.0 botocore/1.5.52

手元にコマンドがない場合はアップデートしましょう

アップデート
sudo pip install -U awscli --ignore-installed six
  • 筆者の環境ではDatapipelinを使用してDynamoDBのあるテーブルデータをS3にバックアップしたものをサンプルデータとして使用しています。
  • DATABASEに関しては以前Javaからの接続の際検証で作成した dynamodb という名前のDATABASEが存在します。
  • テーブルとして、DynamoDBのJSONデータに対応するように作成した dynamodb.sample1 を使用します。

Queryの実行

コマンド
aws athena start-query-execution 
    --query-string 'select count(*) from dynamodb.sample1;' 
    --result-configuration Database=dynamodb 
    --result-configuration OutputLocation=s3://xxxxxxxxxxxxxxxxxxxxxxxxxxx/cli-test/
{
    "QueryExecutionId": "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
}

結果の取得

コマンド
aws athena get-query-results 
    --query-execution-id "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
結果
{
    "ResultSet": {
        "ResultSetMetadata": {
            "ColumnInfo": [
                {
                    "CatalogName": "hive",
                    "Name": "_col0",
                    "Scale": 0,
                    "SchemaName": "",
                    "Precision": 19,
                    "Type": "bigint",
                    "CaseSensitive": false,
                    "Nullable": "UNKNOWN",
                    "Label": "_col0",
                    "TableName": ""
                }
            ]
        },
        "Rows": [
            {
                "Data": [
                    {
                        "VarCharValue": "_col0"
                    }
                ]
            },
            {
                "Data": [
                    {
                        "VarCharValue": "500000"
                    }
                ]
            }
        ]
    }
}

実行したQueryの詳細を取得する

コマンド
aws athena get-query-execution 
    --query-execution-id  "xxxxxxxxxxxxxxxxxxxxxxxxxxx"
結果
{
    "QueryExecution": {
        "Status": {
            "SubmissionDateTime": 1495440535.706,
            "State": "SUCCEEDED",
            "CompletionDateTime": 1495440549.995
        },
        "QueryExecutionId": "xxxxxxxxxxxxxxxxxxxxxxxxxxx",
        "Statistics": {
            "DataScannedInBytes": 71512882,
            "EngineExecutionTimeInMillis": 13930
        },
        "Query": "select count(*) from dynamodb.sample1",
        "ResultConfiguration": {
            "OutputLocation": "s3://xxxxxxxxxxxxxxxxxxxxxxxxxxx/cli-test/xxxxxxxxxxxxxxxxxxxxxxxxxxx.csv"
        }
    }
}

格納先のオブジェクトパスが取得できます。

これでAthenaを簡単に使える!東京リージョンまだだけど、S3を介してやり取り出来るのでまあまあ使えるんじゃないかという所感。

続きを読む

DynamoDB は空文字を登録できない

DynamoDB は空文字を登録できないため、空文字チェックしてエラー回避する実装にする必要があります。

が、JSONとかの関係でどうしても登録したい場合は、以下のように単純にスペースで登録するとよいかもしれません。

ちなみに Lambda – Node.js 6.10 での例です。

'use strict';

const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient();

exports.handler = (event, context, callback) => {

    // 空文字チェック
    // ※DynamoDBの仕様で空文字を登録できないため、半角スペースで登録
    let resourcesPara = ' ';
    if (event.resources !== undefined && event.resources.length > 0) {
        resourcesPara = event.resources;
    }

    const params = {
        TableName: "tableName",
        Item: {
            "id": "newId",
            "resources": resourcesPara
        }
    };

    docClient.put(params, function(err, data) {
        if (err) {
            console.error('Unable to add item. Error JSON:', JSON.stringify(err, null, 2));
            // エラー
            callback(null, err);
        } else {
            console.log('Added item:', JSON.stringify(data, null, 2));
            callback(null, {"id": "newId"});
        }
    });
};

取得するときも単純に半角スペースを空文字にして返却。

'use strict';

const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient();

exports.handler = (event, context, callback) => {

    const params = {
        TableName: "tableName"
    };

    docClient.scan(params, function(err, data) {
        if (err) {
            console.error('Unable to get item. Error JSON:', JSON.stringify(err, null, 2));
            // エラー
            callback(null, err);
        } else {
            if (data.Count > 0) {
                // データあり
                for (let i = 0; i < data.Count; i++) {
                    // スペースのみの場合空文字にする
                    // ※DynamoDBの仕様で空文字を登録できないため、半角スペースで登録している
                    if (data.Items[i].resources === ' ') {
                        data.Items[i].resources = '';
                    }
                }
                callback(null, data.Items);
            } else {
                // データなし
                callback(null, '404 Not Found. Data is nothing.');
            }
        }
    });
};

続きを読む

AWS IoTのルールエンジンとシャドウ

AWS IoTに関する基本的な内容をまとめてみたものです。AWS IoTに関する、Web上にすでにある解説コンテンツをまとめたサイトの抜粋です。
ルールエンジンとシャドウ

ルールエンジンによりデバイスのデータに基づいたアクションを設定

AWS IoTでは、ルールエンジンによって、接続されたデバイスによって生成されるデータを収集し、処理し、分析し、データに基づいたアクションを実行するアプリケーションを構築します。

ルールエンジンで直感的にルールを設定し、SQL に似たシンタックスでインバウンドデータを自動的にフィルタおよび変換できます。AWS IoT サービスから他のいくつかの AWS のサービスまたはお客様がお使いのサードパーティサービスへルーティングするルールを設定できます。

例)
• 受信メッセージのフィルタリングおよび変換、DynamoDB へ時系列データとして保存。
• センサーからのデータが特定のしきい値を超えた時、SNS を経由してプッシュ通知を送信。
• ファームウェアファイルを S3 に保存。
• Kinesis を使用して複数のデバイスからのメッセージを同時に処理。
• 受信データのカスタム処理を行うため Lambda を呼び出し。
• 自動再発行して、デバイスのグループへコマンドを送信。

ルールはSQLライクな構文にて指定

AWS IoT のルールは 2 つの主要な部分で構成されます。

SQLステートメント:
SQL ステートメントでは、ルールを適用するトピック、必要に応じて実行するデータ変換、およびルールを実行する際の条件を指定します。ルールは指定されたトピックに発行されたすべてのメッセージに適用されます。

アクションリスト:
受信メッセージがSQLステートメントで定義された条件と一致した時、アクションリストで定義されたアクションが実行されます。

ルールの定義は JSON ベースのスキーマを使用しています。直接 JSON を編集、または AWS マネジメントコンソールのルールエディタを使用できます。

デバイスシャドウによるデバイスの状態管理の仕組み

デバイス シャドウは、デバイスの現在のステータス、アプリケーションから要求されたステータスを管理するJSONドキュメントであり、デバイスシャドウには、デバイスがオフライン状態のときでも、各デバイスについて最後に報告された状態と、希望する今後の状態が保持されます。最後に報告された時点の状態の取得や、希望する今後の状態の設定は、API またはルールエンジンによって実行されます。

デバイスシャドウでは、REST API が常時利用できるため、デバイスと協働するアプリケーションの構築が容易になります。さらに、アプリケーションではデバイスの現在の状態を取得することなく、希望する今後の状態を設定できるようになります。希望する状態と最後に報告された時点の状態との相違は AWS IoT によって比較され、相違を補うようデバイスに対してコマンドが送られます。

デバイスのシャドウには、デバイスの状態を最大 1 年間無料で保存できます。最低 1 年間に 1 度更新していれば、デバイスのシャドウは無期限に継続できます。更新しなかった場合は消去されます。

続きを読む

AWS IoTとは

AWS IoTに関する基本的な内容をまとめてみたものです。AWS IoTに関する、Web上にすでにある解説コンテンツをまとめたサイトの抜粋です。
AWS IoTとは

AWS IoTとは

AWS IoSサービスにより、さまざまなデバイスを AWS の各種 Services や他のデバイスに接続し、データと通信を保護し、デバイスデータに対する処理やアクションを実行することが可能になります。

AWS IoTデバイスSDK
AWS IoT には、ハードウェアデバイスやモバイルアプリケーションを簡単に、すばやく接続できるようサポートする SDK が準備されています。AWS IoT デバイス SDK を使用すれば、AWS IoT との間で MQTT、HTTP、または WebSockets プロトコルを介した接続、認証、メッセージ交換が可能になります。このデバイス SDK では C、JavaScript および Arduino がサポートされており、クライアントライブラリ、開発者ガイドおよびメーカー向けの移植ガイドが付属しています。

認証と認可
AWS IoT では、接続するすべてのポイントでの相互認証と暗号化が提供されており、デバイスと AWS IoT 間では身元が証明されたデータのみが交換されます。

コミュニケーションプロトコル
AWS IoT とデバイスの間では MQTT、HTTP、または WebSockets プロトコルを介した接続、認証、メッセージ交換が可能です。MQTTは、M2M/IoTでよく使用されるOASISスタンダードのプロトコル。ライトウェイトでリソースや回線帯域が限られているデバイスでよく利用されます。MQTTはhttpsと比較して、スループットで93倍、メッセージ送信の消費電力で1/12、メッセージ受信の消費電力で1/180となります。

ルールエンジンによって、AWS Lambda、Amazon Kinesis、Amazon S3、Amazon Machine Learning、Amazon DynamoDB、Amazon CloudWatch、および Amazon Elasticsearch Service (組み込みの Kibana と統合されている) などの AWS エンドポイントへのメッセージのルーティングも行えます。

AWS IoTのコンポーネント

デバイスゲートウェイ
接続されたデバイスから、指定されたトピックについて複数の受信者にデータをブロードキャストすることができます。デバイスゲートウェイでは MQTT、WebSocket、および HTTP 1.1 プロトコルがサポートされており、専用プロトコルやレガシープロトコルのサポート実装も容易に行えます。デバイスゲートウェイは、インフラストラクチャのプロビジョニングが不要でありながら、10 億台以上のデバイスにも対応できるよう自動的にスケールされます。

レジストリ
レジストリによって、デバイスの ID が確定され、デバイスの属性や機能といったメタデータが追跡されます。各デバイスには、デバイスのタイプや接続方法にかかわらず、一貫した形式の一意の ID がレジストリによって割り当てられます。

シャドウ
AWS IoT では、それぞれのデバイスについて「シャドウ」を作成できます。シャドウにはデバイスの最新の状態が保存されるため、アプリケーションや他のデバイスからのメッセージの読み出し、およびデバイスとの通信が実行できます。デバイスのシャドウには、デバイスがオフライン状態のときでも、各デバイスについて最後に報告された状態と、希望する今後の状態が保持されます。最後に報告された時点の状態の取得や、希望する今後の状態の設定は、API またはルールエンジンによって実行できます。

ルールエンジン
ルールエンジンによって、接続されたデバイスによって生成されるデータを収集し、処理し、分析し、データに基づいたアクションを実行するアプリケーションを構築することが可能になります。ルールエンジンでは、お客様が定義したビジネスルールに基づいて、AWS IoT に向けて発行された入力メッセージが評価、変換され、別のデバイスやクラウドサービスへと配信されます。1 つのデバイスからのデータにも、多数のデバイスからのデータにも同じルールを適用でき、アクションを単独で実行することも、多数のアクションを並行して実行することも可能です。

ルールエンジンによって、AWS Lambda、Amazon Kinesis、Amazon S3、Amazon Machine Learning、Amazon DynamoDB、などの AWS エンドポイントへのメッセージのルーティングも行えます。AWS Lambda、Amazon Kinesis および Amazon Simple Notification Service (SNS) を使用して外部のエンドポイントに届けることも可能です。

KinesisとIoTの違い

Kinesisを使ってIoTのシステムを実現している例もありますが、IoTサービスはより包括的にデバイスとクラウドを接続するためのサービスを用意していますので、まずはデバイスとの入り口をIoTサービスで対応して、Kinesisサービスにつなぐという構成が、AWSでのIoTシステムの構成かと思います。
具体的なKinesisとIoTの違いとして一番明確なのは、デバイスとの間のコミュニケーションプロトコルです。IoTはMQQTに対応していますが、Kinesisはhttpsのみとなります。IoTはシャドウのような仕組みも持っており、IoTシステムを構築するためのトータルなシステムを組み上げる仕組みを包括的に用意しています。

デバイスデータに対するさまざまな処理を簡単に実現する仕組み

AWS IoTでは、「ルールエンジン」に定義したビジネスルールに基づいて、デバイスデータを迅速にフィルタリング、変換、活用することができます。各種処理は、AWS Lambda、Amazon Kinesis、Amazon S3、Amazon Machine Learning、Amazon DynamoDB、Amazon CloudWatch、および Amazon Elasticsearch Service といった AWS の各種サービスを呼び出すことにより実行させます。

各種AWSのサービスには、「ルールエンジン」がAWS IoTにPublishされたメッセージを評価し、ルールに基づいて配信されます。

また、Amazon Kinesis、AWS Lambda、Amazon S3などのサービスを経由して、外部のエンドポイントを呼び出すことも可能です。

(例) センサデータをKinesisを経由してエンタプライズアプリケーションへ

・各種デバイスからのセンサデータをIoTにて受信
・ルールエンジンにて、受信したセンサデータへのフィルタリングおよびデータ変換を実施
・AWS Kinesisにストリームデータとしてパブリッシュ
・Kinesisで収集されたデータをデータベースやBIなどの分析アプリケーションに転送

例) デバイスデータをIoTからAmazon SNSを経由してスマホにプッシュ通知

・各種デバイスからのデータやイベント通知をIoTで受信
・スマホに通知すべき条件判定をIoTのルールエンジンにて実行
・通知条件に合致する場合には、Amazon SNSを経由して、アップル社APNS、Google社のGCMに
・配信リクエストを送信することにより、スマホへのプッシュ通知として、デバイスのデータやイベントの通知を実現

続きを読む

EMRの基本的な利用手順

Hadoopに関する基本的な内容をまとめてみたものです。Hadoopに関する、Web上にすでにある解説コンテンツをまとめたサイトの抜粋です。
各種BIツールが使えるAmzon EMR

Amazon EMRとは

Amazon EMRとは、AWS上でオープンソース型フレームワーク Hadoopが動作出来る環境を提供するものです。

一般的にはHadoopを使用するためには複数のサーバーを用意する必要がありますが、Amazon EMRを使えば新たにサーバーを購入したりシステムの構成を変更する必要はありません。
Amazon EMRがHadoopを常時使用可能な環境を構築しますので、容易にデプロイする事が可能となります。

Amazon EMRの特徴

Amazon EMRには、以下のような特徴があります。

・Amazon EMR でHadoopクラスターを実行した時の仮想サーバー数の増減が簡単に出来ます
・ハードウェア等のメンテナンス料金を支払う必要はなく、Amazon EMRでクラスターを使用した分だけの料金を払うだけです
・Hive、Pig、HBase などのHadoop アプリケーションの使用が可能
・Amazon EC2、Amazon S3、DynamoDB、Amazon RDSなどのAWSとAmazon EMRを統合する事が出来ます
・Microsoft Excel、MicroStrategy、QlikView、TableauなどのBIツールを使って、データの分析を行う事が出来ます

EMRの基本的な利用手順

EMRの基本的な利用手順は、次のようなものです。
EMRの入門者でも、比較的容易に操作する事が出来ます。

入出力データ・データ格納領域をS3等に用意
Amazon S3 を使って、Amazon EMRに入出力データやログファイルなどを格納出来ます。
S3コンソールを開きパケット名とデータのあるパス等を指定する事で、S3パケットを作成します。

クラスターの起動
Amazon EMRコンソールを開き、ソフトウェア・ファイルシステム・ハードウェア等の設定を行ってから、クラスターを作成します。

Hiveスクリプトを実行する
Amazon EMRコンソールを使って、Hiveスクリプトを実行します。
Hiveスクリプトをステップとして送信する事で、出力を確認する事が出来ます。
S3コンソールを開いて、出力したパケットのフォルダ内の出力ファイルで確認します。

Hue を使ってクエリを送信する
Hadoop用オープンソースウェブユーザーインターフェイスであるHueにログインして、クエリを送信します。
Hueを使う事で簡単にクエリを送信したり、スクリプトが作成出来るようになります。

EMRを使用した後の処理

EMRを使用した後は追加料金が発生しないように、不要なリソースは削除しなくてはなりません。

Amazon S3のバケットの削除
Amazon S3 コンソールを使えば、選択したオブジェクトを削除出来ます。

Amazon EMRクラスターの終了
Amazon EMRクラスターを終了するには、Amazon EMRコンソールを開いてCluster Listページで終了したいクラスターのチェックボックスをオンにして、Terminateを選択します。

続きを読む

VPCのネットワークまわりをもう一度

VPCまわりって、ややこしいよね


  • VPC
  • サブネット
  • IGW・VGW
  • NATインスタンス・ゲートウェイ
  • セキュリティグループ
  • ネットワークACL
  • VPCピア接続

😂


今日はこのへん話します

  • VPC
  • サブネット
  • IGW
  • NATインスタンス

ひとつずつ、思い出してみよう
(知らない人はこれを機にそんなもんがあるのかと思ってみよう)


VPC(Virtual Private Cloud)


VPCとは

利用者ごとのプライベートなネットワーク空間のこと

クラスA〜Cのプライベートネットワークのいずれかの値を使用します


Q. aとbどちらが正しいでしょう

a. VPCはリージョン内でAZ(アベイラビリティゾーン)をまたげる
b. VPCはリージョン内でAZ(アベイラビリティゾーン)をまたげない


答え

a. VPCはリージョン内でAZ(アベイラビリティゾーン)をまたげる


AWS Design Simple AZ:Untitled - Cacoo 2017-05-17 11-07-07.png


サブネット


サブネットとは

大きなネットワークの中の小さなネットワーク

役割に応じてWeb用、DB用と作られる。これによって、インターネットからのアクセス制限をかけられたりする


Q. aとbどちらが正しいでしょう

a. サブネットはリージョン内でAZ(アベイラビリティゾーン)をまたげる
b. サブネットはリージョン内でAZ(アベイラビリティゾーン)をまたげない


答え

b. サブネットはリージョン内でAZ(アベイラビリティゾーン)をまたげない


AWS Design Simple subnet:Untitled - Cacoo 2017-05-17 11-12-13.png


IGW(Internet Gate way)


IGWとは

VPCと外部ネットワークの間で通信を行うための出入り口。こいつをVPCにアタッチしてインターをネットする


サブネットでネット制限

  • インターネットとのアクセスを許可するサブネット: パブリックサブネット
  • インターネットとのアクセスを許可しないサブネット: プライベートサブネット

ルートテーブルの設定によってパブリック/プライベートが決まる。
ルートテーブルでIGWをターゲットに設定するとパブリックになる。


AWS Design Simple:Untitled - Cacoo 2017-05-17 11-15-36.png


でもこのままだとプライベートサブネット孤立問題が起こる

  • パッチなどをダウンロードしたいときインターネットにアクセスできない
  • DynamoDBなど他のリージョンサービスにアクセスできない

そこで

NAT(Network Address Translation)

が登場します


NATを使うことで、インターネットからのアクセスを受け付けないまま、インターネットやDynamoDBなどのリージョンサービスにアクセス可能に


nat-gateway-diagram.png


ルーターとは

サブネットを相互接続し、インターネットゲートウェイ、仮想プライベートゲートウェイ、NAT ゲートウェイ、およびサブネットの間でトラフィックを正しい宛先に送る機能。


ありがとうございました

続きを読む

Kinesis Stream, Lambda, DynamoDB, S3 で Stream ベース実装に使える npm モジュール

AWS のマネージドサービスを連携する Lambda や サービスを Node.js の Stream を使って作ることが多いため、利用している自作モジュールについて説明します。

kinesis-stream-lambda

https://github.com/tilfin/kinesis-stream-lambda

  • Kinesis Stream のイベントソースの Record をパースして data フィールドを Base64 から
    Buffer にしてくれる ReadableStream (KPL の aggregation にも対応可能)
  • その Buffer を JSON としてパースし、JavaScript Plain Object のデータに Transform するストリーム

それぞれを提供します。要するに Lambda の event をコンストラクタ渡して2つ pipe で繋げると中身の JSON が Object で Stream 処理できる代物です。

const es = require('event-stream');
const KSL = require('kinesis-stream-lambda');

exports.handler = function(event, context, callback) {
  console.log('event: ', JSON.stringify(event, null, 2));

  const result = [];
  const stream = KSL.reader(event, { isAgg: false });

  stream.on('end', function() {
    console.dir(result);
    callback(null, null);
  });

  stream.on('error', function(err) {
    callback(err);
  });

  stream
  .pipe(KSL.parseJSON({ expandArray: false }))
  .pipe(es.map(function(data, callback) {
    result.push(data);
    callback(null, data)
  }));
}

s3-block-read-stream

https://github.com/tilfin/s3-block-read-stream

S3 からファイルの中身を Range で取得しつつ Stream で処理できます。通常の Stream だと後方のストリームが流量が少ない場合に S3 の HTTP レスポンス処理が長時間になって TCP 接続が双方で Write/ReadTimeout しまう懸念があります。但しブロックサイズが細かいと API のコール回数が増えるのでそこは注意が必要です(APIのコール回数は従量課金対象です)。

paced-work-stream

https://github.com/tilfin/paced-work-stream

指定した並列数かつ一定間隔で処理できるワーカーのような Transform ベースの Stream を提供します。
主に DynamoDB への IO 処理や毎秒の呼び出し回数に制限がある API を呼び出すときに使えます。
fast-paced work にするのか slow-paced work にするかはコンストラクタの第1引数の concurrencyworkMS で調節します。

実際の処理はコンストラクタの第2引数にもしくは、サブクラスに _workPromise メソッド を定義します。この関数は処理内容を Promise で定義してその Promise を返す Function にします(これは Promise は定義した瞬間から実行が始まるためです)。なお Array<Function> として返すことも可能で、その場合でもそれを解釈して同時実行数を調節します。あと、処理数を tag でカウントできる機能も付いています。

dynamo-processor

https://github.com/tilfin/dynamo-processor

DynamoDB への操作を JSON で定義して食わせると CRUD 処理が簡単に実行できるプロセッサモジュールです。
paced-work-stream と組ませて RCU/WCU を意識した汎用的な処理を実装できます。

class DynamoWorkStream extends PacedWorkStream {
  constructor() {
    super({
      concurrency: 10,
      workMS: 80
    });
  }

  _workPromise(data) {
    return dp.proc(data, {
               table: data.table,
               useBatch: false
             });
  }
}

module.exports = DynamoWorkStream;

promised-lifestream

https://github.com/tilfin/promised-lifestream

AWSに限らずですが、複数の stream を pipe で繋げていくと、いずれかの stream で起きたエラーを補足するためには、それぞれの stream に .on('error', <function>) を定義する必要があります。またこれを Promise 化してすっきりさせたい。そのための関数を提供するモジュールです。needResult: true でオプション指定すると .then(result => ...) と最後の結果を受け取ることも可能です。

const PromisedLifestream = require('promised-lifestream');

exports.handle = function(event, context, callback) {
  const workStream = new DynamoWorkStream();
  PromisedLifestream([
    KSL.reader(event, { isAgg: false }),
    KSL.parseJSON({ expandArray: true }),
    workStream
  ])
  .then(() => {
    callback(null, null);
  })
  .catch(err => {
    callback(err); // 3つの stream どこかでエラーが起きてもここでキャッチできる
  });
}

補足

実装例と組み合わせるべきモジュール

promised-lifestream は全体的に使えます。

その他便利な Stream モジュール

続きを読む