GlueはConnectorによって様々なデータソースに対応していて、RDSやMongoDBなど標準で提供されているもの以外にも カスタムコネクタを用いることで接続できる。今回はMarketplaceで提供されているBigQueryのカスタムコネクタを用いてBigQueryのテーブルの内容をS3に出力するJobを作成する。
GlueのETL JobをGUIで構築したりモニタリングできるサービス、Glue StudioからMarketplaceに飛んで AWS Glue Connector for Google BigQueryをSubscribeする。
BigQuery ConnectorをactivateしConnectionを作成する。
Studioでない方のGlueのConnectionからも確認できる。
JobのRoleには
117940112483.dkr.ecr.us-east-1.amazonaws.com/818e4ebf-997f-4d87-beb3-e0196e500474/cg-1025003233/bigquery-spark-connector:2.12.0-latest
をpullするための GetAuthorizationToken
と GCPのcredentialをSecretsManagerから持ってくるための GetSecretValue
が必要。
SecretsManagerにそのままcredentialのjsonを入れて実行したところ次のエラーが出た。正しくはcredentialsというキーにbase64エンコードしたjsonの値を入れる。
IllegalArgumentException: 'A project ID is required for this service but could not be determined from the builder or the environment. Please set a project ID using the builder.'
また、Apache Spark SQL connector for Google BigQueryが
Storage Read APIを呼ぶので bigquery.readsessions.*
の権限が必要。Viewを参照する場合はconnection_options
でviewsEnabled=“true”を渡して、Viewのクエリを実行するためのbigquery.jobs.create
と、クエリの結果が保存されるMaterialized viewの作成に必要なbigquery.tables.create
の権限が必要になる。Materialized viewはデフォルトで同じdatasetに作られるが、materializationProject
やmaterializationDataset
で変更できる。
JobのConnection Options に parentProject
としてGCPのProjectID、table
としてBQのテーブル名を入れると次のようなスクリプトが生成され、
実行するとS3にBQのデータが保存される。DataFrameに変換してクエリを実行することもできる。DynamicFrameは現状overwriteできないので次のスクリプトは冪等性を持たない。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"parentProject":"<project-id>","connectionName":"test-bq","table":"<table-name>"}, transformation_ctx = "DataSource0")
DataSink0 = glueContext.getSink(path = "s3://hogefuga/", connection_type = "s3", updateBehavior = "UPDATE_IN_DATABASE", partitionKeys = [], enableUpdateCatalog = True, transformation_ctx = "DataSink0")
DataSink0.setCatalogInfo(catalogDatabase = "testdb",catalogTableName = "<catalog-table-name>")
DataSink0.setFormat("json")
DataSink0.writeFrame(DataSource0)
job.commit()
_PARTITIONDATE
と _PARTITIONTIME
はクエリで参照できないので次のようにフィルタする。
DataSource0.filter(f"""_PARTITIONDATE = '{time.strftime("%Y-%m-%d")}'""")
参考
AWS Glueカスタムコネクタを使用したGoogle BigQueryからAmazon S3へのデータの移行 | Amazon Web Services ブログ