173a0fce8e74cd333817d635eff19a44820e2f9e
[awesomized/libmemcached] / src / libmemcached / io.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020-2021 Michael Wallner https://awesome.co/ |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17 #include "p9y/poll.hpp"
18 #include "p9y/clock_gettime.hpp"
19
20 void initialize_binary_request(memcached_instance_st *server,
21 protocol_binary_request_header &header) {
22 server->request_id++;
23 header.request.magic = PROTOCOL_BINARY_REQ;
24 header.request.opaque = htons(server->request_id);
25 }
26
/* Tag distinguishing a read from a write; MEM_READ is 0 and MEM_WRITE is 1. */
enum memc_read_or_write {
  MEM_READ,
  MEM_WRITE
};
28
29 /**
30 * Try to fill the input buffer for a server with as much
31 * data as possible.
32 *
33 * @param instance the server to pack
34 */
35 static bool repack_input_buffer(memcached_instance_st *instance) {
36 if (instance->read_ptr != instance->read_buffer) {
37 /* Move all of the data to the beginning of the buffer so
38 ** that we can fit more data into the buffer...
39 */
40 memmove(instance->read_buffer, instance->read_ptr, instance->read_buffer_length);
41 instance->read_ptr = instance->read_buffer;
42 }
43
44 /* There is room in the buffer, try to fill it! */
45 if (instance->read_buffer_length != MEMCACHED_MAX_BUFFER) {
46 do {
47 /* Just try a single read to grab what's available */
48 ssize_t nr;
49 if ((nr = ::recv(instance->fd, instance->read_ptr + instance->read_buffer_length,
50 MEMCACHED_MAX_BUFFER - instance->read_buffer_length, MSG_NOSIGNAL))
51 <= 0)
52 {
53 if (nr == 0) {
54 memcached_set_error(*instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT);
55 } else {
56 switch (get_socket_errno()) {
57 case EINTR:
58 continue;
59
60 #if EWOULDBLOCK != EAGAIN
61 case EWOULDBLOCK:
62 #endif
63 case EAGAIN:
64 #ifdef HAVE_ERESTART
65 case ERESTART:
66 #endif
67 break; // No IO is fine, we can just move on
68
69 default:
70 memcached_set_errno(*instance, get_socket_errno(), MEMCACHED_AT);
71 }
72 }
73
74 break;
75 } else // We read data, append to our read buffer
76 {
77 instance->read_buffer_length += size_t(nr);
78
79 return true;
80 }
81 } while (false);
82 }
83
84 return false;
85 }
86
87 /**
88 * If the we have callbacks connected to this server structure
89 * we may start process the input queue and fire the callbacks
90 * for the incomming messages. This function is _only_ called
91 * when the input buffer is full, so that we _know_ that we have
92 * at least _one_ message to process.
93 *
94 * @param instance the server to star processing iput messages for
95 * @return true if we processed anything, false otherwise
96 */
97 static bool process_input_buffer(memcached_instance_st *instance) {
98 /*
99 ** We might be able to process some of the response messages if we
100 ** have a callback set up
101 */
102 if (instance->root->callbacks) {
103 /*
104 * We might have responses... try to read them out and fire
105 * callbacks
106 */
107 memcached_callback_st cb = *instance->root->callbacks;
108
109 memcached_set_processing_input((Memcached *) instance->root, true);
110
111 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
112 Memcached *root = (Memcached *) instance->root;
113 memcached_return_t error = memcached_response(instance, buffer, sizeof(buffer), &root->result);
114
115 memcached_set_processing_input(root, false);
116
117 if (error == MEMCACHED_SUCCESS) {
118 for (unsigned int x = 0; x < cb.number_of_callback; x++) {
119 error = (*cb.callback[x])(instance->root, &root->result, cb.context);
120 if (error != MEMCACHED_SUCCESS) {
121 break;
122 }
123 }
124
125 /* @todo what should I do with the error message??? */
126 }
127 /* @todo what should I do with other error messages?? */
128 return true;
129 }
130
131 return false;
132 }
133
134 static memcached_return_t io_sock_err(memcached_instance_st *inst,
135 const char *reason_str, size_t reason_len) {
136 int err;
137 socklen_t len = sizeof(err);
138
139 if (getsockopt(inst->fd, SOL_SOCKET, SO_ERROR, (char *) &err, &len) == -1) {
140 return memcached_set_errno(*inst, errno, MEMCACHED_AT,
141 memcached_literal_param("getsockopt()"));
142 }
143
144 if (err) {
145 return memcached_set_errno(*inst, err, MEMCACHED_AT, reason_str, reason_len);
146 }
147 return MEMCACHED_SUCCESS;
148 }
149
150 memcached_return_t memcached_io_poll(memcached_instance_st *inst, int16_t events, int prev_errno) {
151 int32_t timeout;
152 pollfd pfd{};
153 pfd.fd = inst->fd;
154 pfd.events = events ? events : inst->events();
155
156 if (events) {
157 timeout = inst->root->poll_timeout;
158 } else {
159 timeout = inst->root->connect_timeout;
160 }
161
162 if (!timeout) {
163 return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
164 memcached_literal_param("timeout was set to zero"));
165 }
166
167 timespec tspec{}; // for clock_gettime()
168 int64_t start, elapsed; // ns
169 int32_t poll_timeout = timeout; // ms
170
171 if (clock_gettime(CLOCK_MONOTONIC, &tspec)) {
172 return memcached_set_errno(*inst, errno, MEMCACHED_AT,
173 memcached_literal_param("clock_gettime()"));
174 }
175 start = tspec.tv_sec * 1000000000 + tspec.tv_nsec;
176 while (true) {
177 int active = poll(&pfd, 1, poll_timeout);
178
179 if (active == SOCKET_ERROR) {
180 int local_errno = get_socket_errno();
181
182 switch (local_errno) {
183 #ifdef HAVE_ERESTART
184 case ERESTART:
185 #endif
186 case EINTR:
187 clock_gettime(CLOCK_MONOTONIC, &tspec);
188 elapsed = tspec.tv_sec * 1000000000 + tspec.tv_nsec - start;
189 if (elapsed / 1000000 >= timeout || !start /* safety if clock_gettime is broken */) {
190 return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
191 memcached_literal_param("timeout on interrupt or restart"));
192 }
193 poll_timeout -= elapsed / 1000000;
194 continue;
195
196 case EFAULT:
197 case ENOMEM:
198 return memcached_set_error(*inst, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT);
199
200 case EINVAL:
201 return memcached_set_error(*inst, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT,
202 memcached_literal_param("RLIMIT_NOFILE exceeded, or invalid timeout"));
203 default:
204 if (events == IO_POLL_CONNECT) {
205 inst->reset_socket();
206 inst->state = MEMCACHED_SERVER_STATE_NEW;
207 }
208 return memcached_set_errno(*inst, local_errno, MEMCACHED_AT, memcached_literal_param("poll()"));
209 }
210 }
211
212 if (active == 0) {
213 /* do not test SO_ERROR on EALREADY */
214 if (prev_errno != EALREADY) {
215 memcached_return_t rc = io_sock_err(inst, memcached_literal_param("getsockopt() after poll() timed out"));
216 if (MEMCACHED_SUCCESS != rc) {
217 return rc;
218 }
219 }
220 return memcached_set_error(*inst, MEMCACHED_TIMEOUT, MEMCACHED_AT,
221 memcached_literal_param("time out"));
222 }
223
224 assert_msg(active == 1, "poll() returned an unexpected number of active file descriptors");
225
226 if (pfd.revents & (POLLERR|POLLHUP|POLLNVAL)) {
227 memcached_return_t rc = io_sock_err(inst, memcached_literal_param("poll(POLLERR|POLLHUP|POLLNVAL)"));
228 if (MEMCACHED_SUCCESS != rc) {
229 if (events != IO_POLL_CONNECT) {
230 memcached_quit_server(inst, true);
231 }
232 return rc;
233 }
234 }
235 if (pfd.revents & events || (events == IO_POLL_CONNECT && pfd.revents & POLLOUT)) {
236 return MEMCACHED_SUCCESS;
237 }
238 #if DEBUG
239 dprintf(STDERR_FILENO, "io_poll() looped!\n");
240 #endif
241 }
242 }
243
244 static memcached_return_t io_wait(memcached_instance_st *instance, const short events) {
245 if (events & POLLOUT) {
246 /*
247 ** We are going to block on write, but at least on Solaris we might block
248 ** on write if we haven't read anything from our input buffer..
249 ** Try to purge the input buffer if we don't do any flow control in the
250 ** application layer (just sending a lot of data etc)
251 ** The test is moved down in the purge function to avoid duplication of
252 ** the test.
253 */
254 if (memcached_purge(instance) == false) {
255 return MEMCACHED_FAILURE;
256 }
257 instance->io_wait_count.write++;
258 } else {
259 instance->io_wait_count.read++;
260 }
261
262 return memcached_io_poll(instance, events);
263 }
264
265 static bool io_flush(memcached_instance_st *instance, const bool with_flush,
266 memcached_return_t &error) {
267 /*
268 ** We might want to purge the input buffer if we haven't consumed
269 ** any output yet... The test for the limits is the purge is inline
270 ** in the purge function to avoid duplicating the logic..
271 */
272 {
273 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
274
275 if (memcached_purge(instance) == false) {
276 return false;
277 }
278 }
279 char *local_write_ptr = instance->write_buffer;
280 size_t write_length = instance->write_buffer_offset;
281
282 error = MEMCACHED_SUCCESS;
283
284 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
285
286 /* Looking for memory overflows */
287 #if defined(DEBUG)
288 if (write_length == MEMCACHED_MAX_BUFFER)
289 WATCHPOINT_ASSERT(instance->write_buffer == local_write_ptr);
290 WATCHPOINT_ASSERT((instance->write_buffer + MEMCACHED_MAX_BUFFER)
291 >= (local_write_ptr + write_length));
292 #endif
293
294 while (write_length) {
295 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
296 WATCHPOINT_ASSERT(write_length > 0);
297
298 int flags;
299 if (with_flush) {
300 flags = MSG_NOSIGNAL;
301 } else {
302 flags = MSG_NOSIGNAL | MSG_MORE;
303 }
304
305 ssize_t sent_length = ::send(instance->fd, local_write_ptr, write_length, flags);
306 int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
307
308 if (sent_length == SOCKET_ERROR) {
309 #if 0 // @todo I should look at why we hit this bit of code hard frequently
310 WATCHPOINT_ERRNO(get_socket_errno());
311 WATCHPOINT_NUMBER(get_socket_errno());
312 #endif
313 switch (get_socket_errno()) {
314 case ENOBUFS:
315 continue;
316
317 #if EWOULDBLOCK != EAGAIN
318 case EWOULDBLOCK:
319 #endif
320 case EAGAIN: {
321 /*
322 * We may be blocked on write because the input buffer
323 * is full. Let's check if we have room in our input
324 * buffer for more data and retry the write before
325 * waiting..
326 */
327 if (repack_input_buffer(instance) or process_input_buffer(instance)) {
328 continue;
329 }
330
331 memcached_return_t rc = io_wait(instance, POLLOUT);
332 if (memcached_success(rc)) {
333 continue;
334 } else if (rc == MEMCACHED_TIMEOUT) {
335 return false;
336 }
337
338 memcached_quit_server(instance, true);
339 error = memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
340 return false;
341 }
342 case ENOTCONN:
343 case EPIPE:
344 default:
345 memcached_quit_server(instance, true);
346 error = memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
347 WATCHPOINT_ASSERT(instance->fd == INVALID_SOCKET);
348 return false;
349 }
350 }
351
352 instance->io_bytes_sent += uint32_t(sent_length);
353
354 local_write_ptr += sent_length;
355 write_length -= uint32_t(sent_length);
356 }
357
358 WATCHPOINT_ASSERT(write_length == 0);
359 instance->write_buffer_offset = 0;
360
361 return true;
362 }
363
364 memcached_return_t memcached_io_wait_for_write(memcached_instance_st *instance) {
365 return io_wait(instance, POLLOUT);
366 }
367
368 memcached_return_t memcached_io_wait_for_read(memcached_instance_st *instance) {
369 return io_wait(instance, POLLIN);
370 }
371
372 static memcached_return_t _io_fill(memcached_instance_st *instance) {
373 ssize_t data_read;
374 do {
375 data_read = ::recv(instance->fd, instance->read_buffer, MEMCACHED_MAX_BUFFER, MSG_NOSIGNAL);
376 int local_errno = get_socket_errno(); // We cache in case memcached_quit_server() modifies errno
377
378 if (data_read == SOCKET_ERROR) {
379 switch (get_socket_errno()) {
380 case EINTR: // We just retry
381 continue;
382
383 case ETIMEDOUT: // OSX
384 #if EWOULDBLOCK != EAGAIN
385 case EWOULDBLOCK:
386 #endif
387 case EAGAIN:
388 #ifdef HAVE_ERESTART
389 case ERESTART:
390 #endif
391 {
392 memcached_return_t io_wait_ret;
393 if (memcached_success(io_wait_ret = io_wait(instance, POLLIN))) {
394 continue;
395 }
396
397 return io_wait_ret;
398 }
399
400 /* fall through */
401
402 case ENOTCONN: // Programmer Error
403 WATCHPOINT_ASSERT(0);
404 // fall through
405 case ENOTSOCK:
406 WATCHPOINT_ASSERT(0);
407 // fall through
408 case EBADF:
409 assert_msg(instance->fd != INVALID_SOCKET, "Programmer error, invalid socket");
410 /* fall through */
411 case EINVAL:
412 case EFAULT:
413 case ECONNREFUSED:
414 default:
415 memcached_quit_server(instance, true);
416 memcached_set_errno(*instance, local_errno, MEMCACHED_AT);
417 break;
418 }
419
420 return memcached_instance_error_return(instance);
421 } else if (data_read == 0) {
422 /*
423 EOF. Any data received so far is incomplete
424 so discard it. This always reads by byte in case of TCP
425 and protocol enforcement happens at memcached_response()
426 looking for '\n'. We do not care for UDB which requests 8 bytes
427 at once. Generally, this means that connection went away. Since
428 for blocking I/O we do not return 0 and for non-blocking case
429 it will return EGAIN if data is not immediatly available.
430 */
431 memcached_quit_server(instance, true);
432 return memcached_set_error(
433 *instance, MEMCACHED_CONNECTION_FAILURE, MEMCACHED_AT,
434 memcached_literal_param("::rec() returned zero, server has disconnected"));
435 }
436 instance->io_wait_count._bytes_read += data_read;
437 } while (data_read <= 0);
438
439 instance->io_bytes_sent = 0;
440 instance->read_buffer_length = (size_t) data_read;
441 instance->read_ptr = instance->read_buffer;
442
443 return MEMCACHED_SUCCESS;
444 }
445
446 memcached_return_t memcached_io_read(memcached_instance_st *instance, void *buffer, size_t length,
447 ssize_t &nread) {
448 assert(memcached_is_udp(instance->root) == false);
449 assert_msg(
450 instance,
451 "Programmer error, memcached_io_read() recieved an invalid Instance"); // Programmer error
452 char *buffer_ptr = static_cast<char *>(buffer);
453
454 if (instance->fd == INVALID_SOCKET) {
455 #if 0
456 assert_msg(int(instance->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO), "Programmer error, invalid socket state");
457 #endif
458 return MEMCACHED_CONNECTION_FAILURE;
459 }
460
461 while (length) {
462 if (instance->read_buffer_length == 0) {
463 memcached_return_t io_fill_ret;
464 if (memcached_fatal(io_fill_ret = _io_fill(instance))) {
465 nread = -1;
466 return io_fill_ret;
467 }
468 }
469
470 if (length > 1) {
471 size_t difference =
472 (length > instance->read_buffer_length) ? instance->read_buffer_length : length;
473
474 memcpy(buffer_ptr, instance->read_ptr, difference);
475 length -= difference;
476 instance->read_ptr += difference;
477 instance->read_buffer_length -= difference;
478 buffer_ptr += difference;
479 } else {
480 *buffer_ptr = *instance->read_ptr;
481 instance->read_ptr++;
482 instance->read_buffer_length--;
483 buffer_ptr++;
484 break;
485 }
486 }
487
488 nread = ssize_t(buffer_ptr - (char *) buffer);
489
490 return MEMCACHED_SUCCESS;
491 }
492
493 memcached_return_t memcached_io_slurp(memcached_instance_st *instance) {
494 assert_msg(instance, "Programmer error, invalid Instance");
495 assert(memcached_is_udp(instance->root) == false);
496
497 if (instance->fd == INVALID_SOCKET) {
498 assert_msg(int(instance->state) <= int(MEMCACHED_SERVER_STATE_ADDRINFO),
499 "Invalid socket state");
500 return MEMCACHED_CONNECTION_FAILURE;
501 }
502
503 ssize_t data_read;
504 char buffer[MEMCACHED_MAX_BUFFER];
505 do {
506 data_read = ::recv(instance->fd, instance->read_buffer, sizeof(buffer), MSG_NOSIGNAL);
507 if (data_read == SOCKET_ERROR) {
508 switch (get_socket_errno()) {
509 case EINTR: // We just retry
510 continue;
511
512 case ETIMEDOUT: // OSX
513 #if EWOULDBLOCK != EAGAIN
514 case EWOULDBLOCK:
515 #endif
516 case EAGAIN:
517 #ifdef ERESTART
518 case ERESTART:
519 #endif
520 if (memcached_success(io_wait(instance, POLLIN))) {
521 continue;
522 }
523 return MEMCACHED_IN_PROGRESS;
524
525 /* fall through */
526
527 case ENOTCONN: // Programmer Error
528 case ENOTSOCK:
529 assert(0);
530 /* fall through */
531 case EBADF:
532 assert_msg(instance->fd != INVALID_SOCKET, "Invalid socket state");
533 /* fall through */
534 case EINVAL:
535 case EFAULT:
536 case ECONNREFUSED:
537 default:
538 return MEMCACHED_CONNECTION_FAILURE; // We want this!
539 }
540 }
541 } while (data_read > 0);
542
543 return MEMCACHED_CONNECTION_FAILURE;
544 }
545
546 static bool _io_write(memcached_instance_st *instance, const void *buffer, size_t length,
547 bool with_flush, size_t &written) {
548 assert(instance->fd != INVALID_SOCKET);
549 assert(memcached_is_udp(instance->root) == false);
550
551 const char *buffer_ptr = static_cast<const char *>(buffer);
552
553 const size_t original_length = length;
554
555 while (length) {
556 char *write_ptr;
557 size_t buffer_end = MEMCACHED_MAX_BUFFER;
558 size_t should_write = buffer_end - instance->write_buffer_offset;
559 should_write = (should_write < length) ? should_write : length;
560
561 write_ptr = instance->write_buffer + instance->write_buffer_offset;
562 memcpy(write_ptr, buffer_ptr, should_write);
563 instance->write_buffer_offset += should_write;
564 buffer_ptr += should_write;
565 length -= should_write;
566
567 if (instance->write_buffer_offset == buffer_end) {
568 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
569
570 memcached_return_t rc;
571 if (io_flush(instance, with_flush, rc) == false) {
572 written = original_length - length;
573 return false;
574 }
575 }
576 }
577
578 if (with_flush) {
579 memcached_return_t rc;
580 WATCHPOINT_ASSERT(instance->fd != INVALID_SOCKET);
581 if (io_flush(instance, with_flush, rc) == false) {
582 written = original_length - length;
583 return false;
584 }
585 }
586
587 written = original_length - length;
588
589 return true;
590 }
591
592 bool memcached_io_write(memcached_instance_st *instance) {
593 size_t written;
594 return _io_write(instance, NULL, 0, true, written);
595 }
596
597 ssize_t memcached_io_write(memcached_instance_st *instance, const void *buffer, const size_t length,
598 const bool with_flush) {
599 size_t written;
600
601 if (_io_write(instance, buffer, length, with_flush, written) == false) {
602 return -1;
603 }
604
605 return ssize_t(written);
606 }
607
608 bool memcached_io_writev(memcached_instance_st *instance, libmemcached_io_vector_st vector[],
609 const size_t number_of, const bool with_flush) {
610 ssize_t complete_total = 0;
611 ssize_t total = 0;
612
613 for (size_t x = 0; x < number_of; x++, vector++) {
614 complete_total += vector->length;
615 if (vector->length) {
616 size_t written;
617 if ((_io_write(instance, vector->buffer, vector->length, false, written)) == false) {
618 return false;
619 }
620 total += written;
621 }
622 }
623
624 if (with_flush) {
625 if (memcached_io_write(instance) == false) {
626 return false;
627 }
628 }
629
630 return (complete_total == total);
631 }
632
633 void memcached_instance_st::start_close_socket() {
634 if (fd != INVALID_SOCKET) {
635 shutdown(fd, SHUT_WR);
636 options.is_shutting_down = true;
637 }
638 }
639
640 void memcached_instance_st::reset_socket() {
641 if (fd != INVALID_SOCKET) {
642 (void) closesocket(fd);
643 fd = INVALID_SOCKET;
644 }
645 }
646
647 void memcached_instance_st::close_socket() {
648 if (fd != INVALID_SOCKET) {
649 int shutdown_options = SHUT_RD;
650 if (options.is_shutting_down == false) {
651 shutdown_options = SHUT_RDWR;
652 }
653
654 /* in case of death shutdown to avoid blocking at close() */
655 if (shutdown(fd, shutdown_options) == SOCKET_ERROR and get_socket_errno() != ENOTCONN) {
656 WATCHPOINT_NUMBER(fd);
657 WATCHPOINT_ERRNO(get_socket_errno());
658 WATCHPOINT_ASSERT(get_socket_errno());
659 }
660
661 reset_socket();
662 state = MEMCACHED_SERVER_STATE_NEW;
663 }
664
665 state = MEMCACHED_SERVER_STATE_NEW;
666 cursor_active_ = 0;
667 io_bytes_sent = 0;
668 write_buffer_offset = size_t(root and memcached_is_udp(root) ? UDP_DATAGRAM_HEADER_LENGTH : 0);
669 read_buffer_length = 0;
670 read_ptr = read_buffer;
671 options.is_shutting_down = false;
672 memcached_server_response_reset(this);
673
674 // We reset the version so that if we end up talking to a different server
675 // we don't have stale server version information.
676 major_version = minor_version = micro_version = UINT8_MAX;
677 }
678
679 memcached_instance_st *memcached_io_get_readable_server(Memcached *memc, memcached_return_t &) {
680 #define MAX_SERVERS_TO_POLL 100
681 struct pollfd fds[MAX_SERVERS_TO_POLL];
682 nfds_t host_index = 0;
683
684 for (uint32_t x = 0; x < memcached_server_count(memc) and host_index < MAX_SERVERS_TO_POLL; ++x) {
685 memcached_instance_st *instance = memcached_instance_fetch(memc, x);
686
687 if (instance->read_buffer_length > 0) /* I have data in the buffer */ {
688 return instance;
689 }
690
691 if (instance->response_count() > 0) {
692 fds[host_index].events = POLLIN;
693 fds[host_index].revents = 0;
694 fds[host_index].fd = instance->fd;
695 ++host_index;
696 }
697 }
698
699 if (host_index < 2) {
700 /* We have 0 or 1 server with pending events.. */
701 for (uint32_t x = 0; x < memcached_server_count(memc); ++x) {
702 memcached_instance_st *instance = memcached_instance_fetch(memc, x);
703
704 if (instance->response_count() > 0) {
705 return instance;
706 }
707 }
708
709 return NULL;
710 }
711
712 int error = poll(fds, host_index, memc->poll_timeout);
713 switch (error) {
714 case -1:
715 memcached_set_errno(*memc, get_socket_errno(), MEMCACHED_AT);
716 /* FALLTHROUGH */
717 case 0:
718 break;
719
720 default:
721 for (nfds_t x = 0; x < host_index; ++x) {
722 if (fds[x].revents & POLLIN) {
723 for (uint32_t y = 0; y < memcached_server_count(memc); ++y) {
724 memcached_instance_st *instance = memcached_instance_fetch(memc, y);
725
726 if (instance->fd == fds[x].fd) {
727 return instance;
728 }
729 }
730 }
731 }
732 }
733
734 return NULL;
735 }
736
737 /*
738 Eventually we will just kill off the server with the problem.
739 */
740 void memcached_io_reset(memcached_instance_st *instance) {
741 memcached_quit_server(instance, true);
742 }
743
744 /**
745 * Read a given number of bytes from the server and place it into a specific
746 * buffer. Reset the IO channel on this server if an error occurs.
747 */
748 memcached_return_t memcached_safe_read(memcached_instance_st *instance, void *dta,
749 const size_t size) {
750 size_t offset = 0;
751 char *data = static_cast<char *>(dta);
752
753 while (offset < size) {
754 ssize_t nread;
755 memcached_return_t rc;
756
757 while (
758 memcached_continue(rc = memcached_io_read(instance, data + offset, size - offset, nread))) {
759 };
760
761 if (memcached_failed(rc)) {
762 return rc;
763 }
764
765 offset += size_t(nread);
766 }
767
768 return MEMCACHED_SUCCESS;
769 }
770
771 memcached_return_t memcached_io_readline(memcached_instance_st *instance, char *buffer_ptr,
772 size_t size, size_t &total_nr) {
773 total_nr = 0;
774 bool line_complete = false;
775
776 while (line_complete == false) {
777 if (instance->read_buffer_length == 0) {
778 /*
779 * We don't have any data in the buffer, so let's fill the read
780 * buffer. Call the standard read function to avoid duplicating
781 * the logic.
782 */
783 ssize_t nread;
784 memcached_return_t rc = memcached_io_read(instance, buffer_ptr, 1, nread);
785 if (memcached_failed(rc) and rc == MEMCACHED_IN_PROGRESS) {
786 memcached_quit_server(instance, true);
787 return memcached_set_error(*instance, rc, MEMCACHED_AT);
788 } else if (memcached_failed(rc)) {
789 return rc;
790 }
791
792 if (*buffer_ptr == '\n') {
793 line_complete = true;
794 }
795
796 ++buffer_ptr;
797 ++total_nr;
798 }
799
800 /* Now let's look in the buffer and copy as we go! */
801 while (instance->read_buffer_length and total_nr < size and line_complete == false) {
802 *buffer_ptr = *instance->read_ptr;
803 if (*buffer_ptr == '\n') {
804 line_complete = true;
805 }
806 --instance->read_buffer_length;
807 ++instance->read_ptr;
808 ++total_nr;
809 ++buffer_ptr;
810 }
811
812 if (total_nr == size) {
813 return MEMCACHED_PROTOCOL_ERROR;
814 }
815 }
816
817 return MEMCACHED_SUCCESS;
818 }