Copy and Query S3 Data in Snowflake

snowflake, aws, database

Snowflake is a fully managed data platform that runs on AWS, GCP, and Azure. It stores data in columnar format, uses a hybrid shared-nothing architecture in which nodes cache portions of the shared dataset locally to reduce network traffic, and performs automatic micro-partitioning for fine-grained pruning, enabling efficient large-scale query execution.

Columnar format Parquet structure and read optimization - sambaiz-net

In addition to eliminating the need to manage infrastructure such as Hadoop clusters, it offers features such as time travel, similar to BigQuery, and easy data sharing between accounts.

Restoring BigQuery Data using time travel and snapshots - sambaiz-net
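As a quick sketch of what time travel looks like in practice (the table name is a placeholder, and the data must be within the retention period):

```sql
-- Query the table as it was 5 minutes ago (AT also accepts TIMESTAMP or STATEMENT)
SELECT * FROM MYDB.PUBLIC.USERS AT(OFFSET => -60 * 5);

-- Restore a dropped table within the retention period
UNDROP TABLE MYDB.PUBLIC.USERS;
```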

Pricing consists of compute credits, whose unit price depends on the plan, plus storage and data egress to another region or cloud. Snowflake itself does not charge for ingress, but egress charges may still be incurred on the cloud provider's side, so it is better to run Snowflake in the cloud region where your data is concentrated to reduce costs; note, however, that GCP currently has few supported regions. In many cases credits account for the majority of the bill, and you can check your consumption in the console or with the following query.

WITH wh_bill AS (
   SELECT DATE(start_time) as day, warehouse_name, SUM(credits_used_compute) AS compute_credits
     FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
     WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
   GROUP BY 1, 2
),
user_credits AS (
   SELECT DATE(start_time) as day, user_name, warehouse_name, SUM(credits_attributed_compute) AS credits
     FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ATTRIBUTION_HISTORY
     WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
     GROUP BY 1, 2, 3
),
total_credit AS (
   SELECT day, warehouse_name, SUM(credits) AS sum_all_credits
     FROM user_credits
   GROUP BY 1, 2
)
SELECT day,
       u.user_name,
       u.warehouse_name,
       u.credits / t.sum_all_credits * w.compute_credits AS attributed_credits
  FROM user_credits u
  JOIN total_credit t USING (day, warehouse_name) 
  JOIN wh_bill w USING (day, warehouse_name)

First, select a ROLE with appropriate permissions and a provisioned WAREHOUSE.

USE ROLE ACCOUNTADMIN;
USE WAREHOUSE COMPUTE_WH;

While the WAREHOUSE is running, it consumes credits according to its size. It can also be automatically resumed/suspended.
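For example, auto-suspend and auto-resume can be configured per warehouse (a sketch; the 60-second timeout is an arbitrary value):

```sql
-- Suspend after 60 seconds of inactivity; resume automatically on the next query
ALTER WAREHOUSE COMPUTE_WH SET
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;
```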

Create a STORAGE INTEGRATION with S3 and obtain the AWS IAM user that Snowflake uses for access, along with the External ID.

CREATE OR REPLACE STORAGE INTEGRATION S3_DATA_INTEGRATION
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::*****:role/*****'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('s3://*****/');

DESC INTEGRATION S3_DATA_INTEGRATION;
...
# STORAGE_AWS_IAM_USER_ARN,String,arn:aws:iam::*****:user/*****,
# STORAGE_AWS_EXTERNAL_ID,String,*****,
...

Create an IAM Role for this user to assume, here with AWS CDK.

// The initial principal is only a placeholder; the actual trust statement
// (Snowflake's IAM user constrained by the External ID) is added below
const snowflakeRole = new iam.Role(this, 'SnowflakeS3Role', {
  assumedBy: new iam.ServicePrincipal('sts.amazonaws.com'),
  inlinePolicies: {
    SnowflakeS3AccessPolicy: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: [
            's3:GetObject',
            's3:PutObject',
            's3:ListBucket',
          ],
          resources: [
            bucket.bucketArn,
            `${bucket.bucketArn}/*`,
          ],
        }),
      ],
    })
  }
})

// Trust the account of the IAM user returned by DESC INTEGRATION,
// restricted by the External ID
snowflakeRole.assumeRolePolicy?.addStatements(
  new iam.PolicyStatement({
    actions: ['sts:AssumeRole'],
    principals: [new iam.AccountPrincipal('*****')],
    conditions: {
      StringEquals: {
        'sts:ExternalId': '*****',
      },
    },
  })
)

Next, create a STAGE and COPY the data into a TABLE. Loading can also be automated with Snowpipe. Note that identifiers such as database names are treated as uppercase unless enclosed in double quotes. Enclosing them lets you create lowercase objects, and tools like Terraform create them exactly as written; however, you then have to quote them on every reference, which invites unnecessary confusion, so I think it is better to standardize on uppercase.
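To illustrate the casing behavior (hypothetical database names):

```sql
-- Unquoted identifiers are folded to uppercase
CREATE DATABASE testdb_a;    -- created as TESTDB_A
-- Quoted identifiers keep their case as written
CREATE DATABASE "testdb_b";  -- created as testdb_b
-- and must be quoted on every subsequent reference
USE DATABASE "testdb_b";
```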

CREATE OR REPLACE DATABASE TESTDB;
  
CREATE OR REPLACE SCHEMA TESTDB.S3_DATA;

CREATE OR REPLACE TABLE TESTDB.S3_DATA.USERS (
  id INT
  ,name VARCHAR(30)
);

CREATE OR REPLACE STAGE TESTDB.S3_DATA.S3_DATA_STAGE
  STORAGE_INTEGRATION = S3_DATA_INTEGRATION
  URL = 's3://*****/'
  FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1);
  
COPY INTO TESTDB.S3_DATA.USERS
  FROM @TESTDB.S3_DATA.S3_DATA_STAGE
    FILES = ('users.csv');

SELECT * FROM TESTDB.S3_DATA.USERS;

Then you can query from Snowflake.
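The Snowpipe loading mentioned above could be set up roughly as follows (a sketch; the pipe name is an assumption, and S3 event notifications to the pipe's SQS queue must be configured separately):

```sql
CREATE OR REPLACE PIPE TESTDB.S3_DATA.USERS_PIPE
  AUTO_INGEST = TRUE
  AS COPY INTO TESTDB.S3_DATA.USERS
       FROM @TESTDB.S3_DATA.S3_DATA_STAGE;

-- notification_channel in the output is the SQS ARN to notify from S3
SHOW PIPES;
```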

Create Snowflake roles with Terraform and grant users table access permissions - sambaiz-net

Reference

Reading Snowflake's paper "The Snowflake Elastic Data Warehouse", Part 1 (in Japanese)