Aws lambda pymysql

AWS Lambda function example. Glue import sys import logging import rds_config import pymysql rds_host = “rds-instance” db_name = rds_config. logger. db_name = os. try: conn = pymysql. pymysql (mysql connector for Python) 2017年4月25日 一行で説明Python で書かれた Lambda function から RDS … 続きを読む

scrapy でクローラーを実装し、画像を収集してみる

AWS Rekognition を使う時にクローラーも使ってなんかできないかなと思い scrapy を利用してみました。とりあえず今回はドメインと画像収集のところまで。いかがわしいことには絶対利用しないでください

今回はスタートのページからどんどんリンクを辿り、ドメイン名のフォルダごとに、辿った時のページの画像を保存します。今度そのフォルダごとに画像を AWS Rekognition に投げて、そのドメインがどんなドメインなのかを画像から判別しようと考えています。

前提

  • scrapy 1.5.0
  • python3
  • scrapy インストール済み

参考サイト

Spider のコード

クローラーの肝となる部分です。参考サイトではCrawlSpiderクラスを継承して利用している場合が多かったです。そっちの方が大抵の場合は楽だと思います。

WebSpider.py
# -*- coding: utf-8 -*-
  import scrapy
  from tutorial.items import TutorialItem
  import re
  from scrapy.exceptions import NotSupported
  from urllib.parse import urlparse


  class WebSpider(scrapy.Spider):
      name = 'web'
      # 見つけたドメインを入れる
      tracked_domains = []
      # 全てを対象
      allowed_domains = []
      # 最初に見に行くサイト
      start_urls = ['http://XXXXXXXXXXXXX']

      # response を毎回処理する関数
      def parse(self, response):
          try:
              # データ処理
              # この関数内の処理が終わると続きを実行する
              # dataPipeline を利用した場合もここに戻って来る
              yield self.parse_items(response)

              # リンクを辿る
              for link in response.xpath('//@href').extract():
                  if re.match(r"^https?://", link):
                      yield scrapy.Request(link, callback=self.parse)
          except NotSupported:
              # GET のレスポンスが txt じゃなかった場合
              # Spiders の logging 機能が用意されているのでそれを利用
              self.logger.info("Raise NotSupported")

      # ドメインごとにページに表示された画像を保存する
      def parse_items(self, response):
          # domain の抽出
          url = response.url
          parsed_url = urlparse(url)
          domain = parsed_url.netloc

          # 同じ Domain は一回しかチェックしない
          if domain in self.tracked_domains:
              return

          self.tracked_domains.append(domain)

          item = TutorialItem()
          item['domain'] = domain

          # title の抽出
          title = response.xpath(r'//title/text()').extract()
          if len(title) > 0:
              item['title'] = title[0]
          else:
              item['title'] = None

          # 画像 URL をセット
          item["image_urls"] = []
          for image_url in response.xpath("//img/@src").extract():
              if "http" not in image_url:
                  item["image_urls"].append(response.url.rsplit("/", 1)[0]
                         + "/" + image_url)
              else:
                  item["image_urls"].append(image_url)

          # item を返すと datapipeline に渡される
          return item

start_urlsに設定したURLを元にクローラーが動きます。
私はそんなことはしませんが、ここにいかがわしいサイトを指定するとリンクを辿って新しいいかがわしいサイトのドメインが見つかるかもしれません。私はそんなことしませんが。

基本的にはparse()関数が scrapy のレスポンスごとに呼ばれて処理を行います。
今回は途中でparse_items()を呼び出し、まだ保存していないドメインであればフォルダを作成してそのページ上の画像を保存します。

settings.py で scrapy の設定を記述

parse_items()itemを return すると、ImagePipeline に渡されます。
その設定は以下の通りです。自作 Pipeline の説明は後述。

settings.py
  # 自作 pipeline に繋げる
  ITEM_PIPELINES = {'tutorial.pipelines.MyImagesPipeline': 1}
  # データの保存場所
  IMAGES_STORE = '/Users/paper2/scrapy/tutorial/imgs'
  # リンクを辿る深さを指定
  DEPTH_LIMIT = 5
  # LOG_LEVEL = 'ERROR'
  DOWNLOAD_DELAY = 3

pipelines.py で pipeline をカスタマイズ

デフォルトの ImagePipeline ですとドメインごとのフォルダを作成して、、、などといった追加の作業ができないので継承して自分で作成します。

pipeliens.py
# -*- coding: utf-8 -*-
from scrapy.pipelines.images import ImagesPipeline
import os
from tutorial import settings
import shutil


class MyImagesPipeline(ImagesPipeline):
    def item_completed(self, results, item, info):
        # DL できたファイルのパス
        file_paths = [x['path'] for ok, x in results if ok]

        # ドメインごとのフォルダに move
        for file_path in file_paths:
            img_home = settings.IMAGES_STORE
            full_path = img_home + "/" + file_path
            domain_home = img_home + "/" + item['domain']

            os.makedirs(domain_home, exist_ok=True)
            # DL した結果同じファイルのことがある
            if os.path.exists(domain_home + '/' + os.path.basename(full_path)):
                continue
            shutil.move(full_path, domain_home)

        # parse() の続きに戻る
        return item

これで完成です。

実際に回してみる

しばらく回すと色々なドメインから画像が集まりました。
Screen Shot 2018-01-20 at 20.07.47.png

うん?よくみたらいかがわしいサイトのドメインが混ざっているぞ、、、画像もいかがわしいものが、、、、ということで次回はこのいかがわしいドメインを取り除く (逆にそれだけ残す??)のを AWS Rekognition でやってみようと思います。

続きを読む

Cronから実行するEC2スナップショットスクリプト

実行条件

  • awscliがインストールされている
  • IAMロールなどでEC2周りの権限を解放しておく
#!/bin/sh

# 取得したインスタンスのidを並べる
INSTANCE_ID=(i-xxxxx1 i-xxxxx2 i-xxxxx3)

SHELLDIR=`dirname ${0}`
SHELLDIR=`cd ${SHELLDIR}; pwd`
SHELLNAME=`basename $0`

LOG_DIR="/var/log"
LOG_SAVE_PERIOD=14
LOG_FILE="${LOG_DIR}/${SHELLNAME}.log"
echo $LOG_FILE

REGION=ca-central-1
SNAPSHOTS_PERIOD=2

AWS="/usr/bin/aws --region ${REGION}"


rotate_log() {
    (( cnt=${LOG_SAVE_PERIOD} ))
    while (( cnt > 0 ))
    do
        logfile1=${LOG_FILE}.$cnt
        (( cnt=cnt-1 ))
        logfile2=${LOG_FILE}.$cnt
        if [ -f $logfile2 ]; then
            mv $logfile2 $logfile1
        fi
    done

    if [ -f $LOG_FILE ]; then
        mv ${LOG_FILE} ${LOG_FILE}.1
    fi
    touch $LOG_FILE
}

print_msg() {
    echo "`date '+%Y/%m/%d %H:%M:%S'` $1" | tee -a ${LOG_FILE}
}

create_snapshot() {
    for ID in `echo $@`
    do
        print_msg "Create snapshot Start"
        VOL_ID=`${AWS} ec2 describe-instances --instance-ids ${ID} --output text | grep EBS | awk '{print $5}'`
        if [ -z ${VOL_ID} ] ; then
            echo ${VOL_ID}
            print_msg "ERR:ec2-describe-instances"
            logger -f ${LOG_FILE}
            exit 1
        fi
        print_msg "ec2-describe-instances Success : ${VOL_ID}"
        ${AWS} ec2 create-snapshot --volume-id ${VOL_ID} --description "Created by SYSTEMBK(${ID}) from ${VOL_ID}" >> ${LOG_FILE} 2>&1
        if [ $? != 0 ] ; then
            print_msg "ERR:${SHELLDIR}/${SHELLNAME} ec2-create-snapshot"
            logger -f ${LOG_FILE}
            exit 1
        fi
        print_msg "Create snapshot End"
    done
}

delete_old_snapshot() {
    for ID in `echo $@`
    do
        VOL_ID=`${AWS} ec2 describe-instances --instance-ids ${ID} --output text | grep EBS | awk '{print $5}'`
        print_msg "Delete old snapshot Start"
        SNAPSHOTS=`${AWS} ec2 describe-snapshots --output text | grep ${VOL_ID} | grep "Created by SYSTEMBK" | wc -l`
        while [ ${SNAPSHOTS} -gt ${SNAPSHOTS_PERIOD} ]
        do
            ${AWS} ec2 delete-snapshot --snapshot-id `${AWS} ec2 describe-snapshots --output text | grep ${VOL_ID} | grep "Created by SYSTEMBK" | sort -k 11,11 | awk 'NR==1 {print $10}'` >> ${LOG_FILE} 2>&1
            if [ $? != 0 ] ; then
                print_msg "ERR:${SHELLDIR}/${SHELLNAME} ec2-delete-snapshot"
                logger -f ${LOG_FILE}
                exit 1
            fi
            SNAPSHOTS=`${AWS} ec2 describe-snapshots | grep ${VOL_ID} | grep "Created by SYSTEMBK" | wc -l`
        done
        print_msg "Delete old snapshot End"
    done
}

rotate_log

print_msg "INF:$SHELLDIR/${SHELLNAME} START"
create_snapshot ${INSTANCE_ID[@]}
delete_old_snapshot ${INSTANCE_ID[@]}
print_msg "INF:$SHELLDIR/${SHELLNAME} END"

exit 0

続きを読む

Spring Boot for java で設定ファイル( application.properties )の値を参照

Autowired; import org.springframework.beans.factory.annotation.Value; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Controller public class TeiFind { private static final Logger logger = LoggerFactory.getLogger(TeiFind.class); @Value(“${aws.s3.region}”) //// ココ private String awsS3Region; … 続きを読む

AWS codepipelineで手動承認が複数発生した場合

TL;DR

  • AWS codepipelineで承認ボタンを押しても、ボタンが残るケースがある
  • 自分の場合、Lambdaでcodepipelineをよしなにする処理を追加
  • 他にいい方法ご存知の方は、コメントいただけると嬉しいです

背景

以下のような形で承認付きのAWS+Docker+BlueGreenDeploymentを構築していました。

blue-green sample.png

図のざっくりな流れとしては、

1.githubでmasterのマージ

2.AWS Codebuild経由で最新dockerイメージをGreen系にデプロイ

3.承認者へslackで通知し、AWSコンソールより承認ボタンpush

4.承認ボタンpush後、ELBのbluegreen切り替え

という感じです。

参考リンク:
https://aws.amazon.com/jp/blogs/news/bluegreen-deployments-with-amazon-ecs/
https://dev.classmethod.jp/cloud/aws/codepipeline-approval-action/

こちらをテスト運用してみたところ、

・問題なく承認してリリースした場合
・codepipelineで一度否認してから再度マージする場合
であれば問題ないんですが、

・否認するまえに再度マージしたpipeline処理が追いついてしまった
場合、codepipelineの承認ボタンを一度押してもまた残ってしまうケースがありました。

スクリーンショット 2017-12-20 14.09.53.jpg
  ※押しても承認ボタンが復活してしまう。否認しても再試行ボタンは残ってしまう
  もちろん承認ボタンを押してしまうと、以降のpipeline処理は動いてしまう。

対応

残った承認ボタンによる不要なpipelineの実行を防ぎたかったので、
pipeline実行時に
承認アクションの状態を確認→承認待ちなら自動的に否認
という処理をLambda(python,boto3)でなんとかしています。
以下参考ですがコードです。

import boto3
import logging
import time

logger = logging.getLogger()
logger.setLevel(logging.INFO)

elb = boto3.client('elbv2')
ecs = boto3.client('ecs')
asg = boto3.client('autoscaling')
cf = boto3.resource('cloudformation')
code_pipeline = boto3.client('codepipeline')

def reject_approve(pipeline):
    # get pipeline state
    response = code_pipeline.get_pipeline_state(
        name = pipeline
    )
    target_stage_pos = response['stageStates'][xxx] //xxx にはApprovalステージの場所
    target_action_json = target_stage_pos['actionStates'][0]
    print target_action_json
    print "-----------"

    if 'latestExecution' not in target_action_json:
        print "no execution history in this stage."
        return

    target_stage_name = target_stage_pos['stageName']
    print "Check stage name."
    print target_stage_name
    print "-----------"

    target_action_name = target_stage_pos['actionStates'][0]['actionName']
    print "Check action name."
    print target_action_name

    print "-----------"
    target_action_status = target_action_json['latestExecution']['status']
    print "Check status."
    print target_action_status

    if target_action_status == 'InProgress':
        print "-----------"
        target_action_token = target_action_json['latestExecution']['token']
        print "Get token."
        print target_action_token

    print "-----------"
    if target_stage_name == 'YOUR_APPROVAL_STAGE_NAME' and target_action_name == 'Approve' and target_action_status == 'InProgress':
        print "target check OK."
        print "-----------"
        print "Reject target approval."
        reject_response = code_pipeline.put_approval_result(
            pipelineName = pipeline,
            stageName = target_stage_name,
            actionName = target_action_name,
            result={
                'summary': target_action_status,
                'status': 'Rejected'
            },
            token = target_action_token
        )        
        print reject_response
    else:
        print "No action to reject."

def lambda_handler(event, context):
    try:
        # consts
        pipeline_name = 'YOUR_PIPELINE_NAME'
        job_id = event['CodePipeline.job']['id']
        reject_approve(pipeline_name)
        code_pipeline.put_job_success_result(jobId=job_id)

    except Exception as e:
        logger.error(e)
        raise e  

まとめ

pipelineでの手動承認やslack通知についての記事は調べればいくつか出てきますが、
案外こういう細かいケースでの対応はまだ無かったので投稿してみました。

他にいいやり方などありましたら、コメントいただけると幸いです。

続きを読む

Windowsで AWS IoTを体験してみる

「AWS IoTってどんなの?」を体験するため 日本語の AWSマネジメントコンソールから「デバイスの設定」ウィザードを利用して動作を確認します。

  • Windows10で行っております。
  • 予め Pythonと gitをインストールしておいてください。
  • ポート8883で外部インターネットに接続できる必要があります。
  • こちらの手順を行ったとしても「AWS IoTが動いた」以外の事は何一つわかりません

1. AWS IoT選択

AWS マネジメントコンソールから「AWS IoT」を選択。
構築するリージョンを右上で選択し「今すぐ始める」を選択します。

aws1.jpg

2. 「AWS IoTコンソールにようこそ。」

左のメニューから「オンボード」を選択します。

aws2.jpg

3. 「AWS IoTに接続する」その1

「デバイスの設定」の「今すぐ始める」を選択します。

aws3.jpg

4. 「AWS IoTに接続する」その2

作業の内容が表示されます。
査読後に「今すぐ始める」を選択します。

aws4.jpg

5. 「AWS IoTにどのように接続していますか?」

プラットフォームと、AWS IoTデバイスSDKの選択をします。
今回は Windowsと Pythonを選択します。

選択後「留意が必要な前提条件」が表示さるので確認し「次へ」を選択します。

aws5.jpg

6. 「モノの登録」 ステップ 1/3

名前に任意の名称を記入し、「次のステップ」を選択します。

aws6.jpg

7. 「接続キットのダウンロード」 ステップ 2/3

表示内容を確認し「Windows」を選択。
必要なファイル一式が、 connect_device_package.zipとしてダウンロードされます。
ダウンロードを確認したら「次のステップ」を選択します。

aws7.jpg

8. 「デバイスの設定とテスト」 ステップ 3/3

ダウンロードしたファイルを使用したテストの方法が表示されます。
ブラウザは この状態のままとし、「完了」は選択しないでください。

aws8.jpg

9. デバイスで接続キットを解凍する

ここから Windowsで操作を行います。
任意のフォルダでダウンロードした connect_device_package.zipを展開します。
こちらの例では PowerShell で Expand-Archiveを利用しました

aws9.jpg

10. 実行権限を追加する

「デバイスの設定とテスト」の説明通りにコマンドを入力します。
実行ポリシーの変更の確認を求められますので「Y」を入力します。

aws10.jpg

12. デバイスから AWS IoTを使用してデータの送信を行います

.start.ps1 を実行します。
SDK等はスクリプトが gitで取得します。
しばらくするとAWS マネジメントコンソールの画面にデバイスから送られたメッセージが表示されます。
※なんかこの画面 “sequence”が 0ばっかりですが…

aws11.jpg

13. デバイスにメッセージを送信する

ブラウザの「ステップ4:デバイスにメッセージを送信する」に文字列を入力し「メッセージ」を選択すると、

aws12_1.jpg

入力文字列が Windows側に表示されます。
※そのままですとデバッグ表示が多い為、見づらいですが….

aws12_2.jpg

おまけ

  • basicPubSub.py は「sdk/test/Python」トピックを使用しています。

  • .star.ps1 で実行されるスクリプトは、 aws-iot-device-sdk-pythonsamplesbasicPubSubbasicPubSub.py ファイルです。
    basicPubSub.pyの 74行目のデバック出力を logging.DEBUGから、logging.INFOにする事で表示が抑えられます。

# logger.setLevel(logging.DEBUG)
logger.setLevel(logging.INFO)

omake1.jpg

続きを読む

わざわざバッチサーバを用意してバッチ処理する時代を終わらせたい

この記事は リクルートライフスタイル Advent Calendar 2017 18日目の記事です。

はじめに

リクルートライフスタイルでレストランボードの開発を担当しております門脇と申します :bow:
今回はバックエンド側のお話をさせて頂こうと思います。

つい最近、どうしてもバッチ処理をおこなわないといけない場面にぶち当たってしまい、渋々作ることにしました :thinking:
自分の中では、「バッチ処理がこけた」とか「バッチサーバが死んでた」とか、社内外問わずそんなことを耳にする機会がよくあったように思うので、バッチ処理にいいイメージを持っていませんでした。
自分はバッチ処理を書いたことがないので、どういった構成にすればいいのかを自分なりに考えてみたところ、以下のような思考に辿り着きました。

  • アプリケーション(API)が作動しているサーバでは動かさない
    処理によって負荷が上がりサービス影響が出る可能性があるため
  • じゃあ別のサーバ(バッチサーバ)をたてて動かそう
    環境構築とか監視が面倒な気がする
  • データの更新対象はRDS上にあるし、Lambdaでなんとかなるんじゃないか?
    一旦やってみよう

ということで、試しにLambdaからRDS上にあるデータの更新処理をやってみることにしました。
わりと上手くいった気がするので、その共有を書いていきます。

サンプルケース

今回はサンプルケースとして以下のような処理を作成します。
(あまり良いのが思いつかなかったので、ケースとしてはちょっと微妙かもですがw)

日本全国(都道府県)の天気を5分毎に取得・更新する

  • OpenWeatherMapの Weather API を利用する(アクセス制限はあるものの無料利用可能)
  • 都道府県の位置情報は こちら を参考にさせて頂く
  • ニーズ:無料で自サービスに日本全国の天気を一覧・マップ等で表示したい

前提条件

  • RDSがVPC内にあるため、LambdaもVPC内にアクセス可能にする
    → RDSじゃなくとも、もし外部環境にDBがあるならホストの指定先を変えるだけでOKです(IP制限はしっかりしましょう)
  • Lambdaがインターネットへ接続されること

言語

  • Python(3.6.3)

※ async/await周りで実用的な処理も書いたので、バッチに興味が無くPythonに興味がある方にも参考になればと思います!
※ ただし、Python歴はほぼ皆無なので「こういう書き方があるよ」というのがあればコメント等フランクにして頂けますと幸いです!

DB

  • MySQL(5.7)

ソースコード

https://github.com/gates1de/weather-batch

ディレクトリ構成

├── /modules
├── README.md
├── http_client.py
├── logger.py
├── main.py
├── mysql.py
├── slack.py
├── util.py
└── weather_type.py

準備

VPC内のRDSに接続&インターネットへ接続する

こちらの記事が参考になりました。
手順だけさっくりと記述しておきます。

① VPCを作成する

いきなりですがVPCは作成しません :rolling_eyes:
というのも、上記の記事ではVPC作成からの手順を記載してくださっているのでそちらを参考にして頂ければと思います。
今回は誰でも試しやすいように、Default VPCでできるようにしていきます。
Default VPCは既に作成されているはずなのでこの手順はスキップします。

② インターネットゲートウェイを作成する

こちらも既に作成されているはずなのでスキップします。

③ サブネットを作成する

ここから少し作業します。
東京リージョンでAWSを使い始めたなら、既に ap-northeast-1aap-northeast-1c のPublicなサブネットが作成されているかと思います。
可用性を少しでも担保するなら、ここでPrivateなサブネットを2つ作成すべきですが、とりあえず1つだけ作成します。
例としては以下のとおりです。

  • Default VPCのCIDR: 172.31.0.0/16
  • Default ap-northeast-1aのpublicサブネット: 172.31.0.0/20
  • Default ap-northeast-1cのpublicサブネット: 172.31.16.0/20
  • この例の時に作成するprivateサブネット: 172.31.192.0/24 (VPCの範囲内かつ、上記2サブネットと範囲がかぶらなければOK)

④ NATゲートウェイを作成する

続いて、NATゲートウェイのページへ遷移し、作成ボタンを押下します。
以下のようなNATゲートウェイを作成します。

  • サブネット: Default ap-northeast-1aのpublicサブネット
  • EIP: 作成するもよし、既にあるものでもよし

これでインターネットに繋がるNATゲートウェイが作成されます。

⑤ ルートテーブルを作成する

最後に、ルートテーブルのページへ遷移し、作成ボタンを押下します。
名前タグの設定とDefault VPCを選択するのみで、特に設定はないですが作成後に以下の設定をおこないます。

  • 「ルート」の編集: 送信先 = 0.0.0.0/0, ターゲット = 先程作成したNATゲートウェイ
  • 「サブネットの関連付け」の編集: 先ほど作成したprivateサブネットを設定

⑥ VPCに対するアクセス権限を持ったIAMロールの作成

簡単に手順だけ記載します。

  1. IAMの「ロール」ページへ遷移し、作成ボタンを押下
  2. ロールを使用するサービス: Lambda
  3. ポリシー: AmazonRDSFullAccess, AWSLambdaVPCAccessExecutionRole
  4. 確認して作成

⑦ Lambdaを作成し、VPC・サブネット・セキュティグループを設定する

後はもう流れに沿ってLambda関数を作成するのみです。
Lambdaのページへ遷移し、関数の作成ボタンを押下して、⑥で作成したロールを選択します。
この記事ではPython3.6をランタイムとして使用しますが、お好きな言語で試してもらうのも良いかと思います。


だいぶ簡素ですがこれで一旦完了となります :thumbsup:

Lambdaでサードパーティ製のライブラリを利用する

AWSでの作業から離れまして、次は関数内で利用するライブラリの話です。
当たり前ではありますが、pip を利用してライブラリをインストールする作業はLambda上ではできません。
しかしながら、ライブラリのソースをLambdaにアップロードしてしまえば同じように利用可能です。
以下のようにして、必要なライブラリを任意のディレクトリ(関数となるファイルと同ディレクトリ内)に持ってきます。

$ pip install pymysql -t path/to/modules
$ pip install aiohttp -t path/to/modules
$ pip install slackclient -t path/to/modules

パスは main.py に以下を記載しておけば通ります。

import sys
sys.path.append('./modules')

楽すぎてビビりました :sweat_smile:

Lambdaにパッケージをアップロードする

上記ソースコードは、ひとまとまりのパッケージとしてzipで圧縮すればLambdaにアップロードすることが可能です。
注意するポイントとしては、ディレクトリを圧縮するのではなく、全ファイルを圧縮することです。

$ cd path/to/weather-batch
$ zip -r weather-batch.zip *

この weather-batch.zip を作成したLambda関数の設定からアップロードして完了です。

処理全容

さて、ようやく関数の処理について説明していきます。
大きくポイントを分けて以下に記載します。

その前に、前提としてLambdaの設定で環境変数をいくつか作成したので、 os.environ['hoge'] となっている部分はその環境変数の値を取得しています。

外部APIを並列で叩く

今回のケースで言うと、そこまで速度は必要のないバッチ処理ですが、ベストエフォートで処理を終わらせることを想定した作りにします。
ソースコードは以下のとおりです。

http_client.py
import aiohttp
import asyncio
import os

async def weather_api_request(id, lat, lon):
    # 環境変数から OpenWeatherMap のAppID(APIKey)を取得する
    app_id = os.environ['OPEN_WEATHER_MAP_APP_ID']

    # APIのURL指定(緯度・経度・AppIDをパラメータとして送付)
    url = 'http://api.openweathermap.org/data/2.5/weather?lat={}&lon={}&appid={}'.format(lat, lon, app_id)

    # 最大同時リクエスト数の指定
    semaphore = asyncio.Semaphore(5)
    async with semaphore:
        # セッション作成
        async with aiohttp.ClientSession() as session:

            # APIリクエスト
            async with session.get(url) as response:

                # レスポンスが返ってきたら、更新対象の weather_id と APIレスポンスをリターンする
                return (id, await response.json())

今までjsやswift, java等で async/await を見てきましたが, こんなにシンプルに書けてしまうのかと感動しました :sob:

恐らく、Pythonを書いてない人でも分かるくらいシンプルですが、個人的にポイントだと思う部分を説明します。

    # 最大同時リクエスト数の指定
    semaphore = asyncio.Semaphore(5)

上記の処理について、最大同時リクエスト数の指定と記載しましたが、semaphoreの言葉通り同時プロセス(コルーチン)の処理制限です。

ベストエフォートで処理を終わらせる

とは言ったものの、APIを提供しているサービス側に瞬間的でも高負荷をかけると、さすがに迷惑をかけてしまう & サーバ停止やリクエストエラー等の悪影響がこちらにも及んでしまうので、ここで同時処理制限をかけています。
まだLambda上ではこの数を5以上にしたことはないですが、EC2(t2.micro)上でこの処理を動かしたら「スレッド作れなかったわ」的なエラーで停止しました…(記録してなくてすみません)

あとは

                # レスポンスが返ってきたら、更新対象の weather_id と APIレスポンスをリターンする
                return (id, await response.json())

の部分は、同期的に返す値と非同期的に返す値を混在させてタプルを返すことができるので便利だなと感じました(小一時間どう返すのか悩みましたが)。

データ更新処理

上記の結果を受け取り、データを更新します。

main.py
import sys
sys.path.append('./modules')

import asyncio
import mysql as datasource
import http_client
import slack
import util
from weather_type import WeatherType

# lambda hundler
def lambda_hundler(event, context):
    main()

# local functions
async def get_task(weather_list):
    task_list = []
    for data in weather_list:
        id  = data.get('id')
        lat = data.get('lat')
        lon = data.get('lon')
        if all(update_value is not None for update_value in [id, lat, lon]):
            task_list.append(asyncio.ensure_future(http_client.weather_api_request(id, lat, lon)))

    return await asyncio.gather(*task_list)


def main():
    weather_list = datasource.read_data()
    loop = asyncio.get_event_loop()
    result_list = loop.run_until_complete(get_task(weather_list))

    for id, result in result_list:
        weather = util.safe_array_get(result.get('weather'), 0, {})
        weather_main = weather.get('main')
        weather_name = WeatherType(weather_main).japanese_name()

        if weather_name is not None:
            datasource.update_data(id, weather_name)
        else:
            slack.send_error_message(slack.error_message_format.format(id, result))
        continue

# lambda hundler
def lambda_hundler(event, context):
    main()

上記コードは、Lambdaのハンドラに登録する関数です。
ファイル名.関数名 という組み合わせになるはずなので、今回は main.lambda_hundler とします。
引数には eventcontext を設定しておくようです。


get_task 内で先程のAPIリクエスト処理を集めます。
asyncio.gather(*task_list) の部分で、処理順序の担保はしないものの、返却される値(配列)では順序を保持するような仕組みになっているようです。(参考記事)


main 内ではレスポンス取得とデータ更新処理をおこなっています。

    loop = asyncio.get_event_loop()
    result_list = loop.run_until_complete(get_task(weather_list))

もはや定型文レベルの記述ですが、非同期の処理が終わるまで待機して、処理された値を全て返すという大事な処理です。
こんなにもシンプルに書けてしまうのは素晴らしい。

    for id, result in result_list:
        weather = util.safe_array_get(result.get('weather'), 0, {})
        weather_main = weather.get('main')
        weather_name = WeatherType(weather_main).japanese_name()

ここはなんてことない処理ですが、 Swiftを普段書いている身としてはちょっと感動したポイントがあります。
WeatherType というのはその名の通り「天気種別 (晴れとかくもりとか)」です。
この中身を見てみましょう。

weather_type.py
from enum import Enum

class WeatherType(Enum):
    CLEAR  = "Clear"
    CLOUDS = "Clouds"
    RAIN   = "Rain"
    SNOW   = "Snow"

    def japanese_name(self):
        if self == WeatherType.CLEAR:
            return "晴れ"
        elif self == WeatherType.CLOUDS:
            return "くもり"
        elif self == WeatherType.RAIN:
            return "雨"
        elif self == WeatherType.SNOW:
            return "雪"
        else:
            return ""

これはほぼSwiftの enum と変わらない実装です(クラス実装という点では違いますが)。

enum WeatherType: String {
    case clear  = "Clear"
    case clouds = "Clouds"
    case rain   = "Rain"
    case snow   = "Snow"

    var japaneseName: String {
        switch self {
        case .clear:
            return "晴れ"
        case .clouds:
            return "くもり"
        case .rain:
            return "雨"
        case .snow:
            return "雪"
        }
    }
}

print("weather name = (WeatherType(rawValue: "Clear")!.japaneseName)") // weather name = 晴れ

呼び出し方すら似ている :smiley:


mysql.py に書いてある更新処理は特に目立った記述はないので説明省きます。

失敗したらslackに通知

これはなんてことない処理ですが、エラーを察知するのに非常に有効です。
main 内に以下のような処理がありました。

main.py
        if weather_name is not None:
            datasource.update_data(id, weather_name)
        else:
            slack.send_error_message(slack.error_message_format.format(id, result))
        continue

APIの結果に天気が入っていなければエラーを通知するようにしています。
エラー対象となった weatherid とレスポンス結果である result を通知します。
わざとエラーを起こす時間がなかったのでわざと処理を呼び出した結果がこちらです。
slack_error.png

イイネ :thumbsup:

実行結果

実際にLambdaで動かしている結果を載せるのが難しかったのでデータ更新の結果だけ載せます。 :pray:

before

before.png

after

after.png

おわりに

意志強めなタイトルでお送りしましたが、なるべくサーバのことを考えず(サーバレス)にバッチ処理をおこなうことができる時代なので、単純な処理なら上記のような方法で実現するのもアリなのではないでしょうか。
今回触れなかったですが、そもそもLambdaの関数にエラー(例外処理の発生など)があった時に通知できていないのでは?と思った方もいらっしゃるかと思いますが、実は CloudWatch で簡単にアラームの作成ができます。
Slack に通知みたいなフランクなことはできないですが、メール飛ばすくらいはできるので監視も簡単に実現可能です。

ちなみに、AWS Batchというサービスも提供されていますが、どうもこの記事のような定時処理をおこなうというよりは、レポート作成や機械学習などの集計・解析系に向いているようです(こちらの記事が参考になります)。
そもそもインターネットに接続して何か処理をおこなう想定ではなさそうです。

ということで、この記事と似たようなことを実現したい方は是非お試し頂ければ幸いです :bow:
また、細かい部分は端折ってしまったので、不明点等あればコメントに記載頂ければと思います!

最後に、外部のAPIを叩く時は利用条件・限度の他、APIを公開してくれているサービスに迷惑をかけないよう節度を考慮して叩くようにしましょう :bulb:

それでは!

続きを読む

Amazon GuardDutyのイベントをSplunkで検索・可視化

re:Invent 2017で発表されたGuardDutyですが、Splunkでそのデータを取り込んで分析できるとのことなので、さっそく試してみました。

記事(英語)
Splunk Announces New Integrations With Amazon Kinesis Firehose and Amazon GuardDuty

Amazon GuardDutyって?

Amazon GuardDuty – 継続したセキュリティ監視と脅威の検知

(抜粋)

GuardDutyは 脅威情報を含む複数のデータストリームから、悪意のあるIPアドレス、デバイスドメインを認識し、あなたのAWSアカウントで悪意のある、もしくは不正な行動があるか特定するために学習します。VPC Flow Logs、CloudTrail のイベントログ、DNS ログを集め組み合わせることにより、GuardDuty は非常に多くのことなったタイプの危険性のある、悪意のある行動を検知します。

なるほど、AWSリソースの脅威を機械学習で発見するんですね。

Splunkって?

https://www.splunk.com/
ログ分析のソフトウェア。
あらゆるマシンデータをインデックスして検索や可視化、アラート通知や分析ができるっていう優れモノ。

設定してみた

5つのステップで設定できます。
1. SplunkにAppインストール
2. Splunk HTTP Event Collector有効化
3. Amazon GuardDuty有効化
4. AWS Lambdaでテンプレから関数作成
5. AWS CloudWatchでGuardDutyとLambdaを設定したルールを作成

ということで、設定方法を書いていきます。

Splunk設定

まずはデータの受け口であるSplunkの設定から
App入れてHTTP Event Collector (HEC)有効化するだけです。

Appインストール

このApp↓をSplunkインスタンスにインストールしましょう。
AWS GuardDuty Add-on for Splunk

(補足)Appインストール方法

Splunkにログインした後、左側のメニューにある歯車アイコンをクリック
Screen Shot 2017-12-14 1.31.38 PM.png

上記リンク先からAppをダウンロードして ファイルからAppをインストール からインスコ、もしくは、 他のAppを参照 からGuardDutyを検索してインスコ
Screen Shot 2017-12-14 1.35.35 PM.png

データ入力設定

Appインストール完了後、ログイン後のトップ画面に aws_guardduty というAppが追加されています。
Screen Shot 2017-12-14 1.38.53 PM.png

早速 aws_guardduty に移動
Screen Shot 2017-12-14 1.41.51 PM.png

まだ何もデータが入ってきていない状態なので、データ受け取りとしてHTTP Event Collectorを設定します。

右上の 設定 から データ入力 をクリック
Screen Shot 2017-12-14 1.42.50 PM.png

HTTPイベントコレクタ をクリック
Screen Shot 2017-12-14 1.44.07 PM.png

別の記事でHECの設定方法は書いたので、これ以降の手順は割愛します。こちらを参照ください。
https://qiita.com/kikeyama/items/515d65906537239e04d2#splunk%E3%81%AE%E8%A8%AD%E5%AE%9A

(注意)ソースタイプは aws:cloudwatch:guardduty を選択してください。

設定後のトークンはどこかにコピペしておいてください。

GuardDuty設定

AWSコンソールからAmazon GuardDutyに行って有効化。

今すぐ始める をクリック
Screen Shot 2017-12-14 1.15.38 PM.png

GuardDutyの有効化 をクリック
Screen Shot 2017-12-14 1.16.47 PM.png

GuardDuty設定は完了
Screen Shot 2017-12-14 1.20.55 PM.png

今はまだ空っぽですけど、とりあえずGuardDutyの設定はこれでおしまいです。

Lambda設定

まずは 関数の作成
これでSplunkにHTTPでイベントをPOSTするインターフェースを作ります。
Screen Shot 2017-12-14 1.49.39 PM.png

設計図 (Blueprints) を選択して、検索画面に splunk と入力して検索
Screen Shot 2017-12-14 1.51.43 PM.png

Splunk Logging を選択
Screen Shot 2017-12-14 1.53.08 PM.png

下にスクロールすると環境変数の設定があるので、こちらにSplunkのHECエンドポイントURLとトークンを設定
Screen Shot 2017-12-14 1.58.09 PM_mosaic.png

で、名前をつけて保存

その後、作成した関数を編集して sourcetype の値を aws:cloudwatch:guardduty に上書き

index.js
    // Advanced:
    // Log event with user-specified request parameters - useful to set input settings per event vs token-level
    // Full list of request parameters available here:
    // http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector
    logger.logEvent({
        time: Date.now(),
        host: 'serverless',
        source: `lambda:${context.functionName}`,
        sourcetype: 'aws:cloudwatch:guardduty',
        event: event,
    });

CloudWatch設定

ルールを一個作りましょう
Screen Shot 2017-12-14 2.08.15 PM.png

サービス名は GuardDuty 、ターゲットは Lambda関数 から、先ほど作成したLambda関数を選択
Screen Shot 2017-12-14 2.09.28 PM.png

あとは名前をつけて保存

以上、すべての設定は完了!

GuardDutyイベントを検索

ということで、しばらく待つとGuardDutyデータがSplunkにインデックスされてきました。

Screen Shot 2017-12-14 2.16.08 PM_mosaic.png

ダッシュボード

GuardDuty Appには既成のダッシュボードがあるようです。

GuardDuty Examples からダッシュボードに移動してみましょう
Screen Shot 2017-12-14 2.19.22 PM.png

Screen Shot 2017-12-14 12.42.27 PM.png

早速脅威が検知されてしまったみたいですね・・・。
比較的シンプルなダッシュボードですが、可視化やモニタリングには十分かな、と。
運用してみて足りない部分は自前でダッシュボード作ってみよう。

最後に

どうやらダッシュボード内のテーブルをクリックすると、Splunk App for AWSにドリルダウンできるようです。

Kinesis FirehoseからSplunkにデータを流せるとのことですし、せっかくなので近日中にこのAppも設定してみて記事を書いてみようかなと思います。

続きを読む

システム障害解析におけるログのあれこれ

この記事は Akatsuki Advent Calendar 2017 の 8 日目の記事です。
7日目: バイナリのビルド作業はそろそろボタンをポチるだけにしようぜ

背景

システムを運用していると、日々アプリケーション・ミドルウェア・インフラのログが蓄積されていきます。これらのログはシステムの障害対応・解析のための貴重な情報源となりますし、そうであることが期待されます。
しかし、これらのログの取り扱いを誤ると誤った障害解析結果を導き出してしまったり、解析にいたずらに時間がかかったり、障害を特定することができなかったりといったことが起こります。
今回はこれらのログを扱う上で注意すべき点とその改善案を紹介をしたいと思います。

前提

私はソーシャルゲームのインフラとサーバサイドアプリケーションを担当しており、下記のサービス・ソフトウェアを利用しています。

  • AWS

    • ELB (Classic)
    • EC2 (AmazonLinux)
    • RDS (MySQL)
    • CloudWatch
  • nginx
  • Ruby on Rails (unicorn)
  • 他 BigQuery, ElasticSearch, Re:dash, Kibana, Mackerel 等

そのタイムスタンプ、いつのもの?

通常、ログデータにはタイムスタンプが付けられていますが、このタイムスタンプは一体「いつ」の時刻を記録したものなのでしょうか。

ほとんどの場合、対象ソフトウェアが処理を開始した時間が記録されるのですが、実は例外もあります。
私の所属しているプロジェクトで使っているソフトウェアの中では nginx がこれに該当します。
nginx では「処理が完了した時刻 (= レスポンスを返した時刻)」が記録されます。

システムが正常に稼働している限りこれらの違いを気にすることは少ないと思いますが、障害解析時はその限りではありません。
各アプリケーション・ミドルウェア・インフラのログを少なくとも秒単位であわせ解析する必要があるため、各タイムスタンプが「いつ」の時刻を記録したものなのか把握していないと、誤った障害解析結果を導きかねません。
特にタイムアウト処理が絡んだ場合、レスポンスを返した時刻はリクエストを受けた時刻と大きな差が発生します。

何気なく記録されているログのタイムスタンプにも罠があります。ご注意ください。

必要な情報出してる?

前項で「nginx のタイムスタンプはレスポンスを返した時刻」と説明しましたが、ではいったいどうやって「処理を開始した時刻(= リクエストを受けた時刻)」を出力するのでしょうか。
実は nginx のデフォルトの設定ではこれができません。

nginx で「リクエストを受けた時刻」を記録する方法はいくつかあるようなのですが、最も簡単なのは「レスポンスを返すまでにかかった時間」を一緒に記録することです。ログの解析時にそれらの値を使って「リクエストを受けた時刻」を求めることができます。ログ解析時に前処理は必要になりますが、それを低コストで行える環境もあわせて用意しておくとよいです(後述)

(※ 最も良いのはもちろん予めログにリクエストを受けた時刻を記録することですが、ログ収集時に計算させる方法も可能です)

死ぬ前の情報は残した?

エラー時の情報は貴重です。この情報の有無で障害解析のスピードと精度は数倍変わってくるでしょう。しかし、中にはエラー時の情報を残さずに死んでしまうソフトウェアもあります。私のプロジェクトで利用しているもの中では unicorn がこれに該当します。

unicorn はリクエストを処理する worker プロセスと、workerプロセスを管理する masater プロセスから構成されます。
unicorn はタイムアウトの設定を持ち、worker プロセスの処理がこのタイムアウト内に完了しない場合、master プロセスは workerプロセスに対して即座に SIGKILL を送りつけます。その結果、「タイムアウト内に完了しなかった処理」がログに記録されないという事態が発生します。

これに対する改善策はいくつかあります。

  1. より上位にあるソフトウェアで記録を残す

    • 具体的には ELB や nginx でログを残す。当該リクエストを処理したホストの情報、エラーコード、エンドポイント等を記録する。
  2. SIGKILL の代わりにトラップ可能な SIGINT 等を利用し、そこで Rails.logger.flush させる
  3. Rails の ActionController の around_action で “ソフトな” タイムアウトを設定する
around_action :global_timeout

def global_timeout
  Timeout.timeout(TIMEOUT_SEC) do
    yield
  end
end

私の所属するプロジェクトで実際に適用されているのはまだ1のみですが、2,3の手法も評価していく予定です。

ログデータ膨大すぎるんだけど…

正確な障害解析には普段から多くの情報を取得しておく必要がありますが、その結果、解析に時間がかかったり、そもそも普通のマシンでは処理ができなかったりといったことが発生します。
私の所属するプロジェクトでは、ログをBigQueryとElasticSearchに格納し、Re:dashやKibanaで可視化できる仕組みを構築しています。

普段はマクロなインフラメトリクス(や、売上情報等)を表示するために使っていますが、障害解析時はクエリを書くことで簡単に情報を絞り込んだり、可視化することができ、便利です。nginxのタイムスタンプ問題もクエリを書くことで簡単に解決できます。

(※ すべてBigQuery+Re:dash に統一化したいなぁ)

さいごに

障害解析は「より少ない情報、より少ない時間で原因を特定する」エクストリームスポーツではありませんし、そうあってはなりません。
エンジニアにエスパーの力を求めるのは間違っています。
また、「システムの癖を知った、長年の経験のあるエンジニアにしかできない作業」であってもなりません。

障害解析のために十分な情報を集めることや、スピーディに解析できる環境を用意することは言うほど簡単ではありませんし、コストもかかりますが、安定したサービスを提供するには必要不可欠なものです。

堅牢なシステムの構築は1日にして成らず、頑張っていきましょう。

続きを読む

api-gateway+lambdaからlambda・AWS batchを非同期でキックしてみた記事

最近はやりのchatopsをrubyで書いていたのですが、botが死ぬたびに原因調査を行うのがめんどくさくて、slackでメッセージを受け取るところと内部の処理を分離しました。
分離した処理はjenkinsのapiを使って処理していたのですが、せっかくなのでAWSに全てあげてしまうことにしました。
そこでlambdaからLambda・AWS Batchを非同期でキックした時のことを書こうと思います。

はじめに

本稿はslackにbotを多数動かす想定で設計しているので、個人でちゃちゃっと作りたい人はbotで完結したほうがいいと思います。
またlambdaの非同期でキックするところを中心に書くのでbot部分や周辺は割愛します。
(litaをECS上で動作 passなどはkmsで管理)

想定ユーザー

  • データ基盤の機能などの一部を社内ユーザーに解放したい
  • いくつかのbotを動かしたい
  • terraformで環境構築している
  • サーバーレスが好き

全体的な構成

以下構成図になります
– メッセージは全てlita一台で受け取ってApi-GatewayにPOST
– LambdaからLambdaかAWS batchを非同期で起動

スクリーンショット 2017-12-05 12.42.21.png

post.json
{
    "method":{
         "channel_name" : "****",
         "req_usr" : "***",
         "magic_words" : "特定の情報を渡す場所(ddl取る場合はスキーマとテーブル名など)"
    }
}

api-gatewayの部分

以下の記事にlambdaをキックするところまでを書いたので割愛します。
API Gatewayを叩いてLambdaからRedshiftにSQLを投げる(ついでにslackにsnippet通知)

lambda(kicker)の部分

  • post内容から呼び出したい関数名を取り出す
  • 関数名のlambdaがあればそれをキック
  • 関数名に対応するAWS batchがあればそれをキック
kicker.py
import simplejson as json
import boto3
import logging


logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    event_dict = json.loads(event["body"])
    function_name = list(event_dict.keys())[0]
    if function_name == "kicker":
        status_code = 404
        body_text = "Kicker loop error : " + function_name
        logger.error(body_text)
    elif kick_lambda(function_name, event):
        status_code = 201
        body_text = "kick lambda : " + function_name
        logger.info(body_text)
    elif kick_batch(function_name, event):
        status_code = 201
        body_text = "kick batch : " + function_name
        logger.info(body_text)
    else:
        status_code = 404
        body_text = "No Method Error : " + function_name
        logger.error(body_text)

    responseObj = {
        "statusCode": status_code,
        "body": body_text
    }
    return responseObj


def kick_lambda(function_name, event):
    try:
        clientLambda = boto3.client("lambda")
        res = clientLambda.invoke(
            FunctionName=function_name,
            InvocationType="Event",
            Payload=json.dumps(event)
        )
        return True
    except:
        return False


def kick_batch(function_name, event):
    try:
        clientBatch = boto3.client("batch")
        res = clientBatch.submit_job(
            jobName=function_name,
            jobQueue=function_name + "_queue",
            jobDefinition=function_name + "_job_definition",
            parameters={
                'event' : json.dumps(event)
            }
        )
        return True
    except:
        return False
    return False


if __name__ == '__main__':
    handler('', '')

非同期で呼び出されたlambda(右側)の部分

eventをそのまま渡しているので以下の記事のように色々作ればOK
API Gatewayを叩いてLambdaからRedshiftにSQLを投げる(ついでにslackにsnippet通知)

非同期で呼び出されたAWS batch(右側)の部分

kickerで定義された名前に対応するものを作成しておけばOK
例のものであれば
– aws_batch_job_queueのname属性が{function_name}_queue
– aws_batch_job_definitionのname属性が{function_name}_job_definition

kicker.pyの一部
def kick_batch(function_name, event):
    try:
        clientBatch = boto3.client("batch")
        res = clientBatch.submit_job(
            jobName=function_name,
            jobQueue=function_name + "_queue",
            jobDefinition=function_name + "_job_definition",
            parameters={
                'event' : json.dumps(event)
            }
        )
        return True
    except:
        return False
    return False

終わりに

全体的にスッキリして機能追加などがかなり簡単になりました。
あとはterraform・lambdaのzipファイル・AWS batchのコンテナイメージをどうやって運用するかが考え所な感じがしました

続きを読む