Database
Amazon Redshift
Overview
Amazon Redshift is a fully managed cloud data warehouse service from AWS. It supports analytics at petabyte scale, and its PostgreSQL compatibility makes it easy to connect existing SQL tools. A columnar storage architecture delivers fast analytic queries over large datasets.
Details
Amazon Redshift is a data warehouse specialized for OLAP (Online Analytical Processing), based on PostgreSQL 8.0.2. Unlike traditional OLTP databases, it uses columnar storage optimized for analytical workloads and achieves high performance through data compression and parallel processing.
Key features of Amazon Redshift:
- SQL compatibility based on PostgreSQL 8.0.2
- Columnar storage engine
- Automatic compression and encoding optimization
- Parallel, distributed processing architecture (see the slice query after this list)
- Managed service on AWS
- Petabyte-scale scalability
- Seamless integration with Amazon S3
- Redshift Spectrum (data lake integration)
- RA3 instances (separation of compute and storage)
- Zero-ETL integration (with Aurora PostgreSQL)
- Machine learning integration (SageMaker)
- Automatic backups and point-in-time recovery
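The parallel architecture is directly visible from SQL: each compute node is divided into slices, and every slice processes its share of each table. A minimal check, runnable on any provisioned cluster:
-- List the slices (units of parallelism) and their node assignment
SELECT node, slice
FROM stv_slices
ORDER BY node, slice;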
Pros and Cons
Pros
- Fully managed: no infrastructure to operate, so you can focus on analysis
- Fast analytics: columnar storage enables fast query execution
- PostgreSQL compatible: reuse existing SQL knowledge and tooling
- AWS ecosystem: integration with S3, Firehose, Lambda, and more
- Automatic scaling: flexible scaling to match data volume
- Cost efficiency: pay-as-you-go pricing and concurrency scaling
- High availability: automatic backups and multi-AZ support
- Security: data encryption and VPC integration
- Zero-ETL: near-real-time integration with Aurora
- Machine learning: integration with Amazon SageMaker
Cons
- Unsuited to OLTP: not built for transaction processing
- PostgreSQL limitations: the 8.0.2 base means many newer PostgreSQL features are unavailable
- Vendor lock-in: an AWS-specific service
- Pricing complexity: several billing dimensions to understand
- Update restrictions: frequent updates and deletes are inefficient (see the staging-table merge sketch after this list)
- Concurrency limits: constraints on very large numbers of simultaneous users
- Load times: bulk loads of large datasets take time
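A common workaround for the update/delete weakness is to batch changes into a staging table and merge them in bulk. A minimal sketch, assuming the ecommerce.customer table from the examples below and a hypothetical s3://my-bucket/customer-updates/ prefix:
-- Stage changed rows, then swap them in with one bulk delete + insert
CREATE TEMP TABLE customer_staging (LIKE ecommerce.customer);

COPY customer_staging
FROM 's3://my-bucket/customer-updates/'  -- hypothetical prefix
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
CSV
IGNOREHEADER 1;

BEGIN;

DELETE FROM ecommerce.customer
USING customer_staging
WHERE ecommerce.customer.customer_id = customer_staging.customer_id;

INSERT INTO ecommerce.customer SELECT * FROM customer_staging;

COMMIT;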
Key Links
- Amazon Redshift: https://aws.amazon.com/redshift/
- Documentation: https://docs.aws.amazon.com/redshift/
Usage Examples
Cluster Creation and Setup
# Create a Redshift cluster with the AWS CLI
aws redshift create-cluster \
--cluster-identifier my-redshift-cluster \
--node-type dc2.large \
--master-username admin \
--master-user-password MyPassword123 \
--db-name analytics \
--cluster-type single-node \
--publicly-accessible
# Check cluster status
aws redshift describe-clusters \
--cluster-identifier my-redshift-cluster
# Create a Redshift Serverless workgroup
aws redshift-serverless create-workgroup \
--workgroup-name my-workgroup \
--namespace-name default \
--base-capacity 8 \
--enhanced-vpc-routing
# Connect with the psql client
export PGHOST=my-redshift-cluster.abcdefg.us-east-1.redshift.amazonaws.com
export PGPORT=5439
export PGDATABASE=analytics
export PGUSER=admin
export PGPASSWORD=MyPassword123
psql
# JDBC connection string
jdbc:redshift://my-redshift-cluster.abcdefg.us-east-1.redshift.amazonaws.com:5439/analytics
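Once connected, a quick sanity check confirms the session context (the exact version string varies by release):
-- Verify the connection
SELECT current_database(), current_user, version();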
Basic Operations (DDL/DML)
-- Create a database and schema
CREATE DATABASE sales_analytics;
CREATE SCHEMA ecommerce;
-- Create a table (specifying compression encodings)
CREATE TABLE ecommerce.customer (
customer_id INTEGER ENCODE delta,
first_name VARCHAR(50) ENCODE lzo,
last_name VARCHAR(50) ENCODE lzo,
email VARCHAR(100) ENCODE lzo,
phone VARCHAR(20) ENCODE bytedict,
country_code CHAR(2) ENCODE raw,
signup_date DATE ENCODE delta32k,
lifetime_value DECIMAL(10,2) ENCODE az64,
last_login TIMESTAMP ENCODE az64
)
DISTSTYLE KEY
DISTKEY (customer_id)
SORTKEY (signup_date);
-- Sales data table
CREATE TABLE ecommerce.sales (
sale_id BIGINT IDENTITY(1,1),
customer_id INTEGER,
product_id INTEGER,
sale_date DATE,
quantity INTEGER,
unit_price DECIMAL(10,2),
total_amount DECIMAL(12,2),
discount_percent DECIMAL(3,2)
)
DISTSTYLE KEY
DISTKEY (customer_id)
SORTKEY (sale_date);
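Distribution and sort choices are easy to verify once data is loaded; a skew_rows value close to 1.0 means rows are spread evenly across slices:
-- Check distribution style, sort key, and row skew
SELECT "table", diststyle, sortkey1, skew_rows
FROM svv_table_info
WHERE schema = 'ecommerce'
ORDER BY "table";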
-- External table (Redshift Spectrum)
CREATE EXTERNAL SCHEMA ecommerce_s3
FROM DATA CATALOG
DATABASE 'sales_catalog'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftSpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;
CREATE EXTERNAL TABLE ecommerce_s3.web_logs (
log_date DATE,
user_id INTEGER,
session_id VARCHAR(50),
page_url VARCHAR(500),
user_agent VARCHAR(500),
ip_address VARCHAR(15)
)
STORED AS PARQUET
LOCATION 's3://my-bucket/web-logs/'
TABLE PROPERTIES ('has_encrypted_data'='false');
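The external table can then be queried like any local table, including joins against cluster-resident data; Redshift Spectrum scans the Parquet files in S3 on the fly:
-- Join S3-resident logs with a local table
SELECT
    l.log_date,
    COUNT(*) AS page_views,
    COUNT(DISTINCT l.user_id) AS visitors
FROM ecommerce_s3.web_logs l
JOIN ecommerce.customer c ON l.user_id = c.customer_id
GROUP BY l.log_date
ORDER BY l.log_date;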
Data Loading (COPY)
-- Load data from S3 (with automatic compression analysis)
COPY ecommerce.customer
FROM 's3://my-bucket/customer-data/'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
CSV
IGNOREHEADER 1
COMPUPDATE ON
STATUPDATE ON;
-- Load JSON data
COPY ecommerce.customer_events
FROM 's3://my-bucket/events/2024/01/'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
JSON 'auto'
TIMEFORMAT 'YYYY-MM-DDTHH:MI:SS'
GZIP;
-- Load from a compressed file
COPY ecommerce.sales
FROM 's3://my-bucket/sales/sales_2024.csv.gz'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
CSV
DELIMITER ','
IGNOREHEADER 1
GZIP;
-- Parallel load from multiple files: a key prefix loads every matching
-- file in parallel (to load an explicit file list, point FROM at a
-- manifest file and add the MANIFEST option instead)
COPY ecommerce.sales
FROM 's3://my-bucket/sales/2024/'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
CSV;
-- Validation run: report up to 100 errors without loading any rows (NOLOAD)
COPY ecommerce.customer
FROM 's3://my-bucket/customer-data/'
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftRole'
CSV
MAXERROR 100
NOLOAD;
-- Inspect load errors
SELECT query, filename, line_number, colname, type, position, raw_field_value, err_reason
FROM stl_load_errors
WHERE query = pg_last_copy_id()
ORDER BY query DESC;
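To confirm how much data actually arrived, the session-level row counter and the per-file commit log are useful:
-- Rows loaded by the most recent COPY in this session
SELECT pg_last_copy_count();

-- Per-file load history
SELECT query, filename, lines_scanned, curtime
FROM stl_load_commits
ORDER BY curtime DESC
LIMIT 20;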
Queries and Analysis
-- Basic aggregation query
SELECT
DATE_TRUNC('month', sale_date) AS month,
COUNT(*) AS total_sales,
COUNT(DISTINCT customer_id) AS unique_customers,
SUM(total_amount) AS revenue,
AVG(total_amount) AS avg_order_value,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY total_amount) AS median_order_value
FROM ecommerce.sales
WHERE sale_date >= '2024-01-01'
GROUP BY 1
ORDER BY 1;
-- Analysis with window functions
SELECT
customer_id,
sale_date,
total_amount,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY sale_date) AS order_sequence,
LAG(total_amount) OVER (PARTITION BY customer_id ORDER BY sale_date) AS prev_order_amount,
SUM(total_amount) OVER (PARTITION BY customer_id ORDER BY sale_date
ROWS UNBOUNDED PRECEDING) AS cumulative_spending
FROM ecommerce.sales
WHERE customer_id IN (SELECT TOP 100 customer_id FROM ecommerce.customer);
-- Customer segmentation analysis
WITH customer_metrics AS (
SELECT
c.customer_id,
c.signup_date,
COUNT(s.sale_id) AS order_count,
SUM(s.total_amount) AS total_spent,
MAX(s.sale_date) AS last_order_date,
DATEDIFF(day, c.signup_date, MAX(s.sale_date)) AS customer_lifespan_days
FROM ecommerce.customer c
LEFT JOIN ecommerce.sales s ON c.customer_id = s.customer_id
GROUP BY c.customer_id, c.signup_date
)
SELECT
CASE
WHEN total_spent >= 1000 AND order_count >= 10 THEN 'VIP'
WHEN total_spent >= 500 AND order_count >= 5 THEN 'High Value'
WHEN total_spent >= 100 AND order_count >= 2 THEN 'Regular'
ELSE 'Low Value'
END AS customer_segment,
COUNT(*) AS customer_count,
AVG(total_spent) AS avg_spending,
AVG(order_count) AS avg_orders
FROM customer_metrics
GROUP BY 1
ORDER BY avg_spending DESC;
-- Time-series analysis (moving averages)
SELECT
sale_date,
SUM(total_amount) AS daily_revenue,
AVG(SUM(total_amount)) OVER (ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS weekly_avg,
AVG(SUM(total_amount)) OVER (ORDER BY sale_date
ROWS BETWEEN 29 PRECEDING AND CURRENT ROW) AS monthly_avg
FROM ecommerce.sales
WHERE sale_date >= '2024-01-01'
GROUP BY sale_date
ORDER BY sale_date;
Advanced Analytics Features
-- Machine learning with Redshift ML
-- (every non-target column in the SELECT becomes a model feature)
CREATE MODEL customer_lifetime_value_model
FROM (
SELECT
EXTRACT(year FROM signup_date) AS signup_year,
EXTRACT(month FROM signup_date) AS signup_month,
COUNT(sale_id) AS order_count,
AVG(total_amount) AS avg_order_value,
SUM(total_amount) AS total_spent
FROM ecommerce.customer c
JOIN ecommerce.sales s ON c.customer_id = s.customer_id
WHERE signup_date >= '2023-01-01'
GROUP BY c.customer_id, signup_date
)
TARGET total_spent
FUNCTION ml_fn_customer_ltv
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftMLRole'
SETTINGS (
S3_BUCKET 'my-ml-bucket',
MAX_RUNTIME 5400
);
-- Prediction query
SELECT
customer_id,
ml_fn_customer_ltv(signup_year, signup_month, order_count, avg_order_value) AS predicted_ltv
FROM (
SELECT
customer_id,
EXTRACT(year FROM signup_date) AS signup_year,
EXTRACT(month FROM signup_date) AS signup_month,
COUNT(sale_id) AS order_count,
AVG(total_amount) AS avg_order_value
FROM ecommerce.customer c
LEFT JOIN ecommerce.sales s ON c.customer_id = s.customer_id
WHERE signup_date >= '2024-01-01'
GROUP BY c.customer_id, signup_date
) AS customer_features
ORDER BY predicted_ltv DESC
LIMIT 100;
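Training runs asynchronously; SHOW MODEL reports the training status, the selected algorithm, and the generated function signature:
-- Check training status and model details
SHOW MODEL customer_lifetime_value_model;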
-- Spatial analysis (Redshift's built-in spatial functions)
SELECT
    customer_id,
    ST_DistanceSphere(
        ST_Point(longitude, latitude),
        ST_Point(-73.935242, 40.730610)  -- NYC coordinates
    ) / 1000 AS distance_from_nyc_km
FROM ecommerce.customer_locations
WHERE longitude IS NOT NULL AND latitude IS NOT NULL;
-- Array and JSON processing
SELECT
customer_id,
JSON_EXTRACT_PATH_TEXT(preferences, 'favorite_categories') AS favorite_categories,
JSON_EXTRACT_ARRAY_ELEMENT_TEXT(
JSON_EXTRACT_PATH_TEXT(preferences, 'favorite_categories'), 0
) AS top_category
FROM ecommerce.customer_preferences
WHERE preferences IS NOT NULL;
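On newer clusters, the SUPER type with PartiQL is often a cleaner alternative to the string-based JSON functions. A sketch using an illustrative table:
-- Store JSON natively as SUPER and navigate it with dot/bracket notation
CREATE TABLE ecommerce.customer_preferences_super (
    customer_id INTEGER,
    preferences SUPER
);

INSERT INTO ecommerce.customer_preferences_super
VALUES (1, JSON_PARSE('{"favorite_categories": ["books", "music"]}'));

SELECT customer_id, preferences.favorite_categories[0] AS top_category
FROM ecommerce.customer_preferences_super;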
Performance Optimization
-- Update table statistics
ANALYZE ecommerce.sales;
ANALYZE ecommerce.customer;
-- Reclaim space and re-sort with VACUUM
VACUUM DELETE ONLY ecommerce.sales;
VACUUM REINDEX ecommerce.customer;
-- Inspect column encodings, keys, and constraints
-- (pg_table_def only lists schemas on the current search_path)
SET search_path TO '$user', public, ecommerce;

SELECT
    tablename,
    "column",
    type,
    encoding,
    distkey,
    sortkey,
    "notnull"
FROM pg_table_def
WHERE schemaname = 'ecommerce'
ORDER BY tablename, "column";
-- Check table size and average row size (size is reported in 1 MB blocks)
SELECT
    schema AS table_schema,
    "table" AS table_name,
    size AS size_in_mb,
    tbl_rows AS row_count,
    CASE WHEN tbl_rows > 0
         THEN size * 1024.0 * 1024.0 / tbl_rows
    END AS avg_row_size_bytes
FROM SVV_TABLE_INFO
WHERE schema = 'ecommerce'
ORDER BY size DESC;
-- Query execution plan
EXPLAIN
SELECT
c.customer_id,
SUM(s.total_amount) AS total_spent
FROM ecommerce.customer c
JOIN ecommerce.sales s ON c.customer_id = s.customer_id
WHERE c.signup_date >= '2024-01-01'
GROUP BY c.customer_id
ORDER BY total_spent DESC
LIMIT 100;
-- Create a materialized view
CREATE MATERIALIZED VIEW ecommerce.monthly_sales_summary AS
SELECT
DATE_TRUNC('month', sale_date) AS month,
customer_id,
COUNT(*) AS order_count,
SUM(total_amount) AS total_spent,
AVG(total_amount) AS avg_order_value
FROM ecommerce.sales
GROUP BY 1, 2;
-- Enable auto refresh
ALTER MATERIALIZED VIEW ecommerce.monthly_sales_summary AUTO REFRESH YES;
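A manual refresh is also available, and the view is queried like an ordinary table:
-- Refresh on demand and query the precomputed summary
REFRESH MATERIALIZED VIEW ecommerce.monthly_sales_summary;

SELECT month, SUM(total_spent) AS revenue
FROM ecommerce.monthly_sales_summary
GROUP BY month
ORDER BY month;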
Data Integration and ETL
-- Aurora PostgreSQL zero-ETL integration
-- The integration itself is configured via the AWS Console or CLI:
-- aws rds create-integration \
--   --source-arn arn:aws:rds:us-east-1:123456789012:cluster:aurora-postgres-cluster \
--   --target-arn arn:aws:redshift:us-east-1:123456789012:cluster:redshift-cluster
-- Check tables replicated by the zero-ETL integration
SELECT schemaname, tablename, tableowner
FROM pg_tables
WHERE schemaname LIKE 'aurora_%'
ORDER BY schemaname, tablename;
-- Federated query (live access to external databases)
CREATE EXTERNAL SCHEMA postgres_federation
FROM POSTGRES
DATABASE 'production_db'
URI 'postgres-instance.abcdefg.us-east-1.rds.amazonaws.com'
PORT 5432
-- credentials come from AWS Secrets Manager, not inline USER/PASSWORD
IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftFederatedRole'
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:123456789012:secret:federated-user-creds';
-- Run a federated query
SELECT
r.customer_id,
r.total_spent,
p.recent_activity
FROM ecommerce.customer_summary r
JOIN postgres_federation.user_activity p ON r.customer_id = p.user_id
WHERE r.total_spent > 1000;
-- External access via the Redshift Data API
# Python example (boto3). The Data API is asynchronous: poll the statement
# status before fetching results.
import time

import boto3

redshift_data = boto3.client('redshift-data')

response = redshift_data.execute_statement(
    ClusterIdentifier='my-redshift-cluster',
    Database='analytics',
    DbUser='admin',
    Sql='SELECT COUNT(*) FROM ecommerce.sales WHERE sale_date = CURRENT_DATE'
)

# Wait until the statement finishes
status = 'SUBMITTED'
while status not in ('FINISHED', 'FAILED', 'ABORTED'):
    time.sleep(1)
    status = redshift_data.describe_statement(Id=response['Id'])['Status']

# Fetch the result set (only valid once the statement has FINISHED)
result = redshift_data.get_statement_result(Id=response['Id'])
Practical Examples
-- Query for a real-time dashboard
WITH daily_metrics AS (
SELECT
CURRENT_DATE AS metric_date,
COUNT(*) AS todays_orders,
SUM(total_amount) AS todays_revenue,
COUNT(DISTINCT customer_id) AS active_customers,
AVG(total_amount) AS avg_order_value
FROM ecommerce.sales
WHERE sale_date = CURRENT_DATE
),
comparison_metrics AS (
SELECT
COUNT(*) AS yesterday_orders,
SUM(total_amount) AS yesterday_revenue
FROM ecommerce.sales
WHERE sale_date = CURRENT_DATE - 1
)
SELECT
d.*,
c.yesterday_orders,
c.yesterday_revenue,
ROUND(((d.todays_revenue - c.yesterday_revenue) / NULLIF(c.yesterday_revenue, 0)) * 100, 2) AS revenue_growth_pct,
ROUND(((d.todays_orders - c.yesterday_orders)::float / NULLIF(c.yesterday_orders, 0)) * 100, 2) AS order_growth_pct
FROM daily_metrics d
CROSS JOIN comparison_metrics c;
-- Product recommendations (collaborative filtering)
WITH user_product_matrix AS (
SELECT
customer_id,
product_id,
SUM(quantity) AS total_purchased
FROM ecommerce.sales
GROUP BY customer_id, product_id
),
similar_customers AS (
SELECT
u1.customer_id AS customer_a,
u2.customer_id AS customer_b,
COUNT(*) AS common_products,
-- Pearson correlation built from basic aggregates
-- (Redshift does not provide a CORR aggregate)
(COUNT(*) * SUM(u1.total_purchased::float * u2.total_purchased)
 - SUM(u1.total_purchased) * SUM(u2.total_purchased))
/ NULLIF(SQRT(COUNT(*) * SUM(u1.total_purchased::float * u1.total_purchased)
              - SUM(u1.total_purchased) * SUM(u1.total_purchased))
         * SQRT(COUNT(*) * SUM(u2.total_purchased::float * u2.total_purchased)
                - SUM(u2.total_purchased) * SUM(u2.total_purchased)), 0) AS similarity_score
FROM user_product_matrix u1
JOIN user_product_matrix u2 ON u1.product_id = u2.product_id
AND u1.customer_id != u2.customer_id
GROUP BY u1.customer_id, u2.customer_id
HAVING COUNT(*) >= 3
)
SELECT
sc.customer_a AS target_customer,
upm.product_id AS recommended_product,
AVG(sc.similarity_score) AS recommendation_score
FROM similar_customers sc
JOIN user_product_matrix upm ON sc.customer_b = upm.customer_id
LEFT JOIN user_product_matrix existing ON sc.customer_a = existing.customer_id
AND upm.product_id = existing.product_id
WHERE existing.product_id IS NULL -- only recommend products not yet purchased
AND sc.similarity_score > 0.7
GROUP BY sc.customer_a, upm.product_id
ORDER BY sc.customer_a, recommendation_score DESC;
-- Anomaly detection (flagging sudden revenue changes)
WITH daily_sales AS (
SELECT
sale_date,
SUM(total_amount) AS daily_revenue
FROM ecommerce.sales
WHERE sale_date >= CURRENT_DATE - 90
GROUP BY sale_date
),
sales_with_stats AS (
SELECT
sale_date,
daily_revenue,
AVG(daily_revenue) OVER (ORDER BY sale_date ROWS BETWEEN 13 PRECEDING AND 1 PRECEDING) AS avg_14day,
STDDEV(daily_revenue) OVER (ORDER BY sale_date ROWS BETWEEN 13 PRECEDING AND 1 PRECEDING) AS stddev_14day
FROM daily_sales
)
SELECT
sale_date,
daily_revenue,
avg_14day,
ROUND((daily_revenue - avg_14day) / stddev_14day, 2) AS z_score,
CASE
WHEN ABS((daily_revenue - avg_14day) / stddev_14day) > 2.5 THEN 'ANOMALY'
WHEN ABS((daily_revenue - avg_14day) / stddev_14day) > 1.5 THEN 'WARNING'
ELSE 'NORMAL'
END AS status
FROM sales_with_stats
WHERE stddev_14day > 0
AND sale_date >= CURRENT_DATE - 30
ORDER BY sale_date DESC;
System Administration and Monitoring
-- Check per-disk usage on the cluster (used/capacity are in 1 MB blocks)
SELECT
    owner AS node,
    diskno,
    used AS used_mb,
    capacity AS capacity_mb,
    used::float / capacity * 100 AS disk_usage_pct
FROM stv_partitions
ORDER BY disk_usage_pct DESC;
-- Monitor running queries (duration is reported in microseconds)
SELECT
    pid,
    userid,
    starttime,
    duration / 1000000 AS elapsed_seconds,
    SUBSTRING(query, 1, 100) AS query_text
FROM stv_recents
WHERE status = 'Running'
ORDER BY duration DESC;
-- Find long-running queries from the last hour
SELECT
    query,
    pid,
    userid,
    starttime,
    DATEDIFF(seconds, starttime, endtime) AS runtime_seconds,
    SUBSTRING(querytxt, 1, 200) AS query_text
FROM stl_query
WHERE starttime >= DATEADD(hour, -1, GETDATE())
  AND DATEDIFF(seconds, starttime, endtime) > 300
ORDER BY runtime_seconds DESC;
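A runaway query can be terminated with CANCEL, using a pid from the monitoring queries above (the pid here is a placeholder):
-- Terminate a query by process id
CANCEL 12345;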
-- Check table locks
SELECT
    table_id,
    last_update,
    lock_owner,
    lock_owner_pid,
    lock_status
FROM stv_locks
ORDER BY last_update;
-- Check data distribution across slices (per-slice row counts)
SELECT
    slice,
    SUM(rows) AS row_count
FROM stv_tbl_perm
WHERE TRIM(name) = 'sales'
GROUP BY slice
ORDER BY slice;
-- Check workload management (WLM) queue times
SELECT
    query,
    service_class,
    slot_count,
    total_queue_time / 1000000 AS queue_seconds,
    total_exec_time / 1000000 AS exec_seconds
FROM stl_wlm_query
WHERE service_class > 4
  AND queue_start_time >= DATEADD(hour, -1, GETDATE())
ORDER BY total_queue_time DESC;
-- Snapshots and backups are managed through the AWS API, not SQL:
-- aws redshift describe-cluster-snapshots \
--   --cluster-identifier my-redshift-cluster \
--   --max-records 20