Kinesis Data Analyticsによる集計遅延箇所の特定

awsmonitoringfluentd

Kinesis Data Analytics (KDA)はマッピングしたカラムに加えて、Kinesis Data Streams (KDS)に入った時間 APPROXIMATE_ARRIVAL_TIME とKDAのin-app STREAMに入った時間 ROWTIME をソースに含める。これらとログのタイムスタンプを合わせることで集計遅延が起きた際にどこが原因になっているかを特定することができる。

Log aggregatorでのタイムスタンプの付加

集計対象のログを集約サーバーを中継して送っている場合、そこでのタイムスタンプを付加しておくとバッファリングによる滞りを検知できる。 fluentdのmonitor_agentの値をDatadogなどに送って監視することもできるが、 集計ウィンドウが短い場合、collection_intervalの関係で、正確な状態を把握しづらいことがある。

fluentdのmonitor_agentのデータをGoでGoogle Stackdriverに送って監視する - sambaiz-net

レコードへの付加にはfluentdのコアに含まれているrecord_transformerを使うこともできるが、性能が良いfluent-plugin-record-modifierという選択肢もある。

fluentdのrecord_transformerでログを加工する - sambaiz-net

<filter test.log>
  @type record_modifier
  <record>
    ts_aggr ${Time.now().strftime('%Y-%m-%dT%H:%M:%S.%L%z')}
  </record>
</filter>

また、ログのタイムスタンプをtime_keyとしている場合、keep_time_key trueにするか<inject>してKDSに送られるようにする。

fluentdのでtime_formatを指定しなかった場合の挙動と内部処理 - sambaiz-net

TIMESTAMPのパース

タイムスタンプ文字列をTIMESTAMP型のカラムにマッピングすることもできるが、2021-01-01T00:00:00.000+0900形式の文字列をマッピングしたところ、UTCで18時間後ろにずれた時間になってしまった。

タイムスタンプ文字列とマッピングしたTIMESTAMP型のカラム

そこで一度CHARにマッピングしてからフォーマットを明示してCHAR_TO_TIMESTAMP(’<format_string>’,“column_name”)することにした。 <format_string>はJavaのSimpleDateFormatで記述し、この例の場合はyyyy-MM-dd''T''HH:mm:ss.SSSZのようになる。

マッピングの変更後にSourcesの方に何も出てこなくなった場合は、クエリの方で問題が起きている可能性があるので、コンソール上でクエリを保存してみてエラーにならないか確認し、必要なら修正する。

クエリ

Kinesis Data AnalyticsのSQL, Lambdaへの出力とCDKによるリソースの作成 - sambaiz-net

各タイムスタンプのMINを取って現在時刻との差を取れば、そのウインドウ集計対象のログのうち最も古いものの各箇所からのレイテンシが出せる。

CREATE OR REPLACE PUMP "TEST_PUMP" AS 
  INSERT INTO "TEST_STREAM" ("min_ts", "min_ts_kds", "min_ts_kda")
  SELECT STREAM 
    MIN(CHAR_TO_TIMESTAMP('yyyy-MM-dd''T''HH:mm:ss.SSSZ', "ts")) as min_ts,
    MIN("APPROXIMATE_ARRIVAL_TIME") as min_ts_kds,
    "ROWTIME" as min_ts_kda
  FROM "SOURCE_SQL_STREAM_001"
  GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '10' SECOUND)

メトリクス

関連するKDAのメトリクスとしてストリームからの読み込みのレイテンシであるMillisBehindLatestがある。これが通常の読み込みの頻度である1000msを超えて上昇し続けている場合、InputBytesのスループットやdestinationへの送信に問題がないかを確認する。なお、この1000msという値はKDSで推奨されている読み込み頻度で、GetRecords APIの呼び出し上限に当たるのを防いでいる。