Update protocol due to review comments:
[awesomized/libmemcached] / example / memcached_light.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example use both interfaces to implement a small memcached server.
5 * Please note that this is an exemple on how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.c - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.c - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.c - This file sets up all of the sockets and run the main
17 * message loop.
18 *
19 *
20 * config.h is included so that I can use the ntohll/htonll on platforms that
21 * doesn't have that (this is a private function inside libmemcached, so you
22 * cannot use it directly from libmemcached without special modifications to
23 * the library)
24 */
25
26 #include "config.h"
27 #include <assert.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <netdb.h>
31 #include <netinet/tcp.h>
32 #include <stdio.h>
33 #include <unistd.h>
34 #include <fcntl.h>
35 #include <errno.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <poll.h>
39
40 #include <libmemcached/protocol_handler.h>
41 #include <libmemcached/byteorder.h>
42 #include "storage.h"
43
44 extern memcached_binary_protocol_callback_st interface_v0_impl;
45 extern memcached_binary_protocol_callback_st interface_v1_impl;
46
47 static int server_sockets[1024];
48 static int num_server_sockets= 0;
49 static void* socket_userdata_map[1024];
50 static bool verbose= false;
51
52 /**
53 * Create a socket and bind it to a specific port number
54 * @param port the port number to bind to
55 */
56 static int server_socket(const char *port) {
57 struct addrinfo *ai;
58 struct addrinfo hints= { .ai_flags= AI_PASSIVE,
59 .ai_family= AF_UNSPEC,
60 .ai_socktype= SOCK_STREAM };
61
62 int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
63 if (error != 0)
64 {
65 if (error != EAI_SYSTEM)
66 fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
67 else
68 perror("getaddrinfo()");
69
70 return 1;
71 }
72
73 struct linger ling= {0, 0};
74
75 for (struct addrinfo *next= ai; next; next= next->ai_next)
76 {
77 int sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
78 if (sock == -1)
79 {
80 perror("Failed to create socket");
81 continue;
82 }
83
84 int flags= fcntl(sock, F_GETFL, 0);
85 if (flags == -1)
86 {
87 perror("Failed to get socket flags");
88 close(sock);
89 continue;
90 }
91
92 if ((flags & O_NONBLOCK) != O_NONBLOCK)
93 {
94 if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
95 {
96 perror("Failed to set socket to nonblocking mode");
97 close(sock);
98 continue;
99 }
100 }
101
102 flags= 1;
103 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
104 perror("Failed to set SO_REUSEADDR");
105
106 if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
107 perror("Failed to set SO_KEEPALIVE");
108
109 if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
110 perror("Failed to set SO_LINGER");
111
112 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
113 perror("Failed to set TCP_NODELAY");
114
115 if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
116 {
117 if (errno != EADDRINUSE)
118 {
119 perror("bind()");
120 freeaddrinfo(ai);
121 }
122 close(sock);
123 continue;
124 }
125
126 if (listen(sock, 1024) == -1)
127 {
128 perror("listen()");
129 close(sock);
130 continue;
131 }
132
133 server_sockets[num_server_sockets++]= sock;
134 }
135
136 freeaddrinfo(ai);
137
138 return (num_server_sockets > 0) ? 0 : 1;
139 }
140
141 /**
142 * Convert a command code to a textual string
143 * @param cmd the comcode to convert
144 * @return a textual string with the command or NULL for unknown commands
145 */
146 static const char* comcode2str(uint8_t cmd)
147 {
148 static const char * const text[] = {
149 "GET", "SET", "ADD", "REPLACE", "DELETE",
150 "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
151 "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
152 "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
153 "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
154 "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
155 };
156
157 if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
158 return text[cmd];
159
160 return NULL;
161 }
162
163 /**
164 * Print out the command we are about to execute
165 */
166 static void pre_execute(const void *cookie __attribute__((unused)),
167 protocol_binary_request_header *header __attribute__((unused)))
168 {
169 if (verbose)
170 {
171 const char *cmd= comcode2str(header->request.opcode);
172 if (cmd != NULL)
173 fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
174 else
175 fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
176 }
177 }
178
179 /**
180 * Print out the command we just executed
181 */
182 static void post_execute(const void *cookie __attribute__((unused)),
183 protocol_binary_request_header *header __attribute__((unused)))
184 {
185 if (verbose)
186 {
187 const char *cmd= comcode2str(header->request.opcode);
188 if (cmd != NULL)
189 fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
190 else
191 fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
192 }
193 }
194
195 /**
196 * Callback handler for all unknown commands.
197 * Send an unknown command back to the client
198 */
199 static protocol_binary_response_status unknown(const void *cookie,
200 protocol_binary_request_header *header,
201 memcached_binary_protocol_raw_response_handler response_handler)
202 {
203 protocol_binary_response_no_extras response= {
204 .message= {
205 .header.response= {
206 .magic= PROTOCOL_BINARY_RES,
207 .opcode= header->request.opcode,
208 .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
209 .opaque= header->request.opaque
210 }
211 }
212 };
213
214 return response_handler(cookie, header, (void*)&response);
215 }
216
217 static void work(void);
218
219 /**
220 * Program entry point. Bind to the specified port(s) and serve clients
221 *
222 * @param argc number of items in the argument vector
223 * @param argv argument vector
224 * @return 0 on success, 1 otherwise
225 */
226 int main(int argc, char **argv)
227 {
228 bool port_specified= false;
229 int cmd;
230 memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
231
232 while ((cmd= getopt(argc, argv, "v1p:?")) != EOF)
233 {
234 switch (cmd) {
235 case '1':
236 interface= &interface_v1_impl;
237 break;
238 case 'p':
239 port_specified= true;
240 (void)server_socket(optarg);
241 break;
242 case 'v':
243 verbose= true;
244 break;
245 case '?': /* FALLTHROUGH */
246 default:
247 (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]);
248 return 1;
249 }
250 }
251
252 if (!initialize_storage())
253 {
254 /* Error message already printed */
255 return 1;
256 }
257
258 if (!port_specified)
259 (void)server_socket("9999");
260
261 if (num_server_sockets == 0)
262 {
263 fprintf(stderr, "I don't have any server sockets\n");
264 return 1;
265 }
266
267 /*
268 * Create and initialize the handles to the protocol handlers. I want
269 * to be able to trace the traffic throught the pre/post handlers, and
270 * set up a common handler for unknown messages
271 */
272 interface->pre_execute= pre_execute;
273 interface->post_execute= post_execute;
274 interface->unknown= unknown;
275
276 struct memcached_protocol_st *protocol_handle;
277 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
278 {
279 fprintf(stderr, "Failed to allocate protocol handle\n");
280 return 1;
281 }
282
283 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
284 memcached_binary_protocol_set_pedantic(protocol_handle, true);
285
286 for (int xx= 0; xx < num_server_sockets; ++xx)
287 socket_userdata_map[server_sockets[xx]]= protocol_handle;
288
289 /* Serve all of the clients */
290 work();
291
292 /* NOTREACHED */
293 return 0;
294 }
295
296 static void work(void)
297 {
298 #define MAX_SERVERS_TO_POLL 100
299 struct pollfd fds[MAX_SERVERS_TO_POLL];
300 int max_poll;
301
302 for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
303 {
304 fds[max_poll].events= POLLIN;
305 fds[max_poll].revents= 0;
306 fds[max_poll].fd= server_sockets[max_poll];
307 ++max_poll;
308 }
309
310 while (true)
311 {
312 int err= poll(fds, (nfds_t)max_poll, -1);
313
314 if (err == 0 || (err == -1 && errno != EINTR))
315 {
316 perror("poll() failed");
317 abort();
318 }
319
320 /* find the available filedescriptors */
321 for (int x= max_poll - 1; x > -1 && err > 0; --x)
322 {
323 if (fds[x].revents != 0)
324 {
325 --err;
326 if (x < num_server_sockets)
327 {
328 /* accept new client */
329 struct sockaddr_storage addr;
330 socklen_t addrlen= sizeof(addr);
331 int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
332 &addrlen);
333
334 if (sock == -1)
335 {
336 perror("Failed to accept client");
337 continue;
338 }
339
340 struct memcached_protocol_st *protocol;
341 protocol= socket_userdata_map[fds[x].fd];
342
343 struct memcached_protocol_client_st* c;
344 c= memcached_protocol_create_client(protocol, sock);
345 if (c == NULL)
346 {
347 fprintf(stderr, "Failed to create client\n");
348 close(sock);
349 }
350 else
351 {
352 socket_userdata_map[sock]= c;
353 fds[max_poll].events= POLLIN;
354 fds[max_poll].revents= 0;
355 fds[max_poll].fd= sock;
356 ++max_poll;
357 }
358 }
359 else
360 {
361 /* drive the client */
362 struct memcached_protocol_client_st* c;
363 c= socket_userdata_map[fds[x].fd];
364 assert(c != NULL);
365 fds[max_poll].events= 0;
366
367 memcached_protocol_event_t events= memcached_protocol_client_work(c);
368 if (events & MEMCACHED_PROTOCOL_WRITE_EVENT)
369 fds[max_poll].events= POLLOUT;
370
371 if (events & MEMCACHED_PROTOCOL_READ_EVENT)
372 fds[max_poll].events= POLLIN;
373
374 if (!(events & MEMCACHED_PROTOCOL_PAUSE_EVENT ||
375 fds[max_poll].events != 0))
376 {
377 memcached_protocol_client_destroy(c);
378 close(fds[x].fd);
379 fds[x].events= 0;
380
381 if (x != max_poll - 1)
382 memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
383
384 --max_poll;
385 }
386 }
387 }
388 }
389 }
390 }