Patch from evn
[awesomized/libmemcached] / libmemcached / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
/* Direction of the socket operation a caller is about to wait for. */
typedef enum
{
  MEM_READ= 0,  /* waiting for the socket to become readable */
  MEM_WRITE= 1  /* waiting for the socket to become writable */
} memc_read_or_write;
14
15 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
16
17 static memcached_return io_wait(memcached_server_st *ptr,
18 memc_read_or_write read_or_write)
19 {
20 struct pollfd fds[1];
21 short flags= 0;
22 int error;
23
24 if (read_or_write == MEM_WRITE) /* write */
25 flags= POLLOUT | POLLERR;
26 else
27 flags= POLLIN | POLLERR;
28
29 memset(&fds, 0, sizeof(struct pollfd));
30 fds[0].fd= ptr->fd;
31 fds[0].events= flags;
32
33 #ifdef NOT_DONE
34 /*
35 ** We are going to block on write, but at least on Solaris we might block
36 ** on write if we haven't read anything from our input buffer..
37 ** Try to purge the input buffer if we don't do any flow control in the
38 ** application layer (just sending a lot of data etc)
39 ** The test is moved down in the purge function to avoid duplication of
40 ** the test.
41 */
42 if (read_or_write == MEM_WRITE)
43 {
44 if (memcached_purge(ptr) != MEMCACHED_SUCCESS || memcached_purge(ptr) != MEMCACHED_STORED)
45 return MEMCACHED_FAILURE;
46 }
47 #endif
48
49 error= poll(fds, 1, ptr->root->poll_timeout);
50
51 if (error == 1)
52 return MEMCACHED_SUCCESS;
53 else if (error == 0)
54 {
55 return MEMCACHED_TIMEOUT;
56 }
57
58 /* Imposssible for anything other then -1 */
59 WATCHPOINT_ASSERT(error == -1);
60 memcached_quit_server(ptr, 1);
61
62 return MEMCACHED_FAILURE;
63
64 }
65
#ifdef UNUSED
/*
  Opportunistically read pending data from every host that still has
  outstanding responses and whose read_data_length is below
  MEMCACHED_MAX_BUFFER.

  The function is currently disabled: it returns immediately and the loop
  below never runs. It is also only compiled when UNUSED is defined.
*/
void memcached_io_preread(memcached_st *ptr)
{
  unsigned int x;

  /* Pre-reading is switched off; everything after this is unreachable. */
  return;

  for (x= 0; x < ptr->number_of_hosts; x++)
  {
    if (memcached_server_response_count(ptr, x) &&
        ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
    {
      /* read() returns ssize_t; keep it signed so the -1 test is exact. */
      ssize_t data_read;

      /* NOTE(review): the destination is read_ptr + read_data_length while
         the size is computed from MEMCACHED_MAX_BUFFER; confirm read_ptr is
         at the start of the buffer before re-enabling this code. */
      data_read= read(ptr->hosts[x].fd,
                      ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
                      MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length);
      if (data_read == -1)
        continue;

      ptr->hosts[x].read_buffer_length+= data_read;
      ptr->hosts[x].read_data_length+= data_read;
    }
  }
}
#endif
92
93 ssize_t memcached_io_read(memcached_server_st *ptr,
94 void *buffer, size_t length)
95 {
96 char *buffer_ptr;
97
98 buffer_ptr= buffer;
99
100 while (length)
101 {
102 uint8_t found_eof= 0;
103 if (!ptr->read_buffer_length)
104 {
105 ssize_t data_read;
106
107 while (1)
108 {
109 data_read= read(ptr->fd,
110 ptr->read_buffer,
111 MEMCACHED_MAX_BUFFER);
112 if (data_read > 0)
113 break;
114 else if (data_read == -1)
115 {
116 ptr->cached_errno= errno;
117 switch (errno)
118 {
119 case EAGAIN:
120 case EINTR:
121 {
122 memcached_return rc;
123
124 rc= io_wait(ptr, MEM_READ);
125
126 if (rc == MEMCACHED_SUCCESS)
127 continue;
128 }
129 /* fall trough */
130 default:
131 {
132 memcached_quit_server(ptr, 1);
133 return -1;
134 }
135 }
136 }
137 else
138 {
139 found_eof= 1;
140 break;
141 }
142 }
143
144 ptr->io_bytes_sent = 0;
145 ptr->read_data_length= data_read;
146 ptr->read_buffer_length= data_read;
147 ptr->read_ptr= ptr->read_buffer;
148 }
149
150 if (length > 1)
151 {
152 size_t difference;
153
154 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
155
156 memcpy(buffer_ptr, ptr->read_ptr, difference);
157 length -= difference;
158 ptr->read_ptr+= difference;
159 ptr->read_buffer_length-= difference;
160 buffer_ptr+= difference;
161 }
162 else
163 {
164 *buffer_ptr= *ptr->read_ptr;
165 ptr->read_ptr++;
166 ptr->read_buffer_length--;
167 buffer_ptr++;
168 break;
169 }
170
171 if (found_eof)
172 break;
173 }
174
175 return (size_t)(buffer_ptr - (char*)buffer);
176 }
177
178 ssize_t memcached_io_write(memcached_server_st *ptr,
179 const void *buffer, size_t length, char with_flush)
180 {
181 size_t original_length;
182 const char* buffer_ptr;
183
184 WATCHPOINT_ASSERT(ptr->fd != -1);
185
186 original_length= length;
187 buffer_ptr= buffer;
188
189 while (length)
190 {
191 char *write_ptr;
192 size_t should_write;
193
194 should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
195 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
196
197 should_write= (should_write < length) ? should_write : length;
198
199 memcpy(write_ptr, buffer_ptr, should_write);
200 ptr->write_buffer_offset+= should_write;
201 buffer_ptr+= should_write;
202 length-= should_write;
203
204 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
205 {
206 memcached_return rc;
207 ssize_t sent_length;
208
209 WATCHPOINT_ASSERT(ptr->fd != -1);
210 sent_length= io_flush(ptr, &rc);
211 if (sent_length == -1)
212 return -1;
213
214 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
215 }
216 }
217
218 if (with_flush)
219 {
220 memcached_return rc;
221 WATCHPOINT_ASSERT(ptr->fd != -1);
222 if (io_flush(ptr, &rc) == -1)
223 return -1;
224 }
225
226 return original_length;
227 }
228
229 memcached_return memcached_io_close(memcached_server_st *ptr)
230 {
231 int r;
232
233 if (ptr->fd == -1)
234 return MEMCACHED_SUCCESS;
235
236 /* in case of death shutdown to avoid blocking at close() */
237 if (1)
238 {
239 r= shutdown(ptr->fd, SHUT_RDWR);
240
241 #ifdef HAVE_DEBUG
242 if (r && errno != ENOTCONN)
243 {
244 WATCHPOINT_NUMBER(ptr->fd);
245 WATCHPOINT_ERRNO(errno);
246 WATCHPOINT_ASSERT(errno);
247 }
248 #endif
249 }
250
251 r= close(ptr->fd);
252 #ifdef HAVE_DEBUG
253 if (r != 0)
254 WATCHPOINT_ERRNO(errno);
255 #endif
256
257 return MEMCACHED_SUCCESS;
258 }
259
260 static ssize_t io_flush(memcached_server_st *ptr,
261 memcached_return *error)
262 {
263 ssize_t sent_length;
264 size_t return_length;
265 char *local_write_ptr= ptr->write_buffer;
266 size_t write_length= ptr->write_buffer_offset;
267
268 *error= MEMCACHED_SUCCESS;
269
270 WATCHPOINT_ASSERT(ptr->fd != -1);
271
272 if (ptr->write_buffer_offset == 0)
273 return 0;
274
275 /* Looking for memory overflows */
276 #if defined(HAVE_DEBUG)
277 if (write_length == MEMCACHED_MAX_BUFFER)
278 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
279 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
280 #endif
281
282 return_length= 0;
283 while (write_length)
284 {
285 WATCHPOINT_ASSERT(ptr->fd != -1);
286 WATCHPOINT_ASSERT(write_length > 0);
287 sent_length= 0;
288 if (ptr->type == MEMCACHED_CONNECTION_UDP)
289 {
290 struct addrinfo *ai;
291
292 ai= ptr->address_info;
293
294 /* Crappy test code */
295 char buffer[HUGE_STRING_LEN + 8];
296 memset(buffer, 0, HUGE_STRING_LEN + 8);
297 memcpy (buffer+8, local_write_ptr, write_length);
298 buffer[0]= 0;
299 buffer[1]= 0;
300 buffer[2]= 0;
301 buffer[3]= 0;
302 buffer[4]= 0;
303 buffer[5]= 1;
304 buffer[6]= 0;
305 buffer[7]= 0;
306 sent_length= sendto(ptr->fd, buffer, write_length + 8, 0,
307 (struct sockaddr *)ai->ai_addr,
308 ai->ai_addrlen);
309 if (sent_length == -1)
310 {
311 WATCHPOINT_ERRNO(errno);
312 WATCHPOINT_ASSERT(0);
313 }
314 sent_length-= 8; /* We remove the header */
315 }
316 else
317 {
318 #ifdef NOT_DONE
319 /*
320 ** We might want to purge the input buffer if we haven't consumed
321 ** any output yet... The test for the limits is the purge is inline
322 ** in the purge function to avoid duplicating the logic..
323 */
324 {
325 memcached_return rc;
326 WATCHPOINT_ASSERT(ptr->fd != -1);
327 rc= memcached_purge(ptr);
328
329 if (rc != MEMCACHED_SUCCESS || rc != MEMCACHED_STORED)
330 return -1;
331 }
332 #endif
333
334 WATCHPOINT_ASSERT(ptr->fd != -1);
335 if ((sent_length= write(ptr->fd, local_write_ptr,
336 write_length)) == -1)
337 {
338 switch (errno)
339 {
340 case ENOBUFS:
341 continue;
342 case EAGAIN:
343 {
344 memcached_return rc;
345 rc= io_wait(ptr, MEM_WRITE);
346
347 if (rc == MEMCACHED_SUCCESS)
348 continue;
349
350 memcached_quit_server(ptr, 1);
351 return -1;
352 }
353 default:
354 memcached_quit_server(ptr, 1);
355 ptr->cached_errno= errno;
356 *error= MEMCACHED_ERRNO;
357 return -1;
358 }
359 }
360 }
361
362 ptr->io_bytes_sent += sent_length;
363
364 local_write_ptr+= sent_length;
365 write_length-= sent_length;
366 return_length+= sent_length;
367 }
368
369 WATCHPOINT_ASSERT(write_length == 0);
370 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
371 // ptr->write_buffer_offset);
372 ptr->write_buffer_offset= 0;
373
374 return return_length;
375 }
376
377 /*
378 Eventually we will just kill off the server with the problem.
379 */
380 void memcached_io_reset(memcached_server_st *ptr)
381 {
382 memcached_quit_server(ptr, 0);
383 }