d4b3146da983790322440437337da27901b3f233
[m6w6/libmemcached] / src / libmemcached / purge.cc
1 /*
2 +--------------------------------------------------------------------+
3 | libmemcached - C/C++ Client Library for memcached |
4 +--------------------------------------------------------------------+
5 | Redistribution and use in source and binary forms, with or without |
6 | modification, are permitted under the terms of the BSD license. |
7 | You should have received a copy of the license in a bundled file |
8 | named LICENSE; in case you did not receive a copy you can review |
9 | the terms online at: https://opensource.org/licenses/BSD-3-Clause |
10 +--------------------------------------------------------------------+
11 | Copyright (c) 2006-2014 Brian Aker https://datadifferential.com/ |
12 | Copyright (c) 2020 Michael Wallner <mike@php.net> |
13 +--------------------------------------------------------------------+
14 */
15
16 #include "libmemcached/common.h"
17 #include "p9y/socket.hpp"
18
/*
  Store `value` in the `state.is_purging` member of the object that `obj`
  points to. The macro expands to the assignment expression itself, so it
  yields the value that was stored. Both arguments are parenthesized and
  evaluated exactly once.

  The parameter names deliberately avoid double underscores, which are
  reserved for the implementation.
*/
#define memcached_set_purging(obj, value) ((obj)->state.is_purging = (value))
20
21 class Purge {
22 public:
23 Purge(Memcached *arg)
24 : _memc(arg) {
25 memcached_set_purging(_memc, true);
26 }
27
28 ~Purge() {
29 memcached_set_purging(_memc, false);
30 }
31
32 private:
33 Memcached *_memc;
34 };
35
36 class PollTimeout {
37 public:
38 PollTimeout(Memcached *arg, int32_t ms = 50)
39 : _timeout(arg->poll_timeout)
40 , _origin(arg->poll_timeout) {
41 _origin = ms;
42 }
43
44 ~PollTimeout() {
45 _origin = _timeout;
46 }
47
48 private:
49 int32_t _timeout;
50 int32_t &_origin;
51 };
52
53 bool memcached_purge(memcached_instance_st *ptr) {
54 Memcached *root = (Memcached *) ptr->root;
55
56 if (memcached_is_purging(ptr->root) || /* already purging */
57 (memcached_server_response_count(ptr) < ptr->root->io_msg_watermark
58 && ptr->io_bytes_sent < ptr->root->io_bytes_watermark)
59 || (ptr->io_bytes_sent >= ptr->root->io_bytes_watermark
60 && memcached_server_response_count(ptr) < 2))
61 {
62 return true;
63 }
64
65 /*
66 memcached_io_write and memcached_response may call memcached_purge
67 so we need to be able stop any recursion..
68 */
69 Purge set_purge(root);
70
71 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
72 /*
73 Force a flush of the buffer to ensure that we don't have the n-1 pending
74 requests buffered up..
75 */
76 if (memcached_io_write(ptr) == false) {
77 memcached_io_reset(ptr);
78 memcached_set_error(*ptr, MEMCACHED_WRITE_FAILURE, MEMCACHED_AT);
79 return false;
80 }
81 WATCHPOINT_ASSERT(ptr->fd != INVALID_SOCKET);
82
83 bool is_successful = true;
84 uint32_t no_msg = memcached_server_response_count(ptr);
85 if (no_msg > 1) {
86 memcached_result_st result;
87
88 /*
89 * We need to increase the timeout, because we might be waiting for
90 * data to be sent from the server (the commands was in the output buffer
91 * and just flushed
92 */
93 PollTimeout poll_timeout(ptr->root);
94
95 memcached_result_st *result_ptr = memcached_result_create(root, &result);
96 assert(result_ptr);
97
98 for (uint32_t x = 0; x < no_msg - 1; x++) {
99 memcached_result_reset(result_ptr);
100 memcached_return_t rc = memcached_read_one_response(ptr, result_ptr);
101 /*
102 * Purge doesn't care for what kind of command results that is received.
103 * The only kind of errors I care about if is I'm out of sync with the
104 * protocol or have problems reading data from the network..
105 */
106 if (rc == MEMCACHED_PROTOCOL_ERROR or rc == MEMCACHED_UNKNOWN_READ_FAILURE
107 or rc == MEMCACHED_READ_FAILURE)
108 {
109 WATCHPOINT_ERROR(rc);
110 is_successful = false;
111 }
112 if (rc == MEMCACHED_TIMEOUT) {
113 break;
114 }
115
116 if (ptr->root->callbacks) {
117 memcached_callback_st cb = *ptr->root->callbacks;
118 if (memcached_success(rc)) {
119 for (uint32_t y = 0; y < cb.number_of_callback; y++) {
120 if (memcached_fatal((*cb.callback[y])(ptr->root, result_ptr, cb.context))) {
121 break;
122 }
123 }
124 }
125 }
126 }
127
128 memcached_result_free(result_ptr);
129 }
130
131 return is_successful;
132 }