fe8024d305e0e72bf9b3b9f0c334e2ebe2b8a890
[awesomized/libmemcached] / libmemcached / memcached_storage.c
1 /*
2 Memcached library
3
4 memcached_set()
5 memcached_replace()
6 memcached_add()
7
8 */
9 #include "common.h"
10 #include "memcached_io.h"
11
/*
  Selects which storage command a request carries. storage_op_string() maps
  each value to its text-protocol verb and get_com_code() maps it to a
  binary-protocol opcode.
*/
typedef enum {
  SET_OP= 0,     /* "set"     */
  REPLACE_OP= 1, /* "replace" */
  ADD_OP= 2,     /* "add"     */
  PREPEND_OP= 3, /* "prepend" */
  APPEND_OP= 4,  /* "append"  */
  CAS_OP= 5      /* "cas"     */
} memcached_storage_action;
20
21 /* Inline this */
22 static char *storage_op_string(memcached_storage_action verb)
23 {
24 switch (verb)
25 {
26 case SET_OP:
27 return "set ";
28 case REPLACE_OP:
29 return "replace ";
30 case ADD_OP:
31 return "add ";
32 case PREPEND_OP:
33 return "prepend ";
34 case APPEND_OP:
35 return "append ";
36 case CAS_OP:
37 return "cas ";
38 default:
39 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
40 };
41
42 /* NOTREACHED */
43 }
44
45 static memcached_return memcached_send_binary(memcached_st *ptr,
46 const char *master_key,
47 size_t master_key_length,
48 const char *key,
49 size_t key_length,
50 const char *value,
51 size_t value_length,
52 time_t expiration,
53 uint32_t flags,
54 uint64_t cas,
55 memcached_storage_action verb);
56
57 static inline memcached_return memcached_send(memcached_st *ptr,
58 const char *master_key, size_t master_key_length,
59 const char *key, size_t key_length,
60 const char *value, size_t value_length,
61 time_t expiration,
62 uint32_t flags,
63 uint64_t cas,
64 memcached_storage_action verb)
65 {
66 char to_write;
67 size_t write_length;
68 ssize_t sent_length;
69 memcached_return rc;
70 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
71 unsigned int server_key;
72
73 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
74
75 rc= memcached_validate_key_length(key_length, ptr->flags & MEM_BINARY_PROTOCOL);
76 unlikely (rc != MEMCACHED_SUCCESS)
77 return rc;
78
79 unlikely (ptr->number_of_hosts == 0)
80 return MEMCACHED_NO_SERVERS;
81
82 if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
83 return MEMCACHED_BAD_KEY_PROVIDED;
84
85 if (ptr->flags & MEM_BINARY_PROTOCOL)
86 return memcached_send_binary(ptr, master_key, master_key_length,
87 key, key_length,
88 value, value_length, expiration,
89 flags, cas, verb);
90
91 server_key= memcached_generate_hash(ptr, master_key, master_key_length);
92
93 if (cas)
94 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
95 "%s %s%.*s %u %llu %zu %llu%s\r\n",
96 storage_op_string(verb),
97 ptr->prefix_key,
98 (int)key_length, key, flags,
99 (unsigned long long)expiration, value_length,
100 (unsigned long long)cas,
101 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
102 else
103 {
104 char *buffer_ptr= buffer;
105 const char *command= storage_op_string(verb);
106
107 /* Copy in the command, no space needed, we handle that in the command function*/
108 memcpy(buffer_ptr, command, strlen(command));
109
110 /* Copy in the key prefix, switch to the buffer_ptr */
111 buffer_ptr= memcpy(buffer_ptr + strlen(command) , ptr->prefix_key, strlen(ptr->prefix_key));
112
113 /* Copy in the key, adjust point if a key prefix was used. */
114 buffer_ptr= memcpy(buffer_ptr + (ptr->prefix_key ? strlen(ptr->prefix_key) : 0),
115 key, key_length);
116 buffer_ptr+= key_length;
117 buffer_ptr[0]= ' ';
118 buffer_ptr++;
119
120 write_length= (size_t)(buffer_ptr - buffer);
121 write_length+= snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE,
122 "%u %llu %zu%s\r\n",
123 flags,
124 (unsigned long long)expiration, value_length,
125 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
126 }
127
128 if (ptr->flags & MEM_USE_UDP && ptr->flags & MEM_BUFFER_REQUESTS)
129 {
130 size_t cmd_size= write_length + value_length + 2;
131 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
132 return MEMCACHED_WRITE_FAILURE;
133 if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
134 memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
135 }
136
137 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
138 {
139 rc= MEMCACHED_WRITE_FAILURE;
140 goto error;
141 }
142
143 /* Send command header */
144 rc= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
145 if (rc != MEMCACHED_SUCCESS)
146 goto error;
147
148 /* Send command body */
149 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
150 {
151 rc= MEMCACHED_WRITE_FAILURE;
152 goto error;
153 }
154
155 if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP)
156 to_write= 0;
157 else
158 to_write= 1;
159
160 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
161 {
162 rc= MEMCACHED_WRITE_FAILURE;
163 goto error;
164 }
165
166 if (ptr->flags & MEM_NOREPLY)
167 return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
168
169 if (to_write == 0)
170 return MEMCACHED_BUFFERED;
171
172 rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
173
174 if (rc == MEMCACHED_STORED)
175 return MEMCACHED_SUCCESS;
176 else
177 return rc;
178
179 error:
180 memcached_io_reset(&ptr->hosts[server_key]);
181
182 return rc;
183 }
184
185
186 memcached_return memcached_set(memcached_st *ptr, const char *key, size_t key_length,
187 const char *value, size_t value_length,
188 time_t expiration,
189 uint32_t flags)
190 {
191 memcached_return rc;
192 LIBMEMCACHED_MEMCACHED_SET_START();
193 rc= memcached_send(ptr, key, key_length,
194 key, key_length, value, value_length,
195 expiration, flags, 0, SET_OP);
196 LIBMEMCACHED_MEMCACHED_SET_END();
197 return rc;
198 }
199
200 memcached_return memcached_add(memcached_st *ptr,
201 const char *key, size_t key_length,
202 const char *value, size_t value_length,
203 time_t expiration,
204 uint32_t flags)
205 {
206 memcached_return rc;
207 LIBMEMCACHED_MEMCACHED_ADD_START();
208 rc= memcached_send(ptr, key, key_length,
209 key, key_length, value, value_length,
210 expiration, flags, 0, ADD_OP);
211 LIBMEMCACHED_MEMCACHED_ADD_END();
212 return rc;
213 }
214
215 memcached_return memcached_replace(memcached_st *ptr,
216 const char *key, size_t key_length,
217 const char *value, size_t value_length,
218 time_t expiration,
219 uint32_t flags)
220 {
221 memcached_return rc;
222 LIBMEMCACHED_MEMCACHED_REPLACE_START();
223 rc= memcached_send(ptr, key, key_length,
224 key, key_length, value, value_length,
225 expiration, flags, 0, REPLACE_OP);
226 LIBMEMCACHED_MEMCACHED_REPLACE_END();
227 return rc;
228 }
229
230 memcached_return memcached_prepend(memcached_st *ptr,
231 const char *key, size_t key_length,
232 const char *value, size_t value_length,
233 time_t expiration,
234 uint32_t flags)
235 {
236 memcached_return rc;
237 rc= memcached_send(ptr, key, key_length,
238 key, key_length, value, value_length,
239 expiration, flags, 0, PREPEND_OP);
240 return rc;
241 }
242
243 memcached_return memcached_append(memcached_st *ptr,
244 const char *key, size_t key_length,
245 const char *value, size_t value_length,
246 time_t expiration,
247 uint32_t flags)
248 {
249 memcached_return rc;
250 rc= memcached_send(ptr, key, key_length,
251 key, key_length, value, value_length,
252 expiration, flags, 0, APPEND_OP);
253 return rc;
254 }
255
256 memcached_return memcached_cas(memcached_st *ptr,
257 const char *key, size_t key_length,
258 const char *value, size_t value_length,
259 time_t expiration,
260 uint32_t flags,
261 uint64_t cas)
262 {
263 memcached_return rc;
264 rc= memcached_send(ptr, key, key_length,
265 key, key_length, value, value_length,
266 expiration, flags, cas, CAS_OP);
267 return rc;
268 }
269
270 memcached_return memcached_set_by_key(memcached_st *ptr,
271 const char *master_key __attribute__((unused)),
272 size_t master_key_length __attribute__((unused)),
273 const char *key, size_t key_length,
274 const char *value, size_t value_length,
275 time_t expiration,
276 uint32_t flags)
277 {
278 memcached_return rc;
279 LIBMEMCACHED_MEMCACHED_SET_START();
280 rc= memcached_send(ptr, master_key, master_key_length,
281 key, key_length, value, value_length,
282 expiration, flags, 0, SET_OP);
283 LIBMEMCACHED_MEMCACHED_SET_END();
284 return rc;
285 }
286
287 memcached_return memcached_add_by_key(memcached_st *ptr,
288 const char *master_key, size_t master_key_length,
289 const char *key, size_t key_length,
290 const char *value, size_t value_length,
291 time_t expiration,
292 uint32_t flags)
293 {
294 memcached_return rc;
295 LIBMEMCACHED_MEMCACHED_ADD_START();
296 rc= memcached_send(ptr, master_key, master_key_length,
297 key, key_length, value, value_length,
298 expiration, flags, 0, ADD_OP);
299 LIBMEMCACHED_MEMCACHED_ADD_END();
300 return rc;
301 }
302
303 memcached_return memcached_replace_by_key(memcached_st *ptr,
304 const char *master_key, size_t master_key_length,
305 const char *key, size_t key_length,
306 const char *value, size_t value_length,
307 time_t expiration,
308 uint32_t flags)
309 {
310 memcached_return rc;
311 LIBMEMCACHED_MEMCACHED_REPLACE_START();
312 rc= memcached_send(ptr, master_key, master_key_length,
313 key, key_length, value, value_length,
314 expiration, flags, 0, REPLACE_OP);
315 LIBMEMCACHED_MEMCACHED_REPLACE_END();
316 return rc;
317 }
318
319 memcached_return memcached_prepend_by_key(memcached_st *ptr,
320 const char *master_key, size_t master_key_length,
321 const char *key, size_t key_length,
322 const char *value, size_t value_length,
323 time_t expiration,
324 uint32_t flags)
325 {
326 memcached_return rc;
327 rc= memcached_send(ptr, master_key, master_key_length,
328 key, key_length, value, value_length,
329 expiration, flags, 0, PREPEND_OP);
330 return rc;
331 }
332
333 memcached_return memcached_append_by_key(memcached_st *ptr,
334 const char *master_key, size_t master_key_length,
335 const char *key, size_t key_length,
336 const char *value, size_t value_length,
337 time_t expiration,
338 uint32_t flags)
339 {
340 memcached_return rc;
341 rc= memcached_send(ptr, master_key, master_key_length,
342 key, key_length, value, value_length,
343 expiration, flags, 0, APPEND_OP);
344 return rc;
345 }
346
347 memcached_return memcached_cas_by_key(memcached_st *ptr,
348 const char *master_key, size_t master_key_length,
349 const char *key, size_t key_length,
350 const char *value, size_t value_length,
351 time_t expiration,
352 uint32_t flags,
353 uint64_t cas)
354 {
355 memcached_return rc;
356 rc= memcached_send(ptr, master_key, master_key_length,
357 key, key_length, value, value_length,
358 expiration, flags, cas, CAS_OP);
359 return rc;
360 }
361
362 static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply)
363 {
364 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
365 * be used uninitialized in this function. FAIL */
366 uint8_t ret= 0;
367
368 if (noreply)
369 switch (verb)
370 {
371 case SET_OP:
372 ret=PROTOCOL_BINARY_CMD_SETQ;
373 break;
374 case ADD_OP:
375 ret=PROTOCOL_BINARY_CMD_ADDQ;
376 break;
377 case CAS_OP: /* FALLTHROUGH */
378 case REPLACE_OP:
379 ret=PROTOCOL_BINARY_CMD_REPLACEQ;
380 break;
381 case APPEND_OP:
382 ret=PROTOCOL_BINARY_CMD_APPENDQ;
383 break;
384 case PREPEND_OP:
385 ret=PROTOCOL_BINARY_CMD_PREPENDQ;
386 break;
387 }
388 else
389 switch (verb)
390 {
391 case SET_OP:
392 ret=PROTOCOL_BINARY_CMD_SET;
393 break;
394 case ADD_OP:
395 ret=PROTOCOL_BINARY_CMD_ADD;
396 break;
397 case CAS_OP: /* FALLTHROUGH */
398 case REPLACE_OP:
399 ret=PROTOCOL_BINARY_CMD_REPLACE;
400 break;
401 case APPEND_OP:
402 ret=PROTOCOL_BINARY_CMD_APPEND;
403 break;
404 case PREPEND_OP:
405 ret=PROTOCOL_BINARY_CMD_PREPEND;
406 break;
407 }
408
409 return ret;
410 }
411
412
413
414 static memcached_return memcached_send_binary(memcached_st *ptr,
415 const char *master_key,
416 size_t master_key_length,
417 const char *key,
418 size_t key_length,
419 const char *value,
420 size_t value_length,
421 time_t expiration,
422 uint32_t flags,
423 uint64_t cas,
424 memcached_storage_action verb)
425 {
426 char flush;
427 protocol_binary_request_set request= {.bytes= {0}};
428 size_t send_length= sizeof(request.bytes);
429 uint32_t server_key= memcached_generate_hash(ptr, master_key,
430 master_key_length);
431 memcached_server_st *server= &ptr->hosts[server_key];
432 bool noreply= server->root->flags & MEM_NOREPLY;
433
434 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
435 request.message.header.request.opcode= get_com_code(verb, noreply);
436 request.message.header.request.keylen= htons((uint16_t)key_length);
437 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
438 if (verb == APPEND_OP || verb == PREPEND_OP)
439 send_length -= 8; /* append & prepend does not contain extras! */
440 else
441 {
442 request.message.header.request.extlen= 8;
443 request.message.body.flags= htonl(flags);
444 request.message.body.expiration= htonl((uint32_t)expiration);
445 }
446
447 request.message.header.request.bodylen= htonl(key_length + value_length +
448 request.message.header.request.extlen);
449
450 if (cas)
451 request.message.header.request.cas= htonll(cas);
452
453 flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
454
455 if ((server->root->flags & MEM_USE_UDP) && !flush)
456 {
457 size_t cmd_size= send_length + key_length + value_length;
458 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
459 return MEMCACHED_WRITE_FAILURE;
460 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
461 memcached_io_write(server,NULL,0, 1);
462 }
463
464 /* write the header */
465 if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
466 (memcached_io_write(server, key, key_length, 0) == -1) ||
467 (memcached_io_write(server, value, value_length, flush) == -1))
468 {
469 memcached_io_reset(server);
470 return MEMCACHED_WRITE_FAILURE;
471 }
472
473 if (verb == SET_OP && ptr->number_of_replicas > 0)
474 {
475 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
476
477 for (int x= 0; x < ptr->number_of_replicas; ++x)
478 {
479 ++server_key;
480 if (server_key == ptr->number_of_hosts)
481 server_key= 0;
482
483 memcached_server_st *srv= &ptr->hosts[server_key];
484 if ((memcached_do(srv, (const char*)request.bytes,
485 send_length, 0) != MEMCACHED_SUCCESS) ||
486 (memcached_io_write(srv, key, key_length, 0) == -1) ||
487 (memcached_io_write(srv, value, value_length, flush) == -1))
488 memcached_io_reset(server);
489 }
490 }
491
492 if (flush == 0)
493 return MEMCACHED_BUFFERED;
494
495 if (noreply)
496 return MEMCACHED_SUCCESS;
497
498 return memcached_response(server, NULL, 0, NULL);
499 }
500