Merge lp:~tangent-org/libmemcached/1.0-build/ Build: jenkins-Libmemcached-187
[m6w6/libmemcached] / libmemcachedprotocol / handler.c
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are
9 * met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 *
14 * * Redistributions in binary form must reproduce the above
15 * copyright notice, this list of conditions and the following disclaimer
16 * in the documentation and/or other materials provided with the
17 * distribution.
18 *
19 * * The names of its contributors may not be used to endorse or
20 * promote products derived from this software without specific prior
21 * written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
29 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
30 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
31 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
32 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
33 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 *
35 */
36
37 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
38 #include <libmemcachedprotocol/common.h>
39
40 #include <stdlib.h>
41 #include <sys/types.h>
42 #include <errno.h>
43 #include <stdbool.h>
44 #include <string.h>
45 #include <strings.h>
46 #include <ctype.h>
47 #include <stdio.h>
48
49 #include <sys/types.h>
50 #include <sys/socket.h>
51
52 /*
53 ** **********************************************************************
54 ** INTERNAL INTERFACE
55 ** **********************************************************************
56 */
57
58 /**
59 * The default function to receive data from the client. This function
60 * just wraps the recv function to receive from a socket.
61 * See man -s3socket recv for more information.
62 *
63 * @param cookie cookie indentifying a client, not used
64 * @param sock socket to read from
65 * @param buf the destination buffer
66 * @param nbytes the number of bytes to read
67 * @return the number of bytes transferred of -1 upon error
68 */
69 static ssize_t default_recv(const void *cookie,
70 memcached_socket_t sock,
71 void *buf,
72 size_t nbytes)
73 {
74 (void)cookie;
75 return recv(sock, buf, nbytes, 0);
76 }
77
78 /**
79 * The default function to send data to the server. This function
80 * just wraps the send function to send through a socket.
81 * See man -s3socket send for more information.
82 *
83 * @param cookie cookie indentifying a client, not used
84 * @param sock socket to send to
85 * @param buf the source buffer
86 * @param nbytes the number of bytes to send
87 * @return the number of bytes transferred of -1 upon error
88 */
89 static ssize_t default_send(const void *cookie,
90 memcached_socket_t fd,
91 const void *buf,
92 size_t nbytes)
93 {
94 (void)cookie;
95 return send(fd, buf, nbytes, MSG_NOSIGNAL);
96 }
97
98 /**
99 * Try to drain the output buffers without blocking
100 *
101 * @param client the client to drain
102 * @return false if an error occured (connection should be shut down)
103 * true otherwise (please note that there may be more data to
104 * left in the buffer to send)
105 */
106 static bool drain_output(struct memcached_protocol_client_st *client)
107 {
108 if (client->is_verbose)
109 {
110 fprintf(stderr, "%s:%d %s mute:%d output:%s length:%d\n", __FILE__, __LINE__, __func__, (int)client->mute,
111 client->output ? "yes" : "no",
112 client->output ? (int)(client->output->nbytes - client->output->offset) : 0);
113 }
114
115 /* Do we have pending data to send? */
116 while (client->output != NULL)
117 {
118 ssize_t len= client->root->send(client,
119 client->sock,
120 client->output->data + client->output->offset,
121 client->output->nbytes - client->output->offset);
122
123 if (len == -1)
124 {
125 if (get_socket_errno() == EWOULDBLOCK)
126 {
127 return true;
128 }
129 else if (get_socket_errno() != EINTR)
130 {
131 client->error= get_socket_errno();
132 return false;
133 }
134 }
135 else
136 {
137 client->output->offset += (size_t)len;
138 if (client->output->offset == client->output->nbytes)
139 {
140 /* This was the complete buffer */
141 struct chunk_st *old= client->output;
142 client->output= client->output->next;
143 if (client->output == NULL)
144 {
145 client->output_tail= NULL;
146 }
147 cache_free(client->root->buffer_cache, old);
148 }
149 }
150 }
151
152 return true;
153 }
154
155 /**
156 * Allocate an output buffer and chain it into the output list
157 *
158 * @param client the client that needs the buffer
159 * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
160 */
161 static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client)
162 {
163 struct chunk_st *ret= cache_alloc(client->root->buffer_cache);
164
165 if (ret == NULL)
166 {
167 return NULL;
168 }
169
170 ret->offset= ret->nbytes= 0;
171 ret->next= NULL;
172 ret->size= CHUNK_BUFFERSIZE;
173 ret->data= (void*)(ret + 1);
174 if (client->output == NULL)
175 {
176 client->output= client->output_tail= ret;
177 }
178 else
179 {
180 client->output_tail->next= ret;
181 client->output_tail= ret;
182 }
183
184 return ret;
185 }
186
187 /**
188 * Spool data into the send-buffer for a client.
189 *
190 * @param client the client to spool the data for
191 * @param data the data to spool
192 * @param length the number of bytes of data to spool
193 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
194 * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
195 */
196 static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client,
197 const void *data,
198 size_t length)
199 {
200 if (client->is_verbose)
201 {
202 fprintf(stderr, "%s:%d %s mute:%d length:%d\n", __FILE__, __LINE__, __func__, (int)client->mute, (int)length);
203 }
204
205 if (client->mute)
206 {
207 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
208 }
209
210 size_t offset= 0;
211
212 struct chunk_st *chunk= client->output;
213 while (offset < length)
214 {
215 if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
216 {
217 if ((chunk= allocate_output_chunk(client)) == NULL)
218 {
219 return PROTOCOL_BINARY_RESPONSE_ENOMEM;
220 }
221 }
222
223 size_t bulk= length - offset;
224 if (bulk > chunk->size - chunk->nbytes)
225 {
226 bulk= chunk->size - chunk->nbytes;
227 }
228
229 memcpy(chunk->data + chunk->nbytes, data, bulk);
230 chunk->nbytes += bulk;
231 offset += bulk;
232 }
233
234 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
235 }
236
237 /**
238 * Try to determine the protocol used on this connection.
239 * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should
240 * be using the binary protocol on the connection. I implemented the support
241 * for the ASCII protocol by wrapping into the simple interface (aka v1),
242 * so the implementors needs to provide an implementation of that interface
243 *
244 */
245 static memcached_protocol_event_t determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr)
246 {
247 if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ)
248 {
249 if (client->is_verbose)
250 {
251 fprintf(stderr, "%s:%d PROTOCOL: memcached_binary_protocol_process_data\n", __FILE__, __LINE__);
252 }
253 client->work= memcached_binary_protocol_process_data;
254 }
255 else if (client->root->callback->interface_version == 1)
256 {
257 if (client->is_verbose)
258 {
259 fprintf(stderr, "%s:%d PROTOCOL: memcached_ascii_protocol_process_data\n", __FILE__, __LINE__);
260 }
261
262 /*
263 * The ASCII protocol can only be used if the implementors provide
264 * an implementation for the version 1 of the interface..
265 *
266 * @todo I should allow the implementors to provide an implementation
267 * for version 0 and 1 at the same time and set the preferred
268 * interface to use...
269 */
270 client->work= memcached_ascii_protocol_process_data;
271 }
272 else
273 {
274 if (client->is_verbose)
275 {
276 fprintf(stderr, "%s:%d PROTOCOL: Unsupported protocol\n", __FILE__, __LINE__);
277 }
278
279 /* Let's just output a warning the way it is supposed to look like
280 * in the ASCII protocol...
281 */
282 const char *err= "CLIENT_ERROR: Unsupported protocol\r\n";
283 client->root->spool(client, err, strlen(err));
284 client->root->drain(client);
285
286 return MEMCACHED_PROTOCOL_ERROR_EVENT; /* Unsupported protocol */
287 }
288
289 return client->work(client, length, endptr);
290 }
291
292 /*
293 ** **********************************************************************
294 ** * PUBLIC INTERFACE
295 ** * See protocol_handler.h for function description
296 ** **********************************************************************
297 */
298 struct memcached_protocol_st *memcached_protocol_create_instance(void)
299 {
300 struct memcached_protocol_st *ret= calloc(1, sizeof(*ret));
301 if (ret != NULL)
302 {
303 ret->recv= default_recv;
304 ret->send= default_send;
305 ret->drain= drain_output;
306 ret->spool= spool_output;
307 ret->input_buffer_size= 1 * 1024 * 1024;
308 ret->input_buffer= malloc(ret->input_buffer_size);
309 if (ret->input_buffer == NULL)
310 {
311 free(ret);
312 ret= NULL;
313
314 return NULL;
315 }
316
317 ret->buffer_cache= cache_create("protocol_handler",
318 CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
319 0, NULL, NULL);
320 if (ret->buffer_cache == NULL)
321 {
322 free(ret->input_buffer);
323 free(ret);
324 }
325 }
326
327 return ret;
328 }
329
330 void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance)
331 {
332 cache_destroy(instance->buffer_cache);
333 free(instance->input_buffer);
334 free(instance);
335 }
336
337 struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, memcached_socket_t sock)
338 {
339 struct memcached_protocol_client_st *ret= calloc(1, sizeof(memcached_protocol_client_st));
340 if (ret != NULL)
341 {
342 ret->root= instance;
343 ret->sock= sock;
344 ret->work= determine_protocol;
345 }
346
347 return ret;
348 }
349
350 void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client)
351 {
352 free(client);
353 }
354
355 void memcached_protocol_client_set_verbose(struct memcached_protocol_client_st *client, bool arg)
356 {
357 if (client)
358 {
359 client->is_verbose= arg;
360 }
361 }
362
363 memcached_protocol_event_t memcached_protocol_client_work(struct memcached_protocol_client_st *client)
364 {
365 /* Try to send data and read from the socket */
366 bool more_data= true;
367 do
368 {
369 ssize_t len= client->root->recv(client,
370 client->sock,
371 client->root->input_buffer + client->input_buffer_offset,
372 client->root->input_buffer_size - client->input_buffer_offset);
373
374 if (len > 0)
375 {
376 /* Do we have the complete packet? */
377 if (client->input_buffer_offset > 0)
378 {
379 memcpy(client->root->input_buffer, client->input_buffer,
380 client->input_buffer_offset);
381 len += (ssize_t)client->input_buffer_offset;
382
383 /* @todo use buffer-cache! */
384 free(client->input_buffer);
385 client->input_buffer_offset= 0;
386 }
387
388 void *endptr;
389 memcached_protocol_event_t events= client->work(client, &len, &endptr);
390 if (events == MEMCACHED_PROTOCOL_ERROR_EVENT)
391 {
392 return MEMCACHED_PROTOCOL_ERROR_EVENT;
393 }
394
395 if (len > 0)
396 {
397 /* save the data for later on */
398 /* @todo use buffer-cache */
399 client->input_buffer= malloc((size_t)len);
400 if (client->input_buffer == NULL)
401 {
402 client->error= ENOMEM;
403 return MEMCACHED_PROTOCOL_ERROR_EVENT;
404 }
405 memcpy(client->input_buffer, endptr, (size_t)len);
406 client->input_buffer_offset= (size_t)len;
407 more_data= false;
408 }
409 }
410 else if (len == 0)
411 {
412 /* Connection closed */
413 drain_output(client);
414 return MEMCACHED_PROTOCOL_ERROR_EVENT;
415 }
416 else
417 {
418 if (get_socket_errno() != EWOULDBLOCK)
419 {
420 client->error= get_socket_errno();
421 /* mark this client as terminated! */
422 return MEMCACHED_PROTOCOL_ERROR_EVENT;
423 }
424 more_data= false;
425 }
426 } while (more_data);
427
428 if (!drain_output(client))
429 {
430 return MEMCACHED_PROTOCOL_ERROR_EVENT;
431 }
432
433 memcached_protocol_event_t ret= MEMCACHED_PROTOCOL_READ_EVENT;
434 if (client->output)
435 {
436 ret|= MEMCACHED_PROTOCOL_READ_EVENT;
437 }
438
439 return ret;
440 }