Implemented support for noreply in the binary protocol
[awesomized/libmemcached] / libmemcached / memcached_storage.c
1 /*
2 Memcached library
3
4 memcached_set()
5 memcached_replace()
6 memcached_add()
7
8 */
9 #include "common.h"
10 #include "memcached_io.h"
11
12 typedef enum {
13 SET_OP,
14 REPLACE_OP,
15 ADD_OP,
16 PREPEND_OP,
17 APPEND_OP,
18 CAS_OP,
19 } memcached_storage_action;
20
21 /* Inline this */
22 static char *storage_op_string(memcached_storage_action verb)
23 {
24 switch (verb)
25 {
26 case SET_OP:
27 return "set";
28 case REPLACE_OP:
29 return "replace";
30 case ADD_OP:
31 return "add";
32 case PREPEND_OP:
33 return "prepend";
34 case APPEND_OP:
35 return "append";
36 case CAS_OP:
37 return "cas";
38 default:
39 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
40 };
41
42 /* NOTREACHED */
43 }
44
45 static memcached_return memcached_send_binary(memcached_server_st* server,
46 const char *key,
47 size_t key_length,
48 const char *value,
49 size_t value_length,
50 time_t expiration,
51 uint32_t flags,
52 uint64_t cas,
53 memcached_storage_action verb);
54
55 static inline memcached_return memcached_send(memcached_st *ptr,
56 const char *master_key, size_t master_key_length,
57 const char *key, size_t key_length,
58 const char *value, size_t value_length,
59 time_t expiration,
60 uint32_t flags,
61 uint64_t cas,
62 memcached_storage_action verb)
63 {
64 char to_write;
65 size_t write_length;
66 ssize_t sent_length;
67 memcached_return rc;
68 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
69 unsigned int server_key;
70
71 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
72
73 rc= memcached_validate_key_length(key_length, ptr->flags & MEM_BINARY_PROTOCOL);
74 unlikely (rc != MEMCACHED_SUCCESS)
75 return rc;
76
77 unlikely (ptr->number_of_hosts == 0)
78 return MEMCACHED_NO_SERVERS;
79
80 if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
81 return MEMCACHED_BAD_KEY_PROVIDED;
82
83 server_key= memcached_generate_hash(ptr, master_key, master_key_length);
84
85 if (ptr->flags & MEM_BINARY_PROTOCOL)
86 return memcached_send_binary(&ptr->hosts[server_key], key, key_length,
87 value, value_length, expiration,
88 flags, cas, verb);
89
90 if (cas)
91 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
92 "%s %s%.*s %u %llu %zu %llu%s\r\n",
93 storage_op_string(verb),
94 ptr->prefix_key,
95 (int)key_length, key, flags,
96 (unsigned long long)expiration, value_length,
97 (unsigned long long)cas,
98 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
99 else
100 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
101 "%s %s%.*s %u %llu %zu%s\r\n",
102 storage_op_string(verb),
103 ptr->prefix_key,
104 (int)key_length, key, flags,
105 (unsigned long long)expiration, value_length,
106 (ptr->flags & MEM_NOREPLY) ? " noreply" : "");
107
108 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
109 {
110 rc= MEMCACHED_WRITE_FAILURE;
111 goto error;
112 }
113
114 /* Send command header */
115 rc= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
116 if (rc != MEMCACHED_SUCCESS)
117 goto error;
118
119 /* Send command body */
120 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
121 {
122 rc= MEMCACHED_WRITE_FAILURE;
123 goto error;
124 }
125
126 if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP)
127 to_write= 0;
128 else
129 to_write= 1;
130
131 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
132 {
133 rc= MEMCACHED_WRITE_FAILURE;
134 goto error;
135 }
136
137 if (ptr->flags & MEM_NOREPLY)
138 {
139 memcached_server_response_decrement(&ptr->hosts[server_key]);
140 return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
141 }
142
143 if (to_write == 0)
144 return MEMCACHED_BUFFERED;
145
146 rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
147
148 if (rc == MEMCACHED_STORED)
149 return MEMCACHED_SUCCESS;
150 else
151 return rc;
152
153 error:
154 memcached_io_reset(&ptr->hosts[server_key]);
155
156 return rc;
157 }
158
159
160 memcached_return memcached_set(memcached_st *ptr, const char *key, size_t key_length,
161 const char *value, size_t value_length,
162 time_t expiration,
163 uint32_t flags)
164 {
165 memcached_return rc;
166 LIBMEMCACHED_MEMCACHED_SET_START();
167 rc= memcached_send(ptr, key, key_length,
168 key, key_length, value, value_length,
169 expiration, flags, 0, SET_OP);
170 LIBMEMCACHED_MEMCACHED_SET_END();
171 return rc;
172 }
173
174 memcached_return memcached_add(memcached_st *ptr,
175 const char *key, size_t key_length,
176 const char *value, size_t value_length,
177 time_t expiration,
178 uint32_t flags)
179 {
180 memcached_return rc;
181 LIBMEMCACHED_MEMCACHED_ADD_START();
182 rc= memcached_send(ptr, key, key_length,
183 key, key_length, value, value_length,
184 expiration, flags, 0, ADD_OP);
185 LIBMEMCACHED_MEMCACHED_ADD_END();
186 return rc;
187 }
188
189 memcached_return memcached_replace(memcached_st *ptr,
190 const char *key, size_t key_length,
191 const char *value, size_t value_length,
192 time_t expiration,
193 uint32_t flags)
194 {
195 memcached_return rc;
196 LIBMEMCACHED_MEMCACHED_REPLACE_START();
197 rc= memcached_send(ptr, key, key_length,
198 key, key_length, value, value_length,
199 expiration, flags, 0, REPLACE_OP);
200 LIBMEMCACHED_MEMCACHED_REPLACE_END();
201 return rc;
202 }
203
204 memcached_return memcached_prepend(memcached_st *ptr,
205 const char *key, size_t key_length,
206 const char *value, size_t value_length,
207 time_t expiration,
208 uint32_t flags)
209 {
210 memcached_return rc;
211 rc= memcached_send(ptr, key, key_length,
212 key, key_length, value, value_length,
213 expiration, flags, 0, PREPEND_OP);
214 return rc;
215 }
216
217 memcached_return memcached_append(memcached_st *ptr,
218 const char *key, size_t key_length,
219 const char *value, size_t value_length,
220 time_t expiration,
221 uint32_t flags)
222 {
223 memcached_return rc;
224 rc= memcached_send(ptr, key, key_length,
225 key, key_length, value, value_length,
226 expiration, flags, 0, APPEND_OP);
227 return rc;
228 }
229
230 memcached_return memcached_cas(memcached_st *ptr,
231 const char *key, size_t key_length,
232 const char *value, size_t value_length,
233 time_t expiration,
234 uint32_t flags,
235 uint64_t cas)
236 {
237 memcached_return rc;
238 rc= memcached_send(ptr, key, key_length,
239 key, key_length, value, value_length,
240 expiration, flags, cas, CAS_OP);
241 return rc;
242 }
243
244 memcached_return memcached_set_by_key(memcached_st *ptr,
245 const char *master_key __attribute__((unused)),
246 size_t master_key_length __attribute__((unused)),
247 const char *key, size_t key_length,
248 const char *value, size_t value_length,
249 time_t expiration,
250 uint32_t flags)
251 {
252 memcached_return rc;
253 LIBMEMCACHED_MEMCACHED_SET_START();
254 rc= memcached_send(ptr, master_key, master_key_length,
255 key, key_length, value, value_length,
256 expiration, flags, 0, SET_OP);
257 LIBMEMCACHED_MEMCACHED_SET_END();
258 return rc;
259 }
260
261 memcached_return memcached_add_by_key(memcached_st *ptr,
262 const char *master_key, size_t master_key_length,
263 const char *key, size_t key_length,
264 const char *value, size_t value_length,
265 time_t expiration,
266 uint32_t flags)
267 {
268 memcached_return rc;
269 LIBMEMCACHED_MEMCACHED_ADD_START();
270 rc= memcached_send(ptr, master_key, master_key_length,
271 key, key_length, value, value_length,
272 expiration, flags, 0, ADD_OP);
273 LIBMEMCACHED_MEMCACHED_ADD_END();
274 return rc;
275 }
276
277 memcached_return memcached_replace_by_key(memcached_st *ptr,
278 const char *master_key, size_t master_key_length,
279 const char *key, size_t key_length,
280 const char *value, size_t value_length,
281 time_t expiration,
282 uint32_t flags)
283 {
284 memcached_return rc;
285 LIBMEMCACHED_MEMCACHED_REPLACE_START();
286 rc= memcached_send(ptr, master_key, master_key_length,
287 key, key_length, value, value_length,
288 expiration, flags, 0, REPLACE_OP);
289 LIBMEMCACHED_MEMCACHED_REPLACE_END();
290 return rc;
291 }
292
293 memcached_return memcached_prepend_by_key(memcached_st *ptr,
294 const char *master_key, size_t master_key_length,
295 const char *key, size_t key_length,
296 const char *value, size_t value_length,
297 time_t expiration,
298 uint32_t flags)
299 {
300 memcached_return rc;
301 rc= memcached_send(ptr, master_key, master_key_length,
302 key, key_length, value, value_length,
303 expiration, flags, 0, PREPEND_OP);
304 return rc;
305 }
306
307 memcached_return memcached_append_by_key(memcached_st *ptr,
308 const char *master_key, size_t master_key_length,
309 const char *key, size_t key_length,
310 const char *value, size_t value_length,
311 time_t expiration,
312 uint32_t flags)
313 {
314 memcached_return rc;
315 rc= memcached_send(ptr, master_key, master_key_length,
316 key, key_length, value, value_length,
317 expiration, flags, 0, APPEND_OP);
318 return rc;
319 }
320
321 memcached_return memcached_cas_by_key(memcached_st *ptr,
322 const char *master_key, size_t master_key_length,
323 const char *key, size_t key_length,
324 const char *value, size_t value_length,
325 time_t expiration,
326 uint32_t flags,
327 uint64_t cas)
328 {
329 memcached_return rc;
330 rc= memcached_send(ptr, master_key, master_key_length,
331 key, key_length, value, value_length,
332 expiration, flags, cas, CAS_OP);
333 return rc;
334 }
335
336 static inline uint8_t get_com_code(memcached_storage_action verb, bool noreply) {
337 uint8_t ret;
338
339 if (noreply)
340 switch (verb)
341 {
342 case SET_OP:
343 ret=PROTOCOL_BINARY_CMD_SETQ;
344 break;
345 case ADD_OP:
346 ret=PROTOCOL_BINARY_CMD_ADDQ;
347 break;
348 case CAS_OP: /* FALLTHROUGH */
349 case REPLACE_OP:
350 ret=PROTOCOL_BINARY_CMD_REPLACEQ;
351 break;
352 case APPEND_OP:
353 ret=PROTOCOL_BINARY_CMD_APPENDQ;
354 break;
355 case PREPEND_OP:
356 ret=PROTOCOL_BINARY_CMD_PREPENDQ;
357 break;
358 }
359 else
360 switch (verb)
361 {
362 case SET_OP:
363 ret=PROTOCOL_BINARY_CMD_SET;
364 break;
365 case ADD_OP:
366 ret=PROTOCOL_BINARY_CMD_ADD;
367 break;
368 case CAS_OP: /* FALLTHROUGH */
369 case REPLACE_OP:
370 ret=PROTOCOL_BINARY_CMD_REPLACE;
371 break;
372 case APPEND_OP:
373 ret=PROTOCOL_BINARY_CMD_APPEND;
374 break;
375 case PREPEND_OP:
376 ret=PROTOCOL_BINARY_CMD_PREPEND;
377 break;
378 }
379
380 return ret;
381 }
382
383
384
385 static memcached_return memcached_send_binary(memcached_server_st* server,
386 const char *key,
387 size_t key_length,
388 const char *value,
389 size_t value_length,
390 time_t expiration,
391 uint32_t flags,
392 uint64_t cas,
393 memcached_storage_action verb)
394 {
395 char flush;
396 protocol_binary_request_set request= {.bytes= {0}};
397 size_t send_length= sizeof(request.bytes);
398 bool noreply = server->root->flags & MEM_NOREPLY;
399
400 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
401 request.message.header.request.opcode= get_com_code(verb, noreply);
402 request.message.header.request.keylen= htons((uint16_t)key_length);
403 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
404 if (verb == APPEND_OP || verb == PREPEND_OP)
405 send_length -= 8; /* append & prepend does not contain extras! */
406 else
407 {
408 request.message.header.request.extlen= 8;
409 request.message.body.flags= htonl(flags);
410 request.message.body.expiration= htonl((uint32_t)expiration);
411 }
412
413 request.message.header.request.bodylen= htonl(key_length + value_length +
414 request.message.header.request.extlen);
415
416 if (cas)
417 request.message.header.request.cas= htonll(cas);
418
419 flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
420
421 /* write the header */
422 if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
423 (memcached_io_write(server, key, key_length, 0) == -1) ||
424 (memcached_io_write(server, value, value_length, flush) == -1))
425 {
426 memcached_io_reset(server);
427 return MEMCACHED_WRITE_FAILURE;
428 }
429
430 if (noreply)
431 memcached_server_response_decrement(server);
432
433 if (flush == 0)
434 return MEMCACHED_BUFFERED;
435
436 if (noreply)
437 return MEMCACHED_SUCCESS;
438
439 return memcached_response(server, NULL, 0, NULL);
440 }
441