Merge in trunk
[awesomized/libmemcached] / example / memcached_light.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example use both interfaces to implement a small memcached server.
5 * Please note that this is an exemple on how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.c - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.c - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.c - This file sets up all of the sockets and run the main
17 * message loop.
18 *
19 *
20 * config.h is included so that I can use the ntohll/htonll on platforms that
21 * doesn't have that (this is a private function inside libmemcached, so you
22 * cannot use it directly from libmemcached without special modifications to
23 * the library)
24 */
25
26 #include "config.h"
27 #include <assert.h>
28 #include <sys/types.h>
29 #include <stdio.h>
30 #include <unistd.h>
31 #include <fcntl.h>
32 #include <errno.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <event.h>
36
37 #include <libmemcachedprotocol-0.0/handler.h>
38 #include <libmemcached/close_socket.hpp>
39 #include <example/byteorder.h>
40 #include "example/storage.h"
41 #include "example/memcached_light.h"
42
43 extern memcached_binary_protocol_callback_st interface_v0_impl;
44 extern memcached_binary_protocol_callback_st interface_v1_impl;
45
46 static memcached_socket_t server_sockets[1024];
47 static int num_server_sockets= 0;
48
49 struct connection
50 {
51 void *userdata;
52 struct event event;
53 };
54
55 /* The default maximum number of connections... (change with -c) */
56 static int maxconns = 1024;
57
58 static struct connection *socket_userdata_map;
59 static bool verbose= false;
60 static struct event_base *event_base;
61
62 struct options_st {
63 char *pid_file;
64 bool has_port;
65 in_port_t port;
66 } global_options;
67
68 typedef struct options_st options_st;
69
70 /**
71 * Callback for driving a client connection
72 * @param fd the socket for the client socket
73 * @param which identifying the event that occurred (not used)
74 * @param arg the connection structure for the client
75 */
76 static void drive_client(memcached_socket_t fd, short which, void *arg)
77 {
78 (void)which;
79 struct connection *client= arg;
80 struct memcached_protocol_client_st* c= client->userdata;
81 assert(c != NULL);
82
83 memcached_protocol_event_t events= memcached_protocol_client_work(c);
84 if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
85 {
86 memcached_protocol_client_destroy(c);
87 closesocket(fd);
88 } else {
89 short flags = 0;
90 if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
91 {
92 flags= EV_WRITE;
93 }
94
95 if (events & MEMCACHED_PROTOCOL_READ_EVENT)
96 {
97 flags|= EV_READ;
98 }
99
100 event_set(&client->event, (intptr_t)fd, flags, drive_client, client);
101 event_base_set(event_base, &client->event);
102
103 if (event_add(&client->event, 0) == -1)
104 {
105 (void)fprintf(stderr, "Failed to add event for %d\n", fd);
106 memcached_protocol_client_destroy(c);
107 closesocket(fd);
108 }
109 }
110 }
111
112 /**
113 * Callback for accepting new connections
114 * @param fd the socket for the server socket
115 * @param which identifying the event that occurred (not used)
116 * @param arg the connection structure for the server
117 */
118 static void accept_handler(memcached_socket_t fd, short which, void *arg)
119 {
120 (void)which;
121 struct connection *server= arg;
122 /* accept new client */
123 struct sockaddr_storage addr;
124 socklen_t addrlen= sizeof(addr);
125 memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);
126
127 if (sock == INVALID_SOCKET)
128 {
129 perror("Failed to accept client");
130 return ;
131 }
132
133 #ifndef WIN32
134 if (sock >= maxconns)
135 {
136 (void)fprintf(stderr, "Client outside socket range (specified with -c)\n");
137 closesocket(sock);
138 return ;
139 }
140 #endif
141
142 struct memcached_protocol_client_st* c;
143 c= memcached_protocol_create_client(server->userdata, sock);
144 if (c == NULL)
145 {
146 (void)fprintf(stderr, "Failed to create client\n");
147 closesocket(sock);
148 }
149 else
150 {
151 struct connection *client = &socket_userdata_map[sock];
152 client->userdata= c;
153
154 event_set(&client->event, (intptr_t)sock, EV_READ, drive_client, client);
155 event_base_set(event_base, &client->event);
156 if (event_add(&client->event, 0) == -1)
157 {
158 (void)fprintf(stderr, "Failed to add event for %d\n", sock);
159 memcached_protocol_client_destroy(c);
160 closesocket(sock);
161 }
162 }
163 }
164
165 /**
166 * Create a socket and bind it to a specific port number
167 * @param port the port number to bind to
168 */
169 static int server_socket(const char *port)
170 {
171 struct addrinfo *ai;
172 struct addrinfo hints= { .ai_flags= AI_PASSIVE,
173 .ai_family= AF_UNSPEC,
174 .ai_socktype= SOCK_STREAM };
175
176 int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
177 if (error != 0)
178 {
179 if (error != EAI_SYSTEM)
180 fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
181 else
182 perror("getaddrinfo()");
183
184 return 0;
185 }
186
187 struct linger ling= {0, 0};
188
189 for (struct addrinfo *next= ai; next; next= next->ai_next)
190 {
191 memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
192 if (sock == INVALID_SOCKET)
193 {
194 perror("Failed to create socket");
195 continue;
196 }
197
198 int flags;
199 #ifdef WIN32
200 u_long arg = 1;
201 if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
202 {
203 perror("Failed to set nonblocking io");
204 closesocket(sock);
205 continue;
206 }
207 #else
208 flags= fcntl(sock, F_GETFL, 0);
209 if (flags == -1)
210 {
211 perror("Failed to get socket flags");
212 closesocket(sock);
213 continue;
214 }
215
216 if ((flags & O_NONBLOCK) != O_NONBLOCK)
217 {
218 if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
219 {
220 perror("Failed to set socket to nonblocking mode");
221 closesocket(sock);
222 continue;
223 }
224 }
225 #endif
226
227 flags= 1;
228 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
229 perror("Failed to set SO_REUSEADDR");
230
231 if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
232 perror("Failed to set SO_KEEPALIVE");
233
234 if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
235 perror("Failed to set SO_LINGER");
236
237 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
238 perror("Failed to set TCP_NODELAY");
239
240 if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
241 {
242 if (get_socket_errno() != EADDRINUSE)
243 {
244 perror("bind()");
245 freeaddrinfo(ai);
246 }
247 closesocket(sock);
248 continue;
249 }
250
251 if (listen(sock, 1024) == SOCKET_ERROR)
252 {
253 perror("listen()");
254 closesocket(sock);
255 continue;
256 }
257
258 server_sockets[num_server_sockets++]= sock;
259 }
260
261 freeaddrinfo(ai);
262
263 return (num_server_sockets > 0) ? 0 : 1;
264 }
265
266 /**
267 * Convert a command code to a textual string
268 * @param cmd the comcode to convert
269 * @return a textual string with the command or NULL for unknown commands
270 */
271 static const char* comcode2str(uint8_t cmd)
272 {
273 static const char * const text[] = {
274 "GET", "SET", "ADD", "REPLACE", "DELETE",
275 "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
276 "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
277 "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
278 "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
279 "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
280 };
281
282 if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
283 return text[cmd];
284
285 return NULL;
286 }
287
288 /**
289 * Print out the command we are about to execute
290 */
291 static void pre_execute(const void *cookie,
292 protocol_binary_request_header *header)
293 {
294 if (verbose)
295 {
296 const char *cmd= comcode2str(header->request.opcode);
297 if (cmd != NULL)
298 fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
299 else
300 fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
301 }
302 }
303
304 /**
305 * Print out the command we just executed
306 */
307 static void post_execute(const void *cookie,
308 protocol_binary_request_header *header)
309 {
310 if (verbose)
311 {
312 const char *cmd= comcode2str(header->request.opcode);
313 if (cmd != NULL)
314 fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
315 else
316 fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
317 }
318 }
319
320 /**
321 * Callback handler for all unknown commands.
322 * Send an unknown command back to the client
323 */
324 static protocol_binary_response_status unknown(const void *cookie,
325 protocol_binary_request_header *header,
326 memcached_binary_protocol_raw_response_handler response_handler)
327 {
328 protocol_binary_response_no_extras response= {
329 .message= {
330 .header.response= {
331 .magic= PROTOCOL_BINARY_RES,
332 .opcode= header->request.opcode,
333 .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
334 .opaque= header->request.opaque
335 }
336 }
337 };
338
339 return response_handler(cookie, header, (void*)&response);
340 }
341
342 /**
343 * Program entry point. Bind to the specified port(s) and serve clients
344 *
345 * @param argc number of items in the argument vector
346 * @param argv argument vector
347 * @return EXIT_SUCCESS on success, 1 otherwise
348 */
349 int main(int argc, char **argv)
350 {
351 int cmd;
352 memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
353
354 memset(&global_options, 0, sizeof(global_options));
355
356 event_base= event_init();
357 if (event_base == NULL)
358 {
359 fprintf(stderr, "Failed to create an instance of libevent\n");
360 return EXIT_FAILURE;
361 }
362
363 /*
364 * We need to initialize the handlers manually due to a bug in the
365 * warnings generated by struct initialization in gcc (all the way up to 4.4)
366 */
367 initialize_interface_v0_handler();
368
369 while ((cmd= getopt(argc, argv, "v1p:P:?hc:")) != EOF)
370 {
371 switch (cmd) {
372 case '1':
373 interface= &interface_v1_impl;
374 break;
375 case 'P':
376 global_options.pid_file= strdup(optarg);
377 break;
378 case 'p':
379 global_options.has_port= true;
380 (void)server_socket(optarg);
381 break;
382 case 'v':
383 verbose= true;
384 break;
385 case 'c':
386 maxconns= atoi(optarg);
387 break;
388 case 'h': /* FALLTHROUGH */
389 case '?': /* FALLTHROUGH */
390 default:
391 (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1] [-c #clients] [-P pidfile]\n",
392 argv[0]);
393 return EXIT_FAILURE;
394 }
395 }
396
397 if (! initialize_storage())
398 {
399 /* Error message already printed */
400 return EXIT_FAILURE;
401 }
402
403 if (! global_options.has_port)
404 (void)server_socket("9999");
405
406 if (global_options.pid_file)
407 {
408 FILE *pid_file;
409 uint32_t pid;
410
411 pid_file= fopen(global_options.pid_file, "w+");
412
413 if (pid_file == NULL)
414 {
415 perror(strerror(get_socket_errno()));
416 abort();
417 }
418
419 pid= (uint32_t)getpid();
420 fprintf(pid_file, "%u\n", pid);
421 fclose(pid_file);
422 }
423
424 if (num_server_sockets == 0)
425 {
426 fprintf(stderr, "I don't have any server sockets\n");
427 return EXIT_FAILURE;
428 }
429
430 /*
431 * Create and initialize the handles to the protocol handlers. I want
432 * to be able to trace the traffic throught the pre/post handlers, and
433 * set up a common handler for unknown messages
434 */
435 interface->pre_execute= pre_execute;
436 interface->post_execute= post_execute;
437 interface->unknown= unknown;
438
439 struct memcached_protocol_st *protocol_handle;
440 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
441 {
442 fprintf(stderr, "Failed to allocate protocol handle\n");
443 return EXIT_FAILURE;
444 }
445
446 socket_userdata_map= calloc((size_t)(maxconns), sizeof(struct connection));
447 if (socket_userdata_map == NULL)
448 {
449 fprintf(stderr, "Failed to allocate room for connections\n");
450 return EXIT_FAILURE;
451 }
452
453 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
454 memcached_binary_protocol_set_pedantic(protocol_handle, true);
455
456 for (int xx= 0; xx < num_server_sockets; ++xx)
457 {
458 struct connection *conn= &socket_userdata_map[server_sockets[xx]];
459 conn->userdata= protocol_handle;
460 event_set(&conn->event, (intptr_t)server_sockets[xx], EV_READ | EV_PERSIST,
461 accept_handler, conn);
462 event_base_set(event_base, &conn->event);
463 if (event_add(&conn->event, 0) == -1)
464 {
465 fprintf(stderr, "Failed to add event for %d\n", server_sockets[xx]);
466 closesocket(server_sockets[xx]);
467 }
468 }
469
470 /* Serve all of the clients */
471 event_base_loop(event_base, 0);
472
473 /* NOTREACHED */
474 return EXIT_SUCCESS;
475 }