Previously, I tried to generate 250GB of data with Athena’s TPC-DS Connector and output it to S3, but it timed out even with the Lambda resources raised to the maximum, so this time I generate the data with Glue.
Generate data with TPC-DS Connector in Athena’s Federated Query - sambaiz-net
Subscribe to and activate the TPC-DS connector for Glue.
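Activating the connector should leave a Glue connection behind. As a quick sanity check before writing the job, the connection can be looked up with boto3; a minimal sketch, assuming the connection was named tpcds, the same name the job below passes as CONNECTION_NAME.
import boto3

# Assumes the activated connector's connection was named "tpcds",
# matching the CONNECTION_NAME argument used by the job below.
glue = boto3.client("glue")
connection = glue.get_connection(Name="tpcds")["Connection"]
print(connection["ConnectionType"])  # expected: MARKETPLACE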
Write a script like the following. The scale is in GB, the same as Athena’s connector. The tables are registered in the Data Catalog as the data is written.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME", "BUCKET_NAME", "DATABASE_NAME", "SCALE", "NUM_PARTITIONS", "FORMAT", "CONNECTION_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
bucketName = args["BUCKET_NAME"]
tables = [
"call_center",
"catalog_page",
"catalog_returns",
"catalog_sales",
"customer",
"customer_address",
"customer_demographics",
"date_dim",
"dbgen_version",
"household_demographics",
"income_band",
"inventory",
"item",
"promotion",
"reason",
"ship_mode",
"store",
"store_returns",
"store_sales",
"time_dim",
"warehouse",
"web_page",
"web_returns",
"web_sales",
"web_site"
]
# Generate each table with the TPC-DS connector and write it to S3,
# registering the table in the Glue Data Catalog as it is written.
for table in tables:
    df = glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options={
            "table": table,
            "scale": args["SCALE"],
            "numPartitions": args["NUM_PARTITIONS"],
            "connectionName": args["CONNECTION_NAME"],
        },
        transformation_ctx=f"src_{table}",
    )
    database = args["DATABASE_NAME"]
    sink = glueContext.getSink(
        path=f"s3://{bucketName}/{database}/{table}/",
        connection_type="s3",
        compression="gzip",
        updateBehavior="UPDATE_IN_DATABASE",
        enableUpdateCatalog=True,
        transformation_ctx=f"sink_{table}")
    if args["FORMAT"] == "parquet":
        sink.setFormat("glueparquet", compression="snappy")
    else:
        sink.setFormat(args["FORMAT"])
    sink.setCatalogInfo(catalogDatabase=database, catalogTableName=table)
    sink.writeFrame(df)
job.commit()
Create a job with CDK, specifying the TPC-DS Connector in connections.
import { Role, CompositePrincipal, ServicePrincipal, PolicyStatement, ManagedPolicy } from 'aws-cdk-lib/aws-iam';
import { Database, Job, JobExecutable, GlueVersion, Code, PythonVersion, WorkerType, Connection } from '@aws-cdk/aws-glue-alpha';
const glueWriteTPCDSRole = new Role(this, `GlueWriteTPCDSRole${databaseName}`, {
assumedBy: new CompositePrincipal(
new ServicePrincipal('glue.amazonaws.com'),
),
managedPolicies: [
ManagedPolicy.fromAwsManagedPolicyName('service-role/AWSGlueServiceRole'),
ManagedPolicy.fromAwsManagedPolicyName("AmazonEC2ContainerRegistryReadOnly")
]
})
glueWriteTPCDSRole.addToPolicy(new PolicyStatement({
resources: [
`${bucket.bucketArn}/*`,
],
actions: ['s3:Put*'],
}))
const workerCount = 10
new Job(this, `GlueWriteTPCDSJob${databaseName}`, {
jobName: `write-tpcds-job-${databaseName}`,
workerType: WorkerType.STANDARD,
workerCount: workerCount,
executable: JobExecutable.pythonEtl({
glueVersion: GlueVersion.V3_0,
pythonVersion: PythonVersion.THREE,
script: Code.fromAsset('./write_data.py'),
}),
role: glueWriteTPCDSRole,
enableProfilingMetrics: true,
connections: [Connection.fromConnectionName(this, `TPCDSConnector${databaseName}`, 'tpcds')],
defaultArguments: {
"--BUCKET_NAME": bucket.bucketName,
"--DATABASE_NAME": databaseName,
"--SCALE": scale.toString(),
"--NUM_PARTITIONS": ((workerCount * 2 - 1) * 4).toString(),
"--FORMAT": format,
"--CONNECTION_NAME": "tpcds"
}
})
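After deploying, the job can be started from the console, or with boto3 as in the following sketch; the job name suffix and the argument values here are placeholders (the CDK stack already sets the defaults, and 76 corresponds to (10 * 2 - 1) * 4 with workerCount=10).
import boto3

glue = boto3.client("glue")
# The job name suffix and the argument values are placeholders;
# defaultArguments set in the CDK stack apply unless overridden here.
glue.start_job_run(
    JobName="write-tpcds-job-tpcds1000",
    Arguments={
        "--SCALE": "1000",
        "--NUM_PARTITIONS": "76",  # (workerCount * 2 - 1) * 4 with workerCount=10
        "--FORMAT": "parquet",
    },
)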
When this job ran with workerCount=10 and scale=1000, it took 6.5 hours for json + gzip and 8 hours for parquet + snappy. After execution, the data can be queried in Athena as it is.
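For example, a quick row count on one of the generated tables can be issued from Athena with boto3; the database name and the query result location below are placeholders.
import boto3

athena = boto3.client("athena")
# Database name and query result location are placeholders for this sketch.
res = athena.start_query_execution(
    QueryString="SELECT count(*) FROM store_sales",
    QueryExecutionContext={"Database": "tpcds1000"},
    ResultConfiguration={"OutputLocation": "s3://<query-results-bucket>/athena/"},
)
print(res["QueryExecutionId"])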
Compare Redshift Serverless and Athena performances by TPC-DS queries - sambaiz-net