Master/worker pattern using MPI

Content

The fork-and-join pattern provides a simple approach whenever the problem can easily be divided into subproblems that require a similar amount of computation. However, if the complexity of the subproblems cannot be foreseen, or if the individual processes run on multiple computers, the master/worker pattern can be advantageous. As before, the overall problem is divided into subproblems, but not all of them are distributed immediately. Instead, the master hands out a smaller subproblem whenever one of the workers is ready to work on it. Once a worker has finished, it sends its results back to the master, which is then free to assign the next subproblem.
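
The following is a minimal sketch of the master side of this protocol, assuming that every task and every result fits into a single int and that there are at least as many tasks as workers; the names master, noftasks, and next_task are illustrative placeholders, not part of the exercise code below.

#include <mpi.h>

// hedged sketch of a generic master loop: hand out one task per worker,
// reassign whenever a result comes back, and finally send an empty
// message to tell each worker to stop (assumes noftasks >= nofworkers)
void master(int nofworkers, int noftasks) {
   int next_task = 0;
   for (int worker = 1; worker <= nofworkers; ++worker) {
      MPI_Send(&next_task, 1, MPI_INT, worker, 0, MPI_COMM_WORLD);
      ++next_task;
   }
   int outstanding = nofworkers;
   while (outstanding > 0) {
      int result; MPI_Status status;
      MPI_Recv(&result, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG,
         MPI_COMM_WORLD, &status);
      // record the result; the sender is the worker that is now idle
      int worker = status.MPI_SOURCE;
      if (next_task < noftasks) {
         MPI_Send(&next_task, 1, MPI_INT, worker, 0, MPI_COMM_WORLD);
         ++next_task;
      } else {
         // no tasks left: an empty message (count 0) ends the work
         MPI_Send(nullptr, 0, MPI_INT, worker, 0, MPI_COMM_WORLD);
         --outstanding;
      }
   }
}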

When the master eventually runs out of subproblems to be solved, the workers must be notified that they can stop working. In MPI, tags can be used to differentiate between messages that deliver a new subproblem and messages that signal the end of work. It appears simpler, however, to send the last message with a count of 0.
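
A worker matching the sketch above could detect such an empty message by probing first and asking MPI_Get_count how many elements actually arrived; compute is a dummy stand-in for the actual work.

#include <mpi.h>

// hedged sketch of the worker side: an empty message from the master
// (count 0) means that there is no further work
static int compute(int task) { return task * task; }   // dummy placeholder

void worker() {
   for (;;) {
      MPI_Status status;
      MPI_Probe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
      int count; MPI_Get_count(&status, MPI_INT, &count);
      if (count == 0) {
         // consume the empty termination message and stop
         MPI_Recv(nullptr, 0, MPI_INT, 0, status.MPI_TAG,
            MPI_COMM_WORLD, MPI_STATUS_IGNORE);
         break;
      }
      int task;
      MPI_Recv(&task, 1, MPI_INT, 0, status.MPI_TAG,
         MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      int result = compute(task);
      MPI_Send(&result, 1, MPI_INT, 0, 0, MPI_COMM_WORLD);
   }
}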

Exercise

One of the open problems of number theory is whether the frequency of prime constellations can be calculated. An empirical analysis can be helpful here: prime tuples can easily be checked over given intervals, and this search is easy to parallelize.

The source code below is an MPI application that expects an interval \([N_1, N_2]\) of natural numbers and a prime constellation, given as offsets \(\left\{n_i\right\}_{i=1}^{k-1}\) with \(n_i < n_{i+1}\), of length \(k > 1\). The task is to find all prime constellations \((p, p+n_1, \dots, p+n_{k-1})\) with \(N_1 \le p \le N_2\). Twin primes can be searched for using \(\left\{2\right\}\); prime quadruplets are specified as \(\left\{2, 6, 8\right\}\). For \(\left\{2, 6, 8\right\}\) and \(p = 5\), for example, the quadruplet found is \((5, 7, 11, 13)\).

The source code distributes the entire interval uniformly among all workers. Modify the code towards a master/worker pattern in which smaller subintervals are handed to the workers on demand, so that all of them are kept busy.

To be able to work with large prime numbers, the following code already operates on integers of arbitrary size from the GMP library. In the lecture library under /home/numerik/pub/hpc/ws18/session05 you will find in hpc/gmp/integer.hpp a class named ExportedInteger that allows such a long integer to be converted into an int array, which can easily be transferred using MPI. This is done by the functions send_integer and receive_integer shown below. The probabilistic search for primes is implemented in primes.hpp and primes.cpp.

You will need the library options -lgmpxx -lgmp at the end of the link command.

#ifndef PRIMES_HPP
#define PRIMES_HPP

#include <hpc/gmp/integer.hpp>

bool search_prime_constellation(
	 hpc::gmp::Integer& start_interval,
	 hpc::gmp::Integer& end_interval,
	 unsigned int k,           // size of prime constellation
	 unsigned int offsets[],   // k-1 offsets
	 hpc::gmp::Integer& first_prime);    // found result (if returning true)

#endif

#include "primes.hpp"

using namespace hpc::gmp;

bool search_prime_constellation(
	 Integer& start_interval,
	 Integer& end_interval,
	 unsigned int k,         // size of prime constellation
	 unsigned int offsets[], // k-1 offsets
	 Integer& first_prime) {   // found result (if returning true)
   Integer p = start_interval;
   Integer q;
   bool found = false;
   for(;;) {
      // lookout for the next prime in the interval
      mpz_nextprime(p.get_mpz_t(), p.get_mpz_t());
      if (p > end_interval) break;
      // p is apparently prime, check for a constellation
      found = true;
      for (unsigned int i = 0; i < k-1; ++i) {
	 unsigned int offset = offsets[i];
	 q = p + offset;
	 if (mpz_probab_prime_p(q.get_mpz_t(), 10) < 1) {
	    found = false;
	    break; // not a prime
	 }
      }
      if (found) break;
   }
   if (found) {
      first_prime = p;
   }
   return found;
}
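
Before any MPI is involved, search_prime_constellation can be tried out with a small sequential driver like the following sketch; the interval and the file name are arbitrary, and it assumes that the lecture library (printf.hpp and hpc/gmp/integer.hpp) is in the include path. It prints all twin primes up to 100.

#include <printf.hpp>
#include <hpc/gmp/integer.hpp>
#include "primes.hpp"

int main() {
   hpc::gmp::Integer start("1"), end("100"), prime;
   unsigned int offsets[] = {2};   // twin primes: (p, p+2)
   while (search_prime_constellation(start, end, 2, offsets, prime)) {
      fmt::printf("%d\n", prime);  // %d works for Integer with printf.hpp
      start = prime; start += 1;   // continue right behind the last hit
   }
}
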
#include <cassert>
#include <iostream>
#include <mpi.h>
#include <printf.hpp>
#include <hpc/gmp/integer.hpp>
#include <hpc/aux/slices.hpp>
#include "primes.hpp"

using namespace hpc::aux;
using namespace hpc::gmp;

// send an Integer of arbitrary size in two messages: a header carrying its
// sign and word count, followed by the words of the exported integer
void send_integer(const Integer& value, int dest, int tag) {
   assert(dest >= 0);
   ExportedInteger exp_value(value);
   int len = (int) exp_value.length();
   int header[2] = {exp_value.sgn(), len};
   MPI_Send(header, 2, MPI_INT, dest, tag, MPI_COMM_WORLD);
   MPI_Send(exp_value.words, len, MPI_INT, dest, tag, MPI_COMM_WORLD);
}

// an empty message with tag 1 signals the end of work
void send_finish(int dest) {
   MPI_Send(nullptr, 0, MPI_INT, dest, 1, MPI_COMM_WORLD);
}

// receive an Integer from the given source and tag (both may be wildcards);
// returns false if a finish message (non-zero tag) arrived instead
bool receive_integer(Integer& value, int& source, int& tag) {
   MPI_Status status;
   int header[2];
   MPI_Recv(header, 2, MPI_INT, source, tag, MPI_COMM_WORLD, &status);
   tag = status.MPI_TAG;
   source = status.MPI_SOURCE;
   if (tag) return false;
   int sgn = header[0]; unsigned int len = header[1];
   ExportedInteger exp_value(sgn, len);
   MPI_Recv(exp_value.words, len, MPI_INT,
      source, tag, MPI_COMM_WORLD, &status);
   value = exp_value.get();
   return true;
}

char* progname;

void usage() {
   fmt::printf(std::cerr, "Usage: %s N1 N2 {n_i}\n", progname);
   exit(1);
}

void primes_master(int nofworkers, int argc, char** argv) {
   progname = *argv++; --argc;
   if (argc < 3) usage();
   Integer start(*argv++); --argc;
   Integer end(*argv++); --argc;

   int k = argc + 1;
   unsigned int* offsets = new unsigned int[k-1];
   for (int i = 0; i < k-1; ++i) {
      offsets[i] = atoi(*argv++); --argc;
   }

   MPI_Bcast(&k, 1, MPI_INT, 0, MPI_COMM_WORLD);
   MPI_Bcast(offsets, k-1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);

   for (int worker = 1; worker <= nofworkers; ++worker) {
      send_integer(start, worker, 0);
      send_integer(end, worker, 0);
   }
   int running_workers = nofworkers;
   while (running_workers > 0) {
      int source = MPI_ANY_SOURCE; int tag = MPI_ANY_TAG;
      Integer prime;
      bool ok = receive_integer(prime, source, tag);
      if (!ok) {
	 fmt::printf("%d has finished\n", source);
	 --running_workers; continue;
      }
      fmt::printf("%d: %d\n", source, prime);
   }
}

void primes_worker(int nofworkers, int rank) {
   int k;
   MPI_Bcast(&k, 1, MPI_INT, 0, MPI_COMM_WORLD);
   unsigned int* offsets = new unsigned int[k-1];
   MPI_Bcast(offsets, k-1, MPI_UNSIGNED, 0, MPI_COMM_WORLD);

   Integer global_start, global_end;
   int source = 0, tag = 0;
   receive_integer(global_start, source, tag);
   receive_integer(global_end, source, tag);

   UniformSlices<Integer> slices(nofworkers, global_end - global_start);
   Integer start = global_start + slices.offset(rank); // offsets are relative to the interval start
   Integer end = start + slices.size(rank);

   Integer prime;
   while (search_prime_constellation(start, end, k, offsets, prime)) {
      send_integer(prime, 0, 0);
      start = prime;
      start += 1;
   }
   send_finish(0);
}

int main(int argc, char** argv) {
   MPI_Init(&argc, &argv);

   int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   int nofworkers; MPI_Comm_size(MPI_COMM_WORLD, &nofworkers);
   --nofworkers; assert(nofworkers > 0);

   if (rank == 0) {
      primes_master(nofworkers, argc, argv);
   } else {
      primes_worker(nofworkers, rank-1);
   }

   MPI_Finalize();
}
theon$ g++ -g -Wall -std=c++17 -I/home/numerik/pub/pp/ss19/lib -c primes.cpp
theon$ mpiCC -g -Wall -std=c++17 -I/home/numerik/pub/pp/ss19/lib -c mpi-primes.cpp
theon$ mpiCC -o mpi-primes mpi-primes.o primes.o -lgmpxx -lgmp
theon$ mpirun -np 4 mpi-primes 1 1000 2 6
1: 5
1: 11
1: 17
1: 41
1: 101
1: 107
3: 821
1: 191
1: 227
3: 857
3: 881
1: 311
1 has finished
2: 347
3 has finished
2: 461
2: 641
2 has finished
theon$ 
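
Towards the requested restructuring, the following is one possible sketch of a master-side dispatch loop (not the reference solution): it hands out subintervals of a fixed, arbitrarily chosen size and reuses send_integer, send_finish, and receive_integer from the listing above. The function name primes_master_mw and the chunk size are assumptions of this sketch; the broadcast of k and the offsets remains as before and is omitted here.

// sketch of a master that hands out subintervals on demand: a worker reports
// every prime it finds with tag 0 and announces the end of its current
// subinterval with an empty tag-1 message (as send_finish does); the master
// answers with the next subinterval or with a finish message of its own
void primes_master_mw(int nofworkers, Integer start, Integer end) {
   Integer chunksize("1000");   // ad-hoc choice, to be tuned
   Integer next = start;
   auto assign = [&](int worker) -> bool {
      if (next > end) return false;
      Integer chunk_end = next + chunksize;
      if (chunk_end > end) chunk_end = end;
      send_integer(next, worker, 0);
      send_integer(chunk_end, worker, 0);
      next = chunk_end; next += 1;
      return true;
   };

   int running_workers = 0;
   for (int worker = 1; worker <= nofworkers; ++worker) {
      if (assign(worker)) {
         ++running_workers;
      } else {
         send_finish(worker);   // more workers than subintervals
      }
   }
   while (running_workers > 0) {
      int source = MPI_ANY_SOURCE; int tag = MPI_ANY_TAG;
      Integer prime;
      if (receive_integer(prime, source, tag)) {
         fmt::printf("%d: %d\n", source, prime);
      } else if (!assign(source)) {
         send_finish(source);   // nothing left to hand out
         --running_workers;
      }
   }
}

On the worker side, the slice computation would disappear: the worker repeatedly asks for the bounds of the next subinterval with receive_integer (now with tag = MPI_ANY_TAG so that a finish message is matched as well), reports every hit with send_integer, signals the end of its subinterval with an empty tag-1 message, and terminates as soon as receive_integer returns false.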