Verbesserung der ThreadPool-Klasse

Zuletzt haben wir gesehen, wie Jobs eines Thread-Pools in std::packaged_task-Objekte verpackt werden können und die submit-Methode ein std::future-Objekt zurückliefert, das es erlaubt, sich mit der Beendigung des Jobs zu synchronisieren und ein Ergebnis abzuholen. In dieser zuletzt vorgestellten Fassung musste der Return-Typ für den gesamten Thread-Pool als Template-Parameter festgelegt werden, d.h. unterschiedliche Jobs konnten nicht unterschiedliche Return-Typen haben.

Es ist möglich, diesen Template-Parameter zu entfernen und stattdessen aus der submit-Methode eine Template-Methode zu machen, die als Jobs beliebige parameterlose Funktionstypen akzeptiert mit der Konsequenz, dass unterschiedliche Jobs auch unterschiedliche Return-Typen haben können. Das ist möglich, weil wir das std::future-Objekt sofort zurückgeben und alles andere vom Return-Typ unabhängig ist.

Das Problem war nur, dass der Return-Typ als Template-Parameter eines Jobs, d.h. von std::packaged_task erhalten blieb:

   using Job = std::packaged_task<T()>;
   std::list<Job> jobs;

Interessant ist hier aber, dass der Aufruf eines std::packaged_task-Objekts parameterlos ist und nur void zurückliefert. Somit liegt die Idee nahe, die std::packaged_task-Objekte in std::function<void()>-Objekte zu verpacken. Dies hätte den Vorteil, dass die Datenstruktur unabhängig von den Return-Typen ist und somit der Template-Parameter von ThreadPool entfallen kann.

Leider lässt sich ein std::packaged_task-Objekt nicht ohne weiteres in ein std::function-Objekt verpacken, da std::function darauf besteht, dass das Funktionsobjekt per Kopie konstruiert werden kann (copy constructible), die std::packaged_task-Objekte jedoch nicht per Kopie, sondern nur per Verschieben konstruiert werden können (move constructible). Das liegt daran, dass zu einem std::packaged_task-Objekt auch ein std::promise-Objekt gehört und dieses nicht kopierbar ist, sondern auch nur Verschiebungen unterstützt.

So ein Dilemma lässt sich lösen, indem ein solches Objekt hinter einen Zeiger versteckt wird, da Zeiger beliebig kopiert werden können. Dadurch entsteht aber das Problem, wann das Objekt hinter dem Zeiger freigegeben werden kann. Hierfür gibt es in C++ glücklicherweise sogenannte intelligente Zeiger (smart pointers), die in der Lage sind, die Zahl der Zeigerkopien zu zählen und das Objekt freizugeben, wenn die Zahl auf 0 sinkt. Diese Zeiger setzen eine Verallgemeinerung des RAII-Prinzips um, bei dem das Aufräumen verzögert wird, bis die letzte Kopie abgebaut wird. Der entsprechende Datentyp ist std::shared_ptr aus <memory>. Die Kombination aus dem new-Operator und das Verpacken in ein std::shared_ptr-Objekt steht mit der Template-Funktion std::make_shared zur Verfügung.

In der jetzt zur Verfügung gestellten Fassung des Thread-Pools sieht die Liste der Jobs so aus (hier nennt sie sich tasks):

std::list<std::function<void()>> tasks;

Und submit kann dann so umgesetzt werden:

template<typename F>
auto submit(F&& task_function)
      -> std::future<decltype(task_function())> {
   using T = decltype(task_function());
   auto task = std::make_shared<std::packaged_task<T()>>(
      std::forward<F>(task_function));
   std::future<T> result = task->get_future();
   std::lock_guard<std::mutex> lock(mutex);
   tasks.push_back([task]() { (*task)(); } );
   cv.notify_one();
   return result;
}

Sie finden diese Fassung in der neuen Version der Vorlesungsbibliothek unter dem Verzeichnis /home/numerik/pub/hpc/session18 auf unseren Rechnern. Der ThreadPool steht über <hpc/mt/thread_pool.h> zur Verfügung, der bislang verwendete RandomEnginePool ist über <hpc/aux/repool.h> zu finden.

Aufgaben

Vorlage

Als Vorlage kann die Fassung aus der 16. Session dienen:

#include <condition_variable>
#include <cstdlib>
#include <list>
#include <mutex>
#include <random>
#include <thread>
#include <utility>
#include <vector>
#include <hpc/matvec/gematrix.h>
#include <hpc/matvec/apply.h>
#include <hpc/matvec/print.h>
#include <hpc/aux/slices.h>

template<typename T>
struct RandomEnginePool {
      using EngineType = T;
      RandomEnginePool(std::size_t size) :
            size(size), nof_free_engines(size),
            inuse(size), engines(size) {
         std::random_device r;
         for (std::size_t i = 0; i < size; ++i) {
            engines[i].seed(r()); inuse[i] = false;
         }
      }
      T& get() {
         std::unique_lock<std::mutex> lock(mutex);
         if (nof_free_engines == 0) {
            cv.wait(lock);
         }
         for (std::size_t i = 0; i < size; ++i) {
            if (!inuse[i]) {
               inuse[i] = true; --nof_free_engines;
               return engines[i];
            }
         }
         assert(false);
      }
      void free(T& engine) {
         {
            std::unique_lock<std::mutex> lock(mutex);
            bool found = false;
            for (std::size_t i = 0; i < size; ++i) {
               if (&engine == &engines[i]) {
                  inuse[i] = false; ++nof_free_engines;
                  found = true; break;
               }
            }
            assert(found);
         }
         cv.notify_one();
      }
   private:
      std::mutex mutex;
      std::condition_variable cv;
      std::size_t size;
      std::size_t nof_free_engines;
      std::vector<bool> inuse;
      std::vector<T> engines;
};

template<typename T>
struct RandomEngineGuard {
   using EngineType = T;
   RandomEngineGuard(RandomEnginePool<T>& pool) : pool(pool), engine(pool.get()) {
   }
   ~RandomEngineGuard() {
      pool.free(engine);
   }
   T& get() {
      return engine;
   }
   RandomEnginePool<T>& pool;
   T& engine;
};

template<typename MA, typename Pool>
typename std::enable_if<hpc::matvec::IsRealGeMatrix<MA>::value, void>::type
randomInit(MA& A, Pool& pool) {
   using ElementType = typename MA::ElementType;
   using Index = typename MA::Index;
   using EngineType = typename Pool::EngineType;

   std::uniform_real_distribution<double> uniform(-100, 100);
   RandomEngineGuard<EngineType> guard(pool);
   auto& engine(guard.get());

   hpc::matvec::apply(A, [&](ElementType& val, Index i, Index j) -> void {
      val = uniform(engine);
   });
}

struct ThreadPool {
   public:
      using Job = std::function<void()>;
      ThreadPool(unsigned int nof_threads) :
            nof_threads(nof_threads), active(0),
            finished(false), threads(nof_threads) {
         for (auto& t: threads) {
            t = std::thread([=]() { process_jobs(); });
         }
      }
      ~ThreadPool() {
         {
            std::unique_lock<std::mutex> lock(mutex);
            finished = true;
         }
         cv.notify_all();
         for (auto& t: threads) {
            t.join();
         }
      }
      void submit(Job job) {
         std::unique_lock<std::mutex> lock(mutex);
         jobs.push_back(std::move(job));
         cv.notify_one();
      }
   private:
      unsigned int nof_threads;
      unsigned int active;
      bool finished;
      std::vector<std::thread> threads;
      std::mutex mutex;
      std::condition_variable cv;
      std::list<Job> jobs;

      void process_jobs() {
         for(;;) {
            Job job;
            /* fetch job */
            {
               std::unique_lock<std::mutex> lock(mutex);
               while (jobs.empty() && (active > 0 || !finished)) {
                  cv.wait(lock);
               }
               if (jobs.empty() && active == 0 && finished) break;
               job = std::move(jobs.front());
               jobs.pop_front();
               ++active;
            }
            /* execute job */
            job();
            {
               std::unique_lock<std::mutex> lock(mutex);
               --active;
            }
         }
         /* if one thread finishes, all others have to finish as well */
         cv.notify_all();
      }
};

struct JobStatus {
   public:
      JobStatus() : finished(false) {
      }
      void done() {
         std::unique_lock<std::mutex> lock(mutex);
         assert(!finished);
         finished = true;
         cv.notify_all();
      }
      void wait() {
         std::unique_lock<std::mutex> lock(mutex);
         if (!finished) {
            cv.wait(lock);
         }
      }
   private:
      std::mutex mutex;
      std::condition_variable cv;
      bool finished;
};

template<typename MA, typename RandomEnginePool, typename ThreadPool>
typename std::enable_if<hpc::matvec::IsRealGeMatrix<MA>::value, void>::type
randomInit(MA& A, RandomEnginePool& repool, ThreadPool& tpool,
      typename MA::Index maxRows,
      typename MA::Index maxCols) {
   using ElementType = typename MA::ElementType;
   using Index = typename MA::Index;
   using namespace hpc::aux;

   Index nof_row_slices = A.numRows / maxRows;
   if (A.numRows % maxRows) ++nof_row_slices;
   Index nof_col_slices = A.numCols / maxCols;
   if (A.numCols % maxCols) ++nof_col_slices;

   std::vector<JobStatus> job_status(nof_row_slices * nof_col_slices);

   Index job_index = 0;
   foreach_slice(nof_row_slices, A.numRows, [&]
         (Index rows_offset, Index rows_size) {
      foreach_slice(nof_col_slices, A.numCols, [&,rows_offset,rows_size]
            (Index cols_offset, Index cols_size) {
         auto A_ = A(rows_offset, cols_offset, rows_size, cols_size);
         tpool.submit([=, &job_status, &repool]() mutable {
            randomInit(A_, repool);
            job_status[job_index].done();
         });
         ++job_index;
      });
   });

   for (auto& js: job_status) {
      js.wait();
   }
}

int main() {
   using namespace hpc::matvec;
   using namespace hpc::aux;
   unsigned int nof_threads = std::thread::hardware_concurrency();
   RandomEnginePool<std::mt19937> repool(nof_threads);
   ThreadPool tpool(nof_threads);

   GeMatrix<double> A(51, 7);
   randomInit(A, repool, tpool, 8, 8);
   print(A, "A");
}