3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
12 #include "mem_config.h"
14 #if defined(HAVE_SYS_TIME_H)
15 # include <sys/time.h>
18 #if defined(HAVE_TIME_H)
22 #include "ms_thread.h"
23 #include "ms_setting.h"
24 #include "ms_atomic.h"
27 pthread_key_t ms_thread_key
;
29 /* array of thread context structure, each thread has a thread context structure */
30 static ms_thread_ctx_t
*ms_thread_ctx
;
33 static void ms_set_current_time(void);
34 static void ms_check_sock_timeout(void);
35 static void ms_clock_handler(const int fd
, const short which
, void *arg
);
36 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu
);
37 static int ms_setup_thread(ms_thread_ctx_t
*thread_ctx
);
38 static void *ms_worker_libevent(void *arg
);
39 static void ms_create_worker(void *(*func
)(void *), void *arg
);
43 * time-sensitive callers can call it by hand with this,
44 * outside the normal ever-1-second timer
46 static void ms_set_current_time()
49 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
51 gettimeofday(&timer
, NULL
);
52 ms_thread
->curr_time
= (rel_time_t
)timer
.tv_sec
;
53 } /* ms_set_current_time */
57 * used to check whether UDP of command are waiting timeout
58 * by the ever-1-second timer
60 static void ms_check_sock_timeout(void)
62 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
66 for (uint32_t i
= 0; i
< ms_thread
->thread_ctx
->nconns
; i
++)
68 c
= &ms_thread
->conn
[i
];
72 time_diff
= (int)(ms_thread
->curr_time
- (rel_time_t
)c
->start_time
.tv_sec
);
75 if (time_diff
> SOCK_WAIT_TIMEOUT
)
77 /* calculate dropped packets count */
80 atomic_add_size(&ms_stats
.pkt_drop
, c
->packets
- c
->recvpkt
);
83 atomic_add_size(&ms_stats
.udp_timeout
, 1);
84 ms_reset_conn(c
, true);
88 } /* ms_check_sock_timeout */
91 /* if disconnect, the ever-1-second timer will call this function to reconnect */
92 static void ms_reconn_thread_socks(void)
94 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
95 for (uint32_t i
= 0; i
< ms_thread
->thread_ctx
->nconns
; i
++)
97 ms_reconn_socks(&ms_thread
->conn
[i
]);
99 } /* ms_reconn_thread_socks */
103 * the handler of the ever-1-second timer
105 * @param fd, the descriptors of the socket
106 * @param which, event flags
107 * @param arg, argument
109 static void ms_clock_handler(const int fd
, const short which
, void *arg
)
111 ms_thread_t
*ms_thread
= pthread_getspecific(ms_thread_key
);
114 .tv_sec
= 1, .tv_usec
= 0
118 UNUSED_ARGUMENT(which
);
119 UNUSED_ARGUMENT(arg
);
121 ms_set_current_time();
123 if (ms_thread
->initialized
)
125 /* only delete the event if it's actually there. */
126 evtimer_del(&ms_thread
->clock_event
);
127 ms_check_sock_timeout();
131 ms_thread
->initialized
= true;
134 ms_reconn_thread_socks();
136 evtimer_set(&ms_thread
->clock_event
, ms_clock_handler
, 0);
137 event_base_set(ms_thread
->base
, &ms_thread
->clock_event
);
138 evtimer_add(&ms_thread
->clock_event
, &t
);
139 } /* ms_clock_handler */
143 * used to bind thread to CPU if the system supports
145 * @param cpu, cpu index
147 * @return if success, return EXIT_SUCCESS, else return -1
149 static uint32_t ms_set_thread_cpu_affinity(uint32_t cpu
)
153 #ifdef HAVE_CPU_SET_T
156 CPU_SET(cpu
, &cpu_set
);
158 if (sched_setaffinity(0, sizeof(cpu_set_t
), &cpu_set
) == -1)
160 fprintf(stderr
, "WARNING: Could not set CPU Affinity, continuing...\n");
164 UNUSED_ARGUMENT(cpu
);
168 } /* ms_set_thread_cpu_affinity */
172 * Set up a thread's information.
174 * @param thread_ctx, pointer of the thread context structure
176 * @return if success, return EXIT_SUCCESS, else return -1
178 static int ms_setup_thread(ms_thread_ctx_t
*thread_ctx
)
181 ms_thread_t
*ms_thread
= (ms_thread_t
*)calloc(sizeof(*ms_thread
), 1);
182 pthread_setspecific(ms_thread_key
, (void *)ms_thread
);
184 ms_thread
->thread_ctx
= thread_ctx
;
185 ms_thread
->nactive_conn
= thread_ctx
->nconns
;
186 ms_thread
->initialized
= false;
187 static volatile uint32_t cnt
= 0;
189 gettimeofday(&ms_thread
->startup_time
, NULL
);
191 ms_thread
->base
= event_init();
192 if (ms_thread
->base
== NULL
)
194 if (atomic_add_32_nv(&cnt
, 1) == 0)
196 fprintf(stderr
, "Can't allocate event base.\n");
203 (ms_conn_t
*)malloc((size_t)thread_ctx
->nconns
* sizeof(ms_conn_t
));
204 if (ms_thread
->conn
== NULL
)
206 if (atomic_add_32_nv(&cnt
, 1) == 0)
210 "Can't allocate concurrency structure for thread descriptors.");
215 memset(ms_thread
->conn
, 0, (size_t)thread_ctx
->nconns
* sizeof(ms_conn_t
));
217 for (uint32_t i
= 0; i
< thread_ctx
->nconns
; i
++)
219 ms_thread
->conn
[i
].conn_idx
= i
;
220 if (ms_setup_conn(&ms_thread
->conn
[i
]) != 0)
222 /* only output this error once */
223 if (atomic_add_32_nv(&cnt
, 1) == 0)
225 fprintf(stderr
, "Initializing connection failed.\n");
233 } /* ms_setup_thread */
237 * Worker thread: main event loop
239 * @param arg, the pointer of argument
243 static void *ms_worker_libevent(void *arg
)
245 ms_thread_t
*ms_thread
= NULL
;
246 ms_thread_ctx_t
*thread_ctx
= (ms_thread_ctx_t
*)arg
;
249 * If system has more than one cpu and supports set cpu
250 * affinity, try to bind each thread to a cpu core;
252 if (ms_setting
.ncpu
> 1)
254 ms_set_thread_cpu_affinity(thread_ctx
->thd_idx
% ms_setting
.ncpu
);
257 if (ms_setup_thread(thread_ctx
) != 0)
262 /* each thread with a timer */
263 ms_clock_handler(0, 0, 0);
265 pthread_mutex_lock(&ms_global
.init_lock
.lock
);
266 ms_global
.init_lock
.count
++;
267 pthread_cond_signal(&ms_global
.init_lock
.cond
);
268 pthread_mutex_unlock(&ms_global
.init_lock
.lock
);
270 ms_thread
= pthread_getspecific(ms_thread_key
);
271 event_base_loop(ms_thread
->base
, 0);
274 } /* ms_worker_libevent */
278 * Creates a worker thread.
280 * @param func, the callback function
281 * @param arg, the argument to pass to the callback function
283 static void ms_create_worker(void *(*func
)(void *), void *arg
)
289 pthread_attr_init(&attr
);
291 if ((ret
= pthread_create(&thread
, &attr
, func
, arg
)) != 0)
293 fprintf(stderr
, "Can't create thread: %s.\n", strerror(ret
));
296 } /* ms_create_worker */
299 /* initialize threads */
300 void ms_thread_init()
303 (ms_thread_ctx_t
*)malloc(
304 sizeof(ms_thread_ctx_t
) * (size_t)ms_setting
.nthreads
);
305 if (ms_thread_ctx
== NULL
)
307 fprintf(stderr
, "Can't allocate thread descriptors.");
311 for (uint32_t i
= 0; i
< ms_setting
.nthreads
; i
++)
313 ms_thread_ctx
[i
].thd_idx
= i
;
314 ms_thread_ctx
[i
].nconns
= ms_setting
.nconns
/ ms_setting
.nthreads
;
317 * If only one server, all the connections in all threads
318 * connects the same server. For support multi-servers, simple
319 * distribute thread to server.
321 ms_thread_ctx
[i
].srv_idx
= i
% ms_setting
.srv_cnt
;
322 ms_thread_ctx
[i
].tps_perconn
= ms_setting
.expected_tps
323 / (int)ms_setting
.nconns
;
324 ms_thread_ctx
[i
].exec_num_perconn
= ms_setting
.exec_num
328 if (pthread_key_create(&ms_thread_key
, NULL
))
330 fprintf(stderr
, "Can't create pthread keys. Major malfunction!\n");
333 /* Create threads after we've done all the epoll setup. */
334 for (uint32_t i
= 0; i
< ms_setting
.nthreads
; i
++)
336 ms_create_worker(ms_worker_libevent
, (void *)&ms_thread_ctx
[i
]);
338 } /* ms_thread_init */
341 /* cleanup some resource of threads when all the threads exit */
342 void ms_thread_cleanup()
344 if (ms_thread_ctx
!= NULL
)
348 pthread_key_delete(ms_thread_key
);
349 } /* ms_thread_cleanup */