Snowflake はAWS、GCP、Azure上で動作するフルマネージドなデータプラットフォーム。カラムナフォーマットでデータを持ち、各ノードが共有のデータセットの一部をローカルにキャッシュするハイブリッドなシェアードナッシングアーキテクチャでネットワークトラフィックを削減したり、自動でマイクロパーティションに切ってきめ細かなプルーニングを行うことで効率的に大規模なクエリを実行できる。
カラムナフォーマットParquetの構造とReadの最適化 - sambaiz-net
Hadoop クラスタなどのインフラストラクチャの面倒を見る必要がないこと以外にも、BigQuery のようなタイムトラベルや、アカウント間で容易にデータを共有できる特長がある。
BigQueryのタイムトラベルやスナップショットでデータを復元する - sambaiz-net
料金はプランによって単価が異なるコンピューティングのクレジットとストレージ、別のリージョンやクラウドへのデータエグレスに対してかかる。イングレスでは Snowflake の料金はかからないがクラウド側のエゲレスのコストは発生する場合がある。そのため、なるべくデータが集まっているクラウドのリージョン上で動かした方がコストを抑えられるが、GCP に関しては現状対応リージョンが少ない。多くの場合でクレジットが料金の大半を占めることになり、画面上や次のようなクエリで消費量を確認することができる。
WITH wh_bill AS (
SELECT DATE(start_time) as day, warehouse_name, SUM(credits_used_compute) AS compute_credits
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
GROUP BY 1, 2
),
user_credits AS (
SELECT DATE(start_time) as day, user_name, warehouse_name, SUM(credits_attributed_compute) AS credits
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ATTRIBUTION_HISTORY
WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
GROUP BY 1, 2, 3
),
total_credit AS (
SELECT day, warehouse_name, SUM(credits) AS sum_all_credits
FROM user_credits
GROUP BY 1, 2
)
SELECT day,
u.user_name,
u.warehouse_name,
u.credits / t.sum_all_credits * w.compute_credits AS attributed_credits
FROM user_credits u
JOIN total_credit t USING (day, warehouse_name)
JOIN wh_bill w USING (day, warehouse_name)
まず権限を持つ ROLE とリソースをプロビジョニングした WAREHOUSE を選択する。
USE ROLE ACCOUNTADMIN;
USE WAREHOUSE COMPUTE_WH;
WAREHOUSE を立ち上げている間サイズに応じたクレジットを消費する。自動で再開/停止することもできる。
S3 との STORAGE INTEGRATION を作成し、Snowflake の AWS アカウントと External ID を取得する。
CREATE OR REPLACE STORAGE INTEGRATION S3_DATA_INTEGRATION
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::*****:role/*****'
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('s3://*****/');
DESC INTEGRATION S3_DATA_INTEGRATION;
...
# STORAGE_AWS_IAM_USER_ARN,String,arn:aws:iam::*****:user/*****,
# STORAGE_AWS_EXTERNAL_ID,String,*****,
...
このユーザーが assume する IAM Role を作成する。
const snowflakeRole = new iam.Role(this, 'SnowflakeS3Role', {
assumedBy: new iam.ServicePrincipal('sts.amazonaws.com'),
inlinePolicies: {
SnowflakeS3AccessPolicy: new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
actions: [
's3:GetObject',
's3:PutObject',
's3:ListBucket',
],
resources: [
bucket.bucketArn,
`${bucket.bucketArn}/*`,
],
}),
],
})
}
})
snowflakeRole.assumeRolePolicy?.addStatements(
new iam.PolicyStatement({
actions: ['sts:AssumeRole'],
principals: [new iam.AccountPrincipal('*****')],
conditions: {
StringEquals: {
'sts:ExternalId': '*****',
},
},
})
)
あとは STAGE を作成し、データを TABLE に COPY する。Snowpipe で自動でロードすることもできる。なお、データベース名などの識別子をダブルクォートで囲まない場合、大文字として扱われるわけだが、囲むことで小文字のオブジェクトを作ることもでき Terraform などでも記述通りに作成される。ただ、そうした場合参照する際にもダブルクォートで囲まなければならず無用な混乱を招いてしまうので大文字で統一した方がよいのではと考えている。
CREATE OR REPLACE DATABASE TESTDB;
CREATE OR REPLACE SCHEMA TESTDB.S3_DATA;
CREATE OR REPLACE TABLE TESTDB.S3_DATA.USERS (
id INT
,name VARCHAR(30)
);
CREATE OR REPLACE STAGE TESTDB.S3_DATA.S3_DATA_STAGE
STORAGE_INTEGRATION = S3_DATA_INTEGRATION
URL = 's3://*****/'
FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1);
COPY INTO TESTDB.S3_DATA.USERS
FROM @TESTDB.S3_DATA.S3_DATA_STAGE
FILES = ('users.csv');
SELECT * FROM TESTDB.S3_DATA.USERS;
すると Snowflake からクエリできるようになる。
Snowflake の Role を Terraform で作成しユーザーにテーブルへのアクセス権限を与える - sambaiz-net
参考
Snowflakeの論文「The Snowflake Elastic Data Warehouse」を読んでみた_Part1