Remove a dead variable and, on close, check the value of the file descriptor.
[awesomized/libmemcached] / libmemcached / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
/* Direction of the socket operation that io_wait() polls for. */
typedef enum
{
  MEM_READ,  /* wait for the descriptor to become readable (POLLIN) */
  MEM_WRITE  /* wait for the descriptor to become writable (POLLOUT) */
} memc_read_or_write;
14
15 static ssize_t io_flush(memcached_server_st *ptr, memcached_return *error);
16
17 static memcached_return io_wait(memcached_server_st *ptr,
18 memc_read_or_write read_or_write)
19 {
20 struct pollfd fds[1];
21 short flags= 0;
22 int error;
23
24 if (read_or_write == MEM_WRITE) /* write */
25 flags= POLLOUT | POLLERR;
26 else
27 flags= POLLIN | POLLERR;
28
29 memset(&fds, 0, sizeof(struct pollfd));
30 fds[0].fd= ptr->fd;
31 fds[0].events= flags;
32
33 /*
34 ** We are going to block on write, but at least on Solaris we might block
35 ** on write if we haven't read anything from our input buffer..
36 ** Try to purge the input buffer if we don't do any flow control in the
37 ** application layer (just sending a lot of data etc)
38 ** The test is moved down in the purge function to avoid duplication of
39 ** the test.
40 */
41 if (read_or_write == MEM_WRITE)
42 memcached_purge(ptr);
43
44 error= poll(fds, 1, ptr->root->poll_timeout);
45
46 if (error == 1)
47 return MEMCACHED_SUCCESS;
48 else if (error == 0)
49 {
50 return MEMCACHED_TIMEOUT;
51 }
52
53 /* Imposssible for anything other then -1 */
54 WATCHPOINT_ASSERT(error == -1);
55 memcached_quit_server(ptr, 1);
56
57 return MEMCACHED_FAILURE;
58
59 }
60
#ifdef UNUSED
/*
  Only compiled when UNUSED is defined, and even then it does nothing: the
  function returns before the loop below is reached. The loop is kept as a
  sketch of a pre-read pass that, for every host with outstanding responses
  and room left in its read buffer, would read() more data and grow the
  host's read_buffer_length / read_data_length by the amount read.
*/
void memcached_io_preread(memcached_st *ptr)
{
  unsigned int x;

  return;

  for (x= 0; x < ptr->number_of_hosts; x++)
  {
    if (memcached_server_response_count(ptr, x) &&
        ptr->hosts[x].read_data_length < MEMCACHED_MAX_BUFFER )
    {
      /* read() returns ssize_t; a size_t could not represent its -1. */
      ssize_t data_read;

      data_read= read(ptr->hosts[x].fd,
                      ptr->hosts[x].read_ptr + ptr->hosts[x].read_data_length,
                      MEMCACHED_MAX_BUFFER - ptr->hosts[x].read_data_length);
      if (data_read == -1)
        continue;

      ptr->hosts[x].read_buffer_length+= (size_t)data_read;
      ptr->hosts[x].read_data_length+= (size_t)data_read;
    }
  }
}
#endif
87
88 ssize_t memcached_io_read(memcached_server_st *ptr,
89 void *buffer, size_t length)
90 {
91 char *buffer_ptr;
92
93 buffer_ptr= buffer;
94
95 while (length)
96 {
97 uint8_t found_eof= 0;
98 if (!ptr->read_buffer_length)
99 {
100 ssize_t data_read;
101
102 while (1)
103 {
104 data_read= read(ptr->fd,
105 ptr->read_buffer,
106 MEMCACHED_MAX_BUFFER);
107 if (data_read > 0)
108 break;
109 else if (data_read == -1)
110 {
111 ptr->cached_errno= errno;
112 switch (errno)
113 {
114 case EAGAIN:
115 {
116 memcached_return rc;
117
118 rc= io_wait(ptr, MEM_READ);
119
120 if (rc == MEMCACHED_SUCCESS)
121 continue;
122 }
123 /* fall trough */
124 default:
125 {
126 memcached_quit_server(ptr, 1);
127 return -1;
128 }
129 }
130 }
131 else
132 {
133 found_eof= 1;
134 break;
135 }
136 }
137
138 ptr->io_bytes_sent = 0;
139 ptr->read_data_length= data_read;
140 ptr->read_buffer_length= data_read;
141 ptr->read_ptr= ptr->read_buffer;
142 }
143
144 if (length > 1)
145 {
146 size_t difference;
147
148 difference= (length > ptr->read_buffer_length) ? ptr->read_buffer_length : length;
149
150 memcpy(buffer_ptr, ptr->read_ptr, difference);
151 length -= difference;
152 ptr->read_ptr+= difference;
153 ptr->read_buffer_length-= difference;
154 buffer_ptr+= difference;
155 }
156 else
157 {
158 *buffer_ptr= *ptr->read_ptr;
159 ptr->read_ptr++;
160 ptr->read_buffer_length--;
161 buffer_ptr++;
162 break;
163 }
164
165 if (found_eof)
166 break;
167 }
168
169 return (size_t)(buffer_ptr - (char*)buffer);
170 }
171
172 ssize_t memcached_io_write(memcached_server_st *ptr,
173 const void *buffer, size_t length, char with_flush)
174 {
175 size_t original_length;
176 const char* buffer_ptr;
177
178 original_length= length;
179 buffer_ptr= buffer;
180
181 while (length)
182 {
183 char *write_ptr;
184 size_t should_write;
185
186 should_write= MEMCACHED_MAX_BUFFER - ptr->write_buffer_offset;
187 write_ptr= ptr->write_buffer + ptr->write_buffer_offset;
188
189 should_write= (should_write < length) ? should_write : length;
190
191 memcpy(write_ptr, buffer_ptr, should_write);
192 ptr->write_buffer_offset+= should_write;
193 buffer_ptr+= should_write;
194 length-= should_write;
195
196 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
197 {
198 memcached_return rc;
199 ssize_t sent_length;
200
201 sent_length= io_flush(ptr, &rc);
202 if (sent_length == -1)
203 return -1;
204
205 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
206 }
207 }
208
209 if (with_flush)
210 {
211 memcached_return rc;
212 if (io_flush(ptr, &rc) == -1)
213 return -1;
214 }
215
216 return original_length;
217 }
218
219 memcached_return memcached_io_close(memcached_server_st *ptr)
220 {
221 int r;
222
223 if (ptr->fd == -1)
224 return MEMCACHED_SUCCESS;
225
226 /* in case of death shutdown to avoid blocking at close() */
227 r= shutdown(ptr->fd, SHUT_RDWR);
228
229 #ifdef HAVE_DEBUG
230 if (r && errno != ENOTCONN)
231 {
232 WATCHPOINT_NUMBER(ptr->fd);
233 WATCHPOINT_ERRNO(errno);
234 WATCHPOINT_ASSERT(errno);
235 }
236 #endif
237
238 r= close(ptr->fd);
239 #ifdef HAVE_DEBUG
240 if (r != 0)
241 WATCHPOINT_ERRNO(errno);
242 #endif
243
244 return MEMCACHED_SUCCESS;
245 }
246
247 static ssize_t io_flush(memcached_server_st *ptr,
248 memcached_return *error)
249 {
250 ssize_t sent_length;
251 size_t return_length;
252 char *local_write_ptr= ptr->write_buffer;
253 size_t write_length= ptr->write_buffer_offset;
254
255 *error= MEMCACHED_SUCCESS;
256
257 if (ptr->write_buffer_offset == 0)
258 return 0;
259
260 /* Looking for memory overflows */
261 #if defined(HAVE_DEBUG)
262 if (write_length == MEMCACHED_MAX_BUFFER)
263 WATCHPOINT_ASSERT(ptr->write_buffer == local_write_ptr);
264 WATCHPOINT_ASSERT((ptr->write_buffer + MEMCACHED_MAX_BUFFER) >= (local_write_ptr + write_length));
265 #endif
266
267 return_length= 0;
268 while (write_length)
269 {
270 WATCHPOINT_ASSERT(write_length > 0);
271 sent_length= 0;
272 if (ptr->type == MEMCACHED_CONNECTION_UDP)
273 {
274 struct addrinfo *ai;
275
276 ai= ptr->address_info;
277
278 /* Crappy test code */
279 char buffer[HUGE_STRING_LEN + 8];
280 memset(buffer, 0, HUGE_STRING_LEN + 8);
281 memcpy (buffer+8, local_write_ptr, write_length);
282 buffer[0]= 0;
283 buffer[1]= 0;
284 buffer[2]= 0;
285 buffer[3]= 0;
286 buffer[4]= 0;
287 buffer[5]= 1;
288 buffer[6]= 0;
289 buffer[7]= 0;
290 sent_length= sendto(ptr->fd, buffer, write_length + 8, 0,
291 (struct sockaddr *)ai->ai_addr,
292 ai->ai_addrlen);
293 if (sent_length == -1)
294 {
295 WATCHPOINT_ERRNO(errno);
296 WATCHPOINT_ASSERT(0);
297 }
298 sent_length-= 8; /* We remove the header */
299 }
300 else
301 {
302 /*
303 ** We might want to purge the input buffer if we haven't consumed
304 ** any output yet... The test for the limits is the purge is inline
305 ** in the purge function to avoid duplicating the logic..
306 */
307 memcached_purge(ptr);
308
309 if ((sent_length= write(ptr->fd, local_write_ptr,
310 write_length)) == -1)
311 {
312 switch (errno)
313 {
314 case ENOBUFS:
315 continue;
316 case EAGAIN:
317 {
318 memcached_return rc;
319 rc= io_wait(ptr, MEM_WRITE);
320
321 if (rc == MEMCACHED_SUCCESS)
322 continue;
323
324 memcached_quit_server(ptr, 1);
325 return -1;
326 }
327 default:
328 memcached_quit_server(ptr, 1);
329 ptr->cached_errno= errno;
330 *error= MEMCACHED_ERRNO;
331 return -1;
332 }
333 }
334 }
335
336 ptr->io_bytes_sent += sent_length;
337
338 local_write_ptr+= sent_length;
339 write_length-= sent_length;
340 return_length+= sent_length;
341 }
342
343 WATCHPOINT_ASSERT(write_length == 0);
344 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
345 // ptr->write_buffer_offset);
346 ptr->write_buffer_offset= 0;
347
348 return return_length;
349 }
350
351 /*
352 Eventually we will just kill off the server with the problem.
353 */
354 void memcached_io_reset(memcached_server_st *ptr)
355 {
356 memcached_quit_server(ptr, 0);
357 }