From 144a890de7342fe80537f25045df1205fbed995d Mon Sep 17 00:00:00 2001 From: Harkrishn Patro Date: Fri, 12 Jan 2024 20:33:37 +0000 Subject: [PATCH 1/3] Have consistent behavior of SPUBLISH within multi/exec like regular command --- src/cluster.c | 9 +++-- tests/unit/cluster/sharded-pubsub.tcl | 56 +++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 4 deletions(-) create mode 100644 tests/unit/cluster/sharded-pubsub.tcl diff --git a/src/cluster.c b/src/cluster.c index 90c1291de8..512533f5d0 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1030,9 +1030,11 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in mc.cmd = cmd; } - int is_pubsubshard = cmd->proc == ssubscribeCommand || - cmd->proc == sunsubscribeCommand || - cmd->proc == spublishCommand; + uint64_t cmd_flags = getCommandFlags(c); + + /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */ + int is_pubsubshard = (cmd_flags & CMD_PUBSUB) || + (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_PUBSUB)); /* Check that all the keys are in the same hash slot, and obtain this * slot and the node associated. */ @@ -1122,7 +1124,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * without redirections or errors in all the cases. */ if (n == NULL) return myself; - uint64_t cmd_flags = getCommandFlags(c); /* Cluster is globally down but we got keys? We only serve the request * if it is a read command and when allow_reads_when_down is enabled. */ if (!isClusterHealthy()) { diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl new file mode 100644 index 0000000000..657861df72 --- /dev/null +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -0,0 +1,56 @@ +start_cluster 1 1 {tags {external:skip cluster}} { + set primary_id 0 + set replica1_id 1 + + set primary [Rn $primary_id] + set replica [Rn $replica1_id] + + test "Sharded pubsub publish behavior within multi/exec" { + foreach {node} {primary replica} { + set node [set $node] + $node MULTI + $node SPUBLISH ch1 "hello" + $node EXEC + } + } + + test "Sharded pubsub within multi/exec with cross slot operation" { + $primary MULTI + $primary SPUBLISH ch1 "hello" + $primary GET foo + catch {[$primary EXEC]} err + assert_match {CROSSSLOT*} $err + } + + test "Sharded pubsub publish behavior within multi/exec with read operation on primary" { + $primary MULTI + $primary SPUBLISH foo "hello" + $primary GET foo + $primary EXEC + } {0 {}} + + test "Sharded pubsub publish behavior within multi/exec with read operation on replica" { + $replica MULTI + $replica SPUBLISH foo "hello" + catch {[$replica GET foo]} err + assert_match {MOVED*} $err + catch {[$replica EXEC]} err + assert_match {EXECABORT*} $err + } + + test "Sharded pubsub publish behavior within multi/exec with write operation on primary" { + $primary MULTI + $primary SPUBLISH foo "hello" + $primary SET foo bar + $primary EXEC + } {0 OK} + + test "Sharded pubsub publish behavior within multi/exec with write operation on replica" { + $replica MULTI + $replica SPUBLISH foo "hello" + catch {[$replica SET foo bar]} err + assert_match {MOVED*} $err + catch {[$replica EXEC]} err + assert_match {EXECABORT*} $err + } +} \ No newline at end of file From 192ad34829df3e60e4d58a746d84e11882bddd93 Mon Sep 17 00:00:00 2001 From: Harkrishn Patro Date: Fri, 19 Jan 2024 18:41:51 +0000 Subject: [PATCH 2/3] update variable naming --- src/cluster.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 512533f5d0..8b05aed611 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1033,7 +1033,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in uint64_t cmd_flags = getCommandFlags(c); /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */ - int is_pubsubshard = (cmd_flags & CMD_PUBSUB) || + int pubsubshard_included = (cmd_flags & CMD_PUBSUB) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_PUBSUB)); /* Check that all the keys are in the same hash slot, and obtain this @@ -1111,7 +1111,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * node until the migration completes with CLUSTER SETSLOT * NODE . */ int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE; - if ((migrating_slot || importing_slot) && !is_pubsubshard) + if ((migrating_slot || importing_slot) && !pubsubshard_included) { if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++; else existing_keys++; @@ -1127,7 +1127,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in /* Cluster is globally down but we got keys? We only serve the request * if it is a read command and when allow_reads_when_down is enabled. */ if (!isClusterHealthy()) { - if (is_pubsubshard) { + if (pubsubshard_included) { if (!server.cluster_allow_pubsubshard_when_down) { if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; return NULL; @@ -1190,7 +1190,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * is serving, we can reply without redirection. */ int is_write_command = (cmd_flags & CMD_WRITE) || (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); - if (((c->flags & CLIENT_READONLY) || is_pubsubshard) && + if (((c->flags & CLIENT_READONLY) || pubsubshard_included) && !is_write_command && clusterNodeIsSlave(myself) && clusterNodeGetSlaveof(myself) == n) From bb5d253a4107781d0464e5ea3b701ba31caef95b Mon Sep 17 00:00:00 2001 From: Harkrishn Patro Date: Fri, 22 Mar 2024 21:09:49 +0000 Subject: [PATCH 3/3] Fix spacing --- tests/unit/cluster/sharded-pubsub.tcl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/unit/cluster/sharded-pubsub.tcl b/tests/unit/cluster/sharded-pubsub.tcl index 657861df72..b5b19ff481 100644 --- a/tests/unit/cluster/sharded-pubsub.tcl +++ b/tests/unit/cluster/sharded-pubsub.tcl @@ -1,9 +1,9 @@ start_cluster 1 1 {tags {external:skip cluster}} { - set primary_id 0 - set replica1_id 1 + set primary_id 0 + set replica1_id 1 - set primary [Rn $primary_id] - set replica [Rn $replica1_id] + set primary [Rn $primary_id] + set replica [Rn $replica1_id] test "Sharded pubsub publish behavior within multi/exec" { foreach {node} {primary replica} {