2 #include "memcached_io.h"
5 What happens if no servers exist?
/*
 * memcached_get: fetch the value stored under a single key.
 *
 * The visible body forwards to memcached_get_by_key(), passing NULL and 0
 * in the master-key positions followed by key, key_length and value_length.
 *
 * NOTE(review): this listing is missing lines of the definition (some
 * parameters and the tail of the call are not shown) -- confirm against the
 * complete file before relying on this description.
 */
7 char *memcached_get(memcached_st
*ptr
, const char *key
,
11 memcached_return
*error
)
13 return memcached_get_by_key(ptr
, NULL
, 0, key
, key_length
, value_length
,
/*
 * Declaration fragment for the file-local helper memcached_mget_by_key_real().
 * Visible parameters: the memcached_st handle, a master key with its length,
 * and number_of_keys.
 *
 * NOTE(review): presumably a forward declaration so memcached_get_by_key()
 * can call the helper before its definition; the remaining parameters and
 * the terminator are not shown in this listing -- confirm.
 */
17 static memcached_return
memcached_mget_by_key_real(memcached_st
*ptr
,
18 const char *master_key
,
19 size_t master_key_length
,
22 size_t number_of_keys
,
/*
 * memcached_get_by_key: fetch one value, with an optional master key.
 *
 * Behavior visible in this listing:
 *  - When MEM_USE_UDP is set in ptr->flags, *error receives
 *    MEMCACHED_NOT_SUPPORTED.
 *  - The request is issued through memcached_mget_by_key_real() for exactly
 *    one key (count 1, final argument false) and the value is read back with
 *    memcached_fetch().
 *  - MEMCACHED_END from the fetch is reported as MEMCACHED_NOTFOUND.
 *  - On MEMCACHED_NOTFOUND, if ptr->get_key_failure is set, that callback is
 *    invoked with ptr->result; when it returns MEMCACHED_SUCCESS or
 *    MEMCACHED_BUFFERED the result is stored with memcached_set() and the
 *    function returns memcached_string_c_copy(&ptr->result.value) after
 *    filling *value_length and *flags.
 *
 * NOTE(review): braces, else branches, several declarations and some
 * parameters are missing from this listing; the exact nesting of the
 * statements below must be confirmed against the complete file.
 */
25 char *memcached_get_by_key(memcached_st
*ptr
,
26 const char *master_key
,
27 size_t master_key_length
,
28 const char *key
, size_t key_length
,
31 memcached_return
*error
)
36 memcached_return dummy_error
;
/* UDP transport: report MEMCACHED_NOT_SUPPORTED through *error. */
38 unlikely (ptr
->flags
& MEM_USE_UDP
)
40 *error
= MEMCACHED_NOT_SUPPORTED
;
/* Issue the request as a one-key multi-get; the last argument is false. */
45 *error
= memcached_mget_by_key_real(ptr
,
48 (const char **)&key
, &key_length
, 1, false);
50 value
= memcached_fetch(ptr
, NULL
, NULL
,
51 value_length
, flags
, error
);
52 /* This is for historical reasons */
53 if (*error
== MEMCACHED_END
)
54 *error
= MEMCACHED_NOTFOUND
;
/* Miss handling: give the user-supplied get_key_failure callback a chance
   to produce the value into ptr->result. */
58 if (ptr
->get_key_failure
&& *error
== MEMCACHED_NOTFOUND
)
62 memcached_result_reset(&ptr
->result
);
63 rc
= ptr
->get_key_failure(ptr
, key
, key_length
, &ptr
->result
);
65 /* On all failure drop to returning NULL */
66 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_BUFFERED
)
/* Callback returned MEMCACHED_BUFFERED: remember the current
   BUFFER_REQUESTS setting in latch, turn buffering on for the set, and turn
   it back off afterwards only if it was off before (latch == 0). */
68 if (rc
== MEMCACHED_BUFFERED
)
70 uint64_t latch
; /* We use latch to track the state of the original socket */
71 latch
= memcached_behavior_get(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
);
73 memcached_behavior_set(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
, 1);
75 rc
= memcached_set(ptr
, key
, key_length
,
76 memcached_result_value(&ptr
->result
),
77 memcached_result_length(&ptr
->result
),
78 0, memcached_result_flags(&ptr
->result
));
80 if (rc
== MEMCACHED_BUFFERED
&& latch
== 0)
81 memcached_behavior_set(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
, 0);
/* Second memcached_set() call with the same arguments; presumably the
   non-buffered (else) branch -- the branch keyword is not shown here. */
85 rc
= memcached_set(ptr
, key
, key_length
,
86 memcached_result_value(&ptr
->result
),
87 memcached_result_length(&ptr
->result
),
88 0, memcached_result_flags(&ptr
->result
));
/* Store succeeded (or was buffered): publish length and flags from
   ptr->result and return a copy of its value string. */
91 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_BUFFERED
)
94 *value_length
= memcached_result_length(&ptr
->result
);
95 *flags
= memcached_result_flags(&ptr
->result
);
96 return memcached_string_c_copy(&ptr
->result
.value
);
/* Extra fetch whose return value is discarded and whose length/flags go to
   dummy variables; the assertion expects dummy_length to be 0 afterwards. */
104 (void)memcached_fetch(ptr
, NULL
, NULL
,
105 &dummy_length
, &dummy_flags
,
107 WATCHPOINT_ASSERT(dummy_length
== 0);
/*
 * memcached_mget: request several keys at once.
 *
 * Forwards keys, key_length and number_of_keys to memcached_mget_by_key()
 * with NULL and 0 in the master-key positions, and returns its result.
 */
112 memcached_return
memcached_mget(memcached_st
*ptr
,
113 const char **keys
, size_t *key_length
,
114 size_t number_of_keys
)
116 return memcached_mget_by_key(ptr
, NULL
, 0, keys
, key_length
, number_of_keys
);
/*
 * Declaration fragment for the file-local helper binary_mget_by_key().
 * Visible parameters: the handle, master_server_key, is_master_key_set,
 * the keys array with its parallel key_length array, and number_of_keys.
 *
 * NOTE(review): presumably a forward declaration; the rest of the
 * parameter list and the terminator are not shown in this listing.
 */
119 static memcached_return
binary_mget_by_key(memcached_st
*ptr
,
120 unsigned int master_server_key
,
121 bool is_master_key_set
,
122 const char **keys
, size_t *key_length
,
123 size_t number_of_keys
,
/*
 * memcached_mget_by_key_real: send a multi-key get to the servers.
 *
 * Behavior visible in this listing:
 *  - Returns MEMCACHED_NOT_SUPPORTED when MEM_USE_UDP is set,
 *    MEMCACHED_NOTFOUND when number_of_keys is 0, MEMCACHED_NO_SERVERS when
 *    ptr->number_of_hosts is 0, and MEMCACHED_BAD_KEY_PROVIDED when
 *    MEM_VERIFY_KEY is set and memcached_key_test() rejects the keys or the
 *    master key.
 *  - A non-empty master key is hashed once and that server is used for
 *    every key.
 *  - Responses still pending on any host are read and discarded first.
 *  - With MEM_BINARY_PROTOCOL set the work is delegated to
 *    binary_mget_by_key().
 *  - Otherwise the command prefix is "get " (or "gets " with
 *    MEM_SUPPORT_CAS); each key is written to its server followed by a
 *    space, and finally "\r\n" is written to every host that has a pending
 *    response.
 *  - Write failures set rc to MEMCACHED_SOME_ERRORS.
 *
 * NOTE(review): braces, else/continue statements, the final return and part
 * of the parameter list are missing from this listing -- confirm control
 * flow against the complete file.
 */
126 static memcached_return
memcached_mget_by_key_real(memcached_st
*ptr
,
127 const char *master_key
,
128 size_t master_key_length
,
131 size_t number_of_keys
,
135 memcached_return rc
= MEMCACHED_NOTFOUND
;
136 const char *get_command
= "get ";
137 uint8_t get_command_length
= 4;
138 unsigned int master_server_key
= (unsigned int)-1; /* 0 is a valid server id! */
139 bool is_master_key_set
= false;
141 unlikely (ptr
->flags
& MEM_USE_UDP
)
142 return MEMCACHED_NOT_SUPPORTED
;
144 LIBMEMCACHED_MEMCACHED_MGET_START();
145 ptr
->cursor_server
= 0;
147 if (number_of_keys
== 0)
148 return MEMCACHED_NOTFOUND
;
150 if (ptr
->number_of_hosts
== 0)
151 return MEMCACHED_NO_SERVERS
;
153 if ((ptr
->flags
& MEM_VERIFY_KEY
) && (memcached_key_test(keys
, key_length
, number_of_keys
) == MEMCACHED_BAD_KEY_PROVIDED
))
154 return MEMCACHED_BAD_KEY_PROVIDED
;
/* A master key, when given, is validated (if MEM_VERIFY_KEY) and hashed to
   pick the single server used for all keys. */
156 if (master_key
&& master_key_length
)
158 if ((ptr
->flags
& MEM_VERIFY_KEY
) && (memcached_key_test((const char **)&master_key
, &master_key_length
, 1) == MEMCACHED_BAD_KEY_PROVIDED
))
159 return MEMCACHED_BAD_KEY_PROVIDED
;
160 master_server_key
= memcached_generate_hash(ptr
, master_key
, master_key_length
);
161 is_master_key_set
= true;
165 Here is where we pay for the non-block API. We need to remove any data sitting
166 in the queue before we start our get.
168 It might be optimum to bounce the connection if count > some number.
170 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
172 if (memcached_server_response_count(&ptr
->hosts
[x
]))
174 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
176 if (ptr
->flags
& MEM_NO_BLOCK
)
177 (void)memcached_io_write(&ptr
->hosts
[x
], NULL
, 0, 1);
179 while(memcached_server_response_count(&ptr
->hosts
[x
]))
180 (void)memcached_response(&ptr
->hosts
[x
], buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, &ptr
->result
);
/* Binary protocol is handled entirely by binary_mget_by_key(). */
184 if (ptr
->flags
& MEM_BINARY_PROTOCOL
)
185 return binary_mget_by_key(ptr
, master_server_key
, is_master_key_set
, keys
,
186 key_length
, number_of_keys
, mget_mode
);
188 if (ptr
->flags
& MEM_SUPPORT_CAS
)
190 get_command
= "gets ";
191 get_command_length
= 5;
195 If a server fails we warn about errors and start all over with sending keys
198 for (x
= 0; x
< number_of_keys
; x
++)
200 unsigned int server_key
;
202 if (is_master_key_set
)
203 server_key
= master_server_key
;
205 server_key
= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
/* First key for this server (no response pending yet): connect and write
   the command prefix before any key. */
207 if (memcached_server_response_count(&ptr
->hosts
[server_key
]) == 0)
209 rc
= memcached_connect(&ptr
->hosts
[server_key
]);
211 if (rc
!= MEMCACHED_SUCCESS
)
214 if ((memcached_io_write(&ptr
->hosts
[server_key
], get_command
, get_command_length
, 0)) == -1)
216 rc
= MEMCACHED_SOME_ERRORS
;
219 WATCHPOINT_ASSERT(ptr
->hosts
[server_key
].cursor_active
== 0);
220 memcached_server_response_increment(&ptr
->hosts
[server_key
]);
221 WATCHPOINT_ASSERT(ptr
->hosts
[server_key
].cursor_active
== 1);
224 /* Only called when we have a prefix key */
225 if (ptr
->prefix_key
[0] != 0)
227 if ((memcached_io_write(&ptr
->hosts
[server_key
], ptr
->prefix_key
, ptr
->prefix_key_length
, 0)) == -1)
229 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
230 rc
= MEMCACHED_SOME_ERRORS
;
235 if ((memcached_io_write(&ptr
->hosts
[server_key
], keys
[x
], key_length
[x
], 0)) == -1)
237 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
238 rc
= MEMCACHED_SOME_ERRORS
;
242 if ((memcached_io_write(&ptr
->hosts
[server_key
], " ", 1, 0)) == -1)
244 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
245 rc
= MEMCACHED_SOME_ERRORS
;
251 Should we muddle on if some servers are dead?
/* Terminate the command line on every host that was sent at least one key;
   the last argument to memcached_io_write() is 1 here, versus 0 for the
   earlier writes. */
253 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
255 if (memcached_server_response_count(&ptr
->hosts
[x
]))
257 /* We need to do something about non-connected hosts in the future */
258 if ((memcached_io_write(&ptr
->hosts
[x
], "\r\n", 2, 1)) == -1)
260 rc
= MEMCACHED_SOME_ERRORS
;
265 LIBMEMCACHED_MEMCACHED_MGET_END();
/*
 * memcached_mget_by_key: request several keys, with an optional master key.
 *
 * Forwards its arguments to memcached_mget_by_key_real() with true as the
 * final argument and returns that call's result.
 *
 * NOTE(review): the keys / key_length parameter lines are missing from this
 * listing, although both names are used in the call below.
 */
269 memcached_return
memcached_mget_by_key(memcached_st
*ptr
,
270 const char *master_key
,
271 size_t master_key_length
,
274 size_t number_of_keys
)
276 return memcached_mget_by_key_real(ptr
, master_key
, master_key_length
, keys
,
277 key_length
, number_of_keys
, true);
/*
 * simple_binary_mget: binary-protocol multi-get without replication.
 *
 * Behavior visible in this listing:
 *  - rc starts as MEMCACHED_NOTFOUND; flush is 1 only when a single key is
 *    requested.
 *  - Each key goes to master_server_key when is_master_key_set, otherwise to
 *    the server chosen by memcached_generate_hash().
 *  - A server with no pending response is connected first.
 *  - A getk request header (opcode GETKQ or GETK) is built per key; keylen
 *    and bodylen are both set from key_length[x] in network byte order.
 *  - Header and key are written; on write failure the server's response
 *    count is reset and rc becomes MEMCACHED_SOME_ERRORS.
 *  - Afterwards a NOOP request is written to every host with a pending
 *    response.
 *
 * NOTE(review): braces, else/continue statements, several declarations and
 * the final return are missing from this listing. Which condition selects
 * GETKQ versus GETK is not shown (presumably mget_mode) -- confirm.
 */
280 static memcached_return
simple_binary_mget(memcached_st
*ptr
,
281 unsigned int master_server_key
,
282 bool is_master_key_set
,
283 const char **keys
, size_t *key_length
,
284 size_t number_of_keys
, bool mget_mode
)
286 memcached_return rc
= MEMCACHED_NOTFOUND
;
289 int flush
= number_of_keys
== 1;
292 If a server fails we warn about errors and start all over with sending keys
295 for (x
= 0; x
< number_of_keys
; x
++)
297 unsigned int server_key
;
299 if (is_master_key_set
)
300 server_key
= master_server_key
;
302 server_key
= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
304 if (memcached_server_response_count(&ptr
->hosts
[server_key
]) == 0)
306 rc
= memcached_connect(&ptr
->hosts
[server_key
]);
307 if (rc
!= MEMCACHED_SUCCESS
)
311 protocol_binary_request_getk request
= {.bytes
= {0}};
312 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
314 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETKQ
;
316 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETK
;
/* Key length is validated before it is narrowed to 16 bits below; on
   failure the server's I/O state is reset. */
319 vk
= memcached_validate_key_length(key_length
[x
],
320 ptr
->flags
& MEM_BINARY_PROTOCOL
);
321 unlikely (vk
!= MEMCACHED_SUCCESS
)
324 memcached_io_reset(&ptr
->hosts
[server_key
]);
328 request
.message
.header
.request
.keylen
= htons((uint16_t)key_length
[x
]);
329 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
330 request
.message
.header
.request
.bodylen
= htonl((uint32_t) key_length
[x
]);
/* Header is written with the last argument 0; the key write passes flush,
   which is 1 only for a single-key request. */
332 if ((memcached_io_write(&ptr
->hosts
[server_key
], request
.bytes
,
333 sizeof(request
.bytes
), 0) == -1) ||
334 (memcached_io_write(&ptr
->hosts
[server_key
], keys
[x
],
335 key_length
[x
], (char) flush
) == -1))
337 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
338 rc
= MEMCACHED_SOME_ERRORS
;
342 /* We just want one pending response per server */
343 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
344 memcached_server_response_increment(&ptr
->hosts
[server_key
]);
/* NOTE(review): buffers are flushed only when x equals
   ptr->io_key_prefetch exactly (not at every multiple) -- confirm that a
   single mid-stream flush is what is intended. */
345 if ((x
> 0 && x
== ptr
->io_key_prefetch
) &&
346 memcached_flush_buffers(ptr
) != MEMCACHED_SUCCESS
)
347 rc
= MEMCACHED_SOME_ERRORS
;
353 * Send a noop command to flush the buffers
355 protocol_binary_request_noop request
= {.bytes
= {0}};
356 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
357 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_NOOP
;
358 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
360 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
361 if (memcached_server_response_count(&ptr
->hosts
[x
]))
363 if (memcached_io_write(&ptr
->hosts
[x
], NULL
, 0, 1) == -1)
365 memcached_server_response_reset(&ptr
->hosts
[x
]);
366 memcached_io_reset(&ptr
->hosts
[x
]);
367 rc
= MEMCACHED_SOME_ERRORS
;
370 if (memcached_io_write(&ptr
->hosts
[x
], request
.bytes
,
371 sizeof(request
.bytes
), 1) == -1)
373 memcached_server_response_reset(&ptr
->hosts
[x
]);
374 memcached_io_reset(&ptr
->hosts
[x
]);
375 rc
= MEMCACHED_SOME_ERRORS
;
/*
 * replication_binary_mget: binary-protocol multi-get that can fall back to
 * replica servers.
 *
 * Parameters as used by the visible code:
 *  - hash[x] holds the server index for key x; the value
 *    ptr->number_of_hosts marks a key as already sent.
 *  - dead_servers[i] is set to true when connecting or writing to host i
 *    fails.
 *
 * Behavior visible in this listing:
 *  - The outer loop runs replica from 0 through ptr->number_of_replicas
 *    inclusive; for each unsent key the target is hash[x] + replica, wrapped
 *    into the range of host indexes.
 *  - A getk request (opcode GETKQ or GETK) plus the key is written to the
 *    target; afterwards a NOOP request is written to every host with a
 *    pending response.
 *
 * NOTE(review): braces, continue/else statements, some declarations and the
 * return statement are missing from this listing. Which condition selects
 * GETKQ versus GETK, and the condition guarding the "mark as sent" loop at
 * the end, are not shown -- confirm against the complete file.
 */
384 static memcached_return
replication_binary_mget(memcached_st
*ptr
,
385 uint32_t* hash
, bool* dead_servers
,
386 const char **keys
, size_t *key_length
,
387 size_t number_of_keys
, bool mget_mode
)
389 memcached_return rc
= MEMCACHED_NOTFOUND
;
392 int flush
= number_of_keys
== 1;
394 for (uint32_t replica
= 0; replica
<= ptr
->number_of_replicas
; ++replica
)
398 for (x
= 0; x
< number_of_keys
; ++x
)
400 if (hash
[x
] == ptr
->number_of_hosts
)
401 continue; /* Already successfully sent */
/* Pick the replica-th server after the key's primary, wrapping around the
   host list. */
403 uint32_t server
= hash
[x
] + replica
;
404 while (server
>= ptr
->number_of_hosts
)
405 server
-= ptr
->number_of_hosts
;
407 if (dead_servers
[server
])
410 if (memcached_server_response_count(&ptr
->hosts
[server
]) == 0)
412 rc
= memcached_connect(&ptr
->hosts
[server
]);
413 if (rc
!= MEMCACHED_SUCCESS
)
415 memcached_io_reset(&ptr
->hosts
[server
]);
416 dead_servers
[server
]= true;
422 protocol_binary_request_getk request
= {.bytes
= {0}};
423 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
425 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETKQ
;
427 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETK
;
429 request
.message
.header
.request
.keylen
= htons((uint16_t)key_length
[x
]);
430 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
431 request
.message
.header
.request
.bodylen
= htonl((uint32_t) key_length
[x
]);
/* A failed header or key write resets the host's I/O state and marks the
   host dead. */
433 if ((memcached_io_write(&ptr
->hosts
[server
], request
.bytes
,
434 sizeof(request
.bytes
), 0) == -1) ||
435 (memcached_io_write(&ptr
->hosts
[server
], keys
[x
],
436 key_length
[x
], (char) flush
) == -1))
438 memcached_io_reset(&ptr
->hosts
[server
]);
439 dead_servers
[server
]= true;
443 /* we just want one pending response per server */
444 memcached_server_response_reset(&ptr
->hosts
[server
]);
445 memcached_server_response_increment(&ptr
->hosts
[server
]);
451 * Send a noop command to flush the buffers
453 protocol_binary_request_noop request
= {.bytes
= {0}};
454 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
455 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_NOOP
;
456 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
458 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
459 if (memcached_server_response_count(&ptr
->hosts
[x
]))
461 if (memcached_io_write(&ptr
->hosts
[x
], request
.bytes
,
462 sizeof(request
.bytes
), 1) == -1)
464 memcached_io_reset(&ptr
->hosts
[x
]);
465 dead_servers
[x
]= true;
469 /* mark all of the messages bound for this server as sent! */
470 for (x
= 0; x
< number_of_keys
; ++x
)
472 hash
[x
]= ptr
->number_of_hosts
;
/*
 * binary_mget_by_key: entry point for binary-protocol multi-get.
 *
 * Behavior visible in this listing:
 *  - With ptr->number_of_replicas == 0 the request is handled by
 *    simple_binary_mget() and its result is stored in rc.
 *  - The remaining code allocates a per-key hash array through
 *    ptr->call_malloc and a per-host dead_servers array through
 *    ptr->call_calloc; if either allocation fails both pointers are passed
 *    to ptr->call_free and MEMCACHED_MEMORY_ALLOCATION_FAILURE is returned.
 *  - hash[] is filled with master_server_key for every key when
 *    is_master_key_set, or with memcached_generate_hash() per key.
 *  - replication_binary_mget() is called, both arrays are released, and
 *    MEMCACHED_SUCCESS is returned.
 *
 * NOTE(review): braces, the else keywords and any trailing return are
 * missing from this listing; the replication code is presumably the else
 * branch of the number_of_replicas check -- confirm.
 */
483 static memcached_return
binary_mget_by_key(memcached_st
*ptr
,
484 unsigned int master_server_key
,
485 bool is_master_key_set
,
486 const char **keys
, size_t *key_length
,
487 size_t number_of_keys
, bool mget_mode
)
491 if (ptr
->number_of_replicas
== 0)
493 rc
= simple_binary_mget(ptr
, master_server_key
, is_master_key_set
,
494 keys
, key_length
, number_of_keys
, mget_mode
);
501 hash
= ptr
->call_malloc(ptr
, sizeof(uint32_t) * number_of_keys
);
502 dead_servers
= ptr
->call_calloc(ptr
, ptr
->number_of_hosts
, sizeof(bool));
/* Either allocation failing releases both pointers (one of which may be
   NULL) and aborts the request. */
504 if (hash
== NULL
|| dead_servers
== NULL
)
506 ptr
->call_free(ptr
, hash
);
507 ptr
->call_free(ptr
, dead_servers
);
508 return MEMCACHED_MEMORY_ALLOCATION_FAILURE
;
511 if (is_master_key_set
)
512 for (unsigned int x
= 0; x
< number_of_keys
; x
++)
513 hash
[x
]= master_server_key
;
515 for (unsigned int x
= 0; x
< number_of_keys
; x
++)
516 hash
[x
]= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
518 rc
= replication_binary_mget(ptr
, hash
, dead_servers
, keys
,
519 key_length
, number_of_keys
, mget_mode
);
521 ptr
->call_free(ptr
, hash
);
522 ptr
->call_free(ptr
, dead_servers
);
/* NOTE(review): rc was just assigned from replication_binary_mget() but the
   statement below returns MEMCACHED_SUCCESS regardless, so a failure reported
   by the replication path is not propagated from here -- confirm whether
   this should return rc instead. */
524 return MEMCACHED_SUCCESS
;