Evo C++ Library v0.5.1
event.h
Go to the documentation of this file.
1 // Evo C++ Library
2 /* Copyright 2019 Justin Crowell
3 Distributed under the BSD 2-Clause License -- see included file LICENSE.txt for details.
4 */
6 
7 #pragma once
8 #ifndef INCL_evo_event_h
9 #define INCL_evo_event_h
10 
#include "atomic.h"

#include <cstring>      // memset
#include <limits>       // std::numeric_limits

#if defined(EVO_CPP11)
    #include <functional>
#endif
16 
17 namespace evo {
22 
24 
/** Event base type used with EventQueue.
 - Derive and implement operator()() to define the event's work
 - The queue invokes the event and deletes it when operator()() returns true
*/
struct Event {
    /** Constructor. */
    // NOTE(review): the "Event()" line was lost in this dump; restored so the inline empty body compiles
    Event()
        { }

    /** Destructor. */
    virtual ~Event()
        { }

    /** Event function invoked by the queue when the event is processed.
     \return  Whether the event should be deleted by the caller (EventQueue deletes it on true)
    */
    virtual bool operator()() = 0;
};
49 
51 
52 #if defined(EVO_CPP11)
53 
59 class EventLambda : public Event {
60 public:
61  typedef std::function<bool()> Lambda;
62 
66  EventLambda(const Lambda& lambda) : lambda_(lambda) {
67  }
68 
72  EventLambda(const EventLambda& src) : lambda_(src.lambda_) {
73  }
74 
80  lambda_ = src.lambda_;
81  return *this;
82  }
83 
84  // Doc by parent
85  bool operator()() {
86  return lambda_();
87  }
88 
89 private:
90  Lambda lambda_;
91 };
92 
93 #endif
94 
96 
145 template<class T=Event>
146 class EventQueue {
147 public:
148  typedef T EventT;
149  typedef uint Size;
150 
151  static const Size DEFAULT_SIZE = 256;
152 
156  EventQueue(Size size=DEFAULT_SIZE) {
157  ringbuf_size_ = adjust_size(size);
158  ringbuf_ = new T*[ringbuf_size_];
159  ringbuf_size_mask_ = ringbuf_size_ - 1; // mask: set all bits up to size
160  memset(ringbuf_, 0, sizeof(T*) * ringbuf_size_);
161  next_pos_.store(1);
162  read_pos_.store(1);
163  }
164 
169  if (read_pos_.load() <= cursor_pos_.load()) {
170  assert( false ); // Queue should be empty
171  }
172  delete [] ringbuf_;
173  }
174 
185  void add(T* event, ulongl spinwait_ns=1) {
186  // Claim a slot and wait for available capacity
187  const uint64 seq = next_pos_.fetch_add(1, EVO_ATOMIC_ACQ_REL);
188  while (seq - read_pos_.load(EVO_ATOMIC_ACQUIRE) >= ringbuf_size_)
189  sleepns(spinwait_ns);
190 
191  // Store event in queue
193  ringbuf_[seq & ringbuf_size_mask_] = event;
195 
196  // Wait for cursor to reach previous slot, then increment cursor to commit the write
197  const uint64 prev_seq = seq - 1;
198  while (!cursor_pos_.compare_set(prev_seq, seq, EVO_ATOMIC_ACQ_REL, EVO_ATOMIC_ACQUIRE))
199  sleepns(spinwait_ns);
200  }
201 
210  template<class U>
211  void notify_multiwait(U& condmutex) {
212  if (condmutex.trylock()) { // non-blocking
213  condmutex.notify();
214  condmutex.unlock();
215  }
216  }
217 
225  bool process() {
226  T* event;
227  uint64 start, seq;
228  start = seq = read_pos_.load(EVO_ATOMIC_ACQUIRE);
229  while (seq <= cursor_pos_.load(EVO_ATOMIC_ACQUIRE)) {
230  event = ringbuf_[seq & ringbuf_size_mask_];
231  seq = read_pos_.fetch_add(1, EVO_ATOMIC_ACQ_REL) + 1;
232  if ((*event)())
233  delete event;
234  }
235  return (seq > start);
236  }
237 
246  template<class U>
247  bool process_multi(U& mutex) {
248  T* event;
249  uint64 seq, count = 0;
250  typename U::Lock lock(mutex);
251  seq = read_pos_.load(EVO_ATOMIC_RELAXED);
252  for (; seq <= cursor_pos_.load(EVO_ATOMIC_ACQUIRE); ++count) {
253  event = ringbuf_[seq & ringbuf_size_mask_];
254  read_pos_.fetch_add(1, EVO_ATOMIC_RELEASE);
255  lock.unlock();
256  if ((*event)())
257  delete event;
258  lock.lock();
259  seq = read_pos_.load(EVO_ATOMIC_RELAXED);
260  }
261  lock.unlock();
262  return (count > 0);
263  }
264 
276  template<class U>
277  void process_multiwait(U& condmutex, AtomicInt& stopflag, ulong waitms=1) {
278  T* event;
279  uint64 seq;
280  typename U::Lock lock(condmutex);
281  for (;;) {
282  seq = read_pos_.load(EVO_ATOMIC_RELAXED);
283  for (; seq <= cursor_pos_.load(EVO_ATOMIC_ACQUIRE);) {
284  event = ringbuf_[seq & ringbuf_size_mask_];
285  read_pos_.fetch_add(1, EVO_ATOMIC_RELEASE);
286  lock.unlock();
287  if ((*event)())
288  delete event;
289  lock.lock();
290  seq = read_pos_.load(EVO_ATOMIC_RELAXED);
291  }
292  if (stopflag.load(EVO_ATOMIC_RELAXED))
293  break;
294  condmutex.wait(waitms, true);
295  }
296  lock.unlock();
297  }
298 
299 private:
300  // Disable copying
301  EventQueue(const EventQueue&);
302  EventQueue& operator=(const EventQueue&);
303 
304  // Ring buffer
305  T** ringbuf_;
306  Size ringbuf_size_; // Must be a power of 2 for mask to work
307  Size ringbuf_size_mask_; // Mask for faster modulus
308 
309  // Positions increase to infinity (index = pos % ringbuf_size_), would take hundreds of years to max out 64 bits
310  AtomicUInt64 cursor_pos_; // Position of latest item committed to queue
311  AtomicUInt64 next_pos_; // Next write position in queue (cursor + 1 when no add() in progress)
312  AtomicUInt64 read_pos_; // Position of next item to read from queue (cursor + 1 when queue is empty)
313 
314  // Make sure size is within min/max and is a power of 2
315  static Size adjust_size(Size size) {
316  const Size MIN_SIZE = 16;
317  const Size MAX_SIZE = (std::numeric_limits<Size>::max() >> 1) + 1;
318  if (size <= MIN_SIZE)
319  size = MIN_SIZE;
320  else if (size >= MAX_SIZE)
321  size = MAX_SIZE;
322  else
323  size = next_pow2(size);
324  return size;
325  }
326 };
327 
329 
330 }
331 #endif
T & max(T &a, T &b)
Returns highest of given values.
Definition: alg.h:47
EventLambda & operator=(const EventLambda &src)
Assignment operator.
Definition: event.h:79
#define EVO_ATOMIC_ACQ_REL
Combined "acquire" & "release" level memory barrier.
Definition: atomic.h:33
virtual bool operator()()=0
Event function.
void notify_multiwait(U &condmutex)
Notify an item has been added with multiple consumer threads.
Definition: event.h:211
Implement Event using a lambda function (C++11).
Definition: event.h:59
#define EVO_ATOMIC_FENCE(MEM_ORDER)
Sets a memory fence/barrier.
Definition: atomic.h:52
T next_pow2(T v)
Get next power of 2 equal to or greater than given number.
Definition: type.h:1932
bool operator()()
Event function.
Definition: event.h:85
#define EVO_ATOMIC_RELAXED
Relaxed memory ordering, used between start/end memory barriers.
Definition: atomic.h:21
void process_multiwait(U &condmutex, AtomicInt &stopflag, ulong waitms=1)
Process queued events until stopflag is set, allowing multiple consumer threads, and waiting with con...
Definition: event.h:277
virtual ~Event()
Destructor.
Definition: event.h:37
bool process_multi(U &mutex)
Process queued events and return, allowing multiple consumer threads.
Definition: event.h:247
#define EVO_ATOMIC_ACQUIRE
Start "acquire" memory ordering barrier, usually followed by a matching "release" barrier...
Definition: atomic.h:27
uint Size
Queue size type.
Definition: event.h:149
Evo atomic types.
Event()
Constructor.
Definition: event.h:33
T EventT
Event type used.
Definition: event.h:148
Evo C++ Library namespace.
Definition: alg.h:11
#define EVO_ATOMIC_RELEASE
Release (end) memory ordering barrier started with "consume" or "acquire" barrier.
Definition: atomic.h:30
Event base type used with EventQueue.
Definition: event.h:31
bool sleepns(ulongl nsec)
Sleep for number of nanoseconds.
Definition: sys.h:706
void add(T *event, ulongl spinwait_ns=1)
Add an event to queue.
Definition: event.h:185
bool process()
Process queued events and return.
Definition: event.h:225
~EventQueue()
Destructor.
Definition: event.h:168
EventLambda(const EventLambda &src)
Copy constructor.
Definition: event.h:72
EventLambda(const Lambda &lambda)
Constructor.
Definition: event.h:66
EventQueue(Size size=DEFAULT_SIZE)
Constructor.
Definition: event.h:156
T load(MemOrder mem_order=std::memory_order_seq_cst) const
Load and return current value.
std::function< bool()> Lambda
Lambda function type for Event.
Definition: event.h:61
Lock-free event processing queue.
Definition: event.h:146