1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * memcached - memory caching daemon
5 * http://www.danga.com/memcached/
7 * Copyright 2003 Danga Interactive, Inc. All rights reserved.
9 * Use and distribution licensed under the BSD license. See
10 * the LICENSE file for full text.
13 * Anatoly Vorobey <mellon@pobox.com>
14 * Brad Fitzpatrick <brad@danga.com>
16 #include "memcached.h"
18 #include <sys/socket.h>
21 #include <sys/resource.h>
26 /* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h. */
28 #ifndef _P1003_1B_VISIBLE
29 #define _P1003_1B_VISIBLE
31 /* need this to get IOV_MAX on some platforms. */
32 #ifndef __need_IOV_MAX
33 #define __need_IOV_MAX
38 #include <netinet/tcp.h>
39 #include <arpa/inet.h>
50 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
52 #if defined(__FreeBSD__) || defined(__APPLE__)
58 * forward declarations
60 static void drive_machine(conn
*c
);
61 static int new_socket(struct addrinfo
*ai
);
62 static int try_read_command(conn
*c
);
64 enum try_read_result
{
66 READ_NO_DATA_RECEIVED
,
67 READ_ERROR
, /** an error occurred (on the socket) (or client closed connection) */
68 READ_MEMORY_ERROR
/** failed to allocate more memory */
71 static enum try_read_result
try_read_network(conn
*c
);
72 static enum try_read_result
try_read_udp(conn
*c
);
74 static void conn_set_state(conn
*c
, enum conn_states state
);
77 static void stats_init(void);
78 static void server_stats(ADD_STAT add_stats
, conn
*c
);
79 static void process_stat_settings(ADD_STAT add_stats
, void *c
);
83 static void settings_init(void);
85 /* event handling, network IO */
86 static void event_handler(const int fd
, const short which
, void *arg
);
87 static void conn_close(conn
*c
);
88 static void conn_init(void);
89 static bool update_event(conn
*c
, const int new_flags
);
90 static void complete_nread(conn
*c
);
91 static void process_command(conn
*c
, char *command
);
92 static void write_and_free(conn
*c
, char *buf
, int bytes
);
93 static int ensure_iov_space(conn
*c
);
94 static int add_iov(conn
*c
, const void *buf
, int len
);
95 static int add_msghdr(conn
*c
);
98 static void conn_free(conn
*c
);
100 /** exported globals **/
102 struct settings settings
;
103 time_t process_started
; /* when the process was started */
105 struct slab_rebalance slab_rebal
;
106 volatile int slab_rebalance_signal
;
108 /** file scope variables **/
109 static conn
*listen_conn
= NULL
;
110 static struct event_base
*main_base
;
/*
 * Result codes of the transmit() routine declared right after this enum.
 * The enumerators keep their implicit values (0..3) and their order.
 */
enum transmit_result {
    /** All done writing. */
    TRANSMIT_COMPLETE,
    /** More data remaining to write. */
    TRANSMIT_INCOMPLETE,
    /** Can't write any more right now. */
    TRANSMIT_SOFT_ERROR,
    /** Can't write (c->state is set to conn_closing) */
    TRANSMIT_HARD_ERROR
};
119 static enum transmit_result
transmit(conn
*c
);
121 /* This reduces the latency without adding lots of extra wiring to be able to
122 * notify the listener thread of when to listen again.
123 * Also, the clock timer could be broken out into its own thread and we
124 * can block the listener via a condition.
126 static volatile bool allow_new_conns
= true;
127 static struct event maxconnsevent
;
128 static void maxconns_handler(const int fd
, const short which
, void *arg
) {
129 struct timeval t
= {.tv_sec
= 0, .tv_usec
= 10000};
131 if (fd
== -42 || allow_new_conns
== false) {
132 /* reschedule in 10ms if we need to keep polling */
133 evtimer_set(&maxconnsevent
, maxconns_handler
, 0);
134 event_base_set(main_base
, &maxconnsevent
);
135 evtimer_add(&maxconnsevent
, &t
);
137 evtimer_del(&maxconnsevent
);
138 accept_new_conns(true);
142 #define REALTIME_MAXDELTA 60*60*24*30
145 * given time value that's either unix time or delta from current unix time, return
146 * unix time. Use the fact that delta can't exceed one month (and real time value can't
149 static rel_time_t
realtime(const time_t exptime
) {
150 /* no. of seconds in 30 days - largest possible delta exptime */
152 if (exptime
== 0) return 0; /* 0 means never expire */
154 if (exptime
> REALTIME_MAXDELTA
) {
155 /* if item expiration is at/before the server started, give it an
156 expiration time of 1 second after the server started.
157 (because 0 means don't expire). without this, we'd
158 underflow and wrap around to some large value way in the
159 future, effectively making items expiring in the past
160 really expiring never */
161 if (exptime
<= process_started
)
162 return (rel_time_t
)1;
163 return (rel_time_t
)(exptime
- process_started
);
165 return (rel_time_t
)(exptime
+ current_time
);
169 static void stats_init(void) {
170 stats
.curr_items
= stats
.total_items
= stats
.curr_conns
= stats
.total_conns
= stats
.conn_structs
= 0;
171 stats
.get_cmds
= stats
.set_cmds
= stats
.get_hits
= stats
.get_misses
= stats
.evictions
= stats
.reclaimed
= 0;
172 stats
.touch_cmds
= stats
.touch_misses
= stats
.touch_hits
= stats
.rejected_conns
= 0;
173 stats
.curr_bytes
= stats
.listen_disabled_num
= 0;
174 stats
.hash_power_level
= stats
.hash_bytes
= stats
.hash_is_expanding
= 0;
175 stats
.expired_unfetched
= stats
.evicted_unfetched
= 0;
176 stats
.slabs_moved
= 0;
177 stats
.accepting_conns
= true; /* assuming we start in this state. */
178 stats
.slab_reassign_running
= false;
180 /* make the time we started always be 2 seconds before we really
181 did, so time(0) - time.started is never zero. if so, things
182 like 'settings.oldest_live' which act as booleans as well as
183 values are now false in boolean context... */
184 process_started
= time(0) - 2;
188 static void stats_reset(void) {
190 stats
.total_items
= stats
.total_conns
= 0;
191 stats
.rejected_conns
= 0;
194 stats
.listen_disabled_num
= 0;
195 stats_prefix_clear();
197 threadlocal_stats_reset();
201 static void settings_init(void) {
202 settings
.use_cas
= true;
203 settings
.access
= 0700;
204 settings
.port
= 11211;
205 settings
.udpport
= 11211;
206 /* By default this string should be NULL for getaddrinfo() */
207 settings
.inter
= NULL
;
208 settings
.maxbytes
= 64 * 1024 * 1024; /* default is 64MB */
209 settings
.maxconns
= 1024; /* to limit connections-related memory to about 5MB */
210 settings
.verbose
= 0;
211 settings
.oldest_live
= 0;
212 settings
.evict_to_free
= 1; /* push old items out of cache when memory runs out */
213 settings
.socketpath
= NULL
; /* by default, not using a unix socket */
214 settings
.factor
= 1.25;
215 settings
.chunk_size
= 48; /* space for a modest key and value */
216 settings
.num_threads
= 4; /* N workers */
217 settings
.num_threads_per_udp
= 0;
218 settings
.prefix_delimiter
= ':';
219 settings
.detail_enabled
= 0;
220 settings
.reqs_per_event
= 20;
221 settings
.backlog
= 1024;
222 settings
.binding_protocol
= negotiating_prot
;
223 settings
.item_size_max
= 1024 * 1024; /* The famous 1MB upper limit. */
224 settings
.maxconns_fast
= false;
225 settings
.hashpower_init
= 0;
226 settings
.slab_reassign
= false;
227 settings
.slab_automove
= 0;
231 * Adds a message header to a connection.
233 * Returns 0 on success, -1 on out-of-memory.
235 static int add_msghdr(conn
*c
)
241 if (c
->msgsize
== c
->msgused
) {
242 msg
= realloc(c
->msglist
, c
->msgsize
* 2 * sizeof(struct msghdr
));
249 msg
= c
->msglist
+ c
->msgused
;
251 /* this wipes msg_iovlen, msg_control, msg_controllen, and
252 msg_flags, the last 3 of which aren't defined on solaris: */
253 memset(msg
, 0, sizeof(struct msghdr
));
255 msg
->msg_iov
= &c
->iov
[c
->iovused
];
257 if (c
->request_addr_size
> 0) {
258 msg
->msg_name
= &c
->request_addr
;
259 msg
->msg_namelen
= c
->request_addr_size
;
265 if (IS_UDP(c
->transport
)) {
266 /* Leave room for the UDP header, which we'll fill in later. */
267 return add_iov(c
, NULL
, UDP_HEADER_SIZE
);
275 * Free list management for connections.
278 static conn
**freeconns
;
279 static int freetotal
;
281 /* Lock for connection freelist */
282 static pthread_mutex_t conn_lock
= PTHREAD_MUTEX_INITIALIZER
;
285 static void conn_init(void) {
288 if ((freeconns
= calloc(freetotal
, sizeof(conn
*))) == NULL
) {
289 fprintf(stderr
, "Failed to allocate connection structures\n");
295 * Returns a connection from the freelist, if any.
297 conn
*conn_from_freelist() {
300 pthread_mutex_lock(&conn_lock
);
302 c
= freeconns
[--freecurr
];
306 pthread_mutex_unlock(&conn_lock
);
312 * Adds a connection to the freelist. 0 = success.
314 bool conn_add_to_freelist(conn
*c
) {
316 pthread_mutex_lock(&conn_lock
);
317 if (freecurr
< freetotal
) {
318 freeconns
[freecurr
++] = c
;
321 /* try to enlarge free connections array */
322 size_t newsize
= freetotal
* 2;
323 conn
**new_freeconns
= realloc(freeconns
, sizeof(conn
*) * newsize
);
326 freeconns
= new_freeconns
;
327 freeconns
[freecurr
++] = c
;
331 pthread_mutex_unlock(&conn_lock
);
335 static const char *prot_text(enum protocol prot
) {
336 char *rv
= "unknown";
344 case negotiating_prot
:
345 rv
= "auto-negotiate";
351 conn
*conn_new(const int sfd
, enum conn_states init_state
,
352 const int event_flags
,
353 const int read_buffer_size
, enum network_transport transport
,
354 struct event_base
*base
) {
355 conn
*c
= conn_from_freelist();
358 if (!(c
= (conn
*)calloc(1, sizeof(conn
)))) {
359 fprintf(stderr
, "calloc()\n");
362 MEMCACHED_CONN_CREATE(c
);
364 c
->rbuf
= c
->wbuf
= 0;
371 c
->rsize
= read_buffer_size
;
372 c
->wsize
= DATA_BUFFER_SIZE
;
373 c
->isize
= ITEM_LIST_INITIAL
;
374 c
->suffixsize
= SUFFIX_LIST_INITIAL
;
375 c
->iovsize
= IOV_LIST_INITIAL
;
376 c
->msgsize
= MSG_LIST_INITIAL
;
379 c
->rbuf
= (char *)malloc((size_t)c
->rsize
);
380 c
->wbuf
= (char *)malloc((size_t)c
->wsize
);
381 c
->ilist
= (item
**)malloc(sizeof(item
*) * c
->isize
);
382 c
->suffixlist
= (char **)malloc(sizeof(char *) * c
->suffixsize
);
383 c
->iov
= (struct iovec
*)malloc(sizeof(struct iovec
) * c
->iovsize
);
384 c
->msglist
= (struct msghdr
*)malloc(sizeof(struct msghdr
) * c
->msgsize
);
386 if (c
->rbuf
== 0 || c
->wbuf
== 0 || c
->ilist
== 0 || c
->iov
== 0 ||
387 c
->msglist
== 0 || c
->suffixlist
== 0) {
389 fprintf(stderr
, "malloc()\n");
394 stats
.conn_structs
++;
398 c
->transport
= transport
;
399 c
->protocol
= settings
.binding_protocol
;
401 /* unix socket mode doesn't need this, so zeroed out. but why
402 * is this done for every command? presumably for UDP
404 if (!settings
.socketpath
) {
405 c
->request_addr_size
= sizeof(c
->request_addr
);
407 c
->request_addr_size
= 0;
410 if (settings
.verbose
> 1) {
411 if (init_state
== conn_listening
) {
412 fprintf(stderr
, "<%d server listening (%s)\n", sfd
,
413 prot_text(c
->protocol
));
414 } else if (IS_UDP(transport
)) {
415 fprintf(stderr
, "<%d server listening (udp)\n", sfd
);
416 } else if (c
->protocol
== negotiating_prot
) {
417 fprintf(stderr
, "<%d new auto-negotiating client connection\n",
419 } else if (c
->protocol
== ascii_prot
) {
420 fprintf(stderr
, "<%d new ascii client connection.\n", sfd
);
421 } else if (c
->protocol
== binary_prot
) {
422 fprintf(stderr
, "<%d new binary client connection.\n", sfd
);
424 fprintf(stderr
, "<%d new unknown (%d) client connection\n",
431 c
->state
= init_state
;
434 c
->rbytes
= c
->wbytes
= 0;
439 c
->suffixcurr
= c
->suffixlist
;
446 c
->write_and_go
= init_state
;
447 c
->write_and_free
= 0;
452 event_set(&c
->event
, sfd
, event_flags
, event_handler
, (void *)c
);
453 event_base_set(base
, &c
->event
);
454 c
->ev_flags
= event_flags
;
456 if (event_add(&c
->event
, 0) == -1) {
457 if (conn_add_to_freelist(c
)) {
469 MEMCACHED_CONN_ALLOCATE(c
->sfd
);
474 static void conn_cleanup(conn
*c
) {
478 item_remove(c
->item
);
483 for (; c
->ileft
> 0; c
->ileft
--,c
->icurr
++) {
484 item_remove(*(c
->icurr
));
488 if (c
->suffixleft
!= 0) {
489 for (; c
->suffixleft
> 0; c
->suffixleft
--, c
->suffixcurr
++) {
490 cache_free(c
->thread
->suffix_cache
, *(c
->suffixcurr
));
494 if (c
->write_and_free
) {
495 free(c
->write_and_free
);
496 c
->write_and_free
= 0;
500 assert(settings
.sasl
);
501 sasl_dispose(&c
->sasl_conn
);
505 if (IS_UDP(c
->transport
)) {
506 conn_set_state(c
, conn_read
);
511 * Frees a connection.
513 void conn_free(conn
*c
) {
515 MEMCACHED_CONN_DESTROY(c
);
534 static void conn_close(conn
*c
) {
537 /* delete the event, the socket and the conn */
538 event_del(&c
->event
);
540 if (settings
.verbose
> 1)
541 fprintf(stderr
, "<%d connection closed.\n", c
->sfd
);
543 MEMCACHED_CONN_RELEASE(c
->sfd
);
545 pthread_mutex_lock(&conn_lock
);
546 allow_new_conns
= true;
547 pthread_mutex_unlock(&conn_lock
);
550 /* if the connection has big buffers, just free it */
551 if (c
->rsize
> READ_BUFFER_HIGHWAT
|| conn_add_to_freelist(c
)) {
563 * Shrinks a connection's buffers if they're too big. This prevents
564 * periodic large "get" requests from permanently chewing lots of server
567 * This should only be called in between requests since it can wipe output
570 static void conn_shrink(conn
*c
) {
573 if (IS_UDP(c
->transport
))
576 if (c
->rsize
> READ_BUFFER_HIGHWAT
&& c
->rbytes
< DATA_BUFFER_SIZE
) {
579 if (c
->rcurr
!= c
->rbuf
)
580 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
582 newbuf
= (char *)realloc((void *)c
->rbuf
, DATA_BUFFER_SIZE
);
586 c
->rsize
= DATA_BUFFER_SIZE
;
588 /* TODO check other branch... */
592 if (c
->isize
> ITEM_LIST_HIGHWAT
) {
593 item
**newbuf
= (item
**) realloc((void *)c
->ilist
, ITEM_LIST_INITIAL
* sizeof(c
->ilist
[0]));
596 c
->isize
= ITEM_LIST_INITIAL
;
598 /* TODO check error condition? */
601 if (c
->msgsize
> MSG_LIST_HIGHWAT
) {
602 struct msghdr
*newbuf
= (struct msghdr
*) realloc((void *)c
->msglist
, MSG_LIST_INITIAL
* sizeof(c
->msglist
[0]));
605 c
->msgsize
= MSG_LIST_INITIAL
;
607 /* TODO check error condition? */
610 if (c
->iovsize
> IOV_LIST_HIGHWAT
) {
611 struct iovec
*newbuf
= (struct iovec
*) realloc((void *)c
->iov
, IOV_LIST_INITIAL
* sizeof(c
->iov
[0]));
614 c
->iovsize
= IOV_LIST_INITIAL
;
616 /* TODO check return value */
621 * Convert a state name to a human readable form.
623 static const char *state_text(enum conn_states state
) {
624 const char* const statenames
[] = { "conn_listening",
634 return statenames
[state
];
638 * Sets a connection's current state in the state machine. Any special
639 * processing that needs to happen on certain state transitions can
642 static void conn_set_state(conn
*c
, enum conn_states state
) {
644 assert(state
>= conn_listening
&& state
< conn_max_state
);
646 if (state
!= c
->state
) {
647 if (settings
.verbose
> 2) {
648 fprintf(stderr
, "%d: going from %s to %s\n",
649 c
->sfd
, state_text(c
->state
),
653 if (state
== conn_write
|| state
== conn_mwrite
) {
654 MEMCACHED_PROCESS_COMMAND_END(c
->sfd
, c
->wbuf
, c
->wbytes
);
661 * Ensures that there is room for another struct iovec in a connection's
664 * Returns 0 on success, -1 on out-of-memory.
666 static int ensure_iov_space(conn
*c
) {
669 if (c
->iovused
>= c
->iovsize
) {
671 struct iovec
*new_iov
= (struct iovec
*)realloc(c
->iov
,
672 (c
->iovsize
* 2) * sizeof(struct iovec
));
678 /* Point all the msghdr structures at the new list. */
679 for (i
= 0, iovnum
= 0; i
< c
->msgused
; i
++) {
680 c
->msglist
[i
].msg_iov
= &c
->iov
[iovnum
];
681 iovnum
+= c
->msglist
[i
].msg_iovlen
;
690 * Adds data to the list of pending data that will be written out to a
693 * Returns 0 on success, -1 on out-of-memory.
696 static int add_iov(conn
*c
, const void *buf
, int len
) {
704 m
= &c
->msglist
[c
->msgused
- 1];
707 * Limit UDP packets, and the first payloads of TCP replies, to
708 * UDP_MAX_PAYLOAD_SIZE bytes.
710 limit_to_mtu
= IS_UDP(c
->transport
) || (1 == c
->msgused
);
712 /* We may need to start a new msghdr if this one is full. */
713 if (m
->msg_iovlen
== IOV_MAX
||
714 (limit_to_mtu
&& c
->msgbytes
>= UDP_MAX_PAYLOAD_SIZE
)) {
716 m
= &c
->msglist
[c
->msgused
- 1];
719 if (ensure_iov_space(c
) != 0)
722 /* If the fragment is too big to fit in the datagram, split it up */
723 if (limit_to_mtu
&& len
+ c
->msgbytes
> UDP_MAX_PAYLOAD_SIZE
) {
724 leftover
= len
+ c
->msgbytes
- UDP_MAX_PAYLOAD_SIZE
;
730 m
= &c
->msglist
[c
->msgused
- 1];
731 m
->msg_iov
[m
->msg_iovlen
].iov_base
= (void *)buf
;
732 m
->msg_iov
[m
->msg_iovlen
].iov_len
= len
;
738 buf
= ((char *)buf
) + len
;
740 } while (leftover
> 0);
747 * Constructs a set of UDP headers and attaches them to the outgoing messages.
749 static int build_udp_headers(conn
*c
) {
755 if (c
->msgused
> c
->hdrsize
) {
758 new_hdrbuf
= realloc(c
->hdrbuf
, c
->msgused
* 2 * UDP_HEADER_SIZE
);
760 new_hdrbuf
= malloc(c
->msgused
* 2 * UDP_HEADER_SIZE
);
763 c
->hdrbuf
= (unsigned char *)new_hdrbuf
;
764 c
->hdrsize
= c
->msgused
* 2;
768 for (i
= 0; i
< c
->msgused
; i
++) {
769 c
->msglist
[i
].msg_iov
[0].iov_base
= (void*)hdr
;
770 c
->msglist
[i
].msg_iov
[0].iov_len
= UDP_HEADER_SIZE
;
771 *hdr
++ = c
->request_id
/ 256;
772 *hdr
++ = c
->request_id
% 256;
775 *hdr
++ = c
->msgused
/ 256;
776 *hdr
++ = c
->msgused
% 256;
779 assert((void *) hdr
== (caddr_t
)c
->msglist
[i
].msg_iov
[0].iov_base
+ UDP_HEADER_SIZE
);
786 static void out_string(conn
*c
, const char *str
) {
792 if (settings
.verbose
> 1)
793 fprintf(stderr
, ">%d NOREPLY %s\n", c
->sfd
, str
);
795 conn_set_state(c
, conn_new_cmd
);
799 if (settings
.verbose
> 1)
800 fprintf(stderr
, ">%d %s\n", c
->sfd
, str
);
802 /* Nuke a partial output... */
809 if ((len
+ 2) > c
->wsize
) {
810 /* ought to be always enough. just fail for simplicity */
811 str
= "SERVER_ERROR output line too long";
815 memcpy(c
->wbuf
, str
, len
);
816 memcpy(c
->wbuf
+ len
, "\r\n", 2);
820 conn_set_state(c
, conn_write
);
821 c
->write_and_go
= conn_new_cmd
;
826 * we get here after reading the value in set/add/replace commands. The command
827 * has been stored in c->cmd, and the item is ready in c->item.
829 static void complete_nread_ascii(conn
*c
) {
834 enum store_item_type ret
;
836 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
837 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].set_cmds
++;
838 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
840 if (strncmp(ITEM_data(it
) + it
->nbytes
- 2, "\r\n", 2) != 0) {
841 out_string(c
, "CLIENT_ERROR bad data chunk");
843 ret
= store_item(it
, comm
, c
);
846 uint64_t cas
= ITEM_get_cas(it
);
849 MEMCACHED_COMMAND_ADD(c
->sfd
, ITEM_key(it
), it
->nkey
,
850 (ret
== 1) ? it
->nbytes
: -1, cas
);
853 MEMCACHED_COMMAND_REPLACE(c
->sfd
, ITEM_key(it
), it
->nkey
,
854 (ret
== 1) ? it
->nbytes
: -1, cas
);
857 MEMCACHED_COMMAND_APPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
858 (ret
== 1) ? it
->nbytes
: -1, cas
);
861 MEMCACHED_COMMAND_PREPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
862 (ret
== 1) ? it
->nbytes
: -1, cas
);
865 MEMCACHED_COMMAND_SET(c
->sfd
, ITEM_key(it
), it
->nkey
,
866 (ret
== 1) ? it
->nbytes
: -1, cas
);
869 MEMCACHED_COMMAND_CAS(c
->sfd
, ITEM_key(it
), it
->nkey
, it
->nbytes
,
877 out_string(c
, "STORED");
880 out_string(c
, "EXISTS");
883 out_string(c
, "NOT_FOUND");
886 out_string(c
, "NOT_STORED");
889 out_string(c
, "SERVER_ERROR Unhandled storage type.");
894 item_remove(c
->item
); /* release the c->item reference */
899 * get a pointer to the start of the request struct for the current command
901 static void* binary_get_request(conn
*c
) {
902 char *ret
= c
->rcurr
;
903 ret
-= (sizeof(c
->binary_header
) + c
->binary_header
.request
.keylen
+
904 c
->binary_header
.request
.extlen
);
906 assert(ret
>= c
->rbuf
);
911 * get a pointer to the key in this request
913 static char* binary_get_key(conn
*c
) {
914 return c
->rcurr
- (c
->binary_header
.request
.keylen
);
917 static void add_bin_header(conn
*c
, uint16_t err
, uint8_t hdr_len
, uint16_t key_len
, uint32_t body_len
) {
918 protocol_binary_response_header
* header
;
925 if (add_msghdr(c
) != 0) {
926 /* XXX: out_string is inappropriate here */
927 out_string(c
, "SERVER_ERROR out of memory");
931 header
= (protocol_binary_response_header
*)c
->wbuf
;
933 header
->response
.magic
= (uint8_t)PROTOCOL_BINARY_RES
;
934 header
->response
.opcode
= c
->binary_header
.request
.opcode
;
935 header
->response
.keylen
= (uint16_t)htons(key_len
);
937 header
->response
.extlen
= (uint8_t)hdr_len
;
938 header
->response
.datatype
= (uint8_t)PROTOCOL_BINARY_RAW_BYTES
;
939 header
->response
.status
= (uint16_t)htons(err
);
941 header
->response
.bodylen
= htonl(body_len
);
942 header
->response
.opaque
= c
->opaque
;
943 header
->response
.cas
= htonll(c
->cas
);
945 if (settings
.verbose
> 1) {
947 fprintf(stderr
, ">%d Writing bin response:", c
->sfd
);
948 for (ii
= 0; ii
< sizeof(header
->bytes
); ++ii
) {
950 fprintf(stderr
, "\n>%d ", c
->sfd
);
952 fprintf(stderr
, " 0x%02x", header
->bytes
[ii
]);
954 fprintf(stderr
, "\n");
957 add_iov(c
, c
->wbuf
, sizeof(header
->response
));
960 static void write_bin_error(conn
*c
, protocol_binary_response_status err
, int swallow
) {
961 const char *errstr
= "Unknown error";
965 case PROTOCOL_BINARY_RESPONSE_ENOMEM
:
966 errstr
= "Out of memory";
968 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
:
969 errstr
= "Unknown command";
971 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
:
972 errstr
= "Not found";
974 case PROTOCOL_BINARY_RESPONSE_EINVAL
:
975 errstr
= "Invalid arguments";
977 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
:
978 errstr
= "Data exists for key.";
980 case PROTOCOL_BINARY_RESPONSE_E2BIG
:
981 errstr
= "Too large.";
983 case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL
:
984 errstr
= "Non-numeric server-side value for incr or decr";
986 case PROTOCOL_BINARY_RESPONSE_NOT_STORED
:
987 errstr
= "Not stored.";
989 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
:
990 errstr
= "Auth failure.";
994 errstr
= "UNHANDLED ERROR";
995 fprintf(stderr
, ">%d UNHANDLED ERROR: %d\n", c
->sfd
, err
);
998 if (settings
.verbose
> 1) {
999 fprintf(stderr
, ">%d Writing an error: %s\n", c
->sfd
, errstr
);
1002 len
= strlen(errstr
);
1003 add_bin_header(c
, err
, 0, 0, len
);
1005 add_iov(c
, errstr
, len
);
1007 conn_set_state(c
, conn_mwrite
);
1009 c
->sbytes
= swallow
;
1010 c
->write_and_go
= conn_swallow
;
1012 c
->write_and_go
= conn_new_cmd
;
1016 /* Form and send a response to a command over the binary protocol */
1017 static void write_bin_response(conn
*c
, void *d
, int hlen
, int keylen
, int dlen
) {
1018 if (!c
->noreply
|| c
->cmd
== PROTOCOL_BINARY_CMD_GET
||
1019 c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1020 add_bin_header(c
, 0, hlen
, keylen
, dlen
);
1022 add_iov(c
, d
, dlen
);
1024 conn_set_state(c
, conn_mwrite
);
1025 c
->write_and_go
= conn_new_cmd
;
1027 conn_set_state(c
, conn_new_cmd
);
1031 static void complete_incr_bin(conn
*c
) {
1035 /* Weird magic in add_delta forces me to pad here */
1036 char tmpbuf
[INCR_MAX_STORAGE_LEN
];
1039 protocol_binary_response_incr
* rsp
= (protocol_binary_response_incr
*)c
->wbuf
;
1040 protocol_binary_request_incr
* req
= binary_get_request(c
);
1043 assert(c
->wsize
>= sizeof(*rsp
));
1045 /* fix byteorder in the request */
1046 req
->message
.body
.delta
= ntohll(req
->message
.body
.delta
);
1047 req
->message
.body
.initial
= ntohll(req
->message
.body
.initial
);
1048 req
->message
.body
.expiration
= ntohl(req
->message
.body
.expiration
);
1049 key
= binary_get_key(c
);
1050 nkey
= c
->binary_header
.request
.keylen
;
1052 if (settings
.verbose
> 1) {
1054 fprintf(stderr
, "incr ");
1056 for (i
= 0; i
< nkey
; i
++) {
1057 fprintf(stderr
, "%c", key
[i
]);
1059 fprintf(stderr
, " %lld, %llu, %d\n",
1060 (long long)req
->message
.body
.delta
,
1061 (long long)req
->message
.body
.initial
,
1062 req
->message
.body
.expiration
);
1065 if (c
->binary_header
.request
.cas
!= 0) {
1066 cas
= c
->binary_header
.request
.cas
;
1068 switch(add_delta(c
, key
, nkey
, c
->cmd
== PROTOCOL_BINARY_CMD_INCREMENT
,
1069 req
->message
.body
.delta
, tmpbuf
,
1072 rsp
->message
.body
.value
= htonll(strtoull(tmpbuf
, NULL
, 10));
1076 write_bin_response(c
, &rsp
->message
.body
, 0, 0,
1077 sizeof(rsp
->message
.body
.value
));
1080 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL
, 0);
1083 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1085 case DELTA_ITEM_NOT_FOUND
:
1086 if (req
->message
.body
.expiration
!= 0xffffffff) {
1087 /* Save some room for the response */
1088 rsp
->message
.body
.value
= htonll(req
->message
.body
.initial
);
1089 it
= item_alloc(key
, nkey
, 0, realtime(req
->message
.body
.expiration
),
1090 INCR_MAX_STORAGE_LEN
);
1093 snprintf(ITEM_data(it
), INCR_MAX_STORAGE_LEN
, "%llu",
1094 (unsigned long long)req
->message
.body
.initial
);
1096 if (store_item(it
, NREAD_ADD
, c
)) {
1097 c
->cas
= ITEM_get_cas(it
);
1098 write_bin_response(c
, &rsp
->message
.body
, 0, 0, sizeof(rsp
->message
.body
.value
));
1100 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_NOT_STORED
, 0);
1102 item_remove(it
); /* release our reference */
1104 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1107 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1108 if (c
->cmd
== PROTOCOL_BINARY_CMD_INCREMENT
) {
1109 c
->thread
->stats
.incr_misses
++;
1111 c
->thread
->stats
.decr_misses
++;
1113 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1115 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1118 case DELTA_ITEM_CAS_MISMATCH
:
1119 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
, 0);
1124 static void complete_update_bin(conn
*c
) {
1125 protocol_binary_response_status eno
= PROTOCOL_BINARY_RESPONSE_EINVAL
;
1126 enum store_item_type ret
= NOT_STORED
;
1131 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1132 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].set_cmds
++;
1133 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1135 /* We don't actually receive the trailing two characters in the bin
1136 * protocol, so we're going to just set them here */
1137 *(ITEM_data(it
) + it
->nbytes
- 2) = '\r';
1138 *(ITEM_data(it
) + it
->nbytes
- 1) = '\n';
1140 ret
= store_item(it
, c
->cmd
, c
);
1142 #ifdef ENABLE_DTRACE
1143 uint64_t cas
= ITEM_get_cas(it
);
1146 MEMCACHED_COMMAND_ADD(c
->sfd
, ITEM_key(it
), it
->nkey
,
1147 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1150 MEMCACHED_COMMAND_REPLACE(c
->sfd
, ITEM_key(it
), it
->nkey
,
1151 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1154 MEMCACHED_COMMAND_APPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
1155 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1158 MEMCACHED_COMMAND_PREPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
1159 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1162 MEMCACHED_COMMAND_SET(c
->sfd
, ITEM_key(it
), it
->nkey
,
1163 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1171 write_bin_response(c
, NULL
, 0, 0, 0);
1174 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
, 0);
1177 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1180 if (c
->cmd
== NREAD_ADD
) {
1181 eno
= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
;
1182 } else if(c
->cmd
== NREAD_REPLACE
) {
1183 eno
= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
;
1185 eno
= PROTOCOL_BINARY_RESPONSE_NOT_STORED
;
1187 write_bin_error(c
, eno
, 0);
1190 item_remove(c
->item
); /* release the c->item reference */
1194 static void process_bin_touch(conn
*c
) {
1197 protocol_binary_response_get
* rsp
= (protocol_binary_response_get
*)c
->wbuf
;
1198 char* key
= binary_get_key(c
);
1199 size_t nkey
= c
->binary_header
.request
.keylen
;
1200 protocol_binary_request_touch
*t
= binary_get_request(c
);
1201 time_t exptime
= ntohl(t
->message
.body
.expiration
);
1203 if (settings
.verbose
> 1) {
1205 /* May be GAT/GATQ/etc */
1206 fprintf(stderr
, "<%d TOUCH ", c
->sfd
);
1207 for (ii
= 0; ii
< nkey
; ++ii
) {
1208 fprintf(stderr
, "%c", key
[ii
]);
1210 fprintf(stderr
, "\n");
1213 it
= item_touch(key
, nkey
, realtime(exptime
));
1216 /* the length has two unnecessary bytes ("\r\n") */
1217 uint16_t keylen
= 0;
1218 uint32_t bodylen
= sizeof(rsp
->message
.body
) + (it
->nbytes
- 2);
1221 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1222 c
->thread
->stats
.touch_cmds
++;
1223 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].touch_hits
++;
1224 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1226 MEMCACHED_COMMAND_TOUCH(c
->sfd
, ITEM_key(it
), it
->nkey
,
1227 it
->nbytes
, ITEM_get_cas(it
));
1229 if (c
->cmd
== PROTOCOL_BINARY_CMD_TOUCH
) {
1230 bodylen
-= it
->nbytes
- 2;
1231 } else if (c
->cmd
== PROTOCOL_BINARY_CMD_GATK
) {
1236 add_bin_header(c
, 0, sizeof(rsp
->message
.body
), keylen
, bodylen
);
1237 rsp
->message
.header
.response
.cas
= htonll(ITEM_get_cas(it
));
1240 rsp
->message
.body
.flags
= htonl(strtoul(ITEM_suffix(it
), NULL
, 10));
1241 add_iov(c
, &rsp
->message
.body
, sizeof(rsp
->message
.body
));
1243 if (c
->cmd
== PROTOCOL_BINARY_CMD_GATK
) {
1244 add_iov(c
, ITEM_key(it
), nkey
);
1247 /* Add the data minus the CRLF */
1248 if (c
->cmd
!= PROTOCOL_BINARY_CMD_TOUCH
) {
1249 add_iov(c
, ITEM_data(it
), it
->nbytes
- 2);
1252 conn_set_state(c
, conn_mwrite
);
1253 c
->write_and_go
= conn_new_cmd
;
1254 /* Remember this command so we can garbage collect it later */
1257 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1258 c
->thread
->stats
.touch_cmds
++;
1259 c
->thread
->stats
.touch_misses
++;
1260 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1262 MEMCACHED_COMMAND_TOUCH(c
->sfd
, key
, nkey
, -1, 0);
1265 conn_set_state(c
, conn_new_cmd
);
1267 if (c
->cmd
== PROTOCOL_BINARY_CMD_GATK
) {
1268 char *ofs
= c
->wbuf
+ sizeof(protocol_binary_response_header
);
1269 add_bin_header(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
,
1271 memcpy(ofs
, key
, nkey
);
1272 add_iov(c
, ofs
, nkey
);
1273 conn_set_state(c
, conn_mwrite
);
1274 c
->write_and_go
= conn_new_cmd
;
1276 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1281 if (settings
.detail_enabled
) {
1282 stats_prefix_record_get(key
, nkey
, NULL
!= it
);
1286 static void process_bin_get(conn
*c
) {
1289 protocol_binary_response_get
* rsp
= (protocol_binary_response_get
*)c
->wbuf
;
1290 char* key
= binary_get_key(c
);
1291 size_t nkey
= c
->binary_header
.request
.keylen
;
1293 if (settings
.verbose
> 1) {
1295 fprintf(stderr
, "<%d GET ", c
->sfd
);
1296 for (ii
= 0; ii
< nkey
; ++ii
) {
1297 fprintf(stderr
, "%c", key
[ii
]);
1299 fprintf(stderr
, "\n");
1302 it
= item_get(key
, nkey
);
1304 /* the length has two unnecessary bytes ("\r\n") */
1305 uint16_t keylen
= 0;
1306 uint32_t bodylen
= sizeof(rsp
->message
.body
) + (it
->nbytes
- 2);
1309 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1310 c
->thread
->stats
.get_cmds
++;
1311 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].get_hits
++;
1312 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1314 MEMCACHED_COMMAND_GET(c
->sfd
, ITEM_key(it
), it
->nkey
,
1315 it
->nbytes
, ITEM_get_cas(it
));
1317 if (c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1321 add_bin_header(c
, 0, sizeof(rsp
->message
.body
), keylen
, bodylen
);
1322 rsp
->message
.header
.response
.cas
= htonll(ITEM_get_cas(it
));
1325 rsp
->message
.body
.flags
= htonl(strtoul(ITEM_suffix(it
), NULL
, 10));
1326 add_iov(c
, &rsp
->message
.body
, sizeof(rsp
->message
.body
));
1328 if (c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1329 add_iov(c
, ITEM_key(it
), nkey
);
1332 /* Add the data minus the CRLF */
1333 add_iov(c
, ITEM_data(it
), it
->nbytes
- 2);
1334 conn_set_state(c
, conn_mwrite
);
1335 c
->write_and_go
= conn_new_cmd
;
1336 /* Remember this command so we can garbage collect it later */
1339 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1340 c
->thread
->stats
.get_cmds
++;
1341 c
->thread
->stats
.get_misses
++;
1342 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1344 MEMCACHED_COMMAND_GET(c
->sfd
, key
, nkey
, -1, 0);
1347 conn_set_state(c
, conn_new_cmd
);
1349 if (c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1350 char *ofs
= c
->wbuf
+ sizeof(protocol_binary_response_header
);
1351 add_bin_header(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
,
1353 memcpy(ofs
, key
, nkey
);
1354 add_iov(c
, ofs
, nkey
);
1355 conn_set_state(c
, conn_mwrite
);
1356 c
->write_and_go
= conn_new_cmd
;
1358 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1363 if (settings
.detail_enabled
) {
1364 stats_prefix_record_get(key
, nkey
, NULL
!= it
);
1368 static void append_bin_stats(const char *key
, const uint16_t klen
,
1369 const char *val
, const uint32_t vlen
,
1371 char *buf
= c
->stats
.buffer
+ c
->stats
.offset
;
1372 uint32_t bodylen
= klen
+ vlen
;
1373 protocol_binary_response_header header
= {
1374 .response
.magic
= (uint8_t)PROTOCOL_BINARY_RES
,
1375 .response
.opcode
= PROTOCOL_BINARY_CMD_STAT
,
1376 .response
.keylen
= (uint16_t)htons(klen
),
1377 .response
.datatype
= (uint8_t)PROTOCOL_BINARY_RAW_BYTES
,
1378 .response
.bodylen
= htonl(bodylen
),
1379 .response
.opaque
= c
->opaque
1382 memcpy(buf
, header
.bytes
, sizeof(header
.response
));
1383 buf
+= sizeof(header
.response
);
1386 memcpy(buf
, key
, klen
);
1390 memcpy(buf
, val
, vlen
);
1394 c
->stats
.offset
+= sizeof(header
.response
) + bodylen
;
1397 static void append_ascii_stats(const char *key
, const uint16_t klen
,
1398 const char *val
, const uint32_t vlen
,
1400 char *pos
= c
->stats
.buffer
+ c
->stats
.offset
;
1401 uint32_t nbytes
= 0;
1402 int remaining
= c
->stats
.size
- c
->stats
.offset
;
1403 int room
= remaining
- 1;
1405 if (klen
== 0 && vlen
== 0) {
1406 nbytes
= snprintf(pos
, room
, "END\r\n");
1407 } else if (vlen
== 0) {
1408 nbytes
= snprintf(pos
, room
, "STAT %s\r\n", key
);
1410 nbytes
= snprintf(pos
, room
, "STAT %s %s\r\n", key
, val
);
1413 c
->stats
.offset
+= nbytes
;
1416 static bool grow_stats_buf(conn
*c
, size_t needed
) {
1417 size_t nsize
= c
->stats
.size
;
1418 size_t available
= nsize
- c
->stats
.offset
;
1421 /* Special case: No buffer -- need to allocate fresh */
1422 if (c
->stats
.buffer
== NULL
) {
1424 available
= c
->stats
.size
= c
->stats
.offset
= 0;
1427 while (needed
> available
) {
1430 available
= nsize
- c
->stats
.offset
;
1433 if (nsize
!= c
->stats
.size
) {
1434 char *ptr
= realloc(c
->stats
.buffer
, nsize
);
1436 c
->stats
.buffer
= ptr
;
1437 c
->stats
.size
= nsize
;
1446 static void append_stats(const char *key
, const uint16_t klen
,
1447 const char *val
, const uint32_t vlen
,
1450 /* value without a key is invalid */
1451 if (klen
== 0 && vlen
> 0) {
1455 conn
*c
= (conn
*)cookie
;
1457 if (c
->protocol
== binary_prot
) {
1458 size_t needed
= vlen
+ klen
+ sizeof(protocol_binary_response_header
);
1459 if (!grow_stats_buf(c
, needed
)) {
1462 append_bin_stats(key
, klen
, val
, vlen
, c
);
1464 size_t needed
= vlen
+ klen
+ 10; // 10 == "STAT = \r\n"
1465 if (!grow_stats_buf(c
, needed
)) {
1468 append_ascii_stats(key
, klen
, val
, vlen
, c
);
1471 assert(c
->stats
.offset
<= c
->stats
.size
);
1474 static void process_bin_stat(conn
*c
) {
1475 char *subcommand
= binary_get_key(c
);
1476 size_t nkey
= c
->binary_header
.request
.keylen
;
1478 if (settings
.verbose
> 1) {
1480 fprintf(stderr
, "<%d STATS ", c
->sfd
);
1481 for (ii
= 0; ii
< nkey
; ++ii
) {
1482 fprintf(stderr
, "%c", subcommand
[ii
]);
1484 fprintf(stderr
, "\n");
1488 /* request all statistics */
1489 server_stats(&append_stats
, c
);
1490 (void)get_stats(NULL
, 0, &append_stats
, c
);
1491 } else if (strncmp(subcommand
, "reset", 5) == 0) {
1493 } else if (strncmp(subcommand
, "settings", 8) == 0) {
1494 process_stat_settings(&append_stats
, c
);
1495 } else if (strncmp(subcommand
, "detail", 6) == 0) {
1496 char *subcmd_pos
= subcommand
+ 6;
1497 if (strncmp(subcmd_pos
, " dump", 5) == 0) {
1499 char *dump_buf
= stats_prefix_dump(&len
);
1500 if (dump_buf
== NULL
|| len
<= 0) {
1501 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1504 append_stats("detailed", strlen("detailed"), dump_buf
, len
, c
);
1507 } else if (strncmp(subcmd_pos
, " on", 3) == 0) {
1508 settings
.detail_enabled
= 1;
1509 } else if (strncmp(subcmd_pos
, " off", 4) == 0) {
1510 settings
.detail_enabled
= 0;
1512 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1516 if (get_stats(subcommand
, nkey
, &append_stats
, c
)) {
1517 if (c
->stats
.buffer
== NULL
) {
1518 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1520 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
1521 c
->stats
.buffer
= NULL
;
1524 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1530 /* Append termination package and start the transfer */
1531 append_stats(NULL
, 0, NULL
, 0, c
);
1532 if (c
->stats
.buffer
== NULL
) {
1533 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1535 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
1536 c
->stats
.buffer
= NULL
;
1540 static void bin_read_key(conn
*c
, enum bin_substates next_substate
, int extra
) {
1542 c
->substate
= next_substate
;
1543 c
->rlbytes
= c
->keylen
+ extra
;
1545 /* Ok... do we have room for the extras and the key in the input buffer? */
1546 ptrdiff_t offset
= c
->rcurr
+ sizeof(protocol_binary_request_header
) - c
->rbuf
;
1547 if (c
->rlbytes
> c
->rsize
- offset
) {
1548 size_t nsize
= c
->rsize
;
1549 size_t size
= c
->rlbytes
+ sizeof(protocol_binary_request_header
);
1551 while (size
> nsize
) {
1555 if (nsize
!= c
->rsize
) {
1556 if (settings
.verbose
> 1) {
1557 fprintf(stderr
, "%d: Need to grow buffer from %lu to %lu\n",
1558 c
->sfd
, (unsigned long)c
->rsize
, (unsigned long)nsize
);
1560 char *newm
= realloc(c
->rbuf
, nsize
);
1562 if (settings
.verbose
) {
1563 fprintf(stderr
, "%d: Failed to grow buffer.. closing connection\n",
1566 conn_set_state(c
, conn_closing
);
1571 /* rcurr should point to the same offset in the packet */
1572 c
->rcurr
= c
->rbuf
+ offset
- sizeof(protocol_binary_request_header
);
1575 if (c
->rbuf
!= c
->rcurr
) {
1576 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
1578 if (settings
.verbose
> 1) {
1579 fprintf(stderr
, "%d: Repack input buffer\n", c
->sfd
);
1584 /* preserve the header in the buffer.. */
1585 c
->ritem
= c
->rcurr
+ sizeof(protocol_binary_request_header
);
1586 conn_set_state(c
, conn_nread
);
1589 /* Just write an error message and disconnect the client */
1590 static void handle_binary_protocol_error(conn
*c
) {
1591 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_EINVAL
, 0);
1592 if (settings
.verbose
) {
1593 fprintf(stderr
, "Protocol error (opcode %02x), close connection %d\n",
1594 c
->binary_header
.request
.opcode
, c
->sfd
);
1596 c
->write_and_go
= conn_closing
;
1599 static void init_sasl_conn(conn
*c
) {
1601 /* should something else be returned? */
1605 if (!c
->sasl_conn
) {
1606 int result
=sasl_server_new("memcached",
1608 my_sasl_hostname
[0] ? my_sasl_hostname
: NULL
,
1610 NULL
, 0, &c
->sasl_conn
);
1611 if (result
!= SASL_OK
) {
1612 if (settings
.verbose
) {
1613 fprintf(stderr
, "Failed to initialize SASL conn.\n");
1615 c
->sasl_conn
= NULL
;
1620 static void bin_list_sasl_mechs(conn
*c
) {
1621 // Guard against a disabled SASL.
1622 if (!settings
.sasl
) {
1623 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
,
1624 c
->binary_header
.request
.bodylen
1625 - c
->binary_header
.request
.keylen
);
1630 const char *result_string
= NULL
;
1631 unsigned int string_length
= 0;
1632 int result
=sasl_listmech(c
->sasl_conn
, NULL
,
1633 "", /* What to prepend the string with */
1634 " ", /* What to separate mechanisms with */
1635 "", /* What to append to the string */
1636 &result_string
, &string_length
,
1638 if (result
!= SASL_OK
) {
1639 /* Perhaps there's a better error for this... */
1640 if (settings
.verbose
) {
1641 fprintf(stderr
, "Failed to list SASL mechanisms.\n");
1643 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
, 0);
1646 write_bin_response(c
, (char*)result_string
, 0, 0, string_length
);
1649 static void process_bin_sasl_auth(conn
*c
) {
1650 // Guard for handling disabled SASL on the server.
1651 if (!settings
.sasl
) {
1652 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
,
1653 c
->binary_header
.request
.bodylen
1654 - c
->binary_header
.request
.keylen
);
1658 assert(c
->binary_header
.request
.extlen
== 0);
1660 int nkey
= c
->binary_header
.request
.keylen
;
1661 int vlen
= c
->binary_header
.request
.bodylen
- nkey
;
1663 if (nkey
> MAX_SASL_MECH_LEN
) {
1664 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_EINVAL
, vlen
);
1665 c
->write_and_go
= conn_swallow
;
1669 char *key
= binary_get_key(c
);
1672 item
*it
= item_alloc(key
, nkey
, 0, 0, vlen
);
1675 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, vlen
);
1676 c
->write_and_go
= conn_swallow
;
1681 c
->ritem
= ITEM_data(it
);
1683 conn_set_state(c
, conn_nread
);
1684 c
->substate
= bin_reading_sasl_auth_data
;
1687 static void process_bin_complete_sasl_auth(conn
*c
) {
1688 assert(settings
.sasl
);
1689 const char *out
= NULL
;
1690 unsigned int outlen
= 0;
1695 int nkey
= c
->binary_header
.request
.keylen
;
1696 int vlen
= c
->binary_header
.request
.bodylen
- nkey
;
1699 memcpy(mech
, ITEM_key((item
*)c
->item
), nkey
);
1702 if (settings
.verbose
)
1703 fprintf(stderr
, "mech: ``%s'' with %d bytes of data\n", mech
, vlen
);
1705 const char *challenge
= vlen
== 0 ? NULL
: ITEM_data((item
*) c
->item
);
1710 case PROTOCOL_BINARY_CMD_SASL_AUTH
:
1711 result
= sasl_server_start(c
->sasl_conn
, mech
,
1715 case PROTOCOL_BINARY_CMD_SASL_STEP
:
1716 result
= sasl_server_step(c
->sasl_conn
,
1721 assert(false); /* CMD should be one of the above */
1722 /* This code is pretty much impossible, but makes the compiler
1724 if (settings
.verbose
) {
1725 fprintf(stderr
, "Unhandled command %d with challenge %s\n",
1731 item_unlink(c
->item
);
1733 if (settings
.verbose
) {
1734 fprintf(stderr
, "sasl result code: %d\n", result
);
1739 write_bin_response(c
, "Authenticated", 0, 0, strlen("Authenticated"));
1740 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1741 c
->thread
->stats
.auth_cmds
++;
1742 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1745 add_bin_header(c
, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE
, 0, 0, outlen
);
1747 add_iov(c
, out
, outlen
);
1749 conn_set_state(c
, conn_mwrite
);
1750 c
->write_and_go
= conn_new_cmd
;
1753 if (settings
.verbose
)
1754 fprintf(stderr
, "Unknown sasl response: %d\n", result
);
1755 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
, 0);
1756 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1757 c
->thread
->stats
.auth_cmds
++;
1758 c
->thread
->stats
.auth_errors
++;
1759 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1763 static bool authenticated(conn
*c
) {
1764 assert(settings
.sasl
);
1768 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS
: /* FALLTHROUGH */
1769 case PROTOCOL_BINARY_CMD_SASL_AUTH
: /* FALLTHROUGH */
1770 case PROTOCOL_BINARY_CMD_SASL_STEP
: /* FALLTHROUGH */
1771 case PROTOCOL_BINARY_CMD_VERSION
: /* FALLTHROUGH */
1776 const void *uname
= NULL
;
1777 sasl_getprop(c
->sasl_conn
, SASL_USERNAME
, &uname
);
1782 if (settings
.verbose
> 1) {
1783 fprintf(stderr
, "authenticated() in cmd 0x%02x is %s\n",
1784 c
->cmd
, rv
? "true" : "false");
1790 static void dispatch_bin_command(conn
*c
) {
1791 int protocol_error
= 0;
1793 int extlen
= c
->binary_header
.request
.extlen
;
1794 int keylen
= c
->binary_header
.request
.keylen
;
1795 uint32_t bodylen
= c
->binary_header
.request
.bodylen
;
1797 if (settings
.sasl
&& !authenticated(c
)) {
1798 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
, 0);
1799 c
->write_and_go
= conn_closing
;
1803 MEMCACHED_PROCESS_COMMAND_START(c
->sfd
, c
->rcurr
, c
->rbytes
);
1806 /* binprot supports 16bit keys, but internals are still 8bit */
1807 if (keylen
> KEY_MAX_LENGTH
) {
1808 handle_binary_protocol_error(c
);
1813 case PROTOCOL_BINARY_CMD_SETQ
:
1814 c
->cmd
= PROTOCOL_BINARY_CMD_SET
;
1816 case PROTOCOL_BINARY_CMD_ADDQ
:
1817 c
->cmd
= PROTOCOL_BINARY_CMD_ADD
;
1819 case PROTOCOL_BINARY_CMD_REPLACEQ
:
1820 c
->cmd
= PROTOCOL_BINARY_CMD_REPLACE
;
1822 case PROTOCOL_BINARY_CMD_DELETEQ
:
1823 c
->cmd
= PROTOCOL_BINARY_CMD_DELETE
;
1825 case PROTOCOL_BINARY_CMD_INCREMENTQ
:
1826 c
->cmd
= PROTOCOL_BINARY_CMD_INCREMENT
;
1828 case PROTOCOL_BINARY_CMD_DECREMENTQ
:
1829 c
->cmd
= PROTOCOL_BINARY_CMD_DECREMENT
;
1831 case PROTOCOL_BINARY_CMD_QUITQ
:
1832 c
->cmd
= PROTOCOL_BINARY_CMD_QUIT
;
1834 case PROTOCOL_BINARY_CMD_FLUSHQ
:
1835 c
->cmd
= PROTOCOL_BINARY_CMD_FLUSH
;
1837 case PROTOCOL_BINARY_CMD_APPENDQ
:
1838 c
->cmd
= PROTOCOL_BINARY_CMD_APPEND
;
1840 case PROTOCOL_BINARY_CMD_PREPENDQ
:
1841 c
->cmd
= PROTOCOL_BINARY_CMD_PREPEND
;
1843 case PROTOCOL_BINARY_CMD_GETQ
:
1844 c
->cmd
= PROTOCOL_BINARY_CMD_GET
;
1846 case PROTOCOL_BINARY_CMD_GETKQ
:
1847 c
->cmd
= PROTOCOL_BINARY_CMD_GETK
;
1849 case PROTOCOL_BINARY_CMD_GATQ
:
1850 c
->cmd
= PROTOCOL_BINARY_CMD_GAT
;
1852 case PROTOCOL_BINARY_CMD_GATKQ
:
1853 c
->cmd
= PROTOCOL_BINARY_CMD_GAT
;
1860 case PROTOCOL_BINARY_CMD_VERSION
:
1861 if (extlen
== 0 && keylen
== 0 && bodylen
== 0) {
1862 write_bin_response(c
, VERSION
, 0, 0, strlen(VERSION
));
1867 case PROTOCOL_BINARY_CMD_FLUSH
:
1868 if (keylen
== 0 && bodylen
== extlen
&& (extlen
== 0 || extlen
== 4)) {
1869 bin_read_key(c
, bin_read_flush_exptime
, extlen
);
1874 case PROTOCOL_BINARY_CMD_NOOP
:
1875 if (extlen
== 0 && keylen
== 0 && bodylen
== 0) {
1876 write_bin_response(c
, NULL
, 0, 0, 0);
1881 case PROTOCOL_BINARY_CMD_SET
: /* FALLTHROUGH */
1882 case PROTOCOL_BINARY_CMD_ADD
: /* FALLTHROUGH */
1883 case PROTOCOL_BINARY_CMD_REPLACE
:
1884 if (extlen
== 8 && keylen
!= 0 && bodylen
>= (keylen
+ 8)) {
1885 bin_read_key(c
, bin_reading_set_header
, 8);
1890 case PROTOCOL_BINARY_CMD_GETQ
: /* FALLTHROUGH */
1891 case PROTOCOL_BINARY_CMD_GET
: /* FALLTHROUGH */
1892 case PROTOCOL_BINARY_CMD_GETKQ
: /* FALLTHROUGH */
1893 case PROTOCOL_BINARY_CMD_GETK
:
1894 if (extlen
== 0 && bodylen
== keylen
&& keylen
> 0) {
1895 bin_read_key(c
, bin_reading_get_key
, 0);
1900 case PROTOCOL_BINARY_CMD_DELETE
:
1901 if (keylen
> 0 && extlen
== 0 && bodylen
== keylen
) {
1902 bin_read_key(c
, bin_reading_del_header
, extlen
);
1907 case PROTOCOL_BINARY_CMD_INCREMENT
:
1908 case PROTOCOL_BINARY_CMD_DECREMENT
:
1909 if (keylen
> 0 && extlen
== 20 && bodylen
== (keylen
+ extlen
)) {
1910 bin_read_key(c
, bin_reading_incr_header
, 20);
1915 case PROTOCOL_BINARY_CMD_APPEND
:
1916 case PROTOCOL_BINARY_CMD_PREPEND
:
1917 if (keylen
> 0 && extlen
== 0) {
1918 bin_read_key(c
, bin_reading_set_header
, 0);
1923 case PROTOCOL_BINARY_CMD_STAT
:
1925 bin_read_key(c
, bin_reading_stat
, 0);
1930 case PROTOCOL_BINARY_CMD_QUIT
:
1931 if (keylen
== 0 && extlen
== 0 && bodylen
== 0) {
1932 write_bin_response(c
, NULL
, 0, 0, 0);
1933 c
->write_and_go
= conn_closing
;
1935 conn_set_state(c
, conn_closing
);
1941 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS
:
1942 if (extlen
== 0 && keylen
== 0 && bodylen
== 0) {
1943 bin_list_sasl_mechs(c
);
1948 case PROTOCOL_BINARY_CMD_SASL_AUTH
:
1949 case PROTOCOL_BINARY_CMD_SASL_STEP
:
1950 if (extlen
== 0 && keylen
!= 0) {
1951 bin_read_key(c
, bin_reading_sasl_auth
, 0);
1956 case PROTOCOL_BINARY_CMD_TOUCH
:
1957 case PROTOCOL_BINARY_CMD_GAT
:
1958 case PROTOCOL_BINARY_CMD_GATQ
:
1959 case PROTOCOL_BINARY_CMD_GATK
:
1960 case PROTOCOL_BINARY_CMD_GATKQ
:
1961 if (extlen
== 4 && keylen
!= 0) {
1962 bin_read_key(c
, bin_reading_touch_key
, 4);
1968 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
, bodylen
);
1972 handle_binary_protocol_error(c
);
1975 static void process_bin_update(conn
*c
) {
1980 protocol_binary_request_set
* req
= binary_get_request(c
);
1984 key
= binary_get_key(c
);
1985 nkey
= c
->binary_header
.request
.keylen
;
1987 /* fix byteorder in the request */
1988 req
->message
.body
.flags
= ntohl(req
->message
.body
.flags
);
1989 req
->message
.body
.expiration
= ntohl(req
->message
.body
.expiration
);
1991 vlen
= c
->binary_header
.request
.bodylen
- (nkey
+ c
->binary_header
.request
.extlen
);
1993 if (settings
.verbose
> 1) {
1995 if (c
->cmd
== PROTOCOL_BINARY_CMD_ADD
) {
1996 fprintf(stderr
, "<%d ADD ", c
->sfd
);
1997 } else if (c
->cmd
== PROTOCOL_BINARY_CMD_SET
) {
1998 fprintf(stderr
, "<%d SET ", c
->sfd
);
2000 fprintf(stderr
, "<%d REPLACE ", c
->sfd
);
2002 for (ii
= 0; ii
< nkey
; ++ii
) {
2003 fprintf(stderr
, "%c", key
[ii
]);
2006 fprintf(stderr
, " Value len is %d", vlen
);
2007 fprintf(stderr
, "\n");
2010 if (settings
.detail_enabled
) {
2011 stats_prefix_record_set(key
, nkey
);
2014 it
= item_alloc(key
, nkey
, req
->message
.body
.flags
,
2015 realtime(req
->message
.body
.expiration
), vlen
+2);
2018 if (! item_size_ok(nkey
, req
->message
.body
.flags
, vlen
+ 2)) {
2019 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_E2BIG
, vlen
);
2021 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, vlen
);
2024 /* Avoid stale data persisting in cache because we failed alloc.
2025 * Unacceptable for SET. Anywhere else too? */
2026 if (c
->cmd
== PROTOCOL_BINARY_CMD_SET
) {
2027 it
= item_get(key
, nkey
);
2034 /* swallow the data line */
2035 c
->write_and_go
= conn_swallow
;
2039 ITEM_set_cas(it
, c
->binary_header
.request
.cas
);
2042 case PROTOCOL_BINARY_CMD_ADD
:
2045 case PROTOCOL_BINARY_CMD_SET
:
2048 case PROTOCOL_BINARY_CMD_REPLACE
:
2049 c
->cmd
= NREAD_REPLACE
;
2055 if (ITEM_get_cas(it
) != 0) {
2060 c
->ritem
= ITEM_data(it
);
2062 conn_set_state(c
, conn_nread
);
2063 c
->substate
= bin_read_set_value
;
2066 static void process_bin_append_prepend(conn
*c
) {
2074 key
= binary_get_key(c
);
2075 nkey
= c
->binary_header
.request
.keylen
;
2076 vlen
= c
->binary_header
.request
.bodylen
- nkey
;
2078 if (settings
.verbose
> 1) {
2079 fprintf(stderr
, "Value len is %d\n", vlen
);
2082 if (settings
.detail_enabled
) {
2083 stats_prefix_record_set(key
, nkey
);
2086 it
= item_alloc(key
, nkey
, 0, 0, vlen
+2);
2089 if (! item_size_ok(nkey
, 0, vlen
+ 2)) {
2090 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_E2BIG
, vlen
);
2092 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, vlen
);
2094 /* swallow the data line */
2095 c
->write_and_go
= conn_swallow
;
2099 ITEM_set_cas(it
, c
->binary_header
.request
.cas
);
2102 case PROTOCOL_BINARY_CMD_APPEND
:
2103 c
->cmd
= NREAD_APPEND
;
2105 case PROTOCOL_BINARY_CMD_PREPEND
:
2106 c
->cmd
= NREAD_PREPEND
;
2113 c
->ritem
= ITEM_data(it
);
2115 conn_set_state(c
, conn_nread
);
2116 c
->substate
= bin_read_set_value
;
2119 static void process_bin_flush(conn
*c
) {
2121 protocol_binary_request_flush
* req
= binary_get_request(c
);
2123 if (c
->binary_header
.request
.extlen
== sizeof(req
->message
.body
)) {
2124 exptime
= ntohl(req
->message
.body
.expiration
);
2128 settings
.oldest_live
= realtime(exptime
) - 1;
2130 settings
.oldest_live
= current_time
- 1;
2132 item_flush_expired();
2134 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2135 c
->thread
->stats
.flush_cmds
++;
2136 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2138 write_bin_response(c
, NULL
, 0, 0, 0);
2141 static void process_bin_delete(conn
*c
) {
2144 protocol_binary_request_delete
* req
= binary_get_request(c
);
2146 char* key
= binary_get_key(c
);
2147 size_t nkey
= c
->binary_header
.request
.keylen
;
2151 if (settings
.verbose
> 1) {
2152 fprintf(stderr
, "Deleting %s\n", key
);
2155 if (settings
.detail_enabled
) {
2156 stats_prefix_record_delete(key
, nkey
);
2159 it
= item_get(key
, nkey
);
2161 uint64_t cas
= ntohll(req
->message
.header
.request
.cas
);
2162 if (cas
== 0 || cas
== ITEM_get_cas(it
)) {
2163 MEMCACHED_COMMAND_DELETE(c
->sfd
, ITEM_key(it
), it
->nkey
);
2164 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2165 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].delete_hits
++;
2166 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2168 write_bin_response(c
, NULL
, 0, 0, 0);
2170 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
, 0);
2172 item_remove(it
); /* release our reference */
2174 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
2175 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2176 c
->thread
->stats
.delete_misses
++;
2177 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2181 static void complete_nread_binary(conn
*c
) {
2183 assert(c
->cmd
>= 0);
2185 switch(c
->substate
) {
2186 case bin_reading_set_header
:
2187 if (c
->cmd
== PROTOCOL_BINARY_CMD_APPEND
||
2188 c
->cmd
== PROTOCOL_BINARY_CMD_PREPEND
) {
2189 process_bin_append_prepend(c
);
2191 process_bin_update(c
);
2194 case bin_read_set_value
:
2195 complete_update_bin(c
);
2197 case bin_reading_get_key
:
2200 case bin_reading_touch_key
:
2201 process_bin_touch(c
);
2203 case bin_reading_stat
:
2204 process_bin_stat(c
);
2206 case bin_reading_del_header
:
2207 process_bin_delete(c
);
2209 case bin_reading_incr_header
:
2210 complete_incr_bin(c
);
2212 case bin_read_flush_exptime
:
2213 process_bin_flush(c
);
2215 case bin_reading_sasl_auth
:
2216 process_bin_sasl_auth(c
);
2218 case bin_reading_sasl_auth_data
:
2219 process_bin_complete_sasl_auth(c
);
2222 fprintf(stderr
, "Not handling substate %d\n", c
->substate
);
2227 static void reset_cmd_handler(conn
*c
) {
2229 c
->substate
= bin_no_state
;
2230 if(c
->item
!= NULL
) {
2231 item_remove(c
->item
);
2235 if (c
->rbytes
> 0) {
2236 conn_set_state(c
, conn_parse_cmd
);
2238 conn_set_state(c
, conn_waiting
);
2242 static void complete_nread(conn
*c
) {
2244 assert(c
->protocol
== ascii_prot
2245 || c
->protocol
== binary_prot
);
2247 if (c
->protocol
== ascii_prot
) {
2248 complete_nread_ascii(c
);
2249 } else if (c
->protocol
== binary_prot
) {
2250 complete_nread_binary(c
);
2255 * Stores an item in the cache according to the semantics of one of the set
2256 * commands. In threaded mode, this is protected by the cache lock.
2258 * Returns the state of storage.
2260 enum store_item_type
do_store_item(item
*it
, int comm
, conn
*c
, const uint32_t hv
) {
2261 char *key
= ITEM_key(it
);
2262 item
*old_it
= do_item_get(key
, it
->nkey
, hv
);
2263 enum store_item_type stored
= NOT_STORED
;
2265 item
*new_it
= NULL
;
2268 if (old_it
!= NULL
&& comm
== NREAD_ADD
) {
2269 /* add only adds a nonexistent item, but promote to head of LRU */
2270 do_item_update(old_it
);
2271 } else if (!old_it
&& (comm
== NREAD_REPLACE
2272 || comm
== NREAD_APPEND
|| comm
== NREAD_PREPEND
))
2274 /* replace only replaces an existing value; don't store */
2275 } else if (comm
== NREAD_CAS
) {
2276 /* validate cas operation */
2277 if(old_it
== NULL
) {
2280 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2281 c
->thread
->stats
.cas_misses
++;
2282 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2284 else if (ITEM_get_cas(it
) == ITEM_get_cas(old_it
)) {
2286 // it and old_it may belong to different classes.
2287 // I'm updating the stats for the one that's getting pushed out
2288 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2289 c
->thread
->stats
.slab_stats
[old_it
->slabs_clsid
].cas_hits
++;
2290 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2292 item_replace(old_it
, it
, hv
);
2295 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2296 c
->thread
->stats
.slab_stats
[old_it
->slabs_clsid
].cas_badval
++;
2297 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2299 if(settings
.verbose
> 1) {
2300 fprintf(stderr
, "CAS: failure: expected %llu, got %llu\n",
2301 (unsigned long long)ITEM_get_cas(old_it
),
2302 (unsigned long long)ITEM_get_cas(it
));
2308 * Append - combine new and old record into single one. Here it's
2309 * atomic and thread-safe.
2311 if (comm
== NREAD_APPEND
|| comm
== NREAD_PREPEND
) {
2315 if (ITEM_get_cas(it
) != 0) {
2316 // CAS much be equal
2317 if (ITEM_get_cas(it
) != ITEM_get_cas(old_it
)) {
2322 if (stored
== NOT_STORED
) {
2323 /* we have it and old_it here - alloc memory to hold both */
2324 /* flags was already lost - so recover them from ITEM_suffix(it) */
2326 flags
= (int) strtol(ITEM_suffix(old_it
), (char **) NULL
, 10);
2328 new_it
= item_alloc(key
, it
->nkey
, flags
, old_it
->exptime
, it
->nbytes
+ old_it
->nbytes
- 2 /* CRLF */);
2330 if (new_it
== NULL
) {
2331 /* SERVER_ERROR out of memory */
2333 do_item_remove(old_it
);
2338 /* copy data from it and old_it to new_it */
2340 if (comm
== NREAD_APPEND
) {
2341 memcpy(ITEM_data(new_it
), ITEM_data(old_it
), old_it
->nbytes
);
2342 memcpy(ITEM_data(new_it
) + old_it
->nbytes
- 2 /* CRLF */, ITEM_data(it
), it
->nbytes
);
2345 memcpy(ITEM_data(new_it
), ITEM_data(it
), it
->nbytes
);
2346 memcpy(ITEM_data(new_it
) + it
->nbytes
- 2 /* CRLF */, ITEM_data(old_it
), old_it
->nbytes
);
2353 if (stored
== NOT_STORED
) {
2355 item_replace(old_it
, it
, hv
);
2357 do_item_link(it
, hv
);
2359 c
->cas
= ITEM_get_cas(it
);
2366 do_item_remove(old_it
); /* release our reference */
2368 do_item_remove(new_it
);
2370 if (stored
== STORED
) {
2371 c
->cas
= ITEM_get_cas(it
);
/* One token of a command line: start pointer and length in bytes. */
typedef struct token_s {
    char *value;
    size_t length;
} token_t;

#define COMMAND_TOKEN 0
#define SUBCOMMAND_TOKEN 1
#define KEY_TOKEN 1

#define MAX_TOKENS 8

/*
 * Tokenize the command string by replacing whitespace with '\0' and update
 * the token array tokens with pointer to start of each token and length.
 * Returns total number of tokens.  The last valid token is the terminal
 * token (value points to the first unprocessed character of the string and
 * length zero).
 *
 * Only the space character separates tokens; runs of spaces produce no
 * empty tokens.  At most max_tokens - 1 real tokens are produced, so the
 * tokens array must have room for max_tokens entries.
 *
 * Usage example:
 *
 *  while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
 *      for(int ix = 0; tokens[ix].length != 0; ix++) {
 *          ...
 *      }
 *      ncommand = tokens[ix].value - command;
 *      command  = tokens[ix].value;
 *   }
 */
static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {
    char *s, *e;
    size_t ntokens = 0;
    size_t len;
    size_t i;

    /* Check the arguments before the first dereference of command. */
    assert(command != NULL && tokens != NULL && max_tokens > 1);

    len = strlen(command);

    s = e = command;
    for (i = 0; i < len; i++) {
        if (*e == ' ') {
            if (s != e) {
                tokens[ntokens].value = s;
                tokens[ntokens].length = e - s;
                ntokens++;
                *e = '\0';
                if (ntokens == max_tokens - 1) {
                    e++;
                    s = e; /* so we don't add an extra token */
                    break;
                }
            }
            s = e + 1;
        }
        e++;
    }

    /* Trailing token that was not followed by a space. */
    if (s != e) {
        tokens[ntokens].value = s;
        tokens[ntokens].length = e - s;
        ntokens++;
    }

    /*
     * If we scanned the whole string, the terminal value pointer is null,
     * otherwise it is the first unprocessed character.
     */
    tokens[ntokens].value = *e == '\0' ? NULL : e;
    tokens[ntokens].length = 0;
    ntokens++;

    return ntokens;
}
2449 /* set up a connection to write a buffer then free it, used for stats */
2450 static void write_and_free(conn
*c
, char *buf
, int bytes
) {
2452 c
->write_and_free
= buf
;
2455 conn_set_state(c
, conn_write
);
2456 c
->write_and_go
= conn_new_cmd
;
2458 out_string(c
, "SERVER_ERROR out of memory writing stats");
2462 static inline bool set_noreply_maybe(conn
*c
, token_t
*tokens
, size_t ntokens
)
2464 int noreply_index
= ntokens
- 2;
2467 NOTE: this function is not the first place where we are going to
2468 send the reply. We could send it instead from process_command()
2469 if the request line has wrong number of tokens. However parsing
2470 malformed line for "noreply" option is not reliable anyway, so
2473 if (tokens
[noreply_index
].value
2474 && strcmp(tokens
[noreply_index
].value
, "noreply") == 0) {
2480 void append_stat(const char *name
, ADD_STAT add_stats
, conn
*c
,
2481 const char *fmt
, ...) {
2482 char val_str
[STAT_VAL_LEN
];
2492 vlen
= vsnprintf(val_str
, sizeof(val_str
) - 1, fmt
, ap
);
2495 add_stats(name
, strlen(name
), val_str
, vlen
, c
);
2498 inline static void process_stats_detail(conn
*c
, const char *command
) {
2501 if (strcmp(command
, "on") == 0) {
2502 settings
.detail_enabled
= 1;
2503 out_string(c
, "OK");
2505 else if (strcmp(command
, "off") == 0) {
2506 settings
.detail_enabled
= 0;
2507 out_string(c
, "OK");
2509 else if (strcmp(command
, "dump") == 0) {
2511 char *stats
= stats_prefix_dump(&len
);
2512 write_and_free(c
, stats
, len
);
2515 out_string(c
, "CLIENT_ERROR usage: stats detail on|off|dump");
2519 /* return server specific stats only */
2520 static void server_stats(ADD_STAT add_stats
, conn
*c
) {
2521 pid_t pid
= getpid();
2522 rel_time_t now
= current_time
;
2524 struct thread_stats thread_stats
;
2525 threadlocal_stats_aggregate(&thread_stats
);
2526 struct slab_stats slab_stats
;
2527 slab_stats_aggregate(&thread_stats
, &slab_stats
);
2530 struct rusage usage
;
2531 getrusage(RUSAGE_SELF
, &usage
);
2536 APPEND_STAT("pid", "%lu", (long)pid
);
2537 APPEND_STAT("uptime", "%u", now
);
2538 APPEND_STAT("time", "%ld", now
+ (long)process_started
);
2539 APPEND_STAT("version", "%s", VERSION
);
2540 APPEND_STAT("libevent", "%s", event_get_version());
2541 APPEND_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));
2544 append_stat("rusage_user", add_stats
, c
, "%ld.%06ld",
2545 (long)usage
.ru_utime
.tv_sec
,
2546 (long)usage
.ru_utime
.tv_usec
);
2547 append_stat("rusage_system", add_stats
, c
, "%ld.%06ld",
2548 (long)usage
.ru_stime
.tv_sec
,
2549 (long)usage
.ru_stime
.tv_usec
);
2552 APPEND_STAT("curr_connections", "%u", stats
.curr_conns
- 1);
2553 APPEND_STAT("total_connections", "%u", stats
.total_conns
);
2554 if (settings
.maxconns_fast
) {
2555 APPEND_STAT("rejected_connections", "%llu", (unsigned long long)stats
.rejected_conns
);
2557 APPEND_STAT("connection_structures", "%u", stats
.conn_structs
);
2558 APPEND_STAT("reserved_fds", "%u", stats
.reserved_fds
);
2559 APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats
.get_cmds
);
2560 APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats
.set_cmds
);
2561 APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats
.flush_cmds
);
2562 APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats
.touch_cmds
);
2563 APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats
.get_hits
);
2564 APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats
.get_misses
);
2565 APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats
.delete_misses
);
2566 APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats
.delete_hits
);
2567 APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats
.incr_misses
);
2568 APPEND_STAT("incr_hits", "%llu", (unsigned long long)slab_stats
.incr_hits
);
2569 APPEND_STAT("decr_misses", "%llu", (unsigned long long)thread_stats
.decr_misses
);
2570 APPEND_STAT("decr_hits", "%llu", (unsigned long long)slab_stats
.decr_hits
);
2571 APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats
.cas_misses
);
2572 APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats
.cas_hits
);
2573 APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats
.cas_badval
);
2574 APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats
.touch_hits
);
2575 APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats
.touch_misses
);
2576 APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats
.auth_cmds
);
2577 APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats
.auth_errors
);
2578 APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats
.bytes_read
);
2579 APPEND_STAT("bytes_written", "%llu", (unsigned long long)thread_stats
.bytes_written
);
2580 APPEND_STAT("limit_maxbytes", "%llu", (unsigned long long)settings
.maxbytes
);
2581 APPEND_STAT("accepting_conns", "%u", stats
.accepting_conns
);
2582 APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats
.listen_disabled_num
);
2583 APPEND_STAT("threads", "%d", settings
.num_threads
);
2584 APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats
.conn_yields
);
2585 APPEND_STAT("hash_power_level", "%u", stats
.hash_power_level
);
2586 APPEND_STAT("hash_bytes", "%llu", (unsigned long long)stats
.hash_bytes
);
2587 APPEND_STAT("hash_is_expanding", "%u", stats
.hash_is_expanding
);
2588 APPEND_STAT("expired_unfetched", "%llu", stats
.expired_unfetched
);
2589 APPEND_STAT("evicted_unfetched", "%llu", stats
.evicted_unfetched
);
2590 if (settings
.slab_reassign
) {
2591 APPEND_STAT("slab_reassign_running", "%u", stats
.slab_reassign_running
);
2592 APPEND_STAT("slabs_moved", "%llu", stats
.slabs_moved
);
/*
 * Reports the server's current configuration: one APPEND_STAT entry per
 * field of the global `settings` structure (memory and connection limits,
 * ports, bind address, socket path, growth factor, thread counts, feature
 * toggles, slab options).
 *
 * add_stats  stats callback supplied by the caller. It is not referenced
 *            by name in the visible body, so it is presumably consumed
 *            inside the APPEND_STAT macro -- TODO confirm.
 * c          opaque cookie, presumably also consumed by APPEND_STAT.
 *
 * Pointer-valued settings (inter, socketpath) are printed as the literal
 * string "NULL" when unset; boolean-like settings are printed as
 * "yes"/"no" or "on"/"off".
 *
 * NOTE(review): "maxbytes" is narrowed to unsigned int and printed with
 * %u, while the "limit_maxbytes" stat elsewhere in this file prints the
 * same field as unsigned long long.  If settings.maxbytes is wider than
 * unsigned int, limits above UINT_MAX would be reported truncated here --
 * confirm the field type and consider %llu.
 */
2597 static void process_stat_settings(ADD_STAT add_stats
, void *c
) {
2599 APPEND_STAT("maxbytes", "%u", (unsigned int)settings
.maxbytes
);
2600 APPEND_STAT("maxconns", "%d", settings
.maxconns
);
2601 APPEND_STAT("tcpport", "%d", settings
.port
);
2602 APPEND_STAT("udpport", "%d", settings
.udpport
);
2603 APPEND_STAT("inter", "%s", settings
.inter
? settings
.inter
: "NULL");
2604 APPEND_STAT("verbosity", "%d", settings
.verbose
);
2605 APPEND_STAT("oldest", "%lu", (unsigned long)settings
.oldest_live
);
2606 APPEND_STAT("evictions", "%s", settings
.evict_to_free
? "on" : "off");
2607 APPEND_STAT("domain_socket", "%s",
2608 settings
.socketpath
? settings
.socketpath
: "NULL");
2609 APPEND_STAT("umask", "%o", settings
.access
);
2610 APPEND_STAT("growth_factor", "%.2f", settings
.factor
);
2611 APPEND_STAT("chunk_size", "%d", settings
.chunk_size
);
2612 APPEND_STAT("num_threads", "%d", settings
.num_threads
);
2613 APPEND_STAT("num_threads_per_udp", "%d", settings
.num_threads_per_udp
);
2614 APPEND_STAT("stat_key_prefix", "%c", settings
.prefix_delimiter
);
2615 APPEND_STAT("detail_enabled", "%s",
2616 settings
.detail_enabled
? "yes" : "no");
2617 APPEND_STAT("reqs_per_event", "%d", settings
.reqs_per_event
);
2618 APPEND_STAT("cas_enabled", "%s", settings
.use_cas
? "yes" : "no");
2619 APPEND_STAT("tcp_backlog", "%d", settings
.backlog
);
2620 APPEND_STAT("binding_protocol", "%s",
2621 prot_text(settings
.binding_protocol
));
2622 APPEND_STAT("auth_enabled_sasl", "%s", settings
.sasl
? "yes" : "no");
2623 APPEND_STAT("item_size_max", "%d", settings
.item_size_max
);
2624 APPEND_STAT("maxconns_fast", "%s", settings
.maxconns_fast
? "yes" : "no");
2625 APPEND_STAT("hashpower_init", "%d", settings
.hashpower_init
);
2626 APPEND_STAT("slab_reassign", "%s", settings
.slab_reassign
? "yes" : "no");
2627 APPEND_STAT("slab_automove", "%d", settings
.slab_automove
);
/*
 * Handles the text-protocol "stats" command.
 *
 * c        connection that issued the command and receives the reply
 * tokens   tokenized command line; tokens[SUBCOMMAND_TOKEN] holds the
 *          sub-command, tokens[2] and tokens[3] its arguments
 * ntokens  number of entries in tokens
 *
 * Sub-commands recognised by the visible strcmp() chain:
 *   "reset"     replies "RESET"
 *   "detail"    delegates to process_stats_detail(), which produces its
 *               own output (an empty argument makes it emit the error)
 *   "settings"  appends the configuration via process_stat_settings()
 *   "cachedump" parses a slab id and a limit with safe_strtoul(), rejects
 *               ids >= POWER_LARGEST, and sends the buffer returned by
 *               item_cachedump() through write_and_free()
 *   otherwise   the name is offered to get_stats(); "ERROR" is the reply
 *               visible on that path when it is not accepted
 *
 * The plain form calls server_stats() and get_stats(NULL, 0, ...).
 * Stats accumulated in c->stats.buffer are terminated with
 * append_stats(NULL, 0, NULL, 0, c) and handed to write_and_free(); the
 * pointer is then cleared because ownership has moved to the write path.
 * A NULL buffer is reported as "SERVER_ERROR out of memory writing stats".
 */
2630 static void process_stat(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
2631 const char *subcommand
= tokens
[SUBCOMMAND_TOKEN
].value
;
2635 out_string(c
, "CLIENT_ERROR bad command line");
2640 server_stats(&append_stats
, c
);
2641 (void)get_stats(NULL
, 0, &append_stats
, c
);
2642 } else if (strcmp(subcommand
, "reset") == 0) {
2644 out_string(c
, "RESET");
2646 } else if (strcmp(subcommand
, "detail") == 0) {
2647 /* NOTE: how to tackle detail with binary? */
2649 process_stats_detail(c
, ""); /* outputs the error message */
2651 process_stats_detail(c
, tokens
[2].value
);
2652 /* Output already generated */
2654 } else if (strcmp(subcommand
, "settings") == 0) {
2655 process_stat_settings(&append_stats
, c
);
2656 } else if (strcmp(subcommand
, "cachedump") == 0) {
2658 unsigned int bytes
, id
, limit
= 0;
2661 out_string(c
, "CLIENT_ERROR bad command line");
2665 if (!safe_strtoul(tokens
[2].value
, &id
) ||
2666 !safe_strtoul(tokens
[3].value
, &limit
)) {
2667 out_string(c
, "CLIENT_ERROR bad command line format");
2671 if (id
>= POWER_LARGEST
) {
2672 out_string(c
, "CLIENT_ERROR Illegal slab id");
2676 buf
= item_cachedump(id
, limit
, &bytes
);
2677 write_and_free(c
, buf
, bytes
);
2680 /* getting here means that the subcommand is either engine specific or
2681 is invalid. query the engine and see. */
2682 if (get_stats(subcommand
, strlen(subcommand
), &append_stats
, c
)) {
2683 if (c
->stats
.buffer
== NULL
) {
2684 out_string(c
, "SERVER_ERROR out of memory writing stats");
2686 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
2687 c
->stats
.buffer
= NULL
;
2690 out_string(c
, "ERROR");
2695 /* append terminator and start the transfer */
2696 append_stats(NULL
, 0, NULL
, 0, c
);
2698 if (c
->stats
.buffer
== NULL
) {
2699 out_string(c
, "SERVER_ERROR out of memory writing stats");
2701 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
2702 c
->stats
.buffer
= NULL
;
/*
 * Serves the text-protocol retrieval commands.
 *
 * c           requesting connection; the reply is assembled in its iov
 *             list and fetched items are parked in c->ilist
 * tokens      tokenized command line; keys start at tokens[KEY_TOKEN]
 * ntokens     token count (reassigned when the line is re-tokenized)
 * return_cas  presumably selects the reply variant that carries the
 *             item's CAS value -- the branch itself is not visible here
 *
 * For each key token with non-zero length the key is looked up with
 * item_get().  Keys longer than KEY_MAX_LENGTH are rejected with
 * "CLIENT_ERROR bad command line format".  When settings.detail_enabled
 * is set, the hit or miss is recorded with stats_prefix_record_get().
 *
 * Two reply layouts are visible for a hit:
 *   - "VALUE " + key + item suffix minus its last 2 bytes + a separately
 *     formatted suffix buffer (taken from c->thread->suffix_cache and
 *     remembered in c->suffixlist) + data
 *   - "VALUE " + key + item suffix and data in one add_iov() call
 *
 * Hit and miss counters (slab get_hits, get_misses, get_cmds) are updated
 * under c->thread->stats.mutex.  If the tokenizer left unprocessed input
 * (key_token->value != NULL), the remainder is tokenized again and the
 * loop continues.
 *
 * On completion "END\r\n" is queued and the connection moves to
 * conn_mwrite; if queuing failed or keys were left unprocessed, the reply
 * is "SERVER_ERROR out of memory writing get response" instead.
 */
2706 /* ntokens is overwritten here... shrug.. */
2707 static inline void process_get_command(conn
*c
, token_t
*tokens
, size_t ntokens
, bool return_cas
) {
2712 token_t
*key_token
= &tokens
[KEY_TOKEN
];
2717 while(key_token
->length
!= 0) {
2719 key
= key_token
->value
;
2720 nkey
= key_token
->length
;
2722 if(nkey
> KEY_MAX_LENGTH
) {
2723 out_string(c
, "CLIENT_ERROR bad command line format");
2727 it
= item_get(key
, nkey
);
2728 if (settings
.detail_enabled
) {
2729 stats_prefix_record_get(key
, nkey
, NULL
!= it
);
2732 if (i
>= c
->isize
) {
2733 item
**new_list
= realloc(c
->ilist
, sizeof(item
*) * c
->isize
* 2);
2736 c
->ilist
= new_list
;
2744 * Construct the response. Each hit adds three elements to the
2745 * outgoing data list:
2748 * " " + flags + " " + data length + "\r\n" + data (with \r\n)
2753 MEMCACHED_COMMAND_GET(c
->sfd
, ITEM_key(it
), it
->nkey
,
2754 it
->nbytes
, ITEM_get_cas(it
));
2755 /* Goofy mid-flight realloc. */
2756 if (i
>= c
->suffixsize
) {
2757 char **new_suffix_list
= realloc(c
->suffixlist
,
2758 sizeof(char *) * c
->suffixsize
* 2);
2759 if (new_suffix_list
) {
2761 c
->suffixlist
= new_suffix_list
;
2768 suffix
= cache_alloc(c
->thread
->suffix_cache
);
2769 if (suffix
== NULL
) {
2770 out_string(c
, "SERVER_ERROR out of memory making CAS suffix");
2774 *(c
->suffixlist
+ i
) = suffix
;
2775 int suffix_len
= snprintf(suffix
, SUFFIX_SIZE
,
2777 (unsigned long long)ITEM_get_cas(it
));
2778 if (add_iov(c
, "VALUE ", 6) != 0 ||
2779 add_iov(c
, ITEM_key(it
), it
->nkey
) != 0 ||
2780 add_iov(c
, ITEM_suffix(it
), it
->nsuffix
- 2) != 0 ||
2781 add_iov(c
, suffix
, suffix_len
) != 0 ||
2782 add_iov(c
, ITEM_data(it
), it
->nbytes
) != 0)
2790 MEMCACHED_COMMAND_GET(c
->sfd
, ITEM_key(it
), it
->nkey
,
2791 it
->nbytes
, ITEM_get_cas(it
));
2792 if (add_iov(c
, "VALUE ", 6) != 0 ||
2793 add_iov(c
, ITEM_key(it
), it
->nkey
) != 0 ||
2794 add_iov(c
, ITEM_suffix(it
), it
->nsuffix
+ it
->nbytes
) != 0)
2802 if (settings
.verbose
> 1)
2803 fprintf(stderr
, ">%d sending key %s\n", c
->sfd
, ITEM_key(it
));
2805 /* item_get() has incremented it->refcount for us */
2806 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2807 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].get_hits
++;
2808 c
->thread
->stats
.get_cmds
++;
2809 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2811 *(c
->ilist
+ i
) = it
;
2815 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2816 c
->thread
->stats
.get_misses
++;
2817 c
->thread
->stats
.get_cmds
++;
2818 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2819 MEMCACHED_COMMAND_GET(c
->sfd
, key
, nkey
, -1, 0);
2826 * If the command string hasn't been fully processed, get the next set
2829 if(key_token
->value
!= NULL
) {
2830 ntokens
= tokenize_command(key_token
->value
, tokens
, MAX_TOKENS
);
2834 } while(key_token
->value
!= NULL
);
2836 c
->icurr
= c
->ilist
;
2839 c
->suffixcurr
= c
->suffixlist
;
2843 if (settings
.verbose
> 1)
2844 fprintf(stderr
, ">%d END\n", c
->sfd
);
2847 If the loop was terminated because of out-of-memory, it is not
2848 reliable to add END\r\n to the buffer, because it might not end
2849 in \r\n. So we send SERVER_ERROR instead.
2851 if (key_token
->value
!= NULL
|| add_iov(c
, "END\r\n", 5) != 0
2852 || (IS_UDP(c
->transport
) && build_udp_headers(c
) != 0)) {
2853 out_string(c
, "SERVER_ERROR out of memory writing get response");
2856 conn_set_state(c
, conn_mwrite
);
/*
 * Parses the header line of a text-protocol storage command and prepares
 * the connection to read the value bytes that follow.
 *
 * c           requesting connection
 * tokens      tokenized command line: key at tokens[KEY_TOKEN], then
 *             flags (tokens[2]), exptime (tokens[3]), value length
 *             (tokens[4]) and a CAS id (tokens[5])
 * ntokens     token count, forwarded to set_noreply_maybe()
 * comm        NREAD_* code identifying the storage operation
 * handle_cas  presumably gates parsing of the CAS id -- the condition is
 *             not visible here
 *
 * Keys longer than KEY_MAX_LENGTH, unparsable numeric fields and negative
 * lengths are rejected with "CLIENT_ERROR bad command line format".
 *
 * The item is allocated with item_alloc().  The visible failure replies
 * are "SERVER_ERROR object too large for cache" (when item_size_ok()
 * rejects the size) and "SERVER_ERROR out of memory storing object";
 * c->write_and_go is then set to conn_swallow so the pending data line is
 * discarded.  For NREAD_SET the existing item is additionally looked up
 * with item_get() -- per the existing comment, so that stale data does
 * not survive a failed set.
 *
 * Otherwise the requested CAS id is stored on the item, c->ritem and
 * c->rlbytes are pointed at the item's data area, and the connection
 * moves to conn_nread.
 */
2863 static void process_update_command(conn
*c
, token_t
*tokens
, const size_t ntokens
, int comm
, bool handle_cas
) {
2867 int32_t exptime_int
= 0;
2870 uint64_t req_cas_id
=0;
2875 set_noreply_maybe(c
, tokens
, ntokens
);
2877 if (tokens
[KEY_TOKEN
].length
> KEY_MAX_LENGTH
) {
2878 out_string(c
, "CLIENT_ERROR bad command line format");
2882 key
= tokens
[KEY_TOKEN
].value
;
2883 nkey
= tokens
[KEY_TOKEN
].length
;
2885 if (! (safe_strtoul(tokens
[2].value
, (uint32_t *)&flags
)
2886 && safe_strtol(tokens
[3].value
, &exptime_int
)
2887 && safe_strtol(tokens
[4].value
, (int32_t *)&vlen
))) {
2888 out_string(c
, "CLIENT_ERROR bad command line format");
2892 /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
2893 exptime
= exptime_int
;
2895 /* Negative exptimes can underflow and end up immortal. realtime() will
2896 immediately expire values that are greater than REALTIME_MAXDELTA, but less
2897 than process_started, so lets aim for that. */
2899 exptime
= REALTIME_MAXDELTA
+ 1;
2901 // does cas value exist?
2903 if (!safe_strtoull(tokens
[5].value
, &req_cas_id
)) {
2904 out_string(c
, "CLIENT_ERROR bad command line format");
2910 if (vlen
< 0 || vlen
- 2 < 0) {
2911 out_string(c
, "CLIENT_ERROR bad command line format");
2915 if (settings
.detail_enabled
) {
2916 stats_prefix_record_set(key
, nkey
);
2919 it
= item_alloc(key
, nkey
, flags
, realtime(exptime
), vlen
);
2922 if (! item_size_ok(nkey
, flags
, vlen
))
2923 out_string(c
, "SERVER_ERROR object too large for cache");
2925 out_string(c
, "SERVER_ERROR out of memory storing object");
2926 /* swallow the data line */
2927 c
->write_and_go
= conn_swallow
;
2930 /* Avoid stale data persisting in cache because we failed alloc.
2931 * Unacceptable for SET. Anywhere else too? */
2932 if (comm
== NREAD_SET
) {
2933 it
= item_get(key
, nkey
);
2942 ITEM_set_cas(it
, req_cas_id
);
2945 c
->ritem
= ITEM_data(it
);
2946 c
->rlbytes
= it
->nbytes
;
2948 conn_set_state(c
, conn_nread
);
/*
 * Handles the text-protocol "touch" command, which gives an existing item
 * a new expiration time.
 *
 * c        requesting connection
 * tokens   tokenized command line: key at tokens[KEY_TOKEN], exptime at
 *          tokens[2]
 * ntokens  token count, forwarded to set_noreply_maybe()
 *
 * Keys longer than KEY_MAX_LENGTH are rejected with
 * "CLIENT_ERROR bad command line format"; an exptime that
 * safe_strtol() cannot parse yields
 * "CLIENT_ERROR invalid exptime argument".
 *
 * The item is updated through item_touch() with realtime(exptime_int).
 * Two outcome paths are visible, both counted under
 * c->thread->stats.mutex:
 *   - touch_cmds plus the slab's touch_hits, reply "TOUCHED"
 *   - touch_cmds plus touch_misses, reply "NOT_FOUND"
 *
 * NOTE(review): unlike process_update_command, no remapping of negative
 * exptime values is visible here before realtime() is called -- confirm
 * whether that is intended.
 */
2951 static void process_touch_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
2954 int32_t exptime_int
= 0;
2959 set_noreply_maybe(c
, tokens
, ntokens
);
2961 if (tokens
[KEY_TOKEN
].length
> KEY_MAX_LENGTH
) {
2962 out_string(c
, "CLIENT_ERROR bad command line format");
2966 key
= tokens
[KEY_TOKEN
].value
;
2967 nkey
= tokens
[KEY_TOKEN
].length
;
2969 if (!safe_strtol(tokens
[2].value
, &exptime_int
)) {
2970 out_string(c
, "CLIENT_ERROR invalid exptime argument");
2974 it
= item_touch(key
, nkey
, realtime(exptime_int
));
2977 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2978 c
->thread
->stats
.touch_cmds
++;
2979 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].touch_hits
++;
2980 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2982 out_string(c
, "TOUCHED");
2985 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2986 c
->thread
->stats
.touch_cmds
++;
2987 c
->thread
->stats
.touch_misses
++;
2988 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2990 out_string(c
, "NOT_FOUND");
/*
 * Handles the text-protocol "incr" and "decr" commands.
 *
 * c        requesting connection
 * tokens   tokenized command line: key at tokens[KEY_TOKEN], delta at
 *          tokens[2]
 * ntokens  token count, forwarded to set_noreply_maybe()
 * incr     true for increment, false for decrement (passed through to
 *          add_delta())
 *
 * Keys longer than KEY_MAX_LENGTH are rejected with
 * "CLIENT_ERROR bad command line format"; a delta that safe_strtoull()
 * cannot parse yields "CLIENT_ERROR invalid numeric delta argument".
 *
 * add_delta() writes its textual result into the local buffer `temp`
 * (INCR_MAX_STORAGE_LEN bytes).  Replies visible in the switch on its
 * return value: the contents of `temp`, a CLIENT_ERROR for a non-numeric
 * stored value, "SERVER_ERROR out of memory", and "NOT_FOUND".  The
 * not-found path also bumps incr_misses or decr_misses under
 * c->thread->stats.mutex.  DELTA_ITEM_CAS_MISMATCH is not expected here
 * because NULL is passed as the CAS pointer.
 */
2994 static void process_arithmetic_command(conn
*c
, token_t
*tokens
, const size_t ntokens
, const bool incr
) {
2995 char temp
[INCR_MAX_STORAGE_LEN
];
3002 set_noreply_maybe(c
, tokens
, ntokens
);
3004 if (tokens
[KEY_TOKEN
].length
> KEY_MAX_LENGTH
) {
3005 out_string(c
, "CLIENT_ERROR bad command line format");
3009 key
= tokens
[KEY_TOKEN
].value
;
3010 nkey
= tokens
[KEY_TOKEN
].length
;
3012 if (!safe_strtoull(tokens
[2].value
, &delta
)) {
3013 out_string(c
, "CLIENT_ERROR invalid numeric delta argument");
3017 switch(add_delta(c
, key
, nkey
, incr
, delta
, temp
, NULL
)) {
3019 out_string(c
, temp
);
3022 out_string(c
, "CLIENT_ERROR cannot increment or decrement non-numeric value");
3025 out_string(c
, "SERVER_ERROR out of memory");
3027 case DELTA_ITEM_NOT_FOUND
:
3028 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3030 c
->thread
->stats
.incr_misses
++;
3032 c
->thread
->stats
.decr_misses
++;
3034 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3036 out_string(c
, "NOT_FOUND");
3038 case DELTA_ITEM_CAS_MISMATCH
:
3039 break; /* Should never get here */
/*
 * Applies an increment or decrement to the numeric value stored under
 * `key` and writes the new value, as decimal text, into `buf`.
 *
 * c      connection on whose behalf the operation runs (used for tracing
 *        and per-thread statistics)
 * key    item key, nkey bytes long
 * incr   true to add delta, false to subtract it
 * delta  amount to adjust by
 * buf    output buffer; written with snprintf() bounded by
 *        INCR_MAX_STORAGE_LEN
 * cas    optional in/out CAS: when non-NULL and non-zero it must match
 *        the item's CAS or DELTA_ITEM_CAS_MISMATCH is returned; on the
 *        success path it is overwritten with the item's CAS
 * hv     hash value of the key, forwarded to do_item_get() and
 *        item_replace()
 *
 * Returns a delta_result_type; DELTA_ITEM_NOT_FOUND and
 * DELTA_ITEM_CAS_MISMATCH are the values visible in this listing.
 *
 * The stored text is parsed with safe_strtoull().  The slab's incr_hits
 * or decr_hits counter is updated under c->thread->stats.mutex.
 *
 * Storage strategy: if the new text plus "\r\n" does not fit in the
 * existing item, or the item's refcount is not 1, a new item is
 * allocated, filled and swapped in with item_replace(); otherwise the
 * value is rewritten in place, padded with spaces, and the item's CAS is
 * refreshed under cache_lock.  The reference on `it` is released with
 * do_item_remove() before returning.
 */
3044 * adds a delta value to a numeric item.
3046 * c connection requesting the operation
3048 * incr true to increment value, false to decrement
3049 * delta amount to adjust value by
3050 * buf buffer for response string
3052 * returns a response string to send back to the client.
3054 enum delta_result_type
do_add_delta(conn
*c
, const char *key
, const size_t nkey
,
3055 const bool incr
, const int64_t delta
,
3056 char *buf
, uint64_t *cas
,
3057 const uint32_t hv
) {
3063 it
= do_item_get(key
, nkey
, hv
);
3065 return DELTA_ITEM_NOT_FOUND
;
3068 if (cas
!= NULL
&& *cas
!= 0 && ITEM_get_cas(it
) != *cas
) {
3070 return DELTA_ITEM_CAS_MISMATCH
;
3073 ptr
= ITEM_data(it
);
3075 if (!safe_strtoull(ptr
, &value
)) {
3082 MEMCACHED_COMMAND_INCR(c
->sfd
, ITEM_key(it
), it
->nkey
, value
);
3089 MEMCACHED_COMMAND_DECR(c
->sfd
, ITEM_key(it
), it
->nkey
, value
);
3092 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3094 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].incr_hits
++;
3096 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].decr_hits
++;
3098 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3100 snprintf(buf
, INCR_MAX_STORAGE_LEN
, "%llu", (unsigned long long)value
);
3102 if (res
+ 2 > it
->nbytes
|| it
->refcount
!= 1) { /* need to realloc */
3104 new_it
= item_alloc(ITEM_key(it
), it
->nkey
, atoi(ITEM_suffix(it
) + 1), it
->exptime
, res
+ 2 );
3109 memcpy(ITEM_data(new_it
), buf
, res
);
3110 memcpy(ITEM_data(new_it
) + res
, "\r\n", 2);
3111 item_replace(it
, new_it
, hv
);
3112 // Overwrite the older item's CAS with our new CAS since we're
3113 // returning the CAS of the old item below.
3114 ITEM_set_cas(it
, (settings
.use_cas
) ? ITEM_get_cas(new_it
) : 0);
3115 do_item_remove(new_it
); /* release our reference */
3116 } else { /* replace in-place */
3117 /* When changing the value without replacing the item, we
3118 need to update the CAS on the existing item. */
3119 mutex_lock(&cache_lock
); /* FIXME */
3120 ITEM_set_cas(it
, (settings
.use_cas
) ? get_cas_id() : 0);
3121 mutex_unlock(&cache_lock
);
3123 memcpy(ITEM_data(it
), buf
, res
);
3124 memset(ITEM_data(it
) + res
, ' ', it
->nbytes
- res
- 2);
3129 *cas
= ITEM_get_cas(it
); /* swap the incoming CAS value */
3131 do_item_remove(it
); /* release our reference */
/*
 * Handles the text-protocol "delete" command.
 *
 * c        requesting connection
 * tokens   tokenized command line; key at tokens[KEY_TOKEN]
 * ntokens  token count
 *
 * Extra-argument validation: the token after the key may be "0", and
 * "noreply" may be present, as detected by set_noreply_maybe().  The
 * accepted shapes are ntokens == 4 with either of those, or
 * ntokens == 5 with both.  Anything else is answered with the
 * CLIENT_ERROR usage message.
 *
 * Keys longer than KEY_MAX_LENGTH are rejected with
 * "CLIENT_ERROR bad command line format".  When settings.detail_enabled
 * is set, the delete is recorded with stats_prefix_record_delete().
 *
 * The item is looked up with item_get().  Two outcome paths are visible,
 * both counted under c->thread->stats.mutex:
 *   - the slab's delete_hits, reference released with item_remove(),
 *     reply "DELETED"
 *   - delete_misses, reply "NOT_FOUND"
 */
3135 static void process_delete_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
3143 bool hold_is_zero
= strcmp(tokens
[KEY_TOKEN
+1].value
, "0") == 0;
3144 bool sets_noreply
= set_noreply_maybe(c
, tokens
, ntokens
);
3145 bool valid
= (ntokens
== 4 && (hold_is_zero
|| sets_noreply
))
3146 || (ntokens
== 5 && hold_is_zero
&& sets_noreply
);
3148 out_string(c
, "CLIENT_ERROR bad command line format. "
3149 "Usage: delete <key> [noreply]");
3155 key
= tokens
[KEY_TOKEN
].value
;
3156 nkey
= tokens
[KEY_TOKEN
].length
;
3158 if(nkey
> KEY_MAX_LENGTH
) {
3159 out_string(c
, "CLIENT_ERROR bad command line format");
3163 if (settings
.detail_enabled
) {
3164 stats_prefix_record_delete(key
, nkey
);
3167 it
= item_get(key
, nkey
);
3169 MEMCACHED_COMMAND_DELETE(c
->sfd
, ITEM_key(it
), it
->nkey
);
3171 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3172 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].delete_hits
++;
3173 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3176 item_remove(it
); /* release our reference */
3177 out_string(c
, "DELETED");
3179 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3180 c
->thread
->stats
.delete_misses
++;
3181 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3183 out_string(c
, "NOT_FOUND");
/*
 * Handles the text-protocol "verbosity" command: sets settings.verbose to
 * the level given in tokens[1], clamped to MAX_VERBOSITY_LEVEL, and
 * replies "OK".
 *
 * c        requesting connection
 * tokens   tokenized command line; the level is tokens[1]
 * ntokens  token count, forwarded to set_noreply_maybe()
 */
3187 static void process_verbosity_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
3192 set_noreply_maybe(c
, tokens
, ntokens
);
/* NOTE(review): strtoul() is called with a NULL end pointer, so a
   non-numeric argument is indistinguishable from "0" and silently sets
   the verbosity to 0. */
3194 level
= strtoul(tokens
[1].value
, NULL
, 10);
3195 settings
.verbose
= level
> MAX_VERBOSITY_LEVEL
? MAX_VERBOSITY_LEVEL
: level
;
3196 out_string(c
, "OK");
/*
 * Handles "slabs automove <level>": updates settings.slab_automove.
 *
 * c        requesting connection
 * tokens   tokenized command line; the level is tokens[2]
 * ntokens  token count, forwarded to set_noreply_maybe()
 *
 * The level is parsed with strtoul() (base 10, no end-pointer check).
 * Levels 1 and 2 are stored as given; one branch stores 0 (its condition
 * is not visible here, presumably level == 0).  Visible replies are
 * "ERROR" for an unsupported level and "OK" otherwise.
 */
3200 static void process_slabs_automove_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
3205 set_noreply_maybe(c
, tokens
, ntokens
);
3207 level
= strtoul(tokens
[2].value
, NULL
, 10);
3209 settings
.slab_automove
= 0;
3210 } else if (level
== 1 || level
== 2) {
3211 settings
.slab_automove
= level
;
3213 out_string(c
, "ERROR");
3216 out_string(c
, "OK");
/*
 * Top-level dispatcher for one text-protocol command line.
 *
 * c        connection the line was read from
 * command  the command line; it is tokenized by tokenize_command() into
 *          at most MAX_TOKENS tokens
 *
 * A fresh message header is reserved with add_msghdr() first; failure is
 * reported as "SERVER_ERROR out of memory preparing response".
 *
 * Dispatch is a chain of (token count, command name) tests:
 *   get / bget / gets                  -> process_get_command
 *   add / set / replace / prepend /
 *   append (6-7 tokens), cas (7-8)     -> process_update_command; the
 *                                         NREAD_* code is assigned to
 *                                         `comm` inside the condition
 *   incr / decr (4-5 tokens)           -> process_arithmetic_command
 *   delete (3-5 tokens)                -> process_delete_command
 *   touch (4-5 tokens)                 -> process_touch_command
 *   stats                              -> process_stat
 *   flush_all (2-4 tokens)             -> handled inline, see below
 *   version                            -> "VERSION " VERSION
 *   quit                               -> connection set to conn_closing
 *   slabs reassign / slabs automove    -> slabs_reassign() with a reply
 *                                         per result code, or
 *                                         process_slabs_automove_command
 *   verbosity (3-4 tokens)             -> process_verbosity_command
 *   anything else                      -> "ERROR"
 *
 * flush_all bumps the per-thread flush_cmds counter, then sets
 * settings.oldest_live either to current_time - 1 (no delay, or a delay
 * of zero) or to realtime(exptime) - 1, calls item_flush_expired() and
 * replies "OK".
 */
3220 static void process_command(conn
*c
, char *command
) {
3222 token_t tokens
[MAX_TOKENS
];
3228 MEMCACHED_PROCESS_COMMAND_START(c
->sfd
, c
->rcurr
, c
->rbytes
);
3230 if (settings
.verbose
> 1)
3231 fprintf(stderr
, "<%d %s\n", c
->sfd
, command
);
3234 * for commands set/add/replace, we build an item and read the data
3235 * directly into it, then continue in nread_complete().
3241 if (add_msghdr(c
) != 0) {
3242 out_string(c
, "SERVER_ERROR out of memory preparing response");
3246 ntokens
= tokenize_command(command
, tokens
, MAX_TOKENS
);
3248 ((strcmp(tokens
[COMMAND_TOKEN
].value
, "get") == 0) ||
3249 (strcmp(tokens
[COMMAND_TOKEN
].value
, "bget") == 0))) {
3251 process_get_command(c
, tokens
, ntokens
, false);
3253 } else if ((ntokens
== 6 || ntokens
== 7) &&
3254 ((strcmp(tokens
[COMMAND_TOKEN
].value
, "add") == 0 && (comm
= NREAD_ADD
)) ||
3255 (strcmp(tokens
[COMMAND_TOKEN
].value
, "set") == 0 && (comm
= NREAD_SET
)) ||
3256 (strcmp(tokens
[COMMAND_TOKEN
].value
, "replace") == 0 && (comm
= NREAD_REPLACE
)) ||
3257 (strcmp(tokens
[COMMAND_TOKEN
].value
, "prepend") == 0 && (comm
= NREAD_PREPEND
)) ||
3258 (strcmp(tokens
[COMMAND_TOKEN
].value
, "append") == 0 && (comm
= NREAD_APPEND
)) )) {
3260 process_update_command(c
, tokens
, ntokens
, comm
, false);
3262 } else if ((ntokens
== 7 || ntokens
== 8) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "cas") == 0 && (comm
= NREAD_CAS
))) {
3264 process_update_command(c
, tokens
, ntokens
, comm
, true);
3266 } else if ((ntokens
== 4 || ntokens
== 5) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "incr") == 0)) {
3268 process_arithmetic_command(c
, tokens
, ntokens
, 1);
3270 } else if (ntokens
>= 3 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "gets") == 0)) {
3272 process_get_command(c
, tokens
, ntokens
, true);
3274 } else if ((ntokens
== 4 || ntokens
== 5) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "decr") == 0)) {
3276 process_arithmetic_command(c
, tokens
, ntokens
, 0);
3278 } else if (ntokens
>= 3 && ntokens
<= 5 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "delete") == 0)) {
3280 process_delete_command(c
, tokens
, ntokens
);
3282 } else if ((ntokens
== 4 || ntokens
== 5) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "touch") == 0)) {
3284 process_touch_command(c
, tokens
, ntokens
);
3286 } else if (ntokens
>= 2 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "stats") == 0)) {
3288 process_stat(c
, tokens
, ntokens
);
3290 } else if (ntokens
>= 2 && ntokens
<= 4 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "flush_all") == 0)) {
3293 set_noreply_maybe(c
, tokens
, ntokens
);
3295 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3296 c
->thread
->stats
.flush_cmds
++;
3297 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3299 if(ntokens
== (c
->noreply
? 3 : 2)) {
3300 settings
.oldest_live
= current_time
- 1;
3301 item_flush_expired();
3302 out_string(c
, "OK");
/* NOTE(review): errno is tested for ERANGE right after strtol(), but no
   "errno = 0" is visible before the call.  strtol() only sets errno on
   failure, so a stale ERANGE from earlier code could reject a valid
   delay -- confirm that errno is cleared first. */
3306 exptime
= strtol(tokens
[1].value
, NULL
, 10);
3307 if(errno
== ERANGE
) {
3308 out_string(c
, "CLIENT_ERROR bad command line format");
3313 If exptime is zero realtime() would return zero too, and
3314 realtime(exptime) - 1 would overflow to the max unsigned
3315 value. So we process exptime == 0 the same way we do when
3316 no delay is given at all.
3319 settings
.oldest_live
= realtime(exptime
) - 1;
3320 else /* exptime == 0 */
3321 settings
.oldest_live
= current_time
- 1;
3322 item_flush_expired();
3323 out_string(c
, "OK");
3326 } else if (ntokens
== 2 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "version") == 0)) {
3328 out_string(c
, "VERSION " VERSION
);
3330 } else if (ntokens
== 2 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "quit") == 0)) {
3332 conn_set_state(c
, conn_closing
);
3334 } else if (ntokens
> 1 && strcmp(tokens
[COMMAND_TOKEN
].value
, "slabs") == 0) {
3335 if (ntokens
== 5 && strcmp(tokens
[COMMAND_TOKEN
+ 1].value
, "reassign") == 0) {
3338 if (settings
.slab_reassign
== false) {
3339 out_string(c
, "CLIENT_ERROR slab reassignment disabled");
/* NOTE(review): same pattern as flush_all -- two strtol() calls followed
   by an errno == ERANGE test with no visible reset of errno beforehand,
   and no end-pointer check for non-numeric class ids. */
3343 src
= strtol(tokens
[2].value
, NULL
, 10);
3344 dst
= strtol(tokens
[3].value
, NULL
, 10);
3346 if (errno
== ERANGE
) {
3347 out_string(c
, "CLIENT_ERROR bad command line format");
3351 rv
= slabs_reassign(src
, dst
);
3354 out_string(c
, "OK");
3356 case REASSIGN_RUNNING
:
3357 out_string(c
, "BUSY currently processing reassign request");
3359 case REASSIGN_BADCLASS
:
3360 out_string(c
, "BADCLASS invalid src or dst class id");
3362 case REASSIGN_NOSPARE
:
3363 out_string(c
, "NOSPARE source class has no spare pages");
3365 case REASSIGN_SRC_DST_SAME
:
3366 out_string(c
, "SAME src and dst class are identical");
3370 } else if (ntokens
== 4 &&
3371 (strcmp(tokens
[COMMAND_TOKEN
+ 1].value
, "automove") == 0)) {
3372 process_slabs_automove_command(c
, tokens
, ntokens
);
3374 out_string(c
, "ERROR");
3376 } else if ((ntokens
== 3 || ntokens
== 4) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "verbosity") == 0)) {
3377 process_verbosity_command(c
, tokens
, ntokens
);
3379 out_string(c
, "ERROR");
/*
 * Tries to extract and dispatch one complete command from the
 * connection's read buffer.
 *
 * c  connection whose buffer (c->rcurr, c->rbytes) is examined; the
 *    asserts require rcurr to lie within rbuf..rbuf+rsize and rbytes > 0
 *
 * Returns an int that the caller compares against 0 to mean "need more
 * data"; the return statements themselves are not visible in this
 * listing.
 *
 * Protocol detection: while the protocol is still being negotiated, or
 * on UDP, the first byte of rbuf decides between binary_prot (it equals
 * PROTOCOL_BINARY_REQ) and ascii_prot.
 *
 * Binary path: waits until a full binary header is buffered, moves the
 * data to the start of rbuf when rcurr is not 8-byte aligned, copies the
 * header into c->binary_header with keylen, bodylen and cas converted to
 * host byte order, closes the connection on a bad magic byte, reserves a
 * message header, records opcode / keylen / opaque on the connection,
 * calls dispatch_bin_command(), and consumes the header bytes.
 *
 * ASCII path: searches for '\n' with memchr().  With more than 1024
 * bytes buffered and no newline, the line is only tolerated if it starts
 * (after at most 100 leading spaces) with "get " or "gets "; otherwise
 * the connection is closed.  A trailing '\r' before the newline is
 * detected, the line is passed to process_command(), and the consumed
 * bytes are subtracted from c->rbytes.
 */
3385 * if we have a complete line in the buffer, process it.
3387 static int try_read_command(conn
*c
) {
3389 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
3390 assert(c
->rbytes
> 0);
3392 if (c
->protocol
== negotiating_prot
|| c
->transport
== udp_transport
) {
3393 if ((unsigned char)c
->rbuf
[0] == (unsigned char)PROTOCOL_BINARY_REQ
) {
3394 c
->protocol
= binary_prot
;
3396 c
->protocol
= ascii_prot
;
3399 if (settings
.verbose
> 1) {
3400 fprintf(stderr
, "%d: Client using the %s protocol\n", c
->sfd
,
3401 prot_text(c
->protocol
));
3405 if (c
->protocol
== binary_prot
) {
3406 /* Do we have the complete packet header? */
3407 if (c
->rbytes
< sizeof(c
->binary_header
)) {
3408 /* need more data! */
3412 if (((long)(c
->rcurr
)) % 8 != 0) {
3413 /* must realign input buffer */
3414 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
3416 if (settings
.verbose
> 1) {
3417 fprintf(stderr
, "%d: Realign input buffer\n", c
->sfd
);
3421 protocol_binary_request_header
* req
;
3422 req
= (protocol_binary_request_header
*)c
->rcurr
;
3424 if (settings
.verbose
> 1) {
3425 /* Dump the packet before we convert it to host order */
3427 fprintf(stderr
, "<%d Read binary protocol data:", c
->sfd
);
3428 for (ii
= 0; ii
< sizeof(req
->bytes
); ++ii
) {
3430 fprintf(stderr
, "\n<%d ", c
->sfd
);
3432 fprintf(stderr
, " 0x%02x", req
->bytes
[ii
]);
3434 fprintf(stderr
, "\n");
3437 c
->binary_header
= *req
;
3438 c
->binary_header
.request
.keylen
= ntohs(req
->request
.keylen
);
3439 c
->binary_header
.request
.bodylen
= ntohl(req
->request
.bodylen
);
3440 c
->binary_header
.request
.cas
= ntohll(req
->request
.cas
);
3442 if (c
->binary_header
.request
.magic
!= PROTOCOL_BINARY_REQ
) {
3443 if (settings
.verbose
) {
3444 fprintf(stderr
, "Invalid magic: %x\n",
3445 c
->binary_header
.request
.magic
);
3447 conn_set_state(c
, conn_closing
);
3454 if (add_msghdr(c
) != 0) {
3455 out_string(c
, "SERVER_ERROR out of memory");
3459 c
->cmd
= c
->binary_header
.request
.opcode
;
3460 c
->keylen
= c
->binary_header
.request
.keylen
;
3461 c
->opaque
= c
->binary_header
.request
.opaque
;
3462 /* clear the returned cas value */
3465 dispatch_bin_command(c
);
3467 c
->rbytes
-= sizeof(c
->binary_header
);
3468 c
->rcurr
+= sizeof(c
->binary_header
);
3476 el
= memchr(c
->rcurr
, '\n', c
->rbytes
);
3478 if (c
->rbytes
> 1024) {
3480 * We didn't have a '\n' in the first k. This _has_ to be a
3481 * large multiget, if not we should just nuke the connection.
3483 char *ptr
= c
->rcurr
;
3484 while (*ptr
== ' ') { /* ignore leading whitespaces */
3488 if (ptr
- c
->rcurr
> 100 ||
3489 (strncmp(ptr
, "get ", 4) && strncmp(ptr
, "gets ", 5))) {
3491 conn_set_state(c
, conn_closing
);
3499 if ((el
- c
->rcurr
) > 1 && *(el
- 1) == '\r') {
3504 assert(cont
<= (c
->rcurr
+ c
->rbytes
));
3506 process_command(c
, c
->rcurr
);
3508 c
->rbytes
-= (cont
- c
->rcurr
);
3511 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
/*
 * Receives one UDP datagram into the connection's read buffer.
 *
 * c  UDP connection; the sender's address is stored in c->request_addr
 *    (with its length in c->request_addr_size) by recvfrom()
 *
 * Returns READ_DATA_RECEIVED when a usable request was read, otherwise
 * READ_NO_DATA_RECEIVED.
 *
 * The datagram starts with a frame header.  Bytes 0-1 are the request id
 * (big-endian) and are saved in c->request_id.  Bytes 4-5 must be 0 and
 * 1; any other value is treated as a multi-packet request and answered
 * with "SERVER_ERROR multi-packet request not supported".  The payload
 * after the first 8 bytes is then moved to the start of c->rbuf.  The
 * received byte count is added to the per-thread bytes_read counter
 * under c->thread->stats.mutex.
 */
3518 * read a UDP request.
3520 static enum try_read_result
try_read_udp(conn
*c
) {
3525 c
->request_addr_size
= sizeof(c
->request_addr
);
3526 res
= recvfrom(c
->sfd
, c
->rbuf
, c
->rsize
,
3527 0, &c
->request_addr
, &c
->request_addr_size
);
3529 unsigned char *buf
= (unsigned char *)c
->rbuf
;
3530 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3531 c
->thread
->stats
.bytes_read
+= res
;
3532 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3534 /* Beginning of UDP packet is the request ID; save it. */
3535 c
->request_id
= buf
[0] * 256 + buf
[1];
3537 /* If this is a multi-packet request, drop it. */
3538 if (buf
[4] != 0 || buf
[5] != 1) {
3539 out_string(c
, "SERVER_ERROR multi-packet request not supported");
3540 return READ_NO_DATA_RECEIVED
;
3543 /* Don't care about any of the rest of the header. */
3545 memmove(c
->rbuf
, c
->rbuf
+ 8, res
);
3549 return READ_DATA_RECEIVED
;
3551 return READ_NO_DATA_RECEIVED
;
/*
 * Reads as much as is currently available from a stream socket into the
 * connection's read buffer, growing the buffer when it fills up.
 *
 * c  connection to read from
 *
 * Returns a try_read_result: READ_NO_DATA_RECEIVED is the initial value,
 * READ_DATA_RECEIVED is set once read() has delivered bytes, and
 * READ_MEMORY_ERROR is returned when the buffer cannot be enlarged.
 *
 * Steps visible in the body:
 *   - any unconsumed fragment at c->rcurr is moved to the start of
 *     c->rbuf
 *   - when c->rbytes has reached c->rsize, the buffer is doubled with
 *     realloc(); the num_allocs == 4 test bounds how often this happens
 *     in one call, as the existing comment about flooding explains
 *   - on realloc failure the buffered bytes are discarded
 *     (c->rbytes = 0), "SERVER_ERROR out of memory reading request" is
 *     queued and c->write_and_go is set to conn_closing
 *   - read() fills the free tail of the buffer; the byte count is added
 *     to the per-thread bytes_read counter under c->thread->stats.mutex
 *   - EAGAIN / EWOULDBLOCK from read() is distinguished from other
 *     errors
 */
3555 * read from network as much as we can, handle buffer overflow and connection
3557 * before reading, move the remaining incomplete fragment of a command
3558 * (if any) to the beginning of the buffer.
3560 * To protect us from someone flooding a connection with bogus data causing
3561 * the connection to eat up all available memory, break out and start looking
3562 * at the data I've got after a number of reallocs...
3564 * @return enum try_read_result
3566 static enum try_read_result
try_read_network(conn
*c
) {
3567 enum try_read_result gotdata
= READ_NO_DATA_RECEIVED
;
3572 if (c
->rcurr
!= c
->rbuf
) {
3573 if (c
->rbytes
!= 0) /* otherwise there's nothing to copy */
3574 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
3579 if (c
->rbytes
>= c
->rsize
) {
3580 if (num_allocs
== 4) {
3584 char *new_rbuf
= realloc(c
->rbuf
, c
->rsize
* 2);
3586 if (settings
.verbose
> 0)
3587 fprintf(stderr
, "Couldn't realloc input buffer\n");
3588 c
->rbytes
= 0; /* ignore what we read */
3589 out_string(c
, "SERVER_ERROR out of memory reading request");
3590 c
->write_and_go
= conn_closing
;
3591 return READ_MEMORY_ERROR
;
3593 c
->rcurr
= c
->rbuf
= new_rbuf
;
3597 int avail
= c
->rsize
- c
->rbytes
;
3598 res
= read(c
->sfd
, c
->rbuf
+ c
->rbytes
, avail
);
3600 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3601 c
->thread
->stats
.bytes_read
+= res
;
3602 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3603 gotdata
= READ_DATA_RECEIVED
;
3615 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
/*
 * Re-registers the connection's libevent event with a new set of flags.
 *
 * c          connection whose event is updated
 * new_flags  event flags to listen for (callers in this file pass
 *            combinations such as EV_READ | EV_PERSIST, or 0)
 *
 * Returns false when event_del() or event_add() reports -1.
 *
 * If c->ev_flags already equals new_flags, the branch taken does not
 * reach the re-registration code (its statement is not visible here,
 * presumably "return true").  Otherwise the event is removed, set up
 * again with event_handler as callback and `c` as argument, attached to
 * the event base it was registered on, and added back.  c->ev_flags is
 * updated before event_add() is attempted.
 */
3624 static bool update_event(conn
*c
, const int new_flags
) {
3627 struct event_base
*base
= c
->event
.ev_base
;
3628 if (c
->ev_flags
== new_flags
)
3630 if (event_del(&c
->event
) == -1) return false;
3631 event_set(&c
->event
, c
->sfd
, new_flags
, event_handler
, (void *)c
);
3632 event_base_set(base
, &c
->event
);
3633 c
->ev_flags
= new_flags
;
3634 if (event_add(&c
->event
, 0) == -1) return false;
/*
 * Turns acceptance of new connections on or off for every listening
 * socket in the listen_conn list.
 *
 * do_accept  presumably selects between the two visible variants below;
 *            the branch conditions are not visible in this listing
 *
 * Enabling variant: each listener is re-armed with
 * EV_READ | EV_PERSIST and listen() is called with settings.backlog;
 * stats.accepting_conns becomes true.
 *
 * Disabling variant: each listener's event flags are cleared and
 * listen() is called with a backlog of 0; stats.accepting_conns becomes
 * false, stats.listen_disabled_num is incremented, allow_new_conns is
 * cleared and maxconns_handler(-42, 0, 0) is invoked.
 */
3639 * Sets whether we are listening for new connections or not.
3641 void do_accept_new_conns(const bool do_accept
) {
3644 for (next
= listen_conn
; next
; next
= next
->next
) {
3646 update_event(next
, EV_READ
| EV_PERSIST
);
3647 if (listen(next
->sfd
, settings
.backlog
) != 0) {
3652 update_event(next
, 0);
3653 if (listen(next
->sfd
, 0) != 0) {
3661 stats
.accepting_conns
= true;
3665 stats
.accepting_conns
= false;
3666 stats
.listen_disabled_num
++;
3668 allow_new_conns
= false;
3669 maxconns_handler(-42, 0, 0);
/*
 * Sends the next piece of the connection's pending reply with sendmsg().
 *
 * c  connection whose c->msglist[c->msgcurr .. c->msgused) holds the
 *    outgoing msghdr structures
 *
 * Returns a transmit_result:
 *   TRANSMIT_COMPLETE    no message is left to send
 *   TRANSMIT_INCOMPLETE  some bytes were written and more may remain
 *   TRANSMIT_SOFT_ERROR  the socket would block; the event has been
 *                        switched to EV_WRITE | EV_PERSIST so the write
 *                        is retried later
 *   TRANSMIT_HARD_ERROR  the write failed, or the event could not be
 *                        updated
 *
 * After a successful sendmsg() the written byte count is added to the
 * per-thread bytes_written counter under c->thread->stats.mutex.  Fully
 * written iovec entries are then dropped from the front of the current
 * message, and a partially written entry has its base and length
 * advanced by the remaining byte count.
 *
 * On a hard write error a UDP connection goes back to conn_read, while
 * any other connection is set to conn_closing.
 */
3674 * Transmit the next chunk of data from our list of msgbuf structures.
3677 * TRANSMIT_COMPLETE All done writing.
3678 * TRANSMIT_INCOMPLETE More data remaining to write.
3679 * TRANSMIT_SOFT_ERROR Can't write any more right now.
3680 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
3682 static enum transmit_result
transmit(conn
*c
) {
3685 if (c
->msgcurr
< c
->msgused
&&
3686 c
->msglist
[c
->msgcurr
].msg_iovlen
== 0) {
3687 /* Finished writing the current msg; advance to the next. */
3690 if (c
->msgcurr
< c
->msgused
) {
3692 struct msghdr
*m
= &c
->msglist
[c
->msgcurr
];
3694 res
= sendmsg(c
->sfd
, m
, 0);
3696 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3697 c
->thread
->stats
.bytes_written
+= res
;
3698 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3700 /* We've written some of the data. Remove the completed
3701 iovec entries from the list of pending writes. */
3702 while (m
->msg_iovlen
> 0 && res
>= m
->msg_iov
->iov_len
) {
3703 res
-= m
->msg_iov
->iov_len
;
3708 /* Might have written just part of the last iovec entry;
3709 adjust it so the next write will do the rest. */
3711 m
->msg_iov
->iov_base
= (caddr_t
)m
->msg_iov
->iov_base
+ res
;
3712 m
->msg_iov
->iov_len
-= res
;
3714 return TRANSMIT_INCOMPLETE
;
3716 if (res
== -1 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) {
3717 if (!update_event(c
, EV_WRITE
| EV_PERSIST
)) {
3718 if (settings
.verbose
> 0)
3719 fprintf(stderr
, "Couldn't update event\n");
3720 conn_set_state(c
, conn_closing
);
3721 return TRANSMIT_HARD_ERROR
;
3723 return TRANSMIT_SOFT_ERROR
;
3725 /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
3726 we have a real error, on which we close the connection */
3727 if (settings
.verbose
> 0)
3728 perror("Failed to write, and not due to blocking");
3730 if (IS_UDP(c
->transport
))
3731 conn_set_state(c
, conn_read
);
3733 conn_set_state(c
, conn_closing
);
3734 return TRANSMIT_HARD_ERROR
;
3736 return TRANSMIT_COMPLETE
;
3740 static void drive_machine(conn
*c
) {
3744 struct sockaddr_storage addr
;
3745 int nreqs
= settings
.reqs_per_event
;
3754 case conn_listening
:
3755 addrlen
= sizeof(addr
);
3756 if ((sfd
= accept(c
->sfd
, (struct sockaddr
*)&addr
, &addrlen
)) == -1) {
3757 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
3758 /* these are transient, so don't log anything */
3760 } else if (errno
== EMFILE
) {
3761 if (settings
.verbose
> 0)
3762 fprintf(stderr
, "Too many open connections\n");
3763 accept_new_conns(false);
3771 if ((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0 ||
3772 fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
3773 perror("setting O_NONBLOCK");
3778 if (settings
.maxconns_fast
&&
3779 stats
.curr_conns
+ stats
.reserved_fds
>= settings
.maxconns
- 1) {
3780 str
= "ERROR Too many open connections\r\n";
3781 res
= write(sfd
, str
, strlen(str
));
3784 stats
.rejected_conns
++;
3787 dispatch_conn_new(sfd
, conn_new_cmd
, EV_READ
| EV_PERSIST
,
3788 DATA_BUFFER_SIZE
, tcp_transport
);
3795 if (!update_event(c
, EV_READ
| EV_PERSIST
)) {
3796 if (settings
.verbose
> 0)
3797 fprintf(stderr
, "Couldn't update event\n");
3798 conn_set_state(c
, conn_closing
);
3802 conn_set_state(c
, conn_read
);
3807 res
= IS_UDP(c
->transport
) ? try_read_udp(c
) : try_read_network(c
);
3810 case READ_NO_DATA_RECEIVED
:
3811 conn_set_state(c
, conn_waiting
);
3813 case READ_DATA_RECEIVED
:
3814 conn_set_state(c
, conn_parse_cmd
);
3817 conn_set_state(c
, conn_closing
);
3819 case READ_MEMORY_ERROR
: /* Failed to allocate more memory */
3820 /* State already set by try_read_network */
3825 case conn_parse_cmd
:
3826 if (try_read_command(c
) == 0) {
3827 /* wee need more data! */
3828 conn_set_state(c
, conn_waiting
);
3834 /* Only process nreqs at a time to avoid starving other
3839 reset_cmd_handler(c
);
3841 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3842 c
->thread
->stats
.conn_yields
++;
3843 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3844 if (c
->rbytes
> 0) {
3845 /* We have already read in data into the input buffer,
3846 so libevent will most likely not signal read events
3847 on the socket (unless more data is available. As a
3848 hack we should just put in a request to write data,
3849 because that should be possible ;-)
3851 if (!update_event(c
, EV_WRITE
| EV_PERSIST
)) {
3852 if (settings
.verbose
> 0)
3853 fprintf(stderr
, "Couldn't update event\n");
3854 conn_set_state(c
, conn_closing
);
3862 if (c
->rlbytes
== 0) {
3866 /* first check if we have leftovers in the conn_read buffer */
3867 if (c
->rbytes
> 0) {
3868 int tocopy
= c
->rbytes
> c
->rlbytes
? c
->rlbytes
: c
->rbytes
;
3869 if (c
->ritem
!= c
->rcurr
) {
3870 memmove(c
->ritem
, c
->rcurr
, tocopy
);
3873 c
->rlbytes
-= tocopy
;
3875 c
->rbytes
-= tocopy
;
3876 if (c
->rlbytes
== 0) {
3881 /* now try reading from the socket */
3882 res
= read(c
->sfd
, c
->ritem
, c
->rlbytes
);
3884 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3885 c
->thread
->stats
.bytes_read
+= res
;
3886 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3887 if (c
->rcurr
== c
->ritem
) {
3894 if (res
== 0) { /* end of stream */
3895 conn_set_state(c
, conn_closing
);
3898 if (res
== -1 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) {
3899 if (!update_event(c
, EV_READ
| EV_PERSIST
)) {
3900 if (settings
.verbose
> 0)
3901 fprintf(stderr
, "Couldn't update event\n");
3902 conn_set_state(c
, conn_closing
);
3908 /* otherwise we have a real error, on which we close the connection */
3909 if (settings
.verbose
> 0) {
3910 fprintf(stderr
, "Failed to read, and not due to blocking:\n"
3912 "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
3913 errno
, strerror(errno
),
3914 (long)c
->rcurr
, (long)c
->ritem
, (long)c
->rbuf
,
3915 (int)c
->rlbytes
, (int)c
->rsize
);
3917 conn_set_state(c
, conn_closing
);
3921 /* we are reading sbytes and throwing them away */
3922 if (c
->sbytes
== 0) {
3923 conn_set_state(c
, conn_new_cmd
);
3927 /* first check if we have leftovers in the conn_read buffer */
3928 if (c
->rbytes
> 0) {
3929 int tocopy
= c
->rbytes
> c
->sbytes
? c
->sbytes
: c
->rbytes
;
3930 c
->sbytes
-= tocopy
;
3932 c
->rbytes
-= tocopy
;
3936 /* now try reading from the socket */
3937 res
= read(c
->sfd
, c
->rbuf
, c
->rsize
> c
->sbytes
? c
->sbytes
: c
->rsize
);
3939 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3940 c
->thread
->stats
.bytes_read
+= res
;
3941 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3945 if (res
== 0) { /* end of stream */
3946 conn_set_state(c
, conn_closing
);
3949 if (res
== -1 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) {
3950 if (!update_event(c
, EV_READ
| EV_PERSIST
)) {
3951 if (settings
.verbose
> 0)
3952 fprintf(stderr
, "Couldn't update event\n");
3953 conn_set_state(c
, conn_closing
);
3959 /* otherwise we have a real error, on which we close the connection */
3960 if (settings
.verbose
> 0)
3961 fprintf(stderr
, "Failed to read, and not due to blocking\n");
3962 conn_set_state(c
, conn_closing
);
3967 * We want to write out a simple response. If we haven't already,
3968 * assemble it into a msgbuf list (this will be a single-entry
3969 * list for TCP or a two-entry list for UDP).
3971 if (c
->iovused
== 0 || (IS_UDP(c
->transport
) && c
->iovused
== 1)) {
3972 if (add_iov(c
, c
->wcurr
, c
->wbytes
) != 0) {
3973 if (settings
.verbose
> 0)
3974 fprintf(stderr
, "Couldn't build response\n");
3975 conn_set_state(c
, conn_closing
);
3980 /* fall through... */
3983 if (IS_UDP(c
->transport
) && c
->msgcurr
== 0 && build_udp_headers(c
) != 0) {
3984 if (settings
.verbose
> 0)
3985 fprintf(stderr
, "Failed to build UDP headers\n");
3986 conn_set_state(c
, conn_closing
);
3989 switch (transmit(c
)) {
3990 case TRANSMIT_COMPLETE
:
3991 if (c
->state
== conn_mwrite
) {
3992 while (c
->ileft
> 0) {
3993 item
*it
= *(c
->icurr
);
3994 assert((it
->it_flags
& ITEM_SLABBED
) == 0);
3999 while (c
->suffixleft
> 0) {
4000 char *suffix
= *(c
->suffixcurr
);
4001 cache_free(c
->thread
->suffix_cache
, suffix
);
4005 /* XXX: I don't know why this wasn't the general case */
4006 if(c
->protocol
== binary_prot
) {
4007 conn_set_state(c
, c
->write_and_go
);
4009 conn_set_state(c
, conn_new_cmd
);
4011 } else if (c
->state
== conn_write
) {
4012 if (c
->write_and_free
) {
4013 free(c
->write_and_free
);
4014 c
->write_and_free
= 0;
4016 conn_set_state(c
, c
->write_and_go
);
4018 if (settings
.verbose
> 0)
4019 fprintf(stderr
, "Unexpected state %d\n", c
->state
);
4020 conn_set_state(c
, conn_closing
);
4024 case TRANSMIT_INCOMPLETE
:
4025 case TRANSMIT_HARD_ERROR
:
4026 break; /* Continue in state machine. */
4028 case TRANSMIT_SOFT_ERROR
:
4035 if (IS_UDP(c
->transport
))
4042 case conn_max_state
:
4051 void event_handler(const int fd
, const short which
, void *arg
) {
4061 if (settings
.verbose
> 0)
4062 fprintf(stderr
, "Catastrophic: event fd doesn't match conn fd!\n");
4069 /* wait for next event */
4073 static int new_socket(struct addrinfo
*ai
) {
4077 if ((sfd
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
)) == -1) {
4081 if ((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0 ||
4082 fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
4083 perror("setting O_NONBLOCK");
4092 * Sets a socket's send buffer size to the maximum allowed by the system.
4094 static void maximize_sndbuf(const int sfd
) {
4095 socklen_t intsize
= sizeof(int);
4100 /* Start with the default size. */
4101 if (getsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, &old_size
, &intsize
) != 0) {
4102 if (settings
.verbose
> 0)
4103 perror("getsockopt(SO_SNDBUF)");
4107 /* Binary-search for the real maximum. */
4109 max
= MAX_SENDBUF_SIZE
;
4111 while (min
<= max
) {
4112 avg
= ((unsigned int)(min
+ max
)) / 2;
4113 if (setsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, (void *)&avg
, intsize
) == 0) {
4121 if (settings
.verbose
> 1)
4122 fprintf(stderr
, "<%d send buffer was %d, now %d\n", sfd
, old_size
, last_good
);
4126 * Create a socket and bind it to a specific port number
4127 * @param interface the interface to bind to
4128 * @param port the port number to bind to
4129 * @param transport the transport protocol (TCP / UDP)
4130 * @param portnumber_file A filepointer to write the port numbers to
4131 * when they are successfully added to the list of ports we
4134 static int server_socket(const char *interface
,
4136 enum network_transport transport
,
4137 FILE *portnumber_file
) {
4139 struct linger ling
= {0, 0};
4140 struct addrinfo
*ai
;
4141 struct addrinfo
*next
;
4142 struct addrinfo hints
= { .ai_flags
= AI_PASSIVE
,
4143 .ai_family
= AF_UNSPEC
};
4144 char port_buf
[NI_MAXSERV
];
4149 hints
.ai_socktype
= IS_UDP(transport
) ? SOCK_DGRAM
: SOCK_STREAM
;
4154 snprintf(port_buf
, sizeof(port_buf
), "%d", port
);
4155 error
= getaddrinfo(interface
, port_buf
, &hints
, &ai
);
4157 if (error
!= EAI_SYSTEM
)
4158 fprintf(stderr
, "getaddrinfo(): %s\n", gai_strerror(error
));
4160 perror("getaddrinfo()");
4164 for (next
= ai
; next
; next
= next
->ai_next
) {
4165 conn
*listen_conn_add
;
4166 if ((sfd
= new_socket(next
)) == -1) {
4167 /* getaddrinfo can return "junk" addresses,
4168 * we make sure at least one works before erroring.
4170 if (errno
== EMFILE
) {
4171 /* ...unless we're out of fds */
4172 perror("server_socket");
4179 if (next
->ai_family
== AF_INET6
) {
4180 error
= setsockopt(sfd
, IPPROTO_IPV6
, IPV6_V6ONLY
, (char *) &flags
, sizeof(flags
));
4182 perror("setsockopt");
4189 setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
));
4190 if (IS_UDP(transport
)) {
4191 maximize_sndbuf(sfd
);
4193 error
= setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
, sizeof(flags
));
4195 perror("setsockopt");
4197 error
= setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
));
4199 perror("setsockopt");
4201 error
= setsockopt(sfd
, IPPROTO_TCP
, TCP_NODELAY
, (void *)&flags
, sizeof(flags
));
4203 perror("setsockopt");
4206 if (bind(sfd
, next
->ai_addr
, next
->ai_addrlen
) == -1) {
4207 if (errno
!= EADDRINUSE
) {
4217 if (!IS_UDP(transport
) && listen(sfd
, settings
.backlog
) == -1) {
4223 if (portnumber_file
!= NULL
&&
4224 (next
->ai_addr
->sa_family
== AF_INET
||
4225 next
->ai_addr
->sa_family
== AF_INET6
)) {
4227 struct sockaddr_in in
;
4228 struct sockaddr_in6 in6
;
4230 socklen_t len
= sizeof(my_sockaddr
);
4231 if (getsockname(sfd
, (struct sockaddr
*)&my_sockaddr
, &len
)==0) {
4232 if (next
->ai_addr
->sa_family
== AF_INET
) {
4233 fprintf(portnumber_file
, "%s INET: %u\n",
4234 IS_UDP(transport
) ? "UDP" : "TCP",
4235 ntohs(my_sockaddr
.in
.sin_port
));
4237 fprintf(portnumber_file
, "%s INET6: %u\n",
4238 IS_UDP(transport
) ? "UDP" : "TCP",
4239 ntohs(my_sockaddr
.in6
.sin6_port
));
4245 if (IS_UDP(transport
)) {
4248 for (c
= 0; c
< settings
.num_threads_per_udp
; c
++) {
4249 /* this is guaranteed to hit all threads because we round-robin */
4250 dispatch_conn_new(sfd
, conn_read
, EV_READ
| EV_PERSIST
,
4251 UDP_READ_BUFFER_SIZE
, transport
);
4254 if (!(listen_conn_add
= conn_new(sfd
, conn_listening
,
4255 EV_READ
| EV_PERSIST
, 1,
4256 transport
, main_base
))) {
4257 fprintf(stderr
, "failed to create listening connection\n");
4260 listen_conn_add
->next
= listen_conn
;
4261 listen_conn
= listen_conn_add
;
4267 /* Return zero iff we detected no errors in starting up connections */
4268 return success
== 0;
4271 static int server_sockets(int port
, enum network_transport transport
,
4272 FILE *portnumber_file
) {
4273 if (settings
.inter
== NULL
) {
4274 return server_socket(settings
.inter
, port
, transport
, portnumber_file
);
4276 // tokenize them and bind to each one of them..
4279 char *list
= strdup(settings
.inter
);
4282 fprintf(stderr
, "Failed to allocate memory for parsing server interface string\n");
4285 for (char *p
= strtok_r(list
, ";,", &b
);
4287 p
= strtok_r(NULL
, ";,", &b
)) {
4288 int the_port
= port
;
4289 char *s
= strchr(p
, ':');
4293 if (!safe_strtol(s
, &the_port
)) {
4294 fprintf(stderr
, "Invalid port number: \"%s\"", s
);
4298 if (strcmp(p
, "*") == 0) {
4301 ret
|= server_socket(p
, the_port
, transport
, portnumber_file
);
4308 static int new_socket_unix(void) {
4312 if ((sfd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) == -1) {
4317 if ((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0 ||
4318 fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
4319 perror("setting O_NONBLOCK");
4326 static int server_socket_unix(const char *path
, int access_mask
) {
4328 struct linger ling
= {0, 0};
4329 struct sockaddr_un addr
;
4338 if ((sfd
= new_socket_unix()) == -1) {
4343 * Clean up a previous socket file if we left it around
4345 if (lstat(path
, &tstat
) == 0) {
4346 if (S_ISSOCK(tstat
.st_mode
))
4350 setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
));
4351 setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
, sizeof(flags
));
4352 setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
));
4355 * the memset call clears nonstandard fields in some implementations
4356 * that otherwise mess things up.
4358 memset(&addr
, 0, sizeof(addr
));
4360 addr
.sun_family
= AF_UNIX
;
4361 strncpy(addr
.sun_path
, path
, sizeof(addr
.sun_path
) - 1);
4362 assert(strcmp(addr
.sun_path
, path
) == 0);
4363 old_umask
= umask( ~(access_mask
&0777));
4364 if (bind(sfd
, (struct sockaddr
*)&addr
, sizeof(addr
)) == -1) {
4371 if (listen(sfd
, settings
.backlog
) == -1) {
4376 if (!(listen_conn
= conn_new(sfd
, conn_listening
,
4377 EV_READ
| EV_PERSIST
, 1,
4378 local_transport
, main_base
))) {
4379 fprintf(stderr
, "failed to create listening connection\n");
4387 * We keep the current time of day in a global variable that's updated by a
4388 * timer event. This saves us a bunch of time() system calls (we really only
4389 * need to get the time once a second, whereas there can be tens of thousands
4390 * of requests a second) and allows us to use server-start-relative timestamps
4391 * rather than absolute UNIX timestamps, a space savings on systems where
4392 * sizeof(time_t) > sizeof(unsigned int).
4394 volatile rel_time_t current_time
;
4395 static struct event clockevent
;
4397 /* libevent uses a monotonic clock when available for event scheduling. Aside
4398 * from jitter, simply ticking our internal timer here is accurate enough.
4399 * Note that users who are setting explicit dates for expiration times *must*
4400 * ensure their clocks are correct before starting memcached. */
4401 static void clock_handler(const int fd
, const short which
, void *arg
) {
4402 struct timeval t
= {.tv_sec
= 1, .tv_usec
= 0};
4403 static bool initialized
= false;
4404 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4405 static bool monotonic
= false;
4406 static time_t monotonic_start
;
4410 /* only delete the event if it's actually there. */
4411 evtimer_del(&clockevent
);
4414 /* process_started is initialized to time() - 2. We initialize to 1 so
4415 * flush_all won't underflow during tests. */
4416 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4418 if (clock_gettime(CLOCK_MONOTONIC
, &ts
) == 0) {
4420 monotonic_start
= ts
.tv_sec
- 2;
4425 evtimer_set(&clockevent
, clock_handler
, 0);
4426 event_base_set(main_base
, &clockevent
);
4427 evtimer_add(&clockevent
, &t
);
4429 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4432 if (clock_gettime(CLOCK_MONOTONIC
, &ts
) == -1)
4434 current_time
= (rel_time_t
) (ts
.tv_sec
- monotonic_start
);
4440 gettimeofday(&tv
, NULL
);
4441 current_time
= (rel_time_t
) (tv
.tv_sec
- process_started
);
4445 static void usage(void) {
4446 printf(PACKAGE
" " VERSION
"\n");
4447 printf("-p <num> TCP port number to listen on (default: 11211)\n"
4448 "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
4449 "-s <file> UNIX socket path to listen on (disables network support)\n"
4450 "-a <mask> access mask for UNIX socket, in octal (default: 0700)\n"
4451 "-l <addr> interface to listen on (default: INADDR_ANY, all addresses)\n"
4452 " <addr> may be specified as host:port. If you don't specify\n"
4453 " a port number, the value you specified with -p or -U is\n"
4454 " used. You may specify multiple addresses separated by comma\n"
4455 " or by using -l multiple times\n"
4457 "-d run as a daemon\n"
4458 "-r maximize core file limit\n"
4459 "-u <username> assume identity of <username> (only when run as root)\n"
4460 "-m <num> max memory to use for items in megabytes (default: 64 MB)\n"
4461 "-M return error on memory exhausted (rather than removing items)\n"
4462 "-c <num> max simultaneous connections (default: 1024)\n"
4463 "-k lock down all paged memory. Note that there is a\n"
4464 " limit on how much memory you may lock. Trying to\n"
4465 " allocate more than that would fail, so be sure you\n"
4466 " set the limit correctly for the user you started\n"
4467 " the daemon with (not for -u <username> user;\n"
4468 " under sh this is done with 'ulimit -S -l NUM_KB').\n"
4469 "-v verbose (print errors/warnings while in event loop)\n"
4470 "-vv very verbose (also print client commands/reponses)\n"
4471 "-vvv extremely verbose (also print internal state transitions)\n"
4472 "-h print this help and exit\n"
4473 "-i print memcached and libevent license\n"
4474 "-P <file> save PID in <file>, only used with -d option\n"
4475 "-f <factor> chunk size growth factor (default: 1.25)\n"
4476 "-n <bytes> minimum space allocated for key+value+flags (default: 48)\n");
4477 printf("-L Try to use large memory pages (if available). Increasing\n"
4478 " the memory page size could reduce the number of TLB misses\n"
4479 " and improve the performance. In order to get large pages\n"
4480 " from the OS, memcached will allocate the total item-cache\n"
4481 " in one large chunk.\n");
4482 printf("-D <char> Use <char> as the delimiter between key prefixes and IDs.\n"
4483 " This is used for per-prefix stats reporting. The default is\n"
4484 " \":\" (colon). If this option is specified, stats collection\n"
4485 " is turned on automatically; if not, then it may be turned on\n"
4486 " by sending the \"stats detail on\" command to the server.\n");
4487 printf("-t <num> number of threads to use (default: 4)\n");
4488 printf("-R Maximum number of requests per event, limits the number of\n"
4489 " requests process for a given connection to prevent \n"
4490 " starvation (default: 20)\n");
4491 printf("-C Disable use of CAS\n");
4492 printf("-b Set the backlog queue limit (default: 1024)\n");
4493 printf("-B Binding protocol - one of ascii, binary, or auto (default)\n");
4494 printf("-I Override the size of each slab page. Adjusts max item size\n"
4495 " (default: 1mb, min: 1k, max: 128m)\n");
4497 printf("-S Turn on Sasl authentication\n");
4499 printf("-o Comma separated list of extended or experimental options\n"
4500 " - (EXPERIMENTAL) maxconns_fast: immediately close new\n"
4501 " connections if over maxconns limit\n"
4502 " - hashpower: An integer multiplier for how large the hash\n"
4503 " table should be. Can be grown at runtime if not big enough.\n"
4504 " Set this based on \"STAT hash_power_level\" before a \n"
4510 static void usage_license(void) {
4511 printf(PACKAGE
" " VERSION
"\n\n");
4513 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
4514 "All rights reserved.\n"
4516 "Redistribution and use in source and binary forms, with or without\n"
4517 "modification, are permitted provided that the following conditions are\n"
4520 " * Redistributions of source code must retain the above copyright\n"
4521 "notice, this list of conditions and the following disclaimer.\n"
4523 " * Redistributions in binary form must reproduce the above\n"
4524 "copyright notice, this list of conditions and the following disclaimer\n"
4525 "in the documentation and/or other materials provided with the\n"
4528 " * Neither the name of the Danga Interactive nor the names of its\n"
4529 "contributors may be used to endorse or promote products derived from\n"
4530 "this software without specific prior written permission.\n"
4532 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
4533 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
4534 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
4535 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
4536 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
4537 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
4538 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4539 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4540 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4541 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
4542 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4545 "This product includes software developed by Niels Provos.\n"
4549 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
4550 "All rights reserved.\n"
4552 "Redistribution and use in source and binary forms, with or without\n"
4553 "modification, are permitted provided that the following conditions\n"
4555 "1. Redistributions of source code must retain the above copyright\n"
4556 " notice, this list of conditions and the following disclaimer.\n"
4557 "2. Redistributions in binary form must reproduce the above copyright\n"
4558 " notice, this list of conditions and the following disclaimer in the\n"
4559 " documentation and/or other materials provided with the distribution.\n"
4560 "3. All advertising materials mentioning features or use of this software\n"
4561 " must display the following acknowledgement:\n"
4562 " This product includes software developed by Niels Provos.\n"
4563 "4. The name of the author may not be used to endorse or promote products\n"
4564 " derived from this software without specific prior written permission.\n"
4566 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
4567 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
4568 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
4569 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
4570 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
4571 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4572 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4573 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4574 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
4575 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4581 static void save_pid(const char *pid_file
) {
4583 if (access(pid_file
, F_OK
) == 0) {
4584 if ((fp
= fopen(pid_file
, "r")) != NULL
) {
4586 if (fgets(buffer
, sizeof(buffer
), fp
) != NULL
) {
4588 if (safe_strtoul(buffer
, &pid
) && kill((pid_t
)pid
, 0) == 0) {
4589 fprintf(stderr
, "WARNING: The pid file contained the following (running) pid: %u\n", pid
);
4596 if ((fp
= fopen(pid_file
, "w")) == NULL
) {
4597 vperror("Could not open the pid file %s for writing", pid_file
);
4601 fprintf(fp
,"%ld\n", (long)getpid());
4602 if (fclose(fp
) == -1) {
4603 vperror("Could not close the pid file %s", pid_file
);
4607 static void remove_pidfile(const char *pid_file
) {
4608 if (pid_file
== NULL
)
4611 if (unlink(pid_file
) != 0) {
4612 vperror("Could not remove the pid file %s", pid_file
);
4617 static void sig_handler(const int sig
) {
4618 printf("SIGINT handled.\n");
4622 #ifndef HAVE_SIGIGNORE
4623 static int sigignore(int sig
) {
4624 struct sigaction sa
= { .sa_handler
= SIG_IGN
, .sa_flags
= 0 };
4626 if (sigemptyset(&sa
.sa_mask
) == -1 || sigaction(sig
, &sa
, 0) == -1) {
4635 * On systems that support multiple page sizes we may reduce the
4636 * number of TLB-misses by using the biggest available page size
4638 static int enable_large_pages(void) {
4639 #if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
4642 int avail
= getpagesizes(sizes
, 32);
4644 size_t max
= sizes
[0];
4645 struct memcntl_mha arg
= {0};
4648 for (ii
= 1; ii
< avail
; ++ii
) {
4649 if (max
< sizes
[ii
]) {
4655 arg
.mha_pagesize
= max
;
4656 arg
.mha_cmd
= MHA_MAPSIZE_BSSBRK
;
4658 if (memcntl(0, 0, MC_HAT_ADVISE
, (caddr_t
)&arg
, 0, 0) == -1) {
4659 fprintf(stderr
, "Failed to set large pages: %s\n",
4661 fprintf(stderr
, "Will use default page size\n");
4666 fprintf(stderr
, "Failed to get supported pagesizes: %s\n",
4668 fprintf(stderr
, "Will use default page size\n");
4678 * Do basic sanity check of the runtime environment
4679 * @return true if no errors found, false if we can't use this env
4681 static bool sanitycheck(void) {
4682 /* One of our biggest problems is old and bogus libevents */
4683 const char *ever
= event_get_version();
4685 if (strncmp(ever
, "1.", 2) == 0) {
4686 /* Require at least 1.3 (that's still a couple of years old) */
4687 if ((ever
[2] == '1' || ever
[2] == '2') && !isdigit(ever
[3])) {
4688 fprintf(stderr
, "You are using libevent %s.\nPlease upgrade to"
4689 " a more recent version (1.3 or newer)\n",
4690 event_get_version());
4699 int main (int argc
, char **argv
) {
4701 bool lock_memory
= false;
4702 bool do_daemonize
= false;
4703 bool preallocate
= false;
4705 char *username
= NULL
;
4706 char *pid_file
= NULL
;
4711 int retval
= EXIT_SUCCESS
;
4712 /* listening sockets */
4713 static int *l_socket
= NULL
;
4716 static int *u_socket
= NULL
;
4717 bool protocol_specified
= false;
4718 bool tcp_specified
= false;
4719 bool udp_specified
= false;
4722 char *subopts_value
;
4729 char *const subopts_tokens
[] = {
4730 [MAXCONNS_FAST
] = "maxconns_fast",
4731 [HASHPOWER_INIT
] = "hashpower",
4732 [SLAB_REASSIGN
] = "slab_reassign",
4733 [SLAB_AUTOMOVE
] = "slab_automove",
4737 if (!sanitycheck()) {
4742 signal(SIGINT
, sig_handler
);
4747 /* set stderr non-buffering (for running under, say, daemontools) */
4748 setbuf(stderr
, NULL
);
4750 /* process arguments */
4751 while (-1 != (c
= getopt(argc
, argv
,
4752 "a:" /* access mask for unix socket */
4753 "p:" /* TCP port number to listen on */
4754 "s:" /* unix socket path to listen on */
4755 "U:" /* UDP port number to listen on */
4756 "m:" /* max memory to use for items in megabytes */
4757 "M" /* return error on memory exhausted */
4758 "c:" /* max simultaneous connections */
4759 "k" /* lock down all paged memory */
4760 "hi" /* help, licence info */
4761 "r" /* maximize core file limit */
4763 "d" /* daemon mode */
4764 "l:" /* interface to listen on */
4765 "u:" /* user identity to run as */
4766 "P:" /* save PID in file */
4768 "n:" /* minimum space allocated for key+value+flags */
4770 "D:" /* prefix delimiter? */
4771 "L" /* Large memory pages */
4772 "R:" /* max requests per event */
4773 "C" /* Disable use of CAS */
4774 "b:" /* backlog queue limit */
4775 "B:" /* Binding protocol */
4776 "I:" /* Max item size */
4778 "o:" /* Extended generic options */
4782 /* access for unix domain socket, as octal mask (like chmod)*/
4783 settings
.access
= strtol(optarg
,NULL
,8);
4787 settings
.udpport
= atoi(optarg
);
4788 udp_specified
= true;
4791 settings
.port
= atoi(optarg
);
4792 tcp_specified
= true;
4795 settings
.socketpath
= optarg
;
4798 settings
.maxbytes
= ((size_t)atoi(optarg
)) * 1024 * 1024;
4801 settings
.evict_to_free
= 0;
4804 settings
.maxconns
= atoi(optarg
);
4819 if (settings
.inter
!= NULL
) {
4820 size_t len
= strlen(settings
.inter
) + strlen(optarg
) + 2;
4821 char *p
= malloc(len
);
4823 fprintf(stderr
, "Failed to allocate memory\n");
4826 snprintf(p
, len
, "%s,%s", settings
.inter
, optarg
);
4827 free(settings
.inter
);
4830 settings
.inter
= strdup(optarg
);
4834 do_daemonize
= true;
4840 settings
.reqs_per_event
= atoi(optarg
);
4841 if (settings
.reqs_per_event
== 0) {
4842 fprintf(stderr
, "Number of requests per event must be greater than 0\n");
4853 settings
.factor
= atof(optarg
);
4854 if (settings
.factor
<= 1.0) {
4855 fprintf(stderr
, "Factor must be greater than 1\n");
4860 settings
.chunk_size
= atoi(optarg
);
4861 if (settings
.chunk_size
== 0) {
4862 fprintf(stderr
, "Chunk size must be greater than 0\n");
4867 settings
.num_threads
= atoi(optarg
);
4868 if (settings
.num_threads
<= 0) {
4869 fprintf(stderr
, "Number of threads must be greater than 0\n");
4872 /* There're other problems when you get above 64 threads.
4873 * In the future we should portably detect # of cores for the
4876 if (settings
.num_threads
> 64) {
4877 fprintf(stderr
, "WARNING: Setting a high number of worker"
4878 "threads is not recommended.\n"
4879 " Set this value to the number of cores in"
4880 " your machine or less.\n");
4884 if (! optarg
|| ! optarg
[0]) {
4885 fprintf(stderr
, "No delimiter specified\n");
4888 settings
.prefix_delimiter
= optarg
[0];
4889 settings
.detail_enabled
= 1;
4892 if (enable_large_pages() == 0) {
4895 fprintf(stderr
, "Cannot enable large pages on this system\n"
4896 "(There is no Linux support as of this version)\n");
4901 settings
.use_cas
= false;
4904 settings
.backlog
= atoi(optarg
);
4907 protocol_specified
= true;
4908 if (strcmp(optarg
, "auto") == 0) {
4909 settings
.binding_protocol
= negotiating_prot
;
4910 } else if (strcmp(optarg
, "binary") == 0) {
4911 settings
.binding_protocol
= binary_prot
;
4912 } else if (strcmp(optarg
, "ascii") == 0) {
4913 settings
.binding_protocol
= ascii_prot
;
4915 fprintf(stderr
, "Invalid value for binding protocol: %s\n"
4916 " -- should be one of auto, binary, or ascii\n", optarg
);
4921 unit
= optarg
[strlen(optarg
)-1];
4922 if (unit
== 'k' || unit
== 'm' ||
4923 unit
== 'K' || unit
== 'M') {
4924 optarg
[strlen(optarg
)-1] = '\0';
4925 size_max
= atoi(optarg
);
4926 if (unit
== 'k' || unit
== 'K')
4928 if (unit
== 'm' || unit
== 'M')
4929 size_max
*= 1024 * 1024;
4930 settings
.item_size_max
= size_max
;
4932 settings
.item_size_max
= atoi(optarg
);
4934 if (settings
.item_size_max
< 1024) {
4935 fprintf(stderr
, "Item max size cannot be less than 1024 bytes.\n");
4938 if (settings
.item_size_max
> 1024 * 1024 * 128) {
4939 fprintf(stderr
, "Cannot set item size limit higher than 128 mb.\n");
4942 if (settings
.item_size_max
> 1024 * 1024) {
4943 fprintf(stderr
, "WARNING: Setting item max size above 1MB is not"
4945 " Raising this limit increases the minimum memory requirements\n"
4946 " and will decrease your memory efficiency.\n"
4950 case 'S': /* set Sasl authentication to true. Default is false */
4952 fprintf(stderr
, "This server is not built with SASL support.\n");
4955 settings
.sasl
= true;
4957 case 'o': /* It's sub-opts time! */
4960 while (*subopts
!= '\0') {
4962 switch (getsubopt(&subopts
, subopts_tokens
, &subopts_value
)) {
4964 settings
.maxconns_fast
= true;
4966 case HASHPOWER_INIT
:
4967 if (subopts_value
== NULL
) {
4968 fprintf(stderr
, "Missing numeric argument for hashpower\n");
4971 settings
.hashpower_init
= atoi(subopts_value
);
4972 if (settings
.hashpower_init
< 12) {
4973 fprintf(stderr
, "Initial hashtable multiplier of %d is too low\n",
4974 settings
.hashpower_init
);
4976 } else if (settings
.hashpower_init
> 64) {
4977 fprintf(stderr
, "Initial hashtable multiplier of %d is too high\n"
4978 "Choose a value based on \"STAT hash_power_level\" from a running instance\n",
4979 settings
.hashpower_init
);
4984 settings
.slab_reassign
= true;
4987 if (subopts_value
== NULL
) {
4988 settings
.slab_automove
= 1;
4991 settings
.slab_automove
= atoi(subopts_value
);
4992 if (settings
.slab_automove
< 0 || settings
.slab_automove
> 2) {
4993 fprintf(stderr
, "slab_automove must be between 0 and 2\n");
4998 printf("Illegal suboption \"%s\"\n", subopts_value
);
5005 fprintf(stderr
, "Illegal argument \"%c\"\n", c
);
5011 * Use one worker thread to serve each UDP port if the user specified
5014 if (settings
.inter
!= NULL
&& strchr(settings
.inter
, ',')) {
5015 settings
.num_threads_per_udp
= 1;
5017 settings
.num_threads_per_udp
= settings
.num_threads
;
5020 if (settings
.sasl
) {
5021 if (!protocol_specified
) {
5022 settings
.binding_protocol
= binary_prot
;
5024 if (settings
.binding_protocol
!= binary_prot
) {
5025 fprintf(stderr
, "ERROR: You cannot allow the ASCII protocol while using SASL.\n");
5031 if (tcp_specified
&& !udp_specified
) {
5032 settings
.udpport
= settings
.port
;
5033 } else if (udp_specified
&& !tcp_specified
) {
5034 settings
.port
= settings
.udpport
;
5038 struct rlimit rlim_new
;
5040 * First try raising to infinity; if that fails, try bringing
5041 * the soft limit to the hard.
5043 if (getrlimit(RLIMIT_CORE
, &rlim
) == 0) {
5044 rlim_new
.rlim_cur
= rlim_new
.rlim_max
= RLIM_INFINITY
;
5045 if (setrlimit(RLIMIT_CORE
, &rlim_new
)!= 0) {
5046 /* failed. try raising just to the old max */
5047 rlim_new
.rlim_cur
= rlim_new
.rlim_max
= rlim
.rlim_max
;
5048 (void)setrlimit(RLIMIT_CORE
, &rlim_new
);
5052 * getrlimit again to see what we ended up with. Only fail if
5053 * the soft limit ends up 0, because then no core files will be
5057 if ((getrlimit(RLIMIT_CORE
, &rlim
) != 0) || rlim
.rlim_cur
== 0) {
5058 fprintf(stderr
, "failed to ensure corefile creation\n");
5064 * If needed, increase rlimits to allow as many connections
5068 if (getrlimit(RLIMIT_NOFILE
, &rlim
) != 0) {
5069 fprintf(stderr
, "failed to getrlimit number of files\n");
5072 rlim
.rlim_cur
= settings
.maxconns
;
5073 rlim
.rlim_max
= settings
.maxconns
;
5074 if (setrlimit(RLIMIT_NOFILE
, &rlim
) != 0) {
5075 fprintf(stderr
, "failed to set rlimit for open files. Try starting as root or requesting smaller maxconns value.\n");
5080 /* lose root privileges if we have them */
5081 if (getuid() == 0 || geteuid() == 0) {
5082 if (username
== 0 || *username
== '\0') {
5083 fprintf(stderr
, "can't run as root without the -u switch\n");
5086 if ((pw
= getpwnam(username
)) == 0) {
5087 fprintf(stderr
, "can't find the user %s to switch to\n", username
);
5090 if (setgid(pw
->pw_gid
) < 0 || setuid(pw
->pw_uid
) < 0) {
5091 fprintf(stderr
, "failed to assume identity of user %s\n", username
);
5096 /* Initialize Sasl if -S was specified */
5097 if (settings
.sasl
) {
5101 /* daemonize if requested */
5102 /* if we want to ensure our ability to dump core, don't chdir to / */
5104 if (sigignore(SIGHUP
) == -1) {
5105 perror("Failed to ignore SIGHUP");
5107 if (daemonize(maxcore
, settings
.verbose
) == -1) {
5108 fprintf(stderr
, "failed to daemon() in order to daemonize\n");
5113 /* lock paged memory if needed */
5115 #ifdef HAVE_MLOCKALL
5116 int res
= mlockall(MCL_CURRENT
| MCL_FUTURE
);
5118 fprintf(stderr
, "warning: -k invalid, mlockall() failed: %s\n",
5122 fprintf(stderr
, "warning: -k invalid, mlockall() not supported on this platform. proceeding without.\n");
5126 /* initialize main thread libevent instance */
5127 main_base
= event_init();
5129 /* initialize other stuff */
5131 assoc_init(settings
.hashpower_init
);
5133 slabs_init(settings
.maxbytes
, settings
.factor
, preallocate
);
5136 * ignore SIGPIPE signals; we can use errno == EPIPE if we
5137 * need that information
5139 if (sigignore(SIGPIPE
) == -1) {
5140 perror("failed to ignore SIGPIPE; sigaction");
5143 /* start up worker threads if MT mode */
5144 thread_init(settings
.num_threads
, main_base
);
5146 if (start_assoc_maintenance_thread() == -1) {
5150 if (settings
.slab_reassign
&&
5151 start_slab_maintenance_thread() == -1) {
5155 /* initialise clock event */
5156 clock_handler(0, 0, 0);
5158 /* create unix mode sockets after dropping privileges */
5159 if (settings
.socketpath
!= NULL
) {
5161 if (server_socket_unix(settings
.socketpath
,settings
.access
)) {
5162 vperror("failed to listen on UNIX socket: %s", settings
.socketpath
);
5167 /* create the listening socket, bind it, and init */
5168 if (settings
.socketpath
== NULL
) {
5169 const char *portnumber_filename
= getenv("MEMCACHED_PORT_FILENAME");
5170 char temp_portnumber_filename
[PATH_MAX
];
5171 FILE *portnumber_file
= NULL
;
5173 if (portnumber_filename
!= NULL
) {
5174 snprintf(temp_portnumber_filename
,
5175 sizeof(temp_portnumber_filename
),
5176 "%s.lck", portnumber_filename
);
5178 portnumber_file
= fopen(temp_portnumber_filename
, "a");
5179 if (portnumber_file
== NULL
) {
5180 fprintf(stderr
, "Failed to open \"%s\": %s\n",
5181 temp_portnumber_filename
, strerror(errno
));
5186 if (settings
.port
&& server_sockets(settings
.port
, tcp_transport
,
5188 vperror("failed to listen on TCP port %d", settings
.port
);
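5193 * documented initialization order: first create the listening sockets
5194 * (may need root on low ports), then drop root if needed, then daemonise
5195 * if needed, then init libevent (in some cases descriptors created by
5196 * libevent wouldn't survive forking). NOTE(review): the code above drops root, daemonises and inits libevent before creating the sockets; confirm whether this comment is stale.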
5199 /* create the UDP listening socket and bind it */
5201 if (settings
.udpport
&& server_sockets(settings
.udpport
, udp_transport
,
5203 vperror("failed to listen on UDP port %d", settings
.udpport
);
5207 if (portnumber_file
) {
5208 fclose(portnumber_file
);
5209 rename(temp_portnumber_filename
, portnumber_filename
);
5213 /* Give the sockets a moment to open. I know this is dumb, but the error
5214 * is only an advisory.
5217 if (stats
.curr_conns
+ stats
.reserved_fds
>= settings
.maxconns
- 1) {
5218 fprintf(stderr
, "Maxconns setting is too low, use -c to increase.\n");
5222 if (pid_file
!= NULL
) {
5226 /* Drop privileges no longer needed */
5229 /* enter the event loop */
5230 if (event_base_loop(main_base
, 0) != 0) {
5231 retval
= EXIT_FAILURE
;
5234 stop_assoc_maintenance_thread();
5236 /* remove the PID file if we're a daemon */
5238 remove_pidfile(pid_file
);
5239 /* Clean up strdup() call for bind() address */
5241 free(settings
.inter
);