Fix for the light server.
[m6w6/libmemcached] / example / memcached_light.cc
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example uses both interfaces to implement a small memcached server.
5 * Please note that this is an example of how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.c - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.c - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.c - This file sets up all of the sockets and runs the main
17 * message loop.
18 *
19 *
20 * config.h is included so that I can use ntohll/htonll on platforms that
21 * don't have them (they are private functions inside libmemcached, so you
22 * cannot use them directly from libmemcached without special modifications to
23 * the library)
24 */
25
26 #include "config.h"
27
28 #include <libmemcachedprotocol-0.0/handler.h>
29 #include <libmemcached/socket.hpp>
30 #include <example/byteorder.h>
31 #include "example/storage.h"
32 #include "example/memcached_light.h"
33
34 #include <event.h>
35
36 #include <cassert>
37 #include <cerrno>
38 #include <cstdio>
39 #include <cstdlib>
40 #include <cstring>
41 #include <fcntl.h>
42 #include <getopt.h>
43 #include <iostream>
44 #include <sys/types.h>
45 #include <unistd.h>
46
47 extern memcached_binary_protocol_callback_st interface_v0_impl;
48 extern memcached_binary_protocol_callback_st interface_v1_impl;
49
50 static memcached_socket_t server_sockets[1024];
51 static int num_server_sockets= 0;
52
53 struct connection
54 {
55 void *userdata;
56 struct event event;
57 };
58
/*
 * Size of socket_userdata_map; since the map is indexed by socket descriptor
 * this is also the upper bound for accepted descriptors.
 * The default can be changed with --max-connections.
 */
static int maxconns = 1024;

/* Table of per-socket state, allocated in main() with maxconns entries */
static struct connection *socket_userdata_map;

/* The libevent instance every event in this file is registered with */
static struct event_base *event_base;
64
/**
 * Settings collected from the command line.
 */
struct options_st
{
  /* Path given with --pid-file, or NULL when no pid file was requested */
  char *pid_file;
  /* Set by --verbose; enables the diagnostic output in this file */
  bool is_verbose;

  /* Start out with no pid file and quiet output */
  options_st() : pid_file(NULL), is_verbose(false) {}
};

/* The one and only set of options, filled in by main() */
static options_st global_options;
77
78 /**
79 * Callback for driving a client connection
80 * @param fd the socket for the client socket
81 * @param which identifying the event that occurred (not used)
82 * @param arg the connection structure for the client
83 */
84 static void drive_client(memcached_socket_t fd, short, void *arg)
85 {
86 struct connection *client= (struct connection*)arg;
87 struct memcached_protocol_client_st* c= (struct memcached_protocol_client_st*)client->userdata;
88 assert(c != NULL);
89
90 memcached_protocol_event_t events= memcached_protocol_client_work(c);
91 if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
92 {
93 if (global_options.is_verbose)
94 {
95 struct sockaddr_in sin;
96 socklen_t addrlen= sizeof(sin);
97
98 if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) != -1)
99 {
100 std::cout << __FILE__ << ":" << __LINE__
101 << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)"
102 << " " << inet_ntoa(sin.sin_addr) << ":" << sin.sin_port
103 << " fd:" << fd
104 << std::endl;
105 }
106 else
107 {
108 std::cout << __FILE__ << ":" << __LINE__ << "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl;
109 }
110 }
111
112 memcached_protocol_client_destroy(c);
113 closesocket(fd);
114 }
115 else
116 {
117 short flags = 0;
118 if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
119 {
120 flags= EV_WRITE;
121 }
122
123 if (events & MEMCACHED_PROTOCOL_READ_EVENT)
124 {
125 flags|= EV_READ;
126 }
127
128 event_set(&client->event, (intptr_t)fd, flags, drive_client, client);
129 event_base_set(event_base, &client->event);
130
131 if (event_add(&client->event, 0) == -1)
132 {
133 memcached_protocol_client_destroy(c);
134 closesocket(fd);
135 }
136 }
137 }
138
139 /**
140 * Callback for accepting new connections
141 * @param fd the socket for the server socket
142 * @param which identifying the event that occurred (not used)
143 * @param arg the connection structure for the server
144 */
145 static void accept_handler(memcached_socket_t fd, short, void *arg)
146 {
147 struct connection *server= (struct connection *)arg;
148 /* accept new client */
149 struct sockaddr_storage addr;
150 socklen_t addrlen= sizeof(addr);
151 memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);
152
153 if (sock == INVALID_SOCKET)
154 {
155 perror("Failed to accept client");
156 }
157
158 #ifndef WIN32
159 if (sock >= maxconns)
160 {
161 closesocket(sock);
162 return ;
163 }
164 #endif
165
166 struct memcached_protocol_client_st* c= memcached_protocol_create_client((memcached_protocol_st*)server->userdata, sock);
167 if (c == NULL)
168 {
169 closesocket(sock);
170 }
171 else
172 {
173 memcached_protocol_client_set_verbose(c, global_options.is_verbose);
174 struct connection *client = &socket_userdata_map[sock];
175 client->userdata= c;
176
177 event_set(&client->event, (intptr_t)sock, EV_READ, drive_client, client);
178 event_base_set(event_base, &client->event);
179 if (event_add(&client->event, 0) == -1)
180 {
181 std::cerr << "Failed to add event for " << sock << std::endl;
182 memcached_protocol_client_destroy(c);
183 closesocket(sock);
184 }
185 }
186 }
187
188 /**
189 * Create a socket and bind it to a specific port number
190 * @param port the port number to bind to
191 */
192 static int server_socket(const char *port)
193 {
194 struct addrinfo *ai;
195 struct addrinfo hints;
196 memset(&hints, 0, sizeof(struct addrinfo));
197
198 hints.ai_flags= AI_PASSIVE;
199 hints.ai_family= AF_UNSPEC;
200 hints.ai_socktype= SOCK_STREAM;
201
202 int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
203 if (error != 0)
204 {
205 if (error != EAI_SYSTEM)
206 {
207 std::cerr << "getaddrinfo(): " << gai_strerror(error) << std::endl;
208 }
209 else
210 {
211 std::cerr << "getaddrinfo(): " << strerror(errno) << std::endl;
212 }
213
214 return 0;
215 }
216
217 struct linger ling= {0, 0};
218
219 for (struct addrinfo *next= ai; next; next= next->ai_next)
220 {
221 memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
222 if (sock == INVALID_SOCKET)
223 {
224 std::cerr << "Failed to create socket: " << strerror(errno) << std::endl;
225 continue;
226 }
227
228 int flags;
229 #ifdef WIN32
230 u_long arg = 1;
231 if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
232 {
233 std::cerr << "Failed to set nonblocking io: " << strerror(errno) << std::endl;
234 closesocket(sock);
235 continue;
236 }
237 #else
238 flags= fcntl(sock, F_GETFL, 0);
239 if (flags == -1)
240 {
241 std::cerr << "Failed to get socket flags: " << strerror(errno) << std::endl;
242 closesocket(sock);
243 continue;
244 }
245
246 if ((flags & O_NONBLOCK) != O_NONBLOCK)
247 {
248 if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
249 {
250 std::cerr << "Failed to set socket to nonblocking mode: " << strerror(errno) << std::endl;
251 closesocket(sock);
252 continue;
253 }
254 }
255 #endif
256
257 flags= 1;
258 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
259 {
260 std::cerr << "Failed to set SO_REUSEADDR: " << strerror(errno) << std::endl;
261 }
262
263 if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
264 {
265 std::cerr << "Failed to set SO_KEEPALIVE: " << strerror(errno) << std::endl;
266 }
267
268 if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
269 {
270 std::cerr << "Failed to set SO_LINGER: " << strerror(errno) << std::endl;
271 }
272
273 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
274 {
275 std::cerr << "Failed to set TCP_NODELAY: " << strerror(errno) << std::endl;
276 }
277
278 if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
279 {
280 if (get_socket_errno() != EADDRINUSE)
281 {
282 std::cerr << "bind(): " << strerror(errno) << std::endl;
283 freeaddrinfo(ai);
284 }
285 closesocket(sock);
286 continue;
287 }
288
289 if (listen(sock, 1024) == SOCKET_ERROR)
290 {
291 std::cerr << "listen(): " << strerror(errno) << std::endl;
292 closesocket(sock);
293 continue;
294 }
295
296 if (global_options.is_verbose)
297 {
298 std::cout << "Listening to " << port << std::endl;
299 }
300
301 server_sockets[num_server_sockets++]= sock;
302 }
303
304 freeaddrinfo(ai);
305
306 return (num_server_sockets > 0) ? 0 : 1;
307 }
308
/**
 * Convert a command code to a textual string
 *
 * The table is indexed directly by the command code, so the order of the
 * entries is significant.
 *
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };

  /*
   * Bound the lookup by the table itself so that the check can never get out
   * of step with the number of entries above.
   */
  if (cmd < sizeof(text) / sizeof(text[0]))
  {
    return text[cmd];
  }

  return NULL;
}
332
333 /**
334 * Print out the command we are about to execute
335 */
336 static void pre_execute(const void *cookie,
337 protocol_binary_request_header *header)
338 {
339 if (global_options.is_verbose)
340 {
341 if (header)
342 {
343 const char *cmd= comcode2str(header->request.opcode);
344 if (cmd != NULL)
345 {
346 std::cout << "pre_execute from " << cookie << ": " << cmd << std::endl;
347 }
348 else
349 {
350 std::cout << "pre_execute from " << cookie << ": " << header->request.opcode << std::endl;
351 }
352 }
353 else
354 {
355 std::cout << "pre_execute from " << cookie << std::endl;
356 }
357 }
358 }
359
360 /**
361 * Print out the command we just executed
362 */
363 static void post_execute(const void *cookie,
364 protocol_binary_request_header *header)
365 {
366 if (global_options.is_verbose)
367 {
368 if (header)
369 {
370 const char *cmd= comcode2str(header->request.opcode);
371 if (cmd != NULL)
372 {
373 std::cout << "post_execute from " << cookie << ": " << cmd << std::endl;
374 }
375 else
376 {
377 std::cout << "post_execute from " << cookie << ": " << header->request.opcode << std::endl;
378 }
379 }
380 else
381 {
382 std::cout << "post_execute from " << cookie << std::endl;
383 }
384 }
385 }
386
387 /**
388 * Callback handler for all unknown commands.
389 * Send an unknown command back to the client
390 */
391 static protocol_binary_response_status unknown(const void *cookie,
392 protocol_binary_request_header *header,
393 memcached_binary_protocol_raw_response_handler response_handler)
394 {
395 protocol_binary_response_no_extras response;
396 memset(&response, 0, sizeof(protocol_binary_response_no_extras));
397
398 response.message.header.response.magic= PROTOCOL_BINARY_RES;
399 response.message.header.response.opcode= header->request.opcode;
400 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND);
401 response.message.header.response.opaque= header->request.opaque;
402
403 return response_handler(cookie, header, (protocol_binary_response_header*)&response);
404 }
405
406 /**
407 * Program entry point. Bind to the specified port(s) and serve clients
408 *
409 * @param argc number of items in the argument vector
410 * @param argv argument vector
411 * @return EXIT_SUCCESS on success, 1 otherwise
412 */
413 int main(int argc, char **argv)
414 {
415 memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
416
417 event_base= event_init();
418 if (event_base == NULL)
419 {
420 std::cerr << "Failed to create an instance of libevent" << std::endl;
421 return EXIT_FAILURE;
422 }
423
424 /*
425 * We need to initialize the handlers manually due to a bug in the
426 * warnings generated by struct initialization in gcc (all the way up to 4.4)
427 */
428 initialize_interface_v0_handler();
429 initialize_interface_v1_handler();
430
431 {
432 enum long_option_t {
433 OPT_HELP,
434 OPT_VERBOSE,
435 OPT_PROTOCOL_VERSION,
436 OPT_VERSION,
437 OPT_PORT,
438 OPT_MAX_CONNECTIONS,
439 OPT_PIDFILE
440 };
441
442 static struct option long_options[]=
443 {
444 {"help", no_argument, NULL, OPT_HELP},
445 {"port", required_argument, NULL, OPT_PORT},
446 {"verbose", no_argument, NULL, OPT_VERBOSE},
447 {"protocol", required_argument, NULL, OPT_PROTOCOL_VERSION},
448 {"version", no_argument, NULL, OPT_VERSION},
449 {"max-connections", required_argument, NULL, OPT_MAX_CONNECTIONS},
450 {"pid-file", required_argument, NULL, OPT_PIDFILE},
451 {0, 0, 0, 0}
452 };
453
454 int option_index;
455 bool has_port= false;
456 bool done= false;
457 while (done == false)
458 {
459 switch (getopt_long(argc, argv, "", long_options, &option_index))
460 {
461 case -1:
462 done= true;
463 break;
464
465 case OPT_PROTOCOL_VERSION:
466 interface= &interface_v1_impl;
467 break;
468
469 case OPT_PIDFILE:
470 global_options.pid_file= strdup(optarg);
471 break;
472
473 case OPT_VERBOSE:
474 global_options.is_verbose= true;
475 break;
476
477 case OPT_PORT:
478 has_port= true;
479 (void)server_socket(optarg);
480 break;
481
482 case OPT_MAX_CONNECTIONS:
483 maxconns= atoi(optarg);
484 break;
485
486 case OPT_HELP: /* FALLTHROUGH */
487 std::cout << "Usage: " << argv[0] << std::endl;
488 for (struct option *ptr_option= long_options; ptr_option->name; ptr_option++)
489 {
490 std::cout << "\t" << ptr_option->name << std::endl;
491 }
492 return EXIT_SUCCESS;
493
494 default:
495 {
496 std::cerr << "Unknown option: " << optarg << std::endl;
497 return EXIT_FAILURE;
498 }
499 }
500 }
501
502 if (has_port == false)
503 {
504 (void)server_socket("9999");
505 }
506 }
507
508 if (! initialize_storage())
509 {
510 /* Error message already printed */
511 return EXIT_FAILURE;
512 }
513
514 if (global_options.pid_file)
515 {
516 FILE *pid_file;
517 uint32_t pid;
518
519 pid_file= fopen(global_options.pid_file, "w+");
520
521 if (pid_file == NULL)
522 {
523 perror(strerror(get_socket_errno()));
524 abort();
525 }
526
527 pid= (uint32_t)getpid();
528 if (global_options.is_verbose)
529 {
530 std::cout << "pid:" << pid << std::endl;
531 }
532 fclose(pid_file);
533 }
534
535 if (num_server_sockets == 0)
536 {
537 std::cerr << "No server sockets are available" << std::endl;
538 return EXIT_FAILURE;
539 }
540
541 /*
542 * Create and initialize the handles to the protocol handlers. I want
543 * to be able to trace the traffic throught the pre/post handlers, and
544 * set up a common handler for unknown messages
545 */
546 interface->pre_execute= pre_execute;
547 interface->post_execute= post_execute;
548 interface->unknown= unknown;
549
550 struct memcached_protocol_st *protocol_handle;
551 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
552 {
553 std::cerr << "Failed to allocate protocol handle" << std::endl;
554 return EXIT_FAILURE;
555 }
556
557 socket_userdata_map= (struct connection*)calloc((size_t)(maxconns), sizeof(struct connection));
558 if (socket_userdata_map == NULL)
559 {
560 std::cerr << "Failed to allocate room for connections" << std::endl;
561 return EXIT_FAILURE;
562 }
563
564 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
565 memcached_binary_protocol_set_pedantic(protocol_handle, true);
566
567 for (int xx= 0; xx < num_server_sockets; ++xx)
568 {
569 struct connection *conn= &socket_userdata_map[server_sockets[xx]];
570 conn->userdata= protocol_handle;
571
572 event_set(&conn->event, (intptr_t)server_sockets[xx], EV_READ | EV_PERSIST, accept_handler, conn);
573
574 event_base_set(event_base, &conn->event);
575 if (event_add(&conn->event, 0) == -1)
576 {
577 std::cerr << "Failed to add event for " << server_sockets[xx] << std::endl;
578 closesocket(server_sockets[xx]);
579 }
580 }
581
582 /* Serve all of the clients */
583 event_base_loop(event_base, 0);
584
585 /* NOTREACHED */
586 return EXIT_SUCCESS;
587 }