Evo C++ Library v0.5.1
atomic_buffer_queue.h
Go to the documentation of this file.
1 // Evo C++ Library
2 /* Copyright 2019 Justin Crowell
3 Distributed under the BSD 2-Clause License -- see included file LICENSE.txt for details.
4 */
6 
7 #pragma once
8 #ifndef INCL_evo_atomic_buffer_queue_h
9 #define INCL_evo_atomic_buffer_queue_h
10 
11 #include "atomic.h"
12 
13 namespace evo {
16 
18 
58 template<class T, class TSize=SizeT>
60 public:
62  typedef TSize Size;
63  typedef T Item;
64 
65  static const Size DEFAULT_SIZE = 128;
66 
70  AtomicBufferQueue(Size size=DEFAULT_SIZE) {
71  size = size_pow2(size);
72  buf_ = new Item[size];
73  size_ = size;
74  size_mask_ = size - 1;
75  next_pos_.store(1);
76  read_pos_.store(1);
77  }
78 
81  delete [] buf_;
82  }
83 
89  Size size() const {
90  return size_;
91  }
92 
98  Size used() const {
99  const uint64 cursor = cursor_pos_.load(EVO_ATOMIC_ACQUIRE);
100  const uint64 read = read_pos_.load(EVO_ATOMIC_ACQUIRE);
101  return (Size)(cursor < read ? 0 : cursor + 1 - read);
102  }
103 
109  bool empty() const {
110  return (cursor_pos_.load(EVO_ATOMIC_ACQUIRE) < read_pos_.load(EVO_ATOMIC_ACQUIRE));
111  }
112 
118  bool full() const {
119  return (used() >= size_);
120  }
121 
125  void clear() {
126  next_pos_.store(1, EVO_ATOMIC_RELEASE);
127  cursor_pos_.store(0, EVO_ATOMIC_RELEASE);
128  read_pos_.store(1, EVO_ATOMIC_RELEASE);
129  }
130 
141  void add(typename DataCopy<Item>::PassType item) {
142  // Claim a slot and wait for available capacity
143  const uint64 seq = next_pos_.fetch_add(1, EVO_ATOMIC_ACQ_REL);
144  while (seq - read_pos_.load(EVO_ATOMIC_ACQUIRE) >= size_)
145  sleepus(1);
146 
147  // Store event in queue
150  buf_[seq & size_mask_] = item;
153 
154  // Wait for cursor to reach previous slot, then increment cursor to commit the write
155  const uint64 prev_seq = seq - 1;
156  while (!cursor_pos_.compare_set(prev_seq, seq, EVO_ATOMIC_ACQ_REL, EVO_ATOMIC_ACQUIRE))
157  sleepus(1);
158  }
159 
172  Item& add_start(uint64& seq) {
173  // Claim a slot and wait for available capacity
174  seq = next_pos_.fetch_add(1, EVO_ATOMIC_ACQ_REL);
175  while (seq - read_pos_.load(EVO_ATOMIC_ACQUIRE) >= size_)
176  sleepus(1);
177 
178  // Return event, user will set it
180  return buf_[seq & size_mask_];
181  }
182 
191  void add_commit(uint64 seq) {
192  // Wait for cursor to reach previous slot, then increment cursor to commit the write
194  const uint64 prev_seq = seq - 1;
195  while (!cursor_pos_.compare_set(prev_seq, seq, EVO_ATOMIC_ACQ_REL, EVO_ATOMIC_ACQUIRE))
196  sleepus(1);
197  }
198 
209  bool pop(Item& item) {
210  uint64 seq;
211  seq = read_pos_.load(EVO_ATOMIC_ACQUIRE);
212  if (seq <= cursor_pos_.load(EVO_ATOMIC_ACQUIRE)) {
213  item = buf_[seq & size_mask_];
214  read_pos_.fetch_add(1, EVO_ATOMIC_RELEASE);
215  return true;
216  }
217  return false;
218  }
219 
220 private:
221  // Disable copying
222  AtomicBufferQueue(const This&);
223  This& operator=(const This&);
224 
225  T* buf_;
226  Size size_; // Must be a power of 2 for mask to work
227  Size size_mask_; // Mask for faster modulus
228 
229  // Positions increase to infinity (index = pos % ringbuf_size_), would take hundreds of years to max out 64 bits
230  AtomicUInt64 cursor_pos_; // Position of latest item committed to queue
231  AtomicUInt64 next_pos_; // Next write position in queue (cursor + 1 when no add() in progress)
232  AtomicUInt64 read_pos_; // Position of next item to read from queue (cursor + 1 when queue is empty)
233 };
234 
236 
237 }
238 #endif
#define EVO_EXCEPTION_GUARD_START
Start exception guard (try block).
Definition: container.h:52
void store(T num, MemOrder mem_order=std::memory_order_seq_cst)
Store new value.
bool pop(Item &item)
Pop oldest item from queue.
Definition: atomic_buffer_queue.h:209
#define EVO_ATOMIC_ACQ_REL
Combined "acquire" & "release" level memory barrier.
Definition: atomic.h:33
#define EVO_EXCEPTION_GUARD_END
End exception guard, catch and abort().
Definition: container.h:55
Item & add_start(uint64 &seq)
Start adding item to queue directly.
Definition: atomic_buffer_queue.h:172
Size used() const
Get used item count.
Definition: atomic_buffer_queue.h:98
bool full() const
Get whether queue is full.
Definition: atomic_buffer_queue.h:118
~AtomicBufferQueue()
Destructor.
Definition: atomic_buffer_queue.h:80
#define EVO_ATOMIC_FENCE(MEM_ORDER)
Sets a memory fence/barrier.
Definition: atomic.h:52
Size size() const
Get buffer size.
Definition: atomic_buffer_queue.h:89
T Item
Item type.
Definition: atomic_buffer_queue.h:63
bool compare_set(T cmpval, T newval, MemOrder mem_order_success=std::memory_order_seq_cst, MemOrder mem_order_failure=std::memory_order_acquire)
Compare and set, storing new value if comparison matches.
Definition: atomic.h:332
#define EVO_ATOMIC_ACQUIRE
Start "acquire" memory ordering barrier, usually followed by a matching "release" barrier...
Definition: atomic.h:27
TSize Size
Queue size integer type (always unsigned).
Definition: atomic_buffer_queue.h:62
void clear()
Clear all items from queue, making it empty.
Definition: atomic_buffer_queue.h:125
Evo atomic types.
static const Size DEFAULT_SIZE
Default size to use.
Definition: atomic_buffer_queue.h:65
Evo C++ Library namespace.
Definition: alg.h:11
#define EVO_ATOMIC_RELEASE
Release (end) memory ordering barrier started with "consume" or "acquire" barrier.
Definition: atomic.h:30
void add_commit(uint64 seq)
Commit adding an item.
Definition: atomic_buffer_queue.h:191
T fetch_add(T num, MemOrder mem_order=std::memory_order_seq_cst)
Add number to value and return the previous value.
AtomicBufferQueue(Size size=DEFAULT_SIZE)
Constructor, sets buffer size.
Definition: atomic_buffer_queue.h:70
Fast buffer-based queue, implemented with a ring-buffer.
Definition: atomic_buffer_queue.h:59
AddConst< T >::Type & PassType
Most efficient type for passing as parameter (const-reference or POD value).
Definition: container.h:551
void add(typename DataCopy< Item >::PassType item)
Add item to queue.
Definition: atomic_buffer_queue.h:141
bool sleepus(ulongl usec)
Sleep for number of microseconds.
Definition: sys.h:679
T load(MemOrder mem_order=std::memory_order_seq_cst) const
Load and return current value.
AtomicBufferQueue< T, TSize > This
Type of this queue (alias for AtomicBufferQueue< T, TSize >).
Definition: atomic_buffer_queue.h:61
Size size_pow2(Size size, Size minsize=2)
Get size as power of 2.
Definition: type.h:1951
bool empty() const
Get whether queue is empty.
Definition: atomic_buffer_queue.h:109