X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=example%2Fmemcached_light.c;h=6a87ff1917ace469f334c7fc62be6139e3a3bf5d;hb=54b628acfd0aa4915d97de81e68355b07c980a7d;hp=4343b835e265b9e5ca4a85bc5fff86414990dd1a;hpb=34418e51585216006d5b241fc9104dc4b88d1be0;p=awesomized%2Flibmemcached diff --git a/example/memcached_light.c b/example/memcached_light.c index 4343b835..6a87ff19 100644 --- a/example/memcached_light.c +++ b/example/memcached_light.c @@ -15,38 +15,158 @@ * the more "logical" interface. * memcached_light.c - This file sets up all of the sockets and run the main * message loop. + * + * + * config.h is included so that I can use the ntohll/htonll on platforms that + * doesn't have that (this is a private function inside libmemcached, so you + * cannot use it directly from libmemcached without special modifications to + * the library) */ +#include "config.h" #include #include -#include -#include -#include #include #include #include #include #include #include -#include +#include #include #include #include "storage.h" +#include "memcached_light.h" -extern struct memcached_binary_protocol_callback_st interface_v0_impl; -extern struct memcached_binary_protocol_callback_st interface_v1_impl; +extern memcached_binary_protocol_callback_st interface_v0_impl; +extern memcached_binary_protocol_callback_st interface_v1_impl; -static int server_sockets[1024]; +static memcached_socket_t server_sockets[1024]; static int num_server_sockets= 0; -static void* socket_userdata_map[1024]; + +struct connection +{ + void *userdata; + struct event event; +}; + +/* The default maximum number of connections... (change with -c) */ +static int maxconns = 1024; + +static struct connection *socket_userdata_map; static bool verbose= false; +static struct event_base *event_base; + +struct options_st { + char *pid_file; + bool has_port; + in_port_t port; +} global_options; + +typedef struct options_st options_st; + +/** + * Callback for driving a client connection + * @param fd the socket for the client socket + * @param which identifying the event that occurred (not used) + * @param arg the connection structure for the client + */ +static void drive_client(memcached_socket_t fd, short which, void *arg) +{ + (void)which; + struct connection *client= arg; + struct memcached_protocol_client_st* c= client->userdata; + assert(c != NULL); + + memcached_protocol_event_t events= memcached_protocol_client_work(c); + if (events & MEMCACHED_PROTOCOL_ERROR_EVENT) + { + memcached_protocol_client_destroy(c); + closesocket(fd); + } else { + short flags = 0; + if (events & MEMCACHED_PROTOCOL_WRITE_EVENT) + { + flags= EV_WRITE; + } + + if (events & MEMCACHED_PROTOCOL_READ_EVENT) + { + flags|= EV_READ; + } + + event_set(&client->event, (intptr_t)fd, flags, drive_client, client); + event_base_set(event_base, &client->event); + + if (event_add(&client->event, 0) == -1) + { + (void)fprintf(stderr, "Failed to add event for %d\n", fd); + memcached_protocol_client_destroy(c); + closesocket(fd); + } + } +} + +/** + * Callback for accepting new connections + * @param fd the socket for the server socket + * @param which identifying the event that occurred (not used) + * @param arg the connection structure for the server + */ +static void accept_handler(memcached_socket_t fd, short which, void *arg) +{ + (void)which; + struct connection *server= arg; + /* accept new client */ + struct sockaddr_storage addr; + socklen_t addrlen= sizeof(addr); + memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen); + + if (sock == INVALID_SOCKET) + { + perror("Failed to accept client"); + return ; + } + +#ifndef WIN32 + if (sock >= maxconns) + { + (void)fprintf(stderr, "Client outside socket range (specified with -c)\n"); + closesocket(sock); + return ; + } +#endif + + struct memcached_protocol_client_st* c; + c= memcached_protocol_create_client(server->userdata, sock); + if (c == NULL) + { + (void)fprintf(stderr, "Failed to create client\n"); + closesocket(sock); + } + else + { + struct connection *client = &socket_userdata_map[sock]; + client->userdata= c; + + event_set(&client->event, (intptr_t)sock, EV_READ, drive_client, client); + event_base_set(event_base, &client->event); + if (event_add(&client->event, 0) == -1) + { + (void)fprintf(stderr, "Failed to add event for %d\n", sock); + memcached_protocol_client_destroy(c); + closesocket(sock); + } + } +} /** * Create a socket and bind it to a specific port number * @param port the port number to bind to */ -static int server_socket(const char *port) { +static int server_socket(const char *port) +{ struct addrinfo *ai; struct addrinfo hints= { .ai_flags= AI_PASSIVE, .ai_family= AF_UNSPEC, @@ -60,25 +180,35 @@ static int server_socket(const char *port) { else perror("getaddrinfo()"); - return 1; + return 0; } struct linger ling= {0, 0}; for (struct addrinfo *next= ai; next; next= next->ai_next) { - int sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); - if (sock == -1) + memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol); + if (sock == INVALID_SOCKET) { perror("Failed to create socket"); continue; } - int flags= fcntl(sock, F_GETFL, 0); + int flags; +#ifdef WIN32 + u_long arg = 1; + if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR) + { + perror("Failed to set nonblocking io"); + closesocket(sock); + continue; + } +#else + flags= fcntl(sock, F_GETFL, 0); if (flags == -1) { perror("Failed to get socket flags"); - close(sock); + closesocket(sock); continue; } @@ -87,10 +217,11 @@ static int server_socket(const char *port) { if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) { perror("Failed to set socket to nonblocking mode"); - close(sock); + closesocket(sock); continue; } } +#endif flags= 1; if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0) @@ -105,21 +236,21 @@ static int server_socket(const char *port) { if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0) perror("Failed to set TCP_NODELAY"); - if (bind(sock, next->ai_addr, next->ai_addrlen) == -1) + if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR) { - if (errno != EADDRINUSE) + if (get_socket_errno() != EADDRINUSE) { perror("bind()"); freeaddrinfo(ai); } - close(sock); + closesocket(sock); continue; } - if (listen(sock, 1024) == -1) + if (listen(sock, 1024) == SOCKET_ERROR) { perror("listen()"); - close(sock); + closesocket(sock); continue; } @@ -156,8 +287,8 @@ static const char* comcode2str(uint8_t cmd) /** * Print out the command we are about to execute */ -static void pre_execute(const void *cookie __attribute__((unused)), - protocol_binary_request_header *header __attribute__((unused))) +static void pre_execute(const void *cookie, + protocol_binary_request_header *header) { if (verbose) { @@ -172,8 +303,8 @@ static void pre_execute(const void *cookie __attribute__((unused)), /** * Print out the command we just executed */ -static void post_execute(const void *cookie __attribute__((unused)), - protocol_binary_request_header *header __attribute__((unused))) +static void post_execute(const void *cookie, + protocol_binary_request_header *header) { if (verbose) { @@ -207,54 +338,92 @@ static protocol_binary_response_status unknown(const void *cookie, return response_handler(cookie, header, (void*)&response); } -static void work(void); - /** * Program entry point. Bind to the specified port(s) and serve clients * * @param argc number of items in the argument vector * @param argv argument vector - * @return 0 on success, 1 otherwise + * @return EXIT_SUCCESS on success, 1 otherwise */ int main(int argc, char **argv) { - bool port_specified= false; int cmd; - struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl; + memcached_binary_protocol_callback_st *interface= &interface_v0_impl; + + memset(&global_options, 0, sizeof(global_options)); + + event_base= event_init(); + if (event_base == NULL) + { + fprintf(stderr, "Failed to create an instance of libevent\n"); + return EXIT_FAILURE; + } - while ((cmd= getopt(argc, argv, "v1p:?")) != EOF) + /* + * We need to initialize the handlers manually due to a bug in the + * warnings generated by struct initialization in gcc (all the way up to 4.4) + */ + initialize_interface_v0_handler(); + + while ((cmd= getopt(argc, argv, "v1p:P:?hc:")) != EOF) { switch (cmd) { case '1': interface= &interface_v1_impl; break; + case 'P': + global_options.pid_file= strdup(optarg); + break; case 'p': - port_specified= true; + global_options.has_port= true; (void)server_socket(optarg); break; case 'v': verbose= true; break; + case 'c': + maxconns= atoi(optarg); + break; + case 'h': /* FALLTHROUGH */ case '?': /* FALLTHROUGH */ default: - (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]); - return 1; + (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1] [-c #clients] [-P pidfile]\n", + argv[0]); + return EXIT_FAILURE; } } - if (!initialize_storage()) + if (! initialize_storage()) { /* Error message already printed */ - return 1; + return EXIT_FAILURE; } - if (!port_specified) + if (! global_options.has_port) (void)server_socket("9999"); + if (global_options.pid_file) + { + FILE *pid_file; + uint32_t pid; + + pid_file= fopen(global_options.pid_file, "w+"); + + if (pid_file == NULL) + { + perror(strerror(get_socket_errno())); + abort(); + } + + pid= (uint32_t)getpid(); + fprintf(pid_file, "%u\n", pid); + fclose(pid_file); + } + if (num_server_sockets == 0) { fprintf(stderr, "I don't have any server sockets\n"); - return 1; + return EXIT_FAILURE; } /* @@ -270,115 +439,36 @@ int main(int argc, char **argv) if ((protocol_handle= memcached_protocol_create_instance()) == NULL) { fprintf(stderr, "Failed to allocate protocol handle\n"); - return 1; + return EXIT_FAILURE; + } + + socket_userdata_map= calloc((size_t)(maxconns), sizeof(struct connection)); + if (socket_userdata_map == NULL) + { + fprintf(stderr, "Failed to allocate room for connections\n"); + return EXIT_FAILURE; } memcached_binary_protocol_set_callbacks(protocol_handle, interface); memcached_binary_protocol_set_pedantic(protocol_handle, true); for (int xx= 0; xx < num_server_sockets; ++xx) - socket_userdata_map[server_sockets[xx]]= protocol_handle; - - /* Serve all of the clients */ - work(); - - /* NOTREACHED */ - return 0; -} - -static void work(void) -{ -#define MAX_SERVERS_TO_POLL 100 - struct pollfd fds[MAX_SERVERS_TO_POLL]; - int max_poll; - - for (max_poll= 0; max_poll < num_server_sockets; ++max_poll) - { - fds[max_poll].events= POLLIN; - fds[max_poll].revents= 0; - fds[max_poll].fd= server_sockets[max_poll]; - ++max_poll; - } - - while (true) { - int err= poll(fds, (nfds_t)max_poll, -1); - - if (err == 0 || (err == -1 && errno != EINTR)) + struct connection *conn= &socket_userdata_map[server_sockets[xx]]; + conn->userdata= protocol_handle; + event_set(&conn->event, (intptr_t)server_sockets[xx], EV_READ | EV_PERSIST, + accept_handler, conn); + event_base_set(event_base, &conn->event); + if (event_add(&conn->event, 0) == -1) { - perror("poll() failed"); - abort(); - } - - /* find the available filedescriptors */ - for (int x= max_poll - 1; x > -1 && err > 0; --x) - { - if (fds[x].revents != 0) - { - --err; - if (x < num_server_sockets) - { - /* accept new client */ - struct sockaddr_storage addr; - socklen_t addrlen= sizeof(addr); - int sock= accept(fds[x].fd, (struct sockaddr *)&addr, - &addrlen); - - if (sock == -1) - { - perror("Failed to accept client"); - continue; - } - - struct memcached_protocol_st *protocol; - protocol= socket_userdata_map[fds[x].fd]; - - struct memcached_protocol_client_st* c; - c= memcached_protocol_create_client(protocol, sock); - if (c == NULL) - { - fprintf(stderr, "Failed to create client\n"); - close(sock); - } - else - { - socket_userdata_map[sock]= c; - fds[max_poll].events= POLLIN; - fds[max_poll].revents= 0; - fds[max_poll].fd= sock; - ++max_poll; - } - } - else - { - /* drive the client */ - struct memcached_protocol_client_st* c; - c= socket_userdata_map[fds[x].fd]; - assert(c != NULL); - fds[max_poll].events= 0; - - switch (memcached_protocol_client_work(c)) - { - case WRITE_EVENT: - case READ_WRITE_EVENT: - fds[max_poll].events= POLLOUT; - /* FALLTHROUGH */ - case READ_EVENT: - fds[max_poll].events |= POLLIN; - break; - case ERROR_EVENT: - default: /* ERROR or unknown state.. close */ - memcached_protocol_client_destroy(c); - close(fds[x].fd); - fds[x].events= 0; - - if (x != max_poll - 1) - memmove(fds + x, fds + x + 1, (size_t)(max_poll - x)); - - --max_poll; - } - } - } + fprintf(stderr, "Failed to add event for %d\n", server_sockets[xx]); + closesocket(server_sockets[xx]); } } + + /* Serve all of the clients */ + event_base_loop(event_base, 0); + + /* NOTREACHED */ + return EXIT_SUCCESS; }