[フレーム]
Last Updated: February 25, 2016
·
4.473K
· datasaur

Chunking large text files with multiple Python processes (and Redis)

https://gist.github.com/2914601

import multiprocessing, os, sys, time
from itertools import izip
import redis

rr = None
def process_chunk(chunk):
 global rr
 if rr is None:
 rr = redis.Redis() # Create one connection per process 
 print 'PID %d' % os.getpid()

 pl = rr.pipeline(transaction=False)
 lc = 0
 for line in chunk:
 sample = dict(izip(cols, line.split(',')))

 key = "%s" % (sample['hhmi']])
 bps = int(sample['data'])

 for f in fields:
 pl.hincrby("%s:%s" % (key, f), sample[f], bps)
 lc = lc + 1

 pl.execute()
 return lc 

def grouper(n, iterable):
 return izip(*[iter(iterable)]*n)

if __name__ == '__main__':
 cols = #list of all columns in the data set / header 
 cols = cols.split(',')
 fields = #list of fields to parse 
 fields = fields.split(',')

 csv = open('0000.csv','rU')
 pool = multiprocessing.Pool(6)

 c = 0
 lc = 0
 jobs = []
 for chunk in grouper(100, csv.readlines()):
 c += 1
 jobs.append(chunk)
 print '%d chunks' % c 

 stime = time.time()
 rc = pool.map(process_chunk, jobs)
 rtime = (time.time()-stime)+.001 # Avoid div/0

 for l in rc: lc += l
 print("END: %d/s" % int(lc/rtime))

AltStyle によって変換されたページ (->オリジナル) /