AWS Lambda から Incoming Webhooks を使って Slack に投稿する

概要

slack では webhook 用のURL を通し、メッセージを投稿することができます。
ここでは、AWS Lambda を invoke -> 内部でシェルスクリプトを実行 -> Incoming Webhooks に POST という手順を試してみたので、その方法を書いておきます。

Incoming Webhooks

Slack Apps の Incoming Webhooks のページから、web hook 用の URL を取得します。

https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXX/XXXXXXXXXXXXXXXXXXXXXXXX
↑こんな感じのURL

この URL にメッセージをPOSTすることで、設定したチャンネルにそのメッセージが投稿されます。
メッセージだけでなく、投稿者の名前、アイコンなども設定できます。

例えば、

curl -X POST https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXX/XXXXXXXXXXXXXXXXXXXXXXXX -d '{"text": "TEST!!!!", "username": "dango", "icon_emoji": ":dango:"}'

こんな感じのPOSTをすると、

20170222-183306.png

こんな感じの投稿になります。

引数にテキスト、ユーザー名、アイコンを受け取ってポストできるような適当スクリプトを作っておきます。

post.sh
#!/bin/sh

TEXT=$1
USERNAME=$2
ICON=$3

BODY="{'text': '${TEXT}', 'username': '${USERNAME}', 'icon_emoji': '${ICON}'}"

curl -X POST https://hooks.slack.com/services/XXXXXXXXX/XXXXXXXXX/XXXXXXXXXXXXXXXXXX -d "${BODY}" 

Lambda

次は上記を Lambda を通してやってみます。
Lambda 関数の作成時に、ランタイムは node.js を選択、設計図には node-exec を選択します。

このLambda 関数から先ほど作ったシェルスクリプトを実行し、Slack への投稿を行います。

Lambda 関数は実行時に json 形式のペイロードを受け取り、その内容を関数に渡すことができます。

event.json
{
    "text": "Post from lambda!!",
    "username": "test",
    "icon": ":golfer:"
}

↑こんなペイロードを作っておきます。
これを渡してスクリプトを実行できるような関数を作成します。

index.js
var exec = require('child_process').exec;

exports.handler = function(event, context) {
  command = '/bin/bash ./post.sh ' + event['text'] + ' ' + event['username'] + ' ' + event['icon'];
  child = exec(command, function(error) {
    // Resolve with result of process
    context.done(error, 'Process complete!');
  });

  // Log process stdout and stderr
  child.stdout.on('data', console.log);
  child.stderr.on('data', console.error);
};

この index.js、post.sh を zip 形式にまとめてlambda へとアップロードし、Lambda 関数を作成します。
後の設定は適当に。。。

作成した関数に、アクション>テストイベントの設定から、先ほど作った json を設定し、テストを実行します。

20170218-124510.png

20170222-183909.png

成功のログとともに、Slack へと投稿されていることが確認できました!

感想

Lambda を使うのはこれが初めてなので、細かい設定はよくわからず作ってますが、なんとか成功しました。
(基本的に参考に記したリンク先のやり方を真似させていただきました。)
Lambda を使ってみたかっただけなので Lambda である必要はないと思いますが、割と簡単に設定できました。
Incoming Webhooks ではいろんな形式の投稿ができるっぽいので時間があれば試してみたい。。。

参考

続きを読む

CloudWatch専用の通知bot、marbotを使ってみた

CloudWatchのslack連携というとLambdaでpostが鉄板ですが、より良くしようとするとLambdaをメンテしていく必要があります。

メッセージを視覚的に良くするならslackのattachmentsなどで工夫が必要だし、用途別に通知を分けるとかなると、Lambdaに分岐書いたりとかLambdaをコピペしたりなど。。。

たまたま、marbotというCloudWatchの専用botを見つけ、試しに導入してみたら結構イイ感じだったのでその紹介です。

marbotについて

公式サイト – marbot

CloudWatch専用の通知botで、アラートのslack通知や簡易的なエスカレーションがあります。
今のところは有料プランはなく、費用ゼロで使用できます。

AWSのChatbotコンテストで入賞してたみたいです。
https://aws.amazon.com/jp/blogs/news/congratulations-to-the-winners-of-the-serverless-chatbot-competition/

marbotの機能

marbotのFeaturesに説明がありますが、ザックリまとめると以下になります。

  • Slackへのアラート通知
  • アラートのエスカレーション
  • アラートの管理(チェック、パス、クローズ)
  • Dailyのアラートサマリ

導入方法

ここではCloudWatchの設定(SNSのtopicも登録済み)は済んでいるという前提で進めます。

  1. marbot# Add to Slackがあるので、認可してSlackに追加
  2. slackのchannelにmarbotを追加
    • 追加するとSNSで設定するEndpointが表示されます
    • 指定したchannelに参加しているメンバーがmarbot通知の対象になり、これについては後述
  3. CloudWatchのnotificationで指定しているSNSにsubscriptionを追加
    • HTTPS形式を選び、2.のEndpointを設定
  4. slack上でYou completed the SNS topic subscriptionとなっていれば、SNS連携は完了

マスクしてますが、ここにEndpointが表示されます。

invite-image

あとはCloudWatchのイベントが発火すると、marbotから通知がslackへ送信されます。

marbotのアラート通知

marbotからのアラート通知ですが、基本はchannelメンバーへのダイレクトメッセージです。
ここではアラート発生の流れと、アラートへのアクション、エスカレーションについて説明します。

アラートへのアクション

marbotからのアラート通知には、Acknowledge,Pass,Closeいずれかのアクションが出来るようになっています。
実際のインシデント想定をした場合、こんな感じでしょう。

  • Acknowledge : アラートの認知、障害内容のチェック開始など
  • Pass : 誰かにパス
  • Close : 終了

アラート発生のフロー

実際にやってみたところ、以下の流れになりました。

  1. アラート発生 🔔
  2. channelのオンラインメンバーにmarbotからアラート通知のダイレクトメッセージ
    • オンラインメンバーが複数の場合、誰か一人に通知するっぽい
  3. アラートへのアクション
    • Acknowledge => marbotがAcknowledgedと認知
    • Pass => marbotが次のオンラインメンバーへ通知
    • Close => marbotがClosedと認知

marbotのアラート通知はこんな感じになります。

alert-image

エスカレーション

marbotは特定の条件で、エスカレーションとしてchannelへ全体通知します。

  • オンラインメンバーが誰もいなかった
  • 通知を受けたメンバーがアクションを起こさなかった(5分固定っぽい)
  • 全員がPassした

つまり、誰も気づかなかった、誰も行動を起こさなかったらchannelへの全体通知されることになります。

escalate-image

Dailyのアラートサマリ

marbotは一日のアラートを集計し、日々channelへこんな感じのまとめを投稿してくれます。
(これはテストで適当に発生させたアラート)

summary-image

marbotの運用を考えてみる

marbotはslackのchannelをグループ管理としてみなしているようです。
なので、アラートを受けるべきメンバーを集めたchannelにmarbotを入れれば、アラートのコントロールもしやすくなるかと思います。

例えばこんな感じにわけてみるとか。

  • myservice : サービス関係者全員
  • myservice-dev : サービスのエンジニアAll
  • myservice-incident : 通知に必ず対応すべき関係者
    • marbot参加

ちなみにインテグレーション数ですが、channel個別にmarbotを招待できるので登録は1つで済みます。

Architectureについて

この記事に、アーキテクチャの概要が説明されていました。
https://cloudonaut.io/marbot-aws-serverless-chatbot-competition/

  • AWS Lambda and Amazon API Gateway
  • Amazon DynamoDB
  • Amazon Kinesis Streams and Amazon Kinesis Analytics
  • Amazon SQS
  • Amazon SNS and AWS CloudWatch

LambdaとAPI Gateway、DynamoDBはサーバレスの鉄板なので、すぐに想像できましたが、目を引いたのはSQSとKinesisです。

記事を読んで理解できましたが、アラート通知でn分スルーされたらchannelへ通知などを実装するために、SQSをタイマーとして使っているようです。なるほど。

KinesisはDailyサマリーの算出用途ですね。

まとめ

CloudWatchの通知をSlackに連携していない、またはslackに連携しているけどアラートを放置しがちなケースでは、marbotのエスカレーションが効果的になるかも。

続きを読む

DBのint枯渇を目の前にした僕らは

MySQLのint型は符号付きで -2147483647〜2147483647 の範囲をサポートし、レコードを記録する際にこの範囲を超えて記録しようとするともちろんエラーとなります。

これは、長い運用の末にデータが膨大になり、ついにintのサポート範囲が枯渇寸前となった話です。

方針

DBはAWS Auroraを使用しており、アプリケーションはRailsで構築されています。RailsのMigrationはデフォルトでidカラムをAUTO INCREMENTのint型で作成します。1。サービスの特徴としては他のサービスと比較すると高トラフィックに晒されるもので、DBに大量のログを記録する必要がありテーブルによっては1ヶ月で1億レコード以上記録されるものもあります。対処方法を検討し始めた時にはidは既に18億を超えており、やるべきことは対象のテーブルのidカラム、及びそのidを関連として保持しているテーブルのカラムの型をBigintに変換することでした。

ただカラムの型をBigintにするだけであれば通常はALTER TABLEを実行すれば良いだけですが、レコードが数億規模になってくるとALTER TABLEが完了するまでに時間が掛かりすぎ、その間テーブルはロックされてしまうので、事実上サービスが停止してしまいます。

そこで、対象のカラムをBigint化した新しいテーブルを用意し、既存のデータを(リアルタイムに増えるものも含めて)コピーし、入れ替えることでサービスを停止することなくBigint化する方法を模索し始めました。

手段

候補として挙がったのはPERCONA社のpt-online-schema-changeと、
AWS Database Migration Service(以下DMS)の2つでした。DMSを使う多くの目的はオンプレミスに運用しているDBをAWSのクラウド上に移行することだと思いますが、DMSはRDSからRDSへの移行もサポートしており、今回の目的を達成できる可能性があると踏みました。

そこで、今回のケースがDMSの用途としてふさわしいのかAWSのサポートに問い合わせたところ、同一データベース内での使用は検証した限りでは可能なようだが、本来の用途からは逸脱しており、結果想定しない挙動をする恐れがあり推奨しない、という返答をいただきました。

代替案として、RDSのインスタンスごと新たに立ち上げ、そのインスタンス間でDBを丸ごと移行する形にすれば本来の用途と合致するという案内を受けたので、チームで少し議論を交わし、コストは増すがpt-online-schema-changeを使用するよりマネージドなサービスを使うほうが人的コストを抑えられることと、いざというときにAWSのサポートを受けられる方が安心なことにより、今回はDMSでDBごと移行することにしました。

なお、同じくAWSが提供しており、DMSと合わせて使用するスキーマ変換ツールとしてSchema Conversion Toolがありますが、ソースデータベースとしてMySQLを選択した場合は、移行先のRDSのターゲットデータベースとしてPostgreSQLのみが選択可能という仕様により今回は使用しませんでした。そもそもAurora同士の移行であればターゲットデータベースからスキーマをダンプし、カラム型を変更してターゲットデータベース上に作成するだけで充分でした。

検証

まずはStagingのDBで検証を行いました。StagingのDBは7GBほどで、インスタンスは一番小さいdms.t2.micro2、1タスクに全テーブルを指定し、Limited LOBモードで移行したところ、だいたい1時間ほどで完了しました。DMSにはリアルタイムに挿入や変更されたデータに追従してくれるCDCモードがありますが、こちらも正常に動作していました。

DMSのパフォーマンスに強く影響するのはLOBの移行モードで、これをデフォルトのFull LOBサポートモードにしていると倍以上の時間が掛かりました。これはLimited LOBモードを使用することで回避できますが、こちらはMax LOB Sizeを指定する必要があり、これを超えるデータが存在した場合は切り捨てられてしまうので注意が必要です。テーブルのLOB列の量によりますが、多くの場合ではLOBデータが64KBに収まっていれば高速化が望めるそうです。DMSがLOBとみなすMySQLのデータ型はこちらで確認できます。

本番

Stagingで問題なく動作することが確認できたので、本番DBで実施することにしました。本番DBのデータサイズは9TBを超えており、1インスタンスではストレージがパンクする恐れがあったため、c4.4xlargeのインスタンスを3台用意し、それぞれのインスタンスにタスクを分散させ、移行するテーブルの担当を複数に分けました。

稼働してしばらくすると、アプリケーションからの本番DBへの負荷により、DMSのタスクを動かしていると本番DBのCommit Latencyが増大する事象が確認できたため、日中はDMSタスクを止め、移行は夜間から早朝に掛けて実施することにしました。DMSにはそういったスケジューリングの機能はないので、AWS LambdaでDMSタスクを停止、再開するスクリプトを書きました。

DMSタスクを停止させると、全てのデータのロードが完了していないテーブルは再開時に再ロードとなり、つまり移行したデータを破棄して最初からやり直しとなってしまいます。このままではテーブルが巨大だとロードを完了させることは不可能なので、ResumeEnabledオプションをtrueに変更することで中断したポイントから再開できるようにしました。このオプションはマネジメントコンソールからは変更できないため、AWS CLIから create-replication-task でタスクを生成する必要があります。

タスクがデータの移行に失敗するとCloudWatch Logsにログを出力されますが、今回の移行時に下記のようなエラーを確認しました。

into table `db_name`.`table_name` CHARACTER SET UTF8 fields terminated by ',' enclosed by '"' lines terminated by 'n

エラー内容としてはこちらに酷似していましたが、SQL_MODEにANSI_QUOTESは設定しておらず、サポートに問い合わせても結局原因は掴めませんでした。データを確認したところエラーが発生したレコードは移行されておらず、エラーが発生したテーブルだけを別タスクに切り出し、再度移行を行ってもエラーは解消しなかったので、その箇所についてはmysqldumpにより手動で移行しました。

カットオーバー

既存データは移行が完了し、DMSはCDCでリアルタイムに変更されるデータを移行している状態となったところで、新規DBに向き先を変更することにしました。向き先変更と同じタイミングでデータが挿入されidが衝突してしまわないように、新規DBの方はAUTO_INCREMENT値を増加させておきました。新規DBに向けたアプリケーションを別途用意し、動作に問題ないことを確認し、本番環境を新規DBに移行しました。

さいごに

検証期間を含めると一ヶ月以上の作業期間となりましたが、晴れてintの枯渇問題から開放されました。なお移行が完了したしばらくした後に外道父さんがDMSを使わずにリアルタイムの完全コピーを実現しており、もう少し早ければ…!とも思いましたが無事完了できたことに変わりはないので良しとしました。あまり出くわさないケースかとは思いますが、誰かの参考になれば幸いです。

続きを読む

Python(boto3)でS3にデータをファイル保存せず直接アップロードする方法

こんにちは、臼田です。 Pythonを利用してS3にデータをアップロードする際、boto3を利用することになると思いますが、検索するとファイルからアップロードする方法がいっぱい出てきます。 でも、私はスクリプトの中で作成 […] 続きを読む

Firehoseで出力したファイルをAthenaのパーティション形式に変換するLambda

概要

FirehoseはデフォルトでYYYY/MM/DD/HHというプレフィクスを付けてS3にファイルを格納する。
このままでもAthenaでパーティションとして利用することはできるが、ディレクトリが作成されるたびにADD PARTITIONしなければならない。

Amazon Athenaのパーティションを理解する #reinvent

大量データをクエリする場合はググればEMRを使ったカラムナフォーマットへの変換記事がたくさんヒットするけど、ライトにAthenaを使いたい場合に有効。

新しいパーティションを勝手には読み込んでくれないのでクエリ実行前に以下のSQLを実行する必要があります。冪等なので毎回実行すればOKです。

MSCK REPAIR TABLE database_name.table_name;

Lambda Function

Athenaの自動パーティション認識を有効にするために、Hiveパーティションに対応した形式のディレクトリにコピーするLambda関数を作った。
さらにFirehoseのYYYY/MM/DD/HHはUTCのため、JST(+9:00)のYYYY-MM-DD/HHに変更する。

'use strict';
process.env.TZ = 'Asia/Tokyo'; // Timezoneを明示的にJSTに

const aws = require('aws-sdk');
const s3client = new aws.S3({ apiVersion: '2006-03-01' });
const path = require('path');

const toBucket = process.env.TO_BUCKET; // Lambda Functionの環境変数
const toPrefix = process.env.TO_PREFIX; // Lambda Functionの環境変数

function getS3EventKey(s3obj) {
    return decodeURIComponent(s3obj.object.key.replace(/\+/g, ' '));
}

function getCopySource(s3obj) {
    const srcBucket = s3obj.bucket.name;
    const srcKey = getS3EventKey(s3obj);
    return `${srcBucket}/${srcKey}`;
}

function getTodayPartition() {
    const today = new Date();
    const year = today.getFullYear();
    let month = today.getMonth() + 1;
    let day = today.getDate();
    let hour = today.getHours();
    if (month < 10) month = "0" + month;
    if (day < 10) day = "0" + day;
    if (hour < 10) hour = "0" + hour;
    // ここがキモ。dtとhourというパーティションが自動認識される
    return `dt=${year}-${month}-${day}/hour=${hour}`;
}

function getDestKey(s3obj) {
    const srcKey = getS3EventKey(s3obj);
    const filename = path.basename(srcKey);
    const todayPartition = getTodayPartition();
    return `${toPrefix}${todayPartition}/${filename}`;
}

exports.handler = (event, context, callback) => {
    const s3obj = event.Records[0].s3;
    const copySrc = getCopySource(s3obj);
    // ディレクトリは無視
    if (copySrc.endsWith("/")) {
        callback(null, `Skip, because ${copySrc} is directory.`);
        return;
    }
    const destKey = getDestKey(s3obj);
    const params = { CopySource: copySrc, Bucket: toBucket, Key: destKey };
    console.log(`s3://${copySrc} copy to s3://${toBucket}/${destKey}`);
    s3client.copyObject(params, (err, data) => {
        if (err) {
            console.log(err, err.stack);
            callback(err);
        } else {
            console.log(data);
            callback(null, data);
        }
    });
};

Firehoseが出力するS3バケットのCreate*イベントで実行するようにこのLambda関数を作成する。

このコードは東京リージョンには対応していません。

AWSの設定など

Lambdaの実行ロールに以下のポリシーを設定。(Listはいらんかもしれんけど念のため)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Enable S3 Read Permissions",
      "Effect": "Allow",
      "Action": [
        "s3:List*",
        "s3:Get*"
      ],
      "Resource": [
        "arn:aws:s3:::Firehoseが出力するバケット名",
        "arn:aws:s3:::Firehoseが出力するバケット名/*"
      ]
    },
    {
      "Sid": "Enable S3 Write Permissions",
      "Effect": "Allow",
      "Action": [
        "s3:List*",
        "s3:Put*"
      ],
      "Resource": [
        "arn:aws:s3:::Athenaが参照するバケット名",
        "arn:aws:s3:::Athenaが参照するバケット名/*"
      ]
    }
  ]
}

余談

FirehoseがDynamic Prefix対応してくれればこんなんしなくて済むんだけど。

参考リンク

続きを読む