Fixing a couple of items before releasing.
[m6w6/libmemcached] / libmemcachedprotocol / handler.c
1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Libmemcached library
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are
9 * met:
10 *
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 *
14 * * Redistributions in binary form must reproduce the above
15 * copyright notice, this list of conditions and the following disclaimer
16 * in the documentation and/or other materials provided with the
17 * distribution.
18 *
19 * * The names of its contributors may not be used to endorse or
20 * promote products derived from this software without specific prior
21 * written permission.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
29 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
30 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
31 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
32 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
33 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 *
35 */
36
37 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
38 #include <libmemcachedprotocol/common.h>
39
40 #include <stdlib.h>
41 #include <sys/types.h>
42 #include <errno.h>
43 #include <stdbool.h>
44 #include <string.h>
45 #include <strings.h>
46 #include <ctype.h>
47 #include <stdio.h>
48
49 /*
50 ** **********************************************************************
51 ** INTERNAL INTERFACE
52 ** **********************************************************************
53 */
54
55 /**
56 * The default function to receive data from the client. This function
57 * just wraps the recv function to receive from a socket.
58 * See man -s3socket recv for more information.
59 *
60 * @param cookie cookie indentifying a client, not used
61 * @param sock socket to read from
62 * @param buf the destination buffer
63 * @param nbytes the number of bytes to read
64 * @return the number of bytes transferred of -1 upon error
65 */
66 static ssize_t default_recv(const void *cookie,
67 memcached_socket_t sock,
68 void *buf,
69 size_t nbytes)
70 {
71 (void)cookie;
72 return recv(sock, buf, nbytes, 0);
73 }
74
75 /**
76 * The default function to send data to the server. This function
77 * just wraps the send function to send through a socket.
78 * See man -s3socket send for more information.
79 *
80 * @param cookie cookie indentifying a client, not used
81 * @param sock socket to send to
82 * @param buf the source buffer
83 * @param nbytes the number of bytes to send
84 * @return the number of bytes transferred of -1 upon error
85 */
86 static ssize_t default_send(const void *cookie,
87 memcached_socket_t fd,
88 const void *buf,
89 size_t nbytes)
90 {
91 (void)cookie;
92 return send(fd, buf, nbytes, MSG_NOSIGNAL);
93 }
94
95 /**
96 * Try to drain the output buffers without blocking
97 *
98 * @param client the client to drain
99 * @return false if an error occured (connection should be shut down)
100 * true otherwise (please note that there may be more data to
101 * left in the buffer to send)
102 */
103 static bool drain_output(struct memcached_protocol_client_st *client)
104 {
105 if (client->is_verbose)
106 {
107 fprintf(stderr, "%s:%d %s mute:%d output:%s length:%d\n", __FILE__, __LINE__, __func__, (int)client->mute,
108 client->output ? "yes" : "no",
109 client->output ? (int)(client->output->nbytes - client->output->offset) : 0);
110 }
111
112 /* Do we have pending data to send? */
113 while (client->output != NULL)
114 {
115 ssize_t len= client->root->send(client,
116 client->sock,
117 client->output->data + client->output->offset,
118 client->output->nbytes - client->output->offset);
119
120 if (len == -1)
121 {
122 if (get_socket_errno() == EWOULDBLOCK)
123 {
124 return true;
125 }
126 else if (get_socket_errno() != EINTR)
127 {
128 client->error= get_socket_errno();
129 return false;
130 }
131 }
132 else
133 {
134 client->output->offset += (size_t)len;
135 if (client->output->offset == client->output->nbytes)
136 {
137 /* This was the complete buffer */
138 struct chunk_st *old= client->output;
139 client->output= client->output->next;
140 if (client->output == NULL)
141 {
142 client->output_tail= NULL;
143 }
144 cache_free(client->root->buffer_cache, old);
145 }
146 }
147 }
148
149 return true;
150 }
151
152 /**
153 * Allocate an output buffer and chain it into the output list
154 *
155 * @param client the client that needs the buffer
156 * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
157 */
158 static struct chunk_st *allocate_output_chunk(struct memcached_protocol_client_st *client)
159 {
160 struct chunk_st *ret= cache_alloc(client->root->buffer_cache);
161
162 if (ret == NULL)
163 {
164 return NULL;
165 }
166
167 ret->offset= ret->nbytes= 0;
168 ret->next= NULL;
169 ret->size= CHUNK_BUFFERSIZE;
170 ret->data= (void*)(ret + 1);
171 if (client->output == NULL)
172 {
173 client->output= client->output_tail= ret;
174 }
175 else
176 {
177 client->output_tail->next= ret;
178 client->output_tail= ret;
179 }
180
181 return ret;
182 }
183
184 /**
185 * Spool data into the send-buffer for a client.
186 *
187 * @param client the client to spool the data for
188 * @param data the data to spool
189 * @param length the number of bytes of data to spool
190 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
191 * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
192 */
193 static protocol_binary_response_status spool_output(struct memcached_protocol_client_st *client,
194 const void *data,
195 size_t length)
196 {
197 if (client->is_verbose)
198 {
199 fprintf(stderr, "%s:%d %s mute:%d length:%d\n", __FILE__, __LINE__, __func__, (int)client->mute, (int)length);
200 }
201
202 if (client->mute)
203 {
204 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
205 }
206
207 size_t offset= 0;
208
209 struct chunk_st *chunk= client->output;
210 while (offset < length)
211 {
212 if (chunk == NULL || (chunk->size - chunk->nbytes) == 0)
213 {
214 if ((chunk= allocate_output_chunk(client)) == NULL)
215 {
216 return PROTOCOL_BINARY_RESPONSE_ENOMEM;
217 }
218 }
219
220 size_t bulk= length - offset;
221 if (bulk > chunk->size - chunk->nbytes)
222 {
223 bulk= chunk->size - chunk->nbytes;
224 }
225
226 memcpy(chunk->data + chunk->nbytes, data, bulk);
227 chunk->nbytes += bulk;
228 offset += bulk;
229 }
230
231 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
232 }
233
234 /**
235 * Try to determine the protocol used on this connection.
236 * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should
237 * be using the binary protocol on the connection. I implemented the support
238 * for the ASCII protocol by wrapping into the simple interface (aka v1),
239 * so the implementors needs to provide an implementation of that interface
240 *
241 */
242 static memcached_protocol_event_t determine_protocol(struct memcached_protocol_client_st *client, ssize_t *length, void **endptr)
243 {
244 if (*client->root->input_buffer == (uint8_t)PROTOCOL_BINARY_REQ)
245 {
246 if (client->is_verbose)
247 {
248 fprintf(stderr, "%s:%d PROTOCOL: memcached_binary_protocol_process_data\n", __FILE__, __LINE__);
249 }
250 client->work= memcached_binary_protocol_process_data;
251 }
252 else if (client->root->callback->interface_version == 1)
253 {
254 if (client->is_verbose)
255 {
256 fprintf(stderr, "%s:%d PROTOCOL: memcached_ascii_protocol_process_data\n", __FILE__, __LINE__);
257 }
258
259 /*
260 * The ASCII protocol can only be used if the implementors provide
261 * an implementation for the version 1 of the interface..
262 *
263 * @todo I should allow the implementors to provide an implementation
264 * for version 0 and 1 at the same time and set the preferred
265 * interface to use...
266 */
267 client->work= memcached_ascii_protocol_process_data;
268 }
269 else
270 {
271 if (client->is_verbose)
272 {
273 fprintf(stderr, "%s:%d PROTOCOL: Unsupported protocol\n", __FILE__, __LINE__);
274 }
275
276 /* Let's just output a warning the way it is supposed to look like
277 * in the ASCII protocol...
278 */
279 const char *err= "CLIENT_ERROR: Unsupported protocol\r\n";
280 client->root->spool(client, err, strlen(err));
281 client->root->drain(client);
282
283 return MEMCACHED_PROTOCOL_ERROR_EVENT; /* Unsupported protocol */
284 }
285
286 return client->work(client, length, endptr);
287 }
288
289 /*
290 ** **********************************************************************
291 ** * PUBLIC INTERFACE
292 ** * See protocol_handler.h for function description
293 ** **********************************************************************
294 */
295 struct memcached_protocol_st *memcached_protocol_create_instance(void)
296 {
297 struct memcached_protocol_st *ret= calloc(1, sizeof(*ret));
298 if (ret != NULL)
299 {
300 ret->recv= default_recv;
301 ret->send= default_send;
302 ret->drain= drain_output;
303 ret->spool= spool_output;
304 ret->input_buffer_size= 1 * 1024 * 1024;
305 ret->input_buffer= malloc(ret->input_buffer_size);
306 if (ret->input_buffer == NULL)
307 {
308 free(ret);
309 ret= NULL;
310
311 return NULL;
312 }
313
314 ret->buffer_cache= cache_create("protocol_handler",
315 CHUNK_BUFFERSIZE + sizeof(struct chunk_st),
316 0, NULL, NULL);
317 if (ret->buffer_cache == NULL)
318 {
319 free(ret->input_buffer);
320 free(ret);
321 }
322 }
323
324 return ret;
325 }
326
327 void memcached_protocol_destroy_instance(struct memcached_protocol_st *instance)
328 {
329 cache_destroy(instance->buffer_cache);
330 free(instance->input_buffer);
331 free(instance);
332 }
333
334 struct memcached_protocol_client_st *memcached_protocol_create_client(struct memcached_protocol_st *instance, memcached_socket_t sock)
335 {
336 struct memcached_protocol_client_st *ret= calloc(1, sizeof(memcached_protocol_client_st));
337 if (ret != NULL)
338 {
339 ret->root= instance;
340 ret->sock= sock;
341 ret->work= determine_protocol;
342 }
343
344 return ret;
345 }
346
347 void memcached_protocol_client_destroy(struct memcached_protocol_client_st *client)
348 {
349 free(client);
350 }
351
352 void memcached_protocol_client_set_verbose(struct memcached_protocol_client_st *client, bool arg)
353 {
354 if (client)
355 {
356 client->is_verbose= arg;
357 }
358 }
359
360 memcached_protocol_event_t memcached_protocol_client_work(struct memcached_protocol_client_st *client)
361 {
362 /* Try to send data and read from the socket */
363 bool more_data= true;
364 do
365 {
366 ssize_t len= client->root->recv(client,
367 client->sock,
368 client->root->input_buffer + client->input_buffer_offset,
369 client->root->input_buffer_size - client->input_buffer_offset);
370
371 if (len > 0)
372 {
373 /* Do we have the complete packet? */
374 if (client->input_buffer_offset > 0)
375 {
376 memcpy(client->root->input_buffer, client->input_buffer,
377 client->input_buffer_offset);
378 len += (ssize_t)client->input_buffer_offset;
379
380 /* @todo use buffer-cache! */
381 free(client->input_buffer);
382 client->input_buffer_offset= 0;
383 }
384
385 void *endptr;
386 memcached_protocol_event_t events= client->work(client, &len, &endptr);
387 if (events == MEMCACHED_PROTOCOL_ERROR_EVENT)
388 {
389 return MEMCACHED_PROTOCOL_ERROR_EVENT;
390 }
391
392 if (len > 0)
393 {
394 /* save the data for later on */
395 /* @todo use buffer-cache */
396 client->input_buffer= malloc((size_t)len);
397 if (client->input_buffer == NULL)
398 {
399 client->error= ENOMEM;
400 return MEMCACHED_PROTOCOL_ERROR_EVENT;
401 }
402 memcpy(client->input_buffer, endptr, (size_t)len);
403 client->input_buffer_offset= (size_t)len;
404 more_data= false;
405 }
406 }
407 else if (len == 0)
408 {
409 /* Connection closed */
410 drain_output(client);
411 return MEMCACHED_PROTOCOL_ERROR_EVENT;
412 }
413 else
414 {
415 if (get_socket_errno() != EWOULDBLOCK)
416 {
417 client->error= get_socket_errno();
418 /* mark this client as terminated! */
419 return MEMCACHED_PROTOCOL_ERROR_EVENT;
420 }
421 more_data= false;
422 }
423 } while (more_data);
424
425 if (!drain_output(client))
426 {
427 return MEMCACHED_PROTOCOL_ERROR_EVENT;
428 }
429
430 memcached_protocol_event_t ret= MEMCACHED_PROTOCOL_READ_EVENT;
431 if (client->output)
432 {
433 ret|= MEMCACHED_PROTOCOL_READ_EVENT;
434 }
435
436 return ret;
437 }