Non-blocking IO :)
[awesomized/libmemcached] / lib / memcached_io.c
1 /*
2 Basic socket buffered IO
3 */
4
5 #include <memcached.h>
6 #include "memcached_io.h"
7 #include <sys/select.h>
8
9 ssize_t memcached_io_read(memcached_st *ptr, unsigned int server_key,
10 char *buffer, size_t length)
11 {
12 size_t x;
13 char *buffer_ptr;
14
15 buffer_ptr= buffer;
16
17 for (x= 0, buffer_ptr= buffer;
18 x < length; x++)
19 {
20 if (!ptr->read_buffer_length)
21 {
22 if (length > 1)
23 {
24
25 size_t data_read;
26 data_read= recv(ptr->hosts[server_key].fd,
27 buffer_ptr,
28 length - x, 0);
29 if (data_read == -1)
30 {
31 return -1;
32 }
33 if (data_read == 0)
34 return x;
35
36 data_read+= x;
37
38 return data_read;
39 }
40 else
41 {
42 size_t data_read;
43 try_again:
44
45 if (ptr->flags & MEM_NO_BLOCK)
46 {
47 struct timeval local_tv;
48 fd_set set;
49
50 memset(&local_tv, 0, sizeof(struct timeval));
51
52 local_tv.tv_sec= 0;
53 local_tv.tv_usec= 300;
54
55 FD_ZERO(&set);
56 FD_SET(ptr->hosts[server_key].fd, &set);
57
58 select(1, &set, NULL, NULL, &local_tv);
59 }
60
61 data_read= recv(ptr->hosts[server_key].fd,
62 ptr->read_buffer,
63 MEMCACHED_MAX_BUFFER, 0);
64 if (data_read == -1)
65 {
66 if (errno == EAGAIN)
67 goto try_again;
68 return -1;
69 }
70 ptr->read_buffer_length= data_read;
71 ptr->read_ptr= ptr->read_buffer;
72 }
73
74 if (ptr->read_buffer_length == -1)
75 return -1;
76 if (ptr->read_buffer_length == 0)
77 return x;
78 }
79 *buffer_ptr= *ptr->read_ptr;
80 buffer_ptr++;
81 ptr->read_ptr++;
82 ptr->read_buffer_length--;
83 }
84
85 return length;
86 }
87
88 ssize_t memcached_io_write(memcached_st *ptr, unsigned int server_key,
89 char *buffer, size_t length, char with_flush)
90 {
91 unsigned long long x;
92
93 for (x= 0; x < length; x++)
94 {
95 ptr->write_buffer[ptr->write_buffer_offset]= buffer[x];
96 ptr->write_buffer_offset++;
97
98 if (ptr->write_buffer_offset == MEMCACHED_MAX_BUFFER)
99 {
100 size_t sent_length;
101
102 sent_length= memcached_io_flush(ptr, server_key);
103
104 assert(sent_length == MEMCACHED_MAX_BUFFER);
105 ptr->write_buffer_offset= 0;
106 }
107 }
108
109 if (with_flush)
110 memcached_io_flush(ptr, server_key);
111
112 return length;
113 }
114
115 ssize_t memcached_io_flush(memcached_st *ptr, unsigned int server_key)
116 {
117 size_t sent_length;
118
119 if (ptr->write_buffer_offset == 0)
120 return 0;
121
122 if (ptr->flags & MEM_NO_BLOCK)
123 {
124 struct timeval local_tv;
125 fd_set set;
126
127 local_tv.tv_sec= 0;
128 local_tv.tv_usec= 300;
129
130 FD_ZERO(&set);
131 FD_SET(ptr->hosts[server_key].fd, &set);
132
133 select(1, NULL, &set, NULL, &local_tv);
134 }
135 if ((sent_length= send(ptr->hosts[server_key].fd, ptr->write_buffer,
136 ptr->write_buffer_offset, 0)) == -1)
137 {
138 return -1;
139 }
140
141 assert(sent_length == ptr->write_buffer_offset);
142
143 ptr->write_buffer_offset= 0;
144
145 return sent_length;
146 }
147
148 /*
149 Eventually we will just kill off the server with the problem.
150 */
151 void memcached_io_reset(memcached_st *ptr, unsigned int server_key)
152 {
153 ptr->write_buffer_offset= 0;
154 memcached_quit(ptr);
155 }