When processing large files (say, for parsing or performing computations based on their contents), we want to be able to use multiple processes to do the job faster.
In my case I wanted to count the frequency of appearance of features in a LibSVM-formatted file, similar to a word count, which is a typical parallel-processing example.
Example input:
1 4:22 6:22 7:44 8:12312
1 4:44 7:44
0 1:33 9:0.44
-1 1:55 4:0 8:12132
We want to count how many times each feature index, i.e. the value before the ':', appears. Here feature 4 appears 3 times, feature 7 appears 2 times, and so on.
Expected output:
[(4, 3), (7, 2), (8, 2), (1, 2), (6, 1), (9, 1)]
Here's my solution for Python 3:
import argparse
import multiprocessing as mp
import os
from operator import itemgetter
from collections import Counter
import functools
import json


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    parser.add_argument("--output", action='store_true', default=False)
    parser.add_argument("--no-stdout", action='store_true', default=False)
    parser.add_argument("--cores", type=int, default=None)
    return parser.parse_args()


def parse_libsvm_line(line: str) -> list:
    """
    Parses a line in a LibSVM file to return the indexes of non-zero features
    :param line: A line in LibSVM format: "1 5:22 7:44 99:0.88"
    :return: A list of ints, one for each index appearing in the line
    """
    features = line.split()[1:]  # Get rid of the class value
    indexes = [int(pair.split(":")[0]) for pair in features]
    return indexes


def process_wrapper(arg_tuple):
    """
    Applies the process function to every line in a chunk of a file, to determine the frequency
    of features in the chunk.
    :param arg_tuple: A tuple that contains: line_process_fun, filename, chunk_start, chunk_size
    :return: A counter object that counts the frequency of each feature in the chunk
    """
    line_process_fun, filename, chunk_start, chunk_size = arg_tuple
    counter = Counter()
    with open(filename) as f:
        f.seek(chunk_start)
        lines = f.read(chunk_size).splitlines()
        for line in lines:
            indexes = line_process_fun(line)
            for index in indexes:
                counter[index] += 1
    return counter


def chunkify(fname, size=1024*1024):
    """
    Creates a generator that indicates how to chunk a file into parts.
    :param fname: The name of the file to be chunked
    :param size: The size of each chunk, in bytes.
    :return: A generator of (chunk_start, chunk_size) tuples for the file.
    """
    file_end = os.path.getsize(fname)
    with open(fname, 'r') as f:
        chunk_end = f.tell()
        while True:
            chunk_start = chunk_end
            f.seek(f.tell() + size, os.SEEK_SET)
            f.readline()
            chunk_end = f.tell()
            yield chunk_start, chunk_end - chunk_start
            if chunk_end > file_end:
                break


if __name__ == '__main__':
    args = parse_args()

    pool = mp.Pool(args.cores)
    jobs = []

    # Create one job argument tuple for each chunk of the file
    for chunk_start, chunk_size in chunkify(args.input):
        jobs.append((parse_libsvm_line, args.input, chunk_start, chunk_size))

    # Process chunks in parallel. The result is a list of Counter objects
    res_list = pool.map(process_wrapper, jobs)

    # Aggregate the chunk counters and sort by decreasing count
    aggregated_count = sorted(functools.reduce(lambda a, b: a + b, res_list).items(),
                              key=itemgetter(1), reverse=True)

    # Print the result
    if not args.no_stdout:
        print(aggregated_count)

    # Write the result to a file as json (sorted list of [index, count] lists)
    if args.output:
        with open(args.input + "_frequencies.json", 'w') as out:
            json.dump(aggregated_count, out)

    # Close the pool workers
    pool.close()
My questions are:
- Is it possible to do this in a single pass in parallel? Now I'm using one pass to determine the chunks, then one more for the processing.
- Is there a more efficient way to chunk text files? Now I'm using chunks of constant byte size.
1 Answer
A few comments, unfortunately not on the multiprocessing part.
parser.add_argument("--output", action='store_true', default=False) is exactly the same as parser.add_argument("--output", action='store_true'); the 'store_true' action makes sure that it is false if the flag is not set.

I like to give my argument parsing functions an optional argument, so def parse_args(args=None), and later use return parser.parse_args(args). This allows you to interactively test the function by passing a list of strings to see if the parsing works as expected. When it is None, the parsing proceeds as it currently does.
Python 3 has advanced tuple unpacking, so you could do _, *features = line.split() instead of features = line.split()[1:]. Whether or not that is better is debatable, but it is good to know that this feature exists.

While "indexes" is a valid plural of "index", if it is used in the mathematical sense you should probably use "indices".
Counter objects have a nice update method. It can either take another Counter (or actually any dict subclass) object, in which case it works just like the normal dict.update. But it can also take an iterable, in which case it consumes that iterable just like it does when creating the object (by counting the occurrences of each object). So

indexes = line_process_fun(line)
for index in indexes:
    counter[index] += 1

could just be

counter.update(line_process_fun(line))
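A quick illustration of the two update modes (not from the original answer, just standard collections.Counter behaviour):

from collections import Counter

c = Counter([4, 7, 4])           # Counter({4: 2, 7: 1})
c.update(Counter({4: 1, 8: 2}))  # add counts from another Counter/dict
c.update([4, 6])                 # or count the elements of an iterable
print(c)                         # Counter({4: 4, 8: 2, 7: 1, 6: 1})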
Indeed, that whole function could be greatly simplified by using map and itertools.chain:

from itertools import chain

def process_wrapper(arg_tuple):
    """
    Applies the process function to every line in a chunk of a file, to determine the frequency
    of features in the chunk.
    :param arg_tuple: A tuple that contains: line_process_fun, filename, chunk_start, chunk_size
    :return: A counter object that counts the frequency of each feature in the chunk
    """
    line_process_fun, filename, chunk_start, chunk_size = arg_tuple
    with open(filename) as f:
        f.seek(chunk_start)
        lines = f.read(chunk_size).splitlines()
        return Counter(chain.from_iterable(map(line_process_fun, lines)))
Right now you manually have to unpack line_process_fun, filename, chunk_start, chunk_size = arg_tuple, but if you used Pool.starmap instead of Pool.map, you could make the signature def process_wrapper(line_process_fun, filename, chunk_start, chunk_size).
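A rough sketch of that variant (my adaptation, combining it with the chain-based version above and assuming pool and jobs are built as in the original main block):

from itertools import chain
from collections import Counter

def process_wrapper(line_process_fun, filename, chunk_start, chunk_size):
    # starmap unpacks each job tuple into these four arguments,
    # so no manual tuple unpacking is needed.
    with open(filename) as f:
        f.seek(chunk_start)
        lines = f.read(chunk_size).splitlines()
    return Counter(chain.from_iterable(map(line_process_fun, lines)))

# jobs is still the same list of
# (parse_libsvm_line, filename, chunk_start, chunk_size) tuples:
res_list = pool.starmap(process_wrapper, jobs)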
Counter objects support not only updating, but also summing two instances. In this case, quite intuitively, all counts are added. They also have a most_common method which returns tuples of values and counts from most to least common, so exactly what your reduce and sort does. And finally, sum takes an optional second argument stating what the base object is:

res_list = pool.map(process_wrapper, jobs)
aggregated_count = sum(res_list, Counter()).most_common()
Make sure to test that this does not slow down the processing, but even if it does, it sure is easier to understand. For the small example given, it is slightly slower on my machine.
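One quick way to compare the two aggregation strategies in isolation (synthetic Counters, not a benchmark from the answer):

import functools
import timeit
from collections import Counter
from operator import itemgetter

# Fake per-chunk results: 50 chunks, 1000 feature indices each.
counters = [Counter({i: i % 7 + 1 for i in range(1000)}) for _ in range(50)]

def reduce_and_sort():
    return sorted(functools.reduce(lambda a, b: a + b, counters).items(),
                  key=itemgetter(1), reverse=True)

def sum_and_most_common():
    return sum(counters, Counter()).most_common()

print(timeit.timeit(reduce_and_sort, number=100))
print(timeit.timeit(sum_and_most_common, number=100))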
multiprocessing.Pool can also be used as a context manager to ensure it is closed after the processing. This would introduce another level of indentation, though.
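For illustration only (my sketch, folding in the sum/most_common suggestion from above), the main block could then read roughly:

if __name__ == '__main__':
    args = parse_args()
    jobs = [(parse_libsvm_line, args.input, chunk_start, chunk_size)
            for chunk_start, chunk_size in chunkify(args.input)]

    # The pool is cleaned up automatically when the with block exits,
    # so no explicit pool.close() is needed; map() has already
    # collected all results at that point.
    with mp.Pool(args.cores) as pool:
        res_list = pool.map(process_wrapper, jobs)

    aggregated_count = sum(res_list, Counter()).most_common()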
- Thanks for the review @Graipher, I like the use of map and chain. For the aggregation I had tried to use sum, but didn't know it takes a second argument, that's cool. I thought about most_common(), not sure why I went with the sort approach in the end. – Bar, Oct 19, 2018 at 14:48
- @Bar You're welcome :-). The second argument of sum is indeed not used very often, but it can come in handy. I also added a point about using Pool.starmap instead of Pool.map for some more readability. – Graipher, Oct 19, 2018 at 14:55