Kinesis Data AnalyticsのSQL, Lambdaへの出力とCDKによるリソースの作成

awsetl

Kinesis Data AnalyticsでStreaming SQLを実行し、 Lambdaに送る。ほかの接続先としてData StreamやFirehoseがあり、フォーマットはJSONとCSVから選べる。

Kinesis Streams/Firehose/Analyticsを試す - sambaiz-net

STREAMとPUMP

(in-application) STREAMを作成し、PUMPで他のSTREAMをSELECTした結果をINSERTすることでデータを流していく。出力先を設定する際にSTREAMを選ぶ。 STREAMはRDBのテーブルのようにカラムを持ち、JOINもできる。

CREATE OR REPLACE STREAM "TEMPSTREAM" ( 
   "column1" BIGINT NOT NULL, 
   "column2" INTEGER, 
   "column3" VARCHAR(64));

CREATE OR REPLACE PUMP "SAMPLEPUMP" AS 
INSERT INTO "TEMPSTREAM" ("column1", 
                          "column2", 
                          "column3") 
SELECT STREAM inputcolumn1, 
              inputcolumn2, 
              inputcolumn3
FROM "INPUTSTREAM";

Windowed Queries

固定の重複しない区間で集計するクエリ。次のように書くと60秒区切りで集計できる。

GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND)

ROWTIMEは 最初のSTREAMにデータが入った時のタイムスタンプが格納される特別なカラム。SELECTしなくても自動で渡される。

固定の区間ではなく最初に対象パーティションのデータが届くとそこからINTERVALの区間のWindowが始まる。 Tumbling Windowsではデータに含まれるタイムスタンプで集計する場合、遅れて届くことで同じ区間のレコードが分かれてしまう問題があるがそれを緩和できる。

WINDOWED BY STAGGER (
	PARTITION BY FLOOR(EVENT_TIME TO MINUTE), TICKER_SYMBOL 
	RANGE INTERVAL '1' MINUTE);

区間をずらしながら集計するクエリ。次のように書くと{2|10}個のデータの移動平均を出せる。

avg(price) over last2rows
avg(price) over last10rows
...
WINDOW 
	last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING),
	last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING)

Lambdaへの出力

データはbase64エンコードされているのでデコードして使う。 レスポンスでrecordIdresultを返す必要があり、失敗するとretryHintをインクリメントして再送する。 フォーマットが正しくないと OkRecords にも DeliveryFailedRecords にもカウントされないが、その場合でも再送される。

package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/lambda"
)

// https://docs.aws.amazon.com/kinesisanalytics/latest/dev/how-it-works-output-lambda.html
type KinesisDataAnalyticsInput struct {
	InvocationID   string                            `json:"invocationId"`
	ApplicationARN string                            `json:"applicationArn"`
	Records        []KinesisDataAnalyticsInputRecord `json:"records"`
}

type KinesisDataAnalyticsInputRecord struct {
	RecordID                     string                                  `json:"recordId"`
	LambdaDeliveryRecordMetadata KinesisDataAnalyticsInputRecordMetadata `json:"lambdaDeliveryRecordMetadata"`
	Data                         string                                  `json:"data"`
}

type KinesisDataAnalyticsInputRecordMetadata struct {
	RetryHint int `json: "retryHint"`
}


type KinesisDataAnalyticsOutputs struct {
	Records []KinesisAnalyticsOutput
}

type KinesisDataAnalyticsOutput struct {
	RecordID string `json:"recordId"`
	Result   string `json:"result"`
}

func handler(request KinesisDataAnalyticsInput) (interface{}, error) {
	var outputs []KinesisDataAnalyticsOutput
	for _, record := range request.Records {
		decoded, err := base64.StdEncoding.DecodeString(request.Records[0].Data)
		if err != nil {
			fmt.Println(err)
			outputs = append(outputs, KinesisDataAnalyticsOutput{
				RecordID: record.RecordID,
				Result:   "DeliveryFailed",
			})
			continue
		}
		outputs = append(outputs, KinesisDataAnalyticsOutput{
			RecordID: record.RecordID,
			Result:   "Ok",
		})
	}
	return KinesisDataAnalyticsOutputs{outputs}, nil
}

func main() {
	lambda.Start(handler)
}

CDKによるリソースの作成

LambdaはSAMなどで別に作成している前提で、Data StreamsとAnalyticsのApplicationをCDKで作成する。 Flinkにも対応しているV2系のリソースで作成したところ現状コンソール上からスキーマやリアルタイムのデータを見ることができなかった。

コンソール非対応

クエリがsyntax errorでもデプロイは成功してしまうので注意が必要。

import * as cdk from '@aws-cdk/core';
import { Stream } from '@aws-cdk/aws-kinesis';
import * as analytics from '@aws-cdk/aws-kinesisanalytics';
import { Role, ServicePrincipal, PolicyDocument, PolicyStatement, Effect } from '@aws-cdk/aws-iam'

export class CdkStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const lambdaArn = "<lambda_arn>"

    const stream = new Stream(this, "TestStream", {
      streamName: "test-stream",
      shardCount: 3,
    });

    const role = new Role(this, 'TestAnalyticsRole', {
      assumedBy: new ServicePrincipal('kinesisanalytics.amazonaws.com'),
      inlinePolicies: {
        'test-analytics-policy': new PolicyDocument({
          statements: [
            new PolicyStatement({
              effect: Effect.ALLOW,
              actions: [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
              ],
              resources: [
                stream.streamArn
              ]
            }),
            new PolicyStatement({
              effect: Effect.ALLOW,
              actions: [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
              ],
              resources: [
                lambdaArn
              ]
            })
          ]
        })
      }
    })

    const applicationName = "test-analytics-application"
    const application = new analytics.CfnApplicationV2(this, 'TestAnalyticsApplication', {
      applicationName: applicationName,
      applicationDescription: "test",
      runtimeEnvironment: "SQL-1_0",
      serviceExecutionRole: role.roleArn,
      applicationConfiguration: {
        sqlApplicationConfiguration: {
          inputs: [{
            namePrefix: "SOURCE_SQL_STREAM",
            inputSchema: {
              recordColumns: [{
                name: "column1",
                sqlType: "BIGINT",
                mapping: "$.column1",
              }],
              recordFormat: {
                recordFormatType: "JSON",
                mappingParameters: {
                  jsonMappingParameters: {
                    recordRowPath: "$",
                  }
                }
              } 
            },
            kinesisStreamsInput: {
              resourceArn: stream.streamArn,
            }
          }]
        },
        applicationCodeConfiguration: {
          codeContent: {
            textContent: `
            CREATE OR REPLACE STREAM "SUM_STREAM" (
                "cnt" BIGINT NOT NULL
            );

            CREATE OR REPLACE PUMP "SUM_PUMP" AS
                INSERT INTO "SUM_STREAM" ("cnt")
                SELECT STREAM count(1) AS cnt
                FROM "SOURCE_SQL_STREAM_001"
                GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '10' SECOND);
            `
          },
          codeContentType: "PLAINTEXT"
        }
      }
    })

    const applicationOutput = new analytics.CfnApplicationOutputV2(this, 'TestAnalyticsApplicationOutput', {
      applicationName: applicationName,
      output: {
        name: 'test_analytics_output',
        destinationSchema: {
          recordFormatType: "JSON"
        },
        lambdaOutput: {
          resourceArn: lambdaArn
        }
      }
    })
    applicationOutput.addDependsOn(application)
  }
}

非V2の方で書くとこんな感じ。

import * as cdk from '@aws-cdk/core';
import { Stream } from '@aws-cdk/aws-kinesis';
import * as analytics from '@aws-cdk/aws-kinesisanalytics';
import { Role, ServicePrincipal, PolicyDocument, PolicyStatement, Effect } from '@aws-cdk/aws-iam'

export class CdkStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    ...
    const applicationName = "test-analytics-application"
    const application = new analytics.CfnApplication(this, 'TestAnalyticsApplication', {
      applicationName: applicationName,
      applicationDescription: "test",
      applicationCode: `
      CREATE OR REPLACE STREAM "SUM_STREAM" (
          "cnt" BIGINT NOT NULL
      );

      CREATE OR REPLACE PUMP "SUM_PUMP" AS
          INSERT INTO "SUM_STREAM" ("cnt")
          SELECT STREAM count(1) AS cnt
          FROM "SOURCE_SQL_STREAM_001"
          GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '10' SECOND);
      `
      runtimeEnvironment: "SQL-1_0",
      inputs: [{
        namePrefix: "SOURCE_SQL_STREAM",
        inputSchema: {
          recordColumns: [{
            name: "column1",
            sqlType: "BIGINT",
            mapping: "$.column1",
          }],
          recordFormat: {
            recordFormatType: "JSON",
            mappingParameters: {
              jsonMappingParameters: {
                recordRowPath: "$",
              }
            }
          } 
        },
        kinesisStreamsInput: {
          resourceArn: stream.streamArn,
          roleArn: role.roleArn
        }
      }]
    })

    const applicationOutput = new analytics.CfnApplicationOutput(this, 'TestAnalyticsApplicationOutput', {
      applicationName: applicationName,
      output: {
        name: 'test_analytics_output',
        destinationSchema: {
          recordFormatType: "JSON"
        },
        lambdaOutput: {
          resourceArn: stream.streamArn,
          roleArn: role.roleArn
        }
      }
    })
    applicationOutput.addDependsOn(application)
  }
}