Apache Spark
分散処理エンジンとして設計された統合分析エンジン。大規模データセットでの高速処理を実現し、Scala、Java、Python、Rでの利用が可能。DataFrame APIにより構造化データの効率的な操作をサポート。
フレームワーク
Apache Spark
概要
Apache Sparkは、大規模データ処理のための分散コンピューティングフレームワークです。
詳細
Apache Spark(アパッチスパーク)は2009年にUCバークレーで開始され、2013年にApacheトップレベルプロジェクトとなった統合分析エンジンです。大規模データ処理のためのクラスター計算フレームワークとして設計され、バッチ処理、ストリーミング、機械学習、グラフ処理を統一されたAPIで提供します。メモリ内計算によりHadoop MapReduceより最大100倍高速な処理を実現し、Scala、Java、Python、R、SQLなど複数言語をサポートします。MLlibライブラリにより分散機械学習、Spark Streaming によるリアルタイム処理、GraphX によるグラフ分析が可能です。YARN、Mesos、Kubernetes上での実行をサポートし、AWS、Azure、Google Cloudなどクラウド環境での利用が一般的です。ビッグデータ分析、ETL処理、リアルタイム分析、機械学習パイプラインなど幅広い用途で企業に採用されています。
メリット・デメリット
メリット
- 高速処理: メモリ内計算によりMapReduceより大幅に高速
- 統一されたAPI: バッチ・ストリーミング・機械学習を一つのフレームワークで
- 多言語サポート: Scala、Java、Python、R、SQLでの開発が可能
- スケーラビリティ: 小規模から数千ノードまでのクラスター対応
- 豊富なライブラリ: MLlib、GraphX、Structured Streamingなど
- クラウド対応: 主要クラウドプロバイダーでのマネージドサービス
- フォルトトレラント: 自動的な障害回復機能
デメリット
- メモリ使用量: 大量のメモリリソースが必要
- 学習コスト: 分散処理の概念理解が必要で習得に時間がかかる
- デバッグの困難さ: 分散環境でのエラー特定が複雑
- 小規模データ: オーバーヘッドにより小さなデータセットでは非効率
- 設定の複雑さ: チューニングパラメータが多く最適化が困難
- バージョン互換性: アップグレード時の互換性問題
主要リンク
- Apache Spark公式サイト
- Apache Spark公式ドキュメント
- Apache Spark GitHub リポジトリ
- Spark MLlib ガイド
- PySpark ドキュメント
- Spark SQL ガイド
書き方の例
Hello World (PySpark)
from pyspark.sql import SparkSession
# SparkSessionの作成
spark = SparkSession.builder \
.appName("HelloSpark") \
.master("local[*]") \
.getOrCreate()
print(f"Spark version: {spark.version}")
# 簡単なDataFrameの作成
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
# データの表示
print("DataFrame contents:")
df.show()
# 基本的な操作
print("Schema:")
df.printSchema()
print("Count:", df.count())
# 年齢でフィルタリング
print("Age > 25:")
df.filter(df.age > 25).show()
# SparkSessionの終了
spark.stop()
データ読み込みと基本操作
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum
# SparkSessionの初期化
spark = SparkSession.builder \
.appName("DataProcessing") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# CSVファイルの読み込み
df = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("path/to/your/data.csv")
# または、JSONファイルの読み込み
# df = spark.read.json("path/to/your/data.json")
# データの基本情報
print("データ件数:", df.count())
print("カラム数:", len(df.columns))
# スキーマの確認
df.printSchema()
# 統計情報の表示
df.describe().show()
# 最初の数行を表示
df.show(5)
# カラムの選択
selected_df = df.select("column1", "column2", "column3")
# フィルタリング
filtered_df = df.filter(col("age") > 18)
# グループ化と集計
aggregated_df = df.groupBy("category") \
.agg(
count("*").alias("count"),
avg("price").alias("avg_price"),
sum("quantity").alias("total_quantity")
)
aggregated_df.show()
# ソート
sorted_df = df.orderBy(col("timestamp").desc())
# 新しいカラムの追加
from pyspark.sql.functions import when, lit
df_with_new_col = df.withColumn(
"price_category",
when(col("price") > 100, "High")
.when(col("price") > 50, "Medium")
.otherwise("Low")
)
df_with_new_col.show()
機械学習 (MLlib)
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
# SparkSessionの作成
spark = SparkSession.builder \
.appName("MLExample") \
.getOrCreate()
# サンプルデータの作成
from pyspark.ml.linalg import Vectors
data = [
(1.0, Vectors.dense([1.0, 2.0, 3.0])),
(0.0, Vectors.dense([2.0, 1.0, 4.0])),
(1.0, Vectors.dense([3.0, 3.0, 2.0])),
(0.0, Vectors.dense([1.0, 1.0, 1.0])),
(1.0, Vectors.dense([4.0, 2.0, 3.0]))
]
df = spark.createDataFrame(data, ["label", "features"])
# または、実際のデータファイルからの読み込み
# df = spark.read.csv("path/to/your/data.csv", header=True, inferSchema=True)
# 特徴量の準備(実際のデータの場合)
# feature_columns = ["feature1", "feature2", "feature3"]
# assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# df = assembler.transform(df)
# データの分割
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# 特徴量のスケーリング
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
# ロジスティック回帰モデル
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")
# パイプラインの作成
pipeline = Pipeline(stages=[scaler, lr])
# モデルの学習
model = pipeline.fit(train_df)
# 予測
predictions = model.transform(test_df)
# 結果の表示
predictions.select("label", "scaled_features", "prediction", "probability").show()
# モデルの評価
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# モデルの保存
# model.write().overwrite().save("path/to/save/model")
spark.stop()
Spark SQL
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# SparkSession作成
spark = SparkSession.builder \
.appName("SparkSQLExample") \
.getOrCreate()
# データの読み込み
sales_df = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("sales_data.csv")
customers_df = spark.read.option("header", "true") \
.option("inferSchema", "true") \
.csv("customer_data.csv")
# 一時テーブルとして登録
sales_df.createOrReplaceTempView("sales")
customers_df.createOrReplaceTempView("customers")
# SQL クエリの実行
result1 = spark.sql("""
SELECT
c.customer_name,
SUM(s.amount) as total_sales,
COUNT(s.order_id) as order_count
FROM sales s
JOIN customers c ON s.customer_id = c.customer_id
WHERE s.order_date >= '2023-01-01'
GROUP BY c.customer_name
ORDER BY total_sales DESC
LIMIT 10
""")
result1.show()
# DataFrame APIでの同等操作
result2 = sales_df.filter(col("order_date") >= "2023-01-01") \
.join(customers_df, "customer_id") \
.groupBy("customer_name") \
.agg(
sum("amount").alias("total_sales"),
count("order_id").alias("order_count")
) \
.orderBy(col("total_sales").desc()) \
.limit(10)
result2.show()
# 複雑なウィンドウ関数の使用
from pyspark.sql.window import Window
# 月別売上のランキング
window_spec = Window.partitionBy(date_format(col("order_date"), "yyyy-MM")) \
.orderBy(col("amount").desc())
monthly_ranking = spark.sql("""
SELECT
customer_id,
amount,
DATE_FORMAT(order_date, 'yyyy-MM') as month,
ROW_NUMBER() OVER (
PARTITION BY DATE_FORMAT(order_date, 'yyyy-MM')
ORDER BY amount DESC
) as rank
FROM sales
""")
monthly_ranking.filter(col("rank") <= 5).show()
spark.stop()
ストリーミング処理
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# SparkSession作成(ストリーミング用)
spark = SparkSession.builder \
.appName("StreamingExample") \
.getOrCreate()
# Kafkaからのストリーミング読み込み
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sensor_data") \
.load()
# JSONスキーマの定義
schema = StructType([
StructField("sensor_id", StringType()),
StructField("timestamp", TimestampType()),
StructField("temperature", DoubleType()),
StructField("humidity", DoubleType())
])
# JSONデータの解析
parsed_df = kafka_df.select(
from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# ウィンドウ集計(5分間隔)
windowed_df = parsed_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(col("timestamp"), "5 minutes"),
col("sensor_id")
) \
.agg(
avg("temperature").alias("avg_temperature"),
avg("humidity").alias("avg_humidity"),
count("*").alias("count")
)
# ストリーミング出力(コンソール)
query1 = windowed_df.writeStream \
.outputMode("append") \
.format("console") \
.option("truncate", "false") \
.start()
# ファイル出力
query2 = parsed_df.writeStream \
.outputMode("append") \
.format("parquet") \
.option("path", "/path/to/output") \
.option("checkpointLocation", "/path/to/checkpoint") \
.start()
# 異常検知の例
anomaly_df = parsed_df.filter(
(col("temperature") > 50) | (col("humidity") > 90)
)
anomaly_query = anomaly_df.writeStream \
.outputMode("append") \
.format("console") \
.start()
# すべてのクエリの実行待機
# query1.awaitTermination()
spark.stop()
パフォーマンス最適化
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# パフォーマンス最適化設定でSparkSession作成
spark = SparkSession.builder \
.appName("OptimizedSpark") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.getOrCreate()
# データの読み込み
large_df = spark.read.parquet("path/to/large/dataset")
# パーティション数の確認と調整
print(f"Current partitions: {large_df.rdd.getNumPartitions()}")
# 適切なパーティション数に調整
optimal_partitions = 200 # データサイズに応じて調整
large_df = large_df.repartition(optimal_partitions)
# カラムベースパーティショニング
partitioned_df = large_df.repartition("date", "category")
# ブロードキャストジョインの使用(小さなテーブル)
from pyspark.sql.functions import broadcast
small_df = spark.read.csv("small_lookup_table.csv")
joined_df = large_df.join(broadcast(small_df), "key")
# キャッシュの活用(繰り返し使用するデータ)
frequently_used_df = large_df.filter(col("status") == "active")
frequently_used_df.cache()
# 使用例
result1 = frequently_used_df.groupBy("category").count()
result2 = frequently_used_df.filter(col("amount") > 100).count()
# パーティション書き込み最適化
large_df.write \
.mode("overwrite") \
.partitionBy("year", "month") \
.parquet("output/partitioned_data")
# バケット化(同一キーの頻繁な結合に有効)
bucketed_df = large_df.write \
.bucketBy(10, "customer_id") \
.sortBy("timestamp") \
.saveAsTable("bucketed_sales")
# 統計情報の収集(Catalyst Optimizerの改善)
spark.sql("ANALYZE TABLE bucketed_sales COMPUTE STATISTICS")
# 実行計画の確認
large_df.explain(True)
spark.stop()
分散機械学習パイプライン
from pyspark.sql import SparkSession
from pyspark.ml.feature import *
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
# SparkSession作成
spark = SparkSession.builder \
.appName("MLPipeline") \
.config("spark.executor.memory", "8g") \
.config("spark.driver.memory", "4g") \
.getOrCreate()
# 大規模データセットの読み込み
df = spark.read.parquet("large_training_dataset.parquet")
# 特徴量エンジニアリング
# カテゴリ変数のエンコーディング
string_indexers = []
one_hot_encoders = []
categorical_cols = ["category", "brand", "location"]
for col_name in categorical_cols:
string_indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_indexed")
one_hot_encoder = OneHotEncoder(inputCol=f"{col_name}_indexed", outputCol=f"{col_name}_encoded")
string_indexers.append(string_indexer)
one_hot_encoders.append(one_hot_encoder)
# 数値特徴量の組み合わせ
numeric_cols = ["feature1", "feature2", "feature3"]
feature_cols = numeric_cols + [f"{col}_encoded" for col in categorical_cols]
# 特徴量ベクトルの作成
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# 特徴量のスケーリング
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
# 分類器
rf = RandomForestClassifier(
featuresCol="scaled_features",
labelCol="label",
numTrees=100,
maxDepth=10
)
# パイプラインの構築
pipeline_stages = string_indexers + one_hot_encoders + [assembler, scaler, rf]
pipeline = Pipeline(stages=pipeline_stages)
# データ分割
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# ハイパーパラメータチューニング
param_grid = ParamGridBuilder() \
.addGrid(rf.numTrees, [50, 100, 200]) \
.addGrid(rf.maxDepth, [5, 10, 15]) \
.addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
.build()
evaluator = MulticlassClassificationEvaluator(
labelCol="label",
predictionCol="prediction",
metricName="f1"
)
# クロスバリデーション
cv = CrossValidator(
estimator=pipeline,
estimatorParamMaps=param_grid,
evaluator=evaluator,
numFolds=3
)
# モデル学習(分散処理)
cv_model = cv.fit(train_df)
# 最適モデルの取得
best_model = cv_model.bestModel
# テストデータでの評価
predictions = best_model.transform(test_df)
f1_score = evaluator.evaluate(predictions)
print(f"F1 Score: {f1_score}")
# 特徴量重要度の確認
rf_model = best_model.stages[-1]
feature_importance = rf_model.featureImportances
print("Feature Importance:", feature_importance)
# モデルの保存
best_model.write().overwrite().save("trained_model")
spark.stop()