Parallel Python programming is a mode of operation in which different tasks are executed simultaneously on multiple processors in the same computer. Parallel processing is done to reduce the overall processing time.

Parallel Python programming

The multiprocessing module in Python is used to run independent parallel tasks using separate subprocesses. It lets you leverage multiple processors, and each process runs in its own separate memory space. By the end of this tutorial, you will know:

  1. How to structure the code and use the syntax needed to enable parallel processing with multiprocessing.
  2. How to implement synchronous and asynchronous parallel processing.
  3. How to parallelize a Pandas DataFrame (a short sketch appears near the end of this post).
  4. How to solve a practical use case with the multiprocessing.Pool() interface.


How many parallel processes can you run?

The maximum number of processes you can run at a time depends on the number of processors in your computer. If you don’t know how many processors your computer has, you can easily check with the cpu_count() function in multiprocessing. Just run the following and you will know.

import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())

What are Asynchronous and Synchronous execution?

There are two types of execution in parallel processing:

  • Synchronous
  • Asynchronous

Synchronous Execution:

In synchronous execution, the processes are completed in the same order in which they were started. This is achieved by locking the main program until all the respective processes have finished executing.

Asynchronous execution, on the other hand, involves no locking. As a result, the order of the results can get mixed up, but the work typically finishes faster. The sketch below makes the difference concrete.
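This is a minimal sketch using nothing beyond the standard library; square() and the pool size are illustrative, not from the original post:

import multiprocessing as mp

def square(x):
    return x * x

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        # Synchronous: map() locks the main program until every result
        # is ready, and results come back in input order
        print(pool.map(square, [1, 2, 3, 4]))      # [1, 4, 9, 16]

        # Asynchronous: map_async() returns immediately with an AsyncResult;
        # the main program keeps running until we ask for the values
        async_result = pool.map_async(square, [1, 2, 3, 4])
        print(async_result.get())                  # [1, 4, 9, 16]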

Multiprocessing provides two main classes for executing functions in parallel:

  • Pool Class
  • Process Class

1: Pool 

a: Synchronous execution

  • Pool.map() and Pool.starmap()
  • Pool.apply()

b: Asynchronous execution

  • Pool.map_async() and Pool.starmap_async()
  • Pool.apply_async()

2: Process Class
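Here is a minimal sketch of the Process class, which spawns and manages processes individually; worker() and its argument are illustrative:

import multiprocessing as mp

def worker(name):
    print("Hello from", name)

if __name__ == "__main__":
    # Create a single independent process, start it, and wait for it to finish
    p = mp.Process(target=worker, args=("process-1",))
    p.start()
    p.join()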

Let’s take a typical problem and implement parallelization using the above techniques. In this blog, we will focus on the Pool class, as it is the most convenient to use and serves most common practical applications.

Question: In each row, count how many numbers lie within a given range

In this problem, you are given a 2D matrix, and for each row you have to count how many numbers fall within a given range. First, let’s prepare some data to work on:

import numpy as np
from time import time

# Prepare data
np.random.seed(100)  # seed the global generator; the original `np.random.RandomState(100)` call discarded its result and had no effect
arr = np.random.randint(0, 10, size=[200000, 5])
data = arr.tolist()
data[:5]

First, here is a solution without parallelization:

Let’s see how it performs without parallelization. We use a function, howmany_within_range(), that checks how many numbers in a row fall within the given range and returns the count, and call it once per row.

# Solution Without Parallelization

def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

results = []
for row in data:
    results.append(howmany_within_range(row, minimum=4, maximum=8))

print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]

How to Parallelize any function

The general recipe for parallelizing a function is simple: run the function multiple times, with each call executing in parallel on a different processor.

To do this, initialize a Pool with n processors, then pass the function you want to parallelize to one of the Pool’s parallelization methods. multiprocessing.Pool() provides the apply(), map(), and starmap() methods to make any function run in parallel.

Both map() and apply() take the function to be parallelized as their main argument. The difference is that map() accepts only a single iterable, whereas apply() accepts an args parameter holding the arguments to pass to the ‘function-to-be-parallelized’.

From the above description, map() is more suitable for simpler iterables, and it also does the job faster.
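As a quick illustration of the two call shapes (a sketch; power() and the inputs are illustrative, not from the original post):

import multiprocessing as mp

def power(base, exponent):
    return base ** exponent

if __name__ == "__main__":
    with mp.Pool(2) as pool:
        # apply(): one call, with all arguments passed explicitly via `args`
        print(pool.apply(power, args=(2, 10)))    # 1024

        # map(): exactly one iterable; the function gets one item per call
        print(pool.map(abs, [-1, -2, 3]))         # [1, 2, 3]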

We will get to starmap() after we have seen how to parallelize the howmany_within_range() function with apply() and map().

How to Parallelize using Pool.apply()

Let’s parallelize the howmany_within_range() function using multiprocessing.Pool(). Just follow the program below and you will see how it’s done.

# Parallelizing using Pool.apply()

import multiprocessing as mp

# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())

# Step 2: `pool.apply` the `howmany_within_range()`
# Note: apply() blocks until each call completes, so this loop
# runs the tasks one at a time rather than truly in parallel.
results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]

# Step 3: Don't forget to close
pool.close()    

print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
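As a usage note, in Python 3 a Pool can also be used as a context manager, which shuts the pool down automatically when the block exits. A minimal sketch, reusing howmany_within_range() and data from above:

import multiprocessing as mp

# Same computation; the `with` block takes care of cleaning up the pool
with mp.Pool(mp.cpu_count()) as pool:
    results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]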

How to Parallelize using Pool.map()

As mentioned earlier, Pool.map() accepts only one iterable as an argument. We can accommodate that by modifying howmany_within_range(): give minimum and maximum default values, producing a new howmany_within_range_rowonly() function that accepts only a row as input. This is not an ideal use case for map(), but it clearly shows how it differs from apply().

# Parallelizing using Pool.map()
import multiprocessing as mp

# Redefine, with only 1 mandatory argument.
def howmany_within_range_rowonly(row, minimum=4, maximum=8):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

pool = mp.Pool(mp.cpu_count())

results = pool.map(howmany_within_range_rowonly, data)  # `data` is already a list of rows

pool.close()

print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]

How to Parallelize using Pool.starmap()

In the previous example, we had to redefine howmany_within_range() so that a couple of its parameters took default values. Can we avoid doing that?

Yes. Like Pool.map(), Pool.starmap() accepts only one iterable as an argument, but with one basic difference: each element of that iterable is itself an iterable of arguments. You can think of Pool.starmap() as a version of Pool.map() that accepts multiple arguments per call.

# Parallelizing with Pool.starmap()
import multiprocessing as mp

pool = mp.Pool(mp.cpu_count())

results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data])

pool.close()

print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]

Asynchronous parallel Python programming

The asynchronous equivalents are apply_async(), map_async(), and starmap_async(). They let you execute the processes asynchronously: a task can start as soon as a worker is free, without regard for the starting order, so results may complete out of order. A sketch of starmap_async() follows; the sections after it cover apply_async() in detail.
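A minimal sketch of starmap_async(), reusing howmany_within_range() and data from above:

# Parallelizing with Pool.starmap_async(): same arguments as starmap(),
# but the call returns immediately with an AsyncResult
import multiprocessing as mp

pool = mp.Pool(mp.cpu_count())

async_result = pool.starmap_async(howmany_within_range,
                                  [(row, 4, 8) for row in data])
results = async_result.get()   # blocks here until all the workers are done

pool.close()

print(results[:10])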

How to Parallelize using Pool.apply_async()

apply_async() is very similar to apply(), except that you can provide a callback function that determines how the computed result should be stored.

Because the results can arrive in any order, we redefine a new howmany_within_range2() that accepts and returns the iteration number (i) as well, and then sort the final results.

# Parallel processing with Pool.apply_async()

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())

results = []

# Step 1: Redefine, to accept `i`, the iteration number
def howmany_within_range2(i, row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return (i, count)


# Step 2: Define callback function to collect the output in `results`
def collect_result(result):
    global results
    results.append(result)


# Step 3: Use loop to parallelize
for i, row in enumerate(data):
    pool.apply_async(howmany_within_range2, args=(i, row, 4, 8), callback=collect_result)

# Step 4: Close Pool and let all the processes complete    
pool.close()
pool.join()  # postpones the execution of next line of code until all processes in the queue are done.

# Step 5: Sort results [OPTIONAL]
results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]

print(results_final[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]

It is also possible to use apply_async() without a callback function. If you don’t provide one, each call returns a multiprocessing.pool.AsyncResult object that will contain the output of its process, and you call its get() method to retrieve the final result.

# Parallel processing with Pool.apply_async() without callback function

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())

results = []

# call apply_async() without callback
result_objects = [pool.apply_async(howmany_within_range2, args=(i, row, 4, 8)) for i, row in enumerate(data)]

# result_objects is a list of AsyncResult objects
results = [r.get()[1] for r in result_objects]

pool.close()
pool.join()
print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
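How to Parallelize a Pandas DataFrame

The introduction promised a look at parallelizing a Pandas DataFrame. One common pattern, sketched below under the assumption that your function works row-wise, is to split the frame into one chunk per worker with np.array_split() and hand the chunks to pool.map(); count_in_range() and the sample frame here are illustrative:

# A minimal sketch: parallelize a row-wise function over a DataFrame
# by splitting it into one chunk per worker
import multiprocessing as mp
import numpy as np
import pandas as pd

def count_in_range(df_chunk, minimum=4, maximum=8):
    """Count, per row, how many values lie within [minimum, maximum]."""
    return df_chunk.apply(
        lambda row: sum(minimum <= n <= maximum for n in row), axis=1)

if __name__ == "__main__":
    df = pd.DataFrame(np.random.randint(0, 10, size=(200000, 5)))
    chunks = np.array_split(df, mp.cpu_count())   # one chunk per processor

    with mp.Pool(mp.cpu_count()) as pool:
        results = pd.concat(pool.map(count_in_range, chunks))

    print(results.head())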

Conclusion:

In this blog, we have shown several methods you can use to implement parallel Python programming. The benefits are most visible on larger machines with many processors, which is where parallel processing really pays off. We hope you find this information useful. Thank you for reading.
