
I am trying to write a util function that gives the min, max, sum, mean, and first of any column cumulatively within a window, but I need it to be time aware. Should I use rangeBetween or rowsBetween?

For instance, in the data below, if I partition the window on "partition_col" and calculate the cumulative sum, then for partition "c", how would the cumulative sums be calculated, since all the dates are the same?

from datetime import date

from pyspark.sql.types import StructType, StructField, StringType, DateType, IntegerType

schema = StructType(
    [
        StructField("partition_col", StringType(), True),
        # StructField("order_col", IntegerType(), True),
        StructField("insert_date", DateType(), True),
        StructField("item", StringType(), True),
        StructField("sales", IntegerType(), True),
    ]
)
data = [
    ("A", date(2021, 6, 1), "apple", 100),
    ("A", date(2022, 6, 2), "banana", 150),
    ("A", date(2022, 6, 3), "banana", 200),
    ("B", date(2023, 6, 1), "apple", 250),
    ("B", date(2023, 6, 2), "apple", 300),
    ("B", date(2022, 6, 3), "banana", 350),
    ("c", date(2022, 6, 3), "banana", 350),
    ("c", date(2022, 6, 3), "apple", 100),
    ("c", date(2022, 6, 3), "banana", 10),
]
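
The test DataFrame used further down (test_df) is created from this schema and data, roughly as follows (assuming the usual SparkSession variable spark):

test_df = spark.createDataFrame(data, schema)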

Also, with rangeBetween, do we always have to provide an order column? If so, does it have to be numeric? In my function I want to keep the order column optional, so how would that work with rangeBetween?

I tried using:

def get_cumulative_agg_by_window(
    df: DataFrame,
    agg_dict: Dict[str, List[str]],
    partition_cols: Union[str, List[str]],
    order_col: str,
) -> DataFrame:
    """This function provides cumulative specified aggregations within a window and allows setting custom new column names
    based on aggregation and column name.

    Args:
        df (DataFrame): The input DataFrame.
        agg_dict (Dict[str, List[str]]): A dictionary with keys as column names for aggregation and values as a list of aggregations to perform on each column (e.g., ["min", "max", "sum", "avg", "first", "all"]).
        partition_cols (Union[str, List[str]]): Column(s) to partition the data by.
        order_col (str): Column to order the data within each partition.
    """
    window_spec_agg = Window.partitionBy(partition_cols)
    window_spec_cumulative = window_spec_agg.rangeBetween(
        Window.unboundedPreceding, Window.currentRow
    )

    for agg in aggregations:
        new_col_name = f"cumulative {agg}_{col}"
        df = df.withColumn(
            new_col_name, getattr(F, agg)(F.col(col)).over(window_spec_cumulative)
        )
    return df
 
agg_dict = {"sales": "sum"}
result_df = get_cumulative_agg_by_window(
    test_df,
    agg_dict,
    partition_cols=["item"],
)
result_df.show()

but this throws an error:

An error occurred while applying the aggregation on a column in the window : [DATATYPE_MISMATCH.RANGE_FRAME_WITHOUT_ORDER] Cannot resolve "(PARTITION BY partition_col RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)" due to data type mismatch: A range window frame cannot be used in an unordered window specification.;

Any help would be appreciated. Thank you.

asked Jul 4, 2025 at 11:22
  • I have 2 questions that need your clarification: 1. What is aggregations in your function? Where did you declare that variable? 2. What is your expected output based on the example data above? It seems we have two columns that could be used as partition columns (partition_col, item). Commented Jul 7, 2025 at 9:45
  • Aggregations is the list that stores all the aggregations I want, e.g. sum, mean, max, etc. Commented Jul 15, 2025 at 8:20
  • You cannot use a cumulative aggregation without ordering; it just doesn't make sense. Since you are working with a distributed system, the row order is random, so if you do not specify an ordering column your computation is not deterministic, which is not allowed. Commented Jul 16, 2025 at 7:19

1 Answer


As I mentioned in my comment, you cannot use rangeBetween without an ordering column, as the result would be non-deterministic.

That said, rangeBetween is still your best option because you want to include all rows with the same date. However, you need to provide a numeric column for ordering.

Let me show you a little trick to make this work. It will still result in non-deterministic output, but it will be allowed by Spark.

You can create a numeric ordering column using a simple rank. This does introduce some additional computation and may impact performance slightly.

Here’s an example:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# A global rank over your existing columns yields a numeric ordering column.
df = df.withColumn(
    "ordering_col",
    F.rank().over(Window.partitionBy().orderBy(F.col("col1"), F.col("col2")))
)

You can then use this computed column as your rangeBetween ordering column. Hopefully, this will give you the results you're expecting.
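
To make that concrete, here is a minimal sketch of the full pattern applied to the question's example data (partitioning the rank by partition_col and ordering by insert_date is my own adaptation; the global rank over col1, col2 above works the same way):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Numeric ordering column: ties on insert_date get the same rank,
# so all rows sharing a date land in the same range frame.
df = test_df.withColumn(
    "ordering_col",
    F.rank().over(Window.partitionBy("partition_col").orderBy("insert_date")),
)

# Cumulative sum over the numeric ordering column.
cumulative_window = (
    Window.partitionBy("partition_col")
    .orderBy("ordering_col")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

df = df.withColumn("cumulative sum_sales", F.sum("sales").over(cumulative_window))
df.show()

For partition "c", all three rows share the same date and therefore the same rank, so each of them is a peer of the others and each gets the full cumulative sum of 460.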

answered Jul 16, 2025 at 7:29

2 Comments

Thanks for this answer, Steven. According to the business requirement, the order-by column should be a date. Can you please help me understand: if my order column is a date, can I define a user-specified window size based on it? For example, can I pass an argument of 7d/1M/2y and have the window calculated from that and the current row?
If you want that kind of range, you should use a timestamp. A timestamp is a number of seconds, so if you want 7d you need to transform it into seconds: 7d = 7d * 24h/d * 3600s/h = 604800s. You can compute the same for a month or a year.
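
A minimal sketch of that idea, assuming the question's insert_date column and a 7-day trailing window (the column and variable names here are illustrative):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

SEVEN_DAYS = 7 * 24 * 3600  # 604800 seconds

# Cast the date to a timestamp and then to Unix seconds so rangeBetween
# can interpret the frame bounds as a number of seconds.
df = test_df.withColumn("ts_seconds", F.col("insert_date").cast("timestamp").cast("long"))

trailing_7d = (
    Window.partitionBy("partition_col")
    .orderBy("ts_seconds")
    .rangeBetween(-SEVEN_DAYS, Window.currentRow)
)

df = df.withColumn("sales_7d_sum", F.sum("sales").over(trailing_7d))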
