I have written a very basic algorithm that counts the number of times a sub-string appears in a string.
The algorithm is given:
- seq - the string sequence to search
- k - the length of a sub-string
- L - the length of the window to search in
- t - the number of times a sub-string of length k has to appear within the window to be added to the results
from collections import defaultdict
import concurrent.futures

def kmer_clumps(seq, k, L, t):
    """Return the set of k-mers that form an (L, t)-clump: if a k-mer of
    length [k] appears [t] times in a window of length [L], it is added to the set."""
    print("Starting a job")
    clump_dict = defaultdict(list)
    clumps = set()
    for pos in range(len(seq) - k + 1):
        current_pos = seq[pos:pos + k]
        clump_dict[current_pos].append(pos)
        if len(clump_dict[current_pos]) >= t:
            if ((clump_dict[current_pos][-1] + (k - 1)) - clump_dict[current_pos][-t]) < L:
                clumps.add(current_pos)
    if clumps:
        return clumps
    else:
        return None
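For reference, here is a minimal call on the sample sequence used further down (this usage sketch is mine, not part of the original code); for this well-known textbook example the clump-forming 5-mers are CGACA and GAAGA:

# Minimal usage sketch, assuming kmer_clumps is defined as above.
sample_seq = "CGGACTCGACAGATGTGAAGAACGACAATGTGAAGACTCGACACGACAGAGTGAAGAGAAGAGGAAACATTGTAA"
print(kmer_clumps(sample_seq, 5, 50, 4))  # expected: {'CGACA', 'GAAGA'}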
Running this algorithm on one thread/core is easy: just pass all of the parameters, and it scans the string and returns a set of results.
Now, this is a perfect candidate for running on multiple cores, since I sometimes need to feed a large amount of data to this algorithm (a text file of 20 MB or more).
Below I have used Python's concurrent.futures (import multiprocessing would work as well), but the exact module does not really matter. The question is: what is the best way of splitting the data to be passed to multiple threads/cores, so as to avoid duplicate data, large amounts of memory usage, etc.? I am hoping that people can share their experience with preparing data in this manner and the relevant best practices.
seq = "CGGACTCGACAGATGTGAAGAACGACAATGTGAAGACTCGACACGACAGAGTGAAGAGAAGAGGAAACATTGTAA"
k = 5
L = 50
t = 4
cores = 2
def sub_seq_clumps_multicore_job(seq, k, L, t, cores=1):
    seqLength = len(seq)
    if cores == 1:
        print("Using only 1 core as specified or data is too small.")
        return kmer_clumps(seq, k, L, t)
    else:
        print("Preparing data...")
        # Basic logic to split the data to provide to multiple jobs/threads
        jobSegments = seqLength // cores
        extraSegments = seqLength % cores
        overlapSize = (L // cores)
        # Creating multiple parts from a single string and adding same arguments to all
        # Amount of parts is based on core/thread count
        parts = []
        for part in range(0, seqLength, jobSegments + extraSegments):
            tmpList = [seq[part:part + jobSegments +
                           extraSegments + overlapSize + 1], k, L, t]
            parts.append(tmpList)
        print(f"Processing data on {cores} cores...")
        resultSet = set()
        # Starting jobs/threads by passing all parts to a process pool
        with concurrent.futures.ProcessPoolExecutor() as executer:
            jobs = [
                executer.submit(kmer_clumps, *sec) for sec in parts
            ]
            # Just collecting results
            for f in concurrent.futures.as_completed(jobs):
                resultSet.update(f.result())
        # Returning collected results, which are sub-strings of length k,
        # that were found in all windows of length L
        return resultSet
print(sub_seq_clumps_multicore_job(seq, k, L, t, cores))
Some sample output:
Data, segmented and passed to kmer_clumps(seq, k, L, t) to run on 1 core/thread:
"CGGACTCGACAGATGTGAAGAACGACAATGTGAAGACTCGACACGACAGAGTGAAGAGAAGAGGAAACATTGTAA", 5, 50, 4
Data, segmented and passed to kmer_clumps(seq, k, L, t) to run on 2 cores/threads:
Thread 1:
['CGGACTCGACAGATGTGAAGAACGACAATGTGAAGACTCGACACGACAGAGTGAAGAGAAGAGG', 5, 50, 4]
Thread 2:
['CGACACGACAGAGTGAAGAGAAGAGGAAACATTGTAA', 5, 50, 4]
Data, segmented and passed to kmer_clumps(seq, k, L, t) to run on 4 cores/threads:
Thread 1:
['CGGACTCGACAGATGTGAAGAACGACAATGTGAA', 5, 50, 4]
Thread 2:
['ACGACAATGTGAAGACTCGACACGACAGAGTGAA', 5, 50, 4]
Thread 3:
['ACGACAGAGTGAAGAGAAGAGGAAACATTGTAA', 5, 50, 4]
Thread 4:
['GAAACATTGTAA', 5, 50, 4]
Results in running time, when reading a huge data file (4.5 MB):
File reading complete!
Using only 1 core as specified or data is too small.
Starting a job
real 0m4.536s
File reading complete!
Preparing data...
Processing data on 2 cores...
Starting a job
Starting a job
real 0m2.438s
File reading complete!
Preparing data...
Processing data on 4 cores...
Starting a job
Starting a job
Starting a job
Starting a job
real 0m1.413s
File reading complete!
Preparing data...
Processing data on 6 cores...
Starting a job
Starting a job
Starting a job
Starting a job
Starting a job
Starting a job
real 0m1.268s
Because of the nature of the algorithm, running on multiple threads/cores generates more data overall: to make sure we cover the whole string and take the window L into account, the segments have to overlap. So there are data overlaps happening:
Let's take two examples:
Running on 2 cores: (diagram of the two overlapping segments)
Running on 4 cores: (diagrams of the four overlapping segments)
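To make the overlap easier to inspect, here is a small helper that only reproduces the segmentation step from sub_seq_clumps_multicore_job and prints each part's slice boundaries; the helper name and the printout are mine, purely for illustration:

def show_segments(seq, L, cores):
    # Reproduces the slicing used in sub_seq_clumps_multicore_job,
    # to visualise where the parts start, end and overlap.
    seq_length = len(seq)
    job_segments = seq_length // cores
    extra_segments = seq_length % cores
    overlap_size = L // cores
    for start in range(0, seq_length, job_segments + extra_segments):
        end = min(start + job_segments + extra_segments + overlap_size + 1, seq_length)
        print(f"part [{start}:{end}] -> {seq[start:end]}")

show_segments(seq, L, cores=2)  # reproduces the "Thread 1" / "Thread 2" split shown above
show_segments(seq, L, cores=4)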
With this post I have two goals:
1) To have more experienced people look at it and suggest improvements to the way the data is segmented for threading like this, to avoid data duplication, memory overload, etc. Any suggestions would be very helpful.
2) To learn from amazing people, and so that others can learn from this example.
Thank you all very much!
- "I am not including this algorithm to avoid confusion. It has been tested and it works as expected." - Fine; but what if a reviewer wants to execute your code (potentially with different segmentation)? – Reinderien, Mar 20, 2020 at 19:57
- @Reinderien you are right. – rebelCoder, Mar 20, 2020 at 20:12
1 Answer
Type hints
At a guess, this:
def kmer_clumps(seq, k, L, t):
can become
def kmer_clumps(seq: Sequence[str], k: int, l: int, t: int) -> Set[str]:
It's somewhat more self-documenting, and helps your IDE with static analysis and autocompletion.
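For the annotations above to resolve, an import along these lines would be needed (a sketch; on newer Pythons the built-in set and collections.abc.Sequence could be used instead):

from typing import Sequence, Set

def kmer_clumps(seq: Sequence[str], k: int, l: int, t: int) -> Set[str]:
    ...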
p.s. There's nothing wrong with returning an empty set. That's much preferable to returning None. People iterating over the returned set often won't need to change their code for an empty set, but would need to change their code for a None.
Logging
print("Starting a job")
can benefit from the (even simple) use of the logging module: https://docs.python.org/3.8/library/logging.html#module-logging
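A minimal sketch of what that might look like (the module-level logger and the basicConfig call are assumptions, not something prescribed by the answer):

import logging

logger = logging.getLogger(__name__)

def kmer_clumps(seq, k, L, t):
    logger.info("Starting a job")
    ...

# somewhere in the entry point:
logging.basicConfig(level=logging.INFO)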
Global constants
...such as
k = 5
L = 50
t = 4
cores = 2
should be ALL_CAPS.
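For example (the more descriptive names here are my own suggestion, not part of the answer):

KMER_LENGTH = 5      # k
WINDOW_LENGTH = 50   # L
MIN_OCCURRENCES = 4  # t
CORES = 2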
Variable names
seqLength should be snake_case, i.e. seq_length; and so on for your other variables.
Generators
parts = []
for part in range(0, seqLength, jobSegments + extraSegments):
    tmpList = [seq[part:part + jobSegments +
                   extraSegments + overlapSize + 1], k, L, t]
    parts.append(tmpList)
can be cleaned up by factoring it out into a generator function:
def make_part_list(...):
    for part in range(0, seqLength, jobSegments + extraSegments):
        tmpList = [seq[part:part + jobSegments +
                       extraSegments + overlapSize + 1], k, L, t]
        yield tmpList
        # or if you don't want nested lists...
        yield from tmpList

# ...
parts = tuple(make_part_list(...))
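If the generator route is taken, the parts can also be consumed lazily at submission time; a sketch, keeping the elided ... parameters from the answer and reusing the names from the question:

with concurrent.futures.ProcessPoolExecutor() as executor:
    # make_part_list(...) as sketched above
    jobs = [executor.submit(kmer_clumps, *part)
            for part in make_part_list(...)]
    result_set = set()
    for f in concurrent.futures.as_completed(jobs):
        result_set.update(f.result() or set())  # guard against a None result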
- Thanks a lot for your input! This code is mostly a draft, so the 'Global constants', for example, were just there for testing purposes so people can try running the code. While you provided very interesting suggestions, I was more interested in discussing how we can optimize segmentation of the data, a string of text in this case, before we split it across multiple threads/cores. This code works just fine and does what I need it to do. I could not find any good information about how to segment/split data for multiprocessing. – rebelCoder, Mar 22, 2020 at 9:20
- In some cases you just need to do multiple things, which are not really connected or only semi-connected, in parallel. That is easier. But in this case we have one single piece of data that we need to process, and the result is a combination of all the 'findings' from all threads. I was wondering if there was a better way to approach data segmentation/splitting. – rebelCoder, Mar 22, 2020 at 9:20