X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fmemcached_get.c;h=ee4f68e7ecb0c006c4025f346340ba0941b41ca6;hb=9131482f7923cf9e90b5a715b38e70e3a229b052;hp=dcaa8988a257344a543c34cf39e0fe6963c714c5;hpb=933e5a561b10c1450563f1421b83d72396b805d2;p=m6w6%2Flibmemcached diff --git a/libmemcached/memcached_get.c b/libmemcached/memcached_get.c index dcaa8988..ee4f68e7 100644 --- a/libmemcached/memcached_get.c +++ b/libmemcached/memcached_get.c @@ -27,11 +27,17 @@ char *memcached_get_by_key(memcached_st *ptr, uint32_t dummy_flags; memcached_return dummy_error; + unlikely (ptr->flags & MEM_USE_UDP) + { + *error= MEMCACHED_NOT_SUPPORTED; + return NULL; + } + /* Request the key */ *error= memcached_mget_by_key(ptr, master_key, master_key_length, - (char **)&key, &key_length, 1); + (const char **)&key, &key_length, 1); value= memcached_fetch(ptr, NULL, NULL, value_length, flags, error); @@ -53,7 +59,7 @@ char *memcached_get_by_key(memcached_st *ptr, { if (rc == MEMCACHED_BUFFERED) { - uint8_t latch; /* We use latch to track the state of the original socket */ + uint64_t latch; /* We use latch to track the state of the original socket */ latch= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS); if (latch == 0) memcached_behavior_set(ptr, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1); @@ -96,29 +102,34 @@ char *memcached_get_by_key(memcached_st *ptr, } memcached_return memcached_mget(memcached_st *ptr, - char **keys, size_t *key_length, - unsigned int number_of_keys) + const char **keys, size_t *key_length, + size_t number_of_keys) { return memcached_mget_by_key(ptr, NULL, 0, keys, key_length, number_of_keys); } static memcached_return binary_mget_by_key(memcached_st *ptr, unsigned int master_server_key, - char **keys, size_t *key_length, - unsigned int number_of_keys); + bool is_master_key_set, + const char **keys, size_t *key_length, + size_t number_of_keys); memcached_return memcached_mget_by_key(memcached_st *ptr, const char *master_key, size_t master_key_length, - char **keys, + const char **keys, size_t *key_length, - unsigned int number_of_keys) + size_t number_of_keys) { unsigned int x; memcached_return rc= MEMCACHED_NOTFOUND; - char *get_command= "get "; + const char *get_command= "get "; uint8_t get_command_length= 4; - unsigned int master_server_key= 0; + unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */ + bool is_master_key_set= false; + + unlikely (ptr->flags & MEM_USE_UDP) + return MEMCACHED_NOT_SUPPORTED; LIBMEMCACHED_MEMCACHED_MGET_START(); ptr->cursor_server= 0; @@ -129,20 +140,15 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, if (ptr->number_of_hosts == 0) return MEMCACHED_NO_SERVERS; - if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED)) + if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test(keys, key_length, number_of_keys) == MEMCACHED_BAD_KEY_PROVIDED)) return MEMCACHED_BAD_KEY_PROVIDED; - if (ptr->flags & MEM_SUPPORT_CAS) - { - get_command= "gets "; - get_command_length= 5; - } - if (master_key && master_key_length) { - if ((ptr->flags & MEM_VERIFY_KEY) && (memcachd_key_test((char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)) + if ((ptr->flags & MEM_VERIFY_KEY) && (memcached_key_test((const char **)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)) return MEMCACHED_BAD_KEY_PROVIDED; master_server_key= memcached_generate_hash(ptr, master_key, master_key_length); + is_master_key_set= true; } /* @@ -166,9 +172,15 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, } if (ptr->flags & MEM_BINARY_PROTOCOL) - return binary_mget_by_key(ptr, master_server_key, keys, + return binary_mget_by_key(ptr, master_server_key, is_master_key_set, keys, key_length, number_of_keys); + if (ptr->flags & MEM_SUPPORT_CAS) + { + get_command= "gets "; + get_command_length= 5; + } + /* If a server fails we warn about errors and start all over with sending keys to the server. @@ -177,7 +189,7 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, { unsigned int server_key; - if (master_server_key) + if (is_master_key_set) server_key= master_server_key; else server_key= memcached_generate_hash(ptr, keys[x], key_length[x]); @@ -244,12 +256,14 @@ memcached_return memcached_mget_by_key(memcached_st *ptr, return rc; } -static memcached_return binary_mget_by_key(memcached_st *ptr, +static memcached_return simple_binary_mget(memcached_st *ptr, unsigned int master_server_key, - char **keys, size_t *key_length, - unsigned int number_of_keys) + bool is_master_key_set, + const char **keys, size_t *key_length, + size_t number_of_keys) { memcached_return rc= MEMCACHED_NOTFOUND; + uint32_t x; int flush= number_of_keys == 1; @@ -257,11 +271,11 @@ static memcached_return binary_mget_by_key(memcached_st *ptr, If a server fails we warn about errors and start all over with sending keys to the server. */ - for (int x= 0; x < number_of_keys; x++) + for (x= 0; x < number_of_keys; x++) { unsigned int server_key; - if (master_server_key) + if (is_master_key_set) server_key= master_server_key; else server_key= memcached_generate_hash(ptr, keys[x], key_length[x]); @@ -273,27 +287,40 @@ static memcached_return binary_mget_by_key(memcached_st *ptr, continue; } - protocol_binary_request_getk request= {0}; + protocol_binary_request_getk request= {.bytes= {0}}; request.message.header.request.magic= PROTOCOL_BINARY_REQ; if (number_of_keys == 1) request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK; else request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ; + memcached_return vk; + vk= memcached_validate_key_length(key_length[x], + ptr->flags & MEM_BINARY_PROTOCOL); + unlikely (vk != MEMCACHED_SUCCESS) + { + if (x > 0) + memcached_io_reset(&ptr->hosts[server_key]); + return vk; + } + request.message.header.request.keylen= htons((uint16_t)key_length[x]); request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; - request.message.header.request.bodylen= htonl(key_length[x]); + request.message.header.request.bodylen= htonl((uint32_t) key_length[x]); if ((memcached_io_write(&ptr->hosts[server_key], request.bytes, sizeof(request.bytes), 0) == -1) || (memcached_io_write(&ptr->hosts[server_key], keys[x], - key_length[x], flush) == -1)) + key_length[x], (char) flush) == -1)) { memcached_server_response_reset(&ptr->hosts[server_key]); rc= MEMCACHED_SOME_ERRORS; continue; } memcached_server_response_increment(&ptr->hosts[server_key]); + if ((x > 0 && x == ptr->io_key_prefetch) && + memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS) + rc= MEMCACHED_SOME_ERRORS; } if (number_of_keys > 1) @@ -301,12 +328,12 @@ static memcached_return binary_mget_by_key(memcached_st *ptr, /* * Send a noop command to flush the buffers */ - protocol_binary_request_noop request= {0}; + protocol_binary_request_noop request= {.bytes= {0}}; request.message.header.request.magic= PROTOCOL_BINARY_REQ; request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP; request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; - for (int x= 0; x < ptr->number_of_hosts; x++) + for (x= 0; x < ptr->number_of_hosts; x++) if (memcached_server_response_count(&ptr->hosts[x])) { if (memcached_io_write(&ptr->hosts[x], NULL, 0, 1) == -1) @@ -330,3 +357,148 @@ static memcached_return binary_mget_by_key(memcached_st *ptr, return rc; } + +static memcached_return replication_binary_mget(memcached_st *ptr, + uint32_t* hash, bool* dead_servers, + const char **keys, size_t *key_length, + size_t number_of_keys) +{ + memcached_return rc= MEMCACHED_NOTFOUND; + uint32_t x; + + int flush= number_of_keys == 1; + + for (uint32_t replica= 0; replica <= ptr->number_of_replicas; ++replica) + { + bool success= true; + + for (x= 0; x < number_of_keys; ++x) + { + if (hash[x] == ptr->number_of_hosts) + continue; /* Already successfully sent */ + + uint32_t server= hash[x] + replica; + while (server >= ptr->number_of_hosts) + server -= ptr->number_of_hosts; + + if (dead_servers[server]) + continue; + + if (memcached_server_response_count(&ptr->hosts[server]) == 0) + { + rc= memcached_connect(&ptr->hosts[server]); + if (rc != MEMCACHED_SUCCESS) + { + memcached_io_reset(&ptr->hosts[server]); + dead_servers[server]= true; + success= false; + continue; + } + } + + protocol_binary_request_getk request= {.bytes= {0}}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + if (number_of_keys == 1) + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETK; + else + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_GETKQ; + + request.message.header.request.keylen= htons((uint16_t)key_length[x]); + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + request.message.header.request.bodylen= htonl((uint32_t) key_length[x]); + + if ((memcached_io_write(&ptr->hosts[server], request.bytes, + sizeof(request.bytes), 0) == -1) || + (memcached_io_write(&ptr->hosts[server], keys[x], + key_length[x], (char) flush) == -1)) + { + memcached_io_reset(&ptr->hosts[server]); + dead_servers[server]= true; + success= false; + continue; + } + memcached_server_response_increment(&ptr->hosts[server]); + } + + if (number_of_keys > 1) + { + /* + * Send a noop command to flush the buffers + */ + protocol_binary_request_noop request= {.bytes= {0}}; + request.message.header.request.magic= PROTOCOL_BINARY_REQ; + request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP; + request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; + + for (x= 0; x < ptr->number_of_hosts; x++) + if (memcached_server_response_count(&ptr->hosts[x])) + { + if (memcached_io_write(&ptr->hosts[x], request.bytes, + sizeof(request.bytes), 1) == -1) + { + memcached_io_reset(&ptr->hosts[x]); + dead_servers[x]= true; + success= false; + } + memcached_server_response_increment(&ptr->hosts[x]); + + /* mark all of the messages bound for this server as sent! */ + for (x= 0; x < number_of_keys; ++x) + if (hash[x] == x) + hash[x]= ptr->number_of_hosts; + } + } + + if (success) + break; + } + + return rc; +} + +static memcached_return binary_mget_by_key(memcached_st *ptr, + unsigned int master_server_key, + bool is_master_key_set, + const char **keys, size_t *key_length, + size_t number_of_keys) +{ + memcached_return rc; + + if (ptr->number_of_replicas == 0) + { + rc= simple_binary_mget(ptr, master_server_key, is_master_key_set, + keys, key_length, number_of_keys); + } + else + { + uint32_t* hash; + bool* dead_servers; + + hash= ptr->call_malloc(ptr, sizeof(uint32_t) * number_of_keys); + dead_servers= ptr->call_calloc(ptr, ptr->number_of_hosts, sizeof(bool)); + + if (hash == NULL || dead_servers == NULL) + { + ptr->call_free(ptr, hash); + ptr->call_free(ptr, dead_servers); + return MEMCACHED_MEMORY_ALLOCATION_FAILURE; + } + + if (is_master_key_set) + for (unsigned int x= 0; x < number_of_keys; x++) + hash[x]= master_server_key; + else + for (unsigned int x= 0; x < number_of_keys; x++) + hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]); + + rc= replication_binary_mget(ptr, hash, dead_servers, keys, + key_length, number_of_keys); + + ptr->call_free(ptr, hash); + ptr->call_free(ptr, dead_servers); + + return MEMCACHED_SUCCESS; + } + + return rc; +}