Updating AUTHORS file.
[awesomized/libmemcached] / libmemcached / memcached_storage.c
1 /*
2 Memcached library
3
4 memcached_set()
5 memcached_replace()
6 memcached_add()
7
8 */
9 #include "common.h"
10 #include "memcached_io.h"
11
/*
  Storage verbs handled by this file. storage_op_string() turns a verb into
  its text-protocol keyword and get_com_code() into its binary opcode.
*/
typedef enum {
  SET_OP= 0,
  REPLACE_OP= 1,
  ADD_OP= 2,
  PREPEND_OP= 3,
  APPEND_OP= 4,
  CAS_OP= 5
} memcached_storage_action;
20
21 /* Inline this */
22 static const char *storage_op_string(memcached_storage_action verb)
23 {
24 switch (verb)
25 {
26 case SET_OP:
27 return "set ";
28 case REPLACE_OP:
29 return "replace ";
30 case ADD_OP:
31 return "add ";
32 case PREPEND_OP:
33 return "prepend ";
34 case APPEND_OP:
35 return "append ";
36 case CAS_OP:
37 return "cas ";
38 default:
39 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
40 }
41
42 /* NOTREACHED */
43 }
44
45 static memcached_return memcached_send_binary(memcached_st *ptr,
46 const char *master_key,
47 size_t master_key_length,
48 const char *key,
49 size_t key_length,
50 const char *value,
51 size_t value_length,
52 time_t expiration,
53 uint32_t flags,
54 uint64_t cas,
55 memcached_storage_action verb);
56
57 static inline memcached_return memcached_send(memcached_st *ptr,
58 const char *master_key, size_t master_key_length,
59 const char *key, size_t key_length,
60 const char *value, size_t value_length,
61 time_t expiration,
62 uint32_t flags,
63 uint64_t cas,
64 memcached_storage_action verb)
65 {
66 char to_write;
67 size_t write_length;
68 ssize_t sent_length;
69 memcached_return rc;
70 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
71 unsigned int server_key;
72
73 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
74
75 rc= memcached_validate_key_length(key_length, ptr->flags & MEM_BINARY_PROTOCOL);
76 unlikely (rc != MEMCACHED_SUCCESS)
77 return rc;
78
79 unlikely (ptr->number_of_hosts == 0)
80 return MEMCACHED_NO_SERVERS;
81
82 if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((const char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
83 return MEMCACHED_BAD_KEY_PROVIDED;
84
85 if (ptr->flags & MEM_BINARY_PROTOCOL)
86 return memcached_send_binary(ptr, master_key, master_key_length,
87 key, key_length,
88 value, value_length, expiration,
89 flags, cas, verb);
90
91 server_key= memcached_generate_hash(ptr, master_key, master_key_length);
92
93 if (cas)
94 write_length= (size_t) snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
95 "%s %s%.*s %u %llu %zu %llu%s\r\n",
96 storage_op_string(verb),
97 ptr->prefix_key,
98 (int)key_length, key, flags,
99 (unsigned long long)expiration, value_length,
100 (unsigned long long)cas,
101 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
102 else
103 {
104 char *buffer_ptr= buffer;
105 const char *command= storage_op_string(verb);
106
107 /* Copy in the command, no space needed, we handle that in the command function*/
108 memcpy(buffer_ptr, command, strlen(command));
109
110 /* Copy in the key prefix, switch to the buffer_ptr */
111 buffer_ptr= memcpy(buffer_ptr + strlen(command) , ptr->prefix_key, strlen(ptr->prefix_key));
112
113 /* Copy in the key, adjust point if a key prefix was used. */
114 buffer_ptr= memcpy(buffer_ptr + (ptr->prefix_key ? strlen(ptr->prefix_key) : 0),
115 key, key_length);
116 buffer_ptr+= key_length;
117 buffer_ptr[0]= ' ';
118 buffer_ptr++;
119
120 write_length= (size_t)(buffer_ptr - buffer);
121 write_length+= (size_t) snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE,
122 "%u %llu %zu%s\r\n",
123 flags,
124 (unsigned long long)expiration, value_length,
125 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
126 }
127
128 if (ptr->flags & MEM_USE_UDP && ptr->flags & MEM_BUFFER_REQUESTS)
129 {
130 size_t cmd_size= write_length + value_length + 2;
131 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
132 return MEMCACHED_WRITE_FAILURE;
133 if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
134 memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
135 }
136
137 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
138 {
139 rc= MEMCACHED_WRITE_FAILURE;
140 goto error;
141 }
142
143 /* Send command header */
144 rc= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
145 if (rc != MEMCACHED_SUCCESS)
146 goto error;
147
148 /* Send command body */
149 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
150 {
151 rc= MEMCACHED_WRITE_FAILURE;
152 goto error;
153 }
154
155 if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP)
156 to_write= 0;
157 else
158 to_write= 1;
159
160 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
161 {
162 rc= MEMCACHED_WRITE_FAILURE;
163 goto error;
164 }
165
166 if (ptr->flags & MEM_NOREPLY)
167 return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
168
169 if (to_write == 0)
170 return MEMCACHED_BUFFERED;
171
172 rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
173
174 if (rc == MEMCACHED_STORED)
175 return MEMCACHED_SUCCESS;
176 else
177 return rc;
178
179 error:
180 memcached_io_reset(&ptr->hosts[server_key]);
181
182 return rc;
183 }
184
185
186 memcached_return memcached_set(memcached_st *ptr, const char *key, size_t key_length,
187 const char *value, size_t value_length,
188 time_t expiration,
189 uint32_t flags)
190 {
191 memcached_return rc;
192 LIBMEMCACHED_MEMCACHED_SET_START();
193 rc= memcached_send(ptr, key, key_length,
194 key, key_length, value, value_length,
195 expiration, flags, 0, SET_OP);
196 LIBMEMCACHED_MEMCACHED_SET_END();
197 return rc;
198 }
199
200 memcached_return memcached_add(memcached_st *ptr,
201 const char *key, size_t key_length,
202 const char *value, size_t value_length,
203 time_t expiration,
204 uint32_t flags)
205 {
206 memcached_return rc;
207 LIBMEMCACHED_MEMCACHED_ADD_START();
208 rc= memcached_send(ptr, key, key_length,
209 key, key_length, value, value_length,
210 expiration, flags, 0, ADD_OP);
211 LIBMEMCACHED_MEMCACHED_ADD_END();
212 return rc;
213 }
214
215 memcached_return memcached_replace(memcached_st *ptr,
216 const char *key, size_t key_length,
217 const char *value, size_t value_length,
218 time_t expiration,
219 uint32_t flags)
220 {
221 memcached_return rc;
222 LIBMEMCACHED_MEMCACHED_REPLACE_START();
223 rc= memcached_send(ptr, key, key_length,
224 key, key_length, value, value_length,
225 expiration, flags, 0, REPLACE_OP);
226 LIBMEMCACHED_MEMCACHED_REPLACE_END();
227 return rc;
228 }
229
230 memcached_return memcached_prepend(memcached_st *ptr,
231 const char *key, size_t key_length,
232 const char *value, size_t value_length,
233 time_t expiration,
234 uint32_t flags)
235 {
236 memcached_return rc;
237 rc= memcached_send(ptr, key, key_length,
238 key, key_length, value, value_length,
239 expiration, flags, 0, PREPEND_OP);
240 return rc;
241 }
242
243 memcached_return memcached_append(memcached_st *ptr,
244 const char *key, size_t key_length,
245 const char *value, size_t value_length,
246 time_t expiration,
247 uint32_t flags)
248 {
249 memcached_return rc;
250 rc= memcached_send(ptr, key, key_length,
251 key, key_length, value, value_length,
252 expiration, flags, 0, APPEND_OP);
253 return rc;
254 }
255
256 memcached_return memcached_cas(memcached_st *ptr,
257 const char *key, size_t key_length,
258 const char *value, size_t value_length,
259 time_t expiration,
260 uint32_t flags,
261 uint64_t cas)
262 {
263 memcached_return rc;
264 rc= memcached_send(ptr, key, key_length,
265 key, key_length, value, value_length,
266 expiration, flags, cas, CAS_OP);
267 return rc;
268 }
269
270 memcached_return memcached_set_by_key(memcached_st *ptr,
271 const char *master_key __attribute__((unused)),
272 size_t master_key_length __attribute__((unused)),
273 const char *key, size_t key_length,
274 const char *value, size_t value_length,
275 time_t expiration,
276 uint32_t flags)
277 {
278 memcached_return rc;
279 LIBMEMCACHED_MEMCACHED_SET_START();
280 rc= memcached_send(ptr, master_key, master_key_length,
281 key, key_length, value, value_length,
282 expiration, flags, 0, SET_OP);
283 LIBMEMCACHED_MEMCACHED_SET_END();
284 return rc;
285 }
286
287 memcached_return memcached_add_by_key(memcached_st *ptr,
288 const char *master_key, size_t master_key_length,
289 const char *key, size_t key_length,
290 const char *value, size_t value_length,
291 time_t expiration,
292 uint32_t flags)
293 {
294 memcached_return rc;
295 LIBMEMCACHED_MEMCACHED_ADD_START();
296 rc= memcached_send(ptr, master_key, master_key_length,
297 key, key_length, value, value_length,
298 expiration, flags, 0, ADD_OP);
299 LIBMEMCACHED_MEMCACHED_ADD_END();
300 return rc;
301 }
302
303 memcached_return memcached_replace_by_key(memcached_st *ptr,
304 const char *master_key, size_t master_key_length,
305 const char *key, size_t key_length,
306 const char *value, size_t value_length,
307 time_t expiration,
308 uint32_t flags)
309 {
310 memcached_return rc;
311 LIBMEMCACHED_MEMCACHED_REPLACE_START();
312 rc= memcached_send(ptr, master_key, master_key_length,
313 key, key_length, value, value_length,
314 expiration, flags, 0, REPLACE_OP);
315 LIBMEMCACHED_MEMCACHED_REPLACE_END();
316 return rc;
317 }
318
319 memcached_return memcached_prepend_by_key(memcached_st *ptr,
320 const char *master_key, size_t master_key_length,
321 const char *key, size_t key_length,
322 const char *value, size_t value_length,
323 time_t expiration,
324 uint32_t flags)
325 {
326 memcached_return rc;
327 rc= memcached_send(ptr, master_key, master_key_length,
328 key, key_length, value, value_length,
329 expiration, flags, 0, PREPEND_OP);
330 return rc;
331 }
332
333 memcached_return memcached_append_by_key(memcached_st *ptr,
334 const char *master_key, size_t master_key_length,
335 const char *key, size_t key_length,
336 const char *value, size_t value_length,
337 time_t expiration,
338 uint32_t flags)
339 {
340 memcached_return rc;
341 rc= memcached_send(ptr, master_key, master_key_length,
342 key, key_length, value, value_length,
343 expiration, flags, 0, APPEND_OP);
344 return rc;
345 }
346
347 memcached_return memcached_cas_by_key(memcached_st *ptr,
348 const char *master_key, size_t master_key_length,
349 const char *key, size_t key_length,
350 const char *value, size_t value_length,
351 time_t expiration,
352 uint32_t flags,
353 uint64_t cas)
354 {
355 memcached_return rc;
356 rc= memcached_send(ptr, master_key, master_key_length,
357 key, key_length, value, value_length,
358 expiration, flags, cas, CAS_OP);
359 return rc;
360 }
361
362 static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply)
363 {
364 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
365 * be used uninitialized in this function. FAIL */
366 uint8_t ret= 0;
367
368 if (noreply)
369 switch (verb)
370 {
371 case SET_OP:
372 ret=PROTOCOL_BINARY_CMD_SETQ;
373 break;
374 case ADD_OP:
375 ret=PROTOCOL_BINARY_CMD_ADDQ;
376 break;
377 case CAS_OP: /* FALLTHROUGH */
378 case REPLACE_OP:
379 ret=PROTOCOL_BINARY_CMD_REPLACEQ;
380 break;
381 case APPEND_OP:
382 ret=PROTOCOL_BINARY_CMD_APPENDQ;
383 break;
384 case PREPEND_OP:
385 ret=PROTOCOL_BINARY_CMD_PREPENDQ;
386 break;
387 default:
388 WATCHPOINT_ASSERT(verb);
389 break;
390 }
391 else
392 switch (verb)
393 {
394 case SET_OP:
395 ret=PROTOCOL_BINARY_CMD_SET;
396 break;
397 case ADD_OP:
398 ret=PROTOCOL_BINARY_CMD_ADD;
399 break;
400 case CAS_OP: /* FALLTHROUGH */
401 case REPLACE_OP:
402 ret=PROTOCOL_BINARY_CMD_REPLACE;
403 break;
404 case APPEND_OP:
405 ret=PROTOCOL_BINARY_CMD_APPEND;
406 break;
407 case PREPEND_OP:
408 ret=PROTOCOL_BINARY_CMD_PREPEND;
409 break;
410 default:
411 WATCHPOINT_ASSERT(verb);
412 break;
413 }
414
415 return ret;
416 }
417
418
419
420 static memcached_return memcached_send_binary(memcached_st *ptr,
421 const char *master_key,
422 size_t master_key_length,
423 const char *key,
424 size_t key_length,
425 const char *value,
426 size_t value_length,
427 time_t expiration,
428 uint32_t flags,
429 uint64_t cas,
430 memcached_storage_action verb)
431 {
432 uint8_t flush;
433 protocol_binary_request_set request= {.bytes= {0}};
434 size_t send_length= sizeof(request.bytes);
435 uint32_t server_key= memcached_generate_hash(ptr, master_key,
436 master_key_length);
437 memcached_server_st *server= &ptr->hosts[server_key];
438 bool noreply= server->root->flags & MEM_NOREPLY;
439
440 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
441 request.message.header.request.opcode= get_com_code(verb, noreply);
442 request.message.header.request.keylen= htons((uint16_t)key_length);
443 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
444 if (verb == APPEND_OP || verb == PREPEND_OP)
445 send_length -= 8; /* append & prepend does not contain extras! */
446 else
447 {
448 request.message.header.request.extlen= 8;
449 request.message.body.flags= htonl(flags);
450 request.message.body.expiration= htonl((uint32_t)expiration);
451 }
452
453 request.message.header.request.bodylen= htonl((uint32_t) (key_length + value_length +
454 request.message.header.request.extlen));
455
456 if (cas)
457 request.message.header.request.cas= htonll(cas);
458
459 flush= (uint8_t) (((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1);
460
461 if ((server->root->flags & MEM_USE_UDP) && !flush)
462 {
463 size_t cmd_size= send_length + key_length + value_length;
464 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
465 return MEMCACHED_WRITE_FAILURE;
466 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
467 memcached_io_write(server,NULL,0, 1);
468 }
469
470 /* write the header */
471 if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
472 (memcached_io_write(server, key, key_length, 0) == -1) ||
473 (memcached_io_write(server, value, value_length, (char) flush) == -1))
474 {
475 memcached_io_reset(server);
476 return MEMCACHED_WRITE_FAILURE;
477 }
478
479 unlikely (verb == SET_OP && ptr->number_of_replicas > 0)
480 {
481 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
482
483 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
484 {
485 ++server_key;
486 if (server_key == ptr->number_of_hosts)
487 server_key= 0;
488
489 memcached_server_st *srv= &ptr->hosts[server_key];
490 if ((memcached_do(srv, (const char*)request.bytes,
491 send_length, 0) != MEMCACHED_SUCCESS) ||
492 (memcached_io_write(srv, key, key_length, 0) == -1) ||
493 (memcached_io_write(srv, value, value_length, (char) flush) == -1))
494 memcached_io_reset(srv);
495 else
496 memcached_server_response_decrement(srv);
497 }
498 }
499
500 if (flush == 0)
501 return MEMCACHED_BUFFERED;
502
503 if (noreply)
504 return MEMCACHED_SUCCESS;
505
506 return memcached_response(server, NULL, 0, NULL);
507 }
508