Optimize Apache Spark jobs with partitioning, caching, shuffle optimization, and memory tuning. Use when improving Spark performance, debugging slow jobs, or scaling data processing pipelines.
Add this skill
npx mdskills install sickn33/spark-optimization

Comprehensive Spark optimization patterns with clear code examples and performance tuning strategies.
---
name: spark-optimization
description: Optimize Apache Spark jobs with partitioning, caching, shuffle optimization, and memory tuning. Use when improving Spark performance, debugging slow jobs, or scaling data processing pipelines.
---

# Apache Spark Optimization

Production patterns for optimizing Apache Spark jobs, including partitioning strategies, memory management, shuffle optimization, and performance tuning.

## Do not use this skill when

- The task is unrelated to Apache Spark optimization
- You need a different domain or tool outside this scope

## Instructions

- Clarify goals, constraints, and required inputs.
- Apply relevant best practices and validate outcomes.
- Provide actionable steps and verification.
- If detailed examples are required, open `resources/implementation-playbook.md`.

## Use this skill when

- Optimizing slow Spark jobs
- Tuning memory and executor configuration
- Implementing efficient partitioning strategies
- Debugging Spark performance issues
- Scaling Spark pipelines for large datasets
- Reducing shuffle and data skew

## Core Concepts

### 1. Spark Execution Model

```
Driver Program
    ↓
Job (triggered by action)
    ↓
Stages (separated by shuffles)
    ↓
Tasks (one per partition)
```

### 2. Key Performance Factors

| Factor | Impact | Solution |
|--------|--------|----------|
| **Shuffle** | Network I/O, disk I/O | Minimize wide transformations |
| **Data Skew** | Uneven task duration | Salting, broadcast joins |
| **Serialization** | CPU overhead | Use Kryo, columnar formats |
| **Memory** | GC pressure, spills | Tune executor memory |
| **Partitions** | Parallelism | Right-size partitions |

## Quick Start

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create an optimized Spark session
spark = (SparkSession.builder
    .appName("OptimizedJob")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.shuffle.partitions", "200")
    .getOrCreate())

# Read with optimized settings
df = (spark.read
    .format("parquet")
    .option("mergeSchema", "false")
    .load("s3://bucket/data/"))

# Efficient transformations
result = (df
    .filter(F.col("date") >= "2024-01-01")
    .select("id", "amount", "category")
    .groupBy("category")
    .agg(F.sum("amount").alias("total")))

result.write.mode("overwrite").parquet("s3://bucket/output/")
```

## Patterns

### Pattern 1: Optimal Partitioning

```python
# Calculate optimal partition count
def calculate_partitions(data_size_gb: float, partition_size_mb: int = 128) -> int:
    """
    Optimal partition size: 128MB - 256MB.
    Too few: under-utilization, memory pressure.
    Too many: task-scheduling overhead.
    """
    return max(int(data_size_gb * 1024 / partition_size_mb), 1)

# Repartition for even distribution (full shuffle)
df_repartitioned = df.repartition(200, "partition_key")

# Coalesce to reduce partitions (no shuffle)
df_coalesced = df.coalesce(100)

# Partition pruning with predicate pushdown
df = (spark.read.parquet("s3://bucket/data/")
    .filter(F.col("date") == "2024-01-01"))  # Spark pushes this down

# Write with partitioning for future queries
(df.write
    .partitionBy("year", "month", "day")
    .mode("overwrite")
    .parquet("s3://bucket/partitioned_output/"))
```
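As a quick sanity check, here is a minimal usage sketch continuing with the `spark` session and the helper above; the 50 GB input size is a hypothetical figure, and `getNumPartitions()` confirms the effect of `repartition` versus `coalesce`:

```python
# Hypothetical input size; in practice, estimate it from the source files
input_gb = 50.0
target = calculate_partitions(input_gb)   # 50 * 1024 / 128 = 400

df = spark.read.parquet("s3://bucket/data/")
print(df.rdd.getNumPartitions())          # whatever the scan produced

df_even = df.repartition(target, "partition_key")
print(df_even.rdd.getNumPartitions())     # 400: full shuffle, even spread

df_fewer = df_even.coalesce(target // 4)
print(df_fewer.rdd.getNumPartitions())    # 100: merged locally, no shuffle
```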
### Pattern 2: Join Optimization

```python
from pyspark.sql import functions as F
from pyspark.sql.types import *

# 1. Broadcast Join - small-table joins
# Best when one side is below the broadcast threshold (10MB by default, configurable)
small_df = spark.read.parquet("s3://bucket/small_table/")  # < 10MB
large_df = spark.read.parquet("s3://bucket/large_table/")  # TBs

# Explicit broadcast hint
result = large_df.join(
    F.broadcast(small_df),
    on="key",
    how="left"
)

# 2. Sort-Merge Join - default for large tables
# Requires a shuffle, but handles any size
result = large_df1.join(large_df2, on="key", how="inner")

# 3. Bucket Join - pre-sorted, no shuffle at join time
# Write bucketed tables
(df.write
    .bucketBy(200, "customer_id")
    .sortBy("customer_id")
    .mode("overwrite")
    .saveAsTable("bucketed_orders"))

# Join bucketed tables (no shuffle!)
orders = spark.table("bucketed_orders")
customers = spark.table("bucketed_customers")  # Same bucket count
result = orders.join(customers, on="customer_id")

# 4. Skew Join Handling
# Enable AQE skew-join optimization
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")

# Manual salting for severe skew
def salt_join(df_skewed, df_other, key_col, num_salts=10):
    """Add a random salt to spread skewed keys across partitions."""
    # Add salt to the skewed side
    df_salted = df_skewed.withColumn(
        "salt",
        (F.rand() * num_salts).cast("int")
    ).withColumn(
        "salted_key",
        F.concat(F.col(key_col), F.lit("_"), F.col("salt"))
    )

    # Replicate the other side once per salt value
    df_exploded = df_other.crossJoin(
        spark.range(num_salts).withColumnRenamed("id", "salt")
    ).withColumn(
        "salted_key",
        F.concat(F.col(key_col), F.lit("_"), F.col("salt"))
    )

    # Join on the salted key
    return df_salted.join(df_exploded, on="salted_key", how="inner")
```

### Pattern 3: Caching and Persistence

```python
from pyspark import StorageLevel

# Cache when reusing a DataFrame across multiple actions
df = spark.read.parquet("s3://bucket/data/")
df_filtered = df.filter(F.col("status") == "active")

# Cache in memory (MEMORY_AND_DISK is the default)
df_filtered.cache()

# Or with a specific storage level
# (the *_SER levels exist only in the Scala/Java API; PySpark data
#  is always stored serialized)
df_filtered.persist(StorageLevel.DISK_ONLY)

# Force materialization
df_filtered.count()

# Use in multiple actions
agg1 = df_filtered.groupBy("category").count()
agg2 = df_filtered.groupBy("region").sum("amount")

# Unpersist when done
df_filtered.unpersist()

# Storage levels explained:
# MEMORY_ONLY     - Fast, but may not fit
# MEMORY_AND_DISK - Spills to disk if needed (recommended)
# DISK_ONLY       - When memory is tight
# OFF_HEAP        - Tungsten off-heap memory
# MEMORY_ONLY_SER - Serialized, less memory, more CPU (Scala/Java only)

# Checkpoint for complex lineage
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
df_complex = (df
    .join(other_df, "key")
    .groupBy("category")
    .agg(F.sum("amount")))
df_complex = df_complex.checkpoint()  # Breaks lineage, materializes
```
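To confirm a cache is actually in effect, a small sketch: `storageLevel` reports the level requested via `cache()`/`persist()`, and the catalog API covers cached tables (the `active_rows` view name is illustrative; the Spark UI's Storage tab shows what is actually resident):

```python
df_filtered.cache()
df_filtered.count()                            # materialize

# The level requested via cache()/persist()
print(df_filtered.storageLevel)

# Catalog API for cached tables and views
df_filtered.createOrReplaceTempView("active_rows")
spark.catalog.cacheTable("active_rows")
print(spark.catalog.isCached("active_rows"))   # True

# Drop everything cached in this session
spark.catalog.clearCache()
```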
### Pattern 4: Memory Tuning

```python
# Executor memory configuration
# spark-submit --executor-memory 8g --executor-cores 4

# Memory breakdown (8GB executor, approximate; Spark first reserves ~300MB):
# - spark.memory.fraction = 0.6 (60% = 4.8GB shared by execution + storage)
# - spark.memory.storageFraction = 0.5 (50% of 4.8GB = 2.4GB protected for cache)
# - Remaining 2.4GB for execution (shuffles, joins, sorts)
# - The other 40% = 3.2GB for user data structures and internal metadata

spark = (SparkSession.builder
    .config("spark.executor.memory", "8g")
    .config("spark.executor.memoryOverhead", "2g")  # For non-JVM memory
    .config("spark.memory.fraction", "0.6")
    .config("spark.memory.storageFraction", "0.5")
    .config("spark.sql.shuffle.partitions", "200")
    # Raise the broadcast threshold (default 10MB)
    .config("spark.sql.autoBroadcastJoinThreshold", "50MB")
    # Cap input split size to avoid OOM on large scans
    .config("spark.sql.files.maxPartitionBytes", "128MB")
    .getOrCreate())

# Monitor memory usage via Spark's documented monitoring REST API
import json
import urllib.request

def print_memory_usage(spark):
    """Print per-executor storage memory (requires the Spark UI to be enabled)."""
    sc = spark.sparkContext
    base = f"{sc.uiWebUrl}/api/v1/applications/{sc.applicationId}"
    with urllib.request.urlopen(f"{base}/executors") as resp:
        for e in json.load(resp):
            used = e["memoryUsed"] / (1024**3)
            total = e["maxMemory"] / (1024**3)
            print(f"{e['id']}: {used:.2f}GB used of {total:.2f}GB storage memory")
```

### Pattern 5: Shuffle Optimization

```python
# Reduce shuffle data size
# With AQE and coalescePartitions enabled, Spark right-sizes post-shuffle
# partitions at runtime; the static value below is only the initial count.
# (The literal value "auto" is accepted only on Databricks.)
spark.conf.set("spark.sql.shuffle.partitions", "200")
spark.conf.set("spark.shuffle.compress", "true")
spark.conf.set("spark.shuffle.spill.compress", "true")

# Pre-aggregate before the wide shuffle
df_optimized = (df
    # Local aggregation first (combiner-style)
    .groupBy("key", "partition_col")
    .agg(F.sum("value").alias("partial_sum"))
    # Then global aggregation
    .groupBy("key")
    .agg(F.sum("partial_sum").alias("total")))

# Prefer approximate aggregates when exact results aren't required
# BAD: shuffles every distinct value
distinct_count = df.select("category").distinct().count()

# GOOD: approximate distinct shuffles only small sketches
approx_count = df.select(F.approx_count_distinct("category")).collect()[0][0]

# Use coalesce instead of repartition when reducing partitions
df_reduced = df.coalesce(10)  # No shuffle

# Optimize shuffle with compression
spark.conf.set("spark.io.compression.codec", "lz4")  # Fast compression
```

### Pattern 6: Data Format Optimization

```python
# Parquet optimizations
(df.write
    .option("compression", "snappy")  # Fast compression
    .option("parquet.block.size", 128 * 1024 * 1024)  # 128MB row groups
    .parquet("s3://bucket/output/"))

# Column pruning - only read needed columns
df = (spark.read.parquet("s3://bucket/data/")
    .select("id", "amount", "date"))  # Spark only reads these columns

# Predicate pushdown - filter at the storage level
df = (spark.read.parquet("s3://bucket/partitioned/year=2024/")
    .filter(F.col("status") == "active"))  # Pushed to the Parquet reader

# Delta Lake optimizations
(df.write
    .format("delta")
    .option("optimizeWrite", "true")  # Bin-packing
    .option("autoCompact", "true")    # Compact small files
    .mode("overwrite")
    .save("s3://bucket/delta_table/"))

# Z-ordering for multi-dimensional queries
spark.sql("""
    OPTIMIZE delta.`s3://bucket/delta_table/`
    ZORDER BY (customer_id, date)
""")
```
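To verify that column pruning and predicate pushdown actually fired, inspect the physical plan. A minimal sketch with illustrative column names; the exact plan text varies by Spark version:

```python
checked = (spark.read.parquet("s3://bucket/data/")
    .select("id", "amount")
    .filter(F.col("amount") > 100))

# In the FileScan node, ReadSchema should list only id and amount,
# and the filter should appear under PushedFilters, e.g.
#   PushedFilters: [IsNotNull(amount), GreaterThan(amount,100)]
checked.explain(mode="formatted")
```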
### Pattern 7: Monitoring and Debugging

```python
# Confirm key execution features are on (defaults in recent Spark versions)
spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Explain the query plan
df.explain(mode="extended")
# Modes: simple, extended, codegen, cost, formatted

# Get physical plan statistics
df.explain(mode="cost")

# Monitor task metrics
def analyze_stage_metrics(spark):
    """Print task counts for currently active stages."""
    status_tracker = spark.sparkContext.statusTracker()

    for stage_id in status_tracker.getActiveStageIds():
        stage_info = status_tracker.getStageInfo(stage_id)
        print(f"Stage {stage_id}:")
        print(f"  Tasks: {stage_info.numTasks}")
        print(f"  Completed: {stage_info.numCompletedTasks}")
        print(f"  Failed: {stage_info.numFailedTasks}")

# Identify data skew
def check_partition_skew(df):
    """Report row counts per partition and a max/avg skew ratio."""
    partition_counts = (df
        .withColumn("partition_id", F.spark_partition_id())
        .groupBy("partition_id")
        .count()
        .orderBy(F.desc("count")))

    partition_counts.show(20)

    stats = partition_counts.select(
        F.min("count").alias("min"),
        F.max("count").alias("max"),
        F.avg("count").alias("avg"),
        F.stddev("count").alias("stddev")
    ).collect()[0]

    skew_ratio = stats["max"] / stats["avg"]
    print(f"Skew ratio: {skew_ratio:.2f}x (>2x indicates skew)")
```

## Configuration Cheat Sheet

```python
# Production configuration template
spark_configs = {
    # Adaptive Query Execution (AQE)
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",

    # Memory
    "spark.executor.memory": "8g",
    "spark.executor.memoryOverhead": "2g",
    "spark.memory.fraction": "0.6",
    "spark.memory.storageFraction": "0.5",

    # Parallelism
    "spark.sql.shuffle.partitions": "200",
    "spark.default.parallelism": "200",

    # Serialization
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.execution.arrow.pyspark.enabled": "true",

    # Compression
    "spark.io.compression.codec": "lz4",
    "spark.shuffle.compress": "true",

    # Broadcast
    "spark.sql.autoBroadcastJoinThreshold": "50MB",

    # File handling
    "spark.sql.files.maxPartitionBytes": "128MB",
    "spark.sql.files.openCostInBytes": "4MB",
}
```

## Best Practices

### Do's
- **Enable AQE** - Adaptive Query Execution handles many issues automatically
- **Use Parquet/Delta** - Columnar formats with compression
- **Broadcast small tables** - Avoid shuffles for small joins
- **Monitor the Spark UI** - Check for skew, spills, GC
- **Right-size partitions** - 128-256MB per partition

### Don'ts
- **Don't collect large data** - Keep data distributed
- **Don't use UDFs unnecessarily** - Use built-in functions
- **Don't over-cache** - Memory is limited
- **Don't ignore data skew** - It dominates job time
- **Don't use `.count()` for existence** - Use `.take(1)` or `.isEmpty()`; see the sketch below
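A quick illustration of the last tip: `DataFrame.isEmpty()` exists in PySpark 3.3+, while `.take(1)` works everywhere (the `status` filter is illustrative):

```python
failed = df.filter(F.col("status") == "failed")

# COSTLY: counts every matching row just to test existence
if failed.count() > 0:
    print("found failures")

# CHEAP: stops after finding a single row
if len(failed.take(1)) > 0:
    print("found failures")

# Equivalent, PySpark 3.3+
if not failed.isEmpty():
    print("found failures")
```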
## Resources

- [Spark Performance Tuning](https://spark.apache.org/docs/latest/sql-performance-tuning.html)
- [Spark Configuration](https://spark.apache.org/docs/latest/configuration.html)
- [Databricks Optimization Guide](https://docs.databricks.com/en/optimizations/index.html)