1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * memcached - memory caching daemon
5 * http://www.danga.com/memcached/
7 * Copyright 2003 Danga Interactive, Inc. All rights reserved.
9 * Use and distribution licensed under the BSD license. See
10 * the LICENSE file for full text.
13 * Anatoly Vorobey <mellon@pobox.com>
14 * Brad Fitzpatrick <brad@danga.com>
16 #include "memcached.h"
18 #include <sys/socket.h>
21 #include <sys/resource.h>
26 /* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h. */
28 #ifndef _P1003_1B_VISIBLE
29 #define _P1003_1B_VISIBLE
31 /* need this to get IOV_MAX on some platforms. */
32 #ifndef __need_IOV_MAX
33 #define __need_IOV_MAX
38 #include <netinet/tcp.h>
39 #include <arpa/inet.h>
/*
 * FreeBSD 4.x doesn't have IOV_MAX exposed.  On FreeBSD and Apple targets,
 * supply a fallback when the platform headers did not define one.
 */
#if !defined(IOV_MAX) && (defined(__FreeBSD__) || defined(__APPLE__))
# define IOV_MAX 1024
#endif
58 * forward declarations
60 static void drive_machine(conn
*c
);
61 static int new_socket(struct addrinfo
*ai
);
62 static int try_read_command(conn
*c
);
64 enum try_read_result
{
66 READ_NO_DATA_RECEIVED
,
67 READ_ERROR
, /** an error occured (on the socket) (or client closed connection) */
68 READ_MEMORY_ERROR
/** failed to allocate more memory */
71 static enum try_read_result
try_read_network(conn
*c
);
72 static enum try_read_result
try_read_udp(conn
*c
);
74 static void conn_set_state(conn
*c
, enum conn_states state
);
77 static void stats_init(void);
78 static void server_stats(ADD_STAT add_stats
, conn
*c
);
79 static void process_stat_settings(ADD_STAT add_stats
, void *c
);
83 static void settings_init(void);
85 /* event handling, network IO */
86 static void event_handler(const int fd
, const short which
, void *arg
);
87 static void conn_close(conn
*c
);
88 static void conn_init(void);
89 static bool update_event(conn
*c
, const int new_flags
);
90 static void complete_nread(conn
*c
);
91 static void process_command(conn
*c
, char *command
);
92 static void write_and_free(conn
*c
, char *buf
, int bytes
);
93 static int ensure_iov_space(conn
*c
);
94 static int add_iov(conn
*c
, const void *buf
, int len
);
95 static int add_msghdr(conn
*c
);
98 static void conn_free(conn
*c
);
100 /** exported globals **/
102 struct settings settings
;
103 time_t process_started
; /* when the process was started */
105 struct slab_rebalance slab_rebal
;
106 volatile int slab_rebalance_signal
;
108 /** file scope variables **/
109 static conn
*listen_conn
= NULL
;
110 static struct event_base
*main_base
;
112 enum transmit_result
{
113 TRANSMIT_COMPLETE
, /** All done writing. */
114 TRANSMIT_INCOMPLETE
, /** More data remaining to write. */
115 TRANSMIT_SOFT_ERROR
, /** Can't write any more right now. */
116 TRANSMIT_HARD_ERROR
/** Can't write (c->state is set to conn_closing) */
119 static enum transmit_result
transmit(conn
*c
);
121 /* This reduces the latency without adding lots of extra wiring to be able to
122 * notify the listener thread of when to listen again.
123 * Also, the clock timer could be broken out into its own thread and we
124 * can block the listener via a condition.
126 static volatile bool allow_new_conns
= true;
127 static struct event maxconnsevent
;
128 #ifndef __INTEL_COMPILER
129 #pragma GCC diagnostic ignored "-Wunused-parameter"
131 static void maxconns_handler(const int fd
, const short which
, void *arg
) {
132 struct timeval t
= {.tv_sec
= 0, .tv_usec
= 10000};
134 if (fd
== -42 || allow_new_conns
== false) {
135 /* reschedule in 10ms if we need to keep polling */
136 evtimer_set(&maxconnsevent
, maxconns_handler
, 0);
137 event_base_set(main_base
, &maxconnsevent
);
138 evtimer_add(&maxconnsevent
, &t
);
140 evtimer_del(&maxconnsevent
);
141 accept_new_conns(true);
145 #define REALTIME_MAXDELTA 60*60*24*30
148 * given time value that's either unix time or delta from current unix time, return
149 * unix time. Use the fact that delta can't exceed one month (and real time value can't
152 static rel_time_t
realtime(const time_t exptime
) {
153 /* no. of seconds in 30 days - largest possible delta exptime */
155 if (exptime
== 0) return 0; /* 0 means never expire */
157 if (exptime
> REALTIME_MAXDELTA
) {
158 /* if item expiration is at/before the server started, give it an
159 expiration time of 1 second after the server started.
160 (because 0 means don't expire). without this, we'd
161 underflow and wrap around to some large value way in the
162 future, effectively making items expiring in the past
163 really expiring never */
164 if (exptime
<= process_started
)
165 return (rel_time_t
)1;
166 return (rel_time_t
)(exptime
- process_started
);
168 return (rel_time_t
)(exptime
+ current_time
);
172 static void stats_init(void) {
173 stats
.curr_items
= stats
.total_items
= stats
.curr_conns
= stats
.total_conns
= stats
.conn_structs
= 0;
174 stats
.get_cmds
= stats
.set_cmds
= stats
.get_hits
= stats
.get_misses
= stats
.evictions
= stats
.reclaimed
= 0;
175 stats
.touch_cmds
= stats
.touch_misses
= stats
.touch_hits
= stats
.rejected_conns
= 0;
176 stats
.curr_bytes
= stats
.listen_disabled_num
= 0;
177 stats
.hash_power_level
= stats
.hash_bytes
= stats
.hash_is_expanding
= 0;
178 stats
.expired_unfetched
= stats
.evicted_unfetched
= 0;
179 stats
.slabs_moved
= 0;
180 stats
.accepting_conns
= true; /* assuming we start in this state. */
181 stats
.slab_reassign_running
= false;
183 /* make the time we started always be 2 seconds before we really
184 did, so time(0) - time.started is never zero. if so, things
185 like 'settings.oldest_live' which act as booleans as well as
186 values are now false in boolean context... */
187 process_started
= time(0) - 2;
191 static void stats_reset(void) {
193 stats
.total_items
= stats
.total_conns
= 0;
194 stats
.rejected_conns
= 0;
197 stats
.listen_disabled_num
= 0;
198 stats_prefix_clear();
200 threadlocal_stats_reset();
204 static void settings_init(void) {
205 settings
.use_cas
= true;
206 settings
.access
= 0700;
207 settings
.port
= 11211;
208 settings
.udpport
= 11211;
209 /* By default this string should be NULL for getaddrinfo() */
210 settings
.inter
= NULL
;
211 settings
.maxbytes
= 64 * 1024 * 1024; /* default is 64MB */
212 settings
.maxconns
= 1024; /* to limit connections-related memory to about 5MB */
213 settings
.verbose
= 0;
214 settings
.oldest_live
= 0;
215 settings
.evict_to_free
= 1; /* push old items out of cache when memory runs out */
216 settings
.socketpath
= NULL
; /* by default, not using a unix socket */
217 settings
.factor
= 1.25;
218 settings
.chunk_size
= 48; /* space for a modest key and value */
219 settings
.num_threads
= 4; /* N workers */
220 settings
.num_threads_per_udp
= 0;
221 settings
.prefix_delimiter
= ':';
222 settings
.detail_enabled
= 0;
223 settings
.reqs_per_event
= 20;
224 settings
.backlog
= 1024;
225 settings
.binding_protocol
= negotiating_prot
;
226 settings
.item_size_max
= 1024 * 1024; /* The famous 1MB upper limit. */
227 settings
.maxconns_fast
= false;
228 settings
.hashpower_init
= 0;
229 settings
.slab_reassign
= false;
230 settings
.slab_automove
= false;
234 * Adds a message header to a connection.
236 * Returns 0 on success, -1 on out-of-memory.
238 static int add_msghdr(conn
*c
)
244 if (c
->msgsize
== c
->msgused
) {
245 msg
= realloc(c
->msglist
, c
->msgsize
* 2 * sizeof(struct msghdr
));
252 msg
= c
->msglist
+ c
->msgused
;
254 /* this wipes msg_iovlen, msg_control, msg_controllen, and
255 msg_flags, the last 3 of which aren't defined on solaris: */
256 memset(msg
, 0, sizeof(struct msghdr
));
258 msg
->msg_iov
= &c
->iov
[c
->iovused
];
260 if (c
->request_addr_size
> 0) {
261 msg
->msg_name
= &c
->request_addr
;
262 msg
->msg_namelen
= c
->request_addr_size
;
268 if (IS_UDP(c
->transport
)) {
269 /* Leave room for the UDP header, which we'll fill in later. */
270 return add_iov(c
, NULL
, UDP_HEADER_SIZE
);
278 * Free list management for connections.
281 static conn
**freeconns
;
282 static int freetotal
;
284 /* Lock for connection freelist */
285 static pthread_mutex_t conn_lock
= PTHREAD_MUTEX_INITIALIZER
;
288 static void conn_init(void) {
291 if ((freeconns
= calloc(freetotal
, sizeof(conn
*))) == NULL
) {
292 fprintf(stderr
, "Failed to allocate connection structures\n");
298 * Returns a connection from the freelist, if any.
300 conn
*conn_from_freelist() {
303 pthread_mutex_lock(&conn_lock
);
305 c
= freeconns
[--freecurr
];
309 pthread_mutex_unlock(&conn_lock
);
315 * Adds a connection to the freelist. 0 = success.
317 bool conn_add_to_freelist(conn
*c
) {
319 pthread_mutex_lock(&conn_lock
);
320 if (freecurr
< freetotal
) {
321 freeconns
[freecurr
++] = c
;
324 /* try to enlarge free connections array */
325 size_t newsize
= freetotal
* 2;
326 conn
**new_freeconns
= realloc(freeconns
, sizeof(conn
*) * newsize
);
329 freeconns
= new_freeconns
;
330 freeconns
[freecurr
++] = c
;
334 pthread_mutex_unlock(&conn_lock
);
338 static const char *prot_text(enum protocol prot
) {
339 const char *rv
= "unknown";
347 case negotiating_prot
:
348 rv
= "auto-negotiate";
356 conn
*conn_new(const int sfd
, enum conn_states init_state
,
357 const int event_flags
,
358 const int read_buffer_size
, enum network_transport transport
,
359 struct event_base
*base
) {
360 conn
*c
= conn_from_freelist();
363 if (!(c
= (conn
*)calloc(1, sizeof(conn
)))) {
364 fprintf(stderr
, "calloc()\n");
367 MEMCACHED_CONN_CREATE(c
);
369 c
->rbuf
= c
->wbuf
= 0;
376 c
->rsize
= read_buffer_size
;
377 c
->wsize
= DATA_BUFFER_SIZE
;
378 c
->isize
= ITEM_LIST_INITIAL
;
379 c
->suffixsize
= SUFFIX_LIST_INITIAL
;
380 c
->iovsize
= IOV_LIST_INITIAL
;
381 c
->msgsize
= MSG_LIST_INITIAL
;
384 c
->rbuf
= (char *)malloc((size_t)c
->rsize
);
385 c
->wbuf
= (char *)malloc((size_t)c
->wsize
);
386 c
->ilist
= (item
**)malloc(sizeof(item
*) * c
->isize
);
387 c
->suffixlist
= (char **)malloc(sizeof(char *) * c
->suffixsize
);
388 c
->iov
= (struct iovec
*)malloc(sizeof(struct iovec
) * c
->iovsize
);
389 c
->msglist
= (struct msghdr
*)malloc(sizeof(struct msghdr
) * c
->msgsize
);
391 if (c
->rbuf
== 0 || c
->wbuf
== 0 || c
->ilist
== 0 || c
->iov
== 0 ||
392 c
->msglist
== 0 || c
->suffixlist
== 0) {
394 fprintf(stderr
, "malloc()\n");
399 stats
.conn_structs
++;
403 c
->transport
= transport
;
404 c
->protocol
= settings
.binding_protocol
;
406 /* unix socket mode doesn't need this, so zeroed out. but why
407 * is this done for every command? presumably for UDP
409 if (!settings
.socketpath
) {
410 c
->request_addr_size
= sizeof(c
->request_addr
);
412 c
->request_addr_size
= 0;
415 if (settings
.verbose
> 1) {
416 if (init_state
== conn_listening
) {
417 fprintf(stderr
, "<%d server listening (%s)\n", sfd
,
418 prot_text(c
->protocol
));
419 } else if (IS_UDP(transport
)) {
420 fprintf(stderr
, "<%d server listening (udp)\n", sfd
);
421 } else if (c
->protocol
== negotiating_prot
) {
422 fprintf(stderr
, "<%d new auto-negotiating client connection\n",
424 } else if (c
->protocol
== ascii_prot
) {
425 fprintf(stderr
, "<%d new ascii client connection.\n", sfd
);
426 } else if (c
->protocol
== binary_prot
) {
427 fprintf(stderr
, "<%d new binary client connection.\n", sfd
);
429 fprintf(stderr
, "<%d new unknown (%d) client connection\n",
436 c
->state
= init_state
;
439 c
->rbytes
= c
->wbytes
= 0;
444 c
->suffixcurr
= c
->suffixlist
;
451 c
->write_and_go
= init_state
;
452 c
->write_and_free
= 0;
457 event_set(&c
->event
, sfd
, event_flags
, event_handler
, (void *)c
);
458 event_base_set(base
, &c
->event
);
459 c
->ev_flags
= event_flags
;
461 if (event_add(&c
->event
, 0) == -1) {
462 if (conn_add_to_freelist(c
)) {
474 MEMCACHED_CONN_ALLOCATE(c
->sfd
);
479 static void conn_cleanup(conn
*c
) {
483 item_remove(c
->item
);
488 for (; c
->ileft
> 0; c
->ileft
--,c
->icurr
++) {
489 item_remove(*(c
->icurr
));
493 if (c
->suffixleft
!= 0) {
494 for (; c
->suffixleft
> 0; c
->suffixleft
--, c
->suffixcurr
++) {
495 cache_free(c
->thread
->suffix_cache
, *(c
->suffixcurr
));
499 if (c
->write_and_free
) {
500 free(c
->write_and_free
);
501 c
->write_and_free
= 0;
505 assert(settings
.sasl
);
506 sasl_dispose(&c
->sasl_conn
);
510 if (IS_UDP(c
->transport
)) {
511 conn_set_state(c
, conn_read
);
516 * Frees a connection.
518 void conn_free(conn
*c
) {
520 MEMCACHED_CONN_DESTROY(c
);
539 static void conn_close(conn
*c
) {
542 /* delete the event, the socket and the conn */
543 event_del(&c
->event
);
545 if (settings
.verbose
> 1)
546 fprintf(stderr
, "<%d connection closed.\n", c
->sfd
);
548 MEMCACHED_CONN_RELEASE(c
->sfd
);
550 pthread_mutex_lock(&conn_lock
);
551 allow_new_conns
= true;
552 pthread_mutex_unlock(&conn_lock
);
555 /* if the connection has big buffers, just free it */
556 if (c
->rsize
> READ_BUFFER_HIGHWAT
|| conn_add_to_freelist(c
)) {
568 * Shrinks a connection's buffers if they're too big. This prevents
569 * periodic large "get" requests from permanently chewing lots of server
572 * This should only be called in between requests since it can wipe output
575 static void conn_shrink(conn
*c
) {
578 if (IS_UDP(c
->transport
))
581 if (c
->rsize
> READ_BUFFER_HIGHWAT
&& c
->rbytes
< DATA_BUFFER_SIZE
) {
584 if (c
->rcurr
!= c
->rbuf
)
585 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
587 newbuf
= (char *)realloc((void *)c
->rbuf
, DATA_BUFFER_SIZE
);
591 c
->rsize
= DATA_BUFFER_SIZE
;
593 /* TODO check other branch... */
597 if (c
->isize
> ITEM_LIST_HIGHWAT
) {
598 item
**newbuf
= (item
**) realloc((void *)c
->ilist
, ITEM_LIST_INITIAL
* sizeof(c
->ilist
[0]));
601 c
->isize
= ITEM_LIST_INITIAL
;
603 /* TODO check error condition? */
606 if (c
->msgsize
> MSG_LIST_HIGHWAT
) {
607 struct msghdr
*newbuf
= (struct msghdr
*) realloc((void *)c
->msglist
, MSG_LIST_INITIAL
* sizeof(c
->msglist
[0]));
610 c
->msgsize
= MSG_LIST_INITIAL
;
612 /* TODO check error condition? */
615 if (c
->iovsize
> IOV_LIST_HIGHWAT
) {
616 struct iovec
*newbuf
= (struct iovec
*) realloc((void *)c
->iov
, IOV_LIST_INITIAL
* sizeof(c
->iov
[0]));
619 c
->iovsize
= IOV_LIST_INITIAL
;
621 /* TODO check return value */
626 * Convert a state name to a human readable form.
628 static const char *state_text(enum conn_states state
) {
629 const char* const statenames
[] = { "conn_listening",
639 return statenames
[state
];
642 #ifndef __INTEL_COMPILER
643 #pragma GCC diagnostic ignored "-Wtype-limits"
646 * Sets a connection's current state in the state machine. Any special
647 * processing that needs to happen on certain state transitions can
650 static void conn_set_state(conn
*c
, enum conn_states state
) {
652 assert(state
>= conn_listening
&& state
< conn_max_state
);
654 if (state
!= c
->state
) {
655 if (settings
.verbose
> 2) {
656 fprintf(stderr
, "%d: going from %s to %s\n",
657 c
->sfd
, state_text(c
->state
),
661 if (state
== conn_write
|| state
== conn_mwrite
) {
662 MEMCACHED_PROCESS_COMMAND_END(c
->sfd
, c
->wbuf
, c
->wbytes
);
669 * Ensures that there is room for another struct iovec in a connection's
672 * Returns 0 on success, -1 on out-of-memory.
674 static int ensure_iov_space(conn
*c
) {
677 if (c
->iovused
>= c
->iovsize
) {
679 struct iovec
*new_iov
= (struct iovec
*)realloc(c
->iov
,
680 (c
->iovsize
* 2) * sizeof(struct iovec
));
686 /* Point all the msghdr structures at the new list. */
687 for (i
= 0, iovnum
= 0; i
< c
->msgused
; i
++) {
688 c
->msglist
[i
].msg_iov
= &c
->iov
[iovnum
];
689 iovnum
+= c
->msglist
[i
].msg_iovlen
;
698 * Adds data to the list of pending data that will be written out to a
701 * Returns 0 on success, -1 on out-of-memory.
704 static int add_iov(conn
*c
, const void *buf
, int len
) {
712 m
= &c
->msglist
[c
->msgused
- 1];
715 * Limit UDP packets, and the first payloads of TCP replies, to
716 * UDP_MAX_PAYLOAD_SIZE bytes.
718 limit_to_mtu
= IS_UDP(c
->transport
) || (1 == c
->msgused
);
720 /* We may need to start a new msghdr if this one is full. */
721 if (m
->msg_iovlen
== IOV_MAX
||
722 (limit_to_mtu
&& c
->msgbytes
>= UDP_MAX_PAYLOAD_SIZE
)) {
724 m
= &c
->msglist
[c
->msgused
- 1];
727 if (ensure_iov_space(c
) != 0)
730 /* If the fragment is too big to fit in the datagram, split it up */
731 if (limit_to_mtu
&& len
+ c
->msgbytes
> UDP_MAX_PAYLOAD_SIZE
) {
732 leftover
= len
+ c
->msgbytes
- UDP_MAX_PAYLOAD_SIZE
;
738 m
= &c
->msglist
[c
->msgused
- 1];
739 m
->msg_iov
[m
->msg_iovlen
].iov_base
= (void *)buf
;
740 m
->msg_iov
[m
->msg_iovlen
].iov_len
= len
;
746 buf
= ((char *)buf
) + len
;
748 } while (leftover
> 0);
755 * Constructs a set of UDP headers and attaches them to the outgoing messages.
757 static int build_udp_headers(conn
*c
) {
763 if (c
->msgused
> c
->hdrsize
) {
766 new_hdrbuf
= realloc(c
->hdrbuf
, c
->msgused
* 2 * UDP_HEADER_SIZE
);
768 new_hdrbuf
= malloc(c
->msgused
* 2 * UDP_HEADER_SIZE
);
771 c
->hdrbuf
= (unsigned char *)new_hdrbuf
;
772 c
->hdrsize
= c
->msgused
* 2;
776 for (i
= 0; i
< c
->msgused
; i
++) {
777 c
->msglist
[i
].msg_iov
[0].iov_base
= (void*)hdr
;
778 c
->msglist
[i
].msg_iov
[0].iov_len
= UDP_HEADER_SIZE
;
779 *hdr
++ = c
->request_id
/ 256;
780 *hdr
++ = c
->request_id
% 256;
783 *hdr
++ = c
->msgused
/ 256;
784 *hdr
++ = c
->msgused
% 256;
787 assert((void *) hdr
== (caddr_t
)c
->msglist
[i
].msg_iov
[0].iov_base
+ UDP_HEADER_SIZE
);
794 #ifndef __INTEL_COMPILER
795 #pragma GCC diagnostic ignored "-Wsign-compare"
797 static void out_string(conn
*c
, const char *str
) {
803 if (settings
.verbose
> 1)
804 fprintf(stderr
, ">%d NOREPLY %s\n", c
->sfd
, str
);
806 conn_set_state(c
, conn_new_cmd
);
810 if (settings
.verbose
> 1)
811 fprintf(stderr
, ">%d %s\n", c
->sfd
, str
);
813 /* Nuke a partial output... */
820 if ((len
+ 2) > c
->wsize
) {
821 /* ought to be always enough. just fail for simplicity */
822 str
= "SERVER_ERROR output line too long";
826 memcpy(c
->wbuf
, str
, len
);
827 memcpy(c
->wbuf
+ len
, "\r\n", 2);
831 conn_set_state(c
, conn_write
);
832 c
->write_and_go
= conn_new_cmd
;
837 * we get here after reading the value in set/add/replace commands. The command
838 * has been stored in c->cmd, and the item is ready in c->item.
840 static void complete_nread_ascii(conn
*c
) {
845 enum store_item_type ret
;
847 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
848 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].set_cmds
++;
849 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
851 if (strncmp(ITEM_data(it
) + it
->nbytes
- 2, "\r\n", 2) != 0) {
852 out_string(c
, "CLIENT_ERROR bad data chunk");
854 ret
= store_item(it
, comm
, c
);
857 uint64_t cas
= ITEM_get_cas(it
);
860 MEMCACHED_COMMAND_ADD(c
->sfd
, ITEM_key(it
), it
->nkey
,
861 (ret
== 1) ? it
->nbytes
: -1, cas
);
864 MEMCACHED_COMMAND_REPLACE(c
->sfd
, ITEM_key(it
), it
->nkey
,
865 (ret
== 1) ? it
->nbytes
: -1, cas
);
868 MEMCACHED_COMMAND_APPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
869 (ret
== 1) ? it
->nbytes
: -1, cas
);
872 MEMCACHED_COMMAND_PREPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
873 (ret
== 1) ? it
->nbytes
: -1, cas
);
876 MEMCACHED_COMMAND_SET(c
->sfd
, ITEM_key(it
), it
->nkey
,
877 (ret
== 1) ? it
->nbytes
: -1, cas
);
880 MEMCACHED_COMMAND_CAS(c
->sfd
, ITEM_key(it
), it
->nkey
, it
->nbytes
,
888 out_string(c
, "STORED");
891 out_string(c
, "EXISTS");
894 out_string(c
, "NOT_FOUND");
897 out_string(c
, "NOT_STORED");
900 out_string(c
, "SERVER_ERROR Unhandled storage type.");
905 item_remove(c
->item
); /* release the c->item reference */
910 * get a pointer to the start of the request struct for the current command
912 static void* binary_get_request(conn
*c
) {
913 char *ret
= c
->rcurr
;
914 ret
-= (sizeof(c
->binary_header
) + c
->binary_header
.request
.keylen
+
915 c
->binary_header
.request
.extlen
);
917 assert(ret
>= c
->rbuf
);
922 * get a pointer to the key in this request
924 static char* binary_get_key(conn
*c
) {
925 return c
->rcurr
- (c
->binary_header
.request
.keylen
);
928 static void add_bin_header(conn
*c
, uint16_t err
, uint8_t hdr_len
, uint16_t key_len
, uint32_t body_len
) {
929 protocol_binary_response_header
* header
;
936 if (add_msghdr(c
) != 0) {
937 /* XXX: out_string is inappropriate here */
938 out_string(c
, "SERVER_ERROR out of memory");
942 header
= (protocol_binary_response_header
*)c
->wbuf
;
944 header
->response
.magic
= (uint8_t)PROTOCOL_BINARY_RES
;
945 header
->response
.opcode
= c
->binary_header
.request
.opcode
;
946 header
->response
.keylen
= (uint16_t)htons(key_len
);
948 header
->response
.extlen
= (uint8_t)hdr_len
;
949 header
->response
.datatype
= (uint8_t)PROTOCOL_BINARY_RAW_BYTES
;
950 header
->response
.status
= (uint16_t)htons(err
);
952 header
->response
.bodylen
= htonl(body_len
);
953 header
->response
.opaque
= c
->opaque
;
954 header
->response
.cas
= htonll(c
->cas
);
956 if (settings
.verbose
> 1) {
958 fprintf(stderr
, ">%d Writing bin response:", c
->sfd
);
959 for (ii
= 0; ii
< sizeof(header
->bytes
); ++ii
) {
961 fprintf(stderr
, "\n>%d ", c
->sfd
);
963 fprintf(stderr
, " 0x%02x", header
->bytes
[ii
]);
965 fprintf(stderr
, "\n");
968 add_iov(c
, c
->wbuf
, sizeof(header
->response
));
971 static void write_bin_error(conn
*c
, protocol_binary_response_status err
, int swallow
) {
972 const char *errstr
= "Unknown error";
976 case PROTOCOL_BINARY_RESPONSE_ENOMEM
:
977 errstr
= "Out of memory";
979 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
:
980 errstr
= "Unknown command";
982 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
:
983 errstr
= "Not found";
985 case PROTOCOL_BINARY_RESPONSE_EINVAL
:
986 errstr
= "Invalid arguments";
988 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
:
989 errstr
= "Data exists for key.";
991 case PROTOCOL_BINARY_RESPONSE_E2BIG
:
992 errstr
= "Too large.";
994 case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL
:
995 errstr
= "Non-numeric server-side value for incr or decr";
997 case PROTOCOL_BINARY_RESPONSE_NOT_STORED
:
998 errstr
= "Not stored.";
1000 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
:
1001 errstr
= "Auth failure.";
1003 case PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE
:
1005 case PROTOCOL_BINARY_RESPONSE_SUCCESS
:
1009 errstr
= "UNHANDLED ERROR";
1010 fprintf(stderr
, ">%d UNHANDLED ERROR: %d\n", c
->sfd
, err
);
1013 if (settings
.verbose
> 1) {
1014 fprintf(stderr
, ">%d Writing an error: %s\n", c
->sfd
, errstr
);
1017 len
= strlen(errstr
);
1018 add_bin_header(c
, err
, 0, 0, len
);
1020 add_iov(c
, errstr
, len
);
1022 conn_set_state(c
, conn_mwrite
);
1024 c
->sbytes
= swallow
;
1025 c
->write_and_go
= conn_swallow
;
1027 c
->write_and_go
= conn_new_cmd
;
1031 /* Form and send a response to a command over the binary protocol */
1032 static void write_bin_response(conn
*c
, const void *d
, int hlen
, int keylen
, int dlen
) {
1033 if (!c
->noreply
|| c
->cmd
== PROTOCOL_BINARY_CMD_GET
||
1034 c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1035 add_bin_header(c
, 0, hlen
, keylen
, dlen
);
1037 add_iov(c
, d
, dlen
);
1039 conn_set_state(c
, conn_mwrite
);
1040 c
->write_and_go
= conn_new_cmd
;
1042 conn_set_state(c
, conn_new_cmd
);
1046 static void complete_incr_bin(conn
*c
) {
1050 /* Weird magic in add_delta forces me to pad here */
1051 char tmpbuf
[INCR_MAX_STORAGE_LEN
];
1054 protocol_binary_response_incr
* rsp
= (protocol_binary_response_incr
*)c
->wbuf
;
1055 protocol_binary_request_incr
* req
= binary_get_request(c
);
1058 assert(c
->wsize
>= sizeof(*rsp
));
1060 /* fix byteorder in the request */
1061 req
->message
.body
.delta
= ntohll(req
->message
.body
.delta
);
1062 req
->message
.body
.initial
= ntohll(req
->message
.body
.initial
);
1063 req
->message
.body
.expiration
= ntohl(req
->message
.body
.expiration
);
1064 key
= binary_get_key(c
);
1065 nkey
= c
->binary_header
.request
.keylen
;
1067 if (settings
.verbose
> 1) {
1069 fprintf(stderr
, "incr ");
1071 for (i
= 0; i
< nkey
; i
++) {
1072 fprintf(stderr
, "%c", key
[i
]);
1074 fprintf(stderr
, " %lld, %llu, %d\n",
1075 (long long)req
->message
.body
.delta
,
1076 (long long)req
->message
.body
.initial
,
1077 req
->message
.body
.expiration
);
1080 if (c
->binary_header
.request
.cas
!= 0) {
1081 cas
= c
->binary_header
.request
.cas
;
1083 switch(add_delta(c
, key
, nkey
, c
->cmd
== PROTOCOL_BINARY_CMD_INCREMENT
,
1084 req
->message
.body
.delta
, tmpbuf
,
1087 rsp
->message
.body
.value
= htonll(strtoull(tmpbuf
, NULL
, 10));
1091 write_bin_response(c
, &rsp
->message
.body
, 0, 0,
1092 sizeof(rsp
->message
.body
.value
));
1095 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL
, 0);
1098 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1100 case DELTA_ITEM_NOT_FOUND
:
1101 if (req
->message
.body
.expiration
!= 0xffffffff) {
1102 /* Save some room for the response */
1103 rsp
->message
.body
.value
= htonll(req
->message
.body
.initial
);
1104 it
= item_alloc(key
, nkey
, 0, realtime(req
->message
.body
.expiration
),
1105 INCR_MAX_STORAGE_LEN
);
1108 snprintf(ITEM_data(it
), INCR_MAX_STORAGE_LEN
, "%llu",
1109 (unsigned long long)req
->message
.body
.initial
);
1111 if (store_item(it
, NREAD_ADD
, c
)) {
1112 c
->cas
= ITEM_get_cas(it
);
1113 write_bin_response(c
, &rsp
->message
.body
, 0, 0, sizeof(rsp
->message
.body
.value
));
1115 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_NOT_STORED
, 0);
1117 item_remove(it
); /* release our reference */
1119 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1122 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1123 if (c
->cmd
== PROTOCOL_BINARY_CMD_INCREMENT
) {
1124 c
->thread
->stats
.incr_misses
++;
1126 c
->thread
->stats
.decr_misses
++;
1128 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1130 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1133 case DELTA_ITEM_CAS_MISMATCH
:
1134 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
, 0);
1143 static void complete_update_bin(conn
*c
) {
1144 protocol_binary_response_status eno
= PROTOCOL_BINARY_RESPONSE_EINVAL
;
1145 enum store_item_type ret
= NOT_STORED
;
1150 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1151 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].set_cmds
++;
1152 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1154 /* We don't actually receive the trailing two characters in the bin
1155 * protocol, so we're going to just set them here */
1156 *(ITEM_data(it
) + it
->nbytes
- 2) = '\r';
1157 *(ITEM_data(it
) + it
->nbytes
- 1) = '\n';
1159 ret
= store_item(it
, c
->cmd
, c
);
1161 #ifdef ENABLE_DTRACE
1162 uint64_t cas
= ITEM_get_cas(it
);
1165 MEMCACHED_COMMAND_ADD(c
->sfd
, ITEM_key(it
), it
->nkey
,
1166 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1169 MEMCACHED_COMMAND_REPLACE(c
->sfd
, ITEM_key(it
), it
->nkey
,
1170 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1173 MEMCACHED_COMMAND_APPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
1174 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1177 MEMCACHED_COMMAND_PREPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
1178 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1181 MEMCACHED_COMMAND_SET(c
->sfd
, ITEM_key(it
), it
->nkey
,
1182 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1190 write_bin_response(c
, NULL
, 0, 0, 0);
1193 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
, 0);
1196 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1199 if (c
->cmd
== NREAD_ADD
) {
1200 eno
= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
;
1201 } else if(c
->cmd
== NREAD_REPLACE
) {
1202 eno
= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
;
1204 eno
= PROTOCOL_BINARY_RESPONSE_NOT_STORED
;
1206 write_bin_error(c
, eno
, 0);
1212 item_remove(c
->item
); /* release the c->item reference */
1216 static void process_bin_touch(conn
*c
) {
1219 protocol_binary_response_get
* rsp
= (protocol_binary_response_get
*)c
->wbuf
;
1220 char* key
= binary_get_key(c
);
1221 size_t nkey
= c
->binary_header
.request
.keylen
;
1222 protocol_binary_request_touch
*t
= (void *)&c
->binary_header
;
1223 uint32_t exptime
= ntohl(t
->message
.body
.expiration
);
1225 if (settings
.verbose
> 1) {
1227 /* May be GAT/GATQ/etc */
1228 fprintf(stderr
, "<%d TOUCH ", c
->sfd
);
1229 for (ii
= 0; ii
< nkey
; ++ii
) {
1230 fprintf(stderr
, "%c", key
[ii
]);
1232 fprintf(stderr
, "\n");
1235 it
= item_touch(key
, nkey
, realtime(exptime
));
1238 /* the length has two unnecessary bytes ("\r\n") */
1239 uint16_t keylen
= 0;
1240 uint32_t bodylen
= sizeof(rsp
->message
.body
) + (it
->nbytes
- 2);
1243 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1244 c
->thread
->stats
.touch_cmds
++;
1245 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].touch_hits
++;
1246 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1248 MEMCACHED_COMMAND_TOUCH(c
->sfd
, ITEM_key(it
), it
->nkey
,
1249 it
->nbytes
, ITEM_get_cas(it
));
1251 if (c
->cmd
== PROTOCOL_BINARY_CMD_TOUCH
) {
1252 bodylen
-= it
->nbytes
- 2;
1253 } else if (c
->cmd
== PROTOCOL_BINARY_CMD_GATK
) {
1258 add_bin_header(c
, 0, sizeof(rsp
->message
.body
), keylen
, bodylen
);
1259 rsp
->message
.header
.response
.cas
= htonll(ITEM_get_cas(it
));
1262 rsp
->message
.body
.flags
= htonl(strtoul(ITEM_suffix(it
), NULL
, 10));
1263 add_iov(c
, &rsp
->message
.body
, sizeof(rsp
->message
.body
));
1265 if (c
->cmd
== PROTOCOL_BINARY_CMD_GATK
) {
1266 add_iov(c
, ITEM_key(it
), nkey
);
1269 /* Add the data minus the CRLF */
1270 if (c
->cmd
!= PROTOCOL_BINARY_CMD_TOUCH
) {
1271 add_iov(c
, ITEM_data(it
), it
->nbytes
- 2);
1274 conn_set_state(c
, conn_mwrite
);
1275 c
->write_and_go
= conn_new_cmd
;
1276 /* Remember this command so we can garbage collect it later */
1279 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1280 c
->thread
->stats
.touch_cmds
++;
1281 c
->thread
->stats
.touch_misses
++;
1282 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1284 MEMCACHED_COMMAND_TOUCH(c
->sfd
, key
, nkey
, -1, 0);
1287 conn_set_state(c
, conn_new_cmd
);
1289 if (c
->cmd
== PROTOCOL_BINARY_CMD_GATK
) {
1290 char *ofs
= c
->wbuf
+ sizeof(protocol_binary_response_header
);
1291 add_bin_header(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
,
1293 memcpy(ofs
, key
, nkey
);
1294 add_iov(c
, ofs
, nkey
);
1295 conn_set_state(c
, conn_mwrite
);
1296 c
->write_and_go
= conn_new_cmd
;
1298 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1303 if (settings
.detail_enabled
) {
1304 stats_prefix_record_get(key
, nkey
, NULL
!= it
);
1308 static void process_bin_get(conn
*c
) {
1311 protocol_binary_response_get
* rsp
= (protocol_binary_response_get
*)c
->wbuf
;
1312 char* key
= binary_get_key(c
);
1313 size_t nkey
= c
->binary_header
.request
.keylen
;
1315 if (settings
.verbose
> 1) {
1317 fprintf(stderr
, "<%d GET ", c
->sfd
);
1318 for (ii
= 0; ii
< nkey
; ++ii
) {
1319 fprintf(stderr
, "%c", key
[ii
]);
1321 fprintf(stderr
, "\n");
1324 it
= item_get(key
, nkey
);
1326 /* the length has two unnecessary bytes ("\r\n") */
1327 uint16_t keylen
= 0;
1328 uint32_t bodylen
= sizeof(rsp
->message
.body
) + (it
->nbytes
- 2);
1331 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1332 c
->thread
->stats
.get_cmds
++;
1333 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].get_hits
++;
1334 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1336 MEMCACHED_COMMAND_GET(c
->sfd
, ITEM_key(it
), it
->nkey
,
1337 it
->nbytes
, ITEM_get_cas(it
));
1339 if (c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1343 add_bin_header(c
, 0, sizeof(rsp
->message
.body
), keylen
, bodylen
);
1344 rsp
->message
.header
.response
.cas
= htonll(ITEM_get_cas(it
));
1347 rsp
->message
.body
.flags
= htonl(strtoul(ITEM_suffix(it
), NULL
, 10));
1348 add_iov(c
, &rsp
->message
.body
, sizeof(rsp
->message
.body
));
1350 if (c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1351 add_iov(c
, ITEM_key(it
), nkey
);
1354 /* Add the data minus the CRLF */
1355 add_iov(c
, ITEM_data(it
), it
->nbytes
- 2);
1356 conn_set_state(c
, conn_mwrite
);
1357 c
->write_and_go
= conn_new_cmd
;
1358 /* Remember this command so we can garbage collect it later */
1361 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1362 c
->thread
->stats
.get_cmds
++;
1363 c
->thread
->stats
.get_misses
++;
1364 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1366 MEMCACHED_COMMAND_GET(c
->sfd
, key
, nkey
, -1, 0);
1369 conn_set_state(c
, conn_new_cmd
);
1371 if (c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1372 char *ofs
= c
->wbuf
+ sizeof(protocol_binary_response_header
);
1373 add_bin_header(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
,
1375 memcpy(ofs
, key
, nkey
);
1376 add_iov(c
, ofs
, nkey
);
1377 conn_set_state(c
, conn_mwrite
);
1378 c
->write_and_go
= conn_new_cmd
;
1380 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1385 if (settings
.detail_enabled
) {
1386 stats_prefix_record_get(key
, nkey
, NULL
!= it
);
1390 #ifndef __INTEL_COMPILER
1391 #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
1393 static void append_bin_stats(const char *key
, const uint16_t klen
,
1394 const char *val
, const uint32_t vlen
,
1396 char *buf
= c
->stats
.buffer
+ c
->stats
.offset
;
1397 uint32_t bodylen
= klen
+ vlen
;
1398 protocol_binary_response_header header
= {
1399 .response
.magic
= (uint8_t)PROTOCOL_BINARY_RES
,
1400 .response
.opcode
= (uint8_t)PROTOCOL_BINARY_CMD_STAT
,
1401 .response
.keylen
= (uint16_t)htons(klen
),
1402 .response
.extlen
= (uint8_t)0,
1403 .response
.datatype
= (uint8_t)PROTOCOL_BINARY_RAW_BYTES
,
1404 .response
.status
= (uint16_t)0,
1405 .response
.bodylen
= htonl(bodylen
),
1406 .response
.opaque
= c
->opaque
,
1407 .response
.cas
= (uint64_t)0
1410 memcpy(buf
, header
.bytes
, sizeof(header
.response
));
1411 buf
+= sizeof(header
.response
);
1414 memcpy(buf
, key
, klen
);
1418 memcpy(buf
, val
, vlen
);
1422 c
->stats
.offset
+= sizeof(header
.response
) + bodylen
;
1425 static void append_ascii_stats(const char *key
, const uint16_t klen
,
1426 const char *val
, const uint32_t vlen
,
1428 char *pos
= c
->stats
.buffer
+ c
->stats
.offset
;
1429 uint32_t nbytes
= 0;
1430 int remaining
= c
->stats
.size
- c
->stats
.offset
;
1431 int room
= remaining
- 1;
1433 if (klen
== 0 && vlen
== 0) {
1434 nbytes
= snprintf(pos
, room
, "END\r\n");
1435 } else if (vlen
== 0) {
1436 nbytes
= snprintf(pos
, room
, "STAT %s\r\n", key
);
1438 nbytes
= snprintf(pos
, room
, "STAT %s %s\r\n", key
, val
);
1441 c
->stats
.offset
+= nbytes
;
1444 static bool grow_stats_buf(conn
*c
, size_t needed
) {
1445 size_t nsize
= c
->stats
.size
;
1446 size_t available
= nsize
- c
->stats
.offset
;
1449 /* Special case: No buffer -- need to allocate fresh */
1450 if (c
->stats
.buffer
== NULL
) {
1452 available
= c
->stats
.size
= c
->stats
.offset
= 0;
1455 while (needed
> available
) {
1458 available
= nsize
- c
->stats
.offset
;
1461 if (nsize
!= c
->stats
.size
) {
1462 char *ptr
= realloc(c
->stats
.buffer
, nsize
);
1464 c
->stats
.buffer
= ptr
;
1465 c
->stats
.size
= nsize
;
1474 static void append_stats(const char *key
, const uint16_t klen
,
1475 const char *val
, const uint32_t vlen
,
1478 /* value without a key is invalid */
1479 if (klen
== 0 && vlen
> 0) {
1483 conn
*c
= (conn
*)cookie
;
1485 if (c
->protocol
== binary_prot
) {
1486 size_t needed
= vlen
+ klen
+ sizeof(protocol_binary_response_header
);
1487 if (!grow_stats_buf(c
, needed
)) {
1490 append_bin_stats(key
, klen
, val
, vlen
, c
);
1492 size_t needed
= vlen
+ klen
+ 10; // 10 == "STAT = \r\n"
1493 if (!grow_stats_buf(c
, needed
)) {
1496 append_ascii_stats(key
, klen
, val
, vlen
, c
);
1499 assert(c
->stats
.offset
<= c
->stats
.size
);
1502 static void process_bin_stat(conn
*c
) {
1503 char *subcommand
= binary_get_key(c
);
1504 size_t nkey
= c
->binary_header
.request
.keylen
;
1506 if (settings
.verbose
> 1) {
1508 fprintf(stderr
, "<%d STATS ", c
->sfd
);
1509 for (ii
= 0; ii
< nkey
; ++ii
) {
1510 fprintf(stderr
, "%c", subcommand
[ii
]);
1512 fprintf(stderr
, "\n");
1516 /* request all statistics */
1517 server_stats(&append_stats
, c
);
1518 (void)get_stats(NULL
, 0, &append_stats
, c
);
1519 } else if (strncmp(subcommand
, "reset", 5) == 0) {
1521 } else if (strncmp(subcommand
, "settings", 8) == 0) {
1522 process_stat_settings(&append_stats
, c
);
1523 } else if (strncmp(subcommand
, "detail", 6) == 0) {
1524 char *subcmd_pos
= subcommand
+ 6;
1525 if (strncmp(subcmd_pos
, " dump", 5) == 0) {
1527 char *dump_buf
= stats_prefix_dump(&len
);
1528 if (dump_buf
== NULL
|| len
<= 0) {
1529 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1532 append_stats("detailed", strlen("detailed"), dump_buf
, len
, c
);
1535 } else if (strncmp(subcmd_pos
, " on", 3) == 0) {
1536 settings
.detail_enabled
= 1;
1537 } else if (strncmp(subcmd_pos
, " off", 4) == 0) {
1538 settings
.detail_enabled
= 0;
1540 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1544 if (get_stats(subcommand
, nkey
, &append_stats
, c
)) {
1545 if (c
->stats
.buffer
== NULL
) {
1546 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1548 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
1549 c
->stats
.buffer
= NULL
;
1552 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1558 /* Append termination package and start the transfer */
1559 append_stats(NULL
, 0, NULL
, 0, c
);
1560 if (c
->stats
.buffer
== NULL
) {
1561 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1563 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
1564 c
->stats
.buffer
= NULL
;
1568 static void bin_read_key(conn
*c
, enum bin_substates next_substate
, int extra
) {
1570 c
->substate
= next_substate
;
1571 c
->rlbytes
= c
->keylen
+ extra
;
1573 /* Ok... do we have room for the extras and the key in the input buffer? */
1574 ptrdiff_t offset
= c
->rcurr
+ sizeof(protocol_binary_request_header
) - c
->rbuf
;
1575 if (c
->rlbytes
> c
->rsize
- offset
) {
1576 size_t nsize
= c
->rsize
;
1577 size_t size
= c
->rlbytes
+ sizeof(protocol_binary_request_header
);
1579 while (size
> nsize
) {
1583 if (nsize
!= c
->rsize
) {
1584 if (settings
.verbose
> 1) {
1585 fprintf(stderr
, "%d: Need to grow buffer from %lu to %lu\n",
1586 c
->sfd
, (unsigned long)c
->rsize
, (unsigned long)nsize
);
1588 char *newm
= realloc(c
->rbuf
, nsize
);
1590 if (settings
.verbose
) {
1591 fprintf(stderr
, "%d: Failed to grow buffer.. closing connection\n",
1594 conn_set_state(c
, conn_closing
);
1599 /* rcurr should point to the same offset in the packet */
1600 c
->rcurr
= c
->rbuf
+ offset
- sizeof(protocol_binary_request_header
);
1603 if (c
->rbuf
!= c
->rcurr
) {
1604 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
1606 if (settings
.verbose
> 1) {
1607 fprintf(stderr
, "%d: Repack input buffer\n", c
->sfd
);
1612 /* preserve the header in the buffer.. */
1613 c
->ritem
= c
->rcurr
+ sizeof(protocol_binary_request_header
);
1614 conn_set_state(c
, conn_nread
);
1617 /* Just write an error message and disconnect the client */
1618 static void handle_binary_protocol_error(conn
*c
) {
1619 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_EINVAL
, 0);
1620 if (settings
.verbose
) {
1621 fprintf(stderr
, "Protocol error (opcode %02x), close connection %d\n",
1622 c
->binary_header
.request
.opcode
, c
->sfd
);
1624 c
->write_and_go
= conn_closing
;
1627 static void init_sasl_conn(conn
*c
) {
1629 /* should something else be returned? */
1633 if (!c
->sasl_conn
) {
1634 int result
=sasl_server_new("memcached",
1636 my_sasl_hostname
[0] ? my_sasl_hostname
: NULL
,
1638 NULL
, 0, &c
->sasl_conn
);
1639 if (result
!= SASL_OK
) {
1640 if (settings
.verbose
) {
1641 fprintf(stderr
, "Failed to initialize SASL conn.\n");
1643 c
->sasl_conn
= NULL
;
1648 static void bin_list_sasl_mechs(conn
*c
) {
1649 // Guard against a disabled SASL.
1650 if (!settings
.sasl
) {
1651 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
,
1652 c
->binary_header
.request
.bodylen
1653 - c
->binary_header
.request
.keylen
);
1658 const char *result_string
= NULL
;
1659 unsigned int string_length
= 0;
1660 int result
=sasl_listmech(c
->sasl_conn
, NULL
,
1661 "", /* What to prepend the string with */
1662 " ", /* What to separate mechanisms with */
1663 "", /* What to append to the string */
1664 &result_string
, &string_length
,
1666 if (result
!= SASL_OK
) {
1667 /* Perhaps there's a better error for this... */
1668 if (settings
.verbose
) {
1669 fprintf(stderr
, "Failed to list SASL mechanisms.\n");
1671 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
, 0);
1674 write_bin_response(c
, (char*)result_string
, 0, 0, string_length
);
1677 static void process_bin_sasl_auth(conn
*c
) {
1678 // Guard for handling disabled SASL on the server.
1679 if (!settings
.sasl
) {
1680 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
,
1681 c
->binary_header
.request
.bodylen
1682 - c
->binary_header
.request
.keylen
);
1686 assert(c
->binary_header
.request
.extlen
== 0);
1688 int nkey
= c
->binary_header
.request
.keylen
;
1689 int vlen
= c
->binary_header
.request
.bodylen
- nkey
;
1691 if (nkey
> MAX_SASL_MECH_LEN
) {
1692 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_EINVAL
, vlen
);
1693 c
->write_and_go
= conn_swallow
;
1697 char *key
= binary_get_key(c
);
1700 item
*it
= item_alloc(key
, nkey
, 0, 0, vlen
);
1703 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, vlen
);
1704 c
->write_and_go
= conn_swallow
;
1709 c
->ritem
= ITEM_data(it
);
1711 conn_set_state(c
, conn_nread
);
1712 c
->substate
= bin_reading_sasl_auth_data
;
1715 static void process_bin_complete_sasl_auth(conn
*c
) {
1716 assert(settings
.sasl
);
1717 const char *out
= NULL
;
1718 unsigned int outlen
= 0;
1723 int nkey
= c
->binary_header
.request
.keylen
;
1724 int vlen
= c
->binary_header
.request
.bodylen
- nkey
;
1727 memcpy(mech
, ITEM_key((item
*)c
->item
), nkey
);
1730 if (settings
.verbose
)
1731 fprintf(stderr
, "mech: ``%s'' with %d bytes of data\n", mech
, vlen
);
1733 const char *challenge
= vlen
== 0 ? NULL
: ITEM_data((item
*) c
->item
);
1738 case PROTOCOL_BINARY_CMD_SASL_AUTH
:
1739 result
= sasl_server_start(c
->sasl_conn
, mech
,
1743 case PROTOCOL_BINARY_CMD_SASL_STEP
:
1744 result
= sasl_server_step(c
->sasl_conn
,
1749 assert(false); /* CMD should be one of the above */
1750 /* This code is pretty much impossible, but makes the compiler
1752 if (settings
.verbose
) {
1753 fprintf(stderr
, "Unhandled command %d with challenge %s\n",
1759 item_unlink(c
->item
);
1761 if (settings
.verbose
) {
1762 fprintf(stderr
, "sasl result code: %d\n", result
);
1767 write_bin_response(c
, "Authenticated", 0, 0, strlen("Authenticated"));
1768 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1769 c
->thread
->stats
.auth_cmds
++;
1770 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1773 add_bin_header(c
, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE
, 0, 0, outlen
);
1775 add_iov(c
, out
, outlen
);
1777 conn_set_state(c
, conn_mwrite
);
1778 c
->write_and_go
= conn_new_cmd
;
1781 if (settings
.verbose
)
1782 fprintf(stderr
, "Unknown sasl response: %d\n", result
);
1783 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
, 0);
1784 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1785 c
->thread
->stats
.auth_cmds
++;
1786 c
->thread
->stats
.auth_errors
++;
1787 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1791 static bool authenticated(conn
*c
) {
1792 assert(settings
.sasl
);
1796 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS
: /* FALLTHROUGH */
1797 case PROTOCOL_BINARY_CMD_SASL_AUTH
: /* FALLTHROUGH */
1798 case PROTOCOL_BINARY_CMD_SASL_STEP
: /* FALLTHROUGH */
1799 case PROTOCOL_BINARY_CMD_VERSION
: /* FALLTHROUGH */
1804 const void *uname
= NULL
;
1805 sasl_getprop(c
->sasl_conn
, SASL_USERNAME
, &uname
);
1810 if (settings
.verbose
> 1) {
1811 fprintf(stderr
, "authenticated() in cmd 0x%02x is %s\n",
1812 c
->cmd
, rv
? "true" : "false");
1818 static void dispatch_bin_command(conn
*c
) {
1819 int protocol_error
= 0;
1821 int extlen
= c
->binary_header
.request
.extlen
;
1822 int keylen
= c
->binary_header
.request
.keylen
;
1823 uint32_t bodylen
= c
->binary_header
.request
.bodylen
;
1825 if (settings
.sasl
&& !authenticated(c
)) {
1826 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
, 0);
1827 c
->write_and_go
= conn_closing
;
1831 MEMCACHED_PROCESS_COMMAND_START(c
->sfd
, c
->rcurr
, c
->rbytes
);
1834 /* binprot supports 16bit keys, but internals are still 8bit */
1835 if (keylen
> KEY_MAX_LENGTH
) {
1836 handle_binary_protocol_error(c
);
1841 case PROTOCOL_BINARY_CMD_SETQ
:
1842 c
->cmd
= PROTOCOL_BINARY_CMD_SET
;
1844 case PROTOCOL_BINARY_CMD_ADDQ
:
1845 c
->cmd
= PROTOCOL_BINARY_CMD_ADD
;
1847 case PROTOCOL_BINARY_CMD_REPLACEQ
:
1848 c
->cmd
= PROTOCOL_BINARY_CMD_REPLACE
;
1850 case PROTOCOL_BINARY_CMD_DELETEQ
:
1851 c
->cmd
= PROTOCOL_BINARY_CMD_DELETE
;
1853 case PROTOCOL_BINARY_CMD_INCREMENTQ
:
1854 c
->cmd
= PROTOCOL_BINARY_CMD_INCREMENT
;
1856 case PROTOCOL_BINARY_CMD_DECREMENTQ
:
1857 c
->cmd
= PROTOCOL_BINARY_CMD_DECREMENT
;
1859 case PROTOCOL_BINARY_CMD_QUITQ
:
1860 c
->cmd
= PROTOCOL_BINARY_CMD_QUIT
;
1862 case PROTOCOL_BINARY_CMD_FLUSHQ
:
1863 c
->cmd
= PROTOCOL_BINARY_CMD_FLUSH
;
1865 case PROTOCOL_BINARY_CMD_APPENDQ
:
1866 c
->cmd
= PROTOCOL_BINARY_CMD_APPEND
;
1868 case PROTOCOL_BINARY_CMD_PREPENDQ
:
1869 c
->cmd
= PROTOCOL_BINARY_CMD_PREPEND
;
1871 case PROTOCOL_BINARY_CMD_GETQ
:
1872 c
->cmd
= PROTOCOL_BINARY_CMD_GET
;
1874 case PROTOCOL_BINARY_CMD_GETKQ
:
1875 c
->cmd
= PROTOCOL_BINARY_CMD_GETK
;
1877 case PROTOCOL_BINARY_CMD_GATQ
:
1878 c
->cmd
= PROTOCOL_BINARY_CMD_GAT
;
1880 case PROTOCOL_BINARY_CMD_GATKQ
:
1881 c
->cmd
= PROTOCOL_BINARY_CMD_GAT
;
1888 case PROTOCOL_BINARY_CMD_VERSION
:
1889 if (extlen
== 0 && keylen
== 0 && bodylen
== 0) {
1890 write_bin_response(c
, RVERSION
, 0, 0, strlen(RVERSION
));
1895 case PROTOCOL_BINARY_CMD_FLUSH
:
1896 if (keylen
== 0 && bodylen
== extlen
&& (extlen
== 0 || extlen
== 4)) {
1897 bin_read_key(c
, bin_read_flush_exptime
, extlen
);
1902 case PROTOCOL_BINARY_CMD_NOOP
:
1903 if (extlen
== 0 && keylen
== 0 && bodylen
== 0) {
1904 write_bin_response(c
, NULL
, 0, 0, 0);
1909 case PROTOCOL_BINARY_CMD_SET
: /* FALLTHROUGH */
1910 case PROTOCOL_BINARY_CMD_ADD
: /* FALLTHROUGH */
1911 case PROTOCOL_BINARY_CMD_REPLACE
:
1912 if (extlen
== 8 && keylen
!= 0 && bodylen
>= (keylen
+ 8)) {
1913 bin_read_key(c
, bin_reading_set_header
, 8);
1918 case PROTOCOL_BINARY_CMD_GETQ
: /* FALLTHROUGH */
1919 case PROTOCOL_BINARY_CMD_GET
: /* FALLTHROUGH */
1920 case PROTOCOL_BINARY_CMD_GETKQ
: /* FALLTHROUGH */
1921 case PROTOCOL_BINARY_CMD_GETK
:
1922 if (extlen
== 0 && bodylen
== keylen
&& keylen
> 0) {
1923 bin_read_key(c
, bin_reading_get_key
, 0);
1928 case PROTOCOL_BINARY_CMD_DELETE
:
1929 if (keylen
> 0 && extlen
== 0 && bodylen
== keylen
) {
1930 bin_read_key(c
, bin_reading_del_header
, extlen
);
1935 case PROTOCOL_BINARY_CMD_INCREMENT
:
1936 case PROTOCOL_BINARY_CMD_DECREMENT
:
1937 if (keylen
> 0 && extlen
== 20 && bodylen
== (keylen
+ extlen
)) {
1938 bin_read_key(c
, bin_reading_incr_header
, 20);
1943 case PROTOCOL_BINARY_CMD_APPEND
:
1944 case PROTOCOL_BINARY_CMD_PREPEND
:
1945 if (keylen
> 0 && extlen
== 0) {
1946 bin_read_key(c
, bin_reading_set_header
, 0);
1951 case PROTOCOL_BINARY_CMD_STAT
:
1953 bin_read_key(c
, bin_reading_stat
, 0);
1958 case PROTOCOL_BINARY_CMD_QUIT
:
1959 if (keylen
== 0 && extlen
== 0 && bodylen
== 0) {
1960 write_bin_response(c
, NULL
, 0, 0, 0);
1961 c
->write_and_go
= conn_closing
;
1963 conn_set_state(c
, conn_closing
);
1969 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS
:
1970 if (extlen
== 0 && keylen
== 0 && bodylen
== 0) {
1971 bin_list_sasl_mechs(c
);
1976 case PROTOCOL_BINARY_CMD_SASL_AUTH
:
1977 case PROTOCOL_BINARY_CMD_SASL_STEP
:
1978 if (extlen
== 0 && keylen
!= 0) {
1979 bin_read_key(c
, bin_reading_sasl_auth
, 0);
1984 case PROTOCOL_BINARY_CMD_TOUCH
:
1985 case PROTOCOL_BINARY_CMD_GAT
:
1986 case PROTOCOL_BINARY_CMD_GATQ
:
1987 case PROTOCOL_BINARY_CMD_GATK
:
1988 case PROTOCOL_BINARY_CMD_GATKQ
:
1989 if (extlen
== 4 && keylen
!= 0) {
1990 bin_read_key(c
, bin_reading_touch_key
, 4);
1996 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
, bodylen
);
2000 handle_binary_protocol_error(c
);
2003 static void process_bin_update(conn
*c
) {
2008 protocol_binary_request_set
* req
= binary_get_request(c
);
2012 key
= binary_get_key(c
);
2013 nkey
= c
->binary_header
.request
.keylen
;
2015 /* fix byteorder in the request */
2016 req
->message
.body
.flags
= ntohl(req
->message
.body
.flags
);
2017 req
->message
.body
.expiration
= ntohl(req
->message
.body
.expiration
);
2019 vlen
= c
->binary_header
.request
.bodylen
- (nkey
+ c
->binary_header
.request
.extlen
);
2021 if (settings
.verbose
> 1) {
2023 if (c
->cmd
== PROTOCOL_BINARY_CMD_ADD
) {
2024 fprintf(stderr
, "<%d ADD ", c
->sfd
);
2025 } else if (c
->cmd
== PROTOCOL_BINARY_CMD_SET
) {
2026 fprintf(stderr
, "<%d SET ", c
->sfd
);
2028 fprintf(stderr
, "<%d REPLACE ", c
->sfd
);
2030 for (ii
= 0; ii
< nkey
; ++ii
) {
2031 fprintf(stderr
, "%c", key
[ii
]);
2034 fprintf(stderr
, " Value len is %d", vlen
);
2035 fprintf(stderr
, "\n");
2038 if (settings
.detail_enabled
) {
2039 stats_prefix_record_set(key
, nkey
);
2042 it
= item_alloc(key
, nkey
, req
->message
.body
.flags
,
2043 realtime(req
->message
.body
.expiration
), vlen
+2);
2046 if (! item_size_ok(nkey
, req
->message
.body
.flags
, vlen
+ 2)) {
2047 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_E2BIG
, vlen
);
2049 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, vlen
);
2052 /* Avoid stale data persisting in cache because we failed alloc.
2053 * Unacceptable for SET. Anywhere else too? */
2054 if (c
->cmd
== PROTOCOL_BINARY_CMD_SET
) {
2055 it
= item_get(key
, nkey
);
2062 /* swallow the data line */
2063 c
->write_and_go
= conn_swallow
;
2067 ITEM_set_cas(it
, c
->binary_header
.request
.cas
);
2070 case PROTOCOL_BINARY_CMD_ADD
:
2073 case PROTOCOL_BINARY_CMD_SET
:
2076 case PROTOCOL_BINARY_CMD_REPLACE
:
2077 c
->cmd
= NREAD_REPLACE
;
2083 if (ITEM_get_cas(it
) != 0) {
2088 c
->ritem
= ITEM_data(it
);
2090 conn_set_state(c
, conn_nread
);
2091 c
->substate
= bin_read_set_value
;
2094 static void process_bin_append_prepend(conn
*c
) {
2102 key
= binary_get_key(c
);
2103 nkey
= c
->binary_header
.request
.keylen
;
2104 vlen
= c
->binary_header
.request
.bodylen
- nkey
;
2106 if (settings
.verbose
> 1) {
2107 fprintf(stderr
, "Value len is %d\n", vlen
);
2110 if (settings
.detail_enabled
) {
2111 stats_prefix_record_set(key
, nkey
);
2114 it
= item_alloc(key
, nkey
, 0, 0, vlen
+2);
2117 if (! item_size_ok(nkey
, 0, vlen
+ 2)) {
2118 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_E2BIG
, vlen
);
2120 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, vlen
);
2122 /* swallow the data line */
2123 c
->write_and_go
= conn_swallow
;
2127 ITEM_set_cas(it
, c
->binary_header
.request
.cas
);
2130 case PROTOCOL_BINARY_CMD_APPEND
:
2131 c
->cmd
= NREAD_APPEND
;
2133 case PROTOCOL_BINARY_CMD_PREPEND
:
2134 c
->cmd
= NREAD_PREPEND
;
2141 c
->ritem
= ITEM_data(it
);
2143 conn_set_state(c
, conn_nread
);
2144 c
->substate
= bin_read_set_value
;
2147 static void process_bin_flush(conn
*c
) {
2149 protocol_binary_request_flush
* req
= binary_get_request(c
);
2151 if (c
->binary_header
.request
.extlen
== sizeof(req
->message
.body
)) {
2152 exptime
= ntohl(req
->message
.body
.expiration
);
2156 settings
.oldest_live
= realtime(exptime
) - 1;
2158 settings
.oldest_live
= current_time
- 1;
2160 item_flush_expired();
2162 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2163 c
->thread
->stats
.flush_cmds
++;
2164 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2166 write_bin_response(c
, NULL
, 0, 0, 0);
2169 static void process_bin_delete(conn
*c
) {
2172 protocol_binary_request_delete
* req
= binary_get_request(c
);
2174 char* key
= binary_get_key(c
);
2175 size_t nkey
= c
->binary_header
.request
.keylen
;
2179 if (settings
.verbose
> 1) {
2180 fprintf(stderr
, "Deleting %s\n", key
);
2183 if (settings
.detail_enabled
) {
2184 stats_prefix_record_delete(key
, nkey
);
2187 it
= item_get(key
, nkey
);
2189 uint64_t cas
= ntohll(req
->message
.header
.request
.cas
);
2190 if (cas
== 0 || cas
== ITEM_get_cas(it
)) {
2191 MEMCACHED_COMMAND_DELETE(c
->sfd
, ITEM_key(it
), it
->nkey
);
2192 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2193 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].delete_hits
++;
2194 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2196 write_bin_response(c
, NULL
, 0, 0, 0);
2198 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
, 0);
2200 item_remove(it
); /* release our reference */
2202 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
2203 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2204 c
->thread
->stats
.delete_misses
++;
2205 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2209 static void complete_nread_binary(conn
*c
) {
2211 assert(c
->cmd
>= 0);
2213 switch(c
->substate
) {
2214 case bin_reading_set_header
:
2215 if (c
->cmd
== PROTOCOL_BINARY_CMD_APPEND
||
2216 c
->cmd
== PROTOCOL_BINARY_CMD_PREPEND
) {
2217 process_bin_append_prepend(c
);
2219 process_bin_update(c
);
2222 case bin_read_set_value
:
2223 complete_update_bin(c
);
2225 case bin_reading_get_key
:
2228 case bin_reading_touch_key
:
2229 process_bin_touch(c
);
2231 case bin_reading_stat
:
2232 process_bin_stat(c
);
2234 case bin_reading_del_header
:
2235 process_bin_delete(c
);
2237 case bin_reading_incr_header
:
2238 complete_incr_bin(c
);
2240 case bin_read_flush_exptime
:
2241 process_bin_flush(c
);
2243 case bin_reading_sasl_auth
:
2244 process_bin_sasl_auth(c
);
2246 case bin_reading_sasl_auth_data
:
2247 process_bin_complete_sasl_auth(c
);
2249 case bin_reading_cas_header
:
2254 fprintf(stderr
, "Not handling substate %d\n", c
->substate
);
2259 static void reset_cmd_handler(conn
*c
) {
2261 c
->substate
= bin_no_state
;
2262 if(c
->item
!= NULL
) {
2263 item_remove(c
->item
);
2267 if (c
->rbytes
> 0) {
2268 conn_set_state(c
, conn_parse_cmd
);
2270 conn_set_state(c
, conn_waiting
);
2274 static void complete_nread(conn
*c
) {
2276 assert(c
->protocol
== ascii_prot
2277 || c
->protocol
== binary_prot
);
2279 if (c
->protocol
== ascii_prot
) {
2280 complete_nread_ascii(c
);
2281 } else if (c
->protocol
== binary_prot
) {
2282 complete_nread_binary(c
);
2287 * Stores an item in the cache according to the semantics of one of the set
2288 * commands. In threaded mode, this is protected by the cache lock.
2290 * Returns the state of storage.
2292 enum store_item_type
do_store_item(item
*it
, int comm
, conn
*c
, const uint32_t hv
) {
2293 char *key
= ITEM_key(it
);
2294 item
*old_it
= do_item_get(key
, it
->nkey
, hv
);
2295 enum store_item_type stored
= NOT_STORED
;
2297 item
*new_it
= NULL
;
2300 if (old_it
!= NULL
&& comm
== NREAD_ADD
) {
2301 /* add only adds a nonexistent item, but promote to head of LRU */
2302 do_item_update(old_it
);
2303 } else if (!old_it
&& (comm
== NREAD_REPLACE
2304 || comm
== NREAD_APPEND
|| comm
== NREAD_PREPEND
))
2306 /* replace only replaces an existing value; don't store */
2307 } else if (comm
== NREAD_CAS
) {
2308 /* validate cas operation */
2309 if(old_it
== NULL
) {
2312 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2313 c
->thread
->stats
.cas_misses
++;
2314 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2316 else if (ITEM_get_cas(it
) == ITEM_get_cas(old_it
)) {
2318 // it and old_it may belong to different classes.
2319 // I'm updating the stats for the one that's getting pushed out
2320 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2321 c
->thread
->stats
.slab_stats
[old_it
->slabs_clsid
].cas_hits
++;
2322 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2324 item_replace(old_it
, it
, hv
);
2327 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2328 c
->thread
->stats
.slab_stats
[old_it
->slabs_clsid
].cas_badval
++;
2329 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2331 if(settings
.verbose
> 1) {
2332 fprintf(stderr
, "CAS: failure: expected %llu, got %llu\n",
2333 (unsigned long long)ITEM_get_cas(old_it
),
2334 (unsigned long long)ITEM_get_cas(it
));
2340 * Append - combine new and old record into single one. Here it's
2341 * atomic and thread-safe.
2343 if (comm
== NREAD_APPEND
|| comm
== NREAD_PREPEND
) {
2347 if (ITEM_get_cas(it
) != 0) {
2348 // CAS much be equal
2349 if (ITEM_get_cas(it
) != ITEM_get_cas(old_it
)) {
2354 if (stored
== NOT_STORED
) {
2355 /* we have it and old_it here - alloc memory to hold both */
2356 /* flags was already lost - so recover them from ITEM_suffix(it) */
2358 flags
= (int) strtol(ITEM_suffix(old_it
), (char **) NULL
, 10);
2360 new_it
= item_alloc(key
, it
->nkey
, flags
, old_it
->exptime
, it
->nbytes
+ old_it
->nbytes
- 2 /* CRLF */);
2362 if (new_it
== NULL
) {
2363 /* SERVER_ERROR out of memory */
2365 do_item_remove(old_it
);
2370 /* copy data from it and old_it to new_it */
2372 if (comm
== NREAD_APPEND
) {
2373 memcpy(ITEM_data(new_it
), ITEM_data(old_it
), old_it
->nbytes
);
2374 memcpy(ITEM_data(new_it
) + old_it
->nbytes
- 2 /* CRLF */, ITEM_data(it
), it
->nbytes
);
2377 memcpy(ITEM_data(new_it
), ITEM_data(it
), it
->nbytes
);
2378 memcpy(ITEM_data(new_it
) + it
->nbytes
- 2 /* CRLF */, ITEM_data(old_it
), old_it
->nbytes
);
2385 if (stored
== NOT_STORED
) {
2387 item_replace(old_it
, it
, hv
);
2389 do_item_link(it
, hv
);
2391 c
->cas
= ITEM_get_cas(it
);
2398 do_item_remove(old_it
); /* release our reference */
2400 do_item_remove(new_it
);
2402 if (stored
== STORED
) {
2403 c
->cas
= ITEM_get_cas(it
);
2409 typedef struct token_s
{
2414 #define COMMAND_TOKEN 0
2415 #define SUBCOMMAND_TOKEN 1
2418 #define MAX_TOKENS 8
2421 * Tokenize the command string by replacing whitespace with '\0' and update
2422 * the token array tokens with pointer to start of each token and length.
2423 * Returns total number of tokens. The last valid token is the terminal
2424 * token (value points to the first unprocessed character of the string and
2429 * while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
2430 * for(int ix = 0; tokens[ix].length != 0; ix++) {
2433 * ncommand = tokens[ix].value - command;
2434 * command = tokens[ix].value;
2437 static size_t tokenize_command(char *command
, token_t
*tokens
, const size_t max_tokens
) {
2440 size_t len
= strlen(command
);
2443 assert(command
!= NULL
&& tokens
!= NULL
&& max_tokens
> 1);
2446 for (i
= 0; i
< len
; i
++) {
2449 tokens
[ntokens
].value
= s
;
2450 tokens
[ntokens
].length
= e
- s
;
2453 if (ntokens
== max_tokens
- 1) {
2455 s
= e
; /* so we don't add an extra token */
2465 tokens
[ntokens
].value
= s
;
2466 tokens
[ntokens
].length
= e
- s
;
2471 * If we scanned the whole string, the terminal value pointer is null,
2472 * otherwise it is the first unprocessed character.
2474 tokens
[ntokens
].value
= *e
== '\0' ? NULL
: e
;
2475 tokens
[ntokens
].length
= 0;
2481 /* set up a connection to write a buffer then free it, used for stats */
2482 static void write_and_free(conn
*c
, char *buf
, int bytes
) {
2484 c
->write_and_free
= buf
;
2487 conn_set_state(c
, conn_write
);
2488 c
->write_and_go
= conn_new_cmd
;
2490 out_string(c
, "SERVER_ERROR out of memory writing stats");
2494 static inline bool set_noreply_maybe(conn
*c
, token_t
*tokens
, size_t ntokens
)
2496 int noreply_index
= ntokens
- 2;
2499 NOTE: this function is not the first place where we are going to
2500 send the reply. We could send it instead from process_command()
2501 if the request line has wrong number of tokens. However parsing
2502 malformed line for "noreply" option is not reliable anyway, so
2505 if (tokens
[noreply_index
].value
2506 && strcmp(tokens
[noreply_index
].value
, "noreply") == 0) {
2512 void append_stat(const char *name
, ADD_STAT add_stats
, conn
*c
,
2513 const char *fmt
, ...) {
2514 char val_str
[STAT_VAL_LEN
];
2524 vlen
= vsnprintf(val_str
, sizeof(val_str
) - 1, fmt
, ap
);
2527 add_stats(name
, strlen(name
), val_str
, vlen
, c
);
2530 inline static void process_stats_detail(conn
*c
, const char *command
) {
2533 if (strcmp(command
, "on") == 0) {
2534 settings
.detail_enabled
= 1;
2535 out_string(c
, "OK");
2537 else if (strcmp(command
, "off") == 0) {
2538 settings
.detail_enabled
= 0;
2539 out_string(c
, "OK");
2541 else if (strcmp(command
, "dump") == 0) {
2543 char *stats
= stats_prefix_dump(&len
);
2544 write_and_free(c
, stats
, len
);
2547 out_string(c
, "CLIENT_ERROR usage: stats detail on|off|dump");
2551 /* return server specific stats only */
2552 static void server_stats(ADD_STAT add_stats
, conn
*c
) {
2553 pid_t pid
= getpid();
2554 rel_time_t now
= current_time
;
2556 struct thread_stats thread_stats
;
2557 threadlocal_stats_aggregate(&thread_stats
);
2558 struct slab_stats slab_stats
;
2559 slab_stats_aggregate(&thread_stats
, &slab_stats
);
2562 struct rusage usage
;
2563 getrusage(RUSAGE_SELF
, &usage
);
2568 APPEND_STAT("pid", "%lu", (long)pid
);
2569 APPEND_STAT("uptime", "%u", now
);
2570 APPEND_STAT("time", "%ld", now
+ (long)process_started
);
2571 APPEND_STAT("version", "%s", RVERSION
);
2572 APPEND_STAT("libevent", "%s", event_get_version());
2573 APPEND_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));
2576 append_stat("rusage_user", add_stats
, c
, "%ld.%06ld",
2577 (long)usage
.ru_utime
.tv_sec
,
2578 (long)usage
.ru_utime
.tv_usec
);
2579 append_stat("rusage_system", add_stats
, c
, "%ld.%06ld",
2580 (long)usage
.ru_stime
.tv_sec
,
2581 (long)usage
.ru_stime
.tv_usec
);
2584 APPEND_STAT("curr_connections", "%u", stats
.curr_conns
- 1);
2585 APPEND_STAT("total_connections", "%u", stats
.total_conns
);
2586 if (settings
.maxconns_fast
) {
2587 APPEND_STAT("rejected_connections", "%llu", (unsigned long long)stats
.rejected_conns
);
2589 APPEND_STAT("connection_structures", "%u", stats
.conn_structs
);
2590 APPEND_STAT("reserved_fds", "%u", stats
.reserved_fds
);
2591 APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats
.get_cmds
);
2592 APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats
.set_cmds
);
2593 APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats
.flush_cmds
);
2594 APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats
.touch_cmds
);
2595 APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats
.get_hits
);
2596 APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats
.get_misses
);
2597 APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats
.delete_misses
);
2598 APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats
.delete_hits
);
2599 APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats
.incr_misses
);
2600 APPEND_STAT("incr_hits", "%llu", (unsigned long long)slab_stats
.incr_hits
);
2601 APPEND_STAT("decr_misses", "%llu", (unsigned long long)thread_stats
.decr_misses
);
2602 APPEND_STAT("decr_hits", "%llu", (unsigned long long)slab_stats
.decr_hits
);
2603 APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats
.cas_misses
);
2604 APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats
.cas_hits
);
2605 APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats
.cas_badval
);
2606 APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats
.touch_hits
);
2607 APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats
.touch_misses
);
2608 APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats
.auth_cmds
);
2609 APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats
.auth_errors
);
2610 APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats
.bytes_read
);
2611 APPEND_STAT("bytes_written", "%llu", (unsigned long long)thread_stats
.bytes_written
);
2612 APPEND_STAT("limit_maxbytes", "%llu", (unsigned long long)settings
.maxbytes
);
2613 APPEND_STAT("accepting_conns", "%u", stats
.accepting_conns
);
2614 APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats
.listen_disabled_num
);
2615 APPEND_STAT("threads", "%d", settings
.num_threads
);
2616 APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats
.conn_yields
);
2617 APPEND_STAT("hash_power_level", "%u", stats
.hash_power_level
);
2618 APPEND_STAT("hash_bytes", "%llu", (unsigned long long)stats
.hash_bytes
);
2619 APPEND_STAT("hash_is_expanding", "%u", stats
.hash_is_expanding
);
2620 APPEND_STAT("expired_unfetched", "%llu", stats
.expired_unfetched
);
2621 APPEND_STAT("evicted_unfetched", "%llu", stats
.evicted_unfetched
);
2622 if (settings
.slab_reassign
) {
2623 APPEND_STAT("slab_reassign_running", "%u", stats
.slab_reassign_running
);
2624 APPEND_STAT("slabs_moved", "%llu", stats
.slabs_moved
);
2629 static void process_stat_settings(ADD_STAT add_stats
, void *c
) {
2631 APPEND_STAT("maxbytes", "%u", (unsigned int)settings
.maxbytes
);
2632 APPEND_STAT("maxconns", "%d", settings
.maxconns
);
2633 APPEND_STAT("tcpport", "%d", settings
.port
);
2634 APPEND_STAT("udpport", "%d", settings
.udpport
);
2635 APPEND_STAT("inter", "%s", settings
.inter
? settings
.inter
: "NULL");
2636 APPEND_STAT("verbosity", "%d", settings
.verbose
);
2637 APPEND_STAT("oldest", "%lu", (unsigned long)settings
.oldest_live
);
2638 APPEND_STAT("evictions", "%s", settings
.evict_to_free
? "on" : "off");
2639 APPEND_STAT("domain_socket", "%s",
2640 settings
.socketpath
? settings
.socketpath
: "NULL");
2641 APPEND_STAT("umask", "%o", settings
.access
);
2642 APPEND_STAT("growth_factor", "%.2f", settings
.factor
);
2643 APPEND_STAT("chunk_size", "%d", settings
.chunk_size
);
2644 APPEND_STAT("num_threads", "%d", settings
.num_threads
);
2645 APPEND_STAT("num_threads_per_udp", "%d", settings
.num_threads_per_udp
);
2646 APPEND_STAT("stat_key_prefix", "%c", settings
.prefix_delimiter
);
2647 APPEND_STAT("detail_enabled", "%s",
2648 settings
.detail_enabled
? "yes" : "no");
2649 APPEND_STAT("reqs_per_event", "%d", settings
.reqs_per_event
);
2650 APPEND_STAT("cas_enabled", "%s", settings
.use_cas
? "yes" : "no");
2651 APPEND_STAT("tcp_backlog", "%d", settings
.backlog
);
2652 APPEND_STAT("binding_protocol", "%s",
2653 prot_text(settings
.binding_protocol
));
2654 APPEND_STAT("auth_enabled_sasl", "%s", settings
.sasl
? "yes" : "no");
2655 APPEND_STAT("item_size_max", "%d", settings
.item_size_max
);
2656 APPEND_STAT("maxconns_fast", "%s", settings
.maxconns_fast
? "yes" : "no");
2657 APPEND_STAT("hashpower_init", "%d", settings
.hashpower_init
);
2658 APPEND_STAT("slab_reassign", "%s", settings
.slab_reassign
? "yes" : "no");
2659 APPEND_STAT("slab_automove", "%s", settings
.slab_automove
? "yes" : "no");
2662 static void process_stat(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
2663 const char *subcommand
= tokens
[SUBCOMMAND_TOKEN
].value
;
2667 out_string(c
, "CLIENT_ERROR bad command line");
2672 server_stats(&append_stats
, c
);
2673 (void)get_stats(NULL
, 0, &append_stats
, c
);
2674 } else if (strcmp(subcommand
, "reset") == 0) {
2676 out_string(c
, "RESET");
2678 } else if (strcmp(subcommand
, "detail") == 0) {
2679 /* NOTE: how to tackle detail with binary? */
2681 process_stats_detail(c
, ""); /* outputs the error message */
2683 process_stats_detail(c
, tokens
[2].value
);
2684 /* Output already generated */
2686 } else if (strcmp(subcommand
, "settings") == 0) {
2687 process_stat_settings(&append_stats
, c
);
2688 } else if (strcmp(subcommand
, "cachedump") == 0) {
2690 unsigned int bytes
, id
, limit
= 0;
2693 out_string(c
, "CLIENT_ERROR bad command line");
2697 if (!safe_strtoul(tokens
[2].value
, &id
) ||
2698 !safe_strtoul(tokens
[3].value
, &limit
)) {
2699 out_string(c
, "CLIENT_ERROR bad command line format");
2703 if (id
>= POWER_LARGEST
) {
2704 out_string(c
, "CLIENT_ERROR Illegal slab id");
2708 buf
= item_cachedump(id
, limit
, &bytes
);
2709 write_and_free(c
, buf
, bytes
);
2712 /* getting here means that the subcommand is either engine specific or
2713 is invalid. query the engine and see. */
2714 if (get_stats(subcommand
, strlen(subcommand
), &append_stats
, c
)) {
2715 if (c
->stats
.buffer
== NULL
) {
2716 out_string(c
, "SERVER_ERROR out of memory writing stats");
2718 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
2719 c
->stats
.buffer
= NULL
;
2722 out_string(c
, "ERROR");
2727 /* append terminator and start the transfer */
2728 append_stats(NULL
, 0, NULL
, 0, c
);
2730 if (c
->stats
.buffer
== NULL
) {
2731 out_string(c
, "SERVER_ERROR out of memory writing stats");
2733 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
2734 c
->stats
.buffer
= NULL
;
2738 #ifndef __INTEL_COMPILER
2739 #pragma GCC diagnostic ignored "-Wunused-but-set-parameter"
2741 /* ntokens is overwritten here... shrug.. */
2742 static inline void process_get_command(conn
*c
, token_t
*tokens
, size_t ntokens
, bool return_cas
) {
2747 token_t
*key_token
= &tokens
[KEY_TOKEN
];
2752 while(key_token
->length
!= 0) {
2754 key
= key_token
->value
;
2755 nkey
= key_token
->length
;
2757 if(nkey
> KEY_MAX_LENGTH
) {
2758 out_string(c
, "CLIENT_ERROR bad command line format");
2762 it
= item_get(key
, nkey
);
2763 if (settings
.detail_enabled
) {
2764 stats_prefix_record_get(key
, nkey
, NULL
!= it
);
2767 if (i
>= c
->isize
) {
2768 item
**new_list
= realloc(c
->ilist
, sizeof(item
*) * c
->isize
* 2);
2771 c
->ilist
= new_list
;
2779 * Construct the response. Each hit adds three elements to the
2780 * outgoing data list:
2783 * " " + flags + " " + data length + "\r\n" + data (with \r\n)
2788 MEMCACHED_COMMAND_GET(c
->sfd
, ITEM_key(it
), it
->nkey
,
2789 it
->nbytes
, ITEM_get_cas(it
));
2790 /* Goofy mid-flight realloc. */
2791 if (i
>= c
->suffixsize
) {
2792 char **new_suffix_list
= realloc(c
->suffixlist
,
2793 sizeof(char *) * c
->suffixsize
* 2);
2794 if (new_suffix_list
) {
2796 c
->suffixlist
= new_suffix_list
;
2803 suffix
= cache_alloc(c
->thread
->suffix_cache
);
2804 if (suffix
== NULL
) {
2805 out_string(c
, "SERVER_ERROR out of memory making CAS suffix");
2809 *(c
->suffixlist
+ i
) = suffix
;
2810 int suffix_len
= snprintf(suffix
, SUFFIX_SIZE
,
2812 (unsigned long long)ITEM_get_cas(it
));
2813 if (add_iov(c
, "VALUE ", 6) != 0 ||
2814 add_iov(c
, ITEM_key(it
), it
->nkey
) != 0 ||
2815 add_iov(c
, ITEM_suffix(it
), it
->nsuffix
- 2) != 0 ||
2816 add_iov(c
, suffix
, suffix_len
) != 0 ||
2817 add_iov(c
, ITEM_data(it
), it
->nbytes
) != 0)
2825 MEMCACHED_COMMAND_GET(c
->sfd
, ITEM_key(it
), it
->nkey
,
2826 it
->nbytes
, ITEM_get_cas(it
));
2827 if (add_iov(c
, "VALUE ", 6) != 0 ||
2828 add_iov(c
, ITEM_key(it
), it
->nkey
) != 0 ||
2829 add_iov(c
, ITEM_suffix(it
), it
->nsuffix
+ it
->nbytes
) != 0)
2837 if (settings
.verbose
> 1)
2838 fprintf(stderr
, ">%d sending key %s\n", c
->sfd
, ITEM_key(it
));
2840 /* item_get() has incremented it->refcount for us */
2841 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2842 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].get_hits
++;
2843 c
->thread
->stats
.get_cmds
++;
2844 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2846 *(c
->ilist
+ i
) = it
;
2850 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2851 c
->thread
->stats
.get_misses
++;
2852 c
->thread
->stats
.get_cmds
++;
2853 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2854 MEMCACHED_COMMAND_GET(c
->sfd
, key
, nkey
, -1, 0);
2861 * If the command string hasn't been fully processed, get the next set
2864 if(key_token
->value
!= NULL
) {
2865 ntokens
= tokenize_command(key_token
->value
, tokens
, MAX_TOKENS
);
2869 } while(key_token
->value
!= NULL
);
2871 c
->icurr
= c
->ilist
;
2874 c
->suffixcurr
= c
->suffixlist
;
2878 if (settings
.verbose
> 1)
2879 fprintf(stderr
, ">%d END\n", c
->sfd
);
2882 If the loop was terminated because of out-of-memory, it is not
2883 reliable to add END\r\n to the buffer, because it might not end
2884 in \r\n. So we send SERVER_ERROR instead.
2886 if (key_token
->value
!= NULL
|| add_iov(c
, "END\r\n", 5) != 0
2887 || (IS_UDP(c
->transport
) && build_udp_headers(c
) != 0)) {
2888 out_string(c
, "SERVER_ERROR out of memory writing get response");
2891 conn_set_state(c
, conn_mwrite
);
2898 static void process_update_command(conn
*c
, token_t
*tokens
, const size_t ntokens
, int comm
, bool handle_cas
) {
2902 int32_t exptime_int
= 0;
2905 uint64_t req_cas_id
=0;
2910 set_noreply_maybe(c
, tokens
, ntokens
);
2912 if (tokens
[KEY_TOKEN
].length
> KEY_MAX_LENGTH
) {
2913 out_string(c
, "CLIENT_ERROR bad command line format");
2917 key
= tokens
[KEY_TOKEN
].value
;
2918 nkey
= tokens
[KEY_TOKEN
].length
;
2920 if (! (safe_strtoul(tokens
[2].value
, (uint32_t *)&flags
)
2921 && safe_strtol(tokens
[3].value
, &exptime_int
)
2922 && safe_strtol(tokens
[4].value
, (int32_t *)&vlen
))) {
2923 out_string(c
, "CLIENT_ERROR bad command line format");
2927 /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
2928 exptime
= exptime_int
;
2930 /* Negative exptimes can underflow and end up immortal. realtime() will
2931 immediately expire values that are greater than REALTIME_MAXDELTA, but less
2932 than process_started, so lets aim for that. */
2934 exptime
= REALTIME_MAXDELTA
+ 1;
2936 // does cas value exist?
2938 if (!safe_strtoull(tokens
[5].value
, &req_cas_id
)) {
2939 out_string(c
, "CLIENT_ERROR bad command line format");
2945 if (vlen
< 0 || vlen
- 2 < 0) {
2946 out_string(c
, "CLIENT_ERROR bad command line format");
2950 if (settings
.detail_enabled
) {
2951 stats_prefix_record_set(key
, nkey
);
2954 it
= item_alloc(key
, nkey
, flags
, realtime(exptime
), vlen
);
2957 if (! item_size_ok(nkey
, flags
, vlen
))
2958 out_string(c
, "SERVER_ERROR object too large for cache");
2960 out_string(c
, "SERVER_ERROR out of memory storing object");
2961 /* swallow the data line */
2962 c
->write_and_go
= conn_swallow
;
2965 /* Avoid stale data persisting in cache because we failed alloc.
2966 * Unacceptable for SET. Anywhere else too? */
2967 if (comm
== NREAD_SET
) {
2968 it
= item_get(key
, nkey
);
2977 ITEM_set_cas(it
, req_cas_id
);
2980 c
->ritem
= ITEM_data(it
);
2981 c
->rlbytes
= it
->nbytes
;
2983 conn_set_state(c
, conn_nread
);
2986 static void process_touch_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
2989 int32_t exptime_int
= 0;
2994 set_noreply_maybe(c
, tokens
, ntokens
);
2996 if (tokens
[KEY_TOKEN
].length
> KEY_MAX_LENGTH
) {
2997 out_string(c
, "CLIENT_ERROR bad command line format");
3001 key
= tokens
[KEY_TOKEN
].value
;
3002 nkey
= tokens
[KEY_TOKEN
].length
;
3004 if (!safe_strtol(tokens
[2].value
, &exptime_int
)) {
3005 out_string(c
, "CLIENT_ERROR invalid exptime argument");
3009 it
= item_touch(key
, nkey
, realtime(exptime_int
));
3012 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3013 c
->thread
->stats
.touch_cmds
++;
3014 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].touch_hits
++;
3015 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3017 out_string(c
, "TOUCHED");
3020 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3021 c
->thread
->stats
.touch_cmds
++;
3022 c
->thread
->stats
.touch_misses
++;
3023 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3025 out_string(c
, "NOT_FOUND");
3029 static void process_arithmetic_command(conn
*c
, token_t
*tokens
, const size_t ntokens
, const bool incr
) {
3030 char temp
[INCR_MAX_STORAGE_LEN
];
3037 set_noreply_maybe(c
, tokens
, ntokens
);
3039 if (tokens
[KEY_TOKEN
].length
> KEY_MAX_LENGTH
) {
3040 out_string(c
, "CLIENT_ERROR bad command line format");
3044 key
= tokens
[KEY_TOKEN
].value
;
3045 nkey
= tokens
[KEY_TOKEN
].length
;
3047 if (!safe_strtoull(tokens
[2].value
, &delta
)) {
3048 out_string(c
, "CLIENT_ERROR invalid numeric delta argument");
3052 switch(add_delta(c
, key
, nkey
, incr
, delta
, temp
, NULL
)) {
3054 out_string(c
, temp
);
3057 out_string(c
, "CLIENT_ERROR cannot increment or decrement non-numeric value");
3060 out_string(c
, "SERVER_ERROR out of memory");
3062 case DELTA_ITEM_NOT_FOUND
:
3063 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3065 c
->thread
->stats
.incr_misses
++;
3067 c
->thread
->stats
.decr_misses
++;
3069 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3071 out_string(c
, "NOT_FOUND");
3073 case DELTA_ITEM_CAS_MISMATCH
:
3074 break; /* Should never get here */
3082 * adds a delta value to a numeric item.
3084 * c connection requesting the operation
3086 * incr true to increment value, false to decrement
3087 * delta amount to adjust value by
3088 * buf buffer for response string
3090 * returns a response string to send back to the client.
3092 enum delta_result_type
do_add_delta(conn
*c
, const char *key
, const size_t nkey
,
3093 const bool incr
, const int64_t delta
,
3094 char *buf
, uint64_t *cas
,
3095 const uint32_t hv
) {
3101 it
= do_item_get(key
, nkey
, hv
);
3103 return DELTA_ITEM_NOT_FOUND
;
3106 if (cas
!= NULL
&& *cas
!= 0 && ITEM_get_cas(it
) != *cas
) {
3108 return DELTA_ITEM_CAS_MISMATCH
;
3111 ptr
= ITEM_data(it
);
3113 if (!safe_strtoull(ptr
, &value
)) {
3120 MEMCACHED_COMMAND_INCR(c
->sfd
, ITEM_key(it
), it
->nkey
, value
);
3127 MEMCACHED_COMMAND_DECR(c
->sfd
, ITEM_key(it
), it
->nkey
, value
);
3130 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3132 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].incr_hits
++;
3134 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].decr_hits
++;
3136 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3138 snprintf(buf
, INCR_MAX_STORAGE_LEN
, "%llu", (unsigned long long)value
);
3140 if (res
+ 2 > it
->nbytes
|| it
->refcount
!= 1) { /* need to realloc */
3142 new_it
= item_alloc(ITEM_key(it
), it
->nkey
, atoi(ITEM_suffix(it
) + 1), it
->exptime
, res
+ 2 );
3147 memcpy(ITEM_data(new_it
), buf
, res
);
3148 memcpy(ITEM_data(new_it
) + res
, "\r\n", 2);
3149 item_replace(it
, new_it
, hv
);
3150 // Overwrite the older item's CAS with our new CAS since we're
3151 // returning the CAS of the old item below.
3152 ITEM_set_cas(it
, (settings
.use_cas
) ? ITEM_get_cas(new_it
) : 0);
3153 do_item_remove(new_it
); /* release our reference */
3154 } else { /* replace in-place */
3155 /* When changing the value without replacing the item, we
3156 need to update the CAS on the existing item. */
3157 mutex_lock(&cache_lock
); /* FIXME */
3158 ITEM_set_cas(it
, (settings
.use_cas
) ? get_cas_id() : 0);
3159 pthread_mutex_unlock(&cache_lock
);
3161 memcpy(ITEM_data(it
), buf
, res
);
3162 memset(ITEM_data(it
) + res
, ' ', it
->nbytes
- res
- 2);
3167 *cas
= ITEM_get_cas(it
); /* swap the incoming CAS value */
3169 do_item_remove(it
); /* release our reference */
3173 static void process_delete_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
3181 bool hold_is_zero
= strcmp(tokens
[KEY_TOKEN
+1].value
, "0") == 0;
3182 bool sets_noreply
= set_noreply_maybe(c
, tokens
, ntokens
);
3183 bool valid
= (ntokens
== 4 && (hold_is_zero
|| sets_noreply
))
3184 || (ntokens
== 5 && hold_is_zero
&& sets_noreply
);
3186 out_string(c
, "CLIENT_ERROR bad command line format. "
3187 "Usage: delete <key> [noreply]");
3193 key
= tokens
[KEY_TOKEN
].value
;
3194 nkey
= tokens
[KEY_TOKEN
].length
;
3196 if(nkey
> KEY_MAX_LENGTH
) {
3197 out_string(c
, "CLIENT_ERROR bad command line format");
3201 if (settings
.detail_enabled
) {
3202 stats_prefix_record_delete(key
, nkey
);
3205 it
= item_get(key
, nkey
);
3207 MEMCACHED_COMMAND_DELETE(c
->sfd
, ITEM_key(it
), it
->nkey
);
3209 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3210 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].delete_hits
++;
3211 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3214 item_remove(it
); /* release our reference */
3215 out_string(c
, "DELETED");
3217 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3218 c
->thread
->stats
.delete_misses
++;
3219 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3221 out_string(c
, "NOT_FOUND");
3225 static void process_verbosity_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
3230 set_noreply_maybe(c
, tokens
, ntokens
);
3232 level
= strtoul(tokens
[1].value
, NULL
, 10);
3233 settings
.verbose
= level
> MAX_VERBOSITY_LEVEL
? MAX_VERBOSITY_LEVEL
: level
;
3234 out_string(c
, "OK");
3238 static void process_slabs_automove_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
3243 set_noreply_maybe(c
, tokens
, ntokens
);
3245 level
= strtoul(tokens
[2].value
, NULL
, 10);
3247 settings
.slab_automove
= false;
3248 } else if (level
== 1) {
3249 settings
.slab_automove
= true;
3251 out_string(c
, "ERROR");
3254 out_string(c
, "OK");
3258 static void process_command(conn
*c
, char *command
) {
3260 token_t tokens
[MAX_TOKENS
];
3266 MEMCACHED_PROCESS_COMMAND_START(c
->sfd
, c
->rcurr
, c
->rbytes
);
3268 if (settings
.verbose
> 1)
3269 fprintf(stderr
, "<%d %s\n", c
->sfd
, command
);
3272 * for commands set/add/replace, we build an item and read the data
3273 * directly into it, then continue in nread_complete().
3279 if (add_msghdr(c
) != 0) {
3280 out_string(c
, "SERVER_ERROR out of memory preparing response");
3284 ntokens
= tokenize_command(command
, tokens
, MAX_TOKENS
);
3286 ((strcmp(tokens
[COMMAND_TOKEN
].value
, "get") == 0) ||
3287 (strcmp(tokens
[COMMAND_TOKEN
].value
, "bget") == 0))) {
3289 process_get_command(c
, tokens
, ntokens
, false);
3291 } else if ((ntokens
== 6 || ntokens
== 7) &&
3292 ((strcmp(tokens
[COMMAND_TOKEN
].value
, "add") == 0 && (comm
= NREAD_ADD
)) ||
3293 (strcmp(tokens
[COMMAND_TOKEN
].value
, "set") == 0 && (comm
= NREAD_SET
)) ||
3294 (strcmp(tokens
[COMMAND_TOKEN
].value
, "replace") == 0 && (comm
= NREAD_REPLACE
)) ||
3295 (strcmp(tokens
[COMMAND_TOKEN
].value
, "prepend") == 0 && (comm
= NREAD_PREPEND
)) ||
3296 (strcmp(tokens
[COMMAND_TOKEN
].value
, "append") == 0 && (comm
= NREAD_APPEND
)) )) {
3298 process_update_command(c
, tokens
, ntokens
, comm
, false);
3300 } else if ((ntokens
== 7 || ntokens
== 8) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "cas") == 0 && (comm
= NREAD_CAS
))) {
3302 process_update_command(c
, tokens
, ntokens
, comm
, true);
3304 } else if ((ntokens
== 4 || ntokens
== 5) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "incr") == 0)) {
3306 process_arithmetic_command(c
, tokens
, ntokens
, 1);
3308 } else if (ntokens
>= 3 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "gets") == 0)) {
3310 process_get_command(c
, tokens
, ntokens
, true);
3312 } else if ((ntokens
== 4 || ntokens
== 5) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "decr") == 0)) {
3314 process_arithmetic_command(c
, tokens
, ntokens
, 0);
3316 } else if (ntokens
>= 3 && ntokens
<= 5 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "delete") == 0)) {
3318 process_delete_command(c
, tokens
, ntokens
);
3320 } else if ((ntokens
== 4 || ntokens
== 5) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "touch") == 0)) {
3322 process_touch_command(c
, tokens
, ntokens
);
3324 } else if (ntokens
>= 2 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "stats") == 0)) {
3326 process_stat(c
, tokens
, ntokens
);
3328 } else if (ntokens
>= 2 && ntokens
<= 4 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "flush_all") == 0)) {
3331 set_noreply_maybe(c
, tokens
, ntokens
);
3333 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3334 c
->thread
->stats
.flush_cmds
++;
3335 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3337 if(ntokens
== (c
->noreply
? 3 : 2)) {
3338 settings
.oldest_live
= current_time
- 1;
3339 item_flush_expired();
3340 out_string(c
, "OK");
3344 exptime
= strtol(tokens
[1].value
, NULL
, 10);
3345 if(errno
== ERANGE
) {
3346 out_string(c
, "CLIENT_ERROR bad command line format");
3351 If exptime is zero realtime() would return zero too, and
3352 realtime(exptime) - 1 would overflow to the max unsigned
3353 value. So we process exptime == 0 the same way we do when
3354 no delay is given at all.
3357 settings
.oldest_live
= realtime(exptime
) - 1;
3358 else /* exptime == 0 */
3359 settings
.oldest_live
= current_time
- 1;
3360 item_flush_expired();
3361 out_string(c
, "OK");
3364 } else if (ntokens
== 2 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "version") == 0)) {
3366 out_string(c
, "VERSION " RVERSION
);
3368 } else if (ntokens
== 2 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "quit") == 0)) {
3370 conn_set_state(c
, conn_closing
);
3372 } else if (ntokens
> 1 && strcmp(tokens
[COMMAND_TOKEN
].value
, "slabs") == 0) {
3373 if (ntokens
== 5 && strcmp(tokens
[COMMAND_TOKEN
+ 1].value
, "reassign") == 0) {
3376 if (settings
.slab_reassign
== false) {
3377 out_string(c
, "CLIENT_ERROR slab reassignment disabled");
3381 src
= strtol(tokens
[2].value
, NULL
, 10);
3382 dst
= strtol(tokens
[3].value
, NULL
, 10);
3384 if (errno
== ERANGE
) {
3385 out_string(c
, "CLIENT_ERROR bad command line format");
3389 rv
= slabs_reassign(src
, dst
);
3392 out_string(c
, "OK");
3394 case REASSIGN_RUNNING
:
3395 out_string(c
, "BUSY currently processing reassign request");
3397 case REASSIGN_BADCLASS
:
3398 out_string(c
, "BADCLASS invalid src or dst class id");
3400 case REASSIGN_NOSPARE
:
3401 out_string(c
, "NOSPARE source class has no spare pages");
3403 case REASSIGN_DEST_NOT_FULL
:
3404 out_string(c
, "NOTFULL dest class has spare memory");
3406 case REASSIGN_SRC_NOT_SAFE
:
3407 out_string(c
, "UNSAFE src class is in an unsafe state");
3409 case REASSIGN_SRC_DST_SAME
:
3410 out_string(c
, "SAME src and dst class are identical");
3417 } else if (ntokens
== 4 &&
3418 (strcmp(tokens
[COMMAND_TOKEN
+ 1].value
, "automove") == 0)) {
3419 process_slabs_automove_command(c
, tokens
, ntokens
);
3421 out_string(c
, "ERROR");
3423 } else if ((ntokens
== 3 || ntokens
== 4) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "verbosity") == 0)) {
3424 process_verbosity_command(c
, tokens
, ntokens
);
3426 out_string(c
, "ERROR");
3432 * if we have a complete line in the buffer, process it.
3434 static int try_read_command(conn
*c
) {
3436 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
3437 assert(c
->rbytes
> 0);
3439 if (c
->protocol
== negotiating_prot
|| c
->transport
== udp_transport
) {
3440 if ((unsigned char)c
->rbuf
[0] == (unsigned char)PROTOCOL_BINARY_REQ
) {
3441 c
->protocol
= binary_prot
;
3443 c
->protocol
= ascii_prot
;
3446 if (settings
.verbose
> 1) {
3447 fprintf(stderr
, "%d: Client using the %s protocol\n", c
->sfd
,
3448 prot_text(c
->protocol
));
3452 if (c
->protocol
== binary_prot
) {
3453 /* Do we have the complete packet header? */
3454 if (c
->rbytes
< sizeof(c
->binary_header
)) {
3455 /* need more data! */
3459 if (((long)(c
->rcurr
)) % 8 != 0) {
3460 /* must realign input buffer */
3461 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
3463 if (settings
.verbose
> 1) {
3464 fprintf(stderr
, "%d: Realign input buffer\n", c
->sfd
);
3468 protocol_binary_request_header
* req
;
3469 req
= (protocol_binary_request_header
*)c
->rcurr
;
3471 if (settings
.verbose
> 1) {
3472 /* Dump the packet before we convert it to host order */
3474 fprintf(stderr
, "<%d Read binary protocol data:", c
->sfd
);
3475 for (ii
= 0; ii
< sizeof(req
->bytes
); ++ii
) {
3477 fprintf(stderr
, "\n<%d ", c
->sfd
);
3479 fprintf(stderr
, " 0x%02x", req
->bytes
[ii
]);
3481 fprintf(stderr
, "\n");
3484 c
->binary_header
= *req
;
3485 c
->binary_header
.request
.keylen
= ntohs(req
->request
.keylen
);
3486 c
->binary_header
.request
.bodylen
= ntohl(req
->request
.bodylen
);
3487 c
->binary_header
.request
.cas
= ntohll(req
->request
.cas
);
3489 if (c
->binary_header
.request
.magic
!= PROTOCOL_BINARY_REQ
) {
3490 if (settings
.verbose
) {
3491 fprintf(stderr
, "Invalid magic: %x\n",
3492 c
->binary_header
.request
.magic
);
3494 conn_set_state(c
, conn_closing
);
3501 if (add_msghdr(c
) != 0) {
3502 out_string(c
, "SERVER_ERROR out of memory");
3506 c
->cmd
= c
->binary_header
.request
.opcode
;
3507 c
->keylen
= c
->binary_header
.request
.keylen
;
3508 c
->opaque
= c
->binary_header
.request
.opaque
;
3509 /* clear the returned cas value */
3512 dispatch_bin_command(c
);
3514 c
->rbytes
-= sizeof(c
->binary_header
);
3515 c
->rcurr
+= sizeof(c
->binary_header
);
3523 el
= memchr(c
->rcurr
, '\n', c
->rbytes
);
3525 if (c
->rbytes
> 1024) {
3527 * We didn't have a '\n' in the first k. This _has_ to be a
3528 * large multiget, if not we should just nuke the connection.
3530 char *ptr
= c
->rcurr
;
3531 while (*ptr
== ' ') { /* ignore leading whitespaces */
3535 if (ptr
- c
->rcurr
> 100 ||
3536 (strncmp(ptr
, "get ", 4) && strncmp(ptr
, "gets ", 5))) {
3538 conn_set_state(c
, conn_closing
);
3546 if ((el
- c
->rcurr
) > 1 && *(el
- 1) == '\r') {
3551 assert(cont
<= (c
->rcurr
+ c
->rbytes
));
3553 process_command(c
, c
->rcurr
);
3555 c
->rbytes
-= (cont
- c
->rcurr
);
3558 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
3565 * read a UDP request.
3567 static enum try_read_result
try_read_udp(conn
*c
) {
3572 c
->request_addr_size
= sizeof(c
->request_addr
);
3573 res
= recvfrom(c
->sfd
, c
->rbuf
, c
->rsize
,
3574 0, &c
->request_addr
, &c
->request_addr_size
);
3576 unsigned char *buf
= (unsigned char *)c
->rbuf
;
3577 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3578 c
->thread
->stats
.bytes_read
+= res
;
3579 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3581 /* Beginning of UDP packet is the request ID; save it. */
3582 c
->request_id
= buf
[0] * 256 + buf
[1];
3584 /* If this is a multi-packet request, drop it. */
3585 if (buf
[4] != 0 || buf
[5] != 1) {
3586 out_string(c
, "SERVER_ERROR multi-packet request not supported");
3587 return READ_NO_DATA_RECEIVED
;
3590 /* Don't care about any of the rest of the header. */
3592 memmove(c
->rbuf
, c
->rbuf
+ 8, res
);
3596 return READ_DATA_RECEIVED
;
3598 return READ_NO_DATA_RECEIVED
;
3602 * read from network as much as we can, handle buffer overflow and connection
3604 * before reading, move the remaining incomplete fragment of a command
3605 * (if any) to the beginning of the buffer.
3607 * To protect us from someone flooding a connection with bogus data causing
3608 * the connection to eat up all available memory, break out and start looking
3609 * at the data I've got after a number of reallocs...
3611 * @return enum try_read_result
3613 static enum try_read_result
try_read_network(conn
*c
) {
3614 enum try_read_result gotdata
= READ_NO_DATA_RECEIVED
;
3619 if (c
->rcurr
!= c
->rbuf
) {
3620 if (c
->rbytes
!= 0) /* otherwise there's nothing to copy */
3621 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
3626 if (c
->rbytes
>= c
->rsize
) {
3627 if (num_allocs
== 4) {
3631 char *new_rbuf
= realloc(c
->rbuf
, c
->rsize
* 2);
3633 if (settings
.verbose
> 0)
3634 fprintf(stderr
, "Couldn't realloc input buffer\n");
3635 c
->rbytes
= 0; /* ignore what we read */
3636 out_string(c
, "SERVER_ERROR out of memory reading request");
3637 c
->write_and_go
= conn_closing
;
3638 return READ_MEMORY_ERROR
;
3640 c
->rcurr
= c
->rbuf
= new_rbuf
;
3644 int avail
= c
->rsize
- c
->rbytes
;
3645 res
= read(c
->sfd
, c
->rbuf
+ c
->rbytes
, avail
);
3647 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3648 c
->thread
->stats
.bytes_read
+= res
;
3649 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3650 gotdata
= READ_DATA_RECEIVED
;
3662 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
3671 static bool update_event(conn
*c
, const int new_flags
) {
3674 struct event_base
*base
= c
->event
.ev_base
;
3675 if (c
->ev_flags
== new_flags
)
3677 if (event_del(&c
->event
) == -1) return false;
3678 event_set(&c
->event
, c
->sfd
, new_flags
, event_handler
, (void *)c
);
3679 event_base_set(base
, &c
->event
);
3680 c
->ev_flags
= new_flags
;
3681 if (event_add(&c
->event
, 0) == -1) return false;
3686 * Sets whether we are listening for new connections or not.
3688 void do_accept_new_conns(const bool do_accept
) {
3691 for (next
= listen_conn
; next
; next
= next
->next
) {
3693 update_event(next
, EV_READ
| EV_PERSIST
);
3694 if (listen(next
->sfd
, settings
.backlog
) != 0) {
3699 update_event(next
, 0);
3700 if (listen(next
->sfd
, 0) != 0) {
3708 stats
.accepting_conns
= true;
3712 stats
.accepting_conns
= false;
3713 stats
.listen_disabled_num
++;
3715 allow_new_conns
= false;
3716 maxconns_handler(-42, 0, 0);
3721 * Transmit the next chunk of data from our list of msgbuf structures.
3724 * TRANSMIT_COMPLETE All done writing.
3725 * TRANSMIT_INCOMPLETE More data remaining to write.
3726 * TRANSMIT_SOFT_ERROR Can't write any more right now.
3727 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
3729 static enum transmit_result
transmit(conn
*c
) {
3732 if (c
->msgcurr
< c
->msgused
&&
3733 c
->msglist
[c
->msgcurr
].msg_iovlen
== 0) {
3734 /* Finished writing the current msg; advance to the next. */
3737 if (c
->msgcurr
< c
->msgused
) {
3739 struct msghdr
*m
= &c
->msglist
[c
->msgcurr
];
3741 res
= sendmsg(c
->sfd
, m
, 0);
3743 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3744 c
->thread
->stats
.bytes_written
+= res
;
3745 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3747 /* We've written some of the data. Remove the completed
3748 iovec entries from the list of pending writes. */
3749 while (m
->msg_iovlen
> 0 && res
>= m
->msg_iov
->iov_len
) {
3750 res
-= m
->msg_iov
->iov_len
;
3755 /* Might have written just part of the last iovec entry;
3756 adjust it so the next write will do the rest. */
3758 m
->msg_iov
->iov_base
= (caddr_t
)m
->msg_iov
->iov_base
+ res
;
3759 m
->msg_iov
->iov_len
-= res
;
3761 return TRANSMIT_INCOMPLETE
;
3763 if (res
== -1 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) {
3764 if (!update_event(c
, EV_WRITE
| EV_PERSIST
)) {
3765 if (settings
.verbose
> 0)
3766 fprintf(stderr
, "Couldn't update event\n");
3767 conn_set_state(c
, conn_closing
);
3768 return TRANSMIT_HARD_ERROR
;
3770 return TRANSMIT_SOFT_ERROR
;
3772 /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
3773 we have a real error, on which we close the connection */
3774 if (settings
.verbose
> 0)
3775 perror("Failed to write, and not due to blocking");
3777 if (IS_UDP(c
->transport
))
3778 conn_set_state(c
, conn_read
);
3780 conn_set_state(c
, conn_closing
);
3781 return TRANSMIT_HARD_ERROR
;
3783 return TRANSMIT_COMPLETE
;
3787 static void drive_machine(conn
*c
) {
3791 struct sockaddr_storage addr
;
3792 int nreqs
= settings
.reqs_per_event
;
3801 case conn_listening
:
3802 addrlen
= sizeof(addr
);
3803 if ((sfd
= accept(c
->sfd
, (struct sockaddr
*)&addr
, &addrlen
)) == -1) {
3804 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
3805 /* these are transient, so don't log anything */
3807 } else if (errno
== EMFILE
) {
3808 if (settings
.verbose
> 0)
3809 fprintf(stderr
, "Too many open connections\n");
3810 accept_new_conns(false);
3818 if ((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0 ||
3819 fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
3820 perror("setting O_NONBLOCK");
3825 if (settings
.maxconns_fast
&&
3826 stats
.curr_conns
+ stats
.reserved_fds
>= settings
.maxconns
- 1) {
3827 str
= "ERROR Too many open connections\r\n";
3828 res
= write(sfd
, str
, strlen(str
));
3831 stats
.rejected_conns
++;
3834 dispatch_conn_new(sfd
, conn_new_cmd
, EV_READ
| EV_PERSIST
,
3835 DATA_BUFFER_SIZE
, tcp_transport
);
3842 if (!update_event(c
, EV_READ
| EV_PERSIST
)) {
3843 if (settings
.verbose
> 0)
3844 fprintf(stderr
, "Couldn't update event\n");
3845 conn_set_state(c
, conn_closing
);
3849 conn_set_state(c
, conn_read
);
3854 res
= IS_UDP(c
->transport
) ? try_read_udp(c
) : try_read_network(c
);
3857 case READ_NO_DATA_RECEIVED
:
3858 conn_set_state(c
, conn_waiting
);
3860 case READ_DATA_RECEIVED
:
3861 conn_set_state(c
, conn_parse_cmd
);
3864 conn_set_state(c
, conn_closing
);
3866 case READ_MEMORY_ERROR
: /* Failed to allocate more memory */
3867 /* State already set by try_read_network */
3875 case conn_parse_cmd
:
3876 if (try_read_command(c
) == 0) {
3877 /* wee need more data! */
3878 conn_set_state(c
, conn_waiting
);
3884 /* Only process nreqs at a time to avoid starving other
3889 reset_cmd_handler(c
);
3891 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3892 c
->thread
->stats
.conn_yields
++;
3893 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3894 if (c
->rbytes
> 0) {
3895 /* We have already read in data into the input buffer,
3896 so libevent will most likely not signal read events
3897 on the socket (unless more data is available. As a
3898 hack we should just put in a request to write data,
3899 because that should be possible ;-)
3901 if (!update_event(c
, EV_WRITE
| EV_PERSIST
)) {
3902 if (settings
.verbose
> 0)
3903 fprintf(stderr
, "Couldn't update event\n");
3904 conn_set_state(c
, conn_closing
);
3912 if (c
->rlbytes
== 0) {
3916 /* first check if we have leftovers in the conn_read buffer */
3917 if (c
->rbytes
> 0) {
3918 int tocopy
= c
->rbytes
> c
->rlbytes
? c
->rlbytes
: c
->rbytes
;
3919 if (c
->ritem
!= c
->rcurr
) {
3920 memmove(c
->ritem
, c
->rcurr
, tocopy
);
3923 c
->rlbytes
-= tocopy
;
3925 c
->rbytes
-= tocopy
;
3926 if (c
->rlbytes
== 0) {
3931 /* now try reading from the socket */
3932 res
= read(c
->sfd
, c
->ritem
, c
->rlbytes
);
3934 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3935 c
->thread
->stats
.bytes_read
+= res
;
3936 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3937 if (c
->rcurr
== c
->ritem
) {
3944 if (res
== 0) { /* end of stream */
3945 conn_set_state(c
, conn_closing
);
3948 if (res
== -1 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) {
3949 if (!update_event(c
, EV_READ
| EV_PERSIST
)) {
3950 if (settings
.verbose
> 0)
3951 fprintf(stderr
, "Couldn't update event\n");
3952 conn_set_state(c
, conn_closing
);
3958 /* otherwise we have a real error, on which we close the connection */
3959 if (settings
.verbose
> 0) {
3960 fprintf(stderr
, "Failed to read, and not due to blocking:\n"
3962 "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
3963 errno
, strerror(errno
),
3964 (long)c
->rcurr
, (long)c
->ritem
, (long)c
->rbuf
,
3965 (int)c
->rlbytes
, (int)c
->rsize
);
3967 conn_set_state(c
, conn_closing
);
3971 /* we are reading sbytes and throwing them away */
3972 if (c
->sbytes
== 0) {
3973 conn_set_state(c
, conn_new_cmd
);
3977 /* first check if we have leftovers in the conn_read buffer */
3978 if (c
->rbytes
> 0) {
3979 int tocopy
= c
->rbytes
> c
->sbytes
? c
->sbytes
: c
->rbytes
;
3980 c
->sbytes
-= tocopy
;
3982 c
->rbytes
-= tocopy
;
3986 /* now try reading from the socket */
3987 res
= read(c
->sfd
, c
->rbuf
, c
->rsize
> c
->sbytes
? c
->sbytes
: c
->rsize
);
3989 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3990 c
->thread
->stats
.bytes_read
+= res
;
3991 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3995 if (res
== 0) { /* end of stream */
3996 conn_set_state(c
, conn_closing
);
3999 if (res
== -1 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) {
4000 if (!update_event(c
, EV_READ
| EV_PERSIST
)) {
4001 if (settings
.verbose
> 0)
4002 fprintf(stderr
, "Couldn't update event\n");
4003 conn_set_state(c
, conn_closing
);
4009 /* otherwise we have a real error, on which we close the connection */
4010 if (settings
.verbose
> 0)
4011 fprintf(stderr
, "Failed to read, and not due to blocking\n");
4012 conn_set_state(c
, conn_closing
);
4017 * We want to write out a simple response. If we haven't already,
4018 * assemble it into a msgbuf list (this will be a single-entry
4019 * list for TCP or a two-entry list for UDP).
4021 if (c
->iovused
== 0 || (IS_UDP(c
->transport
) && c
->iovused
== 1)) {
4022 if (add_iov(c
, c
->wcurr
, c
->wbytes
) != 0) {
4023 if (settings
.verbose
> 0)
4024 fprintf(stderr
, "Couldn't build response\n");
4025 conn_set_state(c
, conn_closing
);
4030 /* fall through... */
4033 if (IS_UDP(c
->transport
) && c
->msgcurr
== 0 && build_udp_headers(c
) != 0) {
4034 if (settings
.verbose
> 0)
4035 fprintf(stderr
, "Failed to build UDP headers\n");
4036 conn_set_state(c
, conn_closing
);
4039 switch (transmit(c
)) {
4040 case TRANSMIT_COMPLETE
:
4041 if (c
->state
== conn_mwrite
) {
4042 while (c
->ileft
> 0) {
4043 item
*it
= *(c
->icurr
);
4044 assert((it
->it_flags
& ITEM_SLABBED
) == 0);
4049 while (c
->suffixleft
> 0) {
4050 char *suffix
= *(c
->suffixcurr
);
4051 cache_free(c
->thread
->suffix_cache
, suffix
);
4055 /* XXX: I don't know why this wasn't the general case */
4056 if(c
->protocol
== binary_prot
) {
4057 conn_set_state(c
, c
->write_and_go
);
4059 conn_set_state(c
, conn_new_cmd
);
4061 } else if (c
->state
== conn_write
) {
4062 if (c
->write_and_free
) {
4063 free(c
->write_and_free
);
4064 c
->write_and_free
= 0;
4066 conn_set_state(c
, c
->write_and_go
);
4068 if (settings
.verbose
> 0)
4069 fprintf(stderr
, "Unexpected state %d\n", c
->state
);
4070 conn_set_state(c
, conn_closing
);
4074 case TRANSMIT_INCOMPLETE
:
4075 case TRANSMIT_HARD_ERROR
:
4076 break; /* Continue in state machine. */
4078 case TRANSMIT_SOFT_ERROR
:
4088 if (IS_UDP(c
->transport
))
4095 case conn_max_state
:
4107 void event_handler(const int fd
, const short which
, void *arg
) {
4117 if (settings
.verbose
> 0)
4118 fprintf(stderr
, "Catastrophic: event fd doesn't match conn fd!\n");
4125 /* wait for next event */
4129 static int new_socket(struct addrinfo
*ai
) {
4133 if ((sfd
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
)) == -1) {
4137 if ((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0 ||
4138 fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
4139 perror("setting O_NONBLOCK");
4148 * Sets a socket's send buffer size to the maximum allowed by the system.
4150 static void maximize_sndbuf(const int sfd
) {
4151 socklen_t intsize
= sizeof(int);
4156 /* Start with the default size. */
4157 if (getsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, &old_size
, &intsize
) != 0) {
4158 if (settings
.verbose
> 0)
4159 perror("getsockopt(SO_SNDBUF)");
4163 /* Binary-search for the real maximum. */
4165 max
= MAX_SENDBUF_SIZE
;
4167 while (min
<= max
) {
4168 avg
= ((unsigned int)(min
+ max
)) / 2;
4169 if (setsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, (void *)&avg
, intsize
) == 0) {
4177 if (settings
.verbose
> 1)
4178 fprintf(stderr
, "<%d send buffer was %d, now %d\n", sfd
, old_size
, last_good
);
4182 * Create a socket and bind it to a specific port number
4183 * @param interface the interface to bind to
4184 * @param port the port number to bind to
4185 * @param transport the transport protocol (TCP / UDP)
4186 * @param portnumber_file A filepointer to write the port numbers to
4187 * when they are successfully added to the list of ports we
4190 static int server_socket(const char *interface
,
4192 enum network_transport transport
,
4193 FILE *portnumber_file
) {
4195 struct linger ling
= {0, 0};
4196 struct addrinfo
*ai
;
4197 struct addrinfo
*next
;
4198 struct addrinfo hints
= { .ai_flags
= AI_PASSIVE
,
4199 .ai_family
= AF_UNSPEC
};
4200 char port_buf
[NI_MAXSERV
];
4205 hints
.ai_socktype
= IS_UDP(transport
) ? SOCK_DGRAM
: SOCK_STREAM
;
4210 snprintf(port_buf
, sizeof(port_buf
), "%d", port
);
4211 error
= getaddrinfo(interface
, port_buf
, &hints
, &ai
);
4213 if (error
!= EAI_SYSTEM
)
4214 fprintf(stderr
, "getaddrinfo(): %s\n", gai_strerror(error
));
4216 perror("getaddrinfo()");
4220 for (next
= ai
; next
; next
= next
->ai_next
) {
4221 conn
*listen_conn_add
;
4222 if ((sfd
= new_socket(next
)) == -1) {
4223 /* getaddrinfo can return "junk" addresses,
4224 * we make sure at least one works before erroring.
4226 if (errno
== EMFILE
) {
4227 /* ...unless we're out of fds */
4228 perror("server_socket");
4235 if (next
->ai_family
== AF_INET6
) {
4236 error
= setsockopt(sfd
, IPPROTO_IPV6
, IPV6_V6ONLY
, (char *) &flags
, sizeof(flags
));
4238 perror("setsockopt");
4245 error
= setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, &flags
, sizeof(flags
));
4248 perror("setsockopt(SO_REUSEADDR)");
4251 if (IS_UDP(transport
)) {
4252 maximize_sndbuf(sfd
);
4254 error
= setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
, sizeof(flags
));
4256 perror("setsockopt");
4258 error
= setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
));
4260 perror("setsockopt");
4262 error
= setsockopt(sfd
, IPPROTO_TCP
, TCP_NODELAY
, (void *)&flags
, sizeof(flags
));
4264 perror("setsockopt");
4267 if (bind(sfd
, next
->ai_addr
, next
->ai_addrlen
) == -1) {
4268 if (errno
!= EADDRINUSE
) {
4278 if (!IS_UDP(transport
) && listen(sfd
, settings
.backlog
) == -1) {
4284 if (portnumber_file
!= NULL
&&
4285 (next
->ai_addr
->sa_family
== AF_INET
||
4286 next
->ai_addr
->sa_family
== AF_INET6
)) {
4288 struct sockaddr_in in
;
4289 struct sockaddr_in6 in6
;
4291 socklen_t len
= sizeof(my_sockaddr
);
4292 if (getsockname(sfd
, (struct sockaddr
*)&my_sockaddr
, &len
)==0) {
4293 if (next
->ai_addr
->sa_family
== AF_INET
) {
4294 fprintf(portnumber_file
, "%s INET: %u\n",
4295 IS_UDP(transport
) ? "UDP" : "TCP",
4296 ntohs(my_sockaddr
.in
.sin_port
));
4298 fprintf(portnumber_file
, "%s INET6: %u\n",
4299 IS_UDP(transport
) ? "UDP" : "TCP",
4300 ntohs(my_sockaddr
.in6
.sin6_port
));
4306 if (IS_UDP(transport
)) {
4309 for (c
= 0; c
< settings
.num_threads_per_udp
; c
++) {
4310 /* this is guaranteed to hit all threads because we round-robin */
4311 dispatch_conn_new(sfd
, conn_read
, EV_READ
| EV_PERSIST
,
4312 UDP_READ_BUFFER_SIZE
, transport
);
4315 if (!(listen_conn_add
= conn_new(sfd
, conn_listening
,
4316 EV_READ
| EV_PERSIST
, 1,
4317 transport
, main_base
))) {
4318 fprintf(stderr
, "failed to create listening connection\n");
4321 listen_conn_add
->next
= listen_conn
;
4322 listen_conn
= listen_conn_add
;
4328 /* Return zero iff we detected no errors in starting up connections */
4329 return success
== 0;
4332 static int server_sockets(int port
, enum network_transport transport
,
4333 FILE *portnumber_file
) {
4334 if (settings
.inter
== NULL
) {
4335 return server_socket(settings
.inter
, port
, transport
, portnumber_file
);
4337 // tokenize them and bind to each one of them..
4340 char *list
= strdup(settings
.inter
);
4343 fprintf(stderr
, "Failed to allocate memory for parsing server interface string\n");
4346 for (char *p
= strtok_r(list
, ";,", &b
);
4348 p
= strtok_r(NULL
, ";,", &b
)) {
4349 int the_port
= port
;
4350 char *s
= strchr(p
, ':');
4354 if (!safe_strtol(s
, &the_port
)) {
4355 fprintf(stderr
, "Invalid port number: \"%s\"", s
);
4359 if (strcmp(p
, "*") == 0) {
4362 ret
|= server_socket(p
, the_port
, transport
, portnumber_file
);
4369 static int new_socket_unix(void) {
4373 if ((sfd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) == -1) {
4378 if ((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0 ||
4379 fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
4380 perror("setting O_NONBLOCK");
4387 static int server_socket_unix(const char *path
, int access_mask
) {
4389 struct linger ling
= {0, 0};
4390 struct sockaddr_un addr
;
4399 if ((sfd
= new_socket_unix()) == -1) {
4404 * Clean up a previous socket file if we left it around
4406 if (lstat(path
, &tstat
) == 0) {
4407 if (S_ISSOCK(tstat
.st_mode
))
4411 setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
));
4412 setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
, sizeof(flags
));
4413 setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
));
4416 * the memset call clears nonstandard fields in some impementations
4417 * that otherwise mess things up.
4419 memset(&addr
, 0, sizeof(addr
));
4421 addr
.sun_family
= AF_UNIX
;
4422 strncpy(addr
.sun_path
, path
, sizeof(addr
.sun_path
) - 1);
4423 assert(strcmp(addr
.sun_path
, path
) == 0);
4424 old_umask
= umask( ~(access_mask
&0777));
4425 if (bind(sfd
, (struct sockaddr
*)&addr
, sizeof(addr
)) == -1) {
4432 if (listen(sfd
, settings
.backlog
) == -1) {
4437 if (!(listen_conn
= conn_new(sfd
, conn_listening
,
4438 EV_READ
| EV_PERSIST
, 1,
4439 local_transport
, main_base
))) {
4440 fprintf(stderr
, "failed to create listening connection\n");
4448 * We keep the current time of day in a global variable that's updated by a
4449 * timer event. This saves us a bunch of time() system calls (we really only
4450 * need to get the time once a second, whereas there can be tens of thousands
4451 * of requests a second) and allows us to use server-start-relative timestamps
4452 * rather than absolute UNIX timestamps, a space savings on systems where
4453 * sizeof(time_t) > sizeof(unsigned int).
4455 volatile rel_time_t current_time
;
4456 static struct event clockevent
;
4458 /* libevent uses a monotonic clock when available for event scheduling. Aside
4459 * from jitter, simply ticking our internal timer here is accurate enough.
4460 * Note that users who are setting explicit dates for expiration times *must*
4461 * ensure their clocks are correct before starting memcached. */
4462 static void clock_handler(const int fd
, const short which
, void *arg
) {
4463 struct timeval t
= {.tv_sec
= 1, .tv_usec
= 0};
4464 static bool initialized
= false;
4465 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4466 static bool monotonic
= false;
4467 static time_t monotonic_start
;
4471 /* only delete the event if it's actually there. */
4472 evtimer_del(&clockevent
);
4475 /* process_started is initialized to time() - 2. We initialize to 1 so
4476 * flush_all won't underflow during tests. */
4477 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4479 if (clock_gettime(CLOCK_MONOTONIC
, &ts
) == 0) {
4481 monotonic_start
= ts
.tv_sec
- 2;
4486 evtimer_set(&clockevent
, clock_handler
, 0);
4487 event_base_set(main_base
, &clockevent
);
4488 evtimer_add(&clockevent
, &t
);
4490 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4493 if (clock_gettime(CLOCK_MONOTONIC
, &ts
) == -1)
4495 current_time
= (rel_time_t
) (ts
.tv_sec
- monotonic_start
);
4501 gettimeofday(&tv
, NULL
);
4502 current_time
= (rel_time_t
) (tv
.tv_sec
- process_started
);
4506 static void usage(void) {
4507 printf(RPACKAGE
" " RVERSION
"\n");
4508 printf("-p <num> TCP port number to listen on (default: 11211)\n"
4509 "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
4510 "-s <file> UNIX socket path to listen on (disables network support)\n"
4511 "-a <mask> access mask for UNIX socket, in octal (default: 0700)\n"
4512 "-l <addr> interface to listen on (default: INADDR_ANY, all addresses)\n"
4513 " <addr> may be specified as host:port. If you don't specify\n"
4514 " a port number, the value you specified with -p or -U is\n"
4515 " used. You may specify multiple addresses separated by comma\n"
4516 " or by using -l multiple times\n"
4518 "-d run as a daemon\n"
4519 "-r maximize core file limit\n"
4520 "-u <username> assume identity of <username> (only when run as root)\n"
4521 "-m <num> max memory to use for items in megabytes (default: 64 MB)\n"
4522 "-M return error on memory exhausted (rather than removing items)\n"
4523 "-c <num> max simultaneous connections (default: 1024)\n"
4524 "-k lock down all paged memory. Note that there is a\n"
4525 " limit on how much memory you may lock. Trying to\n"
4526 " allocate more than that would fail, so be sure you\n"
4527 " set the limit correctly for the user you started\n"
4528 " the daemon with (not for -u <username> user;\n"
4529 " under sh this is done with 'ulimit -S -l NUM_KB').\n"
4530 "-v verbose (print errors/warnings while in event loop)\n"
4531 "-vv very verbose (also print client commands/reponses)\n"
4532 "-vvv extremely verbose (also print internal state transitions)\n"
4533 "-h print this help and exit\n"
4534 "-i print memcached and libevent license\n"
4535 "-P <file> save PID in <file>, only used with -d option\n"
4536 "-f <factor> chunk size growth factor (default: 1.25)\n"
4537 "-n <bytes> minimum space allocated for key+value+flags (default: 48)\n");
4538 printf("-L Try to use large memory pages (if available). Increasing\n"
4539 " the memory page size could reduce the number of TLB misses\n"
4540 " and improve the performance. In order to get large pages\n"
4541 " from the OS, memcached will allocate the total item-cache\n"
4542 " in one large chunk.\n");
4543 printf("-D <char> Use <char> as the delimiter between key prefixes and IDs.\n"
4544 " This is used for per-prefix stats reporting. The default is\n"
4545 " \":\" (colon). If this option is specified, stats collection\n"
4546 " is turned on automatically; if not, then it may be turned on\n"
4547 " by sending the \"stats detail on\" command to the server.\n");
4548 printf("-t <num> number of threads to use (default: 4)\n");
4549 printf("-R Maximum number of requests per event, limits the number of\n"
4550 " requests process for a given connection to prevent \n"
4551 " starvation (default: 20)\n");
4552 printf("-C Disable use of CAS\n");
4553 printf("-b Set the backlog queue limit (default: 1024)\n");
4554 printf("-B Binding protocol - one of ascii, binary, or auto (default)\n");
4555 printf("-I Override the size of each slab page. Adjusts max item size\n"
4556 " (default: 1mb, min: 1k, max: 128m)\n");
4558 printf("-S Turn on Sasl authentication\n");
4560 printf("-o Comma separated list of extended or experimental options\n"
4561 " - (EXPERIMENTAL) maxconns_fast: immediately close new\n"
4562 " connections if over maxconns limit\n"
4563 " - hashpower: An integer multiplier for how large the hash\n"
4564 " table should be. Can be grown at runtime if not big enough.\n"
4565 " Set this based on \"STAT hash_power_level\" before a \n"
4571 static void usage_license(void) {
4572 printf(RPACKAGE
" " RVERSION
"\n\n");
4574 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
4575 "All rights reserved.\n"
4577 "Redistribution and use in source and binary forms, with or without\n"
4578 "modification, are permitted provided that the following conditions are\n"
4581 " * Redistributions of source code must retain the above copyright\n"
4582 "notice, this list of conditions and the following disclaimer.\n"
4584 " * Redistributions in binary form must reproduce the above\n"
4585 "copyright notice, this list of conditions and the following disclaimer\n"
4586 "in the documentation and/or other materials provided with the\n"
4589 " * Neither the name of the Danga Interactive nor the names of its\n"
4590 "contributors may be used to endorse or promote products derived from\n"
4591 "this software without specific prior written permission.\n"
4593 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
4594 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
4595 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
4596 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
4597 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
4598 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
4599 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4600 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4601 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4602 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
4603 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4606 "This product includes software developed by Niels Provos.\n"
4610 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
4611 "All rights reserved.\n"
4613 "Redistribution and use in source and binary forms, with or without\n"
4614 "modification, are permitted provided that the following conditions\n"
4616 "1. Redistributions of source code must retain the above copyright\n"
4617 " notice, this list of conditions and the following disclaimer.\n"
4618 "2. Redistributions in binary form must reproduce the above copyright\n"
4619 " notice, this list of conditions and the following disclaimer in the\n"
4620 " documentation and/or other materials provided with the distribution.\n"
4621 "3. All advertising materials mentioning features or use of this software\n"
4622 " must display the following acknowledgement:\n"
4623 " This product includes software developed by Niels Provos.\n"
4624 "4. The name of the author may not be used to endorse or promote products\n"
4625 " derived from this software without specific prior written permission.\n"
4627 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
4628 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
4629 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
4630 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
4631 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
4632 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4633 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4634 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4635 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
4636 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4642 static void save_pid(const char *pid_file
) {
4644 if (access(pid_file
, F_OK
) == 0) {
4645 if ((fp
= fopen(pid_file
, "r")) != NULL
) {
4647 if (fgets(buffer
, sizeof(buffer
), fp
) != NULL
) {
4649 if (safe_strtoul(buffer
, &pid
) && kill((pid_t
)pid
, 0) == 0) {
4650 fprintf(stderr
, "WARNING: The pid file contained the following (running) pid: %u\n", pid
);
4657 if ((fp
= fopen(pid_file
, "w")) == NULL
) {
4658 vperror("Could not open the pid file %s for writing", pid_file
);
4662 fprintf(fp
,"%ld\n", (long)getpid());
4663 if (fclose(fp
) == -1) {
4664 vperror("Could not close the pid file %s", pid_file
);
4668 static void remove_pidfile(const char *pid_file
) {
4669 if (pid_file
== NULL
)
4672 if (unlink(pid_file
) != 0) {
4673 vperror("Could not remove the pid file %s", pid_file
);
4678 static void sig_handler(const int sig
) {
4679 printf("SIGINT handled.\n");
4683 #ifndef HAVE_SIGIGNORE
4684 static int sigignore(int sig
) {
4685 struct sigaction sa
= { .sa_handler
= SIG_IGN
, .sa_flags
= 0 };
4687 if (sigemptyset(&sa
.sa_mask
) == -1 || sigaction(sig
, &sa
, 0) == -1) {
4696 * On systems that supports multiple page sizes we may reduce the
4697 * number of TLB-misses by using the biggest available page size
4699 static int enable_large_pages(void) {
4700 #if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
4703 int avail
= getpagesizes(sizes
, 32);
4705 size_t max
= sizes
[0];
4706 struct memcntl_mha arg
= {0};
4709 for (ii
= 1; ii
< avail
; ++ii
) {
4710 if (max
< sizes
[ii
]) {
4716 arg
.mha_pagesize
= max
;
4717 arg
.mha_cmd
= MHA_MAPSIZE_BSSBRK
;
4719 if (memcntl(0, 0, MC_HAT_ADVISE
, (caddr_t
)&arg
, 0, 0) == -1) {
4720 fprintf(stderr
, "Failed to set large pages: %s\n",
4722 fprintf(stderr
, "Will use default page size\n");
4727 fprintf(stderr
, "Failed to get supported pagesizes: %s\n",
4729 fprintf(stderr
, "Will use default page size\n");
4739 * Do basic sanity check of the runtime environment
4740 * @return true if no errors found, false if we can't use this env
4742 static bool sanitycheck(void) {
4743 /* One of our biggest problems is old and bogus libevents */
4744 const char *ever
= event_get_version();
4746 if (strncmp(ever
, "1.", 2) == 0) {
4747 /* Require at least 1.3 (that's still a couple of years old) */
4748 if ((ever
[2] == '1' || ever
[2] == '2') && !isdigit(ever
[3])) {
4749 fprintf(stderr
, "You are using libevent %s.\nPlease upgrade to"
4750 " a more recent version (1.3 or newer)\n",
4751 event_get_version());
4760 int main (int argc
, char **argv
) {
4762 bool lock_memory
= false;
4763 bool do_daemonize
= false;
4764 bool preallocate
= false;
4766 char *username
= NULL
;
4767 char *pid_file
= NULL
;
4772 int retval
= EXIT_SUCCESS
;
4773 /* listening sockets */
4774 static int *l_socket
= NULL
;
4777 static int *u_socket
= NULL
;
4778 bool protocol_specified
= false;
4779 bool tcp_specified
= false;
4780 bool udp_specified
= false;
4783 char *subopts_value
;
4790 char *const subopts_tokens
[] = {
4791 [MAXCONNS_FAST
] = (char*)"maxconns_fast",
4792 [HASHPOWER_INIT
] = (char*)"hashpower",
4793 [SLAB_REASSIGN
] = (char*)"slab_reassign",
4794 [SLAB_AUTOMOVE
] = (char*)"slab_automove",
4798 if (!sanitycheck()) {
4803 signal(SIGINT
, sig_handler
);
4808 /* set stderr non-buffering (for running under, say, daemontools) */
4809 setbuf(stderr
, NULL
);
4811 /* process arguments */
4812 while (-1 != (c
= getopt(argc
, argv
,
4813 "a:" /* access mask for unix socket */
4814 "p:" /* TCP port number to listen on */
4815 "s:" /* unix socket path to listen on */
4816 "U:" /* UDP port number to listen on */
4817 "m:" /* max memory to use for items in megabytes */
4818 "M" /* return error on memory exhausted */
4819 "c:" /* max simultaneous connections */
4820 "k" /* lock down all paged memory */
4821 "hi" /* help, licence info */
4822 "r" /* maximize core file limit */
4824 "d" /* daemon mode */
4825 "l:" /* interface to listen on */
4826 "u:" /* user identity to run as */
4827 "P:" /* save PID in file */
4829 "n:" /* minimum space allocated for key+value+flags */
4831 "D:" /* prefix delimiter? */
4832 "L" /* Large memory pages */
4833 "R:" /* max requests per event */
4834 "C" /* Disable use of CAS */
4835 "b:" /* backlog queue limit */
4836 "B:" /* Binding protocol */
4837 "I:" /* Max item size */
4839 "o:" /* Extended generic options */
4843 /* access for unix domain socket, as octal mask (like chmod)*/
4844 settings
.access
= strtol(optarg
,NULL
,8);
4848 settings
.udpport
= atoi(optarg
);
4849 udp_specified
= true;
4852 settings
.port
= atoi(optarg
);
4853 tcp_specified
= true;
4856 settings
.socketpath
= optarg
;
4859 settings
.maxbytes
= ((size_t)atoi(optarg
)) * 1024 * 1024;
4862 settings
.evict_to_free
= 0;
4865 settings
.maxconns
= atoi(optarg
);
4880 if (settings
.inter
!= NULL
) {
4881 size_t len
= strlen(settings
.inter
) + strlen(optarg
) + 2;
4882 char *p
= malloc(len
);
4884 fprintf(stderr
, "Failed to allocate memory\n");
4887 snprintf(p
, len
, "%s,%s", settings
.inter
, optarg
);
4888 free(settings
.inter
);
4891 settings
.inter
= strdup(optarg
);
4895 do_daemonize
= true;
4901 settings
.reqs_per_event
= atoi(optarg
);
4902 if (settings
.reqs_per_event
== 0) {
4903 fprintf(stderr
, "Number of requests per event must be greater than 0\n");
4914 settings
.factor
= atof(optarg
);
4915 if (settings
.factor
<= 1.0) {
4916 fprintf(stderr
, "Factor must be greater than 1\n");
4921 settings
.chunk_size
= atoi(optarg
);
4922 if (settings
.chunk_size
== 0) {
4923 fprintf(stderr
, "Chunk size must be greater than 0\n");
4928 settings
.num_threads
= atoi(optarg
);
4929 if (settings
.num_threads
<= 0) {
4930 fprintf(stderr
, "Number of threads must be greater than 0\n");
4933 /* There're other problems when you get above 64 threads.
4934 * In the future we should portably detect # of cores for the
4937 if (settings
.num_threads
> 64) {
4938 fprintf(stderr
, "WARNING: Setting a high number of worker"
4939 "threads is not recommended.\n"
4940 " Set this value to the number of cores in"
4941 " your machine or less.\n");
4945 if (! optarg
|| ! optarg
[0]) {
4946 fprintf(stderr
, "No delimiter specified\n");
4949 settings
.prefix_delimiter
= optarg
[0];
4950 settings
.detail_enabled
= 1;
4953 if (enable_large_pages() == 0) {
4958 settings
.use_cas
= false;
4961 settings
.backlog
= atoi(optarg
);
4964 protocol_specified
= true;
4965 if (strcmp(optarg
, "auto") == 0) {
4966 settings
.binding_protocol
= negotiating_prot
;
4967 } else if (strcmp(optarg
, "binary") == 0) {
4968 settings
.binding_protocol
= binary_prot
;
4969 } else if (strcmp(optarg
, "ascii") == 0) {
4970 settings
.binding_protocol
= ascii_prot
;
4972 fprintf(stderr
, "Invalid value for binding protocol: %s\n"
4973 " -- should be one of auto, binary, or ascii\n", optarg
);
4978 unit
= optarg
[strlen(optarg
)-1];
4979 if (unit
== 'k' || unit
== 'm' ||
4980 unit
== 'K' || unit
== 'M') {
4981 optarg
[strlen(optarg
)-1] = '\0';
4982 size_max
= atoi(optarg
);
4983 if (unit
== 'k' || unit
== 'K')
4985 if (unit
== 'm' || unit
== 'M')
4986 size_max
*= 1024 * 1024;
4987 settings
.item_size_max
= size_max
;
4989 settings
.item_size_max
= atoi(optarg
);
4991 if (settings
.item_size_max
< 1024) {
4992 fprintf(stderr
, "Item max size cannot be less than 1024 bytes.\n");
4995 if (settings
.item_size_max
> 1024 * 1024 * 128) {
4996 fprintf(stderr
, "Cannot set item size limit higher than 128 mb.\n");
4999 if (settings
.item_size_max
> 1024 * 1024) {
5000 fprintf(stderr
, "WARNING: Setting item max size above 1MB is not"
5002 " Raising this limit increases the minimum memory requirements\n"
5003 " and will decrease your memory efficiency.\n"
5007 case 'S': /* set Sasl authentication to true. Default is false */
5009 fprintf(stderr
, "This server is not built with SASL support.\n");
5012 settings
.sasl
= true;
5014 case 'o': /* It's sub-opts time! */
5017 while (*subopts
!= '\0') {
5019 switch (getsubopt(&subopts
, subopts_tokens
, &subopts_value
)) {
5021 settings
.maxconns_fast
= true;
5023 case HASHPOWER_INIT
:
5024 if (subopts_value
== NULL
) {
5025 fprintf(stderr
, "Missing numeric argument for hashpower\n");
5028 settings
.hashpower_init
= atoi(subopts_value
);
5029 if (settings
.hashpower_init
< 12) {
5030 fprintf(stderr
, "Initial hashtable multiplier of %d is too low\n",
5031 settings
.hashpower_init
);
5033 } else if (settings
.hashpower_init
> 64) {
5034 fprintf(stderr
, "Initial hashtable multiplier of %d is too high\n"
5035 "Choose a value based on \"STAT hash_power_level\" from a running instance\n",
5036 settings
.hashpower_init
);
5041 settings
.slab_reassign
= true;
5044 settings
.slab_automove
= true;
5047 printf("Illegal suboption \"%s\"\n", subopts_value
);
5054 fprintf(stderr
, "Illegal argument \"%c\"\n", c
);
5060 * Use one workerthread to serve each UDP port if the user specified
5063 if (settings
.inter
!= NULL
&& strchr(settings
.inter
, ',')) {
5064 settings
.num_threads_per_udp
= 1;
5066 settings
.num_threads_per_udp
= settings
.num_threads
;
5069 if (settings
.sasl
) {
5070 if (!protocol_specified
) {
5071 settings
.binding_protocol
= binary_prot
;
5073 if (settings
.binding_protocol
!= binary_prot
) {
5074 fprintf(stderr
, "ERROR: You cannot allow the ASCII protocol while using SASL.\n");
5080 if (tcp_specified
&& !udp_specified
) {
5081 settings
.udpport
= settings
.port
;
5082 } else if (udp_specified
&& !tcp_specified
) {
5083 settings
.port
= settings
.udpport
;
5087 struct rlimit rlim_new
;
5089 * First try raising to infinity; if that fails, try bringing
5090 * the soft limit to the hard.
5092 if (getrlimit(RLIMIT_CORE
, &rlim
) == 0) {
5093 rlim_new
.rlim_cur
= rlim_new
.rlim_max
= RLIM_INFINITY
;
5094 if (setrlimit(RLIMIT_CORE
, &rlim_new
)!= 0) {
5095 /* failed. try raising just to the old max */
5096 rlim_new
.rlim_cur
= rlim_new
.rlim_max
= rlim
.rlim_max
;
5097 (void)setrlimit(RLIMIT_CORE
, &rlim_new
);
5101 * getrlimit again to see what we ended up with. Only fail if
5102 * the soft limit ends up 0, because then no core files will be
5106 if ((getrlimit(RLIMIT_CORE
, &rlim
) != 0) || rlim
.rlim_cur
== 0) {
5107 fprintf(stderr
, "failed to ensure corefile creation\n");
5113 * If needed, increase rlimits to allow as many connections
5117 if (getrlimit(RLIMIT_NOFILE
, &rlim
) != 0) {
5118 fprintf(stderr
, "failed to getrlimit number of files\n");
5121 rlim
.rlim_cur
= settings
.maxconns
;
5122 rlim
.rlim_max
= settings
.maxconns
;
5123 if (setrlimit(RLIMIT_NOFILE
, &rlim
) != 0) {
5124 fprintf(stderr
, "failed to set rlimit for open files. Try starting as root or requesting smaller maxconns value.\n");
5129 /* lose root privileges if we have them */
5130 if (getuid() == 0 || geteuid() == 0) {
5131 if (username
== 0 || *username
== '\0') {
5132 fprintf(stderr
, "can't run as root without the -u switch\n");
5135 if ((pw
= getpwnam(username
)) == 0) {
5136 fprintf(stderr
, "can't find the user %s to switch to\n", username
);
5139 if (setgid(pw
->pw_gid
) < 0 || setuid(pw
->pw_uid
) < 0) {
5140 fprintf(stderr
, "failed to assume identity of user %s\n", username
);
5145 /* Initialize Sasl if -S was specified */
5146 if (settings
.sasl
) {
5150 /* daemonize if requested */
5151 /* if we want to ensure our ability to dump core, don't chdir to / */
5153 if (sigignore(SIGHUP
) == -1) {
5154 perror("Failed to ignore SIGHUP");
5156 if (daemonize(maxcore
, settings
.verbose
) == -1) {
5157 fprintf(stderr
, "failed to daemon() in order to daemonize\n");
5162 /* lock paged memory if needed */
5164 #ifdef HAVE_MLOCKALL
5165 int res
= mlockall(MCL_CURRENT
| MCL_FUTURE
);
5167 fprintf(stderr
, "warning: -k invalid, mlockall() failed: %s\n",
5171 fprintf(stderr
, "warning: -k invalid, mlockall() not supported on this platform. proceeding without.\n");
5175 /* initialize main thread libevent instance */
5176 main_base
= event_init();
5178 /* initialize other stuff */
5180 assoc_init(settings
.hashpower_init
);
5182 slabs_init(settings
.maxbytes
, settings
.factor
, preallocate
);
5185 * ignore SIGPIPE signals; we can use errno == EPIPE if we
5186 * need that information
5188 if (sigignore(SIGPIPE
) == -1) {
5189 perror("failed to ignore SIGPIPE; sigaction");
5192 /* start up worker threads if MT mode */
5193 thread_init(settings
.num_threads
, main_base
);
5195 if (start_assoc_maintenance_thread() == -1) {
5199 if (settings
.slab_reassign
&&
5200 start_slab_maintenance_thread() == -1) {
5204 /* initialise clock event */
5205 clock_handler(0, 0, 0);
5207 /* create unix mode sockets after dropping privileges */
5208 if (settings
.socketpath
!= NULL
) {
5210 if (server_socket_unix(settings
.socketpath
,settings
.access
)) {
5211 vperror("failed to listen on UNIX socket: %s", settings
.socketpath
);
5216 /* create the listening socket, bind it, and init */
5217 if (settings
.socketpath
== NULL
) {
5218 const char *portnumber_filename
= getenv("MEMCACHED_PORT_FILENAME");
5219 char temp_portnumber_filename
[PATH_MAX
];
5220 FILE *portnumber_file
= NULL
;
5222 if (portnumber_filename
!= NULL
) {
5223 snprintf(temp_portnumber_filename
,
5224 sizeof(temp_portnumber_filename
),
5225 "%s.lck", portnumber_filename
);
5227 portnumber_file
= fopen(temp_portnumber_filename
, "a");
5228 if (portnumber_file
== NULL
) {
5229 fprintf(stderr
, "Failed to open \"%s\": %s\n",
5230 temp_portnumber_filename
, strerror(errno
));
5235 if (settings
.port
&& server_sockets(settings
.port
, tcp_transport
,
5237 vperror("failed to listen on TCP port %d", settings
.port
);
5242 * initialization order: first create the listening sockets
5243 * (may need root on low ports), then drop root if needed,
5244 * then daemonise if needed, then init libevent (in some cases
5245 * descriptors created by libevent wouldn't survive forking).
5248 /* create the UDP listening socket and bind it */
5250 if (settings
.udpport
&& server_sockets(settings
.udpport
, udp_transport
,
5252 vperror("failed to listen on UDP port %d", settings
.udpport
);
5256 if (portnumber_file
) {
5257 fclose(portnumber_file
);
5258 rename(temp_portnumber_filename
, portnumber_filename
);
5262 /* Give the sockets a moment to open. I know this is dumb, but the error
5263 * is only an advisory.
5266 if (stats
.curr_conns
+ stats
.reserved_fds
>= settings
.maxconns
- 1) {
5267 fprintf(stderr
, "Maxconns setting is too low, use -c to increase.\n");
5271 if (pid_file
!= NULL
) {
5275 /* Drop privileges no longer needed */
5278 /* enter the event loop */
5279 if (event_base_loop(main_base
, 0) != 0) {
5280 retval
= EXIT_FAILURE
;
5283 stop_assoc_maintenance_thread();
5285 /* remove the PID file if we're a daemon */
5288 remove_pidfile(pid_file
);
5290 /* Clean up strdup() call for bind() address */
5292 free(settings
.inter
);