/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
#include "memcached.h"
#include <assert.h>
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#ifdef __sun
#include <atomic.h>
#endif

#define ITEMS_PER_ALLOC 64

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int                    sfd;
    enum conn_states       init_state;
    int                    event_flags;
    int                    read_buffer_size;
    enum network_transport transport;
    CQ_ITEM               *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};

/* Lock for cache operations (item_*, assoc_*) */
pthread_mutex_t cache_lock;

/* Connection lock around accepting new connections */
pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;

#if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
pthread_mutex_t atomics_mutex = PTHREAD_MUTEX_INITIALIZER;
#endif

/* Lock for global stats */
static pthread_mutex_t stats_lock;

/* Free list of CQ_ITEM structs */
static CQ_ITEM *cqi_freelist;
static pthread_mutex_t cqi_freelist_lock;

static pthread_mutex_t *item_locks;
/* size of the item lock hash table */
static uint32_t item_lock_count;
/* size - 1 for lookup masking */
static uint32_t item_lock_mask;

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;

/*
 * Each libevent instance has a wakeup pipe, which other threads
 * can use to signal that they've put a new connection on its queue.
 */
static LIBEVENT_THREAD *threads;

/*
 * Number of worker threads that have finished setting themselves up.
 */
static int init_count = 0;
static pthread_mutex_t init_lock;
static pthread_cond_t init_cond;

static void thread_libevent_process(int fd, short which, void *arg);

unsigned short refcount_incr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_add_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_inc_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    (*refcount)++;
    res = *refcount;
    pthread_mutex_unlock(&atomics_mutex);
    return res;
#endif
}

unsigned short refcount_decr(unsigned short *refcount) {
#ifdef HAVE_GCC_ATOMICS
    return __sync_sub_and_fetch(refcount, 1);
#elif defined(__sun)
    return atomic_dec_ushort_nv(refcount);
#else
    unsigned short res;
    mutex_lock(&atomics_mutex);
    (*refcount)--;
    res = *refcount;
    pthread_mutex_unlock(&atomics_mutex);
    return res;
#endif
}

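/*
 * Illustrative note (not part of the original source): callers use the value
 * returned by refcount_decr() to decide when an item can actually be freed,
 * roughly along these lines (item_free() stands in for the cleanup done in
 * items.c):
 *
 *     if (refcount_decr(&it->refcount) == 0) {
 *         item_free(it);
 *     }
 */
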
void item_lock(uint32_t hv) {
    mutex_lock(&item_locks[hv & item_lock_mask]);
}

void item_unlock(uint32_t hv) {
    pthread_mutex_unlock(&item_locks[hv & item_lock_mask]);
}

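/*
 * Illustrative note (not part of the original source): item_lock_count is
 * always a power of two, so "hv & item_lock_mask" is equivalent to
 * "hv % item_lock_count" but cheaper.  For example, with 8192 buckets the
 * mask is 0x1FFF, and a hash value of 0x2ACE57 selects lock bucket
 * 0x2ACE57 & 0x1FFF == 0x0E57.
 */
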
/*
 * Initializes a connection queue.
 */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    pthread_cond_init(&cq->cond, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/*
 * Looks for an item on a connection queue, but doesn't block if there isn't
 * one.
 * Returns the item, or NULL if no item is available
 */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/*
 * Adds an item to a connection queue.
 */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_cond_signal(&cq->cond);
    pthread_mutex_unlock(&cq->lock);
}

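/*
 * Illustrative sketch (not part of the original source): cq_push() signals
 * cq->cond, but cq_pop() above never waits on it; in practice the wakeup
 * travels over the worker's notify pipe instead (see dispatch_conn_new()
 * and thread_libevent_process() below).  A blocking pop built on the same
 * queue would look roughly like this:
 */
#if 0
static CQ_ITEM *cq_wait_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    while (NULL == cq->head)
        pthread_cond_wait(&cq->cond, &cq->lock);
    item = cq->head;
    cq->head = item->next;
    if (NULL == cq->head)
        cq->tail = NULL;
    pthread_mutex_unlock(&cq->lock);

    return item;
}
#endif
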
/*
 * Returns a fresh connection queue item.
 */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *item = NULL;
    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);

    if (NULL == item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item)
            return NULL;

        /*
         * Link together all the new items except the first one
         * (which we'll return to the caller) for placement on
         * the freelist.
         */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }

    return item;
}

/*
 * Frees a connection queue item (adds it to the freelist.)
 */
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}

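/*
 * Illustrative note (not part of the original source): the freelist keeps
 * CQ_ITEM allocation cheap.  When it runs dry, cqi_new() mallocs
 * ITEMS_PER_ALLOC (64) items in one block, hands the first to the caller and
 * chains the other 63 onto the freelist, so at most one malloc is needed per
 * 64 dispatched connections.
 */
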
/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

/*
 * Sets whether or not we accept new connections.
 */
void accept_new_conns(const bool do_accept) {
    pthread_mutex_lock(&conn_lock);
    do_accept_new_conns(do_accept);
    pthread_mutex_unlock(&conn_lock);
}

/****************************** LIBEVENT THREADS *****************************/

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }

    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    pthread_mutex_lock(&init_lock);
    init_count++;
    pthread_cond_signal(&init_cond);
    pthread_mutex_unlock(&init_lock);

    event_base_loop(me->base, 0);
    return NULL;
}

#ifndef __INTEL_COMPILER
#pragma GCC diagnostic ignored "-Wunused-parameter"
#endif

/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}

/* Which thread we assigned a connection to most recently. */
static int last_thread = -1;

/*
 * Dispatches a new connection to another thread. This is only ever called
 * from the main thread, either during initialization (for UDP) or because
 * of an incoming connection.
 */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    if (write(thread->notify_send_fd, "", 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}

/*
 * Returns true if this is the thread that listens for new TCP connections.
 */
int is_listen_thread() {
    return pthread_self() == dispatcher_thread.thread_id;
}

/********************************* ITEM ACCESS *******************************/

/*
 * Allocates a new item.
 */
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    item *it;
    /* do_item_alloc handles its own locks */
    it = do_item_alloc(key, nkey, flags, exptime, nbytes);
    return it;
}

/*
 * Returns an item if it hasn't been marked as expired,
 * lazy-expiring as needed.
 */
item *item_get(const char *key, const size_t nkey) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey, 0);
    item_lock(hv);
    it = do_item_get(key, nkey, hv);
    item_unlock(hv);
    return it;
}

item *item_touch(const char *key, size_t nkey, uint32_t exptime) {
    item *it;
    uint32_t hv;
    hv = hash(key, nkey, 0);
    item_lock(hv);
    it = do_item_touch(key, nkey, exptime, hv);
    item_unlock(hv);
    return it;
}

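/*
 * Illustrative sketch (not part of the original source): every wrapper in
 * this section follows the same pattern -- hash the key, take the matching
 * item lock, call the do_* function, drop the lock.  A reader typically
 * pairs item_get() with item_remove() to release the reference item_get()
 * acquired; "example_key" below is a made-up key.
 */
#if 0
static void example_reader(void) {
    item *it = item_get("example_key", 11);
    if (it != NULL) {
        /* ... inspect ITEM_data(it) while holding the reference ... */
        item_remove(it);
    }
}
#endif
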
/*
 * Links an item into the LRU and hashtable.
 */
int item_link(item *item) {
    int ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey, 0);
    item_lock(hv);
    ret = do_item_link(item, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Decrements the reference count on an item and adds it to the freelist if
 * needed.
 */
void item_remove(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey, 0);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}

/*
 * Replaces one item with another in the hashtable.
 * Unprotected by a mutex lock since the core server does not require
 * it to be thread-safe.
 */
int item_replace(item *old_it, item *new_it, const uint32_t hv) {
    return do_item_replace(old_it, new_it, hv);
}

/*
 * Unlinks an item from the LRU and hashtable.
 */
void item_unlink(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey, 0);
    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

/*
 * Moves an item to the back of the LRU queue.
 */
void item_update(item *item) {
    uint32_t hv;
    hv = hash(ITEM_key(item), item->nkey, 0);

    item_lock(hv);
    do_item_update(item);
    item_unlock(hv);
}

/*
 * Does arithmetic on a numeric item value.
 */
enum delta_result_type add_delta(conn *c, const char *key,
                                 const size_t nkey, int incr,
                                 const int64_t delta, char *buf,
                                 uint64_t *cas) {
    enum delta_result_type ret;
    uint32_t hv;

    hv = hash(key, nkey, 0);
    item_lock(hv);
    ret = do_add_delta(c, key, nkey, incr, delta, buf, cas, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Stores an item in the cache (high level, obeys set/add/replace semantics)
 */
enum store_item_type store_item(item *item, int comm, conn* c) {
    enum store_item_type ret;
    uint32_t hv;

    hv = hash(ITEM_key(item), item->nkey, 0);
    item_lock(hv);
    ret = do_store_item(item, comm, c, hv);
    item_unlock(hv);
    return ret;
}

/*
 * Flushes expired items after a flush_all call
 */
void item_flush_expired() {
    mutex_lock(&cache_lock);
    do_item_flush_expired();
    pthread_mutex_unlock(&cache_lock);
}

/*
 * Dumps part of the cache
 */
char *item_cachedump(unsigned int slabs_clsid, unsigned int limit, unsigned int *bytes) {
    char *ret;

    mutex_lock(&cache_lock);
    ret = do_item_cachedump(slabs_clsid, limit, bytes);
    pthread_mutex_unlock(&cache_lock);
    return ret;
}

/*
 * Dumps statistics about slab classes
 */
void item_stats(ADD_STAT add_stats, void *c) {
    mutex_lock(&cache_lock);
    do_item_stats(add_stats, c);
    pthread_mutex_unlock(&cache_lock);
}

/*
 * Dumps a list of objects of each size in 32-byte increments
 */
void item_stats_sizes(ADD_STAT add_stats, void *c) {
    mutex_lock(&cache_lock);
    do_item_stats_sizes(add_stats, c);
    pthread_mutex_unlock(&cache_lock);
}

/******************************* GLOBAL STATS ******************************/

void STATS_LOCK() {
    pthread_mutex_lock(&stats_lock);
}

void STATS_UNLOCK() {
    pthread_mutex_unlock(&stats_lock);
}

void threadlocal_stats_reset(void) {
    int ii, sid;
    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);

        threads[ii].stats.get_cmds = 0;
        threads[ii].stats.get_misses = 0;
        threads[ii].stats.touch_cmds = 0;
        threads[ii].stats.touch_misses = 0;
        threads[ii].stats.delete_misses = 0;
        threads[ii].stats.incr_misses = 0;
        threads[ii].stats.decr_misses = 0;
        threads[ii].stats.cas_misses = 0;
        threads[ii].stats.bytes_read = 0;
        threads[ii].stats.bytes_written = 0;
        threads[ii].stats.flush_cmds = 0;
        threads[ii].stats.conn_yields = 0;
        threads[ii].stats.auth_cmds = 0;
        threads[ii].stats.auth_errors = 0;

        for(sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            threads[ii].stats.slab_stats[sid].set_cmds = 0;
            threads[ii].stats.slab_stats[sid].get_hits = 0;
            threads[ii].stats.slab_stats[sid].touch_hits = 0;
            threads[ii].stats.slab_stats[sid].delete_hits = 0;
            threads[ii].stats.slab_stats[sid].incr_hits = 0;
            threads[ii].stats.slab_stats[sid].decr_hits = 0;
            threads[ii].stats.slab_stats[sid].cas_hits = 0;
            threads[ii].stats.slab_stats[sid].cas_badval = 0;
        }

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void threadlocal_stats_aggregate(struct thread_stats *stats) {
    int ii, sid;

    /* The struct has a mutex, but we can safely set the whole thing
     * to zero since it is unused when aggregating. */
    memset(stats, 0, sizeof(*stats));

    for (ii = 0; ii < settings.num_threads; ++ii) {
        pthread_mutex_lock(&threads[ii].stats.mutex);

        stats->get_cmds += threads[ii].stats.get_cmds;
        stats->get_misses += threads[ii].stats.get_misses;
        stats->touch_cmds += threads[ii].stats.touch_cmds;
        stats->touch_misses += threads[ii].stats.touch_misses;
        stats->delete_misses += threads[ii].stats.delete_misses;
        stats->decr_misses += threads[ii].stats.decr_misses;
        stats->incr_misses += threads[ii].stats.incr_misses;
        stats->cas_misses += threads[ii].stats.cas_misses;
        stats->bytes_read += threads[ii].stats.bytes_read;
        stats->bytes_written += threads[ii].stats.bytes_written;
        stats->flush_cmds += threads[ii].stats.flush_cmds;
        stats->conn_yields += threads[ii].stats.conn_yields;
        stats->auth_cmds += threads[ii].stats.auth_cmds;
        stats->auth_errors += threads[ii].stats.auth_errors;

        for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
            stats->slab_stats[sid].set_cmds +=
                threads[ii].stats.slab_stats[sid].set_cmds;
            stats->slab_stats[sid].get_hits +=
                threads[ii].stats.slab_stats[sid].get_hits;
            stats->slab_stats[sid].touch_hits +=
                threads[ii].stats.slab_stats[sid].touch_hits;
            stats->slab_stats[sid].delete_hits +=
                threads[ii].stats.slab_stats[sid].delete_hits;
            stats->slab_stats[sid].decr_hits +=
                threads[ii].stats.slab_stats[sid].decr_hits;
            stats->slab_stats[sid].incr_hits +=
                threads[ii].stats.slab_stats[sid].incr_hits;
            stats->slab_stats[sid].cas_hits +=
                threads[ii].stats.slab_stats[sid].cas_hits;
            stats->slab_stats[sid].cas_badval +=
                threads[ii].stats.slab_stats[sid].cas_badval;
        }

        pthread_mutex_unlock(&threads[ii].stats.mutex);
    }
}

void slab_stats_aggregate(struct thread_stats *stats, struct slab_stats *out) {
    int sid;

    out->set_cmds = 0;
    out->get_hits = 0;
    out->touch_hits = 0;
    out->delete_hits = 0;
    out->incr_hits = 0;
    out->decr_hits = 0;
    out->cas_hits = 0;
    out->cas_badval = 0;

    for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
        out->set_cmds += stats->slab_stats[sid].set_cmds;
        out->get_hits += stats->slab_stats[sid].get_hits;
        out->touch_hits += stats->slab_stats[sid].touch_hits;
        out->delete_hits += stats->slab_stats[sid].delete_hits;
        out->decr_hits += stats->slab_stats[sid].decr_hits;
        out->incr_hits += stats->slab_stats[sid].incr_hits;
        out->cas_hits += stats->slab_stats[sid].cas_hits;
        out->cas_badval += stats->slab_stats[sid].cas_badval;
    }
}

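/*
 * Illustrative note (not part of the original source): the "stats" command
 * path in memcached.c combines these two aggregators roughly like this
 * (local variable names are made up):
 *
 *     struct thread_stats thread_stats;
 *     struct slab_stats slab_stats;
 *     threadlocal_stats_aggregate(&thread_stats);
 *     slab_stats_aggregate(&thread_stats, &slab_stats);
 */
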
#ifndef __INTEL_COMPILER
#pragma GCC diagnostic ignored "-Wsign-compare"
#endif

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int         i;
    int         power;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }

    item_lock_count = ((unsigned long int)1 << (power));
    item_lock_mask  = item_lock_count - 1;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}
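
/*
 * Illustrative note (not part of the original source): the lock-table sizing
 * above works out as follows -- fewer than 3 worker threads get 2^10 = 1024
 * item-lock buckets, fewer than 4 get 2048, fewer than 5 get 4096, and 5 or
 * more get 8192.  thread_init() then blocks on init_cond until every worker
 * has bumped init_count in worker_libevent(), so the function only returns
 * once all workers are running their event loops.
 */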