センサデータを fluentd 経由で Amazon Elasticsearch Service に送信して可視化

1. はじめに

以前の記事 で、RaspberryPi で収集したセンサデータを、 さくらVPS上に構築した Elasticsearch に送信して、Kibana で可視化しました。
今回は勉強を兼ねて、データを Amazon Elasticsearch Service ( Amazon ES ) に送信するように構成を変更します。

2. 全体の構成

image.png

3. 設定

3-1. Server side ( Amazon ES )

Amazon ES を立ち上げます。

Amazon ES ダッシュボード

  • 画面右上で、東京リージョン( ap-northeast-1 )が選択されていることを確認します
  • 新しいドメインの作成ボタンを押します
image

ドメインの定義

  • ドメイン名と Elasticsearch のバージョンを設定します
image

クラスターの設定

  • 今回は最小構成にします
image

アクセスの設定

  • ダッシュボードは特定メンバーに公開したかったので、パブリックアクセスとして、IPアドレスでアクセス制限を設定しました
  • 本当は IAM Role で制限したかったのですが、Webブラウザからのアクセスが面倒になるので今回は見送りました ( ブラウザはIAM認証できない )
image

完了確認

  • 10分ほど待ちます
image
  • 設定の状態が「アクティブ」になれば完了です
image

3-2. Sensor side ( Raspberry PI )

前提条件

以前の記事 の状態が前提です。今回はこれに変更を加えます。

プラグインのインストール

  • fluentd から Elasticsearch に直接格納するためのプラグインをインストールします
  • なお、IAM 認証をする場合は fluent-plugin-aws-elasticsearch-service を使うようです
sudo fluent-gem install fluent-plugin-elasticsearch

fluentd の設定

  • fluentd の設定ファイルを編集して、データの送信先を変更して、fluentd を再起動します
/home/pi/fluent/fluent.conf
<source>
  @type tail
  format json
  path /home/pi/myroom.log
  pos_file /home/pi/myroom.log.pos
  tag log.myroom
</source>

<match log.myroom>
  @type copy
  <store>
    @type elasticsearch
    type_name myroom
    logstash_format true
    logstash_prefix myroom
    reload_connections false
    hosts https://search-myroom-********.ap-northeast-1.es.amazonaws.com
  </store>
</match>

4. 確認

データが送信されていることを確認しました。
image.png

続きを読む

AWSでElastic Stack – 前回の続き Kibana Filebeatのアップグレード

はじめに

前回、KibanaとFilebeatもアップグレードするような記事を書いたのですが、途中で力尽きてElasticsearchのアップグレードで終わってしまったので、短くなりますが、KibanaとFilebeatもアップグレードしたいと思います。
前回の記事(前回中途半端で申し訳ないですが)の構成を参照して頂ければと思います。

AWSの小ネタ

本題とは関係ありませんが、一応AWSタグを付けているので、AWSで悩んだ話について。

IAMロールで起動したEC2インスタンスのプロキシ環境下にて、プロキシサーバのログに以下のログが大量に出力されていました。

TCP_MISS/404 609 GET http://169.254.169.254/latest/meta-data/network/interfaces/macs/xx:xx:xx:xx:xx:xx/local-ipv4s - HIER_DIRECT/169.254.169.254 text/html

それでxxになっているMACアドレスを持つサーバを調べると以下のログが連続して大量に出力されていました。

ec2net: [get_meta] Trying to get http://169.254.169.254/latest/meta-data/network/interfaces/macs/xx:xx:xx:xx:xx:xx/local-ipv4s

恐らくec2がmetadataを取りにいくのですが、プロキシサーバを経由してしまうせいで、
ご本人じゃありませんよとAWSのmetadataサーバに拒否されているんでしょうと推測しました。

プロキシ環境下では169.254.169.254はプロキシを経由しないようにNO_PROXYの設定を設定します。
http://docs.aws.amazon.com/ja_jp/cli/latest/userguide/cli-http-proxy.html

自分もこの設定を入れていましたので、設定が有効になっていないかと悩まされることになりました。そこで一旦、exportしているプロキシの設定を削除しましたが、
やっぱりプロキシサーバを経由することを止められませんでした。

そこで他のプロキシを利用する設定を考えたところ、そういえば最初は全てのアクセスをプロキシ経由にするつもりが無くて、yumやwget,curlくらいしかInternetへのアクセスをしないので、それぞれのコンフィグにプロキシの設定を個別に書いていたなと思い至りました。

それで結局当たったのは、curlの.curlrcのプロキシの設定でした。
ここにはNO_PROXYの設定は書いていませんでした。
ここの設定が有効でプロキシサーバ経由でアクセスしているとは・・
ec2がmetadataを取得する際にはcurlで取りにいっているってことですかね?(分かってない)

1.アップグレード作業

では前回やり残したKibanaとFilebeatのアップグレード作業を実施したいと思います。

1.1.事前準備

公式のドキュメントを参考にしながら進めていきます。
Kibana
https://www.elastic.co/guide/en/kibana/current/rpm.html
Filebeat
https://www.elastic.co/guide/en/beats/filebeat/current/setup-repositories.html

あれ・・・前回見た時は6.0だったのに、どんどん更新されていきますね。

1.1.1.GPGキーのインストール

KibanaとFilebeatをインストールしているそれぞれのサーバにて実施します。

Kibanaをインストールしたサーバ(srv1)

# rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

Filebeatをインストールしたサーバ(srv4)

# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

1.1.2.リポジトリの修正

6.0系のリポジトリを用意します。

Kibana

# vi /etc/yum.repos.d/kibana.repo
[kibana-6.x]
name=Kibana repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

Filebeat

# vi /etc/yum.repos.d/beats.repo
[elastic-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

1.2.アップグレード

Kibana

# yum update kibana
Loaded plugins: priorities, update-motd, upgrade-helper
42 packages excluded due to repository priority protections
Resolving Dependencies
--> Running transaction check
---> Package kibana.x86_64 0:5.6.2-1 will be updated
---> Package kibana.x86_64 0:6.1.1-1 will be an update
--> Finished Dependency Resolution

Dependencies Resolved

==============================================================================================================================
 Package                     Arch                        Version                        Repository                       Size
==============================================================================================================================
Updating:
 kibana                      x86_64                      6.1.1-1                        kibana-6.x                       63 M

Transaction Summary
==============================================================================================================================
Upgrade  1 Package

Total download size: 63 M
Is this ok [y/d/N]: y

Filebeat

# yum update filebeat
Loaded plugins: priorities, update-motd, upgrade-helper
Resolving Dependencies
--> Running transaction check
---> Package filebeat.x86_64 0:1.3.1-1 will be updated
---> Package filebeat.x86_64 0:6.1.1-1 will be an update
--> Finished Dependency Resolution

Dependencies Resolved

===================================================================================================================================================================================================
 Package                                        Arch                                         Version                                       Repository                                         Size
===================================================================================================================================================================================================
Updating:
 filebeat                                       x86_64                                       6.1.1-1                                       elastic-6.x                                        12 M

Transaction Summary
===================================================================================================================================================================================================
Upgrade  1 Package

Total download size: 12 M
Is this ok [y/d/N]: y

1.3.サービスの再起動

Kibana

# service kibana restart
kibana started

Filebeat

 service filebeat restart
2017/12/25 08:46:58.653044 beat.go:436: INFO Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]
2017/12/25 08:46:58.653113 metrics.go:23: INFO Metrics logging every 30s
2017/12/25 08:46:58.653234 beat.go:443: INFO Beat UUID: 1267efb0-a1af-4f02-9e18-d7120d6bc2bc
2017/12/25 08:46:58.653256 beat.go:203: INFO Setup Beat: filebeat; Version: 6.1.1
2017/12/25 08:46:58.653386 client.go:123: INFO Elasticsearch url: http://192.100.0.4:9200
2017/12/25 08:46:58.653586 module.go:76: INFO Beat name: ip-192-100-0-36
Config OK
Stopping filebeat:                                         [  OK  ]
Starting filebeat: 2017/12/25 08:46:58.773001 beat.go:436: INFO Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]
2017/12/25 08:46:58.773063 metrics.go:23: INFO Metrics logging every 30s
2017/12/25 08:46:58.773112 beat.go:443: INFO Beat UUID: 1267efb0-a1af-4f02-9e18-d7120d6bc2bc
2017/12/25 08:46:58.773132 beat.go:203: INFO Setup Beat: filebeat; Version: 6.1.1
2017/12/25 08:46:58.773280 client.go:123: INFO Elasticsearch url: http://192.100.0.4:9200
2017/12/25 08:46:58.773479 module.go:76: INFO Beat name: ip-192-100-0-36
Config OK
                                                           [  OK  ]

1.4.確認

KibanaとFilebeatのバージョンやログが取れているか等確認します。

Filebeatアップグレード前

# filebeat -version
filebeat version 1.3.1 (amd64)

Filebeatアップグレード後

# filebeat -version
filebeat version 6.1.1 (amd64), libbeat 6.1.1

Kibanaアップグレード前
kibana-version.png

kibanaアップグレード後
kibana6.1ver.png

あ~~ elasticsearchとバージョンが一致しないって怒られてますね。
マイナーバージョンも一致が必要なのは作業的に面倒ですね・・

というわけでまた前回の記事と同じようにアップグレードしてきました。

# curl -XGET 'localhost:9200/'
{
  "name" : "node001",
  "cluster_name" : "my-cluster",
  "cluster_uuid" : "e06BKBFFSpiSkFwNT3kWLw",
  "version" : {
    "number" : "6.1.1",
    "build_hash" : "bd92e7f",
    "build_date" : "2017-12-17T20:23:25.338Z",
    "build_snapshot" : false,
    "lucene_version" : "7.1.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

# curl -XGET 'http://localhost:9200/_cat/plugins?v'
name    component         version
node002 analysis-kuromoji 6.1.1
node002 x-pack            6.1.1
node003 analysis-kuromoji 6.1.1
node003 x-pack            6.1.1
node001 analysis-kuromoji 6.1.1
node001 x-pack            6.1.1

改めてKibanaを確認してみます。

kibana6-1.png

エラー直りました。通常のログイン後の画面に。
先ほどのエラー画面だから画面の色合いが違うのかと思っていたのですが、普通に落ち着いた感じになっております。
改めてバージョンを確認します。

kibana_ver6-1.png

OKですね。

では、次にfilebeatからデータが送られているかを確認します。

filebeat1.png

(新しいデータが)有りません

そこでfilebeatのログを確認してみたところ・・・

2017-12-27T07:53:01Z INFO Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]
2017-12-27T07:53:01Z INFO Beat UUID: 1267efb0-a1af-4f02-9e18-d7120d6bc2bc
2017-12-27T07:53:01Z INFO Metrics logging every 30s
2017-12-27T07:53:01Z INFO Setup Beat: filebeat; Version: 6.1.1
2017-12-27T07:53:01Z INFO Elasticsearch url: http://192.100.0.4:9200
2017-12-27T07:53:01Z INFO Beat name: ip-192-100-0-36
2017-12-27T07:53:01Z INFO filebeat start running.
2017-12-27T07:53:01Z INFO Registry file set to: /var/lib/filebeat/registry
2017-12-27T07:53:01Z INFO Loading registrar data from /var/lib/filebeat/registry
2017-12-27T07:53:01Z INFO Total non-zero values:  beat.info.uptime.ms=3 beat.memstats.gc_next=4473924 beat.memstats.memory_alloc=3081016 beat.memstats.memory_total=3081016 filebeat.harvester.open_files=0 filebeat.harvester.running=0 libbeat.config.module.running=0 libbeat.output.type=elasticsearch libbeat.pipeline.clients=0 libbeat.pipeline.events.active=0 registrar.states.current=0
2017-12-27T07:53:01Z INFO Uptime: 3.375689ms
2017-12-27T07:53:01Z INFO filebeat stopped.
2017-12-27T07:53:01Z CRIT Exiting: Could not start registrar: Error loading state: Error decoding states: json: cannot unmarshal object into Go value of type []file.State

なんかエラー出て、filebeatが起動していない感じですかね。
そのエラーについて調べてみたところ、同じような状態になった方がおり、解決されていました。
https://discuss.elastic.co/t/exiting-could-not-start-registrar-error-loading-state-error-decoding-states-eof/74430

/var/lib/filebeat/registryを削除してから起動すれば良いようですね。
この環境は壊れても失うのは時間だけなので、やってみます。

# rm /var/lib/filebeat/registry
# service filebeat start
# cat /var/log/filebeat/filebeat

2017-12-27T08:14:08Z INFO Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]
2017-12-27T08:14:08Z INFO Beat UUID: 1267efb0-a1af-4f02-9e18-d7120d6bc2bc
2017-12-27T08:14:08Z INFO Metrics logging every 30s
2017-12-27T08:14:08Z INFO Setup Beat: filebeat; Version: 6.1.1
2017-12-27T08:14:08Z INFO Elasticsearch url: http://192.100.0.4:9200
2017-12-27T08:14:08Z INFO Beat name: ip-192-100-0-36
2017-12-27T08:14:08Z INFO filebeat start running.
2017-12-27T08:14:08Z INFO No registry file found under: /var/lib/filebeat/registry. Creating a new registry file.
2017-12-27T08:14:08Z INFO Loading registrar data from /var/lib/filebeat/registry
2017-12-27T08:14:08Z INFO States Loaded from registrar: 0
2017-12-27T08:14:08Z INFO Loading Prospectors: 1
2017-12-27T08:14:08Z WARN DEPRECATED: input_type prospector config is deprecated. Use type instead. Will be removed in version: 6.0.0
2017-12-27T08:14:08Z INFO Starting Registrar
2017-12-27T08:14:08Z INFO Starting prospector of type: log; ID: 5240556406633074861
2017-12-27T08:14:08Z INFO Loading and starting Prospectors completed. Enabled prospectors: 1
2017-12-27T08:14:08Z INFO Harvester started for file: /var/log/secure
2017-12-27T08:14:09Z INFO Connected to Elasticsearch version 6.1.1
2017-12-27T08:14:09Z INFO Loading template for Elasticsearch version: 6.1.1
2017-12-27T08:14:09Z INFO Elasticsearch template with name 'filebeat-6.1.1' loaded

criticalなエラーは解決したようです。

しかしながら新たなエラーが。

2017-12-27T09:24:40Z ERR  Failed to publish events: temporary bulk send failure

そういえば、WARNもありますね。まずこれが気になるのでfilebeat.reference.ymlを見ながら修正してみました。

修正したfilebeat.yml

filebeat.modules:
- module: kafka
  log:
    enabled: true
filebeat.prospectors:
- type: log
  enabled: false
  paths:
    - /var/log/secure.log
output.elasticsearch:
  hosts: ["192.100.0.4:9200"]
setup.template.settings:
setup.kibana:
logging.to_files: true
logging.files:

filebeatを再起動することでWARNは消えました。

# cat /var/log/filebeat/filebeat
2017-12-27T09:49:12Z INFO Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]
2017-12-27T09:49:12Z INFO Metrics logging every 30s
2017-12-27T09:49:12Z INFO Beat UUID: 1267efb0-a1af-4f02-9e18-d7120d6bc2bc
2017-12-27T09:49:12Z INFO Setup Beat: filebeat; Version: 6.1.1
2017-12-27T09:49:12Z INFO Elasticsearch url: http://192.100.0.4:9200
2017-12-27T09:49:12Z INFO Beat name: ip-192-100-0-36
2017-12-27T09:49:12Z INFO Enabled modules/filesets: kafka (log),  ()
2017-12-27T09:49:12Z INFO filebeat start running.
2017-12-27T09:49:12Z INFO Registry file set to: /var/lib/filebeat/registry
2017-12-27T09:49:12Z INFO Loading registrar data from /var/lib/filebeat/registry
2017-12-27T09:49:12Z INFO States Loaded from registrar: 1
2017-12-27T09:49:12Z INFO Loading Prospectors: 2
2017-12-27T09:49:12Z INFO Starting Registrar
2017-12-27T09:49:12Z INFO Starting prospector of type: log; ID: 15188226147135990593
2017-12-27T09:49:12Z INFO Loading and starting Prospectors completed. Enabled prospectors: 1

ERRが出力されるのかしばらく待ちます。

ERRも出なくなってました。しかし肝心のデータがkibanaで表示されない・・・

 おわりに

今回はこれで終わりにしたいと思います。
また来年続きやります・・・

こんなんばっかりや・・

続きを読む

ALBのアクセスログに対してElasticStackのMachine Learningを試してみた

はじめに

Advent Calendar25日目ということでクリスマスですね!:santa:
でも、いつもと変わらないですね!

ということで、
今回は、異常検知してくれる?ElasticStackのX-Packの中のMachine Learningについて試してみましたので書きたいと思います( ゚Д゚)ゞビシッ1

こんな流れで話すよ

  1. ログ取込フロー
  2. 環境について
  3. ALBのログを出力して、LogstashからElasticsearchにストア
  4. Machine Learning使うよ!

ログ取込フロー

AWSでWebサービスを提供している環境で、WebAPサーバの前段にはALBを配置してます。
以下の流れでログを取り込みます。

image.png

  1. ALBログを指定バケットに出力
  2. LogstashがS3からALBのログを収集
  3. LogstashがElasticsearchにストア

環境について

  • ALB
  • Amazon Linux AMI 2017.09.1 (HVM)
  • Logstash 6.0
  • logstash-input-s3
  • Elasticsearch 6.0
  • Kibana 6.0
  • X-Pack 6.0

ElasticStackは、事前にインストールしていることを前提にしてます。
インストールされていない環境は、以下を参考にして頂ければと思います。

ALBのログを出力して、LogstashからElasticsearchにストア

ALBのロギングを有効化については以下を参考にしてください。

ここからは、S3に格納されているALBログをLogstashで取得するための方法について書きます。

S3 input pluginをインストール

S3からログを取得するには、Logstashのプラグインである”S3 input plugin”をインストールする必要があります。

インストール方法を以下に書きます。

$ cd /usr/share/logstash
$ bin/logstash-plugin install logstash-input-s3

ALBのログを取り込むための準備

ALBのログは、以下の形式で出力されます。

### Log Format
type timestamp elb client:port target:port request_processing_time target_processing_time response_processing_time elb_status_code target_status_code received_bytes sent_bytes "request" "user_agent" ssl_cipher ssl_protocol target_group_arn trace_id

### Sample Log
http 2016-08-10T22:08:42.945958Z app/my-loadbalancer/50dc6c495c0c9188 
192.168.131.39:2817 10.0.0.1:80 0.000 0.001 0.000 200 200 34 366 
"GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - 
arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067
"Root=1-58337262-36d228ad5d99923122bbe354"

この出力されたログを取り込むために、Grok Patternを作成したり、型変換をLogstashで施す必要があります。

フィールド定義

それでは、ログフォーマットからフィールド名と型の定義を行います。
ALBが受け付けてからクライアントに返すまでの処理時間を表すフィールドをfloatにします。
対象は、”request_processing_time”、”target_processing_time”、”response_processing_time”です。

Log Field Type
type type string
timestamp date date
elb elb string
client:port client_port string
target:port target_port string
request_processing_time request_processing_time float
target_processing_time target_processing_time float
response_processing_time response_processing_time float
elb_status_code elb_status_code string
target_status_code target_status_code string
received_bytes received_bytes long
sent_bytes sent_bytes long
request ELB_REQUEST_LINE string
user_agent user_agent string
ssl_cipher ssl_cipher string
ssl_protocol ssl_protocol string
target_group_arn target_group_arn string
trace_id trace_id string

Grok Pattern

ALBのログに正規表現をかけて、構造化するため、Grokする必要があります。
Grokを直接Logstashのconfファイルに書くことも可能ですが、可読性を重視するため、パターンファイルとして外出しします。

以下が、ALBのログをよしなに取り込むためのGrok Patternファイルです。

alb_patterns
### Application Load Balancing
ALB_ACCESS_LOG %{NOTSPACE:type} %{TIMESTAMP_ISO8601:date} %{NOTSPACE:elb} (?:%{IP:client_ip}:%{INT:client_port}|-) (?:%{IP:backend_ip}:%{INT:backend_port}|-) (:?%{NUMBER:request_processing_time}|-1) (?:%{NUMBER:target_processing_time}|-1) (?:%{NUMBER:response_processing_time}|-1) (?:%{INT:elb_status_code}|-) (?:%{INT:backend_status_code}|-) %{INT:received_bytes} %{INT:sent_bytes} \"%{ELB_REQUEST_LINE}\" \"(?:%{DATA:user_agent}|-)\" (?:%{NOTSPACE:ssl_cipher}|-) (?:%{NOTSPACE:ssl_protocol}|-) %{NOTSPACE:target_group_arn} \"%{NOTSPACE:trace_id}\"

Multiple Pipelines

Logstashのconfファイルを呼び出すためにMultiple Pipelinesを使用します。
Pipeline.ymlの定義は、以下です。

pipelines.yml
- pipeline.id: alb
  pipeline.batch.size: 125
  path.config: "/etc/logstash/conf.d/alb.cfg"
  pipeline.workers: 1

Pipelineの設定については、以下を参考にして頂ければと思います。

Logstashのディレクトリ構成

ここまで準備しましたが、以下の様に各ファイルを配置します。

/etc/logstash
  ├ logstash.yml
  ├ conf.d
  │ └ alb.cfg
  ├ jvm.options
  ├ log4j2.properties
  ├ alb_patterns
  └ pipelines.yml
  • ALBのalb.cfgをconf.d配下に配置
  • ALBのGrok PatternファイルをLogstash配下に配置
  • alb.cfgを呼び出すpipeline.ymlをLogstash配下に配置

ALBのconfファイルを作成するよ

  • Input: S3からALBのログを取得する2

  • Filter: 取得したログに対してフィルタをかける

  • Output: Elasticsearchにストア

alb.cfg
input {
  s3 {
    region => "ap-northeast-1"
    bucket => "hoge"
    prefix => "hoge"
    ### S3へのポーリング間隔を指定
    interval => "60"
    sincedb_path => "/var/lib/logstash/sincedb_alb"
  }
}
filter {
  ### 読み込むGrok Patternファイルを"patterns_dir"で指定
  grok {
    patterns_dir => ["/etc/logstash/alb_patterns"]
    match => { "message" => "%{ALB_ACCESS_LOG}" }
  }
  date {
    match => [ "date", "ISO8601" ]
    timezone => "Asia/Tokyo"
    target => "@timestamp"
  }
  ### グローバルIPから国などをマッピングするために指定
  geoip {
    source => "client_ip"
  }
  ### デフォルトの型がstringのため、フィールド定義で定義した型に変換
  mutate {
    convert => [ "request_processing_time", "float" ]
    convert => [ "response_processing_time", "float" ]
    convert => [ "target_processing_time", "float" ]
    convert => [ "received_bytes", "int" ]
    convert => [ "sent_bytes", "int" ]
    ### 不要なフィールドを削除
    remove_field => [ "date", 'message' ]
  }
}
output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
}

ここまで整ったらLogstashを起動

$ initctl start logstash
logstash start/running, process 3121

Machine Learning使うよ!

KibanaでアクセスしてMachine Learingをクリックします。
新しくジョブを作成するので、”Create new job”をクリックします。

以下の画面で”Single metric”をクリックします。

image.png

“Aggregation”と”Filed”を指定します。
Filedは、”target_processing_time”にすることで、バックエンドのサーバへリクエストしてから返ってくるまでの時間をみます。
そうすることで、サーバ内の処理遅延が発生しているかを確認できます。
画面の右上から期間を指定します。

image.png

“Name”と”Description”を入力し、”Create job”をクリックします。

image.png

ジョブが実行され、結果を表示するため”View Results”をクリックします。

image.png

指定期間の中で普段より乖離している部分を赤やオレンジでマークされます。

image.png

今回は、Single metricでやりましたが、Multi metricを使うことで、
グローバルIPアドレスやUserAgentと合わせて検出することも可能です。

さいごに

いかがでしたか?ログを取り込んでジョブを作成するだけで簡単に実行できました。
今までは、人の目で判断していたところをMachine Learningを用いることで、普段の傾向から乖離している部分を自動で検出してくれるのは便利ですね。

Advent Calendar最終日ですが、これにておしまいです。
ありがとうございました!


  1. Machine Learningは、X-Packという有償プラグインですが、期間限定で試すことが可能です 

  2. S3バケットへのアクセスするためのIAM Roleが適用されていることを前提とする 

続きを読む

CloudWatchLogsAPIとfluent-plugin-s3を使ったLambdaログの保管と分析、可視化について

はじめに

これは、Sansan Advent Calendar 2017の24日目の記事です。

  • CloudWatchLogsはログを時系列で絞込検索がしにくいとか、見にくいし使いにくいツールである
  • 本番でLambdaの関数がxx 個以上動いており、障害発生時の調査にはCloudWatchLogsを頑張って使うしかない
  • apexでログを見る方法もないわけではないが、ずっとtailするのは難しい
  • CloudWatchLogsにずっと置いとくのもアレ
  • CloudWatchLogs早くいい感じになって欲しい

ということですでにfluentdなどで収集しているアプリケーションログやアクセスログと同様にS3に保管し、Elasticsearchに乗せてKibanaで検索、分析できるようにしたかったのです。CloudWatchLogsAPIとfluent-plugin-s3を使ってLambdaログの保管と分析、可視化をできるようにしました。

あきらめた案

fluent-plugin-cloudwatch-logsで収集しようともしていました。しかし、新たに生み出されるLogStreamの収集ができず、理由もよくわからず断念した。fluentdをpryで止めて結構デバッグしたものの、fluentd力もっとほしい…!となりました。
Lambdaのログは、CloudWatchLogsのLogGroup以下にLogStreamとしてつくられていくが、LogStreamに[$LATEST]というLambdaのバージョンプレフィックス的なやつが入るため、fluentdのbuffer_path設定と相性が悪かったです。(この仕様マジでやめてほしい。括弧とかドル記号とか入らないでほしい。)
また、50個以上のLogStreamsがある際に、このpluginではログを扱うArrayが予期せぬ入れ子構造になってしまっていたため修正しました。
どちらもマージされてはいるが、力不足によりこのプラグインでは収集を完遂できませんでした。

https://github.com/ryotarai/fluent-plugin-cloudwatch-logs/pull/80
https://github.com/ryotarai/fluent-plugin-cloudwatch-logs/pull/84

詳細な説明

ピタゴラ装置

Lambda

まずは、CloudWatchLogsのAPIを叩くためのLambdaをつくりました。Python3.6で実装しました。
このLambdaは、CloudWatchLogs.Client.create_export_taskを叩いています。
http://boto3.readthedocs.io/en/latest/reference/services/logs.html#CloudWatchLogs.Client.create_export_task

しかし、エクスポート対象にする期間の指定はコード上に置いておきたくありませんでした。(create_export_taskのfrom, toの指定)
そのため、前回実行時のタイムスタンプをS3上に置くようにし、これが無ければ現在からn時間分エクスポートし、あればfromにセットするという実装にしました。こうすることで、実行タイミングはCloudWatch EventsのRuleのみで与えられるようになります。

Lambdaの周辺

fluent-plugin-s3では、S3からログを取り込むことができます。S3イベント通知でSQSに流しています。あとはわりと普通です。

cloudwatchlogs-import-after.png

ちなみに

アレがこれであーなので、実はあと一歩のところで本番投入できていません。年内にはやっておきたいです。
現在の職場での特異的な話が絡むfluent-s3-plugin関連の設定についてなので、根底から覆るような話ではありません。もし同じような構成を考えている人がいたら安心してほしいです。

最後に

CloudWatchLogsが使いやすくなることや、Lambdaのログが自動的にS3へエクスポートし続ける設定がほしいです。LambdaのためにLambdaを作ることが減るといいなぁと思いますので、サンタさん(AWS)何卒よろしくお願いします。

続きを読む

Amazon Elasticsearch ServiceのKibanaへ、SSH port forwardingでトンネルを掘り、踏み台サーバ経由でアクセスする

TL;DR

たまにSSH port forwardingのやり方を聞かれるのでメモがてら。

3+1行で説明するとこんな感じ。
SSH port forwardingでトンネルを掘る

ssh -i ~/Path_to_SSH_key/id_xxxx {account_name}@{踏み台サーバIP or DNS name} -L 8888:{Kibana private IP}:80 -N

ブラウザで(トンネルを通ってKibanaへアクセスする

http://localhost:8888/_plugin/kibana

Kibanaへのアクセス

今のところKibanaへアクセスするニーズがあるのは開発者のみ。
開発者は踏み台サーバにアカウントを持っている。

というわけで ssh port forwardingでアクセスできれば良しとしているケースです。

手順

1. KibanaのPrivate IPを確認

※東京リージョンの場合
https://ap-northeast-1.console.aws.amazon.com/es/home
My domains > “Kibanaにアクセスしたいドメイン” > Overview

Screen Shot 2017-12-20 at 17.50.05.png

赤枠部分のドメインをメモし、IPを確認する

$ host {赤枠部分のドメイン}.es.amazonaws.com
{赤枠部分のドメイン}.es.amazonaws.com has address 172.xx.xx.xx
{赤枠部分のドメイン}.es.amazonaws.com has address 172.xx.xx.xx

2. SSH port forwardingでトンネルを掘る

ssh -i ~/Path_to_SSH_key/id_xxxx {account_name}@{踏み台サーバIP or DNS name} -L 8888:{Kibana private IP:Port}:80 -N


e.g)
ssh -i ~/.ssh/id_rsa nntsugu@ec2-xx-xxx-xxx-xxx.ap-northeast-1.compute.amazonaws.com -L 8888:xxx.xxx.xxx.xxx:80 -N

3. ブラウザで(トンネルを通ってKibanaへアクセスする

http://localhost:8888/_plugin/kibana

お手軽に検索

Kibana > Dev Tools > Console
からだとJSONでクエリを投げられるのでとっつきやすい。かもしれない。

GET _search
{
  "query": {
    "match": {
      "severity": "ERROR"
    }
  }
}

続きを読む

Meraki WiFi 位置情報の活用シーン

はじめに

以下2つの記事は Meraki Scanning API に関して、設定手順を中心にまとめています:

本記事はこれらの補足として、もう少しイメージが付くよう活用シーン付きで基本的な事柄も交えながら説明を試みます。

活用シーン

Meraki WiFi の活用シーンを小売店と想定します。Meraki WiFi はもともと次のような用途に向いています。

  • 店舗で WiFi を使えるようにしたい
  • 用途は業務用・お客様用の両方に使いたい
  • ビデオ視聴も可能な高速アクセスが必要
  • 店舗数は一店~数万店
  • 専任管理者がいないのでできるだけ簡単に運用したい
  • WiFi 位置情報を活用したい

Meraki WiFi の説明

そもそも Meraki WiFi とは?

Meraki WiFi はクラウド管理型 WiFi サービスです。

一般に、クラウドとは皆さんご存知の通り、データの保存や処理を手元の端末で行うのではなくインターネットを通じて「向こう側」に保存・処理する手法を指します。ところが、中には「向こう側」では不都合が出るサービスもあります。例えばリアルタイム処理が必要で遅延が問題となる場合には「こちら側」で処理できるよう機器を置くといいかもしれません。

WiFi も「向こう側」で全てを行うことはできません。WiFi 電波は仕様上 30メートルほどしか飛びませんので例えば東京タワーから送信しても「こちら側」の端末まで届きません。そこで Meraki アクセスポイント(AP) を「こちら側=店舗」に置くことになります。

AP がこちらにある以外は通常のクラウドサービスと同じ使い勝手で設定や監視ができるため、従来の WiFi AP を買ってきて個々に設定・設置する方法に比べると、特に多店舗では工数が圧倒的に少なくなるメリットがあります。

写真1:Meraki AP

image.png

クラウドへは Web ブラウザ経由でアクセスします。仮に複雑な機能を使ってもクラウド上に設定が残りますので、故障などで AP を交換した場合に一から再設定する必要はありません。また、数万 AP に対して一斉に設定変更を行うこともできます。

イメージ1:Meraki ダッシュボード(日本語対応済)
image.png

WiFi 端末の位置情報について(Meraki ダッシュボード)

スマホ等の WiFi 端末は使っていなくても(WiFi がオンになっていれば)プローブリクエストを定期的に送信します。送信間隔は機器の種類や状態によって異なります。

スマホの状態(iOS、アンドロイド等) プローブリクエストの間隔
スリープ(スクリーンオフ) 1分に1回程度
スリープ(スクリーンオン) 10-15回/分
APにアソシエート アプリケーションに大きく依存

WiFi 端末の送信するプローブリクエストを Meraki AP が受信すると、その時刻を SeenTime として記録します。また、プローブリクエストから端末の位置を算出する機能を持ちます。

WiFi 端末の識別には MACアドレスが使われますが、プライバシーの問題もあるため Meraki クラウドは一切の MAC アドレス情報を保存せず、そのハッシュ値のみを保存しています。

これらデータを利用して生成された端末位置情報ヒートマップを Meraki ダッシュボード(標準の管理インターフェイス)で見ることができます。

イメージ2:端末位置情報ヒートマップ(青はアソシエートした端末、灰色はアソシエートしていない端末、風船アイコンはAP)
image.png

また、データ解析レポートも Meraki ダッシュボードで見ることができます。

イメージ3:データ解析レポート
image.png

グラフは上から順に以下の意味を持っています:

  • 一つ目:近くを通っただけの人(Passersby)、訪問した人(Visitors)、接続して使った人(Connected)の割合
  • 二つ目:滞在時間毎の割合
  • 三つ目:訪問頻度の割合

データ解析レポートでは次のような比較が可能です。マーケティング的な分析に使えそうです。

  • 一店舗内のデータを異なる期間で比較(今週と先週)
  • 複数店舗間のデータを同一期間で比較(A店とB店、A店と全店の平均、A店とA~D店の平均、A~D店の平均と全店の平均)

以上は標準の Meraki ダッシュボードで利用可能です。

Scanning API の活用方法

端末位置情報を保持するには?

Meraki クラウドは上述のレポート提供を目的として WiFi 端末の位置情報を利用していますが、「個々の端末の時系列の位置情報」を保存している訳ではありません。保存したいと思った場合には Meraki クラウドから Scanning API を経由して取得可能です。ただし、保存用サーバを準備する必要があります。

サーバの準備が面倒であれば、例えば AWS の API Gateway/Lambda/DynamoDB を組み合わせて使うことでサーバレスにて比較的簡単に実現できます。手順はこちらの記事をご参照下さい(データ量は多くなり得るのでご注意下さい)。

DynamoDB は NoSQL データベースで可視化はあまり得意ではありません。可視化には同じく AWS の Elasticsearch service/kibana が利用できます。手順はこちらの記事をご参照ください。

端末位置情報から来店の有無を確認する

多くの小売店ではポイントカードを導入しており、お客様毎の購買履歴をデータとして蓄積しているかもしれません。ポイントカードは基本的に購入時のみ使うので、お客様が何も購入せずに店舗を去った場合には履歴が残りません。

それに対し、WiFi 位置情報はお客様が来店すれば(WiFi がオンになっている端末を持っている前提)データが残るため、これまでに扱えなかったデータの分析ができるようになる可能性があります。例えば、来店に対する購入頻度や、商品入替え・欠品による機会損失のより正確な額が求められるかもしれません。

イメージ4:既存お客様購買履歴とWiFi位置情報をマッピングした例。Meraki が提供する機能ではなくあくまでアイデアです。
image.png

※ イメージ4を実現するには、MACアドレスと個人情報の紐づけを何らかの方法で行う必要もあります

もっとも、お客様にとって来店記録が残ること自体気持ち良いものではなく、個人情報保護法を遵守しながらプライバシーを慎重に扱う必要があります(オプトイン・オプトアウト、保存データのハッシュ化等)。お客様にとって不利益が無く、新しい顧客体験を提供できることが何より重要かもしれません。技術の領域を越えますのでこの議論はここまでにしておきたいと思います。

最後に、MAC アドレス毎の SeenTime (AP が端末のプローブリクエストを受信した時刻)を可視化したグラフを作ってみました。Meraki の位置情報にご興味持たれた方は、データ活用に使えそうか眺めて頂けると幸いです。

DynamoDB 上のデータを CSV でファイルへ出力し、matplotlib で読み込んで描画しました。

イメージ5:MACアドレス毎の SeenTime (MACアドレスは左側をちょん切ってます)
image.png

イメージ5のコード
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pylab
from datetime import datetime as dt

# グラフ表示する MACアドレスの数を指定
cmac_num4graph = 50

# DynamoDB から取得した CSV ファイルの読込み
df = pd.read_csv("cmxdata.csv", parse_dates = ['seenString (S)'])

# MACアドレスの一覧作成
cmac_index = set(df['clientMac (S)'])

# MACアドレスの一覧作成(グラフ表示用)
cmac_index = list(cmac_index)[:cmac_num4graph]

# CSV ファイルから 'clientMac (S)' と 'seenString (S)' のみ抜き出したデータフレーム StringMac を作成
StringMac = pd.concat([df['clientMac (S)'], df['seenString (S)']], axis = 1)

# グラフ描画用のデータフレーム result_df を作成
result_df = pd.DataFrame()
for mac in cmac_index:
    seenString = StringMac[StringMac['clientMac (S)'] == mac]['seenString (S)']
    seenString = pd.Series(seenString, name = mac)
    df_seenString = seenString.to_frame()
    result_df = pd.concat([result_df, seenString], axis = 1)

# グラフを描画
pylab.figure(figsize=(15, 8))
xmin = dt(2017,12,17,7,22,0)
xmax = dt(2017,12,17,7,33,0)
for (y, mac) in enumerate(cmac_index):
    m = result_df[mac].dropna()
    y = np.linspace(y, y, len(m))
    result_df[mac].dropna()
    plt.scatter(m.tolist(), y)

plt.grid(True)
plt.xlim(xmin,xmax)
plt.yticks(range(0, cmac_num4graph, 1), cmac_index, fontsize = 8)
plt.show()

続きを読む

Kubernetes上のアプリケーションログを自動収集する

image.png

TL;DR;

新サービスや既存サービスをKubernetesに移行するたびに、ログの収集設定のためインフラエンジニア待ちになってしまうのは面倒ですよね。
そこで、アプリのログをFluentdとDatadog LogsやStackdriver Loggingで自動的に収集する方法を紹介します。

主に以下のOSSを利用します。

今回はDatadog Logsを使いますが、Stackdriver Loggingを使う場合でもUIやAPIクレデンシャル等の設定以外は同じです。

お急ぎの方へ: アプリ側の設定手順

標準出力・標準エラーログを出力するだけでOKです。

参考: The Twelve-Factor App (日本語訳)

詳しくは、この記事の「サンプルアプリからログを出力する」以降を読んでください。

あとはクラスタ側に用意しておいたFluentdの仕事ですが、Kubernetesがノード上に特定のフォーマットで保存するため、アプリ毎の特別な設定は不要です。

まえおき1: なぜDatadogやStackdriver Loggingなのか

分散ロギングのインフラを準備・運用するのがつらい

分散ロギングと一口にいっても、実現したいことは様々です。例えば、多数のサービス、サーバ、プロセス、コンテナから出力されたログを

  1. 分析などの用途で使いやすいようにETLしてRedshiftのようなデータウェアハウスに投入しておきたい
  2. S3などのオブジェクトストレージに低コストでアーカイブしたい
  3. ほぼリアルタイムでストリーミングしたり、絞込検索したい
    • Web UI、CLIなど

1.はtd-agent + TreasureData or BigQuery、2.はfluentd (+ Kinesis Streams) + S3、3.はfilebeat or Logstash + Elasticsearch + Kibana、Graylog2、専用のSaaSなど、ざっとあげられるだけでも多数の選択肢があります。

方法はともかく、できるだけ運用保守の手間を省いて、コアな開発に集中したいですよね。

メトリクス、トレース、ログを一つのサービスで一元管理したい・運用工数を節約したい

「Kubernetesにデプロイしたアプリケーションのメトリクスを自動収集する – Qiita」でも書きましたが、例えばKubernetesの分散ロギング、分散トレーシング、モニタリングをOSSで実現すると以下のような構成が定番だと思います。

  • 基本的なグラフ作成とメトリクス収集、アラート設定はPrometheus
  • 分散ログはEKF(Elasticsearch + Kibana Fluentd)
  • 分散トレースはZipkinやJaeger

もちろん、ソフトウェアライセンス費用・サポート費用、将来の拡張性などの意味では良い判断だと思います。

一方で、

  • アラートを受けたときに、その原因調査のために3つもサービスを行ったり来たりするのは面倒
  • 人が少ない場合に、セルフホストしてるサービスの運用保守に手間をかけたくない
    • アカウント管理を個別にやるだけでも面倒・・最低限、SSO対応してる?

などの理由で

  • 個別のシステムではなく3つの役割を兼ねられる単一のシステム

がほしいと思うことがあると思います。

fluentd + Datadog Logs/Stackdriver Logging

StackdriverとDatadogはSaaSで、かつ(それぞれサブサービスで、サブサービス間連携の度合いはそれぞれではありますが、)3つの役割を兼ねられます。

SaaSへログを転送する目的でfluentdを利用しますが、Kubernetesのログを収集するエージェントとしてfluentdがよく使われている関係で、Kubernetes界隈でよく使われるfluentdプラグインに関しては、よくある「メンテされていない、forkしないと動かない」という問題に遭遇しづらいというのも利点です。

まえおき2: なぜDatadogなのか

もともとStackdriver Loggingを利用していたのですが、以下の理由で乗り換えたので、この記事ではDatadog Logsの例を紹介することにします。

  • メトリクスやAPMで既にDatadogを採用していた
  • UI面で使いやすさを感じた

UI面に関して今のところ感じている使いやすさは以下の2点です。

  • ログメッセージの検索ボックスでメタデータの補完が可能

    • hostで絞込をしようとすると、hostの値が補完される
    • あとで説明します
  • 柔軟なFaceting
    • Datadog LogsもStackdriver Loggingもログにメタデータを付与できるが、Datadog Logsは任意のメタデータキーで絞り込むためのショートカットを簡単に追加できる
    • あとで説明します

Stackdriver Loggingを利用する場合でも、この記事で紹介する手順はほぼ同じです。Kubernetesの分散ロギングをSaaSで実現したい場合は、せっかくなので両方試してみることをおすすめします。

Fluentdのセットアップ手順

DatadogのAPIキー取得

Datadog > Integratinos > APIsの「New API Key」から作成できます。

image.png

以下では、ここで取得したAPIキーをDD_API_KEYという環境変数に入れた前提で説明を続けます。

fluentdのインストール

今回はkube-fluentdを使います。

$ git clone git@github.com:mumoshu/kube-fluentd.git
$ cd kube-fluentd

# 取得したAPIキーをsecretに入れる
$ kubectl create secret generic datadog --from-literal=api-key=$DD_API_KEY

# FluentdにK8Sへのアクセス権を与えるためのRBAC関連のリソース(RoleやBinding)を作成
$ kubectl create -f fluentd.rbac.yaml

# 上記で作成したsecretとRBAC関連リソースを利用するfluentd daemonsetの作成
$ kubectl create -f fluentd.datadog.daemonset.yaml

設定内容の説明

今回デプロイするfluentdのmanifestを上から順に読んでみましょう。

kind: DaemonSet

Kubernetes上のアプリケーションログ(=Podの標準出力・標準エラー)は各ノードの/var/log/containers以下(より正確には、そこからsymlinkされているファイル)に出力されます。それをfluentdで集約しようとすると、必然的に各ノードにいるfluentdがそのディレクトリ以下のログファイルをtailする構成になります。fluentdに限らず、何らかのコンテナをデプロイしたいとき、KubernetesではPodをつくります。Podを各ノードに一つずつPodをスケジュールするためにはDaemonSetを使います。

  updateStrategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1

DaemonSetをアップデートするとき(例えばDockerイメージを最新版にするためにタグの指定を変える)、Podを1つずつローリングアップデートします。アップデートによってfluentdが動かなくなった場合の影響を抑えることが目的ですが、気にしない場合はこの設定は記述は不要です。

serviceAccountName: fluentd-cloud-logging

kubectl -f fluent.rbac.yamlで作成されたサービスアカウントを利用する設定です。これがないとデフォルトのサービスアカウントが使われてしまいますが、ほとんどのツールでつくられたKubernetesクラスタではデフォルトのサービスアカウントに与えられる権限が絞られているので、デフォルトのサービスアカウントではkube-fluentdが動作しない可能性があります。

      tolerations:
      - operator: Exists
        effect: NoSchedule
      - operator: Exists
        effect: NoExecute
      - operator: Exists

KubernetesのMasterノードや、その他taintが付与された特定のワークロード専用のWorkerノード含めて、すべてのノードにfluentd podをスケジュールための記述です。何らかの理由でfluentdを動作させたくないノードがある場合は、tolerationをもう少し絞り込む必要があります。

env:
        - name: DD_API_KEY
          valueFrom:
            secretKeyRef:
              name: datadog
              key: api-key
        - name: DD_TAGS
          value: |
            ["env:test", "kube_cluster:k8s1"]

一つめは、secretに保存したDatadog APIキーを環境変数DD_API_KEYにセットする、二つめはfluentdが収集したすべてのログに二つのタグをつける、という設定です。タグはDatadogの他のサブサービスでもよく見られる形式で、”key:value”形式になっています。

Datadogタグのenvは、Datadogで環境名を表すために慣習的に利用されています。もしDatadogでメトリクスやトレースを既に収集していて、それにenvタグをつけているのであれば、それと同じような環境名をログにも付与するとよいでしょう。

kube_clusterは個人的におすすめしたいタグです。Kubernetesクラスタは複数同時に運用する可能性があります。このタグがあると、メトリクスやトレース、ログをクラスタ毎に絞り込むことができ、何か障害が発生したときにその原因が特定のクラスタだけで起きているのかどうか切り分ける、などの用途で役立ちます。

        ports:
        - containerPort: 24231
          name: prometheus-metrics

「Kubernetesにデプロイしたアプリケーションのメトリクスを自動収集する」で紹介した方法でdd-agentにfluentdのPrometheusメトリクスをスクレイプさせるために必要なポートです。

サンプルアプリからログを出力する

適当なPodを作成して、testmessage1というメッセージを出力します。

$ kubectl run -it --image ruby:2.4.2-slim-stretch distlogtest-$(date +%s) -- ruby -e 'puts %q| mtestmessage1|; sleep 60'

ログの確認

何度か同じコマンドを実行したうえで、DatadogのLog Explorerでtestmessage1を検索してみると、以下のようにログエントリがヒットします。

image.png

ログエントリを一つクリックして詳細を開いてみると、testmessage1というログメッセージの他に、それに付随する様々なメタデータが確認できます。

image.png

ログエントリに自動付与されたメタデータの確認

  • HOST: ログを出力したPodがスケジュールされているホスト名(=EC2インスタンスのインスタンスID)
  • SOURCE: コンテナ名
  • TAGS: Datadogタグ
    • pod_name: Pod名
    • kube_replicaset: ReplicaSet名
    • container_name: Dockerコンテナ名
    • kube_namespace: PodがスケジュールされているNamespace名
    • host: PodがスケジュールされているKubernetesノードのEC2インスタンスID
    • zone: Availability Zone
    • aws_account_id: AWSアカウントID
    • env: 環境名

Log Explorerを使うと、すべてのAWSアカウントのすべてのKubernetesクラスタ上のすべてのPodからのログが一つのタイムラインで見られます。それを上記のようなメタデータを使って絞り込むことができます。

ログエントリの絞り込み

ログエントリの詳細から特定のタグを選択すると、「Filter by」というメニュー項目が見つかります。

image.png

これを選択すると、検索ボックスに選択したタグがkey:value形式で入力された状態になり、そのタグが付与されたログエントリだけが絞り込まれます。

もちろん、検索ボックスに直接フリーワードを入力したり、key:value形式でタグを入力してもOKです。

Facetingを試す

定型的な絞り込み条件がある場合は、Facetを作成すると便利です。

ログエントリの詳細から特定のタグを選択すると、「Create new facet」というメニュー項目が見つかります。

image.png

これを選択すると、以下のようにどのような階層のどのような名前のFacetにするかを入力できます。

image.png

例えば、

  • Path: kube_namespace
  • Name: Namespace
  • Group: Kubernetes

のようなFacetを作成すると、ログエントリに付与されたkube_namespaceというタグキーとペアになったことがある値を集約して、検索条件のショートカットをつくってくれます。実際のNamespace Facetは以下のように見えます。

image.png

kube-system、mumoshu、istio-system、defaultなどが表示されていますが、それぞれkube_namespaceというタグキーとペアになったことがある値(=クラスタに実在するNamespace名)です。また、その右の数値はそのNamespaceから転送されたログエントリの件数です。この状態で例えばistio-systemを選択すると、kube_namespace:istio-systemというタグが付与されたログエントリだけを絞り込んでみることができます。

image.png

アーカイブ、ETLパイプラインへの転送など

kube-fluentdにはアーカイブやETLパイプラインのサポートは今のところないので、必要に応じてはfluentd.confテンプレート変更して、それを含むDockerイメージをビルドしなおす必要があります。

fluentd.confテンプレートは以下の場所にあります。

https://github.com/mumoshu/kube-fluentd/blob/master/rootfs/etc/confd/templates/fluent.conf.tmpl

fluentd.confテンプレートから参照できる環境変数を追加したい場合は、以下のconfd設定ファイルを変更します。

https://github.com/mumoshu/kube-fluentd/blob/master/rootfs/etc/confd/conf.d/fluent.conf.toml

// 今後、configmap内に保存したfluent.confの断片をfluentdの@includeを使ってマージしてくれるような機能を追加してもよいかもしれませんね。

まとめ

FluentdとDatadog Logsを使って、Kubernetes上のアプリケーションログを自動的に収集し、Datadog LogsのWeb UIからドリルダウンできるようにしました。

アプリ側はTwelve-Factor Appに則って標準出力・標準エラーにログを出力するだけでよい、という簡単さです。ドリルダウンしたり、そのためのFacetを作成するときも、グラフィカルな操作で完結できます。

また、ログの収集をするためだけにいちいちインフラエンジニアが呼び出されることもなくなって、楽になりますね!

Kubernetes上のアプリケーションの分散ロギングを自動化したい方は、ぜひ試してみてください。

(おまけ) 課題: ログメッセージに含まれるメタデータの抽出

Stackdriver Loggingではできて、Datadog Logsでは今のところできないことに、ログメッセージに含まれるメタデータの抽出があります。

例えば、Stackdriver Loggingの場合、

  • ログにメタデータを付与して検索対象としたい

    • 例えば「ログレベルDEBUGでHello World」のようなログを集約して、Web UIなどから「DEBUGレベルのログだけを絞り込みたい」

というような場合、アプリからは1行1 jsonオブジェクト形式で標準出力に流しておいて、fluent-plugin-google-cloud outputプラグイン(kube-fluentd内で利用しているプラグイン)でStackdriver Loggingに送ると、jsonオブジェクトをパースして、検索可能にしてくれます。

例えば、

{"message":"Hello World", "log_level":"info"}

のようなログをStackdriver Loggingにおくると、log_levelで検索可能になる、ということです。

このユースケースに対応する必要がある場合は、いまのところDatadog LogsではなくStackdriver Loggingを採用するとよいと思います。

今後の展望

同じくkube-fluentdでDatadog Logsへログを転送するために利用しているfluent-plugin-datadog-logに、fluent-plugin-google-cloudと同様にJSON形式のログをパースしてDatadogのタグに変換する機能を追加することはできるかもしれません。

また、Datadog Logsには、ログエントリのメッセージ部分に特定のミドルウェアの標準的な形式のログ(例えばnginxのアクセスログ)が含まれる場合に、それをよしなにパースしてくれる機能があります。その場合にログエントリに付与されるメタデータは、タグではなくアトリビュートというものになります。アトリビュートはタグ同様に検索条件に利用することができます。

ただ、いまのところfluent-plugin-datadog-logからの出力はすべてsyslog扱いになってしまっており、ログの内容によらず以下のようなアトリビュートが付与されてしまっています。

image.png

JSONをパースした結果がこのアトリビュートに反映されるような実装が可能であれば、それが最適なように思えます。

続きを読む

HoneypotのDionaeaでマルウェアを収集しちゃって、APIでスキャンして、結果をビジュアライズしちゃうぞ

はじめに

こんばんはー
Advent Calendar11日目ですね!
そして、間違えて同じ日に二つもエントリーしてしまいましたd(゚∀゚d)ォゥィェ!!

ちなみに、もう一つのQiita記事は、以下なので興味ある人は見て頂ければと思います。

絶対的に使った方がいいLogstashのMultiple Pipelinesについて書いてみた

今年は、ハニーポッターになるべく、いろいろ試してたのですが、Qiita記事を一切書いてませんでした。。
書こう書こうと思ってたら、今年ももう終わり。。
でも、最後の追い上げ!ということで、しっかりと足跡を残したいと思いますー

てことで、今回は、Dionaeaというハニーポットツールを使ってマルウェアを収集して、スキャンとか可視化までをゆるーく書いていきます。
流れはこんな感じです!

  1. 環境について
  2. 全体構成について
  3. Dionaeaのインストール
  4. VirusTotalのAPIを利用するよ!
  5. Logstashで取り込むよ
  6. Kibanaで見てみる

環境について

すべてAWSで構築してます。

  • Dionaea Server

    • Ubuntu Server 14.04 LTS
    • Dionaea
    • Python 2.7.6
  • ElastciStack Server

    • Logstash 6.0
    • logstash-input-s3
    • Elasticsearch 6.0
    • Kibana 6.0

全体構成について

ざっくりと構成について説明しますー

test.png

Dionaeaを配置し、VirusTotalにAPIコールし、マルウェアかどうかをスキャンします。
スキャン結果のjsonファイルをS3に格納し、Logstashがデータを取得します。
Logstashが取得したデータをElasticsearchにストアし、Kibanaがビジュアライズします。
といった感じの構成です!

Dionaeaのインストール

Dionaeaのインストールは、以下の手順でインストール完了です。
公式の手順に則るため、Ubuntuのバージョンは14.04にしてます。

参考:Install Dionaea

$ sudo apt-get update
$ sudo apt-get dist-upgrade
$ sudo add-apt-repository ppa:honeynet/nightly
$ sudo apt-get update
$ sudo apt-get install dionaea
$ sysv-rc-conf dionaea on
$ sysv-rc-conf --list | grep dionaea
$ service dionaea start

Dionaeaについて

Dionaeaは、低対話型のハニーポットのため、危険度は低いです。
ただ、一般的なサーバのセキュリティ対応は実施していることを前提にしてます。
細心の注意を払って対応してくださいね!

Dionaeaが、アタックされるように対象ポートをセキュリティグループでフルオープンにします。

Port Service Source
21/TCP ftp 0.0.0.0/0
42/TCP nameserver 0.0.0.0/0
80/TCP http 0.0.0.0/0
135/TCP msrpc 0.0.0.0/0
443/TCP https 0.0.0.0/0
445/TCP microsoft-ds 0.0.0.0/0
1433/TCP mssql 0.0.0.0/0
3306/TCP mysql 0.0.0.0/0
5060/TCP sip 0.0.0.0/0
5061/TCP sip-tls 0.0.0.0/0

Dionaeaのディレクトリについて

  • /opt/dionaea/var/dionaea/binaries/: マルウェアが配置されるディレクトリ
  • /opt/dionaea/log/: dionaea自身のログを保存
  • /opt/dionaea/wwwroot/: Webのドキュメントルート

VirusTotalのAPIを利用するよ!

VirusTotalは、マルウェアなんじゃないか?!とか、怪しいURLなのでは?!といった時に使用するマルウェアチェックサービスです。
VirusTotalは、マルウェア検出するために50種類以上のエンジンで検査ができます。
しかも、APIも公開されており、スキャン結果を容易に取得できます。
そして、無料!

今回は、VirusTotalのAPIを利用して、Dionaeaに仕込まれたマルウェアをスキャンします。
APIを利用するには、API Keyの取得が必要なため、以下のサイトからアカウント登録を実施してAPI Keyを取得します。

アカウント登録:VirusTotal

注意点ですが、APIは、1分間に4回までしかリクエストできません。
以下のドキュメントを参考にしてください。

参考:Document VirusTotal

あと、森久さんのサイトも非常に参考になります!
VirusTotalだけの話ではなく、ハニーポットに興味ある方は、読んだ方がいいです!

参考:www.morihi-soc.net

APIコール用のスクリプト

Dionaeaのbinariesに仕込まれたマルウェアに対してスキャンを実行するスクリプトです。

#!/usr/bin/env python
import sys
import json
import urllib
import urllib2
import os
import time

files = os.listdir('/opt/dionaea/var/dionaea/binaries/')

for hash in files:
  print hash

  url = "https://www.virustotal.com/vtapi/v2/file/report"
  # API Keyにはアカウント登録した際に取得したKeyを入力する
  params = {"resource": hash, "apikey": "API Key"}

  data = urllib.urlencode(params)
  request = urllib2.Request(url, data)
  response = urllib2.urlopen(request)
  json = response.read()

  with open("{}.json".format(hash), "w") as result:
    result.write(json)

  print "..processing"
  # 1分間に4回までのAPIコール制限があるためスリープを設ける
  time.sleep(20)

結果をjsonファイルで出力します。
出力したファイルをS3マウントした箇所に配置することで、S3にアップロードされます。

Logstashで取り込むよ

ElasticStackはすでにインストール済みを前提にしてます。
また、S3にアクセスするため、Logstashのプラグインの”logstash-input-s3″もインストールしていることとします。
もしインストールされていない場合は、対応お願いします。

Logstash.conf

Logstashのconfですが、データソースがjsonのため、フィルタに書くのはjsonだけです。
めっちゃ楽ですね!
これでElasticsearchにストアされます。

logstash.conf
input {
  s3 {
    tags => "dionaea"
    bucket => "hoge_backet"
    region => "ap-northeast-1"
    prefix => "hoge/"
    interval => "30"
    sincedb_path => "/var/lib/logstash/sincedb_dionaea"
    codec => json
  }
}
filter {
  json {
    source => "message"
  }
}
output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
    index => "vt-logs-%{+YYYYMMdd}"
  }
}

Kibanaで見てみる

それでは、ElasticsearchにストアしたデータをKibanaで確認してみましょう!

FireShot Capture 50 - Kibana_ - http___54.211.11.251_5601_app_kibana#_discover.png

ちゃんとビジュアライズできてますね!

最後に

こんな感じでマルウェアを収集し、スキャン結果をビジュアライズできちゃうのです!
それでは、楽しいお時間をお過ごしくださいーヽ(*゚д゚)ノ

ではでは、明日は、”GoogleHome”についてですね!
めっちゃ楽しみだー

続きを読む