Initial support for the binary protocol
author <trond@localhost> <>
Sat, 26 Jul 2008 13:46:42 +0000 (15:46 +0200)
committer <trond@localhost> <>
Sat, 26 Jul 2008 13:46:42 +0000 (15:46 +0200)
28 files changed:
clients/client_options.h
clients/memcat.c
clients/memcp.c
clients/memflush.c
clients/memrm.c
clients/memslap.c
config/byteorder.m4 [new file with mode: 0644]
config/memcached.m4 [new file with mode: 0644]
configure.ac
docs/memcached_behavior.pod
libmemcached/Makefile.am
libmemcached/byteorder.c [new file with mode: 0644]
libmemcached/common.h
libmemcached/memcached_auto.c
libmemcached/memcached_behavior.c
libmemcached/memcached_constants.h
libmemcached/memcached_delete.c
libmemcached/memcached_do.c
libmemcached/memcached_flush.c
libmemcached/memcached_get.c
libmemcached/memcached_io.c
libmemcached/memcached_io.h
libmemcached/memcached_quit.c
libmemcached/memcached_response.c
libmemcached/memcached_stats.c
libmemcached/memcached_storage.c
libmemcached/memcached_version.c
tests/function.c

index b7ba5a968c530bde344e5f4162a3878930f1ac43..33a0f81b6d25c48763f6e2f0cd765cff6fba8461 100644 (file)
@@ -22,6 +22,7 @@ typedef enum {
   OPT_SLAP_TCP_NODELAY,
   OPT_FLUSH,
   OPT_HASH,
+  OPT_BINARY,
 } memcached_options;
 
 #endif /* CLIENT_OPTIONS */
index b9db649741e5f5b8bd246b1202a565e660d1036b..36e636223ac7abf3e238980a0d9d767efbaeb22b 100644 (file)
@@ -14,6 +14,7 @@
 /* Prototypes */
 void options_parse(int argc, char *argv[]);
 
+static int opt_binary= 0;
 static int opt_verbose= 0;
 static int opt_displayflag= 0;
 static char *opt_servers= NULL;
@@ -50,6 +51,7 @@ int main(int argc, char *argv[])
 
   memcached_server_push(memc, servers);
   memcached_server_list_free(servers);
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
 
   while (optind < argc) 
   {
@@ -113,6 +115,7 @@ void options_parse(int argc, char *argv[])
       {"servers", required_argument, NULL, OPT_SERVERS},
       {"flag", no_argument, &opt_displayflag, OPT_FLAG},
       {"hash", required_argument, NULL, OPT_HASH},
+      {"binary", no_argument, NULL, OPT_BINARY},
       {0, 0, 0, 0},
     };
 
@@ -124,6 +127,9 @@ void options_parse(int argc, char *argv[])
     {
     case 0:
       break;
+    case OPT_BINARY:
+      opt_binary = 1;
+      break;
     case OPT_VERBOSE: /* --verbose or -v */
       opt_verbose = OPT_VERBOSE;
       break;
index cf06a573bed5d20d4d9901d787b8d502dbdb5040..110a8559660f422e0a8dff48c08b418792b881cf 100644 (file)
@@ -22,6 +22,7 @@
 /* Prototypes */
 void options_parse(int argc, char *argv[]);
 
+static int opt_binary=0;
 static int opt_verbose= 0;
 static char *opt_servers= NULL;
 static char *opt_hash= NULL;
@@ -60,6 +61,7 @@ int main(int argc, char *argv[])
 
   memcached_server_push(memc, servers);
   memcached_server_list_free(servers);
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
 
   while (optind < argc) 
   {
@@ -172,6 +174,7 @@ void options_parse(int argc, char *argv[])
       {"add",  no_argument, NULL, OPT_ADD},
       {"replace",  no_argument, NULL, OPT_REPLACE},
       {"hash", required_argument, NULL, OPT_HASH},
+      {"binary", no_argument, NULL, OPT_BINARY},
       {0, 0, 0, 0},
     };
 
@@ -185,6 +188,9 @@ void options_parse(int argc, char *argv[])
     {
     case 0:
       break;
+    case OPT_BINARY:
+      opt_binary = 1;
+      break;
     case OPT_VERBOSE: /* --verbose or -v */
       opt_verbose = OPT_VERBOSE;
       break;
index 3bf25c34867f3eb1ce8af5375843de392f0bd614..5208e5cd0f4385cefe37b0e09a3a46adc8351d89 100644 (file)
@@ -6,6 +6,7 @@
 #include "client_options.h"
 #include "utilities.h"
 
+static int opt_binary= 0;
 static int opt_verbose= 0;
 static time_t opt_expire= 0;
 static char *opt_servers= NULL;
@@ -42,6 +43,7 @@ int main(int argc, char *argv[])
   servers= memcached_servers_parse(opt_servers);
   memcached_server_push(memc, servers);
   memcached_server_list_free(servers);
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
   
   rc = memcached_flush(memc, opt_expire);
   if (rc != MEMCACHED_SUCCESS) 
@@ -76,6 +78,7 @@ void options_parse(int argc, char *argv[])
     {"debug", no_argument, &opt_verbose, OPT_DEBUG},
     {"servers", required_argument, NULL, OPT_SERVERS},
     {"expire", required_argument, NULL, OPT_EXPIRE},
+    {"binary", no_argument, NULL, OPT_BINARY},
     {0, 0, 0, 0},
   };
   int option_index= 0;
@@ -89,6 +92,9 @@ void options_parse(int argc, char *argv[])
     {
     case 0:
       break;
+    case OPT_BINARY:
+      opt_binary = 1;
+      break;
     case OPT_VERBOSE: /* --verbose or -v */
       opt_verbose = OPT_VERBOSE;
       break;
index 05fe404b8de2fab3d9a94a2cd12eb1e90be32859..1570945eb5141d5b3e6d4b2be113031a45907bce 100644 (file)
@@ -6,6 +6,7 @@
 #include "client_options.h"
 #include "utilities.h"
 
+static int opt_binary= 0;
 static int opt_verbose= 0;
 static time_t opt_expire= 0;
 static char *opt_servers= NULL;
@@ -44,6 +45,7 @@ int main(int argc, char *argv[])
   servers= memcached_servers_parse(opt_servers);
   memcached_server_push(memc, servers);
   memcached_server_list_free(servers);
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
   
   while (optind < argc) 
   {
@@ -90,6 +92,7 @@ void options_parse(int argc, char *argv[])
     {"servers", required_argument, NULL, OPT_SERVERS},
     {"expire", required_argument, NULL, OPT_EXPIRE},
     {"hash", required_argument, NULL, OPT_HASH},
+    {"binary", no_argument, NULL, OPT_BINARY},
     {0, 0, 0, 0},
   };
   int option_index= 0;
@@ -103,6 +106,9 @@ void options_parse(int argc, char *argv[])
     {
     case 0:
       break;
+    case OPT_BINARY:
+      opt_binary = 1;
+      break;
     case OPT_VERBOSE: /* --verbose or -v */
       opt_verbose = OPT_VERBOSE;
       break;
index 6fc8176f75cbd1e855b24078f23297bbdb6eb038..ddb18f3e7f303b9a95081cf1c269677b3a8c087d 100644 (file)
@@ -67,6 +67,7 @@ pairs_st *load_create_data(memcached_st *memc, unsigned int number_of,
                            unsigned int *actual_loaded);
 void flush_all(memcached_st *memc);
 
+static int opt_binary= 0;
 static int opt_verbose= 0;
 static int opt_flush= 0;
 static int opt_non_blocking_io= 0;
@@ -140,6 +141,8 @@ void scheduler(memcached_server_st *servers, conclusions_st *conclusion)
   memc= memcached_create(NULL);
   memcached_server_push(memc, servers);
 
+  memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
+  
   if (opt_flush)
     flush_all(memc);
   if (opt_createial_load)
@@ -243,6 +246,7 @@ void options_parse(int argc, char *argv[])
       {"test", required_argument, NULL, OPT_SLAP_TEST},
       {"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
       {"version", no_argument, NULL, OPT_VERSION},
+      {"binary", no_argument, NULL, OPT_BINARY},
       {0, 0, 0, 0},
     };
 
@@ -257,6 +261,9 @@ void options_parse(int argc, char *argv[])
     {
     case 0:
       break;
+    case OPT_BINARY:
+      opt_binary = 1;
+      break;
     case OPT_VERBOSE: /* --verbose or -v */
       opt_verbose = OPT_VERBOSE;
       break;
diff --git a/config/byteorder.m4 b/config/byteorder.m4
new file mode 100644 (file)
index 0000000..fe88605
--- /dev/null
@@ -0,0 +1,11 @@
+AC_RUN_IFELSE([ 
+   AC_LANG_PROGRAM([
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <inttypes.h>
+      ], [
+if (htonl(5) != 5) {
+   return 1;
+}
+      ])            
+   ], AC_DEFINE([BYTEORDER_BIG_ENDIAN], [1], [Enable big endian byteorder]))
diff --git a/config/memcached.m4 b/config/memcached.m4
new file mode 100644 (file)
index 0000000..140e48a
--- /dev/null
@@ -0,0 +1,38 @@
+dnl ---------------------------------------------------------------------------
+dnl Macro: MEMCACHED_TEST
+dnl ---------------------------------------------------------------------------
+saved_CPPFLAGS="$CPPFLAGS"
+CPPFLAGS=""
+AC_ARG_WITH(memcached,
+       [  --with-memcached=PATH   Specify path to memcached installation ],
+       [
+                if test "x$withval" != "xno" ; then
+                        CPPFLAGS="-I${withval}/include"
+                fi
+       ]
+)
+
+AC_CHECK_HEADER(memcached/protocol_binary.h, [
+                AC_RUN_IFELSE([ 
+                   AC_LANG_PROGRAM([
+                      #include "memcached/protocol_binary.h"
+                   ], [
+                      protocol_binary_request_set request;
+                      if (sizeof(request) != sizeof(request.bytes)) {
+                         return 1;
+                      }
+                   ])
+                ],[ 
+                   if test "x$CPPFLAGS" != "x" ; then
+                      CPPFLAGS="$saved_CPPFLAGS $CPPFLAGS"
+                   else
+                      CPPFLAGS="$saved_CPPFLAGS"
+                   fi
+               ], AC_MSG_ERROR([Unsupported struct padding done by compiler.])) 
+               ], [
+                  AC_MSG_ERROR([Cannot locate memcached/protocol_binary.])
+               ])
+
+dnl ---------------------------------------------------------------------------
+dnl End Macro: MEMCACHED_TEST
+dnl ---------------------------------------------------------------------------
index 5c9ed3dfcc4f5ac21343351e109cf59a71f882df..83463ac63ea02cd204990ababe340323d699c643 100644 (file)
@@ -51,6 +51,8 @@ AC_LANG_CPLUSPLUS
 
 sinclude(config/debug.m4)
 sinclude(config/dtrace.m4)
+sinclude(config/memcached.m4)
+sinclude(config/byteorder.m4)
 sinclude(config/64bit.m4)
 
 # We only support GCC and Sun's forte at the moment
index f717984d850be918b9defcd23dc9d9c474688552..8821dcdfaff49d88da47d200dbb8baf8c671dd48 100755 (executable)
@@ -121,6 +121,11 @@ sorted order. This will defeat consisten hashing.
 In non-blocking mode this changes the value of the timeout during socket
 connection.
 
+=item MEMCACHED_BEHAVIOR_BINARY_PROTOCOL
+
+Enable the use of the binary protocol. Please note that you cannot toggle
+this flag on an open connection.
+
 =back
 
 =head1 RETURN
index 550b4890fc490b4148e2b8cddf892db662517283..7cb12a01408eaf97add3d56df39ae97b883cbe7d 100644 (file)
@@ -67,7 +67,8 @@ libmemcached_la_SOURCES = crc.c \
                           memcached_strerror.c \
                          memcached_verbosity.c \
                          memcached_version.c \
-                         murmur_hash.c
+                         murmur_hash.c \
+                          byteorder.c
 
 libmemcached_la_LIBADD =
 libmemcached_la_LDFLAGS = -version-info $(MEMCACHED_LIBRARY_VERSION)
diff --git a/libmemcached/byteorder.c b/libmemcached/byteorder.c
new file mode 100644 (file)
index 0000000..f407fa0
--- /dev/null
@@ -0,0 +1,28 @@
+#include "common.h"
+
+/* Byte swap a 64-bit number. */
+static inline uint64_t swap64(uint64_t in) {
+#ifndef BYTEORDER_BIG_ENDIAN
+  /* Little endian, flip the bytes around until someone makes a faster/better
+   * way to do this. */
+  uint64_t rv= 0;
+  int i= 0;
+  for(i= 0; i < 8; i++) 
+  {
+    rv= (rv << 8) | (in & 0xff);
+    in >>= 8;
+  }
+  return rv;
+#else
+  /* big-endian machines don't need byte swapping */
+  return in;
+#endif
+}
+
+uint64_t ntohll(uint64_t value) {
+  return swap64(value);
+}
+
+uint64_t htonll(uint64_t value) {
+  return swap64(value);
+}
index 0c9b87ce3f3478eb8e7984307c7583770276d33f..90c8a140b7e3ef4b41013de55b30093261712ad2 100644 (file)
@@ -38,6 +38,7 @@
 #include <memcached.h>
 #include "memcached_io.h"
 
+#include "memcached/protocol_binary.h"
 #include <libmemcached_config.h>
 
 #if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
@@ -80,6 +81,7 @@ typedef enum {
   MEM_VERIFY_KEY= (1 << 10),
   /* 11 used for weighted ketama */
   MEM_KETAMA_WEIGHTED= (1 << 11),
+  MEM_BINARY_PROTOCOL= (1 << 12),
 } memcached_flags;
 
 /* Hashing algo */
@@ -100,7 +102,7 @@ void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death);
 #define memcached_server_response_decrement(A) (A)->cursor_active--
 #define memcached_server_response_reset(A) (A)->cursor_active=0
 
-memcached_return memcached_do(memcached_server_st *ptr, const char *commmand,
+memcached_return memcached_do(memcached_server_st *ptr, const void *commmand,
                               size_t command_length, uint8_t with_flush);
 memcached_return memcached_version(memcached_st *ptr);
 memcached_return value_fetch(memcached_server_st *ptr,
@@ -114,4 +116,8 @@ memcached_return memcachd_key_test(char **keys, size_t *key_length,
 memcached_return run_distribution(memcached_st *ptr);
 
 uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length);
+
+extern uint64_t ntohll(uint64_t);
+extern uint64_t htonll(uint64_t);
+
 #endif /* __COMMON_H__ */
index c847f9369e095391696cde34b66b8147021ac6bc..449d4c4e23b5f6f341986f4885143c9fe63a9366 100644 (file)
@@ -62,6 +62,45 @@ static memcached_return memcached_auto(memcached_st *ptr,
   return rc;
 }
 
+static memcached_return binary_incr_decr(memcached_st *ptr, uint8_t cmd,
+                                         const char *key, size_t key_length,
+                                         uint32_t offset, uint64_t *value) 
+{
+  unsigned int server_key;
+
+  unlikely (key_length == 0)
+    return MEMCACHED_NO_KEY_PROVIDED;
+
+  unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0)
+    return MEMCACHED_NO_SERVERS;
+
+  if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
+    return MEMCACHED_BAD_KEY_PROVIDED;
+
+  server_key= memcached_generate_hash(ptr, key, key_length);
+
+  protocol_binary_request_incr request= {0};
+
+  request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+  request.message.header.request.opcode= cmd;
+  request.message.header.request.keylen= htons((uint16_t)key_length);
+  request.message.header.request.extlen= 20;
+  request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+  request.message.header.request.bodylen= htonl(key_length + request.message.header.request.extlen);  
+  request.message.body.delta= htonll(offset);
+  
+  /* TODO: The binary protocol allows you to specify initial and expiry time */
+  if ((memcached_do(&ptr->hosts[server_key], request.bytes,
+                    sizeof(request.bytes), 0)!=MEMCACHED_SUCCESS) ||
+      (memcached_io_write(&ptr->hosts[server_key], key, key_length, 1) == -1)) 
+  {
+    memcached_io_reset(&ptr->hosts[server_key]);
+    return MEMCACHED_WRITE_FAILURE;
+  }
+  return memcached_response(&ptr->hosts[server_key], value, sizeof(*value), NULL);
+}
+
 memcached_return memcached_increment(memcached_st *ptr, 
                                      const char *key, size_t key_length,
                                      uint32_t offset,
@@ -70,7 +109,12 @@ memcached_return memcached_increment(memcached_st *ptr,
   memcached_return rc;
 
   LIBMEMCACHED_MEMCACHED_INCREMENT_START();
-  rc= memcached_auto(ptr, "incr", key, key_length, offset, value);
+  if (ptr->flags & MEM_BINARY_PROTOCOL) 
+    rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_INCREMENT, key, 
+                        key_length, offset, value);
+  else 
+     rc= memcached_auto(ptr, "incr", key, key_length, offset, value);
+  
   LIBMEMCACHED_MEMCACHED_INCREMENT_END();
 
   return rc;
@@ -84,7 +128,12 @@ memcached_return memcached_decrement(memcached_st *ptr,
   memcached_return rc;
 
   LIBMEMCACHED_MEMCACHED_DECREMENT_START();
-  rc= memcached_auto(ptr, "decr", key, key_length, offset, value);
+  if (ptr->flags & MEM_BINARY_PROTOCOL) 
+    rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_DECREMENT, key, 
+                        key_length, offset, value);
+  else 
+    rc= memcached_auto(ptr, "decr", key, key_length, offset, value);      
+  
   LIBMEMCACHED_MEMCACHED_DECREMENT_END();
 
   return rc;
index c001c4616d7c588fdc37f8d561f1920fd6919ad9..6b36626e9e07d00a2fa06b7d2525e75bfb3e20ad 100644 (file)
@@ -23,6 +23,9 @@ memcached_return memcached_behavior_set(memcached_st *ptr,
 {
   switch (flag)
   {
+  case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL:
+    set_behavior_flag(ptr, MEM_BINARY_PROTOCOL, data);
+    break;     
   case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
     set_behavior_flag(ptr, MEM_SUPPORT_CAS, data);
     break;
@@ -118,6 +121,9 @@ uint64_t memcached_behavior_get(memcached_st *ptr,
 
   switch (flag)
   {
+  case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL:
+    temp_flag= MEM_BINARY_PROTOCOL;
+    break;     
   case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
     temp_flag= MEM_SUPPORT_CAS;
     break;
index 69c7666b8abdd9220ce4ffce4197e0514c7c0f16..0ee2568f3839a8c02e24a07a02dd56d347bba84e 100644 (file)
@@ -88,6 +88,7 @@ typedef enum {
   MEMCACHED_BEHAVIOR_RETRY_TIMEOUT,
   MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED,
   MEMCACHED_BEHAVIOR_KETAMA_HASH,
+  MEMCACHED_BEHAVIOR_BINARY_PROTOCOL,
 } memcached_behavior;
 
 typedef enum {
index a105f552e096a29be1075e2cfb3287ffe500faba..5ec2fca31891cf3803d5856ebd7694de71f6ef29 100644 (file)
@@ -7,6 +7,12 @@ memcached_return memcached_delete(memcached_st *ptr, const char *key, size_t key
                                  key, key_length, expiration);
 }
 
+static inline memcached_return binary_delete(memcached_st *ptr, 
+                                             unsigned int server_key,
+                                             const char *key, 
+                                             size_t key_length,
+                                            int flush);
+
 memcached_return memcached_delete_by_key(memcached_st *ptr, 
                                          const char *master_key, size_t master_key_length,
                                          const char *key, size_t key_length,
@@ -27,28 +33,33 @@ memcached_return memcached_delete_by_key(memcached_st *ptr,
     return MEMCACHED_NO_SERVERS;
 
   server_key= memcached_generate_hash(ptr, master_key, master_key_length);
-
-  if (expiration)
-    send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
-                          "delete %s%.*s %llu\r\n", 
-                          ptr->prefix_key,
-                          (int)key_length, key, 
-                          (unsigned long long)expiration);
-  else
-    send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
-                          "delete %s%.*s\r\n", 
-                          ptr->prefix_key,
-                          (int)key_length, key);
-
-  if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
+  to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1;
+     
+  if (ptr->flags & MEM_BINARY_PROTOCOL) 
+    rc= binary_delete(ptr, server_key, key, key_length, to_write);
+  else 
   {
-    rc= MEMCACHED_WRITE_FAILURE;
-    goto error;
+    if (expiration)
+      send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
+                            "delete %s%.*s %llu\r\n", 
+                            ptr->prefix_key,
+                            (int)key_length, key, 
+                            (unsigned long long)expiration);
+    else
+       send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
+                             "delete %s%.*s\r\n", 
+                             ptr->prefix_key,
+                             (int)key_length, key);
+    
+    if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE) 
+    {
+      rc= MEMCACHED_WRITE_FAILURE;
+      goto error;
+    }
+     
+    rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
   }
 
-  to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1;
-
-  rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
   if (rc != MEMCACHED_SUCCESS)
     goto error;
 
@@ -70,3 +81,29 @@ error:
   LIBMEMCACHED_MEMCACHED_DELETE_END();
   return rc;
 }
+
+static inline memcached_return binary_delete(memcached_st *ptr, 
+                                             unsigned int server_key,
+                                             const char *key, 
+                                            size_t key_length,
+                                            int flush)
+{
+  protocol_binary_request_delete request= {0};
+
+  request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+  request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETE;
+  request.message.header.request.keylen= htons((uint16_t)key_length);
+  request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+  request.message.header.request.bodylen= htonl(key_length);
+  
+  if ((memcached_do(&ptr->hosts[server_key], request.bytes, 
+                    sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
+      (memcached_io_write(&ptr->hosts[server_key], key, 
+                          key_length, flush) == -1)) 
+  {
+    memcached_io_reset(&ptr->hosts[server_key]);
+    return MEMCACHED_WRITE_FAILURE;
+  }
+
+  return MEMCACHED_SUCCESS;
+}
index 671b9fa3f3b809f2a8850bdd47c699b53fca0654..7b9da5ed437ed3e38aa0d06d49145aaca0f9e010 100644 (file)
@@ -1,6 +1,6 @@
 #include "common.h"
 
-memcached_return memcached_do(memcached_server_st *ptr, const char *command, 
+memcached_return memcached_do(memcached_server_st *ptr, const void *command, 
                               size_t command_length, uint8_t with_flush)
 {
   memcached_return rc;
index 1511adbe43ce0913f09cd1c9ee335e280e4b4db9..f722cc904e6ea9835d4d8bbb1d40f004a1f1024b 100644 (file)
@@ -1,12 +1,30 @@
 #include "common.h"
 
+static memcached_return memcached_flush_binary(memcached_st *ptr, 
+                                               time_t expiration);
+static memcached_return memcached_flush_textual(memcached_st *ptr, 
+                                                time_t expiration);
+
 memcached_return memcached_flush(memcached_st *ptr, time_t expiration)
+{
+  memcached_return rc;
+
+  LIBMEMCACHED_MEMCACHED_FLUSH_START();
+  if (ptr->flags & MEM_BINARY_PROTOCOL)
+    rc= memcached_flush_binary(ptr, expiration);
+  else
+    rc= memcached_flush_textual(ptr, expiration);
+  LIBMEMCACHED_MEMCACHED_FLUSH_END();
+  return rc;
+}
+
+static memcached_return memcached_flush_textual(memcached_st *ptr, 
+                                                time_t expiration)
 {
   unsigned int x;
   size_t send_length;
   memcached_return rc;
   char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
-  LIBMEMCACHED_MEMCACHED_FLUSH_START();
 
   unlikely (ptr->number_of_hosts == 0)
     return MEMCACHED_NO_SERVERS;
@@ -26,6 +44,40 @@ memcached_return memcached_flush(memcached_st *ptr, time_t expiration)
       (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
   }
 
-  LIBMEMCACHED_MEMCACHED_FLUSH_END();
+  return MEMCACHED_SUCCESS;
+}
+
+static memcached_return memcached_flush_binary(memcached_st *ptr, 
+                                               time_t expiration)
+{
+  unsigned int x;
+  protocol_binary_request_flush request= {0};
+
+  unlikely (ptr->number_of_hosts == 0)
+    return MEMCACHED_NO_SERVERS;
+
+  request.message.header.request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
+  request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
+  request.message.header.request.extlen= 4;
+  request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+  request.message.header.request.bodylen= htonl(request.message.header.request.extlen);
+  request.message.body.expiration= htonl(expiration);
+
+  for (x= 0; x < ptr->number_of_hosts; x++)
+  {
+    if (memcached_do(&ptr->hosts[x], request.bytes, 
+                     sizeof(request.bytes), 1) != MEMCACHED_SUCCESS) 
+    {
+      memcached_io_reset(&ptr->hosts[x]);
+      return MEMCACHED_WRITE_FAILURE;
+    } 
+  }
+
+  for (x= 0; x < ptr->number_of_hosts; x++)
+  {
+    if (memcached_server_response_count(&ptr->hosts[x]) > 0)
+       (void)memcached_response(&ptr->hosts[x], NULL, 0, NULL);
+  }
+
   return MEMCACHED_SUCCESS;
 }
index 670b2550b52c9ece3c99854e112f7c03f6dde647..dcaa8988a257344a543c34cf39e0fe6963c714c5 100644 (file)
@@ -102,6 +102,11 @@ memcached_return memcached_mget(memcached_st *ptr,
   return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
 }
 
+static memcached_return binary_mget_by_key(memcached_st *ptr,
+                                           unsigned int master_server_key,
+                                           char **keys, size_t *key_length,
+                                           unsigned int number_of_keys);
+
 memcached_return memcached_mget_by_key(memcached_st *ptr, 
                                        const char *master_key, 
                                        size_t master_key_length,
@@ -159,6 +164,10 @@ memcached_return memcached_mget_by_key(memcached_st *ptr,
         (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
     }
   }
+  
+  if (ptr->flags & MEM_BINARY_PROTOCOL)
+    return binary_mget_by_key(ptr, master_server_key, keys, 
+                              key_length, number_of_keys);
 
   /* 
     If a server fails we warn about errors and start all over with sending keys
@@ -234,3 +243,90 @@ memcached_return memcached_mget_by_key(memcached_st *ptr,
   LIBMEMCACHED_MEMCACHED_MGET_END();
   return rc;
 }
+
+static memcached_return binary_mget_by_key(memcached_st *ptr, 
+                                           unsigned int master_server_key,
+                                           char **keys, size_t *key_length, 
+                                           unsigned int number_of_keys)
+{
+  memcached_return rc= MEMCACHED_NOTFOUND;
+
+  int flush= number_of_keys == 1;
+
+  /* 
+    If a server fails we warn about errors and start all over with sending keys
+    to the server.
+  */
+  for (int x= 0; x < number_of_keys; x++) 
+  {
+    unsigned int server_key;
+
+    if (master_server_key)
+      server_key= master_server_key;
+    else
+      server_key= memcached_generate_hash(ptr, keys[x], key_length[x]);
+
+    if (memcached_server_response_count(&ptr->hosts[server_key]) == 0) 
+    {
+      rc= memcached_connect(&ptr->hosts[server_key]);
+      if (rc != MEMCACHED_SUCCESS) 
+        continue;
+    }
+     
+    protocol_binary_request_getk request= {0};
+    request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+    if (number_of_keys == 1)
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
+    else
+      request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
+
+    request.message.header.request.keylen= htons((uint16_t)key_length[x]);
+    request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+    request.message.header.request.bodylen= htonl(key_length[x]);
+    
+    if ((memcached_io_write(&ptr->hosts[server_key], request.bytes,
+                            sizeof(request.bytes), 0) == -1) ||
+        (memcached_io_write(&ptr->hosts[server_key], keys[x], 
+                            key_length[x], flush) == -1)) 
+    {
+      memcached_server_response_reset(&ptr->hosts[server_key]);
+      rc= MEMCACHED_SOME_ERRORS;
+      continue;
+    }
+    memcached_server_response_increment(&ptr->hosts[server_key]);    
+  }
+
+  if (number_of_keys > 1) 
+  {
+    /*
+     * Send a noop command to flush the buffers
+     */
+    protocol_binary_request_noop request= {0};
+    request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
+    request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+    
+    for (int x= 0; x < ptr->number_of_hosts; x++)
+      if (memcached_server_response_count(&ptr->hosts[x])) 
+      {
+        if (memcached_io_write(&ptr->hosts[x], NULL, 0, 1) == -1) 
+        {
+          memcached_server_response_reset(&ptr->hosts[x]);
+          memcached_io_reset(&ptr->hosts[x]);
+          rc= MEMCACHED_SOME_ERRORS;
+        }
+
+        if (memcached_io_write(&ptr->hosts[x], request.bytes, 
+                              sizeof(request.bytes), 1) == -1) 
+        {
+          memcached_server_response_reset(&ptr->hosts[x]);
+          memcached_io_reset(&ptr->hosts[x]);
+          rc= MEMCACHED_SOME_ERRORS;
+        }
+        memcached_server_response_increment(&ptr->hosts[x]);    
+      }
+    }
+
+
+  return rc;
+}
index 93ca41732d4ca26741c2c905e10e9ea001795e15..ea8b25816d9d981fce5f4d40cc29f3cc0a84be2f 100644 (file)
@@ -75,7 +75,7 @@ void memcached_io_preread(memcached_st *ptr)
 #endif
 
 ssize_t memcached_io_read(memcached_server_st *ptr,
-                          char *buffer, size_t length)
+                          void *buffer, size_t length)
 {
   char *buffer_ptr;
 
@@ -154,11 +154,11 @@ ssize_t memcached_io_read(memcached_server_st *ptr,
       break;
   }
 
-  return (size_t)(buffer_ptr - buffer);
+  return (size_t)(buffer_ptr - (char*)buffer);
 }
 
 ssize_t memcached_io_write(memcached_server_st *ptr,
-                           const char *buffer, size_t length, char with_flush)
+                           const void *buffer, size_t length, char with_flush)
 {
   size_t original_length;
   const char* buffer_ptr;
index f7a725740c38e346757e061b8c4fae89dfe521f3..091405b1b549068f16bce6e50da801ee6506af70 100644 (file)
@@ -2,8 +2,8 @@
 #include <memcached.h>
 
 ssize_t memcached_io_write(memcached_server_st *ptr,
-                           const char *buffer, size_t length, char with_flush);
+                           const void *buffer, size_t length, char with_flush);
 void memcached_io_reset(memcached_server_st *ptr);
 ssize_t memcached_io_read(memcached_server_st *ptr,
-                          char *buffer, size_t length);
+                          void *buffer, size_t length);
 memcached_return memcached_io_close(memcached_server_st *ptr, uint8_t io_death);
index 9749b848306cafaae7b15c693196dd728e0cedf0..a3d114b3e1984a9d23c5f768e425e8f061583627 100644 (file)
@@ -19,9 +19,18 @@ void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death)
       ssize_t read_length;
       char buffer[MEMCACHED_MAX_BUFFER];
 
-      rc= memcached_do(ptr, "quit\r\n", 6, 1);
-      WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_FETCH_NOTFINISHED);
+      if (ptr->root->flags & MEM_BINARY_PROTOCOL) 
+      {
+        protocol_binary_request_quit request = {0};
+        request.message.header.request.magic = PROTOCOL_BINARY_REQ;
+        request.message.header.request.opcode = PROTOCOL_BINARY_CMD_QUIT;
+        request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
+        rc= memcached_do(ptr, request.bytes, sizeof(request.bytes), 1);
+      } else 
+        rc= memcached_do(ptr, "quit\r\n", 6, 1);
 
+      WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_FETCH_NOTFINISHED);
+      
       /* read until socket is closed, or there is an error
        * closing the socket before all data is read
        * results in server throwing away all data which is
index 6f9e7c3341caa1be24830b6ec622ecae6ce7c779..2c178e4948bb6995dd34324abdc91f4db63eef1f 100644 (file)
@@ -8,6 +8,10 @@
 #include "common.h"
 #include "memcached_io.h"
 
+static memcached_return binary_response(memcached_server_st *ptr, 
+                                        char *buffer, size_t buffer_length,
+                                        memcached_result_st *result);
+
 memcached_return memcached_response(memcached_server_st *ptr, 
                                     char *buffer, size_t buffer_length,
                                     memcached_result_st *result)
@@ -34,6 +38,9 @@ memcached_return memcached_response(memcached_server_st *ptr,
   if (ptr->root->flags & MEM_NO_BLOCK)
     (void)memcached_io_write(ptr, NULL, 0, 1);
 
+  if (ptr->root->flags & MEM_BINARY_PROTOCOL)
+    return binary_response(ptr, buffer, buffer_length, result);
+  
   max_messages= memcached_server_response_count(ptr);
   for (x= 0; x <  max_messages; x++)
   {
@@ -179,3 +186,195 @@ size_t memcached_result_length(memcached_result_st *ptr)
   memcached_string_st *sptr= &ptr->value;
   return memcached_string_length(sptr);
 }
+
+/**
+ * Read a given number of bytes from the server and place it into a specific
+ * buffer. Reset the IO channel or this server if an error occurs. 
+ */
+static memcached_return safe_read(memcached_server_st *ptr, void *dta, 
+                                  size_t size) 
+{
+  int offset= 0;
+  char *data= dta;
+  while (offset < size) 
+  {
+    ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
+    if (nread <= 0) 
+    {
+      memcached_io_reset(ptr);
+      return MEMCACHED_UNKNOWN_READ_FAILURE;
+    }
+    offset += nread;
+  }
+   
+  return MEMCACHED_SUCCESS;
+}
+
+static memcached_return binary_response(memcached_server_st *ptr,
+                                        char *buffer,
+                                        size_t buffer_length,
+                                        memcached_result_st *result) 
+{
+  protocol_binary_response_header header;
+  memcached_server_response_decrement(ptr);
+   
+  unlikely (safe_read(ptr, &header.bytes, 
+                      sizeof(header.bytes)) != MEMCACHED_SUCCESS)
+    return MEMCACHED_UNKNOWN_READ_FAILURE;
+
+  unlikely (header.response.magic != PROTOCOL_BINARY_RES) 
+  {
+    memcached_io_reset(ptr);
+    return MEMCACHED_PROTOCOL_ERROR;
+  }
+
+  /*
+  ** Convert the header to host local endian!
+  */
+  header.response.keylen= ntohs(header.response.keylen);
+  header.response.status= ntohs(header.response.status);
+  header.response.bodylen= ntohl(header.response.bodylen);
+  header.response.cas= ntohll(header.response.cas);
+  uint32_t bodylen= header.response.bodylen;
+
+  if (header.response.status == 0) 
+  {
+    if ((header.response.opcode == PROTOCOL_BINARY_CMD_GETK) ||
+        (header.response.opcode == PROTOCOL_BINARY_CMD_GETKQ)) 
+    {
+      uint16_t keylen= header.response.keylen;
+      memcached_result_reset(result);
+      result->cas= header.response.cas;
+
+      if (safe_read(ptr, &result->flags,
+                    sizeof(result->flags)) != MEMCACHED_SUCCESS) 
+      {
+        return MEMCACHED_UNKNOWN_READ_FAILURE;
+      }
+      result->flags= ntohl(result->flags);
+      bodylen -= header.response.extlen;
+            
+      result->key_length= keylen;
+      if (safe_read(ptr, result->key, keylen) != MEMCACHED_SUCCESS) 
+      {
+        return MEMCACHED_UNKNOWN_READ_FAILURE;
+      }
+
+      bodylen -= keylen;
+      if (memcached_string_check(&result->value,
+                                 bodylen) != MEMCACHED_SUCCESS) 
+      {
+        memcached_io_reset(ptr);
+        return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
+      }
+
+      char *vptr= memcached_string_value(&result->value);
+      if (safe_read(ptr, vptr, bodylen) != MEMCACHED_SUCCESS) 
+      {
+        return MEMCACHED_UNKNOWN_READ_FAILURE;
+      }
+
+      memcached_string_set_length(&result->value, bodylen);  
+    } 
+    else if ((header.response.opcode == PROTOCOL_BINARY_CMD_INCREMENT) ||
+            (header.response.opcode == PROTOCOL_BINARY_CMD_DECREMENT)) 
+    {
+      if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t)) 
+      {
+        return MEMCACHED_PROTOCOL_ERROR;
+      }
+
+      WATCHPOINT_ASSERT(bodylen == buffer_length);
+      uint64_t val;
+      if (safe_read(ptr, &val, sizeof(val)) != MEMCACHED_SUCCESS) 
+      {
+        return MEMCACHED_UNKNOWN_READ_FAILURE;
+      }
+
+      val= ntohll(val);
+      memcpy(buffer, &val, sizeof(val));
+    } 
+    else if (header.response.opcode == PROTOCOL_BINARY_CMD_VERSION) 
+    {
+      memset(buffer, 0, buffer_length);
+      if (bodylen >= buffer_length)
+       /* not enough space in buffer.. should not happen... */
+       return MEMCACHED_UNKNOWN_READ_FAILURE;
+      else 
+        safe_read(ptr, buffer, bodylen);
+    } 
+    else if ((header.response.opcode == PROTOCOL_BINARY_CMD_FLUSH) ||
+            (header.response.opcode == PROTOCOL_BINARY_CMD_QUIT) ||
+            (header.response.opcode == PROTOCOL_BINARY_CMD_SET) ||
+            (header.response.opcode == PROTOCOL_BINARY_CMD_ADD) ||
+            (header.response.opcode == PROTOCOL_BINARY_CMD_REPLACE) ||
+            (header.response.opcode == PROTOCOL_BINARY_CMD_APPEND) ||
+            (header.response.opcode == PROTOCOL_BINARY_CMD_PREPEND) ||
+            (header.response.opcode == PROTOCOL_BINARY_CMD_DELETE)) 
+    {
+       WATCHPOINT_ASSERT(bodylen == 0);
+       return MEMCACHED_SUCCESS;
+    } 
+    else if (header.response.opcode == PROTOCOL_BINARY_CMD_NOOP) 
+    {
+       WATCHPOINT_ASSERT(bodylen == 0);
+       return MEMCACHED_END;
+    }
+    else if (header.response.opcode == PROTOCOL_BINARY_CMD_STAT) 
+    {
+       if (bodylen == 0)
+          return MEMCACHED_END;
+       else if (bodylen + 1 > buffer_length)
+          /* not enough space in buffer.. should not happen... */
+          return MEMCACHED_UNKNOWN_READ_FAILURE;
+       else 
+       {
+          size_t keylen= header.response.keylen;            
+          memset(buffer, 0, buffer_length);
+          safe_read(ptr, buffer, keylen);
+          safe_read(ptr, buffer + keylen + 1, bodylen - keylen);
+       }
+    } 
+    else 
+    {
+       /* Command not implemented yet! */
+       WATCHPOINT_ASSERT(0);
+       memcached_io_reset(ptr);
+       return MEMCACHED_PROTOCOL_ERROR;
+    }        
+  } 
+  else if (header.response.bodylen) 
+  {
+     /* What should I do with the error message??? just discard it for now */
+    char hole[1024];
+    while (bodylen > 0) 
+    {
+      size_t nr= (bodylen > sizeof(hole)) ? sizeof(hole) : bodylen;
+      safe_read(ptr, hole, nr);
+      bodylen -= nr;
+    }
+  }
+
+  memcached_return rc= MEMCACHED_SUCCESS;
+  unlikely(header.response.status != 0) 
+    switch (header.response.status) 
+    {
+    case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
+      rc= MEMCACHED_NOTFOUND;
+      break;
+    case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
+      rc= MEMCACHED_DATA_EXISTS;
+      break;
+    case PROTOCOL_BINARY_RESPONSE_E2BIG:
+    case PROTOCOL_BINARY_RESPONSE_EINVAL:
+    case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
+    case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
+    case PROTOCOL_BINARY_RESPONSE_ENOMEM:
+    default:
+      /* @todo fix the error mappings */
+      rc= MEMCACHED_PROTOCOL_ERROR;
+      break;
+    }
+    
+  return rc;
+}
index bbc82b6e0dad306307816c0159bb2dbb7bfcd4e9..f8cfdbda0f45a4540f95fdb7440b2499ea32635f 100644 (file)
@@ -211,7 +211,69 @@ char *memcached_stat_get_value(memcached_st *ptr, memcached_stat_st *stat,
   return ret;
 }
 
-static memcached_return memcached_stats_fetch(memcached_st *ptr,
+static memcached_return binary_stats_fetch(memcached_st *ptr,
+                                           memcached_stat_st *stat,
+                                           char *args,
+                                           unsigned int server_key)
+{
+  memcached_return rc;
+
+  char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+  protocol_binary_request_stats request= {0};
+  request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+  request.message.header.request.opcode= PROTOCOL_BINARY_CMD_STAT;
+  request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+
+  if (args != NULL) 
+  {
+    int len= strlen(args);
+    request.message.header.request.keylen= htons((uint16_t)len);
+    request.message.header.request.bodylen= htonl(len);
+      
+    if ((memcached_do(&ptr->hosts[server_key], request.bytes, 
+                      sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
+        (memcached_io_write(&ptr->hosts[server_key], args, len, 1) == -1)) 
+    {
+      memcached_io_reset(&ptr->hosts[server_key]);
+      return MEMCACHED_WRITE_FAILURE;
+    }
+  }
+  else
+  {
+    if (memcached_do(&ptr->hosts[server_key], request.bytes, 
+                     sizeof(request.bytes), 1) != MEMCACHED_SUCCESS) 
+    {
+      memcached_io_reset(&ptr->hosts[server_key]);
+      return MEMCACHED_WRITE_FAILURE;
+    }
+  }
+
+  memcached_server_response_decrement(&ptr->hosts[server_key]);  
+  do 
+  {
+     rc= memcached_response(&ptr->hosts[server_key], buffer, 
+                             sizeof(buffer), NULL);
+     if (rc == MEMCACHED_END)
+        break;
+     
+     unlikely (rc != MEMCACHED_SUCCESS) 
+     {
+        memcached_io_reset(&ptr->hosts[server_key]);
+        return rc;
+     }
+     
+     set_data(stat, buffer, buffer + strlen(buffer) + 1);
+  } while (1);
+  
+  /* shit... memcached_response will decrement the counter, so I need to
+  ** reset it.. todo: look at this and try to find a better solution.
+  */
+  ptr->hosts[server_key].cursor_active= 0;
+
+  return MEMCACHED_SUCCESS;
+}
+
+static memcached_return ascii_stats_fetch(memcached_st *ptr,
                                               memcached_stat_st *stat,
                                               char *args,
                                               unsigned int server_key)
@@ -294,8 +356,12 @@ memcached_stat_st *memcached_stat(memcached_st *ptr, char *args, memcached_retur
   for (x= 0; x < ptr->number_of_hosts; x++)
   {
     memcached_return temp_return;
-
-    temp_return= memcached_stats_fetch(ptr, stats + x, args, x);
+    
+    if (ptr->flags & MEM_BINARY_PROTOCOL)
+      temp_return= binary_stats_fetch(ptr, stats + x, args, x);
+    else
+      temp_return= ascii_stats_fetch(ptr, stats + x, args, x);
+    
     if (temp_return != MEMCACHED_SUCCESS)
       rc= MEMCACHED_SOME_ERRORS;
   }
@@ -314,7 +380,10 @@ memcached_return memcached_stat_servername(memcached_stat_st *stat, char *args,
 
   memcached_server_add(&memc, hostname, port);
 
-  rc= memcached_stats_fetch(&memc, stat, args, 0);
+  if (memc.flags & MEM_BINARY_PROTOCOL)
+    rc= binary_stats_fetch(&memc, stat, args, 0);
+  else
+    rc= ascii_stats_fetch(&memc, stat, args, 0);
 
   memcached_free(&memc);
 
index 04f281cc898417866159ed2af239a7f78eb43dd4..326f6fe6f723bbe027902ad94f94ae5fef40662a 100644 (file)
@@ -6,7 +6,6 @@
   memcached_add()
 
 */
-
 #include "common.h"
 #include "memcached_io.h"
 
@@ -43,6 +42,16 @@ static char *storage_op_string(memcached_storage_action verb)
   return SET_OP;
 }
 
+static memcached_return memcached_send_binary(memcached_server_st* server, 
+                                              const char *key, 
+                                              size_t key_length, 
+                                              const char *value, 
+                                              size_t value_length, 
+                                              time_t expiration,
+                                              uint32_t flags,
+                                              uint64_t cas,
+                                              memcached_storage_action verb);
+
 static inline memcached_return memcached_send(memcached_st *ptr, 
                                               const char *master_key, size_t master_key_length, 
                                               const char *key, size_t key_length, 
@@ -72,6 +81,11 @@ static inline memcached_return memcached_send(memcached_st *ptr,
 
   server_key= memcached_generate_hash(ptr, master_key, master_key_length);
 
+  if (ptr->flags & MEM_BINARY_PROTOCOL)
+    return memcached_send_binary(&ptr->hosts[server_key], key, key_length, 
+                                 value, value_length, expiration, 
+                                 flags, cas, verb);
+
   if (cas)
     write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 
                            "%s %s%.*s %u %llu %zu %llu\r\n", storage_op_string(verb),
@@ -129,6 +143,7 @@ error:
   return rc;
 }
 
+
 memcached_return memcached_set(memcached_st *ptr, const char *key, size_t key_length, 
                                const char *value, size_t value_length, 
                                time_t expiration,
@@ -303,3 +318,74 @@ memcached_return memcached_cas_by_key(memcached_st *ptr,
                      expiration, flags, cas, CAS_OP);
   return rc;
 }
+
+static memcached_return memcached_send_binary(memcached_server_st* server, 
+                                              const char *key, 
+                                              size_t key_length, 
+                                              const char *value, 
+                                              size_t value_length, 
+                                              time_t expiration,
+                                              uint32_t flags,
+                                              uint64_t cas,
+                                              memcached_storage_action verb)
+{
+  protocol_binary_request_set request= {0};
+  size_t send_length= sizeof(request.bytes);
+
+  request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+  switch (verb) 
+  {
+  case SET_OP:
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SET;
+    break;
+  case ADD_OP:
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_ADD;
+    break;
+  case REPLACE_OP:
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE;
+    break;
+  case APPEND_OP:
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_APPEND;
+    break;
+  case PREPEND_OP:
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_PREPEND;
+    break;
+  case CAS_OP:
+    request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE;
+      break;
+  default:
+    abort();
+  }
+
+  request.message.header.request.keylen= htons((uint16_t)key_length);
+  request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+  if (verb == APPEND_OP || verb == PREPEND_OP)
+    send_length -= 8; /* append & prepend does not contain extras! */
+  else {
+    request.message.header.request.extlen= 8;
+    request.message.body.flags= htonl(flags);   
+    request.message.body.expiration= htonl((uint32_t)expiration);
+  }
+  
+  request.message.header.request.bodylen= htonl(key_length + value_length + 
+                                               request.message.header.request.extlen);
+  
+  if (cas)
+    request.message.header.request.cas= htonll(cas);
+  
+  char flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
+  /* write the header */
+  if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
+      (memcached_io_write(server, key, key_length, 0) == -1) ||
+      (memcached_io_write(server, value, value_length, flush) == -1)) 
+  {
+    memcached_io_reset(server);
+    return MEMCACHED_WRITE_FAILURE;
+  }
+
+  if (flush == 0)
+    return MEMCACHED_BUFFERED;
+  
+  return memcached_response(server, NULL, 0, NULL);   
+}
+
index e49d76f0b2e87c8e510ed5e74025f6edc0aa98b8..182ff14e316da66983f75cc4625379645c67f622 100644 (file)
@@ -5,7 +5,18 @@ const char * memcached_lib_version(void)
   return LIBMEMCACHED_VERSION_STRING;
 }
 
+static inline memcached_return memcached_version_binary(memcached_st *ptr);
+static inline memcached_return memcached_version_textual(memcached_st *ptr);
+
 memcached_return memcached_version(memcached_st *ptr)
+{
+   if (ptr->flags & MEM_BINARY_PROTOCOL)
+     return memcached_version_binary(ptr);
+   else
+     return memcached_version_textual(ptr);      
+}
+
+static inline memcached_return memcached_version_textual(memcached_st *ptr)
 {
   unsigned int x;
   size_t send_length;
@@ -47,3 +58,48 @@ memcached_return memcached_version(memcached_st *ptr)
 
   return rc;
 }
+
+static inline memcached_return memcached_version_binary(memcached_st *ptr)
+{
+  memcached_return rc;
+  unsigned int x;
+  protocol_binary_request_version request= {0};
+  request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+  request.message.header.request.opcode= PROTOCOL_BINARY_CMD_VERSION;
+  request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+
+  rc= MEMCACHED_SUCCESS;
+  for (x= 0; x < ptr->number_of_hosts; x++) 
+  {
+    memcached_return rrc;
+
+    rrc= memcached_do(&ptr->hosts[x], request.bytes, sizeof(request.bytes), 1);
+    if (rrc != MEMCACHED_SUCCESS) 
+    {
+      memcached_io_reset(&ptr->hosts[x]);
+      rc= MEMCACHED_SOME_ERRORS;
+      continue;
+    }
+  }
+
+  for (x= 0; x < ptr->number_of_hosts; x++) 
+    if (memcached_server_response_count(&ptr->hosts[x]) > 0) 
+    {
+      memcached_return rrc;
+      char buffer[32];
+      char *p;
+
+      rrc= memcached_response(&ptr->hosts[x], buffer, sizeof(buffer), NULL);
+      if (rrc != MEMCACHED_SUCCESS) 
+      {
+        memcached_io_reset(&ptr->hosts[x]);
+        rc= MEMCACHED_SOME_ERRORS;
+      }
+
+      ptr->hosts[x].major_version= (uint8_t)strtol(buffer, &p, 10);
+      ptr->hosts[x].minor_version= (uint8_t)strtol(p + 1, &p, 10);
+      ptr->hosts[x].micro_version= (uint8_t)strtol(p + 1, NULL, 10);
+    }
+
+  return rc;
+}
index 0b2c15d480035539fedb8bb3a2deadf9cb76552e..e37aabef4e488cd61faac157f58bcc6cb2978b47 100644 (file)
@@ -504,7 +504,7 @@ test_return add_test(memcached_st *memc)
   if (setting_value)
     assert(rc == MEMCACHED_NOTSTORED || rc == MEMCACHED_STORED);
   else
-    assert(rc == MEMCACHED_NOTSTORED);
+    assert(rc == MEMCACHED_NOTSTORED || rc == MEMCACHED_DATA_EXISTS);
 
   return 0;
 }
@@ -2594,6 +2594,19 @@ memcached_return pre_behavior_ketama_weighted(memcached_st *memc)
   assert(value == MEMCACHED_HASH_MD5);
   return MEMCACHED_SUCCESS;
 }
+
+memcached_return pre_binary(memcached_st *memc)
+{
+  memcached_return rc;
+
+  rc = memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
+  assert(rc == MEMCACHED_SUCCESS);
+
+  assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) == 1);
+
+  return MEMCACHED_SUCCESS;
+
+}
 void my_free(memcached_st *ptr, void *mem)
 {
   free(mem);
@@ -2736,8 +2749,8 @@ memcached_return enable_cas(memcached_st *memc)
   memcached_version(memc);
 
   if (memc->hosts[0].major_version >= 1 &&
-      memc->hosts[0].minor_version >= 2 &&
-      memc->hosts[0].micro_version >= 4)
+      (memc->hosts[0].minor_version == 2 && 
+       memc->hosts[0].micro_version >= 4) || memc->hosts[0].minor_version > 2)
   {
     memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, set);
 
@@ -2752,8 +2765,8 @@ memcached_return check_for_1_2_3(memcached_st *memc)
   memcached_version(memc);
 
   if (memc->hosts[0].major_version >= 1 &&
-      memc->hosts[0].minor_version >= 2 &&
-      memc->hosts[0].micro_version >= 4)
+      (memc->hosts[0].minor_version == 2 && memc->hosts[0].micro_version >= 4) 
+      || memc->hosts[0].minor_version > 2)
     return MEMCACHED_SUCCESS;
 
   return MEMCACHED_FAILURE;
@@ -2946,6 +2959,7 @@ test_st consistent_weighted_tests[] ={
 
 collection_st collection[] ={
   {"block", 0, 0, tests},
+  {"binary", pre_binary, 0, tests},
   {"nonblock", pre_nonblock, 0, tests},
   {"nodelay", pre_nodelay, 0, tests},
   {"md5", pre_md5, 0, tests},