fb90001db940548c385c635c71c671050d4b8c1c
[awesomized/libmemcached] / libmemcached / protocol / protocol_handler.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 #include "libmemcached/protocol/common.h"
3
4 #include <stdlib.h>
5 #include <sys/types.h>
6 #include <sys/socket.h>
7 #include <errno.h>
8 #include <stdbool.h>
9 #include <string.h>
10 #include <strings.h>
11 #include <ctype.h>
12 #include <stdio.h>
13
14 /*
15 ** **********************************************************************
16 ** INTERNAL INTERFACE
17 ** **********************************************************************
18 */
19
20 /**
21 * The default function to receive data from the client. This function
22 * just wraps the recv function to receive from a socket.
23 * See man -s3socket recv for more information.
24 *
25 * @param cookie cookie indentifying a client, not used
26 * @param sock socket to read from
27 * @param buf the destination buffer
28 * @param nbytes the number of bytes to read
29 * @return the number of bytes transferred of -1 upon error
30 */
31 static ssize_t default_recv(const void *cookie,
32 int sock,
33 void *buf,
34 size_t nbytes)
35 {
36 (void)cookie;
37 return recv(sock, buf, nbytes, 0);
38 }
39
40 /**
41 * The default function to send data to the server. This function
42 * just wraps the send function to send through a socket.
43 * See man -s3socket send for more information.
44 *
45 * @param cookie cookie indentifying a client, not used
46 * @param sock socket to send to
47 * @param buf the source buffer
48 * @param nbytes the number of bytes to send
49 * @return the number of bytes transferred of -1 upon error
50 */
51 static ssize_t default_send(const void *cookie,
52 int fd,
53 const void *buf,
54 size_t nbytes)
55 {
56 (void)cookie;
57 return send(fd, buf, nbytes, 0);
58 }
59
60 /**
61 * Try to drain the output buffers without blocking
62 *
63 * @param client the client to drain
64 * @return false if an error occured (connection should be shut down)
65 * true otherwise (please note that there may be more data to
66 * left in the buffer to send)
67 */
68 static bool drain_output(struct memcached_protocol_client_st *client)
69 {
70 ssize_t len;
71
72 /* Do we have pending data to send? */
73 while (client->output != NULL)
74 {
75 len= client->root->send(client,
76 client->sock,
77 client->output->data + client->output->offset,
78 client->output->nbytes - client->output->offset);
79
80 if (len == -1)
81 {
82 if (errno == EWOULDBLOCK)
83 {
84 return true;
85 }
86 else if (errno != EINTR)
87 {
88 client->error= errno;
89 return false;
90 }
91 }
92 else
93 {
94 client->output->offset += (size_t)len;
95 if (client->output->offset == client->output->nbytes)
96 {
97 /* This was the complete buffer */
98 struct chunk_st *old= client->output;
99 client->output= client->output->next;
100 if (client->output == NULL)
101 {
102 client->output_tail= NULL;
103 }
104 cache_free(client->root->buffer_cache, old);
105 }
106 }
107 }
108
109 return true;
110 }
111
112 /**
113 * Allocate an output buffer and chain it into the output list
114 *
115 * @param client the client that needs the buffer
116 * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
117 */
118 static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client)
119 {
120 struct chunk_st *ret= cache_alloc(client->root->buffer_cache);
121
122 if (ret == NULL)
123 {
124 return NULL;
125 }
126
127 ret->offset= ret->nbytes= 0;
128 ret->next= NULL;
129 ret->size= CHUNK_BUFFERSIZE;
130 ret->data= (void*)(ret + 1);
131 if (client->output == NULL)
132 {
133 client->output= client->output_tail= ret;
134 }
135 else
136 {
137 client->output_tail->next= ret;
138 client->output_tail= ret;
139 }
140
141 return ret;
142 }
143
144 /**
145 * Spool data into the send-buffer for a client.
146 *
147 * @param client the client to spool the data for
148 * @param data the data to spool
149 * @param length the number of bytes of data to spool
150 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
151 * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
152 */
153 static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client,
154 const void *data,
155 size_t length)
156 {
157 if (client->mute)
158 {
159 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
160 }
161
162 size_t offset= 0;
163
164 struct chunk_st *chunk= client->output;
165 while (offset < length)
166 {
167 if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
168 {
169 if ((chunk= allocate_output_chunk(client)) == NULL)
170 {
171 return PROTOCOL_BINARY_RESPONSE_ENOMEM;
172 }
173 }
174
175 size_t bulk= length - offset;
176 if (bulk > chunk->size - chunk->nbytes)
177 {
178 bulk= chunk->size - chunk->nbytes;
179 }
180
181 memcpy(chunk->data + chunk->nbytes, data, bulk);
182 chunk->nbytes += bulk;
183 offset += bulk;
184 }
185
186 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
187 }
188
189 /**
190 * Try to determine the protocol used on this connection.
191 * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should
192 * be using the binary protocol on the connection. I implemented the support
193 * for the ASCII protocol by wrapping into the simple interface (aka v1),
194 * so the implementors needs to provide an implementation of that interface
195 *
196 */
197 static memcached_protocol_event_t determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr)
198 {
199 if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ)
200 {
201 client->work= memcached_binary_protocol_process_data;
202 }
203 else if (client->root->callback->interface_version == 1)
204 {
205 /*
206 * The ASCII protocol can only be used if the implementors provide
207 * an implementation for the version 1 of the interface..
208 *
209 * @todo I should allow the implementors to provide an implementation
210 * for version 0 and 1 at the same time and set the preferred
211 * interface to use...
212 */
213 client->work= memcached_ascii_protocol_process_data;
214 }
215 else
216 {
217 /* Let's just output a warning the way it is supposed to look like
218 * in the ASCII protocol...
219 */
220 const char *err= "CLIENT_ERROR: Unsupported protocol\r\n";
221 client->root->spool(client, err, strlen(err));
222 client->root->drain(client);
223 return MEMCACHED_PROTOCOL_ERROR_EVENT; /* Unsupported protocol */
224 }
225
226 return client->work(client, length, endptr);
227 }
228
229 /*
230 ** **********************************************************************
231 ** * PUBLIC INTERFACE
232 ** * See protocol_handler.h for function description
233 ** **********************************************************************
234 */
235 struct memcached_protocol_st *memcached_protocol_create_instance(void)
236 {
237 struct memcached_protocol_st *ret= calloc(1, sizeof(*ret));
238 if (ret != NULL)
239 {
240 ret->recv= default_recv;
241 ret->send= default_send;
242 ret->drain= drain_output;
243 ret->spool= spool_output;
244 ret->input_buffer_size= 1 * 1024 * 1024;
245 ret->input_buffer= malloc(ret->input_buffer_size);
246 if (ret->input_buffer == NULL)
247 {
248 free(ret);
249 ret= NULL;
250 return NULL;
251 }
252
253 ret->buffer_cache= cache_create("protocol_handler",
254 CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
255 0, NULL, NULL);
256 if (ret->buffer_cache == NULL)
257 {
258 free(ret->input_buffer);
259 free(ret);
260 }
261 }
262
263 return ret;
264 }
265
266 void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance)
267 {
268 cache_destroy(instance->buffer_cache);
269 free(instance->input_buffer);
270 free(instance);
271 }
272
273 struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, int sock)
274 {
275 struct memcached_protocol_client_st *ret= calloc(1, sizeof(*ret));
276 if (ret != NULL)
277 {
278 ret->root= instance;
279 ret->sock= sock;
280 ret->work= determine_protocol;
281 }
282
283 return ret;
284 }
285
286 void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client)
287 {
288 free(client);
289 }
290
291 memcached_protocol_event_t memcached_protocol_client_work(struct memcached_protocol_client_st *client)
292 {
293 /* Try to send data and read from the socket */
294 bool more_data= true;
295 do
296 {
297 ssize_t len= client->root->recv(client,
298 client->sock,
299 client->root->input_buffer + client->input_buffer_offset,
300 client->root->input_buffer_size - client->input_buffer_offset);
301
302 if (len > 0)
303 {
304 /* Do we have the complete packet? */
305 if (client->input_buffer_offset > 0)
306 {
307 memcpy(client->root->input_buffer, client->input_buffer,
308 client->input_buffer_offset);
309 len += (ssize_t)client->input_buffer_offset;
310
311 /* @todo use buffer-cache! */
312 free(client->input_buffer);
313 client->input_buffer_offset= 0;
314 }
315
316 void *endptr;
317 memcached_protocol_event_t events= client->work(client, &len, &endptr);
318 if (events == MEMCACHED_PROTOCOL_ERROR_EVENT)
319 {
320 return MEMCACHED_PROTOCOL_ERROR_EVENT;
321 }
322
323 if (len > 0)
324 {
325 /* save the data for later on */
326 /* @todo use buffer-cache */
327 client->input_buffer= malloc((size_t)len);
328 if (client->input_buffer == NULL)
329 {
330 client->error= ENOMEM;
331 return MEMCACHED_PROTOCOL_ERROR_EVENT;
332 }
333 memcpy(client->input_buffer, endptr, (size_t)len);
334 client->input_buffer_offset= (size_t)len;
335 more_data= false;
336 }
337 }
338 else if (len == 0)
339 {
340 /* Connection closed */
341 drain_output(client);
342 return MEMCACHED_PROTOCOL_ERROR_EVENT;
343 }
344 else
345 {
346 if (errno != EWOULDBLOCK)
347 {
348 client->error= errno;
349 /* mark this client as terminated! */
350 return MEMCACHED_PROTOCOL_ERROR_EVENT;
351 }
352 more_data= false;
353 }
354 } while (more_data);
355
356 if (!drain_output(client))
357 {
358 return MEMCACHED_PROTOCOL_ERROR_EVENT;
359 }
360
361 memcached_protocol_event_t ret= MEMCACHED_PROTOCOL_READ_EVENT;
362 if (client->output)
363 ret|= MEMCACHED_PROTOCOL_READ_EVENT;
364
365 return ret;
366 }