.gitignore: cleanup
[awesomized/libmemcached] / src / bin / memaslap / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "mem_config.h"
13
14 #include <stdio.h>
15 #include <inttypes.h>
16 #include <limits.h>
17 #include <sys/uio.h>
18 #include <event.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <netinet/in.h>
22
23 #if defined(HAVE_ARPA_INET_H)
24 # include <arpa/inet.h>
25 #endif
26
27 #if defined(HAVE_SYS_TIME_H)
28 # include <sys/time.h>
29 #endif
30
31 #if defined(HAVE_TIME_H)
32 # include <time.h>
33 #endif
34
35 #include "ms_setting.h"
36 #include "ms_thread.h"
37 #include "ms_atomic.h"
38
39 #ifdef linux
40 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
41 * optimize the conversion functions, but the prototypes generate warnings
42 * from gcc. The conversion methods isn't the bottleneck for my app, so
43 * just remove the warnings by undef'ing the optimization ..
44 */
45 #undef ntohs
46 #undef ntohl
47 #undef htons
48 #undef htonl
49 #endif
50
/* status codes for a network write */
enum
{
  TRANSMIT_COMPLETE= 0,
  TRANSMIT_INCOMPLETE= 1,
  TRANSMIT_SOFT_ERROR= 2,
  TRANSMIT_HARD_ERROR= 3
};

/*
 * Key prefix generation. ms_get_key_prefix() ORs KEY_PREFIX_MASK into the
 * counter, which sets bit 4 (0x10) in every byte; ' ' (0x20), '\r' (0x0d),
 * '\n' (0x0a) and '\0' all have that bit clear, so no prefix byte can be one
 * of them.
 */
#define KEY_PREFIX_BASE 0x1010101010101010
#define KEY_PREFIX_MASK 0x1010101010101010

/* token positions inside a "VALUE <key> <flags> <bytes>" server response */
enum
{
  KEY_TOKEN= 1,
  VALUELEN_TOKEN= 3
};
64
65 /* global increasing counter, to ensure the key prefix unique */
66 static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
67
68 /* global increasing counter, generating request id for UDP */
69 static ATOMIC uint32_t udp_request_id= 0;
70
71 extern pthread_key_t ms_thread_key;
72
73 /* generate upd request id */
74 static uint32_t ms_get_udp_request_id(void);
75
76
77 /* connect initialize */
78 static void ms_task_init(ms_conn_t *c);
79 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
80 static int ms_conn_sock_init(ms_conn_t *c);
81 static int ms_conn_event_init(ms_conn_t *c);
82 static int ms_conn_init(ms_conn_t *c,
83 const int init_state,
84 const int read_buffer_size,
85 const bool is_udp);
86 static void ms_warmup_num_init(ms_conn_t *c);
87 static int ms_item_win_init(ms_conn_t *c);
88
89
90 /* connection close */
91 void ms_conn_free(ms_conn_t *c);
92 static void ms_conn_close(ms_conn_t *c);
93
94
95 /* create network connection */
96 static int ms_new_socket(struct addrinfo *ai);
97 static void ms_maximize_sndbuf(const int sfd);
98 static int ms_network_connect(ms_conn_t *c,
99 char *srv_host_name,
100 const int srv_port,
101 const bool is_udp,
102 int *ret_sfd);
103 static int ms_reconn(ms_conn_t *c);
104
105
106 /* read and parse */
107 static int ms_tokenize_command(char *command,
108 token_t *tokens,
109 const int max_tokens);
110 static int ms_ascii_process_line(ms_conn_t *c, char *command);
111 static int ms_try_read_line(ms_conn_t *c);
112 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
113 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
114 static int ms_try_read_network(ms_conn_t *c);
115 static void ms_verify_value(ms_conn_t *c,
116 ms_mlget_task_item_t *mlget_item,
117 char *value,
118 int vlen);
119 static void ms_ascii_complete_nread(ms_conn_t *c);
120 static void ms_bin_complete_nread(ms_conn_t *c);
121 static void ms_complete_nread(ms_conn_t *c);
122
123
124 /* send functions */
125 static int ms_add_msghdr(ms_conn_t *c);
126 static int ms_ensure_iov_space(ms_conn_t *c);
127 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
128 static int ms_build_udp_headers(ms_conn_t *c);
129 static int ms_transmit(ms_conn_t *c);
130
131
132 /* status adjustment */
133 static void ms_conn_shrink(ms_conn_t *c);
134 static void ms_conn_set_state(ms_conn_t *c, int state);
135 static bool ms_update_event(ms_conn_t *c, const int new_flags);
136 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd);
137 static uint32_t ms_get_next_sock_index(ms_conn_t *c);
138 static int ms_update_conn_sock_event(ms_conn_t *c);
139 static bool ms_need_yield(ms_conn_t *c);
140 static void ms_update_start_time(ms_conn_t *c);
141
142
143 /* main loop */
144 static void ms_drive_machine(ms_conn_t *c);
145 void ms_event_handler(const int fd, const short which, void *arg);
146
147
148 /* ascii protocol */
149 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
150 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
151 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
152
153
154 /* binary protocol */
155 static int ms_bin_process_response(ms_conn_t *c);
156 static void ms_add_bin_header(ms_conn_t *c,
157 uint8_t opcode,
158 uint8_t hdr_len,
159 uint16_t key_len,
160 uint32_t body_len);
161 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
162 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
163 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
164 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
165
166
167 /**
168 * each key has two parts, prefix and suffix. The suffix is a
169 * string random get form the character table. The prefix is a
170 * uint64_t variable. And the prefix must be unique. we use the
171 * prefix to identify a key. And the prefix can't include
172 * character ' ' '\r' '\n' '\0'.
173 *
174 * @return uint64_t
175 */
176 uint64_t ms_get_key_prefix(void)
177 {
178 uint64_t key_prefix;
179
180 pthread_mutex_lock(&ms_global.seq_mutex);
181 key_prefix_seq|= KEY_PREFIX_MASK;
182 key_prefix= key_prefix_seq;
183 key_prefix_seq++;
184 pthread_mutex_unlock(&ms_global.seq_mutex);
185
186 return key_prefix;
187 } /* ms_get_key_prefix */
188
189
190 /**
191 * get an unique udp request id
192 *
193 * @return an unique UDP request id
194 */
195 static uint32_t ms_get_udp_request_id(void)
196 {
197 return atomic_add_32_nv(&udp_request_id, 1);
198 }
199
200
201 /**
202 * initialize current task structure
203 *
204 * @param c, pointer of the concurrency
205 */
206 static void ms_task_init(ms_conn_t *c)
207 {
208 c->curr_task.cmd= CMD_NULL;
209 c->curr_task.item= 0;
210 c->curr_task.verify= false;
211 c->curr_task.finish_verify= true;
212 c->curr_task.get_miss= true;
213
214 c->curr_task.get_opt= 0;
215 c->curr_task.set_opt= 0;
216 c->curr_task.cycle_undo_get= 0;
217 c->curr_task.cycle_undo_set= 0;
218 c->curr_task.verified_get= 0;
219 c->curr_task.overwrite_set= 0;
220 } /* ms_task_init */
221
222
223 /**
224 * initialize udp for the connection structure
225 *
226 * @param c, pointer of the concurrency
227 * @param is_udp, whether it's udp
228 *
229 * @return int, if success, return EXIT_SUCCESS, else return -1
230 */
231 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
232 {
233 c->hdrbuf= 0;
234 c->rudpbuf= 0;
235 c->udppkt= 0;
236
237 c->rudpsize= UDP_DATA_BUFFER_SIZE;
238 c->hdrsize= 0;
239
240 c->rudpbytes= 0;
241 c->packets= 0;
242 c->recvpkt= 0;
243 c->pktcurr= 0;
244 c->ordcurr= 0;
245
246 c->udp= is_udp;
247
248 if (c->udp || (! c->udp && ms_setting.facebook_test))
249 {
250 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
251 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
252
253 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
254 {
255 if (c->rudpbuf != NULL)
256 free(c->rudpbuf);
257 if (c->udppkt != NULL)
258 free(c->udppkt);
259 fprintf(stderr, "malloc()\n");
260 return -1;
261 }
262 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
263 }
264
265 return EXIT_SUCCESS;
266 } /* ms_conn_udp_init */
267
268
269 /**
270 * initialize the connection structure
271 *
272 * @param c, pointer of the concurrency
273 * @param init_state, (conn_read, conn_write, conn_closing)
274 * @param read_buffer_size
275 * @param is_udp, whether it's udp
276 *
277 * @return int, if success, return EXIT_SUCCESS, else return -1
278 */
279 static int ms_conn_init(ms_conn_t *c,
280 const int init_state,
281 const int read_buffer_size,
282 const bool is_udp)
283 {
284 assert(c != NULL);
285
286 c->rbuf= c->wbuf= 0;
287 c->iov= 0;
288 c->msglist= 0;
289
290 c->rsize= read_buffer_size;
291 c->wsize= WRITE_BUFFER_SIZE;
292 c->iovsize= IOV_LIST_INITIAL;
293 c->msgsize= MSG_LIST_INITIAL;
294
295 /* for replication, each connection need connect all the server */
296 if (ms_setting.rep_write_srv > 0)
297 {
298 c->total_sfds= ms_setting.srv_cnt * ms_setting.sock_per_conn;
299 }
300 else
301 {
302 c->total_sfds= ms_setting.sock_per_conn;
303 }
304 c->alive_sfds= 0;
305
306 c->rbuf= (char *)malloc((size_t)c->rsize);
307 c->wbuf= (char *)malloc((size_t)c->wsize);
308 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
309 c->msglist= (struct msghdr *)malloc(
310 sizeof(struct msghdr) * (size_t)c->msgsize);
311 if (ms_setting.mult_key_num > 1)
312 {
313 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
314 malloc(
315 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
316 }
317 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
318
319 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
320 || (c->msglist == NULL) || (c->tcpsfd == NULL)
321 || ((ms_setting.mult_key_num > 1)
322 && (c->mlget_task.mlget_item == NULL)))
323 {
324 if (c->rbuf != NULL)
325 free(c->rbuf);
326 if (c->wbuf != NULL)
327 free(c->wbuf);
328 if (c->iov != NULL)
329 free(c->iov);
330 if (c->msglist != NULL)
331 free(c->msglist);
332 if (c->mlget_task.mlget_item != NULL)
333 free(c->mlget_task.mlget_item);
334 if (c->tcpsfd != NULL)
335 free(c->tcpsfd);
336 fprintf(stderr, "malloc()\n");
337 return -1;
338 }
339
340 c->state= init_state;
341 c->rvbytes= 0;
342 c->rbytes= 0;
343 c->rcurr= c->rbuf;
344 c->wcurr= c->wbuf;
345 c->iovused= 0;
346 c->msgcurr= 0;
347 c->msgused= 0;
348 c->cur_idx= c->total_sfds; /* default index is a invalid value */
349
350 c->ctnwrite= false;
351 c->readval= false;
352 c->change_sfd= false;
353
354 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
355 c->precmd.isfinish= true; /* default the previous command finished */
356 c->currcmd.isfinish= false;
357 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
358 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
359
360 c->mlget_task.mlget_num= 0;
361 c->mlget_task.value_index= -1; /* default invalid value */
362
363 if (ms_setting.binary_prot_)
364 {
365 c->protocol= binary_prot;
366 }
367 else
368 {
369 c->protocol= ascii_prot;
370 }
371
372 /* initialize udp */
373 if (ms_conn_udp_init(c, is_udp) != 0)
374 {
375 return -1;
376 }
377
378 /* initialize task */
379 ms_task_init(c);
380
381 if (! (ms_setting.facebook_test && is_udp))
382 {
383 atomic_add_32(&ms_stats.active_conns, 1);
384 }
385
386 return EXIT_SUCCESS;
387 } /* ms_conn_init */
388
389
390 /**
391 * when doing 100% get operation, it could preset some objects
392 * to warmup the server. this function is used to initialize the
393 * number of the objects to preset.
394 *
395 * @param c, pointer of the concurrency
396 */
397 static void ms_warmup_num_init(ms_conn_t *c)
398 {
399 /* no set operation, preset all the items in the window */
400 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
401 {
402 c->warmup_num= c->win_size;
403 c->remain_warmup_num= c->warmup_num;
404 }
405 else
406 {
407 c->warmup_num= 0;
408 c->remain_warmup_num= c->warmup_num;
409 }
410 } /* ms_warmup_num_init */
411
412
413 /**
414 * each connection has an item window, this function initialize
415 * the window. The window is used to generate task.
416 *
417 * @param c, pointer of the concurrency
418 *
419 * @return int, if success, return EXIT_SUCCESS, else return -1
420 */
421 static int ms_item_win_init(ms_conn_t *c)
422 {
423 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
424 int exp_cnt= 0;
425
426 c->win_size= (int)ms_setting.win_size;
427 c->set_cursor= 0;
428 c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
429 c->remain_exec_num= c->exec_num;
430
431 c->item_win= (ms_task_item_t *)malloc(
432 sizeof(ms_task_item_t) * (size_t)c->win_size);
433 if (c->item_win == NULL)
434 {
435 fprintf(stderr, "Can't allocate task item array for conn.\n");
436 return -1;
437 }
438 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
439
440 for (int i= 0; i < c->win_size; i++)
441 {
442 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
443 c->item_win[i].key_prefix= ms_get_key_prefix();
444 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
445 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
446 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
447 c->item_win[i].client_time= 0;
448
449 /* set expire time base on the proportion */
450 if (exp_cnt < ms_setting.exp_ver_per * i)
451 {
452 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
453 exp_cnt++;
454 }
455 else
456 {
457 c->item_win[i].exp_time= 0;
458 }
459 }
460
461 ms_warmup_num_init(c);
462
463 return EXIT_SUCCESS;
464 } /* ms_item_win_init */
465
466
467 /**
468 * each connection structure can include one or more sock
469 * handlers. this function create these socks and connect the
470 * server(s).
471 *
472 * @param c, pointer of the concurrency
473 *
474 * @return int, if success, return EXIT_SUCCESS, else return -1
475 */
476 static int ms_conn_sock_init(ms_conn_t *c)
477 {
478 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
479 uint32_t i;
480 int ret_sfd;
481 uint32_t srv_idx= 0;
482
483 assert(c != NULL);
484 assert(c->tcpsfd != NULL);
485
486 for (i= 0; i < c->total_sfds; i++)
487 {
488 ret_sfd= 0;
489 if (ms_setting.rep_write_srv > 0)
490 {
491 /* for replication, each connection need connect all the server */
492 srv_idx= i % ms_setting.srv_cnt;
493 }
494 else
495 {
496 /* all the connections in a thread connects the same server */
497 srv_idx= ms_thread->thread_ctx->srv_idx;
498 }
499
500 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
501 ms_setting.servers[srv_idx].srv_port,
502 ms_setting.udp, &ret_sfd) != 0)
503 {
504 break;
505 }
506
507 if (i == 0)
508 {
509 c->sfd= ret_sfd;
510 }
511
512 if (! ms_setting.udp)
513 {
514 c->tcpsfd[i]= ret_sfd;
515 }
516
517 c->alive_sfds++;
518 }
519
520 /* initialize udp sock handler if necessary */
521 if (ms_setting.facebook_test)
522 {
523 ret_sfd= 0;
524 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
525 ms_setting.servers[srv_idx].srv_port,
526 true, &ret_sfd) != 0)
527 {
528 c->udpsfd= 0;
529 }
530 else
531 {
532 c->udpsfd= ret_sfd;
533 }
534 }
535
536 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
537 {
538 if (ms_setting.udp)
539 {
540 close(c->sfd);
541 }
542 else
543 {
544 for (uint32_t j= 0; j < i; j++)
545 {
546 close(c->tcpsfd[j]);
547 }
548 }
549
550 if (c->udpsfd != 0)
551 {
552 close(c->udpsfd);
553 }
554
555 return -1;
556 }
557
558 return EXIT_SUCCESS;
559 } /* ms_conn_sock_init */
560
561
562 /**
563 * each connection is managed by libevent, this function
564 * initialize the event of the connection structure.
565 *
566 * @param c, pointer of the concurrency
567 *
568 * @return int, if success, return EXIT_SUCCESS, else return -1
569 */
570 static int ms_conn_event_init(ms_conn_t *c)
571 {
572 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
573 short event_flags= EV_WRITE | EV_PERSIST;
574
575 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
576 event_base_set(ms_thread->base, &c->event);
577 c->ev_flags= event_flags;
578
579 if (event_add(&c->event, NULL) == -1)
580 {
581 return -1;
582 }
583
584 return EXIT_SUCCESS;
585 } /* ms_conn_event_init */
586
587
588 /**
589 * setup a connection, each connection structure of each
590 * thread must call this function to initialize.
591 *
592 * @param c, pointer of the concurrency
593 *
594 * @return int, if success, return EXIT_SUCCESS, else return -1
595 */
596 int ms_setup_conn(ms_conn_t *c)
597 {
598 if (ms_item_win_init(c) != 0)
599 {
600 return -1;
601 }
602
603 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
604 {
605 return -1;
606 }
607
608 if (ms_conn_sock_init(c) != 0)
609 {
610 return -1;
611 }
612
613 if (ms_conn_event_init(c) != 0)
614 {
615 return -1;
616 }
617
618 return EXIT_SUCCESS;
619 } /* ms_setup_conn */
620
621
622 /**
623 * Frees a connection.
624 *
625 * @param c, pointer of the concurrency
626 */
627 void ms_conn_free(ms_conn_t *c)
628 {
629 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
630 if (c != NULL)
631 {
632 if (c->hdrbuf != NULL)
633 free(c->hdrbuf);
634 if (c->msglist != NULL)
635 free(c->msglist);
636 if (c->rbuf != NULL)
637 free(c->rbuf);
638 if (c->wbuf != NULL)
639 free(c->wbuf);
640 if (c->iov != NULL)
641 free(c->iov);
642 if (c->mlget_task.mlget_item != NULL)
643 free(c->mlget_task.mlget_item);
644 if (c->rudpbuf != NULL)
645 free(c->rudpbuf);
646 if (c->udppkt != NULL)
647 free(c->udppkt);
648 if (c->item_win != NULL)
649 free(c->item_win);
650 if (c->tcpsfd != NULL)
651 free(c->tcpsfd);
652
653 if (--ms_thread->nactive_conn == 0)
654 {
655 free(ms_thread->conn);
656 }
657 }
658 } /* ms_conn_free */
659
660
661 /**
662 * close a connection
663 *
664 * @param c, pointer of the concurrency
665 */
666 static void ms_conn_close(ms_conn_t *c)
667 {
668 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
669 assert(c != NULL);
670
671 /* delete the event, the socket and the connection */
672 event_del(&c->event);
673
674 for (uint32_t i= 0; i < c->total_sfds; i++)
675 {
676 if (c->tcpsfd[i] > 0)
677 {
678 close(c->tcpsfd[i]);
679 }
680 }
681 c->sfd= 0;
682
683 if (ms_setting.facebook_test)
684 {
685 close(c->udpsfd);
686 }
687
688 atomic_dec_32(&ms_stats.active_conns);
689
690 ms_conn_free(c);
691
692 if (ms_setting.run_time == 0)
693 {
694 pthread_mutex_lock(&ms_global.run_lock.lock);
695 ms_global.run_lock.count++;
696 pthread_cond_signal(&ms_global.run_lock.cond);
697 pthread_mutex_unlock(&ms_global.run_lock.lock);
698 }
699
700 if (ms_thread->nactive_conn == 0)
701 {
702 pthread_exit(NULL);
703 }
704 } /* ms_conn_close */
705
706
707 /**
708 * create a new sock
709 *
710 * @param ai, server address information
711 *
712 * @return int, if success, return EXIT_SUCCESS, else return -1
713 */
714 static int ms_new_socket(struct addrinfo *ai)
715 {
716 int sfd;
717
718 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
719 {
720 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
721 return -1;
722 }
723
724 return sfd;
725 } /* ms_new_socket */
726
727
728 /**
729 * Sets a socket's send buffer size to the maximum allowed by the system.
730 *
731 * @param sfd, file descriptor of socket
732 */
733 static void ms_maximize_sndbuf(const int sfd)
734 {
735 socklen_t intsize= sizeof(int);
736 unsigned int last_good= 0;
737 unsigned int min, max, avg;
738 unsigned int old_size;
739
740 /* Start with the default size. */
741 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
742 {
743 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
744 return;
745 }
746
747 /* Binary-search for the real maximum. */
748 min= old_size;
749 max= MAX_SENDBUF_SIZE;
750
751 while (min <= max)
752 {
753 avg= ((unsigned int)(min + max)) / 2;
754 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
755 {
756 last_good= avg;
757 min= avg + 1;
758 }
759 else
760 {
761 max= avg - 1;
762 }
763 }
764 (void)last_good;
765 } /* ms_maximize_sndbuf */
766
767
768 /**
769 * socket connects the server
770 *
771 * @param c, pointer of the concurrency
772 * @param srv_host_name, the host name of the server
773 * @param srv_port, port of server
774 * @param is_udp, whether it's udp
775 * @param ret_sfd, the connected socket file descriptor
776 *
777 * @return int, if success, return EXIT_SUCCESS, else return -1
778 */
779 static int ms_network_connect(ms_conn_t *c,
780 char *srv_host_name,
781 const int srv_port,
782 const bool is_udp,
783 int *ret_sfd)
784 {
785 int sfd;
786 struct linger ling=
787 {
788 0, 0
789 };
790 struct addrinfo *ai;
791 struct addrinfo *next;
792 struct addrinfo hints;
793 char port_buf[NI_MAXSERV];
794 int error;
795 int success= 0;
796
797 int flags= 1;
798
799 /*
800 * the memset call clears nonstandard fields in some impementations
801 * that otherwise mess things up.
802 */
803 memset(&hints, 0, sizeof(hints));
804 #ifdef AI_ADDRCONFIG
805 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
806 #else
807 hints.ai_flags= AI_PASSIVE;
808 #endif /* AI_ADDRCONFIG */
809 if (is_udp)
810 {
811 hints.ai_protocol= IPPROTO_UDP;
812 hints.ai_socktype= SOCK_DGRAM;
813 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
814 }
815 else
816 {
817 hints.ai_family= AF_UNSPEC;
818 hints.ai_protocol= IPPROTO_TCP;
819 hints.ai_socktype= SOCK_STREAM;
820 }
821
822 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
823 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
824 if (error != 0)
825 {
826 if (error != EAI_SYSTEM)
827 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
828 else
829 perror("getaddrinfo()\n");
830
831 return -1;
832 }
833
834 for (next= ai; next; next= next->ai_next)
835 {
836 if ((sfd= ms_new_socket(next)) == -1)
837 {
838 freeaddrinfo(ai);
839 return -1;
840 }
841
842 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
843 if (is_udp)
844 {
845 ms_maximize_sndbuf(sfd);
846 }
847 else
848 {
849 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
850 sizeof(flags));
851 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
852 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
853 sizeof(flags));
854 }
855
856 if (is_udp)
857 {
858 c->srv_recv_addr_size= sizeof(struct sockaddr);
859 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
860 }
861 else
862 {
863 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
864 {
865 close(sfd);
866 freeaddrinfo(ai);
867 return -1;
868 }
869 }
870
871 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
872 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
873 {
874 fprintf(stderr, "setting O_NONBLOCK\n");
875 close(sfd);
876 freeaddrinfo(ai);
877 return -1;
878 }
879
880 if (ret_sfd != NULL)
881 {
882 *ret_sfd= sfd;
883 }
884
885 success++;
886 }
887
888 freeaddrinfo(ai);
889
890 /* Return zero if we detected no errors in starting up connections */
891 return success == 0;
892 } /* ms_network_connect */
893
894
895 /**
896 * reconnect a disconnected sock
897 *
898 * @param c, pointer of the concurrency
899 *
900 * @return int, if success, return EXIT_SUCCESS, else return -1
901 */
902 static int ms_reconn(ms_conn_t *c)
903 {
904 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
905 uint32_t srv_idx= 0;
906 uint32_t srv_conn_cnt= 0;
907
908 if (ms_setting.rep_write_srv > 0)
909 {
910 srv_idx= c->cur_idx % ms_setting.srv_cnt;
911 srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
912 }
913 else
914 {
915 srv_idx= ms_thread->thread_ctx->srv_idx;
916 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
917 }
918
919 /* close the old socket handler */
920 close(c->sfd);
921 c->tcpsfd[c->cur_idx]= 0;
922
923 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
924 % srv_conn_cnt == 0)
925 {
926 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
927 fprintf(stderr, "Server %s:%d disconnect\n",
928 ms_setting.servers[srv_idx].srv_host_name,
929 ms_setting.servers[srv_idx].srv_port);
930 }
931
932 if (ms_setting.rep_write_srv > 0)
933 {
934 uint32_t i= 0;
935
936 for (i= 0; i < c->total_sfds; i++)
937 {
938 if (c->tcpsfd[i] != 0)
939 {
940 break;
941 }
942 }
943
944 /* all socks disconnect */
945 if (i == c->total_sfds)
946 {
947 return -1;
948 }
949 }
950 else
951 {
952 do
953 {
954 /* reconnect success, break the loop */
955 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
956 ms_setting.servers[srv_idx].srv_port,
957 ms_setting.udp, &c->sfd) == 0)
958 {
959 c->tcpsfd[c->cur_idx]= c->sfd;
960 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
961 % (uint32_t)srv_conn_cnt == 0)
962 {
963 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
964 int reconn_time=
965 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
966 - ms_setting.servers[srv_idx].disconn_time
967 .tv_sec);
968 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
969 ms_setting.servers[srv_idx].srv_host_name,
970 ms_setting.servers[srv_idx].srv_port, reconn_time);
971 }
972 break;
973 }
974
975 if (ms_setting.rep_write_srv == 0 && c->total_sfds > 0)
976 {
977 /* wait a second and reconnect */
978 sleep(1);
979 }
980 }
981 while (ms_setting.rep_write_srv == 0 && c->total_sfds > 0);
982 }
983
984 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
985 {
986 c->sfd= 0;
987 c->alive_sfds--;
988 }
989
990 return EXIT_SUCCESS;
991 } /* ms_reconn */
992
993
994 /**
995 * reconnect several disconnected socks in the connection
996 * structure, the ever-1-second timer of the thread will check
997 * whether some socks in the connections disconnect. if
998 * disconnect, reconnect the sock.
999 *
1000 * @param c, pointer of the concurrency
1001 *
1002 * @return int, if success, return EXIT_SUCCESS, else return -1
1003 */
1004 int ms_reconn_socks(ms_conn_t *c)
1005 {
1006 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
1007 uint32_t srv_idx= 0;
1008 int ret_sfd= 0;
1009 uint32_t srv_conn_cnt= 0;
1010 struct timeval cur_time;
1011
1012 assert(c != NULL);
1013
1014 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
1015 {
1016 return EXIT_SUCCESS;
1017 }
1018
1019 for (uint32_t i= 0; i < c->total_sfds; i++)
1020 {
1021 if (c->tcpsfd[i] == 0)
1022 {
1023 gettimeofday(&cur_time, NULL);
1024
1025 /**
1026 * For failover test of replication, reconnect the socks after
1027 * it disconnects more than 5 seconds, Otherwise memslap will
1028 * block at connect() function and the work threads can't work
1029 * in this interval.
1030 */
1031 if (cur_time.tv_sec
1032 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1033 {
1034 break;
1035 }
1036
1037 if (ms_setting.rep_write_srv > 0)
1038 {
1039 srv_idx= i % ms_setting.srv_cnt;
1040 srv_conn_cnt= ms_setting.sock_per_conn * ms_setting.nconns;
1041 }
1042 else
1043 {
1044 srv_idx= ms_thread->thread_ctx->srv_idx;
1045 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1046 }
1047
1048 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1049 ms_setting.servers[srv_idx].srv_port,
1050 ms_setting.udp, &ret_sfd) == 0)
1051 {
1052 c->tcpsfd[i]= ret_sfd;
1053 c->alive_sfds++;
1054
1055 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1056 % (uint32_t)srv_conn_cnt == 0)
1057 {
1058 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1059 int reconn_time=
1060 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1061 - ms_setting.servers[srv_idx].disconn_time
1062 .tv_sec);
1063 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1064 ms_setting.servers[srv_idx].srv_host_name,
1065 ms_setting.servers[srv_idx].srv_port, reconn_time);
1066 }
1067 }
1068 }
1069 }
1070
1071 return EXIT_SUCCESS;
1072 } /* ms_reconn_socks */
1073
1074
1075 /**
1076 * Tokenize the command string by replacing whitespace with '\0' and update
1077 * the token array tokens with pointer to start of each token and length.
1078 * Returns total number of tokens. The last valid token is the terminal
1079 * token (value points to the first unprocessed character of the string and
1080 * length zero).
1081 *
1082 * Usage example:
1083 *
1084 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1085 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1086 * ...
1087 * }
1088 * ncommand = tokens[ix].value - command;
1089 * command = tokens[ix].value;
1090 * }
1091 *
1092 * @param command, the command string to token
1093 * @param tokens, array to store tokens
1094 * @param max_tokens, maximum tokens number
1095 *
1096 * @return int, the number of tokens
1097 */
1098 static int ms_tokenize_command(char *command,
1099 token_t *tokens,
1100 const int max_tokens)
1101 {
1102 char *s, *e;
1103 int ntokens= 0;
1104
1105 assert(command != NULL && tokens != NULL && max_tokens > 1);
1106
1107 for (s= e= command; ntokens < max_tokens - 1; ++e)
1108 {
1109 if (*e == ' ')
1110 {
1111 if (s != e)
1112 {
1113 tokens[ntokens].value= s;
1114 tokens[ntokens].length= (size_t)(e - s);
1115 ntokens++;
1116 *e= '\0';
1117 }
1118 s= e + 1;
1119 }
1120 else if (*e == '\0')
1121 {
1122 if (s != e)
1123 {
1124 tokens[ntokens].value= s;
1125 tokens[ntokens].length= (size_t)(e - s);
1126 ntokens++;
1127 }
1128
1129 break; /* string end */
1130 }
1131 }
1132
1133 return ntokens;
1134 } /* ms_tokenize_command */
1135
1136
1137 /**
1138 * parse the response of server.
1139 *
1140 * @param c, pointer of the concurrency
1141 * @param command, the string responded by server
1142 *
1143 * @return int, if the command completed return EXIT_SUCCESS, else return
1144 * -1
1145 */
1146 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1147 {
1148 int ret= 0;
1149 int64_t value_len;
1150 char *buffer= command;
1151
1152 assert(c != NULL);
1153
1154 /**
1155 * for command get, we store the returned value into local buffer
1156 * then continue in ms_complete_nread().
1157 */
1158
1159 switch (buffer[0])
1160 {
1161 case 'V': /* VALUE || VERSION */
1162 if (buffer[1] == 'A') /* VALUE */
1163 {
1164 token_t tokens[MAX_TOKENS];
1165 ms_tokenize_command(command, tokens, MAX_TOKENS);
1166 errno= 0;
1167 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1168 if (errno != 0)
1169 {
1170 printf("<%d ERROR %s\n", c->sfd, strerror(errno));
1171 }
1172 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1173
1174 /*
1175 * We read the \r\n into the string since not doing so is more
1176 * cycles then the waster of memory to do so.
1177 *
1178 * We are null terminating through, which will most likely make
1179 * some people lazy about using the return length.
1180 */
1181 c->rvbytes= (int)(value_len + 2);
1182 c->readval= true;
1183 ret= -1;
1184 }
1185
1186 break;
1187
1188 case 'O': /* OK */
1189 c->currcmd.retstat= MCD_SUCCESS;
1190 break;
1191
1192 case 'S': /* STORED STATS SERVER_ERROR */
1193 if (buffer[2] == 'A') /* STORED STATS */
1194 { /* STATS*/
1195 c->currcmd.retstat= MCD_STAT;
1196 }
1197 else if (buffer[1] == 'E')
1198 {
1199 /* SERVER_ERROR */
1200 printf("<%d %s\n", c->sfd, buffer);
1201
1202 c->currcmd.retstat= MCD_SERVER_ERROR;
1203 }
1204 else if (buffer[1] == 'T')
1205 {
1206 /* STORED */
1207 c->currcmd.retstat= MCD_STORED;
1208 }
1209 else
1210 {
1211 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1212 }
1213 break;
1214
1215 case 'D': /* DELETED DATA */
1216 if (buffer[1] == 'E')
1217 {
1218 c->currcmd.retstat= MCD_DELETED;
1219 }
1220 else
1221 {
1222 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1223 }
1224
1225 break;
1226
1227 case 'N': /* NOT_FOUND NOT_STORED*/
1228 if (buffer[4] == 'F')
1229 {
1230 c->currcmd.retstat= MCD_NOTFOUND;
1231 }
1232 else if (buffer[4] == 'S')
1233 {
1234 printf("<%d %s\n", c->sfd, buffer);
1235 c->currcmd.retstat= MCD_NOTSTORED;
1236 }
1237 else
1238 {
1239 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1240 }
1241 break;
1242
1243 case 'E': /* PROTOCOL ERROR or END */
1244 if (buffer[1] == 'N')
1245 {
1246 /* END */
1247 c->currcmd.retstat= MCD_END;
1248 }
1249 else if (buffer[1] == 'R')
1250 {
1251 printf("<%d ERROR\n", c->sfd);
1252 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1253 }
1254 else if (buffer[1] == 'X')
1255 {
1256 c->currcmd.retstat= MCD_DATA_EXISTS;
1257 printf("<%d %s\n", c->sfd, buffer);
1258 }
1259 else
1260 {
1261 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1262 }
1263 break;
1264
1265 case 'C': /* CLIENT ERROR */
1266 printf("<%d %s\n", c->sfd, buffer);
1267 c->currcmd.retstat= MCD_CLIENT_ERROR;
1268 break;
1269
1270 default:
1271 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1272 break;
1273 } /* switch */
1274
1275 return ret;
1276 } /* ms_ascii_process_line */
1277
1278
1279 /**
1280 * after one operation completes, reset the concurrency
1281 *
1282 * @param c, pointer of the concurrency
1283 * @param timeout, whether it's timeout
1284 */
1285 void ms_reset_conn(ms_conn_t *c, bool timeout)
1286 {
1287 assert(c != NULL);
1288
1289 if (c->udp)
1290 {
1291 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1292 {
1293 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
1294 }
1295
1296 c->packets= 0;
1297 c->recvpkt= 0;
1298 c->pktcurr= 0;
1299 c->ordcurr= 0;
1300 c->rudpbytes= 0;
1301 }
1302 c->currcmd.isfinish= true;
1303 c->ctnwrite= false;
1304 c->rbytes= 0;
1305 c->rcurr= c->rbuf;
1306 c->msgcurr = 0;
1307 c->msgused = 0;
1308 c->iovused = 0;
1309 ms_conn_set_state(c, conn_write);
1310 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1311
1312 if (timeout)
1313 {
1314 ms_drive_machine(c);
1315 }
1316 } /* ms_reset_conn */
1317
1318
1319 /**
1320 * if we have a complete line in the buffer, process it.
1321 *
1322 * @param c, pointer of the concurrency
1323 *
1324 * @return int, if success, return EXIT_SUCCESS, else return -1
1325 */
1326 static int ms_try_read_line(ms_conn_t *c)
1327 {
1328 if (c->protocol == binary_prot)
1329 {
1330 /* Do we have the complete packet header? */
1331 if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1332 {
1333 /* need more data! */
1334 return EXIT_SUCCESS;
1335 }
1336 else
1337 {
1338 #ifdef NEED_ALIGN
1339 if (((long)(c->rcurr)) % 8 != 0)
1340 {
1341 /* must realign input buffer */
1342 memmove(c->rbuf, c->rcurr, c->rbytes);
1343 c->rcurr= c->rbuf;
1344 if (settings.verbose)
1345 {
1346 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1347 }
1348 }
1349 #endif
1350 protocol_binary_response_header *rsp;
1351 rsp= (protocol_binary_response_header *)c->rcurr;
1352
1353 c->binary_header= *rsp;
1354 c->binary_header.response.extlen= rsp->response.extlen;
1355 c->binary_header.response.keylen= ntohs(rsp->response.keylen);
1356 c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1357 c->binary_header.response.status= ntohs(rsp->response.status);
1358
1359 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1360 {
1361 fprintf(stderr, "Invalid magic: %x\n",
1362 c->binary_header.response.magic);
1363 ms_conn_set_state(c, conn_closing);
1364 return EXIT_SUCCESS;
1365 }
1366
1367 /* process this complete response */
1368 if (ms_bin_process_response(c) == 0)
1369 {
1370 /* current operation completed */
1371 ms_reset_conn(c, false);
1372 return -1;
1373 }
1374 else
1375 {
1376 c->rbytes-= (int32_t)sizeof(c->binary_header);
1377 c->rcurr+= sizeof(c->binary_header);
1378 }
1379 }
1380 }
1381 else
1382 {
1383 char *el, *cont;
1384
1385 assert(c != NULL);
1386 assert(c->rcurr <= (c->rbuf + c->rsize));
1387
1388 if (c->rbytes == 0)
1389 return EXIT_SUCCESS;
1390
1391 el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1392 if (! el)
1393 return EXIT_SUCCESS;
1394
1395 cont= el + 1;
1396 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1397 {
1398 el--;
1399 }
1400 *el= '\0';
1401
1402 assert(cont <= (c->rcurr + c->rbytes));
1403
1404 /* process this complete line */
1405 if (ms_ascii_process_line(c, c->rcurr) == 0)
1406 {
1407 /* current operation completed */
1408 ms_reset_conn(c, false);
1409 return -1;
1410 }
1411 else
1412 {
1413 /* current operation didn't complete */
1414 c->rbytes-= (int32_t)(cont - c->rcurr);
1415 c->rcurr= cont;
1416 }
1417
1418 assert(c->rcurr <= (c->rbuf + c->rsize));
1419 }
1420
1421 return -1;
1422 } /* ms_try_read_line */
1423
1424
/**
 * because the packet of UDP can't ensure the order, the
 * function is used to sort the received udp packet.
 *
 * Raw datagrams sit back to back in c->rudpbuf. Datagram i is assumed to
 * start at offset i * UDP_MAX_PAYLOAD_SIZE, i.e. every datagram except
 * possibly the last one received is full-sized. The sequence number in each
 * datagram's header selects its slot in c->udppkt[]. Payload is then copied
 * into buf in sequence-number order, starting at slot c->ordcurr. Copying
 * stops at the first slot that has nothing left to copy.
 *
 * State kept on c between calls:
 *   recvpkt - number of (possibly partial) datagrams in rudpbuf
 *   packets - total datagram count, taken from the first header
 *   pktcurr - datagrams already fully indexed into udppkt[]
 *   ordcurr - next slot to copy from, in sequence order
 *
 * @param c, pointer of the concurrency
 * @param buf, the buffer to store the ordered packages data
 * @param rbytes, the maximum capacity of the buffer
 *
 * @return int, if success, return the copy bytes, else return
 *         -1 (nothing could be copied)
 */
static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
{
  int len= 0;
  int wbytes= 0;
  uint16_t req_id= 0;
  uint16_t seq_num= 0;
  uint16_t packets= 0;
  unsigned char *header= NULL;

  /* not enough data */
  assert(c != NULL);
  assert(buf != NULL);
  assert(c->rudpbytes >= UDP_HEADER_SIZE);

  /* calculate received packets count */
  if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
  {
    /* the last packet has some data */
    c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
  }
  else
  {
    c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
  }

  /* get the total packets count if necessary */
  if (c->packets == 0)
  {
    c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
  }

  /* build the ordered packet array */
  for (int i= c->pktcurr; i < c->recvpkt; i++)
  {
    header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
    req_id= (uint16_t)HEADER_TO_REQID(header);
    assert(req_id == c->request_id % (1 << 16));

    packets= (uint16_t)HEADER_TO_PACKETS(header);
    assert(c->packets == HEADER_TO_PACKETS(header));

    /* NOTE(review): seq_num comes straight from the received datagram and
     * indexes c->udppkt without a bounds check here. Presumably udppkt holds
     * MAX_UDP_PACKET entries (ms_reset_conn bounds its memset by that
     * constant); confirm before trusting peer input. */
    seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
    c->udppkt[seq_num].header= header;
    c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;

    if (i == c->recvpkt - 1)
    {
      /* last received packet */
      if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
      {
        c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
        c->pktcurr++;
      }
      else
      {
        /* partially received: it is re-indexed on the next call */
        c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
                                   - UDP_HEADER_SIZE;
      }
    }
    else
    {
      c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
      c->pktcurr++;
    }
  }

  /* copy payload out in sequence-number order */
  for (int i= c->ordcurr; i < c->recvpkt; i++)
  {
    /* there is some data to copy */
    if ((c->udppkt[i].data != NULL)
        && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
    {
      header= c->udppkt[i].header;
      len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
      if (len > rbytes - wbytes)
      {
        len= rbytes - wbytes;
      }

      assert(len <= rbytes - wbytes);
      assert(i == HEADER_TO_SEQNUM(header));

      memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
             (size_t)len);
      wbytes+= len;
      c->udppkt[i].copybytes+= len;

      if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
          && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
      {
        /* finish copying all the data of this packet, next */
        c->ordcurr++;
      }

      /* last received packet, and finish copying all the data */
      if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
          && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
      {
        break;
      }

      /* no space to copy data */
      if (wbytes >= rbytes)
      {
        break;
      }

      /* it doesn't finish reading all the data of the packet from network */
      if ((i != c->recvpkt - 1)
          && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
      {
        break;
      }
    }
    else
    {
      /* no data to copy (gap in the sequence, or slot already drained) */
      break;
    }
  }
  (void)packets;

  return wbytes == 0 ? -1 : wbytes;
} /* ms_sort_udp_packet */
1560
1561
1562 /**
1563 * encapsulate upd read like tcp read
1564 *
1565 * @param c, pointer of the concurrency
1566 * @param buf, read buffer
1567 * @param len, length to read
1568 *
1569 * @return int, if success, return the read bytes, else return
1570 * -1
1571 */
1572 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1573 {
1574 int res= 0;
1575 int avail= 0;
1576 int rbytes= 0;
1577 int copybytes= 0;
1578
1579 assert(c->udp);
1580
1581 while (1)
1582 {
1583 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1584 {
1585 char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1586 if (! new_rbuf)
1587 {
1588 fprintf(stderr, "Couldn't realloc input buffer.\n");
1589 c->rudpbytes= 0; /* ignore what we read */
1590 return -1;
1591 }
1592 c->rudpbuf= new_rbuf;
1593 c->rudpsize*= 2;
1594 }
1595
1596 avail= c->rudpsize - c->rudpbytes;
1597 /* UDP each time read a packet, 1400 bytes */
1598 res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1599
1600 if (res > 0)
1601 {
1602 atomic_add_size(&ms_stats.bytes_read, res);
1603 c->rudpbytes+= res;
1604 rbytes+= res;
1605 if (res == avail)
1606 {
1607 continue;
1608 }
1609 else
1610 {
1611 break;
1612 }
1613 }
1614
1615 if (res == 0)
1616 {
1617 /* "connection" closed */
1618 return res;
1619 }
1620
1621 if (res == -1)
1622 {
1623 /* no data to read */
1624 return res;
1625 }
1626 }
1627
1628 /* copy data to read buffer */
1629 if (rbytes > 0)
1630 {
1631 copybytes= ms_sort_udp_packet(c, buf, len);
1632 }
1633
1634 if (copybytes == -1)
1635 {
1636 atomic_add_size(&ms_stats.pkt_disorder, 1);
1637 }
1638
1639 return copybytes;
1640 } /* ms_udp_read */
1641
1642
/**
 * Read from the network as much as we can, handling buffer growth and
 * connection close.
 *
 * Before reading, any remaining incomplete fragment is moved to the
 * beginning of the buffer when rcurr is not already there and any of these
 * holds:
 *   - we are not reading a value;
 *   - the pending value (rvbytes) would not fit after rcurr;
 *   - rcurr's offset into rbuf exceeds rbytes.
 * The read buffer doubles whenever rbytes reaches rsize. UDP connections read
 * through ms_udp_read(), which also does its own bytes_read accounting.
 *
 * @param c, pointer of the concurrency
 *
 * @return int,
 *         return EXIT_SUCCESS if there's nothing to read on the first read.
 *         return EXIT_FAILURE (1) if get data
 *         return -1 if error happens: realloc failure (state unchanged), or
 *         connection closed / non-EAGAIN read error (state set to
 *         conn_closing)
 */
static int ms_try_read_network(ms_conn_t *c)
{
  int gotdata= 0;
  int res;
  int64_t avail;

  assert(c != NULL);

  if ((c->rcurr != c->rbuf)
      && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
          || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
  {
    if (c->rbytes != 0) /* otherwise there's nothing to copy */
      memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
    c->rcurr= c->rbuf;
  }

  while (1)
  {
    if (c->rbytes >= c->rsize)
    {
      char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
      if (! new_rbuf)
      {
        fprintf(stderr, "Couldn't realloc input buffer.\n");
        c->rbytes= 0; /* ignore what we read */
        return -1;
      }
      c->rcurr= c->rbuf= new_rbuf;
      c->rsize*= 2;
    }

    /* free space after the buffered bytes */
    avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
    if (avail == 0)
    {
      break;
    }

    if (c->udp)
    {
      res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
    }
    else
    {
      res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
    }

    if (res > 0)
    {
      /* UDP byte counting is done inside ms_udp_read() */
      if (! c->udp)
      {
        atomic_add_size(&ms_stats.bytes_read, res);
      }
      gotdata= 1;
      c->rbytes+= res;
      /* a completely filled buffer means more data may be waiting */
      if (res == avail)
      {
        continue;
      }
      else
      {
        break;
      }
    }
    if (res == 0)
    {
      /* connection closed */
      ms_conn_set_state(c, conn_closing);
      return -1;
    }
    if (res == -1)
    {
      if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
        break;
      /* Should close on unhandled errors. */
      ms_conn_set_state(c, conn_closing);
      return -1;
    }
  }

  return gotdata;
} /* ms_try_read_network */
1745
1746
1747 /**
1748 * after get the object from server, verify the value if
1749 * necessary.
1750 *
1751 * @param c, pointer of the concurrency
1752 * @param mlget_item, pointer of mulit-get task item structure
1753 * @param value, received value string
1754 * @param vlen, received value string length
1755 */
1756 static void ms_verify_value(ms_conn_t *c,
1757 ms_mlget_task_item_t *mlget_item,
1758 char *value,
1759 int vlen)
1760 {
1761 if (c->curr_task.verify)
1762 {
1763 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1764 char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1765 char *orignkey=
1766 &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1767
1768 /* verify expire time if necessary */
1769 if (c->curr_task.item->exp_time > 0)
1770 {
1771 struct timeval curr_time;
1772 gettimeofday(&curr_time, NULL);
1773
1774 /* object expired but get it now */
1775 if (curr_time.tv_sec - c->curr_task.item->client_time
1776 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1777 {
1778 atomic_add_size(&ms_stats.exp_get, 1);
1779
1780 if (ms_setting.verbose)
1781 {
1782 char set_time[64];
1783 char cur_time[64];
1784 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1785 localtime(&c->curr_task.item->client_time));
1786 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1787 localtime(&curr_time.tv_sec));
1788 fprintf(stderr,
1789 "\n<%d expire time verification failed, "
1790 "object expired but get it now\n"
1791 "\tkey len: %d\n"
1792 "\tkey: %" PRIx64 " %.*s\n"
1793 "\tset time: %s current time: %s "
1794 "diff time: %d expire time: %d\n"
1795 "\texpected data: \n"
1796 "\treceived data len: %d\n"
1797 "\treceived data: %.*s\n",
1798 c->sfd,
1799 c->curr_task.item->key_size,
1800 c->curr_task.item->key_prefix,
1801 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1802 orignkey,
1803 set_time,
1804 cur_time,
1805 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1806 c->curr_task.item->exp_time,
1807 vlen,
1808 vlen,
1809 value);
1810 fflush(stderr);
1811 }
1812 }
1813 }
1814 else
1815 {
1816 if ((c->curr_task.item->value_size != vlen)
1817 || (memcmp(orignval, value, (size_t)vlen) != 0))
1818 {
1819 atomic_add_size(&ms_stats.vef_failed, 1);
1820
1821 if (ms_setting.verbose)
1822 {
1823 fprintf(stderr,
1824 "\n<%d data verification failed\n"
1825 "\tkey len: %d\n"
1826 "\tkey: %" PRIx64" %.*s\n"
1827 "\texpected data len: %d\n"
1828 "\texpected data: %.*s\n"
1829 "\treceived data len: %d\n"
1830 "\treceived data: %.*s\n",
1831 c->sfd,
1832 c->curr_task.item->key_size,
1833 c->curr_task.item->key_prefix,
1834 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1835 orignkey,
1836 c->curr_task.item->value_size,
1837 c->curr_task.item->value_size,
1838 orignval,
1839 vlen,
1840 vlen,
1841 value);
1842 fflush(stderr);
1843 }
1844 }
1845 }
1846
1847 c->curr_task.finish_verify= true;
1848
1849 if (mlget_item != NULL)
1850 {
1851 mlget_item->finish_verify= true;
1852 }
1853 }
1854 } /* ms_verify_value */
1855
1856
/**
 * For ASCII protocol, after store the data into the local
 * buffer, run this function to handle the data.
 *
 * Expects c->rvbytes bytes (the value followed by "\r\n") at c->rcurr.
 * For a multi-get, the task item matching c->currcmd.key_prefix becomes
 * c->curr_task. The value (without the trailing "\r\n") is verified, then
 * consumed from the read buffer, and the connection leaves value-reading
 * mode.
 *
 * @param c, pointer of the concurrency
 */
static void ms_ascii_complete_nread(ms_conn_t *c)
{
  assert(c != NULL);
  assert(c->rbytes >= c->rvbytes);
  assert(c->protocol == ascii_prot);
  if (c->rvbytes > 2)
  {
    assert(
      c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
  }

  /* multi-get */
  ms_mlget_task_item_t *mlget_item= NULL;
  if (((ms_setting.mult_key_num > 1)
       && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
      || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
  {
    /* NOTE(review): value_index is advanced and used as an index without
     * checking it against mlget_num; confirm the caller guarantees it. */
    c->mlget_task.value_index++;
    mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];

    /* fast path: responses usually arrive in request order */
    if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
    {
      c->curr_task.item= mlget_item->item;
      c->curr_task.verify= mlget_item->verify;
      c->curr_task.finish_verify= mlget_item->finish_verify;
      mlget_item->get_miss= false;
    }
    else
    {
      /* Try to find the task item in multi-get task array */
      /* NOTE(review): if no entry matches, mlget_item is left pointing at
       * the last array element and curr_task is not updated, so the
       * verification below may run against an unrelated item; confirm
       * intended. */
      for (int i= 0; i < c->mlget_task.mlget_num; i++)
      {
        mlget_item= &c->mlget_task.mlget_item[i];
        if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
        {
          c->curr_task.item= mlget_item->item;
          c->curr_task.verify= mlget_item->verify;
          c->curr_task.finish_verify= mlget_item->finish_verify;
          mlget_item->get_miss= false;

          break;
        }
      }
    }
  }

  /* verify the value, excluding the trailing "\r\n" */
  ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);

  /* consume the value from the read buffer */
  c->curr_task.get_miss= false;
  c->rbytes-= c->rvbytes;
  c->rcurr= c->rcurr + c->rvbytes;
  assert(c->rcurr <= (c->rbuf + c->rsize));
  c->readval= false;
  c->rvbytes= 0;
} /* ms_ascii_complete_nread */
1918
1919
1920 /**
1921 * For binary protocol, after store the data into the local
1922 * buffer, run this function to handle the data.
1923 *
1924 * @param c, pointer of the concurrency
1925 */
1926 static void ms_bin_complete_nread(ms_conn_t *c)
1927 {
1928 assert(c != NULL);
1929 assert(c->rbytes >= c->rvbytes);
1930 assert(c->protocol == binary_prot);
1931
1932 int extlen= c->binary_header.response.extlen;
1933 int keylen= c->binary_header.response.keylen;
1934 uint8_t opcode= c->binary_header.response.opcode;
1935
1936 /* not get command or not include value, just return */
1937 if (((opcode != PROTOCOL_BINARY_CMD_GET)
1938 && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1939 || (c->rvbytes <= extlen + keylen))
1940 {
1941 /* get miss */
1942 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1943 {
1944 c->currcmd.retstat= MCD_END;
1945 c->curr_task.get_miss= true;
1946 }
1947
1948 c->readval= false;
1949 c->rvbytes= 0;
1950 ms_reset_conn(c, false);
1951 return;
1952 }
1953
1954 /* multi-get */
1955 ms_mlget_task_item_t *mlget_item= NULL;
1956 if (((ms_setting.mult_key_num > 1)
1957 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1958 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1959 {
1960 c->mlget_task.value_index++;
1961 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1962
1963 c->curr_task.item= mlget_item->item;
1964 c->curr_task.verify= mlget_item->verify;
1965 c->curr_task.finish_verify= mlget_item->finish_verify;
1966 mlget_item->get_miss= false;
1967 }
1968
1969 ms_verify_value(c,
1970 mlget_item,
1971 c->rcurr + extlen + keylen,
1972 c->rvbytes - extlen - keylen);
1973
1974 c->currcmd.retstat= MCD_END;
1975 c->curr_task.get_miss= false;
1976 c->rbytes-= c->rvbytes;
1977 c->rcurr= c->rcurr + c->rvbytes;
1978 assert(c->rcurr <= (c->rbuf + c->rsize));
1979 c->readval= false;
1980 c->rvbytes= 0;
1981
1982 if (ms_setting.mult_key_num > 1)
1983 {
1984 /* multi-get have check all the item */
1985 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1986 {
1987 ms_reset_conn(c, false);
1988 }
1989 }
1990 else
1991 {
1992 /* single get */
1993 ms_reset_conn(c, false);
1994 }
1995 } /* ms_bin_complete_nread */
1996
1997
1998 /**
1999 * we get here after reading the value of get commands.
2000 *
2001 * @param c, pointer of the concurrency
2002 */
2003 static void ms_complete_nread(ms_conn_t *c)
2004 {
2005 assert(c != NULL);
2006 assert(c->rbytes >= c->rvbytes);
2007 assert(c->protocol == ascii_prot
2008 || c->protocol == binary_prot);
2009
2010 if (c->protocol == binary_prot)
2011 {
2012 ms_bin_complete_nread(c);
2013 }
2014 else
2015 {
2016 ms_ascii_complete_nread(c);
2017 }
2018 } /* ms_complete_nread */
2019
2020
2021 /**
2022 * Adds a message header to a connection.
2023 *
2024 * @param c, pointer of the concurrency
2025 *
2026 * @return int, if success, return EXIT_SUCCESS, else return -1
2027 */
2028 static int ms_add_msghdr(ms_conn_t *c)
2029 {
2030 struct msghdr *msg;
2031
2032 assert(c != NULL);
2033
2034 if (c->msgsize == c->msgused)
2035 {
2036 msg=
2037 realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
2038 if (! msg)
2039 return -1;
2040
2041 c->msglist= msg;
2042 c->msgsize*= 2;
2043 }
2044
2045 msg= c->msglist + c->msgused;
2046
2047 /**
2048 * this wipes msg_iovlen, msg_control, msg_controllen, and
2049 * msg_flags, the last 3 of which aren't defined on solaris:
2050 */
2051 memset(msg, 0, sizeof(struct msghdr));
2052
2053 msg->msg_iov= &c->iov[c->iovused];
2054
2055 if (c->udp && (c->srv_recv_addr_size > 0))
2056 {
2057 msg->msg_name= &c->srv_recv_addr;
2058 msg->msg_namelen= c->srv_recv_addr_size;
2059 }
2060
2061 c->msgbytes= 0;
2062 c->msgused++;
2063
2064 if (c->udp)
2065 {
2066 /* Leave room for the UDP header, which we'll fill in later. */
2067 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2068 }
2069
2070 return EXIT_SUCCESS;
2071 } /* ms_add_msghdr */
2072
2073
2074 /**
2075 * Ensures that there is room for another structure iovec in a connection's
2076 * iov list.
2077 *
2078 * @param c, pointer of the concurrency
2079 *
2080 * @return int, if success, return EXIT_SUCCESS, else return -1
2081 */
2082 static int ms_ensure_iov_space(ms_conn_t *c)
2083 {
2084 assert(c != NULL);
2085
2086 if (c->iovused >= c->iovsize)
2087 {
2088 int i, iovnum;
2089 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2090 ((size_t)c->iovsize
2091 * 2)
2092 * sizeof(struct iovec));
2093 if (! new_iov)
2094 return -1;
2095
2096 c->iov= new_iov;
2097 c->iovsize*= 2;
2098
2099 /* Point all the msghdr structures at the new list. */
2100 for (i= 0, iovnum= 0; i < c->msgused; i++)
2101 {
2102 c->msglist[i].msg_iov= &c->iov[iovnum];
2103 iovnum+= (int)c->msglist[i].msg_iovlen;
2104 }
2105 }
2106
2107 return EXIT_SUCCESS;
2108 } /* ms_ensure_iov_space */
2109
2110
2111 /**
2112 * Adds data to the list of pending data that will be written out to a
2113 * connection.
2114 *
2115 * @param c, pointer of the concurrency
2116 * @param buf, the buffer includes data to send
2117 * @param len, the data length in the buffer
2118 *
2119 * @return int, if success, return EXIT_SUCCESS, else return -1
2120 */
2121 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2122 {
2123 struct msghdr *m;
2124 int leftover;
2125 bool limit_to_mtu;
2126
2127 assert(c != NULL);
2128
2129 do
2130 {
2131 m= &c->msglist[c->msgused - 1];
2132
2133 /*
2134 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2135 */
2136 limit_to_mtu= c->udp;
2137
2138 #ifdef IOV_MAX
2139 /* We may need to start a new msghdr if this one is full. */
2140 if ((m->msg_iovlen == IOV_MAX)
2141 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2142 {
2143 ms_add_msghdr(c);
2144 m= &c->msglist[c->msgused - 1];
2145 }
2146 #endif
2147
2148 if (ms_ensure_iov_space(c) != 0)
2149 return -1;
2150
2151 /* If the fragment is too big to fit in the datagram, split it up */
2152 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2153 {
2154 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2155 len-= leftover;
2156 }
2157 else
2158 {
2159 leftover= 0;
2160 }
2161
2162 m= &c->msglist[c->msgused - 1];
2163 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2164 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2165
2166 c->msgbytes+= len;
2167 c->iovused++;
2168 m->msg_iovlen++;
2169
2170 buf= ((char *)buf) + len;
2171 len= leftover;
2172 }
2173 while (leftover > 0);
2174
2175 return EXIT_SUCCESS;
2176 } /* ms_add_iov */
2177
2178
2179 /**
2180 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2181 *
2182 * @param c, pointer of the concurrency
2183 *
2184 * @return int, if success, return EXIT_SUCCESS, else return -1
2185 */
2186 static int ms_build_udp_headers(ms_conn_t *c)
2187 {
2188 int i;
2189 unsigned char *hdr;
2190
2191 assert(c != NULL);
2192
2193 c->request_id= ms_get_udp_request_id();
2194
2195 if (c->msgused > c->hdrsize)
2196 {
2197 void *new_hdrbuf;
2198 if (c->hdrbuf)
2199 new_hdrbuf= realloc(c->hdrbuf,
2200 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2201 else
2202 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2203 if (! new_hdrbuf)
2204 return -1;
2205
2206 c->hdrbuf= (unsigned char *)new_hdrbuf;
2207 c->hdrsize= c->msgused * 2;
2208 }
2209
2210 /* If this is a multi-packet request, drop it. */
2211 if (c->udp && (c->msgused > 1))
2212 {
2213 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2214 return -1;
2215 }
2216
2217 hdr= c->hdrbuf;
2218 for (i= 0; i < c->msgused; i++)
2219 {
2220 c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
2221 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2222 *hdr++= (unsigned char)(c->request_id / 256);
2223 *hdr++= (unsigned char)(c->request_id % 256);
2224 *hdr++= (unsigned char)(i / 256);
2225 *hdr++= (unsigned char)(i % 256);
2226 *hdr++= (unsigned char)(c->msgused / 256);
2227 *hdr++= (unsigned char)(c->msgused % 256);
2228 *hdr++= (unsigned char)1; /* support facebook memcached */
2229 *hdr++= (unsigned char)0;
2230 assert(hdr ==
2231 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2232 + UDP_HEADER_SIZE));
2233 }
2234
2235 return EXIT_SUCCESS;
2236 } /* ms_build_udp_headers */
2237
2238
2239 /**
2240 * Transmit the next chunk of data from our list of msgbuf structures.
2241 *
2242 * @param c, pointer of the concurrency
2243 *
2244 * @return TRANSMIT_COMPLETE All done writing.
2245 * TRANSMIT_INCOMPLETE More data remaining to write.
2246 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2247 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2248 */
2249 static int ms_transmit(ms_conn_t *c)
2250 {
2251 assert(c != NULL);
2252
2253 if ((c->msgcurr < c->msgused)
2254 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2255 {
2256 /* Finished writing the current msg; advance to the next. */
2257 c->msgcurr++;
2258 }
2259
2260 if (c->msgcurr < c->msgused)
2261 {
2262 ssize_t res;
2263 struct msghdr *m= &c->msglist[c->msgcurr];
2264
2265 res= sendmsg(c->sfd, m, 0);
2266 if (res > 0)
2267 {
2268 atomic_add_size(&ms_stats.bytes_written, res);
2269
2270 /* We've written some of the data. Remove the completed
2271 * iovec entries from the list of pending writes. */
2272 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2273 {
2274 res-= (ssize_t)m->msg_iov->iov_len;
2275 m->msg_iovlen--;
2276 m->msg_iov++;
2277 }
2278
2279 /* Might have written just part of the last iovec entry;
2280 * adjust it so the next write will do the rest. */
2281 if (res > 0)
2282 {
2283 m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
2284 m->msg_iov->iov_len-= (size_t)res;
2285 }
2286 return TRANSMIT_INCOMPLETE;
2287 }
2288 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2289 {
2290 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2291 {
2292 fprintf(stderr, "Couldn't update event.\n");
2293 ms_conn_set_state(c, conn_closing);
2294 return TRANSMIT_HARD_ERROR;
2295 }
2296 return TRANSMIT_SOFT_ERROR;
2297 }
2298
2299 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2300 * we have a real error, on which we close the connection */
2301 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2302
2303 ms_conn_set_state(c, conn_closing);
2304 return TRANSMIT_HARD_ERROR;
2305 }
2306 else
2307 {
2308 return TRANSMIT_COMPLETE;
2309 }
2310 } /* ms_transmit */
2311
2312
2313 /**
2314 * Shrinks a connection's buffers if they're too big. This prevents
2315 * periodic large "mget" response from server chewing lots of client
2316 * memory.
2317 *
2318 * This should only be called in between requests since it can wipe output
2319 * buffers!
2320 *
2321 * @param c, pointer of the concurrency
2322 */
2323 static void ms_conn_shrink(ms_conn_t *c)
2324 {
2325 assert(c != NULL);
2326
2327 if (c->udp)
2328 return;
2329
2330 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2331 {
2332 char *newbuf;
2333
2334 if (c->rcurr != c->rbuf)
2335 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2336
2337 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2338
2339 if (newbuf)
2340 {
2341 c->rbuf= newbuf;
2342 c->rsize= DATA_BUFFER_SIZE;
2343 }
2344 c->rcurr= c->rbuf;
2345 }
2346
2347 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2348 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2349 {
2350 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2351 if (new_rbuf)
2352 {
2353 c->rudpbuf= new_rbuf;
2354 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2355 }
2356 /* TODO check error condition? */
2357 }
2358
2359 if (c->msgsize > MSG_LIST_HIGHWAT)
2360 {
2361 struct msghdr *newbuf= (struct msghdr *)realloc(
2362 (void *)c->msglist,
2363 MSG_LIST_INITIAL
2364 * sizeof(c->msglist[0]));
2365 if (newbuf)
2366 {
2367 c->msglist= newbuf;
2368 c->msgsize= MSG_LIST_INITIAL;
2369 }
2370 /* TODO check error condition? */
2371 }
2372
2373 if (c->iovsize > IOV_LIST_HIGHWAT)
2374 {
2375 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2376 IOV_LIST_INITIAL
2377 * sizeof(c->iov[0]));
2378 if (newbuf)
2379 {
2380 c->iov= newbuf;
2381 c->iovsize= IOV_LIST_INITIAL;
2382 }
2383 /* TODO check return value */
2384 }
2385 } /* ms_conn_shrink */
2386
2387
2388 /**
2389 * Sets a connection's current state in the state machine. Any special
2390 * processing that needs to happen on certain state transitions can
2391 * happen here.
2392 *
2393 * @param c, pointer of the concurrency
2394 * @param state, connection state
2395 */
2396 static void ms_conn_set_state(ms_conn_t *c, int state)
2397 {
2398 assert(c != NULL);
2399
2400 if (state != c->state)
2401 {
2402 if (state == conn_read)
2403 {
2404 ms_conn_shrink(c);
2405 }
2406 c->state= state;
2407 }
2408 } /* ms_conn_set_state */
2409
2410
2411 /**
2412 * update the event if socks change state. for example: when
2413 * change the listen scoket read event to sock write event, or
2414 * change socket handler, we could call this function.
2415 *
2416 * @param c, pointer of the concurrency
2417 * @param new_flags, new event flags
2418 *
2419 * @return bool, if success, return true, else return false
2420 */
2421 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2422 {
2423 assert(c != NULL);
2424
2425 struct event_base *base= c->event.ev_base;
2426 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2427 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2428 {
2429 return true;
2430 }
2431
2432 if (event_del(&c->event) == -1)
2433 {
2434 /* try to delete the event again */
2435 if (event_del(&c->event) == -1)
2436 {
2437 return false;
2438 }
2439 }
2440
2441 event_set(&c->event,
2442 c->sfd,
2443 (short)new_flags,
2444 ms_event_handler,
2445 (void *)c);
2446 event_base_set(base, &c->event);
2447 c->ev_flags= (short)new_flags;
2448
2449 if (event_add(&c->event, NULL) == -1)
2450 {
2451 return false;
2452 }
2453
2454 return true;
2455 } /* ms_update_event */
2456
2457
2458 /**
2459 * If user want to get the expected throughput, we could limit
2460 * the performance of memslap. we could give up some work and
2461 * just wait a short time. The function is used to check this
2462 * case.
2463 *
2464 * @param c, pointer of the concurrency
2465 *
2466 * @return bool, if success, return true, else return false
2467 */
2468 static bool ms_need_yield(ms_conn_t *c)
2469 {
2470 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
2471 int64_t tps= 0;
2472 int64_t time_diff= 0;
2473 struct timeval curr_time;
2474 ms_task_t *task= &c->curr_task;
2475
2476 if (ms_setting.expected_tps > 0)
2477 {
2478 gettimeofday(&curr_time, NULL);
2479 time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
2480 tps= (int64_t)(((task->get_opt + task->set_opt) / (uint64_t)time_diff) * 1000000);
2481
2482 /* current throughput is greater than expected throughput */
2483 if (tps > ms_thread->thread_ctx->tps_perconn)
2484 {
2485 return true;
2486 }
2487 }
2488
2489 return false;
2490 } /* ms_need_yield */
2491
2492
2493 /**
2494 * used to update the start time of each operation
2495 *
2496 * @param c, pointer of the concurrency
2497 */
2498 static void ms_update_start_time(ms_conn_t *c)
2499 {
2500 ms_task_item_t *item= c->curr_task.item;
2501
2502 if ((ms_setting.stat_freq > 0) || c->udp
2503 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2504 {
2505 gettimeofday(&c->start_time, NULL);
2506 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2507 {
2508 /* record the current time */
2509 item->client_time= c->start_time.tv_sec;
2510 }
2511 }
2512 } /* ms_update_start_time */
2513
2514
/**
 * run the connection's state machine until it has to wait for the next
 * libevent notification, or until the connection is closed.
 *
 * States handled:
 * - conn_read:    consume buffered response data and read more from the
 *                 socket. Once the current command is finished, copy
 *                 currcmd into precmd and switch to conn_write.
 * - conn_write:   optionally throttle (ms_need_yield), start the next task
 *                 (ms_exec_task) and transmit it. After a complete send,
 *                 arm EV_READ and wait in conn_read.
 * - conn_closing: reconnect when ms_setting.reconnect is set and either
 *                 ms_global.time_out is not set, or the run is bounded by
 *                 execute count (run_time == 0) with commands remaining.
 *                 Otherwise close the connection.
 *
 * Inside the switch, "break" only leaves the switch; the while loop then
 * re-dispatches on c->state. Setting stop= true returns to the event loop.
 *
 * @param c, pointer of the concurrency
 */
static void ms_drive_machine(ms_conn_t *c)
{
  bool stop= false;

  assert(c != NULL);

  while (! stop)
  {
    switch (c->state)
    {
    case conn_read:
      /* readval: a value body of c->rvbytes bytes is expected next */
      if (c->readval)
      {
        if (c->rbytes >= c->rvbytes)
        {
          /* the whole value is already buffered: process it, re-dispatch */
          ms_complete_nread(c);
          break;
        }
      }
      else
      {
        /* NOTE(review): non-zero presumably means a response line was
         * parsed; re-dispatch to act on the resulting state */
        if (ms_try_read_line(c) != 0)
        {
          break;
        }
      }

      /* not enough buffered data: read from the socket. A non-zero return
       * re-dispatches (presumably new data arrived) */
      if (ms_try_read_network(c) != 0)
      {
        break;
      }

      /* doesn't read all the response data, wait event wake up */
      if (! c->currcmd.isfinish)
      {
        if (! ms_update_event(c, EV_READ | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      /* we have no command line and no data to read from network, next write */
      ms_conn_set_state(c, conn_write);
      memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */

      break;

    case conn_write:
      /* throttling only happens between requests, never in the middle of a
       * partially written one (ctnwrite). Sleep briefly, then wait for the
       * next write event. */
      if (! c->ctnwrite && ms_need_yield(c))
      {
        usleep(10);

        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      /* start a new request unless we are resuming a partial write */
      if (! c->ctnwrite && (ms_exec_task(c) != 0))
      {
        ms_conn_set_state(c, conn_closing);
        break;
      }

      /* record the start time before starting to send data if necessary */
      /* NOTE(review): this block always leaves c->change_sfd == false, so
       * the "change sfd" branch just below appears unreachable. Verify
       * whether it was meant to run before the flag is cleared. */
      if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
      {
        if (c->change_sfd)
        {
          c->change_sfd= false;
        }
        ms_update_start_time(c);
      }

      /* change sfd if necessary */
      if (c->change_sfd)
      {
        c->ctnwrite= true;
        stop= true;
        break;
      }

      /* execute task until nothing need be written to network */
      /* nothing is queued (msgcurr == msgused): wait for the next write event */
      if (! c->ctnwrite && (c->msgcurr == c->msgused))
      {
        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      switch (ms_transmit(c))
      {
      case TRANSMIT_COMPLETE:
        /* we have no data to write to network, next wait repose */
        if (! ms_update_event(c, EV_READ | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          c->ctnwrite= false;
          break;
        }
        ms_conn_set_state(c, conn_read);
        c->ctnwrite= false;
        stop= true;
        break;

      case TRANSMIT_INCOMPLETE:
        c->ctnwrite= true;
        break; /* Continue in state machine. */

      case TRANSMIT_HARD_ERROR:
        /* NOTE(review): the state is not changed here; presumably
         * ms_transmit() already moved the connection to conn_closing */
        c->ctnwrite= false;
        break;

      case TRANSMIT_SOFT_ERROR:
        /* keep the partial write pending and wait for the next event
         * (presumably the socket would have blocked) */
        c->ctnwrite= true;
        stop= true;
        break;

      default:
        break;
      } /* switch */

      break;

    case conn_closing:
      /* recovery mode, need reconnect if connection close */
      if (ms_setting.reconnect && (! ms_global.time_out
                                   || ((ms_setting.run_time == 0)
                                       && (c->remain_exec_num > 0))))
      {
        if (ms_reconn(c) != 0)
        {
          ms_conn_close(c);
          stop= true;
          break;
        }

        /* NOTE(review): ms_reset_conn() is presumably what moves the state
         * away from conn_closing. If ms_update_event() below fails, the
         * state stays conn_closing and another reconnect is attempted. */
        ms_reset_conn(c, false);

        if (c->total_sfds == 1)
        {
          if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
          {
            fprintf(stderr, "Couldn't update event.\n");
            ms_conn_set_state(c, conn_closing);
            break;
          }
        }

        break;
      }
      else
      {
        ms_conn_close(c);
        stop= true;
        break;
      }

    default:
      /* any other state is a programming error */
      assert(0);
    } /* switch */
  }
} /* ms_drive_machine */
2697
2698
2699 /**
2700 * the event handler of each thread
2701 *
2702 * @param fd, the file descriptor of socket
2703 * @param which, event flag
2704 * @param arg, argument
2705 */
2706 void ms_event_handler(const int fd, const short which, void *arg)
2707 {
2708 ms_conn_t *c= (ms_conn_t *)arg;
2709
2710 assert(c != NULL);
2711
2712 c->which= which;
2713
2714 /* sanity */
2715 if (fd != c->sfd)
2716 {
2717 fprintf(stderr,
2718 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2719 fd,
2720 c->sfd);
2721 ms_conn_close(c);
2722 exit(1);
2723 }
2724 assert(fd == c->sfd);
2725
2726 ms_drive_machine(c);
2727
2728 /* wait for next event */
2729 } /* ms_event_handler */
2730
2731
2732 /**
2733 * get the next socket descriptor index to run for replication
2734 *
2735 * @param c, pointer of the concurrency
2736 * @param cmd, command(get or set )
2737 *
2738 * @return int, if success, return the index, else return EXIT_SUCCESS
2739 */
2740 static uint32_t ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2741 {
2742 uint32_t sock_index= 0;
2743 uint32_t i= 0;
2744
2745 if (c->total_sfds == 1)
2746 {
2747 return EXIT_SUCCESS;
2748 }
2749
2750 if (ms_setting.rep_write_srv == 0)
2751 {
2752 return sock_index;
2753 }
2754
2755 do
2756 {
2757 if (cmd == CMD_SET)
2758 {
2759 for (i= 0; i < ms_setting.rep_write_srv; i++)
2760 {
2761 if (c->tcpsfd[i] > 0)
2762 {
2763 break;
2764 }
2765 }
2766
2767 if (i == ms_setting.rep_write_srv)
2768 {
2769 /* random get one replication server to read */
2770 sock_index= (uint32_t)random() % c->total_sfds;
2771 }
2772 else
2773 {
2774 /* random get one replication writing server to write */
2775 sock_index= (uint32_t)random() % ms_setting.rep_write_srv;
2776 }
2777 }
2778 else if (cmd == CMD_GET)
2779 {
2780 /* random get one replication server to read */
2781 sock_index= (uint32_t)random() % c->total_sfds;
2782 }
2783 }
2784 while (c->tcpsfd[sock_index] == 0);
2785
2786 return sock_index;
2787 } /* ms_get_rep_sock_index */
2788
2789
2790 /**
2791 * get the next socket descriptor index to run
2792 *
2793 * @param c, pointer of the concurrency
2794 *
2795 * @return int, return the index
2796 */
2797 static uint32_t ms_get_next_sock_index(ms_conn_t *c)
2798 {
2799 uint32_t sock_index= 0;
2800
2801 do
2802 {
2803 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2804 }
2805 while (c->tcpsfd[sock_index] == 0);
2806
2807 return sock_index;
2808 } /* ms_get_next_sock_index */
2809
2810
2811 /**
2812 * update socket event of the connections
2813 *
2814 * @param c, pointer of the concurrency
2815 *
2816 * @return int, if success, return EXIT_SUCCESS, else return -1
2817 */
2818 static int ms_update_conn_sock_event(ms_conn_t *c)
2819 {
2820 assert(c != NULL);
2821
2822 switch (c->currcmd.cmd)
2823 {
2824 case CMD_SET:
2825 if (ms_setting.facebook_test && c->udp)
2826 {
2827 c->sfd= c->tcpsfd[0];
2828 c->udp= false;
2829 c->change_sfd= true;
2830 }
2831 break;
2832
2833 case CMD_GET:
2834 if (ms_setting.facebook_test && ! c->udp)
2835 {
2836 c->sfd= c->udpsfd;
2837 c->udp= true;
2838 c->change_sfd= true;
2839 }
2840 break;
2841
2842 default:
2843 break;
2844 } /* switch */
2845
2846 if (! c->udp && (c->total_sfds > 1))
2847 {
2848 if (c->cur_idx != c->total_sfds)
2849 {
2850 if (ms_setting.rep_write_srv == 0)
2851 {
2852 c->cur_idx= ms_get_next_sock_index(c);
2853 }
2854 else
2855 {
2856 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2857 }
2858 }
2859 else
2860 {
2861 /* must select the first sock of the connection at the beginning */
2862 c->cur_idx= 0;
2863 }
2864
2865 c->sfd= c->tcpsfd[c->cur_idx];
2866 assert(c->sfd != 0);
2867 c->change_sfd= true;
2868 }
2869
2870 if (c->change_sfd)
2871 {
2872 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2873 {
2874 fprintf(stderr, "Couldn't update event.\n");
2875 ms_conn_set_state(c, conn_closing);
2876 return -1;
2877 }
2878 }
2879
2880 return EXIT_SUCCESS;
2881 } /* ms_update_conn_sock_event */
2882
2883
2884 /**
2885 * for ASCII protocol, this function build the set command
2886 * string and send the command.
2887 *
2888 * @param c, pointer of the concurrency
2889 * @param item, pointer of task item which includes the object
2890 * information
2891 *
2892 * @return int, if success, return EXIT_SUCCESS, else return -1
2893 */
2894 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2895 {
2896 int value_offset;
2897 int write_len;
2898 char *buffer= c->wbuf;
2899
2900 write_len= snprintf(buffer,
2901 c->wsize,
2902 " %u %d %d\r\n",
2903 0,
2904 item->exp_time,
2905 item->value_size);
2906
2907 if (write_len > c->wsize || write_len < 0)
2908 {
2909 /* ought to be always enough. just fail for simplicity */
2910 fprintf(stderr, "output command line too long.\n");
2911 return -1;
2912 }
2913
2914 if (item->value_offset == INVALID_OFFSET)
2915 {
2916 value_offset= item->key_suffix_offset;
2917 }
2918 else
2919 {
2920 value_offset= item->value_offset;
2921 }
2922
2923 if ((ms_add_iov(c, "set ", 4) != 0)
2924 || (ms_add_iov(c, (char *)&item->key_prefix,
2925 (int)KEY_PREFIX_SIZE) != 0)
2926 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2927 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2928 || (ms_add_iov(c, buffer, write_len) != 0)
2929 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2930 item->value_size) != 0)
2931 || (ms_add_iov(c, "\r\n", 2) != 0)
2932 || (c->udp && (ms_build_udp_headers(c) != 0)))
2933 {
2934 return -1;
2935 }
2936
2937 return EXIT_SUCCESS;
2938 } /* ms_build_ascii_write_buf_set */
2939
2940
2941 /**
2942 * used to send set command to server
2943 *
2944 * @param c, pointer of the concurrency
2945 * @param item, pointer of task item which includes the object
2946 * information
2947 *
2948 * @return int, if success, return EXIT_SUCCESS, else return -1
2949 */
2950 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2951 {
2952 assert(c != NULL);
2953
2954 c->currcmd.cmd= CMD_SET;
2955 c->currcmd.isfinish= false;
2956 c->currcmd.retstat= MCD_FAILURE;
2957
2958 if (ms_update_conn_sock_event(c) != 0)
2959 {
2960 return -1;
2961 }
2962
2963 c->msgcurr= 0;
2964 c->msgused= 0;
2965 c->iovused= 0;
2966 if (ms_add_msghdr(c) != 0)
2967 {
2968 fprintf(stderr, "Out of memory preparing request.");
2969 return -1;
2970 }
2971
2972 /* binary protocol */
2973 if (c->protocol == binary_prot)
2974 {
2975 if (ms_build_bin_write_buf_set(c, item) != 0)
2976 {
2977 return -1;
2978 }
2979 }
2980 else
2981 {
2982 if (ms_build_ascii_write_buf_set(c, item) != 0)
2983 {
2984 return -1;
2985 }
2986 }
2987
2988 atomic_add_size(&ms_stats.obj_bytes,
2989 item->key_size + item->value_size);
2990 atomic_add_size(&ms_stats.cmd_set, 1);
2991
2992 return EXIT_SUCCESS;
2993 } /* ms_mcd_set */
2994
2995
2996 /**
2997 * for ASCII protocol, this function build the get command
2998 * string and send the command.
2999 *
3000 * @param c, pointer of the concurrency
3001 * @param item, pointer of task item which includes the object
3002 * information
3003 *
3004 * @return int, if success, return EXIT_SUCCESS, else return -1
3005 */
3006 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3007 {
3008 if ((ms_add_iov(c, "get ", 4) != 0)
3009 || (ms_add_iov(c, (char *)&item->key_prefix,
3010 (int)KEY_PREFIX_SIZE) != 0)
3011 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3012 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3013 || (ms_add_iov(c, "\r\n", 2) != 0)
3014 || (c->udp && (ms_build_udp_headers(c) != 0)))
3015 {
3016 return -1;
3017 }
3018
3019 return EXIT_SUCCESS;
3020 } /* ms_build_ascii_write_buf_get */
3021
3022
3023 /**
3024 * used to send the get command to server
3025 *
3026 * @param c, pointer of the concurrency
3027 * @param item, pointer of task item which includes the object
3028 * information
3029 *
3030 * @return int, if success, return EXIT_SUCCESS, else return -1
3031 */
3032 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item)
3033 {
3034 assert(c != NULL);
3035
3036 c->currcmd.cmd= CMD_GET;
3037 c->currcmd.isfinish= false;
3038 c->currcmd.retstat= MCD_FAILURE;
3039
3040 if (ms_update_conn_sock_event(c) != 0)
3041 {
3042 return -1;
3043 }
3044
3045 c->msgcurr= 0;
3046 c->msgused= 0;
3047 c->iovused= 0;
3048 if (ms_add_msghdr(c) != 0)
3049 {
3050 fprintf(stderr, "Out of memory preparing request.");
3051 return -1;
3052 }
3053
3054 /* binary protocol */
3055 if (c->protocol == binary_prot)
3056 {
3057 if (ms_build_bin_write_buf_get(c, item) != 0)
3058 {
3059 return -1;
3060 }
3061 }
3062 else
3063 {
3064 if (ms_build_ascii_write_buf_get(c, item) != 0)
3065 {
3066 return -1;
3067 }
3068 }
3069
3070 atomic_add_size(&ms_stats.cmd_get, 1);
3071
3072 return EXIT_SUCCESS;
3073 } /* ms_mcd_get */
3074
3075
3076 /**
3077 * for ASCII protocol, this function build the multi-get command
3078 * string and send the command.
3079 *
3080 * @param c, pointer of the concurrency
3081 *
3082 * @return int, if success, return EXIT_SUCCESS, else return -1
3083 */
3084 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3085 {
3086 ms_task_item_t *item;
3087
3088 if (ms_add_iov(c, "get", 3) != 0)
3089 {
3090 return -1;
3091 }
3092
3093 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3094 {
3095 item= c->mlget_task.mlget_item[i].item;
3096 assert(item != NULL);
3097 if ((ms_add_iov(c, " ", 1) != 0)
3098 || (ms_add_iov(c, (char *)&item->key_prefix,
3099 (int)KEY_PREFIX_SIZE) != 0)
3100 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3101 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3102 {
3103 return -1;
3104 }
3105 }
3106
3107 if ((ms_add_iov(c, "\r\n", 2) != 0)
3108 || (c->udp && (ms_build_udp_headers(c) != 0)))
3109 {
3110 return -1;
3111 }
3112
3113 return EXIT_SUCCESS;
3114 } /* ms_build_ascii_write_buf_mlget */
3115
3116
3117 /**
3118 * used to send the multi-get command to server
3119 *
3120 * @param c, pointer of the concurrency
3121 *
3122 * @return int, if success, return EXIT_SUCCESS, else return -1
3123 */
3124 int ms_mcd_mlget(ms_conn_t *c)
3125 {
3126 ms_task_item_t *item;
3127
3128 assert(c != NULL);
3129 assert(c->mlget_task.mlget_num >= 1);
3130
3131 c->currcmd.cmd= CMD_GET;
3132 c->currcmd.isfinish= false;
3133 c->currcmd.retstat= MCD_FAILURE;
3134
3135 if (ms_update_conn_sock_event(c) != 0)
3136 {
3137 return -1;
3138 }
3139
3140 c->msgcurr= 0;
3141 c->msgused= 0;
3142 c->iovused= 0;
3143 if (ms_add_msghdr(c) != 0)
3144 {
3145 fprintf(stderr, "Out of memory preparing request.");
3146 return -1;
3147 }
3148
3149 /* binary protocol */
3150 if (c->protocol == binary_prot)
3151 {
3152 if (ms_build_bin_write_buf_mlget(c) != 0)
3153 {
3154 return -1;
3155 }
3156 }
3157 else
3158 {
3159 if (ms_build_ascii_write_buf_mlget(c) != 0)
3160 {
3161 return -1;
3162 }
3163 }
3164
3165 /* decrease operation time of each item */
3166 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3167 {
3168 item= c->mlget_task.mlget_item[i].item;
3169 atomic_add_size(&ms_stats.cmd_get, 1);
3170 }
3171
3172 (void)item;
3173
3174 return EXIT_SUCCESS;
3175 } /* ms_mcd_mlget */
3176
3177
3178 /**
3179 * binary protocol support
3180 */
3181
3182 /**
3183 * for binary protocol, parse the response of server
3184 *
3185 * @param c, pointer of the concurrency
3186 *
3187 * @return int, if success, return EXIT_SUCCESS, else return -1
3188 */
3189 static int ms_bin_process_response(ms_conn_t *c)
3190 {
3191 const char *errstr= NULL;
3192
3193 assert(c != NULL);
3194
3195 uint32_t bodylen= c->binary_header.response.bodylen;
3196 uint8_t opcode= c->binary_header.response.opcode;
3197 uint16_t status= c->binary_header.response.status;
3198
3199 if (bodylen > 0)
3200 {
3201 c->rvbytes= (int32_t)bodylen;
3202 c->readval= true;
3203 return EXIT_FAILURE;
3204 }
3205 else
3206 {
3207 switch (status)
3208 {
3209 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3210 if (opcode == PROTOCOL_BINARY_CMD_SET)
3211 {
3212 c->currcmd.retstat= MCD_STORED;
3213 }
3214 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3215 {
3216 c->currcmd.retstat= MCD_DELETED;
3217 }
3218 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3219 {
3220 c->currcmd.retstat= MCD_END;
3221 }
3222 break;
3223
3224 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3225 errstr= "Out of memory";
3226 c->currcmd.retstat= MCD_SERVER_ERROR;
3227 break;
3228
3229 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3230 errstr= "Unknown command";
3231 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3232 break;
3233
3234 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3235 errstr= "Not found";
3236 c->currcmd.retstat= MCD_NOTFOUND;
3237 break;
3238
3239 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3240 errstr= "Invalid arguments";
3241 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3242 break;
3243
3244 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3245 errstr= "Data exists for key.";
3246 break;
3247
3248 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3249 errstr= "Too large.";
3250 c->currcmd.retstat= MCD_SERVER_ERROR;
3251 break;
3252
3253 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3254 errstr= "Not stored.";
3255 c->currcmd.retstat= MCD_NOTSTORED;
3256 break;
3257
3258 default:
3259 errstr= "Unknown error";
3260 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3261 break;
3262 } /* switch */
3263
3264 if (errstr != NULL)
3265 {
3266 fprintf(stderr, "%s\n", errstr);
3267 }
3268 }
3269
3270 return EXIT_SUCCESS;
3271 } /* ms_bin_process_response */
3272
3273
3274 /* build binary header and add the header to the buffer to send */
3275
3276 /**
3277 * build binary header and add the header to the buffer to send
3278 *
3279 * @param c, pointer of the concurrency
3280 * @param opcode, operation code
3281 * @param hdr_len, length of header
3282 * @param key_len, length of key
3283 * @param body_len. length of body
3284 */
3285 static void ms_add_bin_header(ms_conn_t *c,
3286 uint8_t opcode,
3287 uint8_t hdr_len,
3288 uint16_t key_len,
3289 uint32_t body_len)
3290 {
3291 protocol_binary_request_header *header;
3292
3293 assert(c != NULL);
3294
3295 header= (protocol_binary_request_header *)c->wcurr;
3296
3297 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3298 header->request.opcode= (uint8_t)opcode;
3299 header->request.keylen= htons(key_len);
3300
3301 header->request.extlen= (uint8_t)hdr_len;
3302 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3303 header->request.vbucket= 0;
3304
3305 header->request.bodylen= htonl(body_len);
3306 header->request.opaque= 0;
3307 header->request.cas= 0;
3308
3309 ms_add_iov(c, c->wcurr, sizeof(header->request));
3310 } /* ms_add_bin_header */
3311
3312
3313 /**
3314 * add the key to the socket write buffer array
3315 *
3316 * @param c, pointer of the concurrency
3317 * @param item, pointer of task item which includes the object
3318 * information
3319 */
3320 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3321 {
3322 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3323 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3324 item->key_size - (int)KEY_PREFIX_SIZE);
3325 }
3326
3327
3328 /**
3329 * for binary protocol, this function build the set command
3330 * and add the command to send buffer array.
3331 *
3332 * @param c, pointer of the concurrency
3333 * @param item, pointer of task item which includes the object
3334 * information
3335 *
3336 * @return int, if success, return EXIT_SUCCESS, else return -1
3337 */
3338 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3339 {
3340 assert(c->wbuf == c->wcurr);
3341
3342 int value_offset;
3343 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3344 uint16_t keylen= (uint16_t)item->key_size;
3345 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3346 + (uint32_t)keylen + (uint32_t)item->value_size;
3347
3348 ms_add_bin_header(c,
3349 PROTOCOL_BINARY_CMD_SET,
3350 sizeof(rep->message.body),
3351 keylen,
3352 bodylen);
3353 rep->message.body.flags= 0;
3354 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3355 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3356 ms_add_key_to_iov(c, item);
3357
3358 if (item->value_offset == INVALID_OFFSET)
3359 {
3360 value_offset= item->key_suffix_offset;
3361 }
3362 else
3363 {
3364 value_offset= item->value_offset;
3365 }
3366 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3367
3368 return EXIT_SUCCESS;
3369 } /* ms_build_bin_write_buf_set */
3370
3371
3372 /**
3373 * for binary protocol, this function build the get command and
3374 * add the command to send buffer array.
3375 *
3376 * @param c, pointer of the concurrency
3377 * @param item, pointer of task item which includes the object
3378 * information
3379 *
3380 * @return int, if success, return EXIT_SUCCESS, else return -1
3381 */
3382 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3383 {
3384 assert(c->wbuf == c->wcurr);
3385
3386 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3387 (uint32_t)item->key_size);
3388 ms_add_key_to_iov(c, item);
3389
3390 return EXIT_SUCCESS;
3391 } /* ms_build_bin_write_buf_get */
3392
3393
3394 /**
3395 * for binary protocol, this function build the multi-get
3396 * command and add the command to send buffer array.
3397 *
3398 * @param c, pointer of the concurrency
3399 * @param item, pointer of task item which includes the object
3400 * information
3401 *
3402 * @return int, if success, return EXIT_SUCCESS, else return -1
3403 */
3404 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3405 {
3406 ms_task_item_t *item;
3407
3408 assert(c->wbuf == c->wcurr);
3409
3410 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3411 {
3412 item= c->mlget_task.mlget_item[i].item;
3413 assert(item != NULL);
3414
3415 ms_add_bin_header(c,
3416 PROTOCOL_BINARY_CMD_GET,
3417 0,
3418 (uint16_t)item->key_size,
3419 (uint32_t)item->key_size);
3420 ms_add_key_to_iov(c, item);
3421 c->wcurr+= sizeof(protocol_binary_request_get);
3422 }
3423
3424 c->wcurr= c->wbuf;
3425
3426 return EXIT_SUCCESS;
3427 } /* ms_build_bin_write_buf_mlget */