Add an example using innodb as a storage (NOTE: this version currently leaks memorybz...
[awesomized/libmemcached] / example / memcached_light.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 /**
3 * What is a library without an example to show you how to use the library?
4 * This example use both interfaces to implement a small memcached server.
5 * Please note that this is an exemple on how to use the library, not
6 * an implementation of a scalable memcached server. If you look closely
7 * at the example it isn't even multithreaded ;-)
8 *
9 * With that in mind, let me give you some pointers into the source:
10 * storage.c/h - Implements the item store for this server and not really
11 * interesting for this example.
12 * interface_v0.c - Shows an implementation of the memcached server by using
13 * the "raw" access to the packets as they arrive
14 * interface_v1.c - Shows an implementation of the memcached server by using
15 * the more "logical" interface.
16 * memcached_light.c - This file sets up all of the sockets and run the main
17 * message loop.
18 */
19
20 #include <assert.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <netdb.h>
24 #include <netinet/tcp.h>
25 #include <stdio.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <errno.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <poll.h>
32
33 #include <libmemcached/protocol_handler.h>
34 #include <libmemcached/byteorder.h>
35 #include "storage.h"
36
37 extern struct memcached_binary_protocol_callback_st interface_v0_impl;
38 extern struct memcached_binary_protocol_callback_st interface_v1_impl;
39
40 static int server_sockets[1024];
41 static int num_server_sockets= 0;
42 static void* socket_userdata_map[1024];
43 static bool verbose= false;
44
45 /**
46 * Create a socket and bind it to a specific port number
47 * @param port the port number to bind to
48 */
49 static int server_socket(const char *port) {
50 struct addrinfo *ai;
51 struct addrinfo hints= { .ai_flags= AI_PASSIVE,
52 .ai_family= AF_UNSPEC,
53 .ai_socktype= SOCK_STREAM };
54
55 int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
56 if (error != 0)
57 {
58 if (error != EAI_SYSTEM)
59 fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
60 else
61 perror("getaddrinfo()");
62
63 return 1;
64 }
65
66 struct linger ling= {0, 0};
67
68 for (struct addrinfo *next= ai; next; next= next->ai_next)
69 {
70 int sock= socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
71 if (sock == -1)
72 {
73 perror("Failed to create socket");
74 continue;
75 }
76
77 int flags= fcntl(sock, F_GETFL, 0);
78 if (flags == -1)
79 {
80 perror("Failed to get socket flags");
81 close(sock);
82 continue;
83 }
84
85 if ((flags & O_NONBLOCK) != O_NONBLOCK)
86 if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
87 {
88 perror("Failed to set socket to nonblocking mode");
89 close(sock);
90 continue;
91 }
92
93 flags= 1;
94 if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
95 perror("Failed to set SO_REUSEADDR");
96
97 if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
98 perror("Failed to set SO_KEEPALIVE");
99
100 if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
101 perror("Failed to set SO_LINGER");
102
103 if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
104 perror("Failed to set TCP_NODELAY");
105
106 if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
107 {
108 if (errno != EADDRINUSE)
109 {
110 perror("bind()");
111 freeaddrinfo(ai);
112 }
113 close(sock);
114 continue;
115 }
116
117 if (listen(sock, 1024) == -1)
118 {
119 perror("listen()");
120 close(sock);
121 continue;
122 }
123
124 server_sockets[num_server_sockets++]= sock;
125 }
126
127 freeaddrinfo(ai);
128
129 return (num_server_sockets > 0) ? 0 : 1;
130 }
131
132 /**
133 * Convert a command code to a textual string
134 * @param cmd the comcode to convert
135 * @return a textual string with the command or NULL for unknown commands
136 */
137 static const char* comcode2str(uint8_t cmd)
138 {
139 static const char * const text[] = {
140 "GET", "SET", "ADD", "REPLACE", "DELETE",
141 "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
142 "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
143 "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
144 "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
145 "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
146 };
147
148 if (cmd <= PROTOCOL_BINARY_CMD_PREPENDQ)
149 return text[cmd];
150
151 return NULL;
152 }
153
154 /**
155 * Print out the command we are about to execute
156 */
157 static void pre_execute(const void *cookie __attribute__((unused)),
158 protocol_binary_request_header *header __attribute__((unused)))
159 {
160 if (verbose)
161 {
162 const char *cmd= comcode2str(header->request.opcode);
163 if (cmd != NULL)
164 fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
165 else
166 fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
167 }
168 }
169
170 /**
171 * Print out the command we just executed
172 */
173 static void post_execute(const void *cookie __attribute__((unused)),
174 protocol_binary_request_header *header __attribute__((unused)))
175 {
176 if (verbose)
177 {
178 const char *cmd= comcode2str(header->request.opcode);
179 if (cmd != NULL)
180 fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
181 else
182 fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
183 }
184 }
185
186 /**
187 * Callback handler for all unknown commands.
188 * Send an unknown command back to the client
189 */
190 static protocol_binary_response_status unknown(const void *cookie,
191 protocol_binary_request_header *header,
192 memcached_binary_protocol_raw_response_handler response_handler)
193 {
194 protocol_binary_response_no_extras response= {
195 .message= {
196 .header.response= {
197 .magic= PROTOCOL_BINARY_RES,
198 .opcode= header->request.opcode,
199 .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
200 .opaque= header->request.opaque
201 }
202 }
203 };
204
205 return response_handler(cookie, header, (void*)&response);
206 }
207
208 static void work(void);
209
210 /**
211 * Program entry point. Bind to the specified port(s) and serve clients
212 *
213 * @param argc number of items in the argument vector
214 * @param argv argument vector
215 * @return 0 on success, 1 otherwise
216 */
217 int main(int argc, char **argv)
218 {
219 bool port_specified= false;
220 int cmd;
221 struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
222
223 while ((cmd= getopt(argc, argv, "v1p:?")) != EOF)
224 {
225 switch (cmd) {
226 case '1':
227 interface= &interface_v1_impl;
228 break;
229 case 'p':
230 port_specified= true;
231 (void)server_socket(optarg);
232 break;
233 case 'v':
234 verbose= true;
235 break;
236 case '?': /* FALLTHROUGH */
237 default:
238 (void)fprintf(stderr, "Usage: %s [-p port] [-v] [-1]\n", argv[0]);
239 return 1;
240 }
241 }
242
243 if (!initialize_storage())
244 {
245 /* Error message already printed */
246 return 1;
247 }
248
249 if (!port_specified)
250 (void)server_socket("9999");
251
252 if (num_server_sockets == 0)
253 {
254 fprintf(stderr, "I don't have any server sockets\n");
255 return 1;
256 }
257
258 /*
259 * Create and initialize the handles to the protocol handlers. I want
260 * to be able to trace the traffic throught the pre/post handlers, and
261 * set up a common handler for unknown messages
262 */
263 interface->pre_execute= pre_execute;
264 interface->post_execute= post_execute;
265 interface->unknown= unknown;
266
267 struct memcached_binary_protocol_st *protocol_handle;
268 if ((protocol_handle= memcached_binary_protocol_create_instance()) == NULL)
269 {
270 fprintf(stderr, "Failed to allocate protocol handle\n");
271 return 1;
272 }
273
274 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
275 memcached_binary_protocol_set_pedantic(protocol_handle, true);
276
277 for (int xx= 0; xx < num_server_sockets; ++xx)
278 socket_userdata_map[server_sockets[xx]]= protocol_handle;
279
280 /* Serve all of the clients */
281 work();
282
283 /* NOTREACHED */
284 return 0;
285 }
286
287 static void work(void) {
288 #define MAX_SERVERS_TO_POLL 100
289 struct pollfd fds[MAX_SERVERS_TO_POLL];
290 int max_poll;
291
292 for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
293 {
294 fds[max_poll].events= POLLIN;
295 fds[max_poll].revents= 0;
296 fds[max_poll].fd= server_sockets[max_poll];
297 ++max_poll;
298 }
299
300 while (true)
301 {
302 int err= poll(fds, (nfds_t)max_poll, -1);
303
304 if (err == 0 || (err == -1 && errno != EINTR))
305 {
306 perror("poll() failed");
307 abort();
308 }
309
310 /* find the available filedescriptors */
311 for (int x= max_poll - 1; x > -1 && err > 0; --x)
312 {
313 if (fds[x].revents != 0)
314 {
315 --err;
316 if (x < num_server_sockets)
317 {
318 /* accept new client */
319 struct sockaddr_storage addr;
320 socklen_t addrlen= sizeof(addr);
321 int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
322 &addrlen);
323
324 if (sock == -1)
325 {
326 perror("Failed to accept client");
327 continue;
328 }
329
330 struct memcached_binary_protocol_st *protocol;
331 protocol= socket_userdata_map[fds[x].fd];
332
333 struct memcached_binary_protocol_client_st* c;
334 c= memcached_binary_protocol_create_client(protocol, sock);
335 if (c == NULL)
336 {
337 fprintf(stderr, "Failed to create client\n");
338 close(sock);
339 }
340 else
341 {
342 socket_userdata_map[sock]= c;
343 fds[max_poll].events= POLLIN;
344 fds[max_poll].revents= 0;
345 fds[max_poll].fd= sock;
346 ++max_poll;
347 }
348 }
349 else
350 {
351 /* drive the client */
352 struct memcached_binary_protocol_client_st* c;
353 c= socket_userdata_map[fds[x].fd];
354 assert(c != NULL);
355 fds[max_poll].events= 0;
356
357 switch (memcached_binary_protocol_client_work(c))
358 {
359 case WRITE_EVENT:
360 case READ_WRITE_EVENT:
361 fds[max_poll].events= POLLOUT;
362 /* FALLTHROUGH */
363 case READ_EVENT:
364 fds[max_poll].events |= POLLIN;
365 break;
366 case ERROR_EVENT:
367 default: /* ERROR or unknown state.. close */
368 memcached_binary_protocol_client_destroy(c);
369 close(fds[x].fd);
370 fds[x].events= 0;
371
372 if (x != max_poll - 1)
373 memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
374
375 --max_poll;
376 }
377 }
378 }
379 }
380 }
381 }