06ac2e9b51862f3929429d754ea12ba30b272075
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write)
11 {
12 struct pollfd fds[1];
13 short flags= 0;
14
15 if (read_or_write)
16 flags= POLLOUT | POLLERR;
17 else
18 flags= POLLIN | POLLERR;
19
20 memset(&fds, 0, sizeof(struct pollfd));
21 fds[0].fd= ptr->hosts[server_key].fd;
22 fds[0].events= flags;
23
24 if (poll(fds, 1, -1) < 0)
25 return MEMCACHED_FAILURE;
26
27 return MEMCACHED_SUCCESS;
28 }
29
30 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
31 char *buffer, size_t length)
32 {
33 char *buffer_ptr;
34
35 buffer_ptr= buffer;
36
37 while (length)
38 {
39 if (!ptr->hosts[server_key].read_buffer_length)
40 {
41 size_t data_read;
42
43 while (1)
44 {
45 if (ptr->flags & MEM_NO_BLOCK)
46 {
47 memcached_return rc;
48
49 rc= io_wait(ptr, server_key, 0);
50 if (rc != MEMCACHED_SUCCESS)
51 return -1;
52 }
53
54 data_read= read(ptr->hosts[server_key].fd,
55 ptr->hosts[server_key].read_buffer,
56 MEMCACHED_MAX_BUFFER);
57 if (data_read == -1)
58 {
59 switch (errno)
60 {
61 case EAGAIN:
62 break;
63 default:
64 {
65 ptr->cached_errno= errno;
66 return -1;
67 }
68 }
69 }
70 else if (data_read)
71 break;
72 /* If zero, just keep looping */
73 }
74
75 ptr->hosts[server_key].read_buffer_length= data_read;
76 ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
77 }
78
79 *buffer_ptr= *ptr->hosts[server_key].read_ptr;
80 length--;
81 ptr->hosts[server_key].read_ptr++;
82 ptr->hosts[server_key].read_buffer_length--;
83 buffer_ptr++;
84 }
85
86 return (size_t)(buffer_ptr - buffer);
87 }
88
89 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
90 char *buffer, size_t length, char with_flush)
91 {
92 unsigned long long x;
93
94 for (x= 0; x < length; x++)
95 {
96 if (ptr->hosts[server_key].write_ptr == 0)
97 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
98 WATCHPOINT_ASSERT(ptr->hosts[server_key].write_ptr);
99 *ptr->hosts[server_key].write_ptr= buffer[x];
100 ptr->hosts[server_key].write_ptr++;
101 ptr->hosts[server_key].write_buffer_offset++;
102
103 if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER)
104 {
105 size_t sent_length;
106
107 sent_length= memcached_io_flush(ptr, server_key);
108
109 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
110 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
111 ptr->hosts[server_key].write_buffer_offset= 0;
112 }
113 }
114
115 if (with_flush)
116 {
117 if (memcached_io_flush(ptr, server_key) == -1)
118 return -1;
119 }
120
121 return length;
122 }
123
124 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
125 {
126 size_t sent_length;
127 size_t return_length;
128 char *write_ptr= ptr->hosts[server_key].write_buffer;
129 size_t write_length= ptr->hosts[server_key].write_buffer_offset;
130 unsigned int loop= 1;
131
132 if (ptr->hosts[server_key].write_buffer_offset == 0)
133 return 0;
134
135 return_length= 0;
136 while (write_length)
137 {
138 if (ptr->flags & MEM_NO_BLOCK)
139 {
140 memcached_return rc;
141
142 rc= io_wait(ptr, server_key, 1);
143 if (rc != MEMCACHED_SUCCESS)
144 return -1;
145 }
146
147 sent_length= 0;
148 if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP)
149 {
150 sent_length= sendto(ptr->hosts[server_key].fd,
151 write_ptr, write_length, 0,
152 (struct sockaddr *)&ptr->hosts[server_key].servAddr,
153 sizeof(struct sockaddr));
154 }
155 else
156 {
157 if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
158 write_length)) == -1)
159 {
160 switch (errno)
161 {
162 case ENOBUFS:
163 case EAGAIN:
164 WATCHPOINT;
165 continue;
166 if (loop < 100)
167 {
168 loop++;
169 break;
170 }
171 /* Yes, we want to fall through */
172 default:
173 ptr->cached_errno= errno;
174 return -1;
175 }
176 }
177 }
178
179 write_ptr+= sent_length;
180 write_length-= sent_length;
181 return_length+= sent_length;
182 }
183
184 WATCHPOINT_ASSERT(write_length == 0);
185 WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset);
186 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
187 ptr->hosts[server_key].write_buffer_offset= 0;
188
189 return return_length;
190 }
191
192 /*
193 Eventually we will just kill off the server with the problem.
194 */
195 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
196 {
197 ptr->hosts[server_key].write_ptr= ptr->hosts[server_key].write_buffer;
198 ptr->hosts[server_key].write_buffer_offset= 0;
199 memcached_quit(ptr);
200 }