Rollup merge.
[awesomized/libmemcached] / example / storage_innodb.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 #include <stdlib.h>
3 #include <inttypes.h>
4 #include <time.h>
5 #include <stdbool.h>
6 #include <string.h>
7 #include <unistd.h>
8 #include <assert.h>
9 #include <embedded_innodb-1.0/innodb.h>
10
11 #include "storage.h"
12
13 const char *tablename= "memcached/items";
14
15 #define key_col_idx 0
16 #define data_col_idx 1
17 #define flags_col_idx 2
18 #define cas_col_idx 3
19 #define exp_col_idx 4
20
21 static uint64_t cas;
22
23 /**
24 * To avoid cluttering down all the code with error checking I use the
25 * following macro. It will execute the statement and verify that the
26 * result of the operation is DB_SUCCESS. If any other error code is
27 * returned it will print an "assert-like" output and jump to the
28 * label error_exit. There I release resources before returning out of
29 * the function.
30 *
31 * @param a the expression to execute
32 *
33 */
34 #define checked(expression) \
35 do { \
36 ib_err_t checked_err= expression; \
37 if (checked_err != DB_SUCCESS) \
38 { \
39 fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n", \
40 __FILE__, __LINE__, #expression, \
41 ib_strerror(checked_err)); \
42 goto error_exit; \
43 } \
44 } while (0);
45
46 /**
47 * Create the database schema.
48 * @return true if the database schema was created without any problems
49 * false otherwise.
50 */
51 static bool create_schema(void)
52 {
53 ib_tbl_sch_t schema= NULL;
54 ib_idx_sch_t dbindex= NULL;
55
56 if (ib_database_create("memcached") != IB_TRUE)
57 {
58 fprintf(stderr, "Failed to create database\n");
59 return false;
60 }
61
62 ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
63 ib_id_t table_id;
64
65 checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0));
66 checked(ib_table_schema_add_col(schema, "key", IB_BLOB,
67 IB_COL_NOT_NULL, 0, 32767));
68 checked(ib_table_schema_add_col(schema, "data", IB_BLOB,
69 IB_COL_NONE, 0, 1024*1024));
70 checked(ib_table_schema_add_col(schema, "flags", IB_INT,
71 IB_COL_UNSIGNED, 0, 4));
72 checked(ib_table_schema_add_col(schema, "cas", IB_INT,
73 IB_COL_UNSIGNED, 0, 8));
74 checked(ib_table_schema_add_col(schema, "exp", IB_INT,
75 IB_COL_UNSIGNED, 0, 4));
76 checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &dbindex));
77 checked(ib_index_schema_add_col(dbindex, "key", 0));
78 checked(ib_index_schema_set_clustered(dbindex));
79 checked(ib_schema_lock_exclusive(transaction));
80 checked(ib_table_create(transaction, schema, &table_id));
81 checked(ib_trx_commit(transaction));
82 ib_table_schema_delete(schema);
83
84 return true;
85
86 error_exit:
87 /* @todo release resources! */
88 {
89 ib_err_t error= ib_trx_rollback(transaction);
90 if (error != DB_SUCCESS)
91 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
92 ib_strerror(error));
93 }
94 return false;
95 }
96
97 /**
98 * Store an item into the database. Update the CAS id on the item before
99 * storing it in the database.
100 *
101 * @param trx the transaction to use
102 * @param item the item to store
103 * @return true if we can go ahead and commit the transaction, false otherwise
104 */
105 static bool do_put_item(ib_trx_t trx, struct item* item)
106 {
107 update_cas(item);
108
109 ib_crsr_t cursor= NULL;
110 ib_tpl_t tuple= NULL;
111 bool retval= false;
112
113 checked(ib_cursor_open_table(tablename, trx, &cursor));
114 checked(ib_cursor_lock(cursor, IB_LOCK_X));
115 tuple= ib_clust_read_tuple_create(cursor);
116
117 checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey));
118 checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size));
119 checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags));
120 checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas));
121 checked(ib_tuple_write_u32(tuple, exp_col_idx, (ib_u32_t)item->exp));
122 checked(ib_cursor_insert_row(cursor, tuple));
123
124 retval= true;
125 /* Release resources: */
126 /* FALLTHROUGH */
127
128 error_exit:
129 if (tuple != NULL)
130 ib_tuple_delete(tuple);
131
132 ib_err_t currsor_error;
133 if (cursor != NULL)
134 currsor_error= ib_cursor_close(cursor);
135
136 return retval;
137 }
138
139 /**
140 * Try to locate an item in the database. Return a cursor and the tuple to
141 * the item if I found it in the database.
142 *
143 * @param trx the transaction to use
144 * @param key the key of the item to look up
145 * @param nkey the size of the key
146 * @param cursor where to store the cursor (OUT)
147 * @param tuple where to store the tuple (OUT)
148 * @return true if I found the object, false otherwise
149 */
150 static bool do_locate_item(ib_trx_t trx,
151 const void* key,
152 size_t nkey,
153 ib_crsr_t *cursor)
154 {
155 int res;
156 ib_tpl_t tuple= NULL;
157
158 *cursor= NULL;
159
160 checked(ib_cursor_open_table(tablename, trx, cursor));
161 tuple= ib_clust_search_tuple_create(*cursor);
162 if (tuple == NULL)
163 {
164 fprintf(stderr, "Failed to allocate tuple object\n");
165 goto error_exit;
166 }
167
168 checked(ib_col_set_value(tuple, key_col_idx, key, nkey));
169 ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res);
170
171 if (err == DB_SUCCESS && res == 0)
172 {
173 ib_tuple_delete(tuple);
174 return true;
175 }
176 else if (err != DB_SUCCESS &&
177 err != DB_RECORD_NOT_FOUND &&
178 err != DB_END_OF_INDEX)
179 {
180 fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err));
181 }
182 /* FALLTHROUGH */
183 error_exit:
184 if (tuple != NULL)
185 ib_tuple_delete(tuple);
186
187 ib_err_t cursor_error;
188 if (*cursor != NULL)
189 cursor_error= ib_cursor_close(*cursor);
190
191 *cursor= NULL;
192
193 return false;
194 }
195
196 /**
197 * Try to get an item from the database
198 *
199 * @param trx the transaction to use
200 * @param key the key to get
201 * @param nkey the lenght of the key
202 * @return a pointer to the item if I found it in the database
203 */
204 static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey)
205 {
206 ib_crsr_t cursor= NULL;
207 ib_tpl_t tuple= NULL;
208 struct item* retval= NULL;
209
210 if (do_locate_item(trx, key, nkey, &cursor))
211 {
212 tuple= ib_clust_read_tuple_create(cursor);
213 if (tuple == NULL)
214 {
215 fprintf(stderr, "Failed to create read tuple\n");
216 goto error_exit;
217 }
218 checked(ib_cursor_read_row(cursor, tuple));
219 ib_col_meta_t meta;
220 ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta);
221 ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta);
222 ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta);
223 ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta);
224 const void *dataptr= ib_col_get_value(tuple, data_col_idx);
225
226 retval= create_item(key, nkey, dataptr, datalen, 0, 0);
227 if (retval == NULL)
228 {
229 fprintf(stderr, "Failed to allocate memory\n");
230 goto error_exit;
231 }
232
233 if (flaglen != 0)
234 {
235 ib_u32_t val;
236 checked(ib_tuple_read_u32(tuple, flags_col_idx, &val));
237 retval->flags= (uint32_t)val;
238 }
239 if (caslen != 0)
240 {
241 ib_u64_t val;
242 checked(ib_tuple_read_u64(tuple, cas_col_idx, &val));
243 retval->cas= (uint64_t)val;
244 }
245 if (explen != 0)
246 {
247 ib_u32_t val;
248 checked(ib_tuple_read_u32(tuple, exp_col_idx, &val));
249 retval->exp= (time_t)val;
250 }
251 }
252
253 /* Release resources */
254 /* FALLTHROUGH */
255
256 error_exit:
257 if (tuple != NULL)
258 ib_tuple_delete(tuple);
259
260 ib_err_t cursor_error;
261 if (cursor != NULL)
262 cursor_error= ib_cursor_close(cursor);
263
264 return retval;
265 }
266
267 /**
268 * Delete an item from the cache
269 * @param trx the transaction to use
270 * @param key the key of the item to delete
271 * @param nkey the length of the key
272 * @return true if we should go ahead and commit the transaction
273 * or false if we should roll back (if the key didn't exists)
274 */
275 static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) {
276 ib_crsr_t cursor= NULL;
277 bool retval= false;
278
279 if (do_locate_item(trx, key, nkey, &cursor))
280 {
281 checked(ib_cursor_lock(cursor, IB_LOCK_X));
282 checked(ib_cursor_delete_row(cursor));
283 retval= true;
284 }
285 /* Release resources */
286 /* FALLTHROUGH */
287
288 error_exit:
289 if (cursor != NULL)
290 {
291 ib_err_t cursor_error;
292 cursor_error= ib_cursor_close(cursor);
293 }
294
295 return retval;
296 }
297
298
299 /****************************************************************************
300 * External interface
301 ***************************************************************************/
302
303 /**
304 * Initialize the database storage
305 * @return true if the database was initialized successfully, false otherwise
306 */
307 bool initialize_storage(void)
308 {
309 ib_err_t error;
310 ib_id_t tid;
311
312 checked(ib_init());
313 checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
314 checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
315 checked(ib_cfg_set_bool_on("file_per_table"));
316 checked(ib_startup("barracuda"));
317
318 /* check to see if the table exists or if we should create the schema */
319 error= ib_table_get_id(tablename, &tid);
320 if (error == DB_TABLE_NOT_FOUND)
321 {
322 if (!create_schema())
323 {
324 return false;
325 }
326 }
327 else if (error != DB_SUCCESS)
328 {
329 fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error));
330 return false;
331 }
332
333 return true;
334
335 error_exit:
336
337 return false;
338 }
339
340 /**
341 * Shut down this storage engine
342 */
343 void shutdown_storage(void)
344 {
345 checked(ib_shutdown(IB_SHUTDOWN_NORMAL));
346 error_exit:
347 ;
348 }
349
350 /**
351 * Store an item in the databse
352 *
353 * @param item the item to store
354 */
355 void put_item(struct item* item)
356 {
357 ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
358 if (do_put_item(transaction, item))
359 {
360 ib_err_t error= ib_trx_commit(transaction);
361 if (error != DB_SUCCESS)
362 {
363 fprintf(stderr, "Failed to store key:\n\t%s\n",
364 ib_strerror(error));
365 }
366 }
367 else
368 {
369 ib_err_t error= ib_trx_rollback(transaction);
370 if (error != DB_SUCCESS)
371 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
372 ib_strerror(error));
373 }
374 }
375
376 /**
377 * Get an item from the engine
378 * @param key the key to grab
379 * @param nkey number of bytes in the key
380 * @return pointer to the item if found
381 */
382 struct item* get_item(const void* key, size_t nkey)
383 {
384 ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
385 struct item* ret= do_get_item(transaction, key, nkey);
386 ib_err_t error= ib_trx_rollback(transaction);
387
388 if (error != DB_SUCCESS)
389 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
390 ib_strerror(error));
391
392 return ret;
393 }
394
395 /**
396 * Create an item structure and initialize it with the content
397 *
398 * @param key the key for the item
399 * @param nkey the number of bytes in the key
400 * @param data pointer to the value for the item (may be NULL)
401 * @param size the size of the data
402 * @param flags the flags to store with the data
403 * @param exp the expiry time for the item
404 * @return pointer to an initialized item object or NULL if allocation failed
405 */
406 struct item* create_item(const void* key, size_t nkey, const void* data,
407 size_t size, uint32_t flags, time_t exp)
408 {
409 struct item* ret= calloc(1, sizeof(*ret));
410 if (ret != NULL)
411 {
412 ret->key= malloc(nkey);
413 if (size > 0)
414 {
415 ret->data= malloc(size);
416 }
417
418 if (ret->key == NULL || (size > 0 && ret->data == NULL))
419 {
420 free(ret->key);
421 free(ret->data);
422 free(ret);
423 return NULL;
424 }
425
426 memcpy(ret->key, key, nkey);
427 if (data != NULL)
428 {
429 memcpy(ret->data, data, size);
430 }
431
432 ret->nkey= nkey;
433 ret->size= size;
434 ret->flags= flags;
435 ret->exp= exp;
436 }
437
438 return ret;
439 }
440
441 /**
442 * Delete an item from the cache
443 * @param key the key of the item to delete
444 * @param nkey the length of the key
445 * @return true if the item was deleted from the cache
446 */
447 bool delete_item(const void* key, size_t nkey) {
448 ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
449
450 bool ret= do_delete_item(transaction, key, nkey);
451
452 if (ret)
453 {
454 /* object found. commit transaction */
455 ib_err_t error= ib_trx_commit(transaction);
456 if (error != DB_SUCCESS)
457 {
458 fprintf(stderr, "Failed to delete key:\n\t%s\n",
459 ib_strerror(error));
460 ret= false;
461 }
462 }
463 else
464 {
465 ib_err_t error= ib_trx_rollback(transaction);
466 if (error != DB_SUCCESS)
467 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
468 ib_strerror(error));
469 }
470
471 return ret;
472 }
473
474 /**
475 * Flush the entire cache
476 * @param when when the cache should be flushed (0 == immediately)
477 */
478 void flush(uint32_t when __attribute__((unused)))
479 {
480 /* @TODO implement support for when != 0 */
481 ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
482 ib_crsr_t cursor= NULL;
483 ib_err_t err= DB_SUCCESS;
484
485 checked(ib_cursor_open_table(tablename, transaction, &cursor));
486 checked(ib_cursor_first(cursor));
487 checked(ib_cursor_lock(cursor, IB_LOCK_X));
488
489 do
490 {
491 checked(ib_cursor_delete_row(cursor));
492 } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS);
493
494 if (err != DB_END_OF_INDEX)
495 {
496 fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err));
497 goto error_exit;
498 }
499 ib_err_t cursor_error;
500 cursor_error= ib_cursor_close(cursor);
501 cursor= NULL;
502 checked(ib_trx_commit(transaction));
503 return;
504
505 error_exit:
506 if (cursor != NULL)
507 {
508 cursor_error= ib_cursor_close(cursor);
509 }
510
511 ib_err_t error= ib_trx_rollback(transaction);
512 if (error != DB_SUCCESS)
513 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
514 ib_strerror(error));
515 }
516
517 /**
518 * Update the cas ID in the item structure
519 * @param item the item to update
520 */
521 void update_cas(struct item* item)
522 {
523 item->cas= ++cas;
524 }
525
526 /**
527 * Release all the resources allocated by the item
528 * @param item the item to release
529 */
530 void release_item(struct item* item)
531 {
532 free(item->key);
533 free(item->data);
534 free(item);
535 }