revert most of d7a0084bf99d618d1dc26a54fd413db7ae8b8e63
[awesomized/libmemcached] / src / libmemcached / purge.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached-awesome - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020-2021 Michael Wallner https://awesome.co/ |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17
18 #define memcached_set_purging(object_, value_) ((object_)->state.is_purging = (value_))
19
20 class Purge {
21 public:
22 Purge(Memcached *arg)
23 : _memc(arg) {
24 memcached_set_purging(_memc, true);
25 }
26
27 ~Purge() {
28 memcached_set_purging(_memc, false);
29 }
30
31 private:
32 Memcached *_memc;
33 };
34
35 class PollTimeout {
36 public:
37 PollTimeout(Memcached *arg, int32_t ms = 50)
38 : _timeout(arg->poll_timeout)
39 , _origin(arg->poll_timeout) {
40 _origin = ms;
41 }
42
43 ~PollTimeout() {
44 _origin = _timeout;
45 }
46
47 private:
48 int32_t _timeout;
49 int32_t &_origin;
50 };
51
52 bool memcached_purge(memcached_instance_st *ptr) {
53 Memcached *root = (Memcached *) ptr->root;
54
55 if (memcached_is_purging(ptr->root) || /* already purging */
56 (memcached_server_response_count(ptr) < ptr->root->io_msg_watermark
57 && ptr->io_bytes_sent < ptr->root->io_bytes_watermark)
58 || (ptr->io_bytes_sent >= ptr->root->io_bytes_watermark
59 && memcached_server_response_count(ptr) < 2))
60 {
61 return true;
62 }
63
64 /*
65 memcached_io_write and memcached_response may call memcached_purge
66 so we need to be able stop any recursion..
67 */
68 Purge set_purge(root);
69
70 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
71 /*
72 Force a flush of the buffer to ensure that we don't have the n-1 pending
73 requests buffered up..
74 */
75 if (memcached_io_write(ptr) == false) {
76 memcached_io_reset(ptr);
77 memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
78 return false;
79 }
80 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
81
82 bool is_successful = true;
83 uint32_t no_msg = memcached_server_response_count(ptr);
84 if (no_msg > 1) {
85 memcached_result_st result;
86
87 /*
88 * We need to increase the timeout, because we might be waiting for
89 * data to be sent from the server (the commands was in the output buffer
90 * and just flushed
91 */
92 PollTimeout poll_timeout(ptr->root);
93
94 memcached_result_st *result_ptr = memcached_result_create(root, &result);
95 assert(result_ptr);
96
97 for (uint32_t x = 0; x < no_msg - 1; x++) {
98 memcached_result_reset(result_ptr);
99 memcached_return_t rc = memcached_read_one_response(ptr, result_ptr);
100 /*
101 * Purge doesn't care for what kind of command results that is received.
102 * The only kind of errors I care about if is I'm out of sync with the
103 * protocol or have problems reading data from the network..
104 */
105 if (rc == MEMCACHED_PROTOCOL_ERROR or rc == MEMCACHED_UNKNOWN_READ_FAILURE
106 or rc == MEMCACHED_READ_FAILURE)
107 {
108 WATCHPOINT_ERROR(rc);
109 is_successful = false;
110 }
111
112 if (ptr->root->callbacks) {
113 memcached_callback_st cb = *ptr->root->callbacks;
114 if (memcached_success(rc)) {
115 for (uint32_t y = 0; y < cb.number_of_callback; y++) {
116 if (memcached_fatal((*cb.callback[y])(ptr->root, result_ptr, cb.context))) {
117 break;
118 }
119 }
120 }
121 }
122 }
123
124 memcached_result_free(result_ptr);
125 }
126
127 return is_successful;
128 }