Added prototype for memcached_flush_buffers
[awesomized/libmemcached] / libmemcached / memcached_storage.c
1 /*
2 Memcached library
3
4 memcached_set()
5 memcached_replace()
6 memcached_add()
7
8 */
9 #include "common.h"
10 #include "memcached_io.h"
11
12 typedef enum {
13 SET_OP,
14 REPLACE_OP,
15 ADD_OP,
16 PREPEND_OP,
17 APPEND_OP,
18 CAS_OP,
19 } memcached_storage_action;
20
21 /* Inline this */
22 static char *storage_op_string(memcached_storage_action verb)
23 {
24 switch (verb)
25 {
26 case SET_OP:
27 return "set";
28 case REPLACE_OP:
29 return "replace";
30 case ADD_OP:
31 return "add";
32 case PREPEND_OP:
33 return "prepend";
34 case APPEND_OP:
35 return "append";
36 case CAS_OP:
37 return "cas";
38 default:
39 return "tosserror"; /* This is impossible, fixes issue for compiler warning in VisualStudio */
40 };
41
42 return SET_OP;
43 }
44
45 static memcached_return memcached_send_binary(memcached_server_st* server,
46 const char *key,
47 size_t key_length,
48 const char *value,
49 size_t value_length,
50 time_t expiration,
51 uint32_t flags,
52 uint64_t cas,
53 memcached_storage_action verb);
54
55 static inline memcached_return memcached_send(memcached_st *ptr,
56 const char *master_key, size_t master_key_length,
57 const char *key, size_t key_length,
58 const char *value, size_t value_length,
59 time_t expiration,
60 uint32_t flags,
61 uint64_t cas,
62 memcached_storage_action verb)
63 {
64 char to_write;
65 size_t write_length;
66 ssize_t sent_length;
67 memcached_return rc;
68 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
69 unsigned int server_key;
70
71 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
72
73 rc= memcached_validate_key_length(key_length, ptr->flags & MEM_BINARY_PROTOCOL);
74 unlikely (rc != MEMCACHED_SUCCESS)
75 return rc;
76
77 unlikely (ptr->number_of_hosts == 0)
78 return MEMCACHED_NO_SERVERS;
79
80 if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test((char **)&key, &key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
81 return MEMCACHED_BAD_KEY_PROVIDED;
82
83 server_key= memcached_generate_hash(ptr, master_key, master_key_length);
84
85 if (ptr->flags & MEM_BINARY_PROTOCOL)
86 return memcached_send_binary(&ptr->hosts[server_key], key, key_length,
87 value, value_length, expiration,
88 flags, cas, verb);
89
90 if (cas)
91 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
92 "%s %s%.*s %u %llu %zu %llu\r\n", storage_op_string(verb),
93 ptr->prefix_key,
94 (int)key_length, key, flags,
95 (unsigned long long)expiration, value_length,
96 (unsigned long long)cas);
97 else
98 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
99 "%s %s%.*s %u %llu %zu\r\n", storage_op_string(verb),
100 ptr->prefix_key,
101 (int)key_length, key, flags,
102 (unsigned long long)expiration, value_length);
103
104 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
105 {
106 rc= MEMCACHED_WRITE_FAILURE;
107 goto error;
108 }
109
110 /* Send command header */
111 rc= memcached_do(&ptr->hosts[server_key], buffer, write_length, 0);
112 if (rc != MEMCACHED_SUCCESS)
113 goto error;
114
115 /* Send command body */
116 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], value, value_length, 0)) == -1)
117 {
118 rc= MEMCACHED_WRITE_FAILURE;
119 goto error;
120 }
121
122 if ((ptr->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP)
123 to_write= 0;
124 else
125 to_write= 1;
126
127 if (ptr->flags & MEM_NOREPLY)
128 {
129 if (memcached_io_write(&ptr->hosts[server_key], " noreply\r\n",
130 10, to_write) == -1)
131 {
132 rc= MEMCACHED_WRITE_FAILURE;
133 goto error;
134 }
135
136 memcached_server_response_decrement(&ptr->hosts[server_key]);
137 return (to_write == 0) ? MEMCACHED_BUFFERED : MEMCACHED_SUCCESS;
138 }
139
140 if ((sent_length= memcached_io_write(&ptr->hosts[server_key], "\r\n", 2, to_write)) == -1)
141 {
142 rc= MEMCACHED_WRITE_FAILURE;
143 goto error;
144 }
145
146 if (to_write == 0)
147 return MEMCACHED_BUFFERED;
148
149 rc= memcached_response(&ptr->hosts[server_key], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, NULL);
150
151 if (rc == MEMCACHED_STORED)
152 return MEMCACHED_SUCCESS;
153 else
154 return rc;
155
156 error:
157 memcached_io_reset(&ptr->hosts[server_key]);
158
159 return rc;
160 }
161
162
163 memcached_return memcached_set(memcached_st *ptr, const char *key, size_t key_length,
164 const char *value, size_t value_length,
165 time_t expiration,
166 uint32_t flags)
167 {
168 memcached_return rc;
169 LIBMEMCACHED_MEMCACHED_SET_START();
170 rc= memcached_send(ptr, key, key_length,
171 key, key_length, value, value_length,
172 expiration, flags, 0, SET_OP);
173 LIBMEMCACHED_MEMCACHED_SET_END();
174 return rc;
175 }
176
177 memcached_return memcached_add(memcached_st *ptr,
178 const char *key, size_t key_length,
179 const char *value, size_t value_length,
180 time_t expiration,
181 uint32_t flags)
182 {
183 memcached_return rc;
184 LIBMEMCACHED_MEMCACHED_ADD_START();
185 rc= memcached_send(ptr, key, key_length,
186 key, key_length, value, value_length,
187 expiration, flags, 0, ADD_OP);
188 LIBMEMCACHED_MEMCACHED_ADD_END();
189 return rc;
190 }
191
192 memcached_return memcached_replace(memcached_st *ptr,
193 const char *key, size_t key_length,
194 const char *value, size_t value_length,
195 time_t expiration,
196 uint32_t flags)
197 {
198 memcached_return rc;
199 LIBMEMCACHED_MEMCACHED_REPLACE_START();
200 rc= memcached_send(ptr, key, key_length,
201 key, key_length, value, value_length,
202 expiration, flags, 0, REPLACE_OP);
203 LIBMEMCACHED_MEMCACHED_REPLACE_END();
204 return rc;
205 }
206
207 memcached_return memcached_prepend(memcached_st *ptr,
208 const char *key, size_t key_length,
209 const char *value, size_t value_length,
210 time_t expiration,
211 uint32_t flags)
212 {
213 memcached_return rc;
214 rc= memcached_send(ptr, key, key_length,
215 key, key_length, value, value_length,
216 expiration, flags, 0, PREPEND_OP);
217 return rc;
218 }
219
220 memcached_return memcached_append(memcached_st *ptr,
221 const char *key, size_t key_length,
222 const char *value, size_t value_length,
223 time_t expiration,
224 uint32_t flags)
225 {
226 memcached_return rc;
227 rc= memcached_send(ptr, key, key_length,
228 key, key_length, value, value_length,
229 expiration, flags, 0, APPEND_OP);
230 return rc;
231 }
232
233 memcached_return memcached_cas(memcached_st *ptr,
234 const char *key, size_t key_length,
235 const char *value, size_t value_length,
236 time_t expiration,
237 uint32_t flags,
238 uint64_t cas)
239 {
240 memcached_return rc;
241 rc= memcached_send(ptr, key, key_length,
242 key, key_length, value, value_length,
243 expiration, flags, cas, CAS_OP);
244 return rc;
245 }
246
247 memcached_return memcached_set_by_key(memcached_st *ptr,
248 const char *master_key __attribute__((unused)),
249 size_t master_key_length __attribute__((unused)),
250 const char *key, size_t key_length,
251 const char *value, size_t value_length,
252 time_t expiration,
253 uint32_t flags)
254 {
255 memcached_return rc;
256 LIBMEMCACHED_MEMCACHED_SET_START();
257 rc= memcached_send(ptr, master_key, master_key_length,
258 key, key_length, value, value_length,
259 expiration, flags, 0, SET_OP);
260 LIBMEMCACHED_MEMCACHED_SET_END();
261 return rc;
262 }
263
264 memcached_return memcached_add_by_key(memcached_st *ptr,
265 const char *master_key, size_t master_key_length,
266 const char *key, size_t key_length,
267 const char *value, size_t value_length,
268 time_t expiration,
269 uint32_t flags)
270 {
271 memcached_return rc;
272 LIBMEMCACHED_MEMCACHED_ADD_START();
273 rc= memcached_send(ptr, master_key, master_key_length,
274 key, key_length, value, value_length,
275 expiration, flags, 0, ADD_OP);
276 LIBMEMCACHED_MEMCACHED_ADD_END();
277 return rc;
278 }
279
280 memcached_return memcached_replace_by_key(memcached_st *ptr,
281 const char *master_key, size_t master_key_length,
282 const char *key, size_t key_length,
283 const char *value, size_t value_length,
284 time_t expiration,
285 uint32_t flags)
286 {
287 memcached_return rc;
288 LIBMEMCACHED_MEMCACHED_REPLACE_START();
289 rc= memcached_send(ptr, master_key, master_key_length,
290 key, key_length, value, value_length,
291 expiration, flags, 0, REPLACE_OP);
292 LIBMEMCACHED_MEMCACHED_REPLACE_END();
293 return rc;
294 }
295
296 memcached_return memcached_prepend_by_key(memcached_st *ptr,
297 const char *master_key, size_t master_key_length,
298 const char *key, size_t key_length,
299 const char *value, size_t value_length,
300 time_t expiration,
301 uint32_t flags)
302 {
303 memcached_return rc;
304 rc= memcached_send(ptr, master_key, master_key_length,
305 key, key_length, value, value_length,
306 expiration, flags, 0, PREPEND_OP);
307 return rc;
308 }
309
310 memcached_return memcached_append_by_key(memcached_st *ptr,
311 const char *master_key, size_t master_key_length,
312 const char *key, size_t key_length,
313 const char *value, size_t value_length,
314 time_t expiration,
315 uint32_t flags)
316 {
317 memcached_return rc;
318 rc= memcached_send(ptr, master_key, master_key_length,
319 key, key_length, value, value_length,
320 expiration, flags, 0, APPEND_OP);
321 return rc;
322 }
323
324 memcached_return memcached_cas_by_key(memcached_st *ptr,
325 const char *master_key, size_t master_key_length,
326 const char *key, size_t key_length,
327 const char *value, size_t value_length,
328 time_t expiration,
329 uint32_t flags,
330 uint64_t cas)
331 {
332 memcached_return rc;
333 rc= memcached_send(ptr, master_key, master_key_length,
334 key, key_length, value, value_length,
335 expiration, flags, cas, CAS_OP);
336 return rc;
337 }
338
339 static memcached_return memcached_send_binary(memcached_server_st* server,
340 const char *key,
341 size_t key_length,
342 const char *value,
343 size_t value_length,
344 time_t expiration,
345 uint32_t flags,
346 uint64_t cas,
347 memcached_storage_action verb)
348 {
349 char flush;
350 protocol_binary_request_set request= {.bytes= {0}};
351 size_t send_length= sizeof(request.bytes);
352
353 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
354 switch (verb)
355 {
356 case SET_OP:
357 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_SET;
358 break;
359 case ADD_OP:
360 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_ADD;
361 break;
362 case REPLACE_OP:
363 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE;
364 break;
365 case APPEND_OP:
366 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_APPEND;
367 break;
368 case PREPEND_OP:
369 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_PREPEND;
370 break;
371 case CAS_OP:
372 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_REPLACE;
373 break;
374 }
375
376 request.message.header.request.keylen= htons((uint16_t)key_length);
377 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
378 if (verb == APPEND_OP || verb == PREPEND_OP)
379 send_length -= 8; /* append & prepend does not contain extras! */
380 else
381 {
382 request.message.header.request.extlen= 8;
383 request.message.body.flags= htonl(flags);
384 request.message.body.expiration= htonl((uint32_t)expiration);
385 }
386
387 request.message.header.request.bodylen= htonl(key_length + value_length +
388 request.message.header.request.extlen);
389
390 if (cas)
391 request.message.header.request.cas= htonll(cas);
392
393 flush= ((server->root->flags & MEM_BUFFER_REQUESTS) && verb == SET_OP) ? 0 : 1;
394
395 /* write the header */
396 if ((memcached_do(server, (const char*)request.bytes, send_length, 0) != MEMCACHED_SUCCESS) ||
397 (memcached_io_write(server, key, key_length, 0) == -1) ||
398 (memcached_io_write(server, value, value_length, flush) == -1))
399 {
400 memcached_io_reset(server);
401 return MEMCACHED_WRITE_FAILURE;
402 }
403
404 if (flush == 0)
405 return MEMCACHED_BUFFERED;
406
407 return memcached_response(server, NULL, 0, NULL);
408 }
409