Kinesis Data Analytics (KDA)はマッピングしたカラムに加えて、Kinesis Data Streams (KDS)に入った時間 APPROXIMATE_ARRIVAL_TIME
とKDAのin-app STREAMに入った時間 ROWTIME
をソースに含める。これらとログのタイムスタンプを合わせることで集計遅延が起きた際にどこが原因になっているかを特定することができる。
Log aggregatorでのタイムスタンプの付加
集計対象のログを集約サーバーを中継して送っている場合、そこでのタイムスタンプを付加しておくとバッファリングによる滞りを検知できる。 fluentdのmonitor_agentの値をDatadogなどに送って監視することもできるが、 集計ウィンドウが短い場合、collection_intervalの関係で、正確な状態を把握しづらいことがある。
fluentdのmonitor_agentのデータをGoでGoogle Stackdriverに送って監視する - sambaiz-net
レコードへの付加にはfluentdのコアに含まれているrecord_transformerを使うこともできるが、性能が良いfluent-plugin-record-modifierという選択肢もある。
fluentdのrecord_transformerでログを加工する - sambaiz-net
<filter test.log>
@type record_modifier
<record>
ts_aggr ${Time.now().strftime('%Y-%m-%dT%H:%M:%S.%L%z')}
</record>
</filter>
また、ログのタイムスタンプをtime_keyとしている場合、keep_time_key true
にするか<inject>
してKDSに送られるようにする。
fluentdのとでtime_formatを指定しなかった場合の挙動と内部処理 - sambaiz-net
TIMESTAMPのパース
タイムスタンプ文字列をTIMESTAMP型のカラムにマッピングすることもできるが、2021-01-01T00:00:00.000+0900
形式の文字列をマッピングしたところ、UTCで18時間後ろにずれた時間になってしまった。
そこで一度CHARにマッピングしてからフォーマットを明示してCHAR_TO_TIMESTAMP(’<format_string>’,“column_name”)することにした。
<format_string>
はJavaのSimpleDateFormatで記述し、この例の場合はyyyy-MM-dd''T''HH:mm:ss.SSSZ
のようになる。
マッピングの変更後にSourcesの方に何も出てこなくなった場合は、クエリの方で問題が起きている可能性があるので、コンソール上でクエリを保存してみてエラーにならないか確認し、必要なら修正する。
クエリ
Kinesis Data AnalyticsのSQL, Lambdaへの出力とCDKによるリソースの作成 - sambaiz-net
各タイムスタンプのMINを取って現在時刻との差を取れば、そのウインドウ集計対象のログのうち最も古いものの各箇所からのレイテンシが出せる。
CREATE OR REPLACE PUMP "TEST_PUMP" AS
INSERT INTO "TEST_STREAM" ("min_ts", "min_ts_kds", "min_ts_kda")
SELECT STREAM
MIN(CHAR_TO_TIMESTAMP('yyyy-MM-dd''T''HH:mm:ss.SSSZ', "ts")) as min_ts,
MIN("APPROXIMATE_ARRIVAL_TIME") as min_ts_kds,
"ROWTIME" as min_ts_kda
FROM "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '10' SECOUND)
メトリクス
関連するKDAのメトリクスとしてストリームからの読み込みのレイテンシであるMillisBehindLatest
がある。これが通常の読み込みの頻度である1000msを超えて上昇し続けている場合、InputBytesのスループットやdestinationへの送信に問題がないかを確認する。なお、この1000msという値はKDSで推奨されている読み込み頻度で、GetRecords
APIの呼び出し上限に当たるのを防いでいる。