GlueのカスタムコネクタでBigQueryに接続する

awssparkgcp

GlueはConnectorによって様々なデータソースに対応していて、RDSやMongoDBなど標準で提供されているもの以外にも カスタムコネクタを用いることで接続できる。今回はMarketplaceで提供されているBigQueryのカスタムコネクタを用いてBigQueryのテーブルの内容をS3に出力するJobを作成する。

GlueのETL JobをGUIで構築したりモニタリングできるサービス、Glue StudioからMarketplaceに飛んで AWS Glue Connector for Google BigQueryをSubscribeする。

ConnectorのSubscribe

BigQuery ConnectorをactivateしConnectionを作成する。

ConnectorとConnection

Studioでない方のGlueのConnectionからも確認できる。

GlueのConnection

JobのRoleには 117940112483.dkr.ecr.us-east-1.amazonaws.com/818e4ebf-997f-4d87-beb3-e0196e500474/cg-1025003233/bigquery-spark-connector:2.12.0-latest をpullするための GetAuthorizationToken と GCPのcredentialをSecretsManagerから持ってくるための GetSecretValue が必要。 SecretsManagerにそのままcredentialのjsonを入れて実行したところ次のエラーが出た。正しくはcredentialsというキーにbase64エンコードしたjsonの値を入れる。

IllegalArgumentException: 'A project ID is required for this service but could not be determined from the builder or the environment. Please set a project ID using the builder.'

また、Apache Spark SQL connector for Google BigQueryStorage Read APIを呼ぶので bigquery.readsessions.*権限が必要。

JobのConnection Options に parentProject としてGCPのProjectID、table としてBQのテーブル名を入れると次のようなスクリプトが生成され、 実行するとS3にBQのデータが保存される。DataFrameに変換してクエリを実行することもできる。DynamicFrameは現状overwriteできないので次のスクリプトは冪等性を持たない。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"parentProject":"<project-id>","connectionName":"test-bq","table":"<table-name>"}, transformation_ctx = "DataSource0")
DataSink0 = glueContext.getSink(path = "s3://hogefuga/", connection_type = "s3", updateBehavior = "UPDATE_IN_DATABASE", partitionKeys = [], enableUpdateCatalog = True, transformation_ctx = "DataSink0")
DataSink0.setCatalogInfo(catalogDatabase = "testdb",catalogTableName = "<catalog-table-name>")
DataSink0.setFormat("json")
DataSink0.writeFrame(DataSource0)

job.commit()

_PARTITIONDATE_PARTITIONTIME はクエリで参照できないので次のようにフィルタする。

DataSource0.filter(f"""_PARTITIONDATE = '{time.strftime("%Y-%m-%d")}'""")

参考

AWS Glueカスタムコネクタを使用したGoogle BigQueryからAmazon S3へのデータの移行 | Amazon Web Services ブログ