97d345bdfd3d791e210d20d69928556cc5bd2e45
[m6w6/libmemcached] / libmemcached / memcached_storage.c
1 /*
2 Memcached library
3
4 memcached_set()
5 memcached_replace()
6 memcached_add()
7
8 */
9 #include "common.h"
10 #include "memcached_io.h"
11
12 typedef enum {
13 SET_OP,
14 REPLACE_OP,
15 ADD_OP,
16 PREPEND_OP,
17 APPEND_OP,
18 CAS_OP,
19 } memcached_storage_action;
20
21 /* Inline this */
22 static char *storage_op_string(memcached_storage_action verb)
23 {
24 switch (verb)
25 {
26 case SET_OP:
27 return "set ";
28 case REPLACE_OP:
29 return "replace ";
30 case ADD_OP:
31 return "add ";
32 case PREPEND_OP:
33 return "prepend ";
34 case APPEND_OP:
35 return "append ";
36 case CAS_OP:
37 return "cas ";
38 default:
39 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
40 };
41
42 /* NOTREACHED */
43 }
44
45 static memcached_return memcached_send_binary(memcached_server_st* server,
46 const char *key,
47 size_t key_length,
48 const char *value,
49 size_t value_length,
50 time_t expiration,
51 uint32_t flags,
52 uint64_t cas,
53 memcached_storage_action verb);
54
55 static inline memcached_return memcached_send(memcached_st *ptr,
56 const char *master_key, size_t master_key_length,
57 const char *key, size_t key_length,
58 const char *value, size_t value_length,
59 time_t expiration,
60 uint32_t flags,
61 uint64_t cas,
62 memcached_storage_action verb)
63 {
64 char to_write;
65 size_t write_length;
66 ssize_t sent_length;
67 memcached_return rc;
68 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
69 unsigned int server_key;
70
71 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
72
73 rc= memcached_validate_key_length(key_length, ptr->flags & MEM_BINARY_PROTOCOL);
74 unlikely (rc != MEMCACHED_SUCCESS)
75 return rc;
76
77 unlikely (ptr->number_of_hosts == 0)
78 return MEMCACHED_NO_SERVERS;
79
80 if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
81 return MEMCACHED_BAD_KEY_PROVIDED;
82
83 server_key= memcached_generate_hash(ptr, master_key, master_key_length);
84
85 if (ptr->flags & MEM_BINARY_PROTOCOL)
86 return memcached_send_binary(&ptr->hosts[server_key], key, key_length,
87 value, value_length, expiration,
88 flags, cas, verb);
89
90 if (cas)
91 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
92 "%s %s%.*s %u %llu %zu %llu%s\r\n",
93 storage_op_string(verb),
94 ptr->prefix_key,
95 (int)key_length, key, flags,
96 (unsigned long long)expiration, value_length,
97 (unsigned long long)cas,
98 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
99 else
100 {
101 char *buffer_ptr= buffer;
102 const char *command= storage_op_string(verb);
103
104 /* Copy in the command, no space needed, we handle that in the command function*/
105 memcpy(buffer_ptr, command, strlen(command));
106
107 /* Copy in the key prefix, switch to the buffer_ptr */
108 buffer_ptr= memcpy(buffer_ptr + strlen(command) , ptr->prefix_key, strlen(ptr->prefix_key));
109
110 /* Copy in the key, adjust point if a key prefix was used. */
111 buffer_ptr= memcpy(buffer_ptr + (ptr->prefix_key ? strlen(ptr->prefix_key) : 0),
112 key, key_length);
113 buffer_ptr+= key_length;
114 buffer_ptr[0]= ' ';
115 buffer_ptr++;
116
117 write_length= (size_t)(buffer_ptr - buffer);
118 write_length+= snprintf(buffer_ptr, MEMCACHED_DEFAULT_COMMAND_SIZE,
119 "%u %llu %zu%s\r\n",
120 flags,
121 (unsigned long long)expiration, value_length,
122 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
123 }
124
125 if (ptr->flags & MEM_USE_UDP && ptr->flags & MEM_BUFFER_REQUESTS)
126 {
127 size_t cmd_size= write_length + value_length + 2;
128 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
129 return MEMCACHED_WRITE_FAILURE;
130 if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
131 memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
132 }
133
134 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
135 {
136 rc= MEMCACHED_WRITE_FAILURE;
137 goto error;
138 }
139
140 /* Send command header */
141 rc= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
142 if (rc != MEMCACHED_SUCCESS)
143 goto error;
144
145 /* Send command body */
146 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
147 {
148 rc= MEMCACHED_WRITE_FAILURE;
149 goto error;
150 }
151
152 if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP)
153 to_write= 0;
154 else
155 to_write= 1;
156
157 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
158 {
159 rc= MEMCACHED_WRITE_FAILURE;
160 goto error;
161 }
162
163 if (ptr->flags & MEM_NOREPLY)
164 return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
165
166 if (to_write == 0)
167 return MEMCACHED_BUFFERED;
168
169 rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
170
171 if (rc == MEMCACHED_STORED)
172 return MEMCACHED_SUCCESS;
173 else
174 return rc;
175
176 error:
177 memcached_io_reset(&ptr->hosts[server_key]);
178
179 return rc;
180 }
181
182
183 memcached_return memcached_set(memcached_st *ptr, const char *key, size_t key_length,
184 const char *value, size_t value_length,
185 time_t expiration,
186 uint32_t flags)
187 {
188 memcached_return rc;
189 LIBMEMCACHED_MEMCACHED_SET_START();
190 rc= memcached_send(ptr, key, key_length,
191 key, key_length, value, value_length,
192 expiration, flags, 0, SET_OP);
193 LIBMEMCACHED_MEMCACHED_SET_END();
194 return rc;
195 }
196
197 memcached_return memcached_add(memcached_st *ptr,
198 const char *key, size_t key_length,
199 const char *value, size_t value_length,
200 time_t expiration,
201 uint32_t flags)
202 {
203 memcached_return rc;
204 LIBMEMCACHED_MEMCACHED_ADD_START();
205 rc= memcached_send(ptr, key, key_length,
206 key, key_length, value, value_length,
207 expiration, flags, 0, ADD_OP);
208 LIBMEMCACHED_MEMCACHED_ADD_END();
209 return rc;
210 }
211
212 memcached_return memcached_replace(memcached_st *ptr,
213 const char *key, size_t key_length,
214 const char *value, size_t value_length,
215 time_t expiration,
216 uint32_t flags)
217 {
218 memcached_return rc;
219 LIBMEMCACHED_MEMCACHED_REPLACE_START();
220 rc= memcached_send(ptr, key, key_length,
221 key, key_length, value, value_length,
222 expiration, flags, 0, REPLACE_OP);
223 LIBMEMCACHED_MEMCACHED_REPLACE_END();
224 return rc;
225 }
226
227 memcached_return memcached_prepend(memcached_st *ptr,
228 const char *key, size_t key_length,
229 const char *value, size_t value_length,
230 time_t expiration,
231 uint32_t flags)
232 {
233 memcached_return rc;
234 rc= memcached_send(ptr, key, key_length,
235 key, key_length, value, value_length,
236 expiration, flags, 0, PREPEND_OP);
237 return rc;
238 }
239
240 memcached_return memcached_append(memcached_st *ptr,
241 const char *key, size_t key_length,
242 const char *value, size_t value_length,
243 time_t expiration,
244 uint32_t flags)
245 {
246 memcached_return rc;
247 rc= memcached_send(ptr, key, key_length,
248 key, key_length, value, value_length,
249 expiration, flags, 0, APPEND_OP);
250 return rc;
251 }
252
253 memcached_return memcached_cas(memcached_st *ptr,
254 const char *key, size_t key_length,
255 const char *value, size_t value_length,
256 time_t expiration,
257 uint32_t flags,
258 uint64_t cas)
259 {
260 memcached_return rc;
261 rc= memcached_send(ptr, key, key_length,
262 key, key_length, value, value_length,
263 expiration, flags, cas, CAS_OP);
264 return rc;
265 }
266
267 memcached_return memcached_set_by_key(memcached_st *ptr,
268 const char *master_key __attribute__((unused)),
269 size_t master_key_length __attribute__((unused)),
270 const char *key, size_t key_length,
271 const char *value, size_t value_length,
272 time_t expiration,
273 uint32_t flags)
274 {
275 memcached_return rc;
276 LIBMEMCACHED_MEMCACHED_SET_START();
277 rc= memcached_send(ptr, master_key, master_key_length,
278 key, key_length, value, value_length,
279 expiration, flags, 0, SET_OP);
280 LIBMEMCACHED_MEMCACHED_SET_END();
281 return rc;
282 }
283
284 memcached_return memcached_add_by_key(memcached_st *ptr,
285 const char *master_key, size_t master_key_length,
286 const char *key, size_t key_length,
287 const char *value, size_t value_length,
288 time_t expiration,
289 uint32_t flags)
290 {
291 memcached_return rc;
292 LIBMEMCACHED_MEMCACHED_ADD_START();
293 rc= memcached_send(ptr, master_key, master_key_length,
294 key, key_length, value, value_length,
295 expiration, flags, 0, ADD_OP);
296 LIBMEMCACHED_MEMCACHED_ADD_END();
297 return rc;
298 }
299
300 memcached_return memcached_replace_by_key(memcached_st *ptr,
301 const char *master_key, size_t master_key_length,
302 const char *key, size_t key_length,
303 const char *value, size_t value_length,
304 time_t expiration,
305 uint32_t flags)
306 {
307 memcached_return rc;
308 LIBMEMCACHED_MEMCACHED_REPLACE_START();
309 rc= memcached_send(ptr, master_key, master_key_length,
310 key, key_length, value, value_length,
311 expiration, flags, 0, REPLACE_OP);
312 LIBMEMCACHED_MEMCACHED_REPLACE_END();
313 return rc;
314 }
315
316 memcached_return memcached_prepend_by_key(memcached_st *ptr,
317 const char *master_key, size_t master_key_length,
318 const char *key, size_t key_length,
319 const char *value, size_t value_length,
320 time_t expiration,
321 uint32_t flags)
322 {
323 memcached_return rc;
324 rc= memcached_send(ptr, master_key, master_key_length,
325 key, key_length, value, value_length,
326 expiration, flags, 0, PREPEND_OP);
327 return rc;
328 }
329
330 memcached_return memcached_append_by_key(memcached_st *ptr,
331 const char *master_key, size_t master_key_length,
332 const char *key, size_t key_length,
333 const char *value, size_t value_length,
334 time_t expiration,
335 uint32_t flags)
336 {
337 memcached_return rc;
338 rc= memcached_send(ptr, master_key, master_key_length,
339 key, key_length, value, value_length,
340 expiration, flags, 0, APPEND_OP);
341 return rc;
342 }
343
344 memcached_return memcached_cas_by_key(memcached_st *ptr,
345 const char *master_key, size_t master_key_length,
346 const char *key, size_t key_length,
347 const char *value, size_t value_length,
348 time_t expiration,
349 uint32_t flags,
350 uint64_t cas)
351 {
352 memcached_return rc;
353 rc= memcached_send(ptr, master_key, master_key_length,
354 key, key_length, value, value_length,
355 expiration, flags, cas, CAS_OP);
356 return rc;
357 }
358
359 static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply)
360 {
361 /* 0 isn't a value we want, but GCC 4.2 seems to think ret can otherwise
362 * be used uninitialized in this function. FAIL */
363 uint8_t ret= 0;
364
365 if (noreply)
366 switch (verb)
367 {
368 case SET_OP:
369 ret=PROTOCOL_BINARY_CMD_SETQ;
370 break;
371 case ADD_OP:
372 ret=PROTOCOL_BINARY_CMD_ADDQ;
373 break;
374 case CAS_OP: /* FALLTHROUGH */
375 case REPLACE_OP:
376 ret=PROTOCOL_BINARY_CMD_REPLACEQ;
377 break;
378 case APPEND_OP:
379 ret=PROTOCOL_BINARY_CMD_APPENDQ;
380 break;
381 case PREPEND_OP:
382 ret=PROTOCOL_BINARY_CMD_PREPENDQ;
383 break;
384 }
385 else
386 switch (verb)
387 {
388 case SET_OP:
389 ret=PROTOCOL_BINARY_CMD_SET;
390 break;
391 case ADD_OP:
392 ret=PROTOCOL_BINARY_CMD_ADD;
393 break;
394 case CAS_OP: /* FALLTHROUGH */
395 case REPLACE_OP:
396 ret=PROTOCOL_BINARY_CMD_REPLACE;
397 break;
398 case APPEND_OP:
399 ret=PROTOCOL_BINARY_CMD_APPEND;
400 break;
401 case PREPEND_OP:
402 ret=PROTOCOL_BINARY_CMD_PREPEND;
403 break;
404 }
405
406 return ret;
407 }
408
409
410
411 static memcached_return memcached_send_binary(memcached_server_st* server,
412 const char *key,
413 size_t key_length,
414 const char *value,
415 size_t value_length,
416 time_t expiration,
417 uint32_t flags,
418 uint64_t cas,
419 memcached_storage_action verb)
420 {
421 char flush;
422 protocol_binary_request_set request= {.bytes= {0}};
423 size_t send_length= sizeof(request.bytes);
424 bool noreply= server->root->flags & MEM_NOREPLY;
425
426 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
427 request.message.header.request.opcode= get_com_code(verb, noreply);
428 request.message.header.request.keylen= htons((uint16_t)key_length);
429 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
430 if (verb == APPEND_OP || verb == PREPEND_OP)
431 send_length -= 8; /* append & prepend does not contain extras! */
432 else
433 {
434 request.message.header.request.extlen= 8;
435 request.message.body.flags= htonl(flags);
436 request.message.body.expiration= htonl((uint32_t)expiration);
437 }
438
439 request.message.header.request.bodylen= htonl(key_length + value_length +
440 request.message.header.request.extlen);
441
442 if (cas)
443 request.message.header.request.cas= htonll(cas);
444
445 flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
446
447 if ((server->root->flags & MEM_USE_UDP) && !flush)
448 {
449 size_t cmd_size= send_length + key_length + value_length;
450 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
451 return MEMCACHED_WRITE_FAILURE;
452 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
453 memcached_io_write(server,NULL,0, 1);
454 }
455
456 /* write the header */
457 if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
458 (memcached_io_write(server, key, key_length, 0) == -1) ||
459 (memcached_io_write(server, value, value_length, flush) == -1))
460 {
461 memcached_io_reset(server);
462 return MEMCACHED_WRITE_FAILURE;
463 }
464
465 if (flush == 0)
466 return MEMCACHED_BUFFERED;
467
468 if (noreply)
469 return MEMCACHED_SUCCESS;
470
471 return memcached_response(server, NULL, 0, NULL);
472 }
473