Merge trunk
[awesomized/libmemcached] / example / memcached_light.c
/* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
/**
 * What is a library without an example to show you how to use the library?
 * This example uses both interfaces to implement a small memcached server.
 * Please note that this is an example of how to use the library, not
 * an implementation of a scalable memcached server. If you look closely
 * at the example it isn't even multithreaded ;-)
 *
 * With that in mind, let me give you some pointers into the source:
 *   storage.c/h       - Implements the item store for this server and not really
 *                       interesting for this example.
 *   interface_v0.c    - Shows an implementation of the memcached server by using
 *                       the "raw" access to the packets as they arrive
 *   interface_v1.c    - Shows an implementation of the memcached server by using
 *                       the more "logical" interface.
 *   memcached_light.c - This file sets up all of the sockets and runs the main
 *                       message loop.
 */
19
20 #include <assert.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <netdb.h>
24 #include <netinet/tcp.h>
25 #include <stdio.h>
26 #include <unistd.h>
27 #include <fcntl.h>
28 #include <errno.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <poll.h>
32
33 #include <libmemcached/protocol_handler.h>
34 #include "common.h"
35 #include "storage.h"
36
/* Callback tables for the two protocol interfaces; defined in other files */
extern struct memcached_binary_protocol_callback_st interface_v0_impl;
extern struct memcached_binary_protocol_callback_st interface_v1_impl;

/*
 * Capacity shared by the two tables below. socket_userdata_map is indexed
 * directly by file descriptor number, so this is also the exclusive upper
 * bound on the descriptor values that can be tracked.
 */
enum { MAX_TRACKED_SOCKETS= 1024 };

/* Listening sockets, appended by server_socket() in creation order */
static int server_sockets[MAX_TRACKED_SOCKETS];

/* Number of valid entries in server_sockets */
static int num_server_sockets= 0;

/*
 * Per-descriptor user data, indexed by fd: the protocol handle for a
 * listening socket, or the client handle for an accepted connection.
 */
static void *socket_userdata_map[MAX_TRACKED_SOCKETS];
43
/**
 * Create listening sockets for a port and record them in server_sockets
 *
 * The port is resolved on 127.0.0.1 with getaddrinfo(). For every address
 * returned, a non-blocking socket is created, bound and put into listening
 * mode; each socket that succeeds is appended to server_sockets.
 *
 * @param port the port number to bind to
 * @return 1 if the address lookup fails; otherwise 0 if at least one server
 *         socket is registered (counting earlier calls as well), 1 if none
 */
static int server_socket(const char *port) {
  struct addrinfo *ai;
  struct addrinfo hints= { .ai_flags= AI_PASSIVE,
                           .ai_family= AF_UNSPEC,
                           .ai_socktype= SOCK_STREAM };

  int error= getaddrinfo("127.0.0.1", port, &hints, &ai);
  if (error != 0)
  {
    if (error != EAI_SYSTEM)
      fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
    else
      perror("getaddrinfo()");

    return 1;
  }

  struct linger ling= {0, 0};
  const int capacity= (int)(sizeof(server_sockets) / sizeof(server_sockets[0]));

  for (struct addrinfo *next= ai; next; next= next->ai_next)
  {
    /* Never write past the end of the server_sockets table */
    if (num_server_sockets >= capacity)
    {
      fprintf(stderr, "Too many server sockets, ignoring remaining addresses\n");
      break;
    }

    /* Use the entry being visited, not the head of the list */
    int sock= socket(next->ai_family, next->ai_socktype, next->ai_protocol);
    if (sock == -1)
    {
      perror("Failed to create socket");
      continue;
    }

    int flags= fcntl(sock, F_GETFL, 0);
    if (flags == -1)
    {
      perror("Failed to get socket flags");
      close(sock);
      continue;
    }

    if ((flags & O_NONBLOCK) != O_NONBLOCK)
    {
      if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1)
      {
        perror("Failed to set socket to nonblocking mode");
        close(sock);
        continue;
      }
    }

    /* The socket options are best effort: report failures and carry on */
    flags= 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)) != 0)
      perror("Failed to set SO_REUSEADDR");

    if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) != 0)
      perror("Failed to set SO_KEEPALIVE");

    if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)) != 0)
      perror("Failed to set SO_LINGER");

    if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) != 0)
      perror("Failed to set TCP_NODELAY");

    if (bind(sock, next->ai_addr, next->ai_addrlen) == -1)
    {
      /*
       * The address list must stay alive here: the loop still walks it
       * and it is released exactly once after the loop.
       */
      if (errno != EADDRINUSE)
        perror("bind()");

      close(sock);
      continue;
    }

    if (listen(sock, 1024) == -1)
    {
      perror("listen()");
      close(sock);
      continue;
    }

    server_sockets[num_server_sockets++]= sock;
  }

  freeaddrinfo(ai);

  return (num_server_sockets > 0) ? 0 : 1;
}
130
/**
 * Convert a command code to a textual string
 *
 * @param cmd the comcode to convert
 * @return a textual string with the command or NULL for unknown commands
 */
static const char* comcode2str(uint8_t cmd)
{
  /* Indexed by command code: entry N is the name of command N */
  static const char * const text[] = {
    "GET", "SET", "ADD", "REPLACE", "DELETE",
    "INCREMENT", "DECREMENT", "QUIT", "FLUSH",
    "GETQ", "NOOP", "VERSION", "GETK", "GETKQ",
    "APPEND", "PREPEND", "STAT", "SETQ", "ADDQ",
    "REPLACEQ", "DELETEQ", "INCREMENTQ", "DECREMENTQ",
    "QUITQ", "FLUSHQ", "APPENDQ", "PREPENDQ"
  };

  /*
   * Bound the lookup by the table itself rather than by an external
   * constant, so the check and the table can never disagree.
   */
  if (cmd < sizeof(text) / sizeof(text[0]))
    return text[cmd];

  return NULL;
}
152
153 /**
154 * Print out the command we are about to execute
155 */
156 static void pre_execute(const void *cookie, protocol_binary_request_header *header)
157 {
158 const char *cmd= comcode2str(header->request.opcode);
159 if (cmd != NULL)
160 fprintf(stderr, "pre_execute from %p: %s\n", cookie, cmd);
161 else
162 fprintf(stderr, "pre_execute from %p: 0x%02x\n", cookie, header->request.opcode);
163 }
164
165 /**
166 * Print out the command we just executed
167 */
168 static void post_execute(const void *cookie, protocol_binary_request_header *header)
169 {
170 const char *cmd= comcode2str(header->request.opcode);
171 if (cmd != NULL)
172 fprintf(stderr, "post_execute from %p: %s\n", cookie, cmd);
173 else
174 fprintf(stderr, "post_execute from %p: 0x%02x\n", cookie, header->request.opcode);
175 }
176
177 /**
178 * Callback handler for all unknown commands.
179 * Send an unknown command back to the client
180 */
181 static protocol_binary_response_status unknown(const void *cookie,
182 protocol_binary_request_header *header,
183 memcached_binary_protocol_raw_response_handler response_handler)
184 {
185 protocol_binary_response_no_extras response= {
186 .message= {
187 .header.response= {
188 .magic= PROTOCOL_BINARY_RES,
189 .opcode= header->request.opcode,
190 .status= htons(PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND),
191 .opaque= header->request.opaque
192 }
193 }
194 };
195
196 return response_handler(cookie, header, (void*)&response);
197 }
198
199 static void work(void);
200
201 /**
202 * Program entry point. Bind to the specified port(s) and serve clients
203 *
204 * @param argc number of items in the argument vector
205 * @param argv argument vector
206 * @return 0 on success, 1 otherwise
207 */
208 int main(int argc, char **argv)
209 {
210 bool port_specified= false;
211 int cmd;
212 struct memcached_binary_protocol_callback_st *interface= &interface_v0_impl;
213
214 while ((cmd= getopt(argc, argv, "1p:?")) != EOF)
215 {
216 switch (cmd) {
217 case '1':
218 interface= &interface_v1_impl;
219 break;
220 case 'p':
221 port_specified= true;
222 (void)server_socket(optarg);
223 break;
224 case '?': /* FALLTHROUGH */
225 default:
226 (void)fprintf(stderr, "Usage: %s [-p port]\n", argv[0]);
227 return 1;
228 }
229 }
230
231 if (!port_specified)
232 (void)server_socket("9999");
233
234 if (num_server_sockets == 0)
235 {
236 fprintf(stderr, "I don't have any server sockets\n");
237 return 1;
238 }
239
240 /*
241 * Create and initialize the handles to the protocol handlers. I want
242 * to be able to trace the traffic throught the pre/post handlers, and
243 * set up a common handler for unknown messages
244 */
245 interface->pre_execute= pre_execute;
246 interface->post_execute= post_execute;
247 interface->unknown= unknown;
248
249 struct memcached_binary_protocol_st *protocol_handle;
250 if ((protocol_handle= memcached_binary_protocol_create_instance()) == NULL)
251 {
252 fprintf(stderr, "Failed to allocate protocol handle\n");
253 return 1;
254 }
255
256 memcached_binary_protocol_set_callbacks(protocol_handle, interface);
257 memcached_binary_protocol_set_pedantic(protocol_handle, true);
258
259 for (int xx= 0; xx < num_server_sockets; ++xx)
260 socket_userdata_map[server_sockets[xx]]= protocol_handle;
261
262 /* Serve all of the clients */
263 work();
264
265 /* NOTREACHED */
266 return 0;
267 }
268
269 static void work(void) {
270 #define MAX_SERVERS_TO_POLL 100
271 struct pollfd fds[MAX_SERVERS_TO_POLL];
272 int max_poll;
273
274 for (max_poll= 0; max_poll < num_server_sockets; ++max_poll)
275 {
276 fds[max_poll].events= POLLIN;
277 fds[max_poll].revents= 0;
278 fds[max_poll].fd= server_sockets[max_poll];
279 ++max_poll;
280 }
281
282 while (true)
283 {
284 int err= poll(fds, (nfds_t)max_poll, -1);
285
286 if (err == 0 || (err == -1 && errno != EINTR))
287 {
288 perror("poll() failed");
289 abort();
290 }
291
292 /* find the available filedescriptors */
293 for (int x= max_poll - 1; x > -1 && err > 0; --x)
294 {
295 if (fds[x].revents != 0)
296 {
297 --err;
298 if (x < num_server_sockets)
299 {
300 /* accept new client */
301 struct sockaddr_storage addr;
302 socklen_t addrlen= sizeof(addr);
303 int sock= accept(fds[x].fd, (struct sockaddr *)&addr,
304 &addrlen);
305
306 if (sock == -1)
307 {
308 perror("Failed to accept client");
309 continue;
310 }
311
312 struct memcached_binary_protocol_st *protocol;
313 protocol= socket_userdata_map[fds[x].fd];
314
315 struct memcached_binary_protocol_client_st* c;
316 c= memcached_binary_protocol_create_client(protocol, sock);
317 if (c == NULL)
318 {
319 fprintf(stderr, "Failed to create client\n");
320 close(sock);
321 }
322 else
323 {
324 socket_userdata_map[sock]= c;
325 fds[max_poll].events= POLLIN;
326 fds[max_poll].revents= 0;
327 fds[max_poll].fd= sock;
328 ++max_poll;
329 }
330 }
331 else
332 {
333 /* drive the client */
334 struct memcached_binary_protocol_client_st* c;
335 c= socket_userdata_map[fds[x].fd];
336 assert(c != NULL);
337 fds[max_poll].events= 0;
338
339 switch (memcached_binary_protocol_client_work(c))
340 {
341 case WRITE_EVENT:
342 case READ_WRITE_EVENT:
343 fds[max_poll].events= POLLOUT;
344 /* FALLTHROUGH */
345 case READ_EVENT:
346 fds[max_poll].events |= POLLIN;
347 break;
348 case ERROR_EVENT:
349 default: /* ERROR or unknown state.. close */
350 memcached_binary_protocol_client_destroy(c);
351 close(fds[x].fd);
352 fds[x].events= 0;
353
354 if (x != max_poll - 1)
355 memmove(fds + x, fds + x + 1, (size_t)(max_poll - x));
356
357 --max_poll;
358 }
359 }
360 }
361 }
362 }
363 }