* the more "logical" interface.
* memcached_light.c - This file sets up all of the sockets and run the main
* message loop.
+ *
+ *
+ * config.h is included so that I can use the ntohll/htonll on platforms that
+ * doesn't have that (this is a private function inside libmemcached, so you
+ * cannot use it directly from libmemcached without special modifications to
+ * the library)
*/
+#include "config.h"
#include <assert.h>
#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <netinet/tcp.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdlib.h>
#include <string.h>
-#include <poll.h>
+#include <event.h>
-#include <libmemcached/protocol_handler.h>
-#include "common.h"
-#include "storage.h"
+#include <libmemcachedprotocol-0.0/handler.h>
+#include <libmemcached/socket.hpp>
+#include <example/byteorder.h>
+#include "example/storage.h"
+#include "example/memcached_light.h"
-extern struct memcached_binary_protocol_callback_st interface_v0_impl;
-extern struct memcached_binary_protocol_callback_st interface_v1_impl;
+extern memcached_binary_protocol_callback_st interface_v0_impl;
+extern memcached_binary_protocol_callback_st interface_v1_impl;
-static int server_sockets[1024];
+static memcached_socket_t server_sockets[1024];
static int num_server_sockets= 0;
-static void* socket_userdata_map[1024];
+
+struct connection
+{
+ void *userdata;
+ struct event event;
+};
+
+/* The default maximum number of connections... (change with -c) */
+static int maxconns = 1024;
+
+static struct connection *socket_userdata_map;
+static bool verbose= false;
+static struct event_base *event_base;
+
+struct options_st {
+ char *pid_file;
+ bool has_port;
+ in_port_t port;
+} global_options;
+
+typedef struct options_st options_st;
+
+/**
+ * Callback for driving a client connection
+ * @param fd the socket for the client socket
+ * @param which identifying the event that occurred (not used)
+ * @param arg the connection structure for the client
+ */
+static void drive_client(memcached_socket_t fd, short which, void *arg)
+{
+ (void)which;
+ struct connection *client= arg;
+ struct memcached_protocol_client_st* c= client->userdata;
+ assert(c != NULL);
+
+ memcached_protocol_event_t events= memcached_protocol_client_work(c);
+ if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
+ {
+ memcached_protocol_client_destroy(c);
+ closesocket(fd);
+ } else {
+ short flags = 0;
+ if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
+ {
+ flags= EV_WRITE;
+ }
+
+ if (events & MEMCACHED_PROTOCOL_READ_EVENT)
+ {
+ flags|= EV_READ;
+ }
+
+ event_set(&client->event, (intptr_t)fd, flags, drive_client, client);
+ event_base_set(event_base, &client->event);
+
+ if (event_add(&client->event, 0) == -1)
+ {
+ (void)fprintf(stderr, "Failed to add event for %d\n", fd);
+ memcached_protocol_client_destroy(c);
+ closesocket(fd);
+ }
+ }
+}
+
+/**
+ * Callback for accepting new connections
+ * @param fd the socket for the server socket
+ * @param which identifying the event that occurred (not used)
+ * @param arg the connection structure for the server
+ */
+static void accept_handler(memcached_socket_t fd, short which, void *arg)
+{
+ (void)which;
+ struct connection *server= arg;
+ /* accept new client */
+ struct sockaddr_storage addr;
+ socklen_t addrlen= sizeof(addr);
+ memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);
+
+ if (sock == INVALID_SOCKET)
+ {
+ perror("Failed to accept client");
+ return ;
+ }
+
+#ifndef WIN32
+ if (sock >= maxconns)
+ {
+ (void)fprintf(stderr, "Client outside socket range (specified with -c)\n");
+ closesocket(sock);
+ return ;
+ }
+#endif
+
+ struct memcached_protocol_client_st* c;
+ c= memcached_protocol_create_client(server->userdata, sock);
+ if (c == NULL)
+ {
+ (void)fprintf(stderr, "Failed to create client\n");
+ closesocket(sock);
+ }
+ else
+ {
+ struct connection *client = &socket_userdata_map[sock];
+ client->userdata= c;
+
+ event_set(&client->event, (intptr_t)sock, EV_READ, drive_client, client);
+ event_base_set(event_base, &client->event);
+ if (event_add(&client->event, 0) == -1)
+ {
+ (void)fprintf(stderr, "Failed to add event for %d\n", sock);
+ memcached_protocol_client_destroy(c);
+ closesocket(sock);
+ }
+ }
+}
/**
* Create a socket and bind it to a specific port number
* @param port the port number to bind to
*/
-static int server_socket(const char *port) {
+static int server_socket(const char *port)
+{
struct addrinfo *ai;
struct addrinfo hints= { .ai_flags= AI_PASSIVE,
.ai_family= AF_UNSPEC,
else
perror("getaddrinfo()");
- return 1;
+ return 0;
}
struct linger ling= {0, 0};
for (struct addrinfo *next= ai; next; next= next->ai_next)
{
- int sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
- if (sock == -1)
+ memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
+ if (sock == INVALID_SOCKET)
{
perror("Failed to create socket");
continue;
}
- int flags= fcntl(sock, F_GETFL, 0);
+ int flags;
+#ifdef WIN32
+ u_long arg = 1;
+ if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
+ {
+ perror("Failed to set nonblocking io");
+ closesocket(sock);
+ continue;
+ }
+#else
+ flags= fcntl(sock, F_GETFL, 0);
if (flags == -1)
{
perror("Failed to get socket flags");
- close(sock);
+ closesocket(sock);
continue;
}
if ((flags & O_NONBLOCK) != O_NONBLOCK)
+ {
if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
{
perror("Failed to set socket to nonblocking mode");
- close(sock);
+ closesocket(sock);
continue;
}
+ }
+#endif
flags= 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
perror("Failed to set TCP_NODELAY");
- if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
+ if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
{
- if (errno != EADDRINUSE)
+ if (get_socket_errno() != EADDRINUSE)
{
perror("bind()");
freeaddrinfo(ai);
}
- close(sock);
+ closesocket(sock);
continue;
}
- if (listen(sock, 1024) == -1)
+ if (listen(sock, 1024) == SOCKET_ERROR)
{
perror("listen()");
- close(sock);
+ closesocket(sock);
continue;
}
/**
* Print out the command we are about to execute
*/
-static void pre_execute(const void *cookie, protocol_binary_request_header *header)
+static void pre_execute(const void *cookie,
+ protocol_binary_request_header *header)
{
- const char *cmd= comcode2str(header->request.opcode);
- if (cmd != NULL)
- fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
- else
- fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+ if (verbose)
+ {
+ const char *cmd= comcode2str(header->request.opcode);
+ if (cmd != NULL)
+ fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
+ else
+ fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+ }
}
/**
* Print out the command we just executed
*/
-static void post_execute(const void *cookie, protocol_binary_request_header *header)
+static void post_execute(const void *cookie,
+ protocol_binary_request_header *header)
{
- const char *cmd= comcode2str(header->request.opcode);
- if (cmd != NULL)
- fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
- else
- fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+ if (verbose)
+ {
+ const char *cmd= comcode2str(header->request.opcode);
+ if (cmd != NULL)
+ fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
+ else
+ fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
+ }
}
/**
return response_handler(cookie, header, (void*)&response);
}
-static void work(void);
-
/**
* Program entry point. Bind to the specified port(s) and serve clients
*
* @param argc number of items in the argument vector
* @param argv argument vector
- * @return 0 on success, 1 otherwise
+ * @return EXIT_SUCCESS on success, 1 otherwise
*/
int main(int argc, char **argv)
{
- bool port_specified= false;
int cmd;
- struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
+ memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
+
+ memset(&global_options, 0, sizeof(global_options));
+
+ event_base= event_init();
+ if (event_base == NULL)
+ {
+ fprintf(stderr, "Failed to create an instance of libevent\n");
+ return EXIT_FAILURE;
+ }
+
+ /*
+ * We need to initialize the handlers manually due to a bug in the
+ * warnings generated by struct initialization in gcc (all the way up to 4.4)
+ */
+ initialize_interface_v0_handler();
- while ((cmd= getopt(argc, argv, "1p:?")) != EOF)
+ while ((cmd= getopt(argc, argv, "v1p:P:?hc:")) != EOF)
{
switch (cmd) {
case '1':
interface= &interface_v1_impl;
break;
+ case 'P':
+ global_options.pid_file= strdup(optarg);
+ break;
case 'p':
- port_specified= true;
+ global_options.has_port= true;
(void)server_socket(optarg);
break;
+ case 'v':
+ verbose= true;
+ break;
+ case 'c':
+ maxconns= atoi(optarg);
+ break;
+ case 'h': /* FALLTHROUGH */
case '?': /* FALLTHROUGH */
default:
- (void)fprintf(stderr, "Usage: %s [-p port]\n", argv[0]);
- return 1;
+ (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1] [-c #clients] [-P pidfile]\n",
+ argv[0]);
+ return EXIT_FAILURE;
}
}
- if (!port_specified)
+ if (! initialize_storage())
+ {
+ /* Error message already printed */
+ return EXIT_FAILURE;
+ }
+
+ if (! global_options.has_port)
(void)server_socket("9999");
+ if (global_options.pid_file)
+ {
+ FILE *pid_file;
+ uint32_t pid;
+
+ pid_file= fopen(global_options.pid_file, "w+");
+
+ if (pid_file == NULL)
+ {
+ perror(strerror(get_socket_errno()));
+ abort();
+ }
+
+ pid= (uint32_t)getpid();
+ fprintf(pid_file, "%u\n", pid);
+ fclose(pid_file);
+ }
+
if (num_server_sockets == 0)
{
fprintf(stderr, "I don't have any server sockets\n");
- return 1;
+ return EXIT_FAILURE;
}
/*
interface->post_execute= post_execute;
interface->unknown= unknown;
- struct memcached_binary_protocol_st *protocol_handle;
- if ((protocol_handle= memcached_binary_protocol_create_instance()) == NULL)
+ struct memcached_protocol_st *protocol_handle;
+ if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
{
fprintf(stderr, "Failed to allocate protocol handle\n");
- return 1;
+ return EXIT_FAILURE;
+ }
+
+ socket_userdata_map= calloc((size_t)(maxconns), sizeof(struct connection));
+ if (socket_userdata_map == NULL)
+ {
+ fprintf(stderr, "Failed to allocate room for connections\n");
+ return EXIT_FAILURE;
}
memcached_binary_protocol_set_callbacks(protocol_handle, interface);
memcached_binary_protocol_set_pedantic(protocol_handle, true);
for (int xx= 0; xx < num_server_sockets; ++xx)
- socket_userdata_map[server_sockets[xx]]= protocol_handle;
-
- /* Serve all of the clients */
- work();
-
- /* NOTREACHED */
- return 0;
-}
-
-static void work(void) {
-#define MAX_SERVERS_TO_POLL 100
- struct pollfd fds[MAX_SERVERS_TO_POLL];
- int max_poll;
-
- for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
- {
- fds[max_poll].events= POLLIN;
- fds[max_poll].revents= 0;
- fds[max_poll].fd= server_sockets[max_poll];
- ++max_poll;
- }
-
- while (true)
{
- int err= poll(fds, (nfds_t)max_poll, -1);
-
- if (err == 0 || (err == -1 && errno != EINTR))
+ struct connection *conn= &socket_userdata_map[server_sockets[xx]];
+ conn->userdata= protocol_handle;
+ event_set(&conn->event, (intptr_t)server_sockets[xx], EV_READ | EV_PERSIST,
+ accept_handler, conn);
+ event_base_set(event_base, &conn->event);
+ if (event_add(&conn->event, 0) == -1)
{
- perror("poll() failed");
- abort();
- }
-
- /* find the available filedescriptors */
- for (int x= max_poll - 1; x > -1 && err > 0; --x)
- {
- if (fds[x].revents != 0)
- {
- --err;
- if (x < num_server_sockets)
- {
- /* accept new client */
- struct sockaddr_storage addr;
- socklen_t addrlen= sizeof(addr);
- int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
- &addrlen);
-
- if (sock == -1)
- {
- perror("Failed to accept client");
- continue;
- }
-
- struct memcached_binary_protocol_st *protocol;
- protocol= socket_userdata_map[fds[x].fd];
-
- struct memcached_binary_protocol_client_st* c;
- c= memcached_binary_protocol_create_client(protocol, sock);
- if (c == NULL)
- {
- fprintf(stderr, "Failed to create client\n");
- close(sock);
- }
- else
- {
- socket_userdata_map[sock]= c;
- fds[max_poll].events= POLLIN;
- fds[max_poll].revents= 0;
- fds[max_poll].fd= sock;
- ++max_poll;
- }
- }
- else
- {
- /* drive the client */
- struct memcached_binary_protocol_client_st* c;
- c= socket_userdata_map[fds[x].fd];
- assert(c != NULL);
- fds[max_poll].events= 0;
-
- switch (memcached_binary_protocol_client_work(c))
- {
- case WRITE_EVENT:
- case READ_WRITE_EVENT:
- fds[max_poll].events= POLLOUT;
- /* FALLTHROUGH */
- case READ_EVENT:
- fds[max_poll].events |= POLLIN;
- break;
- case ERROR_EVENT:
- default: /* ERROR or unknown state.. close */
- memcached_binary_protocol_client_destroy(c);
- close(fds[x].fd);
- fds[x].events= 0;
-
- if (x != max_poll - 1)
- memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
-
- --max_poll;
- }
- }
- }
+ fprintf(stderr, "Failed to add event for %d\n", server_sockets[xx]);
+ closesocket(server_sockets[xx]);
}
}
+
+ /* Serve all of the clients */
+ event_base_loop(event_base, 0);
+
+ /* NOTREACHED */
+ return EXIT_SUCCESS;
}