#ifndef RENDEZVOUS_HPP #define RENDEZVOUS_HPP #include #include #include #include #include #include #include #include #include struct TerminationException: public std::exception { public: virtual const char* what() const throw() { return "thread has been terminated"; } }; template class Rendezvous { private: struct Member { Member() {}; Member(const Entry& e, const Request& r) : entry(e), req(r), done(new std::condition_variable()) { }; Member(const Member& other) : entry(other.entry), req(other.req), done(other.done) {}; Entry entry; Request req; std::shared_ptr done; }; bool terminating; std::mutex mutex; std::condition_variable submitted; typedef std::list Queue; typedef std::map Requests; Requests requests; public: typedef std::set EntrySet; class Accept { public: Accept(Rendezvous& rv, Entry entry, Request& request) throw(TerminationException) : lock(rv.mutex) { typename Requests::iterator it; for(;;) { it = rv.requests.find(entry); if (it != rv.requests.end()) break; rv.submitted.wait(lock); if (rv.terminating) throw TerminationException(); } member = it->second.front(); request = member.req; it->second.pop_front(); if (it->second.empty()) { rv.requests.erase(it); } } ~Accept() { member.done->notify_all(); } private: Member member; std::unique_lock lock; }; Rendezvous() : terminating(false) { } ~Rendezvous() { terminate(); } void terminate() { terminating = true; submitted.notify_all(); } void connect(Entry entry, Request request) { Member member(entry, request); // submit request std::unique_lock lock(mutex); requests[entry].push_back(member); submitted.notify_all(); // wait for its completion member.done->wait(lock); } Entry select(const EntrySet& entries) throw(TerminationException) { /* we deadlock, if there are no entries */ assert(entries.size() > 0); std::unique_lock lock(mutex); for(;;) { for (auto& entry: entries) { if (requests.find(entry) != requests.end()) { return entry; } } submitted.wait(lock); if (terminating) throw TerminationException(); } } }; template class RendezvousTask { public: RendezvousTask() { } virtual ~RendezvousTask() { /* initiate termination of the thread associated to the Rendezvous object ... */ rv.terminate(); /* ... and wait for its completion */ t.join(); } virtual void operator()() = 0; class Thread { public: Thread(RendezvousTask& _rt) : rt(_rt) { } void operator()() { try { rt(); } catch (TerminationException& e) { /* ok, simply return */ } } protected: RendezvousTask& rt; }; protected: void start() { t = std::thread(Thread(*this)); } Rendezvous rv; private: // associated thread std::thread t; }; #endif