#ifndef RV_RINGBUFFER_H #define RV_RINGBUFFER_H #include #include #include #include "rendezvous.hpp" enum Entry {RingBufferRead, RingBufferWrite}; template struct Request { Request() : itemptr(0) {}; Request(T* ip) : itemptr(ip) {}; T* itemptr; }; template< typename T, typename RT = Request > class RingBuffer: RendezvousTask { public: RingBuffer(unsigned int size) : read_index(0), write_index(0), filled(0), buf(size) { assert(size > 0); this->start(); } void write(T item) { this->rv.connect(RingBufferWrite, RT(&item)); } void read(T& item) { this->rv.connect(RingBufferRead, RT(&item)); } virtual void operator()() { for(;;) { std::set entries; if (filled > 0) { entries.insert(RingBufferRead); } if (filled < buf.capacity()) { entries.insert(RingBufferWrite); } Entry entry = this->rv.select(entries); switch (entry) { case RingBufferRead: { RT request; typename Rendezvous::Accept(this->rv, RingBufferRead, request); *(request.itemptr) = buf[read_index]; read_index = (read_index + 1) % buf.capacity(); --filled; } break; case RingBufferWrite: { RT request; typename Rendezvous::Accept(this->rv, RingBufferWrite, request); buf[write_index] = *(request.itemptr); write_index = (write_index + 1) % buf.capacity(); ++filled; } break; } } } private: unsigned int read_index; unsigned int write_index; unsigned int filled; std::vector buf; }; #endif