{
struct pollfd fds[1];
fds[0].fd= server->fd;
- fds[0].events= POLLOUT;
+ fds[0].events= server->events();
+ fds[0].revents= 0;
size_t loop_max= 5;
return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
}
+#if 0
+ server->revents(fds[0].revents);
+#endif
+
if (fds[0].revents & POLLERR or
fds[0].revents & POLLHUP or
fds[0].revents & POLLNVAL)
#ifndef WIN32
WATCHPOINT_ASSERT(server->fd == INVALID_SOCKET);
- int type= SOCK_STREAM;
- if (SOCK_CLOEXEC)
- {
- type|= SOCK_CLOEXEC;
- }
+ do {
+ int type= SOCK_STREAM;
+ if (SOCK_CLOEXEC)
+ {
+ type|= SOCK_CLOEXEC;
+ }
- if (SOCK_NONBLOCK)
- {
- type|= SOCK_NONBLOCK;
- }
+ if (SOCK_NONBLOCK)
+ {
+ type|= SOCK_NONBLOCK;
+ }
- if ((server->fd= socket(AF_UNIX, type, 0)) < 0)
- {
- return memcached_set_errno(*server, errno, NULL);
- }
+ if ((server->fd= socket(AF_UNIX, type, 0)) < 0)
+ {
+ return memcached_set_errno(*server, errno, NULL);
+ }
- struct sockaddr_un servAddr;
+ struct sockaddr_un servAddr;
- memset(&servAddr, 0, sizeof (struct sockaddr_un));
- servAddr.sun_family= AF_UNIX;
- strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
+ memset(&servAddr, 0, sizeof (struct sockaddr_un));
+ servAddr.sun_family= AF_UNIX;
+ strncpy(servAddr.sun_path, server->hostname, sizeof(servAddr.sun_path)); /* Copy filename */
- do {
if (connect(server->fd, (struct sockaddr *)&servAddr, sizeof(servAddr)) < 0)
{
switch (errno)
{
case EINPROGRESS:
case EALREADY:
+ server->events(POLLOUT);
+ break;
+
case EINTR:
+ (void)closesocket(server->fd);
+ server->fd= INVALID_SOCKET;
continue;
case EISCONN: /* We were spinning waiting on connect */
{
assert(0); // Programmer error
- break;
+ (void)closesocket(server->fd);
+ server->fd= INVALID_SOCKET;
+ continue;
}
default:
WATCHPOINT_ERRNO(errno);
+ (void)closesocket(server->fd);
+ server->fd= INVALID_SOCKET;
return memcached_set_errno(*server, errno, MEMCACHED_AT);
}
}
case EINPROGRESS: // nonblocking mode - first return
case EALREADY: // nonblocking mode - subsequent returns
{
+ server->events(POLLOUT);
server->state= MEMCACHED_SERVER_STATE_IN_PROGRESS;
memcached_return_t rc= connect_poll(server);
// Collect the returned items
org::libmemcached::Instance* instance;
- while ((instance= memcached_io_get_readable_server(memc)))
+ memcached_return_t read_ret= MEMCACHED_SUCCESS;
+ while ((instance= memcached_io_get_readable_server(memc, read_ret)))
{
memcached_return_t response_rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
if (response_rc == MEMCACHED_ITEM)
*error= MEMCACHED_MAXIMUM_RETURN; // We use this to see if we ever go into the loop
org::libmemcached::Instance *server;
- while ((server= memcached_io_get_readable_server(ptr)))
+ memcached_return_t read_ret= MEMCACHED_SUCCESS;
+ while ((server= memcached_io_get_readable_server(ptr, read_ret)))
{
char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
*error= memcached_response(server, buffer, sizeof(buffer), result);
{
self->options.is_shutting_down= false;
self->options.is_dead= false;
+ self->options.ready= false;
+ self->_events= 0;
+ self->_revents= 0;
self->cursor_active_= 0;
self->port_= port;
self->fd= INVALID_SOCKET;
return self;
}
+void org::libmemcached::Instance::events(short arg)
+{
+ if ((_events | arg) == _events)
+ {
+ return;
+ }
+
+ _events|= arg;
+}
+
+void org::libmemcached::Instance::revents(short arg)
+{
+ if (arg)
+ {
+ options.ready= true;
+ }
+
+ _revents= arg;
+ _events&= short(~arg);
+}
+
org::libmemcached::Instance* __instance_create_with(memcached_st *memc,
org::libmemcached::Instance* self,
const memcached_string_t& hostname,
}
struct {
- bool is_allocated:1;
- bool is_initialized:1;
- bool is_shutting_down:1;
- bool is_dead:1;
+ bool is_allocated;
+ bool is_initialized;
+ bool is_shutting_down;
+ bool is_dead;
+ bool ready;
} options;
+
+ short _events;
+ short _revents;
+
+ short events(void)
+ {
+ return _events;
+ }
+
+ short revents(void)
+ {
+ return _revents;
+ }
+
+ void events(short);
+ void revents(short);
+
uint32_t cursor_active_;
in_port_t port_;
memcached_socket_t fd;
ptr->fd= INVALID_SOCKET;
}
-org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc)
+org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc, memcached_return_t&)
{
#define MAX_SERVERS_TO_POLL 100
struct pollfd fds[MAX_SERVERS_TO_POLL];
void *dta,
const size_t size);
-org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc);
+org::libmemcached::Instance* memcached_io_get_readable_server(memcached_st *memc, memcached_return_t&);
memcached_return_t memcached_io_slurp(org::libmemcached::Instance* ptr);
#define memcached_is_auto_eject_hosts(__object) ((__object)->flags.auto_eject_hosts)
#define memcached_is_use_sort_hosts(__object) ((__object)->flags.use_sort_hosts)
+#define memcached_is_ready(__object) ((__object)->options.ready)
+
#define memcached_is_weighted_ketama(__object) ((__object)->ketama.weighted_)
+#define memcached_set_ready(__object, __flag) ((__object)->options.ready= (__flag))
+
#define memcached_set_aes(__object, __flag) ((__object).flags.is_aes= __flag)
#define memcached_set_udp(__object, __flag) ((__object).flags.use_udp= __flag)
#define memcached_set_verify_key(__object, __flag) ((__object).flags.verify_key= __flag)
{
// Collect the returned items
org::libmemcached::Instance* instance;
- while ((instance= memcached_io_get_readable_server(ptr)))
+ memcached_return_t readable_error;
+ while ((instance= memcached_io_get_readable_server(ptr, readable_error)))
{
memcached_return_t rrc= memcached_response(instance, NULL);
if (memcached_failed(rrc))
{
// Collect the returned items
org::libmemcached::Instance* instance;
- while ((instance= memcached_io_get_readable_server(ptr)))
+ memcached_return_t readable_error;
+ while ((instance= memcached_io_get_readable_server(ptr, readable_error)))
{
char buffer[32];
memcached_return_t rrc= memcached_response(instance, buffer, sizeof(buffer), NULL);