Work inspired by matt knox
[m6w6/libmemcached] / libmemcached / memcached_get.c
1 #include "common.h"
2 #include "memcached_io.h"
3
4 /*
5 What happens if no servers exist?
6 */
7 char *memcached_get(memcached_st *ptr, const char *key,
8 size_t key_length,
9 size_t *value_length,
10 uint32_t *flags,
11 memcached_return *error)
12 {
13 return memcached_get_by_key(ptr, NULL, 0, key, key_length, value_length,
14 flags, error);
15 }
16
17 char *memcached_get_by_key(memcached_st *ptr,
18 const char *master_key,
19 size_t master_key_length,
20 const char *key, size_t key_length,
21 size_t *value_length,
22 uint32_t *flags,
23 memcached_return *error)
24 {
25 char *value;
26 size_t dummy_length;
27 uint32_t dummy_flags;
28 memcached_return dummy_error;
29
30 unlikely (ptr->flags & MEM_USE_UDP)
31 {
32 *error= MEMCACHED_NOT_SUPPORTED;
33 return NULL;
34 }
35
36 /* Request the key */
37 *error= memcached_mget_by_key(ptr,
38 master_key,
39 master_key_length,
40 (char **)&key, &key_length, 1);
41
42 value= memcached_fetch(ptr, NULL, NULL,
43 value_length, flags, error);
44 /* This is for historical reasons */
45 if (*error == MEMCACHED_END)
46 *error= MEMCACHED_NOTFOUND;
47
48 if (value == NULL)
49 {
50 if (ptr->get_key_failure && *error == MEMCACHED_NOTFOUND)
51 {
52 memcached_return rc;
53
54 memcached_result_reset(&ptr->result);
55 rc= ptr->get_key_failure(ptr, key, key_length, &ptr->result);
56
57 /* On all failure drop to returning NULL */
58 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
59 {
60 if (rc == MEMCACHED_BUFFERED)
61 {
62 uint8_t latch; /* We use latch to track the state of the original socket */
63 latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS);
64 if (latch == 0)
65 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
66
67 rc= memcached_set(ptr, key, key_length,
68 memcached_result_value(&ptr->result),
69 memcached_result_length(&ptr->result),
70 0, memcached_result_flags(&ptr->result));
71
72 if (rc == MEMCACHED_BUFFERED && latch == 0)
73 memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 0);
74 }
75 else
76 {
77 rc= memcached_set(ptr, key, key_length,
78 memcached_result_value(&ptr->result),
79 memcached_result_length(&ptr->result),
80 0, memcached_result_flags(&ptr->result));
81 }
82
83 if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
84 {
85 *error= rc;
86 *value_length= memcached_result_length(&ptr->result);
87 *flags= memcached_result_flags(&ptr->result);
88 return memcached_string_c_copy(&ptr->result.value);
89 }
90 }
91 }
92
93 return NULL;
94 }
95
96 (void)memcached_fetch(ptr, NULL, NULL,
97 &dummy_length, &dummy_flags,
98 &dummy_error);
99 WATCHPOINT_ASSERT(dummy_length == 0);
100
101 return value;
102 }
103
104 memcached_return memcached_mget(memcached_st *ptr,
105 char **keys, size_t *key_length,
106 unsigned int number_of_keys)
107 {
108 return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys);
109 }
110
111 static memcached_return binary_mget_by_key(memcached_st *ptr,
112 unsigned int master_server_key,
113 bool is_master_key_set,
114 char **keys, size_t *key_length,
115 unsigned int number_of_keys);
116
117 memcached_return memcached_mget_by_key(memcached_st *ptr,
118 const char *master_key,
119 size_t master_key_length,
120 char **keys,
121 size_t *key_length,
122 unsigned int number_of_keys)
123 {
124 unsigned int x;
125 memcached_return rc= MEMCACHED_NOTFOUND;
126 char *get_command= "get ";
127 uint8_t get_command_length= 4;
128 unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */
129 bool is_master_key_set= false;
130
131 unlikely (ptr->flags & MEM_USE_UDP)
132 return MEMCACHED_NOT_SUPPORTED;
133
134 LIBMEMCACHED_MEMCACHED_MGET_START();
135 ptr->cursor_server= 0;
136
137 if (number_of_keys == 0)
138 return MEMCACHED_NOTFOUND;
139
140 if (ptr->number_of_hosts == 0)
141 return MEMCACHED_NO_SERVERS;
142
143 if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED))
144 return MEMCACHED_BAD_KEY_PROVIDED;
145
146 if (master_key && master_key_length)
147 {
148 if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED))
149 return MEMCACHED_BAD_KEY_PROVIDED;
150 master_server_key= memcached_generate_hash(ptr, master_key, master_key_length);
151 is_master_key_set= true;
152 }
153
154 /*
155 Here is where we pay for the non-block API. We need to remove any data sitting
156 in the queue before we start our get.
157
158 It might be optimum to bounce the connection if count > some number.
159 */
160 for (x= 0; x < ptr->number_of_hosts; x++)
161 {
162 if (memcached_server_response_count(&ptr->hosts[x]))
163 {
164 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
165
166 if (ptr->flags & MEM_NO_BLOCK)
167 (void)memcached_io_write(&ptr->hosts[x], NULL, 0, 1);
168
169 while(memcached_server_response_count(&ptr->hosts[x]))
170 (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result);
171 }
172 }
173
174 if (ptr->flags & MEM_BINARY_PROTOCOL)
175 return binary_mget_by_key(ptr, master_server_key, is_master_key_set, keys,
176 key_length, number_of_keys);
177
178 if (ptr->flags & MEM_SUPPORT_CAS)
179 {
180 get_command= "gets ";
181 get_command_length= 5;
182 }
183
184 /*
185 If a server fails we warn about errors and start all over with sending keys
186 to the server.
187 */
188 for (x= 0; x < number_of_keys; x++)
189 {
190 unsigned int server_key;
191
192 if (is_master_key_set)
193 server_key= master_server_key;
194 else
195 server_key= memcached_generate_hash(ptr, keys[x], key_length[x]);
196
197 if (memcached_server_response_count(&ptr->hosts[server_key]) == 0)
198 {
199 rc= memcached_connect(&ptr->hosts[server_key]);
200
201 if (rc != MEMCACHED_SUCCESS)
202 continue;
203
204 if ((memcached_io_write(&ptr->hosts[server_key], get_command, get_command_length, 0)) == -1)
205 {
206 rc= MEMCACHED_SOME_ERRORS;
207 continue;
208 }
209 WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0);
210 memcached_server_response_increment(&ptr->hosts[server_key]);
211 WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 1);
212 }
213
214 /* Only called when we have a prefix key */
215 if (ptr->prefix_key[0] != 0)
216 {
217 if ((memcached_io_write(&ptr->hosts[server_key], ptr->prefix_key, ptr->prefix_key_length, 0)) == -1)
218 {
219 memcached_server_response_reset(&ptr->hosts[server_key]);
220 rc= MEMCACHED_SOME_ERRORS;
221 continue;
222 }
223 }
224
225 if ((memcached_io_write(&ptr->hosts[server_key], keys[x], key_length[x], 0)) == -1)
226 {
227 memcached_server_response_reset(&ptr->hosts[server_key]);
228 rc= MEMCACHED_SOME_ERRORS;
229 continue;
230 }
231
232 if ((memcached_io_write(&ptr->hosts[server_key], " ", 1, 0)) == -1)
233 {
234 memcached_server_response_reset(&ptr->hosts[server_key]);
235 rc= MEMCACHED_SOME_ERRORS;
236 continue;
237 }
238 }
239
240 /*
241 Should we muddle on if some servers are dead?
242 */
243 for (x= 0; x < ptr->number_of_hosts; x++)
244 {
245 if (memcached_server_response_count(&ptr->hosts[x]))
246 {
247 /* We need to do something about non-connnected hosts in the future */
248 if ((memcached_io_write(&ptr->hosts[x], "\r\n", 2, 1)) == -1)
249 {
250 rc= MEMCACHED_SOME_ERRORS;
251 }
252 }
253 }
254
255 LIBMEMCACHED_MEMCACHED_MGET_END();
256 return rc;
257 }
258
259 static memcached_return simple_binary_mget(memcached_st *ptr,
260 unsigned int master_server_key,
261 bool is_master_key_set,
262 char **keys, size_t *key_length,
263 unsigned int number_of_keys)
264 {
265 memcached_return rc= MEMCACHED_NOTFOUND;
266 uint32_t x;
267
268 int flush= number_of_keys == 1;
269
270 /*
271 If a server fails we warn about errors and start all over with sending keys
272 to the server.
273 */
274 for (x= 0; x < number_of_keys; x++)
275 {
276 unsigned int server_key;
277
278 if (is_master_key_set)
279 server_key= master_server_key;
280 else
281 server_key= memcached_generate_hash(ptr, keys[x], key_length[x]);
282
283 if (memcached_server_response_count(&ptr->hosts[server_key]) == 0)
284 {
285 rc= memcached_connect(&ptr->hosts[server_key]);
286 if (rc != MEMCACHED_SUCCESS)
287 continue;
288 }
289
290 protocol_binary_request_getk request= {.bytes= {0}};
291 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
292 if (number_of_keys == 1)
293 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
294 else
295 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
296
297 memcached_return vk;
298 vk= memcached_validate_key_length(key_length[x],
299 ptr->flags & MEM_BINARY_PROTOCOL);
300 unlikely (vk != MEMCACHED_SUCCESS)
301 {
302 if (x > 0)
303 memcached_io_reset(&ptr->hosts[server_key]);
304 return vk;
305 }
306
307 request.message.header.request.keylen= htons((uint16_t)key_length[x]);
308 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
309 request.message.header.request.bodylen= htonl(key_length[x]);
310
311 if ((memcached_io_write(&ptr->hosts[server_key], request.bytes,
312 sizeof(request.bytes), 0) == -1) ||
313 (memcached_io_write(&ptr->hosts[server_key], keys[x],
314 key_length[x], flush) == -1))
315 {
316 memcached_server_response_reset(&ptr->hosts[server_key]);
317 rc= MEMCACHED_SOME_ERRORS;
318 continue;
319 }
320 memcached_server_response_increment(&ptr->hosts[server_key]);
321 if ((x > 0 && x == ptr->io_key_prefetch) &&
322 memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS)
323 rc= MEMCACHED_SOME_ERRORS;
324 }
325
326 if (number_of_keys > 1)
327 {
328 /*
329 * Send a noop command to flush the buffers
330 */
331 protocol_binary_request_noop request= {.bytes= {0}};
332 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
333 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
334 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
335
336 for (x= 0; x < ptr->number_of_hosts; x++)
337 if (memcached_server_response_count(&ptr->hosts[x]))
338 {
339 if (memcached_io_write(&ptr->hosts[x], NULL, 0, 1) == -1)
340 {
341 memcached_server_response_reset(&ptr->hosts[x]);
342 memcached_io_reset(&ptr->hosts[x]);
343 rc= MEMCACHED_SOME_ERRORS;
344 }
345
346 if (memcached_io_write(&ptr->hosts[x], request.bytes,
347 sizeof(request.bytes), 1) == -1)
348 {
349 memcached_server_response_reset(&ptr->hosts[x]);
350 memcached_io_reset(&ptr->hosts[x]);
351 rc= MEMCACHED_SOME_ERRORS;
352 }
353 memcached_server_response_increment(&ptr->hosts[x]);
354 }
355 }
356
357
358 return rc;
359 }
360
361 static memcached_return replication_binary_mget(memcached_st *ptr,
362 uint32_t* hash, bool* dead_servers,
363 char **keys, size_t *key_length,
364 unsigned int number_of_keys)
365 {
366 memcached_return rc= MEMCACHED_NOTFOUND;
367 uint32_t x;
368
369 int flush= number_of_keys == 1;
370
371 for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica)
372 {
373 bool success= true;
374
375 for (x= 0; x < number_of_keys; ++x)
376 {
377 if (hash[x] == ptr->number_of_hosts)
378 continue; /* Already successfully sent */
379
380 uint32_t server= hash[x] + replica;
381 while (server >= ptr->number_of_hosts)
382 server -= ptr->number_of_hosts;
383
384 if (dead_servers[server])
385 continue;
386
387 if (memcached_server_response_count(&ptr->hosts[server]) == 0)
388 {
389 rc= memcached_connect(&ptr->hosts[server]);
390 if (rc != MEMCACHED_SUCCESS)
391 {
392 memcached_io_reset(&ptr->hosts[server]);
393 dead_servers[server]= true;
394 success= false;
395 continue;
396 }
397 }
398
399 protocol_binary_request_getk request= {.bytes= {0}};
400 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
401 if (number_of_keys == 1)
402 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK;
403 else
404 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ;
405
406 request.message.header.request.keylen= htons((uint16_t)key_length[x]);
407 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
408 request.message.header.request.bodylen= htonl(key_length[x]);
409
410 if ((memcached_io_write(&ptr->hosts[server], request.bytes,
411 sizeof(request.bytes), 0) == -1) ||
412 (memcached_io_write(&ptr->hosts[server], keys[x],
413 key_length[x], flush) == -1))
414 {
415 memcached_io_reset(&ptr->hosts[server]);
416 dead_servers[server]= true;
417 success= false;
418 continue;
419 }
420 memcached_server_response_increment(&ptr->hosts[server]);
421 }
422
423 if (number_of_keys > 1)
424 {
425 /*
426 * Send a noop command to flush the buffers
427 */
428 protocol_binary_request_noop request= {.bytes= {0}};
429 request.message.header.request.magic= PROTOCOL_BINARY_REQ;
430 request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP;
431 request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES;
432
433 for (x= 0; x < ptr->number_of_hosts; x++)
434 if (memcached_server_response_count(&ptr->hosts[x]))
435 {
436 if (memcached_io_write(&ptr->hosts[x], request.bytes,
437 sizeof(request.bytes), 1) == -1)
438 {
439 memcached_io_reset(&ptr->hosts[x]);
440 dead_servers[x]= true;
441 success= false;
442 }
443 memcached_server_response_increment(&ptr->hosts[x]);
444
445 /* mark all of the messages bound for this server as sent! */
446 for (x= 0; x < number_of_keys; ++x)
447 if (hash[x] == x)
448 hash[x]= ptr->number_of_hosts;
449 }
450 }
451
452 if (success)
453 break;
454 }
455
456 return rc;
457 }
458
459 static memcached_return binary_mget_by_key(memcached_st *ptr,
460 unsigned int master_server_key,
461 bool is_master_key_set,
462 char **keys, size_t *key_length,
463 unsigned int number_of_keys)
464 {
465 memcached_return rc;
466
467 if (ptr->number_of_replicas == 0)
468 {
469 rc= simple_binary_mget(ptr, master_server_key, is_master_key_set,
470 keys, key_length, number_of_keys);
471 }
472 else
473 {
474 uint32_t* hash;
475 bool* dead_servers;
476
477 hash= ptr->call_malloc(ptr, sizeof(uint32_t) * number_of_keys);
478 dead_servers= ptr->call_calloc(ptr, ptr->number_of_hosts, sizeof(bool));
479
480 if (hash == NULL || dead_servers == NULL)
481 {
482 ptr->call_free(ptr, hash);
483 ptr->call_free(ptr, dead_servers);
484 return MEMCACHED_MEMORY_ALLOCATION_FAILURE;
485 }
486
487 if (is_master_key_set)
488 for (unsigned int x= 0; x < number_of_keys; x++)
489 hash[x]= master_server_key;
490 else
491 for (unsigned int x= 0; x < number_of_keys; x++)
492 hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]);
493
494 rc= replication_binary_mget(ptr, hash, dead_servers, keys,
495 key_length, number_of_keys);
496
497 ptr->call_free(ptr, hash);
498 ptr->call_free(ptr, dead_servers);
499
500 return MEMCACHED_SUCCESS;
501 }
502
503 return rc;
504 }