2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
16 #include "mem_config.h"
18 #if defined(HAVE_SYS_TIME_H)
19 # include <sys/time.h>
23 #include "ms_thread.h"
24 #include "ms_setting.h"
25 #include "ms_atomic.h"
28 pthread_key_t ms_thread_key
;
30 /* array of thread context structure, each thread has a thread context structure */
31 static ms_thread_ctx_t
*ms_thread_ctx
;
34 static void ms_set_current_time(void);
35 static void ms_check_sock_timeout(void);
36 static void ms_clock_handler(const int fd
, const short which
, void *arg
);
37 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu
);
38 static int ms_setup_thread(ms_thread_ctx_t
*thread_ctx
);
39 static void *ms_worker_libevent(void *arg
);
40 static void ms_create_worker(void *(*func
)(void *), ms_thread_ctx_t
*arg
);
43 * time-sensitive callers can call this by hand,
44 * outside the normal once-per-second timer
/*
 * Stores the current wall-clock second, as reported by gettimeofday(), in
 * the curr_time field of the calling thread's ms_thread_t. The ms_thread_t
 * is looked up through ms_thread_key.
 *
 * NOTE(review): this listing is damaged. Each line carries an embedded
 * numeric prefix and the embedded numbering skips 47 and 49, so the
 * declaration of `timer` is not visible here (presumably a local
 * `struct timeval`; confirm against the pristine file).
 */
46 static void ms_set_current_time() {
/* per-thread state stored under ms_thread_key */
48 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
50 gettimeofday(&timer
, NULL
);
/* only the seconds part is kept, cast to rel_time_t */
51 ms_thread
->curr_time
= (rel_time_t
) timer
.tv_sec
;
52 } /* ms_set_current_time */
55 * used to check whether UDP commands have timed out while waiting;
56 * called by the once-per-second timer
/*
 * Walks the connections of the calling thread. For a connection whose
 * start_time lies more than SOCK_WAIT_TIMEOUT seconds behind the thread's
 * curr_time, the visible code adds (packets - recvpkt) to ms_stats.pkt_drop,
 * adds 1 to ms_stats.udp_timeout and calls ms_reset_conn(c, true).
 *
 * NOTE(review): this listing is damaged. The embedded numbering skips
 * 60-62, 65-66, 68-69, 72, 74-75 and 78-80, so the declarations of `c` and
 * `time_diff`, every closing brace except the last, and possibly additional
 * guard conditions around these statements are not visible. Do not rely on
 * this text for the exact conditions; confirm against the pristine file.
 */
58 static void ms_check_sock_timeout(void) {
59 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
/* one pass over this thread's connection array */
63 for (uint32_t i
= 0; i
< ms_thread
->thread_ctx
->nconns
; i
++) {
64 c
= &ms_thread
->conn
[i
];
/* elapsed whole seconds since the connection's start_time */
67 time_diff
= (int) (ms_thread
->curr_time
- (rel_time_t
) c
->start_time
.tv_sec
);
70 if (time_diff
> SOCK_WAIT_TIMEOUT
) {
71 /* calculate dropped packets count */
73 atomic_add_size(&ms_stats
.pkt_drop
, c
->packets
- c
->recvpkt
);
/* count the timeout, then reset the connection */
76 atomic_add_size(&ms_stats
.udp_timeout
, 1);
77 ms_reset_conn(c
, true);
81 } /* ms_check_sock_timeout */
83 /* if disconnect, the ever-1-second timer will call this function to reconnect */
84 static void ms_reconn_thread_socks(void) {
85 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
86 for (uint32_t i
= 0; i
< ms_thread
->thread_ctx
->nconns
; i
++) {
87 ms_reconn_socks(&ms_thread
->conn
[i
]);
89 } /* ms_reconn_thread_socks */
92 * the handler of the once-per-second timer
94 * @param fd, the descriptor of the socket
95 * @param which, event flags
96 * @param arg, argument
/*
 * Timer callback for the calling thread. It refreshes curr_time, runs the
 * reconnect pass, and re-arms clock_event on the thread's event base with a
 * one-second timeout so that it fires again. When the thread's `initialized`
 * flag is already set, the pending clock_event is deleted first and
 * ms_check_sock_timeout() is run.
 *
 * NOTE(review): this listing is damaged. The embedded numbering skips
 * 101-102, 112 and 114-115, so the code between the `if` body and the
 * `initialized = true` assignment is not visible (presumably an else
 * branch; confirm against the pristine file). No use of `fd` is visible.
 */
98 static void ms_clock_handler(const int fd
, const short which
, void *arg
) {
99 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
/* timeout used when the timer is re-armed below: 1 s, 0 us */
100 struct timeval t
= {.tv_sec
= 1, .tv_usec
= 0};
103 UNUSED_ARGUMENT(which
);
104 UNUSED_ARGUMENT(arg
);
106 ms_set_current_time();
108 if (ms_thread
->initialized
) {
109 /* only delete the event if it's actually there. */
110 evtimer_del(&ms_thread
->clock_event
);
111 ms_check_sock_timeout();
113 ms_thread
->initialized
= true;
116 ms_reconn_thread_socks();
/* register this same handler again, bind it to the thread's base, arm it */
118 evtimer_set(&ms_thread
->clock_event
, ms_clock_handler
, 0);
119 event_base_set(ms_thread
->base
, &ms_thread
->clock_event
);
120 evtimer_add(&ms_thread
->clock_event
, &t
);
121 } /* ms_clock_handler */
124 * used to bind the thread to a CPU if the system supports it
126 * @param cpu, cpu index
128 * @return if success, return EXIT_SUCCESS, else return -1
/*
 * When HAVE_CPU_SET_T is defined, adds `cpu` to a cpu set and passes that
 * set to sched_setaffinity() with pid 0. If the call returns -1, a warning
 * is printed to stderr. The UNUSED_ARGUMENT(cpu) line presumably belongs to
 * the branch taken when HAVE_CPU_SET_T is not defined.
 *
 * NOTE(review): this listing is damaged. The embedded numbering skips
 * 131-132, 134-135, 137, 140-142 and 144-146, so the declaration and
 * clearing of `cpu_set`, the preprocessor else/endif lines and the return
 * statement are not visible. The return value cannot be confirmed from this
 * text; confirm against the pristine file.
 */
130 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu
) {
133 #ifdef HAVE_CPU_SET_T
136 CPU_SET(cpu
, &cpu_set
);
138 if (sched_setaffinity(0, sizeof(cpu_set_t
), &cpu_set
) == -1) {
/* failure is only reported; the message itself says execution continues */
139 fprintf(stderr
, "WARNING: Could not set CPU Affinity, continuing...\n");
143 UNUSED_ARGUMENT(cpu
);
147 } /* ms_set_thread_cpu_affinity */
150 * Set up a thread's information.
152 * @param thread_ctx, pointer of the thread context structure
154 * @return if success, return EXIT_SUCCESS, else return -1
/*
 * Builds the per-thread state for the calling worker:
 *   - allocates a zeroed ms_thread_t and stores it under ms_thread_key,
 *   - records thread_ctx, sets nactive_conn to thread_ctx->nconns and clears
 *     `initialized`,
 *   - records the startup time and creates a libevent base,
 *   - allocates and zeroes an array of thread_ctx->nconns ms_conn_t entries,
 *   - sets conn_idx on every entry and calls ms_setup_conn() on it.
 *
 * NOTE(review): this listing is damaged. The embedded numbering skips 159,
 * 164, 166, 171-175, 180-183, 185 and 192-198, so the error-path exits, the
 * closing braces and the return statements are not visible; confirm against
 * the pristine file.
 * NOTE(review): no NULL check of the calloc() result is visible before
 * ms_thread is dereferenced.
 * NOTE(review): the messages are printed only when
 * atomic_add_32_nv(&cnt, 1) == 0. If that helper returns the value after
 * the addition, as the _nv suffix suggests, the comparison can never hold.
 * Verify against ms_atomic.h.
 */
156 static int ms_setup_thread(ms_thread_ctx_t
*thread_ctx
) {
/* calloc is called as (size, 1); the zeroed byte count is the same */
157 ms_thread_t
*ms_thread
= (ms_thread_t
*) calloc(sizeof(*ms_thread
), 1);
/* publish the state so later pthread_getspecific() calls find it */
158 pthread_setspecific(ms_thread_key
, (void *) ms_thread
);
160 ms_thread
->thread_ctx
= thread_ctx
;
161 ms_thread
->nactive_conn
= thread_ctx
->nconns
;
162 ms_thread
->initialized
= false;
/* static: a single counter shared by every call of this function */
163 static ATOMIC
uint32_t cnt
= 0;
165 gettimeofday(&ms_thread
->startup_time
, NULL
);
167 ms_thread
->base
= event_base_new();
168 if (ms_thread
->base
== NULL
) {
169 if (atomic_add_32_nv(&cnt
, 1) == 0) {
170 fprintf(stderr
, "Can't allocate event base.\n");
/* connection array: one ms_conn_t per connection of this thread */
176 ms_thread
->conn
= (ms_conn_t
*) malloc((size_t) thread_ctx
->nconns
* sizeof(ms_conn_t
));
177 if (ms_thread
->conn
== NULL
) {
178 if (atomic_add_32_nv(&cnt
, 1) == 0) {
/* NOTE(review): unlike the other messages, this one has no trailing newline */
179 fprintf(stderr
, "Can't allocate concurrency structure for thread descriptors.");
184 memset(ms_thread
->conn
, 0, (size_t) thread_ctx
->nconns
* sizeof(ms_conn_t
));
186 for (uint32_t i
= 0; i
< thread_ctx
->nconns
; i
++) {
187 ms_thread
->conn
[i
].conn_idx
= i
;
/* a non-zero result from ms_setup_conn() is treated as failure */
188 if (ms_setup_conn(&ms_thread
->conn
[i
])) {
189 /* only output this error once */
190 if (atomic_add_32_nv(&cnt
, 1) == 0) {
191 fprintf(stderr
, "Initializing connection failed.\n");
199 } /* ms_setup_thread */
202 * Worker thread: main event loop
204 * @param arg, pointer to the thread's ms_thread_ctx_t
/*
 * Thread entry point; `arg` is cast to ms_thread_ctx_t *. Visible steps:
 *   1. if ms_setting.ncpu > 1, request affinity to CPU (thd_idx % ncpu),
 *   2. call ms_setup_thread(),
 *   3. call ms_clock_handler(0, 0, 0) once by hand to start the timer,
 *   4. under ms_global.init_lock.lock, increment init_lock.count and signal
 *      init_lock.cond,
 *   5. run event_base_loop() on the thread's base and free the base once
 *      the loop returns.
 *
 * NOTE(review): this listing is damaged. The embedded numbering skips
 * 211-212, 215, 218-219, 221-223, 226, 231 and 235-237, so the handling of
 * a failing ms_setup_thread() and the return statement are not visible;
 * confirm against the pristine file.
 */
208 static void *ms_worker_libevent(void *arg
) {
209 ms_thread_t
*ms_thread
= NULL
;
210 ms_thread_ctx_t
*thread_ctx
= (ms_thread_ctx_t
*) arg
;
213 * If system has more than one cpu and supports set cpu
214 * affinity, try to bind each thread to a cpu core;
216 if (ms_setting
.ncpu
> 1) {
/* the return value of the affinity call is not used here */
217 ms_set_thread_cpu_affinity(thread_ctx
->thd_idx
% ms_setting
.ncpu
);
/* a non-zero result from ms_setup_thread() is treated as failure */
220 if (ms_setup_thread(thread_ctx
)) {
224 /* each thread with a timer */
225 ms_clock_handler(0, 0, 0);
/* bump the shared init counter under its mutex and signal the condition */
227 pthread_mutex_lock(&ms_global
.init_lock
.lock
);
228 ms_global
.init_lock
.count
++;
229 pthread_cond_signal(&ms_global
.init_lock
.cond
);
230 pthread_mutex_unlock(&ms_global
.init_lock
.lock
);
/* fetch the state that ms_setup_thread() stored under ms_thread_key */
232 ms_thread
= pthread_getspecific(ms_thread_key
);
233 event_base_loop(ms_thread
->base
, 0);
234 event_base_free(ms_thread
->base
);
238 } /* ms_worker_libevent */
241 * Creates a worker thread.
243 * @param func, the callback function
244 * @param arg, the argument to pass to the callback function
/*
 * Initializes a pthread attribute object and starts a thread that runs
 * func(arg). The new thread's id is written to arg->pth_id. A non-zero
 * result from pthread_create() is reported on stderr via strerror().
 *
 * NOTE(review): this listing is damaged. The embedded numbering skips
 * 247-249, 251 and 254-255, so the declarations of `attr` and `ret` and
 * whatever follows the error message are not visible; confirm against the
 * pristine file.
 */
246 static void ms_create_worker(void *(*func
)(void *), ms_thread_ctx_t
*arg
) {
250 pthread_attr_init(&attr
);
/* pthread_create() returns an error number directly; keep it for strerror() */
252 if ((ret
= pthread_create(&arg
->pth_id
, &attr
, func
, arg
))) {
253 fprintf(stderr
, "Can't create thread: %s.\n", strerror(ret
));
256 } /* ms_create_worker */
/*
 * Allocates one ms_thread_ctx_t per configured thread, fills each entry
 * from ms_setting, creates the thread-specific-data key ms_thread_key and
 * then starts one worker running ms_worker_libevent per entry.
 *
 * NOTE(review): this listing is damaged. The embedded numbering skips 260,
 * 264-266, 270-271, 275, 279-280, 283-284 and 288, so the statement that
 * receives the malloc() result and the error exits are not visible. The
 * NULL test on ms_thread_ctx that follows the malloc() suggests the result
 * is assigned to it; confirm against the pristine file.
 * NOTE(review): ms_setting.nthreads, ms_setting.nconns and
 * ms_setting.srv_cnt are used as divisors below and a zero value would be a
 * division by zero. They are presumably validated elsewhere; verify.
 */
258 /* initialize threads */
259 void ms_thread_init() {
261 (ms_thread_ctx_t
*) malloc(sizeof(ms_thread_ctx_t
) * (size_t) ms_setting
.nthreads
);
262 if (ms_thread_ctx
== NULL
) {
/* NOTE(review): message has no trailing newline */
263 fprintf(stderr
, "Can't allocate thread descriptors.");
267 for (uint32_t i
= 0; i
< ms_setting
.nthreads
; i
++) {
268 ms_thread_ctx
[i
].thd_idx
= i
;
/* each thread gets an equal share of the configured connections */
269 ms_thread_ctx
[i
].nconns
= ms_setting
.nconns
/ ms_setting
.nthreads
;
272 * If only one server, all the connections in all threads
273 * connects the same server. For support multi-servers, simple
274 * distribute thread to server.
276 ms_thread_ctx
[i
].srv_idx
= i
% ms_setting
.srv_cnt
;
/* per-connection targets: the totals divided by the total connection count */
277 ms_thread_ctx
[i
].tps_perconn
= ms_setting
.expected_tps
/ (int) ms_setting
.nconns
;
278 ms_thread_ctx
[i
].exec_num_perconn
= ms_setting
.exec_num
/ ms_setting
.nconns
;
/* no destructor is registered for the key (second argument is NULL) */
281 if (pthread_key_create(&ms_thread_key
, NULL
)) {
282 fprintf(stderr
, "Can't create pthread keys. Major malfunction!\n");
285 /* Create threads after we've done all the epoll setup. */
286 for (uint32_t i
= 0; i
< ms_setting
.nthreads
; i
++) {
287 ms_create_worker(ms_worker_libevent
, &ms_thread_ctx
[i
]);
289 } /* ms_thread_init */
/*
 * Joins every worker thread recorded in ms_thread_ctx and then deletes the
 * thread-specific-data key ms_thread_key.
 *
 * NOTE(review): this listing is damaged. The embedded numbering skips
 * 295-298, so the loop's closing brace and any statements between the loop
 * and pthread_key_delete() are not visible. Whether ms_thread_ctx is
 * released there cannot be told from this text; confirm against the
 * pristine file.
 */
291 /* cleanup some resource of threads when all the threads exit */
292 void ms_thread_cleanup() {
293 for (uint32_t i
= 0; i
< ms_setting
.nthreads
; i
++) {
/* the thread's exit value is discarded (second argument is NULL) */
294 pthread_join(ms_thread_ctx
[i
].pth_id
, NULL
);
299 pthread_key_delete(ms_thread_key
);
300 } /* ms_thread_cleanup */