ファイルを置くだけで読むことができる通常のテーブルと異なり、Iceberg テーブルはメタデータを更新するために INSERT する必要がある。これをバッチ処理で行うと最新のデータが読めるまでにラグが発生してしまうので、ラグを最小限にするため Kinesis Data Streams などを介して自前でストリーミング処理することもできるが、Firehose の出力先として Iceberg テーブルを設定すると容易に書き込むことができる。
Firehose に Direct PUT すると KDS を挟む必要がないが、スループット上限に当たらないか注意する。1 レコードに複数の JSON を含めるといったこともできないので回避が難しい。なお、KDS をデータソースとする場合は KPL での集約ができる。
FluentdとKPL(Kinesis Producer Library)でログをまとめてスループットを稼ぐ - sambaiz-net
1 レコード 1 JSON だと Firehose の取り込み料金が高くつきそうだが、Iceberg においては 5 KB 単位での切り上げが行われないようになっている。
権限を付与した Stream を作成する。この際、出力先のテーブルが存在しなかったり参照できないとエラーになる。
Iceberg テーブルを Glue Data Catalog に登録して Athena や Snowflake からクエリする - sambaiz-net
const database = new glue.CfnDatabase(this, "Database", {
catalogId: this.account,
databaseInput: {
name: databaseName,
}
})
const table = new glue.CfnTable(this, "Table", {
databaseName: database.ref,
catalogId: this.account,
tableInput: {
name: tableName,
storageDescriptor: {
columns: [
{ name: "remote_addr", type: "string" },
{ name: "remote_user", type: "string" },
{ name: "time_local", type: "string" },
{ name: "request", type: "string" },
{ name: "status", type: "int" },
{ name: "body_bytes_sent", type: "int" },
{ name: "http_referer", type: "string" },
{ name: "http_user_agent", type: "string" },
],
location: `s3://${bucketName}/${tableName}`,
},
tableType: 'EXTERNAL_TABLE',
},
openTableFormatInput: {
icebergInput: {
metadataOperation: "CREATE",
version: "2",
}
}
})
const firehoseRole = new iam.Role(this, 'FirehoseIcebergRole', {
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
inlinePolicies: {
'GluePolicy': new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ['glue:GetTable', 'glue:GetDatabase', 'glue:UpdateTable'],
resources: [
`arn:aws:glue:${this.region}:${this.account}:catalog`,
`arn:aws:glue:${this.region}:${this.account}:database/*`,
`arn:aws:glue:${this.region}:${this.account}:table/*/*`,
],
})
]
}),
'S3Policy': new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: ['s3:PutObject', 's3:AbortMultipartUpload', 's3:GetBucketLocation', 's3:GetObject', 's3:ListBucket', 's3:ListBucketMultipartUploads', 's3:DeleteObject'],
resources: [
`arn:aws:s3:::${bucketName}`,
`arn:aws:s3:::${bucketName}/*`,
],
})
]
}),
}
})
const deliveryStream = new CfnDeliveryStream(this, 'IcebergDeliveryStream', {
deliveryStreamName: tableName,
deliveryStreamType: 'DirectPut',
icebergDestinationConfiguration: {
roleArn: firehoseRole.roleArn,
catalogConfiguration: {
catalogArn: `arn:aws:glue:${this.region}:${this.account}:catalog`,
},
destinationTableConfigurationList: [
{
destinationDatabaseName: databaseName,
destinationTableName: tableName,
},
],
s3BackupMode: 'FailedDataOnly',
s3Configuration: {
bucketArn: `arn:aws:s3:::${bucketName}`,
roleArn: firehoseRole.roleArn,
errorOutputPrefix: `${tableName}-error/`,
},
}
})
ECS から FireLens で Firehose に送る。
FireLens で ECS の FluentBit サイドカーに設定を配置しログが転送されるようにする - sambaiz-net
const taskRole = new iam.Role(this, 'TaskRole', {
assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
})
taskRole.addToPolicy(new iam.PolicyStatement({
actions: [
"firehose:PutRecord",
"firehose:PutRecordBatch"
],
resources: [
firehose.getAtt('Arn').toString()
]
}))
const taskDef = new ecs.FargateTaskDefinition(this, 'TaskDef', {
cpu: 256,
memoryLimitMiB: 512,
taskRole,
executionRole: execRole,
})
taskDef.addContainer('AppContainer', {
containerName: 'app',
image: ecs.ContainerImage.fromRegistry('nginx'),
portMappings: [{ containerPort: 80 }],
logging: ecs.LogDrivers.firelens({
options: {
Name: 'firehose',
delivery_stream: firehose.ref,
region: cdk.Stack.of(this).region,
}
}),
})
taskDef.addFirelensLogRouter('LogRouter', {
containerName: 'fluent-bit',
image: ecs.ContainerImage.fromAsset('assets/fluent-bit', {
platform: ecrassets.Platform.LINUX_AMD64,
}),
firelensConfig: {
type: ecs.FirelensLogRouterType.FLUENTBIT,
options: {
configFileType: ecs.FirelensConfigFileType.FILE,
configFileValue: '/extra.conf',
},
},
logging: ecs.LogDrivers.awsLogs({ streamPrefix: 'firelens' }),
})
nginx のアクセスログをパースして JSON にする。
$ cat assets/fluent-bit/extra.conf
[Service]
Parsers_File /parsers.conf
[FILTER]
Name parser
Match app-firelens*
Key_Name log
Parser nginx
$ cat assets/fluent-bit/parsers.conf
[PARSER]
Name nginx
Format regex
Regex ^(?<remote_addr>[^ ]*) - (?<remote_user>[^ ]*) \[(?<time_local>[^\]]*)\] "(?<request>[^"]*)" (?<status>[^ ]*) (?<body_bytes_sent>[^ ]*) "(?<http_referer>[^"]*)" "(?<http_user_agent>[^"]*)"
Time_Key time_local
Time_Format %d/%b/%Y:%H:%M:%S %z
Time_Keep On
これでリクエストを送るとテーブルにデータが書き込まれる。書き込まれない場合は (errorOutputPrefix)iceberg-failed/ に何か出ていないか確認する。
{
"attemptsMade": 0,
"arrivalTimestamp": 1746839862000,
"errorCode": "Iceberg.NoSuchIcebergTable",
"errorMessage": "Input Glue table is not an iceberg table. Table: testdb.test_table",
"attemptEndingTimestamp": 1746840238899,
"rawData": "eyJ***"
}
レコードの内容をもとに書き込み先をルーティングすることもできる。
icebergDestinationConfiguration: {
...
destinationTableConfigurationList: [],
processingConfiguration: {
enabled: true,
processors: [{
type: 'MetadataExtraction',
parameters: [
{ parameterName: 'JsonParsingEngine', parameterValue: 'JQ-1.6' },
{
parameterName: 'MetadataExtractionQuery',
parameterValue: `{
destinationDatabaseName: "${databaseName}",
destinationTableName: ("request_" + (.status | tostring)),
operation: "insert"
}`.replace(/\s*/g, "")
}
]
}]
},
...
}
Athena から request_200 テーブルをクエリして status=200 のレコードが書き込まれていることが確認できた。