DynamoDBと「強力な整合性のある読み込み」とCAP定理のこと

  • DynamoDBの参照はデフォルトが結果整合性なのでCAP定理のAP型(一貫性を犠牲にして可用性をとった設計)と紹介されていた。では「強力な整合性のある読み込み」オプションをどう考えたら良いのか?
  • AP型のDynamoDBで、強力な・・・を選べば(+Cで)CAP定理の限界を超えた!な訳はないはず。

考えるヒント1

考えるヒント2

考えるヒント3

現時点のアイデア

  • 「強力な整合性」は、分散ノードの値を複数みて比較しないといけないため、値を参照するまでの時間が伸びているという意味で「可用性が落ちている(CP型)」と考える。
  • ネットワークの問題がないときにCA型、ネットワークの問題があるときにAP型になる、と考える。

別の方向から考える

  • MySQLでレプリケーション遅延が起きれば、一貫性が損なわれるのでAP型になっている。(スレーブはCを捨ててAを実現している)
  • 思考実験1)マスターとスレーブの値をすべて取得して、一致したときだけ値を返すようにラップすれば、CP型になる?(一致するまでは結果を返せないのでAを捨てている)
  • 思考実験2)遅延したスレーブは切り捨てて、マスターと遅延していないスレーブだけが値を返すようにラップすれば、CA型になる?(ネットワークの問題により故障していないノードが使えなくなるのでPを捨てている)

その他

続きを読む

[モバイルアーキテクチャ ] Cognitoにどこまで任せるべきか?研究してみた

CognitoとはAWSサービスの一つであり、主にモバイルアプリの認証システムとして利用されている。
Cognitoには主に3つの機能があり、それぞれ以下のように自分は解釈している
(ツッコミをいただくために後悔してるので、間違ってたら是非教えてください)

Cognitoのサービス達

Cognito UserPool

概要

emailの確認、SMSによる確認、パスワードの再発行などもやってくれるメールによる認証機能。

メリット

  • ステップが複雑なメール認証機能を実装する手間がない
  • ユーザーの認証ステップの保持、Email, Passwordの暗号化など、面倒なデータを保持せずに済む(AWS管理コンソールにおいて、ユーザーの権限などいじる必要はあるけど)

デメリット

  • 既存のメール認証システムから移行する場合、name, email, phone numberなどはcsvでimportできるが、passwordだけはimportできないので、ユーザーにpasswordの再設定を依頼する必要がある。そのため、移行コストは高い
  • データを自分たちで保持していないため、トラブルシューティングの工数が増す危険がある

Cognito IdentityPool

概要

メール、Google, Facebookなどで認証されたユーザーに対して、特定のAWSサービスへのアクセスを許可する(=認可)の機能。
未認証ユーザーも取り扱い可能で、未認証ユーザーは認証ユーザーと別のrole(権限)を与えることができる

メリット

  • アクセストークンに関する実装をする必要がない
  • サーバーレスアーキテクチャーに移行しやすい(もとからサーバーレスで組むつもりならほぼ必須要件)

デメリット

  • API Gateway経由で、APIを公開する際に、Lambdaならuser情報を取得できるが、HTTPプロキシでELBやEC2を繋いだときは、ユーザー情報が取得できない(?)
  • tokenの受け渡しだけで、未認証ユーザーの照会をすることができない?(idをパラメータに含めないといけない?)

Cognito Sync

概要

Cognito IdentityPoolに存在するユーザーのユーザー設定を保持・同期できるサービス。認証認可とは関係ない。

メリット

機種変時、マルチデバイス対応時に、設定項目を同期するのに便利。

デメリット

今の所見つかってはいないが、iOSでいうUserDefaultに入れるような情報の保持、復元など用途が限定的なので、使わなくていいアプリケーションもたくさんありそう

考察

UserPool

開発コスト

[使わない場合]
– 単純なemail認証であったとしても、rubyだとdeviseなど便利なgemがあるものの、センシティブなデータを扱うために確認工数なども膨れる可能性が高い
– SMS認証も必須だとすると、それをサポートしてるライブラリも少ないため開発工数は爆発的に膨らみそう

[使った場合]
– ライブラリの利用方法を覚える必要があるものの、Mobile Hubなどを利用することでそのコストを格段に下げることができる
– SMS周りは特にハマるらしい。http://qiita.com/aki/items/e35e1bea8c27cfab5e9e
– アクセストークンの検証も忘れがち。http://qiita.com/devalon/items/721ef4bdec80e1e6847c, http://qiita.com/ya-mada/items/154ea6e10f9f788bfdd5

運用コスト

[使わない場合]
– いつでもデータにアクセス可能なので、ユーザー固有の問題に対するトラブルシューティングが比較的容易
– 認証の度にDBへのアクセスが走るためシステムリソースが喰われる
– センシティブなデータにいつでもアクセス可能なため、ルール作りや社内でのアクセス制御が面倒
– UserPoolを使いたくなった時に、ユーザーにpasswordの再設定を依頼する必要がある

[使った場合]
– データを自分たちで保持していないため、トラブルシューティングに時間がかかることもありそう
– AWSのユーザー権限で、アクセス制御が可能なので、そこは簡単
– AWS Cognitoが落ちたら、機能の大半が使えなくなるというリスクを孕んでいる
– export機能はないので、AWS Cognitoから独自の認証機能に移行する場合は、APIを叩いてデータをexportした上で、ユーザーにpasswordの再設定を依頼する必要がある

所感

まだ認証システムを独自で持ってないなら、使うと大幅に開発コストが削減できそう

Cognito IdentityPool

開発コスト

[使わない場合]
– Facebook, Googleなどのuidに相当する情報を保持する必要がある
– クライアントに一時的なアクセストークンを発行する必要がある
– アクセストークンの検証機構を作る必要がある

[使った場合]
– uidの保持をしてくれる
– アクセストークンの発行、検証をawsがよしなにしてくれるが、ec2, elbでそれをやろうとすると、前段にAPI Gatewayをhttp proxyとして置く必要がある
– API Gateway + Lambda ならユーザーの照会までしてくれるが、ec2, elbだとそれができない?
– elb, ec2との相性はわるそうで、未認証ユーザーの照会ができない?
– 上記の理由で構成が複雑化する

運用コスト

[使わない場合]
– uidなどの情報へのアクセス制御をする必要がある
– アクセストークン発行に際して、システムリソースを食う
– 保守しないといけないコードが増える
– サーバーレスアーキテクチャにする際にIdentityPoolに乗り換えるか、STSへのリクエストを独自で頑張るかする必要がある。1ユーザーに認証情報を複数持てるようにしておけば、IdentityPoolのidentityIdをuidとして紐付けできるので、移行コストはそこまで高くないと思う(?)

[使った場合]
– 一部システムを移譲できるものの、構成が複雑になり、全員がクラウドネイティブな考えがない限り、運用が大変になりそう
– クラウドネイティブ的な構成になると、デバッグが大変そう(慣れてないだけ?)

所感

使うことによって、構成が限定されるため初期から使う必要はなさそう。
サービスが大きくなってきて、クラウドを存分に使いたくなったら検討したい

疑問

誰か、もし知ってたらおしえてください

  • API Gateway経由で、APIを公開する際に、Lambdaならuser情報を取得できるが、HTTPプロキシでELBやEC2を繋いだときは、ユーザー情報が取得できないっぽいんですが、何か方法ないですかね?(API Gatewayにおいて、 Integration Type = AWS Service というのがあるけど、これでどうにかできないものか…)

続きを読む

AWS Price List API で雑に月額を出してみる

cloudpack あら便利カレンダー 2017 の20日目です。

まえがき

AWS Price List API を使うと AWS 各種サービスの使用料金を巨大な JSON の形式で取得できます。

例えば EC2 のインスタンス使用料金を知りたい場合、 https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/current/region_index.json を取得し、そこに含まれる目的のリージョンのオファーファイルを取得します。

$ curl -I https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/AmazonEC2/20170605233259/ap-northeast-1/index.json
HTTP/1.1 200 OK
Content-Type: application/octet-stream
Content-Length: 10293063
Connection: keep-alive
Date: Mon, 19 Jun 2017 04:27:13 GMT
Last-Modified: Tue, 06 Jun 2017 00:55:29 GMT
ETag: "35bdcd83d7f7863f5f26550206b0e624"
Accept-Ranges: bytes
Server: AmazonS3
Age: 24112
X-Cache: Hit from cloudfront
Via: 1.1 41f313008af830d498dcb13814523bd7.cloudfront.net (CloudFront)
X-Amz-Cf-Id: 1oUuTPzPmELzB65fwzB1ds1ezCEpBn8ZWVANIJT-rURfG2p8buOFMg==

上記は東京リージョンの EC2 の例ですが、 Content-Length: 10293063 というどう見ても人間が素で読む量ではないし、テキストエディターに読み込むのも若干ためらわれるサイズです。

公式のドキュメントによると…

JSON ファイルを使用して EC2 のリザーブドインスタンスを確認するには

  1. JSON ファイルをからダウンロードします。
  2. 適切なプログラムで JSON ファイルを開きます。この例では、Notepad++ を使用します。
  3. Ctrl キーを押しながら F キーを押します。
  4. [Find what:] に [reserved] と入力します。
  5. [Find All in Current Document] を選択します。

とあんまりなので、もう少し楽に価格を調べられる雑スクリプトを作りました。

スクリプト本体

ec2ZatsuPrice.js
const region          = process.env["AWS_REGION"]       || "us-east-1";
const instanceType    = process.env["INSTANCE_TYPE"]    || "c4.large";
const tenancy         = process.env["TENANCY"]          || "Shared";
const operatingSystem = process.env["OPERATING_SYSTEM"] || "Linux";
const offerType       = process.env["OFFER_TYPE"]       || "OnDemand";

// ↑フィルター条件をenvで指定してね

const axios = require("axios");

const api = axios.create({
  baseURL: "https://pricing.us-east-1.amazonaws.com"
});

Promise.resolve().then(() => {
  return api.get("/offers/v1.0/aws/AmazonEC2/current/region_index.json")
}).then(regionIndexResponse => {
  const currentVersionUrl = regionIndexResponse.data.regions[region].currentVersionUrl;

  return api.get(currentVersionUrl);
}).then(currentVersionResponse => {
  const filteredProducts = [];

  for (productCode in currentVersionResponse.data.products) {
    const product = currentVersionResponse.data.products[productCode];

    if (product.attributes.instanceType !== instanceType) continue;
    if (product.attributes.tenancy !== tenancy) continue;
    if (product.attributes.operatingSystem !== operatingSystem) continue;

    filteredProducts.push(product);
  }

  if (filteredProducts.length > 1) {
    console.log(`絞りきれなかったかも… (${filteredProducts.length} products)`);
    process.exit(1);
  };

  const offers = currentVersionResponse.data.terms[offerType][filteredProducts[0].sku];

  if (Object.keys(offers).length > 1) {
    console.log(`絞りきれなかったかも… (${Object.keys(offers).length} offers)`);
    process.exit(1);
  };

  const offer = offers[Object.keys(offers)[0]];
  const priceDimension = offer.priceDimensions[Object.keys(offer.priceDimensions)[0]];
  const pricePerUnit = parseFloat(priceDimension.pricePerUnit.USD);
  const monthlyPrice = pricePerUnit * 24 * 30;

  console.log(JSON.stringify({
    region,
    instanceType,
    tenancy,
    operatingSystem,
    monthlyPrice,

    description: priceDimension.description,
  }, undefined, 2))
}).catch(e => console.log(e));

あまり古くない Node.js と axios が必要です。

使い方

叩くだけです。

$ node ec2ZatsuPrice.js
{
  "region": "us-east-1",
  "instanceType": "c4.large",
  "tenancy": "Shared",
  "operatingSystem": "Linux",
  "monthlyPrice": 72.00000000000001,
  "description": "$0.1 per On Demand Linux c4.large Instance Hour"
}

条件を変えたいときは環境変数で。

$ AWS_REGION=us-west-2 INSTANCE_TYPE=r3.xlarge node ec2ZatsuPrice.js
{
  "region": "us-west-2",
  "instanceType": "r3.xlarge",
  "tenancy": "Shared",
  "operatingSystem": "Linux",
  "monthlyPrice": 239.76000000000002,
  "description": "$0.333 per On Demand Linux r3.xlarge Instance Hour"
}

オレゴンの r3.xlarge 、オンデマンドの Linux インスタンスは月額だいたい240ドルくらいということがわかります。

雑な部分

  • EC2 しか対応してません
  • Reserved Instances (1プロダクト複数オファー)に対応してません
  • 1カ月 == 30日で決め打ちです
  • 変えられるべき条件は他にも沢山ある気がします
  • その他未知の不具合があると思います

最後に

もう少しまともなやつをそのうち作ろうと思います。

続きを読む

Route53にDNS自動登録

  • EC2インスタンスを起動すると内部DNSにAレコードを登録し、停止するとAレコードを削除するinitスクリプトを作成しました。
    ”ZONE_NAME”だけ書き換えてあとはコピペでいける!はずです!

起動停止スクリプト作成

/etc/init.d/aws_route53_edit
#!/bin/bash
# chkconfig: 2345 99 10
# description: regist A record to aws route53
# Author:



ZONE_NAME="ゾーン名"
HOST_NAME=$(aws ec2 describe-tags --output text --query "Tags[?ResourceId==`$(curl -s http://169.254.169.254/latest/meta-data/instance-id)` && Key==`Name`].Value")
IP_ADDRESS=$(curl -s http://169.254.169.254/latest/meta-data/local-ipv4)
HOSTED_ZONE_ID=$(aws route53 list-hosted-zones | grep "Id" | awk '{print $2}' | sed -e 's//hostedzone///' | sed -e 's/,//' | sed -e 's/"//g')
LOCK=/var/lock/subsys/aws_route53_edit


case "$1" in
  "start" )

  ARECORD_JSON="{
    "Changes" : [
      { "Action" : "UPSERT",
        "ResourceRecordSet" : {
          "Name" : "${HOST_NAME}.${ZONE_NAME}",
          "Type" : "A",
          "TTL"  : 60,
          "ResourceRecords": [
            { "Value" : "${IP_ADDRESS}" }
          ]
        }
      }
    ]
  }"
  touch $LOCK
  ;;

  "stop" )

  ARECORD_JSON="{
    "Changes" : [
      { "Action" : "DELETE",
        "ResourceRecordSet" : {
          "Name" : "${HOST_NAME}.${ZONE_NAME}",
          "Type" : "A",
          "TTL"  : 60,
          "ResourceRecords": [
            { "Value" : "${IP_ADDRESS}" }
          ]
        }
      }
    ]
  }"
  rm -f $LOCK
  ;;

  *)
  echo $"Usage: $0 {start|stop}"
  exit 2
  ;;

esac

自動起動停止登録

sudo chmod 755 /etc/init.d/aws_route53_edit
sudo chkconfig --add aws_route53_edit
sudo chkconfig --list aws_route53_edit

確認

  • マネジメントコンソールから落とし上げをして別ウィンドウでRoute53を監視

ハマったところ

  • ロックファイル
    /var/lock/subsys/以下にファイルが無いとうまく動かないことがしばらくわからず時間をかけてしまった。ない状態でやると起動時のレコード登録はうまくいくがレコード削除がされなかった

その他

  • 起動停止時にCloudWatch登録削除なども作成予定なのでスクリプト名やロックファイル名は”aws_サービス名_動作名”などがよさそう

続きを読む

AmazonLinuxにffmpegをインストールする

AWS-EC2のAmazonLinuxにffmpegをインストールしようと思い、つまずいたところがあったので書き残し。

基本的な手順

基本的には公式のCentOS向けの手順を使う。
https://trac.ffmpeg.org/wiki/CompilationGuide/Centos

つまずいたところ

libx264のインストール途中、./configureでmakefileを作成する時に「nasmのバージョンが低いよ」という旨のエラーが出た。

Found no assembler
Minimum version is nasm-2.13
If you really want to compile without asm, configure with --disable-asm.

nasm -vしてみると、NASM version 2.10.07みたいな感じで、たしかにバージョンが足りてない。
公式向け手順の最初でnasmのrepoを追加した後にyum installしているので、なんでバージョン低いやつが入ってるんだ?となった。

解決

結論としては、yum installの時にnasmのrepoが参照されておらず、Amazonのrepo(nasmのバージョンが古い)を参照してインストールが走っていたので、最新バージョンが入っていなかった。

調べてみると、AmazonLinuxは初期設定でAmazonのrepoを優先的に参照する設定になっているらしい。
(参照:http://dev.classmethod.jp/server-side/os/amazonlinux-yum-priority/)

解決方法もこの記事に書いてある通りで、/etc/yum.repos.d/amzn-main.repoを編集して、AmazonのrepoのPriorityを99(デフォルト)にしてあげる。
これでnasmのrepoとAmazonのrepoの優先度が同じになるので、あとは普通にyum install nasmでOK。

続きを読む

AWS Lambda で動的HTMLコンテンツを配信する(Lambdaプロキシ統合を利用)

静的な内容ならS3に置いてWEBホスティングを有効にすればいいのですが、動的にHTMLを生成する場合にLambdaでHTMLを生成してレスポンスする方法です。この方法だとEC2等のサーバを運用する必要がありません。

Lambda

例として適当なHTMLの内容を返すコードは次のとおりです。

exports.handler = (event, context, callback) => {
  // もしQueryStringから値を取り出したい場合はevent.queryStringParametersから取得する

  // 動的にHTMLの内容を作成
  const html = `
  <html>
    <meta http-equiv="Content-Type" content="text/html" charset="utf-8">
    <body>
      <h1>Test Page</h1>
      こんにちわ!
    </body>
  </html>`;

  // Lambdaプロキシ統合の場合は下記のようなObjectでレスポンスを返せる
  const response = {
    statusCode: 200,
    headers: {
      'Content-Type': 'text/html',
    },
    body: html,
  };

  callback(null, response);
};

API Gateway

Lambdaプロキシ統合の使用にチェックをつけて、先程 作成したLambdaを指定します。

スクリーンショット_2017-06-26_11_05_26.png

動作確認

上記のAPIをAWSのコンソールからデプロイして、

スクリーンショット 2017-06-26 11.12.27.png

ブラウザでアクセスしてみます。

スクリーンショット 2017-06-26 11.14.52.png

補足

Lambdaが起動するのに時間がかかるので、実際の運用で利用する場合はAPI Gatewayでキャッシュを設定するか、CloudFront等のCDNを利用するのがよいかと思います。

以上

続きを読む

LoRaWANとSORACOMFunnelのAWSIoTアダプタを使ってDynamoDBにデータを書き込む

はじめに

SORACOMFunnelがAWSIoTに対応しましたね!
ちょうど仕事の関係でSORACOMのシールドが届いたし、近くにLoRaWANのPublicGWもあることだし・・・
ということでちょいと触ってみやした

筆者は今まで、主にKinesisアダプターを利用してデータの収集を行っています
簡単にまとめると、
1. KinesisStreamにセンサーからデータを投げて
2. Lambdaをキックして
3. DynamoDBに突っ込む
といった構成です

また普段からAWSは使っているのですが、AWSIoTを使ってみたことがほとんどなかったので、勉強がてらAWSIoTアダプターでデータの収集をしてみた

やりたいこと

  1. LoRaデバイスからLoRaゲートウェイを通ってFunnel AWSIoTアダプターを利用してAWSIoTにセンサーデータを投げる
  2. AWSIoTが受け取ったデータの中にHEX形式でセンサーデータが格納されているので、デコードするためのLambdaファンクションをキックする
  3. Lambdaでデータをデコードして必要なデータをJSON形式にまとめて、DynamoDBにPUTする

SORACOM Funnelって?

SORACOM Funnel(以下、Funnel) は、デバイスからのデータを特定のクラウドサービスに直接転送するクラウドリソースアダプターです。
Funnel でサポートされるクラウドサービスと、そのサービスの接続先のリソースを指定するだけで、データを指定のリソースにインプット
することができます。

http://soracom.jp/services/funnel/より抜粋
要するに、デバイスからAWSなどのクラウド上に閉域網でデータを送信することができるサービス(合ってるかな・・・)

AWSIoTって?

AWS IoT によって、さまざまなデバイスを AWS の各種 Services や他のデバイスに接続し、データと通信を保護し、
デバイスデータに対する処理やアクションを実行することが可能になります。
アプリケーションからは、デバイスがオフラインの状態でもデバイスとのやり取りが可能です。

https://aws.amazon.com/jp/iot-platform/how-it-works/より抜粋
うーん、なるほどわからん。とりあえず使ってみよう

デバイス側の設定

同じ部署の電気系強いお方が気づいたらセッティングしていただいていましたので割愛
この時点でSORACOM Harvestにてデータが送信されているのを確認できている状態

AWSIoTの設定

Funnelでデータを送信する先のAWSIoTを作成します

エンドポイントを控える

Funnelを設定する際に必要なAWSIoTのエンドポイントを控えておきます

AWSIoT_TOP.PNG

Ruleを作成する

左のサイドメニューから「Rule」を選択し、「Create a rule」をクリック

AWSIoT_Rule.PNG

「Name」と「Description」を入力する(Descriptionは任意)

AWSIoT_Rule_name.PNG

「Attribute」に「*」、「Topic filter」に「IoTDemo/#」を入力
AWSIoTはエンドポイントいかにTopic(今回では「IoTDemo/#」)を指定してリクエストを送ることで、それと一致するTopicFilterを持つRuleが呼ばれます
「Using SQL version」は「2016-03-23」で問題なければそのままでOK

AWSIoT_Rule_massage.PNG

「Set one or more actions」の「add action」をクリック

AWSIoT_Rule_set_action.PNG

今回はLambdaでデコードする必要があるため「Invoke a Lambda function passing the message data」を選択

AWSIoT_Rule_select_lambda.PNG

「Configure action」を選択

AWSIoT_Rule_select_lambda_button.PNG

キックするLambda Functionを選択
今回は初めて作成するので、Lambdaが呼ばれたときのeventの中身をログに吐き出すLambdaを作成して、それをキックするようにします
※DynamoDBに格納する処理は後ほど実装

「Create a new resouce」をクリック。Lambdaのページに遷移します

AWSIoT_Rule_lambda_create.PNG

「Blank Function」を選択

Lambda_create.PNG

Lambdaのトリガーを設定
「IoTタイプ」は「カスタムIoTルール」を選択
「ルール名は」現在作成中のルール名
「SQLステートメント」は作成中の「Rule query statement」の中身をコピー
「次へ」をクリック

Lambda_trigger.PNG

「名前」はお好きなFunction名をつけてください
「ランタイム」は筆者の好みによりNode.jsです
コードには

exports.handler = (event, context, callback) => {
    console.log(event);
};

と書いておいてください。

Lambda_setting.PNG

あとは、DynamoDBの権限を持ったロールを選択(作成)して、ページ下部の「次へ」をクリックしてLambdaFunctionを作成してください

AWSIoTのページに戻って、先ほど作成したLambdaFunctionを選択し、「Add action」をクリック

AWSIoT_Rule_add_lambda.PNG

その後「create Rule」をクリックするとRuleが作成されます
これでAWSIoTのRule作成が完了です

SORACOM Funnelの設定

まず、SORACOMコンソールにログインし、再度メニューから「LoRaグループ」⇒「追加」をクリックします
ポップアップが出てきてグループ名を入力するように言ってくるので、任意のグループ名を入力しグループを作成します

作成したグループを選択し、設定画面に移動します

転送先サービス:AWS IoT
転送先URL:https:///rule内で作成したSQLTopicFilter/#{deviceId}
認証情報:AWSIoTの権限を持ったIAMアカウント情報で作成したもの
送信データ形式:無難にJSON

funnel_setting.PNG

※転送先URLにはプレースホルダーを作成することができます
  - SIMを利用する場合:{imsi}
  - LoRaデバイスを利用する場合:{deviceId}

これでFunnelの設定は完了です

Lambdaの実装

デバイスの電源を入れ、データが送信されるようになると、Lambdaが起動してeventの中身をログに吐き出していると思います
↓こんな感じ

2017-06-23T04:13:59.850Z 62014535-57ca-11e7-b4e4-9fbd147f2037 { 
  operatorId: '0123456789',
  timestamp: 1498191237793,
  destination: { 
    resourceUrl: 'https://xxxxxxxxx.iot.ap-northeast-1.amazonaws.com/xxxxxxx/#{deviceId}',
    service: 'aws-iot',
    provider: 'aws' 
  },
  credentialsId: 'iot-sys',
  payloads: { 
    date: '2017-06-23T04:13:54.276320',
    gatewayData: [ [Object] ],
    data: '7b2268223a36312e367d',
    deveui: '1234567890' 
  },
  sourceProtocol: 'lora',
  deviceId: '1234567890' 
}

センサーから送られてくるデータはevent[“payloads”][“data”]にHEX形式で格納されているので、取り出してデコードする必要があります。


const data = event["payloads"]["data"];
const decodeData = new Buffer(data, "hex").toString("utf8");

デコードすると「7b2268223a36312e367d」⇒「{“h”: 61.6}」のようなString型になります(これは一例)

Object型のほうが使い勝手がよいので、parseしてしまいましょう


const parseData = JSON.parse(decodeData); // {h : 61.6}

あとはDynamoDBにputで投げつけます

index.js
"use strict";

const AWS = require("aws-sdk");
const co = require("co");
const moment = require("moment-timezone");

const dynamodb = new AWS.DynamoDB.DocumentClient({
  region: "ap-northeast-1"
});

const dynamoPutData = require("./lib/dynamo_put_data");

exports.handler = (event, context, callback) => {
  // UTCなのでJSTに変換
  const date = event["payloads"]["date"];
  const time = moment(date).tz("Asia/Tokyo").format();
  // HEX形式をデコード
  const data = event["payloads"]["data"];
  const decodeData = new Buffer(data, "hex").toString("utf8");
  // Object型に変換
  const parseData = JSON.parse(decodeData);
  // deviceIdを取得
  const deviceId = event["deviceId"];

  // DynamoDBにPUTするItem
  const item = [{
    deviceId: deviceId,
    time: time,
    value: parseData
  }];

  co(function *() {
    yield dynamoPutData.putDynamoDB(dynamodb, item[0]);
  }).then(() => {
    console.log("success!")
  }).catch((err) => {
    console.log(err);
  });
};

dynamo_put_data.js
"use strict";

class dynamoPutData {
  /**
   * DynamoDBへのPUT処理
   * @param {DocumentClient} dynamoDB
   * @param item
   * @returns {Promise}
   */
  static putDynamoDB(dynamoDB, item) {
    const params = {
      TableName: "TABLE_NAME",
      Item: item
    };
    return dynamoDB.put(params).promise();
  }
}

module.exports = dynamoPutData;

dynamo_put_data.js中の”TABLE_NAME”にはデータを投げつけるテーブル名を書いてください
関数を外だしして複数ファイルがあるので、Lambdaにはソースコード一式をZIPに固めてアップする方法でデプロイを行います
データが送られてきてLambdaがキックされると、DynamoDBにデータが格納されていると思います

まとめ

日ごろからAWSのサービスを使っていましたが、AWSIoTを利用する機会がなくとてもいい経験になりました。
今回はデバイスからクラウドといった方向でしたが、AWSIoTを利用すればその逆方向も実現することができるらしいので、近々そういった実装もしてみたと思います

では!

続きを読む

Amazon Kinesis Streamの監視方法

Kinesis Streamのシャードには以下のような制限がある。

  • 書き込み 1シャード 最大1秒あたり1,000レコードまたは1MBまで
  • 読み込み 1シャード 最大1秒あたり5回の読み込みまたは2MBまで

シャードへの流入・流出量を上記の値を超えないように・または超えたことを検知できるように
監視していかないといけない。

そのため、CloudWatchでのKinesisの監視項目をまとめる。

どのメトリクスを見るべきか

CloudWatchでどういうメトリクスが見れるかは公式ドキュメントに記載がある。
http://docs.aws.amazon.com/ja_jp/streams/latest/dev/monitoring-with-cloudwatch.html

その中でも以下の項目を見ると、シャードを増やすしきい値が見えてきそう。

WriteProvisionedThroughputExceeded

シャードの書き込み上限の超過が発生しているか否かを判定するのなら、この項目を見ればよい。
これは指定された期間中にストリームの上限超過が発生した件数が出力される。

これが定常的に1以上になった場合は、シャードを増やしたほうが良さそう。

AWS CLIでの取得方法

profile=hoge
stream_name=hoge

aws --profile ${profile} cloudwatch get-metric-statistics 
--namespace AWS/Kinesis 
--metric-name WriteProvisionedThroughputExceeded 
--start-time 2017-06-20T12:00:00 
--end-time 2017-06-20T13:00:00 
--period 60 
--statistics Maximum 
--dimensions Name=StreamName,Value=${stream_name}

ReadProvisionedThroughputExceeded

これは上記に記載した「WriteProvisionedThroughputExceeded」の読み込み版。
こちらも合わせて見たい。

PutRecords.Bytes / PutRecords.Records

これは指定期間内にストリームに送信されたバイト/レコード数を確認することが出来る項目。
周期の最小単位が1分となっているため、指標として使うならこの値を使って1秒あたりの数値に計算しなおすとわかりやすい。

この項目を見ることにより、シャードの上限の超過の発生前に事前にシャードを増やすことができる。

PutRecordというのもあるが、KPLを使用している場合はPutRecordsを使う。

AWS CLIでの取得方法

profile=hoge
stream_name=hoge

aws --profile ${profile} cloudwatch get-metric-statistics 
--namespace AWS/Kinesis 
--metric-name PutRecords.Bytes 
--start-time 2017-06-22T12:00:00 
--end-time 2017-06-22T12:05:00 
--period 60 
--statistics Sum 
--dimensions Name=StreamName,Value=${stream_name} | jq .
response
{
  "Datapoints": [
    {
      "Unit": "Bytes",
      "Timestamp": "2017-06-22T12:02:00Z",
      "Sum": 4777677
    },
    {
      "Unit": "Bytes",
      "Timestamp": "2017-06-22T12:01:00Z",
      "Sum": 4241130
    },
    {
      "Unit": "Bytes",
      "Timestamp": "2017-06-22T12:00:00Z",
      "Sum": 5064734
    },
    {
      "Unit": "Bytes",
      "Timestamp": "2017-06-22T12:04:00Z",
      "Sum": 4647186
    },
    {
      "Unit": "Bytes",
      "Timestamp": "2017-06-22T12:03:00Z",
      "Sum": 4581718
    }
  ],
  "Label": "PutRecords.Bytes"
}
# 1分周期で取っているので、60(秒)で割って、1秒あたりのByteを計算
profile=hoge
stream_name=hoge

aws --profile ${profile} cloudwatch get-metric-statistics 
--namespace AWS/Kinesis 
--metric-name PutRecords.Bytes 
--start-time 2017-06-22T12:00:00 
--end-time 2017-06-22T12:05:00 
--period 60 
--statistics Sum 
--dimensions Name=StreamName,Value=${stream_name} | jq -r '.Datapoints[].Sum' | xargs -i expr {} / 60
response
79627
70685
84412
77453
76361

GetRecords.Bytes / GetRecords.Records

上記の「PutRecords.Bytes / PutRecords.Records」の読み込み版。
こちらも合わせて見たい。

続きを読む