IPv6 support, plus cleanup around consistent hashing.
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write)
11 {
12 struct pollfd fds[1];
13 short flags= 0;
14 struct timespec timer;
15
16 timer.tv_sec= 1;
17 timer.tv_nsec= 0;
18 if (read_or_write)
19 flags= POLLOUT | POLLERR;
20 else
21 flags= POLLIN | POLLERR;
22
23 memset(&fds, 0, sizeof(struct pollfd));
24 fds[0].fd= ptr->hosts[server_key].fd;
25 fds[0].events= flags;
26
27 if (poll(fds, 1, ptr->poll_timeout) < 0)
28 return MEMCACHED_FAILURE;
29
30 return MEMCACHED_SUCCESS;
31 }
32
33 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
34 char *buffer, size_t length)
35 {
36 char *buffer_ptr;
37
38 buffer_ptr= buffer;
39
40 while (length)
41 {
42 if (!ptr->hosts[server_key].read_buffer_length)
43 {
44 size_t data_read;
45
46 while (1)
47 {
48 if (ptr->flags & MEM_NO_BLOCK)
49 {
50 memcached_return rc;
51
52 rc= io_wait(ptr, server_key, 0);
53 if (rc != MEMCACHED_SUCCESS)
54 return -1;
55 }
56
57 data_read= read(ptr->hosts[server_key].fd,
58 ptr->hosts[server_key].read_buffer,
59 MEMCACHED_MAX_BUFFER);
60 if (data_read == -1)
61 {
62 switch (errno)
63 {
64 case EAGAIN:
65 break;
66 default:
67 {
68 ptr->cached_errno= errno;
69 return -1;
70 }
71 }
72 }
73 else if (data_read)
74 break;
75 /* If zero, just keep looping */
76 }
77
78 ptr->hosts[server_key].read_buffer_length= data_read;
79 ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
80 }
81
82 *buffer_ptr= *ptr->hosts[server_key].read_ptr;
83 length--;
84 ptr->hosts[server_key].read_ptr++;
85 ptr->hosts[server_key].read_buffer_length--;
86 buffer_ptr++;
87 }
88
89 return (size_t)(buffer_ptr - buffer);
90 }
91
92 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
93 char *buffer, size_t length, char with_flush)
94 {
95 unsigned long long x;
96
97 for (x= 0; x < length; x++)
98 {
99 if (ptr->hosts[server_key].write_ptr == 0)
100 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
101 WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr);
102 *ptr->hosts[server_key].write_ptr= buffer[x];
103 ptr->hosts[server_key].write_ptr++;
104 ptr->hosts[server_key].write_buffer_offset++;
105
106 if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER)
107 {
108 size_t sent_length;
109
110 sent_length= memcached_io_flush(ptr, server_key);
111
112 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
113 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
114 ptr->hosts[server_key].write_buffer_offset= 0;
115 }
116 }
117
118 if (with_flush)
119 {
120 if (memcached_io_flush(ptr, server_key) == -1)
121 return -1;
122 }
123
124 return length;
125 }
126
127 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
128 {
129 size_t sent_length;
130 size_t return_length;
131 char *write_ptr= ptr->hosts[server_key].write_buffer;
132 size_t write_length= ptr->hosts[server_key].write_buffer_offset;
133 unsigned int loop= 1;
134
135 if (ptr->hosts[server_key].write_buffer_offset == 0)
136 return 0;
137
138 return_length= 0;
139 while (write_length)
140 {
141 if (ptr->flags & MEM_NO_BLOCK)
142 {
143 memcached_return rc;
144
145 rc= io_wait(ptr, server_key, 1);
146 if (rc != MEMCACHED_SUCCESS)
147 return -1;
148 }
149
150 sent_length= 0;
151 if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP)
152 {
153 sent_length= sendto(ptr->hosts[server_key].fd,
154 write_ptr, write_length, 0,
155 (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr,
156 sizeof(struct sockaddr));
157 }
158 else
159 {
160 if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
161 write_length)) == -1)
162 {
163 switch (errno)
164 {
165 case ENOBUFS:
166 case EAGAIN:
167 WATCHPOINT;
168 continue;
169 if (loop < 100)
170 {
171 loop++;
172 break;
173 }
174 /* Yes, we want to fall through */
175 default:
176 ptr->cached_errno= errno;
177 return -1;
178 }
179 }
180 }
181
182 write_ptr+= sent_length;
183 write_length-= sent_length;
184 return_length+= sent_length;
185 }
186
187 WATCHPOINT_ASSERT(write_length == 0);
188 WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset);
189 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
190 ptr->hosts[server_key].write_buffer_offset= 0;
191
192 return return_length;
193 }
194
195 /*
196 Eventually we will just kill off the server with the problem.
197 */
198 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
199 {
200 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
201 ptr->hosts[server_key].write_buffer_offset= 0;
202 memcached_quit(ptr);
203 }