Amazon Linux2とAmazon Linuxの違いについて(メモ)

Amazon Linux2が発表されたので、今までのAmazonLinuxとの違いについてメモしてみます。
基本的にはRHEL7ベースに変わっているので、CentOS7の知識が役に立ちます。
検証にはHyper-V上にインストールしたAmazonLinux2(amzn2-hyperv-2017.12.0.20171212.2-x86_64.xfs.gpt)を使用しました。

  • initからsystemdに変わった。

    • 今まで/etc/rc3.d/xxxとか作っていたり、chkconfigとかは使えない。
  • ホスト名の設定方法が違う

    • Amazon Linux
     /etc/sysconfig/networkを編集
     /etc/hostnameを編集
    
    • Amazon Linux2
      hostnamectl set-hostname amazonlinux2.example.com
    
  • epelリポジトリがない。Amazon公式リポジトリに色々入っていない。

    • CentOSのように、コミュニティリポジトリをガシガシ入れないといけない?

      • ミドルウエアはDockerでコンテナ化する前提なのかなぁ。
      • CentOSみたく、あとで相性問題でyum update出来ないとか起きそうだし、セキュリティパッチの提供とかあるので今まで通りAmazonのリポジトリから入れられると良いんだけどなぁ・・
  • タイムゾーンの変更方法が違う

    • Amazon Linux
     /etc/sysconfig/clockを編集
     cp /usr/share/zoneinfo/Asia/Tokyo /etc/localtime
    
    • Amazon Linux2
     timedatectl set-timezone Asia/Tokyo
    
  • MySQLが公式リポジトリにない。一見MySQLがインストールされるように見えるが、MariaDBが入る。

    [root@amazon ~]# yum install mysql
    Loaded plugins: langpacks, update-motd
    Resolving Dependencies
    --> Running transaction check
    ---> Package mariadb.x86_64 1:5.5.56-2.amzn2 will be installed
    --> Finished Dependency Resolution
    
    Dependencies Resolved
    
    ================================================================================
    Package        Arch          Version                   Repository         Size
    ================================================================================
    Installing:
    mariadb        x86_64        1:5.5.56-2.amzn2          amzn2-core        8.9 M
    
    Transaction Summary
    ================================================================================
    Install  1 Package
    
    • MySQLを入れる場合はコミュニティリポジトリを追加して入れるしかないっぽい。
    • 入るMariaDBのバージョンが微妙に古いが、amazon-linux-extrasでmariaDB単品で入れる方法がない?
      • amazon-linux-extras install lamp-mariadb10.2-php7.2 とするとリポジトリが追加される&phpとMariaDBがインストールされるのだが、何故かMariaDBはクライアントしか入らないので注意。(書いてある通りphpとMariaDBしか入らないので、Apacheも入らないのですが、まさかlampと書いてあってDBサーバが入らないとは。RDS利用を想定??)
  • 同じく、PHPが5.4しかない。amazon-linux-extrasで入るのは7.2で、それ以外のバージョンは無いので、他のバージョンを入れたい場合はRemiなどで入れるしかない。ただしその場合パッケージ名が違うので注意

    • php71-devel -> php71-php-develなど。
    • 今のところalternatives –config phpで複数バージョンを切り替えられないようです。
    • 何故かphp-fpmが入るので、Nginx前提っぽい。
  • ファイルシステムがXFSになったので、fallocateで作ったファイルをswaponすることができない。ので、SWAP用のEBSを別でつけて、スワップ領域とするか、DDで固定容量のスワップファイルを作成してマウントする。AmazonLinuxの頃にやっていたクラスメソッドさんの記事は動きませんので、EC2のCentOS7でSWAP領域を作る方法にあるようにすればいいのですが、起動毎に数分かかるのでちょっと・・・

    • ついでに、/etc/rc.localに実行権限は付いていないので注意です。
  • その他

    • Hyper-V版ではpostfixがインストールされていたのですが、EC2版ではMTAがインストールされていなかった。(sendmailコマンド自体がなかった)
    • Apache2.4が標準になったので、パッケージ名が変更されています。Chefなど構成管理を使っていると修正が必要
      • httpd24 -> httpd
      • mod_php70 -> mod_php(extra入れた場合)
      • mod24_ssl -> mod_ssl
    • Chefのohaiで取得できる情報が違いますので、レシピの条件分岐で注意しましょう。

      [root@amazon ~]# ohai | grep platform
      "platform": "amazon",
      "platform_version":"2.0",
      "platform_family": "rhel",
          "platform": "x86_64-linux",
      
      • systemdになったので動くか不安でしたが、Chef側で検知できるので通常のserviceで問題ありませんでした。
    • ntpが動いていない!と思ったらchronydになったようです。

      • ntpqで時刻監視しているところは注意が必要。
    • rsyslogがない!と思ったらsystemd-journald.serviceになったようです。

      • ntpqはないので、アンインストールしてntpを入れるか、chronyc sourcesが同等コマンドらしいので書き換える。
     [root@amazon ~]# systemctl status rsyslog.service
    Unit rsyslog.service could not be found.
    [root@amazon ~]# systemctl status systemd-journald.service
    ● systemd-journald.service - Journal Service
    
    • ifconfigやpstreeは生き残っているみたいw
    • yum check --securityを実行すると、Hyper-V環境だと何故かCPU100%で暴走しましたOrz
  • 追記

    • amazon-linux-extrasでphp7.2を入れたところ、ssh2やzipなどエクステンションが全然足りない。

      • 結局amzn2-coreリポジトリを無効化してepelとremiから入れた。無効化すればパッケージ名を気にしなくてよいが、アップデート時に衝突する、yum update securityが効くのか疑問なので、コアのほうでパッケージが充実してほしい。
    • Apacheのバージョンが2.4.6となんと現行AmazonLinuxよりも古いため、SSLSessionTicketsがエラーになる。地味に嵌った。nginx使えという事なのかなぁ。
    • 標準で入っているPythonは2系ですが、pipが入っていないので個別インストールが必要

続きを読む

AnsibleでAWSの複数のEC2にWordPressをS3プラグインの設定も行いながら100サイト以上デプロイする方法。

課題

Wordpressで構築されたWEBパッケージを単一サーバのバーチャルホストで提供していました。
1年ほど経過した時点で、200個近くに膨れ上がってしまいました。
また、動画を扱うサービスのためストレージが不足するようになってきました。

柔軟に複数のサーバに同一ドメインのバーチャルホストを分散してデプロイしたら解決すると考えました。

今回の目標は次の通りです。

  • 複数のサーバにバーチャルホストを分散させたい。
  • 低価格サービスなので多重化は行わない。
  • route53の設定も自動化したい。
  • iamの設定も自動化したい。
  • jenkinsでデプロイを自動化したい。
  • S3プラグインを使用して動画・画像はS3に追い出したい。

例として、EC2インスタンスが2台あって以下のようにデプロイするイメージです。

【EC2インスタンス1】
site01.hoge.com
site02.hoge.com
site03.hoge.com
site04.hoge.com
site99.hoge.com

【EC2インスタンス2】
site10.hoge.com
site11.hoge.com
site12.hoge.com
site13.hoge.com

site199.hoge.com

対応

以前作った、単一のEC2にバーチャルホストを追加していくAnsibleのスクリプトをがあります。それを改良して複数ホスト対応にする事にしました。

そのスクリプトはまた別の記事でご紹介したいと思います。

yamlの書き方の考える

以前の方法は、単一サーバに対して複数のroleのパラメータを変えて何度も実行させるという方法でした。しかし、この方法だと複数ホストに異なるバーチャルホストをデプロイすることができません。
単にホストインベントリーにホストを追加しただけだと、全く同一設定のホストが2個できます。Ansibleの使い方としては真っ当ですが、今回の要件に適していません。

site.yml

  roles:
    - common
    - { role: "ec2" }
    - { role: "apache" }
    - { role: "vhost-wordpress",
        vhost_name: "site01" }
    - { role: "vhost-wordpress",
        vhost_name: "site02" }
    - { role: "vhost-wordpress",
        vhost_name: "site03" }

今回はYAMLを以下のように変更しました。

ホストインベントリに複数のサーバを指定します。

hosts

host1
host2

そして各ホストにバーチャルホストのレイアウトを記述します。

host_vars/host1

vhosts:
    site1.hoge.com:
        param1: huga
    site2.hoge.com:
        param1: huga

host_vars/host2

vhosts:
    site10.hoge.com:
        param1: huga
    site11.hoge.com:
        param1: huga

loop内で複数の処理を行う。

各ホストのProvisioningはいつも通りroleで定義した処理を実行させます。
tasksのセクションで各ホスト毎に違った処理を行わせるために、include_tasksを使ってループの内側の処理を実行させます。

---
- hosts: webservers
  become: yes
  become_user: root
  roles:
    - ec2
    - common
    - apache
  tasks:
    - include_vars: vars/defaults.yml
    - name: setup vhost
      include_tasks: tasks/deploy_wp_vhost.yml
        HOST="{{ inventory_hostname }}"
        HOSTNAME="{{ item.key.split('.')[0] }}"
        VHOSTNAME="{{ item.key }}"
        MANAGE_PASSWORD="{{  item.value.manage_password }}"
      with_dict: "{{ hostvars[inventory_hostname].vhosts }}"

Ansibleではloop内に複数の処理を書くことはできません。
別のymlに処理を追い出して、それをincludeすることでloop内の複雑な処理を実行させています。

vhostのサブドメインをroute53に登録したい

設定したバーチャルホストをブラウザで見れる状態にするにはDNSレコードを設定する必要があります。
AnsibleにはAWSのRoute53を操作するモジュールがあります。

- name: route53 update
  route53:
    aws_access_key: "{{ lookup('env', 'AWS_ACCESS_KEY_ID') }}"
    aws_secret_key: "{{ lookup('env', 'AWS_SECRET_ACCESS_KEY') }}"
    state: present
    overwrite: yes
    zone: "{{ R53_ZONE }}"
    record: "{{ VHOSTNAME }}"
    type: A
    ttl: 300
    value: "{{ HOST }}"
  delegate_to: localhost
  become: false
  register: r53_status

delegate_to: localhost という記述があります。これは実行を指定したホストの移譲せよという指定です。
この記述がないとroute53モジュールはリモートのpythonで実行されます。
Ansibleを動かしているホストで実行したい場合は注意しましょう。
レコードが変更されたかどうかは、r53_status.changedで判定できます。

S3バケットを設定する

EC2のサーバのストレージを食いつぶすwordpressのmediaファイルをS3に追い出してしまいたいので、S3のバケットも自動で準備しましょう。

- name: S3バケットを作成しています。 "{{ bucket_name }}"
  s3_bucket:
    name: "{{ bucket_name }}"
    policy: "{{ lookup('template','public.json.j2') }}"
    state: present
    region: ap-northeast-1
    aws_access_key: "{{ lookup('env', 'AWS_ACCESS_KEY_ID') }}"
    aws_secret_key: "{{ lookup('env', 'AWS_SECRET_ACCESS_KEY') }}"
  delegate_to: localhost
  become: false
  register: s3_status

s3_bucketモジュールを使えば簡単です。先程のモジュールと同じでansbileを動作させているホストで実行したいので、同じくdelegate_toを指定してあります。

また、作成したバケットにpublic_readを与えたいので、バケットポリシも同時に指定しています。

public.json.j2

{
  "Version":"2012-10-17",
  "Statement":[{
    "Sid":"PublicReadGetObject",
        "Effect":"Allow",
      "Principal": "*",
      "Action":["s3:GetObject"],
      "Resource":["arn:aws:s3:::{{ bucket_name }}/*"
      ]
    }
  ]
}

JSONファイルもテンプレート化してしまえるのでとても便利です。

バケットごとにIAMを発行して個別にアクセスキーを発行したい

以下、IAMとクレデンシャル取得のyamlの例です。
Ansibleのiamとiam_policyのモジュールを使用しました。
作成したcredentialは変数に格納されます。
初回のみsecret_keyが格納されます。二回目は格納されません。
この動作はAWSコンソールでクレデンシャルを作成したときと同じ動きです。

- name: S3バケットに対応したIAMを作成しクレデンシャル取得
  iam:
    iam_type: user
    name: "{{ VHOSTNAME }}"
    key_count: 1
    access_key_state: create
    state: present
    groups: "{{ IAM_GROUP }}"
  delegate_to: localhost
  become: false
  register: credentials

- name: iamのpolicy設定
  iam_policy:
    iam_type: user
    iam_name: "{{ VHOSTNAME }}"
    policy_name: "s3_limited_access_{{ bucket_name }}"
    policy_json: "{{ lookup( 'template', 's3_policy.json.j2') }}"
    state: present
  delegate_to: localhost
  become: false
  when: credentials.changed

ちなみに、作成したaccess_key, secret_access_keyはこのように取り出せます。

- name: access_keyを保存
  set_fact:
    s3_access_key: "{{ credentials['user_meta']['access_keys'][0]['access_key_id'] }}"
  when: credentials['user_meta']['access_keys'][0]['access_key_id'] is defined

- name: s3_secret_access_keyを保存
  set_fact:
    s3_secret_access_key: "{{ credentials['user_meta']['access_keys'][0]['secret_access_key'] }}"
  when: credentials['user_meta']['access_keys'][0]['secret_access_key'] is defined

is definedでキー存在を確認しています。いきなり値を取り出そうとすると、2回目は存在しないキーをアクセスすることになりますので、エラーが発生してそこで実行が止まってしまいます。

wordpressにAWSのaccess_keyを渡す

もっと行儀よく環境変数で渡すこともできますが、今回はwp_config.php に埋め込む方法をとりました。

- name: copy wordpres config
  become: true
  template:
    src: wp-config-sample.php
    dest: "{{ wp_vhost_path }}/wp-config.php"
    owner: apache
    group: apache
    mode: 0755
  when: s3_secret_access_key is defined

secret_keyが生成された初回のみwp-config.phpをテンプレートから所定の場所にコピーしました。
必要な情報をコード内に埋め込みたかったので以下のようにテンプレート化してあります。

define('DBI_AWS_ACCESS_KEY_ID', '{{ s3_access_key }}');
define('DBI_AWS_SECRET_ACCESS_KEY', '{{ s3_secret_access_key }}');
define('AS3CF_BUCKET', '{{ bucket_name }}');
define('AS3CF_REGION', 'ap-northeast-1');

wordpressのpluginの自動設定

wp_cliを入れてしまえばこっちのものです。以下の例はwp-cliをダウンロードして、/usr/local/bin/ の下に配置します。

- name: install wp-cli
  become: true
  get_url: url=https://raw.githubusercontent.com/wp-cli/builds/gh-pages/phar/wp-cli.phar
    dest="/usr/local/bin/" validate_certs=no mode=0755

この処理は各ホスト1回だけでいい処理です。

各バーチャルホストの設定のときに
必要なプラグインをinstallしたり。

- name: instlal plugin
  become: yes
  become_user: apache
  shell: "/usr/local/bin/wp-cli.phar --path={{ wp_vhost_path }} plugin install {{ plugin }}"
  with_items:
    - "amazon-web-services"
    - "amazon-s3-and-cloudfront"
  loop_control:
      loop_var: plugin
  when: db_created.changed

プラグインを有効にしたり、

- name: activate plugin
  become: yes
  become_user: apache
  shell: "/usr/local/bin/wp-cli.phar --path={{ wp_vhost_path }} plugin activate {{ plugin }}"
  with_items:
    - "amazon-web-services"
    - "amazon-s3-and-cloudfront"
  loop_control:
      loop_var: plugin
  when: db_created.changed

そして、プラグインの設定を更新したりできます。

- name: config plugin
  become: yes
  become_user: apache
  shell: "/usr/local/bin/wp-cli.phar --path={{ wp_vhost_path }} option update tantan_wordpress_s3 '{{ lookup('template', 'tantan_wordpress_s3_option.json') }}' --format=json"
  when: db_created.changed

wp-cliでプラグインの設定をJSONでDBに書き込むことができます。

{
  "post_meta_version": 6,
  "region": "",
  "domain": "path",
  "cloudfront": "",
  "object-prefix": "wp-content\/uploads\/",
  "copy-to-s3": "1",
  "serve-from-s3": "0",
  "remove-local-file": "0",
  "force-https": "0",
  "object-versioning": "1",
  "use-yearmonth-folders": "1",
  "enable-object-prefix": "1"
}

成果

  • virtual hostの設定をyamlに記述することができる。
  • bitbucket/githubのwebhookでJenkinsと連携しAnsibleを叩ける
  • AWSコンソールを一切触らなくても良くなりました。
  • インフラの知識がない担当者もとりあえずPullRequestを作成できればサイトを一つ増やすことができるようになりました。

続きを読む

今まで普通に動いていたApacheが起動に失敗した(No space left on device)

状況確認

EC2上で稼働していたhttpdがある日突然起動に失敗した。
エラーログdefault:/var/log/httpd/error_logを確認のところ、次の一文が確認された

No space left on device: AH00023: Couldn't create the watchdog-callback mutex

検索すると、ログローテーション時にセマフォを使い切っていて起動できなかった模様。
現在のセマフォ状況はipcs -sコマンドで確認できる

[root@test-server]# ipcs -s
------ セマフォ配列 --------
キー     semid      所有者  権限     nsems
0x00000000 48365568   apache     600        1
0x00000000 48398337   apache     600        1
0x00000000 48431106   apache     600        1
0x00000000 48463875   apache     600        1
0x00000000 48496644   apache     600        1
0x00000000 48529414   apache     600        1

セマフォの上限はsysctl -aコマンドで確認できる

[root@test-server]# /sbin/sysctl -a | grep sem
kernel.sem = 250        32000   32      128

おそらくセマフォの識別子数が128で、ipcsコマンドの結果が128行に達すると起動に失敗する。

初期対応

service httpd restartでセマフォ数は減って起動するようになったが、翌日また増えている。
どうやらservice httpd gracefulコマンドだったりログローテーションの場合のみ、セマフォが増え続ける模様

調査

以下確認のところ、Apache/2.4.25で発生するバグであった模様
http://forum.directadmin.com/showthread.php?t=54265
https://svn.apache.org/repos/asf/httpd/httpd/branches/2.4.x/CHANGES

恒久対応

バージョン確認すると、ばっちり該当した

[root@test-server]# httpd -v
Server version: Apache/2.4.25 (Amazon)
Server built:   Jan 19 2017 16:55:49

yum update httpdで2.4.27にアップデートし、問題が解消されたことを確認した

続きを読む

cloudfront+ELB+EC2環境でのSSL化(ACM)のざっくり流れだけ

ACMにて証明書取得

2つ取得する。現時点でcloudfrontは東京リージョンの対応がなく、ELBとEC2を東京リージョンで運用する際はこちらの証明書も必要になるため、現状では2つ取得する必要がある。

cloudfront,elbに上記証明書を適用

コンソール画面から適用できる。

ELBのリスナー設定

HTTPS 443 HTTP 80

セキュリティグループを設定

ELBとEC2の設定をHTTPS対応にしてHTTPのポートは最小限開く設定にする。

備考

ELBとEC2間をSSL通信するとするとACMの証明書がEC2に適用できないという問題にぶつかる。
これを回避するにはさらに一つACM以外での証明書を取得する必要があり
さらにapacheに設定、鍵のアップロードなどが必要になる。(前述のELBリスナーも設定変更)

続きを読む

Glueの使い方的な⑤(パーティション分割してるcsvデータをパーティション分割したparquetに変換)

パーティション分割csv->パーティション分割parquet

ジョブの内容

※”Glueの使い方①(GUIでジョブ実行)”(以後①とだけ書きます)と同様のcsvデータを使います

“パーティション分割されたcsvデータを同じパーティションで別の場所にparquetで出力する”

ジョブ名

se2_job4

クローラー名

se2_in1
se2_out3

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

内容としては①と同じデータで、year,month,day,hourのパーティションごとに分けたcsvファイルを配置します。
year,month,day,hourのカラムは削除しています。

元となる①のデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

今回使う入力データ(19件)
year,month,day,hourのカラムは削除しています

$ ls
cvlog_2017111112.csv    cvlog_2017120101.csv    cvlog_2017121621.csv
cvlog_2017111414.csv    cvlog_2017120218.csv    cvlog_2017121714.csv
cvlog_2017112114.csv    cvlog_2017120904.csv    cvlog_2017121718.csv
cvlog_2017112915.csv    cvlog_2017121412.csv    cvlog_2017121908.csv
cvlog_2017113014.csv    cvlog_2017121414.csv    cvlog_2017121914.csv
cvlog_2017113020.csv    cvlog_2017121511.csv    cvlog_2017122915.csv
$ cat *
deviceid,uuid,appid,country
iphone,11121,001,JP
deviceid,uuid,appid,country
iphone,11123,009,FR
deviceid,uuid,appid,country
iphone,11119,007,AUS
deviceid,uuid,appid,country
other,11110,005,JP
iphone,11125,005,JP
deviceid,uuid,appid,country
iphone,11129,007,AUS
deviceid,uuid,appid,country
android,11122,001,FR
deviceid,uuid,appid,country
pc,11118,001,FR
deviceid,uuid,appid,country
pc,11117,009,FR
deviceid,uuid,appid,country
iphone,11128,009,FR
deviceid,uuid,appid,country
iphone,11111,001,JP
deviceid,uuid,appid,country
android,11112,001,FR
deviceid,uuid,appid,country
iphone,11116,001,JP
deviceid,uuid,appid,country
iphone,11113,009,FR
deviceid,uuid,appid,country
iphone,11124,007,AUS
deviceid,uuid,appid,country
iphone,11114,007,AUS
deviceid,uuid,appid,country
iphone,11126,001,JP
deviceid,uuid,appid,country
android,11127,001,FR
deviceid,uuid,appid,country
other,11115,005,JP

データの場所
year,month,day,hourのパーティションごとに分けたcsvファイルを配置しています

# aws s3 ls s3://test-glue00/se2/in1/year=2017/ --recursive
2018-01-04 09:38:13          0 se2/in1/year=2017/
2018-01-04 09:40:12          0 se2/in1/year=2017/month=11/
2018-01-04 09:40:38          0 se2/in1/year=2017/month=11/day=11/
2018-01-04 09:41:23          0 se2/in1/year=2017/month=11/day=11/hour=12/
2018-01-04 10:19:28         48 se2/in1/year=2017/month=11/day=11/hour=12/cvlog_2017111112.csv
2018-01-04 09:40:42          0 se2/in1/year=2017/month=11/day=14/
2018-01-04 09:41:41          0 se2/in1/year=2017/month=11/day=14/hour=14/
2018-01-04 10:19:45         48 se2/in1/year=2017/month=11/day=14/hour=14/cvlog_2017111414.csv
2018-01-04 09:40:47          0 se2/in1/year=2017/month=11/day=21/
2018-01-04 09:41:55          0 se2/in1/year=2017/month=11/day=21/hour=14/
2018-01-04 10:20:01         49 se2/in1/year=2017/month=11/day=21/hour=14/cvlog_2017112114.csv
2018-01-04 09:40:50          0 se2/in1/year=2017/month=11/day=29/
2018-01-04 09:42:09          0 se2/in1/year=2017/month=11/day=29/hour=15/
2018-01-04 10:20:22         67 se2/in1/year=2017/month=11/day=29/hour=15/cvlog_2017112915.csv
2018-01-04 09:41:01          0 se2/in1/year=2017/month=11/day=30/
2018-01-04 09:42:22          0 se2/in1/year=2017/month=11/day=30/hour=14/
2018-01-04 10:20:41         49 se2/in1/year=2017/month=11/day=30/hour=14/cvlog_2017113014.csv
2018-01-04 09:42:40          0 se2/in1/year=2017/month=11/day=30/hour=20/
2018-01-04 10:20:52         49 se2/in1/year=2017/month=11/day=30/hour=20/cvlog_2017113020.csv
2018-01-04 09:40:16          0 se2/in1/year=2017/month=12/
2018-01-04 09:43:11          0 se2/in1/year=2017/month=12/day=1/
2018-01-04 09:45:16          0 se2/in1/year=2017/month=12/day=1/hour=1/
2018-01-04 10:21:19         44 se2/in1/year=2017/month=12/day=1/hour=1/cvlog_2017120101.csv
2018-01-04 09:43:21          0 se2/in1/year=2017/month=12/day=14/
2018-01-04 09:46:50          0 se2/in1/year=2017/month=12/day=14/hour=12/
2018-01-04 10:22:28         48 se2/in1/year=2017/month=12/day=14/hour=12/cvlog_2017121412.csv
2018-01-04 09:47:01          0 se2/in1/year=2017/month=12/day=14/hour=14/
2018-01-04 10:22:51         49 se2/in1/year=2017/month=12/day=14/hour=14/cvlog_2017121414.csv
2018-01-04 09:43:38          0 se2/in1/year=2017/month=12/day=15/
2018-01-04 09:47:11          0 se2/in1/year=2017/month=12/day=15/hour=11/
2018-01-04 10:23:12         48 se2/in1/year=2017/month=12/day=15/hour=11/cvlog_2017121511.csv
2018-01-04 09:43:42          0 se2/in1/year=2017/month=12/day=16/
2018-01-04 09:47:23          0 se2/in1/year=2017/month=12/day=16/hour=21/
2018-01-04 10:23:34         48 se2/in1/year=2017/month=12/day=16/hour=21/cvlog_2017121621.csv
2018-01-04 09:43:45          0 se2/in1/year=2017/month=12/day=17/
2018-01-04 09:47:38          0 se2/in1/year=2017/month=12/day=17/hour=14/
2018-01-04 10:23:54         49 se2/in1/year=2017/month=12/day=17/hour=14/cvlog_2017121714.csv
2018-01-04 09:47:43          0 se2/in1/year=2017/month=12/day=17/hour=18/
2018-01-04 10:24:12         49 se2/in1/year=2017/month=12/day=17/hour=18/cvlog_2017121718.csv
2018-01-04 09:43:49          0 se2/in1/year=2017/month=12/day=19/
2018-01-04 09:48:04          0 se2/in1/year=2017/month=12/day=19/hour=14/
2018-01-04 10:25:07         49 se2/in1/year=2017/month=12/day=19/hour=14/cvlog_2017121914.csv
2018-01-04 09:47:59          0 se2/in1/year=2017/month=12/day=19/hour=8/
2018-01-04 10:24:56         48 se2/in1/year=2017/month=12/day=19/hour=8/cvlog_2017121908.csv
2018-01-04 09:43:15          0 se2/in1/year=2017/month=12/day=2/
2018-01-04 09:45:44          0 se2/in1/year=2017/month=12/day=2/hour=18/
2018-01-04 10:21:41         44 se2/in1/year=2017/month=12/day=2/hour=18/cvlog_2017120218.csv
2018-01-04 09:43:52          0 se2/in1/year=2017/month=12/day=29/
2018-01-04 09:48:17          0 se2/in1/year=2017/month=12/day=29/hour=15/
2018-01-04 10:25:26         47 se2/in1/year=2017/month=12/day=29/hour=15/cvlog_2017122915.csv
2018-01-04 09:43:18          0 se2/in1/year=2017/month=12/day=9/
2018-01-04 09:46:24          0 se2/in1/year=2017/month=12/day=9/hour=4/
2018-01-04 10:22:02         48 se2/in1/year=2017/month=12/day=9/hour=4/cvlog_2017120904.csv

S3のディレクトリ構成

Glueジョブの入力データは”in1″ディレクトリ配下、出力は”out3″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE in1/
                           PRE out0/
                           PRE out1/
                           PRE out2/
                           PRE out3/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

入力データ用に新しくクローラーを作り実行してテーブルを作ります。

出来上がるテーブルの情報は以下です。

スクリーンショット 0030-01-04 10.52.04.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job4ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”パーティション分割されたcsvデータを同じパーティションで別の場所にparquetとして出力する”です。

se2_job4
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションをマッピング対象のカラムとして含めてくれません
入力のパーティションのカラムをマップの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

↓↓↓

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job4_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in1", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

###add
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out3/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
###add

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out3"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-04 15.54.04.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/
2018-01-04 15:15:41        926 part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet .
download: s3://test-glue00/se2/out3/year=2017/month=11/day=14/hour=14/part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet to ./part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# ls
part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head part-00149-21f4b84d-3fb2-4095-bf1c-cad2f6a58e63.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out3でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-04 15.59.33.png

S3の出力パスを入力
形式の違うデータが混在しているとテーブルが複数できてしまうので、不要なものがあれば、excludeで除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-04 16.00.06.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.19.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-04 16.00.28.png

そのまま”Next”をクリック

スクリーンショット 0030-01-04 16.00.36.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-04 16.00.49.png

クローラー実行

1つのテーブルとして認識している

スクリーンショット 0030-01-04 16.09.01.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-04 16.09.52.png

Athenaから確認

左メニューからse2_out3のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-04 16.11.18.png

件数も19件で合っている

スクリーンショット 0030-01-04 16.11.01.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばappidというカラムがあるので、アプリごとに集計をするようなケースが多いならappidも含めてパーティション分割する

他にもdeviceidとかでデバイスごとに集計したり
一時的な調査にも役立つかも

出力に”out4″ディレクトリ作成
①と同様のジョブをse2_job5で作成
PySparkに以下3点修正したジョブ作成

25行目を以下のように修正します
GUIでジョブを作った場合、入力データのパーティションはカラムとして含めてくれません。
入力のパーティションのカラムをapplyの対象として追加します

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string")], transformation_ctx = "applymapping1")

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string"), ("hour", "string", "hour", "string")], transformation_ctx = "applymapping1")

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['appid','year','month','day','hour']
output='s3://test-glue00/se2/out4/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

appidごとにパーティション別れてます

スクリーンショット 0030-01-04 16.30.35.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-04 16.33.32.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-04 16.33.49.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-04 16.34.10.png

件数も同じく19件

スクリーンショット 0030-01-04 16.34.39.png

その他

todo

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/latest/sql-programming-guide.html

続きを読む

Glueの使い方的な④(ブックマーク)

ブックマークを使って一度処理したデータは処理対象外とする

入力パスの先にデータがあっても既に処理済なデータは除外したい。そんな思いになったことありませんでしょうか?

これを実現する機能がGlueの”ブックマーク”です。

全体の流れ

  • 前提
  • ブックマークの効果を見る
  • ジョブの永続的なブックマーク有効化
  • トリガー側のブックマーク有効無効
  • (おまけの考察)

前提

Glueの使い方的な①(GUIでジョブ実行)“(以後①と書きます)、”Glueの使い方的な③(CLIでジョブ実行)“(以後③と書きます)あたりを読んでいただけるとスムーズです

今回扱うジョブは①と同じ内容です。
“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

③の後ろの方で書いたように、このジョブはGlueのGUIだけで作成し、1つのcsvを1つのparquetにするだけのなので、このジョブを2回実行すると同じ内容の出力が2つ出来てしまいます。

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

*①と同じディレクトリ
Glueジョブの入力データは”in0″ディレクトリ配下、出力は”out1″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

ブックマークの効果を見る

①や③でジョブを実行して出力ファイルがout0に既に存在しているのでそれを事前に消しておきます。

想定される流れと結果

1回目の実行で、①と同じくparquetの出力ファイルが1つできる。(あとメタデータ2つ)
こんな感じ

2018-01-02 10:44:04        782 _common_metadata
2018-01-02 10:44:04       1746 _metadata
2018-01-02 10:27:05       2077 part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet

ブックマークを有効化することで、2回目の実行で、既に処理した入力ファイルは処理されない為、新たなparquetファイルは出力されず1つのままになります

やってみる

ジョブは①で作ったのと同じ内容で新たにジョブをse2_job2という名前で作ります
ジョブの内容は以下です。
“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

最初のジョブの状態

実行履歴が1件ありますが気にしないでください。
スクリーンショット 0030-01-02 17.38.20.png

1回目実行

ブックマークを有効にして実行する
“Action”の”Run job”をクリックする

※ちなみにこの画面のActionの”Reset job bookmark”でブックマークに保持された状態をリセットできます

スクリーンショット 0030-01-02 17.39.21.png

Parameter(optional)の画面で”Job bookmark”を”Enable”にして”Run job”をクリックする
今回のみ有効なパラメータとしてジョブを実行します。

スクリーンショット 0030-01-02 17.39.06.png

成功して履歴が1つ増えています
スクリーンショット 0030-01-02 17.49.46.png

S3にparquetファイル1つとメタデータ2つがほぼ同じ時刻で作成されています。

スクリーンショット 0030-01-02 17.50.06.png

続いて同じ手順で2回目を実行します。

実行後のS3はメタデータは更新されている(?)
ただ、肝心のparquetファイルは1つのみで、1度処理した入力データを対象外としていることがわかる
※メタデータの内容は後半で確認してみます

スクリーンショット 0030-01-02 17.55.50.png

このようにGlueのブックマークを有効にしておくと、同じ場所にあるソースデータの中でまだ処理していないデータだけを処理対象とすることができます。

ジョブ側の永続的なブックマーク有効化

さっきまではジョブ実行時の1度だけのブックマーク有効でした
今回は永続的なブックマーク有効化をします。

対象ジョブにチェックを入れ、”Action”をクリックし、”Edit job”をクリックする

スクリーンショット 0030-01-02 17.53.30.png

この画面が出て一見ブックマークの設定箇所がなさそうだが、実は下にスクロールできる

スクリーンショット 0030-01-02 17.53.49.png

あった

スクリーンショット 0030-01-02 17.54.07.png

“Enable”に変更して”Save”
するとエラーが、、”temporary directory”と”IAM role”をこのタイミングでも入れないといけなようです。
それぞれ入力後”Save”をクリックします。

スクリーンショット 0030-01-02 17.54.20.png

画面の右端にあるように”Job bookmark”が”Enable”になっているのがわかります。

スクリーンショット 0030-01-02 18.11.49.png

トリガー側のブックマーク有効無効

ブックマークの有効無効確認

対象のトリガーのse2_trigger2の部分をクリック

スクリーンショット 0030-01-02 18.18.50.png

以下の画面になり、Parametersのところに”–job-bookmark-option:job-bookmark-enable”とあればブックマークが有効になっている

スクリーンショット 0030-01-02 18.19.04.png

ブックマーク有効無効変更

対象のトリガーse2_trigger2にチェックを入れ、”Action”をクリックし”Edit trigger”をクリックする

スクリーンショット 0030-01-02 18.16.45.png

“Next”をクリックする

スクリーンショット 0030-01-02 18.16.59.png

画面下の部分に”Job bookmark”がありDisable、Enable、Pauseの3つの状態が選べる
これを選びNextと進めればよさそうだが、それだけだとダメである

スクリーンショット 0030-01-02 18.17.16.png

画面下部にあるKeyとValueの箇所の”job-bookmark-enable”を事前に消しておく必要がある。右側の✖をクリックすることで消せます。消した後に”Job bookmark”をDisableにして”Next”をクリックし、次にサマリが出るので問題なければ”Finish”をクリックする

スクリーンショット 0030-01-02 18.17.39.png

確認すると”–job-bookmark-option:job-bookmark-disable”になっていて無効に変わったことがわかる

スクリーンショット 0030-01-02 18.33.18.png

(ブックマーク有効にしてジョブ実行してもメタデータが更新された件の考察)

結果的にはparquetの仕様っぽいので気にする必要もなさそうです。

メタを詳細に見るためにparquet-toolsを入れます。
このツール相変わらず普通にビルドできない・・

ここは弊社の担々麺デカの力を借りて無事入りました。(+1タンタンメン)
http://d.hatena.ne.jp/yohei-a/20170629/1498710035

とは言え改善してるんじゃと思い、ちょっと脱線しますが

【parquet-toolsインストール2018年版】

github
また(?)リポジトリ変わってます・・
https://github.com/apache/parquet-mr

最新のv1.8.0、v1.8.1はpom.xmlをゴニョゴニョしないとダメだ
Issueは多分こちら
https://issues.apache.org/jira/browse/PARQUET-1129

v1.7はなぜかなく、結果的にv1.6系最後(?)の1.6.0rc7だとすんなりビルドできました。

jdkインストール

yum -y install java-1.7.0-openjdk-devel

mavenインストール

wget http://ftp.yz.yamagata-u.ac.jp/pub/network/apache/maven/maven-3/3.3.9/binaries/apache-maven-3.3.9-bin.tar.gz
tar xvfz apache-maven-3.3.9-bin.tar.gz 
./apache-maven-3.3.9/bin/mvn -version

.bash_profileとかでパス通す

PATH=$PATH:$HOME/bin:/root/apache-maven-3.3.9/bin

parquet-toolsインストール

git clone https://github.com/apache/parquet-mr.git
cd parquet-mr
git checkout ec6f200b4943cfcbc8be5a8e53fdebf07a8e16f7
cd parquet-tools/
mvn clean package -Plocal 

実行

aws s3 cp s3://test-glue00/se2/out0/ .data/ --recursive

java -jar target/parquet-tools-1.6.0rc7.jar head -n 1 data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
deviceid = iphone
uuid = 11111
appid = 1
country = JP
year = 2017
month = 12
day = 14
hour = 12

parquet-toolsでメタ情報も見れます。
ブックマークを使ったのにメタデータは更新があったので、更新前と後のメタの情報を比較してみます。

メタ情報表示

# java -jar target/parquet-tools-1.6.0rc7.jar meta data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
file:        file:/root/parquet/parquet-mr/parquet-tools/data/part-00000-19a94695-23f8-492d-81ee-6a0772e4a3b5.snappy.parquet 
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf) 
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"deviceid","type":"string","nullable":true,"metadata":{}},{"name":"uuid","type":"long","nullable":true,"metadata":{}},{"name":"appid","type":"long","nullable":true,"metadata":{}},{"name":"country","type":"string","nullable":true,"metadata":{}},{"name":"year","type":"long","nullable":true,"metadata":{}},{"name":"month","type":"long","nullable":true,"metadata":{}},{"name":"day","type":"long","nullable":true,"metadata":{}},{"name":"hour","type":"long","nullable":true,"metadata":{}}]} 

file schema: spark_schema 
--------------------------------------------------------------------------------
deviceid:    OPTIONAL BINARY O:UTF8 R:0 D:1
uuid:        OPTIONAL INT64 R:0 D:1
appid:       OPTIONAL INT64 R:0 D:1
country:     OPTIONAL BINARY O:UTF8 R:0 D:1
year:        OPTIONAL INT64 R:0 D:1
month:       OPTIONAL INT64 R:0 D:1
day:         OPTIONAL INT64 R:0 D:1
hour:        OPTIONAL INT64 R:0 D:1

row group 1: RC:19 TS:952 OFFSET:4 
--------------------------------------------------------------------------------
deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19 ENC:PLAIN,BIT_PACKED,RLE
appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19 ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE

更新前のメタ情報:/tmp/data1/meta.log
更新後のメタ情報:/tmp/data2/meta.log
diffは見切れちゃってますが、差分は以下の並びが違うということでした。内容は同じです。
ENC:BIT_PACKED,PLAIN_DICTIONARY,RLE
もう1回ジョブを実行した場合、上記の並びは同じでしたがタイムスタンプは最新になってました。
そういうもののようです;

# sdiff -s /tmp/data1/meta.log /tmp/data2/meta.log 
file:        file:/root/parquet/parquet-mr/parquet-tools/data | file:        file:/root/parquet/parquet-mr/parquet-tools/data
deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 E | deviceid:     BINARY SNAPPY DO:0 FPO:4 SZ:101/97/0.96 VC:19 E
uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19 | uuid:         INT64 SNAPPY DO:0 FPO:105 SZ:138/201/1.46 VC:19
appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19  | appid:        INT64 SNAPPY DO:0 FPO:243 SZ:98/100/1.02 VC:19 
country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19  | country:      BINARY SNAPPY DO:0 FPO:341 SZ:80/76/0.95 VC:19 
year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 E | year:         INT64 SNAPPY DO:0 FPO:421 SZ:74/70/0.95 VC:19 E
month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 E | month:        INT64 SNAPPY DO:0 FPO:495 SZ:86/82/0.95 VC:19 E
day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19 | day:          INT64 SNAPPY DO:0 FPO:581 SZ:137/171/1.25 VC:19
hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19 | hour:         INT64 SNAPPY DO:0 FPO:718 SZ:129/155/1.20 VC:19

その他

  • Glueで自動生成されるPySparkコードに以下のようなコンテキストオブジェクトがあります。これにはブックマークとしての意味もあり、それぞれの実行時にソース、変換、およびシンクの状態をブックマークの状態のキーとして使用します。状態はタイムスタンプとして記録し維持します。
    なのでブックマークを使用していない場合は、この変数を指定しなくても問題はありません。
    transformation_ctx = “datasource0”
    transformation_ctx = “applymapping1”
    transformation_ctx = “datasink4”

  • S3の結果整合性への対処
    ジョブ開始前に、以前のデータと不整合があるデータをジョブの対象とする(整合なデータは除外リストとして維持する)
    状態としてサイズも持っているということかもしれません。

  • 例えばあるファイルは処理対象か対象ではないのか?と言った詳細なブックマークの状態を見ることは現在はできません。

To Be Continue

todo

参考

Bookmarkの公式ページ
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/monitor-continuations.html

re:Invent資料。34ページあたりにBookMarkの細かい情報あり
https://www.slideshare.net/AmazonWebServices/abd315serverless-etl-with-aws-glue

parquet-toolsインストール
http://d.hatena.ne.jp/yohei-a/20170629/1498710035

parquet-tools
https://github.com/apache/parquet-mr

続きを読む

Glueの使い方的な②(csvデータをパーティション分割したparquetに変換)

パーティション分割するジョブを作る

ジョブの内容

※”Glueの使い方的な①(GUIでジョブ実行)“(以後①とだけ書きます)と同様のcsvデータを使います

“csvデータのタイムスタンプのカラムごとにパーティション分割してparquetで出力する”

ジョブ名

se2_job1

クローラー名

se2_in0
se2_out1

全体の流れ

  • 前準備
  • ジョブ作成と修正
  • ジョブ実行と確認
  • 出力データのクローラー作成、実行、Athenaで確認
  • 別のカラムでパーティション分割

※①のGUIで作成したPySparkスクリプトに最小限の変更を入れる形で進めます

前準備

ソースデータ(19件)

※①と同じデータ

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,001,JP,2017,12,14,12
android,11112,001,FR,2017,12,14,14
iphone,11113,009,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

データの場所

※①と同じ場所

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

S3のディレクトリ構成

Glueジョブの入力データは”in0″ディレクトリ配下、出力は”out1″ディレクトリ配下

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE out1/
                           PRE script/
                           PRE tmp/

入力テーブルのクローラー

※①で作ったものを使います。

テーブルの情報は以下です。

スクリーンショット 0030-01-03 15.45.21.png

ここから、ジョブ作成とPySparkスクリプト修正、出力データのクローラー作成を行っていきます

ジョブ作成と修正

①と同じ手順のGUIのみの操作でse2_job1ジョブを作成
この段階では①とほぼ同じ内容のジョブです
コードは以下になります。

処理内容は”S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”です。

se2_job1
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

以下の部分を修正します。

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

toDF:DynamicFrameをDataFrameに変換
write:DataFrameのデータを外部に保存。jdbc, parquet, json, orc, text, saveAsTable
parquetのcompression:none, snappy, gzip, and, lzoから選べる
partitionBy:Hiveパーティションのようにカラム=バリュー形式でパーティション化されたディレクトリにデータを保存
mode:ファイルやテーブルが既に存在してる場合の振る舞い。overwrite,append,ignore,error(デフォ)
repartition(numPartitions, *cols)[source]:パーティションの再配置、カッコ内はパーティションする単位を数字かカラムで選ぶ、カラムが優先

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

修正したコードです

se2_job1_update
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

### Add(start)
df = dropnullfields3.toDF()

partitionby=['year','month','day','hour']
output='s3://test-glue00/se2/out1/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)
#dropnullfields3 = DynamicFrame.fromDF(df, glueContext, "dropnullfields3")
### Add(end)

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
###
#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ジョブ実行と確認

ジョブ実行

対象ジョブにチェックを入れ、ActionからRun jobをクリックしジョブ実行します
出力が指定したyaerやmonthでパーティション分割されている。

スクリーンショット 0030-01-03 16.25.57.png

コマンドで確認

s3のparquetファイルを確認
ローカルにダウンロードし、parquet-toolsで内容確認

# aws s3 ls s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/
2018-01-03 16:25:49        926 part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet
# aws s3 cp s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet . 
download: s3://test-glue00/se2/out1/year=2017/month=11/day=14/hour=14/part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet to ./part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet
# java -jar /root/parquet/parquet-mr/parquet-tools/target/parquet-tools-1.6.0rc7.jar head  part-00049-b7b7bd75-25ac-4a17-bd01-642bde94a648.snappy.parquet 
deviceid = iphone
uuid = 11123
appid = 9
country = FR

出力データのクローラー作成、実行、Athenaで確認

se2_out1でクローラー作成

GlueのCrawlersをクリックし、”Add crawler”をクリック

スクリーンショット 0030-01-03 16.37.26.png

S3の出力パスを入力

スクリーンショット 0030-01-03 16.37.50.png

そのまま”Next”をクリック

スクリーンショット 0030-01-03 16.38.46.png

IAM roleに”test-glue”を選択

スクリーンショット 0030-01-03 16.38.55.png

そのまま”Next”をクリック

スクリーンショット 0030-01-03 16.39.03.png

Databaseを選択(今回はse2)
Prefixを入力(今回はse2_)

スクリーンショット 0030-01-03 16.39.16.png

クローラー実行

形式の違うデータが混在しているとテーブルが複数できてしまう

スクリーンショット 0030-01-03 16.41.46.png

クローラーを作り直し、”Add a data store”の”Exclude patterns”の箇所で
不要なものがあれば除外する。
今回は、_common_metadataと_metadataを除外してる

スクリーンショット 0030-01-05 20.18.38.png

クローラー実行後
1つのテーブルとして認識される

スクリーンショット 0030-01-03 16.48.52.png

スキーマも、yearやmonthなどで分割したパーティションを認識している

スクリーンショット 0030-01-03 16.49.09.png

Athenaから確認

左メニューからse2_out1のスキーマ情報確認、クエリ実行

スクリーンショット 0030-01-05 20.39.54.png

件数も19件で合っている

スクリーンショット 0030-01-03 16.51.35.png

別のカラムでパーティション切る

タイムスタンプ以外のカラムでももちろんパーティションを切れます。

例えばcountryというカラムがあるので、国ごとに集計をするようなケースが多いならcountryも含めてパーティション分割する

他にもappidとかでアプリごとに集計したり
一時的な調査にも役立つかも

出力に”out2″ディレクトリ作成
①と同様のジョブをse2_job3で作成
作成したジョブのPySparkスクリプトに以下2点修正

35行目の”dropnullfields3″の後に以下を追加

df = dropnullfields3.toDF()

partitionby=['country','year','month','day','hour']
output='s3://test-glue00/se2/out2/'
codec='snappy'

df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output,compression=codec)

最後の方のsink処理をコメントアウト

#datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out1"}, format = "parquet", transformation_ctx = "datasink4")

ジョブ実行

countryごとにパーティション別れてます

スクリーンショット 0030-01-03 17.33.24.png

クローラー作成と実行

手順はさっきと同じなので省きます

テーブルが作成され

スクリーンショット 0030-01-03 17.36.02.png

スキーマはcountryがパーティションに追加されています

スクリーンショット 0030-01-03 17.37.19.png

Athenaから確認

左側メニューでスキーマ確認と、クエリ実行

スクリーンショット 0030-01-03 17.39.12.png

件数も同じく19件

スクリーンショット 0030-01-03 17.38.57.png

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

こちらも是非

参考サンプル
https://github.com/awslabs/aws-big-data-blog/blob/master/aws-blog-spark-parquet-conversion/convert2parquet.py

Spark API
https://spark.apache.org/docs/preview/api/python/pyspark.sql.html
https://hackernoon.com/managing-spark-partitions-with-coalesce-and-repartition-4050c57ad5c4

続きを読む

I need someone strong in AWS cloud services, Microservices, Full-stack development. Inbox me first.

Java & クラウドコンピューティング Projects for $250 – $750. I need someone can handle and vast experience in AWS cloud services, Hands on experience with Java / J2EE, Spring, Apache, AWS Elastic Bean Stalk, Cloud Formation, MySQL, Docker, Jenkins, GitHub, Java… 続きを読む

【レポート】Deep Dive: ビッグデータワークロードをAWSに移行する #reinvent #ABD312 | Developers.IO

原題. ABD312 – Deep Dive: Migrating Big Data Workloads to AWS. 概要. Customers are migrating their analytics, data processing (ETL), and data science workloads running on Apache Hadoop, Spark, and data warehouse appliances from on-premise deployments to AWS in order to save costs, … 続きを読む

AWSにJenkinsをインストール – Qiita

AWSにJenkinsをインストール – Qiita. やりたいことyumでインストールする最新安定版を使う (2016/7/26 時点では、2.7.1) Apacheと連携する … 続きを表示 やりたいことyumでインストールする最新安定版を使う (2016/7/26 時点では、2.7.1) Apacheと連携する (ブラウザから8080ポートを使用しない) 他のアプリも同一サーバ … 続きを読む