Adding memslap tool.
[awesomized/libmemcached] / clients / ms_thread.c
1 /*
2 * File: ms_thread.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include "ms_thread.h"
15 #include "ms_setting.h"
16
17 /* global variable */
18 __thread ms_thread_t ms_thread; /* each thread with a private ms_thread structure */
19
20 /* array of thread context structure, each thread has a thread context structure */
21 static ms_thread_ctx_t *ms_thread_ctx;
22
23 /* functions */
24 static void ms_set_current_time(void);
25 static void ms_check_sock_timeout(void);
26 static void ms_clock_handler(const int fd, const short which, void *arg);
27 static int ms_set_thread_cpu_affinity(int cpu);
28 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
29 static void *ms_worker_libevent(void *arg);
30 static void ms_create_worker(void *(*func)(void *), void *arg);
31
32
33 /**
34 * time-sensitive callers can call it by hand with this,
35 * outside the normal ever-1-second timer
36 */
37 static void ms_set_current_time()
38 {
39 struct timeval timer;
40
41 gettimeofday(&timer, NULL);
42 ms_thread.curr_time= (rel_time_t)timer.tv_sec;
43 } /* ms_set_current_time */
44
45
46 /**
47 * used to check whether UDP of command are waiting timeout
48 * by the ever-1-second timer
49 */
50 static void ms_check_sock_timeout(void)
51 {
52 ms_conn_t *c= NULL;
53 int time_diff= 0;
54
55 for (int i= 0; i < ms_thread.thread_ctx->nconns; i++)
56 {
57 c= &ms_thread.conn[i];
58
59 if (c->udp)
60 {
61 time_diff= (int)(ms_thread.curr_time - c->start_time.tv_sec);
62
63 /* wait time out */
64 if (time_diff > SOCK_WAIT_TIMEOUT)
65 {
66 /* calculate dropped packets count */
67 if (c->recvpkt > 0)
68 {
69 __sync_fetch_and_add(&ms_stats.pkt_drop, c->packets - c->recvpkt);
70 }
71
72 __sync_fetch_and_add(&ms_stats.udp_timeout, 1);
73 ms_reset_conn(c, true);
74 }
75 }
76 }
77 } /* ms_check_sock_timeout */
78
79
80 /* if disconnect, the ever-1-second timer will call this function to reconnect */
81 static void ms_reconn_thread_socks(void)
82 {
83 for (int i= 0; i < ms_thread.thread_ctx->nconns; i++)
84 {
85 ms_reconn_socks(&ms_thread.conn[i]);
86 }
87 } /* ms_reconn_thread_socks */
88
89
90 /**
91 * the handler of the ever-1-second timer
92 *
93 * @param fd, the descriptors of the socket
94 * @param which, event flags
95 * @param arg, argument
96 */
97 static void ms_clock_handler(const int fd, const short which, void *arg)
98 {
99 struct timeval t=
100 {
101 .tv_sec= 1, .tv_usec= 0
102 };
103
104 UNUSED_ARGUMENT(fd);
105 UNUSED_ARGUMENT(which);
106 UNUSED_ARGUMENT(arg);
107
108 ms_set_current_time();
109
110 if (ms_thread.initialized)
111 {
112 /* only delete the event if it's actually there. */
113 evtimer_del(&ms_thread.clock_event);
114 ms_check_sock_timeout();
115 }
116 else
117 {
118 ms_thread.initialized= true;
119 }
120
121 ms_reconn_thread_socks();
122
123 evtimer_set(&ms_thread.clock_event, ms_clock_handler, 0);
124 event_base_set(ms_thread.base, &ms_thread.clock_event);
125 evtimer_add(&ms_thread.clock_event, &t);
126 } /* ms_clock_handler */
127
128
129 /**
130 * used to bind thread to CPU if the system supports
131 *
132 * @param cpu, cpu index
133 *
134 * @return if success, return 0, else return -1
135 */
136 static int ms_set_thread_cpu_affinity(int cpu)
137 {
138 int ret= 0;
139
140 #ifdef HAVE_CPU_SET_T
141 cpu_set_t cpu_set;
142 CPU_ZERO(&cpu_set);
143 CPU_SET(cpu, &cpu_set);
144
145 if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1)
146 {
147 fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
148 ret= 1;
149 }
150 #else
151 UNUSED_ARGUMENT(cpu);
152 #endif
153
154 return ret;
155 } /* ms_set_thread_cpu_affinity */
156
157
158 /**
159 * Set up a thread's information.
160 *
161 * @param thread_ctx, pointer of the thread context structure
162 *
163 * @return if success, return 0, else return -1
164 */
165 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
166 {
167 ms_thread.thread_ctx= thread_ctx;
168 ms_thread.nactive_conn= thread_ctx->nconns;
169 ms_thread.initialized= false;
170 static int cnt= 0;
171
172 gettimeofday(&ms_thread.startup_time, NULL);
173
174 ms_thread.base= event_init();
175 if (ms_thread.base == NULL)
176 {
177 if (__sync_fetch_and_add(&cnt, 1) == 0)
178 {
179 fprintf(stderr, "Can't allocate event base.\n");
180 }
181
182 return -1;
183 }
184
185 ms_thread.conn=
186 (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
187 if (ms_thread.conn == NULL)
188 {
189 if (__sync_fetch_and_add(&cnt, 1) == 0)
190 {
191 fprintf(
192 stderr,
193 "Can't allocate concurrency structure for thread descriptors.");
194 }
195
196 return -1;
197 }
198 memset(ms_thread.conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
199
200 for (int i= 0; i < thread_ctx->nconns; i++)
201 {
202 ms_thread.conn[i].conn_idx= i;
203 if (ms_setup_conn(&ms_thread.conn[i]) != 0)
204 {
205 /* only output this error once */
206 if (__sync_fetch_and_add(&cnt, 1) == 0)
207 {
208 fprintf(stderr, "Initializing connection failed.\n");
209 }
210
211 return -1;
212 }
213 }
214
215 return 0;
216 } /* ms_setup_thread */
217
218
219 /**
220 * Worker thread: main event loop
221 *
222 * @param arg, the pointer of argument
223 *
224 * @return void*
225 */
226 static void *ms_worker_libevent(void *arg)
227 {
228 ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
229
230 /**
231 * If system has more than one cpu and supports set cpu
232 * affinity, try to bind each thread to a cpu core;
233 */
234 if (ms_setting.ncpu > 1)
235 {
236 ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
237 }
238
239 if (ms_setup_thread(thread_ctx) != 0)
240 {
241 exit(1);
242 }
243
244 /* each thread with a timer */
245 ms_clock_handler(0, 0, 0);
246
247 event_base_loop(ms_thread.base, 0);
248
249 return NULL;
250 } /* ms_worker_libevent */
251
252
253 /**
254 * Creates a worker thread.
255 *
256 * @param func, the callback function
257 * @param arg, the argument to pass to the callback function
258 */
259 static void ms_create_worker(void *(*func)(void *), void *arg)
260 {
261 pthread_t thread;
262 pthread_attr_t attr;
263 int ret;
264
265 pthread_attr_init(&attr);
266
267 if ((ret= pthread_create(&thread, &attr, func, arg)) != 0)
268 {
269 fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
270 exit(1);
271 }
272 } /* ms_create_worker */
273
274
275 /* initialize threads */
276 void ms_thread_init()
277 {
278 ms_thread_ctx=
279 (ms_thread_ctx_t *)malloc(
280 sizeof(ms_thread_ctx_t) * (size_t)ms_setting.nthreads);
281 if (ms_thread_ctx == NULL)
282 {
283 fprintf(stderr, "Can't allocate thread descriptors.");
284 exit(1);
285 }
286
287 for (int i= 0; i < ms_setting.nthreads; i++)
288 {
289 ms_thread_ctx[i].thd_idx= i;
290 ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
291
292 /**
293 * If only one server, all the connections in all threads
294 * connects the same server. For support multi-servers, simple
295 * distribute thread to server.
296 */
297 ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
298 ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
299 / ms_setting.nconns;
300 ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
301 / ms_setting.nconns;
302 }
303
304 /* Create threads after we've done all the epoll setup. */
305 for (int i= 0; i < ms_setting.nthreads; i++)
306 {
307 ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
308 }
309 } /* ms_thread_init */
310
311
312 /* cleanup some resource of threads when all the threads exit */
313 void ms_thread_cleanup()
314 {
315 if (ms_thread_ctx != NULL)
316 {
317 free(ms_thread_ctx);
318 }
319 } /* ms_thread_cleanup */