Amazon Elasticsearch Service の Kibana にログイン機能を追加できるようになりました | Developers.IO

Amazon Elasticsearch Service は Elasticsearch、Kibana の AWSマネージドサービスです。クラスタ・ノードを AWS が運用・管理してくれ、数クリックで Elasticsearch、Kibana のクラスタをプロビジョニングでき、ノードで障害が発生すればユーザーは何もしなくても復旧します。 今まで Amazon Elasticsearch Service の … 続きを読む

センサデータを fluentd 経由で Amazon Elasticsearch Service に送信して可視化

1. はじめに

以前の記事 で、RaspberryPi で収集したセンサデータを、 さくらVPS上に構築した Elasticsearch に送信して、Kibana で可視化しました。
今回は勉強を兼ねて、データを Amazon Elasticsearch Service ( Amazon ES ) に送信するように構成を変更します。

2. 全体の構成

image.png

3. 設定

3-1. Server side ( Amazon ES )

Amazon ES を立ち上げます。

Amazon ES ダッシュボード

  • 画面右上で、東京リージョン( ap-northeast-1 )が選択されていることを確認します
  • 新しいドメインの作成ボタンを押します
image

ドメインの定義

  • ドメイン名と Elasticsearch のバージョンを設定します
image

クラスターの設定

  • 今回は最小構成にします
image

アクセスの設定

  • ダッシュボードは特定メンバーに公開したかったので、パブリックアクセスとして、IPアドレスでアクセス制限を設定しました
  • 本当は IAM Role で制限したかったのですが、Webブラウザからのアクセスが面倒になるので今回は見送りました ( ブラウザはIAM認証できない )
image

完了確認

  • 10分ほど待ちます
image
  • 設定の状態が「アクティブ」になれば完了です
image

3-2. Sensor side ( Raspberry PI )

前提条件

以前の記事 の状態が前提です。今回はこれに変更を加えます。

プラグインのインストール

  • fluentd から Elasticsearch に直接格納するためのプラグインをインストールします
  • なお、IAM 認証をする場合は fluent-plugin-aws-elasticsearch-service を使うようです
sudo fluent-gem install fluent-plugin-elasticsearch

fluentd の設定

  • fluentd の設定ファイルを編集して、データの送信先を変更して、fluentd を再起動します
/home/pi/fluent/fluent.conf
<source>
  @type tail
  format json
  path /home/pi/myroom.log
  pos_file /home/pi/myroom.log.pos
  tag log.myroom
</source>

<match log.myroom>
  @type copy
  <store>
    @type elasticsearch
    type_name myroom
    logstash_format true
    logstash_prefix myroom
    reload_connections false
    hosts https://search-myroom-********.ap-northeast-1.es.amazonaws.com
  </store>
</match>

4. 確認

データが送信されていることを確認しました。
image.png

続きを読む

AWSでElastic Stack – 前回の続き Kibana Filebeatのアップグレード

はじめに

前回、KibanaとFilebeatもアップグレードするような記事を書いたのですが、途中で力尽きてElasticsearchのアップグレードで終わってしまったので、短くなりますが、KibanaとFilebeatもアップグレードしたいと思います。
前回の記事(前回中途半端で申し訳ないですが)の構成を参照して頂ければと思います。

AWSの小ネタ

本題とは関係ありませんが、一応AWSタグを付けているので、AWSで悩んだ話について。

IAMロールで起動したEC2インスタンスのプロキシ環境下にて、プロキシサーバのログに以下のログが大量に出力されていました。

TCP_MISS/404 609 GET http://169.254.169.254/latest/meta-data/network/interfaces/macs/xx:xx:xx:xx:xx:xx/local-ipv4s - HIER_DIRECT/169.254.169.254 text/html

それでxxになっているMACアドレスを持つサーバを調べると以下のログが連続して大量に出力されていました。

ec2net: [get_meta] Trying to get http://169.254.169.254/latest/meta-data/network/interfaces/macs/xx:xx:xx:xx:xx:xx/local-ipv4s

恐らくec2がmetadataを取りにいくのですが、プロキシサーバを経由してしまうせいで、
ご本人じゃありませんよとAWSのmetadataサーバに拒否されているんでしょうと推測しました。

プロキシ環境下では169.254.169.254はプロキシを経由しないようにNO_PROXYの設定を設定します。
http://docs.aws.amazon.com/ja_jp/cli/latest/userguide/cli-http-proxy.html

自分もこの設定を入れていましたので、設定が有効になっていないかと悩まされることになりました。そこで一旦、exportしているプロキシの設定を削除しましたが、
やっぱりプロキシサーバを経由することを止められませんでした。

そこで他のプロキシを利用する設定を考えたところ、そういえば最初は全てのアクセスをプロキシ経由にするつもりが無くて、yumやwget,curlくらいしかInternetへのアクセスをしないので、それぞれのコンフィグにプロキシの設定を個別に書いていたなと思い至りました。

それで結局当たったのは、curlの.curlrcのプロキシの設定でした。
ここにはNO_PROXYの設定は書いていませんでした。
ここの設定が有効でプロキシサーバ経由でアクセスしているとは・・
ec2がmetadataを取得する際にはcurlで取りにいっているってことですかね?(分かってない)

1.アップグレード作業

では前回やり残したKibanaとFilebeatのアップグレード作業を実施したいと思います。

1.1.事前準備

公式のドキュメントを参考にしながら進めていきます。
Kibana
https://www.elastic.co/guide/en/kibana/current/rpm.html
Filebeat
https://www.elastic.co/guide/en/beats/filebeat/current/setup-repositories.html

あれ・・・前回見た時は6.0だったのに、どんどん更新されていきますね。

1.1.1.GPGキーのインストール

KibanaとFilebeatをインストールしているそれぞれのサーバにて実施します。

Kibanaをインストールしたサーバ(srv1)

# rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

Filebeatをインストールしたサーバ(srv4)

# rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch

1.1.2.リポジトリの修正

6.0系のリポジトリを用意します。

Kibana

# vi /etc/yum.repos.d/kibana.repo
[kibana-6.x]
name=Kibana repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

Filebeat

# vi /etc/yum.repos.d/beats.repo
[elastic-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

1.2.アップグレード

Kibana

# yum update kibana
Loaded plugins: priorities, update-motd, upgrade-helper
42 packages excluded due to repository priority protections
Resolving Dependencies
--> Running transaction check
---> Package kibana.x86_64 0:5.6.2-1 will be updated
---> Package kibana.x86_64 0:6.1.1-1 will be an update
--> Finished Dependency Resolution

Dependencies Resolved

==============================================================================================================================
 Package                     Arch                        Version                        Repository                       Size
==============================================================================================================================
Updating:
 kibana                      x86_64                      6.1.1-1                        kibana-6.x                       63 M

Transaction Summary
==============================================================================================================================
Upgrade  1 Package

Total download size: 63 M
Is this ok [y/d/N]: y

Filebeat

# yum update filebeat
Loaded plugins: priorities, update-motd, upgrade-helper
Resolving Dependencies
--> Running transaction check
---> Package filebeat.x86_64 0:1.3.1-1 will be updated
---> Package filebeat.x86_64 0:6.1.1-1 will be an update
--> Finished Dependency Resolution

Dependencies Resolved

===================================================================================================================================================================================================
 Package                                        Arch                                         Version                                       Repository                                         Size
===================================================================================================================================================================================================
Updating:
 filebeat                                       x86_64                                       6.1.1-1                                       elastic-6.x                                        12 M

Transaction Summary
===================================================================================================================================================================================================
Upgrade  1 Package

Total download size: 12 M
Is this ok [y/d/N]: y

1.3.サービスの再起動

Kibana

# service kibana restart
kibana started

Filebeat

 service filebeat restart
2017/12/25 08:46:58.653044 beat.go:436: INFO Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]
2017/12/25 08:46:58.653113 metrics.go:23: INFO Metrics logging every 30s
2017/12/25 08:46:58.653234 beat.go:443: INFO Beat UUID: 1267efb0-a1af-4f02-9e18-d7120d6bc2bc
2017/12/25 08:46:58.653256 beat.go:203: INFO Setup Beat: filebeat; Version: 6.1.1
2017/12/25 08:46:58.653386 client.go:123: INFO Elasticsearch url: http://192.100.0.4:9200
2017/12/25 08:46:58.653586 module.go:76: INFO Beat name: ip-192-100-0-36
Config OK
Stopping filebeat:                                         [  OK  ]
Starting filebeat: 2017/12/25 08:46:58.773001 beat.go:436: INFO Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]
2017/12/25 08:46:58.773063 metrics.go:23: INFO Metrics logging every 30s
2017/12/25 08:46:58.773112 beat.go:443: INFO Beat UUID: 1267efb0-a1af-4f02-9e18-d7120d6bc2bc
2017/12/25 08:46:58.773132 beat.go:203: INFO Setup Beat: filebeat; Version: 6.1.1
2017/12/25 08:46:58.773280 client.go:123: INFO Elasticsearch url: http://192.100.0.4:9200
2017/12/25 08:46:58.773479 module.go:76: INFO Beat name: ip-192-100-0-36
Config OK
                                                           [  OK  ]

1.4.確認

KibanaとFilebeatのバージョンやログが取れているか等確認します。

Filebeatアップグレード前

# filebeat -version
filebeat version 1.3.1 (amd64)

Filebeatアップグレード後

# filebeat -version
filebeat version 6.1.1 (amd64), libbeat 6.1.1

Kibanaアップグレード前
kibana-version.png

kibanaアップグレード後
kibana6.1ver.png

あ~~ elasticsearchとバージョンが一致しないって怒られてますね。
マイナーバージョンも一致が必要なのは作業的に面倒ですね・・

というわけでまた前回の記事と同じようにアップグレードしてきました。

# curl -XGET 'localhost:9200/'
{
  "name" : "node001",
  "cluster_name" : "my-cluster",
  "cluster_uuid" : "e06BKBFFSpiSkFwNT3kWLw",
  "version" : {
    "number" : "6.1.1",
    "build_hash" : "bd92e7f",
    "build_date" : "2017-12-17T20:23:25.338Z",
    "build_snapshot" : false,
    "lucene_version" : "7.1.0",
    "minimum_wire_compatibility_version" : "5.6.0",
    "minimum_index_compatibility_version" : "5.0.0"
  },
  "tagline" : "You Know, for Search"
}

# curl -XGET 'http://localhost:9200/_cat/plugins?v'
name    component         version
node002 analysis-kuromoji 6.1.1
node002 x-pack            6.1.1
node003 analysis-kuromoji 6.1.1
node003 x-pack            6.1.1
node001 analysis-kuromoji 6.1.1
node001 x-pack            6.1.1

改めてKibanaを確認してみます。

kibana6-1.png

エラー直りました。通常のログイン後の画面に。
先ほどのエラー画面だから画面の色合いが違うのかと思っていたのですが、普通に落ち着いた感じになっております。
改めてバージョンを確認します。

kibana_ver6-1.png

OKですね。

では、次にfilebeatからデータが送られているかを確認します。

filebeat1.png

(新しいデータが)有りません

そこでfilebeatのログを確認してみたところ・・・

2017-12-27T07:53:01Z INFO Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]
2017-12-27T07:53:01Z INFO Beat UUID: 1267efb0-a1af-4f02-9e18-d7120d6bc2bc
2017-12-27T07:53:01Z INFO Metrics logging every 30s
2017-12-27T07:53:01Z INFO Setup Beat: filebeat; Version: 6.1.1
2017-12-27T07:53:01Z INFO Elasticsearch url: http://192.100.0.4:9200
2017-12-27T07:53:01Z INFO Beat name: ip-192-100-0-36
2017-12-27T07:53:01Z INFO filebeat start running.
2017-12-27T07:53:01Z INFO Registry file set to: /var/lib/filebeat/registry
2017-12-27T07:53:01Z INFO Loading registrar data from /var/lib/filebeat/registry
2017-12-27T07:53:01Z INFO Total non-zero values:  beat.info.uptime.ms=3 beat.memstats.gc_next=4473924 beat.memstats.memory_alloc=3081016 beat.memstats.memory_total=3081016 filebeat.harvester.open_files=0 filebeat.harvester.running=0 libbeat.config.module.running=0 libbeat.output.type=elasticsearch libbeat.pipeline.clients=0 libbeat.pipeline.events.active=0 registrar.states.current=0
2017-12-27T07:53:01Z INFO Uptime: 3.375689ms
2017-12-27T07:53:01Z INFO filebeat stopped.
2017-12-27T07:53:01Z CRIT Exiting: Could not start registrar: Error loading state: Error decoding states: json: cannot unmarshal object into Go value of type []file.State

なんかエラー出て、filebeatが起動していない感じですかね。
そのエラーについて調べてみたところ、同じような状態になった方がおり、解決されていました。
https://discuss.elastic.co/t/exiting-could-not-start-registrar-error-loading-state-error-decoding-states-eof/74430

/var/lib/filebeat/registryを削除してから起動すれば良いようですね。
この環境は壊れても失うのは時間だけなので、やってみます。

# rm /var/lib/filebeat/registry
# service filebeat start
# cat /var/log/filebeat/filebeat

2017-12-27T08:14:08Z INFO Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]
2017-12-27T08:14:08Z INFO Beat UUID: 1267efb0-a1af-4f02-9e18-d7120d6bc2bc
2017-12-27T08:14:08Z INFO Metrics logging every 30s
2017-12-27T08:14:08Z INFO Setup Beat: filebeat; Version: 6.1.1
2017-12-27T08:14:08Z INFO Elasticsearch url: http://192.100.0.4:9200
2017-12-27T08:14:08Z INFO Beat name: ip-192-100-0-36
2017-12-27T08:14:08Z INFO filebeat start running.
2017-12-27T08:14:08Z INFO No registry file found under: /var/lib/filebeat/registry. Creating a new registry file.
2017-12-27T08:14:08Z INFO Loading registrar data from /var/lib/filebeat/registry
2017-12-27T08:14:08Z INFO States Loaded from registrar: 0
2017-12-27T08:14:08Z INFO Loading Prospectors: 1
2017-12-27T08:14:08Z WARN DEPRECATED: input_type prospector config is deprecated. Use type instead. Will be removed in version: 6.0.0
2017-12-27T08:14:08Z INFO Starting Registrar
2017-12-27T08:14:08Z INFO Starting prospector of type: log; ID: 5240556406633074861
2017-12-27T08:14:08Z INFO Loading and starting Prospectors completed. Enabled prospectors: 1
2017-12-27T08:14:08Z INFO Harvester started for file: /var/log/secure
2017-12-27T08:14:09Z INFO Connected to Elasticsearch version 6.1.1
2017-12-27T08:14:09Z INFO Loading template for Elasticsearch version: 6.1.1
2017-12-27T08:14:09Z INFO Elasticsearch template with name 'filebeat-6.1.1' loaded

criticalなエラーは解決したようです。

しかしながら新たなエラーが。

2017-12-27T09:24:40Z ERR  Failed to publish events: temporary bulk send failure

そういえば、WARNもありますね。まずこれが気になるのでfilebeat.reference.ymlを見ながら修正してみました。

修正したfilebeat.yml

filebeat.modules:
- module: kafka
  log:
    enabled: true
filebeat.prospectors:
- type: log
  enabled: false
  paths:
    - /var/log/secure.log
output.elasticsearch:
  hosts: ["192.100.0.4:9200"]
setup.template.settings:
setup.kibana:
logging.to_files: true
logging.files:

filebeatを再起動することでWARNは消えました。

# cat /var/log/filebeat/filebeat
2017-12-27T09:49:12Z INFO Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]
2017-12-27T09:49:12Z INFO Metrics logging every 30s
2017-12-27T09:49:12Z INFO Beat UUID: 1267efb0-a1af-4f02-9e18-d7120d6bc2bc
2017-12-27T09:49:12Z INFO Setup Beat: filebeat; Version: 6.1.1
2017-12-27T09:49:12Z INFO Elasticsearch url: http://192.100.0.4:9200
2017-12-27T09:49:12Z INFO Beat name: ip-192-100-0-36
2017-12-27T09:49:12Z INFO Enabled modules/filesets: kafka (log),  ()
2017-12-27T09:49:12Z INFO filebeat start running.
2017-12-27T09:49:12Z INFO Registry file set to: /var/lib/filebeat/registry
2017-12-27T09:49:12Z INFO Loading registrar data from /var/lib/filebeat/registry
2017-12-27T09:49:12Z INFO States Loaded from registrar: 1
2017-12-27T09:49:12Z INFO Loading Prospectors: 2
2017-12-27T09:49:12Z INFO Starting Registrar
2017-12-27T09:49:12Z INFO Starting prospector of type: log; ID: 15188226147135990593
2017-12-27T09:49:12Z INFO Loading and starting Prospectors completed. Enabled prospectors: 1

ERRが出力されるのかしばらく待ちます。

ERRも出なくなってました。しかし肝心のデータがkibanaで表示されない・・・

 おわりに

今回はこれで終わりにしたいと思います。
また来年続きやります・・・

こんなんばっかりや・・

続きを読む

ALBのアクセスログに対してElasticStackのMachine Learningを試してみた

はじめに

Advent Calendar25日目ということでクリスマスですね!:santa:
でも、いつもと変わらないですね!

ということで、
今回は、異常検知してくれる?ElasticStackのX-Packの中のMachine Learningについて試してみましたので書きたいと思います( ゚Д゚)ゞビシッ1

こんな流れで話すよ

  1. ログ取込フロー
  2. 環境について
  3. ALBのログを出力して、LogstashからElasticsearchにストア
  4. Machine Learning使うよ!

ログ取込フロー

AWSでWebサービスを提供している環境で、WebAPサーバの前段にはALBを配置してます。
以下の流れでログを取り込みます。

image.png

  1. ALBログを指定バケットに出力
  2. LogstashがS3からALBのログを収集
  3. LogstashがElasticsearchにストア

環境について

  • ALB
  • Amazon Linux AMI 2017.09.1 (HVM)
  • Logstash 6.0
  • logstash-input-s3
  • Elasticsearch 6.0
  • Kibana 6.0
  • X-Pack 6.0

ElasticStackは、事前にインストールしていることを前提にしてます。
インストールされていない環境は、以下を参考にして頂ければと思います。

ALBのログを出力して、LogstashからElasticsearchにストア

ALBのロギングを有効化については以下を参考にしてください。

ここからは、S3に格納されているALBログをLogstashで取得するための方法について書きます。

S3 input pluginをインストール

S3からログを取得するには、Logstashのプラグインである”S3 input plugin”をインストールする必要があります。

インストール方法を以下に書きます。

$ cd /usr/share/logstash
$ bin/logstash-plugin install logstash-input-s3

ALBのログを取り込むための準備

ALBのログは、以下の形式で出力されます。

### Log Format
type timestamp elb client:port target:port request_processing_time target_processing_time response_processing_time elb_status_code target_status_code received_bytes sent_bytes "request" "user_agent" ssl_cipher ssl_protocol target_group_arn trace_id

### Sample Log
http 2016-08-10T22:08:42.945958Z app/my-loadbalancer/50dc6c495c0c9188 
192.168.131.39:2817 10.0.0.1:80 0.000 0.001 0.000 200 200 34 366 
"GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - 
arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067
"Root=1-58337262-36d228ad5d99923122bbe354"

この出力されたログを取り込むために、Grok Patternを作成したり、型変換をLogstashで施す必要があります。

フィールド定義

それでは、ログフォーマットからフィールド名と型の定義を行います。
ALBが受け付けてからクライアントに返すまでの処理時間を表すフィールドをfloatにします。
対象は、”request_processing_time”、”target_processing_time”、”response_processing_time”です。

Log Field Type
type type string
timestamp date date
elb elb string
client:port client_port string
target:port target_port string
request_processing_time request_processing_time float
target_processing_time target_processing_time float
response_processing_time response_processing_time float
elb_status_code elb_status_code string
target_status_code target_status_code string
received_bytes received_bytes long
sent_bytes sent_bytes long
request ELB_REQUEST_LINE string
user_agent user_agent string
ssl_cipher ssl_cipher string
ssl_protocol ssl_protocol string
target_group_arn target_group_arn string
trace_id trace_id string

Grok Pattern

ALBのログに正規表現をかけて、構造化するため、Grokする必要があります。
Grokを直接Logstashのconfファイルに書くことも可能ですが、可読性を重視するため、パターンファイルとして外出しします。

以下が、ALBのログをよしなに取り込むためのGrok Patternファイルです。

alb_patterns
### Application Load Balancing
ALB_ACCESS_LOG %{NOTSPACE:type} %{TIMESTAMP_ISO8601:date} %{NOTSPACE:elb} (?:%{IP:client_ip}:%{INT:client_port}|-) (?:%{IP:backend_ip}:%{INT:backend_port}|-) (:?%{NUMBER:request_processing_time}|-1) (?:%{NUMBER:target_processing_time}|-1) (?:%{NUMBER:response_processing_time}|-1) (?:%{INT:elb_status_code}|-) (?:%{INT:backend_status_code}|-) %{INT:received_bytes} %{INT:sent_bytes} \"%{ELB_REQUEST_LINE}\" \"(?:%{DATA:user_agent}|-)\" (?:%{NOTSPACE:ssl_cipher}|-) (?:%{NOTSPACE:ssl_protocol}|-) %{NOTSPACE:target_group_arn} \"%{NOTSPACE:trace_id}\"

Multiple Pipelines

Logstashのconfファイルを呼び出すためにMultiple Pipelinesを使用します。
Pipeline.ymlの定義は、以下です。

pipelines.yml
- pipeline.id: alb
  pipeline.batch.size: 125
  path.config: "/etc/logstash/conf.d/alb.cfg"
  pipeline.workers: 1

Pipelineの設定については、以下を参考にして頂ければと思います。

Logstashのディレクトリ構成

ここまで準備しましたが、以下の様に各ファイルを配置します。

/etc/logstash
  ├ logstash.yml
  ├ conf.d
  │ └ alb.cfg
  ├ jvm.options
  ├ log4j2.properties
  ├ alb_patterns
  └ pipelines.yml
  • ALBのalb.cfgをconf.d配下に配置
  • ALBのGrok PatternファイルをLogstash配下に配置
  • alb.cfgを呼び出すpipeline.ymlをLogstash配下に配置

ALBのconfファイルを作成するよ

  • Input: S3からALBのログを取得する2

  • Filter: 取得したログに対してフィルタをかける

  • Output: Elasticsearchにストア

alb.cfg
input {
  s3 {
    region => "ap-northeast-1"
    bucket => "hoge"
    prefix => "hoge"
    ### S3へのポーリング間隔を指定
    interval => "60"
    sincedb_path => "/var/lib/logstash/sincedb_alb"
  }
}
filter {
  ### 読み込むGrok Patternファイルを"patterns_dir"で指定
  grok {
    patterns_dir => ["/etc/logstash/alb_patterns"]
    match => { "message" => "%{ALB_ACCESS_LOG}" }
  }
  date {
    match => [ "date", "ISO8601" ]
    timezone => "Asia/Tokyo"
    target => "@timestamp"
  }
  ### グローバルIPから国などをマッピングするために指定
  geoip {
    source => "client_ip"
  }
  ### デフォルトの型がstringのため、フィールド定義で定義した型に変換
  mutate {
    convert => [ "request_processing_time", "float" ]
    convert => [ "response_processing_time", "float" ]
    convert => [ "target_processing_time", "float" ]
    convert => [ "received_bytes", "int" ]
    convert => [ "sent_bytes", "int" ]
    ### 不要なフィールドを削除
    remove_field => [ "date", 'message' ]
  }
}
output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
  }
}

ここまで整ったらLogstashを起動

$ initctl start logstash
logstash start/running, process 3121

Machine Learning使うよ!

KibanaでアクセスしてMachine Learingをクリックします。
新しくジョブを作成するので、”Create new job”をクリックします。

以下の画面で”Single metric”をクリックします。

image.png

“Aggregation”と”Filed”を指定します。
Filedは、”target_processing_time”にすることで、バックエンドのサーバへリクエストしてから返ってくるまでの時間をみます。
そうすることで、サーバ内の処理遅延が発生しているかを確認できます。
画面の右上から期間を指定します。

image.png

“Name”と”Description”を入力し、”Create job”をクリックします。

image.png

ジョブが実行され、結果を表示するため”View Results”をクリックします。

image.png

指定期間の中で普段より乖離している部分を赤やオレンジでマークされます。

image.png

今回は、Single metricでやりましたが、Multi metricを使うことで、
グローバルIPアドレスやUserAgentと合わせて検出することも可能です。

さいごに

いかがでしたか?ログを取り込んでジョブを作成するだけで簡単に実行できました。
今までは、人の目で判断していたところをMachine Learningを用いることで、普段の傾向から乖離している部分を自動で検出してくれるのは便利ですね。

Advent Calendar最終日ですが、これにておしまいです。
ありがとうございました!


  1. Machine Learningは、X-Packという有償プラグインですが、期間限定で試すことが可能です 

  2. S3バケットへのアクセスするためのIAM Roleが適用されていることを前提とする 

続きを読む

CloudWatchLogsAPIとfluent-plugin-s3を使ったLambdaログの保管と分析、可視化について

はじめに

これは、Sansan Advent Calendar 2017の24日目の記事です。

  • CloudWatchLogsはログを時系列で絞込検索がしにくいとか、見にくいし使いにくいツールである
  • 本番でLambdaの関数がxx 個以上動いており、障害発生時の調査にはCloudWatchLogsを頑張って使うしかない
  • apexでログを見る方法もないわけではないが、ずっとtailするのは難しい
  • CloudWatchLogsにずっと置いとくのもアレ
  • CloudWatchLogs早くいい感じになって欲しい

ということですでにfluentdなどで収集しているアプリケーションログやアクセスログと同様にS3に保管し、Elasticsearchに乗せてKibanaで検索、分析できるようにしたかったのです。CloudWatchLogsAPIとfluent-plugin-s3を使ってLambdaログの保管と分析、可視化をできるようにしました。

あきらめた案

fluent-plugin-cloudwatch-logsで収集しようともしていました。しかし、新たに生み出されるLogStreamの収集ができず、理由もよくわからず断念した。fluentdをpryで止めて結構デバッグしたものの、fluentd力もっとほしい…!となりました。
Lambdaのログは、CloudWatchLogsのLogGroup以下にLogStreamとしてつくられていくが、LogStreamに[$LATEST]というLambdaのバージョンプレフィックス的なやつが入るため、fluentdのbuffer_path設定と相性が悪かったです。(この仕様マジでやめてほしい。括弧とかドル記号とか入らないでほしい。)
また、50個以上のLogStreamsがある際に、このpluginではログを扱うArrayが予期せぬ入れ子構造になってしまっていたため修正しました。
どちらもマージされてはいるが、力不足によりこのプラグインでは収集を完遂できませんでした。

https://github.com/ryotarai/fluent-plugin-cloudwatch-logs/pull/80
https://github.com/ryotarai/fluent-plugin-cloudwatch-logs/pull/84

詳細な説明

ピタゴラ装置

Lambda

まずは、CloudWatchLogsのAPIを叩くためのLambdaをつくりました。Python3.6で実装しました。
このLambdaは、CloudWatchLogs.Client.create_export_taskを叩いています。
http://boto3.readthedocs.io/en/latest/reference/services/logs.html#CloudWatchLogs.Client.create_export_task

しかし、エクスポート対象にする期間の指定はコード上に置いておきたくありませんでした。(create_export_taskのfrom, toの指定)
そのため、前回実行時のタイムスタンプをS3上に置くようにし、これが無ければ現在からn時間分エクスポートし、あればfromにセットするという実装にしました。こうすることで、実行タイミングはCloudWatch EventsのRuleのみで与えられるようになります。

Lambdaの周辺

fluent-plugin-s3では、S3からログを取り込むことができます。S3イベント通知でSQSに流しています。あとはわりと普通です。

cloudwatchlogs-import-after.png

ちなみに

アレがこれであーなので、実はあと一歩のところで本番投入できていません。年内にはやっておきたいです。
現在の職場での特異的な話が絡むfluent-s3-plugin関連の設定についてなので、根底から覆るような話ではありません。もし同じような構成を考えている人がいたら安心してほしいです。

最後に

CloudWatchLogsが使いやすくなることや、Lambdaのログが自動的にS3へエクスポートし続ける設定がほしいです。LambdaのためにLambdaを作ることが減るといいなぁと思いますので、サンタさん(AWS)何卒よろしくお願いします。

続きを読む

Amazon Elasticsearch ServiceのKibanaへ、SSH port forwardingでトンネルを掘り、踏み台サーバ経由でアクセスする

TL;DR

たまにSSH port forwardingのやり方を聞かれるのでメモがてら。

3+1行で説明するとこんな感じ。
SSH port forwardingでトンネルを掘る

ssh -i ~/Path_to_SSH_key/id_xxxx {account_name}@{踏み台サーバIP or DNS name} -L 8888:{Kibana private IP}:80 -N

ブラウザで(トンネルを通ってKibanaへアクセスする

http://localhost:8888/_plugin/kibana

Kibanaへのアクセス

今のところKibanaへアクセスするニーズがあるのは開発者のみ。
開発者は踏み台サーバにアカウントを持っている。

というわけで ssh port forwardingでアクセスできれば良しとしているケースです。

手順

1. KibanaのPrivate IPを確認

※東京リージョンの場合
https://ap-northeast-1.console.aws.amazon.com/es/home
My domains > “Kibanaにアクセスしたいドメイン” > Overview

Screen Shot 2017-12-20 at 17.50.05.png

赤枠部分のドメインをメモし、IPを確認する

$ host {赤枠部分のドメイン}.es.amazonaws.com
{赤枠部分のドメイン}.es.amazonaws.com has address 172.xx.xx.xx
{赤枠部分のドメイン}.es.amazonaws.com has address 172.xx.xx.xx

2. SSH port forwardingでトンネルを掘る

ssh -i ~/Path_to_SSH_key/id_xxxx {account_name}@{踏み台サーバIP or DNS name} -L 8888:{Kibana private IP:Port}:80 -N


e.g)
ssh -i ~/.ssh/id_rsa nntsugu@ec2-xx-xxx-xxx-xxx.ap-northeast-1.compute.amazonaws.com -L 8888:xxx.xxx.xxx.xxx:80 -N

3. ブラウザで(トンネルを通ってKibanaへアクセスする

http://localhost:8888/_plugin/kibana

お手軽に検索

Kibana > Dev Tools > Console
からだとJSONでクエリを投げられるのでとっつきやすい。かもしれない。

GET _search
{
  "query": {
    "match": {
      "severity": "ERROR"
    }
  }
}

続きを読む

Meraki WiFi 位置情報の活用シーン

はじめに

以下2つの記事は Meraki Scanning API に関して、設定手順を中心にまとめています:

本記事はこれらの補足として、もう少しイメージが付くよう活用シーン付きで基本的な事柄も交えながら説明を試みます。

活用シーン

Meraki WiFi の活用シーンを小売店と想定します。Meraki WiFi はもともと次のような用途に向いています。

  • 店舗で WiFi を使えるようにしたい
  • 用途は業務用・お客様用の両方に使いたい
  • ビデオ視聴も可能な高速アクセスが必要
  • 店舗数は一店~数万店
  • 専任管理者がいないのでできるだけ簡単に運用したい
  • WiFi 位置情報を活用したい

Meraki WiFi の説明

そもそも Meraki WiFi とは?

Meraki WiFi はクラウド管理型 WiFi サービスです。

一般に、クラウドとは皆さんご存知の通り、データの保存や処理を手元の端末で行うのではなくインターネットを通じて「向こう側」に保存・処理する手法を指します。ところが、中には「向こう側」では不都合が出るサービスもあります。例えばリアルタイム処理が必要で遅延が問題となる場合には「こちら側」で処理できるよう機器を置くといいかもしれません。

WiFi も「向こう側」で全てを行うことはできません。WiFi 電波は仕様上 30メートルほどしか飛びませんので例えば東京タワーから送信しても「こちら側」の端末まで届きません。そこで Meraki アクセスポイント(AP) を「こちら側=店舗」に置くことになります。

AP がこちらにある以外は通常のクラウドサービスと同じ使い勝手で設定や監視ができるため、従来の WiFi AP を買ってきて個々に設定・設置する方法に比べると、特に多店舗では工数が圧倒的に少なくなるメリットがあります。

写真1:Meraki AP

image.png

クラウドへは Web ブラウザ経由でアクセスします。仮に複雑な機能を使ってもクラウド上に設定が残りますので、故障などで AP を交換した場合に一から再設定する必要はありません。また、数万 AP に対して一斉に設定変更を行うこともできます。

イメージ1:Meraki ダッシュボード(日本語対応済)
image.png

WiFi 端末の位置情報について(Meraki ダッシュボード)

スマホ等の WiFi 端末は使っていなくても(WiFi がオンになっていれば)プローブリクエストを定期的に送信します。送信間隔は機器の種類や状態によって異なります。

スマホの状態(iOS、アンドロイド等) プローブリクエストの間隔
スリープ(スクリーンオフ) 1分に1回程度
スリープ(スクリーンオン) 10-15回/分
APにアソシエート アプリケーションに大きく依存

WiFi 端末の送信するプローブリクエストを Meraki AP が受信すると、その時刻を SeenTime として記録します。また、プローブリクエストから端末の位置を算出する機能を持ちます。

WiFi 端末の識別には MACアドレスが使われますが、プライバシーの問題もあるため Meraki クラウドは一切の MAC アドレス情報を保存せず、そのハッシュ値のみを保存しています。

これらデータを利用して生成された端末位置情報ヒートマップを Meraki ダッシュボード(標準の管理インターフェイス)で見ることができます。

イメージ2:端末位置情報ヒートマップ(青はアソシエートした端末、灰色はアソシエートしていない端末、風船アイコンはAP)
image.png

また、データ解析レポートも Meraki ダッシュボードで見ることができます。

イメージ3:データ解析レポート
image.png

グラフは上から順に以下の意味を持っています:

  • 一つ目:近くを通っただけの人(Passersby)、訪問した人(Visitors)、接続して使った人(Connected)の割合
  • 二つ目:滞在時間毎の割合
  • 三つ目:訪問頻度の割合

データ解析レポートでは次のような比較が可能です。マーケティング的な分析に使えそうです。

  • 一店舗内のデータを異なる期間で比較(今週と先週)
  • 複数店舗間のデータを同一期間で比較(A店とB店、A店と全店の平均、A店とA~D店の平均、A~D店の平均と全店の平均)

以上は標準の Meraki ダッシュボードで利用可能です。

Scanning API の活用方法

端末位置情報を保持するには?

Meraki クラウドは上述のレポート提供を目的として WiFi 端末の位置情報を利用していますが、「個々の端末の時系列の位置情報」を保存している訳ではありません。保存したいと思った場合には Meraki クラウドから Scanning API を経由して取得可能です。ただし、保存用サーバを準備する必要があります。

サーバの準備が面倒であれば、例えば AWS の API Gateway/Lambda/DynamoDB を組み合わせて使うことでサーバレスにて比較的簡単に実現できます。手順はこちらの記事をご参照下さい(データ量は多くなり得るのでご注意下さい)。

DynamoDB は NoSQL データベースで可視化はあまり得意ではありません。可視化には同じく AWS の Elasticsearch service/kibana が利用できます。手順はこちらの記事をご参照ください。

端末位置情報から来店の有無を確認する

多くの小売店ではポイントカードを導入しており、お客様毎の購買履歴をデータとして蓄積しているかもしれません。ポイントカードは基本的に購入時のみ使うので、お客様が何も購入せずに店舗を去った場合には履歴が残りません。

それに対し、WiFi 位置情報はお客様が来店すれば(WiFi がオンになっている端末を持っている前提)データが残るため、これまでに扱えなかったデータの分析ができるようになる可能性があります。例えば、来店に対する購入頻度や、商品入替え・欠品による機会損失のより正確な額が求められるかもしれません。

イメージ4:既存お客様購買履歴とWiFi位置情報をマッピングした例。Meraki が提供する機能ではなくあくまでアイデアです。
image.png

※ イメージ4を実現するには、MACアドレスと個人情報の紐づけを何らかの方法で行う必要もあります

もっとも、お客様にとって来店記録が残ること自体気持ち良いものではなく、個人情報保護法を遵守しながらプライバシーを慎重に扱う必要があります(オプトイン・オプトアウト、保存データのハッシュ化等)。お客様にとって不利益が無く、新しい顧客体験を提供できることが何より重要かもしれません。技術の領域を越えますのでこの議論はここまでにしておきたいと思います。

最後に、MAC アドレス毎の SeenTime (AP が端末のプローブリクエストを受信した時刻)を可視化したグラフを作ってみました。Meraki の位置情報にご興味持たれた方は、データ活用に使えそうか眺めて頂けると幸いです。

DynamoDB 上のデータを CSV でファイルへ出力し、matplotlib で読み込んで描画しました。

イメージ5:MACアドレス毎の SeenTime (MACアドレスは左側をちょん切ってます)
image.png

イメージ5のコード
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pylab
from datetime import datetime as dt

# グラフ表示する MACアドレスの数を指定
cmac_num4graph = 50

# DynamoDB から取得した CSV ファイルの読込み
df = pd.read_csv("cmxdata.csv", parse_dates = ['seenString (S)'])

# MACアドレスの一覧作成
cmac_index = set(df['clientMac (S)'])

# MACアドレスの一覧作成(グラフ表示用)
cmac_index = list(cmac_index)[:cmac_num4graph]

# CSV ファイルから 'clientMac (S)' と 'seenString (S)' のみ抜き出したデータフレーム StringMac を作成
StringMac = pd.concat([df['clientMac (S)'], df['seenString (S)']], axis = 1)

# グラフ描画用のデータフレーム result_df を作成
result_df = pd.DataFrame()
for mac in cmac_index:
    seenString = StringMac[StringMac['clientMac (S)'] == mac]['seenString (S)']
    seenString = pd.Series(seenString, name = mac)
    df_seenString = seenString.to_frame()
    result_df = pd.concat([result_df, seenString], axis = 1)

# グラフを描画
pylab.figure(figsize=(15, 8))
xmin = dt(2017,12,17,7,22,0)
xmax = dt(2017,12,17,7,33,0)
for (y, mac) in enumerate(cmac_index):
    m = result_df[mac].dropna()
    y = np.linspace(y, y, len(m))
    result_df[mac].dropna()
    plt.scatter(m.tolist(), y)

plt.grid(True)
plt.xlim(xmin,xmax)
plt.yticks(range(0, cmac_num4graph, 1), cmac_index, fontsize = 8)
plt.show()

続きを読む