KPL(Kinesis Producer Library)とは
Kinesisに送るとき、自動リトライしてくれたり、レコードをまとめてスループットを向上してくれたりするアプリケーション。Protobufを使っている。 普通に送るとどんなに小さくてもシャード*1000レコード/秒しか最大でPUTできないのを、KPLを使ってまとめることで増やすことができる。
fluentdで送る
aws-fluent-plugin-kinesisでkinesis_producer
を指定するとKPLを使って送信する。
<kinesis_producer>
の中にKPLの設定を書くことができる。
<kinesis_producer>
record_max_buffered_time 10
</kinesis_producer>
record_max_bufferd_time はバッファされたレコードが送られるまでの最大時間(ms)。デフォルトは100ms。この時間が経つか、他のリミットに当たったらレコードは送られる。
- AggregationMaxCount: 一つのレコードにまとめる最大レコード数
- AggregationMaxSize: まとめたレコードの最大バイト数
- CollectionMaxCount: PutRecordsで送る最大アイテム数
- CollectionMaxSize: PutRecordsで送るデータ量
CloudWatchに送るmetrics_levelはデフォルトでdetailedになっていて、
コンソールのメトリクスからstream名で検索すると
KinesisProducerLibrary
にUserRecordsPerKinesisRecord
や、UserRecordsDataPut
、BufferingTime
、RequestTime
などいろいろ表示される。
とりあえず試しにこんな設定で送ってみる。
<match hoge.log>
@type kinesis_producer
region ap-northeast-1
stream_name teststream
include_time_key true
flush_interval 1
buffer_chunk_limit 1m
try_flush_interval 0.1
queued_chunk_flush_interval 0.01
num_threads 15
</match>
Lambdaで読む
まとめられたレコードをkinesis-aggregationで分解して読む。 今回はNode.jsでやる。
$ npm install --save aws-kinesis-agg
注意する必要があるのはドキュメントの情報が古くて、 関数の引数が足りないこと。第二引数のcomputeChecksumsが抜けているので気付かないと一つずつずれていくことになる。
'use strict';
const agg = require('aws-kinesis-agg');
exports.handler = (event, context, callback) => {
Promise.all(
event.Records.map(
(record) => deaggregate(record)
)
).then(
(records) => {
// LambdaのNode.jsはまだ4.3なのでSpread operatorが使えない・・・
// const message = `${[].concat(...records).length} came in`;
let sumCount = 0;
records.forEach((r) => sumCount += r.length);
const message = `${records.length} aggregated records and ${sumCount} records come in`;
console.log(message);
callback(null, message);
},
(err) => callback(err)
);
};
function deaggregate(record){
return new Promise((resolve, reject) => {
agg.deaggregateSync(record.kinesis, true, (err, userRecords) => {
if (err) {
reject(err);
} else {
resolve(userRecords);
}
});
});
}
175レコードが10レコードにまとめられた。
10 aggregated records and 175 records come in
参考
Kinesis Producer Library(KPL)とfluentdとLambdaを連携してKinesisのスループットを上げる | Developers.IO