Update
[m6w6/libmemcached] / libmemcached / io.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * LibMemcached
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39
40 #include <libmemcached/common.h>
41
42 enum memc_read_or_write {
43 MEM_READ,
44 MEM_WRITE
45 };
46
47 /**
48 * Try to fill the input buffer for a server with as much
49 * data as possible.
50 *
51 * @param ptr the server to pack
52 */
53 static bool repack_input_buffer(memcached_server_write_instance_st ptr)
54 {
55 if (ptr->read_ptr != ptr->read_buffer)
56 {
57 /* Move all of the data to the beginning of the buffer so
58 ** that we can fit more data into the buffer...
59 */
60 memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
61 ptr->read_ptr= ptr->read_buffer;
62 ptr->read_data_length= ptr->read_buffer_length;
63 }
64
65 /* There is room in the buffer, try to fill it! */
66 if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
67 {
68 do {
69 /* Just try a single read to grab what's available */
70 ssize_t nr= recv(ptr->fd,
71 ptr->read_ptr + ptr->read_data_length,
72 MEMCACHED_MAX_BUFFER - ptr->read_data_length,
73 MSG_DONTWAIT);
74
75 switch (nr)
76 {
77 case SOCKET_ERROR:
78 {
79 switch (get_socket_errno())
80 {
81 case EINTR:
82 continue;
83
84 #if EWOULDBLOCK != EAGAIN
85 case EWOULDBLOCK:
86 #endif
87 case EAGAIN:
88 #ifdef TARGET_OS_LINUX
89 case ERESTART:
90 #endif
91 break; // No IO is fine, we can just move on
92
93 default:
94 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
95 }
96 }
97 break;
98
99 case 0: // Shutdown on the socket has occurred
100 {
101 memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
102 }
103 break;
104
105 default:
106 {
107 ptr->read_data_length+= size_t(nr);
108 ptr->read_buffer_length+= size_t(nr);
109 return true;
110 }
111 break;
112 }
113 } while (0);
114 }
115 return false;
116 }
117
118 /**
119 * If the we have callbacks connected to this server structure
120 * we may start process the input queue and fire the callbacks
121 * for the incomming messages. This function is _only_ called
122 * when the input buffer is full, so that we _know_ that we have
123 * at least _one_ message to process.
124 *
125 * @param ptr the server to star processing iput messages for
126 * @return true if we processed anything, false otherwise
127 */
128 static bool process_input_buffer(memcached_server_write_instance_st ptr)
129 {
130 /*
131 ** We might be able to process some of the response messages if we
132 ** have a callback set up
133 */
134 if (ptr->root->callbacks != NULL)
135 {
136 /*
137 * We might have responses... try to read them out and fire
138 * callbacks
139 */
140 memcached_callback_st cb= *ptr->root->callbacks;
141
142 memcached_set_processing_input((memcached_st *)ptr->root, true);
143
144 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
145 memcached_st *root= (memcached_st *)ptr->root;
146 memcached_return_t error= memcached_response(ptr, buffer, sizeof(buffer), &root->result);
147
148 memcached_set_processing_input(root, false);
149
150 if (error == MEMCACHED_SUCCESS)
151 {
152 for (unsigned int x= 0; x < cb.number_of_callback; x++)
153 {
154 error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
155 if (error != MEMCACHED_SUCCESS)
156 break;
157 }
158
159 /* @todo what should I do with the error message??? */
160 }
161 /* @todo what should I do with other error messages?? */
162 return true;
163 }
164
165 return false;
166 }
167
168 static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
169 const memc_read_or_write read_or_write)
170 {
171 struct pollfd fds;
172 fds.fd= ptr->fd;
173 fds.events= POLLIN;
174
175 if (read_or_write == MEM_WRITE) /* write */
176 {
177 fds.events= POLLOUT;
178 WATCHPOINT_SET(ptr->io_wait_count.write++);
179 }
180 else
181 {
182 WATCHPOINT_SET(ptr->io_wait_count.read++);
183 }
184
185 /*
186 ** We are going to block on write, but at least on Solaris we might block
187 ** on write if we haven't read anything from our input buffer..
188 ** Try to purge the input buffer if we don't do any flow control in the
189 ** application layer (just sending a lot of data etc)
190 ** The test is moved down in the purge function to avoid duplication of
191 ** the test.
192 */
193 if (read_or_write == MEM_WRITE)
194 {
195 if (memcached_fatal(memcached_purge(ptr)))
196 {
197 return MEMCACHED_FAILURE;
198 }
199 }
200
201 if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
202 {
203 return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
204 }
205
206 size_t loop_max= 5;
207 while (--loop_max) // While loop is for ERESTART or EINTR
208 {
209 int error= poll(&fds, 1, ptr->root->poll_timeout);
210 switch (error)
211 {
212 case 1: // Success!
213 WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max);
214 WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
215
216 return MEMCACHED_SUCCESS;
217
218 case 0: // Timeout occured, we let the while() loop do its thing.
219 return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
220
221 default:
222 WATCHPOINT_ERRNO(get_socket_errno());
223 switch (get_socket_errno())
224 {
225 #ifdef TARGET_OS_LINUX
226 case ERESTART:
227 #endif
228 case EINTR:
229 break;
230
231 case EFAULT:
232 case ENOMEM:
233 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
234
235 case EINVAL:
236 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
237
238 default:
239 if (fds.revents & POLLERR)
240 {
241 int err;
242 socklen_t len= sizeof (err);
243 if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
244 {
245 if (err == 0)
246 {
247 continue;
248 }
249 errno= err;
250 }
251 }
252 else
253 {
254 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
255 }
256 int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
257 memcached_quit_server(ptr, true);
258
259 return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT);
260 }
261 }
262 }
263
264 memcached_quit_server(ptr, true);
265
266 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
267 }
268
269 static bool io_flush(memcached_server_write_instance_st ptr,
270 const bool with_flush,
271 memcached_return_t& error)
272 {
273 /*
274 ** We might want to purge the input buffer if we haven't consumed
275 ** any output yet... The test for the limits is the purge is inline
276 ** in the purge function to avoid duplicating the logic..
277 */
278 {
279 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
280 memcached_return_t rc= memcached_purge(ptr);
281
282 if (rc != MEMCACHED_SUCCESS and rc != MEMCACHED_STORED)
283 {
284 return false;
285 }
286 }
287 char *local_write_ptr= ptr->write_buffer;
288 size_t write_length= ptr->write_buffer_offset;
289
290 error= MEMCACHED_SUCCESS;
291
292 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
293
294 /* Looking for memory overflows */
295 #if defined(DEBUG)
296 if (write_length == MEMCACHED_MAX_BUFFER)
297 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
298 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
299 #endif
300
301 while (write_length)
302 {
303 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
304 WATCHPOINT_ASSERT(write_length > 0);
305
306 ssize_t sent_length= 0;
307 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
308 if (with_flush)
309 {
310 sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
311 }
312 else
313 {
314 sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
315 }
316
317 if (sent_length == SOCKET_ERROR)
318 {
319 #if 0 // @todo I should look at why we hit this bit of code hard frequently
320 WATCHPOINT_ERRNO(get_socket_errno());
321 WATCHPOINT_NUMBER(get_socket_errno());
322 #endif
323 switch (get_socket_errno())
324 {
325 case ENOBUFS:
326 continue;
327
328 #if EWOULDBLOCK != EAGAIN
329 case EWOULDBLOCK:
330 #endif
331 case EAGAIN:
332 {
333 /*
334 * We may be blocked on write because the input buffer
335 * is full. Let's check if we have room in our input
336 * buffer for more data and retry the write before
337 * waiting..
338 */
339 if (repack_input_buffer(ptr) or process_input_buffer(ptr))
340 {
341 continue;
342 }
343
344 memcached_return_t rc= io_wait(ptr, MEM_WRITE);
345 if (memcached_success(rc))
346 {
347 continue;
348 }
349 else if (rc == MEMCACHED_TIMEOUT)
350 {
351 error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
352 return false;
353 }
354
355 memcached_quit_server(ptr, true);
356 error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
357 return false;
358 }
359 case ENOTCONN:
360 case EPIPE:
361 default:
362 memcached_quit_server(ptr, true);
363 error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
364 WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET);
365 return false;
366 }
367 }
368
369 ptr->io_bytes_sent+= uint32_t(sent_length);
370
371 local_write_ptr+= sent_length;
372 write_length-= uint32_t(sent_length);
373 }
374
375 WATCHPOINT_ASSERT(write_length == 0);
376 ptr->write_buffer_offset= 0;
377
378 return true;
379 }
380
381 memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
382 {
383 return io_wait(ptr, MEM_WRITE);
384 }
385
386 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
387 void *buffer, size_t length, ssize_t& nread)
388 {
389 assert(memcached_is_udp(ptr->root) == false);
390 assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error
391 char *buffer_ptr= static_cast<char *>(buffer);
392
393 if (ptr->fd == INVALID_SOCKET)
394 {
395 #if 0
396 assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state");
397 #endif
398 return MEMCACHED_CONNECTION_FAILURE;
399 }
400
401 while (length)
402 {
403 if (not ptr->read_buffer_length)
404 {
405 ssize_t data_read;
406 do
407 {
408 data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
409 if (data_read == SOCKET_ERROR)
410 {
411 switch (get_socket_errno())
412 {
413 case EINTR: // We just retry
414 continue;
415
416 case ETIMEDOUT: // OSX
417 #if EWOULDBLOCK != EAGAIN
418 case EWOULDBLOCK:
419 #endif
420 case EAGAIN:
421 #ifdef TARGET_OS_LINUX
422 case ERESTART:
423 #endif
424 if (memcached_success(io_wait(ptr, MEM_READ)))
425 {
426 continue;
427 }
428 return MEMCACHED_IN_PROGRESS;
429
430 /* fall through */
431
432 case ENOTCONN: // Programmer Error
433 WATCHPOINT_ASSERT(0);
434 case ENOTSOCK:
435 WATCHPOINT_ASSERT(0);
436 case EBADF:
437 assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket");
438 case EINVAL:
439 case EFAULT:
440 case ECONNREFUSED:
441 default:
442 {
443 memcached_quit_server(ptr, true);
444 nread= -1;
445 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
446 }
447 }
448 }
449 else if (data_read == 0)
450 {
451 /*
452 EOF. Any data received so far is incomplete
453 so discard it. This always reads by byte in case of TCP
454 and protocol enforcement happens at memcached_response()
455 looking for '\n'. We do not care for UDB which requests 8 bytes
456 at once. Generally, this means that connection went away. Since
457 for blocking I/O we do not return 0 and for non-blocking case
458 it will return EGAIN if data is not immediatly available.
459 */
460 WATCHPOINT_STRING("We had a zero length recv()");
461 memcached_quit_server(ptr, true);
462 nread= -1;
463 return memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
464 memcached_literal_param("::rec() returned zero, server has disconnected"));
465 }
466 } while (data_read <= 0);
467
468 ptr->io_bytes_sent = 0;
469 ptr->read_data_length= (size_t) data_read;
470 ptr->read_buffer_length= (size_t) data_read;
471 ptr->read_ptr= ptr->read_buffer;
472 }
473
474 if (length > 1)
475 {
476 size_t difference;
477
478 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
479
480 memcpy(buffer_ptr, ptr->read_ptr, difference);
481 length -= difference;
482 ptr->read_ptr+= difference;
483 ptr->read_buffer_length-= difference;
484 buffer_ptr+= difference;
485 }
486 else
487 {
488 *buffer_ptr= *ptr->read_ptr;
489 ptr->read_ptr++;
490 ptr->read_buffer_length--;
491 buffer_ptr++;
492 break;
493 }
494 }
495
496 nread= ssize_t(buffer_ptr - (char*)buffer);
497
498 return MEMCACHED_SUCCESS;
499 }
500
501 memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
502 {
503 assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st");
504 assert(memcached_is_udp(ptr->root) == false);
505
506 if (ptr->fd == INVALID_SOCKET)
507 {
508 assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Invalid socket state");
509 return MEMCACHED_CONNECTION_FAILURE;
510 }
511
512 ssize_t data_read;
513 char buffer[MEMCACHED_MAX_BUFFER];
514 do
515 {
516 data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
517 if (data_read == SOCKET_ERROR)
518 {
519 switch (get_socket_errno())
520 {
521 case EINTR: // We just retry
522 continue;
523
524 case ETIMEDOUT: // OSX
525 #if EWOULDBLOCK != EAGAIN
526 case EWOULDBLOCK:
527 #endif
528 case EAGAIN:
529 #ifdef TARGET_OS_LINUX
530 case ERESTART:
531 #endif
532 if (memcached_success(io_wait(ptr, MEM_READ)))
533 {
534 continue;
535 }
536 return MEMCACHED_IN_PROGRESS;
537
538 /* fall through */
539
540 case ENOTCONN: // Programmer Error
541 WATCHPOINT_ASSERT(0);
542 case ENOTSOCK:
543 WATCHPOINT_ASSERT(0);
544 case EBADF:
545 assert_msg(ptr->fd != INVALID_SOCKET, "Invalid socket state");
546 case EINVAL:
547 case EFAULT:
548 case ECONNREFUSED:
549 default:
550 return MEMCACHED_CONNECTION_FAILURE; // We want this!
551 }
552 }
553 } while (data_read > 0);
554
555 return MEMCACHED_CONNECTION_FAILURE;
556 }
557
558 static ssize_t _io_write(memcached_server_write_instance_st ptr,
559 const void *buffer, size_t length, bool with_flush)
560 {
561 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
562 assert(memcached_is_udp(ptr->root) == false);
563
564 size_t original_length= length;
565 const char *buffer_ptr= static_cast<const char *>(buffer);
566
567 while (length)
568 {
569 char *write_ptr;
570 size_t buffer_end= MEMCACHED_MAX_BUFFER;
571 size_t should_write= buffer_end -ptr->write_buffer_offset;
572 should_write= (should_write < length) ? should_write : length;
573
574 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
575 memcpy(write_ptr, buffer_ptr, should_write);
576 ptr->write_buffer_offset+= should_write;
577 buffer_ptr+= should_write;
578 length-= should_write;
579
580 if (ptr->write_buffer_offset == buffer_end)
581 {
582 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
583
584 memcached_return_t rc;
585 if (io_flush(ptr, with_flush, rc) == false)
586 {
587 return -1;
588 }
589 }
590 }
591
592 if (with_flush)
593 {
594 memcached_return_t rc;
595 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
596 if (io_flush(ptr, with_flush, rc) == false)
597 {
598 return -1;
599 }
600 }
601
602 return (ssize_t) original_length;
603 }
604
605 bool memcached_io_write(memcached_server_write_instance_st ptr)
606 {
607 return (_io_write(ptr, NULL, 0, true) >= 0);
608 }
609
610 ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
611 const void *buffer, const size_t length, const bool with_flush)
612 {
613 return _io_write(ptr, buffer, length, with_flush);
614 }
615
616 ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
617 libmemcached_io_vector_st vector[],
618 const size_t number_of, const bool with_flush)
619 {
620 ssize_t total= 0;
621
622 for (size_t x= 0; x < number_of; x++, vector++)
623 {
624 ssize_t returnable;
625
626 if (vector->length)
627 {
628 if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1)
629 {
630 return -1;
631 }
632 total+= returnable;
633 }
634 }
635
636 if (with_flush)
637 {
638 if (memcached_io_write(ptr) == false)
639 {
640 return -1;
641 }
642 }
643
644 return total;
645 }
646
647
648 void memcached_io_close(memcached_server_write_instance_st ptr)
649 {
650 if (ptr->fd == INVALID_SOCKET)
651 {
652 return;
653 }
654
655 /* in case of death shutdown to avoid blocking at close() */
656 if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
657 {
658 WATCHPOINT_NUMBER(ptr->fd);
659 WATCHPOINT_ERRNO(get_socket_errno());
660 WATCHPOINT_ASSERT(get_socket_errno());
661 }
662
663 if (closesocket(ptr->fd) == SOCKET_ERROR)
664 {
665 WATCHPOINT_ERRNO(get_socket_errno());
666 }
667 ptr->state= MEMCACHED_SERVER_STATE_NEW;
668 ptr->fd= INVALID_SOCKET;
669 }
670
671 memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
672 {
673 #define MAX_SERVERS_TO_POLL 100
674 struct pollfd fds[MAX_SERVERS_TO_POLL];
675 nfds_t host_index= 0;
676
677 for (uint32_t x= 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x)
678 {
679 memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
680
681 if (instance->read_buffer_length > 0) /* I have data in the buffer */
682 {
683 return instance;
684 }
685
686 if (memcached_server_response_count(instance) > 0)
687 {
688 fds[host_index].events = POLLIN;
689 fds[host_index].revents = 0;
690 fds[host_index].fd = instance->fd;
691 ++host_index;
692 }
693 }
694
695 if (host_index < 2)
696 {
697 /* We have 0 or 1 server with pending events.. */
698 for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
699 {
700 memcached_server_write_instance_st instance=
701 memcached_server_instance_fetch(memc, x);
702
703 if (memcached_server_response_count(instance) > 0)
704 {
705 return instance;
706 }
707 }
708
709 return NULL;
710 }
711
712 int error= poll(fds, host_index, memc->poll_timeout);
713 switch (error)
714 {
715 case -1:
716 memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
717 /* FALLTHROUGH */
718 case 0:
719 break;
720
721 default:
722 for (nfds_t x= 0; x < host_index; ++x)
723 {
724 if (fds[x].revents & POLLIN)
725 {
726 for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
727 {
728 memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, y);
729
730 if (instance->fd == fds[x].fd)
731 {
732 return instance;
733 }
734 }
735 }
736 }
737 }
738
739 return NULL;
740 }
741
742 /*
743 Eventually we will just kill off the server with the problem.
744 */
745 void memcached_io_reset(memcached_server_write_instance_st ptr)
746 {
747 memcached_quit_server(ptr, true);
748 }
749
750 /**
751 * Read a given number of bytes from the server and place it into a specific
752 * buffer. Reset the IO channel on this server if an error occurs.
753 */
754 memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
755 void *dta,
756 const size_t size)
757 {
758 size_t offset= 0;
759 char *data= static_cast<char *>(dta);
760
761 while (offset < size)
762 {
763 ssize_t nread;
764 memcached_return_t rc;
765
766 while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, nread))) { };
767
768 if (memcached_failed(rc))
769 {
770 return rc;
771 }
772
773 offset+= size_t(nread);
774 }
775
776 return MEMCACHED_SUCCESS;
777 }
778
779 memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
780 char *buffer_ptr,
781 size_t size,
782 size_t& total_nr)
783 {
784 total_nr= 0;
785 bool line_complete= false;
786
787 while (line_complete == false)
788 {
789 if (ptr->read_buffer_length == 0)
790 {
791 /*
792 * We don't have any data in the buffer, so let's fill the read
793 * buffer. Call the standard read function to avoid duplicating
794 * the logic.
795 */
796 ssize_t nread;
797 memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, nread);
798 if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS)
799 {
800 memcached_quit_server(ptr, true);
801 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
802 }
803 else if (memcached_failed(rc))
804 {
805 return rc;
806 }
807
808 if (*buffer_ptr == '\n')
809 {
810 line_complete= true;
811 }
812
813 ++buffer_ptr;
814 ++total_nr;
815 }
816
817 /* Now let's look in the buffer and copy as we go! */
818 while (ptr->read_buffer_length && total_nr < size && !line_complete)
819 {
820 *buffer_ptr = *ptr->read_ptr;
821 if (*buffer_ptr == '\n')
822 {
823 line_complete = true;
824 }
825 --ptr->read_buffer_length;
826 ++ptr->read_ptr;
827 ++total_nr;
828 ++buffer_ptr;
829 }
830
831 if (total_nr == size)
832 {
833 return MEMCACHED_PROTOCOL_ERROR;
834 }
835 }
836
837 return MEMCACHED_SUCCESS;
838 }