Created a bigger buffer for write/read.
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include "common.h"
6 #include "memcached_io.h"
7 #include <sys/select.h>
8
9 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
10 char *buffer, size_t length)
11 {
12 char *buffer_ptr;
13
14 buffer_ptr= buffer;
15
16 while (length)
17 {
18 if (!ptr->read_buffer_length)
19 {
20 size_t data_read;
21
22 while (1)
23 {
24 if (ptr->flags & MEM_NO_BLOCK)
25 {
26 while (1)
27 {
28 int select_return;
29 struct timeval local_tv;
30 fd_set set;
31
32 memset(&local_tv, 0, sizeof(struct timeval));
33
34 local_tv.tv_sec= 0;
35 local_tv.tv_usec= 300;
36
37 FD_ZERO(&set);
38 FD_SET(ptr->hosts[server_key].fd, &set);
39
40 select_return= select(1, &set, NULL, NULL, &local_tv);
41
42 if (select_return == -1)
43 {
44 ptr->my_errno= errno;
45 return -1;
46 }
47 else if (!select_return)
48 break;
49 }
50 }
51
52 data_read= recv(ptr->hosts[server_key].fd,
53 ptr->read_buffer,
54 MEMCACHED_MAX_BUFFER, 0);
55 if (data_read == -1)
56 {
57 switch (errno)
58 {
59 case EAGAIN:
60 break;
61 default:
62 {
63 ptr->my_errno= errno;
64 return -1;
65 }
66 }
67 }
68 else if (data_read)
69 break;
70 /* If zero, just keep looping */
71 }
72
73 ptr->read_buffer_length= data_read;
74 ptr->read_ptr= ptr->read_buffer;
75 }
76
77 *buffer_ptr= *ptr->read_ptr;
78 length--;
79 ptr->read_ptr++;
80 ptr->read_buffer_length--;
81 buffer_ptr++;
82 }
83
84 return (size_t)(buffer_ptr - buffer);
85 }
86
87 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
88 char *buffer, size_t length, char with_flush)
89 {
90 unsigned long long x;
91
92 for (x= 0; x < length; x++)
93 {
94 ptr->write_buffer[ptr->write_buffer_offset]= buffer[x];
95 ptr->write_buffer_offset++;
96
97 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
98 {
99 size_t sent_length;
100
101 sent_length= memcached_io_flush(ptr, server_key);
102
103 assert(sent_length == MEMCACHED_MAX_BUFFER);
104 ptr->write_buffer_offset= 0;
105 }
106 }
107
108 if (with_flush)
109 {
110 if (memcached_io_flush(ptr, server_key) == -1)
111 return -1;
112 }
113
114 return length;
115 }
116
117 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
118 {
119 size_t sent_length;
120 char *write_ptr= ptr->write_buffer;
121 size_t write_length= ptr->write_buffer_offset;
122 unsigned int loop= 1;
123
124 if (ptr->write_buffer_offset == 0)
125 return 0;
126
127 while (write_length)
128 {
129 if (ptr->flags & MEM_NO_BLOCK)
130 {
131
132 while (1)
133 {
134 struct timeval local_tv;
135 fd_set set;
136 int select_return;
137
138 local_tv.tv_sec= 0;
139 local_tv.tv_usec= 300 * loop;
140
141 FD_ZERO(&set);
142 FD_SET(ptr->hosts[server_key].fd, &set);
143
144 select_return= select(1, NULL, &set, NULL, &local_tv);
145
146 if (select_return == -1)
147 {
148 ptr->my_errno= errno;
149 return -1;
150 }
151 else if (!select_return)
152 break;
153 }
154 }
155
156 sent_length= 0;
157 #ifdef orig
158 if ((sent_length= send(ptr->hosts[server_key].fd, write_ptr,
159 write_length, 0)) == -1)
160 #endif
161 if ((sent_length= write(ptr->hosts[server_key].fd, write_ptr,
162 write_length)) == -1)
163 {
164 switch (errno)
165 {
166 case ENOBUFS:
167 case EAGAIN:
168 if (loop < 10)
169 {
170 loop++;
171 break;
172 }
173 /* Yes, we want to fall through */
174 default:
175 ptr->my_errno= errno;
176 return -1;
177 }
178 }
179 else
180 {
181 write_ptr+= sent_length;
182 write_length-= sent_length;
183 }
184 }
185
186 ptr->write_buffer_offset= 0;
187
188 return sent_length;
189 }
190
191 /*
192 Eventually we will just kill off the server with the problem.
193 */
194 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
195 {
196 ptr->write_buffer_offset= 0;
197 memcached_quit(ptr);
198 }