Evo C++ Library v0.5.1
ioasync_server.h
Go to the documentation of this file.
1 // Evo C++ Library
2 /* Copyright 2019 Justin Crowell
3 Distributed under the BSD 2-Clause License -- see included file LICENSE.txt for details.
4 */
6 
7 #pragma once
8 #ifndef INCL_evo_ioasync_server_h
9 #define INCL_evo_ioasync_server_h
10 
11 #include "ioasync_base.h"
12 
13 namespace evo {
16 
18 
23 template<class T=AsyncBuffers>
25 public:
26  typedef T OutBuffer;
28 
/** Flags used with Writer. */
enum WriterFlags {
    wfNONE = 0,             ///< No flags set, Writer handles this as a regular (non-deferred) write
    wfDEFERRED = 0x01,      ///< Deferred but not the last part of this response
    // NOTE(review): this enumerator is compared against by Writer but was missing here.
    // The value is reconstructed: it must contain the wfDEFERRED bit (Writer tests `flags & wfDEFERRED`)
    // and differ from wfDEFERRED itself -- confirm against the upstream header.
    wfDEFERRED_LAST = 0x03  ///< Deferred and the last part of this response
};
35 
43  struct Writer : public OutBuffer::BulkWrite {
44  using OutBuffer::BulkWrite::init;
45 
55  Writer(This& parent, ulong id, SizeT buf_size, WriterFlags flags=wfNONE) {
56  if (int(flags) & wfDEFERRED) {
57  // Deferred
58  parent.send_end();
59  if (id == parent.next_id_) {
60  // This is the current reply
61  init(parent.buf_, buf_size);
62  if (flags == wfDEFERRED_LAST)
63  ++parent.next_id_;
64  } else {
65  // Either append to existing reply in queue, or insert new reply -- keep queue ordered by ID
66  ReplyItem* rsp;
67  parent.deferred_get_queue_item(rsp, id);
68  init(rsp->data, buf_size);
69  }
70  } else if (id == parent.prev_id_) {
71  // Append to previous send
72  if (parent.prev_ == NULL)
73  init(parent.buf_, buf_size);
74  else
75  init(parent.prev_->data, buf_size);
76  } else {
77  // New send
78  parent.send_end();
79  if (id == parent.next_id_) {
80  init(parent.buf_, buf_size);
81  ++parent.next_id_;
82  parent.prev_ = NULL;
83  } else {
84  parent.prev_ = parent.queue_.addnew().advLast();
85  parent.prev_->id = id;
86  init(parent.prev_->data, buf_size);
87  }
88  parent.prev_id_ = id;
89  }
90  }
91  };
92 
96  AsyncServerReplyT(T& bufs) : buf_(bufs), deferred_count_(0), gen_id_(1), next_id_(1), prev_id_(0), prev_(NULL) {
97  }
98 
102  ulong gen_id() {
103  return gen_id_++;
104  }
105 
/** Get current number of deferred responses in progress.
 \return  Value of the deferred counter maintained by deferred_start() and deferred_end()
*/
109  ulong deferred_active() const {
110  return deferred_count_;
111  }
112 
119  template<class U>
120  void deferred_start(U& context) {
121  ++deferred_count_;
122  context.addref();
123  }
124 
133  template<class U>
134  bool deferred_end(U& context) {
135  --deferred_count_;
136  send_end();
137  return context.endref();
138  }
139 
151  This& deferred_send(ulong id, String& data, bool last) {
152  send_end();
153  if (id == next_id_) {
154  // This is the current reply
155  buf_.write(data.data(), data.size());
156  if (last)
157  ++next_id_;
158  } else {
159  // Either append to existing reply in queue, or insert new reply -- keep queue ordered by ID
160  ReplyItem* rsp;
161  if (deferred_get_queue_item(rsp, id))
162  rsp->data.add(data);
163  else
164  rsp->data = data;
165  }
166  return *this;
167  }
168 
178  void send(ulong id, String& data) {
179  if (id == prev_id_) {
180  // Append to previous send
181  if (prev_ == NULL)
182  buf_.write(data.data(), data.size());
183  else
184  prev_->data.add(data);
185  } else {
186  // New send
187  send_end();
188  if (id == next_id_) {
189  buf_.write(data.data(), data.size());
190  ++next_id_;
191  } else {
192  prev_ = queue_.addnew().advLast();
193  prev_->id = id;
194  prev_->data = data;
195  }
196  prev_id_ = id;
197  }
198  }
199 
204  void send_end() {
205  ReplyItem* rsp;
206  while ((rsp = queue_.advFirst()) != NULL && rsp->id == next_id_) {
207  buf_.write(rsp->data.data(), rsp->data.size());
208  queue_.popq();
209  ++next_id_;
210  }
211  }
212 
218  void nosend(ulong id) {
219  if (id + 1 == gen_id_)
220  --gen_id_;
221  }
222 
223 private:
224  // Disable copying
225  AsyncServerReplyT(const This&);
226  This& operator=(const This&);
227 
228  // Reply item in queue
229  struct ReplyItem {
230  ulong id;
231  String data;
232 
233  ReplyItem() : id(0) {
234  }
235  ReplyItem(const ReplyItem& src) : id(src.id), data(src.data) {
236  }
237  ReplyItem& operator=(const ReplyItem& src) {
238  id = src.id;
239  data = src.data;
240  return *this;
241  }
242  };
243 
244  OutBuffer& buf_; // write buffer
245  ulong deferred_count_; // deferred replies in progress, 0 for none
246  ulong gen_id_; // next ID to generate
247  ulong next_id_; // next ID to send
248  List<ReplyItem> queue_; // reply queue, only used while replies are out of order
249  ulong prev_id_; // previously queued/sent request ID -- not used by deferred_send()
250  ReplyItem* prev_; // previously queued item, NULL if none -- not used by deferred_send()
251 
252  bool deferred_get_queue_item(ReplyItem*& rsp, ulong id) {
253  SizeT i = 0, sz = queue_.size();
254  if (sz > 0 && id > queue_[sz-1].id) {
255  i = sz; // ID not in queue, insert at end
256  } else {
257  for (; i < sz; ++i) {
258  rsp = &queue_.advItem(i);
259  if (id < rsp->id)
260  break; // ID not in queue, insert here
261  if (id == rsp->id) {
262  // Use existing reply in queue
263  return true;
264  }
265  }
266  }
267 
268  // Insert new reply in queue
269  rsp = &queue_.advItem(queue_.insertnew(i));
270  rsp->id = id;
271  return false;
272  }
273 };
274 
276 
278 
295  template<class T>
298 
310  struct ReplyBase {
311  Context& context;
312  ulong id;
313 
314  ReplyBase(Context& context, ulong id) : context(context), id(id), finished(false) {
315  context.handler->reply.deferred_start(context);
316  }
317 
319  if (!finished && context.handler != NULL)
320  context.handler->logger.log(LOG_LEVEL_ERROR, "AsyncServer DeferredReply left unfinished");
321  }
322 
323  protected:
324  bool finished;
325  };
326 
327  typedef T Handler;
328  Handler* handler;
329 
333  DeferredContextT(Handler& handler) : handler(&handler), refcount(1) {
334  }
335 
339  ulong count() const {
340  return refcount;
341  }
342 
344  void addref() {
345  ++refcount;
346  }
347 
351  bool endref() {
352  if (--refcount == 0) {
353  assert( handler == NULL ); // failure here indicates deferred start/end call mismatch (shouldn't happen)
354  delete this;
355  return true;
356  }
357  return false;
358  }
359 
365  bool detach() {
366  handler = NULL;
367  return endref();
368  }
369 
370  private:
371  ulong refcount;
372  };
373 
379  rtCLOSE
380  };
381 
388  template<class T>
389  struct ResponseResult {
390  typedef T ResultType;
391 
393  ResultType result;
394 
396  ResponseResult() : type(rtCLOSE), result((ResultType)0) {
397  }
398 
402  ResponseResult(const ResponseResult& src) : type(src.type), result(src.result) {
403  }
404 
408  ResponseResult(ResponseType type) : type(type), result((ResultType)0) {
409  }
410 
414  ResponseResult(ResultType result) : type(rtNORMAL), result(result) {
415  }
416 
422  type = newtype;
423  result = (T)0;
424  return *this;
425  }
426 
431  ResponseResult& operator=(ResultType newresult) {
432  type = rtNORMAL;
433  result = newresult;
434  return *this;
435  }
436 
442  type = src.type;
443  result = src.result;
444  return * this;
445  }
446  };
447 
453  static const size_t MAX_INITIAL_READ = 8192; // 8 KB
454 
/** Default global data (empty). */
458  struct Global {
459  };
460 
467  template<class T=Global>
469  typedef T Global;
470 
/** Called when server is initialized, before any connections are accepted.
 - This default ignores both arguments and always succeeds
 \param  server  Server being initialized (unused here)
 \param  global  Global data (unused here)
 \return         Always true here -- AsyncServer::run() treats a false result as a bad configuration and stops
*/
478  bool on_init(AsyncBase& server, Global& global) {
479  EVO_PARAM_UNUSED(server);
480  EVO_PARAM_UNUSED(global);
481  return true;
482  }
483 
/** Called when server is shutting down, after last request has completed.
 - This default does nothing
*/
485  void on_uninit() {
486  }
487  };
488 
494  };
495 
497  AsyncServerReply reply;
498  ulong id;
499 
/** Constructor.
 - Binds the reply manager to this handler's buffers and starts with ID 0
*/
501  AsyncServerHandler() : reply(buffers), id(0) {
502  }
503 
/** Create and set new ID for current request/response.
 - The ID is generated by the reply manager and stored in `id`
*/
507  void set_id() {
508  id = reply.gen_id();
509  }
510 };
511 
513 
575 template<class T>
576 class AsyncServer : public AsyncBase {
577 public:
578  typedef T ProtocolServer;
580  typedef typename ProtocolServer::Handler::DeferredContext DeferredContext;
581  typedef typename ProtocolServer::Handler::Global Global;
582  typedef typename ProtocolServer::Handler::Shared Shared;
583 
584  struct Stats {
586  ulong accept_ok;
587  ulong accept_err;
588  ulong event_err;
589  ulong reads;
590 
591  Stats() {
592  active_connections = 0;
593  accept_ok = 0;
594  accept_err = 0;
595  event_err = 0;
596  reads = 0;
597  }
598  };
599 
/** Constructor.
 - Starts connection IDs at 0 (first connection gets ID 1) and calls init()
*/
601  AsyncServer() : last_id_(0) {
602  init();
603  }
604 
/** Get reference to global data used by all requests in this server.
 \return  Global data reference -- this is the same object passed to Shared::on_init() by run()
*/
610  Global& get_global() {
611  return global_;
612  }
613 
622  bool run(IoSocket::Handle listener) {
623  {
624  IoSocket listener_socket(listener);
625  Error err = listener_socket.set_nonblock();
626  if (err != ENone) {
627  if (logger.check(LOG_LEVEL_ALERT)) {
628  String msg("AsyncServer listener error setting as non-blocking: ");
629  IoSocket::errormsg_out(msg, err);
630  logger.log_direct(LOG_LEVEL_ALERT, msg);
631  }
632  return false;
633  }
634  listener_socket.detach();
635  }
636 
637  struct event* ev = ::event_new(evloop_->handle(), listener, EV_READ | EV_PERSIST, on_listener_ready, this);
638  if (ev == NULL) {
639  logger.log(LOG_LEVEL_ALERT, "AsyncServer libevent event_new() failed on listener -- this shouldn't happen");
640  return false;
641  }
642 
643  if (::event_add(ev, NULL) != 0) {
644  logger.log(LOG_LEVEL_ALERT, "AsyncServer libevent event_add() failed on listener -- this shouldn't happen");
645  return false;
646  }
647 
648  if (!shared_.on_init(*this, global_)) {
649  logger.log(LOG_LEVEL_ALERT, "AsyncServer Shared on_init() returned an error, indicating a bad configuration");
650  return false;
651  } else
652  logger.log(LOG_LEVEL_DEBUG, "AsyncServer ready");
653 
654  run_eventloop();
655  ::event_free(ev);
656  return true;
657  }
658 
/** Run server and handle connections until shutdown.
 - Passes the handle of the socket's low-level device to run(IoSocket::Handle)
 \param  listener  Listener socket to accept connections from
 \return           Result of run(IoSocket::Handle)
*/
667  bool run(Socket& listener) {
668  return run(listener.device().handle);
669  }
670 
/** Shut down server.
 - Forwards the shutdown request to the event-loop
*/
675  void shutdown() {
676  evloop_->shutdown();
677  }
678 
679 private:
680  Global global_;
681  Shared shared_;
682  Stats stats_;
683  ulong last_id_;
684 
685  using AsyncBase::runlocal;
686 
687  struct Connection {
688  This& server;
689  DeferredContext* deferred_context;
690  ProtocolServer protocol_server;
691  struct bufferevent* bev;
692  SizeT read_fixed_size_;
693  ulong id;
694 
695  Connection(This& async_server, struct bufferevent* bev, ulong id) :
696  server(async_server), protocol_server(async_server.global_, async_server.shared_, async_server.logger.ptr), bev(bev), read_fixed_size_(0), id(id) {
697  deferred_context = new DeferredContext(protocol_server.handler);
698 
699  ::bufferevent_setcb(bev, on_read, NULL, on_error, this);
700  ::bufferevent_setwatermark(bev, EV_READ, T::MIN_INITIAL_READ, T::Handler::MAX_INITIAL_READ);
701  if (server.read_timeout_ms_ > 0 || server.write_timeout_ms_ > 0) {
702  struct timeval read_timeout, write_timeout;
703  const int result = ::bufferevent_set_timeouts(bev,
704  get_timeout_ptr(read_timeout, server.read_timeout_ms_),
705  get_timeout_ptr(write_timeout, server.write_timeout_ms_)
706  );
707  if (result != 0)
708  server.logger.log(LOG_LEVEL_ERROR, "AsyncServer libevent bufferevent_set_timeouts() returned an error -- this shouldn't happen");
709  }
710  }
711 
712  ~Connection() {
713  ::bufferevent_free(bev);
714  --server.stats_.active_connections;
715  if (!deferred_context->detach())
716  server.logger.log(LOG_LEVEL_DEBUG_LOW, "AsyncServer cleanup, deferred pending");
717  }
718 
719  bool enable() {
720  if (::bufferevent_enable(bev, EV_READ | EV_WRITE) != 0) {
721  server.logger.log(LOG_LEVEL_ALERT, "AsyncServer libevent bufferevent_enable() returned an error -- this shouldn't happen");
722  return false;
723  }
724  return true;
725  }
726  };
727 
728  static void on_listener_ready(evutil_socket_t listener, short event, void* self_ptr) {
729  EVO_PARAM_UNUSED(event);
730  This& self = *(This*)self_ptr;
731  Error err;
732  IoSocket listener_socket(listener), client_socket;
733  if (!listener_socket.accept_nonblock(err, client_socket)) {
734  if (self.logger.check(LOG_LEVEL_ALERT)) {
735  String msg;
736  msg = "AsyncServer socket accept failed: ";
737  IoSocket::errormsg_out(msg, err);
738  self.logger.log_direct(LOG_LEVEL_ALERT, msg);
739  }
740  ++self.stats_.accept_err;
741  return;
742  }
743  listener_socket.detach();
744 
745  struct bufferevent* bev = ::bufferevent_socket_new(self.evloop_->handle(), client_socket.detach(), BEV_OPT_CLOSE_ON_FREE);
746  if (bev == NULL) {
747  self.logger.log(LOG_LEVEL_ALERT, "AsyncServer libevent bufferevent_socket_new() returned an error -- this shouldn't happen");
748  ++self.stats_.accept_err;
749  return;
750  }
751 
752  // Connection takes ownership of bev, and is freed by on_read() or on_error() (when connection is closed)
753  Connection* conn = new Connection(self, bev, ++self.last_id_);
754  ++self.stats_.active_connections;
755  if (!conn->enable()) {
756  delete conn; // failed to enable connection event handling
757  ++self.stats_.accept_err;
758  } else
759  ++self.stats_.accept_ok;
760  }
761 
762  static void on_read(struct bufferevent* bev, void* conn_ptr) {
763  Connection* conn = (Connection*)conn_ptr;
764  LoggerPtr<>& logger = conn->server.logger;
765  ++conn->server.stats_.reads;
766  conn->protocol_server.handler.buffers.attach(bev);
767 
768  String logstr;
769  AsyncBuffers* bufs = &conn->protocol_server.handler.buffers;
770  if (conn->read_fixed_size_ > 0) {
771  if (logger.check(LOG_LEVEL_DEBUG_LOW))
772  logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(72) << "AsyncServer connection " << conn->id << " fixed read: " << conn->read_fixed_size_);
773  for (;;) {
774  SubString data;
775  if (!bufs->read_fixed(data, conn->read_fixed_size_))
776  return; // wait for more data
777  conn->read_fixed_size_ = 0;
778  if (!conn->protocol_server.on_read_fixed(conn->read_fixed_size_, data, conn->deferred_context)) {
779  delete conn;
780  if (logger.check(LOG_LEVEL_DEBUG_LOW))
781  logger.log(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(72) << "AsyncServer connection " << conn->id << " on_read_fixed() returned false to close");
782  return;
783  }
784  bufs->read_flush();
785  if (conn->read_fixed_size_ <= 0)
786  break;
787  }
788  bufs->read_reset(ProtocolServer::MIN_INITIAL_READ, ProtocolServer::Handler::MAX_INITIAL_READ);
789  if (bufs->read_size() == 0)
790  return;
791  }
792 
793  if (logger.check(LOG_LEVEL_DEBUG_LOW))
794  logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(72) << "AsyncServer connection " << conn->id << " read: " << bufs->read_size());
795  if (!conn->protocol_server.on_read(conn->read_fixed_size_, *bufs, conn->deferred_context)) {
796  delete conn;
797  if (logger.check(LOG_LEVEL_DEBUG_LOW))
798  logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(72) << "AsyncServer connection " << conn->id << " on_read() returned false to close");
799  }
800  }
801 
802  static void on_error(struct bufferevent* bev, short error, void* conn_ptr) {
803  EVO_PARAM_UNUSED(bev);
804  Connection* conn = (Connection*)conn_ptr;
805  AsyncError err;
806  if (error & BEV_EVENT_EOF)
807  err = aeCLOSED;
808  else if (error & BEV_EVENT_TIMEOUT)
809  err = aeTIMEOUT;
810  else if (error & BEV_EVENT_READING)
811  err = aeIO_READ;
812  else if (error & BEV_EVENT_WRITING)
813  err = aeIO_WRITE;
814  else
815  err = aeIO;
816  ++conn->server.stats_.event_err;
817  conn->protocol_server.on_error(err);
818 
819  const ulong conn_id = conn->id;
820  LoggerPtr<>& logger = conn->server.logger;
821  delete conn;
822 
823  if (logger.check(LOG_LEVEL_DEBUG_LOW)) {
824  const SubString errmsg(async_error_msg(err));
825  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(48 + errmsg.size()) << "AsyncServer connection " << conn_id << " error: " << errmsg);
826  }
827  }
828 };
829 
831 
832 }
833 #endif
~ReplyBase()
Definition: ioasync_server.h:318
High-level debug message, used for showing debug info for higher-level behavior (DBUG) ...
Definition: logger.h:136
bool read_fixed(SubString &data, SizeT size, SizeT max_size=0)
Read fixed size data from read buffer.
Definition: ioasync_base.h:288
void read_reset(size_t max_size, size_t min_size=0)
Reset read buffer thresholds.
Definition: ioasync_base.h:370
void log_direct(LogLevel level, const SubString &msg)
Log a message with given log level directly without checking the current log level.
Definition: logger.h:423
Size size() const
Get size.
Definition: list.h:759
T Handler
Handler type.
Definition: ioasync_server.h:327
void send(ulong id, String &data)
Send response for given request ID.
Definition: ioasync_server.h:178
ResponseType
Handler response type – used with ResponseResult.
Definition: ioasync_server.h:375
bool on_init(AsyncBase &server, Global &global)
Called when server is initialized, before any connections are accepted.
Definition: ioasync_server.h:478
Stats()
Definition: ioasync_server.h:591
T & advItem(Key index)
Advanced: Get item (mutable).
Definition: list.h:2796
ResponseResult & operator=(ResultType newresult)
Assignment operator for ResponseResult – use for non rtNORMAL response.
Definition: ioasync_server.h:431
I/O timeout.
Definition: ioasync_base.h:459
void nosend(ulong id)
Cancel current ID since current request doesn't have a response.
Definition: ioasync_server.h:218
void on_uninit()
Called when server is shutting down, after last request has completed.
Definition: ioasync_server.h:485
Size insertnew(Key index, Size size=1)
Insert new items (modifier).
Definition: list.h:2154
Global & get_global()
Get reference to global data used by all requests and all threads in this server. ...
Definition: ioasync_server.h:610
AsyncServer< ProtocolServer > This
Definition: ioasync_server.h:579
bool runlocal()
Run the event-loop locally in current thread until all pending requests are handled (client only)...
Definition: ioasync_base.h:597
DeferredContextT(Handler &handler)
Constructor.
Definition: ioasync_server.h:333
I/O read error.
Definition: ioasync_base.h:457
ulong accept_err
Definition: ioasync_server.h:587
ulong id
Response ID to use for reply.
Definition: ioasync_server.h:312
I/O write error.
Definition: ioasync_base.h:458
Handler response result.
Definition: ioasync_server.h:389
AsyncBuffers buffers
Buffers for async I/O.
Definition: ioasync_server.h:496
void send_end()
End current response.
Definition: ioasync_server.h:204
Wraps a logger pointer that can reference a logger to use or be disabled.
Definition: logger.h:377
ResponseResult & operator=(ResponseType newtype)
Assignment operator for ResponseResult – use for non rtNORMAL response.
Definition: ioasync_server.h:421
ulong event_err
Definition: ioasync_server.h:588
AsyncServerReplyT(T &bufs)
Constructor.
Definition: ioasync_server.h:96
ulong gen_id()
Generate a new request ID.
Definition: ioasync_server.h:102
Deferred but not the last part of this response.
Definition: ioasync_server.h:32
const char * async_error_msg(AsyncError err)
Get error message for AsyncError code.
Definition: ioasync_base.h:467
bool endref()
Call to cleanup after deferred response is sent/finished.
Definition: ioasync_server.h:351
ulong deferred_active() const
Get current number of deferred responses in progress.
Definition: ioasync_server.h:109
Handle handle
Socket handle/descriptor.
Definition: sysio_sock.h:1353
Definition: ioasync_server.h:31
void deferred_start(U &context)
Call when deferred response is started.
Definition: ioasync_server.h:120
Socket closed by other side.
Definition: ioasync_base.h:455
ulong read_timeout_ms_
Socket read timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:615
AsyncServerReplyT AsyncServerReply
Handles sending server replies – see AsyncServerReplyT.
Definition: ioasync_server.h:275
LoggerPtr logger
Logger for protocol and debug messages, set to enable logging – see set_logger() ...
Definition: ioasync_base.h:489
int Handle
System socket handle.
Definition: sysio_sock.h:697
I/O unrecoverable error.
Definition: ioasync_base.h:456
Size size() const
Get size.
ProtocolServer::Handler::DeferredContext DeferredContext
Definition: ioasync_server.h:580
Handle detach()
Detach and return socket handle.
Definition: sysio_sock.h:1347
No error.
Definition: sys.h:1115
Holds data for async I/O buffers (used internally with AsyncServer and protocol implementations).
Definition: ioasync_base.h:134
ResponseResult(const ResponseResult &src)
Copy constructor.
Definition: ioasync_server.h:402
Low-level debug message, used for showing debug info for lower-level internal or library details (DBG...
Definition: logger.h:137
Error
General Evo error code stored in exceptions, or used directly when exceptions are disabled...
Definition: sys.h:1113
Deferred and the last part of this response.
Definition: ioasync_server.h:33
ResultType result
Normal response result – ignored unless type=rtNORMAL
Definition: ioasync_server.h:393
static TOut & errormsg_out(TOut &out, Error err)
Write detailed error message with errno to output stream/string.
Definition: sysio_sock.h:1118
T ProtocolServer
Definition: ioasync_server.h:578
Response writer used to group multiple writes together for best performance.
Definition: ioasync_server.h:43
const char * data() const
Get string pointer (const).
Definition: string.h:1533
String container.
Definition: string.h:674
Socket I/O device (used internally).
Definition: sysio_sock.h:307
bool log(LogLevel level, const SubString &msg)
Log a message with given severity level.
Definition: logger.h:428
DeferredContextT< T > Context
Alias for this context.
Definition: ioasync_server.h:297
AsyncServer()
Constructor.
Definition: ioasync_server.h:601
T ResultType
Result type for rtNORMAL response.
Definition: ioasync_server.h:390
Base class for Async I/O.
Definition: ioasync_base.h:487
bool deferred_end(U &context)
Call when deferred response is finished.
Definition: ioasync_server.h:134
Alert message for critical alert that needs immediate attention, program may be unstable (ALRT) ...
Definition: logger.h:132
Defer response while waiting for an event – an error if deferred response not supported under curren...
Definition: ioasync_server.h:377
Async I/O server for receiving and handling requests.
Definition: ioasync_server.h:576
AsyncError
Async I/O error type.
Definition: ioasync_base.h:452
ProtocolServer::Handler::Global Global
Definition: ioasync_server.h:581
Socket I/O stream.
Definition: iosock.h:734
ResponseResult & operator=(const ResponseResult &src)
Assignment operator to copy.
Definition: ioasync_server.h:441
ListType & addnew(Size size=1)
Append new items (modifier).
Definition: list.h:1996
bool run(IoSocket::Handle listener)
Run server event handling and handle connections until shutdown.
Definition: ioasync_server.h:622
T OutBuffer
Output buffer type.
Definition: ioasync_server.h:26
ResponseResult(ResponseType type)
Constructor for ResponseResult – use for non rtNORMAL response.
Definition: ioasync_server.h:408
LoggerType * ptr
Logger pointer, NULL to disable logging with this
Definition: logger.h:380
T * advLast()
Advanced: Get last item (modifier).
Definition: list.h:2814
ulong count() const
Get current pending deferred response count.
Definition: ioasync_server.h:339
ulong accept_ok
Definition: ioasync_server.h:586
bool detach()
Call when server connection is destroyed.
Definition: ioasync_server.h:365
Evo C++ Library namespace.
Definition: alg.h:11
ulong id
Request/reply ID, used by reply manager (set by parent protocol class)
Definition: ioasync_server.h:498
Definition: ioasync_server.h:584
ResponseResult(ResultType result)
Constructor for ResultType – use for rtNORMAL response.
Definition: ioasync_server.h:414
size_t read_size()
Get read buffer data size in bytes.
Definition: ioasync_base.h:273
Default shared data (empty) with template parameter for global data type.
Definition: ioasync_server.h:468
ProtocolServer::Handler::Shared Shared
Definition: ioasync_server.h:582
void read_flush()
Flush and consume next line from read buffer.
Definition: ioasync_base.h:413
void set_id()
Create and set new ID for current request/response.
Definition: ioasync_server.h:507
Handles sending server replies, and accounts for potentially out of order responses.
Definition: ioasync_server.h:24
Handler * handler
Pointer to handler for sending deferred reply, NULL when connection is destroyed. ...
Definition: ioasync_server.h:328
ReplyBase(Context &context, ulong id)
Definition: ioasync_server.h:314
Error set_nonblock(bool enable=true)
Enable/Disable non-blocking I/O.
Definition: sysio_sock.h:756
Normal response.
Definition: ioasync_server.h:376
ResponseResult()
Default constructor initializes as rtCLOSE – other constructors preferred.
Definition: ioasync_server.h:396
WriterFlags
Flags used with Writer.
Definition: ioasync_server.h:30
void shutdown()
Shut down server.
Definition: ioasync_server.h:675
ulong write_timeout_ms_
Socket write timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:616
Base class for deferred reply.
Definition: ioasync_server.h:310
ulong reads
Definition: ioasync_server.h:589
Writer(This &parent, ulong id, SizeT buf_size, WriterFlags flags=wfNONE)
Constructor.
Definition: ioasync_server.h:55
This & deferred_send(ulong id, String &data, bool last)
Send deferred response for given request ID.
Definition: ioasync_server.h:151
bool check(LogLevel level) const
Check whether a message with given level will actually be logged.
Definition: logger.h:418
Error message showing something isn't working as expected, program may be able to work around it (ERR...
Definition: logger.h:133
Context & context
Reference to context used for reply.
Definition: ioasync_server.h:311
bool run(Socket &listener)
Run server and handle connections until shutdown.
Definition: ioasync_server.h:667
Response already sent so request is handled, use if error was sent.
Definition: ioasync_server.h:378
String & set()
Set as null and empty.
Definition: string.h:995
IoSocket & device()
Access low-level I/O device for socket.
Definition: iosock.h:764
AsyncServerReply reply
Server reply manager, used to track deferred events and queue out of order replies.
Definition: ioasync_server.h:497
Default shared data (empty) using default global data type.
Definition: ioasync_server.h:493
ResponseType type
Response type – see ResponseType.
Definition: ioasync_server.h:392
AsyncServerHandler()
Constructor.
Definition: ioasync_server.h:501
uint32 SizeT
Default Evo container size type.
Definition: sys.h:729
Reference and access existing string data.
Definition: substring.h:229
void addref()
Call when a deferred response is started.
Definition: ioasync_server.h:344
T Global
Global data type used.
Definition: ioasync_server.h:469
AsyncServerReplyT< T > This
This type.
Definition: ioasync_server.h:27
Base async I/O server handler.
Definition: ioasync_server.h:282
Holds a context for deferred responses in progress.
Definition: ioasync_server.h:296
Default global data (empty).
Definition: ioasync_server.h:458
#define EVO_PARAM_UNUSED(NAME)
Mark function parameter as unused to suppress "unreferenced parameter" compiler warnings on it...
Definition: sys.h:427
ulong active_connections
Definition: ioasync_server.h:585
Evo AsyncBase.
bool finished
Whether deferred response is finished.
Definition: ioasync_server.h:324
String & reserve(Size size, bool prefer_realloc=false)
Reserve capacity for additional items (modifier).
Definition: string.h:5027