Evo C++ Library v0.5.1
Asynchronous I/O

Evo supports asynchronous I/O for high performance clients and servers:

See also: I/O Streams & Sockets

Evo async client and server classes are under namespace async

Alpha: Evo Async I/O classes should be considered a Work In Progress

Dependencies

Evo async I/O requires libevent 2.0 or newer:

Near the beginning of a program using sockets, call Socket::sysinit() for best portability (required in Windows).

Supported Protocols

Evo async I/O is designed to support clients and servers using different protocol implementations.

Client

The async client classes are named ProtocolClient, where Protocol is the protocol used.

Client callback types:

Here's an example using MemcachedClient (Memcached protocol):

#include <evo/io.h>
using namespace evo;
// Called when the client connection is established -- logs the event
// - con() is presumably Evo's console helper and 'out' its output stream -- TODO confirm
void on_connect() {
con().out << "on_connect()" << NL;
}
// Called when a store (set) request completes
// - key:    key that was stored
// - result: store result code, rendered as a string via StoreResultEnum for display
void on_store(const SubString& key, Memcached::StoreResult result) {
con().out << "on_store() " << key << ' ' << Memcached::StoreResultEnum::get_string(result) << NL;
}
// Called when a get request returns a value
// - key:   key that was requested
// - value: value returned by the server
// - flags: opaque flags stored with the value
void on_get(const SubString& key, const SubString& value, uint32 flags) {
con().out << "on_get() " << key << " '" << value << "' " << flags << NL;
}
};
int main() {
const ushort MEMC_PORT = 11211;
OnEvent on_event;
memc.set_on_connect(&on_event);
memc.connect_ip("127.0.0.1", MEMC_PORT);
memc.set("key1", "value1", on_event);
memc.set("key2", "value2", on_event);
memc.runlocal();
memc.get("key1", on_event);
memc.get("key2", on_event);
memc.runlocal();
return 0;
}
Server

An async server class is created using a template class implementing a PROTOCOL and passing it a user-defined HANDLER class that implements the protocol event callbacks.

dot_inline_dotgraph_3.png
Note: Dashed line shows template parameter type used for member variable, solid line shows inheritance

Here's an example using Memcached PROTOCOL to create a simple async single-threaded memcached server:

#include <evo/maphash.h>
using namespace evo;
// Define a Handler type to handle memcached server request events
// Shared state per server thread (single-threaded server here)
// - SimpleSharedBase<> is presumably the Evo base supplying default shared-state hooks -- TODO confirm
struct Shared : SimpleSharedBase<> {
StrHash map; // key/value storage; maps keys to String values (used by on_store/on_get below)
};
Shared& shared; // per-thread shared state (storage map), bound by constructor
// Constructor: the framework passes global config and per-thread shared state
// - Global is not kept here; only the shared storage reference is stored
Handler(Global& global, Shared& shared) : shared(shared) {
}
// Handle a memcached store request
// - Only the SET command is supported; other store commands get an error response
// - Returns srSTORED on success, or rtHANDLED after an error response was already sent
//   (rtHANDLED appears convertible to the StoreResult return type via the base protocol -- TODO confirm)
StoreResult on_store(StoreParams& params, SubString& value, Command command, uint64 cas_id) {
switch(command) {
case cSET:
// Store value under key in the shared map
shared.map[params.key] = value;
break;
default:
send_error("Not supported");
return rtHANDLED; // error response already sent
}
return Memcached::srSTORED;
}
// Handle a memcached get request
// - Sends the stored value if the key is found; sends nothing when not found
// - adv_params: advanced get parameters, unused here
ResponseType on_get(const SubString& key, GetAdvParams* adv_params) {
const String* val = shared.map.find(key);
if (val != NULL)
send_value(key, *val);
return rtHANDLED; // response (or lack of value) fully handled here
}
};
// Create Memcached Server class using Handler
int main() {
const ushort PORT = 11211; // listen port (standard memcached port)
const ulong RD_TIMEOUT_MS = 5000; // socket read timeout
const ulong WR_TIMEOUT_MS = 1000; // socket write timeout
// Create the listening socket -- EVO_CATCH presumably prints the exception, then exits non-zero here -- TODO confirm
Socket listener;
try {
listener.listen_ip(PORT);
} EVO_CATCH(return 1)
// Configure timeouts and run the single-threaded async server event-loop
Server server;
server.set_timeout(RD_TIMEOUT_MS, WR_TIMEOUT_MS);
server.run(listener);
return 0;
}
Server - Deferred Response

If a server handler has to wait on something to get a response, this is called a Deferred Response.

Examples when deferred response is required:

Here's a more complex example using Memcached PROTOCOL to create a simple async single-threaded memcached proxy server that uses a client to call a back-end memcached server:

using namespace evo;
// Define a Handler type to handle memcached server request events
struct ServerHandler : public async::MemcachedServerHandlerBase {
// Global configuration for all requests: where the back-end memcached server lives
struct Global {
String proxy_address; // back-end memcached server address
ushort proxy_port;    // back-end memcached server port (0 until configured)
Global() : proxy_port(0) {
}
};
// Shared state per thread
struct Shared {
async::MemcachedClient client; // client for calling back-end server
// Per-thread init: attach the client to the server event-loop and connect
// to the configured back-end; returning false presumably aborts startup -- TODO confirm
bool on_init(AsyncBase& server, Global& global) {
String tmp;
client.attach_to(server); // attach client to server event-loop
if (!client.connect_ip(global.proxy_address.cstr(tmp), global.proxy_port))
return false;
return true;
}
};
// Used to make a back-end request via client -- this either gets a response or an error occurs
// - After the response (or error) this deletes itself to cleanup
struct OnClientEvent : DeferredReply, async::MemcachedClient::OnEvent, async::MemcachedClient::OnError {
// parent: handler whose response was deferred; id: deferred response ID
OnClientEvent(ServerHandler& parent, ulong id) : DeferredReply(parent, id) {
}
// Back-end store finished -- forward the result to the waiting front-end client
void on_store(const SubString& key, Memcached::StoreResult result) {
deferred_reply_store(result);
delete this; // deferred response complete, delete callback
}
// Back-end returned a value -- forward it; no delete here since on_get_end() follows
void on_get(const SubString& key, const SubString& value, uint32 flags) {
deferred_reply_get(key, value, flags);
}
// Back-end get finished -- complete the deferred response
void on_get_end(const SubString&) {
deferred_reply_get_end();
delete this; // deferred response complete, delete callback
}
// Back-end client failed -- report an error to the waiting front-end client
void on_error(AsyncError error) {
deferred_reply_error("Backend client error");
delete this; // deferred response aborted on error, delete callback
}
};
Global& global; // global back-end config (stored, unused after construction here)
Shared& shared; // per-thread state holding the back-end client
ServerHandler(Global& global, Shared& shared) : global(global), shared(shared) {
}
// Handle a store request by proxying it to the back-end
// - 'noreply' and 'id' are presumably inherited from MemcachedServerHandlerBase -- TODO confirm
StoreResult on_store(StoreParams& params, SubString& value, Command command, uint64 cas_id) {
if (noreply) {
// Call back-end via client, no reply expected
shared.client.set(params.key, value, params.flags, params.expire);
return Memcached::srSTORED;
} else {
// Call back-end via client, response is deferred until client gets response (or an error occurs)
OnClientEvent* on_event = new OnClientEvent(*this, id);
if (!shared.client.set(params.key, value, params.flags, params.expire, NULL, on_event, on_event)) {
delete on_event; // client call failed: callback won't fire, free it here
send_error("Error calling back-end");
return rtHANDLED; // error sent, not deferred
}
return rtDEFERRED;
}
}
// Handle a get request by proxying it to the back-end (deferred unless the client call fails)
ResponseType on_get(const SubString& key, GetAdvParams* adv_params) {
// Call back-end via client, response is deferred until client gets response (or an error occurs)
OnClientEvent* on_event = new OnClientEvent(*this, id);
if (!shared.client.get(key, *on_event, on_event)) {
delete on_event; // client call failed: callback won't fire, free it here
send_error("Error calling back-end");
return rtHANDLED;
}
return rtDEFERRED;
}
};
// Create Memcached Server class using ServerHandler
int main() {
    const ushort PORT = 11210;                  // front-end (proxy) listen port
    const String BACKEND_ADDRESS = "127.0.0.1"; // back-end memcached server address
    const ushort BACKEND_PORT = 11211;          // back-end memcached server port
    const ulong RD_TIMEOUT_MS = 5000;           // socket read timeout
    const ulong WR_TIMEOUT_MS = 1000;           // socket write timeout

    // Create the listening socket -- EVO_CATCH exits non-zero on failure
    Socket listener;
    try {
        listener.listen_ip(PORT);
    } EVO_CATCH(return 1)

    // Configure the back-end address in global state shared by all handlers
    Server server;
    {
        Server::Global& global = server.get_global();
        global.proxy_address = BACKEND_ADDRESS;
        global.proxy_port = BACKEND_PORT;
    }

    // Configure timeouts and run the single-threaded async proxy event-loop
    server.set_timeout(RD_TIMEOUT_MS, WR_TIMEOUT_MS);
    server.run(listener);
    return 0; // explicit return, consistent with the other examples in this doc
}
Implementation Detail

AsyncClient and AsyncServer both use an AsyncEventLoop to wait for I/O and call the appropriate callbacks.

dot_inline_dotgraph_4.png
Implementation Detail - Client

Client request internal details:

Implementation Detail - Server

When a server accepts an incoming connection:

dot_inline_dotgraph_5.png

Response types:

Deferred Response
msc_inline_mscgraph_1

The framework handles these scenarios with deferred responses: