Fork-and-Join-Pattern mit Pipelines

Content

Wenn wir das Fork-and-Join-Pattern zur Parallelisierung nutzen, müssen insbesondere folgende drei Teilprobleme gelöst werden:

Charakteristisch für Fork-and-Join ist, dass eine Aufgabenverteilung nur einmal zu Beginn stattfindet. Dies ist somit in Verbindung mit fork einfach zu lösen, weil die Aufteilung der Aufgaben vor dem fork stattfinden kann und die benötigten Daten anschließend vererbt werden. Im Grunde muss jeder Prozess danach nur seinen Prozess-Index kennen. Dieser Index kann zum Beispiel für ein Array genutzt werden mit weiteren Daten.

Für die Übermittlung der Ergebnisse gibt es bei unabhängigen Prozessen prinzipiell zwei Möglichkeiten:

Aufgabe

Gegeben seien folgende Datentypen:

/* function to be executed by a worker,
   where id is an integer in [0, # workers)
   and fd the writing end of the pipeline */
typedef void (*JobHandler)(unsigned int id, int fd);

typedef struct Worker {
   pid_t pid;
   int fd; /* reading end of the pipeline */
} Worker;

JobHandler ist ein Funktionszeiger auf eine Funktion, die von einem Worker-Prozess auszuführen ist. Der erste Parameter spezifiziert den Prozess-Index und der zweite Parameter liefert den Datendeskriptor zum schreibenden Ende der Pipeline, die mit dem Hauptprozess verbunden ist.

Zu entwickeln ist folgende Funktion, die eine gegebene Zahl von Worker-Prozessen erzeugt und für diese jeweils handler aufruft:

bool spawn_workers(Worker workers[], unsigned int number_of_workers,
      JobHandler handler);

Hierbei sind im Array workers die lesenden Enden und die Prozess-IDs der Worker zu hinterlassen. Im Erfolgsfalle ist true zurückzuliefern, ansonsten false.

Bitte testen Sie Ihre Lösung nicht auf unseren Servern wie etwa der Theon oder der Thales.

Vorlage

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>
#include <afblib/outbuf_printf.h>

/* function to be executed by a worker,
   where id is an integer in [0, # workers)
   and fd the writing end of the pipeline */
typedef void (*JobHandler)(unsigned int id, int fd);

typedef struct Worker {
   pid_t pid;
   int fd; /* reading end of the pipeline */
} Worker;

bool spawn_workers(Worker workers[], unsigned int number_of_workers,
      JobHandler handler) {
   /* FIXME */
}

void do_sth(unsigned int id, int fd) {
   outbuf out = {fd};
   outbuf_printf(&out, "Greetings from worker %u!\n", id);
   outbuf_flush(&out);
}

bool copy(int in, int out) {
   char buf[8192];
   ssize_t nbytes;
   while ((nbytes = read(in, buf, sizeof buf)) > 0) {
      ssize_t written = 0;
      while (written < nbytes) {
	 ssize_t count = write(out, buf + written, nbytes - written);
	 if (count <= 0) return false;
	 written += count;
      }
   }
   return nbytes == 0;
}

#define WORKERS (10)

int main() {
   Worker workers[WORKERS];
   if (!spawn_workers(workers, WORKERS, do_sth)) {
      perror("spawn_workers"); exit(1);
   }
   for (unsigned int i = 0; i < WORKERS; ++i) {
      copy(workers[i].fd, 1); close(workers[i].fd);
      int wstat; waitpid(workers[i].pid, &wstat, 0);
   }
}

Beachten Sie, dass Sie beim Übersetzen die Bibliotheken „-lafb -lowfat“ angeben müssen.