Shuffle headers so that we don't need prototypes.
authorBrian Aker <brian@tangent.org>
Fri, 29 Jul 2011 16:33:01 +0000 (09:33 -0700)
committerBrian Aker <brian@tangent.org>
Fri, 29 Jul 2011 16:33:01 +0000 (09:33 -0700)
libmemcached/io.cc

index c04deb30502001570333bfc82f24a94ee57d0e6b..f706048e71341289add79a30931d529c5825de03 100644 (file)
@@ -44,10 +44,152 @@ enum memc_read_or_write {
   MEM_WRITE
 };
 
-static ssize_t io_flush(memcached_server_write_instance_st ptr,
-                        const bool with_flush,
-                        memcached_return_t *error);
-static void increment_udp_message_id(memcached_server_write_instance_st ptr);
+/*
+ * The udp request id consists of two seperate sections
+ *   1) The thread id
+ *   2) The message number
+ * The thread id should only be set when the memcached_st struct is created
+ * and should not be changed.
+ *
+ * The message num is incremented for each new message we send, this function
+ * extracts the message number from message_id, increments it and then
+ * writes the new value back into the header
+ */
+static void increment_udp_message_id(memcached_server_write_instance_st ptr)
+{
+  struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
+  uint16_t cur_req= get_udp_datagram_request_id(header);
+  int msg_num= get_msg_num_from_request_id(cur_req);
+  int thread_id= get_thread_id_from_request_id(cur_req);
+
+  if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
+    msg_num= 0;
+
+  header->request_id= htons((uint16_t) (thread_id | msg_num));
+}
+
+/**
+ * Try to fill the input buffer for a server with as much
+ * data as possible.
+ *
+ * @param ptr the server to pack
+ */
+static bool repack_input_buffer(memcached_server_write_instance_st ptr)
+{
+  if (ptr->read_ptr != ptr->read_buffer)
+  {
+    /* Move all of the data to the beginning of the buffer so
+     ** that we can fit more data into the buffer...
+   */
+    memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
+    ptr->read_ptr= ptr->read_buffer;
+    ptr->read_data_length= ptr->read_buffer_length;
+  }
+
+  /* There is room in the buffer, try to fill it! */
+  if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
+  {
+    do {
+      /* Just try a single read to grab what's available */
+      ssize_t nr= recv(ptr->fd,
+                       ptr->read_ptr + ptr->read_data_length,
+                       MEMCACHED_MAX_BUFFER - ptr->read_data_length,
+                       MSG_DONTWAIT);
+
+      switch (nr)
+      {
+      case SOCKET_ERROR:
+        {
+          switch (get_socket_errno())
+          {
+          case EINTR:
+            continue;
+
+          case EWOULDBLOCK:
+#ifdef USE_EAGAIN
+          case EAGAIN:
+#endif
+#ifdef TARGET_OS_LINUX
+          case ERESTART:
+#endif
+            break; // No IO is fine, we can just move on
+
+          default:
+            memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+          }
+        }
+        break;
+
+      case 0: // Shutdown on the socket has occurred
+        {
+          memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
+        }
+        break;
+
+      default:
+        {
+          ptr->read_data_length+= size_t(nr);
+          ptr->read_buffer_length+= size_t(nr);
+          return true;
+        }
+        break;
+      }
+    } while (0);
+  }
+  return false;
+}
+
+/**
+ * If the we have callbacks connected to this server structure
+ * we may start process the input queue and fire the callbacks
+ * for the incomming messages. This function is _only_ called
+ * when the input buffer is full, so that we _know_ that we have
+ * at least _one_ message to process.
+ *
+ * @param ptr the server to star processing iput messages for
+ * @return true if we processed anything, false otherwise
+ */
+static bool process_input_buffer(memcached_server_write_instance_st ptr)
+{
+  /*
+   ** We might be able to process some of the response messages if we
+   ** have a callback set up
+ */
+  if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
+  {
+    /*
+     * We might have responses... try to read them out and fire
+     * callbacks
+   */
+    memcached_callback_st cb= *ptr->root->callbacks;
+
+    memcached_set_processing_input((memcached_st *)ptr->root, true);
+
+    char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
+    memcached_return_t error;
+    memcached_st *root= (memcached_st *)ptr->root;
+    error= memcached_response(ptr, buffer, sizeof(buffer),
+                              &root->result);
+
+    memcached_set_processing_input(root, false);
+
+    if (error == MEMCACHED_SUCCESS)
+    {
+      for (unsigned int x= 0; x < cb.number_of_callback; x++)
+      {
+        error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
+        if (error != MEMCACHED_SUCCESS)
+          break;
+      }
+
+      /* @todo what should I do with the error message??? */
+    }
+    /* @todo what should I do with other error messages?? */
+    return true;
+  }
+
+  return false;
+}
 
 static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
                                   memc_read_or_write read_or_write)
@@ -145,132 +287,158 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
   return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
 }
 
-memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
+static ssize_t io_flush(memcached_server_write_instance_st ptr,
+                        const bool with_flush,
+                        memcached_return_t *error)
 {
-  return io_wait(ptr, MEM_WRITE);
-}
-
-/**
- * Try to fill the input buffer for a server with as much
- * data as possible.
- *
- * @param ptr the server to pack
+  /*
+   ** We might want to purge the input buffer if we haven't consumed
+   ** any output yet... The test for the limits is the purge is inline
+   ** in the purge function to avoid duplicating the logic..
  */
-static bool repack_input_buffer(memcached_server_write_instance_st ptr)
-{
-  if (ptr->read_ptr != ptr->read_buffer)
   {
-    /* Move all of the data to the beginning of the buffer so
-     ** that we can fit more data into the buffer...
-   */
-    memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
-    ptr->read_ptr= ptr->read_buffer;
-    ptr->read_data_length= ptr->read_buffer_length;
+    memcached_return_t rc;
+    WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+    rc= memcached_purge(ptr);
+
+    if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+    {
+      return -1;
+    }
   }
+  ssize_t sent_length;
+  size_t return_length;
+  char *local_write_ptr= ptr->write_buffer;
+  size_t write_length= ptr->write_buffer_offset;
 
-  /* There is room in the buffer, try to fill it! */
-  if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
+  *error= MEMCACHED_SUCCESS;
+
+  WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+
+  // UDP Sanity check, make sure that we are not sending somthing too big
+  if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
   {
-    do {
-      /* Just try a single read to grab what's available */
-      ssize_t nr= recv(ptr->fd,
-                       ptr->read_ptr + ptr->read_data_length,
-                       MEMCACHED_MAX_BUFFER - ptr->read_data_length,
-                       MSG_DONTWAIT);
+    *error= MEMCACHED_WRITE_FAILURE;
+    return -1;
+  }
 
-      switch (nr)
+  if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
+                                        && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
+  {
+    return 0;
+  }
+
+  /* Looking for memory overflows */
+#if defined(DEBUG)
+  if (write_length == MEMCACHED_MAX_BUFFER)
+    WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
+  WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
+#endif
+
+  return_length= 0;
+  while (write_length)
+  {
+    WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+    WATCHPOINT_ASSERT(write_length > 0);
+    sent_length= 0;
+    if (ptr->type == MEMCACHED_CONNECTION_UDP)
+      increment_udp_message_id(ptr);
+
+    WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+    if (with_flush)
+    {
+      sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
+    }
+    else
+    {
+      sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
+    }
+
+    if (sent_length == SOCKET_ERROR)
+    {
+      memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+#if 0 // @todo I should look at why we hit this bit of code hard frequently
+      WATCHPOINT_ERRNO(get_socket_errno());
+      WATCHPOINT_NUMBER(get_socket_errno());
+#endif
+      switch (get_socket_errno())
       {
-      case SOCKET_ERROR:
+      case ENOBUFS:
+        continue;
+      case EWOULDBLOCK:
+#ifdef USE_EAGAIN
+      case EAGAIN:
+#endif
         {
-          switch (get_socket_errno())
+          /*
+           * We may be blocked on write because the input buffer
+           * is full. Let's check if we have room in our input
+           * buffer for more data and retry the write before
+           * waiting..
+         */
+          if (repack_input_buffer(ptr) or
+              process_input_buffer(ptr))
           {
-          case EINTR:
             continue;
-
-          case EWOULDBLOCK:
-#ifdef USE_EAGAIN
-          case EAGAIN:
-#endif
-#ifdef TARGET_OS_LINUX
-          case ERESTART:
-#endif
-            break; // No IO is fine, we can just move on
-
-          default:
-            memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
           }
-        }
-        break;
 
-      case 0: // Shutdown on the socket has occurred
-        {
-          memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
-        }
-        break;
+          memcached_return_t rc= io_wait(ptr, MEM_WRITE);
+          if (memcached_success(rc))
+          {
+            continue;
+          }
+          else if (rc == MEMCACHED_TIMEOUT)
+          {
+            *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
+            return -1;
+          }
 
-      default:
-        {
-          ptr->read_data_length+= size_t(nr);
-          ptr->read_buffer_length+= size_t(nr);
-          return true;
+          memcached_quit_server(ptr, true);
+          *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+          return -1;
         }
-        break;
+      case ENOTCONN:
+      case EPIPE:
+      default:
+        memcached_quit_server(ptr, true);
+        *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
+        WATCHPOINT_ASSERT(ptr->fd == -1);
+        return -1;
       }
-    } while (0);
-  }
-  return false;
-}
+    }
 
-/**
- * If the we have callbacks connected to this server structure
- * we may start process the input queue and fire the callbacks
- * for the incomming messages. This function is _only_ called
- * when the input buffer is full, so that we _know_ that we have
- * at least _one_ message to process.
- *
- * @param ptr the server to star processing iput messages for
- * @return true if we processed anything, false otherwise
- */
-static bool process_input_buffer(memcached_server_write_instance_st ptr)
-{
-  /*
-   ** We might be able to process some of the response messages if we
-   ** have a callback set up
- */
-  if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
-  {
-    /*
-     * We might have responses... try to read them out and fire
-     * callbacks
-   */
-    memcached_callback_st cb= *ptr->root->callbacks;
+    if (ptr->type == MEMCACHED_CONNECTION_UDP and
+        (size_t)sent_length != write_length)
+    {
+      memcached_quit_server(ptr, true);
+      *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
+      return -1;
+    }
 
-    memcached_set_processing_input((memcached_st *)ptr->root, true);
+    ptr->io_bytes_sent += (uint32_t) sent_length;
 
-    char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
-    memcached_return_t error;
-    memcached_st *root= (memcached_st *)ptr->root;
-    error= memcached_response(ptr, buffer, sizeof(buffer),
-                              &root->result);
+    local_write_ptr+= sent_length;
+    write_length-= (uint32_t) sent_length;
+    return_length+= (uint32_t) sent_length;
+  }
 
-    memcached_set_processing_input(root, false);
+  WATCHPOINT_ASSERT(write_length == 0);
+  // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
+  // ptr->write_buffer_offset);
 
-    if (error == MEMCACHED_SUCCESS)
-    {
-      for (unsigned int x= 0; x < cb.number_of_callback; x++)
-      {
-        error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
-        if (error != MEMCACHED_SUCCESS)
-          break;
-      }
+  // if we are a udp server, the begining of the buffer is reserverd for
+  // the upd frame header
+  if (ptr->type == MEMCACHED_CONNECTION_UDP)
+    ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
+  else
+    ptr->write_buffer_offset= 0;
 
-      /* @todo what should I do with the error message??? */
-    }
-    /* @todo what should I do with other error messages?? */
-    return true;
-  }
+  return (ssize_t) return_length;
+}
 
-  return false;
+memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
+{
+  return io_wait(ptr, MEM_WRITE);
 }
 
 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
@@ -639,155 +807,6 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
   return NULL;
 }
 
-static ssize_t io_flush(memcached_server_write_instance_st ptr,
-                        const bool with_flush,
-                        memcached_return_t *error)
-{
-  /*
-   ** We might want to purge the input buffer if we haven't consumed
-   ** any output yet... The test for the limits is the purge is inline
-   ** in the purge function to avoid duplicating the logic..
- */
-  {
-    memcached_return_t rc;
-    WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
-    rc= memcached_purge(ptr);
-
-    if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
-    {
-      return -1;
-    }
-  }
-  ssize_t sent_length;
-  size_t return_length;
-  char *local_write_ptr= ptr->write_buffer;
-  size_t write_length= ptr->write_buffer_offset;
-
-  *error= MEMCACHED_SUCCESS;
-
-  WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
-
-  // UDP Sanity check, make sure that we are not sending somthing too big
-  if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
-  {
-    *error= MEMCACHED_WRITE_FAILURE;
-    return -1;
-  }
-
-  if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
-                                        && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
-  {
-    return 0;
-  }
-
-  /* Looking for memory overflows */
-#if defined(DEBUG)
-  if (write_length == MEMCACHED_MAX_BUFFER)
-    WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
-  WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
-#endif
-
-  return_length= 0;
-  while (write_length)
-  {
-    WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
-    WATCHPOINT_ASSERT(write_length > 0);
-    sent_length= 0;
-    if (ptr->type == MEMCACHED_CONNECTION_UDP)
-      increment_udp_message_id(ptr);
-
-    WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
-    if (with_flush)
-    {
-      sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
-    }
-    else
-    {
-      sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
-    }
-
-    if (sent_length == SOCKET_ERROR)
-    {
-      memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
-#if 0 // @todo I should look at why we hit this bit of code hard frequently
-      WATCHPOINT_ERRNO(get_socket_errno());
-      WATCHPOINT_NUMBER(get_socket_errno());
-#endif
-      switch (get_socket_errno())
-      {
-      case ENOBUFS:
-        continue;
-      case EWOULDBLOCK:
-#ifdef USE_EAGAIN
-      case EAGAIN:
-#endif
-        {
-          /*
-           * We may be blocked on write because the input buffer
-           * is full. Let's check if we have room in our input
-           * buffer for more data and retry the write before
-           * waiting..
-         */
-          if (repack_input_buffer(ptr) or
-              process_input_buffer(ptr))
-          {
-            continue;
-          }
-
-          memcached_return_t rc= io_wait(ptr, MEM_WRITE);
-          if (memcached_success(rc))
-          {
-            continue;
-          }
-          else if (rc == MEMCACHED_TIMEOUT)
-          {
-            *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
-            return -1;
-          }
-
-          memcached_quit_server(ptr, true);
-          *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
-          return -1;
-        }
-      case ENOTCONN:
-      case EPIPE:
-      default:
-        memcached_quit_server(ptr, true);
-        *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
-        WATCHPOINT_ASSERT(ptr->fd == -1);
-        return -1;
-      }
-    }
-
-    if (ptr->type == MEMCACHED_CONNECTION_UDP and
-        (size_t)sent_length != write_length)
-    {
-      memcached_quit_server(ptr, true);
-      *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
-      return -1;
-    }
-
-    ptr->io_bytes_sent += (uint32_t) sent_length;
-
-    local_write_ptr+= sent_length;
-    write_length-= (uint32_t) sent_length;
-    return_length+= (uint32_t) sent_length;
-  }
-
-  WATCHPOINT_ASSERT(write_length == 0);
-  // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
-  // ptr->write_buffer_offset);
-
-  // if we are a udp server, the begining of the buffer is reserverd for
-  // the upd frame header
-  if (ptr->type == MEMCACHED_CONNECTION_UDP)
-    ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
-  else
-    ptr->write_buffer_offset= 0;
-
-  return (ssize_t) return_length;
-}
-
 /*
   Eventually we will just kill off the server with the problem.
 */
@@ -879,30 +898,6 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
   return MEMCACHED_SUCCESS;
 }
 
-/*
- * The udp request id consists of two seperate sections
- *   1) The thread id
- *   2) The message number
- * The thread id should only be set when the memcached_st struct is created
- * and should not be changed.
- *
- * The message num is incremented for each new message we send, this function
- * extracts the message number from message_id, increments it and then
- * writes the new value back into the header
- */
-static void increment_udp_message_id(memcached_server_write_instance_st ptr)
-{
-  struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
-  uint16_t cur_req= get_udp_datagram_request_id(header);
-  int msg_num= get_msg_num_from_request_id(cur_req);
-  int thread_id= get_thread_id_from_request_id(cur_req);
-
-  if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
-    msg_num= 0;
-
-  header->request_id= htons((uint16_t) (thread_id | msg_num));
-}
-
 memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id)
 {
   if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)