コピペで使えるELBのアクセスログ解析による事象分析 (ShellScript, Athena)

アクセスログ解析

ELBのアクセスログの事象分析について、ShellScriptとAthenaを用いた実行例についてまとめます。

ShellScript

CLB

No.1 : レスポンスが正常に受け取れていないELBのレスポンスコード毎のカウント

$ awk '$10 == "-"' * | awk '{print $9}' | sort | uniq -c

No.2 : ELBのレスポンスコード毎の数集計

$ awk '{print $8}' *.log | sort | uniq -c

No.3 : 504のレコード一覧

$ awk '$8 == 504'

No.4 : 504がどのELBノードから多く出力されているか

$ grep ' 504 ' *.log | awk '{print $3}' | sed 's/:.*//' | sort | uniq -c

No.5 : バックエンドから正常に応答が受け取れていない時

$ awk '{if (! int($5) < 0) {print $0}}' * | egrep '2018-01-2[45]'

No.6 : target_processing_time の3つの統計値(最小値、最大値、平均)と -1 の値を取った回数を表示する

$ awk '{ print $4,$8,$9,$6 }' * | sort | sed -e 's/ /!!/' -e 's/ /!!/' | awk '{if(count[$1]==0) min[$1]=100; count[$1]+=1; if(max[$1]<$2&&$2!=-1) max[$1]=$2; if(min[$1]>$2&&$2!=-1) min[$1]=$2; if($2!=-1)sum[$1]+=$2; else minus[$1]+=1;} END{for(k in count)print k,", count:",count[k],", max:",max[k],", min:",min[k],", avg:",sum[k]/count[k],", -1:",minus[k];}' | sort -k4nr

No.7 : response_processing_time の3つの統計値(最小値、最大値、平均)と -1 の値を取った回数を表示する

$ awk '{ print $4,$8,$9,$7 }' * | sort | sed -e 's/ /!!/' -e 's/ /!!/' | awk '{if(count[$1]==0) min[$1]=100; count[$1]+=1; if(max[$1]<$2&&$2!=-1) max[$1]=$2; if(min[$1]>$2&&$2!=-1) min[$1]=$2; if($2!=-1)sum[$1]+=$2; else minus[$1]+=1;} END{for(k in count)print k,", count:",count[k],", max:",max[k],", min:",min[k],", avg:",sum[k]/count[k],", -1:",minus[k];}' | sort -k4nr

No.8 : 最も多いリクエスト元のELBノードIPアドレスのリクエスト数

$ awk '{print $3}' * | awk -F ":" '{print $1}' | sort | uniq -c | sort -r| head -n 10 

No.9 : 時間毎のリクエスト数

grep中の二重引用符内は適宜日付等を入れて絞り込み

grep -r "" . | cut -d [ -f2 | cut -d] -f1 | awk -F: '{print $2":00"}' | sort -n | uniq -c

No.10 : 分単位でのリクエスト数

grep中の二重引用符内は適宜日付等を入れて絞り込み

$ grep "" * | cut -d [ -f2 | cut -d ] -f1 | awk -F: '{print $2":"$3}' | sort -nk1 -nk2 | uniq -c | awk '{ if ($1 > 10) print $0}'

No.11 : ユーザーエージェント毎のランキング

$ awk '{split($0, array, """); agent=array[4]; print agent}' * | sort | uniq -c | sort -nr | head

No.12 : TLSでクライアントが最も使った暗号スイートのランキング

$ awk '{split($0, array, """); afterUserAgent=array[5]; print afterUserAgent}' * | awk '{print $1}' | sort | uniq -c | sort -nr | head -5

No.13 : TLSでクライアントが最も使ったTLSバージョンのランキング

$ awk '{split($0, array, """); afterUserAgent=array[5]; print afterUserAgent}' * | awk '{print $2}' | sort | uniq -c | sort -nr | head

No.14 : TLSでクライアントが最も使ったプロトコルと暗号スイートのランキング

$ awk '{split($0, array, """); proto=array[1]; afterUserAgent=array[5]; print proto afterUserAgent}' * | awk '{print $1 " " $13}' | sort | uniq -c | sort -nr | head

ALB

No.1 : target_processing_time の3つの統計値(最小値、最大値、平均)と -1 の値を取った回数を表示する

 $ awk '{ print $5,$9,$10,$7 }' * | sort | sed -e 's/ /!!/' -e 's/ /!!/' | awk '{if(count[$1]==0) min[$1]=100; count[$1]+=1; if(max[$1]<$2&&$2!=-1) max[$1]=$2; if(min[$1]>$2&&$2!=-1) min[$1]=$2; if($2!=-1)sum[$1]+=$2; else minus[$1]+=1;} END{for(k in count)print k,", count:",count[k],", max:",max[k],", min:",min[k],", avg:",sum[k]/count[k],", -1:",minus[k];}' | sort -k4nr

No.2 : response_processing_time の3つの統計値(最小値、最大値、平均)と -1 の値を取った回数を表示する

$ awk '{ print $5,$9,$10,$8 }' * | sort | sed -e 's/ /!!/' -e 's/ /!!/' | awk '{if(count[$1]==0) min[$1]=100; count[$1]+=1; if(max[$1]<$2&&$2!=-1) max[$1]=$2; if(min[$1]>$2&&$2!=-1) min[$1]=$2; if($2!=-1)sum[$1]+=$2; else minus[$1]+=1;} END{for(k in count)print k,", count:",count[k],", max:",max[k],", min:",min[k],", avg:",sum[k]/count[k],", -1:",minus[k];}' | sort -k4nr

Athena

以下、全て CLB を前提とします。
また、以下のような、デフォルトで生成されている sampledb データベースの elb_logs テーブルを使用します。

CREATE EXTERNAL TABLE `elb_logs`(
  `request_timestamp` string COMMENT '', 
  `elb_name` string COMMENT '', 
  `request_ip` string COMMENT '', 
  `request_port` int COMMENT '', 
  `backend_ip` string COMMENT '', 
  `backend_port` int COMMENT '', 
  `request_processing_time` double COMMENT '', 
  `backend_processing_time` double COMMENT '', 
  `client_response_time` double COMMENT '', 
  `elb_response_code` string COMMENT '', 
  `backend_response_code` string COMMENT '', 
  `received_bytes` bigint COMMENT '', 
  `sent_bytes` bigint COMMENT '', 
  `request_verb` string COMMENT '', 
  `url` string COMMENT '', 
  `protocol` string COMMENT '', 
  `user_agent` string COMMENT '', 
  `ssl_cipher` string COMMENT '', 
  `ssl_protocol` string COMMENT '')
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.RegexSerDe' 
WITH SERDEPROPERTIES ( 
  'input.regex'='([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([.0-9]*) ([.0-9]*) ([.0-9]*) (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) ([^ ]*) (- |[^ ]*)\" ("[^"]*") ([A-Z0-9-]+) ([A-Za-z0-9.-]*)$') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://athena-examples-us-west-2/elb/plaintext'
TBLPROPERTIES (
  'transient_lastDdlTime'='1480278335');

HTTPステータスコードが200のレコード一覧

SELECT * 
FROM elb_logs
WHERE elb_response_code <> '200'
ORDER BY request_timestamp;

ELB毎のリクエスト数

SELECT elb_name,
         count(*) AS request_count
FROM elb_logs
GROUP BY elb_name
ORDER BY request_count DESC;

ELB毎のリクエスト数(期間指定)

SELECT elb_name,
         count(*) AS request_count
FROM elb_logs
WHERE request_timestamp >= '2014-01-01T00:00:00Z'
        AND request_timestamp < '2016-01-01T00:00:00Z'
GROUP BY elb_name
ORDER BY request_count DESC;

ELB毎のリクエスト数(期間+ELB 指定)

SELECT elb_name,
         count(*) AS request_count
FROM elb_logs
WHERE elb_name LIKE 'elb_demo_008'
        AND request_timestamp >= '2014-01-01T00:00:00Z'
        AND request_timestamp < '2016-01-01T00:00:00Z'
GROUP BY elb_name
ORDER BY request_count DESC;

ELB毎の5XXエラーのリクエスト数

SELECT elb_name,
         backend_response_code,
         count(*) AS request_count
FROM elb_logs
WHERE backend_response_code >= '500'
GROUP BY backend_response_code, elb_name
ORDER BY backend_response_code, elb_name;

ELB毎の5XXエラーのリクエスト数(ELB指定)

SELECT elb_name,
         backend_response_code,
         count(*) AS request_count
FROM elb_logs
WHERE elb_name LIKE 'elb_demo_008'
        AND backend_response_code >= '500'
GROUP BY backend_response_code, elb_name
ORDER BY backend_response_code, elb_name;

ELB毎の5XXエラーのリクエスト数(期間+ELB 指定)

SELECT elb_name,
         backend_response_code,
         count(*) AS request_count
FROM elb_logs
WHERE elb_name LIKE 'elb_demo_008'
        AND backend_response_code >= '500'
        AND request_timestamp >= '2014-01-01T00:00:00Z'
        AND request_timestamp < '2016-01-01T00:00:00Z'
GROUP BY backend_response_code, elb_name
ORDER BY backend_response_code, elb_name;

ELB毎の5XXエラーのリクエスト数(期間+ELB+URL 指定)

SELECT count(*) AS request_count,
         elb_name,
         url,
         elb_response_code,
         backend_response_code
FROM elb_logs
WHERE elb_name LIKE 'elb_demo_008'
        AND backend_response_code >= '500'
        AND url LIKE 'http://www.example.com/jobs/%'
        AND request_timestamp >= '2014-01-01T00:00:00Z'
        AND request_timestamp < '2016-01-01T00:00:00Z'
GROUP BY elb_name,url,elb_response_code,backend_response_code
ORDER BY request_count DESC limit 10;

ELB毎の5XXエラーのリクエスト数(期間+ELB+URL+UserAgent 指定)

SELECT count(*) AS request_count,
         elb_name,
         url,
         elb_response_code,
         backend_response_code,
         user_agent
FROM elb_logs
WHERE elb_name LIKE 'elb_demo_008'
        AND backend_response_code >= '500'
        AND url LIKE 'http://www.example.com/jobs/%'
        AND user_agent LIKE '%Mozilla/5.0%'
        AND request_timestamp >= '2014-01-01T00:00:00Z'
        AND request_timestamp < '2016-01-01T00:00:00Z'
GROUP BY elb_name,url,elb_response_code,backend_response_code,user_agent
ORDER BY request_count DESC limit 10;

送信元IPのリクエスト数ランキング

SELECT request_ip,
         url,
         count(*) AS request_count
FROM elb_logs
WHERE elb_name LIKE 'elb_demo_008'
        AND request_timestamp >= '2014-01-01T00:00:00Z'
        AND request_timestamp < '2016-01-01T00:00:00Z'
GROUP BY request_ip,url
ORDER BY request_count DESC limit 5;

日付ごとのリクエスト数

SELECT date(from_iso8601_timestamp(request_timestamp)),
         count(*)
FROM elb_logs
WHERE url LIKE '%/jobs/%'
        AND date(from_iso8601_timestamp(request_timestamp)) >= date('2014-12-01')
GROUP BY  1
ORDER BY  1;

直近1年の500エラー発生のリクエスト数

SELECT elb_response_code,
         count(*)
FROM elb_logs
WHERE from_iso8601_timestamp(request_timestamp) >= date_add('day', -365 * 1, now())
        AND elb_response_code >= '500'
GROUP BY  1
ORDER BY  1;

レスポンスに1.0s以上時間がかかっているリクエスト

SELECT url,
         count(*) AS count,
         backend_processing_time
FROM elb_logs
WHERE backend_processing_time >= 1.0
GROUP BY  url, backend_processing_time
ORDER BY backend_processing_time DESC;

任意のエントリ取得(期間+リクエスト元IP 指定)

SELECT *
FROM elb_logs
WHERE request_ip = '245.85.197.169'
        AND request_timestamp >= '2014-01-01T00:00:00Z'
        AND request_timestamp <= '2016-01-01T00:00:00Z';

あるページからの遷移先ページ傾向

SELECT d.*
FROM 
    (SELECT b.request_ip,
         min(b.request_timestamp) AS request_timestamp
    FROM 
        (SELECT *
        FROM elb_logs
        WHERE url LIKE '%/jobs/%') a
        JOIN elb_logs b
            ON a.request_timestamp < b.request_timestamp
        GROUP BY  1 ) c
    JOIN elb_logs d
    ON c.request_ip = d.request_ip
        AND c.request_timestamp = d.request_timestamp
ORDER BY  d.request_timestamp;

参考

ELB アクセスログ

Classic Load Balancer のアクセスログ – Elastic Load Balancing
https://docs.aws.amazon.com/ja_jp/elasticloadbalancing/latest/classic/access-log-collection.html#access-log-entry-syntax
Application Load Balancer のアクセスログ – Elastic Load Balancing
https://docs.aws.amazon.com/ja_jp/elasticloadbalancing/latest/application/load-balancer-access-logs.html#access-log-entry-syntax

Athena

Querying Classic Load Balancer Logs – Amazon Athena
https://docs.aws.amazon.com/ja_jp/athena/latest/ug/elasticloadbalancer-classic-logs.html
Querying Application Load Balancer Logs – Amazon Athena
https://docs.aws.amazon.com/athena/latest/ug/application-load-balancer-logs.html

Amazon AthenaでELBのログを調査するときに使ったSQL
https://dev.classmethod.jp/cloud/amazon-athena-sql-for-elb/
Amazon AthenaでELBログをSQLで解析する #reinvent
https://dev.classmethod.jp/cloud/aws/amazon-athena-sql-elb-log-reinvent/
Amazon Athenaではじめるログ分析入門
https://qiita.com/miyasakura_/items/174dc73f706e8951dbdd

続きを読む

Athenaで入れ子のjsonにクエリを投げる方法が分かりづらかったので整理する

Kinesis FirehoseでS3に置かれた圧縮したjsonファイルを、それに対してクエリを投げる、というのを検証してたのですが、Hive素人なのでスキーマの作り方もクエリの投げ方も正直あんまり良くわかってませんでした。

そこで下記を参照しながらスキーマの作成とクエリ投入をやってみて、最終的にうまくいきました。

日本語記事
https://aws.amazon.com/jp/blogs/news/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/

元記事
https://aws.amazon.com/jp/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/

ずーっと日本語記事を読みながらやっていたのですが、これがめちゃくちゃわかりづらい!!!
※理解度には個人差があります

多分知っている人が見たら何となくわかるんでしょうが、恐らくこれを見るのは自分みたいにあまり良く知らないので参考にしながら実際にやってみている、という層だと思います。
最終的に上手く行ってから思ったのは、前提知識がないと読むのがしんどい、ということですね…。
ただもう少し書いといてくれるだけで十分なのに…。
原文も軽く見ましたが、そっちにも書いてないのでそもそも記述されてません。

調べてもまだ中々情報が出てこない上に、クエリ投入時にエラーが出た場合もエラーメッセージが淡白すぎてどこが問題でエラーになってるのかさっぱりわからなくて悪戦苦闘してました。

そんなわけで、今後同じところで困る人が一人でも減るように、自分用メモも兼ねてハマったところについて補足をしておきたいと思います。

概要

リンク先で書いてあることの流れは大まかに下記のとおりです。

  1. FirehoseでSESの送信イベントログをS3に保存する
    送信イベントログはjson形式で、それをFirehoseでS3に保存しています。
  2. Athenaのテーブルを作成して、クエリを投げる
    • ただテーブル作成して投げる場合
    • 入れ子になっているjsonに対してテーブル作成してクエリを投げる場合
    • 禁止文字を含んでいるものに対してテーブルを作成してクエリを投げる場合
      わかりづらいですが、禁止文字を含む項目をマッピングする項目とクエリを投げる項目が分かれています。)
  3. hive-json-schemaの紹介
    jsonからテーブル作成のためのクエリを生成するツールっぽいのですが、紹介してるわりにちっとも使い方が書いてません…。
    使い方の解説をどなたか…。

ハマったところ

入れ子になったjsonに対するテーブル作成について

ハマったところといいつつ、自分はこの辺は割とスムーズに行ったのですが、ちょっとわかりづらいかもしれないので念のため。
サンプルにもありますが、jsonの中にまたjsonとか配列とかが入っている、みたいなケースは多くあります。
そういった場合、内部にあるjsonに対してstruct型を使って、その下の項目について型を定義してやればOKです。
その中にさらにjsonがある場合はさらにその中にstruct型で定義をすればOK。
例にあるものだと、内部にmail{~}とjsonがあり、その中にさらにいくつかのjsonがあるので、それぞれに対してstruct型で定義をしています。
以下引用(全文は貼っていないので、元はリンク先を見てください。)
※一部バッククオート(`)で囲われている項目がありますが、予約語として使われている言葉をそのまま使用するとエラーになるそうです。
そのため、バッククオートで囲うことによってエスケープしてるようです。

抜粋した入れ子の部分
 mail struct<`timestamp`:string,
              source:string,
              sourceArn:string,
              sendingAccountId:string,
              messageId:string,
              destination:string,
              headersTruncated:boolean,
              headers:array<struct<name:string,value:string>>,
              commonHeaders:struct<`from`:array<string>,to:array<string>,messageId:string,subject:string>
              > 

禁止文字そのものについて

まず、禁止文字が色々あることを最初大して理解してませんでした。
項目名(↑の例だと、timestampとかsourceとかのところ)の定義に使用できない文字があります。
記事中だと「:」(コロン)が禁止文字列なので、それがクエリ中の該当箇所に入っているとエラーになります。
あとは「-」(ハイフン)なんかも禁止文字のようです。
例えばHTTPリクエストのログを見たとき、ヘッダとかはハイフンを使った項目がいくつもあったりするので困りますよね。
一応記事中の例では両方「_」(アンダースコア)に変換しています。(コロンにしか触れてませんが…。)
最初は禁止文字があると知らず、なぜエラーになっているかわからずにハマってました。
この辺どっかにまとまってるのかな…?
どうやって回避するかというと、それがWITH SERDEPROPERTIESの部分です。

禁止文字を含む場合のマッピングの仕方について

最初見た時はなんでこんなことをするのかわかりませんでしたが、上記の通り項目名を定義するときに禁止文字が入っているとエラーになります。
なので、WITH SERDEPROPERTIESの項目で、禁止文字列を含んだ項目名を、禁止文字列のない文字列にマッピングし、元のjsonのkeyでは禁止文字列を含んでいたものに対し、テーブル上ではカラム名として別の文字列をあてがうことができます。
記事中では、コロンやハイフンをアンダースコアに変換した文字列にマッピングしています。
式の左側がカラム名に使いたい文字列で、それに対して右側がデータの元の実際の名前です。
"mapping.カラム名に使いたい文字列"="実際の名前" みたいに記述してます。

マッピングの仕方

WITH SERDEPROPERTIES (
  "mapping.ses_configurationset"="ses:configuration-set",
  "mapping.ses_source_ip"="ses:source-ip", 
  "mapping.ses_from_domain"="ses:from-domain", 
  "mapping.ses_caller_identity"="ses:caller-identity"
  )

クエリの投げ方

これもまあおまけで書いておくと、ここまでしっかりと下の項目までテーブルを定義しておくと、下の項目までクエリで引っ張ることが出来ます。
記事中では下記のような例が出ています。

元記事にある例
SELECT eventtype as Event,
         mail.timestamp as Timestamp,
         mail.tags.ses_source_ip as SourceIP,
         mail.tags.ses_caller_identity as AuthenticatedBy,
         mail.commonHeaders."from" as FromAddress,
         mail.commonHeaders.to as ToAddress
FROM sesblog2
WHERE eventtype = 'Bounce'

mail{〜}の下の項目を参照する時は上記のようにドットをつけて該当項目の名前を指定しています。
さらにその下の項目を参照する時はその後ろにさらにドットをつけています。
この辺は直感的にわかりやすいかもしれません。

おまけとかtipsとか

Firehoseで配置されたフォルダ構成ではパーティションを自動で切ってもらえない

hiveではフォルダが/bucketname/path/to/log/year=YY/month=MM/day=dd/foo
みたいな構成だと自動でパーティション設定してくれるらしいのですが、FirehoseでS3にデータ配置すると/bucketname/path/to/log/YYYY/MM/DD/fooみたいになるので、自分でパーティションを作成する必要があります。
パーティションがない状態でクエリを投げても1件も引っかかりません。
これを作るには下記のようなクエリを投げる必要があります。

elbログを対象としたテーブルにパーティションを作成する場合
ALTER TABLE database_name.table_name
ADD PARTITION (year='2016',month='08',day='28')
location 's3://elb-access-log/AWSLogs/00000000000000/elasticloadbalancing/ap-northeast-1/2016/08/28/';

※参考
https://qiita.com/r4-keisuke/items/d3d339b76d4368b6b30a

上記の例だと1日ずつパーティションを設定する必要があるのですが、
パーティション数には上限があるらしい(1テーブル20000まで)ので、1日ずつとか、1時間ずつとかフォルダ分けしている場合はちょっと注意が必要かもしれないです。
※パーティションの上限については下記
https://docs.aws.amazon.com/ja_jp/general/latest/gr/aws_service_limits.html#limits_glue
さすがに対象が多すぎとなるとしんどいので、シェルスクリプトとかで回すといいと思います。
ただ、シェルスクリプト自体も1つ1つの処理実行だとそこそこ時間かかるのと、パーティションを設定するためのクエリでクエリ履歴が埋め尽くされるのが難点です。

データ元にない項目を定義しても値がnullになるだけで問題はない

jsonの出力が一定じゃなくて、いくつかの似たような型のjsonが混ざっていたり、ものによって存在しない項目があったりしても、それらのキーを全て網羅するようにまとめて定義しちゃって問題ないみたいです。
定義したけどデータ元に項目がない場合はnullが入るだけのようで。
逆に元データにある項目を全部定義する必要はないので、元データにあっても使わないような項目はテーブル作成の段階で定義しないようにしてもいいみたいですね。

ざっと書いたので、わかりづらいとか、もっとこうすればみたいな指摘があればいただけると嬉しいです。

続きを読む

Glueの使い方的な⑤(パーティション分割してるcsvデータをパーティション分割したparquetに変換)

パーティション分割csv->パーティション分割parquet

ジョブの内容

※”Glueの使い方①(GUIでジョブ実行)”(以後①とだけ書きます)と同様のcsvデータを使います

“パーティション分割されたcsvデータを同じパーティションで別の場所にparquetで出力する”

ジョブ名

se2_job4

クローラー名

se2_in1
se2_out3

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

内容としては①と同じデータで、year,month,day,hourのパーティションごとに分けたcsvファイルを配置します。
year,month,day,hourのカラムは削除しています。

元となる①のデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

今回使う入力データ(19件)
year,month,day,hourのカラムは削除しています

$ ls
cvlog_2017111112.csv    cvlog_2017120101.csv    cvlog_2017121621.csv
cvlog_2017111414.csv    cvlog_2017120218.csv    cvlog_2017121714.csv
cvlog_2017112114.csv    cvlog_2017120904.csv    cvlog_2017121718.csv
cvlog_2017112915.csv    cvlog_2017121412.csv    cvlog_2017121908.csv
cvlog_2017113014.csv    cvlog_2017121414.csv    cvlog_2017121914.csv
cvlog_2017113020.csv    cvlog_2017121511.csv    cvlog_2017122915.csv
$ cat *
deviceid,uuid,appid,country
iphone,11121,001,JP
deviceid,uuid,appid,country
iphone,11123,009,FR
deviceid,uuid,appid,country
iphone,11119,007,AUS
deviceid,uuid,appid,country
other,11110,005,JP
iphone,11125,005,JP
deviceid,uuid,appid,country
iphone,11129,007,AUS
deviceid,uuid,appid,country
android,11122,001,FR
deviceid,uuid,appid,country
pc,11118,001,FR
deviceid,uuid,appid,country
pc,11117,009,FR
deviceid,uuid,appid,country
iphone,11128,009,FR
deviceid,uuid,appid,country
iphone,11111,001,JP
deviceid,uuid,appid,country
android,11112,001,FR
deviceid,uuid,appid,country
iphone,11116,001,JP
deviceid,uuid,appid,country
iphone,11113,009,FR
deviceid,uuid,appid,country
iphone,11124,007,AUS
deviceid,uuid,appid,country
iphone,11114,007,AUS
deviceid,uuid,appid,country
iphone,11126,001,JP
deviceid,uuid,appid,country
android,11127,001,FR
deviceid,uuid,appid,country
other,11115,005,JP

データの場所
year,month,day,hourのパーティションごとに分けたcsvファイルを配置しています

# aws s3 ls s3://test-glue00/se2/in1/year=2017/ --recursive
2018-01-04 09:38:13          0 se2/in1/year=2017/
2018-01-04 09:40:12          0 se2/in1/year=2017/month=11/
2018-01-04 09:40:38          0 se2/in1/year=2017/month=11/day=11/
2018-01-04 09:41:23          0 se2/in1/year=2017/month=11/day=11/hour=12/
2018-01-04 10:19:28         48 se2/in1/year=2017/month=11/day=11/hour=12/cvlog_2017111112.csv
2018-01-04 09:40:42          0 se2/in1/year=2017/month=11/day=14/
2018-01-04 09:41:41          0 se2/in1/year=2017/month=11/day=14/hour=14/
2018-01-04 10:19:45         48 se2/in1/year=2017/month=11/day=14/hour=14/cvlog_2017111414.csv
2018-01-04 09:40:47          0 se2/in1/year=2017/month=11/day=21/
2018-01-04 09:41:55          0 se2/in1/year=2017/month=11/day=21/hour=14/
2018-01-04 10:20:01         49 se2/in1/year=2017/month=11/day=21/hour=14/cvlog_2017112114.csv
2018-01-04 09:40:50          0 se2/in1/year=2017/month=11/day=29/
2018-01-04 09:42:09          0 se2/in1/year=2017/month=11/day=29/hour=15/
2018-01-04 10:20:22         67 se2/in1/year=2017/month=11/day=29/hour=15/cvlog_2017112915.csv
2018-01-04 09:41:01          0 se2/in1/year=2017/month=11/day=30/
2018-01-04 09:42:22          0 se2/in1/year=2017/month=11/day=30/hour=14/
2018-01-04 10:20:41         49 se2/in1/year=2017/month=11/day=30/hour=14/cvlog_2017113014.csv
2018-01-04 09:42:40          0 se2/in1/year=2017/month=11/day=30/hour=20/
2018-01-04 10:20:52         49 se2/in1/year=2017/month=11/day=30/hour=20/cvlog_2017113020.csv
2018-01-04 09:40:16          0 se2/in1/year=2017/month=12/
2018-01-04 09:43:11          0 se2/in1/year=2017/month=12/day=1/
2018-01-04 09:45:16          0 se2/in1/year=2017/month=12/day=1/hour=1/
2018-01-04 10:21:19         44 se2/in1/year=2017/month=12/day=1/hour=1/cvlog_2017120101.csv
2018-01-04 09:43:21          0 se2/in1/year=2017/month=12/day=14/
2018-01-04 09:46:50          0 se2/in1/year=2017/month=12/day=14/hour=12/
2018-01-04 10:22:28         48 se2/in1/year=2017/month=12/day=14/hour=12/cvlog_2017121412.csv
2018-01-04 09:47:01          0 se2/in1/year=2017/month=12/day=14/hour=14/
2018-01-04 10:22:51         49 se2/in1/year=2017/month=12/day=14/hour=14/cvlog_2017121414.csv
2018-01-04 09:43:38          0 se2/in1/year=2017/month=12/day=15/
2018-01-04 09:47:11          0 se2/in1/year=2017/month=12/day=15/hour=11/
2018-01-04 10:23:12         48 se2/in1/year=2017/month=12/day=15/hour=11/cvlog_2017121511.csv
2018-01-04 09:43:42          0 se2/in1/year=2017/month=12/day=16/
2018-01-04 09:47:23          0 se2/in1/year=2017/month=12/day=16/hour=21/
2018-01-04 10:23:34         48 se2/in1/year=2017/month=12/day=16/hour=21/cvlog_2017121621.csv
2018-01-04 09:43:45          0 se2/in1/year=2017/month=12/day=17/
2018-01-04 09:47:38          0 se2/in1/year=2017/month=12/day=17/hour=14/
2018-01-04 10:23:54         49 se2/in1/year=2017/month=12/day=17/hour=14/cvlog_2017121714.csv
2018-01-04 09:47:43          0 se2/in1/year=2017/month=12/day=17/hour=18/
2018-01-04 10:24:12         49 se2/in1/year=2017/month=12/day=17/hour=18/cvlog_2017121718.csv
2018-01-04 09:43:49          0 se2/in1/year=2017/month=12/day=19/
2018-01-04 09:48:04          0 se2/in1/year=2017/month=12/day=19/hour=14/
2018-01-04 10:25:07         49 se2/in1/year=2017/month=12/day=19/hour=14/cvlog_2017121914.csv
2018-01-04 09:47:59          0 se2/in1/year=2017/month=12/day=19/hour=8/
2018-01-04 10:24:56         48 se2/in1/year=2017/month=12/day=19/hour=8/cvlog_2017121908.csv
2018-01-04 09:43:15          0 se2/in1/year=2017/month=12/day=2/
2018-01-04 09:45:44          0 se2/in1/year=2017/month=12/day=2/hour=18/
2018-01-04 10:21:41         44 se2/in1/year=2017/month=12/day=2/hour=18/cvlog_2017120218.csv
2018-01-04 09:43:52          0 se2/in1/year=2017/month=12/day=29/
2018-01-04 09:48:17          0 se2/in1/year=2017/month=12/day=29/hour=15/
2018-01-04 10:25:26         47 se2/in1/year=2017/month=12/day=29/hour=15/cvlog_2017122915.csv
2018-01-04 09:43:18          0 se2/in1/year=2017/month=12/day=9/
2018-01-04 09:46:24          0 se2/in1/year=2017/month=12/day=9/hour=4/
2018-01-04 10:22:02         48 se2/in1/year=2017/month=12/day=9/hour=4/cvlog_2017120904.csv

S3のディレクトリ構成

Glueジョブの入力データは”in1″ディレクトリ配下、出力は”out3″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE in1/
                           PRE out0/
                           PRE out1/
                           PRE out2/
                           PRE out3/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

入力データ用に新しくクローラーを作り実行してテーブルを作ります。

出来上がるテーブルの情報は以下です。

スクリーンショット 0030-01-04 10.52.04.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job4ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”パーティション分割されたcsvデータを同じパーティションで別の場所にparquetとして出力する”です。

se2_job4
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションをマッピング対象のカラムとして含めてくれません
入力のパーティションのカラムをマップの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

↓↓↓

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job4_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

###add
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out3/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
###add

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-04 15.54.04.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/
2018-01-04 15:15:41        926 part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet .
download: s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet to ./part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# ls
part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out3でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-04 15.59.33.png

S3の出力パスを入力
形式の違うデータが混在しているとテーブルが複数できてしまうので、不要なものがあれば、excludeで除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-04 16.00.06.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.19.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-04 16.00.28.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.36.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-04 16.00.49.png

クローラー実行

1つのテーブルとして認識している

スクリーンショット 0030-01-04 16.09.01.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-04 16.09.52.png

Athenaから確認

左メニューからse2_out3のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-04 16.11.18.png

件数も19件で合っている

スクリーンショット 0030-01-04 16.11.01.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばappidというカラムがあるので、アプリごとに集計をするようなケースが多いならappidも含めてパーティション分割する

他にもdeviceidとかでデバイスごとに集計したり
一時的な調査にも役立つかも

出力に”out4″ディレクトリ作成
①と同様のジョブをse2_job5で作成
PySparkに以下3点修正したジョブ作成

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションはカラムとして含めてくれません。
入力のパーティションのカラムをapplyの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['appid','year','month','day','hour']
output='s3://test-glue00/se2/out4/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

appidごとにパーティション別れてます

スクリーンショット 0030-01-04 16.30.35.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-04 16.33.32.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-04 16.33.49.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-04 16.34.10.png

件数も同じく19件

スクリーンショット 0030-01-04 16.34.39.png

その他

todo

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/latest/sql-programming-guide.html

続きを読む

Glueの使い方的な②(csvデータをパーティション分割したparquetに変換)

パーティション分割するジョブを作る

ジョブの内容

※”Glueの使い方的な①(GUIでジョブ実行)“(以後①とだけ書きます)と同様のcsvデータを使います

“csvデータのタイムスタンプのカラムごとにパーティション分割してparquetで出力する”

ジョブ名

se2_job1

クローラー名

se2_in0
se2_out1

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

Glueジョブの入力データは”in0″ディレクトリ配下、出力は”out1″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

※①で作ったものを使います。

テーブルの情報は以下です。

スクリーンショット 0030-01-03 15.45.21.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job1ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”です。

se2_job1
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job1_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

### Add(start)
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
#dropnullfields3 = DynamicFrame.fromDF(df, glueContext, "dropnullfields3")
### Add(end)

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
###
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-03 16.25.57.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/
2018-01-03 16:25:49        926 part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet . 
download: s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet to ./part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head  part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out1でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-03 16.37.26.png

S3の出力パスを入力

スクリーンショット 0030-01-03 16.37.50.png

そのまま”Next”をクリック

スクリーンショット 0030-01-03 16.38.46.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-03 16.38.55.png

そのまま”Next”をクリック

スクリーンショット 0030-01-03 16.39.03.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-03 16.39.16.png

クローラー実行

形式の違うデータが混在しているとテーブルが複数できてしまう

スクリーンショット 0030-01-03 16.41.46.png

クローラーを作り直し、”Add a data store”の”Exclude patterns”の箇所で
不要なものがあれば除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-05 20.18.38.png

クローラー実行後
1つのテーブルとして認識される

スクリーンショット 0030-01-03 16.48.52.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-03 16.49.09.png

Athenaから確認

左メニューからse2_out1のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-05 20.39.54.png

件数も19件で合っている

スクリーンショット 0030-01-03 16.51.35.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばcountryというカラムがあるので、国ごとに集計をするようなケースが多いならcountryも含めてパーティション分割する

他にもappidとかでアプリごとに集計したり
一時的な調査にも役立つかも

出力に”out2″ディレクトリ作成
①と同様のジョブをse2_job3で作成
作成したジョブのPySparkスクリプトに以下2点修正

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['country','year','month','day','hour']
output='s3://test-glue00/se2/out2/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

countryごとにパーティション別れてます

スクリーンショット 0030-01-03 17.33.24.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-03 17.36.02.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-03 17.37.19.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-03 17.39.12.png

件数も同じく19件

スクリーンショット 0030-01-03 17.38.57.png

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/preview/api/python/pyspark.sql.html
https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

続きを読む

Glueの使い方的な①(GUIでジョブ実行)

GUIによる操作でGlueジョブを作って実行する

ジョブの内容

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

ジョブ名

se2_job0

クローラー名

se2_in0
se2_out0

全体の流れ

  • 前準備
  • クローラー作成と実行、Athenaで確認
  • ジョブの作成と実行、Athenaで確認
  • 出来上がったPySparkスクリプト確認

前準備

ジョブで使うIAM role

以下のポリシーを付与した任意の名前のロールを作っておく。今回はtest-glueという名前にした。
・AmazonS3FullAccess
・AWSGlueServiceRole
※権限は必要に応じてより厳しくしてください。今回は検証のため緩めにしてあります。

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

S3に配置

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

ディレクトリ構成

in0に入力ファイル、out0に出力ファイル

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE script/
                           PRE tmp/

クローラー作成と実行、Athenaで確認

ジョブ作成の前に、こちらもGlueの重要な機能の1つであるData CatalogのCrawlerを作成して動かしていきます。
クローラはデータをスキャン、分類、スキーマ情報を自動認識し、そのメタデータをデータカタログに保存する機能です。これを使って入力となるサンプルデータのスキーマを事前に抽出してデータカタログにメタデータとして保存します。Athena、EMR、RedshiftからもこのGlueのデータカタログを使えます。Hive互換のメタストアです(Hiveメタストアのマネージドサービスと思ってもらえればいいと思います)。

基本的な操作はGUIを使って行えます。

AWSマネージメントコンソールから、Glueをクリックし、画面左側メニューの”Crawlers”をクリックし、”Add crawler”をクリック
スクリーンショット 0029-12-28 13.24.04.png

クローラーの名前入力
スクリーンショット 0029-12-28 11.19.32.png

S3にあるソースデータのパス入力(今回はS3に配置してあるデータが対象)
スクリーンショット 0030-01-02 15.17.11.png

そのまま”Next”
スクリーンショット 0029-12-28 9.38.41.png

“Choose an existing IAM role”にチェックを入れ、IAM roleをプルダウンからtest-glueを選択する
スクリーンショット 0030-01-02 15.17.49.png

“Run on demand”にチェックを入れ”Next”(今回のクローラーはスケジュールせずに手動実行とする)
スクリーンショット 0029-12-28 9.39.03.png

スキーマ情報を保存するDatabaseを選択、既存のものがあればそれでもいいし、なければ下の”Add database”でdatabase作成しそれを選択
Prefixは作成されるテーブル名の先頭に付くもの。見分け分類しやすいものにしておくと良いと思います(現状テーブルにtagとかつけられないので)
スクリーンショット 0029-12-28 9.39.23.png

クローラー実行

対象のクローラーにチェックを入れ、”Run Crawler”をクリック
スクリーンショット 0030-01-01 17.02.58.png

テーブルが出来上がる

“se2_”のPrefixが付いた”se2_in0″のテーブルができています。”in0″は指定した”Include Path”の一番下のディレクトリ名です。指定した”Include Path”(ソースデータがあるS3のパス、画像の”Location”の部分に表示されている)の配下のディレクトリは自動でパーティションとして認識されます。今回は配下にディレクトリはありませんのでパーティションも作成されません。
スクリーンショット 0030-01-02 15.21.15.png

テーブルの内容を確認するとスキーマが自動で作成されています
実データ配置場所のLocationがs3://test-glue00/se2/in0
Table名のNameが、prefixのse2_とInclude Pathで指定したs3://test-glue00/se2/in0の一番下のディレクトリのin0でse2_in0

Schemaを見るとuuidやappidなどがbigintで数値型になってます、文字列型がよければここでも修正できます。
今回は一旦このまま進めます
※本来はClassifierでいい感じにしたほうがいいと思う

スクリーンショット 0030-01-02 15.23.51.png

AthenaからもGlueのData Catalog使えます
Athenaから同様のテーブルとスキーマの内容が確認できます。Athenaがスキーマ情報にGlueのデータカタログを使ってることがわかります。画面右上にもGlue Data Catalogへのショートカットもありますね。
スクリーンショット 0030-01-02 15.27.07.png

もちろんクエリ実行もできます。
スクリーンショット 0030-01-02 15.29.31.png

ジョブの作成と実行、Athenaで確認

今回のようなジョブであれば、基本は画面ポチポチするだけです

Glueのメニューから”ETL”の”Job”をクリックし、”Add job”をクリック
スクリーンショット 0030-01-01 17.36.49.png

“Name”にジョブ名を入れ、”IAM role”はクローラーでも使ったロールを指定、”Temporary directory”は任意の場所で構いません。
スクリーンショット 0030-01-02 15.31.34.png

ソースデータとなるテーブルを選択。(先程作成したテーブルをクリック)
スクリーンショット 0030-01-02 15.33.35.png

ターゲットとなるテーブルは作成していないのでここで作ります。
今回は、保存先のData storeは”S3″、出力ファイルフォーマットのFormatは”Parquet”、出力先のパスのTargetPathは”任意の場所”を指定
※圧縮も選べます
スクリーンショット 0030-01-02 15.34.41.png

ソースデータとターゲットデータのマッピング変換ができます。いらないカラムを出力から除外したり、カラムの順序を変えたり、”Data Type”をstring,long,intなどに変えたり、新しいカラムを追加してそこに出力させる(カラム名を変える時に使えそうです)などができます
スクリーンショット 0029-12-28 10.08.35.png

次にサマリがでますので問題なければ”Finish”をクリック

ジョブの実行

作成したジョブにチェックを入れ、”Action”から”Run job”をクリック
スクリーンショット 0029-12-28 10.27.38.png

数分待って以下のように”Run status”が”Succeeded”となれば問題なく完了しています。
※問題があれば、”Error”の箇所にエラーの概要、”Logs”、”Error logs”の箇所の出力がリンクになっていてクリックするとCloudWatch logsに移動します。GlueのログはデフォルトでCloudWatch logsに出力されます
スクリーンショット 0030-01-02 15.46.47.png

出力したParquetフォーマットのファイルを、ソースデータと同様にクローラーを使ってスキーマを作り、スキーマオンリードでAthenaクエリを実行してみます。クローラー作成手順は前回と同様なので割愛します。
クローラーにより自動作成されたスキーマ
スクリーンショット 0030-01-02 15.49.41.png

クエリ結果です。データ量が少なくselectしてるだけなのでアレですが、parquetになったので列単位での集計処理などが高速化されるデータフォーマットに変換ができました。
スクリーンショット 0030-01-02 15.51.00.png

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”くらいであればGlueはGUIだけでサーバーレスでできます。

出来上がったPySparkスクリプト確認

se2_job0.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ビルトイン変換のいくつか補足

ApplyMapping:ソースの列とデータ型をターゲットの列とデータ型にマップします

ResolveChoice:複数の型の値が含まれている場合の列の処理方法を指定します。列を単一のデータ型にキャストするか、1つ以上の型を破棄するか、またはすべての型を別々の列または構造体に保持するかを選択できます
make_structは構造体を使用してデータを表現することにより、潜在的なあいまいさを解決します。たとえば、列のデータがintまたはstringの場合、make_structアクションを使用すると、生成されたDynamicFrameにintとstringの両方を含む構造体の列が生成されます。
choiceはspecが空の場合のデフォルトのresolution actionです

DropNullFields:nullフィールドを削除します

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

ジョブを作る際に似たようなジョブを作りたい、テスト段階でジョブのパラメータを一部だけ変えたジョブを作りたいことあると思います。現在はジョブのコピーがGUIからはできないのでそのあたりの運用を考慮する場合はCLIの利用がおすすめです。またあとで書きます。

こちらも是非

似た内容ですがより丁寧にかかれています。安定のクラメソブログ
https://dev.classmethod.jp/cloud/aws/aws-glue-released/
https://dev.classmethod.jp/cloud/aws/aws-glue-tutorial/

Built-In Transforms
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/built-in-transforms.html
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-ApplyMapping.html

続きを読む

Amazon Athenaではじめるログ分析入門

はじめに

Amazon AthenaはAWSの分析関連サービスの1つで、S3に保存・蓄積したログに対してSQLクエリを投げて分析を行えるサービスです。分析基盤を整えたり分析サービスにログを転送したりする必要が無いため、簡単に利用できるのが特長です。

今回はAthenaを使ってこんなことできるよー、というのを紹介したいと思います。

※社内勉強会向け資料をQiita向けに修正して公開しています

ログ分析とAmazon Athena

ログ分析は定量的にユーザ行動を分析してサービスの改善に役立つだけでなく、障害時の調査にも役立つなど非常に便利です。ログ分析に利用されるサービスとしてはGoogle BigQueryやAmazon Redshiftなど様々なものがありますが、その中でAmazon Athenaの立ち位置を確認したいと思います。

ログ分析の流れ

ログ分析の基盤の概念図は下記のようになります。

Screen Shot 2017-12-11 at 17.22.23.png

この図からわかるように、考えることは多く、どのようなログを出すのか、どのように分析用のDBやストレージにデータを移すのか(ETL)、分析エンジンは何を使うのか、可視化のためにどのようなツールを利用するかなどの検討が必要です。

また初期は問題なく動作していてもデータ量が増えるうちに障害が起きたりログがドロップしてしまうなど多くの問題が出てきます。

そのため、多くの場合専門のチームが苦労をしながら分析基盤を構築・運用しています。

Amazon Athenaとは

Amazon Athenaはサーバレスな分析サービスで、S3に直接クエリを投げることができます。AWS上での分析としてはEMRやRedShiftなどがありますが、インスタンス管理などの手間があり導入にはハードルがありました。

Athenaでは分析エンジンがフルマネージドであり、ログ収集の仕組みやサーバ管理の必要がなく分析クエリを投げられます。

Screen Shot 2017-12-11 at 17.22.30.png

RDBのテーブルなどとのJOINは(データをS3に持ってこないと)できないためあくまで簡易的な分析にとどまりますが、ログを集計するというだけであればとても簡単に分析の仕組みが構築できます。

Screen Shot 2017-12-05 at 14.28.56.png

料金について

BigQueryなどと同じクエリ課金です。一歩間違えると爆死するので注意は必要です。

  • ストレージ

    • S3を利用するのでS3の保存料金のみ
  • クエリ
    • スキャンされたデータ 1 TB あたり 5 USD
  • データ読み込み
    • S3からの通常のデータ転送料金。AthenaとS3が同一リージョンなら転送料金はかからない。

使ってみる – ELB(CLB/ALB)のログを分析

Athenaのユースケースとして一番簡単で有用なのはELBのログ分析だと思っています。ここでは簡単に利用手順を紹介します。

1. S3バケットを作成し適切なアクセス権を付与

ドキュメントを参考にS3のバケットを作成し、アクセスログを有効化すると、15分毎にアクセスログが出力されます。

http://docs.aws.amazon.com/ja_jp/elasticloadbalancing/latest/application/load-balancer-access-logs.html#enable-access-logging

2. Athena でテーブルを作る

公式ドキュメントにそのまま利用できるクエリが載っているので、AWSマネージメントコンソールからAthenaのQueryエディタを開きこれを実行します。

http://docs.aws.amazon.com/athena/latest/ug/application-load-balancer-logs.html

3. クエリを実行

Athenaの実行エンジンPrestoは標準SQLなので通常のRDBMSへのクエリのように実行できます。

SELECT count(*)
FROM alb_logs

使ってみる – ELBログから様々なデータを取得する

ELBのログに出力されている内容はそこまで多くないものの、意外と色々取れたりします。

1. あるエンドポイントの日別アクセス数を調べる

SELECT date(from_iso8601_timestamp(time)),
         count(*)
FROM default.alb_logs
WHERE request_url LIKE '%/users/sign_in'
        AND date(from_iso8601_timestamp(time)) >= date('2017-12-01')
GROUP BY  1
ORDER BY  1;

2. 直近24時間の500エラーの発生数を調べる

SELECT elb_status_code,
         count(*)
FROM default.alb_logs
WHERE from_iso8601_timestamp(time) >= date_add('day', -1, now())
        AND elb_status_code >= '500'
GROUP BY  1
ORDER BY  1;

3. レスポンスに1.0s以上時間がかかっているエンドポイント一覧を出す

SELECT request_url,
         count(*)
FROM alb_logs
WHERE target_processing_time >= 1
GROUP BY  1
ORDER BY  2 DESC ;

4. ユーザ単位の行動ログを出す

Cookieは出せないのでIPから。連続したアクセスだとポートも同じになるのでそこまで入れても良い。

SELECT *
FROM alb_logs
WHERE client_ip = 'xx.xxx.xxx.xxx'
        AND timestamp '2017-12-24 21:00' <= from_iso8601_timestamp(time)
        AND from_iso8601_timestamp(time) <= timestamp '2017-12-25 06:00';

5. あるページにアクセスした後、次にどのページに移動しているかを調べる

3回joinしているのでちょっとわかりづらい。これもIPがユーザー毎にユニークと仮定しているので正確ではない。

SELECT d.*
FROM 
    (SELECT b.client_ip,
         min(b.time) AS time
    FROM 
        (SELECT *
        FROM alb_logs
        WHERE request_url LIKE '%/users/sign_in') a
        JOIN alb_logs b
            ON a.time < b.time
        GROUP BY  1 ) c
    JOIN alb_logs d
    ON c.client_ip = d.client_ip
        AND c.time = d.time
ORDER BY  d.time

運用してみる

Athenaについて何となくわかってもらえたでしょうか。次に、実際に運用するときのためにいくつか補足しておきます。

パーティションを切る

ログデータはすぐにTB級の大きなデータとなります。データをWHERE句で絞るにしてもデータにアクセスしないことには絞込はできませんので、Athenaは基本的には保存されている全てのデータにアクセスしてしまいます。

これを防ぐためにパーティションを作って運用します。パーティションを作成するにはCREATE TABLEでPARTITIONED BYでパーティションのキーを指定しておきます。

CREATE EXTERNAL TABLE IF NOT EXISTS table_name (
  type string,
  `timestamp` string,
  elb string,
  client_ip string,
  client_port int,
  target_ip string,
  target_port int,
  request_processing_time double,
  target_processing_time double,
  response_processing_time double,
  elb_status_code string,
  target_status_code string,
  received_bytes bigint,
  sent_bytes bigint,
  request_verb string,
  url string,
  protocol string,
  user_agent string,
  ssl_cipher string,
  ssl_protocol string,
  target_group_arn string,
  trace_id string )
 PARTITIONED BY(d string)
 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe' WITH SERDEPROPERTIES (  'serialization.format' = '1',
  'input.regex' = '([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([.0-9]*) ([.0-9]*) ([.0-9]*) (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) ([^ ]*) (- |[^ ]*)\" ("[^"]*") ([A-Z0-9-]+) ([A-Za-z0-9.-]*) ([^ ]*) ([^ ]*)$' )
LOCATION
 's3://bucket_name/prefix/AWSLogs/123456789012/elasticloadbalancing/ap-northeast-1/'

実際のパーティションの構築るには2種類の方法があります。

http://docs.aws.amazon.com/athena/latest/ug/partitions.html

1.ファイルの保存パスにパーティションの情報を入れる(Hiveフォーマット)

s3のkeyを例えば s3://bucket_name/AWSLogs/123456789012/d=2017-12-24/asdf.logという形で保存します。

この状態で次のコマンドを実行すると自動でパーティションが認識されます。

MSCK REPAIR TABLE impressions

2.ALTER TABLEを実行する

ELBのログなどAWSが自動で保存するログは上記のような形式で保存できないので、直接パーティションを作成します。

ALTER TABLE elb_logs
ADD PARTITION (d='2017-12-24')
LOCATION 's3://bucket/prefix/AWSLogs/123456789012/elasticloadbalancing/ap-northeast-1/2017/12/24/'

これらを毎日実行するようなLambdaなどを作成して運用することになります。

BIツールを利用する

Athenaを実行するだけだと生データが手に入るだけなので、必要に応じてグラフにしたりといったことが必要になるかと思います。

ExcelやGoogle SpreadSheetでも良いですが、BI(Business Intelligence)ツールと呼ばれるものを使って定期的なクエリ実行やその可視化、通知などを実現できます。

Athenaに対応しているものではAWSのサービスの一つであるQuickSightやRedashがあります。

列指向フォーマットについて

データ量が増大してくると、クエリの実行時間が増えると同時にお金もかかるようになってきます。そんな時に利用を検討したいのが列指向フォーマットです。

列指向フォーマットでは列単位でデータを取り出せるため、JOINやWHEREを行う際にすべてのデータを取り出す必要がありません。そのため、高速にかつ低料金でクエリを実行できます。

通常のログは行単位で出力されているため、あらかじめ変換処理を行う必要があります。これにはEMRを利用する方法がAWSで解説されているので、大量データに対して頻繁にクエリを行う際は利用を検討してみてください。

https://aws.amazon.com/jp/blogs/news/analyzing-data-in-s3-using-amazon-athena/

アプリケーションのログを分析する(CloudWatch Logsの場合)

ここまではELBのログでここまでできる的な内容でしたが、実際にはアプリケーションが出力したログを利用したくなります。とにかくS3に集めれば良いのでFluentdなどで集めればOKです。

ただ最近はECSやElastic Beanstalkを使っているとCloudWatch Logsに集約しているケースも増えてきているのではないかと思います。この場合S3に持っていくのが微妙に手間になってきます。CloudWatch LogsではS3にログをエクスポートできますが、通常では特定のログをフィルタをして出力したいと思うので、少し工夫が必要になります。

例えば下記のような構成です。

Screen Shot 2017-12-11 at 17.43.23.png

こちらについては https://aws.amazon.com/jp/blogs/news/analyzing-vpc-flow-logs-with-amazon-kinesis-firehose-amazon-athena-and-amazon-quicksight/ この記事などが参考になります。

おわりに

Amazon Athenaの使い方について自分の持っている知識をまとめてみました。Athenaの利用までの簡単さはログ分析の導入としては非常にハードルが低く、とても有用だと思っています。

他の分析サービスとどう違うのかとかは難しいところですが、AthenaはManagedなPrestoであってそれ以上ではなく、EMRやRedshiftの方が上位互換的に機能が多いので、Athenaで出来ないことが出てきたら他を使うとかでも良いのかなぁと思っています。(ここは詳しい人に教えて貰いたいところ。。)

S3 Selectという機能も出てきてS3ログの分析が更に柔軟になっていく予感もありますのでその導入としてのAmazon Athenaを触ってみてはいかがでしょうか。

注意事項

  • ELBログについて

    • ベストエフォート型のため、全てのログの取得は保証されていません
    • ELBのログはUTCで保存するのでJSTの日付とはパーティションがずれます
    • ログは15分ごとにまとめられるので 23:45〜23:59 頃のログは翌日のパーティションに入ってしまいます

参考資料

続きを読む

AWS AthenaでBigデータ解析

WHY Athena

複数社のASPで吐き出されたフォーマットばらばらの膨大なcsvデータ達を結合しまくってデータ解析せなあかん時ってありますよね

  • いちいちDBサーバー作ってインポートしてらんない
  • いちいちフォーマット合わせてらんない
  • ExcelでVLOOKUPとか処理重すぎてクライアント動かない

そこでAthena

Analytics_AmazonAthena.png

WHAT Athena

  • サーバーレスで環境構築不要
  • S3にファイルを上げてAthenaのコンソール画面でSQL叩くだけで結果出力
  • Prestoベースなので重たいクエリの結果が早い
  • 出力されたデータはcsvでDL可能
  • S3の標準の金額とAthenaのクエリ従量なので安い(クエリのスキャン 1 TB あたり 5 USD)

いいことだらけ

Saving Point

リージョン

実行タイミングが毎日何百回とかでなければS3のリージョンは 米国東部 (バージニア北部) か 米国西部 (オレゴン) がオススメ

Athenaはどのリージョンでも金額は変わらない

スクリーンショット_2017-12-12_14_48_47.jpg

データ圧縮

本データがGZIP圧縮されてればその分スキャンデータが少なくなるので安くなる

列のチューニング

元データに不要な列はなるべく消すこと、消せば消した分だけスキャンデータが少なくなるので安くなる

列指向データ形式(Apache Parquet や Apache ORC)に形式を変換すれば不要な列を読まなくなるのでこれも安くなる

列の削除はExcelやエディタでの編集も良いが、AWS Glueを使うと良い

AWS Glue

Athenaの作業がもっと楽になるやつ
S3の本データをの変換が素早く出来る+Athena用のデータ形式に変換してくれる
他社都合で勝手にデータの列形式とか変わっちゃった時にお役立ち
東京リージョンデビューはもう少しかかる模様なので 米国東部 (バージニア北部) か 米国西部 (オレゴン) で

詳しい使い方はクラスメソッドさんのところ

パーティション

テーブル作成時にパーティション設定することで、クエリの条件に日付関連の方法を入れると、勝手に条件外のデータを読まなくなります

日毎とか週毎、月毎などに分類されるデータはパーティション設定するといいでしょう

パーティションにするにはS3に上げる時にディレクトリを dt=ほにゃらら にしてCREATE文に PARTITIONED BY って書けばいいです

こんな感じ

yakiniku.osushi.sql
CREATE EXTERNAL TABLE IF NOT EXISTS yakiniku.osushi (
  `name` string,
  `media_name` string,
  `media_type` string,
  `device` string,
  `carrier` string,
  `click_date` timestamp,
  `regist_date` timestamp,
  `user_id` string,
  `session_id` string 
) PARTITIONED BY (
  dt string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://yakiniku-tabetai/osushi-tabetai/';

パーティション用のディレクトリを新たに作った場合は、テーブルに認識する必要があるので、Load partitionsしましょう

スクリーンショット_2017-12-12_14_28_15.jpg

Trouble Shooting

クエリは262144バイト以内

クエリが長すぎだとこんな風に怒られます、修正しましょう

Your query has the following error(s):

Your query has exceeded the maximum query length of 262144 bytes. Please reduce the length of your query and try again. If you continue to see this issue after reducing your query length, contact customer support for further assistance. (Service: AmazonAthena; Status Code: 400; Error Code: InvalidRequestException; Request ID: 1effab6b-df02-11e7-820b-c99df83a2b03)

稀にクエリが通らない

最近は頻度は減りましたが、並行で重たいクエリを投げてると稀にエラーになります

サーバレスアーキテクチャ上Lambdaみたいなもんだと思っているので、負荷によるエラーはないと思っているのですが、そういうときは時間を追いて再度実行するとクエリが通ります

S3 SELECTでいいんじゃね?

S3 SELECTはAWS re:Invent 2017で発表されたサービスで、S3にあるデータファイルを直接SELECT分で出力できるみたいです

今はプレビュー版ですが、AWSから許可してもらったので試した所、group by、order by 使えない等の制約があったため使っていません

あとCLI慣れてないと他にも躓くところがあるかもです

ただ、今後今回のようなケース(他社ASPからデータDLして分析)はS3 SELECTで十分になるかもしれません

あくまでAthenaはCLI苦手な人用となるのかも

おわり

続きを読む

EMRクラスターを起動するシェルスクリプト

… test-cluster” aws emr create-cluster –name test-cluster –ami-version 3.4.0 –applications Name=Hive Name=Hue –instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=c3.xlarge –use-default-roles … 続きを読む