/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
 * Thread management for memcached.
 */
18 #define ITEMS_PER_ALLOC 64
20 /* An item in the connection queue. */
21 typedef struct conn_queue_item CQ_ITEM
;
22 struct conn_queue_item
{
24 enum conn_states init_state
;
27 enum network_transport transport
;
31 /* A connection queue. */
32 typedef struct conn_queue CQ
;
40 /* Lock for cache operations (item_*, assoc_*) */
41 pthread_mutex_t cache_lock
;
43 /* Connection lock around accepting new connections */
44 pthread_mutex_t conn_lock
= PTHREAD_MUTEX_INITIALIZER
;
46 #if !defined(HAVE_GCC_ATOMICS) && !defined(__sun)
47 pthread_mutex_t atomics_mutex
= PTHREAD_MUTEX_INITIALIZER
;
50 /* Lock for global stats */
51 static pthread_mutex_t stats_lock
;
53 /* Free list of CQ_ITEM structs */
54 static CQ_ITEM
*cqi_freelist
;
55 static pthread_mutex_t cqi_freelist_lock
;
57 static pthread_mutex_t
*item_locks
;
58 /* size of the item lock hash table */
59 static uint32_t item_lock_count
;
60 /* size - 1 for lookup masking */
61 static uint32_t item_lock_mask
;
63 static LIBEVENT_DISPATCHER_THREAD dispatcher_thread
;
66 * Each libevent instance has a wakeup pipe, which other threads
67 * can use to signal that they've put a new connection on its queue.
69 static LIBEVENT_THREAD
*threads
;
72 * Number of worker threads that have finished setting themselves up.
74 static int init_count
= 0;
75 static pthread_mutex_t init_lock
;
76 static pthread_cond_t init_cond
;
79 static void thread_libevent_process(int fd
, short which
, void *arg
);
81 unsigned short refcount_incr(unsigned short *refcount
) {
82 #ifdef HAVE_GCC_ATOMICS
83 return __sync_add_and_fetch(refcount
, 1);
85 return atomic_inc_ushort_nv(refcount
);
88 mutex_lock(&atomics_mutex
);
91 mutex_unlock(&atomics_mutex
);
96 unsigned short refcount_decr(unsigned short *refcount
) {
97 #ifdef HAVE_GCC_ATOMICS
98 return __sync_sub_and_fetch(refcount
, 1);
100 return atomic_dec_ushort_nv(refcount
);
103 mutex_lock(&atomics_mutex
);
106 mutex_unlock(&atomics_mutex
);
111 void item_lock(uint32_t hv
) {
112 mutex_lock(&item_locks
[hv
& item_lock_mask
]);
115 void item_unlock(uint32_t hv
) {
116 mutex_unlock(&item_locks
[hv
& item_lock_mask
]);
120 * Initializes a connection queue.
122 static void cq_init(CQ
*cq
) {
123 pthread_mutex_init(&cq
->lock
, NULL
);
124 pthread_cond_init(&cq
->cond
, NULL
);
130 * Looks for an item on a connection queue, but doesn't block if there isn't
132 * Returns the item, or NULL if no item is available
134 static CQ_ITEM
*cq_pop(CQ
*cq
) {
137 pthread_mutex_lock(&cq
->lock
);
140 cq
->head
= item
->next
;
141 if (NULL
== cq
->head
)
144 pthread_mutex_unlock(&cq
->lock
);
150 * Adds an item to a connection queue.
152 static void cq_push(CQ
*cq
, CQ_ITEM
*item
) {
155 pthread_mutex_lock(&cq
->lock
);
156 if (NULL
== cq
->tail
)
159 cq
->tail
->next
= item
;
161 pthread_cond_signal(&cq
->cond
);
162 pthread_mutex_unlock(&cq
->lock
);
166 * Returns a fresh connection queue item.
168 static CQ_ITEM
*cqi_new(void) {
169 CQ_ITEM
*item
= NULL
;
170 pthread_mutex_lock(&cqi_freelist_lock
);
173 cqi_freelist
= item
->next
;
175 pthread_mutex_unlock(&cqi_freelist_lock
);
180 /* Allocate a bunch of items at once to reduce fragmentation */
181 item
= malloc(sizeof(CQ_ITEM
) * ITEMS_PER_ALLOC
);
186 * Link together all the new items except the first one
187 * (which we'll return to the caller) for placement on
190 for (i
= 2; i
< ITEMS_PER_ALLOC
; i
++)
191 item
[i
- 1].next
= &item
[i
];
193 pthread_mutex_lock(&cqi_freelist_lock
);
194 item
[ITEMS_PER_ALLOC
- 1].next
= cqi_freelist
;
195 cqi_freelist
= &item
[1];
196 pthread_mutex_unlock(&cqi_freelist_lock
);
204 * Frees a connection queue item (adds it to the freelist.)
206 static void cqi_free(CQ_ITEM
*item
) {
207 pthread_mutex_lock(&cqi_freelist_lock
);
208 item
->next
= cqi_freelist
;
210 pthread_mutex_unlock(&cqi_freelist_lock
);
/*
 * Creates a worker thread running func(arg). Exits the process on failure,
 * since the server cannot run without its workers.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}
232 * Sets whether or not we accept new connections.
234 void accept_new_conns(const bool do_accept
) {
235 pthread_mutex_lock(&conn_lock
);
236 do_accept_new_conns(do_accept
);
237 pthread_mutex_unlock(&conn_lock
);
/****************************** LIBEVENT THREADS *****************************/
242 * Set up a thread's information.
244 static void setup_thread(LIBEVENT_THREAD
*me
) {
245 me
->base
= event_init();
247 fprintf(stderr
, "Can't allocate event base\n");
251 /* Listen for notifications from other threads */
252 event_set(&me
->notify_event
, me
->notify_receive_fd
,
253 EV_READ
| EV_PERSIST
, thread_libevent_process
, me
);
254 event_base_set(me
->base
, &me
->notify_event
);
256 if (event_add(&me
->notify_event
, 0) == -1) {
257 fprintf(stderr
, "Can't monitor libevent notify pipe\n");
261 me
->new_conn_queue
= malloc(sizeof(struct conn_queue
));
262 if (me
->new_conn_queue
== NULL
) {
263 perror("Failed to allocate memory for connection queue");
266 cq_init(me
->new_conn_queue
);
268 if (pthread_mutex_init(&me
->stats
.mutex
, NULL
) != 0) {
269 perror("Failed to initialize mutex");
273 me
->suffix_cache
= cache_create("suffix", SUFFIX_SIZE
, sizeof(char*),
275 if (me
->suffix_cache
== NULL
) {
276 fprintf(stderr
, "Failed to create suffix cache\n");
283 * Worker thread: main event loop
285 static void *worker_libevent(void *arg
) {
286 LIBEVENT_THREAD
*me
= arg
;
288 /* Any per-thread setup can happen here; thread_init() will block until
289 * all threads have finished initializing.
292 pthread_mutex_lock(&init_lock
);
294 pthread_cond_signal(&init_cond
);
295 pthread_mutex_unlock(&init_lock
);
297 event_base_loop(me
->base
, 0);
303 * Processes an incoming "handle a new connection" item. This is called when
304 * input arrives on the libevent wakeup pipe.
306 static void thread_libevent_process(int fd
, short which
, void *arg
) {
307 LIBEVENT_THREAD
*me
= arg
;
311 if (read(fd
, buf
, 1) != 1)
312 if (settings
.verbose
> 0)
313 fprintf(stderr
, "Can't read from libevent pipe\n");
315 item
= cq_pop(me
->new_conn_queue
);
318 conn
*c
= conn_new(item
->sfd
, item
->init_state
, item
->event_flags
,
319 item
->read_buffer_size
, item
->transport
, me
->base
);
321 if (IS_UDP(item
->transport
)) {
322 fprintf(stderr
, "Can't listen for events on UDP socket\n");
325 if (settings
.verbose
> 0) {
326 fprintf(stderr
, "Can't listen for events on fd %d\n",
338 /* Which thread we assigned a connection to most recently. */
339 static int last_thread
= -1;
342 * Dispatches a new connection to another thread. This is only ever called
343 * from the main thread, either during initialization (for UDP) or because
344 * of an incoming connection.
346 void dispatch_conn_new(int sfd
, enum conn_states init_state
, int event_flags
,
347 int read_buffer_size
, enum network_transport transport
) {
348 CQ_ITEM
*item
= cqi_new();
349 int tid
= (last_thread
+ 1) % settings
.num_threads
;
351 LIBEVENT_THREAD
*thread
= threads
+ tid
;
356 item
->init_state
= init_state
;
357 item
->event_flags
= event_flags
;
358 item
->read_buffer_size
= read_buffer_size
;
359 item
->transport
= transport
;
361 cq_push(thread
->new_conn_queue
, item
);
363 MEMCACHED_CONN_DISPATCH(sfd
, thread
->thread_id
);
364 if (write(thread
->notify_send_fd
, "", 1) != 1) {
365 perror("Writing to thread notify pipe");
370 * Returns true if this is the thread that listens for new TCP connections.
372 int is_listen_thread() {
373 return pthread_self() == dispatcher_thread
.thread_id
;
376 /********************************* ITEM ACCESS *******************************/
379 * Allocates a new item.
381 item
*item_alloc(char *key
, size_t nkey
, int flags
, rel_time_t exptime
, int nbytes
) {
383 /* do_item_alloc handles its own locks */
384 it
= do_item_alloc(key
, nkey
, flags
, exptime
, nbytes
);
389 * Returns an item if it hasn't been marked as expired,
390 * lazy-expiring as needed.
392 item
*item_get(const char *key
, const size_t nkey
) {
395 hv
= hash(key
, nkey
, 0);
397 it
= do_item_get(key
, nkey
, hv
);
402 item
*item_touch(const char *key
, size_t nkey
, uint32_t exptime
) {
405 hv
= hash(key
, nkey
, 0);
407 it
= do_item_touch(key
, nkey
, exptime
, hv
);
413 * Links an item into the LRU and hashtable.
415 int item_link(item
*item
) {
419 hv
= hash(ITEM_key(item
), item
->nkey
, 0);
421 ret
= do_item_link(item
, hv
);
427 * Decrements the reference count on an item and adds it to the freelist if
430 void item_remove(item
*item
) {
432 hv
= hash(ITEM_key(item
), item
->nkey
, 0);
435 do_item_remove(item
);
440 * Replaces one item with another in the hashtable.
441 * Unprotected by a mutex lock since the core server does not require
442 * it to be thread-safe.
444 int item_replace(item
*old_it
, item
*new_it
, const uint32_t hv
) {
445 return do_item_replace(old_it
, new_it
, hv
);
449 * Unlinks an item from the LRU and hashtable.
451 void item_unlink(item
*item
) {
453 hv
= hash(ITEM_key(item
), item
->nkey
, 0);
455 do_item_unlink(item
, hv
);
460 * Moves an item to the back of the LRU queue.
462 void item_update(item
*item
) {
464 hv
= hash(ITEM_key(item
), item
->nkey
, 0);
467 do_item_update(item
);
472 * Does arithmetic on a numeric item value.
474 enum delta_result_type
add_delta(conn
*c
, const char *key
,
475 const size_t nkey
, int incr
,
476 const int64_t delta
, char *buf
,
478 enum delta_result_type ret
;
481 hv
= hash(key
, nkey
, 0);
483 ret
= do_add_delta(c
, key
, nkey
, incr
, delta
, buf
, cas
, hv
);
489 * Stores an item in the cache (high level, obeys set/add/replace semantics)
491 enum store_item_type
store_item(item
*item
, int comm
, conn
* c
) {
492 enum store_item_type ret
;
495 hv
= hash(ITEM_key(item
), item
->nkey
, 0);
497 ret
= do_store_item(item
, comm
, c
, hv
);
503 * Flushes expired items after a flush_all call
505 void item_flush_expired() {
506 mutex_lock(&cache_lock
);
507 do_item_flush_expired();
508 mutex_unlock(&cache_lock
);
512 * Dumps part of the cache
514 char *item_cachedump(unsigned int slabs_clsid
, unsigned int limit
, unsigned int *bytes
) {
517 mutex_lock(&cache_lock
);
518 ret
= do_item_cachedump(slabs_clsid
, limit
, bytes
);
519 mutex_unlock(&cache_lock
);
524 * Dumps statistics about slab classes
526 void item_stats(ADD_STAT add_stats
, void *c
) {
527 mutex_lock(&cache_lock
);
528 do_item_stats(add_stats
, c
);
529 mutex_unlock(&cache_lock
);
533 * Dumps a list of objects of each size in 32-byte increments
535 void item_stats_sizes(ADD_STAT add_stats
, void *c
) {
536 mutex_lock(&cache_lock
);
537 do_item_stats_sizes(add_stats
, c
);
538 mutex_unlock(&cache_lock
);
541 /******************************* GLOBAL STATS ******************************/
544 pthread_mutex_lock(&stats_lock
);
547 void STATS_UNLOCK() {
548 pthread_mutex_unlock(&stats_lock
);
551 void threadlocal_stats_reset(void) {
553 for (ii
= 0; ii
< settings
.num_threads
; ++ii
) {
554 pthread_mutex_lock(&threads
[ii
].stats
.mutex
);
556 threads
[ii
].stats
.get_cmds
= 0;
557 threads
[ii
].stats
.get_misses
= 0;
558 threads
[ii
].stats
.touch_cmds
= 0;
559 threads
[ii
].stats
.touch_misses
= 0;
560 threads
[ii
].stats
.delete_misses
= 0;
561 threads
[ii
].stats
.incr_misses
= 0;
562 threads
[ii
].stats
.decr_misses
= 0;
563 threads
[ii
].stats
.cas_misses
= 0;
564 threads
[ii
].stats
.bytes_read
= 0;
565 threads
[ii
].stats
.bytes_written
= 0;
566 threads
[ii
].stats
.flush_cmds
= 0;
567 threads
[ii
].stats
.conn_yields
= 0;
568 threads
[ii
].stats
.auth_cmds
= 0;
569 threads
[ii
].stats
.auth_errors
= 0;
571 for(sid
= 0; sid
< MAX_NUMBER_OF_SLAB_CLASSES
; sid
++) {
572 threads
[ii
].stats
.slab_stats
[sid
].set_cmds
= 0;
573 threads
[ii
].stats
.slab_stats
[sid
].get_hits
= 0;
574 threads
[ii
].stats
.slab_stats
[sid
].touch_hits
= 0;
575 threads
[ii
].stats
.slab_stats
[sid
].delete_hits
= 0;
576 threads
[ii
].stats
.slab_stats
[sid
].incr_hits
= 0;
577 threads
[ii
].stats
.slab_stats
[sid
].decr_hits
= 0;
578 threads
[ii
].stats
.slab_stats
[sid
].cas_hits
= 0;
579 threads
[ii
].stats
.slab_stats
[sid
].cas_badval
= 0;
582 pthread_mutex_unlock(&threads
[ii
].stats
.mutex
);
586 void threadlocal_stats_aggregate(struct thread_stats
*stats
) {
589 /* The struct has a mutex, but we can safely set the whole thing
590 * to zero since it is unused when aggregating. */
591 memset(stats
, 0, sizeof(*stats
));
593 for (ii
= 0; ii
< settings
.num_threads
; ++ii
) {
594 pthread_mutex_lock(&threads
[ii
].stats
.mutex
);
596 stats
->get_cmds
+= threads
[ii
].stats
.get_cmds
;
597 stats
->get_misses
+= threads
[ii
].stats
.get_misses
;
598 stats
->touch_cmds
+= threads
[ii
].stats
.touch_cmds
;
599 stats
->touch_misses
+= threads
[ii
].stats
.touch_misses
;
600 stats
->delete_misses
+= threads
[ii
].stats
.delete_misses
;
601 stats
->decr_misses
+= threads
[ii
].stats
.decr_misses
;
602 stats
->incr_misses
+= threads
[ii
].stats
.incr_misses
;
603 stats
->cas_misses
+= threads
[ii
].stats
.cas_misses
;
604 stats
->bytes_read
+= threads
[ii
].stats
.bytes_read
;
605 stats
->bytes_written
+= threads
[ii
].stats
.bytes_written
;
606 stats
->flush_cmds
+= threads
[ii
].stats
.flush_cmds
;
607 stats
->conn_yields
+= threads
[ii
].stats
.conn_yields
;
608 stats
->auth_cmds
+= threads
[ii
].stats
.auth_cmds
;
609 stats
->auth_errors
+= threads
[ii
].stats
.auth_errors
;
611 for (sid
= 0; sid
< MAX_NUMBER_OF_SLAB_CLASSES
; sid
++) {
612 stats
->slab_stats
[sid
].set_cmds
+=
613 threads
[ii
].stats
.slab_stats
[sid
].set_cmds
;
614 stats
->slab_stats
[sid
].get_hits
+=
615 threads
[ii
].stats
.slab_stats
[sid
].get_hits
;
616 stats
->slab_stats
[sid
].touch_hits
+=
617 threads
[ii
].stats
.slab_stats
[sid
].touch_hits
;
618 stats
->slab_stats
[sid
].delete_hits
+=
619 threads
[ii
].stats
.slab_stats
[sid
].delete_hits
;
620 stats
->slab_stats
[sid
].decr_hits
+=
621 threads
[ii
].stats
.slab_stats
[sid
].decr_hits
;
622 stats
->slab_stats
[sid
].incr_hits
+=
623 threads
[ii
].stats
.slab_stats
[sid
].incr_hits
;
624 stats
->slab_stats
[sid
].cas_hits
+=
625 threads
[ii
].stats
.slab_stats
[sid
].cas_hits
;
626 stats
->slab_stats
[sid
].cas_badval
+=
627 threads
[ii
].stats
.slab_stats
[sid
].cas_badval
;
630 pthread_mutex_unlock(&threads
[ii
].stats
.mutex
);
634 void slab_stats_aggregate(struct thread_stats
*stats
, struct slab_stats
*out
) {
640 out
->delete_hits
= 0;
646 for (sid
= 0; sid
< MAX_NUMBER_OF_SLAB_CLASSES
; sid
++) {
647 out
->set_cmds
+= stats
->slab_stats
[sid
].set_cmds
;
648 out
->get_hits
+= stats
->slab_stats
[sid
].get_hits
;
649 out
->touch_hits
+= stats
->slab_stats
[sid
].touch_hits
;
650 out
->delete_hits
+= stats
->slab_stats
[sid
].delete_hits
;
651 out
->decr_hits
+= stats
->slab_stats
[sid
].decr_hits
;
652 out
->incr_hits
+= stats
->slab_stats
[sid
].incr_hits
;
653 out
->cas_hits
+= stats
->slab_stats
[sid
].cas_hits
;
654 out
->cas_badval
+= stats
->slab_stats
[sid
].cas_badval
;
659 * Initializes the thread subsystem, creating various worker threads.
661 * nthreads Number of worker event handler threads to spawn
662 * main_base Event base for main thread
664 void thread_init(int nthreads
, struct event_base
*main_base
) {
668 pthread_mutex_init(&cache_lock
, NULL
);
669 pthread_mutex_init(&stats_lock
, NULL
);
671 pthread_mutex_init(&init_lock
, NULL
);
672 pthread_cond_init(&init_cond
, NULL
);
674 pthread_mutex_init(&cqi_freelist_lock
, NULL
);
677 /* Want a wide lock table, but don't waste memory */
680 } else if (nthreads
< 4) {
682 } else if (nthreads
< 5) {
685 /* 8192 buckets, and central locks don't scale much past 5 threads */
689 item_lock_count
= ((unsigned long int)1 << (power
));
690 item_lock_mask
= item_lock_count
- 1;
692 item_locks
= calloc(item_lock_count
, sizeof(pthread_mutex_t
));
694 perror("Can't allocate item locks");
697 for (i
= 0; i
< item_lock_count
; i
++) {
698 pthread_mutex_init(&item_locks
[i
], NULL
);
701 threads
= calloc(nthreads
, sizeof(LIBEVENT_THREAD
));
703 perror("Can't allocate thread descriptors");
707 dispatcher_thread
.base
= main_base
;
708 dispatcher_thread
.thread_id
= pthread_self();
710 for (i
= 0; i
< nthreads
; i
++) {
713 perror("Can't create notify pipe");
717 threads
[i
].notify_receive_fd
= fds
[0];
718 threads
[i
].notify_send_fd
= fds
[1];
720 setup_thread(&threads
[i
]);
721 /* Reserve three fds for the libevent base, and two for the pipe */
722 stats
.reserved_fds
+= 5;
725 /* Create threads after we've done all the libevent setup. */
726 for (i
= 0; i
< nthreads
; i
++) {
727 create_worker(worker_libevent
, &threads
[i
]);
730 /* Wait for all the threads to set themselves up before returning. */
731 pthread_mutex_lock(&init_lock
);
732 while (init_count
< nthreads
) {
733 pthread_cond_wait(&init_cond
, &init_lock
);
735 pthread_mutex_unlock(&init_lock
);