Kinesis Data AnalyticsでStreaming SQLを実行し、 Lambdaに送る。ほかの接続先としてData StreamやFirehoseがあり、フォーマットはJSONとCSVから選べる。
Kinesis Streams/Firehose/Analyticsを試す - sambaiz-net
STREAMとPUMP
(in-application) STREAMを作成し、PUMPで他のSTREAMをSELECTした結果をINSERTすることでデータを流していく。出力先を設定する際にSTREAMを選ぶ。 STREAMはRDBのテーブルのようにカラムを持ち、JOINもできる。
CREATE OR REPLACE STREAM "TEMPSTREAM" (
"column1" BIGINT NOT NULL,
"column2" INTEGER,
"column3" VARCHAR(64));
CREATE OR REPLACE PUMP "SAMPLEPUMP" AS
INSERT INTO "TEMPSTREAM" ("column1",
"column2",
"column3")
SELECT STREAM inputcolumn1,
inputcolumn2,
inputcolumn3
FROM "INPUTSTREAM";
Windowed Queries
固定の重複しない区間で集計するクエリ。次のように書くと60秒区切りで集計できる。
GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND)
ROWTIMEは 最初のSTREAMにデータが入った時のタイムスタンプが格納される特別なカラム。SELECTしなくても自動で渡される。
固定の区間ではなく最初に対象パーティションのデータが届くとそこからINTERVALの区間のWindowが始まる。 Tumbling Windowsではデータに含まれるタイムスタンプで集計する場合、遅れて届くことで同じ区間のレコードが分かれてしまう問題があるがそれを緩和できる。
WINDOWED BY STAGGER (
PARTITION BY FLOOR(EVENT_TIME TO MINUTE), TICKER_SYMBOL
RANGE INTERVAL '1' MINUTE);
区間をずらしながら集計するクエリ。次のように書くと{2|10}個のデータの移動平均を出せる。
avg(price) over last2rows
avg(price) over last10rows
...
WINDOW
last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING),
last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING)
Lambdaへの出力
データはbase64エンコードされているのでデコードして使う。
レスポンスでrecordId
とresult
を返す必要があり、失敗するとretryHintをインクリメントして再送する。
フォーマットが正しくないと OkRecords
にも DeliveryFailedRecords
にもカウントされないが、その場合でも再送される。
package main
import (
"encoding/base64"
"fmt"
"github.com/aws/aws-lambda-go/lambda"
)
// https://docs.aws.amazon.com/kinesisanalytics/latest/dev/how-it-works-output-lambda.html
type KinesisDataAnalyticsInput struct {
InvocationID string `json:"invocationId"`
ApplicationARN string `json:"applicationArn"`
Records []KinesisDataAnalyticsInputRecord `json:"records"`
}
type KinesisDataAnalyticsInputRecord struct {
RecordID string `json:"recordId"`
LambdaDeliveryRecordMetadata KinesisDataAnalyticsInputRecordMetadata `json:"lambdaDeliveryRecordMetadata"`
Data string `json:"data"`
}
type KinesisDataAnalyticsInputRecordMetadata struct {
RetryHint int `json: "retryHint"`
}
type KinesisDataAnalyticsOutputs struct {
Records []KinesisAnalyticsOutput
}
type KinesisDataAnalyticsOutput struct {
RecordID string `json:"recordId"`
Result string `json:"result"`
}
func handler(request KinesisDataAnalyticsInput) (interface{}, error) {
var outputs []KinesisDataAnalyticsOutput
for _, record := range request.Records {
decoded, err := base64.StdEncoding.DecodeString(request.Records[0].Data)
if err != nil {
fmt.Println(err)
outputs = append(outputs, KinesisDataAnalyticsOutput{
RecordID: record.RecordID,
Result: "DeliveryFailed",
})
continue
}
outputs = append(outputs, KinesisDataAnalyticsOutput{
RecordID: record.RecordID,
Result: "Ok",
})
}
return KinesisDataAnalyticsOutputs{outputs}, nil
}
func main() {
lambda.Start(handler)
}
CDKによるリソースの作成
LambdaはSAMなどで別に作成している前提で、Data StreamsとAnalyticsのApplicationをCDKで作成する。 Flinkにも対応しているV2系のリソースで作成したところ現状コンソール上からスキーマやリアルタイムのデータを見ることができなかった。
クエリがsyntax errorでもデプロイは成功してしまうので注意が必要。
import * as cdk from '@aws-cdk/core';
import { Stream } from '@aws-cdk/aws-kinesis';
import * as analytics from '@aws-cdk/aws-kinesisanalytics';
import { Role, ServicePrincipal, PolicyDocument, PolicyStatement, Effect } from '@aws-cdk/aws-iam'
export class CdkStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
const lambdaArn = "<lambda_arn>"
const stream = new Stream(this, "TestStream", {
streamName: "test-stream",
shardCount: 3,
});
const role = new Role(this, 'TestAnalyticsRole', {
assumedBy: new ServicePrincipal('kinesisanalytics.amazonaws.com'),
inlinePolicies: {
'test-analytics-policy': new PolicyDocument({
statements: [
new PolicyStatement({
effect: Effect.ALLOW,
actions: [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords"
],
resources: [
stream.streamArn
]
}),
new PolicyStatement({
effect: Effect.ALLOW,
actions: [
"lambda:InvokeFunction",
"lambda:GetFunctionConfiguration"
],
resources: [
lambdaArn
]
})
]
})
}
})
const applicationName = "test-analytics-application"
const application = new analytics.CfnApplicationV2(this, 'TestAnalyticsApplication', {
applicationName: applicationName,
applicationDescription: "test",
runtimeEnvironment: "SQL-1_0",
serviceExecutionRole: role.roleArn,
applicationConfiguration: {
sqlApplicationConfiguration: {
inputs: [{
namePrefix: "SOURCE_SQL_STREAM",
inputSchema: {
recordColumns: [{
name: "column1",
sqlType: "BIGINT",
mapping: "$.column1",
}],
recordFormat: {
recordFormatType: "JSON",
mappingParameters: {
jsonMappingParameters: {
recordRowPath: "$",
}
}
}
},
kinesisStreamsInput: {
resourceArn: stream.streamArn,
}
}]
},
applicationCodeConfiguration: {
codeContent: {
textContent: `
CREATE OR REPLACE STREAM "SUM_STREAM" (
"cnt" BIGINT NOT NULL
);
CREATE OR REPLACE PUMP "SUM_PUMP" AS
INSERT INTO "SUM_STREAM" ("cnt")
SELECT STREAM count(1) AS cnt
FROM "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '10' SECOND);
`
},
codeContentType: "PLAINTEXT"
}
}
})
const applicationOutput = new analytics.CfnApplicationOutputV2(this, 'TestAnalyticsApplicationOutput', {
applicationName: applicationName,
output: {
name: 'test_analytics_output',
destinationSchema: {
recordFormatType: "JSON"
},
lambdaOutput: {
resourceArn: lambdaArn
}
}
})
applicationOutput.addDependsOn(application)
}
}
非V2の方で書くとこんな感じ。
import * as cdk from '@aws-cdk/core';
import { Stream } from '@aws-cdk/aws-kinesis';
import * as analytics from '@aws-cdk/aws-kinesisanalytics';
import { Role, ServicePrincipal, PolicyDocument, PolicyStatement, Effect } from '@aws-cdk/aws-iam'
export class CdkStack extends cdk.Stack {
constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
...
const applicationName = "test-analytics-application"
const application = new analytics.CfnApplication(this, 'TestAnalyticsApplication', {
applicationName: applicationName,
applicationDescription: "test",
applicationCode: `
CREATE OR REPLACE STREAM "SUM_STREAM" (
"cnt" BIGINT NOT NULL
);
CREATE OR REPLACE PUMP "SUM_PUMP" AS
INSERT INTO "SUM_STREAM" ("cnt")
SELECT STREAM count(1) AS cnt
FROM "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '10' SECOND);
`
runtimeEnvironment: "SQL-1_0",
inputs: [{
namePrefix: "SOURCE_SQL_STREAM",
inputSchema: {
recordColumns: [{
name: "column1",
sqlType: "BIGINT",
mapping: "$.column1",
}],
recordFormat: {
recordFormatType: "JSON",
mappingParameters: {
jsonMappingParameters: {
recordRowPath: "$",
}
}
}
},
kinesisStreamsInput: {
resourceArn: stream.streamArn,
roleArn: role.roleArn
}
}]
})
const applicationOutput = new analytics.CfnApplicationOutput(this, 'TestAnalyticsApplicationOutput', {
applicationName: applicationName,
output: {
name: 'test_analytics_output',
destinationSchema: {
recordFormatType: "JSON"
},
lambdaOutput: {
resourceArn: stream.streamArn,
roleArn: role.roleArn
}
}
})
applicationOutput.addDependsOn(application)
}
}