Uncrustify
[awesomized/libmemcached] / clients / ms_thread.c
1 /*
2 * File: ms_thread.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11 #include "ms_thread.h"
12 #include "ms_setting.h"
13
14 /* global variable */
15 __thread ms_thread_t ms_thread; /* each thread with a private ms_thread structure */
16
17 /* array of thread context structure, each thread has a thread context structure */
18 static ms_thread_ctx_t *ms_thread_ctx;
19
20 /* functions */
21 static void ms_set_current_time(void);
22 static void ms_check_sock_timeout(void);
23 static void ms_clock_handler(const int fd, const short which, void *arg);
24 static int ms_set_thread_cpu_affinity(int cpu);
25 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
26 static void *ms_worker_libevent(void *arg);
27 static void ms_create_worker(void *(*func)(void *), void *arg);
28
29
30 /**
31 * time-sensitive callers can call it by hand with this,
32 * outside the normal ever-1-second timer
33 */
34 static void ms_set_current_time()
35 {
36 struct timeval timer;
37
38 gettimeofday(&timer, NULL);
39 ms_thread.curr_time= (rel_time_t)timer.tv_sec;
40 } /* ms_set_current_time */
41
42
43 /**
44 * used to check whether UDP of command are waiting timeout
45 * by the ever-1-second timer
46 */
47 static void ms_check_sock_timeout(void)
48 {
49 ms_conn_t *c= NULL;
50 int time_diff= 0;
51
52 for (int i= 0; i < ms_thread.thread_ctx->nconns; i++)
53 {
54 c= &ms_thread.conn[i];
55
56 if (c->udp)
57 {
58 time_diff= (int)(ms_thread.curr_time - c->start_time.tv_sec);
59
60 /* wait time out */
61 if (time_diff > SOCK_WAIT_TIMEOUT)
62 {
63 /* calculate dropped packets count */
64 if (c->recvpkt > 0)
65 {
66 __sync_fetch_and_add(&ms_stats.pkt_drop, c->packets - c->recvpkt);
67 }
68
69 __sync_fetch_and_add(&ms_stats.udp_timeout, 1);
70 ms_reset_conn(c, true);
71 }
72 }
73 }
74 } /* ms_check_sock_timeout */
75
76
77 /* if disconnect, the ever-1-second timer will call this function to reconnect */
78 static void ms_reconn_thread_socks(void)
79 {
80 for (int i= 0; i < ms_thread.thread_ctx->nconns; i++)
81 {
82 ms_reconn_socks(&ms_thread.conn[i]);
83 }
84 } /* ms_reconn_thread_socks */
85
86
87 /**
88 * the handler of the ever-1-second timer
89 *
90 * @param fd, the descriptors of the socket
91 * @param which, event flags
92 * @param arg, argument
93 */
94 static void ms_clock_handler(const int fd, const short which, void *arg)
95 {
96 struct timeval t=
97 {
98 .tv_sec= 1, .tv_usec= 0
99 };
100
101 UNUSED_ARGUMENT(fd);
102 UNUSED_ARGUMENT(which);
103 UNUSED_ARGUMENT(arg);
104
105 ms_set_current_time();
106
107 if (ms_thread.initialized)
108 {
109 /* only delete the event if it's actually there. */
110 evtimer_del(&ms_thread.clock_event);
111 ms_check_sock_timeout();
112 }
113 else
114 {
115 ms_thread.initialized= true;
116 }
117
118 ms_reconn_thread_socks();
119
120 evtimer_set(&ms_thread.clock_event, ms_clock_handler, 0);
121 event_base_set(ms_thread.base, &ms_thread.clock_event);
122 evtimer_add(&ms_thread.clock_event, &t);
123 } /* ms_clock_handler */
124
125
/**
 * Bind the calling thread to one CPU, if the system supports it.
 *
 * When HAVE_CPU_SET_T is not defined this is a no-op that reports success.
 * A failure to set the affinity is only warned about on stderr; the caller
 * may keep running unbound.
 *
 * @param cpu, cpu index
 *
 * @return if success, return 0, else return -1
 */
static int ms_set_thread_cpu_affinity(int cpu)
{
#ifdef HAVE_CPU_SET_T
  cpu_set_t cpu_set;

  CPU_ZERO(&cpu_set);
  CPU_SET(cpu, &cpu_set);

  /* pid 0 selects the calling thread */
  if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1)
  {
    fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
    return -1;
  }
#else
  UNUSED_ARGUMENT(cpu);
#endif

  return 0;
} /* ms_set_thread_cpu_affinity */
153
154
155 /**
156 * Set up a thread's information.
157 *
158 * @param thread_ctx, pointer of the thread context structure
159 *
160 * @return if success, return 0, else return -1
161 */
162 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
163 {
164 ms_thread.thread_ctx= thread_ctx;
165 ms_thread.nactive_conn= thread_ctx->nconns;
166 ms_thread.initialized= false;
167 static int cnt= 0;
168
169 gettimeofday(&ms_thread.startup_time, NULL);
170
171 ms_thread.base= event_init();
172 if (ms_thread.base == NULL)
173 {
174 if (__sync_fetch_and_add(&cnt, 1) == 0)
175 {
176 fprintf(stderr, "Can't allocate event base.\n");
177 }
178
179 return -1;
180 }
181
182 ms_thread.conn=
183 (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
184 if (ms_thread.conn == NULL)
185 {
186 if (__sync_fetch_and_add(&cnt, 1) == 0)
187 {
188 fprintf(
189 stderr,
190 "Can't allocate concurrency structure for thread descriptors.");
191 }
192
193 return -1;
194 }
195 memset(ms_thread.conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
196
197 for (int i= 0; i < thread_ctx->nconns; i++)
198 {
199 ms_thread.conn[i].conn_idx= i;
200 if (ms_setup_conn(&ms_thread.conn[i]) != 0)
201 {
202 /* only output this error once */
203 if (__sync_fetch_and_add(&cnt, 1) == 0)
204 {
205 fprintf(stderr, "Initializing connection failed.\n");
206 }
207
208 return -1;
209 }
210 }
211
212 return 0;
213 } /* ms_setup_thread */
214
215
216 /**
217 * Worker thread: main event loop
218 *
219 * @param arg, the pointer of argument
220 *
221 * @return void*
222 */
223 static void *ms_worker_libevent(void *arg)
224 {
225 ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
226
227 /**
228 * If system has more than one cpu and supports set cpu
229 * affinity, try to bind each thread to a cpu core;
230 */
231 if (ms_setting.ncpu > 1)
232 {
233 ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
234 }
235
236 if (ms_setup_thread(thread_ctx) != 0)
237 {
238 exit(1);
239 }
240
241 /* each thread with a timer */
242 ms_clock_handler(0, 0, 0);
243
244 event_base_loop(ms_thread.base, 0);
245
246 return NULL;
247 } /* ms_worker_libevent */
248
249
/**
 * Creates a worker thread with default attributes.
 *
 * The thread handle is not kept, so the thread is neither joined nor
 * detached here. If the thread cannot be created, an error is printed and
 * the process exits with status 1.
 *
 * @param func, the callback function
 * @param arg, the argument to pass to the callback function
 */
static void ms_create_worker(void *(*func)(void *), void *arg)
{
  pthread_t thread;
  pthread_attr_t attr;
  int ret;

  if ((ret= pthread_attr_init(&attr)) != 0)
  {
    fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
    exit(1);
  }

  ret= pthread_create(&thread, &attr, func, arg);

  /* the attribute object is no longer needed once pthread_create returns;
   * destroying it does not affect a thread already created with it */
  pthread_attr_destroy(&attr);

  if (ret != 0)
  {
    fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
    exit(1);
  }
} /* ms_create_worker */
270
271
272 /* initialize threads */
273 void ms_thread_init()
274 {
275 ms_thread_ctx=
276 (ms_thread_ctx_t *)malloc(
277 sizeof(ms_thread_ctx_t) * (size_t)ms_setting.nthreads);
278 if (ms_thread_ctx == NULL)
279 {
280 fprintf(stderr, "Can't allocate thread descriptors.");
281 exit(1);
282 }
283
284 for (int i= 0; i < ms_setting.nthreads; i++)
285 {
286 ms_thread_ctx[i].thd_idx= i;
287 ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
288
289 /**
290 * If only one server, all the connections in all threads
291 * connects the same server. For support multi-servers, simple
292 * distribute thread to server.
293 */
294 ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
295 ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
296 / ms_setting.nconns;
297 ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
298 / ms_setting.nconns;
299 }
300
301 /* Create threads after we've done all the epoll setup. */
302 for (int i= 0; i < ms_setting.nthreads; i++)
303 {
304 ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
305 }
306 } /* ms_thread_init */
307
308
309 /* cleanup some resource of threads when all the threads exit */
310 void ms_thread_cleanup()
311 {
312 if (ms_thread_ctx != NULL)
313 {
314 free(ms_thread_ctx);
315 }
316 } /* ms_thread_cleanup */