I am trying to write a util function that gives the cumulative min, max, sum, mean, or first of any column within a window, but I need to make it time-aware. Should I use rangeBetween or rowsBetween?
For instance, in the data below, if I partition the window on "partition_col" and calculate the cumulative sum, how would it be calculated for partition "c", where all the dates are the same?
schema = StructType(
    [
        StructField("partition_col", StringType(), True),
        # StructField("order_col", IntegerType(), True),
        StructField("insert_date", DateType(), True),
        StructField("item", StringType(), True),
        StructField("sales", IntegerType(), True),
    ]
)
data = [
    ("A", date(2021, 6, 1), "apple", 100),
    ("A", date(2022, 6, 2), "banana", 150),
    ("A", date(2022, 6, 3), "banana", 200),
    ("B", date(2023, 6, 1), "apple", 250),
    ("B", date(2023, 6, 2), "apple", 300),
    ("B", date(2022, 6, 3), "banana", 350),
    ("c", date(2022, 6, 3), "banana", 350),
    ("c", date(2022, 6, 3), "apple", 100),
    ("c", date(2022, 6, 3), "banana", 10),
]
Also, with rangeBetween, do we always have to provide an order column? If yes, does it have to be numeric? In my function I want to keep the order column optional, so how will that work with rangeBetween?
I tried using:
def get_cumulative_agg_by_window(
    df: DataFrame,
    agg_dict: Dict[str, List[str]],
    partition_cols: Union[str, List[str]],
    order_col: str,
) -> DataFrame:
    """This function provides cumulative specified aggregations within a window
    and allows setting custom new column names based on aggregation and column name.

    Args:
        df (DataFrame): The input DataFrame.
        agg_dict (Dict[str, List[str]]): A dictionary with keys as column names
            for aggregation and values as a list of aggregations to perform on
            each column (e.g., ["min", "max", "sum", "avg", "first", "all"]).
        partition_cols (Union[str, List[str]]): Column(s) to partition the data by.
        order_col (str): Column to order the data within each partition.
    """
    window_spec_agg = Window.partitionBy(partition_cols)
    window_spec_cumulative = window_spec_agg.rangeBetween(
        Window.unboundedPreceding, Window.currentRow
    )
    for agg in aggregations:
        new_col_name = f"cumulative {agg}_{col}"
        df = df.withColumn(
            new_col_name,
            getattr(F, agg)(F.col(col)).over(window_spec_cumulative),
        )
    return df


agg_dict = {"sales": "sum"}
result_df = get_cumulative_agg_by_window(
    test_df,
    agg_dict,
    partition_cols=["item"],
)
result_df.show()
but this throws an error:
An error occurred while applying the aggregation on a column in the window : [DATATYPE_MISMATCH.RANGE_FRAME_WITHOUT_ORDER] Cannot resolve "(PARTITION BY partition_col RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)" due to data type mismatch: A range window frame cannot be used in an unordered window specification.;
Any help would be appreciated. Thank you.
1 Answer
As I mentioned in my comment, you cannot use rangeBetween without an ordering column, as the result would be non-deterministic.
That said, rangeBetween is still your best option because you want to include all rows with the same date. However, you need to provide a numeric column for ordering.
Let me show you a little trick to make this work. It will still result in non-deterministic output, but it will be allowed by Spark.
You can create a numeric ordering column using a simple rank. This does introduce some additional computation and may impact performance slightly.
Here’s an example:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
df = df.withColumn(
    "ordering_col",
    # rank() over an ordered (here unpartitioned) window yields a numeric column
    F.rank().over(Window.partitionBy().orderBy(F.col("col1"), F.col("col2"))),
)
You can then use this computed column as your rangeBetween ordering column. Hopefully, this will give you the results you're expecting.
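Putting this together with your function, you can also make order_col optional, as you wanted (a sketch under assumptions: _synthetic_order is a name I made up, test_df is the DataFrame from your question, and any monotonically increasing numeric column would work as the fallback):

from typing import Dict, List, Optional, Union

from pyspark.sql import DataFrame, functions as F
from pyspark.sql.window import Window


def get_cumulative_agg_by_window(
    df: DataFrame,
    agg_dict: Dict[str, List[str]],
    partition_cols: Union[str, List[str]],
    order_col: Optional[str] = None,
) -> DataFrame:
    """Cumulative aggregations within a window; order_col is optional."""
    if order_col is None:
        # No ordering column supplied: manufacture a numeric one so the
        # range frame is allowed. The resulting order (and therefore the
        # per-row cumulative values) is nondeterministic, as noted above.
        order_col = "_synthetic_order"
        df = df.withColumn(order_col, F.monotonically_increasing_id())

    window_spec = (
        Window.partitionBy(partition_cols)
        .orderBy(order_col)
        .rangeBetween(Window.unboundedPreceding, Window.currentRow)
    )

    # agg_dict maps each column to a list of aggregation names, e.g.
    # {"sales": ["sum", "min"]}; each name must exist in pyspark.sql.functions.
    for col_name, aggregations in agg_dict.items():
        for agg in aggregations:
            df = df.withColumn(
                f"cumulative_{agg}_{col_name}",
                getattr(F, agg)(F.col(col_name)).over(window_spec),
            )
    return df


result_df = get_cumulative_agg_by_window(
    test_df, {"sales": ["sum"]}, partition_cols=["partition_col"]
)
result_df.show()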
1. What is aggregations in your function? Where did you declare that variable? 2. What is your expected output based on the example data above? It seems there are two columns that could be used as partition columns (partition_col, item).