1d960cfa758806da97e5250ef4575c5380a754eb
[m6w6/libmemcached] / contrib / bin / memaslap / ms_thread.c
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "mem_config.h"
17
18 #if defined(HAVE_SYS_TIME_H)
19 # include <sys/time.h>
20 #endif
21
22 #if defined(HAVE_TIME_H)
23 # include <time.h>
24 #endif
25
26 #include "ms_thread.h"
27 #include "ms_setting.h"
28 #include "ms_atomic.h"
29
30 /* global variable */
31 pthread_key_t ms_thread_key;
32
33 /* array of thread context structure, each thread has a thread context structure */
34 static ms_thread_ctx_t *ms_thread_ctx;
35
36 /* functions */
37 static void ms_set_current_time(void);
38 static void ms_check_sock_timeout(void);
39 static void ms_clock_handler(const int fd, const short which, void *arg);
40 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu);
41 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
42 static void *ms_worker_libevent(void *arg);
43 static void ms_create_worker(void *(*func)(void *), ms_thread_ctx_t *arg);
44
45 /**
46 * time-sensitive callers can call it by hand with this,
47 * outside the normal ever-1-second timer
48 */
49 static void ms_set_current_time() {
50 struct timeval timer;
51 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
52
53 gettimeofday(&timer, NULL);
54 ms_thread->curr_time = (rel_time_t) timer.tv_sec;
55 } /* ms_set_current_time */
56
57 /**
58 * used to check whether UDP of command are waiting timeout
59 * by the ever-1-second timer
60 */
61 static void ms_check_sock_timeout(void) {
62 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
63 ms_conn_t *c = NULL;
64 int time_diff = 0;
65
66 for (uint32_t i = 0; i < ms_thread->thread_ctx->nconns; i++) {
67 c = &ms_thread->conn[i];
68
69 if (c->udp) {
70 time_diff = (int) (ms_thread->curr_time - (rel_time_t) c->start_time.tv_sec);
71
72 /* wait time out */
73 if (time_diff > SOCK_WAIT_TIMEOUT) {
74 /* calculate dropped packets count */
75 if (c->recvpkt > 0) {
76 atomic_add_size(&ms_stats.pkt_drop, c->packets - c->recvpkt);
77 }
78
79 atomic_add_size(&ms_stats.udp_timeout, 1);
80 ms_reset_conn(c, true);
81 }
82 }
83 }
84 } /* ms_check_sock_timeout */
85
86 /* if disconnect, the ever-1-second timer will call this function to reconnect */
87 static void ms_reconn_thread_socks(void) {
88 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
89 for (uint32_t i = 0; i < ms_thread->thread_ctx->nconns; i++) {
90 ms_reconn_socks(&ms_thread->conn[i]);
91 }
92 } /* ms_reconn_thread_socks */
93
94 /**
95 * the handler of the ever-1-second timer
96 *
97 * @param fd, the descriptors of the socket
98 * @param which, event flags
99 * @param arg, argument
100 */
101 static void ms_clock_handler(const int fd, const short which, void *arg) {
102 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
103 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
104
105 UNUSED_ARGUMENT(fd);
106 UNUSED_ARGUMENT(which);
107 UNUSED_ARGUMENT(arg);
108
109 ms_set_current_time();
110
111 if (ms_thread->initialized) {
112 /* only delete the event if it's actually there. */
113 evtimer_del(&ms_thread->clock_event);
114 ms_check_sock_timeout();
115 } else {
116 ms_thread->initialized = true;
117 }
118
119 ms_reconn_thread_socks();
120
121 evtimer_set(&ms_thread->clock_event, ms_clock_handler, 0);
122 event_base_set(ms_thread->base, &ms_thread->clock_event);
123 evtimer_add(&ms_thread->clock_event, &t);
124 } /* ms_clock_handler */
125
/**
 * Bind the calling thread to one CPU, if the build supports it.
 *
 * When HAVE_CPU_SET_T is not defined this is a no-op that reports success.
 * A failing sched_setaffinity() is not fatal: a warning is printed and the
 * caller is free to carry on.
 *
 * @param cpu, cpu index
 *
 * @return 0 if the affinity was set (or affinity is not supported by the
 *         build), 1 if sched_setaffinity() failed
 */
static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu) {
  uint32_t ret = 0;

#ifdef HAVE_CPU_SET_T
  cpu_set_t cpu_set;
  CPU_ZERO(&cpu_set);
  CPU_SET(cpu, &cpu_set);

  /* pid 0 designates the calling thread */
  if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1) {
    fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
    ret = 1;
  }
#else
  (void) cpu;
#endif

  return ret;
} /* ms_set_thread_cpu_affinity */
151
152 /**
153 * Set up a thread's information.
154 *
155 * @param thread_ctx, pointer of the thread context structure
156 *
157 * @return if success, return EXIT_SUCCESS, else return -1
158 */
159 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx) {
160 ms_thread_t *ms_thread = (ms_thread_t *) calloc(sizeof(*ms_thread), 1);
161 pthread_setspecific(ms_thread_key, (void *) ms_thread);
162
163 ms_thread->thread_ctx = thread_ctx;
164 ms_thread->nactive_conn = thread_ctx->nconns;
165 ms_thread->initialized = false;
166 static ATOMIC uint32_t cnt = 0;
167
168 gettimeofday(&ms_thread->startup_time, NULL);
169
170 ms_thread->base = event_base_new();
171 if (ms_thread->base == NULL) {
172 if (atomic_add_32_nv(&cnt, 1) == 0) {
173 fprintf(stderr, "Can't allocate event base.\n");
174 }
175
176 return -1;
177 }
178
179 ms_thread->conn = (ms_conn_t *) malloc((size_t) thread_ctx->nconns * sizeof(ms_conn_t));
180 if (ms_thread->conn == NULL) {
181 if (atomic_add_32_nv(&cnt, 1) == 0) {
182 fprintf(stderr, "Can't allocate concurrency structure for thread descriptors.");
183 }
184
185 return -1;
186 }
187 memset(ms_thread->conn, 0, (size_t) thread_ctx->nconns * sizeof(ms_conn_t));
188
189 for (uint32_t i = 0; i < thread_ctx->nconns; i++) {
190 ms_thread->conn[i].conn_idx = i;
191 if (ms_setup_conn(&ms_thread->conn[i])) {
192 /* only output this error once */
193 if (atomic_add_32_nv(&cnt, 1) == 0) {
194 fprintf(stderr, "Initializing connection failed.\n");
195 }
196
197 return -1;
198 }
199 }
200
201 return EXIT_SUCCESS;
202 } /* ms_setup_thread */
203
204 /**
205 * Worker thread: main event loop
206 *
207 * @param arg, the pointer of argument
208 *
209 * @return void*
210 */
211 static void *ms_worker_libevent(void *arg) {
212 ms_thread_t *ms_thread = NULL;
213 ms_thread_ctx_t *thread_ctx = (ms_thread_ctx_t *) arg;
214
215 /**
216 * If system has more than one cpu and supports set cpu
217 * affinity, try to bind each thread to a cpu core;
218 */
219 if (ms_setting.ncpu > 1) {
220 ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
221 }
222
223 if (ms_setup_thread(thread_ctx)) {
224 exit(1);
225 }
226
227 /* each thread with a timer */
228 ms_clock_handler(0, 0, 0);
229
230 pthread_mutex_lock(&ms_global.init_lock.lock);
231 ms_global.init_lock.count++;
232 pthread_cond_signal(&ms_global.init_lock.cond);
233 pthread_mutex_unlock(&ms_global.init_lock.lock);
234
235 ms_thread = pthread_getspecific(ms_thread_key);
236 event_base_loop(ms_thread->base, 0);
237 event_base_free(ms_thread->base);
238 free(ms_thread);
239
240 return NULL;
241 } /* ms_worker_libevent */
242
243 /**
244 * Creates a worker thread.
245 *
246 * @param func, the callback function
247 * @param arg, the argument to pass to the callback function
248 */
249 static void ms_create_worker(void *(*func)(void *), ms_thread_ctx_t *arg) {
250 pthread_attr_t attr;
251 int ret;
252
253 pthread_attr_init(&attr);
254
255 if ((ret = pthread_create(&arg->pth_id, &attr, func, arg))) {
256 fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
257 exit(1);
258 }
259 } /* ms_create_worker */
260
261 /* initialize threads */
262 void ms_thread_init() {
263 ms_thread_ctx =
264 (ms_thread_ctx_t *) malloc(sizeof(ms_thread_ctx_t) * (size_t) ms_setting.nthreads);
265 if (ms_thread_ctx == NULL) {
266 fprintf(stderr, "Can't allocate thread descriptors.");
267 exit(1);
268 }
269
270 for (uint32_t i = 0; i < ms_setting.nthreads; i++) {
271 ms_thread_ctx[i].thd_idx = i;
272 ms_thread_ctx[i].nconns = ms_setting.nconns / ms_setting.nthreads;
273
274 /**
275 * If only one server, all the connections in all threads
276 * connects the same server. For support multi-servers, simple
277 * distribute thread to server.
278 */
279 ms_thread_ctx[i].srv_idx = i % ms_setting.srv_cnt;
280 ms_thread_ctx[i].tps_perconn = ms_setting.expected_tps / (int) ms_setting.nconns;
281 ms_thread_ctx[i].exec_num_perconn = ms_setting.exec_num / ms_setting.nconns;
282 }
283
284 if (pthread_key_create(&ms_thread_key, NULL)) {
285 fprintf(stderr, "Can't create pthread keys. Major malfunction!\n");
286 exit(1);
287 }
288 /* Create threads after we've done all the epoll setup. */
289 for (uint32_t i = 0; i < ms_setting.nthreads; i++) {
290 ms_create_worker(ms_worker_libevent, &ms_thread_ctx[i]);
291 }
292 } /* ms_thread_init */
293
294 /* cleanup some resource of threads when all the threads exit */
295 void ms_thread_cleanup() {
296 for (uint32_t i = 0; i < ms_setting.nthreads; i++) {
297 pthread_join(ms_thread_ctx[i].pth_id, NULL);
298 }
299 if (ms_thread_ctx) {
300 free(ms_thread_ctx);
301 }
302 pthread_key_delete(ms_thread_key);
303 } /* ms_thread_cleanup */