Fix for most errors around non-block
[awesomized/libmemcached] / libmemcached / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 typedef enum {
11 MEM_READ,
12 MEM_WRITE,
13 } memc_read_or_write;
14
15 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
16
17 static memcached_return io_wait(memcached_server_st *ptr,
18 memc_read_or_write read_or_write)
19 {
20 struct pollfd fds[1];
21 short flags= 0;
22 int error;
23
24 if (read_or_write == MEM_WRITE) /* write */
25 flags= POLLOUT | POLLERR;
26 else
27 flags= POLLIN | POLLERR;
28
29 memset(&fds, 0, sizeof(struct pollfd));
30 fds[0].fd= ptr->fd;
31 fds[0].events= flags;
32
33 /*
34 ** We are going to block on write, but at least on Solaris we might block
35 ** on write if we haven't read anything from our input buffer..
36 ** Try to purge the input buffer if we don't do any flow control in the
37 ** application layer (just sending a lot of data etc)
38 ** The test is moved down in the purge function to avoid duplication of
39 ** the test.
40 */
41 if (read_or_write == MEM_WRITE)
42 memcached_purge(ptr);
43
44 error= poll(fds, 1, ptr->root->poll_timeout);
45
46 if (error == 1)
47 return MEMCACHED_SUCCESS;
48 else if (error == 0)
49 {
50 return MEMCACHED_TIMEOUT;
51 }
52
53 /* Imposssible for anything other then -1 */
54 WATCHPOINT_ASSERT(error == -1);
55 memcached_quit_server(ptr, 1);
56
57 return MEMCACHED_FAILURE;
58
59 }
60
61 #ifdef UNUSED
62 void memcached_io_preread(memcached_st *ptr)
63 {
64 unsigned int x;
65
66 return;
67
68 for (x= 0; x < ptr->number_of_hosts; x++)
69 {
70 if (memcached_server_response_count(ptr, x) &&
71 ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
72 {
73 size_t data_read;
74
75 data_read= read(ptr->hosts[x].fd,
76 ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
77 MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length);
78 if (data_read == -1)
79 continue;
80
81 ptr->hosts[x].read_buffer_length+= data_read;
82 ptr->hosts[x].read_data_length+= data_read;
83 }
84 }
85 }
86 #endif
87
88 ssize_t memcached_io_read(memcached_server_st *ptr,
89 void *buffer, size_t length)
90 {
91 char *buffer_ptr;
92
93 buffer_ptr= buffer;
94
95 while (length)
96 {
97 uint8_t found_eof= 0;
98 if (!ptr->read_buffer_length)
99 {
100 ssize_t data_read;
101
102 while (1)
103 {
104 data_read= read(ptr->fd,
105 ptr->read_buffer,
106 MEMCACHED_MAX_BUFFER);
107 if (data_read > 0)
108 break;
109 else if (data_read == -1)
110 {
111 ptr->cached_errno= errno;
112 switch (errno)
113 {
114 case EAGAIN:
115 case EINTR:
116 {
117 memcached_return rc;
118
119 rc= io_wait(ptr, MEM_READ);
120
121 if (rc == MEMCACHED_SUCCESS)
122 continue;
123 }
124 /* fall trough */
125 default:
126 {
127 memcached_quit_server(ptr, 1);
128 return -1;
129 }
130 }
131 }
132 else
133 {
134 found_eof= 1;
135 break;
136 }
137 }
138
139 ptr->io_bytes_sent = 0;
140 ptr->read_data_length= data_read;
141 ptr->read_buffer_length= data_read;
142 ptr->read_ptr= ptr->read_buffer;
143 }
144
145 if (length > 1)
146 {
147 size_t difference;
148
149 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
150
151 memcpy(buffer_ptr, ptr->read_ptr, difference);
152 length -= difference;
153 ptr->read_ptr+= difference;
154 ptr->read_buffer_length-= difference;
155 buffer_ptr+= difference;
156 }
157 else
158 {
159 *buffer_ptr= *ptr->read_ptr;
160 ptr->read_ptr++;
161 ptr->read_buffer_length--;
162 buffer_ptr++;
163 break;
164 }
165
166 if (found_eof)
167 break;
168 }
169
170 return (size_t)(buffer_ptr - (char*)buffer);
171 }
172
173 ssize_t memcached_io_write(memcached_server_st *ptr,
174 const void *buffer, size_t length, char with_flush)
175 {
176 size_t original_length;
177 const char* buffer_ptr;
178
179 original_length= length;
180 buffer_ptr= buffer;
181
182 while (length)
183 {
184 char *write_ptr;
185 size_t should_write;
186
187 should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
188 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
189
190 should_write= (should_write < length) ? should_write : length;
191
192 memcpy(write_ptr, buffer_ptr, should_write);
193 ptr->write_buffer_offset+= should_write;
194 buffer_ptr+= should_write;
195 length-= should_write;
196
197 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
198 {
199 memcached_return rc;
200 ssize_t sent_length;
201
202 sent_length= io_flush(ptr, &rc);
203 if (sent_length == -1)
204 return -1;
205
206 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
207 }
208 }
209
210 if (with_flush)
211 {
212 memcached_return rc;
213 if (io_flush(ptr, &rc) == -1)
214 return -1;
215 }
216
217 return original_length;
218 }
219
220 memcached_return memcached_io_close(memcached_server_st *ptr)
221 {
222 int r;
223
224 if (ptr->fd == -1)
225 return MEMCACHED_SUCCESS;
226
227 /* in case of death shutdown to avoid blocking at close() */
228 if (1)
229 {
230 r= shutdown(ptr->fd, SHUT_RDWR);
231
232 #ifdef HAVE_DEBUG
233 if (r && errno != ENOTCONN)
234 {
235 WATCHPOINT_NUMBER(ptr->fd);
236 WATCHPOINT_ERRNO(errno);
237 WATCHPOINT_ASSERT(errno);
238 }
239 #endif
240 }
241
242 r= close(ptr->fd);
243 #ifdef HAVE_DEBUG
244 if (r != 0)
245 WATCHPOINT_ERRNO(errno);
246 #endif
247
248 return MEMCACHED_SUCCESS;
249 }
250
251 static ssize_t io_flush(memcached_server_st *ptr,
252 memcached_return *error)
253 {
254 ssize_t sent_length;
255 size_t return_length;
256 char *local_write_ptr= ptr->write_buffer;
257 size_t write_length= ptr->write_buffer_offset;
258
259 *error= MEMCACHED_SUCCESS;
260
261 if (ptr->write_buffer_offset == 0)
262 return 0;
263
264 /* Looking for memory overflows */
265 #if defined(HAVE_DEBUG)
266 if (write_length == MEMCACHED_MAX_BUFFER)
267 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
268 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
269 #endif
270
271 return_length= 0;
272 while (write_length)
273 {
274 WATCHPOINT_ASSERT(write_length > 0);
275 sent_length= 0;
276 if (ptr->type == MEMCACHED_CONNECTION_UDP)
277 {
278 struct addrinfo *ai;
279
280 ai= ptr->address_info;
281
282 /* Crappy test code */
283 char buffer[HUGE_STRING_LEN + 8];
284 memset(buffer, 0, HUGE_STRING_LEN + 8);
285 memcpy (buffer+8, local_write_ptr, write_length);
286 buffer[0]= 0;
287 buffer[1]= 0;
288 buffer[2]= 0;
289 buffer[3]= 0;
290 buffer[4]= 0;
291 buffer[5]= 1;
292 buffer[6]= 0;
293 buffer[7]= 0;
294 sent_length= sendto(ptr->fd, buffer, write_length + 8, 0,
295 (struct sockaddr *)ai->ai_addr,
296 ai->ai_addrlen);
297 if (sent_length == -1)
298 {
299 WATCHPOINT_ERRNO(errno);
300 WATCHPOINT_ASSERT(0);
301 }
302 sent_length-= 8; /* We remove the header */
303 }
304 else
305 {
306 /*
307 ** We might want to purge the input buffer if we haven't consumed
308 ** any output yet... The test for the limits is the purge is inline
309 ** in the purge function to avoid duplicating the logic..
310 */
311 memcached_purge(ptr);
312
313 if ((sent_length= write(ptr->fd, local_write_ptr,
314 write_length)) == -1)
315 {
316 switch (errno)
317 {
318 case ENOBUFS:
319 continue;
320 case EAGAIN:
321 {
322 memcached_return rc;
323 rc= io_wait(ptr, MEM_WRITE);
324
325 if (rc == MEMCACHED_SUCCESS)
326 continue;
327
328 memcached_quit_server(ptr, 1);
329 return -1;
330 }
331 default:
332 memcached_quit_server(ptr, 1);
333 ptr->cached_errno= errno;
334 *error= MEMCACHED_ERRNO;
335 return -1;
336 }
337 }
338 }
339
340 ptr->io_bytes_sent += sent_length;
341
342 local_write_ptr+= sent_length;
343 write_length-= sent_length;
344 return_length+= sent_length;
345 }
346
347 WATCHPOINT_ASSERT(write_length == 0);
348 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
349 // ptr->write_buffer_offset);
350 ptr->write_buffer_offset= 0;
351
352 return return_length;
353 }
354
355 /*
356 Eventually we will just kill off the server with the problem.
357 */
358 void memcached_io_reset(memcached_server_st *ptr)
359 {
360 memcached_quit_server(ptr, 0);
361 }