From acc9ea8f918611c6d76d6979d8d44d56dce5e97e Mon Sep 17 00:00:00 2001 From: Ezra Chung <88335979+eramongodb@users.noreply.github.com> Date: Tue, 13 Feb 2024 12:43:05 -0600 Subject: [PATCH] CDRIVER-4099 Support mongos redirection during retryable operations (#1529) * Fix position of mock server port number info log message * Add tests for mongos deprioritization during retryable operations * Add support for mongos deprioritization during retryable operations --- src/libmongoc/CMakeLists.txt | 2 + .../src/mongoc/mongoc-bulk-operation.c | 2 +- .../src/mongoc/mongoc-change-stream.c | 8 +- .../src/mongoc/mongoc-client-session.c | 8 +- src/libmongoc/src/mongoc/mongoc-client.c | 74 +++- .../src/mongoc/mongoc-cluster-private.h | 3 + src/libmongoc/src/mongoc/mongoc-cluster.c | 25 +- src/libmongoc/src/mongoc/mongoc-collection.c | 30 +- src/libmongoc/src/mongoc/mongoc-cursor.c | 31 +- src/libmongoc/src/mongoc/mongoc-database.c | 1 + .../mongoc-deprioritized-servers-private.h | 51 +++ .../src/mongoc/mongoc-deprioritized-servers.c | 58 +++ .../mongoc-topology-description-private.h | 9 + .../src/mongoc/mongoc-topology-description.c | 165 ++++++-- .../src/mongoc/mongoc-topology-private.h | 3 + src/libmongoc/src/mongoc/mongoc-topology.c | 29 +- .../src/mongoc/mongoc-write-command.c | 24 +- src/libmongoc/tests/json-test.c | 5 +- src/libmongoc/tests/mock_server/mock-server.c | 3 +- .../tests/test-mongoc-client-session.c | 2 +- src/libmongoc/tests/test-mongoc-cluster.c | 48 ++- src/libmongoc/tests/test-mongoc-cyrus.c | 2 +- .../tests/test-mongoc-primary-stepdown.c | 60 +-- .../tests/test-mongoc-retryability-helpers.c | 67 +++ .../tests/test-mongoc-retryability-helpers.h | 25 ++ .../tests/test-mongoc-retryable-reads.c | 389 +++++++++++++++++- .../tests/test-mongoc-retryable-writes.c | 379 ++++++++++++++++- src/libmongoc/tests/test-mongoc-topology.c | 8 +- .../tests/test-mongoc-transactions.c | 2 +- .../tests/test-mongoc-write-commands.c | 8 +- 30 files changed, 1370 insertions(+), 151 deletions(-) create mode 100644 src/libmongoc/src/mongoc/mongoc-deprioritized-servers-private.h create mode 100644 src/libmongoc/src/mongoc/mongoc-deprioritized-servers.c create mode 100644 src/libmongoc/tests/test-mongoc-retryability-helpers.c create mode 100644 src/libmongoc/tests/test-mongoc-retryability-helpers.h diff --git a/src/libmongoc/CMakeLists.txt b/src/libmongoc/CMakeLists.txt index cf6e8e974a..f4e1c50195 100644 --- a/src/libmongoc/CMakeLists.txt +++ b/src/libmongoc/CMakeLists.txt @@ -566,6 +566,7 @@ set (SOURCES ${SOURCES} ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-cursor-array.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-database.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-error.c + ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-deprioritized-servers.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-flags.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-find-and-modify.c ${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-generation-map.c @@ -1039,6 +1040,7 @@ set (test-libmongoc-sources ${PROJECT_SOURCE_DIR}/tests/test-mongoc-read-concern.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-read-prefs.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-read-write-concern.c + ${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryability-helpers.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryable-reads.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryable-writes.c ${PROJECT_SOURCE_DIR}/tests/test-mongoc-sample-commands.c diff --git a/src/libmongoc/src/mongoc/mongoc-bulk-operation.c b/src/libmongoc/src/mongoc/mongoc-bulk-operation.c index a21c9ba12b..d5a4a67e0b 100644 --- a/src/libmongoc/src/mongoc/mongoc-bulk-operation.c +++ b/src/libmongoc/src/mongoc/mongoc-bulk-operation.c @@ -816,7 +816,7 @@ mongoc_bulk_operation_execute (mongoc_bulk_operation_t *bulk, /* IN */ error); } else { server_stream = mongoc_cluster_stream_for_writes ( - cluster, bulk->session, reply, error); + cluster, bulk->session, NULL, reply, error); } if (!server_stream) { diff --git a/src/libmongoc/src/mongoc/mongoc-change-stream.c b/src/libmongoc/src/mongoc/mongoc-change-stream.c index 16135d113b..15c568c7f3 100644 --- a/src/libmongoc/src/mongoc/mongoc-change-stream.c +++ b/src/libmongoc/src/mongoc/mongoc-change-stream.c @@ -288,8 +288,12 @@ _make_cursor (mongoc_change_stream_t *stream) goto cleanup; } - server_stream = mongoc_cluster_stream_for_reads ( - &stream->client->cluster, stream->read_prefs, cs, &reply, &stream->err); + server_stream = mongoc_cluster_stream_for_reads (&stream->client->cluster, + stream->read_prefs, + cs, + NULL, + &reply, + &stream->err); if (!server_stream) { bson_destroy (&stream->err_doc); bson_copy_to (&reply, &stream->err_doc); diff --git a/src/libmongoc/src/mongoc/mongoc-client-session.c b/src/libmongoc/src/mongoc/mongoc-client-session.c index f36701c10f..4293e59b65 100644 --- a/src/libmongoc/src/mongoc/mongoc-client-session.c +++ b/src/libmongoc/src/mongoc/mongoc-client-session.c @@ -1104,8 +1104,12 @@ mongoc_client_session_start_transaction (mongoc_client_session_t *session, BSON_ASSERT (session); ret = true; - server_stream = mongoc_cluster_stream_for_writes ( - &session->client->cluster, session, NULL /* reply */, error); + server_stream = + mongoc_cluster_stream_for_writes (&session->client->cluster, + session, + NULL /* deprioritized servers */, + NULL /* reply */, + error); if (!server_stream) { ret = false; GOTO (done); diff --git a/src/libmongoc/src/mongoc/mongoc-client.c b/src/libmongoc/src/mongoc/mongoc-client.c index 8db232afe4..7066d4e8da 100644 --- a/src/libmongoc/src/mongoc/mongoc-client.c +++ b/src/libmongoc/src/mongoc/mongoc-client.c @@ -1727,15 +1727,26 @@ _mongoc_client_retryable_write_command_with_stream ( _mongoc_write_error_get_type (reply) == MONGOC_WRITE_ERR_RETRY) { bson_error_t ignored_error; - /* each write command may be retried at most once */ + // The write command may be retried at most once. is_retryable = false; - if (retry_server_stream) { - mongoc_server_stream_cleanup (retry_server_stream); - } + { + mongoc_deprioritized_servers_t *const ds = + mongoc_deprioritized_servers_new (); - retry_server_stream = mongoc_cluster_stream_for_writes ( - &client->cluster, parts->assembled.session, NULL, &ignored_error); + mongoc_deprioritized_servers_add_if_sharded ( + ds, server_stream->topology_type, server_stream->sd); + + BSON_ASSERT (!retry_server_stream); + retry_server_stream = + mongoc_cluster_stream_for_writes (&client->cluster, + parts->assembled.session, + ds, + NULL, + &ignored_error); + + mongoc_deprioritized_servers_destroy (ds); + } if (retry_server_stream) { parts->assembled.server_stream = retry_server_stream; @@ -1820,16 +1831,29 @@ _mongoc_client_retryable_read_command_with_stream ( /* each read command may be retried at most once */ is_retryable = false; - if (retry_server_stream) { - mongoc_server_stream_cleanup (retry_server_stream); - } + { + mongoc_deprioritized_servers_t *const ds = + mongoc_deprioritized_servers_new (); - retry_server_stream = - mongoc_cluster_stream_for_reads (&client->cluster, - parts->read_prefs, - parts->assembled.session, - NULL, - &ignored_error); + if (retry_server_stream) { + mongoc_deprioritized_servers_add_if_sharded ( + ds, retry_server_stream->topology_type, retry_server_stream->sd); + mongoc_server_stream_cleanup (retry_server_stream); + } else { + mongoc_deprioritized_servers_add_if_sharded ( + ds, server_stream->topology_type, server_stream->sd); + } + + retry_server_stream = + mongoc_cluster_stream_for_reads (&client->cluster, + parts->read_prefs, + parts->assembled.session, + ds, + NULL, + &ignored_error); + + mongoc_deprioritized_servers_destroy (ds); + } if (retry_server_stream) { parts->assembled.server_stream = retry_server_stream; @@ -1918,8 +1942,8 @@ mongoc_client_command_simple (mongoc_client_t *client, * configuration. The generic command method SHOULD allow an optional read * preference argument." */ - server_stream = - mongoc_cluster_stream_for_reads (cluster, read_prefs, NULL, reply, error); + server_stream = mongoc_cluster_stream_for_reads ( + cluster, read_prefs, NULL, NULL, reply, error); if (server_stream) { ret = _mongoc_client_command_with_stream ( @@ -2074,10 +2098,10 @@ _mongoc_client_command_with_opts (mongoc_client_t *client, } } else if (parts.is_write_command) { server_stream = - mongoc_cluster_stream_for_writes (cluster, cs, reply_ptr, error); + mongoc_cluster_stream_for_writes (cluster, cs, NULL, reply_ptr, error); } else { - server_stream = - mongoc_cluster_stream_for_reads (cluster, prefs, cs, reply_ptr, error); + server_stream = mongoc_cluster_stream_for_reads ( + cluster, prefs, cs, NULL, reply_ptr, error); } if (!server_stream) { @@ -2622,6 +2646,7 @@ mongoc_client_kill_cursor (mongoc_client_t *client, int64_t cursor_id) MONGOC_SS_WRITE, read_prefs, NULL /* chosen read mode */, + NULL /* deprioritized servers */, topology->local_threshold_msec); if (selected_server) { @@ -3060,8 +3085,13 @@ _mongoc_client_end_sessions (mongoc_client_t *client) while (!mongoc_server_session_pool_is_empty (t->session_pool)) { prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY_PREFERRED); - server_id = mongoc_topology_select_server_id ( - t, MONGOC_SS_READ, prefs, NULL /* chosen read mode */, &error); + server_id = + mongoc_topology_select_server_id (t, + MONGOC_SS_READ, + prefs, + NULL /* chosen read mode */, + NULL /* deprioritized servers */, + &error); mongoc_read_prefs_destroy (prefs); if (!server_id) { diff --git a/src/libmongoc/src/mongoc/mongoc-cluster-private.h b/src/libmongoc/src/mongoc/mongoc-cluster-private.h index a9f3d1c885..7af22850aa 100644 --- a/src/libmongoc/src/mongoc/mongoc-cluster-private.h +++ b/src/libmongoc/src/mongoc/mongoc-cluster-private.h @@ -38,6 +38,7 @@ #include "mongoc-scram-private.h" #include "mongoc-cmd-private.h" #include "mongoc-crypto-private.h" +#include "mongoc-deprioritized-servers-private.h" BSON_BEGIN_DECLS @@ -121,6 +122,7 @@ mongoc_server_stream_t * mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster, const mongoc_read_prefs_t *read_prefs, mongoc_client_session_t *cs, + const mongoc_deprioritized_servers_t *ds, bson_t *reply, bson_error_t *error); @@ -138,6 +140,7 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster, mongoc_server_stream_t * mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster, mongoc_client_session_t *cs, + const mongoc_deprioritized_servers_t *ds, bson_t *reply, bson_error_t *error); diff --git a/src/libmongoc/src/mongoc/mongoc-cluster.c b/src/libmongoc/src/mongoc/mongoc-cluster.c index ef24ac81c6..343528bfce 100644 --- a/src/libmongoc/src/mongoc/mongoc-cluster.c +++ b/src/libmongoc/src/mongoc/mongoc-cluster.c @@ -2794,6 +2794,7 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs, mongoc_ss_optype_t optype, const mongoc_read_prefs_t *read_prefs, bool *must_use_primary, + const mongoc_deprioritized_servers_t *ds, bson_error_t *error) { BSON_ASSERT (cs || true); @@ -2808,14 +2809,14 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs, server_id = cs->server_id; if (!server_id) { server_id = mongoc_topology_select_server_id ( - topology, optype, read_prefs, must_use_primary, error); + topology, optype, read_prefs, must_use_primary, ds, error); if (server_id) { _mongoc_client_session_pin (cs, server_id); } } } else { server_id = mongoc_topology_select_server_id ( - topology, optype, read_prefs, must_use_primary, error); + topology, optype, read_prefs, must_use_primary, ds, error); /* Transactions Spec: Additionally, any non-transaction operation using a * pinned ClientSession MUST unpin the session and the operation MUST * perform normal server selection. */ @@ -2851,6 +2852,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster, const mongoc_read_prefs_t *read_prefs, mongoc_client_session_t *cs, bool is_retryable, + const mongoc_deprioritized_servers_t *ds, bson_t *reply, bson_error_t *error) { @@ -2870,7 +2872,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster, BSON_ASSERT (cluster); server_id = _mongoc_cluster_select_server_id ( - cs, topology, optype, read_prefs, &must_use_primary, error); + cs, topology, optype, read_prefs, &must_use_primary, ds, error); if (!server_id) { if (reply) { @@ -2883,7 +2885,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster, if (!mongoc_cluster_check_interval (cluster, server_id)) { /* Server Selection Spec: try once more */ server_id = _mongoc_cluster_select_server_id ( - cs, topology, optype, read_prefs, &must_use_primary, error); + cs, topology, optype, read_prefs, &must_use_primary, ds, error); if (!server_id) { if (reply) { @@ -2967,6 +2969,7 @@ mongoc_server_stream_t * mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster, const mongoc_read_prefs_t *read_prefs, mongoc_client_session_t *cs, + const mongoc_deprioritized_servers_t *ds, bson_t *reply, bson_error_t *error) { @@ -2979,13 +2982,20 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster, const bool is_retryable = mongoc_uri_get_option_as_bool ( cluster->uri, MONGOC_URI_RETRYREADS, MONGOC_DEFAULT_RETRYREADS); - return _mongoc_cluster_stream_for_optype ( - cluster, MONGOC_SS_READ, prefs_override, cs, is_retryable, reply, error); + return _mongoc_cluster_stream_for_optype (cluster, + MONGOC_SS_READ, + prefs_override, + cs, + is_retryable, + ds, + reply, + error); } mongoc_server_stream_t * mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster, mongoc_client_session_t *cs, + const mongoc_deprioritized_servers_t *ds, bson_t *reply, bson_error_t *error) { @@ -2993,7 +3003,7 @@ mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster, cluster->uri, MONGOC_URI_RETRYWRITES, MONGOC_DEFAULT_RETRYWRITES); return _mongoc_cluster_stream_for_optype ( - cluster, MONGOC_SS_WRITE, NULL, cs, is_retryable, reply, error); + cluster, MONGOC_SS_WRITE, NULL, cs, is_retryable, ds, reply, error); } mongoc_server_stream_t * @@ -3015,6 +3025,7 @@ mongoc_cluster_stream_for_aggr_with_write ( prefs_override, cs, is_retryable, + NULL, reply, error); } diff --git a/src/libmongoc/src/mongoc/mongoc-collection.c b/src/libmongoc/src/mongoc/mongoc-collection.c index 9862ec0464..5dc87be4ef 100644 --- a/src/libmongoc/src/mongoc/mongoc-collection.c +++ b/src/libmongoc/src/mongoc/mongoc-collection.c @@ -59,7 +59,7 @@ _mongoc_collection_write_command_execute ( ENTRY; server_stream = mongoc_cluster_stream_for_writes ( - &collection->client->cluster, cs, NULL, &result->error); + &collection->client->cluster, cs, NULL, NULL, &result->error); if (!server_stream) { /* result->error has been filled out */ @@ -97,6 +97,7 @@ _mongoc_collection_write_command_execute_idl ( server_stream = mongoc_cluster_stream_for_writes (&collection->client->cluster, crud->client_session, + NULL, &reply, &result->error); @@ -815,7 +816,7 @@ mongoc_collection_estimated_document_count ( BSON_ASSERT_PARAM (coll); server_stream = mongoc_cluster_stream_for_reads ( - &coll->client->cluster, read_prefs, NULL, reply, error); + &coll->client->cluster, read_prefs, NULL, NULL, reply, error); if (opts && bson_has_field (opts, "sessionId")) { bson_set_error (error, @@ -1625,7 +1626,7 @@ mongoc_collection_create_index_with_opts (mongoc_collection_t *collection, bson_append_array_builder_end (&cmd, ar); server_stream = mongoc_cluster_stream_for_writes ( - &collection->client->cluster, parsed.client_session, reply, error); + &collection->client->cluster, parsed.client_session, NULL, reply, error); if (!server_stream) { reply_initialized = true; @@ -2245,6 +2246,7 @@ _mongoc_collection_update_or_replace (mongoc_collection_t *collection, server_stream = mongoc_cluster_stream_for_writes (&collection->client->cluster, update_opts->crud.client_session, + NULL, reply, error); @@ -3396,7 +3398,7 @@ mongoc_collection_find_and_modify_with_opts ( } server_stream = mongoc_cluster_stream_for_writes ( - cluster, appended_opts.client_session, &ss_reply, error); + cluster, appended_opts.client_session, NULL, &ss_reply, error); if (!server_stream) { bson_concat (reply_ptr, &ss_reply); @@ -3566,8 +3568,23 @@ mongoc_collection_find_and_modify_with_opts ( /* each write command may be retried at most once */ is_retryable = false; - retry_server_stream = mongoc_cluster_stream_for_writes ( - cluster, parts.assembled.session, NULL /* reply */, &ignored_error); + + { + mongoc_deprioritized_servers_t *const ds = + mongoc_deprioritized_servers_new (); + + mongoc_deprioritized_servers_add_if_sharded ( + ds, server_stream->topology_type, server_stream->sd); + + retry_server_stream = + mongoc_cluster_stream_for_writes (cluster, + parts.assembled.session, + ds, + NULL /* reply */, + &ignored_error); + + mongoc_deprioritized_servers_destroy (ds); + } if (retry_server_stream) { parts.assembled.server_stream = retry_server_stream; @@ -3782,6 +3799,7 @@ mongoc_collection_create_indexes_with_opts (mongoc_collection_t *collection, server_stream = mongoc_cluster_stream_for_writes (&collection->client->cluster, NULL /* mongoc_client_session_t */, + NULL /* deprioritized servers */, reply_ptr, error); if (server_stream->sd->max_wire_version < WIRE_VERSION_4_4) { diff --git a/src/libmongoc/src/mongoc/mongoc-cursor.c b/src/libmongoc/src/mongoc/mongoc-cursor.c index fc0d311843..8e39e3bc1d 100644 --- a/src/libmongoc/src/mongoc/mongoc-cursor.c +++ b/src/libmongoc/src/mongoc/mongoc-cursor.c @@ -666,6 +666,7 @@ _mongoc_cursor_fetch_stream (mongoc_cursor_t *cursor) : mongoc_cluster_stream_for_reads (&cursor->client->cluster, cursor->read_prefs, cursor->client_session, + NULL, &reply, &cursor->error); @@ -1098,16 +1099,28 @@ _mongoc_cursor_run_command (mongoc_cursor_t *cursor, MONGOC_READ_ERR_RETRY) { is_retryable = false; - mongoc_server_stream_cleanup (server_stream); + { + mongoc_deprioritized_servers_t *const ds = + mongoc_deprioritized_servers_new (); + + mongoc_deprioritized_servers_add_if_sharded ( + ds, server_stream->topology_type, server_stream->sd); + + mongoc_server_stream_cleanup (server_stream); - BSON_ASSERT (!cursor->is_aggr_with_write_stage && - "Cannot attempt a retry on an aggregate operation that " - "contains write stages"); - server_stream = mongoc_cluster_stream_for_reads (&cursor->client->cluster, - cursor->read_prefs, - cursor->client_session, - reply, - &cursor->error); + BSON_ASSERT (!cursor->is_aggr_with_write_stage && + "Cannot attempt a retry on an aggregate operation that " + "contains write stages"); + server_stream = + mongoc_cluster_stream_for_reads (&cursor->client->cluster, + cursor->read_prefs, + cursor->client_session, + ds, + reply, + &cursor->error); + + mongoc_deprioritized_servers_destroy (ds); + } if (server_stream) { cursor->server_id = server_stream->sd->id; diff --git a/src/libmongoc/src/mongoc/mongoc-database.c b/src/libmongoc/src/mongoc/mongoc-database.c index 015d70d92d..46fc2d6204 100644 --- a/src/libmongoc/src/mongoc/mongoc-database.c +++ b/src/libmongoc/src/mongoc/mongoc-database.c @@ -1101,6 +1101,7 @@ create_collection_with_encryptedFields (mongoc_database_t *database, mongoc_server_stream_t *stream = mongoc_cluster_stream_for_writes (&database->client->cluster, NULL /* client session */, + NULL /* deprioritized servers */, NULL /* reply */, error); if (!stream) { diff --git a/src/libmongoc/src/mongoc/mongoc-deprioritized-servers-private.h b/src/libmongoc/src/mongoc/mongoc-deprioritized-servers-private.h new file mode 100644 index 0000000000..6e9b0333d0 --- /dev/null +++ b/src/libmongoc/src/mongoc/mongoc-deprioritized-servers-private.h @@ -0,0 +1,51 @@ +/* + * Copyright 2024 MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "mongoc-prelude.h" + +#ifndef MONGOC_DEPRIORITIZED_SERVERS_PRIVATE_H +#define MONGOC_DEPRIORITIZED_SERVERS_PRIVATE_H + +#include + +#include "mongoc-server-description.h" + +#include + +BSON_BEGIN_DECLS + +typedef struct _mongoc_deprioritized_servers_t mongoc_deprioritized_servers_t; + +mongoc_deprioritized_servers_t * +mongoc_deprioritized_servers_new (void); + +void +mongoc_deprioritized_servers_destroy ( + mongoc_deprioritized_servers_t *ds); + +void +mongoc_deprioritized_servers_add ( + mongoc_deprioritized_servers_t *ds, + const mongoc_server_description_t *sd); + +bool +mongoc_deprioritized_servers_contains ( + const mongoc_deprioritized_servers_t *ds, + const mongoc_server_description_t *sd); + +BSON_END_DECLS + +#endif // MONGOC_DEPRIORITIZED_SERVERS_PRIVATE_H diff --git a/src/libmongoc/src/mongoc/mongoc-deprioritized-servers.c b/src/libmongoc/src/mongoc/mongoc-deprioritized-servers.c new file mode 100644 index 0000000000..71369f595a --- /dev/null +++ b/src/libmongoc/src/mongoc/mongoc-deprioritized-servers.c @@ -0,0 +1,58 @@ +#include "mongoc-deprioritized-servers-private.h" + +#include "mongoc-set-private.h" + +// Dedicated non-zero value to avoid confusing "key is present with a NULL item" +// from "key is not present" (also NULL). +#define MONGOC_DEPRIORITIZED_SERVERS_ITEM_VALUE ((void *) 1) + +struct _mongoc_deprioritized_servers_t { + // Use server ID (uint32_t) as keys to identify deprioritized servers. + mongoc_set_t *ids; +}; + +mongoc_deprioritized_servers_t * +mongoc_deprioritized_servers_new (void) +{ + mongoc_deprioritized_servers_t *const ret = bson_malloc (sizeof (*ret)); + + *ret = (mongoc_deprioritized_servers_t){ + .ids = mongoc_set_new (1u, NULL, NULL), + }; + + return ret; +} + +void +mongoc_deprioritized_servers_destroy (mongoc_deprioritized_servers_t *ds) +{ + if (!ds) { + return; + } + + mongoc_set_destroy (ds->ids); + bson_free (ds); +} + +void +mongoc_deprioritized_servers_add (mongoc_deprioritized_servers_t *ds, + const mongoc_server_description_t *sd) +{ + BSON_ASSERT_PARAM (ds); + BSON_ASSERT_PARAM (sd); + + mongoc_set_add (ds->ids, + mongoc_server_description_id (sd), + MONGOC_DEPRIORITIZED_SERVERS_ITEM_VALUE); +} + +bool +mongoc_deprioritized_servers_contains (const mongoc_deprioritized_servers_t *ds, + const mongoc_server_description_t *sd) +{ + BSON_ASSERT_PARAM (ds); + BSON_ASSERT_PARAM (sd); + + return mongoc_set_get_const (ds->ids, mongoc_server_description_id (sd)) == + MONGOC_DEPRIORITIZED_SERVERS_ITEM_VALUE; +} diff --git a/src/libmongoc/src/mongoc/mongoc-topology-description-private.h b/src/libmongoc/src/mongoc/mongoc-topology-description-private.h index 4e8f49ad16..edafc7695e 100644 --- a/src/libmongoc/src/mongoc/mongoc-topology-description-private.h +++ b/src/libmongoc/src/mongoc/mongoc-topology-description-private.h @@ -24,6 +24,7 @@ #include "mongoc-array-private.h" #include "mongoc-topology-description.h" #include "mongoc-apm-private.h" +#include "mongoc-deprioritized-servers-private.h" typedef enum { @@ -114,6 +115,7 @@ mongoc_topology_description_select ( mongoc_ss_optype_t optype, const mongoc_read_prefs_t *read_pref, bool *must_use_primary, + const mongoc_deprioritized_servers_t *ds, int64_t local_threshold_ms); mongoc_server_description_t * @@ -149,6 +151,7 @@ mongoc_topology_description_suitable_servers ( const mongoc_topology_description_t *topology, const mongoc_read_prefs_t *read_pref, bool *must_use_primary, + const mongoc_deprioritized_servers_t *ds, int64_t local_threshold_ms); bool @@ -194,4 +197,10 @@ _mongoc_topology_description_clear_connection_pool ( uint32_t server_id, const bson_oid_t *service_id); +void +mongoc_deprioritized_servers_add_if_sharded ( + mongoc_deprioritized_servers_t *ds, + mongoc_topology_description_type_t topology_type, + const mongoc_server_description_t *sd); + #endif /* MONGOC_TOPOLOGY_DESCRIPTION_PRIVATE_H */ diff --git a/src/libmongoc/src/mongoc/mongoc-topology-description.c b/src/libmongoc/src/mongoc/mongoc-topology-description.c index b7b35a12c0..fb885d6940 100644 --- a/src/libmongoc/src/mongoc/mongoc-topology-description.c +++ b/src/libmongoc/src/mongoc/mongoc-topology-description.c @@ -514,6 +514,7 @@ _mongoc_try_mode_secondary (mongoc_array_t *set, /* OUT */ const mongoc_topology_description_t *topology, const mongoc_read_prefs_t *read_pref, bool *must_use_primary, + const mongoc_deprioritized_servers_t *ds, int64_t local_threshold_ms) { mongoc_read_prefs_t *secondary; @@ -526,26 +527,101 @@ _mongoc_try_mode_secondary (mongoc_array_t *set, /* OUT */ topology, secondary, must_use_primary, + ds, local_threshold_ms); mongoc_read_prefs_destroy (secondary); } -/* if any mongos are candidates, add them to the candidates array */ static bool -_mongoc_find_suitable_mongos_cb (const void *item, void *ctx) +_mongoc_td_servers_to_candidates_array (const void *item, void *ctx) { - const mongoc_server_description_t *server = item; - mongoc_suitable_data_t *data = (mongoc_suitable_data_t *) ctx; + BSON_ASSERT_PARAM (item); + BSON_ASSERT_PARAM (ctx); + + const mongoc_server_description_t *const server = item; + mongoc_suitable_data_t *const data = (mongoc_suitable_data_t *) ctx; + + data->candidates[data->candidates_len++] = server; - if (_mongoc_topology_description_server_is_candidate ( - server->type, data->read_mode, data->topology_type)) { - data->candidates[data->candidates_len++] = server; - } return true; } +// Server Selection Spec: If a list of deprioritized servers is provided, and +// the topology is a sharded cluster, these servers should be selected only if +// there are no other suitable servers. The server selection algorithm MUST +// ignore the deprioritized servers if the topology is not a sharded cluster. +static void +_mongoc_filter_deprioritized_servers (mongoc_suitable_data_t *data, + const mongoc_deprioritized_servers_t *ds) +{ + BSON_ASSERT_PARAM (data); + BSON_ASSERT_PARAM (ds); + + TRACE ("%s", "deprioritization: filtering list of candidates"); + + mongoc_array_t filtered_servers; + _mongoc_array_init (&filtered_servers, + sizeof (const mongoc_server_description_t *)); + + for (size_t idx = 0u; idx < data->candidates_len; ++idx) { + mongoc_server_description_t const *const sd = data->candidates[idx]; + + if (!mongoc_deprioritized_servers_contains (ds, sd)) { + TRACE ("deprioritization: - kept: %s (id: %" PRIu32 ")", + sd->host.host_and_port, + sd->id); + _mongoc_array_append_val (&filtered_servers, sd); + } else { + TRACE ("deprioritization: - removed: %s (id: %" PRIu32 ")", + sd->host.host_and_port, + sd->id); + } + } + + if (filtered_servers.len == 0u) { + TRACE ("%s", + "deprioritization: reverted due to no other suitable servers"); + _mongoc_array_destroy (&filtered_servers); + } else if (filtered_servers.len == data->candidates_len) { + TRACE ("%s", "deprioritization: none found in list of candidates"); + _mongoc_array_destroy (&filtered_servers); + } else { + TRACE ("%s", "deprioritization: using filtered list of candidates"); + data->candidates_len = filtered_servers.len; + // `(void*)`: avoid MSVC error C4090: + // 'function': different 'const' qualifiers + memmove ((void *) data->candidates, + filtered_servers.data, + filtered_servers.len * filtered_servers.element_size); + _mongoc_array_destroy (&filtered_servers); + } +} + + +// Keep only suitable mongoses in the candidates array. +static void +_mongoc_filter_suitable_mongos (mongoc_suitable_data_t *data) +{ + size_t idx = 0u; + + while (idx < data->candidates_len) { + if (_mongoc_topology_description_server_is_candidate ( + data->candidates[idx]->type, + data->read_mode, + data->topology_type)) { + // All candidates in the latency window are suitable. + ++idx; + } else { + // Remove from list using swap+pop. + // Order doesn't matter; the list will be randomized in + // mongoc_topology_description_select prior to server selection. + data->candidates[idx] = data->candidates[--data->candidates_len]; + } + } +} + /* *------------------------------------------------------------------------- @@ -765,22 +841,23 @@ mongoc_topology_description_suitable_servers ( const mongoc_topology_description_t *topology, const mongoc_read_prefs_t *read_pref, bool *must_use_primary, + const mongoc_deprioritized_servers_t *ds, int64_t local_threshold_ms) { - mongoc_suitable_data_t data; - const mongoc_set_t *td_servers = mc_tpld_servers_const (topology); const mongoc_read_mode_t given_read_mode = mongoc_read_prefs_get_mode (read_pref); const bool override_use_primary = _must_use_primary (topology, optype, given_read_mode); - data.primary = NULL; - data.topology_type = topology->type; - data.has_secondary = false; - data.candidates_len = 0; - data.candidates = bson_malloc0 (sizeof (mongoc_server_description_t *) * - td_servers->items_len); + mongoc_suitable_data_t data = { + .primary = NULL, + .topology_type = topology->type, + .has_secondary = false, + .candidates_len = 0, + .candidates = bson_malloc0 (sizeof (mongoc_server_description_t *) * + td_servers->items_len), + }; /* The "effective" read mode is the read mode that we should behave for, and * depends on the user's provided read mode, the type of operation that the @@ -843,8 +920,12 @@ mongoc_topology_description_suitable_servers ( if (data.read_mode == MONGOC_READ_SECONDARY_PREFERRED) { /* try read_mode SECONDARY */ - _mongoc_try_mode_secondary ( - set, topology, read_pref, must_use_primary, local_threshold_ms); + _mongoc_try_mode_secondary (set, + topology, + read_pref, + must_use_primary, + NULL, + local_threshold_ms); /* otherwise fall back to primary */ if (!set->len && data.primary) { @@ -894,11 +975,16 @@ mongoc_topology_description_suitable_servers ( } } - /* Sharded clusters -- - * All candidates in the latency window are suitable */ + // Sharded clusters -- if (topology->type == MONGOC_TOPOLOGY_SHARDED) { mongoc_set_for_each_const ( - td_servers, _mongoc_find_suitable_mongos_cb, &data); + td_servers, _mongoc_td_servers_to_candidates_array, &data); + + if (ds) { + _mongoc_filter_deprioritized_servers (&data, ds); + } + + _mongoc_filter_suitable_mongos (&data); } /* Load balanced clusters -- @@ -998,16 +1084,16 @@ mongoc_topology_description_select ( mongoc_ss_optype_t optype, const mongoc_read_prefs_t *read_pref, bool *must_use_primary, + const mongoc_deprioritized_servers_t *ds, int64_t local_threshold_ms) { mongoc_array_t suitable_servers; - mongoc_server_description_t const *sd = NULL; - int rand_n; ENTRY; if (topology->type == MONGOC_TOPOLOGY_SINGLE) { - sd = mongoc_set_get_item_const (mc_tpld_servers_const (topology), 0); + mongoc_server_description_t const *const sd = + mongoc_set_get_item_const (mc_tpld_servers_const (topology), 0); if (optype == MONGOC_SS_AGGREGATE_WITH_WRITE && sd->max_wire_version < WIRE_VERSION_5_0) { @@ -1035,12 +1121,17 @@ mongoc_topology_description_select ( topology, read_pref, must_use_primary, + ds, local_threshold_ms); + + mongoc_server_description_t const *sd = NULL; + if (suitable_servers.len != 0) { - rand_n = _mongoc_rand_simple ((unsigned *) &topology->rand_seed); + const int rand_n = + _mongoc_rand_simple ((unsigned *) &topology->rand_seed); sd = _mongoc_array_index (&suitable_servers, mongoc_server_description_t *, - rand_n % suitable_servers.len); + (size_t) rand_n % suitable_servers.len); } _mongoc_array_destroy (&suitable_servers); @@ -2328,7 +2419,7 @@ mongoc_topology_description_has_readable_server ( /* local threshold argument doesn't matter */ return mongoc_topology_description_select ( - td, MONGOC_SS_READ, prefs, NULL, 0) != NULL; + td, MONGOC_SS_READ, prefs, NULL, NULL, 0) != NULL; } /* @@ -2355,7 +2446,7 @@ mongoc_topology_description_has_writable_server ( } return mongoc_topology_description_select ( - td, MONGOC_SS_WRITE, NULL, NULL, 0) != NULL; + td, MONGOC_SS_WRITE, NULL, NULL, NULL, 0) != NULL; } /* @@ -2602,3 +2693,21 @@ _mongoc_topology_description_clear_connection_pool ( mc_tpl_sd_increment_generation (sd, service_id); } + + +void +mongoc_deprioritized_servers_add_if_sharded ( + mongoc_deprioritized_servers_t *ds, + mongoc_topology_description_type_t topology_type, + const mongoc_server_description_t *sd) +{ + // In a sharded cluster, the server on which the operation failed MUST + // be provided to the server selection mechanism as a deprioritized + // server. + if (topology_type == MONGOC_TOPOLOGY_SHARDED) { + TRACE ("deprioritization: add to list: %s (id: %" PRIu32 ")", + sd->host.host_and_port, + sd->id); + mongoc_deprioritized_servers_add (ds, sd); + } +} diff --git a/src/libmongoc/src/mongoc/mongoc-topology-private.h b/src/libmongoc/src/mongoc/mongoc-topology-private.h index 6ccadfaaad..2681001a95 100644 --- a/src/libmongoc/src/mongoc/mongoc-topology-private.h +++ b/src/libmongoc/src/mongoc/mongoc-topology-private.h @@ -284,6 +284,8 @@ mongoc_topology_select (mongoc_topology_t *topology, * @param must_use_primary An optional output parameter. Server selection might * need to override the caller's read preferences' read mode to 'primary'. * Whether or not that takes place will be set through this pointer. + * @param ds A list of servers that should be selected only if there are no + * other suitable servers. * @param error An output parameter for any error information. * @return uint32_t A non-zero integer ID of the server description. In case of * error, sets `error` and returns zero. @@ -295,6 +297,7 @@ mongoc_topology_select_server_id (mongoc_topology_t *topology, mongoc_ss_optype_t optype, const mongoc_read_prefs_t *read_prefs, bool *must_use_primary, + const mongoc_deprioritized_servers_t *ds, bson_error_t *error); /** diff --git a/src/libmongoc/src/mongoc/mongoc-topology.c b/src/libmongoc/src/mongoc/mongoc-topology.c index 8763b7ce63..6b7dc1c376 100644 --- a/src/libmongoc/src/mongoc/mongoc-topology.c +++ b/src/libmongoc/src/mongoc/mongoc-topology.c @@ -1053,7 +1053,7 @@ mongoc_topology_select (mongoc_topology_t *topology, bson_error_t *error) { uint32_t server_id = mongoc_topology_select_server_id ( - topology, optype, read_prefs, must_use_primary, error); + topology, optype, read_prefs, must_use_primary, NULL, error); if (server_id) { /* new copy of the server description */ @@ -1098,6 +1098,7 @@ _mongoc_topology_select_server_id_loadbalanced (mongoc_topology_t *topology, MONGOC_SS_WRITE, NULL /* read prefs */, NULL /* chosen read mode */, + NULL /* deprioritized servers */, 0 /* local threshold */); if (!selected_server) { @@ -1166,6 +1167,7 @@ mongoc_topology_select_server_id (mongoc_topology_t *topology, mongoc_ss_optype_t optype, const mongoc_read_prefs_t *read_prefs, bool *must_use_primary, + const mongoc_deprioritized_servers_t *ds, bson_error_t *error) { static const char *timeout_msg = @@ -1281,8 +1283,13 @@ mongoc_topology_select_server_id (mongoc_topology_t *topology, goto done; } - selected_server = mongoc_topology_description_select ( - td.ptr, optype, read_prefs, must_use_primary, local_threshold_ms); + selected_server = + mongoc_topology_description_select (td.ptr, + optype, + read_prefs, + must_use_primary, + ds, + local_threshold_ms); if (selected_server) { server_id = selected_server->id; @@ -1328,7 +1335,7 @@ mongoc_topology_select_server_id (mongoc_topology_t *topology, } selected_server = mongoc_topology_description_select ( - td.ptr, optype, read_prefs, must_use_primary, local_threshold_ms); + td.ptr, optype, read_prefs, must_use_primary, ds, local_threshold_ms); if (selected_server) { server_id = selected_server->id; @@ -1344,7 +1351,7 @@ mongoc_topology_select_server_id (mongoc_topology_t *topology, * occurred while we were waiting on the lock. */ mc_tpld_renew_ref (&td, topology); selected_server = mongoc_topology_description_select ( - td.ptr, optype, read_prefs, must_use_primary, local_threshold_ms); + td.ptr, optype, read_prefs, must_use_primary, ds, local_threshold_ms); if (selected_server) { server_id = selected_server->id; bson_mutex_unlock (&topology->tpld_modification_mtx); @@ -1613,11 +1620,13 @@ _mongoc_topology_pop_server_session (mongoc_topology_t *topology, if (!loadbalanced && timeout == MONGOC_NO_SESSIONS) { /* if needed, connect and check for session timeout again */ if (!mongoc_topology_description_has_data_node (td.ptr)) { - if (!mongoc_topology_select_server_id (topology, - MONGOC_SS_READ, - NULL /* read prefs */, - NULL /* chosen read mode */, - error)) { + if (!mongoc_topology_select_server_id ( + topology, + MONGOC_SS_READ, + NULL /* read prefs */, + NULL /* chosen read mode */, + NULL /* deprioritized servers */, + error)) { ss = NULL; goto done; } diff --git a/src/libmongoc/src/mongoc/mongoc-write-command.c b/src/libmongoc/src/mongoc/mongoc-write-command.c index 413b78df14..83c71179cf 100644 --- a/src/libmongoc/src/mongoc/mongoc-write-command.c +++ b/src/libmongoc/src/mongoc/mongoc-write-command.c @@ -772,12 +772,26 @@ _mongoc_write_opmsg (mongoc_write_command_t *command, /* each write command may be retried at most once */ is_retryable = false; - if (retry_server_stream) { - mongoc_server_stream_cleanup (retry_server_stream); - } + { + mongoc_deprioritized_servers_t *const ds = + mongoc_deprioritized_servers_new (); + + if (retry_server_stream) { + mongoc_deprioritized_servers_add_if_sharded ( + ds, + retry_server_stream->topology_type, + retry_server_stream->sd); + mongoc_server_stream_cleanup (retry_server_stream); + } else { + mongoc_deprioritized_servers_add_if_sharded ( + ds, server_stream->topology_type, server_stream->sd); + } + + retry_server_stream = mongoc_cluster_stream_for_writes ( + &client->cluster, cs, ds, NULL, &ignored_error); - retry_server_stream = mongoc_cluster_stream_for_writes ( - &client->cluster, cs, NULL, &ignored_error); + mongoc_deprioritized_servers_destroy (ds); + } if (retry_server_stream) { parts.assembled.server_stream = retry_server_stream; diff --git a/src/libmongoc/tests/json-test.c b/src/libmongoc/tests/json-test.c index 9fd63b53cd..497f819560 100644 --- a/src/libmongoc/tests/json-test.c +++ b/src/libmongoc/tests/json-test.c @@ -501,6 +501,7 @@ test_server_selection_logic_cb (bson_t *test) &topology, read_prefs, NULL, + NULL, MONGOC_TOPOLOGY_LOCAL_THRESHOLD_MS); /* check each server in expected_servers is in selected_servers */ @@ -1213,7 +1214,7 @@ execute_test (const json_test_config_t *config, /* Select a primary for testing */ server_id = mongoc_topology_select_server_id ( - client->topology, MONGOC_SS_WRITE, NULL, NULL, &error); + client->topology, MONGOC_SS_WRITE, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_id, error); json_test_ctx_init (&ctx, test, client, db, collection, config); @@ -1905,7 +1906,7 @@ run_json_general_test (const json_test_config_t *config) /* clean up in case a previous test aborted */ server_id = mongoc_topology_select_server_id ( - client->topology, MONGOC_SS_WRITE, NULL, NULL, &error); + client->topology, MONGOC_SS_WRITE, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_id, error); deactivate_fail_points (client, server_id); r = mongoc_client_command_with_opts (client, diff --git a/src/libmongoc/tests/mock_server/mock-server.c b/src/libmongoc/tests/mock_server/mock-server.c index 21c62c6daa..882ff37d0d 100644 --- a/src/libmongoc/tests/mock_server/mock-server.c +++ b/src/libmongoc/tests/mock_server/mock-server.c @@ -329,7 +329,6 @@ mock_server_run (mock_server_t *server) size_t bind_addr_len = 0; int r; - MONGOC_INFO ("Starting mock server on port %d.", server->port); ssock = mongoc_socket_new ( server->bind_opts.family ? server->bind_opts.family : AF_INET, SOCK_STREAM, @@ -386,6 +385,8 @@ mock_server_run (mock_server_t *server) return 0; } + MONGOC_INFO ("Starting mock server on port %d.", bound_port); + bson_mutex_lock (&server->mutex); server->sock = ssock; diff --git a/src/libmongoc/tests/test-mongoc-client-session.c b/src/libmongoc/tests/test-mongoc-client-session.c index b83874d9ca..68ab962a7b 100644 --- a/src/libmongoc/tests/test-mongoc-client-session.c +++ b/src/libmongoc/tests/test-mongoc-client-session.c @@ -196,7 +196,7 @@ _test_session_pool_timeout (bool pooled) * trigger discovery */ server_id = mongoc_topology_select_server_id ( - client->topology, MONGOC_SS_READ, NULL, NULL, &error); + client->topology, MONGOC_SS_READ, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_id, error); /* diff --git a/src/libmongoc/tests/test-mongoc-cluster.c b/src/libmongoc/tests/test-mongoc-cluster.c index 68df19f47e..c0f7f82970 100644 --- a/src/libmongoc/tests/test-mongoc-cluster.c +++ b/src/libmongoc/tests/test-mongoc-cluster.c @@ -26,7 +26,7 @@ server_id_for_reads (mongoc_cluster_t *cluster) uint32_t id; server_stream = - mongoc_cluster_stream_for_reads (cluster, NULL, NULL, NULL, &error); + mongoc_cluster_stream_for_reads (cluster, NULL, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_stream, error); id = server_stream->sd->id; @@ -1140,8 +1140,8 @@ future_command_private (mongoc_client_t *client) ASSERT (client); - server_stream = - mongoc_cluster_stream_for_writes (&client->cluster, NULL, NULL, &error); + server_stream = mongoc_cluster_stream_for_writes ( + &client->cluster, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_stream, error); mongoc_cmd_parts_init ( @@ -1785,8 +1785,11 @@ test_cluster_stream_invalidation_single (void) /* Test "clearing the pool". This should invalidate existing server streams. */ - stream = mongoc_cluster_stream_for_writes ( - &client->cluster, NULL /* session */, NULL /* reply */, &error); + stream = mongoc_cluster_stream_for_writes (&client->cluster, + NULL /* session */, + NULL /* deprioritized servers */, + NULL /* reply */, + &error); ASSERT_OR_PRINT (stream, error); BSON_ASSERT (mongoc_cluster_stream_valid (&client->cluster, stream)); tdmod = mc_tpld_modify_begin (client->topology); @@ -1798,8 +1801,11 @@ test_cluster_stream_invalidation_single (void) /* Test closing the connection. This should invalidate existing server * streams. */ - stream = mongoc_cluster_stream_for_writes ( - &client->cluster, NULL /* session */, NULL /* reply */, &error); + stream = mongoc_cluster_stream_for_writes (&client->cluster, + NULL /* session */, + NULL /* deprioritized servers */, + NULL /* reply */, + &error); ASSERT_OR_PRINT (stream, error); BSON_ASSERT (mongoc_cluster_stream_valid (&client->cluster, stream)); mongoc_cluster_disconnect_node (&client->cluster, sd->id); @@ -1807,8 +1813,11 @@ test_cluster_stream_invalidation_single (void) mongoc_server_stream_cleanup (stream); /* Test that a new stream is considered valid. */ - stream = mongoc_cluster_stream_for_writes ( - &client->cluster, NULL /* session */, NULL /* reply */, &error); + stream = mongoc_cluster_stream_for_writes (&client->cluster, + NULL /* session */, + NULL /* deprioritized servers */, + NULL /* reply */, + &error); ASSERT_OR_PRINT (stream, error); BSON_ASSERT (mongoc_cluster_stream_valid (&client->cluster, stream)); mongoc_server_stream_cleanup (stream); @@ -1837,8 +1846,11 @@ test_cluster_stream_invalidation_pooled (void) /* Test "clearing the pool". This should invalidate existing server streams. */ - stream = mongoc_cluster_stream_for_writes ( - &client->cluster, NULL /* session */, NULL /* reply */, &error); + stream = mongoc_cluster_stream_for_writes (&client->cluster, + NULL /* session */, + NULL /* deprioritized servers */, + NULL /* reply */, + &error); ASSERT_OR_PRINT (stream, error); BSON_ASSERT (mongoc_cluster_stream_valid (&client->cluster, stream)); tdmod = mc_tpld_modify_begin (client->topology); @@ -1850,8 +1862,11 @@ test_cluster_stream_invalidation_pooled (void) /* Test closing the connection. This should invalidate existing server * streams. */ - stream = mongoc_cluster_stream_for_writes ( - &client->cluster, NULL /* session */, NULL /* reply */, &error); + stream = mongoc_cluster_stream_for_writes (&client->cluster, + NULL /* session */, + NULL /* deprioritized servers */, + NULL /* reply */, + &error); ASSERT_OR_PRINT (stream, error); BSON_ASSERT (mongoc_cluster_stream_valid (&client->cluster, stream)); mongoc_cluster_disconnect_node (&client->cluster, sd->id); @@ -1859,8 +1874,11 @@ test_cluster_stream_invalidation_pooled (void) mongoc_server_stream_cleanup (stream); /* Test that a new stream is considered valid. */ - stream = mongoc_cluster_stream_for_writes ( - &client->cluster, NULL /* session */, NULL /* reply */, &error); + stream = mongoc_cluster_stream_for_writes (&client->cluster, + NULL /* session */, + NULL /* deprioritized servers */, + NULL /* reply */, + &error); ASSERT_OR_PRINT (stream, error); BSON_ASSERT (mongoc_cluster_stream_valid (&client->cluster, stream)); mongoc_server_stream_cleanup (stream); diff --git a/src/libmongoc/tests/test-mongoc-cyrus.c b/src/libmongoc/tests/test-mongoc-cyrus.c index 0792d8c01a..3d5c67aebb 100644 --- a/src/libmongoc/tests/test-mongoc-cyrus.c +++ b/src/libmongoc/tests/test-mongoc-cyrus.c @@ -77,7 +77,7 @@ test_sasl_canonicalize_hostname (void *ctx) client = test_framework_new_default_client (); ss = mongoc_cluster_stream_for_reads ( - &client->cluster, NULL, NULL, NULL, &error); + &client->cluster, NULL, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (ss, error); BSON_ASSERT (_mongoc_sasl_get_canonicalized_name ( diff --git a/src/libmongoc/tests/test-mongoc-primary-stepdown.c b/src/libmongoc/tests/test-mongoc-primary-stepdown.c index 4e8a8d985c..73674d85ca 100644 --- a/src/libmongoc/tests/test-mongoc-primary-stepdown.c +++ b/src/libmongoc/tests/test-mongoc-primary-stepdown.c @@ -160,11 +160,13 @@ test_getmore_iteration (mongoc_client_t *client) /* Store the primary ID. After step down, the primary may be a different * server. We must execute serverStatus against the same server to check * connection counts. */ - primary_id = mongoc_topology_select_server_id (client->topology, - MONGOC_SS_WRITE, - NULL /* read prefs */, - NULL /* chosen read mode */, - &error); + primary_id = + mongoc_topology_select_server_id (client->topology, + MONGOC_SS_WRITE, + NULL /* read prefs */, + NULL /* chosen read mode */, + NULL /* deprioritized servers */, + &error); ASSERT_OR_PRINT (primary_id, error); conn_count = _connection_count (client, primary_id); @@ -241,11 +243,13 @@ test_not_primary_keep_pool (mongoc_client_t *client) /* Store the primary ID. After step down, the primary may be a different * server. We must execute serverStatus against the same server to check * connection counts. */ - primary_id = mongoc_topology_select_server_id (client->topology, - MONGOC_SS_WRITE, - NULL /* read prefs */, - NULL /* chosen read mode */, - &error); + primary_id = + mongoc_topology_select_server_id (client->topology, + MONGOC_SS_WRITE, + NULL /* read prefs */, + NULL /* chosen read mode */, + NULL /* deprioritized servers */, + &error); ASSERT_OR_PRINT (primary_id, error); conn_count = _connection_count (client, primary_id); res = mongoc_database_command_simple ( @@ -314,11 +318,13 @@ test_not_primary_reset_pool (mongoc_client_t *client) /* Store the primary ID. After step down, the primary may be a different * server. We must execute serverStatus against the same server to check * connection counts. */ - primary_id = mongoc_topology_select_server_id (client->topology, - MONGOC_SS_WRITE, - NULL /* read prefs */, - NULL /* chosen read mode */, - &error); + primary_id = + mongoc_topology_select_server_id (client->topology, + MONGOC_SS_WRITE, + NULL /* read prefs */, + NULL /* chosen read mode */, + NULL /* deprioritized servers */, + &error); ASSERT_OR_PRINT (primary_id, error); conn_count = _connection_count (client, primary_id); res = mongoc_database_command_simple ( @@ -391,11 +397,13 @@ test_shutdown_reset_pool (mongoc_client_t *client) /* Store the primary ID. After step down, the primary may be a different * server. We must execute serverStatus against the same server to check * connection counts. */ - primary_id = mongoc_topology_select_server_id (client->topology, - MONGOC_SS_WRITE, - NULL /* read prefs */, - NULL /* chosen read mode */, - &error); + primary_id = + mongoc_topology_select_server_id (client->topology, + MONGOC_SS_WRITE, + NULL /* read prefs */, + NULL /* chosen read mode */, + NULL /* deprioritized servers */, + &error); ASSERT_OR_PRINT (primary_id, error); conn_count = _connection_count (client, primary_id); res = mongoc_database_command_simple ( @@ -462,11 +470,13 @@ test_interrupted_shutdown_reset_pool (mongoc_client_t *client) /* Store the primary ID. After step down, the primary may be a different * server. We must execute serverStatus against the same server to check * connection counts. */ - primary_id = mongoc_topology_select_server_id (client->topology, - MONGOC_SS_WRITE, - NULL /* read prefs */, - NULL /* chosen read mode */, - &error); + primary_id = + mongoc_topology_select_server_id (client->topology, + MONGOC_SS_WRITE, + NULL /* read prefs */, + NULL /* chosen read mode */, + NULL /* deprioritized servers */, + &error); ASSERT_OR_PRINT (primary_id, error); conn_count = _connection_count (client, primary_id); res = mongoc_database_command_simple ( diff --git a/src/libmongoc/tests/test-mongoc-retryability-helpers.c b/src/libmongoc/tests/test-mongoc-retryability-helpers.c new file mode 100644 index 0000000000..12f5b9ac64 --- /dev/null +++ b/src/libmongoc/tests/test-mongoc-retryability-helpers.c @@ -0,0 +1,67 @@ +#include "test-mongoc-retryability-helpers.h" + +#include + +#include "mongoc-array-private.h" + +#include "test-conveniences.h" +#include "test-libmongoc.h" +#include "TestSuite.h" + +#include + +mongoc_array_t +_test_get_mongos_clients (const char **ports, size_t num_ports) +{ + bson_error_t error = {0}; + + mongoc_array_t clients; + _mongoc_array_init (&clients, sizeof (mongoc_client_t *)); + + for (size_t i = 0u; i < num_ports; ++i) { + const char *const port = ports[i]; + + char *const host_and_port = + bson_strdup_printf ("mongodb://localhost:%s", port); + char *const uri_str = + test_framework_add_user_password_from_env (host_and_port); + + mongoc_uri_t *const uri = mongoc_uri_new_with_error (uri_str, &error); + ASSERT_OR_PRINT (uri, error); + + mongoc_client_t *const client = + mongoc_client_new_from_uri_with_error (uri, &error); + ASSERT_OR_PRINT (client, error); + test_framework_set_ssl_opts (client); + + { + bson_t reply = BSON_INITIALIZER; + + ASSERT_OR_PRINT ( + mongoc_client_command_simple (client, + "admin", + tmp_bson ("{'hello': 1}"), + NULL, + &reply, + &error), + error); + + ASSERT_WITH_MSG ( + bson_has_field (&reply, "msg") && + strcmp (bson_lookup_utf8 (&reply, "msg"), "isdbgrid") == 0, + "expected a mongos on port %s", + port); + + bson_destroy (&reply); + } + + _mongoc_array_append_val (&clients, client); // Ownership transfer. + + mongoc_uri_destroy (uri); + + bson_free (host_and_port); + bson_free (uri_str); + } + + return clients; +} diff --git a/src/libmongoc/tests/test-mongoc-retryability-helpers.h b/src/libmongoc/tests/test-mongoc-retryability-helpers.h new file mode 100644 index 0000000000..4813598786 --- /dev/null +++ b/src/libmongoc/tests/test-mongoc-retryability-helpers.h @@ -0,0 +1,25 @@ +/* + * Copyright 2024 MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef TEST_MONGOC_RETRYABILITY_HELPERS_H +#define TEST_MONGOC_RETRYABILITY_HELPERS_H + +#include + +mongoc_array_t +_test_get_mongos_clients (const char **ports, size_t num_ports); + +#endif diff --git a/src/libmongoc/tests/test-mongoc-retryable-reads.c b/src/libmongoc/tests/test-mongoc-retryable-reads.c index 4e1c0a3f0e..97fb05679d 100644 --- a/src/libmongoc/tests/test-mongoc-retryable-reads.c +++ b/src/libmongoc/tests/test-mongoc-retryable-reads.c @@ -8,6 +8,7 @@ #include "mock_server/future.h" #include "mock_server/future-functions.h" #include "json-test-operations.h" +#include "test-mongoc-retryability-helpers.h" static bool retryable_reads_test_run_operation (json_test_ctx_t *ctx, @@ -104,7 +105,7 @@ test_cmd_helpers (void *ctx) /* clean up in case a previous test aborted */ server_id = mongoc_topology_select_server_id ( - client->topology, MONGOC_SS_WRITE, NULL, NULL, &error); + client->topology, MONGOC_SS_WRITE, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_id, error); deactivate_fail_points (client, server_id); @@ -189,7 +190,7 @@ test_cmd_helpers (void *ctx) /* read/write agnostic command_simple_with_server_id helper must not retry. */ server_id = mongoc_topology_select_server_id ( - client->topology, MONGOC_SS_WRITE, NULL, NULL, &error); + client->topology, MONGOC_SS_WRITE, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_id, error); _set_failpoint (client); ASSERT (!mongoc_client_command_simple_with_server_id ( @@ -251,7 +252,7 @@ test_retry_reads_off (void *ctx) /* clean up in case a previous test aborted */ server_id = mongoc_topology_select_server_id ( - client->topology, MONGOC_SS_WRITE, NULL, NULL, &error); + client->topology, MONGOC_SS_WRITE, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_id, error); deactivate_fail_points (client, server_id); @@ -278,6 +279,370 @@ test_retry_reads_off (void *ctx) mongoc_client_destroy (client); } +typedef struct _test_retry_reads_sharded_on_other_mongos_ctx { + int count; + uint16_t ports[2]; +} test_retry_reads_sharded_on_other_mongos_ctx; + +static void +_test_retry_reads_sharded_on_other_mongos_cb ( + const mongoc_apm_command_failed_t *event) +{ + BSON_ASSERT_PARAM (event); + + test_retry_reads_sharded_on_other_mongos_ctx *const ctx = + (test_retry_reads_sharded_on_other_mongos_ctx *) + mongoc_apm_command_failed_get_context (event); + BSON_ASSERT (ctx); + + ASSERT_WITH_MSG (ctx->count < 2, + "expected at most two failpoints to trigger"); + + const mongoc_host_list_t *const host = + mongoc_apm_command_failed_get_host (event); + BSON_ASSERT (host); + BSON_ASSERT (!host->next); + ctx->ports[ctx->count++] = host->port; +} + +// Retryable Reads Are Retried on a Different mongos if One is Available +static void +test_retry_reads_sharded_on_other_mongos (void *_ctx) +{ + BSON_UNUSED (_ctx); + + bson_error_t error = {0}; + + // Create two clients `s0` and `s1` that each connect to a single mongos from + // the sharded cluster. They must not connect to the same mongos. + const char *ports[] = {"27017", "27018"}; + const size_t num_ports = sizeof (ports) / sizeof (*ports); + mongoc_array_t clients = _test_get_mongos_clients (ports, num_ports); + BSON_ASSERT (clients.len == 2u); + mongoc_client_t *const s0 = + _mongoc_array_index (&clients, mongoc_client_t *, 0u); + mongoc_client_t *const s1 = + _mongoc_array_index (&clients, mongoc_client_t *, 1u); + BSON_ASSERT (s0 && s1); + + // Deprioritization cannot be deterministically asserted by this test due to + // randomized selection from suitable servers. Repeat the test a few times to + // increase the likelihood of detecting incorrect deprioritization behavior. + for (int i = 0; i < 10; ++i) { + // Configure the following fail point for both s0 and s1: + { + bson_t *const command = + tmp_bson ("{" + " 'configureFailPoint': 'failCommand'," + " 'mode': { 'times': 1 }," + " 'data': {" + " 'failCommands': ['find']," + " 'errorCode': 6" + " }" + "}"); + + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s0, "admin", command, NULL, NULL, &error), + error); + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s1, "admin", command, NULL, NULL, &error), + error); + } + + // Create a client client with `retryReads=true` that connects to the + // cluster with both mongoses used by `s0` and `s1` in the initial seed + // list. + mongoc_client_t *client = NULL; + { + const char *const host_and_port = + "mongodb://localhost:27017,localhost:27018/?retryReads=true"; + char *const uri_str = + test_framework_add_user_password_from_env (host_and_port); + mongoc_uri_t *const uri = mongoc_uri_new (uri_str); + + client = mongoc_client_new_from_uri_with_error (uri, &error); + ASSERT_OR_PRINT (client, error); + test_framework_set_ssl_opts (client); + + mongoc_uri_destroy (uri); + bson_free (uri_str); + } + BSON_ASSERT (client); + + { + test_retry_reads_sharded_on_other_mongos_ctx ctx = {0}; + + // Enable failed command event monitoring for client. + { + mongoc_apm_callbacks_t *const callbacks = + mongoc_apm_callbacks_new (); + mongoc_apm_set_command_failed_cb ( + callbacks, _test_retry_reads_sharded_on_other_mongos_cb); + mongoc_client_set_apm_callbacks (client, callbacks, &ctx); + mongoc_apm_callbacks_destroy (callbacks); + } + + // Execute a `find` command with `client`. Assert that the command + // failed. + { + mongoc_database_t *const db = + mongoc_client_get_database (client, "db"); + mongoc_collection_t *const coll = + mongoc_database_get_collection (db, "test"); + mongoc_cursor_t *const cursor = mongoc_collection_find_with_opts ( + coll, tmp_bson ("{}"), NULL, NULL); + const bson_t *reply = NULL; + ASSERT_WITH_MSG (!mongoc_cursor_next (cursor, &reply), + "expected find command to fail"); + ASSERT_WITH_MSG (mongoc_cursor_error (cursor, &error), + "expected find command to fail"); + mongoc_cursor_destroy (cursor); + mongoc_collection_destroy (coll); + mongoc_database_destroy (db); + } + + // Assert that two failed command events occurred. + ASSERT_WITH_MSG (ctx.count == 2, + "expected exactly 2 failpoints to trigger, but " + "observed %d with error: %s", + ctx.count, + error.message); + + // Assert that both events occurred on different mongoses. + ASSERT_WITH_MSG ((ctx.ports[0] == 27017 || ctx.ports[0] == 27018) && + (ctx.ports[1] == 27017 || ctx.ports[1] == 27018) && + (ctx.ports[0] != ctx.ports[1]), + "expected failpoints to trigger once on each mongos, " + "but observed failures on %d and %d", + ctx.ports[0], + ctx.ports[1]); + + mongoc_client_destroy (client); + } + + // Disable the fail point on both s0 and s1. + { + bson_t *const command = + tmp_bson ("{" + " 'configureFailPoint': 'failCommand'," + " 'mode': 'off'" + "}"); + + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s0, "admin", command, NULL, NULL, &error), + error); + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s1, "admin", command, NULL, NULL, &error), + error); + } + } + + mongoc_client_destroy (s0); + mongoc_client_destroy (s1); + _mongoc_array_destroy (&clients); +} + +typedef struct _test_retry_reads_sharded_on_same_mongos_ctx { + int failed_count; + int succeeded_count; + uint16_t failed_port; + uint16_t succeeded_port; +} test_retry_reads_sharded_on_same_mongos_ctx; + +static void +_test_retry_reads_sharded_on_same_mongos_cb ( + test_retry_reads_sharded_on_same_mongos_ctx *ctx, + const mongoc_apm_command_failed_t *failed, + const mongoc_apm_command_succeeded_t *succeeded) +{ + BSON_ASSERT_PARAM (ctx); + BSON_ASSERT (failed || true); + BSON_ASSERT (succeeded || true); + + ASSERT_WITH_MSG ( + ctx->failed_count + ctx->succeeded_count < 2, + "expected at most two events, but observed %d failed and %d succeeded", + ctx->failed_count, + ctx->succeeded_count); + + if (failed) { + ctx->failed_count += 1; + + const mongoc_host_list_t *const host = + mongoc_apm_command_failed_get_host (failed); + BSON_ASSERT (host); + BSON_ASSERT (!host->next); + ctx->failed_port = host->port; + } + + if (succeeded) { + ctx->succeeded_count += 1; + + const mongoc_host_list_t *const host = + mongoc_apm_command_succeeded_get_host (succeeded); + BSON_ASSERT (host); + BSON_ASSERT (!host->next); + ctx->succeeded_port = host->port; + } +} + +static void +_test_retry_reads_sharded_on_same_mongos_failed_cb ( + const mongoc_apm_command_failed_t *event) +{ + _test_retry_reads_sharded_on_same_mongos_cb ( + mongoc_apm_command_failed_get_context (event), event, NULL); +} + +static void +_test_retry_reads_sharded_on_same_mongos_succeeded_cb ( + const mongoc_apm_command_succeeded_t *event) +{ + _test_retry_reads_sharded_on_same_mongos_cb ( + mongoc_apm_command_succeeded_get_context (event), NULL, event); +} + +// Retryable Reads Are Retried on the Same mongos if No Others are Available +static void +test_retry_reads_sharded_on_same_mongos (void *_ctx) +{ + BSON_UNUSED (_ctx); + + bson_error_t error = {0}; + + // Create a client `s0` that connects to a single mongos from the cluster. + const char *ports[] = {"27017"}; + const size_t num_ports = sizeof (ports) / sizeof (*ports); + mongoc_array_t clients = _test_get_mongos_clients (ports, num_ports); + BSON_ASSERT (clients.len == 1u); + mongoc_client_t *const s0 = + _mongoc_array_index (&clients, mongoc_client_t *, 0u); + BSON_ASSERT (s0); + + // Configure the following fail point for `s0`: + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s0, + "admin", + tmp_bson ("{" + " 'configureFailPoint': 'failCommand'," + " 'mode': { 'times': 1 }," + " 'data': {" + " 'failCommands': ['find']," + " 'errorCode': 6" + " }" + "}"), + NULL, + NULL, + &error), + error); + + // Create a client client with `directConnection=false` (when not set by + // default) and `retryReads=true` that connects to the cluster using the same + // single mongos as `s0`. + mongoc_client_t *client = NULL; + { + const char *const host_and_port = + "mongodb://localhost:27017/?retryReads=true&directConnection=false"; + char *const uri_str = + test_framework_add_user_password_from_env (host_and_port); + mongoc_uri_t *const uri = mongoc_uri_new (uri_str); + + client = mongoc_client_new_from_uri_with_error (uri, &error); + ASSERT_OR_PRINT (client, error); + test_framework_set_ssl_opts (client); + + mongoc_uri_destroy (uri); + bson_free (uri_str); + } + BSON_ASSERT (client); + + { + test_retry_reads_sharded_on_same_mongos_ctx ctx = { + .failed_count = 0, + .succeeded_count = 0, + }; + + // Enable succeeded and failed command event monitoring for `client`. + { + mongoc_apm_callbacks_t *const callbacks = mongoc_apm_callbacks_new (); + mongoc_apm_set_command_failed_cb ( + callbacks, _test_retry_reads_sharded_on_same_mongos_failed_cb); + mongoc_apm_set_command_succeeded_cb ( + callbacks, _test_retry_reads_sharded_on_same_mongos_succeeded_cb); + mongoc_client_set_apm_callbacks (client, callbacks, &ctx); + mongoc_apm_callbacks_destroy (callbacks); + } + + // Execute a `find` command with `client`. Assert that the command + // succeeded. + { + mongoc_database_t *const db = + mongoc_client_get_database (client, "db"); + mongoc_collection_t *const coll = + mongoc_database_get_collection (db, "test"); + bson_t opts = BSON_INITIALIZER; + { + // Ensure drop from earlier is observed. + mongoc_read_concern_t *const rc = mongoc_read_concern_new (); + mongoc_read_concern_set_level (rc, + MONGOC_READ_CONCERN_LEVEL_MAJORITY); + mongoc_read_concern_append (rc, &opts); + mongoc_read_concern_destroy (rc); + } + mongoc_cursor_t *const cursor = + mongoc_collection_find_with_opts (coll, &opts, NULL, NULL); + const bson_t *reply = NULL; + (void) mongoc_cursor_next (cursor, &reply); + ASSERT_WITH_MSG (!mongoc_cursor_error (cursor, &error), + "expecting find to succeed, but observed error: %s", + error.message); + mongoc_cursor_destroy (cursor); + bson_destroy (&opts); + mongoc_collection_destroy (coll); + mongoc_database_destroy (db); + } + + // Avoid capturing additional events. + mongoc_client_set_apm_callbacks (client, NULL, NULL); + + // Assert that exactly one failed command event and one succeeded command + // event occurred. + ASSERT_WITH_MSG ( + ctx.failed_count == 1 && ctx.succeeded_count == 1, + "expected exactly one failed event and one succeeded " + "event, but observed %d failures and %d successes with error: %s", + ctx.failed_count, + ctx.succeeded_count, + ctx.succeeded_count > 1 ? "none" : error.message); + + // Assert that both events occurred on the same mongos. + ASSERT_WITH_MSG ( + ctx.failed_port == ctx.succeeded_port, + "expected failed and succeeded events on the same mongos, but " + "instead observed port %d (failed) and port %d (succeeded)", + ctx.failed_port, + ctx.succeeded_port); + + mongoc_client_destroy (client); + } + + // Disable the fail point on s0. + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s0, + "admin", + tmp_bson ("{" + " 'configureFailPoint': 'failCommand'," + " 'mode': 'off'" + "}"), + NULL, + NULL, + &error), + error); + + mongoc_client_destroy (s0); + _mongoc_array_destroy (&clients); +} + /* *----------------------------------------------------------------------- * @@ -318,4 +683,22 @@ test_retryable_reads_install (TestSuite *suite) test_framework_skip_if_max_wire_version_less_than_7, test_framework_skip_if_mongos, test_framework_skip_if_no_failpoint); + TestSuite_AddFull (suite, + "/retryable_reads/sharded/on_other_mongos", + test_retry_reads_sharded_on_other_mongos, + NULL, + NULL, + test_framework_skip_if_not_mongos, + test_framework_skip_if_no_failpoint, + // `retryReads=true` is a 4.2+ feature. + test_framework_skip_if_max_wire_version_less_than_8); + TestSuite_AddFull (suite, + "/retryable_reads/sharded/on_same_mongos", + test_retry_reads_sharded_on_same_mongos, + NULL, + NULL, + test_framework_skip_if_not_mongos, + test_framework_skip_if_no_failpoint, + // `retryReads=true` is a 4.2+ feature. + test_framework_skip_if_max_wire_version_less_than_8); } diff --git a/src/libmongoc/tests/test-mongoc-retryable-writes.c b/src/libmongoc/tests/test-mongoc-retryable-writes.c index 2a97a1f42a..4c9805069f 100644 --- a/src/libmongoc/tests/test-mongoc-retryable-writes.c +++ b/src/libmongoc/tests/test-mongoc-retryable-writes.c @@ -8,6 +8,7 @@ #include "mock_server/future.h" #include "mock_server/future-functions.h" #include "json-test-operations.h" +#include "test-mongoc-retryability-helpers.h" static bool @@ -157,7 +158,7 @@ test_command_with_opts (void *ctx) /* clean up in case a previous test aborted */ server_id = mongoc_topology_select_server_id ( - client->topology, MONGOC_SS_WRITE, NULL, NULL, &error); + client->topology, MONGOC_SS_WRITE, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_id, error); deactivate_fail_points (client, server_id); @@ -637,7 +638,7 @@ set_up_original_error_test (mongoc_apm_callbacks_t *callbacks, // clean up in case a previous test aborted server_id = mongoc_topology_select_server_id ( - client->topology, MONGOC_SS_WRITE, NULL, NULL, &error); + client->topology, MONGOC_SS_WRITE, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_id, error); deactivate_fail_points (client, server_id); @@ -924,6 +925,360 @@ test_bulk_retry_tracks_new_server (void *unused) mongoc_client_destroy (client); } +typedef struct _test_retry_writes_sharded_on_other_mongos_ctx { + int count; + uint16_t ports[2]; +} test_retry_writes_sharded_on_other_mongos_ctx; + +static void +_test_retry_writes_sharded_on_other_mongos_cb ( + const mongoc_apm_command_failed_t *event) +{ + BSON_ASSERT_PARAM (event); + + test_retry_writes_sharded_on_other_mongos_ctx *const ctx = + (test_retry_writes_sharded_on_other_mongos_ctx *) + mongoc_apm_command_failed_get_context (event); + BSON_ASSERT (ctx); + + ASSERT_WITH_MSG (ctx->count < 2, + "expected at most two failpoints to trigger"); + + const mongoc_host_list_t *const host = + mongoc_apm_command_failed_get_host (event); + BSON_ASSERT (host); + BSON_ASSERT (!host->next); + ctx->ports[ctx->count++] = host->port; +} + +// Test that in a sharded cluster writes are retried on a different mongos when +// one is available. +static void +retryable_writes_sharded_on_other_mongos (void *_ctx) +{ + BSON_UNUSED (_ctx); + + bson_error_t error = {0}; + + // Create two clients `s0` and `s1` that each connect to a single mongos from + // the sharded cluster. They must not connect to the same mongos. + const char *ports[] = {"27017", "27018"}; + const size_t num_ports = sizeof (ports) / sizeof (*ports); + mongoc_array_t clients = _test_get_mongos_clients (ports, num_ports); + BSON_ASSERT (clients.len == 2u); + mongoc_client_t *const s0 = + _mongoc_array_index (&clients, mongoc_client_t *, 0u); + mongoc_client_t *const s1 = + _mongoc_array_index (&clients, mongoc_client_t *, 1u); + BSON_ASSERT (s0 && s1); + + // Deprioritization cannot be deterministically asserted by this test due to + // randomized selection from suitable servers. Repeat the test a few times to + // increase the likelihood of detecting incorrect deprioritization behavior. + for (int i = 0; i < 10; ++i) { + // Configure the following fail point for both `s0` and `s1`: + { + bson_t *const command = + tmp_bson ("{" + " 'configureFailPoint': 'failCommand'," + " 'mode': { 'times': 1 }," + " 'data': {" + " 'failCommands': ['insert']," + " 'errorCode': 6," + " 'errorLabels': ['RetryableWriteError']" + " }" + "}"); + + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s0, "admin", command, NULL, NULL, &error), + error); + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s1, "admin", command, NULL, NULL, &error), + error); + } + + // Create a client `client` with `retryWrites=true` that connects to the + // cluster with both mongoses used by `s0` and `s1` in the initial seed + // list. + mongoc_client_t *client = NULL; + { + const char *const host_and_port = + "mongodb://localhost:27017,localhost:27018/?retryWrites=true"; + char *const uri_str = + test_framework_add_user_password_from_env (host_and_port); + mongoc_uri_t *const uri = mongoc_uri_new (uri_str); + + client = mongoc_client_new_from_uri_with_error (uri, &error); + ASSERT_OR_PRINT (client, error); + test_framework_set_ssl_opts (client); + + mongoc_uri_destroy (uri); + bson_free (uri_str); + } + BSON_ASSERT (client); + + { + test_retry_writes_sharded_on_other_mongos_ctx ctx = {0}; + + // Enable failed command event monitoring for `client`. + { + mongoc_apm_callbacks_t *const callbacks = + mongoc_apm_callbacks_new (); + mongoc_apm_set_command_failed_cb ( + callbacks, _test_retry_writes_sharded_on_other_mongos_cb); + mongoc_client_set_apm_callbacks (client, callbacks, &ctx); + mongoc_apm_callbacks_destroy (callbacks); + } + + // Execute an `insert` command with `client`. Assert that the command + // failed. + { + mongoc_database_t *const db = + mongoc_client_get_database (client, "db"); + mongoc_collection_t *const coll = + mongoc_database_get_collection (db, "test"); + ASSERT_WITH_MSG ( + !mongoc_collection_insert_one ( + coll, tmp_bson ("{'x': 1}"), NULL, NULL, &error), + "expected insert command to fail"); + MONGOC_DEBUG ("insert error: %s", error.message); + mongoc_collection_destroy (coll); + mongoc_database_destroy (db); + } + + // Assert that two failed command events occurred. + ASSERT_WITH_MSG (ctx.count == 2, + "expected exactly 2 failpoints to trigger, but " + "observed %d with error: %s", + ctx.count, + error.message); + + // Assert that the failed command events occurred on different + // mongoses. + ASSERT_WITH_MSG ((ctx.ports[0] == 27017 || ctx.ports[0] == 27018) && + (ctx.ports[1] == 27017 || ctx.ports[1] == 27018) && + (ctx.ports[0] != ctx.ports[1]), + "expected failpoints to trigger once on each mongos, " + "but observed failures on %d and %d", + ctx.ports[0], + ctx.ports[1]); + + mongoc_client_destroy (client); + } + + // Disable the fail points. + { + bson_t *const command = + tmp_bson ("{" + " 'configureFailPoint': 'failCommand'," + " 'mode': 'off'" + "}"); + + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s0, "admin", command, NULL, NULL, &error), + error); + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s1, "admin", command, NULL, NULL, &error), + error); + } + } + + mongoc_client_destroy (s0); + mongoc_client_destroy (s1); + _mongoc_array_destroy (&clients); +} + +typedef struct _test_retry_writes_sharded_on_same_mongos_ctx { + int failed_count; + int succeeded_count; + uint16_t failed_port; + uint16_t succeeded_port; +} test_retry_writes_sharded_on_same_mongos_ctx; + +static void +_test_retry_writes_sharded_on_same_mongos_cb ( + test_retry_writes_sharded_on_same_mongos_ctx *ctx, + const mongoc_apm_command_failed_t *failed, + const mongoc_apm_command_succeeded_t *succeeded) +{ + BSON_ASSERT_PARAM (ctx); + BSON_ASSERT (failed || true); + BSON_ASSERT (succeeded || true); + + ASSERT_WITH_MSG ( + ctx->failed_count + ctx->succeeded_count < 2, + "expected at most two events, but observed %d failed and %d succeeded", + ctx->failed_count, + ctx->succeeded_count); + + if (failed) { + ctx->failed_count += 1; + + const mongoc_host_list_t *const host = + mongoc_apm_command_failed_get_host (failed); + BSON_ASSERT (host); + BSON_ASSERT (!host->next); + ctx->failed_port = host->port; + } + + if (succeeded) { + ctx->succeeded_count += 1; + + const mongoc_host_list_t *const host = + mongoc_apm_command_succeeded_get_host (succeeded); + BSON_ASSERT (host); + BSON_ASSERT (!host->next); + ctx->succeeded_port = host->port; + } +} + +static void +_test_retry_writes_sharded_on_same_mongos_failed_cb ( + const mongoc_apm_command_failed_t *event) +{ + _test_retry_writes_sharded_on_same_mongos_cb ( + mongoc_apm_command_failed_get_context (event), event, NULL); +} + +static void +_test_retry_writes_sharded_on_same_mongos_succeeded_cb ( + const mongoc_apm_command_succeeded_t *event) +{ + _test_retry_writes_sharded_on_same_mongos_cb ( + mongoc_apm_command_succeeded_get_context (event), NULL, event); +} + +// Test that in a sharded cluster writes are retried on the same mongos when no +// others are available. +static void +retryable_writes_sharded_on_same_mongos (void *_ctx) +{ + BSON_UNUSED (_ctx); + + bson_error_t error = {0}; + + // Create a client `s0` that connects to a single mongos from the cluster. + const char *ports[] = {"27017"}; + const size_t num_ports = sizeof (ports) / sizeof (*ports); + mongoc_array_t clients = _test_get_mongos_clients (ports, num_ports); + BSON_ASSERT (clients.len == 1u); + mongoc_client_t *const s0 = + _mongoc_array_index (&clients, mongoc_client_t *, 0u); + BSON_ASSERT (s0); + + // Configure the following fail point for `s0`: + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s0, + "admin", + tmp_bson ("{" + " 'configureFailPoint': 'failCommand'," + " 'mode': { 'times': 1 }," + " 'data': {" + " 'failCommands': ['insert']," + " 'errorCode': 6," + " 'errorLabels': ['RetryableWriteError']" + " }" + "}"), + NULL, + NULL, + &error), + error); + + // Create a client client with `directConnection=false` (when not set by + // default) and `retryWrites=true` that connects to the cluster using the + // same single mongos as `s0`. + mongoc_client_t *client = NULL; + { + const char *const host_and_port = + "mongodb://localhost:27017/" + "?retryWrites=true&directConnection=false"; + char *const uri_str = + test_framework_add_user_password_from_env (host_and_port); + mongoc_uri_t *const uri = mongoc_uri_new (uri_str); + + client = mongoc_client_new_from_uri_with_error (uri, &error); + ASSERT_OR_PRINT (client, error); + test_framework_set_ssl_opts (client); + + mongoc_uri_destroy (uri); + bson_free (uri_str); + } + BSON_ASSERT (client); + + { + test_retry_writes_sharded_on_same_mongos_ctx ctx = { + .failed_count = 0, + .succeeded_count = 0, + }; + + // Enable succeeded and failed command event monitoring for `client`. + { + mongoc_apm_callbacks_t *const callbacks = mongoc_apm_callbacks_new (); + mongoc_apm_set_command_failed_cb ( + callbacks, _test_retry_writes_sharded_on_same_mongos_failed_cb); + mongoc_apm_set_command_succeeded_cb ( + callbacks, _test_retry_writes_sharded_on_same_mongos_succeeded_cb); + mongoc_client_set_apm_callbacks (client, callbacks, &ctx); + mongoc_apm_callbacks_destroy (callbacks); + } + + // Execute an `insert` command with `client`. Assert that the command + // succeeded. + { + mongoc_database_t *const db = + mongoc_client_get_database (client, "db"); + mongoc_collection_t *const coll = + mongoc_database_get_collection (db, "test"); + ASSERT_WITH_MSG (mongoc_collection_insert_one ( + coll, tmp_bson ("{'x': 1}"), NULL, NULL, &error), + "expecting insert to succeed, but observed error: %s", + error.message); + mongoc_collection_destroy (coll); + mongoc_database_destroy (db); + } + + // Avoid capturing additional events. + mongoc_client_set_apm_callbacks (client, NULL, NULL); + + // Assert that exactly one failed command event and one succeeded + // command event occurred. + ASSERT_WITH_MSG ( + ctx.failed_count == 1 && ctx.succeeded_count == 1, + "expected exactly one failed event and one succeeded " + "event, but observed %d failures and %d successes with error: %s", + ctx.failed_count, + ctx.succeeded_count, + ctx.succeeded_count > 1 ? "none" : error.message); + + // Assert that both events occurred on the same mongos. + ASSERT_WITH_MSG ( + ctx.failed_port == ctx.succeeded_port, + "expected failed and succeeded events on the same mongos, but " + "instead observed port %d (failed) and port %d (succeeded)", + ctx.failed_port, + ctx.succeeded_port); + + mongoc_client_destroy (client); + } + + // Disable the fail point. + ASSERT_OR_PRINT (mongoc_client_command_simple ( + s0, + "admin", + tmp_bson ("{" + " 'configureFailPoint': 'failCommand'," + " 'mode': 'off'" + "}"), + NULL, + NULL, + &error), + error); + + mongoc_client_destroy (s0); + _mongoc_array_destroy (&clients); +} + + void test_retryable_writes_install (TestSuite *suite) { @@ -1001,4 +1356,24 @@ test_retryable_writes_install (TestSuite *suite) test_framework_skip_if_not_replset, test_framework_skip_if_max_wire_version_less_than_17, test_framework_skip_if_no_crypto); + TestSuite_AddFull (suite, + "/retryable_writes/prose_test_4", + retryable_writes_sharded_on_other_mongos, + NULL, + NULL, + test_framework_skip_if_not_mongos, + test_framework_skip_if_no_failpoint, + // `errorLabels` is a 4.3.1+ feature. + test_framework_skip_if_max_wire_version_less_than_9, + test_framework_skip_if_no_crypto); + TestSuite_AddFull (suite, + "/retryable_writes/prose_test_5", + retryable_writes_sharded_on_same_mongos, + NULL, + NULL, + test_framework_skip_if_not_mongos, + test_framework_skip_if_no_failpoint, + // `errorLabels` is a 4.3.1+ feature. + test_framework_skip_if_max_wire_version_less_than_9, + test_framework_skip_if_no_crypto); } diff --git a/src/libmongoc/tests/test-mongoc-topology.c b/src/libmongoc/tests/test-mongoc-topology.c index 0302790760..b9efb73763 100644 --- a/src/libmongoc/tests/test-mongoc-topology.c +++ b/src/libmongoc/tests/test-mongoc-topology.c @@ -181,7 +181,7 @@ test_topology_client_creation (void) /* ensure that we are sharing streams with the client */ server_stream = mongoc_cluster_stream_for_reads ( - &client_a->cluster, NULL, NULL, NULL, &error); + &client_a->cluster, NULL, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_stream, error); node = mongoc_topology_scanner_get_node (client_a->topology->scanner, @@ -495,7 +495,7 @@ _test_topology_invalidate_server (bool pooled) /* call explicitly */ server_stream = mongoc_cluster_stream_for_reads ( - &client->cluster, NULL, NULL, NULL, &error); + &client->cluster, NULL, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_stream, error); sd = server_stream->sd; id = server_stream->sd->id; @@ -602,7 +602,7 @@ test_invalid_cluster_node (void *ctx) /* load stream into cluster */ server_stream = mongoc_cluster_stream_for_reads ( - &client->cluster, NULL, NULL, NULL, &error); + &client->cluster, NULL, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_stream, error); id = server_stream->sd->id; mongoc_server_stream_cleanup (server_stream); @@ -675,7 +675,7 @@ test_max_wire_version_race_condition (void *ctx) /* load stream into cluster */ server_stream = mongoc_cluster_stream_for_reads ( - &client->cluster, NULL, NULL, NULL, &error); + &client->cluster, NULL, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_stream, error); id = server_stream->sd->id; mongoc_server_stream_cleanup (server_stream); diff --git a/src/libmongoc/tests/test-mongoc-transactions.c b/src/libmongoc/tests/test-mongoc-transactions.c index 9c69de58ed..b4553d8797 100644 --- a/src/libmongoc/tests/test-mongoc-transactions.c +++ b/src/libmongoc/tests/test-mongoc-transactions.c @@ -983,7 +983,7 @@ test_selected_server_is_pinned_to_mongos (void *ctx) BSON_ASSERT (0 == mongoc_client_session_get_server_id (session)); expected_id = mongoc_topology_select_server_id ( - client->topology, MONGOC_SS_WRITE, NULL, NULL, &error); + client->topology, MONGOC_SS_WRITE, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (expected_id, error); /* session should still be unpinned */ diff --git a/src/libmongoc/tests/test-mongoc-write-commands.c b/src/libmongoc/tests/test-mongoc-write-commands.c index 1302f26f15..5134add6d4 100644 --- a/src/libmongoc/tests/test-mongoc-write-commands.c +++ b/src/libmongoc/tests/test-mongoc-write-commands.c @@ -54,8 +54,8 @@ test_split_insert (void) _mongoc_write_command_insert_append (&command, docs[i]); } - server_stream = - mongoc_cluster_stream_for_writes (&client->cluster, NULL, NULL, &error); + server_stream = mongoc_cluster_stream_for_writes ( + &client->cluster, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_stream, error); _mongoc_write_command_execute (&command, client, @@ -125,8 +125,8 @@ test_invalid_write_concern (void) _mongoc_write_command_init_insert ( &command, doc, NULL, write_flags, ++client->cluster.operation_id); _mongoc_write_result_init (&result); - server_stream = - mongoc_cluster_stream_for_writes (&client->cluster, NULL, NULL, &error); + server_stream = mongoc_cluster_stream_for_writes ( + &client->cluster, NULL, NULL, NULL, &error); ASSERT_OR_PRINT (server_stream, error); _mongoc_write_command_execute (&command, client,