Update logic around UDP.
[awesomized/libmemcached] / libmemcached / io.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * LibMemcached
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39
40 #include <libmemcached/common.h>
41
/**
 * Direction of the socket operation that io_wait() should wait for.
 */
enum memc_read_or_write {
  MEM_READ= 0,  /* wait for the socket to become readable (POLLIN) */
  MEM_WRITE= 1  /* wait for the socket to become writable (POLLOUT) */
};
46
47 /**
48 * Try to fill the input buffer for a server with as much
49 * data as possible.
50 *
51 * @param ptr the server to pack
52 */
53 static bool repack_input_buffer(memcached_server_write_instance_st ptr)
54 {
55 if (ptr->read_ptr != ptr->read_buffer)
56 {
57 /* Move all of the data to the beginning of the buffer so
58 ** that we can fit more data into the buffer...
59 */
60 memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
61 ptr->read_ptr= ptr->read_buffer;
62 ptr->read_data_length= ptr->read_buffer_length;
63 }
64
65 /* There is room in the buffer, try to fill it! */
66 if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
67 {
68 do {
69 /* Just try a single read to grab what's available */
70 ssize_t nr= recv(ptr->fd,
71 ptr->read_ptr + ptr->read_data_length,
72 MEMCACHED_MAX_BUFFER - ptr->read_data_length,
73 MSG_DONTWAIT);
74
75 switch (nr)
76 {
77 case SOCKET_ERROR:
78 {
79 switch (get_socket_errno())
80 {
81 case EINTR:
82 continue;
83
84 case EWOULDBLOCK:
85 #ifdef USE_EAGAIN
86 case EAGAIN:
87 #endif
88 #ifdef TARGET_OS_LINUX
89 case ERESTART:
90 #endif
91 break; // No IO is fine, we can just move on
92
93 default:
94 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
95 }
96 }
97 break;
98
99 case 0: // Shutdown on the socket has occurred
100 {
101 memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
102 }
103 break;
104
105 default:
106 {
107 ptr->read_data_length+= size_t(nr);
108 ptr->read_buffer_length+= size_t(nr);
109 return true;
110 }
111 break;
112 }
113 } while (0);
114 }
115 return false;
116 }
117
118 /**
119 * If the we have callbacks connected to this server structure
120 * we may start process the input queue and fire the callbacks
121 * for the incomming messages. This function is _only_ called
122 * when the input buffer is full, so that we _know_ that we have
123 * at least _one_ message to process.
124 *
125 * @param ptr the server to star processing iput messages for
126 * @return true if we processed anything, false otherwise
127 */
128 static bool process_input_buffer(memcached_server_write_instance_st ptr)
129 {
130 /*
131 ** We might be able to process some of the response messages if we
132 ** have a callback set up
133 */
134 if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
135 {
136 /*
137 * We might have responses... try to read them out and fire
138 * callbacks
139 */
140 memcached_callback_st cb= *ptr->root->callbacks;
141
142 memcached_set_processing_input((memcached_st *)ptr->root, true);
143
144 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
145 memcached_return_t error;
146 memcached_st *root= (memcached_st *)ptr->root;
147 error= memcached_response(ptr, buffer, sizeof(buffer),
148 &root->result);
149
150 memcached_set_processing_input(root, false);
151
152 if (error == MEMCACHED_SUCCESS)
153 {
154 for (unsigned int x= 0; x < cb.number_of_callback; x++)
155 {
156 error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
157 if (error != MEMCACHED_SUCCESS)
158 break;
159 }
160
161 /* @todo what should I do with the error message??? */
162 }
163 /* @todo what should I do with other error messages?? */
164 return true;
165 }
166
167 return false;
168 }
169
170 static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
171 const memc_read_or_write read_or_write)
172 {
173 struct pollfd fds;
174 fds.fd= ptr->fd;
175 fds.events= POLLIN;
176
177 if (read_or_write == MEM_WRITE) /* write */
178 {
179 fds.events= POLLOUT;
180 WATCHPOINT_SET(ptr->io_wait_count.write++);
181 }
182 else
183 {
184 WATCHPOINT_SET(ptr->io_wait_count.read++);
185 }
186
187 /*
188 ** We are going to block on write, but at least on Solaris we might block
189 ** on write if we haven't read anything from our input buffer..
190 ** Try to purge the input buffer if we don't do any flow control in the
191 ** application layer (just sending a lot of data etc)
192 ** The test is moved down in the purge function to avoid duplication of
193 ** the test.
194 */
195 if (read_or_write == MEM_WRITE)
196 {
197 memcached_return_t rc= memcached_purge(ptr);
198 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
199 {
200 return MEMCACHED_FAILURE;
201 }
202 }
203
204 if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
205 {
206 return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
207 }
208
209 size_t loop_max= 5;
210 while (--loop_max) // While loop is for ERESTART or EINTR
211 {
212
213 int error= poll(&fds, 1, ptr->root->poll_timeout);
214 switch (error)
215 {
216 case 1: // Success!
217 WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max);
218 WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
219
220 return MEMCACHED_SUCCESS;
221
222 case 0: // Timeout occured, we let the while() loop do its thing.
223 return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
224
225 default:
226 WATCHPOINT_ERRNO(get_socket_errno());
227 switch (get_socket_errno())
228 {
229 #ifdef TARGET_OS_LINUX
230 case ERESTART:
231 #endif
232 case EINTR:
233 break;
234
235 case EFAULT:
236 case ENOMEM:
237 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
238
239 case EINVAL:
240 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
241
242 default:
243 if (fds.revents & POLLERR)
244 {
245 int err;
246 socklen_t len= sizeof (err);
247 if (getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0)
248 {
249 if (err == 0)
250 {
251 continue;
252 }
253 errno= err;
254 }
255 }
256 else
257 {
258 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
259 }
260 int local_errno= get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
261 memcached_quit_server(ptr, true);
262
263 return memcached_set_errno(*ptr, local_errno, MEMCACHED_AT);
264 }
265 }
266 }
267
268 memcached_quit_server(ptr, true);
269
270 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
271 }
272
273 static bool io_flush(memcached_server_write_instance_st ptr,
274 const bool with_flush,
275 memcached_return_t& error)
276 {
277 /*
278 ** We might want to purge the input buffer if we haven't consumed
279 ** any output yet... The test for the limits is the purge is inline
280 ** in the purge function to avoid duplicating the logic..
281 */
282 {
283 memcached_return_t rc;
284 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
285 rc= memcached_purge(ptr);
286
287 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
288 {
289 return false;
290 }
291 }
292 char *local_write_ptr= ptr->write_buffer;
293 size_t write_length= ptr->write_buffer_offset;
294
295 error= MEMCACHED_SUCCESS;
296
297 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
298
299 // UDP Sanity check, make sure that we are not sending somthing too big
300 if (memcached_is_udp(ptr->root) and write_length > MAX_UDP_DATAGRAM_LENGTH)
301 {
302 error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
303 return false;
304 }
305
306 if (ptr->write_buffer_offset == 0 or (memcached_is_udp(ptr->root) and ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
307 {
308 return true;
309 }
310
311 /* Looking for memory overflows */
312 #if defined(DEBUG)
313 if (write_length == MEMCACHED_MAX_BUFFER)
314 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
315 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
316 #endif
317
318 while (write_length)
319 {
320 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
321 WATCHPOINT_ASSERT(write_length > 0);
322 if (memcached_is_udp(ptr->root))
323 {
324 increment_udp_message_id(ptr);
325 }
326
327 ssize_t sent_length= 0;
328 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
329 if (with_flush)
330 {
331 sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
332 }
333 else
334 {
335 sent_length= ::send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
336 }
337
338 if (sent_length == SOCKET_ERROR)
339 {
340 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
341 #if 0 // @todo I should look at why we hit this bit of code hard frequently
342 WATCHPOINT_ERRNO(get_socket_errno());
343 WATCHPOINT_NUMBER(get_socket_errno());
344 #endif
345 switch (get_socket_errno())
346 {
347 case ENOBUFS:
348 continue;
349 case EWOULDBLOCK:
350 #ifdef USE_EAGAIN
351 case EAGAIN:
352 #endif
353 {
354 /*
355 * We may be blocked on write because the input buffer
356 * is full. Let's check if we have room in our input
357 * buffer for more data and retry the write before
358 * waiting..
359 */
360 if (repack_input_buffer(ptr) or process_input_buffer(ptr))
361 {
362 continue;
363 }
364
365 memcached_return_t rc= io_wait(ptr, MEM_WRITE);
366 if (memcached_success(rc))
367 {
368 continue;
369 }
370 else if (rc == MEMCACHED_TIMEOUT)
371 {
372 error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
373 return false;
374 }
375
376 memcached_quit_server(ptr, true);
377 error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
378 return false;
379 }
380 case ENOTCONN:
381 case EPIPE:
382 default:
383 memcached_quit_server(ptr, true);
384 error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
385 WATCHPOINT_ASSERT(ptr->fd == INVALID_SOCKET);
386 return false;
387 }
388 }
389
390 if (memcached_is_udp(ptr->root) and size_t(sent_length) != write_length)
391 {
392 memcached_quit_server(ptr, true);
393 error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
394 return false;
395 }
396
397 ptr->io_bytes_sent+= uint32_t(sent_length);
398
399 local_write_ptr+= sent_length;
400 write_length-= uint32_t(sent_length);
401 }
402
403 WATCHPOINT_ASSERT(write_length == 0);
404 if (memcached_is_udp(ptr->root))
405 {
406 ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
407 }
408 else
409 {
410 ptr->write_buffer_offset= 0;
411 }
412
413 return true;
414 }
415
416 memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
417 {
418 return io_wait(ptr, MEM_WRITE);
419 }
420
421 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
422 void *buffer, size_t length, ssize_t *nread)
423 {
424 assert_msg(ptr, "Programmer error, memcached_io_read() recieved an invalid memcached_server_write_instance_st"); // Programmer error
425 char *buffer_ptr= static_cast<char *>(buffer);
426
427 if (ptr->fd == INVALID_SOCKET)
428 {
429 #if 0
430 assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state");
431 #endif
432 return MEMCACHED_CONNECTION_FAILURE;
433 }
434
435 while (length)
436 {
437 if (not ptr->read_buffer_length)
438 {
439 ssize_t data_read;
440 do
441 {
442 data_read= ::recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
443 if (data_read == SOCKET_ERROR)
444 {
445 switch (get_socket_errno())
446 {
447 case EINTR: // We just retry
448 continue;
449
450 case ETIMEDOUT: // OSX
451 case EWOULDBLOCK:
452 #ifdef USE_EAGAIN
453 case EAGAIN:
454 #endif
455 #ifdef TARGET_OS_LINUX
456 case ERESTART:
457 #endif
458 if (memcached_success(io_wait(ptr, MEM_READ)))
459 {
460 continue;
461 }
462 return MEMCACHED_IN_PROGRESS;
463
464 /* fall through */
465
466 case ENOTCONN: // Programmer Error
467 WATCHPOINT_ASSERT(0);
468 case ENOTSOCK:
469 WATCHPOINT_ASSERT(0);
470 case EBADF:
471 assert_msg(ptr->fd != INVALID_SOCKET, "Programmer error, invalid socket");
472 case EINVAL:
473 case EFAULT:
474 case ECONNREFUSED:
475 default:
476 {
477 memcached_quit_server(ptr, true);
478 *nread= -1;
479 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
480 }
481 }
482 }
483 else if (data_read == 0)
484 {
485 /*
486 EOF. Any data received so far is incomplete
487 so discard it. This always reads by byte in case of TCP
488 and protocol enforcement happens at memcached_response()
489 looking for '\n'. We do not care for UDB which requests 8 bytes
490 at once. Generally, this means that connection went away. Since
491 for blocking I/O we do not return 0 and for non-blocking case
492 it will return EGAIN if data is not immediatly available.
493 */
494 WATCHPOINT_STRING("We had a zero length recv()");
495 memcached_quit_server(ptr, true);
496 *nread= -1;
497 return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT,
498 memcached_literal_param("::rec() returned zero, server has disconnected"));
499 }
500 } while (data_read <= 0);
501
502 ptr->io_bytes_sent = 0;
503 ptr->read_data_length= (size_t) data_read;
504 ptr->read_buffer_length= (size_t) data_read;
505 ptr->read_ptr= ptr->read_buffer;
506 }
507
508 if (length > 1)
509 {
510 size_t difference;
511
512 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
513
514 memcpy(buffer_ptr, ptr->read_ptr, difference);
515 length -= difference;
516 ptr->read_ptr+= difference;
517 ptr->read_buffer_length-= difference;
518 buffer_ptr+= difference;
519 }
520 else
521 {
522 *buffer_ptr= *ptr->read_ptr;
523 ptr->read_ptr++;
524 ptr->read_buffer_length--;
525 buffer_ptr++;
526 break;
527 }
528 }
529
530 *nread = (ssize_t)(buffer_ptr - (char*)buffer);
531
532 return MEMCACHED_SUCCESS;
533 }
534
535 memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
536 {
537 assert_msg(ptr, "Programmer error, invalid memcached_server_write_instance_st");
538
539 if (ptr->fd == INVALID_SOCKET)
540 {
541 assert_msg(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Invalid socket state");
542 return MEMCACHED_CONNECTION_FAILURE;
543 }
544
545 ssize_t data_read;
546 char buffer[MEMCACHED_MAX_BUFFER];
547 do
548 {
549 data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
550 if (data_read == SOCKET_ERROR)
551 {
552 switch (get_socket_errno())
553 {
554 case EINTR: // We just retry
555 continue;
556
557 case ETIMEDOUT: // OSX
558 case EWOULDBLOCK:
559 #ifdef USE_EAGAIN
560 case EAGAIN:
561 #endif
562 #ifdef TARGET_OS_LINUX
563 case ERESTART:
564 #endif
565 if (memcached_success(io_wait(ptr, MEM_READ)))
566 {
567 continue;
568 }
569 return MEMCACHED_IN_PROGRESS;
570
571 /* fall through */
572
573 case ENOTCONN: // Programmer Error
574 WATCHPOINT_ASSERT(0);
575 case ENOTSOCK:
576 WATCHPOINT_ASSERT(0);
577 case EBADF:
578 assert_msg(ptr->fd != INVALID_SOCKET, "Invalid socket state");
579 case EINVAL:
580 case EFAULT:
581 case ECONNREFUSED:
582 default:
583 return MEMCACHED_CONNECTION_FAILURE; // We want this!
584 }
585 }
586 } while (data_read > 0);
587
588 return MEMCACHED_CONNECTION_FAILURE;
589 }
590
591 static ssize_t _io_write(memcached_server_write_instance_st ptr,
592 const void *buffer, size_t length, bool with_flush)
593 {
594 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
595
596 size_t original_length= length;
597 const char *buffer_ptr= static_cast<const char *>(buffer);
598
599 while (length)
600 {
601 char *write_ptr;
602 size_t should_write;
603 size_t buffer_end;
604
605 if (memcached_is_udp(ptr->root))
606 {
607 //UDP does not support partial writes
608 buffer_end= MAX_UDP_DATAGRAM_LENGTH;
609 should_write= length;
610 if (ptr->write_buffer_offset + should_write > buffer_end)
611 {
612 return -1;
613 }
614 }
615 else
616 {
617 buffer_end= MEMCACHED_MAX_BUFFER;
618 should_write= buffer_end - ptr->write_buffer_offset;
619 should_write= (should_write < length) ? should_write : length;
620 }
621
622 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
623 memcpy(write_ptr, buffer_ptr, should_write);
624 ptr->write_buffer_offset+= should_write;
625 buffer_ptr+= should_write;
626 length-= should_write;
627
628 if (ptr->write_buffer_offset == buffer_end and memcached_is_udp(ptr->root) == false)
629 {
630 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
631
632 memcached_return_t rc;
633 if (io_flush(ptr, with_flush, rc) == false)
634 {
635 return -1;
636 }
637 }
638 }
639
640 if (with_flush)
641 {
642 memcached_return_t rc;
643 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
644 if (io_flush(ptr, with_flush, rc) == false)
645 {
646 return -1;
647 }
648 }
649
650 return (ssize_t) original_length;
651 }
652
653 bool memcached_io_write(memcached_server_write_instance_st ptr)
654 {
655 return (_io_write(ptr, NULL, 0, true) >= 0);
656 }
657
658 ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
659 const void *buffer, const size_t length, const bool with_flush)
660 {
661 return _io_write(ptr, buffer, length, with_flush);
662 }
663
664 size_t io_vector_total_size(libmemcached_io_vector_st* vector, const size_t number_of)
665 {
666 ssize_t total= 0;
667
668 for (size_t x= 0; x < number_of; x++)
669 {
670 total+= vector->length;
671 }
672
673 return total;
674 }
675
676 ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
677 const struct libmemcached_io_vector_st *vector,
678 size_t number_of, bool with_flush)
679 {
680 ssize_t total= 0;
681
682 for (size_t x= 0; x < number_of; x++, vector++)
683 {
684 ssize_t returnable;
685
686 if (vector->length)
687 {
688 if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1)
689 {
690 return -1;
691 }
692 total+= returnable;
693 }
694 }
695
696 if (with_flush)
697 {
698 if (memcached_io_write(ptr) == false)
699 {
700 return -1;
701 }
702 }
703
704 return total;
705 }
706
707
708 void memcached_io_close(memcached_server_write_instance_st ptr)
709 {
710 if (ptr->fd == INVALID_SOCKET)
711 {
712 return;
713 }
714
715 /* in case of death shutdown to avoid blocking at close() */
716 if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
717 {
718 WATCHPOINT_NUMBER(ptr->fd);
719 WATCHPOINT_ERRNO(get_socket_errno());
720 WATCHPOINT_ASSERT(get_socket_errno());
721 }
722
723 if (closesocket(ptr->fd) == SOCKET_ERROR)
724 {
725 WATCHPOINT_ERRNO(get_socket_errno());
726 }
727 ptr->state= MEMCACHED_SERVER_STATE_NEW;
728 ptr->fd= INVALID_SOCKET;
729 }
730
731 memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
732 {
733 #define MAX_SERVERS_TO_POLL 100
734 struct pollfd fds[MAX_SERVERS_TO_POLL];
735 unsigned int host_index= 0;
736
737 for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x)
738 {
739 memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, x);
740
741 if (instance->read_buffer_length > 0) /* I have data in the buffer */
742 {
743 return instance;
744 }
745
746 if (memcached_server_response_count(instance) > 0)
747 {
748 fds[host_index].events = POLLIN;
749 fds[host_index].revents = 0;
750 fds[host_index].fd = instance->fd;
751 ++host_index;
752 }
753 }
754
755 if (host_index < 2)
756 {
757 /* We have 0 or 1 server with pending events.. */
758 for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
759 {
760 memcached_server_write_instance_st instance=
761 memcached_server_instance_fetch(memc, x);
762
763 if (memcached_server_response_count(instance) > 0)
764 {
765 return instance;
766 }
767 }
768
769 return NULL;
770 }
771
772 int error= poll(fds, host_index, memc->poll_timeout);
773 switch (error)
774 {
775 case -1:
776 memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
777 /* FALLTHROUGH */
778 case 0:
779 break;
780
781 default:
782 for (size_t x= 0; x < host_index; ++x)
783 {
784 if (fds[x].revents & POLLIN)
785 {
786 for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
787 {
788 memcached_server_write_instance_st instance=
789 memcached_server_instance_fetch(memc, y);
790
791 if (instance->fd == fds[x].fd)
792 return instance;
793 }
794 }
795 }
796 }
797
798 return NULL;
799 }
800
801 /*
802 Eventually we will just kill off the server with the problem.
803 */
804 void memcached_io_reset(memcached_server_write_instance_st ptr)
805 {
806 memcached_quit_server(ptr, true);
807 }
808
809 /**
810 * Read a given number of bytes from the server and place it into a specific
811 * buffer. Reset the IO channel on this server if an error occurs.
812 */
813 memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
814 void *dta,
815 const size_t size)
816 {
817 size_t offset= 0;
818 char *data= static_cast<char *>(dta);
819
820 while (offset < size)
821 {
822 ssize_t nread;
823 memcached_return_t rc;
824
825 while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { };
826
827 if (memcached_failed(rc))
828 {
829 return rc;
830 }
831
832 offset+= (size_t) nread;
833 }
834
835 return MEMCACHED_SUCCESS;
836 }
837
838 memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
839 char *buffer_ptr,
840 size_t size,
841 size_t& total_nr)
842 {
843 total_nr= 0;
844 bool line_complete= false;
845
846 while (line_complete == false)
847 {
848 if (ptr->read_buffer_length == 0)
849 {
850 /*
851 * We don't have any data in the buffer, so let's fill the read
852 * buffer. Call the standard read function to avoid duplicating
853 * the logic.
854 */
855 ssize_t nread;
856 memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
857 if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS)
858 {
859 memcached_quit_server(ptr, true);
860 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
861 }
862 else if (memcached_failed(rc))
863 {
864 return rc;
865 }
866
867 if (*buffer_ptr == '\n')
868 {
869 line_complete= true;
870 }
871
872 ++buffer_ptr;
873 ++total_nr;
874 }
875
876 /* Now let's look in the buffer and copy as we go! */
877 while (ptr->read_buffer_length && total_nr < size && !line_complete)
878 {
879 *buffer_ptr = *ptr->read_ptr;
880 if (*buffer_ptr == '\n')
881 {
882 line_complete = true;
883 }
884 --ptr->read_buffer_length;
885 ++ptr->read_ptr;
886 ++total_nr;
887 ++buffer_ptr;
888 }
889
890 if (total_nr == size)
891 {
892 return MEMCACHED_PROTOCOL_ERROR;
893 }
894 }
895
896 return MEMCACHED_SUCCESS;
897 }