Update to use memcached_fatal() for purge return test.
[m6w6/libmemcached] / libmemcached / io.cc
index 2b463ec77b2a5568b8501289d276ff01c3581231..ab9c9b9265b3c5fdba2ca05f36ed38fb3d75bfec 100644 (file)
@@ -81,10 +81,10 @@ static bool repack_input_buffer(memcached_server_write_instance_st ptr)
           case EINTR:
             continue;
 
+#if EWOULDBLOCK != EAGAIN
           case EWOULDBLOCK:
-#ifdef USE_EAGAIN
-          case EAGAIN:
 #endif
+          case EAGAIN:
 #ifdef TARGET_OS_LINUX
           case ERESTART:
 #endif
@@ -131,7 +131,7 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr)
    ** We might be able to process some of the response messages if we
    ** have a callback set up
  */
-  if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
+  if (ptr->root->callbacks != NULL)
   {
     /*
      * We might have responses... try to read them out and fire
@@ -142,10 +142,8 @@ static bool process_input_buffer(memcached_server_write_instance_st ptr)
     memcached_set_processing_input((memcached_st *)ptr->root, true);
 
     char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
-    memcached_return_t error;
     memcached_st *root= (memcached_st *)ptr->root;
-    error= memcached_response(ptr, buffer, sizeof(buffer),
-                              &root->result);
+    memcached_return_t error= memcached_response(ptr, buffer, sizeof(buffer), &root->result);
 
     memcached_set_processing_input(root, false);
 
@@ -195,7 +193,7 @@ static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
   if (read_or_write == MEM_WRITE)
   {
     memcached_return_t rc= memcached_purge(ptr);
-    if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+    if (memcached_fatal(rc))
     {
       return MEMCACHED_FAILURE;
     }
@@ -280,11 +278,10 @@ static bool io_flush(memcached_server_write_instance_st ptr,
    ** in the purge function to avoid duplicating the logic..
  */
   {
-    memcached_return_t rc;
     WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
-    rc= memcached_purge(ptr);
+    memcached_return_t rc= memcached_purge(ptr);
 
-    if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
+    if (rc != MEMCACHED_SUCCESS and rc != MEMCACHED_STORED)
     {
       return false;
     }
@@ -296,18 +293,6 @@ static bool io_flush(memcached_server_write_instance_st ptr,
 
   WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
 
-  // UDP Sanity check, make sure that we are not sending somthing too big
-  if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH)
-  {
-    error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
-    return false;
-  }
-
-  if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
-  {
-    return true;
-  }
-
   /* Looking for memory overflows */
 #if defined(DEBUG)
   if (write_length == MEMCACHED_MAX_BUFFER)
@@ -319,10 +304,6 @@ static bool io_flush(memcached_server_write_instance_st ptr,
   {
     WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
     WATCHPOINT_ASSERT(write_length > 0);
-    if (memcached_is_udp(ptr->root))
-    {
-      increment_udp_message_id(ptr);
-    }
 
     ssize_t sent_length= 0;
     WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
@@ -337,7 +318,6 @@ static bool io_flush(memcached_server_write_instance_st ptr,
 
     if (sent_length == SOCKET_ERROR)
     {
-      memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
 #if 0 // @todo I should look at why we hit this bit of code hard frequently
       WATCHPOINT_ERRNO(get_socket_errno());
       WATCHPOINT_NUMBER(get_socket_errno());
@@ -346,10 +326,11 @@ static bool io_flush(memcached_server_write_instance_st ptr,
       {
       case ENOBUFS:
         continue;
+
+#if EWOULDBLOCK != EAGAIN
       case EWOULDBLOCK:
-#ifdef USE_EAGAIN
-      case EAGAIN:
 #endif
+      case EAGAIN:
         {
           /*
            * We may be blocked on write because the input buffer
@@ -387,13 +368,6 @@ static bool io_flush(memcached_server_write_instance_st ptr,
       }
     }
 
-    if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length)
-    {
-      memcached_quit_server(ptr, true);
-      error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
-      return false;
-    }
-
     ptr->io_bytes_sent+= uint32_t(sent_length);
 
     local_write_ptr+= sent_length;
@@ -401,14 +375,7 @@ static bool io_flush(memcached_server_write_instance_st ptr,
   }
 
   WATCHPOINT_ASSERT(write_length == 0);
-  if (memcached_is_udp(ptr->root))
-  {
-    ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
-  }
-  else
-  {
-    ptr->write_buffer_offset= 0;
-  }
+  ptr->write_buffer_offset= 0;
 
   return true;
 }
@@ -419,8 +386,9 @@ memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_s
 }
 
 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
-                                     void *buffer, size_t length, ssize_t *nread)
+                                     void *buffer, size_t length, ssize_tnread)
 {
+  assert(memcached_is_udp(ptr->root) == false);
   assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error
   char *buffer_ptr= static_cast<char *>(buffer);
 
@@ -448,10 +416,10 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
             continue;
 
           case ETIMEDOUT: // OSX
+#if EWOULDBLOCK != EAGAIN
           case EWOULDBLOCK:
-#ifdef USE_EAGAIN
-          case EAGAIN:
 #endif
+          case EAGAIN:
 #ifdef TARGET_OS_LINUX
           case ERESTART:
 #endif
@@ -475,7 +443,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
           default:
             {
               memcached_quit_server(ptr, true);
-              *nread= -1;
+              nread= -1;
               return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
             }
           }
@@ -493,8 +461,8 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
           */
           WATCHPOINT_STRING("We had a zero length recv()");
           memcached_quit_server(ptr, true);
-          *nread= -1;
-          return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT, 
+          nread= -1;
+          return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT, 
                                      memcached_literal_param("::rec() returned zero, server has disconnected"));
         }
       } while (data_read <= 0);
@@ -527,7 +495,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
     }
   }
 
-  *nread = (ssize_t)(buffer_ptr - (char*)buffer);
+  nread= ssize_t(buffer_ptr - (char*)buffer);
 
   return MEMCACHED_SUCCESS;
 }
@@ -535,6 +503,7 @@ memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
 memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
 {
   assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st");
+  assert(memcached_is_udp(ptr->root) == false);
 
   if (ptr->fd == INVALID_SOCKET)
   {
@@ -555,10 +524,10 @@ memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
         continue;
 
       case ETIMEDOUT: // OSX
+#if EWOULDBLOCK != EAGAIN
       case EWOULDBLOCK:
-#ifdef USE_EAGAIN
-      case EAGAIN:
 #endif
+      case EAGAIN:
 #ifdef TARGET_OS_LINUX
       case ERESTART:
 #endif
@@ -592,6 +561,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr,
                          const void *buffer, size_t length, bool with_flush)
 {
   WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
+  assert(memcached_is_udp(ptr->root) == false);
 
   size_t original_length= length;
   const char *buffer_ptr= static_cast<const char *>(buffer);
@@ -599,25 +569,9 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr,
   while (length)
   {
     char *write_ptr;
-    size_t should_write;
-    size_t buffer_end;
-
-    if (memcached_is_udp(ptr->root))
-    {
-      //UDP does not support partial writes
-      buffer_end= MAX_UDP_DATAGRAM_LENGTH;
-      should_write= length;
-      if (ptr->write_buffer_offset + should_write > buffer_end)
-      {
-        return -1;
-      }
-    }
-    else
-    {
-      buffer_end= MEMCACHED_MAX_BUFFER;
-      should_write= buffer_end - ptr->write_buffer_offset;
-      should_write= (should_write < length) ? should_write : length;
-    }
+    size_t buffer_end= MEMCACHED_MAX_BUFFER;
+    size_t should_write= buffer_end -ptr->write_buffer_offset;
+    should_write= (should_write < length) ? should_write : length;
 
     write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
     memcpy(write_ptr, buffer_ptr, should_write);
@@ -625,7 +579,7 @@ static ssize_t _io_write(memcached_server_write_instance_st ptr,
     buffer_ptr+= should_write;
     length-= should_write;
 
-    if (ptr->write_buffer_offset == buffer_end and memcached_is_udp(ptr->root) == false)
+    if (ptr->write_buffer_offset == buffer_end)
     {
       WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
 
@@ -661,21 +615,9 @@ ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
   return _io_write(ptr, buffer, length, with_flush);
 }
 
-size_t io_vector_total_size(libmemcached_io_vector_st* vector, const size_t number_of)
-{
-  ssize_t total= 0;
-
-  for (size_t x= 0; x < number_of; x++)
-  {
-    total+= vector->length;
-  }
-
-  return total;
-}
-
 ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
-                            libmemcached_io_vector_st *vector,
-                            size_t number_of, bool with_flush)
+                            libmemcached_io_vector_st vector[],
+                            const size_t number_of, const bool with_flush)
 {
   ssize_t total= 0;
 
@@ -732,9 +674,9 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
 {
 #define MAX_SERVERS_TO_POLL 100
   struct pollfd fds[MAX_SERVERS_TO_POLL];
-  unsigned int host_index= 0;
+  nfds_t host_index= 0;
 
-  for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x)
+  for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x)
   {
     memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
 
@@ -779,17 +721,18 @@ memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st
     break;
 
   default:
-    for (size_t x= 0; x < host_index; ++x)
+    for (nfds_t x= 0; x < host_index; ++x)
     {
       if (fds[x].revents & POLLIN)
       {
         for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
         {
-          memcached_server_write_instance_st instance=
-            memcached_server_instance_fetch(memc, y);
+          memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y);
 
           if (instance->fd == fds[x].fd)
+          {
             return instance;
+          }
         }
       }
     }
@@ -822,14 +765,14 @@ memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
     ssize_t nread;
     memcached_return_t rc;
 
-    while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { };
+    while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, nread))) { };
 
     if (memcached_failed(rc))
     {
       return rc;
     }
 
-    offset+= (size_t) nread;
+    offset+= size_t(nread);
   }
 
   return MEMCACHED_SUCCESS;
@@ -853,7 +796,7 @@ memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
        * the logic.
      */
       ssize_t nread;
-      memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
+      memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, nread);
       if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS)
       {
         memcached_quit_server(ptr, true);