Skip to content

Commit

Permalink
New option: Blocking slotmap updates after initial async connect
Browse files Browse the repository at this point in the history
Signed-off-by: Björn Svensson <bjorn.a.svensson@est.tech>
  • Loading branch information
bjosv committed Dec 4, 2024
1 parent 9007335 commit 8f0e516
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 40 deletions.
4 changes: 2 additions & 2 deletions include/valkey/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ typedef struct valkeyClusterNodeIterator {
/* Enable parsing of replica nodes. Currently not used, but the
* information is added to its primary node structure. */
#define VALKEY_OPT_USE_REPLICAS 0x2000
/* Use a blocking slotmap update after an initial async connect. */
#define VALKEY_OPT_BLOCKING_INITIAL_UPDATE 0x4000

typedef struct {
const char *initial_nodes;
Expand Down Expand Up @@ -191,8 +193,6 @@ valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions
valkeyClusterContext *valkeyClusterConnect(const char *addrs);
valkeyClusterContext *valkeyClusterConnectWithTimeout(const char *addrs,
const struct timeval tv);
int valkeyClusterConnect2(valkeyClusterContext *cc);

void valkeyClusterFree(valkeyClusterContext *cc);

/* Options configurable in runtime. */
Expand Down
41 changes: 18 additions & 23 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ vk_static_assert(VALKEY_OPT_USE_CLUSTER_SLOTS > VALKEY_OPT_LAST_SA_OPTION);
#define VALKEY_FLAG_USE_CLUSTER_SLOTS 0x1
#define VALKEY_FLAG_PARSE_REPLICAS 0x2
#define VALKEY_FLAG_DISCONNECTING 0x4
#define VALKEY_FLAG_BLOCKING_INITIAL_UPDATE 0x8

// Cluster errors are offset by 100 to be sufficiently out of range of
// standard Valkey errors
Expand Down Expand Up @@ -1272,6 +1273,9 @@ static valkeyClusterContext *valkeyClusterContextInit(const valkeyClusterOptions
if (options->options & VALKEY_OPT_USE_REPLICAS) {
cc->flags |= VALKEY_FLAG_PARSE_REPLICAS;
}
if (options->options & VALKEY_OPT_BLOCKING_INITIAL_UPDATE) {
cc->flags |= VALKEY_FLAG_BLOCKING_INITIAL_UPDATE;
}
if (options->max_retry_count > 0) {
cc->max_retry_count = options->max_retry_count;
}
Expand Down Expand Up @@ -1627,24 +1631,6 @@ int valkeyClusterSetOptionTimeout(valkeyClusterContext *cc,
return VALKEY_OK;
}

int valkeyClusterConnect2(valkeyClusterContext *cc) {

if (cc == NULL) {
return VALKEY_ERR;
}

if (dictSize(cc->nodes) == 0) {
valkeyClusterSetError(cc, VALKEY_ERR_OTHER,
"server address not configured");
return VALKEY_ERR;
}
/* Clear a previously set shutdown flag since we allow a
* reconnection of an async context using this API (legacy). */
cc->flags &= ~VALKEY_FLAG_DISCONNECTING;

return valkeyClusterUpdateSlotmap(cc);
}

valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc,
valkeyClusterNode *node) {
valkeyContext *c = NULL;
Expand Down Expand Up @@ -2840,10 +2826,7 @@ valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClus
return NULL;
}

//TODO: valkeyClusterAsyncConnect(acc);
if (valkeyClusterUpdateSlotmap(acc->cc) != VALKEY_OK) {
valkeyClusterAsyncSetError(acc, acc->cc->err, acc->cc->errstr);
}
valkeyClusterAsyncConnect(acc);
return acc;
}

Expand All @@ -2852,7 +2835,19 @@ int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc) {
if (acc->attach_fn == NULL) {
return VALKEY_ERR;
}
/* TODO: add options to use: valkeyClusterUpdateSlotmap(acc->cc); */

/* Clear a previously set shutdown flag to allow a
* reconnection of an async context using this API. */
acc->cc->flags &= ~VALKEY_FLAG_DISCONNECTING;

/* Blocking or non-blocking initial slotmap update. */
if (acc->cc->flags & VALKEY_FLAG_BLOCKING_INITIAL_UPDATE) {
if (valkeyClusterUpdateSlotmap(acc->cc) != VALKEY_OK) {
valkeyClusterAsyncSetError(acc, acc->cc->err, acc->cc->errstr);
return VALKEY_ERR;
}
return VALKEY_OK;
}
return updateSlotMapAsync(acc, NULL /*any node*/);
}

Expand Down
3 changes: 2 additions & 1 deletion tests/clusterclient_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,11 +251,12 @@ int main(int argc, char **argv) {

valkeyClusterOptions options = {0};
options.initial_nodes = initnode;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.connect_timeout = &timeout;
options.command_timeout = &timeout;
options.max_retry_count = 1;
if (use_cluster_slots) {
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options |= VALKEY_OPT_USE_CLUSTER_SLOTS;
}
if (show_events) {
options.event_callback = eventCallback;
Expand Down
16 changes: 8 additions & 8 deletions tests/clusterclient_reconnect_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@ void connectToValkey(valkeyClusterAsyncContext *acc) {
/* reset context in case of reconnect */
valkeyClusterAsyncDisconnect(acc);

int status = valkeyClusterConnect2(acc->cc);
if (status == VALKEY_OK) {
if (valkeyClusterAsyncConnect(acc) == VALKEY_OK) {
// cluster mode
} else if (acc->cc->err &&
strcmp(acc->cc->errstr, VALKEY_ENOCLUSTER) == 0) {
} else if (acc->err &&
strcmp(acc->errstr, VALKEY_ENOCLUSTER) == 0) {
printf("[no cluster]\n");
acc->cc->err = 0;
memset(acc->cc->errstr, '\0', strlen(acc->cc->errstr));
acc->err = 0;
memset(acc->errstr, '\0', strlen(acc->errstr));
} else {
printf("Connect error: %s\n", acc->cc->errstr);
printf("Connect error: %s\n", acc->errstr);
exit(-1);
}
}
Expand Down Expand Up @@ -99,7 +98,8 @@ int main(int argc, char **argv) {

valkeyClusterOptions options = {0};
options.initial_nodes = initnode;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base);

valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options);
Expand Down
1 change: 1 addition & 0 deletions tests/ct_async_glib.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ int main(int argc, char **argv) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.onConnect = connectCallback;
options.onDisconnect = disconnectCallback;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_GLIB(&options, context);
Expand Down
1 change: 1 addition & 0 deletions tests/ct_async_libev.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ int main(int argc, char **argv) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.onConnect = connectCallback;
options.onDisconnect = disconnectCallback;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEV(&options, EV_DEFAULT);
Expand Down
1 change: 1 addition & 0 deletions tests/ct_async_libuv.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ int main(int argc, char **argv) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.onConnect = connectCallback;
options.onDisconnect = disconnectCallback;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBUV(&options, loop);
Expand Down
16 changes: 16 additions & 0 deletions tests/ct_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ void test_async_password_ok(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.password = CLUSTER_PASSWORD;
options.onConnect = callbackExpectOk;
options.onDisconnect = callbackExpectOk;
Expand All @@ -316,9 +317,13 @@ void test_async_password_ok(void) {
/* Connect to a password protected cluster using the wrong password.
An eventloop is not attached since it is not needed is this case. */
void test_async_password_wrong(void) {
struct event_base *base = event_base_new();

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.password = "faultypass";
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base);

valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options);
assert(acc);
Expand All @@ -336,15 +341,20 @@ void test_async_password_wrong(void) {
assert(strcmp(acc->errstr, "slotmap not available") == 0);

valkeyClusterAsyncFree(acc);
event_base_free(base);
}

/* Connect to a password protected cluster without providing a password.
An eventloop is not attached since it is not needed is this case. */
void test_async_password_missing(void) {
struct event_base *base = event_base_new();

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.onConnect = callbackExpectOk;
options.onDisconnect = callbackExpectOk;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base);
// Password not configured

valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options);
Expand All @@ -360,6 +370,7 @@ void test_async_password_missing(void) {
assert(strcmp(acc->errstr, "slotmap not available") == 0);

valkeyClusterAsyncFree(acc);
event_base_free(base);
}

// Connect to a cluster and authenticate using username and password
Expand All @@ -371,6 +382,7 @@ void test_async_username_ok(void) {
// Connect to the cluster using username and password
valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.onConnect = callbackExpectOk;
options.onDisconnect = callbackExpectOk;
options.username = "missing-user";
Expand Down Expand Up @@ -411,6 +423,7 @@ void test_async_multicluster(void) {

valkeyClusterOptions options1 = {0};
options1.initial_nodes = CLUSTER_NODE;
options1.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options1.onConnect = callbackExpectOk;
options1.onDisconnect = callbackExpectOk;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options1, base);
Expand All @@ -421,6 +434,7 @@ void test_async_multicluster(void) {

valkeyClusterOptions options2 = {0};
options2.initial_nodes = CLUSTER_NODE_WITH_PASSWORD;
options2.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options2.password = CLUSTER_PASSWORD;
options2.onConnect = callbackExpectOk;
options2.onDisconnect = callbackExpectOk;
Expand Down Expand Up @@ -471,6 +485,7 @@ void test_async_connect_timeout(void) {
valkeyClusterOptions options = {0};
/* Configure a non-routable IP address and a timeout */
options.initial_nodes = "192.168.0.0:7000";
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.connect_timeout = &timeout;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base);

Expand All @@ -492,6 +507,7 @@ void test_async_command_timeout(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.command_timeout = &timeout;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base);

Expand Down
3 changes: 2 additions & 1 deletion tests/ct_out_of_memory_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ void test_alloc_failure_handling_async(void) {
options.initial_nodes = CLUSTER_NODE;
options.onConnect = callbackExpectOk;
options.onDisconnect = callbackExpectOk;
options.options = VALKEY_OPT_USE_REPLICAS;
options.options = VALKEY_OPT_USE_REPLICAS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base);

// Connect
Expand Down
1 change: 1 addition & 0 deletions tests/ct_pipeline.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ void test_async_pipeline(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.onConnect = callbackExpectOk;
options.onDisconnect = callbackExpectOk;
VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base);
Expand Down
15 changes: 10 additions & 5 deletions tests/ct_specific_nodes.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ void test_async_to_single_node(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.max_retry_count = 1;
options.onConnect = callbackExpectOk;
options.onDisconnect = callbackExpectOk;
Expand Down Expand Up @@ -348,7 +349,8 @@ void test_async_formatted_to_single_node(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.max_retry_count = 1;
options.onConnect = callbackExpectOk;
options.onDisconnect = callbackExpectOk;
Expand Down Expand Up @@ -379,7 +381,8 @@ void test_async_command_argv_to_single_node(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.max_retry_count = 1;
options.onConnect = callbackExpectOk;
options.onDisconnect = callbackExpectOk;
Expand Down Expand Up @@ -410,7 +413,8 @@ void test_async_to_all_nodes(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.max_retry_count = 1;
options.onConnect = callbackExpectOk;
options.onDisconnect = callbackExpectOk;
Expand Down Expand Up @@ -450,7 +454,8 @@ void test_async_transaction(void) {

valkeyClusterOptions options = {0};
options.initial_nodes = CLUSTER_NODE;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS;
options.options = VALKEY_OPT_USE_CLUSTER_SLOTS |
VALKEY_OPT_BLOCKING_INITIAL_UPDATE;
options.max_retry_count = 1;
options.onConnect = callbackExpectOk;
options.onDisconnect = callbackExpectOk;
Expand Down

0 comments on commit 8f0e516

Please sign in to comment.