CDKでKinesis Data Analytics上にPyFlinkのコードをデプロイして動かす

awsetl

KDAがPyFlinkをサポートしたのでCDKで構築して動かしてみる。 全体のコードはGitHubにある。

Kinesis Data AnalyticsのSQL, Lambdaへの出力とCDKによるリソースの作成 - sambaiz-net

今回動かすのは次の、KDSから流れてきたデータにクエリを実行し、その結果をS3に書き込む簡単なコード。

from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.window import Tumble
import os
import json

APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json"  # on kda


def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(
            APPLICATION_PROPERTIES_FILE_PATH))


def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]


def main():
    table_env = StreamTableEnvironment.create(
        environment_settings=EnvironmentSettings.new_instance(
        ).in_streaming_mode().use_blink_planner().build()
    )

    props = get_application_properties()
    input_property_map = property_map(props, "consumer.config.0")
    output_property_map = property_map(props, "sink.config.0")

    input_table_name = "input_table"
    output_table_name = "output_table"

    table_env.execute_sql(
        """CREATE TABLE {0} (
                foo BIGINT
            ) WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
            )""".format(
            input_table_name,
            input_property_map["input.stream.name"],
            input_property_map["aws.region"],
            input_property_map["flink.stream.initpos"])
    )

    table_env.execute_sql(
        """CREATE TABLE {0} (
                bar BIGINT
            ) WITH (
                'connector' = 'filesystem',
                'path' = 's3a://{1}/',
                'format' = 'csv',
                'sink.partition-commit.policy.kind' = 'success-file',
                'sink.partition-commit.delay' = '1 min'
            )""".format(
            output_table_name,
            output_property_map["output.bucket.name"])
    )

    table_env.execute_sql(
        """INSERT INTO {0}
            SELECT foo as bar
            FROM {1}
        """.format(output_table_name, input_table_name)
    )


if __name__ == "__main__":
    main()

次のようにpythonファイルとconnectorのJARをpropertyGroupで指定する

{
    propertyGroupId: "kinesis.analytics.flink.run.options",
    propertyMap: {
        "python": "kda-app.py",
        "jarfile": "amazon-kinesis-sql-connector-flink-2.0.3.jar",
    }
}, 
{
    propertyGroupId: "consumer.config.0",
    propertyMap: {
        "input.stream.name": srcStream.streamName,
        "aws.region": this.region,
        "flink.stream.initpos": "LATEST",
    }
},
{
    propertyGroupId: "sink.config.0",
    propertyMap: {
        "output.bucket.name": destBucket.bucketName,
    }
}

JARはmavenから持ってきてパッケージに含める。 amazon-kinesis-connector-flinkというのもあるが、2.0.3時点では依存が足りずそのままでは動かない

$ mvn dependency:get -Dartifact=software.amazon.kinesis:amazon-kinesis-sql-connector-flink:2.0.3 -Ddest=./kda-app

デプロイ完了後実行する。

アプリケーショングラフ

アプリケーショングラフに何も表示されない場合はログを確認する。そのままだと見づらいので次のように整形すると良い。

$ export LOG_GROUP_NAME=**** LOG_STREAM_NAME=*****
$ aws logs get-log-events --log-group-name ${LOG_GROUP_NAME} --log-stream-name ${LOG_STREAM_NAME} | jq -r '.events[] | (.timestamp | ./1000 | todate) + "\t" + (.message | fromjson | .messageType + "\t" + .message)'
...
2021-04-24T12:57:52Z	INFO	Triggering checkpoint 28 (type=CHECKPOINT) @ 1619269072699 for job 8c9c84e4c6183d644fdd765c0ba3b31c.
2021-04-24T12:57:53Z	INFO	Completed checkpoint 28 for job 8c9c84e4c6183d644fdd765c0ba3b31c (3748 bytes in 395 ms).
2021-04-24T12:57:53Z	INFO	Subtask 0 received completion notification for checkpoint with id=28.
2021-04-24T12:58:52Z	INFO	Triggering checkpoint 29 (type=CHECKPOINT) @ 1619269132699 for job 8c9c84e4c6183d644fdd765c0ba3b31c.
2021-04-24T12:58:52Z	INFO	Subtask 0 checkpointing for checkpoint with id=29 (max part counter=0).
2021-04-24T12:58:53Z	INFO	Completed checkpoint 29 for job 8c9c84e4c6183d644fdd765c0ba3b31c (3748 bytes in 375 ms).
2021-04-24T12:58:53Z	INFO	Subtask 0 received completion notification for checkpoint with id=29.

Kinesis Data GeneratorでKDSにデータを流し込んで、S3に保存されていたら成功だ。もし何も保存されていない場合はFlinkのダッシュボードに何か出ていないか確認する。

Apache Flink Dashboard