Zweidimensionale Organisation von Prozessen

Die Partitionierung eines Problems auf einzelne Prozesse und deren Kommunikationsbeziehungen kann als Graph repräsentiert werden, wobei die Prozesse die Knoten und die Kommunikationsbeziehungen die Kanten repräsentieren. Der Graph ist normalerweise ungerichtet, weil zumindest die zugrundeliegenden Kommunikationsarchitekturen und das Protokoll bidirektional sind.

Da die Bandbreiten und Latenzzeiten zwischen einzelnen rechnenden Knoten nicht überall gleich sind, ist es sinnvoll, die Aufteilung der Prozesse auf Knoten so zu organisieren, dass die Kanten möglichst weitgehend auf günstigere Kommunikationsverbindungen gelegt werden. Bei Infiniband spielt die Organisation kaum eine Rolle, es sei denn, es liegt eine Zwei-Ebenen-Architektur vor wie beispielsweise bei SuperMUC in München. Bei MP-Systemen mit gemeinsamen Speicher ist es günstiger, wenn miteinander kommunizierende Prozesse auf Kernen des gleichen Prozessors laufen, da diese typischerweise einen Cache gemeinsam nutzen können und somit der Umweg über den langsamen Hauptspeicher vermieden wird. Manche Cluster-Systeme (wie beispielsweise Titan) sind in einem dreidimensionalen Torus organisiert, bei dem die Nachbarschaft eine wichtige Rolle spielt.

MPI bietet die Möglichkeit, beliebige Kommunikationsgraphen zu deklarieren. Zusätzlich unterstützt bzw. vereinfacht MPI die Deklarationen \(n\)-dimensionaler Gitterstrukturen, die in jeder Dimension mit oder ohne Ringstrukturen konfiguriert werden können. Entsprechend sind im eindimensionalen einfache Ketten oder Ringe möglich und im zweidimensionalen Fall Matrizen, Zylinder oder Tori. Dies eröffnet MPI die Möglichkeit, eine geeignete Zuordnung von Prozessen auf Prozessoren vorzunehmen. Ferner lassen sich über entsprechende MPI-Funktionen die Kommunikationsnachbarn eines Prozesses ermitteln. Grundsätzlich ist eine Kommunikation abseits des vorgegebenen Kommunikationsgraphen möglich. Nur bietet diese möglicherweise höhere Latenzzeiten und/oder niedrigere Bandbreiten.

In unserem Kontext ist zunächst die Organisation der Prozesse in einem zweidimensionalen Gitter interessant. Dies könnte so erfolgen:

int dims[2] = {0, 0}; int periods[2] = {false, false};
MPI_Dims_create(nof_processes, 2, dims);
MPI_Comm grid;
MPI_Cart_create(MPI_COMM_WORLD,
   2,        // number of dimensions
   dims,     // actual dimensions
   periods,  // both dimensions are non-periodical
   true,     // reorder is permitted
   &grid     // newly created communication domain
);
MPI_Comm_rank(MPI_COMM_WORLD, &rank); // update rank (could have changed)

In dem Array dims geben wir die gewünschte Dimensionierung vor. Wenn wir hier 0 angeben, überlassen wir die Dimensionierung MPI. Alternativ könnten wir hier auch bereits Teiler von nof_processes angeben. MPI_Dims_create führt nun die zweidimensionale Dimensionierung durch. Dabei wird das Array dims überschrieben.

MPI_Dims_create sucht nach einer möglichst guten Verteilung auf Basis der zur Verfügung stehenden Teiler von nof_processes. Sollte nof_processes prim sein, würde dabei eine sehr ungünstige Aufteilung \(1 \times nof_processes\) entstehen.

Mit MPI_Cart_create erzeugen wir eine neue Kommunikationsdomäne mit dem Prozessen aus dem ersten Parameter, hier MPI_COMM_WORLD. Mit periods wird festgelegt, ob die jeweilige Dimension ringförmig anzulegen ist. Bei dem Jacobi-Verfahren benötigen wir das nicht, entsprechend setzen wir hier beide Werte auf false.

Wenn eine Umordnung zulässig ist (fünfter Parameter), dann kann sich der rank jeder der Prozesse auch innerhalb von MPI_COMM_WORLD verändern. Dies erlaubt MPI die Umorganisation aller Prozesse auf der zur Verfügung stehenden Netzwerkstruktur, die die Kommunikationstopologie möglichst gut unterstützt. Wenn eine Umordnung zugelassen wird, muss anschließend der rank erneut abgerufen werden, da dieser sich verändert haben könnte.

Aufgabe

Analog zu den vorherigen zeilenweisen operierenden Scatter- und Gather-Operationen lohnt es sich, dies auch zwei-dimensional blockweise bei Matrizen durchzuführen. Es gibt hier allerdings keine MPI-Operationen, die dies unmittelbar stützen, da sowohl MPI_Scatterv als auch MPI_Gatherv zwar unterschiedliche Quantitäten pro Prozess zulassen, jedoch auf einheitliche Typen bestehen. Das ist bei einer zweidimensionalen Matrixaufteilung jedoch nicht mehr der Fall, da wir dort bis zu vier verschiedene Typen haben können: Regulärer Fall, rechter Rand, unterer Rand, linke untere Ecke.

Entsprechend lohnt es sich, Scatter- und Gather-Operationen für Matrizen selbst zu entwickeln, die allerdings auf der Seite des Root-Prozesses alle Kommunikationsoperationen mit Hilfe der nicht-blockierenden Operationen parallelisieren sollte. Ebenso ist zu beachten, dass der Root-Prozess auch immer einen Teilblock an sich selbst verteilt bzw. wieder zurückkopiert -- hierfür wird keine Kommunikation benötigt, sondern nur eine reguläre Matrix-Kopieroperation (hpc::matvec::copy).

Entwickeln Sie die Funktionen scatter_by_block und gather_by_block mit den folgenden Signaturen:

template<typename MA, typename MB>
typename std::enable_if<
   hpc::matvec::IsGeMatrix<MA>::value && hpc::matvec::IsGeMatrix<MB>::value &&
      std::is_same<typename MA::ElementType, typename MB::ElementType>::value,
   int>::type
scatter_by_block(const MA& A, MB& B, int root,
      MPI_Comm grid, int dims[2], int coords[2]) {
   /* ... */
}

template<typename MA, typename MB>
typename std::enable_if<
   hpc::matvec::IsGeMatrix<MA>::value && hpc::matvec::IsGeMatrix<MB>::value &&
      std::is_same<typename MA::ElementType, typename MB::ElementType>::value,
   int>::type
gather_by_block(const MA& A, MB& B, int root,
      MPI_Comm grid, int dims[2], int coords[2]) {
   /* ... */
}

Vorlage

Zum Testen steht eine Vorlage zur Verfügung:

#include <mpi.h>
#include <hpc/mpi/matrix.h>
#include <hpc/matvec/gematrix.h>
#include <hpc/matvec/print.h>
#include <hpc/matvec/apply.h>

int main(int argc, char** argv) {
   MPI_Init(&argc, &argv);

   int nof_processes; MPI_Comm_size(MPI_COMM_WORLD, &nof_processes);
   int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);

   using namespace hpc::matvec;
   using namespace hpc::mpi;
   using namespace hpc::aux;

   using Matrix = GeMatrix<double>;
   using Index = typename Matrix::Index;
   int share = 3;
   int num_rows = nof_processes * share + 1;
   int num_cols = nof_processes * share + 2;

   Matrix A(num_rows, num_cols); /* entire matrix */

   /* create two-dimensional Cartesian grid for our prcesses */
   int dims[2] = {0, 0}; int periods[2] = {false, false};
   MPI_Dims_create(nof_processes, 2, dims);
   MPI_Comm grid;
   MPI_Cart_create(MPI_COMM_WORLD,
      2,        // number of dimensions
      dims,     // actual dimensions
      periods,  // both dimensions are non-periodical
      true,     // reorder is permitted
      &grid     // newly created communication domain
   );
   MPI_Comm_rank(MPI_COMM_WORLD, &rank); // update rank (could have changed)

   /* get our position within the grid */
   int coords[2];
   MPI_Cart_coords(grid, rank, 2, coords);
   Slices<int> rows(dims[0], A.numRows);
   Slices<int> columns(dims[1], A.numCols);

   Matrix B(rows.size(coords[0]), columns.size(coords[1]),
      StorageOrder::RowMajor);

   if (rank == 0) {
      apply(A, [](double& val, Index i, Index j) -> void {
         val = i * 100 + j;
      });
   }

   scatter_by_block(A, B, 0, grid, dims, coords);
   apply(B, [=](double& val, Index i, Index j) -> void {
      val += 10000 * (rank + 1);
   });
   gather_by_block(B, A, 0, grid, dims, coords);

   MPI_Finalize();

   if (rank == 0) {
      print(A, "A");
   }
}