testing: pools
[m6w6/libmemcached] / test / tests / memcached / regression / lp_000-962-815.cpp
1 #include "test/lib/common.hpp"
2 #include "test/lib/MemcachedCluster.hpp"
3
4 #include "libmemcachedutil-1.0/pool.h"
5
6 #include <atomic>
7 #include <sstream>
8
// Stop flag shared between the test body (writer) and the worker threads
// (readers). Brace-initialised so no copy-initialisation of the atomic is
// involved.
static std::atomic<bool> running{false};

/**
 * Publish a new value of the run flag.
 *
 * @param is  true to let workers keep looping, false to ask them to stop.
 */
static inline void set_running(bool is) {
  running.store(is, std::memory_order_release);
}

/**
 * Read the current value of the run flag.
 *
 * Uses an acquire load, which is the counterpart of the release store in
 * set_running(). memory_order_consume has been discouraged by the standard
 * since C++17 and is treated as acquire by mainstream compilers anyway.
 *
 * @return the most recently observed value of the flag.
 */
static inline bool is_running() {
  return running.load(std::memory_order_acquire);
}
17
18 struct worker_ctx {
19 memcached_pool_st *pool;
20 vector<stringstream> errors;
21
22 explicit worker_ctx(memcached_st *memc)
23 : pool{memcached_pool_create(memc, 5, 10)}
24 , errors{}
25 {
26 }
27
28 ~worker_ctx() {
29 memcached_pool_destroy(pool);
30 for (const auto &err : errors) {
31 cerr << err.str() << endl;
32 }
33 }
34
35 stringstream &err() {
36 errors.resize(errors.size()+1);
37 return errors[errors.size()-1];
38 }
39 };
40
41 static void *worker(void *arg) {
42 auto *ctx = static_cast<worker_ctx *>(arg);
43
44 while (is_running()) {
45 memcached_return_t rc;
46 timespec block{0, 1000};
47 auto *mc = memcached_pool_fetch(ctx->pool, &block, &rc);
48
49 if (!mc && rc == MEMCACHED_TIMEOUT) {
50 continue;
51 }
52 if (!mc || memcached_failed(rc)) {
53 cerr << "failed to fetch connection from pool: "
54 << memcached_strerror(nullptr, rc)
55 << endl;
56 this_thread::sleep_for(10ms);
57 continue;
58 }
59
60 auto rs = random_ascii_string(12);
61 rc = memcached_set(mc, rs.c_str(), rs.length(), rs.c_str(), rs.length(), 0, 0);
62 if (memcached_failed(rc)) {
63 cerr << "failed to memcached_set() "
64 << memcached_last_error_message(mc)
65 << endl;
66 }
67 rc = memcached_pool_release(ctx->pool, mc);
68 if (memcached_failed(rc)) {
69 cerr << "failed to release connection to pool: "
70 << memcached_strerror(nullptr, rc)
71 << endl;
72 }
73 }
74
75 return ctx;
76 }
77
78 TEST_CASE("memcached_regression_lp962815") {
79 auto test = MemcachedCluster::mixed();
80 auto memc = &test.memc;
81
82 constexpr auto NUM_THREADS = 20;
83 array<pthread_t, NUM_THREADS> tid;
84 worker_ctx ctx{memc};
85
86 set_running(true);
87
88 for (auto &t : tid) {
89 REQUIRE(0 == pthread_create(&t, nullptr, worker, &ctx));
90 }
91
92 this_thread::sleep_for(5s);
93 set_running(false);
94
95 for (auto t : tid) {
96 void *ret = nullptr;
97 REQUIRE(0 == pthread_join(t, &ret));
98 REQUIRE(ret == &ctx);
99 }
100 }