85a27d962b3d4acc046e11c1bc9aca39b046db61
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 typedef enum {
11 MEM_READ,
12 MEM_WRITE,
13 } memc_read_or_write;
14
15 static ssize_t io_flush(memcached_st *ptr, unsigned int server_key,
16 memcached_return *error);
17
18 static memcached_return io_wait(memcached_st *ptr, unsigned int server_key,
19 memc_read_or_write read_or_write)
20 {
21 struct pollfd fds[1];
22 short flags= 0;
23 int error;
24
25 if (read_or_write == MEM_WRITE) /* write */
26 flags= POLLOUT | POLLERR;
27 else
28 flags= POLLIN | POLLERR;
29
30 memset(&fds, 0, sizeof(struct pollfd));
31 fds[0].fd= ptr->hosts[server_key].fd;
32 fds[0].events= flags;
33
34 error= poll(fds, 1, ptr->poll_timeout);
35
36 if (error == 1)
37 return MEMCACHED_SUCCESS;
38 else if (error == 0)
39 {
40 WATCHPOINT_NUMBER(read_or_write);
41 return MEMCACHED_TIMEOUT;
42 }
43
44 WATCHPOINT;
45 /* Imposssible for anything other then -1 */
46 WATCHPOINT_ASSERT(error == -1);
47 memcached_quit_server(ptr, server_key, 1);
48 return MEMCACHED_FAILURE;
49
50 }
51
52 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
53 char *buffer, size_t length)
54 {
55 char *buffer_ptr;
56
57 buffer_ptr= buffer;
58
59 while (length)
60 {
61 if (!ptr->hosts[server_key].read_buffer_length)
62 {
63 size_t data_read;
64
65 while (1)
66 {
67 data_read= read(ptr->hosts[server_key].fd,
68 ptr->hosts[server_key].read_buffer,
69 MEMCACHED_MAX_BUFFER);
70 if (data_read == -1)
71 {
72 switch (errno)
73 {
74 case EAGAIN:
75 {
76 memcached_return rc;
77 rc= io_wait(ptr, server_key, MEM_READ);
78
79 if (rc == MEMCACHED_SUCCESS)
80 continue;
81
82 memcached_quit_server(ptr, server_key, 1);
83 return -1;
84 }
85 default:
86 {
87 memcached_quit_server(ptr, server_key, 1);
88 ptr->cached_errno= errno;
89 return -1;
90 }
91 }
92 }
93 else if (data_read)
94 break;
95 /* If zero, just keep looping */
96 }
97
98 ptr->hosts[server_key].read_buffer_length= data_read;
99 ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
100 }
101
102 *buffer_ptr= *ptr->hosts[server_key].read_ptr;
103 length--;
104 ptr->hosts[server_key].read_ptr++;
105 ptr->hosts[server_key].read_buffer_length--;
106 buffer_ptr++;
107 }
108
109 return (size_t)(buffer_ptr - buffer);
110 }
111
112 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
113 char *buffer, size_t length, char with_flush)
114 {
115 unsigned long long x;
116
117 for (x= 0; x < length; x++)
118 {
119 if (ptr->hosts[server_key].write_ptr == 0)
120 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
121 WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr);
122 *ptr->hosts[server_key].write_ptr= buffer[x];
123 ptr->hosts[server_key].write_ptr++;
124 ptr->hosts[server_key].write_buffer_offset++;
125
126 if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER)
127 {
128 memcached_return rc;
129 size_t sent_length;
130
131 sent_length= io_flush(ptr, server_key, &rc);
132 if (sent_length == -1)
133 return -1;
134
135 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
136 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
137 ptr->hosts[server_key].write_buffer_offset= 0;
138 }
139 }
140
141 if (with_flush)
142 {
143 memcached_return rc;
144 if (io_flush(ptr, server_key, &rc) == -1)
145 return -1;
146 }
147
148 return length;
149 }
150
151 memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key)
152 {
153 close(ptr->hosts[server_key].fd);
154
155 return MEMCACHED_SUCCESS;
156 }
157
158 static ssize_t io_flush(memcached_st *ptr, unsigned int server_key,
159 memcached_return *error)
160 {
161 size_t sent_length;
162 size_t return_length;
163 char *write_ptr= ptr->hosts[server_key].write_buffer;
164 size_t write_length= ptr->hosts[server_key].write_buffer_offset;
165
166 *error= MEMCACHED_SUCCESS;
167
168 if (ptr->hosts[server_key].write_buffer_offset == 0)
169 return 0;
170
171 return_length= 0;
172 while (write_length)
173 {
174 sent_length= 0;
175 if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP)
176 {
177 sent_length= sendto(ptr->hosts[server_key].fd,
178 write_ptr, write_length, 0,
179 (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr,
180 sizeof(struct sockaddr));
181 }
182 else
183 {
184 if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
185 write_length)) == -1)
186 {
187 switch (errno)
188 {
189 case ENOBUFS:
190 continue;
191 case EAGAIN:
192 {
193 memcached_return rc;
194 rc= io_wait(ptr, server_key, MEM_WRITE);
195
196 if (rc == MEMCACHED_SUCCESS)
197 continue;
198
199 memcached_quit_server(ptr, server_key, 1);
200 return -1;
201 }
202 default:
203 memcached_quit_server(ptr, server_key, 1);
204 ptr->cached_errno= errno;
205 *error= MEMCACHED_ERRNO;
206 return -1;
207 }
208 }
209 }
210
211 write_ptr+= sent_length;
212 write_length-= sent_length;
213 return_length+= sent_length;
214 }
215
216 WATCHPOINT_ASSERT(write_length == 0);
217 WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset);
218 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
219 ptr->hosts[server_key].write_buffer_offset= 0;
220
221 return return_length;
222 }
223
224 /*
225 Eventually we will just kill off the server with the problem.
226 */
227 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
228 {
229 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
230 ptr->hosts[server_key].write_buffer_offset= 0;
231 memcached_quit(ptr);
232 }