
Spark's Pandas API allows Pandas functions to be performed on top of a Spark DataFrame that looks and behaves like a Pandas DataFrame. Pandas has functions that Spark has no implementation for; the one I care about is the ewm function, which computes an exponentially weighted moving average. I'm having trouble figuring out how to run these functions in a way that takes advantage of Spark's distributed processing: the calculation ends up being performed on a single partition instead of being spread across multiple partitions as desired.
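
For reference, here is what ewm computes in plain Pandas, as a minimal sketch with made-up sample data:

import pandas as pd

# Made-up sample data: a few scores per name, in order
df = pd.DataFrame({"Name": ["a", "a", "a", "b", "b"],
                   "Scores": [1.0, 2.0, 3.0, 4.0, 5.0]})

# Exponentially weighted moving average with span=2, computed per name
df["EWM2"] = df.groupby("Name")["Scores"].transform(lambda s: s.ewm(span=2).mean())
print(df)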

I'm fairly new to Spark/PySpark, so I pursued a couple of avenues to find a solution to the problem of running a function on a subset of data over a window. I explored UDFs first, but it currently appears that UDFs can only apply non-aggregating functions, so I wasn't able to get a function applied over a window of data working that way. After exploring the Pandas API I was able to achieve the functionality I was after, applying the ewm function to my data:

import pandas as pd
import pyspark.pandas as ps

def ewm_2(column: pd.Series[float]) -> pd.Series[float]:
    # Plain pandas: exponentially weighted moving average with span=2
    return column.ewm(span=2).mean()

def calculate_pandas_api(df):
    # Allow operations across differently indexed frames
    ps.set_option("compute.ops_on_diff_frames", True)
    pdf = df.pandas_api()
    # Compute the EWM per Name group
    pdf["EWM2"] = pdf.groupby("Name")["Scores"].transform(ewm_2)
    sdf = pdf.to_spark()
    sdf = sdf.repartition("Name")
    return sdf
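
The function is called on a Spark DataFrame with Name and Scores columns, roughly like this (the sample data here is made up for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up sample data with the same Name and Scores columns
df = spark.createDataFrame(
    [("a", 1.0), ("a", 2.0), ("b", 3.0), ("b", 4.0)],
    ["Name", "Scores"],
)

result = calculate_pandas_api(df)
result.show()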

When the operation ran, though, I saw a stream of warnings about performance degradation, because all of the calculations were being performed on a single partition. The groupby I added certainly achieved the functionality of separating each name for its own ewm calculation, but Spark wasn't able to infer that those operations could be performed independently of each other. I haven't been able to find any resources that address this problem. I briefly explored Fugue, but quickly realized I would run into the same aggregation problem I had when trying to write a UDF. I'm not sure what to try next, so I would love some feedback if anyone is willing to correct any misconceptions I have, or to offer guidance on parallelizing this operation through Spark.

  • You have to apply the pandas function on a grouped dataframe. Here's an example of how to do that: stackoverflow.com/questions/40006395/… Commented Dec 2, 2023 at 11:44
  • You are truly incredible, that is just what I needed! Thank you so much for pointing me in that direction. For the sake of completeness, what would we need to do to parallelize a function declared through the Pandas API? The information on this is somewhat sparse: spark.apache.org/docs/latest/api/python/user_guide/… Commented Dec 2, 2023 at 19:08
  • Actually, Spark takes care of parallel execution. It will shuffle the data to ensure all rows belonging to one group are in one executor, so that the group can be processed by a pandas function. So if you have N independent groups, they will be executed independently. Commented Dec 2, 2023 at 19:42
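
Following the linked example, a minimal sketch of the grouped approach using applyInPandas (column names carried over from the question; row order within each group is assumed to already be correct):

import pandas as pd

def add_ewm2(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf contains every row for a single Name, so plain pandas works here;
    # this assumes rows are already in the desired order within the group
    pdf["EWM2"] = pdf["Scores"].ewm(span=2).mean()
    return pdf

# Spark shuffles the data so each Name's rows land in one task, then runs
# the pandas function on each group in parallel across executors
sdf = df.groupBy("Name").applyInPandas(
    add_ewm2,
    schema="Name string, Scores double, EWM2 double",
)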
