Simple Python parallelism

Scott S., Jul 30, 2014

Let's say you have a slow, time-consuming function. It's too intensive and complex to run on the GPU (with its thousand-ish cores), but the single core that Python uses isn't enough. Your machine has eight cores and you want to use them.

Well, how do you use them? Python's global interpreter lock (GIL) makes it difficult: only one thread can execute Python bytecode at a time, so threads alone won't spread CPU-bound work across cores. Luckily, much more knowledgeable people have spent much more time getting around the GIL and have written many modules to tackle this problem. But looking at the docs for some of those modules, you get lost in threads, processes and coroutines (or at least I do).

Even the multiprocessing package doesn't seem to have a good solution at first glance. Its documentation isn't quite clear and it's hard to see where the speedups might lie. It turns out that Pool.map provides exactly the speedups you're looking for. So, let's look at the documentation:

In [4]: from multiprocessing import Pool

In [5]: pool = Pool()

In [6]: pool.<tab>
pool.Process         pool.close           pool.join            pool.terminate
pool.apply           pool.imap            pool.map
pool.apply_async     pool.imap_unordered  pool.map_async

In [6]: pool.map?
Type:        instancemethod
String form: <bound method Pool.map of <multiprocessing.pool.Pool object at 0x103855ed0>>
File:        /Users/scott/anaconda/python.app/Contents/lib/python2.7/multiprocessing/pool.py
Definition:  pool.map(self, func, iterable, chunksize=None)
Docstring:   Equivalent of `map()` builtin

In [7]: map?
Type:        builtin_function_or_method
String form: <built-in function map>
Namespace:   Python builtin
Docstring:
map(function, sequence[, sequence, ...]) -> list

Return a list of the results of applying the function to the items of
the argument sequence(s). If more than one sequence is given, the
function is called with an argument list consisting of the corresponding
item of each sequence, substituting None for missing values when not all
sequences have the same length.  If the function is None, return a list of
the items of the sequence (or a list of tuples if more than one sequence).

In [8]:

That sure seems close to what we're looking for: we want to apply a function to a range of inputs. The docs are a tad lacking, but since each worker in the pool is a separate process with its own interpreter (and its own GIL), we can reasonably expect it to speed up CPU-bound code.
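The tab-completion listing above also shows imap and a chunksize parameter. A minimal sketch, assuming Python 3.3+ and a hypothetical stand-in function `cube`, comparing the builtin map, Pool.map with an explicit chunksize, and the lazy Pool.imap. All three return results in input order.

```python
from multiprocessing import Pool

def cube(x):
    # stand-in for real work; defined at module level so it can be pickled
    return x ** 3

if __name__ == "__main__":
    xs = list(range(1000))
    serial = list(map(cube, xs))  # builtin map, one core
    with Pool() as pool:
        # chunksize sets how many items each worker receives per batch;
        # bigger chunks mean less inter-process overhead for cheap functions
        chunked = pool.map(cube, xs, chunksize=50)
        # imap returns an iterator that yields results in input order
        lazy = list(pool.imap(cube, xs, chunksize=50))
    print(serial == chunked == lazy)  # True
```

Leaving chunksize at None lets Pool.map pick a batch size from the input length; imap is handy when the results are too large to hold in one list.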

But you wonder how easy this is to use. The other toolboxes you've seen are a mess, and you (fairly) expect this one to be the same.

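from multiprocessing import Pool

def f(x):
    # complicated processing
    return x + 1

if __name__ == '__main__':  # required where workers are spawned, e.g. Windows
    pool = Pool()
    x = range(100)
    y_serial = []
    for i in x:
        y_serial.append(f(i))
    y_parallel = pool.map(f, x)
    assert y_serial == y_parallel  # same values, same order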

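To be clear, Pool.map works for any iterable input, not just lists. You can have a list of images you want to download, (x, y) coordinate tuples or a bunch of floats for scientific data processing. The one requirement is that the function, each input item and each result can be pickled, because multiprocessing sends them between processes.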
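A minimal sketch of tuple inputs, assuming Python 3.3+ (for Pool.starmap) and two hypothetical functions. With map, the function receives each (x, y) tuple as a single argument and unpacks it itself; starmap unpacks each tuple into separate arguments for you.

```python
import math
from multiprocessing import Pool

def distance_from_origin(point):
    # each item is an (x, y) tuple; unpack it inside the function
    x, y = point
    return math.sqrt(x * x + y * y)

def add(x, y):
    # an ordinary two-argument function for Pool.starmap
    return x + y

if __name__ == "__main__":
    points = [(3, 4), (5, 12), (8, 15)]
    with Pool() as pool:
        distances = pool.map(distance_from_origin, points)
        # starmap calls add(3, 4), add(5, 12), add(8, 15)
        sums = pool.starmap(add, points)
    print(distances)  # [5.0, 13.0, 17.0]
    print(sums)       # [7, 17, 23]
```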

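Running this code, we find that the serial result exactly matches the parallel result, even with incredibly nitpicky floats: every item goes through the same function either way, and Pool.map returns the results in input order. To be safe, we should call pool.close(); pool.join() after running this code. close() tells the pool no more work is coming and join() waits for the worker processes to exit. It can't really hurt, especially if our computation takes a loooong time.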
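A minimal sketch of two cleanup patterns, assuming Python 3.3+. Wrapping close() and join() in try/finally makes sure they run even if the mapped function raises. The with statement is shorter, but leaving the block calls terminate(), not close() plus join(), so it only fits when every result has already been collected.

```python
from multiprocessing import Pool

def f(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool()
    try:
        y_parallel = pool.map(f, range(100))
    finally:
        pool.close()  # no new tasks may be submitted
        pool.join()   # wait for the worker processes to exit

    # Shorter form: __exit__ calls terminate(), which is fine here because
    # pool.map has already returned every result before the block ends
    with Pool() as pool:
        y_with = pool.map(f, range(100))

    print(y_parallel == y_with)  # True
```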

But wait. We have a method to run a function in parallel that returns the exact same result. Let's do what all programmers should do and make it easy to call.

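from multiprocessing import Pool
from numpy import asarray  # asarray comes from NumPy

def easy_parallelize(f, sequence):
    # Pool(processes=8) starts 8 workers; Pool() alone uses one per CPU
    # I didn't see gains with .dummy (threads); you might
    pool = Pool(processes=8)
    #from multiprocessing.dummy import Pool as ThreadPool
    #pool = ThreadPool(16)

    # apply f to every item; results come back in the same order as sequence
    result = pool.map(f, sequence)
    # drop None results (this shifts positions relative to sequence)
    cleaned = [x for x in result if x is not None]
    cleaned = asarray(cleaned)
    # not optimal but safe: stop accepting work, then wait for the workers
    pool.close()
    pool.join()
    return cleaned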
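The commented-out multiprocessing.dummy lines swap in a thread pool with the same API. For pure-Python CPU-bound work the threads share one GIL, so little or no speedup is the expected result; threads help mostly when the work waits on I/O, such as downloads. A rough timing sketch, assuming a hypothetical busy-loop function `burn` and an arbitrary workload; your numbers will vary by machine.

```python
import time
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

def burn(n):
    # pure-Python loop: holds the GIL the whole time
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(pool_class, work):
    # run pool.map(burn, work) with 4 workers of the given kind and time it
    start = time.perf_counter()
    with pool_class(4) as pool:
        result = pool.map(burn, work)
    return result, time.perf_counter() - start

if __name__ == "__main__":
    work = [200_000] * 16
    threaded, t_threads = timed(ThreadPool, work)
    processed, t_procs = timed(Pool, work)
    print(f"threads:   {t_threads:.2f} s")
    print(f"processes: {t_procs:.2f} s")
    print(threaded == processed)  # True: same answers, different speed
```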

While this works in this idealized example, it doesn't quite work in the real world: your function might take more than one input, and Pool.map only passes one. To get around that, we use functools.partial.
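The core idea, sketched on its own with a hypothetical three-argument function `scale_and_shift(x, factor, offset)`: functools.partial fixes `factor` and `offset`, so the resulting callable takes only `x`, which is what Pool.map expects. A partial of a module-level function can be pickled and sent to the workers.

```python
from functools import partial
from multiprocessing import Pool

def scale_and_shift(x, factor, offset):
    # x varies per item; factor and offset are the same for every call
    return x * factor + offset

if __name__ == "__main__":
    # one_arg(x) is equivalent to scale_and_shift(x, factor=3, offset=1)
    one_arg = partial(scale_and_shift, factor=3, offset=1)
    with Pool() as pool:
        result = pool.map(one_arg, range(5))
    print(result)  # [1, 4, 7, 10, 13]
```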

from functools import partial
from multiprocessing import Pool
from numpy import asarray

def parallel_attribute(f):
    def easy_parallelize(f, sequence):
        # I didn't see gains with .dummy (threads); you might
        pool = Pool(processes=8)
        #from multiprocessing.dummy import Pool as ThreadPool
        #pool = ThreadPool(16)

        # apply f to every item; results come back in the same order as sequence
        result = pool.map(f, sequence)
        cleaned = [x for x in result if x is not None]
        cleaned = asarray(cleaned)
        # not optimal but safe
        pool.close()
        pool.join()
        return cleaned
    # f must take exactly one argument (bind any others with partial first);
    # the returned function takes one argument: the sequence to map f over
    return partial(easy_parallelize, f)

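There. Now we can write some_function.parallel = parallel_attribute(some_function) and call some_function.parallel(sequence). Then, if we modify our functions accordingly, we can see speedups from this!

It's tempting to modify a multi-argument function with a nested definition that closes over the extra arguments, but multiprocessing has to pickle the function to send it to the worker processes, and functions defined inside other functions can't be pickled. Instead, keep the function at module level and use partial to fix every argument except the one we parallelize over. It's really just a wrapper to make the function take one argument.

from functools import partial

def some_function(x, y, z):
    # x is what we want to parallelize over; y and z stay fixed
    # complicated computation
    return x + y + z

# bind y and z (example values); the partial takes only x and can be pickled
some_function.parallel = parallel_attribute(partial(some_function, y=1, z=2))
# usage: some_function.parallel(range(100))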
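Why the nested version fails: Pool.map sends the function to the workers with pickle, and pickle stores a function by its importable name. A minimal sketch (no pool needed) showing that pickling a nested function raises an error while a partial of a module-level function round-trips fine; `outer` and `add3` are hypothetical names.

```python
import pickle
from functools import partial

def add3(x, y, z):
    return x + y + z

def outer(y, z):
    def inner(x):
        # closes over y and z, but has no importable name
        return x + y + z
    return inner

nested = outer(1, 2)
try:
    pickle.dumps(nested)
    nested_ok = True
except (pickle.PicklingError, AttributeError) as err:
    # the exact exception type depends on the Python version
    nested_ok = False
    print("nested function:", type(err).__name__, err)

bound = partial(add3, y=1, z=2)
restored = pickle.loads(pickle.dumps(bound))
print("partial round-trip:", restored(10))  # partial round-trip: 13
```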

Running some test code in the real world on a 2012 MacBook Air, I saw speedups of about 2x, which makes sense for its 2 cores. On a 2010 iMac, I saw speedups of about 4x, again making sense for the 4 cores that machine has. Now go and test it on your own machines!
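A rough sketch for testing on your own machine: it times the same CPU-bound map with 1, 2, 4, ... worker processes up to os.cpu_count(). The `burn` function and the workload size are assumptions. Expect the speedup to level off near your core count (os.cpu_count() reports logical cores, which can exceed physical ones with hyper-threading), minus some overhead for starting processes and pickling data.

```python
import os
import time
from multiprocessing import Pool

def burn(n):
    # CPU-bound stand-in for "complicated processing"
    total = 0
    for i in range(n):
        total += i * i
    return total

def time_map(processes, work):
    # time pool.map(burn, work) with a fixed number of worker processes
    start = time.perf_counter()
    with Pool(processes) as pool:
        result = pool.map(burn, work)
    return result, time.perf_counter() - start

if __name__ == "__main__":
    work = [100_000] * 16
    cores = os.cpu_count() or 1
    counts = [n for n in sorted({1, 2, 4, 8, cores}) if n <= cores]
    baseline_result, baseline = time_map(1, work)
    for n in counts:
        result, elapsed = time_map(n, work)
        assert result == baseline_result  # same answers for any process count
        print(f"{n} processes: {elapsed:.2f} s, speedup {baseline / elapsed:.1f}x")
```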
