CDKでGlue Data CatalogのDatabase,Table,Partition,Crawlerを作成する

awsetl

CDKでGlue Data CatalogのDatabase, Table, Partition, Crawlerを作成する。PartitionやCrawlerにはまだL2 constructが存在しないため、L1 construct(CfnPartition, CfnCrawler)を使う。PartitionのstorageDescriptorの設定はTableの実装を参考にした。

TableやPartitionはCrawlerで自動生成することができるが、ファイル数が膨大だと時間がかかることもあるため、Tableは手動で、PartitionはAPIを呼んで作成することがある。これらをCDKで管理すると、現在の状態に関わらず、デプロイによって必要なものが存在することを保証できる。

AWS GlueでCSVを加工しParquetに変換してパーティションを切りAthenaで参照する - sambaiz-net

import * as cdk from '@aws-cdk/core';
import { Database, Table, DataFormat, Schema, CfnPartition, CfnCrawler } from '@aws-cdk/aws-glue';
import { Effect, ManagedPolicy, Policy, PolicyStatement, Role, ServicePrincipal } from '@aws-cdk/aws-iam';
import { Bucket } from '@aws-cdk/aws-s3';
import { format, subDays } from 'date-fns'

/**
 * Sample stack that defines Glue Data Catalog resources with CDK:
 * a database, a JSON table partitioned by `ymd`, one partition per recent
 * day, and a scheduled crawler together with the IAM role it runs as.
 *
 * Partition and Crawler are declared through their L1 (`Cfn*`) constructs.
 */
export class CdkGlueDataCatalogSampleStack extends cdk.Stack {
  constructor(scope: cdk.Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const db = new Database(this, 'TestDB', {
      databaseName: "test-db",
    })

    // The account ID is embedded in the bucket name.
    const bucket = new Bucket(this, 'DBBucket', {
      bucketName: `test-db-bucket-${this.account}`
    })

    const table = new Table(this, 'TestTable', {
      tableName: "test_table",
      database: db,
      columns: [{
        name: "id",
        type: Schema.BIG_INT,
        comment: "some identifier"
      }, {
        name: "foo",
        type: Schema.struct([{
          name: "bar",
          type: Schema.STRING
        }])
      }],
      partitionKeys: [{
        name: "ymd",
        type: Schema.DATE
      }],
      dataFormat: DataFormat.JSON,
      bucket: bucket,
      s3Prefix: "test-table/",
      compressed: false,
    })

    // `now` is evaluated when the app is synthesized, so the set of partitions
    // in the template depends on the day (and, via date-fns `format`, the local
    // time zone) of the machine running the synth.
    const now = new Date()
    const partitionRetainDays = 30;
    // Inclusive upper bound: today plus the previous `partitionRetainDays`
    // days, i.e. partitionRetainDays + 1 partitions in total.
    for (let i = 0; i <= partitionRetainDays; i++) {
      const ymd = format(subDays(now, i), 'yyyy-MM-dd')
      new CfnPartition(this, `Partition${ymd}`, {
        catalogId: this.account,
        databaseName: db.databaseName,
        tableName: table.tableName,
        partitionInput: {
          // The storage descriptor is copied from the table's own settings so
          // each partition uses the same format/serde; only the location differs.
          storageDescriptor: {
            location: `s3://${table.bucket.bucketName}/${table.s3Prefix}${ymd}/`,
            compressed: table.compressed,
            inputFormat: table.dataFormat.inputFormat.className,
            outputFormat: table.dataFormat.outputFormat.className,
            serdeInfo: {
              serializationLibrary: table.dataFormat.serializationLibrary.className
            },
          },
          // One value per partition key, in the order of `partitionKeys` (only `ymd` here).
          values: [ymd],
        }
      })
    }

    // Role assumed by the Glue service when running the crawler.
    const role = new Role(this, 'TestTableCrawlerRole', {
      roleName: "test-table-crawler-role",
      assumedBy: new ServicePrincipal("glue.amazonaws.com"),
      managedPolicies: [
        ManagedPolicy.fromAwsManagedPolicyName("service-role/AWSGlueServiceRole")
      ]
    })

    // Allow the crawler to read objects in the data bucket. The object ARN is
    // derived from the bucket construct rather than hand-built, so it carries
    // the correct ARN partition (aws / aws-cn / aws-us-gov) for the target region.
    role.attachInlinePolicy(new Policy(this, "TestTableCrawlerRolePolicy", {
      statements: [new PolicyStatement({
        effect: Effect.ALLOW,
        actions: ["s3:GetObject"],
        resources: [bucket.arnForObjects("*")]
      })]
    }))

    new CfnCrawler(this, 'TestTableCrawler', {
      name: "test-crawler",
      role: role.roleArn,
      databaseName: db.databaseName,
      targets: {
        s3Targets: [{
          // NOTE(review): this crawls the `test/` prefix, whereas the table
          // above stores its data under `test-table/` — confirm this is intended.
          path: `s3://${bucket.bucketName}/test/`,
        }]
      },
      schedule: {
        // Runs once a day at 00:00.
        scheduleExpression: "cron(0 0 * * ? *)"
      }
    })
  }
}