Merge trunk
[awesomized/libmemcached] / clients / ms_thread.c
1 /*
2 * File: ms_thread.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #if TIME_WITH_SYS_TIME
15 # include <sys/time.h>
16 # include <time.h>
17 #else
18 # if HAVE_SYS_TIME_H
19 # include <sys/time.h>
20 # else
21 # include <time.h>
22 # endif
23 #endif
24 #include "ms_thread.h"
25 #include "ms_setting.h"
26 #include "ms_atomic.h"
27
28 /* global variable */
29 pthread_key_t ms_thread_key;
30
31 /* array of thread context structure, each thread has a thread context structure */
32 static ms_thread_ctx_t *ms_thread_ctx;
33
34 /* functions */
35 static void ms_set_current_time(void);
36 static void ms_check_sock_timeout(void);
37 static void ms_clock_handler(const int fd, const short which, void *arg);
38 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu);
39 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
40 static void *ms_worker_libevent(void *arg);
41 static void ms_create_worker(void *(*func)(void *), void *arg);
42
43
44 /**
45 * time-sensitive callers can call it by hand with this,
46 * outside the normal ever-1-second timer
47 */
48 static void ms_set_current_time()
49 {
50 struct timeval timer;
51 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
52
53 gettimeofday(&timer, NULL);
54 ms_thread->curr_time= (rel_time_t)timer.tv_sec;
55 } /* ms_set_current_time */
56
57
58 /**
59 * used to check whether UDP of command are waiting timeout
60 * by the ever-1-second timer
61 */
62 static void ms_check_sock_timeout(void)
63 {
64 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
65 ms_conn_t *c= NULL;
66 int time_diff= 0;
67
68 for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
69 {
70 c= &ms_thread->conn[i];
71
72 if (c->udp)
73 {
74 time_diff= (int)(ms_thread->curr_time - (rel_time_t)c->start_time.tv_sec);
75
76 /* wait time out */
77 if (time_diff > SOCK_WAIT_TIMEOUT)
78 {
79 /* calculate dropped packets count */
80 if (c->recvpkt > 0)
81 {
82 atomic_add_size(&ms_stats.pkt_drop, c->packets - c->recvpkt);
83 }
84
85 atomic_add_size(&ms_stats.udp_timeout, 1);
86 ms_reset_conn(c, true);
87 }
88 }
89 }
90 } /* ms_check_sock_timeout */
91
92
93 /* if disconnect, the ever-1-second timer will call this function to reconnect */
94 static void ms_reconn_thread_socks(void)
95 {
96 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
97 for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
98 {
99 ms_reconn_socks(&ms_thread->conn[i]);
100 }
101 } /* ms_reconn_thread_socks */
102
103
104 /**
105 * the handler of the ever-1-second timer
106 *
107 * @param fd, the descriptors of the socket
108 * @param which, event flags
109 * @param arg, argument
110 */
111 static void ms_clock_handler(const int fd, const short which, void *arg)
112 {
113 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
114 struct timeval t=
115 {
116 .tv_sec= 1, .tv_usec= 0
117 };
118
119 UNUSED_ARGUMENT(fd);
120 UNUSED_ARGUMENT(which);
121 UNUSED_ARGUMENT(arg);
122
123 ms_set_current_time();
124
125 if (ms_thread->initialized)
126 {
127 /* only delete the event if it's actually there. */
128 evtimer_del(&ms_thread->clock_event);
129 ms_check_sock_timeout();
130 }
131 else
132 {
133 ms_thread->initialized= true;
134 }
135
136 ms_reconn_thread_socks();
137
138 evtimer_set(&ms_thread->clock_event, ms_clock_handler, 0);
139 event_base_set(ms_thread->base, &ms_thread->clock_event);
140 evtimer_add(&ms_thread->clock_event, &t);
141 } /* ms_clock_handler */
142
143
144 /**
145 * used to bind thread to CPU if the system supports
146 *
147 * @param cpu, cpu index
148 *
149 * @return if success, return EXIT_SUCCESS, else return -1
150 */
151 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu)
152 {
153 uint32_t ret= 0;
154
155 #ifdef HAVE_CPU_SET_T
156 cpu_set_t cpu_set;
157 CPU_ZERO(&cpu_set);
158 CPU_SET(cpu, &cpu_set);
159
160 if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1)
161 {
162 fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
163 ret= 1;
164 }
165 #else
166 UNUSED_ARGUMENT(cpu);
167 #endif
168
169 return ret;
170 } /* ms_set_thread_cpu_affinity */
171
172
173 /**
174 * Set up a thread's information.
175 *
176 * @param thread_ctx, pointer of the thread context structure
177 *
178 * @return if success, return EXIT_SUCCESS, else return -1
179 */
180 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
181 {
182
183 ms_thread_t *ms_thread= (ms_thread_t *)calloc(sizeof(*ms_thread), 1);
184 pthread_setspecific(ms_thread_key, (void *)ms_thread);
185
186 ms_thread->thread_ctx= thread_ctx;
187 ms_thread->nactive_conn= thread_ctx->nconns;
188 ms_thread->initialized= false;
189 static volatile uint32_t cnt= 0;
190
191 gettimeofday(&ms_thread->startup_time, NULL);
192
193 ms_thread->base= event_init();
194 if (ms_thread->base == NULL)
195 {
196 if (atomic_add_32_nv(&cnt, 1) == 0)
197 {
198 fprintf(stderr, "Can't allocate event base.\n");
199 }
200
201 return -1;
202 }
203
204 ms_thread->conn=
205 (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
206 if (ms_thread->conn == NULL)
207 {
208 if (atomic_add_32_nv(&cnt, 1) == 0)
209 {
210 fprintf(
211 stderr,
212 "Can't allocate concurrency structure for thread descriptors.");
213 }
214
215 return -1;
216 }
217 memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
218
219 for (uint32_t i= 0; i < thread_ctx->nconns; i++)
220 {
221 ms_thread->conn[i].conn_idx= i;
222 if (ms_setup_conn(&ms_thread->conn[i]) != 0)
223 {
224 /* only output this error once */
225 if (atomic_add_32_nv(&cnt, 1) == 0)
226 {
227 fprintf(stderr, "Initializing connection failed.\n");
228 }
229
230 return -1;
231 }
232 }
233
234 return EXIT_SUCCESS;
235 } /* ms_setup_thread */
236
237
238 /**
239 * Worker thread: main event loop
240 *
241 * @param arg, the pointer of argument
242 *
243 * @return void*
244 */
245 static void *ms_worker_libevent(void *arg)
246 {
247 ms_thread_t *ms_thread= NULL;
248 ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
249
250 /**
251 * If system has more than one cpu and supports set cpu
252 * affinity, try to bind each thread to a cpu core;
253 */
254 if (ms_setting.ncpu > 1)
255 {
256 ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
257 }
258
259 if (ms_setup_thread(thread_ctx) != 0)
260 {
261 exit(1);
262 }
263
264 /* each thread with a timer */
265 ms_clock_handler(0, 0, 0);
266
267 pthread_mutex_lock(&ms_global.init_lock.lock);
268 ms_global.init_lock.count++;
269 pthread_cond_signal(&ms_global.init_lock.cond);
270 pthread_mutex_unlock(&ms_global.init_lock.lock);
271
272 ms_thread= pthread_getspecific(ms_thread_key);
273 event_base_loop(ms_thread->base, 0);
274
275 return NULL;
276 } /* ms_worker_libevent */
277
278
279 /**
280 * Creates a worker thread.
281 *
282 * @param func, the callback function
283 * @param arg, the argument to pass to the callback function
284 */
285 static void ms_create_worker(void *(*func)(void *), void *arg)
286 {
287 pthread_t thread;
288 pthread_attr_t attr;
289 int ret;
290
291 pthread_attr_init(&attr);
292
293 if ((ret= pthread_create(&thread, &attr, func, arg)) != 0)
294 {
295 fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
296 exit(1);
297 }
298 } /* ms_create_worker */
299
300
301 /* initialize threads */
302 void ms_thread_init()
303 {
304 ms_thread_ctx=
305 (ms_thread_ctx_t *)malloc(
306 sizeof(ms_thread_ctx_t) * (size_t)ms_setting.nthreads);
307 if (ms_thread_ctx == NULL)
308 {
309 fprintf(stderr, "Can't allocate thread descriptors.");
310 exit(1);
311 }
312
313 for (uint32_t i= 0; i < ms_setting.nthreads; i++)
314 {
315 ms_thread_ctx[i].thd_idx= i;
316 ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
317
318 /**
319 * If only one server, all the connections in all threads
320 * connects the same server. For support multi-servers, simple
321 * distribute thread to server.
322 */
323 ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
324 ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
325 / (int)ms_setting.nconns;
326 ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
327 / ms_setting.nconns;
328 }
329
330 if (pthread_key_create(&ms_thread_key, NULL))
331 {
332 fprintf(stderr, "Can't create pthread keys. Major malfunction!\n");
333 exit(1);
334 }
335 /* Create threads after we've done all the epoll setup. */
336 for (uint32_t i= 0; i < ms_setting.nthreads; i++)
337 {
338 ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
339 }
340 } /* ms_thread_init */
341
342
343 /* cleanup some resource of threads when all the threads exit */
344 void ms_thread_cleanup()
345 {
346 if (ms_thread_ctx != NULL)
347 {
348 free(ms_thread_ctx);
349 }
350 pthread_key_delete(ms_thread_key);
351 } /* ms_thread_cleanup */