Updating for version 0.39
[m6w6/libmemcached] / example / memcached_light.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example uses both interfaces to implement a small memcached server.
5 * Please note that this is an example of how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.c - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.c - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.c - This file sets up all of the sockets and runs the main
17 * message loop.
18 *
19 *
20 * config.h is included so that I can use ntohll/htonll on platforms that
21 * don't have them (these are private functions inside libmemcached, so you
22 * cannot use them directly from libmemcached without special modifications to
23 * the library)
24 */
25
26 #include "config.h"
27 #include <assert.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netdb.h>
31 #include <netinet/tcp.h>
32 #include <stdio.h>
33 #include <unistd.h>
34 #include <fcntl.h>
35 #include <errno.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <poll.h>
39
40 #include <libmemcached/protocol_handler.h>
41 #include <libmemcached/byteorder.h>
42 #include "storage.h"
43 #include "memcached_light.h"
44
45 extern memcached_binary_protocol_callback_st interface_v0_impl;
46 extern memcached_binary_protocol_callback_st interface_v1_impl;
47
/* Listening sockets registered by server_socket() and how many are in use */
static int server_sockets[1024];
static int num_server_sockets= 0;

/*
 * User data attached to a socket, indexed by the file descriptor value.
 * Listening sockets map to the protocol handle, client sockets map to
 * their client object.
 */
static void* socket_userdata_map[1024];

/* When set, the pre/post execute callbacks trace every command to stderr */
static bool verbose= false;

/* Settings collected from the command line */
typedef struct options_st {
  char *pid_file;   /* path given with -P, or NULL */
  bool has_port;    /* true once -p has been seen */
  in_port_t port;
} options_st;

options_st global_options;
60
/**
 * Create listening sockets bound to the given port on 127.0.0.1.
 *
 * Every address returned by getaddrinfo() is tried in turn. A socket that
 * is successfully created, switched to non-blocking mode, bound and put
 * into listening mode is appended to server_sockets. A failure on one
 * address is reported on stderr and the next address is tried.
 *
 * @param port the port number (or service name) to bind to
 * @return 0 if at least one server socket is registered (sockets registered
 *         by earlier calls count as well), 1 otherwise
 */
static int server_socket(const char *port)
{
  struct addrinfo *ai;
  struct addrinfo hints= { .ai_flags= AI_PASSIVE,
                           .ai_family= AF_UNSPEC,
                           .ai_socktype= SOCK_STREAM };

  int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
  if (error != 0)
  {
    if (error != EAI_SYSTEM)
      fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
    else
      perror("getaddrinfo()");

    return 1;
  }

  const int max_sockets= (int)(sizeof(server_sockets) / sizeof(server_sockets[0]));
  struct linger ling= {0, 0};

  for (struct addrinfo *next= ai; next; next= next->ai_next)
  {
    /* Never write past the end of server_sockets */
    if (num_server_sockets >= max_sockets)
    {
      fprintf(stderr, "Too many server sockets (max %d)\n", max_sockets);
      break;
    }

    /* Describe the socket with the entry being tried, not the list head */
    int sock= socket(next->ai_family, next->ai_socktype, next->ai_protocol);
    if (sock == -1)
    {
      perror("Failed to create socket");
      continue;
    }

    int flags= fcntl(sock, F_GETFL, 0);
    if (flags == -1)
    {
      perror("Failed to get socket flags");
      close(sock);
      continue;
    }

    if ((flags & O_NONBLOCK) != O_NONBLOCK)
    {
      if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
      {
        perror("Failed to set socket to nonblocking mode");
        close(sock);
        continue;
      }
    }

    /* The socket options are best effort: report failures and carry on */
    flags= 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
      perror("Failed to set SO_REUSEADDR");

    if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
      perror("Failed to set SO_KEEPALIVE");

    if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
      perror("Failed to set SO_LINGER");

    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
      perror("Failed to set TCP_NODELAY");

    if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
    {
      /*
       * The address list is still being walked, so it must not be
       * released here; it is freed exactly once after the loop.
       */
      if (errno != EADDRINUSE)
        perror("bind()");

      close(sock);
      continue;
    }

    if (listen(sock, 1024) == -1)
    {
      perror("listen()");
      close(sock);
      continue;
    }

    server_sockets[num_server_sockets++]= sock;
  }

  freeaddrinfo(ai);

  return (num_server_sockets > 0) ? 0 : 1;
}
150
/**
 * Convert a command code to a textual string
 *
 * The opcode value is used directly as an index into the name table, so
 * the lookup is bounded by the size of the table itself.
 *
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };

  /* Anything beyond the last named opcode is reported as unknown */
  if (cmd < sizeof(text) / sizeof(text[0]))
    return text[cmd];

  return NULL;
}
172
173 /**
174 * Print out the command we are about to execute
175 */
176 static void pre_execute(const void *cookie __attribute__((unused)),
177 protocol_binary_request_header *header __attribute__((unused)))
178 {
179 if (verbose)
180 {
181 const char *cmd= comcode2str(header->request.opcode);
182 if (cmd != NULL)
183 fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
184 else
185 fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
186 }
187 }
188
189 /**
190 * Print out the command we just executed
191 */
192 static void post_execute(const void *cookie __attribute__((unused)),
193 protocol_binary_request_header *header __attribute__((unused)))
194 {
195 if (verbose)
196 {
197 const char *cmd= comcode2str(header->request.opcode);
198 if (cmd != NULL)
199 fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
200 else
201 fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
202 }
203 }
204
205 /**
206 * Callback handler for all unknown commands.
207 * Send an unknown command back to the client
208 */
209 static protocol_binary_response_status unknown(const void *cookie,
210 protocol_binary_request_header *header,
211 memcached_binary_protocol_raw_response_handler response_handler)
212 {
213 protocol_binary_response_no_extras response= {
214 .message= {
215 .header.response= {
216 .magic= PROTOCOL_BINARY_RES,
217 .opcode= header->request.opcode,
218 .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
219 .opaque= header->request.opaque
220 }
221 }
222 };
223
224 return response_handler(cookie, header, (void*)&response);
225 }
226
227 static void work(void);
228
229 /**
230 * Program entry point. Bind to the specified port(s) and serve clients
231 *
232 * @param argc number of items in the argument vector
233 * @param argv argument vector
234 * @return 0 on success, 1 otherwise
235 */
236 int main(int argc, char **argv)
237 {
238 int cmd;
239 memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
240
241 memset(&global_options, 0, sizeof(global_options));
242
243 /*
244 * We need to initialize the handlers manually due to a bug in the
245 * warnings generated by struct initialization in gcc (all the way up to 4.4)
246 */
247 initialize_interface_v0_handler();
248
249 while ((cmd= getopt(argc, argv, "v1pP:?h")) != EOF)
250 {
251 switch (cmd) {
252 case '1':
253 interface= &interface_v1_impl;
254 break;
255 case 'P':
256 global_options.pid_file= strdup(optarg);
257 break;
258 case 'p':
259 global_options.has_port= true;
260 (void)server_socket(optarg);
261 break;
262 case 'v':
263 verbose= true;
264 break;
265 case 'h': /* FALLTHROUGH */
266 case '?': /* FALLTHROUGH */
267 default:
268 (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]);
269 return 1;
270 }
271 }
272
273 if (! initialize_storage())
274 {
275 /* Error message already printed */
276 return 1;
277 }
278
279 if (! global_options.has_port)
280 (void)server_socket("9999");
281
282 if (global_options.pid_file)
283 {
284 FILE *pid_file;
285 uint32_t pid;
286
287 pid_file= fopen(global_options.pid_file, "w+");
288
289 if (pid_file == NULL)
290 {
291 perror(strerror(errno));
292 abort();
293 }
294
295 pid= (uint32_t)getpid();
296 fprintf(pid_file, "%u\n", pid);
297 fclose(pid_file);
298 }
299
300 if (num_server_sockets == 0)
301 {
302 fprintf(stderr, "I don't have any server sockets\n");
303 return 1;
304 }
305
306 /*
307 * Create and initialize the handles to the protocol handlers. I want
308 * to be able to trace the traffic throught the pre/post handlers, and
309 * set up a common handler for unknown messages
310 */
311 interface->pre_execute= pre_execute;
312 interface->post_execute= post_execute;
313 interface->unknown= unknown;
314
315 struct memcached_protocol_st *protocol_handle;
316 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
317 {
318 fprintf(stderr, "Failed to allocate protocol handle\n");
319 return 1;
320 }
321
322 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
323 memcached_binary_protocol_set_pedantic(protocol_handle, true);
324
325 for (int xx= 0; xx < num_server_sockets; ++xx)
326 socket_userdata_map[server_sockets[xx]]= protocol_handle;
327
328 /* Serve all of the clients */
329 work();
330
331 /* NOTREACHED */
332 return 0;
333 }
334
335 static void work(void)
336 {
337 #define MAX_SERVERS_TO_POLL 100
338 struct pollfd fds[MAX_SERVERS_TO_POLL];
339 int max_poll;
340
341 for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
342 {
343 fds[max_poll].events= POLLIN;
344 fds[max_poll].revents= 0;
345 fds[max_poll].fd= server_sockets[max_poll];
346 }
347
348 while (true)
349 {
350 int err= poll(fds, (nfds_t)max_poll, -1);
351
352 if (err == 0 || (err == -1 && errno != EINTR))
353 {
354 perror("poll() failed");
355 abort();
356 }
357
358 /* find the available filedescriptors */
359 for (int x= max_poll - 1; x > -1 && err > 0; --x)
360 {
361 if (fds[x].revents != 0)
362 {
363 --err;
364 if (x < num_server_sockets)
365 {
366 /* accept new client */
367 struct sockaddr_storage addr;
368 socklen_t addrlen= sizeof(addr);
369 int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
370 &addrlen);
371
372 if (sock == -1)
373 {
374 perror("Failed to accept client");
375 continue;
376 }
377
378 struct memcached_protocol_st *protocol;
379 protocol= socket_userdata_map[fds[x].fd];
380
381 struct memcached_protocol_client_st* c;
382 c= memcached_protocol_create_client(protocol, sock);
383 if (c == NULL)
384 {
385 fprintf(stderr, "Failed to create client\n");
386 close(sock);
387 }
388 else
389 {
390 socket_userdata_map[sock]= c;
391 fds[max_poll].events= POLLIN;
392 fds[max_poll].revents= 0;
393 fds[max_poll].fd= sock;
394 ++max_poll;
395 }
396 }
397 else
398 {
399 /* drive the client */
400 struct memcached_protocol_client_st* c;
401 c= socket_userdata_map[fds[x].fd];
402 assert(c != NULL);
403 fds[max_poll].events= 0;
404
405 memcached_protocol_event_t events= memcached_protocol_client_work(c);
406 if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
407 fds[max_poll].events= POLLOUT;
408
409 if (events & MEMCACHED_PROTOCOL_READ_EVENT)
410 fds[max_poll].events= POLLIN;
411
412 if (!(events & MEMCACHED_PROTOCOL_PAUSE_EVENT ||
413 fds[max_poll].events != 0))
414 {
415 memcached_protocol_client_destroy(c);
416 close(fds[x].fd);
417 fds[x].events= 0;
418
419 if (x != max_poll - 1)
420 memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
421
422 --max_poll;
423 }
424 }
425 }
426 }
427 }
428 }