8 #ifndef INCL_evo_ioasync_base_h 9 #define INCL_evo_ioasync_base_h 20 #if !defined(LIBEVENT_VERSION_NUMBER) 21 #include <event2/event.h> 22 #include <event2/buffer.h> 23 #include <event2/bufferevent.h> 24 #include <event2/thread.h> 26 #define EVO_LIBEVENT_MINVER 0x02000000 27 #if LIBEVENT_VERSION_NUMBER < EVO_LIBEVENT_MINVER 28 #error Evo Async I/O requires libevent 2.0+ 30 #if !defined(EVO_ASYNC_MULTI_THREAD) 35 #define EVO_ASYNC_MULTI_THREAD 0 38 #if defined(EVO_MSVC_YEAR) 39 #pragma comment(lib, "libevent_core.lib") 40 #pragma comment(lib, "advapi32.lib") 57 static FirstInitHelper first_init_helper;
58 evbase_ = ::event_base_new();
65 ::event_base_free(evbase_);
89 int result = ::event_base_loop(evbase_, EVLOOP_ONCE);
100 if (::event_base_loopexit(evbase_, NULL) != 0)
106 struct FirstInitHelper {
112 #if EVO_ASYNC_MULTI_THREAD 115 ::evthread_use_windows_threads();
117 ::evthread_use_pthreads();
119 assert( result == 0 );
158 assert( ptr_ == end_ );
162 const SizeT used = str.used();
163 ptr_ = str.
advBuffer(used + size) + used;
170 if (parent.output_ != NULL) {
171 int result = ::evbuffer_reserve_space(parent.output_, size, &data_, 1);
174 assert( data_.iov_base != NULL );
175 assert( data_.iov_len >= size );
176 data_.iov_len = size;
177 ptr_ = (
char*)data_.iov_base;
189 return (ptr_ == NULL);
198 assert( ptr_ <= end_ );
199 if (parent_ != NULL && ptr_ >= end_) {
200 int result = ::evbuffer_commit_space(parent_->output_, &data_, 1);
209 assert( ptr_ != NULL );
210 memcpy(ptr_, data, size);
216 assert( ptr_ != NULL );
227 struct evbuffer_iovec data_;
253 input_ = ::bufferevent_get_input(bev);
254 output_ = ::bufferevent_get_output(bev);
262 output_ = ::bufferevent_get_output(bev);
267 input_ = ::bufferevent_get_input(bev_);
274 { return ::evbuffer_get_length(input_); }
289 assert( max_size == 0 || max_size >= size );
290 if (::evbuffer_get_length(input_) < size) {
291 ::bufferevent_setwatermark(bev_, EV_READ, size, max_size);
295 data.
set((
char*)::evbuffer_pullup(input_, size), size);
348 if (!read_fixed(data, size, max_size)) {
353 if (!parent.on_read_fixed(fixed_size, data, context))
371 ::bufferevent_setwatermark(bev_, EV_READ, min_size, max_size);
383 size_t block_len = ::evbuffer_get_contiguous_space(input_);
387 char* ptr = (
char*)::evbuffer_pullup(input_, block_len);
388 while (read_offset_ < block_len) {
389 if (ptr[read_offset_] ==
'\n') {
391 if (read_offset_ > 0 && ptr[read_offset_-1] ==
'\r')
392 data.
set(ptr, (
SizeT)(read_offset_-1));
400 if (block_len < (tmp_len=::evbuffer_get_length(input_))) {
414 if (read_offset_ > 0) {
415 if (::evbuffer_drain(input_, read_offset_) != 0)
422 {
return (output_ == NULL ? 0 : ::evbuffer_get_length(output_)); }
426 ::evbuffer_drain(output_, ::evbuffer_get_length(output_));
430 { ::evbuffer_expand(output_, size); }
432 void write(
const char* data,
size_t size)
433 { ::evbuffer_add(output_, data, size); }
436 struct bufferevent* bev_;
437 struct evbuffer* input_;
438 struct evbuffer* output_;
468 if (err <= aeNONE || err >
aeCLIENT)
469 return "Unknown error";
470 const char* MSG[] = {
471 "Connection refused",
472 "Socket closed by other side",
473 "Unrecoverable I/O error",
477 "Client protocol error" 479 return MSG[(int)err - 1];
506 ::event_free(timer_handle);
513 virtual void on_timer() = 0;
523 if (::event_add(timer_handle, get_timeout_ptr(tv, msec)) != 0) {
524 ::event_free(timer_handle);
535 AsyncBase() : parent_base_(NULL), child_base_(NULL), local_(true), evloop_(NULL), read_timeout_ms_(0), write_timeout_ms_(0) {
542 if (parent_base_ == NULL) {
543 if (child_base_ != NULL) {
544 logger.
log(
LOG_LEVEL_ERROR,
"AsyncBase internal cleanup error, parent destroyed before child");
549 parent_base_->child_base_ = child_base_;
550 if (child_base_ != NULL)
551 child_base_->parent_base_ = parent_base_;
562 logger.
ptr = newlogger;
569 void set_timeout(ulong read_timeout_ms=0, ulong write_timeout_ms=0) {
570 read_timeout_ms_ = read_timeout_ms;
571 write_timeout_ms_ = write_timeout_ms;
585 on_timer.
timer_handle = ::event_new(evloop_->handle(), -1, 0, on_timer_event, &on_timer);
598 if (parent_base_ != NULL || evloop_ == NULL)
600 if (evloop_->active() && !run_eventloop_once())
603 while (p->check_client_active())
604 if (!run_eventloop_once())
634 if (evloop_ == NULL && parent_base_ == NULL) {
670 if (parent_base_ != NULL || !evloop_->
run1())
679 while (evloop_->
active()) {
680 if (!run_eventloop_once())
694 SysWindows::set_timeval_ms(out, ms);
708 static void on_timer_event(evutil_socket_t,
short,
void * arg) {
bool read_fixed(SubString &data, SizeT size, SizeT max_size=0)
Read fixed size data from read buffer.
Definition: ioasync_base.h:288
size_t write_size() const
Definition: ioasync_base.h:421
void read_reset(size_t max_size, size_t min_size=0)
Reset read buffer thresholds.
Definition: ioasync_base.h:370
ulong timer_msec
Timer value in milliseconds, set by timer_reset()
Definition: ioasync_base.h:498
void store(T num, MemOrder mem_order=std::memory_order_seq_cst)
Store new value.
void shutdown()
Shutdown event loop.
Definition: ioasync_base.h:98
bool run1()
Run event loop with one pass and process I/O events.
Definition: ioasync_base.h:88
virtual void set_logger(LoggerBase *newlogger)
Set logger to use.
Definition: ioasync_base.h:561
Done reading request.
Definition: ioasync_base.h:447
AsyncBase * child_base_
Pointer to child in AsyncBase chain (always an AsyncClient), NULL for none.
Definition: ioasync_base.h:611
I/O timeout.
Definition: ioasync_base.h:459
AsyncReadResult
Async I/O read results used by protocol events.
Definition: ioasync_base.h:445
BulkWrite & addsize(size_t size)
Definition: ioasync_base.h:196
void advSize(Size size)
Advanced: Set new size after writing directly to buffer.
Definition: list.h:2754
#define EVO_ATOMIC_ACQ_REL
Combined "acquire" & "release" level memory barrier.
Definition: atomic.h:33
bool runlocal()
Run the event-loop locally in current thread until all pending requests are handled (client only)...
Definition: ioasync_base.h:597
I/O read error.
Definition: ioasync_base.h:457
Timer expired event.
Definition: ioasync_base.h:496
void attach_read()
Attach to current write buffers for reading too (used internally).
Definition: ioasync_base.h:266
struct event_base * Handle
Definition: ioasync_base.h:53
I/O write error.
Definition: ioasync_base.h:458
void init_attach(AsyncBase &parent)
Initialize and attach to a parent event-loop.
Definition: ioasync_base.h:633
BulkWrite(String &str, SizeT size)
Definition: ioasync_base.h:149
Wraps a logger pointer that can reference a logger to use or be disabled.
Definition: logger.h:377
void init()
Initialize event-loop.
Definition: ioasync_base.h:622
BulkWrite & add(const char *data, size_t size)
Definition: ioasync_base.h:208
const char * async_error_msg(AsyncError err)
Get error message for AsyncError code.
Definition: ioasync_base.h:467
void reset()
Reset buffer pointers (used internally).
Definition: ioasync_base.h:241
AsyncBase * parent_base_
Pointer to parent in AsyncBase chain, NULL if this is the main parent (and owns evloop_ pointer) ...
Definition: ioasync_base.h:610
Socket closed by other side.
Definition: ioasync_base.h:455
Basic integer type.
Definition: type.h:980
ulong read_timeout_ms_
Socket read timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:615
Mutex for thread synchronization.
Definition: thread.h:104
virtual ~OnTimer()
Destructor.
Definition: ioasync_base.h:505
bool local_
Whether event-loop is local (same thread), false if separate thread.
Definition: ioasync_base.h:612
LoggerPtr logger
Logger for protocol and debug messages, set to enable logging – see set_logger() ...
Definition: ioasync_base.h:489
I/O unrecoverable error.
Definition: ioasync_base.h:456
bool set_timer(OnTimer &on_timer, ulong msec)
Activate timer so it expires after given time elapses.
Definition: ioasync_base.h:582
void attach_write(struct bufferevent *bev)
Attach to active buffers for writing (used internally).
Definition: ioasync_base.h:260
Evo threads implementation.
Holds data for async I/O buffers (used internally with AsyncServer and protocol implementations).
Definition: ioasync_base.h:134
void write(const char *data, size_t size)
Definition: ioasync_base.h:432
No error.
Definition: ioasync_base.h:453
String container.
Definition: string.h:674
bool timer_reset(ulong msec)
Reset and activate timer so the on_timer() event is called after given time elapses.
Definition: ioasync_base.h:521
bool log(LogLevel level, const SubString &msg)
Log a message with given severity level.
Definition: logger.h:428
bool compare_set(T cmpval, T newval, MemOrder mem_order_success=std::memory_order_seq_cst, MemOrder mem_order_failure=std::memory_order_acquire)
Compare and set, storing new value if comparison matches.
Definition: atomic.h:332
Base class for Async I/O.
Definition: ioasync_base.h:487
Smart locking for synchronization.
Definition: lock.h:28
AsyncError
Async I/O error type.
Definition: ioasync_base.h:452
More to read for request.
Definition: ioasync_base.h:448
An error occurred.
Definition: ioasync_base.h:446
#define EVO_ATOMIC_ACQUIRE
Start "acquire" memory ordering barrier, usually followed by a matching "release" barrier...
Definition: atomic.h:27
bool read_fixed_helper(T &parent, SizeT &fixed_size, SizeT size, SizeT max_size=0, void *context=NULL)
Helper for reading fixed size data from read buffer from a ProtocolHandler on_read() event...
Definition: ioasync_base.h:345
LoggerType * ptr
Logger pointer, NULL to disable logging with this
Definition: logger.h:380
OnTimer()
Constructor.
Definition: ioasync_base.h:501
~BulkWrite()
Definition: ioasync_base.h:157
Use to group multiple writes for efficiency.
Definition: ioasync_base.h:142
Evo C++ Library namespace.
Definition: alg.h:11
size_t read_size()
Get read buffer data size in bytes.
Definition: ioasync_base.h:273
#define EVO_ATOMIC_RELEASE
Release (end) memory ordering barrier started with "consume" or "acquire" barrier.
Definition: atomic.h:30
BulkWrite(AsyncBuffers &parent, size_t size)
Definition: ioasync_base.h:153
void read_flush()
Flush and consume next line from read buffer.
Definition: ioasync_base.h:413
Handle handle()
Get event loop handle.
Definition: ioasync_base.h:71
bool active() const
Get whether event-loop is active.
Definition: ioasync_base.h:77
void set_timeout(ulong read_timeout_ms=0, ulong write_timeout_ms=0)
Set read/write timeouts to use.
Definition: ioasync_base.h:569
bool read_line(SubString &data)
Read next line from read buffer.
Definition: ioasync_base.h:382
Evo async event handling.
BulkWrite & add(char ch)
Definition: ioasync_base.h:215
char * ptr()
Definition: ioasync_base.h:192
void attach(struct bufferevent *bev)
Attach to active buffers (used internally).
Definition: ioasync_base.h:251
#define EVO_LIBEVENT_MINVER
Definition: ioasync_base.h:26
Evo Input/Output Socket streams.
ulong write_timeout_ms_
Socket write timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:616
T * advBuffer(Size size)
Advanced: Resize and get buffer pointer (modifier).
Definition: list.h:2728
bool error() const
Definition: ioasync_base.h:188
bool run_eventloop_once()
Run event loop with one pass and process all events.
Definition: ioasync_base.h:669
virtual bool check_client_active()
Called during client event-loop to check whether any client requests are active (client only)...
Definition: ioasync_base.h:660
Connection refused (clients only)
Definition: ioasync_base.h:454
AsyncBase()
Constructor.
Definition: ioasync_base.h:535
BulkWrite & init(AsyncBuffers &parent, size_t size)
Definition: ioasync_base.h:169
Manages an event-loop for async I/O.
Definition: ioasync_base.h:51
static void set_timeval_ms(struct timeval &tm, ulong ms)
Definition: sys.h:1526
SubString & set(const char *data)
Set as reference to terminated string.
Definition: substring.h:353
AsyncBuffers()
Constructor (used internally).
Definition: ioasync_base.h:233
uint32 SizeT
Default Evo container size type.
Definition: sys.h:729
bool run_eventloop()
Run event loop and process all events and repeat until shutdown.
Definition: ioasync_base.h:678
Reference and access existing string data.
Definition: substring.h:229
Base class for Logger.
Definition: logger.h:273
T load(MemOrder mem_order=std::memory_order_seq_cst) const
Load and return current value.
AsyncEventLoop()
Constructor.
Definition: ioasync_base.h:56
AsyncEventLoop * evloop_
Event loop pointer, either owned by this or a parent.
Definition: ioasync_base.h:614
~AsyncEventLoop()
Destructor.
Definition: ioasync_base.h:64
BulkWrite()
Definition: ioasync_base.h:144
static struct timeval * get_timeout_ptr(struct timeval &out, ulong ms)
Get timeval struct pointer from timeout in milliseconds.
Definition: ioasync_base.h:691
virtual ~AsyncBase()
Destructor.
Definition: ioasync_base.h:541
void write_clear()
Definition: ioasync_base.h:424
Client protocol error (set by protocol implementation)
Definition: ioasync_base.h:460
void write_reserve(size_t size)
Definition: ioasync_base.h:429
BulkWrite & init(String &str, SizeT size)
Definition: ioasync_base.h:161
struct event * timer_handle
Internal handle for event.
Definition: ioasync_base.h:497