3 * Author: Mingqiang Zhuang
5 * Created on February 10, 2009
7 * (c) Copyright 2009, Schooner Information Technology, Inc.
8 * http://www.schoonerinfotech.com/
11 #include "ms_thread.h"
12 #include "ms_setting.h"
15 __thread ms_thread_t ms_thread
; /* each thread with a private ms_thread structure */
17 /* array of thread context structure, each thread has a thread context structure */
18 static ms_thread_ctx_t
*ms_thread_ctx
;
21 static void ms_set_current_time(void);
22 static void ms_check_sock_timeout(void);
23 static void ms_clock_handler(const int fd
, const short which
, void *arg
);
24 static int ms_set_thread_cpu_affinity(int cpu
);
25 static int ms_setup_thread(ms_thread_ctx_t
*thread_ctx
);
26 static void *ms_worker_libevent(void *arg
);
27 static void ms_create_worker(void *(*func
)(void *), void *arg
);
31 * time-sensitive callers can call it by hand with this,
32 * outside the normal ever-1-second timer
34 static void ms_set_current_time()
38 gettimeofday(&timer
, NULL
);
39 ms_thread
.curr_time
= (rel_time_t
)timer
.tv_sec
;
43 * checks whether UDP commands have timed out while waiting;
44 * called by the every-1-second timer
/*
 * Timeout sweep over the connections of the calling thread.
 *
 * For each of the thread_ctx->nconns entries in ms_thread.conn, the age of
 * the connection is computed as ms_thread.curr_time minus
 * start_time.tv_sec.  When that exceeds SOCK_WAIT_TIMEOUT, the shared
 * ms_stats counters are updated with __sync_fetch_and_add and the
 * connection is reset through ms_reset_conn(c, true).
 *
 * NOTE(review): the embedded line numbers in this listing skip (47-50,
 * 53-54, 60, 62-63), so the declarations of `c` and `time_diff` and any
 * additional guards around the statements below are not visible here --
 * confirm against the complete file before changing the logic.
 */
46 static void ms_check_sock_timeout(void)
51 for (int i
= 0; i
< ms_thread
.thread_ctx
->nconns
; i
++) {
52 c
= &ms_thread
.conn
[i
];
/* seconds elapsed since this connection's recorded start time */
55 time_diff
= (int)(ms_thread
.curr_time
- c
->start_time
.tv_sec
);
58 if (time_diff
> SOCK_WAIT_TIMEOUT
) {
59 /* calculate dropped packets count */
/* adds the shortfall (packets - recvpkt) to ms_stats.pkt_drop */
61 __sync_fetch_and_add(&ms_stats
.pkt_drop
, c
->packets
- c
->recvpkt
);
/* bump the udp_timeout counter by one, then reset the connection */
64 __sync_fetch_and_add(&ms_stats
.udp_timeout
, 1);
65 ms_reset_conn(c
, true);
71 /* if disconnect, the ever-1-second timer will call this function to reconnect */
72 static void ms_reconn_thread_socks(void)
74 for (int i
= 0; i
< ms_thread
.thread_ctx
->nconns
; i
++) {
75 ms_reconn_socks(&ms_thread
.conn
[i
]);
80 * the handler of the every-1-second timer
82 * @param fd, the descriptor of the socket
83 * @param which, event flags
84 * @param arg, argument
/*
 * Timer callback for the per-thread clock event.
 *
 * Each invocation refreshes the thread's cached time, runs the periodic
 * socket checks, and re-arms ms_thread.clock_event on the thread's event
 * base with the 1-second interval held in `t`.  It is also called directly
 * once (with zero arguments) to start the timer.
 *
 * `which` is explicitly marked unused.
 *
 * NOTE(review): the embedded line numbers skip 90, 92 and 100.  How `fd`
 * and `arg` are treated is not visible, and the assignment
 * `initialized = true` is presumably the else-branch of the check on
 * ms_thread.initialized -- confirm against the complete file.
 */
86 static void ms_clock_handler(const int fd
, const short which
, void *arg
)
/* interval until the next tick: exactly one second */
88 struct timeval t
= {.tv_sec
= 1, .tv_usec
= 0};
91 UNUSED_ARGUMENT(which
);
/* refresh ms_thread.curr_time before the timeout check reads it */
94 ms_set_current_time();
96 if (ms_thread
.initialized
) {
97 /* only delete the event if it's actually there. */
98 evtimer_del(&ms_thread
.clock_event
);
99 ms_check_sock_timeout();
101 ms_thread
.initialized
= true;
104 ms_reconn_thread_socks();
/* register this handler again on the thread's own base and schedule it after `t` */
106 evtimer_set(&ms_thread
.clock_event
, ms_clock_handler
, 0);
107 event_base_set(ms_thread
.base
, &ms_thread
.clock_event
);
108 evtimer_add(&ms_thread
.clock_event
, &t
);
112 * used to bind the thread to a CPU if the system supports it
114 * @param cpu, cpu index
116 * @return if success, return 0, else return -1
/*
 * Bind the caller to the CPU with index `cpu`.
 *
 * When HAVE_CPU_SET_T is defined, `cpu` is added to `cpu_set` and the set
 * is applied with sched_setaffinity(0, ...); a pid of 0 addresses the
 * calling thread.  A failing call is only reported as a warning on stderr.
 *
 * NOTE(review): the embedded line numbers skip 119-121, 123-124, 129-131
 * and 133 onwards, so the declaration and clearing of `cpu_set`, the
 * preprocessor branch that UNUSED_ARGUMENT(cpu) belongs to, and the
 * return statement are not visible in this listing -- confirm against the
 * complete file.
 */
118 static int ms_set_thread_cpu_affinity(int cpu
)
122 #ifdef HAVE_CPU_SET_T
125 CPU_SET(cpu
, &cpu_set
);
/* sched_setaffinity() returns -1 on error; the failure is not fatal here */
127 if (sched_setaffinity(0, sizeof(cpu_set_t
), &cpu_set
) == -1){
128 fprintf(stderr
, "WARNING: Could not set CPU Affinity, continuing...\n");
/* marks `cpu` as unused (presumably the branch without cpu_set_t support) */
132 UNUSED_ARGUMENT(cpu
);
139 * Set up a thread's information.
141 * @param thread_ctx, pointer of the thread context structure
143 * @return if success, return 0, else return -1
/*
 * Initialise the calling thread's private ms_thread state from thread_ctx.
 *
 * Steps visible here: store the context pointer, set nactive_conn to
 * thread_ctx->nconns, clear `initialized`, record startup_time, create the
 * event base with event_init(), allocate and zero an array of nconns
 * ms_conn_t, then assign conn_idx and call ms_setup_conn() for every
 * element.  Each failure message is printed only by the first thread that
 * hits it: `cnt` is incremented with __sync_fetch_and_add and only the
 * caller that observes 0 prints.
 *
 * NOTE(review): the embedded line numbers skip (146, 150-151, 158-162,
 * 167-170, 180 onwards), so the declaration of `cnt` and the return
 * statements of the success and failure paths are not visible in this
 * listing.
 * NOTE(review): malloc() followed by memset() could be a single calloc(),
 * which also checks the size multiplication for overflow.
 * NOTE(review): event_init() is deprecated in libevent 2.x in favour of
 * event_base_new() -- confirm which libevent versions must be supported.
 */
145 static int ms_setup_thread(ms_thread_ctx_t
*thread_ctx
)
147 ms_thread
.thread_ctx
= thread_ctx
;
148 ms_thread
.nactive_conn
= thread_ctx
->nconns
;
149 ms_thread
.initialized
= false;
/* remember when this thread was set up */
152 gettimeofday(&ms_thread
.startup_time
, NULL
);
154 ms_thread
.base
= event_init();
155 if (ms_thread
.base
== NULL
) {
156 if (__sync_fetch_and_add(&cnt
, 1) == 0) {
157 fprintf(stderr
, "Can't allocate event base.\n");
/* one ms_conn_t per connection handled by this thread */
163 ms_thread
.conn
= (ms_conn_t
*)malloc((size_t)thread_ctx
->nconns
* sizeof(ms_conn_t
));
164 if (ms_thread
.conn
== NULL
) {
165 if (__sync_fetch_and_add(&cnt
, 1) == 0) {
/* NOTE(review): unlike the sibling messages, this one has no trailing newline */
166 fprintf(stderr
, "Can't allocate concurrency structure for thread descriptors.");
171 memset(ms_thread
.conn
, 0, (size_t)thread_ctx
->nconns
* sizeof(ms_conn_t
));
173 for (int i
= 0; i
< thread_ctx
->nconns
; i
++) {
/* each connection records its own position in the array */
174 ms_thread
.conn
[i
].conn_idx
= i
;
175 if (ms_setup_conn(&ms_thread
.conn
[i
]) != 0) {
177 /* only output this error once */
178 if (__sync_fetch_and_add(&cnt
, 1) == 0) {
179 fprintf(stderr
, "Initializing connection failed.\n");
190 * Worker thread: main event loop
192 * @param arg, the pointer of argument
/*
 * Thread entry point of a worker.
 *
 * `arg` is the ms_thread_ctx_t of this worker.  When ms_setting.ncpu is
 * greater than 1 the thread is bound to CPU (thd_idx % ncpu); the result of
 * that call is ignored.  After ms_setup_thread() the clock handler is
 * called once directly to start the per-thread timer, and the thread then
 * runs event_base_loop() on its own event base.
 *
 * NOTE(review): the embedded line numbers skip 209-211 and 216 onwards, so
 * what happens when ms_setup_thread() fails and the value returned from
 * this function are not visible in this listing.
 */
196 static void *ms_worker_libevent(void *arg
)
198 ms_thread_ctx_t
*thread_ctx
= (ms_thread_ctx_t
*)arg
;
201 * If system has more than one cpu and supports set cpu
202 * affinity, try to bind each thread to a cpu core;
204 if (ms_setting
.ncpu
> 1) {
/* thread indexes wrap around the available CPUs */
205 ms_set_thread_cpu_affinity(thread_ctx
->thd_idx
% ms_setting
.ncpu
);
208 if (ms_setup_thread(thread_ctx
) != 0) {
212 /* each thread with a timer */
213 ms_clock_handler(0, 0, 0);
215 event_base_loop(ms_thread
.base
, 0);
221 * Creates a worker thread.
223 * @param func, the callback function
224 * @param arg, the argument to pass to the callback function
/*
 * Start one worker thread running func(arg).
 *
 * A thread attribute object is initialised with pthread_attr_init() and
 * passed unmodified to pthread_create().  pthread_create() returns an
 * error number (not -1/errno), which is why the message uses strerror(ret).
 *
 * NOTE(review): the embedded line numbers skip 227-231 and 236 onwards, so
 * the local declarations and whatever follows the error message are not
 * visible in this listing.
 * NOTE(review): no pthread_attr_destroy(&attr) appears in the visible
 * lines, and the pthread_t is not handed back to the caller -- confirm
 * whether the attribute object is released and how threads are joined.
 */
226 static void ms_create_worker(void *(*func
)(void *), void *arg
)
232 pthread_attr_init(&attr
);
234 if ((ret
= pthread_create(&thread
, &attr
, func
, arg
)) != 0) {
235 fprintf(stderr
, "Can't create thread: %s.\n", strerror(ret
));
240 /* initialize threads */
/*
 * Allocate one ms_thread_ctx_t per configured thread, fill each context
 * from ms_setting, and then start all worker threads.
 *
 * Per context: thd_idx is the thread index, nconns is
 * ms_setting.nconns / ms_setting.nthreads, srv_idx is
 * (index % ms_setting.srv_cnt), and tps_perconn / exec_num_perconn are the
 * global expected_tps / exec_num divided by ms_setting.nconns.  Workers
 * are created only after every context has been filled in.
 *
 * NOTE(review): the divisions are integer divisions, so any remainder of
 * nconns / nthreads is not assigned to a thread; they also presume
 * nthreads, nconns and srv_cnt are non-zero -- presumably validated when
 * the settings are parsed; confirm.
 * NOTE(review): the embedded line numbers skip 242, 246-248 and others, so
 * what follows the allocation-failure message is not visible in this
 * listing; that message also has no trailing newline.
 */
241 void ms_thread_init()
243 ms_thread_ctx
= (ms_thread_ctx_t
*)malloc(sizeof(ms_thread_ctx_t
) * (size_t)ms_setting
.nthreads
);
244 if (ms_thread_ctx
== NULL
) {
245 fprintf(stderr
, "Can't allocate thread descriptors.");
249 for (int i
= 0; i
< ms_setting
.nthreads
; i
++) {
250 ms_thread_ctx
[i
].thd_idx
= i
;
/* connections are split evenly across the threads */
251 ms_thread_ctx
[i
].nconns
= ms_setting
.nconns
/ ms_setting
.nthreads
;
254 * If only one server, all the connections in all threads
255 * connects the same server. For support multi-servers, simple
256 * distribute thread to server.
258 ms_thread_ctx
[i
].srv_idx
= i
% ms_setting
.srv_cnt
;
/* per-connection shares of the global throughput and execution targets */
259 ms_thread_ctx
[i
].tps_perconn
= ms_setting
.expected_tps
/ ms_setting
.nconns
;
260 ms_thread_ctx
[i
].exec_num_perconn
= ms_setting
.exec_num
/ ms_setting
.nconns
;
263 /* Create threads after we've done all the epoll setup. */
264 for (int i
= 0; i
< ms_setting
.nthreads
; i
++) {
/* each worker receives a pointer to its own context element */
265 ms_create_worker(ms_worker_libevent
, (void *)&ms_thread_ctx
[i
]);
269 /* cleanup some resource of threads when all the threads exit */
270 void ms_thread_cleanup()
272 if (ms_thread_ctx
!= NULL
) {