LoRaWANとSORACOMFunnelのAWSIoTアダプタを使ってDynamoDBにデータを書き込む

はじめに

つい先日、SORACOMFunnelがAWSIoTに対応したというニュースを耳にしました
ちょうど仕事の関係でSORACOMのシールドが届いたし、会社にLoRaWANのPublicGWもあることだし・・・
ということでちょいと触ってみた
SORACOM公式ブログにも手順が書いてありましたが、ちょっと躓いたところがあったりしたので、まとめてみました

やりたいこと

  1. LoRaデバイスからLoRaゲートウェイを通ってAWSIoTにセンサーデータを投げる
  2. AWSIoTが受け取ったデータを加工するためのLambdaファンクションをキックする
  3. Lambdaがデータを加工してDynamoDBに格納する

SORACOM Funnelって?

SORACOM Funnel(以下、Funnel) は、デバイスからのデータを特定のクラウドサービスに直接転送するクラウドリソースアダプターです。
Funnel でサポートされるクラウドサービスと、そのサービスの接続先のリソースを指定するだけで、データを指定のリソースにインプット
することができます。

http://soracom.jp/services/funnel/より抜粋
要するに、デバイスからAWSなどのクラウド上に閉域網でデータを送信することができるサービス(合ってるかな・・・)

AWSIoTって?

AWS IoT によって、さまざまなデバイスを AWS の各種 Services や他のデバイスに接続し、データと通信を保護し、
デバイスデータに対する処理やアクションを実行することが可能になります。
アプリケーションからは、デバイスがオフラインの状態でもデバイスとのやり取りが可能です。

https://aws.amazon.com/jp/iot-platform/how-it-works/より抜粋
うーん、なるほどわからん。とりあえず使ってみよう

デバイス側の設定

同じ部署の電気系強いお方が気づいたらセッティングしていただいていましたので割愛
この時点でSORACOM Harvestにてデータが送信されているのを確認できている状態

AWSIoTの設定

Funnelでデータを送信する先のAWSIoTを作成します

エンドポイントを控える

Funnelを設定する際に必要なAWSIoTのエンドポイントを控えておきます

AWSIoT_TOP.PNG

Ruleを作成する

左のサイドメニューから「Rule」を選択し、「Create a rule」をクリック

AWSIoT_Rule.PNG

「Name」と「Description」を入力する(Descriptionは任意)

AWSIoT_Rule_name.PNG

「Attribute」に「*」、「Topic filter」に「IoTDemo/#」を入力
「Using SQL version」は「2016-03-23」で問題なければそのままでOK

AWSIoT_Rule_massage.PNG

「Set one or more actions」の「add action」をクリック

AWSIoT_Rule_set_action.PNG

今回はLambdaでデコードする必要があるため「Invoke a Lambda function passing the message data」を選択

AWSIoT_Rule_select_lambda.PNG

「Configure action」を選択

AWSIoT_Rule_select_lambda_button.PNG

キックするLambda Functionを選択
今回は初めて作成するので、Lambdaが呼ばれたときのeventの中身をログに吐き出すLambdaを作成して、それをキックするようにします
※DynamoDBに格納する処理は後ほど実装

「Create a new resouce」をクリック。Lambdaのページに遷移します

AWSIoT_Rule_lambda_create.PNG

「Blank Function」を選択

Lambda_create.PNG

Lambdaのトリガーを設定
「IoTタイプ」は「カスタムIoTルール」を選択
「ルール名は」現在作成中のルール名
「SQLステートメント」は作成中の「Rule query statement」の中身をコピー
「次へ」をクリック

Lambda_trigger.PNG

「名前」はお好きなFunction名をつけてください
「ランタイム」は筆者の好みによりNode.jsです
コードには

exports.handler = (event, context, callback) => {
    console.log(event);
};

と書いておいてください。

Lambda_setting.PNG

あとは、DynamoDBの権限を持ったロールを選択(作成)して、ページ下部の「次へ」をクリックしてLambdaFunctionを作成してください

AWSIoTのページに戻って、先ほど作成したLambdaFunctionを選択し、「Add action」をクリック

AWSIoT_Rule_add_lambda.PNG

その後「create Rule」をクリックするとRuleが作成されます
これでAWSIoTのRule作成が完了です

SORACOM Funnelの設定

まず、SORACOMコンソールにログインし、再度メニューから「LoRaグループ」⇒「追加」をクリックします
ポップアップが出てきてグループ名を入力するように言ってくるので、任意のグループ名を入力しグループを作成します

作成したグループを選択し、設定画面に移動します

転送先サービス:AWS IoT
転送先URL:https:///rule内で作成したSQLTopicFilter/#{deviceId}
認証情報:AWSIoTの権限を持ったIAMアカウント情報で作成したもの
送信データ形式:無難にJSON

funnel_setting.PNG

※転送先URLにはプレースホルダーを作成することができます
  - SIMを利用する場合:{imsi}
  - LoRaデバイスを利用する場合:{deviceId}

これでFunnelの設定は完了です

Lambdaの実装

デバイスの電源を入れ、データが送信されるようになると、Lambdaが起動してeventの中身をログに吐き出していると思います
↓こんな感じ

2017-06-23T04:13:59.850Z 62014535-57ca-11e7-b4e4-9fbd147f2037 { 
  operatorId: '0123456789',
  timestamp: 1498191237793,
  destination: { 
    resourceUrl: 'https://xxxxxxxxx.iot.ap-northeast-1.amazonaws.com/xxxxxxx/#{deviceId}',
    service: 'aws-iot',
    provider: 'aws' 
  },
  credentialsId: 'iot-sys',
  payloads: { 
    date: '2017-06-23T04:13:54.276320',
    gatewayData: [ [Object] ],
    data: '7b2268223a36312e367d',
    deveui: '1234567890' 
  },
  sourceProtocol: 'lora',
  deviceId: '1234567890' 
}

センサーから送られてくるデータはevent[“payloads”][“data”]にHEX形式で格納されているので、取り出してデコードする必要があります。


const data = event["payloads"]["data"];
const decodeData = new Buffer(data, "hex").toString("utf8");

デコードすると「7b2268223a36312e367d」⇒「{“h”: 61.6}」のようなString型になります(これは一例)

Object型のほうが使い勝手がよいので、parseしてしまいましょう


const parseData = JSON.parse(decodeData); // {h : 61.6}

あとはDynamoDBにputで投げつけます

index.js
"use strict";

const AWS = require("aws-sdk");
const co = require("co");
const moment = require("moment-timezone");

const dynamodb = new AWS.DynamoDB.DocumentClient({
  region: "ap-northeast-1"
});

const dynamoPutData = require("./lib/dynamo_put_data");

exports.handler = (event, context, callback) => {
  // UTCなのでJSTに変換
  const date = event["payloads"]["date"];
  const time = moment(date).tz("Asia/Tokyo").format();
  // HEX形式をデコード
  const data = event["payloads"]["data"];
  const decodeData = new Buffer(data, "hex").toString("utf8");
  // Object型に変換
  const parseData = JSON.parse(decodeData);
  // deviceIdを取得
  const deviceId = event["deviceId"];

  // DynamoDBにPUTするItem
  const item = [{
    deviceId: deviceId,
    time: time,
    value: parseData
  }];

  co(function *() {
    yield dynamoPutData.putDynamoDB(dynamodb, item[0]);
  }).then(() => {
    console.log("success!")
  }).catch((err) => {
    console.log(err);
  });
};

dynamo_put_data.js
"use strict";

class dynamoPutData {
  /**
   * DynamoDBへのPUT処理
   * @param {DocumentClient} dynamoDB
   * @param item
   * @returns {Promise}
   */
  static putDynamoDB(dynamoDB, item) {
    const params = {
      TableName: "TABLE_NAME",
      Item: item
    };
    return dynamoDB.put(params).promise();
  }
}

module.exports = dynamoPutData;

dynamo_put_data.js中の”TABLE_NAME”にはデータを投げつけるテーブル名を書いてください
関数を外だしして複数ファイルがあるので、Lambdaにはソースコード一式をZIPに固めてアップする方法でデプロイを行います
データが送られてきてLambdaがキックされると、DynamoDBにデータが格納されていると思います

まとめ

日ごろからAWSのサービスを使っていましたが、AWSIoTを利用する機会がなくとてもいい経験になりました。
今回はデバイスからクラウドといった方向でしたが、AWSIoTを利用すればその逆方向も実現することができるらしいので、近々そういった実装もしてみたと思います

では!

続きを読む

CodeStarバンザイ!数クリックで始めるCI/CDパイプライン

CodeStarを使ってCI/CDパイプラインを構築してみたので、紹介します。

CodeStarとは?

AWSのマネージドサービスであるCodePipeline、CodeCommit、CodeBuild、CodeDeployを使ってCI/CDパイプライン+実行環境をさくっと構築してくれるサービスらしい。

https://aws.amazon.com/jp/codestar/

よしっ、動かしてみるぞ!!

1.プロジェクト作成

マネジメントコンソールからCodeStarを選び、「start a project」をポチッ。
するとプロジェクトのテンプレートを選択する画面が表示される。

カテゴリは、Web application、Web service、Alexa Skill、Static Websiteから、
言語は、Ruby、Node.js、Java、Python、PHP、HTML 5から、
実行環境は、Beanstalk、EC2、Lambdaから選択できる。
※もちろん存在しない組み合わせもあります。

今の時代っぽいWeb Application×Node.js×Lambdaなんてのも選択できるんですね。

うーん、ここはCodeBuildも使ってみたいし「Web Application×Java Spring×EC2」を選択。

使う1.png

そして、プロジェクト名を入力。インスタンスタイプを設定して。ポチッ、ポチッ。

使う2.png

。。。
はいっ、CI/CDパイプライン構築の作業はこれで終わり。

そして、待つこと10分。
CodePipeline、CodeCommit、CodeBuild、CodeDeployと、これを統合的に確認するためのダッシュボードがいい感じにできちゃいました。
もちろんjavaのwebアプリケーションも起動しています。

ダッシュボード

使う4修正.png

CodePipeline

使う8.png

CodeCommit

使う5.png

CodeBuild

使う6.png

CodeDeploy

使う7.png

デプロイされたjavaのwebアプリケーション

12.png

2.CI/CDパイプライン実行

ここからが本番。
gitへの接続情報をIAMで確認(ユーザ⇒認証情報⇒AWS CodeCommit の HTTPS Git 認証情報「生成」)し、code commit上のソースコードをcloneする。
するとこんなものが落ちてきます。

tree
.
├── README.md
├── appspec.yml
├── buildspec.yml
├── pom.xml
├── scripts
│   ├── install_dependencies
│   └── start_server
└── src
    └── main
        ├── java
        │   └── com
        │       └── aws
        │           └── codestar
        │               └── projecttemplates
        │                   ├── HelloWorldAppInitializer.java
        │                   ├── configuration
        │                   │   ├── ApplicationConfig.java
        │                   │   └── MvcConfig.java
        │                   └── controller
        │                       └── HelloWorldController.java
        ├── resources
        │   └── application.properties
        └── webapp
            ├── WEB-INF
            │   └── views
            │       └── index.jsp
            └── resources
                ├── gradients.css
                ├── set-background.js
                ├── styles.css
                └── tweet.svg

ふむふむ、なるほど。
ここは手っ取り早くCI/CDパイプラインを確認するため、index.jspをちょこっと修正。
そして、code commitにpush。
code commit上で、変更した内容も確認できます。

使う11.png
すると。。。

使う9.png

パイプラインが動き出したーーー
どうやら動きとしては、こんなことをやっているみたい。

  • Source : 新たしいコードがpushされると、code commitからソースコードを取得しS3に格納。
  • Build : S3からソースコードを取得しビルド。そしてビルドしたモジュールをS3に格納。
  • Application : S3に格納されたモジュールをEC2にデプロイ。

待つこと5分。デプロイまで成功。
そして、先程の画面を確認してみると。。。

使う10.png

変わった!
簡単!!

これを使えば

  • Java、Rubyなどメジャーな言語を利用したCI/CDパイプラインを爆速で構築できる。
  • Jenkinsから開放される。

がしかし。。

  • 東京リージョンにはまだ来ていない。
  • CodeStarというかcode commit側の問題になるが、pull requestが使えない。。

本番用のアプリケーション開発環境・実行環境として利用するのは、まだまだ難しいような気もしますが、
pocくらいであればこれで十分かもしれませんね。

続きを読む

Amazon Kinesis Streamの監視方法

Kinesis Streamのシャードには以下のような制限がある。

  • 書き込み 1シャード 最大1秒あたり1,000レコードまたは1MBまで
  • 読み込み 1シャード 最大1秒あたり5回の読み込みまたは2MBまで

シャードへの流入・流出量を上記の値を超えないように・または超えたことを検知できるように
監視していかないといけない。

そのため、CloudWatchでのKinesisの監視項目をまとめる。

どのメトリクスを見るべきか

CloudWatchでどういうメトリクスが見れるかは公式ドキュメントに記載がある。
http://docs.aws.amazon.com/ja_jp/streams/latest/dev/monitoring-with-cloudwatch.html

その中でも以下の項目を見ると、シャードを増やすしきい値が見えてきそう。

WriteProvisionedThroughputExceeded

シャードの書き込み上限の超過が発生しているか否かを判定するのなら、この項目を見ればよい。
これは指定された期間中にストリームの上限超過が発生した件数が出力される。

これが定常的に1以上になった場合は、シャードを増やしたほうが良さそう。

AWS CLIでの取得方法

profile=hoge
stream_name=hoge

aws --profile ${profile} cloudwatch get-metric-statistics 
--namespace AWS/Kinesis 
--metric-name WriteProvisionedThroughputExceeded 
--start-time 2017-06-20T12:00:00 
--end-time 2017-06-20T13:00:00 
--period 60 
--statistics Maximum 
--dimensions Name=StreamName,Value=${stream_name}

ReadProvisionedThroughputExceeded

これは上記に記載した「WriteProvisionedThroughputExceeded」の読み込み版。
こちらも合わせて見たい。

PutRecords.Bytes / PutRecords.Records

これは指定期間内にストリームに送信されたバイト/レコード数を確認することが出来る項目。
周期の最小単位が1分となっているため、指標として使うならこの値を使って1秒あたりの数値に計算しなおすとわかりやすい。

この項目を見ることにより、シャードの上限の超過の発生前に事前にシャードを増やすことができる。

PutRecordというのもあるが、KPLを使用している場合はPutRecordsを使う。

AWS CLIでの取得方法

profile=hoge
stream_name=hoge

aws --profile ${profile} cloudwatch get-metric-statistics 
--namespace AWS/Kinesis 
--metric-name PutRecords.Bytes 
--start-time 2017-06-22T12:00:00 
--end-time 2017-06-22T12:05:00 
--period 60 
--statistics Sum 
--dimensions Name=StreamName,Value=${stream_name} | jq .
response
{
  "Datapoints": [
    {
      "Unit": "Bytes",
      "Timestamp": "2017-06-22T12:02:00Z",
      "Sum": 4777677
    },
    {
      "Unit": "Bytes",
      "Timestamp": "2017-06-22T12:01:00Z",
      "Sum": 4241130
    },
    {
      "Unit": "Bytes",
      "Timestamp": "2017-06-22T12:00:00Z",
      "Sum": 5064734
    },
    {
      "Unit": "Bytes",
      "Timestamp": "2017-06-22T12:04:00Z",
      "Sum": 4647186
    },
    {
      "Unit": "Bytes",
      "Timestamp": "2017-06-22T12:03:00Z",
      "Sum": 4581718
    }
  ],
  "Label": "PutRecords.Bytes"
}
# 1分周期で取っているので、60(秒)で割って、1秒あたりのByteを計算
profile=hoge
stream_name=hoge

aws --profile ${profile} cloudwatch get-metric-statistics 
--namespace AWS/Kinesis 
--metric-name PutRecords.Bytes 
--start-time 2017-06-22T12:00:00 
--end-time 2017-06-22T12:05:00 
--period 60 
--statistics Sum 
--dimensions Name=StreamName,Value=${stream_name} | jq -r '.Datapoints[].Sum' | xargs -i expr {} / 60
response
79627
70685
84412
77453
76361

GetRecords.Bytes / GetRecords.Records

上記の「PutRecords.Bytes / PutRecords.Records」の読み込み版。
こちらも合わせて見たい。

続きを読む