Streaming Writes to Iceberg Tables with Kinesis Data Firehose

Unlike regular tables, which can be read as soon as files are placed in the table location, Iceberg tables need their metadata updated, so data has to be added through INSERTs. If you do this with batch processing, there is a lag before the latest data becomes available. To minimize this lag you could implement your own streaming process on top of Kinesis Data Streams, but it is easier to set the Iceberg table as a Firehose destination and let Firehose write to it.

If you use Direct PUT to Firehose, you don't need to go through Kinesis Data Streams (KDS), but be careful not to hit the throughput limits. A single record can't contain multiple JSON documents, so you can't work around the limits by packing several events into one record. If you use KDS as the data source instead, you can aggregate records with KPL.

Aggregating logs with Fluentd and KPL (Kinesis Producer Library) to gain throughput - sambaiz-net
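When sending with Direct PUT, one way to reduce the number of API calls is to group records with PutRecordBatch, which accepts up to 500 records and 4 MiB per call. The following is a minimal sketch of that grouping only, keeping one JSON document per record. The helper name toBatches and the sample events are hypothetical, and the actual send call is left out.

```typescript
// Limits for a single PutRecordBatch call.
const MAX_RECORDS_PER_BATCH = 500
const MAX_BYTES_PER_BATCH = 4 * 1024 * 1024

// Hypothetical helper: groups events into batches,
// serializing each event as one JSON document per Firehose record.
function toBatches(events: object[]): string[][] {
  const encoder = new TextEncoder()
  const batches: string[][] = []
  let current: string[] = []
  let currentBytes = 0

  for (const event of events) {
    const data = JSON.stringify(event)
    const size = encoder.encode(data).length
    const full =
      current.length >= MAX_RECORDS_PER_BATCH ||
      currentBytes + size > MAX_BYTES_PER_BATCH
    // Close the current batch before it would exceed either limit.
    if (full && current.length > 0) {
      batches.push(current)
      current = []
      currentBytes = 0
    }
    current.push(data)
    currentBytes += size
  }
  if (current.length > 0) batches.push(current)
  return batches
}

// Made-up sample data: 1200 small events.
const sampleEvents = Array.from({ length: 1200 }, (_, i) => ({ status: 200, seq: i }))
console.log(toBatches(sampleEvents).map((b) => b.length)) // [ 500, 500, 200 ]
```

Batching reduces the number of calls but does not raise the per-stream throughput limits, and this sketch does not handle a single record that exceeds the per-record size limit.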

You might expect one JSON per record to make Firehose ingestion costs high, but for Iceberg destinations the ingested size is not rounded up to 5 KB units per record.
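To get a feel for how much the rounding matters for small records, the following sketch compares the billed bytes with and without rounding each record up to 5 KB (5120 bytes). The record size and record count are assumed values for illustration, and no actual prices are included.

```typescript
const INCREMENT_BYTES = 5 * 1024

// Bytes billed when every record is rounded up to the next 5 KB.
function billedWithRounding(recordBytes: number, count: number): number {
  return Math.ceil(recordBytes / INCREMENT_BYTES) * INCREMENT_BYTES * count
}

// Bytes billed when only the actual payload size counts.
function billedWithoutRounding(recordBytes: number, count: number): number {
  return recordBytes * count
}

const recordBytes = 300 // assumed size of one access-log JSON
const recordCount = 1000000 // assumed number of records

console.log(billedWithRounding(recordBytes, recordCount)) // 5120000000
console.log(billedWithoutRounding(recordBytes, recordCount)) // 300000000
```

Under these assumptions the rounded size is about 17 times the actual payload, which is why the absence of rounding matters when every record is a single small JSON.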

Create the delivery stream together with an IAM role that allows Firehose to access the Glue Data Catalog and S3. If the destination table does not exist or cannot be referenced, an error will occur.

Register Iceberg Tables in Glue Data Catalog to query from Athena and Snowflake - sambaiz-net

const database = new glue.CfnDatabase(this, "Database", {
  catalogId: this.account,
  databaseInput: {
    name: databaseName,
  }
})

const table = new glue.CfnTable(this, "Table", {
  databaseName: database.ref,
  catalogId: this.account,
  tableInput: {
    name: tableName,
    storageDescriptor: {
      columns: [
        { name: "remote_addr", type: "string" },
        { name: "remote_user", type: "string" },
        { name: "time_local", type: "string" },
        { name: "request", type: "string" },
        { name: "status", type: "int" },
        { name: "body_bytes_sent", type: "int" },
        { name: "http_referer", type: "string" },
        { name: "http_user_agent", type: "string" },
      ],
      location: `s3://${bucketName}/${tableName}`,
    },
    tableType: 'EXTERNAL_TABLE',
  },
  openTableFormatInput: {
    icebergInput: {
      metadataOperation: "CREATE",
      version: "2",
    }
  }
})

const firehoseRole = new iam.Role(this, 'FirehoseIcebergRole', {
  assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
  inlinePolicies: {
    'GluePolicy': new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ['glue:GetTable', 'glue:GetDatabase', 'glue:UpdateTable'],
          resources: [
            `arn:aws:glue:${this.region}:${this.account}:catalog`,
            `arn:aws:glue:${this.region}:${this.account}:database/*`,
            `arn:aws:glue:${this.region}:${this.account}:table/*/*`,
          ],
        })
      ]
    }),
    'S3Policy': new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ['s3:PutObject', 's3:AbortMultipartUpload', 's3:GetBucketLocation', 's3:GetObject', 's3:ListBucket', 's3:ListBucketMultipartUploads', 's3:DeleteObject'],
          resources: [
            `arn:aws:s3:::${bucketName}`,
            `arn:aws:s3:::${bucketName}/*`,
          ],
        })
      ]
    }),
  } 
})

const deliveryStream = new CfnDeliveryStream(this, 'IcebergDeliveryStream', {
  deliveryStreamName: tableName,
  deliveryStreamType: 'DirectPut',
  icebergDestinationConfiguration: {
    roleArn: firehoseRole.roleArn,
    catalogConfiguration: {
      catalogArn: `arn:aws:glue:${this.region}:${this.account}:catalog`,
    },
    destinationTableConfigurationList: [
      {
        destinationDatabaseName: databaseName,
        destinationTableName: tableName,
      },
    ],
    s3BackupMode: 'FailedDataOnly',
    s3Configuration: {
      bucketArn: `arn:aws:s3:::${bucketName}`,
      roleArn: firehoseRole.roleArn,
      errorOutputPrefix: `${tableName}-error/`,
    },
  }
})

Send logs to Firehose from ECS using FireLens.

Configure FireLens to set up Fluent Bit sidecar in ECS to forward logs - sambaiz-net

const taskRole = new iam.Role(this, 'TaskRole', {
  assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
})

taskRole.addToPolicy(new iam.PolicyStatement({
  actions: [
    "firehose:PutRecord",
    "firehose:PutRecordBatch"
  ],
  resources: [
    deliveryStream.attrArn
  ]
}))

const taskDef = new ecs.FargateTaskDefinition(this, 'TaskDef', {
  cpu: 256,
  memoryLimitMiB: 512,
  taskRole,
  executionRole: execRole, // task execution role, defined elsewhere (not shown)
})

taskDef.addContainer('AppContainer', {
  containerName: 'app',
  image: ecs.ContainerImage.fromRegistry('nginx'),
  portMappings: [{ containerPort: 80 }],
  logging: ecs.LogDrivers.firelens({
    options: {
      Name: 'firehose',
      delivery_stream: deliveryStream.ref,
      region: cdk.Stack.of(this).region,
    }
  }),
})

taskDef.addFirelensLogRouter('LogRouter', {
  containerName: 'fluent-bit',
  image: ecs.ContainerImage.fromAsset('assets/fluent-bit', {
    platform: ecrassets.Platform.LINUX_AMD64,
  }),
  firelensConfig: {
    type: ecs.FirelensLogRouterType.FLUENTBIT,
    options: {
      configFileType: ecs.FirelensConfigFileType.FILE,
      configFileValue: '/extra.conf',
    },
  },
  logging: ecs.LogDrivers.awsLogs({ streamPrefix: 'firelens' }),
})

Parse the nginx access logs into fields so that each record is sent as JSON.

$ cat assets/fluent-bit/extra.conf 
[Service]
    Parsers_File    /parsers.conf

[FILTER]
    Name              parser
    Match             app-firelens*
    Key_Name          log
    Parser            nginx
                                                                                                            
$ cat assets/fluent-bit/parsers.conf   
[PARSER]
    Name        nginx
    Format      regex
    Regex       ^(?<remote_addr>[^ ]*) - (?<remote_user>[^ ]*) \[(?<time_local>[^\]]*)\] "(?<request>[^"]*)" (?<status>[^ ]*) (?<body_bytes_sent>[^ ]*) "(?<http_referer>[^"]*)" "(?<http_user_agent>[^"]*)"
    Time_Key    time_local
    Time_Format %d/%b/%Y:%H:%M:%S %z
    Time_Keep   On
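To check which fields the parser above produces, the same pattern can be tried outside Fluent Bit. The following is a sketch using the JavaScript regex engine, which accepts the same named-group syntax for this pattern but is not the engine Fluent Bit uses. The sample log line and the function name parseAccessLog are made up for illustration.

```typescript
// Same pattern as in parsers.conf, written as a JavaScript regex literal.
const nginxPattern =
  /^(?<remote_addr>[^ ]*) - (?<remote_user>[^ ]*) \[(?<time_local>[^\]]*)\] "(?<request>[^"]*)" (?<status>[^ ]*) (?<body_bytes_sent>[^ ]*) "(?<http_referer>[^"]*)" "(?<http_user_agent>[^"]*)"/

// Hypothetical helper: returns the named captures, or null when the line does not match.
function parseAccessLog(line: string): Record<string, string> | null {
  const match = nginxPattern.exec(line)
  return match && match.groups ? { ...match.groups } : null
}

// Made-up access log line in the default nginx "combined" layout.
const sampleLine =
  '203.0.113.7 - - [10/May/2025:01:17:42 +0000] "GET /index.html HTTP/1.1" 200 615 "-" "curl/8.5.0"'

const parsed = parseAccessLog(sampleLine)
console.log(parsed?.request) // GET /index.html HTTP/1.1
console.log(typeof parsed?.status) // string
```

Note that every capture is a string here, including status and body_bytes_sent, while the table defines those columns as int.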

After that, sending a request to nginx writes data to the table. If no data is written, check whether there are error records under (errorOutputPrefix)iceberg-failed/ in the bucket specified in s3Configuration, such as the following.

{
  "attemptsMade": 0,
  "arrivalTimestamp": 1746839862000,
  "errorCode": "Iceberg.NoSuchIcebergTable",
  "errorMessage": "Input Glue table is not an iceberg table. Table: testdb.test_table",
  "attemptEndingTimestamp": 1746840238899,
  "rawData": "eyJ***"
}
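The rawData field of an error record like the one above appears to be the original record encoded in Base64, so decoding it shows what Firehose actually received. The following is a minimal sketch under that assumption. The FailedRecord interface covers only the fields used here, and the sample record is built locally rather than taken from a real delivery failure.

```typescript
// Subset of the error record fields used in this sketch.
interface FailedRecord {
  errorCode: string
  errorMessage: string
  rawData: string
}

// Decodes rawData from Base64 into a UTF-8 string.
function decodeRawData(record: FailedRecord): string {
  const binary = atob(record.rawData)
  const bytes = Uint8Array.from(binary, (c) => c.charCodeAt(0))
  return new TextDecoder().decode(bytes)
}

// Made-up failed record whose payload is encoded locally for the example.
const failed: FailedRecord = {
  errorCode: "Iceberg.NoSuchIcebergTable",
  errorMessage: "Input Glue table is not an iceberg table. Table: testdb.test_table",
  rawData: btoa(JSON.stringify({ status: 200, request: "GET / HTTP/1.1" })),
}

console.log(decodeRawData(failed)) // {"status":200,"request":"GET / HTTP/1.1"}
```

Comparing the decoded JSON with the table columns is a quick way to tell whether the problem is in the record itself or, as in the error above, in the table definition.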

You can also route records to different tables based on their content.

icebergDestinationConfiguration: {
  ...
  destinationTableConfigurationList: [],
  processingConfiguration: {
    enabled: true,
    processors: [{
      type: 'MetadataExtraction',
      parameters: [
        { parameterName: 'JsonParsingEngine', parameterValue: 'JQ-1.6' },
        {
          parameterName: 'MetadataExtractionQuery',
          parameterValue: `{
            destinationDatabaseName: "${databaseName}",
            destinationTableName: ("request_" + (.status | tostring)),
            operation: "insert"
          }`.replace(/\s+/g, "") // strip all whitespace to make a compact one-line query
        }
      ]
    }]
  },
  ...
}

You can confirm that records with status=200 are written by querying the request_200 table from Athena.
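To see what the routing configuration above passes to Firehose, the following sketch builds the same query string and mimics the table name expression for a single record. It only imitates the jq expression in TypeScript and does not call Firehose or jq. The function names are hypothetical, and testdb is used as an example database name.

```typescript
// Builds the MetadataExtractionQuery value with all whitespace removed.
function buildMetadataExtractionQuery(databaseName: string): string {
  return `{
    destinationDatabaseName: "${databaseName}",
    destinationTableName: ("request_" + (.status | tostring)),
    operation: "insert"
  }`.replace(/\s+/g, "")
}

// Mimics ("request_" + (.status | tostring)) for one parsed record.
function destinationTableName(record: { status: number | string }): string {
  return "request_" + String(record.status)
}

console.log(buildMetadataExtractionQuery("testdb"))
// {destinationDatabaseName:"testdb",destinationTableName:("request_"+(.status|tostring)),operation:"insert"}
console.log(destinationTableName({ status: 200 })) // request_200
console.log(destinationTableName({ status: "404" })) // request_404
```

Because the replace removes every whitespace character, including any inside string literals, this way of compacting the query only works while the database name and the table prefix contain no spaces.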