Master/worker pattern

Content

So far, whenever we wanted to parallelize a problem, we created a bunch of threads on the spot following the fork-and-join pattern, joined them, and finally destroyed the thread objects. However, constructing and destroying threads has its cost, as demonstrated by the following small test program, which measures the time required:

#include <thread>
#include <printf.hpp>
#include <hpc/aux/walltime.hpp>

int main() {
   unsigned int counter = 0;
   unsigned int nof_threads = 1<<15;

   hpc::aux::WallTime<double> wall_time;
   wall_time.tic();
      for (unsigned int i = 0; i < nof_threads; ++i) {
         auto t = std::thread([&]() { ++counter; });
         t.join();
      }
   auto t = wall_time.toc();
   fmt::printf("avg time per thread creation = %.2f us\n",
      t / nof_threads * 1000000L);
}
theon$ g++ -O3 -g -I/home/numerik/pub/hpc/ws18/session17 -std=c++11 -o threads threads.cpp
theon$ ./threads
avg time per thread creation = 35.00 us
theon$ 
heim$ g++-7.2 -O3 -g -I/home/numerik/pub/hpc/ws18/session17 -std=c++17 -o threads threads.cpp -lpthread
heim$ ./threads
avg time per thread creation = 20.13 us
heim$ 

As the number of compute-intensive threads is usually known right from the beginning (for example by querying std::thread::hardware_concurrency()) it appears straightforward to create these threads once and to keep them around until the program finishes. Whenever we want to parallelize a problem, we could delegate the subtasks to our already existing pool of threads.

This can be done using the so-called master/worker pattern, in which a master hands out subtasks to worker threads that are created once and then wait for work.

How should such a subtask be represented? If all subtasks were of the same kind, we could create one struct describing them. In practice, however, we encounter very different kinds of subtasks. Even in our examples so far, we have parallelized the initialization, copying, and multiplication of matrices. Hence we need a generic approach to this problem.

For this purpose, C++11 uses parameterless function objects. They are already used for the construction of threads, and they permit the use of lambda expressions. We need a container that can store polymorphic function objects, and fortunately the library provides one with std::function from <functional>. Since the function objects are parameterless, we just need to fix their return type. To keep it as simple as possible, we can start with std::function<void()>.

In the following prototype, an object of class Worker represents exactly one worker and its associated thread. The linked list jobs serves as a queue of incoming subtasks, which are submitted using the submit method:

#include <cassert>
#include <condition_variable>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
#include <printf.hpp>

struct Worker {
   public:
      using Job = std::function<void()>;
      Worker() {
         t = std::thread([this]() { process_jobs(); });
      }
      void submit(Job job) {
         std::unique_lock<std::mutex> lock(mutex);
         jobs.push_back(std::move(job));
         cv.notify_one();
      }
   private:
      std::thread t;
      std::mutex mutex;
      std::condition_variable cv;
      std::list<Job> jobs;

      void process_jobs() {
         for(;;) {
            Job job;
            /* fetch job */
            {
               std::unique_lock<std::mutex> lock(mutex);
               while (jobs.empty()) {
                  cv.wait(lock);
               }
               job = std::move(jobs.front());
               jobs.pop_front();
            }
            /* execute job */
            job();
         }
      }
};

int main() {
   Worker worker;
   worker.submit([]() { fmt::printf("Hi, this is your first job!\n"); });
   worker.submit([]() { fmt::printf("Now you got another job.\n"); });
}

Let us have a look at its execution:

theon$ g++ -O3 -g -std=c++17 -o worker worker.cpp
theon$ ./worker
terminate called without an active exception
theon$ 
heim$ g++-7.2 -O3 -g -std=c++17 -o worker worker.cpp -lpthread
heim$ ./worker
terminate called without an active exception
Aborted
heim$ 

Exercises