2 #include "memcached_io.h"
5 What happens if no servers exist?
7 char *memcached_get(memcached_st
*ptr
, const char *key
,
11 memcached_return
*error
)
13 return memcached_get_by_key(ptr
, NULL
, 0, key
, key_length
, value_length
,
17 static memcached_return
memcached_mget_by_key_real(memcached_st
*ptr
,
18 const char *master_key
,
19 size_t master_key_length
,
20 const char * const *keys
,
21 const size_t *key_length
,
22 size_t number_of_keys
,
25 char *memcached_get_by_key(memcached_st
*ptr
,
26 const char *master_key
,
27 size_t master_key_length
,
28 const char *key
, size_t key_length
,
31 memcached_return
*error
)
36 memcached_return dummy_error
;
38 unlikely (ptr
->flags
& MEM_USE_UDP
)
40 *error
= MEMCACHED_NOT_SUPPORTED
;
45 *error
= memcached_mget_by_key_real(ptr
, master_key
, master_key_length
,
46 (const char * const *)&key
,
47 &key_length
, 1, false);
49 value
= memcached_fetch(ptr
, NULL
, NULL
,
50 value_length
, flags
, error
);
51 /* This is for historical reasons */
52 if (*error
== MEMCACHED_END
)
53 *error
= MEMCACHED_NOTFOUND
;
57 if (ptr
->get_key_failure
&& *error
== MEMCACHED_NOTFOUND
)
61 memcached_result_reset(&ptr
->result
);
62 rc
= ptr
->get_key_failure(ptr
, key
, key_length
, &ptr
->result
);
64 /* On all failure drop to returning NULL */
65 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_BUFFERED
)
67 if (rc
== MEMCACHED_BUFFERED
)
69 uint64_t latch
; /* We use latch to track the state of the original socket */
70 latch
= memcached_behavior_get(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
);
72 memcached_behavior_set(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
, 1);
74 rc
= memcached_set(ptr
, key
, key_length
,
75 memcached_result_value(&ptr
->result
),
76 memcached_result_length(&ptr
->result
),
77 0, memcached_result_flags(&ptr
->result
));
79 if (rc
== MEMCACHED_BUFFERED
&& latch
== 0)
80 memcached_behavior_set(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
, 0);
84 rc
= memcached_set(ptr
, key
, key_length
,
85 memcached_result_value(&ptr
->result
),
86 memcached_result_length(&ptr
->result
),
87 0, memcached_result_flags(&ptr
->result
));
90 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_BUFFERED
)
93 *value_length
= memcached_result_length(&ptr
->result
);
94 *flags
= memcached_result_flags(&ptr
->result
);
95 return memcached_string_c_copy(&ptr
->result
.value
);
103 (void)memcached_fetch(ptr
, NULL
, NULL
,
104 &dummy_length
, &dummy_flags
,
106 WATCHPOINT_ASSERT(dummy_length
== 0);
/*
  Multi-key get without a master key.

  Forwards ptr, keys, key_length and number_of_keys to
  memcached_mget_by_key(), passing NULL and 0 for the master key and its
  length, and returns that call's memcached_return unchanged.
*/
111 memcached_return
memcached_mget(memcached_st
*ptr
,
112 const char * const *keys
,
113 const size_t *key_length
,
114 size_t number_of_keys
)
116 return memcached_mget_by_key(ptr
, NULL
, 0, keys
, key_length
, number_of_keys
);
119 static memcached_return
binary_mget_by_key(memcached_st
*ptr
,
120 unsigned int master_server_key
,
121 bool is_master_key_set
,
122 const char * const *keys
,
123 const size_t *key_length
,
124 size_t number_of_keys
,
127 static memcached_return
memcached_mget_by_key_real(memcached_st
*ptr
,
128 const char *master_key
,
129 size_t master_key_length
,
130 const char * const *keys
,
131 const size_t *key_length
,
132 size_t number_of_keys
,
136 memcached_return rc
= MEMCACHED_NOTFOUND
;
137 const char *get_command
= "get ";
138 uint8_t get_command_length
= 4;
139 unsigned int master_server_key
= (unsigned int)-1; /* 0 is a valid server id! */
140 bool is_master_key_set
= false;
142 unlikely (ptr
->flags
& MEM_USE_UDP
)
143 return MEMCACHED_NOT_SUPPORTED
;
145 LIBMEMCACHED_MEMCACHED_MGET_START();
146 ptr
->cursor_server
= 0;
148 if (number_of_keys
== 0)
149 return MEMCACHED_NOTFOUND
;
151 if (ptr
->number_of_hosts
== 0)
152 return MEMCACHED_NO_SERVERS
;
154 if ((ptr
->flags
& MEM_VERIFY_KEY
) && (memcached_key_test(keys
, key_length
, number_of_keys
) == MEMCACHED_BAD_KEY_PROVIDED
))
155 return MEMCACHED_BAD_KEY_PROVIDED
;
157 if (master_key
&& master_key_length
)
159 if ((ptr
->flags
& MEM_VERIFY_KEY
) && (memcached_key_test((const char * const *)&master_key
, &master_key_length
, 1) == MEMCACHED_BAD_KEY_PROVIDED
))
160 return MEMCACHED_BAD_KEY_PROVIDED
;
161 master_server_key
= memcached_generate_hash(ptr
, master_key
, master_key_length
);
162 is_master_key_set
= true;
166 Here is where we pay for the non-block API. We need to remove any data sitting
167 in the queue before we start our get.
169 It might be optimal to bounce the connection if count > some number.
171 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
173 if (memcached_server_response_count(&ptr
->hosts
[x
]))
175 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
177 if (ptr
->flags
& MEM_NO_BLOCK
)
178 (void)memcached_io_write(&ptr
->hosts
[x
], NULL
, 0, 1);
180 while(memcached_server_response_count(&ptr
->hosts
[x
]))
181 (void)memcached_response(&ptr
->hosts
[x
], buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, &ptr
->result
);
185 if (ptr
->flags
& MEM_BINARY_PROTOCOL
)
186 return binary_mget_by_key(ptr
, master_server_key
, is_master_key_set
, keys
,
187 key_length
, number_of_keys
, mget_mode
);
189 if (ptr
->flags
& MEM_SUPPORT_CAS
)
191 get_command
= "gets ";
192 get_command_length
= 5;
196 If a server fails we warn about errors and start all over with sending keys
199 for (x
= 0; x
< number_of_keys
; x
++)
201 unsigned int server_key
;
203 if (is_master_key_set
)
204 server_key
= master_server_key
;
206 server_key
= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
208 if (memcached_server_response_count(&ptr
->hosts
[server_key
]) == 0)
210 rc
= memcached_connect(&ptr
->hosts
[server_key
]);
212 if (rc
!= MEMCACHED_SUCCESS
)
215 if ((memcached_io_write(&ptr
->hosts
[server_key
], get_command
, get_command_length
, 0)) == -1)
217 rc
= MEMCACHED_SOME_ERRORS
;
220 WATCHPOINT_ASSERT(ptr
->hosts
[server_key
].cursor_active
== 0);
221 memcached_server_response_increment(&ptr
->hosts
[server_key
]);
222 WATCHPOINT_ASSERT(ptr
->hosts
[server_key
].cursor_active
== 1);
225 /* Only called when we have a prefix key */
226 if (ptr
->prefix_key
[0] != 0)
228 if ((memcached_io_write(&ptr
->hosts
[server_key
], ptr
->prefix_key
, ptr
->prefix_key_length
, 0)) == -1)
230 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
231 rc
= MEMCACHED_SOME_ERRORS
;
236 if ((memcached_io_write(&ptr
->hosts
[server_key
], keys
[x
], key_length
[x
], 0)) == -1)
238 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
239 rc
= MEMCACHED_SOME_ERRORS
;
243 if ((memcached_io_write(&ptr
->hosts
[server_key
], " ", 1, 0)) == -1)
245 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
246 rc
= MEMCACHED_SOME_ERRORS
;
252 Should we muddle on if some servers are dead?
254 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
256 if (memcached_server_response_count(&ptr
->hosts
[x
]))
258 /* We need to do something about non-connected hosts in the future */
259 if ((memcached_io_write(&ptr
->hosts
[x
], "\r\n", 2, 1)) == -1)
261 rc
= MEMCACHED_SOME_ERRORS
;
266 LIBMEMCACHED_MEMCACHED_MGET_END();
/*
  Multi-key get with an optional master key.

  Passes every argument straight through to memcached_mget_by_key_real()
  and appends a final argument of true, then returns that call's result.
  NOTE(review): the trailing true is presumably the mget_mode flag that the
  binary-protocol helpers receive -- confirm against the full declaration.
*/
270 memcached_return
memcached_mget_by_key(memcached_st
*ptr
,
271 const char *master_key
,
272 size_t master_key_length
,
273 const char * const *keys
,
274 const size_t *key_length
,
275 size_t number_of_keys
)
277 return memcached_mget_by_key_real(ptr
, master_key
, master_key_length
, keys
,
278 key_length
, number_of_keys
, true);
281 memcached_return
memcached_mget_execute(memcached_st
*ptr
,
282 const char *master_key
,
283 size_t master_key_length
,
284 const char * const *keys
,
285 const size_t *key_length
,
286 size_t number_of_keys
,
287 memcached_execute_function
*callback
,
289 unsigned int number_of_callbacks
)
291 if ((ptr
->flags
& MEM_BINARY_PROTOCOL
) == 0)
292 return MEMCACHED_NOT_SUPPORTED
;
295 memcached_callback_st
*original_callbacks
= ptr
->callbacks
;
296 memcached_callback_st cb
= {
299 .number_of_callback
= number_of_callbacks
303 rc
= memcached_mget_by_key(ptr
, master_key
, master_key_length
, keys
,
304 key_length
, number_of_keys
);
305 ptr
->callbacks
= original_callbacks
;
309 static memcached_return
simple_binary_mget(memcached_st
*ptr
,
310 unsigned int master_server_key
,
311 bool is_master_key_set
,
312 const char * const *keys
,
313 const size_t *key_length
,
314 size_t number_of_keys
, bool mget_mode
)
316 memcached_return rc
= MEMCACHED_NOTFOUND
;
319 int flush
= number_of_keys
== 1;
322 If a server fails we warn about errors and start all over with sending keys
325 for (x
= 0; x
< number_of_keys
; x
++)
327 unsigned int server_key
;
329 if (is_master_key_set
)
330 server_key
= master_server_key
;
332 server_key
= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
334 if (memcached_server_response_count(&ptr
->hosts
[server_key
]) == 0)
336 rc
= memcached_connect(&ptr
->hosts
[server_key
]);
337 if (rc
!= MEMCACHED_SUCCESS
)
341 protocol_binary_request_getk request
= {.bytes
= {0}};
342 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
344 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETKQ
;
346 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETK
;
349 vk
= memcached_validate_key_length(key_length
[x
],
350 ptr
->flags
& MEM_BINARY_PROTOCOL
);
351 unlikely (vk
!= MEMCACHED_SUCCESS
)
354 memcached_io_reset(&ptr
->hosts
[server_key
]);
358 request
.message
.header
.request
.keylen
= htons((uint16_t)key_length
[x
]);
359 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
360 request
.message
.header
.request
.bodylen
= htonl((uint32_t) key_length
[x
]);
362 if ((memcached_io_write(&ptr
->hosts
[server_key
], request
.bytes
,
363 sizeof(request
.bytes
), 0) == -1) ||
364 (memcached_io_write(&ptr
->hosts
[server_key
], keys
[x
],
365 key_length
[x
], (char) flush
) == -1))
367 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
368 rc
= MEMCACHED_SOME_ERRORS
;
372 /* We just want one pending response per server */
373 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
374 memcached_server_response_increment(&ptr
->hosts
[server_key
]);
375 if ((x
> 0 && x
== ptr
->io_key_prefetch
) &&
376 memcached_flush_buffers(ptr
) != MEMCACHED_SUCCESS
)
377 rc
= MEMCACHED_SOME_ERRORS
;
383 * Send a noop command to flush the buffers
385 protocol_binary_request_noop request
= {.bytes
= {0}};
386 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
387 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_NOOP
;
388 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
390 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
391 if (memcached_server_response_count(&ptr
->hosts
[x
]))
393 if (memcached_io_write(&ptr
->hosts
[x
], NULL
, 0, 1) == -1)
395 memcached_server_response_reset(&ptr
->hosts
[x
]);
396 memcached_io_reset(&ptr
->hosts
[x
]);
397 rc
= MEMCACHED_SOME_ERRORS
;
400 if (memcached_io_write(&ptr
->hosts
[x
], request
.bytes
,
401 sizeof(request
.bytes
), 1) == -1)
403 memcached_server_response_reset(&ptr
->hosts
[x
]);
404 memcached_io_reset(&ptr
->hosts
[x
]);
405 rc
= MEMCACHED_SOME_ERRORS
;
414 static memcached_return
replication_binary_mget(memcached_st
*ptr
,
417 const char *const *keys
,
418 const size_t *key_length
,
419 size_t number_of_keys
,
422 memcached_return rc
= MEMCACHED_NOTFOUND
;
425 int flush
= number_of_keys
== 1;
427 for (uint32_t replica
= 0; replica
<= ptr
->number_of_replicas
; ++replica
)
431 for (x
= 0; x
< number_of_keys
; ++x
)
433 if (hash
[x
] == ptr
->number_of_hosts
)
434 continue; /* Already successfully sent */
436 uint32_t server
= hash
[x
] + replica
;
437 while (server
>= ptr
->number_of_hosts
)
438 server
-= ptr
->number_of_hosts
;
440 if (dead_servers
[server
])
443 if (memcached_server_response_count(&ptr
->hosts
[server
]) == 0)
445 rc
= memcached_connect(&ptr
->hosts
[server
]);
446 if (rc
!= MEMCACHED_SUCCESS
)
448 memcached_io_reset(&ptr
->hosts
[server
]);
449 dead_servers
[server
]= true;
455 protocol_binary_request_getk request
= {.bytes
= {0}};
456 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
458 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETKQ
;
460 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETK
;
462 request
.message
.header
.request
.keylen
= htons((uint16_t)key_length
[x
]);
463 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
464 request
.message
.header
.request
.bodylen
= htonl((uint32_t) key_length
[x
]);
466 if ((memcached_io_write(&ptr
->hosts
[server
], request
.bytes
,
467 sizeof(request
.bytes
), 0) == -1) ||
468 (memcached_io_write(&ptr
->hosts
[server
], keys
[x
],
469 key_length
[x
], (char) flush
) == -1))
471 memcached_io_reset(&ptr
->hosts
[server
]);
472 dead_servers
[server
]= true;
476 /* we just want one pending response per server */
477 memcached_server_response_reset(&ptr
->hosts
[server
]);
478 memcached_server_response_increment(&ptr
->hosts
[server
]);
484 * Send a noop command to flush the buffers
486 protocol_binary_request_noop request
= {.bytes
= {0}};
487 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
488 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_NOOP
;
489 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
491 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
492 if (memcached_server_response_count(&ptr
->hosts
[x
]))
494 if (memcached_io_write(&ptr
->hosts
[x
], request
.bytes
,
495 sizeof(request
.bytes
), 1) == -1)
497 memcached_io_reset(&ptr
->hosts
[x
]);
498 dead_servers
[x
]= true;
502 /* mark all of the messages bound for this server as sent! */
503 for (x
= 0; x
< number_of_keys
; ++x
)
505 hash
[x
]= ptr
->number_of_hosts
;
516 static memcached_return
binary_mget_by_key(memcached_st
*ptr
,
517 unsigned int master_server_key
,
518 bool is_master_key_set
,
519 const char * const *keys
,
520 const size_t *key_length
,
521 size_t number_of_keys
,
526 if (ptr
->number_of_replicas
== 0)
528 rc
= simple_binary_mget(ptr
, master_server_key
, is_master_key_set
,
529 keys
, key_length
, number_of_keys
, mget_mode
);
536 hash
= ptr
->call_malloc(ptr
, sizeof(uint32_t) * number_of_keys
);
537 dead_servers
= ptr
->call_calloc(ptr
, ptr
->number_of_hosts
, sizeof(bool));
539 if (hash
== NULL
|| dead_servers
== NULL
)
541 ptr
->call_free(ptr
, hash
);
542 ptr
->call_free(ptr
, dead_servers
);
543 return MEMCACHED_MEMORY_ALLOCATION_FAILURE
;
546 if (is_master_key_set
)
547 for (unsigned int x
= 0; x
< number_of_keys
; x
++)
548 hash
[x
]= master_server_key
;
550 for (unsigned int x
= 0; x
< number_of_keys
; x
++)
551 hash
[x
]= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
553 rc
= replication_binary_mget(ptr
, hash
, dead_servers
, keys
,
554 key_length
, number_of_keys
, mget_mode
);
556 ptr
->call_free(ptr
, hash
);
557 ptr
->call_free(ptr
, dead_servers
);
559 return MEMCACHED_SUCCESS
;