Database

Google BigQuery

Overview

Google BigQuery is a fully managed, serverless enterprise data warehouse offered by Google. Built on columnar storage and a distributed architecture, it delivers fast SQL analytics over petabyte-scale data. Machine learning (BigQuery ML), geospatial analysis, and streaming analytics are built in, and companies worldwide use it as a scalable data-analytics platform.

Details

Google BigQuery is a cloud-native data warehouse service that Google made generally available in 2011. Built on the Dremel engine, it combines columnar storage, massively parallel processing, and automatic scaling to analyze data at a scale and speed that traditional on-premises data warehouses struggle to match. Because it is fully serverless, there is no infrastructure to manage, and on-demand pricing charges only for what you use.
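
Because there is no cluster to provision, a first query can run minutes after installing the client library. A minimal sketch against a public dataset (assuming a project with the BigQuery API enabled and default credentials configured; setup is covered below):

from google.cloud import bigquery

# Picks up the project and credentials from the environment
client = bigquery.Client()

# You are billed only for the bytes this query scans
query = """
SELECT name, SUM(number) AS total
FROM `bigquery-public-data.usa_names.usa_1910_2013`
WHERE state = 'TX'
GROUP BY name
ORDER BY total DESC
LIMIT 5
"""
for row in client.query(query).result():
    print(row.name, row.total)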

Key features of BigQuery:

  • Serverless and fully managed
  • Columnar storage and distributed processing
  • Standard SQL support
  • BigQuery ML (integrated machine learning)
  • Geospatial analysis (BigQuery GIS)
  • Real-time streaming analytics
  • Petabyte-scale scalability
  • Data sharing and collaboration
  • Security and compliance
  • Google Cloud ecosystem integration

Pros and Cons

Pros

  • Serverless: no infrastructure to manage, automatic scaling
  • Fast: columnar storage enables high-speed query execution
  • ML integration: build and run ML models in SQL
  • Geospatial analysis: rich GIS functions for GEOGRAPHY data
  • Cost efficiency: pay-as-you-go, on-demand pricing
  • Standard SQL: existing SQL skills carry over directly
  • Data sharing: secure sharing via Analytics Hub
  • Security: enterprise-grade security features

Cons

  • Vendor lock-in: risk of dependence on Google Cloud
  • Query cost: charges grow with the volume of data scanned (see the guardrail sketch after this list)
  • Latency: ill-suited to small OLTP-style workloads
  • Data migration: cost of moving off existing systems
  • Learning curve: BigQuery-specific features take time to master
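
The query-cost risk can be capped per job: setting maximum_bytes_billed makes BigQuery fail a query instead of billing past a limit. A minimal sketch (the cap value is arbitrary):

from google.cloud import bigquery
from google.api_core.exceptions import BadRequest

client = bigquery.Client()

# Jobs that would bill more than ~1 GiB fail instead of running
job_config = bigquery.QueryJobConfig(maximum_bytes_billed=1024**3)

try:
    client.query(
        "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013`",
        job_config=job_config,
    ).result()
except BadRequest as e:
    # The job fails with a bytesBilledLimitExceeded reason when the cap is hit
    print(f"Query rejected: {e}")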

Code Examples

Setup and Authentication

# Install the Google Cloud CLI
# Windows (PowerShell)
(New-Object Net.WebClient).DownloadFile("https://dl.google.com/dl/cloudsdk/channels/rapid/GoogleCloudSDKInstaller.exe", "$env:Temp\GoogleCloudSDKInstaller.exe")
& $env:Temp\GoogleCloudSDKInstaller.exe

# macOS
curl https://sdk.cloud.google.com | bash
exec -l $SHELL

# Ubuntu/Debian
echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list
curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key --keyring /usr/share/keyrings/cloud.google.gpg add -
sudo apt-get update && sudo apt-get install google-cloud-cli

# Configure authentication
gcloud auth login
gcloud config set project YOUR_PROJECT_ID

# Install the BigQuery Python client
pip install google-cloud-bigquery
pip install 'google-cloud-bigquery[pandas,pyarrow]'  # with pandas integration
pip install 'google-cloud-bigquery[opentelemetry]'   # with tracing support
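
For servers and CI, interactive gcloud auth login is impractical; one option is a service-account key passed directly to the client (the key path and project ID below are placeholders):

from google.cloud import bigquery

# Authenticate as a service account instead of a logged-in user
client = bigquery.Client.from_service_account_json(
    "/path/to/service-account-key.json",
    project="your-project-id",
)
print(client.project)  # verify the client is wired to the right project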

Basic Operations (Dataset and Table Management)

from google.cloud import bigquery

# Initialize the client
client = bigquery.Client(project='your-project-id')

# Create a dataset
dataset_id = f"{client.project}.analytics_demo"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset.description = "Demo dataset for analytics"
dataset = client.create_dataset(dataset, exists_ok=True)
print(f"Created dataset: {dataset.dataset_id}")

# Create a table
table_id = f"{dataset_id}.sales_data"
schema = [
    bigquery.SchemaField("transaction_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("customer_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("product_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("category", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("amount", "FLOAT64", mode="REQUIRED"),
    bigquery.SchemaField("quantity", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("region", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("channel", "STRING", mode="NULLABLE"),
]

table = bigquery.Table(table_id, schema=schema)
table.description = "Sales transaction data"
table = client.create_table(table, exists_ok=True)
print(f"Created table: {table.table_id}")

# Insert rows via the streaming API
# (insert_rows_json requires JSON-serializable values, so timestamps are ISO 8601 strings)
rows_to_insert = [
    {
        "transaction_id": "txn_001",
        "timestamp": "2024-01-15T10:30:00Z",
        "customer_id": "cust_1001",
        "product_id": "prod_A",
        "category": "Electronics",
        "amount": 299.99,
        "quantity": 1,
        "region": "North America",
        "channel": "Online"
    },
    {
        "transaction_id": "txn_002",
        "timestamp": datetime(2024, 1, 15, 11, 15),
        "customer_id": "cust_1002",
        "product_id": "prod_B",
        "category": "Clothing",
        "amount": 79.99,
        "quantity": 2,
        "region": "Europe",
        "channel": "Store"
    }
]

errors = client.insert_rows_json(table, rows_to_insert)
if not errors:
    print("Data inserted successfully")
else:
    print(f"Errors: {errors}")

# Create a partitioned table
partitioned_table_id = f"{dataset_id}.sales_partitioned"
partitioned_table = bigquery.Table(partitioned_table_id, schema=schema)
partitioned_table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="timestamp"
)
partitioned_table = client.create_table(partitioned_table, exists_ok=True)
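
Streaming inserts suit trickles of rows; for bulk data a load job is usually better (batch loading itself is free of charge). A sketch loading a pandas DataFrame into the table created above, assuming the pandas/pyarrow extras are installed:

import pandas as pd

df = pd.DataFrame({
    "transaction_id": ["txn_003", "txn_004"],
    "timestamp": pd.to_datetime(["2024-01-16 09:00", "2024-01-16 09:30"], utc=True),
    "customer_id": ["cust_1003", "cust_1004"],
    "product_id": ["prod_C", "prod_A"],
    "category": ["Home", "Electronics"],
    "amount": [45.50, 299.99],
    "quantity": [1, 1],
    "region": ["Asia", "Europe"],
    "channel": ["Online", "Online"],
})

# A load job batches the rows in, unlike the row-by-row streaming API
load_job = client.load_table_from_dataframe(df, table_id)
load_job.result()  # wait for the job to finish
print(f"Loaded {load_job.output_rows} rows into {table_id}")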

Running SQL Queries and Analysis

# Run a basic query
query = """
SELECT 
    category,
    region,
    COUNT(*) as transaction_count,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_transaction_value,
    SUM(quantity) as total_quantity
FROM `your-project-id.analytics_demo.sales_data`
WHERE timestamp >= '2024-01-01'
GROUP BY category, region
ORDER BY total_revenue DESC
"""

query_job = client.query(query)
results = query_job.result()

print("Sales Analysis Results:")
for row in results:
    print(f"Category: {row.category}, Region: {row.region}")
    print(f"  Transactions: {row.transaction_count}")
    print(f"  Revenue: ${row.total_revenue:.2f}")
    print(f"  Avg Value: ${row.avg_transaction_value:.2f}")
    print()

# Parameterized query
parameterized_query = """
SELECT 
    customer_id,
    COUNT(*) as purchase_count,
    SUM(amount) as total_spent,
    MAX(timestamp) as last_purchase
FROM `your-project-id.analytics_demo.sales_data`
WHERE 
    timestamp >= @start_date 
    AND timestamp <= @end_date
    AND amount >= @min_amount
GROUP BY customer_id
HAVING purchase_count >= @min_purchases
ORDER BY total_spent DESC
LIMIT @limit_rows
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("start_date", "DATE", "2024-01-01"),
        bigquery.ScalarQueryParameter("end_date", "DATE", "2024-12-31"),
        bigquery.ScalarQueryParameter("min_amount", "FLOAT64", 50.0),
        bigquery.ScalarQueryParameter("min_purchases", "INT64", 2),
        bigquery.ScalarQueryParameter("limit_rows", "INT64", 100),
    ]
)

query_job = client.query(parameterized_query, job_config=job_config)
results = query_job.result()

# Convert to a pandas DataFrame
df = query_job.to_dataframe()
print("Top customers DataFrame:")
print(df.head())

# Dry run (cost estimate)
dry_run_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
dry_run_job = client.query(query, job_config=dry_run_config)
print(f"Query will process {dry_run_job.total_bytes_processed} bytes")
print(f"Estimated cost: ${(dry_run_job.total_bytes_processed / 1024**4) * 5:.4f}")

BigQuery ML (Machine Learning)

-- Create a linear regression model
-- (auto_class_weights applies only to classification models, so it is not set here)
CREATE OR REPLACE MODEL `your-project-id.analytics_demo.sales_prediction_model`
OPTIONS(
  model_type='LINEAR_REG',
  input_label_cols=['amount']
) AS
SELECT 
  quantity,
  EXTRACT(HOUR FROM timestamp) as hour_of_day,
  EXTRACT(DAYOFWEEK FROM timestamp) as day_of_week,
  EXTRACT(MONTH FROM timestamp) as month,
  CASE 
    WHEN category = 'Electronics' THEN 1 
    WHEN category = 'Clothing' THEN 2 
    ELSE 3 
  END as category_encoded,
  CASE 
    WHEN channel = 'Online' THEN 1 
    ELSE 0 
  END as is_online,
  amount
FROM `your-project-id.analytics_demo.sales_data`
WHERE timestamp >= '2024-01-01';

-- Evaluate the model
SELECT 
  mean_absolute_error,
  mean_squared_error,
  mean_squared_log_error,
  median_absolute_error,
  r2_score
FROM ML.EVALUATE(
  MODEL `your-project-id.analytics_demo.sales_prediction_model`,
  (
    SELECT 
      quantity,
      EXTRACT(HOUR FROM timestamp) as hour_of_day,
      EXTRACT(DAYOFWEEK FROM timestamp) as day_of_week,
      EXTRACT(MONTH FROM timestamp) as month,
      CASE 
        WHEN category = 'Electronics' THEN 1 
        WHEN category = 'Clothing' THEN 2 
        ELSE 3 
      END as category_encoded,
      CASE 
        WHEN channel = 'Online' THEN 1 
        ELSE 0 
      END as is_online,
      amount
    FROM `your-project-id.analytics_demo.sales_data`
    WHERE timestamp >= '2024-06-01'
  )
);

-- Run a prediction
SELECT 
  predicted_amount,
  quantity,
  hour_of_day,
  category_encoded
FROM ML.PREDICT(
  MODEL `your-project-id.analytics_demo.sales_prediction_model`,
  (
    SELECT 
      2 as quantity,
      14 as hour_of_day,
      3 as day_of_week,
      6 as month,
      1 as category_encoded,
      1 as is_online
  )
);

-- Clustering (k-means)
CREATE OR REPLACE MODEL `your-project-id.analytics_demo.customer_segmentation`
OPTIONS(
  model_type='KMEANS',
  num_clusters=4
) AS
SELECT 
  -- customer_id is deliberately not selected: an ID column would otherwise
  -- be one-hot encoded and treated as a feature
  COUNT(*) as purchase_frequency,
  AVG(amount) as avg_purchase_amount,
  SUM(amount) as total_spent,
  DATE_DIFF(CURRENT_DATE(), MAX(DATE(timestamp)), DAY) as days_since_last_purchase
FROM `your-project-id.analytics_demo.sales_data`
GROUP BY customer_id;

-- Inspect the clusters
SELECT 
  CENTROID_ID,
  COUNT(*) as customer_count,
  AVG(purchase_frequency) as avg_frequency,
  AVG(avg_purchase_amount) as avg_amount,
  AVG(total_spent) as avg_total_spent,
  AVG(days_since_last_purchase) as avg_days_since_last
FROM ML.PREDICT(
  MODEL `your-project-id.analytics_demo.customer_segmentation`,
  (
    SELECT 
      customer_id,
      COUNT(*) as purchase_frequency,
      AVG(amount) as avg_purchase_amount,
      SUM(amount) as total_spent,
      DATE_DIFF(CURRENT_DATE(), MAX(DATE(timestamp)), DAY) as days_since_last_purchase
    FROM `your-project-id.analytics_demo.sales_data`
    GROUP BY customer_id
  )
)
GROUP BY CENTROID_ID
ORDER BY CENTROID_ID;
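
BigQuery ML statements are ordinary SQL, so they can be driven from Python like any other query; training is a long-running job that result() waits on. A trimmed sketch using the client initialized earlier:

# CREATE MODEL runs as a query job; result() blocks until training completes
training_sql = """
CREATE OR REPLACE MODEL `your-project-id.analytics_demo.sales_prediction_model`
OPTIONS(model_type='LINEAR_REG', input_label_cols=['amount']) AS
SELECT quantity, EXTRACT(HOUR FROM timestamp) AS hour_of_day, amount
FROM `your-project-id.analytics_demo.sales_data`
"""
training_job = client.query(training_sql)
training_job.result()
print(f"Model training finished: {training_job.job_id}")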

Geospatial Analysis (BigQuery GIS)

# Create and query geospatial data
geo_query = """
-- Create store data
CREATE OR REPLACE TABLE `your-project-id.analytics_demo.stores` AS
SELECT 
  'store_001' as store_id,
  'Downtown Store' as store_name,
  ST_GEOGPOINT(-74.0059, 40.7128) as location, -- NYC
  'New York' as city,
  'NY' as state
UNION ALL
SELECT 
  'store_002' as store_id,
  'Suburb Store' as store_name,
  ST_GEOGPOINT(-118.2437, 34.0522) as location, -- LA
  'Los Angeles' as city,
  'CA' as state
UNION ALL
SELECT 
  'store_003' as store_id,
  'Mall Store' as store_name,
  ST_GEOGPOINT(-87.6298, 41.8781) as location, -- Chicago
  'Chicago' as city,
  'IL' as state;

-- Create customer location data
CREATE OR REPLACE TABLE `your-project-id.analytics_demo.customer_locations` AS
SELECT 
  'cust_1001' as customer_id,
  ST_GEOGPOINT(-74.0059 + RAND() * 0.1 - 0.05, 40.7128 + RAND() * 0.1 - 0.05) as location
UNION ALL
SELECT 
  'cust_1002' as customer_id,
  ST_GEOGPOINT(-118.2437 + RAND() * 0.1 - 0.05, 34.0522 + RAND() * 0.1 - 0.05) as location;

-- Geospatial analysis query
SELECT 
  s.store_id,
  s.store_name,
  s.city,
  c.customer_id,
  ST_DISTANCE(s.location, c.location) / 1000 as distance_km,
  ST_AZIMUTH(s.location, c.location) as bearing_radians
FROM `your-project-id.analytics_demo.stores` s
CROSS JOIN `your-project-id.analytics_demo.customer_locations` c
WHERE ST_DWITHIN(s.location, c.location, 10000) -- within 10 km
ORDER BY s.store_id, distance_km;

-- Coverage-area analysis
WITH store_coverage AS (
  SELECT 
    store_id,
    store_name,
    ST_BUFFER(location, 5000) as coverage_area -- 5 km radius
  FROM `your-project-id.analytics_demo.stores`
)
SELECT 
  sc.store_id,
  sc.store_name,
  COUNT(c.customer_id) as customers_in_coverage,
  ST_AREA(sc.coverage_area) / 1000000 as coverage_area_km2
FROM store_coverage sc
LEFT JOIN `your-project-id.analytics_demo.customer_locations` c
  ON ST_CONTAINS(sc.coverage_area, c.location)
GROUP BY sc.store_id, sc.store_name, sc.coverage_area
ORDER BY customers_in_coverage DESC;

-- Geospatial aggregation (example using a public dataset)
SELECT 
  county_name,
  state_name,
  ST_AREA(county_geom) / 1000000 as area_km2,
  ST_PERIMETER(county_geom) / 1000 as perimeter_km,
  ST_CENTROID(county_geom) as center_point
FROM `bigquery-public-data.geo_us_boundaries.counties`
WHERE state_name = 'California'
ORDER BY area_km2 DESC
LIMIT 10;
"""

client.query(geo_query).result()
print("Geospatial analysis tables created successfully")

Advanced Analytics Features

# Window functions and CTEs (Common Table Expressions)
advanced_analytics_query = """
WITH daily_sales AS (
  SELECT 
    DATE(timestamp) as sale_date,
    category,
    SUM(amount) as daily_revenue,
    COUNT(*) as daily_transactions
  FROM `your-project-id.analytics_demo.sales_data`
  GROUP BY DATE(timestamp), category
),
sales_with_trends AS (
  SELECT 
    sale_date,
    category,
    daily_revenue,
    daily_transactions,
    -- 7-day moving average
    AVG(daily_revenue) OVER (
      PARTITION BY category 
      ORDER BY sale_date 
      ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as moving_avg_7d,
    -- Day-over-day comparison
    LAG(daily_revenue) OVER (
      PARTITION BY category 
      ORDER BY sale_date
    ) as prev_day_revenue,
    -- Ranking
    ROW_NUMBER() OVER (
      PARTITION BY category 
      ORDER BY daily_revenue DESC
    ) as revenue_rank,
    -- Cumulative revenue
    SUM(daily_revenue) OVER (
      PARTITION BY category 
      ORDER BY sale_date 
      ROWS UNBOUNDED PRECEDING
    ) as cumulative_revenue
  FROM daily_sales
)
SELECT 
  sale_date,
  category,
  daily_revenue,
  moving_avg_7d,
  ROUND(SAFE_DIVIDE(daily_revenue - prev_day_revenue, prev_day_revenue) * 100, 2) as pct_change_from_prev_day,
  revenue_rank,
  cumulative_revenue,
  CASE 
    WHEN daily_revenue > moving_avg_7d * 1.2 THEN 'High Performance'
    WHEN daily_revenue < moving_avg_7d * 0.8 THEN 'Low Performance'
    ELSE 'Normal'
  END as performance_category
FROM sales_with_trends
WHERE sale_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
ORDER BY category, sale_date DESC;
"""

results = client.query(advanced_analytics_query).result()
print("Advanced analytics completed")

# ARRAY/STRUCT operations
array_struct_query = """
-- Working with nested data structures
WITH customer_orders AS (
  SELECT 
    customer_id,
    ARRAY_AGG(
      STRUCT(
        transaction_id,
        timestamp,
        amount,
        product_id,
        category
      ) ORDER BY timestamp
    ) as orders
  FROM `your-project-id.analytics_demo.sales_data`
  GROUP BY customer_id
)
SELECT 
  customer_id,
  ARRAY_LENGTH(orders) as total_orders,
  orders[OFFSET(0)].timestamp as first_order_date,
  orders[OFFSET(ARRAY_LENGTH(orders)-1)].timestamp as last_order_date,
  (
    SELECT AVG(o.amount) 
    FROM UNNEST(orders) as o  -- ORDER is a reserved word, so use a different alias
  ) as avg_order_value,
  (
    SELECT STRING_AGG(DISTINCT o.category) 
    FROM UNNEST(orders) as o
  ) as categories_purchased
FROM customer_orders
WHERE ARRAY_LENGTH(orders) > 1
ORDER BY total_orders DESC;
"""

# JSON operations
json_query = """
-- Processing JSON data
SELECT 
  customer_id,
  JSON_EXTRACT_SCALAR(
    TO_JSON_STRING(
      STRUCT(
        COUNT(*) as total_purchases,
        SUM(amount) as total_spent,
        ARRAY_AGG(category IGNORE NULLS) as categories
      )
    ),
    '$.total_purchases'
  ) as purchases_json,
  JSON_EXTRACT_ARRAY(
    TO_JSON_STRING(
      STRUCT(
        ARRAY_AGG(DISTINCT category IGNORE NULLS) as unique_categories
      )
    ),
    '$.unique_categories'
  ) as categories_array
FROM `your-project-id.analytics_demo.sales_data`
GROUP BY customer_id;
"""

Performance Optimization

# Partitioning and clustering
optimization_query = """
-- Create a partitioned, clustered table
CREATE OR REPLACE TABLE `your-project-id.analytics_demo.sales_optimized`
PARTITION BY DATE(timestamp)
CLUSTER BY customer_id, category
AS
SELECT * FROM `your-project-id.analytics_demo.sales_data`;

-- Efficient query (partition-aware)
SELECT 
  category,
  COUNT(*) as transactions,
  SUM(amount) as revenue
FROM `your-project-id.analytics_demo.sales_optimized`
WHERE DATE(timestamp) BETWEEN '2024-01-01' AND '2024-01-31'  -- prunes partitions
  AND customer_id LIKE 'cust_10%'  -- benefits from clustering
GROUP BY category;

-- Create a materialized view
-- (a partitioned materialized view requires a base table partitioned the same
--  way, so it is built on sales_optimized, not the unpartitioned sales_data)
CREATE MATERIALIZED VIEW `your-project-id.analytics_demo.daily_sales_summary`
PARTITION BY sale_date
CLUSTER BY category
AS
SELECT 
  DATE(timestamp) as sale_date,
  category,
  region,
  COUNT(*) as transaction_count,
  SUM(amount) as total_revenue,
  AVG(amount) as avg_transaction_value
FROM `your-project-id.analytics_demo.sales_optimized`
GROUP BY DATE(timestamp), category, region;

-- Performance analysis (recent job statistics)
SELECT 
  job_id,
  creation_time,
  start_time,
  end_time,
  TIMESTAMP_DIFF(end_time, start_time, MILLISECOND) as duration_ms,
  total_bytes_processed,
  total_bytes_billed,
  ROUND(total_bytes_billed / POW(1024, 3), 2) as gb_billed
FROM `your-project-id.region-us.INFORMATION_SCHEMA.JOBS_BY_PROJECT`
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  AND job_type = 'QUERY'
  AND state = 'DONE'
ORDER BY total_bytes_processed DESC
LIMIT 10;
"""

client.query(optimization_query).result()
print("Performance optimization queries executed")

Practical Examples (Dashboards and Reports)

# Business-intelligence report
def generate_sales_dashboard():
    # Compute the KPIs
    kpi_query = """
    WITH date_ranges AS (
      SELECT 
        CURRENT_DATE() as today,
        DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY) as yesterday,
        DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) as week_ago,
        DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY) as month_ago
    ),
    current_metrics AS (
      SELECT 
        'Today' as period,
        COUNT(*) as transactions,
        SUM(amount) as revenue,
        COUNT(DISTINCT customer_id) as unique_customers
      FROM `your-project-id.analytics_demo.sales_data`, date_ranges
      WHERE DATE(timestamp) = today
      
      UNION ALL
      
      SELECT 
        'Yesterday' as period,
        COUNT(*) as transactions,
        SUM(amount) as revenue,
        COUNT(DISTINCT customer_id) as unique_customers
      FROM `your-project-id.analytics_demo.sales_data`, date_ranges
      WHERE DATE(timestamp) = yesterday
      
      UNION ALL
      
      SELECT 
        'Last 7 Days' as period,
        COUNT(*) as transactions,
        SUM(amount) as revenue,
        COUNT(DISTINCT customer_id) as unique_customers
      FROM `your-project-id.analytics_demo.sales_data`, date_ranges
      WHERE DATE(timestamp) >= week_ago
      
      UNION ALL
      
      SELECT 
        'Last 30 Days' as period,
        COUNT(*) as transactions,
        SUM(amount) as revenue,
        COUNT(DISTINCT customer_id) as unique_customers
      FROM `your-project-id.analytics_demo.sales_data`, date_ranges
      WHERE DATE(timestamp) >= month_ago
    )
    SELECT 
      period,
      transactions,
      ROUND(revenue, 2) as revenue,
      unique_customers,
      ROUND(SAFE_DIVIDE(revenue, transactions), 2) as avg_transaction_value,
      ROUND(SAFE_DIVIDE(revenue, unique_customers), 2) as revenue_per_customer
    FROM current_metrics
    ORDER BY 
      CASE period
        WHEN 'Today' THEN 1
        WHEN 'Yesterday' THEN 2
        WHEN 'Last 7 Days' THEN 3
        WHEN 'Last 30 Days' THEN 4
      END;
    """
    
    # to_dataframe() runs the query and fetches the result in one call
    kpi_df = client.query(kpi_query).to_dataframe()
    
    print("=== Sales Dashboard KPIs ===")
    print(kpi_df.to_string(index=False))
    
    # Trend analysis
    trend_query = """
    SELECT 
      DATE(timestamp) as date,
      SUM(amount) as daily_revenue,
      COUNT(*) as daily_transactions,
      COUNT(DISTINCT customer_id) as daily_customers,
      SUM(amount) / COUNT(*) as avg_transaction_value
    FROM `your-project-id.analytics_demo.sales_data`
    WHERE DATE(timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
    GROUP BY DATE(timestamp)
    ORDER BY date;
    """
    
    trend_df = client.query(trend_query).to_dataframe()
    
    print("\n=== Daily Trends (Last 30 Days) ===")
    print(trend_df.tail(10).to_string(index=False))
    
    return kpi_df, trend_df

# Generate the dashboard
dashboard_data = generate_sales_dashboard()

# Export data to Cloud Storage
def export_to_cloud_storage():
    extract_job = client.extract_table(
        "your-project-id.analytics_demo.daily_sales_summary",
        "gs://your-bucket-name/exports/daily_sales_*.csv",
        job_config=bigquery.ExtractJobConfig(
            destination_format=bigquery.DestinationFormat.CSV,
            field_delimiter=",",
            print_header=True
        )
    )
    extract_job.result()
    print("Data exported to Cloud Storage")

# Near-real-time metrics snapshot (aggregates the most recent hour of data)
streaming_query = """
CREATE OR REPLACE TABLE `your-project-id.analytics_demo.real_time_metrics`
AS
SELECT 
  CURRENT_TIMESTAMP() as update_time,
  'sales_summary' as metric_type,
  STRUCT(
    COUNT(*) as total_transactions,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_transaction_value,
    COUNT(DISTINCT customer_id) as unique_customers
  ) as metrics
FROM `your-project-id.analytics_demo.sales_data`
WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR);
"""

print("BigQuery comprehensive analysis completed!")