AthenaのFederated QueryでTPC-DS Connectorを用いてデータを生成する

awsetldatabase

AthenaのFederated Queryは データソースコネクタとなるLambdaを通してDynamoDBやRDSといったS3以外のデータソースにクエリを実行できる機能。

Athenaのデータソースコネクタとユーザー定義関数(UDF)を実装する - sambaiz-net

今回はAWS公式のリポジトリにあるTPC-DS Connectorを用いて、意思決定支援(Decision Support)におけるデータベースのベンチマークであるTPC-DSのデータを生成する。

公式のリポジトリにあるとはいえカスタム扱いなので自分でビルドする必要がある。基本的にREADME通り進めれば良いが、jdk16ではビルドに失敗したのでjdk8を入れて実行した。

$ brew tap homebrew/cask-versions
$ brew install --cask corretto8
export JAVA_HOME=`/usr/libexec/java_home -v 1.8`
export PATH=${JAVA_HOME}/bin:${PATH}
$ java -version
openjdk version "1.8.0_312"
OpenJDK Runtime Environment Corretto-8.312.07.1 (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM Corretto-8.312.07.1 (build 25.312-b07, mixed mode)

publish.shの引数でもリージョンを取るが、これとは別に .aws/config などで指定されていないとAWS SDKの方で読めずに失敗する。

$ git clone https://github.com/awslabs/aws-athena-query-federation.git -b v2021.51.1 --depth 1 
$ cd aws-athena-query-federation/
$ mvn clean install -DskipTests=true
$ cd athena-tpcds/
$ mvn clean install -DskipTests=true

# export AWS_REGION=us-west-2
$ ../tools/publish.sh <BUCKET_NAME> athena-tpcds us-west-2
# Run this script from the directory of the module (e.g. athena-example) that you wish to publish.
# This script performs the following actions:
# 1. Builds the maven project
# 2. Creates a Serverless Application Package using the athena-example.yaml
# 3. Produces a final packaged.yaml which can be used to publish the application to your
#     private Serverless Application Repository or deployed via Cloudformation.
# 4. Uploads the packaged connector code to the S3 bucket you specified.
# 5. Uses sar_bucket_policy.json to grant Serverless Application Repository access to our connector code in s3.
# 6. Published the connector to you private Serverless Application Repository where you can 1-click deploy it.
Do you wish to proceed? (yes or no) yes

完了するとServerless Application Repositoryにprivateなアプリケーションが登録されるのでLambda関数を作成する。

AWS SAMでLambdaの関数をデプロイしServerless Application Repositoryに公開する - sambaiz-net

後は select * from “lambda:tpcds_catalog”.tpcds1.customer limit 100 のようなクエリを実行すると 裏側でLambdaが走り生成されたデータが返る。tpcds1の数字はscale factorで、合わせて1GBのデータが含まれる。

UNLOADで保存できる。

UNLOAD (select * from "lambda:tpcds_catalog".tpcds1.catalog_sales) 
TO 's3://<BUCKET_NAME>/tpcds_data/' 
WITH ( format = 'JSON', compression = 'gzip')

そこでtcpds250の全てのテーブルをUNLOADしようとしたが、catalog_salesで最大の10GBまでLambdaのメモリを増やしてもタイムアウトしてしまったので、巨大なデータセットのファイルを生成する用途には向いていない。

GoでAthenaのクエリを実行する - sambaiz-net

package main

import (
	"database/sql"
	"fmt"
	"logging"
	"os"

	_ "github.com/segmentio/go-athena"
)

var tables = []string{
	"call_center",
	"catalog_page",
	"catalog_returns",
	"catalog_sales",
	"customer",
	"customer_address",
	"customer_demographics",
	"date_dim",
	"dbgen_version",
	"household_demographics",
	"income_band",
	"inventory",
	"item",
	"promotion",
	"reason",
	"ship_mode",
	"store",
	"store_returns",
	"store_sales",
	"time_dim",
	"warehouse",
	"web_page",
	"web_returns",
	"web_sales",
	"web_site",
}

func main() {
	unloadBucket := os.Getenv("UNLOAD_BUCKET")
	scaleFactor := os.Getenv("SCALE_FACTOR")
	queryResultBucket := os.Getenv("QUERY_RESULT_BUCKET")
	db, err := sql.Open("athena", fmt.Sprintf("db=default&output_location=%s", fmt.Sprintf("s3://%s", queryResultBucket)))
	if err != nil {
		log.Fatal(err)
	}

	for _, format := range []string{"JSON", "PARQUET"} {
		for i, table := range tables {
			fmt.Printf("%s [%d/%d] unload %s\n", format, i+1, len(tables), table)
			_, err := db.Query(fmt.Sprintf(`
		UNLOAD (select * from "lambda:tpcds_catalog".tpcds%s.%s) 
		TO 's3://%s/tcpds%s/%s/%s/' 
		WITH ( format = '%s', compression = 'gzip')`,
				scaleFactor, table,
				unloadBucket, scaleFactor, format, table,
				format))
			if err != nil {
				panic(err)
			}
		}
	}
}

GlueのTPC-DS Connectorでデータを生成する - sambaiz-net

参考

TPC-DSから学ぶPostgreSQLの弱点と今後の展望

Java 8 (OpenJDK: Amazon Corretto) を Homebrew で macOS にインストールする - Qiita