Big fix for async mode to make sure all data has been pushed through socket
[m6w6/libmemcached] / lib / memcached_storage.c
1 /*
2 Memcached library
3
4 memcached_set()
5 memcached_replace()
6 memcached_add()
7
8 */
9
10 #include "common.h"
11 #include "memcached_io.h"
12
13 typedef enum {
14 SET_OP,
15 REPLACE_OP,
16 ADD_OP,
17 PREPEND_OP,
18 APPEND_OP,
19 CAS_OP,
20 } memcached_storage_action;
21
22 /* Inline this */
23 char *storage_op_string(memcached_storage_action verb)
24 {
25 switch (verb)
26 {
27 case SET_OP:
28 return "set";
29 case REPLACE_OP:
30 return "replace";
31 case ADD_OP:
32 return "add";
33 case PREPEND_OP:
34 return "prepend";
35 case APPEND_OP:
36 return "append";
37 case CAS_OP:
38 return "cas";
39 };
40
41 return SET_OP;
42 }
43
44 static inline memcached_return memcached_send(memcached_st *ptr,
45 char *key, size_t key_length,
46 char *value, size_t value_length,
47 time_t expiration,
48 uint16_t flags,
49 uint64_t cas,
50 memcached_storage_action verb)
51 {
52 char to_write;
53 size_t write_length;
54 ssize_t sent_length;
55 memcached_return rc;
56 char buffer[MEMCACHED_DEFAULT_COMMAND_SIZE];
57 unsigned int server_key;
58
59 WATCHPOINT_ASSERT(!(value == NULL && value_length > 0));
60 WATCHPOINT_ASSERT(!(value && value_length == 0));
61
62 if (key_length == 0)
63 return MEMCACHED_NO_KEY_PROVIDED;
64
65 server_key= memcached_generate_hash(ptr, key, key_length);
66
67 if (cas)
68 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
69 "%s %.*s %u %llu %zu %llu\r\n", storage_op_string(verb),
70 (int)key_length, key, flags,
71 (unsigned long long)expiration, value_length,
72 (unsigned long long)cas);
73 else
74 write_length= snprintf(buffer, MEMCACHED_DEFAULT_COMMAND_SIZE,
75 "%s %.*s %u %llu %zu\r\n", storage_op_string(verb),
76 (int)key_length, key, flags,
77 (unsigned long long)expiration, value_length);
78
79 if (write_length >= MEMCACHED_DEFAULT_COMMAND_SIZE)
80 {
81 rc= MEMCACHED_WRITE_FAILURE;
82 goto error;
83 }
84
85 rc= memcached_do(ptr, server_key, buffer, write_length, 0);
86 if (rc != MEMCACHED_SUCCESS)
87 goto error;
88
89 if ((sent_length= memcached_io_write(ptr, server_key, value, value_length, 0)) == -1)
90 {
91 rc= MEMCACHED_WRITE_FAILURE;
92 goto error;
93 }
94
95 if ((ptr->flags & MEM_NO_BLOCK) && verb == SET_OP)
96 to_write= 0;
97 else
98 to_write= 1;
99
100 if ((sent_length= memcached_io_write(ptr, server_key, "\r\n", 2, to_write)) == -1)
101 {
102 memcached_quit_server(ptr, server_key);
103 rc= MEMCACHED_WRITE_FAILURE;
104 goto error;
105 }
106
107 if (to_write == 0)
108 {
109 rc= MEMCACHED_SUCCESS;
110 memcached_server_response_increment(ptr, server_key);
111 }
112 else
113 {
114 rc= memcached_response(ptr, buffer, MEMCACHED_DEFAULT_COMMAND_SIZE, server_key);
115 }
116 if (rc == MEMCACHED_STORED)
117 return MEMCACHED_SUCCESS;
118 else
119 return rc;
120
121 error:
122 memcached_io_reset(ptr, server_key);
123
124 return rc;
125 }
126
127 memcached_return memcached_set(memcached_st *ptr, char *key, size_t key_length,
128 char *value, size_t value_length,
129 time_t expiration,
130 uint16_t flags)
131 {
132 memcached_return rc;
133 LIBMEMCACHED_MEMCACHED_SET_START();
134 rc= memcached_send(ptr, key, key_length, value, value_length,
135 expiration, flags, 0, SET_OP);
136 LIBMEMCACHED_MEMCACHED_SET_END();
137 return rc;
138 }
139
140 memcached_return memcached_add(memcached_st *ptr,
141 char *key, size_t key_length,
142 char *value, size_t value_length,
143 time_t expiration,
144 uint16_t flags)
145 {
146 memcached_return rc;
147 LIBMEMCACHED_MEMCACHED_ADD_START();
148 rc= memcached_send(ptr, key, key_length, value, value_length,
149 expiration, flags, 0, ADD_OP);
150 LIBMEMCACHED_MEMCACHED_ADD_END();
151 return rc;
152 }
153
154 memcached_return memcached_replace(memcached_st *ptr,
155 char *key, size_t key_length,
156 char *value, size_t value_length,
157 time_t expiration,
158 uint16_t flags)
159 {
160 memcached_return rc;
161 LIBMEMCACHED_MEMCACHED_REPLACE_START();
162 rc= memcached_send(ptr, key, key_length, value, value_length,
163 expiration, flags, 0, REPLACE_OP);
164 LIBMEMCACHED_MEMCACHED_REPLACE_END();
165 return rc;
166 }
167
168 memcached_return memcached_prepend(memcached_st *ptr,
169 char *key, size_t key_length,
170 char *value, size_t value_length,
171 time_t expiration,
172 uint16_t flags)
173 {
174 memcached_return rc;
175 rc= memcached_send(ptr, key, key_length, value, value_length,
176 expiration, flags, 0, PREPEND_OP);
177 return rc;
178 }
179
180 memcached_return memcached_append(memcached_st *ptr,
181 char *key, size_t key_length,
182 char *value, size_t value_length,
183 time_t expiration,
184 uint16_t flags)
185 {
186 memcached_return rc;
187 rc= memcached_send(ptr, key, key_length, value, value_length,
188 expiration, flags, 0, APPEND_OP);
189 return rc;
190 }
191
192 memcached_return memcached_cas(memcached_st *ptr,
193 char *key, size_t key_length,
194 char *value, size_t value_length,
195 time_t expiration,
196 uint16_t flags,
197 uint64_t cas)
198 {
199 memcached_return rc;
200 rc= memcached_send(ptr, key, key_length, value, value_length,
201 expiration, flags, cas, APPEND_OP);
202 return rc;
203 }