10374421c39f8b068e8cc221a39c7ef8e9ba125e
[m6w6/libmemcached] / memcached / memcached.c
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * memcached - memory caching daemon
4 *
5 * http://www.danga.com/memcached/
6 *
7 * Copyright 2003 Danga Interactive, Inc. All rights reserved.
8 *
9 * Use and distribution licensed under the BSD license. See
10 * the LICENSE file for full text.
11 *
12 * Authors:
13 * Anatoly Vorobey <mellon@pobox.com>
14 * Brad Fitzpatrick <brad@danga.com>
15 */
16 #include "memcached.h"
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20 #include <signal.h>
21 #include <sys/resource.h>
22 #include <sys/uio.h>
23 #include <ctype.h>
24 #include <stdarg.h>
25
26 /* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h. */
28 #ifndef _P1003_1B_VISIBLE
29 #define _P1003_1B_VISIBLE
30 #endif
31 /* need this to get IOV_MAX on some platforms. */
32 #ifndef __need_IOV_MAX
33 #define __need_IOV_MAX
34 #endif
35 #include <pwd.h>
36 #include <sys/mman.h>
37 #include <fcntl.h>
38 #include <netinet/tcp.h>
39 #include <arpa/inet.h>
40 #include <errno.h>
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <string.h>
44 #include <time.h>
45 #include <assert.h>
46 #include <limits.h>
47 #include <sysexits.h>
48 #include <stddef.h>
49
50 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
51 #ifndef IOV_MAX
52 #if defined(__FreeBSD__) || defined(__APPLE__)
53 # define IOV_MAX 1024
54 #endif
55 #endif
56
/*
 * forward declarations
 *
 * Prototypes for file-local (static) functions, so that the definitions
 * below can reference each other regardless of their order in the file.
 */
static void drive_machine(conn *c);
static int new_socket(struct addrinfo *ai);
static int try_read_command(conn *c);

/* Outcome of one attempt to read from a connection; returned by
 * try_read_network() and try_read_udp(). */
enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR,            /**< an error occurred (on the socket) (or client closed connection) */
    READ_MEMORY_ERROR      /**< failed to allocate more memory */
};

static enum try_read_result try_read_network(conn *c);
static enum try_read_result try_read_udp(conn *c);

static void conn_set_state(conn *c, enum conn_states state);

/* stats */
static void stats_init(void);
static void server_stats(ADD_STAT add_stats, conn *c);
static void process_stat_settings(ADD_STAT add_stats, void *c);


/* defaults */
static void settings_init(void);

/* event handling, network IO */
static void event_handler(const int fd, const short which, void *arg);
static void conn_close(conn *c);
static void conn_init(void);
static bool update_event(conn *c, const int new_flags);
static void complete_nread(conn *c);
static void process_command(conn *c, char *command);
static void write_and_free(conn *c, char *buf, int bytes);
static int ensure_iov_space(conn *c);
static int add_iov(conn *c, const void *buf, int len);
static int add_msghdr(conn *c);


/* Releases a conn and all buffers it owns (internal linkage). */
static void conn_free(conn *c);
99
/** exported globals **/
/* Global server counters. Initialised by stats_init(); stats_reset(),
 * conn_new() and conn_close() update them under STATS_LOCK(). */
struct stats stats;
/* Runtime configuration; defaults are filled in by settings_init(). */
struct settings settings;
/* when the process was started (back-dated by 2 seconds in stats_init());
 * realtime() converts absolute expiration times relative to this value */
time_t process_started;

/* Slab rebalancer state. NOTE(review): not referenced in the code shown
 * here; presumably used by the slab rebalancing logic elsewhere. */
struct slab_rebalance slab_rebal;
volatile int slab_rebalance_signal;

/** file scope variables **/
/* NOTE(review): presumably the listening connection(s); assigned outside
 * the code shown here. */
static conn *listen_conn = NULL;
/* Event base on which maxconns_handler() schedules its retry timer. */
static struct event_base *main_base;

/* Result of transmit(): how far writing the pending output got. */
enum transmit_result {
    TRANSMIT_COMPLETE,   /**< All done writing. */
    TRANSMIT_INCOMPLETE, /**< More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /**< Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /**< Can't write (c->state is set to conn_closing) */
};

static enum transmit_result transmit(conn *c);

/* This reduces the latency without adding lots of extra wiring to be able to
 * notify the listener thread of when to listen again.
 * Also, the clock timer could be broken out into its own thread and we
 * can block the listener via a condition.
 */
/* Set to true by conn_close() (under conn_lock) whenever a connection is
 * closed; maxconns_handler() polls it to decide when to resume accepting.
 * NOTE(review): volatile is not a synchronization primitive, and the read
 * in maxconns_handler() is done without conn_lock. */
static volatile bool allow_new_conns = true;
/* Timer that maxconns_handler() re-arms every 10ms while waiting. */
static struct event maxconnsevent;
/* Event callbacks such as maxconns_handler() must accept parameters they do
 * not use; silence the resulting warnings for the rest of the file. */
#ifndef __INTEL_COMPILER
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif
131 static void maxconns_handler(const int fd, const short which, void *arg) {
132 struct timeval t = {.tv_sec = 0, .tv_usec = 10000};
133
134 if (fd == -42 || allow_new_conns == false) {
135 /* reschedule in 10ms if we need to keep polling */
136 evtimer_set(&maxconnsevent, maxconns_handler, 0);
137 event_base_set(main_base, &maxconnsevent);
138 evtimer_add(&maxconnsevent, &t);
139 } else {
140 evtimer_del(&maxconnsevent);
141 accept_new_conns(true);
142 }
143 }
144
145 #define REALTIME_MAXDELTA 60*60*24*30
146
147 /*
148 * given time value that's either unix time or delta from current unix time, return
149 * unix time. Use the fact that delta can't exceed one month (and real time value can't
150 * be that low).
151 */
152 static rel_time_t realtime(const time_t exptime) {
153 /* no. of seconds in 30 days - largest possible delta exptime */
154
155 if (exptime == 0) return 0; /* 0 means never expire */
156
157 if (exptime > REALTIME_MAXDELTA) {
158 /* if item expiration is at/before the server started, give it an
159 expiration time of 1 second after the server started.
160 (because 0 means don't expire). without this, we'd
161 underflow and wrap around to some large value way in the
162 future, effectively making items expiring in the past
163 really expiring never */
164 if (exptime <= process_started)
165 return (rel_time_t)1;
166 return (rel_time_t)(exptime - process_started);
167 } else {
168 return (rel_time_t)(exptime + current_time);
169 }
170 }
171
172 static void stats_init(void) {
173 stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
174 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = stats.reclaimed = 0;
175 stats.touch_cmds = stats.touch_misses = stats.touch_hits = stats.rejected_conns = 0;
176 stats.curr_bytes = stats.listen_disabled_num = 0;
177 stats.hash_power_level = stats.hash_bytes = stats.hash_is_expanding = 0;
178 stats.expired_unfetched = stats.evicted_unfetched = 0;
179 stats.slabs_moved = 0;
180 stats.accepting_conns = true; /* assuming we start in this state. */
181 stats.slab_reassign_running = false;
182
183 /* make the time we started always be 2 seconds before we really
184 did, so time(0) - time.started is never zero. if so, things
185 like 'settings.oldest_live' which act as booleans as well as
186 values are now false in boolean context... */
187 process_started = time(0) - 2;
188 stats_prefix_init();
189 }
190
191 static void stats_reset(void) {
192 STATS_LOCK();
193 stats.total_items = stats.total_conns = 0;
194 stats.rejected_conns = 0;
195 stats.evictions = 0;
196 stats.reclaimed = 0;
197 stats.listen_disabled_num = 0;
198 stats_prefix_clear();
199 STATS_UNLOCK();
200 threadlocal_stats_reset();
201 item_stats_reset();
202 }
203
204 static void settings_init(void) {
205 settings.use_cas = true;
206 settings.access = 0700;
207 settings.port = 11211;
208 settings.udpport = 11211;
209 /* By default this string should be NULL for getaddrinfo() */
210 settings.inter = NULL;
211 settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
212 settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
213 settings.verbose = 0;
214 settings.oldest_live = 0;
215 settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
216 settings.socketpath = NULL; /* by default, not using a unix socket */
217 settings.factor = 1.25;
218 settings.chunk_size = 48; /* space for a modest key and value */
219 settings.num_threads = 4; /* N workers */
220 settings.num_threads_per_udp = 0;
221 settings.prefix_delimiter = ':';
222 settings.detail_enabled = 0;
223 settings.reqs_per_event = 20;
224 settings.backlog = 1024;
225 settings.binding_protocol = negotiating_prot;
226 settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
227 settings.maxconns_fast = false;
228 settings.hashpower_init = 0;
229 settings.slab_reassign = false;
230 settings.slab_automove = false;
231 }
232
233 /*
234 * Adds a message header to a connection.
235 *
236 * Returns 0 on success, -1 on out-of-memory.
237 */
238 static int add_msghdr(conn *c)
239 {
240 struct msghdr *msg;
241
242 assert(c != NULL);
243
244 if (c->msgsize == c->msgused) {
245 msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
246 if (! msg)
247 return -1;
248 c->msglist = msg;
249 c->msgsize *= 2;
250 }
251
252 msg = c->msglist + c->msgused;
253
254 /* this wipes msg_iovlen, msg_control, msg_controllen, and
255 msg_flags, the last 3 of which aren't defined on solaris: */
256 memset(msg, 0, sizeof(struct msghdr));
257
258 msg->msg_iov = &c->iov[c->iovused];
259
260 if (c->request_addr_size > 0) {
261 msg->msg_name = &c->request_addr;
262 msg->msg_namelen = c->request_addr_size;
263 }
264
265 c->msgbytes = 0;
266 c->msgused++;
267
268 if (IS_UDP(c->transport)) {
269 /* Leave room for the UDP header, which we'll fill in later. */
270 return add_iov(c, NULL, UDP_HEADER_SIZE);
271 }
272
273 return 0;
274 }
275
276
277 /*
278 * Free list management for connections.
279 */
280
281 static conn **freeconns;
282 static int freetotal;
283 static int freecurr;
284 /* Lock for connection freelist */
285 static pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
286
287
288 static void conn_init(void) {
289 freetotal = 200;
290 freecurr = 0;
291 if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
292 fprintf(stderr, "Failed to allocate connection structures\n");
293 }
294 return;
295 }
296
297 /*
298 * Returns a connection from the freelist, if any.
299 */
300 conn *conn_from_freelist() {
301 conn *c;
302
303 pthread_mutex_lock(&conn_lock);
304 if (freecurr > 0) {
305 c = freeconns[--freecurr];
306 } else {
307 c = NULL;
308 }
309 pthread_mutex_unlock(&conn_lock);
310
311 return c;
312 }
313
314 /*
315 * Adds a connection to the freelist. 0 = success.
316 */
317 bool conn_add_to_freelist(conn *c) {
318 bool ret = true;
319 pthread_mutex_lock(&conn_lock);
320 if (freecurr < freetotal) {
321 freeconns[freecurr++] = c;
322 ret = false;
323 } else {
324 /* try to enlarge free connections array */
325 size_t newsize = freetotal * 2;
326 conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
327 if (new_freeconns) {
328 freetotal = newsize;
329 freeconns = new_freeconns;
330 freeconns[freecurr++] = c;
331 ret = false;
332 }
333 }
334 pthread_mutex_unlock(&conn_lock);
335 return ret;
336 }
337
338 static const char *prot_text(enum protocol prot) {
339 const char *rv = "unknown";
340 switch(prot) {
341 case ascii_prot:
342 rv = "ascii";
343 break;
344 case binary_prot:
345 rv = "binary";
346 break;
347 case negotiating_prot:
348 rv = "auto-negotiate";
349 break;
350 default:
351 abort();
352 }
353 return rv;
354 }
355
/*
 * Creates a connection for socket sfd and registers it on event base `base`.
 *
 * A conn is taken from the free list when one is available; otherwise a new
 * one is calloc()ed and its buffers are malloc()ed. Either way, the
 * per-request cursors and counters are reset, and an event for event_flags
 * is installed with event_handler() as the callback.
 *
 * sfd               socket descriptor the connection serves
 * init_state        initial state; also used as the initial write_and_go
 * event_flags       libevent flags passed to event_set()
 * read_buffer_size  initial read buffer size. Only used for freshly allocated
 *                   conns; recycled ones keep their existing buffers and sizes.
 * transport         transport type (UDP is special-cased for logging)
 * base              event base the connection's event is attached to
 *
 * Returns the connection, or NULL on allocation or event_add() failure.
 */
conn *conn_new(const int sfd, enum conn_states init_state,
               const int event_flags,
               const int read_buffer_size, enum network_transport transport,
               struct event_base *base) {
    conn *c = conn_from_freelist();

    if (NULL == c) {
        /* Nothing to recycle: build a fresh conn with default-sized buffers. */
        if (!(c = (conn *)calloc(1, sizeof(conn)))) {
            fprintf(stderr, "calloc()\n");
            return NULL;
        }
        MEMCACHED_CONN_CREATE(c);

        c->rbuf = c->wbuf = 0;
        c->ilist = 0;
        c->suffixlist = 0;
        c->iov = 0;
        c->msglist = 0;
        c->hdrbuf = 0;

        c->rsize = read_buffer_size;
        c->wsize = DATA_BUFFER_SIZE;
        c->isize = ITEM_LIST_INITIAL;
        c->suffixsize = SUFFIX_LIST_INITIAL;
        c->iovsize = IOV_LIST_INITIAL;
        c->msgsize = MSG_LIST_INITIAL;
        c->hdrsize = 0;

        c->rbuf = (char *)malloc((size_t)c->rsize);
        c->wbuf = (char *)malloc((size_t)c->wsize);
        c->ilist = (item **)malloc(sizeof(item *) * c->isize);
        c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
        c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
        c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);

        /* conn_free() copes with the buffers that did get allocated; the
         * others are still NULL. */
        if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
                c->msglist == 0 || c->suffixlist == 0) {
            conn_free(c);
            fprintf(stderr, "malloc()\n");
            return NULL;
        }

        /* Count conn structures ever allocated (recycled ones are not). */
        STATS_LOCK();
        stats.conn_structs++;
        STATS_UNLOCK();
    }

    c->transport = transport;
    c->protocol = settings.binding_protocol;

    /* unix socket mode doesn't need this, so zeroed out.  but why
     * is this done for every command?  presumably for UDP
     * mode.  */
    if (!settings.socketpath) {
        c->request_addr_size = sizeof(c->request_addr);
    } else {
        c->request_addr_size = 0;
    }

    if (settings.verbose > 1) {
        if (init_state == conn_listening) {
            fprintf(stderr, "<%d server listening (%s)\n", sfd,
                prot_text(c->protocol));
        } else if (IS_UDP(transport)) {
            fprintf(stderr, "<%d server listening (udp)\n", sfd);
        } else if (c->protocol == negotiating_prot) {
            fprintf(stderr, "<%d new auto-negotiating client connection\n",
                    sfd);
        } else if (c->protocol == ascii_prot) {
            fprintf(stderr, "<%d new ascii client connection.\n", sfd);
        } else if (c->protocol == binary_prot) {
            fprintf(stderr, "<%d new binary client connection.\n", sfd);
        } else {
            fprintf(stderr, "<%d new unknown (%d) client connection\n",
                sfd, c->protocol);
            assert(false);
        }
    }

    /* Reset per-request state; for a recycled conn this discards whatever
     * cursors and counters were left from its previous use. */
    c->sfd = sfd;
    c->state = init_state;
    c->rlbytes = 0;
    c->cmd = -1;
    c->rbytes = c->wbytes = 0;
    c->wcurr = c->wbuf;
    c->rcurr = c->rbuf;
    c->ritem = 0;
    c->icurr = c->ilist;
    c->suffixcurr = c->suffixlist;
    c->ileft = 0;
    c->suffixleft = 0;
    c->iovused = 0;
    c->msgcurr = 0;
    c->msgused = 0;

    c->write_and_go = init_state;
    c->write_and_free = 0;
    c->item = 0;

    c->noreply = false;

    event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = event_flags;

    if (event_add(&c->event, 0) == -1) {
        /* Recycle the struct if possible; free it if the list refuses it. */
        if (conn_add_to_freelist(c)) {
            conn_free(c);
        }
        perror("event_add");
        return NULL;
    }

    STATS_LOCK();
    stats.curr_conns++;
    stats.total_conns++;
    STATS_UNLOCK();

    MEMCACHED_CONN_ALLOCATE(c->sfd);

    return c;
}
478
479 static void conn_cleanup(conn *c) {
480 assert(c != NULL);
481
482 if (c->item) {
483 item_remove(c->item);
484 c->item = 0;
485 }
486
487 if (c->ileft != 0) {
488 for (; c->ileft > 0; c->ileft--,c->icurr++) {
489 item_remove(*(c->icurr));
490 }
491 }
492
493 if (c->suffixleft != 0) {
494 for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
495 cache_free(c->thread->suffix_cache, *(c->suffixcurr));
496 }
497 }
498
499 if (c->write_and_free) {
500 free(c->write_and_free);
501 c->write_and_free = 0;
502 }
503
504 if (c->sasl_conn) {
505 assert(settings.sasl);
506 sasl_dispose(&c->sasl_conn);
507 c->sasl_conn = NULL;
508 }
509
510 if (IS_UDP(c->transport)) {
511 conn_set_state(c, conn_read);
512 }
513 }
514
515 /*
516 * Frees a connection.
517 */
518 void conn_free(conn *c) {
519 if (c) {
520 MEMCACHED_CONN_DESTROY(c);
521 if (c->hdrbuf)
522 free(c->hdrbuf);
523 if (c->msglist)
524 free(c->msglist);
525 if (c->rbuf)
526 free(c->rbuf);
527 if (c->wbuf)
528 free(c->wbuf);
529 if (c->ilist)
530 free(c->ilist);
531 if (c->suffixlist)
532 free(c->suffixlist);
533 if (c->iov)
534 free(c->iov);
535 free(c);
536 }
537 }
538
539 static void conn_close(conn *c) {
540 assert(c != NULL);
541
542 /* delete the event, the socket and the conn */
543 event_del(&c->event);
544
545 if (settings.verbose > 1)
546 fprintf(stderr, "<%d connection closed.\n", c->sfd);
547
548 MEMCACHED_CONN_RELEASE(c->sfd);
549 close(c->sfd);
550 pthread_mutex_lock(&conn_lock);
551 allow_new_conns = true;
552 pthread_mutex_unlock(&conn_lock);
553 conn_cleanup(c);
554
555 /* if the connection has big buffers, just free it */
556 if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
557 conn_free(c);
558 }
559
560 STATS_LOCK();
561 stats.curr_conns--;
562 STATS_UNLOCK();
563
564 return;
565 }
566
567 /*
568 * Shrinks a connection's buffers if they're too big. This prevents
569 * periodic large "get" requests from permanently chewing lots of server
570 * memory.
571 *
572 * This should only be called in between requests since it can wipe output
573 * buffers!
574 */
575 static void conn_shrink(conn *c) {
576 assert(c != NULL);
577
578 if (IS_UDP(c->transport))
579 return;
580
581 if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
582 char *newbuf;
583
584 if (c->rcurr != c->rbuf)
585 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
586
587 newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
588
589 if (newbuf) {
590 c->rbuf = newbuf;
591 c->rsize = DATA_BUFFER_SIZE;
592 }
593 /* TODO check other branch... */
594 c->rcurr = c->rbuf;
595 }
596
597 if (c->isize > ITEM_LIST_HIGHWAT) {
598 item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
599 if (newbuf) {
600 c->ilist = newbuf;
601 c->isize = ITEM_LIST_INITIAL;
602 }
603 /* TODO check error condition? */
604 }
605
606 if (c->msgsize > MSG_LIST_HIGHWAT) {
607 struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
608 if (newbuf) {
609 c->msglist = newbuf;
610 c->msgsize = MSG_LIST_INITIAL;
611 }
612 /* TODO check error condition? */
613 }
614
615 if (c->iovsize > IOV_LIST_HIGHWAT) {
616 struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
617 if (newbuf) {
618 c->iov = newbuf;
619 c->iovsize = IOV_LIST_INITIAL;
620 }
621 /* TODO check return value */
622 }
623 }
624
625 /**
626 * Convert a state name to a human readable form.
627 */
628 static const char *state_text(enum conn_states state) {
629 const char* const statenames[] = { "conn_listening",
630 "conn_new_cmd",
631 "conn_waiting",
632 "conn_read",
633 "conn_parse_cmd",
634 "conn_write",
635 "conn_nread",
636 "conn_swallow",
637 "conn_closing",
638 "conn_mwrite" };
639 return statenames[state];
640 }
641
642 #ifndef __INTEL_COMPILER
643 #pragma GCC diagnostic ignored "-Wtype-limits"
644 #endif
645 /*
646 * Sets a connection's current state in the state machine. Any special
647 * processing that needs to happen on certain state transitions can
648 * happen here.
649 */
650 static void conn_set_state(conn *c, enum conn_states state) {
651 assert(c != NULL);
652 assert(state >= conn_listening && state < conn_max_state);
653
654 if (state != c->state) {
655 if (settings.verbose > 2) {
656 fprintf(stderr, "%d: going from %s to %s\n",
657 c->sfd, state_text(c->state),
658 state_text(state));
659 }
660
661 if (state == conn_write || state == conn_mwrite) {
662 MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
663 }
664 c->state = state;
665 }
666 }
667
668 /*
669 * Ensures that there is room for another struct iovec in a connection's
670 * iov list.
671 *
672 * Returns 0 on success, -1 on out-of-memory.
673 */
674 static int ensure_iov_space(conn *c) {
675 assert(c != NULL);
676
677 if (c->iovused >= c->iovsize) {
678 int i, iovnum;
679 struct iovec *new_iov = (struct iovec *)realloc(c->iov,
680 (c->iovsize * 2) * sizeof(struct iovec));
681 if (! new_iov)
682 return -1;
683 c->iov = new_iov;
684 c->iovsize *= 2;
685
686 /* Point all the msghdr structures at the new list. */
687 for (i = 0, iovnum = 0; i < c->msgused; i++) {
688 c->msglist[i].msg_iov = &c->iov[iovnum];
689 iovnum += c->msglist[i].msg_iovlen;
690 }
691 }
692
693 return 0;
694 }
695
696
697 /*
698 * Adds data to the list of pending data that will be written out to a
699 * connection.
700 *
701 * Returns 0 on success, -1 on out-of-memory.
702 */
703
704 static int add_iov(conn *c, const void *buf, int len) {
705 struct msghdr *m;
706 int leftover;
707 bool limit_to_mtu;
708
709 assert(c != NULL);
710
711 do {
712 m = &c->msglist[c->msgused - 1];
713
714 /*
715 * Limit UDP packets, and the first payloads of TCP replies, to
716 * UDP_MAX_PAYLOAD_SIZE bytes.
717 */
718 limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
719
720 /* We may need to start a new msghdr if this one is full. */
721 if (m->msg_iovlen == IOV_MAX ||
722 (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
723 add_msghdr(c);
724 m = &c->msglist[c->msgused - 1];
725 }
726
727 if (ensure_iov_space(c) != 0)
728 return -1;
729
730 /* If the fragment is too big to fit in the datagram, split it up */
731 if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
732 leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
733 len -= leftover;
734 } else {
735 leftover = 0;
736 }
737
738 m = &c->msglist[c->msgused - 1];
739 m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
740 m->msg_iov[m->msg_iovlen].iov_len = len;
741
742 c->msgbytes += len;
743 c->iovused++;
744 m->msg_iovlen++;
745
746 buf = ((char *)buf) + len;
747 len = leftover;
748 } while (leftover > 0);
749
750 return 0;
751 }
752
753
754 /*
755 * Constructs a set of UDP headers and attaches them to the outgoing messages.
756 */
757 static int build_udp_headers(conn *c) {
758 int i;
759 unsigned char *hdr;
760
761 assert(c != NULL);
762
763 if (c->msgused > c->hdrsize) {
764 void *new_hdrbuf;
765 if (c->hdrbuf)
766 new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
767 else
768 new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
769 if (! new_hdrbuf)
770 return -1;
771 c->hdrbuf = (unsigned char *)new_hdrbuf;
772 c->hdrsize = c->msgused * 2;
773 }
774
775 hdr = c->hdrbuf;
776 for (i = 0; i < c->msgused; i++) {
777 c->msglist[i].msg_iov[0].iov_base = (void*)hdr;
778 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
779 *hdr++ = c->request_id / 256;
780 *hdr++ = c->request_id % 256;
781 *hdr++ = i / 256;
782 *hdr++ = i % 256;
783 *hdr++ = c->msgused / 256;
784 *hdr++ = c->msgused % 256;
785 *hdr++ = 0;
786 *hdr++ = 0;
787 assert((void *) hdr == (caddr_t)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
788 }
789
790 return 0;
791 }
792
793
794 #ifndef __INTEL_COMPILER
795 #pragma GCC diagnostic ignored "-Wsign-compare"
796 #endif
797 static void out_string(conn *c, const char *str) {
798 size_t len;
799
800 assert(c != NULL);
801
802 if (c->noreply) {
803 if (settings.verbose > 1)
804 fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
805 c->noreply = false;
806 conn_set_state(c, conn_new_cmd);
807 return;
808 }
809
810 if (settings.verbose > 1)
811 fprintf(stderr, ">%d %s\n", c->sfd, str);
812
813 /* Nuke a partial output... */
814 c->msgcurr = 0;
815 c->msgused = 0;
816 c->iovused = 0;
817 add_msghdr(c);
818
819 len = strlen(str);
820 if ((len + 2) > c->wsize) {
821 /* ought to be always enough. just fail for simplicity */
822 str = "SERVER_ERROR output line too long";
823 len = strlen(str);
824 }
825
826 memcpy(c->wbuf, str, len);
827 memcpy(c->wbuf + len, "\r\n", 2);
828 c->wbytes = len + 2;
829 c->wcurr = c->wbuf;
830
831 conn_set_state(c, conn_write);
832 c->write_and_go = conn_new_cmd;
833 return;
834 }
835
836 /*
837 * we get here after reading the value in set/add/replace commands. The command
838 * has been stored in c->cmd, and the item is ready in c->item.
839 */
840 static void complete_nread_ascii(conn *c) {
841 assert(c != NULL);
842
843 item *it = c->item;
844 int comm = c->cmd;
845 enum store_item_type ret;
846
847 pthread_mutex_lock(&c->thread->stats.mutex);
848 c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
849 pthread_mutex_unlock(&c->thread->stats.mutex);
850
851 if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
852 out_string(c, "CLIENT_ERROR bad data chunk");
853 } else {
854 ret = store_item(it, comm, c);
855
856 #ifdef ENABLE_DTRACE
857 uint64_t cas = ITEM_get_cas(it);
858 switch (c->cmd) {
859 case NREAD_ADD:
860 MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
861 (ret == 1) ? it->nbytes : -1, cas);
862 break;
863 case NREAD_REPLACE:
864 MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
865 (ret == 1) ? it->nbytes : -1, cas);
866 break;
867 case NREAD_APPEND:
868 MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
869 (ret == 1) ? it->nbytes : -1, cas);
870 break;
871 case NREAD_PREPEND:
872 MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
873 (ret == 1) ? it->nbytes : -1, cas);
874 break;
875 case NREAD_SET:
876 MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
877 (ret == 1) ? it->nbytes : -1, cas);
878 break;
879 case NREAD_CAS:
880 MEMCACHED_COMMAND_CAS(c->sfd, ITEM_key(it), it->nkey, it->nbytes,
881 cas);
882 break;
883 }
884 #endif
885
886 switch (ret) {
887 case STORED:
888 out_string(c, "STORED");
889 break;
890 case EXISTS:
891 out_string(c, "EXISTS");
892 break;
893 case NOT_FOUND:
894 out_string(c, "NOT_FOUND");
895 break;
896 case NOT_STORED:
897 out_string(c, "NOT_STORED");
898 break;
899 default:
900 out_string(c, "SERVER_ERROR Unhandled storage type.");
901 }
902
903 }
904
905 item_remove(c->item); /* release the c->item reference */
906 c->item = 0;
907 }
908
909 /**
910 * get a pointer to the start of the request struct for the current command
911 */
912 static void* binary_get_request(conn *c) {
913 char *ret = c->rcurr;
914 ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
915 c->binary_header.request.extlen);
916
917 assert(ret >= c->rbuf);
918 return ret;
919 }
920
921 /**
922 * get a pointer to the key in this request
923 */
924 static char* binary_get_key(conn *c) {
925 return c->rcurr - (c->binary_header.request.keylen);
926 }
927
928 static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
929 protocol_binary_response_header* header;
930
931 assert(c);
932
933 c->msgcurr = 0;
934 c->msgused = 0;
935 c->iovused = 0;
936 if (add_msghdr(c) != 0) {
937 /* XXX: out_string is inappropriate here */
938 out_string(c, "SERVER_ERROR out of memory");
939 return;
940 }
941
942 header = (protocol_binary_response_header *)c->wbuf;
943
944 header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
945 header->response.opcode = c->binary_header.request.opcode;
946 header->response.keylen = (uint16_t)htons(key_len);
947
948 header->response.extlen = (uint8_t)hdr_len;
949 header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
950 header->response.status = (uint16_t)htons(err);
951
952 header->response.bodylen = htonl(body_len);
953 header->response.opaque = c->opaque;
954 header->response.cas = htonll(c->cas);
955
956 if (settings.verbose > 1) {
957 int ii;
958 fprintf(stderr, ">%d Writing bin response:", c->sfd);
959 for (ii = 0; ii < sizeof(header->bytes); ++ii) {
960 if (ii % 4 == 0) {
961 fprintf(stderr, "\n>%d ", c->sfd);
962 }
963 fprintf(stderr, " 0x%02x", header->bytes[ii]);
964 }
965 fprintf(stderr, "\n");
966 }
967
968 add_iov(c, c->wbuf, sizeof(header->response));
969 }
970
971 static void write_bin_error(conn *c, protocol_binary_response_status err, int swallow) {
972 const char *errstr = "Unknown error";
973 size_t len;
974
975 switch (err) {
976 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
977 errstr = "Out of memory";
978 break;
979 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
980 errstr = "Unknown command";
981 break;
982 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
983 errstr = "Not found";
984 break;
985 case PROTOCOL_BINARY_RESPONSE_EINVAL:
986 errstr = "Invalid arguments";
987 break;
988 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
989 errstr = "Data exists for key.";
990 break;
991 case PROTOCOL_BINARY_RESPONSE_E2BIG:
992 errstr = "Too large.";
993 break;
994 case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
995 errstr = "Non-numeric server-side value for incr or decr";
996 break;
997 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
998 errstr = "Not stored.";
999 break;
1000 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
1001 errstr = "Auth failure.";
1002 break;
1003 case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE:
1004 assert(false);
1005 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
1006 assert(false);
1007 default:
1008 assert(false);
1009 errstr = "UNHANDLED ERROR";
1010 fprintf(stderr, ">%d UNHANDLED ERROR: %d\n", c->sfd, err);
1011 }
1012
1013 if (settings.verbose > 1) {
1014 fprintf(stderr, ">%d Writing an error: %s\n", c->sfd, errstr);
1015 }
1016
1017 len = strlen(errstr);
1018 add_bin_header(c, err, 0, 0, len);
1019 if (len > 0) {
1020 add_iov(c, errstr, len);
1021 }
1022 conn_set_state(c, conn_mwrite);
1023 if(swallow > 0) {
1024 c->sbytes = swallow;
1025 c->write_and_go = conn_swallow;
1026 } else {
1027 c->write_and_go = conn_new_cmd;
1028 }
1029 }
1030
1031 /* Form and send a response to a command over the binary protocol */
1032 static void write_bin_response(conn *c, const void *d, int hlen, int keylen, int dlen) {
1033 if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1034 c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1035 add_bin_header(c, 0, hlen, keylen, dlen);
1036 if(dlen > 0) {
1037 add_iov(c, d, dlen);
1038 }
1039 conn_set_state(c, conn_mwrite);
1040 c->write_and_go = conn_new_cmd;
1041 } else {
1042 conn_set_state(c, conn_new_cmd);
1043 }
1044 }
1045
1046 static void complete_incr_bin(conn *c) {
1047 item *it;
1048 char *key;
1049 size_t nkey;
1050 /* Weird magic in add_delta forces me to pad here */
1051 char tmpbuf[INCR_MAX_STORAGE_LEN];
1052 uint64_t cas = 0;
1053
1054 protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
1055 protocol_binary_request_incr* req = binary_get_request(c);
1056
1057 assert(c != NULL);
1058 assert(c->wsize >= sizeof(*rsp));
1059
1060 /* fix byteorder in the request */
1061 req->message.body.delta = ntohll(req->message.body.delta);
1062 req->message.body.initial = ntohll(req->message.body.initial);
1063 req->message.body.expiration = ntohl(req->message.body.expiration);
1064 key = binary_get_key(c);
1065 nkey = c->binary_header.request.keylen;
1066
1067 if (settings.verbose > 1) {
1068 int i;
1069 fprintf(stderr, "incr ");
1070
1071 for (i = 0; i < nkey; i++) {
1072 fprintf(stderr, "%c", key[i]);
1073 }
1074 fprintf(stderr, " %lld, %llu, %d\n",
1075 (long long)req->message.body.delta,
1076 (long long)req->message.body.initial,
1077 req->message.body.expiration);
1078 }
1079
1080 if (c->binary_header.request.cas != 0) {
1081 cas = c->binary_header.request.cas;
1082 }
1083 switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
1084 req->message.body.delta, tmpbuf,
1085 &cas)) {
1086 case OK:
1087 rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10));
1088 if (cas) {
1089 c->cas = cas;
1090 }
1091 write_bin_response(c, &rsp->message.body, 0, 0,
1092 sizeof(rsp->message.body.value));
1093 break;
1094 case NON_NUMERIC:
1095 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, 0);
1096 break;
1097 case EOM:
1098 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1099 break;
1100 case DELTA_ITEM_NOT_FOUND:
1101 if (req->message.body.expiration != 0xffffffff) {
1102 /* Save some room for the response */
1103 rsp->message.body.value = htonll(req->message.body.initial);
1104 it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
1105 INCR_MAX_STORAGE_LEN);
1106
1107 if (it != NULL) {
1108 snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu",
1109 (unsigned long long)req->message.body.initial);
1110
1111 if (store_item(it, NREAD_ADD, c)) {
1112 c->cas = ITEM_get_cas(it);
1113 write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
1114 } else {
1115 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
1116 }
1117 item_remove(it); /* release our reference */
1118 } else {
1119 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1120 }
1121 } else {
1122 pthread_mutex_lock(&c->thread->stats.mutex);
1123 if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
1124 c->thread->stats.incr_misses++;
1125 } else {
1126 c->thread->stats.decr_misses++;
1127 }
1128 pthread_mutex_unlock(&c->thread->stats.mutex);
1129
1130 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1131 }
1132 break;
1133 case DELTA_ITEM_CAS_MISMATCH:
1134 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1135 break;
1136
1137 default:
1138 assert(0);
1139 abort();
1140 }
1141 }
1142
1143 static void complete_update_bin(conn *c) {
1144 protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
1145 enum store_item_type ret = NOT_STORED;
1146 assert(c != NULL);
1147
1148 item *it = c->item;
1149
1150 pthread_mutex_lock(&c->thread->stats.mutex);
1151 c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
1152 pthread_mutex_unlock(&c->thread->stats.mutex);
1153
1154 /* We don't actually receive the trailing two characters in the bin
1155 * protocol, so we're going to just set them here */
1156 *(ITEM_data(it) + it->nbytes - 2) = '\r';
1157 *(ITEM_data(it) + it->nbytes - 1) = '\n';
1158
1159 ret = store_item(it, c->cmd, c);
1160
1161 #ifdef ENABLE_DTRACE
1162 uint64_t cas = ITEM_get_cas(it);
1163 switch (c->cmd) {
1164 case NREAD_ADD:
1165 MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
1166 (ret == STORED) ? it->nbytes : -1, cas);
1167 break;
1168 case NREAD_REPLACE:
1169 MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
1170 (ret == STORED) ? it->nbytes : -1, cas);
1171 break;
1172 case NREAD_APPEND:
1173 MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
1174 (ret == STORED) ? it->nbytes : -1, cas);
1175 break;
1176 case NREAD_PREPEND:
1177 MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
1178 (ret == STORED) ? it->nbytes : -1, cas);
1179 break;
1180 case NREAD_SET:
1181 MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
1182 (ret == STORED) ? it->nbytes : -1, cas);
1183 break;
1184 }
1185 #endif
1186
1187 switch (ret) {
1188 case STORED:
1189 /* Stored */
1190 write_bin_response(c, NULL, 0, 0, 0);
1191 break;
1192 case EXISTS:
1193 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1194 break;
1195 case NOT_FOUND:
1196 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1197 break;
1198 case NOT_STORED:
1199 if (c->cmd == NREAD_ADD) {
1200 eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1201 } else if(c->cmd == NREAD_REPLACE) {
1202 eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1203 } else {
1204 eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1205 }
1206 write_bin_error(c, eno, 0);
1207 default:
1208 assert(false);
1209 abort();
1210 }
1211
1212 item_remove(c->item); /* release the c->item reference */
1213 c->item = 0;
1214 }
1215
1216 static void process_bin_touch(conn *c) {
1217 item *it;
1218
1219 protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1220 char* key = binary_get_key(c);
1221 size_t nkey = c->binary_header.request.keylen;
1222 protocol_binary_request_touch *t = (void *)&c->binary_header;
1223 uint32_t exptime = ntohl(t->message.body.expiration);
1224
1225 if (settings.verbose > 1) {
1226 int ii;
1227 /* May be GAT/GATQ/etc */
1228 fprintf(stderr, "<%d TOUCH ", c->sfd);
1229 for (ii = 0; ii < nkey; ++ii) {
1230 fprintf(stderr, "%c", key[ii]);
1231 }
1232 fprintf(stderr, "\n");
1233 }
1234
1235 it = item_touch(key, nkey, realtime(exptime));
1236
1237 if (it) {
1238 /* the length has two unnecessary bytes ("\r\n") */
1239 uint16_t keylen = 0;
1240 uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
1241
1242 item_update(it);
1243 pthread_mutex_lock(&c->thread->stats.mutex);
1244 c->thread->stats.touch_cmds++;
1245 c->thread->stats.slab_stats[it->slabs_clsid].touch_hits++;
1246 pthread_mutex_unlock(&c->thread->stats.mutex);
1247
1248 MEMCACHED_COMMAND_TOUCH(c->sfd, ITEM_key(it), it->nkey,
1249 it->nbytes, ITEM_get_cas(it));
1250
1251 if (c->cmd == PROTOCOL_BINARY_CMD_TOUCH) {
1252 bodylen -= it->nbytes - 2;
1253 } else if (c->cmd == PROTOCOL_BINARY_CMD_GATK) {
1254 bodylen += nkey;
1255 keylen = nkey;
1256 }
1257
1258 add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
1259 rsp->message.header.response.cas = htonll(ITEM_get_cas(it));
1260
1261 // add the flags
1262 rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1263 add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1264
1265 if (c->cmd == PROTOCOL_BINARY_CMD_GATK) {
1266 add_iov(c, ITEM_key(it), nkey);
1267 }
1268
1269 /* Add the data minus the CRLF */
1270 if (c->cmd != PROTOCOL_BINARY_CMD_TOUCH) {
1271 add_iov(c, ITEM_data(it), it->nbytes - 2);
1272 }
1273
1274 conn_set_state(c, conn_mwrite);
1275 c->write_and_go = conn_new_cmd;
1276 /* Remember this command so we can garbage collect it later */
1277 c->item = it;
1278 } else {
1279 pthread_mutex_lock(&c->thread->stats.mutex);
1280 c->thread->stats.touch_cmds++;
1281 c->thread->stats.touch_misses++;
1282 pthread_mutex_unlock(&c->thread->stats.mutex);
1283
1284 MEMCACHED_COMMAND_TOUCH(c->sfd, key, nkey, -1, 0);
1285
1286 if (c->noreply) {
1287 conn_set_state(c, conn_new_cmd);
1288 } else {
1289 if (c->cmd == PROTOCOL_BINARY_CMD_GATK) {
1290 char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1291 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1292 0, nkey, nkey);
1293 memcpy(ofs, key, nkey);
1294 add_iov(c, ofs, nkey);
1295 conn_set_state(c, conn_mwrite);
1296 c->write_and_go = conn_new_cmd;
1297 } else {
1298 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1299 }
1300 }
1301 }
1302
1303 if (settings.detail_enabled) {
1304 stats_prefix_record_get(key, nkey, NULL != it);
1305 }
1306 }
1307
1308 static void process_bin_get(conn *c) {
1309 item *it;
1310
1311 protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1312 char* key = binary_get_key(c);
1313 size_t nkey = c->binary_header.request.keylen;
1314
1315 if (settings.verbose > 1) {
1316 int ii;
1317 fprintf(stderr, "<%d GET ", c->sfd);
1318 for (ii = 0; ii < nkey; ++ii) {
1319 fprintf(stderr, "%c", key[ii]);
1320 }
1321 fprintf(stderr, "\n");
1322 }
1323
1324 it = item_get(key, nkey);
1325 if (it) {
1326 /* the length has two unnecessary bytes ("\r\n") */
1327 uint16_t keylen = 0;
1328 uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
1329
1330 item_update(it);
1331 pthread_mutex_lock(&c->thread->stats.mutex);
1332 c->thread->stats.get_cmds++;
1333 c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
1334 pthread_mutex_unlock(&c->thread->stats.mutex);
1335
1336 MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
1337 it->nbytes, ITEM_get_cas(it));
1338
1339 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1340 bodylen += nkey;
1341 keylen = nkey;
1342 }
1343 add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
1344 rsp->message.header.response.cas = htonll(ITEM_get_cas(it));
1345
1346 // add the flags
1347 rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1348 add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1349
1350 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1351 add_iov(c, ITEM_key(it), nkey);
1352 }
1353
1354 /* Add the data minus the CRLF */
1355 add_iov(c, ITEM_data(it), it->nbytes - 2);
1356 conn_set_state(c, conn_mwrite);
1357 c->write_and_go = conn_new_cmd;
1358 /* Remember this command so we can garbage collect it later */
1359 c->item = it;
1360 } else {
1361 pthread_mutex_lock(&c->thread->stats.mutex);
1362 c->thread->stats.get_cmds++;
1363 c->thread->stats.get_misses++;
1364 pthread_mutex_unlock(&c->thread->stats.mutex);
1365
1366 MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1367
1368 if (c->noreply) {
1369 conn_set_state(c, conn_new_cmd);
1370 } else {
1371 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1372 char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1373 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1374 0, nkey, nkey);
1375 memcpy(ofs, key, nkey);
1376 add_iov(c, ofs, nkey);
1377 conn_set_state(c, conn_mwrite);
1378 c->write_and_go = conn_new_cmd;
1379 } else {
1380 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1381 }
1382 }
1383 }
1384
1385 if (settings.detail_enabled) {
1386 stats_prefix_record_get(key, nkey, NULL != it);
1387 }
1388 }
1389
1390 #ifndef __INTEL_COMPILER
1391 #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
1392 #endif
1393 static void append_bin_stats(const char *key, const uint16_t klen,
1394 const char *val, const uint32_t vlen,
1395 conn *c) {
1396 char *buf = c->stats.buffer + c->stats.offset;
1397 uint32_t bodylen = klen + vlen;
1398 protocol_binary_response_header header = {
1399 .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
1400 .response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_STAT,
1401 .response.keylen = (uint16_t)htons(klen),
1402 .response.extlen = (uint8_t)0,
1403 .response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
1404 .response.status = (uint16_t)0,
1405 .response.bodylen = htonl(bodylen),
1406 .response.opaque = c->opaque,
1407 .response.cas = (uint64_t)0
1408 };
1409
1410 memcpy(buf, header.bytes, sizeof(header.response));
1411 buf += sizeof(header.response);
1412
1413 if (klen > 0) {
1414 memcpy(buf, key, klen);
1415 buf += klen;
1416
1417 if (vlen > 0) {
1418 memcpy(buf, val, vlen);
1419 }
1420 }
1421
1422 c->stats.offset += sizeof(header.response) + bodylen;
1423 }
1424
1425 static void append_ascii_stats(const char *key, const uint16_t klen,
1426 const char *val, const uint32_t vlen,
1427 conn *c) {
1428 char *pos = c->stats.buffer + c->stats.offset;
1429 uint32_t nbytes = 0;
1430 int remaining = c->stats.size - c->stats.offset;
1431 int room = remaining - 1;
1432
1433 if (klen == 0 && vlen == 0) {
1434 nbytes = snprintf(pos, room, "END\r\n");
1435 } else if (vlen == 0) {
1436 nbytes = snprintf(pos, room, "STAT %s\r\n", key);
1437 } else {
1438 nbytes = snprintf(pos, room, "STAT %s %s\r\n", key, val);
1439 }
1440
1441 c->stats.offset += nbytes;
1442 }
1443
1444 static bool grow_stats_buf(conn *c, size_t needed) {
1445 size_t nsize = c->stats.size;
1446 size_t available = nsize - c->stats.offset;
1447 bool rv = true;
1448
1449 /* Special case: No buffer -- need to allocate fresh */
1450 if (c->stats.buffer == NULL) {
1451 nsize = 1024;
1452 available = c->stats.size = c->stats.offset = 0;
1453 }
1454
1455 while (needed > available) {
1456 assert(nsize > 0);
1457 nsize = nsize << 1;
1458 available = nsize - c->stats.offset;
1459 }
1460
1461 if (nsize != c->stats.size) {
1462 char *ptr = realloc(c->stats.buffer, nsize);
1463 if (ptr) {
1464 c->stats.buffer = ptr;
1465 c->stats.size = nsize;
1466 } else {
1467 rv = false;
1468 }
1469 }
1470
1471 return rv;
1472 }
1473
1474 static void append_stats(const char *key, const uint16_t klen,
1475 const char *val, const uint32_t vlen,
1476 const void *cookie)
1477 {
1478 /* value without a key is invalid */
1479 if (klen == 0 && vlen > 0) {
1480 return ;
1481 }
1482
1483 conn *c = (conn*)cookie;
1484
1485 if (c->protocol == binary_prot) {
1486 size_t needed = vlen + klen + sizeof(protocol_binary_response_header);
1487 if (!grow_stats_buf(c, needed)) {
1488 return ;
1489 }
1490 append_bin_stats(key, klen, val, vlen, c);
1491 } else {
1492 size_t needed = vlen + klen + 10; // 10 == "STAT = \r\n"
1493 if (!grow_stats_buf(c, needed)) {
1494 return ;
1495 }
1496 append_ascii_stats(key, klen, val, vlen, c);
1497 }
1498
1499 assert(c->stats.offset <= c->stats.size);
1500 }
1501
1502 static void process_bin_stat(conn *c) {
1503 char *subcommand = binary_get_key(c);
1504 size_t nkey = c->binary_header.request.keylen;
1505
1506 if (settings.verbose > 1) {
1507 int ii;
1508 fprintf(stderr, "<%d STATS ", c->sfd);
1509 for (ii = 0; ii < nkey; ++ii) {
1510 fprintf(stderr, "%c", subcommand[ii]);
1511 }
1512 fprintf(stderr, "\n");
1513 }
1514
1515 if (nkey == 0) {
1516 /* request all statistics */
1517 server_stats(&append_stats, c);
1518 (void)get_stats(NULL, 0, &append_stats, c);
1519 } else if (strncmp(subcommand, "reset", 5) == 0) {
1520 stats_reset();
1521 } else if (strncmp(subcommand, "settings", 8) == 0) {
1522 process_stat_settings(&append_stats, c);
1523 } else if (strncmp(subcommand, "detail", 6) == 0) {
1524 char *subcmd_pos = subcommand + 6;
1525 if (strncmp(subcmd_pos, " dump", 5) == 0) {
1526 int len;
1527 char *dump_buf = stats_prefix_dump(&len);
1528 if (dump_buf == NULL || len <= 0) {
1529 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1530 return ;
1531 } else {
1532 append_stats("detailed", strlen("detailed"), dump_buf, len, c);
1533 free(dump_buf);
1534 }
1535 } else if (strncmp(subcmd_pos, " on", 3) == 0) {
1536 settings.detail_enabled = 1;
1537 } else if (strncmp(subcmd_pos, " off", 4) == 0) {
1538 settings.detail_enabled = 0;
1539 } else {
1540 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1541 return;
1542 }
1543 } else {
1544 if (get_stats(subcommand, nkey, &append_stats, c)) {
1545 if (c->stats.buffer == NULL) {
1546 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1547 } else {
1548 write_and_free(c, c->stats.buffer, c->stats.offset);
1549 c->stats.buffer = NULL;
1550 }
1551 } else {
1552 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1553 }
1554
1555 return;
1556 }
1557
1558 /* Append termination package and start the transfer */
1559 append_stats(NULL, 0, NULL, 0, c);
1560 if (c->stats.buffer == NULL) {
1561 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1562 } else {
1563 write_and_free(c, c->stats.buffer, c->stats.offset);
1564 c->stats.buffer = NULL;
1565 }
1566 }
1567
1568 static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1569 assert(c);
1570 c->substate = next_substate;
1571 c->rlbytes = c->keylen + extra;
1572
1573 /* Ok... do we have room for the extras and the key in the input buffer? */
1574 ptrdiff_t offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
1575 if (c->rlbytes > c->rsize - offset) {
1576 size_t nsize = c->rsize;
1577 size_t size = c->rlbytes + sizeof(protocol_binary_request_header);
1578
1579 while (size > nsize) {
1580 nsize *= 2;
1581 }
1582
1583 if (nsize != c->rsize) {
1584 if (settings.verbose > 1) {
1585 fprintf(stderr, "%d: Need to grow buffer from %lu to %lu\n",
1586 c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
1587 }
1588 char *newm = realloc(c->rbuf, nsize);
1589 if (newm == NULL) {
1590 if (settings.verbose) {
1591 fprintf(stderr, "%d: Failed to grow buffer.. closing connection\n",
1592 c->sfd);
1593 }
1594 conn_set_state(c, conn_closing);
1595 return;
1596 }
1597
1598 c->rbuf= newm;
1599 /* rcurr should point to the same offset in the packet */
1600 c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
1601 c->rsize = nsize;
1602 }
1603 if (c->rbuf != c->rcurr) {
1604 memmove(c->rbuf, c->rcurr, c->rbytes);
1605 c->rcurr = c->rbuf;
1606 if (settings.verbose > 1) {
1607 fprintf(stderr, "%d: Repack input buffer\n", c->sfd);
1608 }
1609 }
1610 }
1611
1612 /* preserve the header in the buffer.. */
1613 c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
1614 conn_set_state(c, conn_nread);
1615 }
1616
1617 /* Just write an error message and disconnect the client */
1618 static void handle_binary_protocol_error(conn *c) {
1619 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
1620 if (settings.verbose) {
1621 fprintf(stderr, "Protocol error (opcode %02x), close connection %d\n",
1622 c->binary_header.request.opcode, c->sfd);
1623 }
1624 c->write_and_go = conn_closing;
1625 }
1626
1627 static void init_sasl_conn(conn *c) {
1628 assert(c);
1629 /* should something else be returned? */
1630 if (!settings.sasl)
1631 return;
1632
1633 if (!c->sasl_conn) {
1634 int result=sasl_server_new("memcached",
1635 NULL,
1636 my_sasl_hostname[0] ? my_sasl_hostname : NULL,
1637 NULL, NULL,
1638 NULL, 0, &c->sasl_conn);
1639 if (result != SASL_OK) {
1640 if (settings.verbose) {
1641 fprintf(stderr, "Failed to initialize SASL conn.\n");
1642 }
1643 c->sasl_conn = NULL;
1644 }
1645 }
1646 }
1647
1648 static void bin_list_sasl_mechs(conn *c) {
1649 // Guard against a disabled SASL.
1650 if (!settings.sasl) {
1651 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND,
1652 c->binary_header.request.bodylen
1653 - c->binary_header.request.keylen);
1654 return;
1655 }
1656
1657 init_sasl_conn(c);
1658 const char *result_string = NULL;
1659 unsigned int string_length = 0;
1660 int result=sasl_listmech(c->sasl_conn, NULL,
1661 "", /* What to prepend the string with */
1662 " ", /* What to separate mechanisms with */
1663 "", /* What to append to the string */
1664 &result_string, &string_length,
1665 NULL);
1666 if (result != SASL_OK) {
1667 /* Perhaps there's a better error for this... */
1668 if (settings.verbose) {
1669 fprintf(stderr, "Failed to list SASL mechanisms.\n");
1670 }
1671 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
1672 return;
1673 }
1674 write_bin_response(c, (char*)result_string, 0, 0, string_length);
1675 }
1676
1677 static void process_bin_sasl_auth(conn *c) {
1678 // Guard for handling disabled SASL on the server.
1679 if (!settings.sasl) {
1680 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND,
1681 c->binary_header.request.bodylen
1682 - c->binary_header.request.keylen);
1683 return;
1684 }
1685
1686 assert(c->binary_header.request.extlen == 0);
1687
1688 int nkey = c->binary_header.request.keylen;
1689 int vlen = c->binary_header.request.bodylen - nkey;
1690
1691 if (nkey > MAX_SASL_MECH_LEN) {
1692 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
1693 c->write_and_go = conn_swallow;
1694 return;
1695 }
1696
1697 char *key = binary_get_key(c);
1698 assert(key);
1699
1700 item *it = item_alloc(key, nkey, 0, 0, vlen);
1701
1702 if (it == 0) {
1703 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
1704 c->write_and_go = conn_swallow;
1705 return;
1706 }
1707
1708 c->item = it;
1709 c->ritem = ITEM_data(it);
1710 c->rlbytes = vlen;
1711 conn_set_state(c, conn_nread);
1712 c->substate = bin_reading_sasl_auth_data;
1713 }
1714
1715 static void process_bin_complete_sasl_auth(conn *c) {
1716 assert(settings.sasl);
1717 const char *out = NULL;
1718 unsigned int outlen = 0;
1719
1720 assert(c->item);
1721 init_sasl_conn(c);
1722
1723 int nkey = c->binary_header.request.keylen;
1724 int vlen = c->binary_header.request.bodylen - nkey;
1725
1726 char mech[nkey+1];
1727 memcpy(mech, ITEM_key((item*)c->item), nkey);
1728 mech[nkey] = 0x00;
1729
1730 if (settings.verbose)
1731 fprintf(stderr, "mech: ``%s'' with %d bytes of data\n", mech, vlen);
1732
1733 const char *challenge = vlen == 0 ? NULL : ITEM_data((item*) c->item);
1734
1735 int result=-1;
1736
1737 switch (c->cmd) {
1738 case PROTOCOL_BINARY_CMD_SASL_AUTH:
1739 result = sasl_server_start(c->sasl_conn, mech,
1740 challenge, vlen,
1741 &out, &outlen);
1742 break;
1743 case PROTOCOL_BINARY_CMD_SASL_STEP:
1744 result = sasl_server_step(c->sasl_conn,
1745 challenge, vlen,
1746 &out, &outlen);
1747 break;
1748 default:
1749 assert(false); /* CMD should be one of the above */
1750 /* This code is pretty much impossible, but makes the compiler
1751 happier */
1752 if (settings.verbose) {
1753 fprintf(stderr, "Unhandled command %d with challenge %s\n",
1754 c->cmd, challenge);
1755 }
1756 break;
1757 }
1758
1759 item_unlink(c->item);
1760
1761 if (settings.verbose) {
1762 fprintf(stderr, "sasl result code: %d\n", result);
1763 }
1764
1765 switch(result) {
1766 case SASL_OK:
1767 write_bin_response(c, "Authenticated", 0, 0, strlen("Authenticated"));
1768 pthread_mutex_lock(&c->thread->stats.mutex);
1769 c->thread->stats.auth_cmds++;
1770 pthread_mutex_unlock(&c->thread->stats.mutex);
1771 break;
1772 case SASL_CONTINUE:
1773 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0, outlen);
1774 if(outlen > 0) {
1775 add_iov(c, out, outlen);
1776 }
1777 conn_set_state(c, conn_mwrite);
1778 c->write_and_go = conn_new_cmd;
1779 break;
1780 default:
1781 if (settings.verbose)
1782 fprintf(stderr, "Unknown sasl response: %d\n", result);
1783 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
1784 pthread_mutex_lock(&c->thread->stats.mutex);
1785 c->thread->stats.auth_cmds++;
1786 c->thread->stats.auth_errors++;
1787 pthread_mutex_unlock(&c->thread->stats.mutex);
1788 }
1789 }
1790
1791 static bool authenticated(conn *c) {
1792 assert(settings.sasl);
1793 bool rv = false;
1794
1795 switch (c->cmd) {
1796 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: /* FALLTHROUGH */
1797 case PROTOCOL_BINARY_CMD_SASL_AUTH: /* FALLTHROUGH */
1798 case PROTOCOL_BINARY_CMD_SASL_STEP: /* FALLTHROUGH */
1799 case PROTOCOL_BINARY_CMD_VERSION: /* FALLTHROUGH */
1800 rv = true;
1801 break;
1802 default:
1803 if (c->sasl_conn) {
1804 const void *uname = NULL;
1805 sasl_getprop(c->sasl_conn, SASL_USERNAME, &uname);
1806 rv = uname != NULL;
1807 }
1808 }
1809
1810 if (settings.verbose > 1) {
1811 fprintf(stderr, "authenticated() in cmd 0x%02x is %s\n",
1812 c->cmd, rv ? "true" : "false");
1813 }
1814
1815 return rv;
1816 }
1817
1818 static void dispatch_bin_command(conn *c) {
1819 int protocol_error = 0;
1820
1821 int extlen = c->binary_header.request.extlen;
1822 int keylen = c->binary_header.request.keylen;
1823 uint32_t bodylen = c->binary_header.request.bodylen;
1824
1825 if (settings.sasl && !authenticated(c)) {
1826 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
1827 c->write_and_go = conn_closing;
1828 return;
1829 }
1830
1831 MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
1832 c->noreply = true;
1833
1834 /* binprot supports 16bit keys, but internals are still 8bit */
1835 if (keylen > KEY_MAX_LENGTH) {
1836 handle_binary_protocol_error(c);
1837 return;
1838 }
1839
1840 switch (c->cmd) {
1841 case PROTOCOL_BINARY_CMD_SETQ:
1842 c->cmd = PROTOCOL_BINARY_CMD_SET;
1843 break;
1844 case PROTOCOL_BINARY_CMD_ADDQ:
1845 c->cmd = PROTOCOL_BINARY_CMD_ADD;
1846 break;
1847 case PROTOCOL_BINARY_CMD_REPLACEQ:
1848 c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
1849 break;
1850 case PROTOCOL_BINARY_CMD_DELETEQ:
1851 c->cmd = PROTOCOL_BINARY_CMD_DELETE;
1852 break;
1853 case PROTOCOL_BINARY_CMD_INCREMENTQ:
1854 c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
1855 break;
1856 case PROTOCOL_BINARY_CMD_DECREMENTQ:
1857 c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
1858 break;
1859 case PROTOCOL_BINARY_CMD_QUITQ:
1860 c->cmd = PROTOCOL_BINARY_CMD_QUIT;
1861 break;
1862 case PROTOCOL_BINARY_CMD_FLUSHQ:
1863 c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
1864 break;
1865 case PROTOCOL_BINARY_CMD_APPENDQ:
1866 c->cmd = PROTOCOL_BINARY_CMD_APPEND;
1867 break;
1868 case PROTOCOL_BINARY_CMD_PREPENDQ:
1869 c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
1870 break;
1871 case PROTOCOL_BINARY_CMD_GETQ:
1872 c->cmd = PROTOCOL_BINARY_CMD_GET;
1873 break;
1874 case PROTOCOL_BINARY_CMD_GETKQ:
1875 c->cmd = PROTOCOL_BINARY_CMD_GETK;
1876 break;
1877 case PROTOCOL_BINARY_CMD_GATQ:
1878 c->cmd = PROTOCOL_BINARY_CMD_GAT;
1879 break;
1880 case PROTOCOL_BINARY_CMD_GATKQ:
1881 c->cmd = PROTOCOL_BINARY_CMD_GAT;
1882 break;
1883 default:
1884 c->noreply = false;
1885 }
1886
1887 switch (c->cmd) {
1888 case PROTOCOL_BINARY_CMD_VERSION:
1889 if (extlen == 0 && keylen == 0 && bodylen == 0) {
1890 write_bin_response(c, RVERSION, 0, 0, strlen(RVERSION));
1891 } else {
1892 protocol_error = 1;
1893 }
1894 break;
1895 case PROTOCOL_BINARY_CMD_FLUSH:
1896 if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
1897 bin_read_key(c, bin_read_flush_exptime, extlen);
1898 } else {
1899 protocol_error = 1;
1900 }
1901 break;
1902 case PROTOCOL_BINARY_CMD_NOOP:
1903 if (extlen == 0 && keylen == 0 && bodylen == 0) {
1904 write_bin_response(c, NULL, 0, 0, 0);
1905 } else {
1906 protocol_error = 1;
1907 }
1908 break;
1909 case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
1910 case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
1911 case PROTOCOL_BINARY_CMD_REPLACE:
1912 if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
1913 bin_read_key(c, bin_reading_set_header, 8);
1914 } else {
1915 protocol_error = 1;
1916 }
1917 break;
1918 case PROTOCOL_BINARY_CMD_GETQ: /* FALLTHROUGH */
1919 case PROTOCOL_BINARY_CMD_GET: /* FALLTHROUGH */
1920 case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
1921 case PROTOCOL_BINARY_CMD_GETK:
1922 if (extlen == 0 && bodylen == keylen && keylen > 0) {
1923 bin_read_key(c, bin_reading_get_key, 0);
1924 } else {
1925 protocol_error = 1;
1926 }
1927 break;
1928 case PROTOCOL_BINARY_CMD_DELETE:
1929 if (keylen > 0 && extlen == 0 && bodylen == keylen) {
1930 bin_read_key(c, bin_reading_del_header, extlen);
1931 } else {
1932 protocol_error = 1;
1933 }
1934 break;
1935 case PROTOCOL_BINARY_CMD_INCREMENT:
1936 case PROTOCOL_BINARY_CMD_DECREMENT:
1937 if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
1938 bin_read_key(c, bin_reading_incr_header, 20);
1939 } else {
1940 protocol_error = 1;
1941 }
1942 break;
1943 case PROTOCOL_BINARY_CMD_APPEND:
1944 case PROTOCOL_BINARY_CMD_PREPEND:
1945 if (keylen > 0 && extlen == 0) {
1946 bin_read_key(c, bin_reading_set_header, 0);
1947 } else {
1948 protocol_error = 1;
1949 }
1950 break;
1951 case PROTOCOL_BINARY_CMD_STAT:
1952 if (extlen == 0) {
1953 bin_read_key(c, bin_reading_stat, 0);
1954 } else {
1955 protocol_error = 1;
1956 }
1957 break;
1958 case PROTOCOL_BINARY_CMD_QUIT:
1959 if (keylen == 0 && extlen == 0 && bodylen == 0) {
1960 write_bin_response(c, NULL, 0, 0, 0);
1961 c->write_and_go = conn_closing;
1962 if (c->noreply) {
1963 conn_set_state(c, conn_closing);
1964 }
1965 } else {
1966 protocol_error = 1;
1967 }
1968 break;
1969 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
1970 if (extlen == 0 && keylen == 0 && bodylen == 0) {
1971 bin_list_sasl_mechs(c);
1972 } else {
1973 protocol_error = 1;
1974 }
1975 break;
1976 case PROTOCOL_BINARY_CMD_SASL_AUTH:
1977 case PROTOCOL_BINARY_CMD_SASL_STEP:
1978 if (extlen == 0 && keylen != 0) {
1979 bin_read_key(c, bin_reading_sasl_auth, 0);
1980 } else {
1981 protocol_error = 1;
1982 }
1983 break;
1984 case PROTOCOL_BINARY_CMD_TOUCH:
1985 case PROTOCOL_BINARY_CMD_GAT:
1986 case PROTOCOL_BINARY_CMD_GATQ:
1987 case PROTOCOL_BINARY_CMD_GATK:
1988 case PROTOCOL_BINARY_CMD_GATKQ:
1989 if (extlen == 4 && keylen != 0) {
1990 bin_read_key(c, bin_reading_touch_key, 4);
1991 } else {
1992 protocol_error = 1;
1993 }
1994 break;
1995 default:
1996 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, bodylen);
1997 }
1998
1999 if (protocol_error)
2000 handle_binary_protocol_error(c);
2001 }
2002
2003 static void process_bin_update(conn *c) {
2004 char *key;
2005 int nkey;
2006 int vlen;
2007 item *it;
2008 protocol_binary_request_set* req = binary_get_request(c);
2009
2010 assert(c != NULL);
2011
2012 key = binary_get_key(c);
2013 nkey = c->binary_header.request.keylen;
2014
2015 /* fix byteorder in the request */
2016 req->message.body.flags = ntohl(req->message.body.flags);
2017 req->message.body.expiration = ntohl(req->message.body.expiration);
2018
2019 vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);
2020
2021 if (settings.verbose > 1) {
2022 int ii;
2023 if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
2024 fprintf(stderr, "<%d ADD ", c->sfd);
2025 } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
2026 fprintf(stderr, "<%d SET ", c->sfd);
2027 } else {
2028 fprintf(stderr, "<%d REPLACE ", c->sfd);
2029 }
2030 for (ii = 0; ii < nkey; ++ii) {
2031 fprintf(stderr, "%c", key[ii]);
2032 }
2033
2034 fprintf(stderr, " Value len is %d", vlen);
2035 fprintf(stderr, "\n");
2036 }
2037
2038 if (settings.detail_enabled) {
2039 stats_prefix_record_set(key, nkey);
2040 }
2041
2042 it = item_alloc(key, nkey, req->message.body.flags,
2043 realtime(req->message.body.expiration), vlen+2);
2044
2045 if (it == 0) {
2046 if (! item_size_ok(nkey, req->message.body.flags, vlen + 2)) {
2047 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
2048 } else {
2049 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
2050 }
2051
2052 /* Avoid stale data persisting in cache because we failed alloc.
2053 * Unacceptable for SET. Anywhere else too? */
2054 if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
2055 it = item_get(key, nkey);
2056 if (it) {
2057 item_unlink(it);
2058 item_remove(it);
2059 }
2060 }
2061
2062 /* swallow the data line */
2063 c->write_and_go = conn_swallow;
2064 return;
2065 }
2066
2067 ITEM_set_cas(it, c->binary_header.request.cas);
2068
2069 switch (c->cmd) {
2070 case PROTOCOL_BINARY_CMD_ADD:
2071 c->cmd = NREAD_ADD;
2072 break;
2073 case PROTOCOL_BINARY_CMD_SET:
2074 c->cmd = NREAD_SET;
2075 break;
2076 case PROTOCOL_BINARY_CMD_REPLACE:
2077 c->cmd = NREAD_REPLACE;
2078 break;
2079 default:
2080 assert(0);
2081 }
2082
2083 if (ITEM_get_cas(it) != 0) {
2084 c->cmd = NREAD_CAS;
2085 }
2086
2087 c->item = it;
2088 c->ritem = ITEM_data(it);
2089 c->rlbytes = vlen;
2090 conn_set_state(c, conn_nread);
2091 c->substate = bin_read_set_value;
2092 }
2093
2094 static void process_bin_append_prepend(conn *c) {
2095 char *key;
2096 int nkey;
2097 int vlen;
2098 item *it;
2099
2100 assert(c != NULL);
2101
2102 key = binary_get_key(c);
2103 nkey = c->binary_header.request.keylen;
2104 vlen = c->binary_header.request.bodylen - nkey;
2105
2106 if (settings.verbose > 1) {
2107 fprintf(stderr, "Value len is %d\n", vlen);
2108 }
2109
2110 if (settings.detail_enabled) {
2111 stats_prefix_record_set(key, nkey);
2112 }
2113
2114 it = item_alloc(key, nkey, 0, 0, vlen+2);
2115
2116 if (it == 0) {
2117 if (! item_size_ok(nkey, 0, vlen + 2)) {
2118 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
2119 } else {
2120 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
2121 }
2122 /* swallow the data line */
2123 c->write_and_go = conn_swallow;
2124 return;
2125 }
2126
2127 ITEM_set_cas(it, c->binary_header.request.cas);
2128
2129 switch (c->cmd) {
2130 case PROTOCOL_BINARY_CMD_APPEND:
2131 c->cmd = NREAD_APPEND;
2132 break;
2133 case PROTOCOL_BINARY_CMD_PREPEND:
2134 c->cmd = NREAD_PREPEND;
2135 break;
2136 default:
2137 assert(0);
2138 }
2139
2140 c->item = it;
2141 c->ritem = ITEM_data(it);
2142 c->rlbytes = vlen;
2143 conn_set_state(c, conn_nread);
2144 c->substate = bin_read_set_value;
2145 }
2146
2147 static void process_bin_flush(conn *c) {
2148 time_t exptime = 0;
2149 protocol_binary_request_flush* req = binary_get_request(c);
2150
2151 if (c->binary_header.request.extlen == sizeof(req->message.body)) {
2152 exptime = ntohl(req->message.body.expiration);
2153 }
2154
2155 if (exptime > 0) {
2156 settings.oldest_live = realtime(exptime) - 1;
2157 } else {
2158 settings.oldest_live = current_time - 1;
2159 }
2160 item_flush_expired();
2161
2162 pthread_mutex_lock(&c->thread->stats.mutex);
2163 c->thread->stats.flush_cmds++;
2164 pthread_mutex_unlock(&c->thread->stats.mutex);
2165
2166 write_bin_response(c, NULL, 0, 0, 0);
2167 }
2168
2169 static void process_bin_delete(conn *c) {
2170 item *it;
2171
2172 protocol_binary_request_delete* req = binary_get_request(c);
2173
2174 char* key = binary_get_key(c);
2175 size_t nkey = c->binary_header.request.keylen;
2176
2177 assert(c != NULL);
2178
2179 if (settings.verbose > 1) {
2180 fprintf(stderr, "Deleting %s\n", key);
2181 }
2182
2183 if (settings.detail_enabled) {
2184 stats_prefix_record_delete(key, nkey);
2185 }
2186
2187 it = item_get(key, nkey);
2188 if (it) {
2189 uint64_t cas = ntohll(req->message.header.request.cas);
2190 if (cas == 0 || cas == ITEM_get_cas(it)) {
2191 MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
2192 pthread_mutex_lock(&c->thread->stats.mutex);
2193 c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
2194 pthread_mutex_unlock(&c->thread->stats.mutex);
2195 item_unlink(it);
2196 write_bin_response(c, NULL, 0, 0, 0);
2197 } else {
2198 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
2199 }
2200 item_remove(it); /* release our reference */
2201 } else {
2202 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
2203 pthread_mutex_lock(&c->thread->stats.mutex);
2204 c->thread->stats.delete_misses++;
2205 pthread_mutex_unlock(&c->thread->stats.mutex);
2206 }
2207 }
2208
2209 static void complete_nread_binary(conn *c) {
2210 assert(c != NULL);
2211 assert(c->cmd >= 0);
2212
2213 switch(c->substate) {
2214 case bin_reading_set_header:
2215 if (c->cmd == PROTOCOL_BINARY_CMD_APPEND ||
2216 c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
2217 process_bin_append_prepend(c);
2218 } else {
2219 process_bin_update(c);
2220 }
2221 break;
2222 case bin_read_set_value:
2223 complete_update_bin(c);
2224 break;
2225 case bin_reading_get_key:
2226 process_bin_get(c);
2227 break;
2228 case bin_reading_touch_key:
2229 process_bin_touch(c);
2230 break;
2231 case bin_reading_stat:
2232 process_bin_stat(c);
2233 break;
2234 case bin_reading_del_header:
2235 process_bin_delete(c);
2236 break;
2237 case bin_reading_incr_header:
2238 complete_incr_bin(c);
2239 break;
2240 case bin_read_flush_exptime:
2241 process_bin_flush(c);
2242 break;
2243 case bin_reading_sasl_auth:
2244 process_bin_sasl_auth(c);
2245 break;
2246 case bin_reading_sasl_auth_data:
2247 process_bin_complete_sasl_auth(c);
2248 break;
2249 case bin_reading_cas_header:
2250 assert(0);
2251 case bin_no_state:
2252 assert(0);
2253 default:
2254 fprintf(stderr, "Not handling substate %d\n", c->substate);
2255 assert(0);
2256 }
2257 }
2258
2259 static void reset_cmd_handler(conn *c) {
2260 c->cmd = -1;
2261 c->substate = bin_no_state;
2262 if(c->item != NULL) {
2263 item_remove(c->item);
2264 c->item = NULL;
2265 }
2266 conn_shrink(c);
2267 if (c->rbytes > 0) {
2268 conn_set_state(c, conn_parse_cmd);
2269 } else {
2270 conn_set_state(c, conn_waiting);
2271 }
2272 }
2273
2274 static void complete_nread(conn *c) {
2275 assert(c != NULL);
2276 assert(c->protocol == ascii_prot
2277 || c->protocol == binary_prot);
2278
2279 if (c->protocol == ascii_prot) {
2280 complete_nread_ascii(c);
2281 } else if (c->protocol == binary_prot) {
2282 complete_nread_binary(c);
2283 }
2284 }
2285
2286 /*
2287 * Stores an item in the cache according to the semantics of one of the set
2288 * commands. In threaded mode, this is protected by the cache lock.
2289 *
2290 * Returns the state of storage.
2291 */
2292 enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
2293 char *key = ITEM_key(it);
2294 item *old_it = do_item_get(key, it->nkey, hv);
2295 enum store_item_type stored = NOT_STORED;
2296
2297 item *new_it = NULL;
2298 int flags;
2299
2300 if (old_it != NULL && comm == NREAD_ADD) {
2301 /* add only adds a nonexistent item, but promote to head of LRU */
2302 do_item_update(old_it);
2303 } else if (!old_it && (comm == NREAD_REPLACE
2304 || comm == NREAD_APPEND || comm == NREAD_PREPEND))
2305 {
2306 /* replace only replaces an existing value; don't store */
2307 } else if (comm == NREAD_CAS) {
2308 /* validate cas operation */
2309 if(old_it == NULL) {
2310 // LRU expired
2311 stored = NOT_FOUND;
2312 pthread_mutex_lock(&c->thread->stats.mutex);
2313 c->thread->stats.cas_misses++;
2314 pthread_mutex_unlock(&c->thread->stats.mutex);
2315 }
2316 else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
2317 // cas validates
2318 // it and old_it may belong to different classes.
2319 // I'm updating the stats for the one that's getting pushed out
2320 pthread_mutex_lock(&c->thread->stats.mutex);
2321 c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
2322 pthread_mutex_unlock(&c->thread->stats.mutex);
2323
2324 item_replace(old_it, it, hv);
2325 stored = STORED;
2326 } else {
2327 pthread_mutex_lock(&c->thread->stats.mutex);
2328 c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
2329 pthread_mutex_unlock(&c->thread->stats.mutex);
2330
2331 if(settings.verbose > 1) {
2332 fprintf(stderr, "CAS: failure: expected %llu, got %llu\n",
2333 (unsigned long long)ITEM_get_cas(old_it),
2334 (unsigned long long)ITEM_get_cas(it));
2335 }
2336 stored = EXISTS;
2337 }
2338 } else {
2339 /*
2340 * Append - combine new and old record into single one. Here it's
2341 * atomic and thread-safe.
2342 */
2343 if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
2344 /*
2345 * Validate CAS
2346 */
2347 if (ITEM_get_cas(it) != 0) {
2348 // CAS much be equal
2349 if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
2350 stored = EXISTS;
2351 }
2352 }
2353
2354 if (stored == NOT_STORED) {
2355 /* we have it and old_it here - alloc memory to hold both */
2356 /* flags was already lost - so recover them from ITEM_suffix(it) */
2357
2358 flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
2359
2360 new_it = item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
2361
2362 if (new_it == NULL) {
2363 /* SERVER_ERROR out of memory */
2364 if (old_it != NULL)
2365 do_item_remove(old_it);
2366
2367 return NOT_STORED;
2368 }
2369
2370 /* copy data from it and old_it to new_it */
2371
2372 if (comm == NREAD_APPEND) {
2373 memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
2374 memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
2375 } else {
2376 /* NREAD_PREPEND */
2377 memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
2378 memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
2379 }
2380
2381 it = new_it;
2382 }
2383 }
2384
2385 if (stored == NOT_STORED) {
2386 if (old_it != NULL)
2387 item_replace(old_it, it, hv);
2388 else
2389 do_item_link(it, hv);
2390
2391 c->cas = ITEM_get_cas(it);
2392
2393 stored = STORED;
2394 }
2395 }
2396
2397 if (old_it != NULL)
2398 do_item_remove(old_it); /* release our reference */
2399 if (new_it != NULL)
2400 do_item_remove(new_it);
2401
2402 if (stored == STORED) {
2403 c->cas = ITEM_get_cas(it);
2404 }
2405
2406 return stored;
2407 }
2408
/* One token of a command line: a pointer into the (mutated) command buffer
   and its length. A zero length marks the terminal token. */
typedef struct token_s {
    char *value;
    size_t length;
} token_t;

#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1

#define MAX_TOKENS 8

/*
 * Tokenize the command string by replacing each separating space with '\0'
 * and recording, in tokens, a pointer to the start of each token and its
 * length. Only ' ' is treated as a separator.
 *
 * Returns the total number of entries written, including the terminal
 * token (length zero). Its value is NULL when the whole string was
 * consumed, or points at the first unprocessed character when max_tokens
 * was reached first. At most max_tokens entries are written.
 *
 * Usage example (this is how a long key list is walked in chunks):
 *
 *   ntokens = tokenize_command(command, tokens, MAX_TOKENS);
 *   for (;;) {
 *       for (ix = 0; tokens[ix].length != 0; ix++) {
 *           ... use tokens[ix] ...
 *       }
 *       if (tokens[ix].value == NULL)
 *           break;                        // whole string consumed
 *       ntokens = tokenize_command(tokens[ix].value, tokens, MAX_TOKENS);
 *   }
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *s, *e;
    size_t ntokens = 0;
    size_t len;
    size_t i;

    /* Validate before touching command: strlen(NULL) is undefined. */
    assert(command != NULL && tokens != NULL && max_tokens > 1);

    len = strlen(command);
    s = e = command;
    for (i = 0; i < len; i++) {
        if (*e == ' ') {
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
                *e = '\0';
                if (ntokens == max_tokens - 1) {
                    e++;
                    s = e; /* so we don't add an extra token */
                    break;
                }
            }
            s = e + 1;
        }
        e++;
    }

    if (s != e) {
        tokens[ntokens].value = s;
        tokens[ntokens].length = e - s;
        ntokens++;
    }

    /*
     * If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    tokens[ntokens].value = *e == '\0' ? NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;

    return ntokens;
}
2480
2481 /* set up a connection to write a buffer then free it, used for stats */
2482 static void write_and_free(conn *c, char *buf, int bytes) {
2483 if (buf) {
2484 c->write_and_free = buf;
2485 c->wcurr = buf;
2486 c->wbytes = bytes;
2487 conn_set_state(c, conn_write);
2488 c->write_and_go = conn_new_cmd;
2489 } else {
2490 out_string(c, "SERVER_ERROR out of memory writing stats");
2491 }
2492 }
2493
2494 static inline bool set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
2495 {
2496 int noreply_index = ntokens - 2;
2497
2498 /*
2499 NOTE: this function is not the first place where we are going to
2500 send the reply. We could send it instead from process_command()
2501 if the request line has wrong number of tokens. However parsing
2502 malformed line for "noreply" option is not reliable anyway, so
2503 it can't be helped.
2504 */
2505 if (tokens[noreply_index].value
2506 && strcmp(tokens[noreply_index].value, "noreply") == 0) {
2507 c->noreply = true;
2508 }
2509 return c->noreply;
2510 }
2511
2512 void append_stat(const char *name, ADD_STAT add_stats, conn *c,
2513 const char *fmt, ...) {
2514 char val_str[STAT_VAL_LEN];
2515 int vlen;
2516 va_list ap;
2517
2518 assert(name);
2519 assert(add_stats);
2520 assert(c);
2521 assert(fmt);
2522
2523 va_start(ap, fmt);
2524 vlen = vsnprintf(val_str, sizeof(val_str) - 1, fmt, ap);
2525 va_end(ap);
2526
2527 add_stats(name, strlen(name), val_str, vlen, c);
2528 }
2529
2530 inline static void process_stats_detail(conn *c, const char *command) {
2531 assert(c != NULL);
2532
2533 if (strcmp(command, "on") == 0) {
2534 settings.detail_enabled = 1;
2535 out_string(c, "OK");
2536 }
2537 else if (strcmp(command, "off") == 0) {
2538 settings.detail_enabled = 0;
2539 out_string(c, "OK");
2540 }
2541 else if (strcmp(command, "dump") == 0) {
2542 int len;
2543 char *stats = stats_prefix_dump(&len);
2544 write_and_free(c, stats, len);
2545 }
2546 else {
2547 out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
2548 }
2549 }
2550
2551 /* return server specific stats only */
2552 static void server_stats(ADD_STAT add_stats, conn *c) {
2553 pid_t pid = getpid();
2554 rel_time_t now = current_time;
2555
2556 struct thread_stats thread_stats;
2557 threadlocal_stats_aggregate(&thread_stats);
2558 struct slab_stats slab_stats;
2559 slab_stats_aggregate(&thread_stats, &slab_stats);
2560
2561 #ifndef WIN32
2562 struct rusage usage;
2563 getrusage(RUSAGE_SELF, &usage);
2564 #endif /* !WIN32 */
2565
2566 STATS_LOCK();
2567
2568 APPEND_STAT("pid", "%lu", (long)pid);
2569 APPEND_STAT("uptime", "%u", now);
2570 APPEND_STAT("time", "%ld", now + (long)process_started);
2571 APPEND_STAT("version", "%s", RVERSION);
2572 APPEND_STAT("libevent", "%s", event_get_version());
2573 APPEND_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));
2574
2575 #ifndef WIN32
2576 append_stat("rusage_user", add_stats, c, "%ld.%06ld",
2577 (long)usage.ru_utime.tv_sec,
2578 (long)usage.ru_utime.tv_usec);
2579 append_stat("rusage_system", add_stats, c, "%ld.%06ld",
2580 (long)usage.ru_stime.tv_sec,
2581 (long)usage.ru_stime.tv_usec);
2582 #endif /* !WIN32 */
2583
2584 APPEND_STAT("curr_connections", "%u", stats.curr_conns - 1);
2585 APPEND_STAT("total_connections", "%u", stats.total_conns);
2586 if (settings.maxconns_fast) {
2587 APPEND_STAT("rejected_connections", "%llu", (unsigned long long)stats.rejected_conns);
2588 }
2589 APPEND_STAT("connection_structures", "%u", stats.conn_structs);
2590 APPEND_STAT("reserved_fds", "%u", stats.reserved_fds);
2591 APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
2592 APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
2593 APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds);
2594 APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats.touch_cmds);
2595 APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits);
2596 APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
2597 APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
2598 APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits);
2599 APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses);
2600 APPEND_STAT("incr_hits", "%llu", (unsigned long long)slab_stats.incr_hits);
2601 APPEND_STAT("decr_misses", "%llu", (unsigned long long)thread_stats.decr_misses);
2602 APPEND_STAT("decr_hits", "%llu", (unsigned long long)slab_stats.decr_hits);
2603 APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses);
2604 APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits);
2605 APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval);
2606 APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats.touch_hits);
2607 APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats.touch_misses);
2608 APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats.auth_cmds);
2609 APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats.auth_errors);
2610 APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read);
2611 APPEND_STAT("bytes_written", "%llu", (unsigned long long)thread_stats.bytes_written);
2612 APPEND_STAT("limit_maxbytes", "%llu", (unsigned long long)settings.maxbytes);
2613 APPEND_STAT("accepting_conns", "%u", stats.accepting_conns);
2614 APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
2615 APPEND_STAT("threads", "%d", settings.num_threads);
2616 APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
2617 APPEND_STAT("hash_power_level", "%u", stats.hash_power_level);
2618 APPEND_STAT("hash_bytes", "%llu", (unsigned long long)stats.hash_bytes);
2619 APPEND_STAT("hash_is_expanding", "%u", stats.hash_is_expanding);
2620 APPEND_STAT("expired_unfetched", "%llu", stats.expired_unfetched);
2621 APPEND_STAT("evicted_unfetched", "%llu", stats.evicted_unfetched);
2622 if (settings.slab_reassign) {
2623 APPEND_STAT("slab_reassign_running", "%u", stats.slab_reassign_running);
2624 APPEND_STAT("slabs_moved", "%llu", stats.slabs_moved);
2625 }
2626 STATS_UNLOCK();
2627 }
2628
2629 static void process_stat_settings(ADD_STAT add_stats, void *c) {
2630 assert(add_stats);
2631 APPEND_STAT("maxbytes", "%u", (unsigned int)settings.maxbytes);
2632 APPEND_STAT("maxconns", "%d", settings.maxconns);
2633 APPEND_STAT("tcpport", "%d", settings.port);
2634 APPEND_STAT("udpport", "%d", settings.udpport);
2635 APPEND_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
2636 APPEND_STAT("verbosity", "%d", settings.verbose);
2637 APPEND_STAT("oldest", "%lu", (unsigned long)settings.oldest_live);
2638 APPEND_STAT("evictions", "%s", settings.evict_to_free ? "on" : "off");
2639 APPEND_STAT("domain_socket", "%s",
2640 settings.socketpath ? settings.socketpath : "NULL");
2641 APPEND_STAT("umask", "%o", settings.access);
2642 APPEND_STAT("growth_factor", "%.2f", settings.factor);
2643 APPEND_STAT("chunk_size", "%d", settings.chunk_size);
2644 APPEND_STAT("num_threads", "%d", settings.num_threads);
2645 APPEND_STAT("num_threads_per_udp", "%d", settings.num_threads_per_udp);
2646 APPEND_STAT("stat_key_prefix", "%c", settings.prefix_delimiter);
2647 APPEND_STAT("detail_enabled", "%s",
2648 settings.detail_enabled ? "yes" : "no");
2649 APPEND_STAT("reqs_per_event", "%d", settings.reqs_per_event);
2650 APPEND_STAT("cas_enabled", "%s", settings.use_cas ? "yes" : "no");
2651 APPEND_STAT("tcp_backlog", "%d", settings.backlog);
2652 APPEND_STAT("binding_protocol", "%s",
2653 prot_text(settings.binding_protocol));
2654 APPEND_STAT("auth_enabled_sasl", "%s", settings.sasl ? "yes" : "no");
2655 APPEND_STAT("item_size_max", "%d", settings.item_size_max);
2656 APPEND_STAT("maxconns_fast", "%s", settings.maxconns_fast ? "yes" : "no");
2657 APPEND_STAT("hashpower_init", "%d", settings.hashpower_init);
2658 APPEND_STAT("slab_reassign", "%s", settings.slab_reassign ? "yes" : "no");
2659 APPEND_STAT("slab_automove", "%s", settings.slab_automove ? "yes" : "no");
2660 }
2661
2662 static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
2663 const char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
2664 assert(c != NULL);
2665
2666 if (ntokens < 2) {
2667 out_string(c, "CLIENT_ERROR bad command line");
2668 return;
2669 }
2670
2671 if (ntokens == 2) {
2672 server_stats(&append_stats, c);
2673 (void)get_stats(NULL, 0, &append_stats, c);
2674 } else if (strcmp(subcommand, "reset") == 0) {
2675 stats_reset();
2676 out_string(c, "RESET");
2677 return ;
2678 } else if (strcmp(subcommand, "detail") == 0) {
2679 /* NOTE: how to tackle detail with binary? */
2680 if (ntokens < 4)
2681 process_stats_detail(c, ""); /* outputs the error message */
2682 else
2683 process_stats_detail(c, tokens[2].value);
2684 /* Output already generated */
2685 return ;
2686 } else if (strcmp(subcommand, "settings") == 0) {
2687 process_stat_settings(&append_stats, c);
2688 } else if (strcmp(subcommand, "cachedump") == 0) {
2689 char *buf;
2690 unsigned int bytes, id, limit = 0;
2691
2692 if (ntokens < 5) {
2693 out_string(c, "CLIENT_ERROR bad command line");
2694 return;
2695 }
2696
2697 if (!safe_strtoul(tokens[2].value, &id) ||
2698 !safe_strtoul(tokens[3].value, &limit)) {
2699 out_string(c, "CLIENT_ERROR bad command line format");
2700 return;
2701 }
2702
2703 if (id >= POWER_LARGEST) {
2704 out_string(c, "CLIENT_ERROR Illegal slab id");
2705 return;
2706 }
2707
2708 buf = item_cachedump(id, limit, &bytes);
2709 write_and_free(c, buf, bytes);
2710 return ;
2711 } else {
2712 /* getting here means that the subcommand is either engine specific or
2713 is invalid. query the engine and see. */
2714 if (get_stats(subcommand, strlen(subcommand), &append_stats, c)) {
2715 if (c->stats.buffer == NULL) {
2716 out_string(c, "SERVER_ERROR out of memory writing stats");
2717 } else {
2718 write_and_free(c, c->stats.buffer, c->stats.offset);
2719 c->stats.buffer = NULL;
2720 }
2721 } else {
2722 out_string(c, "ERROR");
2723 }
2724 return ;
2725 }
2726
2727 /* append terminator and start the transfer */
2728 append_stats(NULL, 0, NULL, 0, c);
2729
2730 if (c->stats.buffer == NULL) {
2731 out_string(c, "SERVER_ERROR out of memory writing stats");
2732 } else {
2733 write_and_free(c, c->stats.buffer, c->stats.offset);
2734 c->stats.buffer = NULL;
2735 }
2736 }
2737
2738 #ifndef __INTEL_COMPILER
2739 #pragma GCC diagnostic ignored "-Wunused-but-set-parameter"
2740 #endif
2741 /* ntokens is overwritten here... shrug.. */
2742 static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
2743 char *key;
2744 size_t nkey;
2745 int i = 0;
2746 item *it;
2747 token_t *key_token = &tokens[KEY_TOKEN];
2748 char *suffix;
2749 assert(c != NULL);
2750
2751 do {
2752 while(key_token->length != 0) {
2753
2754 key = key_token->value;
2755 nkey = key_token->length;
2756
2757 if(nkey > KEY_MAX_LENGTH) {
2758 out_string(c, "CLIENT_ERROR bad command line format");
2759 return;
2760 }
2761
2762 it = item_get(key, nkey);
2763 if (settings.detail_enabled) {
2764 stats_prefix_record_get(key, nkey, NULL != it);
2765 }
2766 if (it) {
2767 if (i >= c->isize) {
2768 item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
2769 if (new_list) {
2770 c->isize *= 2;
2771 c->ilist = new_list;
2772 } else {
2773 item_remove(it);
2774 break;
2775 }
2776 }
2777
2778 /*
2779 * Construct the response. Each hit adds three elements to the
2780 * outgoing data list:
2781 * "VALUE "
2782 * key
2783 * " " + flags + " " + data length + "\r\n" + data (with \r\n)
2784 */
2785
2786 if (return_cas)
2787 {
2788 MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
2789 it->nbytes, ITEM_get_cas(it));
2790 /* Goofy mid-flight realloc. */
2791 if (i >= c->suffixsize) {
2792 char **new_suffix_list = realloc(c->suffixlist,
2793 sizeof(char *) * c->suffixsize * 2);
2794 if (new_suffix_list) {
2795 c->suffixsize *= 2;
2796 c->suffixlist = new_suffix_list;
2797 } else {
2798 item_remove(it);
2799 break;
2800 }
2801 }
2802
2803 suffix = cache_alloc(c->thread->suffix_cache);
2804 if (suffix == NULL) {
2805 out_string(c, "SERVER_ERROR out of memory making CAS suffix");
2806 item_remove(it);
2807 return;
2808 }
2809 *(c->suffixlist + i) = suffix;
2810 int suffix_len = snprintf(suffix, SUFFIX_SIZE,
2811 " %llu\r\n",
2812 (unsigned long long)ITEM_get_cas(it));
2813 if (add_iov(c, "VALUE ", 6) != 0 ||
2814 add_iov(c, ITEM_key(it), it->nkey) != 0 ||
2815 add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
2816 add_iov(c, suffix, suffix_len) != 0 ||
2817 add_iov(c, ITEM_data(it), it->nbytes) != 0)
2818 {
2819 item_remove(it);
2820 break;
2821 }
2822 }
2823 else
2824 {
2825 MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
2826 it->nbytes, ITEM_get_cas(it));
2827 if (add_iov(c, "VALUE ", 6) != 0 ||
2828 add_iov(c, ITEM_key(it), it->nkey) != 0 ||
2829 add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
2830 {
2831 item_remove(it);
2832 break;
2833 }
2834 }
2835
2836
2837 if (settings.verbose > 1)
2838 fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
2839
2840 /* item_get() has incremented it->refcount for us */
2841 pthread_mutex_lock(&c->thread->stats.mutex);
2842 c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
2843 c->thread->stats.get_cmds++;
2844 pthread_mutex_unlock(&c->thread->stats.mutex);
2845 item_update(it);
2846 *(c->ilist + i) = it;
2847 i++;
2848
2849 } else {
2850 pthread_mutex_lock(&c->thread->stats.mutex);
2851 c->thread->stats.get_misses++;
2852 c->thread->stats.get_cmds++;
2853 pthread_mutex_unlock(&c->thread->stats.mutex);
2854 MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
2855 }
2856
2857 key_token++;
2858 }
2859
2860 /*
2861 * If the command string hasn't been fully processed, get the next set
2862 * of tokens.
2863 */
2864 if(key_token->value != NULL) {
2865 ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
2866 key_token = tokens;
2867 }
2868
2869 } while(key_token->value != NULL);
2870
2871 c->icurr = c->ilist;
2872 c->ileft = i;
2873 if (return_cas) {
2874 c->suffixcurr = c->suffixlist;
2875 c->suffixleft = i;
2876 }
2877
2878 if (settings.verbose > 1)
2879 fprintf(stderr, ">%d END\n", c->sfd);
2880
2881 /*
2882 If the loop was terminated because of out-of-memory, it is not
2883 reliable to add END\r\n to the buffer, because it might not end
2884 in \r\n. So we send SERVER_ERROR instead.
2885 */
2886 if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
2887 || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
2888 out_string(c, "SERVER_ERROR out of memory writing get response");
2889 }
2890 else {
2891 conn_set_state(c, conn_mwrite);
2892 c->msgcurr = 0;
2893 }
2894
2895 return;
2896 }
2897
2898 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
2899 char *key;
2900 size_t nkey;
2901 unsigned int flags;
2902 int32_t exptime_int = 0;
2903 time_t exptime;
2904 int vlen;
2905 uint64_t req_cas_id=0;
2906 item *it;
2907
2908 assert(c != NULL);
2909
2910 set_noreply_maybe(c, tokens, ntokens);
2911
2912 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2913 out_string(c, "CLIENT_ERROR bad command line format");
2914 return;
2915 }
2916
2917 key = tokens[KEY_TOKEN].value;
2918 nkey = tokens[KEY_TOKEN].length;
2919
2920 if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
2921 && safe_strtol(tokens[3].value, &exptime_int)
2922 && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
2923 out_string(c, "CLIENT_ERROR bad command line format");
2924 return;
2925 }
2926
2927 /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
2928 exptime = exptime_int;
2929
2930 /* Negative exptimes can underflow and end up immortal. realtime() will
2931 immediately expire values that are greater than REALTIME_MAXDELTA, but less
2932 than process_started, so lets aim for that. */
2933 if (exptime < 0)
2934 exptime = REALTIME_MAXDELTA + 1;
2935
2936 // does cas value exist?
2937 if (handle_cas) {
2938 if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
2939 out_string(c, "CLIENT_ERROR bad command line format");
2940 return;
2941 }
2942 }
2943
2944 vlen += 2;
2945 if (vlen < 0 || vlen - 2 < 0) {
2946 out_string(c, "CLIENT_ERROR bad command line format");
2947 return;
2948 }
2949
2950 if (settings.detail_enabled) {
2951 stats_prefix_record_set(key, nkey);
2952 }
2953
2954 it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
2955
2956 if (it == 0) {
2957 if (! item_size_ok(nkey, flags, vlen))
2958 out_string(c, "SERVER_ERROR object too large for cache");
2959 else
2960 out_string(c, "SERVER_ERROR out of memory storing object");
2961 /* swallow the data line */
2962 c->write_and_go = conn_swallow;
2963 c->sbytes = vlen;
2964
2965 /* Avoid stale data persisting in cache because we failed alloc.
2966 * Unacceptable for SET. Anywhere else too? */
2967 if (comm == NREAD_SET) {
2968 it = item_get(key, nkey);
2969 if (it) {
2970 item_unlink(it);
2971 item_remove(it);
2972 }
2973 }
2974
2975 return;
2976 }
2977 ITEM_set_cas(it, req_cas_id);
2978
2979 c->item = it;
2980 c->ritem = ITEM_data(it);
2981 c->rlbytes = it->nbytes;
2982 c->cmd = comm;
2983 conn_set_state(c, conn_nread);
2984 }
2985
2986 static void process_touch_command(conn *c, token_t *tokens, const size_t ntokens) {
2987 char *key;
2988 size_t nkey;
2989 int32_t exptime_int = 0;
2990 item *it;
2991
2992 assert(c != NULL);
2993
2994 set_noreply_maybe(c, tokens, ntokens);
2995
2996 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2997 out_string(c, "CLIENT_ERROR bad command line format");
2998 return;
2999 }
3000
3001 key = tokens[KEY_TOKEN].value;
3002 nkey = tokens[KEY_TOKEN].length;
3003
3004 if (!safe_strtol(tokens[2].value, &exptime_int)) {
3005 out_string(c, "CLIENT_ERROR invalid exptime argument");
3006 return;
3007 }
3008
3009 it = item_touch(key, nkey, realtime(exptime_int));
3010 if (it) {
3011 item_update(it);
3012 pthread_mutex_lock(&c->thread->stats.mutex);
3013 c->thread->stats.touch_cmds++;
3014 c->thread->stats.slab_stats[it->slabs_clsid].touch_hits++;
3015 pthread_mutex_unlock(&c->thread->stats.mutex);
3016
3017 out_string(c, "TOUCHED");
3018 item_remove(it);
3019 } else {
3020 pthread_mutex_lock(&c->thread->stats.mutex);
3021 c->thread->stats.touch_cmds++;
3022 c->thread->stats.touch_misses++;
3023 pthread_mutex_unlock(&c->thread->stats.mutex);
3024
3025 out_string(c, "NOT_FOUND");
3026 }
3027 }
3028
3029 static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
3030 char temp[INCR_MAX_STORAGE_LEN];
3031 uint64_t delta;
3032 char *key;
3033 size_t nkey;
3034
3035 assert(c != NULL);
3036
3037 set_noreply_maybe(c, tokens, ntokens);
3038
3039 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
3040 out_string(c, "CLIENT_ERROR bad command line format");
3041 return;
3042 }
3043
3044 key = tokens[KEY_TOKEN].value;
3045 nkey = tokens[KEY_TOKEN].length;
3046
3047 if (!safe_strtoull(tokens[2].value, &delta)) {
3048 out_string(c, "CLIENT_ERROR invalid numeric delta argument");
3049 return;
3050 }
3051
3052 switch(add_delta(c, key, nkey, incr, delta, temp, NULL)) {
3053 case OK:
3054 out_string(c, temp);
3055 break;
3056 case NON_NUMERIC:
3057 out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
3058 break;
3059 case EOM:
3060 out_string(c, "SERVER_ERROR out of memory");
3061 break;
3062 case DELTA_ITEM_NOT_FOUND:
3063 pthread_mutex_lock(&c->thread->stats.mutex);
3064 if (incr) {
3065 c->thread->stats.incr_misses++;
3066 } else {
3067 c->thread->stats.decr_misses++;
3068 }
3069 pthread_mutex_unlock(&c->thread->stats.mutex);
3070
3071 out_string(c, "NOT_FOUND");
3072 break;
3073 case DELTA_ITEM_CAS_MISMATCH:
3074 break; /* Should never get here */
3075 default:
3076 assert(false);
3077 abort();
3078 }
3079 }
3080
3081 /*
3082 * adds a delta value to a numeric item.
3083 *
3084 * c connection requesting the operation
3085 * it item to adjust
3086 * incr true to increment value, false to decrement
3087 * delta amount to adjust value by
3088 * buf buffer for response string
3089 *
3090 * returns a response string to send back to the client.
3091 */
3092 enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
3093 const bool incr, const int64_t delta,
3094 char *buf, uint64_t *cas,
3095 const uint32_t hv) {
3096 char *ptr;
3097 uint64_t value;
3098 int res;
3099 item *it;
3100
3101 it = do_item_get(key, nkey, hv);
3102 if (!it) {
3103 return DELTA_ITEM_NOT_FOUND;
3104 }
3105
3106 if (cas != NULL && *cas != 0 && ITEM_get_cas(it) != *cas) {
3107 do_item_remove(it);
3108 return DELTA_ITEM_CAS_MISMATCH;
3109 }
3110
3111 ptr = ITEM_data(it);
3112
3113 if (!safe_strtoull(ptr, &value)) {
3114 do_item_remove(it);
3115 return NON_NUMERIC;
3116 }
3117
3118 if (incr) {
3119 value += delta;
3120 MEMCACHED_COMMAND_INCR(c->sfd, ITEM_key(it), it->nkey, value);
3121 } else {
3122 if(delta > value) {
3123 value = 0;
3124 } else {
3125 value -= delta;
3126 }
3127 MEMCACHED_COMMAND_DECR(c->sfd, ITEM_key(it), it->nkey, value);
3128 }
3129
3130 pthread_mutex_lock(&c->thread->stats.mutex);
3131 if (incr) {
3132 c->thread->stats.slab_stats[it->slabs_clsid].incr_hits++;
3133 } else {
3134 c->thread->stats.slab_stats[it->slabs_clsid].decr_hits++;
3135 }
3136 pthread_mutex_unlock(&c->thread->stats.mutex);
3137
3138 snprintf(buf, INCR_MAX_STORAGE_LEN, "%llu", (unsigned long long)value);
3139 res = strlen(buf);
3140 if (res + 2 > it->nbytes || it->refcount != 1) { /* need to realloc */
3141 item *new_it;
3142 new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
3143 if (new_it == 0) {
3144 do_item_remove(it);
3145 return EOM;
3146 }
3147 memcpy(ITEM_data(new_it), buf, res);
3148 memcpy(ITEM_data(new_it) + res, "\r\n", 2);
3149 item_replace(it, new_it, hv);
3150 // Overwrite the older item's CAS with our new CAS since we're
3151 // returning the CAS of the old item below.
3152 ITEM_set_cas(it, (settings.use_cas) ? ITEM_get_cas(new_it) : 0);
3153 do_item_remove(new_it); /* release our reference */
3154 } else { /* replace in-place */
3155 /* When changing the value without replacing the item, we
3156 need to update the CAS on the existing item. */
3157 mutex_lock(&cache_lock); /* FIXME */
3158 ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
3159 pthread_mutex_unlock(&cache_lock);
3160
3161 memcpy(ITEM_data(it), buf, res);
3162 memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
3163 do_item_update(it);
3164 }
3165
3166 if (cas) {
3167 *cas = ITEM_get_cas(it); /* swap the incoming CAS value */
3168 }
3169 do_item_remove(it); /* release our reference */
3170 return OK;
3171 }
3172
3173 static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
3174 char *key;
3175 size_t nkey;
3176 item *it;
3177
3178 assert(c != NULL);
3179
3180 if (ntokens > 3) {
3181 bool hold_is_zero = strcmp(tokens[KEY_TOKEN+1].value, "0") == 0;
3182 bool sets_noreply = set_noreply_maybe(c, tokens, ntokens);
3183 bool valid = (ntokens == 4 && (hold_is_zero || sets_noreply))
3184 || (ntokens == 5 && hold_is_zero && sets_noreply);
3185 if (!valid) {
3186 out_string(c, "CLIENT_ERROR bad command line format. "
3187 "Usage: delete <key> [noreply]");
3188 return;
3189 }
3190 }
3191
3192
3193 key = tokens[KEY_TOKEN].value;
3194 nkey = tokens[KEY_TOKEN].length;
3195
3196 if(nkey > KEY_MAX_LENGTH) {
3197 out_string(c, "CLIENT_ERROR bad command line format");
3198 return;
3199 }
3200
3201 if (settings.detail_enabled) {
3202 stats_prefix_record_delete(key, nkey);
3203 }
3204
3205 it = item_get(key, nkey);
3206 if (it) {
3207 MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
3208
3209 pthread_mutex_lock(&c->thread->stats.mutex);
3210 c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
3211 pthread_mutex_unlock(&c->thread->stats.mutex);
3212
3213 item_unlink(it);
3214 item_remove(it); /* release our reference */
3215 out_string(c, "DELETED");
3216 } else {
3217 pthread_mutex_lock(&c->thread->stats.mutex);
3218 c->thread->stats.delete_misses++;
3219 pthread_mutex_unlock(&c->thread->stats.mutex);
3220
3221 out_string(c, "NOT_FOUND");
3222 }
3223 }
3224
3225 static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
3226 unsigned int level;
3227
3228 assert(c != NULL);
3229
3230 set_noreply_maybe(c, tokens, ntokens);
3231
3232 level = strtoul(tokens[1].value, NULL, 10);
3233 settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
3234 out_string(c, "OK");
3235 return;
3236 }
3237
3238 static void process_slabs_automove_command(conn *c, token_t *tokens, const size_t ntokens) {
3239 unsigned int level;
3240
3241 assert(c != NULL);
3242
3243 set_noreply_maybe(c, tokens, ntokens);
3244
3245 level = strtoul(tokens[2].value, NULL, 10);
3246 if (level == 0) {
3247 settings.slab_automove = false;
3248 } else if (level == 1) {
3249 settings.slab_automove = true;
3250 } else {
3251 out_string(c, "ERROR");
3252 return;
3253 }
3254 out_string(c, "OK");
3255 return;
3256 }
3257
3258 static void process_command(conn *c, char *command) {
3259
3260 token_t tokens[MAX_TOKENS];
3261 size_t ntokens;
3262 int comm;
3263
3264 assert(c != NULL);
3265
3266 MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
3267
3268 if (settings.verbose > 1)
3269 fprintf(stderr, "<%d %s\n", c->sfd, command);
3270
3271 /*
3272 * for commands set/add/replace, we build an item and read the data
3273 * directly into it, then continue in nread_complete().
3274 */
3275
3276 c->msgcurr = 0;
3277 c->msgused = 0;
3278 c->iovused = 0;
3279 if (add_msghdr(c) != 0) {
3280 out_string(c, "SERVER_ERROR out of memory preparing response");
3281 return;
3282 }
3283
3284 ntokens = tokenize_command(command, tokens, MAX_TOKENS);
3285 if (ntokens >= 3 &&
3286 ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
3287 (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
3288
3289 process_get_command(c, tokens, ntokens, false);
3290
3291 } else if ((ntokens == 6 || ntokens == 7) &&
3292 ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
3293 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
3294 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
3295 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
3296 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
3297
3298 process_update_command(c, tokens, ntokens, comm, false);
3299
3300 } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
3301
3302 process_update_command(c, tokens, ntokens, comm, true);
3303
3304 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
3305
3306 process_arithmetic_command(c, tokens, ntokens, 1);
3307
3308 } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
3309
3310 process_get_command(c, tokens, ntokens, true);
3311
3312 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
3313
3314 process_arithmetic_command(c, tokens, ntokens, 0);
3315
3316 } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
3317
3318 process_delete_command(c, tokens, ntokens);
3319
3320 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "touch") == 0)) {
3321
3322 process_touch_command(c, tokens, ntokens);
3323
3324 } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
3325
3326 process_stat(c, tokens, ntokens);
3327
3328 } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
3329 time_t exptime = 0;
3330
3331 set_noreply_maybe(c, tokens, ntokens);
3332
3333 pthread_mutex_lock(&c->thread->stats.mutex);
3334 c->thread->stats.flush_cmds++;
3335 pthread_mutex_unlock(&c->thread->stats.mutex);
3336
3337 if(ntokens == (c->noreply ? 3 : 2)) {
3338 settings.oldest_live = current_time - 1;
3339 item_flush_expired();
3340 out_string(c, "OK");
3341 return;
3342 }
3343
3344 exptime = strtol(tokens[1].value, NULL, 10);
3345 if(errno == ERANGE) {
3346 out_string(c, "CLIENT_ERROR bad command line format");
3347 return;
3348 }
3349
3350 /*
3351 If exptime is zero realtime() would return zero too, and
3352 realtime(exptime) - 1 would overflow to the max unsigned
3353 value. So we process exptime == 0 the same way we do when
3354 no delay is given at all.
3355 */
3356 if (exptime > 0)
3357 settings.oldest_live = realtime(exptime) - 1;
3358 else /* exptime == 0 */
3359 settings.oldest_live = current_time - 1;
3360 item_flush_expired();
3361 out_string(c, "OK");
3362 return;
3363
3364 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
3365
3366 out_string(c, "VERSION " RVERSION);
3367
3368 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
3369
3370 conn_set_state(c, conn_closing);
3371
3372 } else if (ntokens > 1 && strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0) {
3373 if (ntokens == 5 && strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0) {
3374 int src, dst, rv;
3375
3376 if (settings.slab_reassign == false) {
3377 out_string(c, "CLIENT_ERROR slab reassignment disabled");
3378 return;
3379 }
3380
3381 src = strtol(tokens[2].value, NULL, 10);
3382 dst = strtol(tokens[3].value, NULL, 10);
3383
3384 if (errno == ERANGE) {
3385 out_string(c, "CLIENT_ERROR bad command line format");
3386 return;
3387 }
3388
3389 rv = slabs_reassign(src, dst);
3390 switch (rv) {
3391 case REASSIGN_OK:
3392 out_string(c, "OK");
3393 break;
3394 case REASSIGN_RUNNING:
3395 out_string(c, "BUSY currently processing reassign request");
3396 break;
3397 case REASSIGN_BADCLASS:
3398 out_string(c, "BADCLASS invalid src or dst class id");
3399 break;
3400 case REASSIGN_NOSPARE:
3401 out_string(c, "NOSPARE source class has no spare pages");
3402 break;
3403 case REASSIGN_DEST_NOT_FULL:
3404 out_string(c, "NOTFULL dest class has spare memory");
3405 break;
3406 case REASSIGN_SRC_NOT_SAFE:
3407 out_string(c, "UNSAFE src class is in an unsafe state");
3408 break;
3409 case REASSIGN_SRC_DST_SAME:
3410 out_string(c, "SAME src and dst class are identical");
3411 break;
3412 default:
3413 assert(false);
3414 abort();
3415 }
3416 return;
3417 } else if (ntokens == 4 &&
3418 (strcmp(tokens[COMMAND_TOKEN + 1].value, "automove") == 0)) {
3419 process_slabs_automove_command(c, tokens, ntokens);
3420 } else {
3421 out_string(c, "ERROR");
3422 }
3423 } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
3424 process_verbosity_command(c, tokens, ntokens);
3425 } else {
3426 out_string(c, "ERROR");
3427 }
3428 return;
3429 }
3430
3431 /*
3432 * if we have a complete line in the buffer, process it.
3433 */
3434 static int try_read_command(conn *c) {
3435 assert(c != NULL);
3436 assert(c->rcurr <= (c->rbuf + c->rsize));
3437 assert(c->rbytes > 0);
3438
3439 if (c->protocol == negotiating_prot || c->transport == udp_transport) {
3440 if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
3441 c->protocol = binary_prot;
3442 } else {
3443 c->protocol = ascii_prot;
3444 }
3445
3446 if (settings.verbose > 1) {
3447 fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
3448 prot_text(c->protocol));
3449 }
3450 }
3451
3452 if (c->protocol == binary_prot) {
3453 /* Do we have the complete packet header? */
3454 if (c->rbytes < sizeof(c->binary_header)) {
3455 /* need more data! */
3456 return 0;
3457 } else {
3458 #ifdef NEED_ALIGN
3459 if (((long)(c->rcurr)) % 8 != 0) {
3460 /* must realign input buffer */
3461 memmove(c->rbuf, c->rcurr, c->rbytes);
3462 c->rcurr = c->rbuf;
3463 if (settings.verbose > 1) {
3464 fprintf(stderr, "%d: Realign input buffer\n", c->sfd);
3465 }
3466 }
3467 #endif
3468 protocol_binary_request_header* req;
3469 req = (protocol_binary_request_header*)c->rcurr;
3470
3471 if (settings.verbose > 1) {
3472 /* Dump the packet before we convert it to host order */
3473 int ii;
3474 fprintf(stderr, "<%d Read binary protocol data:", c->sfd);
3475 for (ii = 0; ii < sizeof(req->bytes); ++ii) {
3476 if (ii % 4 == 0) {
3477 fprintf(stderr, "\n<%d ", c->sfd);
3478 }
3479 fprintf(stderr, " 0x%02x", req->bytes[ii]);
3480 }
3481 fprintf(stderr, "\n");
3482 }
3483
3484 c->binary_header = *req;
3485 c->binary_header.request.keylen = ntohs(req->request.keylen);
3486 c->binary_header.request.bodylen = ntohl(req->request.bodylen);
3487 c->binary_header.request.cas = ntohll(req->request.cas);
3488
3489 if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ) {
3490 if (settings.verbose) {
3491 fprintf(stderr, "Invalid magic: %x\n",
3492 c->binary_header.request.magic);
3493 }
3494 conn_set_state(c, conn_closing);
3495 return -1;
3496 }
3497
3498 c->msgcurr = 0;
3499 c->msgused = 0;
3500 c->iovused = 0;
3501 if (add_msghdr(c) != 0) {
3502 out_string(c, "SERVER_ERROR out of memory");
3503 return 0;
3504 }
3505
3506 c->cmd = c->binary_header.request.opcode;
3507 c->keylen = c->binary_header.request.keylen;
3508 c->opaque = c->binary_header.request.opaque;
3509 /* clear the returned cas value */
3510 c->cas = 0;
3511
3512 dispatch_bin_command(c);
3513
3514 c->rbytes -= sizeof(c->binary_header);
3515 c->rcurr += sizeof(c->binary_header);
3516 }
3517 } else {
3518 char *el, *cont;
3519
3520 if (c->rbytes == 0)
3521 return 0;
3522
3523 el = memchr(c->rcurr, '\n', c->rbytes);
3524 if (!el) {
3525 if (c->rbytes > 1024) {
3526 /*
3527 * We didn't have a '\n' in the first k. This _has_ to be a
3528 * large multiget, if not we should just nuke the connection.
3529 */
3530 char *ptr = c->rcurr;
3531 while (*ptr == ' ') { /* ignore leading whitespaces */
3532 ++ptr;
3533 }
3534
3535 if (ptr - c->rcurr > 100 ||
3536 (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
3537
3538 conn_set_state(c, conn_closing);
3539 return 1;
3540 }
3541 }
3542
3543 return 0;
3544 }
3545 cont = el + 1;
3546 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
3547 el--;
3548 }
3549 *el = '\0';
3550
3551 assert(cont <= (c->rcurr + c->rbytes));
3552
3553 process_command(c, c->rcurr);
3554
3555 c->rbytes -= (cont - c->rcurr);
3556 c->rcurr = cont;
3557
3558 assert(c->rcurr <= (c->rbuf + c->rsize));
3559 }
3560
3561 return 1;
3562 }
3563
3564 /*
3565 * read a UDP request.
3566 */
3567 static enum try_read_result try_read_udp(conn *c) {
3568 int res;
3569
3570 assert(c != NULL);
3571
3572 c->request_addr_size = sizeof(c->request_addr);
3573 res = recvfrom(c->sfd, c->rbuf, c->rsize,
3574 0, &c->request_addr, &c->request_addr_size);
3575 if (res > 8) {
3576 unsigned char *buf = (unsigned char *)c->rbuf;
3577 pthread_mutex_lock(&c->thread->stats.mutex);
3578 c->thread->stats.bytes_read += res;
3579 pthread_mutex_unlock(&c->thread->stats.mutex);
3580
3581 /* Beginning of UDP packet is the request ID; save it. */
3582 c->request_id = buf[0] * 256 + buf[1];
3583
3584 /* If this is a multi-packet request, drop it. */
3585 if (buf[4] != 0 || buf[5] != 1) {
3586 out_string(c, "SERVER_ERROR multi-packet request not supported");
3587 return READ_NO_DATA_RECEIVED;
3588 }
3589
3590 /* Don't care about any of the rest of the header. */
3591 res -= 8;
3592 memmove(c->rbuf, c->rbuf + 8, res);
3593
3594 c->rbytes = res;
3595 c->rcurr = c->rbuf;
3596 return READ_DATA_RECEIVED;
3597 }
3598 return READ_NO_DATA_RECEIVED;
3599 }
3600
3601 /*
3602 * read from network as much as we can, handle buffer overflow and connection
3603 * close.
3604 * before reading, move the remaining incomplete fragment of a command
3605 * (if any) to the beginning of the buffer.
3606 *
3607 * To protect us from someone flooding a connection with bogus data causing
3608 * the connection to eat up all available memory, break out and start looking
3609 * at the data I've got after a number of reallocs...
3610 *
3611 * @return enum try_read_result
3612 */
3613 static enum try_read_result try_read_network(conn *c) {
3614 enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
3615 int res;
3616 int num_allocs = 0;
3617 assert(c != NULL);
3618
3619 if (c->rcurr != c->rbuf) {
3620 if (c->rbytes != 0) /* otherwise there's nothing to copy */
3621 memmove(c->rbuf, c->rcurr, c->rbytes);
3622 c->rcurr = c->rbuf;
3623 }
3624
3625 while (1) {
3626 if (c->rbytes >= c->rsize) {
3627 if (num_allocs == 4) {
3628 return gotdata;
3629 }
3630 ++num_allocs;
3631 char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
3632 if (!new_rbuf) {
3633 if (settings.verbose > 0)
3634 fprintf(stderr, "Couldn't realloc input buffer\n");
3635 c->rbytes = 0; /* ignore what we read */
3636 out_string(c, "SERVER_ERROR out of memory reading request");
3637 c->write_and_go = conn_closing;
3638 return READ_MEMORY_ERROR;
3639 }
3640 c->rcurr = c->rbuf = new_rbuf;
3641 c->rsize *= 2;
3642 }
3643
3644 int avail = c->rsize - c->rbytes;
3645 res = read(c->sfd, c->rbuf + c->rbytes, avail);
3646 if (res > 0) {
3647 pthread_mutex_lock(&c->thread->stats.mutex);
3648 c->thread->stats.bytes_read += res;
3649 pthread_mutex_unlock(&c->thread->stats.mutex);
3650 gotdata = READ_DATA_RECEIVED;
3651 c->rbytes += res;
3652 if (res == avail) {
3653 continue;
3654 } else {
3655 break;
3656 }
3657 }
3658 if (res == 0) {
3659 return READ_ERROR;
3660 }
3661 if (res == -1) {
3662 if (errno == EAGAIN || errno == EWOULDBLOCK) {
3663 break;
3664 }
3665 return READ_ERROR;
3666 }
3667 }
3668 return gotdata;
3669 }
3670
3671 static bool update_event(conn *c, const int new_flags) {
3672 assert(c != NULL);
3673
3674 struct event_base *base = c->event.ev_base;
3675 if (c->ev_flags == new_flags)
3676 return true;
3677 if (event_del(&c->event) == -1) return false;
3678 event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
3679 event_base_set(base, &c->event);
3680 c->ev_flags = new_flags;
3681 if (event_add(&c->event, 0) == -1) return false;
3682 return true;
3683 }
3684
3685 /*
3686 * Sets whether we are listening for new connections or not.
3687 */
3688 void do_accept_new_conns(const bool do_accept) {
3689 conn *next;
3690
3691 for (next = listen_conn; next; next = next->next) {
3692 if (do_accept) {
3693 update_event(next, EV_READ | EV_PERSIST);
3694 if (listen(next->sfd, settings.backlog) != 0) {
3695 perror("listen");
3696 }
3697 }
3698 else {
3699 update_event(next, 0);
3700 if (listen(next->sfd, 0) != 0) {
3701 perror("listen");
3702 }
3703 }
3704 }
3705
3706 if (do_accept) {
3707 STATS_LOCK();
3708 stats.accepting_conns = true;
3709 STATS_UNLOCK();
3710 } else {
3711 STATS_LOCK();
3712 stats.accepting_conns = false;
3713 stats.listen_disabled_num++;
3714 STATS_UNLOCK();
3715 allow_new_conns = false;
3716 maxconns_handler(-42, 0, 0);
3717 }
3718 }
3719
3720 /*
3721 * Transmit the next chunk of data from our list of msgbuf structures.
3722 *
3723 * Returns:
3724 * TRANSMIT_COMPLETE All done writing.
3725 * TRANSMIT_INCOMPLETE More data remaining to write.
3726 * TRANSMIT_SOFT_ERROR Can't write any more right now.
3727 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
3728 */
3729 static enum transmit_result transmit(conn *c) {
3730 assert(c != NULL);
3731
3732 if (c->msgcurr < c->msgused &&
3733 c->msglist[c->msgcurr].msg_iovlen == 0) {
3734 /* Finished writing the current msg; advance to the next. */
3735 c->msgcurr++;
3736 }
3737 if (c->msgcurr < c->msgused) {
3738 ssize_t res;
3739 struct msghdr *m = &c->msglist[c->msgcurr];
3740
3741 res = sendmsg(c->sfd, m, 0);
3742 if (res > 0) {
3743 pthread_mutex_lock(&c->thread->stats.mutex);
3744 c->thread->stats.bytes_written += res;
3745 pthread_mutex_unlock(&c->thread->stats.mutex);
3746
3747 /* We've written some of the data. Remove the completed
3748 iovec entries from the list of pending writes. */
3749 while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
3750 res -= m->msg_iov->iov_len;
3751 m->msg_iovlen--;
3752 m->msg_iov++;
3753 }
3754
3755 /* Might have written just part of the last iovec entry;
3756 adjust it so the next write will do the rest. */
3757 if (res > 0) {
3758 m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
3759 m->msg_iov->iov_len -= res;
3760 }
3761 return TRANSMIT_INCOMPLETE;
3762 }
3763 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
3764 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3765 if (settings.verbose > 0)
3766 fprintf(stderr, "Couldn't update event\n");
3767 conn_set_state(c, conn_closing);
3768 return TRANSMIT_HARD_ERROR;
3769 }
3770 return TRANSMIT_SOFT_ERROR;
3771 }
3772 /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
3773 we have a real error, on which we close the connection */
3774 if (settings.verbose > 0)
3775 perror("Failed to write, and not due to blocking");
3776
3777 if (IS_UDP(c->transport))
3778 conn_set_state(c, conn_read);
3779 else
3780 conn_set_state(c, conn_closing);
3781 return TRANSMIT_HARD_ERROR;
3782 } else {
3783 return TRANSMIT_COMPLETE;
3784 }
3785 }
3786
/*
 * Per-connection state machine. event_handler() calls it when libevent
 * reports activity on c->sfd.
 *
 * It loops, dispatching on c->state, until a state sets `stop`. Typical
 * reasons are: the socket would block, a newly accepted client was handed
 * off, the connection was closed, or the per-event request budget
 * (settings.reqs_per_event) ran out.
 */
static void drive_machine(conn *c) {
    bool stop = false;
    int sfd, flags = 1;
    socklen_t addrlen;
    struct sockaddr_storage addr;
    /* Commands this connection may start before yielding to others. */
    int nreqs = settings.reqs_per_event;
    int res;
    const char *str;

    assert(c != NULL);

    while (!stop) {

        switch(c->state) {
        case conn_listening:
            /* Listening socket: accept one client and hand it off. */
            addrlen = sizeof(addr);
            if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    /* these are transient, so don't log anything */
                    stop = true;
                } else if (errno == EMFILE) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Too many open connections\n");
                    accept_new_conns(false);
                    stop = true;
                } else {
                    perror("accept()");
                    stop = true;
                }
                break;
            }
            if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
                fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
                perror("setting O_NONBLOCK");
                close(sfd);
                /* stop is not set: the loop re-enters conn_listening and
                   tries the next pending connection. */
                break;
            }

            if (settings.maxconns_fast &&
                stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
                /* Over the connection limit: tell the client and drop it.
                   NOTE(review): the write() result is assigned but never
                   checked, presumably only to silence unused-result
                   warnings. */
                str = "ERROR Too many open connections\r\n";
                res = write(sfd, str, strlen(str));
                close(sfd);
                STATS_LOCK();
                stats.rejected_conns++;
                STATS_UNLOCK();
            } else {
                dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
                                     DATA_BUFFER_SIZE, tcp_transport);
            }

            stop = true;
            break;

        case conn_waiting:
            /* Arm read events, then return to libevent until data arrives. */
            if (!update_event(c, EV_READ | EV_PERSIST)) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }

            conn_set_state(c, conn_read);
            stop = true;
            break;

        case conn_read:
            res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);

            switch (res) {
            case READ_NO_DATA_RECEIVED:
                conn_set_state(c, conn_waiting);
                break;
            case READ_DATA_RECEIVED:
                conn_set_state(c, conn_parse_cmd);
                break;
            case READ_ERROR:
                conn_set_state(c, conn_closing);
                break;
            case READ_MEMORY_ERROR: /* Failed to allocate more memory */
                /* State already set by try_read_network */
                break;
            default:
                assert(false);
                abort();
            }
            break;

        case conn_parse_cmd :
            if (try_read_command(c) == 0) {
                /* we need more data! */
                conn_set_state(c, conn_waiting);
            }

            break;

        case conn_new_cmd:
            /* Only process nreqs at a time to avoid starving other
               connections */

            --nreqs;
            if (nreqs >= 0) {
                reset_cmd_handler(c);
            } else {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.conn_yields++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rbytes > 0) {
                    /* We have already read in data into the input buffer,
                       so libevent will most likely not signal read events
                       on the socket (unless more data is available. As a
                       hack we should just put in a request to write data,
                       because that should be possible ;-)
                    */
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {
                        if (settings.verbose > 0)
                            fprintf(stderr, "Couldn't update event\n");
                        conn_set_state(c, conn_closing);
                    }
                }
                stop = true;
            }
            break;

        case conn_nread:
            /* Reading a fixed-length payload into c->ritem. c->rlbytes
               bytes remain; complete_nread() runs once they have arrived. */
            if (c->rlbytes == 0) {
                complete_nread(c);
                break;
            }
            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) {
                int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
                if (c->ritem != c->rcurr) {
                    memmove(c->ritem, c->rcurr, tocopy);
                }
                c->ritem += tocopy;
                c->rlbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                if (c->rlbytes == 0) {
                    break;
                }
            }

            /*  now try reading from the socket */
            res = read(c->sfd, c->ritem, c->rlbytes);
            if (res > 0) {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.bytes_read += res;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rcurr == c->ritem) {
                    c->rcurr += res;
                }
                c->ritem += res;
                c->rlbytes -= res;
                break;
            }
            if (res == 0) { /* end of stream */
                conn_set_state(c, conn_closing);
                break;
            }
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;
                break;
            }
            /* otherwise we have a real error, on which we close the connection */
            if (settings.verbose > 0) {
                fprintf(stderr, "Failed to read, and not due to blocking:\n"
                        "errno: %d %s \n"
                        "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
                        errno, strerror(errno),
                        (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
                        (int)c->rlbytes, (int)c->rsize);
            }
            conn_set_state(c, conn_closing);
            break;

        case conn_swallow:
            /* we are reading sbytes and throwing them away */
            if (c->sbytes == 0) {
                conn_set_state(c, conn_new_cmd);
                break;
            }

            /* first check if we have leftovers in the conn_read buffer */
            if (c->rbytes > 0) {
                int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
                c->sbytes -= tocopy;
                c->rcurr += tocopy;
                c->rbytes -= tocopy;
                break;
            }

            /*  now try reading from the socket */
            res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
            if (res > 0) {
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.bytes_read += res;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                c->sbytes -= res;
                break;
            }
            if (res == 0) { /* end of stream */
                conn_set_state(c, conn_closing);
                break;
            }
            if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
                if (!update_event(c, EV_READ | EV_PERSIST)) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't update event\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
                stop = true;
                break;
            }
            /* otherwise we have a real error, on which we close the connection */
            if (settings.verbose > 0)
                fprintf(stderr, "Failed to read, and not due to blocking\n");
            conn_set_state(c, conn_closing);
            break;

        case conn_write:
            /*
             * We want to write out a simple response. If we haven't already,
             * assemble it into a msgbuf list (this will be a single-entry
             * list for TCP or a two-entry list for UDP).
             */
            if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
                if (add_iov(c, c->wcurr, c->wbytes) != 0) {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Couldn't build response\n");
                    conn_set_state(c, conn_closing);
                    break;
                }
            }

            /* fall through... */

        case conn_mwrite:
            if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Failed to build UDP headers\n");
                conn_set_state(c, conn_closing);
                break;
            }
            switch (transmit(c)) {
            case TRANSMIT_COMPLETE:
                if (c->state == conn_mwrite) {
                    /* Release the item references (item_remove) and suffix
                       buffers (cache_free) still listed for this response. */
                    while (c->ileft > 0) {
                        item *it = *(c->icurr);
                        assert((it->it_flags & ITEM_SLABBED) == 0);
                        item_remove(it);
                        c->icurr++;
                        c->ileft--;
                    }
                    while (c->suffixleft > 0) {
                        char *suffix = *(c->suffixcurr);
                        cache_free(c->thread->suffix_cache, suffix);
                        c->suffixcurr++;
                        c->suffixleft--;
                    }
                    /* XXX:  I don't know why this wasn't the general case */
                    if(c->protocol == binary_prot) {
                        conn_set_state(c, c->write_and_go);
                    } else {
                        conn_set_state(c, conn_new_cmd);
                    }
                } else if (c->state == conn_write) {
                    if (c->write_and_free) {
                        free(c->write_and_free);
                        c->write_and_free = 0;
                    }
                    conn_set_state(c, c->write_and_go);
                } else {
                    if (settings.verbose > 0)
                        fprintf(stderr, "Unexpected state %d\n", c->state);
                    conn_set_state(c, conn_closing);
                }
                break;

            case TRANSMIT_INCOMPLETE:
            case TRANSMIT_HARD_ERROR:
                break;                   /* Continue in state machine. */

            case TRANSMIT_SOFT_ERROR:
                stop = true;
                break;
            default:
                assert(false);
                abort();
            }
            break;

        case conn_closing:
            /* UDP connections share one socket, so they are only cleaned
               up, not closed. */
            if (IS_UDP(c->transport))
                conn_cleanup(c);
            else
                conn_close(c);
            stop = true;
            break;

        case conn_max_state:
            /* NOTE(review): with NDEBUG this assert is a no-op and the loop
               would spin forever. conn_max_state is presumably a sentinel
               that is never assigned — confirm. */
            assert(false);
            break;
        default:
            assert(false);
            abort();
        }
    }

    return;
}
4106
4107 void event_handler(const int fd, const short which, void *arg) {
4108 conn *c;
4109
4110 c = (conn *)arg;
4111 assert(c != NULL);
4112
4113 c->which = which;
4114
4115 /* sanity */
4116 if (fd != c->sfd) {
4117 if (settings.verbose > 0)
4118 fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
4119 conn_close(c);
4120 return;
4121 }
4122
4123 drive_machine(c);
4124
4125 /* wait for next event */
4126 return;
4127 }
4128
4129 static int new_socket(struct addrinfo *ai) {
4130 int sfd;
4131 int flags;
4132
4133 if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
4134 return -1;
4135 }
4136
4137 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
4138 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
4139 perror("setting O_NONBLOCK");
4140 close(sfd);
4141 return -1;
4142 }
4143 return sfd;
4144 }
4145
4146
4147 /*
4148 * Sets a socket's send buffer size to the maximum allowed by the system.
4149 */
4150 static void maximize_sndbuf(const int sfd) {
4151 socklen_t intsize = sizeof(int);
4152 int last_good = 0;
4153 int min, max, avg;
4154 int old_size;
4155
4156 /* Start with the default size. */
4157 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
4158 if (settings.verbose > 0)
4159 perror("getsockopt(SO_SNDBUF)");
4160 return;
4161 }
4162
4163 /* Binary-search for the real maximum. */
4164 min = old_size;
4165 max = MAX_SENDBUF_SIZE;
4166
4167 while (min <= max) {
4168 avg = ((unsigned int)(min + max)) / 2;
4169 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
4170 last_good = avg;
4171 min = avg + 1;
4172 } else {
4173 max = avg - 1;
4174 }
4175 }
4176
4177 if (settings.verbose > 1)
4178 fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
4179 }
4180
/**
 * Create and register a server socket for every address getaddrinfo()
 * returns for the given interface and port.
 *
 * TCP sockets get SO_KEEPALIVE, SO_LINGER (l_onoff = 0) and TCP_NODELAY.
 * They are put into listening mode and wrapped in a conn_listening
 * connection on main_base, which is prepended to listen_conn. UDP sockets
 * get their send buffer enlarged with maximize_sndbuf(). Each is then
 * handed to settings.num_threads_per_udp worker connections in conn_read
 * state.
 *
 * @param interface the interface to bind to; passed straight to
 *        getaddrinfo() with AI_PASSIVE, so NULL means the wildcard address
 * @param port the port number to bind to; -1 is treated as 0
 * @param transport the transport protocol (TCP / UDP)
 * @param portnumber_file if non-NULL, the actual bound port of each IPv4
 *        or IPv6 socket is written here as "<TCP|UDP> INET[6]: <port>"
 * @return 0 if at least one socket was bound. 1 if none could be bound, or
 *         on a getaddrinfo() failure or a bind() error other than
 *         EADDRINUSE, or a listen() failure. The process exits if it runs
 *         out of file descriptors or cannot create a listening connection.
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct linger ling = {0, 0};
    struct addrinfo *ai;
    struct addrinfo *next;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    /* Non-zero "on" value shared by all boolean socket options below. */
    int flags =1;

    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    if (port == -1) {
        port = 0;
    }
    snprintf(port_buf, sizeof(port_buf), "%d", port);
    error= getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM)
          fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        else
          perror("getaddrinfo()");
        return 1;
    }

    for (next= ai; next; next= next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE) {
                /* ...unless we're out of fds */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }

#ifdef IPV6_V6ONLY
        /* NOTE(review): presumably keeps the IPv6 socket from also claiming
           the IPv4 port that a separate AF_INET entry will bind — confirm. */
        if (next->ai_family == AF_INET6) {
            error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
            if (error != 0) {
                perror("setsockopt");
                close(sfd);
                continue;
            }
        }
#endif

        /* Failure to set SO_REUSEADDR is reported but not fatal. */
        error = setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags));
        if (error != 0)
        {
            perror("setsockopt(SO_REUSEADDR)");
        }

        if (IS_UDP(transport)) {
            maximize_sndbuf(sfd);
        } else {
            error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");
        }

        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            /* EADDRINUSE only skips this address; any other error aborts. */
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            close(sfd);
            continue;
        } else {
            success++;
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                perror("listen()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            /* Report the actually bound port (relevant when port was 0). */
            if (portnumber_file != NULL &&
                (next->ai_addr->sa_family == AF_INET ||
                 next->ai_addr->sa_family == AF_INET6)) {
                union {
                    struct sockaddr_in in;
                    struct sockaddr_in6 in6;
                } my_sockaddr;
                socklen_t len = sizeof(my_sockaddr);
                if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
                    if (next->ai_addr->sa_family == AF_INET) {
                        fprintf(portnumber_file, "%s INET: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in.sin_port));
                    } else {
                        fprintf(portnumber_file, "%s INET6: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in6.sin6_port));
                    }
                }
            }
        }

        if (IS_UDP(transport)) {
            int c;

            for (c = 0; c < settings.num_threads_per_udp; c++) {
                /* this is guaranteed to hit all threads because we round-robin */
                dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
            }
        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}
4331
4332 static int server_sockets(int port, enum network_transport transport,
4333 FILE *portnumber_file) {
4334 if (settings.inter == NULL) {
4335 return server_socket(settings.inter, port, transport, portnumber_file);
4336 } else {
4337 // tokenize them and bind to each one of them..
4338 char *b;
4339 int ret = 0;
4340 char *list = strdup(settings.inter);
4341
4342 if (list == NULL) {
4343 fprintf(stderr, "Failed to allocate memory for parsing server interface string\n");
4344 return 1;
4345 }
4346 for (char *p = strtok_r(list, ";,", &b);
4347 p != NULL;
4348 p = strtok_r(NULL, ";,", &b)) {
4349 int the_port = port;
4350 char *s = strchr(p, ':');
4351 if (s != NULL) {
4352 *s = '\0';
4353 ++s;
4354 if (!safe_strtol(s, &the_port)) {
4355 fprintf(stderr, "Invalid port number: \"%s\"", s);
4356 return 1;
4357 }
4358 }
4359 if (strcmp(p, "*") == 0) {
4360 p = NULL;
4361 }
4362 ret |= server_socket(p, the_port, transport, portnumber_file);
4363 }
4364 free(list);
4365 return ret;
4366 }
4367 }
4368
/*
 * Open an AF_UNIX stream socket and switch it to non-blocking mode.
 *
 * @return the descriptor on success; -1 on failure, after printing a
 *         perror() diagnostic (a descriptor that could not be made
 *         non-blocking is closed before returning).
 */
static int new_socket_unix(void) {
    int fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    int mode = fcntl(fd, F_GETFL, 0);
    if (mode >= 0 && fcntl(fd, F_SETFL, mode | O_NONBLOCK) >= 0) {
        return fd;
    }

    perror("setting O_NONBLOCK");
    close(fd);
    return -1;
}
4386
4387 static int server_socket_unix(const char *path, int access_mask) {
4388 int sfd;
4389 struct linger ling = {0, 0};
4390 struct sockaddr_un addr;
4391 struct stat tstat;
4392 int flags =1;
4393 int old_umask;
4394
4395 if (!path) {
4396 return 1;
4397 }
4398
4399 if ((sfd = new_socket_unix()) == -1) {
4400 return 1;
4401 }
4402
4403 /*
4404 * Clean up a previous socket file if we left it around
4405 */
4406 if (lstat(path, &tstat) == 0) {
4407 if (S_ISSOCK(tstat.st_mode))
4408 unlink(path);
4409 }
4410
4411 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
4412 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
4413 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
4414
4415 /*
4416 * the memset call clears nonstandard fields in some impementations
4417 * that otherwise mess things up.
4418 */
4419 memset(&addr, 0, sizeof(addr));
4420
4421 addr.sun_family = AF_UNIX;
4422 strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
4423 assert(strcmp(addr.sun_path, path) == 0);
4424 old_umask = umask( ~(access_mask&0777));
4425 if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
4426 perror("bind()");
4427 close(sfd);
4428 umask(old_umask);
4429 return 1;
4430 }
4431 umask(old_umask);
4432 if (listen(sfd, settings.backlog) == -1) {
4433 perror("listen()");
4434 close(sfd);
4435 return 1;
4436 }
4437 if (!(listen_conn = conn_new(sfd, conn_listening,
4438 EV_READ | EV_PERSIST, 1,
4439 local_transport, main_base))) {
4440 fprintf(stderr, "failed to create listening connection\n");
4441 exit(EXIT_FAILURE);
4442 }
4443
4444 return 0;
4445 }
4446
4447 /*
4448 * We keep the current time of day in a global variable that's updated by a
4449 * timer event. This saves us a bunch of time() system calls (we really only
4450 * need to get the time once a second, whereas there can be tens of thousands
4451 * of requests a second) and allows us to use server-start-relative timestamps
4452 * rather than absolute UNIX timestamps, a space savings on systems where
4453 * sizeof(time_t) > sizeof(unsigned int).
4454 */
4455 volatile rel_time_t current_time;
4456 static struct event clockevent;
4457
4458 /* libevent uses a monotonic clock when available for event scheduling. Aside
4459 * from jitter, simply ticking our internal timer here is accurate enough.
4460 * Note that users who are setting explicit dates for expiration times *must*
4461 * ensure their clocks are correct before starting memcached. */
4462 static void clock_handler(const int fd, const short which, void *arg) {
4463 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
4464 static bool initialized = false;
4465 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4466 static bool monotonic = false;
4467 static time_t monotonic_start;
4468 #endif
4469
4470 if (initialized) {
4471 /* only delete the event if it's actually there. */
4472 evtimer_del(&clockevent);
4473 } else {
4474 initialized = true;
4475 /* process_started is initialized to time() - 2. We initialize to 1 so
4476 * flush_all won't underflow during tests. */
4477 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4478 struct timespec ts;
4479 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
4480 monotonic = true;
4481 monotonic_start = ts.tv_sec - 2;
4482 }
4483 #endif
4484 }
4485
4486 evtimer_set(&clockevent, clock_handler, 0);
4487 event_base_set(main_base, &clockevent);
4488 evtimer_add(&clockevent, &t);
4489
4490 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4491 if (monotonic) {
4492 struct timespec ts;
4493 if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1)
4494 return;
4495 current_time = (rel_time_t) (ts.tv_sec - monotonic_start);
4496 return;
4497 }
4498 #endif
4499 {
4500 struct timeval tv;
4501 gettimeofday(&tv, NULL);
4502 current_time = (rel_time_t) (tv.tv_sec - process_started);
4503 }
4504 }
4505
4506 static void usage(void) {
4507 printf(RPACKAGE " " RVERSION "\n");
4508 printf("-p <num> TCP port number to listen on (default: 11211)\n"
4509 "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
4510 "-s <file> UNIX socket path to listen on (disables network support)\n"
4511 "-a <mask> access mask for UNIX socket, in octal (default: 0700)\n"
4512 "-l <addr> interface to listen on (default: INADDR_ANY, all addresses)\n"
4513 " <addr> may be specified as host:port. If you don't specify\n"
4514 " a port number, the value you specified with -p or -U is\n"
4515 " used. You may specify multiple addresses separated by comma\n"
4516 " or by using -l multiple times\n"
4517
4518 "-d run as a daemon\n"
4519 "-r maximize core file limit\n"
4520 "-u <username> assume identity of <username> (only when run as root)\n"
4521 "-m <num> max memory to use for items in megabytes (default: 64 MB)\n"
4522 "-M return error on memory exhausted (rather than removing items)\n"
4523 "-c <num> max simultaneous connections (default: 1024)\n"
4524 "-k lock down all paged memory. Note that there is a\n"
4525 " limit on how much memory you may lock. Trying to\n"
4526 " allocate more than that would fail, so be sure you\n"
4527 " set the limit correctly for the user you started\n"
4528 " the daemon with (not for -u <username> user;\n"
4529 " under sh this is done with 'ulimit -S -l NUM_KB').\n"
4530 "-v verbose (print errors/warnings while in event loop)\n"
4531 "-vv very verbose (also print client commands/reponses)\n"
4532 "-vvv extremely verbose (also print internal state transitions)\n"
4533 "-h print this help and exit\n"
4534 "-i print memcached and libevent license\n"
4535 "-P <file> save PID in <file>, only used with -d option\n"
4536 "-f <factor> chunk size growth factor (default: 1.25)\n"
4537 "-n <bytes> minimum space allocated for key+value+flags (default: 48)\n");
4538 printf("-L Try to use large memory pages (if available). Increasing\n"
4539 " the memory page size could reduce the number of TLB misses\n"
4540 " and improve the performance. In order to get large pages\n"
4541 " from the OS, memcached will allocate the total item-cache\n"
4542 " in one large chunk.\n");
4543 printf("-D <char> Use <char> as the delimiter between key prefixes and IDs.\n"
4544 " This is used for per-prefix stats reporting. The default is\n"
4545 " \":\" (colon). If this option is specified, stats collection\n"
4546 " is turned on automatically; if not, then it may be turned on\n"
4547 " by sending the \"stats detail on\" command to the server.\n");
4548 printf("-t <num> number of threads to use (default: 4)\n");
4549 printf("-R Maximum number of requests per event, limits the number of\n"
4550 " requests process for a given connection to prevent \n"
4551 " starvation (default: 20)\n");
4552 printf("-C Disable use of CAS\n");
4553 printf("-b Set the backlog queue limit (default: 1024)\n");
4554 printf("-B Binding protocol - one of ascii, binary, or auto (default)\n");
4555 printf("-I Override the size of each slab page. Adjusts max item size\n"
4556 " (default: 1mb, min: 1k, max: 128m)\n");
4557 #ifdef ENABLE_SASL
4558 printf("-S Turn on Sasl authentication\n");
4559 #endif
4560 printf("-o Comma separated list of extended or experimental options\n"
4561 " - (EXPERIMENTAL) maxconns_fast: immediately close new\n"
4562 " connections if over maxconns limit\n"
4563 " - hashpower: An integer multiplier for how large the hash\n"
4564 " table should be. Can be grown at runtime if not big enough.\n"
4565 " Set this based on \"STAT hash_power_level\" before a \n"
4566 " restart.\n"
4567 );
4568 return;
4569 }
4570
4571 static void usage_license(void) {
4572 printf(RPACKAGE " " RVERSION "\n\n");
4573 printf(
4574 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
4575 "All rights reserved.\n"
4576 "\n"
4577 "Redistribution and use in source and binary forms, with or without\n"
4578 "modification, are permitted provided that the following conditions are\n"
4579 "met:\n"
4580 "\n"
4581 " * Redistributions of source code must retain the above copyright\n"
4582 "notice, this list of conditions and the following disclaimer.\n"
4583 "\n"
4584 " * Redistributions in binary form must reproduce the above\n"
4585 "copyright notice, this list of conditions and the following disclaimer\n"
4586 "in the documentation and/or other materials provided with the\n"
4587 "distribution.\n"
4588 "\n"
4589 " * Neither the name of the Danga Interactive nor the names of its\n"
4590 "contributors may be used to endorse or promote products derived from\n"
4591 "this software without specific prior written permission.\n"
4592 "\n"
4593 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
4594 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
4595 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
4596 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
4597 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
4598 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
4599 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4600 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4601 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4602 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
4603 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4604 "\n"
4605 "\n"
4606 "This product includes software developed by Niels Provos.\n"
4607 "\n"
4608 "[ libevent ]\n"
4609 "\n"
4610 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
4611 "All rights reserved.\n"
4612 "\n"
4613 "Redistribution and use in source and binary forms, with or without\n"
4614 "modification, are permitted provided that the following conditions\n"
4615 "are met:\n"
4616 "1. Redistributions of source code must retain the above copyright\n"
4617 " notice, this list of conditions and the following disclaimer.\n"
4618 "2. Redistributions in binary form must reproduce the above copyright\n"
4619 " notice, this list of conditions and the following disclaimer in the\n"
4620 " documentation and/or other materials provided with the distribution.\n"
4621 "3. All advertising materials mentioning features or use of this software\n"
4622 " must display the following acknowledgement:\n"
4623 " This product includes software developed by Niels Provos.\n"
4624 "4. The name of the author may not be used to endorse or promote products\n"
4625 " derived from this software without specific prior written permission.\n"
4626 "\n"
4627 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
4628 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
4629 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
4630 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
4631 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
4632 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4633 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4634 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4635 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
4636 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4637 );
4638
4639 return;
4640 }
4641
4642 static void save_pid(const char *pid_file) {
4643 FILE *fp;
4644 if (access(pid_file, F_OK) == 0) {
4645 if ((fp = fopen(pid_file, "r")) != NULL) {
4646 char buffer[1024];
4647 if (fgets(buffer, sizeof(buffer), fp) != NULL) {
4648 unsigned int pid;
4649 if (safe_strtoul(buffer, &pid) && kill((pid_t)pid, 0) == 0) {
4650 fprintf(stderr, "WARNING: The pid file contained the following (running) pid: %u\n", pid);
4651 }
4652 }
4653 fclose(fp);
4654 }
4655 }
4656
4657 if ((fp = fopen(pid_file, "w")) == NULL) {
4658 vperror("Could not open the pid file %s for writing", pid_file);
4659 return;
4660 }
4661
4662 fprintf(fp,"%ld\n", (long)getpid());
4663 if (fclose(fp) == -1) {
4664 vperror("Could not close the pid file %s", pid_file);
4665 }
4666 }
4667
/*
 * Delete the pid file, if one was configured. A failed unlink() is reported
 * through vperror() and otherwise ignored.
 */
static void remove_pidfile(const char *pid_file) {
    if (pid_file != NULL && unlink(pid_file) != 0) {
        vperror("Could not remove the pid file %s", pid_file);
    }
}
4677
/*
 * SIGINT handler (installed by main()): print a notice and exit with
 * EXIT_SUCCESS.
 *
 * printf() is not async-signal-safe; if the signal interrupts code holding
 * the stdout lock it can deadlock or corrupt the stream. The notice is
 * therefore emitted with write(2), which is async-signal-safe.
 * NOTE(review): exit() is not async-signal-safe either, but it is kept so
 * normal teardown (stdio flush, atexit handlers) still runs as before.
 */
static void sig_handler(const int sig) {
    static const char msg[] = "SIGINT handled.\n";
    ssize_t written;

    (void)sig;
    /* Best effort: nothing useful can be done if the write fails. */
    written = write(STDOUT_FILENO, msg, sizeof(msg) - 1);
    (void)written;
    exit(EXIT_SUCCESS);
}
4682
4683 #ifndef HAVE_SIGIGNORE
4684 static int sigignore(int sig) {
4685 struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
4686
4687 if (sigemptyset(&sa.sa_mask) == -1 || sigaction(sig, &sa, 0) == -1) {
4688 return -1;
4689 }
4690 return 0;
4691 }
4692 #endif
4693
4694
/*
 * On platforms with getpagesizes() and memcntl(), ask the kernel to back the
 * BSS/brk heap with the largest supported page size, which may reduce TLB
 * misses.
 *
 * @return 0 if large pages were configured or the feature is not compiled
 *         in; -1 if the page sizes could not be queried or memcntl()
 *         failed (a diagnostic is printed and the default page size stays).
 */
static int enable_large_pages(void) {
#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
    size_t page_sizes[32];
    int count = getpagesizes(page_sizes, 32);

    if (count == -1) {
        fprintf(stderr, "Failed to get supported pagesizes: %s\n",
                strerror(errno));
        fprintf(stderr, "Will use default page size\n");
        return -1;
    }

    size_t largest = page_sizes[0];
    for (int i = 1; i < count; i++) {
        largest = (page_sizes[i] > largest) ? page_sizes[i] : largest;
    }

    struct memcntl_mha advice = {0};
    advice.mha_cmd = MHA_MAPSIZE_BSSBRK;
    advice.mha_flags = 0;
    advice.mha_pagesize = largest;

    if (memcntl(0, 0, MC_HAT_ADVISE, (caddr_t)&advice, 0, 0) == -1) {
        fprintf(stderr, "Failed to set large pages: %s\n",
                strerror(errno));
        fprintf(stderr, "Will use default page size\n");
        return -1;
    }
    return 0;
#else
    /* Nothing to configure on this platform; report success. */
    return 0;
#endif
}
4737
/**
 * Do basic sanity check of the runtime environment.
 *
 * Currently this rejects libevent releases older than 1.3: any version
 * string "1.0", "1.1" or "1.2", optionally followed by a non-digit suffix
 * such as "1.0e" or "1.2a". Two-digit minors such as "1.10" are accepted.
 *
 * @return true if no errors found, false if we can't use this env
 */
static bool sanitycheck(void) {
    /* One of our biggest problems is old and bogus libevents */
    const char *ever = event_get_version();
    if (ever != NULL) {
        if (strncmp(ever, "1.", 2) == 0) {
            /* Require at least 1.3 (that's still a couple of years old).
             * The minor digit must not be followed by another digit so that
             * e.g. "1.10" is not mistaken for "1.1". The cast keeps isdigit()
             * defined for chars with the high bit set. */
            if (ever[2] >= '0' && ever[2] <= '2' &&
                !isdigit((unsigned char)ever[3])) {
                fprintf(stderr, "You are using libevent %s.\nPlease upgrade to"
                        " a more recent version (1.3 or newer)\n",
                        event_get_version());
                return false;
            }
        }
    }

    return true;
}
4759
4760 int main (int argc, char **argv) {
4761 int c;
4762 bool lock_memory = false;
4763 bool do_daemonize = false;
4764 bool preallocate = false;
4765 int maxcore = 0;
4766 char *username = NULL;
4767 char *pid_file = NULL;
4768 struct passwd *pw;
4769 struct rlimit rlim;
4770 char unit = '\0';
4771 int size_max = 0;
4772 int retval = EXIT_SUCCESS;
4773 /* listening sockets */
4774 static int *l_socket = NULL;
4775
4776 /* udp socket */
4777 static int *u_socket = NULL;
4778 bool protocol_specified = false;
4779 bool tcp_specified = false;
4780 bool udp_specified = false;
4781
4782 char *subopts;
4783 char *subopts_value;
4784 enum {
4785 MAXCONNS_FAST = 0,
4786 HASHPOWER_INIT,
4787 SLAB_REASSIGN,
4788 SLAB_AUTOMOVE
4789 };
4790 char *const subopts_tokens[] = {
4791 [MAXCONNS_FAST] = (char*)"maxconns_fast",
4792 [HASHPOWER_INIT] = (char*)"hashpower",
4793 [SLAB_REASSIGN] = (char*)"slab_reassign",
4794 [SLAB_AUTOMOVE] = (char*)"slab_automove",
4795 NULL
4796 };
4797
4798 if (!sanitycheck()) {
4799 return EX_OSERR;
4800 }
4801
4802 /* handle SIGINT */
4803 signal(SIGINT, sig_handler);
4804
4805 /* init settings */
4806 settings_init();
4807
4808 /* set stderr non-buffering (for running under, say, daemontools) */
4809 setbuf(stderr, NULL);
4810
4811 /* process arguments */
4812 while (-1 != (c = getopt(argc, argv,
4813 "a:" /* access mask for unix socket */
4814 "p:" /* TCP port number to listen on */
4815 "s:" /* unix socket path to listen on */
4816 "U:" /* UDP port number to listen on */
4817 "m:" /* max memory to use for items in megabytes */
4818 "M" /* return error on memory exhausted */
4819 "c:" /* max simultaneous connections */
4820 "k" /* lock down all paged memory */
4821 "hi" /* help, licence info */
4822 "r" /* maximize core file limit */
4823 "v" /* verbose */
4824 "d" /* daemon mode */
4825 "l:" /* interface to listen on */
4826 "u:" /* user identity to run as */
4827 "P:" /* save PID in file */
4828 "f:" /* factor? */
4829 "n:" /* minimum space allocated for key+value+flags */
4830 "t:" /* threads */
4831 "D:" /* prefix delimiter? */
4832 "L" /* Large memory pages */
4833 "R:" /* max requests per event */
4834 "C" /* Disable use of CAS */
4835 "b:" /* backlog queue limit */
4836 "B:" /* Binding protocol */
4837 "I:" /* Max item size */
4838 "S" /* Sasl ON */
4839 "o:" /* Extended generic options */
4840 ))) {
4841 switch (c) {
4842 case 'a':
4843 /* access for unix domain socket, as octal mask (like chmod)*/
4844 settings.access= strtol(optarg,NULL,8);
4845 break;
4846
4847 case 'U':
4848 settings.udpport = atoi(optarg);
4849 udp_specified = true;
4850 break;
4851 case 'p':
4852 settings.port = atoi(optarg);
4853 tcp_specified = true;
4854 break;
4855 case 's':
4856 settings.socketpath = optarg;
4857 break;
4858 case 'm':
4859 settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
4860 break;
4861 case 'M':
4862 settings.evict_to_free = 0;
4863 break;
4864 case 'c':
4865 settings.maxconns = atoi(optarg);
4866 break;
4867 case 'h':
4868 usage();
4869 exit(EXIT_SUCCESS);
4870 case 'i':
4871 usage_license();
4872 exit(EXIT_SUCCESS);
4873 case 'k':
4874 lock_memory = true;
4875 break;
4876 case 'v':
4877 settings.verbose++;
4878 break;
4879 case 'l':
4880 if (settings.inter != NULL) {
4881 size_t len = strlen(settings.inter) + strlen(optarg) + 2;
4882 char *p = malloc(len);
4883 if (p == NULL) {
4884 fprintf(stderr, "Failed to allocate memory\n");
4885 return 1;
4886 }
4887 snprintf(p, len, "%s,%s", settings.inter, optarg);
4888 free(settings.inter);
4889 settings.inter = p;
4890 } else {
4891 settings.inter= strdup(optarg);
4892 }
4893 break;
4894 case 'd':
4895 do_daemonize = true;
4896 break;
4897 case 'r':
4898 maxcore = 1;
4899 break;
4900 case 'R':
4901 settings.reqs_per_event = atoi(optarg);
4902 if (settings.reqs_per_event == 0) {
4903 fprintf(stderr, "Number of requests per event must be greater than 0\n");
4904 return 1;
4905 }
4906 break;
4907 case 'u':
4908 username = optarg;
4909 break;
4910 case 'P':
4911 pid_file = optarg;
4912 break;
4913 case 'f':
4914 settings.factor = atof(optarg);
4915 if (settings.factor <= 1.0) {
4916 fprintf(stderr, "Factor must be greater than 1\n");
4917 return 1;
4918 }
4919 break;
4920 case 'n':
4921 settings.chunk_size = atoi(optarg);
4922 if (settings.chunk_size == 0) {
4923 fprintf(stderr, "Chunk size must be greater than 0\n");
4924 return 1;
4925 }
4926 break;
4927 case 't':
4928 settings.num_threads = atoi(optarg);
4929 if (settings.num_threads <= 0) {
4930 fprintf(stderr, "Number of threads must be greater than 0\n");
4931 return 1;
4932 }
4933 /* There're other problems when you get above 64 threads.
4934 * In the future we should portably detect # of cores for the
4935 * default.
4936 */
4937 if (settings.num_threads > 64) {
4938 fprintf(stderr, "WARNING: Setting a high number of worker"
4939 "threads is not recommended.\n"
4940 " Set this value to the number of cores in"
4941 " your machine or less.\n");
4942 }
4943 break;
4944 case 'D':
4945 if (! optarg || ! optarg[0]) {
4946 fprintf(stderr, "No delimiter specified\n");
4947 return 1;
4948 }
4949 settings.prefix_delimiter = optarg[0];
4950 settings.detail_enabled = 1;
4951 break;
4952 case 'L' :
4953 if (enable_large_pages() == 0) {
4954 preallocate = true;
4955 }
4956 break;
4957 case 'C' :
4958 settings.use_cas = false;
4959 break;
4960 case 'b' :
4961 settings.backlog = atoi(optarg);
4962 break;
4963 case 'B':
4964 protocol_specified = true;
4965 if (strcmp(optarg, "auto") == 0) {
4966 settings.binding_protocol = negotiating_prot;
4967 } else if (strcmp(optarg, "binary") == 0) {
4968 settings.binding_protocol = binary_prot;
4969 } else if (strcmp(optarg, "ascii") == 0) {
4970 settings.binding_protocol = ascii_prot;
4971 } else {
4972 fprintf(stderr, "Invalid value for binding protocol: %s\n"
4973 " -- should be one of auto, binary, or ascii\n", optarg);
4974 exit(EX_USAGE);
4975 }
4976 break;
4977 case 'I':
4978 unit = optarg[strlen(optarg)-1];
4979 if (unit == 'k' || unit == 'm' ||
4980 unit == 'K' || unit == 'M') {
4981 optarg[strlen(optarg)-1] = '\0';
4982 size_max = atoi(optarg);
4983 if (unit == 'k' || unit == 'K')
4984 size_max *= 1024;
4985 if (unit == 'm' || unit == 'M')
4986 size_max *= 1024 * 1024;
4987 settings.item_size_max = size_max;
4988 } else {
4989 settings.item_size_max = atoi(optarg);
4990 }
4991 if (settings.item_size_max < 1024) {
4992 fprintf(stderr, "Item max size cannot be less than 1024 bytes.\n");
4993 return 1;
4994 }
4995 if (settings.item_size_max > 1024 * 1024 * 128) {
4996 fprintf(stderr, "Cannot set item size limit higher than 128 mb.\n");
4997 return 1;
4998 }
4999 if (settings.item_size_max > 1024 * 1024) {
5000 fprintf(stderr, "WARNING: Setting item max size above 1MB is not"
5001 " recommended!\n"
5002 " Raising this limit increases the minimum memory requirements\n"
5003 " and will decrease your memory efficiency.\n"
5004 );
5005 }
5006 break;
5007 case 'S': /* set Sasl authentication to true. Default is false */
5008 #ifndef ENABLE_SASL
5009 fprintf(stderr, "This server is not built with SASL support.\n");
5010 exit(EX_USAGE);
5011 #endif
5012 settings.sasl = true;
5013 break;
5014 case 'o': /* It's sub-opts time! */
5015 subopts = optarg;
5016
5017 while (*subopts != '\0') {
5018
5019 switch (getsubopt(&subopts, subopts_tokens, &subopts_value)) {
5020 case MAXCONNS_FAST:
5021 settings.maxconns_fast = true;
5022 break;
5023 case HASHPOWER_INIT:
5024 if (subopts_value == NULL) {
5025 fprintf(stderr, "Missing numeric argument for hashpower\n");
5026 return 1;
5027 }
5028 settings.hashpower_init = atoi(subopts_value);
5029 if (settings.hashpower_init < 12) {
5030 fprintf(stderr, "Initial hashtable multiplier of %d is too low\n",
5031 settings.hashpower_init);
5032 return 1;
5033 } else if (settings.hashpower_init > 64) {
5034 fprintf(stderr, "Initial hashtable multiplier of %d is too high\n"
5035 "Choose a value based on \"STAT hash_power_level\" from a running instance\n",
5036 settings.hashpower_init);
5037 return 1;
5038 }
5039 break;
5040 case SLAB_REASSIGN:
5041 settings.slab_reassign = true;
5042 break;
5043 case SLAB_AUTOMOVE:
5044 settings.slab_automove = true;
5045 break;
5046 default:
5047 printf("Illegal suboption \"%s\"\n", subopts_value);
5048 return 1;
5049 }
5050
5051 }
5052 break;
5053 default:
5054 fprintf(stderr, "Illegal argument \"%c\"\n", c);
5055 return 1;
5056 }
5057 }
5058
5059 /*
5060 * Use one workerthread to serve each UDP port if the user specified
5061 * multiple ports
5062 */
5063 if (settings.inter != NULL && strchr(settings.inter, ',')) {
5064 settings.num_threads_per_udp = 1;
5065 } else {
5066 settings.num_threads_per_udp = settings.num_threads;
5067 }
5068
5069 if (settings.sasl) {
5070 if (!protocol_specified) {
5071 settings.binding_protocol = binary_prot;
5072 } else {
5073 if (settings.binding_protocol != binary_prot) {
5074 fprintf(stderr, "ERROR: You cannot allow the ASCII protocol while using SASL.\n");
5075 exit(EX_USAGE);
5076 }
5077 }
5078 }
5079
5080 if (tcp_specified && !udp_specified) {
5081 settings.udpport = settings.port;
5082 } else if (udp_specified && !tcp_specified) {
5083 settings.port = settings.udpport;
5084 }
5085
5086 if (maxcore != 0) {
5087 struct rlimit rlim_new;
5088 /*
5089 * First try raising to infinity; if that fails, try bringing
5090 * the soft limit to the hard.
5091 */
5092 if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
5093 rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
5094 if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
5095 /* failed. try raising just to the old max */
5096 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
5097 (void)setrlimit(RLIMIT_CORE, &rlim_new);
5098 }
5099 }
5100 /*
5101 * getrlimit again to see what we ended up with. Only fail if
5102 * the soft limit ends up 0, because then no core files will be
5103 * created at all.
5104 */
5105
5106 if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
5107 fprintf(stderr, "failed to ensure corefile creation\n");
5108 exit(EX_OSERR);
5109 }
5110 }
5111
5112 /*
5113 * If needed, increase rlimits to allow as many connections
5114 * as needed.
5115 */
5116
5117 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
5118 fprintf(stderr, "failed to getrlimit number of files\n");
5119 exit(EX_OSERR);
5120 } else {
5121 rlim.rlim_cur = settings.maxconns;
5122 rlim.rlim_max = settings.maxconns;
5123 if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
5124 fprintf(stderr, "failed to set rlimit for open files. Try starting as root or requesting smaller maxconns value.\n");
5125 exit(EX_OSERR);
5126 }
5127 }
5128
5129 /* lose root privileges if we have them */
5130 if (getuid() == 0 || geteuid() == 0) {
5131 if (username == 0 || *username == '\0') {
5132 fprintf(stderr, "can't run as root without the -u switch\n");
5133 exit(EX_USAGE);
5134 }
5135 if ((pw = getpwnam(username)) == 0) {
5136 fprintf(stderr, "can't find the user %s to switch to\n", username);
5137 exit(EX_NOUSER);
5138 }
5139 if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
5140 fprintf(stderr, "failed to assume identity of user %s\n", username);
5141 exit(EX_OSERR);
5142 }
5143 }
5144
5145 /* Initialize Sasl if -S was specified */
5146 if (settings.sasl) {
5147 init_sasl();
5148 }
5149
5150 /* daemonize if requested */
5151 /* if we want to ensure our ability to dump core, don't chdir to / */
5152 if (do_daemonize) {
5153 if (sigignore(SIGHUP) == -1) {
5154 perror("Failed to ignore SIGHUP");
5155 }
5156 if (daemonize(maxcore, settings.verbose) == -1) {
5157 fprintf(stderr, "failed to daemon() in order to daemonize\n");
5158 exit(EXIT_FAILURE);
5159 }
5160 }
5161
5162 /* lock paged memory if needed */
5163 if (lock_memory) {
5164 #ifdef HAVE_MLOCKALL
5165 int res = mlockall(MCL_CURRENT | MCL_FUTURE);
5166 if (res != 0) {
5167 fprintf(stderr, "warning: -k invalid, mlockall() failed: %s\n",
5168 strerror(errno));
5169 }
5170 #else
5171 fprintf(stderr, "warning: -k invalid, mlockall() not supported on this platform. proceeding without.\n");
5172 #endif
5173 }
5174
5175 /* initialize main thread libevent instance */
5176 main_base = event_init();
5177
5178 /* initialize other stuff */
5179 stats_init();
5180 assoc_init(settings.hashpower_init);
5181 conn_init();
5182 slabs_init(settings.maxbytes, settings.factor, preallocate);
5183
5184 /*
5185 * ignore SIGPIPE signals; we can use errno == EPIPE if we
5186 * need that information
5187 */
5188 if (sigignore(SIGPIPE) == -1) {
5189 perror("failed to ignore SIGPIPE; sigaction");
5190 exit(EX_OSERR);
5191 }
5192 /* start up worker threads if MT mode */
5193 thread_init(settings.num_threads, main_base);
5194
5195 if (start_assoc_maintenance_thread() == -1) {
5196 exit(EXIT_FAILURE);
5197 }
5198
5199 if (settings.slab_reassign &&
5200 start_slab_maintenance_thread() == -1) {
5201 exit(EXIT_FAILURE);
5202 }
5203
5204 /* initialise clock event */
5205 clock_handler(0, 0, 0);
5206
5207 /* create unix mode sockets after dropping privileges */
5208 if (settings.socketpath != NULL) {
5209 errno = 0;
5210 if (server_socket_unix(settings.socketpath,settings.access)) {
5211 vperror("failed to listen on UNIX socket: %s", settings.socketpath);
5212 exit(EX_OSERR);
5213 }
5214 }
5215
5216 /* create the listening socket, bind it, and init */
5217 if (settings.socketpath == NULL) {
5218 const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
5219 char temp_portnumber_filename[PATH_MAX];
5220 FILE *portnumber_file = NULL;
5221
5222 if (portnumber_filename != NULL) {
5223 snprintf(temp_portnumber_filename,
5224 sizeof(temp_portnumber_filename),
5225 "%s.lck", portnumber_filename);
5226
5227 portnumber_file = fopen(temp_portnumber_filename, "a");
5228 if (portnumber_file == NULL) {
5229 fprintf(stderr, "Failed to open \"%s\": %s\n",
5230 temp_portnumber_filename, strerror(errno));
5231 }
5232 }
5233
5234 errno = 0;
5235 if (settings.port && server_sockets(settings.port, tcp_transport,
5236 portnumber_file)) {
5237 vperror("failed to listen on TCP port %d", settings.port);
5238 exit(EX_OSERR);
5239 }
5240
5241 /*
5242 * initialization order: first create the listening sockets
5243 * (may need root on low ports), then drop root if needed,
5244 * then daemonise if needed, then init libevent (in some cases
5245 * descriptors created by libevent wouldn't survive forking).
5246 */
5247
5248 /* create the UDP listening socket and bind it */
5249 errno = 0;
5250 if (settings.udpport && server_sockets(settings.udpport, udp_transport,
5251 portnumber_file)) {
5252 vperror("failed to listen on UDP port %d", settings.udpport);
5253 exit(EX_OSERR);
5254 }
5255
5256 if (portnumber_file) {
5257 fclose(portnumber_file);
5258 rename(temp_portnumber_filename, portnumber_filename);
5259 }
5260 }
5261
5262 /* Give the sockets a moment to open. I know this is dumb, but the error
5263 * is only an advisory.
5264 */
5265 usleep(1000);
5266 if (stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
5267 fprintf(stderr, "Maxconns setting is too low, use -c to increase.\n");
5268 exit(EXIT_FAILURE);
5269 }
5270
5271 if (pid_file != NULL) {
5272 save_pid(pid_file);
5273 }
5274
5275 /* Drop privileges no longer needed */
5276 drop_privileges();
5277
5278 /* enter the event loop */
5279 if (event_base_loop(main_base, 0) != 0) {
5280 retval = EXIT_FAILURE;
5281 }
5282
5283 stop_assoc_maintenance_thread();
5284
5285 /* remove the PID file if we're a daemon */
5286 #if 0
5287 if (do_daemonize)
5288 remove_pidfile(pid_file);
5289 #endif
5290 /* Clean up strdup() call for bind() address */
5291 if (settings.inter)
5292 free(settings.inter);
5293 if (l_socket)
5294 free(l_socket);
5295 if (u_socket)
5296 free(u_socket);
5297
5298 return retval;
5299 }