AWS S3勉強まとめ

ブロックストレージ
EBS, インスタンスストア
→EC2にマウントして活用
→Block番号で管理

オブジェクトストレージ
S3, Glacier
→安価かつ高い耐久性を持つオンラインストレージ
→オブジェクト、それに付随するメタデータ、そのオブジェクトにアクセスするためのユニークなIDで構成

ファイルストレージ
EFS
→EC2から同時マウントできる共有ストレージサービス
→ファイルシステム

・S3特徴
→容量無制限、安価なストレージ(1GB3円)、データ容量に依存しない性能(RAIDやサーバー台数を考える必要なし)

・S3用途
①コンテンツ配信、保管サーバ
②ログ&データハブストレージ
③バックアップやDR

バケット
オブジェクトの保存場所。デフォルト100個/1アカウントまで作成可能。名前はグローバルでユニークな必要あり。
オブジェクト
データ本体。URLが付与される
キー
オブジェクトの格納URL
メタデータ
オブジェクトに付随する属性情報。システム定義メタデータ、ユーザ定義メタデータあり
リージョン
バケットを配置するAWSのロケーション
アクセスコントロールリスト(ACL)
バケットやオブジェクトのアクセス管理

・ストレージクラス
スタンダード
標準低頻度アクセスストレージ:スタンダードに比べて安価だが、データの読出し容量に対して課金
Glacier:最も低コスト。データの取り出しにコストと時間
低冗長化ストレージ:Glacierから取り出したデータの置き場所として利用

結果整合性(Eventual Consistency Readモデル)
「更新はそのうち全体に反映される」
読み取り一貫性
– あるトランザクションがデータを変更中のとき、ほかのトランザクションからは変更される前のデータを参照します。
– ほかのトランザクションからは変更前の確定されたデータを参照します。
– あるユーザーAが値をUPDATEしたとき、ユーザーBがそのデータを参照すると、戻ってくる値はUPDATE前の値となります。
– あるトランザクションで変更した確定前のデータをほかのトランザクションから参照することはできません。

・パソコンのファイルシステムやデータベースと同じようにロックやトランザクション処理は行われない
参考URL:https://dev.classmethod.jp/cloud/amazon-s3-eventually-consistent-and-consistent-read/

・アクセス管理
①ユーザポリシー
→IAMuserに対して権限設定
②バケットポリシー
→バケットごとに権限設定。クロスアカウントで使用する際など
③ACL
→バケット、オブジェクトごとに指定可能(オブジェクトACLが優先)

署名付きURL
AWS SDKで作成。S3のプライベートなオブジェクトに対して一定時間アクセスを許可

・Webサイトホスティング機能
静的なWebサイトをS3のみでホスティング可能
– バケット単位で指定
– 独自ドメインの設定→ドメイン名をバケット名として指定
– リダイレクト機能→任意のドメインにリダイレクト設定が可能
CloudFrontとの経由で配信することを推奨。バケットポリシーでHTTP/HTTPSリクエストのみを許可可能

VPCエンドポイント
プライベートサブネットからNATゲートウェイなどを経由せずに直接S3とセキュアに通信可能
同一リージョンのみ

S3 support for IPv6
追加費用なし
静的ウェブホスティングは使用不可

・暗号化
– サーバーサイド暗号化(サーバリソースを利用して格納データの暗号化)
– クライアントサイド暗号化(クライアント側で暗号化したデータをS3にアップロード)

クロスリージョンレプリケーション
異なるリージョン間のS3バケットオブジェクトのレプリケーションを実施
→オブジェクトに対する動作を非同期でレプリケーション
→対象元バケットはバージョニングの機能を有効にする必要あり
※リージョン間データ転送費用が発生

バージョン管理機能
誤操作による削除対策に有効
バケットに対して設定
任意のオブジェクトを参照可能
バージョニングのオブジェクト分も課金。保存期間も指定可能

ライプサイクル管理
バケット内のオブジェクトに対して、ストレージクラスの変更や、削除処理の自動化
データ登録→Standard保存(一定期間過ぎたら削除)→Standard-IA移動(一定期間過ぎたら削除)→Glacierにアーカイブ(一定期間過ぎたら削除)

・アーカイブ
S3上のデータを削除でGlacier側のデータも削除
S3には8KBのオブジェクト名とメタデータのみ保管

・復元
オブジェクトごと
一時的にS3の低冗長化ストレージに指定日数複製(Glacierと低冗長化ストレージ両方課金)
復元にかかる時間の選択肢は3つ
①Expedited:緊急のアクセス
②Standard:3-5時間。標準的
③Bulk:大量のデータ。5-12時間
それぞれによってコストが異なる

・オブジェクト移動
Standard⇔Standard-IA→Glacier
→Glacier

S3分析
Standard-IAとGlacierどちらにいつ移動すればいいだろうかという疑問に答える可視化ツール
→ライフサイクルポリシーの設定値の参考になる

S3インベントリ
S3のオブジェクトのリストを一気にcsvファイルで取得
スケジュールかも可能

・イベント通知
SNS:メール送信
SQS:キューメッセージの登録
Lambda:ファンクションの実行

・CloudWatchによる監視
ストレージメトリクス:バケット単位。1日単位でのレポート。追加費用なし
リクエストメトリクス:オブジェクト単位。通常のCloudWatch料金

CloudTrailによるAPI(操作ログ。Get, Delete, Putなど)管理
S3への操作ログを収集
監査対象とは別のS3バケットの用意推奨

Logging
バケットに対するアクセスログの出力設定可能

Tag管理
バケット/オブジェクトに対してタグの指定可能

・パフォーマンスの最適化
大きなサイズのファイルをアップロード、ダウンロード
RANGE GETを活用。マルチパートアップロード機能
大量のGETリクエストが発生する場合はCloudFrontを併用することを推奨

Transfer Acceleration(高速ファイル転送サービス)
AWSのエッジネットワークから最適化されたAWSのネットワークを経由する。
S3のデータ転送コストとは別に加算
※通常の転送より高速でない場合は、課金されない

コンテンツ配信サーバ
データをS3に配置、CloudFrontでキャッシュさせる
CloudFrontで静的コンテンツ配信。CloudFrontの料金はかからない
Webサーバーで動的コンテンツは処理

ログ&データハブストレージ
オンプレ:Direct Connectでログデータ収集
外部データソース;Kinesisで収集
AWS;S3に保管。Glacierにアーカイブ
分析:Redshift, EMR, Atenaなど

バックアップ、DR
クロスリージョンでデータの複製を保持
リージョン内でもDR設定

参考URL:https://www.slideshare.net/AmazonWebServicesJapan/aws-black-belt-online-seminar-2017-amazon-s3

続きを読む

AWS Auto Scaling勉強まとめ

・需要に応じて自動的にサーバーが増減し、コストカット

Auto Scaling Group
・設定した最小値~最大値に起動インスタンスを収める
・起動台数をAZ間でバランシング
・AZ障害時は他のAZでインスタンス起動

Launch Configuration
・AMIやインスタンスタイプ、IAMなどを設定して起動する

Scaling Plan
・どのようにインスタンスを起動するか
①Auto Scaling Planの維持
最小台数を維持する。
Auto Healing:インスタンスに障害発生時に自動的にサービスから切り離し、健全なインスタンスをサービスイン

②手動管理
インスタンスを手動で変更

③スケジュールベース
CLI/SDKで定義
スケーリング開始は最大2分遅れる場合があるので注意

④動的スケーリング
監視:CloudWatchに応じたインスタンスの増減

上記の複数のプランを組み合わせることも可能

ヘルスチェック
①EC2ヘルスチェック:インスタンスのステータスがrunning以外を以上と判断
②ELBヘルスチェック
・ヘルスチェックの猶予期間がある。インスタンス起動からヘルスチェック開始までの時間。アプリケーションデプロイを考慮
・異常と判断されたインスタンスは自動的に終了

クールダウン
スケーリングアクション実行後指定した時間は次にスケーリングアクションを実行しない仕組み
→インスタンス初期化中の無駄なスケーリングを回避するため
※シンプルスケーリングポリシーにのみ対応

ターミネーションポリシー
①OldestInstance/NewestInstance:起動時刻
②OldestLaunchConfiguration:最も古いLaunch Configuration
③ClosestToNextInstanceHour:課金のタイミングが近い
④Default:②③の順に適用。複数インスタンスが残ればランダム

インスタンス保護
任意のインスタンスを削除されないよう保護できる

・インスタンスのデタッチ、アタッチ、スタンバイ

ライフサイクルフック
Auto Scalingの起動/終了時に一定時間(デフォルトは1時間、最大48時間)待機させ、カスタムアクションを実行できる

・スケールアウト時の初期化処理
①設定済みのAMIを用いる
②user-dataで初期化スクリプトを実行(Bootstrap処理)
③ライフサイクルフックで初期化

・サーバーをステートレスにする
ステートレス:サーバーにセッション情報などがない。スケールアウトに向いている
ステートフル:サーバーにセッション情報あり。常にサーバー間で同期が必要なので、スケールアウトに向いていない

・突発的なスパイクには向いていない
→インスタンス作成~アプリ起動の時間がかかるため。
対応策としては、
①CloudFrontなど大きなキャパシティを持ったAWSサービスに処理をオフロードする
②スパイクを裁くのを諦め、スロットリング機能(処理性能の上限)を設ける
③一定以上の負荷を超えたら静的ページに切り替える

・ユースケース
①ELB配下のWebサーバーをAuto Scaling
→EC2は複数AZに分散し、高可用性
ELBのリクエスト数、EC2の平均CPU使用率などがトリガー

②SQSのジョブを処理するWorkerをAuto Scaling
キューのメッセージ数などがトリガー

③Blue/Green
Blue/Green
デプロイ時に、既存のインスタンスとは違うインスタンスを作成し、一気に/徐々に作成したインスタンスを使用するようにする。
移行方法は下記
・DNSのWeighted round robinを使用して、徐々にトラフィックを移行
→DNSのTTLを考慮する必要あり。
・ELBを利用して、移行する
→Elastic Container Service(ECS。Dockerコンテナを格納する場所)を利用して、新しいインスタンスを作成することも可能

In place
デプロイ時に既存のインスタンスを操作することで対応する

参考URL:http://aws.typepad.com/sajp/2015/12/what-is-blue-green-deployment.html

Elastic MapReduce(EMR)

<Hadoop(分散処理をしてくれるソフトウェア)を動かせる環境を提供してくれるサービス
参考URL:http://mgi.hatenablog.com/entry/2014/05/04/085148

参考URL:https://www.slideshare.net/AmazonWebServicesJapan/aws-black-belt-online-seminar-2017-auto-scaling

続きを読む

AWS GlueでネストされたJSONファイルをCSVファイルやParquetに変換する | Developers.IO

AWS GlueのRelationalizeというTransformを利用して、ネストされたJSONをCSVファイルやParquetに変換する方法をご紹介します。CSV形式に変換することでリレーショナルデータベースに簡単にインポートできます。また、Parquetフォーマットに変換することでAthena、Redshift Spectrum、EMRからより高速にクエリできる … 続きを読む

Dynamic DNS creation for core nodes in case of Autoscaling – AWS

パイソン & Linux Projects for $30 – $250. We wanted to implement the Dynamic creation/Deletion of DNS for nodes, when the AWS EMR cluster Autoscales up and down for Route53 using Autoscaling cloudwatch events that triggers a lambda function… 続きを読む

Glueの使い方的な①(GUIでジョブ実行)

GUIによる操作でGlueジョブを作って実行する

ジョブの内容

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”

ジョブ名

se2_job0

クローラー名

se2_in0
se2_out0

全体の流れ

  • 前準備
  • クローラー作成と実行、Athenaで確認
  • ジョブの作成と実行、Athenaで確認
  • 出来上がったPySparkスクリプト確認

前準備

ジョブで使うIAM role

以下のポリシーを付与した任意の名前のロールを作っておく。今回はtest-glueという名前にした。
・AmazonS3FullAccess
・AWSGlueServiceRole
※権限は必要に応じてより厳しくしてください。今回は検証のため緩めにしてあります。

今回使うサンプルログファイル(19件)

csvlog.csv
deviceid,uuid,appid,country,year,month,day,hour
iphone,11111,1,JP,2017,12,14,12
android,11112,1,FR,2017,12,14,14
iphone,11113,9,FR,2017,12,16,21
iphone,11114,007,AUS,2017,12,17,18
other,11115,005,JP,2017,12,29,15
iphone,11116,001,JP,2017,12,15,11
pc,11118,001,FR,2017,12,01,01
pc,11117,009,FR,2017,12,02,18
iphone,11119,007,AUS,2017,11,21,14
other,11110,005,JP,2017,11,29,15
iphone,11121,001,JP,2017,11,11,12
android,11122,001,FR,2017,11,30,20
iphone,11123,009,FR,2017,11,14,14
iphone,11124,007,AUS,2017,12,17,14
iphone,11125,005,JP,2017,11,29,15
iphone,11126,001,JP,2017,12,19,08
android,11127,001,FR,2017,12,19,14
iphone,11128,009,FR,2017,12,09,04
iphone,11129,007,AUS,2017,11,30,14

S3に配置

$ aws s3 ls s3://test-glue00/se2/in0/
2018-01-02 15:13:27          0 
2018-01-02 15:13:44        691 cvlog.csv

ディレクトリ構成

in0に入力ファイル、out0に出力ファイル

$ aws s3 ls s3://test-glue00/se2/
                           PRE in0/
                           PRE out0/
                           PRE script/
                           PRE tmp/

クローラー作成と実行、Athenaで確認

ジョブ作成の前に、こちらもGlueの重要な機能の1つであるData CatalogのCrawlerを作成して動かしていきます。
クローラはデータをスキャン、分類、スキーマ情報を自動認識し、そのメタデータをデータカタログに保存する機能です。これを使って入力となるサンプルデータのスキーマを事前に抽出してデータカタログにメタデータとして保存します。Athena、EMR、RedshiftからもこのGlueのデータカタログを使えます。Hive互換のメタストアです(Hiveメタストアのマネージドサービスと思ってもらえればいいと思います)。

基本的な操作はGUIを使って行えます。

AWSマネージメントコンソールから、Glueをクリックし、画面左側メニューの”Crawlers”をクリックし、”Add crawler”をクリック
スクリーンショット 0029-12-28 13.24.04.png

クローラーの名前入力
スクリーンショット 0029-12-28 11.19.32.png

S3にあるソースデータのパス入力(今回はS3に配置してあるデータが対象)
スクリーンショット 0030-01-02 15.17.11.png

そのまま”Next”
スクリーンショット 0029-12-28 9.38.41.png

“Choose an existing IAM role”にチェックを入れ、IAM roleをプルダウンからtest-glueを選択する
スクリーンショット 0030-01-02 15.17.49.png

“Run on demand”にチェックを入れ”Next”(今回のクローラーはスケジュールせずに手動実行とする)
スクリーンショット 0029-12-28 9.39.03.png

スキーマ情報を保存するDatabaseを選択、既存のものがあればそれでもいいし、なければ下の”Add database”でdatabase作成しそれを選択
Prefixは作成されるテーブル名の先頭に付くもの。見分け分類しやすいものにしておくと良いと思います(現状テーブルにtagとかつけられないので)
スクリーンショット 0029-12-28 9.39.23.png

クローラー実行

対象のクローラーにチェックを入れ、”Run Crawler”をクリック
スクリーンショット 0030-01-01 17.02.58.png

テーブルが出来上がる

“se2_”のPrefixが付いた”se2_in0″のテーブルができています。”in0″は指定した”Include Path”の一番下のディレクトリ名です。指定した”Include Path”(ソースデータがあるS3のパス、画像の”Location”の部分に表示されている)の配下のディレクトリは自動でパーティションとして認識されます。今回は配下にディレクトリはありませんのでパーティションも作成されません。
スクリーンショット 0030-01-02 15.21.15.png

テーブルの内容を確認するとスキーマが自動で作成されています
実データ配置場所のLocationがs3://test-glue00/se2/in0
Table名のNameが、prefixのse2_とInclude Pathで指定したs3://test-glue00/se2/in0の一番下のディレクトリのin0でse2_in0

Schemaを見るとuuidやappidなどがbigintで数値型になってます、文字列型がよければここでも修正できます。
今回は一旦このまま進めます
※本来はClassifierでいい感じにしたほうがいいと思う

スクリーンショット 0030-01-02 15.23.51.png

AthenaからもGlueのData Catalog使えます
Athenaから同様のテーブルとスキーマの内容が確認できます。Athenaがスキーマ情報にGlueのデータカタログを使ってることがわかります。画面右上にもGlue Data Catalogへのショートカットもありますね。
スクリーンショット 0030-01-02 15.27.07.png

もちろんクエリ実行もできます。
スクリーンショット 0030-01-02 15.29.31.png

ジョブの作成と実行、Athenaで確認

今回のようなジョブであれば、基本は画面ポチポチするだけです

Glueのメニューから”ETL”の”Job”をクリックし、”Add job”をクリック
スクリーンショット 0030-01-01 17.36.49.png

“Name”にジョブ名を入れ、”IAM role”はクローラーでも使ったロールを指定、”Temporary directory”は任意の場所で構いません。
スクリーンショット 0030-01-02 15.31.34.png

ソースデータとなるテーブルを選択。(先程作成したテーブルをクリック)
スクリーンショット 0030-01-02 15.33.35.png

ターゲットとなるテーブルは作成していないのでここで作ります。
今回は、保存先のData storeは”S3″、出力ファイルフォーマットのFormatは”Parquet”、出力先のパスのTargetPathは”任意の場所”を指定
※圧縮も選べます
スクリーンショット 0030-01-02 15.34.41.png

ソースデータとターゲットデータのマッピング変換ができます。いらないカラムを出力から除外したり、カラムの順序を変えたり、”Data Type”をstring,long,intなどに変えたり、新しいカラムを追加してそこに出力させる(カラム名を変える時に使えそうです)などができます
スクリーンショット 0029-12-28 10.08.35.png

次にサマリがでますので問題なければ”Finish”をクリック

ジョブの実行

作成したジョブにチェックを入れ、”Action”から”Run job”をクリック
スクリーンショット 0029-12-28 10.27.38.png

数分待って以下のように”Run status”が”Succeeded”となれば問題なく完了しています。
※問題があれば、”Error”の箇所にエラーの概要、”Logs”、”Error logs”の箇所の出力がリンクになっていてクリックするとCloudWatch logsに移動します。GlueのログはデフォルトでCloudWatch logsに出力されます
スクリーンショット 0030-01-02 15.46.47.png

出力したParquetフォーマットのファイルを、ソースデータと同様にクローラーを使ってスキーマを作り、スキーマオンリードでAthenaクエリを実行してみます。クローラー作成手順は前回と同様なので割愛します。
クローラーにより自動作成されたスキーマ
スクリーンショット 0030-01-02 15.49.41.png

クエリ結果です。データ量が少なくselectしてるだけなのでアレですが、parquetになったので列単位での集計処理などが高速化されるデータフォーマットに変換ができました。
スクリーンショット 0030-01-02 15.51.00.png

“S3の指定した場所に配置したcsvデータを指定した場所にparquetとして出力する”くらいであればGlueはGUIだけでサーバーレスでできます。

出来上がったPySparkスクリプト確認

se2_job0.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "se2", table_name = "se2_in0", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("deviceid", "string", "deviceid", "string"), ("uuid", "long", "uuid", "long"), ("appid", "long", "appid", "long"), ("country", "string", "country", "string"), ("year", "long", "year", "long"), ("month", "long", "month", "long"), ("day", "long", "day", "long"), ("hour", "long", "hour", "long")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://test-glue00/se2/out0"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

ビルトイン変換のいくつか補足

ApplyMapping:ソースの列とデータ型をターゲットの列とデータ型にマップします

ResolveChoice:複数の型の値が含まれている場合の列の処理方法を指定します。列を単一のデータ型にキャストするか、1つ以上の型を破棄するか、またはすべての型を別々の列または構造体に保持するかを選択できます
make_structは構造体を使用してデータを表現することにより、潜在的なあいまいさを解決します。たとえば、列のデータがintまたはstringの場合、make_structアクションを使用すると、生成されたDynamicFrameにintとstringの両方を含む構造体の列が生成されます。
choiceはspecが空の場合のデフォルトのresolution actionです

DropNullFields:nullフィールドを削除します

To Be Continue

よくありそうな変換処理ケースを今後書いていければと思います。

ジョブを作る際に似たようなジョブを作りたい、テスト段階でジョブのパラメータを一部だけ変えたジョブを作りたいことあると思います。現在はジョブのコピーがGUIからはできないのでそのあたりの運用を考慮する場合はCLIの利用がおすすめです。またあとで書きます。

こちらも是非

似た内容ですがより丁寧にかかれています。安定のクラメソブログ
https://dev.classmethod.jp/cloud/aws/aws-glue-released/
https://dev.classmethod.jp/cloud/aws/aws-glue-tutorial/

Built-In Transforms
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/built-in-transforms.html
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-transforms-ApplyMapping.html

続きを読む

AWS Glueが東京リージョンに来たのでScalaを動かしてみた

はじめに

現在オシゴトでSparkを導入しようとしていたので、今更感がありつつも入門記事を書こうと思っていたのですが、マネージドサービスのAWS Glueが東京リージョンに来たのでその話を書くことにしました。

やったこと

現時点でGlueはPythonのみサポートのようですが、ScalaやJavaもPy4J経由で呼び出せるようなので動かしてみました。

Scalaコード

とりあえず動くことを確認したかったので、S3から読み取ったCSVをそのまま吐き出すだけです。
面白くないです。
AssemblyでFatJarに固めた後、S3にアップロードしておきます。

注意点として、Python側で生成したSparkContext内のjvmに対して呼び出しを行うため、Scala側ではSparkContextの生成は行いません。

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.StructType

case class TestCSV(
  id: Int,
  name: String
)

object GlueScala {
  def main() {
    val spark = SparkSession.builder().getOrCreate()

    val ds: Dataset[TestCSV] = {
      import spark.implicits._
      val schema = ScalaReflection.schemaFor[TestCSV].dataType.asInstanceOf[StructType]
      spark.read
        .schema(schema)
        .option("header", true)
        .csv("s3n://path/to/input.csv").as[TestCSV]
    }

    ds.write.csv("s3n://path/to/output")
  }
}

CSVファイルはこんな感じ。

id,name
1,a
2,b
3,c
...

GlueのETL Jobを作成する

Glueの画面から、ETL -> Jobsと進み、Add Jobを押した後のメニューでA new script to be authored by youを選びました。

権限

以下の権限を持つIAMロールも作成/設定する必要があります。

  • Glueの実行権限
  • 処理対象データや、Pythonコード、Scalaで作成したJarファイルを置くS3バケットへの参照権限
  • 処理結果を吐き出すためのS3書き込み権限

Jarの場所を設定

Scalaで作成したJarファイルへのパスを通してあげましょう。
Script libraries and job parameters (optional)エリア内のDependent jars pathの欄にS3のパスを設定します。
このエリアで並列度の設定などもできるようですが今回はデフォルトのままです。

Pythonコードを書く

上記のAdd Jobの設定後出てくるConnectionの画面でOKを押すと、Pythonコードを書くためのエディタが出てきます。

Pythonでは、SparkContextの作成とScalaのメソッドを呼び出す部分だけを記述しました。

S3へアクセスするためキーを設定している個所については、IAMロールの権限を引き継げそうな気もしますが書かないと動かなかったのでとりあえず書いてしまいました。
今後試行錯誤してみます。

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from py4j.java_gateway import java_import

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "XXXXXXXXXXXXXX")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "XXXXXXXXXXXXXX")
sc._jsc.hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

java_import(sc._jvm, "GlueScala")
sc._jvm.GlueScala.main()

なお、上記の権限で言及しているとおり、ここで作成したPythonコードはS3上に置かれます。

動かす

あとはPythonエディタの画面の Run job を押すだけです。
もちろんJob一覧から選択してRun job を押してもよいです。
Scalaの ds.write.csv で指定したS3パスにCSVファイルが吐かれることでしょう。

さいごに

というわけでGlue上でScalaコードを動かす事ができました。
実用性については試行錯誤や検証が必要かと思いますが、EMRクラスタを立てなくても動かせる気軽さがうれしいですね。

続きを読む

GCPクラウドアーキテクトの模擬試験をAWS脳で解説してみる

Fusic Advent Calendar 2017 19日目担当、@Kta-Mです。
IoTのテスト用仮想デバイス作成サービスmockmockのプロダクトオーナーをしております。

以前よりAWS アドバンスドコンサルティングパートナーの認定を頂いている弊社ですが、今年はmockmockを通じてソラコム インテグレーションパートナーGCP テクノロジーパートナー と、新たな領域に踏み出した一年でした。

で、ただいまGCPのクラウドアーキテクトの認定を取るべく、お勉強中です。
GCPの試験、AWSと違って模擬試験が無料で何度も受けられるんです!しかも正解を教えてくれる!!

ただ、さすがになぜそれが正解なのか解説まではしてくれないので、GCPにまだ慣れていないAWS脳で解説+蛇足をつらつら書いてみます。AWSは知ってるけどこれからGCPの試験を受けるという、かなりピンポイントな方々の手助けになればと思います。

問題と選択肢の順番は固定のようなので、その前提で書いています。模擬試験を受けないことには何を書いてるのか分からないと思うので、受けてきてからここに戻ってきてください!
https://cloud.google.com/certification/practice-exam/cloud-architect?hl=ja

No.1 (アクセス頻度の低いデータをクラウドに…)

  • AWSのS3に相当するCloud Storageのストレージクラスに関する問題。
  • 「アクセス頻度が低い」のでまずNearlineへ保存する。
  • 5年以上古いものは使わないので削除する。
  • ストレージMultiRegionはアクセスされるエリアへの言及がないので違う。
  • (参考)ストレージクラス
    • https://cloud.google.com/storage/docs/storage-classes?hl=ja
    • Multi-Regional Storage
      • 複数リージョンで保存(AWSにはない)
      • 世界中からアクセスされるようなデータに使う
      • Regional Storageに比べるとちょっとパフォーマンスが低いので分析対象のデータなんかには向かない
      • AWSにはCross-Region Replicationがあるけど、こちらはバケット名が別になるし、リージョン間データ転送量がかかってしまう。
    • Regional Storage : 1つのリージョンで保存 (AWS S3の標準クラス)
      • 普通の。
    • Nearline : 低頻度アクセス用(AWS S3の標準 – 低頻度アクセスクラス)
      • 月イチ程度でアクセスされるぐらいのデータ向き
      • AWSと同じく取り出し料金が発生する
      • AWSと同じく最小保存期間が30日
      • AWSと違って最小オブジェクトサイズがない
    • Coldline : さらに低頻度アクセス用(AWS Glacier)
      • 年イチ程度でアクセスされるぐらいのデータ向き
      • AWSと同じく取り出し料金が発生する
      • AWSと同じく最小保存期間が90日
      • AWSと違って低レイテンシで取り出せる
    • 低冗長化(Durable Reduced Availability)も一応あるけど非推奨。
  • (参考)ライフサイクル

No.2 (会社のアーキテクチャは以下の図のように…)

  • 異なるリージョンからのアクセスが必要なので、Cloud Storageを選択
  • (参考)ストレージ比較
    • https://cloud.google.com/storage-options/?hl=ja
    • 非構造化データ
      • Cloud Storage for Firebase : Mobile用
      • Cluod Storage : AWSのS3/Glacier的なやつ
    • 分析用ストレージ
      • Cloud Bigtable : NoSQLのビッグデータデータベース。低レイテンシで高スループット。HadoopやCloud Dataflow、Dataprocと簡単に統合できる
      • Big Query : SQLでの分析が可能なデータウェアハウス。スケーラブルでフルマネージド。
    • RDB
      • Cloud Spanner : 水平スケーリングが可能なRDB。AWSでいうとAurora Serverlessが近い?
      • Cloud SQL : AWSのRDS的なやつ
    • NoSQL
      • Cloud Firestore for Firebase : Mobile用
      • Cloud Datastore : AWSのDynamoDB的なやつ

No.3 (大規模なウェブ アプリケーションを構築しています…)

  • 「独立したプロジェクト」「RFC1918」を満たすのは共有VPC
  • (参考)共有VPC
     - https://cloud.google.com/compute/docs/shared-vpc/?hl=ja

    • 同一アカウントの別プロジェクト間でVPCを共有
    • AWSと違ってGCPには「プロジェクト」という概念があって、プロジェクトごとにまるっと別の環境が作られるイメージ。
  • (参考)RFC1918
    • https://www.nic.ad.jp/ja/translation/rfc/1918.html
    • プライベートアドレス空間
      • 10.0.0.0 – 10.255.255.255 (10.0.0.0/8)
      • 172.16.0.0 – 172.31.255.255 (172.16.0.0/12)
      • 192.168.0.0 – 192.168.255.255 (192.168.0.0/16)
    • GCPのドキュメントにはわりとよく出てくる
  • (参考)VPC
    • https://cloud.google.com/compute/docs/vpc/?hl=ja
    • http://www.mpon.me/entry/2017/04/22/020428
    • AWSと違ってグローバルなプライベート通信スペースを提供
    • AWSと同じくVPC Peeringがある
    • 同一アカウントの別プロジェクト間でVPCを共有できる(共有VPC)
    • Cloud StorageなどのGCPサービスにもプライベートネットワークで接続可能
    • サブネットの予約IPアドレスは4個
      • ネットワーク アドレス(CIDR 範囲の最初のアドレス)
      • デフォルト ゲートウェイ アドレス(CIDR 範囲の 2 番目のアドレス)
      • 予約アドレス(CIDR 範囲の最後から 2 番目のアドレス)
      • ブロードキャスト アドレス(CIDR 範囲の最後のアドレス)
      • AWSは、0,1,2,3,255の5個が予約されている

No.4 (セキュリティカメラの映像を収集して…)

  • 最初の30日間は読み込み頻度が高そうなのでRegional
  • それ以降はほぼ使われなさそうなのでアーカイブとしてColdlineに移行
  • (参考)永続化ディスク

No.5 (以下に示す Google Cloud Deployment Manager…)

  • インスタンステンプレートに、具体的な永続化ディスクが指定されているのがダメ

    • 1台目はOKかもしれないが、2台目からはアタッチできなくてエラーになりそう
    • そもそもインスタンステンプレートに具体的なディスクの指定があるのがダメなのかも
  • (参考)オートスケーリング関連の名称
    • マネージドインスタンスグループ(AWSのAuto Scaling Group)
    • インスタンステンプレート(AWSのLaunch Config)
  • (参考)永続化ディスクのアタッチ
    • https://cloud.google.com/persistent-disk/?hl=ja
    • 読み書き可能にするのであれば、1台のインスタンスにしかアタッチできない
    • 読み込みのみであれば、AWSと違って複数台のインスタンスにアタッチできる

No.6 (次の図に示すような CI/CD パイプラインプロセスが…)

No.7 (本番環境トラフィックを提供するアプリケーションを…)

  • プロダクションでしかテストできないので、問題が起こっても一番傷が浅い選択肢を選ぶ
  • 選択肢1
    • 全アクセスが新アプリの方に行ってしまう。そもそもロールバックは緊急時の手段なので、ロールバックが計画手順に入ってるのは不適切
  • 選択肢2
    • 全アクセスが新アプリの方に行ってしまう。ちょっと大げさすぎる気も。オンボードの意味が調べても分からないけど向き先を変えるぐらいの意味かな。。
  • 選択肢3
    • 全アクセスが新しいアプリの方に行ってしまう。クライアントのサブセットってなんだろう。
  • 選択肢4
    • 一部のアクセスのみ新アプリに行くので、様子見するには最適。問題が起きたときに戻しやすくもある。
  • (参考)トラフィックの分割

No.8 (アプリケーションのマイクロサービスの1つに…)

  • 「問題が発生している間に、マシンをデバッグする必要があります」-> 問題の発生をリアルタイムで知りたい
  • 選択肢1
    • 日が暮れる
  • 選択肢2
    • 方法としては有効かもしれないが、設問とズレてる
  • 選択肢3
    • 方法としては有効かもしれないが、設問とズレてる
  • 選択肢4
    • 「特定のログ」が発生する分、ログの行数が増えるということ?いずれにせよこれでリアルタイムで問題発生を認識できる。
  • (参考)Stackdriver
    • https://cloud.google.com/stackdriver/?hl=ja
    • AWSのCloudWatch + α
    • もともと単体のサービスだったのをGCPが取り込んだらしく、GCPだけでなくAWSやその他オープンソースパッケージとも統合されているとか。
    • Stackdriver Error Reporting
      • 実行中のクラウドサービスで発生したクラッシュ数をカウントして、分析と集計を実施
    • Stackdriver Trace
      • VM,コンテナ,GAEなどのレイテンシ情報を集計、可視化
    • Stackdriver Logging
      • ログ管理、分析
    • Stackdriver Debugger
      • コードの任意の位置でのアプリケーションの状態をキャプチャ
    • Stackdriver Monitoring
      • ダッシュボード、モニタリング、アラート
      • Slackとも統合できるらしい

No.9 (Dress4Win は将来的に Google Cloud に…)

No.10 (Dress4Win の開発者は、Google Cloud Platform を評価…)

  • GAEがマネージドサービス
  • Google Cloud SDKを使うことで、プロビジョニングが自動化できる

No.11 (下のアーキテクチャ図は、ユーザーがアップロードした…)

  • 生画像はCloud Storageに入れるのが楽な気がする。GAEに送ってもデータが残らないし、作るの大変だし、コストかかるし、タグ付け処理への受渡しを頑張らなきゃだし。
  • タグ付けはCloud Function, 圧縮処理はデータ変換なのでCloud Dataflowを使う
  • Cloud StorageへのファイルアップロードをトリガーにCloud Functionを実行できるらしいけど、なんでPub/Subを間に入れてるんだろ。同時実行数の上限にかからないようにとか?
  • (参考)Google Cloud Function
  • (参考)Google Cloud Pub/Sub

No.12 (Dress4Win は、クラウドにデプロイされたテスト環境…)

  • ペネトレーションテストは自分でしなければならない(Googleに依頼できない)
  • 外部からの侵入テストなので、それに近い状況で試さねばならない
    • GCP内部にセキュリティスキャナを置くのは不適切。
    • VPNもプライベートネットワーク内でのアクセスになるので不適切。
  • (参考)ペネトレーションテスト

No.13 (Dress4Winのセキュリティチームは…)

  • Google Cloud Shellを使えば満たせる操作ばかり

No.14 (マルチペタバイトのデータセットをクラウドに…)

No.15 (米国中部リージョンにある本番環境 Linux 仮想マシンの…)

  • スナップショットはグローバルに使えるので、コピーするだけならCloud Storageに上げる必要は無いんだけど、「コピーの管理」のために上げてるのかな。。
  • (参考)イメージ VS スナップショット
    • https://www.apps-gcp.com/gce-snapshot-backup/
    • イメージはあくまでオートスケーリングのイメージテンプレート用らしく、AMIと同じように扱うのは違うかも。
    • スナップショットからのインスタンス起動は可能。
    • スナップショットは差分で保存されてくが、イメージは都度まるごと保存する。
    • スナップショットは取得したプロジェクト内でしか使えない。
    • どちらもリージョン間で共有可能

No.16 (お客様がストレージプロダクトを…)

No.17 (お客様が、会社のアプリケーションを Google Cloud Platform に移行…)

No.18 (最近のソフトウェア更新が原因となり…)

  • Cloud Storageの静的データはバージョニングしておけばロールバックできる
  • ローリング更新をすると引き返しやすい
  • Cloud Deployment Managerは関係ない
  • VMのスナップショットからの復元は、スナップショット取った後の更新が消える
  • (参考)ローリング更新

No.19 (企業のウェブ ホスティングプラットフォーム上で…)

  • green-blueデプロイモデルはロールバックしやすくするだけで減りはしないような気がするけど。。途中で引き返すのはロールバックにカウントされない?
  • カナリアリリースは本番環境でやらないと意味がない。
  • マイクロサービス化することで、サービス間が疎結合になってミスが減らせそう。
  • (参考)green-blueデプロイ
  • (参考)カナリアリリース
    • https://cloudplatform-jp.googleblog.com/2017/04/how-release-canaries-can-save-your-bacon-CRE-life-lessons.html
    • カナリアリリースのコンセプトは、1913 年に生理学者の John Scott Haldane 氏が、一酸化炭素を検出するためにカゴの中の鳥を炭鉱に連れて行ったことが始まりです。かよわい鳥は人間よりもこの無臭ガスに敏感で、ガス漏れが起きているとすぐに木から落ちてしまうため、それが炭鉱員にとってその場から離れるべきサインとなるのです。

No.20 (リードソフトウェアエンジニアは、新しいアプリケーションの…)

  • 問題がよくわからないけど、複数台構成のWebシステムで、サーバー間でセッションが共有されなくても大丈夫なようにするには?ってこと?
  • セッションアフィニティの話かな。

あわせて読みたい

まとめ

比較しながら見ていくと、それぞれの思想、生い立ちが垣間見えて面白いですね。
たとえばVPCについては、GCPはグローバル、AWSはリージョン毎だったりするわけですが、Googleのサービスはあまり地域性が無いのに対し、Amazonは各国内での商品販売がメインなのでこんな感じになってるのかなと思いを馳せてみたり。

需要がどれほどあるかは分かりませんが、同じ境遇の人の助けになれば幸いです!

続きを読む

【AWS】【EMR】スポットインスタンスでの安定稼働を目指して(3/3):処理特性と設定指針

こちらはフロムスクラッチ Advent Calendar 2017の18日目の記事です。

EEEEEEEEEEEEEEEEEEEE MMMMMMMM         MMMMMMMM RRRRRRRRRRRRRR
E::::::::::::::::::E M:::::::M       M:::::::M R::::::::::::::R
EE:::::EEEEEEEEE:::E M::::::::M     M::::::::M R:::::RRRRRR:::::R
  E::::E       EEEEE M:::::::::M   M:::::::::M RR::::R      R::::R
  E::::E             M::::::M:::M M:::M::::::M   R:::R      R::::R
  E:::::EEEEEEEEEE   M:::::M M:::M:::M M:::::M   R:::RRRRRR:::::R
  E::::::::::::::E   M:::::M  M:::::M  M:::::M   R:::::::::::RR
  E:::::EEEEEEEEEE   M:::::M   M:::M   M:::::M   R:::RRRRRR::::R
  E::::E             M:::::M    MMM    M:::::M   R:::R      R::::R
  E::::E       EEEEE M:::::M           M:::::M   R:::R      R::::R
EE:::::EEEEEEEE::::E M:::::M           M:::::M   R:::R      R::::R
E::::::::::::::::::E M:::::M           M:::::M RR::::R      R::::R
EEEEEEEEEEEEEEEEEEEE MMMMMMM           MMMMMMM RRRRRRR      RRRRRR

当記事に興味を持っていただきありがとうございます。

この記事では、AWS EMR利用者向けにスポットインスタンスを使いつつも、それら安定的に稼働させることを目指して私が取り組んでいることを紹介する記事です。前回の記事はこちらです。

・【AWS】【EMR】スポットインスタンスでの安定稼働を目指して(1/3):スポットインスタンスとは
https://qiita.com/S_Haraguchi/items/92ee6d64a75242742da6
・【AWS】【EMR】スポットインスタンスでの安定稼働を目指して(2/3):インスタンスフリートを考えるhttps://qiita.com/S_Haraguchi/items/1e2ee16b927a790d46b8

今回は3回目の最終回ですが、前回紹介したインスタンスフリートを用いて、実際にどのように構成を組んでみると良いか、を考えていきたいと思います。

オプションの検討ポイント

身もふたもないことを言うようですが、最初に検討していただきたいのは、そもそもオンデマンドではダメなのか?と言うことです。安定的に稼働させたい、というのが第一なのであれば絶対そのほうがいいです。インスタンスフリートだからといって落ちない訳ではないですから。
何かの理由で安定性は気にしなければならないのだが、コスト面も・・・という方は以下の検討ポイントについてご覧ください。

その処理はリトライができるのか?

処理によってリトライが許容されるのかどうか、例えば、データの整合性の面などで、リトライが許されない(またはリトライにコストがかかる)かどうかです。この場合、マスターノードはオンデマンドで構成しておくことが望ましいでしょう。この場合、例えコアノードやタスクノードが落ちてしまったとしても、マスターノードが生きていれば処理は継続されるからです。
代わりに、コアノードやタスクノードでインスタンスフリートを検討するなどのコスト削減方法を考えたほうが良いでしょう。

その処理は時間制約があるのか?

例えば、その処理の完了を前提とする後続のバッチ処理がxx時に開始するから、それまでに必ず終わってなければならない、というような制約があるかどうかです。この場合、インスタンスフリートのオプション、プロビジョニングのタイムアウトとブロックインスタンスの採用を検討すると良いでしょう。
その処理にかかる最長時間を、インスタンスフリートで使用するクラスタタイプのうち、もっともロースペックなもので、予め算出しておいて、プロビジョニングタイムアウトを

プロビジョニングタイムアウトの時間 = (処理が終わっていないといけない日時 - 処理が開始する日時)の時間 - 処理にかかる時間 - 余裕時間

で設定しておきます。

例えば、21:00実行の後続のバッチがあり、15:00にEMRを含む処理があり、その処理が2時間程度かかる、余裕として30分を見ておきたい場合は、

プロビジョニングタイムアウトの時間 = (21:00 - 15:00) - 2時間 - 30分 = 1時間30分

となります。この1時間30分をプロビジョニングタイムアウトに設定し、プロビジョニングタイムアウト時にオンデマンドに切り替えるようにします。
そして、ブロックインスタンスを (処理時間 + 余裕時間)を超える時間(60分単位)、今回ならば3時間でかけると良いでしょう。

こうすることで、スポット価格がやすい時はスポットで実行され、混雑している時でもギリギリまでスポットの実行をトライして、それでもダメならばオンデマンドで実行する・・・と言う構成を組むことができます。

どうでしょう、ちょっと実践的な設定ではないでしょうか?

なお、21:00のバッチに間に合わなかったとしても処理を継続したいのであれば、ブロックインスタンスは使わないようにしたほうがいいでしょう。
ブロックインスタンスを使ってしまうと、マスターノードが生きていたとしてもエラーで落ちたではなく、正常に終了したと見なされてしまうので、再立ち上げは行われないのです。

その処理はいつか終わればいいのか?

昔、PS3が発売された時に、世界中にあるユーザーが使っていない時のPS3のリソースを使用して、ガンの研究の計算を行う、といった取り組み(グリッドコンピューティング)がありました。
あれに近いような形で、スポット価格が高い時は控えめのリソース行わせて、安い時には一気にパフォーマンスあげて計算させる・・・ようなことをさせたい場合、On Demand vCPUを使うと良いでしょう。
マスターノードとコアノード1つずつはオンデマンドでおいておき、課金額を低めに設定したスポットインスタンスをいくつか設定すれば良いのです。
こうした使い方は科学計算をはじめとする研究活動で使うようなことが多いのかなと思います。

インスタンスタイプ構成

前回の記事の中でも記載はしたのですが、インスタンスフリートの構成をとると、そのうちどれが採用されるかはわかりません。
よってそのうちのどれが選ばれても処理が正常終了すること、それは大前提です。
その中で何を選んでいくのかについてです。

使うアプリケーションの種類

せっかく上位のインスタンスタイプを使うのであれば、それが上位であるが故のメリットが出せるような構成にするべきでしょう。例えば、 Prestoを使うのであれば、処理速度にメモリ総量が効いてくるので、例えば必要最低スペックがm4.largeであれば、r4.large、m4.xlarge、r4.xlarge、r4.2xlargeといったr系の上位とm系の上位を組み合わせます。
それによって、EMRの処理が早く終わるようであれば、その分EMRの使用コスト削減にも繋がるわけですから。

課金上限額

ここに関しては考え方は色々あると思うのですが、、、
私は安定性も一定重視するのであれば、プロビジョニングタイムアウトの設定と、タイムアウト時にオンデマンドに切り替えるオプションはほぼ必須だと考えています。これを使う場合であれば、一番ロースペックなもののオンデマンド価格で設定するのが良いと思います。プロビジョニングタイムアウト時にオンデマンドに切り替わる場合、インスタンスどれが選ばれるのかはこれまたコストパフォーマンスによるのですが、多くの場合はもっともロースペックのものが選ばれます。なので、それと同じ価格を上限に設定する価値があるのではないか(同じ値段でよりハイスペックなものが使えるので)と思います。
逆に、立ち上がらなくても問題ないよということであれば、現実的な範囲でなるべく底値に近いような値を指定しておくのがいいんじゃないかなと思いますよ。

オンデマンドと戦う

個人的にはここが結構大事かなと思っています。
課金上限額の欄でも書きましたが、私は安定性も一定重視するのであれば、プロビジョニングタイムアウトの設定と、タイムアウト時にオンデマンドに切り替えるオプションはほぼ必須だと考えています。ただ、その設定を入れた場合、オンデマンドで立ち上がります。いえ、意図通りなんですが・・・
EMRをお使いいただいたことがある方はわかるかもしれませんが、オンデマンドだと30日クラスタが生き続けているようなケースもざらにあります。そしてインスタンスフリートは起動時にしか考慮されないため、一度オンデマンドで立ち上がってしまうと、スポット価格が安くなったとしてもずっとオンデマンドのまま起動しています。
このままではせっかくインスタンスフリートで安くしようと試みたのに、藻屑の泡ですよね・・・。

なので、私は定期的にEMRクラスターを落とすことを考えています。

もちろん、一気に落としてしまうとその間機能が使えなくなってしまうので、代替手段が確保されていることを確認しつつ少しずつ・・・です。最初にメインの方を落として、立ち上がったら、今度は冗長系の方を落とす、とか。
これにより、オンデマンドで立ち上がったとしても、再びインスタンスフリートの世界にひきずりこむことができるわけです。楽はさせまへんで。

おわりに

EMRをスポットインスタンスを活用しつつ安定的に稼働させることを目標に、全3回に渡って執筆して参りました。
正直私はスポットインスタンスの入札価格の安定しなささ、乱高下に困っています。同じように、私以外に困っている方も大勢いるでしょう。
私は今から2ヶ月弱前にEMRを触り始めたばかりの若輩者ですので、玄人の諸先輩方からすると一言物申したいこともあるかと思います。ぜひ!ぜひ!コメントをいただけると、そしてもっとこうした方がいいよというアドバイスをいただけると幸いです・・・。
この記事を起点にして、誰かの何かが解決してくれることを祈っています。

(願わくば私の。)

参考文献

・新機能 – Amazon EMRインスタンスフリート(公式)
https://aws.amazon.com/jp/blogs/news/new-amazon-emr-instance-fleets/

続きを読む

EMR不使用時のコスト削減のために自動terminate処理を実装してみた

はじめに

企業活動を円滑にすすめるためには、ムリムダムラと呼ばれるものはなるべく削除したほうがよい、ということで本投稿内容をやってみました。対象は、AWSサービスの中でも分散処理に使用することの多いEMRサービスです。

背景

分散処理に、AWSのEMRを使用しているのですが、利用後に落とし忘れる人が多いです。
AWSを使っている人ならだれでも気になると思うのですが、立ち上げっぱなしだとコストがかかります。つまり、使わないインスタンスを落とし忘れると無駄な費用が掛かってしまいます。
「もったいない」精神による「ムダなコストは敵だ」の認識のもと、この敵を倒すために落とし忘れ防止策を作ってみました。

今回やりたいこと

・落とし忘れ防止のためのEMRクラスタの自動terminate処理

自動terminate処理を実装してみた

すでにEMRクラスタを立ち上げたり、各アプリが使用するクラスタを管理するアプリケーションがあって、それがrailsで実装されているので、今回もrailsで実装してみました。

削除対象のリストの取得

今回処理を追加しているEMR管理アプリでは、クラスタの情報をDBに保持しているのでlistはDBから取得しています。

  cluster_lists = Cluster.where.not(state: 'TERMINATED')

ちなみに取得元のテーブルはこのような感じです。

mysql> desc clusters;
+--------------------+------------------+------+-----+---------+----------------+
| Field              | Type             | Null | Key | Default | Extra          |
+--------------------+------------------+------+-----+---------+----------------+
| id                 | int(10) unsigned | NO   | PRI | NULL    | auto_increment |
| aws_cluster_id     | varchar(255)     | NO   |     | NULL    |                |
| state              | varchar(255)     | NO   |     | NULL    |                |
| public_dns         | varchar(255)     | YES  |     | NULL    |                |
| created_at         | datetime         | NO   |     | NULL    |                |
| updated_at         | datetime         | NO   |     | NULL    |                |
+--------------------+------------------+------+-----+---------+----------------+
6 rows in set (0.01 sec)

メインであるクラスタ削除を行うメソッドを実装

クラスタのterminate処理から実装してみました。
参考にしたのは、もちろん公式のAPIリファレンスです。
Aws::EMR::Clientを定義して、各APIをメソッドとして実装しています。

# Emr Gateway
class EmrGateway
  # 初期化
  #
  # @param args [Hash] 引数
  # @param client_options [Hash] Emr Client オプション
  def initialize(args = {})
    @client_options = args[:client_options] || { region: Settings.aws.default_region }
  end

  # クラスターにぶらさがるインスタンスの一覧
  #
  # @param cluster_id [String] emrクラスタID
  # @param instance_group_types [Array<String>] インスタンスグループの種類 MASTER CORE TASK
  # @return [Array] 渡されたクラスタIDとグループにぶらさがるインスタンス群
  def list_instances(cluster_id, instance_group_types)
    client.list_instances(
      cluster_id: cluster_id,
      instance_group_types: instance_group_types
    )
  end

  # クラスターそのものの情報を取得
  #
  # @param cluster_id [String] emrクラスタID
  # @return [Aws::EMR::Types::DescribeClusterOutput]
  def describe_cluster(cluster_id)
    client.describe_cluster(cluster_id: cluster_id)
  end

  # クラスターの削除
  #
  # @params cluster_ids [Array] emrクラスタIDのArray
  # 
  def terminate_job_flows(cluster_ids)
    client.terminate_job_flows(job_flow_ids: cluster_ids)
  end

  # Emrクラスタのプロビジョン
  #
  # @param provision_params [Hash] Emrクラスタ設定群
  # @return N/A
  delegate :run_job_flow, to: :client

  private

  # AWS Clientインスタンス生成
  #
  # @return [Aws::EMR::Client]
  def client
    @client ||= Aws::EMR::Client.new(@client_options)
  end
end

実際にクラスタ削除処理のメソッドを実装できたかrails cで確認してみました。

[220] pry(main)> emr=EmrGateway.new
[220] pry(main)> emr.terminate_job_flows(["j-1CG53E3YDQDDO"])
=> #<struct Aws::EmptyStructure>

あとはlist-clustersで回して削除処理を行えばいいだけ・・・と思っていた時期が私にもありました。

EMRクラスタでの処理はSTEPを追加することで行っているのですが、今のままだと処理中にもかかわらず強制終了させてしまうことになります。もし数日かかる処理をEMRクラスタに追加し終了を待っていたとしたら、追加された処理にも関わらず翌日クラスタが削除されていたら泣きたくなるでしょう。

クラスタが未処理のSTEPの保持を確認するためのメソッドを追加

# Emr Gateway
class EmrGateway

  # クラスターのSTEPあるかどうかを確認
  #
  # @params cluster_id [String] emrクラスタID
  # @return 
  def list_steps(cluster_id, status)
    client.list_steps(cluster_id: cluster_id, step_states: status)
  end

同じくrails cで確認を取ってみました。

[12] pry(main)> emr.list_steps("j-1TQ7Y5A9JVMD6",["PENDING"]).steps.count
=> 17
[13] pry(main)> emr.list_steps("j-1TQ7Y5A9JVMD6",["PENDING","RUNNING"]).steps.count
=> 18
[14] pry(main)> emr.list_steps("j-1TQ7Y5A9JVMD6",["PENDING","COMPLETED"]).steps.count
=> 50

ちゃんとSTEPのステータスに応じた確認ができそうです。

プログラム実装

実際にプログラムを組むとこんな感じになりました。
もう少しうまい書き方はある気がします・・・

module EmrResources
  class TerminateClusterUsecase
   def terminate_clusters
      @emr = EmrGateway.new
      cluster_lists = Cluster.where.not(state: 'TERMINATED')

      terminate_cluster(cluster_lists)
    end

    private

    # クラスタの削除処理
    #
    # @params [Array] cluster_ids
    # @return [Aws::EmptyStructure]
    def terminate_cluster(clusters)
      terminate_cluster_ids = [] # 初期化
      # クラスタID指定でSTEPの有無を確認する
      # クラスタをterminatedする
      clusters.each do |cluster|
        terminate_cluster_ids << cluster.aws_cluster_id if @emr.list_steps(cluster.aws_cluster_id,["PENDING","RUNNING"]).steps.count == 0
      end
      @emr.terminate_job_flows(terminate_cluster_ids)
    end
  end
end

実行結果

実際に実行してみた結果です。
今回の削除対象はDBに格納している以下のEMRクラスターです。

mysql> select * from clusters;
+----+-----------------+---------+--------------------------------------------------+---------------------+---------------------+
| id | aws_cluster_id  | state   | public_dns                                       | created_at          | updated_at          |
+----+-----------------+---------+--------------------------------------------------+---------------------+---------------------+
|  1 | j-1HJKN2CY5OCLE | RUNNING | ip-172-16-23-89.us-west-2.compute.internal       | 2017-10-24 11:00:16 | 2017-10-24 11:00:16 |
+----+-----------------+---------+--------------------------------------------------+---------------------+---------------------+
1 row in set (0.00 sec)

実際に実行してみました。
想定通りのレスポンスが返ってきてくれました。

[121] pry(main)> ET = EmrResources::TerminateClusterUsecase.new
=> #<EmrResources::TerminateClusterUsecase:0x0055ce68778580>
[122] pry(main)> ET.terminate_clusters
  Cluster Load (0.6ms)  SELECT `clusters`.* FROM `clusters` WHERE (`clusters`.`state` != 'TERMINATED')
=> #<struct Aws::EmptyStructure>

念のためにcliでも確認を取ってみることに・・・
ちゃんとterminatedになっていてよかったです。

root@VirtualBox:~$ aws emr describe-cluster --cluster-id j-1HJKN2CY5OCLE
{
    "Cluster": {
        "Status": {
            "Timeline": {
                "ReadyDateTime": 1513060872.93, 
                "CreationDateTime": 1513060265.909, 
                "EndDateTime": 1513558939.982
            }, 
            "State": "TERMINATED", 
            "StateChangeReason": {
                "Message": "Terminated by user request", 
                "Code": "USER_REQUEST"

あとはcronで登録するだけで自動削除が完成!

まとめ

今回はEMRの落とし忘れ防止のために、EMRの自動削除プログラムをrailsで実装してみました。
STEP処理後に自動でクラスタが削除される方法もありますが、開発用だと毎回立ち上げる手間が惜しいので、なかなか運用にのらず・・・
とはいえ、何もしないのに立ち上がり続けるEMRクラスタのコストは馬鹿にできないので、今回の実装をしてみました。

本格的に運用に載せていくには、STEP以外の処理への配慮とか、どれくらいの頻度で回すのか、すべてのクラスタを一度に落としていいのか、などなど考えることは満載なので、今後も開発を続けていけるといいなと思います。

続きを読む

Hadoop 3 の GA を記念して S3Guard を試してみる

本記事は個人の見解であり、所属組織の立場、意見を代表するものではありません.

Distributed computing (Apache Hadoop, Spark, Kafka, …) Advent Calendar 2017 の 12/18 分です。

ついに Hadoop 3 が GA になりましたね!
本記事では Hadoop 3 の GA を記念して、新しい機能である S3Guard を試してみます。

S3Guard とは

Amazon S3 は広く知られている通り、整合性モデルとして以下の特徴をもちます。

  • 新しいオブジェクトの PUT に対する 書き込み後の読み取り整合性 (Read-after-write consistency)
  • 上書き PUT および DELETE に対する 結果整合性 (eventual consistency)

Hadoop から S3 を DFS として使う場合 S3A などを使う※わけですが、当然このような結果整合性の影響を受けます。
※ちなみに EMR では S3A ではなく EMRFS が推奨されています。

S3Guard とは S3A の拡張で、S3 上のオブジェクトのメタデータを別途保存・活用することで、Hadoop に整合性をもったビューを提供するための新機能です。
※他にもパフォーマンス改善を目的に含みますが、今回の説明では割愛します。

オフィシャルドキュメントはこちら。

Hadoop と S3 の関係性については imai_factory 先生が去年の Advent Calendar で詳しく解説してくれてますので、そちらをご参照ください。

Hadoop 3 環境の構築

では、早速環境を準備しましょう。
今回は以下の環境に Single Node Cluster を構築します。
– Amazon Linux 2 LTS Candidate AMI 2017.12.0.20171212.2 x86_64 HVM GP2
– c5.xlarge
– IAM ロール/インスタンスプロファイル付与 (DynamoDB/S3 のアクションをすべて許可)

まずは OpenJDK 1.8.0 をインストールして JAVA_HOME を export。

$ sudo yum install java-1.8.0-openjdk
$ export JAVA_HOME=/usr/lib/jvm/jre/

Hadoop 3 GA パッケージをダウンロードして配置。今回は riken さんのミラーを使います。

$ wget http://ftp.riken.jp/net/apache/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
$ tar -zxvf hadoop-3.0.0.tar.gz
$ cd hadoop-3.0.0/

では早速サンプルを動かしてみましょう。正規表現にマッチしたものを表示します。

$ mkdir /tmp/input
$ cp etc/hadoop/*.xml /tmp/input/
$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0.jar grep /tmp/input /tmp/output 'dfs[a-z.]+'
$ cat /tmp/output/*
1       dfsadmin

うまく動きました。

S3A

次に、S3A を使って S3 にアクセスしてみます。
S3A を使うには hadoop-aws-3.0.0.jar をクラスパスに含める必要があるので、hadoop-env.sh を以下のように編集します。

etc/hadoop/hadoop-env.sh
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:share/hadoop/tools/lib/*

これだけですね。では早速 S3A を使って S3 上の入力データを S3 に出力してみます。

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0.jar wordcount s3a://us-east-1.elasticmapreduce.samples/wordcount/data/ s3a://sample-bucket/hadoop3/output_wc/
$ bin/hdfs dfs -cat s3a://sample-bucket/hadoop3/output_wc/*

できました。WordCount の結果がどばーっと出ました。(ここでは長いので省略しています。)

S3Guard

ようやく本題。S3Guard を試してみます。
S3Guard を使うには core-site.xml に設定を追加する必要があります。
S3Guard ではメタデータ保存用のデータストアとして DynamoDB を使用します。S3 を使ってるんだから AWS アカウントをもってるだろうということで、DynamoDB を使うのは自然ですね。
ちなみに、EMR には EMRFS Consistent View という機能があり、そちらでも DynamoDB が使用されています。

etc/hadoop/core-site.xml
<property>
    <name>fs.s3a.metadatastore.impl</name>
    <value>org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore</value>
</property>

<property>
    <name>fs.s3a.s3guard.ddb.table</name>
    <value>s3guard-table</value>
</property>

<property>
  <name>fs.s3a.s3guard.ddb.region</name>
  <value>us-east-1</value>
</property>

<property>
    <name>fs.s3a.s3guard.ddb.table.create</name>
    <value>true</value>
</property>

それではサンプルジョブを実行します。

$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0.jar wordcount s3a://us-east-1.elasticmapreduce.samples/wordcount/data/ s3a://sample-bucket/hadoop3/output_wc_s3guard/
$ bin/hdfs dfs -cat s3a://sample-bucket/hadoop3/output_wc_s3guard/*

できました。またもや WordCount の結果がどばーっと出ました。(長いので省略します。)

。。。と、S3Guard のあり/なしの影響がよくわからないですよね。
整合性が影響するようなワークロードじゃないと効果のほどはかなりわかりづらいです。
同時に複数のジョブがアドホックに走る状況で、結果整合性を許容できないようなワークロードで初めて生きてくるわけです。

ただ、DynamoDB に実際に保存されたメタデータの状態には興味が出てきますよね。
そこで、ジョブの実行が終わったこの時点のメタデータを見てみます。

まずはテーブルがちゃんとできているか Describe してみましょう。

$ aws dynamodb describe-table --table-name s3guard-table
{
    "Table": {
        "TableArn": "arn:aws:dynamodb:us-east-1:123456789101:table/s3guard-table",
        "AttributeDefinitions": [
            {
                "AttributeName": "child",
                "AttributeType": "S"
            },
            {
                "AttributeName": "parent",
                "AttributeType": "S"
            }
        ],
        "ProvisionedThroughput": {
            "NumberOfDecreasesToday": 0,
            "WriteCapacityUnits": 100,
            "ReadCapacityUnits": 500
        },
        "TableSizeBytes": 0,
        "TableName": "s3guard-table",
        "TableStatus": "ACTIVE",
        "TableId": "a0227da8-6f98-40b7-b973-67e83a206532",
        "KeySchema": [
            {
                "KeyType": "HASH",
                "AttributeName": "parent"
            },
            {
                "KeyType": "RANGE",
                "AttributeName": "child"
            }
        ],
        "ItemCount": 0,
        "CreationDateTime": 1513514910.2
    }
}

ちゃんとできてました。
それでは、このテーブルの中身をスキャンしてみましょう。
(出力がかなり長いので興味がない方は読み飛ばしてください。)

$ aws dynamodb scan --table-name s3guard-table
{
    "Count": 24,
    "Items": [
        {
            "is_dir": {
                "BOOL": true
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples"
            },
            "child": {
                "S": "wordcount"
            }
        },
        {
            "mod_time": {
                "N": "1513514946338"
            },
            "is_deleted": {
                "BOOL": true
            },
            "parent": {
                "S": "/sample-bucket/hadoop3/output_wc_s3guard/_temporary"
            },
            "child": {
                "S": "0"
            },
            "file_length": {
                "N": "0"
            },
            "block_size": {
                "N": "0"
            }
        },
        {
            "mod_time": {
                "N": "1513514944878"
            },
            "is_deleted": {
                "BOOL": true
            },
            "parent": {
                "S": "/sample-bucket/hadoop3/output_wc_s3guard/_temporary/0/_temporary/attempt_local1505268872_0001_r_000000_0"
            },
            "child": {
                "S": "part-r-00000"
            },
            "file_length": {
                "N": "0"
            },
            "block_size": {
                "N": "0"
            }
        },
        {
            "is_dir": {
                "BOOL": true
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount"
            },
            "child": {
                "S": "data"
            }
        },
        {
            "mod_time": {
                "N": "1513514948217"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/sample-bucket/hadoop3/output_wc_s3guard"
            },
            "child": {
                "S": "_SUCCESS"
            },
            "file_length": {
                "N": "0"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1513514946328"
            },
            "is_deleted": {
                "BOOL": true
            },
            "parent": {
                "S": "/sample-bucket/hadoop3/output_wc_s3guard"
            },
            "child": {
                "S": "_temporary"
            },
            "file_length": {
                "N": "0"
            },
            "block_size": {
                "N": "0"
            }
        },
        {
            "mod_time": {
                "N": "1513514944034"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/sample-bucket/hadoop3/output_wc_s3guard"
            },
            "child": {
                "S": "part-r-00000"
            },
            "file_length": {
                "N": "733216"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1513514946347"
            },
            "is_deleted": {
                "BOOL": true
            },
            "parent": {
                "S": "/sample-bucket/hadoop3/output_wc_s3guard/_temporary/0"
            },
            "child": {
                "S": "_temporary"
            },
            "file_length": {
                "N": "0"
            },
            "block_size": {
                "N": "0"
            }
        },
        {
            "mod_time": {
                "N": "1513514946357"
            },
            "is_deleted": {
                "BOOL": true
            },
            "parent": {
                "S": "/sample-bucket/hadoop3/output_wc_s3guard/_temporary/0/_temporary"
            },
            "child": {
                "S": "attempt_local1505268872_0001_r_000000_0"
            },
            "file_length": {
                "N": "0"
            },
            "block_size": {
                "N": "0"
            }
        },
        {
            "mod_time": {
                "N": "1406157796000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0001"
            },
            "file_length": {
                "N": "2392524"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157796000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0002"
            },
            "file_length": {
                "N": "2396618"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157796000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0003"
            },
            "file_length": {
                "N": "1593915"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157796000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0004"
            },
            "file_length": {
                "N": "1720885"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157796000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0005"
            },
            "file_length": {
                "N": "2216895"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157797000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0006"
            },
            "file_length": {
                "N": "1906322"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157798000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0007"
            },
            "file_length": {
                "N": "1930660"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157798000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0008"
            },
            "file_length": {
                "N": "1913444"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157798000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0009"
            },
            "file_length": {
                "N": "2707527"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157798000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0010"
            },
            "file_length": {
                "N": "327050"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157798000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0011"
            },
            "file_length": {
                "N": "8"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "mod_time": {
                "N": "1406157798000"
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/us-east-1.elasticmapreduce.samples/wordcount/data"
            },
            "child": {
                "S": "0012"
            },
            "file_length": {
                "N": "8"
            },
            "block_size": {
                "N": "33554432"
            }
        },
        {
            "is_dir": {
                "BOOL": true
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/sample-bucket"
            },
            "child": {
                "S": "hadoop3"
            }
        },
        {
            "table_created": {
                "N": "1513514920327"
            },
            "table_version": {
                "N": "100"
            },
            "parent": {
                "S": "../VERSION"
            },
            "child": {
                "S": "../VERSION"
            }
        },
        {
            "is_dir": {
                "BOOL": true
            },
            "is_deleted": {
                "BOOL": false
            },
            "parent": {
                "S": "/sample-bucket/hadoop3"
            },
            "child": {
                "S": "output_wc_s3guard"
            }
        }
    ],
    "ScannedCount": 24,
    "ConsumedCapacity": null
}

入出力の両方に関係する S3 オブジェクトのメタデータが格納されていることが確認できます。

おわりに

今回は Hadoop 3 GA 記念ということで S3Guard を触ってみました。
Hadoop 3 には他にも面白い機能がたくさんありますので、どんどん試して使っていきたいですね。

続きを読む