Master/Worker-Pattern
Wir haben bislang bei jeder parallelisierten Funktion nach dem Fork-and-Join-Pattern die gewünschte Zahl von Threads erzeugt, diese die jeweiligen Teilaufgaben rechnen lassen und schließlich auf das Ende der Threads mit der join-Methode gewartet.
Das Erzeugen und Abbauen von Threads ist jedoch nicht sehr billig. Dies demonstriert folgendes kleines Testprogramm, das die hierfür benötigte Zeit misst:
#include <cstdio> #include <thread> #include <hpc/aux/walltime.h> int main() { unsigned int counter = 0; unsigned int nof_threads = 1<<15; hpc::aux::WallTime<double> wall_time; wall_time.tic(); for (unsigned int i = 0; i < nof_threads; ++i) { auto t = std::thread([&]() { ++counter; }); t.join(); } auto t = wall_time.toc(); std::printf("avg time per thread creation = %.2f us\n", t / nof_threads * 1000000L); }
$shell> g++ -O3 -g -I/home/numerik/pub/hpc/session16 -std=c++11 -o threads threads.cpp $shell> ./threads avg time per thread creation = 33.30 us
Da die Zahl rechenintensiver Threads ohnehin zu Beginn festgelegt wird (wie beispielsweise durch das Abfragen von std::thread::hardware_concurrency()), erscheint es naheliegend, diese Threads zu Beginn zu erzeugen und dann die parallel auszuführenden Teilaufgaben an diese Threads zu delegieren.
Prinzipiell geht dies mit dem Master/Worker-Pattern, bei dem
-
der Master die parallel auszuführenden Teilaufgaben an die Worker delegiert und
-
die Worker auf Aufträge warten und diese dann ausführen.
Wie kann so ein Auftrag repräsentiert werden? Wenn die Aufträge alle von einheitlicher Natur sind, könnte diese durch eine entsprechende uniforme struct beschrieben werden. In der Praxis sehen die Aufträge aber sehr unterschiedlich aus. Selbst bei unseren bisherigen Beispielen würden wir gerne parallel Matrizen initialisieren, kopieren und miteinander multiplizieren.
Für die Unterstützung polymorpher Aufträge bieten sich in C++11 parameterlose Funktionsobjekte an (worunter alle Lambda-Ausdrücke fallen) und für einen Container eines polymorphen Funktionsobjekts empfiehlt sich std::function aus <functional>. Nur der Return-Typ müsste hier festgelegt werden. Im einfachsten Falle könnten wir hier mit std::function<void()> arbeiten.
Im folgenden Ansatz repräsentiert ein Objekt der Klasse Worker genau einen Worker und den zugehörigen Thread. Die Liste jobs dient der Verwaltung der Aufträge, die dieser Worker über die submit-Methode erhält:
#include <cassert> #include <condition_variable> #include <cstdio> #include <functional> #include <list> #include <mutex> #include <thread> #include <utility> struct Worker { public: using Job = std::function<void()>; Worker() { t = std::thread([=]() { process_jobs(); }); } void submit(Job job) { std::unique_lock<std::mutex> lock(mutex); jobs.push_back(std::move(job)); cv.notify_one(); } private: std::thread t; std::mutex mutex; std::condition_variable cv; std::list<Job> jobs; void process_jobs() { for(;;) { Job job; /* fetch job */ { std::unique_lock<std::mutex> lock(mutex); if (jobs.empty()) { cv.wait(lock); } job = std::move(jobs.front()); jobs.pop_front(); } /* execute job */ job(); } } }; int main() { Worker worker; worker.submit([]() { std::printf("Hi, this is your first job!\n"); }); worker.submit([]() { std::printf("Now you get another job.\n"); }); }
Wie sieht dann sie Ausführung aus?
$shell> g++ -O3 -g -I/home/numerik/pub/hpc/session16 -std=c++11 -o worker worker.cpp $shell> ./worker /home/borchert/hpc/commons/uebungen/tmp/shell.sh: line 2: 1821 Segmentation Fault (core dumped) ./worker
Aufgaben
-
Was genau wird von der capture [=] im Konstruktor erfasst?
-
Warum erfolgt die Initialisierung des Threads im Konstruktor nicht in einem initializer wie sonst auch üblich bei Konstruktoren?
-
Warum genau terminiert das Programm mit Abort (core dumped)?
-
Wie ließe sich das verhindern? Korrigieren Sie worker.cpp so, dass das Hauptprogramm unverändert bleibt und somit nur die Worker-Klasse korrigiert wird. Denken Sie dabei daran, dass diese Klasse den RAII-Prinzipien genügen sollte.