OPT_SLAP_TCP_NODELAY,
OPT_FLUSH,
OPT_HASH,
+ OPT_BINARY,
} memcached_options;
#endif /* CLIENT_OPTIONS */
/* Prototypes */
void options_parse(int argc, char *argv[]);
+static int opt_binary= 0;
static int opt_verbose= 0;
static int opt_displayflag= 0;
static char *opt_servers= NULL;
memcached_server_push(memc, servers);
memcached_server_list_free(servers);
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
while (optind < argc)
{
{"servers", required_argument, NULL, OPT_SERVERS},
{"flag", no_argument, &opt_displayflag, OPT_FLAG},
{"hash", required_argument, NULL, OPT_HASH},
+ {"binary", no_argument, NULL, OPT_BINARY},
{0, 0, 0, 0},
};
{
case 0:
break;
+ case OPT_BINARY:
+ opt_binary = 1;
+ break;
case OPT_VERBOSE: /* --verbose or -v */
opt_verbose = OPT_VERBOSE;
break;
/* Prototypes */
void options_parse(int argc, char *argv[]);
+static int opt_binary=0;
static int opt_verbose= 0;
static char *opt_servers= NULL;
static char *opt_hash= NULL;
memcached_server_push(memc, servers);
memcached_server_list_free(servers);
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
while (optind < argc)
{
{"add", no_argument, NULL, OPT_ADD},
{"replace", no_argument, NULL, OPT_REPLACE},
{"hash", required_argument, NULL, OPT_HASH},
+ {"binary", no_argument, NULL, OPT_BINARY},
{0, 0, 0, 0},
};
{
case 0:
break;
+ case OPT_BINARY:
+ opt_binary = 1;
+ break;
case OPT_VERBOSE: /* --verbose or -v */
opt_verbose = OPT_VERBOSE;
break;
#include "client_options.h"
#include "utilities.h"
+static int opt_binary= 0;
static int opt_verbose= 0;
static time_t opt_expire= 0;
static char *opt_servers= NULL;
servers= memcached_servers_parse(opt_servers);
memcached_server_push(memc, servers);
memcached_server_list_free(servers);
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
rc = memcached_flush(memc, opt_expire);
if (rc != MEMCACHED_SUCCESS)
{"debug", no_argument, &opt_verbose, OPT_DEBUG},
{"servers", required_argument, NULL, OPT_SERVERS},
{"expire", required_argument, NULL, OPT_EXPIRE},
+ {"binary", no_argument, NULL, OPT_BINARY},
{0, 0, 0, 0},
};
int option_index= 0;
{
case 0:
break;
+ case OPT_BINARY:
+ opt_binary = 1;
+ break;
case OPT_VERBOSE: /* --verbose or -v */
opt_verbose = OPT_VERBOSE;
break;
#include "client_options.h"
#include "utilities.h"
+static int opt_binary= 0;
static int opt_verbose= 0;
static time_t opt_expire= 0;
static char *opt_servers= NULL;
servers= memcached_servers_parse(opt_servers);
memcached_server_push(memc, servers);
memcached_server_list_free(servers);
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
while (optind < argc)
{
{"servers", required_argument, NULL, OPT_SERVERS},
{"expire", required_argument, NULL, OPT_EXPIRE},
{"hash", required_argument, NULL, OPT_HASH},
+ {"binary", no_argument, NULL, OPT_BINARY},
{0, 0, 0, 0},
};
int option_index= 0;
{
case 0:
break;
+ case OPT_BINARY:
+ opt_binary = 1;
+ break;
case OPT_VERBOSE: /* --verbose or -v */
opt_verbose = OPT_VERBOSE;
break;
unsigned int *actual_loaded);
void flush_all(memcached_st *memc);
+static int opt_binary= 0;
static int opt_verbose= 0;
static int opt_flush= 0;
static int opt_non_blocking_io= 0;
memc= memcached_create(NULL);
memcached_server_push(memc, servers);
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, opt_binary);
+
if (opt_flush)
flush_all(memc);
if (opt_createial_load)
{"test", required_argument, NULL, OPT_SLAP_TEST},
{"verbose", no_argument, &opt_verbose, OPT_VERBOSE},
{"version", no_argument, NULL, OPT_VERSION},
+ {"binary", no_argument, NULL, OPT_BINARY},
{0, 0, 0, 0},
};
{
case 0:
break;
+ case OPT_BINARY:
+ opt_binary = 1;
+ break;
case OPT_VERBOSE: /* --verbose or -v */
opt_verbose = OPT_VERBOSE;
break;
--- /dev/null
+AC_RUN_IFELSE([
+ AC_LANG_PROGRAM([
+#include <sys/types.h>
+#include <netinet/in.h>
+#include <inttypes.h>
+ ], [
+if (htonl(5) != 5) {
+ return 1;
+}
+ ])
+ ], AC_DEFINE([BYTEORDER_BIG_ENDIAN], [1], [Enable big endian byteorder]))
--- /dev/null
+dnl ---------------------------------------------------------------------------
+dnl Macro: MEMCACHED_TEST
+dnl ---------------------------------------------------------------------------
+saved_CPPFLAGS="$CPPFLAGS"
+CPPFLAGS=""
+AC_ARG_WITH(memcached,
+ [ --with-memcached=PATH Specify path to memcached installation ],
+ [
+ if test "x$withval" != "xno" ; then
+ CPPFLAGS="-I${withval}/include"
+ fi
+ ]
+)
+
+AC_CHECK_HEADER(memcached/protocol_binary.h, [
+ AC_RUN_IFELSE([
+ AC_LANG_PROGRAM([
+ #include "memcached/protocol_binary.h"
+ ], [
+ protocol_binary_request_set request;
+ if (sizeof(request) != sizeof(request.bytes)) {
+ return 1;
+ }
+ ])
+ ],[
+ if test "x$CPPFLAGS" != "x" ; then
+ CPPFLAGS="$saved_CPPFLAGS $CPPFLAGS"
+ else
+ CPPFLAGS="$saved_CPPFLAGS"
+ fi
+ ], AC_MSG_ERROR([Unsupported struct padding done by compiler.]))
+ ], [
+ AC_MSG_ERROR([Cannot locate memcached/protocol_binary.])
+ ])
+
+dnl ---------------------------------------------------------------------------
+dnl End Macro: MEMCACHED_TEST
+dnl ---------------------------------------------------------------------------
sinclude(config/debug.m4)
sinclude(config/dtrace.m4)
+sinclude(config/memcached.m4)
+sinclude(config/byteorder.m4)
sinclude(config/64bit.m4)
# We only support GCC and Sun's forte at the moment
In non-blocking mode this changes the value of the timeout during socket
connection.
+=item MEMCACHED_BEHAVIOR_BINARY_PROTOCOL
+
+Enable the use of the binary protocol. Please note that you cannot toggle
+this flag on an open connection.
+
=back
=head1 RETURN
memcached_strerror.c \
memcached_verbosity.c \
memcached_version.c \
- murmur_hash.c
+ murmur_hash.c \
+ byteorder.c
libmemcached_la_LIBADD =
libmemcached_la_LDFLAGS = -version-info $(MEMCACHED_LIBRARY_VERSION)
--- /dev/null
+#include "common.h"
+
+/* Byte swap a 64-bit number. */
+static inline uint64_t swap64(uint64_t in) {
+#ifndef BYTEORDER_BIG_ENDIAN
+ /* Little endian, flip the bytes around until someone makes a faster/better
+ * way to do this. */
+ uint64_t rv= 0;
+ int i= 0;
+ for(i= 0; i < 8; i++)
+ {
+ rv= (rv << 8) | (in & 0xff);
+ in >>= 8;
+ }
+ return rv;
+#else
+ /* big-endian machines don't need byte swapping */
+ return in;
+#endif
+}
+
+uint64_t ntohll(uint64_t value) {
+ return swap64(value);
+}
+
+uint64_t htonll(uint64_t value) {
+ return swap64(value);
+}
#include <memcached.h>
#include "memcached_io.h"
+#include "memcached/protocol_binary.h"
#include <libmemcached_config.h>
#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96)
MEM_VERIFY_KEY= (1 << 10),
/* 11 used for weighted ketama */
MEM_KETAMA_WEIGHTED= (1 << 11),
+ MEM_BINARY_PROTOCOL= (1 << 12),
} memcached_flags;
/* Hashing algo */
#define memcached_server_response_decrement(A) (A)->cursor_active--
#define memcached_server_response_reset(A) (A)->cursor_active=0
-memcached_return memcached_do(memcached_server_st *ptr, const char *commmand,
+memcached_return memcached_do(memcached_server_st *ptr, const void *commmand,
size_t command_length, uint8_t with_flush);
memcached_return memcached_version(memcached_st *ptr);
memcached_return value_fetch(memcached_server_st *ptr,
memcached_return run_distribution(memcached_st *ptr);
uint32_t generate_hash(memcached_st *ptr, const char *key, size_t key_length);
+
+extern uint64_t ntohll(uint64_t);
+extern uint64_t htonll(uint64_t);
+
#endif /* __COMMON_H__ */
return rc;
}
+static memcached_return binary_incr_decr(memcached_st *ptr, uint8_t cmd,
+ const char *key, size_t key_length,
+ uint32_t offset, uint64_t *value)
+{
+ unsigned int server_key;
+
+ unlikely (key_length == 0)
+ return MEMCACHED_NO_KEY_PROVIDED;
+
+ unlikely (ptr->hosts == NULL || ptr->number_of_hosts == 0)
+ return MEMCACHED_NO_SERVERS;
+
+ if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
+ return MEMCACHED_BAD_KEY_PROVIDED;
+
+ server_key= memcached_generate_hash(ptr, key, key_length);
+
+ protocol_binary_request_incr request= {0};
+
+ request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+ request.message.header.request.opcode= cmd;
+ request.message.header.request.keylen= htons((uint16_t)key_length);
+ request.message.header.request.extlen= 20;
+ request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+ request.message.header.request.bodylen= htonl(key_length + request.message.header.request.extlen);
+ request.message.body.delta= htonll(offset);
+
+ /* TODO: The binary protocol allows you to specify initial and expiry time */
+ if ((memcached_do(&ptr->hosts[server_key], request.bytes,
+ sizeof(request.bytes), 0)!=MEMCACHED_SUCCESS) ||
+ (memcached_io_write(&ptr->hosts[server_key], key, key_length, 1) == -1))
+ {
+ memcached_io_reset(&ptr->hosts[server_key]);
+ return MEMCACHED_WRITE_FAILURE;
+ }
+
+ return memcached_response(&ptr->hosts[server_key], value, sizeof(*value), NULL);
+}
+
memcached_return memcached_increment(memcached_st *ptr,
const char *key, size_t key_length,
uint32_t offset,
memcached_return rc;
LIBMEMCACHED_MEMCACHED_INCREMENT_START();
- rc= memcached_auto(ptr, "incr", key, key_length, offset, value);
+ if (ptr->flags & MEM_BINARY_PROTOCOL)
+ rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_INCREMENT, key,
+ key_length, offset, value);
+ else
+ rc= memcached_auto(ptr, "incr", key, key_length, offset, value);
+
LIBMEMCACHED_MEMCACHED_INCREMENT_END();
return rc;
memcached_return rc;
LIBMEMCACHED_MEMCACHED_DECREMENT_START();
- rc= memcached_auto(ptr, "decr", key, key_length, offset, value);
+ if (ptr->flags & MEM_BINARY_PROTOCOL)
+ rc= binary_incr_decr(ptr, PROTOCOL_BINARY_CMD_DECREMENT, key,
+ key_length, offset, value);
+ else
+ rc= memcached_auto(ptr, "decr", key, key_length, offset, value);
+
LIBMEMCACHED_MEMCACHED_DECREMENT_END();
return rc;
{
switch (flag)
{
+ case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL:
+ set_behavior_flag(ptr, MEM_BINARY_PROTOCOL, data);
+ break;
case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
set_behavior_flag(ptr, MEM_SUPPORT_CAS, data);
break;
switch (flag)
{
+ case MEMCACHED_BEHAVIOR_BINARY_PROTOCOL:
+ temp_flag= MEM_BINARY_PROTOCOL;
+ break;
case MEMCACHED_BEHAVIOR_SUPPORT_CAS:
temp_flag= MEM_SUPPORT_CAS;
break;
MEMCACHED_BEHAVIOR_RETRY_TIMEOUT,
MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED,
MEMCACHED_BEHAVIOR_KETAMA_HASH,
+ MEMCACHED_BEHAVIOR_BINARY_PROTOCOL,
} memcached_behavior;
typedef enum {
key, key_length, expiration);
}
+static inline memcached_return binary_delete(memcached_st *ptr,
+ unsigned int server_key,
+ const char *key,
+ size_t key_length,
+ int flush);
+
memcached_return memcached_delete_by_key(memcached_st *ptr,
const char *master_key, size_t master_key_length,
const char *key, size_t key_length,
return MEMCACHED_NO_SERVERS;
server_key= memcached_generate_hash(ptr, master_key, master_key_length);
-
- if (expiration)
- send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
- "delete %s%.*s %llu\r\n",
- ptr->prefix_key,
- (int)key_length, key,
- (unsigned long long)expiration);
- else
- send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
- "delete %s%.*s\r\n",
- ptr->prefix_key,
- (int)key_length, key);
-
- if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
+ to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1;
+
+ if (ptr->flags & MEM_BINARY_PROTOCOL)
+ rc= binary_delete(ptr, server_key, key, key_length, to_write);
+ else
{
- rc= MEMCACHED_WRITE_FAILURE;
- goto error;
+ if (expiration)
+ send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
+ "delete %s%.*s %llu\r\n",
+ ptr->prefix_key,
+ (int)key_length, key,
+ (unsigned long long)expiration);
+ else
+ send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
+ "delete %s%.*s\r\n",
+ ptr->prefix_key,
+ (int)key_length, key);
+
+ if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
+ {
+ rc= MEMCACHED_WRITE_FAILURE;
+ goto error;
+ }
+
+ rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
}
- to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1;
-
- rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
if (rc != MEMCACHED_SUCCESS)
goto error;
LIBMEMCACHED_MEMCACHED_DELETE_END();
return rc;
}
+
+static inline memcached_return binary_delete(memcached_st *ptr,
+ unsigned int server_key,
+ const char *key,
+ size_t key_length,
+ int flush)
+{
+ protocol_binary_request_delete request= {0};
+
+ request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_DELETE;
+ request.message.header.request.keylen= htons((uint16_t)key_length);
+ request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+ request.message.header.request.bodylen= htonl(key_length);
+
+ if ((memcached_do(&ptr->hosts[server_key], request.bytes,
+ sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
+ (memcached_io_write(&ptr->hosts[server_key], key,
+ key_length, flush) == -1))
+ {
+ memcached_io_reset(&ptr->hosts[server_key]);
+ return MEMCACHED_WRITE_FAILURE;
+ }
+
+ return MEMCACHED_SUCCESS;
+}
#include "common.h"
-memcached_return memcached_do(memcached_server_st *ptr, const char *command,
+memcached_return memcached_do(memcached_server_st *ptr, const void *command,
size_t command_length, uint8_t with_flush)
{
memcached_return rc;
#include "common.h"
+static memcached_return memcached_flush_binary(memcached_st *ptr,
+ time_t expiration);
+static memcached_return memcached_flush_textual(memcached_st *ptr,
+ time_t expiration);
+
memcached_return memcached_flush(memcached_st *ptr, time_t expiration)
+{
+ memcached_return rc;
+
+ LIBMEMCACHED_MEMCACHED_FLUSH_START();
+ if (ptr->flags & MEM_BINARY_PROTOCOL)
+ rc= memcached_flush_binary(ptr, expiration);
+ else
+ rc= memcached_flush_textual(ptr, expiration);
+ LIBMEMCACHED_MEMCACHED_FLUSH_END();
+ return rc;
+}
+
+static memcached_return memcached_flush_textual(memcached_st *ptr,
+ time_t expiration)
{
unsigned int x;
size_t send_length;
memcached_return rc;
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
- LIBMEMCACHED_MEMCACHED_FLUSH_START();
unlikely (ptr->number_of_hosts == 0)
return MEMCACHED_NO_SERVERS;
(void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
}
- LIBMEMCACHED_MEMCACHED_FLUSH_END();
+ return MEMCACHED_SUCCESS;
+}
+
+static memcached_return memcached_flush_binary(memcached_st *ptr,
+ time_t expiration)
+{
+ unsigned int x;
+ protocol_binary_request_flush request= {0};
+
+ unlikely (ptr->number_of_hosts == 0)
+ return MEMCACHED_NO_SERVERS;
+
+ request.message.header.request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_FLUSH;
+ request.message.header.request.extlen= 4;
+ request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+ request.message.header.request.bodylen= htonl(request.message.header.request.extlen);
+ request.message.body.expiration= htonl(expiration);
+
+ for (x= 0; x < ptr->number_of_hosts; x++)
+ {
+ if (memcached_do(&ptr->hosts[x], request.bytes,
+ sizeof(request.bytes), 1) != MEMCACHED_SUCCESS)
+ {
+ memcached_io_reset(&ptr->hosts[x]);
+ return MEMCACHED_WRITE_FAILURE;
+ }
+ }
+
+ for (x= 0; x < ptr->number_of_hosts; x++)
+ {
+ if (memcached_server_response_count(&ptr->hosts[x]) > 0)
+ (void)memcached_response(&ptr->hosts[x], NULL, 0, NULL);
+ }
+
return MEMCACHED_SUCCESS;
}
return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
}
+static memcached_return binary_mget_by_key(memcached_st *ptr,
+ unsigned int master_server_key,
+ char **keys, size_t *key_length,
+ unsigned int number_of_keys);
+
memcached_return memcached_mget_by_key(memcached_st *ptr,
const char *master_key,
size_t master_key_length,
(void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
}
}
+
+ if (ptr->flags & MEM_BINARY_PROTOCOL)
+ return binary_mget_by_key(ptr, master_server_key, keys,
+ key_length, number_of_keys);
/*
If a server fails we warn about errors and start all over with sending keys
LIBMEMCACHED_MEMCACHED_MGET_END();
return rc;
}
+
+static memcached_return binary_mget_by_key(memcached_st *ptr,
+ unsigned int master_server_key,
+ char **keys, size_t *key_length,
+ unsigned int number_of_keys)
+{
+ memcached_return rc= MEMCACHED_NOTFOUND;
+
+ int flush= number_of_keys == 1;
+
+ /*
+ If a server fails we warn about errors and start all over with sending keys
+ to the server.
+ */
+ for (int x= 0; x < number_of_keys; x++)
+ {
+ unsigned int server_key;
+
+ if (master_server_key)
+ server_key= master_server_key;
+ else
+ server_key= memcached_generate_hash(ptr, keys[x], key_length[x]);
+
+ if (memcached_server_response_count(&ptr->hosts[server_key]) == 0)
+ {
+ rc= memcached_connect(&ptr->hosts[server_key]);
+ if (rc != MEMCACHED_SUCCESS)
+ continue;
+ }
+
+ protocol_binary_request_getk request= {0};
+ request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+ if (number_of_keys == 1)
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
+ else
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
+
+ request.message.header.request.keylen= htons((uint16_t)key_length[x]);
+ request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+ request.message.header.request.bodylen= htonl(key_length[x]);
+
+ if ((memcached_io_write(&ptr->hosts[server_key], request.bytes,
+ sizeof(request.bytes), 0) == -1) ||
+ (memcached_io_write(&ptr->hosts[server_key], keys[x],
+ key_length[x], flush) == -1))
+ {
+ memcached_server_response_reset(&ptr->hosts[server_key]);
+ rc= MEMCACHED_SOME_ERRORS;
+ continue;
+ }
+ memcached_server_response_increment(&ptr->hosts[server_key]);
+ }
+
+ if (number_of_keys > 1)
+ {
+ /*
+ * Send a noop command to flush the buffers
+ */
+ protocol_binary_request_noop request= {0};
+ request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
+ request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+
+ for (int x= 0; x < ptr->number_of_hosts; x++)
+ if (memcached_server_response_count(&ptr->hosts[x]))
+ {
+ if (memcached_io_write(&ptr->hosts[x], NULL, 0, 1) == -1)
+ {
+ memcached_server_response_reset(&ptr->hosts[x]);
+ memcached_io_reset(&ptr->hosts[x]);
+ rc= MEMCACHED_SOME_ERRORS;
+ }
+
+ if (memcached_io_write(&ptr->hosts[x], request.bytes,
+ sizeof(request.bytes), 1) == -1)
+ {
+ memcached_server_response_reset(&ptr->hosts[x]);
+ memcached_io_reset(&ptr->hosts[x]);
+ rc= MEMCACHED_SOME_ERRORS;
+ }
+ memcached_server_response_increment(&ptr->hosts[x]);
+ }
+ }
+
+
+ return rc;
+}
#endif
ssize_t memcached_io_read(memcached_server_st *ptr,
- char *buffer, size_t length)
+ void *buffer, size_t length)
{
char *buffer_ptr;
break;
}
- return (size_t)(buffer_ptr - buffer);
+ return (size_t)(buffer_ptr - (char*)buffer);
}
ssize_t memcached_io_write(memcached_server_st *ptr,
- const char *buffer, size_t length, char with_flush)
+ const void *buffer, size_t length, char with_flush)
{
size_t original_length;
const char* buffer_ptr;
#include <memcached.h>
ssize_t memcached_io_write(memcached_server_st *ptr,
- const char *buffer, size_t length, char with_flush);
+ const void *buffer, size_t length, char with_flush);
void memcached_io_reset(memcached_server_st *ptr);
ssize_t memcached_io_read(memcached_server_st *ptr,
- char *buffer, size_t length);
+ void *buffer, size_t length);
memcached_return memcached_io_close(memcached_server_st *ptr, uint8_t io_death);
ssize_t read_length;
char buffer[MEMCACHED_MAX_BUFFER];
- rc= memcached_do(ptr, "quit\r\n", 6, 1);
- WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_FETCH_NOTFINISHED);
+ if (ptr->root->flags & MEM_BINARY_PROTOCOL)
+ {
+ protocol_binary_request_quit request = {0};
+ request.message.header.request.magic = PROTOCOL_BINARY_REQ;
+ request.message.header.request.opcode = PROTOCOL_BINARY_CMD_QUIT;
+ request.message.header.request.datatype = PROTOCOL_BINARY_RAW_BYTES;
+ rc= memcached_do(ptr, request.bytes, sizeof(request.bytes), 1);
+ } else
+ rc= memcached_do(ptr, "quit\r\n", 6, 1);
+ WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_FETCH_NOTFINISHED);
+
/* read until socket is closed, or there is an error
* closing the socket before all data is read
* results in server throwing away all data which is
#include "common.h"
#include "memcached_io.h"
+static memcached_return binary_response(memcached_server_st *ptr,
+ char *buffer, size_t buffer_length,
+ memcached_result_st *result);
+
memcached_return memcached_response(memcached_server_st *ptr,
char *buffer, size_t buffer_length,
memcached_result_st *result)
if (ptr->root->flags & MEM_NO_BLOCK)
(void)memcached_io_write(ptr, NULL, 0, 1);
+ if (ptr->root->flags & MEM_BINARY_PROTOCOL)
+ return binary_response(ptr, buffer, buffer_length, result);
+
max_messages= memcached_server_response_count(ptr);
for (x= 0; x < max_messages; x++)
{
memcached_string_st *sptr= &ptr->value;
return memcached_string_length(sptr);
}
+
+/**
+ * Read a given number of bytes from the server and place it into a specific
+ * buffer. Reset the IO channel or this server if an error occurs.
+ */
+static memcached_return safe_read(memcached_server_st *ptr, void *dta,
+ size_t size)
+{
+ int offset= 0;
+ char *data= dta;
+ while (offset < size)
+ {
+ ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
+ if (nread <= 0)
+ {
+ memcached_io_reset(ptr);
+ return MEMCACHED_UNKNOWN_READ_FAILURE;
+ }
+ offset += nread;
+ }
+
+ return MEMCACHED_SUCCESS;
+}
+
+static memcached_return binary_response(memcached_server_st *ptr,
+ char *buffer,
+ size_t buffer_length,
+ memcached_result_st *result)
+{
+ protocol_binary_response_header header;
+ memcached_server_response_decrement(ptr);
+
+ unlikely (safe_read(ptr, &header.bytes,
+ sizeof(header.bytes)) != MEMCACHED_SUCCESS)
+ return MEMCACHED_UNKNOWN_READ_FAILURE;
+
+ unlikely (header.response.magic != PROTOCOL_BINARY_RES)
+ {
+ memcached_io_reset(ptr);
+ return MEMCACHED_PROTOCOL_ERROR;
+ }
+
+ /*
+ ** Convert the header to host local endian!
+ */
+ header.response.keylen= ntohs(header.response.keylen);
+ header.response.status= ntohs(header.response.status);
+ header.response.bodylen= ntohl(header.response.bodylen);
+ header.response.cas= ntohll(header.response.cas);
+ uint32_t bodylen= header.response.bodylen;
+
+ if (header.response.status == 0)
+ {
+ if ((header.response.opcode == PROTOCOL_BINARY_CMD_GETK) ||
+ (header.response.opcode == PROTOCOL_BINARY_CMD_GETKQ))
+ {
+ uint16_t keylen= header.response.keylen;
+ memcached_result_reset(result);
+ result->cas= header.response.cas;
+
+ if (safe_read(ptr, &result->flags,
+ sizeof(result->flags)) != MEMCACHED_SUCCESS)
+ {
+ return MEMCACHED_UNKNOWN_READ_FAILURE;
+ }
+ result->flags= ntohl(result->flags);
+ bodylen -= header.response.extlen;
+
+ result->key_length= keylen;
+ if (safe_read(ptr, result->key, keylen) != MEMCACHED_SUCCESS)
+ {
+ return MEMCACHED_UNKNOWN_READ_FAILURE;
+ }
+
+ bodylen -= keylen;
+ if (memcached_string_check(&result->value,
+ bodylen) != MEMCACHED_SUCCESS)
+ {
+ memcached_io_reset(ptr);
+ return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
+ }
+
+ char *vptr= memcached_string_value(&result->value);
+ if (safe_read(ptr, vptr, bodylen) != MEMCACHED_SUCCESS)
+ {
+ return MEMCACHED_UNKNOWN_READ_FAILURE;
+ }
+
+ memcached_string_set_length(&result->value, bodylen);
+ }
+ else if ((header.response.opcode == PROTOCOL_BINARY_CMD_INCREMENT) ||
+ (header.response.opcode == PROTOCOL_BINARY_CMD_DECREMENT))
+ {
+ if (bodylen != sizeof(uint64_t) || buffer_length != sizeof(uint64_t))
+ {
+ return MEMCACHED_PROTOCOL_ERROR;
+ }
+
+ WATCHPOINT_ASSERT(bodylen == buffer_length);
+ uint64_t val;
+ if (safe_read(ptr, &val, sizeof(val)) != MEMCACHED_SUCCESS)
+ {
+ return MEMCACHED_UNKNOWN_READ_FAILURE;
+ }
+
+ val= ntohll(val);
+ memcpy(buffer, &val, sizeof(val));
+ }
+ else if (header.response.opcode == PROTOCOL_BINARY_CMD_VERSION)
+ {
+ memset(buffer, 0, buffer_length);
+ if (bodylen >= buffer_length)
+ /* not enough space in buffer.. should not happen... */
+ return MEMCACHED_UNKNOWN_READ_FAILURE;
+ else
+ safe_read(ptr, buffer, bodylen);
+ }
+ else if ((header.response.opcode == PROTOCOL_BINARY_CMD_FLUSH) ||
+ (header.response.opcode == PROTOCOL_BINARY_CMD_QUIT) ||
+ (header.response.opcode == PROTOCOL_BINARY_CMD_SET) ||
+ (header.response.opcode == PROTOCOL_BINARY_CMD_ADD) ||
+ (header.response.opcode == PROTOCOL_BINARY_CMD_REPLACE) ||
+ (header.response.opcode == PROTOCOL_BINARY_CMD_APPEND) ||
+ (header.response.opcode == PROTOCOL_BINARY_CMD_PREPEND) ||
+ (header.response.opcode == PROTOCOL_BINARY_CMD_DELETE))
+ {
+ WATCHPOINT_ASSERT(bodylen == 0);
+ return MEMCACHED_SUCCESS;
+ }
+ else if (header.response.opcode == PROTOCOL_BINARY_CMD_NOOP)
+ {
+ WATCHPOINT_ASSERT(bodylen == 0);
+ return MEMCACHED_END;
+ }
+ else if (header.response.opcode == PROTOCOL_BINARY_CMD_STAT)
+ {
+ if (bodylen == 0)
+ return MEMCACHED_END;
+ else if (bodylen + 1 > buffer_length)
+ /* not enough space in buffer.. should not happen... */
+ return MEMCACHED_UNKNOWN_READ_FAILURE;
+ else
+ {
+ size_t keylen= header.response.keylen;
+ memset(buffer, 0, buffer_length);
+ safe_read(ptr, buffer, keylen);
+ safe_read(ptr, buffer + keylen + 1, bodylen - keylen);
+ }
+ }
+ else
+ {
+ /* Command not implemented yet! */
+ WATCHPOINT_ASSERT(0);
+ memcached_io_reset(ptr);
+ return MEMCACHED_PROTOCOL_ERROR;
+ }
+ }
+ else if (header.response.bodylen)
+ {
+ /* What should I do with the error message??? just discard it for now */
+ char hole[1024];
+ while (bodylen > 0)
+ {
+ size_t nr= (bodylen > sizeof(hole)) ? sizeof(hole) : bodylen;
+ safe_read(ptr, hole, nr);
+ bodylen -= nr;
+ }
+ }
+
+ memcached_return rc= MEMCACHED_SUCCESS;
+ unlikely(header.response.status != 0)
+ switch (header.response.status)
+ {
+ case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
+ rc= MEMCACHED_NOTFOUND;
+ break;
+ case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
+ rc= MEMCACHED_DATA_EXISTS;
+ break;
+ case PROTOCOL_BINARY_RESPONSE_E2BIG:
+ case PROTOCOL_BINARY_RESPONSE_EINVAL:
+ case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
+ case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
+ case PROTOCOL_BINARY_RESPONSE_ENOMEM:
+ default:
+ /* @todo fix the error mappings */
+ rc= MEMCACHED_PROTOCOL_ERROR;
+ break;
+ }
+
+ return rc;
+}
return ret;
}
-static memcached_return memcached_stats_fetch(memcached_st *ptr,
+static memcached_return binary_stats_fetch(memcached_st *ptr,
+ memcached_stat_st *stat,
+ char *args,
+ unsigned int server_key)
+{
+ memcached_return rc;
+
+ char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+ protocol_binary_request_stats request= {0};
+ request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_STAT;
+ request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+
+ if (args != NULL)
+ {
+ int len= strlen(args);
+ request.message.header.request.keylen= htons((uint16_t)len);
+ request.message.header.request.bodylen= htonl(len);
+
+ if ((memcached_do(&ptr->hosts[server_key], request.bytes,
+ sizeof(request.bytes), 0) != MEMCACHED_SUCCESS) ||
+ (memcached_io_write(&ptr->hosts[server_key], args, len, 1) == -1))
+ {
+ memcached_io_reset(&ptr->hosts[server_key]);
+ return MEMCACHED_WRITE_FAILURE;
+ }
+ }
+ else
+ {
+ if (memcached_do(&ptr->hosts[server_key], request.bytes,
+ sizeof(request.bytes), 1) != MEMCACHED_SUCCESS)
+ {
+ memcached_io_reset(&ptr->hosts[server_key]);
+ return MEMCACHED_WRITE_FAILURE;
+ }
+ }
+
+ memcached_server_response_decrement(&ptr->hosts[server_key]);
+ do
+ {
+ rc= memcached_response(&ptr->hosts[server_key], buffer,
+ sizeof(buffer), NULL);
+ if (rc == MEMCACHED_END)
+ break;
+
+ unlikely (rc != MEMCACHED_SUCCESS)
+ {
+ memcached_io_reset(&ptr->hosts[server_key]);
+ return rc;
+ }
+
+ set_data(stat, buffer, buffer + strlen(buffer) + 1);
+ } while (1);
+
+ /* shit... memcached_response will decrement the counter, so I need to
+ ** reset it.. todo: look at this and try to find a better solution.
+ */
+ ptr->hosts[server_key].cursor_active= 0;
+
+ return MEMCACHED_SUCCESS;
+}
+
+static memcached_return ascii_stats_fetch(memcached_st *ptr,
memcached_stat_st *stat,
char *args,
unsigned int server_key)
for (x= 0; x < ptr->number_of_hosts; x++)
{
memcached_return temp_return;
-
- temp_return= memcached_stats_fetch(ptr, stats + x, args, x);
+
+ if (ptr->flags & MEM_BINARY_PROTOCOL)
+ temp_return= binary_stats_fetch(ptr, stats + x, args, x);
+ else
+ temp_return= ascii_stats_fetch(ptr, stats + x, args, x);
+
if (temp_return != MEMCACHED_SUCCESS)
rc= MEMCACHED_SOME_ERRORS;
}
memcached_server_add(&memc, hostname, port);
- rc= memcached_stats_fetch(&memc, stat, args, 0);
+ if (memc.flags & MEM_BINARY_PROTOCOL)
+ rc= binary_stats_fetch(&memc, stat, args, 0);
+ else
+ rc= ascii_stats_fetch(&memc, stat, args, 0);
memcached_free(&memc);
memcached_add()
*/
-
#include "common.h"
#include "memcached_io.h"
return SET_OP;
}
+static memcached_return memcached_send_binary(memcached_server_st* server,
+ const char *key,
+ size_t key_length,
+ const char *value,
+ size_t value_length,
+ time_t expiration,
+ uint32_t flags,
+ uint64_t cas,
+ memcached_storage_action verb);
+
static inline memcached_return memcached_send(memcached_st *ptr,
const char *master_key, size_t master_key_length,
const char *key, size_t key_length,
server_key= memcached_generate_hash(ptr, master_key, master_key_length);
+ if (ptr->flags & MEM_BINARY_PROTOCOL)
+ return memcached_send_binary(&ptr->hosts[server_key], key, key_length,
+ value, value_length, expiration,
+ flags, cas, verb);
+
if (cas)
write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
"%s %s%.*s %u %llu %zu %llu\r\n", storage_op_string(verb),
return rc;
}
+
memcached_return memcached_set(memcached_st *ptr, const char *key, size_t key_length,
const char *value, size_t value_length,
time_t expiration,
expiration, flags, cas, CAS_OP);
return rc;
}
+
+static memcached_return memcached_send_binary(memcached_server_st* server,
+ const char *key,
+ size_t key_length,
+ const char *value,
+ size_t value_length,
+ time_t expiration,
+ uint32_t flags,
+ uint64_t cas,
+ memcached_storage_action verb)
+{
+ protocol_binary_request_set request= {0};
+ size_t send_length= sizeof(request.bytes);
+
+ request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+ switch (verb)
+ {
+ case SET_OP:
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SET;
+ break;
+ case ADD_OP:
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_ADD;
+ break;
+ case REPLACE_OP:
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE;
+ break;
+ case APPEND_OP:
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_APPEND;
+ break;
+ case PREPEND_OP:
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_PREPEND;
+ break;
+ case CAS_OP:
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE;
+ break;
+ default:
+ abort();
+ }
+
+ request.message.header.request.keylen= htons((uint16_t)key_length);
+ request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+ if (verb == APPEND_OP || verb == PREPEND_OP)
+ send_length -= 8; /* append & prepend does not contain extras! */
+ else {
+ request.message.header.request.extlen= 8;
+ request.message.body.flags= htonl(flags);
+ request.message.body.expiration= htonl((uint32_t)expiration);
+ }
+
+ request.message.header.request.bodylen= htonl(key_length + value_length +
+ request.message.header.request.extlen);
+
+ if (cas)
+ request.message.header.request.cas= htonll(cas);
+
+ char flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
+ /* write the header */
+ if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
+ (memcached_io_write(server, key, key_length, 0) == -1) ||
+ (memcached_io_write(server, value, value_length, flush) == -1))
+ {
+ memcached_io_reset(server);
+ return MEMCACHED_WRITE_FAILURE;
+ }
+
+ if (flush == 0)
+ return MEMCACHED_BUFFERED;
+
+ return memcached_response(server, NULL, 0, NULL);
+}
+
return LIBMEMCACHED_VERSION_STRING;
}
+static inline memcached_return memcached_version_binary(memcached_st *ptr);
+static inline memcached_return memcached_version_textual(memcached_st *ptr);
+
memcached_return memcached_version(memcached_st *ptr)
+{
+ if (ptr->flags & MEM_BINARY_PROTOCOL)
+ return memcached_version_binary(ptr);
+ else
+ return memcached_version_textual(ptr);
+}
+
+static inline memcached_return memcached_version_textual(memcached_st *ptr)
{
unsigned int x;
size_t send_length;
return rc;
}
+
+static inline memcached_return memcached_version_binary(memcached_st *ptr)
+{
+ memcached_return rc;
+ unsigned int x;
+ protocol_binary_request_version request= {0};
+ request.message.header.request.magic= PROTOCOL_BINARY_REQ;
+ request.message.header.request.opcode= PROTOCOL_BINARY_CMD_VERSION;
+ request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
+
+ rc= MEMCACHED_SUCCESS;
+ for (x= 0; x < ptr->number_of_hosts; x++)
+ {
+ memcached_return rrc;
+
+ rrc= memcached_do(&ptr->hosts[x], request.bytes, sizeof(request.bytes), 1);
+ if (rrc != MEMCACHED_SUCCESS)
+ {
+ memcached_io_reset(&ptr->hosts[x]);
+ rc= MEMCACHED_SOME_ERRORS;
+ continue;
+ }
+ }
+
+ for (x= 0; x < ptr->number_of_hosts; x++)
+ if (memcached_server_response_count(&ptr->hosts[x]) > 0)
+ {
+ memcached_return rrc;
+ char buffer[32];
+ char *p;
+
+ rrc= memcached_response(&ptr->hosts[x], buffer, sizeof(buffer), NULL);
+ if (rrc != MEMCACHED_SUCCESS)
+ {
+ memcached_io_reset(&ptr->hosts[x]);
+ rc= MEMCACHED_SOME_ERRORS;
+ }
+
+ ptr->hosts[x].major_version= (uint8_t)strtol(buffer, &p, 10);
+ ptr->hosts[x].minor_version= (uint8_t)strtol(p + 1, &p, 10);
+ ptr->hosts[x].micro_version= (uint8_t)strtol(p + 1, NULL, 10);
+ }
+
+ return rc;
+}
if (setting_value)
assert(rc == MEMCACHED_NOTSTORED || rc == MEMCACHED_STORED);
else
- assert(rc == MEMCACHED_NOTSTORED);
+ assert(rc == MEMCACHED_NOTSTORED || rc == MEMCACHED_DATA_EXISTS);
return 0;
}
assert(value == MEMCACHED_HASH_MD5);
return MEMCACHED_SUCCESS;
}
+
+memcached_return pre_binary(memcached_st *memc)
+{
+ memcached_return rc;
+
+ rc = memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL, 1);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ assert(memcached_behavior_get(memc, MEMCACHED_BEHAVIOR_BINARY_PROTOCOL) == 1);
+
+ return MEMCACHED_SUCCESS;
+
+}
void my_free(memcached_st *ptr, void *mem)
{
free(mem);
memcached_version(memc);
if (memc->hosts[0].major_version >= 1 &&
- memc->hosts[0].minor_version >= 2 &&
- memc->hosts[0].micro_version >= 4)
+ (memc->hosts[0].minor_version == 2 &&
+ memc->hosts[0].micro_version >= 4) || memc->hosts[0].minor_version > 2)
{
memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_SUPPORT_CAS, set);
memcached_version(memc);
if (memc->hosts[0].major_version >= 1 &&
- memc->hosts[0].minor_version >= 2 &&
- memc->hosts[0].micro_version >= 4)
+ (memc->hosts[0].minor_version == 2 && memc->hosts[0].micro_version >= 4)
+ || memc->hosts[0].minor_version > 2)
return MEMCACHED_SUCCESS;
return MEMCACHED_FAILURE;
collection_st collection[] ={
{"block", 0, 0, tests},
+ {"binary", pre_binary, 0, tests},
{"nonblock", pre_nonblock, 0, tests},
{"nodelay", pre_nodelay, 0, tests},
{"md5", pre_md5, 0, tests},