internal thing... no changes to external interfaces).
+ * Refactor of all IO to just pass in the active server
* Problem configuring (PKG_CHECK_MODULES) fixed by removal of "rpath" in
support/libmemcached.pc.in (Thanks to Ross McFarland).
* Added memcached_callback_get()/set()
char hostname[MEMCACHED_MAX_HOST_LENGTH];
unsigned int port;
int fd;
+ int cached_errno;
unsigned int cursor_active;
char write_buffer[MEMCACHED_MAX_BUFFER];
size_t write_buffer_offset;
- char *write_ptr;
char read_buffer[MEMCACHED_MAX_BUFFER];
size_t read_data_length;
size_t read_buffer_length;
uint8_t minor_version;
uint8_t micro_version;
uint16_t count;
+ memcached_st *root;
};
struct memcached_stat_st {
memcached_server_st *hosts;
unsigned int number_of_hosts;
unsigned int cursor_server;
- char connected;
int cached_errno;
- unsigned long long flags;
+ uint32_t flags;
int send_size;
int recv_size;
int32_t poll_timeout;
#define memcached_server_name(A,B) (B).hostname
#define memcached_server_port(A,B) (B).port
#define memcached_server_list(A) (A)->hosts
-#define memcached_server_response_count(A,B) (A)->hosts[B].cursor_active
+#define memcached_server_response_count(A) (A)->cursor_active
+
memcached_return memcached_server_add_udp(memcached_st *ptr,
char *hostname,
size_t data_len);
uint32_t hsieh_hash(char *key, size_t key_length);
-memcached_return memcached_connect(memcached_st *ptr, unsigned int server_key);
-memcached_return memcached_response(memcached_st *ptr,
+memcached_return memcached_connect(memcached_server_st *ptr);
+memcached_return memcached_response(memcached_server_st *ptr,
char *buffer, size_t buffer_length,
- memcached_result_st *result,
- unsigned int server_key);
+ memcached_result_st *result);
unsigned int memcached_generate_hash(memcached_st *ptr, char *key, size_t key_length);
-void memcached_quit_server(memcached_st *ptr, unsigned int server_key, uint8_t io_death);
+void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death);
-#define memcached_server_response_increment(A,B) A->hosts[B].cursor_active++
-#define memcached_server_response_decrement(A,B) A->hosts[B].cursor_active--
-#define memcached_server_response_reset(A,B) A->hosts[B].cursor_active=0
+#define memcached_server_response_increment(A) (A)->cursor_active++
+#define memcached_server_response_decrement(A) (A)->cursor_active--
+#define memcached_server_response_reset(A) (A)->cursor_active=0
/* String Struct */
#define memcached_string_length(A) (size_t)((A)->end - (A)->string)
size_t memcached_string_backspace(memcached_string_st *string, size_t remove);
memcached_return memcached_string_reset(memcached_string_st *string);
void memcached_string_free(memcached_string_st *string);
-memcached_return memcached_do(memcached_st *ptr, unsigned int server_key, char *commmand,
- size_t command_length, char with_flush);
+memcached_return memcached_do(memcached_server_st *ptr, char *commmand,
+ size_t command_length, uint8_t with_flush);
memcached_return memcached_version(memcached_st *ptr);
-memcached_return value_fetch(memcached_st *ptr,
+memcached_return value_fetch(memcached_server_st *ptr,
char *buffer,
- memcached_result_st *result,
- unsigned int server_key);
+ memcached_result_st *result);
void server_list_free(memcached_st *ptr, memcached_server_st *servers);
memcached_st *new_clone;
if (ptr == NULL)
- {
- new_clone= memcached_create(clone);
-
- if (ptr->on_clone)
- ptr->on_clone(NULL, new_clone);
-
- return new_clone;
- }
+ return memcached_create(clone);
if (ptr->is_allocated == MEMCACHED_USED)
{
if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
return MEMCACHED_WRITE_FAILURE;
- rc= memcached_do(ptr, server_key, buffer, send_length, 1);
+ rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1);
if (rc != MEMCACHED_SUCCESS)
return rc;
- rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL, server_key);
+ rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
/*
So why recheck responce? Because the protocol is brain dead :)
int sock_size;
socklen_t sock_length= sizeof(int);
+ /* REFACTOR */
/* We just try the first host, and if it is down we return zero */
- if ((memcached_connect(ptr, 0)) != MEMCACHED_SUCCESS)
+ if ((memcached_connect(&ptr->hosts[0])) != MEMCACHED_SUCCESS)
return 0;
if (getsockopt(ptr->hosts[0].fd, SOL_SOCKET,
int sock_size;
socklen_t sock_length= sizeof(int);
+ /* REFACTOR */
/* We just try the first host, and if it is down we return zero */
- if ((memcached_connect(ptr, 0)) != MEMCACHED_SUCCESS)
+ if ((memcached_connect(&ptr->hosts[0])) != MEMCACHED_SUCCESS)
return 0;
if (getsockopt(ptr->hosts[0].fd, SOL_SOCKET,
return MEMCACHED_SUCCESS;
}
-static memcached_return unix_socket_connect(memcached_st *ptr, unsigned int server_key)
+static memcached_return unix_socket_connect(memcached_server_st *ptr)
{
struct sockaddr_un servAddr;
socklen_t addrlen;
- if (ptr->hosts[server_key].fd == -1)
+ if (ptr->fd == -1)
{
- if ((ptr->hosts[server_key].fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
+ if ((ptr->fd= socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
{
ptr->cached_errno= errno;
return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
memset(&servAddr, 0, sizeof (struct sockaddr_un));
servAddr.sun_family= AF_UNIX;
- strcpy(servAddr.sun_path, ptr->hosts[server_key].hostname); /* Copy filename */
+ strcpy(servAddr.sun_path, ptr->hostname); /* Copy filename */
addrlen= strlen(servAddr.sun_path) + sizeof(servAddr.sun_family);
test_connect:
- if (connect(ptr->hosts[server_key].fd,
+ if (connect(ptr->fd,
(struct sockaddr *)&servAddr,
sizeof(servAddr)) < 0)
{
ptr->cached_errno= errno;
return MEMCACHED_ERRNO;
}
- ptr->connected++;
}
}
return MEMCACHED_SUCCESS;
}
-static memcached_return udp_connect(memcached_st *ptr, unsigned int server_key)
+static memcached_return udp_connect(memcached_server_st *ptr)
{
- if (ptr->hosts[server_key].fd == -1)
+ if (ptr->fd == -1)
{
/* Old connection junk still is in the structure */
- WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0);
+ WATCHPOINT_ASSERT(ptr->cursor_active == 0);
/*
If we have not allocated the hosts object.
Or if the cache has not been set.
*/
- if (ptr->hosts[server_key].sockaddr_inited == MEMCACHED_NOT_ALLOCATED ||
- (!(ptr->flags & MEM_USE_CACHE_LOOKUPS)))
+ if (ptr->sockaddr_inited == MEMCACHED_NOT_ALLOCATED ||
+ (!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS)))
{
memcached_return rc;
- rc= set_hostinfo(&ptr->hosts[server_key]);
+ rc= set_hostinfo(ptr);
if (rc != MEMCACHED_SUCCESS)
return rc;
- ptr->hosts[server_key].sockaddr_inited= MEMCACHED_ALLOCATED;
+ ptr->sockaddr_inited= MEMCACHED_ALLOCATED;
}
/* Create the socket */
- if ((ptr->hosts[server_key].fd= socket(AF_INET, SOCK_DGRAM, 0)) < 0)
+ if ((ptr->fd= socket(AF_INET, SOCK_DGRAM, 0)) < 0)
{
ptr->cached_errno= errno;
return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
return MEMCACHED_SUCCESS;
}
-static memcached_return tcp_connect(memcached_st *ptr, unsigned int server_key)
+static memcached_return tcp_connect(memcached_server_st *ptr)
{
- if (ptr->hosts[server_key].fd == -1)
+ if (ptr->fd == -1)
{
struct addrinfo *use;
/* Old connection junk still is in the structure */
- WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0);
+ WATCHPOINT_ASSERT(ptr->cursor_active == 0);
- if (ptr->hosts[server_key].sockaddr_inited == MEMCACHED_NOT_ALLOCATED ||
- (!(ptr->flags & MEM_USE_CACHE_LOOKUPS)))
+ if (ptr->sockaddr_inited == MEMCACHED_NOT_ALLOCATED ||
+ (!(ptr->root->flags & MEM_USE_CACHE_LOOKUPS)))
{
memcached_return rc;
- rc= set_hostinfo(&ptr->hosts[server_key]);
+ rc= set_hostinfo(ptr);
if (rc != MEMCACHED_SUCCESS)
return rc;
- ptr->hosts[server_key].sockaddr_inited= MEMCACHED_ALLOCATED;
+ ptr->sockaddr_inited= MEMCACHED_ALLOCATED;
}
- use= ptr->hosts[server_key].address_info;
+ use= ptr->address_info;
/* Create the socket */
- if ((ptr->hosts[server_key].fd= socket(use->ai_family,
- use->ai_socktype,
- use->ai_protocol)) < 0)
+ if ((ptr->fd= socket(use->ai_family,
+ use->ai_socktype,
+ use->ai_protocol)) < 0)
{
ptr->cached_errno= errno;
WATCHPOINT_ERRNO(errno);
return MEMCACHED_CONNECTION_SOCKET_CREATE_FAILURE;
}
- if (ptr->flags & MEM_NO_BLOCK)
+ if (ptr->root->flags & MEM_NO_BLOCK)
{
int error;
struct linger linger;
linger.l_onoff= 1;
linger.l_linger= MEMCACHED_DEFAULT_TIMEOUT;
- error= setsockopt(ptr->hosts[server_key].fd, SOL_SOCKET, SO_LINGER,
+ error= setsockopt(ptr->fd, SOL_SOCKET, SO_LINGER,
&linger, (socklen_t)sizeof(struct linger));
WATCHPOINT_ASSERT(error == 0);
- error= setsockopt(ptr->hosts[server_key].fd, SOL_SOCKET, SO_SNDTIMEO,
+ error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDTIMEO,
&waittime, (socklen_t)sizeof(struct timeval));
WATCHPOINT_ASSERT(error == 0);
- error= setsockopt(ptr->hosts[server_key].fd, SOL_SOCKET, SO_RCVTIMEO,
+ error= setsockopt(ptr->fd, SOL_SOCKET, SO_RCVTIMEO,
&waittime, (socklen_t)sizeof(struct timeval));
WATCHPOINT_ASSERT(error == 0);
}
- if (ptr->flags & MEM_TCP_NODELAY)
+ if (ptr->root->flags & MEM_TCP_NODELAY)
{
int flag= 1;
int error;
- error= setsockopt(ptr->hosts[server_key].fd, IPPROTO_TCP, TCP_NODELAY,
+ error= setsockopt(ptr->fd, IPPROTO_TCP, TCP_NODELAY,
&flag, (socklen_t)sizeof(int));
WATCHPOINT_ASSERT(error == 0);
}
- if (ptr->send_size)
+ if (ptr->root->send_size)
{
int error;
- error= setsockopt(ptr->hosts[server_key].fd, SOL_SOCKET, SO_SNDBUF,
- &ptr->send_size, (socklen_t)sizeof(int));
+ error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
+ &ptr->root->send_size, (socklen_t)sizeof(int));
WATCHPOINT_ASSERT(error == 0);
}
- if (ptr->recv_size)
+ if (ptr->root->recv_size)
{
int error;
- error= setsockopt(ptr->hosts[server_key].fd, SOL_SOCKET, SO_SNDBUF,
- &ptr->recv_size, (socklen_t)sizeof(int));
+ error= setsockopt(ptr->fd, SOL_SOCKET, SO_SNDBUF,
+ &ptr->root->recv_size, (socklen_t)sizeof(int));
WATCHPOINT_ASSERT(error == 0);
}
/* For the moment, not getting a nonblocking mode will not be fatal */
- if (ptr->flags & MEM_NO_BLOCK)
+ if (ptr->root->flags & MEM_NO_BLOCK)
{
int flags;
- flags= fcntl(ptr->hosts[server_key].fd, F_GETFL, 0);
+ flags= fcntl(ptr->fd, F_GETFL, 0);
if (flags != -1)
{
- (void)fcntl(ptr->hosts[server_key].fd, F_SETFL, flags | O_NONBLOCK);
+ (void)fcntl(ptr->fd, F_SETFL, flags | O_NONBLOCK);
}
}
/* connect to server */
test_connect:
- if (connect(ptr->hosts[server_key].fd,
+ if (connect(ptr->fd,
use->ai_addr,
use->ai_addrlen) < 0)
{
ptr->cached_errno= errno;
WATCHPOINT_ASSERT(errno == ECONNREFUSED);
WATCHPOINT_ERRNO(ptr->cached_errno);
- close(ptr->hosts[server_key].fd);
- ptr->hosts[server_key].fd= -1;
+ close(ptr->fd);
+ ptr->fd= -1;
return MEMCACHED_ERRNO;
}
- ptr->connected++;
}
- WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0);
+ WATCHPOINT_ASSERT(ptr->cursor_active == 0);
}
return MEMCACHED_SUCCESS;
}
-memcached_return memcached_connect(memcached_st *ptr, unsigned int server_key)
+memcached_return memcached_connect(memcached_server_st *ptr)
{
memcached_return rc= MEMCACHED_NO_SERVERS;
LIBMEMCACHED_MEMCACHED_CONNECT_START();
- if (ptr->connected == ptr->number_of_hosts && ptr->number_of_hosts)
- return MEMCACHED_SUCCESS;
-
- if (ptr->hosts == NULL || ptr->number_of_hosts == 0)
- return MEMCACHED_NO_SERVERS;
-
/* We need to clean up the multi startup piece */
- switch (ptr->hosts[server_key].type)
+ switch (ptr->type)
{
case MEMCACHED_CONNECTION_UNKNOWN:
WATCHPOINT_ASSERT(0);
rc= MEMCACHED_NOT_SUPPORTED;
break;
case MEMCACHED_CONNECTION_UDP:
- rc= udp_connect(ptr, server_key);
+ rc= udp_connect(ptr);
break;
case MEMCACHED_CONNECTION_TCP:
- rc= tcp_connect(ptr, server_key);
+ rc= tcp_connect(ptr);
break;
case MEMCACHED_CONNECTION_UNIX_SOCKET:
- rc= unix_socket_connect(ptr, server_key);
+ rc= unix_socket_connect(ptr);
break;
+ default:
+ WATCHPOINT_ASSERT(0);
}
if (rc != MEMCACHED_SUCCESS)
to_write= (ptr->flags & MEM_BUFFER_REQUESTS) ? 0 : 1;
- rc= memcached_do(ptr, server_key, buffer, send_length, to_write);
+ rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, to_write);
if (rc != MEMCACHED_SUCCESS)
goto error;
}
else
{
- rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL, server_key);
+ rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
if (rc == MEMCACHED_DELETED)
rc= MEMCACHED_SUCCESS;
}
#include "common.h"
-memcached_return memcached_do(memcached_st *ptr, unsigned int server_key, char *command,
- size_t command_length, char with_flush)
+memcached_return memcached_do(memcached_server_st *ptr, char *command,
+ size_t command_length, uint8_t with_flush)
{
memcached_return rc;
ssize_t sent_length;
WATCHPOINT_ASSERT(command_length);
WATCHPOINT_ASSERT(command);
- if ((rc= memcached_connect(ptr, server_key)) != MEMCACHED_SUCCESS)
+ if ((rc= memcached_connect(ptr)) != MEMCACHED_SUCCESS)
return rc;
- sent_length= memcached_io_write(ptr, server_key, command, command_length, with_flush);
+ sent_length= memcached_io_write(ptr, command, command_length, with_flush);
if (sent_length == -1 || sent_length != command_length)
rc= MEMCACHED_WRITE_FAILURE;
else
- memcached_server_response_increment(ptr, server_key);
+ memcached_server_response_increment(ptr);
return rc;
}
#include "common.h"
#include "memcached_io.h"
-memcached_return value_fetch(memcached_st *ptr,
+memcached_return value_fetch(memcached_server_st *ptr,
char *buffer,
- memcached_result_st *result,
- unsigned int server_key)
+ memcached_result_st *result)
{
memcached_return rc= MEMCACHED_SUCCESS;
char *string_ptr;
*/
to_read= (value_length) + 2;
- read_length= memcached_io_read(ptr, server_key,
- value_ptr, to_read);
+ read_length= memcached_io_read(ptr, value_ptr, to_read);
if (read_length != (size_t)(value_length + 2))
{
{
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
- if (memcached_server_response_count(ptr, ptr->cursor_server) == 0)
+ if (memcached_server_response_count(&ptr->hosts[ptr->cursor_server]) == 0)
{
ptr->cursor_server++;
continue;
}
- *error= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result_buffer, ptr->cursor_server);
+ *error= memcached_response(&ptr->hosts[ptr->cursor_server], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result_buffer);
if (*error == MEMCACHED_END) /* END means that we move on to the next */
{
- memcached_server_response_reset(ptr, ptr->cursor_server);
+ memcached_server_response_reset(&ptr->hosts[ptr->cursor_server]);
ptr->cursor_server++;
continue;
}
WATCHPOINT_ASSERT(result->value.is_allocated != MEMCACHED_USED);
+#ifdef UNUSED
if (ptr->flags & MEM_NO_BLOCK)
memcached_io_preread(ptr);
+#endif
while (ptr->cursor_server < ptr->number_of_hosts)
{
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
- if (memcached_server_response_count(ptr, ptr->cursor_server) == 0)
+ if (memcached_server_response_count(&ptr->hosts[ptr->cursor_server]) == 0)
{
ptr->cursor_server++;
continue;
}
- *error= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result, ptr->cursor_server);
+ *error= memcached_response(&ptr->hosts[ptr->cursor_server], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result);
if (*error == MEMCACHED_END) /* END means that we move on to the next */
{
- memcached_server_response_reset(ptr, ptr->cursor_server);
+ memcached_server_response_reset(&ptr->hosts[ptr->cursor_server]);
ptr->cursor_server++;
continue;
}
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
- if (memcached_server_response_count(ptr, ptr->cursor_server) == 0)
+ if (memcached_server_response_count(&ptr->hosts[ptr->cursor_server]) == 0)
{
ptr->cursor_server++;
continue;
}
- rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result, ptr->cursor_server);
+ rc= memcached_response(&ptr->hosts[ptr->cursor_server], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, result);
if (rc == MEMCACHED_END) /* END means that we move on to the next */
{
- memcached_server_response_reset(ptr, ptr->cursor_server);
+ memcached_server_response_reset(&ptr->hosts[ptr->cursor_server]);
ptr->cursor_server++;
continue;
}
send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
"flush_all\r\n");
- rc= memcached_do(ptr, x, buffer, send_length, 1);
+ rc= memcached_do(&ptr->hosts[x], buffer, send_length, 1);
if (rc == MEMCACHED_SUCCESS)
- (void)memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL, x);
+ (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
}
LIBMEMCACHED_MEMCACHED_FLUSH_END();
*/
for (x= 0; x < ptr->number_of_hosts; x++)
{
- if (memcached_server_response_count(ptr, x))
+ if (memcached_server_response_count(&ptr->hosts[x]))
{
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
if (ptr->flags & MEM_NO_BLOCK)
- (void)memcached_io_write(ptr, x, NULL, 0, 1);
+ (void)memcached_io_write(&ptr->hosts[x], NULL, 0, 1);
- while(memcached_server_response_count(ptr, x))
- (void)memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result, x);
+ while(memcached_server_response_count(&ptr->hosts[x]))
+ (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
}
}
else
server_key= memcached_generate_hash(ptr, keys[x], key_length[x]);
- if (memcached_server_response_count(ptr, server_key) == 0)
+ if (memcached_server_response_count(&ptr->hosts[server_key]) == 0)
{
- rc= memcached_connect(ptr, server_key);
+ rc= memcached_connect(&ptr->hosts[server_key]);
if (rc != MEMCACHED_SUCCESS)
continue;
- if ((memcached_io_write(ptr, server_key, get_command, get_command_length, 0)) == -1)
+ if ((memcached_io_write(&ptr->hosts[server_key], get_command, get_command_length, 0)) == -1)
{
rc= MEMCACHED_SOME_ERRORS;
continue;
}
WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0);
- memcached_server_response_increment(ptr, server_key);
+ memcached_server_response_increment(&ptr->hosts[server_key]);
WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 1);
}
- if ((memcached_io_write(ptr, server_key, keys[x], key_length[x], 0)) == -1)
+ if ((memcached_io_write(&ptr->hosts[server_key], keys[x], key_length[x], 0)) == -1)
{
- memcached_server_response_reset(ptr, server_key);
+ memcached_server_response_reset(&ptr->hosts[server_key]);
rc= MEMCACHED_SOME_ERRORS;
continue;
}
- if ((memcached_io_write(ptr, server_key, " ", 1, 0)) == -1)
+ if ((memcached_io_write(&ptr->hosts[server_key], " ", 1, 0)) == -1)
{
- memcached_server_response_reset(ptr, server_key);
+ memcached_server_response_reset(&ptr->hosts[server_key]);
rc= MEMCACHED_SOME_ERRORS;
continue;
}
*/
for (x= 0; x < ptr->number_of_hosts; x++)
{
- if (memcached_server_response_count(ptr, x))
+ if (memcached_server_response_count(&ptr->hosts[x]))
{
/* We need to do something about non-connnected hosts in the future */
- if ((memcached_io_write(ptr, x, "\r\n", 2, 1)) == -1)
+ if ((memcached_io_write(&ptr->hosts[x], "\r\n", 2, 1)) == -1)
{
rc= MEMCACHED_SOME_ERRORS;
}
}
}
-static void host_reset(memcached_server_st *host, char *hostname, unsigned int port,
+static void host_reset(memcached_st *ptr, memcached_server_st *host,
+ char *hostname, unsigned int port,
memcached_connection type)
{
memset(host, 0, sizeof(memcached_server_st));
strncpy(host->hostname, hostname, MEMCACHED_MAX_HOST_LENGTH - 1);
+ host->root= ptr ? ptr : NULL;
host->port= port;
host->fd= -1;
host->type= type;
host->read_ptr= host->read_buffer;
- host->write_ptr= host->write_buffer;
host->sockaddr_inited= MEMCACHED_NOT_ALLOCATED;
}
for (x= 0; x < count; x++)
{
WATCHPOINT_ASSERT(list[x].hostname[0] != 0);
- host_reset(&ptr->hosts[ptr->number_of_hosts], list[x].hostname,
+ host_reset(ptr, &ptr->hosts[ptr->number_of_hosts], list[x].hostname,
list[x].port, list[x].type);
ptr->number_of_hosts++;
}
ptr->hosts= new_host_list;
- host_reset(&ptr->hosts[ptr->number_of_hosts], hostname, port, type);
+ host_reset(ptr, &ptr->hosts[ptr->number_of_hosts], hostname, port, type);
ptr->number_of_hosts++;
ptr->hosts[0].count++;
return NULL;
}
- host_reset(&new_host_list[count-1], hostname, port, MEMCACHED_CONNECTION_TCP);
+ host_reset(NULL, &new_host_list[count-1], hostname, port, MEMCACHED_CONNECTION_TCP);
+
+ /* Backwards compatibility hack */
new_host_list[0].count++;
MEM_WRITE,
} memc_read_or_write;
-static ssize_t io_flush(memcached_st *ptr, unsigned int server_key,
- memcached_return *error);
+static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
-static memcached_return io_wait(memcached_st *ptr, unsigned int server_key,
+static memcached_return io_wait(memcached_server_st *ptr,
memc_read_or_write read_or_write)
{
struct pollfd fds[1];
flags= POLLIN | POLLERR;
memset(&fds, 0, sizeof(struct pollfd));
- fds[0].fd= ptr->hosts[server_key].fd;
+ fds[0].fd= ptr->fd;
fds[0].events= flags;
- error= poll(fds, 1, ptr->poll_timeout);
+ error= poll(fds, 1, ptr->root->poll_timeout);
if (error == 1)
return MEMCACHED_SUCCESS;
return MEMCACHED_TIMEOUT;
}
- WATCHPOINT;
/* Imposssible for anything other then -1 */
WATCHPOINT_ASSERT(error == -1);
- memcached_quit_server(ptr, server_key, 1);
+ memcached_quit_server(ptr, 1);
return MEMCACHED_FAILURE;
}
+#ifdef UNUSED
void memcached_io_preread(memcached_st *ptr)
{
unsigned int x;
}
}
}
+#endif
-ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
+ssize_t memcached_io_read(memcached_server_st *ptr,
char *buffer, size_t length)
{
char *buffer_ptr;
while (length)
{
uint8_t found_eof= 0;
- if (!ptr->hosts[server_key].read_buffer_length)
+ if (!ptr->read_buffer_length)
{
size_t data_read;
while (1)
{
- data_read= read(ptr->hosts[server_key].fd,
- ptr->hosts[server_key].read_buffer,
+ data_read= read(ptr->fd,
+ ptr->read_buffer,
MEMCACHED_MAX_BUFFER);
if (data_read == -1)
{
{
memcached_return rc;
- rc= io_wait(ptr, server_key, MEM_READ);
+ rc= io_wait(ptr, MEM_READ);
if (rc == MEMCACHED_SUCCESS)
continue;
- memcached_quit_server(ptr, server_key, 1);
+ memcached_quit_server(ptr, 1);
return -1;
}
default:
{
- memcached_quit_server(ptr, server_key, 1);
+ memcached_quit_server(ptr, 1);
ptr->cached_errno= errno;
return -1;
}
}
}
- ptr->hosts[server_key].read_data_length= data_read;
- ptr->hosts[server_key].read_buffer_length= data_read;
- ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
+ ptr->read_data_length= data_read;
+ ptr->read_buffer_length= data_read;
+ ptr->read_ptr= ptr->read_buffer;
}
if (length > 1)
{
size_t difference;
- difference= (length > ptr->hosts[server_key].read_buffer_length) ? ptr->hosts[server_key].read_buffer_length : length;
+ difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
- memcpy(buffer_ptr, ptr->hosts[server_key].read_ptr, difference);
+ memcpy(buffer_ptr, ptr->read_ptr, difference);
length -= difference;
- ptr->hosts[server_key].read_ptr+= difference;
- ptr->hosts[server_key].read_buffer_length-= difference;
+ ptr->read_ptr+= difference;
+ ptr->read_buffer_length-= difference;
buffer_ptr+= difference;
}
else
{
- *buffer_ptr= *ptr->hosts[server_key].read_ptr;
+ *buffer_ptr= *ptr->read_ptr;
length--;
- ptr->hosts[server_key].read_ptr++;
- ptr->hosts[server_key].read_buffer_length--;
+ ptr->read_ptr++;
+ ptr->read_buffer_length--;
buffer_ptr++;
}
return (size_t)(buffer_ptr - buffer);
}
-ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
- char *buffer, size_t length, char with_flush)
+ssize_t memcached_io_write(memcached_server_st *ptr,
+ char *buffer, size_t length, char with_flush)
{
unsigned long long x;
for (x= 0; x < length; x++)
{
- if (ptr->hosts[server_key].write_ptr == 0)
- ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
- WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr);
- *ptr->hosts[server_key].write_ptr= buffer[x];
- ptr->hosts[server_key].write_ptr++;
- ptr->hosts[server_key].write_buffer_offset++;
-
- if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER)
+ ptr->write_buffer[ptr->write_buffer_offset]= buffer[x];
+ ptr->write_buffer_offset++;
+ WATCHPOINT_ASSERT(ptr->write_buffer_offset <= MEMCACHED_MAX_BUFFER);
+
+ if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
{
memcached_return rc;
- size_t sent_length;
+ ssize_t sent_length;
- sent_length= io_flush(ptr, server_key, &rc);
+ sent_length= io_flush(ptr, &rc);
if (sent_length == -1)
return -1;
WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
- ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
- ptr->hosts[server_key].write_buffer_offset= 0;
}
}
if (with_flush)
{
memcached_return rc;
- if (io_flush(ptr, server_key, &rc) == -1)
+ if (io_flush(ptr, &rc) == -1)
return -1;
}
return length;
}
-memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key)
+memcached_return memcached_io_close(memcached_server_st *ptr)
{
- close(ptr->hosts[server_key].fd);
+ close(ptr->fd);
return MEMCACHED_SUCCESS;
}
-static ssize_t io_flush(memcached_st *ptr, unsigned int server_key,
+static ssize_t io_flush(memcached_server_st *ptr,
memcached_return *error)
{
size_t sent_length;
size_t return_length;
- char *write_ptr= ptr->hosts[server_key].write_buffer;
- size_t write_length= ptr->hosts[server_key].write_buffer_offset;
+ char *local_write_ptr= ptr->write_buffer;
+ size_t write_length= ptr->write_buffer_offset;
*error= MEMCACHED_SUCCESS;
- if (ptr->hosts[server_key].write_buffer_offset == 0)
+ if (ptr->write_buffer_offset == 0)
return 0;
+ /* Looking for memory overflows */
+ if (write_length == MEMCACHED_MAX_BUFFER)
+ WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
+ WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
return_length= 0;
while (write_length)
{
sent_length= 0;
- if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP)
+ if (ptr->type == MEMCACHED_CONNECTION_UDP)
{
- sent_length= sendto(ptr->hosts[server_key].fd,
- write_ptr, write_length, 0,
- (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr,
+ sent_length= sendto(ptr->fd, local_write_ptr, write_length, 0,
+ (struct sockaddr *)&ptr->address_info->ai_addr,
sizeof(struct sockaddr));
}
else
{
- if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
+ if ((ssize_t)(sent_length= write(ptr->fd, local_write_ptr,
write_length)) == -1)
{
+ WATCHPOINT_STRING("Error in write occurred");
switch (errno)
{
case ENOBUFS:
case EAGAIN:
{
memcached_return rc;
- rc= io_wait(ptr, server_key, MEM_WRITE);
+ rc= io_wait(ptr, MEM_WRITE);
if (rc == MEMCACHED_SUCCESS)
continue;
- memcached_quit_server(ptr, server_key, 1);
+ memcached_quit_server(ptr, 1);
return -1;
}
default:
- memcached_quit_server(ptr, server_key, 1);
+ memcached_quit_server(ptr, 1);
ptr->cached_errno= errno;
*error= MEMCACHED_ERRNO;
return -1;
}
}
- write_ptr+= sent_length;
+ local_write_ptr+= sent_length;
write_length-= sent_length;
return_length+= sent_length;
}
WATCHPOINT_ASSERT(write_length == 0);
- WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset);
- ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
- ptr->hosts[server_key].write_buffer_offset= 0;
+ WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset);
+ ptr->write_buffer_offset= 0;
return return_length;
}
/*
Eventually we will just kill off the server with the problem.
*/
-void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
+void memcached_io_reset(memcached_server_st *ptr)
{
- ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
- ptr->hosts[server_key].write_buffer_offset= 0;
- memcached_quit(ptr);
+ ptr->write_buffer_offset= 0;
+ memcached_quit_server(ptr, 0);
}
/* Server IO, Not public! */
#include <memcached.h>
-ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
+ssize_t memcached_io_write(memcached_server_st *ptr,
char *buffer, size_t length, char with_flush);
-void memcached_io_reset(memcached_st *ptr, unsigned int server_key);
-ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
+void memcached_io_reset(memcached_server_st *ptr);
+ssize_t memcached_io_read(memcached_server_st *ptr,
char *buffer, size_t length);
-memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key);
-void memcached_io_preread(memcached_st *ptr);
+memcached_return memcached_io_close(memcached_server_st *ptr);
will force data to be completed.
*/
-void memcached_quit_server(memcached_st *ptr, unsigned int server_key, uint8_t io_death)
+void memcached_quit_server(memcached_server_st *ptr, uint8_t io_death)
{
- if (server_key > ptr->number_of_hosts)
- {
- WATCHPOINT_ASSERT(0);
- return;
- }
-
- if (ptr->hosts[server_key].fd != -1)
+ if (ptr->fd != -1)
{
if (io_death == 0)
{
memcached_return rc;
- rc= memcached_do(ptr, server_key, "quit\r\n", 6, 1);
+ rc= memcached_do(ptr, "quit\r\n", 6, 1);
WATCHPOINT_ASSERT(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_FETCH_NOTFINISHED);
- memcached_io_close(ptr, server_key);
+ memcached_io_close(ptr);
}
- ptr->hosts[server_key].fd= -1;
- ptr->hosts[server_key].write_buffer_offset= 0;
- ptr->hosts[server_key].read_buffer_length= 0;
- ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
- ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
- memcached_server_response_reset(ptr, server_key);
+ ptr->fd= -1;
+ ptr->write_buffer_offset= 0;
+ ptr->read_buffer_length= 0;
+ ptr->read_ptr= ptr->read_buffer;
+ memcached_server_response_reset(ptr);
}
-
- ptr->connected--;
}
void memcached_quit(memcached_st *ptr)
if (ptr->hosts && ptr->number_of_hosts)
{
for (x= 0; x < ptr->number_of_hosts; x++)
- memcached_quit_server(ptr, x, 0);
+ memcached_quit_server(&ptr->hosts[x], 0);
}
-
- ptr->connected= 0;
}
#include "common.h"
#include "memcached_io.h"
-memcached_return memcached_response(memcached_st *ptr,
+memcached_return memcached_response(memcached_server_st *ptr,
char *buffer, size_t buffer_length,
- memcached_result_st *result,
- unsigned int server_key)
+ memcached_result_st *result)
{
unsigned int x;
size_t send_length;
send_length= 0;
/* We may have old commands in the buffer not set, first purge */
- if (ptr->flags & MEM_NO_BLOCK)
- (void)memcached_io_write(ptr, server_key, NULL, 0, 1);
+ if (ptr->root->flags & MEM_NO_BLOCK)
+ (void)memcached_io_write(ptr, NULL, 0, 1);
- max_messages= memcached_server_response_count(ptr, server_key);
+ max_messages= memcached_server_response_count(ptr);
for (x= 0; x < max_messages; x++)
{
size_t total_length= 0;
{
unsigned int read_length;
- read_length= memcached_io_read(ptr, server_key,
- buffer_ptr, 1);
+ read_length= memcached_io_read(ptr, buffer_ptr, 1);
if (read_length != 1)
return MEMCACHED_UNKNOWN_READ_FAILURE;
buffer_ptr++;
*buffer_ptr= 0;
- memcached_server_response_decrement(ptr, server_key);
+ memcached_server_response_decrement(ptr);
}
switch(buffer[0])
memcached_return rc;
/* We add back in one because we will need to search for END */
- memcached_server_response_increment(ptr, server_key);
+ memcached_server_response_increment(ptr);
if (result)
- rc= value_fetch(ptr, buffer, result, server_key);
+ rc= value_fetch(ptr, buffer, result);
else
- rc= value_fetch(ptr, buffer, &ptr->result, server_key);
+ rc= value_fetch(ptr, buffer, &ptr->root->result);
return rc;
}
{
if (buffer[2] == 'A') /* STORED STATS */
{
- memcached_server_response_increment(ptr, server_key);
+ memcached_server_response_increment(ptr);
return MEMCACHED_STAT;
}
else if (buffer[1] == 'E')
if (send_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
return MEMCACHED_WRITE_FAILURE;
- rc= memcached_do(ptr, server_key, buffer, send_length, 1);
+ rc= memcached_do(&ptr->hosts[server_key], buffer, send_length, 1);
if (rc != MEMCACHED_SUCCESS)
goto error;
while (1)
{
- rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL, server_key);
+ rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
if (rc == MEMCACHED_STAT)
{
goto error;
}
- rc= memcached_do(ptr, server_key, buffer, write_length, 0);
+ rc= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
if (rc != MEMCACHED_SUCCESS)
goto error;
- if ((sent_length= memcached_io_write(ptr, server_key, value, value_length, 0)) == -1)
+ if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
{
rc= MEMCACHED_WRITE_FAILURE;
goto error;
else
to_write= 1;
- if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2, to_write)) == -1)
+ if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
{
rc= MEMCACHED_WRITE_FAILURE;
goto error;
if (to_write == 0)
return MEMCACHED_BUFFERED;
- rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL, server_key);
+ rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
if (rc == MEMCACHED_STORED)
return MEMCACHED_SUCCESS;
return rc;
error:
- memcached_io_reset(ptr, server_key);
+ memcached_io_reset(&ptr->hosts[server_key]);
return rc;
}
{
memcached_return rrc;
- rrc= memcached_do(ptr, x, buffer, send_length, 1);
+ rrc= memcached_do(&ptr->hosts[x], buffer, send_length, 1);
if (rrc != MEMCACHED_SUCCESS)
{
rc= MEMCACHED_SOME_ERRORS;
continue;
}
- rrc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL, x);
+ rrc= memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
if (rrc != MEMCACHED_SUCCESS)
rc= MEMCACHED_SOME_ERRORS;
}
{
memcached_return rrc;
- rrc= memcached_do(ptr, x, command, send_length, 1);
+ rrc= memcached_do(&ptr->hosts[x], command, send_length, 1);
if (rrc != MEMCACHED_SUCCESS)
{
rc= MEMCACHED_SOME_ERRORS;
continue;
}
- rrc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL, x);
+ rrc= memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
if (rrc != MEMCACHED_SUCCESS)
rc= MEMCACHED_SOME_ERRORS;