static void ms_conn_shrink(ms_conn_t *c);
static void ms_conn_set_state(ms_conn_t *c, int state);
static bool ms_update_event(ms_conn_t *c, const int new_flags);
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
-static int ms_get_next_sock_index(ms_conn_t *c);
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
+static uint32_t ms_get_next_sock_index(ms_conn_t *c);
static int ms_update_conn_sock_event(ms_conn_t *c);
static bool ms_need_yield(ms_conn_t *c);
static void ms_update_start_time(ms_conn_t *c);
/* for replication, each connection need connect all the server */
if (ms_setting.rep_write_srv > 0)
{
- c->total_sfds= ms_setting.srv_cnt;
+ c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
}
else
{
{
c->protocol= binary_prot;
}
- else if (is_udp)
- {
- c->protocol= ascii_udp_prot;
- }
else
{
c->protocol= ascii_prot;
static int ms_conn_sock_init(ms_conn_t *c)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
- int i;
+ uint32_t i;
int ret_sfd;
- int srv_idx= 0;
+ uint32_t srv_idx= 0;
assert(c != NULL);
assert(c->tcpsfd != NULL);
if (ms_setting.rep_write_srv > 0)
{
/* for replication, each connection need connect all the server */
- srv_idx= i;
+ srv_idx= i % ms_setting.srv_cnt;
}
else
{
}
else
{
- for (int j= 0; j < i; j++)
+ for (uint32_t j= 0; j < i; j++)
{
close(c->tcpsfd[j]);
}
static int ms_conn_event_init(ms_conn_t *c)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
- /* default event timeout 10 seconds */
- struct timeval t=
- {
- .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
- };
short event_flags= EV_WRITE | EV_PERSIST;
event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
event_base_set(ms_thread->base, &c->event);
c->ev_flags= event_flags;
- if (c->total_sfds == 1)
+ if (event_add(&c->event, NULL) == -1)
{
- if (event_add(&c->event, NULL) == -1)
- {
- return -1;
- }
- }
- else
- {
- if (event_add(&c->event, &t) == -1)
- {
- return -1;
- }
+ return -1;
}
return 0;
/* delete the event, the socket and the connection */
event_del(&c->event);
- for (int i= 0; i < c->total_sfds; i++)
+ for (uint32_t i= 0; i < c->total_sfds; i++)
{
if (c->tcpsfd[i] > 0)
{
* that otherwise mess things up.
*/
memset(&hints, 0, sizeof(hints));
+#ifdef AI_ADDRCONFIG
hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
+#else
+ hints.ai_flags= AI_PASSIVE;
+#endif /* AI_ADDRCONFIG */
if (is_udp)
{
hints.ai_protocol= IPPROTO_UDP;
static int ms_reconn(ms_conn_t *c)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
- int srv_idx= 0;
- int32_t srv_conn_cnt= 0;
+ uint32_t srv_idx= 0;
+ uint32_t srv_conn_cnt= 0;
if (ms_setting.rep_write_srv > 0)
{
- srv_idx= c->cur_idx;
- srv_conn_cnt= (int)ms_setting.nconns;
+ srv_idx= c->cur_idx % ms_setting.srv_cnt;
+ srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
}
else
{
srv_idx= ms_thread->thread_ctx->srv_idx;
- srv_conn_cnt= (int32_t)((int)ms_setting.nconns / ms_setting.srv_cnt);
+ srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
}
/* close the old socket handler */
c->tcpsfd[c->cur_idx]= 0;
if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
- % (uint32_t)srv_conn_cnt == 0)
+ % srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
fprintf(stderr, "Server %s:%d disconnect\n",
if (ms_setting.rep_write_srv > 0)
{
- int i= 0;
+ uint32_t i= 0;
+
for (i= 0; i < c->total_sfds; i++)
{
if (c->tcpsfd[i] != 0)
break;
}
- if (c->total_sfds == 1)
+ if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
{
/* wait a second and reconnect */
sleep(1);
}
}
- while (c->total_sfds == 1);
+ while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
}
if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
int ms_reconn_socks(ms_conn_t *c)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
- int srv_idx= 0;
+ uint32_t srv_idx= 0;
int ret_sfd= 0;
- int srv_conn_cnt= 0;
+ uint32_t srv_conn_cnt= 0;
struct timeval cur_time;
assert(c != NULL);
return 0;
}
- for (int i= 0; i < c->total_sfds; i++)
+ for (uint32_t i= 0; i < c->total_sfds; i++)
{
if (c->tcpsfd[i] == 0)
{
if (ms_setting.rep_write_srv > 0)
{
- srv_idx= i;
- srv_conn_cnt= (int)ms_setting.nconns;
+ srv_idx= i % ms_setting.srv_cnt;
+ srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
}
else
{
srv_idx= ms_thread->thread_ctx->srv_idx;
- srv_conn_cnt= (int)ms_setting.nconns / ms_setting.srv_cnt;
+ srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
}
if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
c->ctnwrite= false;
c->rbytes= 0;
c->rcurr= c->rbuf;
+ c->msgcurr = 0;
+ c->msgused = 0;
+ c->iovused = 0;
ms_conn_set_state(c, conn_write);
memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
{
assert(c != NULL);
assert(c->rbytes >= c->rvbytes);
- assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
+ assert(c->protocol == ascii_prot);
if (c->rvbytes > 2)
{
assert(
{
assert(c != NULL);
assert(c->rbytes >= c->rvbytes);
- assert(c->protocol == ascii_udp_prot
- || c->protocol == ascii_prot
+ assert(c->protocol == ascii_prot
|| c->protocol == binary_prot);
if (c->protocol == binary_prot)
*/
static bool ms_update_event(ms_conn_t *c, const int new_flags)
{
- /* default event timeout 10 seconds */
- struct timeval t=
- {
- .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
- };
-
assert(c != NULL);
struct event_base *base= c->event.ev_base;
event_base_set(base, &c->event);
c->ev_flags= (short)new_flags;
- if (c->total_sfds == 1)
- {
- if (event_add(&c->event, NULL) == -1)
- {
- return false;
- }
- }
- else
+ if (event_add(&c->event, NULL) == -1)
{
- if (event_add(&c->event, &t) == -1)
- {
- return false;
- }
+ return false;
}
return true;
}
assert(fd == c->sfd);
- /* event timeout, close the current connection */
- if (c->which == EV_TIMEOUT)
- {
- ms_conn_set_state(c, conn_closing);
- }
-
ms_drive_machine(c);
/* wait for next event */
*
* @return int, if success, return the index, else return 0
*/
-static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
+static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
{
- int sock_index= -1;
- int i= 0;
+ uint32_t sock_index= 0;
+ uint32_t i= 0;
if (c->total_sfds == 1)
{
if (i == ms_setting.rep_write_srv)
{
/* random get one replication server to read */
- sock_index= (int)(random() % c->total_sfds);
+ sock_index= (uint32_t)random() % c->total_sfds;
}
else
{
/* random get one replication writing server to write */
- sock_index= (int)(random() % ms_setting.rep_write_srv);
+ sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
}
}
else if (cmd == CMD_GET)
{
/* random get one replication server to read */
- sock_index= (int)(random() % c->total_sfds);
+ sock_index= (uint32_t)random() % c->total_sfds;
}
}
while (c->tcpsfd[sock_index] == 0);
*
* @return int, return the index
*/
-static int ms_get_next_sock_index(ms_conn_t *c)
+static uint32_t ms_get_next_sock_index(ms_conn_t *c)
{
- int sock_index= 0;
+ uint32_t sock_index= 0;
do
{
int write_len;
char *buffer= c->wbuf;
- write_len= sprintf(buffer,
- " %u %d %d\r\n",
- 0,
- item->exp_time,
- item->value_size);
+ write_len= snprintf(buffer,
+ c->wsize,
+ " %u %d %d\r\n",
+ 0,
+ item->exp_time,
+ item->value_size);
- if (write_len > c->wsize)
+ if (write_len > c->wsize || write_len < 0)
{
/* ought to be always enough. just fail for simplicity */
fprintf(stderr, "output command line too long.\n");
* @param c, pointer of the concurrency
* @param item, pointer of task item which includes the object
* information
- * @param verify, whether do verification
*
* @return int, if success, return 0, else return -1
*/
-int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
+int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
{
- /* verify not supported yet */
- UNUSED_ARGUMENT(verify);
-
assert(c != NULL);
c->currcmd.cmd= CMD_GET;