Fix for config to not be stored into filesystem.
[awesomized/libmemcached] / libmemcached / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
/* Direction of socket readiness that io_wait() should wait for. */
typedef enum
{
  MEM_READ,  /* wait for the descriptor to become readable (POLLIN) */
  MEM_WRITE  /* wait for the descriptor to become writable (POLLOUT) */
} memc_read_or_write;
14
15 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
16
17 static memcached_return io_wait(memcached_server_st *ptr,
18 memc_read_or_write read_or_write)
19 {
20 struct pollfd fds[1];
21 short flags= 0;
22 int error;
23
24 if (read_or_write == MEM_WRITE) /* write */
25 flags= POLLOUT | POLLERR;
26 else
27 flags= POLLIN | POLLERR;
28
29 memset(&fds, 0, sizeof(struct pollfd));
30 fds[0].fd= ptr->fd;
31 fds[0].events= flags;
32
33 #ifdef NOT_DONE
34 /*
35 ** We are going to block on write, but at least on Solaris we might block
36 ** on write if we haven't read anything from our input buffer..
37 ** Try to purge the input buffer if we don't do any flow control in the
38 ** application layer (just sending a lot of data etc)
39 ** The test is moved down in the purge function to avoid duplication of
40 ** the test.
41 */
42 if (read_or_write == MEM_WRITE)
43 {
44 if (memcached_purge(ptr) != MEMCACHED_SUCCESS || memcached_purge(ptr) != MEMCACHED_STORED)
45 return MEMCACHED_FAILURE;
46 }
47 #endif
48
49 error= poll(fds, 1, ptr->root->poll_timeout);
50
51 if (error == 1)
52 return MEMCACHED_SUCCESS;
53 else if (error == 0)
54 {
55 return MEMCACHED_TIMEOUT;
56 }
57
58 /* Imposssible for anything other then -1 */
59 WATCHPOINT_ASSERT(error == -1);
60 memcached_quit_server(ptr, 1);
61
62 return MEMCACHED_FAILURE;
63
64 }
65
#ifdef UNUSED
/*
  Disabled read-ahead helper (only compiled when UNUSED is defined).

  The function returns immediately, so the loop below is never executed.
  As written, the loop would visit every host for which
  memcached_server_response_count() is non-zero and whose read_data_length
  is below MEMCACHED_MAX_BUFFER, read() more bytes behind the data already
  held, and grow both read_buffer_length and read_data_length by the amount
  read. Hosts whose read() fails are skipped.
*/
void memcached_io_preread(memcached_st *ptr)
{
  unsigned int host_index;

  return;

  for (host_index= 0; host_index < ptr->number_of_hosts; host_index++)
  {
    size_t bytes_read;

    /* Nothing to do without pending responses or with a full buffer. */
    if (!memcached_server_response_count(ptr, host_index) ||
        ptr->hosts[host_index].read_data_length >= MEMCACHED_MAX_BUFFER)
      continue;

    bytes_read= read(ptr->hosts[host_index].fd,
                     ptr->hosts[host_index].read_ptr +
                     ptr->hosts[host_index].read_data_length,
                     MEMCACHED_MAX_BUFFER -
                     ptr->hosts[host_index].read_data_length);
    if (bytes_read == (size_t)-1)
      continue;

    ptr->hosts[host_index].read_buffer_length+= bytes_read;
    ptr->hosts[host_index].read_data_length+= bytes_read;
  }
}
#endif
92
93 ssize_t memcached_io_read(memcached_server_st *ptr,
94 void *buffer, size_t length)
95 {
96 char *buffer_ptr;
97
98 buffer_ptr= buffer;
99
100 while (length)
101 {
102 if (!ptr->read_buffer_length)
103 {
104 ssize_t data_read;
105
106 while (1)
107 {
108 data_read= read(ptr->fd,
109 ptr->read_buffer,
110 MEMCACHED_MAX_BUFFER);
111 if (data_read > 0)
112 break;
113 else if (data_read == -1)
114 {
115 ptr->cached_errno= errno;
116 switch (errno)
117 {
118 case EAGAIN:
119 case EINTR:
120 {
121 memcached_return rc;
122
123 rc= io_wait(ptr, MEM_READ);
124
125 if (rc == MEMCACHED_SUCCESS)
126 continue;
127 }
128 /* fall trough */
129 default:
130 {
131 memcached_quit_server(ptr, 1);
132 return -1;
133 }
134 }
135 }
136 else
137 {
138 /*
139 EOF. Any data received so far is incomplete
140 so discard it. This always reads by byte in case of TCP
141 and protocol enforcement happens at memcached_response()
142 looking for '\n'. We do not care for UDB which requests 8 bytes
143 at once. Generally, this means that connection went away. Since
144 for blocking I/O we do not return 0 and for non-blocking case
145 it will return EGAIN if data is not immediatly available.
146 */
147 memcached_quit_server(ptr, 1);
148 return -1;
149 }
150 }
151
152 ptr->io_bytes_sent = 0;
153 ptr->read_data_length= data_read;
154 ptr->read_buffer_length= data_read;
155 ptr->read_ptr= ptr->read_buffer;
156 }
157
158 if (length > 1)
159 {
160 size_t difference;
161
162 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
163
164 memcpy(buffer_ptr, ptr->read_ptr, difference);
165 length -= difference;
166 ptr->read_ptr+= difference;
167 ptr->read_buffer_length-= difference;
168 buffer_ptr+= difference;
169 }
170 else
171 {
172 *buffer_ptr= *ptr->read_ptr;
173 ptr->read_ptr++;
174 ptr->read_buffer_length--;
175 buffer_ptr++;
176 break;
177 }
178 }
179
180 return (size_t)(buffer_ptr - (char*)buffer);
181 }
182
183 ssize_t memcached_io_write(memcached_server_st *ptr,
184 const void *buffer, size_t length, char with_flush)
185 {
186 size_t original_length;
187 const char* buffer_ptr;
188
189 WATCHPOINT_ASSERT(ptr->fd != -1);
190
191 original_length= length;
192 buffer_ptr= buffer;
193
194 while (length)
195 {
196 char *write_ptr;
197 size_t should_write;
198
199 should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
200 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
201
202 should_write= (should_write < length) ? should_write : length;
203
204 memcpy(write_ptr, buffer_ptr, should_write);
205 ptr->write_buffer_offset+= should_write;
206 buffer_ptr+= should_write;
207 length-= should_write;
208
209 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
210 {
211 memcached_return rc;
212 ssize_t sent_length;
213
214 WATCHPOINT_ASSERT(ptr->fd != -1);
215 sent_length= io_flush(ptr, &rc);
216 if (sent_length == -1)
217 return -1;
218
219 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
220 }
221 }
222
223 if (with_flush)
224 {
225 memcached_return rc;
226 WATCHPOINT_ASSERT(ptr->fd != -1);
227 if (io_flush(ptr, &rc) == -1)
228 return -1;
229 }
230
231 return original_length;
232 }
233
234 memcached_return memcached_io_close(memcached_server_st *ptr)
235 {
236 int r;
237
238 if (ptr->fd == -1)
239 return MEMCACHED_SUCCESS;
240
241 /* in case of death shutdown to avoid blocking at close() */
242 if (1)
243 {
244 r= shutdown(ptr->fd, SHUT_RDWR);
245
246 #ifdef HAVE_DEBUG
247 if (r && errno != ENOTCONN)
248 {
249 WATCHPOINT_NUMBER(ptr->fd);
250 WATCHPOINT_ERRNO(errno);
251 WATCHPOINT_ASSERT(errno);
252 }
253 #endif
254 }
255
256 r= close(ptr->fd);
257 #ifdef HAVE_DEBUG
258 if (r != 0)
259 WATCHPOINT_ERRNO(errno);
260 #endif
261
262 return MEMCACHED_SUCCESS;
263 }
264
265 static ssize_t io_flush(memcached_server_st *ptr,
266 memcached_return *error)
267 {
268 ssize_t sent_length;
269 size_t return_length;
270 char *local_write_ptr= ptr->write_buffer;
271 size_t write_length= ptr->write_buffer_offset;
272
273 *error= MEMCACHED_SUCCESS;
274
275 WATCHPOINT_ASSERT(ptr->fd != -1);
276
277 if (ptr->write_buffer_offset == 0)
278 return 0;
279
280 /* Looking for memory overflows */
281 #if defined(HAVE_DEBUG)
282 if (write_length == MEMCACHED_MAX_BUFFER)
283 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
284 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
285 #endif
286
287 return_length= 0;
288 while (write_length)
289 {
290 WATCHPOINT_ASSERT(ptr->fd != -1);
291 WATCHPOINT_ASSERT(write_length > 0);
292 sent_length= 0;
293 if (ptr->type == MEMCACHED_CONNECTION_UDP)
294 {
295 struct addrinfo *ai;
296
297 ai= ptr->address_info;
298
299 /* Crappy test code */
300 char buffer[HUGE_STRING_LEN + 8];
301 memset(buffer, 0, HUGE_STRING_LEN + 8);
302 memcpy (buffer+8, local_write_ptr, write_length);
303 buffer[0]= 0;
304 buffer[1]= 0;
305 buffer[2]= 0;
306 buffer[3]= 0;
307 buffer[4]= 0;
308 buffer[5]= 1;
309 buffer[6]= 0;
310 buffer[7]= 0;
311 sent_length= sendto(ptr->fd, buffer, write_length + 8, 0,
312 (struct sockaddr *)ai->ai_addr,
313 ai->ai_addrlen);
314 if (sent_length == -1)
315 {
316 WATCHPOINT_ERRNO(errno);
317 WATCHPOINT_ASSERT(0);
318 }
319 sent_length-= 8; /* We remove the header */
320 }
321 else
322 {
323 #ifdef NOT_DONE
324 /*
325 ** We might want to purge the input buffer if we haven't consumed
326 ** any output yet... The test for the limits is the purge is inline
327 ** in the purge function to avoid duplicating the logic..
328 */
329 {
330 memcached_return rc;
331 WATCHPOINT_ASSERT(ptr->fd != -1);
332 rc= memcached_purge(ptr);
333
334 if (rc != MEMCACHED_SUCCESS || rc != MEMCACHED_STORED)
335 return -1;
336 }
337 #endif
338
339 WATCHPOINT_ASSERT(ptr->fd != -1);
340 if ((sent_length= write(ptr->fd, local_write_ptr,
341 write_length)) == -1)
342 {
343 switch (errno)
344 {
345 case ENOBUFS:
346 continue;
347 case EAGAIN:
348 {
349 memcached_return rc;
350 rc= io_wait(ptr, MEM_WRITE);
351
352 if (rc == MEMCACHED_SUCCESS)
353 continue;
354
355 memcached_quit_server(ptr, 1);
356 return -1;
357 }
358 default:
359 memcached_quit_server(ptr, 1);
360 ptr->cached_errno= errno;
361 *error= MEMCACHED_ERRNO;
362 return -1;
363 }
364 }
365 }
366
367 ptr->io_bytes_sent += sent_length;
368
369 local_write_ptr+= sent_length;
370 write_length-= sent_length;
371 return_length+= sent_length;
372 }
373
374 WATCHPOINT_ASSERT(write_length == 0);
375 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
376 // ptr->write_buffer_offset);
377 ptr->write_buffer_offset= 0;
378
379 return return_length;
380 }
381
382 /*
383 Eventually we will just kill off the server with the problem.
384 */
385 void memcached_io_reset(memcached_server_st *ptr)
386 {
387 memcached_quit_server(ptr, 0);
388 }