Alexa からの smart home skill イベントを SQS でデバイスに送信して結果を受信 【Python】

Alexa のスマートホームスキルで呼ばれる lambda で取得したイベントを SQS を使ってデバイスに送る方法を紹介します。目的は AWS の勉強ですので正直プロダクトじゃない場合はこの辺は自作せず買った方が良い気がします。
これは以前の記事にも書きましたが AWS greengrass でもっときれいに実現できる(と思います)。一応 python でスマートホームスキル書いたのでそれの軽い解説もしておきます。

前提

  • python 3.6
  • Alexa での smart home skill の作り方は知っている
  • 私は初めてのホームスキル作成

参考サイト

Alexa のスキルに対応するコード

Alexa のメッセージが送られて来る Lambda の部分です。namespacenameに合わせて処理内容を書きます。今回は基本的に Discovery のイベント以外はラズパイに投げてしまい、ラズパイで色々な作業をします。

smartHomeSkill.py
def my_handler(event, context):
    logging("DEBUG", "Request", event)
    header = event['directive']['header']
    namespace = header['namespace']
    name = header['name']

    # Discovery request だったら Lambda で処理
    if namespace == ALEXA_DISCOVERY and name == DISCOVER:
        return handleDiscovery(event, context)

    # 今回は Lambda 側ではあまり作業はせずにイベントをデバイスに渡してしまう
    # configファイルの読み込み
    ini = configparser.ConfigParser()
    ini.read("./config.ini")

    # 現在(2018/01/14)日本語の Alexa スキルはオレゴンでしか実行できないので
    # 東京リージョンのSQSを使うためにリージョンを指定
    sqs = boto3.client('sqs', region_name='ap-northeast-1')
    requestQueUrl = ini['sqs']['requestQueUrl']
    responseQueUrl = ini['sqs']['responseQueUrl']

    # イベントを送信
    result = sendQueBody(sqs, requestQueUrl, event)
    if result is False:
        return createErrorResponse(event, 'INTERNAL_ERROR', "Failed to send request to the device")
    # 結果を受信
    response = receiveQueBody(sqs, responseQueUrl)
    if response is None:
        return createErrorResponse(event, 'INTERNAL_ERROR', "Failed to receive a device's response")

    logging("DEBUG", "Response", response)
    return response

requestQueUrlresponseQueUrlには SQS の URL を設定ファイルから読み取っています。https://sqs.ap-northeast-1.amazonaws.com/XXXXXXXXXX/YYYYYYYYYってかんじのやつです。
SQS クライアントは作った場所のリージョンを指定する必要があるので気をつけてください。
スキルの作成時にどこで実行するのかを選べるのでおそらくエッジロケーションで lambda は実行されていると思います。Far-east と指定ができるのでおそらく日本付近で実行されているのではないかと。なので SQS も日本に作っておくのが良いと思います。

まずは本題の SQS の部分の説明、次におまけでhandleDiscovery(event, context)を紹介します。

SQS の送受信

sendQueBody()receiveQueBody()の実装です。

utils.py
# -*- coding: utf-8 -*-
import json
import hashlib

# url で指定した SQS に body を json で送信
def sendQueBody(sqs, url, body):
    jsonBody = json.dumps(body)
    response = sqs.send_message(
        QueueUrl=url,
        DelaySeconds=0,
        MessageBody=(
            jsonBody
        )
    )

    # 送信に成功したか確認
    if response['MD5OfMessageBody'] != hashlib.md5(jsonBody.encode('utf-8')).hexdigest():
        logging("ERROR", "sendQueBody", "Failed to send sqs")
        return False

    return True

# SQS からメッセージを受け取る
# returnType: dict
def receiveQueBody(sqs, url):
    response = sqs.receive_message(
        QueueUrl=url,
        AttributeNames=[
            'SentTimestamp'
        ],
        MaxNumberOfMessages=1,
        VisibilityTimeout=0,
        WaitTimeSeconds=20
    )

    # キューになにもない場合
    if 'Messages' not in response:
        return None

    message = response['Messages'][0]
    body = json.loads(message['Body'])

    # メッセージを削除するための情報を取得
    receipt_handle = message['ReceiptHandle']

    # 場合によっては処理が完了してからメッセージを削除するが
    # 今回は受信した時点で削除する
    sqs.delete_message(
        QueueUrl=url,
        ReceiptHandle=receipt_handle
    )

    return body

SQS からデータを削除するタイミングは場合によると思いますが、今回は Alexa に命令したものが一回失敗したら次も失敗する想定だったのでリトライとか考えずに受信したら削除してしまいます。sqs.send_message()MD5OfMessageBodyによって送信ができているかが確認できます。

handleDiscovery(event, context)

Alexa にスマートホーム家電を見つけてなどと指示をしたときの処理をする部分です。これで「電気をつけて」などに対応できることを Alexa に伝えられます。対応する部分は別に実装する必要があります。

smartHomeSkill.py
# Alexa の Discovery request に対応するためのコード
# ここでこのスキルで使えるデバイスの内容を全て返してあげる。
def handleDiscovery(event):
    payload = {
        "endpoints": [
            {
                # 電気
                "endpointId": RIGHT_00["endpointId"],
                "manufacturerName": COMPANY_NAME,
                "friendlyName": RIGHT_00["friendlyName"],
                "description": "This is smart device",
                "displayCategories": [RIGHT_00["displayCategories"]],
                "capabilities": [{
                    "type": "AlexaInterface",
                    "interface": "Alexa",
                    "version": "3"
                },
                    {
                        "interface": "Alexa.PowerController",
                        "version": "3",
                        "type": "AlexaInterface",
                        "properties": {
                            "supported": [{
                                "name": "powerState"
                            }],
                            "retrievable": True
                        }
                    }
                ]
            },
            {
                # TV
                "endpointId": TV_00["endpointId"],
                "manufacturerName": COMPANY_NAME,
                "friendlyName": TV_00["friendlyName"],
                "description": "This is smart device",
                "displayCategories": [TV_00["displayCategories"]],
                "capabilities": [{
                    "type": "AlexaInterface",
                    "interface": "Alexa",
                    "version": "3"
                },
                    {
                        "interface": "Alexa.PowerController",
                        "version": "3",
                        "type": "AlexaInterface",
                        "properties": {
                            "supported": [{
                                "name": "powerState"
                            }],
                            "retrievable": True
                        }
                    }
                ]
            },
            {
                # エアコン
                "endpointId": THERMOSTAT_00["endpointId"],
                "manufacturerName": COMPANY_NAME,
                "friendlyName": THERMOSTAT_00["friendlyName"],
                "description": "This is smart device",
                "displayCategories": [THERMOSTAT_00["displayCategories"]],
                "capabilities": [{
                    "type": "AlexaInterface",
                    "interface": "Alexa",
                    "version": "3"
                },
                    {
                        "interface": "Alexa.PowerController",
                        "version": "3",
                        "type": "AlexaInterface",
                        "properties": {
                            "supported": [{
                                "name": "powerState"
                            }],
                            "retrievable": True
                        }
                    }
                ]
            }
        ]
    }
    header = event['directive']['header']
    header['name'] = "Discover.Response"
    logging("DEBUG", 'Discovery Response: ', ({'header': header, 'payload': payload}))
    return {
        'event': {'header': header, 'payload': payload}
    }

所感

Alexa スキルが Lambda で実装されていることをしり、面白そうだなと Alexa 予約してスキルを作っています。JSONでの送受信フォーマットが違うと Alexa は毎回「XXの応答がありません」としか返してくれないのでデバックがやりにくいです。またいざ自分で作ってみると日本語に対応していない API が多いようでがっかりしています。早く対応してほしい。これからさかんになる IoT の実装を体験できたのは良い勉強だったのではないでしょうか。

まとめ

今回は Lambda から SQS の送受信を説明しました。また Alexa に Discovery 部分も軽く紹介しました。今度はラズパイでの家電を実際に動かす実装を紹介します。

続きを読む

scrapy でクローラーを実装し、画像を収集してみる

AWS Rekognition を使う時にクローラーも使ってなんかできないかなと思い scrapy を利用してみました。とりあえず今回はドメインと画像収集のところまで。いかがわしいことには絶対利用しないでください

今回はスタートのページからどんどんリンクを辿り、ドメイン名のフォルダごとに、辿った時のページの画像を保存します。今度そのフォルダごとに画像を AWS Rekognition に投げて、そのドメインがどんなドメインなのかを画像から判別しようと考えています。

前提

  • scrapy 1.5.0
  • python3
  • scrapy インストール済み

参考サイト

Spider のコード

クローラーの肝となる部分です。参考サイトではCrawlSpiderクラスを継承して利用している場合が多かったです。そっちの方が大抵の場合は楽だと思います。

WebSpider.py
# -*- coding: utf-8 -*-
  import scrapy
  from tutorial.items import TutorialItem
  import re
  from scrapy.exceptions import NotSupported
  from urllib.parse import urlparse


  class WebSpider(scrapy.Spider):
      name = 'web'
      # 見つけたドメインを入れる
      tracked_domains = []
      # 全てを対象
      allowed_domains = []
      # 最初に見に行くサイト
      start_urls = ['http://XXXXXXXXXXXXX']

      # response を毎回処理する関数
      def parse(self, response):
          try:
              # データ処理
              # この関数内の処理が終わると続きを実行する
              # dataPipeline を利用した場合もここに戻って来る
              yield self.parse_items(response)

              # リンクを辿る
              for link in response.xpath('//@href').extract():
                  if re.match(r"^https?://", link):
                      yield scrapy.Request(link, callback=self.parse)
          except NotSupported:
              # GET のレスポンスが txt じゃなかった場合
              # Spiders の logging 機能が用意されているのでそれを利用
              self.logger.info("Raise NotSupported")

      # ドメインごとにページに表示された画像を保存する
      def parse_items(self, response):
          # domain の抽出
          url = response.url
          parsed_url = urlparse(url)
          domain = parsed_url.netloc

          # 同じ Domain は一回しかチェックしない
          if domain in self.tracked_domains:
              return

          self.tracked_domains.append(domain)

          item = TutorialItem()
          item['domain'] = domain

          # title の抽出
          title = response.xpath(r'//title/text()').extract()
          if len(title) > 0:
              item['title'] = title[0]
          else:
              item['title'] = None

          # 画像 URL をセット
          item["image_urls"] = []
          for image_url in response.xpath("//img/@src").extract():
              if "http" not in image_url:
                  item["image_urls"].append(response.url.rsplit("/", 1)[0]
                         + "/" + image_url)
              else:
                  item["image_urls"].append(image_url)

          # item を返すと datapipeline に渡される
          return item

start_urlsに設定したURLを元にクローラーが動きます。
私はそんなことはしませんが、ここにいかがわしいサイトを指定するとリンクを辿って新しいいかがわしいサイトのドメインが見つかるかもしれません。私はそんなことしませんが。

基本的にはparse()関数が scrapy のレスポンスごとに呼ばれて処理を行います。
今回は途中でparse_items()を呼び出し、まだ保存していないドメインであればフォルダを作成してそのページ上の画像を保存します。

settings.py で scrapy の設定を記述

parse_items()itemを return すると、ImagePipeline に渡されます。
その設定は以下の通りです。自作 Pipeline の説明は後述。

settings.py
  # 自作 pipeline に繋げる
  ITEM_PIPELINES = {'tutorial.pipelines.MyImagesPipeline': 1}
  # データの保存場所
  IMAGES_STORE = '/Users/paper2/scrapy/tutorial/imgs'
  # リンクを辿る深さを指定
  DEPTH_LIMIT = 5
  # LOG_LEVEL = 'ERROR'
  DOWNLOAD_DELAY = 3

pipelines.py で pipeline をカスタマイズ

デフォルトの ImagePipeline ですとドメインごとのフォルダを作成して、、、などといった追加の作業ができないので継承して自分で作成します。

pipeliens.py
# -*- coding: utf-8 -*-
from scrapy.pipelines.images import ImagesPipeline
import os
from tutorial import settings
import shutil


class MyImagesPipeline(ImagesPipeline):
    def item_completed(self, results, item, info):
        # DL できたファイルのパス
        file_paths = [x['path'] for ok, x in results if ok]

        # ドメインごとのフォルダに move
        for file_path in file_paths:
            img_home = settings.IMAGES_STORE
            full_path = img_home + "/" + file_path
            domain_home = img_home + "/" + item['domain']

            os.makedirs(domain_home, exist_ok=True)
            # DL した結果同じファイルのことがある
            if os.path.exists(domain_home + '/' + os.path.basename(full_path)):
                continue
            shutil.move(full_path, domain_home)

        # parse() の続きに戻る
        return item

これで完成です。

実際に回してみる

しばらく回すと色々なドメインから画像が集まりました。
Screen Shot 2018-01-20 at 20.07.47.png

うん?よくみたらいかがわしいサイトのドメインが混ざっているぞ、、、画像もいかがわしいものが、、、、ということで次回はこのいかがわしいドメインを取り除く (逆にそれだけ残す??)のを AWS Rekognition でやってみようと思います。

続きを読む

Alexa Home Skill では Payload Version v3 を利用する

Alexa Home Skill を使ってみようとした時に詰まったのでメモ.

Payload Version v2 のサイトは古い

いくつか説明サイトがあるがレスポンスの
Payload Versionv2にしているものは古いので公式説明ページを参照することをおすすめします.

現在(2018/01/12)は,Lambdaに用意されているalexa-smart-home-skill-adapterテンプレートも古いままで,これをそのまま使うと versionがv2なので Alexa スキルのデフォルト設定の v3 にしているとデバイスを見つけてくれないので注意が必要.公式説明ページにLambdaの書き換えるコードも書いてあります.

追記(実際にアレクサでスキルを試す場合)

Alexa Skill を作ってみる分には公式説明ページで十分でしたが,実際にコードをアレクサで試す場合には公式ページではレスポンスをうまく返せていないのでAlexaは「○○は応答していません」とエラーを返します.そこでLambdaの関数を書き換えました.基本は公式ページのコードを引用しています.

index.js
exports.handler = function(request, context) {
    if (request.directive.header.namespace === 'Alexa.Discovery' && request.directive.header.name === 'Discover') {
        log("DEGUG:", "Discover request", JSON.stringify(request));
        handleDiscovery(request, context, "");
    }
    else if (request.directive.header.namespace === 'Alexa.PowerController') {
        if (request.directive.header.name === 'TurnOn' || request.directive.header.name === 'TurnOff') {
            log("DEBUG:", "TurnOn or TurnOff Request", JSON.stringify(request));
            handlePowerControl(request, context);
        }
    }

    function handleDiscovery(request, context) {
        var payload = {
            "endpoints": [{
                "endpointId": "demo_id",
                "manufacturerName": "Smart Device Company",
                "friendlyName": "電気",
                "description": "smart switch",
                "displayCategories": ["SWITCH"],
                "cookie": {
                    "key1": "arbitrary key/value pairs for skill to reference this endpoint.",
                    "key2": "There can be multiple entries",
                    "key3": "but they should only be used for reference purposes.",
                    "key4": "This is not a suitable place to maintain current endpoint state."
                },
                "capabilities": [{
                        "type": "AlexaInterface",
                        "interface": "Alexa",
                        "version": "3"
                    },
                    {
                        "interface": "Alexa.PowerController",
                        "version": "3",
                        "type": "AlexaInterface",
                        "properties": {
                            "supported": [{
                                "name": "powerState"
                            }],
                            "retrievable": true
                        }
                    }
                ]
            }]
        };
        var header = request.directive.header;
        header.name = "Discover.Response";
        log("DEBUG", "Discovery Response: ", JSON.stringify({ header: header, payload: payload }));
        context.succeed({ event: { header: header, payload: payload } });
    }

    function log(message, message1, message2) {
        console.log(message + message1 + message2);
    }

    function handlePowerControl(request, context) {
        // get device ID passed in during discovery
        var requestMethod = request.directive.header.name;
        // get user token pass in request
        var requestToken = request.directive.endpoint.scope.token;
        var powerResult;

        if (requestMethod === "TurnOn") {

            // Make the call to your device cloud for control 
            // powerResult = stubControlFunctionToYourCloud(endpointId, token, request);
            powerResult = "ON";
        }
        else if (requestMethod === "TurnOff") {
            // Make the call to your device cloud for control and check for success 
            // powerResult = stubControlFunctionToYourCloud(endpointId, token, request);
            powerResult = "OFF";
        }

        var response = {
            "context": {
                "properties": [{
                    "namespace": "Alexa.PowerController",
                    "name": "powerState",
                    "value": powerResult,
                    "timeOfSample": "2017-02-03T16:20:50.52Z",
                    "uncertaintyInMilliseconds": 500
                }]
            },
            "event": {
                "header": {
                    "namespace": "Alexa",
                    "name": "Response",
                    "payloadVersion": "3",
                    "messageId": "5f8a426e-01e4-4cc9-8b79-65f8bd0fd8a4",
                    "correlationToken": "dFMb0z+PgpgdDmluhJ1LddFvSqZ/jCc8ptlAKulUj90jSqg=="
                },
                "payload": {}
            }
        };


        log("DEBUG", "Alexa.PowerController ", JSON.stringify(response));
        context.succeed(response);
    }
};

これで「電気をつけて」とAlexaにお願いすると「はい」と返答してくれます.

続きを読む

Raspberry pi model b+ では AWS greengrass できなかった

ずっと眠っていたラズパイを活用しようとAWS Greengrass利用しようとしたら条件満たせず使えませんでした.
(できるかわからないけど)Alexa スキルの Lambda をラズパイで実行するつもりでしたが作戦を変える必要がありそうです.

CPUアーキテクチャ条件

よし,使うぞ!とアーキテクチャ調べたら即条件を満たせず.GreengrassがサポートしているアーキテクチャはARMv7lだそうです.

pi@raspberrypi:~ $ arch
armv6l

続きを読む

AWS SDK で SQS を Python で操作する

今回はPythonのAWS SDKでSQSメッセージの送受信を行います.

前提

  • aws cli の configure を済ませている

ソースコード

設定はconfigparserモジュールを使用します.

config.ini
[sqs]
url : https://sqs.ap-northeast-1.amazonaws.com/YYYYYYY/XXX

それではソースコードです.

sqs.py
# -*- coding: utf-8 -*-

import configparser
import boto3
import json

# configファイルの読み込み
ini = configparser.SafeConfigParser()
ini.read("./config.ini")

sqs = boto3.client('sqs')
url = ini.get("sqs", "url")

# 送信するJSON
body = {"type": "Right", "Action": "On"}

# SQSへJSONの送信
response = sqs.send_message(
    QueueUrl=url,
    DelaySeconds=0,
    MessageBody=(
        json.dumps(body)
    )
)

# SQSからJSONを受信
response = sqs.receive_message(
    QueueUrl=url,
    AttributeNames=[
        'SentTimestamp'
    ],
    MaxNumberOfMessages=1,
    VisibilityTimeout=0,
    WaitTimeSeconds=0
)

message = response['Messages'][0]
body = json.loads(message['Body'])
print(body)

# メッセージを削除するための情報を取得
receipt_handle = message['ReceiptHandle']

# メッセージを削除
sqs.delete_message(
    QueueUrl=url,
    ReceiptHandle=receipt_handle
)

実行します.

$python3 sqs.py
{'Action': 'On', 'type': 'Right'}

無事JSONを受信できました.
Alexaを使って遊ぼうと考えているので今日はその準備にSQSを使用してみました.

参考文献

続きを読む