Cleanup of linger call.
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
/* Direction of the socket operation that io_wait() waits for. */
typedef enum {
  MEM_READ= 0,  /* wait until the descriptor can be read from */
  MEM_WRITE= 1  /* wait until the descriptor can be written to */
} memc_read_or_write;
14
15 static ssize_t io_flush(memcached_st *ptr, unsigned int server_key,
16 memcached_return *error);
17
18 static memcached_return io_wait(memcached_st *ptr, unsigned int server_key,
19 memc_read_or_write read_or_write)
20 {
21 struct pollfd fds[1];
22 short flags= 0;
23 int error;
24
25 if (read_or_write == MEM_WRITE) /* write */
26 flags= POLLOUT | POLLERR;
27 else
28 flags= POLLIN | POLLERR;
29
30 memset(&fds, 0, sizeof(struct pollfd));
31 fds[0].fd= ptr->hosts[server_key].fd;
32 fds[0].events= flags;
33
34 error= poll(fds, 1, ptr->poll_timeout);
35
36 if (error == 1)
37 return MEMCACHED_SUCCESS;
38 else if (error == 0)
39 {
40 WATCHPOINT_NUMBER(read_or_write);
41 return MEMCACHED_TIMEOUT;
42 }
43
44 WATCHPOINT;
45 /* Imposssible for anything other then -1 */
46 WATCHPOINT_ASSERT(error == -1);
47 memcached_quit_server(ptr, server_key, 1);
48 return MEMCACHED_FAILURE;
49
50 }
51
52 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
53 char *buffer, size_t length)
54 {
55 char *buffer_ptr;
56
57 buffer_ptr= buffer;
58
59 while (length)
60 {
61 if (!ptr->hosts[server_key].read_buffer_length)
62 {
63 size_t data_read;
64
65 while (1)
66 {
67 data_read= read(ptr->hosts[server_key].fd,
68 ptr->hosts[server_key].read_buffer,
69 MEMCACHED_MAX_BUFFER);
70 if (data_read == -1)
71 {
72 switch (errno)
73 {
74 case EAGAIN:
75 {
76 memcached_return rc;
77 rc= io_wait(ptr, server_key, MEM_READ);
78
79 if (rc == MEMCACHED_SUCCESS)
80 continue;
81
82 memcached_quit_server(ptr, server_key, 1);
83 return -1;
84 }
85 default:
86 {
87 memcached_quit_server(ptr, server_key, 1);
88 ptr->cached_errno= errno;
89 return -1;
90 }
91 }
92 }
93 else if (data_read)
94 break;
95 /* If zero, just keep looping */
96 }
97
98 ptr->hosts[server_key].read_buffer_length= data_read;
99 ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
100 }
101
102 *buffer_ptr= *ptr->hosts[server_key].read_ptr;
103 length--;
104 ptr->hosts[server_key].read_ptr++;
105 ptr->hosts[server_key].read_buffer_length--;
106 buffer_ptr++;
107 }
108
109 return (size_t)(buffer_ptr - buffer);
110 }
111
112 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
113 char *buffer, size_t length, char with_flush)
114 {
115 unsigned long long x;
116
117 for (x= 0; x < length; x++)
118 {
119 if (ptr->hosts[server_key].write_ptr == 0)
120 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
121 WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr);
122 *ptr->hosts[server_key].write_ptr= buffer[x];
123 ptr->hosts[server_key].write_ptr++;
124 ptr->hosts[server_key].write_buffer_offset++;
125
126 if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER)
127 {
128 memcached_return rc;
129 size_t sent_length;
130
131 sent_length= io_flush(ptr, server_key, &rc);
132 if (sent_length == -1)
133 return -1;
134
135 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
136 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
137 ptr->hosts[server_key].write_buffer_offset= 0;
138 }
139 }
140
141 if (with_flush)
142 {
143 memcached_return rc;
144 if (io_flush(ptr, server_key, &rc) == -1)
145 return -1;
146 }
147
148 return length;
149 }
150
151 memcached_return memcached_io_close(memcached_st *ptr, unsigned int server_key)
152 {
153 if (ptr->flags & MEM_NO_BLOCK && 0)
154 {
155 int sock_size;
156 int error;
157 socklen_t sock_length;
158
159 error= getsockopt(ptr->hosts[server_key].fd, IPPROTO_TCP, SO_LINGER,
160 &sock_size, &sock_length);
161
162 WATCHPOINT_NUMBER(error);
163 WATCHPOINT_NUMBER(sock_size);
164 }
165
166 close(ptr->hosts[server_key].fd);
167
168 return MEMCACHED_SUCCESS;
169 }
170
171 static ssize_t io_flush(memcached_st *ptr, unsigned int server_key,
172 memcached_return *error)
173 {
174 size_t sent_length;
175 size_t return_length;
176 char *write_ptr= ptr->hosts[server_key].write_buffer;
177 size_t write_length= ptr->hosts[server_key].write_buffer_offset;
178
179 *error= MEMCACHED_SUCCESS;
180
181 if (ptr->hosts[server_key].write_buffer_offset == 0)
182 return 0;
183
184 return_length= 0;
185 while (write_length)
186 {
187 sent_length= 0;
188 if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP)
189 {
190 sent_length= sendto(ptr->hosts[server_key].fd,
191 write_ptr, write_length, 0,
192 (struct sockaddr *)&ptr->hosts[server_key].address_info->ai_addr,
193 sizeof(struct sockaddr));
194 }
195 else
196 {
197 if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
198 write_length)) == -1)
199 {
200 switch (errno)
201 {
202 case ENOBUFS:
203 continue;
204 case EAGAIN:
205 {
206 memcached_return rc;
207 rc= io_wait(ptr, server_key, MEM_WRITE);
208
209 if (rc == MEMCACHED_SUCCESS)
210 continue;
211
212 memcached_quit_server(ptr, server_key, 1);
213 return -1;
214 }
215 default:
216 memcached_quit_server(ptr, server_key, 1);
217 ptr->cached_errno= errno;
218 *error= MEMCACHED_ERRNO;
219 return -1;
220 }
221 }
222 }
223
224 write_ptr+= sent_length;
225 write_length-= sent_length;
226 return_length+= sent_length;
227 }
228
229 WATCHPOINT_ASSERT(write_length == 0);
230 WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset);
231 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
232 ptr->hosts[server_key].write_buffer_offset= 0;
233
234 return return_length;
235 }
236
237 /*
238 Eventually we will just kill off the server with the problem.
239 */
240 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
241 {
242 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
243 ptr->hosts[server_key].write_buffer_offset= 0;
244 memcached_quit(ptr);
245 }