0.4
+ * Added buffered IO to write calls for keys
* memstat was broken (bad if/else on connect)
0.3 Mon Oct 1 06:37:52 PDT 2007
#define SMALL_STRING_LEN 1024
#define HUGE_STRING_LEN 8196
#define MEMCACHED_MAX_KEY 251 /* We add one to have it null terminated */
+//#define MEMCACHED_MAX_BUFFER 8196
+#define MEMCACHED_MAX_BUFFER 30
typedef enum {
MEMCACHED_SUCCESS,
memcached_server_st *hosts;
unsigned int number_of_hosts;
unsigned int cursor_server;
+ char write_buffer[MEMCACHED_MAX_BUFFER];
+ size_t write_buffer_offset;
+ size_t write_between_flush;
char connected;
};
memcached_stats.o
noinst_HEADERS = libmemcached_probes.h \
+ memcached_io.h \
common.h
lib_LTLIBRARIES = libmemcached.la
libmemcached_la_SOURCES = memcached.c \
- memcached_strerror.c \
+ memcached_auto.c \
memcached_connect.c \
- memcached_response.c \
- memcached_get.c \
- memcached_storage.c \
memcached_delete.c \
+ memcached_flush.c \
+ memcached_get.c \
memcached_hash.c \
- memcached_auto.c \
- memcached_verbosity.c \
+ memcached_hosts.c \
+ memcached_io.c \
memcached_quit.c \
- memcached_flush.c \
+ memcached_response.c \
+ memcached_storage.c \
memcached_string.c \
- memcached_hosts.c \
- memcached_stats.c
+ memcached_stats.c \
+ memcached_strerror.c \
+ memcached_verbosity.c
libmemcached_la_LIBADD =
if HAVE_DTRACE
"delete %.*s\r\n", (int)key_length, key);
if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
- return MEMCACHED_WRITE_FAILURE;
+ {
+ rc= MEMCACHED_WRITE_FAILURE;
+ goto error;
+ }
sent_length= send(ptr->hosts[server_key].fd, buffer, send_length, 0);
if (sent_length == -1 || sent_length != send_length)
- return MEMCACHED_WRITE_FAILURE;
+ {
+ rc= MEMCACHED_WRITE_FAILURE;
+ goto error;
+ }
rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
- LIBMEMCACHED_MEMCACHED_DELETE_END();
if (rc == MEMCACHED_DELETED)
rc= MEMCACHED_SUCCESS;
+ LIBMEMCACHED_MEMCACHED_DELETE_END();
+
+error:
return rc;
}
--- /dev/null
+/*
+ Basic socket buffered IO
+*/
+
+#include <memcached.h>
+
+ssize_t memcached_io_read(memcached_st *ptr, char *buf, size_t length)
+{
+ return -1;
+}
+
+ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
+ char *buffer, size_t length)
+{
+ unsigned long long x;
+
+ for (x= 0; x < length; x++)
+ {
+ ptr->write_buffer[ptr->write_buffer_offset]= buffer[x];
+ ptr->write_buffer_offset++;
+ if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
+ {
+ size_t sent_length;
+
+ if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer,
+ MEMCACHED_MAX_BUFFER, 0)) == -1)
+ return -1;
+
+ assert(sent_length == MEMCACHED_MAX_BUFFER);
+ ptr->write_between_flush+= MEMCACHED_MAX_BUFFER;
+
+ ptr->write_buffer_offset= 0;
+ }
+ }
+
+ return length;
+}
+
+ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
+{
+ size_t sent_length;
+
+ if (ptr->write_buffer_offset == 0)
+ return 0;
+
+ if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer,
+ ptr->write_buffer_offset, 0)) == -1)
+ return -1;
+
+ assert(sent_length == ptr->write_buffer_offset);
+
+ sent_length+= ptr->write_between_flush;
+
+ ptr->write_buffer_offset= 0;
+ ptr->write_between_flush= 0;
+
+ return sent_length;
+}
+
+/*
+ Eventually we will just kill off the server with the problem.
+*/
+void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
+{
+ ptr->write_buffer_offset= 0;
+ memcached_quit(ptr);
+}
--- /dev/null
+/* Server IO, Not public! */
+#include <memcached.h>
+
+ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key);
+ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
+ char *buffer, size_t length);
+void memcached_io_reset(memcached_st *ptr, unsigned int server_key);
*/
#include "common.h"
+#include "memcached_io.h"
static memcached_return memcached_send(memcached_st *ptr,
char *key, size_t key_length,
if (rc != MEMCACHED_SUCCESS)
return rc;
+ assert(ptr->write_buffer_offset == 0);
+
server_key= memcached_generate_hash(key, key_length) % ptr->number_of_hosts;
write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
(int)key_length, key, flags,
(unsigned long long)expiration, value_length);
if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
- return MEMCACHED_WRITE_FAILURE;
- if ((sent_length= send(ptr->hosts[server_key].fd, buffer, write_length, 0)) == -1)
- return MEMCACHED_WRITE_FAILURE;
+ {
+ rc= MEMCACHED_WRITE_FAILURE;
+ goto error;
+ }
+
+ if ((sent_length= memcached_io_write(ptr, server_key, buffer, write_length)) == -1)
+ {
+ rc= MEMCACHED_WRITE_FAILURE;
+ goto error;
+ }
assert(write_length == sent_length);
- if ((sent_length= send(ptr->hosts[server_key].fd, value, value_length, 0)) == -1)
+ /*
+ We have to flush after sending the command. Memcached is not smart enough
+ to just keep reading from the socket :(
+ */
+ if ((sent_length= memcached_io_flush(ptr, server_key)) == -1)
return MEMCACHED_WRITE_FAILURE;
+
+ if ((sent_length= memcached_io_write(ptr, server_key, value, value_length)) == -1)
+ {
+ rc= MEMCACHED_WRITE_FAILURE;
+ goto error;
+ }
assert(value_length == sent_length);
- if ((sent_length= send(ptr->hosts[server_key].fd, "\r\n", 2, 0)) == -1)
- return MEMCACHED_WRITE_FAILURE;
+ if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2)) == -1)
+ {
+ rc= MEMCACHED_WRITE_FAILURE;
+ goto error;
+ }
+
assert(2 == sent_length);
+ if ((sent_length= memcached_io_flush(ptr, server_key)) == -1)
+ return MEMCACHED_WRITE_FAILURE;
+
+ //assert(sent_length == write_length + value_length + 2);
+
sent_length= recv(ptr->hosts[server_key].fd, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, 0);
if (sent_length && buffer[0] == 'S') /* STORED */
return MEMCACHED_SUCCESS;
else if (write_length && buffer[0] == 'N') /* NOT_STORED */
return MEMCACHED_NOTSTORED;
+ else if (write_length && buffer[0] == 'E') /* ERROR */
+ {
+ printf("BUFFER :%s:\n", buffer);
+ return MEMCACHED_PROTOCOL_ERROR;
+ }
else
return MEMCACHED_READ_FAILURE;
+
+error:
+ memcached_io_reset(ptr, server_key);
+
+ return rc;
}
memcached_return memcached_set(memcached_st *ptr, char *key, size_t key_length,
memcached_free(memc);
}
+void set_test2(void)
+{
+ memcached_st *memc;
+ memcached_return rc;
+ char *key= "foo";
+ char *value= "train in the brain";
+ size_t value_length= strlen(value);
+ unsigned int x;
+
+ memc= memcached_create(NULL);
+ assert(memc);
+ rc= memcached_server_add(memc, "localhost", 0);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ for (x= 0; x < 10; x++)
+ {
+ rc= memcached_set(memc, key, strlen(key),
+ value, value_length,
+ (time_t)0, (uint16_t)0);
+ assert(rc == MEMCACHED_SUCCESS);
+ }
+
+ memcached_free(memc);
+}
+
+void set_test3(void)
+{
+ memcached_st *memc;
+ memcached_return rc;
+ char *key= "foo";
+ char *value;
+ size_t value_length= 8191;
+ unsigned int x;
+
+ value = (char*)malloc(value_length);
+ assert(value);
+
+ for (x= 0; x < value_length; x++)
+ value[x] = (char) (x % 127);
+
+ memc= memcached_create(NULL);
+ assert(memc);
+ rc= memcached_server_add(memc, "localhost", 0);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ for (x= 0; x < 1; x++)
+ {
+ rc= memcached_set(memc, key, strlen(key),
+ value, value_length,
+ (time_t)0, (uint16_t)0);
+ assert(rc == MEMCACHED_SUCCESS);
+ }
+
+ free(value);
+
+ memcached_free(memc);
+}
+
void get_test3(void)
{
memcached_st *memc;
connection_test();
error_test();
set_test();
+ set_test2();
+ set_test3();
add_test();
replace_test();
flush_test();
flush_test();
get_test();
get_test2();
- get_test3();
+ get_test3();
get_test4();
stats_servername_test();