Querying Snowflake with the Spark Connector


To access Snowflake data from applications, you can use the Spark connector (which connects via JDBC) or the Snowpark API, among other options. Both evaluate queries lazily. The Spark connector pushes operations down to Snowflake's compute resources where it can, but operations that cannot be pushed down, such as Spark UDFs, are executed on the Spark cluster. The Snowpark API, by contrast, guarantees that all processing is performed on the Snowflake side, so there is no cluster to manage, which is why it is the recommended option. However, if you need Spark to process large amounts of data outside Snowflake, the Spark connector is convenient because it returns a Spark DataFrame.

Note that the Snowpark library currently does not support Scala 2.13.

Add spark-snowflake and a compatible version of snowflake-jdbc to your dependencies.

libraryDependencies ++= Seq(
  "net.snowflake" % "snowflake-jdbc" % "3.24.0",
  "net.snowflake" %% "spark-snowflake" % "3.1.1"
)

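For authentication, pass either a password (sfPassword) or a private key (pem_private_key). When using a private key, remove the BEGIN/END PRIVATE KEY lines and the line breaks so that only the Base64-encoded body remains. Utils.runQuery() is a concise way to run statements such as DDL, but it does not return the results as a DataFrame; use spark.read for queries whose results you need.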

Running Terraform and Querying from Snowflake CLI and gosnowflake with Key Pair Authentication - sambaiz-net

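import scala.util.matching.Regex

import net.snowflake.spark.snowflake.Utils
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

// getPrivateKey() is not shown here; it stands for whatever loads your
// PEM-encoded private key (e.g. from a file or a secrets manager).
val keyPem = getPrivateKey()

// Matches the BEGIN/END lines and all whitespace (including line breaks),
// so replacing them with "" leaves only the Base64 body of the key.
val trimHeaderAndFooter: Regex =
  """-----BEGIN PRIVATE KEY-----|-----END PRIVATE KEY-----|\s""".r

val sfOptions = Map(
  "sfURL" -> "*****.snowflakecomputing.com",
  "sfUser" -> "ADMIN",
  "pem_private_key" -> trimHeaderAndFooter.replaceAllIn(keyPem, ""),
  "sfDatabase" -> "TEST_DB",
  "sfSchema" -> "PUBLIC",
  "sfWarehouse" -> "DEFAULT"
)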

val ddl =
  """
    |create or replace table USERS (
    |  id    integer,
    |  name string
    |);
    |""".stripMargin

// Runs the DDL in Snowflake without creating a DataFrame
Utils.runQuery(sfOptions, ddl)

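// `spark` is the active SparkSession (predefined in spark-shell and notebooks).
// The query runs in Snowflake and its result comes back as a Spark DataFrame.
val df = spark.read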
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("query", "select * from users")
  .load()

df.show()
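The example above authenticates with a private key only. If an application needs to support both authentication methods mentioned earlier, one option is a small pure-Scala helper that builds the options map. This is a sketch: SnowflakeAuth, Password, PrivateKey, SnowflakeOptions, stripPem, and connectionOptions are hypothetical names and not part of the connector; only the option keys (sfURL, sfUser, sfPassword, pem_private_key, sfDatabase, sfSchema, sfWarehouse) come from spark-snowflake.

```scala
// Hypothetical types for illustration; not part of spark-snowflake.
sealed trait SnowflakeAuth
final case class Password(value: String) extends SnowflakeAuth
final case class PrivateKey(pem: String) extends SnowflakeAuth

object SnowflakeOptions {
  // Matches the PEM header/footer lines and all whitespace (including
  // line breaks), leaving only the Base64 body that pem_private_key expects.
  private val pemDecorations =
    """-----BEGIN PRIVATE KEY-----|-----END PRIVATE KEY-----|\s""".r

  def stripPem(pem: String): String = pemDecorations.replaceAllIn(pem, "")

  // Builds the connector options, adding exactly one authentication entry.
  def connectionOptions(
      url: String,
      user: String,
      auth: SnowflakeAuth,
      database: String,
      schema: String,
      warehouse: String
  ): Map[String, String] = {
    val authOption = auth match {
      case Password(pw)    => "sfPassword" -> pw
      case PrivateKey(pem) => "pem_private_key" -> stripPem(pem)
    }
    Map(
      "sfURL" -> url,
      "sfUser" -> user,
      "sfDatabase" -> database,
      "sfSchema" -> schema,
      "sfWarehouse" -> warehouse
    ) + authOption
  }
}
```

The resulting map can be passed to .options(...) in the same way as sfOptions. Modeling the credential as a sealed trait makes it impossible to set both sfPassword and pem_private_key by accident.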