Kinesis Data AnalyticsのSQLとLambdaとの接続

(2020-10-03)

Kinesis Data AnalyticsでStreaming SQLを実行し、 Lambdaに送る。ほかの接続先としてData StreamやFirehoseがあり、フォーマットはJSONとCSVから選べる。

Kinesis Streams/Firehose/Analyticsを試す - sambaiz-net

STREAMとPUMP

(in-application) STREAMを作成し、PUMPで他のSTREAMをSELECTした結果をINSERTすることでデータを流していく。出力先を設定する際にSTREAMを選ぶ。 STREAMはRDBのテーブルのようにカラムを持ち、JOINもできる。

CREATE OR REPLACE STREAM "TEMPSTREAM" ( 
   "column1" BIGINT NOT NULL, 
   "column2" INTEGER, 
   "column3" VARCHAR(64));

CREATE OR REPLACE PUMP "SAMPLEPUMP" AS 
INSERT INTO "TEMPSTREAM" ("column1", 
                          "column2", 
                          "column3") 
SELECT STREAM inputcolumn1, 
              inputcolumn2, 
              inputcolumn3
FROM "INPUTSTREAM";

Windowed Queries

固定の重複しない区間で集計するクエリ。次のように書くと60秒区切りで集計できる。

GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND)

ROWTIMEは 最初のSTREAMにデータが入った時のタイムスタンプが格納される特別なカラム。SELECTしなくても自動で渡される。

固定の区間ではなく最初に対象パーティションのデータが届くとそこからINTERVALの区間のWindowが始まる。 Tumbling Windowsではデータに含まれるタイムスタンプで集計する場合、遅れて届くことで同じ区間のレコードが分かれてしまう問題があるがそれを緩和できる。

WINDOWED BY STAGGER (
	PARTITION BY FLOOR(EVENT_TIME TO MINUTE), TICKER_SYMBOL 
	RANGE INTERVAL '1' MINUTE);

区間をずらしながら集計するクエリ。次のように書くと{2|10}個のデータの移動平均を出せる。

avg(price) over last2rows
avg(price) over last10rows
...
WINDOW 
	last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING),
	last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING)

Lambdaとの接続

データはbase64エンコードされているのでデコードして使う。

package main

import (
	"encoding/base64"
	"fmt"

	"github.com/aws/aws-lambda-go/lambda"
)

// https://docs.aws.amazon.com/kinesisanalytics/latest/dev/how-it-works-output-lambda.html
type KinesisDataAnalyticsInput struct {
	InvocationID   string                            `json: "invocationId"`
	ApplicationARN string                            `json: "applicationArn"`
	Records        []KinesisDataAnalyticsInputRecord `json: "records"`
}

type KinesisDataAnalyticsInputRecord struct {
	RecordID                     string                                  `json: "recordId"`
	LambdaDeliveryRecordMetadata KinesisDataAnalyticsInputRecordMetadata `json: "lambdaDeliveryRecordMetadata"`
	Data                         string                                  `json: "data"`
}

type KinesisDataAnalyticsInputRecordMetadata struct {
	RetryHint int `json: "retryHint"`
}

func handler(request KinesisDataAnalyticsInput) (interface{}, error) {
	decoded, err := base64.StdEncoding.DecodeString(request.Records[0].Data)
	if err != nil {
		return nil, err
	}
	fmt.Printf("request: %+v, body[0]: %s", request, string(decoded))
	return "ok", nil
}

func main() {
	lambda.Start(handler)
}