p9y
[m6w6/libmemcached] / src / libmemcached / io.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17
18 #include "p9y/socket.hpp"
19 #include "p9y/poll.hpp"
20
21 void initialize_binary_request(memcached_instance_st *server,
22 protocol_binary_request_header &header) {
23 server->request_id++;
24 header.request.magic = PROTOCOL_BINARY_REQ;
25 header.request.opaque = htons(server->request_id);
26 }
27
/* Tag for a read versus a write; nothing in the visible code refers to it. */
enum memc_read_or_write {
  MEM_READ,
  MEM_WRITE
};
29
30 /**
31 * Try to fill the input buffer for a server with as much
32 * data as possible.
33 *
34 * @param instance the server to pack
35 */
36 static bool repack_input_buffer(memcached_instance_st *instance) {
37 if (instance->read_ptr != instance->read_buffer) {
38 /* Move all of the data to the beginning of the buffer so
39 ** that we can fit more data into the buffer...
40 */
41 memmove(instance->read_buffer, instance->read_ptr, instance->read_buffer_length);
42 instance->read_ptr = instance->read_buffer;
43 }
44
45 /* There is room in the buffer, try to fill it! */
46 if (instance->read_buffer_length != MEMCACHED_MAX_BUFFER) {
47 do {
48 /* Just try a single read to grab what's available */
49 ssize_t nr;
50 if ((nr = ::recv(instance->fd, instance->read_ptr + instance->read_buffer_length,
51 MEMCACHED_MAX_BUFFER - instance->read_buffer_length, MSG_NOSIGNAL))
52 <= 0)
53 {
54 if (nr == 0) {
55 memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
56 } else {
57 switch (get_socket_errno()) {
58 case EINTR:
59 continue;
60
61 #if EWOULDBLOCK != EAGAIN
62 case EWOULDBLOCK:
63 #endif
64 case EAGAIN:
65 #ifdef __linux
66 case ERESTART:
67 #endif
68 break; // No IO is fine, we can just move on
69
70 default:
71 memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT);
72 }
73 }
74
75 break;
76 } else // We read data, append to our read buffer
77 {
78 instance->read_buffer_length += size_t(nr);
79
80 return true;
81 }
82 } while (false);
83 }
84
85 return false;
86 }
87
88 /**
89 * If the we have callbacks connected to this server structure
90 * we may start process the input queue and fire the callbacks
91 * for the incomming messages. This function is _only_ called
92 * when the input buffer is full, so that we _know_ that we have
93 * at least _one_ message to process.
94 *
95 * @param instance the server to star processing iput messages for
96 * @return true if we processed anything, false otherwise
97 */
98 static bool process_input_buffer(memcached_instance_st *instance) {
99 /*
100 ** We might be able to process some of the response messages if we
101 ** have a callback set up
102 */
103 if (instance->root->callbacks) {
104 /*
105 * We might have responses... try to read them out and fire
106 * callbacks
107 */
108 memcached_callback_st cb = *instance->root->callbacks;
109
110 memcached_set_processing_input((Memcached *) instance->root, true);
111
112 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
113 Memcached *root = (Memcached *) instance->root;
114 memcached_return_t error = memcached_response(instance, buffer, sizeof(buffer), &root->result);
115
116 memcached_set_processing_input(root, false);
117
118 if (error == MEMCACHED_SUCCESS) {
119 for (unsigned int x = 0; x < cb.number_of_callback; x++) {
120 error = (*cb.callback[x])(instance->root, &root->result, cb.context);
121 if (error != MEMCACHED_SUCCESS) {
122 break;
123 }
124 }
125
126 /* @todo what should I do with the error message??? */
127 }
128 /* @todo what should I do with other error messages?? */
129 return true;
130 }
131
132 return false;
133 }
134
135 static memcached_return_t io_wait(memcached_instance_st *instance, const short events) {
136 /*
137 ** We are going to block on write, but at least on Solaris we might block
138 ** on write if we haven't read anything from our input buffer..
139 ** Try to purge the input buffer if we don't do any flow control in the
140 ** application layer (just sending a lot of data etc)
141 ** The test is moved down in the purge function to avoid duplication of
142 ** the test.
143 */
144 if (events & POLLOUT) {
145 if (memcached_purge(instance) == false) {
146 return MEMCACHED_FAILURE;
147 }
148 }
149
150 struct pollfd fds;
151 fds.fd = instance->fd;
152 fds.events = events;
153 fds.revents = 0;
154
155 if (fds.events & POLLOUT) /* write */ {
156 instance->io_wait_count.write++;
157 } else {
158 instance->io_wait_count.read++;
159 }
160
161 if (instance->root->poll_timeout
162 == 0) // Mimic 0 causes timeout behavior (not all platforms do this)
163 {
164 return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
165 memcached_literal_param("poll_timeout() was set to zero"));
166 }
167
168 size_t loop_max = 5;
169 while (--loop_max) // While loop is for ERESTART or EINTR
170 {
171 int active_fd = poll(&fds, 1, instance->root->poll_timeout);
172
173 if (active_fd >= 1) {
174 assert_msg(active_fd == 1, "poll() returned an unexpected number of active file descriptors");
175 if (fds.revents & POLLIN or fds.revents & POLLOUT) {
176 return MEMCACHED_SUCCESS;
177 }
178
179 if (fds.revents & POLLHUP) {
180 return memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
181 memcached_literal_param("poll() detected hang up"));
182 }
183
184 if (fds.revents & POLLERR) {
185 int local_errno = EINVAL;
186 int err;
187 socklen_t len = sizeof(err);
188 if (getsockopt(instance->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == 0) {
189 if (err == 0) // treat this as EINTR
190 {
191 continue;
192 }
193 local_errno = err;
194 }
195 memcached_quit_server(instance, true);
196 return memcached_set_errno(*instance, local_errno, MEMCACHED_AT,
197 memcached_literal_param("poll() returned POLLHUP"));
198 }
199
200 return memcached_set_error(
201 *instance, MEMCACHED_FAILURE, MEMCACHED_AT,
202 memcached_literal_param("poll() returned a value that was not dealt with"));
203 }
204
205 if (active_fd == 0) {
206 return memcached_set_error(*instance, MEMCACHED_TIMEOUT, MEMCACHED_AT,
207 memcached_literal_param("No active_fd were found"));
208 }
209
210 // Only an error should result in this code being called.
211 int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
212 assert_msg(active_fd == -1, "poll() returned an unexpected value");
213 switch (local_errno) {
214 #ifdef __linux
215 case ERESTART:
216 #endif
217 case EINTR:
218 continue;
219
220 case EFAULT:
221 case ENOMEM:
222 memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
223 break;
224
225 case EINVAL:
226 memcached_set_error(*instance, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT,
227 memcached_literal_param(
228 "RLIMIT_NOFILE exceeded, or if OSX the timeout value was invalid"));
229 break;
230
231 default:
232 memcached_set_errno(*instance, local_errno, MEMCACHED_AT, memcached_literal_param("poll"));
233 }
234
235 break;
236 }
237
238 memcached_quit_server(instance, true);
239
240 if (memcached_has_error(instance)) {
241 return memcached_instance_error_return(instance);
242 }
243
244 return memcached_set_error(
245 *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
246 memcached_literal_param("number of attempts to call io_wait() failed"));
247 }
248
249 static bool io_flush(memcached_instance_st *instance, const bool with_flush,
250 memcached_return_t &error) {
251 /*
252 ** We might want to purge the input buffer if we haven't consumed
253 ** any output yet... The test for the limits is the purge is inline
254 ** in the purge function to avoid duplicating the logic..
255 */
256 {
257 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
258
259 if (memcached_purge(instance) == false) {
260 return false;
261 }
262 }
263 char *local_write_ptr = instance->write_buffer;
264 size_t write_length = instance->write_buffer_offset;
265
266 error = MEMCACHED_SUCCESS;
267
268 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
269
270 /* Looking for memory overflows */
271 #if defined(DEBUG)
272 if (write_length == MEMCACHED_MAX_BUFFER)
273 WATCHPOINT_ASSERT(instance->write_buffer == local_write_ptr);
274 WATCHPOINT_ASSERT((instance->write_buffer + MEMCACHED_MAX_BUFFER)
275 >= (local_write_ptr + write_length));
276 #endif
277
278 while (write_length) {
279 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
280 WATCHPOINT_ASSERT(write_length > 0);
281
282 int flags;
283 if (with_flush) {
284 flags = MSG_NOSIGNAL;
285 } else {
286 flags = MSG_NOSIGNAL | MSG_MORE;
287 }
288
289 ssize_t sent_length = ::send(instance->fd, local_write_ptr, write_length, flags);
290 int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
291
292 if (sent_length == SOCKET_ERROR) {
293 #if 0 // @todo I should look at why we hit this bit of code hard frequently
294 WATCHPOINT_ERRNO(get_socket_errno());
295 WATCHPOINT_NUMBER(get_socket_errno());
296 #endif
297 switch (get_socket_errno()) {
298 case ENOBUFS:
299 continue;
300
301 #if EWOULDBLOCK != EAGAIN
302 case EWOULDBLOCK:
303 #endif
304 case EAGAIN: {
305 /*
306 * We may be blocked on write because the input buffer
307 * is full. Let's check if we have room in our input
308 * buffer for more data and retry the write before
309 * waiting..
310 */
311 if (repack_input_buffer(instance) or process_input_buffer(instance)) {
312 continue;
313 }
314
315 memcached_return_t rc = io_wait(instance, POLLOUT);
316 if (memcached_success(rc)) {
317 continue;
318 } else if (rc == MEMCACHED_TIMEOUT) {
319 return false;
320 }
321
322 memcached_quit_server(instance, true);
323 error = memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
324 return false;
325 }
326 case ENOTCONN:
327 case EPIPE:
328 default:
329 memcached_quit_server(instance, true);
330 error = memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
331 WATCHPOINT_ASSERT(instance->fd == INVALID_SOCKET);
332 return false;
333 }
334 }
335
336 instance->io_bytes_sent += uint32_t(sent_length);
337
338 local_write_ptr += sent_length;
339 write_length -= uint32_t(sent_length);
340 }
341
342 WATCHPOINT_ASSERT(write_length == 0);
343 instance->write_buffer_offset = 0;
344
345 return true;
346 }
347
348 memcached_return_t memcached_io_wait_for_write(memcached_instance_st *instance) {
349 return io_wait(instance, POLLOUT);
350 }
351
352 memcached_return_t memcached_io_wait_for_read(memcached_instance_st *instance) {
353 return io_wait(instance, POLLIN);
354 }
355
356 static memcached_return_t _io_fill(memcached_instance_st *instance) {
357 ssize_t data_read;
358 do {
359 data_read = ::recv(instance->fd, instance->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL);
360 int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
361
362 if (data_read == SOCKET_ERROR) {
363 switch (get_socket_errno()) {
364 case EINTR: // We just retry
365 continue;
366
367 case ETIMEDOUT: // OSX
368 #if EWOULDBLOCK != EAGAIN
369 case EWOULDBLOCK:
370 #endif
371 case EAGAIN:
372 #ifdef __linux
373 case ERESTART:
374 #endif
375 {
376 memcached_return_t io_wait_ret;
377 if (memcached_success(io_wait_ret = io_wait(instance, POLLIN))) {
378 continue;
379 }
380
381 return io_wait_ret;
382 }
383
384 /* fall through */
385
386 case ENOTCONN: // Programmer Error
387 WATCHPOINT_ASSERT(0);
388 // fall through
389 case ENOTSOCK:
390 WATCHPOINT_ASSERT(0);
391 // fall through
392 case EBADF:
393 assert_msg(instance->fd != INVALID_SOCKET, "Programmer error, invalid socket");
394 /* fall through */
395 case EINVAL:
396 case EFAULT:
397 case ECONNREFUSED:
398 default:
399 memcached_quit_server(instance, true);
400 memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
401 break;
402 }
403
404 return memcached_instance_error_return(instance);
405 } else if (data_read == 0) {
406 /*
407 EOF. Any data received so far is incomplete
408 so discard it. This always reads by byte in case of TCP
409 and protocol enforcement happens at memcached_response()
410 looking for '\n'. We do not care for UDB which requests 8 bytes
411 at once. Generally, this means that connection went away. Since
412 for blocking I/O we do not return 0 and for non-blocking case
413 it will return EGAIN if data is not immediatly available.
414 */
415 memcached_quit_server(instance, true);
416 return memcached_set_error(
417 *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
418 memcached_literal_param("::rec() returned zero, server has disconnected"));
419 }
420 instance->io_wait_count._bytes_read += data_read;
421 } while (data_read <= 0);
422
423 instance->io_bytes_sent = 0;
424 instance->read_buffer_length = (size_t) data_read;
425 instance->read_ptr = instance->read_buffer;
426
427 return MEMCACHED_SUCCESS;
428 }
429
430 memcached_return_t memcached_io_read(memcached_instance_st *instance, void *buffer, size_t length,
431 ssize_t &nread) {
432 assert(memcached_is_udp(instance->root) == false);
433 assert_msg(
434 instance,
435 "Programmer error, memcached_io_read() recieved an invalid Instance"); // Programmer error
436 char *buffer_ptr = static_cast<char *>(buffer);
437
438 if (instance->fd == INVALID_SOCKET) {
439 #if 0
440 assert_msg(int(instance->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state");
441 #endif
442 return MEMCACHED_CONNECTION_FAILURE;
443 }
444
445 while (length) {
446 if (instance->read_buffer_length == 0) {
447 memcached_return_t io_fill_ret;
448 if (memcached_fatal(io_fill_ret = _io_fill(instance))) {
449 nread = -1;
450 return io_fill_ret;
451 }
452 }
453
454 if (length > 1) {
455 size_t difference =
456 (length > instance->read_buffer_length) ? instance->read_buffer_length : length;
457
458 memcpy(buffer_ptr, instance->read_ptr, difference);
459 length -= difference;
460 instance->read_ptr += difference;
461 instance->read_buffer_length -= difference;
462 buffer_ptr += difference;
463 } else {
464 *buffer_ptr = *instance->read_ptr;
465 instance->read_ptr++;
466 instance->read_buffer_length--;
467 buffer_ptr++;
468 break;
469 }
470 }
471
472 nread = ssize_t(buffer_ptr - (char *) buffer);
473
474 return MEMCACHED_SUCCESS;
475 }
476
477 memcached_return_t memcached_io_slurp(memcached_instance_st *instance) {
478 assert_msg(instance, "Programmer error, invalid Instance");
479 assert(memcached_is_udp(instance->root) == false);
480
481 if (instance->fd == INVALID_SOCKET) {
482 assert_msg(int(instance->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO),
483 "Invalid socket state");
484 return MEMCACHED_CONNECTION_FAILURE;
485 }
486
487 ssize_t data_read;
488 char buffer[MEMCACHED_MAX_BUFFER];
489 do {
490 data_read = ::recv(instance->fd, instance->read_buffer, sizeof(buffer), MSG_NOSIGNAL);
491 if (data_read == SOCKET_ERROR) {
492 switch (get_socket_errno()) {
493 case EINTR: // We just retry
494 continue;
495
496 case ETIMEDOUT: // OSX
497 #if EWOULDBLOCK != EAGAIN
498 case EWOULDBLOCK:
499 #endif
500 case EAGAIN:
501 #ifdef __linux
502 case ERESTART:
503 #endif
504 if (memcached_success(io_wait(instance, POLLIN))) {
505 continue;
506 }
507 return MEMCACHED_IN_PROGRESS;
508
509 /* fall through */
510
511 case ENOTCONN: // Programmer Error
512 case ENOTSOCK:
513 assert(0);
514 /* fall through */
515 case EBADF:
516 assert_msg(instance->fd != INVALID_SOCKET, "Invalid socket state");
517 /* fall through */
518 case EINVAL:
519 case EFAULT:
520 case ECONNREFUSED:
521 default:
522 return MEMCACHED_CONNECTION_FAILURE; // We want this!
523 }
524 }
525 } while (data_read > 0);
526
527 return MEMCACHED_CONNECTION_FAILURE;
528 }
529
530 static bool _io_write(memcached_instance_st *instance, const void *buffer, size_t length,
531 bool with_flush, size_t &written) {
532 assert(instance->fd != INVALID_SOCKET);
533 assert(memcached_is_udp(instance->root) == false);
534
535 const char *buffer_ptr = static_cast<const char *>(buffer);
536
537 const size_t original_length = length;
538
539 while (length) {
540 char *write_ptr;
541 size_t buffer_end = MEMCACHED_MAX_BUFFER;
542 size_t should_write = buffer_end - instance->write_buffer_offset;
543 should_write = (should_write < length) ? should_write : length;
544
545 write_ptr = instance->write_buffer + instance->write_buffer_offset;
546 memcpy(write_ptr, buffer_ptr, should_write);
547 instance->write_buffer_offset += should_write;
548 buffer_ptr += should_write;
549 length -= should_write;
550
551 if (instance->write_buffer_offset == buffer_end) {
552 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
553
554 memcached_return_t rc;
555 if (io_flush(instance, with_flush, rc) == false) {
556 written = original_length - length;
557 return false;
558 }
559 }
560 }
561
562 if (with_flush) {
563 memcached_return_t rc;
564 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
565 if (io_flush(instance, with_flush, rc) == false) {
566 written = original_length - length;
567 return false;
568 }
569 }
570
571 written = original_length - length;
572
573 return true;
574 }
575
576 bool memcached_io_write(memcached_instance_st *instance) {
577 size_t written;
578 return _io_write(instance, NULL, 0, true, written);
579 }
580
581 ssize_t memcached_io_write(memcached_instance_st *instance, const void *buffer, const size_t length,
582 const bool with_flush) {
583 size_t written;
584
585 if (_io_write(instance, buffer, length, with_flush, written) == false) {
586 return -1;
587 }
588
589 return ssize_t(written);
590 }
591
592 bool memcached_io_writev(memcached_instance_st *instance, libmemcached_io_vector_st vector[],
593 const size_t number_of, const bool with_flush) {
594 ssize_t complete_total = 0;
595 ssize_t total = 0;
596
597 for (size_t x = 0; x < number_of; x++, vector++) {
598 complete_total += vector->length;
599 if (vector->length) {
600 size_t written;
601 if ((_io_write(instance, vector->buffer, vector->length, false, written)) == false) {
602 return false;
603 }
604 total += written;
605 }
606 }
607
608 if (with_flush) {
609 if (memcached_io_write(instance) == false) {
610 return false;
611 }
612 }
613
614 return (complete_total == total);
615 }
616
617 void memcached_instance_st::start_close_socket() {
618 if (fd != INVALID_SOCKET) {
619 shutdown(fd, SHUT_WR);
620 options.is_shutting_down = true;
621 }
622 }
623
624 void memcached_instance_st::reset_socket() {
625 if (fd != INVALID_SOCKET) {
626 (void) closesocket(fd);
627 fd = INVALID_SOCKET;
628 }
629 }
630
631 void memcached_instance_st::close_socket() {
632 if (fd != INVALID_SOCKET) {
633 int shutdown_options = SHUT_RD;
634 if (options.is_shutting_down == false) {
635 shutdown_options = SHUT_RDWR;
636 }
637
638 /* in case of death shutdown to avoid blocking at close() */
639 if (shutdown(fd, shutdown_options) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) {
640 WATCHPOINT_NUMBER(fd);
641 WATCHPOINT_ERRNO(get_socket_errno());
642 WATCHPOINT_ASSERT(get_socket_errno());
643 }
644
645 reset_socket();
646 state = MEMCACHED_SERVER_STATE_NEW;
647 }
648
649 state = MEMCACHED_SERVER_STATE_NEW;
650 cursor_active_ = 0;
651 io_bytes_sent = 0;
652 write_buffer_offset = size_t(root and memcached_is_udp(root) ? UDP_DATAGRAM_HEADER_LENGTH : 0);
653 read_buffer_length = 0;
654 read_ptr = read_buffer;
655 options.is_shutting_down = false;
656 memcached_server_response_reset(this);
657
658 // We reset the version so that if we end up talking to a different server
659 // we don't have stale server version information.
660 major_version = minor_version = micro_version = UINT8_MAX;
661 }
662
663 memcached_instance_st *memcached_io_get_readable_server(Memcached *memc, memcached_return_t &) {
664 #define MAX_SERVERS_TO_POLL 100
665 struct pollfd fds[MAX_SERVERS_TO_POLL];
666 nfds_t host_index = 0;
667
668 for (uint32_t x = 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) {
669 memcached_instance_st *instance = memcached_instance_fetch(memc, x);
670
671 if (instance->read_buffer_length > 0) /* I have data in the buffer */ {
672 return instance;
673 }
674
675 if (instance->response_count() > 0) {
676 fds[host_index].events = POLLIN;
677 fds[host_index].revents = 0;
678 fds[host_index].fd = instance->fd;
679 ++host_index;
680 }
681 }
682
683 if (host_index < 2) {
684 /* We have 0 or 1 server with pending events.. */
685 for (uint32_t x = 0; x < memcached_server_count(memc); ++x) {
686 memcached_instance_st *instance = memcached_instance_fetch(memc, x);
687
688 if (instance->response_count() > 0) {
689 return instance;
690 }
691 }
692
693 return NULL;
694 }
695
696 int error = poll(fds, host_index, memc->poll_timeout);
697 switch (error) {
698 case -1:
699 memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
700 /* FALLTHROUGH */
701 case 0:
702 break;
703
704 default:
705 for (nfds_t x = 0; x < host_index; ++x) {
706 if (fds[x].revents & POLLIN) {
707 for (uint32_t y = 0; y < memcached_server_count(memc); ++y) {
708 memcached_instance_st *instance = memcached_instance_fetch(memc, y);
709
710 if (instance->fd == fds[x].fd) {
711 return instance;
712 }
713 }
714 }
715 }
716 }
717
718 return NULL;
719 }
720
721 /*
722 Eventually we will just kill off the server with the problem.
723 */
724 void memcached_io_reset(memcached_instance_st *instance) {
725 memcached_quit_server(instance, true);
726 }
727
728 /**
729 * Read a given number of bytes from the server and place it into a specific
730 * buffer. Reset the IO channel on this server if an error occurs.
731 */
732 memcached_return_t memcached_safe_read(memcached_instance_st *instance, void *dta,
733 const size_t size) {
734 size_t offset = 0;
735 char *data = static_cast<char *>(dta);
736
737 while (offset < size) {
738 ssize_t nread;
739 memcached_return_t rc;
740
741 while (
742 memcached_continue(rc = memcached_io_read(instance, data + offset, size - offset, nread))) {
743 };
744
745 if (memcached_failed(rc)) {
746 return rc;
747 }
748
749 offset += size_t(nread);
750 }
751
752 return MEMCACHED_SUCCESS;
753 }
754
755 memcached_return_t memcached_io_readline(memcached_instance_st *instance, char *buffer_ptr,
756 size_t size, size_t &total_nr) {
757 total_nr = 0;
758 bool line_complete = false;
759
760 while (line_complete == false) {
761 if (instance->read_buffer_length == 0) {
762 /*
763 * We don't have any data in the buffer, so let's fill the read
764 * buffer. Call the standard read function to avoid duplicating
765 * the logic.
766 */
767 ssize_t nread;
768 memcached_return_t rc = memcached_io_read(instance, buffer_ptr, 1, nread);
769 if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS) {
770 memcached_quit_server(instance, true);
771 return memcached_set_error(*instance, rc, MEMCACHED_AT);
772 } else if (memcached_failed(rc)) {
773 return rc;
774 }
775
776 if (*buffer_ptr == '\n') {
777 line_complete = true;
778 }
779
780 ++buffer_ptr;
781 ++total_nr;
782 }
783
784 /* Now let's look in the buffer and copy as we go! */
785 while (instance->read_buffer_length and total_nr < size and line_complete == false) {
786 *buffer_ptr = *instance->read_ptr;
787 if (*buffer_ptr == '\n') {
788 line_complete = true;
789 }
790 --instance->read_buffer_length;
791 ++instance->read_ptr;
792 ++total_nr;
793 ++buffer_ptr;
794 }
795
796 if (total_nr == size) {
797 return MEMCACHED_PROTOCOL_ERROR;
798 }
799 }
800
801 return MEMCACHED_SUCCESS;
802 }