fluentdでKinesis Streamsに送ってLambdaで読んでS3に保存する

awsfluentdetl

aws-fluent-plugin-kinesisでKinesis Streamsに送り、Lambdaで読んでS3に保存する。 要するにFirehoseのようなことをやりたいのだけれどTokyoリージョンにまだ来ないので自分でやる。

fluentdで送る

$ td-agent-gem install fluent-plugin-kinesis

try_flush_intervalqueued_chunk_flush_intervalはドキュメントには載っていないが、 以下のページによるとそれぞれqueueに次のchunkがないときとあるときのflushする間隔。 いずれもデフォルトは1だが、これを減らすことでもっと頻繁に吐き出されるようになるらしい。

Fluentd の out_forward と BufferedOutput

あとシャードに振り分けるためのpartition_key を指定できる。デフォルトはランダム。

<source>
  @type tail
  path /var/log/td-agent/hoge.log
  pos_file /etc/td-agent/log.pos
  tag hoge.log
  format json

  time_key timestamp
  # 2017-01-01T01:01:01+0900
  time_format %Y-%m-%dT%H:%M:%S%z
</source>

<match hoge.log>
  @type kinesis_streams
  region ap-northeast-1
  stream_name teststream
  include_time_key true

  flush_interval 1
  buffer_chunk_limit 1m
  try_flush_interval 0.1
  queued_chunk_flush_interval 0.01
  num_threads 15
</match>

いくつか送ってみる。

for i in `seq 1 1000`
do
  echo '{"hoge": "fuga", "timestamp": "2017-01-01T01:01:01+0900"}' >> /var/log/td-agent/hoge.log
done

kinesisのシャードが足りないと詰まってしまうので注意。

FluentdとKPL(Kinesis Producer Library)でログをまとめてスループットを稼ぐ - sambaiz.net

Lambdaで読む

Lambdaのトリガーの設定でKinesisを選ぶと、バッチサイズや開始位置を設定できる。

トリガーの設定

コードはこんな感じ。

'use strict';

const zlib = require('zlib');
const aws = require('aws-sdk');
const s3 = new aws.S3({ apiVersion: '2006-03-01' });
const BUCKET_NAME = process.env.BUCKET_NAME; // 環境変数で設定する

exports.handler = (event, context, callback) => {

    const data = event.Records.map((record) => new Buffer(record.kinesis.data, 'base64').toString()).join("\n");
    const key = new Date().toISOString();
    
    putS3(key, data, true).then(
        (data) => callback(null, `Successfully processed ${event.Records.length} records.`),
        (err) => callback(err, null)
    );
};

function putS3(key, data, gzip){    
    return new Promise((resolve, reject) => {
        
        const params = {
            Bucket: BUCKET_NAME,
            Key: key
        };

        if(gzip){
            params.Body = zlib.gzipSync(data);
            params.ContentEncoding = "gzip";
        }else{
            params.Body = data;
        }
        
        s3.putObject(params, (err, data) => {
            if (err) reject(err);
            else resolve(data);
        });
    });
}

トリガーを有効にするとイベントが発火してS3に保存されるようになった。

ただ、Kinesisをイベントトリガーにして都度出力すると、1ファイルのサイズが非常に小さくなってしまう。 なんとかして都度出力しないようにするか、あるいは時間トリガーで実行するか、いずれにしてもどこまで読んだか記録しておかなくちゃいけないのでちょっと面倒だ。バッファリングしてくれるFirehoseが早く日本にも来て欲しい。