Master/Worker-Pattern

Wir haben bislang bei jeder parallelisierten Funktion nach dem Fork-and-Join-Pattern die gewünschte Zahl von Threads erzeugt, diese die jeweiligen Teilaufgaben rechnen lassen und schließlich auf das Ende der Threads mit der join-Methode gewartet.

Das Erzeugen und Abbauen von Threads ist jedoch nicht sehr billig. Dies demonstriert folgendes kleines Testprogramm, das die hierfür benötigte Zeit misst:

#include <cstdio>
#include <thread>
#include <hpc/aux/walltime.h>

int main() {
   unsigned int counter = 0;
   unsigned int nof_threads = 1<<15;

   hpc::aux::WallTime<double> wall_time;
   wall_time.tic();
      for (unsigned int i = 0; i < nof_threads; ++i) {
         auto t = std::thread([&]() { ++counter; });
         t.join();
      }
   auto t = wall_time.toc();
   std::printf("avg time per thread creation = %.2f us\n",
      t / nof_threads * 1000000L);
}
$shell> g++ -O3 -g -I/home/numerik/pub/hpc/session16 -std=c++11 -o threads threads.cpp
$shell> ./threads
avg time per thread creation = 33.30 us

Da die Zahl rechenintensiver Threads ohnehin zu Beginn festgelegt wird (wie beispielsweise durch das Abfragen von std::thread::hardware_concurrency()), erscheint es naheliegend, diese Threads zu Beginn zu erzeugen und dann die parallel auszuführenden Teilaufgaben an diese Threads zu delegieren.

Prinzipiell geht dies mit dem Master/Worker-Pattern, bei dem

Wie kann so ein Auftrag repräsentiert werden? Wenn die Aufträge alle von einheitlicher Natur sind, könnte diese durch eine entsprechende uniforme struct beschrieben werden. In der Praxis sehen die Aufträge aber sehr unterschiedlich aus. Selbst bei unseren bisherigen Beispielen würden wir gerne parallel Matrizen initialisieren, kopieren und miteinander multiplizieren.

Für die Unterstützung polymorpher Aufträge bieten sich in C++11 parameterlose Funktionsobjekte an (worunter alle Lambda-Ausdrücke fallen) und für einen Container eines polymorphen Funktionsobjekts empfiehlt sich std::function aus <functional>. Nur der Return-Typ müsste hier festgelegt werden. Im einfachsten Falle könnten wir hier mit std::function<void()> arbeiten.

Im folgenden Ansatz repräsentiert ein Objekt der Klasse Worker genau einen Worker und den zugehörigen Thread. Die Liste jobs dient der Verwaltung der Aufträge, die dieser Worker über die submit-Methode erhält:

#include <cassert>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>

struct Worker {
   public:
      using Job = std::function<void()>;
      Worker() {
         t = std::thread([=]() { process_jobs(); });
      }
      void submit(Job job) {
         std::unique_lock<std::mutex> lock(mutex);
         jobs.push_back(std::move(job));
         cv.notify_one();
      }
   private:
      std::thread t;
      std::mutex mutex;
      std::condition_variable cv;
      std::list<Job> jobs;

      void process_jobs() {
         for(;;) {
            Job job;
            /* fetch job */
            {
               std::unique_lock<std::mutex> lock(mutex);
               if (jobs.empty()) {
                  cv.wait(lock);
               }
               job = std::move(jobs.front());
               jobs.pop_front();
            }
            /* execute job */
            job();
         }
      }
};

int main() {
   Worker worker;
   worker.submit([]() { std::printf("Hi, this is your first job!\n"); });
   worker.submit([]() { std::printf("Now you get another job.\n"); });
}

Wie sieht dann sie Ausführung aus?

$shell> g++ -O3 -g -I/home/numerik/pub/hpc/session16 -std=c++11 -o worker worker.cpp
$shell> ./worker
/home/borchert/hpc/commons/uebungen/tmp/shell.sh: line 2:  1821 Segmentation Fault      (core dumped) ./worker

Aufgaben