src/libmemcached: apply clang-format
[awesomized/libmemcached] / src / libmemcached / io.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17
18 #ifdef HAVE_SYS_SOCKET_H
19 # include <sys/socket.h>
20 #endif
21
22 void initialize_binary_request(memcached_instance_st *server,
23 protocol_binary_request_header &header) {
24 server->request_id++;
25 header.request.magic = PROTOCOL_BINARY_REQ;
26 header.request.opaque = htons(server->request_id);
27 }
28
/* Direction of an I/O operation. */
enum memc_read_or_write {
  MEM_READ,
  MEM_WRITE
};
30
31 /**
32 * Try to fill the input buffer for a server with as much
33 * data as possible.
34 *
35 * @param instance the server to pack
36 */
37 static bool repack_input_buffer(memcached_instance_st *instance) {
38 if (instance->read_ptr != instance->read_buffer) {
39 /* Move all of the data to the beginning of the buffer so
40 ** that we can fit more data into the buffer...
41 */
42 memmove(instance->read_buffer, instance->read_ptr, instance->read_buffer_length);
43 instance->read_ptr = instance->read_buffer;
44 }
45
46 /* There is room in the buffer, try to fill it! */
47 if (instance->read_buffer_length != MEMCACHED_MAX_BUFFER) {
48 do {
49 /* Just try a single read to grab what's available */
50 ssize_t nr;
51 if ((nr = ::recv(instance->fd, instance->read_ptr + instance->read_buffer_length,
52 MEMCACHED_MAX_BUFFER - instance->read_buffer_length, MSG_NOSIGNAL))
53 <= 0)
54 {
55 if (nr == 0) {
56 memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
57 } else {
58 switch (get_socket_errno()) {
59 case EINTR: continue;
60
61 #if EWOULDBLOCK != EAGAIN
62 case EWOULDBLOCK:
63 #endif
64 case EAGAIN:
65 #ifdef __linux
66 case ERESTART:
67 #endif
68 break; // No IO is fine, we can just move on
69
70 default: memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT);
71 }
72 }
73
74 break;
75 } else // We read data, append to our read buffer
76 {
77 instance->read_buffer_length += size_t(nr);
78
79 return true;
80 }
81 } while (false);
82 }
83
84 return false;
85 }
86
87 /**
88 * If the we have callbacks connected to this server structure
89 * we may start process the input queue and fire the callbacks
90 * for the incomming messages. This function is _only_ called
91 * when the input buffer is full, so that we _know_ that we have
92 * at least _one_ message to process.
93 *
94 * @param instance the server to star processing iput messages for
95 * @return true if we processed anything, false otherwise
96 */
97 static bool process_input_buffer(memcached_instance_st *instance) {
98 /*
99 ** We might be able to process some of the response messages if we
100 ** have a callback set up
101 */
102 if (instance->root->callbacks != NULL) {
103 /*
104 * We might have responses... try to read them out and fire
105 * callbacks
106 */
107 memcached_callback_st cb = *instance->root->callbacks;
108
109 memcached_set_processing_input((Memcached *) instance->root, true);
110
111 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
112 Memcached *root = (Memcached *) instance->root;
113 memcached_return_t error = memcached_response(instance, buffer, sizeof(buffer), &root->result);
114
115 memcached_set_processing_input(root, false);
116
117 if (error == MEMCACHED_SUCCESS) {
118 for (unsigned int x = 0; x < cb.number_of_callback; x++) {
119 error = (*cb.callback[x])(instance->root, &root->result, cb.context);
120 if (error != MEMCACHED_SUCCESS) {
121 break;
122 }
123 }
124
125 /* @todo what should I do with the error message??? */
126 }
127 /* @todo what should I do with other error messages?? */
128 return true;
129 }
130
131 return false;
132 }
133
134 static memcached_return_t io_wait(memcached_instance_st *instance, const short events) {
135 /*
136 ** We are going to block on write, but at least on Solaris we might block
137 ** on write if we haven't read anything from our input buffer..
138 ** Try to purge the input buffer if we don't do any flow control in the
139 ** application layer (just sending a lot of data etc)
140 ** The test is moved down in the purge function to avoid duplication of
141 ** the test.
142 */
143 if (events & POLLOUT) {
144 if (memcached_purge(instance) == false) {
145 return MEMCACHED_FAILURE;
146 }
147 }
148
149 struct pollfd fds;
150 fds.fd = instance->fd;
151 fds.events = events;
152 fds.revents = 0;
153
154 if (fds.events & POLLOUT) /* write */ {
155 instance->io_wait_count.write++;
156 } else {
157 instance->io_wait_count.read++;
158 }
159
160 if (instance->root->poll_timeout
161 == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
162 {
163 return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
164 memcached_literal_param("poll_timeout() was set to zero"));
165 }
166
167 size_t loop_max = 5;
168 while (--loop_max) // While loop is for ERESTART or EINTR
169 {
170 int active_fd = poll(&fds, 1, instance->root->poll_timeout);
171
172 if (active_fd >= 1) {
173 assert_msg(active_fd == 1, "poll() returned an unexpected number of active file descriptors");
174 if (fds.revents & POLLIN or fds.revents & POLLOUT) {
175 return MEMCACHED_SUCCESS;
176 }
177
178 if (fds.revents & POLLHUP) {
179 return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
180 memcached_literal_param("poll() detected hang up"));
181 }
182
183 if (fds.revents & POLLERR) {
184 int local_errno = EINVAL;
185 int err;
186 socklen_t len = sizeof(err);
187 if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == 0) {
188 if (err == 0) // treat this as EINTR
189 {
190 continue;
191 }
192 local_errno = err;
193 }
194 memcached_quit_server(instance, true);
195 return memcached_set_errno(*instance, local_errno, MEMCACHED_AT,
196 memcached_literal_param("poll() returned POLLHUP"));
197 }
198
199 return memcached_set_error(
200 *instance, MEMCACHED_FAILURE, MEMCACHED_AT,
201 memcached_literal_param("poll() returned a value that was not dealt with"));
202 }
203
204 if (active_fd == 0) {
205 return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
206 memcached_literal_param("No active_fd were found"));
207 }
208
209 // Only an error should result in this code being called.
210 int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
211 assert_msg(active_fd == -1, "poll() returned an unexpected value");
212 switch (local_errno) {
213 #ifdef __linux
214 case ERESTART:
215 #endif
216 case EINTR: continue;
217
218 case EFAULT:
219 case ENOMEM:
220 memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
221 break;
222
223 case EINVAL:
224 memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
225 memcached_literal_param(
226 "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
227 break;
228
229 default:
230 memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
231 }
232
233 break;
234 }
235
236 memcached_quit_server(instance, true);
237
238 if (memcached_has_error(instance)) {
239 return memcached_instance_error_return(instance);
240 }
241
242 return memcached_set_error(
243 *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
244 memcached_literal_param("number of attempts to call io_wait() failed"));
245 }
246
247 static bool io_flush(memcached_instance_st *instance, const bool with_flush,
248 memcached_return_t &error) {
249 /*
250 ** We might want to purge the input buffer if we haven't consumed
251 ** any output yet... The test for the limits is the purge is inline
252 ** in the purge function to avoid duplicating the logic..
253 */
254 {
255 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
256
257 if (memcached_purge(instance) == false) {
258 return false;
259 }
260 }
261 char *local_write_ptr = instance->write_buffer;
262 size_t write_length = instance->write_buffer_offset;
263
264 error = MEMCACHED_SUCCESS;
265
266 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
267
268 /* Looking for memory overflows */
269 #if defined(DEBUG)
270 if (write_length == MEMCACHED_MAX_BUFFER)
271 WATCHPOINT_ASSERT(instance->write_buffer == local_write_ptr);
272 WATCHPOINT_ASSERT((instance->write_buffer + MEMCACHED_MAX_BUFFER)
273 >= (local_write_ptr + write_length));
274 #endif
275
276 while (write_length) {
277 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
278 WATCHPOINT_ASSERT(write_length > 0);
279
280 int flags;
281 if (with_flush) {
282 flags = MSG_NOSIGNAL;
283 } else {
284 flags = MSG_NOSIGNAL | MSG_MORE;
285 }
286
287 ssize_t sent_length = ::send(instance->fd, local_write_ptr, write_length, flags);
288 int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
289
290 if (sent_length == SOCKET_ERROR) {
291 #if 0 // @todo I should look at why we hit this bit of code hard frequently
292 WATCHPOINT_ERRNO(get_socket_errno());
293 WATCHPOINT_NUMBER(get_socket_errno());
294 #endif
295 switch (get_socket_errno()) {
296 case ENOBUFS: continue;
297
298 #if EWOULDBLOCK != EAGAIN
299 case EWOULDBLOCK:
300 #endif
301 case EAGAIN: {
302 /*
303 * We may be blocked on write because the input buffer
304 * is full. Let's check if we have room in our input
305 * buffer for more data and retry the write before
306 * waiting..
307 */
308 if (repack_input_buffer(instance) or process_input_buffer(instance)) {
309 continue;
310 }
311
312 memcached_return_t rc = io_wait(instance, POLLOUT);
313 if (memcached_success(rc)) {
314 continue;
315 } else if (rc == MEMCACHED_TIMEOUT) {
316 return false;
317 }
318
319 memcached_quit_server(instance, true);
320 error = memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
321 return false;
322 }
323 case ENOTCONN:
324 case EPIPE:
325 default:
326 memcached_quit_server(instance, true);
327 error = memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
328 WATCHPOINT_ASSERT(instance->fd == INVALID_SOCKET);
329 return false;
330 }
331 }
332
333 instance->io_bytes_sent += uint32_t(sent_length);
334
335 local_write_ptr += sent_length;
336 write_length -= uint32_t(sent_length);
337 }
338
339 WATCHPOINT_ASSERT(write_length == 0);
340 instance->write_buffer_offset = 0;
341
342 return true;
343 }
344
345 memcached_return_t memcached_io_wait_for_write(memcached_instance_st *instance) {
346 return io_wait(instance, POLLOUT);
347 }
348
349 memcached_return_t memcached_io_wait_for_read(memcached_instance_st *instance) {
350 return io_wait(instance, POLLIN);
351 }
352
353 static memcached_return_t _io_fill(memcached_instance_st *instance) {
354 ssize_t data_read;
355 do {
356 data_read = ::recv(instance->fd, instance->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL);
357 int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
358
359 if (data_read == SOCKET_ERROR) {
360 switch (get_socket_errno()) {
361 case EINTR: // We just retry
362 continue;
363
364 case ETIMEDOUT: // OSX
365 #if EWOULDBLOCK != EAGAIN
366 case EWOULDBLOCK:
367 #endif
368 case EAGAIN:
369 #ifdef __linux
370 case ERESTART:
371 #endif
372 {
373 memcached_return_t io_wait_ret;
374 if (memcached_success(io_wait_ret = io_wait(instance, POLLIN))) {
375 continue;
376 }
377
378 return io_wait_ret;
379 }
380
381 /* fall through */
382
383 case ENOTCONN: // Programmer Error
384 WATCHPOINT_ASSERT(0);
385 // fall through
386 case ENOTSOCK:
387 WATCHPOINT_ASSERT(0);
388 // fall through
389 case EBADF:
390 assert_msg(instance->fd != INVALID_SOCKET, "Programmer error, invalid socket");
391 /* fall through */
392 case EINVAL:
393 case EFAULT:
394 case ECONNREFUSED:
395 default:
396 memcached_quit_server(instance, true);
397 memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
398 break;
399 }
400
401 return memcached_instance_error_return(instance);
402 } else if (data_read == 0) {
403 /*
404 EOF. Any data received so far is incomplete
405 so discard it. This always reads by byte in case of TCP
406 and protocol enforcement happens at memcached_response()
407 looking for '\n'. We do not care for UDB which requests 8 bytes
408 at once. Generally, this means that connection went away. Since
409 for blocking I/O we do not return 0 and for non-blocking case
410 it will return EGAIN if data is not immediatly available.
411 */
412 memcached_quit_server(instance, true);
413 return memcached_set_error(
414 *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
415 memcached_literal_param("::rec() returned zero, server has disconnected"));
416 }
417 instance->io_wait_count._bytes_read += data_read;
418 } while (data_read <= 0);
419
420 instance->io_bytes_sent = 0;
421 instance->read_buffer_length = (size_t) data_read;
422 instance->read_ptr = instance->read_buffer;
423
424 return MEMCACHED_SUCCESS;
425 }
426
427 memcached_return_t memcached_io_read(memcached_instance_st *instance, void *buffer, size_t length,
428 ssize_t &nread) {
429 assert(memcached_is_udp(instance->root) == false);
430 assert_msg(
431 instance,
432 "Programmer error, memcached_io_read() recieved an invalid Instance"); // Programmer error
433 char *buffer_ptr = static_cast<char *>(buffer);
434
435 if (instance->fd == INVALID_SOCKET) {
436 #if 0
437 assert_msg(int(instance->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state");
438 #endif
439 return MEMCACHED_CONNECTION_FAILURE;
440 }
441
442 while (length) {
443 if (instance->read_buffer_length == 0) {
444 memcached_return_t io_fill_ret;
445 if (memcached_fatal(io_fill_ret = _io_fill(instance))) {
446 nread = -1;
447 return io_fill_ret;
448 }
449 }
450
451 if (length > 1) {
452 size_t difference =
453 (length > instance->read_buffer_length) ? instance->read_buffer_length : length;
454
455 memcpy(buffer_ptr, instance->read_ptr, difference);
456 length -= difference;
457 instance->read_ptr += difference;
458 instance->read_buffer_length -= difference;
459 buffer_ptr += difference;
460 } else {
461 *buffer_ptr = *instance->read_ptr;
462 instance->read_ptr++;
463 instance->read_buffer_length--;
464 buffer_ptr++;
465 break;
466 }
467 }
468
469 nread = ssize_t(buffer_ptr - (char *) buffer);
470
471 return MEMCACHED_SUCCESS;
472 }
473
474 memcached_return_t memcached_io_slurp(memcached_instance_st *instance) {
475 assert_msg(instance, "Programmer error, invalid Instance");
476 assert(memcached_is_udp(instance->root) == false);
477
478 if (instance->fd == INVALID_SOCKET) {
479 assert_msg(int(instance->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO),
480 "Invalid socket state");
481 return MEMCACHED_CONNECTION_FAILURE;
482 }
483
484 ssize_t data_read;
485 char buffer[MEMCACHED_MAX_BUFFER];
486 do {
487 data_read = ::recv(instance->fd, instance->read_buffer, sizeof(buffer), MSG_NOSIGNAL);
488 if (data_read == SOCKET_ERROR) {
489 switch (get_socket_errno()) {
490 case EINTR: // We just retry
491 continue;
492
493 case ETIMEDOUT: // OSX
494 #if EWOULDBLOCK != EAGAIN
495 case EWOULDBLOCK:
496 #endif
497 case EAGAIN:
498 #ifdef __linux
499 case ERESTART:
500 #endif
501 if (memcached_success(io_wait(instance, POLLIN))) {
502 continue;
503 }
504 return MEMCACHED_IN_PROGRESS;
505
506 /* fall through */
507
508 case ENOTCONN: // Programmer Error
509 case ENOTSOCK:
510 assert(0);
511 /* fall through */
512 case EBADF:
513 assert_msg(instance->fd != INVALID_SOCKET, "Invalid socket state");
514 /* fall through */
515 case EINVAL:
516 case EFAULT:
517 case ECONNREFUSED:
518 default: return MEMCACHED_CONNECTION_FAILURE; // We want this!
519 }
520 }
521 } while (data_read > 0);
522
523 return MEMCACHED_CONNECTION_FAILURE;
524 }
525
526 static bool _io_write(memcached_instance_st *instance, const void *buffer, size_t length,
527 bool with_flush, size_t &written) {
528 assert(instance->fd != INVALID_SOCKET);
529 assert(memcached_is_udp(instance->root) == false);
530
531 const char *buffer_ptr = static_cast<const char *>(buffer);
532
533 const size_t original_length = length;
534
535 while (length) {
536 char *write_ptr;
537 size_t buffer_end = MEMCACHED_MAX_BUFFER;
538 size_t should_write = buffer_end - instance->write_buffer_offset;
539 should_write = (should_write < length) ? should_write : length;
540
541 write_ptr = instance->write_buffer + instance->write_buffer_offset;
542 memcpy(write_ptr, buffer_ptr, should_write);
543 instance->write_buffer_offset += should_write;
544 buffer_ptr += should_write;
545 length -= should_write;
546
547 if (instance->write_buffer_offset == buffer_end) {
548 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
549
550 memcached_return_t rc;
551 if (io_flush(instance, with_flush, rc) == false) {
552 written = original_length - length;
553 return false;
554 }
555 }
556 }
557
558 if (with_flush) {
559 memcached_return_t rc;
560 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
561 if (io_flush(instance, with_flush, rc) == false) {
562 written = original_length - length;
563 return false;
564 }
565 }
566
567 written = original_length - length;
568
569 return true;
570 }
571
572 bool memcached_io_write(memcached_instance_st *instance) {
573 size_t written;
574 return _io_write(instance, NULL, 0, true, written);
575 }
576
577 ssize_t memcached_io_write(memcached_instance_st *instance, const void *buffer, const size_t length,
578 const bool with_flush) {
579 size_t written;
580
581 if (_io_write(instance, buffer, length, with_flush, written) == false) {
582 return -1;
583 }
584
585 return ssize_t(written);
586 }
587
588 bool memcached_io_writev(memcached_instance_st *instance, libmemcached_io_vector_st vector[],
589 const size_t number_of, const bool with_flush) {
590 ssize_t complete_total = 0;
591 ssize_t total = 0;
592
593 for (size_t x = 0; x < number_of; x++, vector++) {
594 complete_total += vector->length;
595 if (vector->length) {
596 size_t written;
597 if ((_io_write(instance, vector->buffer, vector->length, false, written)) == false) {
598 return false;
599 }
600 total += written;
601 }
602 }
603
604 if (with_flush) {
605 if (memcached_io_write(instance) == false) {
606 return false;
607 }
608 }
609
610 return (complete_total == total);
611 }
612
613 void memcached_instance_st::start_close_socket() {
614 if (fd != INVALID_SOCKET) {
615 shutdown(fd, SHUT_WR);
616 options.is_shutting_down = true;
617 }
618 }
619
620 void memcached_instance_st::reset_socket() {
621 if (fd != INVALID_SOCKET) {
622 (void) closesocket(fd);
623 fd = INVALID_SOCKET;
624 }
625 }
626
627 void memcached_instance_st::close_socket() {
628 if (fd != INVALID_SOCKET) {
629 int shutdown_options = SHUT_RD;
630 if (options.is_shutting_down == false) {
631 shutdown_options = SHUT_RDWR;
632 }
633
634 /* in case of death shutdown to avoid blocking at close() */
635 if (shutdown(fd, shutdown_options) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) {
636 WATCHPOINT_NUMBER(fd);
637 WATCHPOINT_ERRNO(get_socket_errno());
638 WATCHPOINT_ASSERT(get_socket_errno());
639 }
640
641 reset_socket();
642 state = MEMCACHED_SERVER_STATE_NEW;
643 }
644
645 state = MEMCACHED_SERVER_STATE_NEW;
646 cursor_active_ = 0;
647 io_bytes_sent = 0;
648 write_buffer_offset = size_t(root and memcached_is_udp(root) ? UDP_DATAGRAM_HEADER_LENGTH : 0);
649 read_buffer_length = 0;
650 read_ptr = read_buffer;
651 options.is_shutting_down = false;
652 memcached_server_response_reset(this);
653
654 // We reset the version so that if we end up talking to a different server
655 // we don't have stale server version information.
656 major_version = minor_version = micro_version = UINT8_MAX;
657 }
658
659 memcached_instance_st *memcached_io_get_readable_server(Memcached *memc, memcached_return_t &) {
660 #define MAX_SERVERS_TO_POLL 100
661 struct pollfd fds[MAX_SERVERS_TO_POLL];
662 nfds_t host_index = 0;
663
664 for (uint32_t x = 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) {
665 memcached_instance_st *instance = memcached_instance_fetch(memc, x);
666
667 if (instance->read_buffer_length > 0) /* I have data in the buffer */ {
668 return instance;
669 }
670
671 if (instance->response_count() > 0) {
672 fds[host_index].events = POLLIN;
673 fds[host_index].revents = 0;
674 fds[host_index].fd = instance->fd;
675 ++host_index;
676 }
677 }
678
679 if (host_index < 2) {
680 /* We have 0 or 1 server with pending events.. */
681 for (uint32_t x = 0; x < memcached_server_count(memc); ++x) {
682 memcached_instance_st *instance = memcached_instance_fetch(memc, x);
683
684 if (instance->response_count() > 0) {
685 return instance;
686 }
687 }
688
689 return NULL;
690 }
691
692 int error = poll(fds, host_index, memc->poll_timeout);
693 switch (error) {
694 case -1:
695 memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
696 /* FALLTHROUGH */
697 case 0: break;
698
699 default:
700 for (nfds_t x = 0; x < host_index; ++x) {
701 if (fds[x].revents & POLLIN) {
702 for (uint32_t y = 0; y < memcached_server_count(memc); ++y) {
703 memcached_instance_st *instance = memcached_instance_fetch(memc, y);
704
705 if (instance->fd == fds[x].fd) {
706 return instance;
707 }
708 }
709 }
710 }
711 }
712
713 return NULL;
714 }
715
716 /*
717 Eventually we will just kill off the server with the problem.
718 */
719 void memcached_io_reset(memcached_instance_st *instance) {
720 memcached_quit_server(instance, true);
721 }
722
723 /**
724 * Read a given number of bytes from the server and place it into a specific
725 * buffer. Reset the IO channel on this server if an error occurs.
726 */
727 memcached_return_t memcached_safe_read(memcached_instance_st *instance, void *dta,
728 const size_t size) {
729 size_t offset = 0;
730 char *data = static_cast<char *>(dta);
731
732 while (offset < size) {
733 ssize_t nread;
734 memcached_return_t rc;
735
736 while (
737 memcached_continue(rc = memcached_io_read(instance, data + offset, size - offset, nread))) {
738 };
739
740 if (memcached_failed(rc)) {
741 return rc;
742 }
743
744 offset += size_t(nread);
745 }
746
747 return MEMCACHED_SUCCESS;
748 }
749
750 memcached_return_t memcached_io_readline(memcached_instance_st *instance, char *buffer_ptr,
751 size_t size, size_t &total_nr) {
752 total_nr = 0;
753 bool line_complete = false;
754
755 while (line_complete == false) {
756 if (instance->read_buffer_length == 0) {
757 /*
758 * We don't have any data in the buffer, so let's fill the read
759 * buffer. Call the standard read function to avoid duplicating
760 * the logic.
761 */
762 ssize_t nread;
763 memcached_return_t rc = memcached_io_read(instance, buffer_ptr, 1, nread);
764 if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS) {
765 memcached_quit_server(instance, true);
766 return memcached_set_error(*instance, rc, MEMCACHED_AT);
767 } else if (memcached_failed(rc)) {
768 return rc;
769 }
770
771 if (*buffer_ptr == '\n') {
772 line_complete = true;
773 }
774
775 ++buffer_ptr;
776 ++total_nr;
777 }
778
779 /* Now let's look in the buffer and copy as we go! */
780 while (instance->read_buffer_length and total_nr < size and line_complete == false) {
781 *buffer_ptr = *instance->read_ptr;
782 if (*buffer_ptr == '\n') {
783 line_complete = true;
784 }
785 --instance->read_buffer_length;
786 ++instance->read_ptr;
787 ++total_nr;
788 ++buffer_ptr;
789 }
790
791 if (total_nr == size) {
792 return MEMCACHED_PROTOCOL_ERROR;
793 }
794 }
795
796 return MEMCACHED_SUCCESS;
797 }