Snowflake に S3 のデータをコピーしてクエリする

snowflakeawsdatabase

Snowflake はAWS、GCP、Azure上で動作するフルマネージドなデータプラットフォーム。カラムナフォーマットでデータを持ち、各ノードが共有のデータセットの一部をローカルにキャッシュするハイブリッドなシェアードナッシングアーキテクチャでネットワークトラフィックを削減したり、自動でマイクロパーティションに切ってきめ細かなプルーニングを行うことで効率的に大規模なクエリを実行できる。

カラムナフォーマットParquetの構造とReadの最適化 - sambaiz-net

Hadoop クラスタなどのインフラストラクチャの面倒を見る必要がないこと以外にも、BigQuery のようなタイムトラベルや、アカウント間で容易にデータを共有できる特長がある。

BigQueryのタイムトラベルやスナップショットでデータを復元する - sambaiz-net

料金はプランによって単価が異なるコンピューティングのクレジットとストレージ、別のリージョンやクラウドへのデータエグレスに対してかかる。イングレスでは Snowflake の料金はかからないがクラウド側のエゲレスのコストは発生する場合がある。そのため、なるべくデータが集まっているクラウドのリージョン上で動かした方がコストを抑えられるが、GCP に関しては現状対応リージョンが少ない。多くの場合でクレジットが料金の大半を占めることになり、画面上や次のようなクエリで消費量を確認することができる。

WITH wh_bill AS (
   SELECT DATE(start_time) as day, warehouse_name, SUM(credits_used_compute) AS compute_credits
     FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
     WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
   GROUP BY 1, 2
),
user_credits AS (
   SELECT DATE(start_time) as day, user_name, warehouse_name, SUM(credits_attributed_compute) AS credits
     FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ATTRIBUTION_HISTORY
     WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
     GROUP BY 1, 2, 3
),
total_credit AS (
   SELECT day, warehouse_name, SUM(credits) AS sum_all_credits
     FROM user_credits
   GROUP BY 1, 2
)
SELECT day,
       u.user_name,
       u.warehouse_name,
       u.credits / t.sum_all_credits * w.compute_credits AS attributed_credits
  FROM user_credits u
  JOIN total_credit t USING (day, warehouse_name) 
  JOIN wh_bill w USING (day, warehouse_name)

まず権限を持つ ROLE とリソースをプロビジョニングした WAREHOUSE を選択する。

USE ROLE ACCOUNTADMIN;
USE WAREHOUSE COMPUTE_WH;

WAREHOUSE を立ち上げている間サイズに応じたクレジットを消費する。自動で再開/停止することもできる。

S3 との STORAGE INTEGRATION を作成し、Snowflake の AWS アカウントと External ID を取得する。

CREATE OR REPLACE STORAGE INTEGRATION S3_DATA_INTEGRATION
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::*****:role/*****'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('s3://*****/');

DESC INTEGRATION S3_DATA_INTEGRATION;
...
# STORAGE_AWS_IAM_USER_ARN,String,arn:aws:iam::*****:user/*****,
# STORAGE_AWS_EXTERNAL_ID,String,*****,
...

このユーザーが assume する IAM Role を作成する。

const snowflakeRole = new iam.Role(this, 'SnowflakeS3Role', {
  assumedBy: new iam.ServicePrincipal('sts.amazonaws.com'),
  inlinePolicies: {
    SnowflakeS3AccessPolicy: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: [
            's3:GetObject',
            's3:PutObject',
            's3:ListBucket',
          ],
          resources: [
            bucket.bucketArn,
            `${bucket.bucketArn}/*`,
          ],
        }),
      ],
    })
  }
})

snowflakeRole.assumeRolePolicy?.addStatements(
  new iam.PolicyStatement({
    actions: ['sts:AssumeRole'],
    principals: [new iam.AccountPrincipal('*****')],
    conditions: {
      StringEquals: {
        'sts:ExternalId': '*****',
      },
    },
  })
)

あとは STAGE を作成し、データを TABLE に COPY する。Snowpipe で自動でロードすることもできる。なお、データベース名などの識別子をダブルクォートで囲まない場合、大文字として扱われるわけだが、囲むことで小文字のオブジェクトを作ることもでき Terraform などでも記述通りに作成される。ただ、そうした場合参照する際にもダブルクォートで囲まなければならず無用な混乱を招いてしまうので大文字で統一した方がよいのではと考えている。

CREATE OR REPLACE DATABASE TESTDB;
  
CREATE OR REPLACE SCHEMA TESTDB.S3_DATA;

CREATE OR REPLACE TABLE TESTDB.S3_DATA.USERS (
  id INT
  ,name VARCHAR(30)
);

CREATE OR REPLACE STAGE TESTDB.S3_DATA.S3_DATA_STAGE
  STORAGE_INTEGRATION = S3_DATA_INTEGRATION
  URL = 's3://*****/'
  FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1);
  
COPY INTO TESTDB.S3_DATA.USERS
  FROM @TESTDB.S3_DATA.S3_DATA_STAGE
    FILES = ('users.csv');

SELECT * FROM TESTDB.S3_DATA.USERS;

すると Snowflake からクエリできるようになる。

Snowflake の Role を Terraform で作成しユーザーにテーブルへのアクセス権限を与える - sambaiz-net

参考

Snowflakeの論文「The Snowflake Elastic Data Warehouse」を読んでみた_Part1