Removing purge support for the time being.
[m6w6/libmemcached] / libmemcached / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
/* Direction that io_wait() should poll the socket for. */
typedef enum {
  MEM_READ= 0,  /* wait until the descriptor has data to read */
  MEM_WRITE= 1  /* wait until the descriptor accepts more output */
} memc_read_or_write;
14
15 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
16
17 static memcached_return io_wait(memcached_server_st *ptr,
18 memc_read_or_write read_or_write)
19 {
20 struct pollfd fds[1];
21 short flags= 0;
22 int error;
23
24 if (read_or_write == MEM_WRITE) /* write */
25 flags= POLLOUT | POLLERR;
26 else
27 flags= POLLIN | POLLERR;
28
29 memset(&fds, 0, sizeof(struct pollfd));
30 fds[0].fd= ptr->fd;
31 fds[0].events= flags;
32
33 #ifdef NOT_DONE
34 /*
35 ** We are going to block on write, but at least on Solaris we might block
36 ** on write if we haven't read anything from our input buffer..
37 ** Try to purge the input buffer if we don't do any flow control in the
38 ** application layer (just sending a lot of data etc)
39 ** The test is moved down in the purge function to avoid duplication of
40 ** the test.
41 */
42 if (read_or_write == MEM_WRITE)
43 {
44 if (memcached_purge(ptr) != MEMCACHED_SUCCESS || memcached_purge(ptr) != MEMCACHED_STORED)
45 return MEMCACHED_FAILURE;
46 }
47 #endif
48
49 error= poll(fds, 1, ptr->root->poll_timeout);
50
51 if (error == 1)
52 return MEMCACHED_SUCCESS;
53 else if (error == 0)
54 {
55 return MEMCACHED_TIMEOUT;
56 }
57
58 /* Imposssible for anything other then -1 */
59 WATCHPOINT_ASSERT(error == -1);
60 memcached_quit_server(ptr, 1);
61
62 return MEMCACHED_FAILURE;
63
64 }
65
#ifdef UNUSED
/*
  Pre-read pending response data for every host into its read buffer.

  The function is compiled only when UNUSED is defined and, even then,
  returns immediately: the loop below is intentionally unreachable and is
  kept for reference only.
*/
void memcached_io_preread(memcached_st *ptr)
{
  unsigned int x;

  return;

  for (x= 0; x < ptr->number_of_hosts; x++)
  {
    if (memcached_server_response_count(ptr, x) &&
        ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
    {
      /* read() returns ssize_t; keep it signed so the -1 test is exact */
      ssize_t data_read;

      data_read= read(ptr->hosts[x].fd,
                      ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
                      MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length);
      if (data_read == -1)
        continue;

      ptr->hosts[x].read_buffer_length+= data_read;
      ptr->hosts[x].read_data_length+= data_read;
    }
  }
}
#endif
92
93 ssize_t memcached_io_read(memcached_server_st *ptr,
94 void *buffer, size_t length)
95 {
96 char *buffer_ptr;
97
98 buffer_ptr= buffer;
99
100 while (length)
101 {
102 uint8_t found_eof= 0;
103 if (!ptr->read_buffer_length)
104 {
105 ssize_t data_read;
106
107 while (1)
108 {
109 data_read= read(ptr->fd,
110 ptr->read_buffer,
111 MEMCACHED_MAX_BUFFER);
112 if (data_read > 0)
113 break;
114 else if (data_read == -1)
115 {
116 ptr->cached_errno= errno;
117 switch (errno)
118 {
119 case EAGAIN:
120 case EINTR:
121 {
122 memcached_return rc;
123
124 rc= io_wait(ptr, MEM_READ);
125
126 if (rc == MEMCACHED_SUCCESS)
127 continue;
128 }
129 /* fall trough */
130 default:
131 {
132 memcached_quit_server(ptr, 1);
133 return -1;
134 }
135 }
136 }
137 else
138 {
139 found_eof= 1;
140 break;
141 }
142 }
143
144 ptr->io_bytes_sent = 0;
145 ptr->read_data_length= data_read;
146 ptr->read_buffer_length= data_read;
147 ptr->read_ptr= ptr->read_buffer;
148 }
149
150 if (length > 1)
151 {
152 size_t difference;
153
154 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
155
156 memcpy(buffer_ptr, ptr->read_ptr, difference);
157 length -= difference;
158 ptr->read_ptr+= difference;
159 ptr->read_buffer_length-= difference;
160 buffer_ptr+= difference;
161 }
162 else
163 {
164 *buffer_ptr= *ptr->read_ptr;
165 ptr->read_ptr++;
166 ptr->read_buffer_length--;
167 buffer_ptr++;
168 break;
169 }
170
171 if (found_eof)
172 break;
173 }
174
175 return (size_t)(buffer_ptr - (char*)buffer);
176 }
177
178 ssize_t memcached_io_write(memcached_server_st *ptr,
179 const void *buffer, size_t length, char with_flush)
180 {
181 size_t original_length;
182 const char* buffer_ptr;
183
184 WATCHPOINT_ASSERT(ptr->fd != -1);
185
186 original_length= length;
187 buffer_ptr= buffer;
188
189 while (length)
190 {
191 char *write_ptr;
192 size_t should_write;
193
194 should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
195 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
196
197 should_write= (should_write < length) ? should_write : length;
198
199 memcpy(write_ptr, buffer_ptr, should_write);
200 ptr->write_buffer_offset+= should_write;
201 buffer_ptr+= should_write;
202 length-= should_write;
203
204 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
205 {
206 memcached_return rc;
207 ssize_t sent_length;
208
209 WATCHPOINT_ASSERT(ptr->fd != -1);
210 sent_length= io_flush(ptr, &rc);
211 if (sent_length == -1)
212 return -1;
213
214 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
215 }
216 }
217
218 if (with_flush)
219 {
220 memcached_return rc;
221 WATCHPOINT_ASSERT(ptr->fd != -1);
222 if (io_flush(ptr, &rc) == -1)
223 return -1;
224 }
225
226 return original_length;
227 }
228
229 memcached_return memcached_io_close(memcached_server_st *ptr)
230 {
231 int r;
232
233 if (ptr->fd == -1)
234 return MEMCACHED_SUCCESS;
235
236 /* in case of death shutdown to avoid blocking at close() */
237 r= shutdown(ptr->fd, SHUT_RDWR);
238
239 #ifdef HAVE_DEBUG
240 if (r && errno != ENOTCONN)
241 {
242 WATCHPOINT_NUMBER(ptr->fd);
243 WATCHPOINT_ERRNO(errno);
244 WATCHPOINT_ASSERT(errno);
245 }
246 #endif
247
248 r= close(ptr->fd);
249 #ifdef HAVE_DEBUG
250 if (r != 0)
251 WATCHPOINT_ERRNO(errno);
252 #endif
253
254 return MEMCACHED_SUCCESS;
255 }
256
257 static ssize_t io_flush(memcached_server_st *ptr,
258 memcached_return *error)
259 {
260 ssize_t sent_length;
261 size_t return_length;
262 char *local_write_ptr= ptr->write_buffer;
263 size_t write_length= ptr->write_buffer_offset;
264
265 *error= MEMCACHED_SUCCESS;
266
267 WATCHPOINT_ASSERT(ptr->fd != -1);
268
269 if (ptr->write_buffer_offset == 0)
270 return 0;
271
272 /* Looking for memory overflows */
273 #if defined(HAVE_DEBUG)
274 if (write_length == MEMCACHED_MAX_BUFFER)
275 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
276 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
277 #endif
278
279 return_length= 0;
280 while (write_length)
281 {
282 WATCHPOINT_ASSERT(ptr->fd != -1);
283 WATCHPOINT_ASSERT(write_length > 0);
284 sent_length= 0;
285 if (ptr->type == MEMCACHED_CONNECTION_UDP)
286 {
287 struct addrinfo *ai;
288
289 ai= ptr->address_info;
290
291 /* Crappy test code */
292 char buffer[HUGE_STRING_LEN + 8];
293 memset(buffer, 0, HUGE_STRING_LEN + 8);
294 memcpy (buffer+8, local_write_ptr, write_length);
295 buffer[0]= 0;
296 buffer[1]= 0;
297 buffer[2]= 0;
298 buffer[3]= 0;
299 buffer[4]= 0;
300 buffer[5]= 1;
301 buffer[6]= 0;
302 buffer[7]= 0;
303 sent_length= sendto(ptr->fd, buffer, write_length + 8, 0,
304 (struct sockaddr *)ai->ai_addr,
305 ai->ai_addrlen);
306 if (sent_length == -1)
307 {
308 WATCHPOINT_ERRNO(errno);
309 WATCHPOINT_ASSERT(0);
310 }
311 sent_length-= 8; /* We remove the header */
312 }
313 else
314 {
315 #ifdef NOT_DONE
316 /*
317 ** We might want to purge the input buffer if we haven't consumed
318 ** any output yet... The test for the limits is the purge is inline
319 ** in the purge function to avoid duplicating the logic..
320 */
321 {
322 memcached_return rc;
323 WATCHPOINT_ASSERT(ptr->fd != -1);
324 rc= memcached_purge(ptr);
325
326 if (rc != MEMCACHED_SUCCESS || rc != MEMCACHED_STORED)
327 return -1;
328 }
329 #endif
330
331 WATCHPOINT_ASSERT(ptr->fd != -1);
332 if ((sent_length= write(ptr->fd, local_write_ptr,
333 write_length)) == -1)
334 {
335 switch (errno)
336 {
337 case ENOBUFS:
338 continue;
339 case EAGAIN:
340 {
341 memcached_return rc;
342 rc= io_wait(ptr, MEM_WRITE);
343
344 if (rc == MEMCACHED_SUCCESS)
345 continue;
346
347 memcached_quit_server(ptr, 1);
348 return -1;
349 }
350 default:
351 memcached_quit_server(ptr, 1);
352 ptr->cached_errno= errno;
353 *error= MEMCACHED_ERRNO;
354 return -1;
355 }
356 }
357 }
358
359 ptr->io_bytes_sent += sent_length;
360
361 local_write_ptr+= sent_length;
362 write_length-= sent_length;
363 return_length+= sent_length;
364 }
365
366 WATCHPOINT_ASSERT(write_length == 0);
367 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
368 // ptr->write_buffer_offset);
369 ptr->write_buffer_offset= 0;
370
371 return return_length;
372 }
373
374 /*
375 Eventually we will just kill off the server with the problem.
376 */
377 void memcached_io_reset(memcached_server_st *ptr)
378 {
379 memcached_quit_server(ptr, 0);
380 }