2 Basic socket buffered IO
6 #include "memcached_io.h"
7 #include <sys/select.h>
/*
 * Forward declaration of io_flush. The definition appears later in this
 * file; memcached_io_write calls it before that point.
 */
15 static ssize_t
io_flush(memcached_server_st
*ptr
, memcached_return
*error
);
/*
 * io_wait: block in poll() on one pollfd entry for the requested direction.
 *
 * ptr            server; ptr->root->poll_timeout is passed to poll() as the
 *                timeout argument
 * read_or_write  MEM_WRITE selects POLLOUT | POLLERR, the other branch
 *                selects POLLIN | POLLERR
 *
 * Returns MEMCACHED_SUCCESS, MEMCACHED_TIMEOUT or MEMCACHED_FAILURE.
 * NOTE(review): the conditions that choose between these returns and the
 * initialisation of the pollfd entry are not visible in this listing;
 * presumably they test the poll() result - confirm against the full source.
 */
17 static memcached_return
io_wait(memcached_server_st
*ptr
,
18 memc_read_or_write read_or_write
)
24 if (read_or_write
== MEM_WRITE
) /* write */
25 flags
= POLLOUT
| POLLERR
;
27 flags
= POLLIN
| POLLERR
;
29 memset(&fds
, 0, sizeof(struct pollfd
));
34 ** We are going to block on write, but at least on Solaris we might block
35 ** on write if we haven't read anything from our input buffer..
36 ** Try to purge the input buffer if we don't do any flow control in the
37 ** application layer (just sending a lot of data etc)
38 ** The test is moved down in the purge function to avoid duplication of
41 if (read_or_write
== MEM_WRITE
)
/* Only the write direction purges first; any purge result other than
   MEMCACHED_SUCCESS or MEMCACHED_STORED ends the wait with
   MEMCACHED_FAILURE before poll() is reached. */
43 memcached_return rc
=memcached_purge(ptr
);
44 if (rc
!= MEMCACHED_SUCCESS
&& rc
!= MEMCACHED_STORED
)
45 return MEMCACHED_FAILURE
;
48 error
= poll(fds
, 1, ptr
->root
->poll_timeout
);
51 return MEMCACHED_SUCCESS
;
54 return MEMCACHED_TIMEOUT
;
57 /* Impossible for anything other than -1 */
58 WATCHPOINT_ASSERT(error
== -1);
/* Remaining path: memcached_quit_server(ptr, 1) is called before
   MEMCACHED_FAILURE is reported. */
59 memcached_quit_server(ptr
, 1);
61 return MEMCACHED_FAILURE
;
/*
 * memcached_io_preread: walk all ptr->number_of_hosts hosts. For each host
 * where memcached_server_response_count(ptr, x) is non-zero and
 * read_data_length is below MEMCACHED_MAX_BUFFER, read() from the host fd
 * into read_ptr + read_data_length, asking for the bytes that still fit
 * below MEMCACHED_MAX_BUFFER, then add the result to both
 * read_buffer_length and read_data_length.
 *
 * NOTE(review): no check of the read() result is visible between the
 * read() call and the two += updates in this listing. A -1 result must not
 * reach those updates - confirm against the full source.
 */
66 void memcached_io_preread(memcached_st
*ptr
)
72 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
74 if (memcached_server_response_count(ptr
, x
) &&
75 ptr
->hosts
[x
].read_data_length
< MEMCACHED_MAX_BUFFER
)
79 data_read
= read(ptr
->hosts
[x
].fd
,
80 ptr
->hosts
[x
].read_ptr
+ ptr
->hosts
[x
].read_data_length
,
81 MEMCACHED_MAX_BUFFER
- ptr
->hosts
[x
].read_data_length
);
/* Both counters grow by the same amount. */
85 ptr
->hosts
[x
].read_buffer_length
+= data_read
;
86 ptr
->hosts
[x
].read_data_length
+= data_read
;
/*
 * memcached_io_read: hand out bytes from the per-server read buffer,
 * refilling it with read() on ptr->fd when read_buffer_length is zero.
 *
 * ptr     server connection; read_ptr, read_buffer_length,
 *         read_data_length, io_bytes_sent and cached_errno are written here
 * buffer  destination for the bytes
 * length  number of bytes requested
 *
 * Returns the number of bytes stored, computed as buffer_ptr minus the
 * start of buffer.
 * NOTE(review): that expression is cast to size_t although the function is
 * declared ssize_t. The loop structure and the other return paths are not
 * visible in this listing.
 */
92 ssize_t
memcached_io_read(memcached_server_st
*ptr
,
93 void *buffer
, size_t length
)
101 if (!ptr
->read_buffer_length
)
107 data_read
= read(ptr
->fd
,
109 MEMCACHED_MAX_BUFFER
);
112 else if (data_read
== -1)
/* Keep the errno of the failed read() on the server object. */
114 ptr
->cached_errno
= errno
;
/* Wait for readability. NOTE(review): presumably the read is retried when
   io_wait reports MEMCACHED_SUCCESS; the branch body is not visible. */
122 rc
= io_wait(ptr
, MEM_READ
);
124 if (rc
== MEMCACHED_SUCCESS
)
130 memcached_quit_server(ptr
, 1);
138 EOF. Any data received so far is incomplete
139 so discard it. This always reads by byte in case of TCP
140 and protocol enforcement happens at memcached_response()
141 looking for '\n'. We do not care for UDB which requests 8 bytes
142 at once. Generally, this means that connection went away. Since
143 for blocking I/O we do not return 0 and for non-blocking case
144 it will return EGAIN if data is not immediatly available.
146 memcached_quit_server(ptr
, 1);
/* Refill bookkeeping: io_bytes_sent is cleared, both length counters take
   the data_read value and read_ptr goes back to the start of read_buffer. */
151 ptr
->io_bytes_sent
= 0;
152 ptr
->read_data_length
= data_read
;
153 ptr
->read_buffer_length
= data_read
;
154 ptr
->read_ptr
= ptr
->read_buffer
;
/* Bulk copy: take the smaller of length and read_buffer_length, then
   advance source and destination and shrink both remaining counts. */
161 difference
= (length
> ptr
->read_buffer_length
) ? ptr
->read_buffer_length
: length
;
163 memcpy(buffer_ptr
, ptr
->read_ptr
, difference
);
164 length
-= difference
;
165 ptr
->read_ptr
+= difference
;
166 ptr
->read_buffer_length
-= difference
;
167 buffer_ptr
+= difference
;
/* Single byte copy. */
171 *buffer_ptr
= *ptr
->read_ptr
;
173 ptr
->read_buffer_length
--;
179 return (size_t)(buffer_ptr
- (char*)buffer
);
/*
 * memcached_io_write: copy bytes from buffer into ptr->write_buffer at
 * write_buffer_offset, and call io_flush when write_buffer_offset reaches
 * MEMCACHED_MAX_BUFFER.
 *
 * ptr         server connection; ptr->fd is asserted to differ from -1
 * buffer      bytes to queue
 * length      number of bytes in buffer
 * with_flush  NOTE(review): its use is not visible in this listing;
 *             presumably it guards the last io_flush call - confirm
 *
 * Returns original_length, the value of length captured on entry, on the
 * visible return path. What is returned when io_flush yields -1 is not
 * visible here.
 */
182 ssize_t
memcached_io_write(memcached_server_st
*ptr
,
183 const void *buffer
, size_t length
, char with_flush
)
185 size_t original_length
;
186 const char* buffer_ptr
;
188 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
190 original_length
= length
;
/* Room left in the write buffer and the position where new bytes go. */
198 should_write
= MEMCACHED_MAX_BUFFER
- ptr
->write_buffer_offset
;
199 write_ptr
= ptr
->write_buffer
+ ptr
->write_buffer_offset
;
/* Never copy more than the caller still has to give. */
201 should_write
= (should_write
< length
) ? should_write
: length
;
203 memcpy(write_ptr
, buffer_ptr
, should_write
);
204 ptr
->write_buffer_offset
+= should_write
;
205 buffer_ptr
+= should_write
;
206 length
-= should_write
;
/* Buffer is full: push it out through io_flush. */
208 if (ptr
->write_buffer_offset
== MEMCACHED_MAX_BUFFER
)
213 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
214 sent_length
= io_flush(ptr
, &rc
);
215 if (sent_length
== -1)
218 /* If io_flush calls memcached_purge, sent_length may be 0 */
219 if (sent_length
!= 0)
220 WATCHPOINT_ASSERT(sent_length
== MEMCACHED_MAX_BUFFER
);
227 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
228 if (io_flush(ptr
, &rc
) == -1)
232 return original_length
;
/*
 * memcached_io_close: shut down ptr->fd in both directions (SHUT_RDWR).
 * A non-zero shutdown() result with errno other than ENOTCONN is reported
 * through WATCHPOINT_NUMBER, WATCHPOINT_ERRNO and WATCHPOINT_ASSERT.
 *
 * Returns MEMCACHED_SUCCESS on both visible return paths.
 * NOTE(review): the condition guarding the early return and the call that
 * precedes the second WATCHPOINT_ERRNO are not visible in this listing.
 */
235 memcached_return
memcached_io_close(memcached_server_st
*ptr
)
240 return MEMCACHED_SUCCESS
;
242 /* in case of death shutdown to avoid blocking at close() */
245 r
= shutdown(ptr
->fd
, SHUT_RDWR
);
/* ENOTCONN is tolerated: only other shutdown() failures are reported. */
248 if (r
&& errno
!= ENOTCONN
)
250 WATCHPOINT_NUMBER(ptr
->fd
);
251 WATCHPOINT_ERRNO(errno
);
252 WATCHPOINT_ASSERT(errno
);
260 WATCHPOINT_ERRNO(errno
);
263 return MEMCACHED_SUCCESS
;
/*
 * io_flush: send the write_buffer_offset bytes queued in ptr->write_buffer
 * to ptr->fd, then reset write_buffer_offset to 0.
 *
 * ptr    server connection
 * error  out parameter; set to MEMCACHED_SUCCESS before sending and to
 *        MEMCACHED_ERRNO on the visible write() failure path
 *
 * Returns return_length, the running sum of the sent_length values.
 *
 * memcached_purge(ptr) runs first and its result is compared against
 * MEMCACHED_SUCCESS and MEMCACHED_STORED. Servers of type
 * MEMCACHED_CONNECTION_UDP are sent with sendto() using a staging buffer
 * that reserves 8 leading bytes; all other servers use write().
 * NOTE(review): the loop header, the initialisation of return_length and
 * the values returned on the failure paths are not visible in this listing.
 */
266 static ssize_t
io_flush(memcached_server_st
*ptr
,
267 memcached_return
*error
)
270 ** We might want to purge the input buffer if we haven't consumed
271 ** any output yet... The test for the limits is the purge is inline
272 ** in the purge function to avoid duplicating the logic..
276 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
277 rc
= memcached_purge(ptr
);
279 if (rc
!= MEMCACHED_SUCCESS
&& rc
!= MEMCACHED_STORED
)
283 size_t return_length
;
284 char *local_write_ptr
= ptr
->write_buffer
;
285 size_t write_length
= ptr
->write_buffer_offset
;
287 *error
= MEMCACHED_SUCCESS
;
289 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
291 if (ptr
->write_buffer_offset
== 0)
294 /* Looking for memory overflows */
295 #if defined(HAVE_DEBUG)
296 if (write_length
== MEMCACHED_MAX_BUFFER
)
297 WATCHPOINT_ASSERT(ptr
->write_buffer
== local_write_ptr
);
298 WATCHPOINT_ASSERT((ptr
->write_buffer
+ MEMCACHED_MAX_BUFFER
) >= (local_write_ptr
+ write_length
));
304 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
305 WATCHPOINT_ASSERT(write_length
> 0);
307 if (ptr
->type
== MEMCACHED_CONNECTION_UDP
)
311 ai
= ptr
->address_info
;
313 /* Crappy test code */
/* NOTE(review): write_length bytes are copied at offset 8 into a buffer of
   HUGE_STRING_LEN + 8 bytes. This stays in bounds only while write_length
   is at most HUGE_STRING_LEN - confirm how MEMCACHED_MAX_BUFFER relates to
   HUGE_STRING_LEN. */
314 char buffer
[HUGE_STRING_LEN
+ 8];
315 memset(buffer
, 0, HUGE_STRING_LEN
+ 8);
316 memcpy (buffer
+8, local_write_ptr
, write_length
);
325 sent_length
= sendto(ptr
->fd
, buffer
, write_length
+ 8, 0,
326 (struct sockaddr
*)ai
->ai_addr
,
328 if (sent_length
== -1)
330 WATCHPOINT_ERRNO(errno
);
331 WATCHPOINT_ASSERT(0);
333 sent_length
-= 8; /* We remove the header */
337 WATCHPOINT_ASSERT(ptr
->fd
!= -1);
338 if ((sent_length
= write(ptr
->fd
, local_write_ptr
,
339 write_length
)) == -1)
/* MEMCACHED_SUCCESS and MEMCACHED_TIMEOUT from io_wait share one branch.
   NOTE(review): presumably the write is retried there; the branch body and
   the errno case leading here are not visible. */
348 rc
= io_wait(ptr
, MEM_WRITE
);
350 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_TIMEOUT
)
353 memcached_quit_server(ptr
, 1);
/* NOTE(review): errno is copied into cached_errno only after
   memcached_quit_server has run. If that call changes errno, the cached
   value no longer describes the write() failure; consider saving errno
   before the call. */
357 memcached_quit_server(ptr
, 1);
358 ptr
->cached_errno
= errno
;
359 *error
= MEMCACHED_ERRNO
;
/* Account for the bytes just sent and move past them. */
365 ptr
->io_bytes_sent
+= sent_length
;
367 local_write_ptr
+= sent_length
;
368 write_length
-= sent_length
;
369 return_length
+= sent_length
;
372 WATCHPOINT_ASSERT(write_length
== 0);
373 // Need to study this assert() WATCHPOINT_ASSERT(return_length ==
374 // ptr->write_buffer_offset);
375 ptr
->write_buffer_offset
= 0;
377 return return_length
;
381 Eventually we will just kill off the server with the problem.
/*
 * memcached_io_reset: reset the IO channel of one server. The only visible
 * action is the call memcached_quit_server(ptr, 1); nothing is returned.
 */
383 void memcached_io_reset(memcached_server_st
*ptr
)
385 memcached_quit_server(ptr
, 1);
389 * Read a given number of bytes from the server and place it into a specific
390 * buffer. Reset the IO channel on this server if an error occurs.
/*
 * memcached_safe_read: loop while offset is below size, each time calling
 * memcached_io_read for the size - offset bytes still missing and storing
 * them at data + offset.
 *
 * Returns MEMCACHED_SUCCESS after the loop. On the visible failure path
 * memcached_io_reset(ptr) is called and MEMCACHED_UNKNOWN_READ_FAILURE is
 * returned.
 * NOTE(review): the parameters after ptr, the test applied to nread and
 * the update of offset are not visible in this listing.
 */
392 memcached_return
memcached_safe_read(memcached_server_st
*ptr
,
399 while (offset
< size
)
401 ssize_t nread
= memcached_io_read(ptr
, data
+ offset
, size
- offset
);
404 memcached_io_reset(ptr
);
405 return MEMCACHED_UNKNOWN_READ_FAILURE
;
410 return MEMCACHED_SUCCESS
;