Update hardening rules.
[awesomized/libmemcached] / memcached / memcached.c
1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * memcached - memory caching daemon
4 *
5 * http://www.danga.com/memcached/
6 *
7 * Copyright 2003 Danga Interactive, Inc. All rights reserved.
8 *
9 * Use and distribution licensed under the BSD license. See
10 * the LICENSE file for full text.
11 *
12 * Authors:
13 * Anatoly Vorobey <mellon@pobox.com>
14 * Brad Fitzpatrick <brad@danga.com>
15 */
16 #include "memcached.h"
17 #include <sys/stat.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20 #include <signal.h>
21 #include <sys/resource.h>
22 #include <sys/uio.h>
23 #include <ctype.h>
24 #include <stdarg.h>
25
26 /* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h. */
28 #ifndef _P1003_1B_VISIBLE
29 #define _P1003_1B_VISIBLE
30 #endif
31 /* need this to get IOV_MAX on some platforms. */
32 #ifndef __need_IOV_MAX
33 #define __need_IOV_MAX
34 #endif
35 #include <pwd.h>
36 #include <sys/mman.h>
37 #include <fcntl.h>
38 #include <netinet/tcp.h>
39 #include <arpa/inet.h>
40 #include <errno.h>
41 #include <stdlib.h>
42 #include <stdio.h>
43 #include <string.h>
44 #include <time.h>
45 #include <assert.h>
46 #include <limits.h>
47 #include <sysexits.h>
48 #include <stddef.h>
49
50 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
51 #ifndef IOV_MAX
52 #if defined(__FreeBSD__) || defined(__APPLE__)
53 # define IOV_MAX 1024
54 #endif
55 #endif
56
57 /*
58 * forward declarations
59 */
60 static void drive_machine(conn *c);
61 static int new_socket(struct addrinfo *ai);
62 static int try_read_command(conn *c);
63
/* Result codes returned by try_read_network() and try_read_udp(). */
enum try_read_result {
    READ_DATA_RECEIVED,
    READ_NO_DATA_RECEIVED,
    READ_ERROR,            /** an error occurred (on the socket) (or client closed connection) */
    READ_MEMORY_ERROR      /** failed to allocate more memory */
};
70
71 static enum try_read_result try_read_network(conn *c);
72 static enum try_read_result try_read_udp(conn *c);
73
74 static void conn_set_state(conn *c, enum conn_states state);
75
76 /* stats */
77 static void stats_init(void);
78 static void server_stats(ADD_STAT add_stats, conn *c);
79 static void process_stat_settings(ADD_STAT add_stats, void *c);
80
81
82 /* defaults */
83 static void settings_init(void);
84
85 /* event handling, network IO */
86 static void event_handler(const int fd, const short which, void *arg);
87 static void conn_close(conn *c);
88 static void conn_init(void);
89 static bool update_event(conn *c, const int new_flags);
90 static void complete_nread(conn *c);
91 static void process_command(conn *c, char *command);
92 static void write_and_free(conn *c, char *buf, int bytes);
93 static int ensure_iov_space(conn *c);
94 static int add_iov(conn *c, const void *buf, int len);
95 static int add_msghdr(conn *c);
96
97
98 static void conn_free(conn *c);
99
100 /** exported globals **/
struct stats stats;       /* global counters; initialised by stats_init() */
struct settings settings; /* runtime configuration; defaults come from settings_init() */
time_t process_started;     /* when the process was started (stats_init() stores time(0) - 2) */

/* NOTE(review): the two slab rebalance objects are not used in this part of
 * the file; presumably they are shared with the slab code -- confirm. */
struct slab_rebalance slab_rebal;
volatile int slab_rebalance_signal;

/** file scope variables **/
static conn *listen_conn = NULL;
static struct event_base *main_base; /* event base that maxconns_handler() re-arms its timer on */
111
/* Result codes returned by transmit(). */
enum transmit_result {
    TRANSMIT_COMPLETE,   /** All done writing. */
    TRANSMIT_INCOMPLETE, /** More data remaining to write. */
    TRANSMIT_SOFT_ERROR, /** Can't write any more right now. */
    TRANSMIT_HARD_ERROR  /** Can't write (c->state is set to conn_closing) */
};
118
119 static enum transmit_result transmit(conn *c);
120
121 /* This reduces the latency without adding lots of extra wiring to be able to
122 * notify the listener thread of when to listen again.
123 * Also, the clock timer could be broken out into its own thread and we
124 * can block the listener via a condition.
125 */
126 static volatile bool allow_new_conns = true;
127 static struct event maxconnsevent;
128 static void maxconns_handler(const int fd, const short which, void *arg) {
129 struct timeval t = {.tv_sec = 0, .tv_usec = 10000};
130
131 if (fd == -42 || allow_new_conns == false) {
132 /* reschedule in 10ms if we need to keep polling */
133 evtimer_set(&maxconnsevent, maxconns_handler, 0);
134 event_base_set(main_base, &maxconnsevent);
135 evtimer_add(&maxconnsevent, &t);
136 } else {
137 evtimer_del(&maxconnsevent);
138 accept_new_conns(true);
139 }
140 }
141
142 #define REALTIME_MAXDELTA 60*60*24*30
143
144 /*
145 * given time value that's either unix time or delta from current unix time, return
146 * unix time. Use the fact that delta can't exceed one month (and real time value can't
147 * be that low).
148 */
149 static rel_time_t realtime(const time_t exptime) {
150 /* no. of seconds in 30 days - largest possible delta exptime */
151
152 if (exptime == 0) return 0; /* 0 means never expire */
153
154 if (exptime > REALTIME_MAXDELTA) {
155 /* if item expiration is at/before the server started, give it an
156 expiration time of 1 second after the server started.
157 (because 0 means don't expire). without this, we'd
158 underflow and wrap around to some large value way in the
159 future, effectively making items expiring in the past
160 really expiring never */
161 if (exptime <= process_started)
162 return (rel_time_t)1;
163 return (rel_time_t)(exptime - process_started);
164 } else {
165 return (rel_time_t)(exptime + current_time);
166 }
167 }
168
169 static void stats_init(void) {
170 stats.curr_items = stats.total_items = stats.curr_conns = stats.total_conns = stats.conn_structs = 0;
171 stats.get_cmds = stats.set_cmds = stats.get_hits = stats.get_misses = stats.evictions = stats.reclaimed = 0;
172 stats.touch_cmds = stats.touch_misses = stats.touch_hits = stats.rejected_conns = 0;
173 stats.curr_bytes = stats.listen_disabled_num = 0;
174 stats.hash_power_level = stats.hash_bytes = stats.hash_is_expanding = 0;
175 stats.expired_unfetched = stats.evicted_unfetched = 0;
176 stats.slabs_moved = 0;
177 stats.accepting_conns = true; /* assuming we start in this state. */
178 stats.slab_reassign_running = false;
179
180 /* make the time we started always be 2 seconds before we really
181 did, so time(0) - time.started is never zero. if so, things
182 like 'settings.oldest_live' which act as booleans as well as
183 values are now false in boolean context... */
184 process_started = time(0) - 2;
185 stats_prefix_init();
186 }
187
188 static void stats_reset(void) {
189 STATS_LOCK();
190 stats.total_items = stats.total_conns = 0;
191 stats.rejected_conns = 0;
192 stats.evictions = 0;
193 stats.reclaimed = 0;
194 stats.listen_disabled_num = 0;
195 stats_prefix_clear();
196 STATS_UNLOCK();
197 threadlocal_stats_reset();
198 item_stats_reset();
199 }
200
201 static void settings_init(void) {
202 settings.use_cas = true;
203 settings.access = 0700;
204 settings.port = 11211;
205 settings.udpport = 11211;
206 /* By default this string should be NULL for getaddrinfo() */
207 settings.inter = NULL;
208 settings.maxbytes = 64 * 1024 * 1024; /* default is 64MB */
209 settings.maxconns = 1024; /* to limit connections-related memory to about 5MB */
210 settings.verbose = 0;
211 settings.oldest_live = 0;
212 settings.evict_to_free = 1; /* push old items out of cache when memory runs out */
213 settings.socketpath = NULL; /* by default, not using a unix socket */
214 settings.factor = 1.25;
215 settings.chunk_size = 48; /* space for a modest key and value */
216 settings.num_threads = 4; /* N workers */
217 settings.num_threads_per_udp = 0;
218 settings.prefix_delimiter = ':';
219 settings.detail_enabled = 0;
220 settings.reqs_per_event = 20;
221 settings.backlog = 1024;
222 settings.binding_protocol = negotiating_prot;
223 settings.item_size_max = 1024 * 1024; /* The famous 1MB upper limit. */
224 settings.maxconns_fast = false;
225 settings.hashpower_init = 0;
226 settings.slab_reassign = false;
227 settings.slab_automove = 0;
228 }
229
230 /*
231 * Adds a message header to a connection.
232 *
233 * Returns 0 on success, -1 on out-of-memory.
234 */
235 static int add_msghdr(conn *c)
236 {
237 struct msghdr *msg;
238
239 assert(c != NULL);
240
241 if (c->msgsize == c->msgused) {
242 msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
243 if (! msg)
244 return -1;
245 c->msglist = msg;
246 c->msgsize *= 2;
247 }
248
249 msg = c->msglist + c->msgused;
250
251 /* this wipes msg_iovlen, msg_control, msg_controllen, and
252 msg_flags, the last 3 of which aren't defined on solaris: */
253 memset(msg, 0, sizeof(struct msghdr));
254
255 msg->msg_iov = &c->iov[c->iovused];
256
257 if (c->request_addr_size > 0) {
258 msg->msg_name = &c->request_addr;
259 msg->msg_namelen = c->request_addr_size;
260 }
261
262 c->msgbytes = 0;
263 c->msgused++;
264
265 if (IS_UDP(c->transport)) {
266 /* Leave room for the UDP header, which we'll fill in later. */
267 return add_iov(c, NULL, UDP_HEADER_SIZE);
268 }
269
270 return 0;
271 }
272
273
274 /*
275 * Free list management for connections.
276 */
277
278 static conn **freeconns;
279 static int freetotal;
280 static int freecurr;
281 /* Lock for connection freelist */
282 static pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
283
284
285 static void conn_init(void) {
286 freetotal = 200;
287 freecurr = 0;
288 if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {
289 fprintf(stderr, "Failed to allocate connection structures\n");
290 }
291 return;
292 }
293
294 /*
295 * Returns a connection from the freelist, if any.
296 */
297 conn *conn_from_freelist() {
298 conn *c;
299
300 pthread_mutex_lock(&conn_lock);
301 if (freecurr > 0) {
302 c = freeconns[--freecurr];
303 } else {
304 c = NULL;
305 }
306 pthread_mutex_unlock(&conn_lock);
307
308 return c;
309 }
310
311 /*
312 * Adds a connection to the freelist. 0 = success.
313 */
314 bool conn_add_to_freelist(conn *c) {
315 bool ret = true;
316 pthread_mutex_lock(&conn_lock);
317 if (freecurr < freetotal) {
318 freeconns[freecurr++] = c;
319 ret = false;
320 } else {
321 /* try to enlarge free connections array */
322 size_t newsize = freetotal * 2;
323 conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
324 if (new_freeconns) {
325 freetotal = newsize;
326 freeconns = new_freeconns;
327 freeconns[freecurr++] = c;
328 ret = false;
329 }
330 }
331 pthread_mutex_unlock(&conn_lock);
332 return ret;
333 }
334
335 static const char *prot_text(enum protocol prot) {
336 char *rv = "unknown";
337 switch(prot) {
338 case ascii_prot:
339 rv = "ascii";
340 break;
341 case binary_prot:
342 rv = "binary";
343 break;
344 case negotiating_prot:
345 rv = "auto-negotiate";
346 break;
347 }
348 return rv;
349 }
350
351 conn *conn_new(const int sfd, enum conn_states init_state,
352 const int event_flags,
353 const int read_buffer_size, enum network_transport transport,
354 struct event_base *base) {
355 conn *c = conn_from_freelist();
356
357 if (NULL == c) {
358 if (!(c = (conn *)calloc(1, sizeof(conn)))) {
359 fprintf(stderr, "calloc()\n");
360 return NULL;
361 }
362 MEMCACHED_CONN_CREATE(c);
363
364 c->rbuf = c->wbuf = 0;
365 c->ilist = 0;
366 c->suffixlist = 0;
367 c->iov = 0;
368 c->msglist = 0;
369 c->hdrbuf = 0;
370
371 c->rsize = read_buffer_size;
372 c->wsize = DATA_BUFFER_SIZE;
373 c->isize = ITEM_LIST_INITIAL;
374 c->suffixsize = SUFFIX_LIST_INITIAL;
375 c->iovsize = IOV_LIST_INITIAL;
376 c->msgsize = MSG_LIST_INITIAL;
377 c->hdrsize = 0;
378
379 c->rbuf = (char *)malloc((size_t)c->rsize);
380 c->wbuf = (char *)malloc((size_t)c->wsize);
381 c->ilist = (item **)malloc(sizeof(item *) * c->isize);
382 c->suffixlist = (char **)malloc(sizeof(char *) * c->suffixsize);
383 c->iov = (struct iovec *)malloc(sizeof(struct iovec) * c->iovsize);
384 c->msglist = (struct msghdr *)malloc(sizeof(struct msghdr) * c->msgsize);
385
386 if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0 ||
387 c->msglist == 0 || c->suffixlist == 0) {
388 conn_free(c);
389 fprintf(stderr, "malloc()\n");
390 return NULL;
391 }
392
393 STATS_LOCK();
394 stats.conn_structs++;
395 STATS_UNLOCK();
396 }
397
398 c->transport = transport;
399 c->protocol = settings.binding_protocol;
400
401 /* unix socket mode doesn't need this, so zeroed out. but why
402 * is this done for every command? presumably for UDP
403 * mode. */
404 if (!settings.socketpath) {
405 c->request_addr_size = sizeof(c->request_addr);
406 } else {
407 c->request_addr_size = 0;
408 }
409
410 if (settings.verbose > 1) {
411 if (init_state == conn_listening) {
412 fprintf(stderr, "<%d server listening (%s)\n", sfd,
413 prot_text(c->protocol));
414 } else if (IS_UDP(transport)) {
415 fprintf(stderr, "<%d server listening (udp)\n", sfd);
416 } else if (c->protocol == negotiating_prot) {
417 fprintf(stderr, "<%d new auto-negotiating client connection\n",
418 sfd);
419 } else if (c->protocol == ascii_prot) {
420 fprintf(stderr, "<%d new ascii client connection.\n", sfd);
421 } else if (c->protocol == binary_prot) {
422 fprintf(stderr, "<%d new binary client connection.\n", sfd);
423 } else {
424 fprintf(stderr, "<%d new unknown (%d) client connection\n",
425 sfd, c->protocol);
426 assert(false);
427 }
428 }
429
430 c->sfd = sfd;
431 c->state = init_state;
432 c->rlbytes = 0;
433 c->cmd = -1;
434 c->rbytes = c->wbytes = 0;
435 c->wcurr = c->wbuf;
436 c->rcurr = c->rbuf;
437 c->ritem = 0;
438 c->icurr = c->ilist;
439 c->suffixcurr = c->suffixlist;
440 c->ileft = 0;
441 c->suffixleft = 0;
442 c->iovused = 0;
443 c->msgcurr = 0;
444 c->msgused = 0;
445
446 c->write_and_go = init_state;
447 c->write_and_free = 0;
448 c->item = 0;
449
450 c->noreply = false;
451
452 event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
453 event_base_set(base, &c->event);
454 c->ev_flags = event_flags;
455
456 if (event_add(&c->event, 0) == -1) {
457 if (conn_add_to_freelist(c)) {
458 conn_free(c);
459 }
460 perror("event_add");
461 return NULL;
462 }
463
464 STATS_LOCK();
465 stats.curr_conns++;
466 stats.total_conns++;
467 STATS_UNLOCK();
468
469 MEMCACHED_CONN_ALLOCATE(c->sfd);
470
471 return c;
472 }
473
474 static void conn_cleanup(conn *c) {
475 assert(c != NULL);
476
477 if (c->item) {
478 item_remove(c->item);
479 c->item = 0;
480 }
481
482 if (c->ileft != 0) {
483 for (; c->ileft > 0; c->ileft--,c->icurr++) {
484 item_remove(*(c->icurr));
485 }
486 }
487
488 if (c->suffixleft != 0) {
489 for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
490 cache_free(c->thread->suffix_cache, *(c->suffixcurr));
491 }
492 }
493
494 if (c->write_and_free) {
495 free(c->write_and_free);
496 c->write_and_free = 0;
497 }
498
499 if (c->sasl_conn) {
500 assert(settings.sasl);
501 sasl_dispose(&c->sasl_conn);
502 c->sasl_conn = NULL;
503 }
504
505 if (IS_UDP(c->transport)) {
506 conn_set_state(c, conn_read);
507 }
508 }
509
510 /*
511 * Frees a connection.
512 */
513 void conn_free(conn *c) {
514 if (c) {
515 MEMCACHED_CONN_DESTROY(c);
516 if (c->hdrbuf)
517 free(c->hdrbuf);
518 if (c->msglist)
519 free(c->msglist);
520 if (c->rbuf)
521 free(c->rbuf);
522 if (c->wbuf)
523 free(c->wbuf);
524 if (c->ilist)
525 free(c->ilist);
526 if (c->suffixlist)
527 free(c->suffixlist);
528 if (c->iov)
529 free(c->iov);
530 free(c);
531 }
532 }
533
534 static void conn_close(conn *c) {
535 assert(c != NULL);
536
537 /* delete the event, the socket and the conn */
538 event_del(&c->event);
539
540 if (settings.verbose > 1)
541 fprintf(stderr, "<%d connection closed.\n", c->sfd);
542
543 MEMCACHED_CONN_RELEASE(c->sfd);
544 close(c->sfd);
545 pthread_mutex_lock(&conn_lock);
546 allow_new_conns = true;
547 pthread_mutex_unlock(&conn_lock);
548 conn_cleanup(c);
549
550 /* if the connection has big buffers, just free it */
551 if (c->rsize > READ_BUFFER_HIGHWAT || conn_add_to_freelist(c)) {
552 conn_free(c);
553 }
554
555 STATS_LOCK();
556 stats.curr_conns--;
557 STATS_UNLOCK();
558
559 return;
560 }
561
562 /*
563 * Shrinks a connection's buffers if they're too big. This prevents
564 * periodic large "get" requests from permanently chewing lots of server
565 * memory.
566 *
567 * This should only be called in between requests since it can wipe output
568 * buffers!
569 */
570 static void conn_shrink(conn *c) {
571 assert(c != NULL);
572
573 if (IS_UDP(c->transport))
574 return;
575
576 if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
577 char *newbuf;
578
579 if (c->rcurr != c->rbuf)
580 memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);
581
582 newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
583
584 if (newbuf) {
585 c->rbuf = newbuf;
586 c->rsize = DATA_BUFFER_SIZE;
587 }
588 /* TODO check other branch... */
589 c->rcurr = c->rbuf;
590 }
591
592 if (c->isize > ITEM_LIST_HIGHWAT) {
593 item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
594 if (newbuf) {
595 c->ilist = newbuf;
596 c->isize = ITEM_LIST_INITIAL;
597 }
598 /* TODO check error condition? */
599 }
600
601 if (c->msgsize > MSG_LIST_HIGHWAT) {
602 struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
603 if (newbuf) {
604 c->msglist = newbuf;
605 c->msgsize = MSG_LIST_INITIAL;
606 }
607 /* TODO check error condition? */
608 }
609
610 if (c->iovsize > IOV_LIST_HIGHWAT) {
611 struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
612 if (newbuf) {
613 c->iov = newbuf;
614 c->iovsize = IOV_LIST_INITIAL;
615 }
616 /* TODO check return value */
617 }
618 }
619
620 /**
621 * Convert a state name to a human readable form.
622 */
623 static const char *state_text(enum conn_states state) {
624 const char* const statenames[] = { "conn_listening",
625 "conn_new_cmd",
626 "conn_waiting",
627 "conn_read",
628 "conn_parse_cmd",
629 "conn_write",
630 "conn_nread",
631 "conn_swallow",
632 "conn_closing",
633 "conn_mwrite" };
634 return statenames[state];
635 }
636
637 /*
638 * Sets a connection's current state in the state machine. Any special
639 * processing that needs to happen on certain state transitions can
640 * happen here.
641 */
642 static void conn_set_state(conn *c, enum conn_states state) {
643 assert(c != NULL);
644 assert(state >= conn_listening && state < conn_max_state);
645
646 if (state != c->state) {
647 if (settings.verbose > 2) {
648 fprintf(stderr, "%d: going from %s to %s\n",
649 c->sfd, state_text(c->state),
650 state_text(state));
651 }
652
653 if (state == conn_write || state == conn_mwrite) {
654 MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
655 }
656 c->state = state;
657 }
658 }
659
660 /*
661 * Ensures that there is room for another struct iovec in a connection's
662 * iov list.
663 *
664 * Returns 0 on success, -1 on out-of-memory.
665 */
666 static int ensure_iov_space(conn *c) {
667 assert(c != NULL);
668
669 if (c->iovused >= c->iovsize) {
670 int i, iovnum;
671 struct iovec *new_iov = (struct iovec *)realloc(c->iov,
672 (c->iovsize * 2) * sizeof(struct iovec));
673 if (! new_iov)
674 return -1;
675 c->iov = new_iov;
676 c->iovsize *= 2;
677
678 /* Point all the msghdr structures at the new list. */
679 for (i = 0, iovnum = 0; i < c->msgused; i++) {
680 c->msglist[i].msg_iov = &c->iov[iovnum];
681 iovnum += c->msglist[i].msg_iovlen;
682 }
683 }
684
685 return 0;
686 }
687
688
689 /*
690 * Adds data to the list of pending data that will be written out to a
691 * connection.
692 *
693 * Returns 0 on success, -1 on out-of-memory.
694 */
695
696 static int add_iov(conn *c, const void *buf, int len) {
697 struct msghdr *m;
698 int leftover;
699 bool limit_to_mtu;
700
701 assert(c != NULL);
702
703 do {
704 m = &c->msglist[c->msgused - 1];
705
706 /*
707 * Limit UDP packets, and the first payloads of TCP replies, to
708 * UDP_MAX_PAYLOAD_SIZE bytes.
709 */
710 limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
711
712 /* We may need to start a new msghdr if this one is full. */
713 if (m->msg_iovlen == IOV_MAX ||
714 (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
715 add_msghdr(c);
716 m = &c->msglist[c->msgused - 1];
717 }
718
719 if (ensure_iov_space(c) != 0)
720 return -1;
721
722 /* If the fragment is too big to fit in the datagram, split it up */
723 if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
724 leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
725 len -= leftover;
726 } else {
727 leftover = 0;
728 }
729
730 m = &c->msglist[c->msgused - 1];
731 m->msg_iov[m->msg_iovlen].iov_base = (void *)buf;
732 m->msg_iov[m->msg_iovlen].iov_len = len;
733
734 c->msgbytes += len;
735 c->iovused++;
736 m->msg_iovlen++;
737
738 buf = ((char *)buf) + len;
739 len = leftover;
740 } while (leftover > 0);
741
742 return 0;
743 }
744
745
746 /*
747 * Constructs a set of UDP headers and attaches them to the outgoing messages.
748 */
749 static int build_udp_headers(conn *c) {
750 int i;
751 unsigned char *hdr;
752
753 assert(c != NULL);
754
755 if (c->msgused > c->hdrsize) {
756 void *new_hdrbuf;
757 if (c->hdrbuf)
758 new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
759 else
760 new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
761 if (! new_hdrbuf)
762 return -1;
763 c->hdrbuf = (unsigned char *)new_hdrbuf;
764 c->hdrsize = c->msgused * 2;
765 }
766
767 hdr = c->hdrbuf;
768 for (i = 0; i < c->msgused; i++) {
769 c->msglist[i].msg_iov[0].iov_base = (void*)hdr;
770 c->msglist[i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
771 *hdr++ = c->request_id / 256;
772 *hdr++ = c->request_id % 256;
773 *hdr++ = i / 256;
774 *hdr++ = i % 256;
775 *hdr++ = c->msgused / 256;
776 *hdr++ = c->msgused % 256;
777 *hdr++ = 0;
778 *hdr++ = 0;
779 assert((void *) hdr == (caddr_t)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
780 }
781
782 return 0;
783 }
784
785
786 static void out_string(conn *c, const char *str) {
787 size_t len;
788
789 assert(c != NULL);
790
791 if (c->noreply) {
792 if (settings.verbose > 1)
793 fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);
794 c->noreply = false;
795 conn_set_state(c, conn_new_cmd);
796 return;
797 }
798
799 if (settings.verbose > 1)
800 fprintf(stderr, ">%d %s\n", c->sfd, str);
801
802 /* Nuke a partial output... */
803 c->msgcurr = 0;
804 c->msgused = 0;
805 c->iovused = 0;
806 add_msghdr(c);
807
808 len = strlen(str);
809 if ((len + 2) > c->wsize) {
810 /* ought to be always enough. just fail for simplicity */
811 str = "SERVER_ERROR output line too long";
812 len = strlen(str);
813 }
814
815 memcpy(c->wbuf, str, len);
816 memcpy(c->wbuf + len, "\r\n", 2);
817 c->wbytes = len + 2;
818 c->wcurr = c->wbuf;
819
820 conn_set_state(c, conn_write);
821 c->write_and_go = conn_new_cmd;
822 return;
823 }
824
825 /*
826 * we get here after reading the value in set/add/replace commands. The command
827 * has been stored in c->cmd, and the item is ready in c->item.
828 */
829 static void complete_nread_ascii(conn *c) {
830 assert(c != NULL);
831
832 item *it = c->item;
833 int comm = c->cmd;
834 enum store_item_type ret;
835
836 pthread_mutex_lock(&c->thread->stats.mutex);
837 c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
838 pthread_mutex_unlock(&c->thread->stats.mutex);
839
840 if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
841 out_string(c, "CLIENT_ERROR bad data chunk");
842 } else {
843 ret = store_item(it, comm, c);
844
845 #ifdef ENABLE_DTRACE
846 uint64_t cas = ITEM_get_cas(it);
847 switch (c->cmd) {
848 case NREAD_ADD:
849 MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
850 (ret == 1) ? it->nbytes : -1, cas);
851 break;
852 case NREAD_REPLACE:
853 MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
854 (ret == 1) ? it->nbytes : -1, cas);
855 break;
856 case NREAD_APPEND:
857 MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
858 (ret == 1) ? it->nbytes : -1, cas);
859 break;
860 case NREAD_PREPEND:
861 MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
862 (ret == 1) ? it->nbytes : -1, cas);
863 break;
864 case NREAD_SET:
865 MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
866 (ret == 1) ? it->nbytes : -1, cas);
867 break;
868 case NREAD_CAS:
869 MEMCACHED_COMMAND_CAS(c->sfd, ITEM_key(it), it->nkey, it->nbytes,
870 cas);
871 break;
872 }
873 #endif
874
875 switch (ret) {
876 case STORED:
877 out_string(c, "STORED");
878 break;
879 case EXISTS:
880 out_string(c, "EXISTS");
881 break;
882 case NOT_FOUND:
883 out_string(c, "NOT_FOUND");
884 break;
885 case NOT_STORED:
886 out_string(c, "NOT_STORED");
887 break;
888 default:
889 out_string(c, "SERVER_ERROR Unhandled storage type.");
890 }
891
892 }
893
894 item_remove(c->item); /* release the c->item reference */
895 c->item = 0;
896 }
897
898 /**
899 * get a pointer to the start of the request struct for the current command
900 */
901 static void* binary_get_request(conn *c) {
902 char *ret = c->rcurr;
903 ret -= (sizeof(c->binary_header) + c->binary_header.request.keylen +
904 c->binary_header.request.extlen);
905
906 assert(ret >= c->rbuf);
907 return ret;
908 }
909
910 /**
911 * get a pointer to the key in this request
912 */
913 static char* binary_get_key(conn *c) {
914 return c->rcurr - (c->binary_header.request.keylen);
915 }
916
917 static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
918 protocol_binary_response_header* header;
919
920 assert(c);
921
922 c->msgcurr = 0;
923 c->msgused = 0;
924 c->iovused = 0;
925 if (add_msghdr(c) != 0) {
926 /* XXX: out_string is inappropriate here */
927 out_string(c, "SERVER_ERROR out of memory");
928 return;
929 }
930
931 header = (protocol_binary_response_header *)c->wbuf;
932
933 header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
934 header->response.opcode = c->binary_header.request.opcode;
935 header->response.keylen = (uint16_t)htons(key_len);
936
937 header->response.extlen = (uint8_t)hdr_len;
938 header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
939 header->response.status = (uint16_t)htons(err);
940
941 header->response.bodylen = htonl(body_len);
942 header->response.opaque = c->opaque;
943 header->response.cas = htonll(c->cas);
944
945 if (settings.verbose > 1) {
946 int ii;
947 fprintf(stderr, ">%d Writing bin response:", c->sfd);
948 for (ii = 0; ii < sizeof(header->bytes); ++ii) {
949 if (ii % 4 == 0) {
950 fprintf(stderr, "\n>%d ", c->sfd);
951 }
952 fprintf(stderr, " 0x%02x", header->bytes[ii]);
953 }
954 fprintf(stderr, "\n");
955 }
956
957 add_iov(c, c->wbuf, sizeof(header->response));
958 }
959
960 static void write_bin_error(conn *c, protocol_binary_response_status err, int swallow) {
961 const char *errstr = "Unknown error";
962 size_t len;
963
964 switch (err) {
965 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
966 errstr = "Out of memory";
967 break;
968 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
969 errstr = "Unknown command";
970 break;
971 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
972 errstr = "Not found";
973 break;
974 case PROTOCOL_BINARY_RESPONSE_EINVAL:
975 errstr = "Invalid arguments";
976 break;
977 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
978 errstr = "Data exists for key.";
979 break;
980 case PROTOCOL_BINARY_RESPONSE_E2BIG:
981 errstr = "Too large.";
982 break;
983 case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
984 errstr = "Non-numeric server-side value for incr or decr";
985 break;
986 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
987 errstr = "Not stored.";
988 break;
989 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
990 errstr = "Auth failure.";
991 break;
992 default:
993 assert(false);
994 errstr = "UNHANDLED ERROR";
995 fprintf(stderr, ">%d UNHANDLED ERROR: %d\n", c->sfd, err);
996 }
997
998 if (settings.verbose > 1) {
999 fprintf(stderr, ">%d Writing an error: %s\n", c->sfd, errstr);
1000 }
1001
1002 len = strlen(errstr);
1003 add_bin_header(c, err, 0, 0, len);
1004 if (len > 0) {
1005 add_iov(c, errstr, len);
1006 }
1007 conn_set_state(c, conn_mwrite);
1008 if(swallow > 0) {
1009 c->sbytes = swallow;
1010 c->write_and_go = conn_swallow;
1011 } else {
1012 c->write_and_go = conn_new_cmd;
1013 }
1014 }
1015
1016 /* Form and send a response to a command over the binary protocol */
1017 static void write_bin_response(conn *c, void *d, int hlen, int keylen, int dlen) {
1018 if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1019 c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1020 add_bin_header(c, 0, hlen, keylen, dlen);
1021 if(dlen > 0) {
1022 add_iov(c, d, dlen);
1023 }
1024 conn_set_state(c, conn_mwrite);
1025 c->write_and_go = conn_new_cmd;
1026 } else {
1027 conn_set_state(c, conn_new_cmd);
1028 }
1029 }
1030
1031 static void complete_incr_bin(conn *c) {
1032 item *it;
1033 char *key;
1034 size_t nkey;
1035 /* Weird magic in add_delta forces me to pad here */
1036 char tmpbuf[INCR_MAX_STORAGE_LEN];
1037 uint64_t cas = 0;
1038
1039 protocol_binary_response_incr* rsp = (protocol_binary_response_incr*)c->wbuf;
1040 protocol_binary_request_incr* req = binary_get_request(c);
1041
1042 assert(c != NULL);
1043 assert(c->wsize >= sizeof(*rsp));
1044
1045 /* fix byteorder in the request */
1046 req->message.body.delta = ntohll(req->message.body.delta);
1047 req->message.body.initial = ntohll(req->message.body.initial);
1048 req->message.body.expiration = ntohl(req->message.body.expiration);
1049 key = binary_get_key(c);
1050 nkey = c->binary_header.request.keylen;
1051
1052 if (settings.verbose > 1) {
1053 int i;
1054 fprintf(stderr, "incr ");
1055
1056 for (i = 0; i < nkey; i++) {
1057 fprintf(stderr, "%c", key[i]);
1058 }
1059 fprintf(stderr, " %lld, %llu, %d\n",
1060 (long long)req->message.body.delta,
1061 (long long)req->message.body.initial,
1062 req->message.body.expiration);
1063 }
1064
1065 if (c->binary_header.request.cas != 0) {
1066 cas = c->binary_header.request.cas;
1067 }
1068 switch(add_delta(c, key, nkey, c->cmd == PROTOCOL_BINARY_CMD_INCREMENT,
1069 req->message.body.delta, tmpbuf,
1070 &cas)) {
1071 case OK:
1072 rsp->message.body.value = htonll(strtoull(tmpbuf, NULL, 10));
1073 if (cas) {
1074 c->cas = cas;
1075 }
1076 write_bin_response(c, &rsp->message.body, 0, 0,
1077 sizeof(rsp->message.body.value));
1078 break;
1079 case NON_NUMERIC:
1080 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, 0);
1081 break;
1082 case EOM:
1083 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1084 break;
1085 case DELTA_ITEM_NOT_FOUND:
1086 if (req->message.body.expiration != 0xffffffff) {
1087 /* Save some room for the response */
1088 rsp->message.body.value = htonll(req->message.body.initial);
1089 it = item_alloc(key, nkey, 0, realtime(req->message.body.expiration),
1090 INCR_MAX_STORAGE_LEN);
1091
1092 if (it != NULL) {
1093 snprintf(ITEM_data(it), INCR_MAX_STORAGE_LEN, "%llu",
1094 (unsigned long long)req->message.body.initial);
1095
1096 if (store_item(it, NREAD_ADD, c)) {
1097 c->cas = ITEM_get_cas(it);
1098 write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
1099 } else {
1100 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
1101 }
1102 item_remove(it); /* release our reference */
1103 } else {
1104 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1105 }
1106 } else {
1107 pthread_mutex_lock(&c->thread->stats.mutex);
1108 if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
1109 c->thread->stats.incr_misses++;
1110 } else {
1111 c->thread->stats.decr_misses++;
1112 }
1113 pthread_mutex_unlock(&c->thread->stats.mutex);
1114
1115 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1116 }
1117 break;
1118 case DELTA_ITEM_CAS_MISMATCH:
1119 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1120 break;
1121 }
1122 }
1123
1124 static void complete_update_bin(conn *c) {
1125 protocol_binary_response_status eno = PROTOCOL_BINARY_RESPONSE_EINVAL;
1126 enum store_item_type ret = NOT_STORED;
1127 assert(c != NULL);
1128
1129 item *it = c->item;
1130
1131 pthread_mutex_lock(&c->thread->stats.mutex);
1132 c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
1133 pthread_mutex_unlock(&c->thread->stats.mutex);
1134
1135 /* We don't actually receive the trailing two characters in the bin
1136 * protocol, so we're going to just set them here */
1137 *(ITEM_data(it) + it->nbytes - 2) = '\r';
1138 *(ITEM_data(it) + it->nbytes - 1) = '\n';
1139
1140 ret = store_item(it, c->cmd, c);
1141
1142 #ifdef ENABLE_DTRACE
1143 uint64_t cas = ITEM_get_cas(it);
1144 switch (c->cmd) {
1145 case NREAD_ADD:
1146 MEMCACHED_COMMAND_ADD(c->sfd, ITEM_key(it), it->nkey,
1147 (ret == STORED) ? it->nbytes : -1, cas);
1148 break;
1149 case NREAD_REPLACE:
1150 MEMCACHED_COMMAND_REPLACE(c->sfd, ITEM_key(it), it->nkey,
1151 (ret == STORED) ? it->nbytes : -1, cas);
1152 break;
1153 case NREAD_APPEND:
1154 MEMCACHED_COMMAND_APPEND(c->sfd, ITEM_key(it), it->nkey,
1155 (ret == STORED) ? it->nbytes : -1, cas);
1156 break;
1157 case NREAD_PREPEND:
1158 MEMCACHED_COMMAND_PREPEND(c->sfd, ITEM_key(it), it->nkey,
1159 (ret == STORED) ? it->nbytes : -1, cas);
1160 break;
1161 case NREAD_SET:
1162 MEMCACHED_COMMAND_SET(c->sfd, ITEM_key(it), it->nkey,
1163 (ret == STORED) ? it->nbytes : -1, cas);
1164 break;
1165 }
1166 #endif
1167
1168 switch (ret) {
1169 case STORED:
1170 /* Stored */
1171 write_bin_response(c, NULL, 0, 0, 0);
1172 break;
1173 case EXISTS:
1174 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1175 break;
1176 case NOT_FOUND:
1177 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1178 break;
1179 case NOT_STORED:
1180 if (c->cmd == NREAD_ADD) {
1181 eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1182 } else if(c->cmd == NREAD_REPLACE) {
1183 eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1184 } else {
1185 eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1186 }
1187 write_bin_error(c, eno, 0);
1188 }
1189
1190 item_remove(c->item); /* release the c->item reference */
1191 c->item = 0;
1192 }
1193
1194 static void process_bin_touch(conn *c) {
1195 item *it;
1196
1197 protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1198 char* key = binary_get_key(c);
1199 size_t nkey = c->binary_header.request.keylen;
1200 protocol_binary_request_touch *t = binary_get_request(c);
1201 time_t exptime = ntohl(t->message.body.expiration);
1202
1203 if (settings.verbose > 1) {
1204 int ii;
1205 /* May be GAT/GATQ/etc */
1206 fprintf(stderr, "<%d TOUCH ", c->sfd);
1207 for (ii = 0; ii < nkey; ++ii) {
1208 fprintf(stderr, "%c", key[ii]);
1209 }
1210 fprintf(stderr, "\n");
1211 }
1212
1213 it = item_touch(key, nkey, realtime(exptime));
1214
1215 if (it) {
1216 /* the length has two unnecessary bytes ("\r\n") */
1217 uint16_t keylen = 0;
1218 uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
1219
1220 item_update(it);
1221 pthread_mutex_lock(&c->thread->stats.mutex);
1222 c->thread->stats.touch_cmds++;
1223 c->thread->stats.slab_stats[it->slabs_clsid].touch_hits++;
1224 pthread_mutex_unlock(&c->thread->stats.mutex);
1225
1226 MEMCACHED_COMMAND_TOUCH(c->sfd, ITEM_key(it), it->nkey,
1227 it->nbytes, ITEM_get_cas(it));
1228
1229 if (c->cmd == PROTOCOL_BINARY_CMD_TOUCH) {
1230 bodylen -= it->nbytes - 2;
1231 } else if (c->cmd == PROTOCOL_BINARY_CMD_GATK) {
1232 bodylen += nkey;
1233 keylen = nkey;
1234 }
1235
1236 add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
1237 rsp->message.header.response.cas = htonll(ITEM_get_cas(it));
1238
1239 // add the flags
1240 rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1241 add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1242
1243 if (c->cmd == PROTOCOL_BINARY_CMD_GATK) {
1244 add_iov(c, ITEM_key(it), nkey);
1245 }
1246
1247 /* Add the data minus the CRLF */
1248 if (c->cmd != PROTOCOL_BINARY_CMD_TOUCH) {
1249 add_iov(c, ITEM_data(it), it->nbytes - 2);
1250 }
1251
1252 conn_set_state(c, conn_mwrite);
1253 c->write_and_go = conn_new_cmd;
1254 /* Remember this command so we can garbage collect it later */
1255 c->item = it;
1256 } else {
1257 pthread_mutex_lock(&c->thread->stats.mutex);
1258 c->thread->stats.touch_cmds++;
1259 c->thread->stats.touch_misses++;
1260 pthread_mutex_unlock(&c->thread->stats.mutex);
1261
1262 MEMCACHED_COMMAND_TOUCH(c->sfd, key, nkey, -1, 0);
1263
1264 if (c->noreply) {
1265 conn_set_state(c, conn_new_cmd);
1266 } else {
1267 if (c->cmd == PROTOCOL_BINARY_CMD_GATK) {
1268 char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1269 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1270 0, nkey, nkey);
1271 memcpy(ofs, key, nkey);
1272 add_iov(c, ofs, nkey);
1273 conn_set_state(c, conn_mwrite);
1274 c->write_and_go = conn_new_cmd;
1275 } else {
1276 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1277 }
1278 }
1279 }
1280
1281 if (settings.detail_enabled) {
1282 stats_prefix_record_get(key, nkey, NULL != it);
1283 }
1284 }
1285
1286 static void process_bin_get(conn *c) {
1287 item *it;
1288
1289 protocol_binary_response_get* rsp = (protocol_binary_response_get*)c->wbuf;
1290 char* key = binary_get_key(c);
1291 size_t nkey = c->binary_header.request.keylen;
1292
1293 if (settings.verbose > 1) {
1294 int ii;
1295 fprintf(stderr, "<%d GET ", c->sfd);
1296 for (ii = 0; ii < nkey; ++ii) {
1297 fprintf(stderr, "%c", key[ii]);
1298 }
1299 fprintf(stderr, "\n");
1300 }
1301
1302 it = item_get(key, nkey);
1303 if (it) {
1304 /* the length has two unnecessary bytes ("\r\n") */
1305 uint16_t keylen = 0;
1306 uint32_t bodylen = sizeof(rsp->message.body) + (it->nbytes - 2);
1307
1308 item_update(it);
1309 pthread_mutex_lock(&c->thread->stats.mutex);
1310 c->thread->stats.get_cmds++;
1311 c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
1312 pthread_mutex_unlock(&c->thread->stats.mutex);
1313
1314 MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
1315 it->nbytes, ITEM_get_cas(it));
1316
1317 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1318 bodylen += nkey;
1319 keylen = nkey;
1320 }
1321 add_bin_header(c, 0, sizeof(rsp->message.body), keylen, bodylen);
1322 rsp->message.header.response.cas = htonll(ITEM_get_cas(it));
1323
1324 // add the flags
1325 rsp->message.body.flags = htonl(strtoul(ITEM_suffix(it), NULL, 10));
1326 add_iov(c, &rsp->message.body, sizeof(rsp->message.body));
1327
1328 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1329 add_iov(c, ITEM_key(it), nkey);
1330 }
1331
1332 /* Add the data minus the CRLF */
1333 add_iov(c, ITEM_data(it), it->nbytes - 2);
1334 conn_set_state(c, conn_mwrite);
1335 c->write_and_go = conn_new_cmd;
1336 /* Remember this command so we can garbage collect it later */
1337 c->item = it;
1338 } else {
1339 pthread_mutex_lock(&c->thread->stats.mutex);
1340 c->thread->stats.get_cmds++;
1341 c->thread->stats.get_misses++;
1342 pthread_mutex_unlock(&c->thread->stats.mutex);
1343
1344 MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1345
1346 if (c->noreply) {
1347 conn_set_state(c, conn_new_cmd);
1348 } else {
1349 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1350 char *ofs = c->wbuf + sizeof(protocol_binary_response_header);
1351 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1352 0, nkey, nkey);
1353 memcpy(ofs, key, nkey);
1354 add_iov(c, ofs, nkey);
1355 conn_set_state(c, conn_mwrite);
1356 c->write_and_go = conn_new_cmd;
1357 } else {
1358 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1359 }
1360 }
1361 }
1362
1363 if (settings.detail_enabled) {
1364 stats_prefix_record_get(key, nkey, NULL != it);
1365 }
1366 }
1367
1368 static void append_bin_stats(const char *key, const uint16_t klen,
1369 const char *val, const uint32_t vlen,
1370 conn *c) {
1371 char *buf = c->stats.buffer + c->stats.offset;
1372 uint32_t bodylen = klen + vlen;
1373 protocol_binary_response_header header = {
1374 .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
1375 .response.opcode = PROTOCOL_BINARY_CMD_STAT,
1376 .response.keylen = (uint16_t)htons(klen),
1377 .response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
1378 .response.bodylen = htonl(bodylen),
1379 .response.opaque = c->opaque
1380 };
1381
1382 memcpy(buf, header.bytes, sizeof(header.response));
1383 buf += sizeof(header.response);
1384
1385 if (klen > 0) {
1386 memcpy(buf, key, klen);
1387 buf += klen;
1388
1389 if (vlen > 0) {
1390 memcpy(buf, val, vlen);
1391 }
1392 }
1393
1394 c->stats.offset += sizeof(header.response) + bodylen;
1395 }
1396
1397 static void append_ascii_stats(const char *key, const uint16_t klen,
1398 const char *val, const uint32_t vlen,
1399 conn *c) {
1400 char *pos = c->stats.buffer + c->stats.offset;
1401 uint32_t nbytes = 0;
1402 int remaining = c->stats.size - c->stats.offset;
1403 int room = remaining - 1;
1404
1405 if (klen == 0 && vlen == 0) {
1406 nbytes = snprintf(pos, room, "END\r\n");
1407 } else if (vlen == 0) {
1408 nbytes = snprintf(pos, room, "STAT %s\r\n", key);
1409 } else {
1410 nbytes = snprintf(pos, room, "STAT %s %s\r\n", key, val);
1411 }
1412
1413 c->stats.offset += nbytes;
1414 }
1415
1416 static bool grow_stats_buf(conn *c, size_t needed) {
1417 size_t nsize = c->stats.size;
1418 size_t available = nsize - c->stats.offset;
1419 bool rv = true;
1420
1421 /* Special case: No buffer -- need to allocate fresh */
1422 if (c->stats.buffer == NULL) {
1423 nsize = 1024;
1424 available = c->stats.size = c->stats.offset = 0;
1425 }
1426
1427 while (needed > available) {
1428 assert(nsize > 0);
1429 nsize = nsize << 1;
1430 available = nsize - c->stats.offset;
1431 }
1432
1433 if (nsize != c->stats.size) {
1434 char *ptr = realloc(c->stats.buffer, nsize);
1435 if (ptr) {
1436 c->stats.buffer = ptr;
1437 c->stats.size = nsize;
1438 } else {
1439 rv = false;
1440 }
1441 }
1442
1443 return rv;
1444 }
1445
1446 static void append_stats(const char *key, const uint16_t klen,
1447 const char *val, const uint32_t vlen,
1448 const void *cookie)
1449 {
1450 /* value without a key is invalid */
1451 if (klen == 0 && vlen > 0) {
1452 return ;
1453 }
1454
1455 conn *c = (conn*)cookie;
1456
1457 if (c->protocol == binary_prot) {
1458 size_t needed = vlen + klen + sizeof(protocol_binary_response_header);
1459 if (!grow_stats_buf(c, needed)) {
1460 return ;
1461 }
1462 append_bin_stats(key, klen, val, vlen, c);
1463 } else {
1464 size_t needed = vlen + klen + 10; // 10 == "STAT = \r\n"
1465 if (!grow_stats_buf(c, needed)) {
1466 return ;
1467 }
1468 append_ascii_stats(key, klen, val, vlen, c);
1469 }
1470
1471 assert(c->stats.offset <= c->stats.size);
1472 }
1473
1474 static void process_bin_stat(conn *c) {
1475 char *subcommand = binary_get_key(c);
1476 size_t nkey = c->binary_header.request.keylen;
1477
1478 if (settings.verbose > 1) {
1479 int ii;
1480 fprintf(stderr, "<%d STATS ", c->sfd);
1481 for (ii = 0; ii < nkey; ++ii) {
1482 fprintf(stderr, "%c", subcommand[ii]);
1483 }
1484 fprintf(stderr, "\n");
1485 }
1486
1487 if (nkey == 0) {
1488 /* request all statistics */
1489 server_stats(&append_stats, c);
1490 (void)get_stats(NULL, 0, &append_stats, c);
1491 } else if (strncmp(subcommand, "reset", 5) == 0) {
1492 stats_reset();
1493 } else if (strncmp(subcommand, "settings", 8) == 0) {
1494 process_stat_settings(&append_stats, c);
1495 } else if (strncmp(subcommand, "detail", 6) == 0) {
1496 char *subcmd_pos = subcommand + 6;
1497 if (strncmp(subcmd_pos, " dump", 5) == 0) {
1498 int len;
1499 char *dump_buf = stats_prefix_dump(&len);
1500 if (dump_buf == NULL || len <= 0) {
1501 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1502 return ;
1503 } else {
1504 append_stats("detailed", strlen("detailed"), dump_buf, len, c);
1505 free(dump_buf);
1506 }
1507 } else if (strncmp(subcmd_pos, " on", 3) == 0) {
1508 settings.detail_enabled = 1;
1509 } else if (strncmp(subcmd_pos, " off", 4) == 0) {
1510 settings.detail_enabled = 0;
1511 } else {
1512 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1513 return;
1514 }
1515 } else {
1516 if (get_stats(subcommand, nkey, &append_stats, c)) {
1517 if (c->stats.buffer == NULL) {
1518 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1519 } else {
1520 write_and_free(c, c->stats.buffer, c->stats.offset);
1521 c->stats.buffer = NULL;
1522 }
1523 } else {
1524 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1525 }
1526
1527 return;
1528 }
1529
1530 /* Append termination package and start the transfer */
1531 append_stats(NULL, 0, NULL, 0, c);
1532 if (c->stats.buffer == NULL) {
1533 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1534 } else {
1535 write_and_free(c, c->stats.buffer, c->stats.offset);
1536 c->stats.buffer = NULL;
1537 }
1538 }
1539
1540 static void bin_read_key(conn *c, enum bin_substates next_substate, int extra) {
1541 assert(c);
1542 c->substate = next_substate;
1543 c->rlbytes = c->keylen + extra;
1544
1545 /* Ok... do we have room for the extras and the key in the input buffer? */
1546 ptrdiff_t offset = c->rcurr + sizeof(protocol_binary_request_header) - c->rbuf;
1547 if (c->rlbytes > c->rsize - offset) {
1548 size_t nsize = c->rsize;
1549 size_t size = c->rlbytes + sizeof(protocol_binary_request_header);
1550
1551 while (size > nsize) {
1552 nsize *= 2;
1553 }
1554
1555 if (nsize != c->rsize) {
1556 if (settings.verbose > 1) {
1557 fprintf(stderr, "%d: Need to grow buffer from %lu to %lu\n",
1558 c->sfd, (unsigned long)c->rsize, (unsigned long)nsize);
1559 }
1560 char *newm = realloc(c->rbuf, nsize);
1561 if (newm == NULL) {
1562 if (settings.verbose) {
1563 fprintf(stderr, "%d: Failed to grow buffer.. closing connection\n",
1564 c->sfd);
1565 }
1566 conn_set_state(c, conn_closing);
1567 return;
1568 }
1569
1570 c->rbuf= newm;
1571 /* rcurr should point to the same offset in the packet */
1572 c->rcurr = c->rbuf + offset - sizeof(protocol_binary_request_header);
1573 c->rsize = nsize;
1574 }
1575 if (c->rbuf != c->rcurr) {
1576 memmove(c->rbuf, c->rcurr, c->rbytes);
1577 c->rcurr = c->rbuf;
1578 if (settings.verbose > 1) {
1579 fprintf(stderr, "%d: Repack input buffer\n", c->sfd);
1580 }
1581 }
1582 }
1583
1584 /* preserve the header in the buffer.. */
1585 c->ritem = c->rcurr + sizeof(protocol_binary_request_header);
1586 conn_set_state(c, conn_nread);
1587 }
1588
1589 /* Just write an error message and disconnect the client */
1590 static void handle_binary_protocol_error(conn *c) {
1591 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
1592 if (settings.verbose) {
1593 fprintf(stderr, "Protocol error (opcode %02x), close connection %d\n",
1594 c->binary_header.request.opcode, c->sfd);
1595 }
1596 c->write_and_go = conn_closing;
1597 }
1598
1599 static void init_sasl_conn(conn *c) {
1600 assert(c);
1601 /* should something else be returned? */
1602 if (!settings.sasl)
1603 return;
1604
1605 if (!c->sasl_conn) {
1606 int result=sasl_server_new("memcached",
1607 NULL,
1608 my_sasl_hostname[0] ? my_sasl_hostname : NULL,
1609 NULL, NULL,
1610 NULL, 0, &c->sasl_conn);
1611 if (result != SASL_OK) {
1612 if (settings.verbose) {
1613 fprintf(stderr, "Failed to initialize SASL conn.\n");
1614 }
1615 c->sasl_conn = NULL;
1616 }
1617 }
1618 }
1619
1620 static void bin_list_sasl_mechs(conn *c) {
1621 // Guard against a disabled SASL.
1622 if (!settings.sasl) {
1623 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND,
1624 c->binary_header.request.bodylen
1625 - c->binary_header.request.keylen);
1626 return;
1627 }
1628
1629 init_sasl_conn(c);
1630 const char *result_string = NULL;
1631 unsigned int string_length = 0;
1632 int result=sasl_listmech(c->sasl_conn, NULL,
1633 "", /* What to prepend the string with */
1634 " ", /* What to separate mechanisms with */
1635 "", /* What to append to the string */
1636 &result_string, &string_length,
1637 NULL);
1638 if (result != SASL_OK) {
1639 /* Perhaps there's a better error for this... */
1640 if (settings.verbose) {
1641 fprintf(stderr, "Failed to list SASL mechanisms.\n");
1642 }
1643 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
1644 return;
1645 }
1646 write_bin_response(c, (char*)result_string, 0, 0, string_length);
1647 }
1648
1649 static void process_bin_sasl_auth(conn *c) {
1650 // Guard for handling disabled SASL on the server.
1651 if (!settings.sasl) {
1652 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND,
1653 c->binary_header.request.bodylen
1654 - c->binary_header.request.keylen);
1655 return;
1656 }
1657
1658 assert(c->binary_header.request.extlen == 0);
1659
1660 int nkey = c->binary_header.request.keylen;
1661 int vlen = c->binary_header.request.bodylen - nkey;
1662
1663 if (nkey > MAX_SASL_MECH_LEN) {
1664 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
1665 c->write_and_go = conn_swallow;
1666 return;
1667 }
1668
1669 char *key = binary_get_key(c);
1670 assert(key);
1671
1672 item *it = item_alloc(key, nkey, 0, 0, vlen);
1673
1674 if (it == 0) {
1675 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
1676 c->write_and_go = conn_swallow;
1677 return;
1678 }
1679
1680 c->item = it;
1681 c->ritem = ITEM_data(it);
1682 c->rlbytes = vlen;
1683 conn_set_state(c, conn_nread);
1684 c->substate = bin_reading_sasl_auth_data;
1685 }
1686
1687 static void process_bin_complete_sasl_auth(conn *c) {
1688 assert(settings.sasl);
1689 const char *out = NULL;
1690 unsigned int outlen = 0;
1691
1692 assert(c->item);
1693 init_sasl_conn(c);
1694
1695 int nkey = c->binary_header.request.keylen;
1696 int vlen = c->binary_header.request.bodylen - nkey;
1697
1698 char mech[nkey+1];
1699 memcpy(mech, ITEM_key((item*)c->item), nkey);
1700 mech[nkey] = 0x00;
1701
1702 if (settings.verbose)
1703 fprintf(stderr, "mech: ``%s'' with %d bytes of data\n", mech, vlen);
1704
1705 const char *challenge = vlen == 0 ? NULL : ITEM_data((item*) c->item);
1706
1707 int result=-1;
1708
1709 switch (c->cmd) {
1710 case PROTOCOL_BINARY_CMD_SASL_AUTH:
1711 result = sasl_server_start(c->sasl_conn, mech,
1712 challenge, vlen,
1713 &out, &outlen);
1714 break;
1715 case PROTOCOL_BINARY_CMD_SASL_STEP:
1716 result = sasl_server_step(c->sasl_conn,
1717 challenge, vlen,
1718 &out, &outlen);
1719 break;
1720 default:
1721 assert(false); /* CMD should be one of the above */
1722 /* This code is pretty much impossible, but makes the compiler
1723 happier */
1724 if (settings.verbose) {
1725 fprintf(stderr, "Unhandled command %d with challenge %s\n",
1726 c->cmd, challenge);
1727 }
1728 break;
1729 }
1730
1731 item_unlink(c->item);
1732
1733 if (settings.verbose) {
1734 fprintf(stderr, "sasl result code: %d\n", result);
1735 }
1736
1737 switch(result) {
1738 case SASL_OK:
1739 write_bin_response(c, "Authenticated", 0, 0, strlen("Authenticated"));
1740 pthread_mutex_lock(&c->thread->stats.mutex);
1741 c->thread->stats.auth_cmds++;
1742 pthread_mutex_unlock(&c->thread->stats.mutex);
1743 break;
1744 case SASL_CONTINUE:
1745 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0, outlen);
1746 if(outlen > 0) {
1747 add_iov(c, out, outlen);
1748 }
1749 conn_set_state(c, conn_mwrite);
1750 c->write_and_go = conn_new_cmd;
1751 break;
1752 default:
1753 if (settings.verbose)
1754 fprintf(stderr, "Unknown sasl response: %d\n", result);
1755 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
1756 pthread_mutex_lock(&c->thread->stats.mutex);
1757 c->thread->stats.auth_cmds++;
1758 c->thread->stats.auth_errors++;
1759 pthread_mutex_unlock(&c->thread->stats.mutex);
1760 }
1761 }
1762
1763 static bool authenticated(conn *c) {
1764 assert(settings.sasl);
1765 bool rv = false;
1766
1767 switch (c->cmd) {
1768 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: /* FALLTHROUGH */
1769 case PROTOCOL_BINARY_CMD_SASL_AUTH: /* FALLTHROUGH */
1770 case PROTOCOL_BINARY_CMD_SASL_STEP: /* FALLTHROUGH */
1771 case PROTOCOL_BINARY_CMD_VERSION: /* FALLTHROUGH */
1772 rv = true;
1773 break;
1774 default:
1775 if (c->sasl_conn) {
1776 const void *uname = NULL;
1777 sasl_getprop(c->sasl_conn, SASL_USERNAME, &uname);
1778 rv = uname != NULL;
1779 }
1780 }
1781
1782 if (settings.verbose > 1) {
1783 fprintf(stderr, "authenticated() in cmd 0x%02x is %s\n",
1784 c->cmd, rv ? "true" : "false");
1785 }
1786
1787 return rv;
1788 }
1789
1790 static void dispatch_bin_command(conn *c) {
1791 int protocol_error = 0;
1792
1793 int extlen = c->binary_header.request.extlen;
1794 int keylen = c->binary_header.request.keylen;
1795 uint32_t bodylen = c->binary_header.request.bodylen;
1796
1797 if (settings.sasl && !authenticated(c)) {
1798 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
1799 c->write_and_go = conn_closing;
1800 return;
1801 }
1802
1803 MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
1804 c->noreply = true;
1805
1806 /* binprot supports 16bit keys, but internals are still 8bit */
1807 if (keylen > KEY_MAX_LENGTH) {
1808 handle_binary_protocol_error(c);
1809 return;
1810 }
1811
1812 switch (c->cmd) {
1813 case PROTOCOL_BINARY_CMD_SETQ:
1814 c->cmd = PROTOCOL_BINARY_CMD_SET;
1815 break;
1816 case PROTOCOL_BINARY_CMD_ADDQ:
1817 c->cmd = PROTOCOL_BINARY_CMD_ADD;
1818 break;
1819 case PROTOCOL_BINARY_CMD_REPLACEQ:
1820 c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
1821 break;
1822 case PROTOCOL_BINARY_CMD_DELETEQ:
1823 c->cmd = PROTOCOL_BINARY_CMD_DELETE;
1824 break;
1825 case PROTOCOL_BINARY_CMD_INCREMENTQ:
1826 c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
1827 break;
1828 case PROTOCOL_BINARY_CMD_DECREMENTQ:
1829 c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
1830 break;
1831 case PROTOCOL_BINARY_CMD_QUITQ:
1832 c->cmd = PROTOCOL_BINARY_CMD_QUIT;
1833 break;
1834 case PROTOCOL_BINARY_CMD_FLUSHQ:
1835 c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
1836 break;
1837 case PROTOCOL_BINARY_CMD_APPENDQ:
1838 c->cmd = PROTOCOL_BINARY_CMD_APPEND;
1839 break;
1840 case PROTOCOL_BINARY_CMD_PREPENDQ:
1841 c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
1842 break;
1843 case PROTOCOL_BINARY_CMD_GETQ:
1844 c->cmd = PROTOCOL_BINARY_CMD_GET;
1845 break;
1846 case PROTOCOL_BINARY_CMD_GETKQ:
1847 c->cmd = PROTOCOL_BINARY_CMD_GETK;
1848 break;
1849 case PROTOCOL_BINARY_CMD_GATQ:
1850 c->cmd = PROTOCOL_BINARY_CMD_GAT;
1851 break;
1852 case PROTOCOL_BINARY_CMD_GATKQ:
1853 c->cmd = PROTOCOL_BINARY_CMD_GAT;
1854 break;
1855 default:
1856 c->noreply = false;
1857 }
1858
1859 switch (c->cmd) {
1860 case PROTOCOL_BINARY_CMD_VERSION:
1861 if (extlen == 0 && keylen == 0 && bodylen == 0) {
1862 write_bin_response(c, VERSION, 0, 0, strlen(VERSION));
1863 } else {
1864 protocol_error = 1;
1865 }
1866 break;
1867 case PROTOCOL_BINARY_CMD_FLUSH:
1868 if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
1869 bin_read_key(c, bin_read_flush_exptime, extlen);
1870 } else {
1871 protocol_error = 1;
1872 }
1873 break;
1874 case PROTOCOL_BINARY_CMD_NOOP:
1875 if (extlen == 0 && keylen == 0 && bodylen == 0) {
1876 write_bin_response(c, NULL, 0, 0, 0);
1877 } else {
1878 protocol_error = 1;
1879 }
1880 break;
1881 case PROTOCOL_BINARY_CMD_SET: /* FALLTHROUGH */
1882 case PROTOCOL_BINARY_CMD_ADD: /* FALLTHROUGH */
1883 case PROTOCOL_BINARY_CMD_REPLACE:
1884 if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
1885 bin_read_key(c, bin_reading_set_header, 8);
1886 } else {
1887 protocol_error = 1;
1888 }
1889 break;
1890 case PROTOCOL_BINARY_CMD_GETQ: /* FALLTHROUGH */
1891 case PROTOCOL_BINARY_CMD_GET: /* FALLTHROUGH */
1892 case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
1893 case PROTOCOL_BINARY_CMD_GETK:
1894 if (extlen == 0 && bodylen == keylen && keylen > 0) {
1895 bin_read_key(c, bin_reading_get_key, 0);
1896 } else {
1897 protocol_error = 1;
1898 }
1899 break;
1900 case PROTOCOL_BINARY_CMD_DELETE:
1901 if (keylen > 0 && extlen == 0 && bodylen == keylen) {
1902 bin_read_key(c, bin_reading_del_header, extlen);
1903 } else {
1904 protocol_error = 1;
1905 }
1906 break;
1907 case PROTOCOL_BINARY_CMD_INCREMENT:
1908 case PROTOCOL_BINARY_CMD_DECREMENT:
1909 if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
1910 bin_read_key(c, bin_reading_incr_header, 20);
1911 } else {
1912 protocol_error = 1;
1913 }
1914 break;
1915 case PROTOCOL_BINARY_CMD_APPEND:
1916 case PROTOCOL_BINARY_CMD_PREPEND:
1917 if (keylen > 0 && extlen == 0) {
1918 bin_read_key(c, bin_reading_set_header, 0);
1919 } else {
1920 protocol_error = 1;
1921 }
1922 break;
1923 case PROTOCOL_BINARY_CMD_STAT:
1924 if (extlen == 0) {
1925 bin_read_key(c, bin_reading_stat, 0);
1926 } else {
1927 protocol_error = 1;
1928 }
1929 break;
1930 case PROTOCOL_BINARY_CMD_QUIT:
1931 if (keylen == 0 && extlen == 0 && bodylen == 0) {
1932 write_bin_response(c, NULL, 0, 0, 0);
1933 c->write_and_go = conn_closing;
1934 if (c->noreply) {
1935 conn_set_state(c, conn_closing);
1936 }
1937 } else {
1938 protocol_error = 1;
1939 }
1940 break;
1941 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
1942 if (extlen == 0 && keylen == 0 && bodylen == 0) {
1943 bin_list_sasl_mechs(c);
1944 } else {
1945 protocol_error = 1;
1946 }
1947 break;
1948 case PROTOCOL_BINARY_CMD_SASL_AUTH:
1949 case PROTOCOL_BINARY_CMD_SASL_STEP:
1950 if (extlen == 0 && keylen != 0) {
1951 bin_read_key(c, bin_reading_sasl_auth, 0);
1952 } else {
1953 protocol_error = 1;
1954 }
1955 break;
1956 case PROTOCOL_BINARY_CMD_TOUCH:
1957 case PROTOCOL_BINARY_CMD_GAT:
1958 case PROTOCOL_BINARY_CMD_GATQ:
1959 case PROTOCOL_BINARY_CMD_GATK:
1960 case PROTOCOL_BINARY_CMD_GATKQ:
1961 if (extlen == 4 && keylen != 0) {
1962 bin_read_key(c, bin_reading_touch_key, 4);
1963 } else {
1964 protocol_error = 1;
1965 }
1966 break;
1967 default:
1968 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, bodylen);
1969 }
1970
1971 if (protocol_error)
1972 handle_binary_protocol_error(c);
1973 }
1974
1975 static void process_bin_update(conn *c) {
1976 char *key;
1977 int nkey;
1978 int vlen;
1979 item *it;
1980 protocol_binary_request_set* req = binary_get_request(c);
1981
1982 assert(c != NULL);
1983
1984 key = binary_get_key(c);
1985 nkey = c->binary_header.request.keylen;
1986
1987 /* fix byteorder in the request */
1988 req->message.body.flags = ntohl(req->message.body.flags);
1989 req->message.body.expiration = ntohl(req->message.body.expiration);
1990
1991 vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);
1992
1993 if (settings.verbose > 1) {
1994 int ii;
1995 if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
1996 fprintf(stderr, "<%d ADD ", c->sfd);
1997 } else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
1998 fprintf(stderr, "<%d SET ", c->sfd);
1999 } else {
2000 fprintf(stderr, "<%d REPLACE ", c->sfd);
2001 }
2002 for (ii = 0; ii < nkey; ++ii) {
2003 fprintf(stderr, "%c", key[ii]);
2004 }
2005
2006 fprintf(stderr, " Value len is %d", vlen);
2007 fprintf(stderr, "\n");
2008 }
2009
2010 if (settings.detail_enabled) {
2011 stats_prefix_record_set(key, nkey);
2012 }
2013
2014 it = item_alloc(key, nkey, req->message.body.flags,
2015 realtime(req->message.body.expiration), vlen+2);
2016
2017 if (it == 0) {
2018 if (! item_size_ok(nkey, req->message.body.flags, vlen + 2)) {
2019 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
2020 } else {
2021 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
2022 }
2023
2024 /* Avoid stale data persisting in cache because we failed alloc.
2025 * Unacceptable for SET. Anywhere else too? */
2026 if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
2027 it = item_get(key, nkey);
2028 if (it) {
2029 item_unlink(it);
2030 item_remove(it);
2031 }
2032 }
2033
2034 /* swallow the data line */
2035 c->write_and_go = conn_swallow;
2036 return;
2037 }
2038
2039 ITEM_set_cas(it, c->binary_header.request.cas);
2040
2041 switch (c->cmd) {
2042 case PROTOCOL_BINARY_CMD_ADD:
2043 c->cmd = NREAD_ADD;
2044 break;
2045 case PROTOCOL_BINARY_CMD_SET:
2046 c->cmd = NREAD_SET;
2047 break;
2048 case PROTOCOL_BINARY_CMD_REPLACE:
2049 c->cmd = NREAD_REPLACE;
2050 break;
2051 default:
2052 assert(0);
2053 }
2054
2055 if (ITEM_get_cas(it) != 0) {
2056 c->cmd = NREAD_CAS;
2057 }
2058
2059 c->item = it;
2060 c->ritem = ITEM_data(it);
2061 c->rlbytes = vlen;
2062 conn_set_state(c, conn_nread);
2063 c->substate = bin_read_set_value;
2064 }
2065
2066 static void process_bin_append_prepend(conn *c) {
2067 char *key;
2068 int nkey;
2069 int vlen;
2070 item *it;
2071
2072 assert(c != NULL);
2073
2074 key = binary_get_key(c);
2075 nkey = c->binary_header.request.keylen;
2076 vlen = c->binary_header.request.bodylen - nkey;
2077
2078 if (settings.verbose > 1) {
2079 fprintf(stderr, "Value len is %d\n", vlen);
2080 }
2081
2082 if (settings.detail_enabled) {
2083 stats_prefix_record_set(key, nkey);
2084 }
2085
2086 it = item_alloc(key, nkey, 0, 0, vlen+2);
2087
2088 if (it == 0) {
2089 if (! item_size_ok(nkey, 0, vlen + 2)) {
2090 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
2091 } else {
2092 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
2093 }
2094 /* swallow the data line */
2095 c->write_and_go = conn_swallow;
2096 return;
2097 }
2098
2099 ITEM_set_cas(it, c->binary_header.request.cas);
2100
2101 switch (c->cmd) {
2102 case PROTOCOL_BINARY_CMD_APPEND:
2103 c->cmd = NREAD_APPEND;
2104 break;
2105 case PROTOCOL_BINARY_CMD_PREPEND:
2106 c->cmd = NREAD_PREPEND;
2107 break;
2108 default:
2109 assert(0);
2110 }
2111
2112 c->item = it;
2113 c->ritem = ITEM_data(it);
2114 c->rlbytes = vlen;
2115 conn_set_state(c, conn_nread);
2116 c->substate = bin_read_set_value;
2117 }
2118
2119 static void process_bin_flush(conn *c) {
2120 time_t exptime = 0;
2121 protocol_binary_request_flush* req = binary_get_request(c);
2122
2123 if (c->binary_header.request.extlen == sizeof(req->message.body)) {
2124 exptime = ntohl(req->message.body.expiration);
2125 }
2126
2127 if (exptime > 0) {
2128 settings.oldest_live = realtime(exptime) - 1;
2129 } else {
2130 settings.oldest_live = current_time - 1;
2131 }
2132 item_flush_expired();
2133
2134 pthread_mutex_lock(&c->thread->stats.mutex);
2135 c->thread->stats.flush_cmds++;
2136 pthread_mutex_unlock(&c->thread->stats.mutex);
2137
2138 write_bin_response(c, NULL, 0, 0, 0);
2139 }
2140
2141 static void process_bin_delete(conn *c) {
2142 item *it;
2143
2144 protocol_binary_request_delete* req = binary_get_request(c);
2145
2146 char* key = binary_get_key(c);
2147 size_t nkey = c->binary_header.request.keylen;
2148
2149 assert(c != NULL);
2150
2151 if (settings.verbose > 1) {
2152 fprintf(stderr, "Deleting %s\n", key);
2153 }
2154
2155 if (settings.detail_enabled) {
2156 stats_prefix_record_delete(key, nkey);
2157 }
2158
2159 it = item_get(key, nkey);
2160 if (it) {
2161 uint64_t cas = ntohll(req->message.header.request.cas);
2162 if (cas == 0 || cas == ITEM_get_cas(it)) {
2163 MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
2164 pthread_mutex_lock(&c->thread->stats.mutex);
2165 c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
2166 pthread_mutex_unlock(&c->thread->stats.mutex);
2167 item_unlink(it);
2168 write_bin_response(c, NULL, 0, 0, 0);
2169 } else {
2170 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
2171 }
2172 item_remove(it); /* release our reference */
2173 } else {
2174 write_bin_error(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
2175 pthread_mutex_lock(&c->thread->stats.mutex);
2176 c->thread->stats.delete_misses++;
2177 pthread_mutex_unlock(&c->thread->stats.mutex);
2178 }
2179 }
2180
2181 static void complete_nread_binary(conn *c) {
2182 assert(c != NULL);
2183 assert(c->cmd >= 0);
2184
2185 switch(c->substate) {
2186 case bin_reading_set_header:
2187 if (c->cmd == PROTOCOL_BINARY_CMD_APPEND ||
2188 c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
2189 process_bin_append_prepend(c);
2190 } else {
2191 process_bin_update(c);
2192 }
2193 break;
2194 case bin_read_set_value:
2195 complete_update_bin(c);
2196 break;
2197 case bin_reading_get_key:
2198 process_bin_get(c);
2199 break;
2200 case bin_reading_touch_key:
2201 process_bin_touch(c);
2202 break;
2203 case bin_reading_stat:
2204 process_bin_stat(c);
2205 break;
2206 case bin_reading_del_header:
2207 process_bin_delete(c);
2208 break;
2209 case bin_reading_incr_header:
2210 complete_incr_bin(c);
2211 break;
2212 case bin_read_flush_exptime:
2213 process_bin_flush(c);
2214 break;
2215 case bin_reading_sasl_auth:
2216 process_bin_sasl_auth(c);
2217 break;
2218 case bin_reading_sasl_auth_data:
2219 process_bin_complete_sasl_auth(c);
2220 break;
2221 default:
2222 fprintf(stderr, "Not handling substate %d\n", c->substate);
2223 assert(0);
2224 }
2225 }
2226
2227 static void reset_cmd_handler(conn *c) {
2228 c->cmd = -1;
2229 c->substate = bin_no_state;
2230 if(c->item != NULL) {
2231 item_remove(c->item);
2232 c->item = NULL;
2233 }
2234 conn_shrink(c);
2235 if (c->rbytes > 0) {
2236 conn_set_state(c, conn_parse_cmd);
2237 } else {
2238 conn_set_state(c, conn_waiting);
2239 }
2240 }
2241
2242 static void complete_nread(conn *c) {
2243 assert(c != NULL);
2244 assert(c->protocol == ascii_prot
2245 || c->protocol == binary_prot);
2246
2247 if (c->protocol == ascii_prot) {
2248 complete_nread_ascii(c);
2249 } else if (c->protocol == binary_prot) {
2250 complete_nread_binary(c);
2251 }
2252 }
2253
2254 /*
2255 * Stores an item in the cache according to the semantics of one of the set
2256 * commands. In threaded mode, this is protected by the cache lock.
2257 *
2258 * Returns the state of storage.
2259 */
2260 enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t hv) {
2261 char *key = ITEM_key(it);
2262 item *old_it = do_item_get(key, it->nkey, hv);
2263 enum store_item_type stored = NOT_STORED;
2264
2265 item *new_it = NULL;
2266 int flags;
2267
2268 if (old_it != NULL && comm == NREAD_ADD) {
2269 /* add only adds a nonexistent item, but promote to head of LRU */
2270 do_item_update(old_it);
2271 } else if (!old_it && (comm == NREAD_REPLACE
2272 || comm == NREAD_APPEND || comm == NREAD_PREPEND))
2273 {
2274 /* replace only replaces an existing value; don't store */
2275 } else if (comm == NREAD_CAS) {
2276 /* validate cas operation */
2277 if(old_it == NULL) {
2278 // LRU expired
2279 stored = NOT_FOUND;
2280 pthread_mutex_lock(&c->thread->stats.mutex);
2281 c->thread->stats.cas_misses++;
2282 pthread_mutex_unlock(&c->thread->stats.mutex);
2283 }
2284 else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
2285 // cas validates
2286 // it and old_it may belong to different classes.
2287 // I'm updating the stats for the one that's getting pushed out
2288 pthread_mutex_lock(&c->thread->stats.mutex);
2289 c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
2290 pthread_mutex_unlock(&c->thread->stats.mutex);
2291
2292 item_replace(old_it, it, hv);
2293 stored = STORED;
2294 } else {
2295 pthread_mutex_lock(&c->thread->stats.mutex);
2296 c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
2297 pthread_mutex_unlock(&c->thread->stats.mutex);
2298
2299 if(settings.verbose > 1) {
2300 fprintf(stderr, "CAS: failure: expected %llu, got %llu\n",
2301 (unsigned long long)ITEM_get_cas(old_it),
2302 (unsigned long long)ITEM_get_cas(it));
2303 }
2304 stored = EXISTS;
2305 }
2306 } else {
2307 /*
2308 * Append - combine new and old record into single one. Here it's
2309 * atomic and thread-safe.
2310 */
2311 if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
2312 /*
2313 * Validate CAS
2314 */
2315 if (ITEM_get_cas(it) != 0) {
2316 // CAS much be equal
2317 if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
2318 stored = EXISTS;
2319 }
2320 }
2321
2322 if (stored == NOT_STORED) {
2323 /* we have it and old_it here - alloc memory to hold both */
2324 /* flags was already lost - so recover them from ITEM_suffix(it) */
2325
2326 flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
2327
2328 new_it = item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);
2329
2330 if (new_it == NULL) {
2331 /* SERVER_ERROR out of memory */
2332 if (old_it != NULL)
2333 do_item_remove(old_it);
2334
2335 return NOT_STORED;
2336 }
2337
2338 /* copy data from it and old_it to new_it */
2339
2340 if (comm == NREAD_APPEND) {
2341 memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
2342 memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
2343 } else {
2344 /* NREAD_PREPEND */
2345 memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
2346 memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
2347 }
2348
2349 it = new_it;
2350 }
2351 }
2352
2353 if (stored == NOT_STORED) {
2354 if (old_it != NULL)
2355 item_replace(old_it, it, hv);
2356 else
2357 do_item_link(it, hv);
2358
2359 c->cas = ITEM_get_cas(it);
2360
2361 stored = STORED;
2362 }
2363 }
2364
2365 if (old_it != NULL)
2366 do_item_remove(old_it); /* release our reference */
2367 if (new_it != NULL)
2368 do_item_remove(new_it);
2369
2370 if (stored == STORED) {
2371 c->cas = ITEM_get_cas(it);
2372 }
2373
2374 return stored;
2375 }
2376
/* One token of a command line: where it starts and how long it is. */
typedef struct token_s {
    char *value;
    size_t length;
} token_t;

#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1

#define MAX_TOKENS 8

/*
 * Tokenize the command string by replacing whitespace with '\0' and update
 * the token array tokens with pointer to start of each token and length.
 * Returns total number of tokens.  The last valid token is the terminal
 * token: its length is zero and its value is NULL when the whole string
 * was consumed, or points to the first unprocessed character when the
 * token array filled up first.
 *
 * At most max_tokens entries of `tokens` are written (max_tokens - 1 real
 * tokens plus the terminal token). Only the space character separates
 * tokens; runs of spaces are collapsed.
 *
 * Usage example:
 *
 *  token_t *t;
 *  do {
 *      tokenize_command(command, tokens, max_tokens);
 *      for (t = tokens; t->length != 0; t++) {
 *          ...
 *      }
 *      command = t->value;     (NULL once everything was consumed)
 *  } while (command != NULL);
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *s, *e;
    size_t ntokens = 0;
    size_t len;
    size_t i;

    /* Validate before touching the string. */
    assert(command != NULL && tokens != NULL && max_tokens > 1);

    len = strlen(command);

    /* s marks the start of the token being scanned, e the scan position */
    s = e = command;
    for (i = 0; i < len; i++) {
        if (*e == ' ') {
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
                *e = '\0';
                if (ntokens == max_tokens - 1) {
                    e++;
                    s = e; /* so we don't add an extra token */
                    break;
                }
            }
            s = e + 1;
        }
        e++;
    }

    /* trailing token that was not followed by a space */
    if (s != e) {
        tokens[ntokens].value = s;
        tokens[ntokens].length = e - s;
        ntokens++;
    }

    /*
     * If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    tokens[ntokens].value = *e == '\0' ? NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;

    return ntokens;
}
2448
2449 /* set up a connection to write a buffer then free it, used for stats */
2450 static void write_and_free(conn *c, char *buf, int bytes) {
2451 if (buf) {
2452 c->write_and_free = buf;
2453 c->wcurr = buf;
2454 c->wbytes = bytes;
2455 conn_set_state(c, conn_write);
2456 c->write_and_go = conn_new_cmd;
2457 } else {
2458 out_string(c, "SERVER_ERROR out of memory writing stats");
2459 }
2460 }
2461
2462 static inline bool set_noreply_maybe(conn *c, token_t *tokens, size_t ntokens)
2463 {
2464 int noreply_index = ntokens - 2;
2465
2466 /*
2467 NOTE: this function is not the first place where we are going to
2468 send the reply. We could send it instead from process_command()
2469 if the request line has wrong number of tokens. However parsing
2470 malformed line for "noreply" option is not reliable anyway, so
2471 it can't be helped.
2472 */
2473 if (tokens[noreply_index].value
2474 && strcmp(tokens[noreply_index].value, "noreply") == 0) {
2475 c->noreply = true;
2476 }
2477 return c->noreply;
2478 }
2479
2480 void append_stat(const char *name, ADD_STAT add_stats, conn *c,
2481 const char *fmt, ...) {
2482 char val_str[STAT_VAL_LEN];
2483 int vlen;
2484 va_list ap;
2485
2486 assert(name);
2487 assert(add_stats);
2488 assert(c);
2489 assert(fmt);
2490
2491 va_start(ap, fmt);
2492 vlen = vsnprintf(val_str, sizeof(val_str) - 1, fmt, ap);
2493 va_end(ap);
2494
2495 add_stats(name, strlen(name), val_str, vlen, c);
2496 }
2497
2498 inline static void process_stats_detail(conn *c, const char *command) {
2499 assert(c != NULL);
2500
2501 if (strcmp(command, "on") == 0) {
2502 settings.detail_enabled = 1;
2503 out_string(c, "OK");
2504 }
2505 else if (strcmp(command, "off") == 0) {
2506 settings.detail_enabled = 0;
2507 out_string(c, "OK");
2508 }
2509 else if (strcmp(command, "dump") == 0) {
2510 int len;
2511 char *stats = stats_prefix_dump(&len);
2512 write_and_free(c, stats, len);
2513 }
2514 else {
2515 out_string(c, "CLIENT_ERROR usage: stats detail on|off|dump");
2516 }
2517 }
2518
2519 /* return server specific stats only */
2520 static void server_stats(ADD_STAT add_stats, conn *c) {
2521 pid_t pid = getpid();
2522 rel_time_t now = current_time;
2523
2524 struct thread_stats thread_stats;
2525 threadlocal_stats_aggregate(&thread_stats);
2526 struct slab_stats slab_stats;
2527 slab_stats_aggregate(&thread_stats, &slab_stats);
2528
2529 #ifndef WIN32
2530 struct rusage usage;
2531 getrusage(RUSAGE_SELF, &usage);
2532 #endif /* !WIN32 */
2533
2534 STATS_LOCK();
2535
2536 APPEND_STAT("pid", "%lu", (long)pid);
2537 APPEND_STAT("uptime", "%u", now);
2538 APPEND_STAT("time", "%ld", now + (long)process_started);
2539 APPEND_STAT("version", "%s", VERSION);
2540 APPEND_STAT("libevent", "%s", event_get_version());
2541 APPEND_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));
2542
2543 #ifndef WIN32
2544 append_stat("rusage_user", add_stats, c, "%ld.%06ld",
2545 (long)usage.ru_utime.tv_sec,
2546 (long)usage.ru_utime.tv_usec);
2547 append_stat("rusage_system", add_stats, c, "%ld.%06ld",
2548 (long)usage.ru_stime.tv_sec,
2549 (long)usage.ru_stime.tv_usec);
2550 #endif /* !WIN32 */
2551
2552 APPEND_STAT("curr_connections", "%u", stats.curr_conns - 1);
2553 APPEND_STAT("total_connections", "%u", stats.total_conns);
2554 if (settings.maxconns_fast) {
2555 APPEND_STAT("rejected_connections", "%llu", (unsigned long long)stats.rejected_conns);
2556 }
2557 APPEND_STAT("connection_structures", "%u", stats.conn_structs);
2558 APPEND_STAT("reserved_fds", "%u", stats.reserved_fds);
2559 APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats.get_cmds);
2560 APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats.set_cmds);
2561 APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats.flush_cmds);
2562 APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats.touch_cmds);
2563 APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats.get_hits);
2564 APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats.get_misses);
2565 APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats.delete_misses);
2566 APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats.delete_hits);
2567 APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats.incr_misses);
2568 APPEND_STAT("incr_hits", "%llu", (unsigned long long)slab_stats.incr_hits);
2569 APPEND_STAT("decr_misses", "%llu", (unsigned long long)thread_stats.decr_misses);
2570 APPEND_STAT("decr_hits", "%llu", (unsigned long long)slab_stats.decr_hits);
2571 APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats.cas_misses);
2572 APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats.cas_hits);
2573 APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats.cas_badval);
2574 APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats.touch_hits);
2575 APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats.touch_misses);
2576 APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats.auth_cmds);
2577 APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats.auth_errors);
2578 APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats.bytes_read);
2579 APPEND_STAT("bytes_written", "%llu", (unsigned long long)thread_stats.bytes_written);
2580 APPEND_STAT("limit_maxbytes", "%llu", (unsigned long long)settings.maxbytes);
2581 APPEND_STAT("accepting_conns", "%u", stats.accepting_conns);
2582 APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats.listen_disabled_num);
2583 APPEND_STAT("threads", "%d", settings.num_threads);
2584 APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats.conn_yields);
2585 APPEND_STAT("hash_power_level", "%u", stats.hash_power_level);
2586 APPEND_STAT("hash_bytes", "%llu", (unsigned long long)stats.hash_bytes);
2587 APPEND_STAT("hash_is_expanding", "%u", stats.hash_is_expanding);
2588 APPEND_STAT("expired_unfetched", "%llu", stats.expired_unfetched);
2589 APPEND_STAT("evicted_unfetched", "%llu", stats.evicted_unfetched);
2590 if (settings.slab_reassign) {
2591 APPEND_STAT("slab_reassign_running", "%u", stats.slab_reassign_running);
2592 APPEND_STAT("slabs_moved", "%llu", stats.slabs_moved);
2593 }
2594 STATS_UNLOCK();
2595 }
2596
2597 static void process_stat_settings(ADD_STAT add_stats, void *c) {
2598 assert(add_stats);
2599 APPEND_STAT("maxbytes", "%u", (unsigned int)settings.maxbytes);
2600 APPEND_STAT("maxconns", "%d", settings.maxconns);
2601 APPEND_STAT("tcpport", "%d", settings.port);
2602 APPEND_STAT("udpport", "%d", settings.udpport);
2603 APPEND_STAT("inter", "%s", settings.inter ? settings.inter : "NULL");
2604 APPEND_STAT("verbosity", "%d", settings.verbose);
2605 APPEND_STAT("oldest", "%lu", (unsigned long)settings.oldest_live);
2606 APPEND_STAT("evictions", "%s", settings.evict_to_free ? "on" : "off");
2607 APPEND_STAT("domain_socket", "%s",
2608 settings.socketpath ? settings.socketpath : "NULL");
2609 APPEND_STAT("umask", "%o", settings.access);
2610 APPEND_STAT("growth_factor", "%.2f", settings.factor);
2611 APPEND_STAT("chunk_size", "%d", settings.chunk_size);
2612 APPEND_STAT("num_threads", "%d", settings.num_threads);
2613 APPEND_STAT("num_threads_per_udp", "%d", settings.num_threads_per_udp);
2614 APPEND_STAT("stat_key_prefix", "%c", settings.prefix_delimiter);
2615 APPEND_STAT("detail_enabled", "%s",
2616 settings.detail_enabled ? "yes" : "no");
2617 APPEND_STAT("reqs_per_event", "%d", settings.reqs_per_event);
2618 APPEND_STAT("cas_enabled", "%s", settings.use_cas ? "yes" : "no");
2619 APPEND_STAT("tcp_backlog", "%d", settings.backlog);
2620 APPEND_STAT("binding_protocol", "%s",
2621 prot_text(settings.binding_protocol));
2622 APPEND_STAT("auth_enabled_sasl", "%s", settings.sasl ? "yes" : "no");
2623 APPEND_STAT("item_size_max", "%d", settings.item_size_max);
2624 APPEND_STAT("maxconns_fast", "%s", settings.maxconns_fast ? "yes" : "no");
2625 APPEND_STAT("hashpower_init", "%d", settings.hashpower_init);
2626 APPEND_STAT("slab_reassign", "%s", settings.slab_reassign ? "yes" : "no");
2627 APPEND_STAT("slab_automove", "%d", settings.slab_automove);
2628 }
2629
2630 static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
2631 const char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
2632 assert(c != NULL);
2633
2634 if (ntokens < 2) {
2635 out_string(c, "CLIENT_ERROR bad command line");
2636 return;
2637 }
2638
2639 if (ntokens == 2) {
2640 server_stats(&append_stats, c);
2641 (void)get_stats(NULL, 0, &append_stats, c);
2642 } else if (strcmp(subcommand, "reset") == 0) {
2643 stats_reset();
2644 out_string(c, "RESET");
2645 return ;
2646 } else if (strcmp(subcommand, "detail") == 0) {
2647 /* NOTE: how to tackle detail with binary? */
2648 if (ntokens < 4)
2649 process_stats_detail(c, ""); /* outputs the error message */
2650 else
2651 process_stats_detail(c, tokens[2].value);
2652 /* Output already generated */
2653 return ;
2654 } else if (strcmp(subcommand, "settings") == 0) {
2655 process_stat_settings(&append_stats, c);
2656 } else if (strcmp(subcommand, "cachedump") == 0) {
2657 char *buf;
2658 unsigned int bytes, id, limit = 0;
2659
2660 if (ntokens < 5) {
2661 out_string(c, "CLIENT_ERROR bad command line");
2662 return;
2663 }
2664
2665 if (!safe_strtoul(tokens[2].value, &id) ||
2666 !safe_strtoul(tokens[3].value, &limit)) {
2667 out_string(c, "CLIENT_ERROR bad command line format");
2668 return;
2669 }
2670
2671 if (id >= POWER_LARGEST) {
2672 out_string(c, "CLIENT_ERROR Illegal slab id");
2673 return;
2674 }
2675
2676 buf = item_cachedump(id, limit, &bytes);
2677 write_and_free(c, buf, bytes);
2678 return ;
2679 } else {
2680 /* getting here means that the subcommand is either engine specific or
2681 is invalid. query the engine and see. */
2682 if (get_stats(subcommand, strlen(subcommand), &append_stats, c)) {
2683 if (c->stats.buffer == NULL) {
2684 out_string(c, "SERVER_ERROR out of memory writing stats");
2685 } else {
2686 write_and_free(c, c->stats.buffer, c->stats.offset);
2687 c->stats.buffer = NULL;
2688 }
2689 } else {
2690 out_string(c, "ERROR");
2691 }
2692 return ;
2693 }
2694
2695 /* append terminator and start the transfer */
2696 append_stats(NULL, 0, NULL, 0, c);
2697
2698 if (c->stats.buffer == NULL) {
2699 out_string(c, "SERVER_ERROR out of memory writing stats");
2700 } else {
2701 write_and_free(c, c->stats.buffer, c->stats.offset);
2702 c->stats.buffer = NULL;
2703 }
2704 }
2705
2706 /* ntokens is overwritten here... shrug.. */
2707 static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens, bool return_cas) {
2708 char *key;
2709 size_t nkey;
2710 int i = 0;
2711 item *it;
2712 token_t *key_token = &tokens[KEY_TOKEN];
2713 char *suffix;
2714 assert(c != NULL);
2715
2716 do {
2717 while(key_token->length != 0) {
2718
2719 key = key_token->value;
2720 nkey = key_token->length;
2721
2722 if(nkey > KEY_MAX_LENGTH) {
2723 out_string(c, "CLIENT_ERROR bad command line format");
2724 return;
2725 }
2726
2727 it = item_get(key, nkey);
2728 if (settings.detail_enabled) {
2729 stats_prefix_record_get(key, nkey, NULL != it);
2730 }
2731 if (it) {
2732 if (i >= c->isize) {
2733 item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
2734 if (new_list) {
2735 c->isize *= 2;
2736 c->ilist = new_list;
2737 } else {
2738 item_remove(it);
2739 break;
2740 }
2741 }
2742
2743 /*
2744 * Construct the response. Each hit adds three elements to the
2745 * outgoing data list:
2746 * "VALUE "
2747 * key
2748 * " " + flags + " " + data length + "\r\n" + data (with \r\n)
2749 */
2750
2751 if (return_cas)
2752 {
2753 MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
2754 it->nbytes, ITEM_get_cas(it));
2755 /* Goofy mid-flight realloc. */
2756 if (i >= c->suffixsize) {
2757 char **new_suffix_list = realloc(c->suffixlist,
2758 sizeof(char *) * c->suffixsize * 2);
2759 if (new_suffix_list) {
2760 c->suffixsize *= 2;
2761 c->suffixlist = new_suffix_list;
2762 } else {
2763 item_remove(it);
2764 break;
2765 }
2766 }
2767
2768 suffix = cache_alloc(c->thread->suffix_cache);
2769 if (suffix == NULL) {
2770 out_string(c, "SERVER_ERROR out of memory making CAS suffix");
2771 item_remove(it);
2772 return;
2773 }
2774 *(c->suffixlist + i) = suffix;
2775 int suffix_len = snprintf(suffix, SUFFIX_SIZE,
2776 " %llu\r\n",
2777 (unsigned long long)ITEM_get_cas(it));
2778 if (add_iov(c, "VALUE ", 6) != 0 ||
2779 add_iov(c, ITEM_key(it), it->nkey) != 0 ||
2780 add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0 ||
2781 add_iov(c, suffix, suffix_len) != 0 ||
2782 add_iov(c, ITEM_data(it), it->nbytes) != 0)
2783 {
2784 item_remove(it);
2785 break;
2786 }
2787 }
2788 else
2789 {
2790 MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
2791 it->nbytes, ITEM_get_cas(it));
2792 if (add_iov(c, "VALUE ", 6) != 0 ||
2793 add_iov(c, ITEM_key(it), it->nkey) != 0 ||
2794 add_iov(c, ITEM_suffix(it), it->nsuffix + it->nbytes) != 0)
2795 {
2796 item_remove(it);
2797 break;
2798 }
2799 }
2800
2801
2802 if (settings.verbose > 1)
2803 fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
2804
2805 /* item_get() has incremented it->refcount for us */
2806 pthread_mutex_lock(&c->thread->stats.mutex);
2807 c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
2808 c->thread->stats.get_cmds++;
2809 pthread_mutex_unlock(&c->thread->stats.mutex);
2810 item_update(it);
2811 *(c->ilist + i) = it;
2812 i++;
2813
2814 } else {
2815 pthread_mutex_lock(&c->thread->stats.mutex);
2816 c->thread->stats.get_misses++;
2817 c->thread->stats.get_cmds++;
2818 pthread_mutex_unlock(&c->thread->stats.mutex);
2819 MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
2820 }
2821
2822 key_token++;
2823 }
2824
2825 /*
2826 * If the command string hasn't been fully processed, get the next set
2827 * of tokens.
2828 */
2829 if(key_token->value != NULL) {
2830 ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
2831 key_token = tokens;
2832 }
2833
2834 } while(key_token->value != NULL);
2835
2836 c->icurr = c->ilist;
2837 c->ileft = i;
2838 if (return_cas) {
2839 c->suffixcurr = c->suffixlist;
2840 c->suffixleft = i;
2841 }
2842
2843 if (settings.verbose > 1)
2844 fprintf(stderr, ">%d END\n", c->sfd);
2845
2846 /*
2847 If the loop was terminated because of out-of-memory, it is not
2848 reliable to add END\r\n to the buffer, because it might not end
2849 in \r\n. So we send SERVER_ERROR instead.
2850 */
2851 if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
2852 || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
2853 out_string(c, "SERVER_ERROR out of memory writing get response");
2854 }
2855 else {
2856 conn_set_state(c, conn_mwrite);
2857 c->msgcurr = 0;
2858 }
2859
2860 return;
2861 }
2862
2863 static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
2864 char *key;
2865 size_t nkey;
2866 unsigned int flags;
2867 int32_t exptime_int = 0;
2868 time_t exptime;
2869 int vlen;
2870 uint64_t req_cas_id=0;
2871 item *it;
2872
2873 assert(c != NULL);
2874
2875 set_noreply_maybe(c, tokens, ntokens);
2876
2877 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2878 out_string(c, "CLIENT_ERROR bad command line format");
2879 return;
2880 }
2881
2882 key = tokens[KEY_TOKEN].value;
2883 nkey = tokens[KEY_TOKEN].length;
2884
2885 if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
2886 && safe_strtol(tokens[3].value, &exptime_int)
2887 && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
2888 out_string(c, "CLIENT_ERROR bad command line format");
2889 return;
2890 }
2891
2892 /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
2893 exptime = exptime_int;
2894
2895 /* Negative exptimes can underflow and end up immortal. realtime() will
2896 immediately expire values that are greater than REALTIME_MAXDELTA, but less
2897 than process_started, so lets aim for that. */
2898 if (exptime < 0)
2899 exptime = REALTIME_MAXDELTA + 1;
2900
2901 // does cas value exist?
2902 if (handle_cas) {
2903 if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
2904 out_string(c, "CLIENT_ERROR bad command line format");
2905 return;
2906 }
2907 }
2908
2909 vlen += 2;
2910 if (vlen < 0 || vlen - 2 < 0) {
2911 out_string(c, "CLIENT_ERROR bad command line format");
2912 return;
2913 }
2914
2915 if (settings.detail_enabled) {
2916 stats_prefix_record_set(key, nkey);
2917 }
2918
2919 it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
2920
2921 if (it == 0) {
2922 if (! item_size_ok(nkey, flags, vlen))
2923 out_string(c, "SERVER_ERROR object too large for cache");
2924 else
2925 out_string(c, "SERVER_ERROR out of memory storing object");
2926 /* swallow the data line */
2927 c->write_and_go = conn_swallow;
2928 c->sbytes = vlen;
2929
2930 /* Avoid stale data persisting in cache because we failed alloc.
2931 * Unacceptable for SET. Anywhere else too? */
2932 if (comm == NREAD_SET) {
2933 it = item_get(key, nkey);
2934 if (it) {
2935 item_unlink(it);
2936 item_remove(it);
2937 }
2938 }
2939
2940 return;
2941 }
2942 ITEM_set_cas(it, req_cas_id);
2943
2944 c->item = it;
2945 c->ritem = ITEM_data(it);
2946 c->rlbytes = it->nbytes;
2947 c->cmd = comm;
2948 conn_set_state(c, conn_nread);
2949 }
2950
2951 static void process_touch_command(conn *c, token_t *tokens, const size_t ntokens) {
2952 char *key;
2953 size_t nkey;
2954 int32_t exptime_int = 0;
2955 item *it;
2956
2957 assert(c != NULL);
2958
2959 set_noreply_maybe(c, tokens, ntokens);
2960
2961 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
2962 out_string(c, "CLIENT_ERROR bad command line format");
2963 return;
2964 }
2965
2966 key = tokens[KEY_TOKEN].value;
2967 nkey = tokens[KEY_TOKEN].length;
2968
2969 if (!safe_strtol(tokens[2].value, &exptime_int)) {
2970 out_string(c, "CLIENT_ERROR invalid exptime argument");
2971 return;
2972 }
2973
2974 it = item_touch(key, nkey, realtime(exptime_int));
2975 if (it) {
2976 item_update(it);
2977 pthread_mutex_lock(&c->thread->stats.mutex);
2978 c->thread->stats.touch_cmds++;
2979 c->thread->stats.slab_stats[it->slabs_clsid].touch_hits++;
2980 pthread_mutex_unlock(&c->thread->stats.mutex);
2981
2982 out_string(c, "TOUCHED");
2983 item_remove(it);
2984 } else {
2985 pthread_mutex_lock(&c->thread->stats.mutex);
2986 c->thread->stats.touch_cmds++;
2987 c->thread->stats.touch_misses++;
2988 pthread_mutex_unlock(&c->thread->stats.mutex);
2989
2990 out_string(c, "NOT_FOUND");
2991 }
2992 }
2993
2994 static void process_arithmetic_command(conn *c, token_t *tokens, const size_t ntokens, const bool incr) {
2995 char temp[INCR_MAX_STORAGE_LEN];
2996 uint64_t delta;
2997 char *key;
2998 size_t nkey;
2999
3000 assert(c != NULL);
3001
3002 set_noreply_maybe(c, tokens, ntokens);
3003
3004 if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
3005 out_string(c, "CLIENT_ERROR bad command line format");
3006 return;
3007 }
3008
3009 key = tokens[KEY_TOKEN].value;
3010 nkey = tokens[KEY_TOKEN].length;
3011
3012 if (!safe_strtoull(tokens[2].value, &delta)) {
3013 out_string(c, "CLIENT_ERROR invalid numeric delta argument");
3014 return;
3015 }
3016
3017 switch(add_delta(c, key, nkey, incr, delta, temp, NULL)) {
3018 case OK:
3019 out_string(c, temp);
3020 break;
3021 case NON_NUMERIC:
3022 out_string(c, "CLIENT_ERROR cannot increment or decrement non-numeric value");
3023 break;
3024 case EOM:
3025 out_string(c, "SERVER_ERROR out of memory");
3026 break;
3027 case DELTA_ITEM_NOT_FOUND:
3028 pthread_mutex_lock(&c->thread->stats.mutex);
3029 if (incr) {
3030 c->thread->stats.incr_misses++;
3031 } else {
3032 c->thread->stats.decr_misses++;
3033 }
3034 pthread_mutex_unlock(&c->thread->stats.mutex);
3035
3036 out_string(c, "NOT_FOUND");
3037 break;
3038 case DELTA_ITEM_CAS_MISMATCH:
3039 break; /* Should never get here */
3040 }
3041 }
3042
/*
 * adds a delta value to a numeric item.
 *
 * c     connection requesting the operation
 * key   key of the item to adjust
 * nkey  length of key
 * incr  true to increment value, false to decrement
 * delta amount to adjust value by
 * buf   buffer for the new value as decimal text; must hold at least
 *       INCR_MAX_STORAGE_LEN bytes
 * cas   optional in/out CAS: when non-NULL and non-zero on entry it must
 *       match the item's CAS; on success it receives the item's CAS
 * hv    hash value passed to the item lookup/replace helpers
 *
 * returns OK on success, DELTA_ITEM_NOT_FOUND when the key is missing,
 * DELTA_ITEM_CAS_MISMATCH when the supplied CAS does not match,
 * NON_NUMERIC when the stored value does not parse as a number, and EOM
 * when a replacement item could not be allocated.
 */
enum delta_result_type do_add_delta(conn *c, const char *key, const size_t nkey,
                                    const bool incr, const int64_t delta,
                                    char *buf, uint64_t *cas,
                                    const uint32_t hv) {
    char *ptr;
    uint64_t value;
    int res;
    item *it;

    it = do_item_get(key, nkey, hv);
    if (!it) {
        return DELTA_ITEM_NOT_FOUND;
    }

    /* a caller-supplied, non-zero CAS must match the stored one */
    if (cas != NULL && *cas != 0 && ITEM_get_cas(it) != *cas) {
        do_item_remove(it);
        return DELTA_ITEM_CAS_MISMATCH;
    }

    ptr = ITEM_data(it);

    if (!safe_strtoull(ptr, &value)) {
        do_item_remove(it);
        return NON_NUMERIC;
    }

    if (incr) {
        /* unsigned addition: wraps around on overflow */
        value += delta;
        MEMCACHED_COMMAND_INCR(c->sfd, ITEM_key(it), it->nkey, value);
    } else {
        /* decrement never goes below zero */
        if(delta > value) {
            value = 0;
        } else {
            value -= delta;
        }
        MEMCACHED_COMMAND_DECR(c->sfd, ITEM_key(it), it->nkey, value);
    }

    pthread_mutex_lock(&c->thread->stats.mutex);
    if (incr) {
        c->thread->stats.slab_stats[it->slabs_clsid].incr_hits++;
    } else {
        c->thread->stats.slab_stats[it->slabs_clsid].decr_hits++;
    }
    pthread_mutex_unlock(&c->thread->stats.mutex);

    snprintf(buf, INCR_MAX_STORAGE_LEN, "%llu", (unsigned long long)value);
    res = strlen(buf);
    /* A new item is needed when the text (plus "\r\n") no longer fits or
       when ours is not the only reference to the item. */
    if (res + 2 > it->nbytes || it->refcount != 1) { /* need to realloc */
        item *new_it;
        /* the flags are re-read from the text of the old item's suffix */
        new_it = item_alloc(ITEM_key(it), it->nkey, atoi(ITEM_suffix(it) + 1), it->exptime, res + 2 );
        if (new_it == 0) {
            do_item_remove(it);
            return EOM;
        }
        memcpy(ITEM_data(new_it), buf, res);
        memcpy(ITEM_data(new_it) + res, "\r\n", 2);
        item_replace(it, new_it, hv);
        // Overwrite the older item's CAS with our new CAS since we're
        // returning the CAS of the old item below.
        ITEM_set_cas(it, (settings.use_cas) ? ITEM_get_cas(new_it) : 0);
        do_item_remove(new_it);       /* release our reference */
    } else { /* replace in-place */
        /* When changing the value without replacing the item, we
           need to update the CAS on the existing item. */
        mutex_lock(&cache_lock); /* FIXME */
        ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
        mutex_unlock(&cache_lock);

        /* write the digits, then pad the rest of the value (up to the
           trailing two bytes) with spaces */
        memcpy(ITEM_data(it), buf, res);
        memset(ITEM_data(it) + res, ' ', it->nbytes - res - 2);
        do_item_update(it);
    }

    if (cas) {
        *cas = ITEM_get_cas(it);    /* swap the incoming CAS value */
    }
    do_item_remove(it);         /* release our reference */
    return OK;
}
3134
3135 static void process_delete_command(conn *c, token_t *tokens, const size_t ntokens) {
3136 char *key;
3137 size_t nkey;
3138 item *it;
3139
3140 assert(c != NULL);
3141
3142 if (ntokens > 3) {
3143 bool hold_is_zero = strcmp(tokens[KEY_TOKEN+1].value, "0") == 0;
3144 bool sets_noreply = set_noreply_maybe(c, tokens, ntokens);
3145 bool valid = (ntokens == 4 && (hold_is_zero || sets_noreply))
3146 || (ntokens == 5 && hold_is_zero && sets_noreply);
3147 if (!valid) {
3148 out_string(c, "CLIENT_ERROR bad command line format. "
3149 "Usage: delete <key> [noreply]");
3150 return;
3151 }
3152 }
3153
3154
3155 key = tokens[KEY_TOKEN].value;
3156 nkey = tokens[KEY_TOKEN].length;
3157
3158 if(nkey > KEY_MAX_LENGTH) {
3159 out_string(c, "CLIENT_ERROR bad command line format");
3160 return;
3161 }
3162
3163 if (settings.detail_enabled) {
3164 stats_prefix_record_delete(key, nkey);
3165 }
3166
3167 it = item_get(key, nkey);
3168 if (it) {
3169 MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
3170
3171 pthread_mutex_lock(&c->thread->stats.mutex);
3172 c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
3173 pthread_mutex_unlock(&c->thread->stats.mutex);
3174
3175 item_unlink(it);
3176 item_remove(it); /* release our reference */
3177 out_string(c, "DELETED");
3178 } else {
3179 pthread_mutex_lock(&c->thread->stats.mutex);
3180 c->thread->stats.delete_misses++;
3181 pthread_mutex_unlock(&c->thread->stats.mutex);
3182
3183 out_string(c, "NOT_FOUND");
3184 }
3185 }
3186
3187 static void process_verbosity_command(conn *c, token_t *tokens, const size_t ntokens) {
3188 unsigned int level;
3189
3190 assert(c != NULL);
3191
3192 set_noreply_maybe(c, tokens, ntokens);
3193
3194 level = strtoul(tokens[1].value, NULL, 10);
3195 settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL : level;
3196 out_string(c, "OK");
3197 return;
3198 }
3199
3200 static void process_slabs_automove_command(conn *c, token_t *tokens, const size_t ntokens) {
3201 unsigned int level;
3202
3203 assert(c != NULL);
3204
3205 set_noreply_maybe(c, tokens, ntokens);
3206
3207 level = strtoul(tokens[2].value, NULL, 10);
3208 if (level == 0) {
3209 settings.slab_automove = 0;
3210 } else if (level == 1 || level == 2) {
3211 settings.slab_automove = level;
3212 } else {
3213 out_string(c, "ERROR");
3214 return;
3215 }
3216 out_string(c, "OK");
3217 return;
3218 }
3219
3220 static void process_command(conn *c, char *command) {
3221
3222 token_t tokens[MAX_TOKENS];
3223 size_t ntokens;
3224 int comm;
3225
3226 assert(c != NULL);
3227
3228 MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);
3229
3230 if (settings.verbose > 1)
3231 fprintf(stderr, "<%d %s\n", c->sfd, command);
3232
3233 /*
3234 * for commands set/add/replace, we build an item and read the data
3235 * directly into it, then continue in nread_complete().
3236 */
3237
3238 c->msgcurr = 0;
3239 c->msgused = 0;
3240 c->iovused = 0;
3241 if (add_msghdr(c) != 0) {
3242 out_string(c, "SERVER_ERROR out of memory preparing response");
3243 return;
3244 }
3245
3246 ntokens = tokenize_command(command, tokens, MAX_TOKENS);
3247 if (ntokens >= 3 &&
3248 ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) ||
3249 (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {
3250
3251 process_get_command(c, tokens, ntokens, false);
3252
3253 } else if ((ntokens == 6 || ntokens == 7) &&
3254 ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
3255 (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
3256 (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
3257 (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
3258 (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {
3259
3260 process_update_command(c, tokens, ntokens, comm, false);
3261
3262 } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) {
3263
3264 process_update_command(c, tokens, ntokens, comm, true);
3265
3266 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {
3267
3268 process_arithmetic_command(c, tokens, ntokens, 1);
3269
3270 } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {
3271
3272 process_get_command(c, tokens, ntokens, true);
3273
3274 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {
3275
3276 process_arithmetic_command(c, tokens, ntokens, 0);
3277
3278 } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {
3279
3280 process_delete_command(c, tokens, ntokens);
3281
3282 } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "touch") == 0)) {
3283
3284 process_touch_command(c, tokens, ntokens);
3285
3286 } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {
3287
3288 process_stat(c, tokens, ntokens);
3289
3290 } else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value, "flush_all") == 0)) {
3291 time_t exptime = 0;
3292
3293 set_noreply_maybe(c, tokens, ntokens);
3294
3295 pthread_mutex_lock(&c->thread->stats.mutex);
3296 c->thread->stats.flush_cmds++;
3297 pthread_mutex_unlock(&c->thread->stats.mutex);
3298
3299 if(ntokens == (c->noreply ? 3 : 2)) {
3300 settings.oldest_live = current_time - 1;
3301 item_flush_expired();
3302 out_string(c, "OK");
3303 return;
3304 }
3305
3306 exptime = strtol(tokens[1].value, NULL, 10);
3307 if(errno == ERANGE) {
3308 out_string(c, "CLIENT_ERROR bad command line format");
3309 return;
3310 }
3311
3312 /*
3313 If exptime is zero realtime() would return zero too, and
3314 realtime(exptime) - 1 would overflow to the max unsigned
3315 value. So we process exptime == 0 the same way we do when
3316 no delay is given at all.
3317 */
3318 if (exptime > 0)
3319 settings.oldest_live = realtime(exptime) - 1;
3320 else /* exptime == 0 */
3321 settings.oldest_live = current_time - 1;
3322 item_flush_expired();
3323 out_string(c, "OK");
3324 return;
3325
3326 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "version") == 0)) {
3327
3328 out_string(c, "VERSION " VERSION);
3329
3330 } else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value, "quit") == 0)) {
3331
3332 conn_set_state(c, conn_closing);
3333
3334 } else if (ntokens > 1 && strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0) {
3335 if (ntokens == 5 && strcmp(tokens[COMMAND_TOKEN + 1].value, "reassign") == 0) {
3336 int src, dst, rv;
3337
3338 if (settings.slab_reassign == false) {
3339 out_string(c, "CLIENT_ERROR slab reassignment disabled");
3340 return;
3341 }
3342
3343 src = strtol(tokens[2].value, NULL, 10);
3344 dst = strtol(tokens[3].value, NULL, 10);
3345
3346 if (errno == ERANGE) {
3347 out_string(c, "CLIENT_ERROR bad command line format");
3348 return;
3349 }
3350
3351 rv = slabs_reassign(src, dst);
3352 switch (rv) {
3353 case REASSIGN_OK:
3354 out_string(c, "OK");
3355 break;
3356 case REASSIGN_RUNNING:
3357 out_string(c, "BUSY currently processing reassign request");
3358 break;
3359 case REASSIGN_BADCLASS:
3360 out_string(c, "BADCLASS invalid src or dst class id");
3361 break;
3362 case REASSIGN_NOSPARE:
3363 out_string(c, "NOSPARE source class has no spare pages");
3364 break;
3365 case REASSIGN_SRC_DST_SAME:
3366 out_string(c, "SAME src and dst class are identical");
3367 break;
3368 }
3369 return;
3370 } else if (ntokens == 4 &&
3371 (strcmp(tokens[COMMAND_TOKEN + 1].value, "automove") == 0)) {
3372 process_slabs_automove_command(c, tokens, ntokens);
3373 } else {
3374 out_string(c, "ERROR");
3375 }
3376 } else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value, "verbosity") == 0)) {
3377 process_verbosity_command(c, tokens, ntokens);
3378 } else {
3379 out_string(c, "ERROR");
3380 }
3381 return;
3382 }
3383
3384 /*
3385 * if we have a complete line in the buffer, process it.
3386 */
3387 static int try_read_command(conn *c) {
3388 assert(c != NULL);
3389 assert(c->rcurr <= (c->rbuf + c->rsize));
3390 assert(c->rbytes > 0);
3391
3392 if (c->protocol == negotiating_prot || c->transport == udp_transport) {
3393 if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
3394 c->protocol = binary_prot;
3395 } else {
3396 c->protocol = ascii_prot;
3397 }
3398
3399 if (settings.verbose > 1) {
3400 fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
3401 prot_text(c->protocol));
3402 }
3403 }
3404
3405 if (c->protocol == binary_prot) {
3406 /* Do we have the complete packet header? */
3407 if (c->rbytes < sizeof(c->binary_header)) {
3408 /* need more data! */
3409 return 0;
3410 } else {
3411 #ifdef NEED_ALIGN
3412 if (((long)(c->rcurr)) % 8 != 0) {
3413 /* must realign input buffer */
3414 memmove(c->rbuf, c->rcurr, c->rbytes);
3415 c->rcurr = c->rbuf;
3416 if (settings.verbose > 1) {
3417 fprintf(stderr, "%d: Realign input buffer\n", c->sfd);
3418 }
3419 }
3420 #endif
3421 protocol_binary_request_header* req;
3422 req = (protocol_binary_request_header*)c->rcurr;
3423
3424 if (settings.verbose > 1) {
3425 /* Dump the packet before we convert it to host order */
3426 int ii;
3427 fprintf(stderr, "<%d Read binary protocol data:", c->sfd);
3428 for (ii = 0; ii < sizeof(req->bytes); ++ii) {
3429 if (ii % 4 == 0) {
3430 fprintf(stderr, "\n<%d ", c->sfd);
3431 }
3432 fprintf(stderr, " 0x%02x", req->bytes[ii]);
3433 }
3434 fprintf(stderr, "\n");
3435 }
3436
3437 c->binary_header = *req;
3438 c->binary_header.request.keylen = ntohs(req->request.keylen);
3439 c->binary_header.request.bodylen = ntohl(req->request.bodylen);
3440 c->binary_header.request.cas = ntohll(req->request.cas);
3441
3442 if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ) {
3443 if (settings.verbose) {
3444 fprintf(stderr, "Invalid magic: %x\n",
3445 c->binary_header.request.magic);
3446 }
3447 conn_set_state(c, conn_closing);
3448 return -1;
3449 }
3450
3451 c->msgcurr = 0;
3452 c->msgused = 0;
3453 c->iovused = 0;
3454 if (add_msghdr(c) != 0) {
3455 out_string(c, "SERVER_ERROR out of memory");
3456 return 0;
3457 }
3458
3459 c->cmd = c->binary_header.request.opcode;
3460 c->keylen = c->binary_header.request.keylen;
3461 c->opaque = c->binary_header.request.opaque;
3462 /* clear the returned cas value */
3463 c->cas = 0;
3464
3465 dispatch_bin_command(c);
3466
3467 c->rbytes -= sizeof(c->binary_header);
3468 c->rcurr += sizeof(c->binary_header);
3469 }
3470 } else {
3471 char *el, *cont;
3472
3473 if (c->rbytes == 0)
3474 return 0;
3475
3476 el = memchr(c->rcurr, '\n', c->rbytes);
3477 if (!el) {
3478 if (c->rbytes > 1024) {
3479 /*
3480 * We didn't have a '\n' in the first k. This _has_ to be a
3481 * large multiget, if not we should just nuke the connection.
3482 */
3483 char *ptr = c->rcurr;
3484 while (*ptr == ' ') { /* ignore leading whitespaces */
3485 ++ptr;
3486 }
3487
3488 if (ptr - c->rcurr > 100 ||
3489 (strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {
3490
3491 conn_set_state(c, conn_closing);
3492 return 1;
3493 }
3494 }
3495
3496 return 0;
3497 }
3498 cont = el + 1;
3499 if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
3500 el--;
3501 }
3502 *el = '\0';
3503
3504 assert(cont <= (c->rcurr + c->rbytes));
3505
3506 process_command(c, c->rcurr);
3507
3508 c->rbytes -= (cont - c->rcurr);
3509 c->rcurr = cont;
3510
3511 assert(c->rcurr <= (c->rbuf + c->rsize));
3512 }
3513
3514 return 1;
3515 }
3516
3517 /*
3518 * read a UDP request.
3519 */
3520 static enum try_read_result try_read_udp(conn *c) {
3521 int res;
3522
3523 assert(c != NULL);
3524
3525 c->request_addr_size = sizeof(c->request_addr);
3526 res = recvfrom(c->sfd, c->rbuf, c->rsize,
3527 0, &c->request_addr, &c->request_addr_size);
3528 if (res > 8) {
3529 unsigned char *buf = (unsigned char *)c->rbuf;
3530 pthread_mutex_lock(&c->thread->stats.mutex);
3531 c->thread->stats.bytes_read += res;
3532 pthread_mutex_unlock(&c->thread->stats.mutex);
3533
3534 /* Beginning of UDP packet is the request ID; save it. */
3535 c->request_id = buf[0] * 256 + buf[1];
3536
3537 /* If this is a multi-packet request, drop it. */
3538 if (buf[4] != 0 || buf[5] != 1) {
3539 out_string(c, "SERVER_ERROR multi-packet request not supported");
3540 return READ_NO_DATA_RECEIVED;
3541 }
3542
3543 /* Don't care about any of the rest of the header. */
3544 res -= 8;
3545 memmove(c->rbuf, c->rbuf + 8, res);
3546
3547 c->rbytes = res;
3548 c->rcurr = c->rbuf;
3549 return READ_DATA_RECEIVED;
3550 }
3551 return READ_NO_DATA_RECEIVED;
3552 }
3553
3554 /*
3555 * read from network as much as we can, handle buffer overflow and connection
3556 * close.
3557 * before reading, move the remaining incomplete fragment of a command
3558 * (if any) to the beginning of the buffer.
3559 *
3560 * To protect us from someone flooding a connection with bogus data causing
3561 * the connection to eat up all available memory, break out and start looking
3562 * at the data I've got after a number of reallocs...
3563 *
3564 * @return enum try_read_result
3565 */
3566 static enum try_read_result try_read_network(conn *c) {
3567 enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
3568 int res;
3569 int num_allocs = 0;
3570 assert(c != NULL);
3571
3572 if (c->rcurr != c->rbuf) {
3573 if (c->rbytes != 0) /* otherwise there's nothing to copy */
3574 memmove(c->rbuf, c->rcurr, c->rbytes);
3575 c->rcurr = c->rbuf;
3576 }
3577
3578 while (1) {
3579 if (c->rbytes >= c->rsize) {
3580 if (num_allocs == 4) {
3581 return gotdata;
3582 }
3583 ++num_allocs;
3584 char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
3585 if (!new_rbuf) {
3586 if (settings.verbose > 0)
3587 fprintf(stderr, "Couldn't realloc input buffer\n");
3588 c->rbytes = 0; /* ignore what we read */
3589 out_string(c, "SERVER_ERROR out of memory reading request");
3590 c->write_and_go = conn_closing;
3591 return READ_MEMORY_ERROR;
3592 }
3593 c->rcurr = c->rbuf = new_rbuf;
3594 c->rsize *= 2;
3595 }
3596
3597 int avail = c->rsize - c->rbytes;
3598 res = read(c->sfd, c->rbuf + c->rbytes, avail);
3599 if (res > 0) {
3600 pthread_mutex_lock(&c->thread->stats.mutex);
3601 c->thread->stats.bytes_read += res;
3602 pthread_mutex_unlock(&c->thread->stats.mutex);
3603 gotdata = READ_DATA_RECEIVED;
3604 c->rbytes += res;
3605 if (res == avail) {
3606 continue;
3607 } else {
3608 break;
3609 }
3610 }
3611 if (res == 0) {
3612 return READ_ERROR;
3613 }
3614 if (res == -1) {
3615 if (errno == EAGAIN || errno == EWOULDBLOCK) {
3616 break;
3617 }
3618 return READ_ERROR;
3619 }
3620 }
3621 return gotdata;
3622 }
3623
3624 static bool update_event(conn *c, const int new_flags) {
3625 assert(c != NULL);
3626
3627 struct event_base *base = c->event.ev_base;
3628 if (c->ev_flags == new_flags)
3629 return true;
3630 if (event_del(&c->event) == -1) return false;
3631 event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
3632 event_base_set(base, &c->event);
3633 c->ev_flags = new_flags;
3634 if (event_add(&c->event, 0) == -1) return false;
3635 return true;
3636 }
3637
3638 /*
3639 * Sets whether we are listening for new connections or not.
3640 */
3641 void do_accept_new_conns(const bool do_accept) {
3642 conn *next;
3643
3644 for (next = listen_conn; next; next = next->next) {
3645 if (do_accept) {
3646 update_event(next, EV_READ | EV_PERSIST);
3647 if (listen(next->sfd, settings.backlog) != 0) {
3648 perror("listen");
3649 }
3650 }
3651 else {
3652 update_event(next, 0);
3653 if (listen(next->sfd, 0) != 0) {
3654 perror("listen");
3655 }
3656 }
3657 }
3658
3659 if (do_accept) {
3660 STATS_LOCK();
3661 stats.accepting_conns = true;
3662 STATS_UNLOCK();
3663 } else {
3664 STATS_LOCK();
3665 stats.accepting_conns = false;
3666 stats.listen_disabled_num++;
3667 STATS_UNLOCK();
3668 allow_new_conns = false;
3669 maxconns_handler(-42, 0, 0);
3670 }
3671 }
3672
3673 /*
3674 * Transmit the next chunk of data from our list of msgbuf structures.
3675 *
3676 * Returns:
3677 * TRANSMIT_COMPLETE All done writing.
3678 * TRANSMIT_INCOMPLETE More data remaining to write.
3679 * TRANSMIT_SOFT_ERROR Can't write any more right now.
3680 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
3681 */
3682 static enum transmit_result transmit(conn *c) {
3683 assert(c != NULL);
3684
3685 if (c->msgcurr < c->msgused &&
3686 c->msglist[c->msgcurr].msg_iovlen == 0) {
3687 /* Finished writing the current msg; advance to the next. */
3688 c->msgcurr++;
3689 }
3690 if (c->msgcurr < c->msgused) {
3691 ssize_t res;
3692 struct msghdr *m = &c->msglist[c->msgcurr];
3693
3694 res = sendmsg(c->sfd, m, 0);
3695 if (res > 0) {
3696 pthread_mutex_lock(&c->thread->stats.mutex);
3697 c->thread->stats.bytes_written += res;
3698 pthread_mutex_unlock(&c->thread->stats.mutex);
3699
3700 /* We've written some of the data. Remove the completed
3701 iovec entries from the list of pending writes. */
3702 while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
3703 res -= m->msg_iov->iov_len;
3704 m->msg_iovlen--;
3705 m->msg_iov++;
3706 }
3707
3708 /* Might have written just part of the last iovec entry;
3709 adjust it so the next write will do the rest. */
3710 if (res > 0) {
3711 m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
3712 m->msg_iov->iov_len -= res;
3713 }
3714 return TRANSMIT_INCOMPLETE;
3715 }
3716 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
3717 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3718 if (settings.verbose > 0)
3719 fprintf(stderr, "Couldn't update event\n");
3720 conn_set_state(c, conn_closing);
3721 return TRANSMIT_HARD_ERROR;
3722 }
3723 return TRANSMIT_SOFT_ERROR;
3724 }
3725 /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
3726 we have a real error, on which we close the connection */
3727 if (settings.verbose > 0)
3728 perror("Failed to write, and not due to blocking");
3729
3730 if (IS_UDP(c->transport))
3731 conn_set_state(c, conn_read);
3732 else
3733 conn_set_state(c, conn_closing);
3734 return TRANSMIT_HARD_ERROR;
3735 } else {
3736 return TRANSMIT_COMPLETE;
3737 }
3738 }
3739
3740 static void drive_machine(conn *c) {
3741 bool stop = false;
3742 int sfd, flags = 1;
3743 socklen_t addrlen;
3744 struct sockaddr_storage addr;
3745 int nreqs = settings.reqs_per_event;
3746 int res;
3747 const char *str;
3748
3749 assert(c != NULL);
3750
3751 while (!stop) {
3752
3753 switch(c->state) {
3754 case conn_listening:
3755 addrlen = sizeof(addr);
3756 if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
3757 if (errno == EAGAIN || errno == EWOULDBLOCK) {
3758 /* these are transient, so don't log anything */
3759 stop = true;
3760 } else if (errno == EMFILE) {
3761 if (settings.verbose > 0)
3762 fprintf(stderr, "Too many open connections\n");
3763 accept_new_conns(false);
3764 stop = true;
3765 } else {
3766 perror("accept()");
3767 stop = true;
3768 }
3769 break;
3770 }
3771 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
3772 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
3773 perror("setting O_NONBLOCK");
3774 close(sfd);
3775 break;
3776 }
3777
3778 if (settings.maxconns_fast &&
3779 stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
3780 str = "ERROR Too many open connections\r\n";
3781 res = write(sfd, str, strlen(str));
3782 close(sfd);
3783 STATS_LOCK();
3784 stats.rejected_conns++;
3785 STATS_UNLOCK();
3786 } else {
3787 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
3788 DATA_BUFFER_SIZE, tcp_transport);
3789 }
3790
3791 stop = true;
3792 break;
3793
3794 case conn_waiting:
3795 if (!update_event(c, EV_READ | EV_PERSIST)) {
3796 if (settings.verbose > 0)
3797 fprintf(stderr, "Couldn't update event\n");
3798 conn_set_state(c, conn_closing);
3799 break;
3800 }
3801
3802 conn_set_state(c, conn_read);
3803 stop = true;
3804 break;
3805
3806 case conn_read:
3807 res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
3808
3809 switch (res) {
3810 case READ_NO_DATA_RECEIVED:
3811 conn_set_state(c, conn_waiting);
3812 break;
3813 case READ_DATA_RECEIVED:
3814 conn_set_state(c, conn_parse_cmd);
3815 break;
3816 case READ_ERROR:
3817 conn_set_state(c, conn_closing);
3818 break;
3819 case READ_MEMORY_ERROR: /* Failed to allocate more memory */
3820 /* State already set by try_read_network */
3821 break;
3822 }
3823 break;
3824
3825 case conn_parse_cmd :
3826 if (try_read_command(c) == 0) {
3827 /* wee need more data! */
3828 conn_set_state(c, conn_waiting);
3829 }
3830
3831 break;
3832
3833 case conn_new_cmd:
3834 /* Only process nreqs at a time to avoid starving other
3835 connections */
3836
3837 --nreqs;
3838 if (nreqs >= 0) {
3839 reset_cmd_handler(c);
3840 } else {
3841 pthread_mutex_lock(&c->thread->stats.mutex);
3842 c->thread->stats.conn_yields++;
3843 pthread_mutex_unlock(&c->thread->stats.mutex);
3844 if (c->rbytes > 0) {
3845 /* We have already read in data into the input buffer,
3846 so libevent will most likely not signal read events
3847 on the socket (unless more data is available. As a
3848 hack we should just put in a request to write data,
3849 because that should be possible ;-)
3850 */
3851 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
3852 if (settings.verbose > 0)
3853 fprintf(stderr, "Couldn't update event\n");
3854 conn_set_state(c, conn_closing);
3855 }
3856 }
3857 stop = true;
3858 }
3859 break;
3860
3861 case conn_nread:
3862 if (c->rlbytes == 0) {
3863 complete_nread(c);
3864 break;
3865 }
3866 /* first check if we have leftovers in the conn_read buffer */
3867 if (c->rbytes > 0) {
3868 int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
3869 if (c->ritem != c->rcurr) {
3870 memmove(c->ritem, c->rcurr, tocopy);
3871 }
3872 c->ritem += tocopy;
3873 c->rlbytes -= tocopy;
3874 c->rcurr += tocopy;
3875 c->rbytes -= tocopy;
3876 if (c->rlbytes == 0) {
3877 break;
3878 }
3879 }
3880
3881 /* now try reading from the socket */
3882 res = read(c->sfd, c->ritem, c->rlbytes);
3883 if (res > 0) {
3884 pthread_mutex_lock(&c->thread->stats.mutex);
3885 c->thread->stats.bytes_read += res;
3886 pthread_mutex_unlock(&c->thread->stats.mutex);
3887 if (c->rcurr == c->ritem) {
3888 c->rcurr += res;
3889 }
3890 c->ritem += res;
3891 c->rlbytes -= res;
3892 break;
3893 }
3894 if (res == 0) { /* end of stream */
3895 conn_set_state(c, conn_closing);
3896 break;
3897 }
3898 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
3899 if (!update_event(c, EV_READ | EV_PERSIST)) {
3900 if (settings.verbose > 0)
3901 fprintf(stderr, "Couldn't update event\n");
3902 conn_set_state(c, conn_closing);
3903 break;
3904 }
3905 stop = true;
3906 break;
3907 }
3908 /* otherwise we have a real error, on which we close the connection */
3909 if (settings.verbose > 0) {
3910 fprintf(stderr, "Failed to read, and not due to blocking:\n"
3911 "errno: %d %s \n"
3912 "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
3913 errno, strerror(errno),
3914 (long)c->rcurr, (long)c->ritem, (long)c->rbuf,
3915 (int)c->rlbytes, (int)c->rsize);
3916 }
3917 conn_set_state(c, conn_closing);
3918 break;
3919
3920 case conn_swallow:
3921 /* we are reading sbytes and throwing them away */
3922 if (c->sbytes == 0) {
3923 conn_set_state(c, conn_new_cmd);
3924 break;
3925 }
3926
3927 /* first check if we have leftovers in the conn_read buffer */
3928 if (c->rbytes > 0) {
3929 int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
3930 c->sbytes -= tocopy;
3931 c->rcurr += tocopy;
3932 c->rbytes -= tocopy;
3933 break;
3934 }
3935
3936 /* now try reading from the socket */
3937 res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
3938 if (res > 0) {
3939 pthread_mutex_lock(&c->thread->stats.mutex);
3940 c->thread->stats.bytes_read += res;
3941 pthread_mutex_unlock(&c->thread->stats.mutex);
3942 c->sbytes -= res;
3943 break;
3944 }
3945 if (res == 0) { /* end of stream */
3946 conn_set_state(c, conn_closing);
3947 break;
3948 }
3949 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
3950 if (!update_event(c, EV_READ | EV_PERSIST)) {
3951 if (settings.verbose > 0)
3952 fprintf(stderr, "Couldn't update event\n");
3953 conn_set_state(c, conn_closing);
3954 break;
3955 }
3956 stop = true;
3957 break;
3958 }
3959 /* otherwise we have a real error, on which we close the connection */
3960 if (settings.verbose > 0)
3961 fprintf(stderr, "Failed to read, and not due to blocking\n");
3962 conn_set_state(c, conn_closing);
3963 break;
3964
3965 case conn_write:
3966 /*
3967 * We want to write out a simple response. If we haven't already,
3968 * assemble it into a msgbuf list (this will be a single-entry
3969 * list for TCP or a two-entry list for UDP).
3970 */
3971 if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
3972 if (add_iov(c, c->wcurr, c->wbytes) != 0) {
3973 if (settings.verbose > 0)
3974 fprintf(stderr, "Couldn't build response\n");
3975 conn_set_state(c, conn_closing);
3976 break;
3977 }
3978 }
3979
3980 /* fall through... */
3981
3982 case conn_mwrite:
3983 if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
3984 if (settings.verbose > 0)
3985 fprintf(stderr, "Failed to build UDP headers\n");
3986 conn_set_state(c, conn_closing);
3987 break;
3988 }
3989 switch (transmit(c)) {
3990 case TRANSMIT_COMPLETE:
3991 if (c->state == conn_mwrite) {
3992 while (c->ileft > 0) {
3993 item *it = *(c->icurr);
3994 assert((it->it_flags & ITEM_SLABBED) == 0);
3995 item_remove(it);
3996 c->icurr++;
3997 c->ileft--;
3998 }
3999 while (c->suffixleft > 0) {
4000 char *suffix = *(c->suffixcurr);
4001 cache_free(c->thread->suffix_cache, suffix);
4002 c->suffixcurr++;
4003 c->suffixleft--;
4004 }
4005 /* XXX: I don't know why this wasn't the general case */
4006 if(c->protocol == binary_prot) {
4007 conn_set_state(c, c->write_and_go);
4008 } else {
4009 conn_set_state(c, conn_new_cmd);
4010 }
4011 } else if (c->state == conn_write) {
4012 if (c->write_and_free) {
4013 free(c->write_and_free);
4014 c->write_and_free = 0;
4015 }
4016 conn_set_state(c, c->write_and_go);
4017 } else {
4018 if (settings.verbose > 0)
4019 fprintf(stderr, "Unexpected state %d\n", c->state);
4020 conn_set_state(c, conn_closing);
4021 }
4022 break;
4023
4024 case TRANSMIT_INCOMPLETE:
4025 case TRANSMIT_HARD_ERROR:
4026 break; /* Continue in state machine. */
4027
4028 case TRANSMIT_SOFT_ERROR:
4029 stop = true;
4030 break;
4031 }
4032 break;
4033
4034 case conn_closing:
4035 if (IS_UDP(c->transport))
4036 conn_cleanup(c);
4037 else
4038 conn_close(c);
4039 stop = true;
4040 break;
4041
4042 case conn_max_state:
4043 assert(false);
4044 break;
4045 }
4046 }
4047
4048 return;
4049 }
4050
4051 void event_handler(const int fd, const short which, void *arg) {
4052 conn *c;
4053
4054 c = (conn *)arg;
4055 assert(c != NULL);
4056
4057 c->which = which;
4058
4059 /* sanity */
4060 if (fd != c->sfd) {
4061 if (settings.verbose > 0)
4062 fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
4063 conn_close(c);
4064 return;
4065 }
4066
4067 drive_machine(c);
4068
4069 /* wait for next event */
4070 return;
4071 }
4072
4073 static int new_socket(struct addrinfo *ai) {
4074 int sfd;
4075 int flags;
4076
4077 if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
4078 return -1;
4079 }
4080
4081 if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
4082 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
4083 perror("setting O_NONBLOCK");
4084 close(sfd);
4085 return -1;
4086 }
4087 return sfd;
4088 }
4089
4090
4091 /*
4092 * Sets a socket's send buffer size to the maximum allowed by the system.
4093 */
4094 static void maximize_sndbuf(const int sfd) {
4095 socklen_t intsize = sizeof(int);
4096 int last_good = 0;
4097 int min, max, avg;
4098 int old_size;
4099
4100 /* Start with the default size. */
4101 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &old_size, &intsize) != 0) {
4102 if (settings.verbose > 0)
4103 perror("getsockopt(SO_SNDBUF)");
4104 return;
4105 }
4106
4107 /* Binary-search for the real maximum. */
4108 min = old_size;
4109 max = MAX_SENDBUF_SIZE;
4110
4111 while (min <= max) {
4112 avg = ((unsigned int)(min + max)) / 2;
4113 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (void *)&avg, intsize) == 0) {
4114 last_good = avg;
4115 min = avg + 1;
4116 } else {
4117 max = avg - 1;
4118 }
4119 }
4120
4121 if (settings.verbose > 1)
4122 fprintf(stderr, "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
4123 }
4124
/**
 * Create listening/receiving sockets for one interface and port.
 *
 * Every address getaddrinfo() returns for the interface is tried in turn.
 * TCP sockets become listening connections linked into listen_conn; a UDP
 * socket is handed to dispatch_conn_new() settings.num_threads_per_udp times.
 *
 * @param interface the interface to bind to
 * @param port the port number to bind to (-1 is treated as 0)
 * @param transport the transport protocol (TCP / UDP)
 * @param portnumber_file A filepointer to write the port numbers to
 *        when they are successfully added to the list of ports we
 *        listen on.
 * @return 0 if at least one address was bound, 1 if none was or if
 *         getaddrinfo(), bind() (other than EADDRINUSE) or listen() failed
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int fd;
    struct linger no_linger = {0, 0};
    struct addrinfo *addr_list;
    struct addrinfo *cur;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char service[NI_MAXSERV];
    int rc;
    int bound = 0;
    int on = 1;

    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    if (port == -1) {
        port = 0;
    }
    snprintf(service, sizeof(service), "%d", port);
    rc = getaddrinfo(interface, service, &hints, &addr_list);
    if (rc != 0) {
        if (rc != EAI_SYSTEM) {
            fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(rc));
        } else {
            perror("getaddrinfo()");
        }
        return 1;
    }

    for (cur = addr_list; cur != NULL; cur = cur->ai_next) {
        conn *new_listener;

        fd = new_socket(cur);
        if (fd == -1) {
            /* getaddrinfo can return "junk" addresses, so a failure here
             * is tolerated as long as some other address works...
             */
            if (errno == EMFILE) {
                /* ...except when we have run out of descriptors */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }

#ifdef IPV6_V6ONLY
        if (cur->ai_family == AF_INET6) {
            rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &on, sizeof(on));
            if (rc != 0) {
                perror("setsockopt");
                close(fd);
                continue;
            }
        }
#endif

        setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&on, sizeof(on));
        if (IS_UDP(transport)) {
            maximize_sndbuf(fd);
        } else {
            /* failures of these options are reported but not fatal */
            rc = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&on, sizeof(on));
            if (rc != 0) {
                perror("setsockopt");
            }

            rc = setsockopt(fd, SOL_SOCKET, SO_LINGER, (void *)&no_linger, sizeof(no_linger));
            if (rc != 0) {
                perror("setsockopt");
            }

            rc = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&on, sizeof(on));
            if (rc != 0) {
                perror("setsockopt");
            }
        }

        if (bind(fd, cur->ai_addr, cur->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(fd);
                freeaddrinfo(addr_list);
                return 1;
            }
            /* address already in use: skip it and try the next one */
            close(fd);
            continue;
        }

        bound++;
        if (!IS_UDP(transport) && listen(fd, settings.backlog) == -1) {
            perror("listen()");
            close(fd);
            freeaddrinfo(addr_list);
            return 1;
        }

        if (portnumber_file != NULL &&
            (cur->ai_addr->sa_family == AF_INET ||
             cur->ai_addr->sa_family == AF_INET6)) {
            union {
                struct sockaddr_in in;
                struct sockaddr_in6 in6;
            } local;
            socklen_t local_len = sizeof(local);

            /* report the port the socket actually got bound to */
            if (getsockname(fd, (struct sockaddr*)&local, &local_len)==0) {
                if (cur->ai_addr->sa_family == AF_INET) {
                    fprintf(portnumber_file, "%s INET: %u\n",
                            IS_UDP(transport) ? "UDP" : "TCP",
                            ntohs(local.in.sin_port));
                } else {
                    fprintf(portnumber_file, "%s INET6: %u\n",
                            IS_UDP(transport) ? "UDP" : "TCP",
                            ntohs(local.in6.sin6_port));
                }
            }
        }

        if (IS_UDP(transport)) {
            int i;

            for (i = 0; i < settings.num_threads_per_udp; i++) {
                /* this is guaranteed to hit all threads because we round-robin */
                dispatch_conn_new(fd, conn_read, EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
            }
        } else {
            new_listener = conn_new(fd, conn_listening,
                                    EV_READ | EV_PERSIST, 1,
                                    transport, main_base);
            if (!new_listener) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            /* push onto the front of the listener list */
            new_listener->next = listen_conn;
            listen_conn = new_listener;
        }
    }

    freeaddrinfo(addr_list);

    /* Zero is returned only when at least one address was bound */
    return bound == 0;
}
4270
4271 static int server_sockets(int port, enum network_transport transport,
4272 FILE *portnumber_file) {
4273 if (settings.inter == NULL) {
4274 return server_socket(settings.inter, port, transport, portnumber_file);
4275 } else {
4276 // tokenize them and bind to each one of them..
4277 char *b;
4278 int ret = 0;
4279 char *list = strdup(settings.inter);
4280
4281 if (list == NULL) {
4282 fprintf(stderr, "Failed to allocate memory for parsing server interface string\n");
4283 return 1;
4284 }
4285 for (char *p = strtok_r(list, ";,", &b);
4286 p != NULL;
4287 p = strtok_r(NULL, ";,", &b)) {
4288 int the_port = port;
4289 char *s = strchr(p, ':');
4290 if (s != NULL) {
4291 *s = '\0';
4292 ++s;
4293 if (!safe_strtol(s, &the_port)) {
4294 fprintf(stderr, "Invalid port number: \"%s\"", s);
4295 return 1;
4296 }
4297 }
4298 if (strcmp(p, "*") == 0) {
4299 p = NULL;
4300 }
4301 ret |= server_socket(p, the_port, transport, portnumber_file);
4302 }
4303 free(list);
4304 return ret;
4305 }
4306 }
4307
/*
 * Create a non-blocking AF_UNIX stream socket.
 *
 * Returns the descriptor, or -1 after reporting the failure with perror()
 * (a socket that cannot be made non-blocking is closed again).
 */
static int new_socket_unix(void) {
    int fd;
    int fl;

    fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd == -1) {
        perror("socket()");
        return -1;
    }

    fl = fcntl(fd, F_GETFL, 0);
    if (fl < 0 || fcntl(fd, F_SETFL, fl | O_NONBLOCK) < 0) {
        perror("setting O_NONBLOCK");
        close(fd);
        return -1;
    }

    return fd;
}
4325
4326 static int server_socket_unix(const char *path, int access_mask) {
4327 int sfd;
4328 struct linger ling = {0, 0};
4329 struct sockaddr_un addr;
4330 struct stat tstat;
4331 int flags =1;
4332 int old_umask;
4333
4334 if (!path) {
4335 return 1;
4336 }
4337
4338 if ((sfd = new_socket_unix()) == -1) {
4339 return 1;
4340 }
4341
4342 /*
4343 * Clean up a previous socket file if we left it around
4344 */
4345 if (lstat(path, &tstat) == 0) {
4346 if (S_ISSOCK(tstat.st_mode))
4347 unlink(path);
4348 }
4349
4350 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
4351 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
4352 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
4353
4354 /*
4355 * the memset call clears nonstandard fields in some impementations
4356 * that otherwise mess things up.
4357 */
4358 memset(&addr, 0, sizeof(addr));
4359
4360 addr.sun_family = AF_UNIX;
4361 strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
4362 assert(strcmp(addr.sun_path, path) == 0);
4363 old_umask = umask( ~(access_mask&0777));
4364 if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
4365 perror("bind()");
4366 close(sfd);
4367 umask(old_umask);
4368 return 1;
4369 }
4370 umask(old_umask);
4371 if (listen(sfd, settings.backlog) == -1) {
4372 perror("listen()");
4373 close(sfd);
4374 return 1;
4375 }
4376 if (!(listen_conn = conn_new(sfd, conn_listening,
4377 EV_READ | EV_PERSIST, 1,
4378 local_transport, main_base))) {
4379 fprintf(stderr, "failed to create listening connection\n");
4380 exit(EXIT_FAILURE);
4381 }
4382
4383 return 0;
4384 }
4385
/*
 * We keep the current time of day in a global variable that's updated by a
 * timer event. This saves us a bunch of time() system calls (we really only
 * need to get the time once a second, whereas there can be tens of thousands
 * of requests a second) and allows us to use server-start-relative timestamps
 * rather than absolute UNIX timestamps, a space savings on systems where
 * sizeof(time_t) > sizeof(unsigned int).
 */
volatile rel_time_t current_time;
/* Timer event that clock_handler() re-arms on every invocation. */
static struct event clockevent;
4396
4397 /* libevent uses a monotonic clock when available for event scheduling. Aside
4398 * from jitter, simply ticking our internal timer here is accurate enough.
4399 * Note that users who are setting explicit dates for expiration times *must*
4400 * ensure their clocks are correct before starting memcached. */
4401 static void clock_handler(const int fd, const short which, void *arg) {
4402 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
4403 static bool initialized = false;
4404 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4405 static bool monotonic = false;
4406 static time_t monotonic_start;
4407 #endif
4408
4409 if (initialized) {
4410 /* only delete the event if it's actually there. */
4411 evtimer_del(&clockevent);
4412 } else {
4413 initialized = true;
4414 /* process_started is initialized to time() - 2. We initialize to 1 so
4415 * flush_all won't underflow during tests. */
4416 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4417 struct timespec ts;
4418 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
4419 monotonic = true;
4420 monotonic_start = ts.tv_sec - 2;
4421 }
4422 #endif
4423 }
4424
4425 evtimer_set(&clockevent, clock_handler, 0);
4426 event_base_set(main_base, &clockevent);
4427 evtimer_add(&clockevent, &t);
4428
4429 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4430 if (monotonic) {
4431 struct timespec ts;
4432 if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1)
4433 return;
4434 current_time = (rel_time_t) (ts.tv_sec - monotonic_start);
4435 return;
4436 }
4437 #endif
4438 {
4439 struct timeval tv;
4440 gettimeofday(&tv, NULL);
4441 current_time = (rel_time_t) (tv.tv_sec - process_started);
4442 }
4443 }
4444
4445 static void usage(void) {
4446 printf(PACKAGE " " VERSION "\n");
4447 printf("-p <num> TCP port number to listen on (default: 11211)\n"
4448 "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
4449 "-s <file> UNIX socket path to listen on (disables network support)\n"
4450 "-a <mask> access mask for UNIX socket, in octal (default: 0700)\n"
4451 "-l <addr> interface to listen on (default: INADDR_ANY, all addresses)\n"
4452 " <addr> may be specified as host:port. If you don't specify\n"
4453 " a port number, the value you specified with -p or -U is\n"
4454 " used. You may specify multiple addresses separated by comma\n"
4455 " or by using -l multiple times\n"
4456
4457 "-d run as a daemon\n"
4458 "-r maximize core file limit\n"
4459 "-u <username> assume identity of <username> (only when run as root)\n"
4460 "-m <num> max memory to use for items in megabytes (default: 64 MB)\n"
4461 "-M return error on memory exhausted (rather than removing items)\n"
4462 "-c <num> max simultaneous connections (default: 1024)\n"
4463 "-k lock down all paged memory. Note that there is a\n"
4464 " limit on how much memory you may lock. Trying to\n"
4465 " allocate more than that would fail, so be sure you\n"
4466 " set the limit correctly for the user you started\n"
4467 " the daemon with (not for -u <username> user;\n"
4468 " under sh this is done with 'ulimit -S -l NUM_KB').\n"
4469 "-v verbose (print errors/warnings while in event loop)\n"
4470 "-vv very verbose (also print client commands/reponses)\n"
4471 "-vvv extremely verbose (also print internal state transitions)\n"
4472 "-h print this help and exit\n"
4473 "-i print memcached and libevent license\n"
4474 "-P <file> save PID in <file>, only used with -d option\n"
4475 "-f <factor> chunk size growth factor (default: 1.25)\n"
4476 "-n <bytes> minimum space allocated for key+value+flags (default: 48)\n");
4477 printf("-L Try to use large memory pages (if available). Increasing\n"
4478 " the memory page size could reduce the number of TLB misses\n"
4479 " and improve the performance. In order to get large pages\n"
4480 " from the OS, memcached will allocate the total item-cache\n"
4481 " in one large chunk.\n");
4482 printf("-D <char> Use <char> as the delimiter between key prefixes and IDs.\n"
4483 " This is used for per-prefix stats reporting. The default is\n"
4484 " \":\" (colon). If this option is specified, stats collection\n"
4485 " is turned on automatically; if not, then it may be turned on\n"
4486 " by sending the \"stats detail on\" command to the server.\n");
4487 printf("-t <num> number of threads to use (default: 4)\n");
4488 printf("-R Maximum number of requests per event, limits the number of\n"
4489 " requests process for a given connection to prevent \n"
4490 " starvation (default: 20)\n");
4491 printf("-C Disable use of CAS\n");
4492 printf("-b Set the backlog queue limit (default: 1024)\n");
4493 printf("-B Binding protocol - one of ascii, binary, or auto (default)\n");
4494 printf("-I Override the size of each slab page. Adjusts max item size\n"
4495 " (default: 1mb, min: 1k, max: 128m)\n");
4496 #ifdef ENABLE_SASL
4497 printf("-S Turn on Sasl authentication\n");
4498 #endif
4499 printf("-o Comma separated list of extended or experimental options\n"
4500 " - (EXPERIMENTAL) maxconns_fast: immediately close new\n"
4501 " connections if over maxconns limit\n"
4502 " - hashpower: An integer multiplier for how large the hash\n"
4503 " table should be. Can be grown at runtime if not big enough.\n"
4504 " Set this based on \"STAT hash_power_level\" before a \n"
4505 " restart.\n"
4506 );
4507 return;
4508 }
4509
4510 static void usage_license(void) {
4511 printf(PACKAGE " " VERSION "\n\n");
4512 printf(
4513 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
4514 "All rights reserved.\n"
4515 "\n"
4516 "Redistribution and use in source and binary forms, with or without\n"
4517 "modification, are permitted provided that the following conditions are\n"
4518 "met:\n"
4519 "\n"
4520 " * Redistributions of source code must retain the above copyright\n"
4521 "notice, this list of conditions and the following disclaimer.\n"
4522 "\n"
4523 " * Redistributions in binary form must reproduce the above\n"
4524 "copyright notice, this list of conditions and the following disclaimer\n"
4525 "in the documentation and/or other materials provided with the\n"
4526 "distribution.\n"
4527 "\n"
4528 " * Neither the name of the Danga Interactive nor the names of its\n"
4529 "contributors may be used to endorse or promote products derived from\n"
4530 "this software without specific prior written permission.\n"
4531 "\n"
4532 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
4533 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
4534 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
4535 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
4536 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
4537 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
4538 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4539 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4540 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4541 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
4542 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4543 "\n"
4544 "\n"
4545 "This product includes software developed by Niels Provos.\n"
4546 "\n"
4547 "[ libevent ]\n"
4548 "\n"
4549 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
4550 "All rights reserved.\n"
4551 "\n"
4552 "Redistribution and use in source and binary forms, with or without\n"
4553 "modification, are permitted provided that the following conditions\n"
4554 "are met:\n"
4555 "1. Redistributions of source code must retain the above copyright\n"
4556 " notice, this list of conditions and the following disclaimer.\n"
4557 "2. Redistributions in binary form must reproduce the above copyright\n"
4558 " notice, this list of conditions and the following disclaimer in the\n"
4559 " documentation and/or other materials provided with the distribution.\n"
4560 "3. All advertising materials mentioning features or use of this software\n"
4561 " must display the following acknowledgement:\n"
4562 " This product includes software developed by Niels Provos.\n"
4563 "4. The name of the author may not be used to endorse or promote products\n"
4564 " derived from this software without specific prior written permission.\n"
4565 "\n"
4566 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
4567 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
4568 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
4569 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
4570 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
4571 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4572 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4573 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4574 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
4575 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4576 );
4577
4578 return;
4579 }
4580
/*
 * Write the current process id to `pid_file`.
 *
 * If the file already exists and names a process that is still alive, a
 * warning is printed to stderr first; the file is then overwritten either
 * way. Failures to open, write or close the file are reported with
 * vperror() and are not fatal.
 */
static void save_pid(const char *pid_file) {
    FILE *fp;

    /* A missing or unreadable file simply skips the stale-pid check. */
    if ((fp = fopen(pid_file, "r")) != NULL) {
        char buffer[1024];
        if (fgets(buffer, sizeof(buffer), fp) != NULL) {
            unsigned int pid;
            /*
             * Only probe values that are valid as a single process id:
             * 0 and values that turn negative as pid_t would make kill()
             * address a process group instead. EPERM means the process
             * exists but belongs to someone else, so it is running too.
             */
            if (safe_strtoul(buffer, &pid) &&
                pid > 0 && pid <= (unsigned int)INT_MAX &&
                (kill((pid_t)pid, 0) == 0 || errno == EPERM)) {
                fprintf(stderr, "WARNING: The pid file contained the following (running) pid: %u\n", pid);
            }
        }
        fclose(fp);
    }

    if ((fp = fopen(pid_file, "w")) == NULL) {
        vperror("Could not open the pid file %s for writing", pid_file);
        return;
    }

    if (fprintf(fp, "%ld\n", (long)getpid()) < 0) {
        vperror("Could not write the pid file %s", pid_file);
    }
    /* fclose() flushes, so a failure here can also mean a lost write. */
    if (fclose(fp) != 0) {
        vperror("Could not close the pid file %s", pid_file);
    }
}
4606
/*
 * Delete the pid file, if one was configured. A NULL `pid_file` is a no-op;
 * a failed unlink() is reported with vperror().
 */
static void remove_pidfile(const char *pid_file) {
    if (pid_file != NULL && unlink(pid_file) != 0) {
        vperror("Could not remove the pid file %s", pid_file);
    }
}
4616
/*
 * Signal handler: announce the signal on stdout and terminate the process
 * with a success status. The message always says SIGINT, whatever `sig` is.
 *
 * NOTE(review): stdio output and exit() are not async-signal-safe; kept
 * as-is because exit() also runs the normal process teardown.
 */
static void sig_handler(const int sig) {
    puts("SIGINT handled.");
    exit(EXIT_SUCCESS);
}
4621
4622 #ifndef HAVE_SIGIGNORE
4623 static int sigignore(int sig) {
4624 struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
4625
4626 if (sigemptyset(&sa.sa_mask) == -1 || sigaction(sig, &sa, 0) == -1) {
4627 return -1;
4628 }
4629 return 0;
4630 }
4631 #endif
4632
4633
/*
 * On systems that support multiple page sizes we may reduce the number of
 * TLB misses by using the biggest available page size.
 *
 * When both getpagesizes() and memcntl() are available, advise the largest
 * reported page size for the BSS/brk area. Returns 0 if the advice was
 * accepted and -1 otherwise (including on platforms without those calls);
 * failures are described on stderr.
 */
static int enable_large_pages(void) {
#if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
    size_t sizes[32];
    int avail = getpagesizes(sizes, 32);

    if (avail == -1) {
        fprintf(stderr, "Failed to get supported pagesizes: %s\n",
                strerror(errno));
        fprintf(stderr, "Will use default page size\n");
        return -1;
    }

    /* Pick the largest of the reported page sizes. */
    size_t largest = sizes[0];
    for (int idx = 1; idx < avail; ++idx) {
        if (sizes[idx] > largest) {
            largest = sizes[idx];
        }
    }

    struct memcntl_mha arg = {0};
    arg.mha_flags = 0;
    arg.mha_pagesize = largest;
    arg.mha_cmd = MHA_MAPSIZE_BSSBRK;

    if (memcntl(0, 0, MC_HAT_ADVISE, (caddr_t)&arg, 0, 0) == -1) {
        fprintf(stderr, "Failed to set large pages: %s\n",
                strerror(errno));
        fprintf(stderr, "Will use default page size\n");
        return -1;
    }

    return 0;
#else
    return -1;
#endif
}
4676
/**
 * Do basic sanity check of the runtime environment.
 *
 * Currently this only rejects libevent versions "1.1" and "1.2" (with any
 * non-digit suffix, so "1.10" and later are accepted); a NULL version
 * string is accepted.
 *
 * @return true if no errors found, false if we can't use this env
 */
static bool sanitycheck(void) {
    /* One of our biggest problems is old and bogus libevents */
    const char *ever = event_get_version();

    if (ever == NULL || strncmp(ever, "1.", 2) != 0) {
        return true;
    }

    /* Require at least 1.3 (that's still a couple of years old).
     * isdigit() needs an unsigned char value; a plain char may be
     * negative. */
    if ((ever[2] == '1' || ever[2] == '2') &&
        !isdigit((unsigned char)ever[3])) {
        fprintf(stderr, "You are using libevent %s.\nPlease upgrade to"
                " a more recent version (1.3 or newer)\n",
                ever);
        return false;
    }

    return true;
}
4698
4699 int main (int argc, char **argv) {
4700 int c;
4701 bool lock_memory = false;
4702 bool do_daemonize = false;
4703 bool preallocate = false;
4704 int maxcore = 0;
4705 char *username = NULL;
4706 char *pid_file = NULL;
4707 struct passwd *pw;
4708 struct rlimit rlim;
4709 char unit = '\0';
4710 int size_max = 0;
4711 int retval = EXIT_SUCCESS;
4712 /* listening sockets */
4713 static int *l_socket = NULL;
4714
4715 /* udp socket */
4716 static int *u_socket = NULL;
4717 bool protocol_specified = false;
4718 bool tcp_specified = false;
4719 bool udp_specified = false;
4720
4721 char *subopts;
4722 char *subopts_value;
4723 enum {
4724 MAXCONNS_FAST = 0,
4725 HASHPOWER_INIT,
4726 SLAB_REASSIGN,
4727 SLAB_AUTOMOVE
4728 };
4729 char *const subopts_tokens[] = {
4730 [MAXCONNS_FAST] = "maxconns_fast",
4731 [HASHPOWER_INIT] = "hashpower",
4732 [SLAB_REASSIGN] = "slab_reassign",
4733 [SLAB_AUTOMOVE] = "slab_automove",
4734 NULL
4735 };
4736
4737 if (!sanitycheck()) {
4738 return EX_OSERR;
4739 }
4740
4741 /* handle SIGINT */
4742 signal(SIGINT, sig_handler);
4743
4744 /* init settings */
4745 settings_init();
4746
4747 /* set stderr non-buffering (for running under, say, daemontools) */
4748 setbuf(stderr, NULL);
4749
4750 /* process arguments */
4751 while (-1 != (c = getopt(argc, argv,
4752 "a:" /* access mask for unix socket */
4753 "p:" /* TCP port number to listen on */
4754 "s:" /* unix socket path to listen on */
4755 "U:" /* UDP port number to listen on */
4756 "m:" /* max memory to use for items in megabytes */
4757 "M" /* return error on memory exhausted */
4758 "c:" /* max simultaneous connections */
4759 "k" /* lock down all paged memory */
4760 "hi" /* help, licence info */
4761 "r" /* maximize core file limit */
4762 "v" /* verbose */
4763 "d" /* daemon mode */
4764 "l:" /* interface to listen on */
4765 "u:" /* user identity to run as */
4766 "P:" /* save PID in file */
4767 "f:" /* factor? */
4768 "n:" /* minimum space allocated for key+value+flags */
4769 "t:" /* threads */
4770 "D:" /* prefix delimiter? */
4771 "L" /* Large memory pages */
4772 "R:" /* max requests per event */
4773 "C" /* Disable use of CAS */
4774 "b:" /* backlog queue limit */
4775 "B:" /* Binding protocol */
4776 "I:" /* Max item size */
4777 "S" /* Sasl ON */
4778 "o:" /* Extended generic options */
4779 ))) {
4780 switch (c) {
4781 case 'a':
4782 /* access for unix domain socket, as octal mask (like chmod)*/
4783 settings.access= strtol(optarg,NULL,8);
4784 break;
4785
4786 case 'U':
4787 settings.udpport = atoi(optarg);
4788 udp_specified = true;
4789 break;
4790 case 'p':
4791 settings.port = atoi(optarg);
4792 tcp_specified = true;
4793 break;
4794 case 's':
4795 settings.socketpath = optarg;
4796 break;
4797 case 'm':
4798 settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
4799 break;
4800 case 'M':
4801 settings.evict_to_free = 0;
4802 break;
4803 case 'c':
4804 settings.maxconns = atoi(optarg);
4805 break;
4806 case 'h':
4807 usage();
4808 exit(EXIT_SUCCESS);
4809 case 'i':
4810 usage_license();
4811 exit(EXIT_SUCCESS);
4812 case 'k':
4813 lock_memory = true;
4814 break;
4815 case 'v':
4816 settings.verbose++;
4817 break;
4818 case 'l':
4819 if (settings.inter != NULL) {
4820 size_t len = strlen(settings.inter) + strlen(optarg) + 2;
4821 char *p = malloc(len);
4822 if (p == NULL) {
4823 fprintf(stderr, "Failed to allocate memory\n");
4824 return 1;
4825 }
4826 snprintf(p, len, "%s,%s", settings.inter, optarg);
4827 free(settings.inter);
4828 settings.inter = p;
4829 } else {
4830 settings.inter= strdup(optarg);
4831 }
4832 break;
4833 case 'd':
4834 do_daemonize = true;
4835 break;
4836 case 'r':
4837 maxcore = 1;
4838 break;
4839 case 'R':
4840 settings.reqs_per_event = atoi(optarg);
4841 if (settings.reqs_per_event == 0) {
4842 fprintf(stderr, "Number of requests per event must be greater than 0\n");
4843 return 1;
4844 }
4845 break;
4846 case 'u':
4847 username = optarg;
4848 break;
4849 case 'P':
4850 pid_file = optarg;
4851 break;
4852 case 'f':
4853 settings.factor = atof(optarg);
4854 if (settings.factor <= 1.0) {
4855 fprintf(stderr, "Factor must be greater than 1\n");
4856 return 1;
4857 }
4858 break;
4859 case 'n':
4860 settings.chunk_size = atoi(optarg);
4861 if (settings.chunk_size == 0) {
4862 fprintf(stderr, "Chunk size must be greater than 0\n");
4863 return 1;
4864 }
4865 break;
4866 case 't':
4867 settings.num_threads = atoi(optarg);
4868 if (settings.num_threads <= 0) {
4869 fprintf(stderr, "Number of threads must be greater than 0\n");
4870 return 1;
4871 }
4872 /* There're other problems when you get above 64 threads.
4873 * In the future we should portably detect # of cores for the
4874 * default.
4875 */
4876 if (settings.num_threads > 64) {
4877 fprintf(stderr, "WARNING: Setting a high number of worker"
4878 "threads is not recommended.\n"
4879 " Set this value to the number of cores in"
4880 " your machine or less.\n");
4881 }
4882 break;
4883 case 'D':
4884 if (! optarg || ! optarg[0]) {
4885 fprintf(stderr, "No delimiter specified\n");
4886 return 1;
4887 }
4888 settings.prefix_delimiter = optarg[0];
4889 settings.detail_enabled = 1;
4890 break;
4891 case 'L' :
4892 if (enable_large_pages() == 0) {
4893 preallocate = true;
4894 } else {
4895 fprintf(stderr, "Cannot enable large pages on this system\n"
4896 "(There is no Linux support as of this version)\n");
4897 return 1;
4898 }
4899 break;
4900 case 'C' :
4901 settings.use_cas = false;
4902 break;
4903 case 'b' :
4904 settings.backlog = atoi(optarg);
4905 break;
4906 case 'B':
4907 protocol_specified = true;
4908 if (strcmp(optarg, "auto") == 0) {
4909 settings.binding_protocol = negotiating_prot;
4910 } else if (strcmp(optarg, "binary") == 0) {
4911 settings.binding_protocol = binary_prot;
4912 } else if (strcmp(optarg, "ascii") == 0) {
4913 settings.binding_protocol = ascii_prot;
4914 } else {
4915 fprintf(stderr, "Invalid value for binding protocol: %s\n"
4916 " -- should be one of auto, binary, or ascii\n", optarg);
4917 exit(EX_USAGE);
4918 }
4919 break;
4920 case 'I':
4921 unit = optarg[strlen(optarg)-1];
4922 if (unit == 'k' || unit == 'm' ||
4923 unit == 'K' || unit == 'M') {
4924 optarg[strlen(optarg)-1] = '\0';
4925 size_max = atoi(optarg);
4926 if (unit == 'k' || unit == 'K')
4927 size_max *= 1024;
4928 if (unit == 'm' || unit == 'M')
4929 size_max *= 1024 * 1024;
4930 settings.item_size_max = size_max;
4931 } else {
4932 settings.item_size_max = atoi(optarg);
4933 }
4934 if (settings.item_size_max < 1024) {
4935 fprintf(stderr, "Item max size cannot be less than 1024 bytes.\n");
4936 return 1;
4937 }
4938 if (settings.item_size_max > 1024 * 1024 * 128) {
4939 fprintf(stderr, "Cannot set item size limit higher than 128 mb.\n");
4940 return 1;
4941 }
4942 if (settings.item_size_max > 1024 * 1024) {
4943 fprintf(stderr, "WARNING: Setting item max size above 1MB is not"
4944 " recommended!\n"
4945 " Raising this limit increases the minimum memory requirements\n"
4946 " and will decrease your memory efficiency.\n"
4947 );
4948 }
4949 break;
4950 case 'S': /* set Sasl authentication to true. Default is false */
4951 #ifndef ENABLE_SASL
4952 fprintf(stderr, "This server is not built with SASL support.\n");
4953 exit(EX_USAGE);
4954 #endif
4955 settings.sasl = true;
4956 break;
4957 case 'o': /* It's sub-opts time! */
4958 subopts = optarg;
4959
4960 while (*subopts != '\0') {
4961
4962 switch (getsubopt(&subopts, subopts_tokens, &subopts_value)) {
4963 case MAXCONNS_FAST:
4964 settings.maxconns_fast = true;
4965 break;
4966 case HASHPOWER_INIT:
4967 if (subopts_value == NULL) {
4968 fprintf(stderr, "Missing numeric argument for hashpower\n");
4969 return 1;
4970 }
4971 settings.hashpower_init = atoi(subopts_value);
4972 if (settings.hashpower_init < 12) {
4973 fprintf(stderr, "Initial hashtable multiplier of %d is too low\n",
4974 settings.hashpower_init);
4975 return 1;
4976 } else if (settings.hashpower_init > 64) {
4977 fprintf(stderr, "Initial hashtable multiplier of %d is too high\n"
4978 "Choose a value based on \"STAT hash_power_level\" from a running instance\n",
4979 settings.hashpower_init);
4980 return 1;
4981 }
4982 break;
4983 case SLAB_REASSIGN:
4984 settings.slab_reassign = true;
4985 break;
4986 case SLAB_AUTOMOVE:
4987 if (subopts_value == NULL) {
4988 settings.slab_automove = 1;
4989 break;
4990 }
4991 settings.slab_automove = atoi(subopts_value);
4992 if (settings.slab_automove < 0 || settings.slab_automove > 2) {
4993 fprintf(stderr, "slab_automove must be between 0 and 2\n");
4994 return 1;
4995 }
4996 break;
4997 default:
4998 printf("Illegal suboption \"%s\"\n", subopts_value);
4999 return 1;
5000 }
5001
5002 }
5003 break;
5004 default:
5005 fprintf(stderr, "Illegal argument \"%c\"\n", c);
5006 return 1;
5007 }
5008 }
5009
5010 /*
5011 * Use one workerthread to serve each UDP port if the user specified
5012 * multiple ports
5013 */
5014 if (settings.inter != NULL && strchr(settings.inter, ',')) {
5015 settings.num_threads_per_udp = 1;
5016 } else {
5017 settings.num_threads_per_udp = settings.num_threads;
5018 }
5019
5020 if (settings.sasl) {
5021 if (!protocol_specified) {
5022 settings.binding_protocol = binary_prot;
5023 } else {
5024 if (settings.binding_protocol != binary_prot) {
5025 fprintf(stderr, "ERROR: You cannot allow the ASCII protocol while using SASL.\n");
5026 exit(EX_USAGE);
5027 }
5028 }
5029 }
5030
5031 if (tcp_specified && !udp_specified) {
5032 settings.udpport = settings.port;
5033 } else if (udp_specified && !tcp_specified) {
5034 settings.port = settings.udpport;
5035 }
5036
5037 if (maxcore != 0) {
5038 struct rlimit rlim_new;
5039 /*
5040 * First try raising to infinity; if that fails, try bringing
5041 * the soft limit to the hard.
5042 */
5043 if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
5044 rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
5045 if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
5046 /* failed. try raising just to the old max */
5047 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
5048 (void)setrlimit(RLIMIT_CORE, &rlim_new);
5049 }
5050 }
5051 /*
5052 * getrlimit again to see what we ended up with. Only fail if
5053 * the soft limit ends up 0, because then no core files will be
5054 * created at all.
5055 */
5056
5057 if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
5058 fprintf(stderr, "failed to ensure corefile creation\n");
5059 exit(EX_OSERR);
5060 }
5061 }
5062
5063 /*
5064 * If needed, increase rlimits to allow as many connections
5065 * as needed.
5066 */
5067
5068 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
5069 fprintf(stderr, "failed to getrlimit number of files\n");
5070 exit(EX_OSERR);
5071 } else {
5072 rlim.rlim_cur = settings.maxconns;
5073 rlim.rlim_max = settings.maxconns;
5074 if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
5075 fprintf(stderr, "failed to set rlimit for open files. Try starting as root or requesting smaller maxconns value.\n");
5076 exit(EX_OSERR);
5077 }
5078 }
5079
5080 /* lose root privileges if we have them */
5081 if (getuid() == 0 || geteuid() == 0) {
5082 if (username == 0 || *username == '\0') {
5083 fprintf(stderr, "can't run as root without the -u switch\n");
5084 exit(EX_USAGE);
5085 }
5086 if ((pw = getpwnam(username)) == 0) {
5087 fprintf(stderr, "can't find the user %s to switch to\n", username);
5088 exit(EX_NOUSER);
5089 }
5090 if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
5091 fprintf(stderr, "failed to assume identity of user %s\n", username);
5092 exit(EX_OSERR);
5093 }
5094 }
5095
5096 /* Initialize Sasl if -S was specified */
5097 if (settings.sasl) {
5098 init_sasl();
5099 }
5100
5101 /* daemonize if requested */
5102 /* if we want to ensure our ability to dump core, don't chdir to / */
5103 if (do_daemonize) {
5104 if (sigignore(SIGHUP) == -1) {
5105 perror("Failed to ignore SIGHUP");
5106 }
5107 if (daemonize(maxcore, settings.verbose) == -1) {
5108 fprintf(stderr, "failed to daemon() in order to daemonize\n");
5109 exit(EXIT_FAILURE);
5110 }
5111 }
5112
5113 /* lock paged memory if needed */
5114 if (lock_memory) {
5115 #ifdef HAVE_MLOCKALL
5116 int res = mlockall(MCL_CURRENT | MCL_FUTURE);
5117 if (res != 0) {
5118 fprintf(stderr, "warning: -k invalid, mlockall() failed: %s\n",
5119 strerror(errno));
5120 }
5121 #else
5122 fprintf(stderr, "warning: -k invalid, mlockall() not supported on this platform. proceeding without.\n");
5123 #endif
5124 }
5125
5126 /* initialize main thread libevent instance */
5127 main_base = event_init();
5128
5129 /* initialize other stuff */
5130 stats_init();
5131 assoc_init(settings.hashpower_init);
5132 conn_init();
5133 slabs_init(settings.maxbytes, settings.factor, preallocate);
5134
5135 /*
5136 * ignore SIGPIPE signals; we can use errno == EPIPE if we
5137 * need that information
5138 */
5139 if (sigignore(SIGPIPE) == -1) {
5140 perror("failed to ignore SIGPIPE; sigaction");
5141 exit(EX_OSERR);
5142 }
5143 /* start up worker threads if MT mode */
5144 thread_init(settings.num_threads, main_base);
5145
5146 if (start_assoc_maintenance_thread() == -1) {
5147 exit(EXIT_FAILURE);
5148 }
5149
5150 if (settings.slab_reassign &&
5151 start_slab_maintenance_thread() == -1) {
5152 exit(EXIT_FAILURE);
5153 }
5154
5155 /* initialise clock event */
5156 clock_handler(0, 0, 0);
5157
5158 /* create unix mode sockets after dropping privileges */
5159 if (settings.socketpath != NULL) {
5160 errno = 0;
5161 if (server_socket_unix(settings.socketpath,settings.access)) {
5162 vperror("failed to listen on UNIX socket: %s", settings.socketpath);
5163 exit(EX_OSERR);
5164 }
5165 }
5166
5167 /* create the listening socket, bind it, and init */
5168 if (settings.socketpath == NULL) {
5169 const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");
5170 char temp_portnumber_filename[PATH_MAX];
5171 FILE *portnumber_file = NULL;
5172
5173 if (portnumber_filename != NULL) {
5174 snprintf(temp_portnumber_filename,
5175 sizeof(temp_portnumber_filename),
5176 "%s.lck", portnumber_filename);
5177
5178 portnumber_file = fopen(temp_portnumber_filename, "a");
5179 if (portnumber_file == NULL) {
5180 fprintf(stderr, "Failed to open \"%s\": %s\n",
5181 temp_portnumber_filename, strerror(errno));
5182 }
5183 }
5184
5185 errno = 0;
5186 if (settings.port && server_sockets(settings.port, tcp_transport,
5187 portnumber_file)) {
5188 vperror("failed to listen on TCP port %d", settings.port);
5189 exit(EX_OSERR);
5190 }
5191
5192 /*
5193 * initialization order: first create the listening sockets
5194 * (may need root on low ports), then drop root if needed,
5195 * then daemonise if needed, then init libevent (in some cases
5196 * descriptors created by libevent wouldn't survive forking).
5197 */
5198
5199 /* create the UDP listening socket and bind it */
5200 errno = 0;
5201 if (settings.udpport && server_sockets(settings.udpport, udp_transport,
5202 portnumber_file)) {
5203 vperror("failed to listen on UDP port %d", settings.udpport);
5204 exit(EX_OSERR);
5205 }
5206
5207 if (portnumber_file) {
5208 fclose(portnumber_file);
5209 rename(temp_portnumber_filename, portnumber_filename);
5210 }
5211 }
5212
5213 /* Give the sockets a moment to open. I know this is dumb, but the error
5214 * is only an advisory.
5215 */
5216 usleep(1000);
5217 if (stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {
5218 fprintf(stderr, "Maxconns setting is too low, use -c to increase.\n");
5219 exit(EXIT_FAILURE);
5220 }
5221
5222 if (pid_file != NULL) {
5223 save_pid(pid_file);
5224 }
5225
5226 /* Drop privileges no longer needed */
5227 drop_privileges();
5228
5229 /* enter the event loop */
5230 if (event_base_loop(main_base, 0) != 0) {
5231 retval = EXIT_FAILURE;
5232 }
5233
5234 stop_assoc_maintenance_thread();
5235
5236 /* remove the PID file if we're a daemon */
5237 if (do_daemonize)
5238 remove_pidfile(pid_file);
5239 /* Clean up strdup() call for bind() address */
5240 if (settings.inter)
5241 free(settings.inter);
5242 if (l_socket)
5243 free(l_socket);
5244 if (u_socket)
5245 free(u_socket);
5246
5247 return retval;
5248 }