Skip to content

Commit

Permalink
Add support to hash-field-expiration RDB_TYPE v2
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed Jul 10, 2024
1 parent 9d584ea commit 89d6e50
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 51 deletions.
2 changes: 2 additions & 0 deletions src/ext/handlersFilter.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ static void initOpcodeToType(RdbxFilter *ctx) {
ctx->opToType[RDB_TYPE_ZSET_LISTPACK] = RDB_DATA_TYPE_ZSET;
/*hash*/
ctx->opToType[RDB_TYPE_HASH] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_METADATA_PRE_GA] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_METADATA] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_ZIPMAP] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_ZIPLIST] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_LISTPACK] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_LISTPACK_EX_PRE_GA] = RDB_DATA_TYPE_HASH;
ctx->opToType[RDB_TYPE_HASH_LISTPACK_EX] = RDB_DATA_TYPE_HASH;
/*module*/
ctx->opToType[RDB_TYPE_MODULE_2] = RDB_DATA_TYPE_MODULE;
Expand Down
11 changes: 6 additions & 5 deletions src/lib/defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#define RDB_TYPE_MODULE_PRE_GA 6 /* Used in 4.0 release candidates */
#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without
the generating module being loaded. */
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */

/* Object types for encoded objects. */
#define RDB_TYPE_HASH_ZIPMAP 9
Expand All @@ -32,10 +31,12 @@
#define RDB_TYPE_STREAM_LISTPACKS_2 19
#define RDB_TYPE_SET_LISTPACK 20
#define RDB_TYPE_STREAM_LISTPACKS_3 21
#define RDB_TYPE_HASH_METADATA 22
#define RDB_TYPE_HASH_LISTPACK_EX 23
#define RDB_TYPE_MAX 24
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
#define RDB_TYPE_HASH_METADATA_PRE_GA 22 /* Hash with HFEs. Doesn't attach min TTL at start (7.4 RC) */
#define RDB_TYPE_HASH_LISTPACK_EX_PRE_GA 23 /* Hash LP with HFEs. Doesn't attach min TTL at start (7.4 RC) */
#define RDB_TYPE_HASH_METADATA 24 /* Hash with HFEs. Attach min TTL at start */
#define RDB_TYPE_HASH_LISTPACK_EX 25 /* Hash LP with HFEs. Attach min TTL at start */
#define RDB_TYPE_MAX 26


/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_SLOT_INFO 244 /* Individual slot info, such as slot id and size (cluster mode only). */
Expand Down
21 changes: 20 additions & 1 deletion src/lib/parser.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,12 @@ _LIBRDB_API int RDB_handleByLevel(RdbParser *p, RdbDataType type, RdbHandlersLev
break;
case RDB_DATA_TYPE_HASH:
p->handleTypeObjByLevel[RDB_TYPE_HASH] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_METADATA_PRE_GA] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_METADATA] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_ZIPMAP] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_ZIPLIST] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_LISTPACK] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_LISTPACK_EX_PRE_GA] = lvl;
p->handleTypeObjByLevel[RDB_TYPE_HASH_LISTPACK_EX] = lvl;
break;
case RDB_DATA_TYPE_MODULE:
Expand Down Expand Up @@ -1467,9 +1469,11 @@ RdbStatus elementNextRdbType(RdbParser *p) {
case RDB_TYPE_LIST_ZIPLIST: return nextParsingElementKeyValue(p, PE_RAW_LIST_ZL, PE_LIST_ZL);
/* hash */
case RDB_TYPE_HASH: return nextParsingElementKeyValue(p, PE_RAW_HASH, PE_HASH);
case RDB_TYPE_HASH_METADATA_PRE_GA: return nextParsingElementKeyValue(p, PE_RAW_HASH_META, PE_HASH_META);
case RDB_TYPE_HASH_METADATA: return nextParsingElementKeyValue(p, PE_RAW_HASH_META, PE_HASH_META);
case RDB_TYPE_HASH_ZIPLIST: return nextParsingElementKeyValue(p, PE_RAW_HASH_ZL, PE_HASH_ZL);
case RDB_TYPE_HASH_LISTPACK: return nextParsingElementKeyValue(p, PE_RAW_HASH_LP, PE_HASH_LP);
case RDB_TYPE_HASH_LISTPACK_EX_PRE_GA: return nextParsingElementKeyValue(p, PE_RAW_HASH_LP_EX, PE_HASH_LP_EX);
case RDB_TYPE_HASH_LISTPACK_EX: return nextParsingElementKeyValue(p, PE_RAW_HASH_LP_EX, PE_HASH_LP_EX);
case RDB_TYPE_HASH_ZIPMAP: return nextParsingElementKeyValue(p, PE_RAW_HASH_ZM, PE_HASH_ZM);
/* set */
Expand Down Expand Up @@ -1692,6 +1696,13 @@ RdbStatus elementHash(RdbParser *p) {

switch (ctx->state) {
case ST_HASH_HEADER:
if (p->currOpcode == RDB_TYPE_HASH_METADATA) {
/* digest min HFE expiration time. No need to pass it to handlers
each field goanna report its expiration time anyway */
BulkInfo *binfoExpire;
IF_NOT_OK_RETURN(rdbLoad(p, 8, RQ_ALLOC, NULL, &binfoExpire));
}

IF_NOT_OK_RETURN(rdbLoadLen(p, NULL, &(ctx->hash.numFields), NULL, NULL));

ctx->key.numItemsHint = ctx->hash.numFields;
Expand Down Expand Up @@ -1757,6 +1768,13 @@ RdbStatus elementHashZL(RdbParser *p) {
RdbStatus elementHashLPEx(RdbParser *p) {
BulkInfo *listpackBulk;

if (p->currOpcode != RDB_TYPE_HASH_LISTPACK_EX_PRE_GA) {
/* digest min HFE expiration time. No need to pass it to handlers
each field goanna report its expiration time anyway */
BulkInfo *binfoExpire;
IF_NOT_OK_RETURN(rdbLoad(p, 8, RQ_ALLOC, NULL, &binfoExpire));
}

IF_NOT_OK_RETURN(rdbLoadString(p, RQ_ALLOC_APP_BULK, NULL, &listpackBulk));

/*** ENTER SAFE STATE ***/
Expand Down Expand Up @@ -2648,7 +2666,8 @@ RdbStatus rdbLoadString(RdbParser *p, AllocTypeRq type, char *refBuf, BulkInfo *
return rdbLoadLzfString(p, type, refBuf, binfo);
default:
RDB_reportError(p, RDB_ERR_STRING_UNKNOWN_ENCODING_TYPE,
"rdbLoadString(): Unknown RDB string encoding type: %lu",len);
"rdbLoadString(): Unknown RDB string encoding type: %lu (0x%lx)",
len, len);
return RDB_STATUS_ERROR;
}
}
Expand Down
67 changes: 45 additions & 22 deletions src/lib/parserRaw.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ static int listpackValidateIntegrityCb(unsigned char* str, size_t size, RdbParse
static int zipmapValidateIntegrityCb(unsigned char* str, size_t size, RdbParser *p);
static int intsetValidateIntegrityCb(unsigned char* str, size_t size, RdbParser *p);
typedef int (*singleStringTypeValidateCb)(unsigned char* str, size_t size, RdbParser *p); // return 0 for error
static RdbStatus singleStringTypeHandling(RdbParser *p, singleStringTypeValidateCb validateCb, char *callerName);
static RdbStatus singleStringTypeHandling(RdbParser *p,
singleStringTypeValidateCb validateCb,
int digestHdrSize,
char *callerName);
void moduleTypeNameByID(char *name, uint64_t moduleid);

/*** init & release ***/
Expand Down Expand Up @@ -315,7 +318,8 @@ RdbStatus elementRawString(RdbParser *p) {
break;
default:
RDB_reportError(p, RDB_ERR_STRING_UNKNOWN_ENCODING_TYPE,
"elementRawString(): Unknown RDB string encoding type: %lu", strCtx->len);
"elementRawString(): Unknown RDB string encoding type: %lu (0x%lx)",
strCtx->len, strCtx->len);
return RDB_STATUS_ERROR;
}
}
Expand Down Expand Up @@ -419,7 +423,8 @@ RdbStatus elementRawString(RdbParser *p) {
}

RDB_reportError(p, RDB_ERR_STRING_UNKNOWN_ENCODING_TYPE,
"elementRawString(): Unknown RDB string encoding type: %lu", strCtx->encoding);
"elementRawString(): Unknown RDB string encoding type: %lu (0x%lx)",
strCtx->encoding, strCtx->encoding);
return RDB_STATUS_ERROR;
}

Expand All @@ -432,12 +437,13 @@ RdbStatus elementRawString(RdbParser *p) {
}

RdbStatus elementRawListZL(RdbParser *p) {
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, "elementRawListZL");
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, 0, "elementRawListZL");
}

RdbStatus elementRawHash(RdbParser *p) {
BulkInfo *binfo;
uint64_t expireAt;
int numDigits;
int offset, processedBytes;
size_t len;
unsigned char *unusedData;

Expand All @@ -456,18 +462,25 @@ RdbStatus elementRawHash(RdbParser *p) {
switch (p->elmCtx.state) {

case ST_RAW_HASH_HEADER:
numDigits = 0;
aggMakeRoom(p, 10); /* worse case 9 bytes for len */
offset = processedBytes = 0;

aggMakeRoom(p, 20); /* > optional 8 bytes + worse case 9 bytes for len */
if (p->currOpcode == RDB_TYPE_HASH_METADATA) {
/* load min expiration time. Do nothing with it since each field
* goanna report anyway its expiration time */
IF_NOT_OK_RETURN(rdbLoad(p, 8, RQ_ALLOC_REF, rawCtx->at, &binfo));
offset = processedBytes = 8;
}

IF_NOT_OK_RETURN(rdbLoadLen(p, NULL, &hashCtx->numFields,
(unsigned char *) rawCtx->at, &numDigits));
(unsigned char *) rawCtx->at + offset, &processedBytes));

/*** ENTER SAFE STATE ***/

hashCtx->visitField = 0;

IF_NOT_OK_RETURN(cbHandleBegin(p, DATA_SIZE_UNKNOWN_AHEAD));
IF_NOT_OK_RETURN(aggUpdateWritten(p, numDigits));
IF_NOT_OK_RETURN(aggUpdateWritten(p, processedBytes));

if (hashCtx->numFields == 0)
return nextParsingElement(p, PE_RAW_END_KEY); /* empty-key */
Expand All @@ -476,13 +489,13 @@ RdbStatus elementRawHash(RdbParser *p) {

case ST_RAW_HASH_READ_NEXT_EXPIRE:
if (p->parsingElement == PE_RAW_HASH_META) {
numDigits = 0;
processedBytes = 0;
aggMakeRoom(p, 32);
IF_NOT_OK_RETURN(rdbLoadLen(p, NULL, &expireAt,
(unsigned char *) rawCtx->at,
&numDigits));
&processedBytes));
/*** ENTER SAFE STATE ***/
IF_NOT_OK_RETURN(aggUpdateWritten(p, numDigits));
IF_NOT_OK_RETURN(aggUpdateWritten(p, processedBytes));
}
updateElementState(p, ST_RAW_HASH_READ_NEXT_FIELD_STR, 0); /* fall-thru */

Expand Down Expand Up @@ -518,27 +531,28 @@ RdbStatus elementRawHash(RdbParser *p) {
}

RdbStatus elementRawHashZL(RdbParser *p) {
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, "elementRawHashZL");
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, 0, "elementRawHashZL");
}

RdbStatus elementRawHashLP(RdbParser *p) {
return singleStringTypeHandling(p, listpackValidateIntegrityCb, "elementRawHashLP");
return singleStringTypeHandling(p, listpackValidateIntegrityCb, 0, "elementRawHashLP");
}

RdbStatus elementRawHashLPEx(RdbParser *p) {
return singleStringTypeHandling(p, listpackValidateIntegrityCb, "elementRawHashLPEx");
int digestMinExpireSize = (p->currOpcode != RDB_TYPE_HASH_LISTPACK_EX_PRE_GA) ? 8 : 0;
return singleStringTypeHandling(p, listpackValidateIntegrityCb, digestMinExpireSize, "elementRawHashLPEx");
}

RdbStatus elementRawHashZM(RdbParser *p) {
return singleStringTypeHandling(p, zipmapValidateIntegrityCb, "elementRawHashZM");
return singleStringTypeHandling(p, zipmapValidateIntegrityCb, 0, "elementRawHashZM");
}

RdbStatus elementRawSetIS(RdbParser *p) {
return singleStringTypeHandling(p, intsetValidateIntegrityCb, "elementRawSetIS");
return singleStringTypeHandling(p, intsetValidateIntegrityCb, 0, "elementRawSetIS");
}

RdbStatus elementRawSetLP(RdbParser *p) {
return singleStringTypeHandling(p, listpackValidateIntegrityCb, "elementRawSetLP");
return singleStringTypeHandling(p, listpackValidateIntegrityCb, 0, "elementRawSetLP");
}

RdbStatus elementRawSet(RdbParser *p) {
Expand Down Expand Up @@ -595,11 +609,11 @@ RdbStatus elementRawSet(RdbParser *p) {
}

RdbStatus elementRawZsetLP(RdbParser *p) {
return singleStringTypeHandling(p, listpackValidateIntegrityCb, "elementRawZsetLP");
return singleStringTypeHandling(p, listpackValidateIntegrityCb, 0, "elementRawZsetLP");
}

RdbStatus elementRawZsetZL(RdbParser *p) {
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, "elementRawZsetZL");
return singleStringTypeHandling(p, ziplistValidateIntegrityCb, 0, "elementRawZsetZL");
}

RdbStatus elementRawZset(RdbParser *p) {
Expand Down Expand Up @@ -1122,8 +1136,11 @@ static int intsetValidateIntegrityCb(unsigned char* str, size_t size, RdbParser
return intsetValidateIntegrity(str, size, 1);
}

static RdbStatus singleStringTypeHandling(RdbParser *p, singleStringTypeValidateCb validateCb, char *callerName) {

static RdbStatus singleStringTypeHandling(RdbParser *p,
singleStringTypeValidateCb validateCb,
int digestHdrSize,
char *callerName) {
BulkInfo *binfo;
enum RAW_SINGLE_STRING_TYPE_STATES {
ST_RAW_SSTYPE_START=0,
ST_RAW_SSTYPE_CALL_STR, /* Call PE_RAW_STRING as sub-element */
Expand All @@ -1132,6 +1149,12 @@ static RdbStatus singleStringTypeHandling(RdbParser *p, singleStringTypeValidate

switch (p->elmCtx.state) {
case ST_RAW_SSTYPE_START:
if (digestHdrSize) {
IF_NOT_OK_RETURN(aggMakeRoom(p, digestHdrSize));
IF_NOT_OK_RETURN(rdbLoad(p, digestHdrSize, RQ_ALLOC_REF, p->rawCtx.at, &binfo));
/*** ENTER SAFE STATE ***/
IF_NOT_OK_RETURN(aggUpdateWritten(p, digestHdrSize));
}
/* take care string won't propagate for having integrity check */
IF_NOT_OK_RETURN(cbHandleBegin(p, DATA_SIZE_UNKNOWN_AHEAD));

Expand Down
2 changes: 1 addition & 1 deletion test/test_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ size_t serializeRedisReply(const redisReply *reply, char *buffer, size_t bsize)
*
* Return the response serialized
*/
char *sendRedisCmd(char *cmd, int expRetType, char *expRsp) {
char *sendRedisCmd(const char *cmd, int expRetType, char *expRsp) {
static char rspbuf[1024];

assert_int_not_equal(currRedisInst, -1);
Expand Down
2 changes: 1 addition & 1 deletion test/test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const char *getTargetRedisVersion(int *major, int *minor); /* call only after se
void teardownRedisServer(void);
void cleanup_json_sign_service(void);
int isSetRedisServer(void);
char *sendRedisCmd(char *cmd, int expRetType, char *expRsp);
char *sendRedisCmd(const char *cmd, int expRetType, char *expRsp);
int isSupportRestoreModuleAux(void);

/* test groups */
Expand Down
36 changes: 15 additions & 21 deletions test/test_rdb_to_redis.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,33 +189,27 @@ static void test_rdb_to_redis_hash(void **state) {

static void test_rdb_to_redis_hash_with_expire(void **state) {
UNUSED(state);
const char* configs[] = {
"CONFIG SET HASH-MAX-LISTPACK-ENTRIES 0", /*HT*/
"CONFIG SET HASH-MAX-LISTPACK-ENTRIES 512", /*listpack*/
};

/* hash-field-expiration available since 7.4 */
if ((serverMajorVer<7) || ((serverMajorVer==7) && (serverMinorVer<4)))
skip();

setupRedisServer("--enable-debug-command yes --dbfilename expire.rdb");

/* listpack */
sendRedisCmd("FLUSHALL", REDIS_REPLY_STATUS, NULL);
sendRedisCmd("CONFIG SET HASH-MAX-LISTPACK-ENTRIES 512", REDIS_REPLY_STATUS, NULL);
sendRedisCmd("HSET myhash f4 v1 f5 v2 f6 v3", REDIS_REPLY_INTEGER, "3");
sendRedisCmd("HPEXPIREAT myhash 70368744177663 FIELDS 2 f4 f5", REDIS_REPLY_ARRAY, "1 1");
rdb_save_librdb_reload_eq(0 /*restore*/, TMP_FOLDER("expire.rdb"));
rdb_save_librdb_reload_eq(1 /*restore*/, TMP_FOLDER("expire.rdb"));
sendRedisCmd("HPEXPIRETIME myhash FIELDS 3 f4 f5 f6", REDIS_REPLY_ARRAY,
"70368744177663 70368744177663 -1"); /* verify expected output */

/* dict (max-lp-entries=0) */
sendRedisCmd("FLUSHALL", REDIS_REPLY_STATUS, NULL);
sendRedisCmd("CONFIG SET HASH-MAX-LISTPACK-ENTRIES 0", REDIS_REPLY_STATUS, NULL);
sendRedisCmd("HSET myhash f4 v1 f5 v2 f6 v3", REDIS_REPLY_INTEGER, "3");
sendRedisCmd("HPEXPIREAT myhash 70368744177663 FIELDS 2 f4 f5", REDIS_REPLY_ARRAY, "1 1");
rdb_save_librdb_reload_eq(0 /*restore*/, TMP_FOLDER("expire.rdb"));
rdb_save_librdb_reload_eq(1 /*restore*/, TMP_FOLDER("expire.rdb"));
sendRedisCmd("HPEXPIRETIME myhash FIELDS 3 f4 f5 f6", REDIS_REPLY_ARRAY,
"70368744177663 70368744177663 -1"); /* verify expected output */

for (int i = 0; i < 2; i++) {
sendRedisCmd("FLUSHALL", REDIS_REPLY_STATUS, NULL);
sendRedisCmd(configs[i], REDIS_REPLY_STATUS, NULL);
sendRedisCmd("HSET myhash f4 v1 f5 v2 f6 v3", REDIS_REPLY_INTEGER, "3");
sendRedisCmd("HPEXPIREAT myhash 70368744177663 FIELDS 2 f4 f5",
REDIS_REPLY_ARRAY, "1 1"); /*time=0x3fffffffffff*/
rdb_save_librdb_reload_eq(0 /*restore*/, TMP_FOLDER("expire.rdb")); /// <<<< ----- problem here
//rdb_save_librdb_reload_eq(1 /*restore*/, TMP_FOLDER("expire.rdb"));
sendRedisCmd("HPEXPIRETIME myhash FIELDS 3 f4 f5 f6", REDIS_REPLY_ARRAY,
"70368744177663 70368744177663 -1"); /* verify expected output */
}
teardownRedisServer();
}

Expand Down

0 comments on commit 89d6e50

Please sign in to comment.