before close.
Refactor of memcached_get() to use common code.
* Updates for consistent hashing
* IPV6 support
* Static allocation for hostname (performance)
+ * Fixed bug where in non-block mode all data might not have been sent on
+ close().
+ * Refactor of memcached_get() to use common code.
+
0.11 Mon Nov 26 01:05:52 PST 2007
* Added option to memcache_behavior_set() so that poll() can be timed out.
memcached_hash hash;
memcached_server_distribution distribution;
unsigned int wheel[MEMCACHED_WHEEL_SIZE];
+ uint8_t replicas;
memcached_return warning; /* Future Use */
};
#define WATCHPOINT_ERROR(A) fprintf(stderr, "\nWATCHPOINT %s:%d %s\n", __FILE__, __LINE__, memcached_strerror(NULL, A));fflush(stdout);
#endif
#define WATCHPOINT_STRING(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__,A);fflush(stdout);
+#define WATCHPOINT_STRING_LENGTH(A,B) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %.*s\n", __FILE__, __LINE__,__func__,(int)B,A);fflush(stdout);
#define WATCHPOINT_NUMBER(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %zu\n", __FILE__, __LINE__,__func__,(size_t)(A));fflush(stdout);
#define WATCHPOINT_ERRNO(A) fprintf(stderr, "\nWATCHPOINT %s:%d (%s) %s\n", __FILE__, __LINE__,__func__, strerror(A));A= 0;fflush(stdout);
#define WATCHPOINT_ASSERT(A) assert((A));
#define WATCHPOINT { 1; };
#define WATCHPOINT_ERROR(A) { 1; };
#define WATCHPOINT_STRING(A) { 1; };
+#define WATCHPOINT_STRING_LENGTH(A,B) { 1; };
#define WATCHPOINT_NUMBER(A) { 1; };
#define WATCHPOINT_ERRNO(A) { 1; };
#define WATCHPOINT_ASSERT(A) { 1; };
#include <assert.h>
#include <time.h>
#include <errno.h>
+#include <fcntl.h>
+#include <sys/un.h>
+#include <netinet/tcp.h>
+
#include <memcached.h>
#define MEMCACHED_BLOCK_SIZE 1024
+typedef enum {
+ MEM_NO_FLUSH,
+ MEM_FLUSH,
+} memcached_flush_action;
+
typedef enum {
MEM_NO_BLOCK= (1 << 0),
MEM_TCP_NODELAY= (1 << 1),
#include "common.h"
-#include <fcntl.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <netinet/tcp.h>
-#include <netdb.h>
-#include <netinet/in.h>
-
static memcached_return set_hostinfo(memcached_server_st *server)
{
struct addrinfo *ai;
use->ai_protocol)) < 0)
{
ptr->cached_errno= errno;
+ WATCHPOINT_ERRNO(errno);
return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
}
- /* For the moment, not getting a nonblocking mode will note be fatal */
+ /* For the moment, not getting a nonblocking mode will not be fatal */
if (ptr->flags & MEM_NO_BLOCK)
{
int flags;
uint16_t *flags,
memcached_return *error)
{
- char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
- char *buf_ptr= buffer;
- unsigned int server_key;
- memcached_string_st *result_buffer;
- LIBMEMCACHED_MEMCACHED_GET_START();
+ char *value;
+ char *dummy_value;
+ size_t dummy_length;
+ uint16_t dummy_flags;
+ memcached_return dummy_error;
- if (key_length == 0)
- {
- *error= MEMCACHED_NO_KEY_PROVIDED;
- return NULL;
- }
+ /* Request the key */
+ *error= memcached_mget(ptr, &key, &key_length, 1);
- if (ptr->hosts == NULL || ptr->number_of_hosts == 0)
- {
- *error= MEMCACHED_NO_SERVERS;
- return NULL;
- }
-
- server_key= memcached_generate_hash(ptr, key, key_length);
- result_buffer= &ptr->result_buffer;
+ value= memcached_fetch(ptr, NULL, NULL,
+ value_length, flags, error);
- *value_length= 0;
- memcpy(buf_ptr, "get ", 4);
- buf_ptr+= 4;
- memcpy(buf_ptr, key, key_length);
- buf_ptr+= key_length;
- memcpy(buf_ptr, "\r\n", 2);
- buf_ptr+= 2;
-
- *error= memcached_do(ptr, server_key, buffer, (size_t)(buf_ptr - buffer), 1);
- if (*error != MEMCACHED_SUCCESS)
- goto error;
-
- *error= memcached_value_fetch(ptr, NULL, NULL, result_buffer,
- flags, NULL, server_key);
- *value_length= memcached_string_length(result_buffer);
- if (*error == MEMCACHED_END && *value_length == 0)
- {
- *error= MEMCACHED_NOTFOUND;
- goto error;
- }
- else if (*error == MEMCACHED_END)
- {
- WATCHPOINT_ASSERT(0); /* If this happens we have somehow messed up the fetch */
- }
- else if (*error == MEMCACHED_SUCCESS)
- {
- memcached_return rc;
- /* We need to read END */
- rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
-
- if (rc != MEMCACHED_END)
- {
- *error= MEMCACHED_PROTOCOL_ERROR;
- goto error;
- }
- }
- else
- goto error;
-
- LIBMEMCACHED_MEMCACHED_GET_END();
-
- return memcached_string_c_copy(result_buffer);
+ if (value == NULL)
+ return NULL;
-error:
- *value_length= 0;
+ /* We do a second read to clean the cursor */
+ dummy_value= memcached_fetch(ptr, NULL, NULL,
+ &dummy_length, &dummy_flags,
+ &dummy_error);
- LIBMEMCACHED_MEMCACHED_GET_END();
+ /* Something is really wrong if this happens */
+ WATCHPOINT_ASSERT(dummy_value == NULL);
+ if (dummy_value)
+ free(dummy_value);
- return NULL;
+ return value;
}
memcached_return memcached_mget(memcached_st *ptr,
return length;
}
+memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key)
+{
+ struct pollfd fds[1];
+ short flags= 0;
+ struct timespec timer;
+ memcached_return rc;
+
+ timer.tv_sec= 1;
+ timer.tv_nsec= 0;
+ flags= POLLHUP | POLLERR;
+
+ memset(&fds, 0, sizeof(struct pollfd));
+ fds[0].fd= ptr->hosts[server_key].fd;
+ fds[0].events= flags;
+ fds[0].revents= flags;
+
+ if (poll(fds, 1, ptr->poll_timeout) < 0)
+ rc= MEMCACHED_FAILURE;
+ else
+ rc= MEMCACHED_SUCCESS;
+
+ close(ptr->hosts[server_key].fd);
+
+ return rc;
+}
+
ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
{
size_t sent_length;
void memcached_io_reset(memcached_st *ptr, unsigned int server_key);
ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
char *buffer, size_t length);
+memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key);
if (ptr->hosts[server_key].fd != -1)
{
- if (ptr->flags & MEM_NO_BLOCK && ptr->hosts[server_key].stack_responses)
- memcached_io_flush(ptr, server_key);
+ memcached_return rc;
+ rc= memcached_do(ptr, server_key, "quit\r\n", 6, 1);
+ WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS);
- memcached_io_write(ptr, server_key, "quit\r\n", 6, 1);
-
- close(ptr->hosts[server_key].fd);
+ memcached_io_close(ptr, server_key);
ptr->hosts[server_key].fd= -1;
ptr->hosts[server_key].stack_responses= 0;
ptr->hosts[server_key].cursor_active= 0;
if (key_length == 0)
return MEMCACHED_NO_KEY_PROVIDED;
- if (ptr->hosts == NULL || ptr->number_of_hosts == 0)
- return MEMCACHED_NO_SERVERS;
-
server_key= memcached_generate_hash(ptr, key, key_length);
- rc= memcached_connect(ptr, server_key);
- if (rc != MEMCACHED_SUCCESS)
- return rc;
-
-
if (cas)
write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
"%s %.*s %u %llu %zu %llu\r\n", storage_op_string(verb),
goto error;
}
- /*
- We have to flush after sending the command. Memcached is not smart enough
- to just keep reading from the socket :(
- */
- if ((sent_length= memcached_io_write(ptr, server_key, buffer, write_length, 0)) == -1)
- {
- rc= MEMCACHED_WRITE_FAILURE;
+ rc= memcached_do(ptr, server_key, buffer, write_length, 0);
+ if (rc != MEMCACHED_SUCCESS)
goto error;
- }
if ((sent_length= memcached_io_write(ptr, server_key, value, value_length, 0)) == -1)
{
goto error;
}
- if ((ptr->flags & MEM_NO_BLOCK) && verb == SET_OP)
+ if (to_write == 0)
{
rc= MEMCACHED_SUCCESS;
memcached_server_response_increment(ptr, server_key);
return 0;
}
+/*
+ Set the value, then quit to make sure it is flushed.
+ Come back in and test that add fails.
+*/
uint8_t add_test(memcached_st *memc)
{
memcached_return rc;
char *key= "foo";
char *value= "when we sanitize";
+ rc= memcached_set(memc, key, strlen(key),
+ value, strlen(value),
+ (time_t)0, (uint16_t)0);
+ assert(rc == MEMCACHED_SUCCESS);
+ memcached_quit(memc);
rc= memcached_add(memc, key, strlen(key),
value, strlen(value),
(time_t)0, (uint16_t)0);
{"set", 0, set_test },
{"set2", 0, set_test2 },
{"set3", 0, set_test3 },
- {"add", 0, add_test },
+ {"add", 1, add_test },
{"replace", 0, replace_test },
{"delete", 1, delete_test },
{"get", 1, get_test },