Evo C++ Library v0.5.1
ioasync_client.h
Go to the documentation of this file.
1 // Evo C++ Library
2 /* Copyright 2019 Justin Crowell
3 Distributed under the BSD 2-Clause License -- see included file LICENSE.txt for details.
4 */
6 
7 #pragma once
8 #ifndef INCL_evo_ioasync_client_h
9 #define INCL_evo_ioasync_client_h
10 
11 #include "ioasync_base.h"
12 
13 namespace evo {
16 
18 
27 template<class T, class Q>
28 class AsyncClient : public AsyncBase {
29 public:
30  typedef T ProtocolHandler;
31  typedef Q QueueItem;
33 
/** Callback interface notified once the client connection is established.
 - Derive from this, override on_connect(), and register the object with set_on_connect()
*/
struct OnConnect {
    /** Virtual destructor so implementations can be destroyed through this interface. */
    virtual ~OnConnect()
        { }

    /** Connected notification -- the default implementation does nothing. */
    virtual void on_connect()
        { }
};
42 
44  struct OnError {
46  virtual ~OnError() { }
47 
51  virtual void on_error(AsyncError error) {
52  EVO_PARAM_UNUSED(error);
53  }
54  };
55 
57  enum State {
58  sNONE = 0,
61  };
62 
67  AsyncClient(SizeT max_queue_size, SizeT max_read_size) : queue_(max_queue_size), id_(get_next_id()), state_(sNONE), bev_(NULL), on_connect_(NULL), on_error_(NULL), max_read_size_(max_read_size) {
68  }
69 
72  close();
73  }
74 
78  ulong get_id() const {
79  return id_;
80  }
81 
85  State get_state() const {
86  return state_;
87  }
88 
95  This& attach_to(AsyncBase& parent) {
96  init_attach(parent);
97  return *this;
98  }
99 
/** Close connection.
 - This is a no-op unless the state is past sNONE, i.e. a connection is in progress or established
 - Otherwise it resets the buffers, frees the bufferevent, returns to state sNONE, clears the response queue, and then notifies the protocol handler via on_close()
 - NOTE(review): source line 111 is not present in this listing (it sits between the on_close() call and the log call) -- confirm against the original file
*/
103  void close() {
104  if (state_ > sNONE) {
105  bufs_.reset();
106  ::bufferevent_free(bev_);
// Clear the handle right away so nothing can touch the freed bufferevent
107  bev_ = NULL;
108  state_ = sNONE;
109  queue_.clear();
// State is already sNONE when the handler is notified
110  ((ProtocolHandler&)*this).on_close();
112  logger.log(LOG_LEVEL_DEBUG_LOW, String().reserve(32) << "AsyncClient " << id_ << " closed");
113  }
114  }
115 
120  This& set_on_connect(OnConnect* cb) {
121  on_connect_ = cb;
122  return *this;
123  }
124 
129  This& set_on_error(OnError* cb) {
130  on_error_ = cb;
131  return *this;
132  }
133 
/** Start IP connection.
 - Any current connection is closed first (close() is called unconditionally)
 - The host and port are converted to a socket address, then connect_new() starts the connection attempt
 - A true result only means the attempt was started: connect_new() leaves the state at sCONNECTING, and the outcome arrives later through the event callback
 - NOTE(review): source lines 144 and 150 are not present in this listing (the brace structure implies line 144 opens a block) -- confirm against the original file
 .
 \param  host    Host address to convert -- presumably a numeric IP address given the method name; verify against SocketAddressInfo::convert()
 \param  port    Port to connect to
 \param  family  Socket address family passed to SocketAddressInfo, default: AF_INET
 \return         Whether the connection attempt was started, false on bad host or connect_new() failure
*/
137  bool connect_ip(const char* host, ushort port, int family=AF_INET) {
138  close();
139  SocketAddressInfo address_info(family);
140  Error err = address_info.convert(host, port);
141  if (err == ENone) {
// The address length is narrowed to int below, so verify it fits
142  assert( address_info.ptr->ai_addrlen > 0 && (size_t)address_info.ptr->ai_addrlen <= (size_t)Int::MAX );
143  if (connect_new(address_info.ptr->ai_addr, (int)address_info.ptr->ai_addrlen)) {
145  const SubString host_str(host);
146  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(50 + host_str.size()) << "AsyncClient " << id_ << " connect_ip: '" << host_str << "' port " << port);
147  }
148  return true;
149  }
151  logger.log_direct(LOG_LEVEL_ERROR, String().reserve(44) << "AsyncClient " << id_ << " connect_new() failed");
// Address conversion failed -- only build the message if it will actually be logged
152  } else if (logger.check(LOG_LEVEL_ERROR)) {
153  const SubString host_str(host);
154  logger.log_direct(LOG_LEVEL_ERROR, String().reserve(58 + host_str.size()) << "AsyncClient " << id_ << " connect_ip() failed on bad host: '" << host_str << "'");
155  }
156  return false;
157  }
158 
159 protected:
161  struct PreQueueItem {
163  QueueItem item;
164 
167  }
168 
172  PreQueueItem(const PreQueueItem& src) : buf(src.buf), item(src.item) {
173  }
174 
180  buf = src.buf;
181  item = src.item;
182  return *this;
183  }
184  };
185 
194  typedef This Parent;
195 
196  Parent& parent;
198 
203  RequestWriter(Parent& parent, size_t buf_size) : parent(parent) {
204  init(parent.bufs_, buf_size);
205  }
206 
209  if (!pq.item.null())
210  parent.queue_.add(pq.item);
211  }
212  };
213 
215 
216 private:
217  // Disable copy constructor
// Declared private; additionally deleted when EVO_ONCPP11 detects C++11 support
218  AsyncClient(const This&) EVO_ONCPP11(= delete);
219 
// Read/write buffers for the connection: attached in connect_new(), reset in close()
220  AsyncBuffers bufs_;
221 
// Client ID from get_next_id(), assigned in the constructor and used in log messages
222  ulong id_;
// Connection state: sNONE until connect_new() succeeds, updated from the event callback
223  State state_;
// libevent bufferevent for the connection, NULL after close()
224  struct bufferevent* bev_;
225 
// Optional general handlers (may be NULL), called after the protocol handler's own callbacks
226  OnConnect* on_connect_;
227  OnError* on_error_;
// Pending fixed-size read length, 0 when no fixed-size read is active (see on_read())
228  SizeT read_fixed_size_;
// Passed as the read high-watermark when the bufferevent is created
229  SizeT max_read_size_;
230 
231  bool check_client_active() {
232  return (!queue_.empty() || bufs_.write_size() > 0);
233  }
234 
235  bool connect_new(struct sockaddr* addr, int addr_len) {
236  init();
237  bev_ = ::bufferevent_socket_new(evloop_->handle(), -1, BEV_OPT_CLOSE_ON_FREE);
238  ::bufferevent_setcb(bev_, on_read, NULL, on_event, this);
239  ::bufferevent_setwatermark(bev_, EV_READ, ProtocolHandler::MIN_INITIAL_READ, max_read_size_);
240  if (read_timeout_ms_ > 0 || write_timeout_ms_ > 0) {
241  struct timeval read_timeout, write_timeout;
242  ::bufferevent_set_timeouts(bev_,
243  get_timeout_ptr(read_timeout, read_timeout_ms_),
244  get_timeout_ptr(write_timeout, write_timeout_ms_)
245  );
246  }
247 
248  read_fixed_size_ = 0;
249  if (::bufferevent_enable(bev_, EV_READ | EV_WRITE) == 0) {
250  bufs_.attach_write(bev_);
251  if (::bufferevent_socket_connect(bev_, addr, addr_len) == 0) {
252  state_ = sCONNECTING;
253  return true;
254  } else
255  logger.log(LOG_LEVEL_ERROR, "AsyncClient libevent error: bufferevent_socket_connect() failed");
256  } else
257  logger.log(LOG_LEVEL_ERROR, "AsyncClient libevent error: bufferevent_enable() failed");
258  close();
259  return false;
260  }
261 
262  static void on_read(struct bufferevent* bev, void* self_handler) {
263  EVO_PARAM_UNUSED(bev);
264  String logstr;
265  ProtocolHandler& self = *(ProtocolHandler*)self_handler;
266  AsyncBuffers& bufs = self.bufs_;
267  bufs.attach_read();
268  if (self.read_fixed_size_ > 0) {
269  if (self.logger.check(LOG_LEVEL_DEBUG_LOW))
270  self.logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(64) << "AsyncClient " << self.id_ << " fixed read: " << self.read_fixed_size_);
271  for (;;) {
272  SubString data;
273  if (!bufs.read_fixed(data, self.read_fixed_size_))
274  return; // wait for more data
275  self.read_fixed_size_ = 0;
276  if (!self.on_read_fixed(self.read_fixed_size_, data, NULL)) {
277  if (self.logger.check(LOG_LEVEL_DEBUG_LOW))
278  self.logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(64) << "AsyncClient " << self.id_ << " on_read_fixed() returned false to close");
279  self.close();
280  return;
281  }
282  bufs.read_flush();
283  if (self.read_fixed_size_ <= 0)
284  break;
285  }
286  bufs.read_reset(ProtocolHandler::MIN_INITIAL_READ, self.max_read_size_);
287  if (bufs.read_size() == 0)
288  return;
289  }
290  if (self.logger.check(LOG_LEVEL_DEBUG_LOW))
291  self.logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(64) << "AsyncClient " << self.id_ << " read: " << bufs.read_size());
292  if (!self.on_read(self.read_fixed_size_, bufs, NULL)) {
293  if (self.logger.check(LOG_LEVEL_DEBUG_LOW))
294  self.logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(64) << "AsyncClient " << self.id_ << " on_read() returned false to close");
295  self.close();
296  return;
297  }
298  }
299 
300  static void on_event(struct bufferevent* bev, short events, void* self_ptr) {
301  EVO_PARAM_UNUSED(bev);
302  String logstr;
303  This& self = *(This*)self_ptr;
304  if (events & BEV_EVENT_CONNECTED && self.state_ == sCONNECTING) {
305  self.logger.log(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(34) << "AsyncClient " << self.id_ << " connected");
306  self.state_ = sCONNECTED;
307  ((ProtocolHandler&)self).on_connect();
308  if (self.on_connect_ != NULL)
309  self.on_connect_->on_connect();
310  } else {
311  AsyncError err;
312  if (events & BEV_EVENT_EOF)
313  err = aeCLOSED;
314  else if (events & BEV_EVENT_TIMEOUT)
315  err = aeTIMEOUT;
316  else if (events & BEV_EVENT_READING)
317  err = (self.state_ == sCONNECTING ? aeCONNECT : aeIO_READ);
318  else if (events & BEV_EVENT_WRITING)
319  err = (self.state_ == sCONNECTING ? aeCONNECT : aeIO_WRITE);
320  else if (events & BEV_EVENT_CONNECTED) {
321  self.logger.log(LOG_LEVEL_ERROR, logstr.set().reserve(64) << "AsyncClient " << self.id_ << " error: Unexpected 'connected' event");
322  err = aeIO;
323  } else
324  err = aeIO;
325  if (self.logger.check(LOG_LEVEL_ERROR)) {
326  const SubString errmsg(async_error_msg(err));
327  self.logger.log_direct(LOG_LEVEL_ERROR, logstr.set().reserve(42 + errmsg.size()) << "AsyncClient " << self.id_ << " error: " << errmsg << " (code: " << (int)err << ')');
328  }
329 
330  self.close();
331  ((ProtocolHandler&)self).on_error(err);
332  if (self.on_error_ != NULL)
333  self.on_error_->on_error(err);
334  }
335  }
336 
337  static ulong get_next_id() {
338  static AtomicULong id;
339  return ++id;
340  }
341 };
342 
344 
345 }
346 #endif
High-level debug message, used for showing debug info for higher-level behavior (DBUG) ...
Definition: logger.h:136
Error convert(const char *host)
Convert host address to one or more socket addresses.
Definition: sysio_sock.h:258
bool read_fixed(SubString &data, SizeT size, SizeT max_size=0)
Read fixed size data from read buffer.
Definition: ioasync_base.h:288
size_t write_size() const
Definition: ioasync_base.h:421
Template class for an async I/O client.
Definition: ioasync_client.h:28
void read_reset(size_t max_size, size_t min_size=0)
Reset read buffer thresholds.
Definition: ioasync_base.h:370
void log_direct(LogLevel level, const SubString &msg)
Log a message with given log level directly without checking the current log level.
Definition: logger.h:423
~RequestWriter()
Destructor writes and/or queues the request data (as applicable).
Definition: ioasync_client.h:208
Q QueueItem
Response queue item type.
Definition: ioasync_client.h:31
Atomic integer type.
Definition: atomic.h:311
I/O timeout.
Definition: ioasync_base.h:459
void close()
Close connection.
Definition: ioasync_client.h:103
No connection.
Definition: ioasync_client.h:58
I/O read error.
Definition: ioasync_base.h:457
Request data for prequeue.
Definition: ioasync_client.h:161
void attach_read()
Attach to current write buffers for reading too (used internally).
Definition: ioasync_base.h:266
This & set_on_connect(OnConnect *cb)
Set general handler to call when a connection is established.
Definition: ioasync_client.h:120
I/O write error.
Definition: ioasync_base.h:458
Connection in progress.
Definition: ioasync_client.h:59
void init_attach(AsyncBase &parent)
Initialize and attach to a parent event-loop.
Definition: ioasync_base.h:633
#define EVO_ONCPP11(EXPR)
Compile EXPR only if C++11 support is detected, otherwise this is a no-op.
Definition: sys.h:259
PreQueueItem()
Constructor.
Definition: ioasync_client.h:166
void init()
Initialize event-loop.
Definition: ioasync_base.h:622
ulong get_id() const
Get current client ID.
Definition: ioasync_client.h:78
const char * async_error_msg(AsyncError err)
Get error message for AsyncError code.
Definition: ioasync_base.h:467
PreQueueItem & operator=(const PreQueueItem &src)
Assignment operator.
Definition: ioasync_client.h:179
static const T MAX
Maximum integer value.
Definition: type.h:996
AsyncClient(SizeT max_queue_size, SizeT max_read_size)
Constructor.
Definition: ioasync_client.h:67
QueueItem item
Request response data for main queue – this is added to the main queue when the output data is writt...
Definition: ioasync_client.h:163
AtomicBufferQueue< QueueItem > queue_
Queue where each item represents an expected response from server.
Definition: ioasync_client.h:214
Socket closed by other side.
Definition: ioasync_base.h:455
ulong read_timeout_ms_
Socket read timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:615
Client connected event.
Definition: ioasync_client.h:35
LoggerPtr logger
Logger for protocol and debug messages, set to enable logging – see set_logger() ...
Definition: ioasync_base.h:489
I/O unrecoverable error.
Definition: ioasync_base.h:456
Parent & parent
Parent AsyncClient for request.
Definition: ioasync_client.h:196
Size size() const
Get size.
struct addrinfo * ptr
Pointer to first address in resolve results.
Definition: sysio_sock.h:127
Client error event.
Definition: ioasync_client.h:44
No error.
Definition: sys.h:1115
void attach_write(struct bufferevent *bev)
Attach to active buffers for writing (used internally).
Definition: ioasync_base.h:260
Holds data for async I/O buffers (used internally with AsyncServer and protocol implementations).
Definition: ioasync_base.h:134
Low-level debug message, used for showing debug info for lower-level internal or library details (DBG...
Definition: logger.h:137
Error
General Evo error code stored in exceptions, or used directly when exceptions are disabled...
Definition: sys.h:1113
virtual void on_error(AsyncError error)
Called on an error that breaks the connection.
Definition: ioasync_client.h:51
String container.
Definition: string.h:674
bool log(LogLevel level, const SubString &msg)
Log a message with given severity level.
Definition: logger.h:428
Base class for Async I/O.
Definition: ioasync_base.h:487
T ProtocolHandler
Derived protocol handler type (must inherit from AsyncClient)
Definition: ioasync_client.h:30
bool connect_ip(const char *host, ushort port, int family=AF_INET)
Start IP connection.
Definition: ioasync_client.h:137
AsyncError
Async I/O error type.
Definition: ioasync_base.h:452
Used by the protocol implementation to write a request to an AsyncClient.
Definition: ioasync_client.h:193
virtual ~OnConnect()
Destructor.
Definition: ioasync_client.h:37
String buf
Request output data buffer – only used if not writing directly to socket.
Definition: ioasync_client.h:162
void clear()
Clear all items from queue, making it empty.
Definition: atomic_buffer_queue.h:125
This & attach_to(AsyncBase &parent)
Attach to a parent AsyncClient or AsyncServer and use the same event-loop as the parent.
Definition: ioasync_client.h:95
virtual ~OnError()
Destructor.
Definition: ioasync_client.h:46
Use to group multiple writes for efficiency.
Definition: ioasync_base.h:142
Evo C++ Library namespace.
Definition: alg.h:11
State get_state() const
Get current state.
Definition: ioasync_client.h:85
size_t read_size()
Get read buffer data size in bytes.
Definition: ioasync_base.h:273
void read_flush()
Flush and consume next line from read buffer.
Definition: ioasync_base.h:413
Handle handle()
Get event loop handle.
Definition: ioasync_base.h:71
Resolves socket name/address to socket address info.
Definition: sysio_sock.h:125
virtual void on_connect()
Called when client is connected.
Definition: ioasync_client.h:40
RequestWriter(Parent &parent, size_t buf_size)
Constructor sets up request writer for parent AsyncClient.
Definition: ioasync_client.h:203
ulong write_timeout_ms_
Socket write timeout in milliseconds, 0 for none (never timeout)
Definition: ioasync_base.h:616
AsyncClient< T, Q > This
This type
Definition: ioasync_client.h:32
bool check(LogLevel level) const
Check whether a message with given level will actually be logged.
Definition: logger.h:418
Error message showing something isn't working as expected, program may be able to work around it (ERR...
Definition: logger.h:133
Connection refused (clients only)
Definition: ioasync_base.h:454
PreQueueItem pq
Request data for prequeue: write buffer, main queue response data.
Definition: ioasync_client.h:197
String & set()
Set as null and empty.
Definition: string.h:995
void add(typename DataCopy< Item >::PassType item)
Add item to queue.
Definition: atomic_buffer_queue.h:141
State
Client state.
Definition: ioasync_client.h:57
uint32 SizeT
Default Evo container size type.
Definition: sys.h:729
This Parent
Parent AsyncClient type.
Definition: ioasync_client.h:194
Reference and access existing string data.
Definition: substring.h:229
This & set_on_error(OnError *cb)
Set general handler to call when an unexpected error occurs.
Definition: ioasync_client.h:129
Connected.
Definition: ioasync_client.h:60
AsyncEventLoop * evloop_
Event loop pointer, either owned by this or a parent.
Definition: ioasync_base.h:614
static struct timeval * get_timeout_ptr(struct timeval &out, ulong ms)
Get timeval struct pointer from timeout in milliseconds.
Definition: ioasync_base.h:691
PreQueueItem(const PreQueueItem &src)
Copy constructor.
Definition: ioasync_client.h:172
#define EVO_PARAM_UNUSED(NAME)
Mark function parameter as unused to suppress "unreferenced parameter" compiler warnings on it...
Definition: sys.h:427
Evo AsyncBase.
bool empty() const
Get whether queue is empty.
Definition: atomic_buffer_queue.h:109
String & reserve(Size size, bool prefer_realloc=false)
Reserve capacity for additional items (modifier).
Definition: string.h:5027
~AsyncClient()
Destructor.
Definition: ioasync_client.h:71