
I have a CSV file with more than 700,000,000 records in this structure:

product_id  start_date   end_date
1           19-Jan-2000  20-Mar-2000
1           20-Mar-2000  25-Apr-2000
1           20-May-2000  27-Jul-2000
1           27-Jul-2000
2           20-Mar-2000  25-Apr-2000
3           12-Jan-2010  30-Mar-2010
3           30-Mar-2010

A null end_date means the product is currently in use.

An end_date can mean one of two things: 1 - the product was disabled, 2 - the battery was replaced.

If the end_date is the same as the next start_date, then it is a battery replacement.

The expected result is the product_id along with the start_date of its current lifecycle (battery replacements count as part of the current lifecycle).

That means the start_date should be the date right after the product's last disablement. For the example above, the output would be:

product_id  start_date
1           20-May-2000
3           12-Jan-2010

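To make the rule concrete, here is a small plain-Python sketch (separate from the Spark code below) that applies the same logic to the sample rows; the helper name and structure are purely illustrative:

from datetime import datetime

# sample rows as (product_id, start_date, end_date); None = still in use
rows = [
    (1, "19-Jan-2000", "20-Mar-2000"),
    (1, "20-Mar-2000", "25-Apr-2000"),
    (1, "20-May-2000", "27-Jul-2000"),
    (1, "27-Jul-2000", None),
    (2, "20-Mar-2000", "25-Apr-2000"),
    (3, "12-Jan-2010", "30-Mar-2010"),
    (3, "30-Mar-2010", None),
]

def parse(d):
    return datetime.strptime(d, "%d-%b-%Y") if d else None

def current_lifecycle_starts(rows):
    # For each product whose latest record has no end_date, walk backwards
    # while end_date == next start_date (battery replacements) to find the
    # start of the current lifecycle.
    by_product = {}
    for pid, s, e in rows:
        by_product.setdefault(pid, []).append((parse(s), parse(e)))
    result = {}
    for pid, recs in by_product.items():
        recs.sort()                      # order by start_date
        if recs[-1][1] is not None:      # latest record has an end_date: product is disabled
            continue
        start = recs[-1][0]
        for s, e in reversed(recs[:-1]):
            if e == start:               # battery replacement: lifecycle extends backwards
                start = s
            else:                        # disablement: lifecycle starts after it
                break
        result[pid] = start
    return result

print(current_lifecycle_starts(rows))
# product 1 -> 2000-05-20, product 3 -> 2010-01-12
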
My code is below. It's kind of ugly, so please review it and advise whether it can run well with 700,000,000 records, or whether there are better ways/methods to solve this challenge. I have run it and it already seems a little slow on a 100-record test file. Thank you for your help.

Code:

# imports needed for the snippet
from pyspark.sql import functions as F
from pyspark.sql.functions import desc
from pyspark.sql.window import Window

# csv input
df = spark.read.csv('productlist.csv', header=True, inferSchema=True)
# keep only products that are currently in use (have a record with a null end_date)
df2 = df.select("product_id").filter("end_date is null")
df = df.join(df2, ["product_id"])
# sort dataframe by product id & start date desc
df = df.sort(['product_id', 'start_date'], ascending=False)
# window to add the next start date of the product (ordered by start_date, not product_id)
w = Window.partitionBy("product_id").orderBy(desc("start_date"))
df = df.withColumn("next_start_date", F.lag(df.start_date).over(w))
# classify whether the end of the current record is a product disablement or a battery change
df = df.withColumn('diff', F.when(F.isnull(df.end_date), 0)
                   .otherwise(F.when(df.end_date != df.next_start_date, 1).otherwise(0)))
# flag whether the product has been disabled at least once
df3 = df.groupBy('product_id').agg(F.sum("diff").alias("disable"))
df = df.join(df3, ["product_id"])
# requested start date for products that have never been disabled
df1 = df.filter(df.disable == 0).groupBy("product_id").agg(F.min("start_date").alias("first_start_date"))
# requested start date for products that have been disabled at least once:
# the next start date recorded at the most recent disablement
df2 = df.filter(df.diff == 1).groupBy("product_id").agg(F.max("next_start_date").alias("first_start_date"))
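
A final combination step is not shown above; presumably the two frames would simply be unioned into one result (both have the same two columns). A minimal sketch:

# sketch only: combine never-disabled and previously-disabled products into one output
result = df1.union(df2)
result.show()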

1 Answer


I believe the solution below should do what you are looking for in a more efficient way. Your current method involves a lot of "shuffle" operations (group by, sorting, joining). The approach below should reduce the number of shuffle operations in your Spark job. The steps are:

  1. Get the leading (next) start date for each record.
  2. Identify disabled records.
  3. Add a column indicating whether the product was ever disabled (max of is_disabled).
  4. Capture the replacement dataset.
  5. Get the max replacement date.
  6. Create an indicator for current-lifecycle records.
  7. Filter the data to current-lifecycle records.

# imports needed for the snippet
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit
from pyspark.sql.window import Window

# csv input
df = spark.read.csv('productlist.csv', header=True, inferSchema=True)
# get ordered and unordered windows
wo = Window.partitionBy("product_id").orderBy("start_date")
wu = Window.partitionBy("product_id")
df1 = (df
       .withColumn("lead_start_date", F.lead(col("start_date"), 1).over(wo))
       .withColumn("is_disabled", F.when((col("end_date").isNotNull()) &
                   ((col("end_date") != col("lead_start_date")) | (col("lead_start_date").isNull())), 1).otherwise(0))
       .withColumn("has_been_disabled", F.max(col("is_disabled")).over(wu))
       .withColumn("replacement_date", F.when((col("end_date").isNotNull()) &
                   (col("end_date") == col("lead_start_date")) & (col("lead_start_date").isNotNull()), col("start_date")).otherwise(lit(None)))
       .withColumn("max_replacement_date", F.max(col("replacement_date")).over(wu))
       .withColumn("is_current_lifecycle_record", F.when(((col("replacement_date") == col("max_replacement_date")) & col("replacement_date").isNotNull()) |
                   ((col("has_been_disabled") == 0) & (col("max_replacement_date").isNull())), 1).otherwise(0)))  # never disabled / replaced
# filter for current lifecycle record and select target columns
df_final = df1.filter(col("is_current_lifecycle_record") == 1).select(["product_id", "start_date"])
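
On the sample data from the question (with the dates read as plain strings), a quick check of the result should produce the two expected rows:

# quick sanity check on the sample data (sketch)
df_final.orderBy("product_id").show()
# expected output:
# +----------+-----------+
# |product_id| start_date|
# +----------+-----------+
# |         1|20-May-2000|
# |         3|12-Jan-2010|
# +----------+-----------+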
