
I am using PySpark and have the following table, table_1:

+--------+------------------+---------------+----------+-----+
|  Number|       Institution|       datetime|      date| time|
+--------+------------------+---------------+----------+-----+
|AE19075B|               ABC| 7/20/2019 7:45|07/20/2019| 7:45|
|AE11688U|               CBT|2/11/2019 20:31|02/11/2019|20:31|
+--------+------------------+---------------+----------+-----+

I would like to add a column lag1 to table_1 that holds the time shifted back by 15 minutes:

+--------+------------------+---------------+----------+-----+-----+
|  Number|       Institution|       datetime|      date| time| lag1|
+--------+------------------+---------------+----------+-----+-----+
|AE19075B|               ABC| 7/20/2019 7:45|07/20/2019| 7:45| 7:30|
|AE11688U|               CBT|2/11/2019 20:31|02/11/2019|20:31|20:16|
+--------+------------------+---------------+----------+-----+-----+

I tried:

from datetime import datetime, timedelta
table_2 = table_1.withColumn('lag1', (datetime.strptime(table_1['time'], '%H:%M') - timedelta(minutes=15)).strftime('%H:%M'))

The same expression works on a plain Python string, but I don't understand why it fails on the table column. It raises "TypeError: strptime() argument 1 must be str, not Column". Is there a way to get a string from a column in PySpark? Thanks!

mck
asked Jan 13, 2021 at 11:28

1 Answer


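You can't apply ordinary Python functions such as datetime.strptime to Spark DataFrame columns: table_1['time'] is a Column, an expression that Spark evaluates later on the executors, not a string. Use Spark SQL functions instead: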

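import pyspark.sql.functions as F

# df is the question's table_1. Parse "H:mm" as a timestamp, subtract 15 minutes,
# and format back to "H:mm" ('mm' keeps two-digit minutes, e.g. 8:05 rather than 8:5).
df2 = df.withColumn(
    'lag1',
    F.expr("date_format(to_timestamp(time, 'H:mm') - interval 15 minute, 'H:mm')")
)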
df2.show()
+--------+-----------+---------------+----------+-----+-----+
|  Number|Institution|       datetime|      date| time| lag1|
+--------+-----------+---------------+----------+-----+-----+
|AE19075B|        ABC| 7/20/2019 7:45|07/20/2019| 7:45| 7:30|
|AE11688U|        CBT|2/11/2019 20:31|02/11/2019|20:31|20:16|
+--------+-----------+---------------+----------+-----+-----+
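One way to reason about edge cases of the SQL expression is to model it in plain Python. The sketch below is an illustration, not Spark code, and the function name shift_time_of_day is hypothetical. It assumes the time strings are always in H:mm form and that only the time of day matters: since date_format keeps only the hour and minute, subtracting 15 minutes from 0:10 should wrap around to 23:55 rather than go negative. Minutes are printed with two digits, which is what the pattern 'H:mm' produces; with 'H:m', 8:05 would come out as 8:5.

```python
# Plain-Python model (not Spark code) of the SQL expression: parse "H:mm",
# subtract some minutes, and keep only the time of day, wrapping at midnight.
def shift_time_of_day(t: str, minutes: int = 15) -> str:
    hours, mins = (int(part) for part in t.split(":"))
    # Minutes since midnight, wrapped into [0, 1440) by Python's modulo
    total = (hours * 60 + mins - minutes) % (24 * 60)
    # Unpadded hour, two-digit minute, like the 'H:mm' pattern
    return f"{total // 60}:{total % 60:02d}"


for t in ["7:45", "20:31", "8:20", "0:10"]:
    print(t, "->", shift_time_of_day(t))
```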

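Alternatively, you can wrap the Python function in a UDF. This is generally slower than built-in Spark SQL functions, because every value has to be sent to a Python worker and back. Note also that %H zero-pads the hour, so this version returns 07:30 instead of 7:30: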

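import pyspark.sql.functions as F
from datetime import datetime, timedelta

# Wrap the original Python logic in a UDF: Spark passes each value in as a str
# (or None for a null), and the UDF's return type defaults to string.
lag = F.udf(lambda t: None if t is None else
            (datetime.strptime(t, '%H:%M') - timedelta(minutes=15)).strftime('%H:%M'))
df2 = df.withColumn('lag1', lag('time'))
df2.show()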
+--------+-----------+---------------+----------+-----+-----+
|  Number|Institution|       datetime|      date| time| lag1|
+--------+-----------+---------------+----------+-----+-----+
|AE19075B|        ABC| 7/20/2019 7:45|07/20/2019| 7:45|07:30|
|AE11688U|        CBT|2/11/2019 20:31|02/11/2019|20:31|20:16|
+--------+-----------+---------------+----------+-----+-----+
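If you go the UDF route, it can help to write the logic as a named function that you can test without a Spark session. The sketch below uses a hypothetical helper, lag_time, which returns None for a null input and builds the string by hand so the hour is not zero-padded, matching the 7:30 style of the Spark SQL version. It assumes the inputs are "H:MM" strings like those in the question.

```python
from datetime import datetime, timedelta
from typing import Optional


def lag_time(t: Optional[str], minutes: int = 15) -> Optional[str]:
    """Return the time `minutes` earlier than t (an "H:MM" string), or None for a null."""
    if t is None:  # SQL NULLs reach a Python UDF as None
        return None
    shifted = datetime.strptime(t, "%H:%M") - timedelta(minutes=minutes)
    # strftime('%H') would give "07:30"; format by hand to get "7:30"
    return f"{shifted.hour}:{shifted.minute:02d}"


print(lag_time("7:45"))   # 7:30
print(lag_time("20:31"))  # 20:16
print(lag_time("0:10"))   # 23:55
print(lag_time(None))     # None
```

To use it in Spark, wrap it the same way as the lambda, e.g. lag = F.udf(lag_time) and then df.withColumn('lag1', lag('time')); F.udf defaults to a string return type.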
answered Jan 13, 2021 at 12:14