KDAがPyFlinkをサポートしたのでCDKで構築して動かしてみる。 全体のコードはGitHubにある。
Kinesis Data AnalyticsのSQL, Lambdaへの出力とCDKによるリソースの作成 - sambaiz-net
今回動かすのは次の、KDSから流れてきたデータにクエリを実行し、その結果をS3に書き込む簡単なコード。
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.window import Tumble
import os
import json
APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" # on kda
def get_application_properties():
if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
contents = file.read()
properties = json.loads(contents)
return properties
else:
print('A file at "{}" was not found'.format(
APPLICATION_PROPERTIES_FILE_PATH))
def property_map(props, property_group_id):
for prop in props:
if prop["PropertyGroupId"] == property_group_id:
return prop["PropertyMap"]
def main():
table_env = StreamTableEnvironment.create(
environment_settings=EnvironmentSettings.new_instance(
).in_streaming_mode().use_blink_planner().build()
)
props = get_application_properties()
input_property_map = property_map(props, "consumer.config.0")
output_property_map = property_map(props, "sink.config.0")
input_table_name = "input_table"
output_table_name = "output_table"
table_env.execute_sql(
"""CREATE TABLE {0} (
foo BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = '{1}',
'aws.region' = '{2}',
'scan.stream.initpos' = '{3}',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)""".format(
input_table_name,
input_property_map["input.stream.name"],
input_property_map["aws.region"],
input_property_map["flink.stream.initpos"])
)
table_env.execute_sql(
"""CREATE TABLE {0} (
bar BIGINT
) WITH (
'connector' = 'filesystem',
'path' = 's3a://{1}/',
'format' = 'csv',
'sink.partition-commit.policy.kind' = 'success-file',
'sink.partition-commit.delay' = '1 min'
)""".format(
output_table_name,
output_property_map["output.bucket.name"])
)
table_env.execute_sql(
"""INSERT INTO {0}
SELECT foo as bar
FROM {1}
""".format(output_table_name, input_table_name)
)
if __name__ == "__main__":
main()
次のようにpythonファイルとconnectorのJARをpropertyGroupで指定する。
{
propertyGroupId: "kinesis.analytics.flink.run.options",
propertyMap: {
"python": "kda-app.py",
"jarfile": "amazon-kinesis-sql-connector-flink-2.0.3.jar",
}
},
{
propertyGroupId: "consumer.config.0",
propertyMap: {
"input.stream.name": srcStream.streamName,
"aws.region": this.region,
"flink.stream.initpos": "LATEST",
}
},
{
propertyGroupId: "sink.config.0",
propertyMap: {
"output.bucket.name": destBucket.bucketName,
}
}
JARはmavenから持ってきてパッケージに含める。
amazon-kinesis-connector-flink
というのもあるが、2.0.3
時点では依存が足りずそのままでは動かない。
$ mvn dependency:get -Dartifact=software.amazon.kinesis:amazon-kinesis-sql-connector-flink:2.0.3 -Ddest=./kda-app
デプロイ完了後実行する。
アプリケーショングラフに何も表示されない場合はログを確認する。そのままだと見づらいので次のように整形すると良い。
$ export LOG_GROUP_NAME=**** LOG_STREAM_NAME=*****
$ aws logs get-log-events --log-group-name ${LOG_GROUP_NAME} --log-stream-name ${LOG_STREAM_NAME} | jq -r '.events[] | (.timestamp | ./1000 | todate) + "\t" + (.message | fromjson | .messageType + "\t" + .message)'
...
2021-04-24T12:57:52Z INFO Triggering checkpoint 28 (type=CHECKPOINT) @ 1619269072699 for job 8c9c84e4c6183d644fdd765c0ba3b31c.
2021-04-24T12:57:53Z INFO Completed checkpoint 28 for job 8c9c84e4c6183d644fdd765c0ba3b31c (3748 bytes in 395 ms).
2021-04-24T12:57:53Z INFO Subtask 0 received completion notification for checkpoint with id=28.
2021-04-24T12:58:52Z INFO Triggering checkpoint 29 (type=CHECKPOINT) @ 1619269132699 for job 8c9c84e4c6183d644fdd765c0ba3b31c.
2021-04-24T12:58:52Z INFO Subtask 0 checkpointing for checkpoint with id=29 (max part counter=0).
2021-04-24T12:58:53Z INFO Completed checkpoint 29 for job 8c9c84e4c6183d644fdd765c0ba3b31c (3748 bytes in 375 ms).
2021-04-24T12:58:53Z INFO Subtask 0 received completion notification for checkpoint with id=29.
Kinesis Data GeneratorでKDSにデータを流し込んで、S3に保存されていたら成功だ。もし何も保存されていない場合はFlinkのダッシュボードに何か出ていないか確認する。