Merge in docs.
[m6w6/libmemcached] / libmemcached / io.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * LibMemcached
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39
40 #include <libmemcached/common.h>
41 #include <cassert>
42
43 enum memc_read_or_write {
44 MEM_READ,
45 MEM_WRITE
46 };
47
48 static ssize_t io_flush(memcached_server_write_instance_st ptr,
49 const bool with_flush,
50 memcached_return_t *error);
51 static void increment_udp_message_id(memcached_server_write_instance_st ptr);
52
53 static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
54 memc_read_or_write read_or_write)
55 {
56 struct pollfd fds;
57 fds.fd= ptr->fd;
58 fds.events= POLLIN;
59
60 int error;
61
62 if (read_or_write == MEM_WRITE) /* write */
63 {
64 fds.events= POLLOUT;
65 WATCHPOINT_SET(ptr->io_wait_count.write++);
66 }
67 else
68 {
69 WATCHPOINT_SET(ptr->io_wait_count.read++);
70 }
71
72 /*
73 ** We are going to block on write, but at least on Solaris we might block
74 ** on write if we haven't read anything from our input buffer..
75 ** Try to purge the input buffer if we don't do any flow control in the
76 ** application layer (just sending a lot of data etc)
77 ** The test is moved down in the purge function to avoid duplication of
78 ** the test.
79 */
80 if (read_or_write == MEM_WRITE)
81 {
82 memcached_return_t rc= memcached_purge(ptr);
83 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
84 {
85 return MEMCACHED_FAILURE;
86 }
87 }
88
89 size_t loop_max= 5;
90 while (--loop_max) // While loop is for ERESTART or EINTR
91 {
92 if (ptr->root->poll_timeout) // Mimic 0 causes timeout behavior (not all platforms do this)
93 {
94 error= poll(&fds, 1, ptr->root->poll_timeout);
95 }
96 else
97 {
98 error= 0;
99 }
100
101 switch (error)
102 {
103 case 1: // Success!
104 WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max);
105 WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
106
107 return MEMCACHED_SUCCESS;
108
109 case 0: // Timeout occured, we let the while() loop do its thing.
110 return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
111
112 default:
113 WATCHPOINT_ERRNO(get_socket_errno());
114 switch (get_socket_errno())
115 {
116 #ifdef TARGET_OS_LINUX
117 case ERESTART:
118 #endif
119 case EINTR:
120 break;
121
122 case EFAULT:
123 case ENOMEM:
124 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
125
126 case EINVAL:
127 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
128
129 default:
130 if (fds.revents & POLLERR)
131 {
132 int err;
133 socklen_t len= sizeof (err);
134 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
135 ptr->cached_errno= (err == 0) ? get_socket_errno() : err;
136 }
137 else
138 {
139 ptr->cached_errno= get_socket_errno();
140 }
141 memcached_quit_server(ptr, true);
142
143 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
144 }
145 }
146 }
147
148 /* Imposssible for anything other then -1 */
149 WATCHPOINT_ASSERT(error == -1);
150 ptr->cached_errno= get_socket_errno();
151 memcached_quit_server(ptr, true);
152
153 return memcached_set_error(*ptr, MEMCACHED_FAILURE, MEMCACHED_AT);
154 }
155
156 memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
157 {
158 return io_wait(ptr, MEM_WRITE);
159 }
160
161 /**
162 * Try to fill the input buffer for a server with as much
163 * data as possible.
164 *
165 * @param ptr the server to pack
166 */
167 static bool repack_input_buffer(memcached_server_write_instance_st ptr)
168 {
169 if (ptr->read_ptr != ptr->read_buffer)
170 {
171 /* Move all of the data to the beginning of the buffer so
172 ** that we can fit more data into the buffer...
173 */
174 memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
175 ptr->read_ptr= ptr->read_buffer;
176 ptr->read_data_length= ptr->read_buffer_length;
177 }
178
179 /* There is room in the buffer, try to fill it! */
180 if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
181 {
182 do {
183 /* Just try a single read to grab what's available */
184 ssize_t nr= recv(ptr->fd,
185 ptr->read_ptr + ptr->read_data_length,
186 MEMCACHED_MAX_BUFFER - ptr->read_data_length,
187 MSG_DONTWAIT);
188
189 switch (nr)
190 {
191 case SOCKET_ERROR:
192 {
193 switch (get_socket_errno())
194 {
195 case EINTR:
196 continue;
197
198 case EWOULDBLOCK:
199 #ifdef USE_EAGAIN
200 case EAGAIN:
201 #endif
202 #ifdef TARGET_OS_LINUX
203 case ERESTART:
204 #endif
205 break; // No IO is fine, we can just move on
206
207 default:
208 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
209 }
210 }
211 break;
212
213 case 0: // Shutdown on the socket has occurred
214 {
215 memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
216 }
217 break;
218
219 default:
220 {
221 ptr->read_data_length+= size_t(nr);
222 ptr->read_buffer_length+= size_t(nr);
223 return true;
224 }
225 break;
226 }
227 } while (0);
228 }
229 return false;
230 }
231
232 /**
233 * If the we have callbacks connected to this server structure
234 * we may start process the input queue and fire the callbacks
235 * for the incomming messages. This function is _only_ called
236 * when the input buffer is full, so that we _know_ that we have
237 * at least _one_ message to process.
238 *
239 * @param ptr the server to star processing iput messages for
240 * @return true if we processed anything, false otherwise
241 */
242 static bool process_input_buffer(memcached_server_write_instance_st ptr)
243 {
244 /*
245 ** We might be able to process some of the response messages if we
246 ** have a callback set up
247 */
248 if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
249 {
250 /*
251 * We might have responses... try to read them out and fire
252 * callbacks
253 */
254 memcached_callback_st cb= *ptr->root->callbacks;
255
256 memcached_set_processing_input((memcached_st *)ptr->root, true);
257
258 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
259 memcached_return_t error;
260 memcached_st *root= (memcached_st *)ptr->root;
261 error= memcached_response(ptr, buffer, sizeof(buffer),
262 &root->result);
263
264 memcached_set_processing_input(root, false);
265
266 if (error == MEMCACHED_SUCCESS)
267 {
268 for (unsigned int x= 0; x < cb.number_of_callback; x++)
269 {
270 error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
271 if (error != MEMCACHED_SUCCESS)
272 break;
273 }
274
275 /* @todo what should I do with the error message??? */
276 }
277 /* @todo what should I do with other error messages?? */
278 return true;
279 }
280
281 return false;
282 }
283
284 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
285 void *buffer, size_t length, ssize_t *nread)
286 {
287 assert(ptr); // Programmer error
288 char *buffer_ptr= static_cast<char *>(buffer);
289
290 if (ptr->fd == INVALID_SOCKET)
291 {
292 assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO));
293 return MEMCACHED_CONNECTION_FAILURE;
294 }
295
296 while (length)
297 {
298 if (not ptr->read_buffer_length)
299 {
300 ssize_t data_read;
301 do
302 {
303 data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
304 if (data_read == SOCKET_ERROR)
305 {
306 switch (get_socket_errno())
307 {
308 case EINTR: // We just retry
309 continue;
310
311 case ETIMEDOUT: // OSX
312 case EWOULDBLOCK:
313 #ifdef USE_EAGAIN
314 case EAGAIN:
315 #endif
316 #ifdef TARGET_OS_LINUX
317 case ERESTART:
318 #endif
319 if (memcached_success(io_wait(ptr, MEM_READ)))
320 {
321 continue;
322 }
323 return MEMCACHED_IN_PROGRESS;
324
325 /* fall through */
326
327 case ENOTCONN: // Programmer Error
328 WATCHPOINT_ASSERT(0);
329 case ENOTSOCK:
330 WATCHPOINT_ASSERT(0);
331 case EBADF:
332 assert(ptr->fd != INVALID_SOCKET);
333 case EINVAL:
334 case EFAULT:
335 case ECONNREFUSED:
336 default:
337 {
338 memcached_quit_server(ptr, true);
339 *nread= -1;
340 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
341 }
342 }
343 }
344 else if (data_read == 0)
345 {
346 /*
347 EOF. Any data received so far is incomplete
348 so discard it. This always reads by byte in case of TCP
349 and protocol enforcement happens at memcached_response()
350 looking for '\n'. We do not care for UDB which requests 8 bytes
351 at once. Generally, this means that connection went away. Since
352 for blocking I/O we do not return 0 and for non-blocking case
353 it will return EGAIN if data is not immediatly available.
354 */
355 WATCHPOINT_STRING("We had a zero length recv()");
356 assert(0);
357 memcached_quit_server(ptr, true);
358 *nread= -1;
359 return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
360 }
361 } while (data_read <= 0);
362
363 ptr->io_bytes_sent = 0;
364 ptr->read_data_length= (size_t) data_read;
365 ptr->read_buffer_length= (size_t) data_read;
366 ptr->read_ptr= ptr->read_buffer;
367 }
368
369 if (length > 1)
370 {
371 size_t difference;
372
373 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
374
375 memcpy(buffer_ptr, ptr->read_ptr, difference);
376 length -= difference;
377 ptr->read_ptr+= difference;
378 ptr->read_buffer_length-= difference;
379 buffer_ptr+= difference;
380 }
381 else
382 {
383 *buffer_ptr= *ptr->read_ptr;
384 ptr->read_ptr++;
385 ptr->read_buffer_length--;
386 buffer_ptr++;
387 break;
388 }
389 }
390
391 ptr->server_failure_counter= 0;
392 *nread = (ssize_t)(buffer_ptr - (char*)buffer);
393 return MEMCACHED_SUCCESS;
394 }
395
396 memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
397 {
398 assert(ptr); // Programmer error
399
400 if (ptr->fd == INVALID_SOCKET)
401 {
402 assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO));
403 return MEMCACHED_CONNECTION_FAILURE;
404 }
405
406 ssize_t data_read;
407 char buffer[MEMCACHED_MAX_BUFFER];
408 do
409 {
410 data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
411 if (data_read == SOCKET_ERROR)
412 {
413 switch (get_socket_errno())
414 {
415 case EINTR: // We just retry
416 continue;
417
418 case ETIMEDOUT: // OSX
419 case EWOULDBLOCK:
420 #ifdef USE_EAGAIN
421 case EAGAIN:
422 #endif
423 #ifdef TARGET_OS_LINUX
424 case ERESTART:
425 #endif
426 if (memcached_success(io_wait(ptr, MEM_READ)))
427 {
428 continue;
429 }
430 return MEMCACHED_IN_PROGRESS;
431
432 /* fall through */
433
434 case ENOTCONN: // Programmer Error
435 WATCHPOINT_ASSERT(0);
436 case ENOTSOCK:
437 WATCHPOINT_ASSERT(0);
438 case EBADF:
439 assert(ptr->fd != INVALID_SOCKET);
440 case EINVAL:
441 case EFAULT:
442 case ECONNREFUSED:
443 default:
444 return MEMCACHED_CONNECTION_FAILURE; // We want this!
445 }
446 }
447 } while (data_read > 0);
448
449 return MEMCACHED_CONNECTION_FAILURE;
450 }
451
452 static ssize_t _io_write(memcached_server_write_instance_st ptr,
453 const void *buffer, size_t length, bool with_flush)
454 {
455 size_t original_length;
456 const char* buffer_ptr;
457
458 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
459
460 original_length= length;
461 buffer_ptr= static_cast<const char *>(buffer);
462
463 while (length)
464 {
465 char *write_ptr;
466 size_t should_write;
467 size_t buffer_end;
468
469 if (ptr->type == MEMCACHED_CONNECTION_UDP)
470 {
471 //UDP does not support partial writes
472 buffer_end= MAX_UDP_DATAGRAM_LENGTH;
473 should_write= length;
474 if (ptr->write_buffer_offset + should_write > buffer_end)
475 {
476 return -1;
477 }
478 }
479 else
480 {
481 buffer_end= MEMCACHED_MAX_BUFFER;
482 should_write= buffer_end - ptr->write_buffer_offset;
483 should_write= (should_write < length) ? should_write : length;
484 }
485
486 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
487 memcpy(write_ptr, buffer_ptr, should_write);
488 ptr->write_buffer_offset+= should_write;
489 buffer_ptr+= should_write;
490 length-= should_write;
491
492 if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP)
493 {
494 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
495
496 memcached_return_t rc;
497 ssize_t sent_length= io_flush(ptr, with_flush, &rc);
498 if (sent_length == -1)
499 {
500 return -1;
501 }
502
503 /* If io_flush calls memcached_purge, sent_length may be 0 */
504 unlikely (sent_length != 0)
505 {
506 WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end);
507 }
508 }
509 }
510
511 if (with_flush)
512 {
513 memcached_return_t rc;
514 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
515 if (io_flush(ptr, with_flush, &rc) == -1)
516 {
517 return -1;
518 }
519 }
520
521 return (ssize_t) original_length;
522 }
523
524 ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
525 const void *buffer, size_t length, bool with_flush)
526 {
527 return _io_write(ptr, buffer, length, with_flush);
528 }
529
530 ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
531 const struct libmemcached_io_vector_st *vector,
532 size_t number_of, bool with_flush)
533 {
534 ssize_t total= 0;
535
536 for (size_t x= 0; x < number_of; x++, vector++)
537 {
538 ssize_t returnable;
539
540 if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1)
541 {
542 return -1;
543 }
544 total+= returnable;
545 }
546
547 if (with_flush)
548 {
549 if (memcached_io_write(ptr, NULL, 0, true) == -1)
550 {
551 return -1;
552 }
553 }
554
555 return total;
556 }
557
558
559 void memcached_io_close(memcached_server_write_instance_st ptr)
560 {
561 if (ptr->fd == INVALID_SOCKET)
562 {
563 return;
564 }
565
566 /* in case of death shutdown to avoid blocking at close() */
567 if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
568 {
569 WATCHPOINT_NUMBER(ptr->fd);
570 WATCHPOINT_ERRNO(get_socket_errno());
571 WATCHPOINT_ASSERT(get_socket_errno());
572 }
573
574 if (closesocket(ptr->fd) == SOCKET_ERROR)
575 {
576 WATCHPOINT_ERRNO(get_socket_errno());
577 }
578 ptr->state= MEMCACHED_SERVER_STATE_NEW;
579 ptr->fd= INVALID_SOCKET;
580 }
581
582 memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
583 {
584 #define MAX_SERVERS_TO_POLL 100
585 struct pollfd fds[MAX_SERVERS_TO_POLL];
586 unsigned int host_index= 0;
587
588 for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x)
589 {
590 memcached_server_write_instance_st instance=
591 memcached_server_instance_fetch(memc, x);
592
593 if (instance->read_buffer_length > 0) /* I have data in the buffer */
594 return instance;
595
596 if (memcached_server_response_count(instance) > 0)
597 {
598 fds[host_index].events = POLLIN;
599 fds[host_index].revents = 0;
600 fds[host_index].fd = instance->fd;
601 ++host_index;
602 }
603 }
604
605 if (host_index < 2)
606 {
607 /* We have 0 or 1 server with pending events.. */
608 for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
609 {
610 memcached_server_write_instance_st instance=
611 memcached_server_instance_fetch(memc, x);
612
613 if (memcached_server_response_count(instance) > 0)
614 {
615 return instance;
616 }
617 }
618
619 return NULL;
620 }
621
622 switch (poll(fds, host_index, memc->poll_timeout))
623 {
624 case -1:
625 memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
626 /* FALLTHROUGH */
627 case 0:
628 break;
629
630 default:
631 for (size_t x= 0; x < host_index; ++x)
632 {
633 if (fds[x].revents & POLLIN)
634 {
635 for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
636 {
637 memcached_server_write_instance_st instance=
638 memcached_server_instance_fetch(memc, y);
639
640 if (instance->fd == fds[x].fd)
641 return instance;
642 }
643 }
644 }
645 }
646
647 return NULL;
648 }
649
650 static ssize_t io_flush(memcached_server_write_instance_st ptr,
651 const bool with_flush,
652 memcached_return_t *error)
653 {
654 /*
655 ** We might want to purge the input buffer if we haven't consumed
656 ** any output yet... The test for the limits is the purge is inline
657 ** in the purge function to avoid duplicating the logic..
658 */
659 {
660 memcached_return_t rc;
661 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
662 rc= memcached_purge(ptr);
663
664 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
665 {
666 return -1;
667 }
668 }
669 ssize_t sent_length;
670 size_t return_length;
671 char *local_write_ptr= ptr->write_buffer;
672 size_t write_length= ptr->write_buffer_offset;
673
674 *error= MEMCACHED_SUCCESS;
675
676 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
677
678 // UDP Sanity check, make sure that we are not sending somthing too big
679 if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
680 {
681 *error= MEMCACHED_WRITE_FAILURE;
682 return -1;
683 }
684
685 if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
686 && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
687 {
688 return 0;
689 }
690
691 /* Looking for memory overflows */
692 #if defined(DEBUG)
693 if (write_length == MEMCACHED_MAX_BUFFER)
694 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
695 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
696 #endif
697
698 return_length= 0;
699 while (write_length)
700 {
701 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
702 WATCHPOINT_ASSERT(write_length > 0);
703 sent_length= 0;
704 if (ptr->type == MEMCACHED_CONNECTION_UDP)
705 increment_udp_message_id(ptr);
706
707 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
708 if (with_flush)
709 {
710 sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
711 }
712 else
713 {
714 sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
715 }
716
717 if (sent_length == SOCKET_ERROR)
718 {
719 ptr->cached_errno= get_socket_errno();
720 #if 0 // @todo I should look at why we hit this bit of code hard frequently
721 WATCHPOINT_ERRNO(get_socket_errno());
722 WATCHPOINT_NUMBER(get_socket_errno());
723 #endif
724 switch (get_socket_errno())
725 {
726 case ENOBUFS:
727 continue;
728 case EWOULDBLOCK:
729 #ifdef USE_EAGAIN
730 case EAGAIN:
731 #endif
732 {
733 /*
734 * We may be blocked on write because the input buffer
735 * is full. Let's check if we have room in our input
736 * buffer for more data and retry the write before
737 * waiting..
738 */
739 if (repack_input_buffer(ptr) or
740 process_input_buffer(ptr))
741 {
742 continue;
743 }
744
745 memcached_return_t rc= io_wait(ptr, MEM_WRITE);
746 if (memcached_success(rc))
747 {
748 continue;
749 }
750 else if (rc == MEMCACHED_TIMEOUT)
751 {
752 *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
753 return -1;
754 }
755
756 memcached_quit_server(ptr, true);
757 *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
758 return -1;
759 }
760 case ENOTCONN:
761 case EPIPE:
762 default:
763 memcached_quit_server(ptr, true);
764 *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
765 WATCHPOINT_ASSERT(ptr->fd == -1);
766 return -1;
767 }
768 }
769
770 if (ptr->type == MEMCACHED_CONNECTION_UDP and
771 (size_t)sent_length != write_length)
772 {
773 memcached_quit_server(ptr, true);
774 *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
775 return -1;
776 }
777
778 ptr->io_bytes_sent += (uint32_t) sent_length;
779
780 local_write_ptr+= sent_length;
781 write_length-= (uint32_t) sent_length;
782 return_length+= (uint32_t) sent_length;
783 }
784
785 WATCHPOINT_ASSERT(write_length == 0);
786 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
787 // ptr->write_buffer_offset);
788
789 // if we are a udp server, the begining of the buffer is reserverd for
790 // the upd frame header
791 if (ptr->type == MEMCACHED_CONNECTION_UDP)
792 ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
793 else
794 ptr->write_buffer_offset= 0;
795
796 return (ssize_t) return_length;
797 }
798
799 /*
800 Eventually we will just kill off the server with the problem.
801 */
802 void memcached_io_reset(memcached_server_write_instance_st ptr)
803 {
804 memcached_quit_server(ptr, true);
805 }
806
807 /**
808 * Read a given number of bytes from the server and place it into a specific
809 * buffer. Reset the IO channel on this server if an error occurs.
810 */
811 memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
812 void *dta,
813 size_t size)
814 {
815 size_t offset= 0;
816 char *data= static_cast<char *>(dta);
817
818 while (offset < size)
819 {
820 ssize_t nread;
821 memcached_return_t rc;
822
823 while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { };
824
825 if (memcached_failed(rc))
826 {
827 return rc;
828 }
829
830 offset+= (size_t) nread;
831 }
832
833 return MEMCACHED_SUCCESS;
834 }
835
836 memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
837 char *buffer_ptr,
838 size_t size)
839 {
840 bool line_complete= false;
841 size_t total_nr= 0;
842
843 while (not line_complete)
844 {
845 if (ptr->read_buffer_length == 0)
846 {
847 /*
848 * We don't have any data in the buffer, so let's fill the read
849 * buffer. Call the standard read function to avoid duplicating
850 * the logic.
851 */
852 ssize_t nread;
853 memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
854 if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS)
855 {
856 memcached_quit_server(ptr, true);
857 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
858 }
859 else if (memcached_failed(rc))
860 {
861 return rc;
862 }
863
864 if (*buffer_ptr == '\n')
865 line_complete= true;
866
867 ++buffer_ptr;
868 ++total_nr;
869 }
870
871 /* Now let's look in the buffer and copy as we go! */
872 while (ptr->read_buffer_length && total_nr < size && !line_complete)
873 {
874 *buffer_ptr = *ptr->read_ptr;
875 if (*buffer_ptr == '\n')
876 line_complete = true;
877 --ptr->read_buffer_length;
878 ++ptr->read_ptr;
879 ++total_nr;
880 ++buffer_ptr;
881 }
882
883 if (total_nr == size)
884 return MEMCACHED_PROTOCOL_ERROR;
885 }
886
887 return MEMCACHED_SUCCESS;
888 }
889
890 /*
891 * The udp request id consists of two seperate sections
892 * 1) The thread id
893 * 2) The message number
894 * The thread id should only be set when the memcached_st struct is created
895 * and should not be changed.
896 *
897 * The message num is incremented for each new message we send, this function
898 * extracts the message number from message_id, increments it and then
899 * writes the new value back into the header
900 */
901 static void increment_udp_message_id(memcached_server_write_instance_st ptr)
902 {
903 struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
904 uint16_t cur_req= get_udp_datagram_request_id(header);
905 int msg_num= get_msg_num_from_request_id(cur_req);
906 int thread_id= get_thread_id_from_request_id(cur_req);
907
908 if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
909 msg_num= 0;
910
911 header->request_id= htons((uint16_t) (thread_id | msg_num));
912 }
913
914 memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id)
915 {
916 if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
917 return MEMCACHED_FAILURE;
918
919 struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
920 header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id)));
921 header->num_datagrams= htons(1);
922 header->sequence_number= htons(0);
923
924 return MEMCACHED_SUCCESS;
925 }