Promises & Futures
Die spontan entwickelte JobStatus-Klasse könnte natürlich erweitert werden und beispielsweise auch dazu dienen, ein Resultat zu übermitteln. Glücklicherweise gibt es bereits eine entsprechende Klasse in der Standardbibliothek von C++11.
Relevant sind hier die Klassen std::promise und std::future, beide aus <future>. Ein std::promise-Objekt ähnelt unserem JobStatus -- nur gibt es nur eine Funktion set_value zum Setzen des zurückzugebenden Werts. Statt der Methode wait gibt es die Möglichkeit, mit get_future genau einmal ein std::future-Objekt abzurufen, das wiederum die Methode get anbietet, die die Synchronisierung einschließt. Die Funktionalität von JobStatus wurde somit in zwei Klassen bzw. zwei Objekte augesplittet:
-
std::promise ist für den Auftragnehmer (also beispielsweise ein Worker in unserem Thread-Pool).
-
std::future ist für den Auftraggeber.
Durch die Aufsplittung kann nicht versehentlich der Auftraggeber so tun, als ob der Auftrag bereits erledigt wäre. Wer nur ein std::future-Objekt hat, kommt nicht mehr an das std::promise-Objekt heran. Umgekehrt gilt, dass wenn einmal mit get_future ein std::future-Objekt vom std::promise-Objekt erzeugt wurde, dann klappt dies nicht ein zweites Mal.
So könnte ein Trivial-Beispiel aussehen:
int main() { ThreadPool pool(2); std::promise<int> promise; std::future<int> future = promise.get_future(); pool.submit([&]() { printf("Hello!\n"); promise.set_value(42); }); printf("Job returned %d\n", future.get()); }
Eine Kombination aus std::promise und std::function steht mit std::packaged_task zur Verfügung, so dass der Thread-Pool von std::function auf std::packaged_task umgestellt werden kann. Dann liefert submit immer ein std::future-Objekt zurück und somit ist eine Synchronisierung immer möglich:
#include <cassert> #include <condition_variable> #include <cstdio> #include <functional> #include <future> #include <list> #include <mutex> #include <thread> #include <utility> #include <vector> template<typename T> struct ThreadPool { public: using Job = std::packaged_task<T()>; ThreadPool(unsigned int nof_threads) : nof_threads(nof_threads), active(0), finished(false), threads(nof_threads) { for (auto& t: threads) { t = std::thread([=]() { process_jobs(); }); } } ~ThreadPool() { { std::unique_lock<std::mutex> lock(mutex); finished = true; } cv.notify_all(); for (auto& t: threads) { t.join(); } } template<typename Task> std::future<T> submit(Task task) { Job job(task); auto result = job.get_future(); std::unique_lock<std::mutex> lock(mutex); jobs.push_back(std::move(job)); cv.notify_one(); return result; } private: unsigned int nof_threads; unsigned int active; bool finished; std::vector<std::thread> threads; std::mutex mutex; std::condition_variable cv; std::list<Job> jobs; void process_jobs() { for(;;) { Job job; /* fetch job */ { std::unique_lock<std::mutex> lock(mutex); while (jobs.empty() && (active > 0 || !finished)) { cv.wait(lock); } if (jobs.empty() && active == 0 && finished) break; job = std::move(jobs.front()); jobs.pop_front(); ++active; } /* execute job */ job(); { std::unique_lock<std::mutex> lock(mutex); --active; } } /* if one thread finishes, all others have to finish as well */ cv.notify_all(); } }; int main() { ThreadPool<int> pool(2); auto result = pool.submit([&]() -> int { std::printf("Hello!\n"); return 42; }); std::printf("Job returned %d\n", result.get()); }
Hinweis: Ältere g++-Versionen kommen damit nicht zurecht (siehe https://gcc.gnu.org/bugzilla/show_bug.cgi?id=53137). Dies betrifft leider momentan noch die Maschinen im E.44. Probieren Sie dies auf der Thales aus oder ersetzen Sie den Aufruf process_jobs(); durch this->process_jobs(). Der gleiche Hinweis gilt auch für die Beispiele auf den folgenden Seiten.
Aufgabe
Passen Sie Ihre zuvor entwickelte Version zur Matrix-Initialisierung an den neuen Thread-Pool an.
Die Lösungen können wieder auf der Thales eingereicht werden:
submit hpc session16 session16.tar