Fork-and-Join-Pattern mit Pipelines
Content |
Wenn wir das Fork-and-Join-Pattern zur Parallelisierung nutzen, müssen insbesondere folgende drei Teilprobleme gelöst werden:
-
Wie werden die Aufgaben aufgeteilt und an die einzelnen Prozesse kommuniziert?
-
Wie werden die Ergebnisse zu den Teilaufgaben an den Hauptprozess übermittelt?
-
Wie erfolgt die Synchronisierung?
Charakteristisch für Fork-and-Join ist, dass eine Aufgabenverteilung nur einmal zu Beginn stattfindet. Dies ist somit in Verbindung mit fork einfach zu lösen, weil die Aufteilung der Aufgaben vor dem fork stattfinden kann und die benötigten Daten anschließend vererbt werden. Im Grunde muss jeder Prozess danach nur seinen Prozess-Index kennen. Dieser Index kann zum Beispiel für ein Array genutzt werden mit weiteren Daten.
Für die Übermittlung der Ergebnisse gibt es bei unabhängigen Prozessen prinzipiell zwei Möglichkeiten:
-
Wir arbeiten mit einem gemeinsamen Speicherbereich, der mit mmap angelegt worden ist und der Option MAP_SHARED über fork hinweg vererbt werden kann. Der Nachteil ist hier, dass dieser Speicherbereich in der Größe begrenzt ist. Wenn die Ergebnisse im Umfang nicht offensichtlich beschränkt ist, lässt sich das nicht ohne ein aufwendiges Protokoll lösen, bei dem die Ergebnisse „häppchenweise“ geliefert werden und deswegen eine Synchronisierung auch zwischendrin stattfinden muss.
-
Wir nutzen Pipelines zu jedem der \(n\) Worker-Prozesse, bei der die Worker das schreibende Ende haben (um ihr Ergebnis hineinzuschreiben) und der Hauptprozess das lesende Ende. Der Vorteil liegt in der fehlenden Beschränkung des Umfangs. Nachteilhaft ist hier, dass Worker-Prozesse unter Umständen schlafen gelegt werden, wenn der Hauptprozess nicht rechtzeitig die Ergebnisse bei einer gefüllten Pipeline wegliest.
Aufgabe
Gegeben seien folgende Datentypen:
/* function to be executed by a worker, where id is an integer in [0, # workers) and fd the writing end of the pipeline */ typedef void (*JobHandler)(unsigned int id, int fd); typedef struct Worker { pid_t pid; int fd; /* reading end of the pipeline */ } Worker;
JobHandler ist ein Funktionszeiger auf eine Funktion, die von einem Worker-Prozess auszuführen ist. Der erste Parameter spezifiziert den Prozess-Index und der zweite Parameter liefert den Datendeskriptor zum schreibenden Ende der Pipeline, die mit dem Hauptprozess verbunden ist.
Zu entwickeln ist folgende Funktion, die eine gegebene Zahl von Worker-Prozessen erzeugt und für diese jeweils handler aufruft:
bool spawn_workers(Worker workers[], unsigned int number_of_workers, JobHandler handler);
Hierbei sind im Array workers die lesenden Enden und die Prozess-IDs der Worker zu hinterlassen. Im Erfolgsfalle ist true zurückzuliefern, ansonsten false.
Bitte testen Sie Ihre Lösung nicht auf unseren Servern wie etwa der Theon oder der Thales.
Vorlage
#include <stdbool.h> #include <stdio.h> #include <stdlib.h> #include <sys/wait.h> #include <unistd.h> #include <afblib/outbuf_printf.h> /* function to be executed by a worker, where id is an integer in [0, # workers) and fd the writing end of the pipeline */ typedef void (*JobHandler)(unsigned int id, int fd); typedef struct Worker { pid_t pid; int fd; /* reading end of the pipeline */ } Worker; bool spawn_workers(Worker workers[], unsigned int number_of_workers, JobHandler handler) { /* FIXME */ } void do_sth(unsigned int id, int fd) { outbuf out = {fd}; outbuf_printf(&out, "Greetings from worker %u!\n", id); outbuf_flush(&out); } bool copy(int in, int out) { char buf[8192]; ssize_t nbytes; while ((nbytes = read(in, buf, sizeof buf)) > 0) { ssize_t written = 0; while (written < nbytes) { ssize_t count = write(out, buf + written, nbytes - written); if (count <= 0) return false; written += count; } } return nbytes == 0; } #define WORKERS (10) int main() { Worker workers[WORKERS]; if (!spawn_workers(workers, WORKERS, do_sth)) { perror("spawn_workers"); exit(1); } for (unsigned int i = 0; i < WORKERS; ++i) { copy(workers[i].fd, 1); close(workers[i].fd); int wstat; waitpid(workers[i].pid, &wstat, 0); } }
Beachten Sie, dass Sie beim Übersetzen die Bibliotheken „-lafb -lowfat“ angeben müssen.