I have as input a potentially large CSV file (gzip compressed) with a known structure. I don't know in advance the size of this file, but let's say it can't fit in memory. The rows in this CSV are ordered like the following:
key1, ...other fields
key1,
key1,
key1,
key2,
key2,
key3,
key4,
key4,
⋮
They are ordered by the value in the first column (let's call it key), but it is unknown how many rows there are for each distinct key. I need to scan the whole file and process only the first N rows matching each key (there could be more than N rows for some of the keys). These N rows per key can be processed in memory.
I came up with this code, but I don't like it very much. It is a bit messy:
import gzip

def process_rows(key, rows):
    print(f'Processed rows for key {key}')

def main(file_path, N=1000):
    with gzip.GzipFile(filename=file_path) as file:
        curr_key = None
        rows_to_process = []
        for line in file:
            line = line.decode().strip()
            if len(line) == 0:
                continue
            fields = line.split(',')
            [key, field2, field3] = fields
            if curr_key is not None:
                if curr_key != key or (len(rows_to_process) > 0 and len(rows_to_process) % N == 0):
                    process_rows(key, rows_to_process)
                    # Find next key if needed
                    while curr_key == key:
                        line = next(file, None)
                        if line is None:
                            return  # End of file, exit
                        line = line.decode().strip()
                        if len(line) < 1:
                            continue
                        fields = line.split(',')
                        [key, field2, field3] = fields
                    print('Found next key', key)
                    # Reset rows to process
                    rows_to_process = []
            curr_key = key
            rows_to_process.append([key, field2, field3])
        # Flush trailing data
        if (len(rows_to_process) > 0):
            process_rows(key, rows_to_process)
Is there a cleaner way to do this?
1 Answer
Minor niggles
Testing len() > 0 is over-wordy. If we want to test whether a string or list is non-empty, its truthiness directly indicates that (see below, if not line:).
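For example (a hypothetical snippet, not from the original code, just to show the two styles side by side):

line = ''
if len(line) == 0:   # wordy: spells out the emptiness check
    print('empty (wordy test)')
if not line:         # idiomatic: an empty string or list is falsy
    print('empty (idiomatic test)')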
Structure and design
There's a lot in main(), and it doesn't lend itself to unit-testing very well. I would split its responsibilities, probably into a generator that emits each line as an array (or more likely, a tuple) of fields, and a consumer that batches your N rows:
def rows(input):
    '''
    Generator function yielding the first three fields of each line of input.
    '''
    for line in input:
        line = line.decode().strip()
        if not line:
            continue
        fields = line.split(',')  # assumes no quoted ',' in fields
        yield fields[0:3]
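As an illustration only (not part of the original answer), the generator needs nothing more than an iterable of byte strings, so it can be exercised with a small hypothetical in-memory sample:

sample = [b'alpha,0,1\n', b'\n', b'beta,2,6\n']
for fields in rows(sample):
    print(fields)
# ['alpha', '0', '1']
# ['beta', '2', '6']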
For the grouping of up to N lines, we can take advantage of the standard library functions in itertools to eliminate most of the logic you wrote. Specifically, itertools.groupby() to get an iterator for each group of rows, and itertools.islice() to take the first N elements from each of those:
import gzip
import itertools
import operator

def main(file_path, N=1000, func=process_rows):
    groups = itertools.groupby(rows(gzip.GzipFile(filename=file_path)),
                               operator.itemgetter(0))
    for (key, values) in groups:
        func(key, itertools.islice(values, N))
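To see the two itertools building blocks in isolation, here is a minimal, hypothetical sketch on an in-memory list (not part of the original answer). Note that groupby discards any unconsumed items of a group once the next group is requested, which is what makes taking only the first N rows per key safe:

import itertools
import operator

data = [('a', 1), ('a', 2), ('a', 3), ('b', 4)]
for key, group in itertools.groupby(data, operator.itemgetter(0)):
    # Take at most two items per key; the unread ('a', 3) is skipped
    # automatically when groupby advances to the 'b' group.
    print(key, list(itertools.islice(group, 2)))
# a [('a', 1), ('a', 2)]
# b [('b', 4)]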
I tested the code with a modified process function:
def process_rows(key, rows):
    print(f'Processed {len(list(rows))} row(s) for key {key}')
and this input:
alpha,0,1
alpha,0,2
alpha,0,3
alpha,1,4
beta,2,6
gamma,3,0
gamma,4,0
gamma,5,0
gamma,6,0
The output is:
Processed 3 row(s) for key alpha
Processed 1 row(s) for key beta
Processed 3 row(s) for key gamma
Comment: Testing len() > 0 might also be considered good PEP0020 (explicit is better than implicit). – Reinderien, Oct 15, 2022 at 13:02