I tried to generate 250 GB of data with Athena's TPC-DS Connector and write it to S3, but it timed out even with the Lambda resources raised to the maximum, so I generate the data with Glue instead.
Generating data with the TPC-DS Connector via Athena Federated Query - sambaiz-net
Subscribe to and activate Glue's TPC-DS connector.
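Once activated, the connector is available as a Glue connection, named tpcds in this post. As a minimal sketch with boto3, assuming that connection name and default credentials, its existence can be checked before running the job:

import boto3

# Assumes the Marketplace connector was activated with the connection name "tpcds"
glue = boto3.client("glue")
connection = glue.get_connection(Name="tpcds")
print(connection["Connection"]["Name"], connection["Connection"]["ConnectionType"])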
The script looks like this, with the required parameters passed in from outside. scale is in GB, the same as Athena's connector. Tables are registered in the Data Catalog as the data is written.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME", "BUCKET_NAME", "DATABASE_NAME", "SCALE", "NUM_PARTITIONS", "FORMAT", "CONNECTION_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
bucketName = args["BUCKET_NAME"]
tables = [
"call_center",
"catalog_page",
"catalog_returns",
"catalog_sales",
"customer",
"customer_address",
"customer_demographics",
"date_dim",
"dbgen_version",
"household_demographics",
"income_band",
"inventory",
"item",
"promotion",
"reason",
"ship_mode",
"store",
"store_returns",
"store_sales",
"time_dim",
"warehouse",
"web_page",
"web_returns",
"web_sales",
"web_site"
]
for table in tables:
    # Generate each TPC-DS table through the Marketplace connector as a DynamicFrame
    df = glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options={
            "table": table,
            "scale": args["SCALE"],
            "numPartitions": args["NUM_PARTITIONS"],
            "connectionName": args["CONNECTION_NAME"],
        },
        transformation_ctx=f"src_{table}",
    )

    database = args["DATABASE_NAME"]
    # Write to S3 and create/update the table in the Data Catalog
    sink = glueContext.getSink(
        path=f"s3://{bucketName}/{database}/{table}/",
        connection_type="s3",
        compression="gzip",
        updateBehavior="UPDATE_IN_DATABASE",
        enableUpdateCatalog=True,
        transformation_ctx=f"sink_{table}")
    if args["FORMAT"] == "parquet":
        sink.setFormat("glueparquet", compression="snappy")
    else:
        sink.setFormat(args["FORMAT"])
    sink.setCatalogInfo(catalogDatabase=database, catalogTableName=table)
    sink.writeFrame(df)

job.commit()
The Job was created with CDK, specifying the TPC-DS Connector in connections.
import { Role, CompositePrincipal, ServicePrincipal, PolicyStatement, ManagedPolicy } from 'aws-cdk-lib/aws-iam';
import { Database, Job, JobExecutable, GlueVersion, Code, PythonVersion, WorkerType, Connection } from '@aws-cdk/aws-glue-alpha';

const glueWriteTPCDSRole = new Role(this, `GlueWriteTPCDSRole${databaseName}`, {
  assumedBy: new CompositePrincipal(
    new ServicePrincipal('glue.amazonaws.com'),
  ),
  managedPolicies: [
    ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole'),
    ManagedPolicy.fromAwsManagedPolicyName("AmazonEC2ContainerRegistryReadOnly")
  ]
})
glueWriteTPCDSRole.addToPolicy(new PolicyStatement({
  resources: [
    `${bucket.bucketArn}/*`,
  ],
  actions: ['s3:Put*'],
}))

const workerCount = 10
new Job(this, `GlueWriteTPCDSJob${databaseName}`, {
  jobName: `write-tpcds-job-${databaseName}`,
  workerType: WorkerType.STANDARD,
  workerCount: workerCount,
  executable: JobExecutable.pythonEtl({
    glueVersion: GlueVersion.V3_0,
    pythonVersion: PythonVersion.THREE,
    script: Code.fromAsset('./write_data.py'),
  }),
  role: glueWriteTPCDSRole,
  enableProfilingMetrics: true,
  connections: [Connection.fromConnectionName(this, `TCPDSConnector${databaseName}`, 'tpcds')],
  defaultArguments: {
    "--BUCKET_NAME": bucket.bucketName,
    "--DATABASE_NAME": databaseName,
    "--SCALE": scale.toString(),
    "--NUM_PARTITIONS": ((workerCount * 2 - 1) * 4).toString(),
    "--FORMAT": format,
    "--CONNECTION_NAME": "tpcds"
  }
})
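Since the default arguments are baked into the job, a run can be started as-is, or parameters such as SCALE and FORMAT can be overridden per run. A minimal sketch with boto3, assuming the job name follows the write-tpcds-job-${databaseName} pattern from the CDK definition (tpcds_1000 is a hypothetical database name):

import boto3

glue = boto3.client("glue")

# Override some of the default arguments defined in CDK for this run
run = glue.start_job_run(
    JobName="write-tpcds-job-tpcds_1000",  # hypothetical job name
    Arguments={
        "--SCALE": "1000",
        "--FORMAT": "parquet",
    },
)
print(run["JobRunId"])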
Running with workerCount=10 and scale=1000 took about 6.5 hours for json+gzip and 8 hours for parquet+snappy. After the job finishes, the data can be queried from Athena right away.
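For example, a row count against one of the generated tables can be issued from boto3 as well; a sketch assuming a hypothetical catalog database name tpcds_1000 and an S3 location of your own for Athena query results:

import boto3

athena = boto3.client("athena")

# Database and output location are placeholders; use the DATABASE_NAME passed to the job and a bucket you own
res = athena.start_query_execution(
    QueryString="SELECT count(*) FROM store_sales",
    QueryExecutionContext={"Database": "tpcds_1000"},
    ResultConfiguration={"OutputLocation": "s3://your-athena-results-bucket/"},
)
print(res["QueryExecutionId"])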