Apache Spark
A unified analytics engine designed for distributed data processing. It delivers fast processing of large datasets, provides APIs for Scala, Java, Python, and R, and supports efficient structured-data operations through the DataFrame API.
Framework
Apache Spark
Overview
Apache Spark is a distributed computing framework for large-scale data processing.
Details
Apache Spark is a unified analytics engine that began at UC Berkeley in 2009 and became an Apache top-level project in 2013. Designed as a cluster computing framework for large-scale data processing, it provides batch processing, streaming, machine learning, and graph processing through unified APIs. By keeping working data in memory, it can run certain workloads up to 100x faster than Hadoop MapReduce, and it supports multiple languages including Scala, Java, Python, R, and SQL. The MLlib library enables distributed machine learning, Spark Streaming and its successor Structured Streaming provide real-time processing, and GraphX enables graph analytics. Spark runs on YARN, Mesos, and Kubernetes, and is commonly used through managed services on AWS, Azure, and Google Cloud. It is widely adopted by enterprises for big data analytics, ETL processing, real-time analytics, and machine learning pipelines.
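As a rough sketch of the unified API described above (the file path is a placeholder), a single SparkSession serves as the entry point for batch reads, SQL queries, and streaming sources alike:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("UnifiedAPIs").getOrCreate()
# Batch: read a file into a DataFrame (placeholder path)
events_df = spark.read.parquet("events.parquet")
# SQL: query the same data through the SQL engine
events_df.createOrReplaceTempView("events")
spark.sql("SELECT COUNT(*) AS n FROM events").show()
# Streaming: the same DataFrame API over an unbounded built-in test source
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
spark.stop()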
Pros and Cons
Pros
- High-Speed Processing: Significantly faster than MapReduce through in-memory computing
- Unified API: Batch, streaming, and machine learning in one framework
- Multi-Language Support: Development possible in Scala, Java, Python, R, and SQL
- Scalability: Scales from a single machine to clusters with thousands of nodes
- Rich Libraries: MLlib, GraphX, Structured Streaming, and more
- Cloud-Ready: Managed services available on major cloud providers
- Fault Tolerance: Automatic failure recovery capabilities
Cons
- Memory Usage: Requires substantial memory resources
- Learning Curve: Understanding distributed processing concepts takes time
- Debugging Difficulty: Error identification in distributed environments is complex
- Small Data: Inefficient for small datasets due to overhead
- Configuration Complexity: Many tuning parameters make optimization difficult (see the sketch after this list)
- Version Compatibility: Upgrades between versions can introduce compatibility issues
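As a concrete illustration of the tuning surface, the minimal sketch below uses the runtime configuration API to inspect and override one common setting and to list everything set for the application; the property names are standard Spark options, but suitable values depend entirely on the workload and cluster:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ConfigInspection").getOrCreate()
# Read a setting that controls shuffle parallelism (defaults to 200)
print(spark.conf.get("spark.sql.shuffle.partitions"))
# Override it for this session; a good value depends on data volume and cluster size
spark.conf.set("spark.sql.shuffle.partitions", "64")
# List every property explicitly set for this application
for key, value in spark.sparkContext.getConf().getAll():
    print(key, "=", value)
spark.stop()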
Key Links
- Apache Spark Official Site
- Apache Spark Official Documentation
- Apache Spark GitHub Repository
- Spark MLlib Guide
- PySpark Documentation
- Spark SQL Guide
Code Examples
Hello World (PySpark)
from pyspark.sql import SparkSession
# Create SparkSession
spark = SparkSession.builder \
    .appName("HelloSpark") \
    .master("local[*]") \
    .getOrCreate()
print(f"Spark version: {spark.version}")
# Create simple DataFrame
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Charlie", 35)]
columns = ["id", "name", "age"]
df = spark.createDataFrame(data, columns)
# Display data
print("DataFrame contents:")
df.show()
# Basic operations
print("Schema:")
df.printSchema()
print("Count:", df.count())
# Filter by age
print("Age > 25:")
df.filter(df.age > 25).show()
# Stop SparkSession
spark.stop()
Data Loading and Basic Operations
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, count, sum
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DataProcessing") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()
# Read CSV file
df = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .csv("path/to/your/data.csv")
# Or read JSON file
# df = spark.read.json("path/to/your/data.json")
# Basic data information
print("Row count:", df.count())
print("Column count:", len(df.columns))
# Check schema
df.printSchema()
# Display statistics
df.describe().show()
# Show first few rows
df.show(5)
# Column selection
selected_df = df.select("column1", "column2", "column3")
# Filtering
filtered_df = df.filter(col("age") > 18)
# Grouping and aggregation
aggregated_df = df.groupBy("category") \
    .agg(
        count("*").alias("count"),
        avg("price").alias("avg_price"),
        sum("quantity").alias("total_quantity")
    )
aggregated_df.show()
# Sorting
sorted_df = df.orderBy(col("timestamp").desc())
# Adding new column
from pyspark.sql.functions import when, lit
df_with_new_col = df.withColumn(
    "price_category",
    when(col("price") > 100, "High")
    .when(col("price") > 50, "Medium")
    .otherwise("Low")
)
df_with_new_col.show()
Machine Learning (MLlib)
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
# Create SparkSession
spark = SparkSession.builder \
    .appName("MLExample") \
    .getOrCreate()
# Create sample data
from pyspark.ml.linalg import Vectors
data = [
    (1.0, Vectors.dense([1.0, 2.0, 3.0])),
    (0.0, Vectors.dense([2.0, 1.0, 4.0])),
    (1.0, Vectors.dense([3.0, 3.0, 2.0])),
    (0.0, Vectors.dense([1.0, 1.0, 1.0])),
    (1.0, Vectors.dense([4.0, 2.0, 3.0]))
]
df = spark.createDataFrame(data, ["label", "features"])
# Or load from actual data file
# df = spark.read.csv("path/to/your/data.csv", header=True, inferSchema=True)
# Feature preparation (for real data)
# feature_columns = ["feature1", "feature2", "feature3"]
# assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# df = assembler.transform(df)
# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Feature scaling
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
# Logistic regression model
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")
# Create pipeline
pipeline = Pipeline(stages=[scaler, lr])
# Train model
model = pipeline.fit(train_df)
# Make predictions
predictions = model.transform(test_df)
# Display results
predictions.select("label", "scaled_features", "prediction", "probability").show()
# Evaluate model
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")
# Save model
# model.write().overwrite().save("path/to/save/model")
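# To reuse the fitted pipeline later without retraining, it can be reloaded
# (assuming the save above is enabled and the same placeholder path):
# from pyspark.ml import PipelineModel
# loaded_model = PipelineModel.load("path/to/save/model")
# loaded_model.transform(test_df).show()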
spark.stop()
Spark SQL
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Create SparkSession
spark = SparkSession.builder \
    .appName("SparkSQLExample") \
    .getOrCreate()
# Load data
sales_df = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .csv("sales_data.csv")
customers_df = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .csv("customer_data.csv")
# Register as temporary tables
sales_df.createOrReplaceTempView("sales")
customers_df.createOrReplaceTempView("customers")
# Execute SQL queries
result1 = spark.sql("""
    SELECT
        c.customer_name,
        SUM(s.amount) as total_sales,
        COUNT(s.order_id) as order_count
    FROM sales s
    JOIN customers c ON s.customer_id = c.customer_id
    WHERE s.order_date >= '2023-01-01'
    GROUP BY c.customer_name
    ORDER BY total_sales DESC
    LIMIT 10
""")
result1.show()
# Equivalent operation with DataFrame API
result2 = sales_df.filter(col("order_date") >= "2023-01-01") \
    .join(customers_df, "customer_id") \
    .groupBy("customer_name") \
    .agg(
        sum("amount").alias("total_sales"),
        count("order_id").alias("order_count")
    ) \
    .orderBy(col("total_sales").desc()) \
    .limit(10)
result2.show()
# Window functions: DataFrame-API equivalent of the OVER clause in the SQL below
from pyspark.sql.window import Window
# Monthly sales ranking
window_spec = Window.partitionBy(date_format(col("order_date"), "yyyy-MM")) \
    .orderBy(col("amount").desc())
ranked_df = sales_df.withColumn("rank", row_number().over(window_spec))
monthly_ranking = spark.sql("""
    SELECT
        customer_id,
        amount,
        DATE_FORMAT(order_date, 'yyyy-MM') as month,
        ROW_NUMBER() OVER (
            PARTITION BY DATE_FORMAT(order_date, 'yyyy-MM')
            ORDER BY amount DESC
        ) as rank
    FROM sales
""")
monthly_ranking.filter(col("rank") <= 5).show()
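# Custom logic can also be exposed to SQL as a user-defined function.
# Illustrative sketch: the function name "amount_tier" and the threshold are arbitrary.
from pyspark.sql.types import StringType
spark.udf.register("amount_tier", lambda amount: "high" if amount and amount > 100 else "low", StringType())
spark.sql("SELECT customer_id, amount, amount_tier(amount) as tier FROM sales").show()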
spark.stop()
Streaming Processing
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Create SparkSession for streaming
spark = SparkSession.builder \
    .appName("StreamingExample") \
    .getOrCreate()
# Stream from Kafka (requires the spark-sql-kafka connector package)
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor_data") \
    .load()
# Define JSON schema
schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("temperature", DoubleType()),
    StructField("humidity", DoubleType())
])
# Parse JSON data
parsed_df = kafka_df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")
# Window aggregation (5-minute intervals)
windowed_df = parsed_df \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),
        col("sensor_id")
    ) \
    .agg(
        avg("temperature").alias("avg_temperature"),
        avg("humidity").alias("avg_humidity"),
        count("*").alias("count")
    )
# Stream output (console)
query1 = windowed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()
# File output
query2 = parsed_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/path/to/output") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .start()
# Anomaly detection example
anomaly_df = parsed_df.filter(
    (col("temperature") > 50) | (col("humidity") > 90)
)
anomaly_query = anomaly_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
# In a real job, block until the streams terminate before stopping the session
# query1.awaitTermination()
spark.stop()
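For experimenting with Structured Streaming locally without a Kafka broker, the built-in rate source generates rows with timestamp and value columns at a configurable speed; a minimal self-contained sketch:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("RateSourceDemo").getOrCreate()
# The rate source emits (timestamp, value) rows; handy for local testing
rate_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
even_df = rate_df.filter(col("value") % 2 == 0)
query = even_df.writeStream.outputMode("append").format("console").start()
query.awaitTermination(30)  # run for roughly 30 seconds, then return
spark.stop()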
Performance Optimization
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Create SparkSession with performance optimization settings
spark = SparkSession.builder \
    .appName("OptimizedSpark") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()
# Load data
large_df = spark.read.parquet("path/to/large/dataset")
# Check and adjust partition count
print(f"Current partitions: {large_df.rdd.getNumPartitions()}")
# Adjust to optimal partition count
optimal_partitions = 200 # Adjust based on data size
large_df = large_df.repartition(optimal_partitions)
# Column-based partitioning
partitioned_df = large_df.repartition("date", "category")
# Use broadcast join (for small tables)
from pyspark.sql.functions import broadcast
small_df = spark.read.csv("small_lookup_table.csv")
joined_df = large_df.join(broadcast(small_df), "key")
# Utilize caching (for repeatedly used data)
frequently_used_df = large_df.filter(col("status") == "active")
frequently_used_df.cache()
# Usage examples
result1 = frequently_used_df.groupBy("category").count()
result2 = frequently_used_df.filter(col("amount") > 100).count()
# Optimized partitioned write
large_df.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("output/partitioned_data")
# Bucketing (effective for frequent joins on the same key); saveAsTable writes a managed table
large_df.write \
    .bucketBy(10, "customer_id") \
    .sortBy("timestamp") \
    .saveAsTable("bucketed_sales")
# Collect statistics (improves Catalyst Optimizer)
spark.sql("ANALYZE TABLE bucketed_sales COMPUTE STATISTICS")
# Check execution plan
large_df.explain(True)
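# Two related tuning knobs (illustrative additions): coalesce() lowers the partition
# count without a full shuffle, unlike repartition(), and unpersist() frees a cache
# once the reuse phase is over
compact_df = large_df.coalesce(16)
compact_df.write.mode("overwrite").parquet("output/compact_data")
frequently_used_df.unpersist()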
spark.stop()
Distributed Machine Learning Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import *
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline
# Create SparkSession
spark = SparkSession.builder \
    .appName("MLPipeline") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()
# Load large dataset
df = spark.read.parquet("large_training_dataset.parquet")
# Feature engineering
# Categorical variable encoding
string_indexers = []
one_hot_encoders = []
categorical_cols = ["category", "brand", "location"]
for col_name in categorical_cols:
    string_indexer = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_indexed")
    one_hot_encoder = OneHotEncoder(inputCol=f"{col_name}_indexed", outputCol=f"{col_name}_encoded")
    string_indexers.append(string_indexer)
    one_hot_encoders.append(one_hot_encoder)
# Numeric feature combination
numeric_cols = ["feature1", "feature2", "feature3"]
feature_cols = numeric_cols + [f"{col}_encoded" for col in categorical_cols]
# Create feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Feature scaling
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
# Classifier
rf = RandomForestClassifier(
    featuresCol="scaled_features",
    labelCol="label",
    numTrees=100,
    maxDepth=10
)
# Build pipeline
pipeline_stages = string_indexers + one_hot_encoders + [assembler, scaler, rf]
pipeline = Pipeline(stages=pipeline_stages)
# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)
# Hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100, 200]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .addGrid(rf.minInstancesPerNode, [1, 5, 10]) \
    .build()
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="f1"
)
# Cross-validation
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3
)
# Train model (distributed processing)
cv_model = cv.fit(train_df)
# Get best model
best_model = cv_model.bestModel
# Evaluate on test data
predictions = best_model.transform(test_df)
f1_score = evaluator.evaluate(predictions)
print(f"F1 Score: {f1_score}")
# Check feature importance
rf_model = best_model.stages[-1]
feature_importance = rf_model.featureImportances
print("Feature Importance:", feature_importance)
# Save model
best_model.write().overwrite().save("trained_model")
spark.stop()