Kinesis Data Firehose で Iceberg テーブルにストリーミング書き込みを行う

awsiceberg

ファイルを置くだけで読むことができる通常のテーブルと異なり、Iceberg テーブルはメタデータを更新するために INSERT する必要がある。これをバッチ処理で行うと最新のデータが読めるまでにラグが発生してしまうので、ラグを最小限にするため Kinesis Data Streams などを介して自前でストリーミング処理することもできるが、Firehose の出力先として Iceberg テーブルを設定すると容易に書き込むことができる。

Firehose に Direct PUT すると KDS を挟む必要がないが、スループット上限に当たらないか注意する。1 レコードに複数の JSON を含めるといったこともできないので回避が難しい。なお、KDS をデータソースとする場合は KPL での集約ができる。

FluentdとKPL(Kinesis Producer Library)でログをまとめてスループットを稼ぐ - sambaiz-net

1 レコード 1 JSON だと Firehose の取り込み料金が高くつきそうだが、Iceberg においては 5 KB 単位での切り上げが行われないようになっている。

権限を付与した Stream を作成する。この際、出力先のテーブルが存在しなかったり参照できないとエラーになる。

Iceberg テーブルを Glue Data Catalog に登録して Athena や Snowflake からクエリする - sambaiz-net

const database = new glue.CfnDatabase(this, "Database", {
  catalogId: this.account,
  databaseInput: {
    name: databaseName,
  }
})

const table = new glue.CfnTable(this, "Table", {
  databaseName: database.ref,
  catalogId: this.account,
  tableInput: {
      name: tableName,
      storageDescriptor: {
          columns: [
            { name: "remote_addr", type: "string" },
            { name: "remote_user", type: "string" },
            { name: "time_local", type: "string" },
            { name: "request", type: "string" },
            { name: "status", type: "int" },
            { name: "body_bytes_sent", type: "int" },
            { name: "http_referer", type: "string" },
            { name: "http_user_agent", type: "string" },
          ],
          location: `s3://${bucketName}/${tableName}`,
      },
      tableType: 'EXTERNAL_TABLE',
  },
  openTableFormatInput: {
      icebergInput: {
          metadataOperation: "CREATE",
          version: "2",
      }
  }
})

const firehoseRole = new iam.Role(this, 'FirehoseIcebergRole', {
  assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
  inlinePolicies: {
    'GluePolicy': new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ['glue:GetTable', 'glue:GetDatabase', 'glue:UpdateTable'],
          resources: [
            `arn:aws:glue:${this.region}:${this.account}:catalog`,
            `arn:aws:glue:${this.region}:${this.account}:database/*`,
            `arn:aws:glue:${this.region}:${this.account}:table/*/*`,
          ],
        })
      ]
    }),
    'S3Policy': new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ['s3:PutObject', 's3:AbortMultipartUpload', 's3:GetBucketLocation', 's3:GetObject', 's3:ListBucket', 's3:ListBucketMultipartUploads', 's3:DeleteObject'],
          resources: [
            `arn:aws:s3:::${bucketName}`,
            `arn:aws:s3:::${bucketName}/*`,
          ],
        })
      ]
    }),
  } 
})

const deliveryStream = new CfnDeliveryStream(this, 'IcebergDeliveryStream', {
  deliveryStreamName: tableName,
  deliveryStreamType: 'DirectPut',
  icebergDestinationConfiguration: {
    roleArn: firehoseRole.roleArn,
    catalogConfiguration: {
      catalogArn: `arn:aws:glue:${this.region}:${this.account}:catalog`,
    },
    destinationTableConfigurationList: [
      {
        destinationDatabaseName: databaseName,
        destinationTableName: tableName,
      },
    ],
    s3BackupMode: 'FailedDataOnly',
    s3Configuration: {
      bucketArn: `arn:aws:s3:::${bucketName}`,
      roleArn: firehoseRole.roleArn,
      errorOutputPrefix: `${tableName}-error/`,
    },
  }
})

ECS から FireLens で Firehose に送る。

FireLens で ECS の FluentBit サイドカーに設定を配置しログが転送されるようにする - sambaiz-net

const taskRole = new iam.Role(this, 'TaskRole', {
  assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
})

taskRole.addToPolicy(new iam.PolicyStatement({
  actions: [
    "firehose:PutRecord",
    "firehose:PutRecordBatch"
  ],
  resources: [
    firehose.getAtt('Arn').toString()
  ]
}))

const taskDef = new ecs.FargateTaskDefinition(this, 'TaskDef', {
  cpu: 256,
  memoryLimitMiB: 512,
  taskRole,
  executionRole: execRole,
})

taskDef.addContainer('AppContainer', {
  containerName: 'app',
  image: ecs.ContainerImage.fromRegistry('nginx'),
  portMappings: [{ containerPort: 80 }],
  logging: ecs.LogDrivers.firelens({
    options: {
      Name: 'firehose',
      delivery_stream: firehose.ref,
      region: cdk.Stack.of(this).region,
    }
  }),
})

taskDef.addFirelensLogRouter('LogRouter', {
  containerName: 'fluent-bit',
  image: ecs.ContainerImage.fromAsset('assets/fluent-bit', {
    platform: ecrassets.Platform.LINUX_AMD64,
  }),
  firelensConfig: {
    type: ecs.FirelensLogRouterType.FLUENTBIT,
    options: {
      configFileType: ecs.FirelensConfigFileType.FILE,
      configFileValue: '/extra.conf',
    },
  },
  logging: ecs.LogDrivers.awsLogs({ streamPrefix: 'firelens' }),
})

nginx のアクセスログをパースして JSON にする。

$ cat assets/fluent-bit/extra.conf 
[Service]
    Parsers_File    /parsers.conf

[FILTER]
    Name              parser
    Match             app-firelens*
    Key_Name          log
    Parser            nginx
                                                                                                            
$ cat assets/fluent-bit/parsers.conf   
[PARSER]
    Name        nginx
    Format      regex
    Regex       ^(?<remote_addr>[^ ]*) - (?<remote_user>[^ ]*) \[(?<time_local>[^\]]*)\] "(?<request>[^"]*)" (?<status>[^ ]*) (?<body_bytes_sent>[^ ]*) "(?<http_referer>[^"]*)" "(?<http_user_agent>[^"]*)"
    Time_Key    time_local
    Time_Format %d/%b/%Y:%H:%M:%S %z 
    Time_Keep   On

これでリクエストを送るとテーブルにデータが書き込まれる。書き込まれない場合は (errorOutputPrefix)iceberg-failed/ に何か出ていないか確認する。

{
  "attemptsMade": 0,
  "arrivalTimestamp": 1746839862000,
  "errorCode": "Iceberg.NoSuchIcebergTable",
  "errorMessage": "Input Glue table is not an iceberg table. Table: testdb.test_table",
  "attemptEndingTimestamp": 1746840238899,
  "rawData": "eyJ***"
}

レコードの内容をもとに書き込み先をルーティングすることもできる。

icebergDestinationConfiguration: {
  ...
  destinationTableConfigurationList: [],
  processingConfiguration: {
    enabled: true,
    processors: [{
      type: 'MetadataExtraction',
      parameters: [
        { parameterName: 'JsonParsingEngine', parameterValue: 'JQ-1.6' },
        {
          parameterName: 'MetadataExtractionQuery',
          parameterValue: `{
            destinationDatabaseName: "${databaseName}",
            destinationTableName: ("request_" + (.status | tostring)),
            operation: "insert"
          }`.replace(/\s*/g, "")
        }
      ]
    }]
  },
  ...
}

Athena から request_200 テーブルをクエリして status=200 のレコードが書き込まれていることが確認できた。