+/* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
+ *
+ * Libmemcached library
+ *
+ * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * The names of its contributors may not be used to endorse or
+ * promote products derived from this software without specific prior
+ * written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
/*
* Summary: C++ interface for memcached server
*
{
memc= memcached("", 0);
if (memc)
+ {
memcached_server_add(memc, hostname.c_str(), port);
+ }
}
Memcache(memcached_st *clone)
* @param[in] keys vector of keys to select
* @return true if all keys are found
*/
- bool mget(std::vector<std::string> &keys)
+ bool mget(const std::vector<std::string>& keys)
{
std::vector<const char *> real_keys;
std::vector<size_t> key_len;
real_keys.reserve(keys.size());
key_len.reserve(keys.size());
- std::vector<std::string>::iterator it= keys.begin();
+ std::vector<std::string>::const_iterator it= keys.begin();
while (it != keys.end())
{
* @param[in] flags flags to store with the object
* @return true on succcess; false otherwise
*/
- bool setByKey(const std::string &master_key,
- const std::string &key,
+ bool setByKey(const std::string& master_key,
+ const std::string& key,
const std::vector<char> &value,
time_t expiration,
uint32_t flags)
* @param[in] flags flags to store with the objects
* @return true on success; false otherwise
*/
- bool setAll(std::vector<std::string> &keys,
- std::vector< std::vector<char> *> &values,
+ bool setAll(const std::vector<std::string>& keys,
+ const std::vector< std::vector<char> *>& values,
time_t expiration,
uint32_t flags)
{
bool retval= true;
- std::vector<std::string>::iterator key_it= keys.begin();
- std::vector< std::vector<char> *>::iterator val_it= values.begin();
+ std::vector<std::string>::const_iterator key_it= keys.begin();
+ std::vector< std::vector<char> *>::const_iterator val_it= values.begin();
while (key_it != keys.end())
{
retval= set((*key_it), *(*val_it), expiration, flags);
* @param[in] flags flags to store with the objects
* @return true on success; false otherwise
*/
- bool setAll(std::map<const std::string, std::vector<char> > &key_value_map,
+ bool setAll(const std::map<const std::string, std::vector<char> >& key_value_map,
time_t expiration,
uint32_t flags)
{
bool retval= true;
- std::map<const std::string, std::vector<char> >::iterator it= key_value_map.begin();
+ std::map<const std::string, std::vector<char> >::const_iterator it= key_value_map.begin();
while (it != key_value_map.end())
{
}
++it;
}
+
return true;
}
* @param[out] value store the result of the increment here
* @return true on success; false otherwise
*/
- bool increment(const std::string &key, uint32_t offset, uint64_t *value)
+ bool increment(const std::string& key, uint32_t offset, uint64_t *value)
{
return memcached_success(memcached_increment(memc, key.c_str(), key.length(), offset, value));
}
* @param[out] value store the result of the decrement here
* @return true on success; false otherwise
*/
- bool decrement(const std::string &key, uint32_t offset, uint64_t *value)
+ bool decrement(const std::string& key, uint32_t offset, uint64_t *value)
{
return memcached_success(memcached_decrement(memc, key.c_str(),
key.length(),
* @param[in] value of object to add
* @return true on success; false otherwise
*/
- bool add(const std::string &key, const std::vector<char> &value)
+ bool add(const std::string& key, const std::vector<char>& value)
{
return memcached_success(memcached_add(memc, key.c_str(), key.length(),
&value[0], value.size(), 0, 0));
* @param[in] value of object to add
* @return true on success; false otherwise
*/
- bool addByKey(const std::string &master_key,
- const std::string &key,
- const std::vector<char> &value)
+ bool addByKey(const std::string& master_key,
+ const std::string& key,
+ const std::vector<char>& value)
{
return memcached_success(memcached_add_by_key(memc,
master_key.c_str(),
* @param[in[ value value to replace object with
* @return true on success; false otherwise
*/
- bool replace(const std::string &key, const std::vector<char> &value)
+ bool replace(const std::string& key, const std::vector<char>& value)
{
return memcached_success(memcached_replace(memc, key.c_str(), key.length(),
&value[0], value.size(),
* @param[in[ value value to replace object with
* @return true on success; false otherwise
*/
- bool replaceByKey(const std::string &master_key,
- const std::string &key,
- const std::vector<char> &value)
+ bool replaceByKey(const std::string& master_key,
+ const std::string& key,
+ const std::vector<char>& value)
{
return memcached_success(memcached_replace_by_key(memc,
master_key.c_str(),
* @param[in] value data to prepend to object's value
* @return true on success; false otherwise
*/
- bool prepend(const std::string &key, const std::vector<char> &value)
+ bool prepend(const std::string& key, const std::vector<char>& value)
{
return memcached_success(memcached_prepend(memc, key.c_str(), key.length(),
&value[0], value.size(), 0, 0));
* @param[in] value data to prepend to object's value
* @return true on success; false otherwise
*/
- bool prependByKey(const std::string &master_key,
- const std::string &key,
- const std::vector<char> &value)
+ bool prependByKey(const std::string& master_key,
+ const std::string& key,
+ const std::vector<char>& value)
{
return memcached_success(memcached_prepend_by_key(memc,
master_key.c_str(),
* @param[in] value data to append to object's value
* @return true on success; false otherwise
*/
- bool append(const std::string &key, const std::vector<char> &value)
+ bool append(const std::string& key, const std::vector<char>& value)
{
return memcached_success(memcached_append(memc,
key.c_str(),
* @param[in] value data to append to object's value
* @return true on success; false otherwise
*/
- bool appendByKey(const std::string &master_key,
- const std::string &key,
+ bool appendByKey(const std::string& master_key,
+ const std::string& key,
const std::vector<char> &value)
{
return memcached_success(memcached_append_by_key(memc,
* @param[in] value value to store for object in server
* @param[in] cas_arg "cas" value
*/
- bool cas(const std::string &key,
- const std::vector<char> &value,
+ bool cas(const std::string& key,
+ const std::vector<char>& value,
uint64_t cas_arg)
{
return memcached_success(memcached_cas(memc, key.c_str(), key.length(),
* @param[in] value value to store for object in server
* @param[in] cas_arg "cas" value
*/
- bool casByKey(const std::string &master_key,
- const std::string &key,
+ bool casByKey(const std::string& master_key,
+ const std::string& key,
const std::vector<char> &value,
uint64_t cas_arg)
{
* @param[in] key key of object to delete
* @return true on success; false otherwise
*/
- bool remove(const std::string &key)
+ bool remove(const std::string& key)
{
return memcached_success(memcached_delete(memc, key.c_str(), key.length(), 0));
}
* @param[in] expiration time to delete the object after
* @return true on success; false otherwise
*/
- bool remove(const std::string &key, time_t expiration)
+ bool remove(const std::string& key, time_t expiration)
{
return memcached_success(memcached_delete(memc,
key.c_str(),
* @param[in] key key of object to delete
* @return true on success; false otherwise
*/
- bool removeByKey(const std::string &master_key,
- const std::string &key)
+ bool removeByKey(const std::string& master_key,
+ const std::string& key)
{
return memcached_success(memcached_delete_by_key(memc,
master_key.c_str(),
* @param[in] expiration time to delete the object after
* @return true on success; false otherwise
*/
- bool removeByKey(const std::string &master_key,
- const std::string &key,
+ bool removeByKey(const std::string& master_key,
+ const std::string& key,
time_t expiration)
{
return memcached_success(memcached_delete_by_key(memc,
* stats
* @return true on success; false otherwise
*/
- bool getStats(std::map< std::string, std::map<std::string, std::string> >
- &stats_map)
+ bool getStats(std::map< std::string, std::map<std::string, std::string> >& stats_map)
{
memcached_return_t rc;
memcached_stat_st *stats= memcached_stat(memc, NULL, &rc);
*/
for (uint32_t x= 0; x < server_count; x++)
{
- memcached_server_instance_st instance=
- memcached_server_instance_by_position(memc, x);
+ memcached_server_instance_st instance= memcached_server_instance_by_position(memc, x);
std::ostringstream strstm;
std::string server_name(memcached_server_name(instance));
server_name.append(":");
server_name.append(strstm.str());
std::map<std::string, std::string> server_stats;
- char **list= NULL;
- char **ptr= NULL;
-
- list= memcached_stat_get_keys(memc, &stats[x], &rc);
- for (ptr= list; *ptr; ptr++)
+ char **list= memcached_stat_get_keys(memc, &stats[x], &rc);
+ for (char** ptr= list; *ptr; ptr++)
{
char *value= memcached_stat_get_value(memc, &stats[x], *ptr, &rc);
server_stats[*ptr]= value;
#define memcached_server_response_decrement(A) (A)->cursor_active--
#define memcached_server_response_reset(A) (A)->cursor_active=0
-LIBMEMCACHED_LOCAL
-memcached_return_t memcached_purge(memcached_server_write_instance_st ptr);
+bool memcached_purge(memcached_server_write_instance_st ptr);
#ifdef __cplusplus
}
while (--loop_max) // Should only loop on cases of ERESTART or EINTR
{
- int error= poll(fds, 1, server->root->connect_timeout);
- switch (error)
+ int number_of;
+ if ((number_of= poll(fds, 1, server->root->connect_timeout)) <= 0)
{
- case 1:
+ if (number_of == -1)
{
- int err;
- socklen_t len= sizeof (err);
- if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
+ int local_errno= get_socket_errno(); // We cache in case closesocket() modifies errno
+ switch (local_errno)
{
- // We check the value to see what happened wth the socket.
- if (err == 0)
- {
- return MEMCACHED_SUCCESS;
- }
- errno= err;
- }
-
- return memcached_set_errno(*server, err, MEMCACHED_AT);
- }
- case 0:
- {
- server->io_wait_count.timeouts++;
- return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
- }
-
- default: // A real error occurred and we need to completely bail
- switch (get_socket_errno())
- {
#ifdef TARGET_OS_LINUX
- case ERESTART:
+ case ERESTART:
#endif
- case EINTR:
- continue;
+ case EINTR:
+ continue;
- case EFAULT:
- case ENOMEM:
- return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
+ case EFAULT:
+ case ENOMEM:
+ return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
- case EINVAL:
- return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
+ case EINVAL:
+ return memcached_set_error(*server, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
- default: // This should not happen
- if (fds[0].revents & POLLERR)
- {
- int err;
- socklen_t len= sizeof(err);
- if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
+ default: // This should not happen
+ if (fds[0].revents & POLLERR)
{
- if (err == 0)
+ int err;
+ socklen_t len= sizeof(err);
+ if (getsockopt(server->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
{
- // This should never happen, if it does? Punt.
- continue;
+ if (err == 0)
+ {
+ // This should never happen, if it does? Punt.
+ continue;
+ }
+ local_errno= err;
}
- errno= err;
}
- }
- int local_errno= get_socket_errno(); // We cache in case closesocket() modifies errno
+ assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
+ (void)closesocket(server->fd);
+ server->fd= INVALID_SOCKET;
+ server->state= MEMCACHED_SERVER_STATE_NEW;
- assert_msg(server->fd != INVALID_SOCKET, "poll() was passed an invalid file descriptor");
- (void)closesocket(server->fd);
- server->fd= INVALID_SOCKET;
- server->state= MEMCACHED_SERVER_STATE_NEW;
+ return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
+ }
+ }
+ assert(number_of == 0);
- return memcached_set_errno(*server, local_errno, MEMCACHED_AT);
+ server->io_wait_count.timeouts++;
+ return memcached_set_error(*server, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+ }
+
+ if (fds[0].revents & POLLERR or
+ fds[0].revents & POLLHUP or
+ fds[0].revents & POLLNVAL)
+ {
+ int err;
+ socklen_t len= sizeof (err);
+ if (getsockopt(fds[0].fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
+ {
+ // We check the value to see what happened wth the socket.
+ if (err == 0)
+ {
+ return MEMCACHED_SUCCESS;
+ }
+ errno= err;
}
+
+ return memcached_set_errno(*server, err, MEMCACHED_AT);
}
+ assert(fds[0].revents & POLLIN or fds[0].revents & POLLOUT);
+
+ return MEMCACHED_SUCCESS;
}
// This should only be possible from ERESTART or EINTR;
#ifdef HAVE_SNDTIMEO
if (server->root->snd_timeout)
{
- int error;
struct timeval waittime;
waittime.tv_sec= 0;
waittime.tv_usec= server->root->snd_timeout;
- error= setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO,
+ int error= setsockopt(server->fd, SOL_SOCKET, SO_SNDTIMEO,
&waittime, (socklen_t)sizeof(struct timeval));
- WATCHPOINT_ASSERT(error == 0);
+ assert(error == 0);
}
#endif
#ifdef HAVE_RCVTIMEO
if (server->root->rcv_timeout)
{
- int error;
struct timeval waittime;
waittime.tv_sec= 0;
waittime.tv_usec= server->root->rcv_timeout;
- error= setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO,
- &waittime, (socklen_t)sizeof(struct timeval));
- WATCHPOINT_ASSERT(error == 0);
+ int error= setsockopt(server->fd, SOL_SOCKET, SO_RCVTIMEO,
+ &waittime, (socklen_t)sizeof(struct timeval));
+ assert(error == 0);
}
#endif
if (server->root->flags.no_block)
{
- int error;
struct linger linger;
linger.l_onoff= 1;
linger.l_linger= 0; /* By default on close() just drop the socket */
- error= setsockopt(server->fd, SOL_SOCKET, SO_LINGER,
- &linger, (socklen_t)sizeof(struct linger));
- WATCHPOINT_ASSERT(error == 0);
+ int error= setsockopt(server->fd, SOL_SOCKET, SO_LINGER,
+ &linger, (socklen_t)sizeof(struct linger));
+ assert(error == 0);
}
if (server->root->flags.tcp_nodelay)
{
int flag= 1;
- int error;
- error= setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY,
- &flag, (socklen_t)sizeof(int));
- WATCHPOINT_ASSERT(error == 0);
+ int error= setsockopt(server->fd, IPPROTO_TCP, TCP_NODELAY,
+ &flag, (socklen_t)sizeof(int));
+ assert(error == 0);
}
if (server->root->flags.tcp_keepalive)
{
int flag= 1;
- int error;
- error= setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE,
+ int error= setsockopt(server->fd, SOL_SOCKET, SO_KEEPALIVE,
&flag, (socklen_t)sizeof(int));
- WATCHPOINT_ASSERT(error == 0);
+ assert(error == 0);
}
#ifdef TCP_KEEPIDLE
if (server->root->tcp_keepidle > 0)
{
- int error;
-
- error= setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
- &server->root->tcp_keepidle, (socklen_t)sizeof(int));
- WATCHPOINT_ASSERT(error == 0);
+ int error= setsockopt(server->fd, IPPROTO_TCP, TCP_KEEPIDLE,
+ &server->root->tcp_keepidle, (socklen_t)sizeof(int));
+ assert(error == 0);
}
#endif
if (server->root->send_size > 0)
{
- int error;
-
- error= setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF,
- &server->root->send_size, (socklen_t)sizeof(int));
- WATCHPOINT_ASSERT(error == 0);
+ int error= setsockopt(server->fd, SOL_SOCKET, SO_SNDBUF,
+ &server->root->send_size, (socklen_t)sizeof(int));
+ assert(error == 0);
}
if (server->root->recv_size > 0)
{
- int error;
-
- error= setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF,
- &server->root->recv_size, (socklen_t)sizeof(int));
- WATCHPOINT_ASSERT(error == 0);
+ int error= setsockopt(server->fd, SOL_SOCKET, SO_RCVBUF,
+ &server->root->recv_size, (socklen_t)sizeof(int));
+ assert(error == 0);
}
case EISCONN: /* We were spinning waiting on connect */
{
- WATCHPOINT_ASSERT(0); // Programmer error
+ assert(0); // Programmer error
break;
}
return MEMCACHED_SUCCESS;
}
- ssize_t sent_length= memcached_io_writev(instance, vector, count, with_flush);
- size_t command_length= 0;
- for (uint32_t x= 0; x < count; ++x, vector++)
- {
- command_length+= vector->length;
- }
-
- if (sent_length == -1 or size_t(sent_length) != command_length)
+ bool sent_success= memcached_io_writev(instance, vector, count, with_flush);
+ if (sent_success == false)
{
if (memcached_last_error(instance->root) == MEMCACHED_SUCCESS)
{
memcached_return_t *error)
{
memcached_return_t unused;
- if (not error)
+ if (error == NULL)
+ {
error= &unused;
+ }
- if (not ptr)
+ if (ptr == NULL)
{
*error= MEMCACHED_INVALID_ARGUMENTS;
return NULL;
// create one.
if (memcached_is_initialized(&ptr->result))
{
- if (not (result= memcached_result_create(ptr, NULL)))
+ if ((result= memcached_result_create(ptr, NULL)) == NULL)
{
*error= MEMCACHED_MEMORY_ALLOCATION_FAILURE;
return NULL;
*
* Libmemcached library
*
- * Copyright (C) 2011 Data Differential, http://datadifferential.com/
+ * Copyright (C) 2011-2012 Data Differential, http://datadifferential.com/
* Copyright (C) 2006-2009 Brian Aker All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
if (value == NULL)
{
- if (ptr->get_key_failure && *error == MEMCACHED_NOTFOUND)
+ if (ptr->get_key_failure and *error == MEMCACHED_NOTFOUND)
{
- memcached_result_reset(&ptr->result);
- memcached_return_t rc= ptr->get_key_failure(ptr, key, key_length, &ptr->result);
+ memcached_result_st key_failure_result;
+ memcached_result_st* result_ptr= memcached_result_create(ptr, &key_failure_result);
+ memcached_return_t rc= ptr->get_key_failure(ptr, key, key_length, result_ptr);
/* On all failure drop to returning NULL */
if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED)
}
rc= memcached_set(ptr, key, key_length,
- (memcached_result_value(&ptr->result)),
- (memcached_result_length(&ptr->result)),
+ (memcached_result_value(result_ptr)),
+ (memcached_result_length(result_ptr)),
0,
- (memcached_result_flags(&ptr->result)));
+ (memcached_result_flags(result_ptr)));
- if (rc == MEMCACHED_BUFFERED && latch == 0)
+ if (rc == MEMCACHED_BUFFERED and latch == 0)
{
memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
}
else
{
rc= memcached_set(ptr, key, key_length,
- (memcached_result_value(&ptr->result)),
- (memcached_result_length(&ptr->result)),
+ (memcached_result_value(result_ptr)),
+ (memcached_result_length(result_ptr)),
0,
- (memcached_result_flags(&ptr->result)));
+ (memcached_result_flags(result_ptr)));
}
- if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
+ if (rc == MEMCACHED_SUCCESS or rc == MEMCACHED_BUFFERED)
{
*error= rc;
- *value_length= memcached_result_length(&ptr->result);
- *flags= memcached_result_flags(&ptr->result);
- return memcached_string_take_value(&ptr->result.value);
+ *value_length= memcached_result_length(result_ptr);
+ *flags= memcached_result_flags(result_ptr);
+ char *result_value= memcached_string_take_value(&result_ptr->value);
+ memcached_result_free(result_ptr);
+
+ return result_value;
}
}
+
+ memcached_result_free(result_ptr);
}
assert_msg(ptr->query_id == query_id +1, "Programmer error, the query_id was not incremented.");
}
hosts_connected++;
- if ((memcached_io_writev(instance, vector, 4, false)) == -1)
+ if ((memcached_io_writev(instance, vector, 4, false)) == false)
{
failures_occured_in_sending= true;
continue;
}
else
{
- if ((memcached_io_writev(instance, (vector + 1), 3, false)) == -1)
+ if ((memcached_io_writev(instance, (vector + 1), 3, false)) == false)
{
memcached_server_response_reset(instance);
failures_occured_in_sending= true;
LIBMEMCACHED_MEMCACHED_MGET_END();
- if (failures_occured_in_sending && success_happened)
+ if (failures_occured_in_sending and success_happened)
{
return MEMCACHED_SOME_ERRORS;
}
if (success_happened)
+ {
return MEMCACHED_SUCCESS;
+ }
return MEMCACHED_FAILURE; // Complete failure occurred
}
{ keys[x], key_length[x] }
};
- if (memcached_io_writev(instance, vector, 3, flush) == -1)
+ if (memcached_io_writev(instance, vector, 3, flush) == false)
{
memcached_server_response_reset(instance);
rc= MEMCACHED_SOME_ERRORS;
/* We just want one pending response per server */
memcached_server_response_reset(instance);
memcached_server_response_increment(instance);
- if ((x > 0 && x == ptr->io_key_prefetch) && memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
+ if ((x > 0 and x == ptr->io_key_prefetch) and memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
{
rc= MEMCACHED_SOME_ERRORS;
}
uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ);
if (randomize_read)
+ {
start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1);
+ }
/* Loop for each replica */
for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
uint32_t server= hash[x] + replica;
/* In case of randomized reads */
- if (randomize_read && ((server + start) <= (hash[x] + ptr->number_of_replicas)))
- server += start;
+ if (randomize_read and ((server + start) <= (hash[x] + ptr->number_of_replicas)))
+ {
+ server+= start;
+ }
while (server >= memcached_server_count(ptr))
{
{ keys[x], key_length[x] }
};
- if (memcached_io_writev(instance, vector, 3, true) == -1)
+ if (memcached_io_writev(instance, vector, 3, true) == false)
{
memcached_io_reset(instance);
dead_servers[server]= true;
uint32_t* hash= libmemcached_xvalloc(ptr, number_of_keys, uint32_t);
bool* dead_servers= libmemcached_xcalloc(ptr, memcached_server_count(ptr), bool);
- if (hash == NULL || dead_servers == NULL)
+ if (hash == NULL or dead_servers == NULL)
{
libmemcached_free(ptr, hash);
libmemcached_free(ptr, dead_servers);
}
memcached_error_free(*self);
+ memcached_result_reset(&self->result);
return MEMCACHED_SUCCESS;
}
{
do {
/* Just try a single read to grab what's available */
- ssize_t nr= recv(ptr->fd,
- ptr->read_ptr + ptr->read_data_length,
- MEMCACHED_MAX_BUFFER - ptr->read_data_length,
- MSG_DONTWAIT);
-
- switch (nr)
+ ssize_t nr;
+ if ((nr= recv(ptr->fd,
+ ptr->read_ptr + ptr->read_data_length,
+ MEMCACHED_MAX_BUFFER - ptr->read_data_length,
+ MSG_DONTWAIT)) <= 0)
{
- case SOCKET_ERROR:
+ if (nr == 0)
+ {
+ memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
+ }
+ else
{
switch (get_socket_errno())
{
memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
}
}
- break;
- case 0: // Shutdown on the socket has occurred
- {
- memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
- }
break;
+ }
+ else // We read data, append to our read buffer
+ {
+ ptr->read_data_length+= size_t(nr);
+ ptr->read_buffer_length+= size_t(nr);
- default:
- {
- ptr->read_data_length+= size_t(nr);
- ptr->read_buffer_length+= size_t(nr);
- return true;
- }
- break;
+ return true;
}
- } while (0);
+ } while (false);
}
+
return false;
}
*/
if (read_or_write == MEM_WRITE)
{
- if (memcached_fatal(memcached_purge(ptr)))
+ if (memcached_purge(ptr) == false)
{
return MEMCACHED_FAILURE;
}
}
struct pollfd fds;
- memset(&fds, 0, sizeof(pollfd));
fds.fd= ptr->fd;
fds.events= POLLIN;
+ fds.revents= 0;
if (read_or_write == MEM_WRITE) /* write */
{
*/
{
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- memcached_return_t rc= memcached_purge(ptr);
- if (rc != MEMCACHED_SUCCESS and rc != MEMCACHED_STORED)
+ if (memcached_purge(ptr) == false)
{
return false;
}
/* fall through */
case ENOTCONN: // Programmer Error
- WATCHPOINT_ASSERT(0);
+ assert(0);
case ENOTSOCK:
- WATCHPOINT_ASSERT(0);
+ assert(0);
case EBADF:
assert_msg(ptr->fd != INVALID_SOCKET, "Invalid socket state");
case EINVAL:
const void *buffer, size_t length, bool with_flush,
size_t& written)
{
- WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+ assert(ptr->fd != INVALID_SOCKET);
assert(memcached_is_udp(ptr->root) == false);
const char *buffer_ptr= static_cast<const char *>(buffer);
return ssize_t(written);
}
-ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
+bool memcached_io_writev(memcached_server_write_instance_st ptr,
libmemcached_io_vector_st vector[],
const size_t number_of, const bool with_flush)
{
+ ssize_t complete_total= 0;
ssize_t total= 0;
for (size_t x= 0; x < number_of; x++, vector++)
{
+ complete_total+= vector->length;
if (vector->length)
{
size_t written;
if ((_io_write(ptr, vector->buffer, vector->length, false, written)) == false)
{
- return -1;
+ return false;
}
total+= written;
}
{
if (memcached_io_write(ptr) == false)
{
- return -1;
+ return false;
}
}
- return total;
+ return (complete_total == total);
}
}
/* Now let's look in the buffer and copy as we go! */
- while (ptr->read_buffer_length && total_nr < size && !line_complete)
+ while (ptr->read_buffer_length and total_nr < size and line_complete == false)
{
*buffer_ptr = *ptr->read_ptr;
if (*buffer_ptr == '\n')
ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
const void *buffer, size_t length, bool with_flush);
-ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
- libmemcached_io_vector_st vector[],
- const size_t number_of, const bool with_flush);
+bool memcached_io_writev(memcached_server_write_instance_st ptr,
+ libmemcached_io_vector_st vector[],
+ const size_t number_of, const bool with_flush);
memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr);
#define memcached_has_replicas(__object) ((__object)->root->number_of_replicas)
-#define memcached_set_purging(__object, __value) ((__object)->state.is_purging= (__value))
#define memcached_set_processing_input(__object, __value) ((__object)->state.is_processing_input= (__value))
#define memcached_set_initialized(__object, __value) ((__object)->options.is_initialized(= (__value))
#define memcached_set_allocated(__object, __value) ((__object)->options.is_allocated= (__value))
ptr->options.is_allocated= true;
}
-#if 0
- memcached_set_purging(ptr, false);
- memcached_set_processing_input(ptr, false);
-#endif
-
if (_memcached_init(ptr) == false)
{
memcached_free(ptr);
#include <libmemcached/common.h>
+#define memcached_set_purging(__object, __value) ((__object)->state.is_purging= (__value))
-memcached_return_t memcached_purge(memcached_server_write_instance_st ptr)
+class Purge
+{
+public:
+ Purge(memcached_st* arg) :
+ _memc(arg)
+ {
+ memcached_set_purging(_memc, true);
+ }
+
+ ~Purge()
+ {
+ memcached_set_purging(_memc, false);
+ }
+
+private:
+ memcached_st* _memc;
+};
+
+class PollTimeout
+{
+public:
+ PollTimeout(memcached_st* arg) :
+ _timeout(arg->poll_timeout),
+ _origin(arg->poll_timeout)
+ {
+ _origin = 2000;
+ }
+
+ ~PollTimeout()
+ {
+ _origin= _timeout;
+ }
+
+private:
+ int32_t _timeout;
+ int32_t& _origin;
+};
+
+bool memcached_purge(memcached_server_write_instance_st ptr)
{
- memcached_return_t ret= MEMCACHED_SUCCESS;
memcached_st *root= (memcached_st *)ptr->root;
if (memcached_is_purging(ptr->root) || /* already purging */
(ptr->io_bytes_sent >= ptr->root->io_bytes_watermark &&
memcached_server_response_count(ptr) < 2))
{
- return MEMCACHED_SUCCESS;
+ return true;
}
/*
memcached_io_write and memcached_response may call memcached_purge
so we need to be able stop any recursion..
*/
- memcached_set_purging(root, true);
+ Purge set_purge(root);
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
/*
*/
if (memcached_io_write(ptr) == false)
{
- memcached_set_purging(root, true);
-
- return memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
+ memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
+ return false;
}
WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+ bool is_successful= true;
uint32_t no_msg= memcached_server_response_count(ptr) - 1;
if (no_msg > 0)
{
memcached_result_st result;
- memcached_result_st *result_ptr;
/*
* We need to increase the timeout, because we might be waiting for
* data to be sent from the server (the commands was in the output buffer
* and just flushed
*/
- const int32_t timeo= ptr->root->poll_timeout;
- root->poll_timeout= 2000;
+ PollTimeout poll_timeout(ptr->root);
- result_ptr= memcached_result_create(root, &result);
- WATCHPOINT_ASSERT(result_ptr);
+ memcached_result_st* result_ptr= memcached_result_create(root, &result);
+ assert(result_ptr);
for (uint32_t x= 0; x < no_msg; x++)
{
{
WATCHPOINT_ERROR(rc);
memcached_io_reset(ptr);
- ret= rc;
-#if 0
- ret= memcached_set_error(*ptr, rc, MEMCACHED_AT);
-#endif
+ is_successful= false;
}
if (ptr->root->callbacks != NULL)
}
memcached_result_free(result_ptr);
- root->poll_timeout= timeo;
}
- memcached_set_purging(root, false);
- return ret;
+ return is_successful;
}
{ data, len }
};
- if (memcached_io_writev(server, vector, 3, true) == -1)
+ if (memcached_io_writev(server, vector, 3, true) == false)
{
rc= MEMCACHED_WRITE_FAILURE;
break;
}
#define READ_THROUGH_VALUE "set for me"
-static memcached_return_t read_through_trigger(memcached_st *memc,
- char *key,
- size_t key_length,
+static memcached_return_t read_through_trigger(memcached_st *, // memc
+ char *, // key
+ size_t, // key_length,
memcached_result_st *result)
{
- (void)memc;(void)key;(void)key_length;
return memcached_result_set_value(result, READ_THROUGH_VALUE, strlen(READ_THROUGH_VALUE));
}
&string_length, &flags, &rc);
test_compare(MEMCACHED_SUCCESS, rc);
- test_compare(string_length, sizeof(READ_THROUGH_VALUE) -1);
- test_true(string[sizeof(READ_THROUGH_VALUE) -1] == 0);
+ test_compare(sizeof(READ_THROUGH_VALUE) -1, string_length);
+ test_compare(0, string[sizeof(READ_THROUGH_VALUE) -1]);
test_strcmp(READ_THROUGH_VALUE, string);
free(string);