Refactor of async code. poll() is now only called when needed.
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 typedef enum {
11 MEM_READ,
12 MEM_WRITE,
13 } memc_read_or_write;
14
15 static ssize_t io_flush(memcached_st *ptr, unsigned int server_key,
16 memcached_return *error);
17
18 static memcached_return io_wait(memcached_st *ptr, unsigned int server_key,
19 memc_read_or_write read_or_write)
20 {
21 struct pollfd fds[1];
22 short flags= 0;
23 int error;
24
25 if (read_or_write == MEM_WRITE) /* write */
26 flags= POLLOUT | POLLERR;
27 else
28 flags= POLLIN | POLLERR;
29
30 memset(&fds, 0, sizeof(struct pollfd));
31 fds[0].fd= ptr->hosts[server_key].fd;
32 fds[0].events= flags;
33
34 error= poll(fds, 1, ptr->poll_timeout);
35
36 if (error == 1)
37 return MEMCACHED_SUCCESS;
38 else if (error == 0)
39 {
40 WATCHPOINT_NUMBER(read_or_write);
41 return MEMCACHED_TIMEOUT;
42 }
43
44 WATCHPOINT;
45 /* Imposssible for anything other then -1 */
46 WATCHPOINT_ASSERT(error == -1);
47 memcached_quit_server(ptr, server_key, 1);
48 return MEMCACHED_FAILURE;
49
50 }
51
52 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
53 char *buffer, size_t length)
54 {
55 char *buffer_ptr;
56
57 buffer_ptr= buffer;
58
59 while (length)
60 {
61 if (!ptr->hosts[server_key].read_buffer_length)
62 {
63 size_t data_read;
64
65 while (1)
66 {
67 data_read= read(ptr->hosts[server_key].fd,
68 ptr->hosts[server_key].read_buffer,
69 MEMCACHED_MAX_BUFFER);
70 if (data_read == -1)
71 {
72 switch (errno)
73 {
74 case EAGAIN:
75 {
76 memcached_return rc;
77 rc= io_wait(ptr, server_key, MEM_READ);
78
79 if (rc == MEMCACHED_SUCCESS)
80 continue;
81
82 memcached_quit_server(ptr, server_key, 1);
83 return -1;
84 }
85 default:
86 {
87 memcached_quit_server(ptr, server_key, 1);
88 ptr->cached_errno= errno;
89 return -1;
90 }
91 }
92 }
93 else if (data_read)
94 break;
95 /* If zero, just keep looping */
96 }
97
98 ptr->hosts[server_key].read_buffer_length= data_read;
99 ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
100 }
101
102 *buffer_ptr= *ptr->hosts[server_key].read_ptr;
103 length--;
104 ptr->hosts[server_key].read_ptr++;
105 ptr->hosts[server_key].read_buffer_length--;
106 buffer_ptr++;
107 }
108
109 return (size_t)(buffer_ptr - buffer);
110 }
111
112 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
113 char *buffer, size_t length, char with_flush)
114 {
115 unsigned long long x;
116
117 for (x= 0; x < length; x++)
118 {
119 if (ptr->hosts[server_key].write_ptr == 0)
120 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
121 WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr);
122 *ptr->hosts[server_key].write_ptr= buffer[x];
123 ptr->hosts[server_key].write_ptr++;
124 ptr->hosts[server_key].write_buffer_offset++;
125
126 if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER)
127 {
128 memcached_return rc;
129 size_t sent_length;
130
131 sent_length= io_flush(ptr, server_key, &rc);
132 if (sent_length == -1)
133 return -1;
134
135 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
136 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
137 ptr->hosts[server_key].write_buffer_offset= 0;
138 }
139 }
140
141 if (with_flush)
142 {
143 memcached_return rc;
144 if (io_flush(ptr, server_key, &rc) == -1)
145 return -1;
146 }
147
148 return length;
149 }
150
151 memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key)
152 {
153 memcached_return rc;
154
155 rc= MEMCACHED_SUCCESS;
156 if (ptr->flags & MEM_NO_BLOCK)
157 {
158 int error;
159 struct pollfd fds[1];
160 short flags= 0;
161
162 flags= POLLHUP | POLLERR;
163
164 memset(&fds, 0, sizeof(struct pollfd));
165 fds[0].fd= ptr->hosts[server_key].fd;
166 fds[0].events= flags;
167 fds[0].revents= 0;
168
169 error= poll(fds, 1, ptr->poll_timeout == -1 ? 100 : ptr->poll_timeout);
170
171 if (error == -1)
172 {
173 memcached_quit_server(ptr, server_key, 1);
174 return MEMCACHED_FAILURE;
175 }
176 else if (error == 0)
177 return MEMCACHED_FAILURE; /* Timeout occurred */
178 }
179
180 close(ptr->hosts[server_key].fd);
181
182 return rc;
183 }
184
185 static ssize_t io_flush(memcached_st *ptr, unsigned int server_key,
186 memcached_return *error)
187 {
188 size_t sent_length;
189 size_t return_length;
190 char *write_ptr= ptr->hosts[server_key].write_buffer;
191 size_t write_length= ptr->hosts[server_key].write_buffer_offset;
192
193 *error= MEMCACHED_SUCCESS;
194
195 if (ptr->hosts[server_key].write_buffer_offset == 0)
196 return 0;
197
198 return_length= 0;
199 while (write_length)
200 {
201 sent_length= 0;
202 if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP)
203 {
204 sent_length= sendto(ptr->hosts[server_key].fd,
205 write_ptr, write_length, 0,
206 (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr,
207 sizeof(struct sockaddr));
208 }
209 else
210 {
211 if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
212 write_length)) == -1)
213 {
214 switch (errno)
215 {
216 case ENOBUFS:
217 continue;
218 case EAGAIN:
219 {
220 memcached_return rc;
221 rc= io_wait(ptr, server_key, MEM_WRITE);
222
223 if (rc == MEMCACHED_SUCCESS)
224 continue;
225
226 memcached_quit_server(ptr, server_key, 1);
227 return -1;
228 }
229 default:
230 memcached_quit_server(ptr, server_key, 1);
231 ptr->cached_errno= errno;
232 *error= MEMCACHED_ERRNO;
233 return -1;
234 }
235 }
236 }
237
238 write_ptr+= sent_length;
239 write_length-= sent_length;
240 return_length+= sent_length;
241 }
242
243 WATCHPOINT_ASSERT(write_length == 0);
244 WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset);
245 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
246 ptr->hosts[server_key].write_buffer_offset= 0;
247
248 return return_length;
249 }
250
251 /*
252 Eventually we will just kill off the server with the problem.
253 */
254 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
255 {
256 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
257 ptr->hosts[server_key].write_buffer_offset= 0;
258 memcached_quit(ptr);
259 }