Refactor and add support for the ASCII protocol by wrapping the binary protocol
[awesomized/libmemcached] / libmemcached / protocol / protocol_handler.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 #include "libmemcached/protocol/common.h"
3
4 #include <stdlib.h>
5 #include <sys/types.h>
6 #include <sys/socket.h>
7 #include <errno.h>
8 #include <stdbool.h>
9 #include <string.h>
10 #include <strings.h>
11 #include <ctype.h>
12 #include <stdio.h>
13
14 /*
15 ** **********************************************************************
16 ** INTERNAL INTERFACE
17 ** **********************************************************************
18 */
19
/**
 * Default receive callback: a thin wrapper around recv() that passes no
 * flags.
 *
 * @param cookie opaque handle identifying the client; ignored here
 * @param sock socket descriptor to read from
 * @param buf destination buffer
 * @param nbytes maximum number of bytes to store in buf
 * @return the value returned by recv(), unchanged: the number of bytes
 *         received, or -1 upon error
 */
static ssize_t default_recv(const void *cookie,
                            int sock,
                            void *buf,
                            size_t nbytes)
{
  (void)cookie;

  const ssize_t received= recv(sock, buf, nbytes, 0);
  return received;
}
39
/**
 * Default send callback: a thin wrapper around send() that passes no
 * flags.
 *
 * @param cookie opaque handle identifying the client; ignored here
 * @param fd socket descriptor to write to
 * @param buf source buffer
 * @param nbytes number of bytes from buf to send
 * @return the value returned by send(), unchanged: the number of bytes
 *         accepted for transmission, or -1 upon error
 */
static ssize_t default_send(const void *cookie,
                            int fd,
                            const void *buf,
                            size_t nbytes)
{
  (void)cookie;

  const ssize_t written= send(fd, buf, nbytes, 0);
  return written;
}
59
60 /**
61 * Try to drain the output buffers without blocking
62 *
63 * @param client the client to drain
64 * @return false if an error occured (connection should be shut down)
65 * true otherwise (please note that there may be more data to
66 * left in the buffer to send)
67 */
68 static bool drain_output(struct memcached_protocol_client_st *client)
69 {
70 ssize_t len;
71
72 /* Do we have pending data to send? */
73 while (client->output != NULL)
74 {
75 len= client->root->send(client,
76 client->sock,
77 client->output->data + client->output->offset,
78 client->output->nbytes - client->output->offset);
79
80 if (len == -1)
81 {
82 if (errno == EWOULDBLOCK)
83 {
84 return true;
85 }
86 else if (errno != EINTR)
87 {
88 client->error= errno;
89 return false;
90 }
91 }
92 else
93 {
94 client->output->offset += (size_t)len;
95 if (client->output->offset == client->output->nbytes)
96 {
97 /* This was the complete buffer */
98 struct chunk_st *old= client->output;
99 client->output= client->output->next;
100 if (client->output == NULL)
101 {
102 client->output_tail= NULL;
103 }
104 cache_free(client->root->buffer_cache, old);
105 }
106 }
107 }
108
109 return true;
110 }
111
112 /**
113 * Allocate an output buffer and chain it into the output list
114 *
115 * @param client the client that needs the buffer
116 * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
117 */
118 static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client)
119 {
120 struct chunk_st *ret= cache_alloc(client->root->buffer_cache);
121 if (ret == NULL)
122 {
123 return NULL;
124 }
125
126 ret->offset = ret->nbytes = 0;
127 ret->next = NULL;
128 ret->size = CHUNK_BUFFERSIZE;
129 ret->data= (void*)(ret + 1);
130 if (client->output == NULL)
131 {
132 client->output = client->output_tail = ret;
133 }
134 else
135 {
136 client->output_tail->next= ret;
137 client->output_tail= ret;
138 }
139
140 return ret;
141 }
142
143 /**
144 * Spool data into the send-buffer for a client.
145 *
146 * @param client the client to spool the data for
147 * @param data the data to spool
148 * @param length the number of bytes of data to spool
149 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
150 * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
151 */
152 static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client,
153 const void *data,
154 size_t length)
155 {
156 if (client->mute)
157 {
158 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
159 }
160
161 size_t offset = 0;
162
163 struct chunk_st *chunk= client->output;
164 while (offset < length)
165 {
166 if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
167 {
168 if ((chunk= allocate_output_chunk(client)) == NULL)
169 {
170 return PROTOCOL_BINARY_RESPONSE_ENOMEM;
171 }
172 }
173
174 size_t bulk = length - offset;
175 if (bulk > chunk->size - chunk->nbytes)
176 {
177 bulk = chunk->size - chunk->nbytes;
178 }
179
180 memcpy(chunk->data + chunk->nbytes, data, bulk);
181 chunk->nbytes += bulk;
182 offset += bulk;
183 }
184
185 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
186 }
187
188 /**
189 * Try to determine the protocol used on this connection.
190 * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should
191 * be using the binary protocol on the connection. I implemented the support
192 * for the ASCII protocol by wrapping into the simple interface (aka v1),
193 * so the implementors needs to provide an implementation of that interface
194 *
195 */
196 static enum MEMCACHED_PROTOCOL_EVENT determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr)
197 {
198 if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ)
199 {
200 client->work= memcached_binary_protocol_process_data;
201 }
202 else if (client->root->callback->interface_version == 1)
203 {
204 /*
205 * The ASCII protocol can only be used if the implementors provide
206 * an implementation for the version 1 of the interface..
207 *
208 * @todo I should allow the implementors to provide an implementation
209 * for version 0 and 1 at the same time and set the preferred
210 * interface to use...
211 */
212 client->work= memcached_ascii_protocol_process_data;
213 }
214 else
215 {
216 /* Let's just output a warning the way it is supposed to look like
217 * in the ASCII protocol...
218 */
219 const char *err= "CLIENT_ERROR: Unsupported protocol\r\n";
220 client->root->spool(client, err, strlen(err));
221 client->root->drain(client);
222 return ERROR_EVENT; /* Unsupported protocol */
223 }
224
225 return client->work(client, length, endptr);
226 }
227
228 /*
229 ** **********************************************************************
230 ** * PUBLIC INTERFACE
231 ** * See protocol_handler.h for function description
232 ** **********************************************************************
233 */
234 struct memcached_protocol_st *memcached_protocol_create_instance(void)
235 {
236 struct memcached_protocol_st *ret= calloc(1, sizeof(*ret));
237 if (ret != NULL)
238 {
239 ret->recv= default_recv;
240 ret->send= default_send;
241 ret->drain= drain_output;
242 ret->spool= spool_output;
243 ret->input_buffer_size= 1 * 1024 * 1024;
244 ret->input_buffer= malloc(ret->input_buffer_size);
245 if (ret->input_buffer == NULL)
246 {
247 free(ret);
248 ret= NULL;
249 return NULL;
250 }
251
252 ret->buffer_cache = cache_create("protocol_handler",
253 CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
254 0, NULL, NULL);
255 if (ret->buffer_cache == NULL) {
256 free(ret->input_buffer);
257 free(ret);
258 }
259 }
260
261 return ret;
262 }
263
264 void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance)
265 {
266 cache_destroy(instance->buffer_cache);
267 free(instance->input_buffer);
268 free(instance);
269 }
270
271 struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, int sock)
272 {
273 struct memcached_protocol_client_st *ret= calloc(1, sizeof(*ret));
274 if (ret != NULL)
275 {
276 ret->root= instance;
277 ret->sock= sock;
278 ret->work= determine_protocol;
279 }
280
281 return ret;
282 }
283
284 void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client)
285 {
286 free(client);
287 }
288
289 enum MEMCACHED_PROTOCOL_EVENT memcached_protocol_client_work(struct memcached_protocol_client_st *client)
290 {
291 /* Try to send data and read from the socket */
292 bool more_data= true;
293 do
294 {
295 ssize_t len= client->root->recv(client,
296 client->sock,
297 client->root->input_buffer + client->input_buffer_offset,
298 client->root->input_buffer_size - client->input_buffer_offset);
299
300 if (len > 0)
301 {
302 /* Do we have the complete packet? */
303 if (client->input_buffer_offset > 0)
304 {
305 memcpy(client->root->input_buffer, client->input_buffer,
306 client->input_buffer_offset);
307 len += (ssize_t)client->input_buffer_offset;
308
309 /* @todo use buffer-cache! */
310 free(client->input_buffer);
311 client->input_buffer_offset= 0;
312 }
313
314 void *endptr;
315 if (client->work(client, &len, &endptr) == ERROR_EVENT)
316 {
317 return ERROR_EVENT;
318 }
319
320 if (len > 0)
321 {
322 /* save the data for later on */
323 /* @todo use buffer-cache */
324 client->input_buffer= malloc((size_t)len);
325 if (client->input_buffer == NULL)
326 {
327 client->error= ENOMEM;
328 return ERROR_EVENT;
329 }
330 memcpy(client->input_buffer, endptr, (size_t)len);
331 client->input_buffer_offset= (size_t)len;
332 more_data= false;
333 }
334 }
335 else if (len == 0)
336 {
337 /* Connection closed */
338 drain_output(client);
339 return ERROR_EVENT;
340 }
341 else
342 {
343 if (errno != EWOULDBLOCK)
344 {
345 client->error= errno;
346 /* mark this client as terminated! */
347 return ERROR_EVENT;
348 }
349 more_data = false;
350 }
351 } while (more_data);
352
353 if (!drain_output(client))
354 {
355 return ERROR_EVENT;
356 }
357
358 return (client->output) ? READ_WRITE_EVENT : READ_EVENT;
359 }