fix io_read bug
[awesomized/libmemcached] / libmemcached / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 typedef enum {
11 MEM_READ,
12 MEM_WRITE,
13 } memc_read_or_write;
14
15 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
16
17 static memcached_return io_wait(memcached_server_st *ptr,
18 memc_read_or_write read_or_write)
19 {
20 struct pollfd fds[1];
21 short flags= 0;
22 int error;
23
24 if (read_or_write == MEM_WRITE) /* write */
25 flags= POLLOUT | POLLERR;
26 else
27 flags= POLLIN | POLLERR;
28
29 memset(&fds, 0, sizeof(struct pollfd));
30 fds[0].fd= ptr->fd;
31 fds[0].events= flags;
32
33 /*
34 ** We are going to block on write, but at least on Solaris we might block
35 ** on write if we haven't read anything from our input buffer..
36 ** Try to purge the input buffer if we don't do any flow control in the
37 ** application layer (just sending a lot of data etc)
38 ** The test is moved down in the purge function to avoid duplication of
39 ** the test.
40 */
41 if (read_or_write == MEM_WRITE)
42 memcached_purge(ptr);
43
44 error= poll(fds, 1, ptr->root->poll_timeout);
45
46 if (error == 1)
47 return MEMCACHED_SUCCESS;
48 else if (error == 0)
49 {
50 return MEMCACHED_TIMEOUT;
51 }
52
53 /* Imposssible for anything other then -1 */
54 WATCHPOINT_ASSERT(error == -1);
55 memcached_quit_server(ptr, 1);
56
57 return MEMCACHED_FAILURE;
58
59 }
60
61 #ifdef UNUSED
62 void memcached_io_preread(memcached_st *ptr)
63 {
64 unsigned int x;
65
66 return;
67
68 for (x= 0; x < ptr->number_of_hosts; x++)
69 {
70 if (memcached_server_response_count(ptr, x) &&
71 ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
72 {
73 size_t data_read;
74
75 data_read= read(ptr->hosts[x].fd,
76 ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
77 MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length);
78 if (data_read == -1)
79 continue;
80
81 ptr->hosts[x].read_buffer_length+= data_read;
82 ptr->hosts[x].read_data_length+= data_read;
83 }
84 }
85 }
86 #endif
87
88 ssize_t memcached_io_read(memcached_server_st *ptr,
89 void *buffer, size_t length)
90 {
91 char *buffer_ptr;
92
93 buffer_ptr= buffer;
94
95 while (length)
96 {
97 uint8_t found_eof= 0;
98 if (!ptr->read_buffer_length)
99 {
100 ssize_t data_read;
101
102 while (1)
103 {
104 data_read= read(ptr->fd,
105 ptr->read_buffer,
106 MEMCACHED_MAX_BUFFER);
107 if (data_read > 0)
108 break;
109 else if (data_read == -1)
110 {
111 ptr->cached_errno= errno;
112 switch (errno)
113 {
114 case EAGAIN:
115 case EINTR:
116 {
117 memcached_return rc;
118
119 rc= io_wait(ptr, MEM_READ);
120
121 if (rc == MEMCACHED_SUCCESS)
122 continue;
123 }
124 /* fall trough */
125 default:
126 {
127 memcached_quit_server(ptr, 1);
128 return -1;
129 }
130 }
131 }
132 else
133 {
134 found_eof= 1;
135 break;
136 }
137 }
138
139 ptr->io_bytes_sent = 0;
140 ptr->read_data_length= data_read;
141 ptr->read_buffer_length= data_read;
142 ptr->read_ptr= ptr->read_buffer;
143 }
144
145 if (length > 1)
146 {
147 size_t difference;
148
149 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
150
151 memcpy(buffer_ptr, ptr->read_ptr, difference);
152 length -= difference;
153 ptr->read_ptr+= difference;
154 ptr->read_buffer_length-= difference;
155 buffer_ptr+= difference;
156 }
157 else
158 {
159 *buffer_ptr= *ptr->read_ptr;
160 ptr->read_ptr++;
161 ptr->read_buffer_length--;
162 buffer_ptr++;
163 break;
164 }
165
166 if (found_eof)
167 break;
168 }
169
170 return (size_t)(buffer_ptr - (char*)buffer);
171 }
172
173 ssize_t memcached_io_write(memcached_server_st *ptr,
174 const void *buffer, size_t length, char with_flush)
175 {
176 size_t original_length;
177 const char* buffer_ptr;
178
179 original_length= length;
180 buffer_ptr= buffer;
181
182 while (length)
183 {
184 char *write_ptr;
185 size_t should_write;
186
187 should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
188 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
189
190 should_write= (should_write < length) ? should_write : length;
191
192 memcpy(write_ptr, buffer_ptr, should_write);
193 ptr->write_buffer_offset+= should_write;
194 buffer_ptr+= should_write;
195 length-= should_write;
196
197 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
198 {
199 memcached_return rc;
200 ssize_t sent_length;
201
202 sent_length= io_flush(ptr, &rc);
203 if (sent_length == -1)
204 return -1;
205
206 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
207 }
208 }
209
210 if (with_flush)
211 {
212 memcached_return rc;
213 if (io_flush(ptr, &rc) == -1)
214 return -1;
215 }
216
217 return original_length;
218 }
219
220 memcached_return memcached_io_close(memcached_server_st *ptr)
221 {
222 int r;
223 /* in case of death shutdown to avoid blocking at close() */
224
225 r= shutdown(ptr->fd, SHUT_RDWR);
226
227 #ifdef HAVE_DEBUG
228 if (r && errno != ENOTCONN)
229 {
230 WATCHPOINT_ERRNO(errno);
231 WATCHPOINT_ASSERT(errno);
232 }
233 #endif
234
235 r= close(ptr->fd);
236 #ifdef HAVE_DEBUG
237 if (r != 0)
238 WATCHPOINT_ERRNO(errno);
239 #endif
240
241 return MEMCACHED_SUCCESS;
242 }
243
244 static ssize_t io_flush(memcached_server_st *ptr,
245 memcached_return *error)
246 {
247 ssize_t sent_length;
248 size_t return_length;
249 char *local_write_ptr= ptr->write_buffer;
250 size_t write_length= ptr->write_buffer_offset;
251
252 *error= MEMCACHED_SUCCESS;
253
254 if (ptr->write_buffer_offset == 0)
255 return 0;
256
257 /* Looking for memory overflows */
258 #if defined(HAVE_DEBUG)
259 if (write_length == MEMCACHED_MAX_BUFFER)
260 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
261 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
262 #endif
263
264 return_length= 0;
265 while (write_length)
266 {
267 WATCHPOINT_ASSERT(write_length > 0);
268 sent_length= 0;
269 if (ptr->type == MEMCACHED_CONNECTION_UDP)
270 {
271 struct addrinfo *ai;
272
273 ai= ptr->address_info;
274
275 /* Crappy test code */
276 char buffer[HUGE_STRING_LEN + 8];
277 memset(buffer, 0, HUGE_STRING_LEN + 8);
278 memcpy (buffer+8, local_write_ptr, write_length);
279 buffer[0]= 0;
280 buffer[1]= 0;
281 buffer[2]= 0;
282 buffer[3]= 0;
283 buffer[4]= 0;
284 buffer[5]= 1;
285 buffer[6]= 0;
286 buffer[7]= 0;
287 sent_length= sendto(ptr->fd, buffer, write_length + 8, 0,
288 (struct sockaddr *)ai->ai_addr,
289 ai->ai_addrlen);
290 if (sent_length == -1)
291 {
292 WATCHPOINT_ERRNO(errno);
293 WATCHPOINT_ASSERT(0);
294 }
295 sent_length-= 8; /* We remove the header */
296 }
297 else
298 {
299 /*
300 ** We might want to purge the input buffer if we haven't consumed
301 ** any output yet... The test for the limits is the purge is inline
302 ** in the purge function to avoid duplicating the logic..
303 */
304 memcached_purge(ptr);
305
306 if ((sent_length= write(ptr->fd, local_write_ptr,
307 write_length)) == -1)
308 {
309 switch (errno)
310 {
311 case ENOBUFS:
312 continue;
313 case EAGAIN:
314 {
315 memcached_return rc;
316 rc= io_wait(ptr, MEM_WRITE);
317
318 if (rc == MEMCACHED_SUCCESS)
319 continue;
320
321 memcached_quit_server(ptr, 1);
322 return -1;
323 }
324 default:
325 memcached_quit_server(ptr, 1);
326 ptr->cached_errno= errno;
327 *error= MEMCACHED_ERRNO;
328 return -1;
329 }
330 }
331 }
332
333 ptr->io_bytes_sent += sent_length;
334
335 local_write_ptr+= sent_length;
336 write_length-= sent_length;
337 return_length+= sent_length;
338 }
339
340 WATCHPOINT_ASSERT(write_length == 0);
341 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
342 // ptr->write_buffer_offset);
343 ptr->write_buffer_offset= 0;
344
345 return return_length;
346 }
347
348 /*
349 Eventually we will just kill off the server with the problem.
350 */
351 void memcached_io_reset(memcached_server_st *ptr)
352 {
353 memcached_quit_server(ptr, 0);
354 }