Apache Spark

分散処理エンジンとして設計された統合分析エンジン。大規模データセットでの高速処理を実現し、Scala、Java、Python、Rでの利用が可能。DataFrame APIにより構造化データの効率的な操作をサポート。

ScalaPythonJavaビッグデータ分散処理MLlib機械学習

フレームワーク

Apache Spark

概要

Apache Sparkは、大規模データ処理のための分散コンピューティングフレームワークです。

詳細

Apache Spark(アパッチスパーク)は2009年にUCバークレーで開始され、2013年にApacheトップレベルプロジェクトとなった統合分析エンジンです。大規模データ処理のためのクラスター計算フレームワークとして設計され、バッチ処理、ストリーミング、機械学習、グラフ処理を統一されたAPIで提供します。メモリ内計算によりHadoop MapReduceより最大100倍高速な処理を実現し、Scala、Java、Python、R、SQLなど複数言語をサポートします。MLlibライブラリにより分散機械学習、Spark Streaming によるリアルタイム処理、GraphX によるグラフ分析が可能です。YARN、Mesos、Kubernetes上での実行をサポートし、AWS、Azure、Google Cloudなどクラウド環境での利用が一般的です。ビッグデータ分析、ETL処理、リアルタイム分析、機械学習パイプラインなど幅広い用途で企業に採用されています。

メリット・デメリット

メリット

  • 高速処理: メモリ内計算によりMapReduceより大幅に高速
  • 統一されたAPI: バッチ・ストリーミング・機械学習を一つのフレームワークで
  • 多言語サポート: Scala、Java、Python、R、SQLでの開発が可能
  • スケーラビリティ: 小規模から数千ノードまでのクラスター対応
  • 豊富なライブラリ: MLlib、GraphX、Structured Streamingなど
  • クラウド対応: 主要クラウドプロバイダーでのマネージドサービス
  • フォルトトレラント: 自動的な障害回復機能

デメリット

  • メモリ使用量: 大量のメモリリソースが必要
  • 学習コスト: 分散処理の概念理解が必要で習得に時間がかかる
  • デバッグの困難さ: 分散環境でのエラー特定が複雑
  • 小規模データ: オーバーヘッドにより小さなデータセットでは非効率
  • 設定の複雑さ: チューニングパラメータが多く最適化が困難
  • バージョン互換性: アップグレード時の互換性問題

主要リンク

書き方の例

Hello World (PySpark)

from pyspark.sql import SparkSession

# SparkSessionの作成
spark = SparkSession.builder \
    .appName("HelloSpark") \
    .master("local[*]") \
    .getOrCreate()

print(f"Spark version: {spark.version}")

# 簡単なDataFrameの作成
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
columns = ["id", "name", "age"]

df = spark.createDataFrame(data, columns)

# データの表示
print("DataFrame contents:")
df.show()

# 基本的な操作
print("Schema:")
df.printSchema()

print("Count:", df.count())

# 年齢でフィルタリング
print("Age > 25:")
df.filter(df.age > 25).show()

# SparkSessionの終了
spark.stop()

データ読み込みと基本操作

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum

# SparkSessionの初期化
spark = SparkSession.builder \
    .appName("DataProcessing") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# CSVファイルの読み込み
df = spark.read.option("header", "true") \
              .option("inferSchema", "true") \
              .csv("path/to/your/data.csv")

# または、JSONファイルの読み込み
# df = spark.read.json("path/to/your/data.json")

# データの基本情報
print("データ件数:", df.count())
print("カラム数:", len(df.columns))

# スキーマの確認
df.printSchema()

# 統計情報の表示
df.describe().show()

# 最初の数行を表示
df.show(5)

# カラムの選択
selected_df = df.select("column1", "column2", "column3")

# フィルタリング
filtered_df = df.filter(col("age") > 18)

# グループ化と集計
aggregated_df = df.groupBy("category") \
                  .agg(
                      count("*").alias("count"),
                      avg("price").alias("avg_price"),
                      sum("quantity").alias("total_quantity")
                  )

aggregated_df.show()

# ソート
sorted_df = df.orderBy(col("timestamp").desc())

# 新しいカラムの追加
from pyspark.sql.functions import when, lit
df_with_new_col = df.withColumn(
    "price_category",
    when(col("price") > 100, "High")
    .when(col("price") > 50, "Medium")
    .otherwise("Low")
)

df_with_new_col.show()

機械学習 (MLlib)

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# SparkSessionの作成
spark = SparkSession.builder \
    .appName("MLExample") \
    .getOrCreate()

# サンプルデータの作成
from pyspark.ml.linalg import Vectors
data = [
    (1.0, Vectors.dense([1.0, 2.0, 3.0])),
    (0.0, Vectors.dense([2.0, 1.0, 4.0])),
    (1.0, Vectors.dense([3.0, 3.0, 2.0])),
    (0.0, Vectors.dense([1.0, 1.0, 1.0])),
    (1.0, Vectors.dense([4.0, 2.0, 3.0]))
]

df = spark.createDataFrame(data, ["label", "features"])

# または、実際のデータファイルからの読み込み
# df = spark.read.csv("path/to/your/data.csv", header=True, inferSchema=True)

# 特徴量の準備(実際のデータの場合)
# feature_columns = ["feature1", "feature2", "feature3"]
# assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# df = assembler.transform(df)

# データの分割
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# 特徴量のスケーリング
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# ロジスティック回帰モデル
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")

# パイプラインの作成
pipeline = Pipeline(stages=[scaler, lr])

# モデルの学習
model = pipeline.fit(train_df)

# 予測
predictions = model.transform(test_df)

# 結果の表示
predictions.select("label", "scaled_features", "prediction", "probability").show()

# モデルの評価
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# モデルの保存
# model.write().overwrite().save("path/to/save/model")

spark.stop()

Spark SQL

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# SparkSession作成
spark = SparkSession.builder \
    .appName("SparkSQLExample") \
    .getOrCreate()

# データの読み込み
sales_df = spark.read.option("header", "true") \
                    .option("inferSchema", "true") \
                    .csv("sales_data.csv")

customers_df = spark.read.option("header", "true") \
                        .option("inferSchema", "true") \
                        .csv("customer_data.csv")

# 一時テーブルとして登録
sales_df.createOrReplaceTempView("sales")
customers_df.createOrReplaceTempView("customers")

# SQL クエリの実行
result1 = spark.sql("""
    SELECT 
        c.customer_name,
        SUM(s.amount) as total_sales,
        COUNT(s.order_id) as order_count
    FROM sales s
    JOIN customers c ON s.customer_id = c.customer_id
    WHERE s.order_date >= '2023-01-01'
    GROUP BY c.customer_name
    ORDER BY total_sales DESC
    LIMIT 10
""")

result1.show()

# DataFrame APIでの同等操作
result2 = sales_df.filter(col("order_date") >= "2023-01-01") \
                  .join(customers_df, "customer_id") \
                  .groupBy("customer_name") \
                  .agg(
                      sum("amount").alias("total_sales"),
                      count("order_id").alias("order_count")
                  ) \
                  .orderBy(col("total_sales").desc()) \
                  .limit(10)

result2.show()

# 複雑なウィンドウ関数の使用
from pyspark.sql.window import Window

# 月別売上のランキング
window_spec = Window.partitionBy(date_format(col("order_date"), "yyyy-MM")) \
                    .orderBy(col("amount").desc())

monthly_ranking = spark.sql("""
    SELECT 
        customer_id,
        amount,
        DATE_FORMAT(order_date, 'yyyy-MM') as month,
        ROW_NUMBER() OVER (
            PARTITION BY DATE_FORMAT(order_date, 'yyyy-MM') 
            ORDER BY amount DESC
        ) as rank
    FROM sales
""")

monthly_ranking.filter(col("rank") <= 5).show()

spark.stop()

ストリーミング処理

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# SparkSession作成(ストリーミング用)
spark = SparkSession.builder \
    .appName("StreamingExample") \
    .getOrCreate()

# Kafkaからのストリーミング読み込み
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor_data") \
    .load()

# JSONスキーマの定義
schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("temperature", DoubleType()),
    StructField("humidity", DoubleType())
])

# JSONデータの解析
parsed_df = kafka_df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# ウィンドウ集計(5分間隔)
windowed_df = parsed_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("sensor_id")
    ) \
    .agg(
        avg("temperature").alias("avg_temperature"),
        avg("humidity").alias("avg_humidity"),
        count("*").alias("count")
    )

# ストリーミング出力(コンソール)
query1 = windowed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

# ファイル出力
query2 = parsed_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/path/to/output") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start()

# 異常検知の例
anomaly_df = parsed_df.filter(
    (col("temperature") > 50) | (col("humidity") > 90)
)

anomaly_query = anomaly_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# すべてのクエリの実行待機
# query1.awaitTermination()

spark.stop()

パフォーマンス最適化

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# パフォーマンス最適化設定でSparkSession作成
spark = SparkSession.builder \
    .appName("OptimizedSpark") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

# データの読み込み
large_df = spark.read.parquet("path/to/large/dataset")

# パーティション数の確認と調整
print(f"Current partitions: {large_df.rdd.getNumPartitions()}")

# 適切なパーティション数に調整
optimal_partitions = 200  # データサイズに応じて調整
large_df = large_df.repartition(optimal_partitions)

# カラムベースパーティショニング
partitioned_df = large_df.repartition("date", "category")

# ブロードキャストジョインの使用(小さなテーブル)
from pyspark.sql.functions import broadcast

small_df = spark.read.csv("small_lookup_table.csv")
joined_df = large_df.join(broadcast(small_df), "key")

# キャッシュの活用(繰り返し使用するデータ)
frequently_used_df = large_df.filter(col("status") == "active")
frequently_used_df.cache()

# 使用例
result1 = frequently_used_df.groupBy("category").count()
result2 = frequently_used_df.filter(col("amount") > 100).count()

# パーティション書き込み最適化
large_df.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("output/partitioned_data")

# バケット化(同一キーの頻繁な結合に有効)
bucketed_df = large_df.write \
    .bucketBy(10, "customer_id") \
    .sortBy("timestamp") \
    .saveAsTable("bucketed_sales")

# 統計情報の収集(Catalyst Optimizerの改善)
spark.sql("ANALYZE TABLE bucketed_sales COMPUTE STATISTICS")

# 実行計画の確認
large_df.explain(True)

spark.stop()

分散機械学習パイプライン

from pyspark.sql import SparkSession
from pyspark.ml.feature import *
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

# SparkSession作成
spark = SparkSession.builder \
    .appName("MLPipeline") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# 大規模データセットの読み込み
df = spark.read.parquet("large_training_dataset.parquet")

# 特徴量エンジニアリング
# カテゴリ変数のエンコーディング
string_indexers = []
one_hot_encoders = []
categorical_cols = ["category", "brand", "location"]

for col_name in categorical_cols:
    string_indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_indexed")
    one_hot_encoder = OneHotEncoder(inputCol=f"{col_name}_indexed", outputCol=f"{col_name}_encoded")
    string_indexers.append(string_indexer)
    one_hot_encoders.append(one_hot_encoder)

# 数値特徴量の組み合わせ
numeric_cols = ["feature1", "feature2", "feature3"]
feature_cols = numeric_cols + [f"{col}_encoded" for col in categorical_cols]

# 特徴量ベクトルの作成
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# 特徴量のスケーリング
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# 分類器
rf = RandomForestClassifier(
    featuresCol="scaled_features",
    labelCol="label",
    numTrees=100,
    maxDepth=10
)

# パイプラインの構築
pipeline_stages = string_indexers + one_hot_encoders + [assembler, scaler, rf]
pipeline = Pipeline(stages=pipeline_stages)

# データ分割
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# ハイパーパラメータチューニング
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
    .build()

evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)

# クロスバリデーション
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3
)

# モデル学習(分散処理)
cv_model = cv.fit(train_df)

# 最適モデルの取得
best_model = cv_model.bestModel

# テストデータでの評価
predictions = best_model.transform(test_df)
f1_score = evaluator.evaluate(predictions)

print(f"F1 Score: {f1_score}")

# 特徴量重要度の確認
rf_model = best_model.stages[-1]
feature_importance = rf_model.featureImportances
print("Feature Importance:", feature_importance)

# モデルの保存
best_model.write().overwrite().save("trained_model")

spark.stop()