Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into iso
Browse files Browse the repository at this point in the history
  • Loading branch information
bjosv committed Jul 1, 2024
2 parents f76825c + ac09913 commit f6622ca
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 403 deletions.
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ SET(valkey_sources
src/sockcompat.c
src/valkey.c
src/valkeycluster.c
src/vkarray.c
src/vkutil.c)

IF(WIN32)
Expand Down
75 changes: 18 additions & 57 deletions src/command.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,18 @@
#else
#include <malloc.h>
#endif
#include <stdio.h>
#include <string.h>

#include "alloc.h"
#include "command.h"
#include "vkarray.h"
#include "sds.h"
#include "vkutil.h"
#include "win32.h"

#define LF (uint8_t)10
#define CR (uint8_t)13

static uint64_t cmd_id = 0; /* command id counter */

typedef enum {
KEYPOS_NONE,
KEYPOS_UNKNOWN,
Expand Down Expand Up @@ -93,9 +92,8 @@ static inline void to_upper(char *dst, const char *src, uint32_t len) {
}

/* Looks up a command or subcommand in the command table. Arg0 and arg1 are used
* to lookup the command. The function returns CMD_UNKNOWN on failure. On
* success, the command type is returned and *firstkey and *arity are
* populated. */
* to lookup the command. The function returns the cmddef for a found command,
* or NULL on failure. */
cmddef *valkey_lookup_cmd(const char *arg0, uint32_t arg0_len, const char *arg1,
uint32_t arg1_len) {
int num_commands = sizeof(server_commands) / sizeof(cmddef);
Expand Down Expand Up @@ -167,15 +165,6 @@ char *valkey_parse_bulk(char *p, char *end, char **str, uint32_t *len) {
return p;
}

static inline int push_keypos(struct cmd *r, char *arg, uint32_t arglen) {
struct keypos *kpos = vkarray_push(r->keys);
if (kpos == NULL)
return 0;
kpos->start = arg;
kpos->end = arg + arglen;
return 1;
}

/*
* Reference: https://valkey.io/docs/topics/protocol/
*
Expand Down Expand Up @@ -217,7 +206,6 @@ void valkey_parse_cmd(struct cmd *r) {
goto error;
if (rnarg == 0)
goto error;
r->narg = rnarg;

/* Parse the first two args. */
if ((p = valkey_parse_bulk(p, end, &arg0, &arg0_len)) == NULL)
Expand All @@ -232,7 +220,6 @@ void valkey_parse_cmd(struct cmd *r) {
/* Lookup command. */
if ((info = valkey_lookup_cmd(arg0, arg0_len, arg1, arg1_len)) == NULL)
goto error; /* Command not found. */
r->type = info->type;

/* Arity check (negative arity means minimum num args) */
if ((info->arity >= 0 && (int)rnarg != info->arity) ||
Expand All @@ -251,10 +238,10 @@ void valkey_parse_cmd(struct cmd *r) {
/* Keyword-based first key position */
const char *keyword;
int startfrom;
if (r->type == CMD_REQ_VALKEY_XREAD) {
if (info->type == CMD_REQ_VALKEY_XREAD) {
keyword = "STREAMS";
startfrom = 1;
} else if (r->type == CMD_REQ_VALKEY_XREADGROUP) {
} else if (info->type == CMD_REQ_VALKEY_XREADGROUP) {
keyword = "STREAMS";
startfrom = 4;
} else {
Expand All @@ -274,8 +261,9 @@ void valkey_parse_cmd(struct cmd *r) {
/* Keyword found. Now the first key is the next arg. */
if ((p = valkey_parse_bulk(p, end, &arg, &arglen)) == NULL)
goto error;
if (!push_keypos(r, arg, arglen))
goto oom;
/* Keep found key. */
r->key.start = arg;
r->key.len = arglen;
goto done;
}
}
Expand Down Expand Up @@ -317,9 +305,9 @@ void valkey_parse_cmd(struct cmd *r) {
* from the end of the command line. This is not implemented. */
goto error;
}

if (!push_keypos(r, arg, arglen))
goto oom;
/* Keep found key. */
r->key.start = arg;
r->key.len = arglen;

done:
ASSERT(r->type > CMD_UNKNOWN && r->type < CMD_SENTINEL);
Expand All @@ -333,7 +321,8 @@ void valkey_parse_cmd(struct cmd *r) {
if (r->errstr == NULL) {
r->errstr = vk_malloc(errmaxlen);
if (r->errstr == NULL) {
goto oom;
r->result = CMD_PARSE_ENOMEM;
return;
}
}

Expand All @@ -343,17 +332,14 @@ void valkey_parse_cmd(struct cmd *r) {
else if (info != NULL)
snprintf(r->errstr, errmaxlen, "Failed to find keys of command %s",
info->name);
else if (r->type == CMD_UNKNOWN && arg0 != NULL && arg1 != NULL)
else if (info == NULL && arg0 != NULL && arg1 != NULL)
snprintf(r->errstr, errmaxlen, "Unknown command %.*s %.*s", arg0_len,
arg0, arg1_len, arg1);
else if (r->type == CMD_UNKNOWN && arg0 != NULL)
else if (info == NULL && arg0 != NULL)
snprintf(r->errstr, errmaxlen, "Unknown command %.*s", arg0_len, arg0);
else
snprintf(r->errstr, errmaxlen, "Command parse error");
return;

oom:
r->result = CMD_PARSE_ENOMEM;
}

struct cmd *command_get(void) {
Expand All @@ -363,27 +349,15 @@ struct cmd *command_get(void) {
return NULL;
}

command->id = ++cmd_id;
command->result = CMD_PARSE_OK;
command->errstr = NULL;
command->type = CMD_UNKNOWN;
command->cmd = NULL;
command->clen = 0;
command->keys = NULL;
command->narg = 0;
command->quit = 0;
command->noforward = 0;
command->key.start = NULL;
command->key.len = 0;
command->slot_num = -1;
command->frag_seq = NULL;
command->reply = NULL;
command->node_addr = NULL;

command->keys = vkarray_create(1, sizeof(struct keypos));
if (command->keys == NULL) {
vk_free(command);
return NULL;
}

return command;
}

Expand All @@ -402,19 +376,6 @@ void command_destroy(struct cmd *command) {
command->errstr = NULL;
}

if (command->keys != NULL) {
command->keys->nelem = 0;
vkarray_destroy(command->keys);
command->keys = NULL;
}

if (command->frag_seq != NULL) {
vk_free(command->frag_seq);
command->frag_seq = NULL;
}

freeReplyObject(command->reply);

if (command->node_addr != NULL) {
sdsfree(command->node_addr);
command->node_addr = NULL;
Expand Down
26 changes: 3 additions & 23 deletions src/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@

#include <stdint.h>

#include "adlist.h"
#include "valkey.h"

typedef enum cmd_parse_result {
CMD_PARSE_OK, /* parsing ok */
CMD_PARSE_ENOMEM, /* out of memory */
Expand All @@ -62,41 +59,24 @@ typedef enum cmd_type {
} cmd_type_t;

struct keypos {
char *start; /* key start pos */
char *end; /* key end pos */
uint32_t remain_len; /* remain length after keypos->end for more key-value
pairs in command, like mset */
char *start; /* key start pos */
uint32_t len; /* Length of key */
};

struct cmd {

uint64_t id; /* command id */

cmd_parse_result_t result; /* command parsing result */
char *errstr; /* error info when the command parse failed */

cmd_type_t type; /* command type */

char *cmd;
uint32_t clen; /* command length */

struct vkarray *keys; /* array of keypos, for req */

uint32_t narg; /* # arguments (valkey) */

unsigned quit : 1; /* quit request? */
unsigned noforward : 1; /* not need forward (example: ping) */
struct keypos key; /* First found key in command. */

/* Command destination */
int slot_num; /* Command should be sent to slot.
* Set to -1 if command is sent to a given node,
* or if a slot can not be found or calculated. */
char *node_addr; /* Command sent to this node address */

struct cmd *
*frag_seq; /* sequence of fragment command, map from keys to fragments*/

valkeyReply *reply;
};

void valkey_parse_cmd(struct cmd *r);
Expand Down
7 changes: 2 additions & 5 deletions src/valkeycluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
#include "command.h"
#include "dict.h"
#include "valkeycluster.h"
#include "vkarray.h"
#include "vkutil.h"
#include "win32.h"

Expand Down Expand Up @@ -2263,15 +2262,13 @@ static int prepareCommand(valkeyClusterContext *cc, struct cmd *command) {
valkeyClusterSetError(cc, VALKEY_ERR_PROTOCOL, command->errstr);
return VALKEY_ERR;
}
if (vkarray_n(command->keys) <= 0) {
if (command->key.len == 0) {
valkeyClusterSetError(
cc, VALKEY_ERR_OTHER,
"No keys in command(must have keys for valkey cluster mode)");
return VALKEY_ERR;
}

struct keypos *kp = vkarray_get(command->keys, 0);
command->slot_num = keyHashSlot(kp->start, kp->end - kp->start);
command->slot_num = keyHashSlot(command->key.start, command->key.len);
return VALKEY_OK;
}

Expand Down
Loading

0 comments on commit f6622ca

Please sign in to comment.