Update header check.
[m6w6/libmemcached] / example / memcached_light.cc
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example uses both interfaces to implement a small memcached server.
5 * Please note that this is an example of how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.cc - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.cc - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.cc- This file sets up all of the sockets and runs the main
17 * message loop.
18 *
19 *
20 * config.h is included so that I can use the ntohll/htonll on platforms that
21 * don't have them (this is a private function inside libmemcached, so you
22 * cannot use it directly from libmemcached without special modifications to
23 * the library)
24 */
25
26 #include <config.h>
27
28 #include <libmemcachedprotocol-0.0/handler.h>
29 #include <libmemcached/socket.hpp>
30 #include <example/byteorder.h>
31 #include "example/storage.h"
32 #include "example/memcached_light.h"
33
34 #include "util/daemon.hpp"
35 #include "util/log.hpp"
36 #include "util/pidfile.hpp"
37
38 using namespace datadifferential;
39
40 #include <event.h>
41
42 #include <cassert>
43 #include <cerrno>
44 #include <cstdio>
45 #include <cstdlib>
46 #include <cstring>
47 #include <fcntl.h>
48 #include <getopt.h>
49 #include <iostream>
50 #include <sys/types.h>
51 #include <unistd.h>
52
53 extern memcached_binary_protocol_callback_st interface_v0_impl;
54 extern memcached_binary_protocol_callback_st interface_v1_impl;
55
56 static memcached_socket_t server_sockets[1024];
57 static int num_server_sockets= 0;
58
59 struct connection
60 {
61 void *userdata;
62 struct event event;
63 };
64
/* Number of entries in socket_userdata_map; 1024 unless overridden with
 * --max-connections. */
static int maxconns = 1024;

/* Per-socket state table, allocated in main() with maxconns entries. */
static struct connection *socket_userdata_map;

/* The libevent base that every socket in this file is registered with. */
static struct event_base *event_base = NULL;
70
/**
 * Settings collected from the command line in main().
 */
struct options_st {
  std::string pid_file;   /* --pid-file argument, empty when not given */
  std::string service;    /* --port argument, "9999" when not given */
  std::string log_file;   /* --log-file argument, empty when not given */
  bool is_verbose;        /* set by --verbose */
  bool opt_daemon;        /* set by --daemon */

  /* Every member gets a defined value here; opt_daemon used to be left
   * uninitialized, which only worked because the single instance below has
   * static storage duration. */
  options_st() :
    pid_file(),
    service("9999"),
    log_file(),
    is_verbose(false),
    opt_daemon(false)
  {
  }
};

static options_st global_options;
86
87 /**
88 * Callback for driving a client connection
89 * @param fd the socket for the client socket
90 * @param which identifying the event that occurred (not used)
91 * @param arg the connection structure for the client
92 */
93 static void drive_client(memcached_socket_t fd, short, void *arg)
94 {
95 struct connection *client= (struct connection*)arg;
96 struct memcached_protocol_client_st* c= (struct memcached_protocol_client_st*)client->userdata;
97 assert(c != NULL);
98
99 memcached_protocol_event_t events= memcached_protocol_client_work(c);
100 if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
101 {
102 if (global_options.is_verbose)
103 {
104 struct sockaddr_in sin;
105 socklen_t addrlen= sizeof(sin);
106
107 if (getsockname(fd, (struct sockaddr *)&sin, &addrlen) != -1)
108 {
109 std::cout << __FILE__ << ":" << __LINE__
110 << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)"
111 << " " << inet_ntoa(sin.sin_addr) << ":" << sin.sin_port
112 << " fd:" << fd
113 << std::endl;
114 }
115 else
116 {
117 std::cout << __FILE__ << ":" << __LINE__ << "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl;
118 }
119 }
120
121 memcached_protocol_client_destroy(c);
122 closesocket(fd);
123 }
124 else
125 {
126 short flags = 0;
127 if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
128 {
129 flags= EV_WRITE;
130 }
131
132 if (events & MEMCACHED_PROTOCOL_READ_EVENT)
133 {
134 flags|= EV_READ;
135 }
136
137 event_set(&client->event, (intptr_t)fd, flags, drive_client, client);
138 event_base_set(event_base, &client->event);
139
140 if (event_add(&client->event, 0) == -1)
141 {
142 memcached_protocol_client_destroy(c);
143 closesocket(fd);
144 }
145 }
146 }
147
148 /**
149 * Callback for accepting new connections
150 * @param fd the socket for the server socket
151 * @param which identifying the event that occurred (not used)
152 * @param arg the connection structure for the server
153 */
154 static void accept_handler(memcached_socket_t fd, short, void *arg)
155 {
156 struct connection *server= (struct connection *)arg;
157 /* accept new client */
158 struct sockaddr_storage addr;
159 socklen_t addrlen= sizeof(addr);
160 memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);
161
162 if (sock == INVALID_SOCKET)
163 {
164 perror("Failed to accept client");
165 }
166
167 #ifndef WIN32
168 if (sock >= maxconns)
169 {
170 closesocket(sock);
171 return ;
172 }
173 #endif
174
175 struct memcached_protocol_client_st* c= memcached_protocol_create_client((memcached_protocol_st*)server->userdata, sock);
176 if (c == NULL)
177 {
178 closesocket(sock);
179 }
180 else
181 {
182 memcached_protocol_client_set_verbose(c, global_options.is_verbose);
183 struct connection *client = &socket_userdata_map[sock];
184 client->userdata= c;
185
186 event_set(&client->event, (intptr_t)sock, EV_READ, drive_client, client);
187 event_base_set(event_base, &client->event);
188 if (event_add(&client->event, 0) == -1)
189 {
190 std::cerr << "Failed to add event for " << sock << std::endl;
191 memcached_protocol_client_destroy(c);
192 closesocket(sock);
193 }
194 }
195 }
196
197 static bool server_socket(util::log_info_st& log_file, const std::string& service)
198 {
199 struct addrinfo *ai;
200 struct addrinfo hints;
201 memset(&hints, 0, sizeof(struct addrinfo));
202
203 hints.ai_flags= AI_PASSIVE;
204 hints.ai_family= AF_UNSPEC;
205 hints.ai_socktype= SOCK_STREAM;
206
207 int error= getaddrinfo("127.0.0.1", service.c_str(), &hints, &ai);
208 if (error != 0)
209 {
210 if (error != EAI_SYSTEM)
211 {
212 std::string buffer("getaddrinfo: ");
213 buffer+= gai_strerror(error);
214 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
215 }
216 else
217 {
218 std::string buffer("getaddrinfo: ");
219 buffer+= strerror(errno);
220 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
221 }
222
223 return false;
224 }
225
226 struct linger ling= {0, 0};
227
228 for (struct addrinfo *next= ai; next; next= next->ai_next)
229 {
230 memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
231 if (sock == INVALID_SOCKET)
232 {
233 std::string buffer("Failed to create socket: ");
234 buffer+= strerror(errno);
235 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
236 continue;
237 }
238
239 int flags;
240 #ifdef WIN32
241 u_long arg = 1;
242 if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
243 {
244 std::cerr << "Failed to set nonblocking io: " << strerror(errno) << std::endl;
245 closesocket(sock);
246 continue;
247 }
248 #else
249 flags= fcntl(sock, F_GETFL, 0);
250 if (flags == -1)
251 {
252 std::string buffer("Failed to get socket flags: ");
253 buffer+= strerror(errno);
254 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
255 closesocket(sock);
256 continue;
257 }
258
259 if ((flags & O_NONBLOCK) != O_NONBLOCK)
260 {
261 if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
262 {
263 std::string buffer("Failed to set socket to nonblocking mode: ");
264 buffer+= strerror(errno);
265 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
266 closesocket(sock);
267 continue;
268 }
269 }
270 #endif
271
272 flags= 1;
273 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
274 {
275 std::cerr << "Failed to set SO_REUSEADDR: " << strerror(errno) << std::endl;
276 }
277
278 if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
279 {
280 std::cerr << "Failed to set SO_KEEPALIVE: " << strerror(errno) << std::endl;
281 }
282
283 if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
284 {
285 std::cerr << "Failed to set SO_LINGER: " << strerror(errno) << std::endl;
286 }
287
288 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
289 {
290 std::cerr << "Failed to set TCP_NODELAY: " << strerror(errno) << std::endl;
291 }
292
293 if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
294 {
295 if (get_socket_errno() != EADDRINUSE)
296 {
297 std::cerr << "bind(): " << strerror(errno) << std::endl;
298 freeaddrinfo(ai);
299 }
300 closesocket(sock);
301 continue;
302 }
303
304 if (listen(sock, 1024) == SOCKET_ERROR)
305 {
306 std::string buffer("listen(): ");
307 buffer+= strerror(errno);
308 log_file.write(util::VERBOSE_ERROR, buffer.c_str());
309 closesocket(sock);
310 continue;
311 }
312
313 if (global_options.is_verbose)
314 {
315 std::string buffer("Listening to: ");
316 buffer+= global_options.service;
317 log_file.write(util::VERBOSE_NOTICE, buffer.c_str());
318 }
319
320 server_sockets[num_server_sockets++]= sock;
321 }
322
323 freeaddrinfo(ai);
324
325 return (num_server_sockets > 0) ? true : false;
326 }
327
/**
 * Convert a command code to a textual string
 *
 * The table is indexed directly by the command code, so the entries must
 * stay in opcode order starting at 0 (GET) with no gaps.
 *
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };

  /* Bound the lookup by the table itself so that it can never be overrun,
   * whatever value the protocol headers give the last known opcode. */
  const size_t known_commands= sizeof(text) / sizeof(text[0]);
  if (cmd < known_commands)
  {
    return text[cmd];
  }

  return NULL;
}
351
352 /**
353 * Print out the command we are about to execute
354 */
355 static void pre_execute(const void *cookie,
356 protocol_binary_request_header *header)
357 {
358 if (global_options.is_verbose)
359 {
360 if (header)
361 {
362 const char *cmd= comcode2str(header->request.opcode);
363 if (cmd != NULL)
364 {
365 std::cout << "pre_execute from " << cookie << ": " << cmd << std::endl;
366 }
367 else
368 {
369 std::cout << "pre_execute from " << cookie << ": " << header->request.opcode << std::endl;
370 }
371 }
372 else
373 {
374 std::cout << "pre_execute from " << cookie << std::endl;
375 }
376 }
377 }
378
379 /**
380 * Print out the command we just executed
381 */
382 static void post_execute(const void *cookie,
383 protocol_binary_request_header *header)
384 {
385 if (global_options.is_verbose)
386 {
387 if (header)
388 {
389 const char *cmd= comcode2str(header->request.opcode);
390 if (cmd != NULL)
391 {
392 std::cout << "post_execute from " << cookie << ": " << cmd << std::endl;
393 }
394 else
395 {
396 std::cout << "post_execute from " << cookie << ": " << header->request.opcode << std::endl;
397 }
398 }
399 else
400 {
401 std::cout << "post_execute from " << cookie << std::endl;
402 }
403 }
404 }
405
406 /**
407 * Callback handler for all unknown commands.
408 * Send an unknown command back to the client
409 */
410 static protocol_binary_response_status unknown(const void *cookie,
411 protocol_binary_request_header *header,
412 memcached_binary_protocol_raw_response_handler response_handler)
413 {
414 protocol_binary_response_no_extras response;
415 memset(&response, 0, sizeof(protocol_binary_response_no_extras));
416
417 response.message.header.response.magic= PROTOCOL_BINARY_RES;
418 response.message.header.response.opcode= header->request.opcode;
419 response.message.header.response.status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND);
420 response.message.header.response.opaque= header->request.opaque;
421
422 return response_handler(cookie, header, (protocol_binary_response_header*)&response);
423 }
424
425 /**
426 * Program entry point. Bind to the specified port(s) and serve clients
427 *
428 * @param argc number of items in the argument vector
429 * @param argv argument vector
430 * @return EXIT_SUCCESS on success, 1 otherwise
431 */
432 int main(int argc, char **argv)
433 {
434 memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
435
436 {
437 enum long_option_t {
438 OPT_HELP,
439 OPT_VERBOSE,
440 OPT_DAEMON,
441 OPT_PROTOCOL_VERSION,
442 OPT_VERSION,
443 OPT_PORT,
444 OPT_MAX_CONNECTIONS,
445 OPT_LOGFILE,
446 OPT_PIDFILE
447 };
448
449 static struct option long_options[]=
450 {
451 { "help", no_argument, NULL, OPT_HELP },
452 { "port", required_argument, NULL, OPT_PORT },
453 { "verbose", no_argument, NULL, OPT_VERBOSE },
454 { "daemon", no_argument, NULL, OPT_DAEMON },
455 { "protocol", no_argument, NULL, OPT_PROTOCOL_VERSION },
456 { "version", no_argument, NULL, OPT_VERSION },
457 { "max-connections", required_argument, NULL, OPT_MAX_CONNECTIONS },
458 { "pid-file", required_argument, NULL, OPT_PIDFILE },
459 { "log-file", required_argument, NULL, OPT_LOGFILE },
460 {0, 0, 0, 0}
461 };
462
463 bool opt_help= false;
464 int option_index;
465 bool done= false;
466 while (done == false)
467 {
468 switch (getopt_long(argc, argv, "", long_options, &option_index))
469 {
470 case -1:
471 done= true;
472 break;
473
474 case OPT_PROTOCOL_VERSION:
475 interface= &interface_v1_impl;
476 break;
477
478 case OPT_PIDFILE:
479 global_options.pid_file= optarg;
480 break;
481
482 case OPT_LOGFILE:
483 global_options.log_file= optarg;
484 break;
485
486 case OPT_VERBOSE:
487 global_options.is_verbose= true;
488 break;
489
490 case OPT_VERSION:
491 break;
492
493 case OPT_DAEMON:
494 global_options.opt_daemon= true;
495 break;
496
497 case OPT_PORT:
498 global_options.service= optarg;
499 break;
500
501 case OPT_MAX_CONNECTIONS:
502 maxconns= atoi(optarg);
503 break;
504
505 case OPT_HELP: /* FALLTHROUGH */
506 opt_help= true;
507 break;
508
509 default:
510 {
511 std::cerr << "Unknown option: " << optarg << std::endl;
512 return EXIT_FAILURE;
513 }
514 }
515 }
516
517 if (opt_help)
518 {
519 std::cout << "Usage: " << argv[0] << std::endl;
520 for (struct option *ptr_option= long_options; ptr_option->name; ptr_option++)
521 {
522 std::cout << "\t" << ptr_option->name << std::endl;
523 }
524 return EXIT_SUCCESS;
525 }
526 }
527
528 if (global_options.opt_daemon)
529 {
530 util::daemonize(false, true);
531 }
532
533 if (initialize_storage() == false)
534 {
535 /* Error message already printed */
536 return EXIT_FAILURE;
537 }
538
539 util::Pidfile _pid_file(global_options.pid_file);
540
541 if (_pid_file.create() == false)
542 {
543 std::cerr << "Failed to create pid-file" << _pid_file.error_message() << std::endl;
544 return EXIT_FAILURE;
545 }
546
547 util::log_info_st log_file(argv[0], global_options.log_file, false);
548 log_file.write(util::VERBOSE_NOTICE, "starting log");
549
550 /*
551 * We need to initialize the handlers manually due to a bug in the
552 * warnings generated by struct initialization in gcc (all the way up to 4.4)
553 */
554 initialize_interface_v0_handler(log_file);
555 initialize_interface_v1_handler(log_file);
556
557
558 if (server_socket(log_file, global_options.service) == false)
559 {
560 return EXIT_FAILURE;
561 }
562
563 if (num_server_sockets == 0)
564 {
565 log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
566 return EXIT_FAILURE;
567 }
568
569 /*
570 * Create and initialize the handles to the protocol handlers. I want
571 * to be able to trace the traffic throught the pre/post handlers, and
572 * set up a common handler for unknown messages
573 */
574 interface->pre_execute= pre_execute;
575 interface->post_execute= post_execute;
576 interface->unknown= unknown;
577
578 struct memcached_protocol_st *protocol_handle;
579 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
580 {
581 log_file.write(util::VERBOSE_ERROR, "No server sockets are available.");
582 return EXIT_FAILURE;
583 }
584
585 socket_userdata_map= (struct connection*)calloc((size_t)(maxconns), sizeof(struct connection));
586 if (socket_userdata_map == NULL)
587 {
588 log_file.write(util::VERBOSE_ERROR, "Failed to allocate room for connections");
589 return EXIT_FAILURE;
590 }
591
592 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
593 memcached_binary_protocol_set_pedantic(protocol_handle, true);
594
595 event_base= event_init();
596 if (event_base == NULL)
597 {
598 std::cerr << "Failed to create an instance of libevent" << std::endl;
599 return EXIT_FAILURE;
600 }
601
602 for (int xx= 0; xx < num_server_sockets; ++xx)
603 {
604 struct connection *conn= &socket_userdata_map[server_sockets[xx]];
605 conn->userdata= protocol_handle;
606
607 event_set(&conn->event, (intptr_t)server_sockets[xx], EV_READ | EV_PERSIST, accept_handler, conn);
608
609 event_base_set(event_base, &conn->event);
610 if (event_add(&conn->event, 0) == -1)
611 {
612 log_file.write(util::VERBOSE_ERROR, "Failed to add event");
613 closesocket(server_sockets[xx]);
614 }
615 }
616
617 if (global_options.opt_daemon)
618 {
619 if (util::daemon_is_ready(true) == false)
620 {
621 log_file.write(util::VERBOSE_ERROR, "Failed for util::daemon_is_ready()");
622 return EXIT_FAILURE;
623 }
624 }
625
626
627 /* Serve all of the clients */
628 switch (event_base_loop(event_base, 0))
629 {
630 case -1:
631 log_file.write(util::VERBOSE_ERROR, "event_base_loop() failed");
632 break;
633
634 case 1:
635 log_file.write(util::VERBOSE_ERROR, "event_base_loop(), no events were registered");
636 break;
637
638 default:
639 break;
640 }
641 log_file.write(util::VERBOSE_NOTICE, "exiting");
642
643 /* NOTREACHED */
644 return EXIT_SUCCESS;
645 }