Merge lp:~tangent-org/libmemcached/1.0-build/ Build: jenkins-Libmemcached-187
[m6w6/libmemcached] / clients / ms_thread.c
1 /*
2 * File: ms_thread.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "mem_config.h"
13
14 #if defined(HAVE_SYS_TIME_H)
15 # include <sys/time.h>
16 #endif
17
18 #if defined(HAVE_TIME_H)
19 # include <time.h>
20 #endif
21
22 #include "ms_thread.h"
23 #include "ms_setting.h"
24 #include "ms_atomic.h"
25
26 /* global variable */
27 pthread_key_t ms_thread_key;
28
29 /* array of thread context structure, each thread has a thread context structure */
30 static ms_thread_ctx_t *ms_thread_ctx;
31
32 /* functions */
33 static void ms_set_current_time(void);
34 static void ms_check_sock_timeout(void);
35 static void ms_clock_handler(const int fd, const short which, void *arg);
36 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu);
37 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
38 static void *ms_worker_libevent(void *arg);
39 static void ms_create_worker(void *(*func)(void *), void *arg);
40
41
42 /**
43 * time-sensitive callers can call it by hand with this,
44 * outside the normal ever-1-second timer
45 */
46 static void ms_set_current_time()
47 {
48 struct timeval timer;
49 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
50
51 gettimeofday(&timer, NULL);
52 ms_thread->curr_time= (rel_time_t)timer.tv_sec;
53 } /* ms_set_current_time */
54
55
56 /**
57 * used to check whether UDP of command are waiting timeout
58 * by the ever-1-second timer
59 */
60 static void ms_check_sock_timeout(void)
61 {
62 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
63 ms_conn_t *c= NULL;
64 int time_diff= 0;
65
66 for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
67 {
68 c= &ms_thread->conn[i];
69
70 if (c->udp)
71 {
72 time_diff= (int)(ms_thread->curr_time - (rel_time_t)c->start_time.tv_sec);
73
74 /* wait time out */
75 if (time_diff > SOCK_WAIT_TIMEOUT)
76 {
77 /* calculate dropped packets count */
78 if (c->recvpkt > 0)
79 {
80 atomic_add_size(&ms_stats.pkt_drop, c->packets - c->recvpkt);
81 }
82
83 atomic_add_size(&ms_stats.udp_timeout, 1);
84 ms_reset_conn(c, true);
85 }
86 }
87 }
88 } /* ms_check_sock_timeout */
89
90
91 /* if disconnect, the ever-1-second timer will call this function to reconnect */
92 static void ms_reconn_thread_socks(void)
93 {
94 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
95 for (uint32_t i= 0; i < ms_thread->thread_ctx->nconns; i++)
96 {
97 ms_reconn_socks(&ms_thread->conn[i]);
98 }
99 } /* ms_reconn_thread_socks */
100
101
102 /**
103 * the handler of the ever-1-second timer
104 *
105 * @param fd, the descriptors of the socket
106 * @param which, event flags
107 * @param arg, argument
108 */
109 static void ms_clock_handler(const int fd, const short which, void *arg)
110 {
111 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
112 struct timeval t=
113 {
114 .tv_sec= 1, .tv_usec= 0
115 };
116
117 UNUSED_ARGUMENT(fd);
118 UNUSED_ARGUMENT(which);
119 UNUSED_ARGUMENT(arg);
120
121 ms_set_current_time();
122
123 if (ms_thread->initialized)
124 {
125 /* only delete the event if it's actually there. */
126 evtimer_del(&ms_thread->clock_event);
127 ms_check_sock_timeout();
128 }
129 else
130 {
131 ms_thread->initialized= true;
132 }
133
134 ms_reconn_thread_socks();
135
136 evtimer_set(&ms_thread->clock_event, ms_clock_handler, 0);
137 event_base_set(ms_thread->base, &ms_thread->clock_event);
138 evtimer_add(&ms_thread->clock_event, &t);
139 } /* ms_clock_handler */
140
141
142 /**
143 * used to bind thread to CPU if the system supports
144 *
145 * @param cpu, cpu index
146 *
147 * @return if success, return EXIT_SUCCESS, else return -1
148 */
149 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu)
150 {
151 uint32_t ret= 0;
152
153 #ifdef HAVE_CPU_SET_T
154 cpu_set_t cpu_set;
155 CPU_ZERO(&cpu_set);
156 CPU_SET(cpu, &cpu_set);
157
158 if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1)
159 {
160 fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
161 ret= 1;
162 }
163 #else
164 UNUSED_ARGUMENT(cpu);
165 #endif
166
167 return ret;
168 } /* ms_set_thread_cpu_affinity */
169
170
171 /**
172 * Set up a thread's information.
173 *
174 * @param thread_ctx, pointer of the thread context structure
175 *
176 * @return if success, return EXIT_SUCCESS, else return -1
177 */
178 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
179 {
180
181 ms_thread_t *ms_thread= (ms_thread_t *)calloc(sizeof(*ms_thread), 1);
182 pthread_setspecific(ms_thread_key, (void *)ms_thread);
183
184 ms_thread->thread_ctx= thread_ctx;
185 ms_thread->nactive_conn= thread_ctx->nconns;
186 ms_thread->initialized= false;
187 static volatile uint32_t cnt= 0;
188
189 gettimeofday(&ms_thread->startup_time, NULL);
190
191 ms_thread->base= event_init();
192 if (ms_thread->base == NULL)
193 {
194 if (atomic_add_32_nv(&cnt, 1) == 0)
195 {
196 fprintf(stderr, "Can't allocate event base.\n");
197 }
198
199 return -1;
200 }
201
202 ms_thread->conn=
203 (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
204 if (ms_thread->conn == NULL)
205 {
206 if (atomic_add_32_nv(&cnt, 1) == 0)
207 {
208 fprintf(
209 stderr,
210 "Can't allocate concurrency structure for thread descriptors.");
211 }
212
213 return -1;
214 }
215 memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
216
217 for (uint32_t i= 0; i < thread_ctx->nconns; i++)
218 {
219 ms_thread->conn[i].conn_idx= i;
220 if (ms_setup_conn(&ms_thread->conn[i]) != 0)
221 {
222 /* only output this error once */
223 if (atomic_add_32_nv(&cnt, 1) == 0)
224 {
225 fprintf(stderr, "Initializing connection failed.\n");
226 }
227
228 return -1;
229 }
230 }
231
232 return EXIT_SUCCESS;
233 } /* ms_setup_thread */
234
235
236 /**
237 * Worker thread: main event loop
238 *
239 * @param arg, the pointer of argument
240 *
241 * @return void*
242 */
243 static void *ms_worker_libevent(void *arg)
244 {
245 ms_thread_t *ms_thread= NULL;
246 ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
247
248 /**
249 * If system has more than one cpu and supports set cpu
250 * affinity, try to bind each thread to a cpu core;
251 */
252 if (ms_setting.ncpu > 1)
253 {
254 ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
255 }
256
257 if (ms_setup_thread(thread_ctx) != 0)
258 {
259 exit(1);
260 }
261
262 /* each thread with a timer */
263 ms_clock_handler(0, 0, 0);
264
265 pthread_mutex_lock(&ms_global.init_lock.lock);
266 ms_global.init_lock.count++;
267 pthread_cond_signal(&ms_global.init_lock.cond);
268 pthread_mutex_unlock(&ms_global.init_lock.lock);
269
270 ms_thread= pthread_getspecific(ms_thread_key);
271 event_base_loop(ms_thread->base, 0);
272
273 return NULL;
274 } /* ms_worker_libevent */
275
276
277 /**
278 * Creates a worker thread.
279 *
280 * @param func, the callback function
281 * @param arg, the argument to pass to the callback function
282 */
283 static void ms_create_worker(void *(*func)(void *), void *arg)
284 {
285 pthread_t thread;
286 pthread_attr_t attr;
287 int ret;
288
289 pthread_attr_init(&attr);
290
291 if ((ret= pthread_create(&thread, &attr, func, arg)) != 0)
292 {
293 fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
294 exit(1);
295 }
296 } /* ms_create_worker */
297
298
299 /* initialize threads */
300 void ms_thread_init()
301 {
302 ms_thread_ctx=
303 (ms_thread_ctx_t *)malloc(
304 sizeof(ms_thread_ctx_t) * (size_t)ms_setting.nthreads);
305 if (ms_thread_ctx == NULL)
306 {
307 fprintf(stderr, "Can't allocate thread descriptors.");
308 exit(1);
309 }
310
311 for (uint32_t i= 0; i < ms_setting.nthreads; i++)
312 {
313 ms_thread_ctx[i].thd_idx= i;
314 ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
315
316 /**
317 * If only one server, all the connections in all threads
318 * connects the same server. For support multi-servers, simple
319 * distribute thread to server.
320 */
321 ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
322 ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
323 / (int)ms_setting.nconns;
324 ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
325 / ms_setting.nconns;
326 }
327
328 if (pthread_key_create(&ms_thread_key, NULL))
329 {
330 fprintf(stderr, "Can't create pthread keys. Major malfunction!\n");
331 exit(1);
332 }
333 /* Create threads after we've done all the epoll setup. */
334 for (uint32_t i= 0; i < ms_setting.nthreads; i++)
335 {
336 ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
337 }
338 } /* ms_thread_init */
339
340
341 /* cleanup some resource of threads when all the threads exit */
342 void ms_thread_cleanup()
343 {
344 if (ms_thread_ctx != NULL)
345 {
346 free(ms_thread_ctx);
347 }
348 pthread_key_delete(ms_thread_key);
349 } /* ms_thread_cleanup */