This is a rewrite of some of the IO code to handle larger loads of set data
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8
9 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
10 char *buffer, size_t length)
11 {
12 size_t x;
13 char *buffer_ptr;
14
15 buffer_ptr= buffer;
16
17 for (x= 0, buffer_ptr= buffer;
18 x < length; x++)
19 {
20 if (!ptr->read_buffer_length)
21 {
22 if (length > 1)
23 {
24
25 size_t data_read;
26 data_read= recv(ptr->hosts[server_key].fd,
27 buffer_ptr,
28 length - x, 0);
29 if (data_read == -1)
30 {
31 return -1;
32 }
33 if (data_read == 0)
34 return x;
35
36 data_read+= x;
37
38 return data_read;
39 }
40 else
41 {
42 size_t data_read;
43 try_again:
44
45 if (ptr->flags & MEM_NO_BLOCK)
46 {
47 struct timeval local_tv;
48 fd_set set;
49
50 memset(&local_tv, 0, sizeof(struct timeval));
51
52 local_tv.tv_sec= 0;
53 local_tv.tv_usec= 300;
54
55 FD_ZERO(&set);
56 FD_SET(ptr->hosts[server_key].fd, &set);
57
58 select(1, &set, NULL, NULL, &local_tv);
59 }
60
61 data_read= recv(ptr->hosts[server_key].fd,
62 ptr->read_buffer,
63 MEMCACHED_MAX_BUFFER, 0);
64 if (data_read == -1)
65 {
66 if (errno == EAGAIN)
67 goto try_again;
68 return -1;
69 }
70 ptr->read_buffer_length= data_read;
71 ptr->read_ptr= ptr->read_buffer;
72 }
73
74 if (ptr->read_buffer_length == -1)
75 return -1;
76 if (ptr->read_buffer_length == 0)
77 return x;
78 }
79 *buffer_ptr= *ptr->read_ptr;
80 buffer_ptr++;
81 ptr->read_ptr++;
82 ptr->read_buffer_length--;
83 }
84
85 return length;
86 }
87
88 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
89 char *buffer, size_t length, char with_flush)
90 {
91 unsigned long long x;
92
93 for (x= 0; x < length; x++)
94 {
95 ptr->write_buffer[ptr->write_buffer_offset]= buffer[x];
96 ptr->write_buffer_offset++;
97
98 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
99 {
100 size_t sent_length;
101
102 sent_length= memcached_io_flush(ptr, server_key);
103
104 assert(sent_length == MEMCACHED_MAX_BUFFER);
105 ptr->write_buffer_offset= 0;
106 }
107 }
108
109 if (with_flush)
110 {
111 if (memcached_io_flush(ptr, server_key) == -1)
112 return -1;
113 }
114
115 return length;
116 }
117
118 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
119 {
120 size_t sent_length;
121 char *write_ptr= ptr->write_buffer;
122 size_t write_length= ptr->write_buffer_offset;
123 unsigned int loop= 1;
124
125 if (ptr->write_buffer_offset == 0)
126 return 0;
127
128 while (write_length)
129 {
130 if (ptr->flags & MEM_NO_BLOCK)
131 {
132
133 while (1)
134 {
135 struct timeval local_tv;
136 fd_set set;
137 int select_return;
138
139 local_tv.tv_sec= 0;
140 local_tv.tv_usec= 300 * loop;
141
142 FD_ZERO(&set);
143 FD_SET(ptr->hosts[server_key].fd, &set);
144
145 select_return= select(1, NULL, &set, NULL, &local_tv);
146
147 if (select_return == -1)
148 {
149 ptr->my_errno= errno;
150 return -1;
151 }
152 else if (!select_return)
153 break;
154 }
155 }
156
157 sent_length= 0;
158 if ((sent_length= send(ptr->hosts[server_key].fd, write_ptr,
159 write_length, 0)) == -1)
160 {
161 switch (errno)
162 {
163 case ENOBUFS:
164 case EAGAIN:
165 if (loop < 10)
166 {
167 loop++;
168 break;
169 }
170 /* Yes, we want to fall through */
171 default:
172 ptr->my_errno= errno;
173 return -1;
174 }
175 }
176 else
177 {
178 write_ptr+= sent_length;
179 write_length-= sent_length;
180 }
181 }
182
183 ptr->write_buffer_offset= 0;
184
185 return sent_length;
186 }
187
188 /*
189 Eventually we will just kill off the server with the problem.
190 */
191 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
192 {
193 ptr->write_buffer_offset= 0;
194 memcached_quit(ptr);
195 }