LoRaWANとSORACOMFunnelのAWSIoTアダプタを使ってDynamoDBにデータを書き込む

はじめに

つい先日、SORACOMFunnelがAWSIoTに対応したというニュースを耳にしました
ちょうど仕事の関係でSORACOMのシールドが届いたし、会社にLoRaWANのPublicGWもあることだし・・・
ということでちょいと触ってみた
SORACOM公式ブログにも手順が書いてありましたが、ちょっと躓いたところがあったりしたので、まとめてみました

やりたいこと

  1. LoRaデバイスからLoRaゲートウェイを通ってAWSIoTにセンサーデータを投げる
  2. AWSIoTが受け取ったデータを加工するためのLambdaファンクションをキックする
  3. Lambdaがデータを加工してDynamoDBに格納する

SORACOM Funnelって?

SORACOM Funnel(以下、Funnel) は、デバイスからのデータを特定のクラウドサービスに直接転送するクラウドリソースアダプターです。
Funnel でサポートされるクラウドサービスと、そのサービスの接続先のリソースを指定するだけで、データを指定のリソースにインプット
することができます。

http://soracom.jp/services/funnel/より抜粋
要するに、デバイスからAWSなどのクラウド上に閉域網でデータを送信することができるサービス(合ってるかな・・・)

AWSIoTって?

AWS IoT によって、さまざまなデバイスを AWS の各種 Services や他のデバイスに接続し、データと通信を保護し、
デバイスデータに対する処理やアクションを実行することが可能になります。
アプリケーションからは、デバイスがオフラインの状態でもデバイスとのやり取りが可能です。

https://aws.amazon.com/jp/iot-platform/how-it-works/より抜粋
うーん、なるほどわからん。とりあえず使ってみよう

デバイス側の設定

同じ部署の電気系強いお方が気づいたらセッティングしていただいていましたので割愛
この時点でSORACOM Harvestにてデータが送信されているのを確認できている状態

AWSIoTの設定

Funnelでデータを送信する先のAWSIoTを作成します

エンドポイントを控える

Funnelを設定する際に必要なAWSIoTのエンドポイントを控えておきます

AWSIoT_TOP.PNG

Ruleを作成する

左のサイドメニューから「Rule」を選択し、「Create a rule」をクリック

AWSIoT_Rule.PNG

「Name」と「Description」を入力する(Descriptionは任意)

AWSIoT_Rule_name.PNG

「Attribute」に「*」、「Topic filter」に「IoTDemo/#」を入力
「Using SQL version」は「2016-03-23」で問題なければそのままでOK

AWSIoT_Rule_massage.PNG

「Set one or more actions」の「add action」をクリック

AWSIoT_Rule_set_action.PNG

今回はLambdaでデコードする必要があるため「Invoke a Lambda function passing the message data」を選択

AWSIoT_Rule_select_lambda.PNG

「Configure action」を選択

AWSIoT_Rule_select_lambda_button.PNG

キックするLambda Functionを選択
今回は初めて作成するので、Lambdaが呼ばれたときのeventの中身をログに吐き出すLambdaを作成して、それをキックするようにします
※DynamoDBに格納する処理は後ほど実装

「Create a new resouce」をクリック。Lambdaのページに遷移します

AWSIoT_Rule_lambda_create.PNG

「Blank Function」を選択

Lambda_create.PNG

Lambdaのトリガーを設定
「IoTタイプ」は「カスタムIoTルール」を選択
「ルール名は」現在作成中のルール名
「SQLステートメント」は作成中の「Rule query statement」の中身をコピー
「次へ」をクリック

Lambda_trigger.PNG

「名前」はお好きなFunction名をつけてください
「ランタイム」は筆者の好みによりNode.jsです
コードには

exports.handler = (event, context, callback) => {
    console.log(event);
};

と書いておいてください。

Lambda_setting.PNG

あとは、DynamoDBの権限を持ったロールを選択(作成)して、ページ下部の「次へ」をクリックしてLambdaFunctionを作成してください

AWSIoTのページに戻って、先ほど作成したLambdaFunctionを選択し、「Add action」をクリック

AWSIoT_Rule_add_lambda.PNG

その後「create Rule」をクリックするとRuleが作成されます
これでAWSIoTのRule作成が完了です

SORACOM Funnelの設定

まず、SORACOMコンソールにログインし、再度メニューから「LoRaグループ」⇒「追加」をクリックします
ポップアップが出てきてグループ名を入力するように言ってくるので、任意のグループ名を入力しグループを作成します

作成したグループを選択し、設定画面に移動します

転送先サービス:AWS IoT
転送先URL:https:///rule内で作成したSQLTopicFilter/#{deviceId}
認証情報:AWSIoTの権限を持ったIAMアカウント情報で作成したもの
送信データ形式:無難にJSON

funnel_setting.PNG

※転送先URLにはプレースホルダーを作成することができます
  - SIMを利用する場合:{imsi}
  - LoRaデバイスを利用する場合:{deviceId}

これでFunnelの設定は完了です

Lambdaの実装

デバイスの電源を入れ、データが送信されるようになると、Lambdaが起動してeventの中身をログに吐き出していると思います
↓こんな感じ

2017-06-23T04:13:59.850Z 62014535-57ca-11e7-b4e4-9fbd147f2037 { 
  operatorId: '0123456789',
  timestamp: 1498191237793,
  destination: { 
    resourceUrl: 'https://xxxxxxxxx.iot.ap-northeast-1.amazonaws.com/xxxxxxx/#{deviceId}',
    service: 'aws-iot',
    provider: 'aws' 
  },
  credentialsId: 'iot-sys',
  payloads: { 
    date: '2017-06-23T04:13:54.276320',
    gatewayData: [ [Object] ],
    data: '7b2268223a36312e367d',
    deveui: '1234567890' 
  },
  sourceProtocol: 'lora',
  deviceId: '1234567890' 
}

センサーから送られてくるデータはevent[“payloads”][“data”]にHEX形式で格納されているので、取り出してデコードする必要があります。


const data = event["payloads"]["data"];
const decodeData = new Buffer(data, "hex").toString("utf8");

デコードすると「7b2268223a36312e367d」⇒「{“h”: 61.6}」のようなString型になります(これは一例)

Object型のほうが使い勝手がよいので、parseしてしまいましょう


const parseData = JSON.parse(decodeData); // {h : 61.6}

あとはDynamoDBにputで投げつけます

index.js
"use strict";

const AWS = require("aws-sdk");
const co = require("co");
const moment = require("moment-timezone");

const dynamodb = new AWS.DynamoDB.DocumentClient({
  region: "ap-northeast-1"
});

const dynamoPutData = require("./lib/dynamo_put_data");

exports.handler = (event, context, callback) => {
  // UTCなのでJSTに変換
  const date = event["payloads"]["date"];
  const time = moment(date).tz("Asia/Tokyo").format();
  // HEX形式をデコード
  const data = event["payloads"]["data"];
  const decodeData = new Buffer(data, "hex").toString("utf8");
  // Object型に変換
  const parseData = JSON.parse(decodeData);
  // deviceIdを取得
  const deviceId = event["deviceId"];

  // DynamoDBにPUTするItem
  const item = [{
    deviceId: deviceId,
    time: time,
    value: parseData
  }];

  co(function *() {
    yield dynamoPutData.putDynamoDB(dynamodb, item[0]);
  }).then(() => {
    console.log("success!")
  }).catch((err) => {
    console.log(err);
  });
};

dynamo_put_data.js
"use strict";

class dynamoPutData {
  /**
   * DynamoDBへのPUT処理
   * @param {DocumentClient} dynamoDB
   * @param item
   * @returns {Promise}
   */
  static putDynamoDB(dynamoDB, item) {
    const params = {
      TableName: "TABLE_NAME",
      Item: item
    };
    return dynamoDB.put(params).promise();
  }
}

module.exports = dynamoPutData;

dynamo_put_data.js中の”TABLE_NAME”にはデータを投げつけるテーブル名を書いてください
関数を外だしして複数ファイルがあるので、Lambdaにはソースコード一式をZIPに固めてアップする方法でデプロイを行います
データが送られてきてLambdaがキックされると、DynamoDBにデータが格納されていると思います

まとめ

日ごろからAWSのサービスを使っていましたが、AWSIoTを利用する機会がなくとてもいい経験になりました。
今回はデバイスからクラウドといった方向でしたが、AWSIoTを利用すればその逆方向も実現することができるらしいので、近々そういった実装もしてみたと思います

では!

続きを読む

AWS独学メモ

頑張って学んでいきます。

サービス俯瞰

コンピューティング関連

サービス名 概要
EC2 仮想サーバー
EC2 Container Service Doker(アプリ実行環境構築ツール)運用サービス
EC2 Container Regstry Dokerイメージ保存・共有サービス。
Elastic Beanstalk .NET/PHP/Python/Ruby/Node.jsアプリを自動でAWSにデプロイ。
Lambda クライアントからのリクエスト発生時に任意プログラミング起動。イベント駆動型サービス。
Auto Scaling CPU使用率等、事前決定条件に応じ、EC2インスタンス増減
Elastic Load Balancing トラフィックに応じ、複数EC2インスタンスに負荷分散

ストレージ・コンテンツ配信

サービス名 概要
S3 ファイルサーバ。画像格納したり。
CloudFront コンテンツ配信ネットワーク。利用者から近い場所から効率よく配信
EBS EC2データを保持するストレージ。EC2のHDD,SSDのような役割。
Elastic File System EC2共有ファイルストレージ
Glacier 低価格ストレージ。仕様頻度低いけど長期保存のバックアップ用。
Import / Export Snowball ペタバイト級の大容量転送サービス。
Storage Gateway オンプレミスとAWSを接続

DB関連

サービス名 概要
RDS DB(MySQL/Oracle/SQL Server/PostgreSQL/Aurora)が利用できる
Database Migration Service 最小限停止時間でDBを移行。オンプレミスのDBサーバからの移行等に用いる
DynamoDB NoSQLデータベスサービス構築/運用。
ElastiCache クラウドでのメモり内キャッシュの管理サービス
Redshift ビッグデータを分析

ネットワーク

サービス名 概要
VPC プライベートネットワーク構築サービス。
Direct Connect オンプレミスのネットワークとAWSのVPCネットワークを直接接続。
Route 53 DNS(ドメイン名とIPアドレスを対応)

開発者用ツール

サービス名 概要
CodeCommit プライベートGit
CodeDeploy 開発アプリを実行環境に自動配置
CodePipeline 継続的デリバリ使用したアプリのリリース

開発ツール

サービス名 概要
CloudWatch AWSリソース監視サービス
CloudFormation テンプレート利用したリソースの作成と管理
CloudTrail ユーザアクティビティとAPI使用状況確認
Config リソースのイベントリ変更の追跡
OpsWorks Chef利用し操作の自動化
Service Catalog 標準化製品の作成と使用
Trusted Advisor パフォーマンスとせきゅりてぃの最適化

セキュリティ

サービス名 概要
IAM AWS認証
Directory Service Active Directoryのホスティングと管理
Inspector アプリのセキュリティ分析
CloudHSM 暗号鍵管理の専用ハードウェア
Key Management Service 暗号鍵作成と管理
WAF 攻撃から保護するファイアウォール

分析

サービス名 概要
EMR Hadoopフレームワーク
Data Pipeline オーケストレーションサービス
Kinesis リアルタイムストリーミングデータとの連携
Machine Learning 機械学習
QuickSight 高速ビジネスインテリジェンスサービス

モバイルサービス

サービス名 概要
Mobile Hub モバイルアプリの構築/テスト/監視
API Gateway RESTful APIの構築/管理
Cofnito ユーザID及びアプリデータの同期
Device Farm iOS/Android/FireOSアプリのテスト
Mobile Analytics アプリ分析の収集/表示/エクスポート
Mobile SDK モバイルソフトウェアの開発キット

アプリケーションサービス

サービス名 概要
AppStream ストリーミングサービス
CloudSearch マネージド型検索サービス
Elastic Transcorder メディアと動画変換
SES Eメール送受信
SNS プッシュ通知サービス
SQS メッセージキューサービス
SWF アプリ同士を連携ワークフローサービス

大企業向け

サービス名 概要
WorkSpaces クラウド上仮想デスクトップパソコンサービス
WorkMail セキュリティ保護、企業向けEメール及びカレンダー
WorkDocs ファイル共有サービス

S3について

用語

用語 意味
バケット データの入れ物
オブジェクト 格納ファイル

ステップ

  1. バケット作成
  2. オブジェクト格納

EC2について

用語

用語 意味
EC2 仮想サーバ。オンプレミスのWindowsサーバやUNIXサーバに相当。
インスタンス 1台の仮想サーバ
EBS(Elastic Block Store) サーバのHDDに相当する仮想ディスク
AMI(Amazon Machine Image) サーバにインストールするOSやミドルウェアやアプリのイメージ。新インスタンスを複数生成時、AMIを利用。
yum パッケージ管理システム
scp(secure copy) SSH機能を用いて、安全にファイル転送する

EC2にSSH接続した

参考ページ1
参考ページ2

ミドルウェアをインストール

yum更新
$ sudo yum -y update
httpdインストール
$ sudo yum  install -y httpd
httpd起動
$ sudo service httpd start
httpd自動起動を確認
$ sudo chkconfig --list httpd
httpd自動起動を設定
$ sudo chkconfig  httpd on
$ sudo chkconfig  httpd off

scp(コンテンツをアップロードする)

【現在ここで躓き中!】
→ 突破!!

参考ページ1

HTTPコンテンツをコピー

HTTPコンテンツのコピー

$ sudo cp /home/ec2-user/index.html /var/www/html/

【現在ここで躓き中!】index.htmlへアクセスできない

続きを読む

AWSでprivate subnetからのインターネットサービスを利用する方法について

  • AWSでprivate subnetからのインターネットサービスを利用する方法について、簡単に列挙する。

前提

  • private subnet(インターネットゲートェイへのルート定義を持たないサブネット)は、内部からインターネットに接続できない。
  • そのため、そのままでは、S3やDynamoDbを使うことができない。ヤフーやグーグルも見れない。

改善するために方法が3つある。

NATインスタンス

  • メリット:EC2インスタンスを立てるため、柔軟な構成を取れる。
  • デメリット:フェイルオーバーを自分で考慮しないといけない。

NATゲートウェイ

  • メリット:管理が楽。
  • デメリット:マネージドサービスのため、柔軟な構成が取れない。

VPCエンドポイント

  • メリット:インターネット、NAT デバイス、VPN 接続、または AWS Direct Connect を経由せずに、VPC と他の AWS サービスとをプライベートに接続できる。
  • デメリット:S3のみ!DynamoDbがパブリックプレビューに(2017/4/19)

その他

  • SQS,SES,SNSなどのサービスもprivate subnetからだと使えない?使える? 要調査。
  • 料金的な比較もしたいが、嘘つきそうなので省略。
  • 疑問)セキュリティグループで外部からのアクセスを制限だけなのと、VPCサブネットを別にしてルート定義を分けるのと、どれくらいセキュリティ効果が違うものなのか?

続きを読む

AWS データ蓄積アイテムのサイズ上限

AWSサービスの最小保存単位の最大サイズ一覧

DynamoDb 400KB

  • 属性名の長さと属性値の長さを含むサイズ
  • 特徴:(限定的な)クエリーもかけるKVS。集計はできない。lambdaのイベント発生元になれる。
  • サービス種類:リージョンサービス

ElastiCache(Redis) 1GB

  • SET(key, value) のvalueの制限
  • 特徴:KVS。FIFOとしても使える。
  • サービス種類:AZサービス

SQS 256 KB

  • メッセージサイズ
  • 特徴:FIFO。厳密な順序保証はなし。
  • サービス種類:リージョンサービス

RDS(mysql) 64KB

  • 最大行サイズ(TEXT,BLOBのデータは別領域)
  • 特徴:クエリーかける。集計できる。
  • サービス種類:AZサービス

S3 5TB

  • オブジェクトのサイズ
  • 特徴:ストレージ。athenaでクエリーもかける?lambdaのイベント発生元になれる。
  • サービス種類:リージョンサービス

Redshift ??

  • サイズ制限に関する情報が見当たらず
  • 特徴:クエリーかける。集計めっちゃ早い。管理や設計が難。
  • サービス種類:AZサービス

サービス種類について

  • リージョンサービス:リージョン単位のサービス。AZ単位の障害の考慮不要
  • AZサービス:AZ単位のサービス。AZ単位の障害の考慮必要。(バックアップ、フェイルオーバー、リードレプリカ、など)

続きを読む

Go言語でDynamoDB接続を試みたときの備忘録

前提

最近、Go言語を触り始めた。

GoからDynamoDBに接続するにあたって、 aws-sdk-go/aws/service/dynamodb を使おうとし、ドキュメントを読んでたら結構難しそうだったので、 guregu/dynamo を使うことにした。

また、DynamoDBも触ったことがなかった。

ゴールは、GoでDynamoDBに読み書きすること!

DynamoDBの設定

まず、AWSコンソール画面からDynamoDBのテーブルを作ります。

スクリーンショット 2017-06-17 16.47.12.png

上記の画面で、「テーブル名」を Test とします。

「プライマリーキー」を user_id とします。型は「数値」にします。

他のattributeにかんしては、時間に関する値を想定してますが、実際にGoからItemをPUTするときに足されるので、定義する必要はありません。(そもそも登録もできない。)

(キャメルケースがいいのか、パスカルケースがいいのか等の命名規則もあるかと思いますが、一旦これでいきます。)

これでAWS側の用意は出来ました。

あと、もう一つ大事なこととして
IAMで、DynamoDBにアクセスできる権限をもったユーザーを作成し、

  • アクセスキー
  • シークレットキー

を入手しましょう。

Go言語をかく!

user_dynamo.goという名前でファイルを作成します。

必要なモジュール

以下のリアブラリをimportします。

user_dynamo.go
import (
  "fmt"
  "time"
  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/aws/credentials"
  "github.com/guregu/dynamo"
)
ライブラリ名 説明
fmt 標準出力に使う。
time 現在時刻を取得するのに使う。
github.com/aws/aws-sdk-go/aws configに関するオブジェクト作成につかう
github.com/aws/aws-sdk-go/aws/session sessionに関するオブジェクト作成につかう
github.com/aws/aws-sdk-go/aws/credentials awsへのアクセス情報に関するオブジェクト作成につかう
github.com/guregu/dynamo github.com/aws/aws-sdk-go/aws/service/dynamodbに対する操作を便利にしてくれるライブラリ

DynamoDBのスキーマ定義

この言い回しが正しいかわかりませんが、DynamoDBのUserテーブルに登録する予定の項目を構造体として定義します。

user_dynamo.go
type User struct {
  UseId int `dynamo:"use_id"`
  CreatTime time.Time `dynamo:"created_time"`
}

inttime.Time のあとのバッククウォートで囲まれた部分はGoで「タグ情報」と呼ばれるものになります。

DynamoDBのUserテーブルにアクセス

user_dynamo.go
func main(){
  c := credentials.NewStaticCredentials("<アクセスキー>", "<シークレットキー>", "") // 最後の引数は[セッショントークン]

  db := dynamo.New(session.New(), &aws.Config{
      Credentials: c,
      Region: aws.String("<region名>"), // "ap-northeast-1"等
  })
  table := db.Table("User")

 // 続く
}

のように書きます。

データをPUTする

user_dynamo.go
func main(){
  // 続き

  u := User{UserId: 3, CreatTime: time.Now().UTC()}
  fmt.Println(u)

  if err := table.Put(u).Run(); err != nil {
    fmt.Println("err")
    panic(err.Error())
  }

上記をまとめると

user_dynamo.go
package main

import (
  "fmt"
  "time"
  "github.com/aws/aws-sdk-go/aws"
  "github.com/aws/aws-sdk-go/aws/session"
  "github.com/aws/aws-sdk-go/aws/credentials"
  "github.com/guregu/dynamo"
)


type User struct {
  UseId int `dynamo:"use_id"`
  CreatedTime time.Time `dynamo:"created_time"`
}


func main(){
  cred := credentials.NewStaticCredentials("<アクセスキー>", "<シークレットキー>", "") // 最後の引数は[セッショントークン]

  db := dynamo.New(session.New(), &aws.Config{
      Credentials: cred,
      Region: aws.String("<region名>"), // "ap-northeast-1"等
  })

  table := db.Table("User")

  u := User{UserId: 100, CreatedTime: time.Now().UTC()}
  fmt.Println(u)

  if err := table.Put(u).Run(); err != nil {
    fmt.Println("err")
    panic(err.Error())
  }

それではterminalから

$ go run user_dynamo.go

とうってみてください。DynamoDBに

user_id created_time
100 2017-06-02T09:25:56.000147134Z

と登録されたかと思います。これで、DynamoDBへのデータ登録は完了です。

データの読み取り

DynamoDBより

「user_idが100のデータ全て」

データを読み取る場合は以下のようになります。

user_dynamo.go
func main(){
  // 続き
  var users []User
  err := table.Get("user_id", 100).All(&users)
  if err != nil {
    fmt.Println("err")
    panic(err.Error())
  }

  fmt.Println(users) // [{100 2017-06-02T09:25:56.000147134 +0000 UTC}]と出力される。

  for i, _ := range users {
    fmt.Println(users[i]) // {100 2017-06-02T09:25:56.000147134 +0000 UTC}
  }
}

また全データを取得したい場合は

err := table.Get("user_id", 1).All(&users)

というところを

err := table.Scan().All(&users)

とすれば大丈夫です。

さらに created_time によって、絞りたい場合は以下のようにすれば大丈夫です。

err := table.Get("user_id", 1).Filter("created_time >= ?", "2017-06-01 00:00:00").All(&users)

注意点

DynamoDBでは、プライマリーキーは、DBで一つしか存在できないので、本コードでは書き込みをするたびに書き換わります。

参考にしたサイト

http://www.geocities.jp/m_hiroi/golang/abcgo07.html
https://github.com/guregu/dynamo
http://qiita.com/guregu/items/2e9ac305791935b86f5d
https://developers.eure.jp/tech/aws-sdk-go-tutorial-sqs-dynamodb/

続きを読む