8 #ifndef INCL_evo_ioasync_client_h 9 #define INCL_evo_ioasync_client_h 27 template<
class T,
class Q>
67 AsyncClient(
SizeT max_queue_size,
SizeT max_read_size) :
queue_(max_queue_size), id_(get_next_id()), state_(
sNONE), bev_(NULL), on_connect_(NULL), on_error_(NULL), max_read_size_(max_read_size) {
104 if (state_ >
sNONE) {
106 ::bufferevent_free(bev_);
110 ((ProtocolHandler&)*
this).on_close();
137 bool connect_ip(
const char* host, ushort port,
int family=AF_INET) {
142 assert( address_info.
ptr->ai_addrlen > 0 && (
size_t)address_info.
ptr->ai_addrlen <= (
size_t)
Int::MAX );
143 if (connect_new(address_info.
ptr->ai_addr, (
int)address_info.
ptr->ai_addrlen)) {
204 init(parent.bufs_, buf_size);
224 struct bufferevent* bev_;
226 OnConnect* on_connect_;
228 SizeT read_fixed_size_;
229 SizeT max_read_size_;
231 bool check_client_active() {
235 bool connect_new(
struct sockaddr* addr,
int addr_len) {
237 bev_ = ::bufferevent_socket_new(
evloop_->
handle(), -1, BEV_OPT_CLOSE_ON_FREE);
238 ::bufferevent_setcb(bev_, on_read, NULL, on_event,
this);
239 ::bufferevent_setwatermark(bev_, EV_READ, ProtocolHandler::MIN_INITIAL_READ, max_read_size_);
241 struct timeval read_timeout, write_timeout;
242 ::bufferevent_set_timeouts(bev_,
248 read_fixed_size_ = 0;
249 if (::bufferevent_enable(bev_, EV_READ | EV_WRITE) == 0) {
251 if (::bufferevent_socket_connect(bev_, addr, addr_len) == 0) {
262 static void on_read(
struct bufferevent* bev,
void* self_handler) {
265 ProtocolHandler&
self = *(ProtocolHandler*)self_handler;
268 if (
self.read_fixed_size_ > 0) {
273 if (!bufs.
read_fixed(data,
self.read_fixed_size_))
275 self.read_fixed_size_ = 0;
276 if (!
self.on_read_fixed(
self.read_fixed_size_, data, NULL)) {
283 if (
self.read_fixed_size_ <= 0)
286 bufs.
read_reset(ProtocolHandler::MIN_INITIAL_READ,
self.max_read_size_);
292 if (!
self.on_read(
self.read_fixed_size_, bufs, NULL)) {
300 static void on_event(
struct bufferevent* bev,
short events,
void* self_ptr) {
303 This&
self = *(This*)self_ptr;
304 if (events & BEV_EVENT_CONNECTED &&
self.state_ ==
sCONNECTING) {
307 ((ProtocolHandler&)
self).on_connect();
308 if (
self.on_connect_ != NULL)
309 self.on_connect_->on_connect();
312 if (events & BEV_EVENT_EOF)
314 else if (events & BEV_EVENT_TIMEOUT)
316 else if (events & BEV_EVENT_READING)
318 else if (events & BEV_EVENT_WRITING)
320 else if (events & BEV_EVENT_CONNECTED) {
321 self.logger.log(
LOG_LEVEL_ERROR, logstr.
set().
reserve(64) <<
"AsyncClient " <<
self.id_ <<
" error: Unexpected 'connected' event");
327 self.logger.log_direct(
LOG_LEVEL_ERROR, logstr.
set().
reserve(42 + errmsg.size()) <<
"AsyncClient " <<
self.id_ <<
" error: " << errmsg <<
" (code: " << (
int)err <<
')');
331 ((ProtocolHandler&)
self).on_error(err);
332 if (
self.on_error_ != NULL)
333 self.on_error_->on_error(err);
337 static ulong get_next_id() {
Error convert(const char *host)
Convert host address to one or more socket addresses.
Definition: sysio_sock.h:258
bool read_fixed(SubString &data, SizeT size, SizeT max_size=0)
Read fixed size data from read buffer.
Definition: ioasync_base.h:288
size_t write_size() const
Definition: ioasync_base.h:421
Template class for an async I/O client.
Definition: ioasync_client.h:28
void read_reset(size_t max_size, size_t min_size=0)
Reset read buffer thresholds.
Definition: ioasync_base.h:370
void log_direct(LogLevel level, const SubString &msg)
Log a message with given log level directly without checking the current log level.
Definition: logger.h:423
~RequestWriter()
Destructor writes and/or queues the request data (as applicable).
Definition: ioasync_client.h:208
Q QueueItem
Response queue item type.
Definition: ioasync_client.h:31
Atomic integer type.
Definition: atomic.h:311
I/O timeout.
Definition: ioasync_base.h:459
void close()
Close connection.
Definition: ioasync_client.h:103
No connection.
Definition: ioasync_client.h:58
I/O read error.
Definition: ioasync_base.h:457
Request data for prequeue.
Definition: ioasync_client.h:161
void attach_read()
Attach to current write buffers for reading too (used internally).
Definition: ioasync_base.h:266
This & set_on_connect(OnConnect *cb)
Set general handler to call when a connection is established.
Definition: ioasync_client.h:120
I/O write error.
Definition: ioasync_base.h:458
Connection in progress.
Definition: ioasync_client.h:59
void init_attach(AsyncBase &parent)
Initialize and attach to a parent event-loop.
Definition: ioasync_base.h:633
#define EVO_ONCPP11(EXPR)
Compile EXPR only if C++11 support is detected, otherwise this is a no-op.
Definition: sys.h:259
PreQueueItem()
Constructor.
Definition: ioasync_client.h:166
void init()
Initialize event-loop.
Definition: ioasync_base.h:622
ulong get_id() const
Get current client ID.
Definition: ioasync_client.h:78
const char * async_error_msg(AsyncError err)
Get error message for AsyncError code.
Definition: ioasync_base.h:467
PreQueueItem & operator=(const PreQueueItem &src)
Assignment operator.
Definition: ioasync_client.h:179
static const T MAX
Maximum integer value.
Definition: type.h:996
AsyncClient(SizeT max_queue_size, SizeT max_read_size)
Constructor.
Definition: ioasync_client.h:67
QueueItem item
Request response data for main queue – this is added to the main queue when the output data is written...
Definition: ioasync_client.h:163
AtomicBufferQueue< QueueItem > queue_
Queue where each item represents an expected response from server.
Definition: ioasync_client.h:214
Socket closed by other side.
Definition: ioasync_base.h:455
ulong read_timeout_ms_
Socket read timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:615
Client connected event.
Definition: ioasync_client.h:35
LoggerPtr logger
Logger for protocol and debug messages, set to enable logging – see set_logger() ...
Definition: ioasync_base.h:489
I/O unrecoverable error.
Definition: ioasync_base.h:456
Parent & parent
Parent AsyncClient for request.
Definition: ioasync_client.h:196
Size size() const
Get size.
struct addrinfo * ptr
Pointer to first address in resolve results.
Definition: sysio_sock.h:127
Client error event.
Definition: ioasync_client.h:44
No error.
Definition: sys.h:1115
void attach_write(struct bufferevent *bev)
Attach to active buffers for writing (used internally).
Definition: ioasync_base.h:260
Holds data for async I/O buffers (used internally with AsyncServer and protocol implementations).
Definition: ioasync_base.h:134
Error
General Evo error code stored in exceptions, or used directly when exceptions are disabled...
Definition: sys.h:1113
virtual void on_error(AsyncError error)
Called on an error that breaks the connection.
Definition: ioasync_client.h:51
String container.
Definition: string.h:674
bool log(LogLevel level, const SubString &msg)
Log a message with given severity level.
Definition: logger.h:428
Base class for Async I/O.
Definition: ioasync_base.h:487
T ProtocolHandler
Derived protocol handler type (must inherit from AsyncClient)
Definition: ioasync_client.h:30
bool connect_ip(const char *host, ushort port, int family=AF_INET)
Start IP connection.
Definition: ioasync_client.h:137
AsyncError
Async I/O error type.
Definition: ioasync_base.h:452
Used by the protocol implementation to write a request to an AsyncClient.
Definition: ioasync_client.h:193
virtual ~OnConnect()
Destructor.
Definition: ioasync_client.h:37
String buf
Request output data buffer – only used if not writing directly to socket.
Definition: ioasync_client.h:162
void clear()
Clear all items from queue, making it empty.
Definition: atomic_buffer_queue.h:125
This & attach_to(AsyncBase &parent)
Attach to a parent AsyncClient or AsyncServer and use the same event-loop as the parent.
Definition: ioasync_client.h:95
virtual ~OnError()
Destructor.
Definition: ioasync_client.h:46
Use to group multiple writes for efficiency.
Definition: ioasync_base.h:142
Evo C++ Library namespace.
Definition: alg.h:11
State get_state() const
Get current state.
Definition: ioasync_client.h:85
size_t read_size()
Get read buffer data size in bytes.
Definition: ioasync_base.h:273
void read_flush()
Flush and consume next line from read buffer.
Definition: ioasync_base.h:413
Handle handle()
Get event loop handle.
Definition: ioasync_base.h:71
Resolves socket name/address to socket address info.
Definition: sysio_sock.h:125
virtual void on_connect()
Called when client is connected.
Definition: ioasync_client.h:40
RequestWriter(Parent &parent, size_t buf_size)
Constructor sets up request writer for parent AsyncClient.
Definition: ioasync_client.h:203
ulong write_timeout_ms_
Socket write timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:616
AsyncClient< T, Q > This
This type
Definition: ioasync_client.h:32
bool check(LogLevel level) const
Check whether a message with given level will actually be logged.
Definition: logger.h:418
Connection refused (clients only)
Definition: ioasync_base.h:454
PreQueueItem pq
Request data for prequeue: write buffer, main queue response data.
Definition: ioasync_client.h:197
String & set()
Set as null and empty.
Definition: string.h:995
void add(typename DataCopy< Item >::PassType item)
Add item to queue.
Definition: atomic_buffer_queue.h:141
State
Client state.
Definition: ioasync_client.h:57
uint32 SizeT
Default Evo container size type.
Definition: sys.h:729
This Parent
Parent AsyncClient type.
Definition: ioasync_client.h:194
Reference and access existing string data.
Definition: substring.h:229
This & set_on_error(OnError *cb)
Set general handler to call when an unexpected error occurs.
Definition: ioasync_client.h:129
Connected.
Definition: ioasync_client.h:60
AsyncEventLoop * evloop_
Event loop pointer, either owned by this or a parent.
Definition: ioasync_base.h:614
static struct timeval * get_timeout_ptr(struct timeval &out, ulong ms)
Get timeval struct pointer from timeout in milliseconds.
Definition: ioasync_base.h:691
PreQueueItem(const PreQueueItem &src)
Copy constructor.
Definition: ioasync_client.h:172
#define EVO_PARAM_UNUSED(NAME)
Mark function parameter as unused to suppress "unreferenced parameter" compiler warnings on it...
Definition: sys.h:427
bool empty() const
Get whether queue is empty.
Definition: atomic_buffer_queue.h:109
String & reserve(Size size, bool prefer_realloc=false)
Reserve capacity for additional items (modifier).
Definition: string.h:5027
~AsyncClient()
Destructor.
Definition: ioasync_client.h:71