Thread-Pools

Ein Worker alleine gibt natürlich nicht viel her. Wir könnten natürlich mehrere Worker-Objekte erzeugen (etwa als Array) und diese dann mit Aufträgen versorgen. Aber welchem Worker geben wir einen Auftrag? Wissen wir, welcher Worker gerade untätig auf einen eingehenden Job wartet? Es erscheint daher sinnvoll, nicht nur eine einzelne Klasse zu haben, die sich um einen Thread kümmert, sondern einen Thread-Pool mit einer Vielzahl von Threads und einer einzigen Queue mit Aufträgen.

So könnte eine entsprechende Klasse aussehen. Die Threads sind in einem std::vector organisiert. Entsprechend müssen wir im Konstruktor mehr als nur einen Thread starten und im Destruktor uns mit allen Threads per join synchronisieren. Beides erfolgt hier mit der in C++11 eingeführten for-Schleife, mit der durch sämtliche Objekte eines Containers (hier einem std::vector) iteriert wird.

#include <cassert>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

struct ThreadPool {
   public:
      using Job = std::function<void()>;
      ThreadPool(unsigned int nof_threads) :
            nof_threads(nof_threads), finished(false),
            threads(nof_threads) {
         for (auto& t: threads) {
            t = std::thread([=]() { process_jobs(); });
         }
      }
      ~ThreadPool() {
         {
            std::unique_lock<std::mutex> lock(mutex);
            finished = true;
         }
         cv.notify_all();
         for (auto& t: threads) {
            t.join();
         }
      }
      void submit(Job job) {
         std::unique_lock<std::mutex> lock(mutex);
         jobs.push_back(std::move(job));
         cv.notify_one();
      }
   private:
      unsigned int nof_threads;
      bool finished;
      std::vector<std::thread> threads;
      std::mutex mutex;
      std::condition_variable cv;
      std::list<Job> jobs;

      void process_jobs() {
         for(;;) {
            Job job;
            /* fetch job */
            {
               std::unique_lock<std::mutex> lock(mutex);
               if (jobs.empty() && !finished) {
                  cv.wait(lock);
               }
               if (jobs.empty() && finished) break;
               job = std::move(jobs.front());
               jobs.pop_front();
            }
            /* execute job */
            job();
         }
      }
};

int main() {
   ThreadPool pool(2);
   pool.submit([]() { std::printf("Hi, this is your first job!\n"); });
   pool.submit([]() { std::fprintf(stderr, "Now you get another job.\n"); });
}

Worker können auch neue Jobs erzeugen. Bei Teile-und-Herrsche-Algorithmen ist dies sogar die Regel. Probieren wir das mal mit der vorgestellten Implementierung eines Thread-Pools an einem einfachen Beispiel:

   pool.submit([&]() {
      std::cout << "Hi, this is thread " << std::this_thread::get_id() <<
         std::endl;;
      pool.submit([&]() {
         pool.submit([]() {
            std::cout << "Hello guys, I'm " << std::this_thread::get_id() <<
               std::endl;;
         });
         std::cout << "Hi guys, I'm " << std::this_thread::get_id() <<
            std::endl;;
      });
      pool.submit([&]() {
         pool.submit([]() {
            std::cout << "O wonder, I'm " << std::this_thread::get_id() <<
               std::endl;;
         });
         std::cout << "Huhu guys, I'm " << std::this_thread::get_id() <<
            std::endl;;
      });
      std::cout << "Hi, this is again thread " << std::this_thread::get_id() <<
         std::endl;;
   });

Hierbei steht std::this_thread immer für das Thread-Objekt des gerade laufenden Threads und die Methode get_id() liefert ein Objekts des Typs std::thread::id. Da solche Objekte nicht nach int konvertierbar sind (ihre Repräsentierung hängt von der jeweiligen Implementierung ab), werden sie hier mit Hilfe der iostream und dem überladenen <<-Operator ausgegeben -- das ist die einzige portable Möglichkeit, das zu tun. (Wir ignorieren hier mal das Problem der konkurrierenden Zugriffe auf std::cout.)

Ein Testlauf sieht so aus:

$shell> g++ -O3 -g -I/home/numerik/pub/hpc/session16 -std=c++11 -o tpool2 tpool2.cpp
$shell> ./tpool2
Hi, this is thread 2
Hi, this is again thread 2
Hi guys, I'm 2
Huhu guys, I'm 2
Hello guys, I'm 2
O wonder, I'm 2

Wenn wir die Thread-IDs ansehen, stellen wir überraschenderweise fest, dass alle Ausgaben auf die gleiche Thread-ID verweisen.

Hinweis: Ältere g++-Versionen kommen damit nicht zurecht (siehe https://gcc.gnu.org/bugzilla/show_bug.cgi?id=53137). Dies betrifft leider momentan noch die Maschinen im E.44. Probieren Sie dies auf der Thales aus oder ersetzen Sie den Aufruf process_jobs(); durch this->process_jobs(). Der gleiche Hinweis gilt auch für die Beispiele auf den folgenden Seiten.

Aufgaben

Vorlage

#include <cassert>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

struct ThreadPool {
   public:
      using Job = std::function<void()>;
      ThreadPool(unsigned int nof_threads) :
            nof_threads(nof_threads), finished(false),
            threads(nof_threads) {
         for (auto& t: threads) {
            t = std::thread([=]() { process_jobs(); });
         }
      }
      ~ThreadPool() {
         {
            std::unique_lock<std::mutex> lock(mutex);
            finished = true;
         }
         cv.notify_all();
         for (auto& t: threads) {
            t.join();
         }
      }
      void submit(Job job) {
         std::unique_lock<std::mutex> lock(mutex);
         jobs.push_back(std::move(job));
         cv.notify_one();
      }
   private:
      unsigned int nof_threads;
      bool finished;
      std::vector<std::thread> threads;
      std::mutex mutex;
      std::condition_variable cv;
      std::list<Job> jobs;

      void process_jobs() {
         for(;;) {
            Job job;
            /* fetch job */
            {
               std::unique_lock<std::mutex> lock(mutex);
               if (jobs.empty() && !finished) {
                  cv.wait(lock);
               }
               if (jobs.empty() && finished) break;
               job = std::move(jobs.front());
               jobs.pop_front();
            }
            /* execute job */
            job();
         }
      }
};

int main() {
   ThreadPool pool(2);
   pool.submit([&]() {
      std::cout << "Hi, this is thread " << std::this_thread::get_id() <<
         std::endl;;
      pool.submit([&]() {
         pool.submit([]() {
            std::cout << "Hello guys, I'm " << std::this_thread::get_id() <<
               std::endl;;
         });
         std::cout << "Hi guys, I'm " << std::this_thread::get_id() <<
            std::endl;;
      });
      pool.submit([&]() {
         pool.submit([]() {
            std::cout << "O wonder, I'm " << std::this_thread::get_id() <<
               std::endl;;
         });
         std::cout << "Huhu guys, I'm " << std::this_thread::get_id() <<
            std::endl;;
      });
      std::cout << "Hi, this is again thread " << std::this_thread::get_id() <<
         std::endl;;
   });
}