Casting fixes for linux.
[awesomized/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <stdio.h>
15 #include <inttypes.h>
16 #include <limits.h>
17 #include <sys/uio.h>
18 #include <event.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <arpa/inet.h>
22 #include "ms_setting.h"
23 #include "ms_thread.h"
24 #include "ms_atomic.h"
25
/* Result codes of the network write path. */
#define TRANSMIT_COMPLETE 0
#define TRANSMIT_INCOMPLETE 1
#define TRANSMIT_SOFT_ERROR 2
#define TRANSMIT_HARD_ERROR 3

/*
 * Key prefix generation. ms_get_key_prefix() ORs KEY_PREFIX_MASK into every
 * prefix it hands out, so every byte has bit 0x10 set and therefore can never
 * be ' ' (0x20), '\r' (0x0d), '\n' (0x0a) or '\0'. UINT64_C gives the
 * constants the same type as the uint64_t values they are combined with.
 */
#define KEY_PREFIX_BASE UINT64_C(0x1010101010101010)
#define KEY_PREFIX_MASK UINT64_C(0x1010101010101010)

/* Token indexes of the key and of the value length in a server "VALUE" line */
#define KEY_TOKEN 1
#define VALUELEN_TOKEN 3

/* global increasing counter, keeps every key prefix unique */
static uint64_t key_prefix_seq= KEY_PREFIX_BASE;

/* global increasing counter, used to generate request ids for UDP */
static volatile uint32_t udp_request_id= 0;
45
46 extern pthread_key_t ms_thread_key;
47
48 /* generate upd request id */
49 static uint32_t ms_get_udp_request_id(void);
50
51
52 /* connect initialize */
53 static void ms_task_init(ms_conn_t *c);
54 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
55 static int ms_conn_sock_init(ms_conn_t *c);
56 static int ms_conn_event_init(ms_conn_t *c);
57 static int ms_conn_init(ms_conn_t *c,
58 const int init_state,
59 const int read_buffer_size,
60 const bool is_udp);
61 static void ms_warmup_num_init(ms_conn_t *c);
62 static int ms_item_win_init(ms_conn_t *c);
63
64
65 /* connection close */
66 void ms_conn_free(ms_conn_t *c);
67 static void ms_conn_close(ms_conn_t *c);
68
69
70 /* create network connection */
71 static int ms_new_socket(struct addrinfo *ai);
72 static void ms_maximize_sndbuf(const int sfd);
73 static int ms_network_connect(ms_conn_t *c,
74 char *srv_host_name,
75 const int srv_port,
76 const bool is_udp,
77 int *ret_sfd);
78 static int ms_reconn(ms_conn_t *c);
79
80
81 /* read and parse */
82 static int ms_tokenize_command(char *command,
83 token_t *tokens,
84 const int max_tokens);
85 static int ms_ascii_process_line(ms_conn_t *c, char *command);
86 static int ms_try_read_line(ms_conn_t *c);
87 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
88 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
89 static int ms_try_read_network(ms_conn_t *c);
90 static void ms_verify_value(ms_conn_t *c,
91 ms_mlget_task_item_t *mlget_item,
92 char *value,
93 int vlen);
94 static void ms_ascii_complete_nread(ms_conn_t *c);
95 static void ms_bin_complete_nread(ms_conn_t *c);
96 static void ms_complete_nread(ms_conn_t *c);
97
98
99 /* send functions */
100 static int ms_add_msghdr(ms_conn_t *c);
101 static int ms_ensure_iov_space(ms_conn_t *c);
102 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
103 static int ms_build_udp_headers(ms_conn_t *c);
104 static int ms_transmit(ms_conn_t *c);
105
106
107 /* status adjustment */
108 static void ms_conn_shrink(ms_conn_t *c);
109 static void ms_conn_set_state(ms_conn_t *c, int state);
110 static bool ms_update_event(ms_conn_t *c, const int new_flags);
111 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
112 static int ms_get_next_sock_index(ms_conn_t *c);
113 static int ms_update_conn_sock_event(ms_conn_t *c);
114 static bool ms_need_yield(ms_conn_t *c);
115 static void ms_update_start_time(ms_conn_t *c);
116
117
118 /* main loop */
119 static void ms_drive_machine(ms_conn_t *c);
120 void ms_event_handler(const int fd, const short which, void *arg);
121
122
123 /* ascii protocol */
124 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
125 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
126 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
127
128
129 /* binary protocol */
130 static int ms_bin_process_response(ms_conn_t *c);
131 static void ms_add_bin_header(ms_conn_t *c,
132 uint8_t opcode,
133 uint8_t hdr_len,
134 uint16_t key_len,
135 uint32_t body_len);
136 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
137 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
138 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
139 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
140
141
142 /**
143 * each key has two parts, prefix and suffix. The suffix is a
144 * string random get form the character table. The prefix is a
145 * uint64_t variable. And the prefix must be unique. we use the
146 * prefix to identify a key. And the prefix can't include
147 * character ' ' '\r' '\n' '\0'.
148 *
149 * @return uint64_t
150 */
151 uint64_t ms_get_key_prefix(void)
152 {
153 uint64_t key_prefix;
154
155 pthread_mutex_lock(&ms_global.seq_mutex);
156 key_prefix_seq|= KEY_PREFIX_MASK;
157 key_prefix= key_prefix_seq;
158 key_prefix_seq++;
159 pthread_mutex_unlock(&ms_global.seq_mutex);
160
161 return key_prefix;
162 } /* ms_get_key_prefix */
163
164
165 /**
166 * get an unique udp request id
167 *
168 * @return an unique UDP request id
169 */
170 static uint32_t ms_get_udp_request_id(void)
171 {
172 return atomic_add_32_nv(&udp_request_id, 1);
173 }
174
175
176 /**
177 * initialize current task structure
178 *
179 * @param c, pointer of the concurrency
180 */
181 static void ms_task_init(ms_conn_t *c)
182 {
183 c->curr_task.cmd= CMD_NULL;
184 c->curr_task.item= 0;
185 c->curr_task.verify= false;
186 c->curr_task.finish_verify= true;
187 c->curr_task.get_miss= true;
188
189 c->curr_task.get_opt= 0;
190 c->curr_task.set_opt= 0;
191 c->curr_task.cycle_undo_get= 0;
192 c->curr_task.cycle_undo_set= 0;
193 c->curr_task.verified_get= 0;
194 c->curr_task.overwrite_set= 0;
195 } /* ms_task_init */
196
197
198 /**
199 * initialize udp for the connection structure
200 *
201 * @param c, pointer of the concurrency
202 * @param is_udp, whether it's udp
203 *
204 * @return int, if success, return 0, else return -1
205 */
206 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
207 {
208 c->hdrbuf= 0;
209 c->rudpbuf= 0;
210 c->udppkt= 0;
211
212 c->rudpsize= UDP_DATA_BUFFER_SIZE;
213 c->hdrsize= 0;
214
215 c->rudpbytes= 0;
216 c->packets= 0;
217 c->recvpkt= 0;
218 c->pktcurr= 0;
219 c->ordcurr= 0;
220
221 c->udp= is_udp;
222
223 if (c->udp || (! c->udp && ms_setting.facebook_test))
224 {
225 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
226 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
227
228 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
229 {
230 if (c->rudpbuf != NULL)
231 free(c->rudpbuf);
232 if (c->udppkt != NULL)
233 free(c->udppkt);
234 fprintf(stderr, "malloc()\n");
235 return -1;
236 }
237 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
238 }
239
240 return 0;
241 } /* ms_conn_udp_init */
242
243
244 /**
245 * initialize the connection structure
246 *
247 * @param c, pointer of the concurrency
248 * @param init_state, (conn_read, conn_write, conn_closing)
249 * @param read_buffer_size
250 * @param is_udp, whether it's udp
251 *
252 * @return int, if success, return 0, else return -1
253 */
254 static int ms_conn_init(ms_conn_t *c,
255 const int init_state,
256 const int read_buffer_size,
257 const bool is_udp)
258 {
259 assert(c != NULL);
260
261 c->rbuf= c->wbuf= 0;
262 c->iov= 0;
263 c->msglist= 0;
264
265 c->rsize= read_buffer_size;
266 c->wsize= WRITE_BUFFER_SIZE;
267 c->iovsize= IOV_LIST_INITIAL;
268 c->msgsize= MSG_LIST_INITIAL;
269
270 /* for replication, each connection need connect all the server */
271 if (ms_setting.rep_write_srv > 0)
272 {
273 c->total_sfds= ms_setting.srv_cnt;
274 }
275 else
276 {
277 c->total_sfds= ms_setting.sock_per_conn;
278 }
279 c->alive_sfds= 0;
280
281 c->rbuf= (char *)malloc((size_t)c->rsize);
282 c->wbuf= (char *)malloc((size_t)c->wsize);
283 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
284 c->msglist= (struct msghdr *)malloc(
285 sizeof(struct msghdr) * (size_t)c->msgsize);
286 if (ms_setting.mult_key_num > 1)
287 {
288 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
289 malloc(
290 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
291 }
292 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
293
294 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
295 || (c->msglist == NULL) || (c->tcpsfd == NULL)
296 || ((ms_setting.mult_key_num > 1)
297 && (c->mlget_task.mlget_item == NULL)))
298 {
299 if (c->rbuf != NULL)
300 free(c->rbuf);
301 if (c->wbuf != NULL)
302 free(c->wbuf);
303 if (c->iov != NULL)
304 free(c->iov);
305 if (c->msglist != NULL)
306 free(c->msglist);
307 if (c->mlget_task.mlget_item != NULL)
308 free(c->mlget_task.mlget_item);
309 if (c->tcpsfd != NULL)
310 free(c->tcpsfd);
311 fprintf(stderr, "malloc()\n");
312 return -1;
313 }
314
315 c->state= init_state;
316 c->rvbytes= 0;
317 c->rbytes= 0;
318 c->rcurr= c->rbuf;
319 c->wcurr= c->wbuf;
320 c->iovused= 0;
321 c->msgcurr= 0;
322 c->msgused= 0;
323 c->cur_idx= c->total_sfds; /* default index is a invalid value */
324
325 c->ctnwrite= false;
326 c->readval= false;
327 c->change_sfd= false;
328
329 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
330 c->precmd.isfinish= true; /* default the previous command finished */
331 c->currcmd.isfinish= false;
332 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
333 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
334
335 c->mlget_task.mlget_num= 0;
336 c->mlget_task.value_index= -1; /* default invalid value */
337
338 if (ms_setting.binary_prot)
339 {
340 c->protocol= binary_prot;
341 }
342 else if (is_udp)
343 {
344 c->protocol= ascii_udp_prot;
345 }
346 else
347 {
348 c->protocol= ascii_prot;
349 }
350
351 /* initialize udp */
352 if (ms_conn_udp_init(c, is_udp) != 0)
353 {
354 return -1;
355 }
356
357 /* initialize task */
358 ms_task_init(c);
359
360 if (! (ms_setting.facebook_test && is_udp))
361 {
362 atomic_add_32(&ms_stats.active_conns, 1);
363 }
364
365 return 0;
366 } /* ms_conn_init */
367
368
369 /**
370 * when doing 100% get operation, it could preset some objects
371 * to warmup the server. this function is used to initialize the
372 * number of the objects to preset.
373 *
374 * @param c, pointer of the concurrency
375 */
376 static void ms_warmup_num_init(ms_conn_t *c)
377 {
378 /* no set operation, preset all the items in the window */
379 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
380 {
381 c->warmup_num= c->win_size;
382 c->remain_warmup_num= c->warmup_num;
383 }
384 else
385 {
386 c->warmup_num= 0;
387 c->remain_warmup_num= c->warmup_num;
388 }
389 } /* ms_warmup_num_init */
390
391
392 /**
393 * each connection has an item window, this function initialize
394 * the window. The window is used to generate task.
395 *
396 * @param c, pointer of the concurrency
397 *
398 * @return int, if success, return 0, else return -1
399 */
400 static int ms_item_win_init(ms_conn_t *c)
401 {
402 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
403 int exp_cnt= 0;
404
405 c->win_size= (int)ms_setting.win_size;
406 c->set_cursor= 0;
407 c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
408 c->remain_exec_num= c->exec_num;
409
410 c->item_win= (ms_task_item_t *)malloc(
411 sizeof(ms_task_item_t) * (size_t)c->win_size);
412 if (c->item_win == NULL)
413 {
414 fprintf(stderr, "Can't allocate task item array for conn.\n");
415 return -1;
416 }
417 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
418
419 for (int i= 0; i < c->win_size; i++)
420 {
421 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
422 c->item_win[i].key_prefix= ms_get_key_prefix();
423 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
424 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
425 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
426 c->item_win[i].client_time= 0;
427
428 /* set expire time base on the proportion */
429 if (exp_cnt < ms_setting.exp_ver_per * i)
430 {
431 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
432 exp_cnt++;
433 }
434 else
435 {
436 c->item_win[i].exp_time= 0;
437 }
438 }
439
440 ms_warmup_num_init(c);
441
442 return 0;
443 } /* ms_item_win_init */
444
445
446 /**
447 * each connection structure can include one or more sock
448 * handlers. this function create these socks and connect the
449 * server(s).
450 *
451 * @param c, pointer of the concurrency
452 *
453 * @return int, if success, return 0, else return -1
454 */
455 static int ms_conn_sock_init(ms_conn_t *c)
456 {
457 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
458 int i;
459 int ret_sfd;
460 int srv_idx= 0;
461
462 assert(c != NULL);
463 assert(c->tcpsfd != NULL);
464
465 for (i= 0; i < c->total_sfds; i++)
466 {
467 ret_sfd= 0;
468 if (ms_setting.rep_write_srv > 0)
469 {
470 /* for replication, each connection need connect all the server */
471 srv_idx= i;
472 }
473 else
474 {
475 /* all the connections in a thread connects the same server */
476 srv_idx= ms_thread->thread_ctx->srv_idx;
477 }
478
479 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
480 ms_setting.servers[srv_idx].srv_port,
481 ms_setting.udp, &ret_sfd) != 0)
482 {
483 break;
484 }
485
486 if (i == 0)
487 {
488 c->sfd= ret_sfd;
489 }
490
491 if (! ms_setting.udp)
492 {
493 c->tcpsfd[i]= ret_sfd;
494 }
495
496 c->alive_sfds++;
497 }
498
499 /* initialize udp sock handler if necessary */
500 if (ms_setting.facebook_test)
501 {
502 ret_sfd= 0;
503 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
504 ms_setting.servers[srv_idx].srv_port,
505 true, &ret_sfd) != 0)
506 {
507 c->udpsfd= 0;
508 }
509 else
510 {
511 c->udpsfd= ret_sfd;
512 }
513 }
514
515 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
516 {
517 if (ms_setting.udp)
518 {
519 close(c->sfd);
520 }
521 else
522 {
523 for (int j= 0; j < i; j++)
524 {
525 close(c->tcpsfd[j]);
526 }
527 }
528
529 if (c->udpsfd != 0)
530 {
531 close(c->udpsfd);
532 }
533
534 return -1;
535 }
536
537 return 0;
538 } /* ms_conn_sock_init */
539
540
541 /**
542 * each connection is managed by libevent, this function
543 * initialize the event of the connection structure.
544 *
545 * @param c, pointer of the concurrency
546 *
547 * @return int, if success, return 0, else return -1
548 */
549 static int ms_conn_event_init(ms_conn_t *c)
550 {
551 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
552 /* default event timeout 10 seconds */
553 struct timeval t=
554 {
555 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
556 };
557 short event_flags= EV_WRITE | EV_PERSIST;
558
559 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
560 event_base_set(ms_thread->base, &c->event);
561 c->ev_flags= event_flags;
562
563 if (c->total_sfds == 1)
564 {
565 if (event_add(&c->event, NULL) == -1)
566 {
567 return -1;
568 }
569 }
570 else
571 {
572 if (event_add(&c->event, &t) == -1)
573 {
574 return -1;
575 }
576 }
577
578 return 0;
579 } /* ms_conn_event_init */
580
581
582 /**
583 * setup a connection, each connection structure of each
584 * thread must call this function to initialize.
585 *
586 * @param c, pointer of the concurrency
587 *
588 * @return int, if success, return 0, else return -1
589 */
590 int ms_setup_conn(ms_conn_t *c)
591 {
592 if (ms_item_win_init(c) != 0)
593 {
594 return -1;
595 }
596
597 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
598 {
599 return -1;
600 }
601
602 if (ms_conn_sock_init(c) != 0)
603 {
604 return -1;
605 }
606
607 if (ms_conn_event_init(c) != 0)
608 {
609 return -1;
610 }
611
612 return 0;
613 } /* ms_setup_conn */
614
615
616 /**
617 * Frees a connection.
618 *
619 * @param c, pointer of the concurrency
620 */
621 void ms_conn_free(ms_conn_t *c)
622 {
623 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
624 if (c != NULL)
625 {
626 if (c->hdrbuf != NULL)
627 free(c->hdrbuf);
628 if (c->msglist != NULL)
629 free(c->msglist);
630 if (c->rbuf != NULL)
631 free(c->rbuf);
632 if (c->wbuf != NULL)
633 free(c->wbuf);
634 if (c->iov != NULL)
635 free(c->iov);
636 if (c->mlget_task.mlget_item != NULL)
637 free(c->mlget_task.mlget_item);
638 if (c->rudpbuf != NULL)
639 free(c->rudpbuf);
640 if (c->udppkt != NULL)
641 free(c->udppkt);
642 if (c->item_win != NULL)
643 free(c->item_win);
644 if (c->tcpsfd != NULL)
645 free(c->tcpsfd);
646
647 if (--ms_thread->nactive_conn == 0)
648 {
649 free(ms_thread->conn);
650 }
651 }
652 } /* ms_conn_free */
653
654
655 /**
656 * close a connection
657 *
658 * @param c, pointer of the concurrency
659 */
660 static void ms_conn_close(ms_conn_t *c)
661 {
662 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
663 assert(c != NULL);
664
665 /* delete the event, the socket and the connection */
666 event_del(&c->event);
667
668 for (int i= 0; i < c->total_sfds; i++)
669 {
670 if (c->tcpsfd[i] > 0)
671 {
672 close(c->tcpsfd[i]);
673 }
674 }
675 c->sfd= 0;
676
677 if (ms_setting.facebook_test)
678 {
679 close(c->udpsfd);
680 }
681
682 atomic_dec_32(&ms_stats.active_conns);
683
684 ms_conn_free(c);
685
686 if (ms_setting.run_time == 0)
687 {
688 pthread_mutex_lock(&ms_global.run_lock.lock);
689 ms_global.run_lock.count++;
690 pthread_cond_signal(&ms_global.run_lock.cond);
691 pthread_mutex_unlock(&ms_global.run_lock.lock);
692 }
693
694 if (ms_thread->nactive_conn == 0)
695 {
696 pthread_exit(NULL);
697 }
698 } /* ms_conn_close */
699
700
701 /**
702 * create a new sock
703 *
704 * @param ai, server address information
705 *
706 * @return int, if success, return 0, else return -1
707 */
708 static int ms_new_socket(struct addrinfo *ai)
709 {
710 int sfd;
711
712 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
713 {
714 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
715 return -1;
716 }
717
718 return sfd;
719 } /* ms_new_socket */
720
721
722 /**
723 * Sets a socket's send buffer size to the maximum allowed by the system.
724 *
725 * @param sfd, file descriptor of socket
726 */
727 static void ms_maximize_sndbuf(const int sfd)
728 {
729 socklen_t intsize= sizeof(int);
730 unsigned int last_good= 0;
731 unsigned int min, max, avg;
732 unsigned int old_size;
733
734 /* Start with the default size. */
735 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
736 {
737 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
738 return;
739 }
740
741 /* Binary-search for the real maximum. */
742 min= old_size;
743 max= MAX_SENDBUF_SIZE;
744
745 while (min <= max)
746 {
747 avg= ((unsigned int)(min + max)) / 2;
748 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
749 {
750 last_good= avg;
751 min= avg + 1;
752 }
753 else
754 {
755 max= avg - 1;
756 }
757 }
758 } /* ms_maximize_sndbuf */
759
760
761 /**
762 * socket connects the server
763 *
764 * @param c, pointer of the concurrency
765 * @param srv_host_name, the host name of the server
766 * @param srv_port, port of server
767 * @param is_udp, whether it's udp
768 * @param ret_sfd, the connected socket file descriptor
769 *
770 * @return int, if success, return 0, else return -1
771 */
772 static int ms_network_connect(ms_conn_t *c,
773 char *srv_host_name,
774 const int srv_port,
775 const bool is_udp,
776 int *ret_sfd)
777 {
778 int sfd;
779 struct linger ling=
780 {
781 0, 0
782 };
783 struct addrinfo *ai;
784 struct addrinfo *next;
785 struct addrinfo hints;
786 char port_buf[NI_MAXSERV];
787 int error;
788 int success= 0;
789
790 int flags= 1;
791
792 /*
793 * the memset call clears nonstandard fields in some impementations
794 * that otherwise mess things up.
795 */
796 memset(&hints, 0, sizeof(hints));
797 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
798 if (is_udp)
799 {
800 hints.ai_protocol= IPPROTO_UDP;
801 hints.ai_socktype= SOCK_DGRAM;
802 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
803 }
804 else
805 {
806 hints.ai_family= AF_UNSPEC;
807 hints.ai_protocol= IPPROTO_TCP;
808 hints.ai_socktype= SOCK_STREAM;
809 }
810
811 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
812 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
813 if (error != 0)
814 {
815 if (error != EAI_SYSTEM)
816 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
817 else
818 perror("getaddrinfo()\n");
819
820 return -1;
821 }
822
823 for (next= ai; next; next= next->ai_next)
824 {
825 if ((sfd= ms_new_socket(next)) == -1)
826 {
827 freeaddrinfo(ai);
828 return -1;
829 }
830
831 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
832 if (is_udp)
833 {
834 ms_maximize_sndbuf(sfd);
835 }
836 else
837 {
838 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
839 sizeof(flags));
840 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
841 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
842 sizeof(flags));
843 }
844
845 if (is_udp)
846 {
847 c->srv_recv_addr_size= sizeof(struct sockaddr);
848 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
849 }
850 else
851 {
852 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
853 {
854 close(sfd);
855 freeaddrinfo(ai);
856 return -1;
857 }
858 }
859
860 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
861 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
862 {
863 fprintf(stderr, "setting O_NONBLOCK\n");
864 close(sfd);
865 freeaddrinfo(ai);
866 return -1;
867 }
868
869 if (ret_sfd != NULL)
870 {
871 *ret_sfd= sfd;
872 }
873
874 success++;
875 }
876
877 freeaddrinfo(ai);
878
879 /* Return zero if we detected no errors in starting up connections */
880 return success == 0;
881 } /* ms_network_connect */
882
883
884 /**
885 * reconnect a disconnected sock
886 *
887 * @param c, pointer of the concurrency
888 *
889 * @return int, if success, return 0, else return -1
890 */
891 static int ms_reconn(ms_conn_t *c)
892 {
893 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
894 int srv_idx= 0;
895 int32_t srv_conn_cnt= 0;
896
897 if (ms_setting.rep_write_srv > 0)
898 {
899 srv_idx= c->cur_idx;
900 srv_conn_cnt= ms_setting.nconns;
901 }
902 else
903 {
904 srv_idx= ms_thread->thread_ctx->srv_idx;
905 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
906 }
907
908 /* close the old socket handler */
909 close(c->sfd);
910 c->tcpsfd[c->cur_idx]= 0;
911
912 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
913 % (uint32_t)srv_conn_cnt == 0)
914 {
915 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
916 fprintf(stderr, "Server %s:%d disconnect\n",
917 ms_setting.servers[srv_idx].srv_host_name,
918 ms_setting.servers[srv_idx].srv_port);
919 }
920
921 if (ms_setting.rep_write_srv > 0)
922 {
923 int i= 0;
924 for (i= 0; i < c->total_sfds; i++)
925 {
926 if (c->tcpsfd[i] != 0)
927 {
928 break;
929 }
930 }
931
932 /* all socks disconnect */
933 if (i == c->total_sfds)
934 {
935 return -1;
936 }
937 }
938 else
939 {
940 do
941 {
942 /* reconnect success, break the loop */
943 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
944 ms_setting.servers[srv_idx].srv_port,
945 ms_setting.udp, &c->sfd) == 0)
946 {
947 c->tcpsfd[c->cur_idx]= c->sfd;
948 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
949 % (uint32_t)srv_conn_cnt == 0)
950 {
951 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
952 int reconn_time=
953 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
954 - ms_setting.servers[srv_idx].disconn_time
955 .tv_sec);
956 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
957 ms_setting.servers[srv_idx].srv_host_name,
958 ms_setting.servers[srv_idx].srv_port, reconn_time);
959 }
960 break;
961 }
962
963 if (c->total_sfds == 1)
964 {
965 /* wait a second and reconnect */
966 sleep(1);
967 }
968 }
969 while (c->total_sfds == 1);
970 }
971
972 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
973 {
974 c->sfd= 0;
975 c->alive_sfds--;
976 }
977
978 return 0;
979 } /* ms_reconn */
980
981
982 /**
983 * reconnect several disconnected socks in the connection
984 * structure, the ever-1-second timer of the thread will check
985 * whether some socks in the connections disconnect. if
986 * disconnect, reconnect the sock.
987 *
988 * @param c, pointer of the concurrency
989 *
990 * @return int, if success, return 0, else return -1
991 */
992 int ms_reconn_socks(ms_conn_t *c)
993 {
994 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
995 int srv_idx= 0;
996 int ret_sfd= 0;
997 int srv_conn_cnt= 0;
998 struct timeval cur_time;
999
1000 assert(c != NULL);
1001
1002 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
1003 {
1004 return 0;
1005 }
1006
1007 for (int i= 0; i < c->total_sfds; i++)
1008 {
1009 if (c->tcpsfd[i] == 0)
1010 {
1011 gettimeofday(&cur_time, NULL);
1012
1013 /**
1014 * For failover test of replication, reconnect the socks after
1015 * it disconnects more than 5 seconds, Otherwise memslap will
1016 * block at connect() function and the work threads can't work
1017 * in this interval.
1018 */
1019 if (cur_time.tv_sec
1020 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1021 {
1022 break;
1023 }
1024
1025 if (ms_setting.rep_write_srv > 0)
1026 {
1027 srv_idx= i;
1028 srv_conn_cnt= ms_setting.nconns;
1029 }
1030 else
1031 {
1032 srv_idx= ms_thread->thread_ctx->srv_idx;
1033 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1034 }
1035
1036 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1037 ms_setting.servers[srv_idx].srv_port,
1038 ms_setting.udp, &ret_sfd) == 0)
1039 {
1040 c->tcpsfd[i]= ret_sfd;
1041 c->alive_sfds++;
1042
1043 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1044 % (uint32_t)srv_conn_cnt == 0)
1045 {
1046 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1047 int reconn_time=
1048 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1049 - ms_setting.servers[srv_idx].disconn_time
1050 .tv_sec);
1051 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1052 ms_setting.servers[srv_idx].srv_host_name,
1053 ms_setting.servers[srv_idx].srv_port, reconn_time);
1054 }
1055 }
1056 }
1057 }
1058
1059 return 0;
1060 } /* ms_reconn_socks */
1061
1062
1063 /**
1064 * Tokenize the command string by replacing whitespace with '\0' and update
1065 * the token array tokens with pointer to start of each token and length.
1066 * Returns total number of tokens. The last valid token is the terminal
1067 * token (value points to the first unprocessed character of the string and
1068 * length zero).
1069 *
1070 * Usage example:
1071 *
1072 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1073 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1074 * ...
1075 * }
1076 * ncommand = tokens[ix].value - command;
1077 * command = tokens[ix].value;
1078 * }
1079 *
1080 * @param command, the command string to token
1081 * @param tokens, array to store tokens
1082 * @param max_tokens, maximum tokens number
1083 *
1084 * @return int, the number of tokens
1085 */
1086 static int ms_tokenize_command(char *command,
1087 token_t *tokens,
1088 const int max_tokens)
1089 {
1090 char *s, *e;
1091 int ntokens= 0;
1092
1093 assert(command != NULL && tokens != NULL && max_tokens > 1);
1094
1095 for (s= e= command; ntokens < max_tokens - 1; ++e)
1096 {
1097 if (*e == ' ')
1098 {
1099 if (s != e)
1100 {
1101 tokens[ntokens].value= s;
1102 tokens[ntokens].length= (size_t)(e - s);
1103 ntokens++;
1104 *e= '\0';
1105 }
1106 s= e + 1;
1107 }
1108 else if (*e == '\0')
1109 {
1110 if (s != e)
1111 {
1112 tokens[ntokens].value= s;
1113 tokens[ntokens].length= (size_t)(e - s);
1114 ntokens++;
1115 }
1116
1117 break; /* string end */
1118 }
1119 }
1120
1121 return ntokens;
1122 } /* ms_tokenize_command */
1123
1124
1125 /**
1126 * parse the response of server.
1127 *
1128 * @param c, pointer of the concurrency
1129 * @param command, the string responded by server
1130 *
1131 * @return int, if the command completed return 0, else return
1132 * -1
1133 */
1134 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1135 {
1136 int ret= 0;
1137 int64_t value_len;
1138 char *buffer= command;
1139
1140 assert(c != NULL);
1141
1142 /**
1143 * for command get, we store the returned value into local buffer
1144 * then continue in ms_complete_nread().
1145 */
1146
1147 switch (buffer[0])
1148 {
1149 case 'V': /* VALUE || VERSION */
1150 if (buffer[1] == 'A') /* VALUE */
1151 {
1152 token_t tokens[MAX_TOKENS];
1153 ms_tokenize_command(command, tokens, MAX_TOKENS);
1154 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1155 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1156
1157 /*
1158 * We read the \r\n into the string since not doing so is more
1159 * cycles then the waster of memory to do so.
1160 *
1161 * We are null terminating through, which will most likely make
1162 * some people lazy about using the return length.
1163 */
1164 c->rvbytes= (int)(value_len + 2);
1165 c->readval= true;
1166 ret= -1;
1167 }
1168
1169 break;
1170
1171 case 'O': /* OK */
1172 c->currcmd.retstat= MCD_SUCCESS;
1173
1174 case 'S': /* STORED STATS SERVER_ERROR */
1175 if (buffer[2] == 'A') /* STORED STATS */
1176 { /* STATS*/
1177 c->currcmd.retstat= MCD_STAT;
1178 }
1179 else if (buffer[1] == 'E')
1180 {
1181 /* SERVER_ERROR */
1182 printf("<%d %s\n", c->sfd, buffer);
1183
1184 c->currcmd.retstat= MCD_SERVER_ERROR;
1185 }
1186 else if (buffer[1] == 'T')
1187 {
1188 /* STORED */
1189 c->currcmd.retstat= MCD_STORED;
1190 }
1191 else
1192 {
1193 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1194 }
1195 break;
1196
1197 case 'D': /* DELETED DATA */
1198 if (buffer[1] == 'E')
1199 {
1200 c->currcmd.retstat= MCD_DELETED;
1201 }
1202 else
1203 {
1204 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1205 }
1206
1207 break;
1208
1209 case 'N': /* NOT_FOUND NOT_STORED*/
1210 if (buffer[4] == 'F')
1211 {
1212 c->currcmd.retstat= MCD_NOTFOUND;
1213 }
1214 else if (buffer[4] == 'S')
1215 {
1216 printf("<%d %s\n", c->sfd, buffer);
1217 c->currcmd.retstat= MCD_NOTSTORED;
1218 }
1219 else
1220 {
1221 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1222 }
1223 break;
1224
1225 case 'E': /* PROTOCOL ERROR or END */
1226 if (buffer[1] == 'N')
1227 {
1228 /* END */
1229 c->currcmd.retstat= MCD_END;
1230 }
1231 else if (buffer[1] == 'R')
1232 {
1233 printf("<%d ERROR\n", c->sfd);
1234 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1235 }
1236 else if (buffer[1] == 'X')
1237 {
1238 c->currcmd.retstat= MCD_DATA_EXISTS;
1239 printf("<%d %s\n", c->sfd, buffer);
1240 }
1241 else
1242 {
1243 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1244 }
1245 break;
1246
1247 case 'C': /* CLIENT ERROR */
1248 printf("<%d %s\n", c->sfd, buffer);
1249 c->currcmd.retstat= MCD_CLIENT_ERROR;
1250 break;
1251
1252 default:
1253 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1254 break;
1255 } /* switch */
1256
1257 return ret;
1258 } /* ms_ascii_process_line */
1259
1260
1261 /**
1262 * after one operation completes, reset the concurrency
1263 *
1264 * @param c, pointer of the concurrency
1265 * @param timeout, whether it's timeout
1266 */
1267 void ms_reset_conn(ms_conn_t *c, bool timeout)
1268 {
1269 assert(c != NULL);
1270
1271 if (c->udp)
1272 {
1273 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1274 {
1275 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (uint64_t)c->packets);
1276 }
1277
1278 c->packets= 0;
1279 c->recvpkt= 0;
1280 c->pktcurr= 0;
1281 c->ordcurr= 0;
1282 c->rudpbytes= 0;
1283 }
1284 c->currcmd.isfinish= true;
1285 c->ctnwrite= false;
1286 c->rbytes= 0;
1287 c->rcurr= c->rbuf;
1288 ms_conn_set_state(c, conn_write);
1289 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1290
1291 if (timeout)
1292 {
1293 ms_drive_machine(c);
1294 }
1295 } /* ms_reset_conn */
1296
1297
1298 /**
1299 * if we have a complete line in the buffer, process it.
1300 *
1301 * @param c, pointer of the concurrency
1302 *
1303 * @return int, if success, return 0, else return -1
1304 */
1305 static int ms_try_read_line(ms_conn_t *c)
1306 {
1307 if (c->protocol == binary_prot)
1308 {
1309 /* Do we have the complete packet header? */
1310 if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1311 {
1312 /* need more data! */
1313 return 0;
1314 }
1315 else
1316 {
1317 #ifdef NEED_ALIGN
1318 if (((long)(c->rcurr)) % 8 != 0)
1319 {
1320 /* must realign input buffer */
1321 memmove(c->rbuf, c->rcurr, c->rbytes);
1322 c->rcurr= c->rbuf;
1323 if (settings.verbose)
1324 {
1325 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1326 }
1327 }
1328 #endif
1329 protocol_binary_response_header *rsp;
1330 rsp= (protocol_binary_response_header *)c->rcurr;
1331
1332 c->binary_header= *rsp;
1333 c->binary_header.response.extlen= rsp->response.extlen;
1334 c->binary_header.response.keylen= ntohl(rsp->response.keylen);
1335 c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1336 c->binary_header.response.status= ntohl(rsp->response.status);
1337
1338 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1339 {
1340 fprintf(stderr, "Invalid magic: %x\n",
1341 c->binary_header.response.magic);
1342 ms_conn_set_state(c, conn_closing);
1343 return 0;
1344 }
1345
1346 /* process this complete response */
1347 if (ms_bin_process_response(c) == 0)
1348 {
1349 /* current operation completed */
1350 ms_reset_conn(c, false);
1351 return -1;
1352 }
1353 else
1354 {
1355 c->rbytes-= (int32_t)sizeof(c->binary_header);
1356 c->rcurr+= sizeof(c->binary_header);
1357 }
1358 }
1359 }
1360 else
1361 {
1362 char *el, *cont;
1363
1364 assert(c != NULL);
1365 assert(c->rcurr <= (c->rbuf + c->rsize));
1366
1367 if (c->rbytes == 0)
1368 return 0;
1369
1370 el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1371 if (! el)
1372 return 0;
1373
1374 cont= el + 1;
1375 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1376 {
1377 el--;
1378 }
1379 *el= '\0';
1380
1381 assert(cont <= (c->rcurr + c->rbytes));
1382
1383 /* process this complete line */
1384 if (ms_ascii_process_line(c, c->rcurr) == 0)
1385 {
1386 /* current operation completed */
1387 ms_reset_conn(c, false);
1388 return -1;
1389 }
1390 else
1391 {
1392 /* current operation didn't complete */
1393 c->rbytes-= (int32_t)(cont - c->rcurr);
1394 c->rcurr= cont;
1395 }
1396
1397 assert(c->rcurr <= (c->rbuf + c->rsize));
1398 }
1399
1400 return -1;
1401 } /* ms_try_read_line */
1402
1403
1404 /**
1405 * because the packet of UDP can't ensure the order, the
1406 * function is used to sort the received udp packet.
1407 *
1408 * @param c, pointer of the concurrency
1409 * @param buf, the buffer to store the ordered packages data
1410 * @param rbytes, the maximum capacity of the buffer
1411 *
1412 * @return int, if success, return the copy bytes, else return
1413 * -1
1414 */
1415 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
1416 {
1417 int len= 0;
1418 int wbytes= 0;
1419 uint16_t req_id= 0;
1420 uint16_t seq_num= 0;
1421 uint16_t packets= 0;
1422 unsigned char *header= NULL;
1423
1424 /* no enough data */
1425 assert(c != NULL);
1426 assert(buf != NULL);
1427 assert(c->rudpbytes >= UDP_HEADER_SIZE);
1428
1429 /* calculate received packets count */
1430 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
1431 {
1432 /* the last packet has some data */
1433 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1434 }
1435 else
1436 {
1437 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1438 }
1439
1440 /* get the total packets count if necessary */
1441 if (c->packets == 0)
1442 {
1443 c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
1444 }
1445
1446 /* build the ordered packet array */
1447 for (int i= c->pktcurr; i < c->recvpkt; i++)
1448 {
1449 header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1450 req_id= (uint16_t)HEADER_TO_REQID(header);
1451 assert(req_id == c->request_id % (1 << 16));
1452
1453 packets= (uint16_t)HEADER_TO_PACKETS(header);
1454 assert(c->packets == HEADER_TO_PACKETS(header));
1455
1456 seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
1457 c->udppkt[seq_num].header= header;
1458 c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;
1459
1460 if (i == c->recvpkt - 1)
1461 {
1462 /* last received packet */
1463 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
1464 {
1465 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1466 c->pktcurr++;
1467 }
1468 else
1469 {
1470 c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
1471 - UDP_HEADER_SIZE;
1472 }
1473 }
1474 else
1475 {
1476 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1477 c->pktcurr++;
1478 }
1479 }
1480
1481 for (int i= c->ordcurr; i < c->recvpkt; i++)
1482 {
1483 /* there is some data to copy */
1484 if ((c->udppkt[i].data != NULL)
1485 && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
1486 {
1487 header= c->udppkt[i].header;
1488 len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1489 if (len > rbytes - wbytes)
1490 {
1491 len= rbytes - wbytes;
1492 }
1493
1494 assert(len <= rbytes - wbytes);
1495 assert(i == HEADER_TO_SEQNUM(header));
1496
1497 memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
1498 (size_t)len);
1499 wbytes+= len;
1500 c->udppkt[i].copybytes+= len;
1501
1502 if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
1503 && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1504 {
1505 /* finish copying all the data of this packet, next */
1506 c->ordcurr++;
1507 }
1508
1509 /* last received packet, and finish copying all the data */
1510 if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
1511 && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
1512 {
1513 break;
1514 }
1515
1516 /* no space to copy data */
1517 if (wbytes >= rbytes)
1518 {
1519 break;
1520 }
1521
1522 /* it doesn't finish reading all the data of the packet from network */
1523 if ((i != c->recvpkt - 1)
1524 && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1525 {
1526 break;
1527 }
1528 }
1529 else
1530 {
1531 /* no data to copy */
1532 break;
1533 }
1534 }
1535
1536 return wbytes == 0 ? -1 : wbytes;
1537 } /* ms_sort_udp_packet */
1538
1539
1540 /**
1541 * encapsulate upd read like tcp read
1542 *
1543 * @param c, pointer of the concurrency
1544 * @param buf, read buffer
1545 * @param len, length to read
1546 *
1547 * @return int, if success, return the read bytes, else return
1548 * -1
1549 */
1550 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1551 {
1552 int res= 0;
1553 int avail= 0;
1554 int rbytes= 0;
1555 int copybytes= 0;
1556
1557 assert(c->udp);
1558
1559 while (1)
1560 {
1561 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1562 {
1563 char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1564 if (! new_rbuf)
1565 {
1566 fprintf(stderr, "Couldn't realloc input buffer.\n");
1567 c->rudpbytes= 0; /* ignore what we read */
1568 return -1;
1569 }
1570 c->rudpbuf= new_rbuf;
1571 c->rudpsize*= 2;
1572 }
1573
1574 avail= c->rudpsize - c->rudpbytes;
1575 /* UDP each time read a packet, 1400 bytes */
1576 res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1577
1578 if (res > 0)
1579 {
1580 atomic_add_64(&ms_stats.bytes_read, res);
1581 c->rudpbytes+= res;
1582 rbytes+= res;
1583 if (res == avail)
1584 {
1585 continue;
1586 }
1587 else
1588 {
1589 break;
1590 }
1591 }
1592
1593 if (res == 0)
1594 {
1595 /* "connection" closed */
1596 return res;
1597 }
1598
1599 if (res == -1)
1600 {
1601 /* no data to read */
1602 return res;
1603 }
1604 }
1605
1606 /* copy data to read buffer */
1607 if (rbytes > 0)
1608 {
1609 copybytes= ms_sort_udp_packet(c, buf, len);
1610 }
1611
1612 if (copybytes == -1)
1613 {
1614 atomic_add_64(&ms_stats.pkt_disorder, 1);
1615 }
1616
1617 return copybytes;
1618 } /* ms_udp_read */
1619
1620
1621 /*
1622 * read from network as much as we can, handle buffer overflow and connection
1623 * close.
1624 * before reading, move the remaining incomplete fragment of a command
1625 * (if any) to the beginning of the buffer.
1626 * return 0 if there's nothing to read on the first read.
1627 */
1628
1629 /**
1630 * read from network as much as we can, handle buffer overflow and connection
1631 * close. before reading, move the remaining incomplete fragment of a command
1632 * (if any) to the beginning of the buffer.
1633 *
1634 * @param c, pointer of the concurrency
1635 *
1636 * @return int,
1637 * return 0 if there's nothing to read on the first read.
1638 * return 1 if get data
1639 * return -1 if error happens
1640 */
1641 static int ms_try_read_network(ms_conn_t *c)
1642 {
1643 int gotdata= 0;
1644 int res;
1645 int64_t avail;
1646
1647 assert(c != NULL);
1648
1649 if ((c->rcurr != c->rbuf)
1650 && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1651 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1652 {
1653 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1654 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1655 c->rcurr= c->rbuf;
1656 }
1657
1658 while (1)
1659 {
1660 if (c->rbytes >= c->rsize)
1661 {
1662 char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
1663 if (! new_rbuf)
1664 {
1665 fprintf(stderr, "Couldn't realloc input buffer.\n");
1666 c->rbytes= 0; /* ignore what we read */
1667 return -1;
1668 }
1669 c->rcurr= c->rbuf= new_rbuf;
1670 c->rsize*= 2;
1671 }
1672
1673 avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1674 if (avail == 0)
1675 {
1676 break;
1677 }
1678
1679 if (c->udp)
1680 {
1681 res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1682 }
1683 else
1684 {
1685 res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1686 }
1687
1688 if (res > 0)
1689 {
1690 if (! c->udp)
1691 {
1692 atomic_add_64(&ms_stats.bytes_read, res);
1693 }
1694 gotdata= 1;
1695 c->rbytes+= res;
1696 if (res == avail)
1697 {
1698 continue;
1699 }
1700 else
1701 {
1702 break;
1703 }
1704 }
1705 if (res == 0)
1706 {
1707 /* connection closed */
1708 ms_conn_set_state(c, conn_closing);
1709 return -1;
1710 }
1711 if (res == -1)
1712 {
1713 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1714 break;
1715 /* Should close on unhandled errors. */
1716 ms_conn_set_state(c, conn_closing);
1717 return -1;
1718 }
1719 }
1720
1721 return gotdata;
1722 } /* ms_try_read_network */
1723
1724
1725 /**
1726 * after get the object from server, verify the value if
1727 * necessary.
1728 *
1729 * @param c, pointer of the concurrency
1730 * @param mlget_item, pointer of mulit-get task item structure
1731 * @param value, received value string
1732 * @param vlen, received value string length
1733 */
1734 static void ms_verify_value(ms_conn_t *c,
1735 ms_mlget_task_item_t *mlget_item,
1736 char *value,
1737 int vlen)
1738 {
1739 if (c->curr_task.verify)
1740 {
1741 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1742 char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1743 char *orignkey=
1744 &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1745
1746 /* verify expire time if necessary */
1747 if (c->curr_task.item->exp_time > 0)
1748 {
1749 struct timeval curr_time;
1750 gettimeofday(&curr_time, NULL);
1751
1752 /* object expired but get it now */
1753 if (curr_time.tv_sec - c->curr_task.item->client_time
1754 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1755 {
1756 atomic_add_64(&ms_stats.exp_get, 1);
1757
1758 if (ms_setting.verbose)
1759 {
1760 char set_time[64];
1761 char cur_time[64];
1762 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1763 localtime(&c->curr_task.item->client_time));
1764 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1765 localtime(&curr_time.tv_sec));
1766 fprintf(stderr,
1767 "\n<%d expire time verification failed, "
1768 "object expired but get it now\n"
1769 "\tkey len: %d\n"
1770 "\tkey: %" PRIx64 " %.*s\n"
1771 "\tset time: %s current time: %s "
1772 "diff time: %d expire time: %d\n"
1773 "\texpected data: \n"
1774 "\treceived data len: %d\n"
1775 "\treceived data: %.*s\n",
1776 c->sfd,
1777 c->curr_task.item->key_size,
1778 c->curr_task.item->key_prefix,
1779 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1780 orignkey,
1781 set_time,
1782 cur_time,
1783 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1784 c->curr_task.item->exp_time,
1785 vlen,
1786 vlen,
1787 value);
1788 fflush(stderr);
1789 }
1790 }
1791 }
1792 else
1793 {
1794 if ((c->curr_task.item->value_size != vlen)
1795 || (memcmp(orignval, value, (size_t)vlen) != 0))
1796 {
1797 atomic_add_64(&ms_stats.vef_failed, 1);
1798
1799 if (ms_setting.verbose)
1800 {
1801 fprintf(stderr,
1802 "\n<%d data verification failed\n"
1803 "\tkey len: %d\n"
1804 "\tkey: %" PRIx64" %.*s\n"
1805 "\texpected data len: %d\n"
1806 "\texpected data: %.*s\n"
1807 "\treceived data len: %d\n"
1808 "\treceived data: %.*s\n",
1809 c->sfd,
1810 c->curr_task.item->key_size,
1811 c->curr_task.item->key_prefix,
1812 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1813 orignkey,
1814 c->curr_task.item->value_size,
1815 c->curr_task.item->value_size,
1816 orignval,
1817 vlen,
1818 vlen,
1819 value);
1820 fflush(stderr);
1821 }
1822 }
1823 }
1824
1825 c->curr_task.finish_verify= true;
1826
1827 if (mlget_item != NULL)
1828 {
1829 mlget_item->finish_verify= true;
1830 }
1831 }
1832 } /* ms_verify_value */
1833
1834
1835 /**
1836 * For ASCII protocol, after store the data into the local
1837 * buffer, run this function to handle the data.
1838 *
1839 * @param c, pointer of the concurrency
1840 */
1841 static void ms_ascii_complete_nread(ms_conn_t *c)
1842 {
1843 assert(c != NULL);
1844 assert(c->rbytes >= c->rvbytes);
1845 assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
1846 if (c->rvbytes > 2)
1847 {
1848 assert(
1849 c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1850 }
1851
1852 /* multi-get */
1853 ms_mlget_task_item_t *mlget_item= NULL;
1854 if (((ms_setting.mult_key_num > 1)
1855 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1856 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1857 {
1858 c->mlget_task.value_index++;
1859 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1860
1861 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1862 {
1863 c->curr_task.item= mlget_item->item;
1864 c->curr_task.verify= mlget_item->verify;
1865 c->curr_task.finish_verify= mlget_item->finish_verify;
1866 mlget_item->get_miss= false;
1867 }
1868 else
1869 {
1870 /* Try to find the task item in multi-get task array */
1871 for (int i= 0; i < c->mlget_task.mlget_num; i++)
1872 {
1873 mlget_item= &c->mlget_task.mlget_item[i];
1874 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1875 {
1876 c->curr_task.item= mlget_item->item;
1877 c->curr_task.verify= mlget_item->verify;
1878 c->curr_task.finish_verify= mlget_item->finish_verify;
1879 mlget_item->get_miss= false;
1880
1881 break;
1882 }
1883 }
1884 }
1885 }
1886
1887 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1888
1889 c->curr_task.get_miss= false;
1890 c->rbytes-= c->rvbytes;
1891 c->rcurr= c->rcurr + c->rvbytes;
1892 assert(c->rcurr <= (c->rbuf + c->rsize));
1893 c->readval= false;
1894 c->rvbytes= 0;
1895 } /* ms_ascii_complete_nread */
1896
1897
1898 /**
1899 * For binary protocol, after store the data into the local
1900 * buffer, run this function to handle the data.
1901 *
1902 * @param c, pointer of the concurrency
1903 */
1904 static void ms_bin_complete_nread(ms_conn_t *c)
1905 {
1906 assert(c != NULL);
1907 assert(c->rbytes >= c->rvbytes);
1908 assert(c->protocol == binary_prot);
1909
1910 int extlen= c->binary_header.response.extlen;
1911 int keylen= c->binary_header.response.keylen;
1912 uint8_t opcode= c->binary_header.response.opcode;
1913
1914 /* not get command or not include value, just return */
1915 if (((opcode != PROTOCOL_BINARY_CMD_GET)
1916 && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1917 || (c->rvbytes <= extlen + keylen))
1918 {
1919 /* get miss */
1920 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1921 {
1922 c->currcmd.retstat= MCD_END;
1923 c->curr_task.get_miss= true;
1924 }
1925
1926 c->readval= false;
1927 c->rvbytes= 0;
1928 ms_reset_conn(c, false);
1929 return;
1930 }
1931
1932 /* multi-get */
1933 ms_mlget_task_item_t *mlget_item= NULL;
1934 if (((ms_setting.mult_key_num > 1)
1935 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1936 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1937 {
1938 c->mlget_task.value_index++;
1939 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1940
1941 c->curr_task.item= mlget_item->item;
1942 c->curr_task.verify= mlget_item->verify;
1943 c->curr_task.finish_verify= mlget_item->finish_verify;
1944 mlget_item->get_miss= false;
1945 }
1946
1947 ms_verify_value(c,
1948 mlget_item,
1949 c->rcurr + extlen + keylen,
1950 c->rvbytes - extlen - keylen);
1951
1952 c->currcmd.retstat= MCD_END;
1953 c->curr_task.get_miss= false;
1954 c->rbytes-= c->rvbytes;
1955 c->rcurr= c->rcurr + c->rvbytes;
1956 assert(c->rcurr <= (c->rbuf + c->rsize));
1957 c->readval= false;
1958 c->rvbytes= 0;
1959
1960 if (ms_setting.mult_key_num > 1)
1961 {
1962 /* multi-get have check all the item */
1963 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1964 {
1965 ms_reset_conn(c, false);
1966 }
1967 }
1968 else
1969 {
1970 /* single get */
1971 ms_reset_conn(c, false);
1972 }
1973 } /* ms_bin_complete_nread */
1974
1975
1976 /**
1977 * we get here after reading the value of get commands.
1978 *
1979 * @param c, pointer of the concurrency
1980 */
1981 static void ms_complete_nread(ms_conn_t *c)
1982 {
1983 assert(c != NULL);
1984 assert(c->rbytes >= c->rvbytes);
1985 assert(c->protocol == ascii_udp_prot
1986 || c->protocol == ascii_prot
1987 || c->protocol == binary_prot);
1988
1989 if (c->protocol == binary_prot)
1990 {
1991 ms_bin_complete_nread(c);
1992 }
1993 else
1994 {
1995 ms_ascii_complete_nread(c);
1996 }
1997 } /* ms_complete_nread */
1998
1999
2000 /**
2001 * Adds a message header to a connection.
2002 *
2003 * @param c, pointer of the concurrency
2004 *
2005 * @return int, if success, return 0, else return -1
2006 */
2007 static int ms_add_msghdr(ms_conn_t *c)
2008 {
2009 struct msghdr *msg;
2010
2011 assert(c != NULL);
2012
2013 if (c->msgsize == c->msgused)
2014 {
2015 msg=
2016 realloc(c->msglist, (uint64_t)c->msgsize * 2 * sizeof(struct msghdr));
2017 if (! msg)
2018 return -1;
2019
2020 c->msglist= msg;
2021 c->msgsize*= 2;
2022 }
2023
2024 msg= c->msglist + c->msgused;
2025
2026 /**
2027 * this wipes msg_iovlen, msg_control, msg_controllen, and
2028 * msg_flags, the last 3 of which aren't defined on solaris:
2029 */
2030 memset(msg, 0, sizeof(struct msghdr));
2031
2032 msg->msg_iov= &c->iov[c->iovused];
2033
2034 if (c->udp && (c->srv_recv_addr_size > 0))
2035 {
2036 msg->msg_name= &c->srv_recv_addr;
2037 msg->msg_namelen= c->srv_recv_addr_size;
2038 }
2039
2040 c->msgbytes= 0;
2041 c->msgused++;
2042
2043 if (c->udp)
2044 {
2045 /* Leave room for the UDP header, which we'll fill in later. */
2046 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2047 }
2048
2049 return 0;
2050 } /* ms_add_msghdr */
2051
2052
2053 /**
2054 * Ensures that there is room for another structure iovec in a connection's
2055 * iov list.
2056 *
2057 * @param c, pointer of the concurrency
2058 *
2059 * @return int, if success, return 0, else return -1
2060 */
2061 static int ms_ensure_iov_space(ms_conn_t *c)
2062 {
2063 assert(c != NULL);
2064
2065 if (c->iovused >= c->iovsize)
2066 {
2067 int i, iovnum;
2068 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2069 ((uint64_t)c->iovsize
2070 * 2)
2071 * sizeof(struct iovec));
2072 if (! new_iov)
2073 return -1;
2074
2075 c->iov= new_iov;
2076 c->iovsize*= 2;
2077
2078 /* Point all the msghdr structures at the new list. */
2079 for (i= 0, iovnum= 0; i < c->msgused; i++)
2080 {
2081 c->msglist[i].msg_iov= &c->iov[iovnum];
2082 iovnum+= (int)c->msglist[i].msg_iovlen;
2083 }
2084 }
2085
2086 return 0;
2087 } /* ms_ensure_iov_space */
2088
2089
2090 /**
2091 * Adds data to the list of pending data that will be written out to a
2092 * connection.
2093 *
2094 * @param c, pointer of the concurrency
2095 * @param buf, the buffer includes data to send
2096 * @param len, the data length in the buffer
2097 *
2098 * @return int, if success, return 0, else return -1
2099 */
2100 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2101 {
2102 struct msghdr *m;
2103 int leftover;
2104 bool limit_to_mtu;
2105
2106 assert(c != NULL);
2107
2108 do
2109 {
2110 m= &c->msglist[c->msgused - 1];
2111
2112 /*
2113 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2114 */
2115 limit_to_mtu= c->udp;
2116
2117 /* We may need to start a new msghdr if this one is full. */
2118 if ((m->msg_iovlen == IOV_MAX)
2119 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2120 {
2121 ms_add_msghdr(c);
2122 m= &c->msglist[c->msgused - 1];
2123 }
2124
2125 if (ms_ensure_iov_space(c) != 0)
2126 return -1;
2127
2128 /* If the fragment is too big to fit in the datagram, split it up */
2129 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2130 {
2131 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2132 len-= leftover;
2133 }
2134 else
2135 {
2136 leftover= 0;
2137 }
2138
2139 m= &c->msglist[c->msgused - 1];
2140 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2141 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2142
2143 c->msgbytes+= len;
2144 c->iovused++;
2145 m->msg_iovlen++;
2146
2147 buf= ((char *)buf) + len;
2148 len= leftover;
2149 }
2150 while (leftover > 0);
2151
2152 return 0;
2153 } /* ms_add_iov */
2154
2155
2156 /**
2157 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2158 *
2159 * @param c, pointer of the concurrency
2160 *
2161 * @return int, if success, return 0, else return -1
2162 */
2163 static int ms_build_udp_headers(ms_conn_t *c)
2164 {
2165 int i;
2166 unsigned char *hdr;
2167
2168 assert(c != NULL);
2169
2170 c->request_id= ms_get_udp_request_id();
2171
2172 if (c->msgused > c->hdrsize)
2173 {
2174 void *new_hdrbuf;
2175 if (c->hdrbuf)
2176 new_hdrbuf= realloc(c->hdrbuf,
2177 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2178 else
2179 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2180 if (! new_hdrbuf)
2181 return -1;
2182
2183 c->hdrbuf= (unsigned char *)new_hdrbuf;
2184 c->hdrsize= c->msgused * 2;
2185 }
2186
2187 /* If this is a multi-packet request, drop it. */
2188 if (c->udp && (c->msgused > 1))
2189 {
2190 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2191 return -1;
2192 }
2193
2194 hdr= c->hdrbuf;
2195 for (i= 0; i < c->msgused; i++)
2196 {
2197 c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
2198 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2199 *hdr++= (unsigned char)(c->request_id / 256);
2200 *hdr++= (unsigned char)(c->request_id % 256);
2201 *hdr++= (unsigned char)(i / 256);
2202 *hdr++= (unsigned char)(i % 256);
2203 *hdr++= (unsigned char)(c->msgused / 256);
2204 *hdr++= (unsigned char)(c->msgused % 256);
2205 *hdr++= (unsigned char)1; /* support facebook memcached */
2206 *hdr++= (unsigned char)0;
2207 assert(hdr ==
2208 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2209 + UDP_HEADER_SIZE));
2210 }
2211
2212 return 0;
2213 } /* ms_build_udp_headers */
2214
2215
2216 /**
2217 * Transmit the next chunk of data from our list of msgbuf structures.
2218 *
2219 * @param c, pointer of the concurrency
2220 *
2221 * @return TRANSMIT_COMPLETE All done writing.
2222 * TRANSMIT_INCOMPLETE More data remaining to write.
2223 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2224 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2225 */
2226 static int ms_transmit(ms_conn_t *c)
2227 {
2228 assert(c != NULL);
2229
2230 if ((c->msgcurr < c->msgused)
2231 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2232 {
2233 /* Finished writing the current msg; advance to the next. */
2234 c->msgcurr++;
2235 }
2236
2237 if (c->msgcurr < c->msgused)
2238 {
2239 ssize_t res;
2240 struct msghdr *m= &c->msglist[c->msgcurr];
2241
2242 res= sendmsg(c->sfd, m, 0);
2243 if (res > 0)
2244 {
2245 atomic_add_64(&ms_stats.bytes_written, res);
2246
2247 /* We've written some of the data. Remove the completed
2248 * iovec entries from the list of pending writes. */
2249 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2250 {
2251 res-= (ssize_t)m->msg_iov->iov_len;
2252 m->msg_iovlen--;
2253 m->msg_iov++;
2254 }
2255
2256 /* Might have written just part of the last iovec entry;
2257 * adjust it so the next write will do the rest. */
2258 if (res > 0)
2259 {
2260 m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
2261 m->msg_iov->iov_len-= (uint64_t)res;
2262 }
2263 return TRANSMIT_INCOMPLETE;
2264 }
2265 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2266 {
2267 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2268 {
2269 fprintf(stderr, "Couldn't update event.\n");
2270 ms_conn_set_state(c, conn_closing);
2271 return TRANSMIT_HARD_ERROR;
2272 }
2273 return TRANSMIT_SOFT_ERROR;
2274 }
2275
2276 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2277 * we have a real error, on which we close the connection */
2278 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2279
2280 ms_conn_set_state(c, conn_closing);
2281 return TRANSMIT_HARD_ERROR;
2282 }
2283 else
2284 {
2285 return TRANSMIT_COMPLETE;
2286 }
2287 } /* ms_transmit */
2288
2289
2290 /**
2291 * Shrinks a connection's buffers if they're too big. This prevents
2292 * periodic large "mget" response from server chewing lots of client
2293 * memory.
2294 *
2295 * This should only be called in between requests since it can wipe output
2296 * buffers!
2297 *
2298 * @param c, pointer of the concurrency
2299 */
2300 static void ms_conn_shrink(ms_conn_t *c)
2301 {
2302 assert(c != NULL);
2303
2304 if (c->udp)
2305 return;
2306
2307 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2308 {
2309 char *newbuf;
2310
2311 if (c->rcurr != c->rbuf)
2312 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2313
2314 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2315
2316 if (newbuf)
2317 {
2318 c->rbuf= newbuf;
2319 c->rsize= DATA_BUFFER_SIZE;
2320 }
2321 c->rcurr= c->rbuf;
2322 }
2323
2324 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2325 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2326 {
2327 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2328 if (! new_rbuf)
2329 {
2330 c->rudpbuf= new_rbuf;
2331 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2332 }
2333 /* TODO check error condition? */
2334 }
2335
2336 if (c->msgsize > MSG_LIST_HIGHWAT)
2337 {
2338 struct msghdr *newbuf= (struct msghdr *)realloc(
2339 (void *)c->msglist,
2340 MSG_LIST_INITIAL
2341 * sizeof(c->msglist[0]));
2342 if (newbuf)
2343 {
2344 c->msglist= newbuf;
2345 c->msgsize= MSG_LIST_INITIAL;
2346 }
2347 /* TODO check error condition? */
2348 }
2349
2350 if (c->iovsize > IOV_LIST_HIGHWAT)
2351 {
2352 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2353 IOV_LIST_INITIAL
2354 * sizeof(c->iov[0]));
2355 if (newbuf)
2356 {
2357 c->iov= newbuf;
2358 c->iovsize= IOV_LIST_INITIAL;
2359 }
2360 /* TODO check return value */
2361 }
2362 } /* ms_conn_shrink */
2363
2364
2365 /**
2366 * Sets a connection's current state in the state machine. Any special
2367 * processing that needs to happen on certain state transitions can
2368 * happen here.
2369 *
2370 * @param c, pointer of the concurrency
2371 * @param state, connection state
2372 */
2373 static void ms_conn_set_state(ms_conn_t *c, int state)
2374 {
2375 assert(c != NULL);
2376
2377 if (state != c->state)
2378 {
2379 if (state == conn_read)
2380 {
2381 ms_conn_shrink(c);
2382 }
2383 c->state= state;
2384 }
2385 } /* ms_conn_set_state */
2386
2387
2388 /**
2389 * update the event if socks change state. for example: when
2390 * change the listen scoket read event to sock write event, or
2391 * change socket handler, we could call this function.
2392 *
2393 * @param c, pointer of the concurrency
2394 * @param new_flags, new event flags
2395 *
2396 * @return bool, if success, return true, else return false
2397 */
2398 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2399 {
2400 /* default event timeout 10 seconds */
2401 struct timeval t=
2402 {
2403 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
2404 };
2405
2406 assert(c != NULL);
2407
2408 struct event_base *base= c->event.ev_base;
2409 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2410 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2411 {
2412 return true;
2413 }
2414
2415 if (event_del(&c->event) == -1)
2416 {
2417 /* try to delete the event again */
2418 if (event_del(&c->event) == -1)
2419 {
2420 return false;
2421 }
2422 }
2423
2424 event_set(&c->event,
2425 c->sfd,
2426 (short)new_flags,
2427 ms_event_handler,
2428 (void *)c);
2429 event_base_set(base, &c->event);
2430 c->ev_flags= (short)new_flags;
2431
2432 if (c->total_sfds == 1)
2433 {
2434 if (event_add(&c->event, NULL) == -1)
2435 {
2436 return false;
2437 }
2438 }
2439 else
2440 {
2441 if (event_add(&c->event, &t) == -1)
2442 {
2443 return false;
2444 }
2445 }
2446
2447 return true;
2448 } /* ms_update_event */
2449
2450
2451 /**
2452 * If user want to get the expected throughput, we could limit
2453 * the performance of memslap. we could give up some work and
2454 * just wait a short time. The function is used to check this
2455 * case.
2456 *
2457 * @param c, pointer of the concurrency
2458 *
2459 * @return bool, if success, return true, else return false
2460 */
2461 static bool ms_need_yield(ms_conn_t *c)
2462 {
2463 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
2464 int64_t tps= 0;
2465 int64_t time_diff= 0;
2466 struct timeval curr_time;
2467 ms_task_t *task= &c->curr_task;
2468
2469 if (ms_setting.expected_tps > 0)
2470 {
2471 gettimeofday(&curr_time, NULL);
2472 time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
2473 tps=
2474 (int64_t)((task->get_opt
2475 + task->set_opt) / ((uint64_t)time_diff / 1000000));
2476
2477 /* current throughput is greater than expected throughput */
2478 if (tps > ms_thread->thread_ctx->tps_perconn)
2479 {
2480 return true;
2481 }
2482 }
2483
2484 return false;
2485 } /* ms_need_yield */
2486
2487
2488 /**
2489 * used to update the start time of each operation
2490 *
2491 * @param c, pointer of the concurrency
2492 */
2493 static void ms_update_start_time(ms_conn_t *c)
2494 {
2495 ms_task_item_t *item= c->curr_task.item;
2496
2497 if ((ms_setting.stat_freq > 0) || c->udp
2498 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2499 {
2500 gettimeofday(&c->start_time, NULL);
2501 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2502 {
2503 /* record the current time */
2504 item->client_time= c->start_time.tv_sec;
2505 }
2506 }
2507 } /* ms_update_start_time */
2508
2509
/**
 * run the state machine
 *
 * Loops on c->state until a branch sets `stop`. A plain `break` out of
 * the state switch without setting `stop` re-evaluates the (possibly
 * new) state immediately.
 *
 *   conn_read    - consume buffered or network response data; when the
 *                  current command is finished, switch to conn_write and
 *                  copy currcmd into precmd.
 *   conn_write   - throttle when ms_need_yield() reports the connection
 *                  is above its expected throughput, build the next task
 *                  with ms_exec_task(), then send it with ms_transmit().
 *   conn_closing - reconnect when ms_setting.reconnect allows it,
 *                  otherwise close the connection and stop.
 *
 * @param c, pointer of the concurrency
 */
static void ms_drive_machine(ms_conn_t *c)
{
  bool stop= false;

  assert(c != NULL);

  while (! stop)
  {
    switch (c->state)
    {
    case conn_read:
      if (c->readval)
      {
        /* a value is being read: complete it once rvbytes are buffered */
        if (c->rbytes >= c->rvbytes)
        {
          ms_complete_nread(c);
          break;
        }
      }
      else
      {
        /* non-zero return: loop again without touching the network */
        if (ms_try_read_line(c) != 0)
        {
          break;
        }
      }

      /* non-zero return: loop again to process what was read */
      if (ms_try_read_network(c) != 0)
      {
        break;
      }

      /* doesn't read all the response data, wait event wake up */
      if (! c->currcmd.isfinish)
      {
        if (! ms_update_event(c, EV_READ | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      /* we have no command line and no data to read from network, next write */
      ms_conn_set_state(c, conn_write);
      memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */

      break;

    case conn_write:
      /* not in the middle of a partial write and above the expected
       * throughput: back off and wait for the next write event */
      if (! c->ctnwrite && ms_need_yield(c))
      {
        usleep(10);

        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      /* start a new task unless a previous write is being continued */
      if (! c->ctnwrite && (ms_exec_task(c) != 0))
      {
        ms_conn_set_state(c, conn_closing);
        break;
      }

      /* record the start time before starting to send data if necessary */
      if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
      {
        if (c->change_sfd)
        {
          c->change_sfd= false;
        }
        ms_update_start_time(c);
      }

      /* change sfd if necessary */
      /* NOTE(review): the block above always clears change_sfd when it is
       * set, so this branch appears unreachable -- confirm intent */
      if (c->change_sfd)
      {
        c->ctnwrite= true;
        stop= true;
        break;
      }

      /* execute task until nothing need be written to network */
      if (! c->ctnwrite && (c->msgcurr == c->msgused))
      {
        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      switch (ms_transmit(c))
      {
      case TRANSMIT_COMPLETE:
        /* we have no data to write to network, next wait repose */
        if (! ms_update_event(c, EV_READ | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          c->ctnwrite= false;
          break;
        }
        ms_conn_set_state(c, conn_read);
        c->ctnwrite= false;
        stop= true;
        break;

      case TRANSMIT_INCOMPLETE:
        c->ctnwrite= true;
        break; /* Continue in state machine. */

      case TRANSMIT_HARD_ERROR:
        /* state stays conn_write, so the next iteration starts a new task */
        c->ctnwrite= false;
        break;

      case TRANSMIT_SOFT_ERROR:
        /* keep the partial write and resume on the next event */
        c->ctnwrite= true;
        stop= true;
        break;

      default:
        break;
      } /* switch */

      break;

    case conn_closing:
      /* recovery mode, need reconnect if connection close */
      if (ms_setting.reconnect && (! ms_global.time_out
                                   || ((ms_setting.run_time == 0)
                                       && (c->remain_exec_num > 0))))
      {
        if (ms_reconn(c) != 0)
        {
          ms_conn_close(c);
          stop= true;
          break;
        }

        ms_reset_conn(c, false);

        if (c->total_sfds == 1)
        {
          if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
          {
            fprintf(stderr, "Couldn't update event.\n");
            ms_conn_set_state(c, conn_closing);
            break;
          }
        }

        break;
      }
      else
      {
        ms_conn_close(c);
        stop= true;
        break;
      }

    default:
      /* NOTE(review): with NDEBUG this assert is compiled out and the
       * loop would spin forever on an unknown state */
      assert(0);
    } /* switch */
  }
} /* ms_drive_machine */
2692
2693
2694 /**
2695 * the event handler of each thread
2696 *
2697 * @param fd, the file descriptor of socket
2698 * @param which, event flag
2699 * @param arg, argument
2700 */
2701 void ms_event_handler(const int fd, const short which, void *arg)
2702 {
2703 ms_conn_t *c= (ms_conn_t *)arg;
2704
2705 assert(c != NULL);
2706
2707 c->which= which;
2708
2709 /* sanity */
2710 if (fd != c->sfd)
2711 {
2712 fprintf(stderr,
2713 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2714 fd,
2715 c->sfd);
2716 ms_conn_close(c);
2717 exit(1);
2718 }
2719 assert(fd == c->sfd);
2720
2721 /* event timeout, close the current connection */
2722 if (c->which == EV_TIMEOUT)
2723 {
2724 ms_conn_set_state(c, conn_closing);
2725 }
2726
2727 ms_drive_machine(c);
2728
2729 /* wait for next event */
2730 } /* ms_event_handler */
2731
2732
2733 /**
2734 * get the next socket descriptor index to run for replication
2735 *
2736 * @param c, pointer of the concurrency
2737 * @param cmd, command(get or set )
2738 *
2739 * @return int, if success, return the index, else return 0
2740 */
2741 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2742 {
2743 int sock_index= -1;
2744 int i= 0;
2745
2746 if (c->total_sfds == 1)
2747 {
2748 return 0;
2749 }
2750
2751 if (ms_setting.rep_write_srv == 0)
2752 {
2753 return sock_index;
2754 }
2755
2756 do
2757 {
2758 if (cmd == CMD_SET)
2759 {
2760 for (i= 0; i < ms_setting.rep_write_srv; i++)
2761 {
2762 if (c->tcpsfd[i] > 0)
2763 {
2764 break;
2765 }
2766 }
2767
2768 if (i == ms_setting.rep_write_srv)
2769 {
2770 /* random get one replication server to read */
2771 sock_index= (int)(random() % c->total_sfds);
2772 }
2773 else
2774 {
2775 /* random get one replication writing server to write */
2776 sock_index= (int)(random() % ms_setting.rep_write_srv);
2777 }
2778 }
2779 else if (cmd == CMD_GET)
2780 {
2781 /* random get one replication server to read */
2782 sock_index= (int)(random() % c->total_sfds);
2783 }
2784 }
2785 while (c->tcpsfd[sock_index] == 0);
2786
2787 return sock_index;
2788 } /* ms_get_rep_sock_index */
2789
2790
2791 /**
2792 * get the next socket descriptor index to run
2793 *
2794 * @param c, pointer of the concurrency
2795 *
2796 * @return int, return the index
2797 */
2798 static int ms_get_next_sock_index(ms_conn_t *c)
2799 {
2800 int sock_index= 0;
2801
2802 do
2803 {
2804 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2805 }
2806 while (c->tcpsfd[sock_index] == 0);
2807
2808 return sock_index;
2809 } /* ms_get_next_sock_index */
2810
2811
2812 /**
2813 * update socket event of the connections
2814 *
2815 * @param c, pointer of the concurrency
2816 *
2817 * @return int, if success, return 0, else return -1
2818 */
2819 static int ms_update_conn_sock_event(ms_conn_t *c)
2820 {
2821 assert(c != NULL);
2822
2823 switch (c->currcmd.cmd)
2824 {
2825 case CMD_SET:
2826 if (ms_setting.facebook_test && c->udp)
2827 {
2828 c->sfd= c->tcpsfd[0];
2829 c->udp= false;
2830 c->change_sfd= true;
2831 }
2832 break;
2833
2834 case CMD_GET:
2835 if (ms_setting.facebook_test && ! c->udp)
2836 {
2837 c->sfd= c->udpsfd;
2838 c->udp= true;
2839 c->change_sfd= true;
2840 }
2841 break;
2842
2843 default:
2844 break;
2845 } /* switch */
2846
2847 if (! c->udp && (c->total_sfds > 1))
2848 {
2849 if (c->cur_idx != c->total_sfds)
2850 {
2851 if (ms_setting.rep_write_srv == 0)
2852 {
2853 c->cur_idx= ms_get_next_sock_index(c);
2854 }
2855 else
2856 {
2857 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2858 }
2859 }
2860 else
2861 {
2862 /* must select the first sock of the connection at the beginning */
2863 c->cur_idx= 0;
2864 }
2865
2866 c->sfd= c->tcpsfd[c->cur_idx];
2867 assert(c->sfd != 0);
2868 c->change_sfd= true;
2869 }
2870
2871 if (c->change_sfd)
2872 {
2873 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2874 {
2875 fprintf(stderr, "Couldn't update event.\n");
2876 ms_conn_set_state(c, conn_closing);
2877 return -1;
2878 }
2879 }
2880
2881 return 0;
2882 } /* ms_update_conn_sock_event */
2883
2884
2885 /**
2886 * for ASCII protocol, this function build the set command
2887 * string and send the command.
2888 *
2889 * @param c, pointer of the concurrency
2890 * @param item, pointer of task item which includes the object
2891 * information
2892 *
2893 * @return int, if success, return 0, else return -1
2894 */
2895 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2896 {
2897 int value_offset;
2898 int write_len;
2899 char *buffer= c->wbuf;
2900
2901 write_len= sprintf(buffer,
2902 " %u %d %d\r\n",
2903 0,
2904 item->exp_time,
2905 item->value_size);
2906
2907 if (write_len > c->wsize)
2908 {
2909 /* ought to be always enough. just fail for simplicity */
2910 fprintf(stderr, "output command line too long.\n");
2911 return -1;
2912 }
2913
2914 if (item->value_offset == INVALID_OFFSET)
2915 {
2916 value_offset= item->key_suffix_offset;
2917 }
2918 else
2919 {
2920 value_offset= item->value_offset;
2921 }
2922
2923 if ((ms_add_iov(c, "set ", 4) != 0)
2924 || (ms_add_iov(c, (char *)&item->key_prefix,
2925 (int)KEY_PREFIX_SIZE) != 0)
2926 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2927 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2928 || (ms_add_iov(c, buffer, write_len) != 0)
2929 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2930 item->value_size) != 0)
2931 || (ms_add_iov(c, "\r\n", 2) != 0)
2932 || (c->udp && (ms_build_udp_headers(c) != 0)))
2933 {
2934 return -1;
2935 }
2936
2937 return 0;
2938 } /* ms_build_ascii_write_buf_set */
2939
2940
2941 /**
2942 * used to send set command to server
2943 *
2944 * @param c, pointer of the concurrency
2945 * @param item, pointer of task item which includes the object
2946 * information
2947 *
2948 * @return int, if success, return 0, else return -1
2949 */
2950 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2951 {
2952 assert(c != NULL);
2953
2954 c->currcmd.cmd= CMD_SET;
2955 c->currcmd.isfinish= false;
2956 c->currcmd.retstat= MCD_FAILURE;
2957
2958 if (ms_update_conn_sock_event(c) != 0)
2959 {
2960 return -1;
2961 }
2962
2963 c->msgcurr= 0;
2964 c->msgused= 0;
2965 c->iovused= 0;
2966 if (ms_add_msghdr(c) != 0)
2967 {
2968 fprintf(stderr, "Out of memory preparing request.");
2969 return -1;
2970 }
2971
2972 /* binary protocol */
2973 if (c->protocol == binary_prot)
2974 {
2975 if (ms_build_bin_write_buf_set(c, item) != 0)
2976 {
2977 return -1;
2978 }
2979 }
2980 else
2981 {
2982 if (ms_build_ascii_write_buf_set(c, item) != 0)
2983 {
2984 return -1;
2985 }
2986 }
2987
2988 atomic_add_64(&ms_stats.obj_bytes,
2989 item->key_size + item->value_size);
2990 atomic_add_64(&ms_stats.cmd_set, 1);
2991
2992 return 0;
2993 } /* ms_mcd_set */
2994
2995
2996 /**
2997 * for ASCII protocol, this function build the get command
2998 * string and send the command.
2999 *
3000 * @param c, pointer of the concurrency
3001 * @param item, pointer of task item which includes the object
3002 * information
3003 *
3004 * @return int, if success, return 0, else return -1
3005 */
3006 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3007 {
3008 if ((ms_add_iov(c, "get ", 4) != 0)
3009 || (ms_add_iov(c, (char *)&item->key_prefix,
3010 (int)KEY_PREFIX_SIZE) != 0)
3011 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3012 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3013 || (ms_add_iov(c, "\r\n", 2) != 0)
3014 || (c->udp && (ms_build_udp_headers(c) != 0)))
3015 {
3016 return -1;
3017 }
3018
3019 return 0;
3020 } /* ms_build_ascii_write_buf_get */
3021
3022
3023 /**
3024 * used to send the get command to server
3025 *
3026 * @param c, pointer of the concurrency
3027 * @param item, pointer of task item which includes the object
3028 * information
3029 * @param verify, whether do verification
3030 *
3031 * @return int, if success, return 0, else return -1
3032 */
3033 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
3034 {
3035 /* verify not supported yet */
3036 UNUSED_ARGUMENT(verify);
3037
3038 assert(c != NULL);
3039
3040 c->currcmd.cmd= CMD_GET;
3041 c->currcmd.isfinish= false;
3042 c->currcmd.retstat= MCD_FAILURE;
3043
3044 if (ms_update_conn_sock_event(c) != 0)
3045 {
3046 return -1;
3047 }
3048
3049 c->msgcurr= 0;
3050 c->msgused= 0;
3051 c->iovused= 0;
3052 if (ms_add_msghdr(c) != 0)
3053 {
3054 fprintf(stderr, "Out of memory preparing request.");
3055 return -1;
3056 }
3057
3058 /* binary protocol */
3059 if (c->protocol == binary_prot)
3060 {
3061 if (ms_build_bin_write_buf_get(c, item) != 0)
3062 {
3063 return -1;
3064 }
3065 }
3066 else
3067 {
3068 if (ms_build_ascii_write_buf_get(c, item) != 0)
3069 {
3070 return -1;
3071 }
3072 }
3073
3074 atomic_add_64(&ms_stats.cmd_get, 1);
3075
3076 return 0;
3077 } /* ms_mcd_get */
3078
3079
3080 /**
3081 * for ASCII protocol, this function build the multi-get command
3082 * string and send the command.
3083 *
3084 * @param c, pointer of the concurrency
3085 *
3086 * @return int, if success, return 0, else return -1
3087 */
3088 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3089 {
3090 ms_task_item_t *item;
3091
3092 if (ms_add_iov(c, "get", 3) != 0)
3093 {
3094 return -1;
3095 }
3096
3097 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3098 {
3099 item= c->mlget_task.mlget_item[i].item;
3100 assert(item != NULL);
3101 if ((ms_add_iov(c, " ", 1) != 0)
3102 || (ms_add_iov(c, (char *)&item->key_prefix,
3103 (int)KEY_PREFIX_SIZE) != 0)
3104 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3105 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3106 {
3107 return -1;
3108 }
3109 }
3110
3111 if ((ms_add_iov(c, "\r\n", 2) != 0)
3112 || (c->udp && (ms_build_udp_headers(c) != 0)))
3113 {
3114 return -1;
3115 }
3116
3117 return 0;
3118 } /* ms_build_ascii_write_buf_mlget */
3119
3120
3121 /**
3122 * used to send the multi-get command to server
3123 *
3124 * @param c, pointer of the concurrency
3125 *
3126 * @return int, if success, return 0, else return -1
3127 */
3128 int ms_mcd_mlget(ms_conn_t *c)
3129 {
3130 ms_task_item_t *item;
3131
3132 assert(c != NULL);
3133 assert(c->mlget_task.mlget_num >= 1);
3134
3135 c->currcmd.cmd= CMD_GET;
3136 c->currcmd.isfinish= false;
3137 c->currcmd.retstat= MCD_FAILURE;
3138
3139 if (ms_update_conn_sock_event(c) != 0)
3140 {
3141 return -1;
3142 }
3143
3144 c->msgcurr= 0;
3145 c->msgused= 0;
3146 c->iovused= 0;
3147 if (ms_add_msghdr(c) != 0)
3148 {
3149 fprintf(stderr, "Out of memory preparing request.");
3150 return -1;
3151 }
3152
3153 /* binary protocol */
3154 if (c->protocol == binary_prot)
3155 {
3156 if (ms_build_bin_write_buf_mlget(c) != 0)
3157 {
3158 return -1;
3159 }
3160 }
3161 else
3162 {
3163 if (ms_build_ascii_write_buf_mlget(c) != 0)
3164 {
3165 return -1;
3166 }
3167 }
3168
3169 /* decrease operation time of each item */
3170 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3171 {
3172 item= c->mlget_task.mlget_item[i].item;
3173 atomic_add_64(&ms_stats.cmd_get, 1);
3174 }
3175
3176 return 0;
3177 } /* ms_mcd_mlget */
3178
3179
3180 /**
3181 * binary protocol support
3182 */
3183
3184 /**
3185 * for binary protocol, parse the response of server
3186 *
3187 * @param c, pointer of the concurrency
3188 *
3189 * @return int, if success, return 0, else return -1
3190 */
3191 static int ms_bin_process_response(ms_conn_t *c)
3192 {
3193 const char *errstr= NULL;
3194
3195 assert(c != NULL);
3196
3197 uint32_t bodylen= c->binary_header.response.bodylen;
3198 uint8_t opcode= c->binary_header.response.opcode;
3199 uint16_t status= c->binary_header.response.status;
3200
3201 if (bodylen > 0)
3202 {
3203 c->rvbytes= (int32_t)bodylen;
3204 c->readval= true;
3205 return 1;
3206 }
3207 else
3208 {
3209 switch (status)
3210 {
3211 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3212 if (opcode == PROTOCOL_BINARY_CMD_SET)
3213 {
3214 c->currcmd.retstat= MCD_STORED;
3215 }
3216 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3217 {
3218 c->currcmd.retstat= MCD_DELETED;
3219 }
3220 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3221 {
3222 c->currcmd.retstat= MCD_END;
3223 }
3224 break;
3225
3226 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3227 errstr= "Out of memory";
3228 c->currcmd.retstat= MCD_SERVER_ERROR;
3229 break;
3230
3231 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3232 errstr= "Unknown command";
3233 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3234 break;
3235
3236 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3237 errstr= "Not found";
3238 c->currcmd.retstat= MCD_NOTFOUND;
3239 break;
3240
3241 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3242 errstr= "Invalid arguments";
3243 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3244 break;
3245
3246 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3247 errstr= "Data exists for key.";
3248 break;
3249
3250 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3251 errstr= "Too large.";
3252 c->currcmd.retstat= MCD_SERVER_ERROR;
3253 break;
3254
3255 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3256 errstr= "Not stored.";
3257 c->currcmd.retstat= MCD_NOTSTORED;
3258 break;
3259
3260 default:
3261 errstr= "Unknown error";
3262 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3263 break;
3264 } /* switch */
3265
3266 if (errstr != NULL)
3267 {
3268 fprintf(stderr, "%s\n", errstr);
3269 }
3270 }
3271
3272 return 0;
3273 } /* ms_bin_process_response */
3274
3275
3276 /* build binary header and add the header to the buffer to send */
3277
3278 /**
3279 * build binary header and add the header to the buffer to send
3280 *
3281 * @param c, pointer of the concurrency
3282 * @param opcode, operation code
3283 * @param hdr_len, length of header
3284 * @param key_len, length of key
3285 * @param body_len. length of body
3286 */
3287 static void ms_add_bin_header(ms_conn_t *c,
3288 uint8_t opcode,
3289 uint8_t hdr_len,
3290 uint16_t key_len,
3291 uint32_t body_len)
3292 {
3293 protocol_binary_request_header *header;
3294
3295 assert(c != NULL);
3296
3297 header= (protocol_binary_request_header *)c->wcurr;
3298
3299 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3300 header->request.opcode= (uint8_t)opcode;
3301 header->request.keylen= htonl(key_len);
3302
3303 header->request.extlen= (uint8_t)hdr_len;
3304 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3305 header->request.reserved= 0;
3306
3307 header->request.bodylen= htonl(body_len);
3308 header->request.opaque= 0;
3309 header->request.cas= 0;
3310
3311 ms_add_iov(c, c->wcurr, sizeof(header->request));
3312 } /* ms_add_bin_header */
3313
3314
3315 /**
3316 * add the key to the socket write buffer array
3317 *
3318 * @param c, pointer of the concurrency
3319 * @param item, pointer of task item which includes the object
3320 * information
3321 */
3322 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3323 {
3324 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3325 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3326 item->key_size - (int)KEY_PREFIX_SIZE);
3327 }
3328
3329
3330 /**
3331 * for binary protocol, this function build the set command
3332 * and add the command to send buffer array.
3333 *
3334 * @param c, pointer of the concurrency
3335 * @param item, pointer of task item which includes the object
3336 * information
3337 *
3338 * @return int, if success, return 0, else return -1
3339 */
3340 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3341 {
3342 assert(c->wbuf == c->wcurr);
3343
3344 int value_offset;
3345 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3346 uint16_t keylen= (uint16_t)item->key_size;
3347 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3348 + (uint32_t)keylen + (uint32_t)item->value_size;
3349
3350 ms_add_bin_header(c,
3351 PROTOCOL_BINARY_CMD_SET,
3352 sizeof(rep->message.body),
3353 keylen,
3354 bodylen);
3355 rep->message.body.flags= 0;
3356 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3357 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3358 ms_add_key_to_iov(c, item);
3359
3360 if (item->value_offset == INVALID_OFFSET)
3361 {
3362 value_offset= item->key_suffix_offset;
3363 }
3364 else
3365 {
3366 value_offset= item->value_offset;
3367 }
3368 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3369
3370 return 0;
3371 } /* ms_build_bin_write_buf_set */
3372
3373
3374 /**
3375 * for binary protocol, this function build the get command and
3376 * add the command to send buffer array.
3377 *
3378 * @param c, pointer of the concurrency
3379 * @param item, pointer of task item which includes the object
3380 * information
3381 *
3382 * @return int, if success, return 0, else return -1
3383 */
3384 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3385 {
3386 assert(c->wbuf == c->wcurr);
3387
3388 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3389 (uint32_t)item->key_size);
3390 ms_add_key_to_iov(c, item);
3391
3392 return 0;
3393 } /* ms_build_bin_write_buf_get */
3394
3395
3396 /**
3397 * for binary protocol, this function build the multi-get
3398 * command and add the command to send buffer array.
3399 *
3400 * @param c, pointer of the concurrency
3401 * @param item, pointer of task item which includes the object
3402 * information
3403 *
3404 * @return int, if success, return 0, else return -1
3405 */
3406 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3407 {
3408 ms_task_item_t *item;
3409
3410 assert(c->wbuf == c->wcurr);
3411
3412 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3413 {
3414 item= c->mlget_task.mlget_item[i].item;
3415 assert(item != NULL);
3416
3417 ms_add_bin_header(c,
3418 PROTOCOL_BINARY_CMD_GET,
3419 0,
3420 (uint16_t)item->key_size,
3421 (uint32_t)item->key_size);
3422 ms_add_key_to_iov(c, item);
3423 c->wcurr+= sizeof(protocol_binary_request_get);
3424 }
3425
3426 c->wcurr= c->wbuf;
3427
3428 return 0;
3429 } /* ms_build_bin_write_buf_mlget */