2 #include "memcached_io.h"
5 What happens if no servers exist?
7 char *memcached_get(memcached_st
*ptr
, const char *key
,
11 memcached_return
*error
)
13 return memcached_get_by_key(ptr
, NULL
, 0, key
, key_length
, value_length
,
17 char *memcached_get_by_key(memcached_st
*ptr
,
18 const char *master_key
,
19 size_t master_key_length
,
20 const char *key
, size_t key_length
,
23 memcached_return
*error
)
28 memcached_return dummy_error
;
30 unlikely (ptr
->flags
& MEM_USE_UDP
)
32 *error
= MEMCACHED_NOT_SUPPORTED
;
37 *error
= memcached_mget_by_key(ptr
,
40 (char **)&key
, &key_length
, 1);
42 value
= memcached_fetch(ptr
, NULL
, NULL
,
43 value_length
, flags
, error
);
44 /* This is for historical reasons */
45 if (*error
== MEMCACHED_END
)
46 *error
= MEMCACHED_NOTFOUND
;
50 if (ptr
->get_key_failure
&& *error
== MEMCACHED_NOTFOUND
)
54 memcached_result_reset(&ptr
->result
);
55 rc
= ptr
->get_key_failure(ptr
, key
, key_length
, &ptr
->result
);
57 /* On all failure drop to returning NULL */
58 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_BUFFERED
)
60 if (rc
== MEMCACHED_BUFFERED
)
62 uint8_t latch
; /* We use latch to track the state of the original socket */
63 latch
= memcached_behavior_get(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
);
65 memcached_behavior_set(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
, 1);
67 rc
= memcached_set(ptr
, key
, key_length
,
68 memcached_result_value(&ptr
->result
),
69 memcached_result_length(&ptr
->result
),
70 0, memcached_result_flags(&ptr
->result
));
72 if (rc
== MEMCACHED_BUFFERED
&& latch
== 0)
73 memcached_behavior_set(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
, 0);
77 rc
= memcached_set(ptr
, key
, key_length
,
78 memcached_result_value(&ptr
->result
),
79 memcached_result_length(&ptr
->result
),
80 0, memcached_result_flags(&ptr
->result
));
83 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_BUFFERED
)
86 *value_length
= memcached_result_length(&ptr
->result
);
87 *flags
= memcached_result_flags(&ptr
->result
);
88 return memcached_string_c_copy(&ptr
->result
.value
);
96 (void)memcached_fetch(ptr
, NULL
, NULL
,
97 &dummy_length
, &dummy_flags
,
99 WATCHPOINT_ASSERT(dummy_length
== 0);
104 memcached_return
memcached_mget(memcached_st
*ptr
,
105 char **keys
, size_t *key_length
,
106 unsigned int number_of_keys
)
108 return memcached_mget_by_key(ptr
, NULL
, 0, keys
, key_length
, number_of_keys
);
111 static memcached_return
binary_mget_by_key(memcached_st
*ptr
,
112 unsigned int master_server_key
,
113 bool is_master_key_set
,
114 char **keys
, size_t *key_length
,
115 unsigned int number_of_keys
);
117 memcached_return
memcached_mget_by_key(memcached_st
*ptr
,
118 const char *master_key
,
119 size_t master_key_length
,
122 unsigned int number_of_keys
)
125 memcached_return rc
= MEMCACHED_NOTFOUND
;
126 char *get_command
= "get ";
127 uint8_t get_command_length
= 4;
128 unsigned int master_server_key
= (unsigned int)-1; /* 0 is a valid server id! */
129 bool is_master_key_set
= false;
131 unlikely (ptr
->flags
& MEM_USE_UDP
)
132 return MEMCACHED_NOT_SUPPORTED
;
134 LIBMEMCACHED_MEMCACHED_MGET_START();
135 ptr
->cursor_server
= 0;
137 if (number_of_keys
== 0)
138 return MEMCACHED_NOTFOUND
;
140 if (ptr
->number_of_hosts
== 0)
141 return MEMCACHED_NO_SERVERS
;
143 if ((ptr
->flags
& MEM_VERIFY_KEY
) && (memcached_key_test(keys
, key_length
, number_of_keys
) == MEMCACHED_BAD_KEY_PROVIDED
))
144 return MEMCACHED_BAD_KEY_PROVIDED
;
146 if (master_key
&& master_key_length
)
148 if ((ptr
->flags
& MEM_VERIFY_KEY
) && (memcached_key_test((char **)&master_key
, &master_key_length
, 1) == MEMCACHED_BAD_KEY_PROVIDED
))
149 return MEMCACHED_BAD_KEY_PROVIDED
;
150 master_server_key
= memcached_generate_hash(ptr
, master_key
, master_key_length
);
151 is_master_key_set
= true;
155 Here is where we pay for the non-block API. We need to remove any data sitting
156 in the queue before we start our get.
158 It might be optimum to bounce the connection if count > some number.
160 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
162 if (memcached_server_response_count(&ptr
->hosts
[x
]))
164 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
166 if (ptr
->flags
& MEM_NO_BLOCK
)
167 (void)memcached_io_write(&ptr
->hosts
[x
], NULL
, 0, 1);
169 while(memcached_server_response_count(&ptr
->hosts
[x
]))
170 (void)memcached_response(&ptr
->hosts
[x
], buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, &ptr
->result
);
174 if (ptr
->flags
& MEM_BINARY_PROTOCOL
)
175 return binary_mget_by_key(ptr
, master_server_key
, is_master_key_set
, keys
,
176 key_length
, number_of_keys
);
178 if (ptr
->flags
& MEM_SUPPORT_CAS
)
180 get_command
= "gets ";
181 get_command_length
= 5;
185 If a server fails we warn about errors and start all over with sending keys
188 for (x
= 0; x
< number_of_keys
; x
++)
190 unsigned int server_key
;
192 if (is_master_key_set
)
193 server_key
= master_server_key
;
195 server_key
= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
197 if (memcached_server_response_count(&ptr
->hosts
[server_key
]) == 0)
199 rc
= memcached_connect(&ptr
->hosts
[server_key
]);
201 if (rc
!= MEMCACHED_SUCCESS
)
204 if ((memcached_io_write(&ptr
->hosts
[server_key
], get_command
, get_command_length
, 0)) == -1)
206 rc
= MEMCACHED_SOME_ERRORS
;
209 WATCHPOINT_ASSERT(ptr
->hosts
[server_key
].cursor_active
== 0);
210 memcached_server_response_increment(&ptr
->hosts
[server_key
]);
211 WATCHPOINT_ASSERT(ptr
->hosts
[server_key
].cursor_active
== 1);
214 /* Only called when we have a prefix key */
215 if (ptr
->prefix_key
[0] != 0)
217 if ((memcached_io_write(&ptr
->hosts
[server_key
], ptr
->prefix_key
, ptr
->prefix_key_length
, 0)) == -1)
219 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
220 rc
= MEMCACHED_SOME_ERRORS
;
225 if ((memcached_io_write(&ptr
->hosts
[server_key
], keys
[x
], key_length
[x
], 0)) == -1)
227 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
228 rc
= MEMCACHED_SOME_ERRORS
;
232 if ((memcached_io_write(&ptr
->hosts
[server_key
], " ", 1, 0)) == -1)
234 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
235 rc
= MEMCACHED_SOME_ERRORS
;
241 Should we muddle on if some servers are dead?
243 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
245 if (memcached_server_response_count(&ptr
->hosts
[x
]))
247 /* We need to do something about non-connnected hosts in the future */
248 if ((memcached_io_write(&ptr
->hosts
[x
], "\r\n", 2, 1)) == -1)
250 rc
= MEMCACHED_SOME_ERRORS
;
255 LIBMEMCACHED_MEMCACHED_MGET_END();
259 static memcached_return
simple_binary_mget(memcached_st
*ptr
,
260 unsigned int master_server_key
,
261 bool is_master_key_set
,
262 char **keys
, size_t *key_length
,
263 unsigned int number_of_keys
)
265 memcached_return rc
= MEMCACHED_NOTFOUND
;
268 int flush
= number_of_keys
== 1;
271 If a server fails we warn about errors and start all over with sending keys
274 for (x
= 0; x
< number_of_keys
; x
++)
276 unsigned int server_key
;
278 if (is_master_key_set
)
279 server_key
= master_server_key
;
281 server_key
= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
283 if (memcached_server_response_count(&ptr
->hosts
[server_key
]) == 0)
285 rc
= memcached_connect(&ptr
->hosts
[server_key
]);
286 if (rc
!= MEMCACHED_SUCCESS
)
290 protocol_binary_request_getk request
= {.bytes
= {0}};
291 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
292 if (number_of_keys
== 1)
293 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETK
;
295 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETKQ
;
298 vk
= memcached_validate_key_length(key_length
[x
],
299 ptr
->flags
& MEM_BINARY_PROTOCOL
);
300 unlikely (vk
!= MEMCACHED_SUCCESS
)
303 memcached_io_reset(&ptr
->hosts
[server_key
]);
307 request
.message
.header
.request
.keylen
= htons((uint16_t)key_length
[x
]);
308 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
309 request
.message
.header
.request
.bodylen
= htonl(key_length
[x
]);
311 if ((memcached_io_write(&ptr
->hosts
[server_key
], request
.bytes
,
312 sizeof(request
.bytes
), 0) == -1) ||
313 (memcached_io_write(&ptr
->hosts
[server_key
], keys
[x
],
314 key_length
[x
], flush
) == -1))
316 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
317 rc
= MEMCACHED_SOME_ERRORS
;
320 memcached_server_response_increment(&ptr
->hosts
[server_key
]);
321 if ((x
> 0 && x
== ptr
->io_key_prefetch
) &&
322 memcached_flush_buffers(ptr
) != MEMCACHED_SUCCESS
)
323 rc
= MEMCACHED_SOME_ERRORS
;
326 if (number_of_keys
> 1)
329 * Send a noop command to flush the buffers
331 protocol_binary_request_noop request
= {.bytes
= {0}};
332 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
333 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_NOOP
;
334 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
336 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
337 if (memcached_server_response_count(&ptr
->hosts
[x
]))
339 if (memcached_io_write(&ptr
->hosts
[x
], NULL
, 0, 1) == -1)
341 memcached_server_response_reset(&ptr
->hosts
[x
]);
342 memcached_io_reset(&ptr
->hosts
[x
]);
343 rc
= MEMCACHED_SOME_ERRORS
;
346 if (memcached_io_write(&ptr
->hosts
[x
], request
.bytes
,
347 sizeof(request
.bytes
), 1) == -1)
349 memcached_server_response_reset(&ptr
->hosts
[x
]);
350 memcached_io_reset(&ptr
->hosts
[x
]);
351 rc
= MEMCACHED_SOME_ERRORS
;
353 memcached_server_response_increment(&ptr
->hosts
[x
]);
361 static memcached_return
replication_binary_mget(memcached_st
*ptr
,
362 uint32_t* hash
, bool* dead_servers
,
363 char **keys
, size_t *key_length
,
364 unsigned int number_of_keys
)
366 memcached_return rc
= MEMCACHED_NOTFOUND
;
369 int flush
= number_of_keys
== 1;
371 for (uint32_t replica
= 0; replica
<= ptr
->number_of_replicas
; ++replica
)
375 for (x
= 0; x
< number_of_keys
; ++x
)
377 if (hash
[x
] == ptr
->number_of_hosts
)
378 continue; /* Already successfully sent */
380 uint32_t server
= hash
[x
] + replica
;
381 while (server
>= ptr
->number_of_hosts
)
382 server
-= ptr
->number_of_hosts
;
384 if (dead_servers
[server
])
387 if (memcached_server_response_count(&ptr
->hosts
[server
]) == 0)
389 rc
= memcached_connect(&ptr
->hosts
[server
]);
390 if (rc
!= MEMCACHED_SUCCESS
)
392 memcached_io_reset(&ptr
->hosts
[server
]);
393 dead_servers
[server
]= true;
399 protocol_binary_request_getk request
= {.bytes
= {0}};
400 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
401 if (number_of_keys
== 1)
402 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETK
;
404 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETKQ
;
406 request
.message
.header
.request
.keylen
= htons((uint16_t)key_length
[x
]);
407 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
408 request
.message
.header
.request
.bodylen
= htonl(key_length
[x
]);
410 if ((memcached_io_write(&ptr
->hosts
[server
], request
.bytes
,
411 sizeof(request
.bytes
), 0) == -1) ||
412 (memcached_io_write(&ptr
->hosts
[server
], keys
[x
],
413 key_length
[x
], flush
) == -1))
415 memcached_io_reset(&ptr
->hosts
[server
]);
416 dead_servers
[server
]= true;
420 memcached_server_response_increment(&ptr
->hosts
[server
]);
423 if (number_of_keys
> 1)
426 * Send a noop command to flush the buffers
428 protocol_binary_request_noop request
= {.bytes
= {0}};
429 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
430 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_NOOP
;
431 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
433 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
434 if (memcached_server_response_count(&ptr
->hosts
[x
]))
436 if (memcached_io_write(&ptr
->hosts
[x
], request
.bytes
,
437 sizeof(request
.bytes
), 1) == -1)
439 memcached_io_reset(&ptr
->hosts
[x
]);
440 dead_servers
[x
]= true;
443 memcached_server_response_increment(&ptr
->hosts
[x
]);
445 /* mark all of the messages bound for this server as sent! */
446 for (x
= 0; x
< number_of_keys
; ++x
)
448 hash
[x
]= ptr
->number_of_hosts
;
459 static memcached_return
binary_mget_by_key(memcached_st
*ptr
,
460 unsigned int master_server_key
,
461 bool is_master_key_set
,
462 char **keys
, size_t *key_length
,
463 unsigned int number_of_keys
)
467 if (ptr
->number_of_replicas
== 0)
469 rc
= simple_binary_mget(ptr
, master_server_key
, is_master_key_set
,
470 keys
, key_length
, number_of_keys
);
477 hash
= ptr
->call_malloc(ptr
, sizeof(uint32_t) * number_of_keys
);
478 dead_servers
= ptr
->call_calloc(ptr
, ptr
->number_of_hosts
, sizeof(bool));
480 if (hash
== NULL
|| dead_servers
== NULL
)
482 ptr
->call_free(ptr
, hash
);
483 ptr
->call_free(ptr
, dead_servers
);
484 return MEMCACHED_MEMORY_ALLOCATION_FAILURE
;
487 if (is_master_key_set
)
488 for (unsigned int x
= 0; x
< number_of_keys
; x
++)
489 hash
[x
]= master_server_key
;
491 for (unsigned int x
= 0; x
< number_of_keys
; x
++)
492 hash
[x
]= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
494 rc
= replication_binary_mget(ptr
, hash
, dead_servers
, keys
,
495 key_length
, number_of_keys
);
497 ptr
->call_free(ptr
, hash
);
498 ptr
->call_free(ptr
, dead_servers
);
500 return MEMCACHED_SUCCESS
;