2 #include "memcached_io.h"
5 What happens if no servers exist?
/*
  memcached_get: convenience wrapper around memcached_get_by_key().

  The call is forwarded with a NULL master key of length 0, and key,
  key_length and value_length are passed straight through.

  NOTE(review): the embedded source line numbers jump (7 -> 11 -> 13) and the
  call stops after value_length, so part of the parameter list and the tail
  of the call appear to be missing from this listing - confirm against the
  complete file.
*/
7 char *memcached_get(memcached_st
*ptr
, const char *key
,
11 memcached_return
*error
)
/* No master key: NULL pointer, length 0. */
13 return memcached_get_by_key(ptr
, NULL
, 0, key
, key_length
, value_length
,
/*
  File-local (static) memcached_mget_by_key_real(). No body follows here, and
  memcached_get_by_key() calls the function before its definition further
  down, so this appears to be the forward declaration.

  NOTE(review): the embedded line numbers skip 20, 21 and everything after
  22, so part of the parameter list is not visible - confirm against the
  complete file.
*/
17 static memcached_return
memcached_mget_by_key_real(memcached_st
*ptr
,
18 const char *master_key
,
19 size_t master_key_length
,
22 size_t number_of_keys
,
/*
  memcached_get_by_key: fetch the value stored for a single key. master_key
  and master_key_length are forwarded unchanged to
  memcached_mget_by_key_real().

  Behaviour visible in this listing:
   - when MEM_USE_UDP is set in ptr->flags, *error is set to
     MEMCACHED_NOT_SUPPORTED;
   - the key is requested through memcached_mget_by_key_real() with a key
     count of 1 and a final argument of false, then read back with
     memcached_fetch();
   - a MEMCACHED_END status is reported to the caller as MEMCACHED_NOTFOUND;
   - when ptr->get_key_failure is set and the status is MEMCACHED_NOTFOUND,
     ptr->result is reset, the callback is invoked with &ptr->result, and on
     MEMCACHED_SUCCESS or MEMCACHED_BUFFERED the result is stored with
     memcached_set(); if that store also yields MEMCACHED_SUCCESS or
     MEMCACHED_BUFFERED, *value_length and *flags are taken from ptr->result
     and a copy made by memcached_string_c_copy() is returned;
   - a further memcached_fetch() with dummy output arguments is issued and
     dummy_length is asserted to be 0.

  NOTE(review): brace-only lines and several other lines (some parameters,
  local declarations, else branches, plain returns) are absent from this
  listing - the embedded line numbers skip - so the exact nesting of the
  statements below cannot be read off here; confirm against the complete
  file.
*/
25 char *memcached_get_by_key(memcached_st
*ptr
,
26 const char *master_key
,
27 size_t master_key_length
,
28 const char *key
, size_t key_length
,
31 memcached_return
*error
)
36 memcached_return dummy_error
;
/* UDP mode: report MEMCACHED_NOT_SUPPORTED through *error. */
38 unlikely (ptr
->flags
& MEM_USE_UDP
)
40 *error
= MEMCACHED_NOT_SUPPORTED
;
/* Request exactly one key: the address of key / key_length act as
   one-element arrays, count is 1, last argument is false. */
45 *error
= memcached_mget_by_key_real(ptr
,
48 (const char **)&key
, &key_length
, 1, false);
/* error is handed to memcached_fetch(), so the status tested below is
   presumably the one produced by the fetch. */
50 value
= memcached_fetch(ptr
, NULL
, NULL
,
51 value_length
, flags
, error
);
52 /* This is for historical reasons */
53 if (*error
== MEMCACHED_END
)
54 *error
= MEMCACHED_NOTFOUND
;
/* Miss handling: only when a get_key_failure callback is installed and the
   status is MEMCACHED_NOTFOUND. */
58 if (ptr
->get_key_failure
&& *error
== MEMCACHED_NOTFOUND
)
62 memcached_result_reset(&ptr
->result
);
63 rc
= ptr
->get_key_failure(ptr
, key
, key_length
, &ptr
->result
);
65 /* On all failure drop to returning NULL */
66 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_BUFFERED
)
/* Callback asked for a buffered store: remember the current
   BUFFER_REQUESTS behaviour, switch it to 1 for the set, and switch it back
   to 0 afterwards when it was 0 before and the set returned
   MEMCACHED_BUFFERED. */
68 if (rc
== MEMCACHED_BUFFERED
)
70 uint64_t latch
; /* We use latch to track the state of the original socket */
71 latch
= memcached_behavior_get(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
);
73 memcached_behavior_set(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
, 1);
75 rc
= memcached_set(ptr
, key
, key_length
,
76 memcached_result_value(&ptr
->result
),
77 memcached_result_length(&ptr
->result
),
78 0, memcached_result_flags(&ptr
->result
));
80 if (rc
== MEMCACHED_BUFFERED
&& latch
== 0)
81 memcached_behavior_set(ptr
, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS
, 0);
/* Same memcached_set() call without touching the buffering behaviour.
   NOTE(review): presumably the non-buffered alternative to the branch above;
   the line joining the two is not visible here. */
85 rc
= memcached_set(ptr
, key
, key_length
,
86 memcached_result_value(&ptr
->result
),
87 memcached_result_length(&ptr
->result
),
88 0, memcached_result_flags(&ptr
->result
));
/* Store worked: hand length, flags and a copy of the value from ptr->result
   back to the caller. */
91 if (rc
== MEMCACHED_SUCCESS
|| rc
== MEMCACHED_BUFFERED
)
94 *value_length
= memcached_result_length(&ptr
->result
);
95 *flags
= memcached_result_flags(&ptr
->result
);
96 return memcached_string_c_copy(&ptr
->result
.value
);
/* Extra fetch whose outputs go to dummy variables; its return value is
   deliberately ignored. */
104 (void)memcached_fetch(ptr
, NULL
, NULL
,
105 &dummy_length
, &dummy_flags
,
107 WATCHPOINT_ASSERT(dummy_length
== 0);
/*
  memcached_mget: request several keys at once.

  keys and key_length are passed, together with number_of_keys, to
  memcached_mget_by_key() with a NULL master key of length 0; the result of
  that call is returned unchanged.

  NOTE(review): brace-only lines are absent from this listing.
*/
112 memcached_return
memcached_mget(memcached_st
*ptr
,
113 const char **keys
, size_t *key_length
,
114 size_t number_of_keys
)
116 return memcached_mget_by_key(ptr
, NULL
, 0, keys
, key_length
, number_of_keys
);
/*
  File-local (static) binary_mget_by_key(). No body follows here, and
  memcached_mget_by_key_real() calls the function before its definition
  further down, so this appears to be the forward declaration.

  NOTE(review): the declaration is cut off after number_of_keys in this
  listing - confirm the remaining parameter(s) against the complete file.
*/
119 static memcached_return
binary_mget_by_key(memcached_st
*ptr
,
120 unsigned int master_server_key
,
121 bool is_master_key_set
,
122 const char **keys
, size_t *key_length
,
123 size_t number_of_keys
,
/*
  memcached_mget_by_key_real: file-local worker that sends the request for
  number_of_keys keys.

  Behaviour visible in this listing, in order:
   - returns MEMCACHED_NOT_SUPPORTED when MEM_USE_UDP is set in ptr->flags;
   - sets ptr->cursor_server to 0;
   - returns MEMCACHED_NOTFOUND when number_of_keys is 0 and
     MEMCACHED_NO_SERVERS when ptr->number_of_hosts is 0;
   - with MEM_VERIFY_KEY set, returns MEMCACHED_BAD_KEY_PROVIDED when
     memcached_key_test() rejects the keys or the master key;
   - a non-NULL master key with non-zero length is hashed once into
     master_server_key and is_master_key_set becomes true;
   - every host with a non-zero response count is drained with
     memcached_response();
   - with MEM_BINARY_PROTOCOL set the work is handed to binary_mget_by_key();
   - otherwise the command "get " (or "gets " with MEM_SUPPORT_CAS), the
     optional prefix key, each key and a single space are written to the
     key's server, and "\r\n" is written to every host with a non-zero
     response count.

  rc starts as MEMCACHED_NOTFOUND and is set to MEMCACHED_SOME_ERRORS next to
  the failed-write checks.

  NOTE(review): brace-only lines and several other lines (part of the
  parameter list, else branches, the statements that follow a failed
  connect/write, the final return) are absent from this listing - the
  embedded line numbers skip - so error-path control flow cannot be read off
  here; confirm against the complete file.
*/
126 static memcached_return
memcached_mget_by_key_real(memcached_st
*ptr
,
127 const char *master_key
,
128 size_t master_key_length
,
131 size_t number_of_keys
,
135 memcached_return rc
= MEMCACHED_NOTFOUND
;
/* Text-protocol command and its length in bytes, including the trailing
   space. */
136 const char *get_command
= "get ";
137 uint8_t get_command_length
= 4;
138 unsigned int master_server_key
= (unsigned int)-1; /* 0 is a valid server id! */
139 bool is_master_key_set
= false;
141 unlikely (ptr
->flags
& MEM_USE_UDP
)
142 return MEMCACHED_NOT_SUPPORTED
;
144 LIBMEMCACHED_MEMCACHED_MGET_START();
145 ptr
->cursor_server
= 0;
147 if (number_of_keys
== 0)
148 return MEMCACHED_NOTFOUND
;
150 if (ptr
->number_of_hosts
== 0)
151 return MEMCACHED_NO_SERVERS
;
/* Key validation is opt-in through MEM_VERIFY_KEY. */
153 if ((ptr
->flags
& MEM_VERIFY_KEY
) && (memcached_key_test(keys
, key_length
, number_of_keys
) == MEMCACHED_BAD_KEY_PROVIDED
))
154 return MEMCACHED_BAD_KEY_PROVIDED
;
/* A usable master key is validated (same opt-in) and hashed once. */
156 if (master_key
&& master_key_length
)
158 if ((ptr
->flags
& MEM_VERIFY_KEY
) && (memcached_key_test((const char **)&master_key
, &master_key_length
, 1) == MEMCACHED_BAD_KEY_PROVIDED
))
159 return MEMCACHED_BAD_KEY_PROVIDED
;
160 master_server_key
= memcached_generate_hash(ptr
, master_key
, master_key_length
);
161 is_master_key_set
= true;
165 Here is where we pay for the non-block API. We need to remove any data sitting
166 in the queue before we start our get.
168 It might be optimum to bounce the connection if count > some number.
/* Drain loop: for each host with a non-zero response count, call
   memcached_io_write() with no data when MEM_NO_BLOCK is set, then call
   memcached_response() until the count reaches 0. */
170 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
172 if (memcached_server_response_count(&ptr
->hosts
[x
]))
174 char buffer
[MEMCACHED_DEFAULT_COMMAND_SIZE
];
176 if (ptr
->flags
& MEM_NO_BLOCK
)
177 (void)memcached_io_write(&ptr
->hosts
[x
], NULL
, 0, 1);
179 while(memcached_server_response_count(&ptr
->hosts
[x
]))
180 (void)memcached_response(&ptr
->hosts
[x
], buffer
, MEMCACHED_DEFAULT_COMMAND_SIZE
, &ptr
->result
);
/* Binary protocol is handled entirely by binary_mget_by_key(). */
184 if (ptr
->flags
& MEM_BINARY_PROTOCOL
)
185 return binary_mget_by_key(ptr
, master_server_key
, is_master_key_set
, keys
,
186 key_length
, number_of_keys
, mget_mode
);
/* With MEM_SUPPORT_CAS the command becomes "gets " (5 bytes). */
188 if (ptr
->flags
& MEM_SUPPORT_CAS
)
190 get_command
= "gets ";
191 get_command_length
= 5;
195 If a server fails we warn about errors and start all over with sending keys
/* Per-key loop: pick the server (master key hash or the key's own hash) and
   write the pieces of the request to it. */
198 for (x
= 0; x
< number_of_keys
; x
++)
200 unsigned int server_key
;
202 if (is_master_key_set
)
203 server_key
= master_server_key
;
205 server_key
= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
/* A response count of 0 means nothing has been sent to this server yet in
   this call: connect, write the command word and bump the count. */
207 if (memcached_server_response_count(&ptr
->hosts
[server_key
]) == 0)
209 rc
= memcached_connect(&ptr
->hosts
[server_key
]);
211 if (rc
!= MEMCACHED_SUCCESS
)
214 if ((memcached_io_write(&ptr
->hosts
[server_key
], get_command
, get_command_length
, 0)) == -1)
216 rc
= MEMCACHED_SOME_ERRORS
;
219 WATCHPOINT_ASSERT(ptr
->hosts
[server_key
].cursor_active
== 0);
220 memcached_server_response_increment(&ptr
->hosts
[server_key
]);
221 WATCHPOINT_ASSERT(ptr
->hosts
[server_key
].cursor_active
== 1);
224 /* Only called when we have a prefix key */
225 if (ptr
->prefix_key
[0] != 0)
227 if ((memcached_io_write(&ptr
->hosts
[server_key
], ptr
->prefix_key
, ptr
->prefix_key_length
, 0)) == -1)
229 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
230 rc
= MEMCACHED_SOME_ERRORS
;
/* The key itself, then a single space separator. */
235 if ((memcached_io_write(&ptr
->hosts
[server_key
], keys
[x
], key_length
[x
], 0)) == -1)
237 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
238 rc
= MEMCACHED_SOME_ERRORS
;
242 if ((memcached_io_write(&ptr
->hosts
[server_key
], " ", 1, 0)) == -1)
244 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
245 rc
= MEMCACHED_SOME_ERRORS
;
251 Should we muddle on if some servers are dead?
/* Terminate the command line on every host that has a non-zero response
   count. The last argument is 1 here where the earlier writes used 0.
   NOTE(review): presumably a flush flag - confirm in memcached_io.h. */
253 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
255 if (memcached_server_response_count(&ptr
->hosts
[x
]))
257 /* We need to do something about non-connnected hosts in the future */
258 if ((memcached_io_write(&ptr
->hosts
[x
], "\r\n", 2, 1)) == -1)
260 rc
= MEMCACHED_SOME_ERRORS
;
265 LIBMEMCACHED_MEMCACHED_MGET_END();
/*
  memcached_mget_by_key: request several keys, with an optional master key.

  All arguments are forwarded to memcached_mget_by_key_real() with a final
  argument of true, and its result is returned unchanged.

  NOTE(review): the embedded line numbers skip 272 and 273, so the
  declarations of keys and key_length (both used in the call) are not
  visible in this listing; brace-only lines are absent as well.
*/
269 memcached_return
memcached_mget_by_key(memcached_st
*ptr
,
270 const char *master_key
,
271 size_t master_key_length
,
274 size_t number_of_keys
)
276 return memcached_mget_by_key_real(ptr
, master_key
, master_key_length
, keys
,
277 key_length
, number_of_keys
, true);
/*
  simple_binary_mget: file-local binary-protocol sender; binary_mget_by_key()
  calls it when ptr->number_of_replicas is 0.

  Behaviour visible in this listing:
   - rc starts as MEMCACHED_NOTFOUND; flush is 1 only when number_of_keys
     is 1;
   - for every key the server is master_server_key or the key's own hash;
     when that server's response count is 0 it is connected first;
   - a protocol_binary_request_getk is zero-initialised and filled in (magic
     PROTOCOL_BINARY_REQ, opcode GETKQ or GETK, keylen in network byte order,
     datatype RAW_BYTES, bodylen equal to the key length in network byte
     order), then the header bytes and the key are written;
   - the key length is checked with memcached_validate_key_length();
   - when x > 0 and x equals ptr->io_key_prefetch, memcached_flush_buffers()
     is called;
   - afterwards a NOOP request is written to every host with a non-zero
     response count and that host's response count is incremented.

  NOTE(review): brace-only lines and several other lines (the condition that
  chooses between the two opcode assignments, else branches, the statements
  following failed checks, the final return) are absent from this listing,
  so the exact control flow cannot be read off here; confirm against the
  complete file.
*/
280 static memcached_return
simple_binary_mget(memcached_st
*ptr
,
281 unsigned int master_server_key
,
282 bool is_master_key_set
,
283 const char **keys
, size_t *key_length
,
284 size_t number_of_keys
, bool mget_mode
)
286 memcached_return rc
= MEMCACHED_NOTFOUND
;
/* flush is later passed as the last argument of the key write. */
289 int flush
= number_of_keys
== 1;
292 If a server fails we warn about errors and start all over with sending keys
295 for (x
= 0; x
< number_of_keys
; x
++)
297 unsigned int server_key
;
299 if (is_master_key_set
)
300 server_key
= master_server_key
;
302 server_key
= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
304 if (memcached_server_response_count(&ptr
->hosts
[server_key
]) == 0)
306 rc
= memcached_connect(&ptr
->hosts
[server_key
]);
307 if (rc
!= MEMCACHED_SUCCESS
)
/* Build the request header for this key. */
311 protocol_binary_request_getk request
= {.bytes
= {0}};
312 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
/* Two opcode assignments follow; the condition selecting between them is
   not visible in this listing. */
314 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETKQ
;
316 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETK
;
319 vk
= memcached_validate_key_length(key_length
[x
],
320 ptr
->flags
& MEM_BINARY_PROTOCOL
);
321 unlikely (vk
!= MEMCACHED_SUCCESS
)
324 memcached_io_reset(&ptr
->hosts
[server_key
]);
/* Multi-byte header fields are converted to network byte order. */
328 request
.message
.header
.request
.keylen
= htons((uint16_t)key_length
[x
]);
329 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
330 request
.message
.header
.request
.bodylen
= htonl((uint32_t) key_length
[x
]);
/* Header first, then the key; either write returning -1 counts as failure. */
332 if ((memcached_io_write(&ptr
->hosts
[server_key
], request
.bytes
,
333 sizeof(request
.bytes
), 0) == -1) ||
334 (memcached_io_write(&ptr
->hosts
[server_key
], keys
[x
],
335 key_length
[x
], (char) flush
) == -1))
337 memcached_server_response_reset(&ptr
->hosts
[server_key
]);
338 rc
= MEMCACHED_SOME_ERRORS
;
341 memcached_server_response_increment(&ptr
->hosts
[server_key
]);
/* Buffers are flushed once, at the key whose index equals io_key_prefetch. */
342 if ((x
> 0 && x
== ptr
->io_key_prefetch
) &&
343 memcached_flush_buffers(ptr
) != MEMCACHED_SUCCESS
)
344 rc
= MEMCACHED_SOME_ERRORS
;
350 * Send a noop command to flush the buffers
352 protocol_binary_request_noop request
= {.bytes
= {0}};
353 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
354 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_NOOP
;
355 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
/* For each host with a non-zero response count: an empty write, then the
   NOOP request; a failure of either resets that host's response count and
   I/O state. */
357 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
358 if (memcached_server_response_count(&ptr
->hosts
[x
]))
360 if (memcached_io_write(&ptr
->hosts
[x
], NULL
, 0, 1) == -1)
362 memcached_server_response_reset(&ptr
->hosts
[x
]);
363 memcached_io_reset(&ptr
->hosts
[x
]);
364 rc
= MEMCACHED_SOME_ERRORS
;
367 if (memcached_io_write(&ptr
->hosts
[x
], request
.bytes
,
368 sizeof(request
.bytes
), 1) == -1)
370 memcached_server_response_reset(&ptr
->hosts
[x
]);
371 memcached_io_reset(&ptr
->hosts
[x
]);
372 rc
= MEMCACHED_SOME_ERRORS
;
374 memcached_server_response_increment(&ptr
->hosts
[x
]);
/*
  replication_binary_mget: file-local binary-protocol sender;
  binary_mget_by_key() calls it when ptr->number_of_replicas is not 0.

  Inputs visible here:
   - hash[x] holds the server index chosen for keys[x]; the value
     ptr->number_of_hosts is used as the "already sent" marker;
   - dead_servers[i] is set to true for a host whose connect or write failed.

  Behaviour visible in this listing:
   - the outer loop runs replica from 0 to ptr->number_of_replicas inclusive;
   - for each key not yet marked as sent, the target server is
     hash[x] + replica, reduced by repeated subtraction of
     ptr->number_of_hosts until it is a valid index;
   - a server with a response count of 0 is connected first; a failed connect
     resets its I/O state and marks it dead;
   - a protocol_binary_request_getk (opcode GETKQ or GETK) and the key are
     written; a failed write resets the I/O state and marks the server dead;
   - a NOOP request is then written to every host with a non-zero response
     count, and hash[] entries are set to ptr->number_of_hosts.

  NOTE(review): brace-only lines and several other lines (the condition that
  chooses between the two opcode assignments, the statements after the
  dead-server and failure checks, any condition guarding the final marking
  loop, the return) are absent from this listing, so the exact control flow
  cannot be read off here; confirm against the complete file.
*/
382 static memcached_return
replication_binary_mget(memcached_st
*ptr
,
383 uint32_t* hash
, bool* dead_servers
,
384 const char **keys
, size_t *key_length
,
385 size_t number_of_keys
, bool mget_mode
)
387 memcached_return rc
= MEMCACHED_NOTFOUND
;
/* flush is later passed as the last argument of the key write. */
390 int flush
= number_of_keys
== 1;
/* Note the <=: the loop body runs number_of_replicas + 1 times. */
392 for (uint32_t replica
= 0; replica
<= ptr
->number_of_replicas
; ++replica
)
396 for (x
= 0; x
< number_of_keys
; ++x
)
398 if (hash
[x
] == ptr
->number_of_hosts
)
399 continue; /* Already successfully sent */
/* Offset the key's server by the replica number and wrap into range. */
401 uint32_t server
= hash
[x
] + replica
;
402 while (server
>= ptr
->number_of_hosts
)
403 server
-= ptr
->number_of_hosts
;
405 if (dead_servers
[server
])
408 if (memcached_server_response_count(&ptr
->hosts
[server
]) == 0)
410 rc
= memcached_connect(&ptr
->hosts
[server
]);
411 if (rc
!= MEMCACHED_SUCCESS
)
413 memcached_io_reset(&ptr
->hosts
[server
]);
414 dead_servers
[server
]= true;
/* Build the request header for this key. */
420 protocol_binary_request_getk request
= {.bytes
= {0}};
421 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
/* Two opcode assignments follow; the condition selecting between them is
   not visible in this listing. */
423 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETKQ
;
425 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_GETK
;
/* Multi-byte header fields are converted to network byte order. */
427 request
.message
.header
.request
.keylen
= htons((uint16_t)key_length
[x
]);
428 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
429 request
.message
.header
.request
.bodylen
= htonl((uint32_t) key_length
[x
]);
/* Header first, then the key; either write returning -1 marks the server
   dead. */
431 if ((memcached_io_write(&ptr
->hosts
[server
], request
.bytes
,
432 sizeof(request
.bytes
), 0) == -1) ||
433 (memcached_io_write(&ptr
->hosts
[server
], keys
[x
],
434 key_length
[x
], (char) flush
) == -1))
436 memcached_io_reset(&ptr
->hosts
[server
]);
437 dead_servers
[server
]= true;
441 memcached_server_response_increment(&ptr
->hosts
[server
]);
447 * Send a noop command to flush the buffers
449 protocol_binary_request_noop request
= {.bytes
= {0}};
450 request
.message
.header
.request
.magic
= PROTOCOL_BINARY_REQ
;
451 request
.message
.header
.request
.opcode
= PROTOCOL_BINARY_CMD_NOOP
;
452 request
.message
.header
.request
.datatype
= PROTOCOL_BINARY_RAW_BYTES
;
/* NOOP goes to every host with a non-zero response count; a failed write
   marks that host dead. */
454 for (x
= 0; x
< ptr
->number_of_hosts
; x
++)
455 if (memcached_server_response_count(&ptr
->hosts
[x
]))
457 if (memcached_io_write(&ptr
->hosts
[x
], request
.bytes
,
458 sizeof(request
.bytes
), 1) == -1)
460 memcached_io_reset(&ptr
->hosts
[x
]);
461 dead_servers
[x
]= true;
464 memcached_server_response_increment(&ptr
->hosts
[x
]);
466 /* mark all of the messages bound for this server as sent! */
/* NOTE(review): this loop reuses x, the index of the host loop above, and
   embedded line 468 is missing between the loop header and the assignment -
   confirm what guards the assignment. */
467 for (x
= 0; x
< number_of_keys
; ++x
)
469 hash
[x
]= ptr
->number_of_hosts
;
/*
  binary_mget_by_key: file-local entry point for binary-protocol multi-get;
  memcached_mget_by_key_real() calls it when MEM_BINARY_PROTOCOL is set.

  Behaviour visible in this listing:
   - with ptr->number_of_replicas == 0 the request is delegated to
     simple_binary_mget() and its result stored in rc;
   - the replication path allocates hash (number_of_keys uint32_t entries,
     via ptr->call_malloc) and dead_servers (ptr->number_of_hosts bool
     entries, zeroed, via ptr->call_calloc); if either allocation fails both
     pointers are passed to ptr->call_free and
     MEMCACHED_MEMORY_ALLOCATION_FAILURE is returned;
   - hash[] is filled with master_server_key for every key when
     is_master_key_set is true, or with each key's own hash;
   - replication_binary_mget() is called, both buffers are freed, and
     MEMCACHED_SUCCESS is returned.

  NOTE(review): rc is assigned from replication_binary_mget(), but the
  visible return after the frees is a literal MEMCACHED_SUCCESS - confirm
  whether dropping that status is intentional.
  NOTE(review): brace-only lines, the else lines, the local declarations and
  whatever follows the last return are absent from this listing; confirm
  against the complete file.
*/
480 static memcached_return
binary_mget_by_key(memcached_st
*ptr
,
481 unsigned int master_server_key
,
482 bool is_master_key_set
,
483 const char **keys
, size_t *key_length
,
484 size_t number_of_keys
, bool mget_mode
)
/* No replicas configured: plain binary multi-get. */
488 if (ptr
->number_of_replicas
== 0)
490 rc
= simple_binary_mget(ptr
, master_server_key
, is_master_key_set
,
491 keys
, key_length
, number_of_keys
, mget_mode
);
/* Scratch buffers for the replication path, obtained through the
   allocator callbacks stored in ptr. */
498 hash
= ptr
->call_malloc(ptr
, sizeof(uint32_t) * number_of_keys
);
499 dead_servers
= ptr
->call_calloc(ptr
, ptr
->number_of_hosts
, sizeof(bool));
/* Both pointers are handed to call_free, including whichever one is NULL. */
501 if (hash
== NULL
|| dead_servers
== NULL
)
503 ptr
->call_free(ptr
, hash
);
504 ptr
->call_free(ptr
, dead_servers
);
505 return MEMCACHED_MEMORY_ALLOCATION_FAILURE
;
/* Server index per key: the master key's server for all keys, or each
   key's own hash. */
508 if (is_master_key_set
)
509 for (unsigned int x
= 0; x
< number_of_keys
; x
++)
510 hash
[x
]= master_server_key
;
512 for (unsigned int x
= 0; x
< number_of_keys
; x
++)
513 hash
[x
]= memcached_generate_hash(ptr
, keys
[x
], key_length
[x
]);
515 rc
= replication_binary_mget(ptr
, hash
, dead_servers
, keys
,
516 key_length
, number_of_keys
, mget_mode
);
518 ptr
->call_free(ptr
, hash
);
519 ptr
->call_free(ptr
, dead_servers
);
521 return MEMCACHED_SUCCESS
;