1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions are
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
14 * * Redistributions in binary form must reproduce the above
15 * copyright notice, this list of conditions and the following disclaimer
16 * in the documentation and/or other materials provided with the
19 * * The names of its contributors may not be used to endorse or
20 * promote products derived from this software without specific prior
23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
24 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
25 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
26 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
27 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
28 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
29 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
30 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
31 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
32 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
33 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
38 #include <libmemcachedprotocol/common.h>
41 #include <sys/types.h>
49 #include <sys/types.h>
50 #include <sys/socket.h>
53 ** **********************************************************************
55 ** **********************************************************************
59 * The default function to receive data from the client. This function
60 * just wraps the recv function to receive from a socket.
61 * See man -s3socket recv for more information.
63 * @param cookie cookie indentifying a client, not used
64 * @param sock socket to read from
65 * @param buf the destination buffer
66 * @param nbytes the number of bytes to read
67 * @return the number of bytes transferred of -1 upon error
69 static ssize_t
default_recv(const void *cookie
,
70 memcached_socket_t sock
,
75 return recv(sock
, buf
, nbytes
, 0);
79 * The default function to send data to the server. This function
80 * just wraps the send function to send through a socket.
81 * See man -s3socket send for more information.
83 * @param cookie cookie indentifying a client, not used
84 * @param sock socket to send to
85 * @param buf the source buffer
86 * @param nbytes the number of bytes to send
87 * @return the number of bytes transferred of -1 upon error
89 static ssize_t
default_send(const void *cookie
,
90 memcached_socket_t fd
,
95 return send(fd
, buf
, nbytes
, MSG_NOSIGNAL
);
99 * Try to drain the output buffers without blocking
101 * @param client the client to drain
102 * @return false if an error occured (connection should be shut down)
103 * true otherwise (please note that there may be more data to
104 * left in the buffer to send)
106 static bool drain_output(struct memcached_protocol_client_st
*client
)
108 if (client
->is_verbose
)
110 fprintf(stderr
, "%s:%d %s mute:%d output:%s length:%d\n", __FILE__
, __LINE__
, __func__
, (int)client
->mute
,
111 client
->output
? "yes" : "no",
112 client
->output
? (int)(client
->output
->nbytes
- client
->output
->offset
) : 0);
115 /* Do we have pending data to send? */
116 while (client
->output
!= NULL
)
118 ssize_t len
= client
->root
->send(client
,
120 client
->output
->data
+ client
->output
->offset
,
121 client
->output
->nbytes
- client
->output
->offset
);
125 if (get_socket_errno() == EWOULDBLOCK
)
129 else if (get_socket_errno() != EINTR
)
131 client
->error
= get_socket_errno();
137 client
->output
->offset
+= (size_t)len
;
138 if (client
->output
->offset
== client
->output
->nbytes
)
140 /* This was the complete buffer */
141 struct chunk_st
*old
= client
->output
;
142 client
->output
= client
->output
->next
;
143 if (client
->output
== NULL
)
145 client
->output_tail
= NULL
;
147 cache_free(client
->root
->buffer_cache
, old
);
156 * Allocate an output buffer and chain it into the output list
158 * @param client the client that needs the buffer
159 * @return pointer to the new chunk if the allocation succeeds, NULL otherwise
161 static struct chunk_st
*allocate_output_chunk(struct memcached_protocol_client_st
*client
)
163 struct chunk_st
*ret
= cache_alloc(client
->root
->buffer_cache
);
170 ret
->offset
= ret
->nbytes
= 0;
172 ret
->size
= CHUNK_BUFFERSIZE
;
173 ret
->data
= (void*)(ret
+ 1);
174 if (client
->output
== NULL
)
176 client
->output
= client
->output_tail
= ret
;
180 client
->output_tail
->next
= ret
;
181 client
->output_tail
= ret
;
188 * Spool data into the send-buffer for a client.
190 * @param client the client to spool the data for
191 * @param data the data to spool
192 * @param length the number of bytes of data to spool
193 * @return PROTOCOL_BINARY_RESPONSE_SUCCESS if success,
194 * PROTOCOL_BINARY_RESPONSE_ENOMEM if we failed to allocate memory
196 static protocol_binary_response_status
spool_output(struct memcached_protocol_client_st
*client
,
200 if (client
->is_verbose
)
202 fprintf(stderr
, "%s:%d %s mute:%d length:%d\n", __FILE__
, __LINE__
, __func__
, (int)client
->mute
, (int)length
);
207 return PROTOCOL_BINARY_RESPONSE_SUCCESS
;
212 struct chunk_st
*chunk
= client
->output
;
213 while (offset
< length
)
215 if (chunk
== NULL
|| (chunk
->size
- chunk
->nbytes
) == 0)
217 if ((chunk
= allocate_output_chunk(client
)) == NULL
)
219 return PROTOCOL_BINARY_RESPONSE_ENOMEM
;
223 size_t bulk
= length
- offset
;
224 if (bulk
> chunk
->size
- chunk
->nbytes
)
226 bulk
= chunk
->size
- chunk
->nbytes
;
229 memcpy(chunk
->data
+ chunk
->nbytes
, data
, bulk
);
230 chunk
->nbytes
+= bulk
;
234 return PROTOCOL_BINARY_RESPONSE_SUCCESS
;
238 * Try to determine the protocol used on this connection.
239 * If the first byte contains the magic byte PROTOCOL_BINARY_REQ we should
240 * be using the binary protocol on the connection. I implemented the support
241 * for the ASCII protocol by wrapping into the simple interface (aka v1),
242 * so the implementors needs to provide an implementation of that interface
245 static memcached_protocol_event_t
determine_protocol(struct memcached_protocol_client_st
*client
, ssize_t
*length
, void **endptr
)
247 if (*client
->root
->input_buffer
== (uint8_t)PROTOCOL_BINARY_REQ
)
249 if (client
->is_verbose
)
251 fprintf(stderr
, "%s:%d PROTOCOL: memcached_binary_protocol_process_data\n", __FILE__
, __LINE__
);
253 client
->work
= memcached_binary_protocol_process_data
;
255 else if (client
->root
->callback
->interface_version
== 1)
257 if (client
->is_verbose
)
259 fprintf(stderr
, "%s:%d PROTOCOL: memcached_ascii_protocol_process_data\n", __FILE__
, __LINE__
);
263 * The ASCII protocol can only be used if the implementors provide
264 * an implementation for the version 1 of the interface..
266 * @todo I should allow the implementors to provide an implementation
267 * for version 0 and 1 at the same time and set the preferred
268 * interface to use...
270 client
->work
= memcached_ascii_protocol_process_data
;
274 if (client
->is_verbose
)
276 fprintf(stderr
, "%s:%d PROTOCOL: Unsupported protocol\n", __FILE__
, __LINE__
);
279 /* Let's just output a warning the way it is supposed to look like
280 * in the ASCII protocol...
282 const char *err
= "CLIENT_ERROR: Unsupported protocol\r\n";
283 client
->root
->spool(client
, err
, strlen(err
));
284 client
->root
->drain(client
);
286 return MEMCACHED_PROTOCOL_ERROR_EVENT
; /* Unsupported protocol */
289 return client
->work(client
, length
, endptr
);
293 ** **********************************************************************
294 ** * PUBLIC INTERFACE
295 ** * See protocol_handler.h for function description
296 ** **********************************************************************
298 struct memcached_protocol_st
*memcached_protocol_create_instance(void)
300 struct memcached_protocol_st
*ret
= calloc(1, sizeof(*ret
));
303 ret
->recv
= default_recv
;
304 ret
->send
= default_send
;
305 ret
->drain
= drain_output
;
306 ret
->spool
= spool_output
;
307 ret
->input_buffer_size
= 1 * 1024 * 1024;
308 ret
->input_buffer
= malloc(ret
->input_buffer_size
);
309 if (ret
->input_buffer
== NULL
)
317 ret
->buffer_cache
= cache_create("protocol_handler",
318 CHUNK_BUFFERSIZE
+ sizeof(struct chunk_st
),
320 if (ret
->buffer_cache
== NULL
)
322 free(ret
->input_buffer
);
330 void memcached_protocol_destroy_instance(struct memcached_protocol_st
*instance
)
332 cache_destroy(instance
->buffer_cache
);
333 free(instance
->input_buffer
);
337 struct memcached_protocol_client_st
*memcached_protocol_create_client(struct memcached_protocol_st
*instance
, memcached_socket_t sock
)
339 struct memcached_protocol_client_st
*ret
= calloc(1, sizeof(memcached_protocol_client_st
));
344 ret
->work
= determine_protocol
;
350 void memcached_protocol_client_destroy(struct memcached_protocol_client_st
*client
)
355 void memcached_protocol_client_set_verbose(struct memcached_protocol_client_st
*client
, bool arg
)
359 client
->is_verbose
= arg
;
363 memcached_protocol_event_t
memcached_protocol_client_work(struct memcached_protocol_client_st
*client
)
365 /* Try to send data and read from the socket */
366 bool more_data
= true;
369 ssize_t len
= client
->root
->recv(client
,
371 client
->root
->input_buffer
+ client
->input_buffer_offset
,
372 client
->root
->input_buffer_size
- client
->input_buffer_offset
);
376 /* Do we have the complete packet? */
377 if (client
->input_buffer_offset
> 0)
379 memcpy(client
->root
->input_buffer
, client
->input_buffer
,
380 client
->input_buffer_offset
);
381 len
+= (ssize_t
)client
->input_buffer_offset
;
383 /* @todo use buffer-cache! */
384 free(client
->input_buffer
);
385 client
->input_buffer_offset
= 0;
389 memcached_protocol_event_t events
= client
->work(client
, &len
, &endptr
);
390 if (events
== MEMCACHED_PROTOCOL_ERROR_EVENT
)
392 return MEMCACHED_PROTOCOL_ERROR_EVENT
;
397 /* save the data for later on */
398 /* @todo use buffer-cache */
399 client
->input_buffer
= malloc((size_t)len
);
400 if (client
->input_buffer
== NULL
)
402 client
->error
= ENOMEM
;
403 return MEMCACHED_PROTOCOL_ERROR_EVENT
;
405 memcpy(client
->input_buffer
, endptr
, (size_t)len
);
406 client
->input_buffer_offset
= (size_t)len
;
412 /* Connection closed */
413 drain_output(client
);
414 return MEMCACHED_PROTOCOL_ERROR_EVENT
;
418 if (get_socket_errno() != EWOULDBLOCK
)
420 client
->error
= get_socket_errno();
421 /* mark this client as terminated! */
422 return MEMCACHED_PROTOCOL_ERROR_EVENT
;
428 if (!drain_output(client
))
430 return MEMCACHED_PROTOCOL_ERROR_EVENT
;
433 memcached_protocol_event_t ret
= MEMCACHED_PROTOCOL_READ_EVENT
;
436 ret
|= MEMCACHED_PROTOCOL_READ_EVENT
;