AWS GlueでCSVを加工しParquetに変換してパーティションを切りAthenaで参照する

(2019-01-01)

AWS GlueはマネージドなETL(Extract/Transform/Load)サービスで、Sparkを使ってS3などにあるデータを読み込み加工して変換出力したり、AthenaやRedshift Spectrumで参照できるデータカタログを提供する。 今回はS3のCSVを読み込んで加工し、列指向フォーマットParquetに変換しパーティションを切って出力、その後クローラを回してデータカタログにテーブルを作成してAthenaで参照できることを確認する。

料金はジョブがDPU(4vCPU/16GBメモリ)時間あたり$0.44(最低2DPU/10分)かかる。 また、クローラも同様にDPUで課金される。

ジョブのスクリプトのアップロード

スクリプトをS3に上げる。 GlueはSpark標準のDataFrameを扱うこともできるが、独自にスキーマを柔軟に扱えるDynamicFrameというのをサポートしている。DataFrameとは相互に変換できるので、SQL文の実行などDataFrameにしかないAPIを使いたい場合は変換する。

data_frame = dynamc_frame.toDF()
data_frame.toDF().createOrReplaceTempView('t1')
data_frame2 = spark.sql('SELECT * FROM t1')

入力は為替データのCSV。

$ less kawase1.csv
Date,USD,GBP,EUR,CAD,CHF,SEK,DKK,NOK,AUD,NZD
2002/4/1,133.15,189.79,116.12,83.48,79.28,12.87,15.63,15.08,71.14,58.8
...

$ less kawase2.csv
Date,ZAR,BHD,HKD,INR,PHP,SGD,THB,KWD,SAR,AED,MXN,TWD
2002/4/1,11.76,353.65,17.07,2.73,2.61,72.21,3.07,434.14,35.52,36.26,14.81,3.82
...

やることは次の通り。

transformation_ctxは後述するジョブのブックマークで使われる一意な識別子。

import sys
import datetime
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


def split_date(rec):
    date = datetime.datetime.strptime(rec["Date"], "%Y/%m/%d")
    rec["year"] = date.year
    rec["month"] = date.month
    rec["day"] = date.day
    return rec


def convert_ratio_usd(rec):
    ret = {}
    for field in rec:
        ret[field] = rec[field]
        if field not in ["year", "month", "day"]:
            ret[field] /= rec["usd"]
    return ret


input_df = glueContext.create_dynamic_frame.from_options(
    transformation_ctx="source",
    connection_type="s3",
    connection_options={"paths": ["s3://*****/kawase1.csv"]},
    format="csv",
    format_options={"withHeader": True})
input_df2 = glueContext.create_dynamic_frame.from_options(
    transformation_ctx="source",
    connection_type="s3",
    connection_options={"paths": ["s3://*****/kawase2.csv"]},
    format="csv",
    format_options={"withHeader": True})

output_df = input_df.join(
    ["Date"], ["Date"], input_df2, transformation_ctx="info").map(
        split_date, transformation_ctx="split_date").apply_mapping(
            [("year", "smallint", "year", "smallint"),
             ("month", "smallint", "month", "smallint"),
             ("day", "smallint", "day", "smallint"),
             ("USD", "string", "usd", "double"),
             ("EUR", "string", "eur", "double"),
             ("AUD", "string", "aud", "double"),
             ("ZAR", "string", "zar", "double")],
            transformation_ctx="apply_mapping").map(
                convert_ratio_usd, transformation_ctx="convert_ratio_usd")

glueContext.write_dynamic_frame.from_options(
    frame=output_df,
    connection_type="s3",
    connection_options={
        "path": "s3://*****/target",
        "partitionKeys": ["year"],
        "compression": "gzip"
    },
    format="parquet",
    transformation_ctx="sink")
job.commit()

glue_client = boto3.client('glue', region_name='us-east-1')
glue_client.start_crawler(Name='kawase')

Glueのロールで読める場所にアップロードする。

$ aws s3 cp main.py s3://aws-glue-scripts-*****/root/main.py

ジョブとクローラの作成

ジョブを作成する。デフォルトで10DPU使うようになっているので調整する。ブックマークを有効にすると以前処理したデータは処理しないようにできるが、残念ながらParquetは対応していないため2回実行すると重複して出力されてしまう。

ジョブの作成

クローラはデータをクローリングしてデータカタログの指定のデータベースにスキーマやパーティションといったメタデータテーブルを作成する。 以前、LambdaでAthenaのスキーマを管理したりパーティションを切ったりするバッチを作ったが、Glueのデータカタログ上で同じことをやってくれる。 楽だけど現状DPUの設定項目がないのでデータが多くて時間がかかる場合はどうしようもなさそう。

Athenaのmigrationやpartitionするathena-adminを作った - sambaiz-net

ジョブの実行

トリガーを設定して定期的に実行することもできるが、今回は手動で実行する。

$ aws glue start-job-run --job-name kawase

パーティションごとにParquetが出力されている。

パーティションごとのディレクトリ

また、クローラの実行が終わるとデータカタログにテーブルが追加される。

Athenaで参照

AthenaをGlueリリース以前から使っていた場合はデータカタログをGlueのものにアップグレードする必要がある。 Athenaのところにリンクが出るので押しとけばIAMロールの更新など大体やってくれる。 これによってDatabaseにGlueのものが出るようになりクエリも実行できるようになる。

Athenaからの参照