WIP
[m6w6/libmemcached] / example / memcached_light.cc
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example use both interfaces to implement a small memcached server.
5 * Please note that this is an exemple on how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.cc - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.cc - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.cc- This file sets up all of the sockets and run the main
17 * message loop.
18 *
19 *
20 * config.h is included so that I can use the ntohll/htonll on platforms that
21 * doesn't have that (this is a private function inside libmemcached, so you
22 * cannot use it directly from libmemcached without special modifications to
23 * the library)
24 */
25
26 #include <mem_config.h>
27
28 #include <libmemcachedprotocol-0.0/handler.h>
29 #include <libmemcached/socket.hpp>
30 #include <example/byteorder.h>
31 #include "example/storage.h"
32 #include "example/memcached_light.h"
33
34 #include "util/daemon.hpp"
35 #include "util/log.hpp"
36 #include "util/pidfile.hpp"
37
38 using namespace datadifferential;
39
40 #include <event.h>
41
42 #include <cassert>
43 #include <cerrno>
44 #include <cstdio>
45 #include <cstdlib>
46 #include <cstring>
47 #include <fcntl.h>
48 #include <getopt.h>
49 #include <iostream>
50 #include <sys/types.h>
51 #if HAVE_UNISTD_H
52 # include <unistd.h>
53 #endif
54 extern memcached_binary_protocol_callback_st interface_v0_impl;
55 extern memcached_binary_protocol_callback_st interface_v1_impl;
56
57 static memcached_socket_t server_sockets[1024];
58 static int num_server_sockets= 0;
59
60 struct connection
61 {
62 void *userdata;
63 struct event event;
64 };
65
66 /* The default maximum number of connections... (change with -c) */
67 static int maxconns= 1024;
68
69 static struct connection *socket_userdata_map;
70 static struct event_base *event_base= NULL;
71
72 struct options_st {
73 std::string pid_file;
74 std::string service;
75 std::string log_file;
76 bool is_verbose;
77 bool opt_daemon;
78
79 options_st() :
80 service("9999"),
81 is_verbose(false),
82 opt_daemon(false)
83 {
84 }
85 };
86
87 static options_st global_options;
88
89 /**
90 * Callback for driving a client connection
91 * @param fd the socket for the client socket
92 * @param which identifying the event that occurred (not used)
93 * @param arg the connection structure for the client
94 */
95 static void drive_client(memcached_socket_t fd, short, void *arg)
96 {
97 struct connection *client= (struct connection*)arg;
98 struct memcached_protocol_client_st* c= (struct memcached_protocol_client_st*)client->userdata;
99 assert(c != NULL);
100
101 memcached_protocol_event_t events= memcached_protocol_client_work(c);
102 if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
103 {
104 if (global_options.is_verbose)
105 {
106 struct sockaddr_in sin;
107 socklen_t addrlen= sizeof(sin);
108
109 if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) != -1)
110 {
111 std::cout << __FILE__ << ":" << __LINE__
112 << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)"
113 << " " << inet_ntoa(sin.sin_addr) << ":" << sin.sin_port
114 << " fd:" << fd
115 << std::endl;
116 }
117 else
118 {
119 std::cout << __FILE__ << ":" << __LINE__ << "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl;
120 }
121 }
122
123 memcached_protocol_client_destroy(c);
124 closesocket(fd);
125 }
126 else
127 {
128 short flags = 0;
129 if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
130 {
131 flags= EV_WRITE;
132 }
133
134 if (events & MEMCACHED_PROTOCOL_READ_EVENT)
135 {
136 flags|= EV_READ;
137 }
138
139 event_set(&client->event, int(fd), flags, drive_client, client);
140 event_base_set(event_base, &client->event);
141
142 if (event_add(&client->event, 0) == -1)
143 {
144 memcached_protocol_client_destroy(c);
145 closesocket(fd);
146 }
147 }
148 }
149
150 /**
151 * Callback for accepting new connections
152 * @param fd the socket for the server socket
153 * @param which identifying the event that occurred (not used)
154 * @param arg the connection structure for the server
155 */
156 static void accept_handler(memcached_socket_t fd, short, void *arg)
157 {
158 struct connection *server= (struct connection *)arg;
159 /* accept new client */
160 struct sockaddr_storage addr;
161 socklen_t addrlen= sizeof(addr);
162 memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);
163
164 if (sock == INVALID_SOCKET)
165 {
166 perror("Failed to accept client");
167 }
168
169 #ifndef WIN32
170 if (sock >= maxconns)
171 {
172 closesocket(sock);
173 return ;
174 }
175 #endif
176
177 struct memcached_protocol_client_st* c= memcached_protocol_create_client((memcached_protocol_st*)server->userdata, sock);
178 if (c == NULL)
179 {
180 closesocket(sock);
181 }
182 else
183 {
184 memcached_protocol_client_set_verbose(c, global_options.is_verbose);
185 struct connection *client = &socket_userdata_map[sock];
186 client->userdata= c;
187
188 event_set(&client->event, int(sock), EV_READ, drive_client, client);
189 event_base_set(event_base, &client->event);
190 if (event_add(&client->event, 0) == -1)
191 {
192 std::cerr << "Failed to add event for " << sock << std::endl;
193 memcached_protocol_client_destroy(c);
194 closesocket(sock);
195 }
196 }
197 }
198
199 static bool server_socket(util::log_info_st& log_file, const std::string& service)
200 {
201 struct addrinfo *ai;
202 struct addrinfo hints;
203 memset(&hints, 0, sizeof(struct addrinfo));
204
205 hints.ai_flags= AI_PASSIVE;
206 hints.ai_family= AF_UNSPEC;
207 hints.ai_socktype= SOCK_STREAM;
208
209 int error= getaddrinfo("127.0.0.1", service.c_str(), &hints, &ai);
210 if (error != 0)
211 {
212 if (error != EAI_SYSTEM)
213 {
214 std::string buffer("getaddrinfo: ");
215 buffer+= gai_strerror(error);
216 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
217 }
218 else
219 {
220 std::string buffer("getaddrinfo: ");
221 buffer+= strerror(errno);
222 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
223 }
224
225 return false;
226 }
227
228 struct linger ling= {0, 0};
229
230 for (struct addrinfo *next= ai; next; next= next->ai_next)
231 {
232 memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
233 if (sock == INVALID_SOCKET)
234 {
235 std::string buffer("Failed to create socket: ");
236 buffer+= strerror(errno);
237 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
238 continue;
239 }
240
241 int flags;
242 #ifdef WIN32
243 u_long arg = 1;
244 if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
245 {
246 std::cerr << "Failed to set nonblocking io: " << strerror(errno) << std::endl;
247 closesocket(sock);
248 continue;
249 }
250 #else
251 flags= fcntl(sock, F_GETFL, 0);
252 if (flags == -1)
253 {
254 std::string buffer("Failed to get socket flags: ");
255 buffer+= strerror(errno);
256 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
257 closesocket(sock);
258 continue;
259 }
260
261 if ((flags & O_NONBLOCK) != O_NONBLOCK)
262 {
263 if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
264 {
265 std::string buffer("Failed to set socket to nonblocking mode: ");
266 buffer+= strerror(errno);
267 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
268 closesocket(sock);
269 continue;
270 }
271 }
272 #endif
273
274 flags= 1;
275 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
276 {
277 std::cerr << "Failed to set SO_REUSEADDR: " << strerror(errno) << std::endl;
278 }
279
280 if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
281 {
282 std::cerr << "Failed to set SO_KEEPALIVE: " << strerror(errno) << std::endl;
283 }
284
285 if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
286 {
287 std::cerr << "Failed to set SO_LINGER: " << strerror(errno) << std::endl;
288 }
289
290 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
291 {
292 std::cerr << "Failed to set TCP_NODELAY: " << strerror(errno) << std::endl;
293 }
294
295 if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
296 {
297 if (get_socket_errno() != EADDRINUSE)
298 {
299 std::cerr << "bind(): " << strerror(errno) << std::endl;
300 freeaddrinfo(ai);
301 }
302 closesocket(sock);
303 continue;
304 }
305
306 if (listen(sock, 1024) == SOCKET_ERROR)
307 {
308 std::string buffer("listen(): ");
309 buffer+= strerror(errno);
310 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
311 closesocket(sock);
312 continue;
313 }
314
315 if (global_options.is_verbose)
316 {
317 std::string buffer("Listening to: ");
318 buffer+= global_options.service;
319 log_file.write(util::VERBOSE_NOTICE, buffer.c_str());
320 }
321
322 server_sockets[num_server_sockets++]= sock;
323 }
324
325 freeaddrinfo(ai);
326
327 return (num_server_sockets > 0) ? true : false;
328 }
329
330 /**
331 * Convert a command code to a textual string
332 * @param cmd the comcode to convert
333 * @return a textual string with the command or NULL for unknown commands
334 */
335 static const char* comcode2str(uint8_t cmd)
336 {
337 static const char * const text[] = {
338 "GET", "SET", "ADD", "REPLACE", "DELETE",
339 "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
340 "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
341 "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
342 "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
343 "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
344 };
345
346 if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
347 {
348 return text[cmd];
349 }
350
351 return NULL;
352 }
353
354 /**
355 * Print out the command we are about to execute
356 */
357 static void pre_execute(const void *cookie,
358 protocol_binary_request_header *header)
359 {
360 if (global_options.is_verbose)
361 {
362 if (header)
363 {
364 const char *cmd= comcode2str(header->request.opcode);
365 if (cmd != NULL)
366 {
367 std::cout << "pre_execute from " << cookie << ": " << cmd << std::endl;
368 }
369 else
370 {
371 std::cout << "pre_execute from " << cookie << ": " << header->request.opcode << std::endl;
372 }
373 }
374 else
375 {
376 std::cout << "pre_execute from " << cookie << std::endl;
377 }
378 }
379 }
380
381 /**
382 * Print out the command we just executed
383 */
384 static void post_execute(const void *cookie,
385 protocol_binary_request_header *header)
386 {
387 if (global_options.is_verbose)
388 {
389 if (header)
390 {
391 const char *cmd= comcode2str(header->request.opcode);
392 if (cmd != NULL)
393 {
394 std::cout << "post_execute from " << cookie << ": " << cmd << std::endl;
395 }
396 else
397 {
398 std::cout << "post_execute from " << cookie << ": " << header->request.opcode << std::endl;
399 }
400 }
401 else
402 {
403 std::cout << "post_execute from " << cookie << std::endl;
404 }
405 }
406 }
407
408 /**
409 * Callback handler for all unknown commands.
410 * Send an unknown command back to the client
411 */
412 static protocol_binary_response_status unknown(const void *cookie,
413 protocol_binary_request_header *header,
414 memcached_binary_protocol_raw_response_handler response_handler)
415 {
416 protocol_binary_response_no_extras response;
417 memset(&response, 0, sizeof(protocol_binary_response_no_extras));
418
419 response.message.header.response.magic= PROTOCOL_BINARY_RES;
420 response.message.header.response.opcode= header->request.opcode;
421 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND);
422 response.message.header.response.opaque= header->request.opaque;
423
424 return response_handler(cookie, header, (protocol_binary_response_header*)&response);
425 }
426
427 /**
428 * Program entry point. Bind to the specified port(s) and serve clients
429 *
430 * @param argc number of items in the argument vector
431 * @param argv argument vector
432 * @return EXIT_SUCCESS on success, 1 otherwise
433 */
434 int main(int argc, char **argv)
435 {
436 memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
437
438 {
439 enum long_option_t {
440 OPT_HELP,
441 OPT_VERBOSE,
442 OPT_DAEMON,
443 OPT_PROTOCOL_VERSION,
444 OPT_VERSION,
445 OPT_PORT,
446 OPT_MAX_CONNECTIONS,
447 OPT_LOGFILE,
448 OPT_PIDFILE
449 };
450
451 static struct option long_options[]=
452 {
453 { "help", no_argument, NULL, OPT_HELP },
454 { "port", required_argument, NULL, OPT_PORT },
455 { "verbose", no_argument, NULL, OPT_VERBOSE },
456 { "daemon", no_argument, NULL, OPT_DAEMON },
457 { "protocol", no_argument, NULL, OPT_PROTOCOL_VERSION },
458 { "version", no_argument, NULL, OPT_VERSION },
459 { "max-connections", required_argument, NULL, OPT_MAX_CONNECTIONS },
460 { "pid-file", required_argument, NULL, OPT_PIDFILE },
461 { "log-file", required_argument, NULL, OPT_LOGFILE },
462 {0, 0, 0, 0}
463 };
464
465 bool opt_help= false;
466 int option_index;
467 bool done= false;
468 while (done == false)
469 {
470 switch (getopt_long(argc, argv, "", long_options, &option_index))
471 {
472 case -1:
473 done= true;
474 break;
475
476 case OPT_PROTOCOL_VERSION:
477 interface= &interface_v1_impl;
478 break;
479
480 case OPT_PIDFILE:
481 global_options.pid_file= optarg;
482 break;
483
484 case OPT_LOGFILE:
485 global_options.log_file= optarg;
486 break;
487
488 case OPT_VERBOSE:
489 global_options.is_verbose= true;
490 break;
491
492 case OPT_VERSION:
493 break;
494
495 case OPT_DAEMON:
496 global_options.opt_daemon= true;
497 break;
498
499 case OPT_PORT:
500 global_options.service= optarg;
501 break;
502
503 case OPT_MAX_CONNECTIONS:
504 maxconns= atoi(optarg);
505 break;
506
507 case OPT_HELP: /* FALLTHROUGH */
508 opt_help= true;
509 break;
510
511 default:
512 {
513 std::cerr << "Unknown option: " << optarg << std::endl;
514 return EXIT_FAILURE;
515 }
516 }
517 }
518
519 if (opt_help)
520 {
521 std::cout << "Usage: " << argv[0] << std::endl;
522 for (struct option *ptr_option= long_options; ptr_option->name; ptr_option++)
523 {
524 std::cout << "\t" << ptr_option->name << std::endl;
525 }
526 return EXIT_SUCCESS;
527 }
528 }
529
530 if (global_options.opt_daemon)
531 {
532 util::daemonize(false, true);
533 }
534
535 if (initialize_storage() == false)
536 {
537 /* Error message already printed */
538 return EXIT_FAILURE;
539 }
540
541 util::Pidfile _pid_file(global_options.pid_file);
542
543 if (_pid_file.create() == false)
544 {
545 std::cerr << "Failed to create pid-file" << _pid_file.error_message() << std::endl;
546 return EXIT_FAILURE;
547 }
548
549 util::log_info_st log_file(argv[0], global_options.log_file, false);
550 log_file.write(util::VERBOSE_NOTICE, "starting log");
551
552 /*
553 * We need to initialize the handlers manually due to a bug in the
554 * warnings generated by struct initialization in gcc (all the way up to 4.4)
555 */
556 initialize_interface_v0_handler(log_file);
557 initialize_interface_v1_handler(log_file);
558
559
560 if (server_socket(log_file, global_options.service) == false)
561 {
562 return EXIT_FAILURE;
563 }
564
565 if (num_server_sockets == 0)
566 {
567 log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
568 return EXIT_FAILURE;
569 }
570
571 /*
572 * Create and initialize the handles to the protocol handlers. I want
573 * to be able to trace the traffic throught the pre/post handlers, and
574 * set up a common handler for unknown messages
575 */
576 interface->pre_execute= pre_execute;
577 interface->post_execute= post_execute;
578 interface->unknown= unknown;
579
580 struct memcached_protocol_st *protocol_handle;
581 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
582 {
583 log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
584 return EXIT_FAILURE;
585 }
586
587 socket_userdata_map= (struct connection*)calloc((size_t)(maxconns), sizeof(struct connection));
588 if (socket_userdata_map == NULL)
589 {
590 log_file.write(util::VERBOSE_ERROR, "Failed to allocate room for connections");
591 return EXIT_FAILURE;
592 }
593
594 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
595 memcached_binary_protocol_set_pedantic(protocol_handle, true);
596
597 event_base= event_init();
598 if (event_base == NULL)
599 {
600 std::cerr << "Failed to create an instance of libevent" << std::endl;
601 return EXIT_FAILURE;
602 }
603
604 for (int xx= 0; xx < num_server_sockets; ++xx)
605 {
606 struct connection *conn= &socket_userdata_map[server_sockets[xx]];
607 conn->userdata= protocol_handle;
608
609 event_set(&conn->event, int(server_sockets[xx]), EV_READ | EV_PERSIST, accept_handler, conn);
610
611 event_base_set(event_base, &conn->event);
612 if (event_add(&conn->event, 0) == -1)
613 {
614 log_file.write(util::VERBOSE_ERROR, "Failed to add event");
615 closesocket(server_sockets[xx]);
616 }
617 }
618
619 if (global_options.opt_daemon)
620 {
621 if (util::daemon_is_ready(true) == false)
622 {
623 log_file.write(util::VERBOSE_ERROR, "Failed for util::daemon_is_ready()");
624 return EXIT_FAILURE;
625 }
626 }
627
628
629 /* Serve all of the clients */
630 switch (event_base_loop(event_base, 0))
631 {
632 case -1:
633 log_file.write(util::VERBOSE_ERROR, "event_base_loop() failed");
634 break;
635
636 case 1:
637 log_file.write(util::VERBOSE_ERROR, "event_base_loop(), no events were registered");
638 break;
639
640 default:
641 break;
642 }
643 log_file.write(util::VERBOSE_NOTICE, "exiting");
644
645 /* NOTREACHED */
646 return EXIT_SUCCESS;
647 }