Add in MSG_DONTWAIT for recv().
[awesomized/libmemcached] / libmemcached / protocol / protocol_handler.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 #include "libmemcached/protocol/common.h"
3
4 #include <stdlib.h>
5 #include <sys/types.h>
6 #include <errno.h>
7 #include <stdbool.h>
8 #include <string.h>
9 #include <strings.h>
10 #include <ctype.h>
11 #include <stdio.h>
12
13 /*
14 ** **********************************************************************
15 ** INTERNAL INTERFACE
16 ** **********************************************************************
17 */
18
19 /**
20 * The default function to receive data from the client. This function
21 * just wraps the recv function to receive from a socket.
22 * See man -s3socket recv for more information.
23 *
24 * @param cookie cookie indentifying a client, not used
25 * @param sock socket to read from
26 * @param buf the destination buffer
27 * @param nbytes the number of bytes to read
28 * @return the number of bytes transferred of -1 upon error
29 */
30 static ssize_t default_recv(const void *cookie,
31 memcached_socket_t sock,
32 void *buf,
33 size_t nbytes)
34 {
35 (void)cookie;
36 return recv(sock, buf, nbytes, 0);
37 }
38
39 /**
40 * The default function to send data to the server. This function
41 * just wraps the send function to send through a socket.
42 * See man -s3socket send for more information.
43 *
44 * @param cookie cookie indentifying a client, not used
45 * @param sock socket to send to
46 * @param buf the source buffer
47 * @param nbytes the number of bytes to send
48 * @return the number of bytes transferred of -1 upon error
49 */
50 static ssize_t default_send(const void *cookie,
51 memcached_socket_t fd,
52 const void *buf,
53 size_t nbytes)
54 {
55 (void)cookie;
56 return send(fd, buf, nbytes, 0);
57 }
58
59 /**
60 * Try to drain the output buffers without blocking
61 *
62 * @param client the client to drain
63 * @return false if an error occured (connection should be shut down)
64 * true otherwise (please note that there may be more data to
65 * left in the buffer to send)
66 */
67 static bool drain_output(struct memcached_protocol_client_st *client)
68 {
69 ssize_t len;
70
71 /* Do we have pending data to send? */
72 while (client->output != NULL)
73 {
74 len= client->root->send(client,
75 client->sock,
76 client->output->data + client->output->offset,
77 client->output->nbytes - client->output->offset);
78
79 if (len == -1)
80 {
81 if (get_socket_errno() == EWOULDBLOCK)
82 {
83 return true;
84 }
85 else if (get_socket_errno() != EINTR)
86 {
87 client->error= get_socket_errno();
88 return false;
89 }
90 }
91 else
92 {
93 client->output->offset += (size_t)len;
94 if (client->output->offset == client->output->nbytes)
95 {
96 /* This was the complete buffer */
97 struct chunk_st *old= client->output;
98 client->output= client->output->next;
99 if (client->output == NULL)
100 {
101 client->output_tail= NULL;
102 }
103 cache_free(client->root->buffer_cache, old);
104 }
105 }
106 }
107
108 return true;
109 }
110
111 /**
112 * Allocate an output buffer and chain it into the output list
113 *
114 * @param client the client that needs the buffer
115 * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
116 */
117 static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client)
118 {
119 struct chunk_st *ret= cache_alloc(client->root->buffer_cache);
120
121 if (ret == NULL)
122 {
123 return NULL;
124 }
125
126 ret->offset= ret->nbytes= 0;
127 ret->next= NULL;
128 ret->size= CHUNK_BUFFERSIZE;
129 ret->data= (void*)(ret + 1);
130 if (client->output == NULL)
131 {
132 client->output= client->output_tail= ret;
133 }
134 else
135 {
136 client->output_tail->next= ret;
137 client->output_tail= ret;
138 }
139
140 return ret;
141 }
142
143 /**
144 * Spool data into the send-buffer for a client.
145 *
146 * @param client the client to spool the data for
147 * @param data the data to spool
148 * @param length the number of bytes of data to spool
149 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
150 * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
151 */
152 static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client,
153 const void *data,
154 size_t length)
155 {
156 if (client->mute)
157 {
158 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
159 }
160
161 size_t offset= 0;
162
163 struct chunk_st *chunk= client->output;
164 while (offset < length)
165 {
166 if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
167 {
168 if ((chunk= allocate_output_chunk(client)) == NULL)
169 {
170 return PROTOCOL_BINARY_RESPONSE_ENOMEM;
171 }
172 }
173
174 size_t bulk= length - offset;
175 if (bulk > chunk->size - chunk->nbytes)
176 {
177 bulk= chunk->size - chunk->nbytes;
178 }
179
180 memcpy(chunk->data + chunk->nbytes, data, bulk);
181 chunk->nbytes += bulk;
182 offset += bulk;
183 }
184
185 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
186 }
187
188 /**
189 * Try to determine the protocol used on this connection.
190 * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should
191 * be using the binary protocol on the connection. I implemented the support
192 * for the ASCII protocol by wrapping into the simple interface (aka v1),
193 * so the implementors needs to provide an implementation of that interface
194 *
195 */
196 static memcached_protocol_event_t determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr)
197 {
198 if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ)
199 {
200 client->work= memcached_binary_protocol_process_data;
201 }
202 else if (client->root->callback->interface_version == 1)
203 {
204 /*
205 * The ASCII protocol can only be used if the implementors provide
206 * an implementation for the version 1 of the interface..
207 *
208 * @todo I should allow the implementors to provide an implementation
209 * for version 0 and 1 at the same time and set the preferred
210 * interface to use...
211 */
212 client->work= memcached_ascii_protocol_process_data;
213 }
214 else
215 {
216 /* Let's just output a warning the way it is supposed to look like
217 * in the ASCII protocol...
218 */
219 const char *err= "CLIENT_ERROR: Unsupported protocol\r\n";
220 client->root->spool(client, err, strlen(err));
221 client->root->drain(client);
222 return MEMCACHED_PROTOCOL_ERROR_EVENT; /* Unsupported protocol */
223 }
224
225 return client->work(client, length, endptr);
226 }
227
228 /*
229 ** **********************************************************************
230 ** * PUBLIC INTERFACE
231 ** * See protocol_handler.h for function description
232 ** **********************************************************************
233 */
234 struct memcached_protocol_st *memcached_protocol_create_instance(void)
235 {
236 struct memcached_protocol_st *ret= calloc(1, sizeof(*ret));
237 if (ret != NULL)
238 {
239 ret->recv= default_recv;
240 ret->send= default_send;
241 ret->drain= drain_output;
242 ret->spool= spool_output;
243 ret->input_buffer_size= 1 * 1024 * 1024;
244 ret->input_buffer= malloc(ret->input_buffer_size);
245 if (ret->input_buffer == NULL)
246 {
247 free(ret);
248 ret= NULL;
249 return NULL;
250 }
251
252 ret->buffer_cache= cache_create("protocol_handler",
253 CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
254 0, NULL, NULL);
255 if (ret->buffer_cache == NULL)
256 {
257 free(ret->input_buffer);
258 free(ret);
259 }
260 }
261
262 return ret;
263 }
264
265 void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance)
266 {
267 cache_destroy(instance->buffer_cache);
268 free(instance->input_buffer);
269 free(instance);
270 }
271
272 struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, memcached_socket_t sock)
273 {
274 struct memcached_protocol_client_st *ret= calloc(1, sizeof(*ret));
275 if (ret != NULL)
276 {
277 ret->root= instance;
278 ret->sock= sock;
279 ret->work= determine_protocol;
280 }
281
282 return ret;
283 }
284
285 void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client)
286 {
287 free(client);
288 }
289
290 memcached_protocol_event_t memcached_protocol_client_work(struct memcached_protocol_client_st *client)
291 {
292 /* Try to send data and read from the socket */
293 bool more_data= true;
294 do
295 {
296 ssize_t len= client->root->recv(client,
297 client->sock,
298 client->root->input_buffer + client->input_buffer_offset,
299 client->root->input_buffer_size - client->input_buffer_offset);
300
301 if (len > 0)
302 {
303 /* Do we have the complete packet? */
304 if (client->input_buffer_offset > 0)
305 {
306 memcpy(client->root->input_buffer, client->input_buffer,
307 client->input_buffer_offset);
308 len += (ssize_t)client->input_buffer_offset;
309
310 /* @todo use buffer-cache! */
311 free(client->input_buffer);
312 client->input_buffer_offset= 0;
313 }
314
315 void *endptr;
316 memcached_protocol_event_t events= client->work(client, &len, &endptr);
317 if (events == MEMCACHED_PROTOCOL_ERROR_EVENT)
318 {
319 return MEMCACHED_PROTOCOL_ERROR_EVENT;
320 }
321
322 if (len > 0)
323 {
324 /* save the data for later on */
325 /* @todo use buffer-cache */
326 client->input_buffer= malloc((size_t)len);
327 if (client->input_buffer == NULL)
328 {
329 client->error= ENOMEM;
330 return MEMCACHED_PROTOCOL_ERROR_EVENT;
331 }
332 memcpy(client->input_buffer, endptr, (size_t)len);
333 client->input_buffer_offset= (size_t)len;
334 more_data= false;
335 }
336 }
337 else if (len == 0)
338 {
339 /* Connection closed */
340 drain_output(client);
341 return MEMCACHED_PROTOCOL_ERROR_EVENT;
342 }
343 else
344 {
345 if (get_socket_errno() != EWOULDBLOCK)
346 {
347 client->error= get_socket_errno();
348 /* mark this client as terminated! */
349 return MEMCACHED_PROTOCOL_ERROR_EVENT;
350 }
351 more_data= false;
352 }
353 } while (more_data);
354
355 if (!drain_output(client))
356 {
357 return MEMCACHED_PROTOCOL_ERROR_EVENT;
358 }
359
360 memcached_protocol_event_t ret= MEMCACHED_PROTOCOL_READ_EVENT;
361 if (client->output)
362 ret|= MEMCACHED_PROTOCOL_READ_EVENT;
363
364 return ret;
365 }