WIP
[m6w6/libmemcached] / contrib / bin / memaslap / ms_thread.c
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "mem_config.h"
17
18 #if defined(HAVE_SYS_TIME_H)
19 # include <sys/time.h>
20 #endif
21 #include <time.h>
22
23 #include "ms_thread.h"
24 #include "ms_setting.h"
25 #include "ms_atomic.h"
26
27 /* global variable */
28 pthread_key_t ms_thread_key;
29
30 /* array of thread context structure, each thread has a thread context structure */
31 static ms_thread_ctx_t *ms_thread_ctx;
32
33 /* functions */
34 static void ms_set_current_time(void);
35 static void ms_check_sock_timeout(void);
36 static void ms_clock_handler(const int fd, const short which, void *arg);
37 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu);
38 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
39 static void *ms_worker_libevent(void *arg);
40 static void ms_create_worker(void *(*func)(void *), ms_thread_ctx_t *arg);
41
42 /**
43 * time-sensitive callers can call it by hand with this,
44 * outside the normal ever-1-second timer
45 */
46 static void ms_set_current_time() {
47 struct timeval timer;
48 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
49
50 gettimeofday(&timer, NULL);
51 ms_thread->curr_time = (rel_time_t) timer.tv_sec;
52 } /* ms_set_current_time */
53
54 /**
55 * used to check whether UDP of command are waiting timeout
56 * by the ever-1-second timer
57 */
58 static void ms_check_sock_timeout(void) {
59 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
60 ms_conn_t *c = NULL;
61 int time_diff = 0;
62
63 for (uint32_t i = 0; i < ms_thread->thread_ctx->nconns; i++) {
64 c = &ms_thread->conn[i];
65
66 if (c->udp) {
67 time_diff = (int) (ms_thread->curr_time - (rel_time_t) c->start_time.tv_sec);
68
69 /* wait time out */
70 if (time_diff > SOCK_WAIT_TIMEOUT) {
71 /* calculate dropped packets count */
72 if (c->recvpkt > 0) {
73 atomic_add_size(&ms_stats.pkt_drop, c->packets - c->recvpkt);
74 }
75
76 atomic_add_size(&ms_stats.udp_timeout, 1);
77 ms_reset_conn(c, true);
78 }
79 }
80 }
81 } /* ms_check_sock_timeout */
82
83 /* if disconnect, the ever-1-second timer will call this function to reconnect */
84 static void ms_reconn_thread_socks(void) {
85 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
86 for (uint32_t i = 0; i < ms_thread->thread_ctx->nconns; i++) {
87 ms_reconn_socks(&ms_thread->conn[i]);
88 }
89 } /* ms_reconn_thread_socks */
90
91 /**
92 * the handler of the ever-1-second timer
93 *
94 * @param fd, the descriptors of the socket
95 * @param which, event flags
96 * @param arg, argument
97 */
98 static void ms_clock_handler(const int fd, const short which, void *arg) {
99 ms_thread_t *ms_thread = pthread_getspecific(ms_thread_key);
100 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
101
102 UNUSED_ARGUMENT(fd);
103 UNUSED_ARGUMENT(which);
104 UNUSED_ARGUMENT(arg);
105
106 ms_set_current_time();
107
108 if (ms_thread->initialized) {
109 /* only delete the event if it's actually there. */
110 evtimer_del(&ms_thread->clock_event);
111 ms_check_sock_timeout();
112 } else {
113 ms_thread->initialized = true;
114 }
115
116 ms_reconn_thread_socks();
117
118 evtimer_set(&ms_thread->clock_event, ms_clock_handler, 0);
119 event_base_set(ms_thread->base, &ms_thread->clock_event);
120 evtimer_add(&ms_thread->clock_event, &t);
121 } /* ms_clock_handler */
122
/**
 * used to bind the calling thread to a CPU if the system supports it
 *
 * When HAVE_CPU_SET_T is not defined this is a no-op.
 *
 * @param cpu, cpu index
 *
 * @return 0 if the affinity was set (or setting it is not supported),
 *         1 if sched_setaffinity() failed; a warning is printed in that case
 */
static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu) {
#ifdef HAVE_CPU_SET_T
  cpu_set_t cpu_set;

  CPU_ZERO(&cpu_set);
  CPU_SET(cpu, &cpu_set);

  if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1) {
    /* not fatal: the thread simply keeps running without a CPU binding */
    fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
    return 1;
  }

  return 0;
#else
  UNUSED_ARGUMENT(cpu);
  return 0;
#endif
} /* ms_set_thread_cpu_affinity */
148
149 /**
150 * Set up a thread's information.
151 *
152 * @param thread_ctx, pointer of the thread context structure
153 *
154 * @return if success, return EXIT_SUCCESS, else return -1
155 */
156 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx) {
157 ms_thread_t *ms_thread = (ms_thread_t *) calloc(sizeof(*ms_thread), 1);
158 pthread_setspecific(ms_thread_key, (void *) ms_thread);
159
160 ms_thread->thread_ctx = thread_ctx;
161 ms_thread->nactive_conn = thread_ctx->nconns;
162 ms_thread->initialized = false;
163 static ATOMIC uint32_t cnt = 0;
164
165 gettimeofday(&ms_thread->startup_time, NULL);
166
167 ms_thread->base = event_base_new();
168 if (ms_thread->base == NULL) {
169 if (atomic_add_32_nv(&cnt, 1) == 0) {
170 fprintf(stderr, "Can't allocate event base.\n");
171 }
172
173 return -1;
174 }
175
176 ms_thread->conn = (ms_conn_t *) malloc((size_t) thread_ctx->nconns * sizeof(ms_conn_t));
177 if (ms_thread->conn == NULL) {
178 if (atomic_add_32_nv(&cnt, 1) == 0) {
179 fprintf(stderr, "Can't allocate concurrency structure for thread descriptors.");
180 }
181
182 return -1;
183 }
184 memset(ms_thread->conn, 0, (size_t) thread_ctx->nconns * sizeof(ms_conn_t));
185
186 for (uint32_t i = 0; i < thread_ctx->nconns; i++) {
187 ms_thread->conn[i].conn_idx = i;
188 if (ms_setup_conn(&ms_thread->conn[i])) {
189 /* only output this error once */
190 if (atomic_add_32_nv(&cnt, 1) == 0) {
191 fprintf(stderr, "Initializing connection failed.\n");
192 }
193
194 return -1;
195 }
196 }
197
198 return EXIT_SUCCESS;
199 } /* ms_setup_thread */
200
201 /**
202 * Worker thread: main event loop
203 *
204 * @param arg, the pointer of argument
205 *
206 * @return void*
207 */
208 static void *ms_worker_libevent(void *arg) {
209 ms_thread_t *ms_thread = NULL;
210 ms_thread_ctx_t *thread_ctx = (ms_thread_ctx_t *) arg;
211
212 /**
213 * If system has more than one cpu and supports set cpu
214 * affinity, try to bind each thread to a cpu core;
215 */
216 if (ms_setting.ncpu > 1) {
217 ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
218 }
219
220 if (ms_setup_thread(thread_ctx)) {
221 exit(1);
222 }
223
224 /* each thread with a timer */
225 ms_clock_handler(0, 0, 0);
226
227 pthread_mutex_lock(&ms_global.init_lock.lock);
228 ms_global.init_lock.count++;
229 pthread_cond_signal(&ms_global.init_lock.cond);
230 pthread_mutex_unlock(&ms_global.init_lock.lock);
231
232 ms_thread = pthread_getspecific(ms_thread_key);
233 event_base_loop(ms_thread->base, 0);
234 event_base_free(ms_thread->base);
235 free(ms_thread);
236
237 return NULL;
238 } /* ms_worker_libevent */
239
240 /**
241 * Creates a worker thread.
242 *
243 * @param func, the callback function
244 * @param arg, the argument to pass to the callback function
245 */
246 static void ms_create_worker(void *(*func)(void *), ms_thread_ctx_t *arg) {
247 pthread_attr_t attr;
248 int ret;
249
250 pthread_attr_init(&attr);
251
252 if ((ret = pthread_create(&arg->pth_id, &attr, func, arg))) {
253 fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
254 exit(1);
255 }
256 } /* ms_create_worker */
257
258 /* initialize threads */
259 void ms_thread_init() {
260 ms_thread_ctx =
261 (ms_thread_ctx_t *) malloc(sizeof(ms_thread_ctx_t) * (size_t) ms_setting.nthreads);
262 if (ms_thread_ctx == NULL) {
263 fprintf(stderr, "Can't allocate thread descriptors.");
264 exit(1);
265 }
266
267 for (uint32_t i = 0; i < ms_setting.nthreads; i++) {
268 ms_thread_ctx[i].thd_idx = i;
269 ms_thread_ctx[i].nconns = ms_setting.nconns / ms_setting.nthreads;
270
271 /**
272 * If only one server, all the connections in all threads
273 * connects the same server. For support multi-servers, simple
274 * distribute thread to server.
275 */
276 ms_thread_ctx[i].srv_idx = i % ms_setting.srv_cnt;
277 ms_thread_ctx[i].tps_perconn = ms_setting.expected_tps / (int) ms_setting.nconns;
278 ms_thread_ctx[i].exec_num_perconn = ms_setting.exec_num / ms_setting.nconns;
279 }
280
281 if (pthread_key_create(&ms_thread_key, NULL)) {
282 fprintf(stderr, "Can't create pthread keys. Major malfunction!\n");
283 exit(1);
284 }
285 /* Create threads after we've done all the epoll setup. */
286 for (uint32_t i = 0; i < ms_setting.nthreads; i++) {
287 ms_create_worker(ms_worker_libevent, &ms_thread_ctx[i]);
288 }
289 } /* ms_thread_init */
290
291 /* cleanup some resource of threads when all the threads exit */
292 void ms_thread_cleanup() {
293 for (uint32_t i = 0; i < ms_setting.nthreads; i++) {
294 pthread_join(ms_thread_ctx[i].pth_id, NULL);
295 }
296 if (ms_thread_ctx) {
297 free(ms_thread_ctx);
298 }
299 pthread_key_delete(ms_thread_key);
300 } /* ms_thread_cleanup */