Deal with atomic stats variables on 32-bit machines.
[m6w6/libmemcached] / clients / ms_conn.c
1 /*
2 * File: ms_conn.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include <stdio.h>
15 #include <inttypes.h>
16 #include <limits.h>
17 #include <sys/uio.h>
18 #include <event.h>
19 #include <fcntl.h>
20 #include <netinet/tcp.h>
21 #include <arpa/inet.h>
22 #include "ms_setting.h"
23 #include "ms_thread.h"
24 #include "ms_atomic.h"
25
26 #ifdef linux
27 /* /usr/include/netinet/in.h defines macros from ntohs() to _bswap_nn to
28 * optimize the conversion functions, but the prototypes generate warnings
29 * from gcc. The conversion methods isn't the bottleneck for my app, so
30 * just remove the warnings by undef'ing the optimization ..
31 */
32 #undef ntohs
33 #undef ntohl
34 #undef htons
35 #undef htonl
36 #endif
37
/* Outcome codes of a network write attempt. */
#define TRANSMIT_COMPLETE    0
#define TRANSMIT_INCOMPLETE  1
#define TRANSMIT_SOFT_ERROR  2
#define TRANSMIT_HARD_ERROR  3

/*
 * Key prefix generation. Every byte of the mask has bit 0x10 set. Once the
 * mask is OR-ed into a prefix, no byte of the prefix can equal ' ' (0x20),
 * '\r' (0x0d), '\n' (0x0a) or '\0', because none of those have that bit.
 */
#define KEY_PREFIX_BASE 0x1010101010101010
#define KEY_PREFIX_MASK 0x1010101010101010

/* Token positions in a "VALUE <key> <flags> <bytes>" response line. */
#define KEY_TOKEN       1
#define VALUELEN_TOKEN  3

/* Increasing source of unique key prefixes; guarded by ms_global.seq_mutex. */
static uint64_t key_prefix_seq= KEY_PREFIX_BASE;

/* Increasing source of UDP request ids; bumped with atomic_add_32_nv(). */
static volatile uint32_t udp_request_id= 0;

/* Thread-specific key through which each worker finds its ms_thread_t. */
extern pthread_key_t ms_thread_key;
59
60 /* generate upd request id */
61 static uint32_t ms_get_udp_request_id(void);
62
63
64 /* connect initialize */
65 static void ms_task_init(ms_conn_t *c);
66 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp);
67 static int ms_conn_sock_init(ms_conn_t *c);
68 static int ms_conn_event_init(ms_conn_t *c);
69 static int ms_conn_init(ms_conn_t *c,
70 const int init_state,
71 const int read_buffer_size,
72 const bool is_udp);
73 static void ms_warmup_num_init(ms_conn_t *c);
74 static int ms_item_win_init(ms_conn_t *c);
75
76
77 /* connection close */
78 void ms_conn_free(ms_conn_t *c);
79 static void ms_conn_close(ms_conn_t *c);
80
81
82 /* create network connection */
83 static int ms_new_socket(struct addrinfo *ai);
84 static void ms_maximize_sndbuf(const int sfd);
85 static int ms_network_connect(ms_conn_t *c,
86 char *srv_host_name,
87 const int srv_port,
88 const bool is_udp,
89 int *ret_sfd);
90 static int ms_reconn(ms_conn_t *c);
91
92
93 /* read and parse */
94 static int ms_tokenize_command(char *command,
95 token_t *tokens,
96 const int max_tokens);
97 static int ms_ascii_process_line(ms_conn_t *c, char *command);
98 static int ms_try_read_line(ms_conn_t *c);
99 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes);
100 static int ms_udp_read(ms_conn_t *c, char *buf, int len);
101 static int ms_try_read_network(ms_conn_t *c);
102 static void ms_verify_value(ms_conn_t *c,
103 ms_mlget_task_item_t *mlget_item,
104 char *value,
105 int vlen);
106 static void ms_ascii_complete_nread(ms_conn_t *c);
107 static void ms_bin_complete_nread(ms_conn_t *c);
108 static void ms_complete_nread(ms_conn_t *c);
109
110
111 /* send functions */
112 static int ms_add_msghdr(ms_conn_t *c);
113 static int ms_ensure_iov_space(ms_conn_t *c);
114 static int ms_add_iov(ms_conn_t *c, const void *buf, int len);
115 static int ms_build_udp_headers(ms_conn_t *c);
116 static int ms_transmit(ms_conn_t *c);
117
118
119 /* status adjustment */
120 static void ms_conn_shrink(ms_conn_t *c);
121 static void ms_conn_set_state(ms_conn_t *c, int state);
122 static bool ms_update_event(ms_conn_t *c, const int new_flags);
123 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd);
124 static int ms_get_next_sock_index(ms_conn_t *c);
125 static int ms_update_conn_sock_event(ms_conn_t *c);
126 static bool ms_need_yield(ms_conn_t *c);
127 static void ms_update_start_time(ms_conn_t *c);
128
129
130 /* main loop */
131 static void ms_drive_machine(ms_conn_t *c);
132 void ms_event_handler(const int fd, const short which, void *arg);
133
134
135 /* ascii protocol */
136 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
137 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
138 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c);
139
140
141 /* binary protocol */
142 static int ms_bin_process_response(ms_conn_t *c);
143 static void ms_add_bin_header(ms_conn_t *c,
144 uint8_t opcode,
145 uint8_t hdr_len,
146 uint16_t key_len,
147 uint32_t body_len);
148 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item);
149 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item);
150 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item);
151 static int ms_build_bin_write_buf_mlget(ms_conn_t *c);
152
153
154 /**
155 * each key has two parts, prefix and suffix. The suffix is a
156 * string random get form the character table. The prefix is a
157 * uint64_t variable. And the prefix must be unique. we use the
158 * prefix to identify a key. And the prefix can't include
159 * character ' ' '\r' '\n' '\0'.
160 *
161 * @return uint64_t
162 */
163 uint64_t ms_get_key_prefix(void)
164 {
165 uint64_t key_prefix;
166
167 pthread_mutex_lock(&ms_global.seq_mutex);
168 key_prefix_seq|= KEY_PREFIX_MASK;
169 key_prefix= key_prefix_seq;
170 key_prefix_seq++;
171 pthread_mutex_unlock(&ms_global.seq_mutex);
172
173 return key_prefix;
174 } /* ms_get_key_prefix */
175
176
177 /**
178 * get an unique udp request id
179 *
180 * @return an unique UDP request id
181 */
182 static uint32_t ms_get_udp_request_id(void)
183 {
184 return atomic_add_32_nv(&udp_request_id, 1);
185 }
186
187
188 /**
189 * initialize current task structure
190 *
191 * @param c, pointer of the concurrency
192 */
193 static void ms_task_init(ms_conn_t *c)
194 {
195 c->curr_task.cmd= CMD_NULL;
196 c->curr_task.item= 0;
197 c->curr_task.verify= false;
198 c->curr_task.finish_verify= true;
199 c->curr_task.get_miss= true;
200
201 c->curr_task.get_opt= 0;
202 c->curr_task.set_opt= 0;
203 c->curr_task.cycle_undo_get= 0;
204 c->curr_task.cycle_undo_set= 0;
205 c->curr_task.verified_get= 0;
206 c->curr_task.overwrite_set= 0;
207 } /* ms_task_init */
208
209
210 /**
211 * initialize udp for the connection structure
212 *
213 * @param c, pointer of the concurrency
214 * @param is_udp, whether it's udp
215 *
216 * @return int, if success, return 0, else return -1
217 */
218 static int ms_conn_udp_init(ms_conn_t *c, const bool is_udp)
219 {
220 c->hdrbuf= 0;
221 c->rudpbuf= 0;
222 c->udppkt= 0;
223
224 c->rudpsize= UDP_DATA_BUFFER_SIZE;
225 c->hdrsize= 0;
226
227 c->rudpbytes= 0;
228 c->packets= 0;
229 c->recvpkt= 0;
230 c->pktcurr= 0;
231 c->ordcurr= 0;
232
233 c->udp= is_udp;
234
235 if (c->udp || (! c->udp && ms_setting.facebook_test))
236 {
237 c->rudpbuf= (char *)malloc((size_t)c->rudpsize);
238 c->udppkt= (ms_udppkt_t *)malloc(MAX_UDP_PACKET * sizeof(ms_udppkt_t));
239
240 if ((c->rudpbuf == NULL) || (c->udppkt == NULL))
241 {
242 if (c->rudpbuf != NULL)
243 free(c->rudpbuf);
244 if (c->udppkt != NULL)
245 free(c->udppkt);
246 fprintf(stderr, "malloc()\n");
247 return -1;
248 }
249 memset(c->udppkt, 0, MAX_UDP_PACKET * sizeof(ms_udppkt_t));
250 }
251
252 return 0;
253 } /* ms_conn_udp_init */
254
255
256 /**
257 * initialize the connection structure
258 *
259 * @param c, pointer of the concurrency
260 * @param init_state, (conn_read, conn_write, conn_closing)
261 * @param read_buffer_size
262 * @param is_udp, whether it's udp
263 *
264 * @return int, if success, return 0, else return -1
265 */
266 static int ms_conn_init(ms_conn_t *c,
267 const int init_state,
268 const int read_buffer_size,
269 const bool is_udp)
270 {
271 assert(c != NULL);
272
273 c->rbuf= c->wbuf= 0;
274 c->iov= 0;
275 c->msglist= 0;
276
277 c->rsize= read_buffer_size;
278 c->wsize= WRITE_BUFFER_SIZE;
279 c->iovsize= IOV_LIST_INITIAL;
280 c->msgsize= MSG_LIST_INITIAL;
281
282 /* for replication, each connection need connect all the server */
283 if (ms_setting.rep_write_srv > 0)
284 {
285 c->total_sfds= ms_setting.srv_cnt;
286 }
287 else
288 {
289 c->total_sfds= ms_setting.sock_per_conn;
290 }
291 c->alive_sfds= 0;
292
293 c->rbuf= (char *)malloc((size_t)c->rsize);
294 c->wbuf= (char *)malloc((size_t)c->wsize);
295 c->iov= (struct iovec *)malloc(sizeof(struct iovec) * (size_t)c->iovsize);
296 c->msglist= (struct msghdr *)malloc(
297 sizeof(struct msghdr) * (size_t)c->msgsize);
298 if (ms_setting.mult_key_num > 1)
299 {
300 c->mlget_task.mlget_item= (ms_mlget_task_item_t *)
301 malloc(
302 sizeof(ms_mlget_task_item_t) * (size_t)ms_setting.mult_key_num);
303 }
304 c->tcpsfd= (int *)malloc((size_t)c->total_sfds * sizeof(int));
305
306 if ((c->rbuf == NULL) || (c->wbuf == NULL) || (c->iov == NULL)
307 || (c->msglist == NULL) || (c->tcpsfd == NULL)
308 || ((ms_setting.mult_key_num > 1)
309 && (c->mlget_task.mlget_item == NULL)))
310 {
311 if (c->rbuf != NULL)
312 free(c->rbuf);
313 if (c->wbuf != NULL)
314 free(c->wbuf);
315 if (c->iov != NULL)
316 free(c->iov);
317 if (c->msglist != NULL)
318 free(c->msglist);
319 if (c->mlget_task.mlget_item != NULL)
320 free(c->mlget_task.mlget_item);
321 if (c->tcpsfd != NULL)
322 free(c->tcpsfd);
323 fprintf(stderr, "malloc()\n");
324 return -1;
325 }
326
327 c->state= init_state;
328 c->rvbytes= 0;
329 c->rbytes= 0;
330 c->rcurr= c->rbuf;
331 c->wcurr= c->wbuf;
332 c->iovused= 0;
333 c->msgcurr= 0;
334 c->msgused= 0;
335 c->cur_idx= c->total_sfds; /* default index is a invalid value */
336
337 c->ctnwrite= false;
338 c->readval= false;
339 c->change_sfd= false;
340
341 c->precmd.cmd= c->currcmd.cmd= CMD_NULL;
342 c->precmd.isfinish= true; /* default the previous command finished */
343 c->currcmd.isfinish= false;
344 c->precmd.retstat= c->currcmd.retstat= MCD_FAILURE;
345 c->precmd.key_prefix= c->currcmd.key_prefix= 0;
346
347 c->mlget_task.mlget_num= 0;
348 c->mlget_task.value_index= -1; /* default invalid value */
349
350 if (ms_setting.binary_prot)
351 {
352 c->protocol= binary_prot;
353 }
354 else if (is_udp)
355 {
356 c->protocol= ascii_udp_prot;
357 }
358 else
359 {
360 c->protocol= ascii_prot;
361 }
362
363 /* initialize udp */
364 if (ms_conn_udp_init(c, is_udp) != 0)
365 {
366 return -1;
367 }
368
369 /* initialize task */
370 ms_task_init(c);
371
372 if (! (ms_setting.facebook_test && is_udp))
373 {
374 atomic_add_32(&ms_stats.active_conns, 1);
375 }
376
377 return 0;
378 } /* ms_conn_init */
379
380
381 /**
382 * when doing 100% get operation, it could preset some objects
383 * to warmup the server. this function is used to initialize the
384 * number of the objects to preset.
385 *
386 * @param c, pointer of the concurrency
387 */
388 static void ms_warmup_num_init(ms_conn_t *c)
389 {
390 /* no set operation, preset all the items in the window */
391 if (ms_setting.cmd_distr[CMD_SET].cmd_prop < PROP_ERROR)
392 {
393 c->warmup_num= c->win_size;
394 c->remain_warmup_num= c->warmup_num;
395 }
396 else
397 {
398 c->warmup_num= 0;
399 c->remain_warmup_num= c->warmup_num;
400 }
401 } /* ms_warmup_num_init */
402
403
404 /**
405 * each connection has an item window, this function initialize
406 * the window. The window is used to generate task.
407 *
408 * @param c, pointer of the concurrency
409 *
410 * @return int, if success, return 0, else return -1
411 */
412 static int ms_item_win_init(ms_conn_t *c)
413 {
414 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
415 int exp_cnt= 0;
416
417 c->win_size= (int)ms_setting.win_size;
418 c->set_cursor= 0;
419 c->exec_num= ms_thread->thread_ctx->exec_num_perconn;
420 c->remain_exec_num= c->exec_num;
421
422 c->item_win= (ms_task_item_t *)malloc(
423 sizeof(ms_task_item_t) * (size_t)c->win_size);
424 if (c->item_win == NULL)
425 {
426 fprintf(stderr, "Can't allocate task item array for conn.\n");
427 return -1;
428 }
429 memset(c->item_win, 0, sizeof(ms_task_item_t) * (size_t)c->win_size);
430
431 for (int i= 0; i < c->win_size; i++)
432 {
433 c->item_win[i].key_size= (int)ms_setting.distr[i].key_size;
434 c->item_win[i].key_prefix= ms_get_key_prefix();
435 c->item_win[i].key_suffix_offset= ms_setting.distr[i].key_offset;
436 c->item_win[i].value_size= (int)ms_setting.distr[i].value_size;
437 c->item_win[i].value_offset= INVALID_OFFSET; /* default in invalid offset */
438 c->item_win[i].client_time= 0;
439
440 /* set expire time base on the proportion */
441 if (exp_cnt < ms_setting.exp_ver_per * i)
442 {
443 c->item_win[i].exp_time= FIXED_EXPIRE_TIME;
444 exp_cnt++;
445 }
446 else
447 {
448 c->item_win[i].exp_time= 0;
449 }
450 }
451
452 ms_warmup_num_init(c);
453
454 return 0;
455 } /* ms_item_win_init */
456
457
458 /**
459 * each connection structure can include one or more sock
460 * handlers. this function create these socks and connect the
461 * server(s).
462 *
463 * @param c, pointer of the concurrency
464 *
465 * @return int, if success, return 0, else return -1
466 */
467 static int ms_conn_sock_init(ms_conn_t *c)
468 {
469 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
470 int i;
471 int ret_sfd;
472 int srv_idx= 0;
473
474 assert(c != NULL);
475 assert(c->tcpsfd != NULL);
476
477 for (i= 0; i < c->total_sfds; i++)
478 {
479 ret_sfd= 0;
480 if (ms_setting.rep_write_srv > 0)
481 {
482 /* for replication, each connection need connect all the server */
483 srv_idx= i;
484 }
485 else
486 {
487 /* all the connections in a thread connects the same server */
488 srv_idx= ms_thread->thread_ctx->srv_idx;
489 }
490
491 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
492 ms_setting.servers[srv_idx].srv_port,
493 ms_setting.udp, &ret_sfd) != 0)
494 {
495 break;
496 }
497
498 if (i == 0)
499 {
500 c->sfd= ret_sfd;
501 }
502
503 if (! ms_setting.udp)
504 {
505 c->tcpsfd[i]= ret_sfd;
506 }
507
508 c->alive_sfds++;
509 }
510
511 /* initialize udp sock handler if necessary */
512 if (ms_setting.facebook_test)
513 {
514 ret_sfd= 0;
515 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
516 ms_setting.servers[srv_idx].srv_port,
517 true, &ret_sfd) != 0)
518 {
519 c->udpsfd= 0;
520 }
521 else
522 {
523 c->udpsfd= ret_sfd;
524 }
525 }
526
527 if ((i != c->total_sfds) || (ms_setting.facebook_test && (c->udpsfd == 0)))
528 {
529 if (ms_setting.udp)
530 {
531 close(c->sfd);
532 }
533 else
534 {
535 for (int j= 0; j < i; j++)
536 {
537 close(c->tcpsfd[j]);
538 }
539 }
540
541 if (c->udpsfd != 0)
542 {
543 close(c->udpsfd);
544 }
545
546 return -1;
547 }
548
549 return 0;
550 } /* ms_conn_sock_init */
551
552
553 /**
554 * each connection is managed by libevent, this function
555 * initialize the event of the connection structure.
556 *
557 * @param c, pointer of the concurrency
558 *
559 * @return int, if success, return 0, else return -1
560 */
561 static int ms_conn_event_init(ms_conn_t *c)
562 {
563 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
564 /* default event timeout 10 seconds */
565 struct timeval t=
566 {
567 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
568 };
569 short event_flags= EV_WRITE | EV_PERSIST;
570
571 event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
572 event_base_set(ms_thread->base, &c->event);
573 c->ev_flags= event_flags;
574
575 if (c->total_sfds == 1)
576 {
577 if (event_add(&c->event, NULL) == -1)
578 {
579 return -1;
580 }
581 }
582 else
583 {
584 if (event_add(&c->event, &t) == -1)
585 {
586 return -1;
587 }
588 }
589
590 return 0;
591 } /* ms_conn_event_init */
592
593
594 /**
595 * setup a connection, each connection structure of each
596 * thread must call this function to initialize.
597 *
598 * @param c, pointer of the concurrency
599 *
600 * @return int, if success, return 0, else return -1
601 */
602 int ms_setup_conn(ms_conn_t *c)
603 {
604 if (ms_item_win_init(c) != 0)
605 {
606 return -1;
607 }
608
609 if (ms_conn_init(c, conn_write, DATA_BUFFER_SIZE, ms_setting.udp) != 0)
610 {
611 return -1;
612 }
613
614 if (ms_conn_sock_init(c) != 0)
615 {
616 return -1;
617 }
618
619 if (ms_conn_event_init(c) != 0)
620 {
621 return -1;
622 }
623
624 return 0;
625 } /* ms_setup_conn */
626
627
628 /**
629 * Frees a connection.
630 *
631 * @param c, pointer of the concurrency
632 */
633 void ms_conn_free(ms_conn_t *c)
634 {
635 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
636 if (c != NULL)
637 {
638 if (c->hdrbuf != NULL)
639 free(c->hdrbuf);
640 if (c->msglist != NULL)
641 free(c->msglist);
642 if (c->rbuf != NULL)
643 free(c->rbuf);
644 if (c->wbuf != NULL)
645 free(c->wbuf);
646 if (c->iov != NULL)
647 free(c->iov);
648 if (c->mlget_task.mlget_item != NULL)
649 free(c->mlget_task.mlget_item);
650 if (c->rudpbuf != NULL)
651 free(c->rudpbuf);
652 if (c->udppkt != NULL)
653 free(c->udppkt);
654 if (c->item_win != NULL)
655 free(c->item_win);
656 if (c->tcpsfd != NULL)
657 free(c->tcpsfd);
658
659 if (--ms_thread->nactive_conn == 0)
660 {
661 free(ms_thread->conn);
662 }
663 }
664 } /* ms_conn_free */
665
666
667 /**
668 * close a connection
669 *
670 * @param c, pointer of the concurrency
671 */
672 static void ms_conn_close(ms_conn_t *c)
673 {
674 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
675 assert(c != NULL);
676
677 /* delete the event, the socket and the connection */
678 event_del(&c->event);
679
680 for (int i= 0; i < c->total_sfds; i++)
681 {
682 if (c->tcpsfd[i] > 0)
683 {
684 close(c->tcpsfd[i]);
685 }
686 }
687 c->sfd= 0;
688
689 if (ms_setting.facebook_test)
690 {
691 close(c->udpsfd);
692 }
693
694 atomic_dec_32(&ms_stats.active_conns);
695
696 ms_conn_free(c);
697
698 if (ms_setting.run_time == 0)
699 {
700 pthread_mutex_lock(&ms_global.run_lock.lock);
701 ms_global.run_lock.count++;
702 pthread_cond_signal(&ms_global.run_lock.cond);
703 pthread_mutex_unlock(&ms_global.run_lock.lock);
704 }
705
706 if (ms_thread->nactive_conn == 0)
707 {
708 pthread_exit(NULL);
709 }
710 } /* ms_conn_close */
711
712
713 /**
714 * create a new sock
715 *
716 * @param ai, server address information
717 *
718 * @return int, if success, return 0, else return -1
719 */
720 static int ms_new_socket(struct addrinfo *ai)
721 {
722 int sfd;
723
724 if ((sfd= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1)
725 {
726 fprintf(stderr, "socket() error: %s.\n", strerror(errno));
727 return -1;
728 }
729
730 return sfd;
731 } /* ms_new_socket */
732
733
734 /**
735 * Sets a socket's send buffer size to the maximum allowed by the system.
736 *
737 * @param sfd, file descriptor of socket
738 */
739 static void ms_maximize_sndbuf(const int sfd)
740 {
741 socklen_t intsize= sizeof(int);
742 unsigned int last_good= 0;
743 unsigned int min, max, avg;
744 unsigned int old_size;
745
746 /* Start with the default size. */
747 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0)
748 {
749 fprintf(stderr, "getsockopt(SO_SNDBUF)\n");
750 return;
751 }
752
753 /* Binary-search for the real maximum. */
754 min= old_size;
755 max= MAX_SENDBUF_SIZE;
756
757 while (min <= max)
758 {
759 avg= ((unsigned int)(min + max)) / 2;
760 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0)
761 {
762 last_good= avg;
763 min= avg + 1;
764 }
765 else
766 {
767 max= avg - 1;
768 }
769 }
770 } /* ms_maximize_sndbuf */
771
772
773 /**
774 * socket connects the server
775 *
776 * @param c, pointer of the concurrency
777 * @param srv_host_name, the host name of the server
778 * @param srv_port, port of server
779 * @param is_udp, whether it's udp
780 * @param ret_sfd, the connected socket file descriptor
781 *
782 * @return int, if success, return 0, else return -1
783 */
784 static int ms_network_connect(ms_conn_t *c,
785 char *srv_host_name,
786 const int srv_port,
787 const bool is_udp,
788 int *ret_sfd)
789 {
790 int sfd;
791 struct linger ling=
792 {
793 0, 0
794 };
795 struct addrinfo *ai;
796 struct addrinfo *next;
797 struct addrinfo hints;
798 char port_buf[NI_MAXSERV];
799 int error;
800 int success= 0;
801
802 int flags= 1;
803
804 /*
805 * the memset call clears nonstandard fields in some impementations
806 * that otherwise mess things up.
807 */
808 memset(&hints, 0, sizeof(hints));
809 hints.ai_flags= AI_PASSIVE | AI_ADDRCONFIG;
810 if (is_udp)
811 {
812 hints.ai_protocol= IPPROTO_UDP;
813 hints.ai_socktype= SOCK_DGRAM;
814 hints.ai_family= AF_INET; /* This left here because of issues with OSX 10.5 */
815 }
816 else
817 {
818 hints.ai_family= AF_UNSPEC;
819 hints.ai_protocol= IPPROTO_TCP;
820 hints.ai_socktype= SOCK_STREAM;
821 }
822
823 snprintf(port_buf, NI_MAXSERV, "%d", srv_port);
824 error= getaddrinfo(srv_host_name, port_buf, &hints, &ai);
825 if (error != 0)
826 {
827 if (error != EAI_SYSTEM)
828 fprintf(stderr, "getaddrinfo(): %s.\n", gai_strerror(error));
829 else
830 perror("getaddrinfo()\n");
831
832 return -1;
833 }
834
835 for (next= ai; next; next= next->ai_next)
836 {
837 if ((sfd= ms_new_socket(next)) == -1)
838 {
839 freeaddrinfo(ai);
840 return -1;
841 }
842
843 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
844 if (is_udp)
845 {
846 ms_maximize_sndbuf(sfd);
847 }
848 else
849 {
850 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags,
851 sizeof(flags));
852 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
853 setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags,
854 sizeof(flags));
855 }
856
857 if (is_udp)
858 {
859 c->srv_recv_addr_size= sizeof(struct sockaddr);
860 memcpy(&c->srv_recv_addr, next->ai_addr, c->srv_recv_addr_size);
861 }
862 else
863 {
864 if (connect(sfd, next->ai_addr, next->ai_addrlen) == -1)
865 {
866 close(sfd);
867 freeaddrinfo(ai);
868 return -1;
869 }
870 }
871
872 if (((flags= fcntl(sfd, F_GETFL, 0)) < 0)
873 || (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0))
874 {
875 fprintf(stderr, "setting O_NONBLOCK\n");
876 close(sfd);
877 freeaddrinfo(ai);
878 return -1;
879 }
880
881 if (ret_sfd != NULL)
882 {
883 *ret_sfd= sfd;
884 }
885
886 success++;
887 }
888
889 freeaddrinfo(ai);
890
891 /* Return zero if we detected no errors in starting up connections */
892 return success == 0;
893 } /* ms_network_connect */
894
895
896 /**
897 * reconnect a disconnected sock
898 *
899 * @param c, pointer of the concurrency
900 *
901 * @return int, if success, return 0, else return -1
902 */
903 static int ms_reconn(ms_conn_t *c)
904 {
905 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
906 int srv_idx= 0;
907 int32_t srv_conn_cnt= 0;
908
909 if (ms_setting.rep_write_srv > 0)
910 {
911 srv_idx= c->cur_idx;
912 srv_conn_cnt= ms_setting.nconns;
913 }
914 else
915 {
916 srv_idx= ms_thread->thread_ctx->srv_idx;
917 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
918 }
919
920 /* close the old socket handler */
921 close(c->sfd);
922 c->tcpsfd[c->cur_idx]= 0;
923
924 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].disconn_cnt, 1)
925 % (uint32_t)srv_conn_cnt == 0)
926 {
927 gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
928 fprintf(stderr, "Server %s:%d disconnect\n",
929 ms_setting.servers[srv_idx].srv_host_name,
930 ms_setting.servers[srv_idx].srv_port);
931 }
932
933 if (ms_setting.rep_write_srv > 0)
934 {
935 int i= 0;
936 for (i= 0; i < c->total_sfds; i++)
937 {
938 if (c->tcpsfd[i] != 0)
939 {
940 break;
941 }
942 }
943
944 /* all socks disconnect */
945 if (i == c->total_sfds)
946 {
947 return -1;
948 }
949 }
950 else
951 {
952 do
953 {
954 /* reconnect success, break the loop */
955 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
956 ms_setting.servers[srv_idx].srv_port,
957 ms_setting.udp, &c->sfd) == 0)
958 {
959 c->tcpsfd[c->cur_idx]= c->sfd;
960 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
961 % (uint32_t)srv_conn_cnt == 0)
962 {
963 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
964 int reconn_time=
965 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
966 - ms_setting.servers[srv_idx].disconn_time
967 .tv_sec);
968 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
969 ms_setting.servers[srv_idx].srv_host_name,
970 ms_setting.servers[srv_idx].srv_port, reconn_time);
971 }
972 break;
973 }
974
975 if (c->total_sfds == 1)
976 {
977 /* wait a second and reconnect */
978 sleep(1);
979 }
980 }
981 while (c->total_sfds == 1);
982 }
983
984 if ((c->total_sfds > 1) && (c->tcpsfd[c->cur_idx] == 0))
985 {
986 c->sfd= 0;
987 c->alive_sfds--;
988 }
989
990 return 0;
991 } /* ms_reconn */
992
993
994 /**
995 * reconnect several disconnected socks in the connection
996 * structure, the ever-1-second timer of the thread will check
997 * whether some socks in the connections disconnect. if
998 * disconnect, reconnect the sock.
999 *
1000 * @param c, pointer of the concurrency
1001 *
1002 * @return int, if success, return 0, else return -1
1003 */
1004 int ms_reconn_socks(ms_conn_t *c)
1005 {
1006 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
1007 int srv_idx= 0;
1008 int ret_sfd= 0;
1009 int srv_conn_cnt= 0;
1010 struct timeval cur_time;
1011
1012 assert(c != NULL);
1013
1014 if ((c->total_sfds == 1) || (c->total_sfds == c->alive_sfds))
1015 {
1016 return 0;
1017 }
1018
1019 for (int i= 0; i < c->total_sfds; i++)
1020 {
1021 if (c->tcpsfd[i] == 0)
1022 {
1023 gettimeofday(&cur_time, NULL);
1024
1025 /**
1026 * For failover test of replication, reconnect the socks after
1027 * it disconnects more than 5 seconds, Otherwise memslap will
1028 * block at connect() function and the work threads can't work
1029 * in this interval.
1030 */
1031 if (cur_time.tv_sec
1032 - ms_setting.servers[srv_idx].disconn_time.tv_sec < 5)
1033 {
1034 break;
1035 }
1036
1037 if (ms_setting.rep_write_srv > 0)
1038 {
1039 srv_idx= i;
1040 srv_conn_cnt= ms_setting.nconns;
1041 }
1042 else
1043 {
1044 srv_idx= ms_thread->thread_ctx->srv_idx;
1045 srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
1046 }
1047
1048 if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
1049 ms_setting.servers[srv_idx].srv_port,
1050 ms_setting.udp, &ret_sfd) == 0)
1051 {
1052 c->tcpsfd[i]= ret_sfd;
1053 c->alive_sfds++;
1054
1055 if (atomic_add_32_nv(&ms_setting.servers[srv_idx].reconn_cnt, 1)
1056 % (uint32_t)srv_conn_cnt == 0)
1057 {
1058 gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
1059 int reconn_time=
1060 (int)(ms_setting.servers[srv_idx].reconn_time.tv_sec
1061 - ms_setting.servers[srv_idx].disconn_time
1062 .tv_sec);
1063 fprintf(stderr, "Server %s:%d reconnect after %ds\n",
1064 ms_setting.servers[srv_idx].srv_host_name,
1065 ms_setting.servers[srv_idx].srv_port, reconn_time);
1066 }
1067 }
1068 }
1069 }
1070
1071 return 0;
1072 } /* ms_reconn_socks */
1073
1074
1075 /**
1076 * Tokenize the command string by replacing whitespace with '\0' and update
1077 * the token array tokens with pointer to start of each token and length.
1078 * Returns total number of tokens. The last valid token is the terminal
1079 * token (value points to the first unprocessed character of the string and
1080 * length zero).
1081 *
1082 * Usage example:
1083 *
1084 * while(ms_tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
1085 * for(int ix = 0; tokens[ix].length != 0; ix++) {
1086 * ...
1087 * }
1088 * ncommand = tokens[ix].value - command;
1089 * command = tokens[ix].value;
1090 * }
1091 *
1092 * @param command, the command string to token
1093 * @param tokens, array to store tokens
1094 * @param max_tokens, maximum tokens number
1095 *
1096 * @return int, the number of tokens
1097 */
1098 static int ms_tokenize_command(char *command,
1099 token_t *tokens,
1100 const int max_tokens)
1101 {
1102 char *s, *e;
1103 int ntokens= 0;
1104
1105 assert(command != NULL && tokens != NULL && max_tokens > 1);
1106
1107 for (s= e= command; ntokens < max_tokens - 1; ++e)
1108 {
1109 if (*e == ' ')
1110 {
1111 if (s != e)
1112 {
1113 tokens[ntokens].value= s;
1114 tokens[ntokens].length= (size_t)(e - s);
1115 ntokens++;
1116 *e= '\0';
1117 }
1118 s= e + 1;
1119 }
1120 else if (*e == '\0')
1121 {
1122 if (s != e)
1123 {
1124 tokens[ntokens].value= s;
1125 tokens[ntokens].length= (size_t)(e - s);
1126 ntokens++;
1127 }
1128
1129 break; /* string end */
1130 }
1131 }
1132
1133 return ntokens;
1134 } /* ms_tokenize_command */
1135
1136
1137 /**
1138 * parse the response of server.
1139 *
1140 * @param c, pointer of the concurrency
1141 * @param command, the string responded by server
1142 *
1143 * @return int, if the command completed return 0, else return
1144 * -1
1145 */
1146 static int ms_ascii_process_line(ms_conn_t *c, char *command)
1147 {
1148 int ret= 0;
1149 int64_t value_len;
1150 char *buffer= command;
1151
1152 assert(c != NULL);
1153
1154 /**
1155 * for command get, we store the returned value into local buffer
1156 * then continue in ms_complete_nread().
1157 */
1158
1159 switch (buffer[0])
1160 {
1161 case 'V': /* VALUE || VERSION */
1162 if (buffer[1] == 'A') /* VALUE */
1163 {
1164 token_t tokens[MAX_TOKENS];
1165 ms_tokenize_command(command, tokens, MAX_TOKENS);
1166 value_len= strtol(tokens[VALUELEN_TOKEN].value, NULL, 10);
1167 c->currcmd.key_prefix= *(uint64_t *)tokens[KEY_TOKEN].value;
1168
1169 /*
1170 * We read the \r\n into the string since not doing so is more
1171 * cycles then the waster of memory to do so.
1172 *
1173 * We are null terminating through, which will most likely make
1174 * some people lazy about using the return length.
1175 */
1176 c->rvbytes= (int)(value_len + 2);
1177 c->readval= true;
1178 ret= -1;
1179 }
1180
1181 break;
1182
1183 case 'O': /* OK */
1184 c->currcmd.retstat= MCD_SUCCESS;
1185
1186 case 'S': /* STORED STATS SERVER_ERROR */
1187 if (buffer[2] == 'A') /* STORED STATS */
1188 { /* STATS*/
1189 c->currcmd.retstat= MCD_STAT;
1190 }
1191 else if (buffer[1] == 'E')
1192 {
1193 /* SERVER_ERROR */
1194 printf("<%d %s\n", c->sfd, buffer);
1195
1196 c->currcmd.retstat= MCD_SERVER_ERROR;
1197 }
1198 else if (buffer[1] == 'T')
1199 {
1200 /* STORED */
1201 c->currcmd.retstat= MCD_STORED;
1202 }
1203 else
1204 {
1205 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1206 }
1207 break;
1208
1209 case 'D': /* DELETED DATA */
1210 if (buffer[1] == 'E')
1211 {
1212 c->currcmd.retstat= MCD_DELETED;
1213 }
1214 else
1215 {
1216 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1217 }
1218
1219 break;
1220
1221 case 'N': /* NOT_FOUND NOT_STORED*/
1222 if (buffer[4] == 'F')
1223 {
1224 c->currcmd.retstat= MCD_NOTFOUND;
1225 }
1226 else if (buffer[4] == 'S')
1227 {
1228 printf("<%d %s\n", c->sfd, buffer);
1229 c->currcmd.retstat= MCD_NOTSTORED;
1230 }
1231 else
1232 {
1233 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1234 }
1235 break;
1236
1237 case 'E': /* PROTOCOL ERROR or END */
1238 if (buffer[1] == 'N')
1239 {
1240 /* END */
1241 c->currcmd.retstat= MCD_END;
1242 }
1243 else if (buffer[1] == 'R')
1244 {
1245 printf("<%d ERROR\n", c->sfd);
1246 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
1247 }
1248 else if (buffer[1] == 'X')
1249 {
1250 c->currcmd.retstat= MCD_DATA_EXISTS;
1251 printf("<%d %s\n", c->sfd, buffer);
1252 }
1253 else
1254 {
1255 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1256 }
1257 break;
1258
1259 case 'C': /* CLIENT ERROR */
1260 printf("<%d %s\n", c->sfd, buffer);
1261 c->currcmd.retstat= MCD_CLIENT_ERROR;
1262 break;
1263
1264 default:
1265 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
1266 break;
1267 } /* switch */
1268
1269 return ret;
1270 } /* ms_ascii_process_line */
1271
1272
1273 /**
1274 * after one operation completes, reset the concurrency
1275 *
1276 * @param c, pointer of the concurrency
1277 * @param timeout, whether it's timeout
1278 */
1279 void ms_reset_conn(ms_conn_t *c, bool timeout)
1280 {
1281 assert(c != NULL);
1282
1283 if (c->udp)
1284 {
1285 if ((c->packets > 0) && (c->packets < MAX_UDP_PACKET))
1286 {
1287 memset(c->udppkt, 0, sizeof(ms_udppkt_t) * (size_t)c->packets);
1288 }
1289
1290 c->packets= 0;
1291 c->recvpkt= 0;
1292 c->pktcurr= 0;
1293 c->ordcurr= 0;
1294 c->rudpbytes= 0;
1295 }
1296 c->currcmd.isfinish= true;
1297 c->ctnwrite= false;
1298 c->rbytes= 0;
1299 c->rcurr= c->rbuf;
1300 ms_conn_set_state(c, conn_write);
1301 memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */
1302
1303 if (timeout)
1304 {
1305 ms_drive_machine(c);
1306 }
1307 } /* ms_reset_conn */
1308
1309
1310 /**
1311 * if we have a complete line in the buffer, process it.
1312 *
1313 * @param c, pointer of the concurrency
1314 *
1315 * @return int, if success, return 0, else return -1
1316 */
1317 static int ms_try_read_line(ms_conn_t *c)
1318 {
1319 if (c->protocol == binary_prot)
1320 {
1321 /* Do we have the complete packet header? */
1322 if ((uint64_t)c->rbytes < sizeof(c->binary_header))
1323 {
1324 /* need more data! */
1325 return 0;
1326 }
1327 else
1328 {
1329 #ifdef NEED_ALIGN
1330 if (((long)(c->rcurr)) % 8 != 0)
1331 {
1332 /* must realign input buffer */
1333 memmove(c->rbuf, c->rcurr, c->rbytes);
1334 c->rcurr= c->rbuf;
1335 if (settings.verbose)
1336 {
1337 fprintf(stderr, "%d: Realign input buffer.\n", c->sfd);
1338 }
1339 }
1340 #endif
1341 protocol_binary_response_header *rsp;
1342 rsp= (protocol_binary_response_header *)c->rcurr;
1343
1344 c->binary_header= *rsp;
1345 c->binary_header.response.extlen= rsp->response.extlen;
1346 c->binary_header.response.keylen= ntohs(rsp->response.keylen);
1347 c->binary_header.response.bodylen= ntohl(rsp->response.bodylen);
1348 c->binary_header.response.status= ntohs(rsp->response.status);
1349
1350 if (c->binary_header.response.magic != PROTOCOL_BINARY_RES)
1351 {
1352 fprintf(stderr, "Invalid magic: %x\n",
1353 c->binary_header.response.magic);
1354 ms_conn_set_state(c, conn_closing);
1355 return 0;
1356 }
1357
1358 /* process this complete response */
1359 if (ms_bin_process_response(c) == 0)
1360 {
1361 /* current operation completed */
1362 ms_reset_conn(c, false);
1363 return -1;
1364 }
1365 else
1366 {
1367 c->rbytes-= (int32_t)sizeof(c->binary_header);
1368 c->rcurr+= sizeof(c->binary_header);
1369 }
1370 }
1371 }
1372 else
1373 {
1374 char *el, *cont;
1375
1376 assert(c != NULL);
1377 assert(c->rcurr <= (c->rbuf + c->rsize));
1378
1379 if (c->rbytes == 0)
1380 return 0;
1381
1382 el= memchr(c->rcurr, '\n', (size_t)c->rbytes);
1383 if (! el)
1384 return 0;
1385
1386 cont= el + 1;
1387 if (((el - c->rcurr) > 1) && (*(el - 1) == '\r'))
1388 {
1389 el--;
1390 }
1391 *el= '\0';
1392
1393 assert(cont <= (c->rcurr + c->rbytes));
1394
1395 /* process this complete line */
1396 if (ms_ascii_process_line(c, c->rcurr) == 0)
1397 {
1398 /* current operation completed */
1399 ms_reset_conn(c, false);
1400 return -1;
1401 }
1402 else
1403 {
1404 /* current operation didn't complete */
1405 c->rbytes-= (int32_t)(cont - c->rcurr);
1406 c->rcurr= cont;
1407 }
1408
1409 assert(c->rcurr <= (c->rbuf + c->rsize));
1410 }
1411
1412 return -1;
1413 } /* ms_try_read_line */
1414
1415
1416 /**
1417 * because the packet of UDP can't ensure the order, the
1418 * function is used to sort the received udp packet.
1419 *
1420 * @param c, pointer of the concurrency
1421 * @param buf, the buffer to store the ordered packages data
1422 * @param rbytes, the maximum capacity of the buffer
1423 *
1424 * @return int, if success, return the copy bytes, else return
1425 * -1
1426 */
1427 static int ms_sort_udp_packet(ms_conn_t *c, char *buf, int rbytes)
1428 {
1429 int len= 0;
1430 int wbytes= 0;
1431 uint16_t req_id= 0;
1432 uint16_t seq_num= 0;
1433 uint16_t packets= 0;
1434 unsigned char *header= NULL;
1435
1436 /* no enough data */
1437 assert(c != NULL);
1438 assert(buf != NULL);
1439 assert(c->rudpbytes >= UDP_HEADER_SIZE);
1440
1441 /* calculate received packets count */
1442 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE >= UDP_HEADER_SIZE)
1443 {
1444 /* the last packet has some data */
1445 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE + 1;
1446 }
1447 else
1448 {
1449 c->recvpkt= c->rudpbytes / UDP_MAX_PAYLOAD_SIZE;
1450 }
1451
1452 /* get the total packets count if necessary */
1453 if (c->packets == 0)
1454 {
1455 c->packets= HEADER_TO_PACKETS((unsigned char *)c->rudpbuf);
1456 }
1457
1458 /* build the ordered packet array */
1459 for (int i= c->pktcurr; i < c->recvpkt; i++)
1460 {
1461 header= (unsigned char *)c->rudpbuf + i * UDP_MAX_PAYLOAD_SIZE;
1462 req_id= (uint16_t)HEADER_TO_REQID(header);
1463 assert(req_id == c->request_id % (1 << 16));
1464
1465 packets= (uint16_t)HEADER_TO_PACKETS(header);
1466 assert(c->packets == HEADER_TO_PACKETS(header));
1467
1468 seq_num= (uint16_t)HEADER_TO_SEQNUM(header);
1469 c->udppkt[seq_num].header= header;
1470 c->udppkt[seq_num].data= (char *)header + UDP_HEADER_SIZE;
1471
1472 if (i == c->recvpkt - 1)
1473 {
1474 /* last received packet */
1475 if (c->rudpbytes % UDP_MAX_PAYLOAD_SIZE == 0)
1476 {
1477 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1478 c->pktcurr++;
1479 }
1480 else
1481 {
1482 c->udppkt[seq_num].rbytes= c->rudpbytes % UDP_MAX_PAYLOAD_SIZE
1483 - UDP_HEADER_SIZE;
1484 }
1485 }
1486 else
1487 {
1488 c->udppkt[seq_num].rbytes= UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE;
1489 c->pktcurr++;
1490 }
1491 }
1492
1493 for (int i= c->ordcurr; i < c->recvpkt; i++)
1494 {
1495 /* there is some data to copy */
1496 if ((c->udppkt[i].data != NULL)
1497 && (c->udppkt[i].copybytes < c->udppkt[i].rbytes))
1498 {
1499 header= c->udppkt[i].header;
1500 len= c->udppkt[i].rbytes - c->udppkt[i].copybytes;
1501 if (len > rbytes - wbytes)
1502 {
1503 len= rbytes - wbytes;
1504 }
1505
1506 assert(len <= rbytes - wbytes);
1507 assert(i == HEADER_TO_SEQNUM(header));
1508
1509 memcpy(buf + wbytes, c->udppkt[i].data + c->udppkt[i].copybytes,
1510 (size_t)len);
1511 wbytes+= len;
1512 c->udppkt[i].copybytes+= len;
1513
1514 if ((c->udppkt[i].copybytes == c->udppkt[i].rbytes)
1515 && (c->udppkt[i].rbytes == UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1516 {
1517 /* finish copying all the data of this packet, next */
1518 c->ordcurr++;
1519 }
1520
1521 /* last received packet, and finish copying all the data */
1522 if ((c->recvpkt == c->packets) && (i == c->recvpkt - 1)
1523 && (c->udppkt[i].copybytes == c->udppkt[i].rbytes))
1524 {
1525 break;
1526 }
1527
1528 /* no space to copy data */
1529 if (wbytes >= rbytes)
1530 {
1531 break;
1532 }
1533
1534 /* it doesn't finish reading all the data of the packet from network */
1535 if ((i != c->recvpkt - 1)
1536 && (c->udppkt[i].rbytes < UDP_MAX_PAYLOAD_SIZE - UDP_HEADER_SIZE))
1537 {
1538 break;
1539 }
1540 }
1541 else
1542 {
1543 /* no data to copy */
1544 break;
1545 }
1546 }
1547
1548 return wbytes == 0 ? -1 : wbytes;
1549 } /* ms_sort_udp_packet */
1550
1551
1552 /**
1553 * encapsulate upd read like tcp read
1554 *
1555 * @param c, pointer of the concurrency
1556 * @param buf, read buffer
1557 * @param len, length to read
1558 *
1559 * @return int, if success, return the read bytes, else return
1560 * -1
1561 */
1562 static int ms_udp_read(ms_conn_t *c, char *buf, int len)
1563 {
1564 int res= 0;
1565 int avail= 0;
1566 int rbytes= 0;
1567 int copybytes= 0;
1568
1569 assert(c->udp);
1570
1571 while (1)
1572 {
1573 if (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE > c->rudpsize)
1574 {
1575 char *new_rbuf= realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
1576 if (! new_rbuf)
1577 {
1578 fprintf(stderr, "Couldn't realloc input buffer.\n");
1579 c->rudpbytes= 0; /* ignore what we read */
1580 return -1;
1581 }
1582 c->rudpbuf= new_rbuf;
1583 c->rudpsize*= 2;
1584 }
1585
1586 avail= c->rudpsize - c->rudpbytes;
1587 /* UDP each time read a packet, 1400 bytes */
1588 res= (int)read(c->sfd, c->rudpbuf + c->rudpbytes, (size_t)avail);
1589
1590 if (res > 0)
1591 {
1592 atomic_add_size(&ms_stats.bytes_read, res);
1593 c->rudpbytes+= res;
1594 rbytes+= res;
1595 if (res == avail)
1596 {
1597 continue;
1598 }
1599 else
1600 {
1601 break;
1602 }
1603 }
1604
1605 if (res == 0)
1606 {
1607 /* "connection" closed */
1608 return res;
1609 }
1610
1611 if (res == -1)
1612 {
1613 /* no data to read */
1614 return res;
1615 }
1616 }
1617
1618 /* copy data to read buffer */
1619 if (rbytes > 0)
1620 {
1621 copybytes= ms_sort_udp_packet(c, buf, len);
1622 }
1623
1624 if (copybytes == -1)
1625 {
1626 atomic_add_size(&ms_stats.pkt_disorder, 1);
1627 }
1628
1629 return copybytes;
1630 } /* ms_udp_read */
1631
1632
1633 /*
1634 * read from network as much as we can, handle buffer overflow and connection
1635 * close.
1636 * before reading, move the remaining incomplete fragment of a command
1637 * (if any) to the beginning of the buffer.
1638 * return 0 if there's nothing to read on the first read.
1639 */
1640
1641 /**
1642 * read from network as much as we can, handle buffer overflow and connection
1643 * close. before reading, move the remaining incomplete fragment of a command
1644 * (if any) to the beginning of the buffer.
1645 *
1646 * @param c, pointer of the concurrency
1647 *
1648 * @return int,
1649 * return 0 if there's nothing to read on the first read.
1650 * return 1 if get data
1651 * return -1 if error happens
1652 */
1653 static int ms_try_read_network(ms_conn_t *c)
1654 {
1655 int gotdata= 0;
1656 int res;
1657 int64_t avail;
1658
1659 assert(c != NULL);
1660
1661 if ((c->rcurr != c->rbuf)
1662 && (! c->readval || (c->rvbytes > c->rsize - (c->rcurr - c->rbuf))
1663 || (c->readval && (c->rcurr - c->rbuf > c->rbytes))))
1664 {
1665 if (c->rbytes != 0) /* otherwise there's nothing to copy */
1666 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
1667 c->rcurr= c->rbuf;
1668 }
1669
1670 while (1)
1671 {
1672 if (c->rbytes >= c->rsize)
1673 {
1674 char *new_rbuf= realloc(c->rbuf, (size_t)c->rsize * 2);
1675 if (! new_rbuf)
1676 {
1677 fprintf(stderr, "Couldn't realloc input buffer.\n");
1678 c->rbytes= 0; /* ignore what we read */
1679 return -1;
1680 }
1681 c->rcurr= c->rbuf= new_rbuf;
1682 c->rsize*= 2;
1683 }
1684
1685 avail= c->rsize - c->rbytes - (c->rcurr - c->rbuf);
1686 if (avail == 0)
1687 {
1688 break;
1689 }
1690
1691 if (c->udp)
1692 {
1693 res= (int32_t)ms_udp_read(c, c->rcurr + c->rbytes, (int32_t)avail);
1694 }
1695 else
1696 {
1697 res= (int)read(c->sfd, c->rcurr + c->rbytes, (size_t)avail);
1698 }
1699
1700 if (res > 0)
1701 {
1702 if (! c->udp)
1703 {
1704 atomic_add_size(&ms_stats.bytes_read, res);
1705 }
1706 gotdata= 1;
1707 c->rbytes+= res;
1708 if (res == avail)
1709 {
1710 continue;
1711 }
1712 else
1713 {
1714 break;
1715 }
1716 }
1717 if (res == 0)
1718 {
1719 /* connection closed */
1720 ms_conn_set_state(c, conn_closing);
1721 return -1;
1722 }
1723 if (res == -1)
1724 {
1725 if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
1726 break;
1727 /* Should close on unhandled errors. */
1728 ms_conn_set_state(c, conn_closing);
1729 return -1;
1730 }
1731 }
1732
1733 return gotdata;
1734 } /* ms_try_read_network */
1735
1736
1737 /**
1738 * after get the object from server, verify the value if
1739 * necessary.
1740 *
1741 * @param c, pointer of the concurrency
1742 * @param mlget_item, pointer of mulit-get task item structure
1743 * @param value, received value string
1744 * @param vlen, received value string length
1745 */
1746 static void ms_verify_value(ms_conn_t *c,
1747 ms_mlget_task_item_t *mlget_item,
1748 char *value,
1749 int vlen)
1750 {
1751 if (c->curr_task.verify)
1752 {
1753 assert(c->curr_task.item->value_offset != INVALID_OFFSET);
1754 char *orignval= &ms_setting.char_block[c->curr_task.item->value_offset];
1755 char *orignkey=
1756 &ms_setting.char_block[c->curr_task.item->key_suffix_offset];
1757
1758 /* verify expire time if necessary */
1759 if (c->curr_task.item->exp_time > 0)
1760 {
1761 struct timeval curr_time;
1762 gettimeofday(&curr_time, NULL);
1763
1764 /* object expired but get it now */
1765 if (curr_time.tv_sec - c->curr_task.item->client_time
1766 > c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
1767 {
1768 atomic_add_size(&ms_stats.exp_get, 1);
1769
1770 if (ms_setting.verbose)
1771 {
1772 char set_time[64];
1773 char cur_time[64];
1774 strftime(set_time, 64, "%Y-%m-%d %H:%M:%S",
1775 localtime(&c->curr_task.item->client_time));
1776 strftime(cur_time, 64, "%Y-%m-%d %H:%M:%S",
1777 localtime(&curr_time.tv_sec));
1778 fprintf(stderr,
1779 "\n<%d expire time verification failed, "
1780 "object expired but get it now\n"
1781 "\tkey len: %d\n"
1782 "\tkey: %" PRIx64 " %.*s\n"
1783 "\tset time: %s current time: %s "
1784 "diff time: %d expire time: %d\n"
1785 "\texpected data: \n"
1786 "\treceived data len: %d\n"
1787 "\treceived data: %.*s\n",
1788 c->sfd,
1789 c->curr_task.item->key_size,
1790 c->curr_task.item->key_prefix,
1791 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1792 orignkey,
1793 set_time,
1794 cur_time,
1795 (int)(curr_time.tv_sec - c->curr_task.item->client_time),
1796 c->curr_task.item->exp_time,
1797 vlen,
1798 vlen,
1799 value);
1800 fflush(stderr);
1801 }
1802 }
1803 }
1804 else
1805 {
1806 if ((c->curr_task.item->value_size != vlen)
1807 || (memcmp(orignval, value, (size_t)vlen) != 0))
1808 {
1809 atomic_add_size(&ms_stats.vef_failed, 1);
1810
1811 if (ms_setting.verbose)
1812 {
1813 fprintf(stderr,
1814 "\n<%d data verification failed\n"
1815 "\tkey len: %d\n"
1816 "\tkey: %" PRIx64" %.*s\n"
1817 "\texpected data len: %d\n"
1818 "\texpected data: %.*s\n"
1819 "\treceived data len: %d\n"
1820 "\treceived data: %.*s\n",
1821 c->sfd,
1822 c->curr_task.item->key_size,
1823 c->curr_task.item->key_prefix,
1824 c->curr_task.item->key_size - (int)KEY_PREFIX_SIZE,
1825 orignkey,
1826 c->curr_task.item->value_size,
1827 c->curr_task.item->value_size,
1828 orignval,
1829 vlen,
1830 vlen,
1831 value);
1832 fflush(stderr);
1833 }
1834 }
1835 }
1836
1837 c->curr_task.finish_verify= true;
1838
1839 if (mlget_item != NULL)
1840 {
1841 mlget_item->finish_verify= true;
1842 }
1843 }
1844 } /* ms_verify_value */
1845
1846
1847 /**
1848 * For ASCII protocol, after store the data into the local
1849 * buffer, run this function to handle the data.
1850 *
1851 * @param c, pointer of the concurrency
1852 */
1853 static void ms_ascii_complete_nread(ms_conn_t *c)
1854 {
1855 assert(c != NULL);
1856 assert(c->rbytes >= c->rvbytes);
1857 assert(c->protocol == ascii_udp_prot || c->protocol == ascii_prot);
1858 if (c->rvbytes > 2)
1859 {
1860 assert(
1861 c->rcurr[c->rvbytes - 1] == '\n' && c->rcurr[c->rvbytes - 2] == '\r');
1862 }
1863
1864 /* multi-get */
1865 ms_mlget_task_item_t *mlget_item= NULL;
1866 if (((ms_setting.mult_key_num > 1)
1867 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1868 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1869 {
1870 c->mlget_task.value_index++;
1871 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1872
1873 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1874 {
1875 c->curr_task.item= mlget_item->item;
1876 c->curr_task.verify= mlget_item->verify;
1877 c->curr_task.finish_verify= mlget_item->finish_verify;
1878 mlget_item->get_miss= false;
1879 }
1880 else
1881 {
1882 /* Try to find the task item in multi-get task array */
1883 for (int i= 0; i < c->mlget_task.mlget_num; i++)
1884 {
1885 mlget_item= &c->mlget_task.mlget_item[i];
1886 if (mlget_item->item->key_prefix == c->currcmd.key_prefix)
1887 {
1888 c->curr_task.item= mlget_item->item;
1889 c->curr_task.verify= mlget_item->verify;
1890 c->curr_task.finish_verify= mlget_item->finish_verify;
1891 mlget_item->get_miss= false;
1892
1893 break;
1894 }
1895 }
1896 }
1897 }
1898
1899 ms_verify_value(c, mlget_item, c->rcurr, c->rvbytes - 2);
1900
1901 c->curr_task.get_miss= false;
1902 c->rbytes-= c->rvbytes;
1903 c->rcurr= c->rcurr + c->rvbytes;
1904 assert(c->rcurr <= (c->rbuf + c->rsize));
1905 c->readval= false;
1906 c->rvbytes= 0;
1907 } /* ms_ascii_complete_nread */
1908
1909
1910 /**
1911 * For binary protocol, after store the data into the local
1912 * buffer, run this function to handle the data.
1913 *
1914 * @param c, pointer of the concurrency
1915 */
1916 static void ms_bin_complete_nread(ms_conn_t *c)
1917 {
1918 assert(c != NULL);
1919 assert(c->rbytes >= c->rvbytes);
1920 assert(c->protocol == binary_prot);
1921
1922 int extlen= c->binary_header.response.extlen;
1923 int keylen= c->binary_header.response.keylen;
1924 uint8_t opcode= c->binary_header.response.opcode;
1925
1926 /* not get command or not include value, just return */
1927 if (((opcode != PROTOCOL_BINARY_CMD_GET)
1928 && (opcode != PROTOCOL_BINARY_CMD_GETQ))
1929 || (c->rvbytes <= extlen + keylen))
1930 {
1931 /* get miss */
1932 if (c->binary_header.response.opcode == PROTOCOL_BINARY_CMD_GET)
1933 {
1934 c->currcmd.retstat= MCD_END;
1935 c->curr_task.get_miss= true;
1936 }
1937
1938 c->readval= false;
1939 c->rvbytes= 0;
1940 ms_reset_conn(c, false);
1941 return;
1942 }
1943
1944 /* multi-get */
1945 ms_mlget_task_item_t *mlget_item= NULL;
1946 if (((ms_setting.mult_key_num > 1)
1947 && (c->mlget_task.mlget_num >= ms_setting.mult_key_num))
1948 || ((c->remain_exec_num == 0) && (c->mlget_task.mlget_num > 0)))
1949 {
1950 c->mlget_task.value_index++;
1951 mlget_item= &c->mlget_task.mlget_item[c->mlget_task.value_index];
1952
1953 c->curr_task.item= mlget_item->item;
1954 c->curr_task.verify= mlget_item->verify;
1955 c->curr_task.finish_verify= mlget_item->finish_verify;
1956 mlget_item->get_miss= false;
1957 }
1958
1959 ms_verify_value(c,
1960 mlget_item,
1961 c->rcurr + extlen + keylen,
1962 c->rvbytes - extlen - keylen);
1963
1964 c->currcmd.retstat= MCD_END;
1965 c->curr_task.get_miss= false;
1966 c->rbytes-= c->rvbytes;
1967 c->rcurr= c->rcurr + c->rvbytes;
1968 assert(c->rcurr <= (c->rbuf + c->rsize));
1969 c->readval= false;
1970 c->rvbytes= 0;
1971
1972 if (ms_setting.mult_key_num > 1)
1973 {
1974 /* multi-get have check all the item */
1975 if (c->mlget_task.value_index == c->mlget_task.mlget_num - 1)
1976 {
1977 ms_reset_conn(c, false);
1978 }
1979 }
1980 else
1981 {
1982 /* single get */
1983 ms_reset_conn(c, false);
1984 }
1985 } /* ms_bin_complete_nread */
1986
1987
1988 /**
1989 * we get here after reading the value of get commands.
1990 *
1991 * @param c, pointer of the concurrency
1992 */
1993 static void ms_complete_nread(ms_conn_t *c)
1994 {
1995 assert(c != NULL);
1996 assert(c->rbytes >= c->rvbytes);
1997 assert(c->protocol == ascii_udp_prot
1998 || c->protocol == ascii_prot
1999 || c->protocol == binary_prot);
2000
2001 if (c->protocol == binary_prot)
2002 {
2003 ms_bin_complete_nread(c);
2004 }
2005 else
2006 {
2007 ms_ascii_complete_nread(c);
2008 }
2009 } /* ms_complete_nread */
2010
2011
2012 /**
2013 * Adds a message header to a connection.
2014 *
2015 * @param c, pointer of the concurrency
2016 *
2017 * @return int, if success, return 0, else return -1
2018 */
2019 static int ms_add_msghdr(ms_conn_t *c)
2020 {
2021 struct msghdr *msg;
2022
2023 assert(c != NULL);
2024
2025 if (c->msgsize == c->msgused)
2026 {
2027 msg=
2028 realloc(c->msglist, (size_t)c->msgsize * 2 * sizeof(struct msghdr));
2029 if (! msg)
2030 return -1;
2031
2032 c->msglist= msg;
2033 c->msgsize*= 2;
2034 }
2035
2036 msg= c->msglist + c->msgused;
2037
2038 /**
2039 * this wipes msg_iovlen, msg_control, msg_controllen, and
2040 * msg_flags, the last 3 of which aren't defined on solaris:
2041 */
2042 memset(msg, 0, sizeof(struct msghdr));
2043
2044 msg->msg_iov= &c->iov[c->iovused];
2045
2046 if (c->udp && (c->srv_recv_addr_size > 0))
2047 {
2048 msg->msg_name= &c->srv_recv_addr;
2049 msg->msg_namelen= c->srv_recv_addr_size;
2050 }
2051
2052 c->msgbytes= 0;
2053 c->msgused++;
2054
2055 if (c->udp)
2056 {
2057 /* Leave room for the UDP header, which we'll fill in later. */
2058 return ms_add_iov(c, NULL, UDP_HEADER_SIZE);
2059 }
2060
2061 return 0;
2062 } /* ms_add_msghdr */
2063
2064
2065 /**
2066 * Ensures that there is room for another structure iovec in a connection's
2067 * iov list.
2068 *
2069 * @param c, pointer of the concurrency
2070 *
2071 * @return int, if success, return 0, else return -1
2072 */
2073 static int ms_ensure_iov_space(ms_conn_t *c)
2074 {
2075 assert(c != NULL);
2076
2077 if (c->iovused >= c->iovsize)
2078 {
2079 int i, iovnum;
2080 struct iovec *new_iov= (struct iovec *)realloc(c->iov,
2081 ((size_t)c->iovsize
2082 * 2)
2083 * sizeof(struct iovec));
2084 if (! new_iov)
2085 return -1;
2086
2087 c->iov= new_iov;
2088 c->iovsize*= 2;
2089
2090 /* Point all the msghdr structures at the new list. */
2091 for (i= 0, iovnum= 0; i < c->msgused; i++)
2092 {
2093 c->msglist[i].msg_iov= &c->iov[iovnum];
2094 iovnum+= (int)c->msglist[i].msg_iovlen;
2095 }
2096 }
2097
2098 return 0;
2099 } /* ms_ensure_iov_space */
2100
2101
2102 /**
2103 * Adds data to the list of pending data that will be written out to a
2104 * connection.
2105 *
2106 * @param c, pointer of the concurrency
2107 * @param buf, the buffer includes data to send
2108 * @param len, the data length in the buffer
2109 *
2110 * @return int, if success, return 0, else return -1
2111 */
2112 static int ms_add_iov(ms_conn_t *c, const void *buf, int len)
2113 {
2114 struct msghdr *m;
2115 int leftover;
2116 bool limit_to_mtu;
2117
2118 assert(c != NULL);
2119
2120 do
2121 {
2122 m= &c->msglist[c->msgused - 1];
2123
2124 /*
2125 * Limit UDP packets, to UDP_MAX_PAYLOAD_SIZE bytes.
2126 */
2127 limit_to_mtu= c->udp;
2128
2129 /* We may need to start a new msghdr if this one is full. */
2130 if ((m->msg_iovlen == IOV_MAX)
2131 || (limit_to_mtu && (c->msgbytes >= UDP_MAX_SEND_PAYLOAD_SIZE)))
2132 {
2133 ms_add_msghdr(c);
2134 m= &c->msglist[c->msgused - 1];
2135 }
2136
2137 if (ms_ensure_iov_space(c) != 0)
2138 return -1;
2139
2140 /* If the fragment is too big to fit in the datagram, split it up */
2141 if (limit_to_mtu && (len + c->msgbytes > UDP_MAX_SEND_PAYLOAD_SIZE))
2142 {
2143 leftover= len + c->msgbytes - UDP_MAX_SEND_PAYLOAD_SIZE;
2144 len-= leftover;
2145 }
2146 else
2147 {
2148 leftover= 0;
2149 }
2150
2151 m= &c->msglist[c->msgused - 1];
2152 m->msg_iov[m->msg_iovlen].iov_base= (void *)buf;
2153 m->msg_iov[m->msg_iovlen].iov_len= (size_t)len;
2154
2155 c->msgbytes+= len;
2156 c->iovused++;
2157 m->msg_iovlen++;
2158
2159 buf= ((char *)buf) + len;
2160 len= leftover;
2161 }
2162 while (leftover > 0);
2163
2164 return 0;
2165 } /* ms_add_iov */
2166
2167
2168 /**
2169 * Constructs a set of UDP headers and attaches them to the outgoing messages.
2170 *
2171 * @param c, pointer of the concurrency
2172 *
2173 * @return int, if success, return 0, else return -1
2174 */
2175 static int ms_build_udp_headers(ms_conn_t *c)
2176 {
2177 int i;
2178 unsigned char *hdr;
2179
2180 assert(c != NULL);
2181
2182 c->request_id= ms_get_udp_request_id();
2183
2184 if (c->msgused > c->hdrsize)
2185 {
2186 void *new_hdrbuf;
2187 if (c->hdrbuf)
2188 new_hdrbuf= realloc(c->hdrbuf,
2189 (size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2190 else
2191 new_hdrbuf= malloc((size_t)c->msgused * 2 * UDP_HEADER_SIZE);
2192 if (! new_hdrbuf)
2193 return -1;
2194
2195 c->hdrbuf= (unsigned char *)new_hdrbuf;
2196 c->hdrsize= c->msgused * 2;
2197 }
2198
2199 /* If this is a multi-packet request, drop it. */
2200 if (c->udp && (c->msgused > 1))
2201 {
2202 fprintf(stderr, "multi-packet request for UDP not supported.\n");
2203 return -1;
2204 }
2205
2206 hdr= c->hdrbuf;
2207 for (i= 0; i < c->msgused; i++)
2208 {
2209 c->msglist[i].msg_iov[0].iov_base= (void *)hdr;
2210 c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
2211 *hdr++= (unsigned char)(c->request_id / 256);
2212 *hdr++= (unsigned char)(c->request_id % 256);
2213 *hdr++= (unsigned char)(i / 256);
2214 *hdr++= (unsigned char)(i % 256);
2215 *hdr++= (unsigned char)(c->msgused / 256);
2216 *hdr++= (unsigned char)(c->msgused % 256);
2217 *hdr++= (unsigned char)1; /* support facebook memcached */
2218 *hdr++= (unsigned char)0;
2219 assert(hdr ==
2220 ((unsigned char *)c->msglist[i].msg_iov[0].iov_base
2221 + UDP_HEADER_SIZE));
2222 }
2223
2224 return 0;
2225 } /* ms_build_udp_headers */
2226
2227
2228 /**
2229 * Transmit the next chunk of data from our list of msgbuf structures.
2230 *
2231 * @param c, pointer of the concurrency
2232 *
2233 * @return TRANSMIT_COMPLETE All done writing.
2234 * TRANSMIT_INCOMPLETE More data remaining to write.
2235 * TRANSMIT_SOFT_ERROR Can't write any more right now.
2236 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
2237 */
2238 static int ms_transmit(ms_conn_t *c)
2239 {
2240 assert(c != NULL);
2241
2242 if ((c->msgcurr < c->msgused)
2243 && (c->msglist[c->msgcurr].msg_iovlen == 0))
2244 {
2245 /* Finished writing the current msg; advance to the next. */
2246 c->msgcurr++;
2247 }
2248
2249 if (c->msgcurr < c->msgused)
2250 {
2251 ssize_t res;
2252 struct msghdr *m= &c->msglist[c->msgcurr];
2253
2254 res= sendmsg(c->sfd, m, 0);
2255 if (res > 0)
2256 {
2257 atomic_add_size(&ms_stats.bytes_written, res);
2258
2259 /* We've written some of the data. Remove the completed
2260 * iovec entries from the list of pending writes. */
2261 while (m->msg_iovlen > 0 && res >= (ssize_t)m->msg_iov->iov_len)
2262 {
2263 res-= (ssize_t)m->msg_iov->iov_len;
2264 m->msg_iovlen--;
2265 m->msg_iov++;
2266 }
2267
2268 /* Might have written just part of the last iovec entry;
2269 * adjust it so the next write will do the rest. */
2270 if (res > 0)
2271 {
2272 m->msg_iov->iov_base= (void *)((unsigned char *)m->msg_iov->iov_base + res);
2273 m->msg_iov->iov_len-= (size_t)res;
2274 }
2275 return TRANSMIT_INCOMPLETE;
2276 }
2277 if ((res == -1) && ((errno == EAGAIN) || (errno == EWOULDBLOCK)))
2278 {
2279 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2280 {
2281 fprintf(stderr, "Couldn't update event.\n");
2282 ms_conn_set_state(c, conn_closing);
2283 return TRANSMIT_HARD_ERROR;
2284 }
2285 return TRANSMIT_SOFT_ERROR;
2286 }
2287
2288 /* if res==0 or res==-1 and error is not EAGAIN or EWOULDBLOCK,
2289 * we have a real error, on which we close the connection */
2290 fprintf(stderr, "Failed to write, and not due to blocking.\n");
2291
2292 ms_conn_set_state(c, conn_closing);
2293 return TRANSMIT_HARD_ERROR;
2294 }
2295 else
2296 {
2297 return TRANSMIT_COMPLETE;
2298 }
2299 } /* ms_transmit */
2300
2301
2302 /**
2303 * Shrinks a connection's buffers if they're too big. This prevents
2304 * periodic large "mget" response from server chewing lots of client
2305 * memory.
2306 *
2307 * This should only be called in between requests since it can wipe output
2308 * buffers!
2309 *
2310 * @param c, pointer of the concurrency
2311 */
2312 static void ms_conn_shrink(ms_conn_t *c)
2313 {
2314 assert(c != NULL);
2315
2316 if (c->udp)
2317 return;
2318
2319 if ((c->rsize > READ_BUFFER_HIGHWAT) && (c->rbytes < DATA_BUFFER_SIZE))
2320 {
2321 char *newbuf;
2322
2323 if (c->rcurr != c->rbuf)
2324 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
2325
2326 newbuf= (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
2327
2328 if (newbuf)
2329 {
2330 c->rbuf= newbuf;
2331 c->rsize= DATA_BUFFER_SIZE;
2332 }
2333 c->rcurr= c->rbuf;
2334 }
2335
2336 if (c->udp && (c->rudpsize > UDP_DATA_BUFFER_HIGHWAT)
2337 && (c->rudpbytes + UDP_MAX_PAYLOAD_SIZE < UDP_DATA_BUFFER_SIZE))
2338 {
2339 char *new_rbuf= (char *)realloc(c->rudpbuf, (size_t)c->rudpsize * 2);
2340 if (! new_rbuf)
2341 {
2342 c->rudpbuf= new_rbuf;
2343 c->rudpsize= UDP_DATA_BUFFER_SIZE;
2344 }
2345 /* TODO check error condition? */
2346 }
2347
2348 if (c->msgsize > MSG_LIST_HIGHWAT)
2349 {
2350 struct msghdr *newbuf= (struct msghdr *)realloc(
2351 (void *)c->msglist,
2352 MSG_LIST_INITIAL
2353 * sizeof(c->msglist[0]));
2354 if (newbuf)
2355 {
2356 c->msglist= newbuf;
2357 c->msgsize= MSG_LIST_INITIAL;
2358 }
2359 /* TODO check error condition? */
2360 }
2361
2362 if (c->iovsize > IOV_LIST_HIGHWAT)
2363 {
2364 struct iovec *newbuf= (struct iovec *)realloc((void *)c->iov,
2365 IOV_LIST_INITIAL
2366 * sizeof(c->iov[0]));
2367 if (newbuf)
2368 {
2369 c->iov= newbuf;
2370 c->iovsize= IOV_LIST_INITIAL;
2371 }
2372 /* TODO check return value */
2373 }
2374 } /* ms_conn_shrink */
2375
2376
2377 /**
2378 * Sets a connection's current state in the state machine. Any special
2379 * processing that needs to happen on certain state transitions can
2380 * happen here.
2381 *
2382 * @param c, pointer of the concurrency
2383 * @param state, connection state
2384 */
2385 static void ms_conn_set_state(ms_conn_t *c, int state)
2386 {
2387 assert(c != NULL);
2388
2389 if (state != c->state)
2390 {
2391 if (state == conn_read)
2392 {
2393 ms_conn_shrink(c);
2394 }
2395 c->state= state;
2396 }
2397 } /* ms_conn_set_state */
2398
2399
2400 /**
2401 * update the event if socks change state. for example: when
2402 * change the listen scoket read event to sock write event, or
2403 * change socket handler, we could call this function.
2404 *
2405 * @param c, pointer of the concurrency
2406 * @param new_flags, new event flags
2407 *
2408 * @return bool, if success, return true, else return false
2409 */
2410 static bool ms_update_event(ms_conn_t *c, const int new_flags)
2411 {
2412 /* default event timeout 10 seconds */
2413 struct timeval t=
2414 {
2415 .tv_sec= EVENT_TIMEOUT, .tv_usec= 0
2416 };
2417
2418 assert(c != NULL);
2419
2420 struct event_base *base= c->event.ev_base;
2421 if ((c->ev_flags == new_flags) && (ms_setting.rep_write_srv == 0)
2422 && (! ms_setting.facebook_test || (c->total_sfds == 1)))
2423 {
2424 return true;
2425 }
2426
2427 if (event_del(&c->event) == -1)
2428 {
2429 /* try to delete the event again */
2430 if (event_del(&c->event) == -1)
2431 {
2432 return false;
2433 }
2434 }
2435
2436 event_set(&c->event,
2437 c->sfd,
2438 (short)new_flags,
2439 ms_event_handler,
2440 (void *)c);
2441 event_base_set(base, &c->event);
2442 c->ev_flags= (short)new_flags;
2443
2444 if (c->total_sfds == 1)
2445 {
2446 if (event_add(&c->event, NULL) == -1)
2447 {
2448 return false;
2449 }
2450 }
2451 else
2452 {
2453 if (event_add(&c->event, &t) == -1)
2454 {
2455 return false;
2456 }
2457 }
2458
2459 return true;
2460 } /* ms_update_event */
2461
2462
2463 /**
2464 * If user want to get the expected throughput, we could limit
2465 * the performance of memslap. we could give up some work and
2466 * just wait a short time. The function is used to check this
2467 * case.
2468 *
2469 * @param c, pointer of the concurrency
2470 *
2471 * @return bool, if success, return true, else return false
2472 */
2473 static bool ms_need_yield(ms_conn_t *c)
2474 {
2475 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
2476 int64_t tps= 0;
2477 int64_t time_diff= 0;
2478 struct timeval curr_time;
2479 ms_task_t *task= &c->curr_task;
2480
2481 if (ms_setting.expected_tps > 0)
2482 {
2483 gettimeofday(&curr_time, NULL);
2484 time_diff= ms_time_diff(&ms_thread->startup_time, &curr_time);
2485 tps=
2486 (int64_t)((task->get_opt
2487 + task->set_opt) / ((uint64_t)time_diff / 1000000));
2488
2489 /* current throughput is greater than expected throughput */
2490 if (tps > ms_thread->thread_ctx->tps_perconn)
2491 {
2492 return true;
2493 }
2494 }
2495
2496 return false;
2497 } /* ms_need_yield */
2498
2499
2500 /**
2501 * used to update the start time of each operation
2502 *
2503 * @param c, pointer of the concurrency
2504 */
2505 static void ms_update_start_time(ms_conn_t *c)
2506 {
2507 ms_task_item_t *item= c->curr_task.item;
2508
2509 if ((ms_setting.stat_freq > 0) || c->udp
2510 || ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0)))
2511 {
2512 gettimeofday(&c->start_time, NULL);
2513 if ((c->currcmd.cmd == CMD_SET) && (item->exp_time > 0))
2514 {
2515 /* record the current time */
2516 item->client_time= c->start_time.tv_sec;
2517 }
2518 }
2519 } /* ms_update_start_time */
2520
2521
/**
 * run the state machine
 *
 * Loops over the connection's state (conn_read, conn_write, conn_closing)
 * until a handler sets "stop". Handlers stop after re-arming a libevent
 * event (EV_READ or EV_WRITE) to wait for it, or after closing the
 * connection. A plain "break" without setting stop re-runs the machine
 * with whatever state the handler left behind.
 *
 * @param c, pointer of the concurrency
 */
static void ms_drive_machine(ms_conn_t *c)
{
  bool stop= false;

  assert(c != NULL);

  while (! stop)
  {
    switch (c->state)
    {
    case conn_read:
      if (c->readval)
      {
        /* reading a value body: complete it once rvbytes are buffered */
        if (c->rbytes >= c->rvbytes)
        {
          ms_complete_nread(c);
          break;
        }
      }
      else
      {
        /* reading a response line; non-zero re-runs the machine
         * (presumably a line was consumed — confirm in ms_try_read_line) */
        if (ms_try_read_line(c) != 0)
        {
          break;
        }
      }

      /* non-zero re-runs the machine (presumably new data or a state
       * change — confirm in ms_try_read_network) */
      if (ms_try_read_network(c) != 0)
      {
        break;
      }

      /* doesn't read all the response data, wait event wake up */
      if (! c->currcmd.isfinish)
      {
        if (! ms_update_event(c, EV_READ | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      /* we have no command line and no data to read from network, next write */
      ms_conn_set_state(c, conn_write);
      memcpy(&c->precmd, &c->currcmd, sizeof(ms_cmdstat_t)); /* replicate command state */

      break;

    case conn_write:
      /* throttling: when ahead of the expected throughput, sleep briefly
       * and wait for the next writable event before starting a new task */
      if (! c->ctnwrite && ms_need_yield(c))
      {
        usleep(10);

        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      /* ctnwrite == false: build the next request; ctnwrite == true:
       * a previous request is still being transmitted */
      if (! c->ctnwrite && (ms_exec_task(c) != 0))
      {
        ms_conn_set_state(c, conn_closing);
        break;
      }

      /* record the start time before starting to send data if necessary */
      if (! c->ctnwrite || (c->change_sfd && c->ctnwrite))
      {
        if (c->change_sfd)
        {
          c->change_sfd= false;
        }
        ms_update_start_time(c);
      }

      /* change sfd if necessary
       * NOTE(review): the block above clears change_sfd whenever it is set,
       * and ms_update_start_time() does not touch it, so this branch appears
       * unreachable — confirm before relying on it. */
      if (c->change_sfd)
      {
        c->ctnwrite= true;
        stop= true;
        break;
      }

      /* execute task until nothing need be written to network */
      if (! c->ctnwrite && (c->msgcurr == c->msgused))
      {
        if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          break;
        }
        stop= true;
        break;
      }

      switch (ms_transmit(c))
      {
      case TRANSMIT_COMPLETE:
        /* we have no data to write to network, next wait repose */
        if (! ms_update_event(c, EV_READ | EV_PERSIST))
        {
          fprintf(stderr, "Couldn't update event.\n");
          ms_conn_set_state(c, conn_closing);
          c->ctnwrite= false;
          break;
        }
        ms_conn_set_state(c, conn_read);
        c->ctnwrite= false;
        stop= true;
        break;

      case TRANSMIT_INCOMPLETE:
        c->ctnwrite= true;
        break; /* Continue in state machine. */

      case TRANSMIT_HARD_ERROR:
        /* state is not changed here; presumably ms_transmit() already
         * moved the connection to conn_closing — verify */
        c->ctnwrite= false;
        break;

      case TRANSMIT_SOFT_ERROR:
        /* keep the pending write and wait for the next event (presumably
         * re-armed by ms_transmit() — verify) */
        c->ctnwrite= true;
        stop= true;
        break;

      default:
        break;
      } /* switch */

      break;

    case conn_closing:
      /* recovery mode, need reconnect if connection close */
      if (ms_setting.reconnect && (! ms_global.time_out
                                   || ((ms_setting.run_time == 0)
                                       && (c->remain_exec_num > 0))))
      {
        if (ms_reconn(c) != 0)
        {
          ms_conn_close(c);
          stop= true;
          break;
        }

        ms_reset_conn(c, false);

        if (c->total_sfds == 1)
        {
          if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
          {
            fprintf(stderr, "Couldn't update event.\n");
            ms_conn_set_state(c, conn_closing);
            break;
          }
        }

        /* re-run the machine with the state set by ms_reset_conn() */
        break;
      }
      else
      {
        ms_conn_close(c);
        stop= true;
        break;
      }

    default:
      assert(0);
    } /* switch */
  }
} /* ms_drive_machine */
2704
2705
2706 /**
2707 * the event handler of each thread
2708 *
2709 * @param fd, the file descriptor of socket
2710 * @param which, event flag
2711 * @param arg, argument
2712 */
2713 void ms_event_handler(const int fd, const short which, void *arg)
2714 {
2715 ms_conn_t *c= (ms_conn_t *)arg;
2716
2717 assert(c != NULL);
2718
2719 c->which= which;
2720
2721 /* sanity */
2722 if (fd != c->sfd)
2723 {
2724 fprintf(stderr,
2725 "Catastrophic: event fd: %d doesn't match conn fd: %d\n",
2726 fd,
2727 c->sfd);
2728 ms_conn_close(c);
2729 exit(1);
2730 }
2731 assert(fd == c->sfd);
2732
2733 /* event timeout, close the current connection */
2734 if (c->which == EV_TIMEOUT)
2735 {
2736 ms_conn_set_state(c, conn_closing);
2737 }
2738
2739 ms_drive_machine(c);
2740
2741 /* wait for next event */
2742 } /* ms_event_handler */
2743
2744
2745 /**
2746 * get the next socket descriptor index to run for replication
2747 *
2748 * @param c, pointer of the concurrency
2749 * @param cmd, command(get or set )
2750 *
2751 * @return int, if success, return the index, else return 0
2752 */
2753 static int ms_get_rep_sock_index(ms_conn_t *c, int cmd)
2754 {
2755 int sock_index= -1;
2756 int i= 0;
2757
2758 if (c->total_sfds == 1)
2759 {
2760 return 0;
2761 }
2762
2763 if (ms_setting.rep_write_srv == 0)
2764 {
2765 return sock_index;
2766 }
2767
2768 do
2769 {
2770 if (cmd == CMD_SET)
2771 {
2772 for (i= 0; i < ms_setting.rep_write_srv; i++)
2773 {
2774 if (c->tcpsfd[i] > 0)
2775 {
2776 break;
2777 }
2778 }
2779
2780 if (i == ms_setting.rep_write_srv)
2781 {
2782 /* random get one replication server to read */
2783 sock_index= (int)(random() % c->total_sfds);
2784 }
2785 else
2786 {
2787 /* random get one replication writing server to write */
2788 sock_index= (int)(random() % ms_setting.rep_write_srv);
2789 }
2790 }
2791 else if (cmd == CMD_GET)
2792 {
2793 /* random get one replication server to read */
2794 sock_index= (int)(random() % c->total_sfds);
2795 }
2796 }
2797 while (c->tcpsfd[sock_index] == 0);
2798
2799 return sock_index;
2800 } /* ms_get_rep_sock_index */
2801
2802
2803 /**
2804 * get the next socket descriptor index to run
2805 *
2806 * @param c, pointer of the concurrency
2807 *
2808 * @return int, return the index
2809 */
2810 static int ms_get_next_sock_index(ms_conn_t *c)
2811 {
2812 int sock_index= 0;
2813
2814 do
2815 {
2816 sock_index= (++c->cur_idx == c->total_sfds) ? 0 : c->cur_idx;
2817 }
2818 while (c->tcpsfd[sock_index] == 0);
2819
2820 return sock_index;
2821 } /* ms_get_next_sock_index */
2822
2823
2824 /**
2825 * update socket event of the connections
2826 *
2827 * @param c, pointer of the concurrency
2828 *
2829 * @return int, if success, return 0, else return -1
2830 */
2831 static int ms_update_conn_sock_event(ms_conn_t *c)
2832 {
2833 assert(c != NULL);
2834
2835 switch (c->currcmd.cmd)
2836 {
2837 case CMD_SET:
2838 if (ms_setting.facebook_test && c->udp)
2839 {
2840 c->sfd= c->tcpsfd[0];
2841 c->udp= false;
2842 c->change_sfd= true;
2843 }
2844 break;
2845
2846 case CMD_GET:
2847 if (ms_setting.facebook_test && ! c->udp)
2848 {
2849 c->sfd= c->udpsfd;
2850 c->udp= true;
2851 c->change_sfd= true;
2852 }
2853 break;
2854
2855 default:
2856 break;
2857 } /* switch */
2858
2859 if (! c->udp && (c->total_sfds > 1))
2860 {
2861 if (c->cur_idx != c->total_sfds)
2862 {
2863 if (ms_setting.rep_write_srv == 0)
2864 {
2865 c->cur_idx= ms_get_next_sock_index(c);
2866 }
2867 else
2868 {
2869 c->cur_idx= ms_get_rep_sock_index(c, c->currcmd.cmd);
2870 }
2871 }
2872 else
2873 {
2874 /* must select the first sock of the connection at the beginning */
2875 c->cur_idx= 0;
2876 }
2877
2878 c->sfd= c->tcpsfd[c->cur_idx];
2879 assert(c->sfd != 0);
2880 c->change_sfd= true;
2881 }
2882
2883 if (c->change_sfd)
2884 {
2885 if (! ms_update_event(c, EV_WRITE | EV_PERSIST))
2886 {
2887 fprintf(stderr, "Couldn't update event.\n");
2888 ms_conn_set_state(c, conn_closing);
2889 return -1;
2890 }
2891 }
2892
2893 return 0;
2894 } /* ms_update_conn_sock_event */
2895
2896
2897 /**
2898 * for ASCII protocol, this function build the set command
2899 * string and send the command.
2900 *
2901 * @param c, pointer of the concurrency
2902 * @param item, pointer of task item which includes the object
2903 * information
2904 *
2905 * @return int, if success, return 0, else return -1
2906 */
2907 static int ms_build_ascii_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
2908 {
2909 int value_offset;
2910 int write_len;
2911 char *buffer= c->wbuf;
2912
2913 write_len= sprintf(buffer,
2914 " %u %d %d\r\n",
2915 0,
2916 item->exp_time,
2917 item->value_size);
2918
2919 if (write_len > c->wsize)
2920 {
2921 /* ought to be always enough. just fail for simplicity */
2922 fprintf(stderr, "output command line too long.\n");
2923 return -1;
2924 }
2925
2926 if (item->value_offset == INVALID_OFFSET)
2927 {
2928 value_offset= item->key_suffix_offset;
2929 }
2930 else
2931 {
2932 value_offset= item->value_offset;
2933 }
2934
2935 if ((ms_add_iov(c, "set ", 4) != 0)
2936 || (ms_add_iov(c, (char *)&item->key_prefix,
2937 (int)KEY_PREFIX_SIZE) != 0)
2938 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
2939 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
2940 || (ms_add_iov(c, buffer, write_len) != 0)
2941 || (ms_add_iov(c, &ms_setting.char_block[value_offset],
2942 item->value_size) != 0)
2943 || (ms_add_iov(c, "\r\n", 2) != 0)
2944 || (c->udp && (ms_build_udp_headers(c) != 0)))
2945 {
2946 return -1;
2947 }
2948
2949 return 0;
2950 } /* ms_build_ascii_write_buf_set */
2951
2952
2953 /**
2954 * used to send set command to server
2955 *
2956 * @param c, pointer of the concurrency
2957 * @param item, pointer of task item which includes the object
2958 * information
2959 *
2960 * @return int, if success, return 0, else return -1
2961 */
2962 int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
2963 {
2964 assert(c != NULL);
2965
2966 c->currcmd.cmd= CMD_SET;
2967 c->currcmd.isfinish= false;
2968 c->currcmd.retstat= MCD_FAILURE;
2969
2970 if (ms_update_conn_sock_event(c) != 0)
2971 {
2972 return -1;
2973 }
2974
2975 c->msgcurr= 0;
2976 c->msgused= 0;
2977 c->iovused= 0;
2978 if (ms_add_msghdr(c) != 0)
2979 {
2980 fprintf(stderr, "Out of memory preparing request.");
2981 return -1;
2982 }
2983
2984 /* binary protocol */
2985 if (c->protocol == binary_prot)
2986 {
2987 if (ms_build_bin_write_buf_set(c, item) != 0)
2988 {
2989 return -1;
2990 }
2991 }
2992 else
2993 {
2994 if (ms_build_ascii_write_buf_set(c, item) != 0)
2995 {
2996 return -1;
2997 }
2998 }
2999
3000 atomic_add_size(&ms_stats.obj_bytes,
3001 item->key_size + item->value_size);
3002 atomic_add_size(&ms_stats.cmd_set, 1);
3003
3004 return 0;
3005 } /* ms_mcd_set */
3006
3007
3008 /**
3009 * for ASCII protocol, this function build the get command
3010 * string and send the command.
3011 *
3012 * @param c, pointer of the concurrency
3013 * @param item, pointer of task item which includes the object
3014 * information
3015 *
3016 * @return int, if success, return 0, else return -1
3017 */
3018 static int ms_build_ascii_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3019 {
3020 if ((ms_add_iov(c, "get ", 4) != 0)
3021 || (ms_add_iov(c, (char *)&item->key_prefix,
3022 (int)KEY_PREFIX_SIZE) != 0)
3023 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3024 item->key_size - (int)KEY_PREFIX_SIZE) != 0)
3025 || (ms_add_iov(c, "\r\n", 2) != 0)
3026 || (c->udp && (ms_build_udp_headers(c) != 0)))
3027 {
3028 return -1;
3029 }
3030
3031 return 0;
3032 } /* ms_build_ascii_write_buf_get */
3033
3034
3035 /**
3036 * used to send the get command to server
3037 *
3038 * @param c, pointer of the concurrency
3039 * @param item, pointer of task item which includes the object
3040 * information
3041 * @param verify, whether do verification
3042 *
3043 * @return int, if success, return 0, else return -1
3044 */
3045 int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
3046 {
3047 /* verify not supported yet */
3048 UNUSED_ARGUMENT(verify);
3049
3050 assert(c != NULL);
3051
3052 c->currcmd.cmd= CMD_GET;
3053 c->currcmd.isfinish= false;
3054 c->currcmd.retstat= MCD_FAILURE;
3055
3056 if (ms_update_conn_sock_event(c) != 0)
3057 {
3058 return -1;
3059 }
3060
3061 c->msgcurr= 0;
3062 c->msgused= 0;
3063 c->iovused= 0;
3064 if (ms_add_msghdr(c) != 0)
3065 {
3066 fprintf(stderr, "Out of memory preparing request.");
3067 return -1;
3068 }
3069
3070 /* binary protocol */
3071 if (c->protocol == binary_prot)
3072 {
3073 if (ms_build_bin_write_buf_get(c, item) != 0)
3074 {
3075 return -1;
3076 }
3077 }
3078 else
3079 {
3080 if (ms_build_ascii_write_buf_get(c, item) != 0)
3081 {
3082 return -1;
3083 }
3084 }
3085
3086 atomic_add_size(&ms_stats.cmd_get, 1);
3087
3088 return 0;
3089 } /* ms_mcd_get */
3090
3091
3092 /**
3093 * for ASCII protocol, this function build the multi-get command
3094 * string and send the command.
3095 *
3096 * @param c, pointer of the concurrency
3097 *
3098 * @return int, if success, return 0, else return -1
3099 */
3100 static int ms_build_ascii_write_buf_mlget(ms_conn_t *c)
3101 {
3102 ms_task_item_t *item;
3103
3104 if (ms_add_iov(c, "get", 3) != 0)
3105 {
3106 return -1;
3107 }
3108
3109 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3110 {
3111 item= c->mlget_task.mlget_item[i].item;
3112 assert(item != NULL);
3113 if ((ms_add_iov(c, " ", 1) != 0)
3114 || (ms_add_iov(c, (char *)&item->key_prefix,
3115 (int)KEY_PREFIX_SIZE) != 0)
3116 || (ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3117 item->key_size - (int)KEY_PREFIX_SIZE) != 0))
3118 {
3119 return -1;
3120 }
3121 }
3122
3123 if ((ms_add_iov(c, "\r\n", 2) != 0)
3124 || (c->udp && (ms_build_udp_headers(c) != 0)))
3125 {
3126 return -1;
3127 }
3128
3129 return 0;
3130 } /* ms_build_ascii_write_buf_mlget */
3131
3132
3133 /**
3134 * used to send the multi-get command to server
3135 *
3136 * @param c, pointer of the concurrency
3137 *
3138 * @return int, if success, return 0, else return -1
3139 */
3140 int ms_mcd_mlget(ms_conn_t *c)
3141 {
3142 ms_task_item_t *item;
3143
3144 assert(c != NULL);
3145 assert(c->mlget_task.mlget_num >= 1);
3146
3147 c->currcmd.cmd= CMD_GET;
3148 c->currcmd.isfinish= false;
3149 c->currcmd.retstat= MCD_FAILURE;
3150
3151 if (ms_update_conn_sock_event(c) != 0)
3152 {
3153 return -1;
3154 }
3155
3156 c->msgcurr= 0;
3157 c->msgused= 0;
3158 c->iovused= 0;
3159 if (ms_add_msghdr(c) != 0)
3160 {
3161 fprintf(stderr, "Out of memory preparing request.");
3162 return -1;
3163 }
3164
3165 /* binary protocol */
3166 if (c->protocol == binary_prot)
3167 {
3168 if (ms_build_bin_write_buf_mlget(c) != 0)
3169 {
3170 return -1;
3171 }
3172 }
3173 else
3174 {
3175 if (ms_build_ascii_write_buf_mlget(c) != 0)
3176 {
3177 return -1;
3178 }
3179 }
3180
3181 /* decrease operation time of each item */
3182 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3183 {
3184 item= c->mlget_task.mlget_item[i].item;
3185 atomic_add_size(&ms_stats.cmd_get, 1);
3186 }
3187
3188 return 0;
3189 } /* ms_mcd_mlget */
3190
3191
3192 /**
3193 * binary protocol support
3194 */
3195
3196 /**
3197 * for binary protocol, parse the response of server
3198 *
3199 * @param c, pointer of the concurrency
3200 *
3201 * @return int, if success, return 0, else return -1
3202 */
3203 static int ms_bin_process_response(ms_conn_t *c)
3204 {
3205 const char *errstr= NULL;
3206
3207 assert(c != NULL);
3208
3209 uint32_t bodylen= c->binary_header.response.bodylen;
3210 uint8_t opcode= c->binary_header.response.opcode;
3211 uint16_t status= c->binary_header.response.status;
3212
3213 if (bodylen > 0)
3214 {
3215 c->rvbytes= (int32_t)bodylen;
3216 c->readval= true;
3217 return 1;
3218 }
3219 else
3220 {
3221 switch (status)
3222 {
3223 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
3224 if (opcode == PROTOCOL_BINARY_CMD_SET)
3225 {
3226 c->currcmd.retstat= MCD_STORED;
3227 }
3228 else if (opcode == PROTOCOL_BINARY_CMD_DELETE)
3229 {
3230 c->currcmd.retstat= MCD_DELETED;
3231 }
3232 else if (opcode == PROTOCOL_BINARY_CMD_GET)
3233 {
3234 c->currcmd.retstat= MCD_END;
3235 }
3236 break;
3237
3238 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
3239 errstr= "Out of memory";
3240 c->currcmd.retstat= MCD_SERVER_ERROR;
3241 break;
3242
3243 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
3244 errstr= "Unknown command";
3245 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3246 break;
3247
3248 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
3249 errstr= "Not found";
3250 c->currcmd.retstat= MCD_NOTFOUND;
3251 break;
3252
3253 case PROTOCOL_BINARY_RESPONSE_EINVAL:
3254 errstr= "Invalid arguments";
3255 c->currcmd.retstat= MCD_PROTOCOL_ERROR;
3256 break;
3257
3258 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
3259 errstr= "Data exists for key.";
3260 break;
3261
3262 case PROTOCOL_BINARY_RESPONSE_E2BIG:
3263 errstr= "Too large.";
3264 c->currcmd.retstat= MCD_SERVER_ERROR;
3265 break;
3266
3267 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
3268 errstr= "Not stored.";
3269 c->currcmd.retstat= MCD_NOTSTORED;
3270 break;
3271
3272 default:
3273 errstr= "Unknown error";
3274 c->currcmd.retstat= MCD_UNKNOWN_READ_FAILURE;
3275 break;
3276 } /* switch */
3277
3278 if (errstr != NULL)
3279 {
3280 fprintf(stderr, "%s\n", errstr);
3281 }
3282 }
3283
3284 return 0;
3285 } /* ms_bin_process_response */
3286
3287
3288 /* build binary header and add the header to the buffer to send */
3289
3290 /**
3291 * build binary header and add the header to the buffer to send
3292 *
3293 * @param c, pointer of the concurrency
3294 * @param opcode, operation code
3295 * @param hdr_len, length of header
3296 * @param key_len, length of key
3297 * @param body_len. length of body
3298 */
3299 static void ms_add_bin_header(ms_conn_t *c,
3300 uint8_t opcode,
3301 uint8_t hdr_len,
3302 uint16_t key_len,
3303 uint32_t body_len)
3304 {
3305 protocol_binary_request_header *header;
3306
3307 assert(c != NULL);
3308
3309 header= (protocol_binary_request_header *)c->wcurr;
3310
3311 header->request.magic= (uint8_t)PROTOCOL_BINARY_REQ;
3312 header->request.opcode= (uint8_t)opcode;
3313 header->request.keylen= htons(key_len);
3314
3315 header->request.extlen= (uint8_t)hdr_len;
3316 header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
3317 header->request.reserved= 0;
3318
3319 header->request.bodylen= htonl(body_len);
3320 header->request.opaque= 0;
3321 header->request.cas= 0;
3322
3323 ms_add_iov(c, c->wcurr, sizeof(header->request));
3324 } /* ms_add_bin_header */
3325
3326
3327 /**
3328 * add the key to the socket write buffer array
3329 *
3330 * @param c, pointer of the concurrency
3331 * @param item, pointer of task item which includes the object
3332 * information
3333 */
3334 static void ms_add_key_to_iov(ms_conn_t *c, ms_task_item_t *item)
3335 {
3336 ms_add_iov(c, (char *)&item->key_prefix, (int)KEY_PREFIX_SIZE);
3337 ms_add_iov(c, &ms_setting.char_block[item->key_suffix_offset],
3338 item->key_size - (int)KEY_PREFIX_SIZE);
3339 }
3340
3341
3342 /**
3343 * for binary protocol, this function build the set command
3344 * and add the command to send buffer array.
3345 *
3346 * @param c, pointer of the concurrency
3347 * @param item, pointer of task item which includes the object
3348 * information
3349 *
3350 * @return int, if success, return 0, else return -1
3351 */
3352 static int ms_build_bin_write_buf_set(ms_conn_t *c, ms_task_item_t *item)
3353 {
3354 assert(c->wbuf == c->wcurr);
3355
3356 int value_offset;
3357 protocol_binary_request_set *rep= (protocol_binary_request_set *)c->wcurr;
3358 uint16_t keylen= (uint16_t)item->key_size;
3359 uint32_t bodylen= (uint32_t)sizeof(rep->message.body)
3360 + (uint32_t)keylen + (uint32_t)item->value_size;
3361
3362 ms_add_bin_header(c,
3363 PROTOCOL_BINARY_CMD_SET,
3364 sizeof(rep->message.body),
3365 keylen,
3366 bodylen);
3367 rep->message.body.flags= 0;
3368 rep->message.body.expiration= htonl((uint32_t)item->exp_time);
3369 ms_add_iov(c, &rep->message.body, sizeof(rep->message.body));
3370 ms_add_key_to_iov(c, item);
3371
3372 if (item->value_offset == INVALID_OFFSET)
3373 {
3374 value_offset= item->key_suffix_offset;
3375 }
3376 else
3377 {
3378 value_offset= item->value_offset;
3379 }
3380 ms_add_iov(c, &ms_setting.char_block[value_offset], item->value_size);
3381
3382 return 0;
3383 } /* ms_build_bin_write_buf_set */
3384
3385
3386 /**
3387 * for binary protocol, this function build the get command and
3388 * add the command to send buffer array.
3389 *
3390 * @param c, pointer of the concurrency
3391 * @param item, pointer of task item which includes the object
3392 * information
3393 *
3394 * @return int, if success, return 0, else return -1
3395 */
3396 static int ms_build_bin_write_buf_get(ms_conn_t *c, ms_task_item_t *item)
3397 {
3398 assert(c->wbuf == c->wcurr);
3399
3400 ms_add_bin_header(c, PROTOCOL_BINARY_CMD_GET, 0, (uint16_t)item->key_size,
3401 (uint32_t)item->key_size);
3402 ms_add_key_to_iov(c, item);
3403
3404 return 0;
3405 } /* ms_build_bin_write_buf_get */
3406
3407
3408 /**
3409 * for binary protocol, this function build the multi-get
3410 * command and add the command to send buffer array.
3411 *
3412 * @param c, pointer of the concurrency
3413 * @param item, pointer of task item which includes the object
3414 * information
3415 *
3416 * @return int, if success, return 0, else return -1
3417 */
3418 static int ms_build_bin_write_buf_mlget(ms_conn_t *c)
3419 {
3420 ms_task_item_t *item;
3421
3422 assert(c->wbuf == c->wcurr);
3423
3424 for (int i= 0; i < c->mlget_task.mlget_num; i++)
3425 {
3426 item= c->mlget_task.mlget_item[i].item;
3427 assert(item != NULL);
3428
3429 ms_add_bin_header(c,
3430 PROTOCOL_BINARY_CMD_GET,
3431 0,
3432 (uint16_t)item->key_size,
3433 (uint32_t)item->key_size);
3434 ms_add_key_to_iov(c, item);
3435 c->wcurr+= sizeof(protocol_binary_request_get);
3436 }
3437
3438 c->wcurr= c->wbuf;
3439
3440 return 0;
3441 } /* ms_build_bin_write_buf_mlget */