AWSとAzureとGCPを比較してみる – FaaS編

FaaSについてAWSとAzureとGCPを比較してみました。

注)

1. FaaS比較表

AWS Azure GCP
Lambda Functions Cloud Functions***
言語 Python,
node.js,
java,
C#,
go
ランタイムバージョン1.X
C#,
JavaScript,
F#,
Python*,
PHP*,
TypeScript*,
バッチ (.cmd、.bat)*,
Bash*,
PowerShell*

ランタイムバージョン2.X
C#**,
JavaScript**,
Java**
node.js***
最大実行時間 5分 10分 (従量課金プラン)
無制限 (App Serviceプラン)
9分
直接HTTPアクセスを受け付けるか 受け付けない(API Gatewayと連携必要) 受け付ける 受け付ける
トリガー Amazon S3,
Amazon DynamoDB,
Amazon Kinesis Data Streams,
Amazon Simple Notification Service,
Amazon Simple Email Service,
Amazon Cognito,
AWS CloudFormation,
Amazon CloudWatch Logs,
Amazon CloudWatch Events,
AWS CodeCommit,
スケジュールされたイベント (Amazon CloudWatch Events を使用),
AWS Config,
Amazon Alexa,
Amazon Lex,
Amazon API Gateway,
AWS IoT ボタン,
Amazon CloudFront,
Amazon Kinesis Data
Blob Storage,
Cosmos DB,
Event Hubs,
HTTP,
Microsoft Graph Events(2.Xのみ),
Queue storage,
Service Bus,
Timer,
Webhooks(1.Xのみ)
HTTP,
Cloud Storage,
Cloud Pub/Sub

*試験段階
**プレビュー
***ベータ

2. 対応言語の比較

言語の種類は試験段階とプレビューを含めればAzureが一番多いのですが、正式リリースされたものに限定すればAWSの方が種類が多いです。
一方GCPは機能自体がベータリリースなので、まだこれからといった感じでしょうか。

AzureはBashにも対応しているのが特徴です。運用系のシェルスクリプトをFaaS化すれば、スクリプト用のサーバが不要になりますね。

3. 最大実行時間

最大実行時間はAzureの10分(要host.jsonのfunctionTimeoutプロパティ変更)、GCPの9分に対しAWS Lamdbaは5分と約半分です。実際にAWS Lambdaを利用していると5分の壁を結構感じます。この点は他クラウドが羨ましいですね。
2017年のRe:InventでAWSはFargateというコンテナのサービスをリリースしましたが、このサービスがlambdaが5分以上実行できないことに対するAWSからの回答のように感じます。

4. 直接HTTPアクセスを受け付けるか

AWS lambdaだけ直接HTTPアクセスを受け付けることができません。HTTPアクセスを受け付けるには、API Gatewayと連携する必要がありますが、多機能な分やや設定が面倒な印象です。(但しAPI経由でLambdaを起動することは可能)

まとめ

AWS Lambdaのリリース後、Azure・GCP・Bluemix(現IBM Cloud)は超特急で追従しました。AWS LambdaがIT業界に与えたインパクトはとても大きかったと思います。
現在は「FaaS無ければばクラウドにあらず」といったところでしょうか。

また、AWS GreengrassやAzure IoT Edge**というエッジにデプロイするサービスも出てきています。
将来AWS LambdaがiPhoneやApple Watchにデプロイできるようにならないかなーと妄想中です。

**プレビュー

続きを読む

Glueの使い方的な⑦(Step Functionsでジョブフロー)

Step FunctionsでGlueのジョブフローを作る

Glueの使い方的な③(CLIでジョブ作成)“(以後③と書きます)で書いたように、現在Glueのジョブスケジュール機能は簡易的なものなので、複雑なジョブフロー形成には別のスケジューラーが必要になる場合もあります。
例えばGlueのクローラーとGlueジョブもそれぞれにスケジュール機能があり統合したジョブフローを作ることがGlueだけでは出来ません(例えばクローラーを実行し終わったらジョブを実行するとか)。今回はサーバーレスなジョブフローのサービスであるStep Functionsを使って、クローラーを実行し正常終了したら後続のジョブを実行するというフローを作ってみます。

全体の流れ

  • Glue処理内容
  • StepFunctionsの処理内容
  • 前準備
  • Step FunctionsでStateMachine作成
  • 実行

処理内容

Glueの使い方的な①(GUIでジョブ実行)“(以後①と書きます)で実行したものと同じクローラーとジョブを使います。入力データも出力結果も①と同じです。
今回行うのはGlueクローラー処理が終わったら次のGlueジョブ処理開始というジョブフロー形成です。

あらためて①のクローラーとジョブの処理内容は以下の通りです

クローラーの内容

入力のCSVファイルからスキーマを作成します

ジョブの内容

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

Step Functionsを使ったジョブフローの内容

図の四角をStep Functionsでは”State”と呼びます。処理の1単位と思ってください。

ジョブフローは以下のような形です。

Stateごとに流れを説明します

  • “Submit Crawler Job”でLambdaを使いGlueクローラーを実行
  • “Wait X Seconds”で指定時間待つ
  • “Get Crawler Job Status”でLambdaを使いGlueクローラーの状態をポーリングして確認
  • “Job Complete?”で状態を判定して結果によって3つに処理が分岐
    • 失敗なら”Job Failed”エラー処理
    • 終了なら”Run Final Glue Job”でLambdaを使い後続のGlueジョブを実行
    • 処理中なら”Add Count”でLambdaを使いカウンタをインクリメント。
      • “Add Count”の後”Chk Count”でカウンタをチェックし3回以上になっていたら”Job Failed Timeout”でタイムアウト処理、3未満なら”Wait X Seconds”に戻りループ処理

スクリーンショット 0030-01-13 21.47.05.png

前準備

①と同じです

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

入力ファイルをS3に配置

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

ディレクトリ構成

in0に入力ファイル、out0に出力ファイル

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE script/
                           PRE tmp/

ジョブのPySparkスクリプト

se2_job0.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

入力のCSVデータのスキーマ

クローラーによって作成されるスキーマ

スクリーンショット 0030-01-13 22.01.43.png

StepFunctionsでStateMachine作成

StepFunctionsは一連のジョブフローをJSONで定義しこれを”StateMachine”と呼びます。
StateMachine内の処理の1つ1つの四角をStateと呼びます。処理の1単位です。
このJSONの記述はASL(AmazonStatesLanguages)と呼ばれStateTypeとしてChoice(分岐処理)やWait(待ち)やParallel(並列実行)などがJSONだけで表現出来ます。またTaskというStateTypeからはLambdaやアクティビティ(EC2からStepFunctionsをポーリングする)を定義できます。前述の通り今回はLmabdaを使います。

マネージメントコンソールからいくつかあるテンプレートを元に作ることも出来ますが、カスタムでJSONを一から作ることもできます。

新規StateMachine作成画面
“Author from scrach”で一からJSON作成

スクリーンショット 0030-01-14 10.24.59.png

“Template”を選ぶとASLのStateパターンのいくつかのテンプレが選べます

スクリーンショット 0030-01-14 10.28.14.png

左側の”コード”部分にJSONを書き、右側の”ビジュアルワークフロー”の部分にJSONコードで書いたフローがビジュアライズされます

スクリーンショット 0030-01-14 10.29.50.png

StateMachine

今回のStateMachieのJSONは以下です。
内容は前述の通りです。
※[AWSID]のところは自身のAWSIDと置き換えてください

{
  "Comment": "A state machine that submits a Job to Glue Batch and monitors the Job until it completes.",
  "StartAt": "Submit Crawler Job",
  "States": {
    "Submit Crawler Job": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-cr1",
      "ResultPath": "$.chkcount",
      "Next": "Wait X Seconds",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 120,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
    "Wait X Seconds": {
      "Type": "Wait",
      "SecondsPath": "$.wait_time",
      "Next": "Get Crawler Job Status"
    },
    "Get Crawler Job Status": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-crcheck",
      "Next": "Job Complete?",
      "InputPath": "$",
      "ResultPath": "$.response",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
      "Job Complete?": {
      "Type": "Choice",
      "Choices": [{
          "Variable": "$.response",
          "StringEquals": "FAILED",
          "Next": "Job Failed"
        },
        {
          "Variable": "$.response",
          "StringEquals": "READY",
          "Next": "Run Final Glue Job"
        }
      ],
      "Default": "Add Count"
        },
    "Add Count": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-addcount",
      "Next": "Chk Count",
      "InputPath": "$",
      "ResultPath": "$.chkcount",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
      "Chk Count": {
      "Type": "Choice",
      "Choices": [{
          "Variable": "$.chkcount",
          "NumericGreaterThan": 3,
          "Next": "Job Failed Timeout"
        }],
      "Default": "Wait X Seconds"
    },
    "Job Failed": {
      "Type": "Fail",
      "Cause": "Glue Crawler Job Failed",
      "Error": "DescribeJob returned FAILED"
    },
        "Job Failed Timeout": {
      "Type": "Fail",
      "Cause": "Glue Crawler Job Failed",
      "Error": "DescribeJob returned FAILED Because of Timeout"
    },
    "Run Final Glue Job": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-job1",
      "End": true,
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    }
  }
}

Lambda

今回使うLambdaは4つです。流れも振り返りながら見ていきます
書き方はいろいろあるし今回はエラーハンドリングも甘いのであくまでも動きのイメージをつかむための参考程度にしてください。最後のGlueジョブの実行についてはジョブの終了判定とかはしてないです。

“Submit Crawler Job”

GlueのAPIを使ってクローラーのStartを行う

glue-test1-cr1
# coding: UTF-8

import sys
import boto3
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.start_crawler(Name='se2_in0')
    return 1

“Wait X Seconds”

Waitで指定秒数待つ

“Get Crawler Job Status”

GlueのAPIを使ってクローラーのステータスを取得します

glue-test1-crcheck
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.get_crawler(Name='se2_in0')
    response = response['Crawler']['State']
    return response

“Job Complete?”

Choiceで取得したステータスが、”READY”なら正常終了、”FAILED”なら失敗、それ以外は実行中の分岐処理

“Job Failed”

ステータスが失敗なら
FailでStepFunctionsをエラーさせます

“Run Final Glue Job”

ステータスが正常終了なら
GlueのAPIを使ってジョブをStartします

glue-test1-job1
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.start_job_run(
    JobName='se2_job0')
    return response['JobRunId']

“Add Count”

クローラーがまだ実行中なら
カウンタにインクリメントします

glue-test1-addcount
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    chkcount = event["chkcount"]
    chkcount = chkcount + 1

    return chkcount

“Chk Count”

choiceでカウンタが3未満か3以上かをチェックします

“Job Failed Timeout”

Failでカウンタが3以上だった時のエラー処理

“Wait X Seconds”

3未満の場合はここに戻りループ処理

実行

Step Functionsを実行

作成したStateMachineを選び”新しい実行”をクリック

スクリーンショット 0030-01-14 10.54.54.png

JSONに引数を入れて”実行の開始”をクリック
今回はJSON内で使う変数で”wait_time”を60秒で待ちの時間として入力しています

スクリーンショット 0030-01-14 10.55.52.png

実行状況

スクリーンショット 0030-01-14 10.59.44.png

CloudWatchイベントでスケジュール

あとは上記で作成したStateMachineをCloudWatchイベントでCRON指定すれば定期的実行されるジョブフローの完成です。This is Serverless!

スクリーンショット 0030-01-13 22.34.00.png

その他

今回はクローラー実行後にジョブ実行というシンプルなフローでしたが、Step Functionsは並列度を替えたり引数の受け渡しをしたり、さらにLambdaでロジックを書くことができるので自由度高く複雑なフローの作成が行えます。Glueとの相性はいいのではないでしょうか?

JSON部分も30分もあれば学習完了というカジュアルさがありLambdaを使ってAPI操作で様々なAWSの処理を繋げるのにはとてもいい印象です。

かなりシンプルな処理だったのですがコードがやや多い印象で、より複雑な処理になると結構大きいJSONになりそうで、JSONなのでコメント書けないとか少し大変な部分が出て来るのかもしれません。

バージョン管理を考えるとCliでの処理で運用したほうが良さそうですが、こういったサービスはGUIでの良さもあるのでどちらに比重を置いた運用がいいかは考慮が必要かもです

本文中で使ったカウンタのステート情報はDynamoDBなどに入れた方が良いかもです。

マイクロサービス化しやすいので、極力本来の処理のロジックをLambda側にやらせてそれ以外のフロー処理(分岐とかカウンタインクリメントとか)をJSONで書くのがいいと思います。今回カウンタはLambdaでやってしまいましたが。

ログはCloudWatchLogsに出ます

To Be Continue

TODO

参考

StepFunctions BlackBelt資料
https://www.slideshare.net/AmazonWebServicesJapan/20170726-black-beltstepfunctions-78267693

続きを読む

Glueの使い方的な⑤(パーティション分割してるcsvデータをパーティション分割したparquetに変換)

パーティション分割csv->パーティション分割parquet

ジョブの内容

※”Glueの使い方①(GUIでジョブ実行)”(以後①とだけ書きます)と同様のcsvデータを使います

“パーティション分割されたcsvデータを同じパーティションで別の場所にparquetで出力する”

ジョブ名

se2_job4

クローラー名

se2_in1
se2_out3

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

内容としては①と同じデータで、year,month,day,hourのパーティションごとに分けたcsvファイルを配置します。
year,month,day,hourのカラムは削除しています。

元となる①のデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

今回使う入力データ(19件)
year,month,day,hourのカラムは削除しています

$ ls
cvlog_2017111112.csv    cvlog_2017120101.csv    cvlog_2017121621.csv
cvlog_2017111414.csv    cvlog_2017120218.csv    cvlog_2017121714.csv
cvlog_2017112114.csv    cvlog_2017120904.csv    cvlog_2017121718.csv
cvlog_2017112915.csv    cvlog_2017121412.csv    cvlog_2017121908.csv
cvlog_2017113014.csv    cvlog_2017121414.csv    cvlog_2017121914.csv
cvlog_2017113020.csv    cvlog_2017121511.csv    cvlog_2017122915.csv
$ cat *
deviceid,uuid,appid,country
iphone,11121,001,JP
deviceid,uuid,appid,country
iphone,11123,009,FR
deviceid,uuid,appid,country
iphone,11119,007,AUS
deviceid,uuid,appid,country
other,11110,005,JP
iphone,11125,005,JP
deviceid,uuid,appid,country
iphone,11129,007,AUS
deviceid,uuid,appid,country
android,11122,001,FR
deviceid,uuid,appid,country
pc,11118,001,FR
deviceid,uuid,appid,country
pc,11117,009,FR
deviceid,uuid,appid,country
iphone,11128,009,FR
deviceid,uuid,appid,country
iphone,11111,001,JP
deviceid,uuid,appid,country
android,11112,001,FR
deviceid,uuid,appid,country
iphone,11116,001,JP
deviceid,uuid,appid,country
iphone,11113,009,FR
deviceid,uuid,appid,country
iphone,11124,007,AUS
deviceid,uuid,appid,country
iphone,11114,007,AUS
deviceid,uuid,appid,country
iphone,11126,001,JP
deviceid,uuid,appid,country
android,11127,001,FR
deviceid,uuid,appid,country
other,11115,005,JP

データの場所
year,month,day,hourのパーティションごとに分けたcsvファイルを配置しています

# aws s3 ls s3://test-glue00/se2/in1/year=2017/ --recursive
2018-01-04 09:38:13          0 se2/in1/year=2017/
2018-01-04 09:40:12          0 se2/in1/year=2017/month=11/
2018-01-04 09:40:38          0 se2/in1/year=2017/month=11/day=11/
2018-01-04 09:41:23          0 se2/in1/year=2017/month=11/day=11/hour=12/
2018-01-04 10:19:28         48 se2/in1/year=2017/month=11/day=11/hour=12/cvlog_2017111112.csv
2018-01-04 09:40:42          0 se2/in1/year=2017/month=11/day=14/
2018-01-04 09:41:41          0 se2/in1/year=2017/month=11/day=14/hour=14/
2018-01-04 10:19:45         48 se2/in1/year=2017/month=11/day=14/hour=14/cvlog_2017111414.csv
2018-01-04 09:40:47          0 se2/in1/year=2017/month=11/day=21/
2018-01-04 09:41:55          0 se2/in1/year=2017/month=11/day=21/hour=14/
2018-01-04 10:20:01         49 se2/in1/year=2017/month=11/day=21/hour=14/cvlog_2017112114.csv
2018-01-04 09:40:50          0 se2/in1/year=2017/month=11/day=29/
2018-01-04 09:42:09          0 se2/in1/year=2017/month=11/day=29/hour=15/
2018-01-04 10:20:22         67 se2/in1/year=2017/month=11/day=29/hour=15/cvlog_2017112915.csv
2018-01-04 09:41:01          0 se2/in1/year=2017/month=11/day=30/
2018-01-04 09:42:22          0 se2/in1/year=2017/month=11/day=30/hour=14/
2018-01-04 10:20:41         49 se2/in1/year=2017/month=11/day=30/hour=14/cvlog_2017113014.csv
2018-01-04 09:42:40          0 se2/in1/year=2017/month=11/day=30/hour=20/
2018-01-04 10:20:52         49 se2/in1/year=2017/month=11/day=30/hour=20/cvlog_2017113020.csv
2018-01-04 09:40:16          0 se2/in1/year=2017/month=12/
2018-01-04 09:43:11          0 se2/in1/year=2017/month=12/day=1/
2018-01-04 09:45:16          0 se2/in1/year=2017/month=12/day=1/hour=1/
2018-01-04 10:21:19         44 se2/in1/year=2017/month=12/day=1/hour=1/cvlog_2017120101.csv
2018-01-04 09:43:21          0 se2/in1/year=2017/month=12/day=14/
2018-01-04 09:46:50          0 se2/in1/year=2017/month=12/day=14/hour=12/
2018-01-04 10:22:28         48 se2/in1/year=2017/month=12/day=14/hour=12/cvlog_2017121412.csv
2018-01-04 09:47:01          0 se2/in1/year=2017/month=12/day=14/hour=14/
2018-01-04 10:22:51         49 se2/in1/year=2017/month=12/day=14/hour=14/cvlog_2017121414.csv
2018-01-04 09:43:38          0 se2/in1/year=2017/month=12/day=15/
2018-01-04 09:47:11          0 se2/in1/year=2017/month=12/day=15/hour=11/
2018-01-04 10:23:12         48 se2/in1/year=2017/month=12/day=15/hour=11/cvlog_2017121511.csv
2018-01-04 09:43:42          0 se2/in1/year=2017/month=12/day=16/
2018-01-04 09:47:23          0 se2/in1/year=2017/month=12/day=16/hour=21/
2018-01-04 10:23:34         48 se2/in1/year=2017/month=12/day=16/hour=21/cvlog_2017121621.csv
2018-01-04 09:43:45          0 se2/in1/year=2017/month=12/day=17/
2018-01-04 09:47:38          0 se2/in1/year=2017/month=12/day=17/hour=14/
2018-01-04 10:23:54         49 se2/in1/year=2017/month=12/day=17/hour=14/cvlog_2017121714.csv
2018-01-04 09:47:43          0 se2/in1/year=2017/month=12/day=17/hour=18/
2018-01-04 10:24:12         49 se2/in1/year=2017/month=12/day=17/hour=18/cvlog_2017121718.csv
2018-01-04 09:43:49          0 se2/in1/year=2017/month=12/day=19/
2018-01-04 09:48:04          0 se2/in1/year=2017/month=12/day=19/hour=14/
2018-01-04 10:25:07         49 se2/in1/year=2017/month=12/day=19/hour=14/cvlog_2017121914.csv
2018-01-04 09:47:59          0 se2/in1/year=2017/month=12/day=19/hour=8/
2018-01-04 10:24:56         48 se2/in1/year=2017/month=12/day=19/hour=8/cvlog_2017121908.csv
2018-01-04 09:43:15          0 se2/in1/year=2017/month=12/day=2/
2018-01-04 09:45:44          0 se2/in1/year=2017/month=12/day=2/hour=18/
2018-01-04 10:21:41         44 se2/in1/year=2017/month=12/day=2/hour=18/cvlog_2017120218.csv
2018-01-04 09:43:52          0 se2/in1/year=2017/month=12/day=29/
2018-01-04 09:48:17          0 se2/in1/year=2017/month=12/day=29/hour=15/
2018-01-04 10:25:26         47 se2/in1/year=2017/month=12/day=29/hour=15/cvlog_2017122915.csv
2018-01-04 09:43:18          0 se2/in1/year=2017/month=12/day=9/
2018-01-04 09:46:24          0 se2/in1/year=2017/month=12/day=9/hour=4/
2018-01-04 10:22:02         48 se2/in1/year=2017/month=12/day=9/hour=4/cvlog_2017120904.csv

S3のディレクトリ構成

Glueジョブの入力データは”in1″ディレクトリ配下、出力は”out3″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE in1/
                           PRE out0/
                           PRE out1/
                           PRE out2/
                           PRE out3/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

入力データ用に新しくクローラーを作り実行してテーブルを作ります。

出来上がるテーブルの情報は以下です。

スクリーンショット 0030-01-04 10.52.04.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job4ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”パーティション分割されたcsvデータを同じパーティションで別の場所にparquetとして出力する”です。

se2_job4
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションをマッピング対象のカラムとして含めてくれません
入力のパーティションのカラムをマップの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

↓↓↓

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job4_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

###add
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out3/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
###add

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-04 15.54.04.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/
2018-01-04 15:15:41        926 part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet .
download: s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet to ./part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# ls
part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out3でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-04 15.59.33.png

S3の出力パスを入力
形式の違うデータが混在しているとテーブルが複数できてしまうので、不要なものがあれば、excludeで除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-04 16.00.06.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.19.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-04 16.00.28.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.36.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-04 16.00.49.png

クローラー実行

1つのテーブルとして認識している

スクリーンショット 0030-01-04 16.09.01.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-04 16.09.52.png

Athenaから確認

左メニューからse2_out3のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-04 16.11.18.png

件数も19件で合っている

スクリーンショット 0030-01-04 16.11.01.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばappidというカラムがあるので、アプリごとに集計をするようなケースが多いならappidも含めてパーティション分割する

他にもdeviceidとかでデバイスごとに集計したり
一時的な調査にも役立つかも

出力に”out4″ディレクトリ作成
①と同様のジョブをse2_job5で作成
PySparkに以下3点修正したジョブ作成

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションはカラムとして含めてくれません。
入力のパーティションのカラムをapplyの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['appid','year','month','day','hour']
output='s3://test-glue00/se2/out4/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

appidごとにパーティション別れてます

スクリーンショット 0030-01-04 16.30.35.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-04 16.33.32.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-04 16.33.49.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-04 16.34.10.png

件数も同じく19件

スクリーンショット 0030-01-04 16.34.39.png

その他

todo

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/latest/sql-programming-guide.html

続きを読む

Glueの使い方的な④(ブックマーク)

ブックマークを使って一度処理したデータは処理対象外とする

入力パスの先にデータがあっても既に処理済なデータは除外したい。そんな思いになったことありませんでしょうか?

これを実現する機能がGlueの”ブックマーク”です。

全体の流れ

  • 前提
  • ブックマークの効果を見る
  • ジョブの永続的なブックマーク有効化
  • トリガー側のブックマーク有効無効
  • (おまけの考察)

前提

Glueの使い方的な①(GUIでジョブ実行)“(以後①と書きます)、”Glueの使い方的な③(CLIでジョブ実行)“(以後③と書きます)あたりを読んでいただけるとスムーズです

今回扱うジョブは①と同じ内容です。
“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

③の後ろの方で書いたように、このジョブはGlueのGUIだけで作成し、1つのcsvを1つのparquetにするだけのなので、このジョブを2回実行すると同じ内容の出力が2つ出来てしまいます。

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

*①と同じディレクトリ
Glueジョブの入力データは”in0″ディレクトリ配下、出力は”out1″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

ブックマークの効果を見る

①や③でジョブを実行して出力ファイルがout0に既に存在しているのでそれを事前に消しておきます。

想定される流れと結果

1回目の実行で、①と同じくparquetの出力ファイルが1つできる。(あとメタデータ2つ)
こんな感じ

2018-01-02 10:44:04        782 _common_metadata
2018-01-02 10:44:04       1746 _metadata
2018-01-02 10:27:05       2077 part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet

ブックマークを有効化することで、2回目の実行で、既に処理した入力ファイルは処理されない為、新たなparquetファイルは出力されず1つのままになります

やってみる

ジョブは①で作ったのと同じ内容で新たにジョブをse2_job2という名前で作ります
ジョブの内容は以下です。
“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

最初のジョブの状態

実行履歴が1件ありますが気にしないでください。
スクリーンショット 0030-01-02 17.38.20.png

1回目実行

ブックマークを有効にして実行する
“Action”の”Run job”をクリックする

※ちなみにこの画面のActionの”Reset job bookmark”でブックマークに保持された状態をリセットできます

スクリーンショット 0030-01-02 17.39.21.png

Parameter(optional)の画面で”Job bookmark”を”Enable”にして”Run job”をクリックする
今回のみ有効なパラメータとしてジョブを実行します。

スクリーンショット 0030-01-02 17.39.06.png

成功して履歴が1つ増えています
スクリーンショット 0030-01-02 17.49.46.png

S3にparquetファイル1つとメタデータ2つがほぼ同じ時刻で作成されています。

スクリーンショット 0030-01-02 17.50.06.png

続いて同じ手順で2回目を実行します。

実行後のS3はメタデータは更新されている(?)
ただ、肝心のparquetファイルは1つのみで、1度処理した入力データを対象外としていることがわかる
※メタデータの内容は後半で確認してみます

スクリーンショット 0030-01-02 17.55.50.png

このようにGlueのブックマークを有効にしておくと、同じ場所にあるソースデータの中でまだ処理していないデータだけを処理対象とすることができます。

ジョブ側の永続的なブックマーク有効化

さっきまではジョブ実行時の1度だけのブックマーク有効でした
今回は永続的なブックマーク有効化をします。

対象ジョブにチェックを入れ、”Action”をクリックし、”Edit job”をクリックする

スクリーンショット 0030-01-02 17.53.30.png

この画面が出て一見ブックマークの設定箇所がなさそうだが、実は下にスクロールできる

スクリーンショット 0030-01-02 17.53.49.png

あった

スクリーンショット 0030-01-02 17.54.07.png

“Enable”に変更して”Save”
するとエラーが、、”temporary directory”と”IAM role”をこのタイミングでも入れないといけなようです。
それぞれ入力後”Save”をクリックします。

スクリーンショット 0030-01-02 17.54.20.png

画面の右端にあるように”Job bookmark”が”Enable”になっているのがわかります。

スクリーンショット 0030-01-02 18.11.49.png

トリガー側のブックマーク有効無効

ブックマークの有効無効確認

対象のトリガーのse2_trigger2の部分をクリック

スクリーンショット 0030-01-02 18.18.50.png

以下の画面になり、Parametersのところに”–job-bookmark-option:job-bookmark-enable”とあればブックマークが有効になっている

スクリーンショット 0030-01-02 18.19.04.png

ブックマーク有効無効変更

対象のトリガーse2_trigger2にチェックを入れ、”Action”をクリックし”Edit trigger”をクリックする

スクリーンショット 0030-01-02 18.16.45.png

“Next”をクリックする

スクリーンショット 0030-01-02 18.16.59.png

画面下の部分に”Job bookmark”がありDisable、Enable、Pauseの3つの状態が選べる
これを選びNextと進めればよさそうだが、それだけだとダメである

スクリーンショット 0030-01-02 18.17.16.png

画面下部にあるKeyとValueの箇所の”job-bookmark-enable”を事前に消しておく必要がある。右側の✖をクリックすることで消せます。消した後に”Job bookmark”をDisableにして”Next”をクリックし、次にサマリが出るので問題なければ”Finish”をクリックする

スクリーンショット 0030-01-02 18.17.39.png

確認すると”–job-bookmark-option:job-bookmark-disable”になっていて無効に変わったことがわかる

スクリーンショット 0030-01-02 18.33.18.png

(ブックマーク有効にしてジョブ実行してもメタデータが更新された件の考察)

結果的にはparquetの仕様っぽいので気にする必要もなさそうです。

メタを詳細に見るためにparquet-toolsを入れます。
このツール相変わらず普通にビルドできない・・

ここは弊社の担々麺デカの力を借りて無事入りました。(+1タンタンメン)
http://d.hatena.ne.jp/yohei-a/20170629/1498710035

とは言え改善してるんじゃと思い、ちょっと脱線しますが

【parquet-toolsインストール2018年版】

github
また(?)リポジトリ変わってます・・
https://github.com/apache/parquet-mr

最新のv1.8.0、v1.8.1はpom.xmlをゴニョゴニョしないとダメだ
Issueは多分こちら
https://issues.apache.org/jira/browse/PARQUET-1129

v1.7はなぜかなく、結果的にv1.6系最後(?)の1.6.0rc7だとすんなりビルドできました。

jdkインストール

yum -y install java-1.7.0-openjdk-devel

mavenインストール

wget http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
tar xvfz apache-maven-3.3.9-bin.tar.gz 
./apache-maven-3.3.9/bin/mvn -version

.bash_profileとかでパス通す

PATH=$PATH:$HOME/bin:/root/apache-maven-3.3.9/bin

parquet-toolsインストール

git clone https://github.com/apache/parquet-mr.git
cd parquet-mr
git checkout ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7
cd parquet-tools/
mvn clean package -Plocal 

実行

aws s3 cp s3://test-glue00/se2/out0/ .data/ --recursive

java -jar target/parquet-tools-1.6.0rc7.jar head -n 1 data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
deviceid = iphone
uuid = 11111
appid = 1
country = JP
year = 2017
month = 12
day = 14
hour = 12

parquet-toolsでメタ情報も見れます。
ブックマークを使ったのにメタデータは更新があったので、更新前と後のメタの情報を比較してみます。

メタ情報表示

# java -jar target/parquet-tools-1.6.0rc7.jar meta data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
file:        file:/root/parquet/parquet-mr/parquet-tools/data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"deviceid","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"long","nullable":true,"metadata":{}},{"name":"appid","type":"long","nullable":true,"metadata":{}},{"name":"country","type":"string","nullable":true,"metadata":{}},{"name":"year","type":"long","nullable":true,"metadata":{}},{"name":"month","type":"long","nullable":true,"metadata":{}},{"name":"day","type":"long","nullable":true,"metadata":{}},{"name":"hour","type":"long","nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
deviceid:    OPTIONAL BINARY O:UTF8 R:0 D:1
uuid:        OPTIONAL INT64 R:0 D:1
appid:       OPTIONAL INT64 R:0 D:1
country:     OPTIONAL BINARY O:UTF8 R:0 D:1
year:        OPTIONAL INT64 R:0 D:1
month:       OPTIONAL INT64 R:0 D:1
day:         OPTIONAL INT64 R:0 D:1
hour:        OPTIONAL INT64 R:0 D:1

row group 1: RC:19 TS:952 OFFSET:4 
--------------------------------------------------------------------------------
deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19 ENC:PLAIN,BIT_PACKED,RLE
appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE

更新前のメタ情報:/tmp/data1/meta.log
更新後のメタ情報:/tmp/data2/meta.log
diffは見切れちゃってますが、差分は以下の並びが違うということでした。内容は同じです。
ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
もう1回ジョブを実行した場合、上記の並びは同じでしたがタイムスタンプは最新になってました。
そういうもののようです;

# sdiff -s /tmp/data1/meta.log /tmp/data2/meta.log 
file:        file:/root/parquet/parquet-mr/parquet-tools/data | file:        file:/root/parquet/parquet-mr/parquet-tools/data
deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 E | deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 E
uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19 | uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19
appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19  | appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19 
country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19  | country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19 
year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 E | year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 E
month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 E | month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 E
day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19 | day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19
hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19 | hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19

その他

  • Glueで自動生成されるPySparkコードに以下のようなコンテキストオブジェクトがあります。これにはブックマークとしての意味もあり、それぞれの実行時にソース、変換、およびシンクの状態をブックマークの状態のキーとして使用します。状態はタイムスタンプとして記録し維持します。
    なのでブックマークを使用していない場合は、この変数を指定しなくても問題はありません。
    transformation_ctx = “datasource0”
    transformation_ctx = “applymapping1”
    transformation_ctx = “datasink4”

  • S3の結果整合性への対処
    ジョブ開始前に、以前のデータと不整合があるデータをジョブの対象とする(整合なデータは除外リストとして維持する)
    状態としてサイズも持っているということかもしれません。

  • 例えばあるファイルは処理対象か対象ではないのか?と言った詳細なブックマークの状態を見ることは現在はできません。

To Be Continue

todo

参考

Bookmarkの公式ページ
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-continuations.html

re:Invent資料。34ページあたりにBookMarkの細かい情報あり
https://www.slideshare.net/AmazonWebServices/abd315serverless-etl-with-aws-glue

parquet-toolsインストール
http://d.hatena.ne.jp/yohei-a/20170629/1498710035

parquet-tools
https://github.com/apache/parquet-mr

続きを読む

Glueの使い方的な②(csvデータをパーティション分割したparquetに変換)

パーティション分割するジョブを作る

ジョブの内容

※”Glueの使い方的な①(GUIでジョブ実行)“(以後①とだけ書きます)と同様のcsvデータを使います

“csvデータのタイムスタンプのカラムごとにパーティション分割してparquetで出力する”

ジョブ名

se2_job1

クローラー名

se2_in0
se2_out1

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

Glueジョブの入力データは”in0″ディレクトリ配下、出力は”out1″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

※①で作ったものを使います。

テーブルの情報は以下です。

スクリーンショット 0030-01-03 15.45.21.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job1ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”です。

se2_job1
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job1_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

### Add(start)
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
#dropnullfields3 = DynamicFrame.fromDF(df, glueContext, "dropnullfields3")
### Add(end)

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
###
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-03 16.25.57.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/
2018-01-03 16:25:49        926 part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet . 
download: s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet to ./part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head  part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out1でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-03 16.37.26.png

S3の出力パスを入力

スクリーンショット 0030-01-03 16.37.50.png

そのまま”Next”をクリック

スクリーンショット 0030-01-03 16.38.46.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-03 16.38.55.png

そのまま”Next”をクリック

スクリーンショット 0030-01-03 16.39.03.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-03 16.39.16.png

クローラー実行

形式の違うデータが混在しているとテーブルが複数できてしまう

スクリーンショット 0030-01-03 16.41.46.png

クローラーを作り直し、”Add a data store”の”Exclude patterns”の箇所で
不要なものがあれば除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-05 20.18.38.png

クローラー実行後
1つのテーブルとして認識される

スクリーンショット 0030-01-03 16.48.52.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-03 16.49.09.png

Athenaから確認

左メニューからse2_out1のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-05 20.39.54.png

件数も19件で合っている

スクリーンショット 0030-01-03 16.51.35.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばcountryというカラムがあるので、国ごとに集計をするようなケースが多いならcountryも含めてパーティション分割する

他にもappidとかでアプリごとに集計したり
一時的な調査にも役立つかも

出力に”out2″ディレクトリ作成
①と同様のジョブをse2_job3で作成
作成したジョブのPySparkスクリプトに以下2点修正

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['country','year','month','day','hour']
output='s3://test-glue00/se2/out2/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

countryごとにパーティション別れてます

スクリーンショット 0030-01-03 17.33.24.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-03 17.36.02.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-03 17.37.19.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-03 17.39.12.png

件数も同じく19件

スクリーンショット 0030-01-03 17.38.57.png

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/preview/api/python/pyspark.sql.html
https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

続きを読む

Glueの使い方的な①(GUIでジョブ実行)

GUIによる操作でGlueジョブを作って実行する

ジョブの内容

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

ジョブ名

se2_job0

クローラー名

se2_in0
se2_out0

全体の流れ

  • 前準備
  • クローラー作成と実行、Athenaで確認
  • ジョブの作成と実行、Athenaで確認
  • 出来上がったPySparkスクリプト確認

前準備

ジョブで使うIAM role

以下のポリシーを付与した任意の名前のロールを作っておく。今回はtest-glueという名前にした。
・AmazonS3FullAccess
・AWSGlueServiceRole
※権限は必要に応じてより厳しくしてください。今回は検証のため緩めにしてあります。

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

S3に配置

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

ディレクトリ構成

in0に入力ファイル、out0に出力ファイル

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE script/
                           PRE tmp/

クローラー作成と実行、Athenaで確認

ジョブ作成の前に、こちらもGlueの重要な機能の1つであるData CatalogのCrawlerを作成して動かしていきます。
クローラはデータをスキャン、分類、スキーマ情報を自動認識し、そのメタデータをデータカタログに保存する機能です。これを使って入力となるサンプルデータのスキーマを事前に抽出してデータカタログにメタデータとして保存します。Athena、EMR、RedshiftからもこのGlueのデータカタログを使えます。Hive互換のメタストアです(Hiveメタストアのマネージドサービスと思ってもらえればいいと思います)。

基本的な操作はGUIを使って行えます。

AWSマネージメントコンソールから、Glueをクリックし、画面左側メニューの”Crawlers”をクリックし、”Add crawler”をクリック
スクリーンショット 0029-12-28 13.24.04.png

クローラーの名前入力
スクリーンショット 0029-12-28 11.19.32.png

S3にあるソースデータのパス入力(今回はS3に配置してあるデータが対象)
スクリーンショット 0030-01-02 15.17.11.png

そのまま”Next”
スクリーンショット 0029-12-28 9.38.41.png

“Choose an existing IAM role”にチェックを入れ、IAM roleをプルダウンからtest-glueを選択する
スクリーンショット 0030-01-02 15.17.49.png

“Run on demand”にチェックを入れ”Next”(今回のクローラーはスケジュールせずに手動実行とする)
スクリーンショット 0029-12-28 9.39.03.png

スキーマ情報を保存するDatabaseを選択、既存のものがあればそれでもいいし、なければ下の”Add database”でdatabase作成しそれを選択
Prefixは作成されるテーブル名の先頭に付くもの。見分け分類しやすいものにしておくと良いと思います(現状テーブルにtagとかつけられないので)
スクリーンショット 0029-12-28 9.39.23.png

クローラー実行

対象のクローラーにチェックを入れ、”Run Crawler”をクリック
スクリーンショット 0030-01-01 17.02.58.png

テーブルが出来上がる

“se2_”のPrefixが付いた”se2_in0″のテーブルができています。”in0″は指定した”Include Path”の一番下のディレクトリ名です。指定した”Include Path”(ソースデータがあるS3のパス、画像の”Location”の部分に表示されている)の配下のディレクトリは自動でパーティションとして認識されます。今回は配下にディレクトリはありませんのでパーティションも作成されません。
スクリーンショット 0030-01-02 15.21.15.png

テーブルの内容を確認するとスキーマが自動で作成されています
実データ配置場所のLocationがs3://test-glue00/se2/in0
Table名のNameが、prefixのse2_とInclude Pathで指定したs3://test-glue00/se2/in0の一番下のディレクトリのin0でse2_in0

Schemaを見るとuuidやappidなどがbigintで数値型になってます、文字列型がよければここでも修正できます。
今回は一旦このまま進めます
※本来はClassifierでいい感じにしたほうがいいと思う

スクリーンショット 0030-01-02 15.23.51.png

AthenaからもGlueのData Catalog使えます
Athenaから同様のテーブルとスキーマの内容が確認できます。Athenaがスキーマ情報にGlueのデータカタログを使ってることがわかります。画面右上にもGlue Data Catalogへのショートカットもありますね。
スクリーンショット 0030-01-02 15.27.07.png

もちろんクエリ実行もできます。
スクリーンショット 0030-01-02 15.29.31.png

ジョブの作成と実行、Athenaで確認

今回のようなジョブであれば、基本は画面ポチポチするだけです

Glueのメニューから”ETL”の”Job”をクリックし、”Add job”をクリック
スクリーンショット 0030-01-01 17.36.49.png

“Name”にジョブ名を入れ、”IAM role”はクローラーでも使ったロールを指定、”Temporary directory”は任意の場所で構いません。
スクリーンショット 0030-01-02 15.31.34.png

ソースデータとなるテーブルを選択。(先程作成したテーブルをクリック)
スクリーンショット 0030-01-02 15.33.35.png

ターゲットとなるテーブルは作成していないのでここで作ります。
今回は、保存先のData storeは”S3″、出力ファイルフォーマットのFormatは”Parquet”、出力先のパスのTargetPathは”任意の場所”を指定
※圧縮も選べます
スクリーンショット 0030-01-02 15.34.41.png

ソースデータとターゲットデータのマッピング変換ができます。いらないカラムを出力から除外したり、カラムの順序を変えたり、”Data Type”をstring,long,intなどに変えたり、新しいカラムを追加してそこに出力させる(カラム名を変える時に使えそうです)などができます
スクリーンショット 0029-12-28 10.08.35.png

次にサマリがでますので問題なければ”Finish”をクリック

ジョブの実行

作成したジョブにチェックを入れ、”Action”から”Run job”をクリック
スクリーンショット 0029-12-28 10.27.38.png

数分待って以下のように”Run status”が”Succeeded”となれば問題なく完了しています。
※問題があれば、”Error”の箇所にエラーの概要、”Logs”、”Error logs”の箇所の出力がリンクになっていてクリックするとCloudWatch logsに移動します。GlueのログはデフォルトでCloudWatch logsに出力されます
スクリーンショット 0030-01-02 15.46.47.png

出力したParquetフォーマットのファイルを、ソースデータと同様にクローラーを使ってスキーマを作り、スキーマオンリードでAthenaクエリを実行してみます。クローラー作成手順は前回と同様なので割愛します。
クローラーにより自動作成されたスキーマ
スクリーンショット 0030-01-02 15.49.41.png

クエリ結果です。データ量が少なくselectしてるだけなのでアレですが、parquetになったので列単位での集計処理などが高速化されるデータフォーマットに変換ができました。
スクリーンショット 0030-01-02 15.51.00.png

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”くらいであればGlueはGUIだけでサーバーレスでできます。

出来上がったPySparkスクリプト確認

se2_job0.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ビルトイン変換のいくつか補足

ApplyMapping:ソースの列とデータ型をターゲットの列とデータ型にマップします

ResolveChoice:複数の型の値が含まれている場合の列の処理方法を指定します。列を単一のデータ型にキャストするか、1つ以上の型を破棄するか、またはすべての型を別々の列または構造体に保持するかを選択できます
make_structは構造体を使用してデータを表現することにより、潜在的なあいまいさを解決します。たとえば、列のデータがintまたはstringの場合、make_structアクションを使用すると、生成されたDynamicFrameにintとstringの両方を含む構造体の列が生成されます。
choiceはspecが空の場合のデフォルトのresolution actionです

DropNullFields:nullフィールドを削除します

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

ジョブを作る際に似たようなジョブを作りたい、テスト段階でジョブのパラメータを一部だけ変えたジョブを作りたいことあると思います。現在はジョブのコピーがGUIからはできないのでそのあたりの運用を考慮する場合はCLIの利用がおすすめです。またあとで書きます。

こちらも是非

似た内容ですがより丁寧にかかれています。安定のクラメソブログ
https://dev.classmethod.jp/cloud/aws/aws-glue-released/
https://dev.classmethod.jp/cloud/aws/aws-glue-tutorial/

Built-In Transforms
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/built-in-transforms.html
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-ApplyMapping.html

続きを読む

ポテト工場から温泉旅館まで! “AI祭り”の2017年「事例記事」ベスト10

このほか、トレンドであるAIに関する記事が上位を占める一方で、4位にはハイパワー環境でのAWS活用、7位にはiPhone導入、9位にはセルフサービスBIなど、バラエティに富んだランキングとなりました。 続きを読む

iPhone と AWS だけで作るマイクロサービス – Qiita

先日参加したJAWSDAYSでもいろいろ紹介されていたLambda。 社内勉強会で1時間半程度のハンズオンネタになったので、これから勉強する方に活用いただければと思って投稿します AWS Lambdaとは 実行されるインフラを意識せずにアプリケーションだけ実行できるサービスです。 Developers Guide によると、 AWS … 続きを読む

コストレスアーキテクチャでLINE Botを作ろうとして苦戦した話

この記事は「ボット (Bot) Advent Calendar 2017」 の22日目の記事です。

今月、とあるイベントのサブ企画として、
「イベント用LINEアカウントに写真を投稿すると記念フレームが付いて返ってくる!」
というLINE Botを作って公開しました。

主に設計面で困ったことを中心に書こうと思います。

コストレスアーキテクチャとは!!!!

  • さっき考えた造語です
  • サーバレスアーキテクチャとほぼ同義です
  • 料金がかかりそうなものは極力使わないという一種の縛りプレイみたいなものです

経緯とか方針とか考えとか技術選定とか

  • 何か面白い企画できない?とイベント主催者に相談されたのがきっかけ
  • 2~300人の集客が想定された
  • 若者ばかりだったので、LINEならみんな使ってくれそうだと思った
  • ほぼ身内だし、最悪コケてもしゃあないくらいの軽い気持ち
  • なんか写真とか使って記念になりそうなことがしたい
  • 実装よりも企画やフレームのデザイン作りに時間をかけたい
  • 単発だしとりあえず動かせればおk
  • 単発だしデプロイとかも泥臭くていい
  • ケチだからお金は極力かけたくない(重要)

→AWS Lambda + node.jsで画像加工のLINEBotを作ろう。
動くものを安くサクッと仕上げよう。

構成について

当初、以下のような単純なものを想定していました。
やることは投稿画像をトリミングしてからフレームを合成するだけなので、
APIGateWayから実行したLambda内で処理を完結させ、
Messaging APIのReplyAPIで返信するつもりでした。

※Messaging APIはLINEが提供するAPIで、無料のDeveloperプランがあります
https://developers.line.me/ja/services/messaging-api/

Untitled Diagram (2).png

さぁ作るかと思ったのも束の間、いくつか課題が浮き彫りになりました。

  • Replyに必要なReplyTokenは数秒(?)で切れてしまう
  • Pushで画像を返す場合はURL指定が必須
  • Messaging APIのDeveloper Trialプランは友達50人まで

Replyに必要なReplyTokenは数秒(?)で切れてしまう

MessagingAPIには、
ユーザの投稿に返信するReplyAPIと
Bot側から投稿できるPushAPIがあります。

1つ目の問題は、Replyに必要なReplyTokenの切れる時間です。
何秒で切れるのかいまいち情報が無かったのですが、
LINEへの投稿で発行されたReplyTokenを確認して急いでコピってcurlを投げてもエラーになってしまうレベルでした。
文字の返信くらいしかできないのかなーという印象だったので、画像加工をする時間があるか不安でした。

Pushで返す画像はURL指定が必須

ならば上記の構成のままPushAPIで返せばいいじゃん!と思ったのですが、
PushAPIで画像を返す場合は、画像のパスをURLで指定しなければいけませんでした。

ユーザがLINEに投稿した画像は、画像取得APIを通じて取得できますが、
そのままフレーム合成してそのままReplyAPIで返すことができなかったため、
一度どこかのストレージに保存する必要がありました。
このため、functionをいくつかのstepに分けることになり、

①画像を取得してS3に保存
②S3の画像をトリミング、フレーム合成をしてもう一度保存
③S3の画像をPushAPIで返信
と、3つに分けました。

※①②は分けなくても良いのですが、1機能で1つとしました。

Untitled Diagram (1).png

PushAPIにはUserIDが必要ですが、投稿と非同期になるため返信先をどう特定するかが問題です。
しっかりやるなら、画像URLとUserIdをDBにでも持たせて、キューなどで順番にPushするのが綺麗だと思うのですが、
極力金をかけないという謎の縛りプレイを始めた私は、
画像の保存パスを

バケット名/UserId/YYYYMMDDhhmmss.png

とすることでDBは使わない方向でいきました。
後続のLambdaはS3へのPUTをトリガーに動くので、
画像のパスからUserIdを抜き取ってReplyします。

Messaging APIのDeveloperプランは友達50人まで

これが一番問題でした。
Developerプランの範疇を超えて使おうとしている自分がいけないのですが、
今回想定されるお客さんは2~300人なので、全然足りません(泣)

有料プランもありますが、目指すはコストレスです。
しかもPushAPIはDeveloperプランかプロプラン(¥32,400/月)でしか使えないのです。世の中世知辛い。

というわけで、苦肉の策でアカウントを6つ用意しました。

Untitled Diagram.png

展開用のURL(QRコード)はAPIGateWayのエンドポイントになっており、
ランダムに1つのLINEアカウントへのリンクを返します。

素晴らしいです。ロードバランサー的な役割によって、
友達リクエストを負荷分散をさせることができました(謎)

かなりごり押しですが、ここも妥協点です。

同じ人が複数回URLにアクセスしてくることも想定できましたが、
身内イベントだしまぁいっか、という感じです。

金がかからないのはいいが、デプロイがつらい

Botアカウント6つをポチポチ作りながら、ふと思いました。

    これはデプロイで死ねる 、と。

「単発なのでデプロイは泥臭くてもいい」とは言ったものの6つはあまりに面倒です。

MessagingAPIを使うためには、
LINEアカウント毎のCHANNEL_ACCESS_TOKENが必要になります。
これをLambdaの環境変数に設定する必要があるので、
裏側は処理は全て同じLambda、というわけにはいきませんでした。

もちろん、UserIdと共にCredential情報もDBに、と行きたいところですが、
目指すはコストレスです。

少しでもデプロイが楽になればとCloudFormation(AWS SAM)を使うことにしました。

Untitled Diagram (3).png

yaml形式のテンプレートによって、AWSの構成を立ち上げることができます。
多少面倒さは残りますが、立ち上げ時に環境変数にCredential情報を設定すればいいような形でつくりました。

あまり詳しくないのですが、Serverless Frameworkとかだと環境変数をファイルで外出しして、
コマンドの引数で与えるだけで各環境が作れたりするっぽいですね。すごい。

とりあえず、これで幾分かはスマートにいったと思います。

Lambdaの実装

ここは設計ではないので、割愛します。
Node.jsとjimpという画像加工ライブラリを使いました。
Node.jsを書くのは初めてだったので非同期処理やPromiseに翻弄されしんどかったです。
jimpはシンプルで分かりやすかったのでオススメです。
https://github.com/oliver-moran/jimp

イベント当日に公開。しかし…

無事、当日を迎えたのですが、
android端末の方から画像を保存できないという報告が。
OSのバージョンや、エラーメッセージのスクショをもらいそびれたのですが、
「LINEのサーバーから送られた画像じゃない!怪しいからダウンロードしないよ!」というようなエラーでした。

タイムライン上にはフレーム付きのサムネイルが返ってきているのですが、
保存しようとするとエラーが出るようでした。。くやしい。

動作確認は数人にお願いしていたのですが、全員iPhoneだったみたいです。。
LINEBotと言えども、アプリ上で使うものだということを全く考えてなかったです。。

 まとめ

  • なんとか無理やりコストレスで作れました。Lambdaは無料枠範囲内です。
  • 厳密にはS3とAPIGateWayが若干かかってますが、11,12月の請求見ても分からないレベルです。
  • コストレスアーキテクチャって言葉は流行らなくていいです。
  • お金はケチらないほうがいいな、と思いました。

続きを読む

Alexaの開発初心者が見ると幸せになれる場所

今年は、LINE Wave、Google home、Amazon Echoなど、音声認識デバイス元年と言っていいくらいのインパクトのある年になりました。

自分もいろいろやりたくなって、先日のre:Invent 2017ではAlexaのハッカソンに参加してきました。世界中がAlexaに注目しているのが肌感覚でわかって、よい刺激になりました:grinning:

日本でもAlexaが盛り上がりつつあり、最近日本語の情報も増えてきましたので、
ココらへんで一旦、開発するにあたって役に立つ情報をまとめておこうと思います。

入門

本家サイト

Alexaの開発者ポータル。ここから、ダッシュボードに進み、Alexaのスキル開発を行います。
https://developer.amazon.com/ja/alexa

端末管理

Alexa端末にスキルを追加したり、端末の設定を変更できます。
それぞれ、amazon.com と amazon.co.jp はアカウントが別扱いになるので注意。日本で利用する場合、amazon.co.jpを見てればいいと思います。iPhoneアプリとかはalexa.amazon.co.jpと同じ内容です。
https://alexa.amazon.co.jp
https://alexa.amazon.com

おすすめセッション

AWS re:invent 2017: It’s All in the Data: The Machine Learning Behind Alexa’s AI Sys (ALX319)

今回のre:Inventでのセッション。基本概念の理解に役立ちます。Echoなどの端末側とAlexa側での処理の役割分担や、処理フロー等など非常にわかりやすく説明されていたり、slotやutteranceなど慣れない用語を抑えるのにも最適です。

slideshare
https://www.slideshare.net/AmazonWebServices/alx319its-all-in-the-data
Youtube
https://youtu.be/7ZmdYFgbQ6w

個人的には、slideshare より Youtube のほうが流れが理解しやすいと思います。

開発

Hello world

Alexa skill kitを使い、スキル開発の流れが理解できます。初めにやってイメージを掴んでおくとよいでしょう。
https://developer.amazon.com/ja/docs/ask-overviews/build-skills-with-the-alexa-skills-kit.html

Github

alexa本家。ここで、実際のサンプルをたくさんみることができます。
https://github.com/alexa/

スターが多いのはAlexa Voice serviceのサンプルコード。
https://github.com/alexa/alexa-avs-sample-app
個人的には、alexa-cookbookがおすすめです。
https://github.com/alexa/alexa-cookbook

echosim

Alexaは実機がなくてもechosimでテストできます。
日本語のスキルにも対応済です。自分の開発者アカウントでログインし、スキルのテストをオンにすれば実機がなくても確認できます。また、ここでデバイス側の送受信データも確認できます。
https://echosim.io/

ask-cli

ASKコマンドラインインターフェース(ask-cli)の使い方。
インストールから手順を追って説明してあります。askコマンドを使うと、alexa側の設定と、lambda側を一つのリポジトリで開発できて、Alexa側とlambda側にまとめてデプロイできるので、おすすめ。開発者ポータルと、AWSを行ったり来たりしないでもよくなります。
ただし、現時点で、regionが ap-northeast-1 に非対応っぽいです。us-east-1にlambdaが作られるので注意です。
https://developer.amazon.com/ja/docs/smapi/ask-cli-command-reference.html

alexa-sdk

Node.jsで使うAlexa skill kit のSDK。
DynamoDBに簡単にデータを永続化したり、SSMLでwrapしたりしてくれる。Node.jsのlambdaで実装するなら、ほぼ必須
ask-cliを使う場合は、作成したプロジェクトに含まれるlambdaのpackage.jsonにすでに書かれているので、インストールする必要はありません。
https://github.com/alexa/alexa-skills-kit-sdk-for-nodejs

ralyxa

同僚に教えてもらった、Alexa とやり取りするときの Ruby製フレームワーク。Alexaはバックエンドの処理にhttpサーバーも選べるので、rubyで書きたいひとにおすすめ。シナトラで動く。
https://github.com/sjmog/ralyxa
こっちがサンプル。
https://github.com/sjmog/ralyxa_example

flask-ask

Alexa Skills Kit for Python。 Flaskの拡張機能。処理側をpythonで書きたい人におすすめ。
https://github.com/johnwheeler/flask-ask

alexa-skills-kit-java

JavaのAlexa Skills Kit SDK。
https://github.com/amzn/alexa-skills-kit-java
Javaのサンプルはこの辺です。
https://github.com/alexa/skill-samples-java

音声合成のマークアップ(SSML)

Alexaでは、SSMLを使って豊かな音声表現を実現できます。

SSMLの仕様
https://www.w3.org/TR/2010/REC-speech-synthesis11-20100907/
SSMLの仕様(和訳) 
http://www.asahi-net.or.jp/~ax2s-kmtn/ref/accessibility/REC-speech-synthesis11-20100907.html

Alexaで使えるSSMLタグ一覧
https://developer.amazon.com/ja/docs/custom-skills/speech-synthesis-markup-language-ssml-reference.html#ssml-supported

リファレンス

Alexa開発者ブログ
https://developer.amazon.com/ja/blogs/alexa/
Alexa開発者ブログ(日本タグ)
https://developer.amazon.com/ja/blogs/alexa/tag/japan

クラスメソッドさんのAlexa系の記事
いつも大変お世話になっております。
https://dev.classmethod.jp/referencecat/voice-assistant-amazon-alexa/

まとめ

日本では、まだEchoなどの端末がすぐに手に入る状態ではない(招待メールをリクエストしないと、買うことすらできない)ですが、今後は徐々に手に入りやすくなると思います。

実機がなくてもechosimを使えばスキルの開発はできるので先に触って、自分のアイデアを形にしておくのがいいと思います。

何か作ってみたいと思った人が迷わずに開発できるよう本記事が役立てば幸いです:slight_smile:

続きを読む