pthread_mutex_init(&ms_global.init_lock.lock, NULL);
pthread_cond_init(&ms_global.init_lock.cond, NULL);
+ ms_global.warmup_lock.count = 0;
+ pthread_mutex_init(&ms_global.warmup_lock.lock, NULL);
+ pthread_cond_init(&ms_global.warmup_lock.cond, NULL);
+
ms_global.run_lock.count= 0;
pthread_mutex_init(&ms_global.run_lock.lock, NULL);
pthread_cond_init(&ms_global.run_lock.cond, NULL);
pthread_mutex_destroy(&ms_global.init_lock.lock);
pthread_cond_destroy(&ms_global.init_lock.cond);
+ pthread_mutex_destroy(&ms_global.warmup_lock.lock);
+ pthread_cond_destroy(&ms_global.warmup_lock.cond);
+
pthread_mutex_destroy(&ms_global.run_lock.lock);
pthread_cond_destroy(&ms_global.run_lock.cond);
int second= 0;
struct timeval start_time, end_time;
+ /* Wait all the threads complete initialization. */
+ pthread_mutex_lock(&ms_global.init_lock.lock);
+ while (ms_global.init_lock.count < ms_setting.nthreads)
+ {
+ pthread_cond_wait(&ms_global.init_lock.cond,
+ &ms_global.init_lock.lock);
+ }
+ pthread_mutex_unlock(&ms_global.init_lock.lock);
+
/* only when there is no set operation it need warm up */
if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
{
/* Wait all the connects complete warm up. */
- pthread_mutex_lock(&ms_global.init_lock.lock);
- while (ms_global.init_lock.count < (int)ms_setting.nconns)
- {
- pthread_cond_wait(&ms_global.init_lock.cond,
- &ms_global.init_lock.lock);
+ pthread_mutex_lock(&ms_global.warmup_lock.lock);
+ while (ms_global.warmup_lock.count < (int)ms_setting.nconns) {
+ pthread_cond_wait(&ms_global.warmup_lock.cond, &ms_global.warmup_lock.lock);
}
- pthread_mutex_unlock(&ms_global.init_lock.lock);
+ pthread_mutex_unlock(&ms_global.warmup_lock.lock);
}
-
ms_global.finish_warmup= true;
/* running in "run time" mode, user specify run time */
/* for replication, each connection need connect all the server */
if (ms_setting.rep_write_srv > 0)
{
- c->total_sfds= ms_setting.srv_cnt;
+ c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
}
else
{
{
c->protocol= binary_prot;
}
- else if (is_udp)
- {
- c->protocol= ascii_udp_prot;
- }
else
{
c->protocol= ascii_prot;
if (ms_setting.rep_write_srv > 0)
{
/* for replication, each connection need connect all the server */
- srv_idx= i;
+ srv_idx= i % ms_setting.srv_cnt;
}
else
{
static int ms_conn_event_init(ms_conn_t *c)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
- /* default event timeout 10 seconds */
- struct timeval t=
- {
- .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
- };
short event_flags= EV_WRITE | EV_PERSIST;
event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
event_base_set(ms_thread->base, &c->event);
c->ev_flags= event_flags;
- if (c->total_sfds == 1)
+ if (event_add(&c->event, NULL) == -1)
{
- if (event_add(&c->event, NULL) == -1)
- {
- return -1;
- }
- }
- else
- {
- if (event_add(&c->event, &t) == -1)
- {
- return -1;
- }
+ return -1;
}
return 0;
if (ms_setting.rep_write_srv > 0)
{
- srv_idx= c->cur_idx;
- srv_conn_cnt= (int)ms_setting.nconns;
+ srv_idx= c->cur_idx % ms_setting.srv_cnt;
+ srv_conn_cnt= (int)(ms_setting.sock_per_conn * ms_setting.nconns);
}
else
{
break;
}
- if (c->total_sfds == 1)
+ if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
{
/* wait a second and reconnect */
sleep(1);
}
}
- while (c->total_sfds == 1);
+ while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
}
if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
if (ms_setting.rep_write_srv > 0)
{
- srv_idx= i;
- srv_conn_cnt= (int)ms_setting.nconns;
+ srv_idx= i % ms_setting.srv_cnt;
+ srv_conn_cnt= (int)(ms_setting.sock_per_conn * ms_setting.nconns);
}
else
{
c->ctnwrite= false;
c->rbytes= 0;
c->rcurr= c->rbuf;
+ c->msgcurr = 0;
+ c->msgused = 0;
+ c->iovused = 0;
ms_conn_set_state(c, conn_write);
memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
{
assert(c != NULL);
assert(c->rbytes >= c->rvbytes);
- assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
+ assert(c->protocol == ascii_prot);
if (c->rvbytes > 2)
{
assert(
{
assert(c != NULL);
assert(c->rbytes >= c->rvbytes);
- assert(c->protocol == ascii_udp_prot
- || c->protocol == ascii_prot
+ assert(c->protocol == ascii_prot
|| c->protocol == binary_prot);
if (c->protocol == binary_prot)
*/
static bool ms_update_event(ms_conn_t *c, const int new_flags)
{
- /* default event timeout 10 seconds */
- struct timeval t=
- {
- .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
- };
-
assert(c != NULL);
struct event_base *base= c->event.ev_base;
event_base_set(base, &c->event);
c->ev_flags= (short)new_flags;
- if (c->total_sfds == 1)
- {
- if (event_add(&c->event, NULL) == -1)
- {
- return false;
- }
- }
- else
+ if (event_add(&c->event, NULL) == -1)
{
- if (event_add(&c->event, &t) == -1)
- {
- return false;
- }
+ return false;
}
return true;
}
assert(fd == c->sfd);
- /* event timeout, close the current connection */
- if (c->which == EV_TIMEOUT)
- {
- ms_conn_set_state(c, conn_closing);
- }
-
ms_drive_machine(c);
/* wait for next event */
* @param c, pointer of the concurrency
* @param item, pointer of task item which includes the object
* information
- * @param verify, whether do verification
*
* @return int, if success, return 0, else return -1
*/
-int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
{
- /* verify not supported yet */
- UNUSED_ARGUMENT(verify);
-
assert(c != NULL);
c->currcmd.cmd= CMD_GET;
#define UDP_MAX_SEND_PAYLOAD_SIZE 1400 /* mtu size is 1500 */
#define UDP_HEADER_SIZE 8 /* UDP header size */
#define MAX_SENDBUF_SIZE (256 * 1024 * 1024) /* Maximum socket buffer size */
-#define SOCK_WAIT_TIMEOUT 10 /* maximum waiting time of UDP, 10s */
-#define EVENT_TIMEOUT 10 /* maximum waiting time of event,10s */
+#define SOCK_WAIT_TIMEOUT 30 /* maximum waiting time of UDP, 30s */
#define MAX_UDP_PACKET (1 << 16) /* maximum UDP packets, 65536 */
/* Initial size of the sendmsg() scatter/gather array. */
enum protocol
{
ascii_prot = 3, /* ASCII protocol */
- ascii_udp_prot, /* ASCII UDP protocol*/
binary_prot, /* binary protocol */
};
/* used to send the get command to server */
-int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify);
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item);
/* used to send the multi-get command to server */
{
/* synchronize lock */
ms_sync_lock_t init_lock;
+ ms_sync_lock_t warmup_lock;
ms_sync_lock_t run_lock;
/* mutex for outputing error log synchronously when memslap crashes */
FILE *f;
size_t start_len, end_len;
double proportion;
- size_t frequence;
char *line= NULL;
size_t read_len;
ssize_t nread;
if (nread != EOF)
{
- if (sscanf(line, "%zu %zu %lf %zu", &start_len, &end_len,
- &proportion, &frequence) != 3)
+ if (sscanf(line, "%zu %zu %lf", &start_len, &end_len,
+ &proportion) != 3)
{
conf_type= ms_get_conf_type(line);
break;
if (nread != EOF)
{
- if (sscanf(line, "%d %lf\n", &cmd_type, &proportion) != 2)
+ if (sscanf(line, "%d %lf", &cmd_type, &proportion) != 2)
{
conf_type= ms_get_conf_type(line);
break;
}
+ if (cmd_type >= CMD_NULL)
+ {
+ continue;
+ }
ms_setting.cmd_distr[ms_setting.cmd_used_count].cmd_type=
cmd_type;
ms_setting.cmd_distr[ms_setting.cmd_used_count].cmd_prop=
static void ms_setting_slapmode_init_post()
{
ms_setting.total_key_rng_cnt= KEY_RANGE_COUNT_INIT;
- ms_setting.key_distr=
+ ms_setting.key_distr=
(ms_key_distr_t *)malloc((size_t)ms_setting.total_key_rng_cnt * sizeof(ms_key_distr_t));
if (ms_setting.key_distr == NULL)
ms_setting.total_val_rng_cnt= VALUE_RANGE_COUNT_INIT;
- ms_setting.value_distr=
+ ms_setting.value_distr=
(ms_value_distr_t *)malloc((size_t)ms_setting.total_val_rng_cnt * sizeof( ms_value_distr_t));
if (ms_setting.value_distr == NULL)
exit(1);
}
- if ((ms_setting.udp
- || ms_setting.facebook_test) && ms_setting.binary_prot)
- {
- fprintf(stderr, "Binary protocol doesn't support UDP now.\n");
- exit(1);
- }
-
if ((ms_setting.rep_write_srv > 0) && (ms_setting.srv_cnt < 2))
{
fprintf(stderr, "Please specify 2 servers at least for replication\n");
exit(1);
}
- if ((ms_setting.rep_write_srv > 0) && (ms_setting.sock_per_conn > 1))
- {
- fprintf(stderr, "Replication doesn't support multi-socks "
- "in one connection structure.\n");
- exit(1);
- }
-
if (ms_setting.facebook_test && (ms_setting.rep_write_srv > 0))
{
fprintf(stderr, "facebook test couldn't work with replication.\n");
exit(1);
}
- if (ms_setting.reconnect && (ms_setting.sock_per_conn > 1))
- {
- fprintf(stderr, "Reconnection doesn't support multi-socks "
- "in one connection structure.\n");
- exit(1);
- }
-
ms_build_distr();
/* initialize global character block */
abort();
}
-/* signal pipe reaches, this function will run */
-static void ms_signal_pipe(int signum, siginfo_t *info, void *ptr)
-{
- UNUSED_ARGUMENT(signum);
- UNUSED_ARGUMENT(info);
- UNUSED_ARGUMENT(ptr);
-
- pthread_mutex_lock(&ms_global.quit_mutex);
- fprintf(stderr, "\tMemslap encountered a server error. Quitting...\n");
- fprintf(stderr, "\tError info: SIGPIPE captured (from write?)\n");
- fprintf(stderr,
- "\tProbably a socket I/O error when the server is down.\n");
- pthread_mutex_unlock(&ms_global.quit_mutex);
- exit(1);
-} /* ms_signal_pipe */
-
-
/* signal int reaches, this function will run */
static void ms_signal_int(int signum, siginfo_t *info, void *ptr)
{
*/
int ms_setup_sigpipe(void)
{
- struct sigaction action_2;
-
- memset(&action_2, 0, sizeof(action_2));
- action_2.sa_sigaction= ms_signal_pipe;
- action_2.sa_flags= SA_SIGINFO;
- if (sigaction(SIGPIPE, &action_2, NULL) < 0)
- {
- perror("sigaction");
- return 0;
- }
+ /* ignore the SIGPIPE signal */
+ signal(SIGPIPE, SIG_IGN);
return -1;
} /* ms_setup_sigpipe */
static ms_task_item_t *ms_get_cur_opt_item(ms_conn_t *c);
static ms_task_item_t *ms_get_next_get_item(ms_conn_t *c);
static ms_task_item_t *ms_get_next_set_item(ms_conn_t *c);
-static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c);
+static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c);
/* select next operation to do */
/* miss rate adjustment */
-static bool ms_need_overwirte_item(ms_task_t *task);
+static bool ms_need_overwrite_item(ms_task_t *task);
static bool ms_adjust_opt(ms_conn_t *c, ms_task_t *task);
* @return ms_task_item_t*, the pointer of the previous item of
* set operation
*/
-static ms_task_item_t *ms_get_pre_set_item(ms_conn_t *c)
+static ms_task_item_t *ms_get_random_overwrite_item(ms_conn_t *c)
{
- if (c->set_cursor <= 0)
- {
- return &c->item_win[0];
- }
- else
- {
- return &c->item_win[(int64_t)-- c->set_cursor % c->win_size];
- }
-} /* ms_get_pre_set_item */
-
+ return ms_get_next_get_item(c);
+} /* ms_get_random_overwrite_item */
/**
* According to the proportion of operations(get or set), select
* @return bool, if need overwrite, return true, else return
* false
*/
-static bool ms_need_overwirte_item(ms_task_t *task)
+static bool ms_need_overwrite_item(ms_task_t *task)
{
ms_task_item_t *item= task->item;
/* If the current item is not a new item, kick it out */
if (item->value_offset != INVALID_OFFSET)
{
- if (ms_need_overwirte_item(task))
+ if (ms_need_overwrite_item(task))
{
/* overwrite */
task->overwrite_set++;
else /* it's a new item */
{
/* need overwrite */
- if (ms_need_overwirte_item(task))
+ if (ms_need_overwrite_item(task))
{
- item= ms_get_pre_set_item(c);
+ /**
+ * overwrite not use the item with current set cursor, revert
+ * set cursor.
+ */
+ c->set_cursor--;
+
+ item= ms_get_random_overwrite_item(c);
if (item->value_offset != INVALID_OFFSET)
{
task->item= item;
task->overwrite_set++;
}
- else /* previous set item is a new item */
+ else /* item is a new item */
{
- /* select the previous item to run, and cancel overwrite */
+ /* select the item to run, and cancel overwrite */
task->item= item;
}
}
*/
if (c->remain_warmup_num == -1)
{
- ms_send_signal(&ms_global.init_lock);
+ ms_send_signal(&ms_global.warmup_lock);
c->remain_warmup_num--; /* never run the if branch */
}
} /* ms_warmup_server */
else if (task->cmd == CMD_GET)
{
assert(task->cmd == CMD_GET);
- ms_mcd_get(c, item, task->verify);
+ ms_mcd_get(c, item);
}
}
} /* ms_single_getset_task_sch */
*/
static void *ms_worker_libevent(void *arg)
{
- ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
+ ms_thread_t *ms_thread= NULL;
ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
/**
/* each thread with a timer */
ms_clock_handler(0, 0, 0);
+ pthread_mutex_lock(&ms_global.init_lock.lock);
+ ms_global.init_lock.count++;
+ pthread_cond_signal(&ms_global.init_lock.cond);
+ pthread_mutex_unlock(&ms_global.init_lock.lock);
+
+ ms_thread= pthread_getspecific(ms_thread_key);
event_base_loop(ms_thread->base, 0);
return NULL;