1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
3 * What is a library without an example to show you how to use the library?
4 * This example uses both interfaces to implement a small memcached server.
5 * Please note that this is an example of how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.c - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.c - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.c - This file sets up all of the sockets and runs the main
20 * config.h is included so that I can use ntohll/htonll on platforms that
21 * don't have them (they are private functions inside libmemcached, so you
22 * cannot use it directly from libmemcached without special modifications to
28 #include <libmemcachedprotocol-0.0/handler.h>
29 #include <libmemcached/socket.hpp>
30 #include <example/byteorder.h>
31 #include "example/storage.h"
32 #include "example/memcached_light.h"
34 #include "util/daemon.hpp"
35 #include "util/log.hpp"
36 #include "util/pidfile.hpp"
38 using namespace datadifferential
;
50 #include <sys/types.h>
// Protocol callback tables defined in other translation units; main() picks
// one of them and installs the pre/post/unknown hooks on it.
53 extern memcached_binary_protocol_callback_st interface_v0_impl
;
54 extern memcached_binary_protocol_callback_st interface_v1_impl
;
// Listening sockets opened by server_socket(); num_server_sockets is the
// number of slots in use.
56 static memcached_socket_t server_sockets
[1024];
57 static int num_server_sockets
= 0;
65 /* The default maximum number of connections... (change with --max-connections) */
66 static int maxconns
= 1024;
// Connection records indexed by socket descriptor; main() allocates maxconns
// entries, so any descriptor used as an index has to be below maxconns.
68 static struct connection
*socket_userdata_map
;
// libevent base that every server and client event is attached to.
69 static struct event_base
*event_base
= NULL
;
// Settings filled in by main() while parsing the command line.
85 static options_st global_options
;
88 * Callback for driving a client connection
89 * @param fd the socket for the client socket
90 * @param which identifying the event that occurred (not used)
91 * @param arg the connection structure for the client
// libevent callback for a client socket. Hands the connection to
// memcached_protocol_client_work(), then either destroys the protocol client
// (error event) or re-registers this same callback with the event flags the
// library asked for.
// NOTE: several original lines (braces, the else, the statements that compute
// `flags`) are not visible in this excerpt; the comments below describe only
// what is shown.
93 static void drive_client(memcached_socket_t fd
, short, void *arg
)
// arg is the struct connection registered with event_set(); its userdata is
// read back as the protocol client handle.
95 struct connection
*client
= (struct connection
*)arg
;
96 struct memcached_protocol_client_st
* c
= (struct memcached_protocol_client_st
*)client
->userdata
;
99 memcached_protocol_event_t events
= memcached_protocol_client_work(c
);
100 if (events
& MEMCACHED_PROTOCOL_ERROR_EVENT
)
102 if (global_options
.is_verbose
)
// NOTE(review): sockaddr_in only has room for an IPv4 address, whereas
// accept_handler() accepts into a sockaddr_storage.
104 struct sockaddr_in sin
;
105 socklen_t addrlen
= sizeof(sin
);
// NOTE(review): getsockname() returns the local address of fd; if the log line
// is meant to identify the remote client, getpeername() is the matching call.
107 if (getsockname(fd
, (struct sockaddr
*)&sin
, &addrlen
) != -1)
// NOTE(review): sin_port is stored in network byte order and is printed here
// without ntohs(), so the number shown is byte-swapped on little-endian hosts.
109 std::cout
<< __FILE__
<< ":" << __LINE__
110 << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)"
111 << " " << inet_ntoa(sin
.sin_addr
) << ":" << sin
.sin_port
// Fallback log line, presumably for the case where getsockname() failed.
117 std::cout
<< __FILE__
<< ":" << __LINE__
<< "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl
;
121 memcached_protocol_client_destroy(c
);
// The two tests below inspect which directions the library wants to be woken
// up for; the assignments to `flags` they guard are not visible here.
127 if (events
& MEMCACHED_PROTOCOL_WRITE_EVENT
)
132 if (events
& MEMCACHED_PROTOCOL_READ_EVENT
)
// Re-arm the event for this socket with drive_client as the callback again.
137 event_set(&client
->event
, (intptr_t)fd
, flags
, drive_client
, client
);
138 event_base_set(event_base
, &client
->event
);
// If the event cannot be registered the protocol client is destroyed.
140 if (event_add(&client
->event
, 0) == -1)
142 memcached_protocol_client_destroy(c
);
149 * Callback for accepting new connections
150 * @param fd the socket for the server socket
151 * @param which identifying the event that occurred (not used)
152 * @param arg the connection structure for the server
// libevent callback for a listening socket. Accepts one pending connection,
// creates a protocol client for it and registers drive_client() as the read
// callback for the new socket.
// NOTE: the bodies of the early-exit branches and some statements between the
// visible lines are missing from this excerpt.
154 static void accept_handler(memcached_socket_t fd
, short, void *arg
)
// arg is the connection record of the listening socket; main() stores the
// protocol handle in its userdata.
156 struct connection
*server
= (struct connection
*)arg
;
157 /* accept new client */
158 struct sockaddr_storage addr
;
159 socklen_t addrlen
= sizeof(addr
);
160 memcached_socket_t sock
= accept(fd
, (struct sockaddr
*)&addr
, &addrlen
);
162 if (sock
== INVALID_SOCKET
)
164 perror("Failed to accept client");
// socket_userdata_map has maxconns entries and is indexed by descriptor (see
// below), so a descriptor at or above maxconns cannot be given a slot.
168 if (sock
>= maxconns
)
175 struct memcached_protocol_client_st
* c
= memcached_protocol_create_client((memcached_protocol_st
*)server
->userdata
, sock
);
182 memcached_protocol_client_set_verbose(c
, global_options
.is_verbose
);
// Slot for this client in the descriptor-indexed table.
// NOTE(review): drive_client() reads client->userdata; the statement that
// stores `c` there is not visible in this excerpt - confirm it exists.
183 struct connection
*client
= &socket_userdata_map
[sock
];
// One-shot read event (no EV_PERSIST); drive_client() re-arms it itself.
186 event_set(&client
->event
, (intptr_t)sock
, EV_READ
, drive_client
, client
);
187 event_base_set(event_base
, &client
->event
);
188 if (event_add(&client
->event
, 0) == -1)
190 std::cerr
<< "Failed to add event for " << sock
<< std::endl
;
191 memcached_protocol_client_destroy(c
);
// Resolves the loopback address for `service`, then for every returned address
// creates a non-blocking TCP socket, applies socket options, binds, listens and
// records the socket in server_sockets[].
// Returns true when at least one listening socket was recorded.
// NOTE: declarations of `ai`, `flags` and `arg`, the preprocessor conditionals
// and the continue/return paths are not visible in this excerpt.
197 static bool server_socket(util::log_info_st
& log_file
, const std::string
& service
)
200 struct addrinfo hints
;
201 memset(&hints
, 0, sizeof(struct addrinfo
));
203 hints
.ai_flags
= AI_PASSIVE
;
204 hints
.ai_family
= AF_UNSPEC
;
205 hints
.ai_socktype
= SOCK_STREAM
;
// The host is hard-coded to "127.0.0.1", so the server only listens on the
// IPv4 loopback interface.
207 int error
= getaddrinfo("127.0.0.1", service
.c_str(), &hints
, &ai
);
// Resolver errors are described by gai_strerror(); EAI_SYSTEM means the real
// cause is in errno, which is reported with strerror() instead.
210 if (error
!= EAI_SYSTEM
)
212 std::string
buffer("getaddrinfo: ");
213 buffer
+= gai_strerror(error
);
214 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
218 std::string
buffer("getaddrinfo: ");
219 buffer
+= strerror(errno
);
220 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
// Value passed to SO_LINGER below: l_onoff = 0, l_linger = 0.
226 struct linger ling
= {0, 0};
228 for (struct addrinfo
*next
= ai
; next
; next
= next
->ai_next
)
// NOTE(review): the socket is created from `ai` (the head of the list) rather
// than the loop variable `next`, while bind() below uses `next`. If the list
// ever held entries of different families the two would disagree.
230 memcached_socket_t sock
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
);
231 if (sock
== INVALID_SOCKET
)
233 std::string
buffer("Failed to create socket: ");
234 buffer
+= strerror(errno
);
235 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
// Two ways of switching the socket to non-blocking mode appear here
// (ioctlsocket and fcntl); presumably a preprocessor conditional that is not
// visible in this excerpt selects one of them - confirm.
242 if (ioctlsocket(sock
, FIONBIO
, &arg
) == SOCKET_ERROR
)
244 std::cerr
<< "Failed to set nonblocking io: " << strerror(errno
) << std::endl
;
249 flags
= fcntl(sock
, F_GETFL
, 0);
252 std::string
buffer("Failed to get socket flags: ");
253 buffer
+= strerror(errno
);
254 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
// Only call F_SETFL when O_NONBLOCK is not already set.
259 if ((flags
& O_NONBLOCK
) != O_NONBLOCK
)
261 if (fcntl(sock
, F_SETFL
, flags
| O_NONBLOCK
) == -1)
263 std::string
buffer("Failed to set socket to nonblocking mode: ");
264 buffer
+= strerror(errno
);
265 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
// Socket options: a failure is reported on stderr; no other handling is
// visible for these four calls.
273 if (setsockopt(sock
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
)) != 0)
275 std::cerr
<< "Failed to set SO_REUSEADDR: " << strerror(errno
) << std::endl
;
278 if (setsockopt(sock
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
, sizeof(flags
)) != 0)
280 std::cerr
<< "Failed to set SO_KEEPALIVE: " << strerror(errno
) << std::endl
;
283 if (setsockopt(sock
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
)) != 0)
285 std::cerr
<< "Failed to set SO_LINGER: " << strerror(errno
) << std::endl
;
288 if (setsockopt(sock
, IPPROTO_TCP
, TCP_NODELAY
, (void *)&flags
, sizeof(flags
)) != 0)
290 std::cerr
<< "Failed to set TCP_NODELAY: " << strerror(errno
) << std::endl
;
293 if (bind(sock
, next
->ai_addr
, next
->ai_addrlen
) == SOCKET_ERROR
)
// An address-in-use failure is not reported; any other bind error is printed.
// NOTE(review): the test reads get_socket_errno() but the message formats
// errno - confirm both refer to the same value on every platform.
295 if (get_socket_errno() != EADDRINUSE
)
297 std::cerr
<< "bind(): " << strerror(errno
) << std::endl
;
304 if (listen(sock
, 1024) == SOCKET_ERROR
)
306 std::string
buffer("listen(): ");
307 buffer
+= strerror(errno
);
308 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
313 if (global_options
.is_verbose
)
// The message uses global_options.service, not the `service` parameter.
315 std::string
buffer("Listening to: ");
316 buffer
+= global_options
.service
;
317 log_file
.write(util::VERBOSE_NOTICE
, buffer
.c_str());
// NOTE(review): server_sockets has 1024 slots and no bound check is visible
// before this store.
320 server_sockets
[num_server_sockets
++]= sock
;
325 return (num_server_sockets
> 0) ? true : false;
/**
 * Convert a command code to a textual string.
 *
 * The table is indexed directly by the binary-protocol opcode: entry 0 is
 * "GET" and the last entry is "PREPENDQ". The bounds check is derived from
 * the table size so that the check and the table cannot get out of step.
 *
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };
  const size_t num_commands = sizeof(text) / sizeof(text[0]);

  if (cmd < num_commands)
  {
    return text[cmd];
  }

  /* Opcode beyond the end of the table: unknown command */
  return NULL;
}
353 * Print out the command we are about to execute
355 static void pre_execute(const void *cookie
,
356 protocol_binary_request_header
*header
)
358 if (global_options
.is_verbose
)
362 const char *cmd
= comcode2str(header
->request
.opcode
);
365 std::cout
<< "pre_execute from " << cookie
<< ": " << cmd
<< std::endl
;
369 std::cout
<< "pre_execute from " << cookie
<< ": " << header
->request
.opcode
<< std::endl
;
374 std::cout
<< "pre_execute from " << cookie
<< std::endl
;
380 * Print out the command we just executed
382 static void post_execute(const void *cookie
,
383 protocol_binary_request_header
*header
)
385 if (global_options
.is_verbose
)
389 const char *cmd
= comcode2str(header
->request
.opcode
);
392 std::cout
<< "post_execute from " << cookie
<< ": " << cmd
<< std::endl
;
396 std::cout
<< "post_execute from " << cookie
<< ": " << header
->request
.opcode
<< std::endl
;
401 std::cout
<< "post_execute from " << cookie
<< std::endl
;
407 * Callback handler for all unknown commands.
408 * Send an unknown command back to the client
410 static protocol_binary_response_status
unknown(const void *cookie
,
411 protocol_binary_request_header
*header
,
412 memcached_binary_protocol_raw_response_handler response_handler
)
414 protocol_binary_response_no_extras response
;
415 memset(&response
, 0, sizeof(protocol_binary_response_no_extras
));
417 response
.message
.header
.response
.magic
= PROTOCOL_BINARY_RES
;
418 response
.message
.header
.response
.opcode
= header
->request
.opcode
;
419 response
.message
.header
.response
.status
= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
);
420 response
.message
.header
.response
.opaque
= header
->request
.opaque
;
422 return response_handler(cookie
, header
, (protocol_binary_response_header
*)&response
);
426 * Program entry point. Bind to the specified port(s) and serve clients
428 * @param argc number of items in the argument vector
429 * @param argv argument vector
430 * @return EXIT_SUCCESS on success, 1 otherwise
// Program entry point. Visible steps, in order: choose the protocol interface,
// parse the (long-only) command line options, optionally daemonize, set up
// storage, pid file and log file, open the listening sockets, install the
// callbacks, allocate the connection table, register the listening sockets
// with libevent and run the event loop.
// NOTE: the option enum, case labels, returns and loop-control statements are
// largely missing from this excerpt; comments describe only what is shown.
432 int main(int argc
, char **argv
)
// interface_v0_impl is the default; the "protocol" option switches to v1.
434 memcached_binary_protocol_callback_st
*interface
= &interface_v0_impl
;
437 * We need to initialize the handlers manually due to a bug in the
438 * warnings generated by struct initialization in gcc (all the way up to 4.4)
440 initialize_interface_v0_handler();
441 initialize_interface_v1_handler();
448 OPT_PROTOCOL_VERSION
,
// Long option table; the usage loop further down walks it until an entry with
// a null name.
456 static struct option long_options
[]=
458 { "help", no_argument
, NULL
, OPT_HELP
},
459 { "port", required_argument
, NULL
, OPT_PORT
},
460 { "verbose", no_argument
, NULL
, OPT_VERBOSE
},
461 { "daemon", no_argument
, NULL
, OPT_DAEMON
},
462 { "protocol", no_argument
, NULL
, OPT_PROTOCOL_VERSION
},
463 { "version", no_argument
, NULL
, OPT_VERSION
},
464 { "max-connections", required_argument
, NULL
, OPT_MAX_CONNECTIONS
},
465 { "pid-file", required_argument
, NULL
, OPT_PIDFILE
},
466 { "log-file", required_argument
, NULL
, OPT_LOGFILE
},
470 bool opt_help
= false;
473 while (done
== false)
// The short-option string is empty, so only the long options above are
// recognised.
475 switch (getopt_long(argc
, argv
, "", long_options
, &option_index
))
481 case OPT_PROTOCOL_VERSION
:
482 interface
= &interface_v1_impl
;
486 global_options
.pid_file
= optarg
;
490 global_options
.log_file
= optarg
;
494 global_options
.is_verbose
= true;
501 global_options
.opt_daemon
= true;
505 global_options
.service
= optarg
;
508 case OPT_MAX_CONNECTIONS
:
// NOTE(review): atoi() gives no error indication and the result is not range
// checked here; it is later cast to size_t for calloc() and compared against
// socket descriptors, so zero or negative input needs handling somewhere.
509 maxconns
= atoi(optarg
);
512 case OPT_HELP
: /* FALLTHROUGH */
// NOTE(review): optarg is only meaningful for options that take an argument;
// if it is NULL here, streaming it to std::cerr is undefined behaviour.
518 std::cerr
<< "Unknown option: " << optarg
<< std::endl
;
// Usage output: the program name followed by every long option name.
526 std::cout
<< "Usage: " << argv
[0] << std::endl
;
527 for (struct option
*ptr_option
= long_options
; ptr_option
->name
; ptr_option
++)
529 std::cout
<< "\t" << ptr_option
->name
<< std::endl
;
535 if (global_options
.opt_daemon
)
537 util::daemonize(false, true);
540 if (initialize_storage() == false)
542 /* Error message already printed */
546 util::Pidfile
_pid_file(global_options
.pid_file
);
548 if (_pid_file
.create() == false)
// NOTE(review): there is no separator between the fixed text and
// error_message(), so the two run together in the output.
550 std::cerr
<< "Failed to create pid-file" << _pid_file
.error_message() << std::endl
;
554 util::log_info_st
log_file(argv
[0], global_options
.log_file
, false);
555 log_file
.write(util::VERBOSE_NOTICE
, "starting log");
557 if (server_socket(log_file
, global_options
.service
) == false)
562 if (num_server_sockets
== 0)
564 log_file
.write(util::VERBOSE_ERROR
, "No server sockets are available.");
569 * Create and initialize the handles to the protocol handlers. I want
570 * to be able to trace the traffic throught the pre/post handlers, and
571 * set up a common handler for unknown messages
573 interface
->pre_execute
= pre_execute
;
574 interface
->post_execute
= post_execute
;
575 interface
->unknown
= unknown
;
577 struct memcached_protocol_st
*protocol_handle
;
578 if ((protocol_handle
= memcached_protocol_create_instance()) == NULL
)
// NOTE(review): this message is the same text as the "no sockets" case above,
// although the failure here is memcached_protocol_create_instance() returning
// NULL - looks like a copy/paste leftover.
580 log_file
.write(util::VERBOSE_ERROR
, "No server sockets are available.");
// Descriptor-indexed connection table with maxconns zero-initialised entries.
584 socket_userdata_map
= (struct connection
*)calloc((size_t)(maxconns
), sizeof(struct connection
));
585 if (socket_userdata_map
== NULL
)
587 log_file
.write(util::VERBOSE_ERROR
, "Failed to allocate room for connections");
591 memcached_binary_protocol_set_callbacks(protocol_handle
, interface
);
592 memcached_binary_protocol_set_pedantic(protocol_handle
, true);
594 event_base
= event_init();
595 if (event_base
== NULL
)
597 std::cerr
<< "Failed to create an instance of libevent" << std::endl
;
// Register every listening socket with accept_handler as a persistent read
// event.
601 for (int xx
= 0; xx
< num_server_sockets
; ++xx
)
// NOTE(review): the listening descriptor is used as an index into the
// maxconns-sized table without the `>= maxconns` check accept_handler() does;
// with a small --max-connections value this can index past the allocation.
603 struct connection
*conn
= &socket_userdata_map
[server_sockets
[xx
]];
604 conn
->userdata
= protocol_handle
;
606 event_set(&conn
->event
, (intptr_t)server_sockets
[xx
], EV_READ
| EV_PERSIST
, accept_handler
, conn
);
608 event_base_set(event_base
, &conn
->event
);
609 if (event_add(&conn
->event
, 0) == -1)
611 log_file
.write(util::VERBOSE_ERROR
, "Failed to add event");
612 closesocket(server_sockets
[xx
]);
616 if (global_options
.opt_daemon
)
618 if (util::daemon_is_ready(true) == false)
620 log_file
.write(util::VERBOSE_ERROR
, "Failed for util::daemon_is_ready()");
626 /* Serve all of the clients */
// The two messages below cover the loop's failure result and its "no events
// registered" result; the case labels themselves are not visible here.
627 switch (event_base_loop(event_base
, 0))
630 log_file
.write(util::VERBOSE_ERROR
, "event_base_loop() failed");
634 log_file
.write(util::VERBOSE_ERROR
, "event_base_loop(), no events were registered");
640 log_file
.write(util::VERBOSE_NOTICE
, "exiting");