Merge Stewart
[awesomized/libmemcached] / example / memcached_light.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
/**
 * What is a library without an example to show you how to use the library?
 * This example uses both interfaces to implement a small memcached server.
 * Please note that this is an example of how to use the library, not
 * an implementation of a scalable memcached server. If you look closely
 * at the example it isn't even multithreaded ;-)
 *
 * With that in mind, let me give you some pointers into the source:
 *   storage.c/h       - Implements the item store for this server and is not
 *                       really interesting for this example.
 *   interface_v0.c    - Shows an implementation of the memcached server by using
 *                       the "raw" access to the packets as they arrive
 *   interface_v1.c    - Shows an implementation of the memcached server by using
 *                       the more "logical" interface.
 *   memcached_light.c - This file sets up all of the sockets and runs the main
 *                       message loop.
 *
 *
 * config.h is included so that I can use ntohll/htonll on platforms that
 * don't have them (these are private functions inside libmemcached, so you
 * cannot use them directly from libmemcached without special modifications to
 * the library)
 */
25
26 #include "config.h"
27 #include <assert.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netdb.h>
31 #include <netinet/tcp.h>
32 #include <stdio.h>
33 #include <unistd.h>
34 #include <fcntl.h>
35 #include <errno.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <poll.h>
39
40 #include <libmemcached/protocol_handler.h>
41 #include <libmemcached/byteorder.h>
42 #include "storage.h"
43 #include "memcached_light.h"
44
45 extern memcached_binary_protocol_callback_st interface_v0_impl;
46 extern memcached_binary_protocol_callback_st interface_v1_impl;
47
/* Capacity shared by the listening-socket list and the fd-indexed map below. */
enum { SOCKET_TABLE_SIZE= 1024 };

/* Listening sockets; the first num_server_sockets entries are in use. */
static int server_sockets[SOCKET_TABLE_SIZE];
static int num_server_sockets= 0;
/*
 * User data indexed by file descriptor number: the protocol handle for a
 * listening socket, the client handle for an accepted connection.
 */
static void* socket_userdata_map[SOCKET_TABLE_SIZE];
/* When true the pre/post execute hooks trace each command on stderr. */
static bool verbose= false;
52
/**
 * Create listening sockets for a port and register them in server_sockets.
 *
 * Every address getaddrinfo() returns for 127.0.0.1 and the given port is
 * tried in turn. Each socket that is successfully bound and put into the
 * listening state is appended to server_sockets; addresses that fail are
 * reported on stderr and skipped.
 *
 * @param port the port to bind to (passed to getaddrinfo() as the service)
 * @return 0 if at least one server socket is registered (sockets registered
 *         by earlier calls count as well), 1 otherwise
 */
static int server_socket(const char *port) {
  struct addrinfo *ai;
  struct addrinfo hints= { .ai_flags= AI_PASSIVE,
                           .ai_family= AF_UNSPEC,
                           .ai_socktype= SOCK_STREAM };
  const int max_sockets= (int)(sizeof(server_sockets) / sizeof(server_sockets[0]));

  int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
  if (error != 0)
  {
    if (error != EAI_SYSTEM)
      fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
    else
      perror("getaddrinfo()");

    return 1;
  }

  struct linger ling= {0, 0};

  for (struct addrinfo *next= ai; next; next= next->ai_next)
  {
    /* Never write past the end of the server_sockets table */
    if (num_server_sockets >= max_sockets)
    {
      fprintf(stderr, "Too many server sockets (max %d)\n", max_sockets);
      break;
    }

    /*
     * Create the socket from the entry we are going to bind to (not from
     * the head of the list), so that the family matches the address.
     */
    int sock= socket(next->ai_family, next->ai_socktype, next->ai_protocol);
    if (sock == -1)
    {
      perror("Failed to create socket");
      continue;
    }

    int flags= fcntl(sock, F_GETFL, 0);
    if (flags == -1)
    {
      perror("Failed to get socket flags");
      close(sock);
      continue;
    }

    if ((flags & O_NONBLOCK) != O_NONBLOCK)
    {
      if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
      {
        perror("Failed to set socket to nonblocking mode");
        close(sock);
        continue;
      }
    }

    /* Socket options are best effort: report a failure but keep going */
    flags= 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &flags, sizeof(flags)) != 0)
      perror("Failed to set SO_REUSEADDR");

    if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &flags, sizeof(flags)) != 0)
      perror("Failed to set SO_KEEPALIVE");

    if (setsockopt(sock, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling)) != 0)
      perror("Failed to set SO_LINGER");

    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &flags, sizeof(flags)) != 0)
      perror("Failed to set TCP_NODELAY");

    if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
    {
      /*
       * The address list must stay alive here: the loop still reads
       * next->ai_next and the list is released exactly once after the loop.
       */
      if (errno != EADDRINUSE)
        perror("bind()");

      close(sock);
      continue;
    }

    if (listen(sock, 1024) == -1)
    {
      perror("listen()");
      close(sock);
      continue;
    }

    server_sockets[num_server_sockets++]= sock;
  }

  freeaddrinfo(ai);

  return (num_server_sockets > 0) ? 0 : 1;
}
141
/**
 * Convert a command code to a textual string
 *
 * The table is indexed directly by the binary protocol opcode, so its
 * entries must stay in opcode order (0x00 GET ... 0x1a PREPENDQ).
 *
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };

  /*
   * Bound the lookup by the table itself so that it can never index past
   * the last entry, whatever opcodes the protocol header defines.
   */
  if (cmd < sizeof(text) / sizeof(text[0]))
    return text[cmd];

  return NULL;
}
163
164 /**
165 * Print out the command we are about to execute
166 */
167 static void pre_execute(const void *cookie __attribute__((unused)),
168 protocol_binary_request_header *header __attribute__((unused)))
169 {
170 if (verbose)
171 {
172 const char *cmd= comcode2str(header->request.opcode);
173 if (cmd != NULL)
174 fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
175 else
176 fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
177 }
178 }
179
180 /**
181 * Print out the command we just executed
182 */
183 static void post_execute(const void *cookie __attribute__((unused)),
184 protocol_binary_request_header *header __attribute__((unused)))
185 {
186 if (verbose)
187 {
188 const char *cmd= comcode2str(header->request.opcode);
189 if (cmd != NULL)
190 fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
191 else
192 fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
193 }
194 }
195
196 /**
197 * Callback handler for all unknown commands.
198 * Send an unknown command back to the client
199 */
200 static protocol_binary_response_status unknown(const void *cookie,
201 protocol_binary_request_header *header,
202 memcached_binary_protocol_raw_response_handler response_handler)
203 {
204 protocol_binary_response_no_extras response= {
205 .message= {
206 .header.response= {
207 .magic= PROTOCOL_BINARY_RES,
208 .opcode= header->request.opcode,
209 .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
210 .opaque= header->request.opaque
211 }
212 }
213 };
214
215 return response_handler(cookie, header, (void*)&response);
216 }
217
218 static void work(void);
219
220 /**
221 * Program entry point. Bind to the specified port(s) and serve clients
222 *
223 * @param argc number of items in the argument vector
224 * @param argv argument vector
225 * @return 0 on success, 1 otherwise
226 */
227 int main(int argc, char **argv)
228 {
229 bool port_specified= false;
230 int cmd;
231 memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
232
233 /*
234 * We need to initialize the handlers manually due to a bug in the
235 * warnings generated by struct initialization in gcc (all the way up to 4.4)
236 */
237 initialize_interface_v0_handler();
238
239 while ((cmd= getopt(argc, argv, "v1p:?")) != EOF)
240 {
241 switch (cmd) {
242 case '1':
243 interface= &interface_v1_impl;
244 break;
245 case 'p':
246 port_specified= true;
247 (void)server_socket(optarg);
248 break;
249 case 'v':
250 verbose= true;
251 break;
252 case '?': /* FALLTHROUGH */
253 default:
254 (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]);
255 return 1;
256 }
257 }
258
259 if (!initialize_storage())
260 {
261 /* Error message already printed */
262 return 1;
263 }
264
265 if (!port_specified)
266 (void)server_socket("9999");
267
268 if (num_server_sockets == 0)
269 {
270 fprintf(stderr, "I don't have any server sockets\n");
271 return 1;
272 }
273
274 /*
275 * Create and initialize the handles to the protocol handlers. I want
276 * to be able to trace the traffic throught the pre/post handlers, and
277 * set up a common handler for unknown messages
278 */
279 interface->pre_execute= pre_execute;
280 interface->post_execute= post_execute;
281 interface->unknown= unknown;
282
283 struct memcached_protocol_st *protocol_handle;
284 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
285 {
286 fprintf(stderr, "Failed to allocate protocol handle\n");
287 return 1;
288 }
289
290 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
291 memcached_binary_protocol_set_pedantic(protocol_handle, true);
292
293 for (int xx= 0; xx < num_server_sockets; ++xx)
294 socket_userdata_map[server_sockets[xx]]= protocol_handle;
295
296 /* Serve all of the clients */
297 work();
298
299 /* NOTREACHED */
300 return 0;
301 }
302
303 static void work(void)
304 {
305 #define MAX_SERVERS_TO_POLL 100
306 struct pollfd fds[MAX_SERVERS_TO_POLL];
307 int max_poll;
308
309 for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
310 {
311 fds[max_poll].events= POLLIN;
312 fds[max_poll].revents= 0;
313 fds[max_poll].fd= server_sockets[max_poll];
314 }
315
316 while (true)
317 {
318 int err= poll(fds, (nfds_t)max_poll, -1);
319
320 if (err == 0 || (err == -1 && errno != EINTR))
321 {
322 perror("poll() failed");
323 abort();
324 }
325
326 /* find the available filedescriptors */
327 for (int x= max_poll - 1; x > -1 && err > 0; --x)
328 {
329 if (fds[x].revents != 0)
330 {
331 --err;
332 if (x < num_server_sockets)
333 {
334 /* accept new client */
335 struct sockaddr_storage addr;
336 socklen_t addrlen= sizeof(addr);
337 int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
338 &addrlen);
339
340 if (sock == -1)
341 {
342 perror("Failed to accept client");
343 continue;
344 }
345
346 struct memcached_protocol_st *protocol;
347 protocol= socket_userdata_map[fds[x].fd];
348
349 struct memcached_protocol_client_st* c;
350 c= memcached_protocol_create_client(protocol, sock);
351 if (c == NULL)
352 {
353 fprintf(stderr, "Failed to create client\n");
354 close(sock);
355 }
356 else
357 {
358 socket_userdata_map[sock]= c;
359 fds[max_poll].events= POLLIN;
360 fds[max_poll].revents= 0;
361 fds[max_poll].fd= sock;
362 ++max_poll;
363 }
364 }
365 else
366 {
367 /* drive the client */
368 struct memcached_protocol_client_st* c;
369 c= socket_userdata_map[fds[x].fd];
370 assert(c != NULL);
371 fds[max_poll].events= 0;
372
373 memcached_protocol_event_t events= memcached_protocol_client_work(c);
374 if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
375 fds[max_poll].events= POLLOUT;
376
377 if (events & MEMCACHED_PROTOCOL_READ_EVENT)
378 fds[max_poll].events= POLLIN;
379
380 if (!(events & MEMCACHED_PROTOCOL_PAUSE_EVENT ||
381 fds[max_poll].events != 0))
382 {
383 memcached_protocol_client_destroy(c);
384 close(fds[x].fd);
385 fds[x].events= 0;
386
387 if (x != max_poll - 1)
388 memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
389
390 --max_poll;
391 }
392 }
393 }
394 }
395 }
396 }