Unlike regular tables, which become readable as soon as files are placed in the table location, Iceberg tables require an INSERT so that the table metadata is updated. If you do this in batch jobs, there is a lag before the latest data becomes available. To minimize this lag you could implement your own streaming process on Kinesis Data Streams (KDS), but setting an Iceberg table as the Firehose destination lets you write to it with little effort.
With Direct PUT to Firehose you don't need to go through KDS, but be careful not to hit the throughput limits. A single record can't contain multiple JSON documents, so you can't pack records together to work around the limits. If you use KDS as the data source, you can aggregate records with KPL.
Aggregate logs with Fluentd and KPL (Kinesis Producer Library) to increase throughput - sambaiz-net
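For the Direct PUT case, one way to reduce the number of API calls is to group records into PutRecordBatch-sized chunks. The following is a minimal sketch, not part of the original setup: `toBatches` is a hypothetical helper, and the limits of 500 records and 4 MiB per call are assumptions taken from the Firehose quotas at the time of writing. Batching lowers the request count only; it does not raise the per-stream record or byte quotas.

```typescript
// Assumed PutRecordBatch limits; check the current Firehose quotas before relying on them.
const MAX_RECORDS_PER_BATCH = 500;
const MAX_BYTES_PER_BATCH = 4 * 1024 * 1024;

// Hypothetical helper: serialize one JSON document per record and group the
// records into batches that stay within both limits.
function toBatches(events: object[]): string[][] {
  const result: string[][] = [];
  let current: string[] = [];
  let currentBytes = 0;
  const encoder = new TextEncoder();
  for (const item of events) {
    const data = JSON.stringify(item);
    const size = encoder.encode(data).length;
    const full =
      current.length >= MAX_RECORDS_PER_BATCH ||
      currentBytes + size > MAX_BYTES_PER_BATCH;
    if (current.length > 0 && full) {
      result.push(current);
      current = [];
      currentBytes = 0;
    }
    current.push(data);
    currentBytes += size;
  }
  if (current.length > 0) result.push(current);
  return result;
}

// Made-up input: 1200 small log events.
const sampleEvents = Array.from({ length: 1200 }, (_, i) => ({ status: 200, seq: i }));
const sampleBatches = toBatches(sampleEvents);
console.log(sampleBatches.map((b) => b.length)); // [ 500, 500, 200 ]
```

Each inner array would then become the record list of one PutRecordBatch call.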
You might expect that one JSON document per record would make Firehose ingestion costs high, but with an Iceberg destination the ingested size is not rounded up to 5 KB units.
Create a delivery stream together with the access policies it needs. If the destination table does not exist or cannot be accessed, an error occurs.
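To get a feel for what the rounding would mean for small log records, here is a rough arithmetic sketch. The record size and count are made-up values, `billableBytes` is a hypothetical helper, and treating 5 KB as 5 × 1024 bytes is an assumption; actual prices are not modeled.

```typescript
// Assumed rounding unit: 5 KB taken as 5 * 1024 bytes.
const ROUNDING_UNIT = 5 * 1024;

// Billable bytes for one record, with or without rounding up to the unit.
function billableBytes(recordBytes: number, roundUp: boolean): number {
  return roundUp
    ? Math.ceil(recordBytes / ROUNDING_UNIT) * ROUNDING_UNIT
    : recordBytes;
}

const logRecordBytes = 300; // hypothetical size of one access-log JSON
const logRecordCount = 1000000; // hypothetical number of records

const roundedTotal = billableBytes(logRecordBytes, true) * logRecordCount;
const actualTotal = billableBytes(logRecordBytes, false) * logRecordCount;

// With 300-byte records, rounding would bill roughly 17 times the actual volume.
console.log((roundedTotal / actualTotal).toFixed(1));
```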
Register Iceberg Tables in Glue Data Catalog to query from Athena and Snowflake - sambaiz-net
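// Imports used by the snippets in this article.
// The snippets themselves are written inside a cdk.Stack constructor.
import * as cdk from 'aws-cdk-lib'
import * as glue from 'aws-cdk-lib/aws-glue'
import * as iam from 'aws-cdk-lib/aws-iam'
import * as ecs from 'aws-cdk-lib/aws-ecs'
import * as ecrassets from 'aws-cdk-lib/aws-ecr-assets'
import { CfnDeliveryStream } from 'aws-cdk-lib/aws-kinesisfirehose'

const database = new glue.CfnDatabase(this, "Database", {
  catalogId: this.account,
  databaseInput: {
    name: databaseName,
  }
})

// Iceberg table registered in the Glue Data Catalog
const table = new glue.CfnTable(this, "Table", {
  databaseName: database.ref,
  catalogId: this.account,
  tableInput: {
    name: tableName,
    storageDescriptor: {
      columns: [
        { name: "remote_addr", type: "string" },
        { name: "remote_user", type: "string" },
        { name: "time_local", type: "string" },
        { name: "request", type: "string" },
        { name: "status", type: "int" },
        { name: "body_bytes_sent", type: "int" },
        { name: "http_referer", type: "string" },
        { name: "http_user_agent", type: "string" },
      ],
      location: `s3://${bucketName}/${tableName}`,
    },
    tableType: 'EXTERNAL_TABLE',
  },
  openTableFormatInput: {
    icebergInput: {
      metadataOperation: "CREATE",
      version: "2",
    }
  }
})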
const firehoseRole = new iam.Role(this, 'FirehoseIcebergRole', {
  assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
  inlinePolicies: {
    'GluePolicy': new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ['glue:GetTable', 'glue:GetDatabase', 'glue:UpdateTable'],
          resources: [
            `arn:aws:glue:${this.region}:${this.account}:catalog`,
            `arn:aws:glue:${this.region}:${this.account}:database/*`,
            `arn:aws:glue:${this.region}:${this.account}:table/*/*`,
          ],
        })
      ]
    }),
    'S3Policy': new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: [
            's3:PutObject',
            's3:AbortMultipartUpload',
            's3:GetBucketLocation',
            's3:GetObject',
            's3:ListBucket',
            's3:ListBucketMultipartUploads',
            's3:DeleteObject',
          ],
          resources: [
            `arn:aws:s3:::${bucketName}`,
            `arn:aws:s3:::${bucketName}/*`,
          ],
        })
      ]
    }),
  }
})
const deliveryStream = new CfnDeliveryStream(this, 'IcebergDeliveryStream', {
  deliveryStreamName: tableName,
  deliveryStreamType: 'DirectPut',
  icebergDestinationConfiguration: {
    roleArn: firehoseRole.roleArn,
    catalogConfiguration: {
      catalogArn: `arn:aws:glue:${this.region}:${this.account}:catalog`,
    },
    destinationTableConfigurationList: [
      {
        destinationDatabaseName: databaseName,
        destinationTableName: tableName,
      },
    ],
    s3BackupMode: 'FailedDataOnly',
    s3Configuration: {
      bucketArn: `arn:aws:s3:::${bucketName}`,
      roleArn: firehoseRole.roleArn,
      errorOutputPrefix: `${tableName}-error/`,
    },
  }
})
Send logs from ECS to Firehose using FireLens.
Configure FireLens to set up Fluent Bit sidecar in ECS to forward logs - sambaiz-net
const taskRole = new iam.Role(this, 'TaskRole', {
  assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
})
// Allow the task (the Fluent Bit sidecar) to put records to the delivery stream created above
taskRole.addToPolicy(new iam.PolicyStatement({
  actions: [
    "firehose:PutRecord",
    "firehose:PutRecordBatch"
  ],
  resources: [
    deliveryStream.attrArn
  ]
}))
const taskDef = new ecs.FargateTaskDefinition(this, 'TaskDef', {
  cpu: 256,
  memoryLimitMiB: 512,
  taskRole,
  executionRole: execRole,
})
taskDef.addContainer('AppContainer', {
  containerName: 'app',
  image: ecs.ContainerImage.fromRegistry('nginx'),
  portMappings: [{ containerPort: 80 }],
  // Forward the container logs to the delivery stream through FireLens
  logging: ecs.LogDrivers.firelens({
    options: {
      Name: 'firehose',
      delivery_stream: deliveryStream.ref,
      region: cdk.Stack.of(this).region,
    }
  }),
})
taskDef.addFirelensLogRouter('LogRouter', {
  containerName: 'fluent-bit',
  image: ecs.ContainerImage.fromAsset('assets/fluent-bit', {
    platform: ecrassets.Platform.LINUX_AMD64,
  }),
  firelensConfig: {
    type: ecs.FirelensLogRouterType.FLUENTBIT,
    options: {
      configFileType: ecs.FirelensConfigFileType.FILE,
      configFileValue: '/extra.conf',
    },
  },
  logging: ecs.LogDrivers.awsLogs({ streamPrefix: 'firelens' }),
})
Parse the nginx access logs into JSON.
$ cat assets/fluent-bit/extra.conf
[Service]
    Parsers_File /parsers.conf

[FILTER]
    Name     parser
    Match    app-firelens*
    Key_Name log
    Parser   nginx

$ cat assets/fluent-bit/parsers.conf
[PARSER]
    Name        nginx
    Format      regex
    Regex       ^(?<remote_addr>[^ ]*) - (?<remote_user>[^ ]*) \[(?<time_local>[^\]]*)\] "(?<request>[^"]*)" (?<status>[^ ]*) (?<body_bytes_sent>[^ ]*) "(?<http_referer>[^"]*)" "(?<http_user_agent>[^"]*)"
    Time_Key    time_local
    Time_Format %d/%b/%Y:%H:%M:%S %z
    Time_Keep   On
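The capture groups of the parser can be checked locally before deploying. The sketch below rewrites the same pattern as a JavaScript regular expression and applies it to a made-up log line in nginx's default combined format. `parseAccessLog` is a hypothetical helper, and it assumes the JavaScript engine behaves the same as Fluent Bit's regex engine for this pattern.

```typescript
// Same pattern as in parsers.conf, written as a JavaScript regular expression.
const nginxPattern =
  /^(?<remote_addr>[^ ]*) - (?<remote_user>[^ ]*) \[(?<time_local>[^\]]*)\] "(?<request>[^"]*)" (?<status>[^ ]*) (?<body_bytes_sent>[^ ]*) "(?<http_referer>[^"]*)" "(?<http_user_agent>[^"]*)"/;

// Hypothetical helper: returns the named captures, or null if the line does not match.
function parseAccessLog(line: string): Record<string, string> | null {
  const match = nginxPattern.exec(line);
  return match && match.groups ? { ...match.groups } : null;
}

// Made-up access log line for illustration.
const sampleLine =
  '203.0.113.10 - - [10/May/2025:01:17:42 +0000] "GET / HTTP/1.1" 200 615 "-" "curl/8.5.0"';
const parsedFields = parseAccessLog(sampleLine);
console.log(parsedFields);
```

Note that every capture comes out as a string here, including `status` and `body_bytes_sent`, while the table defines those columns as int. This sketch only checks the field names and boundaries, not type conversion.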
Then, sending a request to nginx writes data to the table. If no data is written, check whether anything has been output under (errorOutputPrefix)iceberg-failed/.
{
  "attemptsMade": 0,
  "arrivalTimestamp": 1746839862000,
  "errorCode": "Iceberg.NoSuchIcebergTable",
  "errorMessage": "Input Glue table is not an iceberg table. Table: testdb.test_table",
  "attemptEndingTimestamp": 1746840238899,
  "rawData": "eyJ***"
}
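The rawData field holds the original record in base64, so decoding it shows what Firehose actually received. A minimal sketch follows, assuming each error entry is a single JSON document like the one above. `decodeFailedRecord` is a hypothetical helper, and the sample entry is built in the code rather than taken from a real error file.

```typescript
// Shape of a failed-delivery entry, limited to the fields used here.
interface FailedRecord {
  errorCode: string;
  errorMessage: string;
  rawData: string; // base64-encoded original record
}

// Hypothetical helper: parse one error entry and decode rawData as UTF-8 text.
function decodeFailedRecord(entry: string): { errorCode: string; original: string } {
  const failed = JSON.parse(entry) as FailedRecord;
  const bytes = Uint8Array.from(atob(failed.rawData), (c) => c.charCodeAt(0));
  return { errorCode: failed.errorCode, original: new TextDecoder().decode(bytes) };
}

// Made-up entry; rawData is generated from a sample payload for illustration.
const failedEntry = JSON.stringify({
  errorCode: "Iceberg.NoSuchIcebergTable",
  errorMessage: "Input Glue table is not an iceberg table. Table: testdb.test_table",
  rawData: btoa(JSON.stringify({ status: 200, request: "GET / HTTP/1.1" })),
});

console.log(decodeFailedRecord(failedEntry).original);
// {"status":200,"request":"GET / HTTP/1.1"}
```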
You can also route records to different tables based on their content.
icebergDestinationConfiguration: {
  ...
  destinationTableConfigurationList: [],
  processingConfiguration: {
    enabled: true,
    processors: [{
      type: 'MetadataExtraction',
      parameters: [
        { parameterName: 'JsonParsingEngine', parameterValue: 'JQ-1.6' },
        {
          parameterName: 'MetadataExtractionQuery',
          parameterValue: `{
            destinationDatabaseName: "${databaseName}",
            destinationTableName: ("request_" + (.status | tostring)),
            operation: "insert"
          }`.replace(/\s*/g, "")
        }
      ]
    }]
  },
  ...
}
You can confirm that records with status=200 are written by querying the request_200 table from Athena.
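To make the routing rule concrete, the sketch below builds the same query string as the CDK snippet and mimics in TypeScript what the JQ expression evaluates to for one record. `buildMetadataQuery` and `routeFor` are hypothetical helpers for illustration, and "testdb" is a made-up database name. Firehose itself evaluates the JQ query, not this code.

```typescript
// Builds the MetadataExtractionQuery string with all whitespace removed,
// the same way the CDK snippet does.
function buildMetadataQuery(databaseName: string): string {
  return `{
    destinationDatabaseName: "${databaseName}",
    destinationTableName: ("request_" + (.status | tostring)),
    operation: "insert"
  }`.replace(/\s+/g, "");
}

// Hypothetical TypeScript stand-in for the result of the JQ expression.
function routeFor(databaseName: string, logRecord: { status: number | string }) {
  return {
    destinationDatabaseName: databaseName,
    destinationTableName: "request_" + String(logRecord.status),
    operation: "insert",
  };
}

console.log(buildMetadataQuery("testdb"));
// {destinationDatabaseName:"testdb",destinationTableName:("request_"+(.status|tostring)),operation:"insert"}
console.log(routeFor("testdb", { status: 404 }).destinationTableName); // request_404
```

Every status value that appears in the logs therefore maps to its own table name, such as request_200 or request_404.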