Improving the thread pool class

Content

Let us return to the most recent version of our thread pool class that introduced promises and futures:

template<typename T>
struct ThreadPool {
   public:
      using Job = std::packaged_task<T()>;
      // ...
      template<typename Task>
      std::future<T> submit(Task task) {
         Job job(task);
         auto result = job.get_future();
         std::unique_lock<std::mutex> lock(mutex);
         jobs.push_back(std::move(job));
         cv.notify_one();
         return result;
      }
   private:
      unsigned int nof_threads;
      unsigned int active;
      bool finished;
      std::vector<std::thread> threads;
      std::mutex mutex;
      std::condition_variable cv;
      std::list<Job> jobs;
      // ...
};

This version packaged jobs into objects of type std::packaged_task which combine a std::function with a promise. This construct permitted the client (the one holding the future) to synchronize with the completion of a job, which allowed us to implement the fork-and-join pattern on top of a thread pool.

However, this came with a restriction: all jobs had to share the same return type T, which was fixed as a template parameter of the ThreadPool class. You might wonder whether it is possible to avoid this restriction and thereby to remove this template parameter from the thread pool class.
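For illustration, a small sketch of what this restriction means in practice (assuming, as in the earlier versions, that the constructor takes the number of threads):

ThreadPool<int> pool(4);
auto f1 = pool.submit([]() { return 42; });   // std::future<int>
auto f2 = pool.submit([]() { return 3.14; }); // still std::future<int>,
                                              // the double result is converted to int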

Currently, the template parameter T is used at two locations: for the definition of the Job type which is used for the job queue, and for the return type of submit. The latter is not a real problem because it is restricted to the submit method, which could be turned into a polymorphic member template. Hence, the main problem remains our job queue:

   using Job = std::packaged_task<T()>;
   std::list<Job> jobs;

The interesting point here is that the function invocation of an object of type std::packaged_task<T()> is parameterless and returns just void (the actual result is delivered through the associated future). The invocation is thereby independent of T. Hence, it appears straightforward to package std::packaged_task<T()> objects within objects of type std::function<void()>. We would thereby remove the last reference to T and could drop the template parameter T from our thread pool class.

Unfortunately, it is not that easy as std::function insists that its function object is copy-constructible. However, std::packaged_task objects support move construction only, not copy construction. This is due to the restriction that a promise is just move-constructible but not copy-constructible.
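The following sketch shows both observations: invoking the packaged task matches the signature void(), yet std::function refuses to take ownership of it because the task is move-only:

std::packaged_task<int()> task([]() { return 42; });
std::future<int> result = task.get_future();
task(); // the invocation takes no arguments and returns void,
        // the value 42 is delivered through the future

// std::function<void()> f(std::move(task)); // does not compile:
//    std::function requires a copy-constructible function object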

Such a problem can be solved by hiding the non-copyable object behind a pointer. Pointers can be copied freely, and the object behind the pointer is left untouched, i.e. it is neither copy- nor move-constructed.

This, however, introduces a new problem as we need to decide when an object that is shared among multiple pointers can be freed. As usual, an RAII class is required for this task. Fortunately, RAII classes for pointers already exist in the C++11 standard library: they are called smart pointers. What we need here is std::shared_ptr<T> from <memory>, which is copy-constructible and supports multiple pointers that refer to the same object of type T. When the last shared pointer is destructed, the object is freed. This is done by maintaining a shared reference counter per object that equals the number of living std::shared_ptr<T> objects pointing to it. And all this is even thread-safe.

As smart pointers should not be intermixed with regular pointers to the same object, the library offers std::make_shared which combines the allocation with new T and the wrapping in a std::shared_ptr<T> object in an exception-safe way.
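Putting these pieces together: a move-only packaged task can be hidden behind a std::shared_ptr, and a lambda that captures the shared pointer by value is copy-constructible and hence fits into a std::function<void()>. A minimal sketch (requiring <memory>, <functional>, and <future>):

auto task = std::make_shared<std::packaged_task<int()>>(
   []() { return 42; });
std::future<int> result = task->get_future();
std::function<void()> wrapper = [task]() { (*task)(); }; // copyable now
wrapper();             // executes the packaged task
// result.get() == 42; the task object is freed once the last
// shared pointer (here: the copy captured inside wrapper) is gone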

Now we have all we need. First, the job list becomes a list of std::function<void()> objects, which has been renamed to tasks:

std::list<std::function<void()>> tasks;

The submit method can now be implemented as follows:

template<typename F>
auto submit(F&& task_function)
      -> std::future<decltype(task_function())> {
   using T = decltype(task_function());
   auto task = std::make_shared<std::packaged_task<T()>>(
      std::forward<F>(task_function));
   std::future<T> result = task->get_future();
   std::lock_guard<std::mutex> lock(mutex);
   tasks.push_back([task]() { (*task)(); } );
   cv.notify_one();
   return result;
}

Note that F is given as a template parameter while F&& is used as the parameter type. While F&& usually denotes an rvalue reference, the rules differ when F is a deduced template parameter: in this case F&& is a so-called forwarding reference and the actual argument may be an lvalue or an rvalue. When we pass this parameter on, we would like to std::move it in case of an rvalue but must not do so in case of an lvalue. This problem is solved by std::forward which chooses the appropriate variant. This technique is called perfect forwarding in C++ as it avoids unnecessary copy operations whenever possible.
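The effect of std::forward can be demonstrated with a small self-contained sketch (the names Callable and take are made up for illustration):

#include <cstdio>
#include <utility>

struct Callable {
   Callable() = default;
   Callable(const Callable&) { std::puts("copy constructed"); }
   Callable(Callable&&) { std::puts("move constructed"); }
   void operator()() const {}
};

template<typename F>
void take(F&& f) {
   auto local = std::forward<F>(f); // copies lvalues, moves rvalues
   local();
}

int main() {
   Callable c;
   take(c);          // prints "copy constructed"
   take(Callable{}); // prints "move constructed"
}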

You will find this version in the new edition of the lecture library in the directory /home/numerik/pub/hpc/ws18/session18 on our computers. The thread pool is accessible via <hpc/mt/thread_pool.hpp> and the previously introduced RandomEnginePool can be included from <hpc/aux/repool.hpp>.
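With the template parameter gone, jobs with different result types can now share the same pool. A small hypothetical usage sketch (assuming that the library class lives in namespace hpc::mt and that its constructor takes the number of threads):

#include <cstdio>
#include <hpc/mt/thread_pool.hpp>

int main() {
   hpc::mt::ThreadPool pool(4);
   auto f1 = pool.submit([]() { return 42; });   // std::future<int>
   auto f2 = pool.submit([]() { return 3.14; }); // std::future<double>
   std::printf("%d %g\n", f1.get(), f2.get());
}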

Exercises

Source code

You are free to use the last version from the 16th session. Just remove the code for the RandomEnginePool and ThreadPool classes and use instead the library versions from <hpc/aux/repool.hpp> and <hpc/mt/thread_pool.hpp>, respectively.

Please note that <hpc/aux/slices.hpp> has been updated. Slices has been renamed to UniformSlices and foreach_slice needs a template parameter. The construct foreach_slice<UniformSlices> is equivalent to the previous foreach_slice.

Below you will find an updated version of random_init13.cpp that works with the new version of <hpc/aux/slices.hpp>.

#include <cassert>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <list>
#include <mutex>
#include <random>
#include <thread>
#include <utility>
#include <vector>
#include <hpc/aux/slices.hpp>
#include <hpc/matvec/gematrix.hpp>
#include <hpc/matvec/iterators.hpp>
#include <hpc/matvec/print.hpp>

using namespace hpc;
using namespace hpc::matvec;
using namespace hpc::aux;

template<typename T>
struct RandomEnginePool {
      using EngineType = T;
      RandomEnginePool(std::size_t size) :
	    size(size), nof_free_engines(size),
	    inuse(size), engines(size) {
	 std::random_device r;
	 for (std::size_t i = 0; i < size; ++i) {
	    engines[i].seed(r()); inuse[i] = false;
	 }
      }
      T& get() {
	 std::unique_lock<std::mutex> lock(mutex);
	 while (nof_free_engines == 0) {
	    cv.wait(lock);
	 }
	 for (std::size_t i = 0; i < size; ++i) {
	    if (!inuse[i]) {
	       inuse[i] = true; --nof_free_engines;
	       return engines[i];
	    }
	 }
	 assert(false);
	 #pragma GCC diagnostic ignored "-Wreturn-type"
      }
      void free(T& engine) {
	 {
	    std::unique_lock<std::mutex> lock(mutex);
	    bool found = false;
	    for (std::size_t i = 0; i < size; ++i) {
	       if (&engine == &engines[i]) {
		  inuse[i] = false; ++nof_free_engines;
		  found = true; break;
	       }
	    }
	    assert(found);
	 }
	 cv.notify_one();
      }
   private:
      std::mutex mutex;
      std::condition_variable cv;
      std::size_t size;
      std::size_t nof_free_engines;
      std::vector<bool> inuse;
      std::vector<T> engines;
};

template<typename T>
struct RandomEngineGuard {
   using EngineType = T;
   RandomEngineGuard(RandomEnginePool<T>& pool) :
      pool(pool), engine(pool.get()) {
   }
   ~RandomEngineGuard() {
      pool.free(engine);
   }
   T& get() {
      return engine;
   }
   RandomEnginePool<T>& pool;
   T& engine;
};

template <
   template<typename> class MatrixA, typename T,
   typename POOL,
   Require< Ge<MatrixA<T>> > = true
>
void randomInit(MatrixA<T>& A, POOL& pool) {
   using EngineType = typename POOL::EngineType;
   RandomEngineGuard<EngineType> guard(pool);
   std::uniform_real_distribution<double> uniform(-100, 100);
   auto& engine = guard.get();

   for (auto [i, j, Aij]: A) {
      Aij = uniform(engine);
      (void) i; (void) j; // suppress gcc warning
   }
}

struct ThreadPool {
   public:
      using Job = std::function<void()>;
      ThreadPool(unsigned int nof_threads) :
            active(0), finished(false), threads(nof_threads) {
         for (auto& t: threads) {
            t = std::thread([=]() { process_jobs(); });
         }
      }
      ~ThreadPool() {
         {
            std::unique_lock<std::mutex> lock(mutex);
            finished = true;
         }
         cv.notify_all();
         for (auto& t: threads) {
            t.join();
         }
      }
      void submit(Job&& job) {
         std::unique_lock<std::mutex> lock(mutex);
         jobs.push_back(std::move(job));
         cv.notify_one();
      }
   private:
      unsigned int active;
      bool finished;
      std::vector<std::thread> threads;
      std::mutex mutex;
      std::condition_variable cv;
      std::list<Job> jobs;

      void process_jobs() {
         for(;;) {
            Job job;
            /* fetch job */
            {
               std::unique_lock<std::mutex> lock(mutex);
               while (jobs.empty() && (active > 0 || !finished)) {
                  cv.wait(lock);
               }
               if (jobs.empty()) break;
               job = std::move(jobs.front());
               jobs.pop_front();
               ++active;
            }
            /* execute job */
            job();
            {
               std::unique_lock<std::mutex> lock(mutex);
               --active;
            }
         }
         /* if one thread finishes, all others have to finish as well */
         cv.notify_all();
      }
};

struct JobStatus {
   public:
      JobStatus() : finished(false) {
      }
      void done() {
         std::unique_lock<std::mutex> lock(mutex);
         assert(!finished);
         finished = true;
         cv.notify_all();
      }
      void wait() {
         std::unique_lock<std::mutex> lock(mutex);
         if (!finished) {
            cv.wait(lock);
         }
      }
   private:
      std::mutex mutex;
      std::condition_variable cv;
      bool finished;
};

template <
   template<typename> class MatrixA, typename T,
   typename RandomEnginePool, typename ThreadPool,
   Require< Ge<MatrixA<T>> > = true
>
void randomInit(MatrixA<T>& A, RandomEnginePool& repool, ThreadPool& tpool,
      std::size_t maxRows,
      std::size_t maxCols) {
   std::size_t nof_row_slices = A.numRows() / maxRows;
   if (A.numRows() % maxRows) ++nof_row_slices;
   std::size_t nof_col_slices = A.numCols() / maxCols;
   if (A.numCols() % maxCols) ++nof_col_slices;

   std::vector<JobStatus> job_status(nof_row_slices * nof_col_slices);

   std::size_t job_index = 0;
   foreach_slice<UniformSlices>(nof_row_slices, A.numRows(), [&]
         (std::size_t rows_offset, std::size_t rows_size) {
      foreach_slice<UniformSlices>(nof_col_slices, A.numCols(),
	    [&,rows_offset,rows_size]
            (std::size_t cols_offset, std::size_t cols_size) {
         tpool.submit([=, &A, &job_status, &repool]() mutable {
	    auto A_ = A.view(rows_offset, cols_offset, rows_size, cols_size);
            randomInit(A_, repool);
            job_status[job_index].done();
         });
         ++job_index;
      });
   });

   for (std::size_t index = 0; index < job_index; ++index) {
      job_status[index].wait();
   }
}

int main() {
   unsigned int nof_threads = std::thread::hardware_concurrency();
   RandomEnginePool<std::mt19937> repool(nof_threads);
   ThreadPool tpool(nof_threads);

   GeMatrix<double> A(51, 7);
   randomInit(A, repool, tpool, 8, 8);
   print(A, " %7.2f");
}