
I have a PySpark DataFrame that contains 100M rows. I'm trying to do a series of aggregations on multiple columns after a groupBy.

from pyspark.sql.functions import (  # note: sum/min/max shadow the Python builtins in this scope
    collect_set, count, countDistinct, first, max, min, sort_array, sum,
)

df_agg = df.groupBy("colA", "colB", "colC", "colD").agg(
    count("*").alias("numRecords"),
    sort_array(collect_set("colE")).alias("colE_set"),  # collect_set, not collected_set; renamed so it doesn't clash with the sum below
    sum("colE").alias("colE"),
    sum("colF").alias("colF"),
    sum("colG").alias("colG"),
    sum("colH").alias("colH"),
    min("colI").alias("colI"),
    max("colJ").alias("colJ"),
    countDistinct("colK").alias("colK"),
    first("colL").alias("colL"),
    first("colM").alias("colM"),
    first("colN").alias("colN"),
    first("colO").alias("colO"),
    sort_array(collect_set("colP")).alias("colP"),
    sort_array(collect_set("colQ")).alias("colQ"),
    max("colR").alias("colR"),
    max("colS").alias("colS"),
)

colL, colM, colN, and colO are strings, and they have the same value within each group, so I simply want the first (or any) instance.
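Since those columns are constant within each group, a possible alternative (a sketch, assuming they really are one value per group) is to fold them into the grouping keys instead of aggregating them with first():

from pyspark.sql.functions import count, sum as sum_

# Sketch: if colL..colO are truly constant within each (colA..colD) group,
# grouping on them directly is equivalent to first() and avoids four extra
# aggregate buffers. Only a couple of the aggregations are repeated here
# for brevity.
df_agg_alt = (
    df.groupBy("colA", "colB", "colC", "colD", "colL", "colM", "colN", "colO")
      .agg(
          count("*").alias("numRecords"),
          sum_("colF").alias("colF"),
      )
)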

I ran the following, and it failed several times before it finally succeeded. I commented out first("colL").alias("colL"), and it succeeded for the first time. Then I uncommented it, and it succeeded again, but ran noticeably slower.

df_agg[(df_agg["numRecords"] > 5) & (df_agg["numRecords"] < 10)].show(10, truncate=False)

The error I kept getting was Job aborted due to stage failure: ShuffleMapStage 51 (showString at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException

Is there a more efficient way to do these aggregations? I'm only running this on 100M rows for testing, but eventually I will need to run it on a much larger DataFrame, possibly tens or hundreds of billions of rows.
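One change worth benchmarking at that scale, if an approximate distinct count is acceptable: exact countDistinct is planned as a two-phase aggregation that adds a second Exchange to the plan, while approx_count_distinct (HyperLogLog++) is a single-pass aggregate. A minimal sketch, with an assumed (not recommended) 1% relative error:

from pyspark.sql.functions import approx_count_distinct

# Sketch: swap the exact countDistinct("colK") for an approximate count.
# rsd is the relative standard deviation of the estimate; 0.01 (1%) is an
# illustrative choice.
df_approx = df.groupBy("colA", "colB", "colC", "colD").agg(
    approx_count_distinct("colK", rsd=0.01).alias("colK")
)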

asked May 28, 2025 at 6:43
  • You're likely running out of memory or disk space due to the shuffle. You could try increasing executor memory. Commented May 28, 2025 at 6:56
  • Did you try checking data skewness? I see you're performing a lot of aggregations, resulting in a lot of shuffles. You can also try tweaking spark.sql.shuffle.partitions (a sketch of both checks follows these comments). Commented May 28, 2025 at 10:11
  • I did a .explain() on the aggregations, and I see 2 Exchange hashpartitioning, one with the keys colA, colB, colC, colD, and the other further down with the keys colA, colB, colC, colD, colE. So that means there are 2 shuffles? Does the order of the aggregations make any difference? Commented May 29, 2025 at 6:51
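A minimal sketch of the checks suggested above (the partition count and the SparkSession named spark are assumptions, not tuned values):

from pyspark.sql.functions import count, desc

# Assumes an existing SparkSession named `spark` and the DataFrame `df`.
# Widen shuffle parallelism (default is 200); 2000 is illustrative only.
spark.conf.set("spark.sql.shuffle.partitions", "2000")

# Skew check: list the largest groups. A few huge groups relative to the
# rest means some shuffle partitions will be far heavier than others.
(df.groupBy("colA", "colB", "colC", "colD")
   .agg(count("*").alias("n"))
   .orderBy(desc("n"))
   .show(20, truncate=False))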
