changelog
[awesomized/libmemcached] / src / bin / memslap.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached-awesome - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020-2021 Michael Wallner https://awesome.co/ |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "mem_config.h"
17
18 #define PROGRAM_NAME "memslap"
19 #define PROGRAM_DESCRIPTION "Generate load against a cluster of memcached servers."
20 #define PROGRAM_VERSION "1.1"
21
22 #define DEFAULT_INITIAL_LOAD 10000ul
23 #define DEFAULT_EXECUTE_NUMBER 10000ul
24 #define DEFAULT_CONCURRENCY 1ul
25
26 #include "common/options.hpp"
27 #include "common/checks.hpp"
28 #include "common/time.hpp"
29 #include "common/random.hpp"
30
31 #include <atomic>
32 #include <thread>
33 #include <iomanip>
34
35 static std::atomic_bool wakeup;
36
37 static memcached_return_t counter(const memcached_st *, memcached_result_st *, void *ctx) {
38 auto c = static_cast<size_t *>(ctx);
39 ++(*c);
40 return MEMCACHED_SUCCESS;
41 }
42
43 struct keyval_st {
44 struct data {
45 std::vector<char *> chr;
46 std::vector<size_t> len;
47 explicit data(size_t num)
48 : chr(num)
49 , len(num)
50 {}
51 };
52
53 data key;
54 data val;
55
56 size_t num;
57 random64 rnd;
58
59 explicit keyval_st(size_t num_)
60 : key{num_}
61 , val{num_}
62 , num{num_}
63 , rnd{}
64 {
65 for (auto i = 0u; i < num; ++i) {
66 gen(key.chr[i], key.len[i], val.chr[i], val.len[i]);
67 }
68 }
69
70 ~keyval_st() {
71 for (auto i = 0u; i < num; ++i) {
72 delete [] key.chr[i];
73 delete [] val.chr[i];
74 }
75 }
76
77 private:
78 void gen_key(char *&key_chr, size_t &key_len) {
79 key_len = rnd(20,60);
80 key_chr = new char[key_len + 1];
81 rnd.fill(key_chr, key_len);
82 key_chr[key_len] = 0;
83 }
84 void gen_val(const char *key_chr, const size_t key_len, char *&val_chr, size_t &val_len) {
85 val_len = rnd(50, 5000);
86 val_chr = new char[val_len];
87
88 for (auto len = 0u; len < val_len; len += key_len) {
89 auto val_pos = val_chr + len;
90 auto rem_len = len + key_len > val_len ? val_len % key_len : key_len;
91 memcpy(val_pos, key_chr, rem_len);
92 }
93 }
94 void gen(char *&key_chr, size_t &key_len, char *&val_chr, size_t &val_len) {
95 gen_key(key_chr, key_len);
96 gen_val(key_chr, key_len, val_chr, val_len);
97 }
98 };
99
100 static size_t execute_get(const client_options &opt, memcached_st &memc, const keyval_st &kv) {
101 size_t retrieved = 0;
102 random64 rnd{};
103
104 for (auto i = 0u; i < kv.num; ++i) {
105 memcached_return_t rc;
106 auto r = rnd(0, kv.num);
107 free(memcached_get(&memc, kv.key.chr[r], kv.key.len[r], nullptr, nullptr, &rc));
108
109 if (check_return(opt, memc, kv.key.chr[r], rc)) {
110 ++retrieved;
111 }
112 }
113
114 return retrieved;
115 }
116
117 static size_t execute_mget(const client_options &opt, memcached_st &memc, const keyval_st &kv) {
118 size_t retrieved = 0;
119 memcached_execute_fn cb[] = {&counter};
120 memcached_return_t rc;
121
122 if (memcached_is_binary(&memc)) {
123 rc = memcached_mget_execute(&memc, kv.key.chr.data(), kv.key.len.data(), kv.num, cb,
124 &retrieved, 1);
125
126 while (rc != MEMCACHED_END && memcached_success(rc)) {
127 rc = memcached_fetch_execute(&memc, cb, &retrieved, 1);
128 }
129 } else {
130 memcached_result_st res;
131 memcached_result_create(&memc, &res);
132
133 rc = memcached_mget(&memc, kv.key.chr.data(), kv.key.len.data(), kv.num);
134
135 while (rc != MEMCACHED_END && memcached_success(rc)) {
136 if (memcached_fetch_result(&memc, &res, &rc)) {
137 ++retrieved;
138 }
139 }
140 memcached_result_free(&res);
141 }
142 if (memcached_fatal(rc)) {
143 if (!opt.isset("quiet")) {
144 std::cerr << "Failed mget: " << memcached_strerror(&memc, rc) << ": "
145 << memcached_last_error_message(&memc);
146 }
147 }
148 return retrieved;
149 }
150
151 static size_t execute_set(const client_options &opt, memcached_st &memc, const keyval_st &kv) {
152 for (auto i = 0u; i < kv.num; ++i) {
153 auto rc = memcached_set(&memc, kv.key.chr[i], kv.key.len[i], kv.val.chr[i], kv.val.len[i], 0, 0);
154
155 if (!check_return(opt, memc, kv.key.chr[i], rc)) {
156 memcached_quit(&memc);
157 return i;
158 }
159 }
160
161 return kv.num;
162 }
163
164 class thread_context {
165 public:
166 thread_context(const client_options &opt_, const memcached_st &memc_, const keyval_st &kv_)
167 : opt{opt_}
168 , kv{kv_}
169 , count{}
170 , root(memc_)
171 , memc{}
172 , thread([this]{ execute(); })
173 {}
174
175 ~thread_context() {
176 memcached_free(&memc);
177 }
178
179 size_t complete() {
180 thread.join();
181 return count;
182 }
183
184 private:
185 const client_options &opt;
186 const keyval_st &kv;
187 size_t count;
188 const memcached_st &root;
189 memcached_st memc;
190 std::thread thread;
191
192 void execute() {
193 memcached_clone(&memc, &root);
194
195 while (!wakeup.load(std::memory_order_acquire)) {
196 std::this_thread::yield();
197 }
198
199 if (!strcmp("set", opt.argof("test"))) {
200 count = execute_set(opt, memc, kv);
201 } else if (!strcmp("mget", opt.argof("test"))) {
202 count = execute_mget(opt, memc, kv);
203 } else {
204 if (strcmp("get", opt.argof("test"))) {
205 if (!opt.isset("quiet")) {
206 std::cerr << "Unknown --test: '" << opt.argof("test") << "'.\n";
207 }
208 }
209 count = execute_get(opt, memc, kv);
210 }
211 }
212 };
213
214 using opt_apply = std::function<bool(const client_options &, const client_options::extended_option &ext, memcached_st *)>;
215
216 static opt_apply wrap_stoul(unsigned long &ul) {
217 return [&ul](const client_options &, const client_options::extended_option &ext, memcached_st *) {
218 if (ext.arg && *ext.arg) {
219 auto c = std::stoul(ext.arg);
220 if (c) {
221 ul = c;
222 }
223 }
224 return true;
225 };
226 }
227
228 static std::ostream &align(std::ostream &io) {
229 return io << std::right << std::setw(8);
230 }
231
232 int main(int argc, char *argv[]) {
233 client_options opt{PROGRAM_NAME, PROGRAM_VERSION, PROGRAM_DESCRIPTION};
234 auto concurrency = DEFAULT_CONCURRENCY;
235 auto load_count = DEFAULT_INITIAL_LOAD;
236 auto test_count = DEFAULT_EXECUTE_NUMBER;
237
238 for (const auto &def : opt.defaults) {
239 opt.add(def);
240 }
241
242 opt.add("noreply", 'R', no_argument, "Enable the NOREPLY behavior for storage commands.")
243 .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) {
244 if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_NOREPLY, ext.set)) {
245 if(!opt_.isset("quiet")) {
246 std::cerr << memcached_last_error_message(memc);
247 }
248 return false;
249 }
250 return true;
251 };
252 opt.add("udp", 'U', no_argument, "Use UDP.")
253 .apply = [](const client_options &opt_, const client_options::extended_option &ext, memcached_st *memc) {
254 if (MEMCACHED_SUCCESS != memcached_behavior_set(memc, MEMCACHED_BEHAVIOR_USE_UDP, ext.set)) {
255 if (!opt_.isset("quiet")) {
256 std::cerr << memcached_last_error_message(memc) << "\n";
257 }
258 return false;
259 }
260 return true;
261 };
262 opt.add("flush", 'F', no_argument, "Flush all servers prior test.");
263 opt.add("test", 't', required_argument, "Test to perform (options: get,mget,set; default: get).");
264 opt.add("concurrency", 'c', required_argument, "Concurrency (number of threads to start; default: 1).")
265 .apply = wrap_stoul(concurrency);
266 opt.add("execute-number", 'e', required_argument, "Number of times to execute the tests (default: 10000).")
267 .apply = wrap_stoul(test_count);
268 opt.add("initial-load", 'l', required_argument, "Number of keys to load before executing tests (default: 10000)."
269 "\n\t\tDEPRECATED: --execute-number takes precedence.")
270 .apply = wrap_stoul(load_count);
271
272 char set[] = "set";
273 opt.set("test", true, set);
274
275 if (!opt.parse(argc, argv)) {
276 exit(EXIT_FAILURE);
277 }
278
279 memcached_st memc;
280 if (!check_memcached(opt, memc)) {
281 exit(EXIT_FAILURE);
282 }
283
284 if (!opt.apply(&memc)) {
285 memcached_free(&memc);
286 exit(EXIT_FAILURE);
287 }
288
289 auto total_start = time_clock::now();
290 std::cout << std::fixed << std::setprecision(3);
291
292 if (opt.isset("flush")) {
293 if (opt.isset("verbose")) {
294 std::cout << "- Flushing servers ...\n";
295 }
296 auto flush_start = time_clock::now();
297 auto rc = memcached_flush(&memc, 0);
298 auto flush_elapsed = time_clock::now() - flush_start;
299 if (!memcached_success(rc)) {
300 if (!opt.isset("quiet")) {
301 std::cerr << "Failed to FLUSH: " << memcached_last_error_message(&memc) << "\n";
302 }
303 memcached_free(&memc);
304 exit(EXIT_FAILURE);
305 }
306 if (!opt.isset("quiet")) {
307 std::cout << "Time to flush " << align
308 << memcached_server_count(&memc)
309 << " servers: "
310 << align << time_format(flush_elapsed).count()
311 << " seconds.\n";
312 }
313 }
314
315 if (opt.isset("verbose")) {
316 std::cout << "- Generating random test data ...\n";
317 }
318 auto keyval_start = time_clock::now();
319 keyval_st kv{test_count};
320 auto keyval_elapsed = time_clock::now() - keyval_start;
321
322 if (!opt.isset("quiet")) {
323 std::cout << "Time to generate "
324 << align << test_count
325 << " test keys: "
326 << align << time_format(keyval_elapsed).count()
327 << " seconds.\n";
328 }
329
330 if (strcmp(opt.argof("test"), "set")) {
331 if (opt.isset("verbose")) {
332 std::cout << "- Feeding initial load to servers ...\n";
333 }
334 auto feed_start = time_clock::now();
335 auto count = execute_set(opt, memc, kv);
336 check_return(opt, memc, memcached_flush_buffers(&memc));
337 auto feed_elapsed = time_clock::now() - feed_start;
338
339 if (!opt.isset("quiet")) {
340 std::cout << "Time to set "
341 << align << count
342 << " keys: "
343 << align << time_format(feed_elapsed).count()
344 << " seconds.\n";
345 }
346 }
347
348 if (opt.isset("verbose")) {
349 std::cout << "- Starting " << concurrency << " threads ...\n";
350 }
351 auto thread_start = time_clock::now();
352 std::vector<thread_context *> threads{};
353 threads.reserve(concurrency);
354 for (auto i = 0ul; i < concurrency; ++i) {
355 threads.push_back(new thread_context(opt, memc, kv));
356 }
357 auto thread_elapsed = time_clock::now() - thread_start;
358 if (!opt.isset("quiet")) {
359 std::cout << "Time to start "
360 << align << concurrency
361 << " threads: "
362 << time_format(thread_elapsed).count()
363 << " seconds.\n";
364 }
365 if (opt.isset("verbose")) {
366 std::cout << "- Starting test: " << test_count
367 << " x " << opt.argof("test")
368 << " x " << concurrency
369 << " ...\n";
370 }
371 auto count = 0ul;
372 auto test_start = time_clock::now();
373 wakeup.store(true, std::memory_order_release);
374 for (auto &thread : threads) {
375 count += thread->complete();
376 delete thread;
377 }
378 auto test_elapsed = time_clock::now() - test_start;
379
380 if (!opt.isset("quiet")) {
381 std::cout << "--------------------------------------------------------------------\n"
382 << "Time to " << std::left << std::setw(4)
383 << opt.argof("test") << " "
384 << align << count
385 << " keys by "
386 << std::setw(4)
387 << concurrency << " threads: "
388 << align << time_format(test_elapsed).count()
389 << " seconds.\n";
390
391 std::cout << "--------------------------------------------------------------------\n"
392 << "Time total: "
393 << align << std::setw(12)
394 << time_format(time_clock::now() - total_start).count()
395 << " seconds.\n";
396 }
397 exit(EXIT_SUCCESS);
398 }