ddca197260f818de70c5e04e6e60a553479f1b8e
[awesomized/libmemcached] / libmemcached / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
/*
  Direction of socket activity that io_wait() should wait for.
*/
typedef enum
{
  MEM_READ= 0,  /* wait until the socket has data to read */
  MEM_WRITE= 1  /* wait until the socket can accept more output */
} memc_read_or_write;
14
15 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
16
17 static memcached_return io_wait(memcached_server_st *ptr,
18 memc_read_or_write read_or_write)
19 {
20 struct pollfd fds[1];
21 short flags= 0;
22 int error;
23
24 if (read_or_write == MEM_WRITE) /* write */
25 flags= POLLOUT | POLLERR;
26 else
27 flags= POLLIN | POLLERR;
28
29 memset(&fds, 0, sizeof(struct pollfd));
30 fds[0].fd= ptr->fd;
31 fds[0].events= flags;
32
33 /*
34 ** We are going to block on write, but at least on Solaris we might block
35 ** on write if we haven't read anything from our input buffer..
36 ** Try to purge the input buffer if we don't do any flow control in the
37 ** application layer (just sending a lot of data etc)
38 ** The test is moved down in the purge function to avoid duplication of
39 ** the test.
40 */
41 if (read_or_write == MEM_WRITE)
42 memcached_purge(ptr);
43
44 error= poll(fds, 1, ptr->root->poll_timeout);
45
46 if (error == 1)
47 return MEMCACHED_SUCCESS;
48 else if (error == 0)
49 {
50 return MEMCACHED_TIMEOUT;
51 }
52
53 /* Imposssible for anything other then -1 */
54 WATCHPOINT_ASSERT(error == -1);
55 memcached_quit_server(ptr, 1);
56
57 return MEMCACHED_FAILURE;
58
59 }
60
#ifdef UNUSED
/*
  Try to pull already-available data into the read buffer of every host
  that still has responses outstanding.

  This code is only compiled when UNUSED is defined, and even then the
  unconditional return below disables the loop.

  NOTE(review): the read target is read_ptr + read_data_length while the
  size limit is computed from MEMCACHED_MAX_BUFFER alone; confirm read_ptr
  is at the start of the buffer before ever enabling this.
*/
void memcached_io_preread(memcached_st *ptr)
{
  unsigned int x;

  /* Deliberately disabled: nothing below this line runs. */
  return;

  for (x= 0; x < ptr->number_of_hosts; x++)
  {
    if (memcached_server_response_count(ptr, x) &&
        ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
    {
      /* read() returns ssize_t; keep it signed so -1 is detectable. */
      ssize_t data_read;

      data_read= read(ptr->hosts[x].fd,
                      ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
                      MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length);
      if (data_read <= 0)
        continue; /* error or nothing read: leave the counters alone */

      ptr->hosts[x].read_buffer_length+= (size_t)data_read;
      ptr->hosts[x].read_data_length+= (size_t)data_read;
    }
  }
}
#endif
87
88 ssize_t memcached_io_read(memcached_server_st *ptr,
89 void *buffer, size_t length)
90 {
91 char *buffer_ptr;
92
93 buffer_ptr= buffer;
94
95 while (length)
96 {
97 uint8_t found_eof= 0;
98 if (!ptr->read_buffer_length)
99 {
100 ssize_t data_read;
101
102 while (1)
103 {
104 data_read= read(ptr->fd,
105 ptr->read_buffer,
106 MEMCACHED_MAX_BUFFER);
107 if (data_read > 0)
108 break;
109 else if (data_read == -1)
110 {
111 ptr->cached_errno= errno;
112 switch (errno)
113 {
114 case EAGAIN:
115 {
116 memcached_return rc;
117
118 rc= io_wait(ptr, MEM_READ);
119
120 if (rc == MEMCACHED_SUCCESS)
121 continue;
122 }
123 /* fall trough */
124 default:
125 {
126 memcached_quit_server(ptr, 1);
127 return -1;
128 }
129 }
130 }
131 else
132 {
133 found_eof= 1;
134 break;
135 }
136 }
137
138 ptr->io_bytes_sent = 0;
139 ptr->read_data_length= data_read;
140 ptr->read_buffer_length= data_read;
141 ptr->read_ptr= ptr->read_buffer;
142 }
143
144 if (length > 1)
145 {
146 size_t difference;
147
148 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
149
150 memcpy(buffer_ptr, ptr->read_ptr, difference);
151 length -= difference;
152 ptr->read_ptr+= difference;
153 ptr->read_buffer_length-= difference;
154 buffer_ptr+= difference;
155 }
156 else
157 {
158 *buffer_ptr= *ptr->read_ptr;
159 ptr->read_ptr++;
160 ptr->read_buffer_length--;
161 buffer_ptr++;
162 break;
163 }
164
165 if (found_eof)
166 break;
167 }
168
169 return (size_t)(buffer_ptr - (char*)buffer);
170 }
171
172 ssize_t memcached_io_write(memcached_server_st *ptr,
173 const void *buffer, size_t length, char with_flush)
174 {
175 size_t original_length;
176 const char* buffer_ptr;
177
178 original_length= length;
179 buffer_ptr= buffer;
180
181 while (length)
182 {
183 char *write_ptr;
184 size_t should_write;
185
186 should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
187 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
188
189 should_write= (should_write < length) ? should_write : length;
190
191 memcpy(write_ptr, buffer_ptr, should_write);
192 ptr->write_buffer_offset+= should_write;
193 buffer_ptr+= should_write;
194 length-= should_write;
195
196 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
197 {
198 memcached_return rc;
199 ssize_t sent_length;
200
201 sent_length= io_flush(ptr, &rc);
202 if (sent_length == -1)
203 return -1;
204
205 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
206 }
207 }
208
209 if (with_flush)
210 {
211 memcached_return rc;
212 if (io_flush(ptr, &rc) == -1)
213 return -1;
214 }
215
216 return original_length;
217 }
218
219 memcached_return memcached_io_close(memcached_server_st *ptr)
220 {
221 int r;
222 /* in case of death shutdown to avoid blocking at close() */
223
224 r= shutdown(ptr->fd, SHUT_RDWR);
225
226 #ifdef HAVE_DEBUG
227 if (r && errno != ENOTCONN)
228 {
229 WATCHPOINT_ERRNO(errno);
230 WATCHPOINT_ASSERT(errno);
231 }
232 #endif
233
234 r= close(ptr->fd);
235 WATCHPOINT_ASSERT(r == 0);
236
237 return MEMCACHED_SUCCESS;
238 }
239
240 static ssize_t io_flush(memcached_server_st *ptr,
241 memcached_return *error)
242 {
243 ssize_t sent_length;
244 size_t return_length;
245 char *local_write_ptr= ptr->write_buffer;
246 size_t write_length= ptr->write_buffer_offset;
247
248 *error= MEMCACHED_SUCCESS;
249
250 if (ptr->write_buffer_offset == 0)
251 return 0;
252
253 /* Looking for memory overflows */
254 #if defined(HAVE_DEBUG)
255 if (write_length == MEMCACHED_MAX_BUFFER)
256 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
257 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
258 #endif
259
260 return_length= 0;
261 while (write_length)
262 {
263 WATCHPOINT_ASSERT(write_length > 0);
264 sent_length= 0;
265 if (ptr->type == MEMCACHED_CONNECTION_UDP)
266 {
267 struct addrinfo *ai;
268
269 ai= ptr->address_info;
270
271 /* Crappy test code */
272 char buffer[HUGE_STRING_LEN + 8];
273 memset(buffer, 0, HUGE_STRING_LEN + 8);
274 memcpy (buffer+8, local_write_ptr, write_length);
275 buffer[0]= 0;
276 buffer[1]= 0;
277 buffer[2]= 0;
278 buffer[3]= 0;
279 buffer[4]= 0;
280 buffer[5]= 1;
281 buffer[6]= 0;
282 buffer[7]= 0;
283 sent_length= sendto(ptr->fd, buffer, write_length + 8, 0,
284 (struct sockaddr *)ai->ai_addr,
285 ai->ai_addrlen);
286 if (sent_length == -1)
287 {
288 WATCHPOINT_ERRNO(errno);
289 WATCHPOINT_ASSERT(0);
290 }
291 sent_length-= 8; /* We remove the header */
292 }
293 else
294 {
295 /*
296 ** We might want to purge the input buffer if we haven't consumed
297 ** any output yet... The test for the limits is the purge is inline
298 ** in the purge function to avoid duplicating the logic..
299 */
300 memcached_purge(ptr);
301
302 if ((sent_length= write(ptr->fd, local_write_ptr,
303 write_length)) == -1)
304 {
305 switch (errno)
306 {
307 case ENOBUFS:
308 continue;
309 case EAGAIN:
310 {
311 memcached_return rc;
312 rc= io_wait(ptr, MEM_WRITE);
313
314 if (rc == MEMCACHED_SUCCESS)
315 continue;
316
317 memcached_quit_server(ptr, 1);
318 return -1;
319 }
320 default:
321 memcached_quit_server(ptr, 1);
322 ptr->cached_errno= errno;
323 *error= MEMCACHED_ERRNO;
324 return -1;
325 }
326 }
327 }
328
329 ptr->io_bytes_sent += sent_length;
330
331 local_write_ptr+= sent_length;
332 write_length-= sent_length;
333 return_length+= sent_length;
334 }
335
336 WATCHPOINT_ASSERT(write_length == 0);
337 WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset);
338 ptr->write_buffer_offset= 0;
339
340 return return_length;
341 }
342
343 /*
344 Eventually we will just kill off the server with the problem.
345 */
346 void memcached_io_reset(memcached_server_st *ptr)
347 {
348 memcached_quit_server(ptr, 0);
349 }