Add an example using innodb as a storage (NOTE: this version currently leaks memorybz...
[awesomized/libmemcached] / example / storage_innodb.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 #include <stdlib.h>
3 #include <inttypes.h>
4 #include <time.h>
5 #include <stdbool.h>
6 #include <string.h>
7 #include <unistd.h>
8 #include <assert.h>
9 #include <embedded_innodb-1.0/innodb.h>
10
11 #include "storage.h"
12
13 const char *tablename = "memcached/items";
14
15 #define key_col_idx 0
16 #define data_col_idx 1
17 #define flags_col_idx 2
18 #define cas_col_idx 3
19 #define exp_col_idx 4
20
21 static uint64_t cas;
22
23 /**
24 * To avoid cluttering down all the code with error checking I use the
25 * following macro. It will execute the statement and verify that the
26 * result of the operation is DB_SUCCESS. If any other error code is
27 * returned it will print an "assert-like" output and jump to the
28 * label error_exit. There I release resources before returning out of
29 * the function.
30 *
31 * @param a the expression to execute
32 *
33 */
34 #define checked(expression) \
35 do { \
36 ib_err_t checked_err= expression; \
37 if (checked_err != DB_SUCCESS) \
38 { \
39 fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n", \
40 __FILE__, __LINE__, #expression, \
41 ib_strerror(checked_err)); \
42 goto error_exit; \
43 } \
44 } while (0);
45
46 /**
47 * Create the database schema.
48 * @return true if the database schema was created without any problems
49 * false otherwise.
50 */
51 static bool create_schema(void) {
52 ib_tbl_sch_t schema= NULL;
53 ib_idx_sch_t index= NULL;
54
55 if (ib_database_create("memcached") != IB_TRUE)
56 {
57 fprintf(stderr, "Failed to create database\n");
58 return false;
59 }
60
61 ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
62 ib_id_t table_id;
63
64 checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0));
65 checked(ib_table_schema_add_col(schema, "key", IB_BLOB,
66 IB_COL_NOT_NULL, 0, 32767));
67 checked(ib_table_schema_add_col(schema, "data", IB_BLOB,
68 IB_COL_NONE, 0, 1024*1024));
69 checked(ib_table_schema_add_col(schema, "flags", IB_INT,
70 IB_COL_UNSIGNED, 0, 4));
71 checked(ib_table_schema_add_col(schema, "cas", IB_INT,
72 IB_COL_UNSIGNED, 0, 8));
73 checked(ib_table_schema_add_col(schema, "exp", IB_INT,
74 IB_COL_UNSIGNED, 0, 4));
75 checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &index));
76 checked(ib_index_schema_add_col(index, "key", 0));
77 checked(ib_index_schema_set_clustered(index));
78 checked(ib_schema_lock_exclusive(transaction));
79 checked(ib_table_create(transaction, schema, &table_id));
80 checked(ib_trx_commit(transaction));
81 ib_table_schema_delete(schema);
82
83 return true;
84
85 error_exit:
86 /* @todo release resources! */
87 {
88 ib_err_t error= ib_trx_rollback(transaction);
89 if (error != DB_SUCCESS)
90 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
91 ib_strerror(error));
92 }
93 return false;
94 }
95
96 /**
97 * Store an item into the database. Update the CAS id on the item before
98 * storing it in the database.
99 *
100 * @param trx the transaction to use
101 * @param item the item to store
102 * @return true if we can go ahead and commit the transaction, false otherwise
103 */
104 bool do_put_item(ib_trx_t trx, struct item* item) {
105 update_cas(item);
106
107 ib_crsr_t cursor = NULL;
108 ib_tpl_t tuple= NULL;
109 bool retval = false;
110
111 checked(ib_cursor_open_table(tablename, trx, &cursor));
112 checked(ib_cursor_lock(cursor, IB_LOCK_X));
113 tuple= ib_clust_read_tuple_create(cursor);
114
115 checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey));
116 checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size));
117 checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags));
118 checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas));
119 checked(ib_tuple_write_u32(tuple, exp_col_idx, item->exp));
120 checked(ib_cursor_insert_row(cursor, tuple));
121
122 retval= true;
123 /* Release resources: */
124 /* FALLTHROUGH */
125
126 error_exit:
127 if (tuple != NULL)
128 ib_tuple_delete(tuple);
129
130 if (cursor != NULL)
131 ib_cursor_close(cursor);
132
133 return retval;
134 }
135
136 /**
137 * Try to locate an item in the database. Return a cursor and the tuple to
138 * the item if I found it in the database.
139 *
140 * @param trx the transaction to use
141 * @param key the key of the item to look up
142 * @param nkey the size of the key
143 * @param cursor where to store the cursor (OUT)
144 * @param tuple where to store the tuple (OUT)
145 * @return true if I found the object, false otherwise
146 */
147 static bool do_locate_item(ib_trx_t trx,
148 const void* key,
149 size_t nkey,
150 ib_crsr_t *cursor)
151 {
152 int res;
153 ib_tpl_t tuple;
154
155 *cursor = NULL;
156
157 checked(ib_cursor_open_table(tablename, trx, cursor));
158 tuple= ib_clust_search_tuple_create(*cursor);
159 if (tuple == NULL)
160 {
161 fprintf(stderr, "Failed to allocate tuple object\n");
162 goto error_exit;
163 }
164
165 checked(ib_col_set_value(tuple, key_col_idx, key, nkey));
166 ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res);
167
168 if (err == DB_SUCCESS && res == 0)
169 {
170 ib_tuple_delete(tuple);
171 return true;
172 }
173 else if (err != DB_SUCCESS &&
174 err != DB_RECORD_NOT_FOUND &&
175 err != DB_END_OF_INDEX)
176 {
177 fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err));
178 }
179 /* FALLTHROUGH */
180 error_exit:
181 if (tuple != NULL)
182 ib_tuple_delete(tuple);
183 if (*cursor != NULL)
184 ib_cursor_close(*cursor);
185 *cursor= NULL;
186
187 return false;
188 }
189
190 /**
191 * Try to get an item from the database
192 *
193 * @param trx the transaction to use
194 * @param key the key to get
195 * @param nkey the lenght of the key
196 * @return a pointer to the item if I found it in the database
197 */
198 static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey) {
199 ib_crsr_t cursor= NULL;
200 ib_tpl_t tuple= NULL;
201 struct item* retval= NULL;
202
203 if (do_locate_item(trx, key, nkey, &cursor)) {
204 tuple= ib_clust_read_tuple_create(cursor);
205 if (tuple == NULL)
206 {
207 fprintf(stderr, "Failed to create read tuple\n");
208 goto error_exit;
209 }
210 checked(ib_cursor_read_row(cursor, tuple));
211 ib_col_meta_t meta;
212 ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta);
213 ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta);
214 ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta);
215 ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta);
216 const void *dataptr= ib_col_get_value(tuple, data_col_idx);
217
218 retval= create_item(key, nkey, dataptr, datalen, 0, 0);
219 if (retval == NULL) {
220 fprintf(stderr, "Failed to allocate memory\n");
221 goto error_exit;
222 }
223
224 if (flaglen != 0) {
225 ib_u32_t val;
226 checked(ib_tuple_read_u32(tuple, flags_col_idx, &val));
227 retval->flags= (uint32_t)val;
228 }
229 if (caslen != 0) {
230 ib_u64_t val;
231 checked(ib_tuple_read_u64(tuple, cas_col_idx, &val));
232 retval->cas= (uint64_t)val;
233 }
234 if (explen != 0) {
235 ib_u32_t val;
236 checked(ib_tuple_read_u32(tuple, exp_col_idx, &val));
237 retval->exp= (uint32_t)val;
238 }
239 }
240
241 /* Release resources */
242 /* FALLTHROUGH */
243
244 error_exit:
245 if (tuple != NULL)
246 ib_tuple_delete(tuple);
247
248 if (cursor != NULL)
249 ib_cursor_close(cursor);
250
251 return retval;
252 }
253
254 /**
255 * Delete an item from the cache
256 * @param trx the transaction to use
257 * @param key the key of the item to delete
258 * @param nkey the length of the key
259 * @return true if we should go ahead and commit the transaction
260 * or false if we should roll back (if the key didn't exists)
261 */
262 static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) {
263 ib_crsr_t cursor= NULL;
264 bool retval= false;
265
266 if (do_locate_item(trx, key, nkey, &cursor))
267 {
268 checked(ib_cursor_lock(cursor, IB_LOCK_X));
269 checked(ib_cursor_delete_row(cursor));
270 retval = true;
271 }
272 /* Release resources */
273 /* FALLTHROUGH */
274
275 error_exit:
276 if (cursor != NULL)
277 ib_cursor_close(cursor);
278
279 return retval;
280 }
281
282
283 /****************************************************************************
284 * External interface
285 ***************************************************************************/
286
287 /**
288 * Initialize the database storage
289 * @return true if the database was initialized successfully, false otherwise
290 */
291 bool initialize_storage(void) {
292 ib_err_t error;
293 ib_id_t tid;
294
295 checked(ib_init());
296 checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
297 checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
298 checked(ib_cfg_set_bool_on("file_per_table"));
299 checked(ib_startup("barracuda"));
300
301 /* check to see if the table exists or if we should create the schema */
302 error= ib_table_get_id(tablename, &tid);
303 if (error == DB_TABLE_NOT_FOUND) {
304 if (!create_schema()) {
305 return false;
306 }
307 } else if (error != DB_SUCCESS) {
308 fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error));
309 return false;
310 }
311
312 return true;
313
314 error_exit:
315 return false;
316 }
317
318 /**
319 * Shut down this storage engine
320 */
321 void shutdown_storage(void) {
322 checked(ib_shutdown());
323 error_exit:
324 ;
325 }
326
327 /**
328 * Store an item in the databse
329 *
330 * @param item the item to store
331 */
332 void put_item(struct item* item) {
333 ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
334 if (do_put_item(transaction, item)) {
335 ib_err_t error= ib_trx_commit(transaction);
336 if (error != DB_SUCCESS) {
337 fprintf(stderr, "Failed to store key:\n\t%s\n",
338 ib_strerror(error));
339 }
340 } else {
341 ib_err_t error= ib_trx_rollback(transaction);
342 if (error != DB_SUCCESS)
343 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
344 ib_strerror(error));
345 }
346 }
347
348 /**
349 * Get an item from the engine
350 * @param key the key to grab
351 * @param nkey number of bytes in the key
352 * @return pointer to the item if found
353 */
354 struct item* get_item(const void* key, size_t nkey) {
355 ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
356 struct item* ret= do_get_item(transaction, key, nkey);
357 ib_err_t error= ib_trx_rollback(transaction);
358 if (error != DB_SUCCESS)
359 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
360 ib_strerror(error));
361
362 return ret;
363 }
364
365 /**
366 * Create an item structure and initialize it with the content
367 *
368 * @param key the key for the item
369 * @param nkey the number of bytes in the key
370 * @param data pointer to the value for the item (may be NULL)
371 * @param size the size of the data
372 * @param flags the flags to store with the data
373 * @param exp the expiry time for the item
374 * @return pointer to an initialized item object or NULL if allocation failed
375 */
376 struct item* create_item(const void* key, size_t nkey, const void* data,
377 size_t size, uint32_t flags, time_t exp)
378 {
379 struct item* ret= calloc(1, sizeof(*ret));
380 if (ret != NULL)
381 {
382 ret->key= malloc(nkey);
383 if (size > 0)
384 {
385 ret->data= malloc(size);
386 }
387
388 if (ret->key == NULL || (size > 0 && ret->data == NULL))
389 {
390 free(ret->key);
391 free(ret->data);
392 free(ret);
393 return NULL;
394 }
395
396 memcpy(ret->key, key, nkey);
397 if (data != NULL)
398 {
399 memcpy(ret->data, data, size);
400 }
401
402 ret->nkey= nkey;
403 ret->size= size;
404 ret->flags= flags;
405 ret->exp= exp;
406 }
407
408 return ret;
409 }
410
411 /**
412 * Delete an item from the cache
413 * @param key the key of the item to delete
414 * @param nkey the length of the key
415 * @return true if the item was deleted from the cache
416 */
417 bool delete_item(const void* key, size_t nkey) {
418 ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
419
420 bool ret= do_delete_item(transaction, key, nkey);
421
422 if (ret)
423 {
424 /* object found. commit transaction */
425 ib_err_t error= ib_trx_commit(transaction);
426 if (error != DB_SUCCESS)
427 {
428 fprintf(stderr, "Failed to delete key:\n\t%s\n",
429 ib_strerror(error));
430 ret= false;
431 }
432 }
433 else
434 {
435 ib_err_t error= ib_trx_rollback(transaction);
436 if (error != DB_SUCCESS)
437 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
438 ib_strerror(error));
439 }
440
441 return ret;
442 }
443
444 /**
445 * Flush the entire cache
446 * @param when when the cache should be flushed (0 == immediately)
447 */
448 void flush(uint32_t when) {
449 /* @TODO implement support for when != 0 */
450 ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
451 ib_crsr_t cursor= NULL;
452 ib_err_t err= DB_SUCCESS;
453
454 checked(ib_cursor_open_table(tablename, transaction, &cursor));
455 checked(ib_cursor_first(cursor));
456 checked(ib_cursor_lock(cursor, IB_LOCK_X));
457
458 do {
459 checked(ib_cursor_delete_row(cursor));
460 } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS);
461
462 if (err != DB_END_OF_INDEX)
463 {
464 fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err));
465 goto error_exit;
466 }
467 ib_cursor_close(cursor);
468 cursor= NULL;
469 checked(ib_trx_commit(transaction));
470 return;
471
472 error_exit:
473 if (cursor != NULL)
474 ib_cursor_close(cursor);
475
476 ib_err_t error= ib_trx_rollback(transaction);
477 if (error != DB_SUCCESS)
478 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
479 ib_strerror(error));
480 }
481
482 /**
483 * Update the cas ID in the item structure
484 * @param item the item to update
485 */
486 void update_cas(struct item* item) {
487 item->cas= ++cas;
488 }