FluentdとKPL(Kinesis Producer Library)でログをまとめてスループットを稼ぐ

fluentdaws

KPL(Kinesis Producer Library)とは

Developing Amazon Kinesis Streams Producers Using the Amazon Kinesis Producer Library - Amazon Kinesis Streams

Kinesisに送るとき、自動リトライしてくれたり、レコードをまとめてスループットを向上してくれたりするアプリケーション。Protobufを使っている。 普通に送るとどんなに小さくてもシャード*1000レコード/秒しか最大でPUTできないのを、KPLを使ってまとめることで増やすことができる。

fluentdで送る

aws-fluent-plugin-kinesiskinesis_producerを指定するとKPLを使って送信する。

<kinesis_producer>の中にKPLの設定を書くことができる。

<kinesis_producer>
    record_max_buffered_time 10
</kinesis_producer>

record_max_bufferd_time はバッファされたレコードが送られるまでの最大時間(ms)。デフォルトは100ms。この時間が経つか、他のリミットに当たったらレコードは送られる。

CloudWatchに送るmetrics_levelはデフォルトでdetailedになっていて、 コンソールのメトリクスからstream名で検索すると KinesisProducerLibraryUserRecordsPerKinesisRecordや、UserRecordsDataPutBufferingTimeRequestTimeなどいろいろ表示される。

とりあえず試しにこんな設定で送ってみる。

<match hoge.log>
  @type kinesis_producer
  region ap-northeast-1
  stream_name teststream
  include_time_key true

  flush_interval 1
  buffer_chunk_limit 1m
  try_flush_interval 0.1
  queued_chunk_flush_interval 0.01
  num_threads 15
</match>

Lambdaで読む

まとめられたレコードをkinesis-aggregationで分解して読む。 今回はNode.jsでやる。

$ npm install --save aws-kinesis-agg

注意する必要があるのはドキュメントの情報が古くて、 関数の引数が足りないこと。第二引数のcomputeChecksumsが抜けているので気付かないと一つずつずれていくことになる。

'use strict';

const agg = require('aws-kinesis-agg');

exports.handler = (event, context, callback) => {
    Promise.all(
        event.Records.map(
            (record) => deaggregate(record)
        )
    ).then(
        (records) => {
            // LambdaのNode.jsはまだ4.3なのでSpread operatorが使えない・・・
            // const message = `${[].concat(...records).length} came in`; 
            let sumCount = 0;
            records.forEach((r) => sumCount += r.length);
            const message = `${records.length} aggregated records and ${sumCount} records come in`; 
            console.log(message);
            callback(null, message);
        },
        (err) => callback(err)
    );
};

function deaggregate(record){
    return new Promise((resolve, reject) => {
        agg.deaggregateSync(record.kinesis, true, (err, userRecords) => {
            if (err) {
                reject(err);
            } else {
                resolve(userRecords);
            }
        });
    });
}

175レコードが10レコードにまとめられた。

10 aggregated records and 175 records come in

参考

Kinesis Producer Library(KPL)とfluentdとLambdaを連携してKinesisのスループットを上げる | Developers.IO