Update for storage to now use vector
[m6w6/libmemcached] / libmemcached / io.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * LibMemcached
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39
40 #include <libmemcached/common.h>
41
/* Direction of the socket operation that io_wait() should wait for. */
enum memc_read_or_write {
  MEM_READ= 0,  /* io_wait() polls for POLLIN */
  MEM_WRITE= 1  /* io_wait() polls for POLLOUT */
};
46
47 /**
48 * Try to fill the input buffer for a server with as much
49 * data as possible.
50 *
51 * @param ptr the server to pack
52 */
53 static bool repack_input_buffer(memcached_server_write_instance_st ptr)
54 {
55 if (ptr->read_ptr != ptr->read_buffer)
56 {
57 /* Move all of the data to the beginning of the buffer so
58 ** that we can fit more data into the buffer...
59 */
60 memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
61 ptr->read_ptr= ptr->read_buffer;
62 ptr->read_data_length= ptr->read_buffer_length;
63 }
64
65 /* There is room in the buffer, try to fill it! */
66 if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
67 {
68 do {
69 /* Just try a single read to grab what's available */
70 ssize_t nr= recv(ptr->fd,
71 ptr->read_ptr + ptr->read_data_length,
72 MEMCACHED_MAX_BUFFER - ptr->read_data_length,
73 MSG_DONTWAIT);
74
75 switch (nr)
76 {
77 case SOCKET_ERROR:
78 {
79 switch (get_socket_errno())
80 {
81 case EINTR:
82 continue;
83
84 case EWOULDBLOCK:
85 #ifdef USE_EAGAIN
86 case EAGAIN:
87 #endif
88 #ifdef TARGET_OS_LINUX
89 case ERESTART:
90 #endif
91 break; // No IO is fine, we can just move on
92
93 default:
94 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
95 }
96 }
97 break;
98
99 case 0: // Shutdown on the socket has occurred
100 {
101 memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
102 }
103 break;
104
105 default:
106 {
107 ptr->read_data_length+= size_t(nr);
108 ptr->read_buffer_length+= size_t(nr);
109 return true;
110 }
111 break;
112 }
113 } while (0);
114 }
115 return false;
116 }
117
118 /**
119 * If the we have callbacks connected to this server structure
120 * we may start process the input queue and fire the callbacks
121 * for the incomming messages. This function is _only_ called
122 * when the input buffer is full, so that we _know_ that we have
123 * at least _one_ message to process.
124 *
125 * @param ptr the server to star processing iput messages for
126 * @return true if we processed anything, false otherwise
127 */
128 static bool process_input_buffer(memcached_server_write_instance_st ptr)
129 {
130 /*
131 ** We might be able to process some of the response messages if we
132 ** have a callback set up
133 */
134 if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
135 {
136 /*
137 * We might have responses... try to read them out and fire
138 * callbacks
139 */
140 memcached_callback_st cb= *ptr->root->callbacks;
141
142 memcached_set_processing_input((memcached_st *)ptr->root, true);
143
144 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
145 memcached_return_t error;
146 memcached_st *root= (memcached_st *)ptr->root;
147 error= memcached_response(ptr, buffer, sizeof(buffer),
148 &root->result);
149
150 memcached_set_processing_input(root, false);
151
152 if (error == MEMCACHED_SUCCESS)
153 {
154 for (unsigned int x= 0; x < cb.number_of_callback; x++)
155 {
156 error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
157 if (error != MEMCACHED_SUCCESS)
158 break;
159 }
160
161 /* @todo what should I do with the error message??? */
162 }
163 /* @todo what should I do with other error messages?? */
164 return true;
165 }
166
167 return false;
168 }
169
170 static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
171 const memc_read_or_write read_or_write)
172 {
173 struct pollfd fds;
174 fds.fd= ptr->fd;
175 fds.events= POLLIN;
176
177 if (read_or_write == MEM_WRITE) /* write */
178 {
179 fds.events= POLLOUT;
180 WATCHPOINT_SET(ptr->io_wait_count.write++);
181 }
182 else
183 {
184 WATCHPOINT_SET(ptr->io_wait_count.read++);
185 }
186
187 /*
188 ** We are going to block on write, but at least on Solaris we might block
189 ** on write if we haven't read anything from our input buffer..
190 ** Try to purge the input buffer if we don't do any flow control in the
191 ** application layer (just sending a lot of data etc)
192 ** The test is moved down in the purge function to avoid duplication of
193 ** the test.
194 */
195 if (read_or_write == MEM_WRITE)
196 {
197 memcached_return_t rc= memcached_purge(ptr);
198 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
199 {
200 return MEMCACHED_FAILURE;
201 }
202 }
203
204 if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
205 {
206 return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
207 }
208
209 size_t loop_max= 5;
210 while (--loop_max) // While loop is for ERESTART or EINTR
211 {
212
213 int error= poll(&fds, 1, ptr->root->poll_timeout);
214 switch (error)
215 {
216 case 1: // Success!
217 WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max);
218 WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
219
220 return MEMCACHED_SUCCESS;
221
222 case 0: // Timeout occured, we let the while() loop do its thing.
223 return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
224
225 default:
226 WATCHPOINT_ERRNO(get_socket_errno());
227 switch (get_socket_errno())
228 {
229 #ifdef TARGET_OS_LINUX
230 case ERESTART:
231 #endif
232 case EINTR:
233 break;
234
235 case EFAULT:
236 case ENOMEM:
237 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
238
239 case EINVAL:
240 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
241
242 default:
243 if (fds.revents & POLLERR)
244 {
245 int err;
246 socklen_t len= sizeof (err);
247 if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
248 {
249 if (err == 0)
250 {
251 continue;
252 }
253 errno= err;
254 }
255 }
256 else
257 {
258 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
259 }
260 int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
261 memcached_quit_server(ptr, true);
262
263 return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT);
264 }
265 }
266 }
267
268 memcached_quit_server(ptr, true);
269
270 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
271 }
272
273 static bool io_flush(memcached_server_write_instance_st ptr,
274 const bool with_flush,
275 memcached_return_t& error)
276 {
277 /*
278 ** We might want to purge the input buffer if we haven't consumed
279 ** any output yet... The test for the limits is the purge is inline
280 ** in the purge function to avoid duplicating the logic..
281 */
282 {
283 memcached_return_t rc;
284 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
285 rc= memcached_purge(ptr);
286
287 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
288 {
289 return false;
290 }
291 }
292 char *local_write_ptr= ptr->write_buffer;
293 size_t write_length= ptr->write_buffer_offset;
294
295 error= MEMCACHED_SUCCESS;
296
297 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
298
299 // UDP Sanity check, make sure that we are not sending somthing too big
300 if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH)
301 {
302 error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
303 return false;
304 }
305
306 if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
307 {
308 return true;
309 }
310
311 /* Looking for memory overflows */
312 #if defined(DEBUG)
313 if (write_length == MEMCACHED_MAX_BUFFER)
314 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
315 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
316 #endif
317
318 while (write_length)
319 {
320 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
321 WATCHPOINT_ASSERT(write_length > 0);
322 if (memcached_is_udp(ptr->root))
323 {
324 increment_udp_message_id(ptr);
325 }
326
327 ssize_t sent_length= 0;
328 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
329 if (with_flush)
330 {
331 sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
332 }
333 else
334 {
335 sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
336 }
337
338 if (sent_length == SOCKET_ERROR)
339 {
340 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
341 #if 0 // @todo I should look at why we hit this bit of code hard frequently
342 WATCHPOINT_ERRNO(get_socket_errno());
343 WATCHPOINT_NUMBER(get_socket_errno());
344 #endif
345 switch (get_socket_errno())
346 {
347 case ENOBUFS:
348 continue;
349 case EWOULDBLOCK:
350 #ifdef USE_EAGAIN
351 case EAGAIN:
352 #endif
353 {
354 /*
355 * We may be blocked on write because the input buffer
356 * is full. Let's check if we have room in our input
357 * buffer for more data and retry the write before
358 * waiting..
359 */
360 if (repack_input_buffer(ptr) or process_input_buffer(ptr))
361 {
362 continue;
363 }
364
365 memcached_return_t rc= io_wait(ptr, MEM_WRITE);
366 if (memcached_success(rc))
367 {
368 continue;
369 }
370 else if (rc == MEMCACHED_TIMEOUT)
371 {
372 error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
373 return false;
374 }
375
376 memcached_quit_server(ptr, true);
377 error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
378 return false;
379 }
380 case ENOTCONN:
381 case EPIPE:
382 default:
383 memcached_quit_server(ptr, true);
384 error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
385 WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET);
386 return false;
387 }
388 }
389
390 if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length)
391 {
392 memcached_quit_server(ptr, true);
393 error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
394 return false;
395 }
396
397 ptr->io_bytes_sent+= uint32_t(sent_length);
398
399 local_write_ptr+= sent_length;
400 write_length-= uint32_t(sent_length);
401 }
402
403 WATCHPOINT_ASSERT(write_length == 0);
404 if (memcached_is_udp(ptr->root))
405 {
406 ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
407 }
408 else
409 {
410 ptr->write_buffer_offset= 0;
411 }
412
413 return true;
414 }
415
416 memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
417 {
418 return io_wait(ptr, MEM_WRITE);
419 }
420
421 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
422 void *buffer, size_t length, ssize_t *nread)
423 {
424 assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error
425 char *buffer_ptr= static_cast<char *>(buffer);
426
427 if (ptr->fd == INVALID_SOCKET)
428 {
429 #if 0
430 assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state");
431 #endif
432 return MEMCACHED_CONNECTION_FAILURE;
433 }
434
435 while (length)
436 {
437 if (not ptr->read_buffer_length)
438 {
439 ssize_t data_read;
440 do
441 {
442 data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
443 if (data_read == SOCKET_ERROR)
444 {
445 switch (get_socket_errno())
446 {
447 case EINTR: // We just retry
448 continue;
449
450 case ETIMEDOUT: // OSX
451 case EWOULDBLOCK:
452 #ifdef USE_EAGAIN
453 case EAGAIN:
454 #endif
455 #ifdef TARGET_OS_LINUX
456 case ERESTART:
457 #endif
458 if (memcached_success(io_wait(ptr, MEM_READ)))
459 {
460 continue;
461 }
462 return MEMCACHED_IN_PROGRESS;
463
464 /* fall through */
465
466 case ENOTCONN: // Programmer Error
467 WATCHPOINT_ASSERT(0);
468 case ENOTSOCK:
469 WATCHPOINT_ASSERT(0);
470 case EBADF:
471 assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket");
472 case EINVAL:
473 case EFAULT:
474 case ECONNREFUSED:
475 default:
476 {
477 memcached_quit_server(ptr, true);
478 *nread= -1;
479 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
480 }
481 }
482 }
483 else if (data_read == 0)
484 {
485 /*
486 EOF. Any data received so far is incomplete
487 so discard it. This always reads by byte in case of TCP
488 and protocol enforcement happens at memcached_response()
489 looking for '\n'. We do not care for UDB which requests 8 bytes
490 at once. Generally, this means that connection went away. Since
491 for blocking I/O we do not return 0 and for non-blocking case
492 it will return EGAIN if data is not immediatly available.
493 */
494 WATCHPOINT_STRING("We had a zero length recv()");
495 memcached_quit_server(ptr, true);
496 *nread= -1;
497 return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
498 }
499 } while (data_read <= 0);
500
501 ptr->io_bytes_sent = 0;
502 ptr->read_data_length= (size_t) data_read;
503 ptr->read_buffer_length= (size_t) data_read;
504 ptr->read_ptr= ptr->read_buffer;
505 }
506
507 if (length > 1)
508 {
509 size_t difference;
510
511 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
512
513 memcpy(buffer_ptr, ptr->read_ptr, difference);
514 length -= difference;
515 ptr->read_ptr+= difference;
516 ptr->read_buffer_length-= difference;
517 buffer_ptr+= difference;
518 }
519 else
520 {
521 *buffer_ptr= *ptr->read_ptr;
522 ptr->read_ptr++;
523 ptr->read_buffer_length--;
524 buffer_ptr++;
525 break;
526 }
527 }
528
529 *nread = (ssize_t)(buffer_ptr - (char*)buffer);
530
531 return MEMCACHED_SUCCESS;
532 }
533
534 memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
535 {
536 assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st");
537
538 if (ptr->fd == INVALID_SOCKET)
539 {
540 assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Invalid socket state");
541 return MEMCACHED_CONNECTION_FAILURE;
542 }
543
544 ssize_t data_read;
545 char buffer[MEMCACHED_MAX_BUFFER];
546 do
547 {
548 data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
549 if (data_read == SOCKET_ERROR)
550 {
551 switch (get_socket_errno())
552 {
553 case EINTR: // We just retry
554 continue;
555
556 case ETIMEDOUT: // OSX
557 case EWOULDBLOCK:
558 #ifdef USE_EAGAIN
559 case EAGAIN:
560 #endif
561 #ifdef TARGET_OS_LINUX
562 case ERESTART:
563 #endif
564 if (memcached_success(io_wait(ptr, MEM_READ)))
565 {
566 continue;
567 }
568 return MEMCACHED_IN_PROGRESS;
569
570 /* fall through */
571
572 case ENOTCONN: // Programmer Error
573 WATCHPOINT_ASSERT(0);
574 case ENOTSOCK:
575 WATCHPOINT_ASSERT(0);
576 case EBADF:
577 assert_msg(ptr->fd != INVALID_SOCKET, "Invalid socket state");
578 case EINVAL:
579 case EFAULT:
580 case ECONNREFUSED:
581 default:
582 return MEMCACHED_CONNECTION_FAILURE; // We want this!
583 }
584 }
585 } while (data_read > 0);
586
587 return MEMCACHED_CONNECTION_FAILURE;
588 }
589
590 static ssize_t _io_write(memcached_server_write_instance_st ptr,
591 const void *buffer, size_t length, bool with_flush)
592 {
593 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
594
595 size_t original_length= length;
596 const char *buffer_ptr= static_cast<const char *>(buffer);
597
598 while (length)
599 {
600 char *write_ptr;
601 size_t should_write;
602 size_t buffer_end;
603
604 if (memcached_is_udp(ptr->root))
605 {
606 //UDP does not support partial writes
607 buffer_end= MAX_UDP_DATAGRAM_LENGTH;
608 should_write= length;
609 if (ptr->write_buffer_offset + should_write > buffer_end)
610 {
611 return -1;
612 }
613 }
614 else
615 {
616 buffer_end= MEMCACHED_MAX_BUFFER;
617 should_write= buffer_end - ptr->write_buffer_offset;
618 should_write= (should_write < length) ? should_write : length;
619 }
620
621 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
622 memcpy(write_ptr, buffer_ptr, should_write);
623 ptr->write_buffer_offset+= should_write;
624 buffer_ptr+= should_write;
625 length-= should_write;
626
627 if (ptr->write_buffer_offset == buffer_end and memcached_is_udp(ptr->root) == false)
628 {
629 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
630
631 memcached_return_t rc;
632 if (io_flush(ptr, with_flush, rc) == false)
633 {
634 return -1;
635 }
636 }
637 }
638
639 if (with_flush)
640 {
641 memcached_return_t rc;
642 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
643 if (io_flush(ptr, with_flush, rc) == false)
644 {
645 return -1;
646 }
647 }
648
649 return (ssize_t) original_length;
650 }
651
652 bool memcached_io_write(memcached_server_write_instance_st ptr)
653 {
654 return (_io_write(ptr, NULL, 0, true) >= 0);
655 }
656
657 ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
658 const void *buffer, const size_t length, const bool with_flush)
659 {
660 return _io_write(ptr, buffer, length, with_flush);
661 }
662
663 size_t io_vector_total_size(libmemcached_io_vector_st* vector, const size_t number_of)
664 {
665 ssize_t total= 0;
666
667 for (size_t x= 0; x < number_of; x++)
668 {
669 total+= vector->length;
670 }
671
672 return total;
673 }
674
675 ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
676 const struct libmemcached_io_vector_st *vector,
677 size_t number_of, bool with_flush)
678 {
679 ssize_t total= 0;
680
681 for (size_t x= 0; x < number_of; x++, vector++)
682 {
683 ssize_t returnable;
684
685 if (vector->length)
686 {
687 if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1)
688 {
689 return -1;
690 }
691 total+= returnable;
692 }
693 }
694
695 if (with_flush)
696 {
697 if (memcached_io_write(ptr) == false)
698 {
699 return -1;
700 }
701 }
702
703 return total;
704 }
705
706
707 void memcached_io_close(memcached_server_write_instance_st ptr)
708 {
709 if (ptr->fd == INVALID_SOCKET)
710 {
711 return;
712 }
713
714 /* in case of death shutdown to avoid blocking at close() */
715 if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
716 {
717 WATCHPOINT_NUMBER(ptr->fd);
718 WATCHPOINT_ERRNO(get_socket_errno());
719 WATCHPOINT_ASSERT(get_socket_errno());
720 }
721
722 if (closesocket(ptr->fd) == SOCKET_ERROR)
723 {
724 WATCHPOINT_ERRNO(get_socket_errno());
725 }
726 ptr->state= MEMCACHED_SERVER_STATE_NEW;
727 ptr->fd= INVALID_SOCKET;
728 }
729
730 memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
731 {
732 #define MAX_SERVERS_TO_POLL 100
733 struct pollfd fds[MAX_SERVERS_TO_POLL];
734 unsigned int host_index= 0;
735
736 for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x)
737 {
738 memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
739
740 if (instance->read_buffer_length > 0) /* I have data in the buffer */
741 {
742 return instance;
743 }
744
745 if (memcached_server_response_count(instance) > 0)
746 {
747 fds[host_index].events = POLLIN;
748 fds[host_index].revents = 0;
749 fds[host_index].fd = instance->fd;
750 ++host_index;
751 }
752 }
753
754 if (host_index < 2)
755 {
756 /* We have 0 or 1 server with pending events.. */
757 for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
758 {
759 memcached_server_write_instance_st instance=
760 memcached_server_instance_fetch(memc, x);
761
762 if (memcached_server_response_count(instance) > 0)
763 {
764 return instance;
765 }
766 }
767
768 return NULL;
769 }
770
771 int error= poll(fds, host_index, memc->poll_timeout);
772 switch (error)
773 {
774 case -1:
775 memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
776 /* FALLTHROUGH */
777 case 0:
778 break;
779
780 default:
781 for (size_t x= 0; x < host_index; ++x)
782 {
783 if (fds[x].revents & POLLIN)
784 {
785 for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
786 {
787 memcached_server_write_instance_st instance=
788 memcached_server_instance_fetch(memc, y);
789
790 if (instance->fd == fds[x].fd)
791 return instance;
792 }
793 }
794 }
795 }
796
797 return NULL;
798 }
799
800 /*
801 Eventually we will just kill off the server with the problem.
802 */
803 void memcached_io_reset(memcached_server_write_instance_st ptr)
804 {
805 memcached_quit_server(ptr, true);
806 }
807
808 /**
809 * Read a given number of bytes from the server and place it into a specific
810 * buffer. Reset the IO channel on this server if an error occurs.
811 */
812 memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
813 void *dta,
814 const size_t size)
815 {
816 size_t offset= 0;
817 char *data= static_cast<char *>(dta);
818
819 while (offset < size)
820 {
821 ssize_t nread;
822 memcached_return_t rc;
823
824 while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { };
825
826 if (memcached_failed(rc))
827 {
828 return rc;
829 }
830
831 offset+= (size_t) nread;
832 }
833
834 return MEMCACHED_SUCCESS;
835 }
836
837 memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
838 char *buffer_ptr,
839 size_t size,
840 size_t& total_nr)
841 {
842 total_nr= 0;
843 bool line_complete= false;
844
845 while (line_complete == false)
846 {
847 if (ptr->read_buffer_length == 0)
848 {
849 /*
850 * We don't have any data in the buffer, so let's fill the read
851 * buffer. Call the standard read function to avoid duplicating
852 * the logic.
853 */
854 ssize_t nread;
855 memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
856 if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS)
857 {
858 memcached_quit_server(ptr, true);
859 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
860 }
861 else if (memcached_failed(rc))
862 {
863 return rc;
864 }
865
866 if (*buffer_ptr == '\n')
867 {
868 line_complete= true;
869 }
870
871 ++buffer_ptr;
872 ++total_nr;
873 }
874
875 /* Now let's look in the buffer and copy as we go! */
876 while (ptr->read_buffer_length && total_nr < size && !line_complete)
877 {
878 *buffer_ptr = *ptr->read_ptr;
879 if (*buffer_ptr == '\n')
880 {
881 line_complete = true;
882 }
883 --ptr->read_buffer_length;
884 ++ptr->read_ptr;
885 ++total_nr;
886 ++buffer_ptr;
887 }
888
889 if (total_nr == size)
890 {
891 return MEMCACHED_PROTOCOL_ERROR;
892 }
893 }
894
895 return MEMCACHED_SUCCESS;
896 }