
My dataframe:

df_a = spark.createDataFrame([
    (0, ["B", "C", "D", "E"], [1, 2, 3, 4]),
    (1, ["E", "A", "C"], [1, 2, 3]),
    (2, ["F", "A", "E", "B"], [1, 2, 3, 4]),
    (3, ["E", "G", "A"], [1, 2, 3]),
    (4, ["A", "C", "E", "B", "D"], [1, 2, 3, 4, 5])], ["id", "items", "rank"])

and I want my output as:

+---+----+----+
| id|item|rank|
+---+----+----+
|  0|   B|   1|
|  0|   C|   2|
|  0|   D|   3|
|  0|   E|   4|
|  1|   E|   1|
|  1|   A|   2|
|  1|   C|   3|
|  2|   F|   1|
|  2|   A|   2|
|  2|   E|   3|
|  2|   B|   4|
|  3|   E|   1|
|  3|   G|   2|
|  3|   A|   3|
|  4|   A|   1|
|  4|   C|   2|
|  4|   E|   3|
|  4|   B|   4|
|  4|   D|   5|
+---+----+----+

My dataframe has 8 million rows, and when I try to zip and explode as below, it is extremely slow; the job runs forever with 15 executors and 25 GB of memory.

import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructField, StructType

zip_udf2 = F.udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        StructField("item", StringType()),
        StructField("rank", IntegerType())
    ]))
)

(df_a
 .withColumn('tmp', zip_udf2("items", "rank"))
 .withColumn("tmp", F.explode('tmp'))
 .select("id", F.col("tmp.item"), F.col("tmp.rank"))
 .show())

Any alternate methods? I tried rdd.flatMap as well, but it didn't make a dent in the performance. The number of elements in the arrays varies from row to row.
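For reference, the flatMap version was along these lines (a rough sketch of the approach, not the exact code):

# Sketch: zip the two arrays per row on the RDD side and flatten
# into one output row per (item, rank) pair.
result = (df_a.rdd
          .flatMap(lambda row: [(row['id'], item, rank)
                                for item, rank in zip(row['items'], row['rank'])])
          .toDF(['id', 'item', 'rank']))
result.show()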

asked Sep 13, 2021 at 23:40
  • What does zip_udf2 do? Could you share it? Commented Sep 13, 2021 at 23:43
  • Added zip_udf2 to my post. Commented Sep 13, 2021 at 23:50

1 Answer


UPDATE

Since you are using Spark 2.3.2 and arrays_zip isn't available, I ran some tests to compare which option is faster: a udf or posexplode. The quick answer is: posexplode.

(df_a
 .select('id', F.posexplode('items'), 'rank')
 .select('id', F.col('col').alias('item'), F.expr('rank[pos]').alias('rank'))
 .show())
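How it works: posexplode emits one row per array element with two columns, pos (the element's index) and col (its value), and F.expr('rank[pos]') then looks up the rank at the same index. A minimal sketch of the intermediate step, using the same df_a:

import pyspark.sql.functions as F

# posexplode('items') produces columns "pos" (index) and "col" (value);
# the untouched 'rank' array is carried along so rank[pos] can index it.
intermediate = df_a.select('id', F.posexplode('items'), 'rank')
intermediate.printSchema()  # id, pos, col, rank (rank is still an array here)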

Tests

from pyspark.sql.types import *
import pyspark.sql.functions as F
import time

df_a = spark.createDataFrame([
    (0, ["B", "C", "D", "E"], [1, 2, 3, 4]),
    (1, ["E", "A", "C"], [1, 2, 3]),
    (2, ["F", "A", "E", "B"], [1, 2, 3, 4]),
    (3, ["E", "G", "A"], [1, 2, 3]),
    (4, ["A", "C", "E", "B", "D"], [1, 2, 3, 4, 5])], ["id", "items", "rank"])

# My solution
def using_posexplode():
    (df_a
     .select('id', F.posexplode('items'), 'rank')
     .select('id', F.col('col').alias('item'), F.expr('rank[pos]').alias('rank'))
     .count())

# Your solution
zip_udf2 = F.udf(
    lambda x, y: list(zip(x, y)),
    ArrayType(StructType([
        StructField("item", StringType()),
        StructField("rank", IntegerType())
    ])))

def using_udf():
    (df_a
     .withColumn('tmp', zip_udf2("items", "rank"))
     .withColumn("tmp", F.explode('tmp'))
     .select("id", F.col("tmp.item"), F.col("tmp.rank"))
     .count())

def time_run_method(iterations, fn):
    t0 = time.time()
    for i in range(iterations):
        fn()
    td = time.time() - t0
    print(fn.__name__, "Time to count %d iterations: %s [sec]" % (iterations, "{:,}".format(td)))

for function in [using_posexplode, using_udf]:
    time_run_method(iterations=100, fn=function)
using_posexplode Time to count 100 iterations: 24.962905168533325 [sec]
using_udf Time to count 100 iterations: 44.120017290115356 [sec]

OLD

There is no guarantee that this alone will solve your entire problem, but one thing to consider is replacing your zip_udf2 with Spark's native arrays_zip function. Here is an explanation of why we should avoid UDFs when possible.
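For anyone on Spark 2.4+, a minimal sketch of the arrays_zip version, using the same df_a (the zipped struct's fields take the input column names when plain column references are passed):

import pyspark.sql.functions as F

# arrays_zip pairs the two arrays element-wise into an array of structs,
# entirely inside the JVM, so there is no Python serialization round-trip.
(df_a
 .withColumn('tmp', F.explode(F.arrays_zip('items', 'rank')))
 .select('id', F.col('tmp.items').alias('item'), F.col('tmp.rank').alias('rank'))
 .show())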

answered Sep 13, 2021 at 23:57

4 Comments

I am using Spark 2.3.2, where arrays_zip doesn't exist. Hence I had to resort to a udf.
Thanks @kafels. Let me give this a try!
Using two explodes, one per column, produces every combination of rows: (df_a.withColumn('item', F.explode('items')).withColumn('rank', F.explode('rank')).select('id', 'item', 'rank').count()) -> 75 rows. I actually want item and rank to be zipped, producing only one row per pair.
@Appden65 Check it again. I solved it using posexplode and expr.
