Merge in updates for server failure testing.
[awesomized/libmemcached] / example / memcached_light.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example uses both interfaces to implement a small memcached server.
5 * Please note that this is an example of how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.c - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.c - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.c - This file sets up all of the sockets and runs the main
17 * message loop.
18 *
19 *
20 * config.h is included so that I can use ntohll/htonll on platforms that
21 * don't have them (they are private functions inside libmemcached, so you
22 * cannot use them directly from libmemcached without special modifications to
23 * the library)
24 */
25
#include "config.h"
#include <assert.h>
#include <sys/types.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <event.h>

#include <libmemcached/protocol_handler.h>
#include <example/byteorder.h>
#include "storage.h"
#include "memcached_light.h"
41
42 extern memcached_binary_protocol_callback_st interface_v0_impl;
43 extern memcached_binary_protocol_callback_st interface_v1_impl;
44
45 static memcached_socket_t server_sockets[1024];
46 static int num_server_sockets= 0;
47
48 struct connection
49 {
50 void *userdata;
51 struct event event;
52 };
53
/*
 * Default maximum number of connections (change with -c). It is also the
 * number of slots allocated for socket_userdata_map.
 */
static int maxconns= 1024;

/* The libevent base that every event in this file is attached to */
static struct event_base *event_base;

/* Connection slots, indexed by socket; allocated in main() */
static struct connection *socket_userdata_map;

/* Set by -v: trace every command in the pre/post execute hooks */
static bool verbose;
60
/*
 * Settings collected from the command line.
 */
typedef struct options_st {
  char *pid_file;  /* file name given with -P; left zeroed when -P is absent */
  bool has_port;   /* true once a -p option has been seen */
  in_port_t port;  /* never assigned after the initial memset in main() */
} options_st;

options_st global_options;
68
69 /**
70 * Callback for driving a client connection
71 * @param fd the socket for the client socket
72 * @param which identifying the event that occurred (not used)
73 * @param arg the connection structure for the client
74 */
75 static void drive_client(memcached_socket_t fd, short which, void *arg)
76 {
77 (void)which;
78 struct connection *client= arg;
79 struct memcached_protocol_client_st* c= client->userdata;
80 assert(c != NULL);
81
82 memcached_protocol_event_t events= memcached_protocol_client_work(c);
83 if (events & MEMCACHED_PROTOCOL_ERROR_EVENT)
84 {
85 memcached_protocol_client_destroy(c);
86 closesocket(fd);
87 } else {
88 short flags = 0;
89 if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
90 {
91 flags= EV_WRITE;
92 }
93
94 if (events & MEMCACHED_PROTOCOL_READ_EVENT)
95 {
96 flags|= EV_READ;
97 }
98
99 event_set(&client->event, (intptr_t)fd, flags, drive_client, client);
100 event_base_set(event_base, &client->event);
101
102 if (event_add(&client->event, 0) == -1)
103 {
104 (void)fprintf(stderr, "Failed to add event for %d\n", fd);
105 memcached_protocol_client_destroy(c);
106 closesocket(fd);
107 }
108 }
109 }
110
111 /**
112 * Callback for accepting new connections
113 * @param fd the socket for the server socket
114 * @param which identifying the event that occurred (not used)
115 * @param arg the connection structure for the server
116 */
117 static void accept_handler(memcached_socket_t fd, short which, void *arg)
118 {
119 (void)which;
120 struct connection *server= arg;
121 /* accept new client */
122 struct sockaddr_storage addr;
123 socklen_t addrlen= sizeof(addr);
124 memcached_socket_t sock= accept(fd, (struct sockaddr *)&addr, &addrlen);
125
126 if (sock == INVALID_SOCKET)
127 {
128 perror("Failed to accept client");
129 return ;
130 }
131
132 #ifndef WIN32
133 if (sock >= maxconns)
134 {
135 (void)fprintf(stderr, "Client outside socket range (specified with -c)\n");
136 closesocket(sock);
137 return ;
138 }
139 #endif
140
141 struct memcached_protocol_client_st* c;
142 c= memcached_protocol_create_client(server->userdata, sock);
143 if (c == NULL)
144 {
145 (void)fprintf(stderr, "Failed to create client\n");
146 closesocket(sock);
147 }
148 else
149 {
150 struct connection *client = &socket_userdata_map[sock];
151 client->userdata= c;
152
153 event_set(&client->event, (intptr_t)sock, EV_READ, drive_client, client);
154 event_base_set(event_base, &client->event);
155 if (event_add(&client->event, 0) == -1)
156 {
157 (void)fprintf(stderr, "Failed to add event for %d\n", sock);
158 memcached_protocol_client_destroy(c);
159 closesocket(sock);
160 }
161 }
162 }
163
/**
 * Create listening sockets bound to the given port on 127.0.0.1.
 *
 * Every address returned by getaddrinfo() is tried in turn. Each socket that
 * is successfully bound and put into listening mode is appended to
 * server_sockets[]; addresses that fail are reported and skipped.
 *
 * @param port the port to bind to (handed to getaddrinfo() as the service)
 * @return 0 if at least one server socket is registered in server_sockets[],
 *         1 if the address lookup failed or no server socket is registered
 */
static int server_socket(const char *port)
{
  struct addrinfo *ai;
  struct addrinfo hints= { .ai_flags= AI_PASSIVE,
                           .ai_family= AF_UNSPEC,
                           .ai_socktype= SOCK_STREAM };

  int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
  if (error != 0)
  {
    if (error != EAI_SYSTEM)
    {
      fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
    }
    else
    {
      perror("getaddrinfo()");
    }

    /* Non-zero is the failure value used by the normal return below */
    return 1;
  }

  const int capacity= (int)(sizeof(server_sockets) / sizeof(server_sockets[0]));
  struct linger ling= {0, 0};

  for (struct addrinfo *next= ai; next; next= next->ai_next)
  {
    if (num_server_sockets >= capacity)
    {
      fprintf(stderr, "Too many server sockets (max %d)\n", capacity);
      break;
    }

    /* Use the entry being visited, not the head of the list */
    memcached_socket_t sock= socket(next->ai_family, next->ai_socktype, next->ai_protocol);
    if (sock == INVALID_SOCKET)
    {
      perror("Failed to create socket");
      continue;
    }

    int flags;
#ifdef WIN32
    u_long arg = 1;
    if (ioctlsocket(sock, FIONBIO, &arg) == SOCKET_ERROR)
    {
      perror("Failed to set nonblocking io");
      closesocket(sock);
      continue;
    }
#else
    flags= fcntl(sock, F_GETFL, 0);
    if (flags == -1)
    {
      perror("Failed to get socket flags");
      closesocket(sock);
      continue;
    }

    if ((flags & O_NONBLOCK) != O_NONBLOCK)
    {
      if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
      {
        perror("Failed to set socket to nonblocking mode");
        closesocket(sock);
        continue;
      }
    }
#endif

    /* Socket options are best effort: a failure is reported but not fatal */
    flags= 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
    {
      perror("Failed to set SO_REUSEADDR");
    }

    if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
    {
      perror("Failed to set SO_KEEPALIVE");
    }

    if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
    {
      perror("Failed to set SO_LINGER");
    }

    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
    {
      perror("Failed to set TCP_NODELAY");
    }

    if (bind(sock, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR)
    {
      /*
       * An address that is already in use is skipped silently. The address
       * list must NOT be released here: the loop still walks it, and it is
       * freed exactly once after the loop.
       */
      if (get_socket_errno() != EADDRINUSE)
      {
        perror("bind()");
      }
      closesocket(sock);
      continue;
    }

    if (listen(sock, 1024) == SOCKET_ERROR)
    {
      perror("listen()");
      closesocket(sock);
      continue;
    }

    server_sockets[num_server_sockets++]= sock;
  }

  freeaddrinfo(ai);

  return (num_server_sockets > 0) ? 0 : 1;
}
264
/**
 * Convert a command code to a textual string
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  /*
   * Indexed by opcode: entry i names binary protocol opcode i, from
   * 0x00 (GET) through 0x1a (PREPENDQ).
   */
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };

  /*
   * Bound the lookup by the table itself so the index can never run past
   * the end of the array, whatever happens to the opcode enumeration.
   */
  if (cmd < sizeof(text) / sizeof(text[0]))
  {
    return text[cmd];
  }

  return NULL;
}
286
287 /**
288 * Print out the command we are about to execute
289 */
290 static void pre_execute(const void *cookie,
291 protocol_binary_request_header *header)
292 {
293 if (verbose)
294 {
295 const char *cmd= comcode2str(header->request.opcode);
296 if (cmd != NULL)
297 fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
298 else
299 fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
300 }
301 }
302
303 /**
304 * Print out the command we just executed
305 */
306 static void post_execute(const void *cookie,
307 protocol_binary_request_header *header)
308 {
309 if (verbose)
310 {
311 const char *cmd= comcode2str(header->request.opcode);
312 if (cmd != NULL)
313 fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
314 else
315 fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
316 }
317 }
318
319 /**
320 * Callback handler for all unknown commands.
321 * Send an unknown command back to the client
322 */
323 static protocol_binary_response_status unknown(const void *cookie,
324 protocol_binary_request_header *header,
325 memcached_binary_protocol_raw_response_handler response_handler)
326 {
327 protocol_binary_response_no_extras response= {
328 .message= {
329 .header.response= {
330 .magic= PROTOCOL_BINARY_RES,
331 .opcode= header->request.opcode,
332 .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
333 .opaque= header->request.opaque
334 }
335 }
336 };
337
338 return response_handler(cookie, header, (void*)&response);
339 }
340
341 /**
342 * Program entry point. Bind to the specified port(s) and serve clients
343 *
344 * @param argc number of items in the argument vector
345 * @param argv argument vector
346 * @return EXIT_SUCCESS on success, 1 otherwise
347 */
348 int main(int argc, char **argv)
349 {
350 int cmd;
351 memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
352
353 memset(&global_options, 0, sizeof(global_options));
354
355 event_base= event_init();
356 if (event_base == NULL)
357 {
358 fprintf(stderr, "Failed to create an instance of libevent\n");
359 return EXIT_FAILURE;
360 }
361
362 /*
363 * We need to initialize the handlers manually due to a bug in the
364 * warnings generated by struct initialization in gcc (all the way up to 4.4)
365 */
366 initialize_interface_v0_handler();
367
368 while ((cmd= getopt(argc, argv, "v1p:P:?hc:")) != EOF)
369 {
370 switch (cmd) {
371 case '1':
372 interface= &interface_v1_impl;
373 break;
374 case 'P':
375 global_options.pid_file= strdup(optarg);
376 break;
377 case 'p':
378 global_options.has_port= true;
379 (void)server_socket(optarg);
380 break;
381 case 'v':
382 verbose= true;
383 break;
384 case 'c':
385 maxconns= atoi(optarg);
386 break;
387 case 'h': /* FALLTHROUGH */
388 case '?': /* FALLTHROUGH */
389 default:
390 (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1] [-c #clients] [-P pidfile]\n",
391 argv[0]);
392 return EXIT_FAILURE;
393 }
394 }
395
396 if (! initialize_storage())
397 {
398 /* Error message already printed */
399 return EXIT_FAILURE;
400 }
401
402 if (! global_options.has_port)
403 (void)server_socket("9999");
404
405 if (global_options.pid_file)
406 {
407 FILE *pid_file;
408 uint32_t pid;
409
410 pid_file= fopen(global_options.pid_file, "w+");
411
412 if (pid_file == NULL)
413 {
414 perror(strerror(get_socket_errno()));
415 abort();
416 }
417
418 pid= (uint32_t)getpid();
419 fprintf(pid_file, "%u\n", pid);
420 fclose(pid_file);
421 }
422
423 if (num_server_sockets == 0)
424 {
425 fprintf(stderr, "I don't have any server sockets\n");
426 return EXIT_FAILURE;
427 }
428
429 /*
430 * Create and initialize the handles to the protocol handlers. I want
431 * to be able to trace the traffic throught the pre/post handlers, and
432 * set up a common handler for unknown messages
433 */
434 interface->pre_execute= pre_execute;
435 interface->post_execute= post_execute;
436 interface->unknown= unknown;
437
438 struct memcached_protocol_st *protocol_handle;
439 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
440 {
441 fprintf(stderr, "Failed to allocate protocol handle\n");
442 return EXIT_FAILURE;
443 }
444
445 socket_userdata_map= calloc((size_t)(maxconns), sizeof(struct connection));
446 if (socket_userdata_map == NULL)
447 {
448 fprintf(stderr, "Failed to allocate room for connections\n");
449 return EXIT_FAILURE;
450 }
451
452 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
453 memcached_binary_protocol_set_pedantic(protocol_handle, true);
454
455 for (int xx= 0; xx < num_server_sockets; ++xx)
456 {
457 struct connection *conn= &socket_userdata_map[server_sockets[xx]];
458 conn->userdata= protocol_handle;
459 event_set(&conn->event, (intptr_t)server_sockets[xx], EV_READ | EV_PERSIST,
460 accept_handler, conn);
461 event_base_set(event_base, &conn->event);
462 if (event_add(&conn->event, 0) == -1)
463 {
464 fprintf(stderr, "Failed to add event for %d\n", server_sockets[xx]);
465 closesocket(server_sockets[xx]);
466 }
467 }
468
469 /* Serve all of the clients */
470 event_base_loop(event_base, 0);
471
472 /* NOTREACHED */
473 return EXIT_SUCCESS;
474 }