1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
3 * What is a library without an example to show you how to use the library?
4 * This example uses both interfaces to implement a small memcached server.
5 * Please note that this is an example of how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.cc - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.cc - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.cc- This file sets up all of the sockets and runs the main
20 * config.h is included so that I can use the ntohll/htonll on platforms that
21 * don't have them (this is a private function inside libmemcached, so you
22 * cannot use it directly from libmemcached without special modifications to
26 #include <mem_config.h>
28 #include <libmemcachedprotocol-0.0/handler.h>
29 #include <libmemcached/socket.hpp>
30 #include <example/byteorder.h>
31 #include "example/storage.h"
32 #include "example/memcached_light.h"
34 #include "util/daemon.hpp"
35 #include "util/log.hpp"
36 #include "util/pidfile.hpp"
38 using namespace datadifferential
;
50 #include <sys/types.h>
54 extern memcached_binary_protocol_callback_st interface_v0_impl
;
55 extern memcached_binary_protocol_callback_st interface_v1_impl
;
57 static memcached_socket_t server_sockets
[1024];
58 static int num_server_sockets
= 0;
66 /* The default maximum number of connections... (change with -c) */
67 static int maxconns
= 1024;
69 static struct connection
*socket_userdata_map
;
70 static struct event_base
*event_base
= NULL
;
87 static options_st global_options
;
90 * Callback for driving a client connection
91 * @param fd the socket for the client socket
92 * @param which identifying the event that occurred (not used)
93 * @param arg the connection structure for the client
95 static void drive_client(memcached_socket_t fd
, short, void *arg
)
97 struct connection
*client
= (struct connection
*)arg
;
98 struct memcached_protocol_client_st
* c
= (struct memcached_protocol_client_st
*)client
->userdata
;
101 memcached_protocol_event_t events
= memcached_protocol_client_work(c
);
102 if (events
& MEMCACHED_PROTOCOL_ERROR_EVENT
)
104 if (global_options
.is_verbose
)
106 struct sockaddr_in sin
;
107 socklen_t addrlen
= sizeof(sin
);
109 if (getsockname(fd
, (struct sockaddr
*)&sin
, &addrlen
) != -1)
111 std::cout
<< __FILE__
<< ":" << __LINE__
112 << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)"
113 << " " << inet_ntoa(sin
.sin_addr
) << ":" << sin
.sin_port
119 std::cout
<< __FILE__
<< ":" << __LINE__
<< "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl
;
123 memcached_protocol_client_destroy(c
);
129 if (events
& MEMCACHED_PROTOCOL_WRITE_EVENT
)
134 if (events
& MEMCACHED_PROTOCOL_READ_EVENT
)
139 event_set(&client
->event
, int(fd
), flags
, drive_client
, client
);
140 event_base_set(event_base
, &client
->event
);
142 if (event_add(&client
->event
, 0) == -1)
144 memcached_protocol_client_destroy(c
);
151 * Callback for accepting new connections
152 * @param fd the socket for the server socket
153 * @param which identifying the event that occurred (not used)
154 * @param arg the connection structure for the server
156 static void accept_handler(memcached_socket_t fd
, short, void *arg
)
158 struct connection
*server
= (struct connection
*)arg
;
159 /* accept new client */
160 struct sockaddr_storage addr
;
161 socklen_t addrlen
= sizeof(addr
);
162 memcached_socket_t sock
= accept(fd
, (struct sockaddr
*)&addr
, &addrlen
);
164 if (sock
== INVALID_SOCKET
)
166 perror("Failed to accept client");
170 if (sock
>= maxconns
)
177 struct memcached_protocol_client_st
* c
= memcached_protocol_create_client((memcached_protocol_st
*)server
->userdata
, sock
);
184 memcached_protocol_client_set_verbose(c
, global_options
.is_verbose
);
185 struct connection
*client
= &socket_userdata_map
[sock
];
188 event_set(&client
->event
, int(sock
), EV_READ
, drive_client
, client
);
189 event_base_set(event_base
, &client
->event
);
190 if (event_add(&client
->event
, 0) == -1)
192 std::cerr
<< "Failed to add event for " << sock
<< std::endl
;
193 memcached_protocol_client_destroy(c
);
199 static bool server_socket(util::log_info_st
& log_file
, const std::string
& service
)
202 struct addrinfo hints
;
203 memset(&hints
, 0, sizeof(struct addrinfo
));
205 hints
.ai_flags
= AI_PASSIVE
;
206 hints
.ai_family
= AF_UNSPEC
;
207 hints
.ai_socktype
= SOCK_STREAM
;
209 int error
= getaddrinfo("127.0.0.1", service
.c_str(), &hints
, &ai
);
212 if (error
!= EAI_SYSTEM
)
214 std::string
buffer("getaddrinfo: ");
215 buffer
+= gai_strerror(error
);
216 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
220 std::string
buffer("getaddrinfo: ");
221 buffer
+= strerror(errno
);
222 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
228 struct linger ling
= {0, 0};
230 for (struct addrinfo
*next
= ai
; next
; next
= next
->ai_next
)
232 memcached_socket_t sock
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
);
233 if (sock
== INVALID_SOCKET
)
235 std::string
buffer("Failed to create socket: ");
236 buffer
+= strerror(errno
);
237 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
244 if (ioctlsocket(sock
, FIONBIO
, &arg
) == SOCKET_ERROR
)
246 std::cerr
<< "Failed to set nonblocking io: " << strerror(errno
) << std::endl
;
251 flags
= fcntl(sock
, F_GETFL
, 0);
254 std::string
buffer("Failed to get socket flags: ");
255 buffer
+= strerror(errno
);
256 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
261 if ((flags
& O_NONBLOCK
) != O_NONBLOCK
)
263 if (fcntl(sock
, F_SETFL
, flags
| O_NONBLOCK
) == -1)
265 std::string
buffer("Failed to set socket to nonblocking mode: ");
266 buffer
+= strerror(errno
);
267 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
275 if (setsockopt(sock
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
)) != 0)
277 std::cerr
<< "Failed to set SO_REUSEADDR: " << strerror(errno
) << std::endl
;
280 if (setsockopt(sock
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
, sizeof(flags
)) != 0)
282 std::cerr
<< "Failed to set SO_KEEPALIVE: " << strerror(errno
) << std::endl
;
285 if (setsockopt(sock
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
)) != 0)
287 std::cerr
<< "Failed to set SO_LINGER: " << strerror(errno
) << std::endl
;
290 if (setsockopt(sock
, IPPROTO_TCP
, TCP_NODELAY
, (void *)&flags
, sizeof(flags
)) != 0)
292 std::cerr
<< "Failed to set TCP_NODELAY: " << strerror(errno
) << std::endl
;
295 if (bind(sock
, next
->ai_addr
, next
->ai_addrlen
) == SOCKET_ERROR
)
297 if (get_socket_errno() != EADDRINUSE
)
299 std::cerr
<< "bind(): " << strerror(errno
) << std::endl
;
306 if (listen(sock
, 1024) == SOCKET_ERROR
)
308 std::string
buffer("listen(): ");
309 buffer
+= strerror(errno
);
310 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
315 if (global_options
.is_verbose
)
317 std::string
buffer("Listening to: ");
318 buffer
+= global_options
.service
;
319 log_file
.write(util::VERBOSE_NOTICE
, buffer
.c_str());
322 server_sockets
[num_server_sockets
++]= sock
;
327 return (num_server_sockets
> 0) ? true : false;
/**
 * Convert a command code to a textual string
 *
 * The table is indexed directly by the command code, so the bound is taken
 * from the table itself: a code past the last entry can never be used as an
 * index, whatever the protocol header defines.
 *
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };
  const size_t known_commands= sizeof(text) / sizeof(text[0]);

  if (cmd < known_commands)
  {
    return text[cmd];
  }

  return NULL;
}
355 * Print out the command we are about to execute
357 static void pre_execute(const void *cookie
,
358 protocol_binary_request_header
*header
)
360 if (global_options
.is_verbose
)
364 const char *cmd
= comcode2str(header
->request
.opcode
);
367 std::cout
<< "pre_execute from " << cookie
<< ": " << cmd
<< std::endl
;
371 std::cout
<< "pre_execute from " << cookie
<< ": " << header
->request
.opcode
<< std::endl
;
376 std::cout
<< "pre_execute from " << cookie
<< std::endl
;
382 * Print out the command we just executed
384 static void post_execute(const void *cookie
,
385 protocol_binary_request_header
*header
)
387 if (global_options
.is_verbose
)
391 const char *cmd
= comcode2str(header
->request
.opcode
);
394 std::cout
<< "post_execute from " << cookie
<< ": " << cmd
<< std::endl
;
398 std::cout
<< "post_execute from " << cookie
<< ": " << header
->request
.opcode
<< std::endl
;
403 std::cout
<< "post_execute from " << cookie
<< std::endl
;
409 * Callback handler for all unknown commands.
410 * Send an unknown command back to the client
412 static protocol_binary_response_status
unknown(const void *cookie
,
413 protocol_binary_request_header
*header
,
414 memcached_binary_protocol_raw_response_handler response_handler
)
416 protocol_binary_response_no_extras response
;
417 memset(&response
, 0, sizeof(protocol_binary_response_no_extras
));
419 response
.message
.header
.response
.magic
= PROTOCOL_BINARY_RES
;
420 response
.message
.header
.response
.opcode
= header
->request
.opcode
;
421 response
.message
.header
.response
.status
= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
);
422 response
.message
.header
.response
.opaque
= header
->request
.opaque
;
424 return response_handler(cookie
, header
, (protocol_binary_response_header
*)&response
);
428 * Program entry point. Bind to the specified port(s) and serve clients
430 * @param argc number of items in the argument vector
431 * @param argv argument vector
432 * @return EXIT_SUCCESS on success, 1 otherwise
434 int main(int argc
, char **argv
)
436 memcached_binary_protocol_callback_st
*interface
= &interface_v0_impl
;
443 OPT_PROTOCOL_VERSION
,
451 static struct option long_options
[]=
453 { "help", no_argument
, NULL
, OPT_HELP
},
454 { "port", required_argument
, NULL
, OPT_PORT
},
455 { "verbose", no_argument
, NULL
, OPT_VERBOSE
},
456 { "daemon", no_argument
, NULL
, OPT_DAEMON
},
457 { "protocol", no_argument
, NULL
, OPT_PROTOCOL_VERSION
},
458 { "version", no_argument
, NULL
, OPT_VERSION
},
459 { "max-connections", required_argument
, NULL
, OPT_MAX_CONNECTIONS
},
460 { "pid-file", required_argument
, NULL
, OPT_PIDFILE
},
461 { "log-file", required_argument
, NULL
, OPT_LOGFILE
},
465 bool opt_help
= false;
468 while (done
== false)
470 switch (getopt_long(argc
, argv
, "", long_options
, &option_index
))
476 case OPT_PROTOCOL_VERSION
:
477 interface
= &interface_v1_impl
;
481 global_options
.pid_file
= optarg
;
485 global_options
.log_file
= optarg
;
489 global_options
.is_verbose
= true;
496 global_options
.opt_daemon
= true;
500 global_options
.service
= optarg
;
503 case OPT_MAX_CONNECTIONS
:
504 maxconns
= atoi(optarg
);
507 case OPT_HELP
: /* FALLTHROUGH */
513 std::cerr
<< "Unknown option: " << optarg
<< std::endl
;
521 std::cout
<< "Usage: " << argv
[0] << std::endl
;
522 for (struct option
*ptr_option
= long_options
; ptr_option
->name
; ptr_option
++)
524 std::cout
<< "\t" << ptr_option
->name
<< std::endl
;
530 if (global_options
.opt_daemon
)
532 util::daemonize(false, true);
535 if (initialize_storage() == false)
537 /* Error message already printed */
541 util::Pidfile
_pid_file(global_options
.pid_file
);
543 if (_pid_file
.create() == false)
545 std::cerr
<< "Failed to create pid-file" << _pid_file
.error_message() << std::endl
;
549 util::log_info_st
log_file(argv
[0], global_options
.log_file
, false);
550 log_file
.write(util::VERBOSE_NOTICE
, "starting log");
553 * We need to initialize the handlers manually due to a bug in the
554 * warnings generated by struct initialization in gcc (all the way up to 4.4)
556 initialize_interface_v0_handler(log_file
);
557 initialize_interface_v1_handler(log_file
);
560 if (server_socket(log_file
, global_options
.service
) == false)
565 if (num_server_sockets
== 0)
567 log_file
.write(util::VERBOSE_ERROR
, "No server sockets are available.");
572 * Create and initialize the handles to the protocol handlers. I want
573 * to be able to trace the traffic throught the pre/post handlers, and
574 * set up a common handler for unknown messages
576 interface
->pre_execute
= pre_execute
;
577 interface
->post_execute
= post_execute
;
578 interface
->unknown
= unknown
;
580 struct memcached_protocol_st
*protocol_handle
;
581 if ((protocol_handle
= memcached_protocol_create_instance()) == NULL
)
583 log_file
.write(util::VERBOSE_ERROR
, "No server sockets are available.");
587 socket_userdata_map
= (struct connection
*)calloc((size_t)(maxconns
), sizeof(struct connection
));
588 if (socket_userdata_map
== NULL
)
590 log_file
.write(util::VERBOSE_ERROR
, "Failed to allocate room for connections");
594 memcached_binary_protocol_set_callbacks(protocol_handle
, interface
);
595 memcached_binary_protocol_set_pedantic(protocol_handle
, true);
597 event_base
= event_init();
598 if (event_base
== NULL
)
600 std::cerr
<< "Failed to create an instance of libevent" << std::endl
;
604 for (int xx
= 0; xx
< num_server_sockets
; ++xx
)
606 struct connection
*conn
= &socket_userdata_map
[server_sockets
[xx
]];
607 conn
->userdata
= protocol_handle
;
609 event_set(&conn
->event
, int(server_sockets
[xx
]), EV_READ
| EV_PERSIST
, accept_handler
, conn
);
611 event_base_set(event_base
, &conn
->event
);
612 if (event_add(&conn
->event
, 0) == -1)
614 log_file
.write(util::VERBOSE_ERROR
, "Failed to add event");
615 closesocket(server_sockets
[xx
]);
619 if (global_options
.opt_daemon
)
621 if (util::daemon_is_ready(true) == false)
623 log_file
.write(util::VERBOSE_ERROR
, "Failed for util::daemon_is_ready()");
629 /* Serve all of the clients */
630 switch (event_base_loop(event_base
, 0))
633 log_file
.write(util::VERBOSE_ERROR
, "event_base_loop() failed");
637 log_file
.write(util::VERBOSE_ERROR
, "event_base_loop(), no events were registered");
643 log_file
.write(util::VERBOSE_NOTICE
, "exiting");