X-Git-Url: https://git.m6w6.name/?a=blobdiff_plain;f=libmemcached%2Fget.c;h=13fe60799418e2bc8cc451f7d6009319553fe60a;hb=385cc099521ab79b8ecb289cd25c46d7cf3caa5c;hp=67670e5c574986a966e4fcb9969783686611341b;hpb=ac40b10adf3a182fe62d24899eb96c10364ba91b;p=m6w6%2Flibmemcached diff --git a/libmemcached/get.c b/libmemcached/get.c index 67670e5c..13fe6079 100644 --- a/libmemcached/get.c +++ b/libmemcached/get.c @@ -1,5 +1,5 @@ /* LibMemcached - * Copyright (C) 2006-2009 Brian Aker + * Copyright (C) 2006-2009 Brian Aker * All rights reserved. * * Use and distribution licensed under the BSD license. See @@ -129,7 +129,7 @@ memcached_return_t memcached_mget(memcached_st *ptr, } static memcached_return_t binary_mget_by_key(memcached_st *ptr, - unsigned int master_server_key, + uint32_t master_server_key, bool is_master_key_set, const char * const *keys, const size_t *key_length, @@ -144,8 +144,7 @@ static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr, size_t number_of_keys, bool mget_mode) { - unsigned int x; - memcached_return_t rc= MEMCACHED_NOTFOUND; + bool failures_occured_in_sending= false; const char *get_command= "get "; uint8_t get_command_length= 4; unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */ @@ -155,7 +154,6 @@ static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr, return MEMCACHED_NOT_SUPPORTED; LIBMEMCACHED_MEMCACHED_MGET_START(); - ptr->cursor_server= 0; if (number_of_keys == 0) return MEMCACHED_NOTFOUND; @@ -170,7 +168,7 @@ static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr, { if (ptr->flags.verify_key && (memcached_key_test((const char * const *)&master_key, &master_key_length, 1) == MEMCACHED_BAD_KEY_PROVIDED)) return MEMCACHED_BAD_KEY_PROVIDED; - master_server_key= memcached_generate_hash(ptr, master_key, master_key_length); + master_server_key= memcached_generate_hash_with_redistribution(ptr, master_key, master_key_length); is_master_key_set= true; } @@ -180,23 +178,28 @@ static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr, It might be optimum to bounce the connection if count > some number. */ - for (x= 0; x < memcached_server_count(ptr); x++) + for (uint32_t x= 0; x < memcached_server_count(ptr); x++) { - if (memcached_server_response_count(&ptr->hosts[x])) + memcached_server_write_instance_st instance= + memcached_server_instance_fetch(ptr, x); + + if (memcached_server_response_count(instance)) { char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE]; if (ptr->flags.no_block) - (void)memcached_io_write(&ptr->hosts[x], NULL, 0, 1); + (void)memcached_io_write(instance, NULL, 0, true); - while(memcached_server_response_count(&ptr->hosts[x])) - (void)memcached_response(&ptr->hosts[x], buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result); + while(memcached_server_response_count(instance)) + (void)memcached_response(instance, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, &ptr->result); } } if (ptr->flags.binary_protocol) + { return binary_mget_by_key(ptr, master_server_key, is_master_key_set, keys, key_length, number_of_keys, mget_mode); + } if (ptr->flags.support_cas) { @@ -208,75 +211,106 @@ static memcached_return_t memcached_mget_by_key_real(memcached_st *ptr, If a server fails we warn about errors and start all over with sending keys to the server. */ - for (x= 0; x < number_of_keys; x++) + memcached_return_t rc= MEMCACHED_SUCCESS; + size_t hosts_connected= 0; + for (uint32_t x= 0; x < number_of_keys; x++) { - unsigned int server_key; + memcached_server_write_instance_st instance; + uint32_t server_key; if (is_master_key_set) + { server_key= master_server_key; + } else - server_key= memcached_generate_hash(ptr, keys[x], key_length[x]); + { + server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]); + } + + instance= memcached_server_instance_fetch(ptr, server_key); - if (memcached_server_response_count(&ptr->hosts[server_key]) == 0) + struct libmemcached_io_vector_st vector[]= { - rc= memcached_connect(&ptr->hosts[server_key]); + { .length= get_command_length, .buffer= get_command }, + { .length= memcached_array_size(ptr->prefix_key), .buffer= memcached_array_string(ptr->prefix_key) }, + { .length= key_length[x], .buffer= keys[x] }, + { .length= 1, .buffer= " " } + }; + + + if (memcached_server_response_count(instance) == 0) + { + rc= memcached_connect(instance); if (rc != MEMCACHED_SUCCESS) + { continue; + } + hosts_connected++; - if ((memcached_io_write(&ptr->hosts[server_key], get_command, get_command_length, 0)) == -1) + if ((memcached_io_writev(instance, vector, 4, false)) == -1) { - rc= MEMCACHED_SOME_ERRORS; + failures_occured_in_sending= true; continue; } - WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 0); - memcached_server_response_increment(&ptr->hosts[server_key]); - WATCHPOINT_ASSERT(ptr->hosts[server_key].cursor_active == 1); + WATCHPOINT_ASSERT(instance->cursor_active == 0); + memcached_server_response_increment(instance); + WATCHPOINT_ASSERT(instance->cursor_active == 1); } - - /* Only called when we have a prefix key */ - if (ptr->prefix_key[0] != 0) + else { - if ((memcached_io_write(&ptr->hosts[server_key], ptr->prefix_key, ptr->prefix_key_length, 0)) == -1) + if ((memcached_io_writev(instance, (vector + 1), 3, false)) == -1) { - memcached_server_response_reset(&ptr->hosts[server_key]); - rc= MEMCACHED_SOME_ERRORS; + memcached_server_response_reset(instance); + failures_occured_in_sending= true; continue; } } + } - if ((memcached_io_write(&ptr->hosts[server_key], keys[x], key_length[x], 0)) == -1) - { - memcached_server_response_reset(&ptr->hosts[server_key]); - rc= MEMCACHED_SOME_ERRORS; - continue; - } + if (hosts_connected == 0) + { + LIBMEMCACHED_MEMCACHED_MGET_END(); - if ((memcached_io_write(&ptr->hosts[server_key], " ", 1, 0)) == -1) - { - memcached_server_response_reset(&ptr->hosts[server_key]); - rc= MEMCACHED_SOME_ERRORS; - continue; - } + if (rc != MEMCACHED_SUCCESS) + return rc; + + return MEMCACHED_NO_SERVERS; } + /* Should we muddle on if some servers are dead? */ - for (x= 0; x < memcached_server_count(ptr); x++) + bool success_happened= false; + for (uint32_t x= 0; x < memcached_server_count(ptr); x++) { - if (memcached_server_response_count(&ptr->hosts[x])) + memcached_server_write_instance_st instance= + memcached_server_instance_fetch(ptr, x); + + if (memcached_server_response_count(instance)) { /* We need to do something about non-connnected hosts in the future */ - if ((memcached_io_write(&ptr->hosts[x], "\r\n", 2, 1)) == -1) + if ((memcached_io_write(instance, "\r\n", 2, true)) == -1) + { + failures_occured_in_sending= true; + } + else { - rc= MEMCACHED_SOME_ERRORS; + success_happened= true; } } } LIBMEMCACHED_MEMCACHED_MGET_END(); - return rc; + + if (failures_occured_in_sending && success_happened) + return MEMCACHED_SOME_ERRORS; + + if (success_happened) + return MEMCACHED_SUCCESS; + + return MEMCACHED_FAILURE; } memcached_return_t memcached_mget_by_key(memcached_st *ptr, @@ -332,14 +366,13 @@ memcached_return_t memcached_mget_execute_by_key(memcached_st *ptr, } static memcached_return_t simple_binary_mget(memcached_st *ptr, - unsigned int master_server_key, + uint32_t master_server_key, bool is_master_key_set, const char * const *keys, const size_t *key_length, size_t number_of_keys, bool mget_mode) { memcached_return_t rc= MEMCACHED_NOTFOUND; - uint32_t x; int flush= number_of_keys == 1; @@ -347,18 +380,25 @@ static memcached_return_t simple_binary_mget(memcached_st *ptr, If a server fails we warn about errors and start all over with sending keys to the server. */ - for (x= 0; x < number_of_keys; x++) + for (uint32_t x= 0; x < number_of_keys; ++x) { - unsigned int server_key; + uint32_t server_key; + memcached_server_write_instance_st instance; if (is_master_key_set) + { server_key= master_server_key; + } else - server_key= memcached_generate_hash(ptr, keys[x], key_length[x]); + { + server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]); + } - if (memcached_server_response_count(&ptr->hosts[server_key]) == 0) + instance= memcached_server_instance_fetch(ptr, server_key); + + if (memcached_server_response_count(instance) == 0) { - rc= memcached_connect(&ptr->hosts[server_key]); + rc= memcached_connect(instance); if (rc != MEMCACHED_SUCCESS) continue; } @@ -376,30 +416,38 @@ static memcached_return_t simple_binary_mget(memcached_st *ptr, unlikely (vk != MEMCACHED_SUCCESS) { if (x > 0) - memcached_io_reset(&ptr->hosts[server_key]); + { + memcached_io_reset(instance); + } + return vk; } - request.message.header.request.keylen= htons((uint16_t)key_length[x]); + request.message.header.request.keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->prefix_key))); request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; - request.message.header.request.bodylen= htonl((uint32_t) key_length[x]); + request.message.header.request.bodylen= htonl((uint32_t)( key_length[x] + memcached_array_size(ptr->prefix_key))); - if ((memcached_io_write(&ptr->hosts[server_key], request.bytes, - sizeof(request.bytes), 0) == -1) || - (memcached_io_write(&ptr->hosts[server_key], keys[x], - key_length[x], (char) flush) == -1)) + struct libmemcached_io_vector_st vector[]= { - memcached_server_response_reset(&ptr->hosts[server_key]); + { .length= sizeof(request.bytes), .buffer= request.bytes }, + { .length= memcached_array_size(ptr->prefix_key), .buffer= memcached_array_string(ptr->prefix_key) }, + { .length= key_length[x], .buffer= keys[x] } + }; + + if (memcached_io_writev(instance, vector, 3, flush) == -1) + { + memcached_server_response_reset(instance); rc= MEMCACHED_SOME_ERRORS; continue; } /* We just want one pending response per server */ - memcached_server_response_reset(&ptr->hosts[server_key]); - memcached_server_response_increment(&ptr->hosts[server_key]); - if ((x > 0 && x == ptr->io_key_prefetch) && - memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS) + memcached_server_response_reset(instance); + memcached_server_response_increment(instance); + if ((x > 0 && x == ptr->io_key_prefetch) && memcached_flush_buffers(ptr) != MEMCACHED_SUCCESS) + { rc= MEMCACHED_SOME_ERRORS; + } } if (mget_mode) @@ -412,24 +460,29 @@ static memcached_return_t simple_binary_mget(memcached_st *ptr, request.message.header.request.opcode= PROTOCOL_BINARY_CMD_NOOP; request.message.header.request.datatype= PROTOCOL_BINARY_RAW_BYTES; - for (x= 0; x < memcached_server_count(ptr); x++) - if (memcached_server_response_count(&ptr->hosts[x])) + for (uint32_t x= 0; x < memcached_server_count(ptr); ++x) + { + memcached_server_write_instance_st instance= + memcached_server_instance_fetch(ptr, x); + + if (memcached_server_response_count(instance)) { - if (memcached_io_write(&ptr->hosts[x], NULL, 0, 1) == -1) + if (memcached_io_write(instance, NULL, 0, true) == -1) { - memcached_server_response_reset(&ptr->hosts[x]); - memcached_io_reset(&ptr->hosts[x]); + memcached_server_response_reset(instance); + memcached_io_reset(instance); rc= MEMCACHED_SOME_ERRORS; } - if (memcached_io_write(&ptr->hosts[x], request.bytes, - sizeof(request.bytes), 1) == -1) + if (memcached_io_write(instance, request.bytes, + sizeof(request.bytes), true) == -1) { - memcached_server_response_reset(&ptr->hosts[x]); - memcached_io_reset(&ptr->hosts[x]); + memcached_server_response_reset(instance); + memcached_io_reset(instance); rc= MEMCACHED_SOME_ERRORS; } } + } } @@ -444,7 +497,7 @@ static memcached_return_t replication_binary_mget(memcached_st *ptr, size_t number_of_keys) { memcached_return_t rc= MEMCACHED_NOTFOUND; - uint32_t x, start= 0; + uint32_t start= 0; uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ); if (randomize_read) @@ -455,8 +508,10 @@ static memcached_return_t replication_binary_mget(memcached_st *ptr, { bool success= true; - for (x= 0; x < number_of_keys; ++x) + for (uint32_t x= 0; x < number_of_keys; ++x) { + memcached_server_write_instance_st instance; + if (hash[x] == memcached_server_count(ptr)) continue; /* Already successfully sent */ @@ -472,12 +527,14 @@ static memcached_return_t replication_binary_mget(memcached_st *ptr, if (dead_servers[server]) continue; - if (memcached_server_response_count(&ptr->hosts[server]) == 0) + instance= memcached_server_instance_fetch(ptr, server); + + if (memcached_server_response_count(instance) == 0) { - rc= memcached_connect(&ptr->hosts[server]); + rc= memcached_connect(instance); if (rc != MEMCACHED_SUCCESS) { - memcached_io_reset(&ptr->hosts[server]); + memcached_io_reset(instance); dead_servers[server]= true; success= false; continue; @@ -488,9 +545,9 @@ static memcached_return_t replication_binary_mget(memcached_st *ptr, .message.header.request= { .magic= PROTOCOL_BINARY_REQ, .opcode= PROTOCOL_BINARY_CMD_GETK, - .keylen= htons((uint16_t)key_length[x]), + .keylen= htons((uint16_t)(key_length[x] + memcached_array_size(ptr->prefix_key))), .datatype= PROTOCOL_BINARY_RAW_BYTES, - .bodylen= htonl((uint32_t)key_length[x]) + .bodylen= htonl((uint32_t)(key_length[x] + memcached_array_size(ptr->prefix_key))) } }; @@ -504,18 +561,22 @@ static memcached_return_t replication_binary_mget(memcached_st *ptr, * that we might have processed some of the responses etc. For now, * just make sure we work _correctly_ */ - if ((memcached_io_write(&ptr->hosts[server], request.bytes, - sizeof(request.bytes), 0) == -1) || - (memcached_io_write(&ptr->hosts[server], keys[x], - key_length[x], 1) == -1)) + struct libmemcached_io_vector_st vector[]= + { + { .length= sizeof(request.bytes), .buffer= request.bytes }, + { .length= memcached_array_size(ptr->prefix_key), .buffer= memcached_array_string(ptr->prefix_key) }, + { .length= key_length[x], .buffer= keys[x] } + }; + + if (memcached_io_writev(instance, vector, 3, true) == -1) { - memcached_io_reset(&ptr->hosts[server]); + memcached_io_reset(instance); dead_servers[server]= true; success= false; continue; } - memcached_server_response_increment(&ptr->hosts[server]); + memcached_server_response_increment(instance); hash[x]= memcached_server_count(ptr); } @@ -527,7 +588,7 @@ static memcached_return_t replication_binary_mget(memcached_st *ptr, } static memcached_return_t binary_mget_by_key(memcached_st *ptr, - unsigned int master_server_key, + uint32_t master_server_key, bool is_master_key_set, const char * const *keys, const size_t *key_length, @@ -546,28 +607,36 @@ static memcached_return_t binary_mget_by_key(memcached_st *ptr, uint32_t* hash; bool* dead_servers; - hash= ptr->call_malloc(ptr, sizeof(uint32_t) * number_of_keys); - dead_servers= ptr->call_calloc(ptr, memcached_server_count(ptr), sizeof(bool)); + hash= libmemcached_malloc(ptr, sizeof(uint32_t) * number_of_keys); + dead_servers= libmemcached_calloc(ptr, memcached_server_count(ptr), sizeof(bool)); if (hash == NULL || dead_servers == NULL) { - ptr->call_free(ptr, hash); - ptr->call_free(ptr, dead_servers); + libmemcached_free(ptr, hash); + libmemcached_free(ptr, dead_servers); return MEMCACHED_MEMORY_ALLOCATION_FAILURE; } if (is_master_key_set) - for (unsigned int x= 0; x < number_of_keys; x++) + { + for (size_t x= 0; x < number_of_keys; x++) + { hash[x]= master_server_key; + } + } else - for (unsigned int x= 0; x < number_of_keys; x++) - hash[x]= memcached_generate_hash(ptr, keys[x], key_length[x]); + { + for (size_t x= 0; x < number_of_keys; x++) + { + hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]); + } + } rc= replication_binary_mget(ptr, hash, dead_servers, keys, key_length, number_of_keys); - ptr->call_free(ptr, hash); - ptr->call_free(ptr, dead_servers); + libmemcached_free(ptr, hash); + libmemcached_free(ptr, dead_servers); return MEMCACHED_SUCCESS; }