2 #include "memcached_io.h"
/* What happens if no servers exist? */
7 char *memcached_get(memcached_st
*ptr
, const char *key
,
11 memcached_return
*error
)
13 return memcached_get_by_key(ptr
, NULL
, 0, key
, key_length
, value_length
,
17 static memcached_return
memcached_mget_by_key_real(memcached_st
*ptr
,
18 const char *master_key
,
19 size_t master_key_length
,
20 const char * const *keys
,
21 const size_t *key_length
,
22 size_t number_of_keys
,
25 char *memcached_get_by_key(memcached_st
*ptr
,
26 const char *master_key
,
27 size_t master_key_length
,
28 const char *key
, size_t key_length
,
31 memcached_return
*error
)
36 memcached_return dummy_error
;
38 unlikely (ptr
->flags
& MEM_USE_UDP
)
40 *error
= MEMCACHED_NOT_SUPPORTED
;
45 *error
= memcached_mget_by_key_real(ptr
, master_key
, master_key_length
,
46 (const char * const *)&key
,
47 &key_length
, 1, false);
49 value
= memcached_fetch(ptr
, NULL
, NULL
,
50 value_length
, flags
, error
);
51 /* This is for historical reasons */
52 if (*error
== MEMCACHED_END
)
53 *error
= MEMCACHED_NOTFOUND
;
57 if (ptr
->get_key_failure
&& *error
== MEMCACHED_NOTFOUND
)
61 memcached_result_reset(&ptr
->result
);
62 rc
= ptr
->get_key_failure(ptr
, key
, key_length
, &ptr
->result
);
64 /* On all failure drop to returning NULL */
65 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_BUFFERED
)
67 if (rc
== MEMCACHED_BUFFERED
)
69 uint64_t latch
; /* We use latch to track the state of the original socket */
70 latch
= memcached_behavior_get(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
);
72 memcached_behavior_set(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
, 1);
74 rc
= memcached_set(ptr
, key
, key_length
,
75 memcached_result_value(&ptr
->result
),
76 memcached_result_length(&ptr
->result
),
77 0, memcached_result_flags(&ptr
->result
));
79 if (rc
== MEMCACHED_BUFFERED
&& latch
== 0)
80 memcached_behavior_set(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
, 0);
84 rc
= memcached_set(ptr
, key
, key_length
,
85 memcached_result_value(&ptr
->result
),
86 memcached_result_length(&ptr
->result
),
87 0, memcached_result_flags(&ptr
->result
));
90 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_BUFFERED
)
93 *value_length
= memcached_result_length(&ptr
->result
);
94 *flags
= memcached_result_flags(&ptr
->result
);
95 return memcached_string_c_copy(&ptr
->result
.value
);
103 (void)memcached_fetch(ptr
, NULL
, NULL
,
104 &dummy_length
, &dummy_flags
,
106 WATCHPOINT_ASSERT(dummy_length
== 0);
111 memcached_return
memcached_mget(memcached_st
*ptr
,
112 const char * const *keys
,
113 const size_t *key_length
,
114 size_t number_of_keys
)
116 return memcached_mget_by_key(ptr
, NULL
, 0, keys
, key_length
, number_of_keys
);
119 static memcached_return
binary_mget_by_key(memcached_st
*ptr
,
120 unsigned int master_server_key
,
121 bool is_master_key_set
,
122 const char * const *keys
,
123 const size_t *key_length
,
124 size_t number_of_keys
,
127 static memcached_return
memcached_mget_by_key_real(memcached_st
*ptr
,
128 const char *master_key
,
129 size_t master_key_length
,
130 const char * const *keys
,
131 const size_t *key_length
,
132 size_t number_of_keys
,
136 memcached_return rc
= MEMCACHED_NOTFOUND
;
137 const char *get_command
= "get ";
138 uint8_t get_command_length
= 4;
139 unsigned int master_server_key
= (unsigned int)-1; /* 0 is a valid server id! */
140 bool is_master_key_set
= false;
142 unlikely (ptr
->flags
& MEM_USE_UDP
)
143 return MEMCACHED_NOT_SUPPORTED
;
145 LIBMEMCACHED_MEMCACHED_MGET_START();
146 ptr
->cursor_server
= 0;
148 if (number_of_keys
== 0)
149 return MEMCACHED_NOTFOUND
;
151 if (ptr
->number_of_hosts
== 0)
152 return MEMCACHED_NO_SERVERS
;
154 if ((ptr
->flags
& MEM_VERIFY_KEY
) && (memcached_key_test(keys
, key_length
, number_of_keys
) == MEMCACHED_BAD_KEY_PROVIDED
))
155 return MEMCACHED_BAD_KEY_PROVIDED
;
157 if (master_key
&& master_key_length
)
159 if ((ptr
->flags
& MEM_VERIFY_KEY
) && (memcached_key_test((const char * const *)&master_key
, &master_key_length
, 1) == MEMCACHED_BAD_KEY_PROVIDED
))
160 return MEMCACHED_BAD_KEY_PROVIDED
;
161 master_server_key
= memcached_generate_hash(ptr
, master_key
, master_key_length
);
162 is_master_key_set
= true;
/*
  Here is where we pay for the non-block API. We need to remove any data sitting
  in the queue before we start our get.

  It might be optimal to bounce the connection if count > some number.
*/
171 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
173 if (memcached_server_response_count(&ptr
->hosts
[x
]))
175 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
177 if (ptr
->flags
& MEM_NO_BLOCK
)
178 (void)memcached_io_write(&ptr
->hosts
[x
], NULL
, 0, 1);
180 while(memcached_server_response_count(&ptr
->hosts
[x
]))
181 (void)memcached_response(&ptr
->hosts
[x
], buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, &ptr
->result
);
185 if (ptr
->flags
& MEM_BINARY_PROTOCOL
)
186 return binary_mget_by_key(ptr
, master_server_key
, is_master_key_set
, keys
,
187 key_length
, number_of_keys
, mget_mode
);
189 if (ptr
->flags
& MEM_SUPPORT_CAS
)
191 get_command
= "gets ";
192 get_command_length
= 5;
196 If a server fails we warn about errors and start all over with sending keys
199 for (x
= 0; x
< number_of_keys
; x
++)
201 unsigned int server_key
;
203 if (is_master_key_set
)
204 server_key
= master_server_key
;
206 server_key
= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
208 if (memcached_server_response_count(&ptr
->hosts
[server_key
]) == 0)
210 rc
= memcached_connect(&ptr
->hosts
[server_key
]);
212 if (rc
!= MEMCACHED_SUCCESS
)
215 if ((memcached_io_write(&ptr
->hosts
[server_key
], get_command
, get_command_length
, 0)) == -1)
217 rc
= MEMCACHED_SOME_ERRORS
;
220 WATCHPOINT_ASSERT(ptr
->hosts
[server_key
].cursor_active
== 0);
221 memcached_server_response_increment(&ptr
->hosts
[server_key
]);
222 WATCHPOINT_ASSERT(ptr
->hosts
[server_key
].cursor_active
== 1);
225 /* Only called when we have a prefix key */
226 if (ptr
->prefix_key
[0] != 0)
228 if ((memcached_io_write(&ptr
->hosts
[server_key
], ptr
->prefix_key
, ptr
->prefix_key_length
, 0)) == -1)
230 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
231 rc
= MEMCACHED_SOME_ERRORS
;
236 if ((memcached_io_write(&ptr
->hosts
[server_key
], keys
[x
], key_length
[x
], 0)) == -1)
238 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
239 rc
= MEMCACHED_SOME_ERRORS
;
243 if ((memcached_io_write(&ptr
->hosts
[server_key
], " ", 1, 0)) == -1)
245 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
246 rc
= MEMCACHED_SOME_ERRORS
;
252 Should we muddle on if some servers are dead?
254 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
256 if (memcached_server_response_count(&ptr
->hosts
[x
]))
/* We need to do something about non-connected hosts in the future */
259 if ((memcached_io_write(&ptr
->hosts
[x
], "\r\n", 2, 1)) == -1)
261 rc
= MEMCACHED_SOME_ERRORS
;
266 LIBMEMCACHED_MEMCACHED_MGET_END();
270 memcached_return
memcached_mget_by_key(memcached_st
*ptr
,
271 const char *master_key
,
272 size_t master_key_length
,
273 const char * const *keys
,
274 const size_t *key_length
,
275 size_t number_of_keys
)
277 return memcached_mget_by_key_real(ptr
, master_key
, master_key_length
, keys
,
278 key_length
, number_of_keys
, true);
281 memcached_return
memcached_mget_execute(memcached_st
*ptr
,
282 const char * const *keys
,
283 const size_t *key_length
,
284 size_t number_of_keys
,
285 memcached_execute_function
*callback
,
287 unsigned int number_of_callbacks
)
289 return memcached_mget_execute_by_key(ptr
, NULL
, 0, keys
, key_length
,
290 number_of_keys
, callback
,
291 context
, number_of_callbacks
);
294 memcached_return
memcached_mget_execute_by_key(memcached_st
*ptr
,
295 const char *master_key
,
296 size_t master_key_length
,
297 const char * const *keys
,
298 const size_t *key_length
,
299 size_t number_of_keys
,
300 memcached_execute_function
*callback
,
302 unsigned int number_of_callbacks
)
304 if ((ptr
->flags
& MEM_BINARY_PROTOCOL
) == 0)
305 return MEMCACHED_NOT_SUPPORTED
;
308 memcached_callback_st
*original_callbacks
= ptr
->callbacks
;
309 memcached_callback_st cb
= {
312 .number_of_callback
= number_of_callbacks
316 rc
= memcached_mget_by_key(ptr
, master_key
, master_key_length
, keys
,
317 key_length
, number_of_keys
);
318 ptr
->callbacks
= original_callbacks
;
322 static memcached_return
simple_binary_mget(memcached_st
*ptr
,
323 unsigned int master_server_key
,
324 bool is_master_key_set
,
325 const char * const *keys
,
326 const size_t *key_length
,
327 size_t number_of_keys
, bool mget_mode
)
329 memcached_return rc
= MEMCACHED_NOTFOUND
;
332 int flush
= number_of_keys
== 1;
335 If a server fails we warn about errors and start all over with sending keys
338 for (x
= 0; x
< number_of_keys
; x
++)
340 unsigned int server_key
;
342 if (is_master_key_set
)
343 server_key
= master_server_key
;
345 server_key
= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
347 if (memcached_server_response_count(&ptr
->hosts
[server_key
]) == 0)
349 rc
= memcached_connect(&ptr
->hosts
[server_key
]);
350 if (rc
!= MEMCACHED_SUCCESS
)
354 protocol_binary_request_getk request
= {.bytes
= {0}};
355 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
357 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETKQ
;
359 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETK
;
362 vk
= memcached_validate_key_length(key_length
[x
],
363 ptr
->flags
& MEM_BINARY_PROTOCOL
);
364 unlikely (vk
!= MEMCACHED_SUCCESS
)
367 memcached_io_reset(&ptr
->hosts
[server_key
]);
371 request
.message
.header
.request
.keylen
= htons((uint16_t)key_length
[x
]);
372 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
373 request
.message
.header
.request
.bodylen
= htonl((uint32_t) key_length
[x
]);
375 if ((memcached_io_write(&ptr
->hosts
[server_key
], request
.bytes
,
376 sizeof(request
.bytes
), 0) == -1) ||
377 (memcached_io_write(&ptr
->hosts
[server_key
], keys
[x
],
378 key_length
[x
], (char) flush
) == -1))
380 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
381 rc
= MEMCACHED_SOME_ERRORS
;
385 /* We just want one pending response per server */
386 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
387 memcached_server_response_increment(&ptr
->hosts
[server_key
]);
388 if ((x
> 0 && x
== ptr
->io_key_prefetch
) &&
389 memcached_flush_buffers(ptr
) != MEMCACHED_SUCCESS
)
390 rc
= MEMCACHED_SOME_ERRORS
;
396 * Send a noop command to flush the buffers
398 protocol_binary_request_noop request
= {.bytes
= {0}};
399 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
400 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_NOOP
;
401 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
403 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
404 if (memcached_server_response_count(&ptr
->hosts
[x
]))
406 if (memcached_io_write(&ptr
->hosts
[x
], NULL
, 0, 1) == -1)
408 memcached_server_response_reset(&ptr
->hosts
[x
]);
409 memcached_io_reset(&ptr
->hosts
[x
]);
410 rc
= MEMCACHED_SOME_ERRORS
;
413 if (memcached_io_write(&ptr
->hosts
[x
], request
.bytes
,
414 sizeof(request
.bytes
), 1) == -1)
416 memcached_server_response_reset(&ptr
->hosts
[x
]);
417 memcached_io_reset(&ptr
->hosts
[x
]);
418 rc
= MEMCACHED_SOME_ERRORS
;
427 static memcached_return
replication_binary_mget(memcached_st
*ptr
,
430 const char *const *keys
,
431 const size_t *key_length
,
432 size_t number_of_keys
)
434 memcached_return rc
= MEMCACHED_NOTFOUND
;
437 for (uint32_t replica
= 0; replica
<= ptr
->number_of_replicas
; ++replica
)
441 for (x
= 0; x
< number_of_keys
; ++x
)
443 if (hash
[x
] == ptr
->number_of_hosts
)
444 continue; /* Already successfully sent */
446 uint32_t server
= hash
[x
] + replica
;
447 while (server
>= ptr
->number_of_hosts
)
448 server
-= ptr
->number_of_hosts
;
450 if (dead_servers
[server
])
453 if (memcached_server_response_count(&ptr
->hosts
[server
]) == 0)
455 rc
= memcached_connect(&ptr
->hosts
[server
]);
456 if (rc
!= MEMCACHED_SUCCESS
)
458 memcached_io_reset(&ptr
->hosts
[server
]);
459 dead_servers
[server
]= true;
465 protocol_binary_request_getk request
= {
466 .message
.header
.request
= {
467 .magic
= PROTOCOL_BINARY_REQ
,
468 .opcode
= PROTOCOL_BINARY_CMD_GETK
,
469 .keylen
= htons((uint16_t)key_length
[x
]),
470 .datatype
= PROTOCOL_BINARY_RAW_BYTES
,
471 .bodylen
= htonl((uint32_t)key_length
[x
])
/*
 * We need to disable buffering to actually know that the request was
 * successfully sent to the server (so that we should expect a result
 * back). It would be nice to do this in buffered mode, but then it
 * would be complex to handle all error situations if we got to send
 * some of the messages, and then we failed on writing out some others
 * and we used the callback interface from memcached_mget_execute so
 * that we might have processed some of the responses etc. For now,
 * just make sure we work _correctly_
 */
485 if ((memcached_io_write(&ptr
->hosts
[server
], request
.bytes
,
486 sizeof(request
.bytes
), 0) == -1) ||
487 (memcached_io_write(&ptr
->hosts
[server
], keys
[x
],
488 key_length
[x
], 1) == -1))
490 memcached_io_reset(&ptr
->hosts
[server
]);
491 dead_servers
[server
]= true;
496 memcached_server_response_increment(&ptr
->hosts
[server
]);
497 hash
[x
]= ptr
->number_of_hosts
;
507 static memcached_return
binary_mget_by_key(memcached_st
*ptr
,
508 unsigned int master_server_key
,
509 bool is_master_key_set
,
510 const char * const *keys
,
511 const size_t *key_length
,
512 size_t number_of_keys
,
517 if (ptr
->number_of_replicas
== 0)
519 rc
= simple_binary_mget(ptr
, master_server_key
, is_master_key_set
,
520 keys
, key_length
, number_of_keys
, mget_mode
);
527 hash
= ptr
->call_malloc(ptr
, sizeof(uint32_t) * number_of_keys
);
528 dead_servers
= ptr
->call_calloc(ptr
, ptr
->number_of_hosts
, sizeof(bool));
530 if (hash
== NULL
|| dead_servers
== NULL
)
532 ptr
->call_free(ptr
, hash
);
533 ptr
->call_free(ptr
, dead_servers
);
534 return MEMCACHED_MEMORY_ALLOCATION_FAILURE
;
537 if (is_master_key_set
)
538 for (unsigned int x
= 0; x
< number_of_keys
; x
++)
539 hash
[x
]= master_server_key
;
541 for (unsigned int x
= 0; x
< number_of_keys
; x
++)
542 hash
[x
]= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
544 rc
= replication_binary_mget(ptr
, hash
, dead_servers
, keys
,
545 key_length
, number_of_keys
);
547 ptr
->call_free(ptr
, hash
);
548 ptr
->call_free(ptr
, dead_servers
);
550 return MEMCACHED_SUCCESS
;