da845912fc7de05c745b743f137ff2ca1520cdd2
[awesomized/libmemcached] / libmemcached / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
/* Direction of the socket operation that io_wait() is asked to wait for. */
typedef enum {
  MEM_READ,  /* wait for the descriptor to become readable */
  MEM_WRITE  /* wait for the descriptor to become writable */
} memc_read_or_write;
14
15 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
16
17 static memcached_return io_wait(memcached_server_st *ptr,
18 memc_read_or_write read_or_write)
19 {
20 struct pollfd fds[1];
21 short flags= 0;
22 int error;
23
24 if (read_or_write == MEM_WRITE) /* write */
25 flags= POLLOUT | POLLERR;
26 else
27 flags= POLLIN | POLLERR;
28
29 memset(&fds, 0, sizeof(struct pollfd));
30 fds[0].fd= ptr->fd;
31 fds[0].events= flags;
32
33 /*
34 ** We are going to block on write, but at least on Solaris we might block
35 ** on write if we haven't read anything from our input buffer..
36 ** Try to purge the input buffer if we don't do any flow control in the
37 ** application layer (just sending a lot of data etc)
38 ** The test is moved down in the purge function to avoid duplication of
39 ** the test.
40 */
41 if (read_or_write == MEM_WRITE)
42 memcached_purge(ptr);
43
44 error= poll(fds, 1, ptr->root->poll_timeout);
45
46 if (error == 1)
47 return MEMCACHED_SUCCESS;
48 else if (error == 0)
49 {
50 return MEMCACHED_TIMEOUT;
51 }
52
53 /* Imposssible for anything other then -1 */
54 WATCHPOINT_ASSERT(error == -1);
55 memcached_quit_server(ptr, 1);
56
57 return MEMCACHED_FAILURE;
58
59 }
60
#ifdef UNUSED
/*
  Pre-read hook, only compiled when UNUSED is defined.

  The function returns immediately, so the loop below never executes. It
  is kept as a sketch of the intended behaviour: for every host with
  outstanding responses and spare room in its read buffer, read() more
  data and account for it in the buffer counters.
*/
void memcached_io_preread(memcached_st *ptr)
{
  unsigned int host;

  /* Deliberately short-circuited: nothing past this point runs. */
  return;

  for (host= 0; host < ptr->number_of_hosts; host++)
  {
    size_t data_read;

    /* Skip hosts that owe us nothing or whose buffer is already full. */
    if (!memcached_server_response_count(ptr, host))
      continue;
    if (ptr->hosts[host].read_data_length >= MEMCACHED_MAX_BUFFER)
      continue;

    data_read= read(ptr->hosts[host].fd,
                    ptr->hosts[host].read_ptr + ptr->hosts[host].read_data_length,
                    MEMCACHED_MAX_BUFFER - ptr->hosts[host].read_data_length);
    if (data_read == -1)
      continue;

    ptr->hosts[host].read_buffer_length+= data_read;
    ptr->hosts[host].read_data_length+= data_read;
  }
}
#endif
87
88 ssize_t memcached_io_read(memcached_server_st *ptr,
89 void *buffer, size_t length)
90 {
91 char *buffer_ptr;
92
93 buffer_ptr= buffer;
94
95 while (length)
96 {
97 uint8_t found_eof= 0;
98 if (!ptr->read_buffer_length)
99 {
100 ssize_t data_read;
101
102 while (1)
103 {
104 data_read= read(ptr->fd,
105 ptr->read_buffer,
106 MEMCACHED_MAX_BUFFER);
107 if (data_read > 0)
108 break;
109 else if (data_read == -1)
110 {
111 ptr->cached_errno= errno;
112 switch (errno)
113 {
114 case EAGAIN:
115 {
116 memcached_return rc;
117
118 rc= io_wait(ptr, MEM_READ);
119
120 if (rc == MEMCACHED_SUCCESS)
121 continue;
122 }
123 /* fall trough */
124 default:
125 {
126 memcached_quit_server(ptr, 1);
127 return -1;
128 }
129 }
130 }
131 else
132 {
133 found_eof= 1;
134 break;
135 }
136 }
137
138 ptr->io_bytes_sent = 0;
139 ptr->read_data_length= data_read;
140 ptr->read_buffer_length= data_read;
141 ptr->read_ptr= ptr->read_buffer;
142 }
143
144 if (length > 1)
145 {
146 size_t difference;
147
148 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
149
150 memcpy(buffer_ptr, ptr->read_ptr, difference);
151 length -= difference;
152 ptr->read_ptr+= difference;
153 ptr->read_buffer_length-= difference;
154 buffer_ptr+= difference;
155 }
156 else
157 {
158 *buffer_ptr= *ptr->read_ptr;
159 ptr->read_ptr++;
160 ptr->read_buffer_length--;
161 buffer_ptr++;
162 break;
163 }
164
165 if (found_eof)
166 break;
167 }
168
169 return (size_t)(buffer_ptr - (char*)buffer);
170 }
171
172 ssize_t memcached_io_write(memcached_server_st *ptr,
173 const void *buffer, size_t length, char with_flush)
174 {
175 size_t original_length;
176 const char* buffer_ptr;
177
178 original_length= length;
179 buffer_ptr= buffer;
180
181 while (length)
182 {
183 char *write_ptr;
184 size_t should_write;
185
186 should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
187 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
188
189 should_write= (should_write < length) ? should_write : length;
190
191 memcpy(write_ptr, buffer_ptr, should_write);
192 ptr->write_buffer_offset+= should_write;
193 buffer_ptr+= should_write;
194 length-= should_write;
195
196 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
197 {
198 memcached_return rc;
199 ssize_t sent_length;
200
201 sent_length= io_flush(ptr, &rc);
202 if (sent_length == -1)
203 return -1;
204
205 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
206 }
207 }
208
209 if (with_flush)
210 {
211 memcached_return rc;
212 if (io_flush(ptr, &rc) == -1)
213 return -1;
214 }
215
216 return original_length;
217 }
218
219 memcached_return memcached_io_close(memcached_server_st *ptr)
220 {
221 int r;
222 /* in case of death shutdown to avoid blocking at close() */
223
224 r= shutdown(ptr->fd, SHUT_RDWR);
225
226 #ifdef HAVE_DEBUG
227 if (r && errno != ENOTCONN)
228 {
229 WATCHPOINT_ERRNO(errno);
230 WATCHPOINT_ASSERT(errno);
231 }
232 #endif
233
234 r= close(ptr->fd);
235 #ifdef HAVE_DEBUG
236 if (r != 0)
237 WATCHPOINT_ERRNO(errno);
238 #endif
239
240 return MEMCACHED_SUCCESS;
241 }
242
243 static ssize_t io_flush(memcached_server_st *ptr,
244 memcached_return *error)
245 {
246 ssize_t sent_length;
247 size_t return_length;
248 char *local_write_ptr= ptr->write_buffer;
249 size_t write_length= ptr->write_buffer_offset;
250
251 *error= MEMCACHED_SUCCESS;
252
253 if (ptr->write_buffer_offset == 0)
254 return 0;
255
256 /* Looking for memory overflows */
257 #if defined(HAVE_DEBUG)
258 if (write_length == MEMCACHED_MAX_BUFFER)
259 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
260 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
261 #endif
262
263 return_length= 0;
264 while (write_length)
265 {
266 WATCHPOINT_ASSERT(write_length > 0);
267 sent_length= 0;
268 if (ptr->type == MEMCACHED_CONNECTION_UDP)
269 {
270 struct addrinfo *ai;
271
272 ai= ptr->address_info;
273
274 /* Crappy test code */
275 char buffer[HUGE_STRING_LEN + 8];
276 memset(buffer, 0, HUGE_STRING_LEN + 8);
277 memcpy (buffer+8, local_write_ptr, write_length);
278 buffer[0]= 0;
279 buffer[1]= 0;
280 buffer[2]= 0;
281 buffer[3]= 0;
282 buffer[4]= 0;
283 buffer[5]= 1;
284 buffer[6]= 0;
285 buffer[7]= 0;
286 sent_length= sendto(ptr->fd, buffer, write_length + 8, 0,
287 (struct sockaddr *)ai->ai_addr,
288 ai->ai_addrlen);
289 if (sent_length == -1)
290 {
291 WATCHPOINT_ERRNO(errno);
292 WATCHPOINT_ASSERT(0);
293 }
294 sent_length-= 8; /* We remove the header */
295 }
296 else
297 {
298 /*
299 ** We might want to purge the input buffer if we haven't consumed
300 ** any output yet... The test for the limits is the purge is inline
301 ** in the purge function to avoid duplicating the logic..
302 */
303 memcached_purge(ptr);
304
305 if ((sent_length= write(ptr->fd, local_write_ptr,
306 write_length)) == -1)
307 {
308 switch (errno)
309 {
310 case ENOBUFS:
311 continue;
312 case EAGAIN:
313 {
314 memcached_return rc;
315 rc= io_wait(ptr, MEM_WRITE);
316
317 if (rc == MEMCACHED_SUCCESS)
318 continue;
319
320 memcached_quit_server(ptr, 1);
321 return -1;
322 }
323 default:
324 memcached_quit_server(ptr, 1);
325 ptr->cached_errno= errno;
326 *error= MEMCACHED_ERRNO;
327 return -1;
328 }
329 }
330 }
331
332 ptr->io_bytes_sent += sent_length;
333
334 local_write_ptr+= sent_length;
335 write_length-= sent_length;
336 return_length+= sent_length;
337 }
338
339 WATCHPOINT_ASSERT(write_length == 0);
340 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
341 // ptr->write_buffer_offset);
342 ptr->write_buffer_offset= 0;
343
344 return return_length;
345 }
346
347 /*
348 Eventually we will just kill off the server with the problem.
349 */
350 void memcached_io_reset(memcached_server_st *ptr)
351 {
352 memcached_quit_server(ptr, 0);
353 }