The Message Passing Interface (MPI) is a standard for parallel and distributed computing. It is a communication protocol for running a computation across different computers in a network. In this environment, computers launch processes that run in parallel and exchange data by passing messages to one another. Version 3.1 of the standard was released in 2015. Open MPI is an open-source implementation of this standard that supports many operating systems and programming languages such as Java, C, C++, Fortran and, of course, Python. Since we are working in Python, we will use MPI through a Python library called mpi4py.
The most annoying part of MPI is actually the setup. We will be testing everything on our local machine, but MPI is most useful in a completely distributed environment. Depending on your operating system, the installation steps vary.
If you are on Windows, follow this link, download the .exe and install it. When the installation is complete, add the path C:\Program Files\Microsoft MPI\bin to the PATH environment variable, and we are good to go.
If you are on Linux, you can install it by running:
sudo apt install openmpi-bin openmpi-common openmpi-doc libopenmpi-dev
And on macOS it is actually a simple process:
brew install open-mpi
After that, we can start to configure our environment.
In a network configuration, we need to set up password-less SSH on every computer we will use for the computation, and we install MPI on all those machines. You can follow this link to set up password-less SSH.
And we can run an MPI program on a LAN with this command:
mpirun -np 5 --host ip_computer_1,ip_computer_2 python path_to_python_file.py
But for testing purposes, we will be working on a single local computer, and in this case, to run a program we will use:
mpirun -np 5 python path_to_python_file.py
In an MPI environment, every process has an ID called its rank. It identifies who sends a message and who receives it. There is a master and there are slaves: the master's rank is 0, and it is responsible for orchestrating the global communication scenario. The other processes have ranks starting at 1.
There are some general classes, methods and constants that are present in almost every implementation of MPI: a global communicator (COMM_WORLD in mpi4py), rank and size queries (Get_rank(), Get_size()), point-to-point methods (send(), recv() and their non-blocking variants) and collective methods (bcast(), scatter(), gather(), reduce()).
There are multiple ways to install the Python interface to MPI; the simplest is to use pip:
pip install mpi4py
To verify that everything is OK, let’s write a hello world program.
#!/usr/bin/env python3
from mpi4py import MPI

comm = MPI.COMM_WORLD        # the default, global communicator
worker = comm.Get_rank()     # the rank of this process
size = comm.Get_size()       # the total number of processes

print("Hello world from worker ", worker, " of ", size)
And the result, running for example with mpirun -np 5, looks something like this (the order of the lines varies between runs):
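Hello world from worker  1  of  5
Hello world from worker  0  of  5
Hello world from worker  3  of  5
Hello world from worker  4  of  5
Hello world from worker  2  of  5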
In a communication process, we have at least two actors (although one-sided communication is possible with MPI; we will come back to it later). In point-to-point communication, exactly two processes communicate together, and MPI provides us with a really simple API for this: one process is the source of the message and the other is its destination. These operations are either blocking/synchronous (the receiver blocks on the receiving instruction until it actually receives something, and the sender waits until the message has been sent) or non-blocking/asynchronous (the API immediately returns a request object rather than the data, execution continues, and we can later check the request to know whether the message has arrived and then get the data).
Here are the Communicator methods used in point-to-point communication: send() and recv() for blocking exchanges, and isend() and irecv() for the non-blocking ones, which return a Request object that we can wait() or test() on.
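To make the non-blocking variant concrete, here is a minimal sketch; the tag value and the payload are arbitrary choices for illustration:

from mpi4py import MPI

comm = MPI.COMM_WORLD
worker = comm.Get_rank()

if worker == 0:
    # isend returns immediately with a Request object
    req = comm.isend({"payload": 42}, dest=1, tag=11)
    # ... we are free to do other work while the message is in flight ...
    req.wait()  # make sure the send has completed
elif worker == 1:
    req = comm.irecv(source=0, tag=11)
    # ... do other work here too ...
    data = req.wait()  # block until the data actually arrives
    print(worker, data)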
These routines are used when you really want to assign a specific task to each process in your environment, or for a complex workflow where some tasks force you to use them. A typical case is a pipeline flow, where results are chained and each process has to wait for the result of one specific other process.
Let’s take a simple example:
We have 5 computers and we want to create a pipeline where one computer computes something and gives the result to the next computer. We start with the number 2, and each computer takes the number from the computer with rank-1, doubles it and sends it to the computer with rank+1.
Let’s code that:
#!/usr/bin/env python3
from mpi4py import MPI

comm = MPI.COMM_WORLD
worker = comm.Get_rank()
size = comm.Get_size()

initial = 2

if worker == 0:
    # first worker: start the pipeline
    print(size)
    comm.send(initial, dest=worker + 1)
elif worker == size - 1:
    # last worker: receive, double and print the final result
    rec = comm.recv(source=worker - 1)
    print(rec * 2)
else:
    # intermediate workers: receive, double and forward
    rec = comm.recv(source=worker - 1)
    comm.send(rec * 2, dest=worker + 1)
    print(worker, rec)
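Running this with mpirun -np 5, rank 0 prints the number of processes (5) and sends 2 down the pipeline; ranks 1 to 3 each print the value they received (2, 4 and 8) and forward the doubled value; and the last rank prints the final result, 32.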
We have 8 computers and we want to create a data cleaning pipeline using all of them. We retrieve data from Twitter chunk after chunk, give each computer a role in the processing chain, and the last computer saves a perfectly cleaned, ready-to-use dataset for our ML or NLP task.
So we may have the setup sketched below.
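Here is a minimal skeleton of such a pipeline; fetch_chunk, clean and save are hypothetical placeholders standing in for a real Twitter client, the individual cleaning steps and the dataset writer:

from mpi4py import MPI

comm = MPI.COMM_WORLD
worker = comm.Get_rank()
size = comm.Get_size()

def fetch_chunk():
    # placeholder: would pull a chunk of tweets from the Twitter API
    return ["RAW Tweet 1", "RAW Tweet 2"]

def clean(chunk):
    # placeholder: each rank would apply its own cleaning step
    return [tweet.lower() for tweet in chunk]

def save(chunk):
    # placeholder: would append the cleaned chunk to the final dataset
    print("saving", chunk)

if worker == 0:
    # first worker: data retrieval
    comm.send(fetch_chunk(), dest=worker + 1)
elif worker == size - 1:
    # last worker: save the fully cleaned chunk
    save(comm.recv(source=worker - 1))
else:
    # intermediate workers: one cleaning step each
    comm.send(clean(comm.recv(source=worker - 1)), dest=worker + 1)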
In collective communication, we have a group of workers that can either send information to or receive information from one worker or a group of workers. In this case, we have methods such as bcast() for broadcasting, scatter() and gather() for distributing and collecting data, and reduce() for aggregating results.
Broadcast is used when we want to send the same message to every worker, for example a learning rate for the computation of the gradient.
# Broadcast
from mpi4py import MPI

comm = MPI.COMM_WORLD
worker = comm.Get_rank()
size = comm.Get_size()

if worker == 0:
    data = {"data": "for everyone", "something": [1, 2, 3, 4, 5]}
else:
    data = None

data = comm.bcast(data, root=0)  # every worker receives the root's data
print(worker, data)
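Run with mpirun -np 3, for example, every worker prints the same dictionary: 0 {'data': 'for everyone', 'something': [1, 2, 3, 4, 5]}, and likewise for workers 1 and 2 (in any order).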
Like the reduce function found in many programming languages, this aggregates data from all processes. It is used for operations like finding the maximum or the sum of an array: the receiver gets the data from every worker and aggregates it using a predefined operation such as MPI.SUM, MPI.PROD, MPI.MAX or MPI.MIN, or a custom one.
# Reduction
from mpi4py import MPI

def reduce_func(a, b):
    # custom reduction operation: multiply two partial results
    return a * b

comm = MPI.COMM_WORLD
size = comm.Get_size()
worker = comm.Get_rank()

# each worker contributes worker + 1, so that rank 0 does not zero out the product
data = comm.reduce(worker + 1, op=reduce_func, root=0)
print(worker, worker + 1)
if worker == 0:
    print("final result ", data)
Scatter distributes data equally to the different workers. For example, with 5 workers and an array of 5 items (one per process, which is what mpi4py's scatter expects), the root sends each item to a different worker.
# Scattering
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
worker = comm.Get_rank()

if worker == 0:
    # exactly one element per worker
    data = [{"data for " + str(i): i} for i in range(0, size)]
    print(data)
else:
    data = None

data = comm.scatter(data, root=0)  # each worker receives one element
print(worker, data)
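With mpirun -np 5, the root first prints the whole list, then each worker prints the single element it received, e.g. 2 {'data for 2': 2}.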
Gather is the opposite of scatter: its main role is to collect the parts of the data back from the different processes.
# Gathering
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
worker = comm.Get_rank()

# every worker contributes its rank cubed; the root collects them in a list
data = comm.gather(worker ** 3, root=0)
print(worker, worker ** 3)
if worker == 0:
    print("final result ", data)
Let’s take one more example to illustrate what we are talking about.
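Here is a minimal sketch that computes the mean of 100 numbers in parallel: the root scatters one chunk of the list to each worker, every worker computes a partial sum, and reduce aggregates the partial sums back at the root:

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
worker = comm.Get_rank()

if worker == 0:
    numbers = list(range(100))
    # split the list into one chunk per worker
    chunks = [numbers[i::size] for i in range(size)]
else:
    chunks = None

chunk = comm.scatter(chunks, root=0)   # each worker gets its own chunk
partial = sum(chunk)                   # local partial sum
total = comm.reduce(partial, op=MPI.SUM, root=0)

if worker == 0:
    print("mean =", total / len(numbers))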
With MPI we can achieve much more than what we have introduced here: there are concepts like one-sided communication, shared memory (interaction between processes and threads), parallel I/O and much more that you can explore to learn about MPI and its capabilities.