
I have two data frames: df1

+---+---------------+
|id1|         items1|
+---+---------------+
|  0|   [B, C, D, E]|
|  1|      [E, A, C]|
|  2|   [F, A, E, B]|
|  3|      [E, G, A]|
|  4|[A, C, E, B, D]|
+---+---------------+

and df2:

+---+---------+
|id2|   items2|
+---+---------+
|001|   [A, C]|
|002|      [D]|
|003|[E, A, B]|
|004|[B, D, C]|
|005|   [F, B]|
|006|   [G, E]|
+---+---------+

I would like to create an indicator vector (in a new column result_array in df1) based on the values in items2. The vector should have the same length as the number of rows in df2 (six elements in this example). Each element should be 1.0 if the row in items1 contains all the elements in the corresponding row of items2, and 0.0 otherwise. The result should look as follows:

+---+---------------+-------------------------+
|id1|         items1|             result_array|
+---+---------------+-------------------------+
|  0|   [B, C, D, E]|[0.0,1.0,0.0,1.0,0.0,0.0]|
|  1|      [E, A, C]|[1.0,0.0,0.0,0.0,0.0,0.0]|
|  2|   [F, A, E, B]|[0.0,0.0,1.0,0.0,1.0,0.0]|
|  3|      [E, G, A]|[0.0,0.0,0.0,0.0,0.0,1.0]|
|  4|[A, C, E, B, D]|[1.0,1.0,1.0,1.0,0.0,0.0]|
+---+---------------+-------------------------+

For example, in row 0, the second value is 1.0 because [D] is a subset of [B, C, D, E] and the fourth value is 1.0 because [B, D, C] is a subset of [B, C, D, E]. All other item groups in df2 are not subsets of [B, C, D, E], thus their indicator values are 0.0.
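The per-row logic can be sketched in plain Python using set containment; here `items1` is row 0 of df1 and `items2_groups` lists the six df2 groups in id order:

```python
# Indicator for one df1 row: is each items2 group a subset of items1?
items1 = ['B', 'C', 'D', 'E']  # row 0 of df1
items2_groups = [['A', 'C'], ['D'], ['E', 'A', 'B'],
                 ['B', 'D', 'C'], ['F', 'B'], ['G', 'E']]

# set(g) <= set(items1) is True iff every element of g appears in items1
result = [1.0 if set(g) <= set(items1) else 0.0 for g in items2_groups]
print(result)  # [0.0, 1.0, 0.0, 1.0, 0.0, 0.0]
```

The challenge is doing this at scale in Spark rather than in driver-side Python.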

I've tried creating a list of all item groups in items2 using collect() and then applying a udf, but my data is too large (over 10 million rows).

asked Oct 22, 2018 at 18:35

1 Answer

You can proceed like this:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([
    (0, ['B', 'C', 'D', 'E']),
    (1, ['E', 'A', 'C']),
    (2, ['F', 'A', 'E', 'B']),
    (3, ['E', 'G', 'A']),
    (4, ['A', 'C', 'E', 'B', 'D'])],
    ['id1', 'items1'])

# Note: literals like 001 are a SyntaxError in Python 3, so plain integers are used here
df2 = spark.createDataFrame([
    (1, ['A', 'C']),
    (2, ['D']),
    (3, ['E', 'A', 'B']),
    (4, ['B', 'D', 'C']),
    (5, ['F', 'B']),
    (6, ['G', 'E'])],
    ['id2', 'items2'])

This gives you the DataFrames:

+---+---------------+
|id1| items1|
+---+---------------+
| 0| [B, C, D, E]|
| 1| [E, A, C]|
| 2| [F, A, E, B]|
| 3| [E, G, A]|
| 4|[A, C, E, B, D]|
+---+---------------+
+---+---------+
|id2| items2|
+---+---------+
| 1| [A, C]|
| 2| [D]|
| 3|[E, A, B]|
| 4|[B, D, C]|
| 5| [F, B]|
| 6| [G, E]|
+---+---------+

Now, crossJoin the two DataFrames, which gives you the Cartesian product of df1 with df2. Then group by 'id1' and 'items1' and apply a udf to build 'result_array'. (Be aware that collect_list does not guarantee row order; if the vector must strictly follow df2's id order, collect (id2, items2) structs and sort them before building the array.)

# set(z) <= set(x): 1.0 if the items2 group z is a subset of items1
# (<= rather than < so that a group equal to items1 also counts)
get_array_udf = F.udf(lambda x, y: [1.0 if set(z) <= set(x) else 0.0 for z in y],
                      ArrayType(FloatType()))

df = df1.crossJoin(df2)\
        .groupby(['id1', 'items1']).agg(F.collect_list('items2').alias('items2'))\
        .withColumn('result_array', get_array_udf('items1', 'items2')).drop('items2')
df.show(truncate=False)

This gives you the output:

+---+---------------+------------------------------+ 
|id1|items1 |result_array |
+---+---------------+------------------------------+
|1 |[E, A, C] |[1.0, 0.0, 0.0, 0.0, 0.0, 0.0]|
|0 |[B, C, D, E] |[0.0, 1.0, 0.0, 1.0, 0.0, 0.0]|
|4 |[A, C, E, B, D]|[1.0, 1.0, 1.0, 1.0, 0.0, 0.0]|
|3 |[E, G, A] |[0.0, 0.0, 0.0, 0.0, 0.0, 1.0]|
|2 |[F, A, E, B] |[0.0, 0.0, 1.0, 0.0, 1.0, 0.0]|
+---+---------------+------------------------------+
answered Oct 23, 2018 at 6:51

1 Comment

Thank you very much for your help. This is an excellent solution. So efficient and simple.
