Generating data with Glue's TPC-DS Connector

When I tried to generate 250 GB of data with Athena's TPC-DS Connector and output it to S3, it timed out even with Lambda's resources raised to the maximum, so I generate the data with Glue instead.

Generating data with the TPC-DS Connector via Athena's Federated Query - sambaiz-net

Subscribe to the Glue TPC-DS connector on AWS Marketplace and activate it.
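Activating the connector creates a Glue connection, which is assumed to be named tpcds throughout this post. A minimal boto3 sketch to confirm the connection exists:

import boto3

glue = boto3.client('glue')
# 'tpcds' is the connection name assumed in this post
print(glue.get_connection(Name='tpcds')['Connection']['ConnectionType'])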

The script looks like this, with the necessary parameters passed in from outside. As with the Athena connector, scale is in GB. It is also set up so that the tables are added to the catalog as the data is uploaded.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME", "BUCKET_NAME", "DATABASE_NAME", "SCALE", "NUM_PARTITIONS", "FORMAT", "CONNECTION_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
bucketName = args["BUCKET_NAME"]

tables = [
  "call_center",
  "catalog_page",
  "catalog_returns",
  "catalog_sales",
  "customer",
  "customer_address",
  "customer_demographics",
  "date_dim",
  "dbgen_version",
  "household_demographics",
  "income_band",
  "inventory",
  "item",
  "promotion",
  "reason",
  "ship_mode",
  "store",
  "store_returns",
  "store_sales",
  "time_dim",
  "warehouse",
  "web_page",
  "web_returns",
  "web_sales",
  "web_site",
]

for table in tables:
  # Read the table from the TPC-DS connector as a DynamicFrame
  df = glueContext.create_dynamic_frame.from_options(
    connection_type="marketplace.spark",
    connection_options={
      "table": table,
      "scale": args["SCALE"],
      "numPartitions": args["NUM_PARTITIONS"],
      "connectionName": args["CONNECTION_NAME"],
    },
    transformation_ctx=f"src_{table}",
  )

  # Write to S3, creating or updating the table in the Glue Data Catalog
  database = args["DATABASE_NAME"]
  sink = glueContext.getSink(
    path=f"s3://{bucketName}/{database}/{table}/",
    connection_type="s3",
    compression="gzip",
    updateBehavior="UPDATE_IN_DATABASE",
    enableUpdateCatalog=True,
    transformation_ctx=f"sink_{table}")
  if args["FORMAT"] == "parquet":
    sink.setFormat("glueparquet", compression="snappy")
  else:
    sink.setFormat(args["FORMAT"])
  sink.setCatalogInfo(catalogDatabase=database, catalogTableName=table)
  sink.writeFrame(df)

job.commit()

The Job is created with CDK, specifying the TPC-DS Connector in connections.

import { Role, CompositePrincipal, ServicePrincipal, PolicyStatement, ManagedPolicy } from 'aws-cdk-lib/aws-iam';
import { Database, Job, JobExecutable, GlueVersion, Code, PythonVersion, WorkerType, Connection } from '@aws-cdk/aws-glue-alpha';

const glueWriteTPCDSRole = new Role(this, `GlueWriteTPCDSRole${databaseName}`, {
  assumedBy: new CompositePrincipal(
    new ServicePrincipal('glue.amazonaws.com'),
  ),
  managedPolicies: [
    ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole'),
    ManagedPolicy.fromAwsManagedPolicyName('AmazonEC2ContainerRegistryReadOnly'),
  ],
})
glueWriteTPCDSRole.addToPolicy(new PolicyStatement({
  resources: [
    `${bucket.bucketArn}/*`,
  ],
  actions: ['s3:Put*'],
}))

const workerCount = 10
new Job(this, `GlueWriteTPCDSJob${databaseName}`, {
  jobName: `write-tpcds-job-${databaseName}`,
  workerType: WorkerType.STANDARD,
  workerCount: workerCount,
  executable: JobExecutable.pythonEtl({
    glueVersion: GlueVersion.V3_0,
    pythonVersion: PythonVersion.THREE,
    script: Code.fromAsset('./write_data.py'),
  }),
  role: glueWriteTPCDSRole,
  enableProfilingMetrics: true,
  connections: [Connection.fromConnectionName(this, `TPCDSConnector${databaseName}`, 'tpcds')],
  defaultArguments: {
    "--BUCKET_NAME": bucket.bucketName,
    "--DATABASE_NAME": databaseName,
    "--SCALE": scale.toString(),
    // assumed intent: (workerCount * 2 executors - 1 for the driver) * 4 partitions per executor
    "--NUM_PARTITIONS": ((workerCount * 2 - 1) * 4).toString(),
    "--FORMAT": format,
    "--CONNECTION_NAME": "tpcds"
  }
})
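To kick off a run with non-default arguments, something like this boto3 sketch should work. The job name follows the CDK definition above; the databaseName value tpcds1000 here is hypothetical, and argument keys must include the leading "--":

import boto3

glue = boto3.client('glue')
# per-run Arguments override the job's defaultArguments
glue.start_job_run(
    JobName='write-tpcds-job-tpcds1000',  # assumed: write-tpcds-job-${databaseName}
    Arguments={
        '--SCALE': '1000',
        '--FORMAT': 'parquet',
    },
)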

Running with worker=10 and scale=1000 took 6.5 hours with json+gzip and 8 hours with parquet+snappy. After the run, the data can be checked directly from Athena.
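For example, a row count over one of the generated tables via boto3; the database and results-bucket names here are hypothetical:

import boto3

athena = boto3.client('athena')
res = athena.start_query_execution(
    QueryString='SELECT count(*) FROM store_sales',
    QueryExecutionContext={'Database': 'tpcds1000'},  # hypothetical database name
    ResultConfiguration={'OutputLocation': 's3://my-athena-results/'},  # hypothetical results bucket
)
print(res['QueryExecutionId'])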

Performance comparison of Redshift Serverless and Athena using TPC-DS queries - sambaiz-net