From: Brian Aker Date: Sun, 27 Mar 2011 23:50:09 +0000 (-0700) Subject: Merge in new protocol interface bits. X-Git-Tag: 0.51~15^2~65 X-Git-Url: https://git.m6w6.name/?a=commitdiff_plain;h=2df9084da8c73d8813f8740e88cd70fe63dd742c;p=m6w6%2Flibmemcached Merge in new protocol interface bits. --- diff --git a/clients/ms_conn.c b/clients/ms_conn.c index e7ee3f12..f380a667 100644 --- a/clients/ms_conn.c +++ b/clients/ms_conn.c @@ -3288,7 +3288,7 @@ static void ms_add_bin_header(ms_conn_t *c, header->request.extlen= (uint8_t)hdr_len; header->request.datatype= (uint8_t)PROTOCOL_BINARY_RAW_BYTES; - header->request.reserved= 0; + header->request.vbucket= 0; header->request.bodylen= htonl(body_len); header->request.opaque= 0; diff --git a/example/interface_v0.c b/example/interface_v0.c index 3ca4daff..74a98eaf 100644 --- a/example/interface_v0.c +++ b/example/interface_v0.c @@ -52,7 +52,7 @@ static protocol_binary_response_status quit_command_handler(const void *cookie, response_handler(cookie, header, (void*)&response); /* I need a better way to signal to close the connection */ - return PROTOCOL_BINARY_RESPONSE_EIO; + return PROTOCOL_BINARY_RESPONSE_EINTERNAL; } static protocol_binary_response_status get_command_handler(const void *cookie, diff --git a/libmemcached/include.am b/libmemcached/include.am index de354b4c..15139365 100644 --- a/libmemcached/include.am +++ b/libmemcached/include.am @@ -45,6 +45,7 @@ nobase_include_HEADERS+= \ libmemcached/memcached.h \ libmemcached/memcached.hpp \ libmemcached/memcached/protocol_binary.h \ + libmemcached/memcached/vbucket.h \ libmemcached/options.h \ libmemcached/parse.h \ libmemcached/prefix_key.h \ diff --git a/libmemcached/memcached/protocol_binary.h b/libmemcached/memcached/protocol_binary.h index ecdc52e3..4509bbcb 100644 --- a/libmemcached/memcached/protocol_binary.h +++ b/libmemcached/memcached/protocol_binary.h @@ -1,3 +1,4 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* * Copyright (c) <2008>, Sun Microsystems, Inc. * All rights reserved. @@ -35,6 +36,8 @@ #ifndef PROTOCOL_BINARY_H #define PROTOCOL_BINARY_H +#include "vbucket.h" + /** * \addtogroup Protocol * @{ @@ -72,14 +75,15 @@ extern "C" PROTOCOL_BINARY_RESPONSE_EINVAL = 0x04, PROTOCOL_BINARY_RESPONSE_NOT_STORED = 0x05, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL = 0x06, + PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET = 0x07, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR = 0x20, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE = 0x21, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND = 0x81, PROTOCOL_BINARY_RESPONSE_ENOMEM = 0x82, - - PROTOCOL_BINARY_RESPONSE_PAUSE = 0xfe00, - PROTOCOL_BINARY_RESPONSE_EIO = 0xff00 - + PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED = 0x83, + PROTOCOL_BINARY_RESPONSE_EINTERNAL = 0x84, + PROTOCOL_BINARY_RESPONSE_EBUSY = 0x85, + PROTOCOL_BINARY_RESPONSE_ETMPFAIL = 0x86 } protocol_binary_response_status; /** @@ -114,6 +118,10 @@ extern "C" PROTOCOL_BINARY_CMD_FLUSHQ = 0x18, PROTOCOL_BINARY_CMD_APPENDQ = 0x19, PROTOCOL_BINARY_CMD_PREPENDQ = 0x1a, + PROTOCOL_BINARY_CMD_VERBOSITY = 0x1b, + PROTOCOL_BINARY_CMD_TOUCH = 0x1c, + PROTOCOL_BINARY_CMD_GAT = 0x1d, + PROTOCOL_BINARY_CMD_GATQ = 0x1e, PROTOCOL_BINARY_CMD_SASL_LIST_MECHS = 0x20, PROTOCOL_BINARY_CMD_SASL_AUTH = 0x21, @@ -135,9 +143,30 @@ extern "C" PROTOCOL_BINARY_CMD_RINCR = 0x39, PROTOCOL_BINARY_CMD_RINCRQ = 0x3a, PROTOCOL_BINARY_CMD_RDECR = 0x3b, - PROTOCOL_BINARY_CMD_RDECRQ = 0x3c + PROTOCOL_BINARY_CMD_RDECRQ = 0x3c, /* End Range operations */ + /* VBucket commands */ + PROTOCOL_BINARY_CMD_SET_VBUCKET = 0x3d, + PROTOCOL_BINARY_CMD_GET_VBUCKET = 0x3e, + PROTOCOL_BINARY_CMD_DEL_VBUCKET = 0x3f, + /* End VBucket commands */ + + /* TAP commands */ + PROTOCOL_BINARY_CMD_TAP_CONNECT = 0x40, + PROTOCOL_BINARY_CMD_TAP_MUTATION = 0x41, + PROTOCOL_BINARY_CMD_TAP_DELETE = 0x42, + PROTOCOL_BINARY_CMD_TAP_FLUSH = 0x43, + PROTOCOL_BINARY_CMD_TAP_OPAQUE = 0x44, + PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET = 0x45, + PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START = 0x46, + PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END = 0x47, + /* End TAP */ + + PROTOCOL_BINARY_CMD_LAST_RESERVED = 0xef, + + /* Scrub the data */ + PROTOCOL_BINARY_CMD_SCRUB = 0xf0 } protocol_binary_command; /** @@ -159,7 +188,7 @@ extern "C" uint16_t keylen; uint8_t extlen; uint8_t datatype; - uint16_t reserved; + uint16_t vbucket; uint32_t bodylen; uint32_t opaque; uint64_t cas; @@ -390,6 +419,64 @@ extern "C" */ typedef protocol_binary_response_no_extras protocol_binary_response_stats; + /** + * Definition of the packet used by the verbosity command + */ + typedef union { + struct { + protocol_binary_request_header header; + struct { + uint32_t level; + } body; + } message; + uint8_t bytes[sizeof(protocol_binary_request_header) + 4]; + } protocol_binary_request_verbosity; + + /** + * Definition of the packet returned from the verbosity command + */ + typedef protocol_binary_response_no_extras protocol_binary_response_verbosity; + + /** + * Definition of the packet used by the touch command. + */ + typedef union { + struct { + protocol_binary_request_header header; + struct { + uint32_t expiration; + } body; + } message; + uint8_t bytes[sizeof(protocol_binary_request_header) + 4]; + } protocol_binary_request_touch; + + /** + * Definition of the packet returned from the touch command + */ + typedef protocol_binary_response_no_extras protocol_binary_response_touch; + + /** + * Definition of the packet used by the GAT(Q) command. + */ + typedef union { + struct { + protocol_binary_request_header header; + struct { + uint32_t expiration; + } body; + } message; + uint8_t bytes[sizeof(protocol_binary_request_header) + 4]; + } protocol_binary_request_gat; + + typedef protocol_binary_request_gat protocol_binary_request_gatq; + + /** + * Definition of the packet returned from the GAT(Q) + */ + typedef protocol_binary_response_get protocol_binary_response_gat; + typedef protocol_binary_response_get protocol_binary_response_gatq; + + /** * Definition of a request for a range operation. * See http://code.google.com/p/memcached/wiki/RangeOps @@ -425,6 +512,209 @@ extern "C" typedef protocol_binary_request_rangeop protocol_binary_request_rdecr; typedef protocol_binary_request_rangeop protocol_binary_request_rdecrq; + + /** + * Definition of tap commands + * See To be written + * + */ + + typedef union { + struct { + protocol_binary_request_header header; + struct { + /** + * flags is a bitmask used to set properties for the + * the connection. Please In order to be forward compatible + * you should set all undefined bits to 0. + * + * If the bit require extra userdata, it will be stored + * in the user-data field of the body (passed to the engine + * as enginespeciffic). That means that when you parse the + * flags and the engine-specific data, you have to work your + * way from bit 0 and upwards to find the correct offset for + * the data. + * + */ + uint32_t flags; + + /** + * Backfill age + * + * By using this flag you can limit the amount of data being + * transmitted. If you don't specify a backfill age, the + * server will transmit everything it contains. + * + * The first 8 bytes in the engine specific data contains + * the oldest entry (from epoc) you're interested in. + * Specifying a time in the future (for the server you are + * connecting to), will cause it to start streaming current + * changes. + */ +#define TAP_CONNECT_FLAG_BACKFILL 0x01 + /** + * Dump will cause the server to send the data stored on the + * server, but disconnect when the keys stored in the server + * are transmitted. + */ +#define TAP_CONNECT_FLAG_DUMP 0x02 + /** + * The body contains a list of 16 bits words in network byte + * order specifying the vbucket ids to monitor. The first 16 + * bit word contains the number of buckets. The number of 0 + * means "all buckets" + */ +#define TAP_CONNECT_FLAG_LIST_VBUCKETS 0x04 + /** + * The responsibility of the vbuckets is to be transferred + * over to the caller when all items are transferred. + */ +#define TAP_CONNECT_FLAG_TAKEOVER_VBUCKETS 0x08 + /** + * The tap consumer supports ack'ing of tap messages + */ +#define TAP_CONNECT_SUPPORT_ACK 0x10 + /** + * The tap consumer would prefer to just get the keys + * back. If the engine supports this it will set + * the TAP_FLAG_NO_VALUE flag in each of the + * tap packets returned. + */ +#define TAP_CONNECT_REQUEST_KEYS_ONLY 0x20 + /** + * The body contains a list of (vbucket_id, last_checkpoint_id) + * pairs. This provides the checkpoint support in TAP streams. + * The last checkpoint id represents the last checkpoint that + * was successfully persisted. + */ +#define TAP_CONNECT_CHECKPOINT 0x40 + /** + * The tap consumer is a registered tap client, which means that + * the tap server will maintain its checkpoint cursor permanently. + */ +#define TAP_CONNECT_REGISTERED_CLIENT 0x80 + } body; + } message; + uint8_t bytes[sizeof(protocol_binary_request_header) + 4]; + } protocol_binary_request_tap_connect; + + typedef union { + struct { + protocol_binary_request_header header; + struct { + struct { + uint16_t enginespecific_length; + /* + * The flag section support the following flags + */ + /** + * Request that the consumer send a response packet + * for this packet. The opaque field must be preserved + * in the response. + */ +#define TAP_FLAG_ACK 0x01 + /** + * The value for the key is not included in the packet + */ +#define TAP_FLAG_NO_VALUE 0x02 + uint16_t flags; + uint8_t ttl; + uint8_t res1; + uint8_t res2; + uint8_t res3; + } tap; + struct { + uint32_t flags; + uint32_t expiration; + } item; + } body; + } message; + uint8_t bytes[sizeof(protocol_binary_request_header) + 16]; + } protocol_binary_request_tap_mutation; + + typedef union { + struct { + protocol_binary_request_header header; + struct { + struct { + uint16_t enginespecific_length; + /** + * See the definition of the flags for + * protocol_binary_request_tap_mutation for a description + * of the available flags. + */ + uint16_t flags; + uint8_t ttl; + uint8_t res1; + uint8_t res2; + uint8_t res3; + } tap; + } body; + } message; + uint8_t bytes[sizeof(protocol_binary_request_header) + 8]; + } protocol_binary_request_tap_no_extras; + + typedef protocol_binary_request_tap_no_extras protocol_binary_request_tap_delete; + typedef protocol_binary_request_tap_no_extras protocol_binary_request_tap_flush; + typedef protocol_binary_request_tap_no_extras protocol_binary_request_tap_opaque; + typedef protocol_binary_request_tap_no_extras protocol_binary_request_tap_vbucket_set; + + + /** + * Definition of the packet used by the scrub. + */ + typedef protocol_binary_request_no_extras protocol_binary_request_scrub; + + /** + * Definition of the packet returned from scrub. + */ + typedef protocol_binary_response_no_extras protocol_binary_response_scrub; + + + /** + * Definition of the packet used by set vbucket + */ + typedef union { + struct { + protocol_binary_request_header header; + struct { + vbucket_state_t state; + } body; + } message; + uint8_t bytes[sizeof(protocol_binary_request_header) + sizeof(vbucket_state_t)]; + } protocol_binary_request_set_vbucket; + /** + * Definition of the packet returned from set vbucket + */ + typedef protocol_binary_response_no_extras protocol_binary_response_set_vbucket; + /** + * Definition of the packet used by del vbucket + */ + typedef protocol_binary_request_no_extras protocol_binary_request_del_vbucket; + /** + * Definition of the packet returned from del vbucket + */ + typedef protocol_binary_response_no_extras protocol_binary_response_del_vbucket; + + /** + * Definition of the packet used by get vbucket + */ + typedef protocol_binary_request_no_extras protocol_binary_request_get_vbucket; + + /** + * Definition of the packet returned from get vbucket + */ + typedef union { + struct { + protocol_binary_response_header header; + struct { + vbucket_state_t state; + } body; + } message; + uint8_t bytes[sizeof(protocol_binary_response_header) + sizeof(vbucket_state_t)]; + } protocol_binary_response_get_vbucket; + + /** * @} */ diff --git a/libmemcached/memcached/vbucket.h b/libmemcached/memcached/vbucket.h new file mode 100644 index 00000000..c6c4d8ca --- /dev/null +++ b/libmemcached/memcached/vbucket.h @@ -0,0 +1,26 @@ +/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ +#ifndef MEMCACHED_VBUCKET_H +#define MEMCACHED_VBUCKET_H 1 + +#ifdef __cplusplus +extern "C" +{ +#endif + +typedef enum { + vbucket_state_active = 1, /**< Actively servicing a vbucket. */ + vbucket_state_replica, /**< Servicing a vbucket as a replica only. */ + vbucket_state_pending, /**< Pending active. */ + vbucket_state_dead /**< Not in use, pending deletion. */ +} vbucket_state_t; + +#define is_valid_vbucket_state_t(state) \ + (state == vbucket_state_active || \ + state == vbucket_state_replica || \ + state == vbucket_state_pending || \ + state == vbucket_state_dead) + +#ifdef __cplusplus +} +#endif +#endif diff --git a/libmemcached/protocol/binary_handler.c b/libmemcached/protocol/binary_handler.c index 81c09c99..a9e4ce95 100644 --- a/libmemcached/protocol/binary_handler.c +++ b/libmemcached/protocol/binary_handler.c @@ -39,7 +39,7 @@ raw_response_handler(const void *cookie, if (!client->root->drain(client)) { - return PROTOCOL_BINARY_RESPONSE_EIO; + return PROTOCOL_BINARY_RESPONSE_EINTERNAL; } size_t len= sizeof(*response) + htonl(response->response.bodylen); @@ -65,7 +65,7 @@ raw_response_handler(const void *cookie, else if (get_socket_errno() != EINTR) { client->error= errno; - return PROTOCOL_BINARY_RESPONSE_EIO; + return PROTOCOL_BINARY_RESPONSE_EINTERNAL; } } else @@ -700,7 +700,7 @@ quit_command_handler(const void *cookie, } /* I need a better way to signal to close the connection */ - return PROTOCOL_BINARY_RESPONSE_EIO; + return PROTOCOL_BINARY_RESPONSE_EINTERNAL; } /** @@ -970,8 +970,8 @@ static protocol_binary_response_status execute_command(memcached_protocol_client } if (rval != PROTOCOL_BINARY_RESPONSE_SUCCESS && - rval != PROTOCOL_BINARY_RESPONSE_EIO && - rval != PROTOCOL_BINARY_RESPONSE_PAUSE) + rval != PROTOCOL_BINARY_RESPONSE_EINTERNAL && + rval != PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED) { protocol_binary_response_no_extras response= { .message= { @@ -1018,12 +1018,12 @@ memcached_protocol_event_t memcached_binary_protocol_process_data(memcached_prot client->current_command= header; protocol_binary_response_status rv= execute_command(client, header); - if (rv == PROTOCOL_BINARY_RESPONSE_EIO) + if (rv == PROTOCOL_BINARY_RESPONSE_EINTERNAL) { *length= len; *endptr= (void*)header; return MEMCACHED_PROTOCOL_ERROR_EVENT; - } else if (rv == PROTOCOL_BINARY_RESPONSE_PAUSE) + } else if (rv == PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED) return MEMCACHED_PROTOCOL_PAUSE_EVENT; ssize_t total= (ssize_t)(sizeof(*header) + ntohl(header->request.bodylen));