9a11556bdbaee21765d8fabbe5af3db70f74a4f6
[awesomized/libmemcached] / libmemcached / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 typedef enum {
11 MEM_READ,
12 MEM_WRITE,
13 } memc_read_or_write;
14
15 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
16
17 static memcached_return io_wait(memcached_server_st *ptr,
18 memc_read_or_write read_or_write)
19 {
20 struct pollfd fds[1];
21 short flags= 0;
22 int error;
23
24 if (read_or_write == MEM_WRITE) /* write */
25 flags= POLLOUT | POLLERR;
26 else
27 flags= POLLIN | POLLERR;
28
29 memset(&fds, 0, sizeof(struct pollfd));
30 fds[0].fd= ptr->fd;
31 fds[0].events= flags;
32
33 /*
34 ** We are going to block on write, but at least on Solaris we might block
35 ** on write if we haven't read anything from our input buffer..
36 ** Try to purge the input buffer if we don't do any flow control in the
37 ** application layer (just sending a lot of data etc)
38 ** The test is moved down in the purge function to avoid duplication of
39 ** the test.
40 */
41 if (read_or_write == MEM_WRITE)
42 {
43 memcached_return rc=memcached_purge(ptr);
44 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
45 return MEMCACHED_FAILURE;
46 }
47
48 error= poll(fds, 1, ptr->root->poll_timeout);
49
50 if (error == 1)
51 return MEMCACHED_SUCCESS;
52 else if (error == 0)
53 {
54 return MEMCACHED_TIMEOUT;
55 }
56
57 /* Imposssible for anything other then -1 */
58 WATCHPOINT_ASSERT(error == -1);
59 memcached_quit_server(ptr, 1);
60
61 return MEMCACHED_FAILURE;
62
63 }
64
65 #ifdef UNUSED
66 void memcached_io_preread(memcached_st *ptr)
67 {
68 unsigned int x;
69
70 return;
71
72 for (x= 0; x < ptr->number_of_hosts; x++)
73 {
74 if (memcached_server_response_count(ptr, x) &&
75 ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
76 {
77 size_t data_read;
78
79 data_read= read(ptr->hosts[x].fd,
80 ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
81 MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length);
82 if (data_read == -1)
83 continue;
84
85 ptr->hosts[x].read_buffer_length+= data_read;
86 ptr->hosts[x].read_data_length+= data_read;
87 }
88 }
89 }
90 #endif
91
92 ssize_t memcached_io_read(memcached_server_st *ptr,
93 void *buffer, size_t length)
94 {
95 char *buffer_ptr;
96
97 buffer_ptr= buffer;
98
99 while (length)
100 {
101 if (!ptr->read_buffer_length)
102 {
103 ssize_t data_read;
104
105 while (1)
106 {
107 data_read= read(ptr->fd,
108 ptr->read_buffer,
109 MEMCACHED_MAX_BUFFER);
110 if (data_read > 0)
111 break;
112 else if (data_read == -1)
113 {
114 ptr->cached_errno= errno;
115 switch (errno)
116 {
117 case EAGAIN:
118 case EINTR:
119 {
120 memcached_return rc;
121
122 rc= io_wait(ptr, MEM_READ);
123
124 if (rc == MEMCACHED_SUCCESS)
125 continue;
126 }
127 /* fall trough */
128 default:
129 {
130 memcached_quit_server(ptr, 1);
131 return -1;
132 }
133 }
134 }
135 else
136 {
137 /*
138 EOF. Any data received so far is incomplete
139 so discard it. This always reads by byte in case of TCP
140 and protocol enforcement happens at memcached_response()
141 looking for '\n'. We do not care for UDB which requests 8 bytes
142 at once. Generally, this means that connection went away. Since
143 for blocking I/O we do not return 0 and for non-blocking case
144 it will return EGAIN if data is not immediatly available.
145 */
146 memcached_quit_server(ptr, 1);
147 return -1;
148 }
149 }
150
151 ptr->io_bytes_sent = 0;
152 ptr->read_data_length= data_read;
153 ptr->read_buffer_length= data_read;
154 ptr->read_ptr= ptr->read_buffer;
155 }
156
157 if (length > 1)
158 {
159 size_t difference;
160
161 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
162
163 memcpy(buffer_ptr, ptr->read_ptr, difference);
164 length -= difference;
165 ptr->read_ptr+= difference;
166 ptr->read_buffer_length-= difference;
167 buffer_ptr+= difference;
168 }
169 else
170 {
171 *buffer_ptr= *ptr->read_ptr;
172 ptr->read_ptr++;
173 ptr->read_buffer_length--;
174 buffer_ptr++;
175 break;
176 }
177 }
178
179 return (size_t)(buffer_ptr - (char*)buffer);
180 }
181
182 ssize_t memcached_io_write(memcached_server_st *ptr,
183 const void *buffer, size_t length, char with_flush)
184 {
185 size_t original_length;
186 const char* buffer_ptr;
187
188 WATCHPOINT_ASSERT(ptr->fd != -1);
189
190 original_length= length;
191 buffer_ptr= buffer;
192
193 while (length)
194 {
195 char *write_ptr;
196 size_t should_write;
197
198 should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
199 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
200
201 should_write= (should_write < length) ? should_write : length;
202
203 memcpy(write_ptr, buffer_ptr, should_write);
204 ptr->write_buffer_offset+= should_write;
205 buffer_ptr+= should_write;
206 length-= should_write;
207
208 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
209 {
210 memcached_return rc;
211 ssize_t sent_length;
212
213 WATCHPOINT_ASSERT(ptr->fd != -1);
214 sent_length= io_flush(ptr, &rc);
215 if (sent_length == -1)
216 return -1;
217
218 /* If io_flush calls memcached_purge, sent_length may be 0 */
219 if (sent_length != 0)
220 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
221 }
222 }
223
224 if (with_flush)
225 {
226 memcached_return rc;
227 WATCHPOINT_ASSERT(ptr->fd != -1);
228 if (io_flush(ptr, &rc) == -1)
229 return -1;
230 }
231
232 return original_length;
233 }
234
235 memcached_return memcached_io_close(memcached_server_st *ptr)
236 {
237 int r;
238
239 if (ptr->fd == -1)
240 return MEMCACHED_SUCCESS;
241
242 /* in case of death shutdown to avoid blocking at close() */
243 if (1)
244 {
245 r= shutdown(ptr->fd, SHUT_RDWR);
246
247 #ifdef HAVE_DEBUG
248 if (r && errno != ENOTCONN)
249 {
250 WATCHPOINT_NUMBER(ptr->fd);
251 WATCHPOINT_ERRNO(errno);
252 WATCHPOINT_ASSERT(errno);
253 }
254 #endif
255 }
256
257 r= close(ptr->fd);
258 #ifdef HAVE_DEBUG
259 if (r != 0)
260 WATCHPOINT_ERRNO(errno);
261 #endif
262
263 return MEMCACHED_SUCCESS;
264 }
265
266 static ssize_t io_flush(memcached_server_st *ptr,
267 memcached_return *error)
268 {
269 /*
270 ** We might want to purge the input buffer if we haven't consumed
271 ** any output yet... The test for the limits is the purge is inline
272 ** in the purge function to avoid duplicating the logic..
273 */
274 {
275 memcached_return rc;
276 WATCHPOINT_ASSERT(ptr->fd != -1);
277 rc= memcached_purge(ptr);
278
279 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_STORED)
280 return -1;
281 }
282 ssize_t sent_length;
283 size_t return_length;
284 char *local_write_ptr= ptr->write_buffer;
285 size_t write_length= ptr->write_buffer_offset;
286
287 *error= MEMCACHED_SUCCESS;
288
289 WATCHPOINT_ASSERT(ptr->fd != -1);
290
291 if (ptr->write_buffer_offset == 0)
292 return 0;
293
294 /* Looking for memory overflows */
295 #if defined(HAVE_DEBUG)
296 if (write_length == MEMCACHED_MAX_BUFFER)
297 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
298 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
299 #endif
300
301 return_length= 0;
302 while (write_length)
303 {
304 WATCHPOINT_ASSERT(ptr->fd != -1);
305 WATCHPOINT_ASSERT(write_length > 0);
306 sent_length= 0;
307 if (ptr->type == MEMCACHED_CONNECTION_UDP)
308 {
309 struct addrinfo *ai;
310
311 ai= ptr->address_info;
312
313 /* Crappy test code */
314 char buffer[HUGE_STRING_LEN + 8];
315 memset(buffer, 0, HUGE_STRING_LEN + 8);
316 memcpy (buffer+8, local_write_ptr, write_length);
317 buffer[0]= 0;
318 buffer[1]= 0;
319 buffer[2]= 0;
320 buffer[3]= 0;
321 buffer[4]= 0;
322 buffer[5]= 1;
323 buffer[6]= 0;
324 buffer[7]= 0;
325 sent_length= sendto(ptr->fd, buffer, write_length + 8, 0,
326 (struct sockaddr *)ai->ai_addr,
327 ai->ai_addrlen);
328 if (sent_length == -1)
329 {
330 WATCHPOINT_ERRNO(errno);
331 WATCHPOINT_ASSERT(0);
332 }
333 sent_length-= 8; /* We remove the header */
334 }
335 else
336 {
337 WATCHPOINT_ASSERT(ptr->fd != -1);
338 if ((sent_length= write(ptr->fd, local_write_ptr,
339 write_length)) == -1)
340 {
341 switch (errno)
342 {
343 case ENOBUFS:
344 continue;
345 case EAGAIN:
346 {
347 memcached_return rc;
348 rc= io_wait(ptr, MEM_WRITE);
349
350 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_TIMEOUT)
351 continue;
352
353 memcached_quit_server(ptr, 1);
354 return -1;
355 }
356 default:
357 memcached_quit_server(ptr, 1);
358 ptr->cached_errno= errno;
359 *error= MEMCACHED_ERRNO;
360 return -1;
361 }
362 }
363 }
364
365 ptr->io_bytes_sent += sent_length;
366
367 local_write_ptr+= sent_length;
368 write_length-= sent_length;
369 return_length+= sent_length;
370 }
371
372 WATCHPOINT_ASSERT(write_length == 0);
373 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
374 // ptr->write_buffer_offset);
375 ptr->write_buffer_offset= 0;
376
377 return return_length;
378 }
379
380 /*
381 Eventually we will just kill off the server with the problem.
382 */
383 void memcached_io_reset(memcached_server_st *ptr)
384 {
385 memcached_quit_server(ptr, 1);
386 }
387
388 /**
389 * Read a given number of bytes from the server and place it into a specific
390 * buffer. Reset the IO channel on this server if an error occurs.
391 */
392 memcached_return memcached_safe_read(memcached_server_st *ptr,
393 void *dta,
394 size_t size)
395 {
396 size_t offset= 0;
397 char *data= dta;
398
399 while (offset < size)
400 {
401 ssize_t nread= memcached_io_read(ptr, data + offset, size - offset);
402 if (nread <= 0)
403 {
404 memcached_io_reset(ptr);
405 return MEMCACHED_UNKNOWN_READ_FAILURE;
406 }
407 offset+= nread;
408 }
409
410 return MEMCACHED_SUCCESS;
411 }