Last Updated: February 25, 2016 · datasaur

Chunking large text files with multiple Python processes (and Redis)

https://gist.github.com/2914601
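
The script below splits a CSV into 100-line chunks, hands the chunks to a multiprocessing.Pool of 6 workers via pool.map, and has each worker lazily create one Redis connection per process and batch its HINCRBY commands through a non-transactional pipeline. At the end it prints the aggregate throughput in lines per second.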

import multiprocessing, os, sys, time
from itertools import izip, izip_longest
import redis

rr = None
def process_chunk(chunk):
    global rr
    if rr is None:
        rr = redis.Redis()  # Create one connection per process 
        print 'PID %d' % os.getpid()

    pl = rr.pipeline(transaction=False)
    lc = 0
    for line in chunk:
        if line is None:  # Skip the padding grouper() adds to fill out the final chunk
            continue
        sample = dict(izip(cols, line.split(',')))

        key = "%s" % (sample['hhmi']])
        bps = int(sample['data'])

        for f in fields:
            # Commands are buffered locally and sent in one batch on pl.execute()
            pl.hincrby("%s:%s" % (key, f), sample[f], bps)
            lc += 1

    pl.execute()
    return lc 

def grouper(n, iterable):
    # Collect lines into fixed-size chunks; izip_longest pads the final,
    # partial chunk with None instead of silently dropping those lines
    return izip_longest(*[iter(iterable)] * n)

if __name__ == '__main__':
    # cols/fields are defined before the Pool is created so that forked
    # worker processes inherit them as module globals
    cols = ''    # comma-separated list of all columns in the data set / header
    cols = cols.split(',')
    fields = ''  # comma-separated list of fields to parse
    fields = fields.split(',')

    csv = open('0000.csv','rU')
    pool = multiprocessing.Pool(6)

    c = 0
    lc = 0
    jobs = []
    for chunk in grouper(100, csv.readlines()):
        c += 1
        jobs.append(chunk)
    print '%d chunks' % c 

    stime = time.time()
    rc = pool.map(process_chunk, jobs)
    rtime = (time.time()-stime)+.001  # Avoid div/0

    for l in rc: lc += l  # Total lines processed across all chunks
    print 'END: %d lines/sec' % int(lc / rtime)