MEM_WRITE
};
-static ssize_t io_flush(memcached_server_write_instance_st ptr,
- const bool with_flush,
- memcached_return_t *error);
-static void increment_udp_message_id(memcached_server_write_instance_st ptr);
+/*
+ * The udp request id consists of two seperate sections
+ * 1) The thread id
+ * 2) The message number
+ * The thread id should only be set when the memcached_st struct is created
+ * and should not be changed.
+ *
+ * The message num is incremented for each new message we send, this function
+ * extracts the message number from message_id, increments it and then
+ * writes the new value back into the header
+ */
+static void increment_udp_message_id(memcached_server_write_instance_st ptr)
+{
+ struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
+ uint16_t cur_req= get_udp_datagram_request_id(header);
+ int msg_num= get_msg_num_from_request_id(cur_req);
+ int thread_id= get_thread_id_from_request_id(cur_req);
+
+ if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
+ msg_num= 0;
+
+ header->request_id= htons((uint16_t) (thread_id | msg_num));
+}
+
+/**
+ * Try to fill the input buffer for a server with as much
+ * data as possible.
+ *
+ * @param ptr the server to pack
+ */
+static bool repack_input_buffer(memcached_server_write_instance_st ptr)
+{
+ if (ptr->read_ptr != ptr->read_buffer)
+ {
+ /* Move all of the data to the beginning of the buffer so
+ ** that we can fit more data into the buffer...
+ */
+ memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
+ ptr->read_ptr= ptr->read_buffer;
+ ptr->read_data_length= ptr->read_buffer_length;
+ }
+
+ /* There is room in the buffer, try to fill it! */
+ if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
+ {
+ do {
+ /* Just try a single read to grab what's available */
+ ssize_t nr= recv(ptr->fd,
+ ptr->read_ptr + ptr->read_data_length,
+ MEMCACHED_MAX_BUFFER - ptr->read_data_length,
+ MSG_DONTWAIT);
+
+ switch (nr)
+ {
+ case SOCKET_ERROR:
+ {
+ switch (get_socket_errno())
+ {
+ case EINTR:
+ continue;
+
+ case EWOULDBLOCK:
+#ifdef USE_EAGAIN
+ case EAGAIN:
+#endif
+#ifdef TARGET_OS_LINUX
+ case ERESTART:
+#endif
+ break; // No IO is fine, we can just move on
+
+ default:
+ memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ }
+ }
+ break;
+
+ case 0: // Shutdown on the socket has occurred
+ {
+ memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
+ }
+ break;
+
+ default:
+ {
+ ptr->read_data_length+= size_t(nr);
+ ptr->read_buffer_length+= size_t(nr);
+ return true;
+ }
+ break;
+ }
+ } while (0);
+ }
+ return false;
+}
+
+/**
+ * If the we have callbacks connected to this server structure
+ * we may start process the input queue and fire the callbacks
+ * for the incomming messages. This function is _only_ called
+ * when the input buffer is full, so that we _know_ that we have
+ * at least _one_ message to process.
+ *
+ * @param ptr the server to star processing iput messages for
+ * @return true if we processed anything, false otherwise
+ */
+static bool process_input_buffer(memcached_server_write_instance_st ptr)
+{
+ /*
+ ** We might be able to process some of the response messages if we
+ ** have a callback set up
+ */
+ if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
+ {
+ /*
+ * We might have responses... try to read them out and fire
+ * callbacks
+ */
+ memcached_callback_st cb= *ptr->root->callbacks;
+
+ memcached_set_processing_input((memcached_st *)ptr->root, true);
+
+ char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+ memcached_return_t error;
+ memcached_st *root= (memcached_st *)ptr->root;
+ error= memcached_response(ptr, buffer, sizeof(buffer),
+ &root->result);
+
+ memcached_set_processing_input(root, false);
+
+ if (error == MEMCACHED_SUCCESS)
+ {
+ for (unsigned int x= 0; x < cb.number_of_callback; x++)
+ {
+ error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
+ if (error != MEMCACHED_SUCCESS)
+ break;
+ }
+
+ /* @todo what should I do with the error message??? */
+ }
+ /* @todo what should I do with other error messages?? */
+ return true;
+ }
+
+ return false;
+}
static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
memc_read_or_write read_or_write)
return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
}
-memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
+static ssize_t io_flush(memcached_server_write_instance_st ptr,
+ const bool with_flush,
+ memcached_return_t *error)
{
- return io_wait(ptr, MEM_WRITE);
-}
-
-/**
- * Try to fill the input buffer for a server with as much
- * data as possible.
- *
- * @param ptr the server to pack
+ /*
+ ** We might want to purge the input buffer if we haven't consumed
+ ** any output yet... The test for the limits is the purge is inline
+ ** in the purge function to avoid duplicating the logic..
*/
-static bool repack_input_buffer(memcached_server_write_instance_st ptr)
-{
- if (ptr->read_ptr != ptr->read_buffer)
{
- /* Move all of the data to the beginning of the buffer so
- ** that we can fit more data into the buffer...
- */
- memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
- ptr->read_ptr= ptr->read_buffer;
- ptr->read_data_length= ptr->read_buffer_length;
+ memcached_return_t rc;
+ WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+ rc= memcached_purge(ptr);
+
+ if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+ {
+ return -1;
+ }
}
+ ssize_t sent_length;
+ size_t return_length;
+ char *local_write_ptr= ptr->write_buffer;
+ size_t write_length= ptr->write_buffer_offset;
- /* There is room in the buffer, try to fill it! */
- if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
+ *error= MEMCACHED_SUCCESS;
+
+ WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+
+ // UDP Sanity check, make sure that we are not sending somthing too big
+ if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
{
- do {
- /* Just try a single read to grab what's available */
- ssize_t nr= recv(ptr->fd,
- ptr->read_ptr + ptr->read_data_length,
- MEMCACHED_MAX_BUFFER - ptr->read_data_length,
- MSG_DONTWAIT);
+ *error= MEMCACHED_WRITE_FAILURE;
+ return -1;
+ }
- switch (nr)
+ if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
+ && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
+ {
+ return 0;
+ }
+
+ /* Looking for memory overflows */
+#if defined(DEBUG)
+ if (write_length == MEMCACHED_MAX_BUFFER)
+ WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
+ WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
+#endif
+
+ return_length= 0;
+ while (write_length)
+ {
+ WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+ WATCHPOINT_ASSERT(write_length > 0);
+ sent_length= 0;
+ if (ptr->type == MEMCACHED_CONNECTION_UDP)
+ increment_udp_message_id(ptr);
+
+ WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+ if (with_flush)
+ {
+ sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
+ }
+ else
+ {
+ sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
+ }
+
+ if (sent_length == SOCKET_ERROR)
+ {
+ memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+#if 0 // @todo I should look at why we hit this bit of code hard frequently
+ WATCHPOINT_ERRNO(get_socket_errno());
+ WATCHPOINT_NUMBER(get_socket_errno());
+#endif
+ switch (get_socket_errno())
{
- case SOCKET_ERROR:
+ case ENOBUFS:
+ continue;
+ case EWOULDBLOCK:
+#ifdef USE_EAGAIN
+ case EAGAIN:
+#endif
{
- switch (get_socket_errno())
+ /*
+ * We may be blocked on write because the input buffer
+ * is full. Let's check if we have room in our input
+ * buffer for more data and retry the write before
+ * waiting..
+ */
+ if (repack_input_buffer(ptr) or
+ process_input_buffer(ptr))
{
- case EINTR:
continue;
-
- case EWOULDBLOCK:
-#ifdef USE_EAGAIN
- case EAGAIN:
-#endif
-#ifdef TARGET_OS_LINUX
- case ERESTART:
-#endif
- break; // No IO is fine, we can just move on
-
- default:
- memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
}
- }
- break;
- case 0: // Shutdown on the socket has occurred
- {
- memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
- }
- break;
+ memcached_return_t rc= io_wait(ptr, MEM_WRITE);
+ if (memcached_success(rc))
+ {
+ continue;
+ }
+ else if (rc == MEMCACHED_TIMEOUT)
+ {
+ *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+ return -1;
+ }
- default:
- {
- ptr->read_data_length+= size_t(nr);
- ptr->read_buffer_length+= size_t(nr);
- return true;
+ memcached_quit_server(ptr, true);
+ *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ return -1;
}
- break;
+ case ENOTCONN:
+ case EPIPE:
+ default:
+ memcached_quit_server(ptr, true);
+ *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+ WATCHPOINT_ASSERT(ptr->fd == -1);
+ return -1;
}
- } while (0);
- }
- return false;
-}
+ }
-/**
- * If the we have callbacks connected to this server structure
- * we may start process the input queue and fire the callbacks
- * for the incomming messages. This function is _only_ called
- * when the input buffer is full, so that we _know_ that we have
- * at least _one_ message to process.
- *
- * @param ptr the server to star processing iput messages for
- * @return true if we processed anything, false otherwise
- */
-static bool process_input_buffer(memcached_server_write_instance_st ptr)
-{
- /*
- ** We might be able to process some of the response messages if we
- ** have a callback set up
- */
- if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
- {
- /*
- * We might have responses... try to read them out and fire
- * callbacks
- */
- memcached_callback_st cb= *ptr->root->callbacks;
+ if (ptr->type == MEMCACHED_CONNECTION_UDP and
+ (size_t)sent_length != write_length)
+ {
+ memcached_quit_server(ptr, true);
+ *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
+ return -1;
+ }
- memcached_set_processing_input((memcached_st *)ptr->root, true);
+ ptr->io_bytes_sent += (uint32_t) sent_length;
- char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
- memcached_return_t error;
- memcached_st *root= (memcached_st *)ptr->root;
- error= memcached_response(ptr, buffer, sizeof(buffer),
- &root->result);
+ local_write_ptr+= sent_length;
+ write_length-= (uint32_t) sent_length;
+ return_length+= (uint32_t) sent_length;
+ }
- memcached_set_processing_input(root, false);
+ WATCHPOINT_ASSERT(write_length == 0);
+ // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
+ // ptr->write_buffer_offset);
- if (error == MEMCACHED_SUCCESS)
- {
- for (unsigned int x= 0; x < cb.number_of_callback; x++)
- {
- error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
- if (error != MEMCACHED_SUCCESS)
- break;
- }
+ // if we are a udp server, the begining of the buffer is reserverd for
+ // the upd frame header
+ if (ptr->type == MEMCACHED_CONNECTION_UDP)
+ ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
+ else
+ ptr->write_buffer_offset= 0;
- /* @todo what should I do with the error message??? */
- }
- /* @todo what should I do with other error messages?? */
- return true;
- }
+ return (ssize_t) return_length;
+}
- return false;
+memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
+{
+ return io_wait(ptr, MEM_WRITE);
}
memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
return NULL;
}
-static ssize_t io_flush(memcached_server_write_instance_st ptr,
- const bool with_flush,
- memcached_return_t *error)
-{
- /*
- ** We might want to purge the input buffer if we haven't consumed
- ** any output yet... The test for the limits is the purge is inline
- ** in the purge function to avoid duplicating the logic..
- */
- {
- memcached_return_t rc;
- WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- rc= memcached_purge(ptr);
-
- if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
- {
- return -1;
- }
- }
- ssize_t sent_length;
- size_t return_length;
- char *local_write_ptr= ptr->write_buffer;
- size_t write_length= ptr->write_buffer_offset;
-
- *error= MEMCACHED_SUCCESS;
-
- WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
-
- // UDP Sanity check, make sure that we are not sending somthing too big
- if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
- {
- *error= MEMCACHED_WRITE_FAILURE;
- return -1;
- }
-
- if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
- && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
- {
- return 0;
- }
-
- /* Looking for memory overflows */
-#if defined(DEBUG)
- if (write_length == MEMCACHED_MAX_BUFFER)
- WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
- WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
-#endif
-
- return_length= 0;
- while (write_length)
- {
- WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- WATCHPOINT_ASSERT(write_length > 0);
- sent_length= 0;
- if (ptr->type == MEMCACHED_CONNECTION_UDP)
- increment_udp_message_id(ptr);
-
- WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
- if (with_flush)
- {
- sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
- }
- else
- {
- sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
- }
-
- if (sent_length == SOCKET_ERROR)
- {
- memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
-#if 0 // @todo I should look at why we hit this bit of code hard frequently
- WATCHPOINT_ERRNO(get_socket_errno());
- WATCHPOINT_NUMBER(get_socket_errno());
-#endif
- switch (get_socket_errno())
- {
- case ENOBUFS:
- continue;
- case EWOULDBLOCK:
-#ifdef USE_EAGAIN
- case EAGAIN:
-#endif
- {
- /*
- * We may be blocked on write because the input buffer
- * is full. Let's check if we have room in our input
- * buffer for more data and retry the write before
- * waiting..
- */
- if (repack_input_buffer(ptr) or
- process_input_buffer(ptr))
- {
- continue;
- }
-
- memcached_return_t rc= io_wait(ptr, MEM_WRITE);
- if (memcached_success(rc))
- {
- continue;
- }
- else if (rc == MEMCACHED_TIMEOUT)
- {
- *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
- return -1;
- }
-
- memcached_quit_server(ptr, true);
- *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
- return -1;
- }
- case ENOTCONN:
- case EPIPE:
- default:
- memcached_quit_server(ptr, true);
- *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
- WATCHPOINT_ASSERT(ptr->fd == -1);
- return -1;
- }
- }
-
- if (ptr->type == MEMCACHED_CONNECTION_UDP and
- (size_t)sent_length != write_length)
- {
- memcached_quit_server(ptr, true);
- *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
- return -1;
- }
-
- ptr->io_bytes_sent += (uint32_t) sent_length;
-
- local_write_ptr+= sent_length;
- write_length-= (uint32_t) sent_length;
- return_length+= (uint32_t) sent_length;
- }
-
- WATCHPOINT_ASSERT(write_length == 0);
- // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
- // ptr->write_buffer_offset);
-
- // if we are a udp server, the begining of the buffer is reserverd for
- // the upd frame header
- if (ptr->type == MEMCACHED_CONNECTION_UDP)
- ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
- else
- ptr->write_buffer_offset= 0;
-
- return (ssize_t) return_length;
-}
-
/*
Eventually we will just kill off the server with the problem.
*/
return MEMCACHED_SUCCESS;
}
-/*
- * The udp request id consists of two seperate sections
- * 1) The thread id
- * 2) The message number
- * The thread id should only be set when the memcached_st struct is created
- * and should not be changed.
- *
- * The message num is incremented for each new message we send, this function
- * extracts the message number from message_id, increments it and then
- * writes the new value back into the header
- */
-static void increment_udp_message_id(memcached_server_write_instance_st ptr)
-{
- struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
- uint16_t cur_req= get_udp_datagram_request_id(header);
- int msg_num= get_msg_num_from_request_id(cur_req);
- int thread_id= get_thread_id_from_request_id(cur_req);
-
- if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
- msg_num= 0;
-
- header->request_id= htons((uint16_t) (thread_id | msg_num));
-}
-
memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id)
{
if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)