Give that tired serial algorithm an octane boost by converting it to run in SMP and distributed-memory environments.
In this article we will look at an example of how to turn a serial algorithm into one that performs better in both symmetric multiprocessing (shared-memory) and distributed-memory environments. To do so, we will develop a simple application in three stages: a serial version, a multithreaded version and a distributed multithreaded version.
In addition to the theoretical aspects of parallel programming, we will discuss some of the practical problems encountered when programming. We have chosen to implement all of the examples in C++ and use the POSIX threads (pthreads) and MPI libraries for symmetric multiprocessing and distributed processing, respectively.
The problem we have chosen to explore is that of finding the number of primes in a specified range. The problem has the advantage of being both simple to comprehend and illustrative of the concepts involved with parallel programming.
In our implementation, we have chosen to represent a range of numbers by an object that has the ability to count the number of primes in its range (see Listing 1).
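As a rough illustration of the idea (not a reproduction of Listing 1), a range object of this kind might look like the sketch below. The member names, the closed-range convention and the trial-division test in is_prime( ) are assumptions made for this example.

```cpp
#include <cassert>

// Illustrative stand-in for the range object described above; the names
// and the trial-division primality test are assumptions of this sketch.
class CountPrimes {
public:
    // Covers the closed range [start, stop]. There is deliberately no
    // default constructor, so every object must be given a range.
    CountPrimes(long start, long stop) : start_(start), stop_(stop) {
        assert(start <= stop);  // this sketch assumes a non-empty range
    }

    // Trial division by odd numbers up to the square root of n.
    static bool is_prime(long n) {
        if (n < 2) return false;
        if (n < 4) return true;          // 2 and 3
        if (n % 2 == 0) return false;
        for (long d = 3; d * d <= n; d += 2)
            if (n % d == 0) return false;
        return true;
    }

    // Counts the primes in the range by testing every number in it.
    long total() const {
        long count = 0;
        for (long n = start_; n <= stop_; ++n)
            if (is_prime(n)) ++count;
        return count;
    }

private:
    long start_;
    long stop_;
};
```

With this sketch, CountPrimes(0, 10000).total( ) evaluates to 1229, the same count the article's program reports for that range.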
Here is an example of compiling and using the program:
bash$ g++ -o primes primes.cpp
bash$ ./primes 0 10000
There were 1229 primes.
In order to improve the speed of our example on symmetric-multiprocessing (SMP) machines, we need to make use of threads. We would like to stick to the design of the previous example, which means we need to find a way for each object to have its own thread of execution. Unfortunately, mixing C++ and pthreads is nontrivial, as pthread_create( ) expects a plain function with C linkage, not a C++ member function. We have solved this problem by creating an accessory class and a static member function (see Listing 2).
The remainder of the CountPrimes object implementation is the same. There are three points to note:
1) The Threaded class is an abstract class.
2) The entry( ) function is a static member function, which means that it has knowledge of the details of the Threaded class but is not tied to a specific instance. Because it takes no implicit this pointer, its type matches the plain function pointer that pthread_create( ) expects. Strictly speaking it still has C++ linkage, but in practice compilers such as g++ accept it where a C-style function is required. This is the function pointer we will pass to pthread_create( ) along with a pointer to the object to be threaded.
3) The run( ) member function is pure virtual, and as such must be implemented by any class derived from Threaded. This function will be the main execution point of the derived class, and its return value will represent the result of the computation. In the case of our CountPrimes class, this function simply calculates the total for the range and returns it.
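The pattern described in these three points can be sketched as follows. This is a hypothetical reconstruction, not Listing 2 itself: the SumTo class and the run_in_thread( ) helper are invented here purely to exercise the Threaded base class.

```cpp
#include <pthread.h>
#include <cassert>

// Abstract base class: anything derived from it can be run on a thread.
class Threaded {
public:
    virtual ~Threaded() {}

    // Pure virtual: the derived class does its work here and returns a
    // pointer to its result.
    virtual void* run() = 0;

    // Static member: it has no implicit `this`, so its type is
    // void* (*)(void*), which is what pthread_create() takes.
    static void* entry(void* self) {
        return static_cast<Threaded*>(self)->run();
    }
};

// Toy derived class (hypothetical): sums the integers 1..n.
class SumTo : public Threaded {
public:
    explicit SumTo(long n) : n_(n) {}

    void* run() override {
        long* result = new long(0);   // freed by whoever joins the thread
        for (long i = 1; i <= n_; ++i) *result += i;
        return result;
    }

private:
    long n_;
};

// Runs one Threaded object on its own thread and returns its result.
long run_in_thread(Threaded& task) {
    pthread_t tid;
    int rc = pthread_create(&tid, nullptr, Threaded::entry, &task);
    assert(rc == 0);
    (void)rc;

    void* ret = nullptr;
    pthread_join(tid, &ret);          // ret now holds run()'s return value

    long* result = static_cast<long*>(ret);
    long value = *result;
    delete result;
    return value;
}
```

Note that the object is passed as a Threaded pointer and cast back to the same type inside entry( ), which keeps the round trip through void* well defined.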
We would like to retain the usage form of our serial implementation, and therefore add only one extra parameter that controls the number of threads that will be spawned to complete the task. Because we do not know beforehand how many objects (threads) will be needed, we will allocate them dynamically (see Listing 3).
There are a few more subtleties in this example, so we will go through the code in a little more detail. First we set the default number of threads to two and then check to see if the user specified another value. We create a dynamic array of pthread_t that will store the thread ids and then create a dynamic array of pointers to CountPrimes objects. This is important because we need to instantiate each one with a different range. In fact, we could not create a static array of CountPrimes objects since there is no default constructor. This design forces us to use the object correctly.
Next is a loop that will spawn the individual threads with their respective ranges of numbers to check. Note that we have made no attempt at load balancing here. We will return to this concept later. The important point is that each CountPrimes object is instantiated dynamically, and its pointer is stored in the array created above. The thread is actually spawned through pthread_create( ). We pass a pointer to the entry member function and a pointer to the object itself. The id of the spawned thread is stored in the thread id array.
Next we wait for the threads to finish computing their totals by using pthread_join( ) on the thread ids. Because we dynamically allocated space for the return value in run( ), we must de-allocate it here. Each thread's total is added to count.
Finally, the CountPrimes objects are de-allocated, followed by the thread id array and the array of object pointers. Here is an example of compiling and using the program:
bash$ g++ -o primes_threaded primes_threaded.cpp
bash$ ./primes_threaded 0 10000 4
There were 1229 primes.
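The spawn-and-join sequence walked through above might be sketched like this. It is an approximation rather than Listing 3: std::vector stands in for the article's hand-allocated arrays, the function name count_primes_threaded( ) is invented, and the compact CountPrimes class and its trial-division is_prime( ) are assumptions of this example.

```cpp
#include <pthread.h>
#include <cassert>
#include <vector>

// Compact stand-in for the threaded CountPrimes (illustrative names).
class CountPrimes {
public:
    CountPrimes(long start, long stop) : start_(start), stop_(stop) {}

    static bool is_prime(long n) {
        if (n < 2) return false;
        if (n < 4) return true;
        if (n % 2 == 0) return false;
        for (long d = 3; d * d <= n; d += 2)
            if (n % d == 0) return false;
        return true;
    }

    // Thread body: returns a heap-allocated total that the joiner deletes.
    void* run() {
        long* total = new long(0);
        for (long n = start_; n <= stop_; ++n)
            if (is_prime(n)) ++*total;
        return total;
    }

    static void* entry(void* self) {
        return static_cast<CountPrimes*>(self)->run();
    }

private:
    long start_;
    long stop_;
};

// Splits [start, stop] into num_threads consecutive slices, one per thread.
long count_primes_threaded(long start, long stop, int num_threads) {
    assert(num_threads > 0 && start <= stop);
    std::vector<pthread_t> tids(num_threads);
    std::vector<CountPrimes*> counters(num_threads);
    long span = (stop - start + 1) / num_threads;

    for (int i = 0; i < num_threads; ++i) {
        long lo = start + i * span;
        // The last slice absorbs the remainder of the division.
        long hi = (i == num_threads - 1) ? stop : lo + span - 1;
        counters[i] = new CountPrimes(lo, hi);
        int rc = pthread_create(&tids[i], nullptr,
                                CountPrimes::entry, counters[i]);
        assert(rc == 0);
        (void)rc;
    }

    long count = 0;
    for (int i = 0; i < num_threads; ++i) {
        void* ret = nullptr;
        pthread_join(tids[i], &ret);   // wait for this slice's total
        long* total = static_cast<long*>(ret);
        count += *total;
        delete total;                  // allocated inside run()
        delete counters[i];
    }
    return count;
}
```

As in the article, the slices are simply consecutive, so no load balancing is attempted. On some toolchains the program must be built with g++ -pthread for the thread calls to link.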
The Message Passing Interface (MPI) is a standard API for implementing distributed programs. There are many advantages to using MPI, but the main one is that programs will be compatible at the source level regardless of the particular MPI implementation being used. For the rest of this discussion, we will assume the availability of a properly configured installation of LAM (Local Area Multicomputer), an MPI implementation from Notre Dame (see Resources).
A very common model used for distributed programming is the master/slave model. In this model, there is one process called the master, which creates work and distributes it to the slaves. The slaves respond to the master with their completed work and ask for more if it is available. This conceptually simple model works very well for problems that do not require a lot of synchronization and whose slaves can be completely autonomous. These types of problems are often referred to as embarrassingly parallel.
In order to build on our threaded implementation, we need to decide how to reformulate our implementation in terms of a master/slave model and add the required calls to MPI in order to distribute our problem and collect the results. Listing 4 shows the changes to main( ).
We need to call MPI_Init( ) at the beginning of our distributed program in order to connect to the multicomputer. The next two function calls establish our rank and the total number of computers that will be involved in the computation.
MPI will start the same program on every computer in the multicomputer. This is why we need to establish at runtime what our rank is so that we can decide if we are a master or a slave. Depending on our rank, we either call master( ) or slave( ).
After we have finished our computations, we must call MPI_Finalize( ) to release our connection to the multicomputer.
Our slave( ) function takes only one argument, namely the number of threads to use. This allows us to fully utilize the processing power of SMP machines in a cluster.
The purpose of the slave is to sit and wait for work, perform the work and then return the results. It will continue to do this until it receives a signal that there is no more work to do, at which point it will return (see Listing 5).
The bulk of the code in the slave( ) function is similar to main( ) in our threaded example. The only difference is how the slave gets the bounds it is supposed to count the primes in and how it returns those results.
The slave goes into an endless loop waiting for work from the master, which it gets via MPI_Recv( ). This function gets two longs that are sent by the master and stores them in the bounds array. After receiving from the master, the slave checks the status of the message to see if the work is done (the KILL message), and if so, returns. Otherwise, we rename the variables so that we can use exactly the same code as in the threaded version. The only remaining step is to send our results back to the master via MPI_Send( ). Here we send back one long containing the count found by this slave.
The job of the master is slightly more complicated as it must decide how to break up the work to be sent out to the slaves and how to collect the results. The first part of the master sends the initial work units out to the slaves and waits for results to come back in. When the master receives a result, it sends another work unit out to the same process if there is still work to be done. After there is no more work to be sent out, each process is polled once more for any remaining results, and then each slave is told to quit (see Listing 6).
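To make the message flow concrete, here is a minimal single-file sketch of the master/slave exchange. It is not Listings 4 through 6: the tag values, STEP_SIZE value, default range and the single-threaded slave are all assumptions, and the master tracks outstanding chunks with a counter instead of polling each process at the end. It needs at least two processes, since rank 0 only hands out work.

```cpp
#include <mpi.h>
#include <cstdio>
#include <cstdlib>

// Illustrative tags and chunk size (assumed values, not the article's).
const int WORK = 1;
const int KILL = 2;
const long STEP_SIZE = 10000;

static long next_start = 0;   // first number not yet handed out
static long range_stop = 0;   // inclusive end of the whole range

static bool is_prime(long n) {
    if (n < 2) return false;
    if (n < 4) return true;
    if (n % 2 == 0) return false;
    for (long d = 3; d * d <= n; d += 2)
        if (n % d == 0) return false;
    return true;
}

// Fills bounds with the next chunk; returns false when no work is left.
static bool make_work(long bounds[2]) {
    if (next_start > range_stop) return false;
    bounds[0] = next_start;
    bounds[1] = (range_stop - next_start < STEP_SIZE)
                    ? range_stop : next_start + STEP_SIZE - 1;
    next_start = bounds[1] + 1;
    return true;
}

static void slave() {
    for (;;) {
        long bounds[2];
        MPI_Status status;
        MPI_Recv(bounds, 2, MPI_LONG, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
        if (status.MPI_TAG == KILL) return;      // no more work
        long count = 0;
        for (long n = bounds[0]; n <= bounds[1]; ++n)
            if (is_prime(n)) ++count;
        MPI_Send(&count, 1, MPI_LONG, 0, WORK, MPI_COMM_WORLD);
    }
}

static long master(int size) {
    long bounds[2] = {0, 0};
    long total = 0, result = 0;
    int outstanding = 0;                          // chunks sent, not yet answered
    MPI_Status status;

    // Seed each slave with one chunk, as long as work remains.
    for (int rank = 1; rank < size && make_work(bounds); ++rank) {
        MPI_Send(bounds, 2, MPI_LONG, rank, WORK, MPI_COMM_WORLD);
        ++outstanding;
    }
    // Every result frees a slave, which immediately gets the next chunk.
    while (outstanding > 0) {
        MPI_Recv(&result, 1, MPI_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG,
                 MPI_COMM_WORLD, &status);
        total += result;
        --outstanding;
        if (make_work(bounds)) {
            MPI_Send(bounds, 2, MPI_LONG, status.MPI_SOURCE, WORK,
                     MPI_COMM_WORLD);
            ++outstanding;
        }
    }
    // Tell every slave to quit.
    for (int rank = 1; rank < size; ++rank)
        MPI_Send(bounds, 2, MPI_LONG, rank, KILL, MPI_COMM_WORLD);
    return total;
}

int main(int argc, char* argv[]) {
    MPI_Init(&argc, &argv);
    int rank = 0, size = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    if (rank == 0) {                              // assumption: rank 0 is master
        next_start = (argc > 1) ? std::atol(argv[1]) : 0;
        range_stop = (argc > 2) ? std::atol(argv[2]) : 100000;
        std::printf("There were %ld primes.\n", master(size));
    } else {
        slave();
    }
    MPI_Finalize();
    return 0;
}
```

Because the master receives from MPI_ANY_SOURCE and replies to status.MPI_SOURCE, faster slaves naturally receive more chunks than slower ones.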
The make_work( ) function is responsible for deciding when the work is done and how to break it up. We have chosen a simple sequential model where the size of the chunks is determined by STEP_SIZE (see Listing 7).
The STEP_SIZE variable is key to controlling the load balancing between the machines. If it is too big, some machines may sit idle while a few machines deal with the numbers at the higher end of the range. If it is too small, there will be too much communication overhead. The best value is generally easiest to determine through experimentation. These details are further explored in the Performance section.
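The trade-off can be seen by counting the chunks a given step size produces, since each chunk costs one message to a slave and one reply. The split_range( ) helper below is hypothetical and is not the article's make_work( ); it only illustrates the sequential chunking scheme.

```cpp
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical helper: cuts the closed range [start, stop] into consecutive
// chunks of at most step_size numbers each, in ascending order.
std::vector<std::pair<long, long> > split_range(long start, long stop,
                                                long step_size) {
    assert(step_size > 0);
    std::vector<std::pair<long, long> > chunks;
    for (long lo = start; lo <= stop; lo += step_size) {
        // The final chunk is cut short at stop.
        long hi = (stop - lo < step_size) ? stop : lo + step_size - 1;
        chunks.push_back(std::make_pair(lo, hi));
    }
    return chunks;
}
```

For the range 0 to 10,000,000, a step size of 100,000 yields 101 chunks, while a step size of 1,000 yields 10,001 chunks and therefore roughly a hundred times as many messages for the same amount of computation.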
MPI programs are compiled with mpicc or mpiCC, depending on whether you are compiling C or C++ code respectively. To run the distributed program, you must first boot the multicomputer via lamboot, and then you can run your program using the mpirun command. When you finish an MPI session, you can shut down the multicomputer with wipe:
bash$ mpiCC -O -o primes_mpi primes_mpi.cpp -lpthread
bash$ lamboot
LAM 6.3.2/MPI 2 C++/ROMIO - University of Notre Dame
bash$ mpirun -O -np 16 primes_mpi -- 0 10000000
There were 664579 primes.
bash$ wipe
If you are having difficulty getting lamboot to run successfully, you can use the recon command to verify what may be causing you trouble. If recon fails, it is possible that you are not able to run commands on remote machines without typing a password. If you are using ssh, make sure you have set LAMRSH to reflect that:
bash$ export LAMRSH=`which ssh`
The arguments to mpicc are essentially the same as those you would normally pass directly to your compiler. One exception is the -O to both mpicc and mpirun that specifies that the multicomputer is homogeneous and that endianness translations need not be performed. The -np argument to mpirun specifies the number of processes to start (usually the number of nodes in the multicomputer). All arguments after the double minus (--) are passed as arguments to the main program being run.
In order to demonstrate the effectiveness of parallel programming, we must show that the elapsed time (wall clock time) is lower for the parallel versions of our program. In general it will not be possible to get a 100% performance increase per node, unless the problem is coarse grained and requires little synchronization.
Our tests were performed on a cluster of 16 dual-processor PIII 700MHz machines with 384MB of RAM. We ran the program to calculate the number of primes between 0 and 10,000,000. Here are the elapsed times for the three versions of our program developed so far:
Serial implementation on one node: 6 minutes 29.28 seconds.
Multithreaded implementation on one node: 3 minutes 24.24 seconds.
Distributed (and multithreaded) implementation on 16 nodes: 11.05 seconds.
These results show a roughly linear increase in performance per processor: dividing the serial time by the distributed time gives a speedup of about 35x on 32 processors.
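Speedup is conventionally computed as the serial time divided by the parallel time, and efficiency as the speedup divided by the number of processors. The small helpers below (names chosen for this example) apply those definitions to the times reported above, converted to seconds: 389.28, 204.24 and 11.05.

```cpp
#include <cassert>

// Speedup: how many times faster the parallel run is than the serial run.
double speedup(double serial_seconds, double parallel_seconds) {
    assert(serial_seconds > 0 && parallel_seconds > 0);
    return serial_seconds / parallel_seconds;
}

// Efficiency: speedup per processor; 1.0 means perfectly linear scaling.
double efficiency(double serial_seconds, double parallel_seconds,
                  int processors) {
    assert(processors > 0);
    return speedup(serial_seconds, parallel_seconds) / processors;
}
```

With these numbers, the two-thread run comes out at a speedup of about 1.9 (efficiency about 0.95), and the 16-node run at about 35.2 on 32 processors (efficiency about 1.1). One plausible reading, offered here as an interpretation and not a measurement, is that the distributed version gains a little extra because it also balances its load better than the simple two-way split.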
One of the biggest problems encountered when programming a multicomputer is that of keeping each computer, and each processor in SMP computers, as busy as possible. We would like to avoid having several machines sit idle while waiting for the results of another computation being performed on a separate machine or processor. This delicate art is known as load balancing.
While a complete discussion of load balancing is beyond the scope of this article, we can examine a few properties of the specific problem we are solving to try to learn how to improve our performance. The single function that performs the bulk of the computation in our example is the is_prime( ) function. Due to its nature, its running time grows with the size of the number being tested. Consider how we are breaking up the problem in our threaded implementation when using two threads: we send half of the numbers to one thread and the other half of the numbers to the other thread. This is inherently unbalanced because we divide the numbers sequentially. The thread with the lower half of the numbers will complete much earlier than the thread with the upper half of the numbers, and hence one processor will sit idle.
There are at least two approaches to fixing this particular problem. When dividing the range of numbers, we can send every other number to each thread. Alternatively, we can simply use more threads, which will break up the problem into smaller chunks and rely more on the kernel thread scheduler to balance the load. This second approach only works up to the point where the time spent scheduling exceeds the gain from splitting up the problem.
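The first approach, interleaving the numbers between threads, can be sketched as a strided count. The function name and its parameters are assumptions of this example, as is the trial-division is_prime( ); each thread would call it with its own offset and a stride equal to the number of threads.

```cpp
#include <cassert>

// Assumed trial-division test; the article's is_prime() is not shown here.
static bool is_prime(long n) {
    if (n < 2) return false;
    if (n < 4) return true;
    if (n % 2 == 0) return false;
    for (long d = 3; d * d <= n; d += 2)
        if (n % d == 0) return false;
    return true;
}

// Counts primes among start + offset, start + offset + stride, ...
// up to and including stop. Thread t of T would use offset t, stride T.
long count_strided(long start, long stop, long offset, long stride) {
    assert(stride > 0 && offset >= 0 && offset < stride);
    long count = 0;
    for (long n = start + offset; n <= stop; n += stride)
        if (is_prime(n)) ++count;
    return count;
}
```

Each thread now sees small and large numbers alike, and the per-thread counts still add up to the full total. One caveat of this particular sketch: with a stride of two starting from 0, one thread receives only even numbers, which trial division rejects immediately, so that thread finishes almost at once. Interleaving small blocks of numbers instead of single numbers is one way around this.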
There is a much more robust approach to load balancing that we used for sending jobs to machines in the distributed implementation: send out smaller chunks of work to each machine and only send them new work when they have completed their initial work. We still need to worry a bit about the size of the chunks we send out (controlled by the STEP_SIZE variable in our implementation), or we will be increasing our network traffic without increasing our throughput. A similar approach could have been used to balance the threads but was not used for the sake of clarity.
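For completeness, here is one way the same chunk-on-demand idea might be applied to threads. This is a sketch under assumptions and not code from the article: the WorkQueue structure, the worker( ) function and count_primes_dynamic( ) are all invented names, and a single pthread mutex protects both the chunk cursor and the running total.

```cpp
#include <pthread.h>
#include <cassert>
#include <vector>

static bool is_prime(long n) {
    if (n < 2) return false;
    if (n < 4) return true;
    if (n % 2 == 0) return false;
    for (long d = 3; d * d <= n; d += 2)
        if (n % d == 0) return false;
    return true;
}

// State shared by all workers (illustrative layout).
struct WorkQueue {
    pthread_mutex_t lock;
    long next;    // first number not yet handed out
    long stop;    // inclusive upper bound; constant after setup
    long step;    // chunk size, playing the role of STEP_SIZE
    long total;   // running prime count
};

// Each worker repeatedly claims the next chunk until none are left.
static void* worker(void* arg) {
    WorkQueue* q = static_cast<WorkQueue*>(arg);
    for (;;) {
        pthread_mutex_lock(&q->lock);
        long lo = q->next;
        long hi = (q->stop - lo < q->step) ? q->stop : lo + q->step - 1;
        q->next = hi + 1;
        pthread_mutex_unlock(&q->lock);
        if (lo > q->stop) return nullptr;         // range exhausted

        long count = 0;                           // computed outside the lock
        for (long n = lo; n <= hi; ++n)
            if (is_prime(n)) ++count;

        pthread_mutex_lock(&q->lock);
        q->total += count;
        pthread_mutex_unlock(&q->lock);
    }
}

long count_primes_dynamic(long start, long stop, long step, int num_threads) {
    assert(step > 0 && num_threads > 0);
    WorkQueue q;
    pthread_mutex_init(&q.lock, nullptr);
    q.next = start;
    q.stop = stop;
    q.step = step;
    q.total = 0;

    std::vector<pthread_t> tids(num_threads);
    for (int i = 0; i < num_threads; ++i) {
        int rc = pthread_create(&tids[i], nullptr, worker, &q);
        assert(rc == 0);
        (void)rc;
    }
    for (int i = 0; i < num_threads; ++i)
        pthread_join(tids[i], nullptr);

    pthread_mutex_destroy(&q.lock);
    return q.total;
}
```

A thread that draws cheap chunks simply comes back for more, so no thread sits idle while work remains. As with STEP_SIZE, a very small step makes the threads spend more of their time contending for the lock.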