Two-dimensional organization of processes

Content

The partitioning of a problem among multiple processes and their communication network can be represented as a graph where the vertices represent processes and the direct communication lines are represented by edges. The graph is undirected as the communication architecture and the protocol are bidirectional.

As the bandwidths and the latencies are not necessarily uniform for all combinations of nodes, it is advisable to look for a mapping of the processes to the available hardware resources that best supports the communication graph. If, for example, you work with multiple computers, each having multiple cores, the communication over shared memory among cores of the same computer is significantly faster than a communication over an Ethernet connection. In case of clusters that use InfiniBand over a single switch, the bandwidths and latencies are uniform. However, in case of multi-layer structures the bandwidth and the latency depend on the actual communication path. Some cluster systems have a multi-dimensional network grid, possibly with a wrap-around (much like a torus), where neighborhood is of importance.

MPI allows you to declare arbitrary communication graphs. Common cases like \(n\)-dimensional grids, where each dimension can be configured as a ring, are supported by a specialized operation. In the two-dimensional case, this gives you matrix-like structures, cylinders, or tori. When such a communication graph is specified, MPI can be asked to reorganize the processes such that they fit the graph best on the given platform. You are then still free to communicate outside the declared edges of the graph. However, these communication lines come potentially with higher latencies and lower bandwidths.
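For the general case of arbitrary graphs, MPI provides distributed graph constructors such as MPI_Dist_graph_create_adjacent, where each process declares just its own neighbors. The following sketch is not needed for the Jacobi solver; it assumes that rank and nof_processes have already been obtained via MPI_Comm_rank and MPI_Comm_size and declares a simple ring:

int left = (rank + nof_processes - 1) % nof_processes;
int right = (rank + 1) % nof_processes;
int neighbors[2] = {left, right};
MPI_Comm ring;
MPI_Dist_graph_create_adjacent(MPI_COMM_WORLD,
   2, neighbors, MPI_UNWEIGHTED, // incoming edges
   2, neighbors, MPI_UNWEIGHTED, // outgoing edges
   MPI_INFO_NULL,
   true,                         // reorder is permitted
   &ring                         // newly created communication domain
);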

In the context of the Jacobi solver, it is helpful to work with a two-dimensional grid without rings. This could be done as follows:

int dims[2] = {0, 0}; int periods[2] = {false, false};
MPI_Dims_create(nof_processes, 2, dims);
MPI_Comm grid;
MPI_Cart_create(MPI_COMM_WORLD,
   2,        // number of dimensions
   dims,     // actual dimensions
   periods,  // both dimensions are non-periodical
   true,     // reorder is permitted
   &grid     // newly created communication domain
);
MPI_Comm_rank(grid, &rank); // update rank (could have changed)

The array dims specifies the dimensions. If we initialize its entries to 0, we delegate the choice to MPI. Alternatively, we could already specify divisors of nof_processes at this point. MPI_Dims_create looks for suitable dimensions and overwrites the zero entries of dims such that dims[0] * dims[1] == nof_processes holds. If nof_processes is a prime number, we end up with a degenerate one-dimensional partitioning of \(nof\_processes \times 1\).
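As a small, hypothetical illustration of pre-set divisors: with 12 processes and the first dimension fixed to 4, only the zero entry gets filled in by MPI_Dims_create:

int dims[2] = {4, 0};         // fix the first dimension, let MPI choose the second
MPI_Dims_create(12, 2, dims); // dims is now {4, 3}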

MPI_Cart_create creates a new communication domain with the processes of the first domain (MPI_COMM_WORLD in this case). The periods parameter tells for each dimension whether we want to organize it as a ring: setting both entries to true would yield a torus, setting just one of them a cylinder. We do not need this for a Jacobi solver.

If the reorder parameter is set to true, MPI has the freedom to reorder the processes of the new domain. In this case, the rank must be updated as it could have changed.
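Once the grid exists, the neighbors along its declared edges can be determined with MPI_Cart_shift. This is not needed for the following exercise, but it is the usual way to find the partners for the border exchange of the Jacobi solver; at non-periodic borders, MPI_PROC_NULL is returned:

int up, down, left, right;
MPI_Cart_shift(grid, 0, 1, &up, &down);    // neighbors in dimension 0 (rows)
MPI_Cart_shift(grid, 1, 1, &left, &right); // neighbors in dimension 1 (columns)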

Exercise

As we previously had scatter and gather operations that work row-wise, it appears straightforward to introduce scatter and gather operations that work block-wise. However, there are no MPI operations that support this directly, as MPI_Scatterv and MPI_Gatherv accept varying counts per process but insist on a uniform datatype. With a two-dimensional partitioning of a matrix into blocks we get, however, up to four different block types: the regular case, the right border, the lower border, and the lower right corner.
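To illustrate where the different types come from: a block of a matrix is in general not contiguous in memory but can be described by MPI_Type_vector, whose parameters depend on the block dimensions, and these differ at the borders. A minimal sketch, assuming a row-major matrix of doubles with num_cols columns in total and a block of block_rows by block_cols elements (all three names are placeholders):

MPI_Datatype block_type;
MPI_Type_vector(
   block_rows,  // number of rows of the block
   block_cols,  // number of contiguous elements per row of the block
   num_cols,    // stride: leading dimension of the full matrix
   MPI_DOUBLE, &block_type);
MPI_Type_commit(&block_type);

Blocks at the right or lower border have fewer columns or rows, respectively, and therefore need their own types.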

Hence, we need to develop block-wise scatter and gather operations for matrices ourselves. As the root process shall run its transfer operations in parallel, we need to use asynchronous transfer operations. And we must not forget that the root process also distributes a block to itself. For this block no transfer operation is needed; a regular matrix copy using hpc::matvec::copy will do the job.
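The following fragment sketches just this communication pattern, not the exercise itself: it distributes a single double per process instead of a matrix block, assumes that rank, nof_processes, root, and grid are already set up, uses the made-up names buf and value, and additionally needs <vector>. The root posts one MPI_Isend per other process, handles its own share with a plain copy, and finally waits for all requests; every other process simply posts a matching receive.

double value; // the share of this process
if (rank == root) {
   std::vector<double> buf(nof_processes);
   std::vector<MPI_Request> requests;
   for (int i = 0; i < nof_processes; ++i) {
      buf[i] = i; // whatever is to be distributed to process i
      if (i == root) {
         value = buf[i]; // own share: a plain copy, no transfer needed
      } else {
         MPI_Request request;
         MPI_Isend(&buf[i], 1, MPI_DOUBLE, i, 0, grid, &request);
         requests.push_back(request);
      }
   }
   MPI_Waitall(int(requests.size()), requests.data(), MPI_STATUSES_IGNORE);
} else {
   MPI_Recv(&value, 1, MPI_DOUBLE, root, 0, grid, MPI_STATUS_IGNORE);
}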

Develop the functions scatter_by_block and gather_by_block with the following signatures:

template<typename T,
   template<typename> class MA,
   template<typename> class MB,
   Require<Ge<MA<T>>, Ge<MB<T>>> = true>
int scatter_by_block(const MA<T>& A, MB<T>& B, int root,
      MPI_Comm grid, int overlap = 0) {
   /* ... */
}

template<typename T,
   template<typename> class MA,
   template<typename> class MB,
   Require<Ge<MA<T>>, Ge<MB<T>>> = true>
int gather_by_block(const MA<T>& A, MB<T>& B, int root,
      MPI_Comm grid, int overlap = 0) {
   /* ... */
}

Within these functions you will need to determine the dimensions of the grid and your own position within it. This can be done using MPI_Cart_get, which delivers the dimensions, the periods, and the coordinates of the calling process:

int dims[2]; int coords[2]; int periods[2];
MPI_Cart_get(grid, 2, dims, periods, coords);
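In scatter_by_block and gather_by_block, the root process also has to know which block belongs to which rank. The coordinates of an arbitrary rank can be retrieved with MPI_Cart_coords, and MPI_Cart_rank performs the inverse mapping; the variable i below is just a placeholder for some rank within the grid:

int coords[2];
MPI_Cart_coords(grid, i, 2, coords);      // coordinates of the process with rank i
int other_rank;
MPI_Cart_rank(grid, coords, &other_rank); // rank at the given coordinates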

Source code

The following program may be used for testing:

#include <mpi.h>
#include <hpc/matvec/gematrix.hpp>
#include <hpc/matvec/iterators.hpp>
#include <hpc/matvec/print.hpp>
#include <hpc/mpi/matrix.hpp>

int main(int argc, char** argv) {
   MPI_Init(&argc, &argv);

   int nof_processes; MPI_Comm_size(MPI_COMM_WORLD, &nof_processes);
   int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);

   using namespace hpc::matvec;
   using namespace hpc::mpi;
   using namespace hpc::aux;

   using Matrix = GeMatrix<double>;
   int share = 3;
   int num_rows = nof_processes * share + 1;
   int num_cols = nof_processes * share + 2;

   Matrix A(num_rows, num_cols); /* entire matrix */

   /* create two-dimensional Cartesian grid for our processes */
   int dims[2] = {0, 0}; int periods[2] = {false, false};
   MPI_Dims_create(nof_processes, 2, dims);
   MPI_Comm grid;
   MPI_Cart_create(MPI_COMM_WORLD,
      2,        // number of dimensions
      dims,     // actual dimensions
      periods,  // both dimensions are non-periodical
      true,     // reorder is permitted
      &grid     // newly created communication domain
   );
   MPI_Comm_rank(grid, &rank); // update rank (could have changed)

   /* get our position within the grid */
   int coords[2];
   MPI_Cart_coords(grid, rank, 2, coords);
   UniformSlices<int> rows(dims[0], A.numRows());
   UniformSlices<int> columns(dims[1], A.numCols());

   Matrix B(rows.size(coords[0]), columns.size(coords[1]), Order::RowMajor);

   if (rank == 0) {
      for (auto [i, j, Aij]: A) {
         Aij = i * 100 + j;
      }
   }

   scatter_by_block(A, B, 0, grid);
   for (auto [i, j, Bij]: B) {
      Bij += 10000 * (rank + 1);
      (void) i; (void) j; // suppress gcc warning
   }
   gather_by_block(B, A, 0, grid);

   MPI_Finalize();

   if (rank == 0) {
      print(A, " %6g");
   }
}