8 #ifndef INCL_evo_ioasync_server_h 9 #define INCL_evo_ioasync_server_h 23 template<
class T=AsyncBuffers>
43 struct Writer :
public OutBuffer::BulkWrite {
44 using OutBuffer::BulkWrite::init;
59 if (
id == parent.next_id_) {
61 init(parent.buf_, buf_size);
67 parent.deferred_get_queue_item(rsp,
id);
68 init(rsp->data, buf_size);
70 }
else if (
id == parent.prev_id_) {
72 if (parent.prev_ == NULL)
73 init(parent.buf_, buf_size);
75 init(parent.prev_->data, buf_size);
79 if (
id == parent.next_id_) {
80 init(parent.buf_, buf_size);
85 parent.prev_->id = id;
86 init(parent.prev_->data, buf_size);
96 AsyncServerReplyT(T& bufs) : buf_(bufs), deferred_count_(0), gen_id_(1), next_id_(1), prev_id_(0), prev_(NULL) {
110 return deferred_count_;
137 return context.endref();
153 if (
id == next_id_) {
155 buf_.write(data.
data(), data.
size());
161 if (deferred_get_queue_item(rsp,
id))
179 if (
id == prev_id_) {
182 buf_.write(data.
data(), data.
size());
184 prev_->data.add(data);
188 if (
id == next_id_) {
189 buf_.write(data.
data(), data.
size());
192 prev_ = queue_.addnew().advLast();
206 while ((rsp = queue_.advFirst()) != NULL && rsp->id == next_id_) {
207 buf_.write(rsp->data.data(), rsp->data.size());
219 if (
id + 1 == gen_id_)
226 This& operator=(
const This&);
233 ReplyItem() : id(0) {
235 ReplyItem(
const ReplyItem& src) : id(src.id), data(src.data) {
237 ReplyItem& operator=(
const ReplyItem& src) {
245 ulong deferred_count_;
252 bool deferred_get_queue_item(ReplyItem*& rsp, ulong
id) {
254 if (sz > 0 &&
id > queue_[sz-1].
id) {
257 for (; i < sz; ++i) {
314 ReplyBase(Context& context, ulong
id) : context(context), id(id), finished(false) {
315 context.
handler->reply.deferred_start(context);
319 if (!finished && context.
handler != NULL)
352 if (--refcount == 0) {
353 assert( handler == NULL );
453 static const size_t MAX_INITIAL_READ = 8192;
467 template<
class T=Global>
581 typedef typename ProtocolServer::Handler::Global
Global;
582 typedef typename ProtocolServer::Handler::Shared
Shared;
592 active_connections = 0;
628 String msg(
"AsyncServer listener error setting as non-blocking: ");
637 struct event* ev = ::event_new(evloop_->handle(), listener, EV_READ | EV_PERSIST, on_listener_ready,
this);
639 logger.log(
LOG_LEVEL_ALERT,
"AsyncServer libevent event_new() failed on listener -- this shouldn't happen");
643 if (::event_add(ev, NULL) != 0) {
644 logger.log(
LOG_LEVEL_ALERT,
"AsyncServer libevent event_add() failed on listener -- this shouldn't happen");
648 if (!shared_.on_init(*
this, global_)) {
649 logger.log(
LOG_LEVEL_ALERT,
"AsyncServer Shared on_init() returned an error, indicating a bad configuration");
689 DeferredContext* deferred_context;
690 ProtocolServer protocol_server;
691 struct bufferevent* bev;
692 SizeT read_fixed_size_;
695 Connection(This& async_server,
struct bufferevent* bev, ulong
id) :
696 server(async_server), protocol_server(async_server.global_, async_server.shared_, async_server.
logger.
ptr), bev(bev), read_fixed_size_(0), id(
id) {
697 deferred_context =
new DeferredContext(protocol_server.handler);
699 ::bufferevent_setcb(bev, on_read, NULL, on_error,
this);
700 ::bufferevent_setwatermark(bev, EV_READ, T::MIN_INITIAL_READ, T::Handler::MAX_INITIAL_READ);
702 struct timeval read_timeout, write_timeout;
703 const int result = ::bufferevent_set_timeouts(bev,
708 server.
logger.
log(
LOG_LEVEL_ERROR,
"AsyncServer libevent bufferevent_set_timeouts() returned an error -- this shouldn't happen");
713 ::bufferevent_free(bev);
715 if (!deferred_context->detach())
720 if (::bufferevent_enable(bev, EV_READ | EV_WRITE) != 0) {
721 server.
logger.
log(
LOG_LEVEL_ALERT,
"AsyncServer libevent bufferevent_enable() returned an error -- this shouldn't happen");
728 static void on_listener_ready(evutil_socket_t listener,
short event,
void* self_ptr) {
730 This&
self = *(This*)self_ptr;
732 IoSocket listener_socket(listener), client_socket;
733 if (!listener_socket.accept_nonblock(err, client_socket)) {
736 msg =
"AsyncServer socket accept failed: ";
740 ++
self.stats_.accept_err;
743 listener_socket.detach();
745 struct bufferevent* bev = ::bufferevent_socket_new(
self.evloop_->handle(), client_socket.
detach(), BEV_OPT_CLOSE_ON_FREE);
747 self.logger.log(
LOG_LEVEL_ALERT,
"AsyncServer libevent bufferevent_socket_new() returned an error -- this shouldn't happen");
748 ++
self.stats_.accept_err;
753 Connection* conn =
new Connection(
self, bev, ++
self.last_id_);
754 ++
self.stats_.active_connections;
755 if (!conn->enable()) {
757 ++
self.stats_.accept_err;
759 ++
self.stats_.accept_ok;
762 static void on_read(
struct bufferevent* bev,
void* conn_ptr) {
763 Connection* conn = (Connection*)conn_ptr;
765 ++conn->server.stats_.reads;
766 conn->protocol_server.handler.buffers.attach(bev);
769 AsyncBuffers* bufs = &conn->protocol_server.handler.buffers;
770 if (conn->read_fixed_size_ > 0) {
775 if (!bufs->
read_fixed(data, conn->read_fixed_size_))
777 conn->read_fixed_size_ = 0;
778 if (!conn->protocol_server.on_read_fixed(conn->read_fixed_size_, data, conn->deferred_context)) {
785 if (conn->read_fixed_size_ <= 0)
788 bufs->
read_reset(ProtocolServer::MIN_INITIAL_READ, ProtocolServer::Handler::MAX_INITIAL_READ);
795 if (!conn->protocol_server.on_read(conn->read_fixed_size_, *bufs, conn->deferred_context)) {
802 static void on_error(
struct bufferevent* bev,
short error,
void* conn_ptr) {
804 Connection* conn = (Connection*)conn_ptr;
806 if (error & BEV_EVENT_EOF)
808 else if (error & BEV_EVENT_TIMEOUT)
810 else if (error & BEV_EVENT_READING)
812 else if (error & BEV_EVENT_WRITING)
816 ++conn->server.stats_.event_err;
817 conn->protocol_server.on_error(err);
819 const ulong conn_id = conn->id;
~ReplyBase()
Definition: ioasync_server.h:318
bool read_fixed(SubString &data, SizeT size, SizeT max_size=0)
Read fixed size data from read buffer.
Definition: ioasync_base.h:288
void read_reset(size_t max_size, size_t min_size=0)
Reset read buffer thresholds.
Definition: ioasync_base.h:370
void log_direct(LogLevel level, const SubString &msg)
Log a message with given log level directly without checking the current log level.
Definition: logger.h:423
Size size() const
Get size.
Definition: list.h:759
T Handler
Handler type.
Definition: ioasync_server.h:327
void send(ulong id, String &data)
Send response for given request ID.
Definition: ioasync_server.h:178
ResponseType
Handler response type – used with ResponseResult.
Definition: ioasync_server.h:375
bool on_init(AsyncBase &server, Global &global)
Called when server is initialized, before any connections are accepted.
Definition: ioasync_server.h:478
Stats()
Definition: ioasync_server.h:591
T & advItem(Key index)
Advanced: Get item (mutable).
Definition: list.h:2796
ResponseResult & operator=(ResultType newresult)
Assignment operator for ResponseResult – use for non rtNORMAL response.
Definition: ioasync_server.h:431
I/O timeout.
Definition: ioasync_base.h:459
void nosend(ulong id)
Cancel current ID since current request doesn't have a response.
Definition: ioasync_server.h:218
void on_uninit()
Called when server is shutting down, after last request has completed.
Definition: ioasync_server.h:485
Size insertnew(Key index, Size size=1)
Insert new items (modifier).
Definition: list.h:2154
Global & get_global()
Get reference to global data used by all requests and all threads in this server. ...
Definition: ioasync_server.h:610
AsyncServer< ProtocolServer > This
Definition: ioasync_server.h:579
bool runlocal()
Run the event-loop locally in current thread until all pending requests are handled (client only)...
Definition: ioasync_base.h:597
DeferredContextT(Handler &handler)
Constructor.
Definition: ioasync_server.h:333
I/O read error.
Definition: ioasync_base.h:457
ulong accept_err
Definition: ioasync_server.h:587
ulong id
Response ID to use for reply.
Definition: ioasync_server.h:312
I/O write error.
Definition: ioasync_base.h:458
Handler response result.
Definition: ioasync_server.h:389
AsyncBuffers buffers
Buffers for async I/O.
Definition: ioasync_server.h:496
void send_end()
End current response.
Definition: ioasync_server.h:204
Wraps a logger pointer that can reference a logger to use or be disabled.
Definition: logger.h:377
ResponseResult & operator=(ResponseType newtype)
Assignment operator for ResponseResult – use for non rtNORMAL response.
Definition: ioasync_server.h:421
ulong event_err
Definition: ioasync_server.h:588
AsyncServerReplyT(T &bufs)
Constructor.
Definition: ioasync_server.h:96
ulong gen_id()
Generate a new request ID.
Definition: ioasync_server.h:102
Deferred but not the last part of this response.
Definition: ioasync_server.h:32
const char * async_error_msg(AsyncError err)
Get error message for AsyncError code.
Definition: ioasync_base.h:467
bool endref()
Call to cleanup after deferred response is sent/finished.
Definition: ioasync_server.h:351
ulong deferred_active() const
Get current number of deferred responses in progress.
Definition: ioasync_server.h:109
Handle handle
Socket handle/descriptor.
Definition: sysio_sock.h:1353
Definition: ioasync_server.h:31
void deferred_start(U &context)
Call when deferred response is started.
Definition: ioasync_server.h:120
Socket closed by other side.
Definition: ioasync_base.h:455
ulong read_timeout_ms_
Socket read timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:615
AsyncServerReplyT AsyncServerReply
Handles sending server replies – see AsyncServerReplyT.
Definition: ioasync_server.h:275
LoggerPtr logger
Logger for protocol and debug messages, set to enable logging – see set_logger() ...
Definition: ioasync_base.h:489
int Handle
System socket handle.
Definition: sysio_sock.h:697
I/O unrecoverable error.
Definition: ioasync_base.h:456
Size size() const
Get size.
ProtocolServer::Handler::DeferredContext DeferredContext
Definition: ioasync_server.h:580
Handle detach()
Detach and return socket handle.
Definition: sysio_sock.h:1347
No error.
Definition: sys.h:1115
Holds data for async I/O buffers (used internally with AsyncServer and protocol implementations).
Definition: ioasync_base.h:134
ResponseResult(const ResponseResult &src)
Copy constructor.
Definition: ioasync_server.h:402
Error
General Evo error code stored in exceptions, or used directly when exceptions are disabled...
Definition: sys.h:1113
Deferred and the last part of this response.
Definition: ioasync_server.h:33
ResultType result
Normal response result – ignored unless type=rtNORMAL
Definition: ioasync_server.h:393
static TOut & errormsg_out(TOut &out, Error err)
Write detailed error message with errno to output stream/string.
Definition: sysio_sock.h:1118
T ProtocolServer
Definition: ioasync_server.h:578
Response writer used to group multiple writes together for best performance.
Definition: ioasync_server.h:43
const char * data() const
Get string pointer (const).
Definition: string.h:1533
String container.
Definition: string.h:674
Socket I/O device (used internally).
Definition: sysio_sock.h:307
bool log(LogLevel level, const SubString &msg)
Log a message with given severity level.
Definition: logger.h:428
DeferredContextT< T > Context
Alias for this context.
Definition: ioasync_server.h:297
AsyncServer()
Constructor.
Definition: ioasync_server.h:601
T ResultType
Result type for rtNORMAL response.
Definition: ioasync_server.h:390
Base class for Async I/O.
Definition: ioasync_base.h:487
bool deferred_end(U &context)
Call when deferred response is finished.
Definition: ioasync_server.h:134
Defer response while waiting for an event – an error if deferred response not supported under curren...
Definition: ioasync_server.h:377
Async I/O server for receiving and handling requests.
Definition: ioasync_server.h:576
AsyncError
Async I/O error type.
Definition: ioasync_base.h:452
ProtocolServer::Handler::Global Global
Definition: ioasync_server.h:581
Socket I/O stream.
Definition: iosock.h:734
ResponseResult & operator=(const ResponseResult &src)
Assignment operator to copy.
Definition: ioasync_server.h:441
ListType & addnew(Size size=1)
Append new items (modifier).
Definition: list.h:1996
bool run(IoSocket::Handle listener)
Run server event handling and handle connections until shutdown.
Definition: ioasync_server.h:622
T OutBuffer
Output buffer type.
Definition: ioasync_server.h:26
ResponseResult(ResponseType type)
Constructor for ResponseResult – use for non rtNORMAL response.
Definition: ioasync_server.h:408
LoggerType * ptr
Logger pointer, NULL to disable logging with this
Definition: logger.h:380
T * advLast()
Advanced: Get last item (modifier).
Definition: list.h:2814
ulong count() const
Get current pending deferred response count.
Definition: ioasync_server.h:339
ulong accept_ok
Definition: ioasync_server.h:586
bool detach()
Call when server connection is destroyed.
Definition: ioasync_server.h:365
Evo C++ Library namespace.
Definition: alg.h:11
ulong id
Request/reply ID, used by reply manager (set by parent protocol class)
Definition: ioasync_server.h:498
Definition: ioasync_server.h:584
ResponseResult(ResultType result)
Constructor for ResultType – use for rtNORMAL response.
Definition: ioasync_server.h:414
size_t read_size()
Get read buffer data size in bytes.
Definition: ioasync_base.h:273
Default shared data (empty) with template parameter for global data type.
Definition: ioasync_server.h:468
ProtocolServer::Handler::Shared Shared
Definition: ioasync_server.h:582
void read_flush()
Flush and consume next line from read buffer.
Definition: ioasync_base.h:413
void set_id()
Create and set new ID for current request/response.
Definition: ioasync_server.h:507
Handles sending server replies, and accounts for potentially out of order responses.
Definition: ioasync_server.h:24
Handler * handler
Pointer to handler for sending deferred reply, NULL when connection is destroyed. ...
Definition: ioasync_server.h:328
ReplyBase(Context &context, ulong id)
Definition: ioasync_server.h:314
Error set_nonblock(bool enable=true)
Enable/Disable non-blocking I/O.
Definition: sysio_sock.h:756
Normal response.
Definition: ioasync_server.h:376
ResponseResult()
Default constructor initializes as rtCLOSE – other constructors preferred.
Definition: ioasync_server.h:396
WriterFlags
Flags used with Writer.
Definition: ioasync_server.h:30
void shutdown()
Shut down server.
Definition: ioasync_server.h:675
ulong write_timeout_ms_
Socket write timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:616
Base class for deferred reply.
Definition: ioasync_server.h:310
ulong reads
Definition: ioasync_server.h:589
Writer(This &parent, ulong id, SizeT buf_size, WriterFlags flags=wfNONE)
Constructor.
Definition: ioasync_server.h:55
This & deferred_send(ulong id, String &data, bool last)
Send deferred response for given request ID.
Definition: ioasync_server.h:151
bool check(LogLevel level) const
Check whether a message with given level will actually be logged.
Definition: logger.h:418
Context & context
Reference to context used for reply.
Definition: ioasync_server.h:311
bool run(Socket &listener)
Run server and handle connections until shutdown.
Definition: ioasync_server.h:667
Response already sent so request is handled, use if error was sent.
Definition: ioasync_server.h:378
String & set()
Set as null and empty.
Definition: string.h:995
IoSocket & device()
Access low-level I/O device for socket.
Definition: iosock.h:764
AsyncServerReply reply
Server reply manager, used to track deferred events and queue out of order replies.
Definition: ioasync_server.h:497
Default shared data (empty) using default global data type.
Definition: ioasync_server.h:493
ResponseType type
Response type – see ResponseType.
Definition: ioasync_server.h:392
AsyncServerHandler()
Constructor.
Definition: ioasync_server.h:501
uint32 SizeT
Default Evo container size type.
Definition: sys.h:729
Reference and access existing string data.
Definition: substring.h:229
void addref()
Call when a deferred response is started.
Definition: ioasync_server.h:344
T Global
Global data type used.
Definition: ioasync_server.h:469
AsyncServerReplyT< T > This
This type.
Definition: ioasync_server.h:27
Base async I/O server handler.
Definition: ioasync_server.h:282
Holds a context for deferred responses in progress.
Definition: ioasync_server.h:296
Default global data (empty).
Definition: ioasync_server.h:458
#define EVO_PARAM_UNUSED(NAME)
Mark function parameter as unused to suppress "unreferenced parameter" compiler warnings on it...
Definition: sys.h:427
ulong active_connections
Definition: ioasync_server.h:585
bool finished
Whether deferred response is finished.
Definition: ioasync_server.h:324
String & reserve(Size size, bool prefer_realloc=false)
Reserve capacity for additional items (modifier).
Definition: string.h:5027