Fixed memcached_get() to now use finish instead of faking extra response.
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write)
11 {
12 struct pollfd fds[1];
13 short flags= 0;
14
15 if (read_or_write)
16 flags= POLLOUT | POLLERR;
17 else
18 flags= POLLIN | POLLERR;
19
20 memset(&fds, 0, sizeof(struct pollfd));
21 fds[0].fd= ptr->hosts[server_key].fd;
22 fds[0].events= flags;
23
24 if (poll(fds, 1, ptr->poll_timeout) < 0)
25 return MEMCACHED_FAILURE;
26
27 return MEMCACHED_SUCCESS;
28 }
29
30 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
31 char *buffer, size_t length)
32 {
33 char *buffer_ptr;
34
35 buffer_ptr= buffer;
36
37 while (length)
38 {
39 if (!ptr->hosts[server_key].read_buffer_length)
40 {
41 size_t data_read;
42
43 while (1)
44 {
45 if (ptr->flags & MEM_NO_BLOCK)
46 {
47 memcached_return rc;
48
49 rc= io_wait(ptr, server_key, 0);
50 if (rc != MEMCACHED_SUCCESS)
51 return -1;
52 }
53
54 data_read= read(ptr->hosts[server_key].fd,
55 ptr->hosts[server_key].read_buffer,
56 MEMCACHED_MAX_BUFFER);
57 if (data_read == -1)
58 {
59 switch (errno)
60 {
61 case EAGAIN:
62 break;
63 default:
64 {
65 ptr->cached_errno= errno;
66 return -1;
67 }
68 }
69 }
70 else if (data_read)
71 break;
72 /* If zero, just keep looping */
73 }
74
75 ptr->hosts[server_key].read_buffer_length= data_read;
76 ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
77 }
78
79 *buffer_ptr= *ptr->hosts[server_key].read_ptr;
80 length--;
81 ptr->hosts[server_key].read_ptr++;
82 ptr->hosts[server_key].read_buffer_length--;
83 buffer_ptr++;
84 }
85
86 return (size_t)(buffer_ptr - buffer);
87 }
88
89 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
90 char *buffer, size_t length, char with_flush)
91 {
92 unsigned long long x;
93
94 for (x= 0; x < length; x++)
95 {
96 if (ptr->hosts[server_key].write_ptr == 0)
97 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
98 WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr);
99 *ptr->hosts[server_key].write_ptr= buffer[x];
100 ptr->hosts[server_key].write_ptr++;
101 ptr->hosts[server_key].write_buffer_offset++;
102
103 if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER)
104 {
105 size_t sent_length;
106
107 sent_length= memcached_io_flush(ptr, server_key);
108
109 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
110 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
111 ptr->hosts[server_key].write_buffer_offset= 0;
112 }
113 }
114
115 if (with_flush)
116 {
117 if (memcached_io_flush(ptr, server_key) == -1)
118 return -1;
119 }
120
121 return length;
122 }
123
124 memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key)
125 {
126 memcached_return rc;
127
128 rc= MEMCACHED_SUCCESS;
129 if (ptr->flags & MEM_NO_BLOCK)
130 {
131 struct pollfd fds[1];
132 short flags= 0;
133
134 flags= POLLHUP | POLLERR;
135
136 memset(&fds, 0, sizeof(struct pollfd));
137 fds[0].fd= ptr->hosts[server_key].fd;
138 fds[0].events= flags;
139 fds[0].revents= 0;
140
141 if (poll(fds, 1, ptr->poll_timeout == -1 ? 100 : ptr->poll_timeout) < 0)
142 rc= MEMCACHED_FAILURE;
143 }
144
145 close(ptr->hosts[server_key].fd);
146
147 return rc;
148 }
149
150 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
151 {
152 size_t sent_length;
153 size_t return_length;
154 char *write_ptr= ptr->hosts[server_key].write_buffer;
155 size_t write_length= ptr->hosts[server_key].write_buffer_offset;
156 unsigned int loop= 1;
157
158 if (ptr->hosts[server_key].write_buffer_offset == 0)
159 return 0;
160
161 return_length= 0;
162 while (write_length)
163 {
164 if (ptr->flags & MEM_NO_BLOCK)
165 {
166 memcached_return rc;
167
168 rc= io_wait(ptr, server_key, 1);
169 if (rc != MEMCACHED_SUCCESS)
170 return -1;
171 }
172
173 sent_length= 0;
174 if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP)
175 {
176 sent_length= sendto(ptr->hosts[server_key].fd,
177 write_ptr, write_length, 0,
178 (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr,
179 sizeof(struct sockaddr));
180 }
181 else
182 {
183 if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
184 write_length)) == -1)
185 {
186 switch (errno)
187 {
188 case ENOBUFS:
189 case EAGAIN:
190 WATCHPOINT;
191 continue;
192 if (loop < 100)
193 {
194 loop++;
195 break;
196 }
197 /* Yes, we want to fall through */
198 default:
199 ptr->cached_errno= errno;
200 return -1;
201 }
202 }
203 }
204
205 write_ptr+= sent_length;
206 write_length-= sent_length;
207 return_length+= sent_length;
208 }
209
210 WATCHPOINT_ASSERT(write_length == 0);
211 WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset);
212 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
213 ptr->hosts[server_key].write_buffer_offset= 0;
214
215 return return_length;
216 }
217
218 /*
219 Eventually we will just kill off the server with the problem.
220 */
221 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
222 {
223 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
224 ptr->hosts[server_key].write_buffer_offset= 0;
225 memcached_quit(ptr);
226 }