I have a PySpark dataframe that contains 100M rows. I'm trying to do a series of aggregations on multiple columns, after a groupby.
from pyspark.sql.functions import (
    count, sum, min, max, first, countDistinct, collect_set, sort_array
)

df_agg = df.groupby("colA", "colB", "colC", "colD").agg(
    count("*").alias("numRecords"),
    sort_array(collect_set("colE")).alias("colE_set"),  # renamed so it doesn't collide with the sum("colE") column below
    sum("colE").alias("colE"),
    sum("colF").alias("colF"),
    sum("colG").alias("colG"),
    sum("colH").alias("colH"),
    min("colI").alias("colI"),
    max("colJ").alias("colJ"),
    countDistinct("colK").alias("colK"),
    first("colL").alias("colL"),
    first("colM").alias("colM"),
    first("colN").alias("colN"),
    first("colO").alias("colO"),
    sort_array(collect_set("colP")).alias("colP"),
    sort_array(collect_set("colQ")).alias("colQ"),
    max("colR").alias("colR"),
    max("colS").alias("colS"),
)
colL, colM, colN, colO are strings, and they have the same value within each group, so I simply want to grab the first (or any) instance.
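Since they are constant within a group, I'm also wondering whether it would be equivalent (and possibly cheaper) to put them in the groupby keys instead of carrying four first() aggregations. This is just a sketch of what I mean, assuming those columns really are constant per (colA, colB, colC, colD) group:

from pyspark.sql.functions import count

df_agg_alt = df.groupby(
    "colA", "colB", "colC", "colD",
    "colL", "colM", "colN", "colO"  # constant within each group, so grouping on them shouldn't change the groups
).agg(
    count("*").alias("numRecords"),
    # ... the remaining aggregations from above, minus the four first() calls ...
)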
I ran the following, and it failed several times before it finally succeeded. When I commented out first("colL").alias("colL"), it succeeded the first time. Then I uncommented it, and it succeeded again, but ran noticeably slower.
df_agg[(df_agg["numRecords"] > 5) & (df_agg["numRecords"] < 10)].show(10, truncate=False)
The error I kept getting was Job aborted due to stage failure: ShuffleMapStage 51 (showString at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException
Is there a more efficient way to do these aggregations? I'm only running this on 100M rows now for testing, but eventually I will need to run it on a much larger dataframe, possibly tens or hundreds of billions of rows.
Should I be tuning spark.sql.shuffle.partitions? I ran .explain() on the aggregations, and I see two Exchange hashpartitioning steps: one with the keys colA, colB, colC, colD, and another further down with the keys colA, colB, colC, colD, colE. So that means there are 2 shuffles? Does the order of the aggregations make any difference?
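For reference, this is how I'm inspecting the plan, plus a variant I'm considering in case the exact distinct count is what introduces the extra shuffle (using approx_count_distinct is just my guess at an alternative, not something I've benchmarked):

# Print the physical plan; each "Exchange hashpartitioning" corresponds to one shuffle.
df_agg.explain(True)

# Sketch of the variant: replace the exact countDistinct with an approximate one,
# which as far as I understand runs as an ordinary aggregate and trades accuracy
# (rsd = target relative error) for skipping the extra distinct-aggregation pass.
from pyspark.sql.functions import approx_count_distinct

df_agg_approx = df.groupby("colA", "colB", "colC", "colD").agg(
    approx_count_distinct("colK", rsd=0.01).alias("colK"),
    # ... the other aggregations unchanged ...
)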