bin: consolidate clients
[m6w6/libmemcached] / src / bin / memslap.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "mem_config.h"
17
// Program identity; main() hands these three strings to the client_options
// constructor.
// NOTE(review): these macros are defined ahead of the common/*.hpp includes,
// so those headers may consume them as well -- confirm before moving them.
#define PROGRAM_NAME "memslap"
#define PROGRAM_DESCRIPTION "Generate load against a cluster of memcached servers."
#define PROGRAM_VERSION "1.1"

// Initial values of the --initial-load, --execute-number and --concurrency
// counters in main(), used when the option is absent.
#define DEFAULT_INITIAL_LOAD 10000ul
#define DEFAULT_EXECUTE_NUMBER 10000ul
#define DEFAULT_CONCURRENCY 1ul
25
26 #include "common/options.hpp"
27 #include "common/checks.hpp"
28 #include "common/time.hpp"
29 #include "common/random.hpp"
30
#include <atomic>
#include <iomanip>
#include <stdexcept>
#include <string>
#include <thread>
34
// Start flag shared by all worker threads: every thread_context spins on it
// until main() stores true, so the workers begin the test together.
// It has static storage duration and is never explicitly initialized.
static std::atomic_bool wakeup;
36
37 static memcached_return_t counter(const memcached_st *, memcached_result_st *, void *ctx) {
38 auto c = static_cast<size_t *>(ctx);
39 ++(*c);
40 return MEMCACHED_SUCCESS;
41 }
42
43 struct keyval_st {
44 struct data {
45 std::vector<char *> chr;
46 std::vector<size_t> len;
47 explicit data(size_t num)
48 : chr(num)
49 , len(num)
50 {}
51 };
52
53 data key;
54 data val;
55
56 size_t num;
57 random64 rnd;
58
59 explicit keyval_st(size_t num_)
60 : key{num_}
61 , val{num_}
62 , num{num_}
63 , rnd{}
64 {
65 for (auto i = 0u; i < num; ++i) {
66 gen(key.chr[i], key.len[i], val.chr[i], val.len[i]);
67 }
68 }
69
70 ~keyval_st() {
71 for (auto i = 0u; i < num; ++i) {
72 delete [] key.chr[i];
73 delete [] val.chr[i];
74 }
75 }
76
77 private:
78 void gen_key(char *&key_chr, size_t &key_len) {
79 key_len = rnd(20,60);
80 key_chr = new char[key_len + 1];
81 rnd.fill(key_chr, key_len);
82 key_chr[key_len] = 0;
83 }
84 void gen_val(const char *key_chr, const size_t key_len, char *&val_chr, size_t &val_len) {
85 val_len = rnd(50, 5000);
86 val_chr = new char[val_len];
87
88 for (auto len = 0u; len < val_len; len += key_len) {
89 auto val_pos = val_chr + len;
90 auto rem_len = len + key_len > val_len ? val_len % key_len : key_len;
91 memcpy(val_pos, key_chr, rem_len);
92 }
93 }
94 void gen(char *&key_chr, size_t &key_len, char *&val_chr, size_t &val_len) {
95 gen_key(key_chr, key_len);
96 gen_val(key_chr, key_len, val_chr, val_len);
97 }
98 };
99
100 static size_t execute_get(const client_options &opt, memcached_st &memc, const keyval_st &kv) {
101 size_t retrieved = 0;
102 random64 rnd{};
103
104 for (auto i = 0u; i < kv.num; ++i) {
105 memcached_return_t rc;
106 auto r = rnd(0, kv.num);
107 free(memcached_get(&memc, kv.key.chr[r], kv.key.len[r], nullptr, nullptr, &rc));
108
109 if (check_return(opt, memc, kv.key.chr[r], rc)) {
110 ++retrieved;
111 }
112 }
113
114 return retrieved;
115 }
116
117 static size_t execute_mget(const client_options &opt, memcached_st &memc, const keyval_st &kv) {
118 size_t retrieved = 0;
119 memcached_execute_fn cb[] = {&counter};
120
121 auto rc = memcached_mget_execute(&memc, kv.key.chr.data(), kv.key.len.data(), kv.num, cb, &retrieved, 1);
122
123 while (rc != MEMCACHED_END && memcached_success(rc)) {
124 rc = memcached_fetch_execute(&memc, cb, &retrieved, 1);
125 }
126
127 if (memcached_fatal(rc)) {
128 if (!opt.isset("quiet")) {
129 std::cerr << "Failed mget: " << memcached_strerror(&memc, rc) << ": "
130 << memcached_last_error_message(&memc);
131 }
132 }
133 return retrieved;
134 }
135
136 static size_t execute_set(const client_options &opt, memcached_st &memc, const keyval_st &kv) {
137 for (auto i = 0u; i < kv.num; ++i) {
138 auto rc = memcached_set(&memc, kv.key.chr[i], kv.key.len[i], kv.val.chr[i], kv.val.len[i], 0, 0);
139
140 if (!check_return(opt, memc, kv.key.chr[i], rc)) {
141 memcached_quit(&memc);
142 return i;
143 }
144 }
145
146 return kv.num;
147 }
148
149 class thread_context {
150 public:
151 thread_context(const client_options &opt_, const memcached_st &memc_, const keyval_st &kv_)
152 : opt{opt_}
153 , kv{kv_}
154 , count{}
155 , root(memc_)
156 , memc{}
157 , thread([this]{ execute(); })
158 {}
159
160 ~thread_context() {
161 memcached_free(&memc);
162 }
163
164 size_t complete() {
165 thread.join();
166 return count;
167 }
168
169 private:
170 const client_options &opt;
171 const keyval_st &kv;
172 size_t count;
173 const memcached_st &root;
174 memcached_st memc;
175 std::thread thread;
176
177 void execute() {
178 memcached_clone(&memc, &root);
179
180 while (!wakeup.load(std::memory_order_acquire)) {
181 std::this_thread::yield();
182 }
183
184 if (!strcmp("set", opt.argof("test"))) {
185 count = execute_set(opt, memc, kv);
186 } else if (!strcmp("mget", opt.argof("test"))) {
187 count = execute_mget(opt, memc, kv);
188 } else {
189 if (strcmp("get", opt.argof("test"))) {
190 if (!opt.isset("quiet")) {
191 std::cerr << "Unknown --test: '" << opt.argof("test") << "'.\n";
192 }
193 }
194 count = execute_get(opt, memc, kv);
195 }
196 }
197 };
198
199 using opt_apply = std::function<bool(const client_options &, const client_options::extended_option &ext, memcached_st *)>;
200
201 static opt_apply wrap_stoul(unsigned long &ul) {
202 return [&ul](const client_options &, const client_options::extended_option &ext, memcached_st *) {
203 if (ext.arg && *ext.arg) {
204 auto c = std::stoul(ext.arg);
205 if (c) {
206 ul = c;
207 }
208 }
209 return true;
210 };
211 }
212
213 int main(int argc, char *argv[]) {
214 client_options opt{PROGRAM_NAME, PROGRAM_VERSION, PROGRAM_DESCRIPTION};
215 auto concurrency = DEFAULT_CONCURRENCY;
216 auto load_count = DEFAULT_INITIAL_LOAD;
217 auto test_count = DEFAULT_EXECUTE_NUMBER;
218
219 for (const auto &def : opt.defaults) {
220 opt.add(def);
221 }
222
223 opt.add("noreply", 'R', no_argument, "Enable the NOREPLY behavior for storage commands.")
224 .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) {
225 if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, ext.set)) {
226 if(!opt_.isset("quiet")) {
227 std::cerr << memcached_last_error_message(memc);
228 }
229 return false;
230 }
231 return true;
232 };
233 opt.add("udp", 'U', no_argument, "Use UDP.")
234 .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) {
235 if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, ext.set)) {
236 if (!opt_.isset("quiet")) {
237 std::cerr << memcached_last_error_message(memc) << "\n";
238 }
239 return false;
240 }
241 return true;
242 };
243 opt.add("flush", 'F', no_argument, "Flush all servers prior test.");
244 opt.add("test", 't', required_argument, "Test to perform (options: get,mget,set; default: get).");
245 opt.add("concurrency", 'c', required_argument, "Concurrency (number of threads to start; default: 1).")
246 .apply = wrap_stoul(concurrency);
247 opt.add("execute-number", 'e', required_argument, "Number of times to execute the tests (default: 10000).")
248 .apply = wrap_stoul(test_count);
249 opt.add("initial-load", 'l', required_argument, "Number of keys to load before executing tests (default: 10000)."
250 "\n\t\tDEPRECATED: --execute-number takes precedence.")
251 .apply = wrap_stoul(load_count);
252
253 char set[] = "set";
254 opt.set("test", true, set);
255
256 if (!opt.parse(argc, argv)) {
257 exit(EXIT_FAILURE);
258 }
259
260 memcached_st memc;
261 if (!check_memcached(opt, memc)) {
262 exit(EXIT_FAILURE);
263 }
264
265 if (!opt.apply(&memc)) {
266 memcached_free(&memc);
267 exit(EXIT_FAILURE);
268 }
269
270 auto total_start = time_clock::now();
271 std::cout << std::fixed << std::setprecision(3);
272
273 auto align = [](std::ostream &io) -> std::ostream &{
274 return io << std::right << std::setw(8);
275 };
276
277 if (opt.isset("flush")) {
278 if (opt.isset("verbose")) {
279 std::cout << "- Flushing servers ...\n";
280 }
281 auto flush_start = time_clock::now();
282 auto rc = memcached_flush(&memc, 0);
283 auto flush_elapsed = time_clock::now() - flush_start;
284 if (!memcached_success(rc)) {
285 if (!opt.isset("quiet")) {
286 std::cerr << "Failed to FLUSH: " << memcached_last_error_message(&memc) << "\n";
287 }
288 memcached_free(&memc);
289 exit(EXIT_FAILURE);
290 }
291 if (!opt.isset("quiet")) {
292 std::cout << "Time to flush " << align
293 << memcached_server_count(&memc)
294 << " servers: "
295 << align << time_format(flush_elapsed).count()
296 << " seconds.\n";
297 }
298 }
299
300 if (opt.isset("verbose")) {
301 std::cout << "- Generating random test data ...\n";
302 }
303 auto keyval_start = time_clock::now();
304 keyval_st kv{test_count};
305 auto keyval_elapsed = time_clock::now() - keyval_start;
306
307 if (!opt.isset("quiet")) {
308 std::cout << "Time to generate "
309 << align << test_count
310 << " test keys: "
311 << align << time_format(keyval_elapsed).count()
312 << " seconds.\n";
313 }
314
315 if (strcmp(opt.argof("test"), "set")) {
316 if (opt.isset("verbose")) {
317 std::cout << "- Feeding initial load to servers ...\n";
318 }
319 auto feed_start = time_clock::now();
320 auto count = execute_set(opt, memc, kv);
321 check_return(opt, memc, memcached_flush_buffers(&memc));
322 auto feed_elapsed = time_clock::now() - feed_start;
323
324 if (!opt.isset("quiet")) {
325 std::cout << "Time to set "
326 << align << count
327 << " keys: "
328 << align << time_format(feed_elapsed).count()
329 << " seconds.\n";
330 }
331 }
332
333 if (opt.isset("verbose")) {
334 std::cout << "- Starting " << concurrency << " threads ...\n";
335 }
336 auto thread_start = time_clock::now();
337 std::vector<thread_context *> threads{};
338 threads.reserve(concurrency);
339 for (auto i = 0ul; i < concurrency; ++i) {
340 threads.push_back(new thread_context(opt, memc, kv));
341 }
342 auto thread_elapsed = time_clock::now() - thread_start;
343 if (!opt.isset("quiet")) {
344 std::cout << "Time to start "
345 << align << concurrency
346 << " threads: "
347 << time_format(thread_elapsed).count()
348 << " seconds.\n";
349 }
350 if (opt.isset("verbose")) {
351 std::cout << "- Starting test: " << test_count
352 << " x " << opt.argof("test")
353 << " x " << concurrency
354 << " ...\n";
355 }
356 auto count = 0ul;
357 auto test_start = time_clock::now();
358 wakeup.store(true, std::memory_order_release);
359 for (auto &thread : threads) {
360 count += thread->complete();
361 delete thread;
362 }
363 auto test_elapsed = time_clock::now() - test_start;
364
365 if (!opt.isset("quiet")) {
366 std::cout << "--------------------------------------------------------------------\n"
367 << "Time to " << std::left << std::setw(4)
368 << opt.argof("test") << " "
369 << align << count
370 << " keys by "
371 << std::setw(4)
372 << concurrency << " threads: "
373 << align << time_format(test_elapsed).count()
374 << " seconds.\n";
375
376 std::cout << "--------------------------------------------------------------------\n"
377 << "Time total: "
378 << align << std::setw(12)
379 << time_format(time_clock::now() - total_start).count()
380 << " seconds.\n";
381 }
382 exit(EXIT_SUCCESS);
383 }