Synchronisierung mit den Workern

Die Synchronisierung mit der Beendigung eines Threads war möglich mit Hilfe der Methode join. Da bei einem Thread-Pool die Threads nach der Fertigstellung eines Auftrags sich mit anderen Aufgaben beschäftigen, fehlt hier eine wichtige Synchronisierungsmöglichkeit. Wir können momentan nicht ohne weiteres darauf warten, dass eine Aufgabe beendet ist.

Das Problem lässt sich jedoch mit einer Hilfsklasse lösen, die mit einer Mutex- und Bedingungsvariablen arbeitet:

struct JobStatus {
   public:
      JobStatus() : finished(false) {
      }
      void is_finished() {
         std::unique_lock<std::mutex> lock(mutex);
         assert(!finished);
         finished = true;
         cv.notify_all();
      }
      void wait() {
         std::unique_lock<std::mutex> lock(mutex);
         if (!finished) {
            cv.wait(lock);
         }
      }
   private:
      std::mutex mutex;
      std::condition_variable cv;
      bool finished;
};

Sowohl das den Auftrag repräsentierenden Funktionsobjekt als auch der Auftraggeber haben Zugriff auf ein solches Objekt. Das Funktionsobjekt ruft die Methode is_finished auf, sobald es fertig ist. Der Auftraggeber kann sich dann mit Hilfe von wait synchronisieren. So könnte ein triviales Beispiel aussehen:

ThreadPool pool(2);
JobStatus hello;
pool.submit([&]() {
   printf("Hello!\n");
   hello.is_finished();
});
hello.wait();
printf("Job appears to be finished!\n");

Aufgabe

Schreiben Sie eine Template-Funktion zum Initialisieren einer Matrix, die als zusätzlichen Parameter eine Referenz auf einen Thread-Pool erhält und die die Matrix A zweidimensional in Teilblöcke \(\mathbf{A}_{i,j}\) zerlegt und die Initialisierung der einzelnen Teilblöcke dem Thread-Pool überlässt. Die maximale Größe der Teilblöcke ist dabei als Parameter zu übergeben. Die Funktion sollte erst dann zurückkehren, wenn die Initialisierung vollständig abgeschlossen ist.

Die Vorlesungsbibliothek einschließlich der Matrixklassen steht Ihnen unter /home/numerik/pub/hpc/session16 zur Verfügung.

Als Vorlage können Sie die letzte Fassung aus der letzten Session verwenden:

#include <thread>
#include <random>
#include <condition_variable>
#include <cstdlib>
#include <list>
#include <mutex>
#include <vector>
#include <hpc/matvec/gematrix.h>
#include <hpc/matvec/apply.h>
#include <hpc/matvec/print.h>
#include <hpc/aux/slices.h>

template<typename T>
struct RandomEnginePool {
      using EngineType = T;
      RandomEnginePool(std::size_t size) :
            size(size), nof_free_engines(size),
            inuse(size), engines(size) {
         std::random_device r;
         for (std::size_t i = 0; i < size; ++i) {
            engines[i].seed(r()); inuse[i] = false;
         }
      }
      T& get() {
         std::unique_lock<std::mutex> lock(mutex);
         if (nof_free_engines == 0) {
            cv.wait(lock);
         }
         for (std::size_t i = 0; i < size; ++i) {
            if (!inuse[i]) {
               inuse[i] = true; --nof_free_engines;
               return engines[i];
            }
         }
         assert(false);
      }
      void free(T& engine) {
         {
            std::unique_lock<std::mutex> lock(mutex);
            bool found = false;
            for (std::size_t i = 0; i < size; ++i) {
               if (&engine == &engines[i]) {
                  inuse[i] = false; ++nof_free_engines;
                  found = true; break;
               }
            }
            assert(found);
         }
         cv.notify_one();
      }
   private:
      std::mutex mutex;
      std::condition_variable cv;
      std::size_t size;
      std::size_t nof_free_engines;
      std::vector<bool> inuse;
      std::vector<T> engines;
};

template<typename T>
struct RandomEngineGuard {
   using EngineType = T;
   RandomEngineGuard(RandomEnginePool<T>& pool) : pool(pool), engine(pool.get()) {
   }
   ~RandomEngineGuard() {
      pool.free(engine);
   }
   T& get() {
      return engine;
   }
   RandomEnginePool<T>& pool;
   T& engine;
};

template<typename MA, typename Pool>
typename std::enable_if<hpc::matvec::IsRealGeMatrix<MA>::value, void>::type
randomInit(MA& A, Pool& pool) {
   using ElementType = typename MA::ElementType;
   using Index = typename MA::Index;
   using EngineType = typename Pool::EngineType;

   std::uniform_real_distribution<double> uniform(-100, 100);
   RandomEngineGuard<EngineType> guard(pool);
   auto& engine(guard.get());

   hpc::matvec::apply(A, [&](ElementType& val, Index i, Index j) -> void {
      val = uniform(engine);
   });
}

int main() {
   using namespace hpc::matvec;
   using namespace hpc::aux;
   RandomEnginePool<std::mt19937> pool(2);
   GeMatrix<double> A(51, 7);
   unsigned int nof_threads = std::thread::hardware_concurrency();

   std::vector<std::thread> threads(nof_threads);
   Slices<GeMatrix<double>::Index> slices(nof_threads, A.numRows);
   for (int index = 0; index < nof_threads; ++index) {
      auto firstRow = slices.offset(index);
      auto numRows = slices.size(index);
      auto A_ = A(firstRow, 0, numRows, A.numCols);
      threads[index] = std::thread([=,&pool]() mutable { randomInit(A_, pool); });
   }
   for (int index = 0; index < nof_threads; ++index) {
      threads[index].join();
   }
   print(A, "A");
}