projects
/
m6w6
/
libmemcached
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
|
github
raw
|
inline
| side by side
Add ability to have version "just requested" when you initially connect.
[m6w6/libmemcached]
/
clients
/
ms_thread.c
diff --git
a/clients/ms_thread.c
b/clients/ms_thread.c
index af856366c345ddea87ddbc9f2fd0a7693155d92c..1efe5cfbb6f62df9dc8975033065d6ae933db002 100644
(file)
--- a/
clients/ms_thread.c
+++ b/
clients/ms_thread.c
@@
-11,6
+11,16
@@
#include "config.h"
#include "config.h"
+#if TIME_WITH_SYS_TIME
+# include <sys/time.h>
+# include <time.h>
+#else
+# if HAVE_SYS_TIME_H
+# include <sys/time.h>
+# else
+# include <time.h>
+# endif
+#endif
#include "ms_thread.h"
#include "ms_setting.h"
#include "ms_atomic.h"
#include "ms_thread.h"
#include "ms_setting.h"
#include "ms_atomic.h"
@@
-25,7
+35,7
@@
static ms_thread_ctx_t *ms_thread_ctx;
static void ms_set_current_time(void);
static void ms_check_sock_timeout(void);
static void ms_clock_handler(const int fd, const short which, void *arg);
static void ms_set_current_time(void);
static void ms_check_sock_timeout(void);
static void ms_clock_handler(const int fd, const short which, void *arg);
-static
int ms_set_thread_cpu_affinity(in
t cpu);
+static
uint32_t ms_set_thread_cpu_affinity(uint32_
t cpu);
static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
static void *ms_worker_libevent(void *arg);
static void ms_create_worker(void *(*func)(void *), void *arg);
static int ms_setup_thread(ms_thread_ctx_t *thread_ctx);
static void *ms_worker_libevent(void *arg);
static void ms_create_worker(void *(*func)(void *), void *arg);
@@
-55,13
+65,13
@@
static void ms_check_sock_timeout(void)
ms_conn_t *c= NULL;
int time_diff= 0;
ms_conn_t *c= NULL;
int time_diff= 0;
- for (
in
t i= 0; i < ms_thread->thread_ctx->nconns; i++)
+ for (
uint32_
t i= 0; i < ms_thread->thread_ctx->nconns; i++)
{
c= &ms_thread->conn[i];
if (c->udp)
{
{
c= &ms_thread->conn[i];
if (c->udp)
{
- time_diff= (int)(ms_thread->curr_time - c->start_time.tv_sec);
+ time_diff= (int)(ms_thread->curr_time -
(rel_time_t)
c->start_time.tv_sec);
/* wait time out */
if (time_diff > SOCK_WAIT_TIMEOUT)
/* wait time out */
if (time_diff > SOCK_WAIT_TIMEOUT)
@@
-69,10
+79,10
@@
static void ms_check_sock_timeout(void)
/* calculate dropped packets count */
if (c->recvpkt > 0)
{
/* calculate dropped packets count */
if (c->recvpkt > 0)
{
- atomic_add_
64
(&ms_stats.pkt_drop, c->packets - c->recvpkt);
+ atomic_add_
size
(&ms_stats.pkt_drop, c->packets - c->recvpkt);
}
}
- atomic_add_
64
(&ms_stats.udp_timeout, 1);
+ atomic_add_
size
(&ms_stats.udp_timeout, 1);
ms_reset_conn(c, true);
}
}
ms_reset_conn(c, true);
}
}
@@
-84,7
+94,7
@@
static void ms_check_sock_timeout(void)
static void ms_reconn_thread_socks(void)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
static void ms_reconn_thread_socks(void)
{
ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
- for (
in
t i= 0; i < ms_thread->thread_ctx->nconns; i++)
+ for (
uint32_
t i= 0; i < ms_thread->thread_ctx->nconns; i++)
{
ms_reconn_socks(&ms_thread->conn[i]);
}
{
ms_reconn_socks(&ms_thread->conn[i]);
}
@@
-136,11
+146,11
@@
static void ms_clock_handler(const int fd, const short which, void *arg)
*
* @param cpu, cpu index
*
*
* @param cpu, cpu index
*
- * @return if success, return
0
, else return -1
+ * @return if success, return
EXIT_SUCCESS
, else return -1
*/
*/
-static
int ms_set_thread_cpu_affinity(in
t cpu)
+static
uint32_t ms_set_thread_cpu_affinity(uint32_
t cpu)
{
{
-
in
t ret= 0;
+
uint32_
t ret= 0;
#ifdef HAVE_CPU_SET_T
cpu_set_t cpu_set;
#ifdef HAVE_CPU_SET_T
cpu_set_t cpu_set;
@@
-165,11
+175,11
@@
static int ms_set_thread_cpu_affinity(int cpu)
*
* @param thread_ctx, pointer of the thread context structure
*
*
* @param thread_ctx, pointer of the thread context structure
*
- * @return if success, return
0
, else return -1
+ * @return if success, return
EXIT_SUCCESS
, else return -1
*/
static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
{
*/
static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
{
-
+
ms_thread_t *ms_thread= (ms_thread_t *)calloc(sizeof(*ms_thread), 1);
pthread_setspecific(ms_thread_key, (void *)ms_thread);
ms_thread_t *ms_thread= (ms_thread_t *)calloc(sizeof(*ms_thread), 1);
pthread_setspecific(ms_thread_key, (void *)ms_thread);
@@
-206,7
+216,7
@@
static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
}
memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
}
memset(ms_thread->conn, 0, (size_t)thread_ctx->nconns * sizeof(ms_conn_t));
- for (
in
t i= 0; i < thread_ctx->nconns; i++)
+ for (
uint32_
t i= 0; i < thread_ctx->nconns; i++)
{
ms_thread->conn[i].conn_idx= i;
if (ms_setup_conn(&ms_thread->conn[i]) != 0)
{
ms_thread->conn[i].conn_idx= i;
if (ms_setup_conn(&ms_thread->conn[i]) != 0)
@@
-221,7
+231,7
@@
static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
}
}
}
}
- return
0
;
+ return
EXIT_SUCCESS
;
} /* ms_setup_thread */
} /* ms_setup_thread */
@@
-234,7
+244,7
@@
static int ms_setup_thread(ms_thread_ctx_t *thread_ctx)
*/
static void *ms_worker_libevent(void *arg)
{
*/
static void *ms_worker_libevent(void *arg)
{
- ms_thread_t *ms_thread=
pthread_getspecific(ms_thread_key)
;
+ ms_thread_t *ms_thread=
NULL
;
ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
/**
ms_thread_ctx_t *thread_ctx= (ms_thread_ctx_t *)arg;
/**
@@
-254,6
+264,12
@@
static void *ms_worker_libevent(void *arg)
/* each thread with a timer */
ms_clock_handler(0, 0, 0);
/* each thread with a timer */
ms_clock_handler(0, 0, 0);
+ pthread_mutex_lock(&ms_global.init_lock.lock);
+ ms_global.init_lock.count++;
+ pthread_cond_signal(&ms_global.init_lock.cond);
+ pthread_mutex_unlock(&ms_global.init_lock.lock);
+
+ ms_thread= pthread_getspecific(ms_thread_key);
event_base_loop(ms_thread->base, 0);
return NULL;
event_base_loop(ms_thread->base, 0);
return NULL;
@@
-294,7
+310,7
@@
void ms_thread_init()
exit(1);
}
exit(1);
}
- for (
in
t i= 0; i < ms_setting.nthreads; i++)
+ for (
uint32_
t i= 0; i < ms_setting.nthreads; i++)
{
ms_thread_ctx[i].thd_idx= i;
ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
{
ms_thread_ctx[i].thd_idx= i;
ms_thread_ctx[i].nconns= ms_setting.nconns / ms_setting.nthreads;
@@
-306,7
+322,7
@@
void ms_thread_init()
*/
ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
*/
ms_thread_ctx[i].srv_idx= i % ms_setting.srv_cnt;
ms_thread_ctx[i].tps_perconn= ms_setting.expected_tps
- / ms_setting.nconns;
+ /
(int)
ms_setting.nconns;
ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
/ ms_setting.nconns;
}
ms_thread_ctx[i].exec_num_perconn= ms_setting.exec_num
/ ms_setting.nconns;
}
@@
-317,7
+333,7
@@
void ms_thread_init()
exit(1);
}
/* Create threads after we've done all the epoll setup. */
exit(1);
}
/* Create threads after we've done all the epoll setup. */
- for (
in
t i= 0; i < ms_setting.nthreads; i++)
+ for (
uint32_
t i= 0; i < ms_setting.nthreads; i++)
{
ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
}
{
ms_create_worker(ms_worker_libevent, (void *)&ms_thread_ctx[i]);
}