
The best introduction to MPI (Message Passing Interface) with Python


The Message Passing Interface (MPI) is a standard for parallel and distributed computing. It defines how processes communicate, so that a computation can be spread across several computers on a network. In this environment, computers launch processes that run in parallel and exchange data by passing messages to one another. Version 3.1 of the standard dates from 2015, and MPI 4.0 followed in 2021. Open MPI is an open-source implementation of the standard that runs on many operating systems and can be used from languages such as C, C++, Fortran, Java and, through a separate library, Python. Here we will use MPI from Python with a library called mpi4py.

Environment setup

The most tedious part of MPI is the setup. We will test everything on a local machine, although MPI is most useful in a fully distributed environment.

The installation steps depend on your operating system.

If you are on Windows, follow this link, download the .exe and install it. When the installation is complete, add C:\Program Files\Microsoft MPI\bin to the PATH environment variable, and you are good to go.

If you are on Linux (a Debian or Ubuntu system with apt), you can install it by running:

sudo apt install openmpi-bin openmpi-common openmpi-doc libopenmpi-dev

On macOS, it is a single Homebrew command:

brew install open-mpi


After that, we can start to configure our environment.

In a network configuration, we need to install MPI on every computer used for the computation and set up passwordless SSH between them. You can follow this link to set up passwordless SSH.

We can then run an MPI program on a LAN with the command below. With Open MPI, the list of machines is passed with --host (MPICH's launcher uses -hosts instead):

mpirun -np 5 --host ip_computer_1,ip_computer_2 python path_to_python_file.py

For testing purposes, however, we will work on a single computer. In that case, we run a program with:

mpirun -np 5 python path_to_python_file.py

Global concept

In an MPI environment, every process has an ID called its rank. The rank identifies who a message is sent to and who it comes from. By convention, one process acts as the master and the others as workers. The master has rank 0 and is responsible for orchestrating the overall communication. The other processes have ranks starting at 1.

Some classes, methods and constants are present in almost every implementation of MPI. In mpi4py they are:

  • COMM_WORLD: the default communicator. It represents the global environment set up to exchange messages
  • Finalize(): shuts down the MPI system (mpi4py calls it automatically when the interpreter exits)
  • Get_size(): returns the number of processes in the group
  • Get_rank(): returns the ID or rank of the current process (between 0 and size-1)

Installation of the Python MPI API (mpi4py)

There are several ways to install the Python interface to MPI. The simplest one is pip:

pip install mpi4py

To verify that everything works, let’s write a hello world program.

#!/usr/bin/env python3
 
from mpi4py import MPI
 
comm = MPI.COMM_WORLD
worker = comm.Get_rank()
size = comm.Get_size()
 
print("Hello world from worker ", worker, " of ", size)

Run with four processes, the result looks like this (the order of the lines can change from one run to the next):

  • Hello world from worker  1  of  4
  • Hello world from worker  2  of  4
  • Hello world from worker  3  of  4
  • Hello world from worker  0  of  4

2 – Point to Point communication

A communication involves at least two actors (MPI also supports one-sided communication, which is mentioned at the end of this article). In point-to-point communication, exactly two processes communicate with each other: one is the source of the message and the other is its destination. MPI provides a simple API for this. The operations are either blocking or non-blocking. With blocking calls, the receiver waits on the receive instruction until a message actually arrives, and the sender waits until the message has been handed off. With non-blocking calls, the API immediately returns a request object instead of the data, and execution continues. We can later check that request object to know whether the message has been received, and then get the data.

Here are the Communicator methods used in point-to-point communication:

  • send: used by the sender to send a message to a specific process, identified by the rank given in the dest parameter
  • recv: a blocking method used by the receiver to wait for a message from the process whose rank is given in the source parameter

They are useful when you want to assign a specific task to each process, or for a complex workflow. Some tasks require this style. One example is a pipeline in which results depend on one another in a chain, so that each process has to wait for the result of a specific other process.
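The non-blocking style described earlier can be sketched with isend and irecv, the mpi4py counterparts of send and recv that return a request object. This is only a minimal sketch, assuming at least two processes. The function name nonblocking_exchange, the message content and the tag value 11 are illustrative choices, not part of the original example.

```python
def nonblocking_exchange(comm):
    """Worker 0 sends one message to worker 1 without blocking."""
    worker = comm.Get_rank()
    size = comm.Get_size()
    if size < 2:
        return None  # a single process has nobody to talk to

    if worker == 0:
        # isend returns immediately with a request object
        request = comm.isend({"value": 2}, dest=1, tag=11)
        # ... useful work could happen here while the message travels ...
        request.wait()  # block until the send has completed
        return "sent"
    if worker == 1:
        # irecv also returns a request; the data is not available yet
        request = comm.irecv(source=0, tag=11)
        # ... useful work could happen here too ...
        return request.wait()  # blocks, then returns the received object
    return None  # the other workers do not take part


if __name__ == "__main__":
    # Imported here so the function above works with any communicator object
    try:
        from mpi4py import MPI
    except ImportError:
        print("mpi4py is not installed (pip install mpi4py)")
    else:
        comm = MPI.COMM_WORLD
        print(comm.Get_rank(), nonblocking_exchange(comm))
```

Launched with mpirun -np 2, worker 0 should report that it sent the message and worker 1 should print the dictionary it received. The point of the design is the gap between the isend/irecv call and wait(), where a real program would do other work.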

Let’s take a simple example: 

We have 5 computers and we want to create a pipeline in which each computer computes something and passes the result to the next one. We start with the value 2; each computer takes the number from the computer with rank-1, doubles it and sends it to the computer with rank+1.

The setup looks like this:

Figure: example of point-to-point communication

Let’s code that:

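#!/usr/bin/env python3
# Run with at least two processes, e.g. mpirun -np 5 python path_to_python_file.py

from mpi4py import MPI

comm = MPI.COMM_WORLD
worker = comm.Get_rank()
size = comm.Get_size()

initial = 2
if worker == 0:
    # The first worker starts the chain with the initial value
    print("number of workers:", size)
    comm.send(initial, dest=worker+1)
elif worker == size-1:
    # The last worker doubles what it receives and prints the final result
    rec = comm.recv(source=worker-1)
    print("final result", rec*2)
else:
    # Every other worker doubles the value and passes it on
    rec = comm.recv(source=worker-1)
    comm.send(rec*2, dest=worker+1)
    print(worker, rec)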

Now a larger exercise. We have 8 computers and we want to build a data cleaning pipeline with them. The goal is to retrieve data from Twitter chunk by chunk, give each computer a role in the processing chain, and have the last computer save a cleaned, ready-to-use dataset for our ML or NLP task.

One possible setup, using the first five workers, is the following:

  • Worker 0 retrieves the data chunk by chunk from the source and sends each chunk to worker 1.
  • Worker 1 removes emojis, tokenizes the text and sends it to worker 2.
  • Worker 2 cleans the text by removing links, abbreviations, numbers, … and sends it to worker 3.
  • Worker 3 removes stopwords (if you don’t know much about them, don’t worry, it is not important in this exercise) and sends the result to worker 4.
  • Worker 4 opens the file called cleandataset.txt, appends the cleaned chunk to it and then closes it.

MPI is not a technology made for building pipelines, but this example helps to show how the computers communicate.
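One way to sketch this exercise is shown below. It is a rough sketch under several assumptions: a chunk is a single string, emoji removal is approximated by dropping every non-ASCII character, abbreviation handling is left out, and the stopword list, the function names and the sample chunks are all made up for illustration. A None message is used to tell each worker that the data is finished.

```python
import re

# Tiny illustrative stopword list (an assumption, not a real NLP resource)
STOPWORDS = {"the", "a", "an", "is", "and", "to", "of"}


def strip_emojis_and_tokenize(text):
    """Worker 1: drop non-ASCII characters (crude emoji removal), then split."""
    ascii_only = text.encode("ascii", "ignore").decode("ascii")
    return ascii_only.lower().split()


def drop_links_and_numbers(tokens):
    """Worker 2: remove links and tokens made only of digits."""
    return [t for t in tokens
            if not re.match(r"https?://", t) and not t.isdigit()]


def drop_stopwords(tokens):
    """Worker 3: remove very common words."""
    return [t for t in tokens if t not in STOPWORDS]


# Which cleaning step runs on which rank
STAGES = {1: strip_emojis_and_tokenize, 2: drop_links_and_numbers, 3: drop_stopwords}
LAST = 4  # rank of the worker that writes the file


def run_pipeline(comm, chunks, out_path="cleandataset.txt"):
    """Run the chain; returns False when there are too few workers."""
    worker = comm.Get_rank()
    if comm.Get_size() <= LAST:
        return False  # the chain needs workers 0 to 4
    if worker == 0:
        for chunk in chunks:
            comm.send(chunk, dest=1)
        comm.send(None, dest=1)  # None marks the end of the data
    elif worker in STAGES:
        while True:
            item = comm.recv(source=worker - 1)
            if item is None:
                comm.send(None, dest=worker + 1)  # pass the end marker on
                break
            comm.send(STAGES[worker](item), dest=worker + 1)
    elif worker == LAST:
        while True:
            tokens = comm.recv(source=worker - 1)
            if tokens is None:
                break
            with open(out_path, "a", encoding="utf-8") as f:
                f.write(" ".join(tokens) + "\n")
    return True  # workers above LAST simply stay idle


if __name__ == "__main__":
    try:
        from mpi4py import MPI
    except ImportError:
        print("mpi4py is not installed (pip install mpi4py)")
    else:
        comm = MPI.COMM_WORLD
        sample = ["The cluster is fast 😀 https://example.com", "Run 5 workers"]
        if not run_pipeline(comm, sample) and comm.Get_rank() == 0:
            print("run this script with at least 5 processes")
```

Because every worker loops on recv, several chunks can be in flight at once: worker 1 can already clean the second chunk while worker 2 is still busy with the first one.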

3 – Collective Communication 

In collective communication, a group of workers sends information to, or receives information from, one worker or a group of workers. The main methods are the following:

Broadcasting:

This is used when we want to send the same message to every worker, for example the learning rate used in a gradient computation.

# Broadcast
from mpi4py import MPI

comm = MPI.COMM_WORLD
worker = comm.Get_rank()
size = comm.Get_size()

if worker == 0:
    data = {"data": "for everyone", "something": [1,2,3,4,5]}
else:
    data = None
data = comm.bcast(data, root=0)

print(worker, data)

Figure: broadcast with MPI
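The learning-rate use case mentioned above could look like the sketch below. It assumes a single scalar weight and a made-up gradient per worker; the function name gradient_step and the numbers are hypothetical and only serve to show that every worker ends up using the value chosen by worker 0.

```python
def gradient_step(comm, weight, gradient, learning_rate=None):
    """Broadcast the learning rate from worker 0, then apply one local update."""
    # Only the value passed on worker 0 matters; the others receive it
    learning_rate = comm.bcast(learning_rate, root=0)
    return weight - learning_rate * gradient


if __name__ == "__main__":
    try:
        from mpi4py import MPI
    except ImportError:
        print("mpi4py is not installed (pip install mpi4py)")
    else:
        comm = MPI.COMM_WORLD
        worker = comm.Get_rank()
        # Worker 0 decides the learning rate, the others start with None
        lr = 0.5 if worker == 0 else None
        # Made-up local gradient: each worker uses its own rank
        print(worker, gradient_step(comm, 1.0, float(worker), lr))
```

Every worker calls bcast with the same root, which is what makes it a collective operation: the call only completes once all of them take part.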

Reduce:

Like the reduce function found in many programming languages, this operation aggregates data from all processes. It is used for operations such as finding the maximum or the sum of an array. In our case, the receiver gets the data from every worker and combines them using a predefined operation (for example MPI.SUM, MPI.PROD, MPI.MAX, MPI.MIN, …) or a custom one.

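# Reduction
from mpi4py import MPI

def reduce_func(a, b):
    # Custom operation: multiply the two values
    return a*b

comm = MPI.COMM_WORLD
size = comm.Get_size()
worker = comm.Get_rank()

# Each worker contributes rank+1. Contributing the raw rank would
# always give 0, because worker 0 would multiply everything by 0
value = worker + 1
data = comm.reduce(value, op=reduce_func, root=0)

print(worker, value)

if worker == 0:
    print("final result ", data)

Figure: reduce with MPI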
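The predefined operations listed above are used the same way as a custom function. The sketch below runs several of them in a row; the helper name reduce_with and the choice of contributing rank+1 are assumptions made for the illustration.

```python
def reduce_with(comm, value, ops):
    """Reduce `value` across all workers, once per operation in `ops`.

    `ops` maps a label to a reduction operation. Only worker 0 (the root)
    gets the results back; every other worker gets None.
    """
    results = {}
    for label, op in ops.items():
        # Every worker must take part in every reduce call, in the same order
        results[label] = comm.reduce(value, op=op, root=0)
    return results if comm.Get_rank() == 0 else None


if __name__ == "__main__":
    try:
        from mpi4py import MPI
    except ImportError:
        print("mpi4py is not installed (pip install mpi4py)")
    else:
        comm = MPI.COMM_WORLD
        ops = {"sum": MPI.SUM, "max": MPI.MAX, "min": MPI.MIN}
        summary = reduce_with(comm, comm.Get_rank() + 1, ops)
        if summary is not None:
            print(summary)
```

With 4 processes the contributed values are 1, 2, 3 and 4, so the root should report a sum of 10, a max of 4 and a min of 1.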

Scatter:

Scatter distributes data equally among the workers. For example, if we have a list containing 5 items and 5 workers, the sender sends one item to each worker, itself included (see image). With comm.scatter, the list must contain exactly one item per worker.

#Scattering
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
worker = comm.Get_rank()

if worker == 0:
    data = [{"data for "+str(i): i} for i in range(0, size)]
    print(data)
else:
    data = None
data = comm.scatter(data, root=0)
print(worker, data)

Figure: scatter with MPI

Gather:

This is the opposite of scatter. Its main role is to collect one piece of data from each process on a single process (see image).

# Gathering
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
worker = comm.Get_rank()

data = comm.gather(worker**3, root=0)

print(worker, worker**3)

if worker == 0:
    print("final result ", data)

Figure: gather with MPI

Let’s take some examples to illustrate what we are talking about
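A common pattern is to combine scatter and gather: the root splits the work, every worker processes its part, and the root collects the partial results. The sketch below sums a list this way. It is only an illustration; the helper split_evenly is a hypothetical function added here because comm.scatter expects exactly one item per worker, so a longer list has to be cut into that many chunks first.

```python
def split_evenly(items, n_parts):
    """Cut items into n_parts contiguous chunks whose sizes differ by at most one."""
    base, extra = divmod(len(items), n_parts)
    chunks, start = [], 0
    for i in range(n_parts):
        end = start + base + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def scatter_sum_gather(comm, items=None):
    """Sum `items` (only needed on worker 0) using all the workers."""
    worker = comm.Get_rank()
    size = comm.Get_size()
    # Only the root prepares the chunks, one per worker
    chunks = split_evenly(items, size) if worker == 0 else None
    my_chunk = comm.scatter(chunks, root=0)
    partial = sum(my_chunk)  # local work on this worker's chunk
    partials = comm.gather(partial, root=0)
    # gather returns the list of partial sums on the root, None elsewhere
    return sum(partials) if worker == 0 else None


if __name__ == "__main__":
    try:
        from mpi4py import MPI
    except ImportError:
        print("mpi4py is not installed (pip install mpi4py)")
    else:
        comm = MPI.COMM_WORLD
        numbers = list(range(1, 11)) if comm.Get_rank() == 0 else None
        total = scatter_sum_gather(comm, numbers)
        if total is not None:
            print("total", total)
```

Whatever the number of processes, the root should print a total of 55. When there are more workers than items, some chunks are simply empty and contribute 0.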

4 – More

With MPI we can achieve much more than what we have introduced here. There are concepts such as one-sided communication, shared memory (interaction between processes and threads), parallel I/O, and more, which you can explore in the MPI documentation.
