Evo C++ Library v0.5.1
memcached_client.h
Go to the documentation of this file.
1 // Evo C++ Library
2 /* Copyright 2019 Justin Crowell
3 Distributed under the BSD 2-Clause License -- see included file LICENSE.txt for details.
4 */
6 
7 #pragma once
8 #ifndef INCL_evo_api_memcached_client_h
9 #define INCL_evo_api_memcached_client_h
10 
11 #include "memcached_common.h"
12 #include "../impl/systime.h"
13 #include "../ioasync_client.h"
14 #include "../strtok.h"
15 #include "../pair.h"
16 
17 namespace evo {
18 namespace async {
21 
23 
25 namespace impl_memc {
26  // Used in queue to track expected responses to client
27  struct ClientQueueItem {
28  enum Type {
29  tNONE = 0,
30  tSTORE,
31  tINCREMENT,
32  tDELETE,
33  tTOUCH,
34  tGET,
35  tGET_CAS
36  };
37 
38  Type type;
39  void* on_reply;
40  void* on_error;
41  String data;
42  UInt64 data_num;
43  bool track_notfound;
44 
45  ClientQueueItem() : type(tNONE), on_reply(NULL), on_error(NULL), track_notfound(false)
46  { }
47  ClientQueueItem(const ClientQueueItem& src) : type(src.type), on_reply(src.on_reply), on_error(src.on_error), data(src.data), data_num(src.data_num), track_notfound(src.track_notfound)
48  { }
49  ClientQueueItem& operator=(const ClientQueueItem& src) {
50  type = src.type;
51  on_reply = src.on_reply;
52  on_error = src.on_error;
53  swap(data, (String&)src.data); // swap for thread safety and speed (unshared)
54  data_num = src.data_num;
55  track_notfound = src.track_notfound;
56  return *this;
57  }
58 
59  bool null() const {
60  return (type == tNONE);
61  }
62 
63  void set() {
64  type = tNONE;
65  on_reply = NULL;
66  on_error = NULL;
67  data.set();
68  data_num.set();
69  track_notfound = false;
70  }
71  };
72 }
75 
147 class MemcachedClient : public AsyncClient<MemcachedClient, impl_memc::ClientQueueItem> {
148 public:
149  static const SizeT DEFAULT_QUEUE_SIZE = 256;
150  static const size_t DEFAULT_MAX_READ = 524288; // 512 KB
151  static const size_t MIN_INITIAL_READ = 0;
152 
154  using Base::OnConnect;
155  using Base::OnError;
156 
158  struct OnStore {
160 
161  virtual ~OnStore() {
162  }
163 
164  virtual void on_store(const SubString& key, Memcached::StoreResult result) {
165  EVO_PARAM_UNUSED(key);
166  EVO_PARAM_UNUSED(result);
167  }
168  };
169 
171  struct OnIncrement {
172  virtual ~OnIncrement() {
173  }
174 
175  virtual void on_increment(const SubString& key, const UInt64& count) {
176  EVO_PARAM_UNUSED(key);
177  EVO_PARAM_UNUSED(count);
178  }
179  };
180 
182  struct OnRemove {
183  virtual ~OnRemove() {
184  }
185 
186  virtual void on_remove(const SubString& key, bool removed) {
187  EVO_PARAM_UNUSED(key);
188  EVO_PARAM_UNUSED(removed);
189  }
190  };
191 
193  struct OnTouch {
194  virtual ~OnTouch() {
195  }
196 
197  virtual void on_touch(const SubString& key, bool touched) {
198  EVO_PARAM_UNUSED(key);
199  EVO_PARAM_UNUSED(touched);
200  }
201  };
202 
204  struct OnGet {
205  virtual ~OnGet() {
206  }
207 
208  virtual void on_get(const SubString& key, const SubString& value, uint32 flags) {
209  EVO_PARAM_UNUSED(key);
210  EVO_PARAM_UNUSED(value);
211  EVO_PARAM_UNUSED(flags);
212  }
213 
214  virtual void on_get_cas(const SubString& key, const SubString& value, uint32 flags, uint64 cas_id) {
215  EVO_PARAM_UNUSED(key);
216  EVO_PARAM_UNUSED(value);
217  EVO_PARAM_UNUSED(flags);
218  EVO_PARAM_UNUSED(cas_id);
219  }
220 
221  virtual void on_get_end(const SubString& keys_notfound) {
222  EVO_PARAM_UNUSED(keys_notfound);
223  }
224  };
225 
227  struct OnEvent : OnConnect, OnStore, OnIncrement, OnTouch, OnRemove, OnGet {
228  };
229 
234  MemcachedClient(SizeT max_queue_size=DEFAULT_QUEUE_SIZE, SizeT max_read_size=DEFAULT_MAX_READ) :
235  AsyncClient<MemcachedClient, impl_memc::ClientQueueItem>(max_queue_size, max_read_size), cur_type_(QueueItem::tNONE) {
236  }
237 
254  bool set(const SubString& key, const SubString& value, uint32 flags=0, int64 expire=0, uint64* cas_id=NULL, OnStore* on_store=NULL, OnError* on_error=NULL) {
255  if (get_state() == sNONE)
256  return false;
257 
258  StringInt<uint32,0> flags_str(flags, fDEC, false);
259  StringInt<int64,0> expire_str(expire, fDEC, false);
260  StringInt<StrSizeT,0> val_size_str(value.size(), fDEC, false);
261  StringInt<uint64,0> cas_id_str;
262 
263  size_t buf_size = 7 + key.size() + flags_str.size() + expire_str.size() + val_size_str.size() + NEWLINE_LEN + value.size() + NEWLINE_LEN;
264  if (cas_id != NULL) {
265  cas_id_str.set(*cas_id, fDEC, false);
266  buf_size += 1 + cas_id_str.size();
267  }
268  if (on_store == NULL)
269  buf_size += 8;
270 
271  RequestWriter writer(*this, buf_size);
272  if (writer.error())
273  return false;
274  const char* dbg_str = writer.ptr();
275  writer.add((cas_id == NULL ? "set " : "cas "), 4);
276  writer.add(key.data(), key.size());
277  writer.add(' ').add(flags_str.data(), flags_str.size());
278  writer.add(' ').add(expire_str.data(), expire_str.size());
279  writer.add(' ').add(val_size_str.data(), val_size_str.size());
280  if (cas_id != NULL)
281  writer.add(' ').add(cas_id_str.data(), cas_id_str.size());
282  const StrSizeT dbg_str_len = (StrSizeT)(writer.ptr() - dbg_str);
283  if (on_store == NULL)
284  writer.add(" noreply", 8);
285  writer.add("\r\n", NEWLINE_LEN);
286  writer.add(value.data(), value.size());
287  writer.add("\r\n", NEWLINE_LEN);
288  if (logger.check(LOG_LEVEL_DEBUG))
289  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(32 + dbg_str_len) << "MemcClient " << get_id() << ' ' << SubString(dbg_str, dbg_str_len) << " (write: " << buf_size << ')');
290 
291  if (on_store != NULL) {
292  QueueItem& item = writer.pq.item;
293  item.type = QueueItem::tSTORE;
294  item.on_reply = (void*)on_store;
295  item.on_error = on_error;
296  item.data = key;
297  }
298  return true;
299  }
300 
316  bool set(const SubString& key, const SubString& value, OnStore& on_store, OnError* on_error=NULL, uint32 flags=0, int64 expire=0, uint64* cas_id=NULL) {
317  return set(key, value, flags, expire, cas_id, &on_store, on_error);
318  }
319 
336  bool set_cas(const SubString& key, const SubString& value, uint64 cas_id, OnStore& on_store, OnError* on_error=NULL, uint32 flags=0, int64 expire=0) {
337  return set(key, value, flags, expire, &cas_id, &on_store, on_error);
338  }
339 
350  bool set_append(const SubString& key, const SubString& value, OnStore* on_store=NULL, OnError* on_error=NULL) {
351  if (get_state() == sNONE)
352  return false;
353 
354  StringInt<StrSizeT,0> val_size_str(value.size(), fDEC, false);
355  size_t buf_size = 12 + key.size() + val_size_str.size() + NEWLINE_LEN + value.size() + NEWLINE_LEN;
356  if (on_store == NULL)
357  buf_size += 8;
358 
359  RequestWriter writer(*this, buf_size);
360  if (writer.error())
361  return false;
362  writer.add("append ", 7);
363  writer.add(key.data(), key.size());
364  writer.add(" 0 0 ", 5);
365  writer.add(val_size_str.data(), val_size_str.size());
366  if (on_store == NULL)
367  writer.add(" noreply", 8);
368  writer.add("\r\n", NEWLINE_LEN);
369  writer.add(value.data(), value.size());
370  writer.add("\r\n", NEWLINE_LEN);
371  if (logger.check(LOG_LEVEL_DEBUG))
372  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(54 + key.size()) << "MemcClient " << get_id() << " set_append '" << key << "' (write: " << buf_size << ')');
373 
374  if (on_store != NULL) {
375  QueueItem& item = writer.pq.item;
376  item.type = QueueItem::tSTORE;
377  item.on_reply = (void*)on_store;
378  item.on_error = on_error;
379  item.data = key;
380  }
381  return true;
382  }
383 
394  bool set_prepend(const SubString& key, const SubString& value, OnStore* on_store=NULL, OnError* on_error=NULL) {
395  if (get_state() == sNONE)
396  return false;
397 
398  StringInt<StrSizeT,0> val_size_str(value.size(), fDEC, false);
399  size_t buf_size = 13 + key.size() + val_size_str.size() + NEWLINE_LEN + value.size() + NEWLINE_LEN;
400  if (on_store == NULL)
401  buf_size += 8;
402 
403  RequestWriter writer(*this, buf_size);
404  if (writer.error())
405  return false;
406  writer.add("prepend ", 8);
407  writer.add(key.data(), key.size());
408  writer.add(" 0 0 ", 5);
409  writer.add(val_size_str.data(), val_size_str.size());
410  if (on_store == NULL)
411  writer.add(" noreply", 8);
412  writer.add("\r\n", NEWLINE_LEN);
413  writer.add(value.data(), value.size());
414  writer.add("\r\n", NEWLINE_LEN);
415  if (logger.check(LOG_LEVEL_DEBUG))
416  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(54 + key.size()) << "MemcClient " << get_id() << " set_append '" << key << "' (write: " << buf_size << ')');
417 
418  if (on_store != NULL) {
419  QueueItem& item = writer.pq.item;
420  item.type = QueueItem::tSTORE;
421  item.on_reply = (void*)on_store;
422  item.on_error = on_error;
423  item.data = key;
424  }
425  return true;
426  }
427 
441  bool set_add(const SubString& key, const SubString& value, uint32 flags=0, int64 expire=0, OnStore* on_store=NULL, OnError* on_error=NULL) {
442  if (get_state() == sNONE)
443  return false;
444 
445  StringInt<uint32,0> flags_str(flags, fDEC, false);
446  StringInt<int64,0> expire_str(expire, fDEC, false);
447  StringInt<StrSizeT,0> val_size_str(value.size(), fDEC, false);
448 
449  size_t buf_size = 7 + key.size() + flags_str.size() + expire_str.size() + val_size_str.size() + NEWLINE_LEN + value.size() + NEWLINE_LEN;
450  if (on_store == NULL)
451  buf_size += 8;
452 
453  RequestWriter writer(*this, buf_size);
454  if (writer.error())
455  return false;
456  writer.add("add ", 4);
457  writer.add(key.data(), key.size());
458  writer.add(' ').add(flags_str.data(), flags_str.size());
459  writer.add(' ').add(expire_str.data(), expire_str.size());
460  writer.add(' ').add(val_size_str.data(), val_size_str.size());
461  if (on_store == NULL)
462  writer.add(" noreply", 8);
463  writer.add("\r\n", NEWLINE_LEN);
464  writer.add(value.data(), value.size());
465  writer.add("\r\n", NEWLINE_LEN);
466  if (logger.check(LOG_LEVEL_DEBUG))
467  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(44 + key.size()) << "MemcClient " << get_id() << " set_add '" << key << "' (write: " << buf_size << ')');
468 
469  if (on_store != NULL) {
470  QueueItem& item = writer.pq.item;
471  item.type = QueueItem::tSTORE;
472  item.on_reply = (void*)on_store;
473  item.on_error = on_error;
474  item.data = key;
475  }
476  return true;
477  }
478 
492  bool set_replace(const SubString& key, const SubString& value, uint32 flags=0, int64 expire=0, OnStore* on_store=NULL, OnError* on_error=NULL) {
493  if (get_state() == sNONE)
494  return false;
495 
496  StringInt<uint32,0> flags_str(flags, fDEC, false);
497  StringInt<int64,0> expire_str(expire, fDEC, false);
498  StringInt<StrSizeT,0> val_size_str(value.size(), fDEC, false);
499 
500  size_t buf_size = 11 + key.size() + flags_str.size() + expire_str.size() + val_size_str.size() + NEWLINE_LEN + value.size() + NEWLINE_LEN;
501  if (on_store == NULL)
502  buf_size += 8;
503 
504  RequestWriter writer(*this, buf_size);
505  if (writer.error())
506  return false;
507  writer.add("replace ", 8);
508  writer.add(key.data(), key.size());
509  writer.add(' ').add(flags_str.data(), flags_str.size());
510  writer.add(' ').add(expire_str.data(), expire_str.size());
511  writer.add(' ').add(val_size_str.data(), val_size_str.size());
512  if (on_store == NULL)
513  writer.add(" noreply", 8);
514  writer.add("\r\n", NEWLINE_LEN);
515  writer.add(value.data(), value.size());
516  writer.add("\r\n", NEWLINE_LEN);
517  if (logger.check(LOG_LEVEL_DEBUG))
518  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(48 + key.size()) << "MemcClient " << get_id() << " set_replace '" << key << "' (write: " << buf_size << ')');
519 
520  if (on_store != NULL) {
521  QueueItem& item = writer.pq.item;
522  item.type = QueueItem::tSTORE;
523  item.on_reply = (void*)on_store;
524  item.on_error = on_error;
525  item.data = key;
526  }
527  return true;
528  }
529 
542  bool increment(const SubString& key, uint64 count=1, bool decrement=false, OnIncrement* on_increment=NULL, OnError* on_error=NULL) {
543  if (get_state() == sNONE)
544  return false;
545 
546  StringInt<uint64> count_str(count, fDEC, true);
547  StrSizeT buf_size = 6 + key.size() + count_str.size() + NEWLINE_LEN;
548  if (on_increment == NULL)
549  buf_size += 8;
550 
551  RequestWriter writer(*this, buf_size);
552  if (writer.error())
553  return false;
554  SubString msg_str(writer.ptr(), buf_size - NEWLINE_LEN);
555  writer.add((decrement ? "decr " : "incr "), 5);
556  writer.add(key.data(), key.size());
557  writer.add(' ').add(count_str.data(), count_str.size());
558  if (on_increment == NULL)
559  writer.add(" noreply", 8);
560  writer.add("\r\n", NEWLINE_LEN);
561  if (logger.check(LOG_LEVEL_DEBUG))
562  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(22 + msg_str.size()) << "MemcClient " << get_id() << ' ' << msg_str);
563 
564  if (on_increment != NULL) {
565  QueueItem& item = writer.pq.item;
566  item.type = QueueItem::tINCREMENT;
567  item.on_reply = (void*)on_increment;
568  item.on_error = on_error;
569  item.data = key;
570  }
571  return true;
572  }
573 
585  bool incr(const SubString& key, uint64 count=1, OnIncrement* on_increment=NULL, OnError* on_error=NULL) {
586  return increment(key, count, false, on_increment, on_error);
587  }
588 
600  bool decr(const SubString& key, uint64 count=1, OnIncrement* on_increment=NULL, OnError* on_error=NULL) {
601  return increment(key, count, true, on_increment, on_error);
602  }
603 
612  bool remove(const SubString& key, OnRemove* on_remove=NULL, OnError* on_error=NULL) {
613  if (get_state() == sNONE)
614  return false;
615 
616  StrSizeT buf_size = 7 + key.size() + NEWLINE_LEN;
617  if (on_remove == NULL)
618  buf_size += 8;
619 
620  RequestWriter writer(*this, buf_size);
621  if (writer.error())
622  return false;
623  SubString msg_str(writer.ptr(), buf_size - NEWLINE_LEN);
624  writer.add("delete ", 7);
625  writer.add(key.data(), key.size());
626  if (on_remove == NULL)
627  writer.add(" noreply", 8);
628  writer.add("\r\n", NEWLINE_LEN);
629  if (logger.check(LOG_LEVEL_DEBUG))
630  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(22 + msg_str.size()) << "MemcClient " << get_id() << ' ' << msg_str);
631 
632  if (on_remove != NULL) {
633  QueueItem& item = writer.pq.item;
634  item.type = QueueItem::tDELETE;
635  item.on_reply = (void*)on_remove;
636  item.on_error = on_error;
637  item.data = key;
638  }
639  return true;
640  }
641 
652  bool touch(const SubString& key, int64 expire, OnTouch* on_touch, OnError* on_error=NULL) {
653  if (get_state() == sNONE)
654  return false;
655 
656  StringInt<uint64,0> expire_str(expire, fDEC, false);
657  StrSizeT buf_size = 7 + key.size() + expire_str.size() + NEWLINE_LEN;
658  if (on_touch == NULL)
659  buf_size += 8;
660 
661  RequestWriter writer(*this, buf_size);
662  if (writer.error())
663  return false;
664  SubString msg_str(writer.ptr(), buf_size - NEWLINE_LEN);
665  writer.add("touch ", 6);
666  writer.add(key.data(), key.size());
667  writer.add(' ').add(expire_str.data(), expire_str.size());
668  if (on_touch == NULL)
669  writer.add(" noreply", 8);
670  writer.add("\r\n", NEWLINE_LEN);
671  if (logger.check(LOG_LEVEL_DEBUG))
672  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(22 + msg_str.size()) << "MemcClient " << get_id() << ' ' << msg_str);
673 
674  if (on_touch != NULL) {
675  QueueItem& item = writer.pq.item;
676  item.type = QueueItem::tTOUCH;
677  item.on_reply = (void*)on_touch;
678  item.on_error = on_error;
679  item.data = key;
680  }
681  return true;
682  }
683 
697  bool get(const SubString& key, OnGet& on_get, OnError* on_error=NULL, bool track_notfound=false, int64* expire=NULL) {
698  if (get_state() == sNONE)
699  return false;
700 
701  StringInt<uint64> expire_str;
702  StrSizeT buf_size = 4 + key.size() + NEWLINE_LEN;
703  if (expire != NULL) {
704  expire_str.set(*expire);
705  buf_size += 1 + expire_str.size();
706  }
707 
708  RequestWriter writer(*this, buf_size);
709  if (writer.error())
710  return false;
711  SubString msg_str(writer.ptr(), buf_size - NEWLINE_LEN);
712  if (expire != NULL) {
713  writer.add("gat ", 4);
714  writer.add(expire_str.data(), expire_str.size());
715  writer.add(" ", 1);
716  } else
717  writer.add("get ", 4);
718  writer.add(key.data(), key.size());
719  writer.add("\r\n", NEWLINE_LEN);
720  if (logger.check(LOG_LEVEL_DEBUG))
721  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(22 + msg_str.size()) << "MemcClient " << get_id() << ' ' << msg_str);
722 
723  QueueItem& item = writer.pq.item;
724  item.type = QueueItem::tGET;
725  item.on_reply = (void*)&on_get;
726  item.on_error = on_error;
727  item.data = key;
728  item.track_notfound = track_notfound;
729  return true;
730  }
731 
746  bool get_cas(const SubString& key, OnGet& on_get, OnError* on_error=NULL, bool track_notfound=false, int64* expire=NULL) {
747  if (get_state() == sNONE)
748  return false;
749 
750  StringInt<uint64> expire_str;
751  StrSizeT buf_size = 5 + key.size() + NEWLINE_LEN;
752  if (expire != NULL) {
753  expire_str.set(*expire);
754  buf_size += 1 + expire_str.size();
755  }
756 
757  RequestWriter writer(*this, buf_size);
758  if (writer.error())
759  return false;
760  SubString msg_str(writer.ptr(), buf_size - NEWLINE_LEN);
761  if (expire != NULL) {
762  writer.add("gats ", 5);
763  writer.add(expire_str.data(), expire_str.size());
764  writer.add(" ", 1);
765  } else
766  writer.add("gets ", 5);
767  writer.add(key.data(), key.size());
768  writer.add("\r\n", NEWLINE_LEN);
769  if (logger.check(LOG_LEVEL_DEBUG))
770  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(22 + msg_str.size()) << "MemcClient " << get_id() << ' ' << msg_str);
771 
772  QueueItem& item = writer.pq.item;
773  item.type = QueueItem::tGET_CAS;
774  item.on_reply = (void*)&on_get;
775  item.on_error = on_error;
776  item.data = key;
777  item.track_notfound = track_notfound;
778  return true;
779  }
780 
793  bool get_touch(const SubString& key, int64 expire, OnGet& on_get, OnError* on_error=NULL, bool track_notfound=false) {
794  return get(key, on_get, on_error, track_notfound, &expire);
795  }
796 
809  bool get_touch_cas(const SubString& key, int64 expire, OnGet& on_get, OnError* on_error=NULL, bool track_notfound=false) {
810  return get_cas(key, on_get, on_error, track_notfound, &expire);
811  }
812 
820  static int64 calc_expire_time(int64 seconds, int64 base=0) {
821  const int64 TIMESTAMP_THRESHOLD = 2592000; // 30 days
822  if (seconds > TIMESTAMP_THRESHOLD) {
823  if (base <= 0) {
825  ts.set_utc();
826  base = ts.get_unix_timestamp();
827  }
828  return base + seconds;
829  }
830  return seconds;
831  }
832 
833 private:
834  static const size_t NEWLINE_LEN = 2;
835 
836  typedef impl_memc::ClientQueueItem QueueItem;
837 
838  struct ValueParams {
839  typedef Pair<SubString,bool> KeyFlagPair;
840  typedef List<KeyFlagPair> KeyFlags;
841 
842  SubString key;
843  uint32 flags;
844  ulong size;
845  uint64 cas_id;
846  KeyFlags key_flags; // Flags for keys received -- used with track_notfound
847 
848  ValueParams() : flags(0), size(0), cas_id(0) {
849  }
850 
851  ValueParams& clear() {
852  key.set();
853  flags = 0;
854  size = 0;
855  cas_id = 0;
856  key_flags.set();
857  return *this;
858  }
859 
860  void parse(const SubString& params_str) {
861  const char DELIM = ' ';
862  StrTokWord tok(params_str);
863  for (;;) {
864  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); key = tok.value();
865  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); flags = tok.value().getnum<uint32>(fDEC);
866  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); size = tok.value().getnum<ulong>(fDEC);
867  EVO_TOK_NEXT_OR_BREAK(tok, DELIM); cas_id = tok.value().getnum<uint64>();
868  break;
869  }
870  }
871 
872  void init_key_flags(const SubString& data) {
873  key_flags.clear();
874  StrTok tok(data);
875  while (tok.nextw(' '))
876  key_flags.add(KeyFlagPair(tok.value(), false));
877  }
878 
879  void no_key_flags() {
880  key_flags.set();
881  }
882 
883  void set_key_flag() {
884  for (KeyFlags::IterM iter(key_flags); iter; ++iter) {
885  KeyFlagPair& item(*iter);
886  if (item.first == key && !item.second) {
887  item.second = true;
888  break;
889  }
890  }
891  }
892  };
893 
894  QueueItem::Type cur_type_;
895  QueueItem cur_item_;
896  ValueParams value_params_;
897 
898  friend class AsyncClient<MemcachedClient, QueueItem>;
899  friend class evo::AsyncBuffers;
900 
901  void on_connect() {
902  cur_type_ = QueueItem::tNONE;
903  }
904 
905  void on_error(AsyncError err) {
906  if (cur_type_ != QueueItem::tNONE && cur_item_.on_error != NULL) {
907  ((OnError*)cur_item_.on_error)->on_error(err);
908  cur_item_.on_error = NULL;
909  }
910  }
911 
// Close event: nothing to do
void on_close()
{ }
914 
915  bool on_read_fixed(SizeT& next_size, SubString& data, void* context) {
916  EVO_PARAM_UNUSED(next_size);
917  EVO_PARAM_UNUSED(context);
918 
919  // Value data
920  data.stripr("\r\n", NEWLINE_LEN, 1);
921  switch (cur_type_) {
922  case QueueItem::tGET_CAS:
923  if (logger.check(LOG_LEVEL_DEBUG))
924  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(64 + value_params_.key.size()) << "MemcClient " << get_id() << " on_get_cas '" << value_params_.key << "' " << value_params_.cas_id);
925  ((OnGet*)cur_item_.on_reply)->on_get_cas(value_params_.key, data, value_params_.flags, value_params_.cas_id);
926  break;
927  case QueueItem::tGET:
928  if (logger.check(LOG_LEVEL_DEBUG))
929  logger.log_direct(LOG_LEVEL_DEBUG, String().reserve(34 + value_params_.key.size()) << "MemcClient " << get_id() << " on_get '" << value_params_.key << '\'');
930  ((OnGet*)cur_item_.on_reply)->on_get(value_params_.key, data, value_params_.flags);
931  break;
932  default:
933  assert( false ); // shouldn't happen
934  break;
935  }
936  return true;
937  }
938 
939  // Helper for common response error checks, used in on_read() -- undef'd below
940  #define EVO_HELPER_RESPONSE_ERROR_CHECKS(EVENT_NAME) \
941  if (reply_str.ends(STR_ERROR)) { \
942  line.truncate(MAX_ERROR_LENGTH); \
943  if (logger.check(LOG_LEVEL_ERROR)) \
944  logger.log_direct(LOG_LEVEL_ERROR, logstr.set().reserve(30 + line.size()) << "MemcClient " << get_id() << ": " << line); \
945  return false; \
946  } else { \
947  logger.log(LOG_LEVEL_ERROR, "MemcClient protocol error on " EVENT_NAME); \
948  return false; \
949  }
950 
951  bool on_read(SizeT& fixed_size, AsyncBuffers& buffers, void* context) {
952  const char DELIM = ' ';
953  const SubString STR_NOTFOUND("NOT_FOUND", 9);
954  const SubString STR_VALUE("VALUE", 5);
955  const SubString STR_END("END", 3);
956  const SubString STR_ERROR("ERROR", 5);
957  const ulong MAX_ERROR_LENGTH = 200;
958 
959  String logstr;
960  SubString line;
961  while (buffers.read_line(line)) {
962  SubString reply_str, params_str;
963  if (cur_type_ == QueueItem::tGET || cur_type_ == QueueItem::tGET_CAS) {
964  // Still reading GET response
965  line.split(DELIM, reply_str, params_str);
966  if (reply_str == STR_VALUE) {
967  value_params_.clear().parse(params_str);
968  value_params_.set_key_flag();
969  buffers.read_flush();
970  if (!buffers.read_fixed_helper(*this, fixed_size, value_params_.size + NEWLINE_LEN, 0, context))
971  return false;
972  if (fixed_size > 0)
973  return true;
974  } else if (reply_str == STR_END) {
975  cur_type_ = QueueItem::tNONE;
976  buffers.read_flush();
977  if (cur_item_.track_notfound) {
978  String keys_notfound;
979  for (ValueParams::KeyFlags::Iter iter(value_params_.key_flags); iter; ++iter) {
980  const ValueParams::KeyFlagPair& item(*iter);
981  if (!item.second)
982  keys_notfound.addsep(' ').add(item.first);
983  }
984  if (logger.check(LOG_LEVEL_DEBUG_LOW)) {
985  if (keys_notfound.size() == 0)
986  logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(48) << "MemcClient " << get_id() << " on_get_end, no notfound");
987  else
988  logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(48 + keys_notfound.size()) << "MemcClient " << get_id() << " on_get_end, notfound: '" << keys_notfound << '\'');
989  }
990  ((OnGet*)cur_item_.on_reply)->on_get_end(keys_notfound);
991  } else {
992  if (logger.check(LOG_LEVEL_DEBUG_LOW))
993  logger.log_direct(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(34) << "MemcClient " << get_id() << " on_get_end");
994  ((OnGet*)cur_item_.on_reply)->on_get_end(SubString());
995  }
996  } else EVO_HELPER_RESPONSE_ERROR_CHECKS("GET VALUE");
997  continue; // next line
998  }
999 
1000  // Next reply
1001  if (!queue_.pop(cur_item_)) {
1002  logger.log(LOG_LEVEL_ERROR, "MemcClient internal error: Unexpected empty queue for response");
1003  return false;
1004  }
1005  line.split(DELIM, reply_str, params_str);
1006  switch (cur_item_.type) {
1007  case QueueItem::tSTORE: {
1009  if (result != Memcached::srUNKNOWN) {
1010  if (logger.check(LOG_LEVEL_DEBUG))
1011  logger.log_direct(LOG_LEVEL_DEBUG, logstr.set().reserve(36 + reply_str.size()) << "MemcClient " << get_id() << " on_store " << reply_str);
1012  OnStore* on_store = (OnStore*)cur_item_.on_reply;
1013  on_store->on_store(cur_item_.data, result);
1014  } else EVO_HELPER_RESPONSE_ERROR_CHECKS("STORE");
1015  break;
1016  }
1017 
1018  case QueueItem::tINCREMENT: {
1019  UInt64 count;
1020  if (reply_str != STR_NOTFOUND) {
1021  count = reply_str.getnum<uint64>();
1022  if (count.null()) {
1023  EVO_HELPER_RESPONSE_ERROR_CHECKS("INCR/DECR");
1024  }
1025  }
1026 
1027  if (logger.check(LOG_LEVEL_DEBUG))
1028  logger.log_direct(LOG_LEVEL_DEBUG, logstr.set().reserve(36 + reply_str.size()) << "MemcClient " << get_id() << " on_increment " << reply_str);
1029  OnIncrement* on_increment = (OnIncrement*)cur_item_.on_reply;
1030  on_increment->on_increment(cur_item_.data, count);
1031  break;
1032  }
1033 
1034  case QueueItem::tDELETE: {
1035  const SubString STR_DELETED("DELETED", 7);
1036  bool removed;
1037  if (reply_str == STR_DELETED) {
1038  removed = true;
1039  } else if (reply_str == STR_NOTFOUND) {
1040  removed = false;
1041  } else EVO_HELPER_RESPONSE_ERROR_CHECKS("DELETE");
1042 
1043  if (logger.check(LOG_LEVEL_DEBUG))
1044  logger.log_direct(LOG_LEVEL_DEBUG, logstr.set().reserve(36 + reply_str.size()) << "MemcClient " << get_id() << " on_remove " << reply_str);
1045  OnRemove* on_remove = (OnRemove*)cur_item_.on_reply;
1046  on_remove->on_remove(cur_item_.data, removed);
1047  break;
1048  }
1049 
1050  case QueueItem::tTOUCH: {
1051  const SubString STR_TOUCHED("TOUCHED", 7);
1052  bool touched;
1053  if (reply_str == STR_TOUCHED) {
1054  touched = true;
1055  } else if (reply_str == STR_NOTFOUND) {
1056  touched = false;
1057  } else EVO_HELPER_RESPONSE_ERROR_CHECKS("TOUCH");
1058 
1059  if (logger.check(LOG_LEVEL_DEBUG))
1060  logger.log_direct(LOG_LEVEL_DEBUG, logstr.set().reserve(36 + reply_str.size()) << "MemcClient " << get_id() << " on_touch " << reply_str);
1061  OnTouch* on_touch = (OnTouch*)cur_item_.on_reply;
1062  on_touch->on_touch(cur_item_.data, touched);
1063  break;
1064  }
1065 
1066  case QueueItem::tGET_CAS:
1067  case QueueItem::tGET: {
1068  if (reply_str == STR_VALUE) {
1069  cur_type_ = cur_item_.type;
1070  value_params_.clear();
1071  if (cur_item_.track_notfound)
1072  value_params_.init_key_flags(cur_item_.data); // track flags for whether each key is found
1073  else
1074  value_params_.no_key_flags();
1075  value_params_.parse(params_str);
1076  value_params_.set_key_flag();
1077  buffers.read_flush();
1078  if (!buffers.read_fixed_helper(*this, fixed_size, value_params_.size + NEWLINE_LEN, 0, context))
1079  return false;
1080  if (fixed_size > 0)
1081  return true;
1082  continue; // next line
1083  } else if (reply_str == STR_END) {
1084  // No keys found
1085  assert( cur_type_ == QueueItem::tNONE );
1086  if (cur_item_.on_reply != NULL) {
1087  if (cur_item_.track_notfound) {
1088  if (logger.check(LOG_LEVEL_DEBUG_LOW))
1089  logger.log(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(48) << "MemcClient " << get_id() << " on_get_end, none found");
1090  ((OnGet*)cur_item_.on_reply)->on_get_end(cur_item_.data);
1091  } else {
1092  if (logger.check(LOG_LEVEL_DEBUG_LOW))
1093  logger.log(LOG_LEVEL_DEBUG_LOW, logstr.set().reserve(48) << "MemcClient " << get_id() << " on_get_end");
1094  ((OnGet*)cur_item_.on_reply)->on_get_end(SubString());
1095  }
1096  }
1097  } else EVO_HELPER_RESPONSE_ERROR_CHECKS("GET");
1098  break;
1099  }
1100 
1101  default:
1102  if (logger.check(LOG_LEVEL_ERROR))
1103  logger.log_direct(LOG_LEVEL_ERROR, logstr.set().reserve(52) << "MemcClient internal error: Bad queue item type: " << (int)cur_item_.type);
1104  return false;
1105  }
1106  buffers.read_flush();
1107  }
1108  return true;
1109  }
1110 
1111  #undef EVO_HELPER_RESPONSE_ERROR_CHECKS
1112 
1113 private:
1114  // Disable copying
1115  MemcachedClient(const MemcachedClient&);
1116  MemcachedClient& operator=(const MemcachedClient&);
1117 };
1118 
1120 
1121 }
1122 }
1123 #endif
Holds a system timestamp as native (platform specific) fields.
Definition: systime.h:27
High-level debug message, used for showing debug info for higher-level behavior (DBUG) ...
Definition: logger.h:136
Template class for an async I/O client.
Definition: ioasync_client.h:28
virtual void on_store(const SubString &key, Memcached::StoreResult result)
Definition: memcached_client.h:164
General types used when implementing MemcachedClient or server callbacks.
Definition: memcached_common.h:23
bool get_touch_cas(const SubString &key, int64 expire, OnGet &on_get, OnError *on_error=NULL, bool track_notfound=false)
Send a request to get value for one or more keys for Compare-And-Swap and touch (update) the stored e...
Definition: memcached_client.h:809
Base interface for on_get() and on_get_end() events.
Definition: memcached_client.h:204
virtual ~OnRemove()
Definition: memcached_client.h:183
Nullable< T > & set()
Set as null.
Definition: type.h:342
Base interface used as a shortcut that inherits all the non-error event interfaces: OnConnect...
Definition: memcached_client.h:227
virtual void on_remove(const SubString &key, bool removed)
Definition: memcached_client.h:186
void set_utc()
Set to current date/time (UTC).
Definition: systime.h:149
bool set_prepend(const SubString &key, const SubString &value, OnStore *on_store=NULL, OnError *on_error=NULL)
Send a request to prepend to existing value for key.
Definition: memcached_client.h:394
virtual ~OnTouch()
Definition: memcached_client.h:194
Base interface for on_store() event.
Definition: memcached_client.h:158
String tokenizer adapter used internally to create variants of existing tokenizers – do not use dire...
Definition: strtok.h:1401
virtual ~OnGet()
Definition: memcached_client.h:205
virtual void on_touch(const SubString &key, bool touched)
Definition: memcached_client.h:197
void swap(T &a, T &b)
Swap contents of given objects.
Definition: sys.h:1602
String fixed-size buffer for formatting an integer.
Definition: string.h:54
static int64 calc_expire_time(int64 seconds, int64 base=0)
Calculate memcached expiration time for given number of seconds from now.
Definition: memcached_client.h:820
bool increment(const SubString &key, uint64 count=1, bool decrement=false, OnIncrement *on_increment=NULL, OnError *on_error=NULL)
Send a request to increment or decrement value for given key.
Definition: memcached_client.h:542
Basic integer type.
Definition: type.h:980
bool get_touch(const SubString &key, int64 expire, OnGet &on_get, OnError *on_error=NULL, bool track_notfound=false)
Send a request to get value for one or more keys and touch (update) the stored expiration time...
Definition: memcached_client.h:793
#define EVO_TOK_NEXT_OR_BREAK(TOK, DELIM)
Helper for tokenizing using a break-loop.
Definition: strtok.h:1854
Size size() const
Get size.
Holds data for async I/O buffers (used internally with AsyncServer and protocol implementations).
Definition: ioasync_base.h:134
Low-level debug message, used for showing debug info for lower-level internal or library details (DBG...
Definition: logger.h:137
uint32 StrSizeT
Default Evo string size type.
Definition: sys.h:734
bool incr(const SubString &key, uint64 count=1, OnIncrement *on_increment=NULL, OnError *on_error=NULL)
Send a request to increment value for given key.
Definition: memcached_client.h:585
Base 10: decimal (default)
Definition: str.h:2323
String container.
Definition: string.h:674
ThisType & set()
Set as null.
Definition: string.h:99
SubString & stripr()
Strip right (ending) whitespace (spaces and tabs).
Definition: substring.h:1135
AsyncError
Async I/O error type.
Definition: ioasync_base.h:452
const SubString & value() const
Get current token value from last call to next().
Definition: strtok.h:42
#define EVO_HELPER_RESPONSE_ERROR_CHECKS(EVENT_NAME)
Definition: memcached_client.h:940
bool set_add(const SubString &key, const SubString &value, uint32 flags=0, int64 expire=0, OnStore *on_store=NULL, OnError *on_error=NULL)
Send a request to add a new key and value and fail if key already exists.
Definition: memcached_client.h:441
static StoreResult get_enum(const evo::SubString &key)
Definition: memcached_common.h:56
bool set_cas(const SubString &key, const SubString &value, uint64 cas_id, OnStore &on_store, OnError *on_error=NULL, uint32 flags=0, int64 expire=0)
Send a request to set a key and value using Compare-And-Swap.
Definition: memcached_client.h:336
bool set_replace(const SubString &key, const SubString &value, uint32 flags=0, int64 expire=0, OnStore *on_store=NULL, OnError *on_error=NULL)
Send a request to replace an existing value under key and fail if key doesn't exist.
Definition: memcached_client.h:492
char * data()
Get formatted string pointer.
Definition: string.h:85
bool nextw(char delim)
Find next token using word delimiter.
Definition: strtok.h:278
Evo C++ Library namespace.
Definition: alg.h:11
Base interface for on_increment() event.
Definition: memcached_client.h:171
StoreResult
Memcached store command result.
Definition: memcached_common.h:34
virtual void on_get_cas(const SubString &key, const SubString &value, uint32 flags, uint64 cas_id)
Definition: memcached_client.h:214
Unknown/invalid result (always first)
Definition: memcached_common.h:35
IntegerT< uint64 > UInt64
Basic integer type (uint64) – see IntegerT.
Definition: type.h:1207
async::Memcached Memcached
Definition: memcached_client.h:159
virtual void on_get_end(const SubString &keys_notfound)
Definition: memcached_client.h:221
MemcachedClient(SizeT max_queue_size=DEFAULT_QUEUE_SIZE, SizeT max_read_size=DEFAULT_MAX_READ)
Constructor to initialize client.
Definition: memcached_client.h:234
Base interface for on_remove() event.
Definition: memcached_client.h:182
String forward tokenizer.
Definition: strtok.h:112
AsyncClient< MemcachedClient, impl_memc::ClientQueueItem > Base
Definition: memcached_client.h:153
virtual ~OnIncrement()
Definition: memcached_client.h:172
virtual ~OnStore()
Definition: memcached_client.h:161
Error message showing something isn't working as expected, program may be able to work around it (ERR...
Definition: logger.h:133
int64 get_unix_timestamp() const
Get current date/time as Unix timestamp.
Definition: systime.h:159
bool decr(const SubString &key, uint64 count=1, OnIncrement *on_increment=NULL, OnError *on_error=NULL)
Send a request to decrement value for given key.
Definition: memcached_client.h:600
String & set()
Set as null and empty.
Definition: string.h:995
bool touch(const SubString &key, int64 expire, OnTouch *on_touch, OnError *on_error=NULL)
Send a request to touch (update) the expiration time for given key.
Definition: memcached_client.h:652
Stores a key/value pair of independent objects or values.
Definition: pair.h:32
Base interface for on_touch() event.
Definition: memcached_client.h:193
AsyncBuffers()
Constructor (used internally).
Definition: ioasync_base.h:233
SubString & set(const char *data)
Set as reference to terminated string.
Definition: substring.h:353
uint32 SizeT
Default Evo container size type.
Definition: sys.h:729
Reference and access existing string data.
Definition: substring.h:229
bool set_append(const SubString &key, const SubString &value, OnStore *on_store=NULL, OnError *on_error=NULL)
Send a request to append to existing value for key.
Definition: memcached_client.h:350
const char & item(Key index) const
Get item at position.
virtual void on_increment(const SubString &key, const UInt64 &count)
Definition: memcached_client.h:175
Implements Memcached protocol for an async client.
Definition: memcached_client.h:147
bool get_cas(const SubString &key, OnGet &on_get, OnError *on_error=NULL, bool track_notfound=false, int64 *expire=NULL)
Send a request to get value for one or more keys for Compare-And-Swap.
Definition: memcached_client.h:746
Evo Async Memcached API, common client/server types.
Size size() const
Get formatting string size.
Definition: string.h:93
#define EVO_PARAM_UNUSED(NAME)
Mark function parameter as unused to suppress "unreferenced parameter" compiler warnings on it...
Definition: sys.h:427
const char * data() const
Get data pointer.
virtual void on_get(const SubString &key, const SubString &value, uint32 flags)
Definition: memcached_client.h:208