projects
/
m6w6
/
libmemcached
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
|
github
raw
|
inline
| side by side
Casting fixes for linux.
[m6w6/libmemcached]
/
clients
/
ms_conn.c
diff --git
a/clients/ms_conn.c
b/clients/ms_conn.c
index f4c7347d531892b0cd462c483aaba3ec82856316..59b0875d73db61a59d881626fdfc376740793c20 100644
(file)
--- a/
clients/ms_conn.c
+++ b/
clients/ms_conn.c
@@
-8,7
+8,11
@@
* http://www.schoonerinfotech.com/
*
*/
* http://www.schoonerinfotech.com/
*
*/
+
+#include "config.h"
+
#include <stdio.h>
#include <stdio.h>
+#include <inttypes.h>
#include <limits.h>
#include <sys/uio.h>
#include <event.h>
#include <limits.h>
#include <sys/uio.h>
#include <event.h>
@@
-17,6
+21,7
@@
#include <arpa/inet.h>
#include "ms_setting.h"
#include "ms_thread.h"
#include <arpa/inet.h>
#include "ms_setting.h"
#include "ms_thread.h"
+#include "ms_atomic.h"
/* for network write */
#define TRANSMIT_COMPLETE 0
/* for network write */
#define TRANSMIT_COMPLETE 0
@@
-36,12
+41,12
@@
static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
/* global increasing counter, generating request id for UDP */
static uint64_t key_prefix_seq= KEY_PREFIX_BASE;
/* global increasing counter, generating request id for UDP */
-static
in
t udp_request_id= 0;
+static
volatile uint32_
t udp_request_id= 0;
-extern
__thread ms_thread_t ms_thread
;
+extern
pthread_key_t ms_thread_key
;
/* generate upd request id */
/* generate upd request id */
-static
in
t ms_get_udp_request_id(void);
+static
uint32_
t ms_get_udp_request_id(void);
/* connect initialize */
/* connect initialize */
@@
-162,9
+167,9
@@
uint64_t ms_get_key_prefix(void)
*
* @return an unique UDP request id
*/
*
* @return an unique UDP request id
*/
-static
in
t ms_get_udp_request_id(void)
+static
uint32_
t ms_get_udp_request_id(void)
{
{
- return
__sync_fetch_and_add
(&udp_request_id, 1);
+ return
atomic_add_32_nv
(&udp_request_id, 1);
}
}
@@
-354,7
+359,7
@@
static int ms_conn_init(ms_conn_t *c,
if (! (ms_setting.facebook_test && is_udp))
{
if (! (ms_setting.facebook_test && is_udp))
{
-
__sync_fetch_and_add
(&ms_stats.active_conns, 1);
+
atomic_add_32
(&ms_stats.active_conns, 1);
}
return 0;
}
return 0;
@@
-394,11
+399,12
@@
static void ms_warmup_num_init(ms_conn_t *c)
*/
static int ms_item_win_init(ms_conn_t *c)
{
*/
static int ms_item_win_init(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
int exp_cnt= 0;
c->win_size= (int)ms_setting.win_size;
c->set_cursor= 0;
int exp_cnt= 0;
c->win_size= (int)ms_setting.win_size;
c->set_cursor= 0;
- c->exec_num= ms_thread
.
thread_ctx->exec_num_perconn;
+ c->exec_num= ms_thread
->
thread_ctx->exec_num_perconn;
c->remain_exec_num= c->exec_num;
c->item_win= (ms_task_item_t *)malloc(
c->remain_exec_num= c->exec_num;
c->item_win= (ms_task_item_t *)malloc(
@@
-448,6
+454,7
@@
static int ms_item_win_init(ms_conn_t *c)
*/
static int ms_conn_sock_init(ms_conn_t *c)
{
*/
static int ms_conn_sock_init(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
int i;
int ret_sfd;
int srv_idx= 0;
int i;
int ret_sfd;
int srv_idx= 0;
@@
-466,7
+473,7
@@
static int ms_conn_sock_init(ms_conn_t *c)
else
{
/* all the connections in a thread connects the same server */
else
{
/* all the connections in a thread connects the same server */
- srv_idx= ms_thread
.
thread_ctx->srv_idx;
+ srv_idx= ms_thread
->
thread_ctx->srv_idx;
}
if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
}
if (ms_network_connect(c, ms_setting.servers[srv_idx].srv_host_name,
@@
-541,6
+548,7
@@
static int ms_conn_sock_init(ms_conn_t *c)
*/
static int ms_conn_event_init(ms_conn_t *c)
{
*/
static int ms_conn_event_init(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
/* default event timeout 10 seconds */
struct timeval t=
{
/* default event timeout 10 seconds */
struct timeval t=
{
@@
-549,7
+557,7
@@
static int ms_conn_event_init(ms_conn_t *c)
short event_flags= EV_WRITE | EV_PERSIST;
event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
short event_flags= EV_WRITE | EV_PERSIST;
event_set(&c->event, c->sfd, event_flags, ms_event_handler, (void *)c);
- event_base_set(ms_thread
.
base, &c->event);
+ event_base_set(ms_thread
->
base, &c->event);
c->ev_flags= event_flags;
if (c->total_sfds == 1)
c->ev_flags= event_flags;
if (c->total_sfds == 1)
@@
-612,6
+620,7
@@
int ms_setup_conn(ms_conn_t *c)
*/
void ms_conn_free(ms_conn_t *c)
{
*/
void ms_conn_free(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
if (c != NULL)
{
if (c->hdrbuf != NULL)
if (c != NULL)
{
if (c->hdrbuf != NULL)
@@
-635,9
+644,9
@@
void ms_conn_free(ms_conn_t *c)
if (c->tcpsfd != NULL)
free(c->tcpsfd);
if (c->tcpsfd != NULL)
free(c->tcpsfd);
- if (--ms_thread
.
nactive_conn == 0)
+ if (--ms_thread
->
nactive_conn == 0)
{
{
- free(ms_thread
.
conn);
+ free(ms_thread
->
conn);
}
}
} /* ms_conn_free */
}
}
} /* ms_conn_free */
@@
-650,6
+659,7
@@
void ms_conn_free(ms_conn_t *c)
*/
static void ms_conn_close(ms_conn_t *c)
{
*/
static void ms_conn_close(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
assert(c != NULL);
/* delete the event, the socket and the connection */
assert(c != NULL);
/* delete the event, the socket and the connection */
@@
-669,7
+679,7
@@
static void ms_conn_close(ms_conn_t *c)
close(c->udpsfd);
}
close(c->udpsfd);
}
-
__sync_fetch_and_sub(&ms_stats.active_conns, 1
);
+
atomic_dec_32(&ms_stats.active_conns
);
ms_conn_free(c);
ms_conn_free(c);
@@
-681,7
+691,7
@@
static void ms_conn_close(ms_conn_t *c)
pthread_mutex_unlock(&ms_global.run_lock.lock);
}
pthread_mutex_unlock(&ms_global.run_lock.lock);
}
- if (ms_thread
.
nactive_conn == 0)
+ if (ms_thread
->
nactive_conn == 0)
{
pthread_exit(NULL);
}
{
pthread_exit(NULL);
}
@@
-880,8
+890,9
@@
static int ms_network_connect(ms_conn_t *c,
*/
static int ms_reconn(ms_conn_t *c)
{
*/
static int ms_reconn(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
int srv_idx= 0;
int srv_idx= 0;
- int srv_conn_cnt= 0;
+ int
32_t
srv_conn_cnt= 0;
if (ms_setting.rep_write_srv > 0)
{
if (ms_setting.rep_write_srv > 0)
{
@@
-890,7
+901,7
@@
static int ms_reconn(ms_conn_t *c)
}
else
{
}
else
{
- srv_idx= ms_thread
.
thread_ctx->srv_idx;
+ srv_idx= ms_thread
->
thread_ctx->srv_idx;
srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
}
srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
}
@@
-898,8
+909,8
@@
static int ms_reconn(ms_conn_t *c)
close(c->sfd);
c->tcpsfd[c->cur_idx]= 0;
close(c->sfd);
c->tcpsfd[c->cur_idx]= 0;
- if (
__sync_fetch_and_add
(&ms_setting.servers[srv_idx].disconn_cnt, 1)
- % srv_conn_cnt == 0)
+ if (
atomic_add_32_nv
(&ms_setting.servers[srv_idx].disconn_cnt, 1)
+ %
(uint32_t)
srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
fprintf(stderr, "Server %s:%d disconnect\n",
{
gettimeofday(&ms_setting.servers[srv_idx].disconn_time, NULL);
fprintf(stderr, "Server %s:%d disconnect\n",
@@
-934,8
+945,8
@@
static int ms_reconn(ms_conn_t *c)
ms_setting.udp, &c->sfd) == 0)
{
c->tcpsfd[c->cur_idx]= c->sfd;
ms_setting.udp, &c->sfd) == 0)
{
c->tcpsfd[c->cur_idx]= c->sfd;
- if (
__sync_fetch_and_add
(&ms_setting.servers[srv_idx].reconn_cnt, 1)
- % srv_conn_cnt == 0)
+ if (
atomic_add_32_nv
(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+ %
(uint32_t)
srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
int reconn_time=
{
gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
int reconn_time=
@@
-980,6
+991,7
@@
static int ms_reconn(ms_conn_t *c)
*/
int ms_reconn_socks(ms_conn_t *c)
{
*/
int ms_reconn_socks(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
int srv_idx= 0;
int ret_sfd= 0;
int srv_conn_cnt= 0;
int srv_idx= 0;
int ret_sfd= 0;
int srv_conn_cnt= 0;
@@
-1017,7
+1029,7
@@
int ms_reconn_socks(ms_conn_t *c)
}
else
{
}
else
{
- srv_idx= ms_thread
.
thread_ctx->srv_idx;
+ srv_idx= ms_thread
->
thread_ctx->srv_idx;
srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
}
srv_conn_cnt= ms_setting.nconns / ms_setting.srv_cnt;
}
@@
-1028,8
+1040,8
@@
int ms_reconn_socks(ms_conn_t *c)
c->tcpsfd[i]= ret_sfd;
c->alive_sfds++;
c->tcpsfd[i]= ret_sfd;
c->alive_sfds++;
- if (
__sync_fetch_and_add
(&ms_setting.servers[srv_idx].reconn_cnt, 1)
- % srv_conn_cnt == 0)
+ if (
atomic_add_32_nv
(&ms_setting.servers[srv_idx].reconn_cnt, 1)
+ %
(uint32_t)
srv_conn_cnt == 0)
{
gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
int reconn_time=
{
gettimeofday(&ms_setting.servers[srv_idx].reconn_time, NULL);
int reconn_time=
@@
-1565,7
+1577,7
@@
static int ms_udp_read(ms_conn_t *c, char *buf, int len)
if (res > 0)
{
if (res > 0)
{
-
__sync_fetch_and_add
(&ms_stats.bytes_read, res);
+
atomic_add_64
(&ms_stats.bytes_read, res);
c->rudpbytes+= res;
rbytes+= res;
if (res == avail)
c->rudpbytes+= res;
rbytes+= res;
if (res == avail)
@@
-1599,7
+1611,7
@@
static int ms_udp_read(ms_conn_t *c, char *buf, int len)
if (copybytes == -1)
{
if (copybytes == -1)
{
-
__sync_fetch_and_add
(&ms_stats.pkt_disorder, 1);
+
atomic_add_64
(&ms_stats.pkt_disorder, 1);
}
return copybytes;
}
return copybytes;
@@
-1677,7
+1689,7
@@
static int ms_try_read_network(ms_conn_t *c)
{
if (! c->udp)
{
{
if (! c->udp)
{
-
__sync_fetch_and_add
(&ms_stats.bytes_read, res);
+
atomic_add_64
(&ms_stats.bytes_read, res);
}
gotdata= 1;
c->rbytes+= res;
}
gotdata= 1;
c->rbytes+= res;
@@
-1741,7
+1753,7
@@
static void ms_verify_value(ms_conn_t *c,
if (curr_time.tv_sec - c->curr_task.item->client_time
> c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
{
if (curr_time.tv_sec - c->curr_task.item->client_time
> c->curr_task.item->exp_time + EXPIRE_TIME_ERROR)
{
-
__sync_fetch_and_add
(&ms_stats.exp_get, 1);
+
atomic_add_64
(&ms_stats.exp_get, 1);
if (ms_setting.verbose)
{
if (ms_setting.verbose)
{
@@
-1755,7
+1767,7
@@
static void ms_verify_value(ms_conn_t *c,
"\n<%d expire time verification failed, "
"object expired but get it now\n"
"\tkey len: %d\n"
"\n<%d expire time verification failed, "
"object expired but get it now\n"
"\tkey len: %d\n"
- "\tkey: %
lx
%.*s\n"
+ "\tkey: %
" PRIx64 "
%.*s\n"
"\tset time: %s current time: %s "
"diff time: %d expire time: %d\n"
"\texpected data: \n"
"\tset time: %s current time: %s "
"diff time: %d expire time: %d\n"
"\texpected data: \n"
@@
-1782,14
+1794,14
@@
static void ms_verify_value(ms_conn_t *c,
if ((c->curr_task.item->value_size != vlen)
|| (memcmp(orignval, value, (size_t)vlen) != 0))
{
if ((c->curr_task.item->value_size != vlen)
|| (memcmp(orignval, value, (size_t)vlen) != 0))
{
-
__sync_fetch_and_add
(&ms_stats.vef_failed, 1);
+
atomic_add_64
(&ms_stats.vef_failed, 1);
if (ms_setting.verbose)
{
fprintf(stderr,
"\n<%d data verification failed\n"
"\tkey len: %d\n"
if (ms_setting.verbose)
{
fprintf(stderr,
"\n<%d data verification failed\n"
"\tkey len: %d\n"
- "\tkey: %
lx
%.*s\n"
+ "\tkey: %
" PRIx64"
%.*s\n"
"\texpected data len: %d\n"
"\texpected data: %.*s\n"
"\treceived data len: %d\n"
"\texpected data len: %d\n"
"\texpected data: %.*s\n"
"\treceived data len: %d\n"
@@
-2182,7
+2194,7
@@
static int ms_build_udp_headers(ms_conn_t *c)
hdr= c->hdrbuf;
for (i= 0; i < c->msgused; i++)
{
hdr= c->hdrbuf;
for (i= 0; i < c->msgused; i++)
{
- c->msglist[i].msg_iov[0].iov_base= hdr;
+ c->msglist[i].msg_iov[0].iov_base=
(void *)
hdr;
c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
*hdr++= (unsigned char)(c->request_id / 256);
*hdr++= (unsigned char)(c->request_id % 256);
c->msglist[i].msg_iov[0].iov_len= UDP_HEADER_SIZE;
*hdr++= (unsigned char)(c->request_id / 256);
*hdr++= (unsigned char)(c->request_id % 256);
@@
-2230,7
+2242,7
@@
static int ms_transmit(ms_conn_t *c)
res= sendmsg(c->sfd, m, 0);
if (res > 0)
{
res= sendmsg(c->sfd, m, 0);
if (res > 0)
{
-
__sync_fetch_and_add
(&ms_stats.bytes_written, res);
+
atomic_add_64
(&ms_stats.bytes_written, res);
/* We've written some of the data. Remove the completed
* iovec entries from the list of pending writes. */
/* We've written some of the data. Remove the completed
* iovec entries from the list of pending writes. */
@@
-2245,7
+2257,7
@@
static int ms_transmit(ms_conn_t *c)
* adjust it so the next write will do the rest. */
if (res > 0)
{
* adjust it so the next write will do the rest. */
if (res > 0)
{
- m->msg_iov->iov_base= (
unsigned char *)m->msg_iov->iov_base + res
;
+ m->msg_iov->iov_base= (
void *)((unsigned char *)m->msg_iov->iov_base + res)
;
m->msg_iov->iov_len-= (uint64_t)res;
}
return TRANSMIT_INCOMPLETE;
m->msg_iov->iov_len-= (uint64_t)res;
}
return TRANSMIT_INCOMPLETE;
@@
-2448,6
+2460,7
@@
static bool ms_update_event(ms_conn_t *c, const int new_flags)
*/
static bool ms_need_yield(ms_conn_t *c)
{
*/
static bool ms_need_yield(ms_conn_t *c)
{
+ ms_thread_t *ms_thread= pthread_getspecific(ms_thread_key);
int64_t tps= 0;
int64_t time_diff= 0;
struct timeval curr_time;
int64_t tps= 0;
int64_t time_diff= 0;
struct timeval curr_time;
@@
-2456,13
+2469,13
@@
static bool ms_need_yield(ms_conn_t *c)
if (ms_setting.expected_tps > 0)
{
gettimeofday(&curr_time, NULL);
if (ms_setting.expected_tps > 0)
{
gettimeofday(&curr_time, NULL);
- time_diff= ms_time_diff(&ms_thread
.
startup_time, &curr_time);
+ time_diff= ms_time_diff(&ms_thread
->
startup_time, &curr_time);
tps=
(int64_t)((task->get_opt
+ task->set_opt) / ((uint64_t)time_diff / 1000000));
/* current throughput is greater than expected throughput */
tps=
(int64_t)((task->get_opt
+ task->set_opt) / ((uint64_t)time_diff / 1000000));
/* current throughput is greater than expected throughput */
- if (tps > ms_thread
.
thread_ctx->tps_perconn)
+ if (tps > ms_thread
->
thread_ctx->tps_perconn)
{
return true;
}
{
return true;
}
@@
-2972,9
+2985,9
@@
int ms_mcd_set(ms_conn_t *c, ms_task_item_t *item)
}
}
}
}
-
__sync_fetch_and_add
(&ms_stats.obj_bytes,
-
item->key_size + item->value_size);
-
__sync_fetch_and_add
(&ms_stats.cmd_set, 1);
+
atomic_add_64
(&ms_stats.obj_bytes,
+ item->key_size + item->value_size);
+
atomic_add_64
(&ms_stats.cmd_set, 1);
return 0;
} /* ms_mcd_set */
return 0;
} /* ms_mcd_set */
@@
-3058,7
+3071,7
@@
int ms_mcd_get(ms_conn_t *c, ms_task_item_t *item, bool verify)
}
}
}
}
-
__sync_fetch_and_add
(&ms_stats.cmd_get, 1);
+
atomic_add_64
(&ms_stats.cmd_get, 1);
return 0;
} /* ms_mcd_get */
return 0;
} /* ms_mcd_get */
@@
-3157,7
+3170,7
@@
int ms_mcd_mlget(ms_conn_t *c)
for (int i= 0; i < c->mlget_task.mlget_num; i++)
{
item= c->mlget_task.mlget_item[i].item;
for (int i= 0; i < c->mlget_task.mlget_num; i++)
{
item= c->mlget_task.mlget_item[i].item;
-
__sync_fetch_and_add
(&ms_stats.cmd_get, 1);
+
atomic_add_64
(&ms_stats.cmd_get, 1);
}
return 0;
}
return 0;