Merge Trond's protocol work.
[awesomized/libmemcached] / example / memcached_light.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example uses both interfaces to implement a small memcached server.
5 * Please note that this is an example of how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.c - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.c - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.c - This file sets up all of the sockets and runs the main
17 * message loop.
18 */
19
20 #include <assert.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <netdb.h>
24 #include <netinet/tcp.h>
25 #include <stdio.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <errno.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <poll.h>
32
33 #include <libmemcached/protocol_handler.h>
34 #include <libmemcached/byteorder.h>
35 #include "storage.h"
36
/* Protocol callback tables; presumably defined in interface_v0.c and
 * interface_v1.c (see the file header) -- confirm against those files. */
extern struct memcached_binary_protocol_callback_st interface_v0_impl;
extern struct memcached_binary_protocol_callback_st interface_v1_impl;

/* Listening sockets registered by server_socket(). */
static int server_sockets[1024];
/* Number of valid entries in server_sockets. */
static int num_server_sockets= 0;
/* User data attached to a socket, indexed by the socket descriptor. */
static void *socket_userdata_map[1024];
/* Enabled by the -v command line option; turns on command tracing. */
static bool verbose= false;
44
/**
 * Create listening sockets bound to a specific port number and register
 * them in server_sockets.
 *
 * Every address returned by getaddrinfo() is tried in turn. An address that
 * fails at any step (socket creation, switching to non-blocking mode, bind
 * or listen) is skipped; all failures except EADDRINUSE from bind() are
 * reported on stderr.
 *
 * @param port the port number to bind to
 * @return 1 if getaddrinfo() fails; otherwise 0 if at least one server
 *         socket is registered when the function returns, else 1
 */
static int server_socket(const char *port)
{
  struct addrinfo *ai;
  struct addrinfo hints= { .ai_flags= AI_PASSIVE,
                           .ai_family= AF_UNSPEC,
                           .ai_socktype= SOCK_STREAM };

  int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
  if (error != 0)
  {
    if (error != EAI_SYSTEM)
      fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
    else
      perror("getaddrinfo()");

    return 1;
  }

  struct linger ling= {0, 0};
  const int max_sockets= (int)(sizeof(server_sockets) / sizeof(server_sockets[0]));

  for (struct addrinfo *next= ai; next; next= next->ai_next)
  {
    /* Never write past the end of the server_sockets table */
    if (num_server_sockets >= max_sockets)
    {
      fprintf(stderr, "Too many server sockets, ignoring remaining addresses\n");
      break;
    }

    /* Use the entry we are iterating over, not the head of the list */
    int sock= socket(next->ai_family, next->ai_socktype, next->ai_protocol);
    if (sock == -1)
    {
      perror("Failed to create socket");
      continue;
    }

    int flags= fcntl(sock, F_GETFL, 0);
    if (flags == -1)
    {
      perror("Failed to get socket flags");
      close(sock);
      continue;
    }

    if ((flags & O_NONBLOCK) != O_NONBLOCK)
    {
      if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
      {
        perror("Failed to set socket to nonblocking mode");
        close(sock);
        continue;
      }
    }

    /* Failing to set any of these options is reported but not fatal */
    flags= 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
      perror("Failed to set SO_REUSEADDR");

    if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
      perror("Failed to set SO_KEEPALIVE");

    if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
      perror("Failed to set SO_LINGER");

    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
      perror("Failed to set TCP_NODELAY");

    if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
    {
      /*
       * The address list is still being walked here, so it must not be
       * released; it is freed exactly once after the loop.
       */
      if (errno != EADDRINUSE)
        perror("bind()");

      close(sock);
      continue;
    }

    if (listen(sock, 1024) == -1)
    {
      perror("listen()");
      close(sock);
      continue;
    }

    server_sockets[num_server_sockets++]= sock;
  }

  freeaddrinfo(ai);

  return (num_server_sockets > 0) ? 0 : 1;
}
133
/**
 * Convert a command code to a textual string
 *
 * The table is indexed directly by the command code, and the range check is
 * derived from the table itself so the lookup can never read past its end.
 *
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char *comcode2str(uint8_t cmd)
{
  static const char * const text[]= {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };

  if (cmd < sizeof(text) / sizeof(text[0]))
    return text[cmd];

  return NULL;
}
155
156 /**
157 * Print out the command we are about to execute
158 */
159 static void pre_execute(const void *cookie __attribute__((unused)),
160 protocol_binary_request_header *header __attribute__((unused)))
161 {
162 if (verbose)
163 {
164 const char *cmd= comcode2str(header->request.opcode);
165 if (cmd != NULL)
166 fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
167 else
168 fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
169 }
170 }
171
172 /**
173 * Print out the command we just executed
174 */
175 static void post_execute(const void *cookie __attribute__((unused)),
176 protocol_binary_request_header *header __attribute__((unused)))
177 {
178 if (verbose)
179 {
180 const char *cmd= comcode2str(header->request.opcode);
181 if (cmd != NULL)
182 fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
183 else
184 fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
185 }
186 }
187
188 /**
189 * Callback handler for all unknown commands.
190 * Send an unknown command back to the client
191 */
192 static protocol_binary_response_status unknown(const void *cookie,
193 protocol_binary_request_header *header,
194 memcached_binary_protocol_raw_response_handler response_handler)
195 {
196 protocol_binary_response_no_extras response= {
197 .message= {
198 .header.response= {
199 .magic= PROTOCOL_BINARY_RES,
200 .opcode= header->request.opcode,
201 .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
202 .opaque= header->request.opaque
203 }
204 }
205 };
206
207 return response_handler(cookie, header, (void*)&response);
208 }
209
210 static void work(void);
211
212 /**
213 * Program entry point. Bind to the specified port(s) and serve clients
214 *
215 * @param argc number of items in the argument vector
216 * @param argv argument vector
217 * @return 0 on success, 1 otherwise
218 */
219 int main(int argc, char **argv)
220 {
221 bool port_specified= false;
222 int cmd;
223 struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
224
225 while ((cmd= getopt(argc, argv, "v1p:?")) != EOF)
226 {
227 switch (cmd) {
228 case '1':
229 interface= &interface_v1_impl;
230 break;
231 case 'p':
232 port_specified= true;
233 (void)server_socket(optarg);
234 break;
235 case 'v':
236 verbose= true;
237 break;
238 case '?': /* FALLTHROUGH */
239 default:
240 (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]);
241 return 1;
242 }
243 }
244
245 if (!initialize_storage())
246 {
247 /* Error message already printed */
248 return 1;
249 }
250
251 if (!port_specified)
252 (void)server_socket("9999");
253
254 if (num_server_sockets == 0)
255 {
256 fprintf(stderr, "I don't have any server sockets\n");
257 return 1;
258 }
259
260 /*
261 * Create and initialize the handles to the protocol handlers. I want
262 * to be able to trace the traffic throught the pre/post handlers, and
263 * set up a common handler for unknown messages
264 */
265 interface->pre_execute= pre_execute;
266 interface->post_execute= post_execute;
267 interface->unknown= unknown;
268
269 struct memcached_protocol_st *protocol_handle;
270 if ((protocol_handle= memcached_protocol_create_instance()) == NULL)
271 {
272 fprintf(stderr, "Failed to allocate protocol handle\n");
273 return 1;
274 }
275
276 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
277 memcached_binary_protocol_set_pedantic(protocol_handle, true);
278
279 for (int xx= 0; xx < num_server_sockets; ++xx)
280 socket_userdata_map[server_sockets[xx]]= protocol_handle;
281
282 /* Serve all of the clients */
283 work();
284
285 /* NOTREACHED */
286 return 0;
287 }
288
289 static void work(void)
290 {
291 #define MAX_SERVERS_TO_POLL 100
292 struct pollfd fds[MAX_SERVERS_TO_POLL];
293 int max_poll;
294
295 for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
296 {
297 fds[max_poll].events= POLLIN;
298 fds[max_poll].revents= 0;
299 fds[max_poll].fd= server_sockets[max_poll];
300 ++max_poll;
301 }
302
303 while (true)
304 {
305 int err= poll(fds, (nfds_t)max_poll, -1);
306
307 if (err == 0 || (err == -1 && errno != EINTR))
308 {
309 perror("poll() failed");
310 abort();
311 }
312
313 /* find the available filedescriptors */
314 for (int x= max_poll - 1; x > -1 && err > 0; --x)
315 {
316 if (fds[x].revents != 0)
317 {
318 --err;
319 if (x < num_server_sockets)
320 {
321 /* accept new client */
322 struct sockaddr_storage addr;
323 socklen_t addrlen= sizeof(addr);
324 int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
325 &addrlen);
326
327 if (sock == -1)
328 {
329 perror("Failed to accept client");
330 continue;
331 }
332
333 struct memcached_protocol_st *protocol;
334 protocol= socket_userdata_map[fds[x].fd];
335
336 struct memcached_protocol_client_st* c;
337 c= memcached_protocol_create_client(protocol, sock);
338 if (c == NULL)
339 {
340 fprintf(stderr, "Failed to create client\n");
341 close(sock);
342 }
343 else
344 {
345 socket_userdata_map[sock]= c;
346 fds[max_poll].events= POLLIN;
347 fds[max_poll].revents= 0;
348 fds[max_poll].fd= sock;
349 ++max_poll;
350 }
351 }
352 else
353 {
354 /* drive the client */
355 struct memcached_protocol_client_st* c;
356 c= socket_userdata_map[fds[x].fd];
357 assert(c != NULL);
358 fds[max_poll].events= 0;
359
360 switch (memcached_protocol_client_work(c))
361 {
362 case WRITE_EVENT:
363 case READ_WRITE_EVENT:
364 fds[max_poll].events= POLLOUT;
365 /* FALLTHROUGH */
366 case READ_EVENT:
367 fds[max_poll].events |= POLLIN;
368 break;
369 case ERROR_EVENT:
370 default: /* ERROR or unknown state.. close */
371 memcached_protocol_client_destroy(c);
372 close(fds[x].fd);
373 fds[x].events= 0;
374
375 if (x != max_poll - 1)
376 memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
377
378 --max_poll;
379 }
380 }
381 }
382 }
383 }
384 }