From 8aedd0b8357139896851dee23565ffc3757f1253 Mon Sep 17 00:00:00 2001 From: Trond Norbye Date: Wed, 14 Apr 2010 15:32:40 -0700 Subject: [PATCH] Use libevent for the eventloop in memcached_light --- example/include.am | 5 +- example/memcached_light.c | 242 ++++++++++++++++++++++---------------- 2 files changed, 143 insertions(+), 104 deletions(-) diff --git a/example/include.am b/example/include.am index daae642b..e2ac51e4 100644 --- a/example/include.am +++ b/example/include.am @@ -2,7 +2,9 @@ # included from Top Level Makefile.am # All paths should be given relative to the root +if HAVE_LIBEVENT noinst_PROGRAMS += example/memcached_light +endif example_memcached_light_SOURCES= \ example/interface_v0.c \ @@ -11,7 +13,8 @@ example_memcached_light_SOURCES= \ example/memcached_light.h \ example/storage.h -example_memcached_light_LDADD= libmemcached/libmemcachedprotocol.la $(LIBINNODB) +example_memcached_light_LDADD= libmemcached/libmemcachedprotocol.la \ + $(LIBINNODB) $(LTLIBEVENT) if BUILD_BYTEORDER example_memcached_light_LDADD+= libmemcached/libbyteorder.la diff --git a/example/memcached_light.c b/example/memcached_light.c index 4a53217a..489ec1e4 100644 --- a/example/memcached_light.c +++ b/example/memcached_light.c @@ -35,7 +35,7 @@ #include #include #include -#include +#include #include #include @@ -47,8 +47,19 @@ extern memcached_binary_protocol_callback_st interface_v1_impl; static int server_sockets[1024]; static int num_server_sockets= 0; -static void* socket_userdata_map[1024]; + +struct connection +{ + void *userdata; + struct event event; +}; + +/* The default maximum number of connections... (change with -c) */ +static int maxconns = 1024; + +static struct connection *socket_userdata_map; static bool verbose= false; +static struct event_base *event_base; struct options_st { char *pid_file; @@ -58,6 +69,99 @@ struct options_st { typedef struct options_st options_st; +/** + * Callback for driving a client connection + * @param fd the socket for the client socket + * @param which identifying the event that occurred (not used) + * @param arg the connection structure for the client + */ +static void drive_client(int fd, short which, void *arg) +{ + (void)which; + struct connection *client= arg; + struct memcached_protocol_client_st* c= client->userdata; + assert(c != NULL); + + memcached_protocol_event_t events= memcached_protocol_client_work(c); + if (events & MEMCACHED_PROTOCOL_ERROR_EVENT) + { + memcached_protocol_client_destroy(c); + (void)close(fd); + } else { + short flags = 0; + if (events & MEMCACHED_PROTOCOL_WRITE_EVENT) + { + flags= EV_WRITE; + } + + if (events & MEMCACHED_PROTOCOL_READ_EVENT) + { + flags|= EV_READ; + } + + event_set(&client->event, fd, flags, drive_client, client); + event_base_set(event_base, &client->event); + + if (event_add(&client->event, 0) == -1) + { + (void)fprintf(stderr, "Failed to add event for %d\n", fd); + memcached_protocol_client_destroy(c); + (void)close(fd); + } + } +} + +/** + * Callback for accepting new connections + * @param fd the socket for the server socket + * @param which identifying the event that occurred (not used) + * @param arg the connection structure for the server + */ +static void accept_handler(int fd, short which, void *arg) +{ + (void)which; + struct connection *server= arg; + /* accept new client */ + struct sockaddr_storage addr; + socklen_t addrlen= sizeof(addr); + int sock= accept(fd, (struct sockaddr *)&addr, &addrlen); + + if (sock == -1) + { + perror("Failed to accept client"); + return ; + } + + if (sock >= maxconns) + { + (void)fprintf(stderr, "Client outside socket range (specified with -c)\n"); + (void)close(sock); + return ; + } + + struct memcached_protocol_client_st* c; + c= memcached_protocol_create_client(server->userdata, sock); + if (c == NULL) + { + (void)fprintf(stderr, "Failed to create client\n"); + (void)close(sock); + } + else + { + struct connection *client = &socket_userdata_map[sock]; + client->userdata= c; + + event_set(&client->event, sock, EV_READ, drive_client, client); + event_base_set(event_base, &client->event); + if (event_add(&client->event, 0) == -1) + { + (void)fprintf(stderr, "Failed to add event for %d\n", sock); + memcached_protocol_client_destroy(c); + (void)close(sock); + } + } +} + /** * Create a socket and bind it to a specific port number * @param port the port number to bind to @@ -224,8 +328,6 @@ static protocol_binary_response_status unknown(const void *cookie, return response_handler(cookie, header, (void*)&response); } -static void work(void); - /** * Program entry point. Bind to the specified port(s) and serve clients * @@ -240,13 +342,20 @@ int main(int argc, char **argv) memset(&global_options, 0, sizeof(global_options)); + event_base= event_init(); + if (event_base == NULL) + { + fprintf(stderr, "Failed to create an instance of libevent\n"); + return 1; + } + /* * We need to initialize the handlers manually due to a bug in the * warnings generated by struct initialization in gcc (all the way up to 4.4) */ initialize_interface_v0_handler(); - while ((cmd= getopt(argc, argv, "v1pP:?h")) != EOF) + while ((cmd= getopt(argc, argv, "v1pP:?hc:")) != EOF) { switch (cmd) { case '1': @@ -262,10 +371,14 @@ int main(int argc, char **argv) case 'v': verbose= true; break; + case 'c': + maxconns= atoi(optarg); + break; case 'h': /* FALLTHROUGH */ case '?': /* FALLTHROUGH */ default: - (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]); + (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1] [-c #clients] [-P pidfile]\n", + argv[0]); return 1; } } @@ -319,110 +432,33 @@ int main(int argc, char **argv) return 1; } + socket_userdata_map= calloc(maxconns, sizeof(struct connection)); + if (socket_userdata_map == NULL) + { + fprintf(stderr, "Failed to allocate room for connections\n"); + return 1; + } + memcached_binary_protocol_set_callbacks(protocol_handle, interface); memcached_binary_protocol_set_pedantic(protocol_handle, true); for (int xx= 0; xx < num_server_sockets; ++xx) - socket_userdata_map[server_sockets[xx]]= protocol_handle; + { + struct connection *conn= &socket_userdata_map[server_sockets[xx]]; + conn->userdata= protocol_handle; + event_set(&conn->event, server_sockets[xx], EV_READ | EV_PERSIST, + accept_handler, conn); + event_base_set(event_base, &conn->event); + if (event_add(&conn->event, 0) == -1) + { + fprintf(stderr, "Failed to add event for %d\n", server_sockets[xx]); + close(server_sockets[xx]); + } + } /* Serve all of the clients */ - work(); + event_base_loop(event_base, 0); /* NOTREACHED */ return 0; } - -static void work(void) -{ -#define MAX_SERVERS_TO_POLL 100 - struct pollfd fds[MAX_SERVERS_TO_POLL]; - int max_poll; - - for (max_poll= 0; max_poll < num_server_sockets; ++max_poll) - { - fds[max_poll].events= POLLIN; - fds[max_poll].revents= 0; - fds[max_poll].fd= server_sockets[max_poll]; - } - - while (true) - { - int err= poll(fds, (nfds_t)max_poll, -1); - - if (err == 0 || (err == -1 && errno != EINTR)) - { - perror("poll() failed"); - abort(); - } - - /* find the available filedescriptors */ - for (int x= max_poll - 1; x > -1 && err > 0; --x) - { - if (fds[x].revents != 0) - { - --err; - if (x < num_server_sockets) - { - /* accept new client */ - struct sockaddr_storage addr; - socklen_t addrlen= sizeof(addr); - int sock= accept(fds[x].fd, (struct sockaddr *)&addr, - &addrlen); - - if (sock == -1) - { - perror("Failed to accept client"); - continue; - } - - struct memcached_protocol_st *protocol; - protocol= socket_userdata_map[fds[x].fd]; - - struct memcached_protocol_client_st* c; - c= memcached_protocol_create_client(protocol, sock); - if (c == NULL) - { - fprintf(stderr, "Failed to create client\n"); - close(sock); - } - else - { - socket_userdata_map[sock]= c; - fds[max_poll].events= POLLIN; - fds[max_poll].revents= 0; - fds[max_poll].fd= sock; - ++max_poll; - } - } - else - { - /* drive the client */ - struct memcached_protocol_client_st* c; - c= socket_userdata_map[fds[x].fd]; - assert(c != NULL); - fds[max_poll].events= 0; - - memcached_protocol_event_t events= memcached_protocol_client_work(c); - if (events & MEMCACHED_PROTOCOL_WRITE_EVENT) - fds[max_poll].events= POLLOUT; - - if (events & MEMCACHED_PROTOCOL_READ_EVENT) - fds[max_poll].events= POLLIN; - - if (!(events & MEMCACHED_PROTOCOL_PAUSE_EVENT || - fds[max_poll].events != 0)) - { - memcached_protocol_client_destroy(c); - close(fds[x].fd); - fds[x].events= 0; - - if (x != max_poll - 1) - memmove(fds + x, fds + x + 1, (size_t)(max_poll - x)); - - --max_poll; - } - } - } - } - } -} -- 2.30.2