diff --git a/include/valkey/cluster.h b/include/valkey/cluster.h index 1025bdde..29c32f32 100644 --- a/include/valkey/cluster.h +++ b/include/valkey/cluster.h @@ -152,6 +152,8 @@ typedef struct valkeyClusterNodeIterator { /* Enable parsing of replica nodes. Currently not used, but the * information is added to its primary node structure. */ #define VALKEY_OPT_USE_REPLICAS 0x2000 +/* Use a blocking slotmap update after an initial async connect. */ +#define VALKEY_OPT_BLOCKING_INITIAL_UPDATE 0x4000 typedef struct { const char *initial_nodes; @@ -191,8 +193,6 @@ valkeyClusterContext *valkeyClusterConnectWithOptions(const valkeyClusterOptions valkeyClusterContext *valkeyClusterConnect(const char *addrs); valkeyClusterContext *valkeyClusterConnectWithTimeout(const char *addrs, const struct timeval tv); -int valkeyClusterConnect2(valkeyClusterContext *cc); - void valkeyClusterFree(valkeyClusterContext *cc); /* Options configurable in runtime. */ diff --git a/src/cluster.c b/src/cluster.c index 6c55cf0a..6ed6f8f4 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -55,6 +55,7 @@ vk_static_assert(VALKEY_OPT_USE_CLUSTER_SLOTS > VALKEY_OPT_LAST_SA_OPTION); #define VALKEY_FLAG_USE_CLUSTER_SLOTS 0x1 #define VALKEY_FLAG_PARSE_REPLICAS 0x2 #define VALKEY_FLAG_DISCONNECTING 0x4 +#define VALKEY_FLAG_BLOCKING_INITIAL_UPDATE 0x8 // Cluster errors are offset by 100 to be sufficiently out of range of // standard Valkey errors @@ -1272,6 +1273,9 @@ static valkeyClusterContext *valkeyClusterContextInit(const valkeyClusterOptions if (options->options & VALKEY_OPT_USE_REPLICAS) { cc->flags |= VALKEY_FLAG_PARSE_REPLICAS; } + if (options->options & VALKEY_OPT_BLOCKING_INITIAL_UPDATE) { + cc->flags |= VALKEY_FLAG_BLOCKING_INITIAL_UPDATE; + } if (options->max_retry_count > 0) { cc->max_retry_count = options->max_retry_count; } @@ -1627,24 +1631,6 @@ int valkeyClusterSetOptionTimeout(valkeyClusterContext *cc, return VALKEY_OK; } -int valkeyClusterConnect2(valkeyClusterContext *cc) { - - if (cc == NULL) { - return VALKEY_ERR; - } - - if (dictSize(cc->nodes) == 0) { - valkeyClusterSetError(cc, VALKEY_ERR_OTHER, - "server address not configured"); - return VALKEY_ERR; - } - /* Clear a previously set shutdown flag since we allow a - * reconnection of an async context using this API (legacy). */ - cc->flags &= ~VALKEY_FLAG_DISCONNECTING; - - return valkeyClusterUpdateSlotmap(cc); -} - valkeyContext *valkeyClusterGetValkeyContext(valkeyClusterContext *cc, valkeyClusterNode *node) { valkeyContext *c = NULL; @@ -2840,10 +2826,7 @@ valkeyClusterAsyncContext *valkeyClusterAsyncConnectWithOptions(const valkeyClus return NULL; } - //TODO: valkeyClusterAsyncConnect(acc); - if (valkeyClusterUpdateSlotmap(acc->cc) != VALKEY_OK) { - valkeyClusterAsyncSetError(acc, acc->cc->err, acc->cc->errstr); - } + valkeyClusterAsyncConnect(acc); return acc; } @@ -2852,7 +2835,19 @@ int valkeyClusterAsyncConnect(valkeyClusterAsyncContext *acc) { if (acc->attach_fn == NULL) { return VALKEY_ERR; } - /* TODO: add options to use: valkeyClusterUpdateSlotmap(acc->cc); */ + + /* Clear a previously set shutdown flag to allow a + * reconnection of an async context using this API. */ + acc->cc->flags &= ~VALKEY_FLAG_DISCONNECTING; + + /* Blocking or non-blocking initial slotmap update. */ + if (acc->cc->flags & VALKEY_FLAG_BLOCKING_INITIAL_UPDATE) { + if (valkeyClusterUpdateSlotmap(acc->cc) != VALKEY_OK) { + valkeyClusterAsyncSetError(acc, acc->cc->err, acc->cc->errstr); + return VALKEY_ERR; + } + return VALKEY_OK; + } return updateSlotMapAsync(acc, NULL /*any node*/); } diff --git a/tests/clusterclient_async.c b/tests/clusterclient_async.c index 028bb20d..6b5d68fb 100644 --- a/tests/clusterclient_async.c +++ b/tests/clusterclient_async.c @@ -251,11 +251,12 @@ int main(int argc, char **argv) { valkeyClusterOptions options = {0}; options.initial_nodes = initnode; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.connect_timeout = &timeout; options.command_timeout = &timeout; options.max_retry_count = 1; if (use_cluster_slots) { - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options |= VALKEY_OPT_USE_CLUSTER_SLOTS; } if (show_events) { options.event_callback = eventCallback; diff --git a/tests/clusterclient_reconnect_async.c b/tests/clusterclient_reconnect_async.c index 0b123a6a..4ceda21e 100644 --- a/tests/clusterclient_reconnect_async.c +++ b/tests/clusterclient_reconnect_async.c @@ -27,16 +27,15 @@ void connectToValkey(valkeyClusterAsyncContext *acc) { /* reset context in case of reconnect */ valkeyClusterAsyncDisconnect(acc); - int status = valkeyClusterConnect2(acc->cc); - if (status == VALKEY_OK) { + if (valkeyClusterAsyncConnect(acc) == VALKEY_OK) { // cluster mode - } else if (acc->cc->err && - strcmp(acc->cc->errstr, VALKEY_ENOCLUSTER) == 0) { + } else if (acc->err && + strcmp(acc->errstr, VALKEY_ENOCLUSTER) == 0) { printf("[no cluster]\n"); - acc->cc->err = 0; - memset(acc->cc->errstr, '\0', strlen(acc->cc->errstr)); + acc->err = 0; + memset(acc->errstr, '\0', strlen(acc->errstr)); } else { - printf("Connect error: %s\n", acc->cc->errstr); + printf("Connect error: %s\n", acc->errstr); exit(-1); } } @@ -99,7 +98,8 @@ int main(int argc, char **argv) { valkeyClusterOptions options = {0}; options.initial_nodes = initnode; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base); valkeyClusterAsyncContext *acc = valkeyClusterAsyncContextInit(&options); diff --git a/tests/ct_async_glib.c b/tests/ct_async_glib.c index 96f01c7a..8ae8b326 100644 --- a/tests/ct_async_glib.c +++ b/tests/ct_async_glib.c @@ -43,6 +43,7 @@ int main(int argc, char **argv) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.onConnect = connectCallback; options.onDisconnect = disconnectCallback; VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_GLIB(&options, context); diff --git a/tests/ct_async_libev.c b/tests/ct_async_libev.c index 5dbfd3e4..2747afe2 100644 --- a/tests/ct_async_libev.c +++ b/tests/ct_async_libev.c @@ -37,6 +37,7 @@ int main(int argc, char **argv) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.onConnect = connectCallback; options.onDisconnect = disconnectCallback; VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEV(&options, EV_DEFAULT); diff --git a/tests/ct_async_libuv.c b/tests/ct_async_libuv.c index fcd71df0..fdce2d2a 100644 --- a/tests/ct_async_libuv.c +++ b/tests/ct_async_libuv.c @@ -40,6 +40,7 @@ int main(int argc, char **argv) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.onConnect = connectCallback; options.onDisconnect = disconnectCallback; VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBUV(&options, loop); diff --git a/tests/ct_connection.c b/tests/ct_connection.c index bdbfd7a9..2841038f 100644 --- a/tests/ct_connection.c +++ b/tests/ct_connection.c @@ -293,6 +293,7 @@ void test_async_password_ok(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.password = CLUSTER_PASSWORD; options.onConnect = callbackExpectOk; options.onDisconnect = callbackExpectOk; @@ -316,9 +317,13 @@ void test_async_password_ok(void) { /* Connect to a password protected cluster using the wrong password. An eventloop is not attached since it is not needed is this case. */ void test_async_password_wrong(void) { + struct event_base *base = event_base_new(); + valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.password = "faultypass"; + VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base); valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); assert(acc); @@ -336,15 +341,20 @@ void test_async_password_wrong(void) { assert(strcmp(acc->errstr, "slotmap not available") == 0); valkeyClusterAsyncFree(acc); + event_base_free(base); } /* Connect to a password protected cluster without providing a password. An eventloop is not attached since it is not needed is this case. */ void test_async_password_missing(void) { + struct event_base *base = event_base_new(); + valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.onConnect = callbackExpectOk; options.onDisconnect = callbackExpectOk; + VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base); // Password not configured valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); @@ -360,6 +370,7 @@ void test_async_password_missing(void) { assert(strcmp(acc->errstr, "slotmap not available") == 0); valkeyClusterAsyncFree(acc); + event_base_free(base); } // Connect to a cluster and authenticate using username and password @@ -371,6 +382,7 @@ void test_async_username_ok(void) { // Connect to the cluster using username and password valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.onConnect = callbackExpectOk; options.onDisconnect = callbackExpectOk; options.username = "missing-user"; @@ -411,6 +423,7 @@ void test_async_multicluster(void) { valkeyClusterOptions options1 = {0}; options1.initial_nodes = CLUSTER_NODE; + options1.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options1.onConnect = callbackExpectOk; options1.onDisconnect = callbackExpectOk; VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options1, base); @@ -421,6 +434,7 @@ void test_async_multicluster(void) { valkeyClusterOptions options2 = {0}; options2.initial_nodes = CLUSTER_NODE_WITH_PASSWORD; + options2.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options2.password = CLUSTER_PASSWORD; options2.onConnect = callbackExpectOk; options2.onDisconnect = callbackExpectOk; @@ -471,6 +485,7 @@ void test_async_connect_timeout(void) { valkeyClusterOptions options = {0}; /* Configure a non-routable IP address and a timeout */ options.initial_nodes = "192.168.0.0:7000"; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.connect_timeout = &timeout; VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base); @@ -492,6 +507,7 @@ void test_async_command_timeout(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.command_timeout = &timeout; VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base); diff --git a/tests/ct_out_of_memory_handling.c b/tests/ct_out_of_memory_handling.c index 3cdd365a..7470998a 100644 --- a/tests/ct_out_of_memory_handling.c +++ b/tests/ct_out_of_memory_handling.c @@ -450,7 +450,8 @@ void test_alloc_failure_handling_async(void) { options.initial_nodes = CLUSTER_NODE; options.onConnect = callbackExpectOk; options.onDisconnect = callbackExpectOk; - options.options = VALKEY_OPT_USE_REPLICAS; + options.options = VALKEY_OPT_USE_REPLICAS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base); // Connect diff --git a/tests/ct_pipeline.c b/tests/ct_pipeline.c index f45808a2..d908b989 100644 --- a/tests/ct_pipeline.c +++ b/tests/ct_pipeline.c @@ -92,6 +92,7 @@ void test_async_pipeline(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.onConnect = callbackExpectOk; options.onDisconnect = callbackExpectOk; VALKEY_CLUSTER_OPTIONS_SET_ADAPTER_LIBEVENT(&options, base); diff --git a/tests/ct_specific_nodes.c b/tests/ct_specific_nodes.c index 9f017cca..f58d4d70 100644 --- a/tests/ct_specific_nodes.c +++ b/tests/ct_specific_nodes.c @@ -318,7 +318,8 @@ void test_async_to_single_node(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.max_retry_count = 1; options.onConnect = callbackExpectOk; options.onDisconnect = callbackExpectOk; @@ -348,7 +349,8 @@ void test_async_formatted_to_single_node(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.max_retry_count = 1; options.onConnect = callbackExpectOk; options.onDisconnect = callbackExpectOk; @@ -379,7 +381,8 @@ void test_async_command_argv_to_single_node(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.max_retry_count = 1; options.onConnect = callbackExpectOk; options.onDisconnect = callbackExpectOk; @@ -410,7 +413,8 @@ void test_async_to_all_nodes(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.max_retry_count = 1; options.onConnect = callbackExpectOk; options.onDisconnect = callbackExpectOk; @@ -450,7 +454,8 @@ void test_async_transaction(void) { valkeyClusterOptions options = {0}; options.initial_nodes = CLUSTER_NODE; - options.options = VALKEY_OPT_USE_CLUSTER_SLOTS; + options.options = VALKEY_OPT_USE_CLUSTER_SLOTS | + VALKEY_OPT_BLOCKING_INITIAL_UPDATE; options.max_retry_count = 1; options.onConnect = callbackExpectOk; options.onDisconnect = callbackExpectOk;