As per the documentation (https://docs.databricks.com/en/optimizations/spark-ui-guide/one-spark-task.html), a window function without a PARTITION BY clause results in a single task in Spark.

Is this true? Given that Spark does distributed parallel processing, wouldn't Spark first perform the window aggregation (for example, max(date) over (order by some_column) or row_number() over (order by date_col)) at the partition level and then combine the results from all partitions? Why does it result in a single task?

asked Aug 4, 2024 at 15:29
  • Perhaps running explain() on your dataframe is the best way to see whether it's true or not. Commented Aug 4, 2024 at 18:48
  • Because the PARTITION is how the data is divided up for parallel processing. Having no PARTITION BY clause actually means a SINGLE partition, so the calculation can't be subdivided for parallel processing. Commented Aug 4, 2024 at 22:01
  • Be aware of how the window function max works: it calculates the maximum value for rows between the first and the current row (by default). Here's an example -> db-fiddle.com/f/4jyoMCicNSZpjMt4jFYoz5/0 Commented Aug 6, 2024 at 15:53
  • As per the documentation, that is NOT how window functions behave. Put aside your preconceptions and start wondering what must be true if the docs are not wrong. Commented Aug 6, 2024 at 18:42
  • Yeah, thanks, here's the correct one -> db-fiddle.com/f/7rkZSnDT661o3aq9XhbDpW/0 Commented Aug 6, 2024 at 23:46
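To make the point about max concrete, here is a minimal sketch in Scala (the column name date_col is made up for illustration). With ORDER BY and no explicit frame clause, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so the result is a running max per row, not one global max:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.max

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq(3, 1, 2).toDF("date_col")

    // Default frame with ORDER BY: running max up to the current row.
    // date_col=1 -> 1, date_col=2 -> 2, date_col=3 -> 3
    df.withColumn("running_max", max($"date_col").over(Window.orderBy($"date_col")))
      .show()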

1 Answer


Window functions based on a global ordering cannot be computed in parallel across multiple partitions.

For example, the value of row_number() over (order by date_col) at some row depends on the count of ALL rows across ALL partitions with a lower date_col. Therefore all the data needs to be gathered into a single partition, sorted, and then assigned row numbers one by one.
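You can see this in the physical plan, as one of the comments suggested. A minimal sketch (Scala, illustrative names):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.row_number

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val df = spark.range(0, 1000).toDF("date_col")

    // No PARTITION BY: Spark must move every row into one partition
    // before it can assign globally ordered row numbers.
    df.withColumn("rn", row_number().over(Window.orderBy($"date_col"))).explain()
    // The plan shows an `Exchange SinglePartition` feeding the Window operator,
    // and Spark logs: "No Partition Defined for Window operation! Moving all
    // data to a single partition, this can cause serious performance degradation."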

There are some tricks to overcome this. You could precalculate rolling sums of per-partition row counts and add them to partition-local row numbers. This approach is used in the Dataset.withRowNumbers function of the spark-extension library. But you would need to reimplement it for other functions such as max.
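For illustration, here is a minimal sketch of that trick, assuming the data has already been range-partitioned and sorted by the ordering column (so partition-local order matches global order); all names are made up:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val df = spark.range(0, 1000000).toDF("date_col")
      .repartitionByRange(8, $"date_col")   // global order across partitions
      .sortWithinPartitions($"date_col")    // local order within each partition

    val rdd = df.rdd

    // 1) Count the rows in each partition (one small extra job).
    val counts = rdd
      .mapPartitionsWithIndex { case (i, it) => Iterator((i, it.size.toLong)) }
      .collect().sortBy(_._1).map(_._2)

    // 2) The rolling sum of those counts is each partition's row-number offset.
    val offsets = counts.scanLeft(0L)(_ + _)

    // 3) Add the offset to the partition-local index; partitions stay parallel,
    //    no shuffle into a single partition is needed.
    val numbered = rdd.mapPartitionsWithIndex { case (i, it) =>
      it.zipWithIndex.map { case (row, local) => (offsets(i) + local + 1, row) }
    }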

answered Aug 4, 2024 at 21:57

2 Comments

Got it, thanks. Let's say in the case of max() over (<without partition>): doesn't Spark (a) compute max() within each partition and then (b) move the max() of each partition to a single executor to arrive at the final max() value?
There's a substantial difference between max(...) (one record on output; here yes, we can aggregate at the partition level and then combine the results) and max(...) over (order by ...) (N records on output).
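A short sketch of that distinction (Scala, illustrative names):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.max

    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    val df = spark.range(0, 1000).toDF("date_col")

    // Plain aggregate: one output row. Spark computes a partial max per
    // partition, then combines the partial results (parallel-friendly).
    df.agg(max($"date_col")).show()

    // Ordered window without PARTITION BY: one output row per input row,
    // each depending on the global ordering, so all rows end up in one task.
    df.withColumn("run_max", max($"date_col").over(Window.orderBy($"date_col"))).show()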
