From 613d2abe710af39d6daf34673462cd467ed6499a Mon Sep 17 00:00:00 2001 From: Xiaoyun Mao Date: Mon, 18 Jan 2010 19:38:53 -0800 Subject: [PATCH] 1.Fix bug that memslap can't start up(ms_thread=NULL). 2.ignore signal SIGPIPE. 3.support reconnection with thousands of connections. 4.enhance binary protocol to support UDP. 5.synchronize all the threads at the beginning. 6.merge with latest trunk. --- clients/memslap.c | 27 +++++++++++---- clients/ms_conn.c | 79 ++++++++++---------------------------------- clients/ms_conn.h | 6 ++-- clients/ms_memslap.h | 1 + clients/ms_setting.c | 36 +++++--------------- clients/ms_sigsegv.c | 29 ++-------------- clients/ms_task.c | 40 +++++++++++----------- clients/ms_thread.c | 8 ++++- 8 files changed, 78 insertions(+), 148 deletions(-) diff --git a/clients/memslap.c b/clients/memslap.c index 4b342e76..006d3c2b 100644 --- a/clients/memslap.c +++ b/clients/memslap.c @@ -126,6 +126,10 @@ static void ms_sync_lock_init() pthread_mutex_init(&ms_global.init_lock.lock, NULL); pthread_cond_init(&ms_global.init_lock.cond, NULL); + ms_global.warmup_lock.count = 0; + pthread_mutex_init(&ms_global.warmup_lock.lock, NULL); + pthread_cond_init(&ms_global.warmup_lock.cond, NULL); + ms_global.run_lock.count= 0; pthread_mutex_init(&ms_global.run_lock.lock, NULL); pthread_cond_init(&ms_global.run_lock.cond, NULL); @@ -141,6 +145,9 @@ static void ms_sync_lock_destroy() pthread_mutex_destroy(&ms_global.init_lock.lock); pthread_cond_destroy(&ms_global.init_lock.cond); + pthread_mutex_destroy(&ms_global.warmup_lock.lock); + pthread_cond_destroy(&ms_global.warmup_lock.cond); + pthread_mutex_destroy(&ms_global.run_lock.lock); pthread_cond_destroy(&ms_global.run_lock.cond); @@ -771,19 +778,25 @@ static void ms_monitor_slap_mode() int second= 0; struct timeval start_time, end_time; + /* Wait all the threads complete initialization. */ + pthread_mutex_lock(&ms_global.init_lock.lock); + while (ms_global.init_lock.count < ms_setting.nthreads) + { + pthread_cond_wait(&ms_global.init_lock.cond, + &ms_global.init_lock.lock); + } + pthread_mutex_unlock(&ms_global.init_lock.lock); + /* only when there is no set operation it need warm up */ if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR) { /* Wait all the connects complete warm up. */ - pthread_mutex_lock(&ms_global.init_lock.lock); - while (ms_global.init_lock.count < (int)ms_setting.nconns) - { - pthread_cond_wait(&ms_global.init_lock.cond, - &ms_global.init_lock.lock); + pthread_mutex_lock(&ms_global.warmup_lock.lock); + while (ms_global.warmup_lock.count < (int)ms_setting.nconns) { + pthread_cond_wait(&ms_global.warmup_lock.cond, &ms_global.warmup_lock.lock); } - pthread_mutex_unlock(&ms_global.init_lock.lock); + pthread_mutex_unlock(&ms_global.warmup_lock.lock); } - ms_global.finish_warmup= true; /* running in "run time" mode, user specify run time */ diff --git a/clients/ms_conn.c b/clients/ms_conn.c index b488401c..32578b3e 100644 --- a/clients/ms_conn.c +++ b/clients/ms_conn.c @@ -292,7 +292,7 @@ static int ms_conn_init(ms_conn_t *c, /* for replication, each connection need connect all the server */ if (ms_setting.rep_write_srv > 0) { - c->total_sfds= ms_setting.srv_cnt; + c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn; } else { @@ -361,10 +361,6 @@ static int ms_conn_init(ms_conn_t *c, { c->protocol= binary_prot; } - else if (is_udp) - { - c->protocol= ascii_udp_prot; - } else { c->protocol= ascii_prot; @@ -490,7 +486,7 @@ static int ms_conn_sock_init(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { /* for replication, each connection need connect all the server */ - srv_idx= i; + srv_idx= i % ms_setting.srv_cnt; } else { @@ -571,30 +567,15 @@ static int ms_conn_sock_init(ms_conn_t *c) static int ms_conn_event_init(ms_conn_t *c) { ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); - /* default event timeout 10 seconds */ - struct timeval t= - { - .tv_sec= EVENT_TIMEOUT, .tv_usec= 0 - }; short event_flags= EV_WRITE | EV_PERSIST; event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c); event_base_set(ms_thread->base, &c->event); c->ev_flags= event_flags; - if (c->total_sfds == 1) + if (event_add(&c->event, NULL) == -1) { - if (event_add(&c->event, NULL) == -1) - { - return -1; - } - } - else - { - if (event_add(&c->event, &t) == -1) - { - return -1; - } + return -1; } return 0; @@ -918,8 +899,8 @@ static int ms_reconn(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { - srv_idx= c->cur_idx; - srv_conn_cnt= (int)ms_setting.nconns; + srv_idx= c->cur_idx % ms_setting.srv_cnt; + srv_conn_cnt= (int)(ms_setting.sock_per_conn * ms_setting.nconns); } else { @@ -982,13 +963,13 @@ static int ms_reconn(ms_conn_t *c) break; } - if (c->total_sfds == 1) + if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0) { /* wait a second and reconnect */ sleep(1); } } - while (c->total_sfds == 1); + while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0); } if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0)) @@ -1046,8 +1027,8 @@ int ms_reconn_socks(ms_conn_t *c) if (ms_setting.rep_write_srv > 0) { - srv_idx= i; - srv_conn_cnt= (int)ms_setting.nconns; + srv_idx= i % ms_setting.srv_cnt; + srv_conn_cnt= (int)(ms_setting.sock_per_conn * ms_setting.nconns); } else { @@ -1307,6 +1288,9 @@ void ms_reset_conn(ms_conn_t *c, bool timeout) c->ctnwrite= false; c->rbytes= 0; c->rcurr= c->rbuf; + c->msgcurr = 0; + c->msgused = 0; + c->iovused = 0; ms_conn_set_state(c, conn_write); memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */ @@ -1864,7 +1848,7 @@ static void ms_ascii_complete_nread(ms_conn_t *c) { assert(c != NULL); assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot); + assert(c->protocol == ascii_prot); if (c->rvbytes > 2) { assert( @@ -2004,8 +1988,7 @@ static void ms_complete_nread(ms_conn_t *c) { assert(c != NULL); assert(c->rbytes >= c->rvbytes); - assert(c->protocol == ascii_udp_prot - || c->protocol == ascii_prot + assert(c->protocol == ascii_prot || c->protocol == binary_prot); if (c->protocol == binary_prot) @@ -2419,12 +2402,6 @@ static void ms_conn_set_state(ms_conn_t *c, int state) */ static bool ms_update_event(ms_conn_t *c, const int new_flags) { - /* default event timeout 10 seconds */ - struct timeval t= - { - .tv_sec= EVENT_TIMEOUT, .tv_usec= 0 - }; - assert(c != NULL); struct event_base *base= c->event.ev_base; @@ -2451,19 +2428,9 @@ static bool ms_update_event(ms_conn_t *c, const int new_flags) event_base_set(base, &c->event); c->ev_flags= (short)new_flags; - if (c->total_sfds == 1) - { - if (event_add(&c->event, NULL) == -1) - { - return false; - } - } - else + if (event_add(&c->event, NULL) == -1) { - if (event_add(&c->event, &t) == -1) - { - return false; - } + return false; } return true; @@ -2740,12 +2707,6 @@ void ms_event_handler(const int fd, const short which, void *arg) } assert(fd == c->sfd); - /* event timeout, close the current connection */ - if (c->which == EV_TIMEOUT) - { - ms_conn_set_state(c, conn_closing); - } - ms_drive_machine(c); /* wait for next event */ @@ -3048,15 +3009,11 @@ static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item) * @param c, pointer of the concurrency * @param item, pointer of task item which includes the object * information - * @param verify, whether do verification * * @return int, if success, return 0, else return -1 */ -int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify) +int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item) { - /* verify not supported yet */ - UNUSED_ARGUMENT(verify); - assert(c != NULL); c->currcmd.cmd= CMD_GET; diff --git a/clients/ms_conn.h b/clients/ms_conn.h index d95191ec..a52a55c9 100644 --- a/clients/ms_conn.h +++ b/clients/ms_conn.h @@ -30,8 +30,7 @@ extern "C" { #define UDP_MAX_SEND_PAYLOAD_SIZE 1400 /* mtu size is 1500 */ #define UDP_HEADER_SIZE 8 /* UDP header size */ #define MAX_SENDBUF_SIZE (256 * 1024 * 1024) /* Maximum socket buffer size */ -#define SOCK_WAIT_TIMEOUT 10 /* maximum waiting time of UDP, 10s */ -#define EVENT_TIMEOUT 10 /* maximum waiting time of event,10s */ +#define SOCK_WAIT_TIMEOUT 30 /* maximum waiting time of UDP, 30s */ #define MAX_UDP_PACKET (1 << 16) /* maximum UDP packets, 65536 */ /* Initial size of the sendmsg() scatter/gather array. */ @@ -104,7 +103,6 @@ typedef struct udppkt enum protocol { ascii_prot = 3, /* ASCII protocol */ - ascii_udp_prot, /* ASCII UDP protocol*/ binary_prot, /* binary protocol */ }; @@ -229,7 +227,7 @@ int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item); /* used to send the get command to server */ -int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify); +int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item); /* used to send the multi-get command to server */ diff --git a/clients/ms_memslap.h b/clients/ms_memslap.h index 87caca5d..540fed0e 100644 --- a/clients/ms_memslap.h +++ b/clients/ms_memslap.h @@ -102,6 +102,7 @@ typedef struct global { /* synchronize lock */ ms_sync_lock_t init_lock; + ms_sync_lock_t warmup_lock; ms_sync_lock_t run_lock; /* mutex for outputing error log synchronously when memslap crashes */ diff --git a/clients/ms_setting.c b/clients/ms_setting.c index 95af535c..eb5e4055 100644 --- a/clients/ms_setting.c +++ b/clients/ms_setting.c @@ -342,7 +342,6 @@ static void ms_parse_cfg_file(char *cfg_file) FILE *f; size_t start_len, end_len; double proportion; - size_t frequence; char *line= NULL; size_t read_len; ssize_t nread; @@ -435,8 +434,8 @@ static void ms_parse_cfg_file(char *cfg_file) if (nread != EOF) { - if (sscanf(line, "%zu %zu %lf %zu", &start_len, &end_len, - &proportion, &frequence) != 3) + if (sscanf(line, "%zu %zu %lf", &start_len, &end_len, + &proportion) != 3) { conf_type= ms_get_conf_type(line); break; @@ -479,11 +478,15 @@ static void ms_parse_cfg_file(char *cfg_file) if (nread != EOF) { - if (sscanf(line, "%d %lf\n", &cmd_type, &proportion) != 2) + if (sscanf(line, "%d %lf", &cmd_type, &proportion) != 2) { conf_type= ms_get_conf_type(line); break; } + if (cmd_type >= CMD_NULL) + { + continue; + } ms_setting.cmd_distr[ms_setting.cmd_used_count].cmd_type= cmd_type; ms_setting.cmd_distr[ms_setting.cmd_used_count].cmd_prop= @@ -920,7 +923,7 @@ void ms_setting_init_pre() static void ms_setting_slapmode_init_post() { ms_setting.total_key_rng_cnt= KEY_RANGE_COUNT_INIT; - ms_setting.key_distr= + ms_setting.key_distr= (ms_key_distr_t *)malloc((size_t)ms_setting.total_key_rng_cnt * sizeof(ms_key_distr_t)); if (ms_setting.key_distr == NULL) @@ -931,7 +934,7 @@ static void ms_setting_slapmode_init_post() ms_setting.total_val_rng_cnt= VALUE_RANGE_COUNT_INIT; - ms_setting.value_distr= + ms_setting.value_distr= (ms_value_distr_t *)malloc((size_t)ms_setting.total_val_rng_cnt * sizeof( ms_value_distr_t)); if (ms_setting.value_distr == NULL) @@ -980,13 +983,6 @@ static void ms_setting_slapmode_init_post() exit(1); } - if ((ms_setting.udp - || ms_setting.facebook_test) && ms_setting.binary_prot) - { - fprintf(stderr, "Binary protocol doesn't support UDP now.\n"); - exit(1); - } - if ((ms_setting.rep_write_srv > 0) && (ms_setting.srv_cnt < 2)) { fprintf(stderr, "Please specify 2 servers at least for replication\n"); @@ -1007,26 +1003,12 @@ static void ms_setting_slapmode_init_post() exit(1); } - if ((ms_setting.rep_write_srv > 0) && (ms_setting.sock_per_conn > 1)) - { - fprintf(stderr, "Replication doesn't support multi-socks " - "in one connection structure.\n"); - exit(1); - } - if (ms_setting.facebook_test && (ms_setting.rep_write_srv > 0)) { fprintf(stderr, "facebook test couldn't work with replication.\n"); exit(1); } - if (ms_setting.reconnect && (ms_setting.sock_per_conn > 1)) - { - fprintf(stderr, "Reconnection doesn't support multi-socks " - "in one connection structure.\n"); - exit(1); - } - ms_build_distr(); /* initialize global character block */ diff --git a/clients/ms_sigsegv.c b/clients/ms_sigsegv.c index 33029b42..60e1f59c 100644 --- a/clients/ms_sigsegv.c +++ b/clients/ms_sigsegv.c @@ -44,23 +44,6 @@ static void ms_signal_segv(int signum, siginfo_t *info, void *ptr) abort(); } -/* signal pipe reaches, this function will run */ -static void ms_signal_pipe(int signum, siginfo_t *info, void *ptr) -{ - UNUSED_ARGUMENT(signum); - UNUSED_ARGUMENT(info); - UNUSED_ARGUMENT(ptr); - - pthread_mutex_lock(&ms_global.quit_mutex); - fprintf(stderr, "\tMemslap encountered a server error. Quitting...\n"); - fprintf(stderr, "\tError info: SIGPIPE captured (from write?)\n"); - fprintf(stderr, - "\tProbably a socket I/O error when the server is down.\n"); - pthread_mutex_unlock(&ms_global.quit_mutex); - exit(1); -} /* ms_signal_pipe */ - - /* signal int reaches, this function will run */ static void ms_signal_int(int signum, siginfo_t *info, void *ptr) { @@ -104,16 +87,8 @@ int ms_setup_sigsegv(void) */ int ms_setup_sigpipe(void) { - struct sigaction action_2; - - memset(&action_2, 0, sizeof(action_2)); - action_2.sa_sigaction= ms_signal_pipe; - action_2.sa_flags= SA_SIGINFO; - if (sigaction(SIGPIPE, &action_2, NULL) < 0) - { - perror("sigaction"); - return 0; - } + /* ignore the SIGPIPE signal */ + signal(SIGPIPE, SIG_IGN); return -1; } /* ms_setup_sigpipe */ diff --git a/clients/ms_task.c b/clients/ms_task.c index 5fbd7d13..631c35ea 100644 --- a/clients/ms_task.c +++ b/clients/ms_task.c @@ -40,7 +40,7 @@ static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c); static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c); static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c); -static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c); +static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c); /* select next operation to do */ @@ -54,7 +54,7 @@ static void ms_kick_out_item(ms_task_item_t *item); /* miss rate adjustment */ -static bool ms_need_overwirte_item(ms_task_t *task); +static bool ms_need_overwrite_item(ms_task_t *task); static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task); @@ -154,18 +154,10 @@ static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c) * @return ms_task_item_t*, the pointer of the previous item of * set operation */ -static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c) +static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c) { - if (c->set_cursor <= 0) - { - return &c->item_win[0]; - } - else - { - return &c->item_win[(int64_t)-- c->set_cursor % c->win_size]; - } -} /* ms_get_pre_set_item */ - + return ms_get_next_get_item(c); +} /* ms_get_random_overwrite_item */ /** * According to the proportion of operations(get or set), select @@ -301,7 +293,7 @@ static void ms_kick_out_item(ms_task_item_t *item) * @return bool, if need overwrite, return true, else return * false */ -static bool ms_need_overwirte_item(ms_task_t *task) +static bool ms_need_overwrite_item(ms_task_t *task) { ms_task_item_t *item= task->item; @@ -355,7 +347,7 @@ static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task) /* If the current item is not a new item, kick it out */ if (item->value_offset != INVALID_OFFSET) { - if (ms_need_overwirte_item(task)) + if (ms_need_overwrite_item(task)) { /* overwrite */ task->overwrite_set++; @@ -369,17 +361,23 @@ static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task) else /* it's a new item */ { /* need overwrite */ - if (ms_need_overwirte_item(task)) + if (ms_need_overwrite_item(task)) { - item= ms_get_pre_set_item(c); + /** + * overwrite not use the item with current set cursor, revert + * set cursor. + */ + c->set_cursor--; + + item= ms_get_random_overwrite_item(c); if (item->value_offset != INVALID_OFFSET) { task->item= item; task->overwrite_set++; } - else /* previous set item is a new item */ + else /* item is a new item */ { - /* select the previous item to run, and cancel overwrite */ + /* select the item to run, and cancel overwrite */ task->item= item; } } @@ -601,7 +599,7 @@ static void ms_warmup_server(ms_conn_t *c) */ if (c->remain_warmup_num == -1) { - ms_send_signal(&ms_global.init_lock); + ms_send_signal(&ms_global.warmup_lock); c->remain_warmup_num--; /* never run the if branch */ } } /* ms_warmup_server */ @@ -629,7 +627,7 @@ static void ms_single_getset_task_sch(ms_conn_t *c) else if (task->cmd == CMD_GET) { assert(task->cmd == CMD_GET); - ms_mcd_get(c, item, task->verify); + ms_mcd_get(c, item); } } } /* ms_single_getset_task_sch */ diff --git a/clients/ms_thread.c b/clients/ms_thread.c index dbfc62a7..60f8ef79 100644 --- a/clients/ms_thread.c +++ b/clients/ms_thread.c @@ -244,7 +244,7 @@ static int ms_setup_thread(ms_thread_ctx_t *thread_ctx) */ static void *ms_worker_libevent(void *arg) { - ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key); + ms_thread_t *ms_thread= NULL; ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg; /** @@ -264,6 +264,12 @@ static void *ms_worker_libevent(void *arg) /* each thread with a timer */ ms_clock_handler(0, 0, 0); + pthread_mutex_lock(&ms_global.init_lock.lock); + ms_global.init_lock.count++; + pthread_cond_signal(&ms_global.init_lock.cond); + pthread_mutex_unlock(&ms_global.init_lock.lock); + + ms_thread= pthread_getspecific(ms_thread_key); event_base_loop(ms_thread->base, 0); return NULL; -- 2.30.2