I would like to speed up a pandas apply function. I have been using swifter. It currently takes about 5 minutes for 200,000 records using multiprocessing, as below. Is there any way to speed this up further?
def partial_match(source_words, dest_words):
    matched_words = ''
    if any(word in dest_words for word in source_words):
        match_words_list = set(source_words) & set(dest_words)
        matched_words = ",".join(match_words_list)
    return matched_words

def exact_match(source_words, dest_words):
    matched_words = ''
    if all(word in dest_words for word in source_words):
        match_words_list = set(source_words) & set(dest_words)
        matched_words = ",".join(match_words_list)
    return matched_words
series_index = ['match_type', 'matched_words']

def perform_match(x):
    match_series = pd.Series(np.repeat('', len(series_index)), index=series_index)
    if x['remove_bus_ending'] == 'Y':
        x['dest_words'] = x['dest_words_2']
    else:
        x['dest_words'] = x['dest_words_1']
    # exact match
    if x['partial_match_flag'] == 'Y':
        match_series['matched_words'] = partial_match(x['source_words'], x['dest_words'])
        if match_series['matched_words'] != '':
            match_series['match_type'] = 'Partial Match'
    elif x['exact_match_2'] == 'Y':
        match_series['matched_words'] = exact_match(x['source_words'], x['dest_words'])
        if match_series['matched_words'] != '':
            match_series['match_type'] = 'Exact Match'
    return match_series
from multiprocessing import Pool
from functools import partial
import numpy as np

def parallelize(data, func, num_of_processes=8):
    data_split = np.array_split(data, num_of_processes)
    pool = Pool(num_of_processes)
    data = pd.concat(pool.map(func, data_split))
    pool.close()
    pool.join()
    return data

def run_on_subset(func, data_subset):
    return data_subset.swifter.apply(func, axis=1)

def parallelize_on_rows(data, func, num_of_processes=8):
    return parallelize(data, partial(run_on_subset, func), num_of_processes)
df[series_index] = parallelize_on_rows(df, perform_match)
Below is some sample data:
flag1 partial_match_flag exact_match_flag source_words dest_word_2 dest_words_1
0 N Y N [song, la] [urban, karamay, credit, city, co, kunlun, com... [ltd, urban, karamay, credit, city, co, kunlun...
1 N Y N [song, la] [al, abdulah, nasser] [al, abdulah, nasser]
2 N Y N [song, la] [al, abdulah, nasser] [al, abdulah, nasser]
3 N Y N [song, la] [abdulamir, mahdi] [abdulamir, mahdi]
4 N Y N [song, la] [abdullah, al, nasser] [abdullah, al, nasser]
5 N Y N [song, la] [abu, al, jud] [abu, al, jud]
6 N Y N [song, la] [al, herz, adam] [al, herz, adam]
- Comment (Peilonrayz, Jun 8, 2020): "However if you have CPU bound applications; one of the most important things you can know, is that if you add threading to a CPU bound application will it go faster or slower? Slower."
- Comment (Peilonrayz, Jun 8, 2020): Please edit your question so that you explain what your code does. Currently it is unclear.
1 Answer
flag as boolean
If you change the flags from 'Y' and 'N' to True and False, you can use boolean indexing. This alone should already speed up a lot of things.
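As a minimal sketch of the idea (flag name taken from the sample data; the value column is made up for illustration), the one-time conversion and the resulting boolean indexing look like this:

```python
import pandas as pd

# Convert the 'Y'/'N' string flags to real booleans once, up front.
df = pd.DataFrame({
    "partial_match_flag": ["Y", "N", "Y"],
    "value": [1, 2, 3],  # hypothetical payload column for illustration
})
df["partial_match_flag"] = df["partial_match_flag"] == "Y"

# Boolean indexing: select every row that needs a partial match in one
# vectorized step, with no per-row string comparison.
partial_rows = df.loc[df["partial_match_flag"]]
print(partial_rows["value"].tolist())  # [1, 3]
```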
set
You check word in dest_words for word in source_words against a list of words, and only if that check matches do you convert to a set. The containment check alone would already be faster against a set than against a list, but using set operations directly speeds this up a lot.
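To make the difference concrete, here is a small sketch using word sets in the shape of the sample data; the logic mirrors the set-based partial and exact matching described here, not a drop-in replacement for the question's functions:

```python
# With lists, `word in dest_words` scans the whole list for every word;
# with sets, a single hashed intersection does all the work at once.
source_words = {"song", "la"}
dest_words = {"al", "abdulah", "nasser", "la"}

# Partial match: any shared word -> report the shared words.
intersection = source_words & dest_words
print(sorted(intersection))  # ['la']

# Exact match: the two sets must be equal.
print(source_words == dest_words)  # False
```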
import typing

def partial_match(
    source_words: typing.Set[str], dest_words: typing.Set[str], index=None
) -> typing.Tuple[typing.Any, typing.Optional[str]]:
    intersection = source_words & dest_words
    if intersection:
        return index, ", ".join(intersection)
    return index, None

def exact_match(
    source_words: typing.Set[str], dest_words: typing.Set[str], index=None
) -> typing.Tuple[typing.Any, typing.Optional[str]]:
    if source_words == dest_words:
        return index, ", ".join(source_words)
    return index, None
The reason I chose to return the index along with the result is to make it easier to reconstruct the series when reassembling everything.
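A small sketch of that reassembly (the (index, words) pairs are made up) shows why: the pairs can come back out of order and from disjoint subsets, and still land on the right rows:

```python
import pandas as pd

# Hypothetical results: (index, matched_words) pairs, out of order,
# with None marking rows that had no match.
results = [(3, "la"), (0, "co, city"), (5, None)]

# Unzip into index and values, then rebuild an aligned Series.
idx, values = zip(*results)
matched = pd.Series(values, index=idx).sort_index()
print(matched.to_dict())  # {0: 'co, city', 3: 'la', 5: None}
```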
Don't touch the original data
You change your source data in place (by adding columns). It would be better to leave the source untouched and keep the destination words etc. in separate series.
Series.where
You can replace calls like this
if x['remove_bus_ending'] == 'Y':
    x['dest_words'] = x['dest_words_2']
else:
    x['dest_words'] = x['dest_words_1']
with Series.where
a = pd.Series(list("abcd"))
b = pd.Series(list("efgh"))
c = pd.Series([True, True, False, True])
b.where(c, other=a)
0    e
1    f
2    c
3    h
dtype: object
If your data looks like this:
from io import StringIO
import pandas as pd
def setify(s):
    return s.str.strip("[]").str.split(", ").apply(set)
# data_str holds the sample table from the question as a string
df = pd.read_csv(StringIO(data_str), sep="\s\s+", index_col=False, engine='python')
df["source_words"] = setify(df["source_words"])
df["dest_words_1"] = setify(df["dest_words_1"])
df["dest_word_2"] = setify(df["dest_word_2"])
df["remove_bus_ending"] = df["remove_bus_ending"] == "Y"
df["partial_match_flag"] = df["partial_match_flag"] == "Y"
df["exact_match_flag"] = df["exact_match_flag"] == "Y"
intermediate dataframe
If you want to split the dataframe with np.array_split, you'll need to provide an intermediate form with the information you need:
df_intermediate = pd.concat(
[
df["dest_word_2"]
.where(df["remove_bus_ending"], other=df["dest_words_1"])
.rename("dest_words"),
df["source_words"],
],
axis=1,
)
You can even split it immediately according to which kind of matching is needed:
df_intermediate_partial = df_intermediate.loc[df["partial_match_flag"]]
df_intermediate_exact = df_intermediate.loc[df["exact_match_flag"]]
applying the function
not parallel:
result_partial = list(
map(
partial_match,
df_intermediate_partial["source_words"],
df_intermediate_partial["dest_words"],
df_intermediate_partial.index,
)
)
results_exact = list(
map(
exact_match,
df_intermediate_exact["source_words"],
df_intermediate_exact["dest_words"],
df_intermediate_exact.index,
)
)
result = pd.Series(result_partial + results_exact)
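If you also want the match_type column from the question, one way (a sketch with made-up result pairs; the real inputs are result_partial and results_exact from above) is to tag each pair with its type while dropping the non-matches:

```python
import pandas as pd

# Hypothetical results from the two passes; None means no match.
result_partial = [(0, "la"), (2, None)]
results_exact = [(1, "al, nasser")]

# Tag each matched pair with its match type, skipping the None rows.
rows = (
    [(i, w, "Partial Match") for i, w in result_partial if w is not None]
    + [(i, w, "Exact Match") for i, w in results_exact if w is not None]
)
out = pd.DataFrame(rows, columns=["idx", "matched_words", "match_type"]).set_index("idx")
print(out)
```

Joining this back with df.join(out) leaves NaN on the unmatched rows, which you can fillna('') if you need the question's empty-string convention.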
This should be easy to parallelize. Since I'm no expert on that, I'll leave that to others.
context manager
Most of the examples I found in the multiprocessing documentation work with a context manager that takes care of closing the pool:
with Pool(processes=4) as pool:
... # parallel part of the code
- Comment (abhilash Dasari, Jun 11, 2020): flagging as boolean and setifying data has improved speed drastically