Schooner memslap changes
[awesomized/libmemcached] / clients / ms_thread.c
1 /*
2 * File: ms_thread.c
3 * Author: Mingqiang Zhuang
4 *
5 * Created on February 10, 2009
6 *
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
9 *
10 */
11 #include "ms_thread.h"
12 #include "ms_setting.h"
13
14 /* global variable */
15 __thread ms_thread_t ms_thread; /* each thread with a private ms_thread structure */
16
17 /* array of thread context structure, each thread has a thread context structure */
18 static ms_thread_ctx_t *ms_thread_ctx;
19
20 /* functions */
21 static void ms_set_current_time(void);
22 static void ms_check_sock_timeout(void);
23 static void ms_clock_handler(const int fd, const short which, void *arg);
24 static int ms_set_thread_cpu_affinity(int cpu);
25 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
26 static void *ms_worker_libevent(void *arg);
27 static void ms_create_worker(void *(*func)(void *), void *arg);
28
29
30 /**
31 * time-sensitive callers can call it by hand with this,
32 * outside the normal ever-1-second timer
33 */
34 static void ms_set_current_time()
35 {
36 struct timeval timer;
37
38 gettimeofday(&timer, NULL);
39 ms_thread.curr_time = (rel_time_t)timer.tv_sec;
40 }
41
42 /**
43 * used to check whether UDP of command are waiting timeout
44 * by the ever-1-second timer
45 */
46 static void ms_check_sock_timeout(void)
47 {
48 ms_conn_t *c = NULL;
49 int time_diff = 0;
50
51 for (int i = 0; i < ms_thread.thread_ctx->nconns; i++) {
52 c = &ms_thread.conn[i];
53
54 if (c->udp) {
55 time_diff = (int)(ms_thread.curr_time - c->start_time.tv_sec);
56
57 /* wait time out */
58 if (time_diff > SOCK_WAIT_TIMEOUT) {
59 /* calculate dropped packets count */
60 if (c->recvpkt > 0) {
61 __sync_fetch_and_add(&ms_stats.pkt_drop, c->packets - c->recvpkt);
62 }
63
64 __sync_fetch_and_add(&ms_stats.udp_timeout, 1);
65 ms_reset_conn(c, true);
66 }
67 }
68 }
69 }
70
71 /* if disconnect, the ever-1-second timer will call this function to reconnect */
72 static void ms_reconn_thread_socks(void)
73 {
74 for (int i = 0; i < ms_thread.thread_ctx->nconns; i++) {
75 ms_reconn_socks(&ms_thread.conn[i]);
76 }
77 }
78
79 /**
80 * the handler of the ever-1-second timer
81 *
82 * @param fd, the descriptors of the socket
83 * @param which, event flags
84 * @param arg, argument
85 */
86 static void ms_clock_handler(const int fd, const short which, void *arg)
87 {
88 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
89
90 UNUSED_ARGUMENT(fd);
91 UNUSED_ARGUMENT(which);
92 UNUSED_ARGUMENT(arg);
93
94 ms_set_current_time();
95
96 if (ms_thread.initialized) {
97 /* only delete the event if it's actually there. */
98 evtimer_del(&ms_thread.clock_event);
99 ms_check_sock_timeout();
100 } else {
101 ms_thread.initialized = true;
102 }
103
104 ms_reconn_thread_socks();
105
106 evtimer_set(&ms_thread.clock_event, ms_clock_handler, 0);
107 event_base_set(ms_thread.base, &ms_thread.clock_event);
108 evtimer_add(&ms_thread.clock_event, &t);
109 }
110
111 /**
112 * used to bind thread to CPU if the system supports
113 *
114 * @param cpu, cpu index
115 *
116 * @return if success, return 0, else return -1
117 */
118 static int ms_set_thread_cpu_affinity(int cpu)
119 {
120 int ret = 0;
121
122 #ifdef HAVE_CPU_SET_T
123 cpu_set_t cpu_set;
124 CPU_ZERO(&cpu_set);
125 CPU_SET(cpu, &cpu_set);
126
127 if (sched_setaffinity(0, sizeof(cpu_set_t), &cpu_set) == -1){
128 fprintf(stderr, "WARNING: Could not set CPU Affinity, continuing...\n");
129 ret = 1;
130 }
131 #else
132 UNUSED_ARGUMENT(cpu);
133 #endif
134
135 return ret;
136 }
137
138 /**
139 * Set up a thread's information.
140 *
141 * @param thread_ctx, pointer of the thread context structure
142 *
143 * @return if success, return 0, else return -1
144 */
145 static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
146 {
147 ms_thread.thread_ctx = thread_ctx;
148 ms_thread.nactive_conn = thread_ctx->nconns;
149 ms_thread.initialized = false;
150 static int cnt = 0;
151
152 gettimeofday(&ms_thread.startup_time, NULL);
153
154 ms_thread.base = event_init();
155 if (ms_thread.base == NULL) {
156 if (__sync_fetch_and_add(&cnt, 1) == 0) {
157 fprintf(stderr, "Can't allocate event base.\n");
158 }
159
160 return -1;
161 }
162
163 ms_thread.conn = (ms_conn_t *)malloc((size_t)thread_ctx->nconns * sizeof(ms_conn_t));
164 if (ms_thread.conn == NULL) {
165 if (__sync_fetch_and_add(&cnt, 1) == 0) {
166 fprintf(stderr, "Can't allocate concurrency structure for thread descriptors.");
167 }
168
169 return -1;
170 }
171 memset(ms_thread.conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
172
173 for (int i = 0; i < thread_ctx->nconns; i++) {
174 ms_thread.conn[i].conn_idx = i;
175 if (ms_setup_conn(&ms_thread.conn[i]) != 0) {
176
177 /* only output this error once */
178 if (__sync_fetch_and_add(&cnt, 1) == 0) {
179 fprintf(stderr, "Initializing connection failed.\n");
180 }
181
182 return -1;
183 }
184 }
185
186 return 0;
187 }
188
189 /**
190 * Worker thread: main event loop
191 *
192 * @param arg, the pointer of argument
193 *
194 * @return void*
195 */
196 static void *ms_worker_libevent(void *arg)
197 {
198 ms_thread_ctx_t *thread_ctx = (ms_thread_ctx_t *)arg;
199
200 /**
201 * If system has more than one cpu and supports set cpu
202 * affinity, try to bind each thread to a cpu core;
203 */
204 if (ms_setting.ncpu > 1) {
205 ms_set_thread_cpu_affinity(thread_ctx->thd_idx % ms_setting.ncpu);
206 }
207
208 if (ms_setup_thread(thread_ctx) != 0) {
209 exit(1);
210 }
211
212 /* each thread with a timer */
213 ms_clock_handler(0, 0, 0);
214
215 event_base_loop(ms_thread.base, 0);
216
217 return NULL;
218 }
219
220 /**
221 * Creates a worker thread.
222 *
223 * @param func, the callback function
224 * @param arg, the argument to pass to the callback function
225 */
226 static void ms_create_worker(void *(*func)(void *), void *arg)
227 {
228 pthread_t thread;
229 pthread_attr_t attr;
230 int ret;
231
232 pthread_attr_init(&attr);
233
234 if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
235 fprintf(stderr, "Can't create thread: %s.\n", strerror(ret));
236 exit(1);
237 }
238 }
239
240 /* initialize threads */
241 void ms_thread_init()
242 {
243 ms_thread_ctx = (ms_thread_ctx_t *)malloc(sizeof(ms_thread_ctx_t) * (size_t)ms_setting.nthreads);
244 if (ms_thread_ctx == NULL) {
245 fprintf(stderr, "Can't allocate thread descriptors.");
246 exit(1);
247 }
248
249 for (int i = 0; i < ms_setting.nthreads; i++) {
250 ms_thread_ctx[i].thd_idx = i;
251 ms_thread_ctx[i].nconns = ms_setting.nconns / ms_setting.nthreads;
252
253 /**
254 * If only one server, all the connections in all threads
255 * connects the same server. For support multi-servers, simple
256 * distribute thread to server.
257 */
258 ms_thread_ctx[i].srv_idx = i % ms_setting.srv_cnt;
259 ms_thread_ctx[i].tps_perconn = ms_setting.expected_tps / ms_setting.nconns;
260 ms_thread_ctx[i].exec_num_perconn = ms_setting.exec_num / ms_setting.nconns;
261 }
262
263 /* Create threads after we've done all the epoll setup. */
264 for (int i = 0; i < ms_setting.nthreads; i++) {
265 ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
266 }
267 }
268
269 /* cleanup some resource of threads when all the threads exit */
270 void ms_thread_cleanup()
271 {
272 if (ms_thread_ctx != NULL) {
273 free(ms_thread_ctx);
274 }
275 }