aws-fluent-plugin-kinesisでKinesis Streamsに送り、Lambdaで読んでS3に保存する。 要するにFirehoseのようなことをやりたいのだけれどTokyoリージョンにまだ来ないので自分でやる。
fluentdで送る
$ td-agent-gem install fluent-plugin-kinesis
try_flush_interval
とqueued_chunk_flush_interval
はドキュメントには載っていないが、
以下のページによるとそれぞれqueueに次のchunkがないときとあるときのflushする間隔。
いずれもデフォルトは1だが、これを減らすことでもっと頻繁に吐き出されるようになるらしい。
Fluentd の out_forward と BufferedOutput
あとシャードに振り分けるためのpartition_key を指定できる。デフォルトはランダム。
<source>
@type tail
path /var/log/td-agent/hoge.log
pos_file /etc/td-agent/log.pos
tag hoge.log
format json
time_key timestamp
# 2017-01-01T01:01:01+0900
time_format %Y-%m-%dT%H:%M:%S%z
</source>
<match hoge.log>
@type kinesis_streams
region ap-northeast-1
stream_name teststream
include_time_key true
flush_interval 1
buffer_chunk_limit 1m
try_flush_interval 0.1
queued_chunk_flush_interval 0.01
num_threads 15
</match>
いくつか送ってみる。
for i in `seq 1 1000`
do
echo '{"hoge": "fuga", "timestamp": "2017-01-01T01:01:01+0900"}' >> /var/log/td-agent/hoge.log
done
kinesisのシャードが足りないと詰まってしまうので注意。
FluentdとKPL(Kinesis Producer Library)でログをまとめてスループットを稼ぐ - sambaiz.net
Lambdaで読む
Lambdaのトリガーの設定でKinesisを選ぶと、バッチサイズや開始位置を設定できる。
コードはこんな感じ。
'use strict';
const zlib = require('zlib');
const aws = require('aws-sdk');
const s3 = new aws.S3({ apiVersion: '2006-03-01' });
const BUCKET_NAME = process.env.BUCKET_NAME; // 環境変数で設定する
exports.handler = (event, context, callback) => {
const data = event.Records.map((record) => new Buffer(record.kinesis.data, 'base64').toString()).join("\n");
const key = new Date().toISOString();
putS3(key, data, true).then(
(data) => callback(null, `Successfully processed ${event.Records.length} records.`),
(err) => callback(err, null)
);
};
function putS3(key, data, gzip){
return new Promise((resolve, reject) => {
const params = {
Bucket: BUCKET_NAME,
Key: key
};
if(gzip){
params.Body = zlib.gzipSync(data);
params.ContentEncoding = "gzip";
}else{
params.Body = data;
}
s3.putObject(params, (err, data) => {
if (err) reject(err);
else resolve(data);
});
});
}
トリガーを有効にするとイベントが発火してS3に保存されるようになった。
ただ、Kinesisをイベントトリガーにして都度出力すると、1ファイルのサイズが非常に小さくなってしまう。 なんとかして都度出力しないようにするか、あるいは時間トリガーで実行するか、いずれにしてもどこまで読んだか記録しておかなくちゃいけないのでちょっと面倒だ。バッファリングしてくれるFirehoseが早く日本にも来て欲しい。