========================================== Kontroll-Pipeline mit atomaren Nachrichten [TOC] ========================================== Die erarbeitete Lösung ist nicht wirklich geeignet, wenn die einzelnen Worker prinzipiell unbeschränkt Resultate erzeugen können und diese vom Hauptprozess zügig abgeholt werden müssen, damit eifrige Worker nicht unerwünscht blockiert werden. Wie kann hier der Hauptprozess darauf warten, dass irgendeiner der Worker etwas abzuliefern hat und dann genau aus dessen Pipeline lesen? Eine denkbare Lösung für dieses Problem wäre die Einrichtung einer zusätzlichen Kontroll-Pipeline: * Alle Worker-Prozesse haben das schreibende Ende der Kontroll-Pipeline. * Der Hauptprozess behält das lesende Ende der Kontroll-Pipeline. * Wenn immer ein Worker-Prozess ein Resultat über die reguläre Pipeline eingereicht hat (im Umfang so beschränkt, dass eine Blockierung ausgeschlossen ist), dann wird über die Kontroll-Pipeline der Prozess-Index des Worker-Prozesses binär geschrieben. Dies erfolgt atomar. * Der Hauptprozess muss dann nur noch die Kontroll-Pipeline betrachten. Wenn immer ein Prozess-Index einzulesen ist, sollte die entsprechende individuelle Pipeline geleert werden. Damit der Hauptprozess hier nicht versehentlich blockiert wird, empfiehlt es sich, mit dem Flag `O_NONBLOCK` zu arbeiten. * Sobald das Eingabe-Ende bei der Kontroll-Pipeline erkannt wird, wissen wir, dass alle Worker-Prozesse beendet sind. Aufgabe ======= Passen Sie Ihre Lösung aus dem letzten Schritt an das neue Schema an: * Der _JobHandler_ erhält einen weiteren Parameter mit dem Dateideskriptor zum schreibenden Ende der Kontroll-Pipeline: ---- CODE (type=c) --------------------------------------------------------- /* function to be executed by a worker, where id is an integer in [0, # workers) out_fd is the writing end of the pipeline for the results, and control_fd the writing end of the control pipe */ typedef void (*JobHandler)(unsigned int id, int out_fd, int control_fd); ---------------------------------------------------------------------------- * Die das Schema aufsetzende Funktion liefert nicht mehr _bool_ zurück, sondern den lesenden Dateideskriptor der Kontroll-Pipeline, der im Fehlerfall negativ ist: ---- CODE (type=c) --------------------------------------------------------- int spawn_workers(Worker workers[], unsigned int number_of_workers, JobHandler handler); ---------------------------------------------------------------------------- Hierbei sollten die an den Hauptprozess zur Verfügung gestellten lesenden Enden der Pipelines mit den Resultaten der einzelnen Worker alle auf `O_NONBLOCK` konfiguriert werden. So geht das: ---- CODE (type=c) --------------------------------------------------------- int flags = fcntl(fd, F_GETFL) | O_NONBLOCK; fcntl(fd, F_SETFL, flags); ---------------------------------------------------------------------------- Hierfür wird `#include ` benötigt. * Für den Test sollte die von den Worker-Prozessen aufgerufene Funktion nicht nur eine Testausgabe liefern, sondern viele, wobei es sinnvoll ist, mit _poll_ kleinere zufällige Zeitverzögerungen vorzunehmen: ---- CODE (type=c) --------------------------------------------------------- void do_sth(unsigned int id, int out_fd, int control_fd) { srand(getpid()); outbuf out = {out_fd}; unsigned int count = 0; for(;;) { outbuf_printf(&out, "Greetings #%u from worker %u!\n", ++count, id); outbuf_flush(&out); /* FIXME: signal to the control_fd that we have delivered sth */ /* decide when we stop this */ if (rand() % 5 == 0) break; /* sleep a short amount of time */ int timeout = 100 + rand() % 500; poll(0, 0, timeout); } } ---------------------------------------------------------------------------- Für die Funktion _poll_ wird `#include ` benötigt; die Deklarationen für den Pseudo-Zufallszahlengenerator kommen mit `#include `. * Passen Sie _main_ wie oben beschrieben an. :navigate: up -> doc:index back -> doc:session04/page02 next -> doc:session04/page04