udp support in fire and forget mode for all ops but get/gets, stat and version
[m6w6/libmemcached] / libmemcached / memcached_storage.c
1 /*
2 Memcached library
3
4 memcached_set()
5 memcached_replace()
6 memcached_add()
7
8 */
9 #include "common.h"
10 #include "memcached_io.h"
11
12 typedef enum {
13 SET_OP,
14 REPLACE_OP,
15 ADD_OP,
16 PREPEND_OP,
17 APPEND_OP,
18 CAS_OP,
19 } memcached_storage_action;
20
21 /* Inline this */
22 static char *storage_op_string(memcached_storage_action verb)
23 {
24 switch (verb)
25 {
26 case SET_OP:
27 return "set";
28 case REPLACE_OP:
29 return "replace";
30 case ADD_OP:
31 return "add";
32 case PREPEND_OP:
33 return "prepend";
34 case APPEND_OP:
35 return "append";
36 case CAS_OP:
37 return "cas";
38 default:
39 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
40 };
41
42 /* NOTREACHED */
43 }
44
45 static memcached_return memcached_send_binary(memcached_server_st* server,
46 const char *key,
47 size_t key_length,
48 const char *value,
49 size_t value_length,
50 time_t expiration,
51 uint32_t flags,
52 uint64_t cas,
53 memcached_storage_action verb);
54
55 static inline memcached_return memcached_send(memcached_st *ptr,
56 const char *master_key, size_t master_key_length,
57 const char *key, size_t key_length,
58 const char *value, size_t value_length,
59 time_t expiration,
60 uint32_t flags,
61 uint64_t cas,
62 memcached_storage_action verb)
63 {
64 char to_write;
65 size_t write_length;
66 ssize_t sent_length;
67 memcached_return rc;
68 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
69 unsigned int server_key;
70
71 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
72
73 rc= memcached_validate_key_length(key_length, ptr->flags & MEM_BINARY_PROTOCOL);
74 unlikely (rc != MEMCACHED_SUCCESS)
75 return rc;
76
77 unlikely (ptr->number_of_hosts == 0)
78 return MEMCACHED_NO_SERVERS;
79
80 if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
81 return MEMCACHED_BAD_KEY_PROVIDED;
82
83 server_key= memcached_generate_hash(ptr, master_key, master_key_length);
84
85 if (ptr->flags & MEM_BINARY_PROTOCOL)
86 return memcached_send_binary(&ptr->hosts[server_key], key, key_length,
87 value, value_length, expiration,
88 flags, cas, verb);
89
90 if (cas)
91 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
92 "%s %s%.*s %u %llu %zu %llu%s\r\n",
93 storage_op_string(verb),
94 ptr->prefix_key,
95 (int)key_length, key, flags,
96 (unsigned long long)expiration, value_length,
97 (unsigned long long)cas,
98 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
99 else
100 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
101 "%s %s%.*s %u %llu %zu%s\r\n",
102 storage_op_string(verb),
103 ptr->prefix_key,
104 (int)key_length, key, flags,
105 (unsigned long long)expiration, value_length,
106 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
107
108 if (ptr->flags & MEM_USE_UDP && ptr->flags & MEM_BUFFER_REQUESTS) {
109 size_t cmd_size= write_length + value_length + 2;
110 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
111 return MEMCACHED_WRITE_FAILURE;
112 if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
113 memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
114 }
115
116 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
117 {
118 rc= MEMCACHED_WRITE_FAILURE;
119 goto error;
120 }
121
122 /* Send command header */
123 rc= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
124 if (rc != MEMCACHED_SUCCESS)
125 goto error;
126
127 /* Send command body */
128 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
129 {
130 rc= MEMCACHED_WRITE_FAILURE;
131 goto error;
132 }
133
134 if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP)
135 to_write= 0;
136 else
137 to_write= 1;
138
139 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
140 {
141 rc= MEMCACHED_WRITE_FAILURE;
142 goto error;
143 }
144
145 if (ptr->flags & MEM_NOREPLY)
146 return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
147
148 if (to_write == 0)
149 return MEMCACHED_BUFFERED;
150
151 rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
152
153 if (rc == MEMCACHED_STORED)
154 return MEMCACHED_SUCCESS;
155 else
156 return rc;
157
158 error:
159 memcached_io_reset(&ptr->hosts[server_key]);
160
161 return rc;
162 }
163
164
165 memcached_return memcached_set(memcached_st *ptr, const char *key, size_t key_length,
166 const char *value, size_t value_length,
167 time_t expiration,
168 uint32_t flags)
169 {
170 memcached_return rc;
171 LIBMEMCACHED_MEMCACHED_SET_START();
172 rc= memcached_send(ptr, key, key_length,
173 key, key_length, value, value_length,
174 expiration, flags, 0, SET_OP);
175 LIBMEMCACHED_MEMCACHED_SET_END();
176 return rc;
177 }
178
179 memcached_return memcached_add(memcached_st *ptr,
180 const char *key, size_t key_length,
181 const char *value, size_t value_length,
182 time_t expiration,
183 uint32_t flags)
184 {
185 memcached_return rc;
186 LIBMEMCACHED_MEMCACHED_ADD_START();
187 rc= memcached_send(ptr, key, key_length,
188 key, key_length, value, value_length,
189 expiration, flags, 0, ADD_OP);
190 LIBMEMCACHED_MEMCACHED_ADD_END();
191 return rc;
192 }
193
194 memcached_return memcached_replace(memcached_st *ptr,
195 const char *key, size_t key_length,
196 const char *value, size_t value_length,
197 time_t expiration,
198 uint32_t flags)
199 {
200 memcached_return rc;
201 LIBMEMCACHED_MEMCACHED_REPLACE_START();
202 rc= memcached_send(ptr, key, key_length,
203 key, key_length, value, value_length,
204 expiration, flags, 0, REPLACE_OP);
205 LIBMEMCACHED_MEMCACHED_REPLACE_END();
206 return rc;
207 }
208
209 memcached_return memcached_prepend(memcached_st *ptr,
210 const char *key, size_t key_length,
211 const char *value, size_t value_length,
212 time_t expiration,
213 uint32_t flags)
214 {
215 memcached_return rc;
216 rc= memcached_send(ptr, key, key_length,
217 key, key_length, value, value_length,
218 expiration, flags, 0, PREPEND_OP);
219 return rc;
220 }
221
222 memcached_return memcached_append(memcached_st *ptr,
223 const char *key, size_t key_length,
224 const char *value, size_t value_length,
225 time_t expiration,
226 uint32_t flags)
227 {
228 memcached_return rc;
229 rc= memcached_send(ptr, key, key_length,
230 key, key_length, value, value_length,
231 expiration, flags, 0, APPEND_OP);
232 return rc;
233 }
234
235 memcached_return memcached_cas(memcached_st *ptr,
236 const char *key, size_t key_length,
237 const char *value, size_t value_length,
238 time_t expiration,
239 uint32_t flags,
240 uint64_t cas)
241 {
242 memcached_return rc;
243 rc= memcached_send(ptr, key, key_length,
244 key, key_length, value, value_length,
245 expiration, flags, cas, CAS_OP);
246 return rc;
247 }
248
249 memcached_return memcached_set_by_key(memcached_st *ptr,
250 const char *master_key __attribute__((unused)),
251 size_t master_key_length __attribute__((unused)),
252 const char *key, size_t key_length,
253 const char *value, size_t value_length,
254 time_t expiration,
255 uint32_t flags)
256 {
257 memcached_return rc;
258 LIBMEMCACHED_MEMCACHED_SET_START();
259 rc= memcached_send(ptr, master_key, master_key_length,
260 key, key_length, value, value_length,
261 expiration, flags, 0, SET_OP);
262 LIBMEMCACHED_MEMCACHED_SET_END();
263 return rc;
264 }
265
266 memcached_return memcached_add_by_key(memcached_st *ptr,
267 const char *master_key, size_t master_key_length,
268 const char *key, size_t key_length,
269 const char *value, size_t value_length,
270 time_t expiration,
271 uint32_t flags)
272 {
273 memcached_return rc;
274 LIBMEMCACHED_MEMCACHED_ADD_START();
275 rc= memcached_send(ptr, master_key, master_key_length,
276 key, key_length, value, value_length,
277 expiration, flags, 0, ADD_OP);
278 LIBMEMCACHED_MEMCACHED_ADD_END();
279 return rc;
280 }
281
282 memcached_return memcached_replace_by_key(memcached_st *ptr,
283 const char *master_key, size_t master_key_length,
284 const char *key, size_t key_length,
285 const char *value, size_t value_length,
286 time_t expiration,
287 uint32_t flags)
288 {
289 memcached_return rc;
290 LIBMEMCACHED_MEMCACHED_REPLACE_START();
291 rc= memcached_send(ptr, master_key, master_key_length,
292 key, key_length, value, value_length,
293 expiration, flags, 0, REPLACE_OP);
294 LIBMEMCACHED_MEMCACHED_REPLACE_END();
295 return rc;
296 }
297
298 memcached_return memcached_prepend_by_key(memcached_st *ptr,
299 const char *master_key, size_t master_key_length,
300 const char *key, size_t key_length,
301 const char *value, size_t value_length,
302 time_t expiration,
303 uint32_t flags)
304 {
305 memcached_return rc;
306 rc= memcached_send(ptr, master_key, master_key_length,
307 key, key_length, value, value_length,
308 expiration, flags, 0, PREPEND_OP);
309 return rc;
310 }
311
312 memcached_return memcached_append_by_key(memcached_st *ptr,
313 const char *master_key, size_t master_key_length,
314 const char *key, size_t key_length,
315 const char *value, size_t value_length,
316 time_t expiration,
317 uint32_t flags)
318 {
319 memcached_return rc;
320 rc= memcached_send(ptr, master_key, master_key_length,
321 key, key_length, value, value_length,
322 expiration, flags, 0, APPEND_OP);
323 return rc;
324 }
325
326 memcached_return memcached_cas_by_key(memcached_st *ptr,
327 const char *master_key, size_t master_key_length,
328 const char *key, size_t key_length,
329 const char *value, size_t value_length,
330 time_t expiration,
331 uint32_t flags,
332 uint64_t cas)
333 {
334 memcached_return rc;
335 rc= memcached_send(ptr, master_key, master_key_length,
336 key, key_length, value, value_length,
337 expiration, flags, cas, CAS_OP);
338 return rc;
339 }
340
341 static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply) {
342 uint8_t ret;
343
344 if (noreply)
345 switch (verb)
346 {
347 case SET_OP:
348 ret=PROTOCOL_BINARY_CMD_SETQ;
349 break;
350 case ADD_OP:
351 ret=PROTOCOL_BINARY_CMD_ADDQ;
352 break;
353 case CAS_OP: /* FALLTHROUGH */
354 case REPLACE_OP:
355 ret=PROTOCOL_BINARY_CMD_REPLACEQ;
356 break;
357 case APPEND_OP:
358 ret=PROTOCOL_BINARY_CMD_APPENDQ;
359 break;
360 case PREPEND_OP:
361 ret=PROTOCOL_BINARY_CMD_PREPENDQ;
362 break;
363 }
364 else
365 switch (verb)
366 {
367 case SET_OP:
368 ret=PROTOCOL_BINARY_CMD_SET;
369 break;
370 case ADD_OP:
371 ret=PROTOCOL_BINARY_CMD_ADD;
372 break;
373 case CAS_OP: /* FALLTHROUGH */
374 case REPLACE_OP:
375 ret=PROTOCOL_BINARY_CMD_REPLACE;
376 break;
377 case APPEND_OP:
378 ret=PROTOCOL_BINARY_CMD_APPEND;
379 break;
380 case PREPEND_OP:
381 ret=PROTOCOL_BINARY_CMD_PREPEND;
382 break;
383 }
384
385 return ret;
386 }
387
388
389
390 static memcached_return memcached_send_binary(memcached_server_st* server,
391 const char *key,
392 size_t key_length,
393 const char *value,
394 size_t value_length,
395 time_t expiration,
396 uint32_t flags,
397 uint64_t cas,
398 memcached_storage_action verb)
399 {
400 char flush;
401 protocol_binary_request_set request= {.bytes= {0}};
402 size_t send_length= sizeof(request.bytes);
403 bool noreply= server->root->flags & MEM_NOREPLY;
404
405 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
406 request.message.header.request.opcode= get_com_code(verb, noreply);
407 request.message.header.request.keylen= htons((uint16_t)key_length);
408 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
409 if (verb == APPEND_OP || verb == PREPEND_OP)
410 send_length -= 8; /* append & prepend does not contain extras! */
411 else
412 {
413 request.message.header.request.extlen= 8;
414 request.message.body.flags= htonl(flags);
415 request.message.body.expiration= htonl((uint32_t)expiration);
416 }
417
418 request.message.header.request.bodylen= htonl(key_length + value_length +
419 request.message.header.request.extlen);
420
421 if (cas)
422 request.message.header.request.cas= htonll(cas);
423
424 flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
425
426 if ((server->root->flags & MEM_USE_UDP) && !flush)
427 {
428 size_t cmd_size= send_length + key_length + value_length;
429 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
430 return MEMCACHED_WRITE_FAILURE;
431 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
432 memcached_io_write(server,NULL,0, 1);
433 }
434
435 /* write the header */
436 if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
437 (memcached_io_write(server, key, key_length, 0) == -1) ||
438 (memcached_io_write(server, value, value_length, flush) == -1))
439 {
440 memcached_io_reset(server);
441 return MEMCACHED_WRITE_FAILURE;
442 }
443
444 if (flush == 0)
445 return MEMCACHED_BUFFERED;
446
447 if (noreply)
448 return MEMCACHED_SUCCESS;
449
450 return memcached_response(server, NULL, 0, NULL);
451 }
452