Fixed return length issues.
[m6w6/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8 #include <poll.h>
9
10 int io_wait(memcached_st *ptr, unsigned int server_key, unsigned read_or_write)
11 {
12 struct pollfd fds[1];
13 short flags= 0;
14
15 if (read_or_write)
16 flags= POLLOUT | POLLERR;
17 else
18 flags= POLLIN | POLLERR;
19
20 memset(&fds, 0, sizeof(struct pollfd));
21 fds[0].fd= ptr->hosts[server_key].fd;
22 fds[0].events= flags;
23
24 if (poll(fds, 1, -1) < 0)
25 return MEMCACHED_FAILURE;
26
27 return MEMCACHED_SUCCESS;
28 }
29
30 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
31 char *buffer, size_t length)
32 {
33 char *buffer_ptr;
34
35 buffer_ptr= buffer;
36
37 while (length)
38 {
39 if (!ptr->read_buffer_length)
40 {
41 size_t data_read;
42
43 while (1)
44 {
45 if (ptr->flags & MEM_NO_BLOCK)
46 {
47 memcached_return rc;
48
49 rc= io_wait(ptr, server_key, 0);
50 if (rc != MEMCACHED_SUCCESS)
51 return -1;
52 }
53
54 data_read= read(ptr->hosts[server_key].fd,
55 ptr->read_buffer,
56 MEMCACHED_MAX_BUFFER);
57 if (data_read == -1)
58 {
59 switch (errno)
60 {
61 case EAGAIN:
62 break;
63 default:
64 {
65 ptr->my_errno= errno;
66 return -1;
67 }
68 }
69 }
70 else if (data_read)
71 break;
72 /* If zero, just keep looping */
73 }
74
75 ptr->read_buffer_length= data_read;
76 ptr->read_ptr= ptr->read_buffer;
77 }
78
79 *buffer_ptr= *ptr->read_ptr;
80 length--;
81 ptr->read_ptr++;
82 ptr->read_buffer_length--;
83 buffer_ptr++;
84 }
85
86 return (size_t)(buffer_ptr - buffer);
87 }
88
89 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
90 char *buffer, size_t length, char with_flush)
91 {
92 unsigned long long x;
93
94 for (x= 0; x < length; x++)
95 {
96 ptr->write_buffer[ptr->write_buffer_offset]= buffer[x];
97 ptr->write_buffer_offset++;
98
99 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
100 {
101 size_t sent_length;
102
103 sent_length= memcached_io_flush(ptr, server_key);
104
105 WATCHPOINT_ASSERT(sent_length == MEMCACHED_MAX_BUFFER);
106 ptr->write_buffer_offset= 0;
107 }
108 }
109
110 if (with_flush)
111 {
112 if (memcached_io_flush(ptr, server_key) == -1)
113 return -1;
114 }
115
116 return length;
117 }
118
119 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
120 {
121 size_t sent_length;
122 size_t return_length;
123 char *write_ptr= ptr->write_buffer;
124 size_t write_length= ptr->write_buffer_offset;
125 unsigned int loop= 1;
126
127 if (ptr->write_buffer_offset == 0)
128 return 0;
129
130 return_length= 0;
131 while (write_length)
132 {
133 if (ptr->flags & MEM_NO_BLOCK)
134 {
135 memcached_return rc;
136
137 rc= io_wait(ptr, server_key, 1);
138 if (rc != MEMCACHED_SUCCESS)
139 return -1;
140 }
141
142 sent_length= 0;
143 if ((ssize_t)(sent_length= write(ptr->hosts[server_key].fd, write_ptr,
144 write_length)) == -1)
145 {
146 switch (errno)
147 {
148 case ENOBUFS:
149 case EAGAIN:
150 WATCHPOINT;
151 continue;
152 if (loop < 100)
153 {
154 loop++;
155 break;
156 }
157 /* Yes, we want to fall through */
158 default:
159 ptr->my_errno= errno;
160 return -1;
161 }
162 }
163
164 write_ptr+= sent_length;
165 write_length-= sent_length;
166 return_length+= sent_length;
167 }
168
169 WATCHPOINT_ASSERT(write_length == 0);
170 WATCHPOINT_ASSERT(return_length == ptr->write_buffer_offset);
171 ptr->write_buffer_offset= 0;
172
173 return return_length;
174 }
175
176 /*
177 Eventually we will just kill off the server with the problem.
178 */
179 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
180 {
181 ptr->write_buffer_offset= 0;
182 memcached_quit(ptr);
183 }