Better reporting on errors from mget.
[awesomized/libmemcached] / libmemcached / io.c
1 /* LibMemcached
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Server IO, Not public!
9 *
10 */
11
12
13 #include "common.h"
14
15 #include <assert.h>
16
17 typedef enum {
18 MEM_READ,
19 MEM_WRITE
20 } memc_read_or_write;
21
22 static ssize_t io_flush(memcached_server_write_instance_st ptr,
23 memcached_return_t *error);
24 static void increment_udp_message_id(memcached_server_write_instance_st ptr);
25
26 static memcached_return_t io_wait(memcached_server_write_instance_st ptr,
27 memc_read_or_write read_or_write)
28 {
29 struct pollfd fds= {
30 .fd= ptr->fd,
31 .events = POLLIN
32 };
33 int error;
34
35 if (read_or_write == MEM_WRITE) /* write */
36 {
37 fds.events= POLLOUT;
38 WATCHPOINT_SET(ptr->io_wait_count.write++);
39 }
40 else
41 {
42 WATCHPOINT_SET(ptr->io_wait_count.read++);
43 }
44
45 /*
46 ** We are going to block on write, but at least on Solaris we might block
47 ** on write if we haven't read anything from our input buffer..
48 ** Try to purge the input buffer if we don't do any flow control in the
49 ** application layer (just sending a lot of data etc)
50 ** The test is moved down in the purge function to avoid duplication of
51 ** the test.
52 */
53 if (read_or_write == MEM_WRITE)
54 {
55 memcached_return_t rc= memcached_purge(ptr);
56 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
57 return MEMCACHED_FAILURE;
58 }
59
60 size_t loop_max= 5;
61 while (--loop_max) // While loop is for ERESTART or EINTR
62 {
63 error= poll(&fds, 1, ptr->root->poll_timeout);
64
65 switch (error)
66 {
67 case 1: // Success!
68 WATCHPOINT_IF_LABELED_NUMBER(read_or_write && loop_max < 4, "read() times we had to loop, decremented down from 5", loop_max);
69 WATCHPOINT_IF_LABELED_NUMBER(!read_or_write && loop_max < 4, "write() times we had to loop, decremented down from 5", loop_max);
70
71 return MEMCACHED_SUCCESS;
72 case 0: // Timeout occured, we let the while() loop do its thing.
73 return MEMCACHED_TIMEOUT;
74 default:
75 WATCHPOINT_ERRNO(get_socket_errno());
76 switch (get_socket_errno())
77 {
78 #ifdef TARGET_OS_LINUX
79 case ERESTART:
80 #endif
81 case EINTR:
82 break;
83 default:
84 if (fds.revents & POLLERR)
85 {
86 int err;
87 socklen_t len= sizeof (err);
88 (void)getsockopt(ptr->fd, SOL_SOCKET, SO_ERROR, &err, &len);
89 ptr->cached_errno= (err == 0) ? get_socket_errno() : err;
90 }
91 else
92 {
93 ptr->cached_errno= get_socket_errno();
94 }
95 memcached_quit_server(ptr, true);
96
97 return MEMCACHED_FAILURE;
98 }
99 }
100 }
101
102 /* Imposssible for anything other then -1 */
103 WATCHPOINT_ASSERT(error == -1);
104 ptr->cached_errno= get_socket_errno();
105 memcached_quit_server(ptr, true);
106
107 return MEMCACHED_FAILURE;
108 }
109
110 /**
111 * Try to fill the input buffer for a server with as much
112 * data as possible.
113 *
114 * @param ptr the server to pack
115 */
116 static bool repack_input_buffer(memcached_server_write_instance_st ptr)
117 {
118 if (ptr->read_ptr != ptr->read_buffer)
119 {
120 /* Move all of the data to the beginning of the buffer so
121 ** that we can fit more data into the buffer...
122 */
123 memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
124 ptr->read_ptr= ptr->read_buffer;
125 ptr->read_data_length= ptr->read_buffer_length;
126 }
127
128 /* There is room in the buffer, try to fill it! */
129 if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
130 {
131 /* Just try a single read to grab what's available */
132 ssize_t nr= recv(ptr->fd,
133 ptr->read_ptr + ptr->read_data_length,
134 MEMCACHED_MAX_BUFFER - ptr->read_data_length,
135 0);
136
137 if (nr > 0)
138 {
139 ptr->read_data_length+= (size_t)nr;
140 ptr->read_buffer_length+= (size_t)nr;
141 return true;
142 }
143 }
144 return false;
145 }
146
147 /**
148 * If the we have callbacks connected to this server structure
149 * we may start process the input queue and fire the callbacks
150 * for the incomming messages. This function is _only_ called
151 * when the input buffer is full, so that we _know_ that we have
152 * at least _one_ message to process.
153 *
154 * @param ptr the server to star processing iput messages for
155 * @return true if we processed anything, false otherwise
156 */
157 static bool process_input_buffer(memcached_server_write_instance_st ptr)
158 {
159 /*
160 ** We might be able to process some of the response messages if we
161 ** have a callback set up
162 */
163 if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
164 {
165 /*
166 * We might have responses... try to read them out and fire
167 * callbacks
168 */
169 memcached_callback_st cb= *ptr->root->callbacks;
170
171 memcached_set_processing_input((memcached_st *)ptr->root, true);
172
173 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
174 memcached_return_t error;
175 memcached_st *root= (memcached_st *)ptr->root;
176 error= memcached_response(ptr, buffer, sizeof(buffer),
177 &root->result);
178
179 memcached_set_processing_input(root, false);
180
181 if (error == MEMCACHED_SUCCESS)
182 {
183 for (unsigned int x= 0; x < cb.number_of_callback; x++)
184 {
185 error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
186 if (error != MEMCACHED_SUCCESS)
187 break;
188 }
189
190 /* @todo what should I do with the error message??? */
191 }
192 /* @todo what should I do with other error messages?? */
193 return true;
194 }
195
196 return false;
197 }
198
199 static inline void memcached_io_cork_push(memcached_server_st *ptr)
200 {
201 (void)ptr;
202 #ifdef CORK
203 if (ptr->root->flags.cork == false || ptr->state.is_corked)
204 return;
205
206 int enable= 1;
207 int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK,
208 &enable, (socklen_t)sizeof(int));
209 if (! err)
210 ptr->state.is_corked= true;
211
212 WATCHPOINT_ASSERT(ptr->state.is_corked == true);
213 #endif
214 }
215
216 static inline void memcached_io_cork_pop(memcached_server_st *ptr)
217 {
218 (void)ptr;
219 #ifdef CORK
220 if (ptr->root->flags.cork == false || ptr->state.is_corked == false)
221 return;
222
223 int enable= 0;
224 int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK,
225 &enable, (socklen_t)sizeof(int));
226 if (! err)
227 ptr->state.is_corked= false;
228
229 WATCHPOINT_ASSERT(ptr->state.is_corked == false);
230 #endif
231 }
232
233 #if 0 // Dead code, this should be removed.
234 void memcached_io_preread(memcached_st *ptr)
235 {
236 unsigned int x;
237
238 return;
239
240 for (x= 0; x < memcached_server_count(ptr); x++)
241 {
242 if (memcached_server_response_count(ptr, x) &&
243 ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
244 {
245 size_t data_read;
246
247 data_read= recv(ptr->hosts[x].fd,
248 ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
249 MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length, 0);
250 if (data_read == SOCKET_ERROR)
251 continue;
252
253 ptr->hosts[x].read_buffer_length+= data_read;
254 ptr->hosts[x].read_data_length+= data_read;
255 }
256 }
257 }
258 #endif
259
260 memcached_return_t memcached_io_read(memcached_server_write_instance_st ptr,
261 void *buffer, size_t length, ssize_t *nread)
262 {
263 char *buffer_ptr;
264
265 buffer_ptr= buffer;
266
267 while (length)
268 {
269 if (!ptr->read_buffer_length)
270 {
271 ssize_t data_read;
272
273 while (1)
274 {
275 data_read= recv(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER, 0);
276 if (data_read > 0)
277 {
278 break;
279 }
280 else if (data_read == SOCKET_ERROR)
281 {
282 ptr->cached_errno= get_socket_errno();
283 memcached_return_t rc= MEMCACHED_ERRNO;
284 switch (get_socket_errno())
285 {
286 case EWOULDBLOCK:
287 #ifdef USE_EAGAIN
288 case EAGAIN:
289 #endif
290 case EINTR:
291 #ifdef TARGET_OS_LINUX
292 case ERESTART:
293 #endif
294 if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS)
295 continue;
296 /* fall through */
297
298 default:
299 {
300 memcached_quit_server(ptr, true);
301 *nread= -1;
302 return rc;
303 }
304 }
305 }
306 else
307 {
308 /*
309 EOF. Any data received so far is incomplete
310 so discard it. This always reads by byte in case of TCP
311 and protocol enforcement happens at memcached_response()
312 looking for '\n'. We do not care for UDB which requests 8 bytes
313 at once. Generally, this means that connection went away. Since
314 for blocking I/O we do not return 0 and for non-blocking case
315 it will return EGAIN if data is not immediatly available.
316 */
317 WATCHPOINT_STRING("We had a zero length recv()");
318 memcached_quit_server(ptr, true);
319 *nread= -1;
320 return MEMCACHED_UNKNOWN_READ_FAILURE;
321 }
322 }
323
324 ptr->io_bytes_sent = 0;
325 ptr->read_data_length= (size_t) data_read;
326 ptr->read_buffer_length= (size_t) data_read;
327 ptr->read_ptr= ptr->read_buffer;
328 }
329
330 if (length > 1)
331 {
332 size_t difference;
333
334 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
335
336 memcpy(buffer_ptr, ptr->read_ptr, difference);
337 length -= difference;
338 ptr->read_ptr+= difference;
339 ptr->read_buffer_length-= difference;
340 buffer_ptr+= difference;
341 }
342 else
343 {
344 *buffer_ptr= *ptr->read_ptr;
345 ptr->read_ptr++;
346 ptr->read_buffer_length--;
347 buffer_ptr++;
348 break;
349 }
350 }
351
352 ptr->server_failure_counter= 0;
353 *nread = (ssize_t)(buffer_ptr - (char*)buffer);
354 return MEMCACHED_SUCCESS;
355 }
356
357 static ssize_t _io_write(memcached_server_write_instance_st ptr,
358 const void *buffer, size_t length, bool with_flush)
359 {
360 size_t original_length;
361 const char* buffer_ptr;
362
363 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
364
365 original_length= length;
366 buffer_ptr= buffer;
367
368 /* more writable data is coming if a flush isn't required, so delay send */
369 if (! with_flush)
370 {
371 memcached_io_cork_push(ptr);
372 }
373
374 while (length)
375 {
376 char *write_ptr;
377 size_t should_write;
378 size_t buffer_end;
379
380 if (ptr->type == MEMCACHED_CONNECTION_UDP)
381 {
382 //UDP does not support partial writes
383 buffer_end= MAX_UDP_DATAGRAM_LENGTH;
384 should_write= length;
385 if (ptr->write_buffer_offset + should_write > buffer_end)
386 {
387 fprintf(stderr, "%s:%d (%s)\n", __FILE__, __LINE__,__func__);fflush(stdout);
388 return -1;
389 }
390 }
391 else
392 {
393 buffer_end= MEMCACHED_MAX_BUFFER;
394 should_write= buffer_end - ptr->write_buffer_offset;
395 should_write= (should_write < length) ? should_write : length;
396 }
397
398 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
399 memcpy(write_ptr, buffer_ptr, should_write);
400 ptr->write_buffer_offset+= should_write;
401 buffer_ptr+= should_write;
402 length-= should_write;
403
404 if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP)
405 {
406 memcached_return_t rc;
407 ssize_t sent_length;
408
409 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
410 sent_length= io_flush(ptr, &rc);
411 if (sent_length == -1)
412 {
413 return -1;
414 }
415
416 /* If io_flush calls memcached_purge, sent_length may be 0 */
417 unlikely (sent_length != 0)
418 {
419 WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end);
420 }
421 }
422 }
423
424 if (with_flush)
425 {
426 memcached_return_t rc;
427 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
428 if (io_flush(ptr, &rc) == -1)
429 {
430 fprintf(stderr, "%s:%d (%s)\n", __FILE__, __LINE__,__func__);fflush(stdout);
431 return -1;
432 }
433
434 memcached_io_cork_pop(ptr);
435 }
436
437 return (ssize_t) original_length;
438 }
439
440 ssize_t memcached_io_write(memcached_server_write_instance_st ptr,
441 const void *buffer, size_t length, bool with_flush)
442 {
443 return _io_write(ptr, buffer, length, with_flush);
444 }
445
446 ssize_t memcached_io_writev(memcached_server_write_instance_st ptr,
447 const struct libmemcached_io_vector_st *vector,
448 size_t number_of, bool with_flush)
449 {
450 ssize_t total= 0;
451
452 for (size_t x= 0; x < number_of; x++, vector++)
453 {
454 ssize_t returnable;
455
456 if ((returnable= _io_write(ptr, vector->buffer, vector->length, false)) == -1)
457 {
458 return -1;
459 }
460 total+= returnable;
461 }
462
463 if (with_flush)
464 {
465 if (memcached_io_write(ptr, NULL, 0, true) == -1)
466 {
467 return -1;
468 }
469 }
470
471 return total;
472 }
473
474
475 memcached_return_t memcached_io_close(memcached_server_write_instance_st ptr)
476 {
477 if (ptr->fd == INVALID_SOCKET)
478 {
479 return MEMCACHED_SUCCESS;
480 }
481
482 /* in case of death shutdown to avoid blocking at close() */
483 if (shutdown(ptr->fd, SHUT_RDWR) == SOCKET_ERROR && get_socket_errno() != ENOTCONN)
484 {
485 WATCHPOINT_NUMBER(ptr->fd);
486 WATCHPOINT_ERRNO(get_socket_errno());
487 WATCHPOINT_ASSERT(get_socket_errno());
488 }
489
490 if (closesocket(ptr->fd) == SOCKET_ERROR)
491 {
492 WATCHPOINT_ERRNO(get_socket_errno());
493 }
494
495 return MEMCACHED_SUCCESS;
496 }
497
498 memcached_server_write_instance_st memcached_io_get_readable_server(memcached_st *memc)
499 {
500 #define MAX_SERVERS_TO_POLL 100
501 struct pollfd fds[MAX_SERVERS_TO_POLL];
502 unsigned int host_index= 0;
503
504 for (uint32_t x= 0;
505 x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL;
506 ++x)
507 {
508 memcached_server_write_instance_st instance=
509 memcached_server_instance_fetch(memc, x);
510
511 if (instance->read_buffer_length > 0) /* I have data in the buffer */
512 return instance;
513
514 if (memcached_server_response_count(instance) > 0)
515 {
516 fds[host_index].events = POLLIN;
517 fds[host_index].revents = 0;
518 fds[host_index].fd = instance->fd;
519 ++host_index;
520 }
521 }
522
523 if (host_index < 2)
524 {
525 /* We have 0 or 1 server with pending events.. */
526 for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
527 {
528 memcached_server_write_instance_st instance=
529 memcached_server_instance_fetch(memc, x);
530
531 if (memcached_server_response_count(instance) > 0)
532 {
533 return instance;
534 }
535 }
536
537 return NULL;
538 }
539
540 int err= poll(fds, host_index, memc->poll_timeout);
541 switch (err) {
542 case -1:
543 memc->cached_errno = get_socket_errno();
544 /* FALLTHROUGH */
545 case 0:
546 break;
547 default:
548 for (size_t x= 0; x < host_index; ++x)
549 {
550 if (fds[x].revents & POLLIN)
551 {
552 for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
553 {
554 memcached_server_write_instance_st instance=
555 memcached_server_instance_fetch(memc, y);
556
557 if (instance->fd == fds[x].fd)
558 return instance;
559 }
560 }
561 }
562 }
563
564 return NULL;
565 }
566
567 static ssize_t io_flush(memcached_server_write_instance_st ptr,
568 memcached_return_t *error)
569 {
570 /*
571 ** We might want to purge the input buffer if we haven't consumed
572 ** any output yet... The test for the limits is the purge is inline
573 ** in the purge function to avoid duplicating the logic..
574 */
575 {
576 memcached_return_t rc;
577 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
578 rc= memcached_purge(ptr);
579
580 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
581 {
582 fprintf(stderr, "%s:%d (%s)\n", __FILE__, __LINE__,__func__);fflush(stdout);
583 return -1;
584 }
585 }
586 ssize_t sent_length;
587 size_t return_length;
588 char *local_write_ptr= ptr->write_buffer;
589 size_t write_length= ptr->write_buffer_offset;
590
591 *error= MEMCACHED_SUCCESS;
592
593 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
594
595 // UDP Sanity check, make sure that we are not sending somthing too big
596 if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
597 {
598 fprintf(stderr, "%s:%d (%s)\n", __FILE__, __LINE__,__func__);fflush(stdout);
599 return -1;
600 }
601
602 if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
603 && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
604 return 0;
605
606 /* Looking for memory overflows */
607 #if defined(DEBUG)
608 if (write_length == MEMCACHED_MAX_BUFFER)
609 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
610 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
611 #endif
612
613 return_length= 0;
614 while (write_length)
615 {
616 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
617 WATCHPOINT_ASSERT(write_length > 0);
618 sent_length= 0;
619 if (ptr->type == MEMCACHED_CONNECTION_UDP)
620 increment_udp_message_id(ptr);
621
622 assert(ptr->fd != -1);
623 sent_length= send(ptr->fd, local_write_ptr, write_length, 0);
624 if (sent_length == SOCKET_ERROR)
625 {
626 ptr->cached_errno= get_socket_errno();
627 #if 0 // @todo I should look at why we hit this bit of code hard frequently
628 WATCHPOINT_ERRNO(get_socket_errno());
629 WATCHPOINT_NUMBER(get_socket_errno());
630 #endif
631 switch (get_socket_errno())
632 {
633 case ENOBUFS:
634 continue;
635 case EWOULDBLOCK:
636 #ifdef USE_EAGAIN
637 case EAGAIN:
638 #endif
639 {
640 /*
641 * We may be blocked on write because the input buffer
642 * is full. Let's check if we have room in our input
643 * buffer for more data and retry the write before
644 * waiting..
645 */
646 if (repack_input_buffer(ptr) ||
647 process_input_buffer(ptr))
648 continue;
649
650 memcached_return_t rc;
651 rc= io_wait(ptr, MEM_WRITE);
652
653 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
654 continue;
655
656 memcached_quit_server(ptr, true);
657 fprintf(stderr, "%s:%d (%s)\n", __FILE__, __LINE__,__func__);fflush(stdout);
658 return -1;
659 }
660 case ENOTCONN:
661 case EPIPE:
662 default:
663 fprintf(stderr, "%s %u %u %u\n", ptr->hostname, ptr->port, ptr->io_bytes_sent, ptr->server_failure_counter);
664 fprintf(stderr, "%s:%d (%s)(%s)\n", __FILE__, __LINE__,__func__, strerror(errno));fflush(stdout);
665 memcached_quit_server(ptr, true);
666 *error= MEMCACHED_ERRNO;
667 assert(ptr->fd == -1);
668 return -1;
669 }
670 }
671
672 if (ptr->type == MEMCACHED_CONNECTION_UDP &&
673 (size_t)sent_length != write_length)
674 {
675 memcached_quit_server(ptr, true);
676 fprintf(stderr, "%s:%d (%s)\n", __FILE__, __LINE__,__func__);fflush(stdout);
677 return -1;
678 }
679
680 ptr->io_bytes_sent += (uint32_t) sent_length;
681
682 local_write_ptr+= sent_length;
683 write_length-= (uint32_t) sent_length;
684 return_length+= (uint32_t) sent_length;
685 }
686
687 WATCHPOINT_ASSERT(write_length == 0);
688 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
689 // ptr->write_buffer_offset);
690
691 // if we are a udp server, the begining of the buffer is reserverd for
692 // the upd frame header
693 if (ptr->type == MEMCACHED_CONNECTION_UDP)
694 ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
695 else
696 ptr->write_buffer_offset= 0;
697
698 return (ssize_t) return_length;
699 }
700
701 /*
702 Eventually we will just kill off the server with the problem.
703 */
704 void memcached_io_reset(memcached_server_write_instance_st ptr)
705 {
706 memcached_quit_server(ptr, true);
707 }
708
709 /**
710 * Read a given number of bytes from the server and place it into a specific
711 * buffer. Reset the IO channel on this server if an error occurs.
712 */
713 memcached_return_t memcached_safe_read(memcached_server_write_instance_st ptr,
714 void *dta,
715 size_t size)
716 {
717 size_t offset= 0;
718 char *data= dta;
719
720 while (offset < size)
721 {
722 ssize_t nread;
723 memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset,
724 &nread);
725 if (rc != MEMCACHED_SUCCESS)
726 return rc;
727
728 offset+= (size_t) nread;
729 }
730
731 return MEMCACHED_SUCCESS;
732 }
733
734 memcached_return_t memcached_io_readline(memcached_server_write_instance_st ptr,
735 char *buffer_ptr,
736 size_t size)
737 {
738 bool line_complete= false;
739 size_t total_nr= 0;
740
741 while (!line_complete)
742 {
743 if (ptr->read_buffer_length == 0)
744 {
745 /*
746 * We don't have any data in the buffer, so let's fill the read
747 * buffer. Call the standard read function to avoid duplicating
748 * the logic.
749 */
750 ssize_t nread;
751 memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
752 if (rc != MEMCACHED_SUCCESS)
753 return rc;
754
755 if (*buffer_ptr == '\n')
756 line_complete= true;
757
758 ++buffer_ptr;
759 ++total_nr;
760 }
761
762 /* Now let's look in the buffer and copy as we go! */
763 while (ptr->read_buffer_length && total_nr < size && !line_complete)
764 {
765 *buffer_ptr = *ptr->read_ptr;
766 if (*buffer_ptr == '\n')
767 line_complete = true;
768 --ptr->read_buffer_length;
769 ++ptr->read_ptr;
770 ++total_nr;
771 ++buffer_ptr;
772 }
773
774 if (total_nr == size)
775 return MEMCACHED_PROTOCOL_ERROR;
776 }
777
778 return MEMCACHED_SUCCESS;
779 }
780
781 /*
782 * The udp request id consists of two seperate sections
783 * 1) The thread id
784 * 2) The message number
785 * The thread id should only be set when the memcached_st struct is created
786 * and should not be changed.
787 *
788 * The message num is incremented for each new message we send, this function
789 * extracts the message number from message_id, increments it and then
790 * writes the new value back into the header
791 */
792 static void increment_udp_message_id(memcached_server_write_instance_st ptr)
793 {
794 struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
795 uint16_t cur_req= get_udp_datagram_request_id(header);
796 int msg_num= get_msg_num_from_request_id(cur_req);
797 int thread_id= get_thread_id_from_request_id(cur_req);
798
799 if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
800 msg_num= 0;
801
802 header->request_id= htons((uint16_t) (thread_id | msg_num));
803 }
804
805 memcached_return_t memcached_io_init_udp_header(memcached_server_write_instance_st ptr, uint16_t thread_id)
806 {
807 if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
808 return MEMCACHED_FAILURE;
809
810 struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
811 header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id)));
812 header->num_datagrams= htons(1);
813 header->sequence_number= htons(0);
814
815 return MEMCACHED_SUCCESS;
816 }