8f72f54d0104fe255c1503a068713e9b01d17ff4
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write)
11 {
12 struct pollfd fds[1];
13 short flags= 0;
14 int error;
15 int latch= 0;
16
17 if (read_or_write)
18 flags= POLLOUT | POLLERR;
19 else
20 flags= POLLIN | POLLERR;
21
22 memset(&fds, 0, sizeof(struct pollfd));
23 fds[0].fd= ptr->hosts[server_key].fd;
24 fds[0].events= flags;
25
26 while (latch == 0)
27 {
28 error= poll(fds, 1, ptr->poll_timeout);
29
30 if (error == 1)
31 return MEMCACHED_SUCCESS;
32 else if (error == -1)
33 {
34 memcached_quit_server(ptr, server_key, 1);
35 return MEMCACHED_FAILURE;
36 }
37 else if (error)
38 {
39 /* This is impossible */
40 WATCHPOINT_ASSERT(0);
41 return MEMCACHED_FAILURE;
42 }
43 else
44 latch++;
45 }
46
47 memcached_quit_server(ptr, server_key, 1);
48
49 return MEMCACHED_FAILURE; /* Timeout occurred */
50 }
51
52 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
53 char *buffer, size_t length)
54 {
55 char *buffer_ptr;
56
57 buffer_ptr= buffer;
58
59 while (length)
60 {
61 if (!ptr->hosts[server_key].read_buffer_length)
62 {
63 size_t data_read;
64
65 while (1)
66 {
67 if (ptr->flags & MEM_NO_BLOCK)
68 {
69 memcached_return rc;
70
71 rc= io_wait(ptr, server_key, 0);
72 if (rc != MEMCACHED_SUCCESS)
73 return -1;
74 }
75
76 data_read= read(ptr->hosts[server_key].fd,
77 ptr->hosts[server_key].read_buffer,
78 MEMCACHED_MAX_BUFFER);
79 if (data_read == -1)
80 {
81 switch (errno)
82 {
83 case EAGAIN:
84 break;
85 default:
86 {
87 memcached_quit_server(ptr, server_key, 1);
88 ptr->cached_errno= errno;
89 return -1;
90 }
91 }
92 }
93 else if (data_read)
94 break;
95 /* If zero, just keep looping */
96 }
97
98 ptr->hosts[server_key].read_buffer_length= data_read;
99 ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
100 }
101
102 *buffer_ptr= *ptr->hosts[server_key].read_ptr;
103 length--;
104 ptr->hosts[server_key].read_ptr++;
105 ptr->hosts[server_key].read_buffer_length--;
106 buffer_ptr++;
107 }
108
109 return (size_t)(buffer_ptr - buffer);
110 }
111
112 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
113 char *buffer, size_t length, char with_flush)
114 {
115 unsigned long long x;
116
117 for (x= 0; x < length; x++)
118 {
119 if (ptr->hosts[server_key].write_ptr == 0)
120 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
121 WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr);
122 *ptr->hosts[server_key].write_ptr= buffer[x];
123 ptr->hosts[server_key].write_ptr++;
124 ptr->hosts[server_key].write_buffer_offset++;
125
126 if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER)
127 {
128 size_t sent_length;
129
130 sent_length= memcached_io_flush(ptr, server_key);
131 if (sent_length == -1)
132 return -1;
133
134 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
135 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
136 ptr->hosts[server_key].write_buffer_offset= 0;
137 }
138 }
139
140 if (with_flush)
141 {
142 if (memcached_io_flush(ptr, server_key) == -1)
143 return -1;
144 }
145
146 return length;
147 }
148
149 memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key)
150 {
151 memcached_return rc;
152
153 rc= MEMCACHED_SUCCESS;
154 if (ptr->flags & MEM_NO_BLOCK)
155 {
156 int error;
157 struct pollfd fds[1];
158 short flags= 0;
159
160 flags= POLLHUP | POLLERR;
161
162 memset(&fds, 0, sizeof(struct pollfd));
163 fds[0].fd= ptr->hosts[server_key].fd;
164 fds[0].events= flags;
165 fds[0].revents= 0;
166
167 error= poll(fds, 1, ptr->poll_timeout == -1 ? 100 : ptr->poll_timeout);
168
169 if (error == -1)
170 {
171 memcached_quit_server(ptr, server_key, 1);
172 return MEMCACHED_FAILURE;
173 }
174 else if (error == 0)
175 return MEMCACHED_FAILURE; /* Timeout occurred */
176 }
177
178 close(ptr->hosts[server_key].fd);
179
180 return rc;
181 }
182
183 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
184 {
185 size_t sent_length;
186 size_t return_length;
187 char *write_ptr= ptr->hosts[server_key].write_buffer;
188 size_t write_length= ptr->hosts[server_key].write_buffer_offset;
189 unsigned int loop= 1;
190
191 if (ptr->hosts[server_key].write_buffer_offset == 0)
192 return 0;
193
194 return_length= 0;
195 while (write_length)
196 {
197 if (ptr->flags & MEM_NO_BLOCK)
198 {
199 memcached_return rc;
200
201 rc= io_wait(ptr, server_key, 1);
202 if (rc != MEMCACHED_SUCCESS)
203 return -1;
204 }
205
206 sent_length= 0;
207 if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP)
208 {
209 sent_length= sendto(ptr->hosts[server_key].fd,
210 write_ptr, write_length, 0,
211 (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr,
212 sizeof(struct sockaddr));
213 }
214 else
215 {
216 if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
217 write_length)) == -1)
218 {
219 switch (errno)
220 {
221 case ENOBUFS:
222 case EAGAIN:
223 WATCHPOINT;
224 continue;
225 if (loop < 100)
226 {
227 loop++;
228 break;
229 }
230 /* Yes, we want to fall through */
231 default:
232 memcached_quit_server(ptr, server_key, 1);
233 ptr->cached_errno= errno;
234 return -1;
235 }
236 }
237 }
238
239 write_ptr+= sent_length;
240 write_length-= sent_length;
241 return_length+= sent_length;
242 }
243
244 WATCHPOINT_ASSERT(write_length == 0);
245 WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset);
246 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
247 ptr->hosts[server_key].write_buffer_offset= 0;
248
249 return return_length;
250 }
251
252 /*
253 Eventually we will just kill off the server with the problem.
254 */
255 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
256 {
257 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
258 ptr->hosts[server_key].write_buffer_offset= 0;
259 memcached_quit(ptr);
260 }