AWS GlueでCSVを加工しParquetに変換してパーティションを切りAthenaで参照する

awspythonsparketl

AWS GlueはマネージドなETL(Extract/Transform/Load)サービスで、Sparkを使ってS3などにあるデータを読み込み加工して変換出力したり、AthenaやRedshift Spectrumで参照できるデータカタログを提供する。 今回はS3のCSVを読み込んで加工し、列指向フォーマットParquetに変換しパーティションを切って出力、その後クローラを回してデータカタログにテーブルを作成してAthenaで参照できることを確認する。

料金はジョブがDPU(4vCPU/16GBメモリ)時間あたり$0.44(最低2DPU/10分)かかる。 また、クローラも同様にDPUで課金される。

なお、AthenaのCTASでもParquetを出力することができる。 出力先にファイルがないようにする必要があったり重いクエリは失敗することがあるが手軽で良い。

import * as athena from 'athena-client'
const clientConfig: athena.AthenaClientConfig = {
	bucketUri: 's3://*****/*****'
  skipFetchResult: true,
};
const awsConfig: athena.AwsConfig = {
	region: 'us-east-1',
};

const client = athena.createClient(clientConfig, awsConfig);

(async () => {
	await client.execute(`
		CREATE TABLE *****
    WITH (
      format = 'PARQUET',
      external_location = 's3://*****'
    ) AS (
      SELECT ~~
    )
})();

開発用エンドポイント

ジョブの立ち上がりにやや時間がかかるため開発用エンドポイントを立ち上げておくとDPUが確保されて効率よく開発できる。 立ち上げている間のDPUの料金がかかる。つまりずっとジョブを実行し続けているようなもので結構高くつくので終わったら閉じるようにしたい。

ローカルやEC2から自分で開発用エンドポイントにsshしてREPLで実行したりNotebookを立てることもできるが、 コンソールから立ち上げたNotebookは最初からつながっていて鍵の登録も必要なくて楽。

$ ssh -i private-key-file-path 9007:169.254.76.1 [email protected] -t gluepyspark
>>> df = spark.read.parquet("s3://foo/bar")

$ ssh -i private-key-file-path -NTL 9007:169.254.76.1:9007 [email protected]

NotebookのインスタンスのIAMロールを作成するとCloudwatch Logsまわりに加えてGlueのDevEndpointとAssetの取得権限が付与されている。

{
  "Action": [
    "s3:ListBucket"
  ],
  "Effect": "Allow",
  "Resource": [
    "arn:aws:s3:::aws-glue-jes-prod-us-east-1-assets"
  ]
}
{
  "Action": [
    "s3:GetObject"
  ],
  "Effect": "Allow",
  "Resource": [
    "arn:aws:s3:::aws-glue-jes-prod-us-east-1-assets*"
  ]
}
{
  "Action": [
    "glue:UpdateDevEndpoint",
    "glue:GetDevEndpoint",
    "glue:GetDevEndpoints"
  ],
  "Effect": "Allow",
  "Resource": [
    "arn:aws:glue:us-east-1:*****:devEndpoint/test*"
  ]
}

たまに立ち上げに失敗したり次のようなエラーで実行できないことがあったが、立ち上げ直したらうまくいった。

The code failed because of a fatal error:
	Error sending http request and maximum retry encountered..

Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.

printデバッグが捗る。

...
print(output_df.toDF().collect()[0].asDict())

ジョブのスクリプト

GlueはSpark標準のDataFrameを扱うこともできるが、スキーマを明示しない独自のDynamicFrameというのをサポートしている。DataFrameとは相互に変換できるのでSQL文の実行などDataFrameにしかないAPIを使いたい場合は変換すれば良い。

Apache SparkのRDD, DataFrame, DataSetとAction, Transformation - sambaiz-net

from pyspark.sql.utils import ParseException
data_frame = dynamc_frame.toDF()
data_frame.toDF().createOrReplaceTempView('t1')
try:
  data_frame2 = spark.sql('SELECT * FROM t1')
except ParseException as e:
    print(str(e).replace("\\n", "\n"))

ただ、データフォーマットが不定な場合、読み込んだデータセットによってはクエリで参照しているフィールドがなくcannnot resolve *** given input columnsで失敗することがある。これは create_dynamic_frame_from_catalog()で読み込んだ際も同様で、カタログのスキーマに存在しても実際のデータに存在しないフィールドは、変換したDataFrameのスキーマには含まれない。

入力は為替データのCSV。

$ less kawase1.csv
Date,USD,GBP,EUR,CAD,CHF,SEK,DKK,NOK,AUD,NZD
2002/4/1,133.15,189.79,116.12,83.48,79.28,12.87,15.63,15.08,71.14,58.8
...

$ less kawase2.csv
Date,ZAR,BHD,HKD,INR,PHP,SGD,THB,KWD,SAR,AED,MXN,TWD
2002/4/1,11.76,353.65,17.07,2.73,2.61,72.21,3.07,434.14,35.52,36.26,14.81,3.82
...

やることは次の通り。

transformation_ctxは後述するジョブのブックマークで使われる一意な識別子。

import sys
import datetime
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


def split_date(rec):
    date = datetime.datetime.strptime(rec["Date"], "%Y/%m/%d")
    rec["year"] = date.year
    rec["month"] = date.month
    rec["day"] = date.day
    return rec


def convert_ratio_usd(rec):
    ret = {}
    for field in rec:
        ret[field] = rec[field]
        if field not in ["year", "month", "day"]:
            ret[field] /= rec["usd"]
    return ret


input_df = glueContext.create_dynamic_frame.from_options(
    transformation_ctx="source",
    connection_type="s3",
    connection_options={"paths": ["s3://*****/kawase1.csv"]},
    format="csv",
    format_options={"withHeader": True})
input_df2 = glueContext.create_dynamic_frame.from_options(
    transformation_ctx="source",
    connection_type="s3",
    connection_options={"paths": ["s3://*****/kawase2.csv"]},
    format="csv",
    format_options={"withHeader": True})

output_df = input_df.join(
    ["Date"], ["Date"], input_df2, transformation_ctx="info").map(
        split_date, transformation_ctx="split_date").apply_mapping(
            [("year", "smallint", "year", "smallint"),
             ("month", "smallint", "month", "smallint"),
             ("day", "smallint", "day", "smallint"),
             ("USD", "string", "usd", "double"),
             ("EUR", "string", "eur", "double"),
             ("AUD", "string", "aud", "double"),
             ("ZAR", "string", "zar", "double")],
            transformation_ctx="apply_mapping").map(
                convert_ratio_usd, transformation_ctx="convert_ratio_usd")

glueContext.write_dynamic_frame.from_options(
    frame=output_df,
    connection_type="s3",
    connection_options={
        "path": "s3://*****/target",
        "partitionKeys": ["year"],
        "compression": "gzip"
    },
    format="parquet",
    transformation_ctx="sink")
job.commit()

glue_client = boto3.client('glue', region_name='us-east-1')
glue_client.start_crawler(Name='kawase')

Glueのロールで読める場所にアップロードする。

$ aws s3 cp main.py s3://aws-glue-scripts-*****/root/main.py

ちなみに、glueContext.get_logger()でログを出力すると、 /aws-glue/jobs/errorに送られてしまうので /aws-glue/jobs/outputに送りたい場合はloggerを作る。

def make_logger():
    logger = logging.getLogger()
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger

ジョブとクローラの作成

ジョブを作成する。デフォルトで10DPU使うようになっているので調整する。1DPUで2つのExecutorが動くブックマークを有効にすると以前処理したデータは処理しないようにできるが、残念ながらParquetは対応していないため2回実行すると重複して出力されてしまう。

(追記: 2021-04-16) 今はParquetも対応している

GlueのJobのBookmarkを有効にして前回の続きから処理を行う - sambaiz-net

ジョブの作成

クローラはデータをクローリングしてデータカタログの指定のデータベースにスキーマやパーティションといったメタデータテーブルを作成する。 以前、LambdaでAthenaのスキーマを管理したりパーティションを切ったりするバッチを作ったが、Glueのデータカタログ上で同じことをやってくれる。 楽だが現状DPUの設定項目がないのでデータが多くて時間がかかる場合はどうしようもなさそう。

Athenaのmigrationやpartitionするathena-adminを作った - sambaiz-net

CDKでGlue Data CatalogのDatabase,Table,Partitionを作成する - sambaiz-net

ジョブの実行

トリガーを設定して定期的に実行することもできるが、今回は手動で実行する。

$ aws glue start-job-run --job-name kawase

パーティションごとにParquetが出力されている。

パーティションごとのディレクトリ

また、クローラの実行が終わるとデータカタログにテーブルが追加される。

Athenaで参照

AthenaをGlueリリース以前から使っていた場合はデータカタログをGlueのものにアップグレードする必要がある。 Athenaのところにリンクが出るので押しとけばIAMロールの更新など大体やってくれる。 これによってDatabaseにGlueのものが出るようになりクエリも実行できるようになる。

Athenaからの参照

なお、マニュアルでテーブルを作ることもできるが、arraystructのスキーマを小文字で記述しないとAthenaのクエリを実行する際にHIVE_METASTORE_ERRORとなる。