/*
    +--------------------------------------------------------------------+
    | libmemcached - C/C++ Client Library for memcached                  |
    +--------------------------------------------------------------------+
    | Redistribution and use in source and binary forms, with or without |
    | modification, are permitted under the terms of the BSD license.    |
    | You should have received a copy of the license in a bundled file   |
    | named LICENSE; in case you did not receive a copy you can review   |
    | the terms online at: https://opensource.org/licenses/BSD-3-Clause  |
    +--------------------------------------------------------------------+
    | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/   |
    | Copyright (c) 2020 Michael Wallner <mike@php.net>                  |
    +--------------------------------------------------------------------+
*/
#include "mem_config.h"

#if defined(HAVE_SYS_TIME_H)
# include <sys/time.h>
#endif

#if defined(HAVE_TIME_H)
# include <time.h>
#endif

#include <stdlib.h>

#include "ms_thread.h"
#include "ms_setting.h"
#include "ms_atomic.h"
31 pthread_key_t ms_thread_key
;
33 /* array of thread context structure, each thread has a thread context structure */
34 static ms_thread_ctx_t
*ms_thread_ctx
;
37 static void ms_set_current_time(void);
38 static void ms_check_sock_timeout(void);
39 static void ms_clock_handler(const int fd
, const short which
, void *arg
);
40 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu
);
41 static int ms_setup_thread(ms_thread_ctx_t
*thread_ctx
);
42 static void *ms_worker_libevent(void *arg
);
43 static void ms_create_worker(void *(*func
)(void *), void *arg
);
46 * time-sensitive callers can call it by hand with this,
47 * outside the normal ever-1-second timer
49 static void ms_set_current_time() {
51 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
53 gettimeofday(&timer
, NULL
);
54 ms_thread
->curr_time
= (rel_time_t
) timer
.tv_sec
;
55 } /* ms_set_current_time */
/**
 * Used by the once-per-second timer to check whether UDP commands
 * have been waiting longer than the socket timeout.
 */
/*
 * Walks every connection owned by the calling thread and resets those whose
 * elapsed time since start_time exceeds SOCK_WAIT_TIMEOUT.
 *
 * NOTE(review): this view of the function looks incomplete -- the
 * declarations of `c` and `time_diff` and several closing braces are not
 * visible, so further conditions may guard the statements below. Confirm
 * against the complete file before changing any logic here.
 */
61 static void ms_check_sock_timeout(void) {
/* per-thread state stored under ms_thread_key */
62 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
/* visit conn[0 .. nconns-1] of this thread */
66 for (uint32_t i
= 0; i
< ms_thread
->thread_ctx
->nconns
; i
++) {
67 c
= &ms_thread
->conn
[i
];
/* whole seconds between the thread's cached clock and the connection's start_time */
70 time_diff
= (int) (ms_thread
->curr_time
- (rel_time_t
) c
->start_time
.tv_sec
);
/* the connection has waited longer than SOCK_WAIT_TIMEOUT */
73 if (time_diff
> SOCK_WAIT_TIMEOUT
) {
74 /* calculate dropped packets count */
/* packets minus recvpkt is added to the global pkt_drop statistic */
76 atomic_add_size(&ms_stats
.pkt_drop
, c
->packets
- c
->recvpkt
);
/* count one timeout, then reset the connection */
79 atomic_add_size(&ms_stats
.udp_timeout
, 1);
80 ms_reset_conn(c
, true);
84 } /* ms_check_sock_timeout */
86 /* if disconnect, the ever-1-second timer will call this function to reconnect */
87 static void ms_reconn_thread_socks(void) {
88 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
89 for (uint32_t i
= 0; i
< ms_thread
->thread_ctx
->nconns
; i
++) {
90 ms_reconn_socks(&ms_thread
->conn
[i
]);
92 } /* ms_reconn_thread_socks */
95 * the handler of the ever-1-second timer
97 * @param fd, the descriptors of the socket
98 * @param which, event flags
99 * @param arg, argument
101 static void ms_clock_handler(const int fd
, const short which
, void *arg
) {
102 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
103 struct timeval t
= {.tv_sec
= 1, .tv_usec
= 0};
106 UNUSED_ARGUMENT(which
);
107 UNUSED_ARGUMENT(arg
);
109 ms_set_current_time();
111 if (ms_thread
->initialized
) {
112 /* only delete the event if it's actually there. */
113 evtimer_del(&ms_thread
->clock_event
);
114 ms_check_sock_timeout();
116 ms_thread
->initialized
= true;
119 ms_reconn_thread_socks();
121 evtimer_set(&ms_thread
->clock_event
, ms_clock_handler
, 0);
122 event_base_set(ms_thread
->base
, &ms_thread
->clock_event
);
123 evtimer_add(&ms_thread
->clock_event
, &t
);
124 } /* ms_clock_handler */
/**
 * Bind the calling thread to one CPU if the system supports it.
 *
 * When HAVE_CPU_SET_T is not defined this is a no-op that reports success.
 *
 * @param cpu, cpu index
 *
 * @return EXIT_SUCCESS (0) on success, non-zero if the affinity could not
 *         be set (the return type is unsigned, so -1 cannot be reported)
 */
static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu) {
  uint32_t ret = EXIT_SUCCESS;

#ifdef HAVE_CPU_SET_T
  cpu_set_t cpu_set;

  /* start from an empty set so that only the requested CPU is selected */
  CPU_ZERO(&cpu_set);
  CPU_SET(cpu, &cpu_set);

  /* pid 0 selects the calling thread */
  if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1) {
    fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
    ret = 1;
  }
#else
  UNUSED_ARGUMENT(cpu);
#endif

  return ret;
} /* ms_set_thread_cpu_affinity */
153 * Set up a thread's information.
155 * @param thread_ctx, pointer of the thread context structure
157 * @return if success, return EXIT_SUCCESS, else return -1
159 static int ms_setup_thread(ms_thread_ctx_t
*thread_ctx
) {
160 ms_thread_t
*ms_thread
= (ms_thread_t
*) calloc(sizeof(*ms_thread
), 1);
161 pthread_setspecific(ms_thread_key
, (void *) ms_thread
);
163 ms_thread
->thread_ctx
= thread_ctx
;
164 ms_thread
->nactive_conn
= thread_ctx
->nconns
;
165 ms_thread
->initialized
= false;
166 static ATOMIC
uint32_t cnt
= 0;
168 gettimeofday(&ms_thread
->startup_time
, NULL
);
170 ms_thread
->base
= event_init();
171 if (ms_thread
->base
== NULL
) {
172 if (atomic_add_32_nv(&cnt
, 1) == 0) {
173 fprintf(stderr
, "Can't allocate event base.\n");
179 ms_thread
->conn
= (ms_conn_t
*) malloc((size_t) thread_ctx
->nconns
* sizeof(ms_conn_t
));
180 if (ms_thread
->conn
== NULL
) {
181 if (atomic_add_32_nv(&cnt
, 1) == 0) {
182 fprintf(stderr
, "Can't allocate concurrency structure for thread descriptors.");
187 memset(ms_thread
->conn
, 0, (size_t) thread_ctx
->nconns
* sizeof(ms_conn_t
));
189 for (uint32_t i
= 0; i
< thread_ctx
->nconns
; i
++) {
190 ms_thread
->conn
[i
].conn_idx
= i
;
191 if (ms_setup_conn(&ms_thread
->conn
[i
]) != 0) {
192 /* only output this error once */
193 if (atomic_add_32_nv(&cnt
, 1) == 0) {
194 fprintf(stderr
, "Initializing connection failed.\n");
202 } /* ms_setup_thread */
205 * Worker thread: main event loop
207 * @param arg, the pointer of argument
211 static void *ms_worker_libevent(void *arg
) {
212 ms_thread_t
*ms_thread
= NULL
;
213 ms_thread_ctx_t
*thread_ctx
= (ms_thread_ctx_t
*) arg
;
216 * If system has more than one cpu and supports set cpu
217 * affinity, try to bind each thread to a cpu core;
219 if (ms_setting
.ncpu
> 1) {
220 ms_set_thread_cpu_affinity(thread_ctx
->thd_idx
% ms_setting
.ncpu
);
223 if (ms_setup_thread(thread_ctx
) != 0) {
227 /* each thread with a timer */
228 ms_clock_handler(0, 0, 0);
230 pthread_mutex_lock(&ms_global
.init_lock
.lock
);
231 ms_global
.init_lock
.count
++;
232 pthread_cond_signal(&ms_global
.init_lock
.cond
);
233 pthread_mutex_unlock(&ms_global
.init_lock
.lock
);
235 ms_thread
= pthread_getspecific(ms_thread_key
);
236 event_base_loop(ms_thread
->base
, 0);
239 } /* ms_worker_libevent */
/**
 * Creates a worker thread.
 *
 * Terminates the process if the thread cannot be created.
 *
 * @param func, the callback function
 * @param arg, the argument to pass to the callback function
 */
static void ms_create_worker(void *(*func)(void *), void *arg) {
  pthread_t thread;
  pthread_attr_t attr;
  int ret;

  if ((ret = pthread_attr_init(&attr)) != 0) {
    fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
    exit(1);
  }

  ret = pthread_create(&thread, &attr, func, arg);

  /* the attribute object is no longer needed once the thread is created */
  pthread_attr_destroy(&attr);

  if (ret != 0) {
    fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
    exit(1);
  }
} /* ms_create_worker */
260 /* initialize threads */
261 void ms_thread_init() {
263 (ms_thread_ctx_t
*) malloc(sizeof(ms_thread_ctx_t
) * (size_t) ms_setting
.nthreads
);
264 if (ms_thread_ctx
== NULL
) {
265 fprintf(stderr
, "Can't allocate thread descriptors.");
269 for (uint32_t i
= 0; i
< ms_setting
.nthreads
; i
++) {
270 ms_thread_ctx
[i
].thd_idx
= i
;
271 ms_thread_ctx
[i
].nconns
= ms_setting
.nconns
/ ms_setting
.nthreads
;
274 * If only one server, all the connections in all threads
275 * connects the same server. For support multi-servers, simple
276 * distribute thread to server.
278 ms_thread_ctx
[i
].srv_idx
= i
% ms_setting
.srv_cnt
;
279 ms_thread_ctx
[i
].tps_perconn
= ms_setting
.expected_tps
/ (int) ms_setting
.nconns
;
280 ms_thread_ctx
[i
].exec_num_perconn
= ms_setting
.exec_num
/ ms_setting
.nconns
;
283 if (pthread_key_create(&ms_thread_key
, NULL
)) {
284 fprintf(stderr
, "Can't create pthread keys. Major malfunction!\n");
287 /* Create threads after we've done all the epoll setup. */
288 for (uint32_t i
= 0; i
< ms_setting
.nthreads
; i
++) {
289 ms_create_worker(ms_worker_libevent
, (void *) &ms_thread_ctx
[i
]);
291 } /* ms_thread_init */
293 /* cleanup some resource of threads when all the threads exit */
294 void ms_thread_cleanup() {
295 if (ms_thread_ctx
!= NULL
) {
298 pthread_key_delete(ms_thread_key
);
299 } /* ms_thread_cleanup */