Evo C++ Library v0.5.1
ioasync_base.h
Go to the documentation of this file.
1 // Evo C++ Library
2 /* Copyright 2019 Justin Crowell
3 Distributed under the BSD 2-Clause License -- see included file LICENSE.txt for details.
4 */
6 
7 #pragma once
8 #ifndef INCL_evo_ioasync_base_h
9 #define INCL_evo_ioasync_base_h
10 
11 #include "iosock.h"
12 #include "event.h"
13 #include "thread.h"
14 #include "string.h"
15 #include "substring.h"
16 #include "atomic_buffer_queue.h"
17 #include "logger.h"
18 
19 // Requires libevent 2.0+
20 #if !defined(LIBEVENT_VERSION_NUMBER)
21  #include <event2/event.h>
22  #include <event2/buffer.h>
23  #include <event2/bufferevent.h>
24  #include <event2/thread.h>
25 #endif
26 #define EVO_LIBEVENT_MINVER 0x02000000
27 #if LIBEVENT_VERSION_NUMBER < EVO_LIBEVENT_MINVER
28  #error Evo Async I/O requires libevent 2.0+
29 #endif
30 #if !defined(EVO_ASYNC_MULTI_THREAD)
31 
35  #define EVO_ASYNC_MULTI_THREAD 0
36 #endif
37 
38 #if defined(EVO_MSVC_YEAR)
39  #pragma comment(lib, "libevent_core.lib")
40  #pragma comment(lib, "advapi32.lib")
41 #endif
42 
43 namespace evo {
46 
48 
52 public:
53  typedef struct event_base* Handle;
54 
57  static FirstInitHelper first_init_helper;
58  evbase_ = ::event_base_new();
59  if (evbase_ == NULL)
60  abort(); // This shouldn't happen
61  }
62 
65  ::event_base_free(evbase_);
66  }
67 
71  Handle handle()
72  { return evbase_; }
73 
77  bool active() const {
78  return !shutdown_.load(EVO_ATOMIC_ACQUIRE);
79  }
80 
88  bool run1() {
89  int result = ::event_base_loop(evbase_, EVLOOP_ONCE);
90  if (result < 0)
91  return false;
92  else if (result > 0)
93  shutdown_.store(1, EVO_ATOMIC_RELEASE);
94  return true;
95  }
96 
98  void shutdown() {
99  if (shutdown_.compare_set(0, 1, EVO_ATOMIC_ACQ_REL, EVO_ATOMIC_ACQUIRE)) {
100  if (::event_base_loopexit(evbase_, NULL) != 0)
101  abort(); // This should never happen
102  }
103  }
104 
105 private:
106  struct FirstInitHelper {
107  FirstInitHelper() {
108  static Mutex mutex;
109  Mutex::Lock lock(mutex);
110  if (::event_get_version_number() < EVO_LIBEVENT_MINVER)
111  abort(); // Abort if libevent is too old
112  #if EVO_ASYNC_MULTI_THREAD
113  int result =
114  #if defined(_WIN32)
115  ::evthread_use_windows_threads();
116  #else
117  ::evthread_use_pthreads();
118  #endif
119  assert( result == 0 );
120  if (result != 0)
121  abort(); // Abort if can't initialize libevent for multithreaded
122  #endif
123  }
124  };
125 
126  Handle evbase_;
127  AtomicInt shutdown_;
128 };
129 
131 
135 public:
142  class BulkWrite {
143  public:
145  parent_ = NULL;
146  ptr_ = end_ = NULL;
147  }
148 
149  BulkWrite(String& str, SizeT size) {
150  init(str, size);
151  }
152 
153  BulkWrite(AsyncBuffers& parent, size_t size) {
154  init(parent, size);
155  }
156 
158  assert( ptr_ == end_ );
159  }
160 
161  BulkWrite& init(String& str, SizeT size) {
162  const SizeT used = str.used();
163  ptr_ = str.advBuffer(used + size) + used;
164  end_ = ptr_ + size;
165  str.advSize(size);
166  return *this;
167  }
168 
169  BulkWrite& init(AsyncBuffers& parent, size_t size) {
170  if (parent.output_ != NULL) {
171  int result = ::evbuffer_reserve_space(parent.output_, size, &data_, 1);
172  if (result != 1)
173  abort(); // This should never happen
174  assert( data_.iov_base != NULL );
175  assert( data_.iov_len >= size );
176  data_.iov_len = size;
177  ptr_ = (char*)data_.iov_base;
178  end_ = ptr_ + size;
179  parent_ = &parent;
180  } else {
181  parent_ = NULL;
182  ptr_ = end_ = NULL;
183  abort(); // This should never happen
184  }
185  return *this;
186  }
187 
188  bool error() const {
189  return (ptr_ == NULL);
190  }
191 
192  char* ptr() {
193  return ptr_;
194  }
195 
196  BulkWrite& addsize(size_t size) {
197  ptr_ += size;
198  assert( ptr_ <= end_ );
199  if (parent_ != NULL && ptr_ >= end_) {
200  int result = ::evbuffer_commit_space(parent_->output_, &data_, 1);
201  if (result != 0)
202  abort(); // This should never happen
203  ptr_ = end_ = NULL;
204  }
205  return *this;
206  }
207 
208  BulkWrite& add(const char* data, size_t size) {
209  assert( ptr_ != NULL );
210  memcpy(ptr_, data, size);
211  addsize(size);
212  return *this;
213  }
214 
215  BulkWrite& add(char ch) {
216  assert( ptr_ != NULL );
217  *ptr_ = ch;
218  addsize(1);
219  return *this;
220  }
221 
222  private:
223  BulkWrite(const BulkWrite&);
224  BulkWrite& operator=(const BulkWrite&);
225 
226  AsyncBuffers* parent_;
227  struct evbuffer_iovec data_;
228  char* ptr_;
229  char* end_;
230  };
231 
234  bev_ = NULL;
235  input_ = NULL;
236  output_ = NULL;
237  read_offset_ = 0;
238  }
239 
241  void reset() {
242  bev_ = NULL;
243  input_ = NULL;
244  output_ = NULL;
245  read_offset_ = 0;
246  }
247 
251  void attach(struct bufferevent* bev) {
252  bev_ = bev;
253  input_ = ::bufferevent_get_input(bev);
254  output_ = ::bufferevent_get_output(bev);
255  }
256 
260  void attach_write(struct bufferevent* bev) {
261  bev_ = bev;
262  output_ = ::bufferevent_get_output(bev);
263  }
264 
266  void attach_read() {
267  input_ = ::bufferevent_get_input(bev_);
268  }
269 
273  size_t read_size()
274  { return ::evbuffer_get_length(input_); }
275 
288  bool read_fixed(SubString& data, SizeT size, SizeT max_size=0) {
289  assert( max_size == 0 || max_size >= size );
290  if (::evbuffer_get_length(input_) < size) {
291  ::bufferevent_setwatermark(bev_, EV_READ, size, max_size);
292  return false;
293  }
294  read_offset_ = size;
295  data.set((char*)::evbuffer_pullup(input_, size), size);
296  return true;
297  }
298 
344  template<class T>
345  bool read_fixed_helper(T& parent, SizeT& fixed_size, SizeT size, SizeT max_size=0, void* context=NULL) {
346  for (;;) {
347  SubString data;
348  if (!read_fixed(data, size, max_size)) {
349  fixed_size = size;
350  return true; // wait for more data
351  }
352  fixed_size = 0;
353  if (!parent.on_read_fixed(fixed_size, data, context))
354  return false;
355  read_flush();
356  if (fixed_size <= 0)
357  break;
358  }
359  return true;
360  }
361 
370  void read_reset(size_t max_size, size_t min_size=0) {
371  ::bufferevent_setwatermark(bev_, EV_READ, min_size, max_size);
372  }
373 
382  bool read_line(SubString& data) {
383  size_t block_len = ::evbuffer_get_contiguous_space(input_);
384  if (block_len > 0) {
385  size_t tmp_len;
386  for (;;) {
387  char* ptr = (char*)::evbuffer_pullup(input_, block_len);
388  while (read_offset_ < block_len) {
389  if (ptr[read_offset_] == '\n') {
390  assert( read_offset_ <= IntegerT<SizeT>::MAX );
391  if (read_offset_ > 0 && ptr[read_offset_-1] == '\r')
392  data.set(ptr, (SizeT)(read_offset_-1));
393  else
394  data.set(ptr, (SizeT)read_offset_);
395  ++read_offset_;
396  return true;
397  }
398  ++read_offset_;
399  }
400  if (block_len < (tmp_len=::evbuffer_get_length(input_))) {
401  block_len = tmp_len;
402  } else
403  break;
404  }
405  }
406  return false;
407  }
408 
413  void read_flush() {
414  if (read_offset_ > 0) {
415  if (::evbuffer_drain(input_, read_offset_) != 0)
416  abort(); // This should never happen
417  read_offset_ = 0;
418  }
419  }
420 
421  size_t write_size() const
422  { return (output_ == NULL ? 0 : ::evbuffer_get_length(output_)); }
423 
424  void write_clear() {
425  if (output_ != NULL)
426  ::evbuffer_drain(output_, ::evbuffer_get_length(output_));
427  }
428 
429  void write_reserve(size_t size)
430  { ::evbuffer_expand(output_, size); }
431 
432  void write(const char* data, size_t size)
433  { ::evbuffer_add(output_, data, size); }
434 
435 private:
436  struct bufferevent* bev_;
437  struct evbuffer* input_;
438  struct evbuffer* output_;
439  size_t read_offset_;
440 };
441 
443 
446  arrERROR = 0,
449 };
450 
453  aeNONE = 0,
461 };
462 
467 inline const char* async_error_msg(AsyncError err) {
468  if (err <= aeNONE || err > aeCLIENT)
469  return "Unknown error";
470  const char* MSG[] = {
471  "Connection refused",
472  "Socket closed by other side",
473  "Unrecoverable I/O error",
474  "Read error",
475  "Write error",
476  "Timed out",
477  "Client protocol error"
478  };
479  return MSG[(int)err - 1];
480 }
481 
483 
487 class AsyncBase {
488 public:
490 
496  struct OnTimer {
497  struct event* timer_handle;
498  ulong timer_msec;
499 
501  OnTimer() : timer_handle(NULL) {
502  }
503 
505  virtual ~OnTimer() {
506  ::event_free(timer_handle);
507  }
508 
513  virtual void on_timer() = 0;
514 
521  bool timer_reset(ulong msec) {
522  struct timeval tv;
523  if (::event_add(timer_handle, get_timeout_ptr(tv, msec)) != 0) {
524  ::event_free(timer_handle);
525  return false;
526  }
527  timer_msec = msec;
528  return true;
529  }
530  };
531 
535  AsyncBase() : parent_base_(NULL), child_base_(NULL), local_(true), evloop_(NULL), read_timeout_ms_(0), write_timeout_ms_(0) {
536  }
537 
541  virtual ~AsyncBase() {
542  if (parent_base_ == NULL) {
543  if (child_base_ != NULL) {
544  logger.log(LOG_LEVEL_ERROR, "AsyncBase internal cleanup error, parent destroyed before child");
545  assert( false ); // this shouldn't happen
546  }
547  delete evloop_;
548  } else {
549  parent_base_->child_base_ = child_base_;
550  if (child_base_ != NULL)
551  child_base_->parent_base_ = parent_base_;
552  }
553  }
554 
561  virtual void set_logger(LoggerBase* newlogger) {
562  logger.ptr = newlogger;
563  }
564 
569  void set_timeout(ulong read_timeout_ms=0, ulong write_timeout_ms=0) {
570  read_timeout_ms_ = read_timeout_ms;
571  write_timeout_ms_ = write_timeout_ms;
572  }
573 
582  bool set_timer(OnTimer& on_timer, ulong msec) {
583  if (on_timer.timer_handle != NULL)
584  ::event_free(on_timer.timer_handle);
585  on_timer.timer_handle = ::event_new(evloop_->handle(), -1, 0, on_timer_event, &on_timer);
586  if (on_timer.timer_handle == NULL)
587  return false;
588  return on_timer.timer_reset(msec);
589  }
590 
597  bool runlocal() {
598  if (parent_base_ != NULL || evloop_ == NULL)
599  return false;
600  if (evloop_->active() && !run_eventloop_once())
601  return false;
602  for (AsyncBase* p = this; p != NULL && evloop_->active(); p = p->child_base_)
603  while (p->check_client_active())
604  if (!run_eventloop_once())
605  return false;
606  return true;
607  }
608 
609 protected:
612  bool local_;
613 
617 
622  void init() {
623  if (evloop_ == NULL)
624  evloop_ = new AsyncEventLoop;
625 
626  }
627 
633  void init_attach(AsyncBase& parent) {
634  if (evloop_ == NULL && parent_base_ == NULL) {
635  // Always attach at end of chain
636  AsyncBase* p = &parent;
637  while (p->child_base_ != NULL) {
638  p = p->child_base_;
639  assert( p->parent_base_ != NULL );
640  }
641 
642  if (p->evloop_ == NULL) {
643  // Init parent since this hasn't been done yet
644  assert( p->parent_base_ == NULL );
645  p->init();
646  }
647 
648  parent_base_ = p;
649  p->child_base_ = this;
650  evloop_ = p->evloop_;
651  }
652  }
653 
660  virtual bool check_client_active() {
661  return false;
662  }
663 
670  if (parent_base_ != NULL || !evloop_->run1())
671  return false;
672  return true;
673  }
674 
678  bool run_eventloop() {
679  while (evloop_->active()) {
680  if (!run_eventloop_once())
681  return false;
682  }
683  return true;
684  }
685 
691  static struct timeval* get_timeout_ptr(struct timeval& out, ulong ms) {
692  if (ms > 0) {
693  #if defined(_WIN32)
694  SysWindows::set_timeval_ms(out, ms);
695  #else
696  SysLinux::set_timeval_ms(out, ms);
697  #endif
698  return &out;
699  }
700  return NULL;
701  }
702 
703 private:
704  // Disable copying
705  AsyncBase(const AsyncBase&);
706  AsyncBase& operator=(const AsyncBase&);
707 
708  static void on_timer_event(evutil_socket_t, short, void * arg) {
709  ((OnTimer*)arg)->on_timer();
710  }
711 };
712 
714 
715 }
716 #endif
bool read_fixed(SubString &data, SizeT size, SizeT max_size=0)
Read fixed size data from read buffer.
Definition: ioasync_base.h:288
size_t write_size() const
Definition: ioasync_base.h:421
void read_reset(size_t max_size, size_t min_size=0)
Reset read buffer thresholds.
Definition: ioasync_base.h:370
ulong timer_msec
Timer value in milliseconds, set by timer_reset()
Definition: ioasync_base.h:498
void store(T num, MemOrder mem_order=std::memory_order_seq_cst)
Store new value.
void shutdown()
Shutdown event loop.
Definition: ioasync_base.h:98
bool run1()
Run event loop with one pass and process I/O events.
Definition: ioasync_base.h:88
virtual void set_logger(LoggerBase *newlogger)
Set logger to use.
Definition: ioasync_base.h:561
Evo SubString container.
Done reading request.
Definition: ioasync_base.h:447
AsyncBase * child_base_
Pointer to child in AsyncBase chain (always an AsyncClient), NULL for none.
Definition: ioasync_base.h:611
Evo String container.
I/O timeout.
Definition: ioasync_base.h:459
AsyncReadResult
Async I/O read results used by protocol events.
Definition: ioasync_base.h:445
BulkWrite & addsize(size_t size)
Definition: ioasync_base.h:196
void advSize(Size size)
Advanced: Set new size after writing directly to buffer.
Definition: list.h:2754
#define EVO_ATOMIC_ACQ_REL
Combined "acquire" & "release" level memory barrier.
Definition: atomic.h:33
bool runlocal()
Run the event-loop locally in current thread until all pending requests are handled (client only)...
Definition: ioasync_base.h:597
I/O read error.
Definition: ioasync_base.h:457
Timer expired event.
Definition: ioasync_base.h:496
void attach_read()
Attach to current write buffers for reading too (used internally).
Definition: ioasync_base.h:266
struct event_base * Handle
Definition: ioasync_base.h:53
I/O write error.
Definition: ioasync_base.h:458
void init_attach(AsyncBase &parent)
Initialize and attach to a parent event-loop.
Definition: ioasync_base.h:633
BulkWrite(String &str, SizeT size)
Definition: ioasync_base.h:149
Wraps a logger pointer that can reference a logger to use or be disabled.
Definition: logger.h:377
void init()
Initialize event-loop.
Definition: ioasync_base.h:622
BulkWrite & add(const char *data, size_t size)
Definition: ioasync_base.h:208
const char * async_error_msg(AsyncError err)
Get error message for AsyncError code.
Definition: ioasync_base.h:467
void reset()
Reset buffer pointers (used internally).
Definition: ioasync_base.h:241
AsyncBase * parent_base_
Pointer to parent in AsyncBase chain, NULL if this is the main parent (and owns evloop_ pointer) ...
Definition: ioasync_base.h:610
Socket closed by other side.
Definition: ioasync_base.h:455
Basic integer type.
Definition: type.h:980
ulong read_timeout_ms_
Socket read timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:615
Mutex for thread synchronization.
Definition: thread.h:104
virtual ~OnTimer()
Destructor.
Definition: ioasync_base.h:505
bool local_
Whether event-loop is local (same thread), false if separate thread.
Definition: ioasync_base.h:612
LoggerPtr logger
Logger for protocol and debug messages, set to enable logging – see set_logger() ...
Definition: ioasync_base.h:489
I/O unrecoverable error.
Definition: ioasync_base.h:456
bool set_timer(OnTimer &on_timer, ulong msec)
Activate timer so it expires after given time elapses.
Definition: ioasync_base.h:582
void attach_write(struct bufferevent *bev)
Attach to active buffers for writing (used internally).
Definition: ioasync_base.h:260
Evo threads implementation.
Holds data for async I/O buffers (used internally with AsyncServer and protocol implementations).
Definition: ioasync_base.h:134
void write(const char *data, size_t size)
Definition: ioasync_base.h:432
No error.
Definition: ioasync_base.h:453
String container.
Definition: string.h:674
bool timer_reset(ulong msec)
Reset and activate timer so the on_timer() event is called after given time elapses.
Definition: ioasync_base.h:521
bool log(LogLevel level, const SubString &msg)
Log a message with given severity level.
Definition: logger.h:428
bool compare_set(T cmpval, T newval, MemOrder mem_order_success=std::memory_order_seq_cst, MemOrder mem_order_failure=std::memory_order_acquire)
Compare and set, storing new value if comparison matches.
Definition: atomic.h:332
Base class for Async I/O.
Definition: ioasync_base.h:487
Smart locking for synchronization.
Definition: lock.h:28
AsyncError
Async I/O error type.
Definition: ioasync_base.h:452
More to read for request.
Definition: ioasync_base.h:448
An error occurred.
Definition: ioasync_base.h:446
#define EVO_ATOMIC_ACQUIRE
Start "acquire" memory ordering barrier, usually followed by a matching "release" barrier...
Definition: atomic.h:27
bool read_fixed_helper(T &parent, SizeT &fixed_size, SizeT size, SizeT max_size=0, void *context=NULL)
Helper for reading fixed size data from read buffer from a ProtocolHandler on_read() event...
Definition: ioasync_base.h:345
LoggerType * ptr
Logger pointer, NULL to disable logging with this
Definition: logger.h:380
OnTimer()
Constructor.
Definition: ioasync_base.h:501
~BulkWrite()
Definition: ioasync_base.h:157
Use to group multiple writes for efficiency.
Definition: ioasync_base.h:142
Evo C++ Library namespace.
Definition: alg.h:11
size_t read_size()
Get read buffer data size in bytes.
Definition: ioasync_base.h:273
#define EVO_ATOMIC_RELEASE
Release (end) memory ordering barrier started with "consume" or "acquire" barrier.
Definition: atomic.h:30
BulkWrite(AsyncBuffers &parent, size_t size)
Definition: ioasync_base.h:153
void read_flush()
Flush and consume next line from read buffer.
Definition: ioasync_base.h:413
Handle handle()
Get event loop handle.
Definition: ioasync_base.h:71
bool active() const
Get whether event-loop is active.
Definition: ioasync_base.h:77
void set_timeout(ulong read_timeout_ms=0, ulong write_timeout_ms=0)
Set read/write timeouts to use.
Definition: ioasync_base.h:569
bool read_line(SubString &data)
Read next line from read buffer.
Definition: ioasync_base.h:382
Evo async event handling.
Evo logging.
BulkWrite & add(char ch)
Definition: ioasync_base.h:215
char * ptr()
Definition: ioasync_base.h:192
void attach(struct bufferevent *bev)
Attach to active buffers (used internally).
Definition: ioasync_base.h:251
#define EVO_LIBEVENT_MINVER
Definition: ioasync_base.h:26
Evo Input/Output Socket streams.
ulong write_timeout_ms_
Socket write timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:616
T * advBuffer(Size size)
Advanced: Resize and get buffer pointer (modifier).
Definition: list.h:2728
bool error() const
Definition: ioasync_base.h:188
bool run_eventloop_once()
Run event loop with one pass and process all events.
Definition: ioasync_base.h:669
virtual bool check_client_active()
Called during client event-loop to check whether any client requests are active (client only)...
Definition: ioasync_base.h:660
Error message showing something isn&#39;t working as expected, program may be able to work around it (ERR...
Definition: logger.h:133
Connection refused (clients only)
Definition: ioasync_base.h:454
AsyncBase()
Constructor.
Definition: ioasync_base.h:535
BulkWrite & init(AsyncBuffers &parent, size_t size)
Definition: ioasync_base.h:169
Manages an event-loop for async I/O.
Definition: ioasync_base.h:51
static void set_timeval_ms(struct timeval &tm, ulong ms)
Definition: sys.h:1526
SubString & set(const char *data)
Set as reference to terminated string.
Definition: substring.h:353
AsyncBuffers()
Constructor (used internally).
Definition: ioasync_base.h:233
uint32 SizeT
Default Evo container size type.
Definition: sys.h:729
bool run_eventloop()
Run event loop and process all events and repeat until shutdown.
Definition: ioasync_base.h:678
Reference and access existing string data.
Definition: substring.h:229
Base class for Logger.
Definition: logger.h:273
T load(MemOrder mem_order=std::memory_order_seq_cst) const
Load and return current value.
AsyncEventLoop()
Constructor.
Definition: ioasync_base.h:56
AsyncEventLoop * evloop_
Event loop pointer, either owned by this or a parent.
Definition: ioasync_base.h:614
~AsyncEventLoop()
Destructor.
Definition: ioasync_base.h:64
BulkWrite()
Definition: ioasync_base.h:144
static struct timeval * get_timeout_ptr(struct timeval &out, ulong ms)
Get timeval struct pointer from timeout in milliseconds.
Definition: ioasync_base.h:691
Evo AtomicBufferQueue.
virtual ~AsyncBase()
Destructor.
Definition: ioasync_base.h:541
void write_clear()
Definition: ioasync_base.h:424
Client protocol error (set by protocol implementation)
Definition: ioasync_base.h:460
void write_reserve(size_t size)
Definition: ioasync_base.h:429
BulkWrite & init(String &str, SizeT size)
Definition: ioasync_base.h:161
struct event * timer_handle
Internal handle for event.
Definition: ioasync_base.h:497