Resolving OutOfMemory (OOM) Errors in PySpark: Best Practices
1️⃣ Adjust Spark Configuration (Memory Management)
Increase Executor Memory: spark.executor.memory (e.g. "8g")
Increase Driver Memory: spark.driver.memory (e.g. "4g")
Set Executor Cores: spark.executor.cores (e.g. "2")
Note: these are launch-time settings. Calling spark.conf.set on a running session will not resize existing executors; set them on the SparkSession builder or via spark-submit, as in the sketch below.
Use Disk Persistence: df.persist(StorageLevel.DISK_ONLY)
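A minimal sketch of applying these settings when the session is created (the path and sizes are placeholders, so adjust them to your cluster):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("oom-tuning")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "4g")   # in client mode, pass this via spark-submit instead (the driver JVM is already running)
    .config("spark.executor.cores", "2")
    .getOrCreate()
)

df = spark.read.parquet("path/to/data")   # placeholder input path
df.persist(StorageLevel.DISK_ONLY)        # keep blocks on disk instead of executor memory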
2️⃣ Enable Dynamic Allocation
Let Spark scale the number of executors up and down with the workload. Like the memory settings above, these must be set at application start, and they need either the external shuffle service or shuffle tracking:
spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.minExecutors = 1
3️⃣ Enable Adaptive Query Execution (AQE)
Enable AQE so Spark re-optimizes query plans at runtime from actual shuffle statistics, coalescing small partitions and splitting skewed ones:
spark.conf.set("spark.sql.adaptive.enabled", "true")
4️⃣ Enforce a Schema for Semi-Structured Data
Supplying an explicit schema avoids the extra pass over the files that schema inference needs:
df = spark.read.schema(schema).json("path/to/data")
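A minimal sketch, assuming a simple three-field record (the field names and types are placeholders for your data):

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("created_at", TimestampType(), True),
])

df = spark.read.schema(schema).json("path/to/data")  # no inference pass over the files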
5️⃣ Tune the Number of Partitions
Too few partitions mean oversized tasks that can OOM; too many add scheduling overhead. Repartition (full shuffle) or coalesce (no shuffle) to hit a sensible size:
df = df.repartition(200, "column_name")
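A short sketch (200, 50, and "column_name" are placeholders; a common rule of thumb is partitions of roughly 100-200 MB):

print(df.rdd.getNumPartitions())          # inspect the current partition count

df = df.repartition(200, "column_name")   # full shuffle: redistributes rows by the given key
df = df.coalesce(50)                      # no full shuffle: only merges existing partitions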
6️⃣ Handle Data Skew Dynamically
Use salting for skewed joins: append a random bucket to the hot key on the skewed side and replicate the other side once per bucket, as in the sketch below:
df1 = df1.withColumn("join_key_salted", F.concat_ws("_", F.col("join_key"), F.floor(F.rand() * 10).cast("string")))
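A fuller sketch of that salting pattern with 10 buckets (df1 is assumed to be the skewed side; all names are placeholders):

from pyspark.sql import functions as F

n_salts = 10  # placeholder bucket count; size it to how skewed the key is

# Skewed side: append a random bucket 0..n_salts-1 to the join key.
df1_salted = df1.withColumn(
    "join_key_salted",
    F.concat_ws("_", F.col("join_key"), F.floor(F.rand() * n_salts).cast("string")),
)

# Other side: replicate each row once per bucket so every salted key finds its match.
df2_salted = (
    df2.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(n_salts)])))
       .withColumn("join_key_salted", F.concat_ws("_", F.col("join_key"), F.col("salt").cast("string")))
)

result = df1_salted.join(df2_salted, "join_key_salted", "inner")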
7️⃣ Limit Cache Usage for Large DataFrames
Cache only DataFrames that are reused, prefer a storage level that can spill to disk, and unpersist when you are done:
df.persist(StorageLevel.MEMORY_AND_DISK)
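A minimal sketch, assuming only a reused subset is worth persisting (the filter is a placeholder):

from pyspark import StorageLevel

reused_df = df.filter("event_date >= '2024-01-01'")   # placeholder: some subset reused downstream
reused_df.persist(StorageLevel.MEMORY_AND_DISK)       # spills to disk instead of failing when memory is tight

reused_df.count()      # materialize the cache once
# ... several downstream queries reuse reused_df ...
reused_df.unpersist()  # release the memory as soon as it is no longer needed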
8️⃣ Optimize Joins for Large DataFrames
Broadcast the smaller table so the large one is not shuffled (the broadcast table must fit in memory on every executor):
df_join = large_df.join(broadcast(small_df), "join_key", "left")
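A minimal sketch (table names are placeholders; only broadcast a table that comfortably fits in executor and driver memory):

from pyspark.sql.functions import broadcast

df_join = large_df.join(broadcast(small_df), "join_key", "left")

# Spark also broadcasts automatically below this size threshold (default is 10 MB):
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50MB")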
9️⃣ Monitor Spark Jobs
Use the Spark UI (port 4040 by default) to watch executor memory, GC time, shuffle spill, and task-level skew before jobs start failing.
🔟 Consider Partitioning Strategy
Write output partitioned by a column that downstream jobs filter on, so they read only the slices they need:
df.write.partitionBy("partition_column").parquet("path_to_data")
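A short sketch (the column, path, and filter value are placeholders; prefer a low-cardinality partition column to avoid many tiny files):

(
    df.write
      .mode("overwrite")
      .partitionBy("partition_column")
      .parquet("path_to_data")
)

# Readers that filter on the partition column scan only the matching directories:
subset = spark.read.parquet("path_to_data").filter("partition_column = 'some_value'")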
I have curated top-notch Data Engineering Interview Preparation Resources
👇👇
https://topmate.io/analyst/910180
All the best 👍👍