1 /* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3 * memcached - memory caching daemon
5 * http://www.danga.com/memcached/
7 * Copyright 2003 Danga Interactive, Inc. All rights reserved.
9 * Use and distribution licensed under the BSD license. See
10 * the LICENSE file for full text.
13 * Anatoly Vorobey <mellon@pobox.com>
14 * Brad Fitzpatrick <brad@danga.com>
16 #include "memcached.h"
18 #include <sys/socket.h>
21 #include <sys/resource.h>
26 /* some POSIX systems need the following definition
27 * to get mlockall flags out of sys/mman.h. */
28 #ifndef _P1003_1B_VISIBLE
29 #define _P1003_1B_VISIBLE
31 /* need this to get IOV_MAX on some platforms. */
32 #ifndef __need_IOV_MAX
33 #define __need_IOV_MAX
38 #include <netinet/tcp.h>
39 #include <arpa/inet.h>
50 /* FreeBSD 4.x doesn't have IOV_MAX exposed. */
52 #if defined(__FreeBSD__) || defined(__APPLE__)
58 * forward declarations
60 static void drive_machine(conn
*c
);
61 static int new_socket(struct addrinfo
*ai
);
62 static int try_read_command(conn
*c
);
64 enum try_read_result
{
66 READ_NO_DATA_RECEIVED
,
67 READ_ERROR
, /** an error occured (on the socket) (or client closed connection) */
68 READ_MEMORY_ERROR
/** failed to allocate more memory */
71 static enum try_read_result
try_read_network(conn
*c
);
72 static enum try_read_result
try_read_udp(conn
*c
);
74 static void conn_set_state(conn
*c
, enum conn_states state
);
77 static void stats_init(void);
78 static void server_stats(ADD_STAT add_stats
, conn
*c
);
79 static void process_stat_settings(ADD_STAT add_stats
, void *c
);
83 static void settings_init(void);
85 /* event handling, network IO */
86 static void event_handler(const int fd
, const short which
, void *arg
);
87 static void conn_close(conn
*c
);
88 static void conn_init(void);
89 static bool update_event(conn
*c
, const int new_flags
);
90 static void complete_nread(conn
*c
);
91 static void process_command(conn
*c
, char *command
);
92 static void write_and_free(conn
*c
, char *buf
, int bytes
);
93 static int ensure_iov_space(conn
*c
);
94 static int add_iov(conn
*c
, const void *buf
, int len
);
95 static int add_msghdr(conn
*c
);
98 static void conn_free(conn
*c
);
100 /** exported globals **/
102 struct settings settings
;
103 time_t process_started
; /* when the process was started */
105 struct slab_rebalance slab_rebal
;
106 volatile int slab_rebalance_signal
;
108 /** file scope variables **/
109 static conn
*listen_conn
= NULL
;
110 static struct event_base
*main_base
;
112 enum transmit_result
{
113 TRANSMIT_COMPLETE
, /** All done writing. */
114 TRANSMIT_INCOMPLETE
, /** More data remaining to write. */
115 TRANSMIT_SOFT_ERROR
, /** Can't write any more right now. */
116 TRANSMIT_HARD_ERROR
/** Can't write (c->state is set to conn_closing) */
119 static enum transmit_result
transmit(conn
*c
);
121 /* This reduces the latency without adding lots of extra wiring to be able to
122 * notify the listener thread of when to listen again.
123 * Also, the clock timer could be broken out into its own thread and we
124 * can block the listener via a condition.
126 static volatile bool allow_new_conns
= true;
127 static struct event maxconnsevent
;
128 #ifndef __INTEL_COMPILER
129 #pragma GCC diagnostic ignored "-Wunused-parameter"
131 static void maxconns_handler(const int fd
, const short which
, void *arg
) {
132 struct timeval t
= {.tv_sec
= 0, .tv_usec
= 10000};
134 if (fd
== -42 || allow_new_conns
== false) {
135 /* reschedule in 10ms if we need to keep polling */
136 evtimer_set(&maxconnsevent
, maxconns_handler
, 0);
137 event_base_set(main_base
, &maxconnsevent
);
138 evtimer_add(&maxconnsevent
, &t
);
140 evtimer_del(&maxconnsevent
);
141 accept_new_conns(true);
145 #define REALTIME_MAXDELTA 60*60*24*30
148 * given time value that's either unix time or delta from current unix time, return
149 * unix time. Use the fact that delta can't exceed one month (and real time value can't
152 static rel_time_t
realtime(const time_t exptime
) {
153 /* no. of seconds in 30 days - largest possible delta exptime */
155 if (exptime
== 0) return 0; /* 0 means never expire */
157 if (exptime
> REALTIME_MAXDELTA
) {
158 /* if item expiration is at/before the server started, give it an
159 expiration time of 1 second after the server started.
160 (because 0 means don't expire). without this, we'd
161 underflow and wrap around to some large value way in the
162 future, effectively making items expiring in the past
163 really expiring never */
164 if (exptime
<= process_started
)
165 return (rel_time_t
)1;
166 return (rel_time_t
)(exptime
- process_started
);
168 return (rel_time_t
)(exptime
+ current_time
);
172 static void stats_init(void) {
173 stats
.curr_items
= stats
.total_items
= stats
.curr_conns
= stats
.total_conns
= stats
.conn_structs
= 0;
174 stats
.get_cmds
= stats
.set_cmds
= stats
.get_hits
= stats
.get_misses
= stats
.evictions
= stats
.reclaimed
= 0;
175 stats
.touch_cmds
= stats
.touch_misses
= stats
.touch_hits
= stats
.rejected_conns
= 0;
176 stats
.curr_bytes
= stats
.listen_disabled_num
= 0;
177 stats
.hash_power_level
= stats
.hash_bytes
= stats
.hash_is_expanding
= 0;
178 stats
.expired_unfetched
= stats
.evicted_unfetched
= 0;
179 stats
.slabs_moved
= 0;
180 stats
.accepting_conns
= true; /* assuming we start in this state. */
181 stats
.slab_reassign_running
= false;
183 /* make the time we started always be 2 seconds before we really
184 did, so time(0) - time.started is never zero. if so, things
185 like 'settings.oldest_live' which act as booleans as well as
186 values are now false in boolean context... */
187 process_started
= time(0) - 2;
191 static void stats_reset(void) {
193 stats
.total_items
= stats
.total_conns
= 0;
194 stats
.rejected_conns
= 0;
197 stats
.listen_disabled_num
= 0;
198 stats_prefix_clear();
200 threadlocal_stats_reset();
204 static void settings_init(void) {
205 settings
.use_cas
= true;
206 settings
.access
= 0700;
207 settings
.port
= 11211;
208 settings
.udpport
= 11211;
209 /* By default this string should be NULL for getaddrinfo() */
210 settings
.inter
= NULL
;
211 settings
.maxbytes
= 64 * 1024 * 1024; /* default is 64MB */
212 settings
.maxconns
= 1024; /* to limit connections-related memory to about 5MB */
213 settings
.verbose
= 0;
214 settings
.oldest_live
= 0;
215 settings
.evict_to_free
= 1; /* push old items out of cache when memory runs out */
216 settings
.socketpath
= NULL
; /* by default, not using a unix socket */
217 settings
.factor
= 1.25;
218 settings
.chunk_size
= 48; /* space for a modest key and value */
219 settings
.num_threads
= 4; /* N workers */
220 settings
.num_threads_per_udp
= 0;
221 settings
.prefix_delimiter
= ':';
222 settings
.detail_enabled
= 0;
223 settings
.reqs_per_event
= 20;
224 settings
.backlog
= 1024;
225 settings
.binding_protocol
= negotiating_prot
;
226 settings
.item_size_max
= 1024 * 1024; /* The famous 1MB upper limit. */
227 settings
.maxconns_fast
= false;
228 settings
.hashpower_init
= 0;
229 settings
.slab_reassign
= false;
230 settings
.slab_automove
= false;
234 * Adds a message header to a connection.
236 * Returns 0 on success, -1 on out-of-memory.
238 static int add_msghdr(conn
*c
)
244 if (c
->msgsize
== c
->msgused
) {
245 msg
= realloc(c
->msglist
, c
->msgsize
* 2 * sizeof(struct msghdr
));
252 msg
= c
->msglist
+ c
->msgused
;
254 /* this wipes msg_iovlen, msg_control, msg_controllen, and
255 msg_flags, the last 3 of which aren't defined on solaris: */
256 memset(msg
, 0, sizeof(struct msghdr
));
258 msg
->msg_iov
= &c
->iov
[c
->iovused
];
260 if (c
->request_addr_size
> 0) {
261 msg
->msg_name
= &c
->request_addr
;
262 msg
->msg_namelen
= c
->request_addr_size
;
268 if (IS_UDP(c
->transport
)) {
269 /* Leave room for the UDP header, which we'll fill in later. */
270 return add_iov(c
, NULL
, UDP_HEADER_SIZE
);
278 * Free list management for connections.
281 static conn
**freeconns
;
282 static int freetotal
;
284 /* Lock for connection freelist */
285 static pthread_mutex_t conn_lock
= PTHREAD_MUTEX_INITIALIZER
;
288 static void conn_init(void) {
291 if ((freeconns
= calloc(freetotal
, sizeof(conn
*))) == NULL
) {
292 fprintf(stderr
, "Failed to allocate connection structures\n");
298 * Returns a connection from the freelist, if any.
300 conn
*conn_from_freelist() {
303 pthread_mutex_lock(&conn_lock
);
305 c
= freeconns
[--freecurr
];
309 pthread_mutex_unlock(&conn_lock
);
315 * Adds a connection to the freelist. 0 = success.
317 bool conn_add_to_freelist(conn
*c
) {
319 pthread_mutex_lock(&conn_lock
);
320 if (freecurr
< freetotal
) {
321 freeconns
[freecurr
++] = c
;
324 /* try to enlarge free connections array */
325 size_t newsize
= freetotal
* 2;
326 conn
**new_freeconns
= realloc(freeconns
, sizeof(conn
*) * newsize
);
329 freeconns
= new_freeconns
;
330 freeconns
[freecurr
++] = c
;
334 pthread_mutex_unlock(&conn_lock
);
338 static const char *prot_text(enum protocol prot
) {
339 const char *rv
= "unknown";
347 case negotiating_prot
:
348 rv
= "auto-negotiate";
356 conn
*conn_new(const int sfd
, enum conn_states init_state
,
357 const int event_flags
,
358 const int read_buffer_size
, enum network_transport transport
,
359 struct event_base
*base
) {
360 conn
*c
= conn_from_freelist();
363 if (!(c
= (conn
*)calloc(1, sizeof(conn
)))) {
364 fprintf(stderr
, "calloc()\n");
367 MEMCACHED_CONN_CREATE(c
);
369 c
->rbuf
= c
->wbuf
= 0;
376 c
->rsize
= read_buffer_size
;
377 c
->wsize
= DATA_BUFFER_SIZE
;
378 c
->isize
= ITEM_LIST_INITIAL
;
379 c
->suffixsize
= SUFFIX_LIST_INITIAL
;
380 c
->iovsize
= IOV_LIST_INITIAL
;
381 c
->msgsize
= MSG_LIST_INITIAL
;
384 c
->rbuf
= (char *)malloc((size_t)c
->rsize
);
385 c
->wbuf
= (char *)malloc((size_t)c
->wsize
);
386 c
->ilist
= (item
**)malloc(sizeof(item
*) * c
->isize
);
387 c
->suffixlist
= (char **)malloc(sizeof(char *) * c
->suffixsize
);
388 c
->iov
= (struct iovec
*)malloc(sizeof(struct iovec
) * c
->iovsize
);
389 c
->msglist
= (struct msghdr
*)malloc(sizeof(struct msghdr
) * c
->msgsize
);
391 if (c
->rbuf
== 0 || c
->wbuf
== 0 || c
->ilist
== 0 || c
->iov
== 0 ||
392 c
->msglist
== 0 || c
->suffixlist
== 0) {
394 fprintf(stderr
, "malloc()\n");
399 stats
.conn_structs
++;
403 c
->transport
= transport
;
404 c
->protocol
= settings
.binding_protocol
;
406 /* unix socket mode doesn't need this, so zeroed out. but why
407 * is this done for every command? presumably for UDP
409 if (!settings
.socketpath
) {
410 c
->request_addr_size
= sizeof(c
->request_addr
);
412 c
->request_addr_size
= 0;
415 if (settings
.verbose
> 1) {
416 if (init_state
== conn_listening
) {
417 fprintf(stderr
, "<%d server listening (%s)\n", sfd
,
418 prot_text(c
->protocol
));
419 } else if (IS_UDP(transport
)) {
420 fprintf(stderr
, "<%d server listening (udp)\n", sfd
);
421 } else if (c
->protocol
== negotiating_prot
) {
422 fprintf(stderr
, "<%d new auto-negotiating client connection\n",
424 } else if (c
->protocol
== ascii_prot
) {
425 fprintf(stderr
, "<%d new ascii client connection.\n", sfd
);
426 } else if (c
->protocol
== binary_prot
) {
427 fprintf(stderr
, "<%d new binary client connection.\n", sfd
);
429 fprintf(stderr
, "<%d new unknown (%d) client connection\n",
436 c
->state
= init_state
;
439 c
->rbytes
= c
->wbytes
= 0;
444 c
->suffixcurr
= c
->suffixlist
;
451 c
->write_and_go
= init_state
;
452 c
->write_and_free
= 0;
457 event_set(&c
->event
, sfd
, event_flags
, event_handler
, (void *)c
);
458 event_base_set(base
, &c
->event
);
459 c
->ev_flags
= event_flags
;
461 if (event_add(&c
->event
, 0) == -1) {
462 if (conn_add_to_freelist(c
)) {
474 MEMCACHED_CONN_ALLOCATE(c
->sfd
);
479 static void conn_cleanup(conn
*c
) {
483 item_remove(c
->item
);
488 for (; c
->ileft
> 0; c
->ileft
--,c
->icurr
++) {
489 item_remove(*(c
->icurr
));
493 if (c
->suffixleft
!= 0) {
494 for (; c
->suffixleft
> 0; c
->suffixleft
--, c
->suffixcurr
++) {
495 cache_free(c
->thread
->suffix_cache
, *(c
->suffixcurr
));
499 if (c
->write_and_free
) {
500 free(c
->write_and_free
);
501 c
->write_and_free
= 0;
505 assert(settings
.sasl
);
506 sasl_dispose(&c
->sasl_conn
);
510 if (IS_UDP(c
->transport
)) {
511 conn_set_state(c
, conn_read
);
516 * Frees a connection.
518 void conn_free(conn
*c
) {
520 MEMCACHED_CONN_DESTROY(c
);
539 static void conn_close(conn
*c
) {
542 /* delete the event, the socket and the conn */
543 event_del(&c
->event
);
545 if (settings
.verbose
> 1)
546 fprintf(stderr
, "<%d connection closed.\n", c
->sfd
);
548 MEMCACHED_CONN_RELEASE(c
->sfd
);
550 pthread_mutex_lock(&conn_lock
);
551 allow_new_conns
= true;
552 pthread_mutex_unlock(&conn_lock
);
555 /* if the connection has big buffers, just free it */
556 if (c
->rsize
> READ_BUFFER_HIGHWAT
|| conn_add_to_freelist(c
)) {
568 * Shrinks a connection's buffers if they're too big. This prevents
569 * periodic large "get" requests from permanently chewing lots of server
572 * This should only be called in between requests since it can wipe output
575 static void conn_shrink(conn
*c
) {
578 if (IS_UDP(c
->transport
))
581 if (c
->rsize
> READ_BUFFER_HIGHWAT
&& c
->rbytes
< DATA_BUFFER_SIZE
) {
584 if (c
->rcurr
!= c
->rbuf
)
585 memmove(c
->rbuf
, c
->rcurr
, (size_t)c
->rbytes
);
587 newbuf
= (char *)realloc((void *)c
->rbuf
, DATA_BUFFER_SIZE
);
591 c
->rsize
= DATA_BUFFER_SIZE
;
593 /* TODO check other branch... */
597 if (c
->isize
> ITEM_LIST_HIGHWAT
) {
598 item
**newbuf
= (item
**) realloc((void *)c
->ilist
, ITEM_LIST_INITIAL
* sizeof(c
->ilist
[0]));
601 c
->isize
= ITEM_LIST_INITIAL
;
603 /* TODO check error condition? */
606 if (c
->msgsize
> MSG_LIST_HIGHWAT
) {
607 struct msghdr
*newbuf
= (struct msghdr
*) realloc((void *)c
->msglist
, MSG_LIST_INITIAL
* sizeof(c
->msglist
[0]));
610 c
->msgsize
= MSG_LIST_INITIAL
;
612 /* TODO check error condition? */
615 if (c
->iovsize
> IOV_LIST_HIGHWAT
) {
616 struct iovec
*newbuf
= (struct iovec
*) realloc((void *)c
->iov
, IOV_LIST_INITIAL
* sizeof(c
->iov
[0]));
619 c
->iovsize
= IOV_LIST_INITIAL
;
621 /* TODO check return value */
626 * Convert a state name to a human readable form.
628 static const char *state_text(enum conn_states state
) {
629 const char* const statenames
[] = { "conn_listening",
639 return statenames
[state
];
642 #ifndef __INTEL_COMPILER
643 #pragma GCC diagnostic ignored "-Wtype-limits"
646 * Sets a connection's current state in the state machine. Any special
647 * processing that needs to happen on certain state transitions can
650 static void conn_set_state(conn
*c
, enum conn_states state
) {
652 assert(state
>= conn_listening
&& state
< conn_max_state
);
654 if (state
!= c
->state
) {
655 if (settings
.verbose
> 2) {
656 fprintf(stderr
, "%d: going from %s to %s\n",
657 c
->sfd
, state_text(c
->state
),
661 if (state
== conn_write
|| state
== conn_mwrite
) {
662 MEMCACHED_PROCESS_COMMAND_END(c
->sfd
, c
->wbuf
, c
->wbytes
);
669 * Ensures that there is room for another struct iovec in a connection's
672 * Returns 0 on success, -1 on out-of-memory.
674 static int ensure_iov_space(conn
*c
) {
677 if (c
->iovused
>= c
->iovsize
) {
679 struct iovec
*new_iov
= (struct iovec
*)realloc(c
->iov
,
680 (c
->iovsize
* 2) * sizeof(struct iovec
));
686 /* Point all the msghdr structures at the new list. */
687 for (i
= 0, iovnum
= 0; i
< c
->msgused
; i
++) {
688 c
->msglist
[i
].msg_iov
= &c
->iov
[iovnum
];
689 iovnum
+= c
->msglist
[i
].msg_iovlen
;
698 * Adds data to the list of pending data that will be written out to a
701 * Returns 0 on success, -1 on out-of-memory.
704 static int add_iov(conn
*c
, const void *buf
, int len
) {
712 m
= &c
->msglist
[c
->msgused
- 1];
715 * Limit UDP packets, and the first payloads of TCP replies, to
716 * UDP_MAX_PAYLOAD_SIZE bytes.
718 limit_to_mtu
= IS_UDP(c
->transport
) || (1 == c
->msgused
);
720 /* We may need to start a new msghdr if this one is full. */
721 if (m
->msg_iovlen
== IOV_MAX
||
722 (limit_to_mtu
&& c
->msgbytes
>= UDP_MAX_PAYLOAD_SIZE
)) {
724 m
= &c
->msglist
[c
->msgused
- 1];
727 if (ensure_iov_space(c
) != 0)
730 /* If the fragment is too big to fit in the datagram, split it up */
731 if (limit_to_mtu
&& len
+ c
->msgbytes
> UDP_MAX_PAYLOAD_SIZE
) {
732 leftover
= len
+ c
->msgbytes
- UDP_MAX_PAYLOAD_SIZE
;
738 m
= &c
->msglist
[c
->msgused
- 1];
739 m
->msg_iov
[m
->msg_iovlen
].iov_base
= (void *)buf
;
740 m
->msg_iov
[m
->msg_iovlen
].iov_len
= len
;
746 buf
= ((char *)buf
) + len
;
748 } while (leftover
> 0);
755 * Constructs a set of UDP headers and attaches them to the outgoing messages.
757 static int build_udp_headers(conn
*c
) {
763 if (c
->msgused
> c
->hdrsize
) {
766 new_hdrbuf
= realloc(c
->hdrbuf
, c
->msgused
* 2 * UDP_HEADER_SIZE
);
768 new_hdrbuf
= malloc(c
->msgused
* 2 * UDP_HEADER_SIZE
);
771 c
->hdrbuf
= (unsigned char *)new_hdrbuf
;
772 c
->hdrsize
= c
->msgused
* 2;
776 for (i
= 0; i
< c
->msgused
; i
++) {
777 c
->msglist
[i
].msg_iov
[0].iov_base
= (void*)hdr
;
778 c
->msglist
[i
].msg_iov
[0].iov_len
= UDP_HEADER_SIZE
;
779 *hdr
++ = c
->request_id
/ 256;
780 *hdr
++ = c
->request_id
% 256;
783 *hdr
++ = c
->msgused
/ 256;
784 *hdr
++ = c
->msgused
% 256;
787 assert((void *) hdr
== (caddr_t
)c
->msglist
[i
].msg_iov
[0].iov_base
+ UDP_HEADER_SIZE
);
794 #ifndef __INTEL_COMPILER
795 #pragma GCC diagnostic ignored "-Wsign-compare"
797 static void out_string(conn
*c
, const char *str
) {
803 if (settings
.verbose
> 1)
804 fprintf(stderr
, ">%d NOREPLY %s\n", c
->sfd
, str
);
806 conn_set_state(c
, conn_new_cmd
);
810 if (settings
.verbose
> 1)
811 fprintf(stderr
, ">%d %s\n", c
->sfd
, str
);
813 /* Nuke a partial output... */
820 if ((len
+ 2) > c
->wsize
) {
821 /* ought to be always enough. just fail for simplicity */
822 str
= "SERVER_ERROR output line too long";
826 memcpy(c
->wbuf
, str
, len
);
827 memcpy(c
->wbuf
+ len
, "\r\n", 2);
831 conn_set_state(c
, conn_write
);
832 c
->write_and_go
= conn_new_cmd
;
837 * we get here after reading the value in set/add/replace commands. The command
838 * has been stored in c->cmd, and the item is ready in c->item.
840 static void complete_nread_ascii(conn
*c
) {
845 enum store_item_type ret
;
847 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
848 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].set_cmds
++;
849 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
851 if (strncmp(ITEM_data(it
) + it
->nbytes
- 2, "\r\n", 2) != 0) {
852 out_string(c
, "CLIENT_ERROR bad data chunk");
854 ret
= store_item(it
, comm
, c
);
857 uint64_t cas
= ITEM_get_cas(it
);
860 MEMCACHED_COMMAND_ADD(c
->sfd
, ITEM_key(it
), it
->nkey
,
861 (ret
== 1) ? it
->nbytes
: -1, cas
);
864 MEMCACHED_COMMAND_REPLACE(c
->sfd
, ITEM_key(it
), it
->nkey
,
865 (ret
== 1) ? it
->nbytes
: -1, cas
);
868 MEMCACHED_COMMAND_APPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
869 (ret
== 1) ? it
->nbytes
: -1, cas
);
872 MEMCACHED_COMMAND_PREPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
873 (ret
== 1) ? it
->nbytes
: -1, cas
);
876 MEMCACHED_COMMAND_SET(c
->sfd
, ITEM_key(it
), it
->nkey
,
877 (ret
== 1) ? it
->nbytes
: -1, cas
);
880 MEMCACHED_COMMAND_CAS(c
->sfd
, ITEM_key(it
), it
->nkey
, it
->nbytes
,
888 out_string(c
, "STORED");
891 out_string(c
, "EXISTS");
894 out_string(c
, "NOT_FOUND");
897 out_string(c
, "NOT_STORED");
900 out_string(c
, "SERVER_ERROR Unhandled storage type.");
905 item_remove(c
->item
); /* release the c->item reference */
910 * get a pointer to the start of the request struct for the current command
912 static void* binary_get_request(conn
*c
) {
913 char *ret
= c
->rcurr
;
914 ret
-= (sizeof(c
->binary_header
) + c
->binary_header
.request
.keylen
+
915 c
->binary_header
.request
.extlen
);
917 assert(ret
>= c
->rbuf
);
922 * get a pointer to the key in this request
924 static char* binary_get_key(conn
*c
) {
925 return c
->rcurr
- (c
->binary_header
.request
.keylen
);
928 static void add_bin_header(conn
*c
, uint16_t err
, uint8_t hdr_len
, uint16_t key_len
, uint32_t body_len
) {
929 protocol_binary_response_header
* header
;
936 if (add_msghdr(c
) != 0) {
937 /* XXX: out_string is inappropriate here */
938 out_string(c
, "SERVER_ERROR out of memory");
942 header
= (protocol_binary_response_header
*)c
->wbuf
;
944 header
->response
.magic
= (uint8_t)PROTOCOL_BINARY_RES
;
945 header
->response
.opcode
= c
->binary_header
.request
.opcode
;
946 header
->response
.keylen
= (uint16_t)htons(key_len
);
948 header
->response
.extlen
= (uint8_t)hdr_len
;
949 header
->response
.datatype
= (uint8_t)PROTOCOL_BINARY_RAW_BYTES
;
950 header
->response
.status
= (uint16_t)htons(err
);
952 header
->response
.bodylen
= htonl(body_len
);
953 header
->response
.opaque
= c
->opaque
;
954 header
->response
.cas
= htonll(c
->cas
);
956 if (settings
.verbose
> 1) {
958 fprintf(stderr
, ">%d Writing bin response:", c
->sfd
);
959 for (ii
= 0; ii
< sizeof(header
->bytes
); ++ii
) {
961 fprintf(stderr
, "\n>%d ", c
->sfd
);
963 fprintf(stderr
, " 0x%02x", header
->bytes
[ii
]);
965 fprintf(stderr
, "\n");
968 add_iov(c
, c
->wbuf
, sizeof(header
->response
));
971 static void write_bin_error(conn
*c
, protocol_binary_response_status err
, int swallow
) {
972 const char *errstr
= "Unknown error";
976 case PROTOCOL_BINARY_RESPONSE_ENOMEM
:
977 errstr
= "Out of memory";
979 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
:
980 errstr
= "Unknown command";
982 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
:
983 errstr
= "Not found";
985 case PROTOCOL_BINARY_RESPONSE_EINVAL
:
986 errstr
= "Invalid arguments";
988 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
:
989 errstr
= "Data exists for key.";
991 case PROTOCOL_BINARY_RESPONSE_E2BIG
:
992 errstr
= "Too large.";
994 case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL
:
995 errstr
= "Non-numeric server-side value for incr or decr";
997 case PROTOCOL_BINARY_RESPONSE_NOT_STORED
:
998 errstr
= "Not stored.";
1000 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
:
1001 errstr
= "Auth failure.";
1005 errstr
= "UNHANDLED ERROR";
1006 fprintf(stderr
, ">%d UNHANDLED ERROR: %d\n", c
->sfd
, err
);
1009 if (settings
.verbose
> 1) {
1010 fprintf(stderr
, ">%d Writing an error: %s\n", c
->sfd
, errstr
);
1013 len
= strlen(errstr
);
1014 add_bin_header(c
, err
, 0, 0, len
);
1016 add_iov(c
, errstr
, len
);
1018 conn_set_state(c
, conn_mwrite
);
1020 c
->sbytes
= swallow
;
1021 c
->write_and_go
= conn_swallow
;
1023 c
->write_and_go
= conn_new_cmd
;
1027 /* Form and send a response to a command over the binary protocol */
1028 static void write_bin_response(conn
*c
, const void *d
, int hlen
, int keylen
, int dlen
) {
1029 if (!c
->noreply
|| c
->cmd
== PROTOCOL_BINARY_CMD_GET
||
1030 c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1031 add_bin_header(c
, 0, hlen
, keylen
, dlen
);
1033 add_iov(c
, d
, dlen
);
1035 conn_set_state(c
, conn_mwrite
);
1036 c
->write_and_go
= conn_new_cmd
;
1038 conn_set_state(c
, conn_new_cmd
);
1042 static void complete_incr_bin(conn
*c
) {
1046 /* Weird magic in add_delta forces me to pad here */
1047 char tmpbuf
[INCR_MAX_STORAGE_LEN
];
1050 protocol_binary_response_incr
* rsp
= (protocol_binary_response_incr
*)c
->wbuf
;
1051 protocol_binary_request_incr
* req
= binary_get_request(c
);
1054 assert(c
->wsize
>= sizeof(*rsp
));
1056 /* fix byteorder in the request */
1057 req
->message
.body
.delta
= ntohll(req
->message
.body
.delta
);
1058 req
->message
.body
.initial
= ntohll(req
->message
.body
.initial
);
1059 req
->message
.body
.expiration
= ntohl(req
->message
.body
.expiration
);
1060 key
= binary_get_key(c
);
1061 nkey
= c
->binary_header
.request
.keylen
;
1063 if (settings
.verbose
> 1) {
1065 fprintf(stderr
, "incr ");
1067 for (i
= 0; i
< nkey
; i
++) {
1068 fprintf(stderr
, "%c", key
[i
]);
1070 fprintf(stderr
, " %lld, %llu, %d\n",
1071 (long long)req
->message
.body
.delta
,
1072 (long long)req
->message
.body
.initial
,
1073 req
->message
.body
.expiration
);
1076 if (c
->binary_header
.request
.cas
!= 0) {
1077 cas
= c
->binary_header
.request
.cas
;
1079 switch(add_delta(c
, key
, nkey
, c
->cmd
== PROTOCOL_BINARY_CMD_INCREMENT
,
1080 req
->message
.body
.delta
, tmpbuf
,
1083 rsp
->message
.body
.value
= htonll(strtoull(tmpbuf
, NULL
, 10));
1087 write_bin_response(c
, &rsp
->message
.body
, 0, 0,
1088 sizeof(rsp
->message
.body
.value
));
1091 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL
, 0);
1094 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1096 case DELTA_ITEM_NOT_FOUND
:
1097 if (req
->message
.body
.expiration
!= 0xffffffff) {
1098 /* Save some room for the response */
1099 rsp
->message
.body
.value
= htonll(req
->message
.body
.initial
);
1100 it
= item_alloc(key
, nkey
, 0, realtime(req
->message
.body
.expiration
),
1101 INCR_MAX_STORAGE_LEN
);
1104 snprintf(ITEM_data(it
), INCR_MAX_STORAGE_LEN
, "%llu",
1105 (unsigned long long)req
->message
.body
.initial
);
1107 if (store_item(it
, NREAD_ADD
, c
)) {
1108 c
->cas
= ITEM_get_cas(it
);
1109 write_bin_response(c
, &rsp
->message
.body
, 0, 0, sizeof(rsp
->message
.body
.value
));
1111 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_NOT_STORED
, 0);
1113 item_remove(it
); /* release our reference */
1115 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1118 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1119 if (c
->cmd
== PROTOCOL_BINARY_CMD_INCREMENT
) {
1120 c
->thread
->stats
.incr_misses
++;
1122 c
->thread
->stats
.decr_misses
++;
1124 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1126 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1129 case DELTA_ITEM_CAS_MISMATCH
:
1130 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
, 0);
1135 static void complete_update_bin(conn
*c
) {
1136 protocol_binary_response_status eno
= PROTOCOL_BINARY_RESPONSE_EINVAL
;
1137 enum store_item_type ret
= NOT_STORED
;
1142 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1143 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].set_cmds
++;
1144 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1146 /* We don't actually receive the trailing two characters in the bin
1147 * protocol, so we're going to just set them here */
1148 *(ITEM_data(it
) + it
->nbytes
- 2) = '\r';
1149 *(ITEM_data(it
) + it
->nbytes
- 1) = '\n';
1151 ret
= store_item(it
, c
->cmd
, c
);
1153 #ifdef ENABLE_DTRACE
1154 uint64_t cas
= ITEM_get_cas(it
);
1157 MEMCACHED_COMMAND_ADD(c
->sfd
, ITEM_key(it
), it
->nkey
,
1158 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1161 MEMCACHED_COMMAND_REPLACE(c
->sfd
, ITEM_key(it
), it
->nkey
,
1162 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1165 MEMCACHED_COMMAND_APPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
1166 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1169 MEMCACHED_COMMAND_PREPEND(c
->sfd
, ITEM_key(it
), it
->nkey
,
1170 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1173 MEMCACHED_COMMAND_SET(c
->sfd
, ITEM_key(it
), it
->nkey
,
1174 (ret
== STORED
) ? it
->nbytes
: -1, cas
);
1182 write_bin_response(c
, NULL
, 0, 0, 0);
1185 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
, 0);
1188 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1191 if (c
->cmd
== NREAD_ADD
) {
1192 eno
= PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
;
1193 } else if(c
->cmd
== NREAD_REPLACE
) {
1194 eno
= PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
;
1196 eno
= PROTOCOL_BINARY_RESPONSE_NOT_STORED
;
1198 write_bin_error(c
, eno
, 0);
1201 item_remove(c
->item
); /* release the c->item reference */
1205 static void process_bin_touch(conn
*c
) {
1208 protocol_binary_response_get
* rsp
= (protocol_binary_response_get
*)c
->wbuf
;
1209 char* key
= binary_get_key(c
);
1210 size_t nkey
= c
->binary_header
.request
.keylen
;
1211 protocol_binary_request_touch
*t
= (void *)&c
->binary_header
;
1212 uint32_t exptime
= ntohl(t
->message
.body
.expiration
);
1214 if (settings
.verbose
> 1) {
1216 /* May be GAT/GATQ/etc */
1217 fprintf(stderr
, "<%d TOUCH ", c
->sfd
);
1218 for (ii
= 0; ii
< nkey
; ++ii
) {
1219 fprintf(stderr
, "%c", key
[ii
]);
1221 fprintf(stderr
, "\n");
1224 it
= item_touch(key
, nkey
, realtime(exptime
));
1227 /* the length has two unnecessary bytes ("\r\n") */
1228 uint16_t keylen
= 0;
1229 uint32_t bodylen
= sizeof(rsp
->message
.body
) + (it
->nbytes
- 2);
1232 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1233 c
->thread
->stats
.touch_cmds
++;
1234 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].touch_hits
++;
1235 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1237 MEMCACHED_COMMAND_TOUCH(c
->sfd
, ITEM_key(it
), it
->nkey
,
1238 it
->nbytes
, ITEM_get_cas(it
));
1240 if (c
->cmd
== PROTOCOL_BINARY_CMD_TOUCH
) {
1241 bodylen
-= it
->nbytes
- 2;
1242 } else if (c
->cmd
== PROTOCOL_BINARY_CMD_GATK
) {
1247 add_bin_header(c
, 0, sizeof(rsp
->message
.body
), keylen
, bodylen
);
1248 rsp
->message
.header
.response
.cas
= htonll(ITEM_get_cas(it
));
1251 rsp
->message
.body
.flags
= htonl(strtoul(ITEM_suffix(it
), NULL
, 10));
1252 add_iov(c
, &rsp
->message
.body
, sizeof(rsp
->message
.body
));
1254 if (c
->cmd
== PROTOCOL_BINARY_CMD_GATK
) {
1255 add_iov(c
, ITEM_key(it
), nkey
);
1258 /* Add the data minus the CRLF */
1259 if (c
->cmd
!= PROTOCOL_BINARY_CMD_TOUCH
) {
1260 add_iov(c
, ITEM_data(it
), it
->nbytes
- 2);
1263 conn_set_state(c
, conn_mwrite
);
1264 c
->write_and_go
= conn_new_cmd
;
1265 /* Remember this command so we can garbage collect it later */
1268 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1269 c
->thread
->stats
.touch_cmds
++;
1270 c
->thread
->stats
.touch_misses
++;
1271 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1273 MEMCACHED_COMMAND_TOUCH(c
->sfd
, key
, nkey
, -1, 0);
1276 conn_set_state(c
, conn_new_cmd
);
1278 if (c
->cmd
== PROTOCOL_BINARY_CMD_GATK
) {
1279 char *ofs
= c
->wbuf
+ sizeof(protocol_binary_response_header
);
1280 add_bin_header(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
,
1282 memcpy(ofs
, key
, nkey
);
1283 add_iov(c
, ofs
, nkey
);
1284 conn_set_state(c
, conn_mwrite
);
1285 c
->write_and_go
= conn_new_cmd
;
1287 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1292 if (settings
.detail_enabled
) {
1293 stats_prefix_record_get(key
, nkey
, NULL
!= it
);
1297 static void process_bin_get(conn
*c
) {
1300 protocol_binary_response_get
* rsp
= (protocol_binary_response_get
*)c
->wbuf
;
1301 char* key
= binary_get_key(c
);
1302 size_t nkey
= c
->binary_header
.request
.keylen
;
1304 if (settings
.verbose
> 1) {
1306 fprintf(stderr
, "<%d GET ", c
->sfd
);
1307 for (ii
= 0; ii
< nkey
; ++ii
) {
1308 fprintf(stderr
, "%c", key
[ii
]);
1310 fprintf(stderr
, "\n");
1313 it
= item_get(key
, nkey
);
1315 /* the length has two unnecessary bytes ("\r\n") */
1316 uint16_t keylen
= 0;
1317 uint32_t bodylen
= sizeof(rsp
->message
.body
) + (it
->nbytes
- 2);
1320 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1321 c
->thread
->stats
.get_cmds
++;
1322 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].get_hits
++;
1323 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1325 MEMCACHED_COMMAND_GET(c
->sfd
, ITEM_key(it
), it
->nkey
,
1326 it
->nbytes
, ITEM_get_cas(it
));
1328 if (c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1332 add_bin_header(c
, 0, sizeof(rsp
->message
.body
), keylen
, bodylen
);
1333 rsp
->message
.header
.response
.cas
= htonll(ITEM_get_cas(it
));
1336 rsp
->message
.body
.flags
= htonl(strtoul(ITEM_suffix(it
), NULL
, 10));
1337 add_iov(c
, &rsp
->message
.body
, sizeof(rsp
->message
.body
));
1339 if (c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1340 add_iov(c
, ITEM_key(it
), nkey
);
1343 /* Add the data minus the CRLF */
1344 add_iov(c
, ITEM_data(it
), it
->nbytes
- 2);
1345 conn_set_state(c
, conn_mwrite
);
1346 c
->write_and_go
= conn_new_cmd
;
1347 /* Remember this command so we can garbage collect it later */
1350 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1351 c
->thread
->stats
.get_cmds
++;
1352 c
->thread
->stats
.get_misses
++;
1353 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1355 MEMCACHED_COMMAND_GET(c
->sfd
, key
, nkey
, -1, 0);
1358 conn_set_state(c
, conn_new_cmd
);
1360 if (c
->cmd
== PROTOCOL_BINARY_CMD_GETK
) {
1361 char *ofs
= c
->wbuf
+ sizeof(protocol_binary_response_header
);
1362 add_bin_header(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
,
1364 memcpy(ofs
, key
, nkey
);
1365 add_iov(c
, ofs
, nkey
);
1366 conn_set_state(c
, conn_mwrite
);
1367 c
->write_and_go
= conn_new_cmd
;
1369 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1374 if (settings
.detail_enabled
) {
1375 stats_prefix_record_get(key
, nkey
, NULL
!= it
);
1379 #ifndef __INTEL_COMPILER
1380 #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
1382 static void append_bin_stats(const char *key
, const uint16_t klen
,
1383 const char *val
, const uint32_t vlen
,
1385 char *buf
= c
->stats
.buffer
+ c
->stats
.offset
;
1386 uint32_t bodylen
= klen
+ vlen
;
1387 protocol_binary_response_header header
= {
1388 .response
.magic
= (uint8_t)PROTOCOL_BINARY_RES
,
1389 .response
.opcode
= (uint8_t)PROTOCOL_BINARY_CMD_STAT
,
1390 .response
.keylen
= (uint16_t)htons(klen
),
1391 .response
.extlen
= (uint8_t)0,
1392 .response
.datatype
= (uint8_t)PROTOCOL_BINARY_RAW_BYTES
,
1393 .response
.status
= (uint16_t)0,
1394 .response
.bodylen
= htonl(bodylen
),
1395 .response
.opaque
= c
->opaque
,
1396 .response
.cas
= (uint64_t)0
1399 memcpy(buf
, header
.bytes
, sizeof(header
.response
));
1400 buf
+= sizeof(header
.response
);
1403 memcpy(buf
, key
, klen
);
1407 memcpy(buf
, val
, vlen
);
1411 c
->stats
.offset
+= sizeof(header
.response
) + bodylen
;
1414 static void append_ascii_stats(const char *key
, const uint16_t klen
,
1415 const char *val
, const uint32_t vlen
,
1417 char *pos
= c
->stats
.buffer
+ c
->stats
.offset
;
1418 uint32_t nbytes
= 0;
1419 int remaining
= c
->stats
.size
- c
->stats
.offset
;
1420 int room
= remaining
- 1;
1422 if (klen
== 0 && vlen
== 0) {
1423 nbytes
= snprintf(pos
, room
, "END\r\n");
1424 } else if (vlen
== 0) {
1425 nbytes
= snprintf(pos
, room
, "STAT %s\r\n", key
);
1427 nbytes
= snprintf(pos
, room
, "STAT %s %s\r\n", key
, val
);
1430 c
->stats
.offset
+= nbytes
;
1433 static bool grow_stats_buf(conn
*c
, size_t needed
) {
1434 size_t nsize
= c
->stats
.size
;
1435 size_t available
= nsize
- c
->stats
.offset
;
1438 /* Special case: No buffer -- need to allocate fresh */
1439 if (c
->stats
.buffer
== NULL
) {
1441 available
= c
->stats
.size
= c
->stats
.offset
= 0;
1444 while (needed
> available
) {
1447 available
= nsize
- c
->stats
.offset
;
1450 if (nsize
!= c
->stats
.size
) {
1451 char *ptr
= realloc(c
->stats
.buffer
, nsize
);
1453 c
->stats
.buffer
= ptr
;
1454 c
->stats
.size
= nsize
;
1463 static void append_stats(const char *key
, const uint16_t klen
,
1464 const char *val
, const uint32_t vlen
,
1467 /* value without a key is invalid */
1468 if (klen
== 0 && vlen
> 0) {
1472 conn
*c
= (conn
*)cookie
;
1474 if (c
->protocol
== binary_prot
) {
1475 size_t needed
= vlen
+ klen
+ sizeof(protocol_binary_response_header
);
1476 if (!grow_stats_buf(c
, needed
)) {
1479 append_bin_stats(key
, klen
, val
, vlen
, c
);
1481 size_t needed
= vlen
+ klen
+ 10; // 10 == "STAT = \r\n"
1482 if (!grow_stats_buf(c
, needed
)) {
1485 append_ascii_stats(key
, klen
, val
, vlen
, c
);
1488 assert(c
->stats
.offset
<= c
->stats
.size
);
1491 static void process_bin_stat(conn
*c
) {
1492 char *subcommand
= binary_get_key(c
);
1493 size_t nkey
= c
->binary_header
.request
.keylen
;
1495 if (settings
.verbose
> 1) {
1497 fprintf(stderr
, "<%d STATS ", c
->sfd
);
1498 for (ii
= 0; ii
< nkey
; ++ii
) {
1499 fprintf(stderr
, "%c", subcommand
[ii
]);
1501 fprintf(stderr
, "\n");
1505 /* request all statistics */
1506 server_stats(&append_stats
, c
);
1507 (void)get_stats(NULL
, 0, &append_stats
, c
);
1508 } else if (strncmp(subcommand
, "reset", 5) == 0) {
1510 } else if (strncmp(subcommand
, "settings", 8) == 0) {
1511 process_stat_settings(&append_stats
, c
);
1512 } else if (strncmp(subcommand
, "detail", 6) == 0) {
1513 char *subcmd_pos
= subcommand
+ 6;
1514 if (strncmp(subcmd_pos
, " dump", 5) == 0) {
1516 char *dump_buf
= stats_prefix_dump(&len
);
1517 if (dump_buf
== NULL
|| len
<= 0) {
1518 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1521 append_stats("detailed", strlen("detailed"), dump_buf
, len
, c
);
1524 } else if (strncmp(subcmd_pos
, " on", 3) == 0) {
1525 settings
.detail_enabled
= 1;
1526 } else if (strncmp(subcmd_pos
, " off", 4) == 0) {
1527 settings
.detail_enabled
= 0;
1529 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1533 if (get_stats(subcommand
, nkey
, &append_stats
, c
)) {
1534 if (c
->stats
.buffer
== NULL
) {
1535 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1537 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
1538 c
->stats
.buffer
= NULL
;
1541 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
1547 /* Append termination package and start the transfer */
1548 append_stats(NULL
, 0, NULL
, 0, c
);
1549 if (c
->stats
.buffer
== NULL
) {
1550 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, 0);
1552 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
1553 c
->stats
.buffer
= NULL
;
1557 static void bin_read_key(conn
*c
, enum bin_substates next_substate
, int extra
) {
1559 c
->substate
= next_substate
;
1560 c
->rlbytes
= c
->keylen
+ extra
;
1562 /* Ok... do we have room for the extras and the key in the input buffer? */
1563 ptrdiff_t offset
= c
->rcurr
+ sizeof(protocol_binary_request_header
) - c
->rbuf
;
1564 if (c
->rlbytes
> c
->rsize
- offset
) {
1565 size_t nsize
= c
->rsize
;
1566 size_t size
= c
->rlbytes
+ sizeof(protocol_binary_request_header
);
1568 while (size
> nsize
) {
1572 if (nsize
!= c
->rsize
) {
1573 if (settings
.verbose
> 1) {
1574 fprintf(stderr
, "%d: Need to grow buffer from %lu to %lu\n",
1575 c
->sfd
, (unsigned long)c
->rsize
, (unsigned long)nsize
);
1577 char *newm
= realloc(c
->rbuf
, nsize
);
1579 if (settings
.verbose
) {
1580 fprintf(stderr
, "%d: Failed to grow buffer.. closing connection\n",
1583 conn_set_state(c
, conn_closing
);
1588 /* rcurr should point to the same offset in the packet */
1589 c
->rcurr
= c
->rbuf
+ offset
- sizeof(protocol_binary_request_header
);
1592 if (c
->rbuf
!= c
->rcurr
) {
1593 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
1595 if (settings
.verbose
> 1) {
1596 fprintf(stderr
, "%d: Repack input buffer\n", c
->sfd
);
1601 /* preserve the header in the buffer.. */
1602 c
->ritem
= c
->rcurr
+ sizeof(protocol_binary_request_header
);
1603 conn_set_state(c
, conn_nread
);
1606 /* Just write an error message and disconnect the client */
1607 static void handle_binary_protocol_error(conn
*c
) {
1608 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_EINVAL
, 0);
1609 if (settings
.verbose
) {
1610 fprintf(stderr
, "Protocol error (opcode %02x), close connection %d\n",
1611 c
->binary_header
.request
.opcode
, c
->sfd
);
1613 c
->write_and_go
= conn_closing
;
1616 static void init_sasl_conn(conn
*c
) {
1618 /* should something else be returned? */
1622 if (!c
->sasl_conn
) {
1623 int result
=sasl_server_new("memcached",
1625 my_sasl_hostname
[0] ? my_sasl_hostname
: NULL
,
1627 NULL
, 0, &c
->sasl_conn
);
1628 if (result
!= SASL_OK
) {
1629 if (settings
.verbose
) {
1630 fprintf(stderr
, "Failed to initialize SASL conn.\n");
1632 c
->sasl_conn
= NULL
;
1637 static void bin_list_sasl_mechs(conn
*c
) {
1638 // Guard against a disabled SASL.
1639 if (!settings
.sasl
) {
1640 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
,
1641 c
->binary_header
.request
.bodylen
1642 - c
->binary_header
.request
.keylen
);
1647 const char *result_string
= NULL
;
1648 unsigned int string_length
= 0;
1649 int result
=sasl_listmech(c
->sasl_conn
, NULL
,
1650 "", /* What to prepend the string with */
1651 " ", /* What to separate mechanisms with */
1652 "", /* What to append to the string */
1653 &result_string
, &string_length
,
1655 if (result
!= SASL_OK
) {
1656 /* Perhaps there's a better error for this... */
1657 if (settings
.verbose
) {
1658 fprintf(stderr
, "Failed to list SASL mechanisms.\n");
1660 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
, 0);
1663 write_bin_response(c
, (char*)result_string
, 0, 0, string_length
);
1666 static void process_bin_sasl_auth(conn
*c
) {
1667 // Guard for handling disabled SASL on the server.
1668 if (!settings
.sasl
) {
1669 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
,
1670 c
->binary_header
.request
.bodylen
1671 - c
->binary_header
.request
.keylen
);
1675 assert(c
->binary_header
.request
.extlen
== 0);
1677 int nkey
= c
->binary_header
.request
.keylen
;
1678 int vlen
= c
->binary_header
.request
.bodylen
- nkey
;
1680 if (nkey
> MAX_SASL_MECH_LEN
) {
1681 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_EINVAL
, vlen
);
1682 c
->write_and_go
= conn_swallow
;
1686 char *key
= binary_get_key(c
);
1689 item
*it
= item_alloc(key
, nkey
, 0, 0, vlen
);
1692 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, vlen
);
1693 c
->write_and_go
= conn_swallow
;
1698 c
->ritem
= ITEM_data(it
);
1700 conn_set_state(c
, conn_nread
);
1701 c
->substate
= bin_reading_sasl_auth_data
;
1704 static void process_bin_complete_sasl_auth(conn
*c
) {
1705 assert(settings
.sasl
);
1706 const char *out
= NULL
;
1707 unsigned int outlen
= 0;
1712 int nkey
= c
->binary_header
.request
.keylen
;
1713 int vlen
= c
->binary_header
.request
.bodylen
- nkey
;
1716 memcpy(mech
, ITEM_key((item
*)c
->item
), nkey
);
1719 if (settings
.verbose
)
1720 fprintf(stderr
, "mech: ``%s'' with %d bytes of data\n", mech
, vlen
);
1722 const char *challenge
= vlen
== 0 ? NULL
: ITEM_data((item
*) c
->item
);
1727 case PROTOCOL_BINARY_CMD_SASL_AUTH
:
1728 result
= sasl_server_start(c
->sasl_conn
, mech
,
1732 case PROTOCOL_BINARY_CMD_SASL_STEP
:
1733 result
= sasl_server_step(c
->sasl_conn
,
1738 assert(false); /* CMD should be one of the above */
1739 /* This code is pretty much impossible, but makes the compiler
1741 if (settings
.verbose
) {
1742 fprintf(stderr
, "Unhandled command %d with challenge %s\n",
1748 item_unlink(c
->item
);
1750 if (settings
.verbose
) {
1751 fprintf(stderr
, "sasl result code: %d\n", result
);
1756 write_bin_response(c
, "Authenticated", 0, 0, strlen("Authenticated"));
1757 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1758 c
->thread
->stats
.auth_cmds
++;
1759 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1762 add_bin_header(c
, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE
, 0, 0, outlen
);
1764 add_iov(c
, out
, outlen
);
1766 conn_set_state(c
, conn_mwrite
);
1767 c
->write_and_go
= conn_new_cmd
;
1770 if (settings
.verbose
)
1771 fprintf(stderr
, "Unknown sasl response: %d\n", result
);
1772 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
, 0);
1773 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
1774 c
->thread
->stats
.auth_cmds
++;
1775 c
->thread
->stats
.auth_errors
++;
1776 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
1780 static bool authenticated(conn
*c
) {
1781 assert(settings
.sasl
);
1785 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS
: /* FALLTHROUGH */
1786 case PROTOCOL_BINARY_CMD_SASL_AUTH
: /* FALLTHROUGH */
1787 case PROTOCOL_BINARY_CMD_SASL_STEP
: /* FALLTHROUGH */
1788 case PROTOCOL_BINARY_CMD_VERSION
: /* FALLTHROUGH */
1793 const void *uname
= NULL
;
1794 sasl_getprop(c
->sasl_conn
, SASL_USERNAME
, &uname
);
1799 if (settings
.verbose
> 1) {
1800 fprintf(stderr
, "authenticated() in cmd 0x%02x is %s\n",
1801 c
->cmd
, rv
? "true" : "false");
1807 static void dispatch_bin_command(conn
*c
) {
1808 int protocol_error
= 0;
1810 int extlen
= c
->binary_header
.request
.extlen
;
1811 int keylen
= c
->binary_header
.request
.keylen
;
1812 uint32_t bodylen
= c
->binary_header
.request
.bodylen
;
1814 if (settings
.sasl
&& !authenticated(c
)) {
1815 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR
, 0);
1816 c
->write_and_go
= conn_closing
;
1820 MEMCACHED_PROCESS_COMMAND_START(c
->sfd
, c
->rcurr
, c
->rbytes
);
1823 /* binprot supports 16bit keys, but internals are still 8bit */
1824 if (keylen
> KEY_MAX_LENGTH
) {
1825 handle_binary_protocol_error(c
);
1830 case PROTOCOL_BINARY_CMD_SETQ
:
1831 c
->cmd
= PROTOCOL_BINARY_CMD_SET
;
1833 case PROTOCOL_BINARY_CMD_ADDQ
:
1834 c
->cmd
= PROTOCOL_BINARY_CMD_ADD
;
1836 case PROTOCOL_BINARY_CMD_REPLACEQ
:
1837 c
->cmd
= PROTOCOL_BINARY_CMD_REPLACE
;
1839 case PROTOCOL_BINARY_CMD_DELETEQ
:
1840 c
->cmd
= PROTOCOL_BINARY_CMD_DELETE
;
1842 case PROTOCOL_BINARY_CMD_INCREMENTQ
:
1843 c
->cmd
= PROTOCOL_BINARY_CMD_INCREMENT
;
1845 case PROTOCOL_BINARY_CMD_DECREMENTQ
:
1846 c
->cmd
= PROTOCOL_BINARY_CMD_DECREMENT
;
1848 case PROTOCOL_BINARY_CMD_QUITQ
:
1849 c
->cmd
= PROTOCOL_BINARY_CMD_QUIT
;
1851 case PROTOCOL_BINARY_CMD_FLUSHQ
:
1852 c
->cmd
= PROTOCOL_BINARY_CMD_FLUSH
;
1854 case PROTOCOL_BINARY_CMD_APPENDQ
:
1855 c
->cmd
= PROTOCOL_BINARY_CMD_APPEND
;
1857 case PROTOCOL_BINARY_CMD_PREPENDQ
:
1858 c
->cmd
= PROTOCOL_BINARY_CMD_PREPEND
;
1860 case PROTOCOL_BINARY_CMD_GETQ
:
1861 c
->cmd
= PROTOCOL_BINARY_CMD_GET
;
1863 case PROTOCOL_BINARY_CMD_GETKQ
:
1864 c
->cmd
= PROTOCOL_BINARY_CMD_GETK
;
1866 case PROTOCOL_BINARY_CMD_GATQ
:
1867 c
->cmd
= PROTOCOL_BINARY_CMD_GAT
;
1869 case PROTOCOL_BINARY_CMD_GATKQ
:
1870 c
->cmd
= PROTOCOL_BINARY_CMD_GAT
;
1877 case PROTOCOL_BINARY_CMD_VERSION
:
1878 if (extlen
== 0 && keylen
== 0 && bodylen
== 0) {
1879 write_bin_response(c
, RVERSION
, 0, 0, strlen(RVERSION
));
1884 case PROTOCOL_BINARY_CMD_FLUSH
:
1885 if (keylen
== 0 && bodylen
== extlen
&& (extlen
== 0 || extlen
== 4)) {
1886 bin_read_key(c
, bin_read_flush_exptime
, extlen
);
1891 case PROTOCOL_BINARY_CMD_NOOP
:
1892 if (extlen
== 0 && keylen
== 0 && bodylen
== 0) {
1893 write_bin_response(c
, NULL
, 0, 0, 0);
1898 case PROTOCOL_BINARY_CMD_SET
: /* FALLTHROUGH */
1899 case PROTOCOL_BINARY_CMD_ADD
: /* FALLTHROUGH */
1900 case PROTOCOL_BINARY_CMD_REPLACE
:
1901 if (extlen
== 8 && keylen
!= 0 && bodylen
>= (keylen
+ 8)) {
1902 bin_read_key(c
, bin_reading_set_header
, 8);
1907 case PROTOCOL_BINARY_CMD_GETQ
: /* FALLTHROUGH */
1908 case PROTOCOL_BINARY_CMD_GET
: /* FALLTHROUGH */
1909 case PROTOCOL_BINARY_CMD_GETKQ
: /* FALLTHROUGH */
1910 case PROTOCOL_BINARY_CMD_GETK
:
1911 if (extlen
== 0 && bodylen
== keylen
&& keylen
> 0) {
1912 bin_read_key(c
, bin_reading_get_key
, 0);
1917 case PROTOCOL_BINARY_CMD_DELETE
:
1918 if (keylen
> 0 && extlen
== 0 && bodylen
== keylen
) {
1919 bin_read_key(c
, bin_reading_del_header
, extlen
);
1924 case PROTOCOL_BINARY_CMD_INCREMENT
:
1925 case PROTOCOL_BINARY_CMD_DECREMENT
:
1926 if (keylen
> 0 && extlen
== 20 && bodylen
== (keylen
+ extlen
)) {
1927 bin_read_key(c
, bin_reading_incr_header
, 20);
1932 case PROTOCOL_BINARY_CMD_APPEND
:
1933 case PROTOCOL_BINARY_CMD_PREPEND
:
1934 if (keylen
> 0 && extlen
== 0) {
1935 bin_read_key(c
, bin_reading_set_header
, 0);
1940 case PROTOCOL_BINARY_CMD_STAT
:
1942 bin_read_key(c
, bin_reading_stat
, 0);
1947 case PROTOCOL_BINARY_CMD_QUIT
:
1948 if (keylen
== 0 && extlen
== 0 && bodylen
== 0) {
1949 write_bin_response(c
, NULL
, 0, 0, 0);
1950 c
->write_and_go
= conn_closing
;
1952 conn_set_state(c
, conn_closing
);
1958 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS
:
1959 if (extlen
== 0 && keylen
== 0 && bodylen
== 0) {
1960 bin_list_sasl_mechs(c
);
1965 case PROTOCOL_BINARY_CMD_SASL_AUTH
:
1966 case PROTOCOL_BINARY_CMD_SASL_STEP
:
1967 if (extlen
== 0 && keylen
!= 0) {
1968 bin_read_key(c
, bin_reading_sasl_auth
, 0);
1973 case PROTOCOL_BINARY_CMD_TOUCH
:
1974 case PROTOCOL_BINARY_CMD_GAT
:
1975 case PROTOCOL_BINARY_CMD_GATQ
:
1976 case PROTOCOL_BINARY_CMD_GATK
:
1977 case PROTOCOL_BINARY_CMD_GATKQ
:
1978 if (extlen
== 4 && keylen
!= 0) {
1979 bin_read_key(c
, bin_reading_touch_key
, 4);
1985 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
, bodylen
);
1989 handle_binary_protocol_error(c
);
1992 static void process_bin_update(conn
*c
) {
1997 protocol_binary_request_set
* req
= binary_get_request(c
);
2001 key
= binary_get_key(c
);
2002 nkey
= c
->binary_header
.request
.keylen
;
2004 /* fix byteorder in the request */
2005 req
->message
.body
.flags
= ntohl(req
->message
.body
.flags
);
2006 req
->message
.body
.expiration
= ntohl(req
->message
.body
.expiration
);
2008 vlen
= c
->binary_header
.request
.bodylen
- (nkey
+ c
->binary_header
.request
.extlen
);
2010 if (settings
.verbose
> 1) {
2012 if (c
->cmd
== PROTOCOL_BINARY_CMD_ADD
) {
2013 fprintf(stderr
, "<%d ADD ", c
->sfd
);
2014 } else if (c
->cmd
== PROTOCOL_BINARY_CMD_SET
) {
2015 fprintf(stderr
, "<%d SET ", c
->sfd
);
2017 fprintf(stderr
, "<%d REPLACE ", c
->sfd
);
2019 for (ii
= 0; ii
< nkey
; ++ii
) {
2020 fprintf(stderr
, "%c", key
[ii
]);
2023 fprintf(stderr
, " Value len is %d", vlen
);
2024 fprintf(stderr
, "\n");
2027 if (settings
.detail_enabled
) {
2028 stats_prefix_record_set(key
, nkey
);
2031 it
= item_alloc(key
, nkey
, req
->message
.body
.flags
,
2032 realtime(req
->message
.body
.expiration
), vlen
+2);
2035 if (! item_size_ok(nkey
, req
->message
.body
.flags
, vlen
+ 2)) {
2036 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_E2BIG
, vlen
);
2038 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, vlen
);
2041 /* Avoid stale data persisting in cache because we failed alloc.
2042 * Unacceptable for SET. Anywhere else too? */
2043 if (c
->cmd
== PROTOCOL_BINARY_CMD_SET
) {
2044 it
= item_get(key
, nkey
);
2051 /* swallow the data line */
2052 c
->write_and_go
= conn_swallow
;
2056 ITEM_set_cas(it
, c
->binary_header
.request
.cas
);
2059 case PROTOCOL_BINARY_CMD_ADD
:
2062 case PROTOCOL_BINARY_CMD_SET
:
2065 case PROTOCOL_BINARY_CMD_REPLACE
:
2066 c
->cmd
= NREAD_REPLACE
;
2072 if (ITEM_get_cas(it
) != 0) {
2077 c
->ritem
= ITEM_data(it
);
2079 conn_set_state(c
, conn_nread
);
2080 c
->substate
= bin_read_set_value
;
2083 static void process_bin_append_prepend(conn
*c
) {
2091 key
= binary_get_key(c
);
2092 nkey
= c
->binary_header
.request
.keylen
;
2093 vlen
= c
->binary_header
.request
.bodylen
- nkey
;
2095 if (settings
.verbose
> 1) {
2096 fprintf(stderr
, "Value len is %d\n", vlen
);
2099 if (settings
.detail_enabled
) {
2100 stats_prefix_record_set(key
, nkey
);
2103 it
= item_alloc(key
, nkey
, 0, 0, vlen
+2);
2106 if (! item_size_ok(nkey
, 0, vlen
+ 2)) {
2107 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_E2BIG
, vlen
);
2109 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_ENOMEM
, vlen
);
2111 /* swallow the data line */
2112 c
->write_and_go
= conn_swallow
;
2116 ITEM_set_cas(it
, c
->binary_header
.request
.cas
);
2119 case PROTOCOL_BINARY_CMD_APPEND
:
2120 c
->cmd
= NREAD_APPEND
;
2122 case PROTOCOL_BINARY_CMD_PREPEND
:
2123 c
->cmd
= NREAD_PREPEND
;
2130 c
->ritem
= ITEM_data(it
);
2132 conn_set_state(c
, conn_nread
);
2133 c
->substate
= bin_read_set_value
;
2136 static void process_bin_flush(conn
*c
) {
2138 protocol_binary_request_flush
* req
= binary_get_request(c
);
2140 if (c
->binary_header
.request
.extlen
== sizeof(req
->message
.body
)) {
2141 exptime
= ntohl(req
->message
.body
.expiration
);
2145 settings
.oldest_live
= realtime(exptime
) - 1;
2147 settings
.oldest_live
= current_time
- 1;
2149 item_flush_expired();
2151 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2152 c
->thread
->stats
.flush_cmds
++;
2153 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2155 write_bin_response(c
, NULL
, 0, 0, 0);
2158 static void process_bin_delete(conn
*c
) {
2161 protocol_binary_request_delete
* req
= binary_get_request(c
);
2163 char* key
= binary_get_key(c
);
2164 size_t nkey
= c
->binary_header
.request
.keylen
;
2168 if (settings
.verbose
> 1) {
2169 fprintf(stderr
, "Deleting %s\n", key
);
2172 if (settings
.detail_enabled
) {
2173 stats_prefix_record_delete(key
, nkey
);
2176 it
= item_get(key
, nkey
);
2178 uint64_t cas
= ntohll(req
->message
.header
.request
.cas
);
2179 if (cas
== 0 || cas
== ITEM_get_cas(it
)) {
2180 MEMCACHED_COMMAND_DELETE(c
->sfd
, ITEM_key(it
), it
->nkey
);
2181 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2182 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].delete_hits
++;
2183 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2185 write_bin_response(c
, NULL
, 0, 0, 0);
2187 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS
, 0);
2189 item_remove(it
); /* release our reference */
2191 write_bin_error(c
, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT
, 0);
2192 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2193 c
->thread
->stats
.delete_misses
++;
2194 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2198 static void complete_nread_binary(conn
*c
) {
2200 assert(c
->cmd
>= 0);
2202 switch(c
->substate
) {
2203 case bin_reading_set_header
:
2204 if (c
->cmd
== PROTOCOL_BINARY_CMD_APPEND
||
2205 c
->cmd
== PROTOCOL_BINARY_CMD_PREPEND
) {
2206 process_bin_append_prepend(c
);
2208 process_bin_update(c
);
2211 case bin_read_set_value
:
2212 complete_update_bin(c
);
2214 case bin_reading_get_key
:
2217 case bin_reading_touch_key
:
2218 process_bin_touch(c
);
2220 case bin_reading_stat
:
2221 process_bin_stat(c
);
2223 case bin_reading_del_header
:
2224 process_bin_delete(c
);
2226 case bin_reading_incr_header
:
2227 complete_incr_bin(c
);
2229 case bin_read_flush_exptime
:
2230 process_bin_flush(c
);
2232 case bin_reading_sasl_auth
:
2233 process_bin_sasl_auth(c
);
2235 case bin_reading_sasl_auth_data
:
2236 process_bin_complete_sasl_auth(c
);
2239 fprintf(stderr
, "Not handling substate %d\n", c
->substate
);
2244 static void reset_cmd_handler(conn
*c
) {
2246 c
->substate
= bin_no_state
;
2247 if(c
->item
!= NULL
) {
2248 item_remove(c
->item
);
2252 if (c
->rbytes
> 0) {
2253 conn_set_state(c
, conn_parse_cmd
);
2255 conn_set_state(c
, conn_waiting
);
2259 static void complete_nread(conn
*c
) {
2261 assert(c
->protocol
== ascii_prot
2262 || c
->protocol
== binary_prot
);
2264 if (c
->protocol
== ascii_prot
) {
2265 complete_nread_ascii(c
);
2266 } else if (c
->protocol
== binary_prot
) {
2267 complete_nread_binary(c
);
2272 * Stores an item in the cache according to the semantics of one of the set
2273 * commands. In threaded mode, this is protected by the cache lock.
2275 * Returns the state of storage.
2277 enum store_item_type
do_store_item(item
*it
, int comm
, conn
*c
, const uint32_t hv
) {
2278 char *key
= ITEM_key(it
);
2279 item
*old_it
= do_item_get(key
, it
->nkey
, hv
);
2280 enum store_item_type stored
= NOT_STORED
;
2282 item
*new_it
= NULL
;
2285 if (old_it
!= NULL
&& comm
== NREAD_ADD
) {
2286 /* add only adds a nonexistent item, but promote to head of LRU */
2287 do_item_update(old_it
);
2288 } else if (!old_it
&& (comm
== NREAD_REPLACE
2289 || comm
== NREAD_APPEND
|| comm
== NREAD_PREPEND
))
2291 /* replace only replaces an existing value; don't store */
2292 } else if (comm
== NREAD_CAS
) {
2293 /* validate cas operation */
2294 if(old_it
== NULL
) {
2297 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2298 c
->thread
->stats
.cas_misses
++;
2299 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2301 else if (ITEM_get_cas(it
) == ITEM_get_cas(old_it
)) {
2303 // it and old_it may belong to different classes.
2304 // I'm updating the stats for the one that's getting pushed out
2305 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2306 c
->thread
->stats
.slab_stats
[old_it
->slabs_clsid
].cas_hits
++;
2307 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2309 item_replace(old_it
, it
, hv
);
2312 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2313 c
->thread
->stats
.slab_stats
[old_it
->slabs_clsid
].cas_badval
++;
2314 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2316 if(settings
.verbose
> 1) {
2317 fprintf(stderr
, "CAS: failure: expected %llu, got %llu\n",
2318 (unsigned long long)ITEM_get_cas(old_it
),
2319 (unsigned long long)ITEM_get_cas(it
));
2325 * Append - combine new and old record into single one. Here it's
2326 * atomic and thread-safe.
2328 if (comm
== NREAD_APPEND
|| comm
== NREAD_PREPEND
) {
2332 if (ITEM_get_cas(it
) != 0) {
2333 // CAS much be equal
2334 if (ITEM_get_cas(it
) != ITEM_get_cas(old_it
)) {
2339 if (stored
== NOT_STORED
) {
2340 /* we have it and old_it here - alloc memory to hold both */
2341 /* flags was already lost - so recover them from ITEM_suffix(it) */
2343 flags
= (int) strtol(ITEM_suffix(old_it
), (char **) NULL
, 10);
2345 new_it
= item_alloc(key
, it
->nkey
, flags
, old_it
->exptime
, it
->nbytes
+ old_it
->nbytes
- 2 /* CRLF */);
2347 if (new_it
== NULL
) {
2348 /* SERVER_ERROR out of memory */
2350 do_item_remove(old_it
);
2355 /* copy data from it and old_it to new_it */
2357 if (comm
== NREAD_APPEND
) {
2358 memcpy(ITEM_data(new_it
), ITEM_data(old_it
), old_it
->nbytes
);
2359 memcpy(ITEM_data(new_it
) + old_it
->nbytes
- 2 /* CRLF */, ITEM_data(it
), it
->nbytes
);
2362 memcpy(ITEM_data(new_it
), ITEM_data(it
), it
->nbytes
);
2363 memcpy(ITEM_data(new_it
) + it
->nbytes
- 2 /* CRLF */, ITEM_data(old_it
), old_it
->nbytes
);
2370 if (stored
== NOT_STORED
) {
2372 item_replace(old_it
, it
, hv
);
2374 do_item_link(it
, hv
);
2376 c
->cas
= ITEM_get_cas(it
);
2383 do_item_remove(old_it
); /* release our reference */
2385 do_item_remove(new_it
);
2387 if (stored
== STORED
) {
2388 c
->cas
= ITEM_get_cas(it
);
2394 typedef struct token_s
{
2399 #define COMMAND_TOKEN 0
2400 #define SUBCOMMAND_TOKEN 1
2403 #define MAX_TOKENS 8
2406 * Tokenize the command string by replacing whitespace with '\0' and update
2407 * the token array tokens with pointer to start of each token and length.
2408 * Returns total number of tokens. The last valid token is the terminal
2409 * token (value points to the first unprocessed character of the string and
2414 * while(tokenize_command(command, ncommand, tokens, max_tokens) > 0) {
2415 * for(int ix = 0; tokens[ix].length != 0; ix++) {
2418 * ncommand = tokens[ix].value - command;
2419 * command = tokens[ix].value;
2422 static size_t tokenize_command(char *command
, token_t
*tokens
, const size_t max_tokens
) {
2425 size_t len
= strlen(command
);
2428 assert(command
!= NULL
&& tokens
!= NULL
&& max_tokens
> 1);
2431 for (i
= 0; i
< len
; i
++) {
2434 tokens
[ntokens
].value
= s
;
2435 tokens
[ntokens
].length
= e
- s
;
2438 if (ntokens
== max_tokens
- 1) {
2440 s
= e
; /* so we don't add an extra token */
2450 tokens
[ntokens
].value
= s
;
2451 tokens
[ntokens
].length
= e
- s
;
2456 * If we scanned the whole string, the terminal value pointer is null,
2457 * otherwise it is the first unprocessed character.
2459 tokens
[ntokens
].value
= *e
== '\0' ? NULL
: e
;
2460 tokens
[ntokens
].length
= 0;
2466 /* set up a connection to write a buffer then free it, used for stats */
2467 static void write_and_free(conn
*c
, char *buf
, int bytes
) {
2469 c
->write_and_free
= buf
;
2472 conn_set_state(c
, conn_write
);
2473 c
->write_and_go
= conn_new_cmd
;
2475 out_string(c
, "SERVER_ERROR out of memory writing stats");
2479 static inline bool set_noreply_maybe(conn
*c
, token_t
*tokens
, size_t ntokens
)
2481 int noreply_index
= ntokens
- 2;
2484 NOTE: this function is not the first place where we are going to
2485 send the reply. We could send it instead from process_command()
2486 if the request line has wrong number of tokens. However parsing
2487 malformed line for "noreply" option is not reliable anyway, so
2490 if (tokens
[noreply_index
].value
2491 && strcmp(tokens
[noreply_index
].value
, "noreply") == 0) {
2497 void append_stat(const char *name
, ADD_STAT add_stats
, conn
*c
,
2498 const char *fmt
, ...) {
2499 char val_str
[STAT_VAL_LEN
];
2509 vlen
= vsnprintf(val_str
, sizeof(val_str
) - 1, fmt
, ap
);
2512 add_stats(name
, strlen(name
), val_str
, vlen
, c
);
2515 inline static void process_stats_detail(conn
*c
, const char *command
) {
2518 if (strcmp(command
, "on") == 0) {
2519 settings
.detail_enabled
= 1;
2520 out_string(c
, "OK");
2522 else if (strcmp(command
, "off") == 0) {
2523 settings
.detail_enabled
= 0;
2524 out_string(c
, "OK");
2526 else if (strcmp(command
, "dump") == 0) {
2528 char *stats
= stats_prefix_dump(&len
);
2529 write_and_free(c
, stats
, len
);
2532 out_string(c
, "CLIENT_ERROR usage: stats detail on|off|dump");
2536 /* return server specific stats only */
2537 static void server_stats(ADD_STAT add_stats
, conn
*c
) {
2538 pid_t pid
= getpid();
2539 rel_time_t now
= current_time
;
2541 struct thread_stats thread_stats
;
2542 threadlocal_stats_aggregate(&thread_stats
);
2543 struct slab_stats slab_stats
;
2544 slab_stats_aggregate(&thread_stats
, &slab_stats
);
2547 struct rusage usage
;
2548 getrusage(RUSAGE_SELF
, &usage
);
2553 APPEND_STAT("pid", "%lu", (long)pid
);
2554 APPEND_STAT("uptime", "%u", now
);
2555 APPEND_STAT("time", "%ld", now
+ (long)process_started
);
2556 APPEND_STAT("version", "%s", RVERSION
);
2557 APPEND_STAT("libevent", "%s", event_get_version());
2558 APPEND_STAT("pointer_size", "%d", (int)(8 * sizeof(void *)));
2561 append_stat("rusage_user", add_stats
, c
, "%ld.%06ld",
2562 (long)usage
.ru_utime
.tv_sec
,
2563 (long)usage
.ru_utime
.tv_usec
);
2564 append_stat("rusage_system", add_stats
, c
, "%ld.%06ld",
2565 (long)usage
.ru_stime
.tv_sec
,
2566 (long)usage
.ru_stime
.tv_usec
);
2569 APPEND_STAT("curr_connections", "%u", stats
.curr_conns
- 1);
2570 APPEND_STAT("total_connections", "%u", stats
.total_conns
);
2571 if (settings
.maxconns_fast
) {
2572 APPEND_STAT("rejected_connections", "%llu", (unsigned long long)stats
.rejected_conns
);
2574 APPEND_STAT("connection_structures", "%u", stats
.conn_structs
);
2575 APPEND_STAT("reserved_fds", "%u", stats
.reserved_fds
);
2576 APPEND_STAT("cmd_get", "%llu", (unsigned long long)thread_stats
.get_cmds
);
2577 APPEND_STAT("cmd_set", "%llu", (unsigned long long)slab_stats
.set_cmds
);
2578 APPEND_STAT("cmd_flush", "%llu", (unsigned long long)thread_stats
.flush_cmds
);
2579 APPEND_STAT("cmd_touch", "%llu", (unsigned long long)thread_stats
.touch_cmds
);
2580 APPEND_STAT("get_hits", "%llu", (unsigned long long)slab_stats
.get_hits
);
2581 APPEND_STAT("get_misses", "%llu", (unsigned long long)thread_stats
.get_misses
);
2582 APPEND_STAT("delete_misses", "%llu", (unsigned long long)thread_stats
.delete_misses
);
2583 APPEND_STAT("delete_hits", "%llu", (unsigned long long)slab_stats
.delete_hits
);
2584 APPEND_STAT("incr_misses", "%llu", (unsigned long long)thread_stats
.incr_misses
);
2585 APPEND_STAT("incr_hits", "%llu", (unsigned long long)slab_stats
.incr_hits
);
2586 APPEND_STAT("decr_misses", "%llu", (unsigned long long)thread_stats
.decr_misses
);
2587 APPEND_STAT("decr_hits", "%llu", (unsigned long long)slab_stats
.decr_hits
);
2588 APPEND_STAT("cas_misses", "%llu", (unsigned long long)thread_stats
.cas_misses
);
2589 APPEND_STAT("cas_hits", "%llu", (unsigned long long)slab_stats
.cas_hits
);
2590 APPEND_STAT("cas_badval", "%llu", (unsigned long long)slab_stats
.cas_badval
);
2591 APPEND_STAT("touch_hits", "%llu", (unsigned long long)slab_stats
.touch_hits
);
2592 APPEND_STAT("touch_misses", "%llu", (unsigned long long)thread_stats
.touch_misses
);
2593 APPEND_STAT("auth_cmds", "%llu", (unsigned long long)thread_stats
.auth_cmds
);
2594 APPEND_STAT("auth_errors", "%llu", (unsigned long long)thread_stats
.auth_errors
);
2595 APPEND_STAT("bytes_read", "%llu", (unsigned long long)thread_stats
.bytes_read
);
2596 APPEND_STAT("bytes_written", "%llu", (unsigned long long)thread_stats
.bytes_written
);
2597 APPEND_STAT("limit_maxbytes", "%llu", (unsigned long long)settings
.maxbytes
);
2598 APPEND_STAT("accepting_conns", "%u", stats
.accepting_conns
);
2599 APPEND_STAT("listen_disabled_num", "%llu", (unsigned long long)stats
.listen_disabled_num
);
2600 APPEND_STAT("threads", "%d", settings
.num_threads
);
2601 APPEND_STAT("conn_yields", "%llu", (unsigned long long)thread_stats
.conn_yields
);
2602 APPEND_STAT("hash_power_level", "%u", stats
.hash_power_level
);
2603 APPEND_STAT("hash_bytes", "%llu", (unsigned long long)stats
.hash_bytes
);
2604 APPEND_STAT("hash_is_expanding", "%u", stats
.hash_is_expanding
);
2605 APPEND_STAT("expired_unfetched", "%llu", stats
.expired_unfetched
);
2606 APPEND_STAT("evicted_unfetched", "%llu", stats
.evicted_unfetched
);
2607 if (settings
.slab_reassign
) {
2608 APPEND_STAT("slab_reassign_running", "%u", stats
.slab_reassign_running
);
2609 APPEND_STAT("slabs_moved", "%llu", stats
.slabs_moved
);
2614 static void process_stat_settings(ADD_STAT add_stats
, void *c
) {
2616 APPEND_STAT("maxbytes", "%u", (unsigned int)settings
.maxbytes
);
2617 APPEND_STAT("maxconns", "%d", settings
.maxconns
);
2618 APPEND_STAT("tcpport", "%d", settings
.port
);
2619 APPEND_STAT("udpport", "%d", settings
.udpport
);
2620 APPEND_STAT("inter", "%s", settings
.inter
? settings
.inter
: "NULL");
2621 APPEND_STAT("verbosity", "%d", settings
.verbose
);
2622 APPEND_STAT("oldest", "%lu", (unsigned long)settings
.oldest_live
);
2623 APPEND_STAT("evictions", "%s", settings
.evict_to_free
? "on" : "off");
2624 APPEND_STAT("domain_socket", "%s",
2625 settings
.socketpath
? settings
.socketpath
: "NULL");
2626 APPEND_STAT("umask", "%o", settings
.access
);
2627 APPEND_STAT("growth_factor", "%.2f", settings
.factor
);
2628 APPEND_STAT("chunk_size", "%d", settings
.chunk_size
);
2629 APPEND_STAT("num_threads", "%d", settings
.num_threads
);
2630 APPEND_STAT("num_threads_per_udp", "%d", settings
.num_threads_per_udp
);
2631 APPEND_STAT("stat_key_prefix", "%c", settings
.prefix_delimiter
);
2632 APPEND_STAT("detail_enabled", "%s",
2633 settings
.detail_enabled
? "yes" : "no");
2634 APPEND_STAT("reqs_per_event", "%d", settings
.reqs_per_event
);
2635 APPEND_STAT("cas_enabled", "%s", settings
.use_cas
? "yes" : "no");
2636 APPEND_STAT("tcp_backlog", "%d", settings
.backlog
);
2637 APPEND_STAT("binding_protocol", "%s",
2638 prot_text(settings
.binding_protocol
));
2639 APPEND_STAT("auth_enabled_sasl", "%s", settings
.sasl
? "yes" : "no");
2640 APPEND_STAT("item_size_max", "%d", settings
.item_size_max
);
2641 APPEND_STAT("maxconns_fast", "%s", settings
.maxconns_fast
? "yes" : "no");
2642 APPEND_STAT("hashpower_init", "%d", settings
.hashpower_init
);
2643 APPEND_STAT("slab_reassign", "%s", settings
.slab_reassign
? "yes" : "no");
2644 APPEND_STAT("slab_automove", "%s", settings
.slab_automove
? "yes" : "no");
2647 static void process_stat(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
2648 const char *subcommand
= tokens
[SUBCOMMAND_TOKEN
].value
;
2652 out_string(c
, "CLIENT_ERROR bad command line");
2657 server_stats(&append_stats
, c
);
2658 (void)get_stats(NULL
, 0, &append_stats
, c
);
2659 } else if (strcmp(subcommand
, "reset") == 0) {
2661 out_string(c
, "RESET");
2663 } else if (strcmp(subcommand
, "detail") == 0) {
2664 /* NOTE: how to tackle detail with binary? */
2666 process_stats_detail(c
, ""); /* outputs the error message */
2668 process_stats_detail(c
, tokens
[2].value
);
2669 /* Output already generated */
2671 } else if (strcmp(subcommand
, "settings") == 0) {
2672 process_stat_settings(&append_stats
, c
);
2673 } else if (strcmp(subcommand
, "cachedump") == 0) {
2675 unsigned int bytes
, id
, limit
= 0;
2678 out_string(c
, "CLIENT_ERROR bad command line");
2682 if (!safe_strtoul(tokens
[2].value
, &id
) ||
2683 !safe_strtoul(tokens
[3].value
, &limit
)) {
2684 out_string(c
, "CLIENT_ERROR bad command line format");
2688 if (id
>= POWER_LARGEST
) {
2689 out_string(c
, "CLIENT_ERROR Illegal slab id");
2693 buf
= item_cachedump(id
, limit
, &bytes
);
2694 write_and_free(c
, buf
, bytes
);
2697 /* getting here means that the subcommand is either engine specific or
2698 is invalid. query the engine and see. */
2699 if (get_stats(subcommand
, strlen(subcommand
), &append_stats
, c
)) {
2700 if (c
->stats
.buffer
== NULL
) {
2701 out_string(c
, "SERVER_ERROR out of memory writing stats");
2703 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
2704 c
->stats
.buffer
= NULL
;
2707 out_string(c
, "ERROR");
2712 /* append terminator and start the transfer */
2713 append_stats(NULL
, 0, NULL
, 0, c
);
2715 if (c
->stats
.buffer
== NULL
) {
2716 out_string(c
, "SERVER_ERROR out of memory writing stats");
2718 write_and_free(c
, c
->stats
.buffer
, c
->stats
.offset
);
2719 c
->stats
.buffer
= NULL
;
2723 #ifndef __INTEL_COMPILER
2724 #pragma GCC diagnostic ignored "-Wunused-but-set-parameter"
2726 /* ntokens is overwritten here... shrug.. */
2727 static inline void process_get_command(conn
*c
, token_t
*tokens
, size_t ntokens
, bool return_cas
) {
2732 token_t
*key_token
= &tokens
[KEY_TOKEN
];
2737 while(key_token
->length
!= 0) {
2739 key
= key_token
->value
;
2740 nkey
= key_token
->length
;
2742 if(nkey
> KEY_MAX_LENGTH
) {
2743 out_string(c
, "CLIENT_ERROR bad command line format");
2747 it
= item_get(key
, nkey
);
2748 if (settings
.detail_enabled
) {
2749 stats_prefix_record_get(key
, nkey
, NULL
!= it
);
2752 if (i
>= c
->isize
) {
2753 item
**new_list
= realloc(c
->ilist
, sizeof(item
*) * c
->isize
* 2);
2756 c
->ilist
= new_list
;
2764 * Construct the response. Each hit adds three elements to the
2765 * outgoing data list:
2768 * " " + flags + " " + data length + "\r\n" + data (with \r\n)
2773 MEMCACHED_COMMAND_GET(c
->sfd
, ITEM_key(it
), it
->nkey
,
2774 it
->nbytes
, ITEM_get_cas(it
));
2775 /* Goofy mid-flight realloc. */
2776 if (i
>= c
->suffixsize
) {
2777 char **new_suffix_list
= realloc(c
->suffixlist
,
2778 sizeof(char *) * c
->suffixsize
* 2);
2779 if (new_suffix_list
) {
2781 c
->suffixlist
= new_suffix_list
;
2788 suffix
= cache_alloc(c
->thread
->suffix_cache
);
2789 if (suffix
== NULL
) {
2790 out_string(c
, "SERVER_ERROR out of memory making CAS suffix");
2794 *(c
->suffixlist
+ i
) = suffix
;
2795 int suffix_len
= snprintf(suffix
, SUFFIX_SIZE
,
2797 (unsigned long long)ITEM_get_cas(it
));
2798 if (add_iov(c
, "VALUE ", 6) != 0 ||
2799 add_iov(c
, ITEM_key(it
), it
->nkey
) != 0 ||
2800 add_iov(c
, ITEM_suffix(it
), it
->nsuffix
- 2) != 0 ||
2801 add_iov(c
, suffix
, suffix_len
) != 0 ||
2802 add_iov(c
, ITEM_data(it
), it
->nbytes
) != 0)
2810 MEMCACHED_COMMAND_GET(c
->sfd
, ITEM_key(it
), it
->nkey
,
2811 it
->nbytes
, ITEM_get_cas(it
));
2812 if (add_iov(c
, "VALUE ", 6) != 0 ||
2813 add_iov(c
, ITEM_key(it
), it
->nkey
) != 0 ||
2814 add_iov(c
, ITEM_suffix(it
), it
->nsuffix
+ it
->nbytes
) != 0)
2822 if (settings
.verbose
> 1)
2823 fprintf(stderr
, ">%d sending key %s\n", c
->sfd
, ITEM_key(it
));
2825 /* item_get() has incremented it->refcount for us */
2826 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2827 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].get_hits
++;
2828 c
->thread
->stats
.get_cmds
++;
2829 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2831 *(c
->ilist
+ i
) = it
;
2835 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2836 c
->thread
->stats
.get_misses
++;
2837 c
->thread
->stats
.get_cmds
++;
2838 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
2839 MEMCACHED_COMMAND_GET(c
->sfd
, key
, nkey
, -1, 0);
2846 * If the command string hasn't been fully processed, get the next set
2849 if(key_token
->value
!= NULL
) {
2850 ntokens
= tokenize_command(key_token
->value
, tokens
, MAX_TOKENS
);
2854 } while(key_token
->value
!= NULL
);
2856 c
->icurr
= c
->ilist
;
2859 c
->suffixcurr
= c
->suffixlist
;
2863 if (settings
.verbose
> 1)
2864 fprintf(stderr
, ">%d END\n", c
->sfd
);
2867 If the loop was terminated because of out-of-memory, it is not
2868 reliable to add END\r\n to the buffer, because it might not end
2869 in \r\n. So we send SERVER_ERROR instead.
2871 if (key_token
->value
!= NULL
|| add_iov(c
, "END\r\n", 5) != 0
2872 || (IS_UDP(c
->transport
) && build_udp_headers(c
) != 0)) {
2873 out_string(c
, "SERVER_ERROR out of memory writing get response");
2876 conn_set_state(c
, conn_mwrite
);
2883 static void process_update_command(conn
*c
, token_t
*tokens
, const size_t ntokens
, int comm
, bool handle_cas
) {
2887 int32_t exptime_int
= 0;
2890 uint64_t req_cas_id
=0;
2895 set_noreply_maybe(c
, tokens
, ntokens
);
2897 if (tokens
[KEY_TOKEN
].length
> KEY_MAX_LENGTH
) {
2898 out_string(c
, "CLIENT_ERROR bad command line format");
2902 key
= tokens
[KEY_TOKEN
].value
;
2903 nkey
= tokens
[KEY_TOKEN
].length
;
2905 if (! (safe_strtoul(tokens
[2].value
, (uint32_t *)&flags
)
2906 && safe_strtol(tokens
[3].value
, &exptime_int
)
2907 && safe_strtol(tokens
[4].value
, (int32_t *)&vlen
))) {
2908 out_string(c
, "CLIENT_ERROR bad command line format");
2912 /* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
2913 exptime
= exptime_int
;
2915 /* Negative exptimes can underflow and end up immortal. realtime() will
2916 immediately expire values that are greater than REALTIME_MAXDELTA, but less
2917 than process_started, so lets aim for that. */
2919 exptime
= REALTIME_MAXDELTA
+ 1;
2921 // does cas value exist?
2923 if (!safe_strtoull(tokens
[5].value
, &req_cas_id
)) {
2924 out_string(c
, "CLIENT_ERROR bad command line format");
2930 if (vlen
< 0 || vlen
- 2 < 0) {
2931 out_string(c
, "CLIENT_ERROR bad command line format");
2935 if (settings
.detail_enabled
) {
2936 stats_prefix_record_set(key
, nkey
);
2939 it
= item_alloc(key
, nkey
, flags
, realtime(exptime
), vlen
);
2942 if (! item_size_ok(nkey
, flags
, vlen
))
2943 out_string(c
, "SERVER_ERROR object too large for cache");
2945 out_string(c
, "SERVER_ERROR out of memory storing object");
2946 /* swallow the data line */
2947 c
->write_and_go
= conn_swallow
;
2950 /* Avoid stale data persisting in cache because we failed alloc.
2951 * Unacceptable for SET. Anywhere else too? */
2952 if (comm
== NREAD_SET
) {
2953 it
= item_get(key
, nkey
);
2962 ITEM_set_cas(it
, req_cas_id
);
2965 c
->ritem
= ITEM_data(it
);
2966 c
->rlbytes
= it
->nbytes
;
2968 conn_set_state(c
, conn_nread
);
2971 static void process_touch_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
2974 int32_t exptime_int
= 0;
2979 set_noreply_maybe(c
, tokens
, ntokens
);
2981 if (tokens
[KEY_TOKEN
].length
> KEY_MAX_LENGTH
) {
2982 out_string(c
, "CLIENT_ERROR bad command line format");
2986 key
= tokens
[KEY_TOKEN
].value
;
2987 nkey
= tokens
[KEY_TOKEN
].length
;
2989 if (!safe_strtol(tokens
[2].value
, &exptime_int
)) {
2990 out_string(c
, "CLIENT_ERROR invalid exptime argument");
2994 it
= item_touch(key
, nkey
, realtime(exptime_int
));
2997 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
2998 c
->thread
->stats
.touch_cmds
++;
2999 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].touch_hits
++;
3000 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3002 out_string(c
, "TOUCHED");
3005 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3006 c
->thread
->stats
.touch_cmds
++;
3007 c
->thread
->stats
.touch_misses
++;
3008 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3010 out_string(c
, "NOT_FOUND");
3014 static void process_arithmetic_command(conn
*c
, token_t
*tokens
, const size_t ntokens
, const bool incr
) {
3015 char temp
[INCR_MAX_STORAGE_LEN
];
3022 set_noreply_maybe(c
, tokens
, ntokens
);
3024 if (tokens
[KEY_TOKEN
].length
> KEY_MAX_LENGTH
) {
3025 out_string(c
, "CLIENT_ERROR bad command line format");
3029 key
= tokens
[KEY_TOKEN
].value
;
3030 nkey
= tokens
[KEY_TOKEN
].length
;
3032 if (!safe_strtoull(tokens
[2].value
, &delta
)) {
3033 out_string(c
, "CLIENT_ERROR invalid numeric delta argument");
3037 switch(add_delta(c
, key
, nkey
, incr
, delta
, temp
, NULL
)) {
3039 out_string(c
, temp
);
3042 out_string(c
, "CLIENT_ERROR cannot increment or decrement non-numeric value");
3045 out_string(c
, "SERVER_ERROR out of memory");
3047 case DELTA_ITEM_NOT_FOUND
:
3048 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3050 c
->thread
->stats
.incr_misses
++;
3052 c
->thread
->stats
.decr_misses
++;
3054 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3056 out_string(c
, "NOT_FOUND");
3058 case DELTA_ITEM_CAS_MISMATCH
:
3059 break; /* Should never get here */
3064 * adds a delta value to a numeric item.
3066 * c connection requesting the operation
3068 * incr true to increment value, false to decrement
3069 * delta amount to adjust value by
3070 * buf buffer for response string
3072 * returns a response string to send back to the client.
3074 enum delta_result_type
do_add_delta(conn
*c
, const char *key
, const size_t nkey
,
3075 const bool incr
, const int64_t delta
,
3076 char *buf
, uint64_t *cas
,
3077 const uint32_t hv
) {
3083 it
= do_item_get(key
, nkey
, hv
);
3085 return DELTA_ITEM_NOT_FOUND
;
3088 if (cas
!= NULL
&& *cas
!= 0 && ITEM_get_cas(it
) != *cas
) {
3090 return DELTA_ITEM_CAS_MISMATCH
;
3093 ptr
= ITEM_data(it
);
3095 if (!safe_strtoull(ptr
, &value
)) {
3102 MEMCACHED_COMMAND_INCR(c
->sfd
, ITEM_key(it
), it
->nkey
, value
);
3109 MEMCACHED_COMMAND_DECR(c
->sfd
, ITEM_key(it
), it
->nkey
, value
);
3112 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3114 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].incr_hits
++;
3116 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].decr_hits
++;
3118 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3120 snprintf(buf
, INCR_MAX_STORAGE_LEN
, "%llu", (unsigned long long)value
);
3122 if (res
+ 2 > it
->nbytes
|| it
->refcount
!= 1) { /* need to realloc */
3124 new_it
= item_alloc(ITEM_key(it
), it
->nkey
, atoi(ITEM_suffix(it
) + 1), it
->exptime
, res
+ 2 );
3129 memcpy(ITEM_data(new_it
), buf
, res
);
3130 memcpy(ITEM_data(new_it
) + res
, "\r\n", 2);
3131 item_replace(it
, new_it
, hv
);
3132 // Overwrite the older item's CAS with our new CAS since we're
3133 // returning the CAS of the old item below.
3134 ITEM_set_cas(it
, (settings
.use_cas
) ? ITEM_get_cas(new_it
) : 0);
3135 do_item_remove(new_it
); /* release our reference */
3136 } else { /* replace in-place */
3137 /* When changing the value without replacing the item, we
3138 need to update the CAS on the existing item. */
3139 mutex_lock(&cache_lock
); /* FIXME */
3140 ITEM_set_cas(it
, (settings
.use_cas
) ? get_cas_id() : 0);
3141 pthread_mutex_unlock(&cache_lock
);
3143 memcpy(ITEM_data(it
), buf
, res
);
3144 memset(ITEM_data(it
) + res
, ' ', it
->nbytes
- res
- 2);
3149 *cas
= ITEM_get_cas(it
); /* swap the incoming CAS value */
3151 do_item_remove(it
); /* release our reference */
3155 static void process_delete_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
3163 bool hold_is_zero
= strcmp(tokens
[KEY_TOKEN
+1].value
, "0") == 0;
3164 bool sets_noreply
= set_noreply_maybe(c
, tokens
, ntokens
);
3165 bool valid
= (ntokens
== 4 && (hold_is_zero
|| sets_noreply
))
3166 || (ntokens
== 5 && hold_is_zero
&& sets_noreply
);
3168 out_string(c
, "CLIENT_ERROR bad command line format. "
3169 "Usage: delete <key> [noreply]");
3175 key
= tokens
[KEY_TOKEN
].value
;
3176 nkey
= tokens
[KEY_TOKEN
].length
;
3178 if(nkey
> KEY_MAX_LENGTH
) {
3179 out_string(c
, "CLIENT_ERROR bad command line format");
3183 if (settings
.detail_enabled
) {
3184 stats_prefix_record_delete(key
, nkey
);
3187 it
= item_get(key
, nkey
);
3189 MEMCACHED_COMMAND_DELETE(c
->sfd
, ITEM_key(it
), it
->nkey
);
3191 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3192 c
->thread
->stats
.slab_stats
[it
->slabs_clsid
].delete_hits
++;
3193 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3196 item_remove(it
); /* release our reference */
3197 out_string(c
, "DELETED");
3199 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3200 c
->thread
->stats
.delete_misses
++;
3201 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3203 out_string(c
, "NOT_FOUND");
3207 static void process_verbosity_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
3212 set_noreply_maybe(c
, tokens
, ntokens
);
3214 level
= strtoul(tokens
[1].value
, NULL
, 10);
3215 settings
.verbose
= level
> MAX_VERBOSITY_LEVEL
? MAX_VERBOSITY_LEVEL
: level
;
3216 out_string(c
, "OK");
3220 static void process_slabs_automove_command(conn
*c
, token_t
*tokens
, const size_t ntokens
) {
3225 set_noreply_maybe(c
, tokens
, ntokens
);
3227 level
= strtoul(tokens
[2].value
, NULL
, 10);
3229 settings
.slab_automove
= false;
3230 } else if (level
== 1) {
3231 settings
.slab_automove
= true;
3233 out_string(c
, "ERROR");
3236 out_string(c
, "OK");
3240 static void process_command(conn
*c
, char *command
) {
3242 token_t tokens
[MAX_TOKENS
];
3248 MEMCACHED_PROCESS_COMMAND_START(c
->sfd
, c
->rcurr
, c
->rbytes
);
3250 if (settings
.verbose
> 1)
3251 fprintf(stderr
, "<%d %s\n", c
->sfd
, command
);
3254 * for commands set/add/replace, we build an item and read the data
3255 * directly into it, then continue in nread_complete().
3261 if (add_msghdr(c
) != 0) {
3262 out_string(c
, "SERVER_ERROR out of memory preparing response");
3266 ntokens
= tokenize_command(command
, tokens
, MAX_TOKENS
);
3268 ((strcmp(tokens
[COMMAND_TOKEN
].value
, "get") == 0) ||
3269 (strcmp(tokens
[COMMAND_TOKEN
].value
, "bget") == 0))) {
3271 process_get_command(c
, tokens
, ntokens
, false);
3273 } else if ((ntokens
== 6 || ntokens
== 7) &&
3274 ((strcmp(tokens
[COMMAND_TOKEN
].value
, "add") == 0 && (comm
= NREAD_ADD
)) ||
3275 (strcmp(tokens
[COMMAND_TOKEN
].value
, "set") == 0 && (comm
= NREAD_SET
)) ||
3276 (strcmp(tokens
[COMMAND_TOKEN
].value
, "replace") == 0 && (comm
= NREAD_REPLACE
)) ||
3277 (strcmp(tokens
[COMMAND_TOKEN
].value
, "prepend") == 0 && (comm
= NREAD_PREPEND
)) ||
3278 (strcmp(tokens
[COMMAND_TOKEN
].value
, "append") == 0 && (comm
= NREAD_APPEND
)) )) {
3280 process_update_command(c
, tokens
, ntokens
, comm
, false);
3282 } else if ((ntokens
== 7 || ntokens
== 8) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "cas") == 0 && (comm
= NREAD_CAS
))) {
3284 process_update_command(c
, tokens
, ntokens
, comm
, true);
3286 } else if ((ntokens
== 4 || ntokens
== 5) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "incr") == 0)) {
3288 process_arithmetic_command(c
, tokens
, ntokens
, 1);
3290 } else if (ntokens
>= 3 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "gets") == 0)) {
3292 process_get_command(c
, tokens
, ntokens
, true);
3294 } else if ((ntokens
== 4 || ntokens
== 5) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "decr") == 0)) {
3296 process_arithmetic_command(c
, tokens
, ntokens
, 0);
3298 } else if (ntokens
>= 3 && ntokens
<= 5 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "delete") == 0)) {
3300 process_delete_command(c
, tokens
, ntokens
);
3302 } else if ((ntokens
== 4 || ntokens
== 5) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "touch") == 0)) {
3304 process_touch_command(c
, tokens
, ntokens
);
3306 } else if (ntokens
>= 2 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "stats") == 0)) {
3308 process_stat(c
, tokens
, ntokens
);
3310 } else if (ntokens
>= 2 && ntokens
<= 4 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "flush_all") == 0)) {
3313 set_noreply_maybe(c
, tokens
, ntokens
);
3315 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3316 c
->thread
->stats
.flush_cmds
++;
3317 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3319 if(ntokens
== (c
->noreply
? 3 : 2)) {
3320 settings
.oldest_live
= current_time
- 1;
3321 item_flush_expired();
3322 out_string(c
, "OK");
3326 exptime
= strtol(tokens
[1].value
, NULL
, 10);
3327 if(errno
== ERANGE
) {
3328 out_string(c
, "CLIENT_ERROR bad command line format");
3333 If exptime is zero realtime() would return zero too, and
3334 realtime(exptime) - 1 would overflow to the max unsigned
3335 value. So we process exptime == 0 the same way we do when
3336 no delay is given at all.
3339 settings
.oldest_live
= realtime(exptime
) - 1;
3340 else /* exptime == 0 */
3341 settings
.oldest_live
= current_time
- 1;
3342 item_flush_expired();
3343 out_string(c
, "OK");
3346 } else if (ntokens
== 2 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "version") == 0)) {
3348 out_string(c
, "VERSION " RVERSION
);
3350 } else if (ntokens
== 2 && (strcmp(tokens
[COMMAND_TOKEN
].value
, "quit") == 0)) {
3352 conn_set_state(c
, conn_closing
);
3354 } else if (ntokens
> 1 && strcmp(tokens
[COMMAND_TOKEN
].value
, "slabs") == 0) {
3355 if (ntokens
== 5 && strcmp(tokens
[COMMAND_TOKEN
+ 1].value
, "reassign") == 0) {
3358 if (settings
.slab_reassign
== false) {
3359 out_string(c
, "CLIENT_ERROR slab reassignment disabled");
3363 src
= strtol(tokens
[2].value
, NULL
, 10);
3364 dst
= strtol(tokens
[3].value
, NULL
, 10);
3366 if (errno
== ERANGE
) {
3367 out_string(c
, "CLIENT_ERROR bad command line format");
3371 rv
= slabs_reassign(src
, dst
);
3374 out_string(c
, "OK");
3376 case REASSIGN_RUNNING
:
3377 out_string(c
, "BUSY currently processing reassign request");
3379 case REASSIGN_BADCLASS
:
3380 out_string(c
, "BADCLASS invalid src or dst class id");
3382 case REASSIGN_NOSPARE
:
3383 out_string(c
, "NOSPARE source class has no spare pages");
3385 case REASSIGN_DEST_NOT_FULL
:
3386 out_string(c
, "NOTFULL dest class has spare memory");
3388 case REASSIGN_SRC_NOT_SAFE
:
3389 out_string(c
, "UNSAFE src class is in an unsafe state");
3391 case REASSIGN_SRC_DST_SAME
:
3392 out_string(c
, "SAME src and dst class are identical");
3396 } else if (ntokens
== 4 &&
3397 (strcmp(tokens
[COMMAND_TOKEN
+ 1].value
, "automove") == 0)) {
3398 process_slabs_automove_command(c
, tokens
, ntokens
);
3400 out_string(c
, "ERROR");
3402 } else if ((ntokens
== 3 || ntokens
== 4) && (strcmp(tokens
[COMMAND_TOKEN
].value
, "verbosity") == 0)) {
3403 process_verbosity_command(c
, tokens
, ntokens
);
3405 out_string(c
, "ERROR");
3411 * if we have a complete line in the buffer, process it.
3413 static int try_read_command(conn
*c
) {
3415 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
3416 assert(c
->rbytes
> 0);
3418 if (c
->protocol
== negotiating_prot
|| c
->transport
== udp_transport
) {
3419 if ((unsigned char)c
->rbuf
[0] == (unsigned char)PROTOCOL_BINARY_REQ
) {
3420 c
->protocol
= binary_prot
;
3422 c
->protocol
= ascii_prot
;
3425 if (settings
.verbose
> 1) {
3426 fprintf(stderr
, "%d: Client using the %s protocol\n", c
->sfd
,
3427 prot_text(c
->protocol
));
3431 if (c
->protocol
== binary_prot
) {
3432 /* Do we have the complete packet header? */
3433 if (c
->rbytes
< sizeof(c
->binary_header
)) {
3434 /* need more data! */
3438 if (((long)(c
->rcurr
)) % 8 != 0) {
3439 /* must realign input buffer */
3440 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
3442 if (settings
.verbose
> 1) {
3443 fprintf(stderr
, "%d: Realign input buffer\n", c
->sfd
);
3447 protocol_binary_request_header
* req
;
3448 req
= (protocol_binary_request_header
*)c
->rcurr
;
3450 if (settings
.verbose
> 1) {
3451 /* Dump the packet before we convert it to host order */
3453 fprintf(stderr
, "<%d Read binary protocol data:", c
->sfd
);
3454 for (ii
= 0; ii
< sizeof(req
->bytes
); ++ii
) {
3456 fprintf(stderr
, "\n<%d ", c
->sfd
);
3458 fprintf(stderr
, " 0x%02x", req
->bytes
[ii
]);
3460 fprintf(stderr
, "\n");
3463 c
->binary_header
= *req
;
3464 c
->binary_header
.request
.keylen
= ntohs(req
->request
.keylen
);
3465 c
->binary_header
.request
.bodylen
= ntohl(req
->request
.bodylen
);
3466 c
->binary_header
.request
.cas
= ntohll(req
->request
.cas
);
3468 if (c
->binary_header
.request
.magic
!= PROTOCOL_BINARY_REQ
) {
3469 if (settings
.verbose
) {
3470 fprintf(stderr
, "Invalid magic: %x\n",
3471 c
->binary_header
.request
.magic
);
3473 conn_set_state(c
, conn_closing
);
3480 if (add_msghdr(c
) != 0) {
3481 out_string(c
, "SERVER_ERROR out of memory");
3485 c
->cmd
= c
->binary_header
.request
.opcode
;
3486 c
->keylen
= c
->binary_header
.request
.keylen
;
3487 c
->opaque
= c
->binary_header
.request
.opaque
;
3488 /* clear the returned cas value */
3491 dispatch_bin_command(c
);
3493 c
->rbytes
-= sizeof(c
->binary_header
);
3494 c
->rcurr
+= sizeof(c
->binary_header
);
3502 el
= memchr(c
->rcurr
, '\n', c
->rbytes
);
3504 if (c
->rbytes
> 1024) {
3506 * We didn't have a '\n' in the first k. This _has_ to be a
3507 * large multiget, if not we should just nuke the connection.
3509 char *ptr
= c
->rcurr
;
3510 while (*ptr
== ' ') { /* ignore leading whitespaces */
3514 if (ptr
- c
->rcurr
> 100 ||
3515 (strncmp(ptr
, "get ", 4) && strncmp(ptr
, "gets ", 5))) {
3517 conn_set_state(c
, conn_closing
);
3525 if ((el
- c
->rcurr
) > 1 && *(el
- 1) == '\r') {
3530 assert(cont
<= (c
->rcurr
+ c
->rbytes
));
3532 process_command(c
, c
->rcurr
);
3534 c
->rbytes
-= (cont
- c
->rcurr
);
3537 assert(c
->rcurr
<= (c
->rbuf
+ c
->rsize
));
3544 * read a UDP request.
3546 static enum try_read_result
try_read_udp(conn
*c
) {
3551 c
->request_addr_size
= sizeof(c
->request_addr
);
3552 res
= recvfrom(c
->sfd
, c
->rbuf
, c
->rsize
,
3553 0, &c
->request_addr
, &c
->request_addr_size
);
3555 unsigned char *buf
= (unsigned char *)c
->rbuf
;
3556 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3557 c
->thread
->stats
.bytes_read
+= res
;
3558 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3560 /* Beginning of UDP packet is the request ID; save it. */
3561 c
->request_id
= buf
[0] * 256 + buf
[1];
3563 /* If this is a multi-packet request, drop it. */
3564 if (buf
[4] != 0 || buf
[5] != 1) {
3565 out_string(c
, "SERVER_ERROR multi-packet request not supported");
3566 return READ_NO_DATA_RECEIVED
;
3569 /* Don't care about any of the rest of the header. */
3571 memmove(c
->rbuf
, c
->rbuf
+ 8, res
);
3575 return READ_DATA_RECEIVED
;
3577 return READ_NO_DATA_RECEIVED
;
3581 * read from network as much as we can, handle buffer overflow and connection
3583 * before reading, move the remaining incomplete fragment of a command
3584 * (if any) to the beginning of the buffer.
3586 * To protect us from someone flooding a connection with bogus data causing
3587 * the connection to eat up all available memory, break out and start looking
3588 * at the data I've got after a number of reallocs...
3590 * @return enum try_read_result
3592 static enum try_read_result
try_read_network(conn
*c
) {
3593 enum try_read_result gotdata
= READ_NO_DATA_RECEIVED
;
3598 if (c
->rcurr
!= c
->rbuf
) {
3599 if (c
->rbytes
!= 0) /* otherwise there's nothing to copy */
3600 memmove(c
->rbuf
, c
->rcurr
, c
->rbytes
);
3605 if (c
->rbytes
>= c
->rsize
) {
3606 if (num_allocs
== 4) {
3610 char *new_rbuf
= realloc(c
->rbuf
, c
->rsize
* 2);
3612 if (settings
.verbose
> 0)
3613 fprintf(stderr
, "Couldn't realloc input buffer\n");
3614 c
->rbytes
= 0; /* ignore what we read */
3615 out_string(c
, "SERVER_ERROR out of memory reading request");
3616 c
->write_and_go
= conn_closing
;
3617 return READ_MEMORY_ERROR
;
3619 c
->rcurr
= c
->rbuf
= new_rbuf
;
3623 int avail
= c
->rsize
- c
->rbytes
;
3624 res
= read(c
->sfd
, c
->rbuf
+ c
->rbytes
, avail
);
3626 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3627 c
->thread
->stats
.bytes_read
+= res
;
3628 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3629 gotdata
= READ_DATA_RECEIVED
;
3641 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
3650 static bool update_event(conn
*c
, const int new_flags
) {
3653 struct event_base
*base
= c
->event
.ev_base
;
3654 if (c
->ev_flags
== new_flags
)
3656 if (event_del(&c
->event
) == -1) return false;
3657 event_set(&c
->event
, c
->sfd
, new_flags
, event_handler
, (void *)c
);
3658 event_base_set(base
, &c
->event
);
3659 c
->ev_flags
= new_flags
;
3660 if (event_add(&c
->event
, 0) == -1) return false;
3665 * Sets whether we are listening for new connections or not.
3667 void do_accept_new_conns(const bool do_accept
) {
3670 for (next
= listen_conn
; next
; next
= next
->next
) {
3672 update_event(next
, EV_READ
| EV_PERSIST
);
3673 if (listen(next
->sfd
, settings
.backlog
) != 0) {
3678 update_event(next
, 0);
3679 if (listen(next
->sfd
, 0) != 0) {
3687 stats
.accepting_conns
= true;
3691 stats
.accepting_conns
= false;
3692 stats
.listen_disabled_num
++;
3694 allow_new_conns
= false;
3695 maxconns_handler(-42, 0, 0);
3700 * Transmit the next chunk of data from our list of msgbuf structures.
3703 * TRANSMIT_COMPLETE All done writing.
3704 * TRANSMIT_INCOMPLETE More data remaining to write.
3705 * TRANSMIT_SOFT_ERROR Can't write any more right now.
3706 * TRANSMIT_HARD_ERROR Can't write (c->state is set to conn_closing)
3708 static enum transmit_result
transmit(conn
*c
) {
3711 if (c
->msgcurr
< c
->msgused
&&
3712 c
->msglist
[c
->msgcurr
].msg_iovlen
== 0) {
3713 /* Finished writing the current msg; advance to the next. */
3716 if (c
->msgcurr
< c
->msgused
) {
3718 struct msghdr
*m
= &c
->msglist
[c
->msgcurr
];
3720 res
= sendmsg(c
->sfd
, m
, 0);
3722 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3723 c
->thread
->stats
.bytes_written
+= res
;
3724 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3726 /* We've written some of the data. Remove the completed
3727 iovec entries from the list of pending writes. */
3728 while (m
->msg_iovlen
> 0 && res
>= m
->msg_iov
->iov_len
) {
3729 res
-= m
->msg_iov
->iov_len
;
3734 /* Might have written just part of the last iovec entry;
3735 adjust it so the next write will do the rest. */
3737 m
->msg_iov
->iov_base
= (caddr_t
)m
->msg_iov
->iov_base
+ res
;
3738 m
->msg_iov
->iov_len
-= res
;
3740 return TRANSMIT_INCOMPLETE
;
3742 if (res
== -1 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) {
3743 if (!update_event(c
, EV_WRITE
| EV_PERSIST
)) {
3744 if (settings
.verbose
> 0)
3745 fprintf(stderr
, "Couldn't update event\n");
3746 conn_set_state(c
, conn_closing
);
3747 return TRANSMIT_HARD_ERROR
;
3749 return TRANSMIT_SOFT_ERROR
;
3751 /* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
3752 we have a real error, on which we close the connection */
3753 if (settings
.verbose
> 0)
3754 perror("Failed to write, and not due to blocking");
3756 if (IS_UDP(c
->transport
))
3757 conn_set_state(c
, conn_read
);
3759 conn_set_state(c
, conn_closing
);
3760 return TRANSMIT_HARD_ERROR
;
3762 return TRANSMIT_COMPLETE
;
3766 static void drive_machine(conn
*c
) {
3770 struct sockaddr_storage addr
;
3771 int nreqs
= settings
.reqs_per_event
;
3780 case conn_listening
:
3781 addrlen
= sizeof(addr
);
3782 if ((sfd
= accept(c
->sfd
, (struct sockaddr
*)&addr
, &addrlen
)) == -1) {
3783 if (errno
== EAGAIN
|| errno
== EWOULDBLOCK
) {
3784 /* these are transient, so don't log anything */
3786 } else if (errno
== EMFILE
) {
3787 if (settings
.verbose
> 0)
3788 fprintf(stderr
, "Too many open connections\n");
3789 accept_new_conns(false);
3797 if ((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0 ||
3798 fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
3799 perror("setting O_NONBLOCK");
3804 if (settings
.maxconns_fast
&&
3805 stats
.curr_conns
+ stats
.reserved_fds
>= settings
.maxconns
- 1) {
3806 str
= "ERROR Too many open connections\r\n";
3807 res
= write(sfd
, str
, strlen(str
));
3810 stats
.rejected_conns
++;
3813 dispatch_conn_new(sfd
, conn_new_cmd
, EV_READ
| EV_PERSIST
,
3814 DATA_BUFFER_SIZE
, tcp_transport
);
3821 if (!update_event(c
, EV_READ
| EV_PERSIST
)) {
3822 if (settings
.verbose
> 0)
3823 fprintf(stderr
, "Couldn't update event\n");
3824 conn_set_state(c
, conn_closing
);
3828 conn_set_state(c
, conn_read
);
3833 res
= IS_UDP(c
->transport
) ? try_read_udp(c
) : try_read_network(c
);
3836 case READ_NO_DATA_RECEIVED
:
3837 conn_set_state(c
, conn_waiting
);
3839 case READ_DATA_RECEIVED
:
3840 conn_set_state(c
, conn_parse_cmd
);
3843 conn_set_state(c
, conn_closing
);
3845 case READ_MEMORY_ERROR
: /* Failed to allocate more memory */
3846 /* State already set by try_read_network */
3851 case conn_parse_cmd
:
3852 if (try_read_command(c
) == 0) {
3853 /* wee need more data! */
3854 conn_set_state(c
, conn_waiting
);
3860 /* Only process nreqs at a time to avoid starving other
3865 reset_cmd_handler(c
);
3867 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3868 c
->thread
->stats
.conn_yields
++;
3869 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3870 if (c
->rbytes
> 0) {
3871 /* We have already read in data into the input buffer,
3872 so libevent will most likely not signal read events
3873 on the socket (unless more data is available. As a
3874 hack we should just put in a request to write data,
3875 because that should be possible ;-)
3877 if (!update_event(c
, EV_WRITE
| EV_PERSIST
)) {
3878 if (settings
.verbose
> 0)
3879 fprintf(stderr
, "Couldn't update event\n");
3880 conn_set_state(c
, conn_closing
);
3888 if (c
->rlbytes
== 0) {
3892 /* first check if we have leftovers in the conn_read buffer */
3893 if (c
->rbytes
> 0) {
3894 int tocopy
= c
->rbytes
> c
->rlbytes
? c
->rlbytes
: c
->rbytes
;
3895 if (c
->ritem
!= c
->rcurr
) {
3896 memmove(c
->ritem
, c
->rcurr
, tocopy
);
3899 c
->rlbytes
-= tocopy
;
3901 c
->rbytes
-= tocopy
;
3902 if (c
->rlbytes
== 0) {
3907 /* now try reading from the socket */
3908 res
= read(c
->sfd
, c
->ritem
, c
->rlbytes
);
3910 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3911 c
->thread
->stats
.bytes_read
+= res
;
3912 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3913 if (c
->rcurr
== c
->ritem
) {
3920 if (res
== 0) { /* end of stream */
3921 conn_set_state(c
, conn_closing
);
3924 if (res
== -1 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) {
3925 if (!update_event(c
, EV_READ
| EV_PERSIST
)) {
3926 if (settings
.verbose
> 0)
3927 fprintf(stderr
, "Couldn't update event\n");
3928 conn_set_state(c
, conn_closing
);
3934 /* otherwise we have a real error, on which we close the connection */
3935 if (settings
.verbose
> 0) {
3936 fprintf(stderr
, "Failed to read, and not due to blocking:\n"
3938 "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
3939 errno
, strerror(errno
),
3940 (long)c
->rcurr
, (long)c
->ritem
, (long)c
->rbuf
,
3941 (int)c
->rlbytes
, (int)c
->rsize
);
3943 conn_set_state(c
, conn_closing
);
3947 /* we are reading sbytes and throwing them away */
3948 if (c
->sbytes
== 0) {
3949 conn_set_state(c
, conn_new_cmd
);
3953 /* first check if we have leftovers in the conn_read buffer */
3954 if (c
->rbytes
> 0) {
3955 int tocopy
= c
->rbytes
> c
->sbytes
? c
->sbytes
: c
->rbytes
;
3956 c
->sbytes
-= tocopy
;
3958 c
->rbytes
-= tocopy
;
3962 /* now try reading from the socket */
3963 res
= read(c
->sfd
, c
->rbuf
, c
->rsize
> c
->sbytes
? c
->sbytes
: c
->rsize
);
3965 pthread_mutex_lock(&c
->thread
->stats
.mutex
);
3966 c
->thread
->stats
.bytes_read
+= res
;
3967 pthread_mutex_unlock(&c
->thread
->stats
.mutex
);
3971 if (res
== 0) { /* end of stream */
3972 conn_set_state(c
, conn_closing
);
3975 if (res
== -1 && (errno
== EAGAIN
|| errno
== EWOULDBLOCK
)) {
3976 if (!update_event(c
, EV_READ
| EV_PERSIST
)) {
3977 if (settings
.verbose
> 0)
3978 fprintf(stderr
, "Couldn't update event\n");
3979 conn_set_state(c
, conn_closing
);
3985 /* otherwise we have a real error, on which we close the connection */
3986 if (settings
.verbose
> 0)
3987 fprintf(stderr
, "Failed to read, and not due to blocking\n");
3988 conn_set_state(c
, conn_closing
);
3993 * We want to write out a simple response. If we haven't already,
3994 * assemble it into a msgbuf list (this will be a single-entry
3995 * list for TCP or a two-entry list for UDP).
3997 if (c
->iovused
== 0 || (IS_UDP(c
->transport
) && c
->iovused
== 1)) {
3998 if (add_iov(c
, c
->wcurr
, c
->wbytes
) != 0) {
3999 if (settings
.verbose
> 0)
4000 fprintf(stderr
, "Couldn't build response\n");
4001 conn_set_state(c
, conn_closing
);
4006 /* fall through... */
4009 if (IS_UDP(c
->transport
) && c
->msgcurr
== 0 && build_udp_headers(c
) != 0) {
4010 if (settings
.verbose
> 0)
4011 fprintf(stderr
, "Failed to build UDP headers\n");
4012 conn_set_state(c
, conn_closing
);
4015 switch (transmit(c
)) {
4016 case TRANSMIT_COMPLETE
:
4017 if (c
->state
== conn_mwrite
) {
4018 while (c
->ileft
> 0) {
4019 item
*it
= *(c
->icurr
);
4020 assert((it
->it_flags
& ITEM_SLABBED
) == 0);
4025 while (c
->suffixleft
> 0) {
4026 char *suffix
= *(c
->suffixcurr
);
4027 cache_free(c
->thread
->suffix_cache
, suffix
);
4031 /* XXX: I don't know why this wasn't the general case */
4032 if(c
->protocol
== binary_prot
) {
4033 conn_set_state(c
, c
->write_and_go
);
4035 conn_set_state(c
, conn_new_cmd
);
4037 } else if (c
->state
== conn_write
) {
4038 if (c
->write_and_free
) {
4039 free(c
->write_and_free
);
4040 c
->write_and_free
= 0;
4042 conn_set_state(c
, c
->write_and_go
);
4044 if (settings
.verbose
> 0)
4045 fprintf(stderr
, "Unexpected state %d\n", c
->state
);
4046 conn_set_state(c
, conn_closing
);
4050 case TRANSMIT_INCOMPLETE
:
4051 case TRANSMIT_HARD_ERROR
:
4052 break; /* Continue in state machine. */
4054 case TRANSMIT_SOFT_ERROR
:
4061 if (IS_UDP(c
->transport
))
4068 case conn_max_state
:
4077 void event_handler(const int fd
, const short which
, void *arg
) {
4087 if (settings
.verbose
> 0)
4088 fprintf(stderr
, "Catastrophic: event fd doesn't match conn fd!\n");
4095 /* wait for next event */
4099 static int new_socket(struct addrinfo
*ai
) {
4103 if ((sfd
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
)) == -1) {
4107 if ((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0 ||
4108 fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
4109 perror("setting O_NONBLOCK");
4118 * Sets a socket's send buffer size to the maximum allowed by the system.
4120 static void maximize_sndbuf(const int sfd
) {
4121 socklen_t intsize
= sizeof(int);
4126 /* Start with the default size. */
4127 if (getsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, &old_size
, &intsize
) != 0) {
4128 if (settings
.verbose
> 0)
4129 perror("getsockopt(SO_SNDBUF)");
4133 /* Binary-search for the real maximum. */
4135 max
= MAX_SENDBUF_SIZE
;
4137 while (min
<= max
) {
4138 avg
= ((unsigned int)(min
+ max
)) / 2;
4139 if (setsockopt(sfd
, SOL_SOCKET
, SO_SNDBUF
, (void *)&avg
, intsize
) == 0) {
4147 if (settings
.verbose
> 1)
4148 fprintf(stderr
, "<%d send buffer was %d, now %d\n", sfd
, old_size
, last_good
);
4152 * Create a socket and bind it to a specific port number
4153 * @param interface the interface to bind to
4154 * @param port the port number to bind to
4155 * @param transport the transport protocol (TCP / UDP)
4156 * @param portnumber_file A filepointer to write the port numbers to
4157 * when they are successfully added to the list of ports we
4160 static int server_socket(const char *interface
,
4162 enum network_transport transport
,
4163 FILE *portnumber_file
) {
4165 struct linger ling
= {0, 0};
4166 struct addrinfo
*ai
;
4167 struct addrinfo
*next
;
4168 struct addrinfo hints
= { .ai_flags
= AI_PASSIVE
,
4169 .ai_family
= AF_UNSPEC
};
4170 char port_buf
[NI_MAXSERV
];
4175 hints
.ai_socktype
= IS_UDP(transport
) ? SOCK_DGRAM
: SOCK_STREAM
;
4180 snprintf(port_buf
, sizeof(port_buf
), "%d", port
);
4181 error
= getaddrinfo(interface
, port_buf
, &hints
, &ai
);
4183 if (error
!= EAI_SYSTEM
)
4184 fprintf(stderr
, "getaddrinfo(): %s\n", gai_strerror(error
));
4186 perror("getaddrinfo()");
4190 for (next
= ai
; next
; next
= next
->ai_next
) {
4191 conn
*listen_conn_add
;
4192 if ((sfd
= new_socket(next
)) == -1) {
4193 /* getaddrinfo can return "junk" addresses,
4194 * we make sure at least one works before erroring.
4196 if (errno
== EMFILE
) {
4197 /* ...unless we're out of fds */
4198 perror("server_socket");
4205 if (next
->ai_family
== AF_INET6
) {
4206 error
= setsockopt(sfd
, IPPROTO_IPV6
, IPV6_V6ONLY
, (char *) &flags
, sizeof(flags
));
4208 perror("setsockopt");
4215 error
= setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, &flags
, sizeof(flags
));
4218 perror("setsockopt(SO_REUSEADDR)");
4221 if (IS_UDP(transport
)) {
4222 maximize_sndbuf(sfd
);
4224 error
= setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
, sizeof(flags
));
4226 perror("setsockopt");
4228 error
= setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
));
4230 perror("setsockopt");
4232 error
= setsockopt(sfd
, IPPROTO_TCP
, TCP_NODELAY
, (void *)&flags
, sizeof(flags
));
4234 perror("setsockopt");
4237 if (bind(sfd
, next
->ai_addr
, next
->ai_addrlen
) == -1) {
4238 if (errno
!= EADDRINUSE
) {
4248 if (!IS_UDP(transport
) && listen(sfd
, settings
.backlog
) == -1) {
4254 if (portnumber_file
!= NULL
&&
4255 (next
->ai_addr
->sa_family
== AF_INET
||
4256 next
->ai_addr
->sa_family
== AF_INET6
)) {
4258 struct sockaddr_in in
;
4259 struct sockaddr_in6 in6
;
4261 socklen_t len
= sizeof(my_sockaddr
);
4262 if (getsockname(sfd
, (struct sockaddr
*)&my_sockaddr
, &len
)==0) {
4263 if (next
->ai_addr
->sa_family
== AF_INET
) {
4264 fprintf(portnumber_file
, "%s INET: %u\n",
4265 IS_UDP(transport
) ? "UDP" : "TCP",
4266 ntohs(my_sockaddr
.in
.sin_port
));
4268 fprintf(portnumber_file
, "%s INET6: %u\n",
4269 IS_UDP(transport
) ? "UDP" : "TCP",
4270 ntohs(my_sockaddr
.in6
.sin6_port
));
4276 if (IS_UDP(transport
)) {
4279 for (c
= 0; c
< settings
.num_threads_per_udp
; c
++) {
4280 /* this is guaranteed to hit all threads because we round-robin */
4281 dispatch_conn_new(sfd
, conn_read
, EV_READ
| EV_PERSIST
,
4282 UDP_READ_BUFFER_SIZE
, transport
);
4285 if (!(listen_conn_add
= conn_new(sfd
, conn_listening
,
4286 EV_READ
| EV_PERSIST
, 1,
4287 transport
, main_base
))) {
4288 fprintf(stderr
, "failed to create listening connection\n");
4291 listen_conn_add
->next
= listen_conn
;
4292 listen_conn
= listen_conn_add
;
4298 /* Return zero iff we detected no errors in starting up connections */
4299 return success
== 0;
4302 static int server_sockets(int port
, enum network_transport transport
,
4303 FILE *portnumber_file
) {
4304 if (settings
.inter
== NULL
) {
4305 return server_socket(settings
.inter
, port
, transport
, portnumber_file
);
4307 // tokenize them and bind to each one of them..
4310 char *list
= strdup(settings
.inter
);
4313 fprintf(stderr
, "Failed to allocate memory for parsing server interface string\n");
4316 for (char *p
= strtok_r(list
, ";,", &b
);
4318 p
= strtok_r(NULL
, ";,", &b
)) {
4319 int the_port
= port
;
4320 char *s
= strchr(p
, ':');
4324 if (!safe_strtol(s
, &the_port
)) {
4325 fprintf(stderr
, "Invalid port number: \"%s\"", s
);
4329 if (strcmp(p
, "*") == 0) {
4332 ret
|= server_socket(p
, the_port
, transport
, portnumber_file
);
4339 static int new_socket_unix(void) {
4343 if ((sfd
= socket(AF_UNIX
, SOCK_STREAM
, 0)) == -1) {
4348 if ((flags
= fcntl(sfd
, F_GETFL
, 0)) < 0 ||
4349 fcntl(sfd
, F_SETFL
, flags
| O_NONBLOCK
) < 0) {
4350 perror("setting O_NONBLOCK");
4357 static int server_socket_unix(const char *path
, int access_mask
) {
4359 struct linger ling
= {0, 0};
4360 struct sockaddr_un addr
;
4369 if ((sfd
= new_socket_unix()) == -1) {
4374 * Clean up a previous socket file if we left it around
4376 if (lstat(path
, &tstat
) == 0) {
4377 if (S_ISSOCK(tstat
.st_mode
))
4381 setsockopt(sfd
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
));
4382 setsockopt(sfd
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
, sizeof(flags
));
4383 setsockopt(sfd
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
));
4386 * the memset call clears nonstandard fields in some impementations
4387 * that otherwise mess things up.
4389 memset(&addr
, 0, sizeof(addr
));
4391 addr
.sun_family
= AF_UNIX
;
4392 strncpy(addr
.sun_path
, path
, sizeof(addr
.sun_path
) - 1);
4393 assert(strcmp(addr
.sun_path
, path
) == 0);
4394 old_umask
= umask( ~(access_mask
&0777));
4395 if (bind(sfd
, (struct sockaddr
*)&addr
, sizeof(addr
)) == -1) {
4402 if (listen(sfd
, settings
.backlog
) == -1) {
4407 if (!(listen_conn
= conn_new(sfd
, conn_listening
,
4408 EV_READ
| EV_PERSIST
, 1,
4409 local_transport
, main_base
))) {
4410 fprintf(stderr
, "failed to create listening connection\n");
4418 * We keep the current time of day in a global variable that's updated by a
4419 * timer event. This saves us a bunch of time() system calls (we really only
4420 * need to get the time once a second, whereas there can be tens of thousands
4421 * of requests a second) and allows us to use server-start-relative timestamps
4422 * rather than absolute UNIX timestamps, a space savings on systems where
4423 * sizeof(time_t) > sizeof(unsigned int).
4425 volatile rel_time_t current_time
;
4426 static struct event clockevent
;
4428 /* libevent uses a monotonic clock when available for event scheduling. Aside
4429 * from jitter, simply ticking our internal timer here is accurate enough.
4430 * Note that users who are setting explicit dates for expiration times *must*
4431 * ensure their clocks are correct before starting memcached. */
4432 static void clock_handler(const int fd
, const short which
, void *arg
) {
4433 struct timeval t
= {.tv_sec
= 1, .tv_usec
= 0};
4434 static bool initialized
= false;
4435 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4436 static bool monotonic
= false;
4437 static time_t monotonic_start
;
4441 /* only delete the event if it's actually there. */
4442 evtimer_del(&clockevent
);
4445 /* process_started is initialized to time() - 2. We initialize to 1 so
4446 * flush_all won't underflow during tests. */
4447 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4449 if (clock_gettime(CLOCK_MONOTONIC
, &ts
) == 0) {
4451 monotonic_start
= ts
.tv_sec
- 2;
4456 evtimer_set(&clockevent
, clock_handler
, 0);
4457 event_base_set(main_base
, &clockevent
);
4458 evtimer_add(&clockevent
, &t
);
4460 #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC)
4463 if (clock_gettime(CLOCK_MONOTONIC
, &ts
) == -1)
4465 current_time
= (rel_time_t
) (ts
.tv_sec
- monotonic_start
);
4471 gettimeofday(&tv
, NULL
);
4472 current_time
= (rel_time_t
) (tv
.tv_sec
- process_started
);
4476 static void usage(void) {
4477 printf(RPACKAGE
" " RVERSION
"\n");
4478 printf("-p <num> TCP port number to listen on (default: 11211)\n"
4479 "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
4480 "-s <file> UNIX socket path to listen on (disables network support)\n"
4481 "-a <mask> access mask for UNIX socket, in octal (default: 0700)\n"
4482 "-l <addr> interface to listen on (default: INADDR_ANY, all addresses)\n"
4483 " <addr> may be specified as host:port. If you don't specify\n"
4484 " a port number, the value you specified with -p or -U is\n"
4485 " used. You may specify multiple addresses separated by comma\n"
4486 " or by using -l multiple times\n"
4488 "-d run as a daemon\n"
4489 "-r maximize core file limit\n"
4490 "-u <username> assume identity of <username> (only when run as root)\n"
4491 "-m <num> max memory to use for items in megabytes (default: 64 MB)\n"
4492 "-M return error on memory exhausted (rather than removing items)\n"
4493 "-c <num> max simultaneous connections (default: 1024)\n"
4494 "-k lock down all paged memory. Note that there is a\n"
4495 " limit on how much memory you may lock. Trying to\n"
4496 " allocate more than that would fail, so be sure you\n"
4497 " set the limit correctly for the user you started\n"
4498 " the daemon with (not for -u <username> user;\n"
4499 " under sh this is done with 'ulimit -S -l NUM_KB').\n"
4500 "-v verbose (print errors/warnings while in event loop)\n"
4501 "-vv very verbose (also print client commands/reponses)\n"
4502 "-vvv extremely verbose (also print internal state transitions)\n"
4503 "-h print this help and exit\n"
4504 "-i print memcached and libevent license\n"
4505 "-P <file> save PID in <file>, only used with -d option\n"
4506 "-f <factor> chunk size growth factor (default: 1.25)\n"
4507 "-n <bytes> minimum space allocated for key+value+flags (default: 48)\n");
4508 printf("-L Try to use large memory pages (if available). Increasing\n"
4509 " the memory page size could reduce the number of TLB misses\n"
4510 " and improve the performance. In order to get large pages\n"
4511 " from the OS, memcached will allocate the total item-cache\n"
4512 " in one large chunk.\n");
4513 printf("-D <char> Use <char> as the delimiter between key prefixes and IDs.\n"
4514 " This is used for per-prefix stats reporting. The default is\n"
4515 " \":\" (colon). If this option is specified, stats collection\n"
4516 " is turned on automatically; if not, then it may be turned on\n"
4517 " by sending the \"stats detail on\" command to the server.\n");
4518 printf("-t <num> number of threads to use (default: 4)\n");
4519 printf("-R Maximum number of requests per event, limits the number of\n"
4520 " requests process for a given connection to prevent \n"
4521 " starvation (default: 20)\n");
4522 printf("-C Disable use of CAS\n");
4523 printf("-b Set the backlog queue limit (default: 1024)\n");
4524 printf("-B Binding protocol - one of ascii, binary, or auto (default)\n");
4525 printf("-I Override the size of each slab page. Adjusts max item size\n"
4526 " (default: 1mb, min: 1k, max: 128m)\n");
4528 printf("-S Turn on Sasl authentication\n");
4530 printf("-o Comma separated list of extended or experimental options\n"
4531 " - (EXPERIMENTAL) maxconns_fast: immediately close new\n"
4532 " connections if over maxconns limit\n"
4533 " - hashpower: An integer multiplier for how large the hash\n"
4534 " table should be. Can be grown at runtime if not big enough.\n"
4535 " Set this based on \"STAT hash_power_level\" before a \n"
4541 static void usage_license(void) {
4542 printf(RPACKAGE
" " RVERSION
"\n\n");
4544 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
4545 "All rights reserved.\n"
4547 "Redistribution and use in source and binary forms, with or without\n"
4548 "modification, are permitted provided that the following conditions are\n"
4551 " * Redistributions of source code must retain the above copyright\n"
4552 "notice, this list of conditions and the following disclaimer.\n"
4554 " * Redistributions in binary form must reproduce the above\n"
4555 "copyright notice, this list of conditions and the following disclaimer\n"
4556 "in the documentation and/or other materials provided with the\n"
4559 " * Neither the name of the Danga Interactive nor the names of its\n"
4560 "contributors may be used to endorse or promote products derived from\n"
4561 "this software without specific prior written permission.\n"
4563 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
4564 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
4565 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
4566 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
4567 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
4568 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
4569 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4570 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4571 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4572 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
4573 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4576 "This product includes software developed by Niels Provos.\n"
4580 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
4581 "All rights reserved.\n"
4583 "Redistribution and use in source and binary forms, with or without\n"
4584 "modification, are permitted provided that the following conditions\n"
4586 "1. Redistributions of source code must retain the above copyright\n"
4587 " notice, this list of conditions and the following disclaimer.\n"
4588 "2. Redistributions in binary form must reproduce the above copyright\n"
4589 " notice, this list of conditions and the following disclaimer in the\n"
4590 " documentation and/or other materials provided with the distribution.\n"
4591 "3. All advertising materials mentioning features or use of this software\n"
4592 " must display the following acknowledgement:\n"
4593 " This product includes software developed by Niels Provos.\n"
4594 "4. The name of the author may not be used to endorse or promote products\n"
4595 " derived from this software without specific prior written permission.\n"
4597 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
4598 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
4599 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
4600 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
4601 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
4602 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
4603 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
4604 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
4605 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
4606 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
4612 static void save_pid(const char *pid_file
) {
4614 if (access(pid_file
, F_OK
) == 0) {
4615 if ((fp
= fopen(pid_file
, "r")) != NULL
) {
4617 if (fgets(buffer
, sizeof(buffer
), fp
) != NULL
) {
4619 if (safe_strtoul(buffer
, &pid
) && kill((pid_t
)pid
, 0) == 0) {
4620 fprintf(stderr
, "WARNING: The pid file contained the following (running) pid: %u\n", pid
);
4627 if ((fp
= fopen(pid_file
, "w")) == NULL
) {
4628 vperror("Could not open the pid file %s for writing", pid_file
);
4632 fprintf(fp
,"%ld\n", (long)getpid());
4633 if (fclose(fp
) == -1) {
4634 vperror("Could not close the pid file %s", pid_file
);
4638 static void remove_pidfile(const char *pid_file
) {
4639 if (pid_file
== NULL
)
4642 if (unlink(pid_file
) != 0) {
4643 vperror("Could not remove the pid file %s", pid_file
);
4648 static void sig_handler(const int sig
) {
4649 printf("SIGINT handled.\n");
4653 #ifndef HAVE_SIGIGNORE
4654 static int sigignore(int sig
) {
4655 struct sigaction sa
= { .sa_handler
= SIG_IGN
, .sa_flags
= 0 };
4657 if (sigemptyset(&sa
.sa_mask
) == -1 || sigaction(sig
, &sa
, 0) == -1) {
4666 * On systems that supports multiple page sizes we may reduce the
4667 * number of TLB-misses by using the biggest available page size
4669 static int enable_large_pages(void) {
4670 #if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
4673 int avail
= getpagesizes(sizes
, 32);
4675 size_t max
= sizes
[0];
4676 struct memcntl_mha arg
= {0};
4679 for (ii
= 1; ii
< avail
; ++ii
) {
4680 if (max
< sizes
[ii
]) {
4686 arg
.mha_pagesize
= max
;
4687 arg
.mha_cmd
= MHA_MAPSIZE_BSSBRK
;
4689 if (memcntl(0, 0, MC_HAT_ADVISE
, (caddr_t
)&arg
, 0, 0) == -1) {
4690 fprintf(stderr
, "Failed to set large pages: %s\n",
4692 fprintf(stderr
, "Will use default page size\n");
4697 fprintf(stderr
, "Failed to get supported pagesizes: %s\n",
4699 fprintf(stderr
, "Will use default page size\n");
4709 * Do basic sanity check of the runtime environment
4710 * @return true if no errors found, false if we can't use this env
4712 static bool sanitycheck(void) {
4713 /* One of our biggest problems is old and bogus libevents */
4714 const char *ever
= event_get_version();
4716 if (strncmp(ever
, "1.", 2) == 0) {
4717 /* Require at least 1.3 (that's still a couple of years old) */
4718 if ((ever
[2] == '1' || ever
[2] == '2') && !isdigit(ever
[3])) {
4719 fprintf(stderr
, "You are using libevent %s.\nPlease upgrade to"
4720 " a more recent version (1.3 or newer)\n",
4721 event_get_version());
4730 int main (int argc
, char **argv
) {
4732 bool lock_memory
= false;
4733 bool do_daemonize
= false;
4734 bool preallocate
= false;
4736 char *username
= NULL
;
4737 char *pid_file
= NULL
;
4742 int retval
= EXIT_SUCCESS
;
4743 /* listening sockets */
4744 static int *l_socket
= NULL
;
4747 static int *u_socket
= NULL
;
4748 bool protocol_specified
= false;
4749 bool tcp_specified
= false;
4750 bool udp_specified
= false;
4753 char *subopts_value
;
4760 char *const subopts_tokens
[] = {
4761 [MAXCONNS_FAST
] = (char*)"maxconns_fast",
4762 [HASHPOWER_INIT
] = (char*)"hashpower",
4763 [SLAB_REASSIGN
] = (char*)"slab_reassign",
4764 [SLAB_AUTOMOVE
] = (char*)"slab_automove",
4768 if (!sanitycheck()) {
4773 signal(SIGINT
, sig_handler
);
4778 /* set stderr non-buffering (for running under, say, daemontools) */
4779 setbuf(stderr
, NULL
);
4781 /* process arguments */
4782 while (-1 != (c
= getopt(argc
, argv
,
4783 "a:" /* access mask for unix socket */
4784 "p:" /* TCP port number to listen on */
4785 "s:" /* unix socket path to listen on */
4786 "U:" /* UDP port number to listen on */
4787 "m:" /* max memory to use for items in megabytes */
4788 "M" /* return error on memory exhausted */
4789 "c:" /* max simultaneous connections */
4790 "k" /* lock down all paged memory */
4791 "hi" /* help, licence info */
4792 "r" /* maximize core file limit */
4794 "d" /* daemon mode */
4795 "l:" /* interface to listen on */
4796 "u:" /* user identity to run as */
4797 "P:" /* save PID in file */
4799 "n:" /* minimum space allocated for key+value+flags */
4801 "D:" /* prefix delimiter? */
4802 "L" /* Large memory pages */
4803 "R:" /* max requests per event */
4804 "C" /* Disable use of CAS */
4805 "b:" /* backlog queue limit */
4806 "B:" /* Binding protocol */
4807 "I:" /* Max item size */
4809 "o:" /* Extended generic options */
4813 /* access for unix domain socket, as octal mask (like chmod)*/
4814 settings
.access
= strtol(optarg
,NULL
,8);
4818 settings
.udpport
= atoi(optarg
);
4819 udp_specified
= true;
4822 settings
.port
= atoi(optarg
);
4823 tcp_specified
= true;
4826 settings
.socketpath
= optarg
;
4829 settings
.maxbytes
= ((size_t)atoi(optarg
)) * 1024 * 1024;
4832 settings
.evict_to_free
= 0;
4835 settings
.maxconns
= atoi(optarg
);
4850 if (settings
.inter
!= NULL
) {
4851 size_t len
= strlen(settings
.inter
) + strlen(optarg
) + 2;
4852 char *p
= malloc(len
);
4854 fprintf(stderr
, "Failed to allocate memory\n");
4857 snprintf(p
, len
, "%s,%s", settings
.inter
, optarg
);
4858 free(settings
.inter
);
4861 settings
.inter
= strdup(optarg
);
4865 do_daemonize
= true;
4871 settings
.reqs_per_event
= atoi(optarg
);
4872 if (settings
.reqs_per_event
== 0) {
4873 fprintf(stderr
, "Number of requests per event must be greater than 0\n");
4884 settings
.factor
= atof(optarg
);
4885 if (settings
.factor
<= 1.0) {
4886 fprintf(stderr
, "Factor must be greater than 1\n");
4891 settings
.chunk_size
= atoi(optarg
);
4892 if (settings
.chunk_size
== 0) {
4893 fprintf(stderr
, "Chunk size must be greater than 0\n");
4898 settings
.num_threads
= atoi(optarg
);
4899 if (settings
.num_threads
<= 0) {
4900 fprintf(stderr
, "Number of threads must be greater than 0\n");
4903 /* There're other problems when you get above 64 threads.
4904 * In the future we should portably detect # of cores for the
4907 if (settings
.num_threads
> 64) {
4908 fprintf(stderr
, "WARNING: Setting a high number of worker"
4909 "threads is not recommended.\n"
4910 " Set this value to the number of cores in"
4911 " your machine or less.\n");
4915 if (! optarg
|| ! optarg
[0]) {
4916 fprintf(stderr
, "No delimiter specified\n");
4919 settings
.prefix_delimiter
= optarg
[0];
4920 settings
.detail_enabled
= 1;
4923 if (enable_large_pages() == 0) {
4928 settings
.use_cas
= false;
4931 settings
.backlog
= atoi(optarg
);
4934 protocol_specified
= true;
4935 if (strcmp(optarg
, "auto") == 0) {
4936 settings
.binding_protocol
= negotiating_prot
;
4937 } else if (strcmp(optarg
, "binary") == 0) {
4938 settings
.binding_protocol
= binary_prot
;
4939 } else if (strcmp(optarg
, "ascii") == 0) {
4940 settings
.binding_protocol
= ascii_prot
;
4942 fprintf(stderr
, "Invalid value for binding protocol: %s\n"
4943 " -- should be one of auto, binary, or ascii\n", optarg
);
4948 unit
= optarg
[strlen(optarg
)-1];
4949 if (unit
== 'k' || unit
== 'm' ||
4950 unit
== 'K' || unit
== 'M') {
4951 optarg
[strlen(optarg
)-1] = '\0';
4952 size_max
= atoi(optarg
);
4953 if (unit
== 'k' || unit
== 'K')
4955 if (unit
== 'm' || unit
== 'M')
4956 size_max
*= 1024 * 1024;
4957 settings
.item_size_max
= size_max
;
4959 settings
.item_size_max
= atoi(optarg
);
4961 if (settings
.item_size_max
< 1024) {
4962 fprintf(stderr
, "Item max size cannot be less than 1024 bytes.\n");
4965 if (settings
.item_size_max
> 1024 * 1024 * 128) {
4966 fprintf(stderr
, "Cannot set item size limit higher than 128 mb.\n");
4969 if (settings
.item_size_max
> 1024 * 1024) {
4970 fprintf(stderr
, "WARNING: Setting item max size above 1MB is not"
4972 " Raising this limit increases the minimum memory requirements\n"
4973 " and will decrease your memory efficiency.\n"
4977 case 'S': /* set Sasl authentication to true. Default is false */
4979 fprintf(stderr
, "This server is not built with SASL support.\n");
4982 settings
.sasl
= true;
4984 case 'o': /* It's sub-opts time! */
4987 while (*subopts
!= '\0') {
4989 switch (getsubopt(&subopts
, subopts_tokens
, &subopts_value
)) {
4991 settings
.maxconns_fast
= true;
4993 case HASHPOWER_INIT
:
4994 if (subopts_value
== NULL
) {
4995 fprintf(stderr
, "Missing numeric argument for hashpower\n");
4998 settings
.hashpower_init
= atoi(subopts_value
);
4999 if (settings
.hashpower_init
< 12) {
5000 fprintf(stderr
, "Initial hashtable multiplier of %d is too low\n",
5001 settings
.hashpower_init
);
5003 } else if (settings
.hashpower_init
> 64) {
5004 fprintf(stderr
, "Initial hashtable multiplier of %d is too high\n"
5005 "Choose a value based on \"STAT hash_power_level\" from a running instance\n",
5006 settings
.hashpower_init
);
5011 settings
.slab_reassign
= true;
5014 settings
.slab_automove
= true;
5017 printf("Illegal suboption \"%s\"\n", subopts_value
);
5024 fprintf(stderr
, "Illegal argument \"%c\"\n", c
);
5030 * Use one workerthread to serve each UDP port if the user specified
5033 if (settings
.inter
!= NULL
&& strchr(settings
.inter
, ',')) {
5034 settings
.num_threads_per_udp
= 1;
5036 settings
.num_threads_per_udp
= settings
.num_threads
;
5039 if (settings
.sasl
) {
5040 if (!protocol_specified
) {
5041 settings
.binding_protocol
= binary_prot
;
5043 if (settings
.binding_protocol
!= binary_prot
) {
5044 fprintf(stderr
, "ERROR: You cannot allow the ASCII protocol while using SASL.\n");
5050 if (tcp_specified
&& !udp_specified
) {
5051 settings
.udpport
= settings
.port
;
5052 } else if (udp_specified
&& !tcp_specified
) {
5053 settings
.port
= settings
.udpport
;
5057 struct rlimit rlim_new
;
5059 * First try raising to infinity; if that fails, try bringing
5060 * the soft limit to the hard.
5062 if (getrlimit(RLIMIT_CORE
, &rlim
) == 0) {
5063 rlim_new
.rlim_cur
= rlim_new
.rlim_max
= RLIM_INFINITY
;
5064 if (setrlimit(RLIMIT_CORE
, &rlim_new
)!= 0) {
5065 /* failed. try raising just to the old max */
5066 rlim_new
.rlim_cur
= rlim_new
.rlim_max
= rlim
.rlim_max
;
5067 (void)setrlimit(RLIMIT_CORE
, &rlim_new
);
5071 * getrlimit again to see what we ended up with. Only fail if
5072 * the soft limit ends up 0, because then no core files will be
5076 if ((getrlimit(RLIMIT_CORE
, &rlim
) != 0) || rlim
.rlim_cur
== 0) {
5077 fprintf(stderr
, "failed to ensure corefile creation\n");
5083 * If needed, increase rlimits to allow as many connections
5087 if (getrlimit(RLIMIT_NOFILE
, &rlim
) != 0) {
5088 fprintf(stderr
, "failed to getrlimit number of files\n");
5091 rlim
.rlim_cur
= settings
.maxconns
;
5092 rlim
.rlim_max
= settings
.maxconns
;
5093 if (setrlimit(RLIMIT_NOFILE
, &rlim
) != 0) {
5094 fprintf(stderr
, "failed to set rlimit for open files. Try starting as root or requesting smaller maxconns value.\n");
5099 /* lose root privileges if we have them */
5100 if (getuid() == 0 || geteuid() == 0) {
5101 if (username
== 0 || *username
== '\0') {
5102 fprintf(stderr
, "can't run as root without the -u switch\n");
5105 if ((pw
= getpwnam(username
)) == 0) {
5106 fprintf(stderr
, "can't find the user %s to switch to\n", username
);
5109 if (setgid(pw
->pw_gid
) < 0 || setuid(pw
->pw_uid
) < 0) {
5110 fprintf(stderr
, "failed to assume identity of user %s\n", username
);
5115 /* Initialize Sasl if -S was specified */
5116 if (settings
.sasl
) {
5120 /* daemonize if requested */
5121 /* if we want to ensure our ability to dump core, don't chdir to / */
5123 if (sigignore(SIGHUP
) == -1) {
5124 perror("Failed to ignore SIGHUP");
5126 if (daemonize(maxcore
, settings
.verbose
) == -1) {
5127 fprintf(stderr
, "failed to daemon() in order to daemonize\n");
5132 /* lock paged memory if needed */
5134 #ifdef HAVE_MLOCKALL
5135 int res
= mlockall(MCL_CURRENT
| MCL_FUTURE
);
5137 fprintf(stderr
, "warning: -k invalid, mlockall() failed: %s\n",
5141 fprintf(stderr
, "warning: -k invalid, mlockall() not supported on this platform. proceeding without.\n");
5145 /* initialize main thread libevent instance */
5146 main_base
= event_init();
5148 /* initialize other stuff */
5150 assoc_init(settings
.hashpower_init
);
5152 slabs_init(settings
.maxbytes
, settings
.factor
, preallocate
);
5155 * ignore SIGPIPE signals; we can use errno == EPIPE if we
5156 * need that information
5158 if (sigignore(SIGPIPE
) == -1) {
5159 perror("failed to ignore SIGPIPE; sigaction");
5162 /* start up worker threads if MT mode */
5163 thread_init(settings
.num_threads
, main_base
);
5165 if (start_assoc_maintenance_thread() == -1) {
5169 if (settings
.slab_reassign
&&
5170 start_slab_maintenance_thread() == -1) {
5174 /* initialise clock event */
5175 clock_handler(0, 0, 0);
5177 /* create unix mode sockets after dropping privileges */
5178 if (settings
.socketpath
!= NULL
) {
5180 if (server_socket_unix(settings
.socketpath
,settings
.access
)) {
5181 vperror("failed to listen on UNIX socket: %s", settings
.socketpath
);
5186 /* create the listening socket, bind it, and init */
5187 if (settings
.socketpath
== NULL
) {
5188 const char *portnumber_filename
= getenv("MEMCACHED_PORT_FILENAME");
5189 char temp_portnumber_filename
[PATH_MAX
];
5190 FILE *portnumber_file
= NULL
;
5192 if (portnumber_filename
!= NULL
) {
5193 snprintf(temp_portnumber_filename
,
5194 sizeof(temp_portnumber_filename
),
5195 "%s.lck", portnumber_filename
);
5197 portnumber_file
= fopen(temp_portnumber_filename
, "a");
5198 if (portnumber_file
== NULL
) {
5199 fprintf(stderr
, "Failed to open \"%s\": %s\n",
5200 temp_portnumber_filename
, strerror(errno
));
5205 if (settings
.port
&& server_sockets(settings
.port
, tcp_transport
,
5207 vperror("failed to listen on TCP port %d", settings
.port
);
5212 * initialization order: first create the listening sockets
5213 * (may need root on low ports), then drop root if needed,
5214 * then daemonise if needed, then init libevent (in some cases
5215 * descriptors created by libevent wouldn't survive forking).
5218 /* create the UDP listening socket and bind it */
5220 if (settings
.udpport
&& server_sockets(settings
.udpport
, udp_transport
,
5222 vperror("failed to listen on UDP port %d", settings
.udpport
);
5226 if (portnumber_file
) {
5227 fclose(portnumber_file
);
5228 rename(temp_portnumber_filename
, portnumber_filename
);
5232 /* Give the sockets a moment to open. I know this is dumb, but the error
5233 * is only an advisory.
5236 if (stats
.curr_conns
+ stats
.reserved_fds
>= settings
.maxconns
- 1) {
5237 fprintf(stderr
, "Maxconns setting is too low, use -c to increase.\n");
5241 if (pid_file
!= NULL
) {
5245 /* Drop privileges no longer needed */
5248 /* enter the event loop */
5249 if (event_base_loop(main_base
, 0) != 0) {
5250 retval
= EXIT_FAILURE
;
5253 stop_assoc_maintenance_thread();
5255 /* remove the PID file if we're a daemon */
5258 remove_pidfile(pid_file
);
5260 /* Clean up strdup() call for bind() address */
5262 free(settings
.inter
);