Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sharded pubsub command execution within multi/exec #13

Merged
merged 3 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1030,9 +1030,11 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
mc.cmd = cmd;
}

int is_pubsubshard = cmd->proc == ssubscribeCommand ||
cmd->proc == sunsubscribeCommand ||
cmd->proc == spublishCommand;
uint64_t cmd_flags = getCommandFlags(c);

/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
int pubsubshard_included = (cmd_flags & CMD_PUBSUB) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_PUBSUB));
zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved

/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
Expand Down Expand Up @@ -1109,7 +1111,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* node until the migration completes with CLUSTER SETSLOT <slot>
* NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
if ((migrating_slot || importing_slot) && !is_pubsubshard)
if ((migrating_slot || importing_slot) && !pubsubshard_included)
{
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
else existing_keys++;
Expand All @@ -1122,11 +1124,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* without redirections or errors in all the cases. */
if (n == NULL) return myself;

uint64_t cmd_flags = getCommandFlags(c);
/* Cluster is globally down but we got keys? We only serve the request
* if it is a read command and when allow_reads_when_down is enabled. */
if (!isClusterHealthy()) {
if (is_pubsubshard) {
if (pubsubshard_included) {
if (!server.cluster_allow_pubsubshard_when_down) {
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
Expand Down Expand Up @@ -1189,7 +1190,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* is serving, we can reply without redirection. */
int is_write_command = (cmd_flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) &&
!is_write_command &&
clusterNodeIsSlave(myself) &&
clusterNodeGetSlaveof(myself) == n)
Expand Down
56 changes: 56 additions & 0 deletions tests/unit/cluster/sharded-pubsub.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
start_cluster 1 1 {tags {external:skip cluster}} {
set primary_id 0
set replica1_id 1

set primary [Rn $primary_id]
set replica [Rn $replica1_id]

test "Sharded pubsub publish behavior within multi/exec" {
foreach {node} {primary replica} {
set node [set $node]
$node MULTI
$node SPUBLISH ch1 "hello"
$node EXEC
}
}

test "Sharded pubsub within multi/exec with cross slot operation" {
$primary MULTI
$primary SPUBLISH ch1 "hello"
$primary GET foo
catch {[$primary EXEC]} err
assert_match {CROSSSLOT*} $err
}

test "Sharded pubsub publish behavior within multi/exec with read operation on primary" {
$primary MULTI
$primary SPUBLISH foo "hello"
$primary GET foo
$primary EXEC
} {0 {}}

test "Sharded pubsub publish behavior within multi/exec with read operation on replica" {
$replica MULTI
$replica SPUBLISH foo "hello"
catch {[$replica GET foo]} err
assert_match {MOVED*} $err
catch {[$replica EXEC]} err
assert_match {EXECABORT*} $err
}

test "Sharded pubsub publish behavior within multi/exec with write operation on primary" {
$primary MULTI
$primary SPUBLISH foo "hello"
$primary SET foo bar
$primary EXEC
} {0 OK}

test "Sharded pubsub publish behavior within multi/exec with write operation on replica" {
$replica MULTI
$replica SPUBLISH foo "hello"
catch {[$replica SET foo bar]} err
assert_match {MOVED*} $err
catch {[$replica EXEC]} err
assert_match {EXECABORT*} $err
}
}
Loading