Fixed atomics and getline on solaris.
[awesomized/libmemcached] / clients / ms_thread.c
1 /*
2 * File: ms_thread.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11
12 #include "config.h"
13
14 #include "ms_thread.h"
15 #include "ms_setting.h"
16 #include "ms_atomic.h"
17
18 /* global variable */
19 __thread ms_thread_t ms_thread; /* each thread with a private ms_thread structure */
20
21 /* array of thread context structure, each thread has a thread context structure */
22 static ms_thread_ctx_t *ms_thread_ctx;
23
24 /* functions */
25 static void ms_set_current_time(void);
26 static void ms_check_sock_timeout(void);
27 static void ms_clock_handler(const int fd, const short which, void *arg);
28 static int ms_set_thread_cpu_affinity(int cpu);
29 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
30 static void *ms_worker_libevent(void *arg);
31 static void ms_create_worker(void *(*func)(void *), void *arg);
32
33
34 /**
35 * time-sensitive callers can call it by hand with this,
36 * outside the normal ever-1-second timer
37 */
38 static void ms_set_current_time()
39 {
40 struct timeval timer;
41
42 gettimeofday(&timer, NULL);
43 ms_thread.curr_time= (rel_time_t)timer.tv_sec;
44 } /* ms_set_current_time */
45
46
47 /**
48 * used to check whether UDP of command are waiting timeout
49 * by the ever-1-second timer
50 */
51 static void ms_check_sock_timeout(void)
52 {
53 ms_conn_t *c= NULL;
54 int time_diff= 0;
55
56 for (int i= 0; i < ms_thread.thread_ctx->nconns; i++)
57 {
58 c= &ms_thread.conn[i];
59
60 if (c->udp)
61 {
62 time_diff= (int)(ms_thread.curr_time - c->start_time.tv_sec);
63
64 /* wait time out */
65 if (time_diff > SOCK_WAIT_TIMEOUT)
66 {
67 /* calculate dropped packets count */
68 if (c->recvpkt > 0)
69 {
70 atomic_add_64(&ms_stats.pkt_drop, c->packets - c->recvpkt);
71 }
72
73 atomic_add_64(&ms_stats.udp_timeout, 1);
74 ms_reset_conn(c, true);
75 }
76 }
77 }
78 } /* ms_check_sock_timeout */
79
80
81 /* if disconnect, the ever-1-second timer will call this function to reconnect */
82 static void ms_reconn_thread_socks(void)
83 {
84 for (int i= 0; i < ms_thread.thread_ctx->nconns; i++)
85 {
86 ms_reconn_socks(&ms_thread.conn[i]);
87 }
88 } /* ms_reconn_thread_socks */
89
90
91 /**
92 * the handler of the ever-1-second timer
93 *
94 * @param fd, the descriptors of the socket
95 * @param which, event flags
96 * @param arg, argument
97 */
98 static void ms_clock_handler(const int fd, const short which, void *arg)
99 {
100 struct timeval t=
101 {
102 .tv_sec= 1, .tv_usec= 0
103 };
104
105 UNUSED_ARGUMENT(fd);
106 UNUSED_ARGUMENT(which);
107 UNUSED_ARGUMENT(arg);
108
109 ms_set_current_time();
110
111 if (ms_thread.initialized)
112 {
113 /* only delete the event if it's actually there. */
114 evtimer_del(&ms_thread.clock_event);
115 ms_check_sock_timeout();
116 }
117 else
118 {
119 ms_thread.initialized= true;
120 }
121
122 ms_reconn_thread_socks();
123
124 evtimer_set(&ms_thread.clock_event, ms_clock_handler, 0);
125 event_base_set(ms_thread.base, &ms_thread.clock_event);
126 evtimer_add(&ms_thread.clock_event, &t);
127 } /* ms_clock_handler */
128
129
/**
 * Bind the calling thread to one CPU if the system supports it
 * (HAVE_CPU_SET_T). On systems without that support this is a no-op that
 * reports success.
 *
 * @param cpu, cpu index
 *
 * @return if success, return 0, else return -1 (a warning is also written
 *         to stderr; the failure is not fatal)
 */
static int ms_set_thread_cpu_affinity(int cpu)
{
  int ret= 0;

#ifdef HAVE_CPU_SET_T
  cpu_set_t cpu_set;
  CPU_ZERO(&cpu_set);
  CPU_SET(cpu, &cpu_set);

  /* a pid of 0 applies the mask to the calling thread */
  if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1)
  {
    fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
    ret= -1;
  }
#else
  UNUSED_ARGUMENT(cpu);
#endif

  return ret;
} /* ms_set_thread_cpu_affinity */
157
158
159 /**
160 * Set up a thread's information.
161 *
162 * @param thread_ctx, pointer of the thread context structure
163 *
164 * @return if success, return 0, else return -1
165 */
166 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
167 {
168 ms_thread.thread_ctx= thread_ctx;
169 ms_thread.nactive_conn= thread_ctx->nconns;
170 ms_thread.initialized= false;
171 static volatile uint32_t cnt= 0;
172
173 gettimeofday(&ms_thread.startup_time, NULL);
174
175 ms_thread.base= event_init();
176 if (ms_thread.base == NULL)
177 {
178 if (atomic_add_32_nv(&cnt, 1) == 0)
179 {
180 fprintf(stderr, "Can't allocate event base.\n");
181 }
182
183 return -1;
184 }
185
186 ms_thread.conn=
187 (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
188 if (ms_thread.conn == NULL)
189 {
190 if (atomic_add_32_nv(&cnt, 1) == 0)
191 {
192 fprintf(
193 stderr,
194 "Can't allocate concurrency structure for thread descriptors.");
195 }
196
197 return -1;
198 }
199 memset(ms_thread.conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
200
201 for (int i= 0; i < thread_ctx->nconns; i++)
202 {
203 ms_thread.conn[i].conn_idx= i;
204 if (ms_setup_conn(&ms_thread.conn[i]) != 0)
205 {
206 /* only output this error once */
207 if (atomic_add_32_nv(&cnt, 1) == 0)
208 {
209 fprintf(stderr, "Initializing connection failed.\n");
210 }
211
212 return -1;
213 }
214 }
215
216 return 0;
217 } /* ms_setup_thread */
218
219
220 /**
221 * Worker thread: main event loop
222 *
223 * @param arg, the pointer of argument
224 *
225 * @return void*
226 */
227 static void *ms_worker_libevent(void *arg)
228 {
229 ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
230
231 /**
232 * If system has more than one cpu and supports set cpu
233 * affinity, try to bind each thread to a cpu core;
234 */
235 if (ms_setting.ncpu > 1)
236 {
237 ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
238 }
239
240 if (ms_setup_thread(thread_ctx) != 0)
241 {
242 exit(1);
243 }
244
245 /* each thread with a timer */
246 ms_clock_handler(0, 0, 0);
247
248 event_base_loop(ms_thread.base, 0);
249
250 return NULL;
251 } /* ms_worker_libevent */
252
253
/**
 * Creates a worker thread with default attributes. The thread handle is
 * not kept. The process exits with status 1 if the thread cannot be
 * created.
 *
 * @param func, the callback function
 * @param arg, the argument to pass to the callback function
 */
static void ms_create_worker(void *(*func)(void *), void *arg)
{
  pthread_t thread;
  pthread_attr_t attr;
  int ret;

  if ((ret= pthread_attr_init(&attr)) != 0)
  {
    fprintf(stderr, "Can't initialize thread attributes: %s.\n", strerror(ret));
    exit(1);
  }

  ret= pthread_create(&thread, &attr, func, arg);

  /* the attribute object is not needed once pthread_create() has returned */
  pthread_attr_destroy(&attr);

  if (ret != 0)
  {
    fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
    exit(1);
  }
} /* ms_create_worker */
274
275
276 /* initialize threads */
277 void ms_thread_init()
278 {
279 ms_thread_ctx=
280 (ms_thread_ctx_t *)malloc(
281 sizeof(ms_thread_ctx_t) * (size_t)ms_setting.nthreads);
282 if (ms_thread_ctx == NULL)
283 {
284 fprintf(stderr, "Can't allocate thread descriptors.");
285 exit(1);
286 }
287
288 for (int i= 0; i < ms_setting.nthreads; i++)
289 {
290 ms_thread_ctx[i].thd_idx= i;
291 ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
292
293 /**
294 * If only one server, all the connections in all threads
295 * connects the same server. For support multi-servers, simple
296 * distribute thread to server.
297 */
298 ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
299 ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
300 / ms_setting.nconns;
301 ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
302 / ms_setting.nconns;
303 }
304
305 /* Create threads after we've done all the epoll setup. */
306 for (int i= 0; i < ms_setting.nthreads; i++)
307 {
308 ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
309 }
310 } /* ms_thread_init */
311
312
313 /* cleanup some resource of threads when all the threads exit */
314 void ms_thread_cleanup()
315 {
316 if (ms_thread_ctx != NULL)
317 {
318 free(ms_thread_ctx);
319 }
320 } /* ms_thread_cleanup */