AWS GlueでCSVを加工しParquetに変換してパーティションを切りAthenaで参照する

(2019-01-01)

AWS GlueはマネージドなETL(Extract/Transform/Load)サービスで、Sparkを使ってS3などにあるデータを読み込み加工して変換出力したり、AthenaやRedshift Spectrumで参照できるデータカタログを提供する。 今回はS3のCSVを読み込んで加工し、列指向フォーマットParquetに変換しパーティションを切って出力、その後クローラを回してデータカタログにテーブルを作成してAthenaで参照できることを確認する。

料金はジョブがDPU(4vCPU/16GBメモリ)時間あたり$0.44(最低2DPU/10分)かかる。 また、クローラも同様にDPUで課金される。結構高い。

なお、AthenaのCTASでもParquetを出力することができる。 出力先にファイルがないようにする必要があったり重いクエリは失敗することがあるが手軽で良い。

import * as athena from 'athena-client'
const clientConfig: athena.AthenaClientConfig = {
	bucketUri: 's3://*****/*****'
  skipFetchResult: true,
};
const awsConfig: athena.AwsConfig = {
	region: 'us-east-1',
};

const client = athena.createClient(clientConfig, awsConfig);

(async () => {
	await client.execute(`
		CREATE TABLE *****
    WITH (
      format = 'PARQUET',
      external_location = 's3://*****'
    ) AS (
      SELECT ~~
    )
})();

開発用エンドポイント

ジョブの立ち上がりにやや時間がかかるため開発用エンドポイントを立ち上げておくとDPUが確保されて効率よく開発できる。 立ち上げている間のDPUの料金がかかる。つまりずっとジョブを実行し続けているようなもので結構高くつくので終わったら閉じるようにしたい。

ローカルやEC2から自分で開発用エンドポイントにsshしてNotebookを立てることもできるが、 コンソールから立ち上げたNotebookは最初からつながっていて鍵の登録も必要なくて楽。

ssh -i private-key-file-path -NTL 9007:169.254.76.1:9007 [email protected]

NotebookのインスタンスのIAMロールを作成するとCloudwatch Logsまわりに加えてGlueのDevEndpointとAssetの取得権限が付与されている。

{
  "Action": [
    "s3:ListBucket"
  ],
  "Effect": "Allow",
  "Resource": [
    "arn:aws:s3:::aws-glue-jes-prod-us-east-1-assets"
  ]
}
{
  "Action": [
    "s3:GetObject"
  ],
  "Effect": "Allow",
  "Resource": [
    "arn:aws:s3:::aws-glue-jes-prod-us-east-1-assets*"
  ]
}
{
  "Action": [
    "glue:UpdateDevEndpoint",
    "glue:GetDevEndpoint",
    "glue:GetDevEndpoints"
  ],
  "Effect": "Allow",
  "Resource": [
    "arn:aws:glue:us-east-1:*****:devEndpoint/test*"
  ]
}

たまに立ち上げに失敗したり次のようなエラーで実行できないことがあったが、立ち上げ直したらうまくいった。

The code failed because of a fatal error:
	Error sending http request and maximum retry encountered..

Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.

printデバッグが捗る。

...
print(output_df.toDF().collect()[0].asDict())

ジョブのスクリプト

GlueはSpark標準のDataFrameを扱うこともできるが、独自にスキーマを柔軟に扱えるDynamicFrameというのをサポートしている。DataFrameとは相互に変換できるので、SQL文の実行などDataFrameにしかないAPIを使いたい場合は変換する。 ただ、Sparkの設定を変えられない分、DynamicFrameがうまくやっているところもある。

Apache SparkのRDD, DataFrame, DataSetとAction, Transformation - sambaiz-net

from pyspark.sql.utils import ParseException
data_frame = dynamc_frame.toDF()
data_frame.toDF().createOrReplaceTempView('t1')
try:
  data_frame2 = spark.sql('SELECT * FROM t1')
except ParseException as e:
    print(str(e).replace("\\n", "\n"))

入力は為替データのCSV。

$ less kawase1.csv
Date,USD,GBP,EUR,CAD,CHF,SEK,DKK,NOK,AUD,NZD
2002/4/1,133.15,189.79,116.12,83.48,79.28,12.87,15.63,15.08,71.14,58.8
...

$ less kawase2.csv
Date,ZAR,BHD,HKD,INR,PHP,SGD,THB,KWD,SAR,AED,MXN,TWD
2002/4/1,11.76,353.65,17.07,2.73,2.61,72.21,3.07,434.14,35.52,36.26,14.81,3.82
...

やることは次の通り。

transformation_ctxは後述するジョブのブックマークで使われる一意な識別子。

import sys
import datetime
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


def split_date(rec):
    date = datetime.datetime.strptime(rec["Date"], "%Y/%m/%d")
    rec["year"] = date.year
    rec["month"] = date.month
    rec["day"] = date.day
    return rec


def convert_ratio_usd(rec):
    ret = {}
    for field in rec:
        ret[field] = rec[field]
        if field not in ["year", "month", "day"]:
            ret[field] /= rec["usd"]
    return ret


input_df = glueContext.create_dynamic_frame.from_options(
    transformation_ctx="source",
    connection_type="s3",
    connection_options={"paths": ["s3://*****/kawase1.csv"]},
    format="csv",
    format_options={"withHeader": True})
input_df2 = glueContext.create_dynamic_frame.from_options(
    transformation_ctx="source",
    connection_type="s3",
    connection_options={"paths": ["s3://*****/kawase2.csv"]},
    format="csv",
    format_options={"withHeader": True})

output_df = input_df.join(
    ["Date"], ["Date"], input_df2, transformation_ctx="info").map(
        split_date, transformation_ctx="split_date").apply_mapping(
            [("year", "smallint", "year", "smallint"),
             ("month", "smallint", "month", "smallint"),
             ("day", "smallint", "day", "smallint"),
             ("USD", "string", "usd", "double"),
             ("EUR", "string", "eur", "double"),
             ("AUD", "string", "aud", "double"),
             ("ZAR", "string", "zar", "double")],
            transformation_ctx="apply_mapping").map(
                convert_ratio_usd, transformation_ctx="convert_ratio_usd")

glueContext.write_dynamic_frame.from_options(
    frame=output_df,
    connection_type="s3",
    connection_options={
        "path": "s3://*****/target",
        "partitionKeys": ["year"],
        "compression": "gzip"
    },
    format="parquet",
    transformation_ctx="sink")
job.commit()

glue_client = boto3.client('glue', region_name='us-east-1')
glue_client.start_crawler(Name='kawase')

Glueのロールで読める場所にアップロードする。

$ aws s3 cp main.py s3://aws-glue-scripts-*****/root/main.py

ジョブとクローラの作成

ジョブを作成する。デフォルトで10DPU使うようになっているので調整する。1DPUで2つのExecutorが動くブックマークを有効にすると以前処理したデータは処理しないようにできるが、残念ながらParquetは対応していないため2回実行すると重複して出力されてしまう。

ジョブの作成

クローラはデータをクローリングしてデータカタログの指定のデータベースにスキーマやパーティションといったメタデータテーブルを作成する。 以前、LambdaでAthenaのスキーマを管理したりパーティションを切ったりするバッチを作ったが、Glueのデータカタログ上で同じことをやってくれる。 楽だけど現状DPUの設定項目がないのでデータが多くて時間がかかる場合はどうしようもなさそう。

Athenaのmigrationやpartitionするathena-adminを作った - sambaiz-net

ジョブの実行

トリガーを設定して定期的に実行することもできるが、今回は手動で実行する。

$ aws glue start-job-run --job-name kawase

パーティションごとにParquetが出力されている。

パーティションごとのディレクトリ

また、クローラの実行が終わるとデータカタログにテーブルが追加される。

Athenaで参照

AthenaをGlueリリース以前から使っていた場合はデータカタログをGlueのものにアップグレードする必要がある。 Athenaのところにリンクが出るので押しとけばIAMロールの更新など大体やってくれる。 これによってDatabaseにGlueのものが出るようになりクエリも実行できるようになる。

Athenaからの参照