ea8b1d249b98d5d1189d4d151bf5191120f3f5b6
[awesomized/libmemcached] / libmemcached / memcached_storage.c
1 /*
2 Memcached library
3
4 memcached_set()
5 memcached_replace()
6 memcached_add()
7
8 */
9 #include "common.h"
10 #include "memcached_io.h"
11
/* Storage verbs handled by memcached_send() and memcached_send_binary(). */
typedef enum {
  SET_OP,
  REPLACE_OP,
  ADD_OP,
  PREPEND_OP,
  APPEND_OP,
  CAS_OP,
} memcached_storage_action;

/*
  Map a storage verb to the command word placed at the start of a
  text-protocol request.

  verb    one of the memcached_storage_action values.

  Returns a pointer to a string literal; the caller must neither modify
  nor free it, hence the const-qualified return type.  A value outside
  the enum yields "tosserror".
*/
static const char *storage_op_string(memcached_storage_action verb)
{
  switch (verb)
  {
  case SET_OP:
    return "set";
  case REPLACE_OP:
    return "replace";
  case ADD_OP:
    return "add";
  case PREPEND_OP:
    return "prepend";
  case APPEND_OP:
    return "append";
  case CAS_OP:
    return "cas";
  default:
    /* Not reachable for valid enum values; also silences a Visual Studio
       warning about a missing return. */
    return "tosserror";
  }
}
44
45 static memcached_return memcached_send_binary(memcached_server_st* server,
46 const char *key,
47 size_t key_length,
48 const char *value,
49 size_t value_length,
50 time_t expiration,
51 uint32_t flags,
52 uint64_t cas,
53 memcached_storage_action verb);
54
55 static inline memcached_return memcached_send(memcached_st *ptr,
56 const char *master_key, size_t master_key_length,
57 const char *key, size_t key_length,
58 const char *value, size_t value_length,
59 time_t expiration,
60 uint32_t flags,
61 uint64_t cas,
62 memcached_storage_action verb)
63 {
64 char to_write;
65 size_t write_length;
66 ssize_t sent_length;
67 memcached_return rc;
68 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
69 unsigned int server_key;
70
71 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
72
73 rc= memcached_validate_key_length(key_length, ptr->flags & MEM_BINARY_PROTOCOL);
74 unlikely (rc != MEMCACHED_SUCCESS)
75 return rc;
76
77 unlikely (ptr->number_of_hosts == 0)
78 return MEMCACHED_NO_SERVERS;
79
80 if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
81 return MEMCACHED_BAD_KEY_PROVIDED;
82
83 server_key= memcached_generate_hash(ptr, master_key, master_key_length);
84
85 if (ptr->flags & MEM_BINARY_PROTOCOL)
86 return memcached_send_binary(&ptr->hosts[server_key], key, key_length,
87 value, value_length, expiration,
88 flags, cas, verb);
89
90 if (cas)
91 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
92 "%s %s%.*s %u %llu %zu %llu%s\r\n",
93 storage_op_string(verb),
94 ptr->prefix_key,
95 (int)key_length, key, flags,
96 (unsigned long long)expiration, value_length,
97 (unsigned long long)cas,
98 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
99 else
100 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
101 "%s %s%.*s %u %llu %zu%s\r\n",
102 storage_op_string(verb),
103 ptr->prefix_key,
104 (int)key_length, key, flags,
105 (unsigned long long)expiration, value_length,
106 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
107
108 if (ptr->flags & MEM_USE_UDP && ptr->flags & MEM_BUFFER_REQUESTS)
109 {
110 size_t cmd_size= write_length + value_length + 2;
111 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
112 return MEMCACHED_WRITE_FAILURE;
113 if (cmd_size + ptr->hosts[server_key].write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
114 memcached_io_write(&ptr->hosts[server_key], NULL, 0, 1);
115 }
116
117 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
118 {
119 rc= MEMCACHED_WRITE_FAILURE;
120 goto error;
121 }
122
123 /* Send command header */
124 rc= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
125 if (rc != MEMCACHED_SUCCESS)
126 goto error;
127
128 /* Send command body */
129 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
130 {
131 rc= MEMCACHED_WRITE_FAILURE;
132 goto error;
133 }
134
135 if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP)
136 to_write= 0;
137 else
138 to_write= 1;
139
140 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
141 {
142 rc= MEMCACHED_WRITE_FAILURE;
143 goto error;
144 }
145
146 if (ptr->flags & MEM_NOREPLY)
147 return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
148
149 if (to_write == 0)
150 return MEMCACHED_BUFFERED;
151
152 rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
153
154 if (rc == MEMCACHED_STORED)
155 return MEMCACHED_SUCCESS;
156 else
157 return rc;
158
159 error:
160 memcached_io_reset(&ptr->hosts[server_key]);
161
162 return rc;
163 }
164
165
166 memcached_return memcached_set(memcached_st *ptr, const char *key, size_t key_length,
167 const char *value, size_t value_length,
168 time_t expiration,
169 uint32_t flags)
170 {
171 memcached_return rc;
172 LIBMEMCACHED_MEMCACHED_SET_START();
173 rc= memcached_send(ptr, key, key_length,
174 key, key_length, value, value_length,
175 expiration, flags, 0, SET_OP);
176 LIBMEMCACHED_MEMCACHED_SET_END();
177 return rc;
178 }
179
180 memcached_return memcached_add(memcached_st *ptr,
181 const char *key, size_t key_length,
182 const char *value, size_t value_length,
183 time_t expiration,
184 uint32_t flags)
185 {
186 memcached_return rc;
187 LIBMEMCACHED_MEMCACHED_ADD_START();
188 rc= memcached_send(ptr, key, key_length,
189 key, key_length, value, value_length,
190 expiration, flags, 0, ADD_OP);
191 LIBMEMCACHED_MEMCACHED_ADD_END();
192 return rc;
193 }
194
195 memcached_return memcached_replace(memcached_st *ptr,
196 const char *key, size_t key_length,
197 const char *value, size_t value_length,
198 time_t expiration,
199 uint32_t flags)
200 {
201 memcached_return rc;
202 LIBMEMCACHED_MEMCACHED_REPLACE_START();
203 rc= memcached_send(ptr, key, key_length,
204 key, key_length, value, value_length,
205 expiration, flags, 0, REPLACE_OP);
206 LIBMEMCACHED_MEMCACHED_REPLACE_END();
207 return rc;
208 }
209
210 memcached_return memcached_prepend(memcached_st *ptr,
211 const char *key, size_t key_length,
212 const char *value, size_t value_length,
213 time_t expiration,
214 uint32_t flags)
215 {
216 memcached_return rc;
217 rc= memcached_send(ptr, key, key_length,
218 key, key_length, value, value_length,
219 expiration, flags, 0, PREPEND_OP);
220 return rc;
221 }
222
223 memcached_return memcached_append(memcached_st *ptr,
224 const char *key, size_t key_length,
225 const char *value, size_t value_length,
226 time_t expiration,
227 uint32_t flags)
228 {
229 memcached_return rc;
230 rc= memcached_send(ptr, key, key_length,
231 key, key_length, value, value_length,
232 expiration, flags, 0, APPEND_OP);
233 return rc;
234 }
235
236 memcached_return memcached_cas(memcached_st *ptr,
237 const char *key, size_t key_length,
238 const char *value, size_t value_length,
239 time_t expiration,
240 uint32_t flags,
241 uint64_t cas)
242 {
243 memcached_return rc;
244 rc= memcached_send(ptr, key, key_length,
245 key, key_length, value, value_length,
246 expiration, flags, cas, CAS_OP);
247 return rc;
248 }
249
250 memcached_return memcached_set_by_key(memcached_st *ptr,
251 const char *master_key __attribute__((unused)),
252 size_t master_key_length __attribute__((unused)),
253 const char *key, size_t key_length,
254 const char *value, size_t value_length,
255 time_t expiration,
256 uint32_t flags)
257 {
258 memcached_return rc;
259 LIBMEMCACHED_MEMCACHED_SET_START();
260 rc= memcached_send(ptr, master_key, master_key_length,
261 key, key_length, value, value_length,
262 expiration, flags, 0, SET_OP);
263 LIBMEMCACHED_MEMCACHED_SET_END();
264 return rc;
265 }
266
267 memcached_return memcached_add_by_key(memcached_st *ptr,
268 const char *master_key, size_t master_key_length,
269 const char *key, size_t key_length,
270 const char *value, size_t value_length,
271 time_t expiration,
272 uint32_t flags)
273 {
274 memcached_return rc;
275 LIBMEMCACHED_MEMCACHED_ADD_START();
276 rc= memcached_send(ptr, master_key, master_key_length,
277 key, key_length, value, value_length,
278 expiration, flags, 0, ADD_OP);
279 LIBMEMCACHED_MEMCACHED_ADD_END();
280 return rc;
281 }
282
283 memcached_return memcached_replace_by_key(memcached_st *ptr,
284 const char *master_key, size_t master_key_length,
285 const char *key, size_t key_length,
286 const char *value, size_t value_length,
287 time_t expiration,
288 uint32_t flags)
289 {
290 memcached_return rc;
291 LIBMEMCACHED_MEMCACHED_REPLACE_START();
292 rc= memcached_send(ptr, master_key, master_key_length,
293 key, key_length, value, value_length,
294 expiration, flags, 0, REPLACE_OP);
295 LIBMEMCACHED_MEMCACHED_REPLACE_END();
296 return rc;
297 }
298
299 memcached_return memcached_prepend_by_key(memcached_st *ptr,
300 const char *master_key, size_t master_key_length,
301 const char *key, size_t key_length,
302 const char *value, size_t value_length,
303 time_t expiration,
304 uint32_t flags)
305 {
306 memcached_return rc;
307 rc= memcached_send(ptr, master_key, master_key_length,
308 key, key_length, value, value_length,
309 expiration, flags, 0, PREPEND_OP);
310 return rc;
311 }
312
313 memcached_return memcached_append_by_key(memcached_st *ptr,
314 const char *master_key, size_t master_key_length,
315 const char *key, size_t key_length,
316 const char *value, size_t value_length,
317 time_t expiration,
318 uint32_t flags)
319 {
320 memcached_return rc;
321 rc= memcached_send(ptr, master_key, master_key_length,
322 key, key_length, value, value_length,
323 expiration, flags, 0, APPEND_OP);
324 return rc;
325 }
326
327 memcached_return memcached_cas_by_key(memcached_st *ptr,
328 const char *master_key, size_t master_key_length,
329 const char *key, size_t key_length,
330 const char *value, size_t value_length,
331 time_t expiration,
332 uint32_t flags,
333 uint64_t cas)
334 {
335 memcached_return rc;
336 rc= memcached_send(ptr, master_key, master_key_length,
337 key, key_length, value, value_length,
338 expiration, flags, cas, CAS_OP);
339 return rc;
340 }
341
342 static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply)
343 {
344 uint8_t ret;
345
346 if (noreply)
347 switch (verb)
348 {
349 case SET_OP:
350 ret=PROTOCOL_BINARY_CMD_SETQ;
351 break;
352 case ADD_OP:
353 ret=PROTOCOL_BINARY_CMD_ADDQ;
354 break;
355 case CAS_OP: /* FALLTHROUGH */
356 case REPLACE_OP:
357 ret=PROTOCOL_BINARY_CMD_REPLACEQ;
358 break;
359 case APPEND_OP:
360 ret=PROTOCOL_BINARY_CMD_APPENDQ;
361 break;
362 case PREPEND_OP:
363 ret=PROTOCOL_BINARY_CMD_PREPENDQ;
364 break;
365 }
366 else
367 switch (verb)
368 {
369 case SET_OP:
370 ret=PROTOCOL_BINARY_CMD_SET;
371 break;
372 case ADD_OP:
373 ret=PROTOCOL_BINARY_CMD_ADD;
374 break;
375 case CAS_OP: /* FALLTHROUGH */
376 case REPLACE_OP:
377 ret=PROTOCOL_BINARY_CMD_REPLACE;
378 break;
379 case APPEND_OP:
380 ret=PROTOCOL_BINARY_CMD_APPEND;
381 break;
382 case PREPEND_OP:
383 ret=PROTOCOL_BINARY_CMD_PREPEND;
384 break;
385 }
386
387 return ret;
388 }
389
390
391
392 static memcached_return memcached_send_binary(memcached_server_st* server,
393 const char *key,
394 size_t key_length,
395 const char *value,
396 size_t value_length,
397 time_t expiration,
398 uint32_t flags,
399 uint64_t cas,
400 memcached_storage_action verb)
401 {
402 char flush;
403 protocol_binary_request_set request= {.bytes= {0}};
404 size_t send_length= sizeof(request.bytes);
405 bool noreply= server->root->flags & MEM_NOREPLY;
406
407 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
408 request.message.header.request.opcode= get_com_code(verb, noreply);
409 request.message.header.request.keylen= htons((uint16_t)key_length);
410 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
411 if (verb == APPEND_OP || verb == PREPEND_OP)
412 send_length -= 8; /* append & prepend does not contain extras! */
413 else
414 {
415 request.message.header.request.extlen= 8;
416 request.message.body.flags= htonl(flags);
417 request.message.body.expiration= htonl((uint32_t)expiration);
418 }
419
420 request.message.header.request.bodylen= htonl(key_length + value_length +
421 request.message.header.request.extlen);
422
423 if (cas)
424 request.message.header.request.cas= htonll(cas);
425
426 flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
427
428 if ((server->root->flags & MEM_USE_UDP) && !flush)
429 {
430 size_t cmd_size= send_length + key_length + value_length;
431 if (cmd_size > MAX_UDP_DATAGRAM_LENGTH - UDP_DATAGRAM_HEADER_LENGTH)
432 return MEMCACHED_WRITE_FAILURE;
433 if (cmd_size + server->write_buffer_offset > MAX_UDP_DATAGRAM_LENGTH)
434 memcached_io_write(server,NULL,0, 1);
435 }
436
437 /* write the header */
438 if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
439 (memcached_io_write(server, key, key_length, 0) == -1) ||
440 (memcached_io_write(server, value, value_length, flush) == -1))
441 {
442 memcached_io_reset(server);
443 return MEMCACHED_WRITE_FAILURE;
444 }
445
446 if (flush == 0)
447 return MEMCACHED_BUFFERED;
448
449 if (noreply)
450 return MEMCACHED_SUCCESS;
451
452 return memcached_response(server, NULL, 0, NULL);
453 }
454