Athenaで入れ子のjsonにクエリを投げる方法が分かりづらかったので整理する

Kinesis FirehoseでS3に置かれた圧縮したjsonファイルを、それに対してクエリを投げる、というのを検証してたのですが、Hive素人なのでスキーマの作り方もクエリの投げ方も正直あんまり良くわかってませんでした。

そこで下記を参照しながらスキーマの作成とクエリ投入をやってみて、最終的にうまくいきました。

日本語記事
https://aws.amazon.com/jp/blogs/news/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/

元記事
https://aws.amazon.com/jp/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/

ずーっと日本語記事を読みながらやっていたのですが、これがめちゃくちゃわかりづらい!!!
※理解度には個人差があります

多分知っている人が見たら何となくわかるんでしょうが、恐らくこれを見るのは自分みたいにあまり良く知らないので参考にしながら実際にやってみている、という層だと思います。
最終的に上手く行ってから思ったのは、前提知識がないと読むのがしんどい、ということですね…。
ただもう少し書いといてくれるだけで十分なのに…。
原文も軽く見ましたが、そっちにも書いてないのでそもそも記述されてません。

調べてもまだ中々情報が出てこない上に、クエリ投入時にエラーが出た場合もエラーメッセージが淡白すぎてどこが問題でエラーになってるのかさっぱりわからなくて悪戦苦闘してました。

そんなわけで、今後同じところで困る人が一人でも減るように、自分用メモも兼ねてハマったところについて補足をしておきたいと思います。

概要

リンク先で書いてあることの流れは大まかに下記のとおりです。

  1. FirehoseでSESの送信イベントログをS3に保存する
    送信イベントログはjson形式で、それをFirehoseでS3に保存しています。
  2. Athenaのテーブルを作成して、クエリを投げる
    • ただテーブル作成して投げる場合
    • 入れ子になっているjsonに対してテーブル作成してクエリを投げる場合
    • 禁止文字を含んでいるものに対してテーブルを作成してクエリを投げる場合
      わかりづらいですが、禁止文字を含む項目をマッピングする項目とクエリを投げる項目が分かれています。)
  3. hive-json-schemaの紹介
    jsonからテーブル作成のためのクエリを生成するツールっぽいのですが、紹介してるわりにちっとも使い方が書いてません…。
    使い方の解説をどなたか…。

ハマったところ

入れ子になったjsonに対するテーブル作成について

ハマったところといいつつ、自分はこの辺は割とスムーズに行ったのですが、ちょっとわかりづらいかもしれないので念のため。
サンプルにもありますが、jsonの中にまたjsonとか配列とかが入っている、みたいなケースは多くあります。
そういった場合、内部にあるjsonに対してstruct型を使って、その下の項目について型を定義してやればOKです。
その中にさらにjsonがある場合はさらにその中にstruct型で定義をすればOK。
例にあるものだと、内部にmail{~}とjsonがあり、その中にさらにいくつかのjsonがあるので、それぞれに対してstruct型で定義をしています。
以下引用(全文は貼っていないので、元はリンク先を見てください。)
※一部バッククオート(`)で囲われている項目がありますが、予約語として使われている言葉をそのまま使用するとエラーになるそうです。
そのため、バッククオートで囲うことによってエスケープしてるようです。

抜粋した入れ子の部分
 mail struct<`timestamp`:string,
              source:string,
              sourceArn:string,
              sendingAccountId:string,
              messageId:string,
              destination:string,
              headersTruncated:boolean,
              headers:array<struct<name:string,value:string>>,
              commonHeaders:struct<`from`:array<string>,to:array<string>,messageId:string,subject:string>
              > 

禁止文字そのものについて

まず、禁止文字が色々あることを最初大して理解してませんでした。
項目名(↑の例だと、timestampとかsourceとかのところ)の定義に使用できない文字があります。
記事中だと「:」(コロン)が禁止文字列なので、それがクエリ中の該当箇所に入っているとエラーになります。
あとは「-」(ハイフン)なんかも禁止文字のようです。
例えばHTTPリクエストのログを見たとき、ヘッダとかはハイフンを使った項目がいくつもあったりするので困りますよね。
一応記事中の例では両方「_」(アンダースコア)に変換しています。(コロンにしか触れてませんが…。)
最初は禁止文字があると知らず、なぜエラーになっているかわからずにハマってました。
この辺どっかにまとまってるのかな…?
どうやって回避するかというと、それがWITH SERDEPROPERTIESの部分です。

禁止文字を含む場合のマッピングの仕方について

最初見た時はなんでこんなことをするのかわかりませんでしたが、上記の通り項目名を定義するときに禁止文字が入っているとエラーになります。
なので、WITH SERDEPROPERTIESの項目で、禁止文字列を含んだ項目名を、禁止文字列のない文字列にマッピングし、元のjsonのkeyでは禁止文字列を含んでいたものに対し、テーブル上ではカラム名として別の文字列をあてがうことができます。
記事中では、コロンやハイフンをアンダースコアに変換した文字列にマッピングしています。
式の左側がカラム名に使いたい文字列で、それに対して右側がデータの元の実際の名前です。
"mapping.カラム名に使いたい文字列"="実際の名前" みたいに記述してます。

マッピングの仕方

WITH SERDEPROPERTIES (
  "mapping.ses_configurationset"="ses:configuration-set",
  "mapping.ses_source_ip"="ses:source-ip", 
  "mapping.ses_from_domain"="ses:from-domain", 
  "mapping.ses_caller_identity"="ses:caller-identity"
  )

クエリの投げ方

これもまあおまけで書いておくと、ここまでしっかりと下の項目までテーブルを定義しておくと、下の項目までクエリで引っ張ることが出来ます。
記事中では下記のような例が出ています。

元記事にある例
SELECT eventtype as Event,
         mail.timestamp as Timestamp,
         mail.tags.ses_source_ip as SourceIP,
         mail.tags.ses_caller_identity as AuthenticatedBy,
         mail.commonHeaders."from" as FromAddress,
         mail.commonHeaders.to as ToAddress
FROM sesblog2
WHERE eventtype = 'Bounce'

mail{〜}の下の項目を参照する時は上記のようにドットをつけて該当項目の名前を指定しています。
さらにその下の項目を参照する時はその後ろにさらにドットをつけています。
この辺は直感的にわかりやすいかもしれません。

おまけとかtipsとか

Firehoseで配置されたフォルダ構成ではパーティションを自動で切ってもらえない

hiveではフォルダが/bucketname/path/to/log/year=YY/month=MM/day=dd/foo
みたいな構成だと自動でパーティション設定してくれるらしいのですが、FirehoseでS3にデータ配置すると/bucketname/path/to/log/YYYY/MM/DD/fooみたいになるので、自分でパーティションを作成する必要があります。
パーティションがない状態でクエリを投げても1件も引っかかりません。
これを作るには下記のようなクエリを投げる必要があります。

elbログを対象としたテーブルにパーティションを作成する場合
ALTER TABLE database_name.table_name
ADD PARTITION (year='2016',month='08',day='28')
location 's3://elb-access-log/AWSLogs/00000000000000/elasticloadbalancing/ap-northeast-1/2016/08/28/';

※参考
https://qiita.com/r4-keisuke/items/d3d339b76d4368b6b30a

上記の例だと1日ずつパーティションを設定する必要があるのですが、
パーティション数には上限があるらしい(1テーブル20000まで)ので、1日ずつとか、1時間ずつとかフォルダ分けしている場合はちょっと注意が必要かもしれないです。
※パーティションの上限については下記
https://docs.aws.amazon.com/ja_jp/general/latest/gr/aws_service_limits.html#limits_glue
さすがに対象が多すぎとなるとしんどいので、シェルスクリプトとかで回すといいと思います。
ただ、シェルスクリプト自体も1つ1つの処理実行だとそこそこ時間かかるのと、パーティションを設定するためのクエリでクエリ履歴が埋め尽くされるのが難点です。

データ元にない項目を定義しても値がnullになるだけで問題はない

jsonの出力が一定じゃなくて、いくつかの似たような型のjsonが混ざっていたり、ものによって存在しない項目があったりしても、それらのキーを全て網羅するようにまとめて定義しちゃって問題ないみたいです。
定義したけどデータ元に項目がない場合はnullが入るだけのようで。
逆に元データにある項目を全部定義する必要はないので、元データにあっても使わないような項目はテーブル作成の段階で定義しないようにしてもいいみたいですね。

ざっと書いたので、わかりづらいとか、もっとこうすればみたいな指摘があればいただけると嬉しいです。

続きを読む

Glueの使い方的な⑤(パーティション分割してるcsvデータをパーティション分割したparquetに変換)

パーティション分割csv->パーティション分割parquet

ジョブの内容

※”Glueの使い方①(GUIでジョブ実行)”(以後①とだけ書きます)と同様のcsvデータを使います

“パーティション分割されたcsvデータを同じパーティションで別の場所にparquetで出力する”

ジョブ名

se2_job4

クローラー名

se2_in1
se2_out3

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

内容としては①と同じデータで、year,month,day,hourのパーティションごとに分けたcsvファイルを配置します。
year,month,day,hourのカラムは削除しています。

元となる①のデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

今回使う入力データ(19件)
year,month,day,hourのカラムは削除しています

$ ls
cvlog_2017111112.csv    cvlog_2017120101.csv    cvlog_2017121621.csv
cvlog_2017111414.csv    cvlog_2017120218.csv    cvlog_2017121714.csv
cvlog_2017112114.csv    cvlog_2017120904.csv    cvlog_2017121718.csv
cvlog_2017112915.csv    cvlog_2017121412.csv    cvlog_2017121908.csv
cvlog_2017113014.csv    cvlog_2017121414.csv    cvlog_2017121914.csv
cvlog_2017113020.csv    cvlog_2017121511.csv    cvlog_2017122915.csv
$ cat *
deviceid,uuid,appid,country
iphone,11121,001,JP
deviceid,uuid,appid,country
iphone,11123,009,FR
deviceid,uuid,appid,country
iphone,11119,007,AUS
deviceid,uuid,appid,country
other,11110,005,JP
iphone,11125,005,JP
deviceid,uuid,appid,country
iphone,11129,007,AUS
deviceid,uuid,appid,country
android,11122,001,FR
deviceid,uuid,appid,country
pc,11118,001,FR
deviceid,uuid,appid,country
pc,11117,009,FR
deviceid,uuid,appid,country
iphone,11128,009,FR
deviceid,uuid,appid,country
iphone,11111,001,JP
deviceid,uuid,appid,country
android,11112,001,FR
deviceid,uuid,appid,country
iphone,11116,001,JP
deviceid,uuid,appid,country
iphone,11113,009,FR
deviceid,uuid,appid,country
iphone,11124,007,AUS
deviceid,uuid,appid,country
iphone,11114,007,AUS
deviceid,uuid,appid,country
iphone,11126,001,JP
deviceid,uuid,appid,country
android,11127,001,FR
deviceid,uuid,appid,country
other,11115,005,JP

データの場所
year,month,day,hourのパーティションごとに分けたcsvファイルを配置しています

# aws s3 ls s3://test-glue00/se2/in1/year=2017/ --recursive
2018-01-04 09:38:13          0 se2/in1/year=2017/
2018-01-04 09:40:12          0 se2/in1/year=2017/month=11/
2018-01-04 09:40:38          0 se2/in1/year=2017/month=11/day=11/
2018-01-04 09:41:23          0 se2/in1/year=2017/month=11/day=11/hour=12/
2018-01-04 10:19:28         48 se2/in1/year=2017/month=11/day=11/hour=12/cvlog_2017111112.csv
2018-01-04 09:40:42          0 se2/in1/year=2017/month=11/day=14/
2018-01-04 09:41:41          0 se2/in1/year=2017/month=11/day=14/hour=14/
2018-01-04 10:19:45         48 se2/in1/year=2017/month=11/day=14/hour=14/cvlog_2017111414.csv
2018-01-04 09:40:47          0 se2/in1/year=2017/month=11/day=21/
2018-01-04 09:41:55          0 se2/in1/year=2017/month=11/day=21/hour=14/
2018-01-04 10:20:01         49 se2/in1/year=2017/month=11/day=21/hour=14/cvlog_2017112114.csv
2018-01-04 09:40:50          0 se2/in1/year=2017/month=11/day=29/
2018-01-04 09:42:09          0 se2/in1/year=2017/month=11/day=29/hour=15/
2018-01-04 10:20:22         67 se2/in1/year=2017/month=11/day=29/hour=15/cvlog_2017112915.csv
2018-01-04 09:41:01          0 se2/in1/year=2017/month=11/day=30/
2018-01-04 09:42:22          0 se2/in1/year=2017/month=11/day=30/hour=14/
2018-01-04 10:20:41         49 se2/in1/year=2017/month=11/day=30/hour=14/cvlog_2017113014.csv
2018-01-04 09:42:40          0 se2/in1/year=2017/month=11/day=30/hour=20/
2018-01-04 10:20:52         49 se2/in1/year=2017/month=11/day=30/hour=20/cvlog_2017113020.csv
2018-01-04 09:40:16          0 se2/in1/year=2017/month=12/
2018-01-04 09:43:11          0 se2/in1/year=2017/month=12/day=1/
2018-01-04 09:45:16          0 se2/in1/year=2017/month=12/day=1/hour=1/
2018-01-04 10:21:19         44 se2/in1/year=2017/month=12/day=1/hour=1/cvlog_2017120101.csv
2018-01-04 09:43:21          0 se2/in1/year=2017/month=12/day=14/
2018-01-04 09:46:50          0 se2/in1/year=2017/month=12/day=14/hour=12/
2018-01-04 10:22:28         48 se2/in1/year=2017/month=12/day=14/hour=12/cvlog_2017121412.csv
2018-01-04 09:47:01          0 se2/in1/year=2017/month=12/day=14/hour=14/
2018-01-04 10:22:51         49 se2/in1/year=2017/month=12/day=14/hour=14/cvlog_2017121414.csv
2018-01-04 09:43:38          0 se2/in1/year=2017/month=12/day=15/
2018-01-04 09:47:11          0 se2/in1/year=2017/month=12/day=15/hour=11/
2018-01-04 10:23:12         48 se2/in1/year=2017/month=12/day=15/hour=11/cvlog_2017121511.csv
2018-01-04 09:43:42          0 se2/in1/year=2017/month=12/day=16/
2018-01-04 09:47:23          0 se2/in1/year=2017/month=12/day=16/hour=21/
2018-01-04 10:23:34         48 se2/in1/year=2017/month=12/day=16/hour=21/cvlog_2017121621.csv
2018-01-04 09:43:45          0 se2/in1/year=2017/month=12/day=17/
2018-01-04 09:47:38          0 se2/in1/year=2017/month=12/day=17/hour=14/
2018-01-04 10:23:54         49 se2/in1/year=2017/month=12/day=17/hour=14/cvlog_2017121714.csv
2018-01-04 09:47:43          0 se2/in1/year=2017/month=12/day=17/hour=18/
2018-01-04 10:24:12         49 se2/in1/year=2017/month=12/day=17/hour=18/cvlog_2017121718.csv
2018-01-04 09:43:49          0 se2/in1/year=2017/month=12/day=19/
2018-01-04 09:48:04          0 se2/in1/year=2017/month=12/day=19/hour=14/
2018-01-04 10:25:07         49 se2/in1/year=2017/month=12/day=19/hour=14/cvlog_2017121914.csv
2018-01-04 09:47:59          0 se2/in1/year=2017/month=12/day=19/hour=8/
2018-01-04 10:24:56         48 se2/in1/year=2017/month=12/day=19/hour=8/cvlog_2017121908.csv
2018-01-04 09:43:15          0 se2/in1/year=2017/month=12/day=2/
2018-01-04 09:45:44          0 se2/in1/year=2017/month=12/day=2/hour=18/
2018-01-04 10:21:41         44 se2/in1/year=2017/month=12/day=2/hour=18/cvlog_2017120218.csv
2018-01-04 09:43:52          0 se2/in1/year=2017/month=12/day=29/
2018-01-04 09:48:17          0 se2/in1/year=2017/month=12/day=29/hour=15/
2018-01-04 10:25:26         47 se2/in1/year=2017/month=12/day=29/hour=15/cvlog_2017122915.csv
2018-01-04 09:43:18          0 se2/in1/year=2017/month=12/day=9/
2018-01-04 09:46:24          0 se2/in1/year=2017/month=12/day=9/hour=4/
2018-01-04 10:22:02         48 se2/in1/year=2017/month=12/day=9/hour=4/cvlog_2017120904.csv

S3のディレクトリ構成

Glueジョブの入力データは”in1″ディレクトリ配下、出力は”out3″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE in1/
                           PRE out0/
                           PRE out1/
                           PRE out2/
                           PRE out3/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

入力データ用に新しくクローラーを作り実行してテーブルを作ります。

出来上がるテーブルの情報は以下です。

スクリーンショット 0030-01-04 10.52.04.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job4ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”パーティション分割されたcsvデータを同じパーティションで別の場所にparquetとして出力する”です。

se2_job4
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションをマッピング対象のカラムとして含めてくれません
入力のパーティションのカラムをマップの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

↓↓↓

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job4_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

###add
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out3/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
###add

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-04 15.54.04.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/
2018-01-04 15:15:41        926 part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet .
download: s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet to ./part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# ls
part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out3でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-04 15.59.33.png

S3の出力パスを入力
形式の違うデータが混在しているとテーブルが複数できてしまうので、不要なものがあれば、excludeで除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-04 16.00.06.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.19.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-04 16.00.28.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.36.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-04 16.00.49.png

クローラー実行

1つのテーブルとして認識している

スクリーンショット 0030-01-04 16.09.01.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-04 16.09.52.png

Athenaから確認

左メニューからse2_out3のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-04 16.11.18.png

件数も19件で合っている

スクリーンショット 0030-01-04 16.11.01.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばappidというカラムがあるので、アプリごとに集計をするようなケースが多いならappidも含めてパーティション分割する

他にもdeviceidとかでデバイスごとに集計したり
一時的な調査にも役立つかも

出力に”out4″ディレクトリ作成
①と同様のジョブをse2_job5で作成
PySparkに以下3点修正したジョブ作成

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションはカラムとして含めてくれません。
入力のパーティションのカラムをapplyの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['appid','year','month','day','hour']
output='s3://test-glue00/se2/out4/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

appidごとにパーティション別れてます

スクリーンショット 0030-01-04 16.30.35.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-04 16.33.32.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-04 16.33.49.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-04 16.34.10.png

件数も同じく19件

スクリーンショット 0030-01-04 16.34.39.png

その他

todo

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/latest/sql-programming-guide.html

続きを読む

Glueの使い方的な②(csvデータをパーティション分割したparquetに変換)

パーティション分割するジョブを作る

ジョブの内容

※”Glueの使い方的な①(GUIでジョブ実行)“(以後①とだけ書きます)と同様のcsvデータを使います

“csvデータのタイムスタンプのカラムごとにパーティション分割してparquetで出力する”

ジョブ名

se2_job1

クローラー名

se2_in0
se2_out1

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

Glueジョブの入力データは”in0″ディレクトリ配下、出力は”out1″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

※①で作ったものを使います。

テーブルの情報は以下です。

スクリーンショット 0030-01-03 15.45.21.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job1ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”です。

se2_job1
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job1_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

### Add(start)
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
#dropnullfields3 = DynamicFrame.fromDF(df, glueContext, "dropnullfields3")
### Add(end)

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
###
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-03 16.25.57.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/
2018-01-03 16:25:49        926 part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet . 
download: s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet to ./part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head  part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out1でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-03 16.37.26.png

S3の出力パスを入力

スクリーンショット 0030-01-03 16.37.50.png

そのまま”Next”をクリック

スクリーンショット 0030-01-03 16.38.46.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-03 16.38.55.png

そのまま”Next”をクリック

スクリーンショット 0030-01-03 16.39.03.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-03 16.39.16.png

クローラー実行

形式の違うデータが混在しているとテーブルが複数できてしまう

スクリーンショット 0030-01-03 16.41.46.png

クローラーを作り直し、”Add a data store”の”Exclude patterns”の箇所で
不要なものがあれば除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-05 20.18.38.png

クローラー実行後
1つのテーブルとして認識される

スクリーンショット 0030-01-03 16.48.52.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-03 16.49.09.png

Athenaから確認

左メニューからse2_out1のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-05 20.39.54.png

件数も19件で合っている

スクリーンショット 0030-01-03 16.51.35.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばcountryというカラムがあるので、国ごとに集計をするようなケースが多いならcountryも含めてパーティション分割する

他にもappidとかでアプリごとに集計したり
一時的な調査にも役立つかも

出力に”out2″ディレクトリ作成
①と同様のジョブをse2_job3で作成
作成したジョブのPySparkスクリプトに以下2点修正

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['country','year','month','day','hour']
output='s3://test-glue00/se2/out2/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

countryごとにパーティション別れてます

スクリーンショット 0030-01-03 17.33.24.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-03 17.36.02.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-03 17.37.19.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-03 17.39.12.png

件数も同じく19件

スクリーンショット 0030-01-03 17.38.57.png

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/preview/api/python/pyspark.sql.html
https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

続きを読む

Glueの使い方的な①(GUIでジョブ実行)

GUIによる操作でGlueジョブを作って実行する

ジョブの内容

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

ジョブ名

se2_job0

クローラー名

se2_in0
se2_out0

全体の流れ

  • 前準備
  • クローラー作成と実行、Athenaで確認
  • ジョブの作成と実行、Athenaで確認
  • 出来上がったPySparkスクリプト確認

前準備

ジョブで使うIAM role

以下のポリシーを付与した任意の名前のロールを作っておく。今回はtest-glueという名前にした。
・AmazonS3FullAccess
・AWSGlueServiceRole
※権限は必要に応じてより厳しくしてください。今回は検証のため緩めにしてあります。

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

S3に配置

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

ディレクトリ構成

in0に入力ファイル、out0に出力ファイル

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE script/
                           PRE tmp/

クローラー作成と実行、Athenaで確認

ジョブ作成の前に、こちらもGlueの重要な機能の1つであるData CatalogのCrawlerを作成して動かしていきます。
クローラはデータをスキャン、分類、スキーマ情報を自動認識し、そのメタデータをデータカタログに保存する機能です。これを使って入力となるサンプルデータのスキーマを事前に抽出してデータカタログにメタデータとして保存します。Athena、EMR、RedshiftからもこのGlueのデータカタログを使えます。Hive互換のメタストアです(Hiveメタストアのマネージドサービスと思ってもらえればいいと思います)。

基本的な操作はGUIを使って行えます。

AWSマネージメントコンソールから、Glueをクリックし、画面左側メニューの”Crawlers”をクリックし、”Add crawler”をクリック
スクリーンショット 0029-12-28 13.24.04.png

クローラーの名前入力
スクリーンショット 0029-12-28 11.19.32.png

S3にあるソースデータのパス入力(今回はS3に配置してあるデータが対象)
スクリーンショット 0030-01-02 15.17.11.png

そのまま”Next”
スクリーンショット 0029-12-28 9.38.41.png

“Choose an existing IAM role”にチェックを入れ、IAM roleをプルダウンからtest-glueを選択する
スクリーンショット 0030-01-02 15.17.49.png

“Run on demand”にチェックを入れ”Next”(今回のクローラーはスケジュールせずに手動実行とする)
スクリーンショット 0029-12-28 9.39.03.png

スキーマ情報を保存するDatabaseを選択、既存のものがあればそれでもいいし、なければ下の”Add database”でdatabase作成しそれを選択
Prefixは作成されるテーブル名の先頭に付くもの。見分け分類しやすいものにしておくと良いと思います(現状テーブルにtagとかつけられないので)
スクリーンショット 0029-12-28 9.39.23.png

クローラー実行

対象のクローラーにチェックを入れ、”Run Crawler”をクリック
スクリーンショット 0030-01-01 17.02.58.png

テーブルが出来上がる

“se2_”のPrefixが付いた”se2_in0″のテーブルができています。”in0″は指定した”Include Path”の一番下のディレクトリ名です。指定した”Include Path”(ソースデータがあるS3のパス、画像の”Location”の部分に表示されている)の配下のディレクトリは自動でパーティションとして認識されます。今回は配下にディレクトリはありませんのでパーティションも作成されません。
スクリーンショット 0030-01-02 15.21.15.png

テーブルの内容を確認するとスキーマが自動で作成されています
実データ配置場所のLocationがs3://test-glue00/se2/in0
Table名のNameが、prefixのse2_とInclude Pathで指定したs3://test-glue00/se2/in0の一番下のディレクトリのin0でse2_in0

Schemaを見るとuuidやappidなどがbigintで数値型になってます、文字列型がよければここでも修正できます。
今回は一旦このまま進めます
※本来はClassifierでいい感じにしたほうがいいと思う

スクリーンショット 0030-01-02 15.23.51.png

AthenaからもGlueのData Catalog使えます
Athenaから同様のテーブルとスキーマの内容が確認できます。Athenaがスキーマ情報にGlueのデータカタログを使ってることがわかります。画面右上にもGlue Data Catalogへのショートカットもありますね。
スクリーンショット 0030-01-02 15.27.07.png

もちろんクエリ実行もできます。
スクリーンショット 0030-01-02 15.29.31.png

ジョブの作成と実行、Athenaで確認

今回のようなジョブであれば、基本は画面ポチポチするだけです

Glueのメニューから”ETL”の”Job”をクリックし、”Add job”をクリック
スクリーンショット 0030-01-01 17.36.49.png

“Name”にジョブ名を入れ、”IAM role”はクローラーでも使ったロールを指定、”Temporary directory”は任意の場所で構いません。
スクリーンショット 0030-01-02 15.31.34.png

ソースデータとなるテーブルを選択。(先程作成したテーブルをクリック)
スクリーンショット 0030-01-02 15.33.35.png

ターゲットとなるテーブルは作成していないのでここで作ります。
今回は、保存先のData storeは”S3″、出力ファイルフォーマットのFormatは”Parquet”、出力先のパスのTargetPathは”任意の場所”を指定
※圧縮も選べます
スクリーンショット 0030-01-02 15.34.41.png

ソースデータとターゲットデータのマッピング変換ができます。いらないカラムを出力から除外したり、カラムの順序を変えたり、”Data Type”をstring,long,intなどに変えたり、新しいカラムを追加してそこに出力させる(カラム名を変える時に使えそうです)などができます
スクリーンショット 0029-12-28 10.08.35.png

次にサマリがでますので問題なければ”Finish”をクリック

ジョブの実行

作成したジョブにチェックを入れ、”Action”から”Run job”をクリック
スクリーンショット 0029-12-28 10.27.38.png

数分待って以下のように”Run status”が”Succeeded”となれば問題なく完了しています。
※問題があれば、”Error”の箇所にエラーの概要、”Logs”、”Error logs”の箇所の出力がリンクになっていてクリックするとCloudWatch logsに移動します。GlueのログはデフォルトでCloudWatch logsに出力されます
スクリーンショット 0030-01-02 15.46.47.png

出力したParquetフォーマットのファイルを、ソースデータと同様にクローラーを使ってスキーマを作り、スキーマオンリードでAthenaクエリを実行してみます。クローラー作成手順は前回と同様なので割愛します。
クローラーにより自動作成されたスキーマ
スクリーンショット 0030-01-02 15.49.41.png

クエリ結果です。データ量が少なくselectしてるだけなのでアレですが、parquetになったので列単位での集計処理などが高速化されるデータフォーマットに変換ができました。
スクリーンショット 0030-01-02 15.51.00.png

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”くらいであればGlueはGUIだけでサーバーレスでできます。

出来上がったPySparkスクリプト確認

se2_job0.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ビルトイン変換のいくつか補足

ApplyMapping:ソースの列とデータ型をターゲットの列とデータ型にマップします

ResolveChoice:複数の型の値が含まれている場合の列の処理方法を指定します。列を単一のデータ型にキャストするか、1つ以上の型を破棄するか、またはすべての型を別々の列または構造体に保持するかを選択できます
make_structは構造体を使用してデータを表現することにより、潜在的なあいまいさを解決します。たとえば、列のデータがintまたはstringの場合、make_structアクションを使用すると、生成されたDynamicFrameにintとstringの両方を含む構造体の列が生成されます。
choiceはspecが空の場合のデフォルトのresolution actionです

DropNullFields:nullフィールドを削除します

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

ジョブを作る際に似たようなジョブを作りたい、テスト段階でジョブのパラメータを一部だけ変えたジョブを作りたいことあると思います。現在はジョブのコピーがGUIからはできないのでそのあたりの運用を考慮する場合はCLIの利用がおすすめです。またあとで書きます。

こちらも是非

似た内容ですがより丁寧にかかれています。安定のクラメソブログ
https://dev.classmethod.jp/cloud/aws/aws-glue-released/
https://dev.classmethod.jp/cloud/aws/aws-glue-tutorial/

Built-In Transforms
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/built-in-transforms.html
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-ApplyMapping.html

続きを読む

Amazon Athenaではじめるログ分析入門

はじめに

Amazon AthenaはAWSの分析関連サービスの1つで、S3に保存・蓄積したログに対してSQLクエリを投げて分析を行えるサービスです。分析基盤を整えたり分析サービスにログを転送したりする必要が無いため、簡単に利用できるのが特長です。

今回はAthenaを使ってこんなことできるよー、というのを紹介したいと思います。

※社内勉強会向け資料をQiita向けに修正して公開しています

ログ分析とAmazon Athena

ログ分析は定量的にユーザ行動を分析してサービスの改善に役立つだけでなく、障害時の調査にも役立つなど非常に便利です。ログ分析に利用されるサービスとしてはGoogle BigQueryやAmazon Redshiftなど様々なものがありますが、その中でAmazon Athenaの立ち位置を確認したいと思います。

ログ分析の流れ

ログ分析の基盤の概念図は下記のようになります。

Screen Shot 2017-12-11 at 17.22.23.png

この図からわかるように、考えることは多く、どのようなログを出すのか、どのように分析用のDBやストレージにデータを移すのか(ETL)、分析エンジンは何を使うのか、可視化のためにどのようなツールを利用するかなどの検討が必要です。

また初期は問題なく動作していてもデータ量が増えるうちに障害が起きたりログがドロップしてしまうなど多くの問題が出てきます。

そのため、多くの場合専門のチームが苦労をしながら分析基盤を構築・運用しています。

Amazon Athenaとは

Amazon Athenaはサーバレスな分析サービスで、S3に直接クエリを投げることができます。AWS上での分析としてはEMRやRedShiftなどがありますが、インスタンス管理などの手間があり導入にはハードルがありました。

Athenaでは分析エンジンがフルマネージドであり、ログ収集の仕組みやサーバ管理の必要がなく分析クエリを投げられます。

Screen Shot 2017-12-11 at 17.22.30.png

RDBのテーブルなどとのJOINは(データをS3に持ってこないと)できないためあくまで簡易的な分析にとどまりますが、ログを集計するというだけであればとても簡単に分析の仕組みが構築できます。

Screen Shot 2017-12-05 at 14.28.56.png

料金について

BigQueryなどと同じクエリ課金です。一歩間違えると爆死するので注意は必要です。

  • ストレージ

    • S3を利用するのでS3の保存料金のみ
  • クエリ
    • スキャンされたデータ 1 TB あたり 5 USD
  • データ読み込み
    • S3からの通常のデータ転送料金。AthenaとS3が同一リージョンなら転送料金はかからない。

使ってみる – ELB(CLB/ALB)のログを分析

Athenaのユースケースとして一番簡単で有用なのはELBのログ分析だと思っています。ここでは簡単に利用手順を紹介します。

1. S3バケットを作成し適切なアクセス権を付与

ドキュメントを参考にS3のバケットを作成し、アクセスログを有効化すると、15分毎にアクセスログが出力されます。

http://docs.aws.amazon.com/ja_jp/elasticloadbalancing/latest/application/load-balancer-access-logs.html#enable-access-logging

2. Athena でテーブルを作る

公式ドキュメントにそのまま利用できるクエリが載っているので、AWSマネージメントコンソールからAthenaのQueryエディタを開きこれを実行します。

http://docs.aws.amazon.com/athena/latest/ug/application-load-balancer-logs.html

3. クエリを実行

Athenaの実行エンジンPrestoは標準SQLなので通常のRDBMSへのクエリのように実行できます。

SELECT count(*)
FROM alb_logs

使ってみる – ELBログから様々なデータを取得する

ELBのログに出力されている内容はそこまで多くないものの、意外と色々取れたりします。

1. あるエンドポイントの日別アクセス数を調べる

SELECT date(from_iso8601_timestamp(time)),
         count(*)
FROM default.alb_logs
WHERE request_url LIKE '%/users/sign_in'
        AND date(from_iso8601_timestamp(time)) >= date('2017-12-01')
GROUP BY  1
ORDER BY  1;

2. 直近24時間の500エラーの発生数を調べる

SELECT elb_status_code,
         count(*)
FROM default.alb_logs
WHERE from_iso8601_timestamp(time) >= date_add('day', -1, now())
        AND elb_status_code >= '500'
GROUP BY  1
ORDER BY  1;

3. レスポンスに1.0s以上時間がかかっているエンドポイント一覧を出す

SELECT request_url,
         count(*)
FROM alb_logs
WHERE target_processing_time >= 1
GROUP BY  1
ORDER BY  2 DESC ;

4. ユーザ単位の行動ログを出す

Cookieは出せないのでIPから。連続したアクセスだとポートも同じになるのでそこまで入れても良い。

SELECT *
FROM alb_logs
WHERE client_ip = 'xx.xxx.xxx.xxx'
        AND timestamp '2017-12-24 21:00' <= from_iso8601_timestamp(time)
        AND from_iso8601_timestamp(time) <= timestamp '2017-12-25 06:00';

5. あるページにアクセスした後、次にどのページに移動しているかを調べる

3回joinしているのでちょっとわかりづらい。これもIPがユーザー毎にユニークと仮定しているので正確ではない。

SELECT d.*
FROM 
    (SELECT b.client_ip,
         min(b.time) AS time
    FROM 
        (SELECT *
        FROM alb_logs
        WHERE request_url LIKE '%/users/sign_in') a
        JOIN alb_logs b
            ON a.time < b.time
        GROUP BY  1 ) c
    JOIN alb_logs d
    ON c.client_ip = d.client_ip
        AND c.time = d.time
ORDER BY  d.time

運用してみる

Athenaについて何となくわかってもらえたでしょうか。次に、実際に運用するときのためにいくつか補足しておきます。

パーティションを切る

ログデータはすぐにTB級の大きなデータとなります。データをWHERE句で絞るにしてもデータにアクセスしないことには絞込はできませんので、Athenaは基本的には保存されている全てのデータにアクセスしてしまいます。

これを防ぐためにパーティションを作って運用します。パーティションを作成するにはCREATE TABLEでPARTITIONED BYでパーティションのキーを指定しておきます。

CREATE EXTERNAL TABLE IF NOT EXISTS table_name (
  type string,
  `timestamp` string,
  elb string,
  client_ip string,
  client_port int,
  target_ip string,
  target_port int,
  request_processing_time double,
  target_processing_time double,
  response_processing_time double,
  elb_status_code string,
  target_status_code string,
  received_bytes bigint,
  sent_bytes bigint,
  request_verb string,
  url string,
  protocol string,
  user_agent string,
  ssl_cipher string,
  ssl_protocol string,
  target_group_arn string,
  trace_id string )
 PARTITIONED BY(d string)
 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' WITH SERDEPROPERTIES (  'serialization.format' = '1',
  'input.regex' = '([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([.0-9]*) ([.0-9]*) ([.0-9]*) (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) ([^ ]*) (- |[^ ]*)\" ("[^"]*") ([A-Z0-9-]+) ([A-Za-z0-9.-]*) ([^ ]*) ([^ ]*)$' )
LOCATION
 's3://bucket_name/prefix/AWSLogs/123456789012/elasticloadbalancing/ap-northeast-1/'

実際のパーティションの構築るには2種類の方法があります。

http://docs.aws.amazon.com/athena/latest/ug/partitions.html

1.ファイルの保存パスにパーティションの情報を入れる(Hiveフォーマット)

s3のkeyを例えば s3://bucket_name/AWSLogs/123456789012/d=2017-12-24/asdf.logという形で保存します。

この状態で次のコマンドを実行すると自動でパーティションが認識されます。

MSCK REPAIR TABLE impressions

2.ALTER TABLEを実行する

ELBのログなどAWSが自動で保存するログは上記のような形式で保存できないので、直接パーティションを作成します。

ALTER TABLE elb_logs
ADD PARTITION (d='2017-12-24')
LOCATION 's3://bucket/prefix/AWSLogs/123456789012/elasticloadbalancing/ap-northeast-1/2017/12/24/'

これらを毎日実行するようなLambdaなどを作成して運用することになります。

BIツールを利用する

Athenaを実行するだけだと生データが手に入るだけなので、必要に応じてグラフにしたりといったことが必要になるかと思います。

ExcelやGoogle SpreadSheetでも良いですが、BI(Business Intelligence)ツールと呼ばれるものを使って定期的なクエリ実行やその可視化、通知などを実現できます。

Athenaに対応しているものではAWSのサービスの一つであるQuickSightやRedashがあります。

列指向フォーマットについて

データ量が増大してくると、クエリの実行時間が増えると同時にお金もかかるようになってきます。そんな時に利用を検討したいのが列指向フォーマットです。

列指向フォーマットでは列単位でデータを取り出せるため、JOINやWHEREを行う際にすべてのデータを取り出す必要がありません。そのため、高速にかつ低料金でクエリを実行できます。

通常のログは行単位で出力されているため、あらかじめ変換処理を行う必要があります。これにはEMRを利用する方法がAWSで解説されているので、大量データに対して頻繁にクエリを行う際は利用を検討してみてください。

https://aws.amazon.com/jp/blogs/news/analyzing-data-in-s3-using-amazon-athena/

アプリケーションのログを分析する(CloudWatch Logsの場合)

ここまではELBのログでここまでできる的な内容でしたが、実際にはアプリケーションが出力したログを利用したくなります。とにかくS3に集めれば良いのでFluentdなどで集めればOKです。

ただ最近はECSやElastic Beanstalkを使っているとCloudWatch Logsに集約しているケースも増えてきているのではないかと思います。この場合S3に持っていくのが微妙に手間になってきます。CloudWatch LogsではS3にログをエクスポートできますが、通常では特定のログをフィルタをして出力したいと思うので、少し工夫が必要になります。

例えば下記のような構成です。

Screen Shot 2017-12-11 at 17.43.23.png

こちらについては https://aws.amazon.com/jp/blogs/news/analyzing-vpc-flow-logs-with-amazon-kinesis-firehose-amazon-athena-and-amazon-quicksight/ この記事などが参考になります。

おわりに

Amazon Athenaの使い方について自分の持っている知識をまとめてみました。Athenaの利用までの簡単さはログ分析の導入としては非常にハードルが低く、とても有用だと思っています。

他の分析サービスとどう違うのかとかは難しいところですが、AthenaはManagedなPrestoであってそれ以上ではなく、EMRやRedshiftの方が上位互換的に機能が多いので、Athenaで出来ないことが出てきたら他を使うとかでも良いのかなぁと思っています。(ここは詳しい人に教えて貰いたいところ。。)

S3 Selectという機能も出てきてS3ログの分析が更に柔軟になっていく予感もありますのでその導入としてのAmazon Athenaを触ってみてはいかがでしょうか。

注意事項

  • ELBログについて

    • ベストエフォート型のため、全てのログの取得は保証されていません
    • ELBのログはUTCで保存するのでJSTの日付とはパーティションがずれます
    • ログは15分ごとにまとめられるので 23:45〜23:59 頃のログは翌日のパーティションに入ってしまいます

参考資料

続きを読む

AWS AthenaでBigデータ解析

WHY Athena

複数社のASPで吐き出されたフォーマットばらばらの膨大なcsvデータ達を結合しまくってデータ解析せなあかん時ってありますよね

  • いちいちDBサーバー作ってインポートしてらんない
  • いちいちフォーマット合わせてらんない
  • ExcelでVLOOKUPとか処理重すぎてクライアント動かない

そこでAthena

Analytics_AmazonAthena.png

WHAT Athena

  • サーバーレスで環境構築不要
  • S3にファイルを上げてAthenaのコンソール画面でSQL叩くだけで結果出力
  • Prestoベースなので重たいクエリの結果が早い
  • 出力されたデータはcsvでDL可能
  • S3の標準の金額とAthenaのクエリ従量なので安い(クエリのスキャン 1 TB あたり 5 USD)

いいことだらけ

Saving Point

リージョン

実行タイミングが毎日何百回とかでなければS3のリージョンは 米国東部 (バージニア北部) か 米国西部 (オレゴン) がオススメ

Athenaはどのリージョンでも金額は変わらない

スクリーンショット_2017-12-12_14_48_47.jpg

データ圧縮

本データがGZIP圧縮されてればその分スキャンデータが少なくなるので安くなる

列のチューニング

元データに不要な列はなるべく消すこと、消せば消した分だけスキャンデータが少なくなるので安くなる

列指向データ形式(Apache Parquet や Apache ORC)に形式を変換すれば不要な列を読まなくなるのでこれも安くなる

列の削除はExcelやエディタでの編集も良いが、AWS Glueを使うと良い

AWS Glue

Athenaの作業がもっと楽になるやつ
S3の本データをの変換が素早く出来る+Athena用のデータ形式に変換してくれる
他社都合で勝手にデータの列形式とか変わっちゃった時にお役立ち
東京リージョンデビューはもう少しかかる模様なので 米国東部 (バージニア北部) か 米国西部 (オレゴン) で

詳しい使い方はクラスメソッドさんのところ

パーティション

テーブル作成時にパーティション設定することで、クエリの条件に日付関連の方法を入れると、勝手に条件外のデータを読まなくなります

日毎とか週毎、月毎などに分類されるデータはパーティション設定するといいでしょう

パーティションにするにはS3に上げる時にディレクトリを dt=ほにゃらら にしてCREATE文に PARTITIONED BY って書けばいいです

こんな感じ

yakiniku.osushi.sql
CREATE EXTERNAL TABLE IF NOT EXISTS yakiniku.osushi (
  `name` string,
  `media_name` string,
  `media_type` string,
  `device` string,
  `carrier` string,
  `click_date` timestamp,
  `regist_date` timestamp,
  `user_id` string,
  `session_id` string 
) PARTITIONED BY (
  dt string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://yakiniku-tabetai/osushi-tabetai/';

パーティション用のディレクトリを新たに作った場合は、テーブルに認識する必要があるので、Load partitionsしましょう

スクリーンショット_2017-12-12_14_28_15.jpg

Trouble Shooting

クエリは262144バイト以内

クエリが長すぎだとこんな風に怒られます、修正しましょう

Your query has the following error(s):

Your query has exceeded the maximum query length of 262144 bytes. Please reduce the length of your query and try again. If you continue to see this issue after reducing your query length, contact customer support for further assistance. (Service: AmazonAthena; Status Code: 400; Error Code: InvalidRequestException; Request ID: 1effab6b-df02-11e7-820b-c99df83a2b03)

稀にクエリが通らない

最近は頻度は減りましたが、並行で重たいクエリを投げてると稀にエラーになります

サーバレスアーキテクチャ上Lambdaみたいなもんだと思っているので、負荷によるエラーはないと思っているのですが、そういうときは時間を追いて再度実行するとクエリが通ります

S3 SELECTでいいんじゃね?

S3 SELECTはAWS re:Invent 2017で発表されたサービスで、S3にあるデータファイルを直接SELECT分で出力できるみたいです

今はプレビュー版ですが、AWSから許可してもらったので試した所、group by、order by 使えない等の制約があったため使っていません

あとCLI慣れてないと他にも躓くところがあるかもです

ただ、今後今回のようなケース(他社ASPからデータDLして分析)はS3 SELECTで十分になるかもしれません

あくまでAthenaはCLI苦手な人用となるのかも

おわり

続きを読む

EMRクラスターを起動するシェルスクリプト

… test-cluster” aws emr create-cluster –name test-cluster –ami-version 3.4.0 –applications Name=Hive Name=Hue –instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=c3.xlarge –use-default-roles … 続きを読む

BigQuery vs Athena vs RedShift vs Hive

はじめに

最近業務でAWS, GCPを選定する機会があったので,クエリエンジン周りの検証内容をざっとまとめます.
ちなみに筆者はクラウドにわか勢なので,間違っている点などあるかもしれません.詳しい方は笑って受け流してコメント欄等でご指摘いただけると幸いです.
ちなみに最終的にはGCS + BigQueryに決めたので,これらを若干贔屓目に書いているかも…

検証日時

2017/07/25なのでちょっと変わっているところもあるかも.

比較対象

  • BigQuery (GCP)
  • Athena (AWS)
  • RedShift (AWS)
  • Hive (オンプレOpenStack)

比較観点

  • オブジェクトストレージの料金
  • 料金
  • 入出力に使えるデータのフォーマット
  • Schemaの検出
  • パフォーマンス

ただし,全項目に対して比較対象4つ全てを比較したわけではありません.その辺りも筆者のさじ加減で決めてます.

想定データ

ちなみに今回想定するデータの概要は以下の通りです.
(手っ取り早く済ませたかったのでそこまで大きなデータではやってません)

フォーマット CSV (各ファイルヘッダ行付き)
ファイル数 320
ファイルサイズ 約140MB
合計ファイルサイズ 約45GB

オブジェクトストレージの料金

お金は大事ですね. まずはストレージの料金から比べます.
料金は各種操作にかかりますが,特にドミナントなのはストレージ代と転送量だと思うのでこれ以外はめんどくさいのでスルー.

料金系は変動が激しそうなので,まずリンクを貼ります.

とりあえず検証時点では,こちらの想定するデータ量で以下の通りでした. (東京リージョン)

  • S3

    • Storage料金: $0.025 / GB
    • 転送料金: $0.14 / GB
  • GCS
    • Storage料金: $0.023 / GB
    • 転送料金: $0.14 / GB

よっぽどデータが大きくない限り大した差にはならないので,決め手にはならないでしょう.
後日談の方でも少し述べますが,ストレージの機能としては若干S3の方が優れている気がしたので,若干高いのもまぁ妥当だと思います.

料金

こちらもまずはリンクを.

大きく違うのはAthena, BigQueryはサーバレスでクエリ課金,RedShiftはクラスタを常時立ち上げておくタイプで時間課金という点です.検証した時点での料金は以下の通りでした.

  • Athena, BigQuery: $5 / TB (スキャン量)
  • RedShift: dc2.large 1台辺り $0.314 / hour

最初に記載したデータ量を1ヶ月毎日受け取ると想定して試算すると,
0.045 TB × 30日 × $5 = $6.75

一方RedShiftは上記1台のクラスタでも
$0.314 × 24時間 × 30日 = $226.08

とかなりの差がつきます.

RedShiftはBIツールのバックエンドなど,かなり頻繁にクエリをかけるワークロードと併用するというような条件下でないと優位性は出にくいのではないかと思いました. 機械学習のデータの前処理や,アドホックにクエリをかけたりというような用途で使う程度ならおそらくAthena, BigQueryの方が安くつくことが多いのではないでしょうか.

入出力に使えるデータのフォーマット

用途によりますが,データパイプラインの一部として利用する場合は,フォーマット次第でデータサイズの効率,処理の効率もかなり変わってくると思います.ということで入出力に使えるデータのフォーマットも比較しておきます.

Athena

入力 (残念ながら調査時点であったリンクが無くなっていた…)

厳密にはもう少しありますが,ファイルフォーマットという意味では下記の4つです.

  • JSON
  • CSV/TSV
  • Parquet
  • ORC

ざっくり説明するとAthenaはS3上のファイルに対しExternal Table(データの実体がクエリエンジンの外側にあるテーブル)としてクエリをかけられるPrestoです.テーブルにロードすることなく使えますが,クエリはS3のファイル集合を直接見に行くことになるので,最初からクエリに効率のよいフォーマット(上記でいうとParquetとORC)にして配置するのがベターです.これらのフォーマットはデータの圧縮効率もよく,ストレージコストも下げられますし,カラムナーの性質からSELECTでカラムを選択的にREADする際のスキャン量も減るので,クエリのコストも下げられます.

出力

  • CSV

RedShift

RedShiftは料金の時点で選択肢から消え気味だったのであまり細かく見ていないですが使えるフォーマットは下記の通りです.

入力

  • CSV
  • Avro/TSV
  • JSON

出力

  • CSV/TSV

厳密にはもう少しいろいろできそうですが,実現可能なメジャーなファイルフォーマットという意味ではCSV/TSVのみです.

BigQuery

入力

  • Avro
  • JSON
  • CSV

BigQueryの方はAthenaと似ていますが,こちらはBigQuery上にデータをロードして使うこともできます.
入力としてカラムナーフォーマットのファイルは取れませんが,テーブルにロードする際に,効率のよいカラムナーフォーマットに変換しているようです.(中の状態まで詳しく知りたい方はこちらの論文を参照.なかなか面白いです)

Athenaと同様にGCS上のデータをExternal Tableで参照してクエリをかける機能もあるにはあるのですが,US, EUリージョンのバケットでないとちゃんとクエリ結果を出力できないようでした.どのみちExternal Tableでは,クエリの効率もよくないですし,手間を惜しまずちゃんとテーブルにロードして使った方がいいです.料金が気になるかもしれませんが,ロードしたデータには生存期間が設定できますし,カラムナーにすることでスキャン量が削られるのでクエリ料金の方は下げられます.

出力

  • Avro
  • JSON
  • CSV

Apache Arrowも少し話題に上がってきているので,Parquet出力もあると嬉しかった.

Hive

Hiveも案外いろいろ使えますね.

入力

こちらも合わせて参照
– Avro
– CSV/TSV
– JSON
– Parquet
– ORC

出力

  • Avro
  • CSV/TSV
  • Parquet
  • ORC

以外にも入出力フォーマットではHiveが最強です.
次点でBigQuery.カラムナーの対応がないのが惜しいです.
AWS系2つでは出力がCSV/TSVしかない点で,ちょっと弱めです.
とはいえCSV/TSVで問題になることも少ないので,ここもそこまで決め手にはならないかもしれません.

Schemaの検出

アドホックにクエリをかけたいようなケースで自分でテーブルを手入力で作るのは結構大変です.
特にフィールド数が数百,数千とあったりすると正直心が折れます. なんらかのスキーマ検出機能を自作することになるかもしれません.
ということでスキーマ検出周りも少し調べました.

  • Athena

    • 特になし,事前に自分でテーブル定義を行う必要がある.
  • RedShift:
    • こちらも特になし,Athenaと同様事前に定義が必要
  • BigQuery:
    • CSVはヘッダ行,Avroはファイルの先頭にスキーマを記述しておくと自動で解決してくれる.
    • さらにAvroは後方互換がある組み合わせならスキーマが異なっても自動で解決してくれました.
  • Hive:
    • 特になし,自分で定義する必要がある.

ここは完全にBigQuery1強でした.
特にAvroは標準ライブラリでファイルに書き出す分には自動でファイルの先頭にスキーマをつけてくれているので,そこまで先頭のスキーマの制約も気にならずに使えると思います.

パフォーマンス

やはり一番気になるのはパフォーマンスでしょう.最初に記述したデータを想定して以下のようなクエリで実行時間,スキャンされたデータ量を計測しました.

クエリ1. 条件に合うレコード数を数える
クエリ2. 条件に合うレコードの集合と,条件に合わないレコードの集合から適当にサンプリングした集合の和集合を返す

Athena

クエリ1,クエリ2は具体的には次の通りです.

クエリ1

SELECT COUNT(*) FROM samples WHERE length(column1) != 0

クエリ2

 (SELECT * FROM samples WHERE length(column1) = 0 AND random() < 17000000.0 / 352824342.0)
 UNION ALL
 (SELECT * FROM samples WHERE length(column1) != 0)

結果

クエリ 実行時間 (sec) スキャンしたデータサイズ (GB)
クエリ1 7 sec 42.17 GB
クエリ2 115 sec 84.44 GB

RedShift

RedShiftは dc1.large 1台で計測していました.
実際はクラスタのサイズを大きくすればもっと速くなるはずですが,既にお値段にげんなりしていたので,1台のパフォーマンスしか見ていません.
ちなみにこの構成だとデータのロード自体に33分かかっていた…

クエリ1

SELECT COUNT(*) FROM samples WHERE length(column1) != 0

クエリ2

UNLOAD ('
  (SELECT * FROM samples WHERE length(column1) != 0) UNION ALL
  (SELECT * FROM samples WHERE length(column1) = 0 AND random() < 17000000 / 352824342 )
') TO 's3://sample-bucket/redshift.csv'
WITH CREDENTIALS 'aws_access_key_id=<access_key>;aws_secret_access_key=<secret_key>'
DELIMITER AS ','
PARALLEL ON;

結界 (クエリ課金でないのでスキャン量は見てません)

クエリ 実行時間 (sec)
クエリ1 7 sec
クエリ2 355 sec

RedShift使うならかなりお金積まないとキビシイ…

BigQuery

BigQueryはデータをロードしてからクエリをかけています.
ちなみにデータロードは1分程度でした.

クエリ1

SELECT COUNT(*) FROM samples WHERE length(column1) != 0

クエリ2

SELECT * FROM
    (SELECT * FROM `dataset.samples` WHERE column1 IS NULL AND  RAND() < 17000000 / 352824342),
    (SELECT * FROM `dataset.samples` WHERE column1 IS NOT NULL)

結果

クエリ 実行時間 (sec) スキャンしたデータサイズ (GB)
クエリ1 2 sec 0.0126 GB
クエリ2 26 sec 53.1 GB

AthenaはS3からExternalなクエリ, BigQueryはテーブルにロードした上でのクエリなので若干フェアではありませんが,ロード時間を合わせてもBigQueryが最速でした.特に一旦テーブルに読み出してから何回もクエリをかけるような場合だとパフォーマンスはBigQuery1強といえる結果でしょう.

総括

クエリエンジンに関しては,とりあえず自分の想定する用途では,ほぼBigQuery1強状態でした.
速いしスキーマも自動検出してくれてロードも楽だし,いろんな入出力フォーマットに対応しています.

オブジェクトストレージは料金的にはS3もGCSも同じくらいになるのですが,やっぱりGCPは全体的にACLが弱いです.
特に気になった点として,GCSでは最小でもバケットという粒度でしかACLを設定できず,S3のようにPrefixベースでのアクセス権限の設定ができません.
オブジェクトストレージではまだまだS3に分があると言えます.

個人的な見解ですが,ざっくりまとめると,データを厳格に管理することを重視するならS3 + Athena
ACLの弱さに目を瞑ることができるならGCS + BigQueryの採用をオススメします.

最後に

かなり雑な検証でしたが,オブジェクトストレージ, クエリエンジンの観点からAWS, GCPの比較をまとめました.これからクラウドの選定を行う方々の一助となれば嬉しいです.

また,個人の見解を述べている箇所も多々あるので,間違い等あればコメント欄でご指摘いただけると幸いです.

続きを読む

Redshift spectrumでnginxのログ解析をする

最近東京でも利用できるようになったRedshift spectrumを使ってみた.
やりたいこととしては以下の通り.

  • 適当なnginxのログがS3に溜まっているとする
  • 形式は改行区切りのjson
  • それをRedshift spectrumから触れるようにしたい

Redshift spectrumとAthena

これは最初に知りたかったことなのだが,Redshift spectrumで利用するRedshiftの外部データテーブルはデフォルトではAthena内に作成されている
http://docs.aws.amazon.com/ja_jp/redshift/latest/dg/c-spectrum-external-schemas.html

つまり,

  • AthenaでS3のデータを参照するテーブルを作ってそれをRedshiftの外部データテーブルに指定すること
  • Redshift内でCREATE EXTERNAL TABLE して外部テーブルを作成・管理すること

の2つは,ほとんど同じことをしているといえる.

テーブルをRedshift内で作成・管理したければRedshift内でCREATE EXTERNAL TABLE すればいいし,Athenaの画面からポチポチしたい場合はAthena側で作れば良い.そこに大きな差はなかった.

最初ここがわからずに,迷子になっていた.

jsonのデータを読み込む

Athenaでjsonのデータを読み込む

というわけで,まずはAthenaでデータを読めるようにしたい.

jsonはAthenaで扱う上ではそこまで効率の良いフォーマットではないが,とりあえず読めるところまで行きたいので気にせずにjsonのまま読ませる(もっと高速なColumn-orientedなフォーマットについては後述).
https://qiita.com/moaikids/items/e91b1bcb17458d865beb

スクリーンショット 2017-11-21 15.39.02.png

重要なこととしてパーティションがある.パーティションを切っておかないとAthenaは対象S3ディレクトリの下をフルスキャンすることになり,データ量によってはかなりの時間がかかってしまう.

というわけでパーティションを切るわけだが,Athenaは裏側でHiveが動いており,Hiveのフォーマットに則ったディレクトリ構成にしておくと非常にパーティションを作りやすい.
https://dev.classmethod.jp/cloud/aws/athena-partition-reinvent/

s3://akira-playground/nginx/dt=2017-11-21-14

というようなディレクトリ構成にしておくと,parition keyをdtにするだけでパーティションを作ってくれる.

スクリーンショット 2017-11-21 15.37.40.png

なので,できるだけこの形式でログを保存しておこう.

もう一点補足がある.
timestampを保存するときにdatetime型を指定すると思うのだが,timestampの形式はHiveのdatetime形式である必要がある.
yyyy-mm-dd hh:mm:ss[.fffffffff] というような形式に収めて置くと,datetime型として検索できる.

スクリーンショット 2017-11-21 15.42.26.png

パーティションを設定した場合は,先にLoad partitionしてMSCK REPAIR TABLE を流す必要がある.

Redshift spectrumで触りたい

と思ってRedshiftからAthenaのテーブルを参照してみようと思った.

https://dev.classmethod.jp/cloud/aws/amazon-redshift-spectrum-with-amazon-athena/

ERROR: External Catalog Error: Unsupported file format. org.apache.hadoop.mapred.TextInputFormat org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat org.openx.data.jsonserde.JsonSerDe

おかしい,エラーが出る.

たしかにRedshift spectrumはjsonに対応していない.

https://dev.classmethod.jp/cloud/aws/amazon-redshift-spectrum-is-released/
https://www.gixo.jp/blog/10094/

結局jsonについての明確な言及記事は今のところ見つからなかった.
ただ,エラーから察するに,やっぱりまだspectrumではjsonを読むことは出来ないらしい,たとえAthenaのテーブルを参照していたとしても

諦めてparquetにする

ちなみに何度調べてもすぐに読み方を忘れるのだが,「パーケイ」と読むらしい.
parquetの詳細についてはこちらを参照してもらうとして.
http://labotech.dmm.com/entry/2015/09/08/1642

変換する

json -> parquetを行う.
embulk等を使っても良いのだが,Redshift spectrumを使うようなケースを考えると,日常的に大量のログ変換が必要になる気がしていて,Glueを使ってみた.
https://qiita.com/hideji2/items/85747e3d66026045614d

ちなみに,まだ東京には来ていないので,us-eastのGlueを使うしかなかった.

大体上記の記事と同じ作り方で変換できた.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext
from datetime import datetime, timedelta, tzinfo

sc = SparkContext()
glueContext = GlueContext(sc)
sqlContext = SQLContext(sc)

# datetime.nowで取ってきた日時をJSTにしたいがために用意しておく
class JST(tzinfo):
    def utcoffset(self, dt):
        return timedelta(hours=9)

    def dst(self, dt):
        return timedelta(0)

    def tzname(self, dt):
        return 'JST'

now = datetime.now(tz=JST())
target_date = now - timedelta(hours=1)
date_str = target_date.strftime("%Y-%m-%d-%H")

## @params: [IN_PATH, IN_PATH]
args = getResolvedOptions(sys.argv, ['IN_PATH', 'OUT_PATH'])


# 引数でもらったディレクトリ内は日時でパーティションされたディレクトリが並んでいる
in_path  = args['IN_PATH'] + "dt=" + date_str + "/*.json"
# 出力先も日時パーティションで出力する
out_path = args['OUT_PATH'] + "dt=" + date_str + "/"
print(in_path)
print(out_path)

# http://qiita.com/ajis_ka/items/e2e5b759e77933b08687
sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
# http://tech-blog.tsukaby.com/archives/1162
sc._jsc.hadoopConfiguration().set("spark.speculation", "false")

sqlContext = SQLContext(sc)

jsonDataFrame = sqlContext.read.json(in_path)

jsonDataFrame.write.mode("overwrite").format("parquet").option("compression", "snappy").mode("overwrite").save(out_path)

こいつをETLのTriggersで毎時実行にしてやると,毎時のパーティションでparquetのログをS3に保存できる.

Athenaでparquetのデータを読み込む

Glueで変換したparquetのデータは,jsonの時と同じくHiveのパーティションに沿ったディレクトリ構成にしてあった.
そのため,だいたいjsonのときと同じ手順でいけた.形式がparquetになるだけ.

スクリーンショット 2017-11-21 15.48.18.png

Redshift spectrumでparquetのデータを読む

Schemaを作る.

sample=# create external schema spectrum
sample-# from data catalog
sample-# database 'akira_playground'
sample-# iam_role 'arn:aws:iam::123456789012:role/mySpectrumRole';
CREATE SCHEMA

テーブルは先程Athenaにnginx_parquetとして用意してあるので,ここで作成する必要はない.

sample=# select count(status) from spectrum.nginx_parquet where dt='2017-11-21-11';
 count
-------
   360
(1 行)

無事こんな感じでspectrumすることができた.

まとめ

Redshift spectrumとAthena

https://qiita.com/moaikids/items/e91b1bcb17458d865beb#%E5%80%8B%E4%BA%BA%E7%9A%84%E3%81%AA%E3%81%BE%E3%81%A8%E3%82%81
こちらでも言われているとおり,spectrumを使うこと前提に立つと,Athenaはそれ単体による解析ツールというより,S3のデータをRedshift内のデータと結びつけ,spectrumしやするする補助ツールっぽい位置づけになる.

もちろん,計算リソースとしてどちらを使うか,どちらのほうが速いかは,Redshiftに普段からどのくらい課金しているかによるとは思うが.

Glueべんり

2017年11月21日現在,jsonはそのままではRedshift spectrumから触ることはできなかった.
jsonを触るなら今まで通りのAthenaを使うしかない.

かといってnginx等のアプリケーションから吐き出すログは今まで通りjsonにするしかないし,たとえparquetに変換するにしても1時間とか1日分とか,まとまった単位で変換しておかないとパフォーマンスは出ないだろう.

だからこそ,Glueはとても便利だった.

一度jsonをparquetに変換してしまえばRedshift spectrumするのもかなり速くなって良いので,長期間溜めたログはparquetに変換すると良いかもしれない.

Glue早く東京に来て欲しい.

続きを読む