c0215ac08f917d83249a0c74dcfebd277d2a894d
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 static int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write)
11 {
12 struct pollfd fds[1];
13 short flags= 0;
14
15 if (read_or_write)
16 flags= POLLOUT | POLLERR;
17 else
18 flags= POLLIN | POLLERR;
19
20 memset(&fds, 0, sizeof(struct pollfd));
21 fds[0].fd= ptr->hosts[server_key].fd;
22 fds[0].events= flags;
23
24 if (poll(fds, 1, -1) < 0)
25 return MEMCACHED_FAILURE;
26
27 return MEMCACHED_SUCCESS;
28 }
29
30 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
31 char *buffer, size_t length)
32 {
33 char *buffer_ptr;
34
35 buffer_ptr= buffer;
36
37 while (length)
38 {
39 if (!ptr->hosts[server_key].read_buffer_length)
40 {
41 size_t data_read;
42
43 while (1)
44 {
45 if (ptr->flags & MEM_NO_BLOCK)
46 {
47 memcached_return rc;
48
49 rc= io_wait(ptr, server_key, 0);
50 if (rc != MEMCACHED_SUCCESS)
51 return -1;
52 }
53
54 data_read= read(ptr->hosts[server_key].fd,
55 ptr->hosts[server_key].read_buffer,
56 MEMCACHED_MAX_BUFFER);
57 if (data_read == -1)
58 {
59 switch (errno)
60 {
61 case EAGAIN:
62 break;
63 default:
64 {
65 ptr->cached_errno= errno;
66 return -1;
67 }
68 }
69 }
70 else if (data_read)
71 break;
72 /* If zero, just keep looping */
73 }
74
75 ptr->hosts[server_key].read_buffer_length= data_read;
76 ptr->hosts[server_key].read_ptr= ptr->hosts[server_key].read_buffer;
77 }
78
79 *buffer_ptr= *ptr->hosts[server_key].read_ptr;
80 length--;
81 ptr->hosts[server_key].read_ptr++;
82 ptr->hosts[server_key].read_buffer_length--;
83 buffer_ptr++;
84 }
85
86 return (size_t)(buffer_ptr - buffer);
87 }
88
89 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
90 char *buffer, size_t length, char with_flush)
91 {
92 unsigned long long x;
93
94 for (x= 0; x < length; x++)
95 {
96 ptr->hosts[server_key].write_buffer[ptr->hosts[server_key].write_buffer_offset]= buffer[x];
97 ptr->hosts[server_key].write_buffer_offset++;
98
99 if (ptr->hosts[server_key].write_buffer_offset == MEMCACHED_MAX_BUFFER)
100 {
101 size_t sent_length;
102
103 sent_length= memcached_io_flush(ptr, server_key);
104
105 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
106 ptr->hosts[server_key].write_buffer_offset= 0;
107 }
108 }
109
110 if (with_flush)
111 {
112 if (memcached_io_flush(ptr, server_key) == -1)
113 return -1;
114 }
115
116 return length;
117 }
118
119 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
120 {
121 size_t sent_length;
122 size_t return_length;
123 char *write_ptr= ptr->hosts[server_key].write_buffer;
124 size_t write_length= ptr->hosts[server_key].write_buffer_offset;
125 unsigned int loop= 1;
126
127 if (ptr->hosts[server_key].write_buffer_offset == 0)
128 return 0;
129
130 return_length= 0;
131 while (write_length)
132 {
133 if (ptr->flags & MEM_NO_BLOCK)
134 {
135 memcached_return rc;
136
137 rc= io_wait(ptr, server_key, 1);
138 if (rc != MEMCACHED_SUCCESS)
139 return -1;
140 }
141
142 sent_length= 0;
143 if (ptr->hosts[server_key].type == MEMCACHED_CONNECTION_UDP)
144 {
145
146 sent_length= sendto(ptr->hosts[server_key].fd, write_ptr, write_length,
147 0, 0, 0);
148 /*
149 rc = sendto(sd, argv[i], strlen(argv[i])+1, 0,
150 (struct sockaddr *) &remoteServAddr,
151 sizeof(remoteServAddr));
152 */
153 }
154 else
155 {
156 if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
157 write_length)) == -1)
158 {
159 switch (errno)
160 {
161 case ENOBUFS:
162 case EAGAIN:
163 WATCHPOINT;
164 continue;
165 if (loop < 100)
166 {
167 loop++;
168 break;
169 }
170 /* Yes, we want to fall through */
171 default:
172 ptr->cached_errno= errno;
173 return -1;
174 }
175 }
176 }
177
178 write_ptr+= sent_length;
179 write_length-= sent_length;
180 return_length+= sent_length;
181 }
182
183 WATCHPOINT_ASSERT(write_length == 0);
184 WATCHPOINT_ASSERT(return_length == ptr->hosts[server_key].write_buffer_offset);
185 ptr->hosts[server_key].write_buffer_offset= 0;
186
187 return return_length;
188 }
189
190 /*
191 Eventually we will just kill off the server with the problem.
192 */
193 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
194 {
195 ptr->hosts[server_key].write_buffer_offset= 0;
196 memcached_quit(ptr);
197 }