diff --git a/api/librdb-ext-api.h b/api/librdb-ext-api.h index 0f43667..7b5d974 100644 --- a/api/librdb-ext-api.h +++ b/api/librdb-ext-api.h @@ -199,7 +199,9 @@ _LIBRDB_API RdbxToResp *RDBX_createHandlersToResp(RdbParser *, RdbxToRespConf *) * ****************************************************************/ -/* On start command pass command info. NULL otherwise. */ +/* As streaming RESP protocol, when starting a new command, provide details + * about the command. Otherwise, pass NULL. This information will be used to log + * and report the command in case of a failure from Redis server. */ typedef struct RdbxRespWriterStartCmd { /* Redis Command name (Ex: "SET", "RESTORE"). Owned by the caller. It is * constant static string and Valid for ref behind the duration of the call. */ diff --git a/src/ext/handlersToResp.c b/src/ext/handlersToResp.c index 8856b85..b7e834f 100644 --- a/src/ext/handlersToResp.c +++ b/src/ext/handlersToResp.c @@ -225,10 +225,7 @@ static inline RdbRes onWriteNewCmdDbg(RdbxToResp *ctx) { if (ctx->debug.flags & RFLAG_ENUM_CMD_ID) { char keyLenStr[32], cmdIdLenStr[32], cmdIdStr[32]; - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "SET"; - startCmd.key = KEY_CMD_ID_DBG; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"SET", KEY_CMD_ID_DBG, 0}; struct iovec iov[7]; /* write SET */ @@ -297,10 +294,7 @@ static inline RdbRes sendFirstRestoreFrag(RdbxToResp *ctx, RdbBulk frag, size_t if (ctx->keyCtx.delBeforeWrite == DEL_KEY_BEFORE_BY_RESTORE_REPLACE) extra_args++; - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "RESTORE"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = ctx->restoreCtx.restoreSize; + RdbxRespWriterStartCmd startCmd = {"RESTORE", ctx->keyCtx.key, ctx->restoreCtx.restoreSize}; /* writev RESTORE */ char cmd[64]; @@ -328,10 +322,7 @@ static inline RdbRes sendFirstRestoreFragModuleAux(RdbxToResp *ctx, RdbBulk frag struct iovec iov[3]; char lenStr[32]; - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "RESTOREMODAUX"; - startCmd.key = ""; - startCmd.restoreSize = ctx->restoreCtx.restoreSize; + RdbxRespWriterStartCmd startCmd = {"RESTOREMODAUX", "", ctx->restoreCtx.restoreSize}; /* writev RESTOREMODAUX */ iov[0].iov_base = ctx->restoreCtx.moduleAux.cmdPrefix; @@ -357,10 +348,7 @@ static RdbRes toRespNewDb(RdbParser *p, void *userData, int dbid) { int cnt = ll2string(dbidStr, sizeof(dbidStr), dbid); - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "SELECT"; - startCmd.key = ""; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"SELECT", "", 0}; IOV_CONST(&iov[0], "*2\r\n$6\r\nSELECT"); IOV_LENGTH(&iov[1], cnt, cntStr); @@ -398,10 +386,7 @@ static RdbRes toRespNewKey(RdbParser *p, void *userData, RdbBulk key, RdbKeyInfo struct iovec iov[4]; char keyLenStr[32]; - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "DEL"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"DEL", ctx->keyCtx.key, 0}; IOV_CONST(&iov[0], "*2\r\n$3\r\nDEL"); IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr); @@ -420,10 +405,7 @@ static RdbRes toRespEndKey(RdbParser *p, void *userData) { /* key is in db. Set its expiration time */ if (ctx->keyCtx.info.expiretime != -1) { struct iovec iov[6]; - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "PEXPIREAT"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"PEXPIREAT", ctx->keyCtx.key, 0}; char keyLenStr[32], expireLenStr[32], expireStr[32]; /* PEXPIREAT */ @@ -454,10 +436,7 @@ static RdbRes toRespString(RdbParser *p, void *userData, RdbBulk string) { struct iovec iov[7]; - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "SET"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"SET", ctx->keyCtx.key, 0}; /* write SET */ IOV_CONST(&iov[0], "*3\r\n$3\r\nSET"); @@ -480,10 +459,7 @@ static RdbRes toRespList(RdbParser *p, void *userData, RdbBulk item) { char keyLenStr[32], valLenStr[32]; int valLen = RDB_bulkLen(p, item); - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "RPUSH"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"RPUSH", ctx->keyCtx.key, 0}; /* write RPUSH */ IOV_CONST(&iov[0], "*3\r\n$5\r\nRPUSH"); @@ -508,10 +484,7 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va int fieldLen = RDB_bulkLen(p, field); int valueLen = RDB_bulkLen(p, value); - RdbxRespWriterStartCmd hsetCmd; - hsetCmd.cmd = "HSET"; - hsetCmd.key = ctx->keyCtx.key; - hsetCmd.restoreSize = 0; + RdbxRespWriterStartCmd hsetCmd = {"HSET", ctx->keyCtx.key, 0}; /* write RPUSH */ IOV_CONST(&iov[0], "*4\r\n$4\r\nHSET"); @@ -529,10 +502,7 @@ static RdbRes toRespHash(RdbParser *p, void *userData, RdbBulk field, RdbBulk va if (expireAt == -1) return RDB_OK; - RdbxRespWriterStartCmd hpexpireatCmd; - hpexpireatCmd.cmd = "HPEXPIREAT"; - hpexpireatCmd.key = ctx->keyCtx.key; - hpexpireatCmd.restoreSize = 0; + RdbxRespWriterStartCmd hpexpireatCmd = {"HPEXPIREAT", ctx->keyCtx.key, 0}; /* write HPEXPIREAT */ IOV_CONST(&iov[0], "*6\r\n$10\r\nHPEXPIREAT"); @@ -556,10 +526,7 @@ static RdbRes toRespSet(RdbParser *p, void *userData, RdbBulk member) { int valLen = RDB_bulkLen(p, member); - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "SADD"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"SADD", ctx->keyCtx.key, 0}; /* write RPUSH */ IOV_CONST(&iov[0], "*3\r\n$4\r\nSADD"); @@ -580,10 +547,7 @@ static RdbRes toRespZset(RdbParser *p, void *userData, RdbBulk member, double sc int valLen = RDB_bulkLen(p, member); - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "ZADD"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"ZADD", ctx->keyCtx.key, 0}; /* write ZADD */ IOV_CONST(&iov[0], "*4\r\n$4\r\nZADD"); @@ -628,10 +592,7 @@ static RdbRes toRespFunction(RdbParser *p, void *userData, RdbBulk func) { int funcLen = RDB_bulkLen(p, func); - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "FUNCTION"; - startCmd.key = ""; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"FUNCTION", "", 0}; if (ctx->conf.funcLibReplaceIfExist) IOV_CONST(&iov[0], "*4\r\n$8\r\nFUNCTION\r\n$4\r\nLOAD\r\n$7\r\nREPLACE"); @@ -658,10 +619,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta * * for the Stream type. (We don't use the MAXLEN 0 trick from aof.c * because of Redis Enterprise CRDT compatibility issues - Can't XSETID "back") */ - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "XGROUP CREATE"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"XGROUP CREATE", ctx->keyCtx.key, 0}; IOV_CONST(&iov[0], "*6\r\n$6\r\nXGROUP\r\n$6\r\nCREATE"); IOV_LENGTH(&iov[1], ctx->keyCtx.keyLen, keyLenStr); @@ -686,10 +644,7 @@ static RdbRes toRespStreamMetaData(RdbParser *p, void *userData, RdbStreamMeta * int idLen = snprintf(idStr, sizeof(idStr), "%lu-%lu",meta->lastID.ms,meta->lastID.seq); int maxDelEntryIdLen = snprintf(maxDelEntryId, sizeof(maxDelEntryId), "%lu-%lu", meta->maxDelEntryID.ms, meta->maxDelEntryID.seq); - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "XSETID"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"XSETID", ctx->keyCtx.key, 0}; if ((ctx->keyCtx.info.opcode >= _RDB_TYPE_STREAM_LISTPACKS_2) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) { IOV_CONST(&iov[0], "*7\r\n$6\r\nXSETID"); @@ -727,9 +682,7 @@ static RdbRes toRespStreamItem(RdbParser *p, void *userData, RdbStreamID *id, Rd /* Start of (another) stream item? */ if ((ctx->streamCtx.xaddStartEndCounter % 2) == 0) { - startCmd.cmd = "XADD"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + startCmd = (RdbxRespWriterStartCmd) {"XADD", ctx->keyCtx.key, 0}; startCmdRef = &startCmd; /* writev XADD */ @@ -780,10 +733,7 @@ static RdbRes toRespStreamNewCGroup(RdbParser *p, void *userData, RdbBulk grpNam int idLen = snprintf(idStr, sizeof(idStr), "%lu-%lu",meta->lastId.ms,meta->lastId.seq); - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "XGROUP"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = { "XGROUP", ctx->keyCtx.key, 0}; /* writev XGROUP */ if ( (meta->entriesRead>=0) && (ctx->targetRedisVerVal >= VER_VAL(7, 0))) { @@ -863,10 +813,7 @@ static RdbRes toRespStreamConsumerPendingEntry(RdbParser *p, void *userData, Rdb return (RdbRes) RDBX_ERR_STREAM_INTEG_CHECK; } - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "XCLAIM"; - startCmd.key = ctx->keyCtx.key; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"XCLAIM", ctx->keyCtx.key, 0}; /* writev XCLAIM */ IOV_CONST(&iov[iovs++], "*12\r\n$6\r\nXCLAIM"); diff --git a/src/ext/respToRedisLoader.c b/src/ext/respToRedisLoader.c index 3a1ed54..4d7d960 100644 --- a/src/ext/respToRedisLoader.c +++ b/src/ext/respToRedisLoader.c @@ -243,10 +243,7 @@ static RdbRes redisAuthCustomized(RdbxRespToRedisLoader *ctx, RdbxRedisAuth *aut char prefix[32]; - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = ""; - startCmd.key = ""; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"", "", 0}; /* allocate iovec (2 for header and trailer. 3 for each argument) */ struct iovec *iov = (struct iovec *)malloc((auth->cmd.argc * 3 + 2) * sizeof(struct iovec)); @@ -293,10 +290,7 @@ static RdbRes redisAuth(RdbxRespToRedisLoader *ctx, RdbxRedisAuth *auth) { /* AUTH [username] password */ - RdbxRespWriterStartCmd startCmd; - startCmd.cmd = "AUTH"; - startCmd.key = ""; - startCmd.restoreSize = 0; + RdbxRespWriterStartCmd startCmd = {"AUTH", "", 0}; struct iovec iov[10]; if (auth->user) {