クラウドに乗せないほうがいいシステム、その見極め方

AWSやMicrosoft Azureなどのクラウドベンダーは、金融機関のシステム管理の指針であるFISC(金融情報システムセンター)の「金融機関等コンピュータシステムの安全対策基準・解説書」や、医薬品や医療機器向けの指針「コンピュータ化システム適正管理ガイドライン(CSV)」への対応リファレンスなどを用意し始めている … 続きを読む

AWS S3勉強まとめ

ブロックストレージ
EBS, インスタンスストア
→EC2にマウントして活用
→Block番号で管理

オブジェクトストレージ
S3, Glacier
→安価かつ高い耐久性を持つオンラインストレージ
→オブジェクト、それに付随するメタデータ、そのオブジェクトにアクセスするためのユニークなIDで構成

ファイルストレージ
EFS
→EC2から同時マウントできる共有ストレージサービス
→ファイルシステム

・S3特徴
→容量無制限、安価なストレージ(1GB3円)、データ容量に依存しない性能(RAIDやサーバー台数を考える必要なし)

・S3用途
①コンテンツ配信、保管サーバ
②ログ&データハブストレージ
③バックアップやDR

バケット
オブジェクトの保存場所。デフォルト100個/1アカウントまで作成可能。名前はグローバルでユニークな必要あり。
オブジェクト
データ本体。URLが付与される
キー
オブジェクトの格納URL
メタデータ
オブジェクトに付随する属性情報。システム定義メタデータ、ユーザ定義メタデータあり
リージョン
バケットを配置するAWSのロケーション
アクセスコントロールリスト(ACL)
バケットやオブジェクトのアクセス管理

・ストレージクラス
スタンダード
標準低頻度アクセスストレージ:スタンダードに比べて安価だが、データの読出し容量に対して課金
Glacier:最も低コスト。データの取り出しにコストと時間
低冗長化ストレージ:Glacierから取り出したデータの置き場所として利用

結果整合性(Eventual Consistency Readモデル)
「更新はそのうち全体に反映される」
読み取り一貫性
– あるトランザクションがデータを変更中のとき、ほかのトランザクションからは変更される前のデータを参照します。
– ほかのトランザクションからは変更前の確定されたデータを参照します。
– あるユーザーAが値をUPDATEしたとき、ユーザーBがそのデータを参照すると、戻ってくる値はUPDATE前の値となります。
– あるトランザクションで変更した確定前のデータをほかのトランザクションから参照することはできません。

・パソコンのファイルシステムやデータベースと同じようにロックやトランザクション処理は行われない
参考URL:https://dev.classmethod.jp/cloud/amazon-s3-eventually-consistent-and-consistent-read/

・アクセス管理
①ユーザポリシー
→IAMuserに対して権限設定
②バケットポリシー
→バケットごとに権限設定。クロスアカウントで使用する際など
③ACL
→バケット、オブジェクトごとに指定可能(オブジェクトACLが優先)

署名付きURL
AWS SDKで作成。S3のプライベートなオブジェクトに対して一定時間アクセスを許可

・Webサイトホスティング機能
静的なWebサイトをS3のみでホスティング可能
– バケット単位で指定
– 独自ドメインの設定→ドメイン名をバケット名として指定
– リダイレクト機能→任意のドメインにリダイレクト設定が可能
CloudFrontとの経由で配信することを推奨。バケットポリシーでHTTP/HTTPSリクエストのみを許可可能

VPCエンドポイント
プライベートサブネットからNATゲートウェイなどを経由せずに直接S3とセキュアに通信可能
同一リージョンのみ

S3 support for IPv6
追加費用なし
静的ウェブホスティングは使用不可

・暗号化
– サーバーサイド暗号化(サーバリソースを利用して格納データの暗号化)
– クライアントサイド暗号化(クライアント側で暗号化したデータをS3にアップロード)

クロスリージョンレプリケーション
異なるリージョン間のS3バケットオブジェクトのレプリケーションを実施
→オブジェクトに対する動作を非同期でレプリケーション
→対象元バケットはバージョニングの機能を有効にする必要あり
※リージョン間データ転送費用が発生

バージョン管理機能
誤操作による削除対策に有効
バケットに対して設定
任意のオブジェクトを参照可能
バージョニングのオブジェクト分も課金。保存期間も指定可能

ライプサイクル管理
バケット内のオブジェクトに対して、ストレージクラスの変更や、削除処理の自動化
データ登録→Standard保存(一定期間過ぎたら削除)→Standard-IA移動(一定期間過ぎたら削除)→Glacierにアーカイブ(一定期間過ぎたら削除)

・アーカイブ
S3上のデータを削除でGlacier側のデータも削除
S3には8KBのオブジェクト名とメタデータのみ保管

・復元
オブジェクトごと
一時的にS3の低冗長化ストレージに指定日数複製(Glacierと低冗長化ストレージ両方課金)
復元にかかる時間の選択肢は3つ
①Expedited:緊急のアクセス
②Standard:3-5時間。標準的
③Bulk:大量のデータ。5-12時間
それぞれによってコストが異なる

・オブジェクト移動
Standard⇔Standard-IA→Glacier
→Glacier

S3分析
Standard-IAとGlacierどちらにいつ移動すればいいだろうかという疑問に答える可視化ツール
→ライフサイクルポリシーの設定値の参考になる

S3インベントリ
S3のオブジェクトのリストを一気にcsvファイルで取得
スケジュールかも可能

・イベント通知
SNS:メール送信
SQS:キューメッセージの登録
Lambda:ファンクションの実行

・CloudWatchによる監視
ストレージメトリクス:バケット単位。1日単位でのレポート。追加費用なし
リクエストメトリクス:オブジェクト単位。通常のCloudWatch料金

CloudTrailによるAPI(操作ログ。Get, Delete, Putなど)管理
S3への操作ログを収集
監査対象とは別のS3バケットの用意推奨

Logging
バケットに対するアクセスログの出力設定可能

Tag管理
バケット/オブジェクトに対してタグの指定可能

・パフォーマンスの最適化
大きなサイズのファイルをアップロード、ダウンロード
RANGE GETを活用。マルチパートアップロード機能
大量のGETリクエストが発生する場合はCloudFrontを併用することを推奨

Transfer Acceleration(高速ファイル転送サービス)
AWSのエッジネットワークから最適化されたAWSのネットワークを経由する。
S3のデータ転送コストとは別に加算
※通常の転送より高速でない場合は、課金されない

コンテンツ配信サーバ
データをS3に配置、CloudFrontでキャッシュさせる
CloudFrontで静的コンテンツ配信。CloudFrontの料金はかからない
Webサーバーで動的コンテンツは処理

ログ&データハブストレージ
オンプレ:Direct Connectでログデータ収集
外部データソース;Kinesisで収集
AWS;S3に保管。Glacierにアーカイブ
分析:Redshift, EMR, Atenaなど

バックアップ、DR
クロスリージョンでデータの複製を保持
リージョン内でもDR設定

参考URL:https://www.slideshare.net/AmazonWebServicesJapan/aws-black-belt-online-seminar-2017-amazon-s3

続きを読む

AWS GlueでネストされたJSONファイルをCSVファイルやParquetに変換する | Developers.IO

AWS GlueのRelationalizeというTransformを利用して、ネストされたJSONをCSVファイルやParquetに変換する方法をご紹介します。CSV形式に変換することでリレーショナルデータベースに簡単にインポートできます。また、Parquetフォーマットに変換することでAthena、Redshift Spectrum、EMRからより高速にクエリできる … 続きを読む

Glueの使い方的な⑦(Step Functionsでジョブフロー)

Step FunctionsでGlueのジョブフローを作る

Glueの使い方的な③(CLIでジョブ作成)“(以後③と書きます)で書いたように、現在Glueのジョブスケジュール機能は簡易的なものなので、複雑なジョブフロー形成には別のスケジューラーが必要になる場合もあります。
例えばGlueのクローラーとGlueジョブもそれぞれにスケジュール機能があり統合したジョブフローを作ることがGlueだけでは出来ません(例えばクローラーを実行し終わったらジョブを実行するとか)。今回はサーバーレスなジョブフローのサービスであるStep Functionsを使って、クローラーを実行し正常終了したら後続のジョブを実行するというフローを作ってみます。

全体の流れ

  • Glue処理内容
  • StepFunctionsの処理内容
  • 前準備
  • Step FunctionsでStateMachine作成
  • 実行

処理内容

Glueの使い方的な①(GUIでジョブ実行)“(以後①と書きます)で実行したものと同じクローラーとジョブを使います。入力データも出力結果も①と同じです。
今回行うのはGlueクローラー処理が終わったら次のGlueジョブ処理開始というジョブフロー形成です。

あらためて①のクローラーとジョブの処理内容は以下の通りです

クローラーの内容

入力のCSVファイルからスキーマを作成します

ジョブの内容

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

Step Functionsを使ったジョブフローの内容

図の四角をStep Functionsでは”State”と呼びます。処理の1単位と思ってください。

ジョブフローは以下のような形です。

Stateごとに流れを説明します

  • “Submit Crawler Job”でLambdaを使いGlueクローラーを実行
  • “Wait X Seconds”で指定時間待つ
  • “Get Crawler Job Status”でLambdaを使いGlueクローラーの状態をポーリングして確認
  • “Job Complete?”で状態を判定して結果によって3つに処理が分岐
    • 失敗なら”Job Failed”エラー処理
    • 終了なら”Run Final Glue Job”でLambdaを使い後続のGlueジョブを実行
    • 処理中なら”Add Count”でLambdaを使いカウンタをインクリメント。
      • “Add Count”の後”Chk Count”でカウンタをチェックし3回以上になっていたら”Job Failed Timeout”でタイムアウト処理、3未満なら”Wait X Seconds”に戻りループ処理

スクリーンショット 0030-01-13 21.47.05.png

前準備

①と同じです

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

入力ファイルをS3に配置

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

ディレクトリ構成

in0に入力ファイル、out0に出力ファイル

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE script/
                           PRE tmp/

ジョブのPySparkスクリプト

se2_job0.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

入力のCSVデータのスキーマ

クローラーによって作成されるスキーマ

スクリーンショット 0030-01-13 22.01.43.png

StepFunctionsでStateMachine作成

StepFunctionsは一連のジョブフローをJSONで定義しこれを”StateMachine”と呼びます。
StateMachine内の処理の1つ1つの四角をStateと呼びます。処理の1単位です。
このJSONの記述はASL(AmazonStatesLanguages)と呼ばれStateTypeとしてChoice(分岐処理)やWait(待ち)やParallel(並列実行)などがJSONだけで表現出来ます。またTaskというStateTypeからはLambdaやアクティビティ(EC2からStepFunctionsをポーリングする)を定義できます。前述の通り今回はLmabdaを使います。

マネージメントコンソールからいくつかあるテンプレートを元に作ることも出来ますが、カスタムでJSONを一から作ることもできます。

新規StateMachine作成画面
“Author from scrach”で一からJSON作成

スクリーンショット 0030-01-14 10.24.59.png

“Template”を選ぶとASLのStateパターンのいくつかのテンプレが選べます

スクリーンショット 0030-01-14 10.28.14.png

左側の”コード”部分にJSONを書き、右側の”ビジュアルワークフロー”の部分にJSONコードで書いたフローがビジュアライズされます

スクリーンショット 0030-01-14 10.29.50.png

StateMachine

今回のStateMachieのJSONは以下です。
内容は前述の通りです。
※[AWSID]のところは自身のAWSIDと置き換えてください

{
  "Comment": "A state machine that submits a Job to Glue Batch and monitors the Job until it completes.",
  "StartAt": "Submit Crawler Job",
  "States": {
    "Submit Crawler Job": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-cr1",
      "ResultPath": "$.chkcount",
      "Next": "Wait X Seconds",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 120,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
    "Wait X Seconds": {
      "Type": "Wait",
      "SecondsPath": "$.wait_time",
      "Next": "Get Crawler Job Status"
    },
    "Get Crawler Job Status": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-crcheck",
      "Next": "Job Complete?",
      "InputPath": "$",
      "ResultPath": "$.response",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
      "Job Complete?": {
      "Type": "Choice",
      "Choices": [{
          "Variable": "$.response",
          "StringEquals": "FAILED",
          "Next": "Job Failed"
        },
        {
          "Variable": "$.response",
          "StringEquals": "READY",
          "Next": "Run Final Glue Job"
        }
      ],
      "Default": "Add Count"
        },
    "Add Count": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-addcount",
      "Next": "Chk Count",
      "InputPath": "$",
      "ResultPath": "$.chkcount",
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    },
      "Chk Count": {
      "Type": "Choice",
      "Choices": [{
          "Variable": "$.chkcount",
          "NumericGreaterThan": 3,
          "Next": "Job Failed Timeout"
        }],
      "Default": "Wait X Seconds"
    },
    "Job Failed": {
      "Type": "Fail",
      "Cause": "Glue Crawler Job Failed",
      "Error": "DescribeJob returned FAILED"
    },
        "Job Failed Timeout": {
      "Type": "Fail",
      "Cause": "Glue Crawler Job Failed",
      "Error": "DescribeJob returned FAILED Because of Timeout"
    },
    "Run Final Glue Job": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:[AWSID]:function:glue-test1-job1",
      "End": true,
      "Retry": [
        {
          "ErrorEquals": ["States.ALL"],
          "IntervalSeconds": 1,
          "MaxAttempts": 3,
          "BackoffRate": 2.0
        }
      ]
    }
  }
}

Lambda

今回使うLambdaは4つです。流れも振り返りながら見ていきます
書き方はいろいろあるし今回はエラーハンドリングも甘いのであくまでも動きのイメージをつかむための参考程度にしてください。最後のGlueジョブの実行についてはジョブの終了判定とかはしてないです。

“Submit Crawler Job”

GlueのAPIを使ってクローラーのStartを行う

glue-test1-cr1
# coding: UTF-8

import sys
import boto3
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.start_crawler(Name='se2_in0')
    return 1

“Wait X Seconds”

Waitで指定秒数待つ

“Get Crawler Job Status”

GlueのAPIを使ってクローラーのステータスを取得します

glue-test1-crcheck
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.get_crawler(Name='se2_in0')
    response = response['Crawler']['State']
    return response

“Job Complete?”

Choiceで取得したステータスが、”READY”なら正常終了、”FAILED”なら失敗、それ以外は実行中の分岐処理

“Job Failed”

ステータスが失敗なら
FailでStepFunctionsをエラーさせます

“Run Final Glue Job”

ステータスが正常終了なら
GlueのAPIを使ってジョブをStartします

glue-test1-job1
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    client = boto3.client('glue')
    response = client.start_job_run(
    JobName='se2_job0')
    return response['JobRunId']

“Add Count”

クローラーがまだ実行中なら
カウンタにインクリメントします

glue-test1-addcount
# coding: UTF-8

import sys
import boto3
import json
glue = boto3.client('glue')

def lambda_handler(event, context):
    chkcount = event["chkcount"]
    chkcount = chkcount + 1

    return chkcount

“Chk Count”

choiceでカウンタが3未満か3以上かをチェックします

“Job Failed Timeout”

Failでカウンタが3以上だった時のエラー処理

“Wait X Seconds”

3未満の場合はここに戻りループ処理

実行

Step Functionsを実行

作成したStateMachineを選び”新しい実行”をクリック

スクリーンショット 0030-01-14 10.54.54.png

JSONに引数を入れて”実行の開始”をクリック
今回はJSON内で使う変数で”wait_time”を60秒で待ちの時間として入力しています

スクリーンショット 0030-01-14 10.55.52.png

実行状況

スクリーンショット 0030-01-14 10.59.44.png

CloudWatchイベントでスケジュール

あとは上記で作成したStateMachineをCloudWatchイベントでCRON指定すれば定期的実行されるジョブフローの完成です。This is Serverless!

スクリーンショット 0030-01-13 22.34.00.png

その他

今回はクローラー実行後にジョブ実行というシンプルなフローでしたが、Step Functionsは並列度を替えたり引数の受け渡しをしたり、さらにLambdaでロジックを書くことができるので自由度高く複雑なフローの作成が行えます。Glueとの相性はいいのではないでしょうか?

JSON部分も30分もあれば学習完了というカジュアルさがありLambdaを使ってAPI操作で様々なAWSの処理を繋げるのにはとてもいい印象です。

かなりシンプルな処理だったのですがコードがやや多い印象で、より複雑な処理になると結構大きいJSONになりそうで、JSONなのでコメント書けないとか少し大変な部分が出て来るのかもしれません。

バージョン管理を考えるとCliでの処理で運用したほうが良さそうですが、こういったサービスはGUIでの良さもあるのでどちらに比重を置いた運用がいいかは考慮が必要かもです

本文中で使ったカウンタのステート情報はDynamoDBなどに入れた方が良いかもです。

マイクロサービス化しやすいので、極力本来の処理のロジックをLambda側にやらせてそれ以外のフロー処理(分岐とかカウンタインクリメントとか)をJSONで書くのがいいと思います。今回カウンタはLambdaでやってしまいましたが。

ログはCloudWatchLogsに出ます

To Be Continue

TODO

参考

StepFunctions BlackBelt資料
https://www.slideshare.net/AmazonWebServicesJapan/20170726-black-beltstepfunctions-78267693

続きを読む

sagemakerで簡易電力予測ページを作ってみた

前回エンドポイント作るまではできたけどそこからどうやってそのエンドポイントを使ったらいいかわからず1日うーーーんってやってたらできたので記事にしてみました。

できたもの

FireShot Capture 36 - 消費電力予測 - http___localhost_8000_.png
作ったものはすごくシンプルでこのフォームに好きな数字を入れて送信ボタンを押下したら

電力予測結果.png
こんな感じで予測消費電力が返って来るっていうもの

ハマったとこ

このページ自体はflaskを使っていて
フォーム作ったりフォームの内容をPOSTして〜ってところは
色々調べていたらできたのですがハマったのは
どうやったら予測消費電力が返って来るか、というところ
読解力が乏しいので色んな記事やら読んでも理解できず……
AWSのブログにlambdaにこうやったら使えるよっていうのが
書いてあったのでそれを使ってみることに…

import boto3

sagemaker = boto3.client('sagemaker-runtime')
result = sagemaker.invoke_endpoint(
      EndpointName='test',
      Body=pre_data
    )

でもデータの形をどうすればいいかわからず(モデル作成はcsvファイルでやってました)
配列を渡したり適当にオブジェクトにしたりcsvに変換してs3に落としてみたり….色々やっていたのですがずっとエラーが出っ放しでした。
特に出てたエラーが

Received client error (400) from model with message "unable to evaluate payload provided". See https://us-east-1.console.aws.amazon.com/cloudwatch/~~~ (Service: AmazonSageMakerRuntime; Status Code: 424; Error Code: ModelError; Request ID: null)

全然意味がわかんないしCloudWatch見ても訳がわからん状態でした。
色々調べてたらsagemaker.invoke_endpointの引数にContentType='text/csv'って入れたらいいよってここに書いてあったので入れたけど結局データの形をどうしたらいいかわからずまたエラー

Invalid type for parameter Body, value: [12, 0, 12, 12.0, 0], type: <class 'list'>, valid types: <class 'bytes'>, <class 'bytearray'>, file-like object

エラーを読んでみるとfile-like objectにしろって書いてある…
でもfile-like objectが何かわからないで調べていたら

ファイルオブジェクトには実際には 3 種類あります: 生の バイナリーファイル、バッファされた バイナリーファイル、そして テキストファイル です。

ってここにありました。
テキストファイルってことは、モデル作るときcsvだったしカンマ区切りか!って思ってカンマ区切りでデータを送ったら

from flask import Flask, render_template, request, redirect
import json
import boto3

app = Flask(__name__)
@app.route('/')
def index():
  return render_template('/sagemaker.html')

@app.route('/predict', methods=['POST'])
def predict():
  if request.method == 'POST':
    month = request.form['month']
    week = request.form['week']
    time = request.form['time']
    temp = request.form['temp']
    weather = request.form['weather']
    pre_data_str = month + "," + week + "," + time + "," + temp + "," + weather
    sagemaker = boto3.client('sagemaker-runtime', region_name='us-east-1')
    result = sagemaker.invoke_endpoint(
      ContentType='text/csv',
      EndpointName='test',
      Body=pre_data_str
    )
    result_body = json.load(result['Body'])
    pre = int(float(result_body['predictions'][0]['score']))
    return render_template('/predict.html', predict=pre)
  else:
    return redirect('/')

if __name__ == '__main__':
  app.run(port=8000, debug=True)

いけました!
わからんすぎて本当に投げそうになったけど最後まで頑張ってよかったです。

これを次はlambdaで使ってみて、その後カスタムでやってみようと思います。

続きを読む

SageMakerでlinear learnerを使うときにハマったのでメモ

sageMakerを使ってみていてジョブの作成時にめちゃめちゃハマったのでメモ
(ハマった部分だけ記載しているので他の部分は割愛します)

とりあえずデフォルトで用意されている学習法を使ってみようと思い今回はliner learnerを選択。
トレーニングデータは自前で用意していました。

Kobito.yVk02D.png

それで今回ハマったのはチャネルの作り方とトレーニングデータの作り方。
もともと作成していたトレーニングデータは1~5列目に説明変数、6列目に目的変数を記載していました。
コンテンツタイプにtext/csv;label_size=<num><num>に目的変数が記載してある列番号書いとけばいいのかなって思ったのでtext/csv;label_size=6ってしてジョブを作成していたのですが何度やってもfailed…
ヘッダつけてるからダメなのかなって思ってヘッダ消したけどやっぱダメでした。
結局AWSのドキュメント見て気づいたのですが
コンテンツタイプをtext/csvにしてトレーニングデータの1列目に目的変数を記載してねって書いてあったのでトレーニングデータを書き換えることで無事にジョブが通りました。

まだモデル作成等はやっていないのでこれからやります。

追記

記事書いた直後にモデル作成とかもやったのですがモデル、エンドポイントの作成では特にハマることなくできました
ただエンドポイントを作成してどうやって使ったらいいのか全くわからない……
どなたかわかる方がいらっしゃいましたらご享受お願いします

追記2

エンドポイント作成して実際に使うことができましたので記事にしました
やったね

続きを読む

カテゴリー 未分類 | タグ

MetabaseからAmazon Auroraにアクセスしてみた

Metabaseのアクセス先にMySQLとか書いてあったので、Aurora接続もできるだろうと思って勉強がてら試してみました。

Metabaseとは

ベタですが、公式ページが一番わかりやすいっす。

Auroraの準備

デフォルトの設定でとりあえず作成します。

  1. Publicアクセスさせたかったので(色々考えるの面倒)、Publicアクセスの設定
  2. お金ないのでMulti AZはなしん設定

というあたりはデフォルトと変えた感じ。それでサクッと用意しました。

スクリーンショット 2018-01-06 21.41.00.png

Auroraにデータを投入

こちらのページを参考に、DBとテーブルの作成など諸々行いました。

Amazon RDS for Aurora を試してみた

ここで紹介されている郵便データに関してですが、時間が経ったためか実際のカラムの並び順などが変わっていたので、そういう点だけ変更してあとは上記のページのまま進めます。(ありがたや)

mysql> CREATE TABLE zipcode_list (
    -> zipcode varchar(10),
    -> prefecture varchar(255),
    -> city varchar(255),
    -> street varchar(255),
    -> prefecture_kana varchar(255),
    -> city_kana varchar(255),
    -> street_kana varchar(255)
    -> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query OK, 0 rows affected (0.38 sec)

データ投入もすんなり行きました。

$ mysqlimport --local --compress --user=root --password --host=xxxxxxxx.xxxxxxxxx.us-east-1.rds.amazonaws.com --fields-terminated-by=',' --fields-enclosed-by='"' --lines-terminated-by='rn' -d metabase ./zipcode_list.csv
Enter password: 
metabase.zipcode_list: Records: 124117  Deleted: 0  Skipped: 0  Warnings: 0 

Metabaseを起動する

今回はJavaで起動しました。

$ java jar metabase.jar 

起動したら、localhost:3000でアクセスできます。で、諸々画面に従って設定をしますが、MySQLベースの設定をします。

Name Value
Database MySQL
URL Auroraのエンドポイント
Port 3306

スクリーンショット 2018-01-06 21.27.43.png

すると無事繋がって、普通に作成したデータベースが表示されました。(予想通りでした)

データも何か欠損しているわけではなく、投入分がしっかり反映されています。

スクリーンショット 2018-01-06 21.29.02.png

まとめ

予想通り、Auroraは問題なくつなぐことができました。一応GithubのIssueではDynamoDBやAthenaへの対応の要望も出ているので、少し待てばどんどん対応されるんじゃないでしょうか。個人的にはMetabaseの操作性が抜群にいいので、Athenaのクライアントとして使ってみたいと思っています。

おまけ

操作性が抜群にいいですねこれ。

例えば一覧でデータを表示した後に

スクリーンショット 2018-01-06 21.29.29.png

絞り込みたい対象のカラムをクリックすると、その値で絞り込めるようになってる。

スクリーンショット 2018-01-06 21.31.50.png

まだ試してませんが、Pulseとかも使ってみたいですね。

続きを読む

Glueの使い方的な

Glueのすぐ使えそうな操作

1.Glueの使い方的な①(GUIでジョブ実行)
GUIだけでcsv->parquet変換処理を作る

2.Glueの使い方的な②(csvデータをパーティション分割したparquetに変換)
元データにタイムスタンプが入ってるデータを、パーティションによるディレクトリ構成にしてデータ配置とフォーマットなど変換

3.Glueの使い方的な②(csvデータをパーティション分割したparquetに変換)
CLIでジョブ作成操作。いろんなスケジューラーとの連携想定

4.Glueの使い方的な④(ブックマーク)
Glueのブックマーク機能を使って重複した処理を防ぐ

5.Glueの使い方的な⑤(パーティション分割してるcsvデータをパーティション分割したparquetに変換)
パーティション分割して配置されてるcsvを同じパーティション分割してparquetにする

6.Glueの使い方的な⑥(監視モニタリング)
Glueの監視

7.Glueの使い方⑥(StepFunctionsでジョブフロー)

8.いろんな変換パターン
XMLの変換とか

9.いろんな変換パターン

10.いろんな変換パターン

11.todo

続きを読む

カテゴリー 未分類 | タグ

Glueの使い方的な⑤(パーティション分割してるcsvデータをパーティション分割したparquetに変換)

パーティション分割csv->パーティション分割parquet

ジョブの内容

※”Glueの使い方①(GUIでジョブ実行)”(以後①とだけ書きます)と同様のcsvデータを使います

“パーティション分割されたcsvデータを同じパーティションで別の場所にparquetで出力する”

ジョブ名

se2_job4

クローラー名

se2_in1
se2_out3

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

内容としては①と同じデータで、year,month,day,hourのパーティションごとに分けたcsvファイルを配置します。
year,month,day,hourのカラムは削除しています。

元となる①のデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

今回使う入力データ(19件)
year,month,day,hourのカラムは削除しています

$ ls
cvlog_2017111112.csv    cvlog_2017120101.csv    cvlog_2017121621.csv
cvlog_2017111414.csv    cvlog_2017120218.csv    cvlog_2017121714.csv
cvlog_2017112114.csv    cvlog_2017120904.csv    cvlog_2017121718.csv
cvlog_2017112915.csv    cvlog_2017121412.csv    cvlog_2017121908.csv
cvlog_2017113014.csv    cvlog_2017121414.csv    cvlog_2017121914.csv
cvlog_2017113020.csv    cvlog_2017121511.csv    cvlog_2017122915.csv
$ cat *
deviceid,uuid,appid,country
iphone,11121,001,JP
deviceid,uuid,appid,country
iphone,11123,009,FR
deviceid,uuid,appid,country
iphone,11119,007,AUS
deviceid,uuid,appid,country
other,11110,005,JP
iphone,11125,005,JP
deviceid,uuid,appid,country
iphone,11129,007,AUS
deviceid,uuid,appid,country
android,11122,001,FR
deviceid,uuid,appid,country
pc,11118,001,FR
deviceid,uuid,appid,country
pc,11117,009,FR
deviceid,uuid,appid,country
iphone,11128,009,FR
deviceid,uuid,appid,country
iphone,11111,001,JP
deviceid,uuid,appid,country
android,11112,001,FR
deviceid,uuid,appid,country
iphone,11116,001,JP
deviceid,uuid,appid,country
iphone,11113,009,FR
deviceid,uuid,appid,country
iphone,11124,007,AUS
deviceid,uuid,appid,country
iphone,11114,007,AUS
deviceid,uuid,appid,country
iphone,11126,001,JP
deviceid,uuid,appid,country
android,11127,001,FR
deviceid,uuid,appid,country
other,11115,005,JP

データの場所
year,month,day,hourのパーティションごとに分けたcsvファイルを配置しています

# aws s3 ls s3://test-glue00/se2/in1/year=2017/ --recursive
2018-01-04 09:38:13          0 se2/in1/year=2017/
2018-01-04 09:40:12          0 se2/in1/year=2017/month=11/
2018-01-04 09:40:38          0 se2/in1/year=2017/month=11/day=11/
2018-01-04 09:41:23          0 se2/in1/year=2017/month=11/day=11/hour=12/
2018-01-04 10:19:28         48 se2/in1/year=2017/month=11/day=11/hour=12/cvlog_2017111112.csv
2018-01-04 09:40:42          0 se2/in1/year=2017/month=11/day=14/
2018-01-04 09:41:41          0 se2/in1/year=2017/month=11/day=14/hour=14/
2018-01-04 10:19:45         48 se2/in1/year=2017/month=11/day=14/hour=14/cvlog_2017111414.csv
2018-01-04 09:40:47          0 se2/in1/year=2017/month=11/day=21/
2018-01-04 09:41:55          0 se2/in1/year=2017/month=11/day=21/hour=14/
2018-01-04 10:20:01         49 se2/in1/year=2017/month=11/day=21/hour=14/cvlog_2017112114.csv
2018-01-04 09:40:50          0 se2/in1/year=2017/month=11/day=29/
2018-01-04 09:42:09          0 se2/in1/year=2017/month=11/day=29/hour=15/
2018-01-04 10:20:22         67 se2/in1/year=2017/month=11/day=29/hour=15/cvlog_2017112915.csv
2018-01-04 09:41:01          0 se2/in1/year=2017/month=11/day=30/
2018-01-04 09:42:22          0 se2/in1/year=2017/month=11/day=30/hour=14/
2018-01-04 10:20:41         49 se2/in1/year=2017/month=11/day=30/hour=14/cvlog_2017113014.csv
2018-01-04 09:42:40          0 se2/in1/year=2017/month=11/day=30/hour=20/
2018-01-04 10:20:52         49 se2/in1/year=2017/month=11/day=30/hour=20/cvlog_2017113020.csv
2018-01-04 09:40:16          0 se2/in1/year=2017/month=12/
2018-01-04 09:43:11          0 se2/in1/year=2017/month=12/day=1/
2018-01-04 09:45:16          0 se2/in1/year=2017/month=12/day=1/hour=1/
2018-01-04 10:21:19         44 se2/in1/year=2017/month=12/day=1/hour=1/cvlog_2017120101.csv
2018-01-04 09:43:21          0 se2/in1/year=2017/month=12/day=14/
2018-01-04 09:46:50          0 se2/in1/year=2017/month=12/day=14/hour=12/
2018-01-04 10:22:28         48 se2/in1/year=2017/month=12/day=14/hour=12/cvlog_2017121412.csv
2018-01-04 09:47:01          0 se2/in1/year=2017/month=12/day=14/hour=14/
2018-01-04 10:22:51         49 se2/in1/year=2017/month=12/day=14/hour=14/cvlog_2017121414.csv
2018-01-04 09:43:38          0 se2/in1/year=2017/month=12/day=15/
2018-01-04 09:47:11          0 se2/in1/year=2017/month=12/day=15/hour=11/
2018-01-04 10:23:12         48 se2/in1/year=2017/month=12/day=15/hour=11/cvlog_2017121511.csv
2018-01-04 09:43:42          0 se2/in1/year=2017/month=12/day=16/
2018-01-04 09:47:23          0 se2/in1/year=2017/month=12/day=16/hour=21/
2018-01-04 10:23:34         48 se2/in1/year=2017/month=12/day=16/hour=21/cvlog_2017121621.csv
2018-01-04 09:43:45          0 se2/in1/year=2017/month=12/day=17/
2018-01-04 09:47:38          0 se2/in1/year=2017/month=12/day=17/hour=14/
2018-01-04 10:23:54         49 se2/in1/year=2017/month=12/day=17/hour=14/cvlog_2017121714.csv
2018-01-04 09:47:43          0 se2/in1/year=2017/month=12/day=17/hour=18/
2018-01-04 10:24:12         49 se2/in1/year=2017/month=12/day=17/hour=18/cvlog_2017121718.csv
2018-01-04 09:43:49          0 se2/in1/year=2017/month=12/day=19/
2018-01-04 09:48:04          0 se2/in1/year=2017/month=12/day=19/hour=14/
2018-01-04 10:25:07         49 se2/in1/year=2017/month=12/day=19/hour=14/cvlog_2017121914.csv
2018-01-04 09:47:59          0 se2/in1/year=2017/month=12/day=19/hour=8/
2018-01-04 10:24:56         48 se2/in1/year=2017/month=12/day=19/hour=8/cvlog_2017121908.csv
2018-01-04 09:43:15          0 se2/in1/year=2017/month=12/day=2/
2018-01-04 09:45:44          0 se2/in1/year=2017/month=12/day=2/hour=18/
2018-01-04 10:21:41         44 se2/in1/year=2017/month=12/day=2/hour=18/cvlog_2017120218.csv
2018-01-04 09:43:52          0 se2/in1/year=2017/month=12/day=29/
2018-01-04 09:48:17          0 se2/in1/year=2017/month=12/day=29/hour=15/
2018-01-04 10:25:26         47 se2/in1/year=2017/month=12/day=29/hour=15/cvlog_2017122915.csv
2018-01-04 09:43:18          0 se2/in1/year=2017/month=12/day=9/
2018-01-04 09:46:24          0 se2/in1/year=2017/month=12/day=9/hour=4/
2018-01-04 10:22:02         48 se2/in1/year=2017/month=12/day=9/hour=4/cvlog_2017120904.csv

S3のディレクトリ構成

Glueジョブの入力データは”in1″ディレクトリ配下、出力は”out3″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE in1/
                           PRE out0/
                           PRE out1/
                           PRE out2/
                           PRE out3/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

入力データ用に新しくクローラーを作り実行してテーブルを作ります。

出来上がるテーブルの情報は以下です。

スクリーンショット 0030-01-04 10.52.04.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job4ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”パーティション分割されたcsvデータを同じパーティションで別の場所にparquetとして出力する”です。

se2_job4
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションをマッピング対象のカラムとして含めてくれません
入力のパーティションのカラムをマップの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

↓↓↓

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job4_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

###add
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out3/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
###add

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-04 15.54.04.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/
2018-01-04 15:15:41        926 part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet .
download: s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet to ./part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# ls
part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out3でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-04 15.59.33.png

S3の出力パスを入力
形式の違うデータが混在しているとテーブルが複数できてしまうので、不要なものがあれば、excludeで除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-04 16.00.06.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.19.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-04 16.00.28.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.36.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-04 16.00.49.png

クローラー実行

1つのテーブルとして認識している

スクリーンショット 0030-01-04 16.09.01.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-04 16.09.52.png

Athenaから確認

左メニューからse2_out3のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-04 16.11.18.png

件数も19件で合っている

スクリーンショット 0030-01-04 16.11.01.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばappidというカラムがあるので、アプリごとに集計をするようなケースが多いならappidも含めてパーティション分割する

他にもdeviceidとかでデバイスごとに集計したり
一時的な調査にも役立つかも

出力に”out4″ディレクトリ作成
①と同様のジョブをse2_job5で作成
PySparkに以下3点修正したジョブ作成

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションはカラムとして含めてくれません。
入力のパーティションのカラムをapplyの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['appid','year','month','day','hour']
output='s3://test-glue00/se2/out4/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

appidごとにパーティション別れてます

スクリーンショット 0030-01-04 16.30.35.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-04 16.33.32.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-04 16.33.49.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-04 16.34.10.png

件数も同じく19件

スクリーンショット 0030-01-04 16.34.39.png

その他

todo

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/latest/sql-programming-guide.html

続きを読む

Glueの使い方的な④(ブックマーク)

ブックマークを使って一度処理したデータは処理対象外とする

入力パスの先にデータがあっても既に処理済なデータは除外したい。そんな思いになったことありませんでしょうか?

これを実現する機能がGlueの”ブックマーク”です。

全体の流れ

  • 前提
  • ブックマークの効果を見る
  • ジョブの永続的なブックマーク有効化
  • トリガー側のブックマーク有効無効
  • (おまけの考察)

前提

Glueの使い方的な①(GUIでジョブ実行)“(以後①と書きます)、”Glueの使い方的な③(CLIでジョブ実行)“(以後③と書きます)あたりを読んでいただけるとスムーズです

今回扱うジョブは①と同じ内容です。
“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

③の後ろの方で書いたように、このジョブはGlueのGUIだけで作成し、1つのcsvを1つのparquetにするだけのなので、このジョブを2回実行すると同じ内容の出力が2つ出来てしまいます。

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

*①と同じディレクトリ
Glueジョブの入力データは”in0″ディレクトリ配下、出力は”out1″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

ブックマークの効果を見る

①や③でジョブを実行して出力ファイルがout0に既に存在しているのでそれを事前に消しておきます。

想定される流れと結果

1回目の実行で、①と同じくparquetの出力ファイルが1つできる。(あとメタデータ2つ)
こんな感じ

2018-01-02 10:44:04        782 _common_metadata
2018-01-02 10:44:04       1746 _metadata
2018-01-02 10:27:05       2077 part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet

ブックマークを有効化することで、2回目の実行で、既に処理した入力ファイルは処理されない為、新たなparquetファイルは出力されず1つのままになります

やってみる

ジョブは①で作ったのと同じ内容で新たにジョブをse2_job2という名前で作ります
ジョブの内容は以下です。
“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

最初のジョブの状態

実行履歴が1件ありますが気にしないでください。
スクリーンショット 0030-01-02 17.38.20.png

1回目実行

ブックマークを有効にして実行する
“Action”の”Run job”をクリックする

※ちなみにこの画面のActionの”Reset job bookmark”でブックマークに保持された状態をリセットできます

スクリーンショット 0030-01-02 17.39.21.png

Parameter(optional)の画面で”Job bookmark”を”Enable”にして”Run job”をクリックする
今回のみ有効なパラメータとしてジョブを実行します。

スクリーンショット 0030-01-02 17.39.06.png

成功して履歴が1つ増えています
スクリーンショット 0030-01-02 17.49.46.png

S3にparquetファイル1つとメタデータ2つがほぼ同じ時刻で作成されています。

スクリーンショット 0030-01-02 17.50.06.png

続いて同じ手順で2回目を実行します。

実行後のS3はメタデータは更新されている(?)
ただ、肝心のparquetファイルは1つのみで、1度処理した入力データを対象外としていることがわかる
※メタデータの内容は後半で確認してみます

スクリーンショット 0030-01-02 17.55.50.png

このようにGlueのブックマークを有効にしておくと、同じ場所にあるソースデータの中でまだ処理していないデータだけを処理対象とすることができます。

ジョブ側の永続的なブックマーク有効化

さっきまではジョブ実行時の1度だけのブックマーク有効でした
今回は永続的なブックマーク有効化をします。

対象ジョブにチェックを入れ、”Action”をクリックし、”Edit job”をクリックする

スクリーンショット 0030-01-02 17.53.30.png

この画面が出て一見ブックマークの設定箇所がなさそうだが、実は下にスクロールできる

スクリーンショット 0030-01-02 17.53.49.png

あった

スクリーンショット 0030-01-02 17.54.07.png

“Enable”に変更して”Save”
するとエラーが、、”temporary directory”と”IAM role”をこのタイミングでも入れないといけなようです。
それぞれ入力後”Save”をクリックします。

スクリーンショット 0030-01-02 17.54.20.png

画面の右端にあるように”Job bookmark”が”Enable”になっているのがわかります。

スクリーンショット 0030-01-02 18.11.49.png

トリガー側のブックマーク有効無効

ブックマークの有効無効確認

対象のトリガーのse2_trigger2の部分をクリック

スクリーンショット 0030-01-02 18.18.50.png

以下の画面になり、Parametersのところに”–job-bookmark-option:job-bookmark-enable”とあればブックマークが有効になっている

スクリーンショット 0030-01-02 18.19.04.png

ブックマーク有効無効変更

対象のトリガーse2_trigger2にチェックを入れ、”Action”をクリックし”Edit trigger”をクリックする

スクリーンショット 0030-01-02 18.16.45.png

“Next”をクリックする

スクリーンショット 0030-01-02 18.16.59.png

画面下の部分に”Job bookmark”がありDisable、Enable、Pauseの3つの状態が選べる
これを選びNextと進めればよさそうだが、それだけだとダメである

スクリーンショット 0030-01-02 18.17.16.png

画面下部にあるKeyとValueの箇所の”job-bookmark-enable”を事前に消しておく必要がある。右側の✖をクリックすることで消せます。消した後に”Job bookmark”をDisableにして”Next”をクリックし、次にサマリが出るので問題なければ”Finish”をクリックする

スクリーンショット 0030-01-02 18.17.39.png

確認すると”–job-bookmark-option:job-bookmark-disable”になっていて無効に変わったことがわかる

スクリーンショット 0030-01-02 18.33.18.png

(ブックマーク有効にしてジョブ実行してもメタデータが更新された件の考察)

結果的にはparquetの仕様っぽいので気にする必要もなさそうです。

メタを詳細に見るためにparquet-toolsを入れます。
このツール相変わらず普通にビルドできない・・

ここは弊社の担々麺デカの力を借りて無事入りました。(+1タンタンメン)
http://d.hatena.ne.jp/yohei-a/20170629/1498710035

とは言え改善してるんじゃと思い、ちょっと脱線しますが

【parquet-toolsインストール2018年版】

github
また(?)リポジトリ変わってます・・
https://github.com/apache/parquet-mr

最新のv1.8.0、v1.8.1はpom.xmlをゴニョゴニョしないとダメだ
Issueは多分こちら
https://issues.apache.org/jira/browse/PARQUET-1129

v1.7はなぜかなく、結果的にv1.6系最後(?)の1.6.0rc7だとすんなりビルドできました。

jdkインストール

yum -y install java-1.7.0-openjdk-devel

mavenインストール

wget http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
tar xvfz apache-maven-3.3.9-bin.tar.gz 
./apache-maven-3.3.9/bin/mvn -version

.bash_profileとかでパス通す

PATH=$PATH:$HOME/bin:/root/apache-maven-3.3.9/bin

parquet-toolsインストール

git clone https://github.com/apache/parquet-mr.git
cd parquet-mr
git checkout ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7
cd parquet-tools/
mvn clean package -Plocal 

実行

aws s3 cp s3://test-glue00/se2/out0/ .data/ --recursive

java -jar target/parquet-tools-1.6.0rc7.jar head -n 1 data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
deviceid = iphone
uuid = 11111
appid = 1
country = JP
year = 2017
month = 12
day = 14
hour = 12

parquet-toolsでメタ情報も見れます。
ブックマークを使ったのにメタデータは更新があったので、更新前と後のメタの情報を比較してみます。

メタ情報表示

# java -jar target/parquet-tools-1.6.0rc7.jar meta data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
file:        file:/root/parquet/parquet-mr/parquet-tools/data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"deviceid","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"long","nullable":true,"metadata":{}},{"name":"appid","type":"long","nullable":true,"metadata":{}},{"name":"country","type":"string","nullable":true,"metadata":{}},{"name":"year","type":"long","nullable":true,"metadata":{}},{"name":"month","type":"long","nullable":true,"metadata":{}},{"name":"day","type":"long","nullable":true,"metadata":{}},{"name":"hour","type":"long","nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
deviceid:    OPTIONAL BINARY O:UTF8 R:0 D:1
uuid:        OPTIONAL INT64 R:0 D:1
appid:       OPTIONAL INT64 R:0 D:1
country:     OPTIONAL BINARY O:UTF8 R:0 D:1
year:        OPTIONAL INT64 R:0 D:1
month:       OPTIONAL INT64 R:0 D:1
day:         OPTIONAL INT64 R:0 D:1
hour:        OPTIONAL INT64 R:0 D:1

row group 1: RC:19 TS:952 OFFSET:4 
--------------------------------------------------------------------------------
deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19 ENC:PLAIN,BIT_PACKED,RLE
appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE

更新前のメタ情報:/tmp/data1/meta.log
更新後のメタ情報:/tmp/data2/meta.log
diffは見切れちゃってますが、差分は以下の並びが違うということでした。内容は同じです。
ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
もう1回ジョブを実行した場合、上記の並びは同じでしたがタイムスタンプは最新になってました。
そういうもののようです;

# sdiff -s /tmp/data1/meta.log /tmp/data2/meta.log 
file:        file:/root/parquet/parquet-mr/parquet-tools/data | file:        file:/root/parquet/parquet-mr/parquet-tools/data
deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 E | deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 E
uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19 | uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19
appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19  | appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19 
country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19  | country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19 
year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 E | year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 E
month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 E | month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 E
day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19 | day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19
hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19 | hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19

その他

  • Glueで自動生成されるPySparkコードに以下のようなコンテキストオブジェクトがあります。これにはブックマークとしての意味もあり、それぞれの実行時にソース、変換、およびシンクの状態をブックマークの状態のキーとして使用します。状態はタイムスタンプとして記録し維持します。
    なのでブックマークを使用していない場合は、この変数を指定しなくても問題はありません。
    transformation_ctx = “datasource0”
    transformation_ctx = “applymapping1”
    transformation_ctx = “datasink4”

  • S3の結果整合性への対処
    ジョブ開始前に、以前のデータと不整合があるデータをジョブの対象とする(整合なデータは除外リストとして維持する)
    状態としてサイズも持っているということかもしれません。

  • 例えばあるファイルは処理対象か対象ではないのか?と言った詳細なブックマークの状態を見ることは現在はできません。

To Be Continue

todo

参考

Bookmarkの公式ページ
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-continuations.html

re:Invent資料。34ページあたりにBookMarkの細かい情報あり
https://www.slideshare.net/AmazonWebServices/abd315serverless-etl-with-aws-glue

parquet-toolsインストール
http://d.hatena.ne.jp/yohei-a/20170629/1498710035

parquet-tools
https://github.com/apache/parquet-mr

続きを読む