Generate data with TPC-DS Connector in Athena's Federated Query


Athena’s Federated Query is a feature that runs queries against non-S3 data sources such as DynamoDB and RDS through a Lambda function that acts as the data source connector.

Implement Athena’s data source connectors and user defined functions (UDF) - sambaiz-net

This article uses the TPC-DS Connector from the official AWS repository. It generates data for TPC-DS, a database benchmark for decision support systems.

Although it lives in the official repository, it is a custom connector, so you need to build it yourself. The README covers the steps, but the build fails on JDK 16, so install JDK 8.

$ brew tap homebrew/cask-versions
$ brew install --cask corretto8
$ export JAVA_HOME=`/usr/libexec/java_home -v 1.8`
$ export PATH=${JAVA_HOME}/bin:${PATH}
$ java -version
openjdk version "1.8.0_312"
OpenJDK Runtime Environment Corretto-8.312.07.1 (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM Corretto-8.312.07.1 (build 25.312-b07, mixed mode)

publish.sh takes a region as a parameter, but the AWS SDK does not pick it up from there, so the script fails unless the region is also set in .aws/config or in an environment variable such as AWS_REGION.

$ git clone https://github.com/awslabs/aws-athena-query-federation.git -b v2021.51.1 --depth 1 
$ cd aws-athena-query-federation/
$ mvn clean install -DskipTests=true
$ cd athena-tpcds/
$ mvn clean install -DskipTests=true

# export AWS_REGION=us-west-2
$ ../tools/publish.sh <BUCKET_NAME> athena-tpcds us-west-2
# Run this script from the directory of the module (e.g. athena-example) that you wish to publish.
# This script performs the following actions:
# 1. Builds the maven project
# 2. Creates a Serverless Application Package using the athena-example.yaml
# 3. Produces a final packaged.yaml which can be used to publish the application to your
#     private Serverless Application Repository or deployed via Cloudformation.
# 4. Uploads the packaged connector code to the S3 bucket you specified.
# 5. Uses sar_bucket_policy.json to grant Serverless Application Repository access to our connector code in s3.
# 6. Published the connector to you private Serverless Application Repository where you can 1-click deploy it.
Do you wish to proceed? (yes or no) yes

When it completes, the connector is registered as a private application in the Serverless Application Repository, so deploy it from there to create the Lambda function.

Deploy a Lambda function with AWS SAM and publish it to the Serverless Application Repository - sambaiz-net

If you execute a query such as select * from "lambda:tpcds_catalog".tpcds1.customer limit 100, the Lambda function runs in the backend and returns the generated data. The number in tpcds1 is the scale factor; at scale factor 1 the dataset is 1 GB in total.

The generated data can be saved to S3 with UNLOAD.

UNLOAD (select * from "lambda:tpcds_catalog".tpcds250.catalog_sales) 
TO 's3://<BUCKET_NAME>/tpcds_data/' 
WITH ( format = 'JSON', compression = 'gzip')
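
As a rough sketch, the UNLOAD statement above can be assembled from a scale factor, a table name, and an output format. The helper name `buildUnload`, its validation rules, and the per-table subdirectory under `tpcds_data/` are assumptions made for illustration; only the overall statement shape comes from the example above.

```go
package main

import (
	"fmt"
	"strings"
)

// buildUnload is a hypothetical helper that renders an Athena UNLOAD statement
// for one TPC-DS table exposed through the "lambda:tpcds_catalog" catalog.
func buildUnload(scaleFactor int, table, bucket, format string) (string, error) {
	if scaleFactor <= 0 {
		return "", fmt.Errorf("scale factor must be positive, got %d", scaleFactor)
	}
	format = strings.ToUpper(format)
	// Assumption of this sketch: accept only the formats used in this article.
	if format != "JSON" && format != "PARQUET" {
		return "", fmt.Errorf("unsupported format %q", format)
	}
	return fmt.Sprintf(
		`UNLOAD (select * from "lambda:tpcds_catalog".tpcds%d.%s) TO 's3://%s/tpcds_data/%s/' WITH (format = '%s', compression = 'gzip')`,
		scaleFactor, table, bucket, table, format), nil
}

func main() {
	// "my-bucket" is a placeholder bucket name.
	q, err := buildUnload(250, "catalog_sales", "my-bucket", "json")
	if err != nil {
		panic(err)
	}
	fmt.Println(q)
}
```

Validating the inputs before formatting keeps malformed schema names such as tpcds0 from ever reaching Athena.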

I tried to UNLOAD all the tables of tpcds250 with the program below, but it timed out at catalog_sales even after I raised the Lambda memory to the maximum of 10 GB, so this approach is not suited to generating files for a huge dataset.

Execute Athena queries with Go - sambaiz-net

package main

import (
	"database/sql"
	"fmt"
	"log"
	"os"

	_ "github.com/segmentio/go-athena"
)

var tables = []string{
	"call_center",
	"catalog_page",
	"catalog_returns",
	"catalog_sales",
	"customer",
	"customer_address",
	"customer_demographics",
	"date_dim",
	"dbgen_version",
	"household_demographics",
	"income_band",
	"inventory",
	"item",
	"promotion",
	"reason",
	"ship_mode",
	"store",
	"store_returns",
	"store_sales",
	"time_dim",
	"warehouse",
	"web_page",
	"web_returns",
	"web_sales",
	"web_site",
}

func main() {
	unloadBucket := os.Getenv("UNLOAD_BUCKET")
	scaleFactor := os.Getenv("SCALE_FACTOR")
	queryResultBucket := os.Getenv("QUERY_RESULT_BUCKET")
	db, err := sql.Open("athena", fmt.Sprintf("db=default&output_location=s3://%s", queryResultBucket))
	if err != nil {
		log.Fatal(err)
	}

	for _, format := range []string{"JSON", "PARQUET"} {
		for i, table := range tables {
			fmt.Printf("%s [%d/%d] unload %s\n", format, i+1, len(tables), table)
			rows, err := db.Query(fmt.Sprintf(`
		UNLOAD (select * from "lambda:tpcds_catalog".tpcds%s.%s)
		TO 's3://%s/tpcds%s/%s/%s/'
		WITH ( format = '%s', compression = 'gzip')`,
				scaleFactor, table,
				unloadBucket, scaleFactor, format, table,
				format))
			if err != nil {
				log.Fatal(err)
			}
			// The result set is not needed; close it so the connection is released.
			rows.Close()
		}
	}
}

Generate data with TPC-DS Connector for Glue - sambaiz-net
