// src/libmemcached/io.cc (m6w6/libmemcached, commit 27b5ecbd0f244064a743150e71bf4b925223cd72)

/*
    +--------------------------------------------------------------------+
    | libmemcached - C/C++ Client Library for memcached |
    +--------------------------------------------------------------------+
    | Redistribution and use in source and binary forms, with or without |
    | modification, are permitted under the terms of the BSD license. |
    | You should have received a copy of the license in a bundled file |
    | named LICENSE; in case you did not receive a copy you can review |
    | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
    +--------------------------------------------------------------------+
    | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
    | Copyright (c) 2020 Michael Wallner <mike@php.net> |
    +--------------------------------------------------------------------+
*/

#include "libmemcached/common.h"

#ifdef HAVE_SYS_SOCKET_H
# include <sys/socket.h>
#endif

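/**
 * Prepare a binary protocol request header for the given server: bump the
 * per-server request id, set the request magic, and store the id in the
 * opaque field so that responses can be matched to requests.
 */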
void initialize_binary_request(memcached_instance_st *server,
                               protocol_binary_request_header &header) {
  server->request_id++;
  header.request.magic = PROTOCOL_BINARY_REQ;
  header.request.opaque = htons(server->request_id);
}

enum memc_read_or_write { MEM_READ, MEM_WRITE };

/**
 * Try to fill the input buffer for a server with as much
 * data as possible.
 *
 * @param instance the server whose input buffer should be filled
 */
static bool repack_input_buffer(memcached_instance_st *instance) {
  if (instance->read_ptr != instance->read_buffer) {
    /* Move all of the data to the beginning of the buffer so
    ** that we can fit more data into the buffer...
    */
    memmove(instance->read_buffer, instance->read_ptr, instance->read_buffer_length);
    instance->read_ptr = instance->read_buffer;
  }

  /* There is room in the buffer, try to fill it! */
  if (instance->read_buffer_length != MEMCACHED_MAX_BUFFER) {
    do {
      /* Just try a single read to grab what's available */
      ssize_t nr;
      if ((nr = ::recv(instance->fd, instance->read_ptr + instance->read_buffer_length,
                       MEMCACHED_MAX_BUFFER - instance->read_buffer_length, MSG_NOSIGNAL))
          <= 0)
      {
        if (nr == 0) {
          memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
        } else {
          switch (get_socket_errno()) {
          case EINTR:
            continue;

#if EWOULDBLOCK != EAGAIN
          case EWOULDBLOCK:
#endif
          case EAGAIN:
#ifdef __linux
          case ERESTART:
#endif
            break; // No IO is fine, we can just move on

          default:
            memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT);
          }
        }

        break;
      } else // We read data, append to our read buffer
      {
        instance->read_buffer_length += size_t(nr);

        return true;
      }
    } while (false);
  }

  return false;
}

/**
 * If we have callbacks connected to this server structure we may start
 * processing the input queue and fire the callbacks for the incoming
 * messages. This function is _only_ called when the input buffer is full,
 * so we _know_ that we have at least _one_ message to process.
 *
 * @param instance the server to start processing input messages for
 * @return true if we processed anything, false otherwise
 */
static bool process_input_buffer(memcached_instance_st *instance) {
  /*
  ** We might be able to process some of the response messages if we
  ** have a callback set up
  */
  if (instance->root->callbacks) {
    /*
     * We might have responses... try to read them out and fire
     * callbacks
     */
    memcached_callback_st cb = *instance->root->callbacks;

    memcached_set_processing_input((Memcached *) instance->root, true);

    char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
    Memcached *root = (Memcached *) instance->root;
    memcached_return_t error = memcached_response(instance, buffer, sizeof(buffer), &root->result);

    memcached_set_processing_input(root, false);

    if (error == MEMCACHED_SUCCESS) {
      for (unsigned int x = 0; x < cb.number_of_callback; x++) {
        error = (*cb.callback[x])(instance->root, &root->result, cb.context);
        if (error != MEMCACHED_SUCCESS) {
          break;
        }
      }

      /* @todo what should I do with the error message??? */
    }
    /* @todo what should I do with other error messages?? */
    return true;
  }

  return false;
}

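/**
 * Wait (up to the configured poll timeout) for the requested poll() events
 * on the instance's socket, retrying on EINTR/ERESTART and mapping
 * timeouts, hang-ups, and socket errors to memcached_return_t codes.
 */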
static memcached_return_t io_wait(memcached_instance_st *instance, const short events) {
  /*
  ** We are about to block on write, but at least on Solaris a write may
  ** block if we haven't read anything from our input buffer..
  ** Try to purge the input buffer if we don't do any flow control in the
  ** application layer (just sending a lot of data etc)
  ** The test is moved down into the purge function to avoid duplicating it.
  */
  if (events & POLLOUT) {
    if (memcached_purge(instance) == false) {
      return MEMCACHED_FAILURE;
    }
  }

  struct pollfd fds;
  fds.fd = instance->fd;
  fds.events = events;
  fds.revents = 0;

  if (fds.events & POLLOUT) /* write */ {
    instance->io_wait_count.write++;
  } else {
    instance->io_wait_count.read++;
  }

  if (instance->root->poll_timeout
      == 0) // Mimic the behavior where a timeout of zero times out immediately (not all platforms do this)
  {
    return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
                               memcached_literal_param("poll_timeout() was set to zero"));
  }

  size_t loop_max = 5;
  while (--loop_max) // While loop is for ERESTART or EINTR
  {
    int active_fd = poll(&fds, 1, instance->root->poll_timeout);

    if (active_fd >= 1) {
      assert_msg(active_fd == 1, "poll() returned an unexpected number of active file descriptors");
      if (fds.revents & POLLIN or fds.revents & POLLOUT) {
        return MEMCACHED_SUCCESS;
      }

      if (fds.revents & POLLHUP) {
        return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
                                   memcached_literal_param("poll() detected hang up"));
      }

      if (fds.revents & POLLERR) {
        int local_errno = EINVAL;
        int err;
        socklen_t len = sizeof(err);
        if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == 0) {
          if (err == 0) // treat this as EINTR
          {
            continue;
          }
          local_errno = err;
        }
        memcached_quit_server(instance, true);
        return memcached_set_errno(*instance, local_errno, MEMCACHED_AT,
                                   memcached_literal_param("poll() returned POLLERR"));
      }

      return memcached_set_error(
          *instance, MEMCACHED_FAILURE, MEMCACHED_AT,
          memcached_literal_param("poll() returned a value that was not dealt with"));
    }

    if (active_fd == 0) {
      return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
                                 memcached_literal_param("No active_fd were found"));
    }

    // Only an error should result in this code being called.
    int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
    assert_msg(active_fd == -1, "poll() returned an unexpected value");
    switch (local_errno) {
#ifdef __linux
    case ERESTART:
#endif
    case EINTR:
      continue;

    case EFAULT:
    case ENOMEM:
      memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
      break;

    case EINVAL:
      memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
                          memcached_literal_param(
                              "RLIMIT_NOFILE exceeded, or on OSX the timeout value was invalid"));
      break;

    default:
      memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
    }

    break;
  }

  memcached_quit_server(instance, true);

  if (memcached_has_error(instance)) {
    return memcached_instance_error_return(instance);
  }

  return memcached_set_error(
      *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
      memcached_literal_param("number of attempts to call io_wait() failed"));
}

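/**
 * Flush the instance's write buffer to the socket. Returns true when the
 * whole buffer was sent and false on error or timeout; hard failures are
 * reported through the `error` out parameter and shut the connection down.
 */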
static bool io_flush(memcached_instance_st *instance, const bool with_flush,
                     memcached_return_t &error) {
  /*
  ** We might want to purge the input buffer if we haven't consumed
  ** any output yet... The test for the purge limits is inline
  ** in the purge function to avoid duplicating the logic..
  */
  {
    WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);

    if (memcached_purge(instance) == false) {
      return false;
    }
  }
  char *local_write_ptr = instance->write_buffer;
  size_t write_length = instance->write_buffer_offset;

  error = MEMCACHED_SUCCESS;

  WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);

  /* Looking for memory overflows */
#if defined(DEBUG)
  if (write_length == MEMCACHED_MAX_BUFFER)
    WATCHPOINT_ASSERT(instance->write_buffer == local_write_ptr);
  WATCHPOINT_ASSERT((instance->write_buffer + MEMCACHED_MAX_BUFFER)
                    >= (local_write_ptr + write_length));
#endif

  while (write_length) {
    WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
    WATCHPOINT_ASSERT(write_length > 0);

    int flags;
    if (with_flush) {
      flags = MSG_NOSIGNAL;
    } else {
      flags = MSG_NOSIGNAL | MSG_MORE;
    }

    ssize_t sent_length = ::send(instance->fd, local_write_ptr, write_length, flags);
    int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno

    if (sent_length == SOCKET_ERROR) {
#if 0 // @todo I should look at why we hit this bit of code so frequently
      WATCHPOINT_ERRNO(get_socket_errno());
      WATCHPOINT_NUMBER(get_socket_errno());
#endif
      switch (get_socket_errno()) {
      case ENOBUFS:
        continue;

#if EWOULDBLOCK != EAGAIN
      case EWOULDBLOCK:
#endif
      case EAGAIN: {
        /*
         * We may be blocked on write because the input buffer
         * is full. Let's check if we have room in our input
         * buffer for more data and retry the write before
         * waiting..
         */
        if (repack_input_buffer(instance) or process_input_buffer(instance)) {
          continue;
        }

        memcached_return_t rc = io_wait(instance, POLLOUT);
        if (memcached_success(rc)) {
          continue;
        } else if (rc == MEMCACHED_TIMEOUT) {
          return false;
        }

        memcached_quit_server(instance, true);
        error = memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
        return false;
      }
      case ENOTCONN:
      case EPIPE:
      default:
        memcached_quit_server(instance, true);
        error = memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
        WATCHPOINT_ASSERT(instance->fd == INVALID_SOCKET);
        return false;
      }
    }

    instance->io_bytes_sent += uint32_t(sent_length);

    local_write_ptr += sent_length;
    write_length -= uint32_t(sent_length);
  }

  WATCHPOINT_ASSERT(write_length == 0);
  instance->write_buffer_offset = 0;

  return true;
}

memcached_return_t memcached_io_wait_for_write(memcached_instance_st *instance) {
  return io_wait(instance, POLLOUT);
}

memcached_return_t memcached_io_wait_for_read(memcached_instance_st *instance) {
  return io_wait(instance, POLLIN);
}

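/**
 * Refill the instance's read buffer with a single recv() call, waiting for
 * readability and retrying on transient errors. A read of zero bytes is
 * treated as a lost connection.
 */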
static memcached_return_t _io_fill(memcached_instance_st *instance) {
  ssize_t data_read;
  do {
    data_read = ::recv(instance->fd, instance->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL);
    int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno

    if (data_read == SOCKET_ERROR) {
      switch (get_socket_errno()) {
      case EINTR: // We just retry
        continue;

      case ETIMEDOUT: // OSX
#if EWOULDBLOCK != EAGAIN
      case EWOULDBLOCK:
#endif
      case EAGAIN:
#ifdef __linux
      case ERESTART:
#endif
      {
        memcached_return_t io_wait_ret;
        if (memcached_success(io_wait_ret = io_wait(instance, POLLIN))) {
          continue;
        }

        return io_wait_ret;
      }

      case ENOTCONN: // Programmer Error
        WATCHPOINT_ASSERT(0);
        // fall through
      case ENOTSOCK:
        WATCHPOINT_ASSERT(0);
        // fall through
      case EBADF:
        assert_msg(instance->fd != INVALID_SOCKET, "Programmer error, invalid socket");
        /* fall through */
      case EINVAL:
      case EFAULT:
      case ECONNREFUSED:
      default:
        memcached_quit_server(instance, true);
        memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
        break;
      }

      return memcached_instance_error_return(instance);
    } else if (data_read == 0) {
      /*
        EOF. Any data received so far is incomplete, so discard it. For TCP
        this path reads byte by byte and protocol enforcement happens in
        memcached_response() looking for '\n'; UDP, which requests 8 bytes
        at once, is not a concern here. In general a return of zero means
        the connection went away: blocking I/O does not return 0 for an
        open connection, and non-blocking I/O returns EAGAIN when no data
        is immediately available.
      */
      memcached_quit_server(instance, true);
      return memcached_set_error(
          *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
          memcached_literal_param("::recv() returned zero, server has disconnected"));
    }
    instance->io_wait_count._bytes_read += data_read;
  } while (data_read <= 0);

  instance->io_bytes_sent = 0;
  instance->read_buffer_length = (size_t) data_read;
  instance->read_ptr = instance->read_buffer;

  return MEMCACHED_SUCCESS;
}

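/**
 * Read `length` bytes into `buffer`, refilling the instance's read buffer
 * from the socket as needed. The number of bytes copied is returned
 * through `nread`.
 */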
memcached_return_t memcached_io_read(memcached_instance_st *instance, void *buffer, size_t length,
                                     ssize_t &nread) {
  assert(memcached_is_udp(instance->root) == false);
  assert_msg(
      instance,
      "Programmer error, memcached_io_read() received an invalid Instance"); // Programmer error
  char *buffer_ptr = static_cast<char *>(buffer);

  if (instance->fd == INVALID_SOCKET) {
#if 0
    assert_msg(int(instance->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state");
#endif
    return MEMCACHED_CONNECTION_FAILURE;
  }

  while (length) {
    if (instance->read_buffer_length == 0) {
      memcached_return_t io_fill_ret;
      if (memcached_fatal(io_fill_ret = _io_fill(instance))) {
        nread = -1;
        return io_fill_ret;
      }
    }

    if (length > 1) {
      size_t difference =
          (length > instance->read_buffer_length) ? instance->read_buffer_length : length;

      memcpy(buffer_ptr, instance->read_ptr, difference);
      length -= difference;
      instance->read_ptr += difference;
      instance->read_buffer_length -= difference;
      buffer_ptr += difference;
    } else {
      *buffer_ptr = *instance->read_ptr;
      instance->read_ptr++;
      instance->read_buffer_length--;
      buffer_ptr++;
      break;
    }
  }

  nread = ssize_t(buffer_ptr - (char *) buffer);

  return MEMCACHED_SUCCESS;
}

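/**
 * Read and discard whatever is pending on the instance's socket until the
 * peer stops sending or an error occurs.
 */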
memcached_return_t memcached_io_slurp(memcached_instance_st *instance) {
  assert_msg(instance, "Programmer error, invalid Instance");
  assert(memcached_is_udp(instance->root) == false);

  if (instance->fd == INVALID_SOCKET) {
    assert_msg(int(instance->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO),
               "Invalid socket state");
    return MEMCACHED_CONNECTION_FAILURE;
  }

  ssize_t data_read;
  char buffer[MEMCACHED_MAX_BUFFER];
  do {
    data_read = ::recv(instance->fd, instance->read_buffer, sizeof(buffer), MSG_NOSIGNAL);
    if (data_read == SOCKET_ERROR) {
      switch (get_socket_errno()) {
      case EINTR: // We just retry
        continue;

      case ETIMEDOUT: // OSX
#if EWOULDBLOCK != EAGAIN
      case EWOULDBLOCK:
#endif
      case EAGAIN:
#ifdef __linux
      case ERESTART:
#endif
        if (memcached_success(io_wait(instance, POLLIN))) {
          continue;
        }
        return MEMCACHED_IN_PROGRESS;

      case ENOTCONN: // Programmer Error
      case ENOTSOCK:
        assert(0);
        /* fall through */
      case EBADF:
        assert_msg(instance->fd != INVALID_SOCKET, "Invalid socket state");
        /* fall through */
      case EINVAL:
      case EFAULT:
      case ECONNREFUSED:
      default:
        return MEMCACHED_CONNECTION_FAILURE; // We want this!
      }
    }
  } while (data_read > 0);

  return MEMCACHED_CONNECTION_FAILURE;
}

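/**
 * Append `length` bytes to the instance's write buffer, flushing to the
 * socket whenever the buffer fills up and, when `with_flush` is set, once
 * more at the end. The number of bytes accepted is returned through
 * `written`.
 */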
static bool _io_write(memcached_instance_st *instance, const void *buffer, size_t length,
                      bool with_flush, size_t &written) {
  assert(instance->fd != INVALID_SOCKET);
  assert(memcached_is_udp(instance->root) == false);

  const char *buffer_ptr = static_cast<const char *>(buffer);

  const size_t original_length = length;

  while (length) {
    char *write_ptr;
    size_t buffer_end = MEMCACHED_MAX_BUFFER;
    size_t should_write = buffer_end - instance->write_buffer_offset;
    should_write = (should_write < length) ? should_write : length;

    write_ptr = instance->write_buffer + instance->write_buffer_offset;
    memcpy(write_ptr, buffer_ptr, should_write);
    instance->write_buffer_offset += should_write;
    buffer_ptr += should_write;
    length -= should_write;

    if (instance->write_buffer_offset == buffer_end) {
      WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);

      memcached_return_t rc;
      if (io_flush(instance, with_flush, rc) == false) {
        written = original_length - length;
        return false;
      }
    }
  }

  if (with_flush) {
    memcached_return_t rc;
    WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
    if (io_flush(instance, with_flush, rc) == false) {
      written = original_length - length;
      return false;
    }
  }

  written = original_length - length;

  return true;
}

bool memcached_io_write(memcached_instance_st *instance) {
  size_t written;
  return _io_write(instance, NULL, 0, true, written);
}

ssize_t memcached_io_write(memcached_instance_st *instance, const void *buffer, const size_t length,
                           const bool with_flush) {
  size_t written;

  if (_io_write(instance, buffer, length, with_flush, written) == false) {
    return -1;
  }

  return ssize_t(written);
}

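/**
 * Write an array of buffers to the instance, optionally flushing at the
 * end. Returns true only if every byte of every vector was accepted.
 */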
bool memcached_io_writev(memcached_instance_st *instance, libmemcached_io_vector_st vector[],
                         const size_t number_of, const bool with_flush) {
  ssize_t complete_total = 0;
  ssize_t total = 0;

  for (size_t x = 0; x < number_of; x++, vector++) {
    complete_total += vector->length;
    if (vector->length) {
      size_t written;
      if ((_io_write(instance, vector->buffer, vector->length, false, written)) == false) {
        return false;
      }
      total += written;
    }
  }

  if (with_flush) {
    if (memcached_io_write(instance) == false) {
      return false;
    }
  }

  return (complete_total == total);
}

void memcached_instance_st::start_close_socket() {
  if (fd != INVALID_SOCKET) {
    shutdown(fd, SHUT_WR);
    options.is_shutting_down = true;
  }
}

void memcached_instance_st::reset_socket() {
  if (fd != INVALID_SOCKET) {
    (void) closesocket(fd);
    fd = INVALID_SOCKET;
  }
}

void memcached_instance_st::close_socket() {
  if (fd != INVALID_SOCKET) {
    int shutdown_options = SHUT_RD;
    if (options.is_shutting_down == false) {
      shutdown_options = SHUT_RDWR;
    }

    /* in case of death shutdown to avoid blocking at close() */
    if (shutdown(fd, shutdown_options) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) {
      WATCHPOINT_NUMBER(fd);
      WATCHPOINT_ERRNO(get_socket_errno());
      WATCHPOINT_ASSERT(get_socket_errno());
    }

    reset_socket();
    state = MEMCACHED_SERVER_STATE_NEW;
  }

  state = MEMCACHED_SERVER_STATE_NEW;
  cursor_active_ = 0;
  io_bytes_sent = 0;
  write_buffer_offset = size_t(root and memcached_is_udp(root) ? UDP_DATAGRAM_HEADER_LENGTH : 0);
  read_buffer_length = 0;
  read_ptr = read_buffer;
  options.is_shutting_down = false;
  memcached_server_response_reset(this);

  // We reset the version so that if we end up talking to a different server
  // we don't have stale server version information.
  major_version = minor_version = micro_version = UINT8_MAX;
}

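/**
 * Find a server instance with data ready to be read: either one with
 * buffered data, or (when several servers have outstanding responses) one
 * whose socket poll() reports as readable.
 */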
memcached_instance_st *memcached_io_get_readable_server(Memcached *memc, memcached_return_t &) {
#define MAX_SERVERS_TO_POLL 100
  struct pollfd fds[MAX_SERVERS_TO_POLL];
  nfds_t host_index = 0;

  for (uint32_t x = 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) {
    memcached_instance_st *instance = memcached_instance_fetch(memc, x);

    if (instance->read_buffer_length > 0) /* I have data in the buffer */ {
      return instance;
    }

    if (instance->response_count() > 0) {
      fds[host_index].events = POLLIN;
      fds[host_index].revents = 0;
      fds[host_index].fd = instance->fd;
      ++host_index;
    }
  }

  if (host_index < 2) {
    /* We have 0 or 1 server with pending events.. */
    for (uint32_t x = 0; x < memcached_server_count(memc); ++x) {
      memcached_instance_st *instance = memcached_instance_fetch(memc, x);

      if (instance->response_count() > 0) {
        return instance;
      }
    }

    return NULL;
  }

  int error = poll(fds, host_index, memc->poll_timeout);
  switch (error) {
  case -1:
    memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
    /* FALLTHROUGH */
  case 0:
    break;

  default:
    for (nfds_t x = 0; x < host_index; ++x) {
      if (fds[x].revents & POLLIN) {
        for (uint32_t y = 0; y < memcached_server_count(memc); ++y) {
          memcached_instance_st *instance = memcached_instance_fetch(memc, y);

          if (instance->fd == fds[x].fd) {
            return instance;
          }
        }
      }
    }
  }

  return NULL;
}

/*
  Eventually we will just kill off the server with the problem.
*/
void memcached_io_reset(memcached_instance_st *instance) {
  memcached_quit_server(instance, true);
}

/**
 * Read a given number of bytes from the server and place them into a
 * specific buffer. Reset the IO channel on this server if an error occurs.
 */
memcached_return_t memcached_safe_read(memcached_instance_st *instance, void *dta,
                                       const size_t size) {
  size_t offset = 0;
  char *data = static_cast<char *>(dta);

  while (offset < size) {
    ssize_t nread;
    memcached_return_t rc;

    while (
        memcached_continue(rc = memcached_io_read(instance, data + offset, size - offset, nread))) {
    }

    if (memcached_failed(rc)) {
      return rc;
    }

    offset += size_t(nread);
  }

  return MEMCACHED_SUCCESS;
}

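/**
 * Read a single '\n'-terminated line from the instance into `buffer_ptr`,
 * storing the number of bytes copied in `total_nr`. Returns
 * MEMCACHED_PROTOCOL_ERROR if the line does not fit into `size` bytes.
 */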
memcached_return_t memcached_io_readline(memcached_instance_st *instance, char *buffer_ptr,
                                         size_t size, size_t &total_nr) {
  total_nr = 0;
  bool line_complete = false;

  while (line_complete == false) {
    if (instance->read_buffer_length == 0) {
      /*
       * We don't have any data in the buffer, so let's fill the read
       * buffer. Call the standard read function to avoid duplicating
       * the logic.
       */
      ssize_t nread;
      memcached_return_t rc = memcached_io_read(instance, buffer_ptr, 1, nread);
      if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS) {
        memcached_quit_server(instance, true);
        return memcached_set_error(*instance, rc, MEMCACHED_AT);
      } else if (memcached_failed(rc)) {
        return rc;
      }

      if (*buffer_ptr == '\n') {
        line_complete = true;
      }

      ++buffer_ptr;
      ++total_nr;
    }

    /* Now let's look in the buffer and copy as we go! */
    while (instance->read_buffer_length and total_nr < size and line_complete == false) {
      *buffer_ptr = *instance->read_ptr;
      if (*buffer_ptr == '\n') {
        line_complete = true;
      }
      --instance->read_buffer_length;
      ++instance->read_ptr;
      ++total_nr;
      ++buffer_ptr;
    }

    if (total_nr == size) {
      return MEMCACHED_PROTOCOL_ERROR;
    }
  }

  return MEMCACHED_SUCCESS;
}