Update memcached, fix style in test cases.
[m6w6/libmemcached] / memcached / memcached.c
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * memcached - memory caching daemon
4 *
5 * http://www.danga.com/memcached/
6 *
7 * Copyright 2003 Danga Interactive, Inc. All rights reserved.
8 *
9 * Use and distribution licensed under the BSD license. See
10 * the LICENSE file for full text.
11 *
12 * Authors:
13 * Anatoly Vorobey <mellon@pobox.com>
14 * Brad Fitzpatrick <brad@danga.com>
15 */
16 #include "memcached.h"
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20 #include <signal.h>
21 #include <sys/resource.h>
22 #include <sys/uio.h>
23 #include <ctype.h>
24 #include <stdarg.h>
25
26 /* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h. */
28 #ifndef _P1003_1B_VISIBLE
29 #define _P1003_1B_VISIBLE
30 #endif
31 /* need this to get IOV_MAX on some platforms. */
32 #ifndef __need_IOV_MAX
33 #define __need_IOV_MAX
34 #endif
35 #include <pwd.h>
36 #include <sys/mman.h>
37 #include <fcntl.h>
38 #include <netinet/tcp.h>
39 #include <arpa/inet.h>
40 #include <errno.h>
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <string.h>
44 #include <time.h>
45 #include <assert.h>
46 #include <limits.h>
47 #include <sysexits.h>
48 #include <stddef.h>
49
50 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
51 #ifndef IOV_MAX
52 #if defined(__FreeBSD__) || defined(__APPLE__)
53 # define IOV_MAX 1024
54 #endif
55 #endif
56
57 /*
58 * forward declarations
59 */
60 static void drive_machine(conn *c);
61 static int new_socket(struct addrinfo *ai);
62 static int try_read_command(conn *c);
63
/* Result codes returned by try_read_network() and try_read_udp(). */
enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR,            /** an error occurred (on the socket) (or client closed connection) */
    READ_MEMORY_ERROR      /** failed to allocate more memory */
};
70
71 static enum try_read_result try_read_network(conn *c);
72 static enum try_read_result try_read_udp(conn *c);
73
74 static void conn_set_state(conn *c, enum conn_states state);
75
76 /* stats */
77 static void stats_init(void);
78 static void server_stats(ADD_STAT add_stats, conn *c);
79 static void process_stat_settings(ADD_STAT add_stats, void *c);
80
81
82 /* defaults */
83 static void settings_init(void);
84
85 /* event handling, network IO */
86 static void event_handler(const int fd, const short which, void *arg);
87 static void conn_close(conn *c);
88 static void conn_init(void);
89 static bool update_event(conn *c, const int new_flags);
90 static void complete_nread(conn *c);
91 static void process_command(conn *c, char *command);
92 static void write_and_free(conn *c, char *buf, int bytes);
93 static int ensure_iov_space(conn *c);
94 static int add_iov(conn *c, const void *buf, int len);
95 static int add_msghdr(conn *c);
96
97
98 static void conn_free(conn *c);
99
100 /** exported globals **/
101 struct stats stats;
102 struct settings settings;
103 time_t process_started; /* when the process was started */
104
105 struct slab_rebalance slab_rebal;
106 volatile int slab_rebalance_signal;
107
108 /** file scope variables **/
109 static conn *listen_conn = NULL;
110 static struct event_base *main_base;
111
/* Result codes returned by transmit(). */
enum transmit_result {
    TRANSMIT_COMPLETE,   /** All done writing. */
    TRANSMIT_INCOMPLETE, /** More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /** Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /** Can't write (c->state is set to conn_closing) */
};
118
119 static enum transmit_result transmit(conn *c);
120
121 /* This reduces the latency without adding lots of extra wiring to be able to
122 * notify the listener thread of when to listen again.
123 * Also, the clock timer could be broken out into its own thread and we
124 * can block the listener via a condition.
125 */
126 static volatile bool allow_new_conns = true;
127 static struct event maxconnsevent;
128 #ifndef __INTEL_COMPILER
129 #pragma GCC diagnostic ignored "-Wunused-parameter"
130 #endif
131 static void maxconns_handler(const int fd, const short which, void *arg) {
132 struct timeval t = {.tv_sec = 0, .tv_usec = 10000};
133
134 if (fd == -42 || allow_new_conns == false) {
135 /* reschedule in 10ms if we need to keep polling */
136 evtimer_set(&maxconnsevent, maxconns_handler, 0);
137 event_base_set(main_base, &maxconnsevent);
138 evtimer_add(&maxconnsevent, &t);
139 } else {
140 evtimer_del(&maxconnsevent);
141 accept_new_conns(true);
142 }
143 }
144
145 #define REALTIME_MAXDELTA 60*60*24*30
146
147 /*
148 * given time value that's either unix time or delta from current unix time, return
149 * unix time. Use the fact that delta can't exceed one month (and real time value can't
150 * be that low).
151 */
152 static rel_time_t realtime(const time_t exptime) {
153 /* no. of seconds in 30 days - largest possible delta exptime */
154
155 if (exptime == 0) return 0; /* 0 means never expire */
156
157 if (exptime > REALTIME_MAXDELTA) {
158 /* if item expiration is at/before the server started, give it an
159 expiration time of 1 second after the server started.
160 (because 0 means don't expire). without this, we'd
161 underflow and wrap around to some large value way in the
162 future, effectively making items expiring in the past
163 really expiring never */
164 if (exptime <= process_started)
165 return (rel_time_t)1;
166 return (rel_time_t)(exptime - process_started);
167 } else {
168 return (rel_time_t)(exptime + current_time);
169 }
170 }
171
172 static void stats_init(void) {
173 stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
174 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = stats.reclaimed = 0;
175 stats.touch_cmds = stats.touch_misses = stats.touch_hits = stats.rejected_conns = 0;
176 stats.curr_bytes = stats.listen_disabled_num = 0;
177 stats.hash_power_level = stats.hash_bytes = stats.hash_is_expanding = 0;
178 stats.expired_unfetched = stats.evicted_unfetched = 0;
179 stats.slabs_moved = 0;
180 stats.accepting_conns = true; /* assuming we start in this state. */
181 stats.slab_reassign_running = false;
182
183 /* make the time we started always be 2 seconds before we really
184 did, so time(0) - time.started is never zero. if so, things
185 like 'settings.oldest_live' which act as booleans as well as
186 values are now false in boolean context... */
187 process_started = time(0) - 2;
188 stats_prefix_init();
189 }
190
191 static void stats_reset(void) {
192 STATS_LOCK();
193 stats.total_items = stats.total_conns = 0;
194 stats.rejected_conns = 0;
195 stats.evictions = 0;
196 stats.reclaimed = 0;
197 stats.listen_disabled_num = 0;
198 stats_prefix_clear();
199 STATS_UNLOCK();
200 threadlocal_stats_reset();
201 item_stats_reset();
202 }
203
204 static void settings_init(void) {
205 settings.use_cas = true;
206 settings.access = 0700;
207 settings.port = 11211;
208 settings.udpport = 11211;
209 /* By default this string should be NULL for getaddrinfo() */
210 settings.inter = NULL;
211 settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
212 settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
213 settings.verbose = 0;
214 settings.oldest_live = 0;
215 settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
216 settings.socketpath = NULL; /* by default, not using a unix socket */
217 settings.factor = 1.25;
218 settings.chunk_size = 48; /* space for a modest key and value */
219 settings.num_threads = 4; /* N workers */
220 settings.num_threads_per_udp = 0;
221 settings.prefix_delimiter = ':';
222 settings.detail_enabled = 0;
223 settings.reqs_per_event = 20;
224 settings.backlog = 1024;
225 settings.binding_protocol = negotiating_prot;
226 settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
227 settings.maxconns_fast = false;
228 settings.hashpower_init = 0;
229 settings.slab_reassign = false;
230 settings.slab_automove = false;
231 }
232
233 /*
234 * Adds a message header to a connection.
235 *
236 * Returns 0 on success, -1 on out-of-memory.
237 */
238 static int add_msghdr(conn *c)
239 {
240 struct msghdr *msg;
241
242 assert(c != NULL);
243
244 if (c->msgsize == c->msgused) {
245 msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
246 if (! msg)
247 return -1;
248 c->msglist = msg;
249 c->msgsize *= 2;
250 }
251
252 msg = c->msglist + c->msgused;
253
254 /* this wipes msg_iovlen, msg_control, msg_controllen, and
255 msg_flags, the last 3 of which aren't defined on solaris: */
256 memset(msg, 0, sizeof(struct msghdr));
257
258 msg->msg_iov = &c->iov[c->iovused];
259
260 if (c->request_addr_size > 0) {
261 msg->msg_name = &c->request_addr;
262 msg->msg_namelen = c->request_addr_size;
263 }
264
265 c->msgbytes = 0;
266 c->msgused++;
267
268 if (IS_UDP(c->transport)) {
269 /* Leave room for the UDP header, which we'll fill in later. */
270 return add_iov(c, NULL, UDP_HEADER_SIZE);
271 }
272
273 return 0;
274 }
275
276
277 /*
278 * Free list management for connections.
279 */
280
281 static conn **freeconns;
282 static int freetotal;
283 static int freecurr;
284 /* Lock for connection freelist */
285 static pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
286
287
288 static void conn_init(void) {
289 freetotal = 200;
290 freecurr = 0;
291 if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
292 fprintf(stderr, "Failed to allocate connection structures\n");
293 }
294 return;
295 }
296
297 /*
298 * Returns a connection from the freelist, if any.
299 */
300 conn *conn_from_freelist() {
301 conn *c;
302
303 pthread_mutex_lock(&conn_lock);
304 if (freecurr > 0) {
305 c = freeconns[--freecurr];
306 } else {
307 c = NULL;
308 }
309 pthread_mutex_unlock(&conn_lock);
310
311 return c;
312 }
313
314 /*
315 * Adds a connection to the freelist. 0 = success.
316 */
317 bool conn_add_to_freelist(conn *c) {
318 bool ret = true;
319 pthread_mutex_lock(&conn_lock);
320 if (freecurr < freetotal) {
321 freeconns[freecurr++] = c;
322 ret = false;
323 } else {
324 /* try to enlarge free connections array */
325 size_t newsize = freetotal * 2;
326 conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
327 if (new_freeconns) {
328 freetotal = newsize;
329 freeconns = new_freeconns;
330 freeconns[freecurr++] = c;
331 ret = false;
332 }
333 }
334 pthread_mutex_unlock(&conn_lock);
335 return ret;
336 }
337
338 static const char *prot_text(enum protocol prot) {
339 const char *rv = "unknown";
340 switch(prot) {
341 case ascii_prot:
342 rv = "ascii";
343 break;
344 case binary_prot:
345 rv = "binary";
346 break;
347 case negotiating_prot:
348 rv = "auto-negotiate";
349 break;
350 default:
351 abort();
352 }
353 return rv;
354 }
355
356 conn *conn_new(const int sfd, enum conn_states init_state,
357 const int event_flags,
358 const int read_buffer_size, enum network_transport transport,
359 struct event_base *base) {
360 conn *c = conn_from_freelist();
361
362 if (NULL == c) {
363 if (!(c = (conn *)calloc(1, sizeof(conn)))) {
364 fprintf(stderr, "calloc()\n");
365 return NULL;
366 }
367 MEMCACHED_CONN_CREATE(c);
368
369 c->rbuf = c->wbuf = 0;
370 c->ilist = 0;
371 c->suffixlist = 0;
372 c->iov = 0;
373 c->msglist = 0;
374 c->hdrbuf = 0;
375
376 c->rsize = read_buffer_size;
377 c->wsize = DATA_BUFFER_SIZE;
378 c->isize = ITEM_LIST_INITIAL;
379 c->suffixsize = SUFFIX_LIST_INITIAL;
380 c->iovsize = IOV_LIST_INITIAL;
381 c->msgsize = MSG_LIST_INITIAL;
382 c->hdrsize = 0;
383
384 c->rbuf = (char *)malloc((size_t)c->rsize);
385 c->wbuf = (char *)malloc((size_t)c->wsize);
386 c->ilist = (item **)malloc(sizeof(item *) * c->isize);
387 c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
388 c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
389 c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
390
391 if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
392 c->msglist == 0 || c->suffixlist == 0) {
393 conn_free(c);
394 fprintf(stderr, "malloc()\n");
395 return NULL;
396 }
397
398 STATS_LOCK();
399 stats.conn_structs++;
400 STATS_UNLOCK();
401 }
402
403 c->transport = transport;
404 c->protocol = settings.binding_protocol;
405
406 /* unix socket mode doesn't need this, so zeroed out. but why
407 * is this done for every command? presumably for UDP
408 * mode. */
409 if (!settings.socketpath) {
410 c->request_addr_size = sizeof(c->request_addr);
411 } else {
412 c->request_addr_size = 0;
413 }
414
415 if (settings.verbose > 1) {
416 if (init_state == conn_listening) {
417 fprintf(stderr, "<%d server listening (%s)\n", sfd,
418 prot_text(c->protocol));
419 } else if (IS_UDP(transport)) {
420 fprintf(stderr, "<%d server listening (udp)\n", sfd);
421 } else if (c->protocol == negotiating_prot) {
422 fprintf(stderr, "<%d new auto-negotiating client connection\n",
423 sfd);
424 } else if (c->protocol == ascii_prot) {
425 fprintf(stderr, "<%d new ascii client connection.\n", sfd);
426 } else if (c->protocol == binary_prot) {
427 fprintf(stderr, "<%d new binary client connection.\n", sfd);
428 } else {
429 fprintf(stderr, "<%d new unknown (%d) client connection\n",
430 sfd, c->protocol);
431 assert(false);
432 }
433 }
434
435 c->sfd = sfd;
436 c->state = init_state;
437 c->rlbytes = 0;
438 c->cmd = -1;
439 c->rbytes = c->wbytes = 0;
440 c->wcurr = c->wbuf;
441 c->rcurr = c->rbuf;
442 c->ritem = 0;
443 c->icurr = c->ilist;
444 c->suffixcurr = c->suffixlist;
445 c->ileft = 0;
446 c->suffixleft = 0;
447 c->iovused = 0;
448 c->msgcurr = 0;
449 c->msgused = 0;
450
451 c->write_and_go = init_state;
452 c->write_and_free = 0;
453 c->item = 0;
454
455 c->noreply = false;
456
457 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
458 event_base_set(base, &c->event);
459 c->ev_flags = event_flags;
460
461 if (event_add(&c->event, 0) == -1) {
462 if (conn_add_to_freelist(c)) {
463 conn_free(c);
464 }
465 perror("event_add");
466 return NULL;
467 }
468
469 STATS_LOCK();
470 stats.curr_conns++;
471 stats.total_conns++;
472 STATS_UNLOCK();
473
474 MEMCACHED_CONN_ALLOCATE(c->sfd);
475
476 return c;
477 }
478
479 static void conn_cleanup(conn *c) {
480 assert(c != NULL);
481
482 if (c->item) {
483 item_remove(c->item);
484 c->item = 0;
485 }
486
487 if (c->ileft != 0) {
488 for (; c->ileft > 0; c->ileft--,c->icurr++) {
489 item_remove(*(c->icurr));
490 }
491 }
492
493 if (c->suffixleft != 0) {
494 for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
495 cache_free(c->thread->suffix_cache, *(c->suffixcurr));
496 }
497 }
498
499 if (c->write_and_free) {
500 free(c->write_and_free);
501 c->write_and_free = 0;
502 }
503
504 if (c->sasl_conn) {
505 assert(settings.sasl);
506 sasl_dispose(&c->sasl_conn);
507 c->sasl_conn = NULL;
508 }
509
510 if (IS_UDP(c->transport)) {
511 conn_set_state(c, conn_read);
512 }
513 }
514
515 /*
516 * Frees a connection.
517 */
518 void conn_free(conn *c) {
519 if (c) {
520 MEMCACHED_CONN_DESTROY(c);
521 if (c->hdrbuf)
522 free(c->hdrbuf);
523 if (c->msglist)
524 free(c->msglist);
525 if (c->rbuf)
526 free(c->rbuf);
527 if (c->wbuf)
528 free(c->wbuf);
529 if (c->ilist)
530 free(c->ilist);
531 if (c->suffixlist)
532 free(c->suffixlist);
533 if (c->iov)
534 free(c->iov);
535 free(c);
536 }
537 }
538
539 static void conn_close(conn *c) {
540 assert(c != NULL);
541
542 /* delete the event, the socket and the conn */
543 event_del(&c->event);
544
545 if (settings.verbose > 1)
546 fprintf(stderr, "<%d connection closed.\n", c->sfd);
547
548 MEMCACHED_CONN_RELEASE(c->sfd);
549 close(c->sfd);
550 pthread_mutex_lock(&conn_lock);
551 allow_new_conns = true;
552 pthread_mutex_unlock(&conn_lock);
553 conn_cleanup(c);
554
555 /* if the connection has big buffers, just free it */
556 if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
557 conn_free(c);
558 }
559
560 STATS_LOCK();
561 stats.curr_conns--;
562 STATS_UNLOCK();
563
564 return;
565 }
566
567 /*
568 * Shrinks a connection's buffers if they're too big. This prevents
569 * periodic large "get" requests from permanently chewing lots of server
570 * memory.
571 *
572 * This should only be called in between requests since it can wipe output
573 * buffers!
574 */
575 static void conn_shrink(conn *c) {
576 assert(c != NULL);
577
578 if (IS_UDP(c->transport))
579 return;
580
581 if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
582 char *newbuf;
583
584 if (c->rcurr != c->rbuf)
585 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
586
587 newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
588
589 if (newbuf) {
590 c->rbuf = newbuf;
591 c->rsize = DATA_BUFFER_SIZE;
592 }
593 /* TODO check other branch... */
594 c->rcurr = c->rbuf;
595 }
596
597 if (c->isize > ITEM_LIST_HIGHWAT) {
598 item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
599 if (newbuf) {
600 c->ilist = newbuf;
601 c->isize = ITEM_LIST_INITIAL;
602 }
603 /* TODO check error condition? */
604 }
605
606 if (c->msgsize > MSG_LIST_HIGHWAT) {
607 struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
608 if (newbuf) {
609 c->msglist = newbuf;
610 c->msgsize = MSG_LIST_INITIAL;
611 }
612 /* TODO check error condition? */
613 }
614
615 if (c->iovsize > IOV_LIST_HIGHWAT) {
616 struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
617 if (newbuf) {
618 c->iov = newbuf;
619 c->iovsize = IOV_LIST_INITIAL;
620 }
621 /* TODO check return value */
622 }
623 }
624
625 /**
626 * Convert a state name to a human readable form.
627 */
628 static const char *state_text(enum conn_states state) {
629 const char* const statenames[] = { "conn_listening",
630 "conn_new_cmd",
631 "conn_waiting",
632 "conn_read",
633 "conn_parse_cmd",
634 "conn_write",
635 "conn_nread",
636 "conn_swallow",
637 "conn_closing",
638 "conn_mwrite" };
639 return statenames[state];
640 }
641
642 #ifndef __INTEL_COMPILER
643 #pragma GCC diagnostic ignored "-Wtype-limits"
644 #endif
645 /*
646 * Sets a connection's current state in the state machine. Any special
647 * processing that needs to happen on certain state transitions can
648 * happen here.
649 */
650 static void conn_set_state(conn *c, enum conn_states state) {
651 assert(c != NULL);
652 assert(state >= conn_listening && state < conn_max_state);
653
654 if (state != c->state) {
655 if (settings.verbose > 2) {
656 fprintf(stderr, "%d: going from %s to %s\n",
657 c->sfd, state_text(c->state),
658 state_text(state));
659 }
660
661 if (state == conn_write || state == conn_mwrite) {
662 MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
663 }
664 c->state = state;
665 }
666 }
667
668 /*
669 * Ensures that there is room for another struct iovec in a connection's
670 * iov list.
671 *
672 * Returns 0 on success, -1 on out-of-memory.
673 */
674 static int ensure_iov_space(conn *c) {
675 assert(c != NULL);
676
677 if (c->iovused >= c->iovsize) {
678 int i, iovnum;
679 struct iovec *new_iov = (struct iovec *)realloc(c->iov,
680 (c->iovsize * 2) * sizeof(struct iovec));
681 if (! new_iov)
682 return -1;
683 c->iov = new_iov;
684 c->iovsize *= 2;
685
686 /* Point all the msghdr structures at the new list. */
687 for (i = 0, iovnum = 0; i < c->msgused; i++) {
688 c->msglist[i].msg_iov = &c->iov[iovnum];
689 iovnum += c->msglist[i].msg_iovlen;
690 }
691 }
692
693 return 0;
694 }
695
696
697 /*
698 * Adds data to the list of pending data that will be written out to a
699 * connection.
700 *
701 * Returns 0 on success, -1 on out-of-memory.
702 */
703
704 static int add_iov(conn *c, const void *buf, int len) {
705 struct msghdr *m;
706 int leftover;
707 bool limit_to_mtu;
708
709 assert(c != NULL);
710
711 do {
712 m = &c->msglist[c->msgused - 1];
713
714 /*
715 * Limit UDP packets, and the first payloads of TCP replies, to
716 * UDP_MAX_PAYLOAD_SIZE bytes.
717 */
718 limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
719
720 /* We may need to start a new msghdr if this one is full. */
721 if (m->msg_iovlen == IOV_MAX ||
722 (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
723 add_msghdr(c);
724 m = &c->msglist[c->msgused - 1];
725 }
726
727 if (ensure_iov_space(c) != 0)
728 return -1;
729
730 /* If the fragment is too big to fit in the datagram, split it up */
731 if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
732 leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
733 len -= leftover;
734 } else {
735 leftover = 0;
736 }
737
738 m = &c->msglist[c->msgused - 1];
739 m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
740 m->msg_iov[m->msg_iovlen].iov_len = len;
741
742 c->msgbytes += len;
743 c->iovused++;
744 m->msg_iovlen++;
745
746 buf = ((char *)buf) + len;
747 len = leftover;
748 } while (leftover > 0);
749
750 return 0;
751 }
752
753
754 /*
755 * Constructs a set of UDP headers and attaches them to the outgoing messages.
756 */
757 static int build_udp_headers(conn *c) {
758 int i;
759 unsigned char *hdr;
760
761 assert(c != NULL);
762
763 if (c->msgused > c->hdrsize) {
764 void *new_hdrbuf;
765 if (c->hdrbuf)
766 new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
767 else
768 new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
769 if (! new_hdrbuf)
770 return -1;
771 c->hdrbuf = (unsigned char *)new_hdrbuf;
772 c->hdrsize = c->msgused * 2;
773 }
774
775 hdr = c->hdrbuf;
776 for (i = 0; i < c->msgused; i++) {
777 c->msglist[i].msg_iov[0].iov_base = (void*)hdr;
778 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
779 *hdr++ = c->request_id / 256;
780 *hdr++ = c->request_id % 256;
781 *hdr++ = i / 256;
782 *hdr++ = i % 256;
783 *hdr++ = c->msgused / 256;
784 *hdr++ = c->msgused % 256;
785 *hdr++ = 0;
786 *hdr++ = 0;
787 assert((void *) hdr == (caddr_t)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
788 }
789
790 return 0;
791 }
792
793
794 #ifndef __INTEL_COMPILER
795 #pragma GCC diagnostic ignored "-Wsign-compare"
796 #endif
797 static void out_string(conn *c, const char *str) {
798 size_t len;
799
800 assert(c != NULL);
801
802 if (c->noreply) {
803 if (settings.verbose > 1)
804 fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
805 c->noreply = false;
806 conn_set_state(c, conn_new_cmd);
807 return;
808 }
809
810 if (settings.verbose > 1)
811 fprintf(stderr, ">%d %s\n", c->sfd, str);
812
813 /* Nuke a partial output... */
814 c->msgcurr = 0;
815 c->msgused = 0;
816 c->iovused = 0;
817 add_msghdr(c);
818
819 len = strlen(str);
820 if ((len + 2) > c->wsize) {
821 /* ought to be always enough. just fail for simplicity */
822 str = "SERVER_ERROR output line too long";
823 len = strlen(str);
824 }
825
826 memcpy(c->wbuf, str, len);
827 memcpy(c->wbuf + len, "\r\n", 2);
828 c->wbytes = len + 2;
829 c->wcurr = c->wbuf;
830
831 conn_set_state(c, conn_write);
832 c->write_and_go = conn_new_cmd;
833 return;
834 }
835
836 /*
837 * we get here after reading the value in set/add/replace commands. The command
838 * has been stored in c->cmd, and the item is ready in c->item.
839 */
840 static void complete_nread_ascii(conn *c) {
841 assert(c != NULL);
842
843 item *it = c->item;
844 int comm = c->cmd;
845 enum store_item_type ret;
846
847 pthread_mutex_lock(&c->thread->stats.mutex);
848 c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
849 pthread_mutex_unlock(&c->thread->stats.mutex);
850
851 if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
852 out_string(c, "CLIENT_ERROR bad data chunk");
853 } else {
854 ret = store_item(it, comm, c);
855
856 #ifdef ENABLE_DTRACE
857 uint64_t cas = ITEM_get_cas(it);
858 switch (c->cmd) {
859 case NREAD_ADD:
860 MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
861 (ret == 1) ? it->nbytes : -1, cas);
862 break;
863 case NREAD_REPLACE:
864 MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
865 (ret == 1) ? it->nbytes : -1, cas);
866 break;
867 case NREAD_APPEND:
868 MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
869 (ret == 1) ? it->nbytes : -1, cas);
870 break;
871 case NREAD_PREPEND:
872 MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
873 (ret == 1) ? it->nbytes : -1, cas);
874 break;
875 case NREAD_SET:
876 MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
877 (ret == 1) ? it->nbytes : -1, cas);
878 break;
879 case NREAD_CAS:
880 MEMCACHED_COMMAND_CAS(c->sfd, ITEM_key(it), it->nkey, it->nbytes,
881 cas);
882 break;
883 }
884 #endif
885
886 switch (ret) {
887 case STORED:
888 out_string(c, "STORED");
889 break;
890 case EXISTS:
891 out_string(c, "EXISTS");
892 break;
893 case NOT_FOUND:
894 out_string(c, "NOT_FOUND");
895 break;
896 case NOT_STORED:
897 out_string(c, "NOT_STORED");
898 break;
899 default:
900 out_string(c, "SERVER_ERROR Unhandled storage type.");
901 }
902
903 }
904
905 item_remove(c->item); /* release the c->item reference */
906 c->item = 0;
907 }
908
909 /**
910 * get a pointer to the start of the request struct for the current command
911 */
912 static void* binary_get_request(conn *c) {
913 char *ret = c->rcurr;
914 ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
915 c->binary_header.request.extlen);
916
917 assert(ret >= c->rbuf);
918 return ret;
919 }
920
921 /**
922 * get a pointer to the key in this request
923 */
924 static char* binary_get_key(conn *c) {
925 return c->rcurr - (c->binary_header.request.keylen);
926 }
927
928 static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
929 protocol_binary_response_header* header;
930
931 assert(c);
932
933 c->msgcurr = 0;
934 c->msgused = 0;
935 c->iovused = 0;
936 if (add_msghdr(c) != 0) {
937 /* XXX: out_string is inappropriate here */
938 out_string(c, "SERVER_ERROR out of memory");
939 return;
940 }
941
942 header = (protocol_binary_response_header *)c->wbuf;
943
944 header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
945 header->response.opcode = c->binary_header.request.opcode;
946 header->response.keylen = (uint16_t)htons(key_len);
947
948 header->response.extlen = (uint8_t)hdr_len;
949 header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
950 header->response.status = (uint16_t)htons(err);
951
952 header->response.bodylen = htonl(body_len);
953 header->response.opaque = c->opaque;
954 header->response.cas = htonll(c->cas);
955
956 if (settings.verbose > 1) {
957 int ii;
958 fprintf(stderr, ">%d Writing bin response:", c->sfd);
959 for (ii = 0; ii < sizeof(header->bytes); ++ii) {
960 if (ii % 4 == 0) {
961 fprintf(stderr, "\n>%d ", c->sfd);
962 }
963 fprintf(stderr, " 0x%02x", header->bytes[ii]);
964 }
965 fprintf(stderr, "\n");
966 }
967
968 add_iov(c, c->wbuf, sizeof(header->response));
969 }
970
971 static void write_bin_error(conn *c, protocol_binary_response_status err, int swallow) {
972 const char *errstr = "Unknown error";
973 size_t len;
974
975 switch (err) {
976 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
977 errstr = "Out of memory";
978 break;
979 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
980 errstr = "Unknown command";
981 break;
982 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
983 errstr = "Not found";
984 break;
985 case PROTOCOL_BINARY_RESPONSE_EINVAL:
986 errstr = "Invalid arguments";
987 break;
988 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
989 errstr = "Data exists for key.";
990 break;
991 case PROTOCOL_BINARY_RESPONSE_E2BIG:
992 errstr = "Too large.";
993 break;
994 case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
995 errstr = "Non-numeric server-side value for incr or decr";
996 break;
997 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
998 errstr = "Not stored.";
999 break;
1000 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
1001 errstr = "Auth failure.";
1002 break;
1003 default:
1004 assert(false);
1005 errstr = "UNHANDLED ERROR";
1006 fprintf(stderr, ">%d UNHANDLED ERROR: %d\n", c->sfd, err);
1007 }
1008
1009 if (settings.verbose > 1) {
1010 fprintf(stderr, ">%d Writing an error: %s\n", c->sfd, errstr);
1011 }
1012
1013 len = strlen(errstr);
1014 add_bin_header(c, err, 0, 0, len);
1015 if (len > 0) {
1016 add_iov(c, errstr, len);
1017 }
1018 conn_set_state(c, conn_mwrite);
1019 if(swallow > 0) {
1020 c->sbytes = swallow;
1021 c->write_and_go = conn_swallow;
1022 } else {
1023 c->write_and_go = conn_new_cmd;
1024 }
1025 }
1026
1027 /* Form and send a response to a command over the binary protocol */
1028 static void write_bin_response(conn *c, const void *d, int hlen, int keylen, int dlen) {
1029 if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1030 c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1031 add_bin_header(c, 0, hlen, keylen, dlen);
1032 if(dlen > 0) {
1033 add_iov(c, d, dlen);
1034 }
1035 conn_set_state(c, conn_mwrite);
1036 c->write_and_go = conn_new_cmd;
1037 } else {
1038 conn_set_state(c, conn_new_cmd);
1039 }
1040 }
1041
1042 static void complete_incr_bin(conn *c) {
1043 item *it;
1044 char *key;
1045 size_t nkey;
1046 /* Weird magic in add_delta forces me to pad here */
1047 char tmpbuf[INCR_MAX_STORAGE_LEN];
1048 uint64_t cas = 0;
1049
1050 protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
1051 protocol_binary_request_incr* req = binary_get_request(c);
1052
1053 assert(c != NULL);
1054 assert(c->wsize >= sizeof(*rsp));
1055
1056 /* fix byteorder in the request */
1057 req->message.body.delta = ntohll(req->message.body.delta);
1058 req->message.body.initial = ntohll(req->message.body.initial);
1059 req->message.body.expiration = ntohl(req->message.body.expiration);
1060 key = binary_get_key(c);
1061 nkey = c->binary_header.request.keylen;
1062
1063 if (settings.verbose > 1) {
1064 int i;
1065 fprintf(stderr, "incr ");
1066
1067 for (i = 0; i < nkey; i++) {
1068 fprintf(stderr, "%c", key[i]);
1069 }
1070 fprintf(stderr, " %lld, %llu, %d\n",
1071 (long long)req->message.body.delta,
1072 (long long)req->message.body.initial,
1073 req->message.body.expiration);
1074 }
1075
1076 if (c->binary_header.request.cas != 0) {
1077 cas = c->binary_header.request.cas;
1078 }
1079 switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
1080 req->message.body.delta, tmpbuf,
1081 &cas)) {
1082 case OK:
1083 rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10));
1084 if (cas) {
1085 c->cas = cas;
1086 }
1087 write_bin_response(c, &rsp->message.body, 0, 0,
1088 sizeof(rsp->message.body.value));
1089 break;
1090 case NON_NUMERIC:
1091 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, 0);
1092 break;
1093 case EOM:
1094 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1095 break;
1096 case DELTA_ITEM_NOT_FOUND:
1097 if (req->message.body.expiration != 0xffffffff) {
1098 /* Save some room for the response */
1099 rsp->message.body.value = htonll(req->message.body.initial);
1100 it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
1101 INCR_MAX_STORAGE_LEN);
1102
1103 if (it != NULL) {
1104 snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu",
1105 (unsigned long long)req->message.body.initial);
1106
1107 if (store_item(it, NREAD_ADD, c)) {
1108 c->cas = ITEM_get_cas(it);
1109 write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
1110 } else {
1111 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
1112 }
1113 item_remove(it); /* release our reference */
1114 } else {
1115 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1116 }
1117 } else {
1118 pthread_mutex_lock(&c->thread->stats.mutex);
1119 if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
1120 c->thread->stats.incr_misses++;
1121 } else {
1122 c->thread->stats.decr_misses++;
1123 }
1124 pthread_mutex_unlock(&c->thread->stats.mutex);
1125
1126 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1127 }
1128 break;
1129 case DELTA_ITEM_CAS_MISMATCH:
1130 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1131 break;
1132 }
1133 }
1134
1135 static void complete_update_bin(conn *c) {
1136 protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
1137 enum store_item_type ret = NOT_STORED;
1138 assert(c != NULL);
1139
1140 item *it = c->item;
1141
1142 pthread_mutex_lock(&c->thread->stats.mutex);
1143 c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
1144 pthread_mutex_unlock(&c->thread->stats.mutex);
1145
1146 /* We don't actually receive the trailing two characters in the bin
1147 * protocol, so we're going to just set them here */
1148 *(ITEM_data(it) + it->nbytes - 2) = '\r';
1149 *(ITEM_data(it) + it->nbytes - 1) = '\n';
1150
1151 ret = store_item(it, c->cmd, c);
1152
1153 #ifdef ENABLE_DTRACE
1154 uint64_t cas = ITEM_get_cas(it);
1155 switch (c->cmd) {
1156 case NREAD_ADD:
1157 MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
1158 (ret == STORED) ? it->nbytes : -1, cas);
1159 break;
1160 case NREAD_REPLACE:
1161 MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
1162 (ret == STORED) ? it->nbytes : -1, cas);
1163 break;
1164 case NREAD_APPEND:
1165 MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
1166 (ret == STORED) ? it->nbytes : -1, cas);
1167 break;
1168 case NREAD_PREPEND:
1169 MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
1170 (ret == STORED) ? it->nbytes : -1, cas);
1171 break;
1172 case NREAD_SET:
1173 MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
1174 (ret == STORED) ? it->nbytes : -1, cas);
1175 break;
1176 }
1177 #endif
1178
1179 switch (ret) {
1180 case STORED:
1181 /* Stored */
1182 write_bin_response(c, NULL, 0, 0, 0);
1183 break;
1184 case EXISTS:
1185 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1186 break;
1187 case NOT_FOUND:
1188 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1189 break;
1190 case NOT_STORED:
1191 if (c->cmd == NREAD_ADD) {
1192 eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1193 } else if(c->cmd == NREAD_REPLACE) {
1194 eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1195 } else {
1196 eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1197 }
1198 write_bin_error(c, eno, 0);
1199 }
1200
1201 item_remove(c->item); /* release the c->item reference */
1202 c->item = 0;
1203 }
1204
1205 static void process_bin_touch(conn *c) {
1206 item *it;
1207
1208 protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1209 char* key = binary_get_key(c);
1210 size_t nkey = c->binary_header.request.keylen;
1211 protocol_binary_request_touch *t = (void *)&c->binary_header;
1212 uint32_t exptime = ntohl(t->message.body.expiration);
1213
1214 if (settings.verbose > 1) {
1215 int ii;
1216 /* May be GAT/GATQ/etc */
1217 fprintf(stderr, "<%d TOUCH ", c->sfd);
1218 for (ii = 0; ii < nkey; ++ii) {
1219 fprintf(stderr, "%c", key[ii]);
1220 }
1221 fprintf(stderr, "\n");
1222 }
1223
1224 it = item_touch(key, nkey, realtime(exptime));
1225
1226 if (it) {
1227 /* the length has two unnecessary bytes ("\r\n") */
1228 uint16_t keylen = 0;
1229 uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
1230
1231 item_update(it);
1232 pthread_mutex_lock(&c->thread->stats.mutex);
1233 c->thread->stats.touch_cmds++;
1234 c->thread->stats.slab_stats[it->slabs_clsid].touch_hits++;
1235 pthread_mutex_unlock(&c->thread->stats.mutex);
1236
1237 MEMCACHED_COMMAND_TOUCH(c->sfd, ITEM_key(it), it->nkey,
1238 it->nbytes, ITEM_get_cas(it));
1239
1240 if (c->cmd == PROTOCOL_BINARY_CMD_TOUCH) {
1241 bodylen -= it->nbytes - 2;
1242 } else if (c->cmd == PROTOCOL_BINARY_CMD_GATK) {
1243 bodylen += nkey;
1244 keylen = nkey;
1245 }
1246
1247 add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
1248 rsp->message.header.response.cas = htonll(ITEM_get_cas(it));
1249
1250 // add the flags
1251 rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1252 add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1253
1254 if (c->cmd == PROTOCOL_BINARY_CMD_GATK) {
1255 add_iov(c, ITEM_key(it), nkey);
1256 }
1257
1258 /* Add the data minus the CRLF */
1259 if (c->cmd != PROTOCOL_BINARY_CMD_TOUCH) {
1260 add_iov(c, ITEM_data(it), it->nbytes - 2);
1261 }
1262
1263 conn_set_state(c, conn_mwrite);
1264 c->write_and_go = conn_new_cmd;
1265 /* Remember this command so we can garbage collect it later */
1266 c->item = it;
1267 } else {
1268 pthread_mutex_lock(&c->thread->stats.mutex);
1269 c->thread->stats.touch_cmds++;
1270 c->thread->stats.touch_misses++;
1271 pthread_mutex_unlock(&c->thread->stats.mutex);
1272
1273 MEMCACHED_COMMAND_TOUCH(c->sfd, key, nkey, -1, 0);
1274
1275 if (c->noreply) {
1276 conn_set_state(c, conn_new_cmd);
1277 } else {
1278 if (c->cmd == PROTOCOL_BINARY_CMD_GATK) {
1279 char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1280 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1281 0, nkey, nkey);
1282 memcpy(ofs, key, nkey);
1283 add_iov(c, ofs, nkey);
1284 conn_set_state(c, conn_mwrite);
1285 c->write_and_go = conn_new_cmd;
1286 } else {
1287 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1288 }
1289 }
1290 }
1291
1292 if (settings.detail_enabled) {
1293 stats_prefix_record_get(key, nkey, NULL != it);
1294 }
1295 }
1296
1297 static void process_bin_get(conn *c) {
1298 item *it;
1299
1300 protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1301 char* key = binary_get_key(c);
1302 size_t nkey = c->binary_header.request.keylen;
1303
1304 if (settings.verbose > 1) {
1305 int ii;
1306 fprintf(stderr, "<%d GET ", c->sfd);
1307 for (ii = 0; ii < nkey; ++ii) {
1308 fprintf(stderr, "%c", key[ii]);
1309 }
1310 fprintf(stderr, "\n");
1311 }
1312
1313 it = item_get(key, nkey);
1314 if (it) {
1315 /* the length has two unnecessary bytes ("\r\n") */
1316 uint16_t keylen = 0;
1317 uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
1318
1319 item_update(it);
1320 pthread_mutex_lock(&c->thread->stats.mutex);
1321 c->thread->stats.get_cmds++;
1322 c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
1323 pthread_mutex_unlock(&c->thread->stats.mutex);
1324
1325 MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
1326 it->nbytes, ITEM_get_cas(it));
1327
1328 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1329 bodylen += nkey;
1330 keylen = nkey;
1331 }
1332 add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
1333 rsp->message.header.response.cas = htonll(ITEM_get_cas(it));
1334
1335 // add the flags
1336 rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1337 add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1338
1339 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1340 add_iov(c, ITEM_key(it), nkey);
1341 }
1342
1343 /* Add the data minus the CRLF */
1344 add_iov(c, ITEM_data(it), it->nbytes - 2);
1345 conn_set_state(c, conn_mwrite);
1346 c->write_and_go = conn_new_cmd;
1347 /* Remember this command so we can garbage collect it later */
1348 c->item = it;
1349 } else {
1350 pthread_mutex_lock(&c->thread->stats.mutex);
1351 c->thread->stats.get_cmds++;
1352 c->thread->stats.get_misses++;
1353 pthread_mutex_unlock(&c->thread->stats.mutex);
1354
1355 MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1356
1357 if (c->noreply) {
1358 conn_set_state(c, conn_new_cmd);
1359 } else {
1360 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1361 char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1362 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1363 0, nkey, nkey);
1364 memcpy(ofs, key, nkey);
1365 add_iov(c, ofs, nkey);
1366 conn_set_state(c, conn_mwrite);
1367 c->write_and_go = conn_new_cmd;
1368 } else {
1369 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1370 }
1371 }
1372 }
1373
1374 if (settings.detail_enabled) {
1375 stats_prefix_record_get(key, nkey, NULL != it);
1376 }
1377 }
1378
1379 #ifndef __INTEL_COMPILER
1380 #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
1381 #endif
1382 static void append_bin_stats(const char *key, const uint16_t klen,
1383 const char *val, const uint32_t vlen,
1384 conn *c) {
1385 char *buf = c->stats.buffer + c->stats.offset;
1386 uint32_t bodylen = klen + vlen;
1387 protocol_binary_response_header header = {
1388 .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
1389 .response.opcode = (uint8_t)PROTOCOL_BINARY_CMD_STAT,
1390 .response.keylen = (uint16_t)htons(klen),
1391 .response.extlen = (uint8_t)0,
1392 .response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
1393 .response.status = (uint16_t)0,
1394 .response.bodylen = htonl(bodylen),
1395 .response.opaque = c->opaque,
1396 .response.cas = (uint64_t)0
1397 };
1398
1399 memcpy(buf, header.bytes, sizeof(header.response));
1400 buf += sizeof(header.response);
1401
1402 if (klen > 0) {
1403 memcpy(buf, key, klen);
1404 buf += klen;
1405
1406 if (vlen > 0) {
1407 memcpy(buf, val, vlen);
1408 }
1409 }
1410
1411 c->stats.offset += sizeof(header.response) + bodylen;
1412 }
1413
1414 static void append_ascii_stats(const char *key, const uint16_t klen,
1415 const char *val, const uint32_t vlen,
1416 conn *c) {
1417 char *pos = c->stats.buffer + c->stats.offset;
1418 uint32_t nbytes = 0;
1419 int remaining = c->stats.size - c->stats.offset;
1420 int room = remaining - 1;
1421
1422 if (klen == 0 && vlen == 0) {
1423 nbytes = snprintf(pos, room, "END\r\n");
1424 } else if (vlen == 0) {
1425 nbytes = snprintf(pos, room, "STAT %s\r\n", key);
1426 } else {
1427 nbytes = snprintf(pos, room, "STAT %s %s\r\n", key, val);
1428 }
1429
1430 c->stats.offset += nbytes;
1431 }
1432
1433 static bool grow_stats_buf(conn *c, size_t needed) {
1434 size_t nsize = c->stats.size;
1435 size_t available = nsize - c->stats.offset;
1436 bool rv = true;
1437
1438 /* Special case: No buffer -- need to allocate fresh */
1439 if (c->stats.buffer == NULL) {
1440 nsize = 1024;
1441 available = c->stats.size = c->stats.offset = 0;
1442 }
1443
1444 while (needed > available) {
1445 assert(nsize > 0);
1446 nsize = nsize << 1;
1447 available = nsize - c->stats.offset;
1448 }
1449
1450 if (nsize != c->stats.size) {
1451 char *ptr = realloc(c->stats.buffer, nsize);
1452 if (ptr) {
1453 c->stats.buffer = ptr;
1454 c->stats.size = nsize;
1455 } else {
1456 rv = false;
1457 }
1458 }
1459
1460 return rv;
1461 }
1462
1463 static void append_stats(const char *key, const uint16_t klen,
1464 const char *val, const uint32_t vlen,
1465 const void *cookie)
1466 {
1467 /* value without a key is invalid */
1468 if (klen == 0 && vlen > 0) {
1469 return ;
1470 }
1471
1472 conn *c = (conn*)cookie;
1473
1474 if (c->protocol == binary_prot) {
1475 size_t needed = vlen + klen + sizeof(protocol_binary_response_header);
1476 if (!grow_stats_buf(c, needed)) {
1477 return ;
1478 }
1479 append_bin_stats(key, klen, val, vlen, c);
1480 } else {
1481 size_t needed = vlen + klen + 10; // 10 == "STAT = \r\n"
1482 if (!grow_stats_buf(c, needed)) {
1483 return ;
1484 }
1485 append_ascii_stats(key, klen, val, vlen, c);
1486 }
1487
1488 assert(c->stats.offset <= c->stats.size);
1489 }
1490
1491 static void process_bin_stat(conn *c) {
1492 char *subcommand = binary_get_key(c);
1493 size_t nkey = c->binary_header.request.keylen;
1494
1495 if (settings.verbose > 1) {
1496 int ii;
1497 fprintf(stderr, "<%d STATS ", c->sfd);
1498 for (ii = 0; ii < nkey; ++ii) {
1499 fprintf(stderr, "%c", subcommand[ii]);
1500 }
1501 fprintf(stderr, "\n");
1502 }
1503
1504 if (nkey == 0) {
1505 /* request all statistics */
1506 server_stats(&append_stats, c);
1507 (void)get_stats(NULL, 0, &append_stats, c);
1508 } else if (strncmp(subcommand, "reset", 5) == 0) {
1509 stats_reset();
1510 } else if (strncmp(subcommand, "settings", 8) == 0) {
1511 process_stat_settings(&append_stats, c);
1512 } else if (strncmp(subcommand, "detail", 6) == 0) {
1513 char *subcmd_pos = subcommand + 6;
1514 if (strncmp(subcmd_pos, " dump", 5) == 0) {
1515 int len;
1516 char *dump_buf = stats_prefix_dump(&len);
1517 if (dump_buf == NULL || len <= 0) {
1518 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1519 return ;
1520 } else {
1521 append_stats("detailed", strlen("detailed"), dump_buf, len, c);
1522 free(dump_buf);
1523 }
1524 } else if (strncmp(subcmd_pos, " on", 3) == 0) {
1525 settings.detail_enabled = 1;
1526 } else if (strncmp(subcmd_pos, " off", 4) == 0) {
1527 settings.detail_enabled = 0;
1528 } else {
1529 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1530 return;
1531 }
1532 } else {
1533 if (get_stats(subcommand, nkey, &append_stats, c)) {
1534 if (c->stats.buffer == NULL) {
1535 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1536 } else {
1537 write_and_free(c, c->stats.buffer, c->stats.offset);
1538 c->stats.buffer = NULL;
1539 }
1540 } else {
1541 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1542 }
1543
1544 return;
1545 }
1546
1547 /* Append termination package and start the transfer */
1548 append_stats(NULL, 0, NULL, 0, c);
1549 if (c->stats.buffer == NULL) {
1550 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1551 } else {
1552 write_and_free(c, c->stats.buffer, c->stats.offset);
1553 c->stats.buffer = NULL;
1554 }
1555 }
1556
1557 static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1558 assert(c);
1559 c->substate = next_substate;
1560 c->rlbytes = c->keylen + extra;
1561
1562 /* Ok... do we have room for the extras and the key in the input buffer? */
1563 ptrdiff_t offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
1564 if (c->rlbytes > c->rsize - offset) {
1565 size_t nsize = c->rsize;
1566 size_t size = c->rlbytes + sizeof(protocol_binary_request_header);
1567
1568 while (size > nsize) {
1569 nsize *= 2;
1570 }
1571
1572 if (nsize != c->rsize) {
1573 if (settings.verbose > 1) {
1574 fprintf(stderr, "%d: Need to grow buffer from %lu to %lu\n",
1575 c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
1576 }
1577 char *newm = realloc(c->rbuf, nsize);
1578 if (newm == NULL) {
1579 if (settings.verbose) {
1580 fprintf(stderr, "%d: Failed to grow buffer.. closing connection\n",
1581 c->sfd);
1582 }
1583 conn_set_state(c, conn_closing);
1584 return;
1585 }
1586
1587 c->rbuf= newm;
1588 /* rcurr should point to the same offset in the packet */
1589 c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
1590 c->rsize = nsize;
1591 }
1592 if (c->rbuf != c->rcurr) {
1593 memmove(c->rbuf, c->rcurr, c->rbytes);
1594 c->rcurr = c->rbuf;
1595 if (settings.verbose > 1) {
1596 fprintf(stderr, "%d: Repack input buffer\n", c->sfd);
1597 }
1598 }
1599 }
1600
1601 /* preserve the header in the buffer.. */
1602 c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
1603 conn_set_state(c, conn_nread);
1604 }
1605
1606 /* Just write an error message and disconnect the client */
1607 static void handle_binary_protocol_error(conn *c) {
1608 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
1609 if (settings.verbose) {
1610 fprintf(stderr, "Protocol error (opcode %02x), close connection %d\n",
1611 c->binary_header.request.opcode, c->sfd);
1612 }
1613 c->write_and_go = conn_closing;
1614 }
1615
1616 static void init_sasl_conn(conn *c) {
1617 assert(c);
1618 /* should something else be returned? */
1619 if (!settings.sasl)
1620 return;
1621
1622 if (!c->sasl_conn) {
1623 int result=sasl_server_new("memcached",
1624 NULL,
1625 my_sasl_hostname[0] ? my_sasl_hostname : NULL,
1626 NULL, NULL,
1627 NULL, 0, &c->sasl_conn);
1628 if (result != SASL_OK) {
1629 if (settings.verbose) {
1630 fprintf(stderr, "Failed to initialize SASL conn.\n");
1631 }
1632 c->sasl_conn = NULL;
1633 }
1634 }
1635 }
1636
1637 static void bin_list_sasl_mechs(conn *c) {
1638 // Guard against a disabled SASL.
1639 if (!settings.sasl) {
1640 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND,
1641 c->binary_header.request.bodylen
1642 - c->binary_header.request.keylen);
1643 return;
1644 }
1645
1646 init_sasl_conn(c);
1647 const char *result_string = NULL;
1648 unsigned int string_length = 0;
1649 int result=sasl_listmech(c->sasl_conn, NULL,
1650 "", /* What to prepend the string with */
1651 " ", /* What to separate mechanisms with */
1652 "", /* What to append to the string */
1653 &result_string, &string_length,
1654 NULL);
1655 if (result != SASL_OK) {
1656 /* Perhaps there's a better error for this... */
1657 if (settings.verbose) {
1658 fprintf(stderr, "Failed to list SASL mechanisms.\n");
1659 }
1660 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
1661 return;
1662 }
1663 write_bin_response(c, (char*)result_string, 0, 0, string_length);
1664 }
1665
1666 static void process_bin_sasl_auth(conn *c) {
1667 // Guard for handling disabled SASL on the server.
1668 if (!settings.sasl) {
1669 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND,
1670 c->binary_header.request.bodylen
1671 - c->binary_header.request.keylen);
1672 return;
1673 }
1674
1675 assert(c->binary_header.request.extlen == 0);
1676
1677 int nkey = c->binary_header.request.keylen;
1678 int vlen = c->binary_header.request.bodylen - nkey;
1679
1680 if (nkey > MAX_SASL_MECH_LEN) {
1681 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
1682 c->write_and_go = conn_swallow;
1683 return;
1684 }
1685
1686 char *key = binary_get_key(c);
1687 assert(key);
1688
1689 item *it = item_alloc(key, nkey, 0, 0, vlen);
1690
1691 if (it == 0) {
1692 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
1693 c->write_and_go = conn_swallow;
1694 return;
1695 }
1696
1697 c->item = it;
1698 c->ritem = ITEM_data(it);
1699 c->rlbytes = vlen;
1700 conn_set_state(c, conn_nread);
1701 c->substate = bin_reading_sasl_auth_data;
1702 }
1703
1704 static void process_bin_complete_sasl_auth(conn *c) {
1705 assert(settings.sasl);
1706 const char *out = NULL;
1707 unsigned int outlen = 0;
1708
1709 assert(c->item);
1710 init_sasl_conn(c);
1711
1712 int nkey = c->binary_header.request.keylen;
1713 int vlen = c->binary_header.request.bodylen - nkey;
1714
1715 char mech[nkey+1];
1716 memcpy(mech, ITEM_key((item*)c->item), nkey);
1717 mech[nkey] = 0x00;
1718
1719 if (settings.verbose)
1720 fprintf(stderr, "mech: ``%s'' with %d bytes of data\n", mech, vlen);
1721
1722 const char *challenge = vlen == 0 ? NULL : ITEM_data((item*) c->item);
1723
1724 int result=-1;
1725
1726 switch (c->cmd) {
1727 case PROTOCOL_BINARY_CMD_SASL_AUTH:
1728 result = sasl_server_start(c->sasl_conn, mech,
1729 challenge, vlen,
1730 &out, &outlen);
1731 break;
1732 case PROTOCOL_BINARY_CMD_SASL_STEP:
1733 result = sasl_server_step(c->sasl_conn,
1734 challenge, vlen,
1735 &out, &outlen);
1736 break;
1737 default:
1738 assert(false); /* CMD should be one of the above */
1739 /* This code is pretty much impossible, but makes the compiler
1740 happier */
1741 if (settings.verbose) {
1742 fprintf(stderr, "Unhandled command %d with challenge %s\n",
1743 c->cmd, challenge);
1744 }
1745 break;
1746 }
1747
1748 item_unlink(c->item);
1749
1750 if (settings.verbose) {
1751 fprintf(stderr, "sasl result code: %d\n", result);
1752 }
1753
1754 switch(result) {
1755 case SASL_OK:
1756 write_bin_response(c, "Authenticated", 0, 0, strlen("Authenticated"));
1757 pthread_mutex_lock(&c->thread->stats.mutex);
1758 c->thread->stats.auth_cmds++;
1759 pthread_mutex_unlock(&c->thread->stats.mutex);
1760 break;
1761 case SASL_CONTINUE:
1762 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0, outlen);
1763 if(outlen > 0) {
1764 add_iov(c, out, outlen);
1765 }
1766 conn_set_state(c, conn_mwrite);
1767 c->write_and_go = conn_new_cmd;
1768 break;
1769 default:
1770 if (settings.verbose)
1771 fprintf(stderr, "Unknown sasl response: %d\n", result);
1772 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
1773 pthread_mutex_lock(&c->thread->stats.mutex);
1774 c->thread->stats.auth_cmds++;
1775 c->thread->stats.auth_errors++;
1776 pthread_mutex_unlock(&c->thread->stats.mutex);
1777 }
1778 }
1779
1780 static bool authenticated(conn *c) {
1781 assert(settings.sasl);
1782 bool rv = false;
1783
1784 switch (c->cmd) {
1785 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: /* FALLTHROUGH */
1786 case PROTOCOL_BINARY_CMD_SASL_AUTH: /* FALLTHROUGH */
1787 case PROTOCOL_BINARY_CMD_SASL_STEP: /* FALLTHROUGH */
1788 case PROTOCOL_BINARY_CMD_VERSION: /* FALLTHROUGH */
1789 rv = true;
1790 break;
1791 default:
1792 if (c->sasl_conn) {
1793 const void *uname = NULL;
1794 sasl_getprop(c->sasl_conn, SASL_USERNAME, &uname);
1795 rv = uname != NULL;
1796 }
1797 }
1798
1799 if (settings.verbose > 1) {
1800 fprintf(stderr, "authenticated() in cmd 0x%02x is %s\n",
1801 c->cmd, rv ? "true" : "false");
1802 }
1803
1804 return rv;
1805 }
1806
1807 static void dispatch_bin_command(conn *c) {
1808 int protocol_error = 0;
1809
1810 int extlen = c->binary_header.request.extlen;
1811 int keylen = c->binary_header.request.keylen;
1812 uint32_t bodylen = c->binary_header.request.bodylen;
1813
1814 if (settings.sasl && !authenticated(c)) {
1815 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
1816 c->write_and_go = conn_closing;
1817 return;
1818 }
1819
1820 MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
1821 c->noreply = true;
1822
1823 /* binprot supports 16bit keys, but internals are still 8bit */
1824 if (keylen > KEY_MAX_LENGTH) {
1825 handle_binary_protocol_error(c);
1826 return;
1827 }
1828
1829 switch (c->cmd) {
1830 case PROTOCOL_BINARY_CMD_SETQ:
1831 c->cmd = PROTOCOL_BINARY_CMD_SET;
1832 break;
1833 case PROTOCOL_BINARY_CMD_ADDQ:
1834 c->cmd = PROTOCOL_BINARY_CMD_ADD;
1835 break;
1836 case PROTOCOL_BINARY_CMD_REPLACEQ:
1837 c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
1838 break;
1839 case PROTOCOL_BINARY_CMD_DELETEQ:
1840 c->cmd = PROTOCOL_BINARY_CMD_DELETE;
1841 break;
1842 case PROTOCOL_BINARY_CMD_INCREMENTQ:
1843 c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
1844 break;
1845 case PROTOCOL_BINARY_CMD_DECREMENTQ:
1846 c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
1847 break;
1848 case PROTOCOL_BINARY_CMD_QUITQ:
1849 c->cmd = PROTOCOL_BINARY_CMD_QUIT;
1850 break;
1851 case PROTOCOL_BINARY_CMD_FLUSHQ:
1852 c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
1853 break;
1854 case PROTOCOL_BINARY_CMD_APPENDQ:
1855 c->cmd = PROTOCOL_BINARY_CMD_APPEND;
1856 break;
1857 case PROTOCOL_BINARY_CMD_PREPENDQ:
1858 c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
1859 break;
1860 case PROTOCOL_BINARY_CMD_GETQ:
1861 c->cmd = PROTOCOL_BINARY_CMD_GET;
1862 break;
1863 case PROTOCOL_BINARY_CMD_GETKQ:
1864 c->cmd = PROTOCOL_BINARY_CMD_GETK;
1865 break;
1866 case PROTOCOL_BINARY_CMD_GATQ:
1867 c->cmd = PROTOCOL_BINARY_CMD_GAT;
1868 break;
1869 case PROTOCOL_BINARY_CMD_GATKQ:
1870 c->cmd = PROTOCOL_BINARY_CMD_GAT;
1871 break;
1872 default:
1873 c->noreply = false;
1874 }
1875
1876 switch (c->cmd) {
1877 case PROTOCOL_BINARY_CMD_VERSION:
1878 if (extlen == 0 && keylen == 0 && bodylen == 0) {
1879 write_bin_response(c, RVERSION, 0, 0, strlen(RVERSION));
1880 } else {
1881 protocol_error = 1;
1882 }
1883 break;
1884 case PROTOCOL_BINARY_CMD_FLUSH:
1885 if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
1886 bin_read_key(c, bin_read_flush_exptime, extlen);
1887 } else {
1888 protocol_error = 1;
1889 }
1890 break;
1891 case PROTOCOL_BINARY_CMD_NOOP:
1892 if (extlen == 0 && keylen == 0 && bodylen == 0) {
1893 write_bin_response(c, NULL, 0, 0, 0);
1894 } else {
1895 protocol_error = 1;
1896 }
1897 break;
1898 case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
1899 case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
1900 case PROTOCOL_BINARY_CMD_REPLACE:
1901 if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
1902 bin_read_key(c, bin_reading_set_header, 8);
1903 } else {
1904 protocol_error = 1;
1905 }
1906 break;
1907 case PROTOCOL_BINARY_CMD_GETQ: /* FALLTHROUGH */
1908 case PROTOCOL_BINARY_CMD_GET: /* FALLTHROUGH */
1909 case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
1910 case PROTOCOL_BINARY_CMD_GETK:
1911 if (extlen == 0 && bodylen == keylen && keylen > 0) {
1912 bin_read_key(c, bin_reading_get_key, 0);
1913 } else {
1914 protocol_error = 1;
1915 }
1916 break;
1917 case PROTOCOL_BINARY_CMD_DELETE:
1918 if (keylen > 0 && extlen == 0 && bodylen == keylen) {
1919 bin_read_key(c, bin_reading_del_header, extlen);
1920 } else {
1921 protocol_error = 1;
1922 }
1923 break;
1924 case PROTOCOL_BINARY_CMD_INCREMENT:
1925 case PROTOCOL_BINARY_CMD_DECREMENT:
1926 if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
1927 bin_read_key(c, bin_reading_incr_header, 20);
1928 } else {
1929 protocol_error = 1;
1930 }
1931 break;
1932 case PROTOCOL_BINARY_CMD_APPEND:
1933 case PROTOCOL_BINARY_CMD_PREPEND:
1934 if (keylen > 0 && extlen == 0) {
1935 bin_read_key(c, bin_reading_set_header, 0);
1936 } else {
1937 protocol_error = 1;
1938 }
1939 break;
1940 case PROTOCOL_BINARY_CMD_STAT:
1941 if (extlen == 0) {
1942 bin_read_key(c, bin_reading_stat, 0);
1943 } else {
1944 protocol_error = 1;
1945 }
1946 break;
1947 case PROTOCOL_BINARY_CMD_QUIT:
1948 if (keylen == 0 && extlen == 0 && bodylen == 0) {
1949 write_bin_response(c, NULL, 0, 0, 0);
1950 c->write_and_go = conn_closing;
1951 if (c->noreply) {
1952 conn_set_state(c, conn_closing);
1953 }
1954 } else {
1955 protocol_error = 1;
1956 }
1957 break;
1958 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
1959 if (extlen == 0 && keylen == 0 && bodylen == 0) {
1960 bin_list_sasl_mechs(c);
1961 } else {
1962 protocol_error = 1;
1963 }
1964 break;
1965 case PROTOCOL_BINARY_CMD_SASL_AUTH:
1966 case PROTOCOL_BINARY_CMD_SASL_STEP:
1967 if (extlen == 0 && keylen != 0) {
1968 bin_read_key(c, bin_reading_sasl_auth, 0);
1969 } else {
1970 protocol_error = 1;
1971 }
1972 break;
1973 case PROTOCOL_BINARY_CMD_TOUCH:
1974 case PROTOCOL_BINARY_CMD_GAT:
1975 case PROTOCOL_BINARY_CMD_GATQ:
1976 case PROTOCOL_BINARY_CMD_GATK:
1977 case PROTOCOL_BINARY_CMD_GATKQ:
1978 if (extlen == 4 && keylen != 0) {
1979 bin_read_key(c, bin_reading_touch_key, 4);
1980 } else {
1981 protocol_error = 1;
1982 }
1983 break;
1984 default:
1985 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, bodylen);
1986 }
1987
1988 if (protocol_error)
1989 handle_binary_protocol_error(c);
1990 }
1991
1992 static void process_bin_update(conn *c) {
1993 char *key;
1994 int nkey;
1995 int vlen;
1996 item *it;
1997 protocol_binary_request_set* req = binary_get_request(c);
1998
1999 assert(c != NULL);
2000
2001 key = binary_get_key(c);
2002 nkey = c->binary_header.request.keylen;
2003
2004 /* fix byteorder in the request */
2005 req->message.body.flags = ntohl(req->message.body.flags);
2006 req->message.body.expiration = ntohl(req->message.body.expiration);
2007
2008 vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);
2009
2010 if (settings.verbose > 1) {
2011 int ii;
2012 if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
2013 fprintf(stderr, "<%d ADD ", c->sfd);
2014 } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
2015 fprintf(stderr, "<%d SET ", c->sfd);
2016 } else {
2017 fprintf(stderr, "<%d REPLACE ", c->sfd);
2018 }
2019 for (ii = 0; ii < nkey; ++ii) {
2020 fprintf(stderr, "%c", key[ii]);
2021 }
2022
2023 fprintf(stderr, " Value len is %d", vlen);
2024 fprintf(stderr, "\n");
2025 }
2026
2027 if (settings.detail_enabled) {
2028 stats_prefix_record_set(key, nkey);
2029 }
2030
2031 it = item_alloc(key, nkey, req->message.body.flags,
2032 realtime(req->message.body.expiration), vlen+2);
2033
2034 if (it == 0) {
2035 if (! item_size_ok(nkey, req->message.body.flags, vlen + 2)) {
2036 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
2037 } else {
2038 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
2039 }
2040
2041 /* Avoid stale data persisting in cache because we failed alloc.
2042 * Unacceptable for SET. Anywhere else too? */
2043 if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
2044 it = item_get(key, nkey);
2045 if (it) {
2046 item_unlink(it);
2047 item_remove(it);
2048 }
2049 }
2050
2051 /* swallow the data line */
2052 c->write_and_go = conn_swallow;
2053 return;
2054 }
2055
2056 ITEM_set_cas(it, c->binary_header.request.cas);
2057
2058 switch (c->cmd) {
2059 case PROTOCOL_BINARY_CMD_ADD:
2060 c->cmd = NREAD_ADD;
2061 break;
2062 case PROTOCOL_BINARY_CMD_SET:
2063 c->cmd = NREAD_SET;
2064 break;
2065 case PROTOCOL_BINARY_CMD_REPLACE:
2066 c->cmd = NREAD_REPLACE;
2067 break;
2068 default:
2069 assert(0);
2070 }
2071
2072 if (ITEM_get_cas(it) != 0) {
2073 c->cmd = NREAD_CAS;
2074 }
2075
2076 c->item = it;
2077 c->ritem = ITEM_data(it);
2078 c->rlbytes = vlen;
2079 conn_set_state(c, conn_nread);
2080 c->substate = bin_read_set_value;
2081 }
2082
2083 static void process_bin_append_prepend(conn *c) {
2084 char *key;
2085 int nkey;
2086 int vlen;
2087 item *it;
2088
2089 assert(c != NULL);
2090
2091 key = binary_get_key(c);
2092 nkey = c->binary_header.request.keylen;
2093 vlen = c->binary_header.request.bodylen - nkey;
2094
2095 if (settings.verbose > 1) {
2096 fprintf(stderr, "Value len is %d\n", vlen);
2097 }
2098
2099 if (settings.detail_enabled) {
2100 stats_prefix_record_set(key, nkey);
2101 }
2102
2103 it = item_alloc(key, nkey, 0, 0, vlen+2);
2104
2105 if (it == 0) {
2106 if (! item_size_ok(nkey, 0, vlen + 2)) {
2107 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
2108 } else {
2109 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
2110 }
2111 /* swallow the data line */
2112 c->write_and_go = conn_swallow;
2113 return;
2114 }
2115
2116 ITEM_set_cas(it, c->binary_header.request.cas);
2117
2118 switch (c->cmd) {
2119 case PROTOCOL_BINARY_CMD_APPEND:
2120 c->cmd = NREAD_APPEND;
2121 break;
2122 case PROTOCOL_BINARY_CMD_PREPEND:
2123 c->cmd = NREAD_PREPEND;
2124 break;
2125 default:
2126 assert(0);
2127 }
2128
2129 c->item = it;
2130 c->ritem = ITEM_data(it);
2131 c->rlbytes = vlen;
2132 conn_set_state(c, conn_nread);
2133 c->substate = bin_read_set_value;
2134 }
2135
2136 static void process_bin_flush(conn *c) {
2137 time_t exptime = 0;
2138 protocol_binary_request_flush* req = binary_get_request(c);
2139
2140 if (c->binary_header.request.extlen == sizeof(req->message.body)) {
2141 exptime = ntohl(req->message.body.expiration);
2142 }
2143
2144 if (exptime > 0) {
2145 settings.oldest_live = realtime(exptime) - 1;
2146 } else {
2147 settings.oldest_live = current_time - 1;
2148 }
2149 item_flush_expired();
2150
2151 pthread_mutex_lock(&c->thread->stats.mutex);
2152 c->thread->stats.flush_cmds++;
2153 pthread_mutex_unlock(&c->thread->stats.mutex);
2154
2155 write_bin_response(c, NULL, 0, 0, 0);
2156 }
2157
2158 static void process_bin_delete(conn *c) {
2159 item *it;
2160
2161 protocol_binary_request_delete* req = binary_get_request(c);
2162
2163 char* key = binary_get_key(c);
2164 size_t nkey = c->binary_header.request.keylen;
2165
2166 assert(c != NULL);
2167
2168 if (settings.verbose > 1) {
2169 fprintf(stderr, "Deleting %s\n", key);
2170 }
2171
2172 if (settings.detail_enabled) {
2173 stats_prefix_record_delete(key, nkey);
2174 }
2175
2176 it = item_get(key, nkey);
2177 if (it) {
2178 uint64_t cas = ntohll(req->message.header.request.cas);
2179 if (cas == 0 || cas == ITEM_get_cas(it)) {
2180 MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
2181 pthread_mutex_lock(&c->thread->stats.mutex);
2182 c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
2183 pthread_mutex_unlock(&c->thread->stats.mutex);
2184 item_unlink(it);
2185 write_bin_response(c, NULL, 0, 0, 0);
2186 } else {
2187 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
2188 }
2189 item_remove(it); /* release our reference */
2190 } else {
2191 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
2192 pthread_mutex_lock(&c->thread->stats.mutex);
2193 c->thread->stats.delete_misses++;
2194 pthread_mutex_unlock(&c->thread->stats.mutex);
2195 }
2196 }
2197
2198 static void complete_nread_binary(conn *c) {
2199 assert(c != NULL);
2200 assert(c->cmd >= 0);
2201
2202 switch(c->substate) {
2203 case bin_reading_set_header:
2204 if (c->cmd == PROTOCOL_BINARY_CMD_APPEND ||
2205 c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
2206 process_bin_append_prepend(c);
2207 } else {
2208 process_bin_update(c);
2209 }
2210 break;
2211 case bin_read_set_value:
2212 complete_update_bin(c);
2213 break;
2214 case bin_reading_get_key:
2215 process_bin_get(c);
2216 break;
2217 case bin_reading_touch_key:
2218 process_bin_touch(c);
2219 break;
2220 case bin_reading_stat:
2221 process_bin_stat(c);
2222 break;
2223 case bin_reading_del_header:
2224 process_bin_delete(c);
2225 break;
2226 case bin_reading_incr_header:
2227 complete_incr_bin(c);
2228 break;
2229 case bin_read_flush_exptime:
2230 process_bin_flush(c);
2231 break;
2232 case bin_reading_sasl_auth:
2233 process_bin_sasl_auth(c);
2234 break;
2235 case bin_reading_sasl_auth_data:
2236 process_bin_complete_sasl_auth(c);
2237 break;
2238 default:
2239 fprintf(stderr, "Not handling substate %d\n", c->substate);
2240 assert(0);
2241 }
2242 }
2243
2244 static void reset_cmd_handler(conn *c) {
2245 c->cmd = -1;
2246 c->substate = bin_no_state;
2247 if(c->item != NULL) {
2248 item_remove(c->item);
2249 c->item = NULL;
2250 }
2251 conn_shrink(c);
2252 if (c->rbytes > 0) {
2253 conn_set_state(c, conn_parse_cmd);
2254 } else {
2255 conn_set_state(c, conn_waiting);
2256 }
2257 }
2258
2259 static void complete_nread(conn *c) {
2260 assert(c != NULL);
2261 assert(c->protocol == ascii_prot
2262 || c->protocol == binary_prot);
2263
2264 if (c->protocol == ascii_prot) {
2265 complete_nread_ascii(c);
2266 } else if (c->protocol == binary_prot) {
2267 complete_nread_binary(c);
2268 }
2269 }
2270
2271 /*
2272 * Stores an item in the cache according to the semantics of one of the set
2273 * commands. In threaded mode, this is protected by the cache lock.
2274 *
2275 * Returns the state of storage.
2276 */
2277 enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
2278 char *key = ITEM_key(it);
2279 item *old_it = do_item_get(key, it->nkey, hv);
2280 enum store_item_type stored = NOT_STORED;
2281
2282 item *new_it = NULL;
2283 int flags;
2284
2285 if (old_it != NULL && comm == NREAD_ADD) {
2286 /* add only adds a nonexistent item, but promote to head of LRU */
2287 do_item_update(old_it);
2288 } else if (!old_it && (comm == NREAD_REPLACE
2289 || comm == NREAD_APPEND || comm == NREAD_PREPEND))
2290 {
2291 /* replace only replaces an existing value; don't store */
2292 } else if (comm == NREAD_CAS) {
2293 /* validate cas operation */
2294 if(old_it == NULL) {
2295 // LRU expired
2296 stored = NOT_FOUND;
2297 pthread_mutex_lock(&c->thread->stats.mutex);
2298 c->thread->stats.cas_misses++;
2299 pthread_mutex_unlock(&c->thread->stats.mutex);
2300 }
2301 else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
2302 // cas validates
2303 // it and old_it may belong to different classes.
2304 // I'm updating the stats for the one that's getting pushed out
2305 pthread_mutex_lock(&c->thread->stats.mutex);
2306 c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
2307 pthread_mutex_unlock(&c->thread->stats.mutex);
2308
2309 item_replace(old_it, it, hv);
2310 stored = STORED;
2311 } else {
2312 pthread_mutex_lock(&c->thread->stats.mutex);
2313 c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
2314 pthread_mutex_unlock(&c->thread->stats.mutex);
2315
2316 if(settings.verbose > 1) {
2317 fprintf(stderr, "CAS: failure: expected %llu, got %llu\n",
2318 (unsigned long long)ITEM_get_cas(old_it),
2319 (unsigned long long)ITEM_get_cas(it));
2320 }
2321 stored = EXISTS;
2322 }
2323 } else {
2324 /*
2325 * Append - combine new and old record into single one. Here it's
2326 * atomic and thread-safe.
2327 */
2328 if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
2329 /*
2330 * Validate CAS
2331 */
2332 if (ITEM_get_cas(it) != 0) {
2333 // CAS much be equal
2334 if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
2335 stored = EXISTS;
2336 }
2337 }
2338
2339 if (stored == NOT_STORED) {
2340 /* we have it and old_it here - alloc memory to hold both */
2341 /* flags was already lost - so recover them from ITEM_suffix(it) */
2342
2343 flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
2344
2345 new_it = item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
2346
2347 if (new_it == NULL) {
2348 /* SERVER_ERROR out of memory */
2349 if (old_it != NULL)
2350 do_item_remove(old_it);
2351
2352 return NOT_STORED;
2353 }
2354
2355 /* copy data from it and old_it to new_it */
2356
2357 if (comm == NREAD_APPEND) {
2358 memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
2359 memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
2360 } else {
2361 /* NREAD_PREPEND */
2362 memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
2363 memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
2364 }
2365
2366 it = new_it;
2367 }
2368 }
2369
2370 if (stored == NOT_STORED) {
2371 if (old_it != NULL)
2372 item_replace(old_it, it, hv);
2373 else
2374 do_item_link(it, hv);
2375
2376 c->cas = ITEM_get_cas(it);
2377
2378 stored = STORED;
2379 }
2380 }
2381
2382 if (old_it != NULL)
2383 do_item_remove(old_it); /* release our reference */
2384 if (new_it != NULL)
2385 do_item_remove(new_it);
2386
2387 if (stored == STORED) {
2388 c->cas = ITEM_get_cas(it);
2389 }
2390
2391 return stored;
2392 }
2393
/* One space-delimited token of an ASCII command line. */
typedef struct token_s {
    char *value;    /* start of the token inside the command buffer */
    size_t length;  /* token length in bytes; 0 marks the terminal token */
} token_t;

/* Positions of well-known tokens in a tokenized command. */
#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1

/* Capacity of a token array, terminal token included. */
#define MAX_TOKENS 8

/*
 * Tokenize the command string by replacing spaces with '\0' and update
 * the token array tokens with pointer to start of each token and length.
 * Returns total number of tokens, terminal token included. At most
 * max_tokens entries of tokens are written.
 *
 * The last valid token is the terminal token: its length is zero and its
 * value is NULL when the whole string was consumed, otherwise it points to
 * the first unprocessed character of the string.
 *
 * Usage example:
 *
 *  size_t n = tokenize_command(command, tokens, MAX_TOKENS);
 *  for (size_t ix = 0; tokens[ix].length != 0; ix++) {
 *      ...
 *  }
 *  // tokens[n - 1] is the terminal token; when its value is not NULL,
 *  // call tokenize_command() on that value to continue with the rest.
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *s, *e;
    size_t ntokens = 0;
    size_t len;
    size_t i;

    /* validate the arguments before strlen() touches the string */
    assert(command != NULL && tokens != NULL && max_tokens > 1);

    len = strlen(command);

    s = e = command;
    for (i = 0; i < len; i++) {
        if (*e == ' ') {
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
                *e = '\0';
                if (ntokens == max_tokens - 1) {
                    e++;
                    s = e; /* so we don't add an extra token */
                    break;
                }
            }
            s = e + 1;
        }
        e++;
    }

    /* trailing token that was ended by the end of the string, not a space */
    if (s != e) {
        tokens[ntokens].value = s;
        tokens[ntokens].length = e - s;
        ntokens++;
    }

    /*
     * If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    tokens[ntokens].value =  *e == '\0' ? NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;

    return ntokens;
}
2465
2466 /* set up a connection to write a buffer then free it, used for stats */
2467 static void write_and_free(conn *c, char *buf, int bytes) {
2468 if (buf) {
2469 c->write_and_free = buf;
2470 c->wcurr = buf;
2471 c->wbytes = bytes;
2472 conn_set_state(c, conn_write);
2473 c->write_and_go = conn_new_cmd;
2474 } else {
2475 out_string(c, "SERVER_ERROR out of memory writing stats");
2476 }
2477 }
2478
2479 static inline bool set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
2480 {
2481 int noreply_index = ntokens - 2;
2482
2483 /*
2484 NOTE: this function is not the first place where we are going to
2485 send the reply. We could send it instead from process_command()
2486 if the request line has wrong number of tokens. However parsing
2487 malformed line for "noreply" option is not reliable anyway, so
2488 it can't be helped.
2489 */
2490 if (tokens[noreply_index].value
2491 && strcmp(tokens[noreply_index].value, "noreply") == 0) {
2492 c->noreply = true;
2493 }
2494 return c->noreply;
2495 }
2496
2497 void append_stat(const char *name, ADD_STAT add_stats, conn *c,
2498 const char *fmt, ...) {
2499 char val_str[STAT_VAL_LEN];
2500 int vlen;
2501 va_list ap;
2502
2503 assert(name);
2504 assert(add_stats);
2505 assert(c);
2506 assert(fmt);
2507
2508 va_start(ap, fmt);
2509 vlen = vsnprintf(val_str, sizeof(val_str) - 1, fmt, ap);
2510 va_end(ap);
2511
2512 add_stats(name, strlen(name), val_str, vlen, c);
2513 }
2514
2515 inline static void process_stats_detail(conn *c, const char *command) {
2516 assert(c != NULL);
2517
2518 if (strcmp(command, "on") == 0) {
2519 settings.detail_enabled = 1;
2520 out_string(c, "OK");
2521 }
2522 else if (strcmp(command, "off") == 0) {
2523 settings.detail_enabled = 0;
2524 out_string(c, "OK");
2525 }
2526 else if (strcmp(command, "dump") == 0) {
2527 int len;
2528 char *stats = stats_prefix_dump(&len);
2529 write_and_free(c, stats, len);
2530 }
2531 else {
2532 out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
2533 }
2534 }
2535
2536 /* return server specific stats only */
2537 static void server_stats(ADD_STAT add_stats, conn *c) {
2538 pid_t pid = getpid();
2539 rel_time_t now = current_time;
2540
2541 struct thread_stats thread_stats;
2542 threadlocal_stats_aggregate(&thread_stats);
2543 struct slab_stats slab_stats;
2544 slab_stats_aggregate(&thread_stats, &slab_stats);
2545
2546 #ifndef WIN32
2547 struct rusage usage;
2548 getrusage(RUSAGE_SELF, &usage);
2549 #endif /* !WIN32 */
2550
2551 STATS_LOCK();
2552
2553 APPEND_STAT("pid", "%lu", (long)pid);
2554 APPEND_STAT("uptime", "%u", now);
2555 APPEND_STAT("time", "%ld", now + (long)process_started);
2556 APPEND_STAT("version", "%s", RVERSION);
2557 APPEND_STAT("libevent", "%s", event_get_version());
2558 APPEND_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));
2559
2560 #ifndef WIN32
2561 append_stat("rusage_user", add_stats, c, "%ld.%06ld",
2562 (long)usage.ru_utime.tv_sec,
2563 (long)usage.ru_utime.tv_usec);
2564 append_stat("rusage_system", add_stats, c, "%ld.%06ld",
2565 (long)usage.ru_stime.tv_sec,
2566 (long)usage.ru_stime.tv_usec);
2567 #endif /* !WIN32 */
2568
2569 APPEND_STAT("curr_connections", "%u", stats.curr_conns - 1);
2570 APPEND_STAT("total_connections", "%u", stats.total_conns);
2571 if (settings.maxconns_fast) {
2572 APPEND_STAT("rejected_connections", "%llu", (unsigned long long)stats.rejected_conns);
2573 }
2574 APPEND_STAT("connection_structures", "%u", stats.conn_structs);
2575 APPEND_STAT("reserved_fds", "%u", stats.reserved_fds);
2576 APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
2577 APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
2578 APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds);
2579 APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats.touch_cmds);
2580 APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits);
2581 APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
2582 APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
2583 APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits);
2584 APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses);
2585 APPEND_STAT("incr_hits", "%llu", (unsigned long long)slab_stats.incr_hits);
2586 APPEND_STAT("decr_misses", "%llu", (unsigned long long)thread_stats.decr_misses);
2587 APPEND_STAT("decr_hits", "%llu", (unsigned long long)slab_stats.decr_hits);
2588 APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses);
2589 APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits);
2590 APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval);
2591 APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats.touch_hits);
2592 APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats.touch_misses);
2593 APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats.auth_cmds);
2594 APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats.auth_errors);
2595 APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read);
2596 APPEND_STAT("bytes_written", "%llu", (unsigned long long)thread_stats.bytes_written);
2597 APPEND_STAT("limit_maxbytes", "%llu", (unsigned long long)settings.maxbytes);
2598 APPEND_STAT("accepting_conns", "%u", stats.accepting_conns);
2599 APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
2600 APPEND_STAT("threads", "%d", settings.num_threads);
2601 APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
2602 APPEND_STAT("hash_power_level", "%u", stats.hash_power_level);
2603 APPEND_STAT("hash_bytes", "%llu", (unsigned long long)stats.hash_bytes);
2604 APPEND_STAT("hash_is_expanding", "%u", stats.hash_is_expanding);
2605 APPEND_STAT("expired_unfetched", "%llu", stats.expired_unfetched);
2606 APPEND_STAT("evicted_unfetched", "%llu", stats.evicted_unfetched);
2607 if (settings.slab_reassign) {
2608 APPEND_STAT("slab_reassign_running", "%u", stats.slab_reassign_running);
2609 APPEND_STAT("slabs_moved", "%llu", stats.slabs_moved);
2610 }
2611 STATS_UNLOCK();
2612 }
2613
2614 static void process_stat_settings(ADD_STAT add_stats, void *c) {
2615 assert(add_stats);
2616 APPEND_STAT("maxbytes", "%u", (unsigned int)settings.maxbytes);
2617 APPEND_STAT("maxconns", "%d", settings.maxconns);
2618 APPEND_STAT("tcpport", "%d", settings.port);
2619 APPEND_STAT("udpport", "%d", settings.udpport);
2620 APPEND_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
2621 APPEND_STAT("verbosity", "%d", settings.verbose);
2622 APPEND_STAT("oldest", "%lu", (unsigned long)settings.oldest_live);
2623 APPEND_STAT("evictions", "%s", settings.evict_to_free ? "on" : "off");
2624 APPEND_STAT("domain_socket", "%s",
2625 settings.socketpath ? settings.socketpath : "NULL");
2626 APPEND_STAT("umask", "%o", settings.access);
2627 APPEND_STAT("growth_factor", "%.2f", settings.factor);
2628 APPEND_STAT("chunk_size", "%d", settings.chunk_size);
2629 APPEND_STAT("num_threads", "%d", settings.num_threads);
2630 APPEND_STAT("num_threads_per_udp", "%d", settings.num_threads_per_udp);
2631 APPEND_STAT("stat_key_prefix", "%c", settings.prefix_delimiter);
2632 APPEND_STAT("detail_enabled", "%s",
2633 settings.detail_enabled ? "yes" : "no");
2634 APPEND_STAT("reqs_per_event", "%d", settings.reqs_per_event);
2635 APPEND_STAT("cas_enabled", "%s", settings.use_cas ? "yes" : "no");
2636 APPEND_STAT("tcp_backlog", "%d", settings.backlog);
2637 APPEND_STAT("binding_protocol", "%s",
2638 prot_text(settings.binding_protocol));
2639 APPEND_STAT("auth_enabled_sasl", "%s", settings.sasl ? "yes" : "no");
2640 APPEND_STAT("item_size_max", "%d", settings.item_size_max);
2641 APPEND_STAT("maxconns_fast", "%s", settings.maxconns_fast ? "yes" : "no");
2642 APPEND_STAT("hashpower_init", "%d", settings.hashpower_init);
2643 APPEND_STAT("slab_reassign", "%s", settings.slab_reassign ? "yes" : "no");
2644 APPEND_STAT("slab_automove", "%s", settings.slab_automove ? "yes" : "no");
2645 }
2646
2647 static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
2648 const char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
2649 assert(c != NULL);
2650
2651 if (ntokens < 2) {
2652 out_string(c, "CLIENT_ERROR bad command line");
2653 return;
2654 }
2655
2656 if (ntokens == 2) {
2657 server_stats(&append_stats, c);
2658 (void)get_stats(NULL, 0, &append_stats, c);
2659 } else if (strcmp(subcommand, "reset") == 0) {
2660 stats_reset();
2661 out_string(c, "RESET");
2662 return ;
2663 } else if (strcmp(subcommand, "detail") == 0) {
2664 /* NOTE: how to tackle detail with binary? */
2665 if (ntokens < 4)
2666 process_stats_detail(c, ""); /* outputs the error message */
2667 else
2668 process_stats_detail(c, tokens[2].value);
2669 /* Output already generated */
2670 return ;
2671 } else if (strcmp(subcommand, "settings") == 0) {
2672 process_stat_settings(&append_stats, c);
2673 } else if (strcmp(subcommand, "cachedump") == 0) {
2674 char *buf;
2675 unsigned int bytes, id, limit = 0;
2676
2677 if (ntokens < 5) {
2678 out_string(c, "CLIENT_ERROR bad command line");
2679 return;
2680 }
2681
2682 if (!safe_strtoul(tokens[2].value, &id) ||
2683 !safe_strtoul(tokens[3].value, &limit)) {
2684 out_string(c, "CLIENT_ERROR bad command line format");
2685 return;
2686 }
2687
2688 if (id >= POWER_LARGEST) {
2689 out_string(c, "CLIENT_ERROR Illegal slab id");
2690 return;
2691 }
2692
2693 buf = item_cachedump(id, limit, &bytes);
2694 write_and_free(c, buf, bytes);
2695 return ;
2696 } else {
2697 /* getting here means that the subcommand is either engine specific or
2698 is invalid. query the engine and see. */
2699 if (get_stats(subcommand, strlen(subcommand), &append_stats, c)) {
2700 if (c->stats.buffer == NULL) {
2701 out_string(c, "SERVER_ERROR out of memory writing stats");
2702 } else {
2703 write_and_free(c, c->stats.buffer, c->stats.offset);
2704 c->stats.buffer = NULL;
2705 }
2706 } else {
2707 out_string(c, "ERROR");
2708 }
2709 return ;
2710 }
2711
2712 /* append terminator and start the transfer */
2713 append_stats(NULL, 0, NULL, 0, c);
2714
2715 if (c->stats.buffer == NULL) {
2716 out_string(c, "SERVER_ERROR out of memory writing stats");
2717 } else {
2718 write_and_free(c, c->stats.buffer, c->stats.offset);
2719 c->stats.buffer = NULL;
2720 }
2721 }
2722
2723 #ifndef __INTEL_COMPILER
2724 #pragma GCC diagnostic ignored "-Wunused-but-set-parameter"
2725 #endif
2726 /* ntokens is overwritten here... shrug.. */
2727 static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
2728 char *key;
2729 size_t nkey;
2730 int i = 0;
2731 item *it;
2732 token_t *key_token = &tokens[KEY_TOKEN];
2733 char *suffix;
2734 assert(c != NULL);
2735
2736 do {
2737 while(key_token->length != 0) {
2738
2739 key = key_token->value;
2740 nkey = key_token->length;
2741
2742 if(nkey > KEY_MAX_LENGTH) {
2743 out_string(c, "CLIENT_ERROR bad command line format");
2744 return;
2745 }
2746
2747 it = item_get(key, nkey);
2748 if (settings.detail_enabled) {
2749 stats_prefix_record_get(key, nkey, NULL != it);
2750 }
2751 if (it) {
2752 if (i >= c->isize) {
2753 item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
2754 if (new_list) {
2755 c->isize *= 2;
2756 c->ilist = new_list;
2757 } else {
2758 item_remove(it);
2759 break;
2760 }
2761 }
2762
2763 /*
2764 * Construct the response. Each hit adds three elements to the
2765 * outgoing data list:
2766 * "VALUE "
2767 * key
2768 * " " + flags + " " + data length + "\r\n" + data (with \r\n)
2769 */
2770
2771 if (return_cas)
2772 {
2773 MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
2774 it->nbytes, ITEM_get_cas(it));
2775 /* Goofy mid-flight realloc. */
2776 if (i >= c->suffixsize) {
2777 char **new_suffix_list = realloc(c->suffixlist,
2778 sizeof(char *) * c->suffixsize * 2);
2779 if (new_suffix_list) {
2780 c->suffixsize *= 2;
2781 c->suffixlist = new_suffix_list;
2782 } else {
2783 item_remove(it);
2784 break;
2785 }
2786 }
2787
2788 suffix = cache_alloc(c->thread->suffix_cache);
2789 if (suffix == NULL) {
2790 out_string(c, "SERVER_ERROR out of memory making CAS suffix");
2791 item_remove(it);
2792 return;
2793 }
2794 *(c->suffixlist + i) = suffix;
2795 int suffix_len = snprintf(suffix, SUFFIX_SIZE,
2796 " %llu\r\n",
2797 (unsigned long long)ITEM_get_cas(it));
2798 if (add_iov(c, "VALUE ", 6) != 0 ||
2799 add_iov(c, ITEM_key(it), it->nkey) != 0 ||
2800 add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
2801 add_iov(c, suffix, suffix_len) != 0 ||
2802 add_iov(c, ITEM_data(it), it->nbytes) != 0)
2803 {
2804 item_remove(it);
2805 break;
2806 }
2807 }
2808 else
2809 {
2810 MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
2811 it->nbytes, ITEM_get_cas(it));
2812 if (add_iov(c, "VALUE ", 6) != 0 ||
2813 add_iov(c, ITEM_key(it), it->nkey) != 0 ||
2814 add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
2815 {
2816 item_remove(it);
2817 break;
2818 }
2819 }
2820
2821
2822 if (settings.verbose > 1)
2823 fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
2824
2825 /* item_get() has incremented it->refcount for us */
2826 pthread_mutex_lock(&c->thread->stats.mutex);
2827 c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
2828 c->thread->stats.get_cmds++;
2829 pthread_mutex_unlock(&c->thread->stats.mutex);
2830 item_update(it);
2831 *(c->ilist + i) = it;
2832 i++;
2833
2834 } else {
2835 pthread_mutex_lock(&c->thread->stats.mutex);
2836 c->thread->stats.get_misses++;
2837 c->thread->stats.get_cmds++;
2838 pthread_mutex_unlock(&c->thread->stats.mutex);
2839 MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
2840 }
2841
2842 key_token++;
2843 }
2844
2845 /*
2846 * If the command string hasn't been fully processed, get the next set
2847 * of tokens.
2848 */
2849 if(key_token->value != NULL) {
2850 ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
2851 key_token = tokens;
2852 }
2853
2854 } while(key_token->value != NULL);
2855
2856 c->icurr = c->ilist;
2857 c->ileft = i;
2858 if (return_cas) {
2859 c->suffixcurr = c->suffixlist;
2860 c->suffixleft = i;
2861 }
2862
2863 if (settings.verbose > 1)
2864 fprintf(stderr, ">%d END\n", c->sfd);
2865
2866 /*
2867 If the loop was terminated because of out-of-memory, it is not
2868 reliable to add END\r\n to the buffer, because it might not end
2869 in \r\n. So we send SERVER_ERROR instead.
2870 */
2871 if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
2872 || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
2873 out_string(c, "SERVER_ERROR out of memory writing get response");
2874 }
2875 else {
2876 conn_set_state(c, conn_mwrite);
2877 c->msgcurr = 0;
2878 }
2879
2880 return;
2881 }
2882
2883 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
2884 char *key;
2885 size_t nkey;
2886 unsigned int flags;
2887 int32_t exptime_int = 0;
2888 time_t exptime;
2889 int vlen;
2890 uint64_t req_cas_id=0;
2891 item *it;
2892
2893 assert(c != NULL);
2894
2895 set_noreply_maybe(c, tokens, ntokens);
2896
2897 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2898 out_string(c, "CLIENT_ERROR bad command line format");
2899 return;
2900 }
2901
2902 key = tokens[KEY_TOKEN].value;
2903 nkey = tokens[KEY_TOKEN].length;
2904
2905 if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
2906 && safe_strtol(tokens[3].value, &exptime_int)
2907 && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
2908 out_string(c, "CLIENT_ERROR bad command line format");
2909 return;
2910 }
2911
2912 /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
2913 exptime = exptime_int;
2914
2915 /* Negative exptimes can underflow and end up immortal. realtime() will
2916 immediately expire values that are greater than REALTIME_MAXDELTA, but less
2917 than process_started, so lets aim for that. */
2918 if (exptime < 0)
2919 exptime = REALTIME_MAXDELTA + 1;
2920
2921 // does cas value exist?
2922 if (handle_cas) {
2923 if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
2924 out_string(c, "CLIENT_ERROR bad command line format");
2925 return;
2926 }
2927 }
2928
2929 vlen += 2;
2930 if (vlen < 0 || vlen - 2 < 0) {
2931 out_string(c, "CLIENT_ERROR bad command line format");
2932 return;
2933 }
2934
2935 if (settings.detail_enabled) {
2936 stats_prefix_record_set(key, nkey);
2937 }
2938
2939 it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
2940
2941 if (it == 0) {
2942 if (! item_size_ok(nkey, flags, vlen))
2943 out_string(c, "SERVER_ERROR object too large for cache");
2944 else
2945 out_string(c, "SERVER_ERROR out of memory storing object");
2946 /* swallow the data line */
2947 c->write_and_go = conn_swallow;
2948 c->sbytes = vlen;
2949
2950 /* Avoid stale data persisting in cache because we failed alloc.
2951 * Unacceptable for SET. Anywhere else too? */
2952 if (comm == NREAD_SET) {
2953 it = item_get(key, nkey);
2954 if (it) {
2955 item_unlink(it);
2956 item_remove(it);
2957 }
2958 }
2959
2960 return;
2961 }
2962 ITEM_set_cas(it, req_cas_id);
2963
2964 c->item = it;
2965 c->ritem = ITEM_data(it);
2966 c->rlbytes = it->nbytes;
2967 c->cmd = comm;
2968 conn_set_state(c, conn_nread);
2969 }
2970
2971 static void process_touch_command(conn *c, token_t *tokens, const size_t ntokens) {
2972 char *key;
2973 size_t nkey;
2974 int32_t exptime_int = 0;
2975 item *it;
2976
2977 assert(c != NULL);
2978
2979 set_noreply_maybe(c, tokens, ntokens);
2980
2981 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2982 out_string(c, "CLIENT_ERROR bad command line format");
2983 return;
2984 }
2985
2986 key = tokens[KEY_TOKEN].value;
2987 nkey = tokens[KEY_TOKEN].length;
2988
2989 if (!safe_strtol(tokens[2].value, &exptime_int)) {
2990 out_string(c, "CLIENT_ERROR invalid exptime argument");
2991 return;
2992 }
2993
2994 it = item_touch(key, nkey, realtime(exptime_int));
2995 if (it) {
2996 item_update(it);
2997 pthread_mutex_lock(&c->thread->stats.mutex);
2998 c->thread->stats.touch_cmds++;
2999 c->thread->stats.slab_stats[it->slabs_clsid].touch_hits++;
3000 pthread_mutex_unlock(&c->thread->stats.mutex);
3001
3002 out_string(c, "TOUCHED");
3003 item_remove(it);
3004 } else {
3005 pthread_mutex_lock(&c->thread->stats.mutex);
3006 c->thread->stats.touch_cmds++;
3007 c->thread->stats.touch_misses++;
3008 pthread_mutex_unlock(&c->thread->stats.mutex);
3009
3010 out_string(c, "NOT_FOUND");
3011 }
3012 }
3013
3014 static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
3015 char temp[INCR_MAX_STORAGE_LEN];
3016 uint64_t delta;
3017 char *key;
3018 size_t nkey;
3019
3020 assert(c != NULL);
3021
3022 set_noreply_maybe(c, tokens, ntokens);
3023
3024 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
3025 out_string(c, "CLIENT_ERROR bad command line format");
3026 return;
3027 }
3028
3029 key = tokens[KEY_TOKEN].value;
3030 nkey = tokens[KEY_TOKEN].length;
3031
3032 if (!safe_strtoull(tokens[2].value, &delta)) {
3033 out_string(c, "CLIENT_ERROR invalid numeric delta argument");
3034 return;
3035 }
3036
3037 switch(add_delta(c, key, nkey, incr, delta, temp, NULL)) {
3038 case OK:
3039 out_string(c, temp);
3040 break;
3041 case NON_NUMERIC:
3042 out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
3043 break;
3044 case EOM:
3045 out_string(c, "SERVER_ERROR out of memory");
3046 break;
3047 case DELTA_ITEM_NOT_FOUND:
3048 pthread_mutex_lock(&c->thread->stats.mutex);
3049 if (incr) {
3050 c->thread->stats.incr_misses++;
3051 } else {
3052 c->thread->stats.decr_misses++;
3053 }
3054 pthread_mutex_unlock(&c->thread->stats.mutex);
3055
3056 out_string(c, "NOT_FOUND");
3057 break;
3058 case DELTA_ITEM_CAS_MISMATCH:
3059 break; /* Should never get here */
3060 }
3061 }
3062
3063 /*
3064 * adds a delta value to a numeric item.
3065 *
3066 * c connection requesting the operation
3067 * it item to adjust
3068 * incr true to increment value, false to decrement
3069 * delta amount to adjust value by
3070 * buf buffer for response string
3071 *
3072 * returns a response string to send back to the client.
3073 */
3074 enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
3075 const bool incr, const int64_t delta,
3076 char *buf, uint64_t *cas,
3077 const uint32_t hv) {
3078 char *ptr;
3079 uint64_t value;
3080 int res;
3081 item *it;
3082
3083 it = do_item_get(key, nkey, hv);
3084 if (!it) {
3085 return DELTA_ITEM_NOT_FOUND;
3086 }
3087
3088 if (cas != NULL && *cas != 0 && ITEM_get_cas(it) != *cas) {
3089 do_item_remove(it);
3090 return DELTA_ITEM_CAS_MISMATCH;
3091 }
3092
3093 ptr = ITEM_data(it);
3094
3095 if (!safe_strtoull(ptr, &value)) {
3096 do_item_remove(it);
3097 return NON_NUMERIC;
3098 }
3099
3100 if (incr) {
3101 value += delta;
3102 MEMCACHED_COMMAND_INCR(c->sfd, ITEM_key(it), it->nkey, value);
3103 } else {
3104 if(delta > value) {
3105 value = 0;
3106 } else {
3107 value -= delta;
3108 }
3109 MEMCACHED_COMMAND_DECR(c->sfd, ITEM_key(it), it->nkey, value);
3110 }
3111
3112 pthread_mutex_lock(&c->thread->stats.mutex);
3113 if (incr) {
3114 c->thread->stats.slab_stats[it->slabs_clsid].incr_hits++;
3115 } else {
3116 c->thread->stats.slab_stats[it->slabs_clsid].decr_hits++;
3117 }
3118 pthread_mutex_unlock(&c->thread->stats.mutex);
3119
3120 snprintf(buf, INCR_MAX_STORAGE_LEN, "%llu", (unsigned long long)value);
3121 res = strlen(buf);
3122 if (res + 2 > it->nbytes || it->refcount != 1) { /* need to realloc */
3123 item *new_it;
3124 new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
3125 if (new_it == 0) {
3126 do_item_remove(it);
3127 return EOM;
3128 }
3129 memcpy(ITEM_data(new_it), buf, res);
3130 memcpy(ITEM_data(new_it) + res, "\r\n", 2);
3131 item_replace(it, new_it, hv);
3132 // Overwrite the older item's CAS with our new CAS since we're
3133 // returning the CAS of the old item below.
3134 ITEM_set_cas(it, (settings.use_cas) ? ITEM_get_cas(new_it) : 0);
3135 do_item_remove(new_it); /* release our reference */
3136 } else { /* replace in-place */
3137 /* When changing the value without replacing the item, we
3138 need to update the CAS on the existing item. */
3139 mutex_lock(&cache_lock); /* FIXME */
3140 ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
3141 pthread_mutex_unlock(&cache_lock);
3142
3143 memcpy(ITEM_data(it), buf, res);
3144 memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
3145 do_item_update(it);
3146 }
3147
3148 if (cas) {
3149 *cas = ITEM_get_cas(it); /* swap the incoming CAS value */
3150 }
3151 do_item_remove(it); /* release our reference */
3152 return OK;
3153 }
3154
3155 static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
3156 char *key;
3157 size_t nkey;
3158 item *it;
3159
3160 assert(c != NULL);
3161
3162 if (ntokens > 3) {
3163 bool hold_is_zero = strcmp(tokens[KEY_TOKEN+1].value, "0") == 0;
3164 bool sets_noreply = set_noreply_maybe(c, tokens, ntokens);
3165 bool valid = (ntokens == 4 && (hold_is_zero || sets_noreply))
3166 || (ntokens == 5 && hold_is_zero && sets_noreply);
3167 if (!valid) {
3168 out_string(c, "CLIENT_ERROR bad command line format. "
3169 "Usage: delete <key> [noreply]");
3170 return;
3171 }
3172 }
3173
3174
3175 key = tokens[KEY_TOKEN].value;
3176 nkey = tokens[KEY_TOKEN].length;
3177
3178 if(nkey > KEY_MAX_LENGTH) {
3179 out_string(c, "CLIENT_ERROR bad command line format");
3180 return;
3181 }
3182
3183 if (settings.detail_enabled) {
3184 stats_prefix_record_delete(key, nkey);
3185 }
3186
3187 it = item_get(key, nkey);
3188 if (it) {
3189 MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
3190
3191 pthread_mutex_lock(&c->thread->stats.mutex);
3192 c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
3193 pthread_mutex_unlock(&c->thread->stats.mutex);
3194
3195 item_unlink(it);
3196 item_remove(it); /* release our reference */
3197 out_string(c, "DELETED");
3198 } else {
3199 pthread_mutex_lock(&c->thread->stats.mutex);
3200 c->thread->stats.delete_misses++;
3201 pthread_mutex_unlock(&c->thread->stats.mutex);
3202
3203 out_string(c, "NOT_FOUND");
3204 }
3205 }
3206
3207 static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
3208 unsigned int level;
3209
3210 assert(c != NULL);
3211
3212 set_noreply_maybe(c, tokens, ntokens);
3213
3214 level = strtoul(tokens[1].value, NULL, 10);
3215 settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
3216 out_string(c, "OK");
3217 return;
3218 }
3219
3220 static void process_slabs_automove_command(conn *c, token_t *tokens, const size_t ntokens) {
3221 unsigned int level;
3222
3223 assert(c != NULL);
3224
3225 set_noreply_maybe(c, tokens, ntokens);
3226
3227 level = strtoul(tokens[2].value, NULL, 10);
3228 if (level == 0) {
3229 settings.slab_automove = false;
3230 } else if (level == 1) {
3231 settings.slab_automove = true;
3232 } else {
3233 out_string(c, "ERROR");
3234 return;
3235 }
3236 out_string(c, "OK");
3237 return;
3238 }
3239
3240 static void process_command(conn *c, char *command) {
3241
3242 token_t tokens[MAX_TOKENS];
3243 size_t ntokens;
3244 int comm;
3245
3246 assert(c != NULL);
3247
3248 MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
3249
3250 if (settings.verbose > 1)
3251 fprintf(stderr, "<%d %s\n", c->sfd, command);
3252
3253 /*
3254 * for commands set/add/replace, we build an item and read the data
3255 * directly into it, then continue in nread_complete().
3256 */
3257
3258 c->msgcurr = 0;
3259 c->msgused = 0;
3260 c->iovused = 0;
3261 if (add_msghdr(c) != 0) {
3262 out_string(c, "SERVER_ERROR out of memory preparing response");
3263 return;
3264 }
3265
3266 ntokens = tokenize_command(command, tokens, MAX_TOKENS);
3267 if (ntokens >= 3 &&
3268 ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
3269 (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
3270
3271 process_get_command(c, tokens, ntokens, false);
3272
3273 } else if ((ntokens == 6 || ntokens == 7) &&
3274 ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
3275 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
3276 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
3277 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
3278 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
3279
3280 process_update_command(c, tokens, ntokens, comm, false);
3281
3282 } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
3283
3284 process_update_command(c, tokens, ntokens, comm, true);
3285
3286 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
3287
3288 process_arithmetic_command(c, tokens, ntokens, 1);
3289
3290 } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
3291
3292 process_get_command(c, tokens, ntokens, true);
3293
3294 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
3295
3296 process_arithmetic_command(c, tokens, ntokens, 0);
3297
3298 } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
3299
3300 process_delete_command(c, tokens, ntokens);
3301
3302 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "touch") == 0)) {
3303
3304 process_touch_command(c, tokens, ntokens);
3305
3306 } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
3307
3308 process_stat(c, tokens, ntokens);
3309
3310 } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
3311 time_t exptime = 0;
3312
3313 set_noreply_maybe(c, tokens, ntokens);
3314
3315 pthread_mutex_lock(&c->thread->stats.mutex);
3316 c->thread->stats.flush_cmds++;
3317 pthread_mutex_unlock(&c->thread->stats.mutex);
3318
3319 if(ntokens == (c->noreply ? 3 : 2)) {
3320 settings.oldest_live = current_time - 1;
3321 item_flush_expired();
3322 out_string(c, "OK");
3323 return;
3324 }
3325
3326 exptime = strtol(tokens[1].value, NULL, 10);
3327 if(errno == ERANGE) {
3328 out_string(c, "CLIENT_ERROR bad command line format");
3329 return;
3330 }
3331
3332 /*
3333 If exptime is zero realtime() would return zero too, and
3334 realtime(exptime) - 1 would overflow to the max unsigned
3335 value. So we process exptime == 0 the same way we do when
3336 no delay is given at all.
3337 */
3338 if (exptime > 0)
3339 settings.oldest_live = realtime(exptime) - 1;
3340 else /* exptime == 0 */
3341 settings.oldest_live = current_time - 1;
3342 item_flush_expired();
3343 out_string(c, "OK");
3344 return;
3345
3346 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
3347
3348 out_string(c, "VERSION " RVERSION);
3349
3350 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
3351
3352 conn_set_state(c, conn_closing);
3353
3354 } else if (ntokens > 1 && strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0) {
3355 if (ntokens == 5 && strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0) {
3356 int src, dst, rv;
3357
3358 if (settings.slab_reassign == false) {
3359 out_string(c, "CLIENT_ERROR slab reassignment disabled");
3360 return;
3361 }
3362
3363 src = strtol(tokens[2].value, NULL, 10);
3364 dst = strtol(tokens[3].value, NULL, 10);
3365
3366 if (errno == ERANGE) {
3367 out_string(c, "CLIENT_ERROR bad command line format");
3368 return;
3369 }
3370
3371 rv = slabs_reassign(src, dst);
3372 switch (rv) {
3373 case REASSIGN_OK:
3374 out_string(c, "OK");
3375 break;
3376 case REASSIGN_RUNNING:
3377 out_string(c, "BUSY currently processing reassign request");
3378 break;
3379 case REASSIGN_BADCLASS:
3380 out_string(c, "BADCLASS invalid src or dst class id");
3381 break;
3382 case REASSIGN_NOSPARE:
3383 out_string(c, "NOSPARE source class has no spare pages");
3384 break;
3385 case REASSIGN_DEST_NOT_FULL:
3386 out_string(c, "NOTFULL dest class has spare memory");
3387 break;
3388 case REASSIGN_SRC_NOT_SAFE:
3389 out_string(c, "UNSAFE src class is in an unsafe state");
3390 break;
3391 case REASSIGN_SRC_DST_SAME:
3392 out_string(c, "SAME src and dst class are identical");
3393 break;
3394 }
3395 return;
3396 } else if (ntokens == 4 &&
3397 (strcmp(tokens[COMMAND_TOKEN + 1].value, "automove") == 0)) {
3398 process_slabs_automove_command(c, tokens, ntokens);
3399 } else {
3400 out_string(c, "ERROR");
3401 }
3402 } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
3403 process_verbosity_command(c, tokens, ntokens);
3404 } else {
3405 out_string(c, "ERROR");
3406 }
3407 return;
3408 }
3409
3410 /*
3411 * if we have a complete line in the buffer, process it.
3412 */
3413 static int try_read_command(conn *c) {
3414 assert(c != NULL);
3415 assert(c->rcurr <= (c->rbuf + c->rsize));
3416 assert(c->rbytes > 0);
3417
3418 if (c->protocol == negotiating_prot || c->transport == udp_transport) {
3419 if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
3420 c->protocol = binary_prot;
3421 } else {
3422 c->protocol = ascii_prot;
3423 }
3424
3425 if (settings.verbose > 1) {
3426 fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
3427 prot_text(c->protocol));
3428 }
3429 }
3430
3431 if (c->protocol == binary_prot) {
3432 /* Do we have the complete packet header? */
3433 if (c->rbytes < sizeof(c->binary_header)) {
3434 /* need more data! */
3435 return 0;
3436 } else {
3437 #ifdef NEED_ALIGN
3438 if (((long)(c->rcurr)) % 8 != 0) {
3439 /* must realign input buffer */
3440 memmove(c->rbuf, c->rcurr, c->rbytes);
3441 c->rcurr = c->rbuf;
3442 if (settings.verbose > 1) {
3443 fprintf(stderr, "%d: Realign input buffer\n", c->sfd);
3444 }
3445 }
3446 #endif
3447 protocol_binary_request_header* req;
3448 req = (protocol_binary_request_header*)c->rcurr;
3449
3450 if (settings.verbose > 1) {
3451 /* Dump the packet before we convert it to host order */
3452 int ii;
3453 fprintf(stderr, "<%d Read binary protocol data:", c->sfd);
3454 for (ii = 0; ii < sizeof(req->bytes); ++ii) {
3455 if (ii % 4 == 0) {
3456 fprintf(stderr, "\n<%d ", c->sfd);
3457 }
3458 fprintf(stderr, " 0x%02x", req->bytes[ii]);
3459 }
3460 fprintf(stderr, "\n");
3461 }
3462
3463 c->binary_header = *req;
3464 c->binary_header.request.keylen = ntohs(req->request.keylen);
3465 c->binary_header.request.bodylen = ntohl(req->request.bodylen);
3466 c->binary_header.request.cas = ntohll(req->request.cas);
3467
3468 if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ) {
3469 if (settings.verbose) {
3470 fprintf(stderr, "Invalid magic: %x\n",
3471 c->binary_header.request.magic);
3472 }
3473 conn_set_state(c, conn_closing);
3474 return -1;
3475 }
3476
3477 c->msgcurr = 0;
3478 c->msgused = 0;
3479 c->iovused = 0;
3480 if (add_msghdr(c) != 0) {
3481 out_string(c, "SERVER_ERROR out of memory");
3482 return 0;
3483 }
3484
3485 c->cmd = c->binary_header.request.opcode;
3486 c->keylen = c->binary_header.request.keylen;
3487 c->opaque = c->binary_header.request.opaque;
3488 /* clear the returned cas value */
3489 c->cas = 0;
3490
3491 dispatch_bin_command(c);
3492
3493 c->rbytes -= sizeof(c->binary_header);
3494 c->rcurr += sizeof(c->binary_header);
3495 }
3496 } else {
3497 char *el, *cont;
3498
3499 if (c->rbytes == 0)
3500 return 0;
3501
3502 el = memchr(c->rcurr, '\n', c->rbytes);
3503 if (!el) {
3504 if (c->rbytes > 1024) {
3505 /*
3506 * We didn't have a '\n' in the first k. This _has_ to be a
3507 * large multiget, if not we should just nuke the connection.
3508 */
3509 char *ptr = c->rcurr;
3510 while (*ptr == ' ') { /* ignore leading whitespaces */
3511 ++ptr;
3512 }
3513
3514 if (ptr - c->rcurr > 100 ||
3515 (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
3516
3517 conn_set_state(c, conn_closing);
3518 return 1;
3519 }
3520 }
3521
3522 return 0;
3523 }
3524 cont = el + 1;
3525 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
3526 el--;
3527 }
3528 *el = '\0';
3529
3530 assert(cont <= (c->rcurr + c->rbytes));
3531
3532 process_command(c, c->rcurr);
3533
3534 c->rbytes -= (cont - c->rcurr);
3535 c->rcurr = cont;
3536
3537 assert(c->rcurr <= (c->rbuf + c->rsize));
3538 }
3539
3540 return 1;
3541 }
3542
3543 /*
3544 * read a UDP request.
3545 */
3546 static enum try_read_result try_read_udp(conn *c) {
3547 int res;
3548
3549 assert(c != NULL);
3550
3551 c->request_addr_size = sizeof(c->request_addr);
3552 res = recvfrom(c->sfd, c->rbuf, c->rsize,
3553 0, &c->request_addr, &c->request_addr_size);
3554 if (res > 8) {
3555 unsigned char *buf = (unsigned char *)c->rbuf;
3556 pthread_mutex_lock(&c->thread->stats.mutex);
3557 c->thread->stats.bytes_read += res;
3558 pthread_mutex_unlock(&c->thread->stats.mutex);
3559
3560 /* Beginning of UDP packet is the request ID; save it. */
3561 c->request_id = buf[0] * 256 + buf[1];
3562
3563 /* If this is a multi-packet request, drop it. */
3564 if (buf[4] != 0 || buf[5] != 1) {
3565 out_string(c, "SERVER_ERROR multi-packet request not supported");
3566 return READ_NO_DATA_RECEIVED;
3567 }
3568
3569 /* Don't care about any of the rest of the header. */
3570 res -= 8;
3571 memmove(c->rbuf, c->rbuf + 8, res);
3572
3573 c->rbytes = res;
3574 c->rcurr = c->rbuf;
3575 return READ_DATA_RECEIVED;
3576 }
3577 return READ_NO_DATA_RECEIVED;
3578 }
3579
3580 /*
3581 * read from network as much as we can, handle buffer overflow and connection
3582 * close.
3583 * before reading, move the remaining incomplete fragment of a command
3584 * (if any) to the beginning of the buffer.
3585 *
3586 * To protect us from someone flooding a connection with bogus data causing
3587 * the connection to eat up all available memory, break out and start looking
3588 * at the data I've got after a number of reallocs...
3589 *
3590 * @return enum try_read_result
3591 */
3592 static enum try_read_result try_read_network(conn *c) {
3593 enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
3594 int res;
3595 int num_allocs = 0;
3596 assert(c != NULL);
3597
3598 if (c->rcurr != c->rbuf) {
3599 if (c->rbytes != 0) /* otherwise there's nothing to copy */
3600 memmove(c->rbuf, c->rcurr, c->rbytes);
3601 c->rcurr = c->rbuf;
3602 }
3603
3604 while (1) {
3605 if (c->rbytes >= c->rsize) {
3606 if (num_allocs == 4) {
3607 return gotdata;
3608 }
3609 ++num_allocs;
3610 char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
3611 if (!new_rbuf) {
3612 if (settings.verbose > 0)
3613 fprintf(stderr, "Couldn't realloc input buffer\n");
3614 c->rbytes = 0; /* ignore what we read */
3615 out_string(c, "SERVER_ERROR out of memory reading request");
3616 c->write_and_go = conn_closing;
3617 return READ_MEMORY_ERROR;
3618 }
3619 c->rcurr = c->rbuf = new_rbuf;
3620 c->rsize *= 2;
3621 }
3622
3623 int avail = c->rsize - c->rbytes;
3624 res = read(c->sfd, c->rbuf + c->rbytes, avail);
3625 if (res > 0) {
3626 pthread_mutex_lock(&c->thread->stats.mutex);
3627 c->thread->stats.bytes_read += res;
3628 pthread_mutex_unlock(&c->thread->stats.mutex);
3629 gotdata = READ_DATA_RECEIVED;
3630 c->rbytes += res;
3631 if (res == avail) {
3632 continue;
3633 } else {
3634 break;
3635 }
3636 }
3637 if (res == 0) {
3638 return READ_ERROR;
3639 }
3640 if (res == -1) {
3641 if (errno == EAGAIN || errno == EWOULDBLOCK) {
3642 break;
3643 }
3644 return READ_ERROR;
3645 }
3646 }
3647 return gotdata;
3648 }
3649
3650 static bool update_event(conn *c, const int new_flags) {
3651 assert(c != NULL);
3652
3653 struct event_base *base = c->event.ev_base;
3654 if (c->ev_flags == new_flags)
3655 return true;
3656 if (event_del(&c->event) == -1) return false;
3657 event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
3658 event_base_set(base, &c->event);
3659 c->ev_flags = new_flags;
3660 if (event_add(&c->event, 0) == -1) return false;
3661 return true;
3662 }
3663
3664 /*
3665 * Sets whether we are listening for new connections or not.
3666 */
3667 void do_accept_new_conns(const bool do_accept) {
3668 conn *next;
3669
3670 for (next = listen_conn; next; next = next->next) {
3671 if (do_accept) {
3672 update_event(next, EV_READ | EV_PERSIST);
3673 if (listen(next->sfd, settings.backlog) != 0) {
3674 perror("listen");
3675 }
3676 }
3677 else {
3678 update_event(next, 0);
3679 if (listen(next->sfd, 0) != 0) {
3680 perror("listen");
3681 }
3682 }
3683 }
3684
3685 if (do_accept) {
3686 STATS_LOCK();
3687 stats.accepting_conns = true;
3688 STATS_UNLOCK();
3689 } else {
3690 STATS_LOCK();
3691 stats.accepting_conns = false;
3692 stats.listen_disabled_num++;
3693 STATS_UNLOCK();
3694 allow_new_conns = false;
3695 maxconns_handler(-42, 0, 0);
3696 }
3697 }
3698
3699 /*
3700 * Transmit the next chunk of data from our list of msgbuf structures.
3701 *
3702 * Returns:
3703 * TRANSMIT_COMPLETE All done writing.
3704 * TRANSMIT_INCOMPLETE More data remaining to write.
3705 * TRANSMIT_SOFT_ERROR Can't write any more right now.
3706 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
3707 */
3708 static enum transmit_result transmit(conn *c) {
3709 assert(c != NULL);
3710
3711 if (c->msgcurr < c->msgused &&
3712 c->msglist[c->msgcurr].msg_iovlen == 0) {
3713 /* Finished writing the current msg; advance to the next. */
3714 c->msgcurr++;
3715 }
3716 if (c->msgcurr < c->msgused) {
3717 ssize_t res;
3718 struct msghdr *m = &c->msglist[c->msgcurr];
3719
3720 res = sendmsg(c->sfd, m, 0);
3721 if (res > 0) {
3722 pthread_mutex_lock(&c->thread->stats.mutex);
3723 c->thread->stats.bytes_written += res;
3724 pthread_mutex_unlock(&c->thread->stats.mutex);
3725
3726 /* We've written some of the data. Remove the completed
3727 iovec entries from the list of pending writes. */
3728 while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
3729 res -= m->msg_iov->iov_len;
3730 m->msg_iovlen--;
3731 m->msg_iov++;
3732 }
3733
3734 /* Might have written just part of the last iovec entry;
3735 adjust it so the next write will do the rest. */
3736 if (res > 0) {
3737 m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
3738 m->msg_iov->iov_len -= res;
3739 }
3740 return TRANSMIT_INCOMPLETE;
3741 }
3742 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
3743 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3744 if (settings.verbose > 0)
3745 fprintf(stderr, "Couldn't update event\n");
3746 conn_set_state(c, conn_closing);
3747 return TRANSMIT_HARD_ERROR;
3748 }
3749 return TRANSMIT_SOFT_ERROR;
3750 }
3751 /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
3752 we have a real error, on which we close the connection */
3753 if (settings.verbose > 0)
3754 perror("Failed to write, and not due to blocking");
3755
3756 if (IS_UDP(c->transport))
3757 conn_set_state(c, conn_read);
3758 else
3759 conn_set_state(c, conn_closing);
3760 return TRANSMIT_HARD_ERROR;
3761 } else {
3762 return TRANSMIT_COMPLETE;
3763 }
3764 }
3765
/*
 * Runs the per-connection state machine.
 *
 * Loops over c->state, executing the handler for the current state, until a
 * handler sets `stop` (typically because the connection has to wait for the
 * next event, or has been closed). Handlers move the connection on with
 * conn_set_state().
 *
 * nreqs starts at settings.reqs_per_event and is decremented every time the
 * conn_new_cmd state is entered; once it drops below zero the loop yields.
 */
3766 static void drive_machine(conn *c) {
3767 bool stop = false;
3768 int sfd, flags = 1;
3769 socklen_t addrlen;
3770 struct sockaddr_storage addr;
3771 int nreqs = settings.reqs_per_event;
3772 int res;
3773 const char *str;
3774
3775 assert(c != NULL);
3776
3777 while (!stop) {
3778
3779 switch(c->state) {
/* Listening socket: accept one client, make it non-blocking and either
   reject it (maxconns_fast limit reached) or dispatch it as a new
   connection. */
3780 case conn_listening:
3781 addrlen = sizeof(addr);
3782 if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
3783 if (errno == EAGAIN || errno == EWOULDBLOCK) {
3784 /* these are transient, so don't log anything */
3785 stop = true;
3786 } else if (errno == EMFILE) {
3787 if (settings.verbose > 0)
3788 fprintf(stderr, "Too many open connections\n");
3789 accept_new_conns(false);
3790 stop = true;
3791 } else {
3792 perror("accept()");
3793 stop = true;
3794 }
3795 break;
3796 }
3797 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
3798 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
3799 perror("setting O_NONBLOCK");
3800 close(sfd);
3801 break;
3802 }
3803
3804 if (settings.maxconns_fast &&
3805 stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
3806 str = "ERROR Too many open connections\r\n";
/* the result of this write is stored in res but not examined */
3807 res = write(sfd, str, strlen(str));
3808 close(sfd);
3809 STATS_LOCK();
3810 stats.rejected_conns++;
3811 STATS_UNLOCK();
3812 } else {
3813 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
3814 DATA_BUFFER_SIZE, tcp_transport);
3815 }
3816
3817 stop = true;
3818 break;
3819
/* Waiting for input: arm the read event, then leave the loop in conn_read
   so the next event starts by reading. */
3820 case conn_waiting:
3821 if (!update_event(c, EV_READ | EV_PERSIST)) {
3822 if (settings.verbose > 0)
3823 fprintf(stderr, "Couldn't update event\n");
3824 conn_set_state(c, conn_closing);
3825 break;
3826 }
3827
3828 conn_set_state(c, conn_read);
3829 stop = true;
3830 break;
3831
/* Read from the socket and pick the next state from the read result. */
3832 case conn_read:
3833 res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
3834
3835 switch (res) {
3836 case READ_NO_DATA_RECEIVED:
3837 conn_set_state(c, conn_waiting);
3838 break;
3839 case READ_DATA_RECEIVED:
3840 conn_set_state(c, conn_parse_cmd);
3841 break;
3842 case READ_ERROR:
3843 conn_set_state(c, conn_closing);
3844 break;
3845 case READ_MEMORY_ERROR: /* Failed to allocate more memory */
3846 /* State already set by try_read_network */
3847 break;
3848 }
3849 break;
3850
3851 case conn_parse_cmd :
3852 if (try_read_command(c) == 0) {
3853 /* we need more data! */
3854 conn_set_state(c, conn_waiting);
3855 }
3856
3857 break;
3858
3859 case conn_new_cmd:
3860 /* Only process nreqs at a time to avoid starving other
3861 connections */
3862
3863 --nreqs;
3864 if (nreqs >= 0) {
3865 reset_cmd_handler(c);
3866 } else {
3867 pthread_mutex_lock(&c->thread->stats.mutex);
3868 c->thread->stats.conn_yields++;
3869 pthread_mutex_unlock(&c->thread->stats.mutex);
3870 if (c->rbytes > 0) {
3871 /* We have already read in data into the input buffer,
3872 so libevent will most likely not signal read events
3873 on the socket (unless more data is available. As a
3874 hack we should just put in a request to write data,
3875 because that should be possible ;-)
3876 */
3877 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3878 if (settings.verbose > 0)
3879 fprintf(stderr, "Couldn't update event\n");
3880 conn_set_state(c, conn_closing);
3881 }
3882 }
3883 stop = true;
3884 }
3885 break;
3886
/* Reading a value of known length: c->rlbytes bytes are still expected at
   c->ritem. */
3887 case conn_nread:
3888 if (c->rlbytes == 0) {
3889 complete_nread(c);
3890 break;
3891 }
3892 /* first check if we have leftovers in the conn_read buffer */
3893 if (c->rbytes > 0) {
3894 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
3895 if (c->ritem != c->rcurr) {
3896 memmove(c->ritem, c->rcurr, tocopy);
3897 }
3898 c->ritem += tocopy;
3899 c->rlbytes -= tocopy;
3900 c->rcurr += tocopy;
3901 c->rbytes -= tocopy;
3902 if (c->rlbytes == 0) {
3903 break;
3904 }
3905 }
3906
3907 /* now try reading from the socket */
3908 res = read(c->sfd, c->ritem, c->rlbytes);
3909 if (res > 0) {
3910 pthread_mutex_lock(&c->thread->stats.mutex);
3911 c->thread->stats.bytes_read += res;
3912 pthread_mutex_unlock(&c->thread->stats.mutex);
/* when the data was read at rcurr itself, rcurr advances along with ritem */
3913 if (c->rcurr == c->ritem) {
3914 c->rcurr += res;
3915 }
3916 c->ritem += res;
3917 c->rlbytes -= res;
3918 break;
3919 }
3920 if (res == 0) { /* end of stream */
3921 conn_set_state(c, conn_closing);
3922 break;
3923 }
3924 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
3925 if (!update_event(c, EV_READ | EV_PERSIST)) {
3926 if (settings.verbose > 0)
3927 fprintf(stderr, "Couldn't update event\n");
3928 conn_set_state(c, conn_closing);
3929 break;
3930 }
3931 stop = true;
3932 break;
3933 }
3934 /* otherwise we have a real error, on which we close the connection */
3935 if (settings.verbose > 0) {
3936 fprintf(stderr, "Failed to read, and not due to blocking:\n"
3937 "errno: %d %s \n"
3938 "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
3939 errno, strerror(errno),
3940 (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
3941 (int)c->rlbytes, (int)c->rsize);
3942 }
3943 conn_set_state(c, conn_closing);
3944 break;
3945
3946 case conn_swallow:
3947 /* we are reading sbytes and throwing them away */
3948 if (c->sbytes == 0) {
3949 conn_set_state(c, conn_new_cmd);
3950 break;
3951 }
3952
3953 /* first check if we have leftovers in the conn_read buffer */
3954 if (c->rbytes > 0) {
3955 int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
3956 c->sbytes -= tocopy;
3957 c->rcurr += tocopy;
3958 c->rbytes -= tocopy;
3959 break;
3960 }
3961
3962 /* now try reading from the socket */
3963 res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
3964 if (res > 0) {
3965 pthread_mutex_lock(&c->thread->stats.mutex);
3966 c->thread->stats.bytes_read += res;
3967 pthread_mutex_unlock(&c->thread->stats.mutex);
3968 c->sbytes -= res;
3969 break;
3970 }
3971 if (res == 0) { /* end of stream */
3972 conn_set_state(c, conn_closing);
3973 break;
3974 }
3975 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
3976 if (!update_event(c, EV_READ | EV_PERSIST)) {
3977 if (settings.verbose > 0)
3978 fprintf(stderr, "Couldn't update event\n");
3979 conn_set_state(c, conn_closing);
3980 break;
3981 }
3982 stop = true;
3983 break;
3984 }
3985 /* otherwise we have a real error, on which we close the connection */
3986 if (settings.verbose > 0)
3987 fprintf(stderr, "Failed to read, and not due to blocking\n");
3988 conn_set_state(c, conn_closing);
3989 break;
3990
3991 case conn_write:
3992 /*
3993 * We want to write out a simple response. If we haven't already,
3994 * assemble it into a msgbuf list (this will be a single-entry
3995 * list for TCP or a two-entry list for UDP).
3996 */
3997 if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
3998 if (add_iov(c, c->wcurr, c->wbytes) != 0) {
3999 if (settings.verbose > 0)
4000 fprintf(stderr, "Couldn't build response\n");
4001 conn_set_state(c, conn_closing);
4002 break;
4003 }
4004 }
4005
4006 /* fall through... */
4007
/* Shared by conn_write (via the fall-through above) and conn_mwrite; the
   TRANSMIT_COMPLETE handling below checks c->state to tell them apart. */
4008 case conn_mwrite:
4009 if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
4010 if (settings.verbose > 0)
4011 fprintf(stderr, "Failed to build UDP headers\n");
4012 conn_set_state(c, conn_closing);
4013 break;
4014 }
4015 switch (transmit(c)) {
4016 case TRANSMIT_COMPLETE:
4017 if (c->state == conn_mwrite) {
/* the reply is out: release the item references and suffix buffers it used */
4018 while (c->ileft > 0) {
4019 item *it = *(c->icurr);
4020 assert((it->it_flags & ITEM_SLABBED) == 0);
4021 item_remove(it);
4022 c->icurr++;
4023 c->ileft--;
4024 }
4025 while (c->suffixleft > 0) {
4026 char *suffix = *(c->suffixcurr);
4027 cache_free(c->thread->suffix_cache, suffix);
4028 c->suffixcurr++;
4029 c->suffixleft--;
4030 }
4031 /* XXX: I don't know why this wasn't the general case */
4032 if(c->protocol == binary_prot) {
4033 conn_set_state(c, c->write_and_go);
4034 } else {
4035 conn_set_state(c, conn_new_cmd);
4036 }
4037 } else if (c->state == conn_write) {
4038 if (c->write_and_free) {
4039 free(c->write_and_free);
4040 c->write_and_free = 0;
4041 }
4042 conn_set_state(c, c->write_and_go);
4043 } else {
4044 if (settings.verbose > 0)
4045 fprintf(stderr, "Unexpected state %d\n", c->state);
4046 conn_set_state(c, conn_closing);
4047 }
4048 break;
4049
4050 case TRANSMIT_INCOMPLETE:
4051 case TRANSMIT_HARD_ERROR:
4052 break; /* Continue in state machine. */
4053
4054 case TRANSMIT_SOFT_ERROR:
4055 stop = true;
4056 break;
4057 }
4058 break;
4059
/* UDP connections are only cleaned up; all others are closed. */
4060 case conn_closing:
4061 if (IS_UDP(c->transport))
4062 conn_cleanup(c);
4063 else
4064 conn_close(c);
4065 stop = true;
4066 break;
4067
4068 case conn_max_state:
4069 assert(false);
4070 break;
4071 }
4072 }
4073
4074 return;
4075 }
4076
4077 void event_handler(const int fd, const short which, void *arg) {
4078 conn *c;
4079
4080 c = (conn *)arg;
4081 assert(c != NULL);
4082
4083 c->which = which;
4084
4085 /* sanity */
4086 if (fd != c->sfd) {
4087 if (settings.verbose > 0)
4088 fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
4089 conn_close(c);
4090 return;
4091 }
4092
4093 drive_machine(c);
4094
4095 /* wait for next event */
4096 return;
4097 }
4098
4099 static int new_socket(struct addrinfo *ai) {
4100 int sfd;
4101 int flags;
4102
4103 if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
4104 return -1;
4105 }
4106
4107 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
4108 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
4109 perror("setting O_NONBLOCK");
4110 close(sfd);
4111 return -1;
4112 }
4113 return sfd;
4114 }
4115
4116
4117 /*
4118 * Sets a socket's send buffer size to the maximum allowed by the system.
4119 */
4120 static void maximize_sndbuf(const int sfd) {
4121 socklen_t intsize = sizeof(int);
4122 int last_good = 0;
4123 int min, max, avg;
4124 int old_size;
4125
4126 /* Start with the default size. */
4127 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
4128 if (settings.verbose > 0)
4129 perror("getsockopt(SO_SNDBUF)");
4130 return;
4131 }
4132
4133 /* Binary-search for the real maximum. */
4134 min = old_size;
4135 max = MAX_SENDBUF_SIZE;
4136
4137 while (min <= max) {
4138 avg = ((unsigned int)(min + max)) / 2;
4139 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
4140 last_good = avg;
4141 min = avg + 1;
4142 } else {
4143 max = avg - 1;
4144 }
4145 }
4146
4147 if (settings.verbose > 1)
4148 fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
4149 }
4150
/**
 * Create listening/receiving sockets for one interface and port.
 *
 * Every address getaddrinfo() returns for the interface is tried in turn;
 * addresses for which no socket can be created, or which are already in use,
 * are skipped.
 *
 * @param interface the interface to bind to
 * @param port the port number to bind to (-1 is treated as 0)
 * @param transport the transport protocol (TCP / UDP)
 * @param portnumber_file A filepointer to write the port numbers to
 *        when they are successfully added to the list of ports we
 *        listen on (may be NULL).
 * @return 0 if at least one address was bound; 1 if none was, or if
 *         getaddrinfo(), bind() (other than EADDRINUSE) or listen() failed.
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct linger ling = {0, 0};
    struct addrinfo *ai;
    struct addrinfo *cur;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int bound_count = 0;
    int on = 1;

    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    if (port == -1) {
        port = 0;
    }
    snprintf(port_buf, sizeof(port_buf), "%d", port);

    error = getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM) {
            fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        } else {
            perror("getaddrinfo()");
        }
        return 1;
    }

    for (cur = ai; cur != NULL; cur = cur->ai_next) {
        conn *listen_conn_add;

        sfd = new_socket(cur);
        if (sfd == -1) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE) {
                /* ...unless we're out of fds */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }

#ifdef IPV6_V6ONLY
        if (cur->ai_family == AF_INET6) {
            error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &on, sizeof(on));
            if (error != 0) {
                perror("setsockopt");
                close(sfd);
                continue;
            }
        }
#endif

        error = setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
        if (error != 0) {
            perror("setsockopt(SO_REUSEADDR)");
        }

        if (IS_UDP(transport)) {
            maximize_sndbuf(sfd);
        } else {
            error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));
            if (error != 0) {
                perror("setsockopt");
            }

            error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
            if (error != 0) {
                perror("setsockopt");
            }

            error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&on, sizeof(on));
            if (error != 0) {
                perror("setsockopt");
            }
        }

        if (bind(sfd, cur->ai_addr, cur->ai_addrlen) == -1) {
            /* An address that is already in use is skipped; anything else
               aborts the whole setup. */
            if (errno == EADDRINUSE) {
                close(sfd);
                continue;
            }
            perror("bind()");
            close(sfd);
            freeaddrinfo(ai);
            return 1;
        }

        bound_count++;

        if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
            perror("listen()");
            close(sfd);
            freeaddrinfo(ai);
            return 1;
        }

        if (portnumber_file != NULL &&
            (cur->ai_addr->sa_family == AF_INET ||
             cur->ai_addr->sa_family == AF_INET6)) {
            union {
                struct sockaddr_in in;
                struct sockaddr_in6 in6;
            } my_sockaddr;
            socklen_t len = sizeof(my_sockaddr);

            /* Report the port actually bound, as returned by getsockname(). */
            if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len)==0) {
                if (cur->ai_addr->sa_family == AF_INET) {
                    fprintf(portnumber_file, "%s INET: %u\n",
                            IS_UDP(transport) ? "UDP" : "TCP",
                            ntohs(my_sockaddr.in.sin_port));
                } else {
                    fprintf(portnumber_file, "%s INET6: %u\n",
                            IS_UDP(transport) ? "UDP" : "TCP",
                            ntohs(my_sockaddr.in6.sin6_port));
                }
            }
        }

        if (IS_UDP(transport)) {
            int c;

            for (c = 0; c < settings.num_threads_per_udp; c++) {
                /* this is guaranteed to hit all threads because we round-robin */
                dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
            }
        } else {
            listen_conn_add = conn_new(sfd, conn_listening,
                                       EV_READ | EV_PERSIST, 1,
                                       transport, main_base);
            if (!listen_conn_add) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            /* push onto the head of the global list of listeners */
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return bound_count == 0 ? 1 : 0;
}
4301
4302 static int server_sockets(int port, enum network_transport transport,
4303 FILE *portnumber_file) {
4304 if (settings.inter == NULL) {
4305 return server_socket(settings.inter, port, transport, portnumber_file);
4306 } else {
4307 // tokenize them and bind to each one of them..
4308 char *b;
4309 int ret = 0;
4310 char *list = strdup(settings.inter);
4311
4312 if (list == NULL) {
4313 fprintf(stderr, "Failed to allocate memory for parsing server interface string\n");
4314 return 1;
4315 }
4316 for (char *p = strtok_r(list, ";,", &b);
4317 p != NULL;
4318 p = strtok_r(NULL, ";,", &b)) {
4319 int the_port = port;
4320 char *s = strchr(p, ':');
4321 if (s != NULL) {
4322 *s = '\0';
4323 ++s;
4324 if (!safe_strtol(s, &the_port)) {
4325 fprintf(stderr, "Invalid port number: \"%s\"", s);
4326 return 1;
4327 }
4328 }
4329 if (strcmp(p, "*") == 0) {
4330 p = NULL;
4331 }
4332 ret |= server_socket(p, the_port, transport, portnumber_file);
4333 }
4334 free(list);
4335 return ret;
4336 }
4337 }
4338
/*
 * Creates a non-blocking AF_UNIX stream socket.
 *
 * Returns the descriptor, or -1 if the socket could not be created or could
 * not be switched to O_NONBLOCK (in which case it is closed again).
 */
static int new_socket_unix(void) {
    int fd;
    int fl;

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    fl = fcntl(fd, F_GETFL, 0);
    if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
        perror("setting O_NONBLOCK");
        close(fd);
        return -1;
    }

    return fd;
}
4356
4357 static int server_socket_unix(const char *path, int access_mask) {
4358 int sfd;
4359 struct linger ling = {0, 0};
4360 struct sockaddr_un addr;
4361 struct stat tstat;
4362 int flags =1;
4363 int old_umask;
4364
4365 if (!path) {
4366 return 1;
4367 }
4368
4369 if ((sfd = new_socket_unix()) == -1) {
4370 return 1;
4371 }
4372
4373 /*
4374 * Clean up a previous socket file if we left it around
4375 */
4376 if (lstat(path, &tstat) == 0) {
4377 if (S_ISSOCK(tstat.st_mode))
4378 unlink(path);
4379 }
4380
4381 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
4382 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
4383 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
4384
4385 /*
4386 * the memset call clears nonstandard fields in some impementations
4387 * that otherwise mess things up.
4388 */
4389 memset(&addr, 0, sizeof(addr));
4390
4391 addr.sun_family = AF_UNIX;
4392 strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
4393 assert(strcmp(addr.sun_path, path) == 0);
4394 old_umask = umask( ~(access_mask&0777));
4395 if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
4396 perror("bind()");
4397 close(sfd);
4398 umask(old_umask);
4399 return 1;
4400 }
4401 umask(old_umask);
4402 if (listen(sfd, settings.backlog) == -1) {
4403 perror("listen()");
4404 close(sfd);
4405 return 1;
4406 }
4407 if (!(listen_conn = conn_new(sfd, conn_listening,
4408 EV_READ | EV_PERSIST, 1,
4409 local_transport, main_base))) {
4410 fprintf(stderr, "failed to create listening connection\n");
4411 exit(EXIT_FAILURE);
4412 }
4413
4414 return 0;
4415 }
4416
/*
 * We keep the current time of day in a global variable that's updated by a
 * timer event. This saves us a bunch of time() system calls (we really only
 * need to get the time once a second, whereas there can be tens of thousands
 * of requests a second) and allows us to use server-start-relative timestamps
 * rather than absolute UNIX timestamps, a space savings on systems where
 * sizeof(time_t) > sizeof(unsigned int).
 */
volatile rel_time_t current_time;
/* Timer event that clock_handler() re-arms each time it runs. */
static struct event clockevent;
4427
4428 /* libevent uses a monotonic clock when available for event scheduling. Aside
4429 * from jitter, simply ticking our internal timer here is accurate enough.
4430 * Note that users who are setting explicit dates for expiration times *must*
4431 * ensure their clocks are correct before starting memcached. */
4432 static void clock_handler(const int fd, const short which, void *arg) {
4433 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
4434 static bool initialized = false;
4435 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4436 static bool monotonic = false;
4437 static time_t monotonic_start;
4438 #endif
4439
4440 if (initialized) {
4441 /* only delete the event if it's actually there. */
4442 evtimer_del(&clockevent);
4443 } else {
4444 initialized = true;
4445 /* process_started is initialized to time() - 2. We initialize to 1 so
4446 * flush_all won't underflow during tests. */
4447 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4448 struct timespec ts;
4449 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
4450 monotonic = true;
4451 monotonic_start = ts.tv_sec - 2;
4452 }
4453 #endif
4454 }
4455
4456 evtimer_set(&clockevent, clock_handler, 0);
4457 event_base_set(main_base, &clockevent);
4458 evtimer_add(&clockevent, &t);
4459
4460 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4461 if (monotonic) {
4462 struct timespec ts;
4463 if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1)
4464 return;
4465 current_time = (rel_time_t) (ts.tv_sec - monotonic_start);
4466 return;
4467 }
4468 #endif
4469 {
4470 struct timeval tv;
4471 gettimeofday(&tv, NULL);
4472 current_time = (rel_time_t) (tv.tv_sec - process_started);
4473 }
4474 }
4475
4476 static void usage(void) {
4477 printf(RPACKAGE " " RVERSION "\n");
4478 printf("-p <num> TCP port number to listen on (default: 11211)\n"
4479 "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
4480 "-s <file> UNIX socket path to listen on (disables network support)\n"
4481 "-a <mask> access mask for UNIX socket, in octal (default: 0700)\n"
4482 "-l <addr> interface to listen on (default: INADDR_ANY, all addresses)\n"
4483 " <addr> may be specified as host:port. If you don't specify\n"
4484 " a port number, the value you specified with -p or -U is\n"
4485 " used. You may specify multiple addresses separated by comma\n"
4486 " or by using -l multiple times\n"
4487
4488 "-d run as a daemon\n"
4489 "-r maximize core file limit\n"
4490 "-u <username> assume identity of <username> (only when run as root)\n"
4491 "-m <num> max memory to use for items in megabytes (default: 64 MB)\n"
4492 "-M return error on memory exhausted (rather than removing items)\n"
4493 "-c <num> max simultaneous connections (default: 1024)\n"
4494 "-k lock down all paged memory. Note that there is a\n"
4495 " limit on how much memory you may lock. Trying to\n"
4496 " allocate more than that would fail, so be sure you\n"
4497 " set the limit correctly for the user you started\n"
4498 " the daemon with (not for -u <username> user;\n"
4499 " under sh this is done with 'ulimit -S -l NUM_KB').\n"
4500 "-v verbose (print errors/warnings while in event loop)\n"
4501 "-vv very verbose (also print client commands/reponses)\n"
4502 "-vvv extremely verbose (also print internal state transitions)\n"
4503 "-h print this help and exit\n"
4504 "-i print memcached and libevent license\n"
4505 "-P <file> save PID in <file>, only used with -d option\n"
4506 "-f <factor> chunk size growth factor (default: 1.25)\n"
4507 "-n <bytes> minimum space allocated for key+value+flags (default: 48)\n");
4508 printf("-L Try to use large memory pages (if available). Increasing\n"
4509 " the memory page size could reduce the number of TLB misses\n"
4510 " and improve the performance. In order to get large pages\n"
4511 " from the OS, memcached will allocate the total item-cache\n"
4512 " in one large chunk.\n");
4513 printf("-D <char> Use <char> as the delimiter between key prefixes and IDs.\n"
4514 " This is used for per-prefix stats reporting. The default is\n"
4515 " \":\" (colon). If this option is specified, stats collection\n"
4516 " is turned on automatically; if not, then it may be turned on\n"
4517 " by sending the \"stats detail on\" command to the server.\n");
4518 printf("-t <num> number of threads to use (default: 4)\n");
4519 printf("-R Maximum number of requests per event, limits the number of\n"
4520 " requests process for a given connection to prevent \n"
4521 " starvation (default: 20)\n");
4522 printf("-C Disable use of CAS\n");
4523 printf("-b Set the backlog queue limit (default: 1024)\n");
4524 printf("-B Binding protocol - one of ascii, binary, or auto (default)\n");
4525 printf("-I Override the size of each slab page. Adjusts max item size\n"
4526 " (default: 1mb, min: 1k, max: 128m)\n");
4527 #ifdef ENABLE_SASL
4528 printf("-S Turn on Sasl authentication\n");
4529 #endif
4530 printf("-o Comma separated list of extended or experimental options\n"
4531 " - (EXPERIMENTAL) maxconns_fast: immediately close new\n"
4532 " connections if over maxconns limit\n"
4533 " - hashpower: An integer multiplier for how large the hash\n"
4534 " table should be. Can be grown at runtime if not big enough.\n"
4535 " Set this based on \"STAT hash_power_level\" before a \n"
4536 " restart.\n"
4537 );
4538 return;
4539 }
4540
4541 static void usage_license(void) {
4542 printf(RPACKAGE " " RVERSION "\n\n");
4543 printf(
4544 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
4545 "All rights reserved.\n"
4546 "\n"
4547 "Redistribution and use in source and binary forms, with or without\n"
4548 "modification, are permitted provided that the following conditions are\n"
4549 "met:\n"
4550 "\n"
4551 " * Redistributions of source code must retain the above copyright\n"
4552 "notice, this list of conditions and the following disclaimer.\n"
4553 "\n"
4554 " * Redistributions in binary form must reproduce the above\n"
4555 "copyright notice, this list of conditions and the following disclaimer\n"
4556 "in the documentation and/or other materials provided with the\n"
4557 "distribution.\n"
4558 "\n"
4559 " * Neither the name of the Danga Interactive nor the names of its\n"
4560 "contributors may be used to endorse or promote products derived from\n"
4561 "this software without specific prior written permission.\n"
4562 "\n"
4563 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
4564 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
4565 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
4566 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
4567 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
4568 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
4569 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4570 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4571 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4572 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
4573 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4574 "\n"
4575 "\n"
4576 "This product includes software developed by Niels Provos.\n"
4577 "\n"
4578 "[ libevent ]\n"
4579 "\n"
4580 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
4581 "All rights reserved.\n"
4582 "\n"
4583 "Redistribution and use in source and binary forms, with or without\n"
4584 "modification, are permitted provided that the following conditions\n"
4585 "are met:\n"
4586 "1. Redistributions of source code must retain the above copyright\n"
4587 " notice, this list of conditions and the following disclaimer.\n"
4588 "2. Redistributions in binary form must reproduce the above copyright\n"
4589 " notice, this list of conditions and the following disclaimer in the\n"
4590 " documentation and/or other materials provided with the distribution.\n"
4591 "3. All advertising materials mentioning features or use of this software\n"
4592 " must display the following acknowledgement:\n"
4593 " This product includes software developed by Niels Provos.\n"
4594 "4. The name of the author may not be used to endorse or promote products\n"
4595 " derived from this software without specific prior written permission.\n"
4596 "\n"
4597 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
4598 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
4599 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
4600 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
4601 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
4602 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4603 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4604 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4605 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
4606 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4607 );
4608
4609 return;
4610 }
4611
/*
 * Write the current process id, followed by a newline, to pid_file.
 *
 * If the file already exists and its first line parses as a pid that
 * kill(pid, 0) accepts, a warning naming that pid is printed to stderr
 * first; the file is overwritten either way. Failures to open or close
 * the file for writing are reported through vperror().
 *
 * @param pid_file path of the pid file; not checked for NULL here
 */
static void save_pid(const char *pid_file) {
    FILE *stream;

    /* look for a still-running pid recorded in an existing file */
    if (access(pid_file, F_OK) == 0 &&
        (stream = fopen(pid_file, "r")) != NULL) {
        char first_line[1024];
        unsigned int recorded_pid;

        if (fgets(first_line, sizeof(first_line), stream) != NULL &&
            safe_strtoul(first_line, &recorded_pid) &&
            kill((pid_t)recorded_pid, 0) == 0) {
            fprintf(stderr, "WARNING: The pid file contained the following (running) pid: %u\n", recorded_pid);
        }
        fclose(stream);
    }

    stream = fopen(pid_file, "w");
    if (stream == NULL) {
        vperror("Could not open the pid file %s for writing", pid_file);
        return;
    }

    fprintf(stream,"%ld\n", (long)getpid());
    if (fclose(stream) == -1) {
        vperror("Could not close the pid file %s", pid_file);
    }
}
4637
/*
 * Delete the pid file, reporting a failed unlink() through vperror().
 * A NULL pid_file is ignored.
 */
static void remove_pidfile(const char *pid_file) {
    if (pid_file != NULL && unlink(pid_file) != 0) {
        vperror("Could not remove the pid file %s", pid_file);
    }
}
4647
/*
 * Signal handler installed for SIGINT by main(): prints a notice to stdout
 * and terminates the process with EXIT_SUCCESS. The signal number is unused.
 *
 * NOTE(review): stdio output and exit() are not on the POSIX list of
 * async-signal-safe functions; confirm this is acceptable for a handler.
 */
static void sig_handler(const int sig) {
    fputs("SIGINT handled.\n", stdout);
    exit(EXIT_SUCCESS);
}
4652
4653 #ifndef HAVE_SIGIGNORE
4654 static int sigignore(int sig) {
4655 struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
4656
4657 if (sigemptyset(&sa.sa_mask) == -1 || sigaction(sig, &sa, 0) == -1) {
4658 return -1;
4659 }
4660 return 0;
4661 }
4662 #endif
4663
4664
/*
 * On systems that support multiple page sizes we may reduce the number of
 * TLB misses by using the biggest available page size.
 *
 * When both HAVE_GETPAGESIZES and HAVE_MEMCNTL are defined, the largest
 * size reported by getpagesizes() is passed to memcntl(MC_HAT_ADVISE) with
 * MHA_MAPSIZE_BSSBRK. Otherwise the function does nothing.
 *
 * @return 0 on success or when the feature is compiled out, -1 when either
 *         call fails (a message is printed to stderr)
 */
static int enable_large_pages(void) {
#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
    size_t page_sizes[32];
    int count = getpagesizes(page_sizes, 32);

    if (count == -1) {
        fprintf(stderr, "Failed to get supported pagesizes: %s\n",
                strerror(errno));
        fprintf(stderr, "Will use default page size\n");
        return -1;
    }

    /* pick the largest of the reported sizes */
    size_t largest = page_sizes[0];
    for (int idx = 1; idx < count; ++idx) {
        if (page_sizes[idx] > largest) {
            largest = page_sizes[idx];
        }
    }

    struct memcntl_mha advice = {0};
    advice.mha_flags = 0;
    advice.mha_pagesize = largest;
    advice.mha_cmd = MHA_MAPSIZE_BSSBRK;

    if (memcntl(0, 0, MC_HAT_ADVISE, (caddr_t)&advice, 0, 0) == -1) {
        fprintf(stderr, "Failed to set large pages: %s\n",
                strerror(errno));
        fprintf(stderr, "Will use default page size\n");
        return -1;
    }

    return 0;
#else
    return 0;
#endif
}
4707
/**
 * Do basic sanity check of the runtime environment.
 *
 * Currently this only inspects the string returned by event_get_version():
 * versions of the form "1.1<x>" or "1.2<x>", where <x> is not a digit, are
 * rejected with a message on stderr. A NULL version string is accepted.
 *
 * @return true if no errors found, false if we can't use this env
 */
static bool sanitycheck(void) {
    /* One of our biggest problems is old and bogus libevents */
    const char *ever = event_get_version();
    if (ever != NULL) {
        if (strncmp(ever, "1.", 2) == 0) {
            /* Require at least 1.3 (that's still a couple of years old) */
            /* ever[3] is only read when ever[2] is '1' or '2', so it is at
             * worst the terminating NUL. The cast keeps the isdigit()
             * argument within the range the function is defined for. */
            if ((ever[2] == '1' || ever[2] == '2') &&
                !isdigit((unsigned char)ever[3])) {
                fprintf(stderr, "You are using libevent %s.\nPlease upgrade to"
                        " a more recent version (1.3 or newer)\n",
                        ever);
                return false;
            }
        }
    }

    return true;
}
4729
4730 int main (int argc, char **argv) {
4731 int c;
4732 bool lock_memory = false;
4733 bool do_daemonize = false;
4734 bool preallocate = false;
4735 int maxcore = 0;
4736 char *username = NULL;
4737 char *pid_file = NULL;
4738 struct passwd *pw;
4739 struct rlimit rlim;
4740 char unit = '\0';
4741 int size_max = 0;
4742 int retval = EXIT_SUCCESS;
4743 /* listening sockets */
4744 static int *l_socket = NULL;
4745
4746 /* udp socket */
4747 static int *u_socket = NULL;
4748 bool protocol_specified = false;
4749 bool tcp_specified = false;
4750 bool udp_specified = false;
4751
4752 char *subopts;
4753 char *subopts_value;
4754 enum {
4755 MAXCONNS_FAST = 0,
4756 HASHPOWER_INIT,
4757 SLAB_REASSIGN,
4758 SLAB_AUTOMOVE
4759 };
4760 char *const subopts_tokens[] = {
4761 [MAXCONNS_FAST] = (char*)"maxconns_fast",
4762 [HASHPOWER_INIT] = (char*)"hashpower",
4763 [SLAB_REASSIGN] = (char*)"slab_reassign",
4764 [SLAB_AUTOMOVE] = (char*)"slab_automove",
4765 NULL
4766 };
4767
4768 if (!sanitycheck()) {
4769 return EX_OSERR;
4770 }
4771
4772 /* handle SIGINT */
4773 signal(SIGINT, sig_handler);
4774
4775 /* init settings */
4776 settings_init();
4777
4778 /* set stderr non-buffering (for running under, say, daemontools) */
4779 setbuf(stderr, NULL);
4780
4781 /* process arguments */
4782 while (-1 != (c = getopt(argc, argv,
4783 "a:" /* access mask for unix socket */
4784 "p:" /* TCP port number to listen on */
4785 "s:" /* unix socket path to listen on */
4786 "U:" /* UDP port number to listen on */
4787 "m:" /* max memory to use for items in megabytes */
4788 "M" /* return error on memory exhausted */
4789 "c:" /* max simultaneous connections */
4790 "k" /* lock down all paged memory */
4791 "hi" /* help, licence info */
4792 "r" /* maximize core file limit */
4793 "v" /* verbose */
4794 "d" /* daemon mode */
4795 "l:" /* interface to listen on */
4796 "u:" /* user identity to run as */
4797 "P:" /* save PID in file */
4798 "f:" /* factor? */
4799 "n:" /* minimum space allocated for key+value+flags */
4800 "t:" /* threads */
4801 "D:" /* prefix delimiter? */
4802 "L" /* Large memory pages */
4803 "R:" /* max requests per event */
4804 "C" /* Disable use of CAS */
4805 "b:" /* backlog queue limit */
4806 "B:" /* Binding protocol */
4807 "I:" /* Max item size */
4808 "S" /* Sasl ON */
4809 "o:" /* Extended generic options */
4810 ))) {
4811 switch (c) {
4812 case 'a':
4813 /* access for unix domain socket, as octal mask (like chmod)*/
4814 settings.access= strtol(optarg,NULL,8);
4815 break;
4816
4817 case 'U':
4818 settings.udpport = atoi(optarg);
4819 udp_specified = true;
4820 break;
4821 case 'p':
4822 settings.port = atoi(optarg);
4823 tcp_specified = true;
4824 break;
4825 case 's':
4826 settings.socketpath = optarg;
4827 break;
4828 case 'm':
4829 settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
4830 break;
4831 case 'M':
4832 settings.evict_to_free = 0;
4833 break;
4834 case 'c':
4835 settings.maxconns = atoi(optarg);
4836 break;
4837 case 'h':
4838 usage();
4839 exit(EXIT_SUCCESS);
4840 case 'i':
4841 usage_license();
4842 exit(EXIT_SUCCESS);
4843 case 'k':
4844 lock_memory = true;
4845 break;
4846 case 'v':
4847 settings.verbose++;
4848 break;
4849 case 'l':
4850 if (settings.inter != NULL) {
4851 size_t len = strlen(settings.inter) + strlen(optarg) + 2;
4852 char *p = malloc(len);
4853 if (p == NULL) {
4854 fprintf(stderr, "Failed to allocate memory\n");
4855 return 1;
4856 }
4857 snprintf(p, len, "%s,%s", settings.inter, optarg);
4858 free(settings.inter);
4859 settings.inter = p;
4860 } else {
4861 settings.inter= strdup(optarg);
4862 }
4863 break;
4864 case 'd':
4865 do_daemonize = true;
4866 break;
4867 case 'r':
4868 maxcore = 1;
4869 break;
4870 case 'R':
4871 settings.reqs_per_event = atoi(optarg);
4872 if (settings.reqs_per_event == 0) {
4873 fprintf(stderr, "Number of requests per event must be greater than 0\n");
4874 return 1;
4875 }
4876 break;
4877 case 'u':
4878 username = optarg;
4879 break;
4880 case 'P':
4881 pid_file = optarg;
4882 break;
4883 case 'f':
4884 settings.factor = atof(optarg);
4885 if (settings.factor <= 1.0) {
4886 fprintf(stderr, "Factor must be greater than 1\n");
4887 return 1;
4888 }
4889 break;
4890 case 'n':
4891 settings.chunk_size = atoi(optarg);
4892 if (settings.chunk_size == 0) {
4893 fprintf(stderr, "Chunk size must be greater than 0\n");
4894 return 1;
4895 }
4896 break;
4897 case 't':
4898 settings.num_threads = atoi(optarg);
4899 if (settings.num_threads <= 0) {
4900 fprintf(stderr, "Number of threads must be greater than 0\n");
4901 return 1;
4902 }
4903 /* There're other problems when you get above 64 threads.
4904 * In the future we should portably detect # of cores for the
4905 * default.
4906 */
4907 if (settings.num_threads > 64) {
4908 fprintf(stderr, "WARNING: Setting a high number of worker"
4909 "threads is not recommended.\n"
4910 " Set this value to the number of cores in"
4911 " your machine or less.\n");
4912 }
4913 break;
4914 case 'D':
4915 if (! optarg || ! optarg[0]) {
4916 fprintf(stderr, "No delimiter specified\n");
4917 return 1;
4918 }
4919 settings.prefix_delimiter = optarg[0];
4920 settings.detail_enabled = 1;
4921 break;
4922 case 'L' :
4923 if (enable_large_pages() == 0) {
4924 preallocate = true;
4925 }
4926 break;
4927 case 'C' :
4928 settings.use_cas = false;
4929 break;
4930 case 'b' :
4931 settings.backlog = atoi(optarg);
4932 break;
4933 case 'B':
4934 protocol_specified = true;
4935 if (strcmp(optarg, "auto") == 0) {
4936 settings.binding_protocol = negotiating_prot;
4937 } else if (strcmp(optarg, "binary") == 0) {
4938 settings.binding_protocol = binary_prot;
4939 } else if (strcmp(optarg, "ascii") == 0) {
4940 settings.binding_protocol = ascii_prot;
4941 } else {
4942 fprintf(stderr, "Invalid value for binding protocol: %s\n"
4943 " -- should be one of auto, binary, or ascii\n", optarg);
4944 exit(EX_USAGE);
4945 }
4946 break;
4947 case 'I':
4948 unit = optarg[strlen(optarg)-1];
4949 if (unit == 'k' || unit == 'm' ||
4950 unit == 'K' || unit == 'M') {
4951 optarg[strlen(optarg)-1] = '\0';
4952 size_max = atoi(optarg);
4953 if (unit == 'k' || unit == 'K')
4954 size_max *= 1024;
4955 if (unit == 'm' || unit == 'M')
4956 size_max *= 1024 * 1024;
4957 settings.item_size_max = size_max;
4958 } else {
4959 settings.item_size_max = atoi(optarg);
4960 }
4961 if (settings.item_size_max < 1024) {
4962 fprintf(stderr, "Item max size cannot be less than 1024 bytes.\n");
4963 return 1;
4964 }
4965 if (settings.item_size_max > 1024 * 1024 * 128) {
4966 fprintf(stderr, "Cannot set item size limit higher than 128 mb.\n");
4967 return 1;
4968 }
4969 if (settings.item_size_max > 1024 * 1024) {
4970 fprintf(stderr, "WARNING: Setting item max size above 1MB is not"
4971 " recommended!\n"
4972 " Raising this limit increases the minimum memory requirements\n"
4973 " and will decrease your memory efficiency.\n"
4974 );
4975 }
4976 break;
4977 case 'S': /* set Sasl authentication to true. Default is false */
4978 #ifndef ENABLE_SASL
4979 fprintf(stderr, "This server is not built with SASL support.\n");
4980 exit(EX_USAGE);
4981 #endif
4982 settings.sasl = true;
4983 break;
4984 case 'o': /* It's sub-opts time! */
4985 subopts = optarg;
4986
4987 while (*subopts != '\0') {
4988
4989 switch (getsubopt(&subopts, subopts_tokens, &subopts_value)) {
4990 case MAXCONNS_FAST:
4991 settings.maxconns_fast = true;
4992 break;
4993 case HASHPOWER_INIT:
4994 if (subopts_value == NULL) {
4995 fprintf(stderr, "Missing numeric argument for hashpower\n");
4996 return 1;
4997 }
4998 settings.hashpower_init = atoi(subopts_value);
4999 if (settings.hashpower_init < 12) {
5000 fprintf(stderr, "Initial hashtable multiplier of %d is too low\n",
5001 settings.hashpower_init);
5002 return 1;
5003 } else if (settings.hashpower_init > 64) {
5004 fprintf(stderr, "Initial hashtable multiplier of %d is too high\n"
5005 "Choose a value based on \"STAT hash_power_level\" from a running instance\n",
5006 settings.hashpower_init);
5007 return 1;
5008 }
5009 break;
5010 case SLAB_REASSIGN:
5011 settings.slab_reassign = true;
5012 break;
5013 case SLAB_AUTOMOVE:
5014 settings.slab_automove = true;
5015 break;
5016 default:
5017 printf("Illegal suboption \"%s\"\n", subopts_value);
5018 return 1;
5019 }
5020
5021 }
5022 break;
5023 default:
5024 fprintf(stderr, "Illegal argument \"%c\"\n", c);
5025 return 1;
5026 }
5027 }
5028
5029 /*
5030 * Use one workerthread to serve each UDP port if the user specified
5031 * multiple ports
5032 */
5033 if (settings.inter != NULL && strchr(settings.inter, ',')) {
5034 settings.num_threads_per_udp = 1;
5035 } else {
5036 settings.num_threads_per_udp = settings.num_threads;
5037 }
5038
5039 if (settings.sasl) {
5040 if (!protocol_specified) {
5041 settings.binding_protocol = binary_prot;
5042 } else {
5043 if (settings.binding_protocol != binary_prot) {
5044 fprintf(stderr, "ERROR: You cannot allow the ASCII protocol while using SASL.\n");
5045 exit(EX_USAGE);
5046 }
5047 }
5048 }
5049
5050 if (tcp_specified && !udp_specified) {
5051 settings.udpport = settings.port;
5052 } else if (udp_specified && !tcp_specified) {
5053 settings.port = settings.udpport;
5054 }
5055
5056 if (maxcore != 0) {
5057 struct rlimit rlim_new;
5058 /*
5059 * First try raising to infinity; if that fails, try bringing
5060 * the soft limit to the hard.
5061 */
5062 if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
5063 rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
5064 if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
5065 /* failed. try raising just to the old max */
5066 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
5067 (void)setrlimit(RLIMIT_CORE, &rlim_new);
5068 }
5069 }
5070 /*
5071 * getrlimit again to see what we ended up with. Only fail if
5072 * the soft limit ends up 0, because then no core files will be
5073 * created at all.
5074 */
5075
5076 if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
5077 fprintf(stderr, "failed to ensure corefile creation\n");
5078 exit(EX_OSERR);
5079 }
5080 }
5081
5082 /*
5083 * If needed, increase rlimits to allow as many connections
5084 * as needed.
5085 */
5086
5087 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
5088 fprintf(stderr, "failed to getrlimit number of files\n");
5089 exit(EX_OSERR);
5090 } else {
5091 rlim.rlim_cur = settings.maxconns;
5092 rlim.rlim_max = settings.maxconns;
5093 if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
5094 fprintf(stderr, "failed to set rlimit for open files. Try starting as root or requesting smaller maxconns value.\n");
5095 exit(EX_OSERR);
5096 }
5097 }
5098
5099 /* lose root privileges if we have them */
5100 if (getuid() == 0 || geteuid() == 0) {
5101 if (username == 0 || *username == '\0') {
5102 fprintf(stderr, "can't run as root without the -u switch\n");
5103 exit(EX_USAGE);
5104 }
5105 if ((pw = getpwnam(username)) == 0) {
5106 fprintf(stderr, "can't find the user %s to switch to\n", username);
5107 exit(EX_NOUSER);
5108 }
5109 if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
5110 fprintf(stderr, "failed to assume identity of user %s\n", username);
5111 exit(EX_OSERR);
5112 }
5113 }
5114
5115 /* Initialize Sasl if -S was specified */
5116 if (settings.sasl) {
5117 init_sasl();
5118 }
5119
5120 /* daemonize if requested */
5121 /* if we want to ensure our ability to dump core, don't chdir to / */
5122 if (do_daemonize) {
5123 if (sigignore(SIGHUP) == -1) {
5124 perror("Failed to ignore SIGHUP");
5125 }
5126 if (daemonize(maxcore, settings.verbose) == -1) {
5127 fprintf(stderr, "failed to daemon() in order to daemonize\n");
5128 exit(EXIT_FAILURE);
5129 }
5130 }
5131
5132 /* lock paged memory if needed */
5133 if (lock_memory) {
5134 #ifdef HAVE_MLOCKALL
5135 int res = mlockall(MCL_CURRENT | MCL_FUTURE);
5136 if (res != 0) {
5137 fprintf(stderr, "warning: -k invalid, mlockall() failed: %s\n",
5138 strerror(errno));
5139 }
5140 #else
5141 fprintf(stderr, "warning: -k invalid, mlockall() not supported on this platform. proceeding without.\n");
5142 #endif
5143 }
5144
5145 /* initialize main thread libevent instance */
5146 main_base = event_init();
5147
5148 /* initialize other stuff */
5149 stats_init();
5150 assoc_init(settings.hashpower_init);
5151 conn_init();
5152 slabs_init(settings.maxbytes, settings.factor, preallocate);
5153
5154 /*
5155 * ignore SIGPIPE signals; we can use errno == EPIPE if we
5156 * need that information
5157 */
5158 if (sigignore(SIGPIPE) == -1) {
5159 perror("failed to ignore SIGPIPE; sigaction");
5160 exit(EX_OSERR);
5161 }
5162 /* start up worker threads if MT mode */
5163 thread_init(settings.num_threads, main_base);
5164
5165 if (start_assoc_maintenance_thread() == -1) {
5166 exit(EXIT_FAILURE);
5167 }
5168
5169 if (settings.slab_reassign &&
5170 start_slab_maintenance_thread() == -1) {
5171 exit(EXIT_FAILURE);
5172 }
5173
5174 /* initialise clock event */
5175 clock_handler(0, 0, 0);
5176
5177 /* create unix mode sockets after dropping privileges */
5178 if (settings.socketpath != NULL) {
5179 errno = 0;
5180 if (server_socket_unix(settings.socketpath,settings.access)) {
5181 vperror("failed to listen on UNIX socket: %s", settings.socketpath);
5182 exit(EX_OSERR);
5183 }
5184 }
5185
5186 /* create the listening socket, bind it, and init */
5187 if (settings.socketpath == NULL) {
5188 const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
5189 char temp_portnumber_filename[PATH_MAX];
5190 FILE *portnumber_file = NULL;
5191
5192 if (portnumber_filename != NULL) {
5193 snprintf(temp_portnumber_filename,
5194 sizeof(temp_portnumber_filename),
5195 "%s.lck", portnumber_filename);
5196
5197 portnumber_file = fopen(temp_portnumber_filename, "a");
5198 if (portnumber_file == NULL) {
5199 fprintf(stderr, "Failed to open \"%s\": %s\n",
5200 temp_portnumber_filename, strerror(errno));
5201 }
5202 }
5203
5204 errno = 0;
5205 if (settings.port && server_sockets(settings.port, tcp_transport,
5206 portnumber_file)) {
5207 vperror("failed to listen on TCP port %d", settings.port);
5208 exit(EX_OSERR);
5209 }
5210
5211 /*
5212 * initialization order: first create the listening sockets
5213 * (may need root on low ports), then drop root if needed,
5214 * then daemonise if needed, then init libevent (in some cases
5215 * descriptors created by libevent wouldn't survive forking).
5216 */
5217
5218 /* create the UDP listening socket and bind it */
5219 errno = 0;
5220 if (settings.udpport && server_sockets(settings.udpport, udp_transport,
5221 portnumber_file)) {
5222 vperror("failed to listen on UDP port %d", settings.udpport);
5223 exit(EX_OSERR);
5224 }
5225
5226 if (portnumber_file) {
5227 fclose(portnumber_file);
5228 rename(temp_portnumber_filename, portnumber_filename);
5229 }
5230 }
5231
5232 /* Give the sockets a moment to open. I know this is dumb, but the error
5233 * is only an advisory.
5234 */
5235 usleep(1000);
5236 if (stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
5237 fprintf(stderr, "Maxconns setting is too low, use -c to increase.\n");
5238 exit(EXIT_FAILURE);
5239 }
5240
5241 if (pid_file != NULL) {
5242 save_pid(pid_file);
5243 }
5244
5245 /* Drop privileges no longer needed */
5246 drop_privileges();
5247
5248 /* enter the event loop */
5249 if (event_base_loop(main_base, 0) != 0) {
5250 retval = EXIT_FAILURE;
5251 }
5252
5253 stop_assoc_maintenance_thread();
5254
5255 /* remove the PID file if we're a daemon */
5256 #if 0
5257 if (do_daemonize)
5258 remove_pidfile(pid_file);
5259 #endif
5260 /* Clean up strdup() call for bind() address */
5261 if (settings.inter)
5262 free(settings.inter);
5263 if (l_socket)
5264 free(l_socket);
5265 if (u_socket)
5266 free(u_socket);
5267
5268 return retval;
5269 }