#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <functional>
#include <list>
#include <mutex>
#include <random>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
#include <hpc/matvec/gematrix.h>
#include <hpc/matvec/apply.h>
#include <hpc/matvec/print.h>
#include <hpc/aux/slices.h>
/**
 * Thread-safe pool holding a fixed number of random engines of type T.
 *
 * get() hands out an engine that is currently not in use and blocks while
 * all engines are taken; free() gives an engine back and wakes one waiter.
 * All bookkeeping is protected by an internal mutex.
 */
template<typename T>
struct RandomEnginePool {
   using EngineType = T;

   /**
    * Creates `size` engines, each seeded with a value drawn from
    * std::random_device.
    * Note: with size == 0 every call to get() blocks forever.
    */
   RandomEnginePool(std::size_t size) :
         size(size), nof_free_engines(size),
         inuse(size, false), engines(size) {
      std::random_device r;
      for (auto& engine: engines) {
         engine.seed(r());
      }
   }

   /**
    * Returns a reference to an engine that is exclusively reserved for the
    * caller until it is handed back through free(). Blocks while no engine
    * is available.
    */
   T& get() {
      std::unique_lock<std::mutex> lock(mutex);
      /* The predicate is re-evaluated after every wakeup. This covers
         spurious wakeups as well as the case where another thread grabbed
         the released engine between the notification and our wakeup. */
      cv.wait(lock, [this] { return nof_free_engines > 0; });
      for (std::size_t i = 0; i < size; ++i) {
         if (!inuse[i]) {
            inuse[i] = true;
            --nof_free_engines;
            return engines[i];
         }
      }
      /* not reachable as long as nof_free_engines matches inuse;
         never fall off the end of a non-void function */
      assert(false);
      std::abort();
   }

   /**
    * Hands an engine previously obtained from get() back to the pool and
    * wakes up one thread waiting in get(). `engine` must refer to one of
    * the engines owned by this pool (checked by assertion).
    */
   void free(T& engine) {
      {
         std::unique_lock<std::mutex> lock(mutex);
         bool found = false;
         for (std::size_t i = 0; i < size; ++i) {
            if (&engine == &engines[i]) {
               found = true;
               /* releasing an engine twice must not inflate the counter */
               assert(inuse[i]);
               if (inuse[i]) {
                  inuse[i] = false;
                  ++nof_free_engines;
               }
               break;
            }
         }
         assert(found);
         (void) found; /* silence unused warning when NDEBUG is defined */
      }
      /* notify after the lock has been released */
      cv.notify_one();
   }

   private:
      std::mutex mutex;
      std::condition_variable cv;
      std::size_t size;              /* number of engines in the pool */
      std::size_t nof_free_engines;  /* number of entries of inuse that are false */
      std::vector<bool> inuse;       /* inuse[i] is true while engines[i] is handed out */
      std::vector<T> engines;
};
/**
 * Scope guard that borrows one engine from a RandomEnginePool<T> on
 * construction (blocking inside pool.get() if necessary) and hands it
 * back to the pool on destruction.
 *
 * The guard is not copyable: a copy would release the same engine a
 * second time when it is destroyed.
 */
template<typename T>
struct RandomEngineGuard {
   using EngineType = T;

   /** Reserves an engine of `pool`; `pool` must outlive the guard. */
   RandomEngineGuard(RandomEnginePool<T>& pool) :
         pool(pool), engine(pool.get()) {
   }

   /** Returns the borrowed engine to the pool. */
   ~RandomEngineGuard() {
      pool.free(engine);
   }

   RandomEngineGuard(const RandomEngineGuard&) = delete;
   RandomEngineGuard& operator=(const RandomEngineGuard&) = delete;

   /** Access to the borrowed engine; valid while the guard is alive. */
   T& get() {
      return engine;
   }

   RandomEnginePool<T>& pool;
   T& engine;
};
/**
 * Assigns pseudo-random values to the elements of the real-valued
 * matrix A that hpc::matvec::apply visits.
 *
 * One engine is borrowed from `pool` for the whole call (and returned
 * when the function leaves); the values are drawn from a
 * std::uniform_real_distribution<double> over [-100, 100).
 */
template<typename MA, typename Pool>
typename std::enable_if<hpc::matvec::IsRealGeMatrix<MA>::value, void>::type
randomInit(MA& A, Pool& pool) {
   using Value = typename MA::ElementType;
   using Idx = typename MA::Index;
   using Engine = typename Pool::EngineType;

   /* hold an engine until the end of this scope */
   RandomEngineGuard<Engine> lease(pool);
   std::uniform_real_distribution<double> dist(-100, 100);

   /* the element indices are not needed for the initialization */
   hpc::matvec::apply(A, [&dist, &lease](Value& element, Idx, Idx) {
      element = dist(lease.get());
   });
}
/**
 * Fixed-size pool of worker threads that execute submitted jobs in FIFO
 * order.
 *
 * The destructor requests shutdown and joins all workers. A worker only
 * terminates once shutdown was requested, the queue is empty and no job
 * is executing any more; hence jobs that are still queued, or that are
 * submitted by a job which is still running, get executed before the
 * destructor returns.
 *
 * Note: a pool constructed with nof_threads == 0 has no workers, so
 * submitted jobs are never executed.
 */
struct ThreadPool {
   public:
      using Job = std::function<void()>;

      /** Starts `nof_threads` worker threads. */
      ThreadPool(unsigned int nof_threads) :
            nof_threads(nof_threads), active(0),
            finished(false), threads(nof_threads) {
         /* the workers are started in the constructor body, i.e. after
            all members (mutex, cv, jobs) have been constructed */
         for (auto& t: threads) {
            t = std::thread([this]() { process_jobs(); });
         }
      }

      /** Signals shutdown and waits until all workers have terminated. */
      ~ThreadPool() {
         {
            std::unique_lock<std::mutex> lock(mutex);
            finished = true;
         }
         cv.notify_all();
         for (auto& t: threads) {
            t.join();
         }
      }

      /** Appends `job` to the queue and wakes up one worker. */
      void submit(Job job) {
         std::unique_lock<std::mutex> lock(mutex);
         jobs.push_back(std::move(job));
         cv.notify_one();
      }

   private:
      unsigned int nof_threads;
      unsigned int active;    /* number of jobs that are currently executing */
      bool finished;          /* set by the destructor to request shutdown */
      std::vector<std::thread> threads;
      std::mutex mutex;       /* protects active, finished and jobs */
      std::condition_variable cv;
      std::deque<Job> jobs;   /* FIFO queue of pending jobs */

      /** Main loop of each worker thread. */
      void process_jobs() {
         for (;;) {
            Job job;
            /* fetch job */
            {
               std::unique_lock<std::mutex> lock(mutex);
               /* sleep until there is work, or until the pool is shut
                  down and no running job could submit further work */
               cv.wait(lock, [this] {
                  return !jobs.empty() || (finished && active == 0);
               });
               /* an empty queue at this point implies finished && active == 0 */
               if (jobs.empty()) break;
               job = std::move(jobs.front());
               jobs.pop_front();
               ++active;
            }
            /* execute job without holding the lock */
            job();
            {
               std::unique_lock<std::mutex> lock(mutex);
               --active;
            }
         }
         /* if one thread finishes, all others have to finish as well:
            wake up the workers that were waiting for active to drop to 0 */
         cv.notify_all();
      }
};
/**
 * One-shot completion flag: done() marks the job as finished, wait()
 * blocks until done() has been called.
 */
struct JobStatus {
   public:
      JobStatus() : finished(false) {
      }

      /** Marks the job as finished; must be called at most once. */
      void done() {
         std::unique_lock<std::mutex> lock(mutex);
         assert(!finished);
         finished = true;
         /* Notify while still holding the lock: a waiter can then not
            return from wait() before the notification has been issued,
            which matters if the object is destroyed right after wait(). */
         cv.notify_all();
      }

      /** Blocks until done() has been called; returns at once if it already was. */
      void wait() {
         std::unique_lock<std::mutex> lock(mutex);
         /* predicate form: spurious wakeups must not end the wait early */
         cv.wait(lock, [this] { return finished; });
      }

   private:
      std::mutex mutex;
      std::condition_variable cv;
      bool finished;
};
template<typename MA, typename RandomEnginePool, typename ThreadPool>
typename std::enable_if<hpc::matvec::IsRealGeMatrix<MA>::value, void>::type
randomInit(MA& A, RandomEnginePool& repool, ThreadPool& tpool,
typename MA::Index maxRows,
typename MA::Index maxCols) {
using ElementType = typename MA::ElementType;
using Index = typename MA::Index;
using namespace hpc::aux;
Index nof_row_slices = A.numRows / maxRows;
if (A.numRows % maxRows) ++nof_row_slices;
Index nof_col_slices = A.numCols / maxCols;
if (A.numCols % maxCols) ++nof_col_slices;
std::vector<JobStatus> job_status(nof_row_slices * nof_col_slices);
Index job_index = 0;
foreach_slice(nof_row_slices, A.numRows, [&]
(Index rows_offset, Index rows_size) {
foreach_slice(nof_col_slices, A.numCols, [&,rows_offset,rows_size]
(Index cols_offset, Index cols_size) {
auto A_ = A(rows_offset, cols_offset, rows_size, cols_size);
tpool.submit([=, &job_status, &repool]() mutable {
randomInit(A_, repool);
job_status[job_index].done();
});
++job_index;
});
});
for (auto& js: job_status) {
js.wait();
}
}
/**
 * Demo: fills a 51x7 matrix with random values using a thread pool and
 * a pool of random engines (blocks of at most 8x8 requested), then
 * prints the matrix.
 */
int main() {
   using namespace hpc::matvec;
   using namespace hpc::aux;
   unsigned int nof_threads = std::thread::hardware_concurrency();
   /* hardware_concurrency() returns 0 if the value is not computable;
      with zero workers and zero engines randomInit would block forever */
   if (nof_threads == 0) {
      nof_threads = 1;
   }
   RandomEnginePool<std::mt19937> repool(nof_threads);
   ThreadPool tpool(nof_threads);
   GeMatrix<double> A(51, 7);
   randomInit(A, repool, tpool, 8, 8);
   print(A, "A");
}