Lambdaの実行環境にフォントを追加する

はじめに

AWS LambdaでPhantomJS日本語フォント対応では fontconfig をビルドしてデプロイパッケージに含めているが、 Lambdaの実行環境を確認したところ、fontconfig は導入されている。
したがって、フォントキャッシュさえ生成すれば、実行環境のfontconfigが利用できる。

なおLambdaの実行環境のfontconfigはXDGには対応していない。
検証時、fc-cache のバージョンは 2.8.0 であった。

Lambda の実行時 LAMBDA_TASK_ROOT=/var/task/share/fonts となっている。

fontconfig は

~/.fonts.conf
~/.fonts
~/.fontconfig

は見るので、HOME=$LAMBDA_TASK_ROOT に設定して、これらが

$HOME/
        .fontconfig
        .fonts
        .fonts.conf

というツリー構造で見えるようにデプロイパッケージに含めてやれば良い。

フォントキャッシュの生成

NotoSansCJK のパッケージから NotoSansCJK-Regular.ttc を抽出して、.fonts に配置する。

Lambda の実行時に $LAMBDA_TASK_ROOT/.fontconfig は書き込みできないため、そのままでは fc-cache の実行でキャッシュファイルの作成に失敗する。

いったん /tmp/cache/fontconfig にキャッシュを生成して、デプロイパッケージでは .fontconfig に配置する。

このため、次の内容で .fonts.conf を作成する。

fonts.conf
<fontconfig>
    <cachedir>/tmp/cache/fontconfig</cachedir>
</fontconfig>

フォントキャッシュを生成する Lambda 関数は次の通り。

fontcache.py
from __future__ import print_function
import subprocess

import sys
import os
import base64
import boto3
from os.path import join

logger = logging.getLogger()  
logger.setLevel(logging.INFO)  

BUCKET_NAME = 'バケット名'

def lambda_handler(event, context):
    os.environ['HOME'] = os.environ['LAMBDA_TASK_ROOT']
    try:
        os.makedirs('/tmp/cache/fontconfig')
    except OSError as e:
        (errno, strerror) = e
        print('OSError {}'.format(strerror))

    args = [ 'fc-cache', '-v', join(os.environ['HOME'], '.fonts') ]
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    returncode = p.returncode
    stdout_data, stderr_data = p.communicate()

    print('stdout_data:n' + stdout_data)
    print('stderr_data:n' + stderr_data)
    s3bucket = boto3.resource('s3').Bucket(BUCKET_NAME)
    tmp_fontconfig = '/tmp/cache/fontconfig'
    for cache in os.listdir(tmp_fontconfig):
        response = s3bucket.upload_file(join(tmp_fontconfig, cache), cache)
        with open(join(tmp_fontconfig, cache), mode='rb') as f:
            print("{}:n{}n".format(cache, base64.b64encode(f.read())))

デプロイパッケージの内容

% unzip -l fontcache.zip 
Archive:  fontcache.zip
  Length     Date   Time    Name
 --------    ----   ----    ----
     1116  10-16-17 15:34   fontcache.py
      105  10-12-17 20:54   .fonts.conf
        0  10-12-17 22:01   .fonts/
 18748872  10-12-17 17:04   .fonts/NotoSansCJK-Regular.ttc
 --------                   -------
 18750093                   4 files

この Lambda 関数を実行すると、S3 にキャッシュファイルがアップロードされる。

テスト

先ほど生成したキャッシュファイルを .fontconfig に配置する。

phantomjs のロードモジュールと rasterize.jsbin ディレクトリに配置する。

phantomjs でのスクリーンキャプチャーは、Screen Capture | PhantomJSを参考に、 rasterize.js 使用してテストプログラムを作成した。

phantomjs.py
from __future__ import print_function
import subprocess
import tempfile
import os
import boto3

def phantomjs(url, bucket, name):
    from os.path import join
    run_dir = os.environ['LAMBDA_TASK_ROOT']
    bin_dir = join(run_dir, 'bin')
    args = [ join(bin_dir, 'phantomjs'), join(bin_dir, 'rasterize.js'), url ]

    with tempfile.NamedTemporaryFile(suffix='.png') as f:
        args.append(f.name)
        print('{}n'.format(' '.join(args)))
        p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        returncode = p.returncode
        stdout_data, stderr_data = p.communicate()

        s3 = boto3.resource('s3')
        bucket = s3.Bucket(bucket)
        response = bucket.upload_file(f.name, name)

    print('returncode {}n'.format(returncode))
    print('stdout:: n{}n'.format(stdout_data.decode('utf-8')))
    print('stderr:: n{}n'.format(stderr_data.decode('utf-8')))
    print('stderr:: n{}n'.format(stderr_data.decode('utf-8')))
    print('response {}n'.format(response))

def lambda_handler(event, context):
    os.environ['HOME'] = os.environ['LAMBDA_TASK_ROOT']
    os.chdir(os.environ['LAMBDA_TASK_ROOT'])
    phantomjs('https://www.amazon.co.jp', 'バケット名', 'screenshot.png')

デプロイパッケージの内容

% unzip -l phantomjs.zip 
Archive:  phantomjs.zip
  Length     Date   Time    Name
 --------    ----   ----    ----
     1304  10-13-17 17:09   phantomjs.py
      105  10-12-17 20:54   .fonts.conf
        0  10-12-17 22:01   .fonts/
 18748872  10-12-17 17:04   .fonts/NotoSansCJK-Regular.ttc
        0  10-12-17 23:05   .fontconfig/
    12792  10-12-17 22:27   .fontconfig/996789b4ba9d471a5fd80c008c2b2acf-le64.cache-3
 67932064  01-25-16 10:01   bin/phantomjs
     2241  10-12-17 23:19   bin/rasterize.js
 --------                   -------
 86697378                   8 files

この Lambda 関数を実行して、うまく画面がキャプチャーされていればよい。

phantomjs の実行のために、Lambda が使用するメモリを192MB以上にしておかないと、メモリ不足で処理が途中で打ち切られる。

続きを読む

[AWS][Python3]SESでIAM認証情報を変換してSMTP 認証情報を取得する

SESをSMTPを利用して送信したい

SESを利用するならばboto3を利用するのが一番簡単かと思いますが,
ローカル開発環境ではMailHog等の開発用SMTPサーバを用いて動作確認を行なっている環境で
汎用的なsmtplibをテスト・本番で利用したいというのがそもそものはじまり

SMTPを利用するには

通常のsmtp同様以下のような形で行えます

email.py
def send_mail(self, host, user, password, msg):
  smtp_con = smtplib.SMTP_SSL(host=host)
  smtp_con.ehlo()
  smtp_con.login(user, password)
  result = smtp_con.send_message(msg=msg)
  smtp_con.close()

が、ここで渡すpasswordが曲者

以下のリンクを見るとわかりますが
通常boto3などのAPI経由であれば、パスワードにはIAMのAWS_SECRET_ACCESS_KEYを用いるが
SMTP認証情報にはAWS_SECRET_ACCESS_KEY を Amazon SES SMTPパスワードに変換したものを利用する必要があります
Amazon SES SMTP 認証情報の取得

新たにSMTP認証情報を持つIAMユーザを作る方法もありますが
今回はAWS_SECRET_ACCESS_KEY->SMTPCredentialsの変換を行います

Amazon SES SMTP Credentialsを python3 で生成する

実際に利用したコードがこちら
keyにAWS_SECRET_ACCESS_KEYを渡してあげればSMTP認証に利用できるパスワードが返却される

aws_ses_hash.py
def hash_smtp_pass_from_secret_key(self, key):
    message = "SendRawEmail"
    sig_bytes = bytearray(b'x02')

    h = hmac.new(key.encode(), message.encode(), digestmod=hashlib.sha256)
    digest = h.hexdigest()

    sig_bytes.extend(bytearray.fromhex(digest))

    return base64.b64encode(sig_bytes).decode()

ちなみにpython2であれば以下が参考になります
■[tips][aws][python][ruby] Amazon SES SMTP Credentialsをpythonやrubyで作ってみる

続きを読む

【AWS Batch】1コマンドでdockerを定期実行する

CloudWatch EventsでLambdaを定期実行し、LambdaからAWS Batch上でdockerが動くようにする。

scheduled-docker-batch.yaml
Parameters:
  SubnetIds:
    Description: Subnets For ComputeEnvironment
    Type: List<AWS::EC2::Subnet::Id>
  SecurityGroupIds:
    Description: SecurityGroups For ComputeEnvironment
    Type: List<AWS::EC2::SecurityGroup::Id>
Resources:
  BatchComputeSpotFleetRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: spotfleet.amazonaws.com
            Action: "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonEC2SpotFleetRole
  BatchComputeEnvironmentRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: batch.amazonaws.com
            Action: "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSBatchServiceRole
  BatchComputeInstanceRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: ec2.amazonaws.com
            Action: "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonEC2ContainerServiceforEC2Role
  BatchComputeInstanceProfile:
    Type: "AWS::IAM::InstanceProfile"
    Properties:
      Path: "/"
      Roles:
        - Ref: BatchComputeInstanceRole
  BatchJobQueue:
    Type: "AWS::Batch::JobQueue"
    Properties:
      JobQueueName: scheduled-job-queue
      Priority: 1
      ComputeEnvironmentOrder:
        - Order: 1
          ComputeEnvironment: !Ref BatchComputeEnvironment
  BatchComputeEnvironment:
    Type: AWS::Batch::ComputeEnvironment
    Properties:
      Type: MANAGED
      ServiceRole: !Ref BatchComputeEnvironmentRole
      ComputeEnvironmentName: scheduled-batch-compute-environment
      ComputeResources:
        Type: SPOT
        SecurityGroupIds: !Ref SecurityGroupIds
        Subnets: !Ref SubnetIds
        MaxvCpus: 128
        MinvCpus: 0
        DesiredvCpus: 0
        InstanceRole: !Ref BatchComputeInstanceProfile
        InstanceTypes:
          - optimal
        SpotIamFleetRole: !Ref BatchComputeSpotFleetRole
        BidPercentage: 50
      State: ENABLED
  JobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      Type: container
      JobDefinitionName: schedule-batch-example
      ContainerProperties:
        Command:
          - /hello
        Memory: 256
        Vcpus: 1
        Image: hello-world
      RetryStrategy:
        Attempts: 1
  LambdaExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: "sts:AssumeRole"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: CustomLocalPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - batch:SubmitJob
                Resource:
                  - "*"
  JobSubmitFunction:
    Type: "AWS::Lambda::Function"
    Properties:
      # Handler: "index.handler"
      Handler: "index.handler"
      Role: !GetAtt [ LambdaExecutionRole, Arn ]
      Code:
        ZipFile: |
          import os
          import boto3
          def handler(event, context):
              params = {
                  'jobName': 'schedule-sample-job',
                  'jobQueue': os.environ.get('JOB_QUEUE'),
                  'jobDefinition': os.environ.get('JOB_DEFINITION')
              }
              print(params)
              client = boto3.client('batch')
              res = client.submit_job(**params)
              print(res)
      Runtime: "python3.6"
      Environment:
        Variables:
          JOB_QUEUE: !Ref BatchJobQueue
          JOB_DEFINITION: !Ref JobDefinition
  JobSubmitEventRule:
    Type: AWS::Events::Rule
    Properties:
      ScheduleExpression: rate(5 minutes)
      Targets:
        - Id: JobSubmitScheduler
          Arn:
            Fn::GetAtt:
              - JobSubmitFunction
              - Arn
  InvokeLambdaPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName:
        Fn::GetAtt:
          - JobSubmitFunction
          - Arn
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn:
        Fn::GetAtt:
          - JobSubmitEventRule
          - Arn

実行方法

$ aws cloudformation create-stack --stack-name schedule-docker-batch 
  --template-body file://./schedule-docker-batch.yaml 
  --capabilities CAPABILITY_IAM 
  --parameters ParameterKey=SubnetIds,ParameterValue="subnet-xxxxxxx" ParameterKey=SubnetIds,ParameterValue="subnet-yyyyyyy" ParameterKey=SecurityGroupIds,ParameterValue="sg-zzzzzzzz"

よしなにJobDefinitionを編集して好きなようにdockerを動かす。

続きを読む

LambdaとKinesisStreamによる非同期並列処理

はじめに

Kinesis Streamに入れたログをlambdaで処理するときlambdaはシャードに対して直列で動きます
Kinesisにログが入るスピード < ログ1件あたりのlambdaの処理時間となると、ログが入ってから処理が完了するまでの時間がどんどん開いていきます
シャードを分割すればlambdaも並列で動くのですが、分割するほどログも多くない
ということで、シャードは一つですが非同期に並列で処理できる仕組みを考えました
言語はpython
自分自身を再帰的に呼び出します

参考

lambdaとkinesisの設定方法は他を参考にしてください
AWS Lambda編~Kinesisと連携してみる~
ストリーム型(kinesis stream)のAWS Lambdaの同時起動数とデータの取り方を整理

設定

デプロイしたlambdaにkinisisを紐づけます
一度に処理するログの数は10
スクリーンショット 2017-09-28 18.48.22.png

コード

非同期処理へ投げる

  • Kinesisへ投げるログはjson
  • Kinesisから流れてきたログは配列に入れ、async_flagをつけて非同期実行へ投げる
  • async_flagが付いている場合は、データを取得してループを回す
run.py

import json
import base64
import time
from aws_lambda import AwsLambdaModule



def run(event, context):

    data_dict = {}
    data_dict['async_flag'] = True
    data_dict['data'] = []


    if 'async_flag' in event:
        # 非同期で呼び出される場合
        main(event['data'])

    else:
        # 非同期で呼び出す場合
        # キネシスストリームから起動した場合
        for record in event['Records']:

            # kinesisのデータを戻す
            kinesis_data = load_kinesis_data(record['kinesis']['data'])
            # 詰め替える
            data_dict['data'].append(kinesis_data)


        # 非同期で再帰呼び出し
        lambda_function = AwsLambdaModule()
        lambda_function.invoke_async(data_dict)





def main(data_):
    for data in data_:
        time.sleep(2)
        print(data)




def load_kinesis_data(data):
    """
    kinesisから受け取ったデータはエンコードされているため
    """
    payload = base64.b64decode(data)
    return json.loads(payload)

lambdaの非同期呼び出し

aws_lambda.py

import os
import json
import boto3




class AwsLambdaModule(object):



    def __init__(self):
        self.client = boto3.client('lambda')
        self.function_name = os.environ.get('AWS_LAMBDA_FUNCTION_NAME')



    def invoke_async(self, data):
        """
        lambdaを非同期呼び出し
        """
        if self.function_name is None:
            return

        print('### Asynchronous execution ###')
        self.client.invoke(
            FunctionName=self.function_name,
            InvocationType="Event",
            Payload=json.dumps(data)
        )


結果

100連続でKinesisへログを挿入

{"test":"momonga"}

スクリーンショット 2017-10-02 16.23.05.png

非同期処理への命令と2秒ごとにログをダンプしている処理が非同期で動作していることがわかります

続きを読む

サーバ死活監視→メール送付をLambda、AmazonSNSで実現する

AWSを使ってのサーバの死活監視ってよくやると思うんですが、大体問題があった時の通知の飛ばし先はSlackだったりします。(無料だし)

とはいえ仕事で使おうとすると、Slackは見れない状況もあるかもしれないと思い、メールで通知を飛ばす、という観点でAWSを利用し実現してみました。

最終的に利用できるようにしたものは以下に登録してあります。

https://github.com/kojiisd/aws-server-monitor

サーバ監視からのメール送付までの仕組み

公開されているサービスに対してREST通信を行い、そのステータスコードを見るだけのシンプルな死活監視を行います。

ただし何も考えずに実装すると、エラーが起きてから回復するまで、定期実行のたびに何度もエラーメールを投げてしまう仕組みとなるため、管理する側としてはうっとおしくもあり(何回もメールが来る)コスト的にも嬉しくありません。またエラー発生時だけメールを飛ばす仕組みとすると、復旧したかどうかがわからず、管理者はヒヤヒヤすることになります。

そこで、現在のステータスをDynamoDBで管理するようにし、状態が変更されたタイミングでメールを送る(異常発生/復旧)という仕組みにしてみました。

監視対象のサーバ情報はS3から取得してくるようにします。

まずはサーバ死活監視とメール送付部分の作成

状態の監視は後ほど加えるとして、まずはメインとなるサーバ死活監視とメール送付のLambda実装です。Python3.6で実装しています。

サーバ死活監視処理

S3から監視対象のデータを持ってくるところと、サーバ監視を実施してエラーが発生したサーバの一覧を作成する処理です。Lambda実行の際の権限の割り振り忘れに注意。

def get_target_servers():
    s3 = boto3.resource('s3')
    obj = s3.Object(BUCKET_NAME, OBJECT_NAME)
    response = obj.get()
    body = response['Body'].read()
    return body.decode('utf-8')

def check_target_servers(target_json):
    data = json.loads(target_json)
    servers = data['servers']

    error_servers = []

    for server in servers:
        name = server['name']
        url = server['url']
        try:
            res = requests.get(url)
            if res.status_code != 200:
                error_servers.append(server)
        except Exception:
            error_servers.append(server)

    if len(error_servers) == 0:
        print("Successful finished servers checking")
    else:
        response = send_error(name, url, error_servers)
        print("Error occured:")
        print(response)
        print(error_servers)

S3上には以下のようなjsonデータを配置するようにします。

{
    "servers": [
      { "name": "googlea", "url": "http://www.google.coma" },
      { "name": "googleb", "url": "http://www.google.comb" },
      { "name": "google", "url": "http://www.google.com" }
    ]
}

上記を監視対象とすると、最後以外がアクセス失敗するので、エラーメールが飛ぶことになります。
以下の値は環境変数として定義しており、Lambda実行前に定義が必要です。

変数名 内容
S3_BUCKET_NAME S3の対象バケット名
S3_OBJECT_NAME S3の対象オブジェクト名
SNS_TOPICS_NAME SNSの送付対象トピック名
DDB_TABLE_NAME DynamoDBのテーブル名

メール送付処理

とりあえずこんな感じで書けばSNSは呼び出せるので、これをカスタマイズします。

import json
import boto3

sns = boto3.client('sns')

def lambda_handler(event, context):
    sns_message = "Test email"

    topic = 'arn:aws:sns:us-east-1:<ACCOUNT_ID>:<TOPICS_NAME>'
    subject = 'Test-email'
    response = sns.publish(
        TopicArn=topic,
        Message=sns_message,
        Subject=subject
    )
    return 'Success'

カスタマイズをしてこんな感じでメソッドにして組み込みました。

def send_error(name, url, error_servers):
    sns = boto3.client('sns')
    sns_message = "Error happens:nn" + json.dumps(error_servers, indent=4, separators=(',', ': '))

    subject = '[ServerMonitor] Error happens'
    response = sns.publish(
        TopicArn=SNS_TOPICS_NAME,
        Message=sns_message,
        Subject=subject
    )

    return response

死活監視でエラーが発生すると以下のようなメールを受け取れます。内容は質素ですが、とりあえずこれで良しとします。

Error happens:

[
    {
        "name": "googlea",
        "url": "http://www.google.coma"
    },
    {
        "name": "googleb",
        "url": "http://www.google.comb"
    }
]

DynamoDBを使った状態管理

さて、このままでも監視はできますが、エラー発生と復旧がわかった方が良いので、DynamoDBを使って状態管理をし、変化があった場合にメールを送付するように改修します。

「url」をプライマリキー、「name」をソートキー(レンジキー)として「server-monitor」というテーブルを作成します。
取得と実装はこんな感じになります。これをカスタマイズして今回の処理に適用します。

import boto3
import json
from boto3.dynamodb.conditions import Key, Attr


dynamodb = boto3.resource('dynamodb')
table    = dynamodb.Table('server-monitor')

def lambda_handler(event, context):
    #add_server()
    #get_server()

    return 'Finish operation'

def get_server():

    items = table.get_item(
            Key={
                 "url": "http://www.google.com",
                 "name": "google"
            }
        )

    print(items['Item'])

def add_server():
    table.put_item(
            Item={
                 "url": "http://www.google.com",
                 "name": "google",
                 "status": True
            }
        )    

カスタマイズして組み込んだ結果はこちらになります。

def check_status(url, name):
    status_ok = True
    try:
        items = dynamodb.Table(DDB_TABLE_NAME).get_item(
                Key={
                    "url": url,
                    "name": name
                }
            )
        status_ok = items['Item']['status']
    except:
        status_ok = None
    return status_ok

def add_server(url, name, status):
    dynamodb.Table(DDB_TABLE_NAME).put_item(
        Item={
                "url": url,
                "name": name,
                "status": status
        }
    )

死活監視後のエラー判定部分も、DynamoDBから現在の状況を確認してメール通知をする処理の追加が必要になります。こんな感じです。

def check_target_servers(target_json):
    data = json.loads(target_json)
    servers = data['servers']

    status_changed_servers = []

    for server in servers:
        name = server['name']
        url = server['url']
        status_ok = check_status(url, name)
        try:
            res = requests.get(url)
            if res.status_code != 200:
                if status_ok != False:
                    server['status'] = "Error"
                    status_changed_servers.append(server)
                add_server(url, name, False)
            else:
                if status_ok == False:
                    server['status'] = "Recover"
                    status_changed_servers.append(server)
                add_server(url, name, True)
        except Exception:
            if status_ok != False:
                server['status'] = "Error"
                status_changed_servers.append(server)
            add_server(url, name, False)

    if len(status_changed_servers) == 0:
        print("Successful finished servers checking")
    else:
        response = send_error(name, url, status_changed_servers)
        print("Error occured:")
        print(response)
        print(status_changed_servers)

動作確認

今までの実装でエラーが発生した時には「Error」と、復旧した時には「Recover」という内容のメールが送信されるようになっているはずです。動作確認をしてみます。

以前作成したWebページがS3上にあるので、アクセス権限を変更してテストしてみます。

Three.jsのかっこいいサンプルとAWSを連携させてみた

OKパターンで実行してみる。。。

メールは送信されず、DynamoDBにデータだけ追加されました。

スクリーンショット 2017-09-24 19.17.32.png

さて、アクセス権限を変更して、アクセスできなくしてから再度実行してみます。無事エラーが発生してメールが飛んで来ました。状態は「Error」です。

Server Status Changed happens:

[
    {
        "name": "s3-test",
        "url": "https://s3.amazonaws.com/xxxxxx/xxxx/css3d_periodictable.html",
        "status": "Error"
    }
]

DynamoDBのデータもエラーを表す「false」にstatusカラムが変更されています。

スクリーンショット 2017-09-24 19.21.23.png

もう一度実行しても、エラーメールは飛んでこないようです。無事状態判定をしてくれています。DynamoDBの値も変化なしです。(キャプチャだけだと何もわかりませんがw)

スクリーンショット 2017-09-24 19.23.43.png

それではページを復旧してみます。またアクセス権限を元に戻してアクセス可能にし、サーバ死活監視処理を実行してみます。

Server Status Changed happens:

[
    {
        "name": "s3-test",
        "url": "https://s3.amazonaws.com/xxxxxx/xxxx/css3d_periodictable.html",
        "status": "Recover"
    }
]

無事復旧メールが届きました。これで完成です。

まとめ

サーバの死活監視を、状態管理しながら実施してみました。今回はAWSのサービスを中心に実現しましたが、おかげでトータル数時間レベルで実現できました。かかるコストも抑えつつ、実際に使えそうなものがこれくらいスピーディーに作れてしまうのは、さすがAWSのマネージドサービス、といったところです。

[おまけ]ライブラリの管理

Lambdaでデプロイパッケージを作成する際、普通に実装すると外部ライブラリをプロジェクトのルートディレクトリに置くことになるので、あまり綺麗なパッケージ構成になりません。

そこで今回、こちらのページを参考にさせてもらいながら、別ディレクトリにパッケージを配置してく方法をとりました。

# pip install -U requests -t ./lib

で、Pythonにはこういう処理を追加する。

sys.path.append(os.path.join(os.path.abspath(os.path.dirname(__file__)), 'lib'))

import requests

こうするだけでlib配下のパッケージを見に行ってくれるようになりました。これはディレクトリ管理することを考えるとかなり助かりました。実際serverlessコマンドでデプロイしても、正しく動きました。

続きを読む

メールにパスワード付きzipを添付して「パスワードは別途お送りいたします」とする慣習がめんどくさいのでなんとかした

あの慣習

メールにパスワード付きzipを添付して「パスワードは別途お送りいたします」とする慣習、ありますよね。
自分からはやらないけど、相手に合わせてやらざるを得なかったりしてめんどくさい。

ここでは、このやり方の是非は問題にしません。
どんなに是非を説いても、この慣習があるという状況は変わらないので。

そして、この慣習を無くすことも考えません。
そういうのは巨大な力を持った何かにおまかせします。

昔のエラい人は言いました。「長いものには巻かれろ」と。
ただし、巻かれ方は考えたほうがいいと思うのです。

スマートな巻かれ方を考える

巻かれるにあたって、解決したいことはただ一つ。めんどくさくないこと。
このためにWebシステム作って、ブラウザ開いてどうのこうのなんてやってると本末転倒です。
可能な限り、普通のメール送信に近い形で実現したい。

というわけで、あれこれ考えた末、一部の制約を許容しつつ、AmazonSESを使ってサーバーレスな感じで解決してみました。

仕様

  1. 普通にメールを書く(新規・返信・転送問わず)
  2. ファイルをzipで固めずにそのまま放り込む
  3. SES宛のメールアドレスをToに、実際にファイルを送りたい相手をReply-Toに設定する。
  4. システムを信じて送信ボタンを押す
  5. 自分と相手に、パスワード付きzipが添付されたメールとパスワードのお知らせメールが届く

ただし、以下の制約があります。個人的には許容範囲です。

  • 結果的に相手方には全員Toで届く。Ccはできない(自分はBcc)
  • zipファイルの名前は日時(yymmddHHMMSS.zip)になる(中身のファイル名はそのまま)

システム構成

flow_01.png

  1. SESに宛ててメールを送る
  2. メールデータがS3に保存される
  3. それをトリガーにしてLambdaが起動する
  4. Lambdaがメールの内容を解析してパスワードとzipファイルを生成する
  5. いい感じにメールを送る(念のため自分にもBccで送る)

実装

Lambda

真面目にpython書いたの初めてだけどこんな感じでいいのかな?
大体メールと文字コードとファイルとの戦いです。

# -*- coding: utf-8 -*-

import os
import sys
import string
import random
import json
import urllib.parse
import boto3
import re
import smtplib
import email
import base64
from email                import encoders
from email.header         import decode_header
from email.mime.base      import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text      import MIMEText
from email.mime.image     import MIMEImage
from datetime             import datetime

sys.path.append(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'vendored'))
import pyminizip

s3 = boto3.client('s3')

class MailParser(object):
    """
    メール解析クラス
    (参考) http://qiita.com/sayamada/items/a42d344fa343cd80cf86
    """

    def __init__(self, email_string):
        """
        初期化
        """
        self.email_message    = email.message_from_string(email_string)
        self.subject          = None
        self.from_address     = None
        self.reply_to_address = None
        self.body             = ""
        self.attach_file_list = []

        # emlの解釈
        self._parse()

    def get_attr_data(self):
        """
        メールデータの取得
        """
        attr = {
                "from":         self.from_address,
                "reply_to":     self.reply_to_address,
                "subject":      self.subject,
                "body":         self.body,
                "attach_files": self.attach_file_list
                }
        return attr


    def _parse(self):
        """
        メールファイルの解析
        """

        # メッセージヘッダ部分の解析
        self.subject          = self._get_decoded_header("Subject")
        self.from_address     = self._get_decoded_header("From")
        self.reply_to_address = self._get_decoded_header("Reply-To")

        # メールアドレスの文字列だけ抽出する
        from_list =  re.findall(r"<(.*@.*)>", self.from_address)
        if from_list:
            self.from_address = from_list[0]
        reply_to_list =  re.findall(r"<(.*@.*)>", self.reply_to_address)
        if reply_to_list:
            self.reply_to_address = ','.join(reply_to_list)

        # メッセージ本文部分の解析
        for part in self.email_message.walk():
            # ContentTypeがmultipartの場合は実際のコンテンツはさらに
            # 中のpartにあるので読み飛ばす
            if part.get_content_maintype() == 'multipart':
                continue
            # ファイル名の取得
            attach_fname = part.get_filename()
            # ファイル名がない場合は本文のはず
            if not attach_fname:
                charset = str(part.get_content_charset())
                if charset != None:
                    if charset == 'utf-8':
                        self.body += part.get_payload()
                    else:
                        self.body += part.get_payload(decode=True).decode(charset, errors="replace")
                else:
                    self.body += part.get_payload(decode=True)
            else:
                # ファイル名があるならそれは添付ファイルなので
                # データを取得する
                self.attach_file_list.append({
                    "name": attach_fname,
                    "data": part.get_payload(decode=True)
                })

    def _get_decoded_header(self, key_name):
        """
        ヘッダーオブジェクトからデコード済の結果を取得する
        """
        ret = ""

        # 該当項目がないkeyは空文字を戻す
        raw_obj = self.email_message.get(key_name)
        if raw_obj is None:
            return ""
        # デコードした結果をunicodeにする
        for fragment, encoding in decode_header(raw_obj):
            if not hasattr(fragment, "decode"):
                ret += fragment
                continue
            # encodeがなければとりあえずUTF-8でデコードする
            if encoding:
                ret += fragment.decode(encoding)
            else:
                ret += fragment.decode("UTF-8")
        return ret

class MailForwarder(object):

    def __init__(self, email_attr):
        """
        初期化
        """
        self.email_attr = email_attr
        self.encode     = 'utf-8'

    def send(self):
        """
        添付ファイルにパスワード付き圧縮を行い転送、さらにパスワード通知メールを送信
        """

        # パスワード生成
        password = self._generate_password()

        # zipデータ生成
        zip_name = datetime.now().strftime('%Y%m%d%H%M%S')
        zip_data = self._generate_zip(zip_name, password)

        # zipデータを送信
        self._forward_with_zip(zip_name, zip_data)

        # パスワードを送信
        self._send_password(zip_name, password)

    def _generate_password(self):
        """
        パスワード生成
        記号、英字、数字からそれぞれ4文字ずつ取ってシャッフル
        """
        password_chars = ''.join(random.sample(string.punctuation, 4)) + 
                         ''.join(random.sample(string.ascii_letters, 4)) + 
                         ''.join(random.sample(string.digits, 4))

        return ''.join(random.sample(password_chars, len(password_chars)))

    def _generate_zip(self, zip_name, password):
        """
        パスワード付きZipファイルのデータを生成
        """
        tmp_dir  = "/tmp/" + zip_name
        os.mkdir(tmp_dir)

        # 一旦ローカルにファイルを保存
        for attach_file in self.email_attr['attach_files']:
            f = open(tmp_dir + "/" + attach_file['name'], 'wb')
            f.write(attach_file['data'])
            f.flush()
            f.close()

        # パスワード付きzipに
        dst_file_path = "/tmp/%s.zip" % zip_name
        src_file_names = ["%s/%s" % (tmp_dir, name) for name in os.listdir(tmp_dir)]

        pyminizip.compress_multiple(src_file_names, dst_file_path, password, 4)

        # # 生成したzipファイルを読み込み
        r = open(dst_file_path, 'rb')
        zip_data = r.read()
        r.close()

        return zip_data

    def _forward_with_zip(self, zip_name, zip_data):
        """
        パスワード付きZipファイルのデータを生成
        """
        self._send_message(
                self.email_attr['subject'],
                self.email_attr["body"].encode(self.encode),
                zip_name,
                zip_data
                )
        return

    def _send_password(self, zip_name, password):
        """
        zipファイルのパスワードを送信
        """

        subject = self.email_attr['subject']

        message = """
先ほどお送りしたファイルのパスワードのお知らせです。

[件名] {}
[ファイル名] {}.zip
[パスワード] {}
        """.format(subject, zip_name, password)

        self._send_message(
                '[password]%s' % subject,
                message,
                None,
                None
                )
        return

    def _send_message(self, subject, message, attach_name, attach_data):
        """
        メール送信
        """

        msg = MIMEMultipart()

        # ヘッダ
        msg['Subject'] = subject
        msg['From']    = self.email_attr['from']
        msg['To']      = self.email_attr['reply_to']
        msg['Bcc']     = self.email_attr['from']

        # 本文
        body = MIMEText(message, 'plain', self.encode)
        msg.attach(body)

        # 添付ファイル
        if attach_data:
            file_name = "%s.zip" % attach_name
            attachment = MIMEBase('application', 'zip')
            attachment.set_param('name', file_name)
            attachment.set_payload(attach_data)
            encoders.encode_base64(attachment)
            attachment.add_header("Content-Dispositon", "attachment", filename=file_name)
            msg.attach(attachment)

        # 送信
        smtp_server   = self._get_decrypted_environ("SMTP_SERVER")
        smtp_port     = self._get_decrypted_environ("SMTP_PORT")
        smtp_user     = self._get_decrypted_environ("SMTP_USER")
        smtp_password = self._get_decrypted_environ("SMTP_PASSWORD")
        smtp = smtplib.SMTP(smtp_server, smtp_port)
        smtp.ehlo()
        smtp.starttls()
        smtp.ehlo()
        smtp.login(smtp_user, smtp_password)
        smtp.send_message(msg)
        smtp.quit()
        print("Successfully sent email")

        return

    def _get_decrypted_environ(self, key):
        """
        暗号化された環境変数を復号化
        """

        client = boto3.client('kms')
        encrypted_data = os.environ[key]
        return client.decrypt(CiphertextBlob=base64.b64decode(encrypted_data))['Plaintext'].decode('utf-8')

def lambda_handler(event, context):

    # イベントからバケット名、キー名を取得
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])

    try:
        # S3からファイルの中身を読み込む
        s3_object = s3.get_object(Bucket=bucket, Key=key)
        email_string = s3_object['Body'].read().decode('utf-8')

        # メールを解析
        parser = MailParser(email_string)

        # メール転送
        forwarder = MailForwarder(parser.get_attr_data())
        forwarder.send()
        return

    except Exception as e:
        print(e)
        raise e

pyminizip

パスワード付きzipは標準のライブラリじゃできないっぽい。
ということで、ここだけpyminizipという外部ライブラリに頼りました。
ただこれ、インストール時にビルドしてバイナリ作る系のライブラリだったので、Lambdaで動かすためにローカルでAmazonLinuxのDockerコンテナ立ててバイナリを作りました。何かほかにいい方法あるのかな。。

AWS SAM

ちなみに、これはAWS SAMを使ってローカルテストしてみました。
SMTPサーバーの情報を直書きして試してたところまでは良かったけど、それを環境変数に移すとうまく動かなくて挫折しました。修正はされてるけどリリースされてないっぽい。

導入方法

せっかくなので公開してみます。コードネームzaru
かなり設定方法が泥臭いままですがご容赦ください。。
https://github.com/Kta-M/zaru

自分の環境(Mac, Thunderbird)でしか試してないので、メーラーやその他環境によってはうまくいかないかも?自己責任でお願いします。

SES

SESはまだ東京リージョンで使えないので、オレゴンリージョン(us-west-2)で構築します。

ドメイン検証

まずはSESに向けてメールが送れるように、ドメインの検証を行います。
やり方はいろいろなので、このあたりは割愛。
たとえばこのあたりとか参考になるかも -> RailsでAmazon SES・Route53を用いてドメインメールを送信する

Rule作成

ドメインの検証ができたら、Ruleを作成します。

メニュー右側のRule Setsから、View Active Rule Setをクリック。
ses_rule_01.png

Create Ruleをクリック。
ses_rule_02.png

受信するメールアドレスを登録。検証を行なったドメインのメールアドレスを入力して、Add Recipientをクリック。
ses_rule_03.png

メール受信時のアクションを登録。
アクションタイプとしてS3を選択し、受信したメールデータを保存するバケットを指定します。このとき、Create S3 bucketでバケットを作成してあげると、必要なバケットポリシーが自動で登録されて便利。
SESからバケットへのファイルアップロードを許可するポリシーが設定されます。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AllowSESPuts-XXXXXXXXXXXX",
            "Effect": "Allow",
            "Principal": {
                "Service": "ses.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::<ses-bucket-name>/*",
            "Condition": {
                "StringEquals": {
                    "aws:Referer": "XXXXXXXXXXXX"
                }
            }
        }
    ]
}

また、バケットに保存されたメールデータは、貯めておいても仕方ないので、ライフサイクルを設定して一定期間経過後削除されるようにしておくといいかも。
ses_rule_04.png

ルールに名前を付けます。あとはデフォルトで。
ses_rule_05.png

登録内容を確認して、登録!
ses_rule_06.png

Lambda

デプロイ

SESと同じくオレゴンリージョンにデプロイします。
CloudFormationを利用するので、データをアップロードするS3バケットを作っておいてください。

# git clone git@github.com:Kta-M/zaru.git
# cd zaru
# aws cloudformation package --template-file template.yaml --s3-bucket <cfn-bucket-name> --output-template-file packaged.yaml
# aws cloudformation deploy --template-file packaged.yaml --stack-name zaru-stack --capabilities CAPABILITY_IAM --region us-west-2

Lambdaのコンソールに行くと、関数が作成されています。
また、この関数の実行に必要なIAMロールも作成されています。
lambda_01.png

トリガー設定

バケットにメールデータが入るのをトリガーにして、Lambdaが動くように設定します。

関数の詳細画面のトリガータブに移動します。
lambda_02.png

トリガーを追加をクリックし、S3のイベントを作成します。
SESからデータが来るバケット、イベントタイプはPutです。それ以外はデフォルト。
バケットはlambda_03.png

暗号化キーを作成

このLambda関数内では、暗号化された環境変数からSMTP関連の情報を取得しています。
その暗号化に使用するキーを作成します。

IAMコンソールから、左下にある暗号化キーをクリックします。
リージョンをオレゴンに変更し、キーを作成してください。
lambda_04.png

設定内容は、任意のエイリアスを設定するだけで、残りはデフォルトでOKです。
lambda_05.png

環境変数数設定

Lambdaに戻って、関数内で使用する環境変数を設定します。
コードタブの下のほうに、環境変数を設定するフォームがあります。
暗号化ヘルパーを有効にするにチェックを入れ、先ほど作成した暗号化キーを指定します。
環境変数は、変数名と値(平文)を入力し、暗号化ボタンを押します。すると、指定した暗号化キーで暗号化してくれます。
設定する環境変数は以下の4つです。

変数名 説明
SMTP_SERVER smtpサーバー smtp.example.com
SMTP_PORT smtpポート 587
SMTP_USER smtpサーバーにログインするユーザー名 test@example.com
SMTP_PASSWORD SMTP_USERのパスワード

lambda_06.png

ロール設定

最後に、このLambda関数を実行するロールに必要な権限を付けます。
– メールデータを保存するS3バケットからデータを取得する権限
– 暗号化キーを使って環境変数を復号する権限

まず、IAMコンソールのポリシーに行き、ポリシーの作成->独自のポリシーを作成で以下の2つのポリシーを作成します。
lambda_07.png

ポリシー:s3-get-object-zaru
<ses-bucket-name>には、SESからメールデータを受け取るバケット名を指定してください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1505586008000",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::<ses-bucket-name>/*"
            ]
        }
    ]
}

ポリシー;kms-decrypt-zaru
<kms-arn>には、暗号化キーのARNを指定してください。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt1448696327000",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt"
            ],
            "Resource": [
                "<kms-arn>"
            ]
        }
    ]
}

最後に、この2つのポリシーを、Lambda関数実行ロールにアタッチします。
まず、IAMコンソールのロールに行き、ロールを選択し、ポリシーのアタッチからアタッチします。
lambda_08.png

動作確認

これで動くようになったはずです。
ToにSES向けに設定したメールアドレス、Reply-Toに相手のメールアドレスを設定し、適当なファイルを添付して送ってみてください。どうでしょう?

まとめ

どんとこいzip添付!

続きを読む

boto3を使ってS3のサーバ側暗号化を試す

PythonのスクリプトでS3にオブジェクトをアップロードする際に
サーバ側で暗号化をする方法を調べました。

S3とオブジェクト暗号化

参考: 大きく分けて2種類あります。

Client-Side Encryption(CSE)

クライアント側がデータを暗号化してからサーバに送信して保存する。
-> サーバ(S3)との送受信の間もデータが保護されている。

Server-Side Encryption(SSE)

クライアントはデータをそのまま送り、サーバが暗号化してから保存する。
-> サーバ(S3)に保管されている間はデータが保護されている。

後者はクライアント側で暗号化についてあれこれ考える手間が少なく
ダウンロード時にも透過的に復号化がされるため、扱いが楽です。
※センシティブな情報はもちろん送受信前に暗号化すべきです

S3とServer-Side Encryption

参考: S3におけるSSEの選択肢にも種類があります。

SSE-S3 SSE-KMS SSE-C
暗号化鍵 S3が裏側で管理している鍵 AWSのKMSサービスで管理している鍵 ユーザが管理している鍵
特徴 すべてAWSが管理するため、手間が少ない。ただし、鍵へのアクセス権限などが操作できず、その鍵はS3へのデータ保存にしか使用できない。 IAMユーザやロールを操作して鍵自体へのアクセス権限を柔軟に変更できる。SSE-S3より小回りが効くものの、逆に設定項目は増える。 使用する鍵や保存場所を選べる。もっとも自由度が高いが、鍵の安全な保管を自分で担保する必要がある。

極力AWS側に役割を任せたいため、今回試したのはSSE-S3とSSE-KMSの2つです。

Management Consoleでの事前確認

ManagementConsoleでオブジェクトをバケットに保存しようとすると
途中で暗号化の選択肢が出てきます。
0.png

“Amazon S3 master-key” はSSE-S3のことです。
選択しても特に選択肢は増えません。

1.png

“AWS KMS master-key”はAWS-KMSを意味します。
選択すると使用する鍵の選択肢が出てきます。

  • “aws/s3″はKMSがS3サービス用に勝手に作成しようとする鍵
  • “test_key”はKMSで作成した鍵(私が事前に作成したもの)
  • Custom KMS ARNを選択するとARNの記述で鍵を指定可能

2.png

試しにSSE-S3で保存してみると、オブジェクトの詳細画面でこのように表示されます。

3.png

この作業をスクリプト(boto3)から行います。

スクリプトの記述

S3のドキュメントにはfor Pythonの記述がないため
他の言語のものやboto3のドキュメントを参考に書いてみました。
実行環境はLambdaのPython3.6ですが、どこでも書き方や挙動はさほど変わらないと思います。

S3やKMSの鍵にアクセスできるように、ロールはいい感じに書き換えてください。

SSE-S3

SSE-S3
import boto3

def lambda_handler(event, context):

    file_name = 'test.txt'

    # 一時的に使える'/tmp'にファイルを作成
    with open('/tmp/' + file_name, 'w') as f:
        f.write('hoge')

    # ファイルのアップロード時、ExtraArgsに暗号化方式を指定
    response = boto3.client('s3').upload_file(
        Filename='/tmp/' + file_name,
        Bucket='nanakenashi-test',
        Key=file_name,
        ExtraArgs={'ServerSideEncryption': 'AES256'})

    return True

アップロードの際に静的な引数が増えるだけで非常に単純です。
なお、現時点(2017/09/16)で暗号化方式は AES256 のみのようです。
実際に保存されたオブジェクトは、先程と同じかたちになっているのがわかります。

4.png

SSE-KMS

S3用のデフォルト鍵を使用する場合

前述のスクリプトの ExtraArgs のみを書き換えます。

SSE-KMS(S3用デフォルト鍵を使用)
ExtraArgs={
    'ServerSideEncryption': 'aws:kms',
    }

保存されたオブジェクトはS3用のデフォルト鍵で暗号化されています。
(この鍵はこのタイミングで作成されたためSSE-S3用の鍵とは別物だと思います)

5.png

ただしこの鍵は設定が変更できないため、SSE-S3と大差がありません。

KMSで作成した鍵を使用する場合

SSE-KMS(KMSで作成した鍵を使用)
# 使用する鍵のIDを追加
ExtraArgs={
    'ServerSideEncryption': 'aws:kms',
    'SSEKMSKeyId': 'ea41458h-0c2o-496g-b92e-67441d771282'
    }

事前に作成したキーで暗号化されていることがわかります。

6.png

補足

SSE-S3SSE-KMSいわく

サーバー側の暗号化では、オブジェクトデータのみが暗号化されます。
オブジェクトメタデータは暗号化されません。

また

バケットに保存するすべてのオブジェクトに対してサーバー側の暗号化を必要とする場合は
バケットポリシーを使用できます。

(難しい日本語ですが…)
つまり、暗号化されていないオブジェクトの保存を禁止することができるため
このバケットポリシーを設定しておけばセキュアな状態を保持しやすくなります。

まとめ

S3でのデータ保管をよりセキュアにするために
サーバ側暗号化対応の選択肢をいくつか試してみました。

ちなみにSSE-Cについてはこのあたりを参考にすれば利用できそうです。

続きを読む

boto3(AWS SDK for python)を使って、ローカルファイルをS3にアップロードする方法

はじめに

boto2を使って、ローカルストレージにあるファイルをアップロードするQiita記事はよく見かけるが、
boto3を使ったものがなかなか見つからなかったので、公式サイトを参考に実装したものを貼っておく。

開発環境

Python 3.6.1
pip 9.0.1

手順

  • パッケージインストール
  • Configファイル設定
  • 実装 (pythonソースコード)

パッケージインストール

boto3(AWS SDK for python)

$ pip install boto3

コマンドラインよりAWSのサービスを操作するためのパッケージも併せてインストールしておく。

$ pip install awscli

Configファイル生成

アクセスキー、シークレットキー、リージョン等をConfigファイルに書き込む。
下記コマンドを実行し、後は対話型にデータを入力していけばホームディレクトリ配下にファイルが生成される。

※ boto2では、ソース上よりAWSアクセスキー、シークレットキーを読み込んでいたが、
boto3では、Configファイルから上記2つキーを取得する。

$ aws configure
AWS Access Key ID [None]: xxxxxxxxxxxxxxxxxxxxxxx
AWS Secret Access Key [None]: xxxxxxxxxxxxxxxxxxx
Default region name [None]: xxxxxxxxxxxxxxxxxxxxx
Default output format [None]: xxxxxxxxxxxxxxxxxxx

生成されるファイル(2つ)

~/.aws/credentials
----------------------------------------------
[default]
aws_access_key_id = ACCESS_KEY_ID
aws_secret_access_key = SECRET_ACCESS_KEY
----------------------------------------------
~/.aws/config
----------------------------------------------
[default]
region = [xxxxxxxxxxxxxxxxx]
output = [xxxxxxxxxxxxxxxxx]
----------------------------------------------

ホームディレクトリに生成され、boto3実行時もホームディレクトリの上記ファイルを読みに行っているので、
.awsのディレクトリを移動させるとエラーが発生するので注意。

botocore.exceptions.NoCredentialsError: Unable to locate credentials

もし移動させると、上記のようなエラーが発生する。
まぁ、普通の人は移動させないんだろうけど。。。

実装 (Pythonソースコード)


# -*- coding: utf-8 -*-

import sys
import threading

import boto3

# boto3 (.aws/config)
BUCKET_NAME = 'YOUR_BUCKET_NAME'

class ProgressCheck(object):
    def __init__(self, filename):
        self._filename = filename
        self._size = int(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()
    def __call__(self, bytes_amount):
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                    "r%s / %s (%.2f%%)" % (
                        self._seen_so_far, self._size,
                        percentage))
            sys.stdout.flush()


def UploadToS3():
    # S3Connection
    s3 = boto3.resource('s3')
    s3.Object(BUCKET_NAME, 'OBJECT_KEY (S3)').upload_file('UPLOAD_FILE_PATH (lOCAL)')

UploadToS3()  

OBJECT_KEY (S3) : S3でのオブジェクトキーを設定する。
UPLOAD_FILE_PATH (lOCAL) : アップロードするローカルファイルのパスを設定する。

終わりに

気が向いたら、boto2でのアップロード方法との比較も書きたいと思います。
最後までお読みいただき、ありがとうございました。

何か誤り、アップデート事項ございましたらご指摘お願いいたします。

続きを読む

Three.jsのかっこいいサンプルとAWSを連携させてみた

かっこいい画面が作りたかったので、やってみた備忘録。

やりたいこと

Three.jsがとてもかっこいいんですが、値の設定が静的なので、何とか動的に値を変えつつ画面に反映できないかな、、、と。このサンプルを使ってみます。

スクリーンショット 2017-05-14 19.08.58.png

イメージとしては、この画像で異常が検出されたところを赤くアラート表示したりできないかな、と。

実際の流れの整理

Githubからサンプルを持って来て、必要なものだけ抜き出します。

異常があったら表示する、という感じにしたいので、以前投稿したロジックを使います。

AWS LambdaでDynamoDBから取得した値に任意の集計をかける(グルーピング処理追加)

最新値を取得する中で、異常があったら対象の項目に変化をつけたいと思います。

とりあえずサンプルをAWS上で動かす

AWS上で動作させたいので、とりあえずこのサンプルをAWS上に乗せる必要があります。

  1. 必要なファイルの抽出(抽出結果などはこちらに。https://github.com/kojiisd/aws-threejs
  2. 適当な名前のバケットをS3に作成(今回は「aws-three」という名前にしました)
  3. ファイルのアップロード
  4. パーミッションの適用

パーミッションの適用は、とりあえず以下のコマンドで一括でつけました。実際の運用を考えると、もっと慎重にならないといけないところだとは思います。こちらのサイトを参考にさせてもらいました。

$ aws s3 ls --recursive s3://aws-three/ | awk '{print $4}' | xargs -I{} aws s3api put-object-acl --acl public-read --bucket aws-three --key "{}"

これで一旦サンプルがS3上で動くようになりました。

データの中身を理解する

実際にソースを読んで見ると、一つ一つのオブジェクトをどの様に格納しているかがわかります。

var table = [
    "H", "Hydrogen", "1.00794", 1, 1,
    "He", "Helium", "4.002602", 18, 1,
    "Li", "Lithium", "6.941", 1, 2,
    "Be", "Beryllium", "9.012182", 2, 2,
    :
    :

5つの要素で1セットとしてオブジェクトを表示している様です。元の実装がいいかどうかは置いておいて(^^;とりあえずこれを動的に変更できるようにします。

DBからの値をtableに反映する

データの構造自体はとても単純なので、1レコード5カラムのデータを持つテーブルを作成すれば、後々データの内容全てをDBから持ってくることも可能になりそうです。

DynamoDBで下記の様な単純なテーブルを作成します。もちろんAWS Consoleから作成可能ですが、Localで同じスキーマを作成したい場合はこんな感じで定義を流し込めばOK。

var params = {
    TableName: "test-three",
    KeySchema: [
        {
            AttributeName: "id",
            KeyType: "HASH"
        },
        {
            AttributeName: "timestamp",
            KeyType: "RANGE"
        }
    ],
    AttributeDefinitions: [
        {
            AttributeName: "id",
            AttributeType: "S"
        },
        {
            AttributeName: "timestamp",
            AttributeType: "S"
        }
    ],
    ProvisionedThroughput: {
        ReadCapacityUnits: 1,
        WriteCapacityUnits: 1
    }
};

dynamodb.createTable(params, function(err, data) {
    if (err) ppJson(err);
    else ppJson(data);
});

はい、無事出来ました。

スクリーンショット 2017-07-09 16.12.16.png

データの準備と投入

次にデータを流し込みます。こんな感じでpythonスクリプトでサクッと。実行する前に、クライアントのAWS認証設定が正しいことを確認してください。

data-insert.py
args = sys.argv

if len(args) < 4:
    print "Usage: python data-insert.py <FileName> <Region> <Table>"
    sys.exit()

dynamodb = boto3.resource('dynamodb', region_name=args[2])
table = dynamodb.Table(args[3])

if __name__ == "__main__":
    print "Data insert start."
    target = pandas.read_csv(args[1])
    for rowIndex, row in target.iterrows():
        itemDict = {}
        for col in target:
            if row[col] != None and type(row[col]) == float and math.isnan(float(row[col])) and row[col] != float('nan'):
                continue
            elif row[col] == float('inf') or row[col] == float('-inf'):
                continue
            elif type(row[col]) == float:
                itemDict[col] = Decimal(str(row[col]))
            else:
                itemDict[col] = row[col]
        print itemDict

        table.put_item(Item=itemDict)

    print "Data insert finish."

とりあえずこんなデータを用意しました。先ほどのスクリプトで投入します。

id,score,timestamp
"H",0,"2017-07-23T16:00:00"
"H",0,"2017-07-23T16:01:00"
"H",0,"2017-07-23T16:02:00"
"H",0,"2017-07-23T16:03:00"
"H",0,"2017-07-23T16:04:00"
"H",1,"2017-07-23T16:05:00"
"H",0,"2017-07-23T16:06:00"

無事投入できました。

$ python data-insert.py sample-data.csv us-east-1 test-three
Data insert start.
{'timestamp': '2017-07-23T16:00:00', 'score': 0, 'id': 'H'}
{'timestamp': '2017-07-23T16:01:00', 'score': 0, 'id': 'H'}
{'timestamp': '2017-07-23T16:02:00', 'score': 0, 'id': 'H'}
{'timestamp': '2017-07-23T16:03:00', 'score': 0, 'id': 'H'}
{'timestamp': '2017-07-23T16:04:00', 'score': 0, 'id': 'H'}
{'timestamp': '2017-07-23T16:05:00', 'score': 1, 'id': 'H'}
{'timestamp': '2017-07-23T16:06:00', 'score': 0, 'id': 'H'}
Data insert finish.

スクリーンショット 2017-07-23 17.33.30.png

API Gatewayの設定

絞り込み結果を得るためのLambdaを実行するRESTの口を設けます。API Gatewayを設定してPOST通信でデータが来た場合に該当のLambdaを実行するようにします。

スクリーンショット 2017-07-23 18.13.07_dec.png

画面からDynamoDBのデータを取得する

定期的に画面からDynamoDBのデータを取得します。今回はCognitoによるセキュアな通信は実装しません。他のページで色々と実装されている方がいるので、そちらを参考にして見てください。

処理の流れとしては、以下のような感じ。(S3からの取得処理が定期的に実行される)

S3 → API Gateway → Lambda → DynamoDB

API GatewayでSDK生成

API Gatewayの設定を以下にし、SDKを生成します。

  1. HTTPメソッドはPOSTを指定
  2. とりあえず認証などは今回はかけない

で、作成されたSDKを、本家のページを参考に埋め込みます。

API Gateway で生成した JavaScript SDK を使用する

以下のコードをScriptタグ部分に貼り付けました。

        <script type="text/javascript" src="../extralib/axios/dist/axios.standalone.js"></script>
        <script type="text/javascript" src="../extralib/CryptoJS/rollups/hmac-sha256.js"></script>
        <script type="text/javascript" src="../extralib/CryptoJS/rollups/sha256.js"></script>
        <script type="text/javascript" src="../extralib/CryptoJS/components/hmac.js"></script>
        <script type="text/javascript" src="../extralib/CryptoJS/components/enc-base64.js"></script>
        <script type="text/javascript" src="../extralib/url-template/url-template.js"></script>
        <script type="text/javascript" src="../extralib/apiGatewayCore/sigV4Client.js"></script>
        <script type="text/javascript" src="../extralib/apiGatewayCore/apiGatewayClient.js"></script>
        <script type="text/javascript" src="../extralib/apiGatewayCore/simpleHttpClient.js"></script>
        <script type="text/javascript" src="../extralib/apiGatewayCore/utils.js"></script>
        <script type="text/javascript" src="../extralib/apigClient.js"></script>

実際にAPI Gatewayにアクセスするスクリプトはこんな感じになります。

SDKを用いてAPI Gatewayにブラウザからアクセスしてみる

var apigClient = apigClientFactory.newClient();

var params = {
    // This is where any modeled request parameters should be added.
    // The key is the parameter name, as it is defined in the API in API Gateway.
    "Content-Type": "application/x-www-form-urlencoded"
};

var body = {

    "label_id": "id",
    "label_range": "timestamp",
    "id": [
    "H"
    ],
    "aggregator": "latest",
    "time_from": "2017-07-23T16:00:00.000",
    "time_to": "2017-07-23T16:06:00.000",
    "params": {
    "range": "timestamp"
    }
};


  var additionalParams = {
    // If there are any unmodeled query parameters or headers that must be
    //   sent with the request, add them here.
    headers: {
    },
    queryParams: {
    }
  };

  apigClient.rootPost(params, body, additionalParams)
      .then(function(result){
        // Add success callback code here.
        alert("Success");
        alert(JSON.stringify(result));
      }).catch( function(result){
        // Add error callback code here.
      });

パラメータ(body部)には「AWS LambdaでDynamoDBから取得した値に任意の集計をかける(グルーピング処理追加)」で必要になるJSONを渡します。

API GatewayにアクセスするにはCORSの設定が必要なので、AWS Consoleから設定します。この際Resourcesで変更したものはStaging環境に再デプロイをしないと追加で設定した項目は反映されないので注意(ハマった。。。)

「Enable CORS」から設定できます。

スクリーンショット 2017-09-10 10.56.00.png

実際に画面にアクセスすると、サーバから値が取れたことがわかります。(今回はAPI KEYなどはOFFにしていますが、実際に運用するとなったら、もちろん考慮が必要です。)

スクリーンショット 2017-09-10 11.03.04.png

スクリーンショット 2017-09-10 11.03.11_deco.png

無事値が取れました。ここまでくればもう一息です。

取得した値を元に画面に変更を加える

さて、先ほど取得できた値には「score」というものが入っています。この「score」値を元に画面のコンポーネントに変化を加えます。

持ってきたサンプルコードの中で色に影響を与えているのは以下のコードになります。

for ( var i = 0; i < table.length; i += 5 ) {

    var element = document.createElement( 'div' );
    element.className = 'element';
    element.style.backgroundColor = 'rgba(0,127,127,' + ( Math.random() * 0.5 + 0.25 ) + ')';

全てのコンポーネントに対してrgbaで値を与えているので、ここを変更します。とはいえ、そのまま変更しようとしてもinit()メソッドを実行すると追加でコンポーネントが描画されてしまうので、描画後に変更が可能なように以下のようなidの埋め込みを行います。

var element = document.createElement( 'div' );
element.className = 'element';
element.id = table[ i ];                        // ここが追加したところ
element.style.backgroundColor = 'rgba(0,127,127,' + ( Math.random() * 0.5 + 0.25 ) + ')';

これらの準備を前提として、処理の流れは以下とします。

  1. DynamoDBから取ってきたjson値をオブジェクトに変換。
  2. 一旦オブジェクト配列としてidとscore値を持たせるようにする。
  3. オブジェクト配列でループ処理を実装し、その中でscore値が「0」ではないidに対して、色の変更を実施する。

今回の準備を踏まえると、上記で対象となるのは「id: H」となります。DynamoDBからデータを取得するために、rootPost呼び出し後のコールバック処理を以下のように変更します。今回色変更の処理をシュッと実装するために、jQueryを読み込ませています。

apigClient.rootPost(params, body, additionalParams)
    .then(function(result){
        var resultObjArray= new Array();
        // Add success callback code here.
        var resultJson = JSON.parse(result.data);
        for (var index = 0; index < resultJson.length; index++) {
            var resultObj = new Object();
            resultObj.id = resultJson[index].id;
            resultObj.score = resultJson[index].score
            resultObjArray[index] = resultObj;
        }

        for (var index = 0; index < resultObjArray.length; index++) {
            var resultObj = resultObjArray[index];
            if (resultObj.score != 0) {
                $('#' + resultObj.id).css('background-color', 'rgba(255,127,127,' + ( Math.random() * 0.5 + 0.25 ) + ')')
            }
        }

    }).catch( function(result){
        // Add error callback code here.
        alert("Failed");
        alert(JSON.stringify(result));
    });

DynamoDBに格納されている値(検索期間の最新)を「1」に変更して画面を更新すると、無事赤くなりました。

スクリーンショット 2017-09-10 18.58.41.png

実行タイミングを任意にするためにボタンからのクリックイベントとして実装する

このままでもいいのですが、今のままだと最初のアニメーションが終わる前に色が変わるので、色が変わった感がありません。ですので一つボタンを追加してクリックしたら画面に値を反映する、という手法を取ろうと思います。

先ほどのrootPostを呼び出すプログラムを、画面に配置したボタンのイベントとして処理させます。

<button id="getData">Get Data from DynamoDB</button>

画面にはこんな感じでボタンを配置し、jQueryでイベントを登録します。

$(document).ready(function(){
    $("#getData").click(function() {
        apigClient.rootPost(params, body, additionalParams)
        .then(function(result){
            var resultObjArray= new Array();
            // Add success callback code here.
            var resultJson = JSON.parse(result.data);
            for (var index = 0; index < resultJson.length; index++) {
                var resultObj = new Object();
                resultObj.id = resultJson[index].id;
                resultObj.score = resultJson[index].score
                resultObjArray[index] = resultObj;
            }

            for (var index = 0; index < resultObjArray.length; index++) {
                var resultObj = resultObjArray[index];
                if (resultObj.score != 0) {
                    $('#' + resultObj.id).css('background-color', 'rgba(255,127,127,' + ( Math.random() * 0.5 + 0.25 ) + ')')
                }
            }

        }).catch( function(result){
            // Add error callback code here.
            alert("Failed");
            alert(JSON.stringify(result));
        });
    });
});

これで無事画面から操作することが可能になりました。ボタンを押すといい感じに「H」の要素が赤く光っています。

スクリーンショット 2017-09-10 23.08.28.png

まとめ

もともとかっこいいと思っていたThree.jsの画面をカスタマイズして監視画面(IoTとかのデバイス監視画面)のような機能にできないものか、と実装してみましたが、なんとかここまでたどり着けました。

DynamoDBからの取得間隔やIDの指定方法など、まだまだハードコーディングな部分はありますが、土台は出来上がったのであとはちょっとのカスタマイズで実際に使えそうなところまではイメージが持てました。元々の実装の仕組みも理解できたので、条件が合えば文言を変えるなども実現できそうです。

やっぱり業務で使うものもかっこいい画面でないとね、と思うこの頃です。
(GithubのコードはAPI GatewayのSDKさえ埋め込めば動くような作りにしています)

https://github.com/kojiisd/aws-threejs

続きを読む