c5385d75950e60714e7e12ca2c02e3aa474ea495
[m6w6/libmemcached] / example / memcached_light.cc
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example uses both interfaces to implement a small memcached server.
5 * Please note that this is an example of how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.cc - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.cc - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.cc- This file sets up all of the sockets and runs the main
17 * message loop.
18 *
19 *
20 * config.h is included so that I can use the ntohll/htonll on platforms that
21 * don't have them (this is a private function inside libmemcached, so you
22 * cannot use it directly from libmemcached without special modifications to
23 * the library)
24 */
25
26 #include <mem_config.h>
27
28 #include <libmemcachedprotocol-0.0/handler.h>
29 #include <libmemcached/socket.hpp>
30 #include <example/byteorder.h>
31 #include "example/storage.h"
32 #include "example/memcached_light.h"
33
34 #include "util/daemon.hpp"
35 #include "util/log.hpp"
36 #include "util/pidfile.hpp"
37
38 using namespace datadifferential;
39
40 #include <event.h>
41
42 #include <cassert>
43 #include <cerrno>
44 #include <cstdio>
45 #include <cstdlib>
46 #include <cstring>
47 #include <fcntl.h>
48 #include <getopt.h>
49 #include <iostream>
50 #include <sys/types.h>
51 #include <unistd.h>
52
53 extern memcached_binary_protocol_callback_st interface_v0_impl;
54 extern memcached_binary_protocol_callback_st interface_v1_impl;
55
56 static memcached_socket_t server_sockets[1024];
57 static int num_server_sockets= 0;
58
59 struct connection
60 {
61 void *userdata;
62 struct event event;
63 };
64
65 /* The default maximum number of connections... (change with -c) */
66 static int maxconns= 1024;
67
68 static struct connection *socket_userdata_map;
69 static struct event_base *event_base= NULL;
70
/* Settings collected from the command line */
struct options_st
{
  std::string pid_file;   /* path given with --pid-file (empty by default) */
  std::string service;    /* port/service to listen on, given with --port */
  std::string log_file;   /* path given with --log-file (empty by default) */
  bool is_verbose;        /* set by --verbose */
  bool opt_daemon;        /* set by --daemon */

  /* Start from the defaults: service "9999", quiet, running in the foreground */
  options_st()
    : pid_file(),
      service("9999"),
      log_file(),
      is_verbose(false),
      opt_daemon(false)
  {
  }
};

/* The one set of options shared by the whole program */
static options_st global_options;
87
88 /**
89 * Callback for driving a client connection
90 * @param fd the socket for the client socket
91 * @param which identifying the event that occurred (not used)
92 * @param arg the connection structure for the client
93 */
94 static void drive_client(memcached_socket_t fd, short, void *arg)
95 {
96 struct connection *client= (struct connection*)arg;
97 struct memcached_protocol_client_st* c= (struct memcached_protocol_client_st*)client->userdata;
98 assert(c != NULL);
99
100 memcached_protocol_event_t events= memcached_protocol_client_work(c);
101 if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
102 {
103 if (global_options.is_verbose)
104 {
105 struct sockaddr_in sin;
106 socklen_t addrlen= sizeof(sin);
107
108 if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) != -1)
109 {
110 std::cout << __FILE__ << ":" << __LINE__
111 << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)"
112 << " " << inet_ntoa(sin.sin_addr) << ":" << sin.sin_port
113 << " fd:" << fd
114 << std::endl;
115 }
116 else
117 {
118 std::cout << __FILE__ << ":" << __LINE__ << "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl;
119 }
120 }
121
122 memcached_protocol_client_destroy(c);
123 closesocket(fd);
124 }
125 else
126 {
127 short flags = 0;
128 if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
129 {
130 flags= EV_WRITE;
131 }
132
133 if (events & MEMCACHED_PROTOCOL_READ_EVENT)
134 {
135 flags|= EV_READ;
136 }
137
138 event_set(&client->event, int(fd), flags, drive_client, client);
139 event_base_set(event_base, &client->event);
140
141 if (event_add(&client->event, 0) == -1)
142 {
143 memcached_protocol_client_destroy(c);
144 closesocket(fd);
145 }
146 }
147 }
148
149 /**
150 * Callback for accepting new connections
151 * @param fd the socket for the server socket
152 * @param which identifying the event that occurred (not used)
153 * @param arg the connection structure for the server
154 */
155 static void accept_handler(memcached_socket_t fd, short, void *arg)
156 {
157 struct connection *server= (struct connection *)arg;
158 /* accept new client */
159 struct sockaddr_storage addr;
160 socklen_t addrlen= sizeof(addr);
161 memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);
162
163 if (sock == INVALID_SOCKET)
164 {
165 perror("Failed to accept client");
166 }
167
168 #ifndef WIN32
169 if (sock >= maxconns)
170 {
171 closesocket(sock);
172 return ;
173 }
174 #endif
175
176 struct memcached_protocol_client_st* c= memcached_protocol_create_client((memcached_protocol_st*)server->userdata, sock);
177 if (c == NULL)
178 {
179 closesocket(sock);
180 }
181 else
182 {
183 memcached_protocol_client_set_verbose(c, global_options.is_verbose);
184 struct connection *client = &socket_userdata_map[sock];
185 client->userdata= c;
186
187 event_set(&client->event, int(sock), EV_READ, drive_client, client);
188 event_base_set(event_base, &client->event);
189 if (event_add(&client->event, 0) == -1)
190 {
191 std::cerr << "Failed to add event for " << sock << std::endl;
192 memcached_protocol_client_destroy(c);
193 closesocket(sock);
194 }
195 }
196 }
197
198 static bool server_socket(util::log_info_st& log_file, const std::string& service)
199 {
200 struct addrinfo *ai;
201 struct addrinfo hints;
202 memset(&hints, 0, sizeof(struct addrinfo));
203
204 hints.ai_flags= AI_PASSIVE;
205 hints.ai_family= AF_UNSPEC;
206 hints.ai_socktype= SOCK_STREAM;
207
208 int error= getaddrinfo("127.0.0.1", service.c_str(), &hints, &ai);
209 if (error != 0)
210 {
211 if (error != EAI_SYSTEM)
212 {
213 std::string buffer("getaddrinfo: ");
214 buffer+= gai_strerror(error);
215 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
216 }
217 else
218 {
219 std::string buffer("getaddrinfo: ");
220 buffer+= strerror(errno);
221 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
222 }
223
224 return false;
225 }
226
227 struct linger ling= {0, 0};
228
229 for (struct addrinfo *next= ai; next; next= next->ai_next)
230 {
231 memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
232 if (sock == INVALID_SOCKET)
233 {
234 std::string buffer("Failed to create socket: ");
235 buffer+= strerror(errno);
236 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
237 continue;
238 }
239
240 int flags;
241 #ifdef WIN32
242 u_long arg = 1;
243 if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
244 {
245 std::cerr << "Failed to set nonblocking io: " << strerror(errno) << std::endl;
246 closesocket(sock);
247 continue;
248 }
249 #else
250 flags= fcntl(sock, F_GETFL, 0);
251 if (flags == -1)
252 {
253 std::string buffer("Failed to get socket flags: ");
254 buffer+= strerror(errno);
255 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
256 closesocket(sock);
257 continue;
258 }
259
260 if ((flags & O_NONBLOCK) != O_NONBLOCK)
261 {
262 if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
263 {
264 std::string buffer("Failed to set socket to nonblocking mode: ");
265 buffer+= strerror(errno);
266 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
267 closesocket(sock);
268 continue;
269 }
270 }
271 #endif
272
273 flags= 1;
274 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
275 {
276 std::cerr << "Failed to set SO_REUSEADDR: " << strerror(errno) << std::endl;
277 }
278
279 if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
280 {
281 std::cerr << "Failed to set SO_KEEPALIVE: " << strerror(errno) << std::endl;
282 }
283
284 if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
285 {
286 std::cerr << "Failed to set SO_LINGER: " << strerror(errno) << std::endl;
287 }
288
289 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
290 {
291 std::cerr << "Failed to set TCP_NODELAY: " << strerror(errno) << std::endl;
292 }
293
294 if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
295 {
296 if (get_socket_errno() != EADDRINUSE)
297 {
298 std::cerr << "bind(): " << strerror(errno) << std::endl;
299 freeaddrinfo(ai);
300 }
301 closesocket(sock);
302 continue;
303 }
304
305 if (listen(sock, 1024) == SOCKET_ERROR)
306 {
307 std::string buffer("listen(): ");
308 buffer+= strerror(errno);
309 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
310 closesocket(sock);
311 continue;
312 }
313
314 if (global_options.is_verbose)
315 {
316 std::string buffer("Listening to: ");
317 buffer+= global_options.service;
318 log_file.write(util::VERBOSE_NOTICE, buffer.c_str());
319 }
320
321 server_sockets[num_server_sockets++]= sock;
322 }
323
324 freeaddrinfo(ai);
325
326 return (num_server_sockets > 0) ? true : false;
327 }
328
329 /**
330 * Convert a command code to a textual string
331 * @param cmd the comcode to convert
332 * @return a textual string with the command or NULL for unknown commands
333 */
334 static const char* comcode2str(uint8_t cmd)
335 {
336 static const char * const text[] = {
337 "GET", "SET", "ADD", "REPLACE", "DELETE",
338 "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
339 "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
340 "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
341 "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
342 "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
343 };
344
345 if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
346 {
347 return text[cmd];
348 }
349
350 return NULL;
351 }
352
353 /**
354 * Print out the command we are about to execute
355 */
356 static void pre_execute(const void *cookie,
357 protocol_binary_request_header *header)
358 {
359 if (global_options.is_verbose)
360 {
361 if (header)
362 {
363 const char *cmd= comcode2str(header->request.opcode);
364 if (cmd != NULL)
365 {
366 std::cout << "pre_execute from " << cookie << ": " << cmd << std::endl;
367 }
368 else
369 {
370 std::cout << "pre_execute from " << cookie << ": " << header->request.opcode << std::endl;
371 }
372 }
373 else
374 {
375 std::cout << "pre_execute from " << cookie << std::endl;
376 }
377 }
378 }
379
380 /**
381 * Print out the command we just executed
382 */
383 static void post_execute(const void *cookie,
384 protocol_binary_request_header *header)
385 {
386 if (global_options.is_verbose)
387 {
388 if (header)
389 {
390 const char *cmd= comcode2str(header->request.opcode);
391 if (cmd != NULL)
392 {
393 std::cout << "post_execute from " << cookie << ": " << cmd << std::endl;
394 }
395 else
396 {
397 std::cout << "post_execute from " << cookie << ": " << header->request.opcode << std::endl;
398 }
399 }
400 else
401 {
402 std::cout << "post_execute from " << cookie << std::endl;
403 }
404 }
405 }
406
407 /**
408 * Callback handler for all unknown commands.
409 * Send an unknown command back to the client
410 */
411 static protocol_binary_response_status unknown(const void *cookie,
412 protocol_binary_request_header *header,
413 memcached_binary_protocol_raw_response_handler response_handler)
414 {
415 protocol_binary_response_no_extras response;
416 memset(&response, 0, sizeof(protocol_binary_response_no_extras));
417
418 response.message.header.response.magic= PROTOCOL_BINARY_RES;
419 response.message.header.response.opcode= header->request.opcode;
420 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND);
421 response.message.header.response.opaque= header->request.opaque;
422
423 return response_handler(cookie, header, (protocol_binary_response_header*)&response);
424 }
425
426 /**
427 * Program entry point. Bind to the specified port(s) and serve clients
428 *
429 * @param argc number of items in the argument vector
430 * @param argv argument vector
431 * @return EXIT_SUCCESS on success, 1 otherwise
432 */
433 int main(int argc, char **argv)
434 {
435 memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
436
437 {
438 enum long_option_t {
439 OPT_HELP,
440 OPT_VERBOSE,
441 OPT_DAEMON,
442 OPT_PROTOCOL_VERSION,
443 OPT_VERSION,
444 OPT_PORT,
445 OPT_MAX_CONNECTIONS,
446 OPT_LOGFILE,
447 OPT_PIDFILE
448 };
449
450 static struct option long_options[]=
451 {
452 { "help", no_argument, NULL, OPT_HELP },
453 { "port", required_argument, NULL, OPT_PORT },
454 { "verbose", no_argument, NULL, OPT_VERBOSE },
455 { "daemon", no_argument, NULL, OPT_DAEMON },
456 { "protocol", no_argument, NULL, OPT_PROTOCOL_VERSION },
457 { "version", no_argument, NULL, OPT_VERSION },
458 { "max-connections", required_argument, NULL, OPT_MAX_CONNECTIONS },
459 { "pid-file", required_argument, NULL, OPT_PIDFILE },
460 { "log-file", required_argument, NULL, OPT_LOGFILE },
461 {0, 0, 0, 0}
462 };
463
464 bool opt_help= false;
465 int option_index;
466 bool done= false;
467 while (done == false)
468 {
469 switch (getopt_long(argc, argv, "", long_options, &option_index))
470 {
471 case -1:
472 done= true;
473 break;
474
475 case OPT_PROTOCOL_VERSION:
476 interface= &interface_v1_impl;
477 break;
478
479 case OPT_PIDFILE:
480 global_options.pid_file= optarg;
481 break;
482
483 case OPT_LOGFILE:
484 global_options.log_file= optarg;
485 break;
486
487 case OPT_VERBOSE:
488 global_options.is_verbose= true;
489 break;
490
491 case OPT_VERSION:
492 break;
493
494 case OPT_DAEMON:
495 global_options.opt_daemon= true;
496 break;
497
498 case OPT_PORT:
499 global_options.service= optarg;
500 break;
501
502 case OPT_MAX_CONNECTIONS:
503 maxconns= atoi(optarg);
504 break;
505
506 case OPT_HELP: /* FALLTHROUGH */
507 opt_help= true;
508 break;
509
510 default:
511 {
512 std::cerr << "Unknown option: " << optarg << std::endl;
513 return EXIT_FAILURE;
514 }
515 }
516 }
517
518 if (opt_help)
519 {
520 std::cout << "Usage: " << argv[0] << std::endl;
521 for (struct option *ptr_option= long_options; ptr_option->name; ptr_option++)
522 {
523 std::cout << "\t" << ptr_option->name << std::endl;
524 }
525 return EXIT_SUCCESS;
526 }
527 }
528
529 if (global_options.opt_daemon)
530 {
531 util::daemonize(false, true);
532 }
533
534 if (initialize_storage() == false)
535 {
536 /* Error message already printed */
537 return EXIT_FAILURE;
538 }
539
540 util::Pidfile _pid_file(global_options.pid_file);
541
542 if (_pid_file.create() == false)
543 {
544 std::cerr << "Failed to create pid-file" << _pid_file.error_message() << std::endl;
545 return EXIT_FAILURE;
546 }
547
548 util::log_info_st log_file(argv[0], global_options.log_file, false);
549 log_file.write(util::VERBOSE_NOTICE, "starting log");
550
551 /*
552 * We need to initialize the handlers manually due to a bug in the
553 * warnings generated by struct initialization in gcc (all the way up to 4.4)
554 */
555 initialize_interface_v0_handler(log_file);
556 initialize_interface_v1_handler(log_file);
557
558
559 if (server_socket(log_file, global_options.service) == false)
560 {
561 return EXIT_FAILURE;
562 }
563
564 if (num_server_sockets == 0)
565 {
566 log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
567 return EXIT_FAILURE;
568 }
569
570 /*
571 * Create and initialize the handles to the protocol handlers. I want
572 * to be able to trace the traffic throught the pre/post handlers, and
573 * set up a common handler for unknown messages
574 */
575 interface->pre_execute= pre_execute;
576 interface->post_execute= post_execute;
577 interface->unknown= unknown;
578
579 struct memcached_protocol_st *protocol_handle;
580 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
581 {
582 log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
583 return EXIT_FAILURE;
584 }
585
586 socket_userdata_map= (struct connection*)calloc((size_t)(maxconns), sizeof(struct connection));
587 if (socket_userdata_map == NULL)
588 {
589 log_file.write(util::VERBOSE_ERROR, "Failed to allocate room for connections");
590 return EXIT_FAILURE;
591 }
592
593 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
594 memcached_binary_protocol_set_pedantic(protocol_handle, true);
595
596 event_base= event_init();
597 if (event_base == NULL)
598 {
599 std::cerr << "Failed to create an instance of libevent" << std::endl;
600 return EXIT_FAILURE;
601 }
602
603 for (int xx= 0; xx < num_server_sockets; ++xx)
604 {
605 struct connection *conn= &socket_userdata_map[server_sockets[xx]];
606 conn->userdata= protocol_handle;
607
608 event_set(&conn->event, int(server_sockets[xx]), EV_READ | EV_PERSIST, accept_handler, conn);
609
610 event_base_set(event_base, &conn->event);
611 if (event_add(&conn->event, 0) == -1)
612 {
613 log_file.write(util::VERBOSE_ERROR, "Failed to add event");
614 closesocket(server_sockets[xx]);
615 }
616 }
617
618 if (global_options.opt_daemon)
619 {
620 if (util::daemon_is_ready(true) == false)
621 {
622 log_file.write(util::VERBOSE_ERROR, "Failed for util::daemon_is_ready()");
623 return EXIT_FAILURE;
624 }
625 }
626
627
628 /* Serve all of the clients */
629 switch (event_base_loop(event_base, 0))
630 {
631 case -1:
632 log_file.write(util::VERBOSE_ERROR, "event_base_loop() failed");
633 break;
634
635 case 1:
636 log_file.write(util::VERBOSE_ERROR, "event_base_loop(), no events were registered");
637 break;
638
639 default:
640 break;
641 }
642 log_file.write(util::VERBOSE_NOTICE, "exiting");
643
644 /* NOTREACHED */
645 return EXIT_SUCCESS;
646 }