Athena(Presto)とGlue(Spark)で同じクエリを実行した際に異なる値が返る原因

awsprestosparketl

AWSではSQL-likeなクエリで集計を行うマネージドなサービスが複数あり、 アドホックな集計はGlueのデータカタログでテーブルを共有して手軽にクエリを実行できるPrestoベースのAthena、 重いバッチ集計はリソースや時間の制約を回避できるSparkベースのGlueといったように併用することができる。

Redshift Serverlessと他のサーバーレス集計サービス、Glue Data Catalogのテーブルへのクエリ実行 - sambaiz-net

ANSI互換のSQLを実行するPrestoと デフォルトでHive互換のSpark SQLを実行するSparkで 使える文法に差があったりするものの、同じクエリが使い回せることもあって、そのような場合は同じ結果が返ってくることを期待してしまうが、 次のような挙動の違いによって大きく結果が異なってしまうことがある。

数値の型

Presto 0.198 以降ではデフォルトで小数リテラルをDECIMALとして扱うが、 Athena engine version 2 (Presto 0.217)では DOUBLEになる。engine version 1 (Presto 0.172)との互換のために parse-decimal-literals-as-double が渡されているのかもしれない。

SELECT typeof(1.2), /* double */
       1 / 3.0 * 10000000 /* 3333333.333333333 */

Spark 2.3 以降も小数リテラルをDECIMALとして扱い、Glue 2.0 (Spark 2.4.3) でもそうなっている。 DECIMALのscale(小数点以下の桁数)は最大6に制限される。

print(spark.sql("""SELECT 1.2""").dtypes) 
# [('1.2', 'decimal(2,1)')]
print(spark.sql("""SELECT 1 / 3.0 * 10000000""").collect())
# [Row((CAST((CAST(CAST(1 AS DECIMAL(1,0)) AS DECIMAL(2,1)) / CAST(3.0 AS DECIMAL(2,1))) AS DECIMAL(14,6)) * CAST(CAST(10000000 AS DECIMAL(8,0)) AS DECIMAL(14,6)))=Decimal('3333330.000000'))]

Prestoでは整数同士で割るとキャストされずに切り捨てられるが、SparkではDOUBLEにキャストされる。 また、整数を小数リテラルで割るとPrestoではDOUBLE、SparkではDECIMALになるので精度が異なる。

SELECT 1 / 3, /* 0 */
       1 / 3.0 /* 0.3333333333333333 */
print(spark.sql("SELECT 1 / 3").collect())
#[Row((CAST(1 AS DOUBLE) / CAST(3 AS DOUBLE))=0.3333333333333333)]   
print(spark.sql("SELECT 1 / 3.0").collect())
# [Row((CAST(CAST(1 AS DECIMAL(1,0)) AS DECIMAL(2,1)) / CAST(3.0 AS DECIMAL(2,1)))=Decimal('0.333333'))]
print(spark.sql("SELECT 1D / 3.0").collect())
# [Row((1.0 / CAST(3.0 AS DOUBLE))=0.3333333333333333)] 

整数へのCAST

Athenaでは小数を整数にCASTするとroundされるが、Glueではfloorされる。

SELECT CAST(1.49 AS INTEGER), /* 1 */
       CAST(1.50 AS INTEGER)  /* 2 */
print(spark.sql("""SELECT CAST(1.49 AS INTEGER)""").collect()[0][0])
# 1
print(spark.sql("""SELECT CAST(1.50 AS INTEGER)""").collect()[0][0])
# 1

配列のインデックス

Athenaのインデックスは1から始まるがSparkは0から。

SELECT split('aaa,bbb,ccc,ddd', ',')[1] /* aaa */
print(spark.sql("SELECT split('aaa,bbb,ccc,ddd', ',')[1] as v").collect()[0]['v']) 
# bbb