Merge in build trunk.
[m6w6/libmemcached] / example / memcached_light.cc
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example uses both interfaces to implement a small memcached server.
5 * Please note that this is an example of how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.c - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.c - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.c - This file sets up all of the sockets and runs the main
17 * message loop.
18 *
19 *
20 * config.h is included so that I can use ntohll/htonll on platforms that
21 * don't have them (this is a private function inside libmemcached, so you
22 * cannot use it directly from libmemcached without special modifications to
23 * the library)
24 */
25
26 #include "config.h"
27
28 #include <libmemcachedprotocol-0.0/handler.h>
29 #include <libmemcached/socket.hpp>
30 #include <example/byteorder.h>
31 #include "example/storage.h"
32 #include "example/memcached_light.h"
33
34 #include "util/daemon.hpp"
35 #include "util/log.hpp"
36 #include "util/pidfile.hpp"
37
38 using namespace datadifferential;
39
40 #include <event.h>
41
42 #include <cassert>
43 #include <cerrno>
44 #include <cstdio>
45 #include <cstdlib>
46 #include <cstring>
47 #include <fcntl.h>
48 #include <getopt.h>
49 #include <iostream>
50 #include <sys/types.h>
51 #include <unistd.h>
52
53 extern memcached_binary_protocol_callback_st interface_v0_impl;
54 extern memcached_binary_protocol_callback_st interface_v1_impl;
55
56 static memcached_socket_t server_sockets[1024];
57 static int num_server_sockets= 0;
58
59 struct connection
60 {
61 void *userdata;
62 struct event event;
63 };
64
/* Default number of entries in socket_userdata_map (change with --max-connections). */
static int maxconns = 1024;

/* Connection records, indexed by socket descriptor; allocated in main(). */
static struct connection *socket_userdata_map;

/* The libevent base that the events created in this file are attached to. */
static struct event_base *event_base = NULL;
70
/* Settings collected from the command line in main(). */
struct options_st {
  std::string pid_file;  /* value of --pid-file (empty unless given) */
  std::string service;   /* value of --port; handed to getaddrinfo() as the service */
  std::string log_file;  /* value of --log-file (empty unless given) */
  bool is_verbose;       /* set by --verbose */
  bool opt_daemon;       /* set by --daemon */

  /*
   * Every flag gets an explicit default, so an instance never carries an
   * indeterminate bool regardless of its storage duration.
   */
  options_st() :
    service("9999"),
    is_verbose(false),
    opt_daemon(false)
  {
  }
};

static options_st global_options;
86
87 /**
88 * Callback for driving a client connection
89 * @param fd the socket for the client socket
90 * @param which identifying the event that occurred (not used)
91 * @param arg the connection structure for the client
92 */
93 static void drive_client(memcached_socket_t fd, short, void *arg)
94 {
95 struct connection *client= (struct connection*)arg;
96 struct memcached_protocol_client_st* c= (struct memcached_protocol_client_st*)client->userdata;
97 assert(c != NULL);
98
99 memcached_protocol_event_t events= memcached_protocol_client_work(c);
100 if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
101 {
102 if (global_options.is_verbose)
103 {
104 struct sockaddr_in sin;
105 socklen_t addrlen= sizeof(sin);
106
107 if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) != -1)
108 {
109 std::cout << __FILE__ << ":" << __LINE__
110 << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)"
111 << " " << inet_ntoa(sin.sin_addr) << ":" << sin.sin_port
112 << " fd:" << fd
113 << std::endl;
114 }
115 else
116 {
117 std::cout << __FILE__ << ":" << __LINE__ << "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl;
118 }
119 }
120
121 memcached_protocol_client_destroy(c);
122 closesocket(fd);
123 }
124 else
125 {
126 short flags = 0;
127 if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
128 {
129 flags= EV_WRITE;
130 }
131
132 if (events & MEMCACHED_PROTOCOL_READ_EVENT)
133 {
134 flags|= EV_READ;
135 }
136
137 event_set(&client->event, (intptr_t)fd, flags, drive_client, client);
138 event_base_set(event_base, &client->event);
139
140 if (event_add(&client->event, 0) == -1)
141 {
142 memcached_protocol_client_destroy(c);
143 closesocket(fd);
144 }
145 }
146 }
147
148 /**
149 * Callback for accepting new connections
150 * @param fd the socket for the server socket
151 * @param which identifying the event that occurred (not used)
152 * @param arg the connection structure for the server
153 */
154 static void accept_handler(memcached_socket_t fd, short, void *arg)
155 {
156 struct connection *server= (struct connection *)arg;
157 /* accept new client */
158 struct sockaddr_storage addr;
159 socklen_t addrlen= sizeof(addr);
160 memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);
161
162 if (sock == INVALID_SOCKET)
163 {
164 perror("Failed to accept client");
165 }
166
167 #ifndef WIN32
168 if (sock >= maxconns)
169 {
170 closesocket(sock);
171 return ;
172 }
173 #endif
174
175 struct memcached_protocol_client_st* c= memcached_protocol_create_client((memcached_protocol_st*)server->userdata, sock);
176 if (c == NULL)
177 {
178 closesocket(sock);
179 }
180 else
181 {
182 memcached_protocol_client_set_verbose(c, global_options.is_verbose);
183 struct connection *client = &socket_userdata_map[sock];
184 client->userdata= c;
185
186 event_set(&client->event, (intptr_t)sock, EV_READ, drive_client, client);
187 event_base_set(event_base, &client->event);
188 if (event_add(&client->event, 0) == -1)
189 {
190 std::cerr << "Failed to add event for " << sock << std::endl;
191 memcached_protocol_client_destroy(c);
192 closesocket(sock);
193 }
194 }
195 }
196
197 static bool server_socket(util::log_info_st& log_file, const std::string& service)
198 {
199 struct addrinfo *ai;
200 struct addrinfo hints;
201 memset(&hints, 0, sizeof(struct addrinfo));
202
203 hints.ai_flags= AI_PASSIVE;
204 hints.ai_family= AF_UNSPEC;
205 hints.ai_socktype= SOCK_STREAM;
206
207 int error= getaddrinfo("127.0.0.1", service.c_str(), &hints, &ai);
208 if (error != 0)
209 {
210 if (error != EAI_SYSTEM)
211 {
212 std::string buffer("getaddrinfo: ");
213 buffer+= gai_strerror(error);
214 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
215 }
216 else
217 {
218 std::string buffer("getaddrinfo: ");
219 buffer+= strerror(errno);
220 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
221 }
222
223 return false;
224 }
225
226 struct linger ling= {0, 0};
227
228 for (struct addrinfo *next= ai; next; next= next->ai_next)
229 {
230 memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
231 if (sock == INVALID_SOCKET)
232 {
233 std::string buffer("Failed to create socket: ");
234 buffer+= strerror(errno);
235 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
236 continue;
237 }
238
239 int flags;
240 #ifdef WIN32
241 u_long arg = 1;
242 if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
243 {
244 std::cerr << "Failed to set nonblocking io: " << strerror(errno) << std::endl;
245 closesocket(sock);
246 continue;
247 }
248 #else
249 flags= fcntl(sock, F_GETFL, 0);
250 if (flags == -1)
251 {
252 std::string buffer("Failed to get socket flags: ");
253 buffer+= strerror(errno);
254 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
255 closesocket(sock);
256 continue;
257 }
258
259 if ((flags & O_NONBLOCK) != O_NONBLOCK)
260 {
261 if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
262 {
263 std::string buffer("Failed to set socket to nonblocking mode: ");
264 buffer+= strerror(errno);
265 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
266 closesocket(sock);
267 continue;
268 }
269 }
270 #endif
271
272 flags= 1;
273 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
274 {
275 std::cerr << "Failed to set SO_REUSEADDR: " << strerror(errno) << std::endl;
276 }
277
278 if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
279 {
280 std::cerr << "Failed to set SO_KEEPALIVE: " << strerror(errno) << std::endl;
281 }
282
283 if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
284 {
285 std::cerr << "Failed to set SO_LINGER: " << strerror(errno) << std::endl;
286 }
287
288 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
289 {
290 std::cerr << "Failed to set TCP_NODELAY: " << strerror(errno) << std::endl;
291 }
292
293 if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
294 {
295 if (get_socket_errno() != EADDRINUSE)
296 {
297 std::cerr << "bind(): " << strerror(errno) << std::endl;
298 freeaddrinfo(ai);
299 }
300 closesocket(sock);
301 continue;
302 }
303
304 if (listen(sock, 1024) == SOCKET_ERROR)
305 {
306 std::string buffer("listen(): ");
307 buffer+= strerror(errno);
308 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
309 closesocket(sock);
310 continue;
311 }
312
313 if (global_options.is_verbose)
314 {
315 std::string buffer("Listening to: ");
316 buffer+= global_options.service;
317 log_file.write(util::VERBOSE_NOTICE, buffer.c_str());
318 }
319
320 server_sockets[num_server_sockets++]= sock;
321 }
322
323 freeaddrinfo(ai);
324
325 return (num_server_sockets > 0) ? true : false;
326 }
327
328 /**
329 * Convert a command code to a textual string
330 * @param cmd the comcode to convert
331 * @return a textual string with the command or NULL for unknown commands
332 */
333 static const char* comcode2str(uint8_t cmd)
334 {
335 static const char * const text[] = {
336 "GET", "SET", "ADD", "REPLACE", "DELETE",
337 "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
338 "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
339 "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
340 "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
341 "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
342 };
343
344 if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
345 {
346 return text[cmd];
347 }
348
349 return NULL;
350 }
351
352 /**
353 * Print out the command we are about to execute
354 */
355 static void pre_execute(const void *cookie,
356 protocol_binary_request_header *header)
357 {
358 if (global_options.is_verbose)
359 {
360 if (header)
361 {
362 const char *cmd= comcode2str(header->request.opcode);
363 if (cmd != NULL)
364 {
365 std::cout << "pre_execute from " << cookie << ": " << cmd << std::endl;
366 }
367 else
368 {
369 std::cout << "pre_execute from " << cookie << ": " << header->request.opcode << std::endl;
370 }
371 }
372 else
373 {
374 std::cout << "pre_execute from " << cookie << std::endl;
375 }
376 }
377 }
378
379 /**
380 * Print out the command we just executed
381 */
382 static void post_execute(const void *cookie,
383 protocol_binary_request_header *header)
384 {
385 if (global_options.is_verbose)
386 {
387 if (header)
388 {
389 const char *cmd= comcode2str(header->request.opcode);
390 if (cmd != NULL)
391 {
392 std::cout << "post_execute from " << cookie << ": " << cmd << std::endl;
393 }
394 else
395 {
396 std::cout << "post_execute from " << cookie << ": " << header->request.opcode << std::endl;
397 }
398 }
399 else
400 {
401 std::cout << "post_execute from " << cookie << std::endl;
402 }
403 }
404 }
405
406 /**
407 * Callback handler for all unknown commands.
408 * Send an unknown command back to the client
409 */
410 static protocol_binary_response_status unknown(const void *cookie,
411 protocol_binary_request_header *header,
412 memcached_binary_protocol_raw_response_handler response_handler)
413 {
414 protocol_binary_response_no_extras response;
415 memset(&response, 0, sizeof(protocol_binary_response_no_extras));
416
417 response.message.header.response.magic= PROTOCOL_BINARY_RES;
418 response.message.header.response.opcode= header->request.opcode;
419 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND);
420 response.message.header.response.opaque= header->request.opaque;
421
422 return response_handler(cookie, header, (protocol_binary_response_header*)&response);
423 }
424
425 /**
426 * Program entry point. Bind to the specified port(s) and serve clients
427 *
428 * @param argc number of items in the argument vector
429 * @param argv argument vector
430 * @return EXIT_SUCCESS on success, 1 otherwise
431 */
432 int main(int argc, char **argv)
433 {
434 memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
435
436 /*
437 * We need to initialize the handlers manually due to a bug in the
438 * warnings generated by struct initialization in gcc (all the way up to 4.4)
439 */
440 initialize_interface_v0_handler();
441 initialize_interface_v1_handler();
442
443 {
444 enum long_option_t {
445 OPT_HELP,
446 OPT_VERBOSE,
447 OPT_DAEMON,
448 OPT_PROTOCOL_VERSION,
449 OPT_VERSION,
450 OPT_PORT,
451 OPT_MAX_CONNECTIONS,
452 OPT_LOGFILE,
453 OPT_PIDFILE
454 };
455
456 static struct option long_options[]=
457 {
458 { "help", no_argument, NULL, OPT_HELP },
459 { "port", required_argument, NULL, OPT_PORT },
460 { "verbose", no_argument, NULL, OPT_VERBOSE },
461 { "daemon", no_argument, NULL, OPT_DAEMON },
462 { "protocol", no_argument, NULL, OPT_PROTOCOL_VERSION },
463 { "version", no_argument, NULL, OPT_VERSION },
464 { "max-connections", required_argument, NULL, OPT_MAX_CONNECTIONS },
465 { "pid-file", required_argument, NULL, OPT_PIDFILE },
466 { "log-file", required_argument, NULL, OPT_LOGFILE },
467 {0, 0, 0, 0}
468 };
469
470 bool opt_help= false;
471 int option_index;
472 bool done= false;
473 while (done == false)
474 {
475 switch (getopt_long(argc, argv, "", long_options, &option_index))
476 {
477 case -1:
478 done= true;
479 break;
480
481 case OPT_PROTOCOL_VERSION:
482 interface= &interface_v1_impl;
483 break;
484
485 case OPT_PIDFILE:
486 global_options.pid_file= optarg;
487 break;
488
489 case OPT_LOGFILE:
490 global_options.log_file= optarg;
491 break;
492
493 case OPT_VERBOSE:
494 global_options.is_verbose= true;
495 break;
496
497 case OPT_VERSION:
498 break;
499
500 case OPT_DAEMON:
501 global_options.opt_daemon= true;
502 break;
503
504 case OPT_PORT:
505 global_options.service= optarg;
506 break;
507
508 case OPT_MAX_CONNECTIONS:
509 maxconns= atoi(optarg);
510 break;
511
512 case OPT_HELP: /* FALLTHROUGH */
513 opt_help= true;
514 break;
515
516 default:
517 {
518 std::cerr << "Unknown option: " << optarg << std::endl;
519 return EXIT_FAILURE;
520 }
521 }
522 }
523
524 if (opt_help)
525 {
526 std::cout << "Usage: " << argv[0] << std::endl;
527 for (struct option *ptr_option= long_options; ptr_option->name; ptr_option++)
528 {
529 std::cout << "\t" << ptr_option->name << std::endl;
530 }
531 return EXIT_SUCCESS;
532 }
533 }
534
535 if (global_options.opt_daemon)
536 {
537 util::daemonize(false, true);
538 }
539
540 if (initialize_storage() == false)
541 {
542 /* Error message already printed */
543 return EXIT_FAILURE;
544 }
545
546 util::Pidfile _pid_file(global_options.pid_file);
547
548 if (_pid_file.create() == false)
549 {
550 std::cerr << "Failed to create pid-file" << _pid_file.error_message() << std::endl;
551 return EXIT_FAILURE;
552 }
553
554 util::log_info_st log_file(argv[0], global_options.log_file, false);
555 log_file.write(util::VERBOSE_NOTICE, "starting log");
556
557 if (server_socket(log_file, global_options.service) == false)
558 {
559 return EXIT_FAILURE;
560 }
561
562 if (num_server_sockets == 0)
563 {
564 log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
565 return EXIT_FAILURE;
566 }
567
568 /*
569 * Create and initialize the handles to the protocol handlers. I want
570 * to be able to trace the traffic throught the pre/post handlers, and
571 * set up a common handler for unknown messages
572 */
573 interface->pre_execute= pre_execute;
574 interface->post_execute= post_execute;
575 interface->unknown= unknown;
576
577 struct memcached_protocol_st *protocol_handle;
578 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
579 {
580 log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
581 return EXIT_FAILURE;
582 }
583
584 socket_userdata_map= (struct connection*)calloc((size_t)(maxconns), sizeof(struct connection));
585 if (socket_userdata_map == NULL)
586 {
587 log_file.write(util::VERBOSE_ERROR, "Failed to allocate room for connections");
588 return EXIT_FAILURE;
589 }
590
591 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
592 memcached_binary_protocol_set_pedantic(protocol_handle, true);
593
594 event_base= event_init();
595 if (event_base == NULL)
596 {
597 std::cerr << "Failed to create an instance of libevent" << std::endl;
598 return EXIT_FAILURE;
599 }
600
601 for (int xx= 0; xx < num_server_sockets; ++xx)
602 {
603 struct connection *conn= &socket_userdata_map[server_sockets[xx]];
604 conn->userdata= protocol_handle;
605
606 event_set(&conn->event, (intptr_t)server_sockets[xx], EV_READ | EV_PERSIST, accept_handler, conn);
607
608 event_base_set(event_base, &conn->event);
609 if (event_add(&conn->event, 0) == -1)
610 {
611 log_file.write(util::VERBOSE_ERROR, "Failed to add event");
612 closesocket(server_sockets[xx]);
613 }
614 }
615
616 if (global_options.opt_daemon)
617 {
618 if (util::daemon_is_ready(true) == false)
619 {
620 log_file.write(util::VERBOSE_ERROR, "Failed for util::daemon_is_ready()");
621 return EXIT_FAILURE;
622 }
623 }
624
625
626 /* Serve all of the clients */
627 switch (event_base_loop(event_base, 0))
628 {
629 case -1:
630 log_file.write(util::VERBOSE_ERROR, "event_base_loop() failed");
631 break;
632
633 case 1:
634 log_file.write(util::VERBOSE_ERROR, "event_base_loop(), no events were registered");
635 break;
636
637 default:
638 break;
639 }
640 log_file.write(util::VERBOSE_NOTICE, "exiting");
641
642 /* NOTREACHED */
643 return EXIT_SUCCESS;
644 }