AWS GlueはマネージドなETL(Extract/Transform/Load)サービスで、Sparkを使ってS3などにあるデータを読み込み加工して変換出力したり、AthenaやRedshift Spectrumで参照できるデータカタログを提供する。 今回はS3のCSVを読み込んで加工し、列指向フォーマットParquetに変換しパーティションを切って出力、その後クローラを回してデータカタログにテーブルを作成してAthenaで参照できることを確認する。
料金はジョブがDPU(4vCPU/16GBメモリ)時間あたり$0.44(最低2DPU/10分)かかる。 また、クローラも同様にDPUで課金される。
なお、AthenaのCTASでもParquetを出力することができる。 出力先にファイルがないようにする必要があったり重いクエリは失敗することがあるが手軽で良い。
import * as athena from 'athena-client'
const clientConfig: athena.AthenaClientConfig = {
bucketUri: 's3://*****/*****'
skipFetchResult: true,
};
const awsConfig: athena.AwsConfig = {
region: 'us-east-1',
};
const client = athena.createClient(clientConfig, awsConfig);
(async () => {
await client.execute(`
CREATE TABLE *****
WITH (
format = 'PARQUET',
external_location = 's3://*****'
) AS (
SELECT ~~
)
})();
開発用エンドポイント
ジョブの立ち上がりにやや時間がかかるため開発用エンドポイントを立ち上げておくとDPUが確保されて効率よく開発できる。 立ち上げている間のDPUの料金がかかる。つまりずっとジョブを実行し続けているようなもので結構高くつくので終わったら閉じるようにしたい。
ローカルやEC2から自分で開発用エンドポイントにsshしてREPLで実行したりNotebookを立てることもできるが、 コンソールから立ち上げたNotebookは最初からつながっていて鍵の登録も必要なくて楽。
$ ssh -i private-key-file-path 9007:169.254.76.1 [email protected] -t gluepyspark
>>> df = spark.read.parquet("s3://foo/bar")
$ ssh -i private-key-file-path -NTL 9007:169.254.76.1:9007 [email protected]
NotebookのインスタンスのIAMロールを作成するとCloudwatch Logsまわりに加えてGlueのDevEndpointとAssetの取得権限が付与されている。
{
"Action": [
"s3:ListBucket"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::aws-glue-jes-prod-us-east-1-assets"
]
}
{
"Action": [
"s3:GetObject"
],
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::aws-glue-jes-prod-us-east-1-assets*"
]
}
{
"Action": [
"glue:UpdateDevEndpoint",
"glue:GetDevEndpoint",
"glue:GetDevEndpoints"
],
"Effect": "Allow",
"Resource": [
"arn:aws:glue:us-east-1:*****:devEndpoint/test*"
]
}
たまに立ち上げに失敗したり次のようなエラーで実行できないことがあったが、立ち上げ直したらうまくいった。
The code failed because of a fatal error:
Error sending http request and maximum retry encountered..
Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.
printデバッグが捗る。
...
print(output_df.toDF().collect()[0].asDict())
ジョブのスクリプト
GlueはSpark標準のDataFrameを扱うこともできるが、スキーマを明示しない独自のDynamicFrameというのをサポートしている。DataFrameとは相互に変換できるのでSQL文の実行などDataFrameにしかないAPIを使いたい場合は変換すれば良い。
Apache SparkのRDD, DataFrame, DataSetとAction, Transformation - sambaiz-net
from pyspark.sql.utils import ParseException
data_frame = dynamc_frame.toDF()
data_frame.toDF().createOrReplaceTempView('t1')
try:
data_frame2 = spark.sql('SELECT * FROM t1')
except ParseException as e:
print(str(e).replace("\\n", "\n"))
ただ、データフォーマットが不定な場合、読み込んだデータセットによってはクエリで参照しているフィールドがなくcannnot resolve *** given input columns
で失敗することがある。これは create_dynamic_frame_from_catalog()で読み込んだ際も同様で、カタログのスキーマに存在しても実際のデータに存在しないフィールドは、変換したDataFrameのスキーマには含まれない。
入力は為替データのCSV。
$ less kawase1.csv
Date,USD,GBP,EUR,CAD,CHF,SEK,DKK,NOK,AUD,NZD
2002/4/1,133.15,189.79,116.12,83.48,79.28,12.87,15.63,15.08,71.14,58.8
...
$ less kawase2.csv
Date,ZAR,BHD,HKD,INR,PHP,SGD,THB,KWD,SAR,AED,MXN,TWD
2002/4/1,11.76,353.65,17.07,2.73,2.61,72.21,3.07,434.14,35.52,36.26,14.81,3.82
...
やることは次の通り。
- create_dynamic_frame.from_options()でS3のCSVを読んでDynamicFrameを生成
- Dateでjoin()
- apply_mapping()でフィールド名と型をマッピングする
- map()でDateをyear, month, dateの3フィールドに分割、各値をUSDとの比にする
- create_dynamic_frame.from_options()でパーティションをyearで切ってフォーマットはParquetで書き込む
- クローラの実行を開始する
transformation_ctxは後述するジョブのブックマークで使われる一意な識別子。
import sys
import datetime
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
def split_date(rec):
date = datetime.datetime.strptime(rec["Date"], "%Y/%m/%d")
rec["year"] = date.year
rec["month"] = date.month
rec["day"] = date.day
return rec
def convert_ratio_usd(rec):
ret = {}
for field in rec:
ret[field] = rec[field]
if field not in ["year", "month", "day"]:
ret[field] /= rec["usd"]
return ret
input_df = glueContext.create_dynamic_frame.from_options(
transformation_ctx="source",
connection_type="s3",
connection_options={"paths": ["s3://*****/kawase1.csv"]},
format="csv",
format_options={"withHeader": True})
input_df2 = glueContext.create_dynamic_frame.from_options(
transformation_ctx="source",
connection_type="s3",
connection_options={"paths": ["s3://*****/kawase2.csv"]},
format="csv",
format_options={"withHeader": True})
output_df = input_df.join(
["Date"], ["Date"], input_df2, transformation_ctx="info").map(
split_date, transformation_ctx="split_date").apply_mapping(
[("year", "smallint", "year", "smallint"),
("month", "smallint", "month", "smallint"),
("day", "smallint", "day", "smallint"),
("USD", "string", "usd", "double"),
("EUR", "string", "eur", "double"),
("AUD", "string", "aud", "double"),
("ZAR", "string", "zar", "double")],
transformation_ctx="apply_mapping").map(
convert_ratio_usd, transformation_ctx="convert_ratio_usd")
glueContext.write_dynamic_frame.from_options(
frame=output_df,
connection_type="s3",
connection_options={
"path": "s3://*****/target",
"partitionKeys": ["year"],
"compression": "gzip"
},
format="parquet",
transformation_ctx="sink")
job.commit()
glue_client = boto3.client('glue', region_name='us-east-1')
glue_client.start_crawler(Name='kawase')
Glueのロールで読める場所にアップロードする。
$ aws s3 cp main.py s3://aws-glue-scripts-*****/root/main.py
ちなみに、glueContext.get_logger()
でログを出力すると、
/aws-glue/jobs/error
に送られてしまうので
/aws-glue/jobs/output
に送りたい場合はloggerを作る。
def make_logger():
logger = logging.getLogger()
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
ジョブとクローラの作成
ジョブを作成する。デフォルトで10DPU使うようになっているので調整する。1DPUで2つのExecutorが動く。 ブックマークを有効にすると以前処理したデータは処理しないようにできるが、残念ながらParquetは対応していないため2回実行すると重複して出力されてしまう。
(追記: 2021-04-16) 今はParquetも対応している
クローラはデータをクローリングしてデータカタログの指定のデータベースにスキーマやパーティションといったメタデータテーブルを作成する。 以前、LambdaでAthenaのスキーマを管理したりパーティションを切ったりするバッチを作ったが、Glueのデータカタログ上で同じことをやってくれる。 楽だが現状DPUの設定項目がないのでデータが多くて時間がかかる場合はどうしようもなさそう。
Athenaのmigrationやpartitionするathena-adminを作った - sambaiz-net
CDKでGlue Data CatalogのDatabase,Table,Partitionを作成する - sambaiz-net
ジョブの実行
トリガーを設定して定期的に実行することもできるが、今回は手動で実行する。
$ aws glue start-job-run --job-name kawase
パーティションごとにParquetが出力されている。
また、クローラの実行が終わるとデータカタログにテーブルが追加される。
Athenaで参照
AthenaをGlueリリース以前から使っていた場合はデータカタログをGlueのものにアップグレードする必要がある。 Athenaのところにリンクが出るので押しとけばIAMロールの更新など大体やってくれる。 これによってDatabaseにGlueのものが出るようになりクエリも実行できるようになる。
なお、マニュアルでテーブルを作ることもできるが、array
やstruct
のスキーマを小文字で記述しないとAthenaのクエリを実行する際にHIVE_METASTORE_ERROR
となる。