Merge Trond
[m6w6/libmemcached] / example / storage_innodb.c
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
2 #include <stdlib.h>
3 #include <inttypes.h>
4 #include <time.h>
5 #include <stdbool.h>
6 #include <string.h>
7 #include <unistd.h>
8 #include <assert.h>
9 #include <embedded_innodb-1.0/innodb.h>
10
11 #include "storage.h"
12
/* Name of the table, in "database/table" form, that holds all items */
const char *tablename= "memcached/items";

/*
 * Positions of the columns in the items table. They follow the order in
 * which create_schema() adds the columns to the table schema.
 */
enum
{
  key_col_idx= 0,
  data_col_idx= 1,
  flags_col_idx= 2,
  cas_col_idx= 3,
  exp_col_idx= 4
};

/* The most recently handed out CAS id; incremented by update_cas() */
static uint64_t cas;
22
/**
 * To avoid cluttering all the code with error checking I use the
 * following macro. It evaluates the expression exactly once and verifies
 * that the result is DB_SUCCESS. If any other error code is returned it
 * prints an "assert-like" message (file, line, the expression text and
 * the ib_strerror() description) to stderr and jumps to the label
 * error_exit. Every function using this macro must provide that label
 * and release its resources there before returning.
 *
 * The expansion is a single statement WITHOUT a trailing semicolon, so
 * the macro can be used safely as the body of an unbraced if/else; the
 * caller supplies the semicolon.
 *
 * @param expression the expression to execute (must yield an ib_err_t)
 *
 */
#define checked(expression) \
do { \
  ib_err_t checked_err= (expression); \
  if (checked_err != DB_SUCCESS) \
  { \
    fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n", \
            __FILE__, (unsigned int)__LINE__, #expression, \
            ib_strerror(checked_err)); \
    goto error_exit; \
  } \
} while (0)
45
46 /**
47 * Create the database schema.
48 * @return true if the database schema was created without any problems
49 * false otherwise.
50 */
51 static bool create_schema(void)
52 {
53 ib_tbl_sch_t schema= NULL;
54 ib_idx_sch_t dbindex= NULL;
55
56 if (ib_database_create("memcached") != IB_TRUE)
57 {
58 fprintf(stderr, "Failed to create database\n");
59 return false;
60 }
61
62 ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
63 ib_id_t table_id;
64
65 checked(ib_table_schema_create(tablename, &schema, IB_TBL_COMPACT, 0));
66 checked(ib_table_schema_add_col(schema, "key", IB_BLOB,
67 IB_COL_NOT_NULL, 0, 32767));
68 checked(ib_table_schema_add_col(schema, "data", IB_BLOB,
69 IB_COL_NONE, 0, 1024*1024));
70 checked(ib_table_schema_add_col(schema, "flags", IB_INT,
71 IB_COL_UNSIGNED, 0, 4));
72 checked(ib_table_schema_add_col(schema, "cas", IB_INT,
73 IB_COL_UNSIGNED, 0, 8));
74 checked(ib_table_schema_add_col(schema, "exp", IB_INT,
75 IB_COL_UNSIGNED, 0, 4));
76 checked(ib_table_schema_add_index(schema, "PRIMARY_KEY", &dbindex));
77 checked(ib_index_schema_add_col(dbindex, "key", 0));
78 checked(ib_index_schema_set_clustered(dbindex));
79 checked(ib_schema_lock_exclusive(transaction));
80 checked(ib_table_create(transaction, schema, &table_id));
81 checked(ib_trx_commit(transaction));
82 ib_table_schema_delete(schema);
83
84 return true;
85
86 error_exit:
87 /* @todo release resources! */
88 {
89 ib_err_t error= ib_trx_rollback(transaction);
90 if (error != DB_SUCCESS)
91 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
92 ib_strerror(error));
93 }
94 return false;
95 }
96
97 /**
98 * Store an item into the database. Update the CAS id on the item before
99 * storing it in the database.
100 *
101 * @param trx the transaction to use
102 * @param item the item to store
103 * @return true if we can go ahead and commit the transaction, false otherwise
104 */
105 static bool do_put_item(ib_trx_t trx, struct item* item)
106 {
107 update_cas(item);
108
109 ib_crsr_t cursor= NULL;
110 ib_tpl_t tuple= NULL;
111 bool retval= false;
112
113 checked(ib_cursor_open_table(tablename, trx, &cursor));
114 checked(ib_cursor_lock(cursor, IB_LOCK_X));
115 tuple= ib_clust_read_tuple_create(cursor);
116
117 checked(ib_col_set_value(tuple, key_col_idx, item->key, item->nkey));
118 checked(ib_col_set_value(tuple, data_col_idx, item->data, item->size));
119 checked(ib_tuple_write_u32(tuple, flags_col_idx, item->flags));
120 checked(ib_tuple_write_u64(tuple, cas_col_idx, item->cas));
121 checked(ib_tuple_write_u32(tuple, exp_col_idx, (ib_u32_t)item->exp));
122 checked(ib_cursor_insert_row(cursor, tuple));
123
124 retval= true;
125 /* Release resources: */
126 /* FALLTHROUGH */
127
128 error_exit:
129 if (tuple != NULL)
130 ib_tuple_delete(tuple);
131
132 if (cursor != NULL)
133 ib_cursor_close(cursor);
134
135 return retval;
136 }
137
138 /**
139 * Try to locate an item in the database. Return a cursor and the tuple to
140 * the item if I found it in the database.
141 *
142 * @param trx the transaction to use
143 * @param key the key of the item to look up
144 * @param nkey the size of the key
145 * @param cursor where to store the cursor (OUT)
146 * @param tuple where to store the tuple (OUT)
147 * @return true if I found the object, false otherwise
148 */
149 static bool do_locate_item(ib_trx_t trx,
150 const void* key,
151 size_t nkey,
152 ib_crsr_t *cursor)
153 {
154 int res;
155 ib_tpl_t tuple= NULL;
156
157 *cursor= NULL;
158
159 checked(ib_cursor_open_table(tablename, trx, cursor));
160 tuple= ib_clust_search_tuple_create(*cursor);
161 if (tuple == NULL)
162 {
163 fprintf(stderr, "Failed to allocate tuple object\n");
164 goto error_exit;
165 }
166
167 checked(ib_col_set_value(tuple, key_col_idx, key, nkey));
168 ib_err_t err= ib_cursor_moveto(*cursor, tuple, IB_CUR_GE, &res);
169
170 if (err == DB_SUCCESS && res == 0)
171 {
172 ib_tuple_delete(tuple);
173 return true;
174 }
175 else if (err != DB_SUCCESS &&
176 err != DB_RECORD_NOT_FOUND &&
177 err != DB_END_OF_INDEX)
178 {
179 fprintf(stderr, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err));
180 }
181 /* FALLTHROUGH */
182 error_exit:
183 if (tuple != NULL)
184 ib_tuple_delete(tuple);
185 if (*cursor != NULL)
186 ib_cursor_close(*cursor);
187 *cursor= NULL;
188
189 return false;
190 }
191
192 /**
193 * Try to get an item from the database
194 *
195 * @param trx the transaction to use
196 * @param key the key to get
197 * @param nkey the lenght of the key
198 * @return a pointer to the item if I found it in the database
199 */
200 static struct item* do_get_item(ib_trx_t trx, const void* key, size_t nkey)
201 {
202 ib_crsr_t cursor= NULL;
203 ib_tpl_t tuple= NULL;
204 struct item* retval= NULL;
205
206 if (do_locate_item(trx, key, nkey, &cursor))
207 {
208 tuple= ib_clust_read_tuple_create(cursor);
209 if (tuple == NULL)
210 {
211 fprintf(stderr, "Failed to create read tuple\n");
212 goto error_exit;
213 }
214 checked(ib_cursor_read_row(cursor, tuple));
215 ib_col_meta_t meta;
216 ib_ulint_t datalen= ib_col_get_meta(tuple, data_col_idx, &meta);
217 ib_ulint_t flaglen= ib_col_get_meta(tuple, flags_col_idx, &meta);
218 ib_ulint_t caslen= ib_col_get_meta(tuple, cas_col_idx, &meta);
219 ib_ulint_t explen= ib_col_get_meta(tuple, exp_col_idx, &meta);
220 const void *dataptr= ib_col_get_value(tuple, data_col_idx);
221
222 retval= create_item(key, nkey, dataptr, datalen, 0, 0);
223 if (retval == NULL)
224 {
225 fprintf(stderr, "Failed to allocate memory\n");
226 goto error_exit;
227 }
228
229 if (flaglen != 0)
230 {
231 ib_u32_t val;
232 checked(ib_tuple_read_u32(tuple, flags_col_idx, &val));
233 retval->flags= (uint32_t)val;
234 }
235 if (caslen != 0)
236 {
237 ib_u64_t val;
238 checked(ib_tuple_read_u64(tuple, cas_col_idx, &val));
239 retval->cas= (uint64_t)val;
240 }
241 if (explen != 0)
242 {
243 ib_u32_t val;
244 checked(ib_tuple_read_u32(tuple, exp_col_idx, &val));
245 retval->exp= (time_t)val;
246 }
247 }
248
249 /* Release resources */
250 /* FALLTHROUGH */
251
252 error_exit:
253 if (tuple != NULL)
254 ib_tuple_delete(tuple);
255
256 if (cursor != NULL)
257 ib_cursor_close(cursor);
258
259 return retval;
260 }
261
262 /**
263 * Delete an item from the cache
264 * @param trx the transaction to use
265 * @param key the key of the item to delete
266 * @param nkey the length of the key
267 * @return true if we should go ahead and commit the transaction
268 * or false if we should roll back (if the key didn't exists)
269 */
270 static bool do_delete_item(ib_trx_t trx, const void* key, size_t nkey) {
271 ib_crsr_t cursor= NULL;
272 bool retval= false;
273
274 if (do_locate_item(trx, key, nkey, &cursor))
275 {
276 checked(ib_cursor_lock(cursor, IB_LOCK_X));
277 checked(ib_cursor_delete_row(cursor));
278 retval= true;
279 }
280 /* Release resources */
281 /* FALLTHROUGH */
282
283 error_exit:
284 if (cursor != NULL)
285 ib_cursor_close(cursor);
286
287 return retval;
288 }
289
290
291 /****************************************************************************
292 * External interface
293 ***************************************************************************/
294
295 /**
296 * Initialize the database storage
297 * @return true if the database was initialized successfully, false otherwise
298 */
299 bool initialize_storage(void)
300 {
301 ib_err_t error;
302 ib_id_t tid;
303
304 checked(ib_init());
305 checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
306 checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
307 checked(ib_cfg_set_bool_on("file_per_table"));
308 checked(ib_startup("barracuda"));
309
310 /* check to see if the table exists or if we should create the schema */
311 error= ib_table_get_id(tablename, &tid);
312 if (error == DB_TABLE_NOT_FOUND)
313 {
314 if (!create_schema())
315 {
316 return false;
317 }
318 }
319 else if (error != DB_SUCCESS)
320 {
321 fprintf(stderr, "Failed to get table id: %s\n", ib_strerror(error));
322 return false;
323 }
324
325 return true;
326
327 error_exit:
328 return false;
329 }
330
/**
 * Shut down this storage engine
 *
 * A failing shutdown is reported on stderr by the checked() macro; there
 * is nothing else to clean up afterwards.
 */
void shutdown_storage(void)
{
  checked(ib_shutdown());

error_exit:
  return;
}
340
341 /**
342 * Store an item in the databse
343 *
344 * @param item the item to store
345 */
346 void put_item(struct item* item)
347 {
348 ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
349 if (do_put_item(transaction, item))
350 {
351 ib_err_t error= ib_trx_commit(transaction);
352 if (error != DB_SUCCESS)
353 {
354 fprintf(stderr, "Failed to store key:\n\t%s\n",
355 ib_strerror(error));
356 }
357 }
358 else
359 {
360 ib_err_t error= ib_trx_rollback(transaction);
361 if (error != DB_SUCCESS)
362 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
363 ib_strerror(error));
364 }
365 }
366
367 /**
368 * Get an item from the engine
369 * @param key the key to grab
370 * @param nkey number of bytes in the key
371 * @return pointer to the item if found
372 */
373 struct item* get_item(const void* key, size_t nkey)
374 {
375 ib_trx_t transaction= ib_trx_begin(IB_TRX_SERIALIZABLE);
376 struct item* ret= do_get_item(transaction, key, nkey);
377 ib_err_t error= ib_trx_rollback(transaction);
378
379 if (error != DB_SUCCESS)
380 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
381 ib_strerror(error));
382
383 return ret;
384 }
385
386 /**
387 * Create an item structure and initialize it with the content
388 *
389 * @param key the key for the item
390 * @param nkey the number of bytes in the key
391 * @param data pointer to the value for the item (may be NULL)
392 * @param size the size of the data
393 * @param flags the flags to store with the data
394 * @param exp the expiry time for the item
395 * @return pointer to an initialized item object or NULL if allocation failed
396 */
397 struct item* create_item(const void* key, size_t nkey, const void* data,
398 size_t size, uint32_t flags, time_t exp)
399 {
400 struct item* ret= calloc(1, sizeof(*ret));
401 if (ret != NULL)
402 {
403 ret->key= malloc(nkey);
404 if (size > 0)
405 {
406 ret->data= malloc(size);
407 }
408
409 if (ret->key == NULL || (size > 0 && ret->data == NULL))
410 {
411 free(ret->key);
412 free(ret->data);
413 free(ret);
414 return NULL;
415 }
416
417 memcpy(ret->key, key, nkey);
418 if (data != NULL)
419 {
420 memcpy(ret->data, data, size);
421 }
422
423 ret->nkey= nkey;
424 ret->size= size;
425 ret->flags= flags;
426 ret->exp= exp;
427 }
428
429 return ret;
430 }
431
432 /**
433 * Delete an item from the cache
434 * @param key the key of the item to delete
435 * @param nkey the length of the key
436 * @return true if the item was deleted from the cache
437 */
438 bool delete_item(const void* key, size_t nkey) {
439 ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
440
441 bool ret= do_delete_item(transaction, key, nkey);
442
443 if (ret)
444 {
445 /* object found. commit transaction */
446 ib_err_t error= ib_trx_commit(transaction);
447 if (error != DB_SUCCESS)
448 {
449 fprintf(stderr, "Failed to delete key:\n\t%s\n",
450 ib_strerror(error));
451 ret= false;
452 }
453 }
454 else
455 {
456 ib_err_t error= ib_trx_rollback(transaction);
457 if (error != DB_SUCCESS)
458 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
459 ib_strerror(error));
460 }
461
462 return ret;
463 }
464
465 /**
466 * Flush the entire cache
467 * @param when when the cache should be flushed (0 == immediately)
468 */
469 void flush(uint32_t when __attribute__((unused)))
470 {
471 /* @TODO implement support for when != 0 */
472 ib_trx_t transaction= ib_trx_begin(IB_TRX_REPEATABLE_READ);
473 ib_crsr_t cursor= NULL;
474 ib_err_t err= DB_SUCCESS;
475
476 checked(ib_cursor_open_table(tablename, transaction, &cursor));
477 checked(ib_cursor_first(cursor));
478 checked(ib_cursor_lock(cursor, IB_LOCK_X));
479
480 do
481 {
482 checked(ib_cursor_delete_row(cursor));
483 } while ((err= ib_cursor_next(cursor)) == DB_SUCCESS);
484
485 if (err != DB_END_OF_INDEX)
486 {
487 fprintf(stderr, "Failed to flush the cache: %s\n", ib_strerror(err));
488 goto error_exit;
489 }
490 ib_cursor_close(cursor);
491 cursor= NULL;
492 checked(ib_trx_commit(transaction));
493 return;
494
495 error_exit:
496 if (cursor != NULL)
497 ib_cursor_close(cursor);
498
499 ib_err_t error= ib_trx_rollback(transaction);
500 if (error != DB_SUCCESS)
501 fprintf(stderr, "Failed to roll back the transaction:\n\t%s\n",
502 ib_strerror(error));
503 }
504
505 /**
506 * Update the cas ID in the item structure
507 * @param item the item to update
508 */
509 void update_cas(struct item* item)
510 {
511 item->cas= ++cas;
512 }
513
514 /**
515 * Release all the resources allocated by the item
516 * @param item the item to release
517 */
518 void release_item(struct item* item)
519 {
520 free(item->key);
521 free(item->data);
522 free(item);
523 }