+static void increment_request_id(uint16_t *id)
+{
+ (*id)++;
+ if ((*id & UDP_REQUEST_ID_THREAD_MASK) != 0)
+ *id= 0;
+}
+
+static uint16_t *get_udp_request_ids(memcached_st *memc)
+{
+ uint16_t *ids= malloc(sizeof(uint16_t) * memc->number_of_hosts);
+ assert(ids != NULL);
+ unsigned int x;
+ for (x= 0; x < memc->number_of_hosts; x++)
+ ids[x]= get_udp_datagram_request_id((struct udp_datagram_header_st *) memc->hosts[x].write_buffer);
+
+ return ids;
+}
+
+static test_return post_udp_op_check(memcached_st *memc, uint16_t *expected_req_ids)
+{
+ unsigned int x;
+ memcached_server_st *cur_server = memc->hosts;
+ uint16_t *cur_req_ids = get_udp_request_ids(memc);
+ for (x= 0; x < memc->number_of_hosts; x++)
+ {
+ assert(cur_server[x].cursor_active == 0);
+ assert(cur_req_ids[x] == expected_req_ids[x]);
+ }
+ free(expected_req_ids);
+ free(cur_req_ids);
+ return TEST_SUCCESS;
+}
+
+/*
+** There is a little bit of a hack here, instead of removing
+** the servers, I just set num host to 0 and them add then new udp servers
+**/
+static memcached_return init_udp(memcached_st *memc)
+{
+ memcached_version(memc);
+ /* For the time being, only support udp test for >= 1.2.6 && < 1.3 */
+ if (memc->hosts[0].major_version != 1 || memc->hosts[0].minor_version != 2
+ || memc->hosts[0].micro_version < 6)
+ return MEMCACHED_FAILURE;
+
+ uint32_t num_hosts= memc->number_of_hosts;
+ unsigned int x= 0;
+ memcached_server_st servers[num_hosts];
+ memcpy(servers, memc->hosts, sizeof(memcached_server_st) * num_hosts);
+ memc->number_of_hosts= 0;
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1);
+ for (x= 0; x < num_hosts; x++)
+ {
+ assert(memcached_server_add_udp(memc, servers[x].hostname, servers[x].port) == MEMCACHED_SUCCESS);
+ assert(memc->hosts[x].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
+ }
+ return MEMCACHED_SUCCESS;
+}
+
+static memcached_return binary_init_udp(memcached_st *memc)
+{
+ pre_binary(memc);
+ return init_udp(memc);
+}
+
+/* Make sure that I cant add a tcp server to a udp client */
+static test_return add_tcp_server_udp_client_test(memcached_st *memc)
+{
+ memcached_server_st server;
+ memcached_server_clone(&server, &memc->hosts[0]);
+ assert(memcached_server_remove(&(memc->hosts[0])) == MEMCACHED_SUCCESS);
+ assert(memcached_server_add(memc, server.hostname, server.port) == MEMCACHED_INVALID_HOST_PROTOCOL);
+ return TEST_SUCCESS;
+}
+
+/* Make sure that I cant add a udp server to a tcp client */
+static test_return add_udp_server_tcp_client_test(memcached_st *memc)
+{
+ memcached_server_st server;
+ memcached_server_clone(&server, &memc->hosts[0]);
+ assert(memcached_server_remove(&(memc->hosts[0])) == MEMCACHED_SUCCESS);
+
+ memcached_st tcp_client;
+ memcached_create(&tcp_client);
+ assert(memcached_server_add_udp(&tcp_client, server.hostname, server.port) == MEMCACHED_INVALID_HOST_PROTOCOL);
+ return TEST_SUCCESS;
+}
+
+static test_return set_udp_behavior_test(memcached_st *memc)
+{
+
+ memcached_quit(memc);
+ memc->number_of_hosts= 0;
+ run_distribution(memc);
+ assert(memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, 1) == MEMCACHED_SUCCESS);
+ assert(memc->flags & MEM_USE_UDP);
+ assert(memc->flags & MEM_NOREPLY);;
+
+ assert(memc->number_of_hosts == 0);
+
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP,0);
+ assert(!(memc->flags & MEM_USE_UDP));
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY,0);
+ assert(!(memc->flags & MEM_NOREPLY));
+ return TEST_SUCCESS;
+}
+
+static test_return udp_set_test(memcached_st *memc)
+{
+ unsigned int x= 0;
+ unsigned int num_iters= 1025; //request id rolls over at 1024
+ for (x= 0; x < num_iters;x++)
+ {
+ memcached_return rc;
+ char *key= "foo";
+ char *value= "when we sanitize";
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ unsigned int server_key= memcached_generate_hash(memc,key,strlen(key));
+ size_t init_offset= memc->hosts[server_key].write_buffer_offset;
+ rc= memcached_set(memc, key, strlen(key),
+ value, strlen(value),
+ (time_t)0, (uint32_t)0);
+ assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
+ /** NB, the check below assumes that if new write_ptr is less than
+ * the original write_ptr that we have flushed. For large payloads, this
+ * maybe an invalid assumption, but for the small payload we have it is OK
+ */
+ if (rc == MEMCACHED_SUCCESS ||
+ memc->hosts[server_key].write_buffer_offset < init_offset)
+ increment_request_id(&expected_ids[server_key]);
+
+ if (rc == MEMCACHED_SUCCESS)
+ {
+ assert(memc->hosts[server_key].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
+ }
+ else
+ {
+ assert(memc->hosts[server_key].write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH);
+ assert(memc->hosts[server_key].write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH);
+ }
+ assert(post_udp_op_check(memc,expected_ids) == TEST_SUCCESS);
+ }
+ return TEST_SUCCESS;
+}
+
+static test_return udp_buffered_set_test(memcached_st *memc)
+{
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
+ return udp_set_test(memc);
+}
+
+static test_return udp_set_too_big_test(memcached_st *memc)
+{
+ memcached_return rc;
+ char *key= "bar";
+ char value[MAX_UDP_DATAGRAM_LENGTH];
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ rc= memcached_set(memc, key, strlen(key),
+ value, MAX_UDP_DATAGRAM_LENGTH,
+ (time_t)0, (uint32_t)0);
+ assert(rc == MEMCACHED_WRITE_FAILURE);
+ return post_udp_op_check(memc,expected_ids);
+}
+
+test_return udp_delete_test(memcached_st *memc)
+{
+ unsigned int x= 0;
+ unsigned int num_iters= 1025; //request id rolls over at 1024
+ for (x= 0; x < num_iters;x++)
+ {
+ memcached_return rc;
+ char *key= "foo";
+ uint16_t *expected_ids=get_udp_request_ids(memc);
+ unsigned int server_key= memcached_generate_hash(memc, key, strlen(key));
+ size_t init_offset= memc->hosts[server_key].write_buffer_offset;
+ rc= memcached_delete(memc, key, strlen(key), 0);
+ assert(rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED);
+ if (rc == MEMCACHED_SUCCESS || memc->hosts[server_key].write_buffer_offset < init_offset)
+ increment_request_id(&expected_ids[server_key]);
+ if (rc == MEMCACHED_SUCCESS)
+ assert(memc->hosts[server_key].write_buffer_offset == UDP_DATAGRAM_HEADER_LENGTH);
+ else
+ {
+ assert(memc->hosts[server_key].write_buffer_offset != UDP_DATAGRAM_HEADER_LENGTH);
+ assert(memc->hosts[server_key].write_buffer_offset <= MAX_UDP_DATAGRAM_LENGTH);
+ }
+ assert(post_udp_op_check(memc,expected_ids) == TEST_SUCCESS);
+ }
+ return TEST_SUCCESS;
+}
+
+static test_return udp_buffered_delete_test(memcached_st *memc)
+{
+ memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_BUFFER_REQUESTS, 1);
+ return udp_delete_test(memc);
+}
+
+test_return udp_verbosity_test(memcached_st *memc)
+{
+ memcached_return rc;
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ unsigned int x;
+ for (x= 0; x < memc->number_of_hosts;x++)
+ increment_request_id(&expected_ids[x]);
+
+ rc= memcached_verbosity(memc,3);
+ assert(rc == MEMCACHED_SUCCESS);
+ return post_udp_op_check(memc,expected_ids);
+}
+
+test_return udp_quit_test(memcached_st *memc)
+{
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ memcached_quit(memc);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_flush_test(memcached_st *memc)
+{
+ memcached_return rc;
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ unsigned int x;
+ for (x= 0; x < memc->number_of_hosts;x++)
+ increment_request_id(&expected_ids[x]);
+
+ rc= memcached_flush(memc,0);
+ assert(rc == MEMCACHED_SUCCESS);
+ return post_udp_op_check(memc,expected_ids);
+}
+
+test_return udp_incr_test(memcached_st *memc)
+{
+ memcached_return rc;
+ char *key= "incr";
+ char *value= "1";
+ rc= memcached_set(memc, key, strlen(key),
+ value, strlen(value),
+ (time_t)0, (uint32_t)0);
+
+ assert(rc == MEMCACHED_SUCCESS);
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ unsigned int server_key= memcached_generate_hash(memc, key, strlen(key));
+ increment_request_id(&expected_ids[server_key]);
+ uint64_t newvalue;
+ rc= memcached_increment(memc, key, strlen(key), 1, &newvalue);
+ assert(rc == MEMCACHED_SUCCESS);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_decr_test(memcached_st *memc)
+{
+ memcached_return rc;
+ char *key= "decr";
+ char *value= "1";
+ rc= memcached_set(memc, key, strlen(key),
+ value, strlen(value),
+ (time_t)0, (uint32_t)0);
+
+ assert(rc == MEMCACHED_SUCCESS);
+ uint16_t *expected_ids= get_udp_request_ids(memc);
+ unsigned int server_key= memcached_generate_hash(memc, key, strlen(key));
+ increment_request_id(&expected_ids[server_key]);
+ uint64_t newvalue;
+ rc= memcached_decrement(memc, key, strlen(key), 1, &newvalue);
+ assert(rc == MEMCACHED_SUCCESS);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+
+test_return udp_stat_test(memcached_st *memc)
+{
+ memcached_stat_st * rv= NULL;
+ memcached_return rc;
+ char args[]= "";
+ uint16_t *expected_ids = get_udp_request_ids(memc);
+ rv = memcached_stat(memc, args, &rc);
+ free(rv);
+ assert(rc == MEMCACHED_NOT_SUPPORTED);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_version_test(memcached_st *memc)
+{
+ memcached_return rc;
+ uint16_t *expected_ids = get_udp_request_ids(memc);
+ rc = memcached_version(memc);
+ assert(rc == MEMCACHED_NOT_SUPPORTED);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_get_test(memcached_st *memc)
+{
+ memcached_return rc;
+ char *key= "foo";
+ size_t vlen;
+ uint16_t *expected_ids = get_udp_request_ids(memc);
+ char *val= memcached_get(memc, key, strlen(key), &vlen, (uint32_t)0, &rc);
+ assert(rc == MEMCACHED_NOT_SUPPORTED);
+ assert(val == NULL);
+ return post_udp_op_check(memc, expected_ids);
+}
+
+test_return udp_mixed_io_test(memcached_st *memc)
+{
+ test_st current_op;
+ test_st mixed_io_ops [] ={
+ {"udp_set_test", 0, udp_set_test},
+ {"udp_set_too_big_test", 0, udp_set_too_big_test},
+ {"udp_delete_test", 0, udp_delete_test},
+ {"udp_verbosity_test", 0, udp_verbosity_test},
+ {"udp_quit_test", 0, udp_quit_test},
+ {"udp_flush_test", 0, udp_flush_test},
+ {"udp_incr_test", 0, udp_incr_test},
+ {"udp_decr_test", 0, udp_decr_test},
+ {"udp_version_test", 0, udp_version_test}
+ };
+ unsigned int x= 0;
+ for (x= 0; x < 500; x++)
+ {
+ current_op= mixed_io_ops[random() % 9];
+ assert(current_op.function(memc) == TEST_SUCCESS);
+ }
+ return TEST_SUCCESS;
+}
+
+test_st udp_setup_server_tests[] ={
+ {"set_udp_behavior_test", 0, set_udp_behavior_test},
+ {"add_tcp_server_udp_client_test", 0, add_tcp_server_udp_client_test},
+ {"add_udp_server_tcp_client_test", 0, add_udp_server_tcp_client_test},
+ {0, 0, 0}
+};
+
+test_st upd_io_tests[] ={
+ {"udp_set_test", 0, udp_set_test},
+ {"udp_buffered_set_test", 0, udp_buffered_set_test},
+ {"udp_set_too_big_test", 0, udp_set_too_big_test},
+ {"udp_delete_test", 0, udp_delete_test},
+ {"udp_buffered_delete_test", 0, udp_buffered_delete_test},
+ {"udp_verbosity_test", 0, udp_verbosity_test},
+ {"udp_quit_test", 0, udp_quit_test},
+ {"udp_flush_test", 0, udp_flush_test},
+ {"udp_incr_test", 0, udp_incr_test},
+ {"udp_decr_test", 0, udp_decr_test},
+ {"udp_stat_test", 0, udp_stat_test},
+ {"udp_version_test", 0, udp_version_test},
+ {"udp_get_test", 0, udp_get_test},
+ {"udp_mixed_io_test", 0, udp_mixed_io_test},
+ {0, 0, 0}
+};
+