X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=example%2Fmemcached_light.c;h=e2689e03a43ca515ae24346db7d94d0e8c9f964c;hb=ca549c7f21e052db3cc00f37bdf9e2f39988e02f;hp=86087c5a9b7010140a4de95a0ade3b1e2c5fb72f;hpb=bb79693dc3100f28ea4fa477db895876d71fe0e9;p=awesomized%2Flibmemcached diff --git a/example/memcached_light.c b/example/memcached_light.c index 86087c5a..e2689e03 100644 --- a/example/memcached_light.c +++ b/example/memcached_light.c @@ -35,7 +35,7 @@ #include #include #include -#include +#include #include #include @@ -47,14 +47,127 @@ extern memcached_binary_protocol_callback_st interface_v1_impl; static int server_sockets[1024]; static int num_server_sockets= 0; -static void* socket_userdata_map[1024]; + +struct connection +{ + void *userdata; + struct event event; +}; + +/* The default maximum number of connections... (change with -c) */ +static int maxconns = 1024; + +static struct connection *socket_userdata_map; static bool verbose= false; +static struct event_base *event_base; + +struct options_st { + char *pid_file; + bool has_port; + in_port_t port; +} global_options; + +typedef struct options_st options_st; + +/** + * Callback for driving a client connection + * @param fd the socket for the client socket + * @param which identifying the event that occurred (not used) + * @param arg the connection structure for the client + */ +static void drive_client(int fd, short which, void *arg) +{ + (void)which; + struct connection *client= arg; + struct memcached_protocol_client_st* c= client->userdata; + assert(c != NULL); + + memcached_protocol_event_t events= memcached_protocol_client_work(c); + if (events & MEMCACHED_PROTOCOL_ERROR_EVENT) + { + memcached_protocol_client_destroy(c); + (void)close(fd); + } else { + short flags = 0; + if (events & MEMCACHED_PROTOCOL_WRITE_EVENT) + { + flags= EV_WRITE; + } + + if (events & MEMCACHED_PROTOCOL_READ_EVENT) + { + flags|= EV_READ; + } + + event_set(&client->event, fd, flags, drive_client, client); + event_base_set(event_base, &client->event); + + if (event_add(&client->event, 0) == -1) + { + (void)fprintf(stderr, "Failed to add event for %d\n", fd); + memcached_protocol_client_destroy(c); + (void)close(fd); + } + } +} + +/** + * Callback for accepting new connections + * @param fd the socket for the server socket + * @param which identifying the event that occurred (not used) + * @param arg the connection structure for the server + */ +static void accept_handler(int fd, short which, void *arg) +{ + (void)which; + struct connection *server= arg; + /* accept new client */ + struct sockaddr_storage addr; + socklen_t addrlen= sizeof(addr); + int sock= accept(fd, (struct sockaddr *)&addr, &addrlen); + + if (sock == -1) + { + perror("Failed to accept client"); + return ; + } + + if (sock >= maxconns) + { + (void)fprintf(stderr, "Client outside socket range (specified with -c)\n"); + (void)close(sock); + return ; + } + + struct memcached_protocol_client_st* c; + c= memcached_protocol_create_client(server->userdata, sock); + if (c == NULL) + { + (void)fprintf(stderr, "Failed to create client\n"); + (void)close(sock); + } + else + { + struct connection *client = &socket_userdata_map[sock]; + client->userdata= c; + + event_set(&client->event, sock, EV_READ, drive_client, client); + event_base_set(event_base, &client->event); + if (event_add(&client->event, 0) == -1) + { + (void)fprintf(stderr, "Failed to add event for %d\n", sock); + memcached_protocol_client_destroy(c); + (void)close(sock); + } + } +} /** * Create a socket and bind it to a specific port number * @param port the port number to bind to */ -static int server_socket(const char *port) { +static int server_socket(const char *port) +{ struct addrinfo *ai; struct addrinfo hints= { .ai_flags= AI_PASSIVE, .ai_family= AF_UNSPEC, @@ -215,8 +328,6 @@ static protocol_binary_response_status unknown(const void *cookie, return response_handler(cookie, header, (void*)&response); } -static void work(void); - /** * Program entry point. Bind to the specified port(s) and serve clients * @@ -226,45 +337,79 @@ static void work(void); */ int main(int argc, char **argv) { - bool port_specified= false; int cmd; memcached_binary_protocol_callback_st *interface= &interface_v0_impl; + memset(&global_options, 0, sizeof(global_options)); + + event_base= event_init(); + if (event_base == NULL) + { + fprintf(stderr, "Failed to create an instance of libevent\n"); + return 1; + } + /* * We need to initialize the handlers manually due to a bug in the * warnings generated by struct initialization in gcc (all the way up to 4.4) */ - initialize_iterface_v0_handler(); + initialize_interface_v0_handler(); - while ((cmd= getopt(argc, argv, "v1p:?")) != EOF) + while ((cmd= getopt(argc, argv, "v1p:P:?hc:")) != EOF) { switch (cmd) { case '1': interface= &interface_v1_impl; break; + case 'P': + global_options.pid_file= strdup(optarg); + break; case 'p': - port_specified= true; + global_options.has_port= true; (void)server_socket(optarg); break; case 'v': verbose= true; break; + case 'c': + maxconns= atoi(optarg); + break; + case 'h': /* FALLTHROUGH */ case '?': /* FALLTHROUGH */ default: - (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]); + (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1] [-c #clients] [-P pidfile]\n", + argv[0]); return 1; } } - if (!initialize_storage()) + if (! initialize_storage()) { /* Error message already printed */ return 1; } - if (!port_specified) + if (! global_options.has_port) (void)server_socket("9999"); + if (global_options.pid_file) + { + FILE *pid_file; + uint32_t pid; + + pid_file= fopen(global_options.pid_file, "w+"); + + if (pid_file == NULL) + { + perror(strerror(errno)); + abort(); + } + + pid= (uint32_t)getpid(); + fprintf(pid_file, "%u\n", pid); + fclose(pid_file); + } + if (num_server_sockets == 0) { fprintf(stderr, "I don't have any server sockets\n"); @@ -287,111 +432,33 @@ int main(int argc, char **argv) return 1; } + socket_userdata_map= calloc((size_t)(maxconns), sizeof(struct connection)); + if (socket_userdata_map == NULL) + { + fprintf(stderr, "Failed to allocate room for connections\n"); + return 1; + } + memcached_binary_protocol_set_callbacks(protocol_handle, interface); memcached_binary_protocol_set_pedantic(protocol_handle, true); for (int xx= 0; xx < num_server_sockets; ++xx) - socket_userdata_map[server_sockets[xx]]= protocol_handle; + { + struct connection *conn= &socket_userdata_map[server_sockets[xx]]; + conn->userdata= protocol_handle; + event_set(&conn->event, server_sockets[xx], EV_READ | EV_PERSIST, + accept_handler, conn); + event_base_set(event_base, &conn->event); + if (event_add(&conn->event, 0) == -1) + { + fprintf(stderr, "Failed to add event for %d\n", server_sockets[xx]); + close(server_sockets[xx]); + } + } /* Serve all of the clients */ - work(); + event_base_loop(event_base, 0); /* NOTREACHED */ return 0; } - -static void work(void) -{ -#define MAX_SERVERS_TO_POLL 100 - struct pollfd fds[MAX_SERVERS_TO_POLL]; - int max_poll; - - for (max_poll= 0; max_poll < num_server_sockets; ++max_poll) - { - fds[max_poll].events= POLLIN; - fds[max_poll].revents= 0; - fds[max_poll].fd= server_sockets[max_poll]; - ++max_poll; - } - - while (true) - { - int err= poll(fds, (nfds_t)max_poll, -1); - - if (err == 0 || (err == -1 && errno != EINTR)) - { - perror("poll() failed"); - abort(); - } - - /* find the available filedescriptors */ - for (int x= max_poll - 1; x > -1 && err > 0; --x) - { - if (fds[x].revents != 0) - { - --err; - if (x < num_server_sockets) - { - /* accept new client */ - struct sockaddr_storage addr; - socklen_t addrlen= sizeof(addr); - int sock= accept(fds[x].fd, (struct sockaddr *)&addr, - &addrlen); - - if (sock == -1) - { - perror("Failed to accept client"); - continue; - } - - struct memcached_protocol_st *protocol; - protocol= socket_userdata_map[fds[x].fd]; - - struct memcached_protocol_client_st* c; - c= memcached_protocol_create_client(protocol, sock); - if (c == NULL) - { - fprintf(stderr, "Failed to create client\n"); - close(sock); - } - else - { - socket_userdata_map[sock]= c; - fds[max_poll].events= POLLIN; - fds[max_poll].revents= 0; - fds[max_poll].fd= sock; - ++max_poll; - } - } - else - { - /* drive the client */ - struct memcached_protocol_client_st* c; - c= socket_userdata_map[fds[x].fd]; - assert(c != NULL); - fds[max_poll].events= 0; - - memcached_protocol_event_t events= memcached_protocol_client_work(c); - if (events & MEMCACHED_PROTOCOL_WRITE_EVENT) - fds[max_poll].events= POLLOUT; - - if (events & MEMCACHED_PROTOCOL_READ_EVENT) - fds[max_poll].events= POLLIN; - - if (!(events & MEMCACHED_PROTOCOL_PAUSE_EVENT || - fds[max_poll].events != 0)) - { - memcached_protocol_client_destroy(c); - close(fds[x].fd); - fds[x].events= 0; - - if (x != max_poll - 1) - memmove(fds + x, fds + x + 1, (size_t)(max_poll - x)); - - --max_poll; - } - } - } - } - } -}