Snowflake の Spark コネクタでクエリを実行する

sparksnowflakescala

アプリケーションから Snowflake のデータを参照する方法として JDBC で接続する Spark コネクターSnowpark API がある。いずれの場合も基本的には Snowflake のコンピューティングリソースを用いてクエリが遅延評価されるが、Spark の UDF など変換できない処理は Spark クラスタ側で実行されるのに対して、Snowpark API は全ての処理が Snowflake 側で行われることが保証されておりクラスタを立てなくてもよいという点で推奨されている。ただ、Snowflake 外に大きなデータがあるなど Spark で処理する場合は Spark の Dataframe を返す Spark コネクターが便利。

なお Snowpark ライブラリは現状 Scala 2.13 には対応していない

spark-snowflake と、互換性のあるバージョンの snowflake-jdbc をインストールする。

libraryDependencies ++= Seq(
  "net.snowflake" % "snowflake-jdbc" % "3.24.0",
  "net.snowflake" %% "spark-snowflake" % "3.1.1"
)

パスワードか秘密鍵の認証情報を渡しクエリを実行する。秘密鍵の BEGIN/END PRIVATE 部分は消す必要がある。Utils.runQuery() は簡潔に記述できるが結果を返さない

キーペア認証で Terraform を実行したり Snowflake CLI や gosnowflake でクエリを実行する - sambaiz-net

import net.snowflake.spark.snowflake.Utils
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

val keyPem = getPrivateKey()
val trimHeaderAndFooter: Regex =
  """-----BEGIN PRIVATE KEY-----|-----END PRIVATE KEY-----|\s""".r

var sfOptions = Map(
  "sfURL" -> "*****.snowflakecomputing.com",
  "sfUser" -> "ADMIN",
  "pem_private_key" -> trimHeaderAndFooter.replaceAllIn(keyPem, ""),
  "sfDatabase" -> "TEST_DB",
  "sfSchema" -> "PUBLIC",
  "sfWarehouse" -> "DEFAULT"
)

val ddl =
  """
    |create or replace table USERS (
    |  id    integer,
    |  name string
    |);
    |""".stripMargin

Utils.runQuery(sfOptions, ddl)

val df = spark.read
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("query", "select * from users")
  .load()

df.show()