on concurrent insert with non-blocking IO.
A bug was also uncovered where the read bufffers for multiple hosts were not
being read (all code has been refactored for that now).
One user contributed test case has been added.
MEMCACHED_DELETED,
MEMCACHED_VALUE,
MEMCACHED_STAT,
+ MEMCACHED_ERRNO,
MEMCACHED_MAXIMUM_RETURN, /* Always add new error code before */
} memcached_return;
char *hostname;
unsigned int port;
int fd;
+ unsigned int stack_responses;
};
struct memcached_stat_st {
size_t write_buffer_offset;
char connected;
int my_errno;
- unsigned int stack_responses;
unsigned long long flags;
memcached_return warning; /* Future Use */
};
#define memcached_server_name(A,B) B.hostname
#define memcached_server_port(A,B) B.port
#define memcached_server_list(A) A->hosts
+#define memcached_server_response_increment(A,B) A->hosts[B].stack_responses++
+#define memcached_server_response_decrement(A,B) A->hosts[B].stack_responses--
+#define memcached_server_response_count(A,B) A->hosts[B].stack_responses
memcached_return memcached_server_add(memcached_st *ptr, char *hostname,
unsigned int port);
char *buffer, size_t buffer_length,
unsigned int server_key);
unsigned int memcached_generate_hash(memcached_st *ptr, char *key, size_t key_length);
+void memcached_quit_server(memcached_st *ptr, unsigned int server_key);
#endif /* __COMMON_H__ */
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
+#include <netdb.h>
memcached_return memcached_real_connect(memcached_st *ptr, unsigned int server_key)
{
if (ptr->hosts[server_key].fd == -1)
{
if ((h= gethostbyname(ptr->hosts[server_key].hostname)) == NULL)
+ {
+ ptr->my_errno= h_errno;
return MEMCACHED_HOST_LOCKUP_FAILURE;
+ }
servAddr.sin_family= h->h_addrtype;
memcpy((char *) &servAddr.sin_addr.s_addr, h->h_addr_list[0], h->h_length);
{
switch (errno) {
/* We are spinning waiting on connect */
+ case EALREADY:
case EINPROGRESS:
case EINTR:
goto test_connect;
break;
default:
ptr->my_errno= errno;
- return MEMCACHED_HOST_LOCKUP_FAILURE;
+ return MEMCACHED_ERRNO;
}
ptr->connected++;
}
}
if (with_flush)
- memcached_io_flush(ptr, server_key);
+ {
+ if (memcached_io_flush(ptr, server_key) == -1)
+ return -1;
+ }
return length;
}
ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
{
size_t sent_length;
+ char *write_ptr= ptr->write_buffer;
+ size_t write_length= ptr->write_buffer_offset;
+ unsigned int loop= 1;
if (ptr->write_buffer_offset == 0)
return 0;
- if (ptr->flags & MEM_NO_BLOCK)
+ while (write_length)
{
- struct timeval local_tv;
- fd_set set;
+ if (ptr->flags & MEM_NO_BLOCK)
+ {
- local_tv.tv_sec= 0;
- local_tv.tv_usec= 300;
+ while (1)
+ {
+ struct timeval local_tv;
+ fd_set set;
+ int select_return;
- FD_ZERO(&set);
- FD_SET(ptr->hosts[server_key].fd, &set);
+ local_tv.tv_sec= 0;
+ local_tv.tv_usec= 300 * loop;
- select(1, NULL, &set, NULL, &local_tv);
- }
- if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer,
- ptr->write_buffer_offset, 0)) == -1)
- {
- return -1;
- }
+ FD_ZERO(&set);
+ FD_SET(ptr->hosts[server_key].fd, &set);
+
+ select_return= select(1, NULL, &set, NULL, &local_tv);
+
+ if (select_return == -1)
+ {
+ ptr->my_errno= errno;
+ return -1;
+ }
+ else if (!select_return)
+ break;
+ }
+ }
- assert(sent_length == ptr->write_buffer_offset);
+ sent_length= 0;
+ if ((sent_length= send(ptr->hosts[server_key].fd, write_ptr,
+ write_length, 0)) == -1)
+ {
+ switch (errno)
+ {
+ case ENOBUFS:
+ case EAGAIN:
+ if (loop < 10)
+ {
+ loop++;
+ break;
+ }
+ /* Yes, we want to fall through */
+ default:
+ ptr->my_errno= errno;
+ return -1;
+ }
+ }
+ else
+ {
+ write_ptr+= sent_length;
+ write_length-= sent_length;
+ }
+ }
ptr->write_buffer_offset= 0;
The reason we send "quit" is that in case we have buffered IO, this
will force data to be completed.
*/
+
+void memcached_quit_server(memcached_st *ptr, unsigned int server_key)
+{
+ if (ptr->hosts[server_key].fd != -1)
+ {
+ if (ptr->flags & MEM_NO_BLOCK)
+ memcached_io_write(ptr, server_key, "quit\r\n", 6, 1);
+ close(ptr->hosts[server_key].fd);
+ ptr->hosts[server_key].fd= -1;
+ ptr->hosts[server_key].stack_responses= 0;
+ }
+
+ ptr->connected--;
+}
+
void memcached_quit(memcached_st *ptr)
{
unsigned int x;
if (ptr->hosts)
{
for (x= 0; x < ptr->number_of_hosts; x++)
- {
- if (ptr->hosts[x].fd != -1)
- {
- if (ptr->flags & MEM_NO_BLOCK)
- memcached_io_write(ptr, x, "quit\r\n", 6, 1);
- close(ptr->hosts[x].fd);
- ptr->hosts[x].fd= -1;
- }
- }
+ memcached_quit_server(ptr, x);
}
ptr->connected= 0;
unsigned int x;
size_t send_length;
char *buffer_ptr;
+ unsigned int max_messages;
+
memset(buffer, 0, buffer_length);
send_length= 0;
- for (x= 0; x <= ptr->stack_responses; x++)
+ max_messages= memcached_server_response_count(ptr, server_key);
+ for (x= 0; x <= max_messages; x++)
{
buffer_ptr= buffer;
+
while (1)
{
unsigned int read_length;
else
buffer_ptr++;
}
+
+ if (memcached_server_response_count(ptr, server_key))
+ memcached_server_response_decrement(ptr, server_key);
}
- ptr->stack_responses= 0;
switch(buffer[0])
{
memset(buffer, 0, MEMCACHED_DEFAULT_COMMAND_SIZE);
- /* Leaveing this assert in since only a library fubar could blow this */
+ /* Leaving this assert in since only a library fubar could blow this */
+ if (ptr->write_buffer_offset != 0)
+ {
+ WATCHPOINT_NUMBER(ptr->write_buffer_offset);
+ }
+
assert(ptr->write_buffer_offset == 0);
server_key= memcached_generate_hash(ptr, key, key_length);
if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2, 1)) == -1)
{
+ memcached_quit_server(ptr, server_key);
rc= MEMCACHED_WRITE_FAILURE;
goto error;
}
if ((ptr->flags & MEM_NO_BLOCK) && verb == SET_OP)
{
rc= MEMCACHED_SUCCESS;
- ptr->stack_responses++;
+ memcached_server_response_increment(ptr, server_key);
}
else
{
return "SERVER VALUE";
case MEMCACHED_STAT:
return "STAT VALUE";
+ case MEMCACHED_ERRNO:
+ return "UNKOWN ERROR SEE MY_ERRNO";
case MEMCACHED_MAXIMUM_RETURN:
return "Gibberish returned!";
default:
assert(rc == MEMCACHED_SUCCESS);
for (ptr= list; *ptr; ptr++)
printf("Found key %s\n", *ptr);
+ fflush(stdout);
free(list);
}
assert(value == 0);
}
+/* Test case provided by Cal Haldenbrand */
+void user_supplied_bug1(memcached_st *memc)
+{
+ unsigned int setter= 1;
+ unsigned int x;
+
+ long total= 0;
+ int size= 0;
+ srand(time(NULL));
+ char key[10];
+ char *randomstuff = (char *)malloc(6 * 1024);
+ memset(randomstuff, 0, 6 * 1024);
+
+ memcached_return rc;
+
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NO_BLOCK, &setter);
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_TCP_NODELAY, &setter);
+
+
+ /* add key */
+ for (x= 0 ; total < 20 * 1024576 ; x++ )
+ {
+ unsigned int j= 0;
+
+ size= (rand() % ( 5 * 1024 ) ) + 400;
+ memset(randomstuff, 0, 6 * 1024);
+ assert(size < 6 * 1024); /* Being safe here */
+
+ for (j= 0 ; j < size ;j++)
+ randomstuff[j] = (char) (rand() % 26) + 97;
+
+ total += size;
+ sprintf(key, "%d", x);
+ rc = memcached_set(memc, key, strlen(key),
+ randomstuff, strlen(randomstuff), 10, 0);
+ /* If we fail, lets try again */
+ if (rc != MEMCACHED_SUCCESS)
+ rc = memcached_set(memc, key, strlen(key),
+ randomstuff, strlen(randomstuff), 10, 0);
+ assert(rc == MEMCACHED_SUCCESS);
+ }
+}
void add_host_test1(memcached_st *memc)
{
unsigned int x;
server_list= "localhost";
printf("servers %s\n", server_list);
+ srandom(time(NULL));
servers= memcached_servers_parse(server_list);
assert(servers);
{0, 0, 0}
};
+ test_st user_tests[] ={
+ {"user_supplied_bug1", 0, user_supplied_bug1 },
+ {0, 0, 0}
+ };
+
fprintf(stderr, "\nBlock tests\n\n");
for (x= 0; tests[x].function_name; x++)
{
memcached_free(memc);
}
+ fprintf(stderr, "\nUser Supplied tests\n\n");
+ for (x= 0; user_tests[x].function_name; x++)
+ {
+ memcached_st *memc;
+ memcached_return rc;
+ memc= memcached_create(NULL);
+ assert(memc);
+
+ rc= memcached_server_push(memc, servers);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ fprintf(stderr, "Testing %s", user_tests[x].function_name);
+ user_tests[x].function(memc);
+ fprintf(stderr, "\t\t\t\t\t[ ok ]\n");
+ assert(memc);
+ memcached_free(memc);
+ }
+
/* Clean up whatever we might have left */
{
memcached_st *memc;
flush_test(memc);
memcached_free(memc);
}
+
+ fprintf(stderr, "All tests completed successfully\n\n");
+
return 0;
}