PySpark Performance Tuning: From 4 Hours to 20 Minutes

Practical PySpark optimizations that reduced a production pipeline from 4 hours to 20 minutes — covering data skew, broadcast joins, partition sizing, and AQE.

Chris P


A production PySpark pipeline processing 2TB of daily clickstream data was taking over 4 hours to complete. After six weeks of systematic profiling and tuning, we reduced it to 20 minutes. This article documents every optimization we applied, measured independently, so you can pick the ones relevant to your own workloads.

The pipeline reads raw clickstream events from S3, joins them with four dimension tables (users, products, regions, currencies), computes session-level aggregations, and writes enriched output back to Iceberg. The cluster runs on EMR with 20 r5.2xlarge workers. Nothing exotic — a typical large-scale ETL job.

Before tuning anything, we established a measurement baseline. Every optimization was tested in isolation on a staging cluster with the same data and hardware. We used Spark's built-in web UI, event logs, and per-stage shuffle read and write metrics to identify bottlenecks, interpreted alongside the spark.sql.shuffle.partitions setting, which determines how many tasks each shuffle stage runs. Without this discipline, you end up applying five changes at once and never knowing which one helped.

Optimization 1: Diagnosing and Fixing Data Skew with Salted Joins

The Spark UI showed a classic skew pattern: 198 out of 200 tasks finished in under 2 minutes, but 2 tasks ran for 3.5 hours. Those 2 tasks were processing join keys belonging to bot accounts that generated 40 percent of all events. The fix is salting — adding a random bucket to the join key to distribute skewed keys across multiple tasks.

```python
import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession


def salt_skewed_join(
    spark: SparkSession,
    left: DataFrame,
    right: DataFrame,
    join_key: str,
    salt_buckets: int = 20,
    how: str = "inner",
) -> DataFrame:
    """Distribute skewed join keys across salt buckets to prevent
    hot partitions. The left (large) side gets a random salt;
    the right (small) side is exploded across all buckets."""
    # Assign each left-side row a random bucket in [0, salt_buckets)
    salted_left = left.withColumn(
        "_salt", (F.rand() * salt_buckets).cast("int")
    )

    # Replicate every right-side row once per bucket
    salt_range = spark.range(salt_buckets).withColumnRenamed("id", "_salt")
    exploded_right = right.crossJoin(salt_range)

    # Use "inner" or "left" only: right and full outer joins would emit
    # unmatched right-side rows once per bucket
    result = salted_left.join(
        exploded_right,
        on=[join_key, "_salt"],
        how=how
    ).drop("_salt")

    return result


# Usage: the users dimension has heavy skew on bot accounts
enriched = salt_skewed_join(
    spark, events_df, users_df, "user_id", salt_buckets=20
)
```

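Salting is not free. It increases the size of the right (dimension) side by a factor of salt_buckets. For a 200MB users table with 20 buckets, that is about 4GB: still small next to the fact table, but too large to broadcast comfortably (Spark refuses to broadcast tables over 8GB, and practical limits are far lower), so expect the salted join to remain a shuffle join. If your dimension table is already large, reduce the bucket count or consider filtering the skewed keys into a separate code path.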
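
The size arithmetic above is easy to script when choosing a bucket count. The following is a minimal sketch in plain Python; the helper names and the 1GB budget are illustrative assumptions, not part of the original pipeline.

```python
def salted_size_mb(dim_size_mb: float, salt_buckets: int) -> float:
    """Size of the dimension side after it is replicated once per bucket."""
    return dim_size_mb * salt_buckets


def max_buckets_within(dim_size_mb: float, budget_mb: float) -> int:
    """Largest bucket count whose exploded dimension stays within budget_mb."""
    return max(1, int(budget_mb // dim_size_mb))


# The example from the text: a 200MB users table with 20 buckets
print(salted_size_mb(200, 20))        # 4000 (MB, roughly 4GB)
# Hypothetical budget of 1GB for the exploded table
print(max_buckets_within(200, 1024))  # 5
```

Fewer buckets mean less replication but also less spreading of the hot keys, so the bucket count is a trade-off to measure rather than a value to compute once.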

Impact: 4 hours down to 1 hour 30 minutes. This single optimization was the biggest win because it eliminated the long-tail tasks that were holding up the entire stage.
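
The skew signature described in this section (a couple of tasks far slower than the rest) can also be checked programmatically once task durations have been exported from the UI or event log. This is a rough sketch: the function name and the 5x-median cutoff are assumptions chosen for illustration.

```python
from statistics import median


def find_stragglers(durations_s, factor=5.0):
    """Return (index, duration) for tasks slower than factor x the median."""
    mid = median(durations_s)
    return [(i, d) for i, d in enumerate(durations_s) if d > factor * mid]


# Shape of the stage described above: 198 quick tasks, 2 long-running ones
durations = [100.0] * 198 + [12600.0] * 2   # 12600s = 3.5 hours
stragglers = find_stragglers(durations)
print(len(stragglers))  # 2
```

Comparing against the median rather than the mean matters here: a few extreme tasks inflate the mean enough to hide themselves.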

Optimization 2: Broadcast Joins for Dimension Tables

After fixing skew, the Spark UI still showed large shuffles on the dimension joins. The products table (120MB), regions table (5MB), and currencies table (1MB) were all being shuffle-joined with the 2TB fact table. Broadcasting these small tables eliminates the shuffle entirely — each executor gets a full copy of the dimension table in memory.

```python
from pyspark.sql.functions import broadcast

# Raise the automatic broadcast threshold to 500MB. This affects only joins
# without an explicit hint, and only when Spark's size estimate is accurate.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 500 * 1024 * 1024)

enriched = (
    events_df
    .join(broadcast(products_df), "product_id")
    .join(broadcast(regions_df), "region_code")
    .join(broadcast(currencies_df), "currency_code")
)

# Verify broadcast is being used in the plan
enriched.explain(True)
# Look for BroadcastHashJoin in the physical plan.
# If an unhinted join shows SortMergeJoin, Spark's size estimate for that
# table exceeded the threshold.
```

A common mistake is relying solely on autoBroadcastJoinThreshold. Spark estimates table sizes from catalog statistics, and those estimates are often wrong, especially for views, filtered DataFrames, or tables without ANALYZE TABLE stats. The explicit broadcast() hint overrides the estimate, so Spark plans a broadcast join whenever the join type supports one. The hint does not protect you from memory pressure: the broadcast table must still fit in driver and executor memory.
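
Checking the plan by eye gets tedious across several joins. As a rough sketch, the plan text can be scanned for join operator names. The function name and the sample plan below are hypothetical; how you capture the plan text (for example by redirecting the output of explain()) depends on your PySpark version and is left as an assumption.

```python
def join_strategies(plan_text: str) -> dict:
    """Count join operators by name in the text of a physical plan."""
    names = ("BroadcastHashJoin", "SortMergeJoin", "ShuffledHashJoin")
    return {name: plan_text.count(name) for name in names}


# Hypothetical, heavily abbreviated plan text for illustration only
sample_plan = """
== Physical Plan ==
*(3) Project [...]
+- *(3) BroadcastHashJoin [region_code], [region_code], Inner, BuildRight
   +- *(3) SortMergeJoin [product_id], [product_id], Inner
"""
counts = join_strategies(sample_plan)
print(counts["SortMergeJoin"])  # 1
```

A non-zero SortMergeJoin count on a dimension join is the signal to check whether the hint was applied to the right DataFrame.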

Impact: 1 hour 30 minutes down to 40 minutes. The shuffle elimination saved both network I/O and the sort phase that SortMergeJoin requires.

Optimization 3: Partition Sizing — Coalescing Small Files

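The raw clickstream data on S3 consisted of 50,000 files, most under 1MB. Spark created one task per file, and the overhead of scheduling 50,000 tasks, plus the per-task cost of opening an S3 object and launching the task, dwarfed the actual computation. (Tasks run as threads inside the executor JVM, so the cost is per-task bookkeeping rather than JVM startup.) We repartitioned the input to aim for ~128MB partitions, which matches the default of spark.sql.files.maxPartitionBytes and is a common target size.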

```python
raw_df = spark.read.parquet("s3://lake/clickstream/dt=2025-12-19/")
print(f"Input partitions: {raw_df.rdd.getNumPartitions()}")
# Output: 50,247

# Repartition by logical key to co-locate related records.
# 200 is the count used in this pipeline; derive yours from input size / ~128MB.
optimized = raw_df.repartition(200, "user_id")
print(f"Optimized partitions: {optimized.rdd.getNumPartitions()}")
# Output: 200

# Write output with target file size for downstream consumers.
# With the default (static) partitionOverwriteMode, "overwrite" replaces the
# whole output path; set spark.sql.sources.partitionOverwriteMode=dynamic
# to replace only the partitions being written.
(
    optimized
    .sortWithinPartitions("event_ts")
    .write
    .option("maxRecordsPerFile", 1_000_000)
    .partitionBy("event_date")
    .mode("overwrite")
    .parquet("s3://lake/clickstream_optimized/")
)
```

Use repartition() when you need to increase or decrease the number of partitions, or to redistribute data by key; it always performs a full shuffle. Use coalesce() only when reducing the partition count and you want to avoid a shuffle, but be aware that coalesce merges existing partitions as they are, so it produces uneven partitions if the source data is skewed.
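
To turn a target partition size into a partition count, divide the input size by the target and round up. The sketch below assumes you already know the input size in bytes (for instance from an S3 listing); the function name and the 70GB figure are illustrative.

```python
import math


def target_partitions(total_bytes: int, target_partition_bytes: int = 128 * 1024**2) -> int:
    """Number of partitions needed so each holds about target_partition_bytes."""
    return max(1, math.ceil(total_bytes / target_partition_bytes))


# Hypothetical input of 70GB, aiming for 128MB partitions
print(target_partitions(70 * 1024**3))  # 560
```

The result would be passed as the first argument to repartition(). Sizes on disk are compressed, so in-memory partitions will be larger than this estimate suggests.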

Impact: 40 minutes down to 25 minutes. The task scheduling overhead dropped from 12 minutes to under 30 seconds.

Optimization 4: Predicate Pushdown and Partition Pruning

The pipeline was reading the full 2TB dataset and filtering to the last 24 hours after loading everything into memory. Since the data is partitioned by event_date on S3, applying the filter before any transformation lets Spark skip 29 out of 30 day-partitions entirely.

```python
import pyspark.sql.functions as F

# Before: reads all partitions, then filters (2TB scanned)
df = spark.read.parquet("s3://lake/clickstream/")
df = df.filter(F.col("event_date") == "2025-12-19")
# Spark may still scan all files depending on the plan

# After: partition pruning reads only the target date (~70GB)
df = (
    spark.read
    .parquet("s3://lake/clickstream_optimized/")
    .filter(F.col("event_date") == "2025-12-19")
)

# Verify partition pruning in the plan
df.explain(True)
# Look for: PartitionFilters: [isnotnull(event_date), (event_date = 2025-12-19)]
# Filters on partition columns appear under PartitionFilters;
# PushedFilters lists predicates on data columns pushed into the Parquet scan
```

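The key detail: partition pruning works only if the filter column is one of the columns the data is physically partitioned by on disk. If you filter on a non-partition column, Spark must still list and open every file; Parquet predicate pushdown can skip row groups using min/max statistics, but the savings are usually far smaller than skipping whole directories. Pruning can also be defeated when the partition column is wrapped in a UDF, a cast, or another expression the optimizer cannot see through, and it is less predictable through complex views and subqueries. Keep your filters simple and directly on partition columns.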
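
Conceptually, partition pruning is a decision made from directory names alone, before any file is opened. The plain-Python sketch below mimics that selection over a hypothetical file listing. It is a simplified model for intuition, not how Spark implements pruning internally.

```python
import re


def prune_paths(paths, column, value):
    """Keep only file paths whose Hive-style partition directory matches."""
    pattern = re.compile(rf"/{re.escape(column)}=([^/]+)/")
    kept = []
    for p in paths:
        match = pattern.search(p)
        if match and match.group(1) == value:
            kept.append(p)
    return kept


# Hypothetical file listing with one file per day
paths = [
    f"s3://lake/clickstream_optimized/event_date=2025-12-{day:02d}/part-0.parquet"
    for day in range(1, 31)
]
selected = prune_paths(paths, "event_date", "2025-12-19")
print(len(selected), "of", len(paths))  # 1 of 30
```

A filter on a column that does not appear in the path gives this selection nothing to work with, which is why every file then has to be opened.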

Impact: 25 minutes down to 20 minutes. Most of the savings came from reduced S3 read volume.

Optimization 5: Adaptive Query Execution (AQE)

Spark 3.0 introduced AQE (enabled by default since Spark 3.2), which re-optimizes the query plan at runtime based on actual data statistics collected during shuffle stages. It can automatically coalesce small post-shuffle partitions, switch join strategies from sort-merge to broadcast when it discovers a side is small, and optimize skew joins without manual salting.

```python
# Enable AQE and its sub-features
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64MB")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

# AQE works best with a high initial shuffle partition count
# It will coalesce automatically after gathering runtime stats
spark.conf.set("spark.sql.shuffle.partitions", "2000")
```

With AQE enabled, you can set shuffle.partitions to a high number (like 2000) without worrying about creating too many tiny partitions. AQE coalesces them automatically based on actual data volume. This eliminates the tedious manual tuning of shuffle partition counts that older Spark versions required.
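
To build intuition for what coalescing does, the sketch below greedily merges adjacent partitions up to an advisory size. It is a simplified model written for this article, not Spark's actual algorithm; the function name and sample sizes are assumptions, and the 64MB default mirrors the default of spark.sql.adaptive.advisoryPartitionSizeInBytes.

```python
def coalesce_partitions(sizes_mb, advisory_mb=64):
    """Greedily merge adjacent partitions until adding the next one
    would push the group past advisory_mb. Returns the merged group sizes."""
    groups = []
    current = 0
    for size in sizes_mb:
        if current > 0 and current + size > advisory_mb:
            groups.append(current)
            current = 0
        current += size
    if current > 0:
        groups.append(current)
    return groups


# Hypothetical post-shuffle partition sizes in MB
sizes = [10, 20, 30, 40, 5, 5, 70, 8]
print(coalesce_partitions(sizes))  # [60, 50, 70, 8]
```

Note that the 70MB partition is left alone: coalescing merges small partitions but never splits large ones, which is why skew handling is a separate AQE feature.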

We applied AQE after the manual optimizations above. It provided an additional 5-8 percent improvement by catching residual skew and coalescing intermediate partitions. AQE is not a replacement for proper data layout and join design — it is a safety net that handles the cases your manual tuning misses.

Optimization 6: Kryo Serialization

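Spark's default Java serializer is general-purpose but slow. Kryo is significantly faster and more compact for most workloads (the Spark tuning guide cites gains of up to 10x). The tradeoff is that you need to register custom classes for best results, but for PySpark pipelines working with DataFrames (not RDDs of custom objects), the switch is nearly transparent. Keep expectations modest for pure DataFrame code: DataFrame rows are already shuffled in Spark's internal binary format, so Kryo mainly helps RDD code paths and broadcast variables.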

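```python
from pyspark.sql import SparkSession

# spark.serializer is static: set it before the session starts (or via
# spark-submit --conf); spark.conf.set() cannot change it at runtime.
spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryoserializer.buffer", "512k")
    .config("spark.kryoserializer.buffer.max", "1024m")
    .config("spark.kryo.registrationRequired", "false")
    .getOrCreate()
)
```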

Kryo serialization primarily affects JVM objects that pass through RDD shuffles and broadcast variable distribution. If your pipeline is shuffle-heavy on those paths, the improvement can be significant. In our case, the combination of Kryo and AQE reduced shuffle write volume by 35 percent; we did not separate the two effects.

Optimization 7: Strategic Caching

Caching is the most misused Spark optimization. Teams cache everything and run out of memory, or they cache DataFrames that are only read once. The rule: cache a DataFrame only if it is read more than once in the same job and the materialized size fits in cluster memory.

```python
from pyspark import StorageLevel
from pyspark.sql.functions import broadcast

# Cache the enriched DataFrame: it is used twice downstream
enriched = (
    events_df
    .join(broadcast(products_df), "product_id")
    .join(broadcast(regions_df), "region_code")
)

# MEMORY_AND_DISK avoids recomputation if memory is tight
enriched.persist(StorageLevel.MEMORY_AND_DISK)
enriched.count()  # Force materialization before branching

# Branch 1: session aggregation
sessions = compute_sessions(enriched)
sessions.write.parquet("s3://lake/sessions/")

# Branch 2: funnel analysis
funnels = compute_funnels(enriched)
funnels.write.parquet("s3://lake/funnels/")

# Release memory after both branches complete
enriched.unpersist()
```

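Always call unpersist() when you are done. Forgetting to release cached DataFrames is a common source of out-of-memory errors in long-running Spark applications. Remember that persist() is lazy: the cache is populated by the first action that touches the DataFrame. Forcing materialization with an action like count() before branching is optional, but it fills the cache in one explicit pass, which makes the caching cost visible in the Spark UI and keeps it out of the timings of the downstream branches.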

Optimization 8: Memory Management and GC Tuning

Executor JVM sizing is often the difference between steady throughput and long tail times caused by frequent GC or OOM kills. Set spark.executor.memory for the heap used by tasks and caches, and spark.executor.memoryOverhead so off-heap structures, direct buffers, native libraries, and (in PySpark) the Python worker processes have headroom beyond the heap. The unified memory manager, the default since Spark 1.6, shares one pool between execution and storage: spark.memory.fraction sets the share of the heap (after a fixed 300MB reserve) that Spark may use for that pool, while spark.memory.storageFraction sets the portion of the pool in which cached blocks are protected from eviction by execution. Pair sizing with G1GC for large heaps: enable G1, tune pause targets, and set the region size only when profiling shows a benefit.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("tuned-memory")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.memoryOverhead", "4g")
    .config("spark.memory.fraction", "0.8")
    .config("spark.memory.storageFraction", "0.3")
    .config("spark.executor.extraJavaOptions",
            "-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:+ParallelRefProcEnabled")
    .getOrCreate()
)
```
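
It helps to work out what these settings mean in megabytes. The sketch below applies the unified memory model (heap minus a fixed 300MB reserve, multiplied by spark.memory.fraction) to the values in the configuration above. The function and key names are illustrative, and the figures are approximate because the JVM reports slightly less usable heap than the configured maximum.

```python
RESERVED_MB = 300  # fixed reserve Spark subtracts from the heap


def memory_layout(heap_mb, overhead_mb, fraction, storage_fraction):
    """Approximate sizes (in MB) of the executor memory regions."""
    unified = (heap_mb - RESERVED_MB) * fraction
    return {
        "container_total": heap_mb + overhead_mb,
        "unified_pool": unified,
        "protected_storage": unified * storage_fraction,
        "user_memory": (heap_mb - RESERVED_MB) * (1 - fraction),
    }


# Settings from the configuration above: 16g heap, 4g overhead, 0.8, 0.3
layout = memory_layout(16 * 1024, 4 * 1024, 0.8, 0.3)
for name, mb in layout.items():
    print(f"{name}: {mb:,.0f} MB")
```

The container total (heap plus overhead) is what the cluster manager must be able to place on a node, so it is the number to check against instance memory when choosing executors per worker.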

Optimization 9: Column Pruning and Projection Pushdown

Wide tables amplify shuffle bytes, serialization cost, and memory spikes because Spark materializes all columns unless the optimizer can prove they are unused. Selecting only the columns you need as early as possible keeps row batches smaller in memory, reduces Parquet/ORC I/O via column pruning and predicate pushdown at the source, and shrinks shuffle partitions during joins and aggregations. Treat select as part of your physical plan design, not a cosmetic refactor: push projections before joins, window functions, and UDF-heavy transforms so downstream operators never carry dormant fields.

```python
# Before: carries unused columns through heavy transforms
raw = spark.read.parquet("s3://lake/facts_orders")
dim_customer = spark.read.parquet("s3://lake/dim_customer")
joined = raw.join(dim_customer, "customer_id").filter("order_date >= '2024-01-01'")

# After: prune early, then join
orders = spark.read.parquet("s3://lake/facts_orders").select(
    "order_id", "customer_id", "order_date", "order_total"
)
customers = spark.read.parquet("s3://lake/dim_customer").select(
    "customer_id", "country", "segment"
)
joined = orders.join(customers, "customer_id").filter("order_date >= '2024-01-01'")
```

Monitoring Your Tuning: The Spark UI Checklist

Treat the Spark UI as your regression suite: after each change, compare shuffle read and write totals, maximum shuffle spill to memory and disk, and whether spill moved between stages. Open the Stages tab and scan the task duration distribution; a few stragglers with much higher duration than the median usually indicate skew, bad partitioning, or expensive UDFs on hot keys. In the Executors view, watch Task Time versus GC Time; rising GC Time as a fraction of task time signals heap pressure or oversized cached objects. For automated diffing across runs, parse the event log JSON lines and aggregate listener records into counters you can track in CI.

```python
import gzip
import json
from collections import defaultdict
from pathlib import Path


def scan_event_log(event_dir: str) -> dict:
    """Sum task-level metrics over gzipped event log files in event_dir."""
    totals = defaultdict(float)
    # Assumes the logs were gzipped after the fact; adjust the glob and opener otherwise
    for path in Path(event_dir).glob("*.gz"):
        with gzip.open(path, "rt", encoding="utf-8") as handle:
            for line in handle:
                evt = json.loads(line)
                if evt.get("Event") == "SparkListenerTaskEnd":
                    m = evt.get("Task Metrics") or {}  # may be missing for failed tasks
                    sm = m.get("Shuffle Read Metrics", {})
                    sw = m.get("Shuffle Write Metrics", {})
                    # Shuffle reads are logged as separate remote and local byte counts
                    totals["shuffle_read_bytes"] += (
                        sm.get("Remote Bytes Read", 0) + sm.get("Local Bytes Read", 0)
                    )
                    totals["shuffle_write_bytes"] += sw.get("Shuffle Bytes Written", 0)
                    totals["memory_bytes_spilled"] += m.get("Memory Bytes Spilled", 0)
                    totals["disk_bytes_spilled"] += m.get("Disk Bytes Spilled", 0)
                    totals["jvm_gc_time_ms"] += m.get("JVM GC Time", 0)
                    totals["executor_run_time_ms"] += m.get("Executor Run Time", 0)
    return dict(totals)


print(scan_event_log("/spark/history/application_123_456"))
```
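
Raw totals are most useful as ratios and run-to-run comparisons. The following sketch derives a GC fraction and flags counters that grew between two runs. It assumes dictionaries with the same keys as the scan above; the function names, the 10 percent tolerance, and the sample numbers are hypothetical.

```python
def summarize_run(totals: dict) -> dict:
    """Derive ratios from the counters produced by an event log scan."""
    run_ms = totals.get("executor_run_time_ms", 0)
    gc_ms = totals.get("jvm_gc_time_ms", 0)
    return {
        "gc_fraction": gc_ms / run_ms if run_ms else 0.0,
        "spilled_to_disk": totals.get("disk_bytes_spilled", 0) > 0,
    }


def regressed(before: dict, after: dict, key: str, tolerance: float = 0.10) -> bool:
    """True if `key` grew by more than `tolerance` (10% by default) between runs."""
    old, new = before.get(key, 0), after.get(key, 0)
    return new > old * (1 + tolerance)


# Hypothetical totals from two runs
baseline = {"executor_run_time_ms": 1_000_000, "jvm_gc_time_ms": 50_000,
            "shuffle_write_bytes": 800, "disk_bytes_spilled": 0}
candidate = {"executor_run_time_ms": 900_000, "jvm_gc_time_ms": 180_000,
             "shuffle_write_bytes": 820, "disk_bytes_spilled": 0}

print(summarize_run(candidate)["gc_fraction"])                # 0.2
print(regressed(baseline, candidate, "jvm_gc_time_ms"))       # True
print(regressed(baseline, candidate, "shuffle_write_bytes"))  # False
```

In this made-up comparison the candidate run finished sooner but spent a fifth of its task time in GC, which is the kind of trade a wall-clock number alone would hide.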

The cumulative improvement: skew fix (4h to 1.5h), broadcast joins (1.5h to 40min), partition optimization (40min to 25min), predicate pushdown (25min to 20min), AQE + Kryo + caching + memory tuning (20min to 18min). The lesson: data layout and join strategy dominate PySpark performance. AQE, serialization, and caching are polishing steps. If your pipeline is slow, check the Spark UI for skew and shuffle volume before touching any config knob.
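
The claim that layout and join strategy dominate can be checked against the runtimes listed above. This short script computes each step's speedup and its share of the total time saved, treating the baseline as 240 minutes (the text says "over 4 hours", so the first share is a lower bound).

```python
# Runtimes in minutes after each step, taken from the summary above
steps = [
    ("baseline", 240),
    ("skew fix", 90),
    ("broadcast joins", 40),
    ("partition optimization", 25),
    ("predicate pushdown", 20),
    ("AQE + Kryo + caching + memory tuning", 18),
]

total_saved = steps[0][1] - steps[-1][1]
savings = {}
for (_, before), (name, after) in zip(steps, steps[1:]):
    savings[name] = before - after
    share = 100 * (before - after) / total_saved
    print(f"{name}: {before / after:.2f}x faster, {share:.0f}% of total time saved")
```

The skew fix and broadcast joins together account for 200 of the 222 minutes saved, which is the arithmetic behind the advice to look at skew and shuffle volume first.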
