Glueの使い方的な⑦(Step Functionsでジョブフロー)

Step FunctionsでGlueのジョブフローを作る

Glueの使い方的な③(CLIでジョブ作成)“(以後③と書きます)で書いたように、現在Glueのジョブスケジュール機能は簡易的なものなので、複雑なジョブフロー形成には別のスケジューラーが必要になる場合もあります。
例えばGlueのクローラーとGlueジョブもそれぞれにスケジュール機能があり統合したジョブフローを作ることがGlueだけでは出来ません(例えばクローラーを実行し終わったらジョブを実行するとか)。今回はサーバーレスなジョブフローのサービスであるStep Functionsを使って、クローラーを実行し正常終了したら後続のジョブを実行するというフローを作ってみます。

全体の流れ

  • Glue処理内容
  • StepFunctionsの処理内容
  • 前準備
  • Step FunctionsでStateMachine作成
  • 実行

処理内容

Glueの使い方的な①(GUIでジョブ実行)“(以後①と書きます)で実行したものと同じクローラーとジョブを使います。入力データも出力結果も①と同じです。
今回行うのはGlueクローラー処理が終わったら次のGlueジョブ処理開始というジョブフロー形成です。

あらためて①のクローラーとジョブの処理内容は以下の通りです

クローラーの内容

入力のCSVファイルからスキーマを作成します

ジョブの内容

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

Step Functionsを使ったジョブフローの内容

図の四角をStep Functionsでは”State”と呼びます。処理の1単位と思ってください。

ジョブフローは以下のような形です。

Stateごとに流れを説明します

  • “Submit Crawler Job”でLambdaを使いGlueクローラーを実行
  • “Wait X Seconds”で指定時間待つ
  • “Get Crawler Job Status”でLambdaを使いGlueクローラーの状態をポーリングして確認
  • “Job Complete?”で状態を判定して結果によって3つに処理が分岐
    • 失敗なら”Job Failed”エラー処理
    • 終了なら”Run Final Glue Job”でLambdaを使い後続のGlueジョブを実行
    • 処理中なら”Add Count”でLambdaを使いカウンタをインクリメント。
      • “Add Count”の後”Chk Count”でカウンタをチェックし3回以上になっていたら”Job Failed Timeout”でタイムアウト処理、3未満なら”Wait X Seconds”に戻りループ処理

スクリーンショット 0030-01-13 21.47.05.png

前準備

①と同じです

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

入力ファイルをS3に配置

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

ディレクトリ構成

in0に入力ファイル、out0に出力ファイル

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE script/
                           PRE tmp/

ジョブのPySparkスクリプト

se2_job0.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

入力のCSVデータのスキーマ

クローラーによって作成されるスキーマ

スクリーンショット 0030-01-13 22.01.43.png

StepFunctionsでStateMachine作成

StepFunctionsは一連のジョブフローをJSONで定義しこれを”StateMachine”と呼びます。
StateMachine内の処理の1つ1つの四角をStateと呼びます。処理の1単位です。
このJSONの記述はASL(AmazonStatesLanguages)と呼ばれStateTypeとしてChoice(分岐処理)やWait(待ち)やParallel(並列実行)などがJSONだけで表現出来ます。またTaskというStateTypeからはLambdaやアクティビティ(EC2からStepFunctionsをポーリングする)を定義できます。前述の通り今回はLmabdaを使います。

マネージメントコンソールからいくつかあるテンプレートを元に作ることも出来ますが、カスタムでJSONを一から作ることもできます。

新規StateMachine作成画面
“Author from scrach”で一からJSON作成

スクリーンショット 0030-01-14 10.24.59.png

“Template”を選ぶとASLのStateパターンのいくつかのテンプレが選べます

スクリーンショット 0030-01-14 10.28.14.png

左側の”コード”部分にJSONを書き、右側の”ビジュアルワークフロー”の部分にJSONコードで書いたフローがビジュアライズされます

スクリーンショット 0030-01-14 10.29.50.png

StateMachine

今回のStateMachieのJSONは以下です。
内容は前述の通りです。
※[AWSID]のところは自身のAWSIDと置き換えてください

{
  "Comment": "A state machine that submits a Job to Glue Batch and monitors the Job until it completes.",
  "StartAt": "Submit Crawler Job",
  "States": {
    "Submit Crawler Job": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-cr1",
      "ResultPath": "$.chkcount",
      "Next": "Wait X Seconds",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 120,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
    "Wait X Seconds": {
      "Type": "Wait",
      "SecondsPath": "$.wait_time",
      "Next": "Get Crawler Job Status"
    },
    "Get Crawler Job Status": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-crcheck",
      "Next": "Job Complete?",
      "InputPath": "$",
      "ResultPath": "$.response",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
      "Job Complete?": {
      "Type": "Choice",
      "Choices": [{
          "Variable": "$.response",
          "StringEquals": "FAILED",
          "Next": "Job Failed"
        },
        {
          "Variable": "$.response",
          "StringEquals": "READY",
          "Next": "Run Final Glue Job"
        }
      ],
      "Default": "Add Count"
        },
    "Add Count": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-addcount",
      "Next": "Chk Count",
      "InputPath": "$",
      "ResultPath": "$.chkcount",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
      "Chk Count": {
      "Type": "Choice",
      "Choices": [{
          "Variable": "$.chkcount",
          "NumericGreaterThan": 3,
          "Next": "Job Failed Timeout"
        }],
      "Default": "Wait X Seconds"
    },
    "Job Failed": {
      "Type": "Fail",
      "Cause": "Glue Crawler Job Failed",
      "Error": "DescribeJob returned FAILED"
    },
        "Job Failed Timeout": {
      "Type": "Fail",
      "Cause": "Glue Crawler Job Failed",
      "Error": "DescribeJob returned FAILED Because of Timeout"
    },
    "Run Final Glue Job": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-job1",
      "End": true,
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    }
  }
}

Lambda

今回使うLambdaは4つです。流れも振り返りながら見ていきます
書き方はいろいろあるし今回はエラーハンドリングも甘いのであくまでも動きのイメージをつかむための参考程度にしてください。最後のGlueジョブの実行についてはジョブの終了判定とかはしてないです。

“Submit Crawler Job”

GlueのAPIを使ってクローラーのStartを行う

glue-test1-cr1
# coding: UTF-8

import sys
import boto3
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.start_crawler(Name='se2_in0')
    return 1

“Wait X Seconds”

Waitで指定秒数待つ

“Get Crawler Job Status”

GlueのAPIを使ってクローラーのステータスを取得します

glue-test1-crcheck
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.get_crawler(Name='se2_in0')
    response = response['Crawler']['State']
    return response

“Job Complete?”

Choiceで取得したステータスが、”READY”なら正常終了、”FAILED”なら失敗、それ以外は実行中の分岐処理

“Job Failed”

ステータスが失敗なら
FailでStepFunctionsをエラーさせます

“Run Final Glue Job”

ステータスが正常終了なら
GlueのAPIを使ってジョブをStartします

glue-test1-job1
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.start_job_run(
    JobName='se2_job0')
    return response['JobRunId']

“Add Count”

クローラーがまだ実行中なら
カウンタにインクリメントします

glue-test1-addcount
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    chkcount = event["chkcount"]
    chkcount = chkcount + 1

    return chkcount

“Chk Count”

choiceでカウンタが3未満か3以上かをチェックします

“Job Failed Timeout”

Failでカウンタが3以上だった時のエラー処理

“Wait X Seconds”

3未満の場合はここに戻りループ処理

実行

Step Functionsを実行

作成したStateMachineを選び”新しい実行”をクリック

スクリーンショット 0030-01-14 10.54.54.png

JSONに引数を入れて”実行の開始”をクリック
今回はJSON内で使う変数で”wait_time”を60秒で待ちの時間として入力しています

スクリーンショット 0030-01-14 10.55.52.png

実行状況

スクリーンショット 0030-01-14 10.59.44.png

CloudWatchイベントでスケジュール

あとは上記で作成したStateMachineをCloudWatchイベントでCRON指定すれば定期的実行されるジョブフローの完成です。This is Serverless!

スクリーンショット 0030-01-13 22.34.00.png

その他

今回はクローラー実行後にジョブ実行というシンプルなフローでしたが、Step Functionsは並列度を替えたり引数の受け渡しをしたり、さらにLambdaでロジックを書くことができるので自由度高く複雑なフローの作成が行えます。Glueとの相性はいいのではないでしょうか?

JSON部分も30分もあれば学習完了というカジュアルさがありLambdaを使ってAPI操作で様々なAWSの処理を繋げるのにはとてもいい印象です。

かなりシンプルな処理だったのですがコードがやや多い印象で、より複雑な処理になると結構大きいJSONになりそうで、JSONなのでコメント書けないとか少し大変な部分が出て来るのかもしれません。

バージョン管理を考えるとCliでの処理で運用したほうが良さそうですが、こういったサービスはGUIでの良さもあるのでどちらに比重を置いた運用がいいかは考慮が必要かもです

本文中で使ったカウンタのステート情報はDynamoDBなどに入れた方が良いかもです。

マイクロサービス化しやすいので、極力本来の処理のロジックをLambda側にやらせてそれ以外のフロー処理(分岐とかカウンタインクリメントとか)をJSONで書くのがいいと思います。今回カウンタはLambdaでやってしまいましたが。

ログはCloudWatchLogsに出ます

To Be Continue

TODO

参考

StepFunctions BlackBelt資料
https://www.slideshare.net/AmazonWebServicesJapan/20170726-black-beltstepfunctions-78267693

続きを読む

Glueの使い方的な⑧(アップデート履歴)

自分のための履歴

抜け漏れあるかもなので参考程度に

履歴

20170815:Glue GA
https://aws.amazon.com/jp/blogs/news/launch-aws-glue-now-generally-available/

20170929:Supports Filter and Map transforms
https://aws.amazon.com/jp/about-aws/whats-new/2017/09/aws-glue-now-supports-filter-and-map-transforms/

2017101x:ジョブ中断できる

2017102x:New map filter
http://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-filter.html

20171024:CFn対応
https://aws.amazon.com/jp/about-aws/whats-new/2017/10/aws-glue-can-now-be-configured-using-aws-cloudformation-templates/

20171107:CloudTrail対応
https://aws.amazon.com/about-aws/whats-new/2017/11/aws-glue-api-calls-are-now-recorded-and-available-in-aws-cloudtrail/

20171221:アイルランドリージョンでリリース
https://aws.amazon.com/jp/about-aws/whats-new/2017/12/aws-glue-is-now-available-in-the-eu-ireland-aws-region/

20171223:東京リージョンリリース
https://aws.amazon.com/jp/about-aws/whats-new/2017/12/aws-glue-is-now-available-in-the-asia-pacific-tokyo-aws-region/

201801xx:View機能(Glue-Tablesの右上の”SaveView”があり検索条件を保存する機能)

スクリーンショット 0030-01-13 19.12.39.png

20180112:Scalaリリース
https://aws.amazon.com/jp/blogs/big-data/aws-glue-now-supports-scala-scripts/

公開されたアップデート予告

TODO

続きを読む

Glueの使い方的な⑥(監視モニタリング)

監視モニタリング概要

現状整っているとは言い難いので、他のサービスも含めた監視について考えてみる

全体の流れ

  • CloudWatchメトリクス
  • CloudWatchイベント
  • CloudWatchログ
  • APIで状態取得
  • 何を監視すべきか

CloudWatchメトリクス

ない

今後は追加されてくるかと思います

CloudWatchイベント

バージニアには以下のイベントがあります。
現状、”Job State change”と”Crawler state change”の2つ

スクリーンショット 0030-01-04 18.12.26.png

Job State changeはステートの全ての変化のみ

スクリーンショット 0030-01-04 18.12.39.png

Crawler state changeは任意のステートの変化
Failed
Started
Succeeded

スクリーンショット 0030-01-04 18.12.54.png

これらのイベントをフックに、Lambda動かしたり、SNSで通知したり、StepFunction動かしたり、SQSにキューしたりできます。

ただ、東京とアイルランドにはこのGlueのイベントがありません↓

スクリーンショット 0030-01-04 18.28.00.png

CloudWatchログ

GlueはCloudWatchに出力します。ログに関してはCloudWatchのログ監視が使えます
出力ログは主にSparkのログを”error”と”output”で2種類です

一般的なCloudWatchのログに対してのアラーム設定手順になります

CloudWatchのログの画面に行く。
“aws-glue/jobs/error”のロググループにチェックを入れ、”メトリクスフィルタ”ボタンをクリック

スクリーンショット 0030-01-06 10.18.42.png

ログメトリクスフィルタの定義の画面です
フィルタする文字列パターンに文字列を入れ”パターンのテスト”をクリックすればフィルタの確認が出来ます。結果は一番下に出力されます。
テストするログデータも変更できます。
画面はテストなので”INFO”でフィルタしています。サンプルから1件フィルタできてることがわかります。
エラーログなのにINFOが出てます。。
確認ができたら右下の”メトリクスの割り当て”をクリックします。

スクリーンショット 0030-01-06 10.20.48.png

フィルタの名前、メトリクス名などを任意の名前で入力して、右下の”フィルタの作成”をクリックします

スクリーンショット 0030-01-06 10.20.55.png

フィルタが作成されました

スクリーンショット 0030-01-06 10.21.43.png

最初のCloudWatchログの画面に行くと、”aws-glue/jobs/error”ロググループの右側に”1フィルタ”となっています
ここをクリックします

スクリーンショット 0030-01-06 10.22.19.png

右上の”アラームの作成”をクリックします

スクリーンショット 0030-01-06 10.22.44.png

いつものアラーム設定画面となります

スクリーンショット 0030-01-06 10.23.03.png

APIで状態取得

Glueの使い方的な③(CLIでジョブ作成)“(以後③と書きます)でも触れたようにAPIにアクセスして状態の取得ができます。他のAWSリソースももちろん同様のことができます。

get系のAPIが使えます

クローラーの状態取得

例えばクローラーのgetで取得できる情報は

$ aws glue get-crawler --name se2_in0
{
    "Crawler": {
        "CrawlElapsedTime": 0, 
        "Name": "se2_in0", 
        "CreationTime": 1514874041.0, 
        "LastUpdated": 1514874041.0, 
        "Targets": {
            "JdbcTargets": [], 
            "S3Targets": [
                {
                    "Path": "s3://test-glue00/se2/in0", 
                    "Exclusions": []
                }
            ]
        }, 
        "LastCrawl": {
            "Status": "SUCCEEDED", 
            "LogStream": "se2_in0", 
            "MessagePrefix": "903fa0e1-2874-4b50-a686-660d2da54004", 
            "StartTime": 1515146760.0, 
            "LogGroup": "/aws-glue/crawlers"
        }, 
        "State": "READY", 
        "Version": 1, 
        "Role": "test-glue", 
        "DatabaseName": "se2", 
        "SchemaChangePolicy": {
            "DeleteBehavior": "DEPRECATE_IN_DATABASE", 
            "UpdateBehavior": "UPDATE_IN_DATABASE"
        }, 
        "TablePrefix": "se2_", 
        "Classifiers": []
    }
}

ここから状態だけを取ってきたければ以下のような感じでステータスを見て成否判定することは出来ます。

$ aws glue get-crawler --name se2_in0 | jq -r .Crawler.LastCrawl.Status
SUCCEEDED

ジョブの状態取得

ジョブも同じ要領ですが、ステータスを得るためにget-job-runにRunIDを渡して上げる必要があります

RunIDはstart-jobでリターン値で得られます

$ aws glue start-job-run --job-name se2_job0
{
    "JobRunId": "jr_711b8b157e3b36a1dc1a48c87c5e8b00c509150cc4f9d7d7106009e57f2cac9b"
}

また、get-job-runsでジョブの履歴から取ることもできます

$ aws glue get-job-runs --job-name se2_jobx 
{
    "JobRuns": [
        {
            "LastModifiedOn": 1514440793.923, 
            "StartedOn": 1514440639.623, 
            "PredecessorRuns": [], 
            "Attempt": 0, 
            "JobRunState": "SUCCEEDED", 
            "JobName": "se2_jobx", 
            "Arguments": {
                "--job-bookmark-option": "job-bookmark-disable"
            }, 
            "AllocatedCapacity": 10, 
            "CompletedOn": 1514440793.923, 
            "Id": "jr_1b3c00146b02e36e4682f352a084a2fd37931967346b44a7c39ad182347957d3"
        }, 
        {
            "LastModifiedOn": 1514440548.4, 
            "StartedOn": 1514440394.07, 
            "PredecessorRuns": [], 
            "Attempt": 0, 
            "JobRunState": "FAILED", 
            "ErrorMessage": "An error occurred while calling o54.getCatalogSource. No such table: se4.se04_in", 
            "JobName": "se2_jobx", 
            "Arguments": {
                "--job-bookmark-option": "job-bookmark-disable"
            }, 
            "AllocatedCapacity": 10, 
            "CompletedOn": 1514440548.4, 
            "Id": "jr_aca8a8c587b0986d040aba7eadc7b216e83db409f842cb9d2912c400b181c907"
        }
    ]
}

取得できたRunIDを使ってステータスを取ります

$ aws glue get-job-run --job-name se2_job0 --run-id jr_dff0ac334e5c5bf3043acc5158f9c3bc1f9c8eae048e053536581278ec34a063 
{
    "JobRun": {
        "LastModifiedOn": 1514875561.077, 
        "StartedOn": 1514875046.406, 
        "PredecessorRuns": [], 
        "Attempt": 0, 
        "JobRunState": "SUCCEEDED", 
        "JobName": "se2_job0", 
        "Arguments": {
            "--job-bookmark-option": "job-bookmark-disable"
        }, 
        "AllocatedCapacity": 10, 
        "CompletedOn": 1514875561.077, 
        "Id": "jr_dff0ac334e5c5bf3043acc5158f9c3bc1f9c8eae048e053536581278ec34a063"
    }
}
$ aws glue get-job-run --job-name se2_job0 --run-id jr_dff0ac334e5c5bf3043acc5158f9c3bc1f9c8eae048e053536581278ec34a063 | jq .JobRun.JobRunState -r
SUCCEEDED

ポーリング型とはなりますが、定期的に状態を確認するというやり方もあります

何を監視すべきか

Glueはバッチ処理で多くの場合ジョブフローを形成する中の一部として使われると思います

何を見るべきかはいろんな意見や視点がありますし業務のサービスレベルでも違うので一概に言えないですが、お勧めとして

まずはジョブの成否を見ます。

“CloudWatchイベント”でも述べたように現状Glueジョブのイベントタイプは”全てのステータスチェンジ”の1つしかないので、ジョブが失敗したらアラートを飛ばすということがCloudWatchだけだと出来ません。本記事の”APIで状態取得”で述べたような方法を使って、ジョブ実行した後非同期でステータスを確認する。またはCloudWatchイベントでJobStateChangeのイベントでLambdaを起動してGlueのAPIを叩いてステータス確認するとよりタイムラグない監視になると思います。

多くの場合ジョブフローを形成し、1つのジョブステップを処理の最小単位とするので、その成否の確認はロールバックし易さや業務影響などの考慮が入っています。その単位でジョブの失敗に気づき、そしてより詳細な調査が必要な場合にログを確認していくのが1つのパターンと思います
このジョブフローをサーバーレスで行うサービスにAWSのStepFunctionというサービスがあります。Glueとも相性がいいのでまた今度書いてゆきます

エラーは吐かないが実行時間が長い

これも気づきたいポイントになると思います。
ジョブのスタート時間は取れるので、そこから経過した時間を算出し、通常1時間のジョブが2時間を超えたらアラートを上げる
などもいいと思います。

$ aws glue get-job-run --job-name se2_job0 --run-id jr_dff0ac334e5c5bf3043acc5158f9c3bc1f9c8eae048e053536581278ec34a063 | jq .JobRun.StartedOn -r
1514875046.406

ログ監視

上記の監視運用を繰り返していくと、ジョブが失敗してログ調査をしてこのログは検知したいといった知見が溜まっていきます。
その場合は本記事の”CloudWatchログ”でも述べたやり方でログ監視してください。ジョブが失敗することには変わらないですがログのアラートも飛んでくることで既知の問題であることが即座にわかりトラブルシュートが格段に早まります

To Be Continue

  • StepFunctionでGlueのジョブフローを作るを書く

こちらも是非

CloudWatchログ監視
http://www.d2c-smile.com/201609137826#a1

続きを読む

Glueの使い方的な

Glueのすぐ使えそうな操作

1.Glueの使い方的な①(GUIでジョブ実行)
GUIだけでcsv->parquet変換処理を作る

2.Glueの使い方的な②(csvデータをパーティション分割したparquetに変換)
元データにタイムスタンプが入ってるデータを、パーティションによるディレクトリ構成にしてデータ配置とフォーマットなど変換

3.Glueの使い方的な②(csvデータをパーティション分割したparquetに変換)
CLIでジョブ作成操作。いろんなスケジューラーとの連携想定

4.Glueの使い方的な④(ブックマーク)
Glueのブックマーク機能を使って重複した処理を防ぐ

5.Glueの使い方的な⑤(パーティション分割してるcsvデータをパーティション分割したparquetに変換)
パーティション分割して配置されてるcsvを同じパーティション分割してparquetにする

6.Glueの使い方的な⑥(監視モニタリング)
Glueの監視

7.Glueの使い方⑥(StepFunctionsでジョブフロー)

8.いろんな変換パターン
XMLの変換とか

9.いろんな変換パターン

10.いろんな変換パターン

11.todo

続きを読む

カテゴリー 未分類 | タグ

Glueの使い方的な⑤(パーティション分割してるcsvデータをパーティション分割したparquetに変換)

パーティション分割csv->パーティション分割parquet

ジョブの内容

※”Glueの使い方①(GUIでジョブ実行)”(以後①とだけ書きます)と同様のcsvデータを使います

“パーティション分割されたcsvデータを同じパーティションで別の場所にparquetで出力する”

ジョブ名

se2_job4

クローラー名

se2_in1
se2_out3

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

内容としては①と同じデータで、year,month,day,hourのパーティションごとに分けたcsvファイルを配置します。
year,month,day,hourのカラムは削除しています。

元となる①のデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

今回使う入力データ(19件)
year,month,day,hourのカラムは削除しています

$ ls
cvlog_2017111112.csv    cvlog_2017120101.csv    cvlog_2017121621.csv
cvlog_2017111414.csv    cvlog_2017120218.csv    cvlog_2017121714.csv
cvlog_2017112114.csv    cvlog_2017120904.csv    cvlog_2017121718.csv
cvlog_2017112915.csv    cvlog_2017121412.csv    cvlog_2017121908.csv
cvlog_2017113014.csv    cvlog_2017121414.csv    cvlog_2017121914.csv
cvlog_2017113020.csv    cvlog_2017121511.csv    cvlog_2017122915.csv
$ cat *
deviceid,uuid,appid,country
iphone,11121,001,JP
deviceid,uuid,appid,country
iphone,11123,009,FR
deviceid,uuid,appid,country
iphone,11119,007,AUS
deviceid,uuid,appid,country
other,11110,005,JP
iphone,11125,005,JP
deviceid,uuid,appid,country
iphone,11129,007,AUS
deviceid,uuid,appid,country
android,11122,001,FR
deviceid,uuid,appid,country
pc,11118,001,FR
deviceid,uuid,appid,country
pc,11117,009,FR
deviceid,uuid,appid,country
iphone,11128,009,FR
deviceid,uuid,appid,country
iphone,11111,001,JP
deviceid,uuid,appid,country
android,11112,001,FR
deviceid,uuid,appid,country
iphone,11116,001,JP
deviceid,uuid,appid,country
iphone,11113,009,FR
deviceid,uuid,appid,country
iphone,11124,007,AUS
deviceid,uuid,appid,country
iphone,11114,007,AUS
deviceid,uuid,appid,country
iphone,11126,001,JP
deviceid,uuid,appid,country
android,11127,001,FR
deviceid,uuid,appid,country
other,11115,005,JP

データの場所
year,month,day,hourのパーティションごとに分けたcsvファイルを配置しています

# aws s3 ls s3://test-glue00/se2/in1/year=2017/ --recursive
2018-01-04 09:38:13          0 se2/in1/year=2017/
2018-01-04 09:40:12          0 se2/in1/year=2017/month=11/
2018-01-04 09:40:38          0 se2/in1/year=2017/month=11/day=11/
2018-01-04 09:41:23          0 se2/in1/year=2017/month=11/day=11/hour=12/
2018-01-04 10:19:28         48 se2/in1/year=2017/month=11/day=11/hour=12/cvlog_2017111112.csv
2018-01-04 09:40:42          0 se2/in1/year=2017/month=11/day=14/
2018-01-04 09:41:41          0 se2/in1/year=2017/month=11/day=14/hour=14/
2018-01-04 10:19:45         48 se2/in1/year=2017/month=11/day=14/hour=14/cvlog_2017111414.csv
2018-01-04 09:40:47          0 se2/in1/year=2017/month=11/day=21/
2018-01-04 09:41:55          0 se2/in1/year=2017/month=11/day=21/hour=14/
2018-01-04 10:20:01         49 se2/in1/year=2017/month=11/day=21/hour=14/cvlog_2017112114.csv
2018-01-04 09:40:50          0 se2/in1/year=2017/month=11/day=29/
2018-01-04 09:42:09          0 se2/in1/year=2017/month=11/day=29/hour=15/
2018-01-04 10:20:22         67 se2/in1/year=2017/month=11/day=29/hour=15/cvlog_2017112915.csv
2018-01-04 09:41:01          0 se2/in1/year=2017/month=11/day=30/
2018-01-04 09:42:22          0 se2/in1/year=2017/month=11/day=30/hour=14/
2018-01-04 10:20:41         49 se2/in1/year=2017/month=11/day=30/hour=14/cvlog_2017113014.csv
2018-01-04 09:42:40          0 se2/in1/year=2017/month=11/day=30/hour=20/
2018-01-04 10:20:52         49 se2/in1/year=2017/month=11/day=30/hour=20/cvlog_2017113020.csv
2018-01-04 09:40:16          0 se2/in1/year=2017/month=12/
2018-01-04 09:43:11          0 se2/in1/year=2017/month=12/day=1/
2018-01-04 09:45:16          0 se2/in1/year=2017/month=12/day=1/hour=1/
2018-01-04 10:21:19         44 se2/in1/year=2017/month=12/day=1/hour=1/cvlog_2017120101.csv
2018-01-04 09:43:21          0 se2/in1/year=2017/month=12/day=14/
2018-01-04 09:46:50          0 se2/in1/year=2017/month=12/day=14/hour=12/
2018-01-04 10:22:28         48 se2/in1/year=2017/month=12/day=14/hour=12/cvlog_2017121412.csv
2018-01-04 09:47:01          0 se2/in1/year=2017/month=12/day=14/hour=14/
2018-01-04 10:22:51         49 se2/in1/year=2017/month=12/day=14/hour=14/cvlog_2017121414.csv
2018-01-04 09:43:38          0 se2/in1/year=2017/month=12/day=15/
2018-01-04 09:47:11          0 se2/in1/year=2017/month=12/day=15/hour=11/
2018-01-04 10:23:12         48 se2/in1/year=2017/month=12/day=15/hour=11/cvlog_2017121511.csv
2018-01-04 09:43:42          0 se2/in1/year=2017/month=12/day=16/
2018-01-04 09:47:23          0 se2/in1/year=2017/month=12/day=16/hour=21/
2018-01-04 10:23:34         48 se2/in1/year=2017/month=12/day=16/hour=21/cvlog_2017121621.csv
2018-01-04 09:43:45          0 se2/in1/year=2017/month=12/day=17/
2018-01-04 09:47:38          0 se2/in1/year=2017/month=12/day=17/hour=14/
2018-01-04 10:23:54         49 se2/in1/year=2017/month=12/day=17/hour=14/cvlog_2017121714.csv
2018-01-04 09:47:43          0 se2/in1/year=2017/month=12/day=17/hour=18/
2018-01-04 10:24:12         49 se2/in1/year=2017/month=12/day=17/hour=18/cvlog_2017121718.csv
2018-01-04 09:43:49          0 se2/in1/year=2017/month=12/day=19/
2018-01-04 09:48:04          0 se2/in1/year=2017/month=12/day=19/hour=14/
2018-01-04 10:25:07         49 se2/in1/year=2017/month=12/day=19/hour=14/cvlog_2017121914.csv
2018-01-04 09:47:59          0 se2/in1/year=2017/month=12/day=19/hour=8/
2018-01-04 10:24:56         48 se2/in1/year=2017/month=12/day=19/hour=8/cvlog_2017121908.csv
2018-01-04 09:43:15          0 se2/in1/year=2017/month=12/day=2/
2018-01-04 09:45:44          0 se2/in1/year=2017/month=12/day=2/hour=18/
2018-01-04 10:21:41         44 se2/in1/year=2017/month=12/day=2/hour=18/cvlog_2017120218.csv
2018-01-04 09:43:52          0 se2/in1/year=2017/month=12/day=29/
2018-01-04 09:48:17          0 se2/in1/year=2017/month=12/day=29/hour=15/
2018-01-04 10:25:26         47 se2/in1/year=2017/month=12/day=29/hour=15/cvlog_2017122915.csv
2018-01-04 09:43:18          0 se2/in1/year=2017/month=12/day=9/
2018-01-04 09:46:24          0 se2/in1/year=2017/month=12/day=9/hour=4/
2018-01-04 10:22:02         48 se2/in1/year=2017/month=12/day=9/hour=4/cvlog_2017120904.csv

S3のディレクトリ構成

Glueジョブの入力データは”in1″ディレクトリ配下、出力は”out3″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE in1/
                           PRE out0/
                           PRE out1/
                           PRE out2/
                           PRE out3/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

入力データ用に新しくクローラーを作り実行してテーブルを作ります。

出来上がるテーブルの情報は以下です。

スクリーンショット 0030-01-04 10.52.04.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job4ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”パーティション分割されたcsvデータを同じパーティションで別の場所にparquetとして出力する”です。

se2_job4
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションをマッピング対象のカラムとして含めてくれません
入力のパーティションのカラムをマップの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

↓↓↓

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job4_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

###add
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out3/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
###add

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-04 15.54.04.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/
2018-01-04 15:15:41        926 part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet .
download: s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet to ./part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# ls
part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out3でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-04 15.59.33.png

S3の出力パスを入力
形式の違うデータが混在しているとテーブルが複数できてしまうので、不要なものがあれば、excludeで除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-04 16.00.06.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.19.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-04 16.00.28.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.36.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-04 16.00.49.png

クローラー実行

1つのテーブルとして認識している

スクリーンショット 0030-01-04 16.09.01.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-04 16.09.52.png

Athenaから確認

左メニューからse2_out3のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-04 16.11.18.png

件数も19件で合っている

スクリーンショット 0030-01-04 16.11.01.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばappidというカラムがあるので、アプリごとに集計をするようなケースが多いならappidも含めてパーティション分割する

他にもdeviceidとかでデバイスごとに集計したり
一時的な調査にも役立つかも

出力に”out4″ディレクトリ作成
①と同様のジョブをse2_job5で作成
PySparkに以下3点修正したジョブ作成

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションはカラムとして含めてくれません。
入力のパーティションのカラムをapplyの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['appid','year','month','day','hour']
output='s3://test-glue00/se2/out4/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

appidごとにパーティション別れてます

スクリーンショット 0030-01-04 16.30.35.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-04 16.33.32.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-04 16.33.49.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-04 16.34.10.png

件数も同じく19件

スクリーンショット 0030-01-04 16.34.39.png

その他

todo

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/latest/sql-programming-guide.html

続きを読む

Glueの使い方的な④(ブックマーク)

ブックマークを使って一度処理したデータは処理対象外とする

入力パスの先にデータがあっても既に処理済なデータは除外したい。そんな思いになったことありませんでしょうか?

これを実現する機能がGlueの”ブックマーク”です。

全体の流れ

  • 前提
  • ブックマークの効果を見る
  • ジョブの永続的なブックマーク有効化
  • トリガー側のブックマーク有効無効
  • (おまけの考察)

前提

Glueの使い方的な①(GUIでジョブ実行)“(以後①と書きます)、”Glueの使い方的な③(CLIでジョブ実行)“(以後③と書きます)あたりを読んでいただけるとスムーズです

今回扱うジョブは①と同じ内容です。
“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

③の後ろの方で書いたように、このジョブはGlueのGUIだけで作成し、1つのcsvを1つのparquetにするだけのなので、このジョブを2回実行すると同じ内容の出力が2つ出来てしまいます。

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

*①と同じディレクトリ
Glueジョブの入力データは”in0″ディレクトリ配下、出力は”out1″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

ブックマークの効果を見る

①や③でジョブを実行して出力ファイルがout0に既に存在しているのでそれを事前に消しておきます。

想定される流れと結果

1回目の実行で、①と同じくparquetの出力ファイルが1つできる。(あとメタデータ2つ)
こんな感じ

2018-01-02 10:44:04        782 _common_metadata
2018-01-02 10:44:04       1746 _metadata
2018-01-02 10:27:05       2077 part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet

ブックマークを有効化することで、2回目の実行で、既に処理した入力ファイルは処理されない為、新たなparquetファイルは出力されず1つのままになります

やってみる

ジョブは①で作ったのと同じ内容で新たにジョブをse2_job2という名前で作ります
ジョブの内容は以下です。
“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

最初のジョブの状態

実行履歴が1件ありますが気にしないでください。
スクリーンショット 0030-01-02 17.38.20.png

1回目実行

ブックマークを有効にして実行する
“Action”の”Run job”をクリックする

※ちなみにこの画面のActionの”Reset job bookmark”でブックマークに保持された状態をリセットできます

スクリーンショット 0030-01-02 17.39.21.png

Parameter(optional)の画面で”Job bookmark”を”Enable”にして”Run job”をクリックする
今回のみ有効なパラメータとしてジョブを実行します。

スクリーンショット 0030-01-02 17.39.06.png

成功して履歴が1つ増えています
スクリーンショット 0030-01-02 17.49.46.png

S3にparquetファイル1つとメタデータ2つがほぼ同じ時刻で作成されています。

スクリーンショット 0030-01-02 17.50.06.png

続いて同じ手順で2回目を実行します。

実行後のS3はメタデータは更新されている(?)
ただ、肝心のparquetファイルは1つのみで、1度処理した入力データを対象外としていることがわかる
※メタデータの内容は後半で確認してみます

スクリーンショット 0030-01-02 17.55.50.png

このようにGlueのブックマークを有効にしておくと、同じ場所にあるソースデータの中でまだ処理していないデータだけを処理対象とすることができます。

ジョブ側の永続的なブックマーク有効化

さっきまではジョブ実行時の1度だけのブックマーク有効でした
今回は永続的なブックマーク有効化をします。

対象ジョブにチェックを入れ、”Action”をクリックし、”Edit job”をクリックする

スクリーンショット 0030-01-02 17.53.30.png

この画面が出て一見ブックマークの設定箇所がなさそうだが、実は下にスクロールできる

スクリーンショット 0030-01-02 17.53.49.png

あった

スクリーンショット 0030-01-02 17.54.07.png

“Enable”に変更して”Save”
するとエラーが、、”temporary directory”と”IAM role”をこのタイミングでも入れないといけなようです。
それぞれ入力後”Save”をクリックします。

スクリーンショット 0030-01-02 17.54.20.png

画面の右端にあるように”Job bookmark”が”Enable”になっているのがわかります。

スクリーンショット 0030-01-02 18.11.49.png

トリガー側のブックマーク有効無効

ブックマークの有効無効確認

対象のトリガーのse2_trigger2の部分をクリック

スクリーンショット 0030-01-02 18.18.50.png

以下の画面になり、Parametersのところに”–job-bookmark-option:job-bookmark-enable”とあればブックマークが有効になっている

スクリーンショット 0030-01-02 18.19.04.png

ブックマーク有効無効変更

対象のトリガーse2_trigger2にチェックを入れ、”Action”をクリックし”Edit trigger”をクリックする

スクリーンショット 0030-01-02 18.16.45.png

“Next”をクリックする

スクリーンショット 0030-01-02 18.16.59.png

画面下の部分に”Job bookmark”がありDisable、Enable、Pauseの3つの状態が選べる
これを選びNextと進めればよさそうだが、それだけだとダメである

スクリーンショット 0030-01-02 18.17.16.png

画面下部にあるKeyとValueの箇所の”job-bookmark-enable”を事前に消しておく必要がある。右側の✖をクリックすることで消せます。消した後に”Job bookmark”をDisableにして”Next”をクリックし、次にサマリが出るので問題なければ”Finish”をクリックする

スクリーンショット 0030-01-02 18.17.39.png

確認すると”–job-bookmark-option:job-bookmark-disable”になっていて無効に変わったことがわかる

スクリーンショット 0030-01-02 18.33.18.png

(ブックマーク有効にしてジョブ実行してもメタデータが更新された件の考察)

結果的にはparquetの仕様っぽいので気にする必要もなさそうです。

メタを詳細に見るためにparquet-toolsを入れます。
このツール相変わらず普通にビルドできない・・

ここは弊社の担々麺デカの力を借りて無事入りました。(+1タンタンメン)
http://d.hatena.ne.jp/yohei-a/20170629/1498710035

とは言え改善してるんじゃと思い、ちょっと脱線しますが

【parquet-toolsインストール2018年版】

github
また(?)リポジトリ変わってます・・
https://github.com/apache/parquet-mr

最新のv1.8.0、v1.8.1はpom.xmlをゴニョゴニョしないとダメだ
Issueは多分こちら
https://issues.apache.org/jira/browse/PARQUET-1129

v1.7はなぜかなく、結果的にv1.6系最後(?)の1.6.0rc7だとすんなりビルドできました。

jdkインストール

yum -y install java-1.7.0-openjdk-devel

mavenインストール

wget http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
tar xvfz apache-maven-3.3.9-bin.tar.gz 
./apache-maven-3.3.9/bin/mvn -version

.bash_profileとかでパス通す

PATH=$PATH:$HOME/bin:/root/apache-maven-3.3.9/bin

parquet-toolsインストール

git clone https://github.com/apache/parquet-mr.git
cd parquet-mr
git checkout ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7
cd parquet-tools/
mvn clean package -Plocal 

実行

aws s3 cp s3://test-glue00/se2/out0/ .data/ --recursive

java -jar target/parquet-tools-1.6.0rc7.jar head -n 1 data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
deviceid = iphone
uuid = 11111
appid = 1
country = JP
year = 2017
month = 12
day = 14
hour = 12

parquet-toolsでメタ情報も見れます。
ブックマークを使ったのにメタデータは更新があったので、更新前と後のメタの情報を比較してみます。

メタ情報表示

# java -jar target/parquet-tools-1.6.0rc7.jar meta data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
file:        file:/root/parquet/parquet-mr/parquet-tools/data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"deviceid","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"long","nullable":true,"metadata":{}},{"name":"appid","type":"long","nullable":true,"metadata":{}},{"name":"country","type":"string","nullable":true,"metadata":{}},{"name":"year","type":"long","nullable":true,"metadata":{}},{"name":"month","type":"long","nullable":true,"metadata":{}},{"name":"day","type":"long","nullable":true,"metadata":{}},{"name":"hour","type":"long","nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
deviceid:    OPTIONAL BINARY O:UTF8 R:0 D:1
uuid:        OPTIONAL INT64 R:0 D:1
appid:       OPTIONAL INT64 R:0 D:1
country:     OPTIONAL BINARY O:UTF8 R:0 D:1
year:        OPTIONAL INT64 R:0 D:1
month:       OPTIONAL INT64 R:0 D:1
day:         OPTIONAL INT64 R:0 D:1
hour:        OPTIONAL INT64 R:0 D:1

row group 1: RC:19 TS:952 OFFSET:4 
--------------------------------------------------------------------------------
deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19 ENC:PLAIN,BIT_PACKED,RLE
appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE

更新前のメタ情報:/tmp/data1/meta.log
更新後のメタ情報:/tmp/data2/meta.log
diffは見切れちゃってますが、差分は以下の並びが違うということでした。内容は同じです。
ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
もう1回ジョブを実行した場合、上記の並びは同じでしたがタイムスタンプは最新になってました。
そういうもののようです;

# sdiff -s /tmp/data1/meta.log /tmp/data2/meta.log 
file:        file:/root/parquet/parquet-mr/parquet-tools/data | file:        file:/root/parquet/parquet-mr/parquet-tools/data
deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 E | deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 E
uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19 | uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19
appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19  | appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19 
country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19  | country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19 
year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 E | year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 E
month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 E | month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 E
day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19 | day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19
hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19 | hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19

その他

  • Glueで自動生成されるPySparkコードに以下のようなコンテキストオブジェクトがあります。これにはブックマークとしての意味もあり、それぞれの実行時にソース、変換、およびシンクの状態をブックマークの状態のキーとして使用します。状態はタイムスタンプとして記録し維持します。
    なのでブックマークを使用していない場合は、この変数を指定しなくても問題はありません。
    transformation_ctx = “datasource0”
    transformation_ctx = “applymapping1”
    transformation_ctx = “datasink4”

  • S3の結果整合性への対処
    ジョブ開始前に、以前のデータと不整合があるデータをジョブの対象とする(整合なデータは除外リストとして維持する)
    状態としてサイズも持っているということかもしれません。

  • 例えばあるファイルは処理対象か対象ではないのか?と言った詳細なブックマークの状態を見ることは現在はできません。

To Be Continue

todo

参考

Bookmarkの公式ページ
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-continuations.html

re:Invent資料。34ページあたりにBookMarkの細かい情報あり
https://www.slideshare.net/AmazonWebServices/abd315serverless-etl-with-aws-glue

parquet-toolsインストール
http://d.hatena.ne.jp/yohei-a/20170629/1498710035

parquet-tools
https://github.com/apache/parquet-mr

続きを読む

Glueの使い方的な③(CLIでジョブ作成)

CLIによる操作でGlueジョブを作る

Glueの使い方①(GUIでジョブ実行)“(以後①と書きます)で書いたように、現在GlueではGUIからジョブのコピーができないので、テスト時やデプロイ時などにもCLIでのジョブ操作が便利な場面があります

今回は①で実行したジョブをCLIで作成します

IAM role

ジョブ作成のコマンド発行するノードに付与するIAM roleもこの時に使ったtest-glueを使います。今回手元ではMacのPCでしたが本来だとジョブの作成や変更操作を行うスケジューラーなどに付与するIAM roleになると思います。
付与されるポリシーはこの2つ
・AmazonS3FullAccess
・AWSGlueServiceRole

全体の流れ

  • 前準備
  • CLIでジョブ作成
  • トリガー作成

前準備

①で使ったジョブで実行されているPySparkスクリプトを持ってきます

Glueのジョブで実行されるスクリプトはここにあります

AWSマネージメントコンソールからGlueをクリック、左側メニューのETLの”Jobs”をクリック
対象ジョブにチェックを入れ、”Action”をクリックし”Edit job”をクリック

スクリーンショット 0030-01-01 18.39.07.png

ジョブの内容が表示されます

Script pathに入力されているS3のパスが、このジョブで実行されるPySparkスクリプトの保存先です。
デフォルトだと以下の場所にスクリプトは保存されます。今回はデフォルトのままです。

s3://aws-glue-scripts-[AWSアカウントID]-[リージョン名]/[ユーザー名]/[ジョブ名]

スクリーンショット 0030-01-02 15.54.51.png

ローカルにもってくる

このファイルをローカルにダウンロードておきます。
ダウンロードしたPySparkスクリプトは前回GUIのみで操作して作られたスクリプトです。
処理内容は、”指定したS3にあるcsvファイルを指定したS3にparquetとして出力する”というものです

se2_job0.txt
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

CLIでジョブ作成

今回はAWS Cliを使います(他の各言語のSDKでも同じ操が作可能です)

まずawscliが古いとglueの操作ができないのでupgradeしておきましょう

pip install awscli --upgrade

Cliによるジョブ作成は、先程ダウンロードしたPySparkスクリプトファイルをリネームします。se2_job0.txtをse2_job2.pyにします。それをS3の任意の場所(今回はs3://test-glue00/se2/script/)にアップロードし、JSON形式でそこを指定してジョブ作成します

今回のジョブ作成に使ったJSON

test.json
{
    "Name": "se2_job2", 
    "Description": "test", 
    "Role": "test-glue", 
    "ExecutionProperty": {
        "MaxConcurrentRuns": 1
    }, 
    "Command": {
        "Name": "glueetl", 
        "ScriptLocation": "s3://test-glue00/se2/script/se2_job2.py"
    }, 
    "MaxRetries": 0, 
    "AllocatedCapacity": 5
}
  • Name
    ジョブ名

  • AllocatedCapacity
    “The number of AWS Glue data processing units (DPUs) to allocate to this Job.From 2 to 100 DPUs can be allocated; the default is 10.”
    とあるようにこのジョブに割り当てるDPUを指定します。2-100で指定。デフォは10
    https://docs.aws.amazon.com/glue/latest/webapi/API_CreateJob.html

  • ExecutionPropertyのMaxConcurrentRuns
    ジョブの最大同時実行数。デフォは1

  • ScriptLocation
    PySparkスクリプトファイルの保存場所

  • MaxRetries
    ジョブの最大リトライ数。デフォは0
    ※別途スケジューラーを使ってジョブを実行してるならリトライ制御はスケジューラー側に任せたほうがよいかも

  • CommandのName
    “glueetl”でなければなりません。固定のようです

スケルトン出力

他のCLIと同じく補助機能でJSONのスケルトンを作るコマンドもあります。

cli
$ aws glue create-job --generate-cli-skeleton 
{
    "Name": "", 
    "Description": "", 
    "LogUri": "", 
    "Role": "", 
    "ExecutionProperty": {
        "MaxConcurrentRuns": 0
    }, 
    "Command": {
        "Name": "", 
        "ScriptLocation": ""
    }, 
    "DefaultArguments": {
        "KeyName": ""
    }, 
    "Connections": {
        "Connections": [
            ""
        ]
    }, 
    "MaxRetries": 0, 
    "AllocatedCapacity": 0
}

実行結果

こんな感じで実行します
以下実行結果です

cli
$ aws glue create-job --cli-input-json file://test.json
{
    "Name": "se2_job2"
}

コマンドラインに他の引数があればJSONを上書きします。ファイルよりコマンドの引数の方が強いです。
ベースのJSONを作っておいて、DPUなどを状況に応じて大きくするとかいいと思います

引数では以下を指定できます

$ aws glue create-job help

CREATE-JOB()                                                      CREATE-JOB()



NAME
       create-job -

DESCRIPTION
       Creates a new job.

       See also: AWS API Documentation

SYNOPSIS
            create-job
          --name <value>
          [--description <value>]
          [--log-uri <value>]
          --role <value>
          [--execution-property <value>]
          --command <value>
          [--default-arguments <value>]
          [--connections <value>]
          [--max-retries <value>]
          [--allocated-capacity <value>]
          [--cli-input-json <value>]
          [--generate-cli-skeleton <value>]

GUIからもジョブが作成されていることがわかります。

スクリーンショット 0030-01-02 16.05.41.png

ジョブを実行

対象ジョブのse2_job2をチェックし、Actionの”Run job”をクリックします。
画面のように正常に完了しています。
スクリーンショット 0030-01-02 16.10.18.png

同じ結果がS3へ出力されています。
このようにコマンドであれば同じジョブを作りやすいので、DPUだけ変えるとか、テストのためにテストデータの入力や出力だけ変えて同じ処理を実行するなど、パラメータの一部だけ変更したジョブを作りやすいです。

ただ、画面からわかるように今回の出力のparquetファイル1つとその他メタデータが最新の日付で出力と更新がされていますが、コピー元ジョブで実行したparquetファイルが1つ残っています。

スクリーンショット 0030-01-02 16.12.04.png

コピー元ジョブの処理内容は

“S3にある1つのcsvファイルをparquetにしてS3に出力する”という処理でした。

Athenaでクエリ実行すると件数が倍の38件で、出力が重複していることがわかります。(元データは19件でした)

スクリーンショット 0030-01-02 16.17.43.png

ジョブフローを設計する時やスケジューラーの機能などで、処理したデータはムーブしたり消したりする場合もありますし、処理対象のディレクトリをタイムスタンプなどで判別して古いデータは処理対象から除外する場合もあると思います。どういったジョブフローにしているかに依存する部分かもですが、Glueにはこういった事象を防ぐブックマークという機能があります。ブックマーク機能を有効にすると既に処理したデータを処理の対象外とすることができます。
これはまたの機会で書けたらと思います。

トリガー作成

Glueには簡易的なスケジュール機能にTriggerがあります。
以下3つのスケジューリングができます。今回はCRON形式で作成してみます
選べるTrigger Typeは以下3つです
・CRON形式
・前のジョブが完了したら実行
・手動実行

スクリーンショット 0030-01-02 16.36.19.png

このトリガーの対象とするジョブを選びます。ジョブ名の横にある”Add”をクリックすることで選択できます。
“Next”をクリックし最後にサマリがでるので問題なければ”Finish”をクリックします。

スクリーンショット 0030-01-02 16.37.22.png

Triggerのコマンドライン操作もいくつか書いておきます。

先程作ったse2_trigger2トリガーの内容表示

get-trigger
$ aws glue get-trigger --name se2_trigger2
{
    "Trigger": {
        "Predicate": {}, 
        "Name": "se2_trigger2", 
        "Schedule": "cron(0 0 * * ? *)", 
        "Actions": [
            {
                "Arguments": {
                    "--job-bookmark-option": "job-bookmark-enable"
                }, 
                "JobName": "se2_job2"
            }
        ], 
        "State": "CREATED", 
        "Type": "SCHEDULED"
    }
}

トリガー作成用のjsonスケルトン表示

skelton
$ aws glue create-trigger --generate-cli-skeleton 
{
    "Name": "", 
    "Type": "SCHEDULED", 
    "Schedule": "", 
    "Predicate": {
        "Logical": "AND", 
        "Conditions": [
            {
                "LogicalOperator": "EQUALS", 
                "JobName": "", 
                "State": "STARTING"
            }
        ]
    }, 
    "Actions": [
        {
            "JobName": "", 
            "Arguments": {
                "KeyName": ""
            }
        }
    ], 
    "Description": ""
}

スケルトンを元に先程のトリガse2_trigger2と同じ内容で、名前だけse2_trigger3に変更したJSON作成

testtrigger2.json
$ cat testtrigger2.json 
{
    "Name": "se2_trigger3", 
    "Type": "SCHEDULED", 
    "Schedule": "cron(0 0 * * ? *)", 
    "Actions": [
        {
            "JobName": "se2_job2", 
            "Arguments": {
                "--job-bookmark-option": "job-bookmark-enable"
            }
        }
    ], 
    "Description": ""
}

se2_trigger3を作成

$ aws glue create-trigger --cli-input-json file://testtrigger2.json
{
    "Name": "se2_trigger3"
}

トリガー更新
以下でcronの時間を0時から2時に変更してます。

$ cat testtrigger2_upd.json 
{
    "Name": "se2_trigger3", 
    "TriggerUpdate": {
        "Name": "se2_trigger3", 
        "Description": "", 
        "Schedule": "cron(0 2 * * ? *)", 
        "Actions": [
            {
                "JobName": "se2_job2", 
                "Arguments": {
                    "--job-bookmark-option": "job-bookmark-enable"
                }
            }
        ] 
    }
}
$ aws glue update-trigger --cli-input-json file://testtrigger2_upd.json
{
    "Trigger": {
        "Predicate": {}, 
        "Name": "se2_trigger3", 
        "Schedule": "cron(0 2 * * ? *)", 
        "Actions": [
            {
                "Arguments": {
                    "--job-bookmark-option": "job-bookmark-enable"
                }, 
                "JobName": "se2_job2"
            }
        ], 
        "State": "CREATED", 
        "Type": "SCHEDULED"
    }
}

その他

CloudFormationも対応しているのでそちらでもCLI操作や自動化が可能かと思います。

ジョブのデプロイ時、より慎重にやるなら、出力先を一時的な別のS3や一時的なDBに変更したジョブを作り、既存と並行稼働させるのもいいかと思います。

ジョブフローの形成は、どのようなデプロイツールを使っているか、どのようなジョブスケジューラーを使っているかにもよるものです。

To Be Continue

Triggerの設定時やStartJobRunコマンドの”–job-bookmark-option”でブックマークを有効無効にできます。
ブックマーク機能について今後書いていければと思います。

Glueのトリガーで設定できるスケジュールは見ての通り現在は簡易的なものです。
AWS Step Functionsで少し複雑なジョブフロー作成について今後書いていければと思います。

参考

AWS CLI
http://docs.aws.amazon.com/cli/latest/reference/glue/create-job.html
http://docs.aws.amazon.com/glue/latest/webapi/API_CreateJob.html

PythonのAPI
http://boto3.readthedocs.io/en/latest/reference/services/glue.html#Glue.Client.create_job

続きを読む

Glueの使い方的な②(csvデータをパーティション分割したparquetに変換)

パーティション分割するジョブを作る

ジョブの内容

※”Glueの使い方的な①(GUIでジョブ実行)“(以後①とだけ書きます)と同様のcsvデータを使います

“csvデータのタイムスタンプのカラムごとにパーティション分割してparquetで出力する”

ジョブ名

se2_job1

クローラー名

se2_in0
se2_out1

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

Glueジョブの入力データは”in0″ディレクトリ配下、出力は”out1″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

※①で作ったものを使います。

テーブルの情報は以下です。

スクリーンショット 0030-01-03 15.45.21.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job1ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”です。

se2_job1
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job1_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

### Add(start)
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
#dropnullfields3 = DynamicFrame.fromDF(df, glueContext, "dropnullfields3")
### Add(end)

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
###
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-03 16.25.57.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/
2018-01-03 16:25:49        926 part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet . 
download: s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet to ./part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head  part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out1でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-03 16.37.26.png

S3の出力パスを入力

スクリーンショット 0030-01-03 16.37.50.png

そのまま”Next”をクリック

スクリーンショット 0030-01-03 16.38.46.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-03 16.38.55.png

そのまま”Next”をクリック

スクリーンショット 0030-01-03 16.39.03.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-03 16.39.16.png

クローラー実行

形式の違うデータが混在しているとテーブルが複数できてしまう

スクリーンショット 0030-01-03 16.41.46.png

クローラーを作り直し、”Add a data store”の”Exclude patterns”の箇所で
不要なものがあれば除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-05 20.18.38.png

クローラー実行後
1つのテーブルとして認識される

スクリーンショット 0030-01-03 16.48.52.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-03 16.49.09.png

Athenaから確認

左メニューからse2_out1のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-05 20.39.54.png

件数も19件で合っている

スクリーンショット 0030-01-03 16.51.35.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばcountryというカラムがあるので、国ごとに集計をするようなケースが多いならcountryも含めてパーティション分割する

他にもappidとかでアプリごとに集計したり
一時的な調査にも役立つかも

出力に”out2″ディレクトリ作成
①と同様のジョブをse2_job3で作成
作成したジョブのPySparkスクリプトに以下2点修正

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['country','year','month','day','hour']
output='s3://test-glue00/se2/out2/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

countryごとにパーティション別れてます

スクリーンショット 0030-01-03 17.33.24.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-03 17.36.02.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-03 17.37.19.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-03 17.39.12.png

件数も同じく19件

スクリーンショット 0030-01-03 17.38.57.png

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/preview/api/python/pyspark.sql.html
https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

続きを読む

Glueの使い方的な①(GUIでジョブ実行)

GUIによる操作でGlueジョブを作って実行する

ジョブの内容

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

ジョブ名

se2_job0

クローラー名

se2_in0
se2_out0

全体の流れ

  • 前準備
  • クローラー作成と実行、Athenaで確認
  • ジョブの作成と実行、Athenaで確認
  • 出来上がったPySparkスクリプト確認

前準備

ジョブで使うIAM role

以下のポリシーを付与した任意の名前のロールを作っておく。今回はtest-glueという名前にした。
・AmazonS3FullAccess
・AWSGlueServiceRole
※権限は必要に応じてより厳しくしてください。今回は検証のため緩めにしてあります。

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

S3に配置

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

ディレクトリ構成

in0に入力ファイル、out0に出力ファイル

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE script/
                           PRE tmp/

クローラー作成と実行、Athenaで確認

ジョブ作成の前に、こちらもGlueの重要な機能の1つであるData CatalogのCrawlerを作成して動かしていきます。
クローラはデータをスキャン、分類、スキーマ情報を自動認識し、そのメタデータをデータカタログに保存する機能です。これを使って入力となるサンプルデータのスキーマを事前に抽出してデータカタログにメタデータとして保存します。Athena、EMR、RedshiftからもこのGlueのデータカタログを使えます。Hive互換のメタストアです(Hiveメタストアのマネージドサービスと思ってもらえればいいと思います)。

基本的な操作はGUIを使って行えます。

AWSマネージメントコンソールから、Glueをクリックし、画面左側メニューの”Crawlers”をクリックし、”Add crawler”をクリック
スクリーンショット 0029-12-28 13.24.04.png

クローラーの名前入力
スクリーンショット 0029-12-28 11.19.32.png

S3にあるソースデータのパス入力(今回はS3に配置してあるデータが対象)
スクリーンショット 0030-01-02 15.17.11.png

そのまま”Next”
スクリーンショット 0029-12-28 9.38.41.png

“Choose an existing IAM role”にチェックを入れ、IAM roleをプルダウンからtest-glueを選択する
スクリーンショット 0030-01-02 15.17.49.png

“Run on demand”にチェックを入れ”Next”(今回のクローラーはスケジュールせずに手動実行とする)
スクリーンショット 0029-12-28 9.39.03.png

スキーマ情報を保存するDatabaseを選択、既存のものがあればそれでもいいし、なければ下の”Add database”でdatabase作成しそれを選択
Prefixは作成されるテーブル名の先頭に付くもの。見分け分類しやすいものにしておくと良いと思います(現状テーブルにtagとかつけられないので)
スクリーンショット 0029-12-28 9.39.23.png

クローラー実行

対象のクローラーにチェックを入れ、”Run Crawler”をクリック
スクリーンショット 0030-01-01 17.02.58.png

テーブルが出来上がる

“se2_”のPrefixが付いた”se2_in0″のテーブルができています。”in0″は指定した”Include Path”の一番下のディレクトリ名です。指定した”Include Path”(ソースデータがあるS3のパス、画像の”Location”の部分に表示されている)の配下のディレクトリは自動でパーティションとして認識されます。今回は配下にディレクトリはありませんのでパーティションも作成されません。
スクリーンショット 0030-01-02 15.21.15.png

テーブルの内容を確認するとスキーマが自動で作成されています
実データ配置場所のLocationがs3://test-glue00/se2/in0
Table名のNameが、prefixのse2_とInclude Pathで指定したs3://test-glue00/se2/in0の一番下のディレクトリのin0でse2_in0

Schemaを見るとuuidやappidなどがbigintで数値型になってます、文字列型がよければここでも修正できます。
今回は一旦このまま進めます
※本来はClassifierでいい感じにしたほうがいいと思う

スクリーンショット 0030-01-02 15.23.51.png

AthenaからもGlueのData Catalog使えます
Athenaから同様のテーブルとスキーマの内容が確認できます。Athenaがスキーマ情報にGlueのデータカタログを使ってることがわかります。画面右上にもGlue Data Catalogへのショートカットもありますね。
スクリーンショット 0030-01-02 15.27.07.png

もちろんクエリ実行もできます。
スクリーンショット 0030-01-02 15.29.31.png

ジョブの作成と実行、Athenaで確認

今回のようなジョブであれば、基本は画面ポチポチするだけです

Glueのメニューから”ETL”の”Job”をクリックし、”Add job”をクリック
スクリーンショット 0030-01-01 17.36.49.png

“Name”にジョブ名を入れ、”IAM role”はクローラーでも使ったロールを指定、”Temporary directory”は任意の場所で構いません。
スクリーンショット 0030-01-02 15.31.34.png

ソースデータとなるテーブルを選択。(先程作成したテーブルをクリック)
スクリーンショット 0030-01-02 15.33.35.png

ターゲットとなるテーブルは作成していないのでここで作ります。
今回は、保存先のData storeは”S3″、出力ファイルフォーマットのFormatは”Parquet”、出力先のパスのTargetPathは”任意の場所”を指定
※圧縮も選べます
スクリーンショット 0030-01-02 15.34.41.png

ソースデータとターゲットデータのマッピング変換ができます。いらないカラムを出力から除外したり、カラムの順序を変えたり、”Data Type”をstring,long,intなどに変えたり、新しいカラムを追加してそこに出力させる(カラム名を変える時に使えそうです)などができます
スクリーンショット 0029-12-28 10.08.35.png

次にサマリがでますので問題なければ”Finish”をクリック

ジョブの実行

作成したジョブにチェックを入れ、”Action”から”Run job”をクリック
スクリーンショット 0029-12-28 10.27.38.png

数分待って以下のように”Run status”が”Succeeded”となれば問題なく完了しています。
※問題があれば、”Error”の箇所にエラーの概要、”Logs”、”Error logs”の箇所の出力がリンクになっていてクリックするとCloudWatch logsに移動します。GlueのログはデフォルトでCloudWatch logsに出力されます
スクリーンショット 0030-01-02 15.46.47.png

出力したParquetフォーマットのファイルを、ソースデータと同様にクローラーを使ってスキーマを作り、スキーマオンリードでAthenaクエリを実行してみます。クローラー作成手順は前回と同様なので割愛します。
クローラーにより自動作成されたスキーマ
スクリーンショット 0030-01-02 15.49.41.png

クエリ結果です。データ量が少なくselectしてるだけなのでアレですが、parquetになったので列単位での集計処理などが高速化されるデータフォーマットに変換ができました。
スクリーンショット 0030-01-02 15.51.00.png

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”くらいであればGlueはGUIだけでサーバーレスでできます。

出来上がったPySparkスクリプト確認

se2_job0.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ビルトイン変換のいくつか補足

ApplyMapping:ソースの列とデータ型をターゲットの列とデータ型にマップします

ResolveChoice:複数の型の値が含まれている場合の列の処理方法を指定します。列を単一のデータ型にキャストするか、1つ以上の型を破棄するか、またはすべての型を別々の列または構造体に保持するかを選択できます
make_structは構造体を使用してデータを表現することにより、潜在的なあいまいさを解決します。たとえば、列のデータがintまたはstringの場合、make_structアクションを使用すると、生成されたDynamicFrameにintとstringの両方を含む構造体の列が生成されます。
choiceはspecが空の場合のデフォルトのresolution actionです

DropNullFields:nullフィールドを削除します

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

ジョブを作る際に似たようなジョブを作りたい、テスト段階でジョブのパラメータを一部だけ変えたジョブを作りたいことあると思います。現在はジョブのコピーがGUIからはできないのでそのあたりの運用を考慮する場合はCLIの利用がおすすめです。またあとで書きます。

こちらも是非

似た内容ですがより丁寧にかかれています。安定のクラメソブログ
https://dev.classmethod.jp/cloud/aws/aws-glue-released/
https://dev.classmethod.jp/cloud/aws/aws-glue-tutorial/

Built-In Transforms
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/built-in-transforms.html
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-ApplyMapping.html

続きを読む

AthenaのAPI操作をIAMで制御

Athena のAPIは今のところこれ

pioho $ aws athena help
:
AVAILABLE COMMANDS
       o batch-get-named-query

       o batch-get-query-execution

       o create-named-query

       o delete-named-query

       o get-named-query

       o get-query-execution

       o get-query-results

       o help

       o list-named-queries

       o list-query-executions

       o start-query-execution

       o stop-query-execution

クエリ実行して、クエリの状態見て、結果を得る

start-query-execution

※ちなみにOutputLocationでクエリ結果がS3に出力されますが、これ必須です。つけないとエラーです。今のところそういった仕様です。

pioho $ aws athena start-query-execution --query-string "select * from sampledb.elb_logs limit 10" --result-configuration OutputLocation=s3://aws-athena-query-results
{
    "QueryExecutionId": "0ec3a09e-144b-4a6c-b2f4-a2fb790e4141"
}

get-query-execution

pioho $ aws athena get-query-execution --query-execution-id 0ec3a09e-144b-4a6c-b2f4-a2fb790e4141
{
    "QueryExecution": {
        "Status": {
            "SubmissionDateTime": 1499483900.008, 
            "State": "SUCCEEDED", 
            "CompletionDateTime": 1499483901.083
        }, 
        "Query": "select * from sampledb.elb_logs limit 10", 
        "Statistics": {
            "DataScannedInBytes": 282613, 
            "EngineExecutionTimeInMillis": 956
        }, 
        "ResultConfiguration": {
            "OutputLocation": "s3://aws-athena-query-results/0ec3a09e-144b-4a6c-b2f4-a2fb790e4141.csv"
        }, 
        "QueryExecutionId": "0ec3a09e-144b-4a6c-b2f4-a2fb790e4141"
    }
}

get-query-results

pioho $ aws athena get-query-results --query-execution-id 0ec3a09e-144b-4a6c-b2f4-a2fb790e4141
{
    "ResultSet": {
        "Rows": [
            {
                "Data": [
                    {
                        "VarCharValue": "request_timestamp"
                    }, 
                    {
                        "VarCharValue": "elb_name"
                    }, 
                    {
                        "VarCharValue": "request_ip"
                    }, 
                    {
                        "VarCharValue": "request_port"
                    }, 
                    {
                        "Var
:
:
:

getresultを拒否する

IAM ポリシーをDenyで作成し適用(画像は許可だど..)

スクリーンショット 0029-07-08 11.35.45.png

Deny get-query-results

pioho $ aws athena get-query-results --query-execution-id 0ec3a09e-144b-4a6c-b2f4-a2fb790e4141
An error occurred (AccessDeniedException) when calling the GetQueryResults operation: User: arn:aws:iam::xxxxxxx:user/piko is not authorized to perform: athena:GetQueryResults

AWS Organization

こちらでも制御できるのでAthenaだけ実行させたいアカウントとかもいいかもね

スクリーンショット 0029-07-08 13.18.29.png

あとは

リソースをもう少し細かく制御できるといいなぁと

続きを読む