Abstract out ptr->hosts[server_key] references.
[m6w6/libmemcached] / libmemcached / storage.c
1 /* LibMemcached
2 * Copyright (C) 2006-2009 Brian Aker
3 * All rights reserved.
4 *
5 * Use and distribution licensed under the BSD license. See
6 * the COPYING file in the parent directory for full text.
7 *
8 * Summary: Storage related functions, aka set, replace,..
9 *
10 */
11
12 #include "common.h"
13
14 typedef enum {
15 SET_OP,
16 REPLACE_OP,
17 ADD_OP,
18 PREPEND_OP,
19 APPEND_OP,
20 CAS_OP,
21 } memcached_storage_action_t;
22
23 /* Inline this */
24 static inline const char *storage_op_string(memcached_storage_action_t verb)
25 {
26 switch (verb)
27 {
28 case SET_OP:
29 return "set ";
30 case REPLACE_OP:
31 return "replace ";
32 case ADD_OP:
33 return "add ";
34 case PREPEND_OP:
35 return "prepend ";
36 case APPEND_OP:
37 return "append ";
38 case CAS_OP:
39 return "cas ";
40 default:
41 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
42 }
43
44 /* NOTREACHED */
45 }
46
47 static memcached_return_t memcached_send_binary(memcached_st *ptr,
48 const char *master_key,
49 size_t master_key_length,
50 const char *key,
51 size_t key_length,
52 const char *value,
53 size_t value_length,
54 time_t expiration,
55 uint32_t flags,
56 uint64_t cas,
57 memcached_storage_action_t verb);
58
59 static inline memcached_return_t memcached_send(memcached_st *ptr,
60 const char *master_key, size_t master_key_length,
61 const char *key, size_t key_length,
62 const char *value, size_t value_length,
63 time_t expiration,
64 uint32_t flags,
65 uint64_t cas,
66 memcached_storage_action_t verb)
67 {
68 char to_write;
69 size_t write_length;
70 memcached_return_t rc;
71 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
72 uint32_t server_key;
73 memcached_server_instance_st *instance;
74
75 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
76
77 rc= memcached_validate_key_length(key_length, ptr->flags.binary_protocol);
78 unlikely (rc != MEMCACHED_SUCCESS)
79 return rc;
80
81 unlikely (memcached_server_count(ptr) == 0)
82 return MEMCACHED_NO_SERVERS;
83
84 if (ptr->flags.verify_key && (memcached_key_test((const char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
85 return MEMCACHED_BAD_KEY_PROVIDED;
86
87 if (ptr->flags.binary_protocol)
88 {
89 return memcached_send_binary(ptr, master_key, master_key_length,
90 key, key_length,
91 value, value_length, expiration,
92 flags, cas, verb);
93 }
94
95 server_key= memcached_generate_hash(ptr, master_key, master_key_length);
96 instance= memcached_server_instance_fetch(ptr, server_key);
97
98 if (cas)
99 {
100 write_length= (size_t) snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
101 "%s %s%.*s %u %llu %zu %llu%s\r\n",
102 storage_op_string(verb),
103 ptr->prefix_key,
104 (int)key_length, key, flags,
105 (unsigned long long)expiration, value_length,
106 (unsigned long long)cas,
107 (ptr->flags.no_reply) ? " noreply" : "");
108 }
109 else
110 {
111 char *buffer_ptr= buffer;
112 const char *command= storage_op_string(verb);
113
114 /* Copy in the command, no space needed, we handle that in the command function*/
115 memcpy(buffer_ptr, command, strlen(command));
116
117 /* Copy in the key prefix, switch to the buffer_ptr */
118 buffer_ptr= memcpy((buffer_ptr + strlen(command)), ptr->prefix_key, strlen(ptr->prefix_key));
119
120 /* Copy in the key, adjust point if a key prefix was used. */
121 buffer_ptr= memcpy(buffer_ptr + (ptr->prefix_key ? strlen(ptr->prefix_key) : 0),
122 key, key_length);
123 buffer_ptr+= key_length;
124 buffer_ptr[0]= ' ';
125 buffer_ptr++;
126
127 write_length= (size_t)(buffer_ptr - buffer);
128 write_length+= (size_t) snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE,
129 "%u %llu %zu%s\r\n",
130 flags,
131 (unsigned long long)expiration, value_length,
132 ptr->flags.no_reply ? " noreply" : "");
133 }
134
135 if (ptr->flags.use_udp && ptr->flags.buffer_requests)
136 {
137 size_t cmd_size= write_length + value_length + 2;
138 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
139 return MEMCACHED_WRITE_FAILURE;
140 if (cmd_size + instance->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
141 memcached_io_write(instance, NULL, 0, 1);
142 }
143
144 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
145 {
146 rc= MEMCACHED_WRITE_FAILURE;
147 goto error;
148 }
149
150 /* Send command header */
151 rc= memcached_do(instance, buffer, write_length, 0);
152 if (rc != MEMCACHED_SUCCESS)
153 goto error;
154
155 /* Send command body */
156 if (memcached_io_write(instance, value, value_length, 0) == -1)
157 {
158 rc= MEMCACHED_WRITE_FAILURE;
159 goto error;
160 }
161
162 if (ptr->flags.buffer_requests && verb == SET_OP)
163 {
164 to_write= 0;
165 }
166 else
167 {
168 to_write= 1;
169 }
170
171 if (memcached_io_write(instance, "\r\n", 2, to_write) == -1)
172 {
173 rc= MEMCACHED_WRITE_FAILURE;
174 goto error;
175 }
176
177 if (ptr->flags.no_reply)
178 return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
179
180 if (to_write == 0)
181 return MEMCACHED_BUFFERED;
182
183 rc= memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
184
185 if (rc == MEMCACHED_STORED)
186 return MEMCACHED_SUCCESS;
187 else
188 return rc;
189
190 error:
191 memcached_io_reset(instance);
192
193 return rc;
194 }
195
196
197 memcached_return_t memcached_set(memcached_st *ptr, const char *key, size_t key_length,
198 const char *value, size_t value_length,
199 time_t expiration,
200 uint32_t flags)
201 {
202 memcached_return_t rc;
203 LIBMEMCACHED_MEMCACHED_SET_START();
204 rc= memcached_send(ptr, key, key_length,
205 key, key_length, value, value_length,
206 expiration, flags, 0, SET_OP);
207 LIBMEMCACHED_MEMCACHED_SET_END();
208 return rc;
209 }
210
211 memcached_return_t memcached_add(memcached_st *ptr,
212 const char *key, size_t key_length,
213 const char *value, size_t value_length,
214 time_t expiration,
215 uint32_t flags)
216 {
217 memcached_return_t rc;
218 LIBMEMCACHED_MEMCACHED_ADD_START();
219 rc= memcached_send(ptr, key, key_length,
220 key, key_length, value, value_length,
221 expiration, flags, 0, ADD_OP);
222 LIBMEMCACHED_MEMCACHED_ADD_END();
223 return rc;
224 }
225
226 memcached_return_t memcached_replace(memcached_st *ptr,
227 const char *key, size_t key_length,
228 const char *value, size_t value_length,
229 time_t expiration,
230 uint32_t flags)
231 {
232 memcached_return_t rc;
233 LIBMEMCACHED_MEMCACHED_REPLACE_START();
234 rc= memcached_send(ptr, key, key_length,
235 key, key_length, value, value_length,
236 expiration, flags, 0, REPLACE_OP);
237 LIBMEMCACHED_MEMCACHED_REPLACE_END();
238 return rc;
239 }
240
241 memcached_return_t memcached_prepend(memcached_st *ptr,
242 const char *key, size_t key_length,
243 const char *value, size_t value_length,
244 time_t expiration,
245 uint32_t flags)
246 {
247 memcached_return_t rc;
248 rc= memcached_send(ptr, key, key_length,
249 key, key_length, value, value_length,
250 expiration, flags, 0, PREPEND_OP);
251 return rc;
252 }
253
254 memcached_return_t memcached_append(memcached_st *ptr,
255 const char *key, size_t key_length,
256 const char *value, size_t value_length,
257 time_t expiration,
258 uint32_t flags)
259 {
260 memcached_return_t rc;
261 rc= memcached_send(ptr, key, key_length,
262 key, key_length, value, value_length,
263 expiration, flags, 0, APPEND_OP);
264 return rc;
265 }
266
267 memcached_return_t memcached_cas(memcached_st *ptr,
268 const char *key, size_t key_length,
269 const char *value, size_t value_length,
270 time_t expiration,
271 uint32_t flags,
272 uint64_t cas)
273 {
274 memcached_return_t rc;
275 rc= memcached_send(ptr, key, key_length,
276 key, key_length, value, value_length,
277 expiration, flags, cas, CAS_OP);
278 return rc;
279 }
280
281 memcached_return_t memcached_set_by_key(memcached_st *ptr,
282 const char *master_key __attribute__((unused)),
283 size_t master_key_length __attribute__((unused)),
284 const char *key, size_t key_length,
285 const char *value, size_t value_length,
286 time_t expiration,
287 uint32_t flags)
288 {
289 memcached_return_t rc;
290 LIBMEMCACHED_MEMCACHED_SET_START();
291 rc= memcached_send(ptr, master_key, master_key_length,
292 key, key_length, value, value_length,
293 expiration, flags, 0, SET_OP);
294 LIBMEMCACHED_MEMCACHED_SET_END();
295 return rc;
296 }
297
298 memcached_return_t memcached_add_by_key(memcached_st *ptr,
299 const char *master_key, size_t master_key_length,
300 const char *key, size_t key_length,
301 const char *value, size_t value_length,
302 time_t expiration,
303 uint32_t flags)
304 {
305 memcached_return_t rc;
306 LIBMEMCACHED_MEMCACHED_ADD_START();
307 rc= memcached_send(ptr, master_key, master_key_length,
308 key, key_length, value, value_length,
309 expiration, flags, 0, ADD_OP);
310 LIBMEMCACHED_MEMCACHED_ADD_END();
311 return rc;
312 }
313
314 memcached_return_t memcached_replace_by_key(memcached_st *ptr,
315 const char *master_key, size_t master_key_length,
316 const char *key, size_t key_length,
317 const char *value, size_t value_length,
318 time_t expiration,
319 uint32_t flags)
320 {
321 memcached_return_t rc;
322 LIBMEMCACHED_MEMCACHED_REPLACE_START();
323 rc= memcached_send(ptr, master_key, master_key_length,
324 key, key_length, value, value_length,
325 expiration, flags, 0, REPLACE_OP);
326 LIBMEMCACHED_MEMCACHED_REPLACE_END();
327 return rc;
328 }
329
330 memcached_return_t memcached_prepend_by_key(memcached_st *ptr,
331 const char *master_key, size_t master_key_length,
332 const char *key, size_t key_length,
333 const char *value, size_t value_length,
334 time_t expiration,
335 uint32_t flags)
336 {
337 memcached_return_t rc;
338 rc= memcached_send(ptr, master_key, master_key_length,
339 key, key_length, value, value_length,
340 expiration, flags, 0, PREPEND_OP);
341 return rc;
342 }
343
344 memcached_return_t memcached_append_by_key(memcached_st *ptr,
345 const char *master_key, size_t master_key_length,
346 const char *key, size_t key_length,
347 const char *value, size_t value_length,
348 time_t expiration,
349 uint32_t flags)
350 {
351 memcached_return_t rc;
352 rc= memcached_send(ptr, master_key, master_key_length,
353 key, key_length, value, value_length,
354 expiration, flags, 0, APPEND_OP);
355 return rc;
356 }
357
358 memcached_return_t memcached_cas_by_key(memcached_st *ptr,
359 const char *master_key, size_t master_key_length,
360 const char *key, size_t key_length,
361 const char *value, size_t value_length,
362 time_t expiration,
363 uint32_t flags,
364 uint64_t cas)
365 {
366 memcached_return_t rc;
367 rc= memcached_send(ptr, master_key, master_key_length,
368 key, key_length, value, value_length,
369 expiration, flags, cas, CAS_OP);
370 return rc;
371 }
372
373 static inline uint8_t get_com_code(memcached_storage_action_t verb, bool noreply)
374 {
375 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
376 * be used uninitialized in this function. FAIL */
377 uint8_t ret= 0;
378
379 if (noreply)
380 switch (verb)
381 {
382 case SET_OP:
383 ret=PROTOCOL_BINARY_CMD_SETQ;
384 break;
385 case ADD_OP:
386 ret=PROTOCOL_BINARY_CMD_ADDQ;
387 break;
388 case CAS_OP: /* FALLTHROUGH */
389 case REPLACE_OP:
390 ret=PROTOCOL_BINARY_CMD_REPLACEQ;
391 break;
392 case APPEND_OP:
393 ret=PROTOCOL_BINARY_CMD_APPENDQ;
394 break;
395 case PREPEND_OP:
396 ret=PROTOCOL_BINARY_CMD_PREPENDQ;
397 break;
398 default:
399 WATCHPOINT_ASSERT(verb);
400 break;
401 }
402 else
403 switch (verb)
404 {
405 case SET_OP:
406 ret=PROTOCOL_BINARY_CMD_SET;
407 break;
408 case ADD_OP:
409 ret=PROTOCOL_BINARY_CMD_ADD;
410 break;
411 case CAS_OP: /* FALLTHROUGH */
412 case REPLACE_OP:
413 ret=PROTOCOL_BINARY_CMD_REPLACE;
414 break;
415 case APPEND_OP:
416 ret=PROTOCOL_BINARY_CMD_APPEND;
417 break;
418 case PREPEND_OP:
419 ret=PROTOCOL_BINARY_CMD_PREPEND;
420 break;
421 default:
422 WATCHPOINT_ASSERT(verb);
423 break;
424 }
425
426 return ret;
427 }
428
429
430
431 static memcached_return_t memcached_send_binary(memcached_st *ptr,
432 const char *master_key,
433 size_t master_key_length,
434 const char *key,
435 size_t key_length,
436 const char *value,
437 size_t value_length,
438 time_t expiration,
439 uint32_t flags,
440 uint64_t cas,
441 memcached_storage_action_t verb)
442 {
443 uint8_t flush;
444 protocol_binary_request_set request= {.bytes= {0}};
445 size_t send_length= sizeof(request.bytes);
446 uint32_t server_key= memcached_generate_hash(ptr, master_key,
447 master_key_length);
448
449 memcached_server_instance_st *server=
450 memcached_server_instance_fetch(ptr, server_key);
451
452 bool noreply= server->root->flags.no_reply;
453
454 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
455 request.message.header.request.opcode= get_com_code(verb, noreply);
456 request.message.header.request.keylen= htons((uint16_t)key_length);
457 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
458 if (verb == APPEND_OP || verb == PREPEND_OP)
459 send_length -= 8; /* append & prepend does not contain extras! */
460 else
461 {
462 request.message.header.request.extlen= 8;
463 request.message.body.flags= htonl(flags);
464 request.message.body.expiration= htonl((uint32_t)expiration);
465 }
466
467 request.message.header.request.bodylen= htonl((uint32_t) (key_length + value_length +
468 request.message.header.request.extlen));
469
470 if (cas)
471 request.message.header.request.cas= htonll(cas);
472
473 flush= (uint8_t) ((server->root->flags.buffer_requests && verb == SET_OP) ? 0 : 1);
474
475 if (server->root->flags.use_udp && !flush)
476 {
477 size_t cmd_size= send_length + key_length + value_length;
478
479 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
480 {
481 return MEMCACHED_WRITE_FAILURE;
482 }
483 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
484 {
485 memcached_io_write(server, NULL, 0, 1);
486 }
487 }
488
489 /* write the header */
490 if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
491 (memcached_io_write(server, key, key_length, 0) == -1) ||
492 (memcached_io_write(server, value, value_length, (char) flush) == -1))
493 {
494 memcached_io_reset(server);
495 return MEMCACHED_WRITE_FAILURE;
496 }
497
498 unlikely (verb == SET_OP && ptr->number_of_replicas > 0)
499 {
500 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SETQ;
501
502 for (uint32_t x= 0; x < ptr->number_of_replicas; x++)
503 {
504 memcached_server_instance_st *instance;
505
506 ++server_key;
507 if (server_key == memcached_server_count(ptr))
508 server_key= 0;
509
510 instance= memcached_server_instance_fetch(ptr, server_key);
511
512 if ((memcached_do(instance, (const char*)request.bytes,
513 send_length, 0) != MEMCACHED_SUCCESS) ||
514 (memcached_io_write(instance, key, key_length, 0) == -1) ||
515 (memcached_io_write(instance, value, value_length, (char) flush) == -1))
516 {
517 memcached_io_reset(instance);
518 }
519 else
520 {
521 memcached_server_response_decrement(instance);
522 }
523 }
524 }
525
526 if (flush == 0)
527 {
528 return MEMCACHED_BUFFERED;
529 }
530
531 if (noreply)
532 {
533 return MEMCACHED_SUCCESS;
534 }
535
536 return memcached_response(server, NULL, 0, NULL);
537 }
538