Promises & Futures

Die spontan entwickelte JobStatus-Klasse könnte natürlich erweitert werden und beispielsweise auch dazu dienen, ein Resultat zu übermitteln. Glücklicherweise gibt es bereits eine entsprechende Klasse in der Standardbibliothek von C++11.

Relevant sind hier die Klassen std::promise und std::future, beide aus <future>. Ein std::promise-Objekt ähnelt unserem JobStatus -- nur gibt es nur eine Funktion set_value zum Setzen des zurückzugebenden Werts. Statt der Methode wait gibt es die Möglichkeit, mit get_future genau einmal ein std::future-Objekt abzurufen, das wiederum die Methode get anbietet, die die Synchronisierung einschließt. Die Funktionalität von JobStatus wurde somit in zwei Klassen bzw. zwei Objekte augesplittet:

Durch die Aufsplittung kann nicht versehentlich der Auftraggeber so tun, als ob der Auftrag bereits erledigt wäre. Wer nur ein std::future-Objekt hat, kommt nicht mehr an das std::promise-Objekt heran. Umgekehrt gilt, dass wenn einmal mit get_future ein std::future-Objekt vom std::promise-Objekt erzeugt wurde, dann klappt dies nicht ein zweites Mal.

So könnte ein Trivial-Beispiel aussehen:

int main() {
   ThreadPool pool(2);
   std::promise<int> promise;
   std::future<int> future = promise.get_future();
   pool.submit([&]() {
      printf("Hello!\n");
      promise.set_value(42);
   });
   printf("Job returned %d\n", future.get());
}

Eine Kombination aus std::promise und std::function steht mit std::packaged_task zur Verfügung, so dass der Thread-Pool von std::function auf std::packaged_task umgestellt werden kann. Dann liefert submit immer ein std::future-Objekt zurück und somit ist eine Synchronisierung immer möglich:

#include <cassert>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

template<typename T>
struct ThreadPool {
   public:
      using Job = std::packaged_task<T()>;
      ThreadPool(unsigned int nof_threads) :
            nof_threads(nof_threads), active(0),
            finished(false), threads(nof_threads) {
         for (auto& t: threads) {
            t = std::thread([=]() { process_jobs(); });
         }
      }
      ~ThreadPool() {
         {
            std::unique_lock<std::mutex> lock(mutex);
            finished = true;
         }
         cv.notify_all();
         for (auto& t: threads) {
            t.join();
         }
      }
      template<typename Task>
      std::future<T> submit(Task task) {
         Job job(task);
         auto result = job.get_future();
         std::unique_lock<std::mutex> lock(mutex);
         jobs.push_back(std::move(job));
         cv.notify_one();
         return result;
      }
   private:
      unsigned int nof_threads;
      unsigned int active;
      bool finished;
      std::vector<std::thread> threads;
      std::mutex mutex;
      std::condition_variable cv;
      std::list<Job> jobs;

      void process_jobs() {
         for(;;) {
            Job job;
            /* fetch job */
            {
               std::unique_lock<std::mutex> lock(mutex);
               while (jobs.empty() && (active > 0 || !finished)) {
                  cv.wait(lock);
               }
               if (jobs.empty() && active == 0 && finished) break;
               job = std::move(jobs.front());
               jobs.pop_front();
               ++active;
            }
            /* execute job */
            job();
            {
               std::unique_lock<std::mutex> lock(mutex);
               --active;
            }
         }
         /* if one thread finishes, all others have to finish as well */
         cv.notify_all();
      }
};

int main() {
   ThreadPool<int> pool(2);
   auto result = pool.submit([&]() -> int {
      std::printf("Hello!\n"); return 42;
   });
   std::printf("Job returned %d\n", result.get());
}

Hinweis: Ältere g++-Versionen kommen damit nicht zurecht (siehe https://gcc.gnu.org/bugzilla/show_bug.cgi?id=53137). Dies betrifft leider momentan noch die Maschinen im E.44. Probieren Sie dies auf der Thales aus oder ersetzen Sie den Aufruf process_jobs(); durch this->process_jobs(). Der gleiche Hinweis gilt auch für die Beispiele auf den folgenden Seiten.

Aufgabe

Passen Sie Ihre zuvor entwickelte Version zur Matrix-Initialisierung an den neuen Thread-Pool an.

Die Lösungen können wieder auf der Thales eingereicht werden:

submit hpc session16 session16.tar