1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
3 * What is a library without an example to show you how to use the library?
4 * This example uses both interfaces to implement a small memcached server.
5 * Please note that this is an example of how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.cc - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.cc - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.cc- This file sets up all of the sockets and runs the main
20 * config.h is included so that I can use the ntohll/htonll on platforms that
21 * don't have them (these are private functions inside libmemcached, so you
22 * cannot use it directly from libmemcached without special modifications to
28 #include <libmemcachedprotocol-0.0/handler.h>
29 #include <libmemcached/socket.hpp>
30 #include <example/byteorder.h>
31 #include "example/storage.h"
32 #include "example/memcached_light.h"
34 #include "util/daemon.hpp"
35 #include "util/log.hpp"
36 #include "util/pidfile.hpp"
38 using namespace datadifferential
;
50 #include <sys/types.h>
/*
 * Callback tables for the two protocol front-ends. They are defined in other
 * translation units; main() starts with interface_v0_impl and switches to
 * interface_v1_impl when the "protocol" option is given.
 */
53 extern memcached_binary_protocol_callback_st interface_v0_impl
;
54 extern memcached_binary_protocol_callback_st interface_v1_impl
;
/*
 * Listening sockets opened by server_socket() and how many of them are in
 * use. NOTE(review): the array has a fixed capacity of 1024 and no bounds
 * check is visible where entries are appended.
 */
56 static memcached_socket_t server_sockets
[1024];
57 static int num_server_sockets
= 0;
65 /* The default maximum number of connections (change with --max-connections).
   It sizes socket_userdata_map, and accept_handler() compares newly accepted
   socket values against it. */
66 static int maxconns
= 1024;
/* Per-socket connection records, allocated in main() with maxconns entries
   and indexed directly by the socket value. */
68 static struct connection
*socket_userdata_map
;
/* The libevent base every event in this file is attached to; created in
   main() with event_init(). */
69 static struct event_base
*event_base
= NULL
;
/* Settings collected from the command line in main(); this file reads the
   is_verbose, opt_daemon, service, pid_file and log_file members. */
85 static options_st global_options
;
88 * Callback for driving a client connection
89 * @param fd the socket for the client socket
90 * @param which identifying the event that occurred (not used)
91 * @param arg the connection structure for the client
/*
 * libevent callback that advances one client connection.
 *
 * It lets the protocol library process the client via
 * memcached_protocol_client_work() and inspects the returned event mask:
 *  - MEMCACHED_PROTOCOL_ERROR_EVENT: optionally log (verbose mode) and destroy
 *    the protocol client.
 *  - otherwise: re-arm the event for this socket with `flags` and, if that
 *    fails, destroy the protocol client.
 *
 * NOTE(review): the lines that declare and fill `flags` from the WRITE/READ
 * bits, and any socket close calls, are not visible in this view - confirm
 * against the full file before changing this function.
 */
93 static void drive_client(memcached_socket_t fd
, short, void *arg
)
/* arg is the struct connection registered with event_set(); its userdata is
   treated as the protocol client for this socket. */
95 struct connection
*client
= (struct connection
*)arg
;
96 struct memcached_protocol_client_st
* c
= (struct memcached_protocol_client_st
*)client
->userdata
;
99 memcached_protocol_event_t events
= memcached_protocol_client_work(c
);
100 if (events
& MEMCACHED_PROTOCOL_ERROR_EVENT
)
102 if (global_options
.is_verbose
)
/*
 * NOTE(review): getsockname() reports the LOCAL address of the socket; if the
 * intent is to log which client was dropped, getpeername() is the matching
 * call. The buffer is a sockaddr_in although server_socket() asks for
 * AF_UNSPEC, so a non-IPv4 socket would not be described correctly.
 */
104 struct sockaddr_in sin
;
105 socklen_t addrlen
= sizeof(sin
);
107 if (getsockname(fd
, (struct sockaddr
*)&sin
, &addrlen
) != -1)
/* NOTE(review): sin_port is in network byte order and is printed without
   ntohs(), so the logged number is byte-swapped on little-endian hosts. */
109 std::cout
<< __FILE__
<< ":" << __LINE__
110 << " close(MEMCACHED_PROTOCOL_ERROR_EVENT)"
111 << " " << inet_ntoa(sin
.sin_addr
) << ":" << sin
.sin_port
/* Message used when the address lookup above did not succeed (presumably the
   else branch - the branch keyword itself is not visible here). */
117 std::cout
<< __FILE__
<< ":" << __LINE__
<< "close() MEMCACHED_PROTOCOL_ERROR_EVENT" << std::endl
;
121 memcached_protocol_client_destroy(c
);
/* The protocol library reports which directions it wants to be woken up for;
   these two tests presumably build the libevent flag mask used below. */
127 if (events
& MEMCACHED_PROTOCOL_WRITE_EVENT
)
132 if (events
& MEMCACHED_PROTOCOL_READ_EVENT
)
/* Re-register this same callback for the socket. No EV_PERSIST is passed
   here, which is why the event is set up again on every invocation. */
137 event_set(&client
->event
, (intptr_t)fd
, flags
, drive_client
, client
);
138 event_base_set(event_base
, &client
->event
);
/* If the event cannot be added nothing will ever drive this client again, so
   it is destroyed. */
140 if (event_add(&client
->event
, 0) == -1)
142 memcached_protocol_client_destroy(c
);
149 * Callback for accepting new connections
150 * @param fd the socket for the server socket
151 * @param which identifying the event that occurred (not used)
152 * @param arg the connection structure for the server
/*
 * libevent callback for a listening socket: accepts one pending connection,
 * wraps it in a protocol client and registers drive_client() for it.
 *
 * fd  - the listening socket that became readable
 * arg - the struct connection registered for the listening socket in main();
 *       its userdata is used as the memcached_protocol_st handle
 *
 * NOTE(review): the early returns, the check of the value returned by
 * memcached_protocol_create_client() and the store into client->userdata are
 * not visible in this view - confirm against the full file.
 */
154 static void accept_handler(memcached_socket_t fd
, short, void *arg
)
156 struct connection
*server
= (struct connection
*)arg
;
157 /* accept new client */
158 struct sockaddr_storage addr
;
159 socklen_t addrlen
= sizeof(addr
);
160 memcached_socket_t sock
= accept(fd
, (struct sockaddr
*)&addr
, &addrlen
);
162 if (sock
== INVALID_SOCKET
)
164 perror("Failed to accept client");
/* socket_userdata_map is indexed by the socket value and holds maxconns
   entries, so a socket value at or above maxconns has no slot in the map. */
168 if (sock
>= maxconns
)
175 struct memcached_protocol_client_st
* c
= memcached_protocol_create_client((memcached_protocol_st
*)server
->userdata
, sock
);
182 memcached_protocol_client_set_verbose(c
, global_options
.is_verbose
);
/* Pick the connection record that belongs to this socket value. */
183 struct connection
*client
= &socket_userdata_map
[sock
];
/* Wait for the first request: only EV_READ is requested, and drive_client()
   receives the connection record as its argument. */
186 event_set(&client
->event
, (intptr_t)sock
, EV_READ
, drive_client
, client
);
187 event_base_set(event_base
, &client
->event
);
188 if (event_add(&client
->event
, 0) == -1)
190 std::cerr
<< "Failed to add event for " << sock
<< std::endl
;
191 memcached_protocol_client_destroy(c
);
/*
 * Resolve the listening address for `service`, then create, configure, bind
 * and listen on a socket for each returned address. Sockets that are set up
 * successfully are appended to server_sockets.
 *
 * log_file - log sink used for most error messages (some go to std::cerr)
 * service  - port number or service name handed to getaddrinfo()
 * returns  - true when at least one entry is present in server_sockets
 *
 * NOTE(review): the declaration of `ai`, `flags` and `arg`, the platform
 * #ifdef structure, the continue/close paths and any freeaddrinfo() call are
 * not visible in this view - confirm against the full file.
 */
197 static bool server_socket(util::log_info_st
& log_file
, const std::string
& service
)
200 struct addrinfo hints
;
201 memset(&hints
, 0, sizeof(struct addrinfo
));
203 hints
.ai_flags
= AI_PASSIVE
;
204 hints
.ai_family
= AF_UNSPEC
;
205 hints
.ai_socktype
= SOCK_STREAM
;
/* NOTE(review): the node is hard-coded to "127.0.0.1", so the server only
   listens on the IPv4 loopback address; AI_PASSIVE has no effect when a node
   name is supplied, and AF_UNSPEC cannot yield an IPv6 result for it. */
207 int error
= getaddrinfo("127.0.0.1", service
.c_str(), &hints
, &ai
);
/* getaddrinfo() has its own error codes; only EAI_SYSTEM means that errno
   holds the reason, hence the two differently formatted messages. */
210 if (error
!= EAI_SYSTEM
)
212 std::string
buffer("getaddrinfo: ");
213 buffer
+= gai_strerror(error
);
214 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
218 std::string
buffer("getaddrinfo: ");
219 buffer
+= strerror(errno
);
220 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
/* l_onoff == 0: lingering on close is disabled for the listening sockets. */
226 struct linger ling
= {0, 0};
228 for (struct addrinfo
*next
= ai
; next
; next
= next
->ai_next
)
/* NOTE(review): the socket is created from `ai` (the head of the list) rather
   than from `next` (the entry being iterated), while bind() below uses
   `next`. With more than one result of different families this creates a
   socket of the wrong family - this looks like a bug. */
230 memcached_socket_t sock
= socket(ai
->ai_family
, ai
->ai_socktype
, ai
->ai_protocol
);
231 if (sock
== INVALID_SOCKET
)
233 std::string
buffer("Failed to create socket: ");
234 buffer
+= strerror(errno
);
235 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
/* Switch the socket to non-blocking mode. ioctlsocket()/FIONBIO is the
   Windows form and fcntl()/O_NONBLOCK the POSIX form; presumably selected by
   a preprocessor conditional that is not visible here. */
242 if (ioctlsocket(sock
, FIONBIO
, &arg
) == SOCKET_ERROR
)
244 std::cerr
<< "Failed to set nonblocking io: " << strerror(errno
) << std::endl
;
249 flags
= fcntl(sock
, F_GETFL
, 0);
252 std::string
buffer("Failed to get socket flags: ");
253 buffer
+= strerror(errno
);
254 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
/* Only touch the flags when O_NONBLOCK is not already set. */
259 if ((flags
& O_NONBLOCK
) != O_NONBLOCK
)
261 if (fcntl(sock
, F_SETFL
, flags
| O_NONBLOCK
) == -1)
263 std::string
buffer("Failed to set socket to nonblocking mode: ");
264 buffer
+= strerror(errno
);
265 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
/* Socket options. `flags` is reused as the option value from here on;
   presumably it is reset to 1 first - that assignment is not visible here.
   A failing setsockopt() is reported on std::cerr. */
273 if (setsockopt(sock
, SOL_SOCKET
, SO_REUSEADDR
, (void *)&flags
, sizeof(flags
)) != 0)
275 std::cerr
<< "Failed to set SO_REUSEADDR: " << strerror(errno
) << std::endl
;
278 if (setsockopt(sock
, SOL_SOCKET
, SO_KEEPALIVE
, (void *)&flags
, sizeof(flags
)) != 0)
280 std::cerr
<< "Failed to set SO_KEEPALIVE: " << strerror(errno
) << std::endl
;
283 if (setsockopt(sock
, SOL_SOCKET
, SO_LINGER
, (void *)&ling
, sizeof(ling
)) != 0)
285 std::cerr
<< "Failed to set SO_LINGER: " << strerror(errno
) << std::endl
;
288 if (setsockopt(sock
, IPPROTO_TCP
, TCP_NODELAY
, (void *)&flags
, sizeof(flags
)) != 0)
290 std::cerr
<< "Failed to set TCP_NODELAY: " << strerror(errno
) << std::endl
;
293 if (bind(sock
, next
->ai_addr
, next
->ai_addrlen
) == SOCKET_ERROR
)
/* An address that is already in use is not reported; every other bind error
   is. NOTE(review): the test uses get_socket_errno() but the message uses
   errno - these may differ on platforms where socket errors are not stored
   in errno. */
295 if (get_socket_errno() != EADDRINUSE
)
297 std::cerr
<< "bind(): " << strerror(errno
) << std::endl
;
304 if (listen(sock
, 1024) == SOCKET_ERROR
)
306 std::string
buffer("listen(): ");
307 buffer
+= strerror(errno
);
308 log_file
.write(util::VERBOSE_ERROR
, buffer
.c_str());
313 if (global_options
.is_verbose
)
/* Logs the globally configured service rather than the `service` parameter;
   main() passes global_options.service, so the two are the same there. */
315 std::string
buffer("Listening to: ");
316 buffer
+= global_options
.service
;
317 log_file
.write(util::VERBOSE_NOTICE
, buffer
.c_str());
/* NOTE(review): server_sockets has 1024 entries and no bounds check is
   visible before this append. */
320 server_sockets
[num_server_sockets
++]= sock
;
325 return (num_server_sockets
> 0) ? true : false;
/**
 * Convert a command code to a textual string
 *
 * The table below is indexed directly by the binary protocol opcode
 * (0x00 "GET" through 0x1a "PREPENDQ"). The bounds check is derived from the
 * table itself so that the check and the table can never drift apart.
 *
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };
  static const size_t num_commands= sizeof(text) / sizeof(text[0]);

  /* Anything past the last table entry is an opcode we have no name for */
  if (cmd < num_commands)
  {
    return text[cmd];
  }

  return NULL;
}
353 * Print out the command we are about to execute
355 static void pre_execute(const void *cookie
,
356 protocol_binary_request_header
*header
)
358 if (global_options
.is_verbose
)
362 const char *cmd
= comcode2str(header
->request
.opcode
);
365 std::cout
<< "pre_execute from " << cookie
<< ": " << cmd
<< std::endl
;
369 std::cout
<< "pre_execute from " << cookie
<< ": " << header
->request
.opcode
<< std::endl
;
374 std::cout
<< "pre_execute from " << cookie
<< std::endl
;
380 * Print out the command we just executed
382 static void post_execute(const void *cookie
,
383 protocol_binary_request_header
*header
)
385 if (global_options
.is_verbose
)
389 const char *cmd
= comcode2str(header
->request
.opcode
);
392 std::cout
<< "post_execute from " << cookie
<< ": " << cmd
<< std::endl
;
396 std::cout
<< "post_execute from " << cookie
<< ": " << header
->request
.opcode
<< std::endl
;
401 std::cout
<< "post_execute from " << cookie
<< std::endl
;
407 * Callback handler for all unknown commands.
408 * Send an unknown command back to the client
410 static protocol_binary_response_status
unknown(const void *cookie
,
411 protocol_binary_request_header
*header
,
412 memcached_binary_protocol_raw_response_handler response_handler
)
414 protocol_binary_response_no_extras response
;
415 memset(&response
, 0, sizeof(protocol_binary_response_no_extras
));
417 response
.message
.header
.response
.magic
= PROTOCOL_BINARY_RES
;
418 response
.message
.header
.response
.opcode
= header
->request
.opcode
;
419 response
.message
.header
.response
.status
= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND
);
420 response
.message
.header
.response
.opaque
= header
->request
.opaque
;
422 return response_handler(cookie
, header
, (protocol_binary_response_header
*)&response
);
426 * Program entry point. Bind to the specified port(s) and serve clients
428 * @param argc number of items in the argument vector
429 * @param argv argument vector
430 * @return EXIT_SUCCESS on success, 1 otherwise
/*
 * Program entry point. In the order visible here it:
 *   1. parses the long options with getopt_long(),
 *   2. daemonizes when requested,
 *   3. initializes the item storage, the pid file and the log file,
 *   4. initializes both interface implementations and opens the listening
 *      socket(s) through server_socket(),
 *   5. installs the pre_execute/post_execute/unknown callbacks on the selected
 *      interface and creates the protocol instance,
 *   6. allocates the per-socket connection table (maxconns entries),
 *   7. registers accept_handler() for every listening socket, and
 *   8. runs the libevent loop until it ends.
 *
 * NOTE(review): the option enum, several case labels and all return
 * statements are not visible in this view - confirm exit codes against the
 * full file.
 */
432 int main(int argc
, char **argv
)
/* The "raw packet" interface is the default; see OPT_PROTOCOL_VERSION below. */
434 memcached_binary_protocol_callback_st
*interface
= &interface_v0_impl
;
441 OPT_PROTOCOL_VERSION
,
/* Only long options exist: getopt_long() below is given an empty short-option
   string. */
449 static struct option long_options
[]=
451 { "help", no_argument
, NULL
, OPT_HELP
},
452 { "port", required_argument
, NULL
, OPT_PORT
},
453 { "verbose", no_argument
, NULL
, OPT_VERBOSE
},
454 { "daemon", no_argument
, NULL
, OPT_DAEMON
},
455 { "protocol", no_argument
, NULL
, OPT_PROTOCOL_VERSION
},
456 { "version", no_argument
, NULL
, OPT_VERSION
},
457 { "max-connections", required_argument
, NULL
, OPT_MAX_CONNECTIONS
},
458 { "pid-file", required_argument
, NULL
, OPT_PIDFILE
},
459 { "log-file", required_argument
, NULL
, OPT_LOGFILE
},
463 bool opt_help
= false;
466 while (done
== false)
468 switch (getopt_long(argc
, argv
, "", long_options
, &option_index
))
/* --protocol takes no argument; its presence selects the "logical" v1
   interface instead of the default v0. */
474 case OPT_PROTOCOL_VERSION
:
475 interface
= &interface_v1_impl
;
479 global_options
.pid_file
= optarg
;
483 global_options
.log_file
= optarg
;
487 global_options
.is_verbose
= true;
494 global_options
.opt_daemon
= true;
498 global_options
.service
= optarg
;
/* NOTE(review): atoi() gives no error indication, and the result is later
   cast to size_t for calloc(); zero, negative or non-numeric input is not
   rejected in the lines visible here. */
501 case OPT_MAX_CONNECTIONS
:
502 maxconns
= atoi(optarg
);
505 case OPT_HELP
: /* FALLTHROUGH */
/* NOTE(review): for an unrecognised option getopt_long() does not set optarg
   to the offending text, so this may stream a null pointer - verify. */
511 std::cerr
<< "Unknown option: " << optarg
<< std::endl
;
/* Usage output: one line per entry of long_options, up to the terminating
   entry whose name is NULL. */
519 std::cout
<< "Usage: " << argv
[0] << std::endl
;
520 for (struct option
*ptr_option
= long_options
; ptr_option
->name
; ptr_option
++)
522 std::cout
<< "\t" << ptr_option
->name
<< std::endl
;
528 if (global_options
.opt_daemon
)
530 util::daemonize(false, true);
533 if (initialize_storage() == false)
535 /* Error message already printed */
539 util::Pidfile
_pid_file(global_options
.pid_file
);
/* NOTE(review): there is no separator between the fixed text and the error
   message, so the two run together in the output. */
541 if (_pid_file
.create() == false)
543 std::cerr
<< "Failed to create pid-file" << _pid_file
.error_message() << std::endl
;
547 util::log_info_st
log_file(argv
[0], global_options
.log_file
, false);
548 log_file
.write(util::VERBOSE_NOTICE
, "starting log");
551 * We need to initialize the handlers manually due to a bug in the
552 * warnings generated by struct initialization in gcc (all the way up to 4.4)
554 initialize_interface_v0_handler(log_file
);
555 initialize_interface_v1_handler(log_file
);
558 if (server_socket(log_file
, global_options
.service
) == false)
563 if (num_server_sockets
== 0)
565 log_file
.write(util::VERBOSE_ERROR
, "No server sockets are available.");
570 * Create and initialize the handles to the protocol handlers. I want
571 * to be able to trace the traffic throught the pre/post handlers, and
572 * set up a common handler for unknown messages
574 interface
->pre_execute
= pre_execute
;
575 interface
->post_execute
= post_execute
;
576 interface
->unknown
= unknown
;
578 struct memcached_protocol_st
*protocol_handle
;
/* NOTE(review): the message logged on this failure path is a copy of the
   "No server sockets are available." text used above; the actual failure
   here is that memcached_protocol_create_instance() returned NULL. */
579 if ((protocol_handle
= memcached_protocol_create_instance()) == NULL
)
581 log_file
.write(util::VERBOSE_ERROR
, "No server sockets are available.");
/* One zero-initialized connection record per possible socket value below
   maxconns; the table is indexed by socket value. */
585 socket_userdata_map
= (struct connection
*)calloc((size_t)(maxconns
), sizeof(struct connection
));
586 if (socket_userdata_map
== NULL
)
588 log_file
.write(util::VERBOSE_ERROR
, "Failed to allocate room for connections");
592 memcached_binary_protocol_set_callbacks(protocol_handle
, interface
);
593 memcached_binary_protocol_set_pedantic(protocol_handle
, true);
595 event_base
= event_init();
596 if (event_base
== NULL
)
598 std::cerr
<< "Failed to create an instance of libevent" << std::endl
;
/* Register accept_handler() for every listening socket. EV_PERSIST keeps the
   event armed after each accepted connection. */
602 for (int xx
= 0; xx
< num_server_sockets
; ++xx
)
/* NOTE(review): the listening socket value is used as an index without being
   compared against maxconns; a small --max-connections value could put this
   access outside the allocated table. */
604 struct connection
*conn
= &socket_userdata_map
[server_sockets
[xx
]];
605 conn
->userdata
= protocol_handle
;
607 event_set(&conn
->event
, (intptr_t)server_sockets
[xx
], EV_READ
| EV_PERSIST
, accept_handler
, conn
);
609 event_base_set(event_base
, &conn
->event
);
610 if (event_add(&conn
->event
, 0) == -1)
612 log_file
.write(util::VERBOSE_ERROR
, "Failed to add event");
613 closesocket(server_sockets
[xx
]);
/* In daemon mode, util::daemon_is_ready() is called once the sockets and
   events are set up; its failure is logged. */
617 if (global_options
.opt_daemon
)
619 if (util::daemon_is_ready(true) == false)
621 log_file
.write(util::VERBOSE_ERROR
, "Failed for util::daemon_is_ready()");
627 /* Serve all of the clients */
/* The messages below distinguish a loop failure from the case where no
   events were registered; the case labels are not visible in this view. */
628 switch (event_base_loop(event_base
, 0))
631 log_file
.write(util::VERBOSE_ERROR
, "event_base_loop() failed");
635 log_file
.write(util::VERBOSE_ERROR
, "event_base_loop(), no events were registered");
641 log_file
.write(util::VERBOSE_NOTICE
, "exiting");