Temp save for Monty.
[awesomized/libmemcached] / clients / ms_thread.c
1 /*
2 * File: ms_thread.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include "ms_thread.h"
15 #include "ms_setting.h"
16 #include "ms_atomic.h"
17
18 /* global variable */
19 pthread_key_t ms_thread_key;
20
21 /* array of thread context structure, each thread has a thread context structure */
22 static ms_thread_ctx_t *ms_thread_ctx;
23
24 /* functions */
25 static void ms_set_current_time(void);
26 static void ms_check_sock_timeout(void);
27 static void ms_clock_handler(const int fd, const short which, void *arg);
28 static int ms_set_thread_cpu_affinity(int cpu);
29 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
30 static void *ms_worker_libevent(void *arg);
31 static void ms_create_worker(void *(*func)(void *), void *arg);
32
33
34 /**
35 * time-sensitive callers can call it by hand with this,
36 * outside the normal ever-1-second timer
37 */
38 static void ms_set_current_time()
39 {
40 struct timeval timer;
41 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
42
43 gettimeofday(&timer, NULL);
44 ms_thread->curr_time= (rel_time_t)timer.tv_sec;
45 } /* ms_set_current_time */
46
47
48 /**
49 * used to check whether UDP of command are waiting timeout
50 * by the ever-1-second timer
51 */
52 static void ms_check_sock_timeout(void)
53 {
54 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
55 ms_conn_t *c= NULL;
56 int time_diff= 0;
57
58 for (int i= 0; i < ms_thread->thread_ctx->nconns; i++)
59 {
60 c= &ms_thread->conn[i];
61
62 if (c->udp)
63 {
64 time_diff= (int)(ms_thread->curr_time - (rel_time_t)c->start_time.tv_sec);
65
66 /* wait time out */
67 if (time_diff > SOCK_WAIT_TIMEOUT)
68 {
69 /* calculate dropped packets count */
70 if (c->recvpkt > 0)
71 {
72 atomic_add_size(&ms_stats.pkt_drop, c->packets - c->recvpkt);
73 }
74
75 atomic_add_size(&ms_stats.udp_timeout, 1);
76 ms_reset_conn(c, true);
77 }
78 }
79 }
80 } /* ms_check_sock_timeout */
81
82
83 /* if disconnect, the ever-1-second timer will call this function to reconnect */
84 static void ms_reconn_thread_socks(void)
85 {
86 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
87 for (int i= 0; i < ms_thread->thread_ctx->nconns; i++)
88 {
89 ms_reconn_socks(&ms_thread->conn[i]);
90 }
91 } /* ms_reconn_thread_socks */
92
93
94 /**
95 * the handler of the ever-1-second timer
96 *
97 * @param fd, the descriptors of the socket
98 * @param which, event flags
99 * @param arg, argument
100 */
101 static void ms_clock_handler(const int fd, const short which, void *arg)
102 {
103 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
104 struct timeval t=
105 {
106 .tv_sec= 1, .tv_usec= 0
107 };
108
109 UNUSED_ARGUMENT(fd);
110 UNUSED_ARGUMENT(which);
111 UNUSED_ARGUMENT(arg);
112
113 ms_set_current_time();
114
115 if (ms_thread->initialized)
116 {
117 /* only delete the event if it's actually there. */
118 evtimer_del(&ms_thread->clock_event);
119 ms_check_sock_timeout();
120 }
121 else
122 {
123 ms_thread->initialized= true;
124 }
125
126 ms_reconn_thread_socks();
127
128 evtimer_set(&ms_thread->clock_event, ms_clock_handler, 0);
129 event_base_set(ms_thread->base, &ms_thread->clock_event);
130 evtimer_add(&ms_thread->clock_event, &t);
131 } /* ms_clock_handler */
132
133
134 /**
135 * used to bind thread to CPU if the system supports
136 *
137 * @param cpu, cpu index
138 *
139 * @return if success, return 0, else return -1
140 */
141 static int ms_set_thread_cpu_affinity(int cpu)
142 {
143 int ret= 0;
144
145 #ifdef HAVE_CPU_SET_T
146 cpu_set_t cpu_set;
147 CPU_ZERO(&cpu_set);
148 CPU_SET(cpu, &cpu_set);
149
150 if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1)
151 {
152 fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
153 ret= 1;
154 }
155 #else
156 UNUSED_ARGUMENT(cpu);
157 #endif
158
159 return ret;
160 } /* ms_set_thread_cpu_affinity */
161
162
163 /**
164 * Set up a thread's information.
165 *
166 * @param thread_ctx, pointer of the thread context structure
167 *
168 * @return if success, return 0, else return -1
169 */
170 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
171 {
172
173 ms_thread_t *ms_thread= (ms_thread_t *)calloc(sizeof(*ms_thread), 1);
174 pthread_setspecific(ms_thread_key, (void *)ms_thread);
175
176 ms_thread->thread_ctx= thread_ctx;
177 ms_thread->nactive_conn= thread_ctx->nconns;
178 ms_thread->initialized= false;
179 static volatile uint32_t cnt= 0;
180
181 gettimeofday(&ms_thread->startup_time, NULL);
182
183 ms_thread->base= event_init();
184 if (ms_thread->base == NULL)
185 {
186 if (atomic_add_32_nv(&cnt, 1) == 0)
187 {
188 fprintf(stderr, "Can't allocate event base.\n");
189 }
190
191 return -1;
192 }
193
194 ms_thread->conn=
195 (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
196 if (ms_thread->conn == NULL)
197 {
198 if (atomic_add_32_nv(&cnt, 1) == 0)
199 {
200 fprintf(
201 stderr,
202 "Can't allocate concurrency structure for thread descriptors.");
203 }
204
205 return -1;
206 }
207 memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
208
209 for (int i= 0; i < thread_ctx->nconns; i++)
210 {
211 ms_thread->conn[i].conn_idx= i;
212 if (ms_setup_conn(&ms_thread->conn[i]) != 0)
213 {
214 /* only output this error once */
215 if (atomic_add_32_nv(&cnt, 1) == 0)
216 {
217 fprintf(stderr, "Initializing connection failed.\n");
218 }
219
220 return -1;
221 }
222 }
223
224 return 0;
225 } /* ms_setup_thread */
226
227
228 /**
229 * Worker thread: main event loop
230 *
231 * @param arg, the pointer of argument
232 *
233 * @return void*
234 */
235 static void *ms_worker_libevent(void *arg)
236 {
237 ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
238 ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
239
240 /**
241 * If system has more than one cpu and supports set cpu
242 * affinity, try to bind each thread to a cpu core;
243 */
244 if (ms_setting.ncpu > 1)
245 {
246 ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
247 }
248
249 if (ms_setup_thread(thread_ctx) != 0)
250 {
251 exit(1);
252 }
253
254 /* each thread with a timer */
255 ms_clock_handler(0, 0, 0);
256
257 event_base_loop(ms_thread->base, 0);
258
259 return NULL;
260 } /* ms_worker_libevent */
261
262
263 /**
264 * Creates a worker thread.
265 *
266 * @param func, the callback function
267 * @param arg, the argument to pass to the callback function
268 */
269 static void ms_create_worker(void *(*func)(void *), void *arg)
270 {
271 pthread_t thread;
272 pthread_attr_t attr;
273 int ret;
274
275 pthread_attr_init(&attr);
276
277 if ((ret= pthread_create(&thread, &attr, func, arg)) != 0)
278 {
279 fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
280 exit(1);
281 }
282 } /* ms_create_worker */
283
284
285 /* initialize threads */
286 void ms_thread_init()
287 {
288 ms_thread_ctx=
289 (ms_thread_ctx_t *)malloc(
290 sizeof(ms_thread_ctx_t) * (size_t)ms_setting.nthreads);
291 if (ms_thread_ctx == NULL)
292 {
293 fprintf(stderr, "Can't allocate thread descriptors.");
294 exit(1);
295 }
296
297 for (int i= 0; i < ms_setting.nthreads; i++)
298 {
299 ms_thread_ctx[i].thd_idx= i;
300 ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
301
302 /**
303 * If only one server, all the connections in all threads
304 * connects the same server. For support multi-servers, simple
305 * distribute thread to server.
306 */
307 ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
308 ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
309 / ms_setting.nconns;
310 ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
311 / ms_setting.nconns;
312 }
313
314 if (pthread_key_create(&ms_thread_key, NULL))
315 {
316 fprintf(stderr, "Can't create pthread keys. Major malfunction!\n");
317 exit(1);
318 }
319 /* Create threads after we've done all the epoll setup. */
320 for (int i= 0; i < ms_setting.nthreads; i++)
321 {
322 ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
323 }
324 } /* ms_thread_init */
325
326
327 /* cleanup some resource of threads when all the threads exit */
328 void ms_thread_cleanup()
329 {
330 if (ms_thread_ctx != NULL)
331 {
332 free(ms_thread_ctx);
333 }
334 pthread_key_delete(ms_thread_key);
335 } /* ms_thread_cleanup */