Evo C++ Library v0.5.1
memcached_server.h
Go to the documentation of this file.
1 // Evo C++ Library
2 /* Copyright 2019 Justin Crowell
3 Distributed under the BSD 2-Clause License -- see included file LICENSE.txt for details.
4 */
6 
7 #pragma once
8 #ifndef INCL_evo_api_memcached_server_h
9 #define INCL_evo_api_memcached_server_h
10 
11 #include "memcached_common.h"
12 #include "../ioasync_server.h"
13 #include "../strtok.h"
14 
15 namespace evo {
16 namespace async {
19 
21 
40 
/** Memcached command type.
 - Enumerators are listed in the same (alphabetical) order as the command name strings mapped to them
 - The remaining code in this file refers to every enumerator listed here
*/
enum Command {
    cUNKNOWN = 0,   ///< Unknown command (must be first)
    cADD,           ///< Add value if not found
    cAPPEND,        ///< Append to existing value
    cCAS,           ///< Compare and swap
    cDECREMENT,     ///< Decrement numeric value for key
    cDELETE,        ///< Delete key
    cGAT,           ///< Get and touch
    cGATS,          ///< Get and touch, with compare-and-swap ID
    cGET,           ///< Get value
    cGETS,          ///< Get value for compare and swap
    cINCREMENT,     ///< Increment numeric value for key
    cPREPEND,       ///< Prepend to existing value
    cQUIT,          ///< Quit command to close connection
    cREPLACE,       ///< Replace value if found
    cSET,           ///< Set new value
    cSTATS,         ///< Get server stats
    cTOUCH,         ///< Update expiration for key
    cVERSION,       ///< Get server version
    cENUM_END       ///< Guard value (must be last) -- NOTE(review): presumed terminator for the enum map helper, confirm against enum.h
};
63 
66  "add",
67  "append",
68  "cas",
69  "decr",
70  "delete",
71  "gat",
72  "gats",
73  "get",
74  "gets",
75  "incr",
76  "prepend",
77  "quit",
78  "replace",
79  "set",
80  "stats",
81  "touch",
82  "version"
83  );
84 
// STORE command parameters: filled in by MemcachedServer::on_read() while parsing a storage
// command line and then passed to the handler's on_store().
// NOTE(review): on_read()/on_read_fixed() also use `storage_params.key`, but no `key` member is
// visible in this listing -- presumably its declaration line was dropped; confirm against the header.
86  struct StoreParams {
88  uint32 flags;    // Flags to store, returned with GET
89  int64 expire;    // Expiration value parsed from the command line (0 when not given)
90  ulong size;      // Value size to store
91  uint64 cas_id;   // Compare-and-swap ID, only parsed for the cas command
92 
// Start with every field zeroed
93  StoreParams() : flags(0), expire(0), size(0), cas_id(0) { }
94  };
95 
100  };
101 
106  };
107 
113 
// Advanced parameters for the get variants (gets, gat, gats); on_read() passes a pointer to this
// to on_get_start()/on_get(), or NULL for a plain get.
// NOTE(review): on_read() also reads/writes `adv_params.expire`, which is not visible in this
// listing -- presumably its declaration line was dropped; confirm against the header.
// NOTE(review): no constructor is visible here, so `cas` is only defined once a caller assigns it.
123  struct GetAdvParams {
125  bool cas;   // Whether CAS is enabled (set to true by on_read() for gets/gats)
126  };
127 
129  typedef DeferredContextT<MemcachedServerHandlerBase> DeferredContext;
130 
132  struct DeferredReply : DeferredContext::ReplyBase {
137  DeferredReply(DeferredContext& context, ulong id) : DeferredContext::ReplyBase(context, id) {
138  }
139 
142  if (!finished && context.handler != NULL) {
143  context.handler->logger.log(LOG_LEVEL_ERROR, "MemcServer DeferredReply left unfinished");
144  deferred_reply_error("Internal handler error: DeferredReply left unfinished");
145  }
146  }
147 
151  void deferred_reply_error(const SubString& msg) {
152  if (context.handler != NULL) {
153  String buf;
154  buf.reserve(15 + msg.size());
155  buf.set("SERVER_ERROR ", 13);
156  buf << msg;
157  buf.add("\r\n", 2);
158  DeferredContext::Handler& handler = *context.handler;
159  handler.reply.deferred_send(id, buf, true);
160  if (handler.logger.check(LOG_LEVEL_DEBUG_LOW))
161  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(40 + msg.size()) << "MemcServer deferred reply " << id << " error: " << msg);
162  if (handler.reply.deferred_end(context))
163  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, "MemcServer cleanup");
164  }
165  finished = true;
166  }
167 
172  if (context.handler != NULL) {
173  String buf;
174  switch (result) {
175  case Memcached::srSTORED: buf.set("STORED\r\n", 8); break;
176  case Memcached::srNOT_STORED: buf.set("NOT_STORED\r\n", 12); break;
177  case Memcached::srEXISTS: buf.set("EXISTS\r\n", 8); break;
178  case Memcached::srNOT_FOUND: buf.set("NOT_FOUND\r\n", 11); break;
179  default: buf.set("SERVER_ERROR Backend error\r\n", 28); break;
180  }
181  DeferredContext::Handler& handler = *context.handler;
182  if (handler.logger.check(LOG_LEVEL_DEBUG_LOW))
183  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(60 + buf.size()) << "MemcServer on_store send deferred response " << id << ": " << SubString(buf.data(), buf.size() - 2));
184  handler.reply.deferred_send(id, buf, true);
185  if (handler.reply.deferred_end(context))
186  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, "MemcServer cleanup");
187  }
188  finished = true;
189  }
190 
195  if (context.handler != NULL) {
196  String buf;
197  if (value.null()) {
198  buf.set("NOT_FOUND\r\n", 11);
199  } else {
200  buf.reserve(UInt64::MAXSTRLEN + 2);
201  buf << *value << SubString("\r\n", 2);
202  }
203  DeferredContext::Handler& handler = *context.handler;
204  if (handler.logger.check(LOG_LEVEL_DEBUG_LOW))
205  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(60 + buf.size()) << "MemcServer on_increment send deferred response " << id << ": " << SubString(buf.data(), buf.size() - 2));
206  handler.reply.deferred_send(id, buf, true);
207  if (handler.reply.deferred_end(context))
208  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, "MemcServer cleanup");
209  }
210  finished = true;
211  }
212 
216  void deferred_reply_delete(bool success) {
217  if (context.handler != NULL) {
218  String buf;
219  if (success)
220  buf.set("DELETED\r\n", 9);
221  else
222  buf.set("NOT_FOUND\r\n", 11);
223  DeferredContext::Handler& handler = *context.handler;
224  if (handler.logger.check(LOG_LEVEL_DEBUG_LOW))
225  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(60 + buf.size()) << "MemcServer on_delete send deferred response " << id << ": " << SubString(buf.data(), buf.size() - 2));
226  handler.reply.deferred_send(id, buf, true);
227  if (handler.reply.deferred_end(context))
228  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, "MemcServer cleanup");
229  }
230  finished = true;
231  }
232 
236  void deferred_reply_touch(bool success) {
237  if (context.handler != NULL) {
238  String buf;
239  if (success)
240  buf.set("TOUCHED\r\n", 9);
241  else
242  buf.set("NOT_FOUND\r\n", 11);
243  DeferredContext::Handler& handler = *context.handler;
244  if (handler.logger.check(LOG_LEVEL_DEBUG_LOW))
245  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(60 + buf.size()) << "MemcServer on_touch send deferred response " << id << ": " << SubString(buf.data(), buf.size() - 2));
246  handler.reply.deferred_send(id, buf, true);
247  if (handler.reply.deferred_end(context))
248  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, "MemcServer cleanup");
249  }
250  finished = true;
251  }
252 
263  void deferred_reply_get(const SubString& key, const SubString& value, uint32 flags, uint64* cas_id=NULL) {
264  if (context.handler != NULL) {
265  DeferredContext::Handler& handler = *context.handler;
266  handler.send_value_internal(id, key, value, flags, cas_id, AsyncServerReply::wfDEFERRED);
267  if (handler.logger.check(LOG_LEVEL_DEBUG_LOW))
268  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(68 + key.size()) << "MemcServer on_get sent deferred value " << id << ": '" << key << "' (size: " << value.size() << ')');
269  }
270  }
271 
274  if (context.handler != NULL) {
275  String buf("END\r\n", 5);
276  DeferredContext::Handler& handler = *context.handler;
277  handler.reply.deferred_send(id, buf, true);
278  if (handler.logger.check(LOG_LEVEL_DEBUG_LOW))
279  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(48) << "MemcServer on_get end deferred response " << id);
280  if (handler.reply.deferred_end(context))
281  handler.logger.log_direct(LOG_LEVEL_DEBUG_LOW, "MemcServer cleanup");
282  }
283  finished = true;
284  }
285  };
286 
288  bool noreply;
289  bool enable_gat;
290  bool enable_cas;
291 
293  MemcachedServerHandlerBase() : noreply(false), enable_gat(false), enable_cas(false) {
294  }
295 
296  // Config
297 
/** Get maximum initial read size.
 \return  Size in bytes (512 KB)
*/
static size_t get_max_initial_read() {
    return static_cast<size_t>(512) * 1024;
}
302 
303  // Events
304 
344  virtual StoreResult on_store(DeferredContext& context, StoreParams& params, SubString& value, Command command, uint64 cas_id) {
345  EVO_PARAM_UNUSED(context);
346  EVO_PARAM_UNUSED(params);
347  EVO_PARAM_UNUSED(value);
348  EVO_PARAM_UNUSED(command);
349  EVO_PARAM_UNUSED(cas_id);
350  send_error("Not implemented");
351  return rtHANDLED;
352  }
353 
374  virtual IncrementResult on_increment(DeferredContext& context, const SubString& key, uint64 count, bool decrement) {
375  EVO_PARAM_UNUSED(context);
376  EVO_PARAM_UNUSED(key);
377  EVO_PARAM_UNUSED(count);
378  EVO_PARAM_UNUSED(decrement);
379  send_error("Not implemented");
380  return rtHANDLED;
381  }
382 
399  virtual DeleteResult on_delete(DeferredContext& context, const SubString& key) {
400  EVO_PARAM_UNUSED(context);
401  EVO_PARAM_UNUSED(key);
402  send_error("Not implemented");
403  return rtHANDLED;
404  }
405 
423  virtual TouchResult on_touch(DeferredContext& context, const SubString& key, int64 expire) {
424  EVO_PARAM_UNUSED(context);
425  EVO_PARAM_UNUSED(key);
426  EVO_PARAM_UNUSED(expire);
427  send_error("Not implemented");
428  return rtHANDLED;
429  }
430 
459  virtual GetStartResult on_get_start(DeferredContext& context, const SubString& keys, GetAdvParams* adv_params) {
460  EVO_PARAM_UNUSED(context);
461  EVO_PARAM_UNUSED(keys);
462  EVO_PARAM_UNUSED(adv_params);
463  return gsrCONTINUE;
464  }
465 
491  virtual ResponseType on_get(DeferredContext& context, const SubString& key, GetAdvParams* adv_params) {
492  EVO_PARAM_UNUSED(context);
493  EVO_PARAM_UNUSED(key);
494  EVO_PARAM_UNUSED(adv_params);
495  return rtNORMAL;
496  }
497 
514  virtual ResponseType on_get_end(DeferredContext& context) {
515  EVO_PARAM_UNUSED(context);
516  return rtNORMAL;
517  }
518 
522  virtual void on_flush_all(ulong delay_sec) {
523  EVO_PARAM_UNUSED(delay_sec);
524  }
525 
531  virtual void on_stats(SubString& params) {
532  EVO_PARAM_UNUSED(params);
533  }
534 
540  virtual void on_version(String& version) {
541  version = "Unknown";
542  }
543 
552  virtual bool on_command(bool& handled, SubString& command_str, SubString& params) {
553  EVO_PARAM_UNUSED(handled);
554  EVO_PARAM_UNUSED(command_str);
555  EVO_PARAM_UNUSED(params);
556  return true;
557  }
558 
562  virtual void on_error(AsyncError err) {
563  EVO_PARAM_UNUSED(err);
564  }
565 
566  // Helpers
567 
571  void send_reply(const SubString& msg) {
572  if (!noreply) {
573  AsyncServerReply::Writer writer(reply, id, msg.size() + 2);
574  writer.add(msg.data(), msg.size());
575  writer.add("\r\n", 2);
576  if (logger.check(LOG_LEVEL_DEBUG_LOW))
577  logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(32 + msg.size()) << "MemcServer -- send_reply: " << msg);
578  }
579  }
580 
584  void send_client_error(const SubString& msg) {
585  if (!noreply) {
586  AsyncServerReply::Writer writer(reply, id, 15 + msg.size());
587  writer.add("CLIENT_ERROR ", 13);
588  writer.add(msg.data(), msg.size());
589  writer.add("\r\n", 2);
590  if (logger.check(LOG_LEVEL_DEBUG_LOW))
591  logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(40 + msg.size()) << "MemcServer -- send_error: CLIENT_ERROR " << msg);
592  }
593  }
594 
598  void send_error(const SubString& msg) {
599  if (!noreply) {
600  AsyncServerReply::Writer writer(reply, id, 15 + msg.size());
601  writer.add("SERVER_ERROR ", 13);
602  writer.add(msg.data(), msg.size());
603  writer.add("\r\n", 2);
604  if (logger.check(LOG_LEVEL_DEBUG_LOW))
605  logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(40 + msg.size()) << "MemcServer -- send_error: SERVER_ERROR " << msg);
606  }
607  }
608 
616  void send_stat(const SubString& name, const SubString& value) {
617  AsyncServerReply::Writer writer(reply, id, 8 + name.size() + value.size());
618  writer.add("STAT ", 5);
619  writer.add(name.data(), name.size());
620  writer.add(' ');
621  writer.add(value.data(), value.size());
622  writer.add("\r\n", 2);
623  }
624 
633  void send_value(const SubString& key, const SubString& value, uint32 flags=0, uint64* cas_id=NULL) {
634  send_value_internal(id, key, value, flags, cas_id, AsyncServerReply::wfNONE);
635  if (logger.check(LOG_LEVEL_DEBUG_LOW))
636  logger.log_direct(LOG_LEVEL_DEBUG_LOW, String().reserve(42 + key.size()) << "MemcServer -- send_value '" << key << "' (size: " << value.size() << ')');
637  }
638 
639 private:
650  void send_value_internal(ulong req_id, const SubString& key, const SubString& value, uint32 flags, uint64* cas_id, AsyncServerReply::WriterFlags writer_flags) {
651  StringInt<uint32> flags_str(flags);
652  StringInt<StrSizeT> value_size_str(value.size());
653  StringInt<uint64> cas_id_str;
654 
655  SizeT write_size = 12 + key.size() + flags_str.size() + value_size_str.size() + value.size();
656  if (cas_id != NULL) {
657  cas_id_str.set(*cas_id);
658  write_size += 1 + cas_id_str.size();
659  }
660 
661  AsyncServerReply::Writer writer(reply, req_id, write_size, writer_flags);
662  writer.add("VALUE ", 6);
663  writer.add(key.data(), key.size());
664  writer.add(' ').add(flags_str.data(), flags_str.size());
665  writer.add(' ').add(value_size_str.data(), value_size_str.size());
666  if (cas_id != NULL)
667  writer.add(' ').add(cas_id_str.data(), cas_id_str.size());
668  writer.add("\r\n", 2);
669  writer.add(value.data(), value.size());
670  writer.add("\r\n", 2);
671  }
672 };
673 
675 
755 template<class T>
757  static const size_t MIN_INITIAL_READ = 0;
758  static const size_t NEWLINE_LEN = 2;
759 
760  typedef T Handler;
763  typedef typename Handler::DeferredContext DeferredContext;
764  typedef typename Handler::Global Global;
765  typedef typename Handler::Shared Shared;
766 
768 
770  Handler handler;
771 
777  MemcachedServer(Global& global, Shared& shared, LoggerBase* logger) : logger(logger), handler(global, shared), command(HandlerBase::cUNKNOWN) {
778  handler.logger.set(logger);
779  }
780 
781  // Read events
782 
783  // Helper for common deferred count checks -- undef'd below
// Used right after a handler event returned rtDEFERRED. Expects these names in the enclosing scope:
// handler, logger, context_ref, expected_deferred_count.
// Returns false from the enclosing function (closing the connection) when the request used 'noreply',
// or when the context count after incrementing expected_deferred_count shows that the handler
// created no DeferredReply or more than one. Otherwise logs that the response was deferred.
784  #define EVO_HELPER_HANDLER_DEFCHECK(EVENT_NAME, EVENT_MSG_SUFFIX) \
785  if (handler.noreply) { \
786  logger.log(LOG_LEVEL_ERROR, "MemcServer " EVENT_NAME " error: Handler returned rtDEFERRED on 'noreply' request"); \
787  return false; /* invalid result, close connection */ \
788  } \
789  if (++expected_deferred_count != context_ref.count()) { \
790  if (expected_deferred_count > context_ref.count()) \
791  logger.log(LOG_LEVEL_ERROR, "MemcServer " EVENT_NAME " error: Handler returned rtDEFERRED without creating a DeferredReply"); \
792  else \
793  logger.log(LOG_LEVEL_ERROR, "MemcServer " EVENT_NAME " error: Handler created more than one DeferredReply, only 1 allowed per handler"); \
794  return false; /* invalid result, close connection */ \
795  } \
796  logger.log(LOG_LEVEL_DEBUG_LOW, "MemcServer -- " EVENT_NAME " response deferred" EVENT_MSG_SUFFIX);
797 
798  // Helper for common result.type cases -- undef'd below
// Expands to the remaining case labels of a switch on a handler result type: rtDEFERRED runs the
// deferred count checks above, rtHANDLED does nothing, and any other type returns false from the
// enclosing function. The rtNORMAL case must be written by the caller before this.
799  #define EVO_HELPER_HANDLER_CASES(EVENT_NAME) \
800  case HandlerBase::rtDEFERRED: \
801  EVO_HELPER_HANDLER_DEFCHECK(EVENT_NAME, "") \
802  break; \
803  case HandlerBase::rtHANDLED: break; \
804  default: return false; /* rtCLOSE */
805 
806  // Helper for common handler end deferred-check -- undef'd below
// Used after a handler result was processed: returns false from the enclosing function when the
// context count does not match expected_deferred_count, i.e. the handler created a DeferredReply
// but did not return rtDEFERRED.
807  #define EVO_HELPER_HANDLER_END_DEFCHECK(EVENT_NAME) \
808  if (expected_deferred_count != context_ref.count()) { \
809  logger.log(LOG_LEVEL_ERROR, "MemcServer " EVENT_NAME " error: Handler created DeferredReply without returning rtDEFERRED"); \
810  return false; /* invalid result, close connection */ \
811  }
812 
813  bool on_read_fixed(SizeT& next_size, SubString& data, void* context) {
814  // Read storage value
815  EVO_PARAM_UNUSED(next_size);
816  assert( context != NULL );
817  String logstr;
818  if (logger.check(LOG_LEVEL_DEBUG)) {
819  logstr.set().reserve(96 + storage_params.key.size())
820  << "MemcServer on_store " << HandlerBase::CommandEnum::get_string(command)
821  << " '" << storage_params.key << "' fl:" << storage_params.flags << " exp:" << storage_params.expire;
822  if (command == HandlerBase::cCAS)
823  logstr << " id:" << storage_params.cas_id;
824  logstr << " (size: " << storage_params.size << ')';
825  logger.log_direct(LOG_LEVEL_DEBUG, logstr);
826  }
827  data.stripr("\r\n", NEWLINE_LEN, 1);
828 
829  DeferredContext& context_ref = *(DeferredContext*)context;
830  ulong expected_deferred_count = context_ref.count();
831 
832  HandlerBase::StoreResult result = handler.on_store(context_ref, storage_params, data, command, storage_params.cas_id);
833  switch (result.type) {
834  case HandlerBase::rtNORMAL:
835  switch (result.result) {
836  case Memcached::srSTORED: reply("STORED\r\n", 8); break;
837  case Memcached::srNOT_STORED: reply("NOT_STORED\r\n", 12); break;
838  case Memcached::srEXISTS: reply("EXISTS\r\n", 8); break;
839  case Memcached::srNOT_FOUND: reply("NOT_FOUND\r\n", 11); break;
840  default:
841  if (logger.check(LOG_LEVEL_ERROR))
842  logger.log_direct(LOG_LEVEL_ERROR, logstr.set().reserve(56) << "MemcServer on_store error: Invalid handler result: " << (int)result.result);
843  return false; // invalid result, close connection
844  }
845  break;
846  EVO_HELPER_HANDLER_CASES("on_store");
847  }
849  return true;
850  }
851 
852  bool on_read(SizeT& fixed_size, AsyncBuffers& buffers, void* context) {
853  // New command
854  assert( context != NULL );
855 
856  DeferredContext& context_ref = *(DeferredContext*)context;
857  ulong expected_deferred_count = context_ref.count();
858 
859  const char DELIM = ' ';
860  String logstr;
861  SubString line;
862  while (buffers.read_line(line)) {
863  SubString command_str, params_str;
864  line.split(' ', command_str, params_str);
865  handler.set_id();
866  handler.noreply = false;
867 
868  command = Handler::CommandEnum::get_enum(command_str);
869  switch (command) {
870  // Storage
871  case HandlerBase::cCAS:
872  if (!handler.enable_cas) {
873  handler.send_error("Not implemented");
874  break;
875  } // fallthrough
876  case HandlerBase::cADD: // fallthrough
877  case HandlerBase::cAPPEND: // fallthrough
878  case HandlerBase::cPREPEND: // fallthrough
879  case HandlerBase::cREPLACE: // fallthrough
880  case HandlerBase::cSET: {
881  storage_params.key.set();
882  storage_params.flags = 0;
883  storage_params.expire = 0;
884  storage_params.size = 0;
885 
886  StrTokWord tok(params_str);
887  for (;;) {
888  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); storage_params.key = tok.value();
889  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); storage_params.flags = tok.value().getnum<uint32>(fDEC);
890  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); storage_params.expire = tok.value().getnum<longl>(fDEC);
891  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); storage_params.size = tok.value().getnum<ulong>(fDEC);
892  if (command == HandlerBase::cCAS) {
893  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); storage_params.cas_id = tok.value().getnum<uint64>();
894  }
895  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); handler.noreply = (tok.value() == "noreply");
896  break;
897  }
898  if (storage_params.key.empty()) {
899  handler.send_client_error("Missing parameter, expected key");
900  break;
901  }
902  buffers.read_flush();
903  if (handler.noreply)
904  handler.reply.nosend(handler.id);
905 
906  if (!buffers.read_fixed_helper(*this, fixed_size, storage_params.size + NEWLINE_LEN, 0, context))
907  return false;
908  if (fixed_size > 0)
909  return true;
910  continue; // next command (while loop at top)
911  }
912 
913  // Increment
914  case HandlerBase::cINCREMENT:
915  case HandlerBase::cDECREMENT: {
916  SubString key;
917  uint64 count = 0;
918  StrTokWord tok(params_str);
919  for (;;) {
920  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); key = tok.value();
921  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); count = tok.value().getnum<uint64>();
922  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); handler.noreply = (tok.value() == "noreply");
923  break;
924  }
925  if (key.empty()) {
926  handler.send_client_error("Missing parameter, expected key");
927  break;
928  }
929  if (logger.check(LOG_LEVEL_DEBUG))
930  logger.log_direct(LOG_LEVEL_DEBUG, logstr.set().reserve(42 + key.size()) << "MemcServer on_increment '" << key << "' " << (command == HandlerBase::cDECREMENT ? '-' : '+') << count);
931  if (handler.noreply)
932  handler.reply.nosend(handler.id);
933  HandlerBase::IncrementResult result = handler.on_increment(context_ref, key, count, (command == HandlerBase::cDECREMENT));
934  switch (result.type) {
935  case HandlerBase::rtNORMAL:
936  if (!result.result.null()) {
937  StringInt<uint64,NEWLINE_LEN> result_str(*result.result, fDEC, false, NEWLINE_LEN);
938  result_str.add("\r\n", NEWLINE_LEN);
939  reply(result_str.data(), result_str.size());
940  } else
941  reply("NOT_FOUND\r\n", 11);
942  break;
943  EVO_HELPER_HANDLER_CASES("on_increment");
944  }
945  EVO_HELPER_HANDLER_END_DEFCHECK("on_increment");
946  break;
947  }
948 
949  // Delete
950  case HandlerBase::cDELETE: {
951  SubString key;
952  StrTokWord tok(params_str);
953  for (;;) {
954  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); key = tok.value();
955  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); handler.noreply = (tok.value() == "noreply");
956  break;
957  }
958  if (key.empty()) {
959  handler.send_client_error("Missing parameter, expected key(s)");
960  break;
961  }
962  if (logger.check(LOG_LEVEL_DEBUG))
963  logger.log_direct(LOG_LEVEL_DEBUG, logstr.set().reserve(24 + key.size()) << "MemcServer on_delete '" << key << '\'');
964  if (handler.noreply)
965  handler.reply.nosend(handler.id);
966  HandlerBase::DeleteResult result = handler.on_delete(context_ref, key);
967  switch (result.type) {
968  case HandlerBase::rtNORMAL:
969  if (result.result == HandlerBase::grOK)
970  reply("DELETED\r\n", 9);
971  else
972  reply("NOT_FOUND\r\n", 11);
973  break;
974  EVO_HELPER_HANDLER_CASES("on_delete");
975  }
976  EVO_HELPER_HANDLER_END_DEFCHECK("on_delete");
977  break;
978  }
979 
980  // Touch
981  case HandlerBase::cTOUCH: {
982  SubString key;
983  int64 expire = 0;
984  StrTokWord tok(params_str);
985  for (;;) {
986  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); key = tok.value();
987  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); expire = tok.value().getnum<int64>(fDEC);
988  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); handler.noreply = (tok.value() == "noreply");
989  break;
990  }
991  if (key.empty()) {
992  handler.send_client_error("Missing parameter, expected key(s)");
993  break;
994  }
995  if (logger.check(LOG_LEVEL_DEBUG))
996  logger.log_direct(LOG_LEVEL_DEBUG, logstr.set().reserve(24 + key.size()) << "MemcServer on_touch '" << key << '\'');
997  if (handler.noreply)
998  handler.reply.nosend(handler.id);
999  HandlerBase::TouchResult result = handler.on_touch(context_ref, key, expire);
1000  switch (result.type) {
1001  case HandlerBase::rtNORMAL:
1002  if (result.result == HandlerBase::grOK)
1003  reply("TOUCHED\r\n", 9);
1004  else
1005  reply("NOT_FOUND\r\n", 11);
1006  break;
1007  EVO_HELPER_HANDLER_CASES("on_touch");
1008  }
1009  EVO_HELPER_HANDLER_END_DEFCHECK("on_touch");
1010  break;
1011  }
1012 
1013  // Get
1014  case HandlerBase::cGAT:
1015  case HandlerBase::cGATS:
1016  case HandlerBase::cGETS:
1017  case HandlerBase::cGET: {
1018  bool handled = false;
1019  HandlerBase::GetAdvParams adv_params;
1020  HandlerBase::GetAdvParams* adv_params_ptr = NULL;
1021  switch (command) {
1022  case HandlerBase::cGETS:
1023  // CAS variant
1024  if (!handler.enable_cas) {
1025  handler.send_error("Not implemented");
1026  handled = true;
1027  break;
1028  }
1029  adv_params.cas = true;
1030  adv_params_ptr = &adv_params;
1031  break;
1032  case HandlerBase::cGATS:
1033  // GAT and CAS variant
1034  if (!handler.enable_cas) {
1035  handler.send_error("Not implemented");
1036  handled = true;
1037  break;
1038  }
1039  adv_params.cas = true; // fallthrough
1040  case HandlerBase::cGAT: {
1041  // GAT variant
1042  if (!handler.enable_gat) {
1043  handler.send_error("Not implemented");
1044  handled = true;
1045  break;
1046  }
1047  SubString expire_str;
1048  if (!params_str.token(expire_str, DELIM) || expire_str.empty() || params_str.stripl(DELIM).empty()) {
1049  handler.send_client_error("Missing parameter, expected expire value and key(s)");
1050  handled = true;
1051  break;
1052  }
1053  adv_params.expire = expire_str.getnum<uint64>();
1054  adv_params_ptr = &adv_params;
1055  break;
1056  }
1057  default:
1058  if (params_str.empty()) {
1059  handler.send_client_error("Missing parameter, expected key(s)");
1060  handled = true;
1061  break;
1062  }
1063  break;
1064  }
1065  if (handled)
1066  break;
1067 
1068  if (adv_params_ptr != NULL && logger.check(LOG_LEVEL_DEBUG_LOW)) {
1069  logstr.set().reserve(48) << "MemcServer get adv:" << command_str;
1070  if (!adv_params.expire.null())
1071  logstr << " exp:" << *adv_params.expire;
1072  logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr);
1073  }
1074 
1075  uint deferred = 0;
1076  HandlerBase::GetStartResult result = handler.on_get_start(context_ref, params_str, adv_params_ptr);
1077  switch (result.type) {
1078  case HandlerBase::rtNORMAL:
1079  switch (result.result) {
1080  case HandlerBase::gsrCONTINUE: {
1081  // Call on_get() for each key
1082  StrTokWord tok(params_str);
1083  while (tok.nextw(DELIM)) {
1084  if (logger.check(LOG_LEVEL_DEBUG))
1085  logger.log_direct(LOG_LEVEL_DEBUG, logstr.set().reserve(24 + tok.value().size()) << "MemcServer on_get '" << tok.value() << '\'');
1086  switch (handler.on_get(context_ref, tok.value(), adv_params_ptr)) {
1087  case HandlerBase::rtNORMAL: break;
1088  case HandlerBase::rtHANDLED: handled = true; break;
1089  case HandlerBase::rtDEFERRED: ++deferred; EVO_HELPER_HANDLER_DEFCHECK("on_get", " from on_get()"); break;
1090  default: return false; // rtCLOSE
1091  }
1093  if (handled)
1094  break;
1095  }
1096  }
1097  case HandlerBase::gsrSKIP:
1098  break;
1099  }
1100  break;
1101  case HandlerBase::rtDEFERRED: ++deferred; EVO_HELPER_HANDLER_DEFCHECK("on_get", " from on_get_start()"); break;
1102  case HandlerBase::rtHANDLED: handled = true; break;
1103  default: return false; // rtCLOSE
1104  }
1105  EVO_HELPER_HANDLER_END_DEFCHECK("on_get_start");
1106 
1107  if (!handled) {
1108  switch (handler.on_get_end(context_ref)) {
1109  case HandlerBase::rtNORMAL: break;
1110  case HandlerBase::rtHANDLED: handled = true; break;
1111  case HandlerBase::rtDEFERRED: ++deferred; EVO_HELPER_HANDLER_DEFCHECK("on_get", " from on_get_end()"); break;
1112  default: return false; // rtCLOSE
1113  }
1114  EVO_HELPER_HANDLER_END_DEFCHECK("on_get_end");
1115 
1116  if (!handled && deferred == 0) {
1117  String buf("END\r\n", 5);
1118  handler.reply.send(handler.id, buf);
1119  handler.reply.send_end();
1120  }
1121  }
1122  break;
1123  }
1124 
1125  // Misc
1126  case HandlerBase::cSTATS: handler.on_stats(params_str); break;
1127  case HandlerBase::cQUIT: return false;
1128 
1129  // Unknown
1130  default: {
1131  bool handled = false;
1132  if (!handler.on_command(handled, command_str, params_str))
1133  return false;
1134  if (!handled)
1135  reply("ERROR\r\n", 7);
1136  break;
1137  }
1138  }
1139  buffers.read_flush();
1140  }
1141  return true;
1142  }
1143 
1144  void on_error(AsyncError err) {
1145  handler.on_error(err);
1146  }
1147 
1148  #undef EVO_HELPER_HANDLER_END_DEFCHECK
1149  #undef EVO_HELPER_HANDLER_CASES
1150  #undef EVO_HELPER_HANDLER_DEFCHECK
1151 
1152 private:
1153  HandlerBase::Command command;
1154  HandlerBase::StoreParams storage_params;
1155 
1156  // String literals only
1157  void reply(const char* data, StrSizeT size) {
1158  if (!handler.noreply) {
1159  String data_str(data, size);
1160  handler.reply.send(handler.id, data_str);
1161  handler.reply.send_end();
1162  }
1163  }
1164 };
1165 
1167 
1168 }
1169 }
1170 #endif
High-level debug message, used for showing debug info for higher-level behavior (DBUG) ...
Definition: logger.h:136
bool empty() const
Get whether empty.
void log_direct(LogLevel level, const SubString &msg)
Log a message with given log level directly without checking the current log level.
Definition: logger.h:423
Size size() const
Get size.
Definition: list.h:759
void deferred_reply_error(const SubString &msg)
Finish deferred response with an error.
Definition: memcached_server.h:151
General types used when implementing MemcachedClient or server callbacks.
Definition: memcached_common.h:23
ResponseType
Handler response type – used with ResponseResult.
Definition: ioasync_server.h:375
#define EVO_HELPER_HANDLER_END_DEFCHECK(EVENT_NAME)
Definition: memcached_server.h:807
Add value if not found.
Definition: memcached_server.h:44
Get server version.
Definition: memcached_server.h:60
bool token(StringT &value, char delim)
Extract next token from string.
Definition: substring.h:383
bool null() const
Get whether null.
Definition: type.h:318
#define EVO_HELPER_HANDLER_DEFCHECK(EVENT_NAME, EVENT_MSG_SUFFIX)
Definition: memcached_server.h:784
Set new value.
Definition: memcached_server.h:57
void deferred_reply_touch(bool success)
Finish touch request and report result.
Definition: memcached_server.h:236
T Handler
User defined handler type.
Definition: memcached_server.h:760
Get server stats.
Definition: memcached_server.h:58
void send_client_error(const SubString &msg)
Helper to send a client error response.
Definition: memcached_server.h:584
Not stored due to unmet condition for append, prepend, add, or replace command.
Definition: memcached_common.h:38
Get value for compare and swap.
Definition: memcached_server.h:52
Decrement numeric value for key.
Definition: memcached_server.h:47
Handler::DeferredContext DeferredContext
Alias for Handler::DeferredContext.
Definition: memcached_server.h:763
bool noreply
Whether no-reply mode is enabled (set by parent protocol class)
Definition: memcached_server.h:288
virtual IncrementResult on_increment(DeferredContext &context, const SubString &key, uint64 count, bool decrement)
Called on INCR or DECR request to increment or decrement a numeric value.
Definition: memcached_server.h:374
virtual void on_stats(SubString &params)
Called on STATS request for statistics.
Definition: memcached_server.h:531
LoggerPtr logger
Logger to use.
Definition: memcached_server.h:769
bool cas
Whether CAS is enabled, if true the handler must include a cas_id when sending response values with s...
Definition: memcached_server.h:125
Handler response result.
Definition: ioasync_server.h:389
AsyncBuffers buffers
Buffers for async I/O.
Definition: ioasync_server.h:496
uint32 flags
Flags to store, returned with GET.
Definition: memcached_server.h:88
static size_t get_max_initial_read()
Definition: memcached_server.h:298
void send_value(const SubString &key, const SubString &value, uint32 flags=0, uint64 *cas_id=NULL)
Helper to send value for get response.
Definition: memcached_server.h:633
Wraps a logger pointer that can reference a logger to use or be disabled.
Definition: logger.h:377
virtual StoreResult on_store(DeferredContext &context, StoreParams &params, SubString &value, Command command, uint64 cas_id)
Called on STORE request to store a value.
Definition: memcached_server.h:344
Compare and swap
Definition: memcached_server.h:46
String tokenizer adapter used internally to create variants of existing tokenizers – do not use dire...
Definition: strtok.h:1401
DeferredContextT< MemcachedServerHandlerBase > DeferredContext
Deferred context helper – used by AsyncServer.
Definition: memcached_server.h:129
bool enable_cas
Derived constructor must set to true to enable "compare and swap" (gets/gats command) ...
Definition: memcached_server.h:290
Base class for user defined Memcached server handler.
Definition: memcached_server.h:38
Deferred but not the last part of this response.
Definition: ioasync_server.h:32
AsyncServer< This > Server
Server type.
Definition: memcached_server.h:762
evo::async::Memcached Memcached
Alias for evo::async::Memcached
Definition: memcached_server.h:39
String fixed-size buffer for formatting an integer.
Definition: string.h:54
virtual bool on_command(bool &handled, SubString &command_str, SubString &params)
Called on any other command.
Definition: memcached_server.h:552
MemcachedServer< Handler > This
This type.
Definition: memcached_server.h:761
~DeferredReply()
Destructor.
Definition: memcached_server.h:141
Definition: ioasync_server.h:31
#define EVO_ENUM_MAP_PREFIXED(ENUM, PREFIX,...)
Helper for creating enum string/value mappers with prefixed enum values.
Definition: enum.h:219
void send_error(const SubString &msg)
Helper to send a server error response.
Definition: memcached_server.h:598
virtual void on_flush_all(ulong delay_sec)
Called on FLUSH_ALL request to expire all keys.
Definition: memcached_server.h:522
ulong size
Value size to store.
Definition: memcached_server.h:90
#define EVO_TOK_NEXT_OR_BREAK(TOK, DELIM)
Helper for tokenizing using a break-loop.
Definition: strtok.h:1854
Continue to on_get() events.
Definition: memcached_server.h:98
Size size() const
Get size.
Holds data for async I/O buffers (used internally with AsyncServer and protocol implementations).
Definition: ioasync_base.h:134
Low-level debug message, used for showing debug info for lower-level internal or library details (DBG...
Definition: logger.h:137
Quit command to close connection.
Definition: memcached_server.h:55
virtual void on_error(AsyncError err)
Called on error.
Definition: memcached_server.h:562
virtual ResponseType on_get_end(DeferredContext &context)
Called at end of GET request.
Definition: memcached_server.h:514
STORE command parameters.
Definition: memcached_server.h:86
ResultType result
Normal response result – ignored unless type=rtNORMAL
Definition: ioasync_server.h:393
Item not found, can't compare-and-swap (CAS command only)
Definition: memcached_common.h:37
Replace value if found.
Definition: memcached_server.h:56
ResponseResult< GeneralResultValue > DeleteResult
DELETE command result returned by on_delete()
Definition: memcached_server.h:111
Prepend to existing value.
Definition: memcached_server.h:54
Response writer used to group multiple writes together for best performance.
Definition: ioasync_server.h:43
const char * data() const
Get string pointer (const).
Definition: string.h:1533
uint32 StrSizeT
Default Evo string size type.
Definition: sys.h:734
ResponseResult< GetStartResultValue > GetStartResult
GET command result returned by on_get_start()
Definition: memcached_server.h:109
void send_reply(const SubString &msg)
Helper to send a reply message.
Definition: memcached_server.h:571
Base 10: decimal (default)
Definition: str.h:2323
String container.
Definition: string.h:674
virtual void on_version(String &version)
Called on VERSION request for server version.
Definition: memcached_server.h:540
ThisType & set()
Set as null.
Definition: string.h:99
T getnum(Error &error, int base=0) const
Convert to number value for given integer type.
Definition: substring.h:1351
Command
Command value.
Definition: memcached_server.h:42
uint64 cas_id
CAS ID – only used for CAS command.
Definition: memcached_server.h:91
String key
Key to store.
Definition: memcached_server.h:87
Unknown command (always first)
Definition: memcached_server.h:43
SubString & stripl()
Strip left (beginning) whitespace (spaces and tabs).
Definition: substring.h:1090
Get value and update expiration time.
Definition: memcached_server.h:49
SubString & stripr()
Strip right (ending) whitespace (spaces and tabs).
Definition: substring.h:1135
Async I/O server for receiving and handling requests.
Definition: ioasync_server.h:576
AsyncError
Async I/O error type.
Definition: ioasync_base.h:452
ThisType & add(const char *data, Size size)
Append additional data to buffer.
Definition: string.h:139
Append to existing value.
Definition: memcached_server.h:45
Skip on_get() events.
Definition: memcached_server.h:99
ResponseResult< Memcached::StoreResult > StoreResult
STORE command result returned by on_store()
Definition: memcached_server.h:108
GeneralResultValue
General result used by some commands.
Definition: memcached_server.h:103
Key not found.
Definition: memcached_server.h:105
bool read_fixed_helper(T &parent, SizeT &fixed_size, SizeT size, SizeT max_size=0, void *context=NULL)
Helper for reading fixed size data from read buffer from a ProtocolHandler on_read() event...
Definition: ioasync_base.h:345
Increment numeric value for key.
Definition: memcached_server.h:53
Item modified, interrupting compare-and-swap command (CAS command only)
Definition: memcached_common.h:36
#define EVO_HELPER_HANDLER_CASES(EVENT_NAME)
Definition: memcached_server.h:799
Get value for compare and swap and update expiration time.
Definition: memcached_server.h:50
ResponseResult< UInt64 > IncrementResult
INCR/DECR command result.
Definition: memcached_server.h:110
Handler::Global Global
Alias for Handler::Global.
Definition: memcached_server.h:764
Delete by key.
Definition: memcached_server.h:48
virtual DeleteResult on_delete(DeferredContext &context, const SubString &key)
Called on DELETE request to delete key and value.
Definition: memcached_server.h:399
char * data()
Get formatted string pointer.
Definition: string.h:85
static const int MAXSTRLEN
Max formatted length, including either sign or hex/octal prefix (0x/0), but not both.
Definition: type.h:987
LoggerPtr logger
Logger to use (set by AsyncServer)
Definition: memcached_server.h:287
Evo C++ Library namespace.
Definition: alg.h:11
Touch key by updating expiration time.
Definition: memcached_server.h:59
StoreResult
Memcached store command result.
Definition: memcached_common.h:34
Deferred reply helper – deferred event objects should hold or inherit this, and use to send deferred...
Definition: memcached_server.h:132
void read_flush()
Flush and consume next line from read buffer.
Definition: ioasync_base.h:413
Get value.
Definition: memcached_server.h:51
MemcachedServer(Global &global, Shared &shared, LoggerBase *logger)
Constructor to create server protocol instance.
Definition: memcached_server.h:777
Int64 expire
Expiration time in seconds from now, null if not updating expiration, 0 for no expiration, negative to expire now, or a value greater than 2592000 (30 days) means a Unix timestamp to expire on.
Definition: memcached_server.h:124
void deferred_reply_get(const SubString &key, const SubString &value, uint32 flags, uint64 *cas_id=NULL)
Send get request value for key.
Definition: memcached_server.h:263
bool enable_gat
Derived constructor must set to true to enable "get and touch" (gat/gats command) ...
Definition: memcached_server.h:289
Successfully stored.
Definition: memcached_common.h:39
bool read_line(SubString &data)
Read next line from read buffer.
Definition: ioasync_base.h:382
void deferred_reply_store(Memcached::StoreResult result)
Finish set/store request and report result.
Definition: memcached_server.h:171
Normal response.
Definition: ioasync_server.h:376
WriterFlags
Flags used with Writer.
Definition: ioasync_server.h:30
String & add(char ch)
Append character (modifier).
Definition: string.h:2741
Success.
Definition: memcached_server.h:104
void deferred_reply_delete(bool success)
Finish delete request and report result.
Definition: memcached_server.h:216
ResponseResult< GeneralResultValue > TouchResult
TOUCH command result returned by on_touch()
Definition: memcached_server.h:112
virtual ResponseType on_get(DeferredContext &context, const SubString &key, GetAdvParams *adv_params)
Called for each key in GET request.
Definition: memcached_server.h:491
Handler handler
Handler instance.
Definition: memcached_server.h:770
bool check(LogLevel level) const
Check whether a message with given level will actually be logged.
Definition: logger.h:418
MemcachedServerHandlerBase()
Constructor.
Definition: memcached_server.h:293
Error message showing something isn't working as expected, program may be able to work around it (ERR...
Definition: logger.h:133
GetStartResultValue
GET command result from on_get_start().
Definition: memcached_server.h:97
bool on_read_fixed(SizeT &next_size, SubString &data, void *context)
Definition: memcached_server.h:813
int64 expire
Expiration time in seconds from now, 0 for no expiration, negative to expire now, or a value greater ...
Definition: memcached_server.h:89
bool split(char delim, T1 &left, T2 &right) const
Split at first occurrence of delimiter into left/right substrings.
Definition: substring.h:976
Response already sent so request is handled, use if error was sent.
Definition: ioasync_server.h:378
Implements Memcached protocol for an async server.
Definition: memcached_server.h:756
String & set()
Set as null and empty.
Definition: string.h:995
AsyncServerReply reply
Server reply manager, used to track deferred events and queue out of order replies.
Definition: ioasync_server.h:497
void deferred_reply_increment(UInt64 value)
Finish increment/decrement request and report result.
Definition: memcached_server.h:194
Handler::Shared Shared
Alias for Handler::Shared.
Definition: memcached_server.h:765
ResponseType type
Response type – see ResponseType.
Definition: ioasync_server.h:392
uint32 SizeT
Default Evo container size type.
Definition: sys.h:729
Reference and access existing string data.
Definition: substring.h:229
void send_stat(const SubString &name, const SubString &value)
Helper for sending statistics.
Definition: memcached_server.h:616
bool on_read(SizeT &fixed_size, AsyncBuffers &buffers, void *context)
Definition: memcached_server.h:852
Base class for Logger.
Definition: logger.h:273
Additional parameters for advanced GET request variants.
Definition: memcached_server.h:123
void on_error(AsyncError err)
Definition: memcached_server.h:1144
virtual TouchResult on_touch(DeferredContext &context, const SubString &key, int64 expire)
Called on TOUCH request to update expiration of existing key without retrieving the value...
Definition: memcached_server.h:423
Base async I/O server handler.
Definition: ioasync_server.h:282
Evo Async Memcached API, common client/server types.
Size size() const
Get formatting string size.
Definition: string.h:93
StoreParams()
Definition: memcached_server.h:93
#define EVO_PARAM_UNUSED(NAME)
Mark function parameter as unused to suppress "unreferenced parameter" compiler warnings on it...
Definition: sys.h:427
const char * data() const
Get data pointer.
Enum guard value (always last)
Definition: memcached_server.h:61
DeferredReply(DeferredContext &context, ulong id)
Constructor.
Definition: memcached_server.h:137
String & reserve(Size size, bool prefer_realloc=false)
Reserve capacity for additional items (modifier).
Definition: string.h:5027
void deferred_reply_get_end()
Finish response for get request.
Definition: memcached_server.h:273
MemcachedServerHandlerBase HandlerBase
Alias for MemcachedServerHandlerBase.
Definition: memcached_server.h:767
virtual GetStartResult on_get_start(DeferredContext &context, const SubString &keys, GetAdvParams *adv_params)
Called at the beginning of a GET request with all requested keys.
Definition: memcached_server.h:459