Spark's Pandas API allows Pandas functions to be run on top of a Spark DataFrame that looks and behaves like a Pandas DataFrame. Pandas has functions that Spark has no implementation for; the one I care about is the ewm function, which provides an exponential moving average. I'm having trouble figuring out how to run these functions in a way that takes advantage of Spark's distributed processing: the calculation is being performed on a single partition instead of being spread across multiple partitions as desired.
I'm fairly new to Spark/PySpark, so I pursued a couple of avenues toward solving the problem of running a function on a subset of data over a window. I explored UDFs first, but it appeared to me that UDFs can only apply non-aggregating functions, so I wasn't able to get a function applied to a window of data working. After exploring the Pandas API I was able to achieve the functionality I was after, applying the ewm function to my data:
import pandas as pd
import pyspark.pandas as ps

def ewm_2(column: pd.Series[float]) -> pd.Series[float]:
    # Exponentially weighted moving average with a span of 2.
    return column.ewm(span=2).mean()

def calculate_pandas_api(df):
    # Required so pandas-on-Spark allows operations across differently indexed frames.
    ps.set_option("compute.ops_on_diff_frames", True)
    pdf = df.pandas_api()
    # Compute the EWM separately for each Name.
    pdf["EWM2"] = pdf.groupby("Name")["Scores"].transform(ewm_2)
    sdf = pdf.to_spark()
    sdf = sdf.repartition("Name")
    return sdf
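
For reference, here is a minimal way to exercise the function (the toy data below is made up for illustration; only the Name and Scores column names match my real dataset):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up sample: two groups that should each get their own EWM calculation.
df = spark.createDataFrame(
    [("a", 1.0), ("a", 2.0), ("a", 3.0), ("b", 10.0), ("b", 20.0)],
    ["Name", "Scores"],
)

calculate_pandas_api(df).show()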
When the operation ran, though, I saw a stream of warnings about performance degradation because all of the calculations were being performed on a single partition. The groupby certainly achieved the goal of separating each name out for its own ewm calculation, but Spark wasn't able to infer that those operations could be performed independently of each other. I haven't found any resources that address this problem. I briefly explored Fugue, but quickly realized I would run into the same aggregation problem I had when trying to write a UDF. I'm not sure what to try next, so I would love some feedback: corrections to any misconceptions I have, or guidance toward a solution that parallelizes this operation through Spark.
- You have to apply the pandas function on a grouped dataframe. Here's an example of how to do that: stackoverflow.com/questions/40006395/… (see also the sketch after these comments). – user238607 Commented Dec 2, 2023 at 11:44
- You are truly incredible, that is just what I needed! Thank you so much for pointing me in that direction. For the sake of completeness: what would we need to do to parallelize a function declared through the Pandas API? The information on this is somewhat sparse: spark.apache.org/docs/latest/api/python/user_guide/… – Brian Anderson Commented Dec 2, 2023 at 19:08
- Actually, Spark takes care of parallel execution. It will shuffle the data so that all rows belonging to one group end up on one executor, where they can be processed by a pandas function. So if you have N independent groups, they will be executed independently. – user238607 Commented Dec 2, 2023 at 19:42
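
Following up on the direction in the comments, here is a minimal sketch of the grouped-map approach (Spark's DataFrame.groupBy(...).applyInPandas) using the Name/Scores columns from the question. The toy data and the add_ewm helper are illustrative rather than from the original post, and since ewm is order-sensitive, real data would need an explicit ordering column to sort on inside the function.

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def add_ewm(group: pd.DataFrame) -> pd.DataFrame:
    # Runs as plain pandas on one group's rows, on whichever executor holds them.
    # If row order matters (it does for ewm), sort by an ordering column here first.
    group["EWM2"] = group["Scores"].ewm(span=2).mean()
    return group

df = spark.createDataFrame(
    [("a", 1.0), ("a", 2.0), ("a", 3.0), ("b", 10.0), ("b", 20.0)],
    ["Name", "Scores"],
)

# Spark shuffles the rows so that each Name lands in one place, then invokes
# add_ewm once per group; independent groups can run in parallel.
result = df.groupBy("Name").applyInPandas(
    add_ewm, schema="Name string, Scores double, EWM2 double"
)
result.show()

As far as I can tell, the pandas-on-Spark equivalent, pdf.groupby("Name").apply(add_ewm), is built on the same grouped-map machinery, which is why grouping by the key is what lets Spark distribute the work.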