0657dfd65da7f2e2a3e5bfb2df116596178627e3
[m6w6/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write)
11 {
12 struct pollfd fds[1];
13 short flags= 0;
14 struct timespec timer;
15
16 timer.tv_sec= 1;
17 timer.tv_nsec= 0;
18 if (read_or_write)
19 flags= POLLOUT | POLLERR;
20 else
21 flags= POLLIN | POLLERR;
22
23 memset(&fds, 0, sizeof(struct pollfd));
24 fds[0].fd= ptr->hosts[server_key].fd;
25 fds[0].events= flags;
26
27 if (poll(fds, 1, ptr->poll_timeout) < 0)
28 return MEMCACHED_FAILURE;
29
30 return MEMCACHED_SUCCESS;
31 }
32
33 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
34 char *buffer, size_t length)
35 {
36 char *buffer_ptr;
37
38 buffer_ptr= buffer;
39
40 while (length)
41 {
42 if (!ptr->hosts[server_key].read_buffer_length)
43 {
44 size_t data_read;
45
46 while (1)
47 {
48 if (ptr->flags & MEM_NO_BLOCK)
49 {
50 memcached_return rc;
51
52 rc= io_wait(ptr, server_key, 0);
53 if (rc != MEMCACHED_SUCCESS)
54 return -1;
55 }
56
57 data_read= read(ptr->hosts[server_key].fd,
58 ptr->hosts[server_key].read_buffer,
59 MEMCACHED_MAX_BUFFER);
60 if (data_read == -1)
61 {
62 switch (errno)
63 {
64 case EAGAIN:
65 break;
66 default:
67 {
68 ptr->cached_errno= errno;
69 return -1;
70 }
71 }
72 }
73 else if (data_read)
74 break;
75 /* If zero, just keep looping */
76 }
77
78 ptr->hosts[server_key].read_buffer_length= data_read;
79 ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
80 }
81
82 *buffer_ptr= *ptr->hosts[server_key].read_ptr;
83 length--;
84 ptr->hosts[server_key].read_ptr++;
85 ptr->hosts[server_key].read_buffer_length--;
86 buffer_ptr++;
87 }
88
89 return (size_t)(buffer_ptr - buffer);
90 }
91
92 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
93 char *buffer, size_t length, char with_flush)
94 {
95 unsigned long long x;
96
97 for (x= 0; x < length; x++)
98 {
99 if (ptr->hosts[server_key].write_ptr == 0)
100 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
101 WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr);
102 *ptr->hosts[server_key].write_ptr= buffer[x];
103 ptr->hosts[server_key].write_ptr++;
104 ptr->hosts[server_key].write_buffer_offset++;
105
106 if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER)
107 {
108 size_t sent_length;
109
110 sent_length= memcached_io_flush(ptr, server_key);
111
112 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
113 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
114 ptr->hosts[server_key].write_buffer_offset= 0;
115 }
116 }
117
118 if (with_flush)
119 {
120 if (memcached_io_flush(ptr, server_key) == -1)
121 return -1;
122 }
123
124 return length;
125 }
126
127 memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key)
128 {
129 struct pollfd fds[1];
130 short flags= 0;
131 struct timespec timer;
132 memcached_return rc;
133
134 timer.tv_sec= 1;
135 timer.tv_nsec= 0;
136 flags= POLLHUP | POLLERR;
137
138 memset(&fds, 0, sizeof(struct pollfd));
139 fds[0].fd= ptr->hosts[server_key].fd;
140 fds[0].events= flags;
141 fds[0].revents= flags;
142
143 if (poll(fds, 1, ptr->poll_timeout) < 0)
144 rc= MEMCACHED_FAILURE;
145 else
146 rc= MEMCACHED_SUCCESS;
147
148 close(ptr->hosts[server_key].fd);
149
150 return rc;
151 }
152
153 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
154 {
155 size_t sent_length;
156 size_t return_length;
157 char *write_ptr= ptr->hosts[server_key].write_buffer;
158 size_t write_length= ptr->hosts[server_key].write_buffer_offset;
159 unsigned int loop= 1;
160
161 if (ptr->hosts[server_key].write_buffer_offset == 0)
162 return 0;
163
164 return_length= 0;
165 while (write_length)
166 {
167 if (ptr->flags & MEM_NO_BLOCK)
168 {
169 memcached_return rc;
170
171 rc= io_wait(ptr, server_key, 1);
172 if (rc != MEMCACHED_SUCCESS)
173 return -1;
174 }
175
176 sent_length= 0;
177 if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP)
178 {
179 sent_length= sendto(ptr->hosts[server_key].fd,
180 write_ptr, write_length, 0,
181 (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr,
182 sizeof(struct sockaddr));
183 }
184 else
185 {
186 if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
187 write_length)) == -1)
188 {
189 switch (errno)
190 {
191 case ENOBUFS:
192 case EAGAIN:
193 WATCHPOINT;
194 continue;
195 if (loop < 100)
196 {
197 loop++;
198 break;
199 }
200 /* Yes, we want to fall through */
201 default:
202 ptr->cached_errno= errno;
203 return -1;
204 }
205 }
206 }
207
208 write_ptr+= sent_length;
209 write_length-= sent_length;
210 return_length+= sent_length;
211 }
212
213 WATCHPOINT_ASSERT(write_length == 0);
214 WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset);
215 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
216 ptr->hosts[server_key].write_buffer_offset= 0;
217
218 return return_length;
219 }
220
221 /*
222 Eventually we will just kill off the server with the problem.
223 */
224 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
225 {
226 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
227 ptr->hosts[server_key].write_buffer_offset= 0;
228 memcached_quit(ptr);
229 }