
Consider two dataframes, holiday df and everyday df, each with three columns:

  1. Holiday df (5 records):

```
Country_code | currency_code | date
Gb           | gbp           | 2022-04-15
Gb           | gbp           | 2022-04-16
US           | usd           | 2022-04-17
Gb           | gbp           | 2022-04-18
Gb           | gbp           | 2022-04-21
```

  2. Everyday df (4 records):

```
Country_code_demo | currency_code_demo | date_demo
Gb                | gbp                | 2022-04-14
Gb                | gbp                | 2022-04-15
Gb                | gbp                | 2022-04-16
Gb                | gbp                | 2022-04-18
```

The columns are the country code, the currency code and a date. The date columns of both dataframes need to be compared per country_code and currency_code. If a date in everyday df matches a holiday date, the everyday df date_demo column needs to be updated to the next working day, and that new date must also not be present in holiday df. Write Spark Scala code using window functions. The expected output is:

```
Country_code_demo | currency_code_demo | date_updated
Gb                | gbp                | 2022-04-14
Gb                | gbp                | 2022-04-17
Gb                | gbp                | 2022-04-17
Gb                | gbp                | 2022-04-19
```
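
To make the rule concrete, here is a minimal plain-Scala sketch (no Spark) of the requirement applied to the sample rows above. The names `NextWorkingDaySketch`, `adjust`, `gbResult` and `gbResultNoWeekends` are hypothetical; the sketch assumes dates are ISO `yyyy-MM-dd` strings, that holidays only count for their own (country, currency) key, and that only dates found in holiday df are moved. The optional `nonWorkingDays` parameter shows how weekends could be skipped as well.

```scala
import java.time.{DayOfWeek, LocalDate}

// Plain-Scala model of the question's rule, with no Spark involved.
// Key = (country_code, currency_code); holidays are looked up per key.
object NextWorkingDaySketch {
  type Key = (String, String)

  // If `date` is a holiday for `key`, move forward one day at a time until the
  // day is neither a holiday for that key nor in `nonWorkingDays`.
  // Dates that are not holidays are returned unchanged.
  def adjust(
      key: Key,
      date: LocalDate,
      holidays: Map[Key, Set[LocalDate]],
      nonWorkingDays: Set[DayOfWeek] = Set.empty
  ): LocalDate = {
    val hs = holidays.getOrElse(key, Set.empty[LocalDate])
    if (!hs.contains(date)) date
    else {
      var next = date.plusDays(1)
      while (hs.contains(next) || nonWorkingDays.contains(next.getDayOfWeek))
        next = next.plusDays(1)
      next
    }
  }

  // Sample holiday rows from the question, grouped into one date set per key.
  val holidays: Map[Key, Set[LocalDate]] =
    Seq(
      ("Gb", "gbp", "2022-04-15"), ("Gb", "gbp", "2022-04-16"), ("US", "usd", "2022-04-17"),
      ("Gb", "gbp", "2022-04-18"), ("Gb", "gbp", "2022-04-21")
    ).groupBy { case (c, cur, _) => (c, cur) }
      .map { case (k, rows) => k -> rows.map { case (_, _, d) => LocalDate.parse(d) }.toSet }

  val everyday: Seq[LocalDate] =
    Seq("2022-04-14", "2022-04-15", "2022-04-16", "2022-04-18").map(s => LocalDate.parse(s))

  // Calendar days only, as in the expected output.
  val gbResult: Seq[LocalDate] = everyday.map(d => adjust(("Gb", "gbp"), d, holidays))

  // Same data, also treating Saturdays and Sundays as non-working days.
  val gbResultNoWeekends: Seq[LocalDate] =
    everyday.map(d => adjust(("Gb", "gbp"), d, holidays, Set(DayOfWeek.SATURDAY, DayOfWeek.SUNDAY)))
}
```

With calendar days only, `gbResult` reproduces the expected `date_updated` column (2022-04-14, 2022-04-17, 2022-04-17, 2022-04-19); the US holiday on 2022-04-17 is ignored because it belongs to another key. If weekends are skipped too, the last three rows all move to 2022-04-19, because 2022-04-17 is a Sunday.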
Dmytro Mitin
asked Apr 5, 2023 at 4:16

1 Answer


The holiday df is unlikely to be big, so I would use a UDF to compute, for each holiday, the next working day. Then I would left-join that dataframe to the everyday dataframe to obtain the result you seek:

```scala
import org.apache.spark.sql.functions.{coalesce, col, collect_list, explode, udf}

// optionally, you can consider Saturdays and Sundays as non-working days
// val nonWorkingDays = Set(java.time.DayOfWeek.SUNDAY, java.time.DayOfWeek.SATURDAY)
// using an empty set to match your expected result
val nonWorkingDays = Set[java.time.DayOfWeek]()

// For one (country, currency) group: map every holiday to the first later day that is
// neither a holiday of that group nor a non-working day.
// Assumes `date` is a DateType column, so Spark passes the collected array as Seq[java.sql.Date].
val nextWorkingDay = udf((dates: Seq[java.sql.Date]) => {
  val dateSet = dates.map(_.toLocalDate).toSet
  dates.map { date =>
    var nextDate = date.toLocalDate.plusDays(1)
    while (dateSet.contains(nextDate) || nonWorkingDays.contains(nextDate.getDayOfWeek))
      nextDate = nextDate.plusDays(1)
    date -> java.sql.Date.valueOf(nextDate) // returned as struct<_1: date, _2: date>
  }
})

val nextDayDF = holiday
  .groupBy("Country_code", "currency_code")
  .agg(collect_list(col("date")) as "dates")
  .withColumn("date_struct", explode(nextWorkingDay(col("dates"))))
  .drop("dates")
  // renaming columns to simplify the join
  .select(
    col("Country_code") as "Country_code_demo",
    col("currency_code") as "currency_code_demo",
    col("date_struct._1") as "date_demo",
    col("date_struct._2") as "date_updated")

// Everyday dates with no matching holiday keep their original value.
everyday
  .join(nextDayDF, Seq("Country_code_demo", "currency_code_demo", "date_demo"), "left")
  .withColumn("date_updated", coalesce(col("date_updated"), col("date_demo")))
  .show()
```
```
+-----------------+------------------+----------+------------+
|Country_code_demo|currency_code_demo| date_demo|date_updated|
+-----------------+------------------+----------+------------+
|               Gb|               gbp|2022-04-14|  2022-04-14|
|               Gb|               gbp|2022-04-15|  2022-04-17|
|               Gb|               gbp|2022-04-16|  2022-04-17|
|               Gb|               gbp|2022-04-18|  2022-04-19|
+-----------------+------------------+----------+------------+
```
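
Since the question asks for window functions: when only calendar days matter (the empty `nonWorkingDays` above), the next working day after a holiday is the day after the last date in its run of consecutive holidays. Such runs can be found with the "gaps and islands" trick: within each (Country_code, currency_code) partition ordered by date, the date minus its row number stays constant across consecutive dates. In Spark this would presumably map onto `row_number()` over `Window.partitionBy(...).orderBy(...)` plus a `max(...)` over a second window per island. The sketch below is only a plain-Scala model of that logic, not Spark code; the names `GapsAndIslandsSketch`, `nextWorkingDays`, `adjust`, `table` and `gbResult` are hypothetical, and weekend skipping is not handled.

```scala
import java.time.LocalDate

// Plain-Scala model (no Spark) of a gaps-and-islands computation per
// (Country_code, currency_code). Calendar days only: weekends are not skipped.
object GapsAndIslandsSketch {
  type Key = (String, String)

  // For every holiday, the day after the last holiday in its run of consecutive dates.
  def nextWorkingDays(holidays: Seq[(String, String, LocalDate)]): Map[(Key, LocalDate), LocalDate] =
    holidays
      .groupBy { case (c, cur, _) => (c, cur) } // ~ Window.partitionBy(country, currency)
      .toSeq
      .flatMap { case (key, rows) =>
        // distinct: duplicate dates would break the row-number trick; sort ~ orderBy(date)
        val sorted = rows.map(_._3).distinct.sortBy(_.toEpochDay)
        // island id = epoch day minus row number; constant across consecutive dates
        val islands = sorted.zipWithIndex.groupBy { case (d, rn) => d.toEpochDay - rn }
        islands.values.flatMap { members =>
          val last = members.map(_._1).maxBy(_.toEpochDay) // ~ max(date) over the island
          members.map { case (d, _) => (key, d) -> last.plusDays(1) }
        }
      }
      .toMap

  // ~ left join + coalesce: dates that are not holidays keep their value
  def adjust(lookup: Map[(Key, LocalDate), LocalDate], key: Key, d: LocalDate): LocalDate =
    lookup.getOrElse((key, d), d)

  // Sample holiday rows from the question
  val holidays: Seq[(String, String, LocalDate)] = Seq(
    ("Gb", "gbp", "2022-04-15"), ("Gb", "gbp", "2022-04-16"), ("US", "usd", "2022-04-17"),
    ("Gb", "gbp", "2022-04-18"), ("Gb", "gbp", "2022-04-21")
  ).map { case (c, cur, d) => (c, cur, LocalDate.parse(d)) }

  val table: Map[(Key, LocalDate), LocalDate] = nextWorkingDays(holidays)

  val gbResult: Seq[LocalDate] =
    Seq("2022-04-14", "2022-04-15", "2022-04-16", "2022-04-18")
      .map(s => LocalDate.parse(s))
      .map(d => adjust(table, ("Gb", "gbp"), d))
}
```

For the Gb holidays, 2022-04-15 and 2022-04-16 form one island (both map to 2022-04-17), while 2022-04-18 and 2022-04-21 are single-day islands (mapping to 2022-04-19 and 2022-04-22). The final `getOrElse` plays the role of the left join plus `coalesce` in the answer.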
answered Apr 5, 2023 at 6:23