e3113f17fdb7c6d004b45f23226e67fe539ea288
1 /* -*- Mode: C; tab-width: 2; c-basic-offset: 2; indent-tabs-mode: nil -*- */
9 #include <embedded_innodb-1.0/innodb.h>
/* Fully qualified ("database/table") name of the table holding the items. */
const char *tablename= "memcached/items";

/*
 * Zero-based column positions. They follow the order in which
 * create_schema() adds the columns: key, data, flags, cas, exp.
 * NOTE(review): only the data and flags indexes were present in the
 * listing; key, cas and exp are derived from that column order -- confirm.
 */
#define key_col_idx 0
#define data_col_idx 1
#define flags_col_idx 2
#define cas_col_idx 3
#define exp_col_idx 4
/**
 * To avoid cluttering down all the code with error checking I use the
 * following macro. It will execute the statement and verify that the
 * result of the operation is DB_SUCCESS. If any other error code is
 * returned it will print an "assert-like" output and jump to the
 * label error_exit. There I release resources before returning out of
 * the function.
 *
 * Every function using this macro must therefore define an error_exit
 * label. The body is wrapped in do { } while (0) without a trailing
 * semicolon so that "checked(x);" behaves as one statement, also in an
 * unbraced if/else.
 *
 * @param expression the expression to execute
 */
#define checked(expression)                                      \
  do                                                             \
  {                                                              \
    ib_err_t checked_err= (expression);                          \
    if (checked_err != DB_SUCCESS)                               \
    {                                                            \
      fprintf(stderr, "ERROR: %s at %u: Failed: <%s>\n\t%s\n",   \
              __FILE__, (unsigned int)__LINE__, #expression,     \
              ib_strerror(checked_err));                         \
      goto error_exit;                                           \
    }                                                            \
  } while (0)
47 * Create the database schema.
48 * @return true if the database schema was created without any problems
51 static bool create_schema(void)
53 ib_tbl_sch_t schema
= NULL
;
54 ib_idx_sch_t dbindex
= NULL
;
56 if (ib_database_create("memcached") != IB_TRUE
)
58 fprintf(stderr
, "Failed to create database\n");
62 ib_trx_t transaction
= ib_trx_begin(IB_TRX_SERIALIZABLE
);
65 checked(ib_table_schema_create(tablename
, &schema
, IB_TBL_COMPACT
, 0));
66 checked(ib_table_schema_add_col(schema
, "key", IB_BLOB
,
67 IB_COL_NOT_NULL
, 0, 32767));
68 checked(ib_table_schema_add_col(schema
, "data", IB_BLOB
,
69 IB_COL_NONE
, 0, 1024*1024));
70 checked(ib_table_schema_add_col(schema
, "flags", IB_INT
,
71 IB_COL_UNSIGNED
, 0, 4));
72 checked(ib_table_schema_add_col(schema
, "cas", IB_INT
,
73 IB_COL_UNSIGNED
, 0, 8));
74 checked(ib_table_schema_add_col(schema
, "exp", IB_INT
,
75 IB_COL_UNSIGNED
, 0, 4));
76 checked(ib_table_schema_add_index(schema
, "PRIMARY_KEY", &dbindex
));
77 checked(ib_index_schema_add_col(dbindex
, "key", 0));
78 checked(ib_index_schema_set_clustered(dbindex
));
79 checked(ib_schema_lock_exclusive(transaction
));
80 checked(ib_table_create(transaction
, schema
, &table_id
));
81 checked(ib_trx_commit(transaction
));
82 ib_table_schema_delete(schema
);
87 /* @todo release resources! */
89 ib_err_t error
= ib_trx_rollback(transaction
);
90 if (error
!= DB_SUCCESS
)
91 fprintf(stderr
, "Failed to roll back the transaction:\n\t%s\n",
98 * Store an item into the database. Update the CAS id on the item before
99 * storing it in the database.
101 * @param trx the transaction to use
102 * @param item the item to store
103 * @return true if we can go ahead and commit the transaction, false otherwise
105 static bool do_put_item(ib_trx_t trx
, struct item
* item
)
109 ib_crsr_t cursor
= NULL
;
110 ib_tpl_t tuple
= NULL
;
113 checked(ib_cursor_open_table(tablename
, trx
, &cursor
));
114 checked(ib_cursor_lock(cursor
, IB_LOCK_X
));
115 tuple
= ib_clust_read_tuple_create(cursor
);
117 checked(ib_col_set_value(tuple
, key_col_idx
, item
->key
, item
->nkey
));
118 checked(ib_col_set_value(tuple
, data_col_idx
, item
->data
, item
->size
));
119 checked(ib_tuple_write_u32(tuple
, flags_col_idx
, item
->flags
));
120 checked(ib_tuple_write_u64(tuple
, cas_col_idx
, item
->cas
));
121 checked(ib_tuple_write_u32(tuple
, exp_col_idx
, (ib_u32_t
)item
->exp
));
122 checked(ib_cursor_insert_row(cursor
, tuple
));
125 /* Release resources: */
130 ib_tuple_delete(tuple
);
132 ib_err_t currsor_error
;
134 currsor_error
= ib_cursor_close(cursor
);
140 * Try to locate an item in the database. Return a cursor and the tuple to
141 * the item if I found it in the database.
143 * @param trx the transaction to use
144 * @param key the key of the item to look up
145 * @param nkey the size of the key
146 * @param cursor where to store the cursor (OUT)
147 * @param tuple where to store the tuple (OUT)
148 * @return true if I found the object, false otherwise
150 static bool do_locate_item(ib_trx_t trx
,
156 ib_tpl_t tuple
= NULL
;
160 checked(ib_cursor_open_table(tablename
, trx
, cursor
));
161 tuple
= ib_clust_search_tuple_create(*cursor
);
164 fprintf(stderr
, "Failed to allocate tuple object\n");
168 checked(ib_col_set_value(tuple
, key_col_idx
, key
, nkey
));
169 ib_err_t err
= ib_cursor_moveto(*cursor
, tuple
, IB_CUR_GE
, &res
);
171 if (err
== DB_SUCCESS
&& res
== 0)
173 ib_tuple_delete(tuple
);
176 else if (err
!= DB_SUCCESS
&&
177 err
!= DB_RECORD_NOT_FOUND
&&
178 err
!= DB_END_OF_INDEX
)
180 fprintf(stderr
, "ERROR: ib_cursor_moveto(): %s\n", ib_strerror(err
));
185 ib_tuple_delete(tuple
);
187 ib_err_t cursor_error
;
189 cursor_error
= ib_cursor_close(*cursor
);
197 * Try to get an item from the database
199 * @param trx the transaction to use
200 * @param key the key to get
201 * @param nkey the lenght of the key
202 * @return a pointer to the item if I found it in the database
204 static struct item
* do_get_item(ib_trx_t trx
, const void* key
, size_t nkey
)
206 ib_crsr_t cursor
= NULL
;
207 ib_tpl_t tuple
= NULL
;
208 struct item
* retval
= NULL
;
210 if (do_locate_item(trx
, key
, nkey
, &cursor
))
212 tuple
= ib_clust_read_tuple_create(cursor
);
215 fprintf(stderr
, "Failed to create read tuple\n");
218 checked(ib_cursor_read_row(cursor
, tuple
));
220 ib_ulint_t datalen
= ib_col_get_meta(tuple
, data_col_idx
, &meta
);
221 ib_ulint_t flaglen
= ib_col_get_meta(tuple
, flags_col_idx
, &meta
);
222 ib_ulint_t caslen
= ib_col_get_meta(tuple
, cas_col_idx
, &meta
);
223 ib_ulint_t explen
= ib_col_get_meta(tuple
, exp_col_idx
, &meta
);
224 const void *dataptr
= ib_col_get_value(tuple
, data_col_idx
);
226 retval
= create_item(key
, nkey
, dataptr
, datalen
, 0, 0);
229 fprintf(stderr
, "Failed to allocate memory\n");
236 checked(ib_tuple_read_u32(tuple
, flags_col_idx
, &val
));
237 retval
->flags
= (uint32_t)val
;
242 checked(ib_tuple_read_u64(tuple
, cas_col_idx
, &val
));
243 retval
->cas
= (uint64_t)val
;
248 checked(ib_tuple_read_u32(tuple
, exp_col_idx
, &val
));
249 retval
->exp
= (time_t)val
;
253 /* Release resources */
258 ib_tuple_delete(tuple
);
260 ib_err_t cursor_error
;
262 cursor_error
= ib_cursor_close(cursor
);
268 * Delete an item from the cache
269 * @param trx the transaction to use
270 * @param key the key of the item to delete
271 * @param nkey the length of the key
272 * @return true if we should go ahead and commit the transaction
273 * or false if we should roll back (if the key didn't exists)
275 static bool do_delete_item(ib_trx_t trx
, const void* key
, size_t nkey
) {
276 ib_crsr_t cursor
= NULL
;
279 if (do_locate_item(trx
, key
, nkey
, &cursor
))
281 checked(ib_cursor_lock(cursor
, IB_LOCK_X
));
282 checked(ib_cursor_delete_row(cursor
));
285 /* Release resources */
291 ib_err_t cursor_error
;
292 cursor_error
= ib_cursor_close(cursor
);
299 /****************************************************************************
301 ***************************************************************************/
304 * Initialize the database storage
305 * @return true if the database was initialized successfully, false otherwise
307 bool initialize_storage(void)
313 checked(ib_cfg_set_text("data_home_dir", "/tmp/memcached_light"));
314 checked(ib_cfg_set_text("log_group_home_dir", "/tmp/memcached_light"));
315 checked(ib_cfg_set_bool_on("file_per_table"));
316 checked(ib_startup("barracuda"));
318 /* check to see if the table exists or if we should create the schema */
319 error
= ib_table_get_id(tablename
, &tid
);
320 if (error
== DB_TABLE_NOT_FOUND
)
322 if (!create_schema())
327 else if (error
!= DB_SUCCESS
)
329 fprintf(stderr
, "Failed to get table id: %s\n", ib_strerror(error
));
341 * Shut down this storage engine
343 void shutdown_storage(void)
345 checked(ib_shutdown(IB_SHUTDOWN_NORMAL
));
351 * Store an item in the databse
353 * @param item the item to store
355 void put_item(struct item
* item
)
357 ib_trx_t transaction
= ib_trx_begin(IB_TRX_SERIALIZABLE
);
358 if (do_put_item(transaction
, item
))
360 ib_err_t error
= ib_trx_commit(transaction
);
361 if (error
!= DB_SUCCESS
)
363 fprintf(stderr
, "Failed to store key:\n\t%s\n",
369 ib_err_t error
= ib_trx_rollback(transaction
);
370 if (error
!= DB_SUCCESS
)
371 fprintf(stderr
, "Failed to roll back the transaction:\n\t%s\n",
377 * Get an item from the engine
378 * @param key the key to grab
379 * @param nkey number of bytes in the key
380 * @return pointer to the item if found
382 struct item
* get_item(const void* key
, size_t nkey
)
384 ib_trx_t transaction
= ib_trx_begin(IB_TRX_SERIALIZABLE
);
385 struct item
* ret
= do_get_item(transaction
, key
, nkey
);
386 ib_err_t error
= ib_trx_rollback(transaction
);
388 if (error
!= DB_SUCCESS
)
389 fprintf(stderr
, "Failed to roll back the transaction:\n\t%s\n",
396 * Create an item structure and initialize it with the content
398 * @param key the key for the item
399 * @param nkey the number of bytes in the key
400 * @param data pointer to the value for the item (may be NULL)
401 * @param size the size of the data
402 * @param flags the flags to store with the data
403 * @param exp the expiry time for the item
404 * @return pointer to an initialized item object or NULL if allocation failed
406 struct item
* create_item(const void* key
, size_t nkey
, const void* data
,
407 size_t size
, uint32_t flags
, time_t exp
)
409 struct item
* ret
= calloc(1, sizeof(*ret
));
412 ret
->key
= malloc(nkey
);
415 ret
->data
= malloc(size
);
418 if (ret
->key
== NULL
|| (size
> 0 && ret
->data
== NULL
))
426 memcpy(ret
->key
, key
, nkey
);
429 memcpy(ret
->data
, data
, size
);
442 * Delete an item from the cache
443 * @param key the key of the item to delete
444 * @param nkey the length of the key
445 * @return true if the item was deleted from the cache
447 bool delete_item(const void* key
, size_t nkey
) {
448 ib_trx_t transaction
= ib_trx_begin(IB_TRX_REPEATABLE_READ
);
450 bool ret
= do_delete_item(transaction
, key
, nkey
);
454 /* object found. commit transaction */
455 ib_err_t error
= ib_trx_commit(transaction
);
456 if (error
!= DB_SUCCESS
)
458 fprintf(stderr
, "Failed to delete key:\n\t%s\n",
465 ib_err_t error
= ib_trx_rollback(transaction
);
466 if (error
!= DB_SUCCESS
)
467 fprintf(stderr
, "Failed to roll back the transaction:\n\t%s\n",
475 * Flush the entire cache
476 * @param when when the cache should be flushed (0 == immediately)
478 void flush(uint32_t when
__attribute__((unused
)))
480 /* @TODO implement support for when != 0 */
481 ib_trx_t transaction
= ib_trx_begin(IB_TRX_REPEATABLE_READ
);
482 ib_crsr_t cursor
= NULL
;
483 ib_err_t err
= DB_SUCCESS
;
485 checked(ib_cursor_open_table(tablename
, transaction
, &cursor
));
486 checked(ib_cursor_first(cursor
));
487 checked(ib_cursor_lock(cursor
, IB_LOCK_X
));
491 checked(ib_cursor_delete_row(cursor
));
492 } while ((err
= ib_cursor_next(cursor
)) == DB_SUCCESS
);
494 if (err
!= DB_END_OF_INDEX
)
496 fprintf(stderr
, "Failed to flush the cache: %s\n", ib_strerror(err
));
499 ib_err_t cursor_error
;
500 cursor_error
= ib_cursor_close(cursor
);
502 checked(ib_trx_commit(transaction
));
508 cursor_error
= ib_cursor_close(cursor
);
511 ib_err_t error
= ib_trx_rollback(transaction
);
512 if (error
!= DB_SUCCESS
)
513 fprintf(stderr
, "Failed to roll back the transaction:\n\t%s\n",
518 * Update the cas ID in the item structure
519 * @param item the item to update
521 void update_cas(struct item
* item
)
527 * Release all the resources allocated by the item
528 * @param item the item to release
530 void release_item(struct item
* item
)