ブラウザからAndroidアプリを動かすGenymotion AWSの紹介

紹介 AWS · iPhone・Androidのオススメアプリ紹介! Androidアプリ「サイコロ3D」1.07の紹介動画 · 番組紹介 – NHK 週刊 ニュース深読み · 「転職ナビ」Androidアプリ紹介動画 · このタグ見た人は2文字で自己紹介する · 1枚目:大人数での飲み会(持ち込み有)全員の自己紹介で2時間ですよ!!尺犠牲。 検索対象: 検索 … 続きを読む

雲勉:大阪【勉強会】AWSとDatalakeとDatawareHouse

概要. AWSでDatalakeを作成する(したい)っていう話を良く聞きます. Datalakeを構築したは良いものの、それをどう活用するかの話はなかなか耳にしません. AWSで構築したDatalakeの活用方法の1例を紹介します. お願い. 資料の写真撮影はOKですが、撮影音が気になる方もおられますので iPhone Microsoft Pix カメラ( … 続きを読む

カテゴリー 未分類 | タグ

iPhoneで使える!OCRアプリ(文字認識)

お客様との打ち合わせの中で、ある文章をホームページに埋め込みたいという要望がありました。 その資料を見ると、なんとも膨大な量の文字。 これは、入力するのに、かなりの時間がかかるな~と思いつつ、もしかしたら、iPhoneで写真を撮って、それを文字認識してテキストデータにしてくるアプリがあるんじゃないかと、探して … 続きを読む

カテゴリー 未分類 | タグ

AWSとAzureとGCPを比較してみる – FaaS編

FaaSについてAWSとAzureとGCPを比較してみました。

注)

1. FaaS比較表

AWS Azure GCP
Lambda Functions Cloud Functions***
言語 Python,
node.js,
java,
C#,
go
ランタイムバージョン1.X
C#,
JavaScript,
F#,
Python*,
PHP*,
TypeScript*,
バッチ (.cmd、.bat)*,
Bash*,
PowerShell*

ランタイムバージョン2.X
C#**,
JavaScript**,
Java**
node.js***
最大実行時間 5分 10分 (従量課金プラン)
無制限 (App Serviceプラン)
9分
直接HTTPアクセスを受け付けるか 受け付けない(API Gatewayと連携必要) 受け付ける 受け付ける
トリガー Amazon S3,
Amazon DynamoDB,
Amazon Kinesis Data Streams,
Amazon Simple Notification Service,
Amazon Simple Email Service,
Amazon Cognito,
AWS CloudFormation,
Amazon CloudWatch Logs,
Amazon CloudWatch Events,
AWS CodeCommit,
スケジュールされたイベント (Amazon CloudWatch Events を使用),
AWS Config,
Amazon Alexa,
Amazon Lex,
Amazon API Gateway,
AWS IoT ボタン,
Amazon CloudFront,
Amazon Kinesis Data
Blob Storage,
Cosmos DB,
Event Hubs,
HTTP,
Microsoft Graph Events(2.Xのみ),
Queue storage,
Service Bus,
Timer,
Webhooks(1.Xのみ)
HTTP,
Cloud Storage,
Cloud Pub/Sub

*試験段階
**プレビュー
***ベータ

2. 対応言語の比較

言語の種類は試験段階とプレビューを含めればAzureが一番多いのですが、正式リリースされたものに限定すればAWSの方が種類が多いです。
一方GCPは機能自体がベータリリースなので、まだこれからといった感じでしょうか。

AzureはBashにも対応しているのが特徴です。運用系のシェルスクリプトをFaaS化すれば、スクリプト用のサーバが不要になりますね。

3. 最大実行時間

最大実行時間はAzureの10分(要host.jsonのfunctionTimeoutプロパティ変更)、GCPの9分に対しAWS Lamdbaは5分と約半分です。実際にAWS Lambdaを利用していると5分の壁を結構感じます。この点は他クラウドが羨ましいですね。
2017年のRe:InventでAWSはFargateというコンテナのサービスをリリースしましたが、このサービスがlambdaが5分以上実行できないことに対するAWSからの回答のように感じます。

4. 直接HTTPアクセスを受け付けるか

AWS lambdaだけ直接HTTPアクセスを受け付けることができません。HTTPアクセスを受け付けるには、API Gatewayと連携する必要がありますが、多機能な分やや設定が面倒な印象です。(但しAPI経由でLambdaを起動することは可能)

まとめ

AWS Lambdaのリリース後、Azure・GCP・Bluemix(現IBM Cloud)は超特急で追従しました。AWS LambdaがIT業界に与えたインパクトはとても大きかったと思います。
現在は「FaaS無ければばクラウドにあらず」といったところでしょうか。

また、AWS GreengrassやAzure IoT Edge**というエッジにデプロイするサービスも出てきています。
将来AWS LambdaがiPhoneやApple Watchにデプロイできるようにならないかなーと妄想中です。

**プレビュー

続きを読む

Glueの使い方的な⑦(Step Functionsでジョブフロー)

Step FunctionsでGlueのジョブフローを作る

Glueの使い方的な③(CLIでジョブ作成)“(以後③と書きます)で書いたように、現在Glueのジョブスケジュール機能は簡易的なものなので、複雑なジョブフロー形成には別のスケジューラーが必要になる場合もあります。
例えばGlueのクローラーとGlueジョブもそれぞれにスケジュール機能があり統合したジョブフローを作ることがGlueだけでは出来ません(例えばクローラーを実行し終わったらジョブを実行するとか)。今回はサーバーレスなジョブフローのサービスであるStep Functionsを使って、クローラーを実行し正常終了したら後続のジョブを実行するというフローを作ってみます。

全体の流れ

  • Glue処理内容
  • StepFunctionsの処理内容
  • 前準備
  • Step FunctionsでStateMachine作成
  • 実行

処理内容

Glueの使い方的な①(GUIでジョブ実行)“(以後①と書きます)で実行したものと同じクローラーとジョブを使います。入力データも出力結果も①と同じです。
今回行うのはGlueクローラー処理が終わったら次のGlueジョブ処理開始というジョブフロー形成です。

あらためて①のクローラーとジョブの処理内容は以下の通りです

クローラーの内容

入力のCSVファイルからスキーマを作成します

ジョブの内容

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

Step Functionsを使ったジョブフローの内容

図の四角をStep Functionsでは”State”と呼びます。処理の1単位と思ってください。

ジョブフローは以下のような形です。

Stateごとに流れを説明します

  • “Submit Crawler Job”でLambdaを使いGlueクローラーを実行
  • “Wait X Seconds”で指定時間待つ
  • “Get Crawler Job Status”でLambdaを使いGlueクローラーの状態をポーリングして確認
  • “Job Complete?”で状態を判定して結果によって3つに処理が分岐
    • 失敗なら”Job Failed”エラー処理
    • 終了なら”Run Final Glue Job”でLambdaを使い後続のGlueジョブを実行
    • 処理中なら”Add Count”でLambdaを使いカウンタをインクリメント。
      • “Add Count”の後”Chk Count”でカウンタをチェックし3回以上になっていたら”Job Failed Timeout”でタイムアウト処理、3未満なら”Wait X Seconds”に戻りループ処理

スクリーンショット 0030-01-13 21.47.05.png

前準備

①と同じです

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

入力ファイルをS3に配置

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

ディレクトリ構成

in0に入力ファイル、out0に出力ファイル

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE script/
                           PRE tmp/

ジョブのPySparkスクリプト

se2_job0.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

入力のCSVデータのスキーマ

クローラーによって作成されるスキーマ

スクリーンショット 0030-01-13 22.01.43.png

StepFunctionsでStateMachine作成

StepFunctionsは一連のジョブフローをJSONで定義しこれを”StateMachine”と呼びます。
StateMachine内の処理の1つ1つの四角をStateと呼びます。処理の1単位です。
このJSONの記述はASL(AmazonStatesLanguages)と呼ばれStateTypeとしてChoice(分岐処理)やWait(待ち)やParallel(並列実行)などがJSONだけで表現出来ます。またTaskというStateTypeからはLambdaやアクティビティ(EC2からStepFunctionsをポーリングする)を定義できます。前述の通り今回はLmabdaを使います。

マネージメントコンソールからいくつかあるテンプレートを元に作ることも出来ますが、カスタムでJSONを一から作ることもできます。

新規StateMachine作成画面
“Author from scrach”で一からJSON作成

スクリーンショット 0030-01-14 10.24.59.png

“Template”を選ぶとASLのStateパターンのいくつかのテンプレが選べます

スクリーンショット 0030-01-14 10.28.14.png

左側の”コード”部分にJSONを書き、右側の”ビジュアルワークフロー”の部分にJSONコードで書いたフローがビジュアライズされます

スクリーンショット 0030-01-14 10.29.50.png

StateMachine

今回のStateMachieのJSONは以下です。
内容は前述の通りです。
※[AWSID]のところは自身のAWSIDと置き換えてください

{
  "Comment": "A state machine that submits a Job to Glue Batch and monitors the Job until it completes.",
  "StartAt": "Submit Crawler Job",
  "States": {
    "Submit Crawler Job": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-cr1",
      "ResultPath": "$.chkcount",
      "Next": "Wait X Seconds",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 120,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
    "Wait X Seconds": {
      "Type": "Wait",
      "SecondsPath": "$.wait_time",
      "Next": "Get Crawler Job Status"
    },
    "Get Crawler Job Status": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-crcheck",
      "Next": "Job Complete?",
      "InputPath": "$",
      "ResultPath": "$.response",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
      "Job Complete?": {
      "Type": "Choice",
      "Choices": [{
          "Variable": "$.response",
          "StringEquals": "FAILED",
          "Next": "Job Failed"
        },
        {
          "Variable": "$.response",
          "StringEquals": "READY",
          "Next": "Run Final Glue Job"
        }
      ],
      "Default": "Add Count"
        },
    "Add Count": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-addcount",
      "Next": "Chk Count",
      "InputPath": "$",
      "ResultPath": "$.chkcount",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
      "Chk Count": {
      "Type": "Choice",
      "Choices": [{
          "Variable": "$.chkcount",
          "NumericGreaterThan": 3,
          "Next": "Job Failed Timeout"
        }],
      "Default": "Wait X Seconds"
    },
    "Job Failed": {
      "Type": "Fail",
      "Cause": "Glue Crawler Job Failed",
      "Error": "DescribeJob returned FAILED"
    },
        "Job Failed Timeout": {
      "Type": "Fail",
      "Cause": "Glue Crawler Job Failed",
      "Error": "DescribeJob returned FAILED Because of Timeout"
    },
    "Run Final Glue Job": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-job1",
      "End": true,
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    }
  }
}

Lambda

今回使うLambdaは4つです。流れも振り返りながら見ていきます
書き方はいろいろあるし今回はエラーハンドリングも甘いのであくまでも動きのイメージをつかむための参考程度にしてください。最後のGlueジョブの実行についてはジョブの終了判定とかはしてないです。

“Submit Crawler Job”

GlueのAPIを使ってクローラーのStartを行う

glue-test1-cr1
# coding: UTF-8

import sys
import boto3
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.start_crawler(Name='se2_in0')
    return 1

“Wait X Seconds”

Waitで指定秒数待つ

“Get Crawler Job Status”

GlueのAPIを使ってクローラーのステータスを取得します

glue-test1-crcheck
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.get_crawler(Name='se2_in0')
    response = response['Crawler']['State']
    return response

“Job Complete?”

Choiceで取得したステータスが、”READY”なら正常終了、”FAILED”なら失敗、それ以外は実行中の分岐処理

“Job Failed”

ステータスが失敗なら
FailでStepFunctionsをエラーさせます

“Run Final Glue Job”

ステータスが正常終了なら
GlueのAPIを使ってジョブをStartします

glue-test1-job1
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.start_job_run(
    JobName='se2_job0')
    return response['JobRunId']

“Add Count”

クローラーがまだ実行中なら
カウンタにインクリメントします

glue-test1-addcount
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    chkcount = event["chkcount"]
    chkcount = chkcount + 1

    return chkcount

“Chk Count”

choiceでカウンタが3未満か3以上かをチェックします

“Job Failed Timeout”

Failでカウンタが3以上だった時のエラー処理

“Wait X Seconds”

3未満の場合はここに戻りループ処理

実行

Step Functionsを実行

作成したStateMachineを選び”新しい実行”をクリック

スクリーンショット 0030-01-14 10.54.54.png

JSONに引数を入れて”実行の開始”をクリック
今回はJSON内で使う変数で”wait_time”を60秒で待ちの時間として入力しています

スクリーンショット 0030-01-14 10.55.52.png

実行状況

スクリーンショット 0030-01-14 10.59.44.png

CloudWatchイベントでスケジュール

あとは上記で作成したStateMachineをCloudWatchイベントでCRON指定すれば定期的実行されるジョブフローの完成です。This is Serverless!

スクリーンショット 0030-01-13 22.34.00.png

その他

今回はクローラー実行後にジョブ実行というシンプルなフローでしたが、Step Functionsは並列度を替えたり引数の受け渡しをしたり、さらにLambdaでロジックを書くことができるので自由度高く複雑なフローの作成が行えます。Glueとの相性はいいのではないでしょうか?

JSON部分も30分もあれば学習完了というカジュアルさがありLambdaを使ってAPI操作で様々なAWSの処理を繋げるのにはとてもいい印象です。

かなりシンプルな処理だったのですがコードがやや多い印象で、より複雑な処理になると結構大きいJSONになりそうで、JSONなのでコメント書けないとか少し大変な部分が出て来るのかもしれません。

バージョン管理を考えるとCliでの処理で運用したほうが良さそうですが、こういったサービスはGUIでの良さもあるのでどちらに比重を置いた運用がいいかは考慮が必要かもです

本文中で使ったカウンタのステート情報はDynamoDBなどに入れた方が良いかもです。

マイクロサービス化しやすいので、極力本来の処理のロジックをLambda側にやらせてそれ以外のフロー処理(分岐とかカウンタインクリメントとか)をJSONで書くのがいいと思います。今回カウンタはLambdaでやってしまいましたが。

ログはCloudWatchLogsに出ます

To Be Continue

TODO

参考

StepFunctions BlackBelt資料
https://www.slideshare.net/AmazonWebServicesJapan/20170726-black-beltstepfunctions-78267693

続きを読む

Glueの使い方的な⑤(パーティション分割してるcsvデータをパーティション分割したparquetに変換)

パーティション分割csv->パーティション分割parquet

ジョブの内容

※”Glueの使い方①(GUIでジョブ実行)”(以後①とだけ書きます)と同様のcsvデータを使います

“パーティション分割されたcsvデータを同じパーティションで別の場所にparquetで出力する”

ジョブ名

se2_job4

クローラー名

se2_in1
se2_out3

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

内容としては①と同じデータで、year,month,day,hourのパーティションごとに分けたcsvファイルを配置します。
year,month,day,hourのカラムは削除しています。

元となる①のデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

今回使う入力データ(19件)
year,month,day,hourのカラムは削除しています

$ ls
cvlog_2017111112.csv    cvlog_2017120101.csv    cvlog_2017121621.csv
cvlog_2017111414.csv    cvlog_2017120218.csv    cvlog_2017121714.csv
cvlog_2017112114.csv    cvlog_2017120904.csv    cvlog_2017121718.csv
cvlog_2017112915.csv    cvlog_2017121412.csv    cvlog_2017121908.csv
cvlog_2017113014.csv    cvlog_2017121414.csv    cvlog_2017121914.csv
cvlog_2017113020.csv    cvlog_2017121511.csv    cvlog_2017122915.csv
$ cat *
deviceid,uuid,appid,country
iphone,11121,001,JP
deviceid,uuid,appid,country
iphone,11123,009,FR
deviceid,uuid,appid,country
iphone,11119,007,AUS
deviceid,uuid,appid,country
other,11110,005,JP
iphone,11125,005,JP
deviceid,uuid,appid,country
iphone,11129,007,AUS
deviceid,uuid,appid,country
android,11122,001,FR
deviceid,uuid,appid,country
pc,11118,001,FR
deviceid,uuid,appid,country
pc,11117,009,FR
deviceid,uuid,appid,country
iphone,11128,009,FR
deviceid,uuid,appid,country
iphone,11111,001,JP
deviceid,uuid,appid,country
android,11112,001,FR
deviceid,uuid,appid,country
iphone,11116,001,JP
deviceid,uuid,appid,country
iphone,11113,009,FR
deviceid,uuid,appid,country
iphone,11124,007,AUS
deviceid,uuid,appid,country
iphone,11114,007,AUS
deviceid,uuid,appid,country
iphone,11126,001,JP
deviceid,uuid,appid,country
android,11127,001,FR
deviceid,uuid,appid,country
other,11115,005,JP

データの場所
year,month,day,hourのパーティションごとに分けたcsvファイルを配置しています

# aws s3 ls s3://test-glue00/se2/in1/year=2017/ --recursive
2018-01-04 09:38:13          0 se2/in1/year=2017/
2018-01-04 09:40:12          0 se2/in1/year=2017/month=11/
2018-01-04 09:40:38          0 se2/in1/year=2017/month=11/day=11/
2018-01-04 09:41:23          0 se2/in1/year=2017/month=11/day=11/hour=12/
2018-01-04 10:19:28         48 se2/in1/year=2017/month=11/day=11/hour=12/cvlog_2017111112.csv
2018-01-04 09:40:42          0 se2/in1/year=2017/month=11/day=14/
2018-01-04 09:41:41          0 se2/in1/year=2017/month=11/day=14/hour=14/
2018-01-04 10:19:45         48 se2/in1/year=2017/month=11/day=14/hour=14/cvlog_2017111414.csv
2018-01-04 09:40:47          0 se2/in1/year=2017/month=11/day=21/
2018-01-04 09:41:55          0 se2/in1/year=2017/month=11/day=21/hour=14/
2018-01-04 10:20:01         49 se2/in1/year=2017/month=11/day=21/hour=14/cvlog_2017112114.csv
2018-01-04 09:40:50          0 se2/in1/year=2017/month=11/day=29/
2018-01-04 09:42:09          0 se2/in1/year=2017/month=11/day=29/hour=15/
2018-01-04 10:20:22         67 se2/in1/year=2017/month=11/day=29/hour=15/cvlog_2017112915.csv
2018-01-04 09:41:01          0 se2/in1/year=2017/month=11/day=30/
2018-01-04 09:42:22          0 se2/in1/year=2017/month=11/day=30/hour=14/
2018-01-04 10:20:41         49 se2/in1/year=2017/month=11/day=30/hour=14/cvlog_2017113014.csv
2018-01-04 09:42:40          0 se2/in1/year=2017/month=11/day=30/hour=20/
2018-01-04 10:20:52         49 se2/in1/year=2017/month=11/day=30/hour=20/cvlog_2017113020.csv
2018-01-04 09:40:16          0 se2/in1/year=2017/month=12/
2018-01-04 09:43:11          0 se2/in1/year=2017/month=12/day=1/
2018-01-04 09:45:16          0 se2/in1/year=2017/month=12/day=1/hour=1/
2018-01-04 10:21:19         44 se2/in1/year=2017/month=12/day=1/hour=1/cvlog_2017120101.csv
2018-01-04 09:43:21          0 se2/in1/year=2017/month=12/day=14/
2018-01-04 09:46:50          0 se2/in1/year=2017/month=12/day=14/hour=12/
2018-01-04 10:22:28         48 se2/in1/year=2017/month=12/day=14/hour=12/cvlog_2017121412.csv
2018-01-04 09:47:01          0 se2/in1/year=2017/month=12/day=14/hour=14/
2018-01-04 10:22:51         49 se2/in1/year=2017/month=12/day=14/hour=14/cvlog_2017121414.csv
2018-01-04 09:43:38          0 se2/in1/year=2017/month=12/day=15/
2018-01-04 09:47:11          0 se2/in1/year=2017/month=12/day=15/hour=11/
2018-01-04 10:23:12         48 se2/in1/year=2017/month=12/day=15/hour=11/cvlog_2017121511.csv
2018-01-04 09:43:42          0 se2/in1/year=2017/month=12/day=16/
2018-01-04 09:47:23          0 se2/in1/year=2017/month=12/day=16/hour=21/
2018-01-04 10:23:34         48 se2/in1/year=2017/month=12/day=16/hour=21/cvlog_2017121621.csv
2018-01-04 09:43:45          0 se2/in1/year=2017/month=12/day=17/
2018-01-04 09:47:38          0 se2/in1/year=2017/month=12/day=17/hour=14/
2018-01-04 10:23:54         49 se2/in1/year=2017/month=12/day=17/hour=14/cvlog_2017121714.csv
2018-01-04 09:47:43          0 se2/in1/year=2017/month=12/day=17/hour=18/
2018-01-04 10:24:12         49 se2/in1/year=2017/month=12/day=17/hour=18/cvlog_2017121718.csv
2018-01-04 09:43:49          0 se2/in1/year=2017/month=12/day=19/
2018-01-04 09:48:04          0 se2/in1/year=2017/month=12/day=19/hour=14/
2018-01-04 10:25:07         49 se2/in1/year=2017/month=12/day=19/hour=14/cvlog_2017121914.csv
2018-01-04 09:47:59          0 se2/in1/year=2017/month=12/day=19/hour=8/
2018-01-04 10:24:56         48 se2/in1/year=2017/month=12/day=19/hour=8/cvlog_2017121908.csv
2018-01-04 09:43:15          0 se2/in1/year=2017/month=12/day=2/
2018-01-04 09:45:44          0 se2/in1/year=2017/month=12/day=2/hour=18/
2018-01-04 10:21:41         44 se2/in1/year=2017/month=12/day=2/hour=18/cvlog_2017120218.csv
2018-01-04 09:43:52          0 se2/in1/year=2017/month=12/day=29/
2018-01-04 09:48:17          0 se2/in1/year=2017/month=12/day=29/hour=15/
2018-01-04 10:25:26         47 se2/in1/year=2017/month=12/day=29/hour=15/cvlog_2017122915.csv
2018-01-04 09:43:18          0 se2/in1/year=2017/month=12/day=9/
2018-01-04 09:46:24          0 se2/in1/year=2017/month=12/day=9/hour=4/
2018-01-04 10:22:02         48 se2/in1/year=2017/month=12/day=9/hour=4/cvlog_2017120904.csv

S3のディレクトリ構成

Glueジョブの入力データは”in1″ディレクトリ配下、出力は”out3″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE in1/
                           PRE out0/
                           PRE out1/
                           PRE out2/
                           PRE out3/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

入力データ用に新しくクローラーを作り実行してテーブルを作ります。

出来上がるテーブルの情報は以下です。

スクリーンショット 0030-01-04 10.52.04.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job4ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”パーティション分割されたcsvデータを同じパーティションで別の場所にparquetとして出力する”です。

se2_job4
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションをマッピング対象のカラムとして含めてくれません
入力のパーティションのカラムをマップの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

↓↓↓

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job4_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

###add
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out3/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
###add

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-04 15.54.04.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/
2018-01-04 15:15:41        926 part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet .
download: s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet to ./part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# ls
part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out3でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-04 15.59.33.png

S3の出力パスを入力
形式の違うデータが混在しているとテーブルが複数できてしまうので、不要なものがあれば、excludeで除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-04 16.00.06.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.19.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-04 16.00.28.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.36.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-04 16.00.49.png

クローラー実行

1つのテーブルとして認識している

スクリーンショット 0030-01-04 16.09.01.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-04 16.09.52.png

Athenaから確認

左メニューからse2_out3のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-04 16.11.18.png

件数も19件で合っている

スクリーンショット 0030-01-04 16.11.01.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばappidというカラムがあるので、アプリごとに集計をするようなケースが多いならappidも含めてパーティション分割する

他にもdeviceidとかでデバイスごとに集計したり
一時的な調査にも役立つかも

出力に”out4″ディレクトリ作成
①と同様のジョブをse2_job5で作成
PySparkに以下3点修正したジョブ作成

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションはカラムとして含めてくれません。
入力のパーティションのカラムをapplyの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['appid','year','month','day','hour']
output='s3://test-glue00/se2/out4/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

appidごとにパーティション別れてます

スクリーンショット 0030-01-04 16.30.35.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-04 16.33.32.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-04 16.33.49.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-04 16.34.10.png

件数も同じく19件

スクリーンショット 0030-01-04 16.34.39.png

その他

todo

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/latest/sql-programming-guide.html

続きを読む

Glueの使い方的な④(ブックマーク)

ブックマークを使って一度処理したデータは処理対象外とする

入力パスの先にデータがあっても既に処理済なデータは除外したい。そんな思いになったことありませんでしょうか?

これを実現する機能がGlueの”ブックマーク”です。

全体の流れ

  • 前提
  • ブックマークの効果を見る
  • ジョブの永続的なブックマーク有効化
  • トリガー側のブックマーク有効無効
  • (おまけの考察)

前提

Glueの使い方的な①(GUIでジョブ実行)“(以後①と書きます)、”Glueの使い方的な③(CLIでジョブ実行)“(以後③と書きます)あたりを読んでいただけるとスムーズです

今回扱うジョブは①と同じ内容です。
“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

③の後ろの方で書いたように、このジョブはGlueのGUIだけで作成し、1つのcsvを1つのparquetにするだけのなので、このジョブを2回実行すると同じ内容の出力が2つ出来てしまいます。

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

*①と同じディレクトリ
Glueジョブの入力データは”in0″ディレクトリ配下、出力は”out1″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

ブックマークの効果を見る

①や③でジョブを実行して出力ファイルがout0に既に存在しているのでそれを事前に消しておきます。

想定される流れと結果

1回目の実行で、①と同じくparquetの出力ファイルが1つできる。(あとメタデータ2つ)
こんな感じ

2018-01-02 10:44:04        782 _common_metadata
2018-01-02 10:44:04       1746 _metadata
2018-01-02 10:27:05       2077 part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet

ブックマークを有効化することで、2回目の実行で、既に処理した入力ファイルは処理されない為、新たなparquetファイルは出力されず1つのままになります

やってみる

ジョブは①で作ったのと同じ内容で新たにジョブをse2_job2という名前で作ります
ジョブの内容は以下です。
“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

最初のジョブの状態

実行履歴が1件ありますが気にしないでください。
スクリーンショット 0030-01-02 17.38.20.png

1回目実行

ブックマークを有効にして実行する
“Action”の”Run job”をクリックする

※ちなみにこの画面のActionの”Reset job bookmark”でブックマークに保持された状態をリセットできます

スクリーンショット 0030-01-02 17.39.21.png

Parameter(optional)の画面で”Job bookmark”を”Enable”にして”Run job”をクリックする
今回のみ有効なパラメータとしてジョブを実行します。

スクリーンショット 0030-01-02 17.39.06.png

成功して履歴が1つ増えています
スクリーンショット 0030-01-02 17.49.46.png

S3にparquetファイル1つとメタデータ2つがほぼ同じ時刻で作成されています。

スクリーンショット 0030-01-02 17.50.06.png

続いて同じ手順で2回目を実行します。

実行後のS3はメタデータは更新されている(?)
ただ、肝心のparquetファイルは1つのみで、1度処理した入力データを対象外としていることがわかる
※メタデータの内容は後半で確認してみます

スクリーンショット 0030-01-02 17.55.50.png

このようにGlueのブックマークを有効にしておくと、同じ場所にあるソースデータの中でまだ処理していないデータだけを処理対象とすることができます。

ジョブ側の永続的なブックマーク有効化

さっきまではジョブ実行時の1度だけのブックマーク有効でした
今回は永続的なブックマーク有効化をします。

対象ジョブにチェックを入れ、”Action”をクリックし、”Edit job”をクリックする

スクリーンショット 0030-01-02 17.53.30.png

この画面が出て一見ブックマークの設定箇所がなさそうだが、実は下にスクロールできる

スクリーンショット 0030-01-02 17.53.49.png

あった

スクリーンショット 0030-01-02 17.54.07.png

“Enable”に変更して”Save”
するとエラーが、、”temporary directory”と”IAM role”をこのタイミングでも入れないといけなようです。
それぞれ入力後”Save”をクリックします。

スクリーンショット 0030-01-02 17.54.20.png

画面の右端にあるように”Job bookmark”が”Enable”になっているのがわかります。

スクリーンショット 0030-01-02 18.11.49.png

トリガー側のブックマーク有効無効

ブックマークの有効無効確認

対象のトリガーのse2_trigger2の部分をクリック

スクリーンショット 0030-01-02 18.18.50.png

以下の画面になり、Parametersのところに”–job-bookmark-option:job-bookmark-enable”とあればブックマークが有効になっている

スクリーンショット 0030-01-02 18.19.04.png

ブックマーク有効無効変更

対象のトリガーse2_trigger2にチェックを入れ、”Action”をクリックし”Edit trigger”をクリックする

スクリーンショット 0030-01-02 18.16.45.png

“Next”をクリックする

スクリーンショット 0030-01-02 18.16.59.png

画面下の部分に”Job bookmark”がありDisable、Enable、Pauseの3つの状態が選べる
これを選びNextと進めればよさそうだが、それだけだとダメである

スクリーンショット 0030-01-02 18.17.16.png

画面下部にあるKeyとValueの箇所の”job-bookmark-enable”を事前に消しておく必要がある。右側の✖をクリックすることで消せます。消した後に”Job bookmark”をDisableにして”Next”をクリックし、次にサマリが出るので問題なければ”Finish”をクリックする

スクリーンショット 0030-01-02 18.17.39.png

確認すると”–job-bookmark-option:job-bookmark-disable”になっていて無効に変わったことがわかる

スクリーンショット 0030-01-02 18.33.18.png

(ブックマーク有効にしてジョブ実行してもメタデータが更新された件の考察)

結果的にはparquetの仕様っぽいので気にする必要もなさそうです。

メタを詳細に見るためにparquet-toolsを入れます。
このツール相変わらず普通にビルドできない・・

ここは弊社の担々麺デカの力を借りて無事入りました。(+1タンタンメン)
http://d.hatena.ne.jp/yohei-a/20170629/1498710035

とは言え改善してるんじゃと思い、ちょっと脱線しますが

【parquet-toolsインストール2018年版】

github
また(?)リポジトリ変わってます・・
https://github.com/apache/parquet-mr

最新のv1.8.0、v1.8.1はpom.xmlをゴニョゴニョしないとダメだ
Issueは多分こちら
https://issues.apache.org/jira/browse/PARQUET-1129

v1.7はなぜかなく、結果的にv1.6系最後(?)の1.6.0rc7だとすんなりビルドできました。

jdkインストール

yum -y install java-1.7.0-openjdk-devel

mavenインストール

wget http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
tar xvfz apache-maven-3.3.9-bin.tar.gz 
./apache-maven-3.3.9/bin/mvn -version

.bash_profileとかでパス通す

PATH=$PATH:$HOME/bin:/root/apache-maven-3.3.9/bin

parquet-toolsインストール

git clone https://github.com/apache/parquet-mr.git
cd parquet-mr
git checkout ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7
cd parquet-tools/
mvn clean package -Plocal 

実行

aws s3 cp s3://test-glue00/se2/out0/ .data/ --recursive

java -jar target/parquet-tools-1.6.0rc7.jar head -n 1 data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
deviceid = iphone
uuid = 11111
appid = 1
country = JP
year = 2017
month = 12
day = 14
hour = 12

parquet-toolsでメタ情報も見れます。
ブックマークを使ったのにメタデータは更新があったので、更新前と後のメタの情報を比較してみます。

メタ情報表示

# java -jar target/parquet-tools-1.6.0rc7.jar meta data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
file:        file:/root/parquet/parquet-mr/parquet-tools/data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"deviceid","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"long","nullable":true,"metadata":{}},{"name":"appid","type":"long","nullable":true,"metadata":{}},{"name":"country","type":"string","nullable":true,"metadata":{}},{"name":"year","type":"long","nullable":true,"metadata":{}},{"name":"month","type":"long","nullable":true,"metadata":{}},{"name":"day","type":"long","nullable":true,"metadata":{}},{"name":"hour","type":"long","nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
deviceid:    OPTIONAL BINARY O:UTF8 R:0 D:1
uuid:        OPTIONAL INT64 R:0 D:1
appid:       OPTIONAL INT64 R:0 D:1
country:     OPTIONAL BINARY O:UTF8 R:0 D:1
year:        OPTIONAL INT64 R:0 D:1
month:       OPTIONAL INT64 R:0 D:1
day:         OPTIONAL INT64 R:0 D:1
hour:        OPTIONAL INT64 R:0 D:1

row group 1: RC:19 TS:952 OFFSET:4 
--------------------------------------------------------------------------------
deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19 ENC:PLAIN,BIT_PACKED,RLE
appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE

更新前のメタ情報:/tmp/data1/meta.log
更新後のメタ情報:/tmp/data2/meta.log
diffは見切れちゃってますが、差分は以下の並びが違うということでした。内容は同じです。
ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
もう1回ジョブを実行した場合、上記の並びは同じでしたがタイムスタンプは最新になってました。
そういうもののようです;

# sdiff -s /tmp/data1/meta.log /tmp/data2/meta.log 
file:        file:/root/parquet/parquet-mr/parquet-tools/data | file:        file:/root/parquet/parquet-mr/parquet-tools/data
deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 E | deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 E
uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19 | uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19
appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19  | appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19 
country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19  | country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19 
year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 E | year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 E
month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 E | month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 E
day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19 | day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19
hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19 | hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19

その他

  • Glueで自動生成されるPySparkコードに以下のようなコンテキストオブジェクトがあります。これにはブックマークとしての意味もあり、それぞれの実行時にソース、変換、およびシンクの状態をブックマークの状態のキーとして使用します。状態はタイムスタンプとして記録し維持します。
    なのでブックマークを使用していない場合は、この変数を指定しなくても問題はありません。
    transformation_ctx = “datasource0”
    transformation_ctx = “applymapping1”
    transformation_ctx = “datasink4”

  • S3の結果整合性への対処
    ジョブ開始前に、以前のデータと不整合があるデータをジョブの対象とする(整合なデータは除外リストとして維持する)
    状態としてサイズも持っているということかもしれません。

  • 例えばあるファイルは処理対象か対象ではないのか?と言った詳細なブックマークの状態を見ることは現在はできません。

To Be Continue

todo

参考

Bookmarkの公式ページ
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-continuations.html

re:Invent資料。34ページあたりにBookMarkの細かい情報あり
https://www.slideshare.net/AmazonWebServices/abd315serverless-etl-with-aws-glue

parquet-toolsインストール
http://d.hatena.ne.jp/yohei-a/20170629/1498710035

parquet-tools
https://github.com/apache/parquet-mr

続きを読む