AWSが提供するアカウント・アクティビティのリアルタイム分析ソリューションを試してみた

AWS上のサービス運用では、セキュリティ・監査・コスト効率化などの観点から、どのようにリソースが操作され、どのようなリソースが利用されているのか、といった情報が求められます。 このようなシステムは、自社で頑張って独自に作 […] 続きを読む

Amazon Kinesis Firehoseが北カリフォルニア、シンガポール、シドニー・リージョンで利用可能になりました

Amazon Kinesis Firehoseはリアルタイムのデータストリームを準備し、簡単にデータストアや分析ツールにロードするサービスです。 re:invent 2015 で発表され、2017年8月には東京リージョン […] 続きを読む

Athenaで入れ子のjsonにクエリを投げる方法が分かりづらかったので整理する

Kinesis FirehoseでS3に置かれた圧縮したjsonファイルを、それに対してクエリを投げる、というのを検証してたのですが、Hive素人なのでスキーマの作り方もクエリの投げ方も正直あんまり良くわかってませんでした。

そこで下記を参照しながらスキーマの作成とクエリ投入をやってみて、最終的にうまくいきました。

日本語記事
https://aws.amazon.com/jp/blogs/news/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/

元記事
https://aws.amazon.com/jp/blogs/big-data/create-tables-in-amazon-athena-from-nested-json-and-mappings-using-jsonserde/

ずーっと日本語記事を読みながらやっていたのですが、これがめちゃくちゃわかりづらい!!!
※理解度には個人差があります

多分知っている人が見たら何となくわかるんでしょうが、恐らくこれを見るのは自分みたいにあまり良く知らないので参考にしながら実際にやってみている、という層だと思います。
最終的に上手く行ってから思ったのは、前提知識がないと読むのがしんどい、ということですね…。
ただもう少し書いといてくれるだけで十分なのに…。
原文も軽く見ましたが、そっちにも書いてないのでそもそも記述されてません。

調べてもまだ中々情報が出てこない上に、クエリ投入時にエラーが出た場合もエラーメッセージが淡白すぎてどこが問題でエラーになってるのかさっぱりわからなくて悪戦苦闘してました。

そんなわけで、今後同じところで困る人が一人でも減るように、自分用メモも兼ねてハマったところについて補足をしておきたいと思います。

概要

リンク先で書いてあることの流れは大まかに下記のとおりです。

  1. FirehoseでSESの送信イベントログをS3に保存する
    送信イベントログはjson形式で、それをFirehoseでS3に保存しています。
  2. Athenaのテーブルを作成して、クエリを投げる
    • ただテーブル作成して投げる場合
    • 入れ子になっているjsonに対してテーブル作成してクエリを投げる場合
    • 禁止文字を含んでいるものに対してテーブルを作成してクエリを投げる場合
      わかりづらいですが、禁止文字を含む項目をマッピングする項目とクエリを投げる項目が分かれています。)
  3. hive-json-schemaの紹介
    jsonからテーブル作成のためのクエリを生成するツールっぽいのですが、紹介してるわりにちっとも使い方が書いてません…。
    使い方の解説をどなたか…。

ハマったところ

入れ子になったjsonに対するテーブル作成について

ハマったところといいつつ、自分はこの辺は割とスムーズに行ったのですが、ちょっとわかりづらいかもしれないので念のため。
サンプルにもありますが、jsonの中にまたjsonとか配列とかが入っている、みたいなケースは多くあります。
そういった場合、内部にあるjsonに対してstruct型を使って、その下の項目について型を定義してやればOKです。
その中にさらにjsonがある場合はさらにその中にstruct型で定義をすればOK。
例にあるものだと、内部にmail{~}とjsonがあり、その中にさらにいくつかのjsonがあるので、それぞれに対してstruct型で定義をしています。
以下引用(全文は貼っていないので、元はリンク先を見てください。)
※一部バッククオート(`)で囲われている項目がありますが、予約語として使われている言葉をそのまま使用するとエラーになるそうです。
そのため、バッククオートで囲うことによってエスケープしてるようです。

抜粋した入れ子の部分
 mail struct<`timestamp`:string,
              source:string,
              sourceArn:string,
              sendingAccountId:string,
              messageId:string,
              destination:string,
              headersTruncated:boolean,
              headers:array<struct<name:string,value:string>>,
              commonHeaders:struct<`from`:array<string>,to:array<string>,messageId:string,subject:string>
              > 

禁止文字そのものについて

まず、禁止文字が色々あることを最初大して理解してませんでした。
項目名(↑の例だと、timestampとかsourceとかのところ)の定義に使用できない文字があります。
記事中だと「:」(コロン)が禁止文字列なので、それがクエリ中の該当箇所に入っているとエラーになります。
あとは「-」(ハイフン)なんかも禁止文字のようです。
例えばHTTPリクエストのログを見たとき、ヘッダとかはハイフンを使った項目がいくつもあったりするので困りますよね。
一応記事中の例では両方「_」(アンダースコア)に変換しています。(コロンにしか触れてませんが…。)
最初は禁止文字があると知らず、なぜエラーになっているかわからずにハマってました。
この辺どっかにまとまってるのかな…?
どうやって回避するかというと、それがWITH SERDEPROPERTIESの部分です。

禁止文字を含む場合のマッピングの仕方について

最初見た時はなんでこんなことをするのかわかりませんでしたが、上記の通り項目名を定義するときに禁止文字が入っているとエラーになります。
なので、WITH SERDEPROPERTIESの項目で、禁止文字列を含んだ項目名を、禁止文字列のない文字列にマッピングし、元のjsonのkeyでは禁止文字列を含んでいたものに対し、テーブル上ではカラム名として別の文字列をあてがうことができます。
記事中では、コロンやハイフンをアンダースコアに変換した文字列にマッピングしています。
式の左側がカラム名に使いたい文字列で、それに対して右側がデータの元の実際の名前です。
"mapping.カラム名に使いたい文字列"="実際の名前" みたいに記述してます。

マッピングの仕方

WITH SERDEPROPERTIES (
  "mapping.ses_configurationset"="ses:configuration-set",
  "mapping.ses_source_ip"="ses:source-ip", 
  "mapping.ses_from_domain"="ses:from-domain", 
  "mapping.ses_caller_identity"="ses:caller-identity"
  )

クエリの投げ方

これもまあおまけで書いておくと、ここまでしっかりと下の項目までテーブルを定義しておくと、下の項目までクエリで引っ張ることが出来ます。
記事中では下記のような例が出ています。

元記事にある例
SELECT eventtype as Event,
         mail.timestamp as Timestamp,
         mail.tags.ses_source_ip as SourceIP,
         mail.tags.ses_caller_identity as AuthenticatedBy,
         mail.commonHeaders."from" as FromAddress,
         mail.commonHeaders.to as ToAddress
FROM sesblog2
WHERE eventtype = 'Bounce'

mail{〜}の下の項目を参照する時は上記のようにドットをつけて該当項目の名前を指定しています。
さらにその下の項目を参照する時はその後ろにさらにドットをつけています。
この辺は直感的にわかりやすいかもしれません。

おまけとかtipsとか

Firehoseで配置されたフォルダ構成ではパーティションを自動で切ってもらえない

hiveではフォルダが/bucketname/path/to/log/year=YY/month=MM/day=dd/foo
みたいな構成だと自動でパーティション設定してくれるらしいのですが、FirehoseでS3にデータ配置すると/bucketname/path/to/log/YYYY/MM/DD/fooみたいになるので、自分でパーティションを作成する必要があります。
パーティションがない状態でクエリを投げても1件も引っかかりません。
これを作るには下記のようなクエリを投げる必要があります。

elbログを対象としたテーブルにパーティションを作成する場合
ALTER TABLE database_name.table_name
ADD PARTITION (year='2016',month='08',day='28')
location 's3://elb-access-log/AWSLogs/00000000000000/elasticloadbalancing/ap-northeast-1/2016/08/28/';

※参考
https://qiita.com/r4-keisuke/items/d3d339b76d4368b6b30a

上記の例だと1日ずつパーティションを設定する必要があるのですが、
パーティション数には上限があるらしい(1テーブル20000まで)ので、1日ずつとか、1時間ずつとかフォルダ分けしている場合はちょっと注意が必要かもしれないです。
※パーティションの上限については下記
https://docs.aws.amazon.com/ja_jp/general/latest/gr/aws_service_limits.html#limits_glue
さすがに対象が多すぎとなるとしんどいので、シェルスクリプトとかで回すといいと思います。
ただ、シェルスクリプト自体も1つ1つの処理実行だとそこそこ時間かかるのと、パーティションを設定するためのクエリでクエリ履歴が埋め尽くされるのが難点です。

データ元にない項目を定義しても値がnullになるだけで問題はない

jsonの出力が一定じゃなくて、いくつかの似たような型のjsonが混ざっていたり、ものによって存在しない項目があったりしても、それらのキーを全て網羅するようにまとめて定義しちゃって問題ないみたいです。
定義したけどデータ元に項目がない場合はnullが入るだけのようで。
逆に元データにある項目を全部定義する必要はないので、元データにあっても使わないような項目はテーブル作成の段階で定義しないようにしてもいいみたいですね。

ざっと書いたので、わかりづらいとか、もっとこうすればみたいな指摘があればいただけると嬉しいです。

続きを読む

Amazon GuardDutyのイベントをSplunkで検索・可視化

re:Invent 2017で発表されたGuardDutyですが、Splunkでそのデータを取り込んで分析できるとのことなので、さっそく試してみました。

記事(英語)
Splunk Announces New Integrations With Amazon Kinesis Firehose and Amazon GuardDuty

Amazon GuardDutyって?

Amazon GuardDuty – 継続したセキュリティ監視と脅威の検知

(抜粋)

GuardDutyは 脅威情報を含む複数のデータストリームから、悪意のあるIPアドレス、デバイスドメインを認識し、あなたのAWSアカウントで悪意のある、もしくは不正な行動があるか特定するために学習します。VPC Flow Logs、CloudTrail のイベントログ、DNS ログを集め組み合わせることにより、GuardDuty は非常に多くのことなったタイプの危険性のある、悪意のある行動を検知します。

なるほど、AWSリソースの脅威を機械学習で発見するんですね。

Splunkって?

https://www.splunk.com/
ログ分析のソフトウェア。
あらゆるマシンデータをインデックスして検索や可視化、アラート通知や分析ができるっていう優れモノ。

設定してみた

5つのステップで設定できます。
1. SplunkにAppインストール
2. Splunk HTTP Event Collector有効化
3. Amazon GuardDuty有効化
4. AWS Lambdaでテンプレから関数作成
5. AWS CloudWatchでGuardDutyとLambdaを設定したルールを作成

ということで、設定方法を書いていきます。

Splunk設定

まずはデータの受け口であるSplunkの設定から
App入れてHTTP Event Collector (HEC)有効化するだけです。

Appインストール

このApp↓をSplunkインスタンスにインストールしましょう。
AWS GuardDuty Add-on for Splunk

(補足)Appインストール方法

Splunkにログインした後、左側のメニューにある歯車アイコンをクリック
Screen Shot 2017-12-14 1.31.38 PM.png

上記リンク先からAppをダウンロードして ファイルからAppをインストール からインスコ、もしくは、 他のAppを参照 からGuardDutyを検索してインスコ
Screen Shot 2017-12-14 1.35.35 PM.png

データ入力設定

Appインストール完了後、ログイン後のトップ画面に aws_guardduty というAppが追加されています。
Screen Shot 2017-12-14 1.38.53 PM.png

早速 aws_guardduty に移動
Screen Shot 2017-12-14 1.41.51 PM.png

まだ何もデータが入ってきていない状態なので、データ受け取りとしてHTTP Event Collectorを設定します。

右上の 設定 から データ入力 をクリック
Screen Shot 2017-12-14 1.42.50 PM.png

HTTPイベントコレクタ をクリック
Screen Shot 2017-12-14 1.44.07 PM.png

別の記事でHECの設定方法は書いたので、これ以降の手順は割愛します。こちらを参照ください。
https://qiita.com/kikeyama/items/515d65906537239e04d2#splunk%E3%81%AE%E8%A8%AD%E5%AE%9A

(注意)ソースタイプは aws:cloudwatch:guardduty を選択してください。

設定後のトークンはどこかにコピペしておいてください。

GuardDuty設定

AWSコンソールからAmazon GuardDutyに行って有効化。

今すぐ始める をクリック
Screen Shot 2017-12-14 1.15.38 PM.png

GuardDutyの有効化 をクリック
Screen Shot 2017-12-14 1.16.47 PM.png

GuardDuty設定は完了
Screen Shot 2017-12-14 1.20.55 PM.png

今はまだ空っぽですけど、とりあえずGuardDutyの設定はこれでおしまいです。

Lambda設定

まずは 関数の作成
これでSplunkにHTTPでイベントをPOSTするインターフェースを作ります。
Screen Shot 2017-12-14 1.49.39 PM.png

設計図 (Blueprints) を選択して、検索画面に splunk と入力して検索
Screen Shot 2017-12-14 1.51.43 PM.png

Splunk Logging を選択
Screen Shot 2017-12-14 1.53.08 PM.png

下にスクロールすると環境変数の設定があるので、こちらにSplunkのHECエンドポイントURLとトークンを設定
Screen Shot 2017-12-14 1.58.09 PM_mosaic.png

で、名前をつけて保存

その後、作成した関数を編集して sourcetype の値を aws:cloudwatch:guardduty に上書き

index.js
    // Advanced:
    // Log event with user-specified request parameters - useful to set input settings per event vs token-level
    // Full list of request parameters available here:
    // http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector
    logger.logEvent({
        time: Date.now(),
        host: 'serverless',
        source: `lambda:${context.functionName}`,
        sourcetype: 'aws:cloudwatch:guardduty',
        event: event,
    });

CloudWatch設定

ルールを一個作りましょう
Screen Shot 2017-12-14 2.08.15 PM.png

サービス名は GuardDuty 、ターゲットは Lambda関数 から、先ほど作成したLambda関数を選択
Screen Shot 2017-12-14 2.09.28 PM.png

あとは名前をつけて保存

以上、すべての設定は完了!

GuardDutyイベントを検索

ということで、しばらく待つとGuardDutyデータがSplunkにインデックスされてきました。

Screen Shot 2017-12-14 2.16.08 PM_mosaic.png

ダッシュボード

GuardDuty Appには既成のダッシュボードがあるようです。

GuardDuty Examples からダッシュボードに移動してみましょう
Screen Shot 2017-12-14 2.19.22 PM.png

Screen Shot 2017-12-14 12.42.27 PM.png

早速脅威が検知されてしまったみたいですね・・・。
比較的シンプルなダッシュボードですが、可視化やモニタリングには十分かな、と。
運用してみて足りない部分は自前でダッシュボード作ってみよう。

最後に

どうやらダッシュボード内のテーブルをクリックすると、Splunk App for AWSにドリルダウンできるようです。

Kinesis FirehoseからSplunkにデータを流せるとのことですし、せっかくなので近日中にこのAppも設定してみて記事を書いてみようかなと思います。

続きを読む

Terraformにおけるクロスアカウント構成なモジュール

これはDMM.com #2 Advent Calendar 2017 8日目の記事です。

カレンダーのURLはこちら
DMM.com #1 Advent Calendar 2017
DMM.com #2 Advent Calendar 2017

こんにちは@mafuyukです。
最近はAWSでクロスアカウントなログ収集基盤の設計、実装、構成の自動化をしていました。

本記事では実際に作成した、ログ収集基盤の構成を参考にTerraformにおけるクロスアカウント構成なモジュールについて紹介したいと思います。

ログ収集基盤の構成紹介

Untitled Diagram (3).png

構成図の説明

別々のAWSアカウントのログを一元管理したいという要件を満たすために用意したログ収集基盤の構成図です。

図の上部にあるAWSアカウントでは、ログの一元管理を行っていて、同じAWSアカウント内のログや複数の別AWSアカウントのログを1つのアカウントに集約しています。
AWSアカウント同士の連携はAssumeRoleで実現しています。

ログ収集の流れ

  1. CWLのログをサブスクリプションフィルターをトリガーとしたLambdaで取得
  2. LambdaはAssume Roleでアクセス権限を取得後、Kinesis FirehoseのAPIをcallしログをストリームに流し入れる
  3. ストリームはログ保管用S3とログ可視化用Elasticsearch Serviceに出力する

構成図の赤丸で囲った部分(上記の1と2の部分)をモジュール化します。

Terraform新機能

実践に入る前におめでたい話

祝Terraform v0.11 リリース

2017年11月16日にTerraform v0.11がリリースされましたー:tada::tada::tada::tada:

今回作成するモジュールのテンプレートではv0.11で新たに追加されたprovidersというオプションを利用するのでTerraformのversionを確認してください:bow:

対象version 0.11.1

v0.11で追加されたmoduleの新オプションprovidersとは??

v0.10まで

v0.10までmoduleを利用する際にはプロバイダの継承が暗黙的に行われていました

provider "aws" {
  version                 = "~> 1.0"
  region                  = "us-west-2"
  shared_credentials_file = "${var.shared_credentials_file}"
}

module "create_module" {
  source = "git::https://github.com/mafuyuk/tf-aws-template?ref=master"
  // moduleを利用しているTerraform環境のデフォルトのプロパイダに対しての操作になる 
}

v0.11から

v0.11からはmoduleに対してプロバイダ情報をprovidersを使って明示的に渡す事ができるようになりました。

provider "aws" {
  version                 = "~> 1.0"
  region                  = "us-west-2"
  shared_credentials_file = "${var.shared_credentials_file}"
}

provider "aws" {
  version                 = "~> 1.0"
  alias                   = "foo"
  region                  = "us-west-2"
  shared_credentials_file = "${var.shared_credentials_file}"
}

module "create_module" {
  source = "git::https://github.com/mafuyuk/tf-aws-template?ref=master"
  providers = {
    "aws" = "aws.foo" // module内のaws(デフォルト)の値がaws.fooプロパイダになる
    "aws.bar" = "aws" // module内のaw.barの値がデフォルトプロパイダになる
  }
}

:warning: module内で1つのプロバイダのみに対して操作する場合は、従来の暗黙的なプロバイダ継承を推奨すると公式に記載されていましたのでうまく使い分けましょう。

実践

module側プロパイダ設定

クロスアカウント構成を実現するためにmodule側でプロパイダ設定を2つ受け取れるようにします。

aws.tf
// ログがあるAWSアカウントのプロバイダ
provider "aws" {
  alias  = "src"
}

// ログを集約するAWSアカウントのプロバイダ
provider "aws" {
  alias  = "dst"
}

moduleを利用する

このモジュールを利用する場合は以下のようにprovidersを渡します。

provider "aws" {
  version                 = "~> 1.0"
  alias                   = "myprov"
  profile                 = "myprov"
  region                  = "${var.myprov_region}"
  shared_credentials_file = "${var.shared_credentials_file}"
}

provider "aws" {
  version                 = "~> 1.0"
  alias                   = "logprov"
  profile                 = "logprov"
  region                  = "${var.logprov_region}"
  shared_credentials_file = "${var.shared_credentials_file}"
}

module "create_resource" {
  source = "git::https://github.com/mafuyuk/tf-aws-template?ref=master"
  providers = {
    "aws.src" = "aws.myprov"
    "aws.dst" = "aws.logprov"
  }
}

受け取った2つのプロバイダ情報を利用する

参考にしたログの収集基盤では、AssumeRole周りで2つのプロパイダ構成を利用する必要がありました。
実際に今回の構成のどの部分に使ったのかみてみましょう。

iam.tf
resource "aws_iam_role" "lambda" {
  provider = "aws.src"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}

resource "aws_iam_role_policy" "writes_to_cwl_policy" {
  // ただのCWLへの書き込み権限なので省略
}

// src側のRoleにdst側のRoleに対してAssumeRoleを行えるポリシー付与
resource "aws_iam_role_policy" "fh_sts_policy" {
  provider = "aws.src"
  role     = "${aws_iam_role.lambda.id}"

  policy = <<EOF
{
    "Version": "2012-10-17",
    "Statement": {
        "Effect": "Allow",
        "Action": "sts:AssumeRole",
        "Resource": "${aws_iam_role.lambda_assume.arn}"
    }
}
EOF
}

// dst側ではsrc側のロールがAssumeRoleを行った際に
// KinesisFirehoseの実行権限を持った一時クレデンシャルを発行することが可能なRoleを作成
resource "aws_iam_role" "lambda_assume" {
  provider = "aws.dst"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "${aws_iam_role.lambda.arn}"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF
}

resource "aws_iam_role_policy" "writes_to_fh_policy" {
  provider = "aws.dst"
  role     = "${aws_iam_role.lambda_assume.id}"

  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "firehose:PutRecordBatch",
      "Resource": [
        "${var.fh_stream_arn}" // 出力先のFirehoseのARN
      ]
    }
  ]
}
EOF
}

Terraformにおけるクロスアカウント構成なモジュールに必要な実装の紹介は以上です。
これで複数のAWSアカウントを股にかけたモジュールの作成ができるようになったと思います:100:

続きを読む

AWSで実現するLambda Architecture

背景

データ分析やデータを活用したアプリケーション開発をしていると、データ基盤をどう構築しようかという点が最初の悩みだと思います。
弊社ではリアルタイムにデータ分析、活用ができるようLambda Architectureをデータ基盤に採用しました。
AWSを利用することによってほぼインフラ運用の労力を負担すること無く実現できています。

Lambda Architecture

Lambda Architectureは当時Twitter社に勤務していたNathan Marz氏によって提唱されたデータ基盤の考え方です。

  • 耐障害性
  • 保守性
  • リアルタイム性

に優れたアーキテクチャと言われています。

Batch Layer, Service Layer, Speed Layerに分かれデータを処理します。

lambda.png

Batch Layer

生データを保存されるレイヤーです。アプリやサーバーから送られる生データがそのまま保存されます。

Service Layer

Batch Layerに保存された生データが集計、加工され、実際にサービスで利用する用のビューが保存されるレイヤーです。

Speed Layer

Batch LayerからService Layerにビューが生成されるのには遅延が発生します。例えば日時バッチでビューを生成する場合、1日の遅延が発生します。
その遅延を埋めるためのレイヤーがSpeed Layerです。
Speed LayerにもBatch Layerと同様生データを保存します。Batch Layerとの違いはデータの鮮度であり、Service Layerに現れるまでのリアルタイムデータをSpeed Layerから取得します。逆にService Layerに取り込まれたデータはSpeed Layerではもう不要となり、削除しても問題ありません。障害が発生してもせいぜいService Layerに取り込まれるまでの遅延分しか影響はありません。
リアルタイム性を求めない場合このレイヤーは不要です。

Query

Service Layerにクエリをかけ、Service Layerに取り込まれていないリアルタイムデータはSpeed Layerにクエリをかけ、両者のデータをマージすることで必要なデータを取得します。

Lambda Architectureの何が良いの?

  1. Batch Layerは不変である

    • クエリをかける側で必要なデータが変更された場合、変更を加えるべきはService Layerに保存されるビューの計算だけですみます。
    • Service Layerが壊れたとしてもBatch Layerから再度Viewを生成するだけで復元できます。
  2. リアルタイム性を求める時の対応が楽

    • Speed Layerを追加するだけで対応できます。

AWSで実現するLambda Architecture

このような構成でLambda Architectureを実現しています。

lambda_origami.png

Batch Layer

Kinesis Firehoseを利用してS3に生データを投げております。
2017/03/01にS3の大規模障害が発生しましたが、Kinesis Firehoseは何かの問題でデータの保存ができなかった場合、最大24時間データをバッファリングして自動で再保存してくれる機能があるためデータを失うことはありませんでした。

Service Layer

S3に保存されたデータはcronサーバーを利用して、Redshiftに保存しています。Firehoseから直接Redshiftまで保存することも可能ですが、Public Accessを許可しなければならないため利用していません。
また、Elastic Map Reduce(EMR)でSparkを動かすことでS3のデータを分析し、結果をRDSに保存することもしています。サービスからはこのRDSにクエリをかけ分析結果を利用しています。

Speed Layer

Kinesis Firehoseを利用してAmazon Elasticsearchにデータを送っています。リアルタイムデータはElasticsearchにクエリをかけ取得しています。
AWS Elasticsearchは日時でindexを切ることを自動でやってくれるため、不要になった過去のデータを削除するのも簡単です。

まとめ

上記のようにLambda Architectureを実現しデータ活用をしています。インフラはほぼ運用することなく実現できているため、本業のデータ分析、活用に力を注ぐことができAWS様様という所存でございます。

参考文献

続きを読む

業務でawsを使っていたらCloudTrailを設定しよう

awsを使っていたらCloudTrailを設定しよう

CloudTrailとは

  • AWS アカウントのガバナンス、コンプライアンス、運用監査、リスク監査を可能にするサービスです。(公式)

CloudTrail-01-HeaderPic-v2-small.png

できること

  • アカウントアクティビティをログに記録し、継続的に監視し、保持できます。
  • SDK やコマンドラインツール、その他の AWS のサービスを使用して実行されるアクションなど、AWS アカウントアクティビティのイベント履歴を把握できます。
  • このイベント履歴により、セキュリティ分析、リソース変更の追跡、トラブルシューティングをより簡単に実行できるようになります。

メリット

コンプライアンスの簡素化

  • アカウント内で行われた操作のイベントログが自動的に記録および保存されるため、コンプライアンス監査を簡素化できます。
  • Amazon CloudWatch Logs との統合により、ログデータを検索したり、コンプライアンス違反のイベントを特定したり、インシデントの調査と監査者の要求に対する応答を迅速化したりできます。

ユーザーとリソースのアクティビティの可視化

  • AWS マネジメントコンソールでの操作と AWS API コールを記録することにより、ユーザーおよびリソースのアクティビティを把握しやすくなります。
  • AWS を呼び出したユーザーとアカウント、呼び出し元 IP アドレス、および呼び出し日時を特定できます。

セキュリティのオートメーション

  • セキュリティの脆弱性を引き起こす可能性のあるイベントが検出されたときに実行されるワークフローを定義できます。
  • 例えば、 Amazon S3 バケットを公開する API コールが CloudTrail によってログに記録されたときに特定のポリシーをそのバケットに追加する、というワークフローを作成できます。

利用してみる

  • CloudTrailにアクセスするとすでにログが記録されているのがわかります
  • 誰が何時に何の操作をしたのかがわかります

Screen Shot 2017-09-16 at 21.28.41.png

  • すべてのイベントを表示してみると

Screen Shot 2017-09-16 at 21.28.56.png

  • 7日以上のログを保存する場合や、アラートを飛ばしたいときは「証跡情報を作成」します

Screen Shot 2017-09-16 at 21.29.17.png

  • s3バケットなどを細かく設定できます!!!!!!

料金

  • WS CloudTrail を使用すると、サポートされたサービスの作成、変更、削除のオペレーションに関する過去 7 日間分のアカウントアクティビティの表示とダウンロードを無料で実行できます。

  • 証跡情報を作成する際に AWS CloudTrail からの課金は発生しません。CloudTrail の証跡を作成すると、Amazon S3 バケットに 2 種類のイベントを送信できるようになります。

  • 詳しくは公式の料金表

サポートされているサービス

ほとんどサポートされていますね

  • AWS Marketplace
  • Amazon Athena
  • Amazon CloudSearch
  • Amazon EMR
  • AWS Data Pipeline
  • Amazon Kinesis Firehose
  • Amazon Kinesis Streams
  • Amazon QuickSight
  • Amazon API Gateway
  • Amazon Elastic Transcoder
  • Amazon Elasticsearch Service
  • Amazon Simple Workflow Service
  • AWS Step Functions
  • Amazon Machine Learning
  • Amazon Polly
  • Amazon WorkDocs
  • アプリケーションの Auto Scaling
  • Auto Scaling
  • Amazon EC2 Container Registry
  • Amazon EC2 Container Service
  • AWS Elastic Beanstalk
  • Amazon Elastic Compute Cloud
  • Elastic Load Balancing
  • AWS Lambda
  • Amazon Lightsail
  • Amazon DynamoDB
  • Amazon ElastiCache
  • Amazon Redshift
  • Amazon Relational Database Service
  • Amazon WorkSpaces
  • AWS CodeBuild
  • AWS CodeCommit
  • AWS CodeDeploy
  • AWS CodePipeline
  • AWS CodeStar
  • Amazon GameLift
  • AWS IoT
  • AWS Application Discovery Service
  • AWS CloudFormation
  • AWS CloudTrail
  • Amazon CloudWatch
  • Amazon CloudWatch Events
  • Amazon CloudWatch Logs
  • AWS Config
  • AWS マネージドサービス
  • AWS OpsWorks
  • AWS OpsWorks for Chef Automate
  • AWS Organizations
  • AWS Service Catalog
  • Amazon Simple Email Service
  • Amazon Simple Notification Service
  • Amazon Simple Queue Service
  • AWS Database Migration Service
  • AWS Server Migration Service
  • Amazon Cognito
  • AWS Device Farm
  • Amazon CloudFront
  • AWS Direct Connect
  • Amazon Route 53
  • Amazon Virtual Private Cloud
  • AWS Certificate Manager
  • Amazon Cloud Directory
  • AWS CloudHSM
  • AWS Directory Service
  • AWS Identity and Access Management
  • Amazon Inspector
  • AWS Key Management Service
  • AWS Security Token Service
  • AWS WAF
  • Amazon Elastic Block Store
  • Amazon Elastic File System
  • Amazon Glacier
  • Amazon Simple Storage Service
  • AWS Storage Gateway
  • AWS Personal Health Dashboard
  • AWS サポート

サポートされていないもの

サポートされていないものもありますので注意が必要です。

Amazon AppStream
Amazon AppStream 2.0
Amazon Connect
Amazon Lex
Amazon Mobile Analytics
Amazon Pinpoint
Amazon Rekognition
Amazon WorkMail
AWS Batch
AWS Greengrass
AWS Shield
AWS Snowball
AWS X-Ray
Amazon Chime
Amazon WorkSpaces Application Manager
AWS Artifact
AWS Mobile Hub
AWS Snowball Edge
AWS Snowmobile
AWS Trusted Advisor

便利なリンク先

こちらいろいろやっていらっしゃる方がいます

まとめ

  • これでアカウントのガバナンス、コンプライアンス、運用監査、リスク監査を可能になりました。
  • 案外かんたんなので、設定触ってみることをおすすめします!

続きを読む

AWSでガッチリログ解析

はじめに

現在EC2上で動いているサービスのログを解析することになり、方法を調べた結果AWS上のサービスを使用してする方法がベストだと考え実装した。これらのサービスは比較的に最近できた(たぶん)のであまりこの方法で紹介している日本語の記事がなかった。今でもログ解析といえば、fluentd + Elasticsearch + Kibanaが圧倒的に王道みたいだがわざわざAWSが色々と提供してくれているのでそちらを使う。以下、EC2でサーバーが動きログファイルがあることを前提にしています。インフラの人でもなんでもないので間違いがあれば、優しくご指導ください。

参考資料

この記事で詰まった際は下記を参照ください。また私は下記の資料を通して実装しました。
AWS公式 ログ解析のチュートリアル
AWS Kinesis Firehose
AWS Kinesis Analytics
AWS Elastic Search
Amazon Kinesis Agent

全体図

Screenshot 2017-09-06 02.55.05.png

全体的な図は上記のようになります。画像ではインスタンスで走っているサーバーがApacheとなっていますが、Nginxや他のサーバーを使用でも設定次第で問題にはなりません。

全体の流れ

システムを構築している時、記述通りしているつもりでも簡単な間違いで動かなくなることが多々あります。そんなときに全体の流れ、繋がりをしっかりと理解することによりデバックが容易になります。

  1. EC2インスタンスから、サービスとサービスをつなげる役割を持つAmazon Kinesis Firehose(以Firehose)にログを流す。
  2. このFirehoseはS3 Bucketにデータを流す先としている。
  3. 次に、Amazon Kinesis AnalyticsはFirehoseからS3 Bucketに流れているログデータを解析し、解析後のデータを他のFirehoseに渡す。
  4. 解析後のログを渡されたFirehoseはAmazon Elasticsearch Service(以下Elasticsearch)にデータを受け流す。
  5. Elasticsearchに保存されたデータはkibinaを通じデータを可視化し、ユーザーにたどり着く。

注意: ヘッダーの右上からRegionをN.Virginiaにしてから以下を進めてください。(N.Virginia を含む特定のRegionでしかここで使用するAmazon Elasticsearch Serviceがないためです。)

ステップ1:一つ目のFirehoseを作成

EC2インスタンスとS3へログを流すFirehoseを作成します。
1. Amazon Kinesis へアクセスします。

2. Firehoseへ行き、 Create Deliver Stream(デリバリーストリームを作成) ボタンを押し作成します。

3. Delivery stream nameを入力しますが、ここはこのFirehoseの名前なのでなんでも構いません。何を入力すればよいかわからない方は 「log-ingestion-stream」としましょう。

4. 次に情報元をDirect PUT or other sources(直接のPUTまたは他の情報源)に選択します。

先にお話したとおりFirehoseは情報を一つの場所からもう一つの場所へ移行させる役割を持ちます。ここではKinesis StreamまたはDirect PUT or other sourcesの選択肢があり、Kinesis Streamを情報元にしている場合はKinesis Streamからそれ以外はDirect PUT or other sourcesを選択します。今回の情報源はEC2なのでDirect PUT or other sourcesとなります。

5. 一番下まで行き、Nextボタンを押します。押した後、次の選択肢はデフォルトのDisabledを選択します。

Firehoseではただ情報を受け渡すだけではなく、AWS Lambdaを使用して情報に変更を加えてから渡すことができるようです。ここではそれの選択肢として、Disable(しない)とEnable(する)があります。私は使用したことがないのでこれ以上の解説はできません。

6. Nextボタンを押し、次は送り先(Destination)をAmazon S3に選択します。

一つ目はEC2からS3へ情報(データ)を送ります。

7. S3のバケットとして、ログの保管する場所を指定します。ここではCreate Newボタンを押してバケットの名前をなんでも構いませんのでつけてください。何にすればいいかわからない方は「log-bucket」としてください。

8. Nextボタンを押し、一番したまで行けば、IAM roleをしていするフォームがありますので、Create new or Chooseボタンを押しましょう。

9. タブが開けば、Create new IAM roleを選択したまま他はデフォルトで右下のAllowを押します。

10. 完了すればNextを押し確認に問題がないようでしたらCreate delivery streamボタンを押し、作成します。

ステップ2:Amazon Kinesis AgentをEC2にインストール

先程作成した、firehoseにEC2からログデータを送るために、AWSが公式に開発しているJAVA製のEC2からFirehoseにリアルタイムでファイル情報を送ってくれるAmazon Kinesis Agentをインストールしましょう。ここが最も間違いが起きやすいところかと思います。

1. ダウンロード方法を参考にAmazon Kinesis Agentインストールしましょうしてください。

yumが入っていれば以下のコマンドでインストールできます。

sudo yum install –y aws-kinesis-agent

私の場合は、 Java JDEなかったのでtools.jar が見つからないというようなエラーが出ました。そのようなエラーが出た方は

sudo apt-get install openjdk-7-jdk

JDKをインストールしましょう。(参考)

2. agentの設定をする

amazon kinesis agentに「どのファイルログのデータをどこに送るのか」または「どのような形で送るのか」という事などを設定していきます。
Amazon Kinesis Agentの設定ファイルは /etc/aws-kinesis/agent.jsonにあるかと思います。そのファイルを以下のように設定してください。
なお、
filePattern: "full-path-to-log-file"full-path-to-log-file解析したいログファイルへのフルパスに(nginxをご使用の方は etc/nginx/access.log かと思います)してください。

deliveryStream: "name-of-delivery-stream"name-of-delivery-streamを送りたいfirehoseの名前に(ステップ1で作成したfirehoseの名前)してください。

/etc/aws-kinesis/agent.json
{
  "cloudwatch.endpoint": "monitoring.us-east-1.amazonaws.com",
  "cloudwatch.emitMetrics": true,
  "firehose.endpoint": "firehose.us-east-1.amazonaws.com",
  "flows": [
  {
    "filePattern": "full-path-to-log-file",
    "deliveryStream": "name-of-delivery-stream",
    "dataProcessingOptions": [
    {
      "initialPostion": "START_OF_FILE",
      "maxBufferAgeMillis":"2000",
      "optionName": "LOGTOJSON",
      "logFormat": "COMBINEDAPACHELOG"
    }]
 }
 ]
}

長くなってしまいますが、親切にするためにはここで気にしなくてはならないことが幾つかあります。

エンドポイント(firehose.endpoint)

一番初めの注意通り、N.VirginiaにFirehoseを作成している方はなんの問題もありませんが、そのようにしていない方はエンドポイント一覧を参考にして, cloudwatch.endpointfirehose.endpointを変更してください。なお、はじめの注意でも述べたようにElasticsearchでは数少ないリージョンにしか対応していないため、他のリージョンにしている方は最後になってやり直さなければならない可能性もあります。

情報処理の設定(dataProcessingOptions)

Apacheを使用している方はなんの問題もありませんが、nginxを使用している方はここに少し変更が必要です。こちらの設定でログデータの処理方法を変更できます。
設定のオプションとしてmatchPatternがあり、こちらと正規表現を使用してどのようなログフォーマットでも処理が可能になります。下のものは処理をしたいログとそのmatchPatternの一つの例ですので参考にしてご自身のものを変更してください。(参照)

111.111.111.111 - - [02/Dec/2016:13:58:47 +0000] "POST /graphql HTTP/1.1" 200 1161 "https://www.myurl.com/endpoint/12345" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.99 Safari/537.36" 0.172 "query user userMessages hasPermissions F0 F1" 11111;222222
{
    "dataProcessingOptions": [
    {
      "optionName": "LOGTOJSON",
      "logFormat": "COMMONAPACHELOG",
      "matchPattern": "^([\d.]+) - - \[([\w:/]+).*\] "(\w+) /(\w+).*(\d.+)" (\d{3}) (\d+) "(\S+)" "(.*?)" ([\d.]+) "(.*?)" (\d+);(\d+)",
      "customFieldNames": ["client_ip", "time_stamp", "verb", "uri_path", "http_ver", "http_status", "bytes", "referrer", "agent", "response_time", "graphql", "company_id", "user_id"]
    }]
}

3. Amazon Kinesis Agent始動

以下のコマンドでagentを動かします。

sudo service aws-kinesis-agent start

これで終わってもいいのですが、Agentのログファイルでしっかり動いていることを確認しましょう。

tail -f /var/log/aws-kinesis-agent/aws-kinesis-agent.log

tailコマンドを利用してAgentの活動ログを監視します。無事動いているようなら

2017-09-08 08:47:29.734+0000 ip-10-0-0-226 (Agent.MetricsEmitter RUNNING) com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Progress: 9135 records parsed (1257812 bytes), and 9132 records sent successfully to destinations. Uptime: 88380043ms

このようなログが定期的に流れてきます。

考えられるエラーと解決方法を紹介します

agentがログファイルを読めていない

考えれる原因の考えられる原因は、
1. ログファイルへのフルパスが間違えている
2. ログファイルを読む権限がない

1.のフルパスが間違えている場合は、先程のagentの設定ファイルで変更した。”filePattern”のパスに問題があることになるので、そのパスが実際存在するか確認しましょう。

tail ログファイルへのパス

file does not exists 的なメッセージがでたらファイルが存在しないということなので正しいログファイルへのフルパスに変えてあげましょう。

2.のログファイルを読む権限がない場合は、agentがログファイルを読み込めれるように

sudo chmod o+r ログファイルへのパス

で誰でもログファイルを読めるようにしましょう。(インフラエンジニア的にこれはありなのか疑問)

ログデータの送り先が不在

これが該当する場合は、確か404のエラーコードを大量にログファイルに出力されます。
この場合は、agentのログファイルのdeliveryStreamのプロパティの名前と一番初めに作成したAmazon Kinesis Firehoseの名前があっていることを確認しましょう。
それがタイポもない場合はagentを再インストールしたらなおるかもしれません。

アクセスキーとシークレットキーがみつからない

credentialsなのでAWSのアクセスキーとシークレットキーが見つからない方は、awsのcredentialsに登録してもいいですが、確実なのはagentの設定ファイルで設定することです。

/etc/aws-kinesis/agent.json
{
  "awsAccessKeyId": "あなたのアクセスキー",
  "awsSecretAccessKey": あなたのシークレットキー
}

ステップ3:Amazon Elasticsearch Serviceドメインの作成

  1. Elasticsearchのページに行き、新たなドメインを作成するためにGet StartもしくはCreate a new domainのボタンをクリックします。
  2. はじめにドメインに名前をつけます。他と同じくなんでも構いませんが、何をつければいいのかがわからない方は「log-summary」にでもしましょう。
  3. 次の選択肢はElasticsearchのバージョンですが、特にこだわりがないかたは最新でものでいいと思います。なのでそのままにして、Nextボタンをおしましょう。
  4. 次の設定も同じく特にこだわりがない方はそのままにしておいて、Nextボタンをおして次にいってください。
  5. その次では、このElasticsearchへのアクセスを制限をします。Templateから簡単に設定ができますのでそちらから各自設定してください。難しいことはわからないしめんどくさいのも嫌な方はTemplateからAllow open access to the domainを選択してください。こちらは特に制限なくアクセスが可能になるのでAWS側ではおすすめしていません。重要なデータをお使う場合は必ずきちんと設定しましょう。
  6. 次の画面でもろもろの設定を確認してConfirmボタンをおしましょう。そうすればElasticsearchドメインが作成されます。(起動までしばらく時間がかかります)

ステップ3:二つ目のFirehoseの作成

次に先程作成したAmazon Elasticsearch Serviceへデータを送るためにfirehoseを作成します。これは、EC2から一つ目のFirehoseを通じ、(まだ作ってない)Amazon Kinesis Analyticsにより処理された情報をElasticsearchに送るためのFirehoseです。

  1. Amazon Kinesis へアクセスします。
  2. Firehoseへ行き、 Create Deliver Stream(デリバリーストリームを作成) ボタンを押し作成します。
  3. Delivery stream nameを入力しますが、ここはこのFirehoseの名前なのでなんでも構いません。何を入力すればよいかわからない方は 「log-summary-stream」としましょう。
  4. 次に情報元をDirect PUT or other sources(直接のPUTまたは他の情報源)に選択します。
  5. 一番下まで行き、Nextボタンを押します。押した後、次の選択肢はデフォルトのDisabledを選択します。
  6. Nextボタンを押し、次は送り先(Destination)をAmazon Elasticsearch Serviceに選択します。
  7. 送り先のElasticsearchとして、先ほど作成したドメインを選択します
  8. Indexは、request_dataにして、typesはrequestsにします。その他のIndex RotationとRetry durationはそのままにしておいてください。
  9. Elasticsearchに送るのを失敗した場合にS3にバックアップとしてデータを送ります。Backup S3 bucketとして新しく、バケットを作成しましょう。名前はなんでも構いませんが、思いつかない方は「log-summary-failed」とでもしておいてください。
  10. Nextボタンを押し、一番したまで行けば、IAM roleをしていするフォームがありますので、Create new or Chooseボタンを押しましょう。
  11. タブが開けば、Create new IAM roleを選択したまま他はデフォルトで右下のAllowを押します。
  12. 完了すればNextを押し確認に問題がないようでしたらCreate delivery streamボタンを押し、作成します。

ステップ4: Amazon Kinesis Analytics を作成

Amazon Kinesis Analyticsでははじめに作成したEC2からログデータを取ってくる一つ目のFirehoseと先ほど作成したElasticsearchを目的地にしている二つ目のFirehoseの中間にあるものになります。つまりは一つ目のFirehoseからログ情報を所得し、それを処理した後、二つ目のFirehoseに渡しそれがElasticsearchへ流れ着きます。では実際に作っていきましょう。

  1. Amazon Kinesis Analyticsへアクセスする。
  2. Create Applicationに行き、Applicationの名前をつけます。何をつければいいかわからない方は、log-aggregationとでもしておいてください。なんでも構いません。
  3. Descriptionの方も何か書きておいた方は書いていただき、特にわからない方は空欄のままCreate Applicationボタンをおしましょう。

情報元(Source)を選択する

  1. 作成した後、作成されたアプリケーションのホームページに行くと思います。 そのままConnect to a sourceボタンをおして情報元を選択します。
  2. 情報元を選択するページに行くと2つのFirehoseの名前が出てくると思いますが、ここのステップのはじめにも紹介したように一つ目のFirehose(EC2から情報を取ってきている方)を選択してください。
  3. 選択した後しばらく待つと、流れてきているログをAnalyticsアプリケーションが所得するので、それが確認出来しだいSave and continueボタンをおしましょう。

SQLを使用して処理をする

今まではクリックするだけだっだのですが、ここがおそらく最も大変なところです。SQLをもともとちゃんと書けるエンジニアは簡単に実装できていいじゃんという感じですが、私みたいなエセエンジニアは困ったものです。

Go to SQL editorでFirehoseにホーム画面からSQLおエディターに移動できます。
ここで好きなようにデータを処理変更してくださいと言っても何をどうすればいいかわかんない方がいると思います。
このSQLではtableの代わりにstreamを変更します。SOURCE_SQL_STREAM_001が情報が流れているstreamでこちらからログ情報を取ってきて、処理をしDESTINATION_SQL_STEAMという名前のストリームに変換します。そうしたらそのストリームを次のFirehoseに送ってくれます。
以下に一つの処理方法の例を載せておきます。これはresponseというカラムの数の合計をstatusCountに入れています。つまりresponse(200, 404とか)の数をまとめて数を数えているということです。
ここからわかることは、”(ダブルクオーテーションマーク)で囲むことで、SOURCE_SQL_STREAM__001のカラムの値が手に入ることです。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
 datetime VARCHAR(30),
 status INTEGER,
 statusCount INTEGER);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
 INSERT INTO "DESTINATION_SQL_STREAM"
 SELECT
 STREAM TIMESTAMP_TO_CHAR('yyyy-MM-dd''T''HH:mm:ss.SSS',
LOCALTIMESTAMP) as datetime,
 "response" as status,
 COUNT(*) AS statusCount
 FROM "SOURCE_SQL_STREAM_001"
 GROUP BY
 "response",
 FLOOR(("SOURCE_SQL_STREAM_001".ROWTIME - TIMESTAMP '1970-01-01 00:00:00') minute / 1 TO MINUTE);

こちらが僕が実際に使っているコードです。 SOURCE_SQL_STREAM_001のdatetimeのフォーマットyyyy-MM-dd''T''HH:mm:ss.SSSのように変換してElasticsearchへ保存しています。

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
 datetime VARCHAR(30),
 host VARCHAR(16),
 request VARCHAR(32),
 response INTEGER,
 bytes INTEGER,
 agent VARCHAR(32));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
 INSERT INTO "DESTINATION_SQL_STREAM"
 SELECT
 STREAM 
 TIMESTAMP_TO_CHAR('yyyy-MM-dd''T''HH:mm:ss.SSS', CHAR_TO_TIMESTAMP('dd/MMM/yyyy:HH:mm:ss Z', "datetime")) as datetime,
 "host" as host,
 "request" as request,
 "response" as response,
 "bytes" as bytes,
 "agent" as agent
 FROM "SOURCE_SQL_STREAM_001"

より詳しくは公式ページドキュメントを参照してください。
コメントで質問していただければ頑張って答えます。

目的地を選択する

そのままDestinationタブを選択して、処理された情報の送り先を選択します。Select a streamから二つ目のFirehose(Elasticsearchにつながっている方)を選択し、他のものはそのままでSave and continueをクリックしましょう。

これで全てのパイプラインは繋がり作業はほとんど完成です。

ステップ5:Kibanaで可視化されたデータをみる

Amazon Elsacticsearch serviceにはデータを可視化してくれるKibanaが含まれています。

Elasticsearchの作成したドメインのホーム画面に行きkibanaという文字の横にリンクが有ると思います。そこをクリックするとKibinaのページに飛びます。はじめて訪れた場合はindex-patternを入力しなければなりません。そこにはrequest_dataと入力してください。

仮に、request_dataと入力しても続けれない方はしばらく時間をおいてみてもう一度試してください。データがElasticsearchに貯まるまで時間がかかるためだと思われます。
KibanaはDatetimeのカラムを自動で認識します。ただし、フォーマットに制限があるらしくyyyy-MM-dd''T''HH:mm:ss.SSSのような形で情報を保存すればKibanaは必ずDatetimeだと認識します。

そのまま続行を押すとKibanaのサイトで自由に可視化されたデータをみれます。

長い作業、お疲れ様でした。他にはAmazon Kinesis Analyticsでデータを複雑に処理をしたりも可能なので挑戦してみてください。
編集リクエストお待ちしております。m(_ _)m

続きを読む