Databases
Google BigQuery
Overview
Google BigQuery is a fully managed, serverless enterprise data warehouse from Google. Built on columnar storage and a distributed architecture, it runs very fast SQL analytics over petabyte-scale data. Machine learning (BigQuery ML), geospatial analysis, and streaming analytics are built in, and it is used by companies worldwide as a scalable data analytics platform.
Details
Google BigQuery is a cloud-native data warehouse service that Google released in 2012. Based on the Dremel engine, it combines columnar storage, massively parallel processing, and automatic scaling to enable analysis at a scale and speed that traditional on-premises data warehouses struggle to match. Because it is fully serverless, there is no infrastructure to manage, and billing is pay-as-you-go for what you actually use.
Key features of BigQuery (a minimal first query follows this list):
- Serverless and fully managed
- Columnar storage and distributed processing
- Standard SQL support
- BigQuery ML (integrated machine learning)
- Geospatial analysis (BigQuery GIS)
- Real-time streaming analytics
- Petabyte-scale scalability
- Data sharing and collaboration
- Security and compliance
- Google Cloud ecosystem integration
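As a first taste, here is a minimal sketch that queries a BigQuery public dataset; it assumes the google-cloud-bigquery client from the setup section below and Application Default Credentials for a project with billing enabled:
from google.cloud import bigquery
# Project and credentials are picked up from the environment
client = bigquery.Client()
# Top five first names in the public US Social Security dataset
query = """
SELECT name, SUM(number) AS total
FROM `bigquery-public-data.usa_names.usa_1910_2013`
GROUP BY name
ORDER BY total DESC
LIMIT 5
"""
for row in client.query(query).result():
    print(row.name, row.total)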
Pros and Cons
Pros
- Serverless: no infrastructure management, automatic scaling
- Very fast: columnar storage enables high-speed query execution
- Machine learning integration: create and run ML models in SQL
- Geospatial analysis: rich GIS functions for analyzing GEOGRAPHY data
- Cost efficiency: pay-as-you-go billing based on usage
- Standard SQL: existing SQL skills carry over directly
- Data sharing: secure data sharing through Analytics Hub
- Security: enterprise-grade security features
Cons
- Vendor lock-in: risk of dependence on Google Cloud
- Query cost: expenses grow when processing large volumes of data
- Latency: not suited to small-scale OLTP workloads
- Data migration: cost of migrating from existing systems
- Learning curve: BigQuery-specific features take time to master
Key Links
- Official site: https://cloud.google.com/bigquery
- Documentation: https://cloud.google.com/bigquery/docs
Usage Examples
Setup and Authentication
# Install the Google Cloud CLI
# Windows (PowerShell)
(New-Object Net.WebClient).DownloadFile("https://dl.google.com/dl/cloudsdk/channels/rapid/GoogleCloudSDKInstaller.exe", "$env:Temp\GoogleCloudSDKInstaller.exe")
& $env:Temp\GoogleCloudSDKInstaller.exe
# macOS
curl https://sdk.cloud.google.com | bash
exec -l $SHELL
# Ubuntu/Debian
echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -
sudo apt-get update && sudo apt-get install google-cloud-cli
# Configure authentication
gcloud auth login
gcloud config set project YOUR_PROJECT_ID
# Install the BigQuery Python client
pip install google-cloud-bigquery
pip install 'google-cloud-bigquery[pandas,pyarrow]' # with pandas integration
pip install 'google-cloud-bigquery[opentelemetry]' # with tracing support
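Outside gcloud-managed environments, the Python client typically authenticates through Application Default Credentials; the following standard gcloud/bq commands set that up and smoke-test access:
# Set up Application Default Credentials for client libraries
gcloud auth application-default login
# Verify access with the bq CLI (ships with the Cloud SDK)
bq query --use_legacy_sql=false 'SELECT 1 AS ok'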
Basic Operations (Dataset and Table Management)
from google.cloud import bigquery
import pandas as pd
from datetime import datetime
# Initialize the client
client = bigquery.Client(project='your-project-id')
# Create a dataset
dataset_id = f"{client.project}.analytics_demo"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset.description = "Demo dataset for analytics"
dataset = client.create_dataset(dataset, exists_ok=True)
print(f"Created dataset: {dataset.dataset_id}")
# Create a table
table_id = f"{dataset_id}.sales_data"
schema = [
bigquery.SchemaField("transaction_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("customer_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("product_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("category", "STRING", mode="REQUIRED"),
bigquery.SchemaField("amount", "FLOAT64", mode="REQUIRED"),
bigquery.SchemaField("quantity", "INTEGER", mode="REQUIRED"),
bigquery.SchemaField("region", "STRING", mode="NULLABLE"),
bigquery.SchemaField("channel", "STRING", mode="NULLABLE"),
]
table = bigquery.Table(table_id, schema=schema)
table.description = "Sales transaction data"
table = client.create_table(table, exists_ok=True)
print(f"Created table: {table.table_id}")
# Insert data (streaming insert)
rows_to_insert = [
{
"transaction_id": "txn_001",
"timestamp": datetime(2024, 1, 15, 10, 30),
"customer_id": "cust_1001",
"product_id": "prod_A",
"category": "Electronics",
"amount": 299.99,
"quantity": 1,
"region": "North America",
"channel": "Online"
},
{
"transaction_id": "txn_002",
"timestamp": datetime(2024, 1, 15, 11, 15),
"customer_id": "cust_1002",
"product_id": "prod_B",
"category": "Clothing",
"amount": 79.99,
"quantity": 2,
"region": "Europe",
"channel": "Store"
}
]
errors = client.insert_rows_json(table, rows_to_insert)
if not errors:
print("Data inserted successfully")
else:
print(f"Errors: {errors}")
# Create a time-partitioned table
partitioned_table_id = f"{dataset_id}.sales_partitioned"
partitioned_table = bigquery.Table(partitioned_table_id, schema=schema)
partitioned_table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="timestamp"
)
partitioned_table = client.create_table(partitioned_table, exists_ok=True)
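For bulk data, a load job is usually cheaper than streaming and is not subject to streaming quotas; a minimal sketch using load_table_from_dataframe (requires the pandas/pyarrow extras installed earlier):
import pandas as pd
# Batch-load a DataFrame instead of streaming individual rows
df_new = pd.DataFrame([{
    "transaction_id": "txn_003",
    "timestamp": pd.Timestamp("2024-01-16 09:00:00", tz="UTC"),
    "customer_id": "cust_1003",
    "product_id": "prod_C",
    "category": "Electronics",
    "amount": 149.50,
    "quantity": 1,
    "region": "Asia",
    "channel": "Online",
}])
load_job = client.load_table_from_dataframe(df_new, table_id)
load_job.result()  # wait for the load job to finish
print(f"Loaded {load_job.output_rows} rows into {table_id}")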
Running SQL Queries and Analysis
# Basic query execution
query = """
SELECT
category,
region,
COUNT(*) as transaction_count,
SUM(amount) as total_revenue,
AVG(amount) as avg_transaction_value,
SUM(quantity) as total_quantity
FROM `your-project-id.analytics_demo.sales_data`
WHERE timestamp >= '2024-01-01'
GROUP BY category, region
ORDER BY total_revenue DESC
"""
query_job = client.query(query)
results = query_job.result()
print("Sales Analysis Results:")
for row in results:
print(f"Category: {row.category}, Region: {row.region}")
print(f" Transactions: {row.transaction_count}")
print(f" Revenue: ${row.total_revenue:.2f}")
print(f" Avg Value: ${row.avg_transaction_value:.2f}")
print()
# Parameterized query
parameterized_query = """
SELECT
customer_id,
COUNT(*) as purchase_count,
SUM(amount) as total_spent,
MAX(timestamp) as last_purchase
FROM `your-project-id.analytics_demo.sales_data`
WHERE
-- DATE parameters must be converted explicitly before comparing with TIMESTAMP
timestamp >= TIMESTAMP(@start_date)
AND timestamp < TIMESTAMP(DATE_ADD(@end_date, INTERVAL 1 DAY))
AND amount >= @min_amount
GROUP BY customer_id
HAVING purchase_count >= @min_purchases
ORDER BY total_spent DESC
LIMIT @limit_rows
"""
job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("start_date", "DATE", "2024-01-01"),
bigquery.ScalarQueryParameter("end_date", "DATE", "2024-12-31"),
bigquery.ScalarQueryParameter("min_amount", "FLOAT64", 50.0),
bigquery.ScalarQueryParameter("min_purchases", "INT64", 2),
bigquery.ScalarQueryParameter("limit_rows", "INT64", 100),
]
)
query_job = client.query(parameterized_query, job_config=job_config)
results = query_job.result()
# Convert to a pandas DataFrame
df = query_job.to_dataframe()
print("Top customers DataFrame:")
print(df.head())
# Dry run (cost estimation)
dry_run_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
dry_run_job = client.query(query, job_config=dry_run_config)
print(f"Query will process {dry_run_job.total_bytes_processed} bytes")
print(f"Estimated cost: ${(dry_run_job.total_bytes_processed / 1024**4) * 5:.4f}")
BigQuery ML (Machine Learning)
-- Create a linear regression model
CREATE OR REPLACE MODEL `your-project-id.analytics_demo.sales_prediction_model`
OPTIONS(
model_type='LINEAR_REG',
input_label_cols=['amount'] -- auto_class_weights applies only to classification models and is not valid for LINEAR_REG
) AS
SELECT
quantity,
EXTRACT(HOUR FROM timestamp) as hour_of_day,
EXTRACT(DAYOFWEEK FROM timestamp) as day_of_week,
EXTRACT(MONTH FROM timestamp) as month,
CASE
WHEN category = 'Electronics' THEN 1
WHEN category = 'Clothing' THEN 2
ELSE 3
END as category_encoded,
CASE
WHEN channel = 'Online' THEN 1
ELSE 0
END as is_online,
amount
FROM `your-project-id.analytics_demo.sales_data`
WHERE timestamp >= '2024-01-01';
-- Evaluate the model
SELECT
mean_absolute_error,
mean_squared_error,
mean_squared_log_error,
median_absolute_error,
r2_score
FROM ML.EVALUATE(
MODEL `your-project-id.analytics_demo.sales_prediction_model`,
(
SELECT
quantity,
EXTRACT(HOUR FROM timestamp) as hour_of_day,
EXTRACT(DAYOFWEEK FROM timestamp) as day_of_week,
EXTRACT(MONTH FROM timestamp) as month,
CASE
WHEN category = 'Electronics' THEN 1
WHEN category = 'Clothing' THEN 2
ELSE 3
END as category_encoded,
CASE
WHEN channel = 'Online' THEN 1
ELSE 0
END as is_online,
amount
FROM `your-project-id.analytics_demo.sales_data`
WHERE timestamp >= '2024-06-01'
)
);
-- Run a prediction
SELECT
predicted_amount,
quantity,
hour_of_day,
category_encoded
FROM ML.PREDICT(
MODEL `your-project-id.analytics_demo.sales_prediction_model`,
(
SELECT
2 as quantity,
14 as hour_of_day,
3 as day_of_week,
6 as month,
1 as category_encoded,
1 as is_online
)
);
-- Clustering (k-means)
CREATE OR REPLACE MODEL `your-project-id.analytics_demo.customer_segmentation`
OPTIONS(
model_type='KMEANS',
num_clusters=4
) AS
-- customer_id is used only for grouping; selecting a unique ID would
-- otherwise feed a meaningless one-hot feature to the k-means model
SELECT
COUNT(*) as purchase_frequency,
AVG(amount) as avg_purchase_amount,
SUM(amount) as total_spent,
DATE_DIFF(CURRENT_DATE(), MAX(DATE(timestamp)), DAY) as days_since_last_purchase
FROM `your-project-id.analytics_demo.sales_data`
GROUP BY customer_id;
-- Inspect the cluster results
SELECT
CENTROID_ID,
COUNT(*) as customer_count,
AVG(purchase_frequency) as avg_frequency,
AVG(avg_purchase_amount) as avg_amount,
AVG(total_spent) as avg_total_spent,
AVG(days_since_last_purchase) as avg_days_since_last
FROM ML.PREDICT(
MODEL `your-project-id.analytics_demo.customer_segmentation`,
(
SELECT
customer_id,
COUNT(*) as purchase_frequency,
AVG(amount) as avg_purchase_amount,
SUM(amount) as total_spent,
DATE_DIFF(CURRENT_DATE(), MAX(DATE(timestamp)), DAY) as days_since_last_purchase
FROM `your-project-id.analytics_demo.sales_data`
GROUP BY customer_id
)
)
GROUP BY CENTROID_ID
ORDER BY CENTROID_ID;
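The BigQuery ML statements above are ordinary SQL, so they can also be driven from the Python client; a minimal sketch that pulls the regression metrics into pandas (model name as defined above):
# Fetch evaluation metrics for the trained model into a DataFrame
eval_sql = """
SELECT * FROM ML.EVALUATE(
MODEL `your-project-id.analytics_demo.sales_prediction_model`)
"""
eval_df = client.query(eval_sql).to_dataframe()
print(eval_df.transpose())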
Geospatial Analysis (BigQuery GIS)
# Create and query geospatial data
geo_query = """
-- Create store data
CREATE OR REPLACE TABLE `your-project-id.analytics_demo.stores` AS
SELECT
'store_001' as store_id,
'Downtown Store' as store_name,
ST_GEOGPOINT(-74.0059, 40.7128) as location, -- NYC
'New York' as city,
'NY' as state
UNION ALL
SELECT
'store_002' as store_id,
'Suburb Store' as store_name,
ST_GEOGPOINT(-118.2437, 34.0522) as location, -- LA
'Los Angeles' as city,
'CA' as state
UNION ALL
SELECT
'store_003' as store_id,
'Mall Store' as store_name,
ST_GEOGPOINT(-87.6298, 41.8781) as location, -- Chicago
'Chicago' as city,
'IL' as state;
-- Create customer location data
CREATE OR REPLACE TABLE `your-project-id.analytics_demo.customer_locations` AS
SELECT
'cust_1001' as customer_id,
ST_GEOGPOINT(-74.0059 + RAND() * 0.1 - 0.05, 40.7128 + RAND() * 0.1 - 0.05) as location
UNION ALL
SELECT
'cust_1002' as customer_id,
ST_GEOGPOINT(-118.2437 + RAND() * 0.1 - 0.05, 34.0522 + RAND() * 0.1 - 0.05) as location;
-- Geospatial analysis query
SELECT
s.store_id,
s.store_name,
s.city,
c.customer_id,
ST_DISTANCE(s.location, c.location) / 1000 as distance_km,
ST_AZIMUTH(s.location, c.location) as bearing_radians
FROM `your-project-id.analytics_demo.stores` s
CROSS JOIN `your-project-id.analytics_demo.customer_locations` c
WHERE ST_DWITHIN(s.location, c.location, 10000) -- within 10 km
ORDER BY s.store_id, distance_km;
-- Coverage-area analysis
WITH store_coverage AS (
SELECT
store_id,
store_name,
ST_BUFFER(location, 5000) as coverage_area -- 5 km radius
FROM `your-project-id.analytics_demo.stores`
)
SELECT
sc.store_id,
sc.store_name,
COUNT(c.customer_id) as customers_in_coverage,
ST_AREA(sc.coverage_area) / 1000000 as coverage_area_km2
FROM store_coverage sc
LEFT JOIN `your-project-id.analytics_demo.customer_locations` c
ON ST_CONTAINS(sc.coverage_area, c.location)
GROUP BY sc.store_id, sc.store_name, sc.coverage_area
ORDER BY customers_in_coverage DESC;
-- Geospatial aggregation (example using a public dataset)
SELECT
county_name,
state_name,
ST_AREA(county_geom) / 1000000 as area_km2,
ST_PERIMETER(county_geom) / 1000 as perimeter_km,
ST_CENTROID(county_geom) as center_point
FROM `bigquery-public-data.geo_us_boundaries.counties`
WHERE state_name = 'California'
ORDER BY area_km2 DESC
LIMIT 10;
"""
client.query(geo_query).result()
print("Geospatial analysis tables created successfully")
Advanced Analytics Features
# Window functions and CTEs (Common Table Expressions)
advanced_analytics_query = """
WITH daily_sales AS (
SELECT
DATE(timestamp) as sale_date,
category,
SUM(amount) as daily_revenue,
COUNT(*) as daily_transactions
FROM `your-project-id.analytics_demo.sales_data`
GROUP BY DATE(timestamp), category
),
sales_with_trends AS (
SELECT
sale_date,
category,
daily_revenue,
daily_transactions,
-- 7-day moving average
AVG(daily_revenue) OVER (
PARTITION BY category
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as moving_avg_7d,
-- Day-over-day comparison
LAG(daily_revenue) OVER (
PARTITION BY category
ORDER BY sale_date
) as prev_day_revenue,
-- Ranking
ROW_NUMBER() OVER (
PARTITION BY category
ORDER BY daily_revenue DESC
) as revenue_rank,
-- Cumulative revenue
SUM(daily_revenue) OVER (
PARTITION BY category
ORDER BY sale_date
ROWS UNBOUNDED PRECEDING
) as cumulative_revenue
FROM daily_sales
)
SELECT
sale_date,
category,
daily_revenue,
moving_avg_7d,
ROUND(SAFE_DIVIDE(daily_revenue - prev_day_revenue, prev_day_revenue) * 100, 2) as pct_change_from_prev_day, -- SAFE_DIVIDE returns NULL instead of failing on zero
revenue_rank,
cumulative_revenue,
CASE
WHEN daily_revenue > moving_avg_7d * 1.2 THEN 'High Performance'
WHEN daily_revenue < moving_avg_7d * 0.8 THEN 'Low Performance'
ELSE 'Normal'
END as performance_category
FROM sales_with_trends
WHERE sale_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
ORDER BY category, sale_date DESC;
"""
results = client.query(advanced_analytics_query).result()
print("Advanced analytics completed")
# ARRAY/STRUCT operations
array_struct_query = """
-- Working with nested data structures
WITH customer_orders AS (
SELECT
customer_id,
ARRAY_AGG(
STRUCT(
transaction_id,
timestamp,
amount,
product_id,
category
) ORDER BY timestamp
) as orders
FROM `your-project-id.analytics_demo.sales_data`
GROUP BY customer_id
)
SELECT
customer_id,
ARRAY_LENGTH(orders) as total_orders,
orders[OFFSET(0)].timestamp as first_order_date,
orders[OFFSET(ARRAY_LENGTH(orders)-1)].timestamp as last_order_date,
(
SELECT AVG(o.amount)
FROM UNNEST(orders) as o -- ORDER is a reserved keyword in GoogleSQL, so the alias is o
) as avg_order_value,
(
SELECT STRING_AGG(DISTINCT o.category)
FROM UNNEST(orders) as o
) as categories_purchased
FROM customer_orders
WHERE ARRAY_LENGTH(orders) > 1
ORDER BY total_orders DESC;
"""
# JSON operations
json_query = """
-- Processing JSON data
SELECT
customer_id,
JSON_EXTRACT_SCALAR(
TO_JSON_STRING(
STRUCT(
COUNT(*) as total_purchases,
SUM(amount) as total_spent,
ARRAY_AGG(category IGNORE NULLS) as categories
)
),
'$.total_purchases'
) as purchases_json,
JSON_EXTRACT_ARRAY(
TO_JSON_STRING(
STRUCT(
ARRAY_AGG(DISTINCT category IGNORE NULLS) as unique_categories
)
),
'$.unique_categories'
) as categories_array
FROM `your-project-id.analytics_demo.sales_data`
GROUP BY customer_id;
"""
Performance Optimization
# Partitioning and clustering
optimization_query = """
-- Create a partitioned and clustered table
CREATE OR REPLACE TABLE `your-project-id.analytics_demo.sales_optimized`
PARTITION BY DATE(timestamp)
CLUSTER BY customer_id, category
AS
SELECT * FROM `your-project-id.analytics_demo.sales_data`;
-- Efficient query (benefits from partitioning and clustering)
SELECT
category,
COUNT(*) as transactions,
SUM(amount) as revenue
FROM `your-project-id.analytics_demo.sales_optimized`
WHERE DATE(timestamp) BETWEEN '2024-01-01' AND '2024-01-31' -- prunes partitions
AND customer_id LIKE 'cust_10%' -- benefits from clustering
GROUP BY category;
-- Create a materialized view
CREATE MATERIALIZED VIEW `your-project-id.analytics_demo.daily_sales_summary`
PARTITION BY sale_date
CLUSTER BY category
AS
SELECT
DATE(timestamp) as sale_date,
category,
region,
COUNT(*) as transaction_count,
SUM(amount) as total_revenue,
AVG(amount) as avg_transaction_value
FROM `your-project-id.analytics_demo.sales_data`
GROUP BY DATE(timestamp), category, region;
-- Analyze recent query performance
SELECT
job_id,
creation_time,
start_time,
end_time,
TIMESTAMP_DIFF(end_time, start_time, MILLISECOND) as duration_ms,
total_bytes_processed,
total_bytes_billed,
ROUND(total_bytes_billed / POW(1024, 3), 2) as gb_billed
FROM `your-project-id.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
AND job_type = 'QUERY'
AND state = 'DONE'
ORDER BY total_bytes_processed DESC
LIMIT 10;
"""
client.query(optimization_query).result()
print("Performance optimization queries executed")
Practical Examples (Dashboards and Reports)
# Business intelligence report
def generate_sales_dashboard():
    # KPI calculation
    kpi_query = """
WITH date_ranges AS (
SELECT
CURRENT_DATE() as today,
DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) as yesterday,
DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) as week_ago,
DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY) as month_ago
),
current_metrics AS (
SELECT
'Today' as period,
COUNT(*) as transactions,
SUM(amount) as revenue,
COUNT(DISTINCT customer_id) as unique_customers
FROM `your-project-id.analytics_demo.sales_data`, date_ranges
WHERE DATE(timestamp) = today
UNION ALL
SELECT
'Yesterday' as period,
COUNT(*) as transactions,
SUM(amount) as revenue,
COUNT(DISTINCT customer_id) as unique_customers
FROM `your-project-id.analytics_demo.sales_data`, date_ranges
WHERE DATE(timestamp) = yesterday
UNION ALL
SELECT
'Last 7 Days' as period,
COUNT(*) as transactions,
SUM(amount) as revenue,
COUNT(DISTINCT customer_id) as unique_customers
FROM `your-project-id.analytics_demo.sales_data`, date_ranges
WHERE DATE(timestamp) >= week_ago
UNION ALL
SELECT
'Last 30 Days' as period,
COUNT(*) as transactions,
SUM(amount) as revenue,
COUNT(DISTINCT customer_id) as unique_customers
FROM `your-project-id.analytics_demo.sales_data`, date_ranges
WHERE DATE(timestamp) >= month_ago
)
SELECT
period,
transactions,
ROUND(revenue, 2) as revenue,
unique_customers,
ROUND(revenue / transactions, 2) as avg_transaction_value,
ROUND(revenue / unique_customers, 2) as revenue_per_customer
FROM current_metrics
ORDER BY
CASE period
WHEN 'Today' THEN 1
WHEN 'Yesterday' THEN 2
WHEN 'Last 7 Days' THEN 3
WHEN 'Last 30 Days' THEN 4
END;
"""
    # Run once and convert straight to a DataFrame
    kpi_df = client.query(kpi_query).to_dataframe()
    print("=== Sales Dashboard KPIs ===")
    print(kpi_df.to_string(index=False))
    # Trend analysis
    trend_query = """
SELECT
DATE(timestamp) as date,
SUM(amount) as daily_revenue,
COUNT(*) as daily_transactions,
COUNT(DISTINCT customer_id) as daily_customers,
SUM(amount) / COUNT(*) as avg_transaction_value
FROM `your-project-id.analytics_demo.sales_data`
WHERE DATE(timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
GROUP BY DATE(timestamp)
ORDER BY date;
"""
    trend_df = client.query(trend_query).to_dataframe()
    print("\n=== Daily Trends (Last 30 Days) ===")
    print(trend_df.tail(10).to_string(index=False))
    return kpi_df, trend_df
# Generate the dashboard
dashboard_data = generate_sales_dashboard()
# Export data to Cloud Storage
def export_to_cloud_storage():
    extract_job = client.extract_table(
        "your-project-id.analytics_demo.daily_sales_summary",
        "gs://your-bucket-name/exports/daily_sales_*.csv",
        job_config=bigquery.ExtractJobConfig(
            destination_format=bigquery.DestinationFormat.CSV,
            field_delimiter=",",
            print_header=True,
        ),
    )
    extract_job.result()
    print("Data exported to Cloud Storage")
# Near-real-time streaming analysis example (snapshot of the last hour)
streaming_query = """
CREATE OR REPLACE TABLE `your-project-id.analytics_demo.real_time_metrics`
AS
SELECT
CURRENT_TIMESTAMP() as update_time,
'sales_summary' as metric_type,
STRUCT(
COUNT(*) as total_transactions,
SUM(amount) as total_revenue,
AVG(amount) as avg_transaction_value,
COUNT(DISTINCT customer_id) as unique_customers
) as metrics
FROM `your-project-id.analytics_demo.sales_data`
WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR);
"""
print("BigQuery comprehensive analysis completed!")