Updating test framework for startup/shutdown of memcached.
[m6w6/libmemcached] / libmemcached / io.cc
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * LibMemcached
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2006-2009 Brian Aker
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39
40 #include <libmemcached/common.h>
41 #include <cassert>
42
/* Direction of the socket operation that io_wait() is asked to wait for. */
enum memc_read_or_write {
  MEM_READ= 0,  /* wait until the socket becomes readable */
  MEM_WRITE= 1  /* wait until the socket becomes writable */
};
47
48 static ssize_t io_flush(memcached_server_write_instance_st ptr,
49 const bool with_flush,
50 memcached_return_t *error);
51 static void increment_udp_message_id(memcached_server_write_instance_st ptr);
52
53 static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
54 memc_read_or_write read_or_write)
55 {
56 struct pollfd fds;
57 fds.fd= ptr->fd;
58 fds.events= POLLIN;
59
60 if (read_or_write == MEM_WRITE) /* write */
61 {
62 fds.events= POLLOUT;
63 WATCHPOINT_SET(ptr->io_wait_count.write++);
64 }
65 else
66 {
67 WATCHPOINT_SET(ptr->io_wait_count.read++);
68 }
69
70 /*
71 ** We are going to block on write, but at least on Solaris we might block
72 ** on write if we haven't read anything from our input buffer..
73 ** Try to purge the input buffer if we don't do any flow control in the
74 ** application layer (just sending a lot of data etc)
75 ** The test is moved down in the purge function to avoid duplication of
76 ** the test.
77 */
78 if (read_or_write == MEM_WRITE)
79 {
80 memcached_return_t rc= memcached_purge(ptr);
81 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
82 {
83 return MEMCACHED_FAILURE;
84 }
85 }
86
87 if (ptr->root->poll_timeout == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
88 {
89 return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
90 }
91
92 size_t loop_max= 5;
93 while (--loop_max) // While loop is for ERESTART or EINTR
94 {
95
96 int error= poll(&fds, 1, ptr->root->poll_timeout);
97 switch (error)
98 {
99 case 1: // Success!
100 WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max);
101 WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
102
103 return MEMCACHED_SUCCESS;
104
105 case 0: // Timeout occured, we let the while() loop do its thing.
106 return memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
107
108 default:
109 WATCHPOINT_ERRNO(get_socket_errno());
110 switch (get_socket_errno())
111 {
112 #ifdef TARGET_OS_LINUX
113 case ERESTART:
114 #endif
115 case EINTR:
116 break;
117
118 case EFAULT:
119 case ENOMEM:
120 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
121
122 case EINVAL:
123 return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
124
125 default:
126 if (fds.revents & POLLERR)
127 {
128 int err;
129 socklen_t len= sizeof (err);
130 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
131 memcached_set_errno(*ptr, (err == 0) ? get_socket_errno() : err, MEMCACHED_AT);
132 }
133 else
134 {
135 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
136 }
137 memcached_quit_server(ptr, true);
138
139 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
140 }
141 }
142 }
143
144 memcached_quit_server(ptr, true);
145
146 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
147 }
148
149 memcached_return_t memcached_io_wait_for_write(memcached_server_write_instance_st ptr)
150 {
151 return io_wait(ptr, MEM_WRITE);
152 }
153
154 /**
155 * Try to fill the input buffer for a server with as much
156 * data as possible.
157 *
158 * @param ptr the server to pack
159 */
160 static bool repack_input_buffer(memcached_server_write_instance_st ptr)
161 {
162 if (ptr->read_ptr != ptr->read_buffer)
163 {
164 /* Move all of the data to the beginning of the buffer so
165 ** that we can fit more data into the buffer...
166 */
167 memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
168 ptr->read_ptr= ptr->read_buffer;
169 ptr->read_data_length= ptr->read_buffer_length;
170 }
171
172 /* There is room in the buffer, try to fill it! */
173 if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
174 {
175 do {
176 /* Just try a single read to grab what's available */
177 ssize_t nr= recv(ptr->fd,
178 ptr->read_ptr + ptr->read_data_length,
179 MEMCACHED_MAX_BUFFER - ptr->read_data_length,
180 MSG_DONTWAIT);
181
182 switch (nr)
183 {
184 case SOCKET_ERROR:
185 {
186 switch (get_socket_errno())
187 {
188 case EINTR:
189 continue;
190
191 case EWOULDBLOCK:
192 #ifdef USE_EAGAIN
193 case EAGAIN:
194 #endif
195 #ifdef TARGET_OS_LINUX
196 case ERESTART:
197 #endif
198 break; // No IO is fine, we can just move on
199
200 default:
201 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
202 }
203 }
204 break;
205
206 case 0: // Shutdown on the socket has occurred
207 {
208 memcached_set_error(*ptr, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
209 }
210 break;
211
212 default:
213 {
214 ptr->read_data_length+= size_t(nr);
215 ptr->read_buffer_length+= size_t(nr);
216 return true;
217 }
218 break;
219 }
220 } while (0);
221 }
222 return false;
223 }
224
225 /**
226 * If the we have callbacks connected to this server structure
227 * we may start process the input queue and fire the callbacks
228 * for the incomming messages. This function is _only_ called
229 * when the input buffer is full, so that we _know_ that we have
230 * at least _one_ message to process.
231 *
232 * @param ptr the server to star processing iput messages for
233 * @return true if we processed anything, false otherwise
234 */
235 static bool process_input_buffer(memcached_server_write_instance_st ptr)
236 {
237 /*
238 ** We might be able to process some of the response messages if we
239 ** have a callback set up
240 */
241 if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
242 {
243 /*
244 * We might have responses... try to read them out and fire
245 * callbacks
246 */
247 memcached_callback_st cb= *ptr->root->callbacks;
248
249 memcached_set_processing_input((memcached_st *)ptr->root, true);
250
251 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
252 memcached_return_t error;
253 memcached_st *root= (memcached_st *)ptr->root;
254 error= memcached_response(ptr, buffer, sizeof(buffer),
255 &root->result);
256
257 memcached_set_processing_input(root, false);
258
259 if (error == MEMCACHED_SUCCESS)
260 {
261 for (unsigned int x= 0; x < cb.number_of_callback; x++)
262 {
263 error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
264 if (error != MEMCACHED_SUCCESS)
265 break;
266 }
267
268 /* @todo what should I do with the error message??? */
269 }
270 /* @todo what should I do with other error messages?? */
271 return true;
272 }
273
274 return false;
275 }
276
277 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
278 void *buffer, size_t length, ssize_t *nread)
279 {
280 assert(ptr); // Programmer error
281 char *buffer_ptr= static_cast<char *>(buffer);
282
283 if (ptr->fd == INVALID_SOCKET)
284 {
285 assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO));
286 return MEMCACHED_CONNECTION_FAILURE;
287 }
288
289 while (length)
290 {
291 if (not ptr->read_buffer_length)
292 {
293 ssize_t data_read;
294 do
295 {
296 data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, MSG_DONTWAIT);
297 if (data_read == SOCKET_ERROR)
298 {
299 switch (get_socket_errno())
300 {
301 case EINTR: // We just retry
302 continue;
303
304 case ETIMEDOUT: // OSX
305 case EWOULDBLOCK:
306 #ifdef USE_EAGAIN
307 case EAGAIN:
308 #endif
309 #ifdef TARGET_OS_LINUX
310 case ERESTART:
311 #endif
312 if (memcached_success(io_wait(ptr, MEM_READ)))
313 {
314 continue;
315 }
316 return MEMCACHED_IN_PROGRESS;
317
318 /* fall through */
319
320 case ENOTCONN: // Programmer Error
321 WATCHPOINT_ASSERT(0);
322 case ENOTSOCK:
323 WATCHPOINT_ASSERT(0);
324 case EBADF:
325 assert(ptr->fd != INVALID_SOCKET);
326 case EINVAL:
327 case EFAULT:
328 case ECONNREFUSED:
329 default:
330 {
331 memcached_quit_server(ptr, true);
332 *nread= -1;
333 return memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
334 }
335 }
336 }
337 else if (data_read == 0)
338 {
339 /*
340 EOF. Any data received so far is incomplete
341 so discard it. This always reads by byte in case of TCP
342 and protocol enforcement happens at memcached_response()
343 looking for '\n'. We do not care for UDB which requests 8 bytes
344 at once. Generally, this means that connection went away. Since
345 for blocking I/O we do not return 0 and for non-blocking case
346 it will return EGAIN if data is not immediatly available.
347 */
348 WATCHPOINT_STRING("We had a zero length recv()");
349 assert(0);
350 memcached_quit_server(ptr, true);
351 *nread= -1;
352 return memcached_set_error(*ptr, MEMCACHED_UNKNOWN_READ_FAILURE, MEMCACHED_AT);
353 }
354 } while (data_read <= 0);
355
356 ptr->io_bytes_sent = 0;
357 ptr->read_data_length= (size_t) data_read;
358 ptr->read_buffer_length= (size_t) data_read;
359 ptr->read_ptr= ptr->read_buffer;
360 }
361
362 if (length > 1)
363 {
364 size_t difference;
365
366 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
367
368 memcpy(buffer_ptr, ptr->read_ptr, difference);
369 length -= difference;
370 ptr->read_ptr+= difference;
371 ptr->read_buffer_length-= difference;
372 buffer_ptr+= difference;
373 }
374 else
375 {
376 *buffer_ptr= *ptr->read_ptr;
377 ptr->read_ptr++;
378 ptr->read_buffer_length--;
379 buffer_ptr++;
380 break;
381 }
382 }
383
384 ptr->server_failure_counter= 0;
385 *nread = (ssize_t)(buffer_ptr - (char*)buffer);
386 return MEMCACHED_SUCCESS;
387 }
388
389 memcached_return_t memcached_io_slurp(memcached_server_write_instance_st ptr)
390 {
391 assert(ptr); // Programmer error
392
393 if (ptr->fd == INVALID_SOCKET)
394 {
395 assert(int(ptr->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO));
396 return MEMCACHED_CONNECTION_FAILURE;
397 }
398
399 ssize_t data_read;
400 char buffer[MEMCACHED_MAX_BUFFER];
401 do
402 {
403 data_read= recv(ptr->fd, ptr->read_buffer, sizeof(buffer), MSG_DONTWAIT);
404 if (data_read == SOCKET_ERROR)
405 {
406 switch (get_socket_errno())
407 {
408 case EINTR: // We just retry
409 continue;
410
411 case ETIMEDOUT: // OSX
412 case EWOULDBLOCK:
413 #ifdef USE_EAGAIN
414 case EAGAIN:
415 #endif
416 #ifdef TARGET_OS_LINUX
417 case ERESTART:
418 #endif
419 if (memcached_success(io_wait(ptr, MEM_READ)))
420 {
421 continue;
422 }
423 return MEMCACHED_IN_PROGRESS;
424
425 /* fall through */
426
427 case ENOTCONN: // Programmer Error
428 WATCHPOINT_ASSERT(0);
429 case ENOTSOCK:
430 WATCHPOINT_ASSERT(0);
431 case EBADF:
432 assert(ptr->fd != INVALID_SOCKET);
433 case EINVAL:
434 case EFAULT:
435 case ECONNREFUSED:
436 default:
437 return MEMCACHED_CONNECTION_FAILURE; // We want this!
438 }
439 }
440 } while (data_read > 0);
441
442 return MEMCACHED_CONNECTION_FAILURE;
443 }
444
445 static ssize_t _io_write(memcached_server_write_instance_st ptr,
446 const void *buffer, size_t length, bool with_flush)
447 {
448 size_t original_length;
449 const char* buffer_ptr;
450
451 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
452
453 original_length= length;
454 buffer_ptr= static_cast<const char *>(buffer);
455
456 while (length)
457 {
458 char *write_ptr;
459 size_t should_write;
460 size_t buffer_end;
461
462 if (ptr->type == MEMCACHED_CONNECTION_UDP)
463 {
464 //UDP does not support partial writes
465 buffer_end= MAX_UDP_DATAGRAM_LENGTH;
466 should_write= length;
467 if (ptr->write_buffer_offset + should_write > buffer_end)
468 {
469 return -1;
470 }
471 }
472 else
473 {
474 buffer_end= MEMCACHED_MAX_BUFFER;
475 should_write= buffer_end - ptr->write_buffer_offset;
476 should_write= (should_write < length) ? should_write : length;
477 }
478
479 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
480 memcpy(write_ptr, buffer_ptr, should_write);
481 ptr->write_buffer_offset+= should_write;
482 buffer_ptr+= should_write;
483 length-= should_write;
484
485 if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP)
486 {
487 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
488
489 memcached_return_t rc;
490 ssize_t sent_length= io_flush(ptr, with_flush, &rc);
491 if (sent_length == -1)
492 {
493 return -1;
494 }
495
496 /* If io_flush calls memcached_purge, sent_length may be 0 */
497 unlikely (sent_length != 0)
498 {
499 WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end);
500 }
501 }
502 }
503
504 if (with_flush)
505 {
506 memcached_return_t rc;
507 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
508 if (io_flush(ptr, with_flush, &rc) == -1)
509 {
510 return -1;
511 }
512 }
513
514 return (ssize_t) original_length;
515 }
516
517 ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
518 const void *buffer, size_t length, bool with_flush)
519 {
520 return _io_write(ptr, buffer, length, with_flush);
521 }
522
523 ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
524 const struct libmemcached_io_vector_st *vector,
525 size_t number_of, bool with_flush)
526 {
527 ssize_t total= 0;
528
529 for (size_t x= 0; x < number_of; x++, vector++)
530 {
531 ssize_t returnable;
532
533 if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1)
534 {
535 return -1;
536 }
537 total+= returnable;
538 }
539
540 if (with_flush)
541 {
542 if (memcached_io_write(ptr, NULL, 0, true) == -1)
543 {
544 return -1;
545 }
546 }
547
548 return total;
549 }
550
551
552 void memcached_io_close(memcached_server_write_instance_st ptr)
553 {
554 if (ptr->fd == INVALID_SOCKET)
555 {
556 return;
557 }
558
559 /* in case of death shutdown to avoid blocking at close() */
560 if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
561 {
562 WATCHPOINT_NUMBER(ptr->fd);
563 WATCHPOINT_ERRNO(get_socket_errno());
564 WATCHPOINT_ASSERT(get_socket_errno());
565 }
566
567 if (closesocket(ptr->fd) == SOCKET_ERROR)
568 {
569 WATCHPOINT_ERRNO(get_socket_errno());
570 }
571 ptr->state= MEMCACHED_SERVER_STATE_NEW;
572 ptr->fd= INVALID_SOCKET;
573 }
574
575 memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
576 {
577 #define MAX_SERVERS_TO_POLL 100
578 struct pollfd fds[MAX_SERVERS_TO_POLL];
579 unsigned int host_index= 0;
580
581 for (uint32_t x= 0; x < memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL; ++x)
582 {
583 memcached_server_write_instance_st instance=
584 memcached_server_instance_fetch(memc, x);
585
586 if (instance->read_buffer_length > 0) /* I have data in the buffer */
587 return instance;
588
589 if (memcached_server_response_count(instance) > 0)
590 {
591 fds[host_index].events = POLLIN;
592 fds[host_index].revents = 0;
593 fds[host_index].fd = instance->fd;
594 ++host_index;
595 }
596 }
597
598 if (host_index < 2)
599 {
600 /* We have 0 or 1 server with pending events.. */
601 for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
602 {
603 memcached_server_write_instance_st instance=
604 memcached_server_instance_fetch(memc, x);
605
606 if (memcached_server_response_count(instance) > 0)
607 {
608 return instance;
609 }
610 }
611
612 return NULL;
613 }
614
615 int error= poll(fds, host_index, memc->poll_timeout);
616 switch (error)
617 {
618 case -1:
619 memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
620 /* FALLTHROUGH */
621 case 0:
622 break;
623
624 default:
625 for (size_t x= 0; x < host_index; ++x)
626 {
627 if (fds[x].revents & POLLIN)
628 {
629 for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
630 {
631 memcached_server_write_instance_st instance=
632 memcached_server_instance_fetch(memc, y);
633
634 if (instance->fd == fds[x].fd)
635 return instance;
636 }
637 }
638 }
639 }
640
641 return NULL;
642 }
643
644 static ssize_t io_flush(memcached_server_write_instance_st ptr,
645 const bool with_flush,
646 memcached_return_t *error)
647 {
648 /*
649 ** We might want to purge the input buffer if we haven't consumed
650 ** any output yet... The test for the limits is the purge is inline
651 ** in the purge function to avoid duplicating the logic..
652 */
653 {
654 memcached_return_t rc;
655 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
656 rc= memcached_purge(ptr);
657
658 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
659 {
660 return -1;
661 }
662 }
663 ssize_t sent_length;
664 size_t return_length;
665 char *local_write_ptr= ptr->write_buffer;
666 size_t write_length= ptr->write_buffer_offset;
667
668 *error= MEMCACHED_SUCCESS;
669
670 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
671
672 // UDP Sanity check, make sure that we are not sending somthing too big
673 if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
674 {
675 *error= MEMCACHED_WRITE_FAILURE;
676 return -1;
677 }
678
679 if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
680 && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
681 {
682 return 0;
683 }
684
685 /* Looking for memory overflows */
686 #if defined(DEBUG)
687 if (write_length == MEMCACHED_MAX_BUFFER)
688 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
689 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
690 #endif
691
692 return_length= 0;
693 while (write_length)
694 {
695 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
696 WATCHPOINT_ASSERT(write_length > 0);
697 sent_length= 0;
698 if (ptr->type == MEMCACHED_CONNECTION_UDP)
699 increment_udp_message_id(ptr);
700
701 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
702 if (with_flush)
703 {
704 sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT);
705 }
706 else
707 {
708 sent_length= send(ptr->fd, local_write_ptr, write_length, MSG_NOSIGNAL|MSG_DONTWAIT|MSG_MORE);
709 }
710
711 if (sent_length == SOCKET_ERROR)
712 {
713 memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
714 #if 0 // @todo I should look at why we hit this bit of code hard frequently
715 WATCHPOINT_ERRNO(get_socket_errno());
716 WATCHPOINT_NUMBER(get_socket_errno());
717 #endif
718 switch (get_socket_errno())
719 {
720 case ENOBUFS:
721 continue;
722 case EWOULDBLOCK:
723 #ifdef USE_EAGAIN
724 case EAGAIN:
725 #endif
726 {
727 /*
728 * We may be blocked on write because the input buffer
729 * is full. Let's check if we have room in our input
730 * buffer for more data and retry the write before
731 * waiting..
732 */
733 if (repack_input_buffer(ptr) or
734 process_input_buffer(ptr))
735 {
736 continue;
737 }
738
739 memcached_return_t rc= io_wait(ptr, MEM_WRITE);
740 if (memcached_success(rc))
741 {
742 continue;
743 }
744 else if (rc == MEMCACHED_TIMEOUT)
745 {
746 *error= memcached_set_error(*ptr, MEMCACHED_TIMEOUT, MEMCACHED_AT);
747 return -1;
748 }
749
750 memcached_quit_server(ptr, true);
751 *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
752 return -1;
753 }
754 case ENOTCONN:
755 case EPIPE:
756 default:
757 memcached_quit_server(ptr, true);
758 *error= memcached_set_errno(*ptr, get_socket_errno(), MEMCACHED_AT);
759 WATCHPOINT_ASSERT(ptr->fd == -1);
760 return -1;
761 }
762 }
763
764 if (ptr->type == MEMCACHED_CONNECTION_UDP and
765 (size_t)sent_length != write_length)
766 {
767 memcached_quit_server(ptr, true);
768 *error= memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
769 return -1;
770 }
771
772 ptr->io_bytes_sent += (uint32_t) sent_length;
773
774 local_write_ptr+= sent_length;
775 write_length-= (uint32_t) sent_length;
776 return_length+= (uint32_t) sent_length;
777 }
778
779 WATCHPOINT_ASSERT(write_length == 0);
780 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
781 // ptr->write_buffer_offset);
782
783 // if we are a udp server, the begining of the buffer is reserverd for
784 // the upd frame header
785 if (ptr->type == MEMCACHED_CONNECTION_UDP)
786 ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
787 else
788 ptr->write_buffer_offset= 0;
789
790 return (ssize_t) return_length;
791 }
792
793 /*
794 Eventually we will just kill off the server with the problem.
795 */
796 void memcached_io_reset(memcached_server_write_instance_st ptr)
797 {
798 memcached_quit_server(ptr, true);
799 }
800
801 /**
802 * Read a given number of bytes from the server and place it into a specific
803 * buffer. Reset the IO channel on this server if an error occurs.
804 */
805 memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
806 void *dta,
807 size_t size)
808 {
809 size_t offset= 0;
810 char *data= static_cast<char *>(dta);
811
812 while (offset < size)
813 {
814 ssize_t nread;
815 memcached_return_t rc;
816
817 while (memcached_continue(rc= memcached_io_read(ptr, data + offset, size - offset, &nread))) { };
818
819 if (memcached_failed(rc))
820 {
821 return rc;
822 }
823
824 offset+= (size_t) nread;
825 }
826
827 return MEMCACHED_SUCCESS;
828 }
829
830 memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
831 char *buffer_ptr,
832 size_t size)
833 {
834 bool line_complete= false;
835 size_t total_nr= 0;
836
837 while (not line_complete)
838 {
839 if (ptr->read_buffer_length == 0)
840 {
841 /*
842 * We don't have any data in the buffer, so let's fill the read
843 * buffer. Call the standard read function to avoid duplicating
844 * the logic.
845 */
846 ssize_t nread;
847 memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
848 if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS)
849 {
850 memcached_quit_server(ptr, true);
851 return memcached_set_error(*ptr, rc, MEMCACHED_AT);
852 }
853 else if (memcached_failed(rc))
854 {
855 return rc;
856 }
857
858 if (*buffer_ptr == '\n')
859 line_complete= true;
860
861 ++buffer_ptr;
862 ++total_nr;
863 }
864
865 /* Now let's look in the buffer and copy as we go! */
866 while (ptr->read_buffer_length && total_nr < size && !line_complete)
867 {
868 *buffer_ptr = *ptr->read_ptr;
869 if (*buffer_ptr == '\n')
870 line_complete = true;
871 --ptr->read_buffer_length;
872 ++ptr->read_ptr;
873 ++total_nr;
874 ++buffer_ptr;
875 }
876
877 if (total_nr == size)
878 return MEMCACHED_PROTOCOL_ERROR;
879 }
880
881 return MEMCACHED_SUCCESS;
882 }
883
884 /*
885 * The udp request id consists of two seperate sections
886 * 1) The thread id
887 * 2) The message number
888 * The thread id should only be set when the memcached_st struct is created
889 * and should not be changed.
890 *
891 * The message num is incremented for each new message we send, this function
892 * extracts the message number from message_id, increments it and then
893 * writes the new value back into the header
894 */
895 static void increment_udp_message_id(memcached_server_write_instance_st ptr)
896 {
897 struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
898 uint16_t cur_req= get_udp_datagram_request_id(header);
899 int msg_num= get_msg_num_from_request_id(cur_req);
900 int thread_id= get_thread_id_from_request_id(cur_req);
901
902 if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
903 msg_num= 0;
904
905 header->request_id= htons((uint16_t) (thread_id | msg_num));
906 }
907
908 memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id)
909 {
910 if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
911 return MEMCACHED_FAILURE;
912
913 struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
914 header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id)));
915 header->num_datagrams= htons(1);
916 header->sequence_number= htons(0);
917
918 return MEMCACHED_SUCCESS;
919 }