新卒エンジニアとして半年ちょっとで得た気づき

新卒でエンジニアとしてデータ基盤関連の部署に入って、この約半年とちょっとで学んだことをまとめたいと思います。そして振り返って特に大事だと思ったことについて書いていこうと思います。

自己紹介

 まず自分の紹介からなのですがもともと物理系の院に通っており、fortranをひたすら書いていた状態からエンジニアとして新卒で入社しました。入社半年前にやばいと思いpythonなどを軽く触っていましたが、ほぼ経験なしの状態でしたがOJTでいきなり現場に入りました。自分と似た境遇の人がどのくらいいるかわかりませんがそのような人々に対して何か参考になるところがあれば幸いです!
 部署はデータ基盤系の部署と兼務でデータを使った何でも屋の部署に配属されました。何でも屋の方ではまだあまり活動をできていなかったので主にデータ基盤系の部署の話が中心になります。

やってきたこと

 自分の整理も兼ねてやってきたことを箇条書きにしてますが、基本的に読み飛ばしちゃって大丈夫です!
個人的に大きな気づきがあったところは赤字にしています。

時期 内容 学んだ知識
4月~5月 ・データ基盤理解のために運用の仕事を中心に行う SQL
アカウント発行・データ連携対応・障害対応 Python・AWS
上司の趣味によりruby使いになる(railsチュートリアル制覇) Ruby
6月~8月 郵便番号をデータ基盤に連携する(初のデータ開発的な動き?)
社内用botのエンハンスで機能追加や内部処理のapi化(Jenkins) Chatops・Jenkins・api
定常モニタリング用にredashをterraformで作成し始める terraform・docker・ECS・redash
アカウント作成webアプリをフルスクラッチで書き始める(rails) rails
9月~10月 jenkinsとGHEの連携を使って自分の開発周りを自動化する
データ連携作業から延長してモニタリングレポートの作成(Tableau) Tableau
slack botのエンハンス(snippetを使ったり)
社内のデータ基盤ユーザーに利用者アンケートを取り始める
アカウント作成webアプリが完成と同時にDBの構造、命名規則等の指摘を受けまくる ドメイン駆動(触りのみ)・要件定義
11月~12月 社内向けデータ基盤勉強会を開催する
S3でwebアプリを簡単に実装(サーバーレス) js
slack botの裏側をjenkinsからapi gatewayに(サーバーレス) api-gateway・lambda・AWS batch

半年ちょっとエンジニアをして思ったこと

 感じたことはたった一つだけでいかに自分の開発スピードを加速できるかに全てがかかっているということです。先輩の受け売りになりますが自分の能力yをy=ax+bで表すとしたらaをあげることが重要ってことを実感しました。そして自分の開発スピードを加速させるために意識したことが以下の三つです。

  1. できるエンジニアの近くで作業する(運と行動)
  2. 開発で自動化できるところは早い段階で済ませる(Jenkins)
  3. 変更を前提で開発を行う(terraform、docker、サーバーレス)

 一つ目は説明はいらないですね笑。2つ目は特に強調したいところで、自動化が上手くなるにつれてどんどん開発スピードが加速します。現在gitにコードをあげればその後は全てJenkinsがやってくれる状態にしてています。3つ目に関してはterraformとdocker、サーバーレスを例に挙げていますが、普段のコードの書き方でも気をつけた方が絶対いいです。(DBの構造、命名規則等の指摘を受けまくるの時に生き残れたのは少しだけこれを意識した設計になっていたからだと思います)

最後に

 三つを気づきとして挙げたのですが成長率をあげる方法という意味では無限に手段があると思っています。今後もエンジニア方面に限らず成長率をいかにあげるかを常に考えていきたいです。

続きを読む

Kubernetes上のアプリケーションログを自動収集する

image.png

TL;DR;

新サービスや既存サービスをKubernetesに移行するたびに、ログの収集設定のためインフラエンジニア待ちになってしまうのは面倒ですよね。
そこで、アプリのログをFluentdとDatadog LogsやStackdriver Loggingで自動的に収集する方法を紹介します。

主に以下のOSSを利用します。

今回はDatadog Logsを使いますが、Stackdriver Loggingを使う場合でもUIやAPIクレデンシャル等の設定以外は同じです。

お急ぎの方へ: アプリ側の設定手順

標準出力・標準エラーログを出力するだけでOKです。

参考: The Twelve-Factor App (日本語訳)

詳しくは、この記事の「サンプルアプリからログを出力する」以降を読んでください。

あとはクラスタ側に用意しておいたFluentdの仕事ですが、Kubernetesがノード上に特定のフォーマットで保存するため、アプリ毎の特別な設定は不要です。

まえおき1: なぜDatadogやStackdriver Loggingなのか

分散ロギングのインフラを準備・運用するのがつらい

分散ロギングと一口にいっても、実現したいことは様々です。例えば、多数のサービス、サーバ、プロセス、コンテナから出力されたログを

  1. 分析などの用途で使いやすいようにETLしてRedshiftのようなデータウェアハウスに投入しておきたい
  2. S3などのオブジェクトストレージに低コストでアーカイブしたい
  3. ほぼリアルタイムでストリーミングしたり、絞込検索したい
    • Web UI、CLIなど

1.はtd-agent + TreasureData or BigQuery、2.はfluentd (+ Kinesis Streams) + S3、3.はfilebeat or Logstash + Elasticsearch + Kibana、Graylog2、専用のSaaSなど、ざっとあげられるだけでも多数の選択肢があります。

方法はともかく、できるだけ運用保守の手間を省いて、コアな開発に集中したいですよね。

メトリクス、トレース、ログを一つのサービスで一元管理したい・運用工数を節約したい

「Kubernetesにデプロイしたアプリケーションのメトリクスを自動収集する – Qiita」でも書きましたが、例えばKubernetesの分散ロギング、分散トレーシング、モニタリングをOSSで実現すると以下のような構成が定番だと思います。

  • 基本的なグラフ作成とメトリクス収集、アラート設定はPrometheus
  • 分散ログはEKF(Elasticsearch + Kibana Fluentd)
  • 分散トレースはZipkinやJaeger

もちろん、ソフトウェアライセンス費用・サポート費用、将来の拡張性などの意味では良い判断だと思います。

一方で、

  • アラートを受けたときに、その原因調査のために3つもサービスを行ったり来たりするのは面倒
  • 人が少ない場合に、セルフホストしてるサービスの運用保守に手間をかけたくない
    • アカウント管理を個別にやるだけでも面倒・・最低限、SSO対応してる?

などの理由で

  • 個別のシステムではなく3つの役割を兼ねられる単一のシステム

がほしいと思うことがあると思います。

fluentd + Datadog Logs/Stackdriver Logging

StackdriverとDatadogはSaaSで、かつ(それぞれサブサービスで、サブサービス間連携の度合いはそれぞれではありますが、)3つの役割を兼ねられます。

SaaSへログを転送する目的でfluentdを利用しますが、Kubernetesのログを収集するエージェントとしてfluentdがよく使われている関係で、Kubernetes界隈でよく使われるfluentdプラグインに関しては、よくある「メンテされていない、forkしないと動かない」という問題に遭遇しづらいというのも利点です。

まえおき2: なぜDatadogなのか

もともとStackdriver Loggingを利用していたのですが、以下の理由で乗り換えたので、この記事ではDatadog Logsの例を紹介することにします。

  • メトリクスやAPMで既にDatadogを採用していた
  • UI面で使いやすさを感じた

UI面に関して今のところ感じている使いやすさは以下の2点です。

  • ログメッセージの検索ボックスでメタデータの補完が可能

    • hostで絞込をしようとすると、hostの値が補完される
    • あとで説明します
  • 柔軟なFaceting
    • Datadog LogsもStackdriver Loggingもログにメタデータを付与できるが、Datadog Logsは任意のメタデータキーで絞り込むためのショートカットを簡単に追加できる
    • あとで説明します

Stackdriver Loggingを利用する場合でも、この記事で紹介する手順はほぼ同じです。Kubernetesの分散ロギングをSaaSで実現したい場合は、せっかくなので両方試してみることをおすすめします。

Fluentdのセットアップ手順

DatadogのAPIキー取得

Datadog > Integratinos > APIsの「New API Key」から作成できます。

image.png

以下では、ここで取得したAPIキーをDD_API_KEYという環境変数に入れた前提で説明を続けます。

fluentdのインストール

今回はkube-fluentdを使います。

$ git clone git@github.com:mumoshu/kube-fluentd.git
$ cd kube-fluentd

# 取得したAPIキーをsecretに入れる
$ kubectl create secret generic datadog --from-literal=api-key=$DD_API_KEY

# FluentdにK8Sへのアクセス権を与えるためのRBAC関連のリソース(RoleやBinding)を作成
$ kubectl create -f fluentd.rbac.yaml

# 上記で作成したsecretとRBAC関連リソースを利用するfluentd daemonsetの作成
$ kubectl create -f fluentd.datadog.daemonset.yaml

設定内容の説明

今回デプロイするfluentdのmanifestを上から順に読んでみましょう。

kind: DaemonSet

Kubernetes上のアプリケーションログ(=Podの標準出力・標準エラー)は各ノードの/var/log/containers以下(より正確には、そこからsymlinkされているファイル)に出力されます。それをfluentdで集約しようとすると、必然的に各ノードにいるfluentdがそのディレクトリ以下のログファイルをtailする構成になります。fluentdに限らず、何らかのコンテナをデプロイしたいとき、KubernetesではPodをつくります。Podを各ノードに一つずつPodをスケジュールするためにはDaemonSetを使います。

  updateStrategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1

DaemonSetをアップデートするとき(例えばDockerイメージを最新版にするためにタグの指定を変える)、Podを1つずつローリングアップデートします。アップデートによってfluentdが動かなくなった場合の影響を抑えることが目的ですが、気にしない場合はこの設定は記述は不要です。

serviceAccountName: fluentd-cloud-logging

kubectl -f fluent.rbac.yamlで作成されたサービスアカウントを利用する設定です。これがないとデフォルトのサービスアカウントが使われてしまいますが、ほとんどのツールでつくられたKubernetesクラスタではデフォルトのサービスアカウントに与えられる権限が絞られているので、デフォルトのサービスアカウントではkube-fluentdが動作しない可能性があります。

      tolerations:
      - operator: Exists
        effect: NoSchedule
      - operator: Exists
        effect: NoExecute
      - operator: Exists

KubernetesのMasterノードや、その他taintが付与された特定のワークロード専用のWorkerノード含めて、すべてのノードにfluentd podをスケジュールための記述です。何らかの理由でfluentdを動作させたくないノードがある場合は、tolerationをもう少し絞り込む必要があります。

env:
        - name: DD_API_KEY
          valueFrom:
            secretKeyRef:
              name: datadog
              key: api-key
        - name: DD_TAGS
          value: |
            ["env:test", "kube_cluster:k8s1"]

一つめは、secretに保存したDatadog APIキーを環境変数DD_API_KEYにセットする、二つめはfluentdが収集したすべてのログに二つのタグをつける、という設定です。タグはDatadogの他のサブサービスでもよく見られる形式で、”key:value”形式になっています。

Datadogタグのenvは、Datadogで環境名を表すために慣習的に利用されています。もしDatadogでメトリクスやトレースを既に収集していて、それにenvタグをつけているのであれば、それと同じような環境名をログにも付与するとよいでしょう。

kube_clusterは個人的におすすめしたいタグです。Kubernetesクラスタは複数同時に運用する可能性があります。このタグがあると、メトリクスやトレース、ログをクラスタ毎に絞り込むことができ、何か障害が発生したときにその原因が特定のクラスタだけで起きているのかどうか切り分ける、などの用途で役立ちます。

        ports:
        - containerPort: 24231
          name: prometheus-metrics

「Kubernetesにデプロイしたアプリケーションのメトリクスを自動収集する」で紹介した方法でdd-agentにfluentdのPrometheusメトリクスをスクレイプさせるために必要なポートです。

サンプルアプリからログを出力する

適当なPodを作成して、testmessage1というメッセージを出力します。

$ kubectl run -it --image ruby:2.4.2-slim-stretch distlogtest-$(date +%s) -- ruby -e 'puts %q| mtestmessage1|; sleep 60'

ログの確認

何度か同じコマンドを実行したうえで、DatadogのLog Explorerでtestmessage1を検索してみると、以下のようにログエントリがヒットします。

image.png

ログエントリを一つクリックして詳細を開いてみると、testmessage1というログメッセージの他に、それに付随する様々なメタデータが確認できます。

image.png

ログエントリに自動付与されたメタデータの確認

  • HOST: ログを出力したPodがスケジュールされているホスト名(=EC2インスタンスのインスタンスID)
  • SOURCE: コンテナ名
  • TAGS: Datadogタグ
    • pod_name: Pod名
    • kube_replicaset: ReplicaSet名
    • container_name: Dockerコンテナ名
    • kube_namespace: PodがスケジュールされているNamespace名
    • host: PodがスケジュールされているKubernetesノードのEC2インスタンスID
    • zone: Availability Zone
    • aws_account_id: AWSアカウントID
    • env: 環境名

Log Explorerを使うと、すべてのAWSアカウントのすべてのKubernetesクラスタ上のすべてのPodからのログが一つのタイムラインで見られます。それを上記のようなメタデータを使って絞り込むことができます。

ログエントリの絞り込み

ログエントリの詳細から特定のタグを選択すると、「Filter by」というメニュー項目が見つかります。

image.png

これを選択すると、検索ボックスに選択したタグがkey:value形式で入力された状態になり、そのタグが付与されたログエントリだけが絞り込まれます。

もちろん、検索ボックスに直接フリーワードを入力したり、key:value形式でタグを入力してもOKです。

Facetingを試す

定型的な絞り込み条件がある場合は、Facetを作成すると便利です。

ログエントリの詳細から特定のタグを選択すると、「Create new facet」というメニュー項目が見つかります。

image.png

これを選択すると、以下のようにどのような階層のどのような名前のFacetにするかを入力できます。

image.png

例えば、

  • Path: kube_namespace
  • Name: Namespace
  • Group: Kubernetes

のようなFacetを作成すると、ログエントリに付与されたkube_namespaceというタグキーとペアになったことがある値を集約して、検索条件のショートカットをつくってくれます。実際のNamespace Facetは以下のように見えます。

image.png

kube-system、mumoshu、istio-system、defaultなどが表示されていますが、それぞれkube_namespaceというタグキーとペアになったことがある値(=クラスタに実在するNamespace名)です。また、その右の数値はそのNamespaceから転送されたログエントリの件数です。この状態で例えばistio-systemを選択すると、kube_namespace:istio-systemというタグが付与されたログエントリだけを絞り込んでみることができます。

image.png

アーカイブ、ETLパイプラインへの転送など

kube-fluentdにはアーカイブやETLパイプラインのサポートは今のところないので、必要に応じてはfluentd.confテンプレート変更して、それを含むDockerイメージをビルドしなおす必要があります。

fluentd.confテンプレートは以下の場所にあります。

https://github.com/mumoshu/kube-fluentd/blob/master/rootfs/etc/confd/templates/fluent.conf.tmpl

fluentd.confテンプレートから参照できる環境変数を追加したい場合は、以下のconfd設定ファイルを変更します。

https://github.com/mumoshu/kube-fluentd/blob/master/rootfs/etc/confd/conf.d/fluent.conf.toml

// 今後、configmap内に保存したfluent.confの断片をfluentdの@includeを使ってマージしてくれるような機能を追加してもよいかもしれませんね。

まとめ

FluentdとDatadog Logsを使って、Kubernetes上のアプリケーションログを自動的に収集し、Datadog LogsのWeb UIからドリルダウンできるようにしました。

アプリ側はTwelve-Factor Appに則って標準出力・標準エラーにログを出力するだけでよい、という簡単さです。ドリルダウンしたり、そのためのFacetを作成するときも、グラフィカルな操作で完結できます。

また、ログの収集をするためだけにいちいちインフラエンジニアが呼び出されることもなくなって、楽になりますね!

Kubernetes上のアプリケーションの分散ロギングを自動化したい方は、ぜひ試してみてください。

(おまけ) 課題: ログメッセージに含まれるメタデータの抽出

Stackdriver Loggingではできて、Datadog Logsでは今のところできないことに、ログメッセージに含まれるメタデータの抽出があります。

例えば、Stackdriver Loggingの場合、

  • ログにメタデータを付与して検索対象としたい

    • 例えば「ログレベルDEBUGでHello World」のようなログを集約して、Web UIなどから「DEBUGレベルのログだけを絞り込みたい」

というような場合、アプリからは1行1 jsonオブジェクト形式で標準出力に流しておいて、fluent-plugin-google-cloud outputプラグイン(kube-fluentd内で利用しているプラグイン)でStackdriver Loggingに送ると、jsonオブジェクトをパースして、検索可能にしてくれます。

例えば、

{"message":"Hello World", "log_level":"info"}

のようなログをStackdriver Loggingにおくると、log_levelで検索可能になる、ということです。

このユースケースに対応する必要がある場合は、いまのところDatadog LogsではなくStackdriver Loggingを採用するとよいと思います。

今後の展望

同じくkube-fluentdでDatadog Logsへログを転送するために利用しているfluent-plugin-datadog-logに、fluent-plugin-google-cloudと同様にJSON形式のログをパースしてDatadogのタグに変換する機能を追加することはできるかもしれません。

また、Datadog Logsには、ログエントリのメッセージ部分に特定のミドルウェアの標準的な形式のログ(例えばnginxのアクセスログ)が含まれる場合に、それをよしなにパースしてくれる機能があります。その場合にログエントリに付与されるメタデータは、タグではなくアトリビュートというものになります。アトリビュートはタグ同様に検索条件に利用することができます。

ただ、いまのところfluent-plugin-datadog-logからの出力はすべてsyslog扱いになってしまっており、ログの内容によらず以下のようなアトリビュートが付与されてしまっています。

image.png

JSONをパースした結果がこのアトリビュートに反映されるような実装が可能であれば、それが最適なように思えます。

続きを読む

動画を探して自動ツイートしてくれるPython製botをAWSに載せてみた(前編)

TL;DR

  • YouTubeから動画を拾ってTweetするbotをPythonで開発し、AWS Lambdaに載せてみました
  • 全2記事です。前編のこちらでは、主にPythonでの開発周りのトピックにフォーカスします
    • TwitterAPIを使ってプログラムからツイートしてみます
    • YouTubeのページを構文解析し、文字列操作を使って動画URLを抽出してみます

動機

新しい職場にて初めてAWSを触ることになったので、これを機にと個人アカウントを取ってみました。チュートリアルだけというのももったいないので、何か自分のためのサービスを作って載せると面白そうです。

で、Twitterのbot開発にはもともと興味があったので、これも前から興味を持ちつつ触ってなかったPythonでbotを作り、lambdaを使って運用してみようと思い立ちました。AWS lambdaは2017年4月からPython3系を扱えるようになったので、心置き無く最新バージョンで書けそうだなー、というのも狙いです。

ユーザーストーリー

毎日の退勤をもう少し楽しみにするために、定時になると自分が興味ありそうなYouTube動画をbotが勝手に検索して、自分のTwitterアカウントに届けてくれるようにしたい。
スクリーンショット 2017-12-06 23.30.33.png

前提

  • 開発にはMacを使用します
  • Pythonは3.6系を使用します
  • pyenvもvirtualenvも使用しません。議論はあろうかと思いますが、個人開発なので。。
  • で、開発環境構築はこちらの記事等を参照しました
  • bot化したいTwitterアカウントはあらかじめ用意してあるものとします

TwitterAPIを使ってプログラムに呟かせる

アクセスキーの取得

bot化したいアカウントでTwitter Application Managementにログインすると、アプリケーションの作成とConsumer Key、及びAccess Tokenの取得ができます。

なお、Appの作成にはTwitterアカウントが電話番号認証済みである必要があります。認証済みでないと怒られるので、エラーメッセージ中のリンクからさらっと済ませておきましょう。

  • Consumer Key
  • Consumer Key Secret
  • Access Token
  • Access Token Secret

以上の4パラメータがあればプログラムからのツイートができます。コピーしてこんな感じのファイルを作っておきましょう。

config.py
CONSUMER_KEY        = "xxxxxxxxxxxxxxxxx"
CONSUMER_SECRET     = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
ACCESS_TOKEN        = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"
ACCESS_TOKEN_SECRET = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy"

複数の外部ユーザーからアクセスがあるようなアプリケーションの場合(=「このアプリケーションとの連携を許可しますか?」など出るやつ)はそれぞれの役割についてもう少し説明が必要ですが、今回はある程度一緒くたに考えてしまっても実装に支障ありません。

PythonでOAuth認証

ライブラリの導入と管理

Pythonのライブラリは、パッケージ管理ツールであるpipでインストールできます。仮想環境がない場合、オプション無しで勝手にglobalに入るのがうーん、という感じですがまあそれは置いておいて。

PythonでHttp通信を行うライブラリとしては、requestsがポピュラーなようです。また、今回はTwitterAPIを使うための認証が必要なので、OAuth認証を扱えるライブラリも必須です。ここはrequestsと同じところが公開しているrequests_oauthlibを使用しました。

pip3 install requests requests_oauthlib

さて、インストールはできましたが、今度は開発するプロジェクトがこれらのライブラリに依存していることを表明しておくのがマナーです。js界隈で言うところのpackage.jsonですね。

Pythonでは依存関係を記したrequirements.txtなどを作っておくケースが多いようです。

requirements.txt
requests==2.18.4
requests-oauthlib==0.8.0

ちなみに、pip3 freeze > requirements.txtでインストールされた依存関係をrequirements.txtに吐き出せます。

逆に.txtファイルを元に一括インストールする場合は、-rオプションを用いてpip3 install -r requirements.txtなどと書けます。結構便利です。

つぶやいてみる

first_tweet.py
from requests_oauthlib import OAuth1Session
import config, json

twAuth = OAuth1Session(
  config.CONSUMER_KEY,
  config.CONSUMER_SECRET,
  config.ACCESS_TOKEN,
  config.ACCESS_TOKEN_SECRET)
apiURL = "https://api.twitter.com/1.1/statuses/update.json"
params = { "status": "プログラムにツイートさせてみるテスト" }

res = twAuth.post(apiURL, params = params)
print(json.loads(res.text))

先ほど作ったconfig.pyimportして、これだけ。思ったよりだいぶ手軽です。Twitterにアクセスして実際にツイートされたことを確認しましょう!

また、せっかくなのでレスポンスをjsonライブラリでロードして吐き出してみます。

{'created_at': 'Wed Dec 06 14:00:00 +0000 2017', 'id': 9384076800000000, 'id_str': '9384076800000000', 'text': 'プログラム
にツイートさせてみるテスト', 'truncated': False, 

...(中略)...

'retweeted': False, 'lang': 'ja'}

思ったよりいろんな属性があることがわかりますね。深掘りは公式のリファレンスにて。

YouTubeから動画のURLを拾ってくる

続いて、YouTubeから動画を探してくるパートです。

Webクローリング

この分野では、「クローリング」や「スクレイピング」と言った言葉が有名です。

クローリングとスクレイピング

クローリングはウェブサイトからHTMLや任意の情報を取得する技術・行為で、 スクレイピングは取得したHTMLから任意の情報を抽出する技術・行為のことです。

たとえば、あるブログの特徴を分析したい場合を考えてみましょう。
この場合、作業の流れは

  1. そのブログサイトをクローリングする。
  2. クローリングしたHTMLからタイトルや記事の本文をスクレイピングする。
  3. スクレイピングしたタイトルや記事の本文をテキスト解析する。

というようになります。

今回は、YouTubeをクローリングし、その中から動画のURLをスクレイピングすることになりますね。

Webページのクローリングとスクレイピングを行う際は、それがどんな目的のものであれ、HTMLを構文解析することが必須となります。Pythonでは、これを強力に支援するBeautifulSoupと言うライブラリがあります。執筆時点で最新のbeautifulsoup4を導入してみます。

pip3 install beautifulsoup4

早速使ってみましょう。Qiitaのトップページから<a>タグを探し、その中に含まれるhref属性の値を取得してみます。

crawling.py
import requests
from bs4 import BeautifulSoup

URL = "https://qiita.com/"
resp = requests.get(URL)

soup = BeautifulSoup(resp.text)

# aタグの取得
a_tags = soup.find_all("a", href=True)
for a in a_tags:
    print(a["href"])

結果

/about
https://qiita.com/sessions/forgot_password
https://oauth.qiita.com/auth/github?callback_action=login_or_signup
https://oauth.qiita.com/auth/twitter?callback_action=login_or_signup

・・・(中略)

https://qiita.com/api/v2/docs
https://teams.qiita.com/
http://kobito.qiita.com

いい感じです!

HTMLパーサーについて

さて、先のコードを実際に試すと、HTMLパーサーが明示されていないために警告が出ます。これは実際の解析時に使われるパーサーが実行時の環境に依存するためです。異なる環境下で同じ振る舞いを期待するには、使用するHTMLパーサーを明示してあげる必要があります。

デフォルトではhtml.parserが使われますが、lxmlかhtml5libを導入してこちらを明示してあげるのが無難なようです。このあたりの情報は下記の記事をだいぶ参考にさせていただきました。パーサーの選択だけでなくスクレイピング全般の情報が非常によくまとまっているエントリなので、オススメです。

PythonでWebスクレイピングする時の知見をまとめておく – Stimulator

パーサの良し悪しを考えるとlxmlでチャレンジしてダメならhtml5libを試すのが良さそう。

今回はこの1文に愚直に従ってみます。事前にpip3 install lxml html5libも忘れずに。


import requests
from bs4 import BeautifulSoup

URL = "https://qiita.com/"
resp = requests.get(URL)

+try:
+  soup = BeautifulSoup(resp.text, "lxml")
+except:
+  soup = BeautifulSoup(resp.text, "html5lib")
-soup = BeautifulSoup(resp.text)

# ...以下は先ほどと同様

Crawlerクラスを作ってみる

すでにPythonでオブジェクト指向な書き方を経験している方はこの辺りを飛ばしていただいて構いません。せっかくHTMLを解析してくれるコードができたので、クラスとして書き換えてみます。

crawler.py
import requests
from bs4 import BeautifulSoup

class Crawler:
    def hrefs_from(self, URL):
        a_tags = self.soup_from(URL).find_all("a", href=True)
        return set(map(lambda a:a["href"], a_tags))

    def soup_from(self, URL):
        res_text = requests.get(URL).text
        try:
            return BeautifulSoup(res_text, "lxml")
        except:
            return BeautifulSoup(res_text, "html5lib")

個人的にはインスタンスメソッドの第1引数が常にselfでなければならないのは書く量が増えるので少しもどかしいですね。ハマりポイントにもなりかねない…。

ちなみに、ここではラムダ式を使用し、hrefs_fromメソッドの戻り値の型をsetにしてみました。これは、今回のユースケースを鑑みてリンク先URLの重複を排除した方が便利と判断したためです。出現頻度など解析したい場合はまた改めて設計を考える必要があるでしょう。

継承と、YouTubeへのアクセス

YouTubeをクローリングするにあたって、「検索文字列を与えたら検索結果のページをクローリングし、動画を探してくる」などの機能があると便利そうです。先ほどのクラスを継承して、実装してみます。

tube_crawler.py
import random
import re
from crawler import Crawler

class TubeCrawler(Crawler):

    URLBase = "https://www.youtube.com"

    def hrefs_from_query(self, key_phrase):
        """
        検索文字列を与えると検索結果ページに含まれるhref属性の値を全て返す
        """
        return super().hrefs_from(self.URLBase + 
            "/results?search_query=" + key_phrase.replace(" ", "+"))



    def movies_from_query(self, key_phrase, max_count = 10):
        """
        検索文字列を与えると検索結果ページに含まれる動画のビデオIDを返す
        """
        return self.__select_movies(self.hrefs_from_query(key_phrase), max_count)



    def __select_movies(self, hrefs, max_count):
        """
        privateメソッド。href属性の値のsetからビデオIDのみを返す
        """
        filtered = [ re.sub( "^.*/watch?v=", "", re.sub( "&(list|index)=.*$", "", href )) 
            for href in hrefs if "/watch?v=" in href ]
        return filtered[:min(max_count, len(filtered))]



    def choose(self, movie_ids, prefix = "https://youtu.be/"):
        """
        渡した文字列のリスト(ビデオIDのリストを想定)から1つを選び、prefixをつけて返す
        """
        return prefix + random.choice(movie_ids)

文法的には継承とprivateメソッドの書き方あたりが新しい話題となります。この記事の主題ではないので特段の説明は省きます。

実際に試すとわかるのですが、検索結果のページにノイズとなるリンクが多いばかりか、再生リストへのリンクなど紛らわしいものも多く、その辺を適切に弾いていくのに手こずりました。おかげでfilter関数や正規表現に少し強くなれた気がします。

正規表現についてはこちらの記事をだいぶ参考にしました。

Pythonの正規表現の基本的な使い方

繋げてみる

準備が整ったので検索->ツイートの流れを試してみます。

main.py
#!/usr/bin/env python3
# -*- coding:utf-8 -*-

from tube_crawler import TubeCrawler
from tweeter import Tweeter
import config

def main():
    t = TubeCrawler()
    movies = t.movies_from_query("Hybrid Rudiments")
    chosen = t.choose(movies)

    # ツイートする部分をクラス化したもの
    tw = Tweeter()
    tw.reply(config.REPLY_TO, chosen)

if __name__ == '__main__':
    main()

エントリーポイントとなる関数が必要かなー、と思ったので何気なく(そう、本当に何気なく。これで良いと思っていたんですLambdaを使うまでは…)main関数を作成。

直接./main.pyでも呼べるようにこの辺からShebangを記述し始めました。また、末尾はファイル名で直接実行した場合にmain()を呼ぶためのおまじない。Rubyにも似たやつがありますね。あとはターミナルから呼んで動作確認するだけです。

$ ./main.py

実行したところ問題なく動きそうだったので、次回はAWS Lambdaに載せていきます。それなりの尺となったのでこのページはここまでです。お読みいただきありがとうございました。

リンク

続きを読む

中途入社のAWSエンジニアが、稼働中サービスの運用状況をキャッチアップするために意識すること

Classiアドベントカレンダー11日目です。
今回は、AWSエンジニアが稼働中のAWSの管理アカウントを渡されて、ビクビクしながらキャッチアップを行っていったときのメモになります。

1.TL;DR

AWSアカウントのログイン前に準備できることもあるし、AWSアカウントにログインしてわかることもあるし、サーバーにログインしてわかることもある。それぞれのレイヤーでどういったことを確認するかを意識しながらキャッチアップを進めていきましょう。

2.AWSアカウントログイン前の事前準備

pre_aws.png

サービスが稼働しているのであれば、AWSアカウントにログインせずとも、たくさんの情報をキャッチアップすることができます。1日目から何らかの大きなアウトプットを出さないと解雇するような会社は、(おそらく)存在しない筈です。まずは落ち着きましょう^^;

2-1.ドキュメント読み込み

サービスのインフラにAWSが使われることが多くなったからといって、入社前に経験したAWS運用フローがそのまま活かせる訳ではありません。まずは、前任者や運用中のドキュメント管理ツールの中から、今までどのような運用を行っていたかを確認します。
ドキュメントを見たときに意識する観点としては、

  • フロー型:時間による鮮度の劣化があるドキュメント
  • ストック型:システム仕様など、メンテナンスが求められるドキュメント

どちらの情報であるかを意識して読むことが重要です。
フロー型の情報は、障害などで一時的な対応用にメモっていることもあり、運用の中で解決済みのことがあります。そのため、ストック型のドキュメントを中心に見ることが素早いキャッチアップになると思います。
とはいえ、ドキュメントの全てがメンテナンスされている会社というのは稀だと思いますので、各種ドキュメントを見ながら、仮説程度に自分なりのシステム構成図などを書いてみましょう。
要件定義書や各種構成図の変更履歴、課題管理表、リスクコントロール表といったドキュメント類があるのであれば、目を通しておきましょう。

2-2.運用フローを観察する

サービス側のドキュメントについては、まだ文書化されてることが多いですが、運用系ツールに関しては、ドキュメント化されていないことがあります。今の開発スタイルであれば、何らかのチャットツール(Slack,ChatWork,HipChat)上で、

  • デプロイ
  • 各種の通知
  • 運用Bot

の運用といったことが行われていると思います。また、チャットだけでなく、メールでの運用フローも存在しているかもしれません。
こうした運用系ツールの存在は、今後自分がリファクタするときに、「必須要件ではないが、重宝している」ということで、「リファクタ後にも、あの機能を実装して欲しい」といった声が社内から上がると思います。
そのため、このような運用フローがどこで実装されているかを見極めることが重要になってきます。

2-3.インフラ部分のコード読み

「俺はフルスタックエンジニアだ!」という強い意思がある方は、この時点で稼働中のアプリ側のコードまで読み込んでいただければよいですが、まずは入社前に期待されたであろう、インフラまわりのコード化部分を把握しておきましょう。どのみち、いずれはメンテナンスを任されたり、質問されることが増えてきますので、自分のメンテナンスする部分を優先的に確認しておきましょう。
実サーバーの運用はAWSに任せているので、ここで意識しておくことは、

  • Infrastructure Orchestration Tools:Terraform, CloudFormationなど
  • Server Configuration Tools:Chef,Ansible,Itamaeなど

あたりのコードがgithubなどに保存されているからといって、メンテナンスされていない可能性もあります。
コードの設計方針などを確認するのは当然必要なのですが、コードの変更履歴が年単位で放置されていないかどうかも見ておきましょう。特に、AWS関連のコードについては、担当する人がアプリ側よりも少ないので、構築当初のコードのままなのか、運用されているコードなのかはPRなどで確認しておいた方がよいです。

3.AWSのアカウント内を調査する

aws_kansatsu.png

実際にAWSアカウントにログインしたり、APIで各種設定を確認していきます。Web系サービスであれば、TCP/IPモデルやC/Sモデルを意識しながら、下層レイヤー回りから調査していき、ネットワークがどうせ設定されているかを確認していきます。
おそらく、ここで多くの疑問(場合によっては、絶望)が生まれる段階です。「あれ?ドキュメントにこう記述されているけど、設定上は違うような…」という沼にハマることがあるでしょう。負けないでください、一人で抱え込まずに闇を共有できる仲間を見つけましょう。

3-1.外部システム連携の確認

関連するAWSサービス

VPC関連のサービスを中心に、自AWSアカウント以外の連携がないかの確認を行います。

関連しやすいAWSサービス例)

  • DirectConnect
  • NAT Gateway
  • Peering Connection
  • Customer Gateways
  • Virtual Private Gateways
  • VPN Connections
  • NetWorkACL
  • SecurityGroup
  • EIP

などに、何らかのインスタンスが稼働していて、productionやhonbanなどの文言がついたものがあれば、それはドキュメント上には存在しないが、サービス上何らかの理由で稼働しているものである可能性があります。
自社のサービスがAWSアカウント内だけで完結しているのであればよいのですが、誤ってここのインスタンスなどを削除すると、場合によってはシステム復旧できないぐらいの痛手になるので、慎重に確認していきましょう。
特に、SecurityGroupは、最近でこそInboundルールにDescriptionをつけられるようになりましたが、数年運用されているシステムには、何で利用しているIPアドレスなのかがわからないことがあるので、設定確認中に不明なIPアドレスを見つけたら社内で有識者に聞いてみましょう。

3-2.システム導線の確認

関連するAWSサービス

インスタンス障害があるとユーザー影響がありそうな、システム導線を追っていきます。

関連しやすいAWSサービス例)

  • ELB(CLB,ALB,NLB)
  • CloudFront
  • EC2
  • ElasticCache(redis,memcached)
  • RDS(Aurora,MySQL,PostgreSQL,SQLServer)
  • ElasticSearch
  • DynamoDB
  • S3

各種のインスタンスサイズが適切かを確認するのはもちろんですが、DB関連についてはバックアップ関連の設定がちゃんと行われているかどうかも確認しておきましょう。バックアップウィンドウの世代数やメンテナンスウィンドウの時間が営業時間内になっているとかは、結構ありがちな設定漏れケースになります。パラメータストアの設定については、本番で稼働している設定が正義なので、設計と違う場合は、社内で経緯を追ってみましょう。

3-3.運用導線の確認

関連するAWSサービス

直接のユーザー影響はないものの、バッチ系およびログインやログ連携など、システム運用で必要な運用導線を追っていきます。

関連しやすいAWSサービス例)

  • EC2
  • Lambda
  • ElasticSearch(& kibana)
  • IAM
  • CloudTrail
  • AWS Config
  • CloudWatch Logs
  • S3
  • Glacier

24224というポート開放を見れば、そのシステムはfluentd関連のフローがあるのはほぼ確定なので、ログの発生から可視化ツールおよびバックアップのフローまで追っていきましょう。また、バッチ系のEC2に関しては、最近のAWSだと、FargateやECS、Lambdaなどで定期バッチを行うことも可能なので、単一障害点をなくすことができないか、今後の計画のために、バッチ系が整理されているドキュメントなどを探してみましょう。

4.サーバー内の設定を確認する

server_chosa.png

最近だと、Server Configuration Toolsが大分普及していたり、コンテナ系の運用が発達してきているので、このあたりのキャッチアップ期間が少なくなるのかなと思います。とはいえ、SSH接続を頻繁に行うサーバーや起動時間が長いサーバーだと、コードの設定と異なる部分が出てきていることがあるかもしれません。
OSの設定やミドルウェアのバージョンなど、SSH接続すると確認した方がよいところは多々ありますが、Server Configuration Toolsの設定と異なっていたり、運用中のアラート設定などで差異がでやすそうな部分を以下に記載します。

4-1.各種メトリクス確認

メモリやプロセスの状況は、通常CloudWatchだけではわからないので、MackerelやZABBIXなどの監視ツールエージェントを入れているのであれば、各サーバーのメトリクスを確認しておきましょう。

4-2.稼働プロセスの確認

pstreeなどで、稼働しているプロセスを確認します。SSH接続が禁止されているのであれば、AWSのSSMエージェントなりで確認できないかを検討してみましょう。設計上のソフトウェアスタックに存在しないプロセスが常駐している場合は、何のエージェントが動いているかを追っておきましょう。

4-3.不要なファイルが出力されていないかの確認

ログレベルがデバッグのままだったり、ログファイルのローテートがなされていない場合があり、アラートは上がっていないけど、サーバー内のリソースを侵食しているときがあります。また、生成されるログファイルが小さすぎると、ディスクに余裕がありそうに見えても、inodeが先に枯渇するときもあります。lsofdf -iなどを可視化するなどして、サーバー内のディスク状況を確認しておきましょう。

4-4.同期処理と非同期処理のプロセス確認

同期処理のプロセスは意識しやすいので、監視対象に入っている可能性が高いです。ただ、非同期系プロセス(Rubyだとsidekiq,Pythonだとcelery,PHPだとphp-resqueなど)が監視対象に入っていないこともあるので、どのサーバーで非同期処理が行われているかを把握しておきましょう。

5.まとめ

AWSや他のパブリッククラウドが全盛になった今でも、3層アーキテクチャのシステム構成やOSI7階層などのレイヤーを意識しながらキャッチアップを行うと、システムを俯瞰しながらキャッチアップを進めることができるのではないかなと思います。とはいえ、前任者がコード化しきれなかった部分もある筈なので、そこは社内で過去経緯を知っている人に笑顔で質問をしてみましょう。技術が発達しても、人に蓄積されるノウハウはまだまだ多いので…
AWSエンジニアが転職する際などのご参考になれば。

続きを読む

CodeBuildの実行結果をslackに通知する

はじめに

Globis Advent Calendar10日目は、弊社のエンジニアチームを支えるインフラ技術をお伝えいたします。
開発スピードをさらに促進するために、Blue Green Deployment, Infrastrucure as Code, ChatOps など新しい運用思想を積極的に取り入れて、手動でのオペレーションを極力なくしています。
今回は、弊社で実運用しているChatOpsの一例として、CodeBuildの実行結果をslackに通知する方法を、可能な範囲で具体的にお伝えいたします。

設定方法

AWS Lambdaの設定

コード

import os
import json
import urllib.request

URL = os.environ['WEBHOOK_URL']


def send_slack(obj):  ## slackに通知
    method = "POST"
    headers = {"Content-Type" : "application/json"}
    js = json.dumps(obj).encode("utf-8")
    request = urllib.request.Request(URL, data=js, method=method, headers=headers)
    with urllib.request.urlopen(request) as response:
        response_body = response.read().decode("utf-8")


def check_status(status):
    fail = False
    if status == 'IN_PROGRESS':
        color = '#008000'  ## green
    elif status == 'SUCCEEDED':
        color = '#00BFFF'  ## deepskyblue
    else:
        color = '#FF00FF'  ## magenta
        fail = True
    return color, fail


def parse_event(event):  ## cloudwatch event から渡された event(dict型)をパース、slackに渡すobjectを生成。
    detail = event['detail']
    status = detail['build-status']
    initiator = detail['additional-information']['initiator']
    log = detail.get('additional-information', {}).get('logs', {}).get('deep-link', 'Not Exist')
    color, fail = check_status(status)
    fields = [{'title': 'Initiator', 'value': initiator, 'short': False},
              {'title': 'Status', 'value': status, 'short': False},
              {'title': 'LogLink', 'value': log, 'short': False}]
    obj = {'attachments': [{'color': color, 'fields': fields}]}
#    if fail == True:  ## Fail時にチャンネルメンション飛ばしたい時はコメントを外す。
#        obj['attachments'][0]['text'] = '<!channel>'
    return obj


def lambda_handler(event, context):  ## lambdaから最初にcallされる関数。
    obj = parse_event(event)
    send_slack(obj)
    return

環境変数の設定

CloudWatch Event の設定

slackでの通知結果

  • ワンクリックでビルドログを眺めることができます。

運用事例のご紹介

  • AWS Athenaのテーブルにクエリを実行し、中間テーブルを生成するETL処理のバッチ
  • Ruby on Rails アプリケーションデプロイ時の db:migrate 処理
  • 最近はCodeBuildをVPC内で実行できるようになったので、利用できる幅が広がっています!

おわりに

いかがでしたでしょうか。python初心者(僕)でもChatOpsに貢献できます。
グロービスではSREエンジニアを募集しています!
インフラの運用担当だけど手作業を自動化したいと考えている方、開発者だけどインフラも含めてコードで管理したい方、一緒に理想のインフラを作ってみませんか?

続きを読む

【CS Hack 第1弾!!】アプリケーション内でファイルの受け渡しを実装する

はじめに

皆さん、カスタマーサクセスしてますか?価値あるプロダクトを作っていますか?
日々自分たちのカスタマーがより良い活動をできるよう、プロダクトを開発・運用しているかと思いますが、活動の中でのヒューマンエラーを無くす動きも大切です。

カスタマーサクセスの為に、検証作業も含め、頻繁にカスタマーとデータのやり取りをする場合(例: 提案資料、データの受け渡し…)、うっかりミスが致命的な問題につながり兼ねません。
メールやFBで気軽に連絡できる時代において、ヒューマンエラーによる情報漏えいはかなり大きな割合を占めております。

leak07.jpg

引用:情報漏洩の原因を徹底解析!原因と結果から学ぶ意識改革

カスタマーのより良い活動を支援しようと思ったのに・・・、その活動がサービスの停止へと導いてしまった、、、なんて事があったら悲しすぎますね。
そんなミスを防ぐ為に、今日はアプリケーション内でファイルをDLする機構を作ることで、セキュアな環境下でのやり取りを実現してみたいと思います。

イメージ図

スクリーンショット 2017-12-09 12.05.27.png

実装

サーバ側

まずは、AWS側の設定を記述します。

config/initializers/aws_s3.rb
Aws.config.update({
    credentials: Aws::Credentials.new(
      #ACCESS_KEY,
      #SECRET_ACCESS_KEY
    ),
    region: #region情報を
    endpoint: #endpoint情報を
})

# もし参照パスにルールがあるなら記述しておきます
module AwsS3
  BUCKET = 'test'.freeze
  BASIC_URL = "https://s3.console.aws.amazon.com/s3/object/#{BUCKET}".freeze
end

ベースのモデルを作成。

class S3Client
  def initialize(*_)
    @client = Aws::S3::Client.new
  end
end

まずはS3にあるファイルを引っ張ってくる処理。
例えばログインしているユーザに応じて、S3のパスを分けます。

class S3Reader < S3Client
  def initialize(user)
    super
    @user_id  = user.id
  end

  def prefix
    "#{Rails.env}/#{@user_id}/"
  end

  def objects
    contents = @client.list_objects(bucket: AwsS3::BUCKET, prefix: prefix).contents
    contents.map { |content| S3Reader::Object.new(content, prefix) }
  end
end

# objectの加工用
class S3Reader::Object
  attr_reader :key, :name, :modified_at

  def initialize(content, prefix)
    @key = content.key
    @name = content.key.gsub(prefix, '')
    @modified_at = content.last_modified
  end
end

controller側に下記を実装。するとS3からDL可能なObjectが引っ張られてきて表示されます。

class DownloadFilesController < ViewBaseController
  def index
    s3 = S3Reader.new(@current_user)
    @objects = s3.objects.sort_by(&:modified_at)
  end
end

S3にアップロードしたファイルが・・・

スクリーンショット 2017-12-09 12.23.08.png

アプリケーション側でも閲覧可能に。

スクリーンショット 2017-12-09 12.23.50.png

引っ張るところまで出来れば次はクリック後、DLできるようにします。
DLの方法としては、一定時間だけオブジェクトにアクセスできるURLを作成し、send_dateをすることで、ブラウザ側にDLされるようにします。

参照)
一定時間だけS3のオブジェクトにアクセスできるURLを生成する

class S3Downloader < S3Client
  # 一時的なURLは120秒に設定
  def download_url(key)
    Aws::S3::Presigner.new(client: @client).presigned_url(
      :get_object, bucket: AwsS3::BUCKET, key: key, expires_in: 120
    )
  end

  def content(key)
    S3Downloader::Object.new(open(download_url(key)), key)
  end
end

class S3Downloader::Object
  attr_reader :content, :key

  def initialize(content, key)
    @content = content
    @key = key
  end

  def name
    @key.split('/').present? ? @key.split('/')[-1] : 'download'
  end

  def read
    @content.read
  end

  # ファイルの種類によってはsend_date時に上手くcontent_typeが出せない場合があるので、オーバーライドしておく
  def type
    @content.content_type
  end
end

後は、リストをクリックした際のアクションをController側に実装。

class DownloadFilesController < ViewBaseController
  def index
    s3 = S3Reader.new(@current_user)
    @objects = s3.objects.sort_by(&:modified_at)
  end

  def download
    downloader = S3Downloader.new
    content = downloader.content(params[:key])
    send_data content.read, filename: content.name, type: content.type
  end
end

これらを実施し、クリックすると・・・

ダウンロード.png

無事DLができました。

最後に

今回は第1弾ということで、まずはファイルをDLできる機構を作ってみました。
ログインすることでDLできるので、カスタマーには、『サービス内にアップロードしたので、DLしてくださいね。』と伝えるだけでいけますね。

ヒューマンエラーを出来る限り排除し、価値にフォーカスできる体制をつくるのもプロダクト開発において非常に重要だと感じています!

第二弾はこの機構に自動でファイルをアップロードしていく仕組みを構築します。
これで活用レポートだったり、定期的に送るデータは自動化してしまいましょう。

続きを読む

Ruby Sample for AWS Cloud9

そもそもAWS Cloud9、Ruby on Railsサポートしてないんじゃない?テンプレ消えてない??」 って言われたので方向転換してとりあえずAWS Cloud9上でRubyを動かしてみようと思います。 トライするのはこれ。 docs.aws.amazon.com Step 1: Install Require… 続きを読む

システム障害解析におけるログのあれこれ

この記事は Akatsuki Advent Calendar 2017 の 8 日目の記事です。
7日目: バイナリのビルド作業はそろそろボタンをポチるだけにしようぜ

背景

システムを運用していると、日々アプリケーション・ミドルウェア・インフラのログが蓄積されていきます。これらのログはシステムの障害対応・解析のための貴重な情報源となりますし、そうであることが期待されます。
しかし、これらのログの取り扱いを誤ると誤った障害解析結果を導き出してしまったり、解析にいたずらに時間がかかったり、障害を特定することができなかったりといったことが起こります。
今回はこれらのログを扱う上で注意すべき点とその改善案を紹介をしたいと思います。

前提

私はソーシャルゲームのインフラとサーバサイドアプリケーションを担当しており、下記のサービス・ソフトウェアを利用しています。

  • AWS

    • ELB (Classic)
    • EC2 (AmazonLinux)
    • RDS (MySQL)
    • CloudWatch
  • nginx
  • Ruby on Rails (unicorn)
  • 他 BigQuery, ElasticSearch, Re:dash, Kibana, Mackerel 等

そのタイムスタンプ、いつのもの?

通常、ログデータにはタイムスタンプが付けられていますが、このタイムスタンプは一体「いつ」の時刻を記録したものなのでしょうか。

ほとんどの場合、対象ソフトウェアが処理を開始した時間が記録されるのですが、実は例外もあります。
私の所属しているプロジェクトで使っているソフトウェアの中では nginx がこれに該当します。
nginx では「処理が完了した時刻 (= レスポンスを返した時刻)」が記録されます。

システムが正常に稼働している限りこれらの違いを気にすることは少ないと思いますが、障害解析時はその限りではありません。
各アプリケーション・ミドルウェア・インフラのログを少なくとも秒単位であわせ解析する必要があるため、各タイムスタンプが「いつ」の時刻を記録したものなのか把握していないと、誤った障害解析結果を導きかねません。
特にタイムアウト処理が絡んだ場合、レスポンスを返した時刻はリクエストを受けた時刻と大きな差が発生します。

何気なく記録されているログのタイムスタンプにも罠があります。ご注意ください。

必要な情報出してる?

前項で「nginx のタイムスタンプはレスポンスを返した時刻」と説明しましたが、ではいったいどうやって「処理を開始した時刻(= リクエストを受けた時刻)」を出力するのでしょうか。
実は nginx のデフォルトの設定ではこれができません。

nginx で「リクエストを受けた時刻」を記録する方法はいくつかあるようなのですが、最も簡単なのは「レスポンスを返すまでにかかった時間」を一緒に記録することです。ログの解析時にそれらの値を使って「リクエストを受けた時刻」を求めることができます。ログ解析時に前処理は必要になりますが、それを低コストで行える環境もあわせて用意しておくとよいです(後述)

(※ 最も良いのはもちろん予めログにリクエストを受けた時刻を記録することですが、ログ収集時に計算させる方法も可能です)

死ぬ前の情報は残した?

エラー時の情報は貴重です。この情報の有無で障害解析のスピードと精度は数倍変わってくるでしょう。しかし、中にはエラー時の情報を残さずに死んでしまうソフトウェアもあります。私のプロジェクトで利用しているもの中では unicorn がこれに該当します。

unicorn はリクエストを処理する worker プロセスと、workerプロセスを管理する masater プロセスから構成されます。
unicorn はタイムアウトの設定を持ち、worker プロセスの処理がこのタイムアウト内に完了しない場合、master プロセスは workerプロセスに対して即座に SIGKILL を送りつけます。その結果、「タイムアウト内に完了しなかった処理」がログに記録されないという事態が発生します。

これに対する改善策はいくつかあります。

  1. より上位にあるソフトウェアで記録を残す

    • 具体的には ELB や nginx でログを残す。当該リクエストを処理したホストの情報、エラーコード、エンドポイント等を記録する。
  2. SIGKILL の代わりにトラップ可能な SIGINT 等を利用し、そこで Rails.logger.flush させる
  3. Rails の ActionController の around_action で “ソフトな” タイムアウトを設定する
around_action :global_timeout

def global_timeout
  Timeout.timeout(TIMEOUT_SEC) do
    yield
  end
end

私の所属するプロジェクトで実際に適用されているのはまだ1のみですが、2,3の手法も評価していく予定です。

ログデータ膨大すぎるんだけど…

正確な障害解析には普段から多くの情報を取得しておく必要がありますが、その結果、解析に時間がかかったり、そもそも普通のマシンでは処理ができなかったりといったことが発生します。
私の所属するプロジェクトでは、ログをBigQueryとElasticSearchに格納し、Re:dashやKibanaで可視化できる仕組みを構築しています。

普段はマクロなインフラメトリクス(や、売上情報等)を表示するために使っていますが、障害解析時はクエリを書くことで簡単に情報を絞り込んだり、可視化することができ、便利です。nginxのタイムスタンプ問題もクエリを書くことで簡単に解決できます。

(※ すべてBigQuery+Re:dash に統一化したいなぁ)

さいごに

障害解析は「より少ない情報、より少ない時間で原因を特定する」エクストリームスポーツではありませんし、そうあってはなりません。
エンジニアにエスパーの力を求めるのは間違っています。
また、「システムの癖を知った、長年の経験のあるエンジニアにしかできない作業」であってもなりません。

障害解析のために十分な情報を集めることや、スピーディに解析できる環境を用意することは言うほど簡単ではありませんし、コストもかかりますが、安定したサービスを提供するには必要不可欠なものです。

堅牢なシステムの構築は1日にして成らず、頑張っていきましょう。

続きを読む

api-gateway+lambdaからlambda・AWS batchを非同期でキックしてみた記事

最近はやりのchatopsをrubyで書いていたのですが、botが死ぬたびに原因調査を行うのがめんどくさくて、slackでメッセージを受け取るところと内部の処理を分離しました。
分離した処理はjenkinsのapiを使って処理していたのですが、せっかくなのでAWSに全てあげてしまうことにしました。
そこでlambdaからLambda・AWS Batchを非同期でキックした時のことを書こうと思います。

はじめに

本稿はslackにbotを多数動かす想定で設計しているので、個人でちゃちゃっと作りたい人はbotで完結したほうがいいと思います。
またlambdaの非同期でキックするところを中心に書くのでbot部分や周辺は割愛します。
(litaをECS上で動作 passなどはkmsで管理)

想定ユーザー

  • データ基盤の機能などの一部を社内ユーザーに解放したい
  • いくつかのbotを動かしたい
  • terraformで環境構築している
  • サーバーレスが好き

全体的な構成

以下構成図になります
– メッセージは全てlita一台で受け取ってApi-GatewayにPOST
– LambdaからLambdaかAWS batchを非同期で起動

スクリーンショット 2017-12-05 12.42.21.png

post.json
{
    "method":{
         "channel_name" : "****",
         "req_usr" : "***",
         "magic_words" : "特定の情報を渡す場所(ddl取る場合はスキーマとテーブル名など)"
    }
}

api-gatewayの部分

以下の記事にlambdaをキックするところまでを書いたので割愛します。
API Gatewayを叩いてLambdaからRedshiftにSQLを投げる(ついでにslackにsnippet通知)

lambda(kicker)の部分

  • post内容から呼び出したい関数名を取り出す
  • 関数名のlambdaがあればそれをキック
  • 関数名に対応するAWS batchがあればそれをキック
kicker.py
import simplejson as json
import boto3
import logging


logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    event_dict = json.loads(event["body"])
    function_name = list(event_dict.keys())[0]
    if function_name == "kicker":
        status_code = 404
        body_text = "Kicker loop error : " + function_name
        logger.error(body_text)
    elif kick_lambda(function_name, event):
        status_code = 201
        body_text = "kick lambda : " + function_name
        logger.info(body_text)
    elif kick_batch(function_name, event):
        status_code = 201
        body_text = "kick batch : " + function_name
        logger.info(body_text)
    else:
        status_code = 404
        body_text = "No Method Error : " + function_name
        logger.error(body_text)

    responseObj = {
        "statusCode": status_code,
        "body": body_text
    }
    return responseObj


def kick_lambda(function_name, event):
    try:
        clientLambda = boto3.client("lambda")
        res = clientLambda.invoke(
            FunctionName=function_name,
            InvocationType="Event",
            Payload=json.dumps(event)
        )
        return True
    except:
        return False


def kick_batch(function_name, event):
    try:
        clientBatch = boto3.client("batch")
        res = clientBatch.submit_job(
            jobName=function_name,
            jobQueue=function_name + "_queue",
            jobDefinition=function_name + "_job_definition",
            parameters={
                'event' : json.dumps(event)
            }
        )
        return True
    except:
        return False
    return False


if __name__ == '__main__':
    handler('', '')

非同期で呼び出されたlambda(右側)の部分

eventをそのまま渡しているので以下の記事のように色々作ればOK
API Gatewayを叩いてLambdaからRedshiftにSQLを投げる(ついでにslackにsnippet通知)

非同期で呼び出されたAWS batch(右側)の部分

kickerで定義された名前に対応するものを作成しておけばOK
例のものであれば
– aws_batch_job_queueのname属性が{function_name}_queue
– aws_batch_job_definitionのname属性が{function_name}_job_definition

kicker.pyの一部
def kick_batch(function_name, event):
    try:
        clientBatch = boto3.client("batch")
        res = clientBatch.submit_job(
            jobName=function_name,
            jobQueue=function_name + "_queue",
            jobDefinition=function_name + "_job_definition",
            parameters={
                'event' : json.dumps(event)
            }
        )
        return True
    except:
        return False
    return False

終わりに

全体的にスッキリして機能追加などがかなり簡単になりました。
あとはterraform・lambdaのzipファイル・AWS batchのコンテナイメージをどうやって運用するかが考え所な感じがしました

続きを読む