Fix for platform poll() return values.
[awesomized/libmemcached] / libmemcached / io.c
1 /* LibMemcached
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Server IO, Not public!
9 *
10 */
11
12
13 #include "common.h"
14 #include <sys/select.h>
15 #include <poll.h>
16
/* Direction of the socket operation a caller wants to wait for. */
typedef enum {
  MEM_READ,  /* wait for the descriptor to become readable */
  MEM_WRITE  /* wait for the descriptor to become writable */
} memc_read_or_write;
21
22 static ssize_t io_flush(memcached_server_instance_st *ptr, memcached_return_t *error);
23 static void increment_udp_message_id(memcached_server_instance_st *ptr);
24
25 static memcached_return_t io_wait(memcached_server_instance_st *ptr,
26 memc_read_or_write read_or_write)
27 {
28 struct pollfd fds= {
29 .fd= ptr->fd,
30 .events = POLLIN
31 };
32 int error;
33
34 unlikely (read_or_write == MEM_WRITE) /* write */
35 fds.events= POLLOUT;
36
37 /*
38 ** We are going to block on write, but at least on Solaris we might block
39 ** on write if we haven't read anything from our input buffer..
40 ** Try to purge the input buffer if we don't do any flow control in the
41 ** application layer (just sending a lot of data etc)
42 ** The test is moved down in the purge function to avoid duplication of
43 ** the test.
44 */
45 if (read_or_write == MEM_WRITE)
46 {
47 memcached_return_t rc= memcached_purge(ptr);
48 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
49 return MEMCACHED_FAILURE;
50 }
51
52 int timeout= ptr->root->poll_timeout;
53 if (ptr->root->flags.no_block == false)
54 timeout= -1;
55
56 size_t loop_max= 5;
57 while (--loop_max)
58 {
59 error= poll(&fds, 1, timeout);
60
61 switch (error)
62 {
63 case 1:
64 return MEMCACHED_SUCCESS;
65 case 0:
66 return MEMCACHED_TIMEOUT;
67 #if TARGET_OS_LINUX
68 case ERESTART:
69 #endif
70 case EINTR:
71 continue;
72 default:
73 ptr->cached_errno= error;
74 memcached_quit_server(ptr, 1);
75
76 return MEMCACHED_FAILURE;
77 }
78 }
79
80 /* Imposssible for anything other then -1 */
81 WATCHPOINT_ASSERT(error == -1);
82 ptr->cached_errno= error;
83 memcached_quit_server(ptr, 1);
84
85 return MEMCACHED_FAILURE;
86 }
87
88 /**
89 * Try to fill the input buffer for a server with as much
90 * data as possible.
91 *
92 * @param ptr the server to pack
93 */
94 static bool repack_input_buffer(memcached_server_instance_st *ptr)
95 {
96 if (ptr->read_ptr != ptr->read_buffer)
97 {
98 /* Move all of the data to the beginning of the buffer so
99 ** that we can fit more data into the buffer...
100 */
101 memmove(ptr->read_buffer, ptr->read_ptr, ptr->read_buffer_length);
102 ptr->read_ptr= ptr->read_buffer;
103 ptr->read_data_length= ptr->read_buffer_length;
104 }
105
106 /* There is room in the buffer, try to fill it! */
107 if (ptr->read_buffer_length != MEMCACHED_MAX_BUFFER)
108 {
109 /* Just try a single read to grab what's available */
110 ssize_t nr= read(ptr->fd,
111 ptr->read_ptr + ptr->read_data_length,
112 MEMCACHED_MAX_BUFFER - ptr->read_data_length);
113
114 if (nr > 0)
115 {
116 ptr->read_data_length+= (size_t)nr;
117 ptr->read_buffer_length+= (size_t)nr;
118 return true;
119 }
120 }
121 return false;
122 }
123
124 /**
125 * If the we have callbacks connected to this server structure
126 * we may start process the input queue and fire the callbacks
127 * for the incomming messages. This function is _only_ called
128 * when the input buffer is full, so that we _know_ that we have
129 * at least _one_ message to process.
130 *
131 * @param ptr the server to star processing iput messages for
132 * @return true if we processed anything, false otherwise
133 */
134 static bool process_input_buffer(memcached_server_instance_st *ptr)
135 {
136 /*
137 ** We might be able to process some of the response messages if we
138 ** have a callback set up
139 */
140 if (ptr->root->callbacks != NULL && ptr->root->flags.use_udp == false)
141 {
142 /*
143 * We might have responses... try to read them out and fire
144 * callbacks
145 */
146 memcached_callback_st cb= *ptr->root->callbacks;
147
148 memcached_set_processing_input((memcached_st *)ptr->root, true);
149
150 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
151 memcached_return_t error;
152 memcached_st *root= (memcached_st *)ptr->root;
153 error= memcached_response(ptr, buffer, sizeof(buffer),
154 &root->result);
155
156 memcached_set_processing_input(root, false);
157
158 if (error == MEMCACHED_SUCCESS)
159 {
160 for (unsigned int x= 0; x < cb.number_of_callback; x++)
161 {
162 error= (*cb.callback[x])(ptr->root, &root->result, cb.context);
163 if (error != MEMCACHED_SUCCESS)
164 break;
165 }
166
167 /* @todo what should I do with the error message??? */
168 }
169 /* @todo what should I do with other error messages?? */
170 return true;
171 }
172
173 return false;
174 }
175
176 static inline void memcached_io_cork_push(memcached_server_st *ptr)
177 {
178 (void)ptr;
179 #ifdef CORK
180 if (ptr->root->flags.cork == false || ptr->state.is_corked)
181 return;
182
183 int enable= 1;
184 int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK,
185 &enable, (socklen_t)sizeof(int));
186 if (! err)
187 ptr->state.is_corked= true;
188
189 WATCHPOINT_ASSERT(ptr->state.is_corked == true);
190 #endif
191 }
192
193 static inline void memcached_io_cork_pop(memcached_server_st *ptr)
194 {
195 (void)ptr;
196 #ifdef CORK
197 if (ptr->root->flags.cork == false || ptr->state.is_corked == false)
198 return;
199
200 int enable= 0;
201 int err= setsockopt(ptr->fd, IPPROTO_TCP, CORK,
202 &enable, (socklen_t)sizeof(int));
203 if (! err)
204 ptr->state.is_corked= false;
205
206 WATCHPOINT_ASSERT(ptr->state.is_corked == false);
207 #endif
208 }
209
#if 0 // Dead code, this should be removed.
/*
 * Disabled at compile time. Even when enabled the function returns
 * immediately, so the read-ahead loop below never runs.
 */
void memcached_io_preread(memcached_st *ptr)
{
  return;

  for (unsigned int x= 0; x < memcached_server_count(ptr); x++)
  {
    if (! memcached_server_response_count(ptr, x))
      continue;

    if (! (ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER))
      continue;

    size_t data_read= read(ptr->hosts[x].fd,
                           ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
                           MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length);
    if (data_read == -1)
      continue;

    ptr->hosts[x].read_buffer_length+= data_read;
    ptr->hosts[x].read_data_length+= data_read;
  }
}
#endif
236
237 memcached_return_t memcached_io_read(memcached_server_instance_st *ptr,
238 void *buffer, size_t length, ssize_t *nread)
239 {
240 char *buffer_ptr;
241
242 buffer_ptr= buffer;
243
244 while (length)
245 {
246 if (!ptr->read_buffer_length)
247 {
248 ssize_t data_read;
249
250 while (1)
251 {
252 data_read= read(ptr->fd, ptr->read_buffer, MEMCACHED_MAX_BUFFER);
253 if (data_read > 0)
254 break;
255 else if (data_read == -1)
256 {
257 ptr->cached_errno= errno;
258 memcached_return_t rc= MEMCACHED_UNKNOWN_READ_FAILURE;
259 switch (errno)
260 {
261 case EAGAIN:
262 case EINTR:
263 #if TARGET_OS_LINUX
264 case ERESTART:
265 #endif
266 if ((rc= io_wait(ptr, MEM_READ)) == MEMCACHED_SUCCESS)
267 continue;
268 /* fall through */
269
270 default:
271 {
272 memcached_quit_server(ptr, 1);
273 *nread= -1;
274 return rc;
275 }
276 }
277 }
278 else
279 {
280 /*
281 EOF. Any data received so far is incomplete
282 so discard it. This always reads by byte in case of TCP
283 and protocol enforcement happens at memcached_response()
284 looking for '\n'. We do not care for UDB which requests 8 bytes
285 at once. Generally, this means that connection went away. Since
286 for blocking I/O we do not return 0 and for non-blocking case
287 it will return EGAIN if data is not immediatly available.
288 */
289 memcached_quit_server(ptr, 1);
290 *nread= -1;
291 return MEMCACHED_UNKNOWN_READ_FAILURE;
292 }
293 }
294
295 ptr->io_bytes_sent = 0;
296 ptr->read_data_length= (size_t) data_read;
297 ptr->read_buffer_length= (size_t) data_read;
298 ptr->read_ptr= ptr->read_buffer;
299 }
300
301 if (length > 1)
302 {
303 size_t difference;
304
305 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
306
307 memcpy(buffer_ptr, ptr->read_ptr, difference);
308 length -= difference;
309 ptr->read_ptr+= difference;
310 ptr->read_buffer_length-= difference;
311 buffer_ptr+= difference;
312 }
313 else
314 {
315 *buffer_ptr= *ptr->read_ptr;
316 ptr->read_ptr++;
317 ptr->read_buffer_length--;
318 buffer_ptr++;
319 break;
320 }
321 }
322
323 ptr->server_failure_counter= 0;
324 *nread = (ssize_t)(buffer_ptr - (char*)buffer);
325 return MEMCACHED_SUCCESS;
326 }
327
328 ssize_t memcached_io_write(memcached_server_instance_st *ptr,
329 const void *buffer, size_t length, bool with_flush)
330 {
331 size_t original_length;
332 const char* buffer_ptr;
333
334 WATCHPOINT_ASSERT(ptr->fd != -1);
335
336 original_length= length;
337 buffer_ptr= buffer;
338
339 /* more writable data is coming if a flush isn't required, so delay send */
340 if (! with_flush)
341 {
342 memcached_io_cork_push(ptr);
343 }
344
345 while (length)
346 {
347 char *write_ptr;
348 size_t should_write;
349 size_t buffer_end;
350
351 if (ptr->type == MEMCACHED_CONNECTION_UDP)
352 {
353 //UDP does not support partial writes
354 buffer_end= MAX_UDP_DATAGRAM_LENGTH;
355 should_write= length;
356 if (ptr->write_buffer_offset + should_write > buffer_end)
357 return -1;
358 }
359 else
360 {
361 buffer_end= MEMCACHED_MAX_BUFFER;
362 should_write= buffer_end - ptr->write_buffer_offset;
363 should_write= (should_write < length) ? should_write : length;
364 }
365
366 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
367 memcpy(write_ptr, buffer_ptr, should_write);
368 ptr->write_buffer_offset+= should_write;
369 buffer_ptr+= should_write;
370 length-= should_write;
371
372 if (ptr->write_buffer_offset == buffer_end && ptr->type != MEMCACHED_CONNECTION_UDP)
373 {
374 memcached_return_t rc;
375 ssize_t sent_length;
376
377 WATCHPOINT_ASSERT(ptr->fd != -1);
378 sent_length= io_flush(ptr, &rc);
379 if (sent_length == -1)
380 return -1;
381
382 /* If io_flush calls memcached_purge, sent_length may be 0 */
383 unlikely (sent_length != 0)
384 {
385 WATCHPOINT_ASSERT(sent_length == (ssize_t)buffer_end);
386 }
387 }
388 }
389
390 if (with_flush)
391 {
392 memcached_return_t rc;
393 WATCHPOINT_ASSERT(ptr->fd != -1);
394 if (io_flush(ptr, &rc) == -1)
395 {
396 return -1;
397 }
398
399 memcached_io_cork_pop(ptr);
400 }
401
402 return (ssize_t) original_length;
403 }
404
405 memcached_return_t memcached_io_close(memcached_server_instance_st *ptr)
406 {
407 if (ptr->fd == -1)
408 {
409 return MEMCACHED_SUCCESS;
410 }
411
412 /* in case of death shutdown to avoid blocking at close() */
413 if (shutdown(ptr->fd, SHUT_RDWR) == -1 && errno != ENOTCONN)
414 {
415 WATCHPOINT_NUMBER(ptr->fd);
416 WATCHPOINT_ERRNO(errno);
417 WATCHPOINT_ASSERT(errno);
418 }
419
420 if (close(ptr->fd) == -1)
421 {
422 WATCHPOINT_ERRNO(errno);
423 }
424
425 return MEMCACHED_SUCCESS;
426 }
427
428 memcached_server_instance_st *memcached_io_get_readable_server(memcached_st *memc)
429 {
430 #define MAX_SERVERS_TO_POLL 100
431 struct pollfd fds[MAX_SERVERS_TO_POLL];
432 unsigned int host_index= 0;
433
434 for (uint32_t x= 0;
435 x< memcached_server_count(memc) && host_index < MAX_SERVERS_TO_POLL;
436 ++x)
437 {
438 memcached_server_instance_st *instance=
439 memcached_server_instance_fetch(memc, x);
440
441 if (instance->read_buffer_length > 0) /* I have data in the buffer */
442 return instance;
443
444 if (memcached_server_response_count(instance) > 0)
445 {
446 fds[host_index].events = POLLIN;
447 fds[host_index].revents = 0;
448 fds[host_index].fd = instance->fd;
449 ++host_index;
450 }
451 }
452
453 if (host_index < 2)
454 {
455 /* We have 0 or 1 server with pending events.. */
456 for (uint32_t x= 0; x< memcached_server_count(memc); ++x)
457 {
458 memcached_server_instance_st *instance=
459 memcached_server_instance_fetch(memc, x);
460
461 if (memcached_server_response_count(instance) > 0)
462 {
463 return instance;
464 }
465 }
466
467 return NULL;
468 }
469
470 int err= poll(fds, host_index, memc->poll_timeout);
471 switch (err) {
472 case -1:
473 memc->cached_errno = errno;
474 /* FALLTHROUGH */
475 case 0:
476 break;
477 default:
478 for (size_t x= 0; x < host_index; ++x)
479 {
480 if (fds[x].revents & POLLIN)
481 {
482 for (uint32_t y= 0; y < memcached_server_count(memc); ++y)
483 {
484 memcached_server_instance_st *instance=
485 memcached_server_instance_fetch(memc, y);
486
487 if (instance->fd == fds[x].fd)
488 return instance;
489 }
490 }
491 }
492 }
493
494 return NULL;
495 }
496
497 static ssize_t io_flush(memcached_server_instance_st *ptr,
498 memcached_return_t *error)
499 {
500 /*
501 ** We might want to purge the input buffer if we haven't consumed
502 ** any output yet... The test for the limits is the purge is inline
503 ** in the purge function to avoid duplicating the logic..
504 */
505 {
506 memcached_return_t rc;
507 WATCHPOINT_ASSERT(ptr->fd != -1);
508 rc= memcached_purge(ptr);
509
510 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
511 return -1;
512 }
513 ssize_t sent_length;
514 size_t return_length;
515 char *local_write_ptr= ptr->write_buffer;
516 size_t write_length= ptr->write_buffer_offset;
517
518 *error= MEMCACHED_SUCCESS;
519
520 WATCHPOINT_ASSERT(ptr->fd != -1);
521
522 // UDP Sanity check, make sure that we are not sending somthing too big
523 if (ptr->type == MEMCACHED_CONNECTION_UDP && write_length > MAX_UDP_DATAGRAM_LENGTH)
524 return -1;
525
526 if (ptr->write_buffer_offset == 0 || (ptr->type == MEMCACHED_CONNECTION_UDP
527 && ptr->write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH))
528 return 0;
529
530 /* Looking for memory overflows */
531 #if defined(DEBUG)
532 if (write_length == MEMCACHED_MAX_BUFFER)
533 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
534 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
535 #endif
536
537 return_length= 0;
538 while (write_length)
539 {
540 WATCHPOINT_ASSERT(ptr->fd != -1);
541 WATCHPOINT_ASSERT(write_length > 0);
542 sent_length= 0;
543 if (ptr->type == MEMCACHED_CONNECTION_UDP)
544 increment_udp_message_id(ptr);
545 sent_length= write(ptr->fd, local_write_ptr, write_length);
546
547 if (sent_length == -1)
548 {
549 ptr->cached_errno= errno;
550 switch (errno)
551 {
552 case ENOBUFS:
553 continue;
554 case EAGAIN:
555 {
556 /*
557 * We may be blocked on write because the input buffer
558 * is full. Let's check if we have room in our input
559 * buffer for more data and retry the write before
560 * waiting..
561 */
562 if (repack_input_buffer(ptr) ||
563 process_input_buffer(ptr))
564 continue;
565
566 memcached_return_t rc;
567 rc= io_wait(ptr, MEM_WRITE);
568
569 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
570 continue;
571
572 memcached_quit_server(ptr, 1);
573 return -1;
574 }
575 default:
576 memcached_quit_server(ptr, 1);
577 *error= MEMCACHED_ERRNO;
578 return -1;
579 }
580 }
581
582 if (ptr->type == MEMCACHED_CONNECTION_UDP &&
583 (size_t)sent_length != write_length)
584 {
585 memcached_quit_server(ptr, 1);
586 return -1;
587 }
588
589 ptr->io_bytes_sent += (uint32_t) sent_length;
590
591 local_write_ptr+= sent_length;
592 write_length-= (uint32_t) sent_length;
593 return_length+= (uint32_t) sent_length;
594 }
595
596 WATCHPOINT_ASSERT(write_length == 0);
597 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
598 // ptr->write_buffer_offset);
599
600 // if we are a udp server, the begining of the buffer is reserverd for
601 // the upd frame header
602 if (ptr->type == MEMCACHED_CONNECTION_UDP)
603 ptr->write_buffer_offset= UDP_DATAGRAM_HEADER_LENGTH;
604 else
605 ptr->write_buffer_offset= 0;
606
607 return (ssize_t) return_length;
608 }
609
610 /*
611 Eventually we will just kill off the server with the problem.
612 */
613 void memcached_io_reset(memcached_server_instance_st *ptr)
614 {
615 memcached_quit_server(ptr, 1);
616 }
617
618 /**
619 * Read a given number of bytes from the server and place it into a specific
620 * buffer. Reset the IO channel on this server if an error occurs.
621 */
622 memcached_return_t memcached_safe_read(memcached_server_instance_st *ptr,
623 void *dta,
624 size_t size)
625 {
626 size_t offset= 0;
627 char *data= dta;
628
629 while (offset < size)
630 {
631 ssize_t nread;
632 memcached_return_t rc= memcached_io_read(ptr, data + offset, size - offset,
633 &nread);
634 if (rc != MEMCACHED_SUCCESS)
635 return rc;
636
637 offset+= (size_t) nread;
638 }
639
640 return MEMCACHED_SUCCESS;
641 }
642
643 memcached_return_t memcached_io_readline(memcached_server_instance_st *ptr,
644 char *buffer_ptr,
645 size_t size)
646 {
647 bool line_complete= false;
648 size_t total_nr= 0;
649
650 while (!line_complete)
651 {
652 if (ptr->read_buffer_length == 0)
653 {
654 /*
655 * We don't have any data in the buffer, so let's fill the read
656 * buffer. Call the standard read function to avoid duplicating
657 * the logic.
658 */
659 ssize_t nread;
660 memcached_return_t rc= memcached_io_read(ptr, buffer_ptr, 1, &nread);
661 if (rc != MEMCACHED_SUCCESS)
662 return rc;
663
664 if (*buffer_ptr == '\n')
665 line_complete= true;
666
667 ++buffer_ptr;
668 ++total_nr;
669 }
670
671 /* Now let's look in the buffer and copy as we go! */
672 while (ptr->read_buffer_length && total_nr < size && !line_complete)
673 {
674 *buffer_ptr = *ptr->read_ptr;
675 if (*buffer_ptr == '\n')
676 line_complete = true;
677 --ptr->read_buffer_length;
678 ++ptr->read_ptr;
679 ++total_nr;
680 ++buffer_ptr;
681 }
682
683 if (total_nr == size)
684 return MEMCACHED_PROTOCOL_ERROR;
685 }
686
687 return MEMCACHED_SUCCESS;
688 }
689
690 /*
691 * The udp request id consists of two seperate sections
692 * 1) The thread id
693 * 2) The message number
694 * The thread id should only be set when the memcached_st struct is created
695 * and should not be changed.
696 *
697 * The message num is incremented for each new message we send, this function
698 * extracts the message number from message_id, increments it and then
699 * writes the new value back into the header
700 */
701 static void increment_udp_message_id(memcached_server_instance_st *ptr)
702 {
703 struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
704 uint16_t cur_req= get_udp_datagram_request_id(header);
705 int msg_num= get_msg_num_from_request_id(cur_req);
706 int thread_id= get_thread_id_from_request_id(cur_req);
707
708 if (((++msg_num) & UDP_REQUEST_ID_THREAD_MASK) != 0)
709 msg_num= 0;
710
711 header->request_id= htons((uint16_t) (thread_id | msg_num));
712 }
713
714 memcached_return_t memcached_io_init_udp_header(memcached_server_instance_st *ptr, uint16_t thread_id)
715 {
716 if (thread_id > UDP_REQUEST_ID_MAX_THREAD_ID)
717 return MEMCACHED_FAILURE;
718
719 struct udp_datagram_header_st *header= (struct udp_datagram_header_st *)ptr->write_buffer;
720 header->request_id= htons((uint16_t) (generate_udp_request_thread_id(thread_id)));
721 header->num_datagrams= htons(1);
722 header->sequence_number= htons(0);
723
724 return MEMCACHED_SUCCESS;
725 }