データベース

Snowflake

概要

Snowflakeは、クラウドネイティブなデータウェアハウスプラットフォームです。ストレージとコンピュートの分離アーキテクチャにより、独立したスケーリングとコスト最適化を実現します。完全マネージドサービスとして提供され、AWS、Azure、GCPの主要クラウドプロバイダーで利用可能です。

詳細

Snowflakeは2012年に設立され、従来のデータウェアハウスの課題を解決するために設計されました。独自のマルチクラスターアーキテクチャにより、ストレージ、コンピュート、クラウドサービスを完全に分離し、それぞれを独立してスケールできます。データの共有、セキュリティ、ガバナンスに重点を置き、エンタープライズレベルのデータ分析を支援します。

Snowflakeの主な特徴:

  • ストレージ・コンピュート分離アーキテクチャ
  • マルチクラスター仮想ウェアハウス
  • 自動スケーリングと自動最適化
  • 標準SQLサポート
  • ACID準拠のトランザクション
  • データクローンとタイムトラベル
  • セキュアデータ共有
  • Zero-copyクローニング
  • マルチクラウド対応
  • 完全マネージドサービス

メリット・デメリット

メリット

  • コスト効率: 使用した分だけの課金、自動停止機能
  • スケーラビリティ: 独立したストレージ・コンピュートスケーリング
  • 高性能: 自動最適化とマテリアライズドビュー
  • 使いやすさ: 完全マネージド、インフラ管理不要
  • データ共有: セキュアなデータ共有とマーケットプレイス
  • マルチクラウド: AWS、Azure、GCP対応
  • セキュリティ: 暗号化、アクセス制御、監査機能

デメリット

  • ベンダーロックイン: Snowflake固有の機能への依存
  • コスト予測: 従量課金制のため予算管理が複雑
  • リアルタイム制限: ストリーミング分析には制約
  • カスタマイズ制限: インフラ設定の自由度が低い
  • データ移行: 大量データの初回移行コスト

主要リンク

書き方の例

インストール・セットアップ

-- Snowflakeへの接続(Web UI)
-- https://[account_identifier].snowflakecomputing.com/

-- SnowSQL CLIのインストール(macOS)
brew install snowflake-snowsql

-- SnowSQL CLIのインストール(Windows)
-- https://developers.snowflake.com/snowsql/ からダウンロード

-- SnowSQL設定ファイル (~/.snowsql/config)
[connections]
accountname = your_account_identifier
username = your_username
password = your_password
dbname = your_default_database
schemaname = your_default_schema
warehousename = your_default_warehouse

-- SnowSQL接続
snowsql -a your_account_identifier -u your_username

-- Python connector
pip install snowflake-connector-python

-- JDBC Driver
-- https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/

基本操作(DDL/DML)

-- データベース作成
CREATE DATABASE analytics_db;
USE DATABASE analytics_db;

-- スキーマ作成
CREATE SCHEMA sales_data;
USE SCHEMA sales_data;

-- ウェアハウス作成
CREATE WAREHOUSE compute_wh
  WITH WAREHOUSE_SIZE = 'MEDIUM'
       AUTO_SUSPEND = 60  -- 1分後に自動停止
       AUTO_RESUME = TRUE
       INITIALLY_SUSPENDED = TRUE;

-- ウェアハウス使用
USE WAREHOUSE compute_wh;

-- テーブル作成
CREATE TABLE sales_transactions (
    transaction_id VARCHAR(50),
    customer_id VARCHAR(50),
    product_id VARCHAR(50),
    transaction_date DATE,
    transaction_timestamp TIMESTAMP_NTZ,
    amount DECIMAL(10,2),
    quantity INTEGER,
    category VARCHAR(100),
    region VARCHAR(50),
    payment_method VARCHAR(50)
);

-- データ挿入
INSERT INTO sales_transactions VALUES
('T001', 'C001', 'P001', '2024-01-15', '2024-01-15 10:30:00', 299.99, 2, 'Electronics', 'North', 'Credit Card'),
('T002', 'C002', 'P002', '2024-01-15', '2024-01-15 11:45:00', 49.99, 1, 'Books', 'South', 'PayPal'),
('T003', 'C001', 'P003', '2024-01-16', '2024-01-16 09:15:00', 149.99, 1, 'Clothing', 'North', 'Credit Card');

-- CSVファイルからの一括読み込み
CREATE STAGE my_stage
  FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1);

-- S3からのデータ読み込み
COPY INTO sales_transactions
FROM @my_stage/sales_data.csv
FILE_FORMAT = (TYPE = 'CSV' FIELD_DELIMITER = ',' SKIP_HEADER = 1);

-- データ更新
UPDATE sales_transactions 
SET amount = amount * 1.1 
WHERE category = 'Electronics';

-- データ削除
DELETE FROM sales_transactions 
WHERE transaction_date < '2024-01-01';

クエリ・分析

-- 基本的な集計クエリ
SELECT 
    region,
    category,
    COUNT(*) AS transaction_count,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_transaction_value,
    SUM(quantity) AS total_quantity
FROM sales_transactions
WHERE transaction_date >= '2024-01-01'
GROUP BY region, category
ORDER BY total_revenue DESC;

-- 時系列分析
SELECT 
    DATE_TRUNC('month', transaction_date) AS month,
    SUM(amount) AS monthly_revenue,
    COUNT(DISTINCT customer_id) AS unique_customers,
    AVG(amount) AS avg_order_value,
    LAG(SUM(amount)) OVER (ORDER BY month) AS prev_month_revenue,
    (SUM(amount) - LAG(SUM(amount)) OVER (ORDER BY month)) / 
    LAG(SUM(amount)) OVER (ORDER BY month) * 100 AS growth_rate
FROM sales_transactions
GROUP BY month
ORDER BY month;

-- ウィンドウ関数による顧客分析
SELECT 
    customer_id,
    transaction_date,
    amount,
    SUM(amount) OVER (PARTITION BY customer_id ORDER BY transaction_date) AS running_total,
    ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY transaction_date) AS order_sequence,
    DENSE_RANK() OVER (ORDER BY amount DESC) AS amount_rank
FROM sales_transactions
ORDER BY customer_id, transaction_date;

-- JSON データの処理
CREATE TABLE user_events (
    event_id VARCHAR(50),
    user_id VARCHAR(50),
    event_timestamp TIMESTAMP_NTZ,
    event_data VARIANT  -- JSON型
);

INSERT INTO user_events VALUES
('E001', 'U001', '2024-01-15 10:30:00', 
 '{"action": "page_view", "page": "/products", "duration": 120, "referrer": "google.com"}'),
('E002', 'U001', '2024-01-15 10:32:00', 
 '{"action": "add_to_cart", "product_id": "P001", "quantity": 2, "price": 299.99}');

SELECT 
    user_id,
    event_data:action::STRING AS action,
    event_data:page::STRING AS page,
    event_data:duration::INTEGER AS duration,
    event_data:product_id::STRING AS product_id
FROM user_events
WHERE event_data:action::STRING = 'page_view';

高度な機能

-- タイムトラベル(履歴データアクセス)
-- 1時間前のデータ
SELECT * FROM sales_transactions AT (TIMESTAMP => DATEADD(hour, -1, CURRENT_TIMESTAMP()));

-- 特定時点のデータ
SELECT * FROM sales_transactions BEFORE (STATEMENT => '01a2b3c4-0001-2345-0000-000012345678');

-- テーブルクローン(Zero-copy)
CREATE TABLE sales_transactions_backup 
CLONE sales_transactions;

-- タスク(スケジュール実行)
CREATE TASK daily_summary_task
  WAREHOUSE = compute_wh
  SCHEDULE = 'USING CRON 0 2 * * * UTC'  -- 毎日午前2時UTC
AS
  INSERT INTO daily_sales_summary
  SELECT 
    CURRENT_DATE() - 1 AS report_date,
    region,
    SUM(amount) AS daily_revenue,
    COUNT(*) AS transaction_count
  FROM sales_transactions
  WHERE transaction_date = CURRENT_DATE() - 1
  GROUP BY region;

-- タスク開始
ALTER TASK daily_summary_task RESUME;

-- ストリーム(変更データキャプチャ)
CREATE STREAM sales_stream ON TABLE sales_transactions;

-- パイプ(自動データ読み込み)
CREATE PIPE sales_pipe 
  AUTO_INGEST = TRUE
AS
  COPY INTO sales_transactions
  FROM @my_stage
  FILE_FORMAT = (TYPE = 'CSV');

-- セキュアビュー
CREATE SECURE VIEW sensitive_sales AS
SELECT 
    region,
    category,
    SUM(amount) AS revenue,
    COUNT(*) AS transactions
FROM sales_transactions
GROUP BY region, category;

パフォーマンス最適化

-- クラスタリングキー設定
ALTER TABLE sales_transactions 
CLUSTER BY (transaction_date, region);

-- マテリアライズドビュー
CREATE MATERIALIZED VIEW sales_summary_mv AS
SELECT 
    DATE_TRUNC('day', transaction_date) AS day,
    region,
    category,
    SUM(amount) AS daily_revenue,
    COUNT(*) AS transaction_count,
    AVG(amount) AS avg_amount
FROM sales_transactions
GROUP BY day, region, category;

-- 自動クラスタリング有効化
ALTER TABLE sales_transactions 
RESUME RECLUSTER;

-- 検索最適化サービス
ALTER TABLE sales_transactions 
ADD SEARCH OPTIMIZATION;

-- Result Cache利用(自動)
-- 同じクエリは24時間キャッシュされる
SELECT COUNT(*) FROM sales_transactions 
WHERE transaction_date = CURRENT_DATE();

-- Query Acceleration Service
ALTER WAREHOUSE compute_wh 
SET ENABLE_QUERY_ACCELERATION = TRUE
    QUERY_ACCELERATION_MAX_SCALE_FACTOR = 8;

-- ウェアハウスサイズ調整
ALTER WAREHOUSE compute_wh SET WAREHOUSE_SIZE = 'LARGE';

-- 複数クラスター設定(Enterprise以上)
ALTER WAREHOUSE compute_wh SET 
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 5
  SCALING_POLICY = 'STANDARD';

実用例

-- 顧客セグメンテーション分析
WITH customer_metrics AS (
  SELECT 
    customer_id,
    COUNT(*) AS purchase_frequency,
    SUM(amount) AS total_spent,
    AVG(amount) AS avg_order_value,
    MAX(transaction_date) AS last_purchase_date,
    DATEDIFF(day, MAX(transaction_date), CURRENT_DATE()) AS days_since_last_purchase
  FROM sales_transactions
  GROUP BY customer_id
),
customer_segments AS (
  SELECT 
    customer_id,
    purchase_frequency,
    total_spent,
    avg_order_value,
    days_since_last_purchase,
    CASE 
      WHEN total_spent >= 1000 AND days_since_last_purchase <= 30 THEN 'VIP'
      WHEN total_spent >= 500 AND days_since_last_purchase <= 60 THEN 'Premium'
      WHEN days_since_last_purchase <= 90 THEN 'Active'
      WHEN days_since_last_purchase <= 180 THEN 'At Risk'
      ELSE 'Inactive'
    END AS segment
  FROM customer_metrics
)
SELECT 
  segment,
  COUNT(*) AS customer_count,
  AVG(total_spent) AS avg_total_spent,
  AVG(purchase_frequency) AS avg_frequency
FROM customer_segments
GROUP BY segment
ORDER BY avg_total_spent DESC;

-- 売上予測(線形回帰近似)
WITH daily_sales AS (
  SELECT 
    transaction_date,
    SUM(amount) AS daily_revenue,
    ROW_NUMBER() OVER (ORDER BY transaction_date) AS day_number
  FROM sales_transactions
  GROUP BY transaction_date
),
regression_stats AS (
  SELECT 
    COUNT(*) AS n,
    SUM(day_number) AS sum_x,
    SUM(daily_revenue) AS sum_y,
    SUM(day_number * daily_revenue) AS sum_xy,
    SUM(day_number * day_number) AS sum_x2
  FROM daily_sales
)
SELECT 
  (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x) AS slope,
  (sum_y - slope * sum_x) / n AS intercept
FROM regression_stats;

-- データ品質チェック
SELECT 
  'sales_transactions' AS table_name,
  'Null customer_id' AS check_type,
  COUNT(*) AS issue_count
FROM sales_transactions
WHERE customer_id IS NULL
UNION ALL
SELECT 
  'sales_transactions',
  'Negative amount',
  COUNT(*)
FROM sales_transactions
WHERE amount < 0
UNION ALL
SELECT 
  'sales_transactions',
  'Future dates',
  COUNT(*)
FROM sales_transactions
WHERE transaction_date > CURRENT_DATE();

セキュリティとガバナンス

-- ロール作成
CREATE ROLE sales_analyst;
CREATE ROLE sales_manager;

-- 権限付与
GRANT USAGE ON DATABASE analytics_db TO ROLE sales_analyst;
GRANT USAGE ON SCHEMA sales_data TO ROLE sales_analyst;
GRANT SELECT ON TABLE sales_transactions TO ROLE sales_analyst;

-- マスキングポリシー
CREATE MASKING POLICY customer_id_mask AS (val STRING) 
RETURNS STRING ->
  CASE 
    WHEN CURRENT_ROLE() IN ('SALES_MANAGER', 'ADMIN') THEN val
    ELSE 'MASKED'
  END;

-- マスキング適用
ALTER TABLE sales_transactions 
MODIFY COLUMN customer_id 
SET MASKING POLICY customer_id_mask;

-- 行レベルセキュリティ
CREATE ROW ACCESS POLICY region_policy AS (region STRING) 
RETURNS BOOLEAN ->
  CASE 
    WHEN CURRENT_ROLE() = 'ADMIN' THEN TRUE
    WHEN CURRENT_ROLE() = 'NORTH_SALES' AND region = 'North' THEN TRUE
    WHEN CURRENT_ROLE() = 'SOUTH_SALES' AND region = 'South' THEN TRUE
    ELSE FALSE
  END;

-- 行ポリシー適用
ALTER TABLE sales_transactions 
ADD ROW ACCESS POLICY region_policy ON (region);

-- ネットワークポリシー
CREATE NETWORK POLICY office_network
  ALLOWED_IP_LIST = ('192.168.1.0/24', '10.0.0.0/8')
  BLOCKED_IP_LIST = ('192.168.1.99');

-- ユーザーにネットワークポリシー適用
ALTER USER sales_user SET NETWORK_POLICY = office_network;

監視・管理

-- ウェアハウス使用量監視
SELECT 
  warehouse_name,
  DATE_TRUNC('day', start_time) AS day,
  SUM(credits_used) AS daily_credits,
  AVG(credits_used) AS avg_credits_per_hour
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
WHERE start_time >= DATEADD(day, -30, CURRENT_TIMESTAMP())
GROUP BY warehouse_name, day
ORDER BY day DESC, daily_credits DESC;

-- クエリ履歴分析
SELECT 
  user_name,
  warehouse_name,
  database_name,
  query_type,
  execution_status,
  total_elapsed_time / 1000 AS execution_seconds,
  bytes_scanned,
  rows_produced,
  credits_used_cloud_services,
  query_text
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD(hour, -24, CURRENT_TIMESTAMP())
  AND total_elapsed_time > 10000  -- 10秒以上のクエリ
ORDER BY total_elapsed_time DESC
LIMIT 20;

-- ストレージ使用量
SELECT 
  database_name,
  schema_name,
  table_name,
  bytes / (1024*1024*1024) AS size_gb,
  row_count
FROM SNOWFLAKE.ACCOUNT_USAGE.TABLE_STORAGE_METRICS
WHERE deleted = FALSE
ORDER BY bytes DESC
LIMIT 20;

-- データ共有監視
SELECT 
  consumer_account_name,
  database_name,
  share_name,
  date,
  credits_used
FROM SNOWFLAKE.ACCOUNT_USAGE.DATA_TRANSFER_HISTORY
WHERE start_time >= DATEADD(day, -7, CURRENT_TIMESTAMP())
ORDER BY credits_used DESC;

-- パフォーマンス分析
SELECT 
  query_id,
  execution_status,
  warehouse_name,
  total_elapsed_time,
  compilation_time,
  execution_time,
  bytes_scanned,
  percentage_scanned_from_cache,
  query_load_percent
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD(hour, -1, CURRENT_TIMESTAMP())
  AND warehouse_name = 'COMPUTE_WH'
ORDER BY total_elapsed_time DESC;