Add an exception class for tossing resource error.
[awesomized/libmemcached] / example / memcached_light.c
index 86087c5a9b7010140a4de95a0ade3b1e2c5fb72f..c064e5199efff18a16635bbc07f477d6e9e78743 100644 (file)
 #include "config.h"
 #include <assert.h>
 #include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <netinet/tcp.h>
 #include <stdio.h>
 #include <unistd.h>
 #include <fcntl.h>
 #include <errno.h>
 #include <stdlib.h>
 #include <string.h>
-#include <poll.h>
+#include <event.h>
 
-#include <libmemcached/protocol_handler.h>
-#include <libmemcached/byteorder.h>
-#include "storage.h"
-#include "memcached_light.h"
+#include <libmemcachedprotocol-0.0/handler.h>
+#include <libmemcached/socket.hpp>
+#include <example/byteorder.h>
+#include "example/storage.h"
+#include "example/memcached_light.h"
 
 extern memcached_binary_protocol_callback_st interface_v0_impl;
 extern memcached_binary_protocol_callback_st interface_v1_impl;
 
-static int server_sockets[1024];
+static memcached_socket_t server_sockets[1024];
 static int num_server_sockets= 0;
-static void* socket_userdata_map[1024];
+
+struct connection
+{
+  void *userdata;
+  struct event event;
+};
+
+/* The default maximum number of connections... (change with -c) */
+static int maxconns = 1024;
+
+static struct connection *socket_userdata_map;
 static bool verbose= false;
+static struct event_base *event_base;
+
+struct options_st {
+  char *pid_file;
+  bool has_port;
+  in_port_t port;
+} global_options;
+
+typedef struct options_st options_st;
+
+/**
+ * Callback for driving a client connection
+ * @param fd the socket for the client socket
+ * @param which identifying the event that occurred (not used)
+ * @param arg the connection structure for the client
+ */
+static void drive_client(memcached_socket_t fd, short which, void *arg)
+{
+  (void)which;
+  struct connection *client= arg;
+  struct memcached_protocol_client_st* c= client->userdata;
+  assert(c != NULL);
+
+  memcached_protocol_event_t events= memcached_protocol_client_work(c);
+  if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
+  {
+    memcached_protocol_client_destroy(c);
+    closesocket(fd);
+  } else {
+    short flags = 0;
+    if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
+    {
+      flags= EV_WRITE;
+    }
+
+    if (events & MEMCACHED_PROTOCOL_READ_EVENT)
+    {
+      flags|= EV_READ;
+    }
+
+    event_set(&client->event, (intptr_t)fd, flags, drive_client, client);
+    event_base_set(event_base, &client->event);
+
+    if (event_add(&client->event, 0) == -1)
+    {
+      (void)fprintf(stderr, "Failed to add event for %d\n", fd);
+      memcached_protocol_client_destroy(c);
+      closesocket(fd);
+    }
+  }
+}
+
+/**
+ * Callback for accepting new connections
+ * @param fd the socket for the server socket
+ * @param which identifying the event that occurred (not used)
+ * @param arg the connection structure for the server
+ */
+static void accept_handler(memcached_socket_t fd, short which, void *arg)
+{
+  (void)which;
+  struct connection *server= arg;
+  /* accept new client */
+  struct sockaddr_storage addr;
+  socklen_t addrlen= sizeof(addr);
+  memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);
+
+  if (sock == INVALID_SOCKET)
+  {
+    perror("Failed to accept client");
+    return ;
+  }
+
+#ifndef WIN32
+  if (sock >= maxconns)
+  {
+    (void)fprintf(stderr, "Client outside socket range (specified with -c)\n");
+    closesocket(sock);
+    return ;
+  }
+#endif
+
+  struct memcached_protocol_client_st* c;
+  c= memcached_protocol_create_client(server->userdata, sock);
+  if (c == NULL)
+  {
+    (void)fprintf(stderr, "Failed to create client\n");
+    closesocket(sock);
+  }
+  else
+  {
+    struct connection *client = &socket_userdata_map[sock];
+    client->userdata= c;
+
+    event_set(&client->event, (intptr_t)sock, EV_READ, drive_client, client);
+    event_base_set(event_base, &client->event);
+    if (event_add(&client->event, 0) == -1)
+    {
+      (void)fprintf(stderr, "Failed to add event for %d\n", sock);
+      memcached_protocol_client_destroy(c);
+      closesocket(sock);
+    }
+  }
+}
 
 /**
  * Create a socket and bind it to a specific port number
  * @param port the port number to bind to
  */
-static int server_socket(const char *port) {
+static int server_socket(const char *port)
+{
   struct addrinfo *ai;
   struct addrinfo hints= { .ai_flags= AI_PASSIVE,
                            .ai_family= AF_UNSPEC,
@@ -68,25 +181,35 @@ static int server_socket(const char *port) {
     else
       perror("getaddrinfo()");
 
-    return 1;
+    return 0;
   }
 
   struct linger ling= {0, 0};
 
   for (struct addrinfo *next= ai; next; next= next->ai_next)
   {
-    int sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
-    if (sock == -1)
+    memcached_socket_t sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
+    if (sock == INVALID_SOCKET)
     {
       perror("Failed to create socket");
       continue;
     }
 
-    int flags= fcntl(sock, F_GETFL, 0);
+    int flags;
+#ifdef WIN32
+    u_long arg = 1;
+    if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
+    {
+      perror("Failed to set nonblocking io");
+      closesocket(sock);
+      continue;
+    }
+#else
+    flags= fcntl(sock, F_GETFL, 0);
     if (flags == -1)
     {
       perror("Failed to get socket flags");
-      close(sock);
+      closesocket(sock);
       continue;
     }
 
@@ -95,10 +218,11 @@ static int server_socket(const char *port) {
       if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
       {
         perror("Failed to set socket to nonblocking mode");
-        close(sock);
+        closesocket(sock);
         continue;
       }
     }
+#endif
 
     flags= 1;
     if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
@@ -113,21 +237,21 @@ static int server_socket(const char *port) {
     if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
       perror("Failed to set TCP_NODELAY");
 
-    if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
+    if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
     {
-      if (errno != EADDRINUSE)
+      if (get_socket_errno() != EADDRINUSE)
       {
         perror("bind()");
         freeaddrinfo(ai);
       }
-      close(sock);
+      closesocket(sock);
       continue;
     }
 
-    if (listen(sock, 1024) == -1)
+    if (listen(sock, 1024) == SOCKET_ERROR)
     {
       perror("listen()");
-      close(sock);
+      closesocket(sock);
       continue;
     }
 
@@ -164,8 +288,8 @@ static const char* comcode2str(uint8_t cmd)
 /**
  * Print out the command we are about to execute
  */
-static void pre_execute(const void *cookie __attribute__((unused)),
-                        protocol_binary_request_header *header __attribute__((unused)))
+static void pre_execute(const void *cookie,
+                        protocol_binary_request_header *header)
 {
   if (verbose)
   {
@@ -180,8 +304,8 @@ static void pre_execute(const void *cookie __attribute__((unused)),
 /**
  * Print out the command we just executed
  */
-static void post_execute(const void *cookie __attribute__((unused)),
-                         protocol_binary_request_header *header __attribute__((unused)))
+static void post_execute(const void *cookie,
+                         protocol_binary_request_header *header)
 {
   if (verbose)
   {
@@ -215,60 +339,92 @@ static protocol_binary_response_status unknown(const void *cookie,
   return response_handler(cookie, header, (void*)&response);
 }
 
-static void work(void);
-
 /**
  * Program entry point. Bind to the specified port(s) and serve clients
  *
  * @param argc number of items in the argument vector
  * @param argv argument vector
- * @return 0 on success, 1 otherwise
+ * @return EXIT_SUCCESS on success, 1 otherwise
  */
 int main(int argc, char **argv)
 {
-  bool port_specified= false;
   int cmd;
   memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
 
+  memset(&global_options, 0, sizeof(global_options));
+
+  event_base= event_init();
+  if (event_base == NULL)
+  {
+    fprintf(stderr, "Failed to create an instance of libevent\n");
+    return EXIT_FAILURE;
+  }
+
   /*
    * We need to initialize the handlers manually due to a bug in the
    * warnings generated by struct initialization in gcc (all the way up to 4.4)
    */
-  initialize_iterface_v0_handler();
+  initialize_interface_v0_handler();
 
-  while ((cmd= getopt(argc, argv, "v1p:?")) != EOF)
+  while ((cmd= getopt(argc, argv, "v1p:P:?hc:")) != EOF)
   {
     switch (cmd) {
     case '1':
       interface= &interface_v1_impl;
       break;
+    case 'P':
+      global_options.pid_file= strdup(optarg);
+      break;
     case 'p':
-      port_specified= true;
+      global_options.has_port= true;
       (void)server_socket(optarg);
       break;
     case 'v':
       verbose= true;
       break;
+    case 'c':
+      maxconns= atoi(optarg);
+      break;
+    case 'h':  /* FALLTHROUGH */
     case '?':  /* FALLTHROUGH */
     default:
-      (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]);
-      return 1;
+      (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1] [-c #clients] [-P pidfile]\n",
+                    argv[0]);
+      return EXIT_FAILURE;
     }
   }
 
-  if (!initialize_storage())
+  if (! initialize_storage())
   {
     /* Error message already printed */
-    return 1;
+    return EXIT_FAILURE;
   }
 
-  if (!port_specified)
+  if (! global_options.has_port)
     (void)server_socket("9999");
 
+  if (global_options.pid_file)
+  {
+    FILE *pid_file;
+    uint32_t pid;
+
+    pid_file= fopen(global_options.pid_file, "w+");
+
+    if (pid_file == NULL)
+    {
+      perror(strerror(get_socket_errno()));
+      abort();
+    }
+
+    pid= (uint32_t)getpid();
+    fprintf(pid_file, "%u\n", pid);
+    fclose(pid_file);
+  }
+
   if (num_server_sockets == 0)
   {
     fprintf(stderr, "I don't have any server sockets\n");
-    return 1;
+    return EXIT_FAILURE;
   }
 
   /*
@@ -284,114 +440,36 @@ int main(int argc, char **argv)
   if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
   {
     fprintf(stderr, "Failed to allocate protocol handle\n");
-    return 1;
+    return EXIT_FAILURE;
+  }
+
+  socket_userdata_map= calloc((size_t)(maxconns), sizeof(struct connection));
+  if (socket_userdata_map == NULL)
+  {
+    fprintf(stderr, "Failed to allocate room for connections\n");
+    return EXIT_FAILURE;
   }
 
   memcached_binary_protocol_set_callbacks(protocol_handle, interface);
   memcached_binary_protocol_set_pedantic(protocol_handle, true);
 
   for (int xx= 0; xx < num_server_sockets; ++xx)
-    socket_userdata_map[server_sockets[xx]]= protocol_handle;
-
-  /* Serve all of the clients */
-  work();
-
-  /* NOTREACHED */
-  return 0;
-}
-
-static void work(void)
-{
-#define MAX_SERVERS_TO_POLL 100
-  struct pollfd fds[MAX_SERVERS_TO_POLL];
-  int max_poll;
-
-  for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
   {
-    fds[max_poll].events= POLLIN;
-    fds[max_poll].revents= 0;
-    fds[max_poll].fd= server_sockets[max_poll];
-    ++max_poll;
-  }
-
-  while (true)
-  {
-    int err= poll(fds, (nfds_t)max_poll, -1);
-
-    if (err == 0 || (err == -1 && errno != EINTR))
+    struct connection *conn= &socket_userdata_map[server_sockets[xx]];
+    conn->userdata= protocol_handle;
+    event_set(&conn->event, (intptr_t)server_sockets[xx], EV_READ | EV_PERSIST,
+              accept_handler, conn);
+    event_base_set(event_base, &conn->event);
+    if (event_add(&conn->event, 0) == -1)
     {
-      perror("poll() failed");
-      abort();
-    }
-
-    /* find the available filedescriptors */
-    for (int x= max_poll - 1; x > -1 && err > 0; --x)
-    {
-      if (fds[x].revents != 0)
-      {
-        --err;
-        if (x < num_server_sockets)
-        {
-          /* accept new client */
-          struct sockaddr_storage addr;
-          socklen_t addrlen= sizeof(addr);
-          int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
-                           &addrlen);
-
-          if (sock == -1)
-          {
-            perror("Failed to accept client");
-            continue;
-          }
-
-          struct memcached_protocol_st *protocol;
-          protocol= socket_userdata_map[fds[x].fd];
-
-          struct memcached_protocol_client_st* c;
-          c= memcached_protocol_create_client(protocol, sock);
-          if (c == NULL)
-          {
-            fprintf(stderr, "Failed to create client\n");
-            close(sock);
-          }
-          else
-          {
-            socket_userdata_map[sock]= c;
-            fds[max_poll].events= POLLIN;
-            fds[max_poll].revents= 0;
-            fds[max_poll].fd= sock;
-            ++max_poll;
-          }
-        }
-        else
-        {
-          /* drive the client */
-          struct memcached_protocol_client_st* c;
-          c= socket_userdata_map[fds[x].fd];
-          assert(c != NULL);
-          fds[max_poll].events= 0;
-
-          memcached_protocol_event_t events= memcached_protocol_client_work(c);
-          if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
-            fds[max_poll].events= POLLOUT;
-
-          if (events & MEMCACHED_PROTOCOL_READ_EVENT)
-            fds[max_poll].events= POLLIN;
-
-          if (!(events & MEMCACHED_PROTOCOL_PAUSE_EVENT ||
-                fds[max_poll].events != 0))
-          {
-            memcached_protocol_client_destroy(c);
-            close(fds[x].fd);
-            fds[x].events= 0;
-
-            if (x != max_poll - 1)
-              memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
-
-            --max_poll;
-          }
-        }
-      }
+      fprintf(stderr, "Failed to add event for %d\n", server_sockets[xx]);
+      closesocket(server_sockets[xx]);
     }
   }
+
+  /* Serve all of the clients */
+  event_base_loop(event_base, 0);
+
+  /* NOTREACHED */
+  return EXIT_SUCCESS;
 }