MEMCACHED_PARTIAL_READ,
MEMCACHED_SOME_ERRORS,
MEMCACHED_NO_SERVERS,
+ MEMCACHED_END,
+ MEMCACHED_DELETED,
+ MEMCACHED_VALUE,
MEMCACHED_MAXIMUM_RETURN, /* Always add new error code before */
} memcached_return;
memcached_return memcached_server_add(memcached_st *ptr, char *hostname,
unsigned int port);
+memcached_server_st *memcached_server_list_append(memcached_server_st *ptr,
+ char *hostname, unsigned int port,
+ memcached_return *error);
+void memcached_server_list_free(memcached_server_st *ptr);
+memcached_return memcached_server_push(memcached_st *ptr, memcached_server_st *list);
+unsigned int memcached_server_list_count(memcached_server_st *ptr);
+
/* These are all private, do not use. */
memcached_return memcached_connect(memcached_st *ptr);
memcached_return memcached_response(memcached_st *ptr,
/* Some personal debugging functions */
#define WATCHPOINT printf("WATCHPOINT %s:%d\n", __FILE__, __LINE__);fflush(stdout);
#define WATCHPOINT_ERROR(A) printf("WATCHPOINT %s:%d %s\n", __FILE__, __LINE__, memcached_strerror(NULL, A));fflush(stdout);
+#define WATCHPOINT_STRING(A) printf("WATCHPOINT %s:%d %s\n", __FILE__, __LINE__, A);fflush(stdout);
+#define WATCHPOINT_NUMBER(A) printf("WATCHPOINT %s:%d %d\n", __FILE__, __LINE__, A);fflush(stdout);
#ifdef __cplusplus
memcached_quit.c \
memcached_flush.c \
memcached_string.c \
+ memcached_hosts.c \
memcached_stats.c
libmemcached_la_LIBADD =
void memcached_deinit(memcached_st *ptr)
{
- unsigned int x;
-
if (ptr->hosts)
{
- for (x= 0; x < ptr->number_of_hosts; x++)
- {
- if (ptr->hosts[x].fd > 0)
- close(ptr->hosts[x].fd);
-
- free(ptr->hosts[x].hostname);
- }
-
- free(ptr->hosts);
+ memcached_server_list_free(ptr->hosts);
+ ptr->hosts= NULL;
}
if (ptr->is_allocated == MEMCACHED_ALLOCATED)
#include "common.h"
-memcached_return memcached_server_add(memcached_st *ptr, char *hostname, unsigned int port)
-{
- memcached_server_st *new_host_list;
- char *new_hostname;
- LIBMEMCACHED_MEMCACHED_SERVER_ADD_START();
-
- if (!port)
- port= MEMCACHED_DEFAULT_PORT;
-
- if (!hostname)
- hostname= "localhost";
-
-
- new_host_list= (memcached_server_st *)realloc(ptr->hosts, sizeof(memcached_server_st) * (ptr->number_of_hosts+1));
- if (!new_host_list)
- return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
- memset(&new_host_list[ptr->number_of_hosts], 0, sizeof(memcached_server_st));
-
- if (!new_host_list)
- return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
-
- ptr->hosts= new_host_list;
-
- new_hostname=
- (char *)malloc(sizeof(char) * (strlen(hostname)+1));
- if (!new_hostname)
- return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
-
- memset(new_hostname, 0, strlen(hostname)+1);
- memcpy(new_hostname, hostname, strlen(hostname));
- ptr->hosts[ptr->number_of_hosts].hostname= new_hostname;
- ptr->hosts[ptr->number_of_hosts].port= port;
- ptr->hosts[ptr->number_of_hosts].fd= -1;
- ptr->number_of_hosts++;
-
- LIBMEMCACHED_MEMCACHED_SERVER_ADD_END();
-
- return MEMCACHED_SUCCESS;
-}
-
memcached_return memcached_connect(memcached_st *ptr)
{
unsigned int x;
return MEMCACHED_SUCCESS;
if (!ptr->hosts)
- {
return MEMCACHED_NO_SERVERS;
- }
for (x= 0; x < ptr->number_of_hosts; x++)
{
rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
LIBMEMCACHED_MEMCACHED_DELETE_END();
+ if (rc == MEMCACHED_DELETED)
+ rc= MEMCACHED_SUCCESS;
+
return rc;
}
return value;
}
}
+ else if (*error == MEMCACHED_END)
+ *error= MEMCACHED_NOTFOUND;
return NULL;
read_error:
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
unsigned int server_key;
char *value;
- memcached_return rc;
LIBMEMCACHED_MEMCACHED_GET_START();
*value_length= 0;
*error= memcached_connect(ptr);
if (*error != MEMCACHED_SUCCESS)
- return NULL;
+ goto error;
server_key= memcached_generate_hash(key, key_length) % ptr->number_of_hosts;
send_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, "get %.*s\r\n",
(int)key_length, key);
- if (*error != MEMCACHED_SUCCESS)
- return NULL;
-
if ((send(ptr->hosts[server_key].fd, buffer, send_length, 0) == -1))
{
*error= MEMCACHED_WRITE_FAILURE;
- return NULL;
+ goto error;
}
value= memcached_value_fetch(ptr, key, &key_length, value_length, flags,
error, 0, server_key);
- /* We need to read END */
- rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
- if (rc != MEMCACHED_NOTFOUND)
+ if (*error == MEMCACHED_END && *value_length == 0)
+ {
+ *error= MEMCACHED_NOTFOUND;
+ goto error;
+ }
+ else if (*error == MEMCACHED_SUCCESS)
{
- free(value);
- *value_length= 0;
- *error= MEMCACHED_PROTOCOL_ERROR;
+ memcached_return rc;
+ /* We need to read END */
+ rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
+
+ if (rc != MEMCACHED_END)
+ {
+ *error= MEMCACHED_PROTOCOL_ERROR;
+ goto error;
+ }
}
+ else
+ goto error;
+
LIBMEMCACHED_MEMCACHED_GET_END();
return value;
+
+error:
+ free(value);
+ *value_length= 0;
+
+ LIBMEMCACHED_MEMCACHED_GET_END();
+
+ return NULL;
}
memcached_return memcached_mget(memcached_st *ptr,
return MEMCACHED_UNKNOWN_READ_FAILURE;
}
case 'D': /* DELETED */
- return MEMCACHED_SUCCESS;
+ return MEMCACHED_DELETED;
case 'N': /* NOT_FOUND */
{
if (buffer[4] == 'F')
case 'E': /* PROTOCOL ERROR or END */
{
if (buffer[1] == 'N')
- return MEMCACHED_NOTFOUND;
+ return MEMCACHED_END;
else if (buffer[1] == 'R')
return MEMCACHED_PROTOCOL_ERROR;
else
return "SOME ERRORS WERE REPORTED";
case MEMCACHED_NO_SERVERS:
return "NO SERVERS DEFINED";
+ case MEMCACHED_END:
+ return "SERVER END";
+ case MEMCACHED_DELETED:
+ return "SERVER DELETE";
+ case MEMCACHED_VALUE:
+ return "SERVER VALUE";
case MEMCACHED_MAXIMUM_RETURN:
return "Gibberish returned!";
default:
#include <fcntl.h>
#include <sys/time.h>
#include <getopt.h>
+#include <pthread.h>
#include <memcached.h>
#include "utilities.h"
#include "generator.h"
+/* Global Thread counter */
+unsigned int thread_counter;
+pthread_mutex_t counter_mutex;
+pthread_cond_t count_threshhold;
+unsigned int master_wakeup;
+pthread_mutex_t sleeper_mutex;
+pthread_cond_t sleep_threshhold;
+
+void *run_task(void *p);
/* Types */
typedef struct conclusions_st conclusions_st;
+typedef struct thread_context_st thread_context_st;
+typedef enum {
+ AC_SET,
+ AC_GET,
+} run_action;
+
+struct thread_context_st {
+ unsigned int x;
+ pairs_st *pairs;
+ run_action action;
+};
struct conclusions_st {
long int load_time;
/* Prototypes */
void options_parse(int argc, char *argv[]);
void conclusions_print(conclusions_st *conclusion);
+void scheduler(conclusions_st *conclusion);
static int opt_verbose= 0;
static unsigned int opt_default_pairs= 100;
+static unsigned int opt_concurrency= 1;
static int opt_displayflag= 0;
static char *opt_servers= NULL;
unsigned int x;
memcached_return rc;
memcached_st *memc;
- struct timeval start_time, end_time;
pairs_st *pairs;
conclusions_st conclusion;
pairs= pairs_generate(opt_default_pairs);
+ pthread_mutex_init(&counter_mutex, NULL);
+ pthread_cond_init(&count_threshhold, NULL);
+ pthread_mutex_init(&sleeper_mutex, NULL);
+ pthread_cond_init(&sleep_threshhold, NULL);
- gettimeofday(&start_time, NULL);
- for (x= 0; x < opt_default_pairs; x++)
- {
- rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
- pairs[x].value, pairs[x].value_length,
- 0, 0);
- if (rc != MEMCACHED_SUCCESS)
- fprintf(stderr, "Failured on insert of %.*s\n",
- (unsigned int)pairs[x].key_length, pairs[x].key);
- conclusion.rows_loaded++;
- }
- gettimeofday(&end_time, NULL);
- conclusion.load_time= timedif(end_time, start_time);
-
- gettimeofday(&start_time, NULL);
- for (x= 0; x < opt_default_pairs; x++)
- {
- char *value;
- size_t value_length;
- uint16_t flags;
-
- value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
- &value_length,
- &flags, &rc);
-
- if (rc != MEMCACHED_SUCCESS)
- fprintf(stderr, "Failured on read of %.*s\n",
- (unsigned int)pairs[x].key_length, pairs[x].key);
- conclusion.rows_read++;
- free(value);
- }
- gettimeofday(&end_time, NULL);
- conclusion.read_time= timedif(end_time, start_time);
+ scheduler(&conclusion);
pairs_free(pairs);
memcached_deinit(memc);
+ (void)pthread_mutex_init(&counter_mutex, NULL);
+ (void)pthread_cond_init(&count_threshhold, NULL);
+ (void)pthread_mutex_init(&sleeper_mutex, NULL);
+ (void)pthread_cond_init(&sleep_threshhold, NULL);
conclusions_print(&conclusion);
return 0;
}
+void scheduler(conclusions_st *conclusion)
+{
+ unsigned int x;
+ struct timeval start_time, end_time;
+ pthread_t mainthread; /* Thread descriptor */
+ pthread_attr_t attr; /* Thread attributes */
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr,
+ PTHREAD_CREATE_DETACHED);
+
+ pthread_mutex_lock(&counter_mutex);
+ thread_counter= 0;
+
+ pthread_mutex_lock(&sleeper_mutex);
+ master_wakeup= 1;
+ pthread_mutex_unlock(&sleeper_mutex);
+
+ for (x= 0; x < opt_concurrency; x++)
+ {
+ thread_context_st *context;
+ context= (thread_context_st *)malloc(sizeof(thread_context_st));
+
+ /* now you create the thread */
+ if (pthread_create(&mainthread, &attr, run_task,
+ (void *)context) != 0)
+ {
+ fprintf(stderr,"Could not create thread\n");
+ exit(1);
+ }
+ thread_counter++;
+ }
+
+ pthread_mutex_unlock(&counter_mutex);
+ pthread_attr_destroy(&attr);
+
+ pthread_mutex_lock(&sleeper_mutex);
+ master_wakeup= 0;
+ pthread_mutex_unlock(&sleeper_mutex);
+ pthread_cond_broadcast(&sleep_threshhold);
+
+ gettimeofday(&start_time, NULL);
+ /*
+ We loop until we know that all children have cleaned up.
+ */
+ pthread_mutex_lock(&counter_mutex);
+ while (thread_counter)
+ {
+ struct timespec abstime;
+
+ memset(&abstime, 0, sizeof(struct timespec));
+ abstime.tv_sec= 1;
+
+ pthread_cond_timedwait(&count_threshhold, &counter_mutex, &abstime);
+ }
+ pthread_mutex_unlock(&counter_mutex);
+
+ gettimeofday(&end_time, NULL);
+
+ conclusion->load_time= timedif(end_time, start_time);
+ conclusion->read_time= timedif(end_time, start_time);
+}
+
void options_parse(int argc, char *argv[])
{
static struct option long_options[]=
printf("\tTook %ld.%03ld seconds to read data\n", conclusion->read_time / 1000,
conclusion->read_time % 1000);
}
+
+void *run_task(void *p)
+{
+ unsigned int x;
+ thread_context_st *context= (thread_context_st *)p;
+
+ pthread_mutex_lock(&sleeper_mutex);
+ while (master_wakeup)
+ {
+ pthread_cond_wait(&sleep_threshhold, &sleeper_mutex);
+ }
+ pthread_mutex_unlock(&sleeper_mutex);
+
+ /* Do Stuff */
+
+ switch (context->action)
+ {
+ case AC_SET:
+ for (x= 0; x < opt_default_pairs; x++)
+ {
+ rc= memcached_set(memc, pairs[x].key, pairs[x].key_length,
+ pairs[x].value, pairs[x].value_length,
+ 0, 0);
+ if (rc != MEMCACHED_SUCCESS)
+ fprintf(stderr, "Failured on insert of %.*s\n",
+ (unsigned int)pairs[x].key_length, pairs[x].key);
+ conclusion->rows_loaded++;
+ }
+ break;
+ case AC_GET:
+ for (x= 0; x < opt_default_pairs; x++)
+ {
+ char *value;
+ size_t value_length;
+ uint16_t flags;
+
+ value= memcached_get(memc, pairs[x].key, pairs[x].key_length,
+ &value_length,
+ &flags, &rc);
+
+ if (rc != MEMCACHED_SUCCESS)
+ fprintf(stderr, "Failured on read of %.*s\n",
+ (unsigned int)pairs[x].key_length, pairs[x].key);
+ conclusion->rows_read++;
+ free(value);
+ }
+ break;
+ }
+
+ pthread_mutex_lock(&counter_mutex);
+ thread_counter--;
+ pthread_cond_signal(&count_threshhold);
+ pthread_mutex_unlock(&counter_mutex);
+
+ free(context);
+}
Error 17 -> PARTIAL READ
Error 18 -> SOME ERRORS WERE REPORTED
Error 19 -> NO SERVERS DEFINED
+Error 20 -> SERVER END
+Error 21 -> SERVER DELETE
+Error 22 -> SERVER VALUE
Found key pid
Found key uptime
Found key time
memcached_deinit(memc);
}
+void add_host_test(void)
+{
+ unsigned int x;
+ memcached_st *memc;
+ memcached_server_st *servers;
+ memcached_return rc;
+ char servername[]= "0.example.com";
+
+ memc= memcached_init(NULL);
+ assert(memc);
+ rc= memcached_server_add(memc, "localhost", 0);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ servers= memcached_server_list_append(NULL, servername, 400, &rc);
+ assert(servers);
+ assert(1 == memcached_server_list_count(servers));
+
+ for (x= 2; x < 20; x++)
+ {
+ char buffer[SMALL_STRING_LEN];
+
+ snprintf(buffer, SMALL_STRING_LEN, "%u.example.com", 400+x);
+ servers= memcached_server_list_append(servers, buffer, 401,
+ &rc);
+ assert(rc == MEMCACHED_SUCCESS);
+ assert(x == memcached_server_list_count(servers));
+ }
+
+ rc= memcached_server_push(memc, servers);
+ assert(rc == MEMCACHED_SUCCESS);
+ rc= memcached_server_push(memc, servers);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ memcached_server_list_free(servers);
+ memcached_deinit(memc);
+}
+
+void add_host_test1(void)
+{
+ unsigned int x;
+ memcached_st *memc;
+ memcached_server_st *servers;
+ memcached_return rc;
+ char servername[]= "0.example.com";
+
+ memc= memcached_init(NULL);
+ assert(memc);
+
+ servers= memcached_server_list_append(NULL, servername, 400, &rc);
+ assert(servers);
+ assert(1 == memcached_server_list_count(servers));
+
+ for (x= 2; x < 20; x++)
+ {
+ char buffer[SMALL_STRING_LEN];
+
+ snprintf(buffer, SMALL_STRING_LEN, "%u.example.com", 400+x);
+ servers= memcached_server_list_append(servers, buffer, 401,
+ &rc);
+ assert(rc == MEMCACHED_SUCCESS);
+ assert(x == memcached_server_list_count(servers));
+ }
+
+ rc= memcached_server_push(memc, servers);
+ assert(rc == MEMCACHED_SUCCESS);
+ rc= memcached_server_push(memc, servers);
+ assert(rc == MEMCACHED_SUCCESS);
+
+ memcached_server_list_free(servers);
+ memcached_deinit(memc);
+}
+
int main(int argc, char *argv[])
{
quit_test();
mget_test();
get_stats();
+ add_host_test();
/* The multiple tests */
if (argc == 2)