Skip to content

Commit

Permalink
CDRIVER-4099 Support mongos redirection during retryable operations (#…
Browse files Browse the repository at this point in the history
…1529)

* Fix position of mock server port number info log message

* Add tests for mongos deprioritization during retryable operations

* Add support for mongos deprioritization during retryable operations
  • Loading branch information
eramongodb authored Feb 13, 2024
1 parent 9fa6f2e commit acc9ea8
Show file tree
Hide file tree
Showing 30 changed files with 1,370 additions and 151 deletions.
2 changes: 2 additions & 0 deletions src/libmongoc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ set (SOURCES ${SOURCES}
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-cursor-array.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-database.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-error.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-deprioritized-servers.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-flags.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-find-and-modify.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-generation-map.c
Expand Down Expand Up @@ -1039,6 +1040,7 @@ set (test-libmongoc-sources
${PROJECT_SOURCE_DIR}/tests/test-mongoc-read-concern.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-read-prefs.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-read-write-concern.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryability-helpers.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryable-reads.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryable-writes.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-sample-commands.c
Expand Down
2 changes: 1 addition & 1 deletion src/libmongoc/src/mongoc/mongoc-bulk-operation.c
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ mongoc_bulk_operation_execute (mongoc_bulk_operation_t *bulk, /* IN */
error);
} else {
server_stream = mongoc_cluster_stream_for_writes (
cluster, bulk->session, reply, error);
cluster, bulk->session, NULL, reply, error);
}

if (!server_stream) {
Expand Down
8 changes: 6 additions & 2 deletions src/libmongoc/src/mongoc/mongoc-change-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,12 @@ _make_cursor (mongoc_change_stream_t *stream)
goto cleanup;
}

server_stream = mongoc_cluster_stream_for_reads (
&stream->client->cluster, stream->read_prefs, cs, &reply, &stream->err);
server_stream = mongoc_cluster_stream_for_reads (&stream->client->cluster,
stream->read_prefs,
cs,
NULL,
&reply,
&stream->err);
if (!server_stream) {
bson_destroy (&stream->err_doc);
bson_copy_to (&reply, &stream->err_doc);
Expand Down
8 changes: 6 additions & 2 deletions src/libmongoc/src/mongoc/mongoc-client-session.c
Original file line number Diff line number Diff line change
Expand Up @@ -1104,8 +1104,12 @@ mongoc_client_session_start_transaction (mongoc_client_session_t *session,
BSON_ASSERT (session);

ret = true;
server_stream = mongoc_cluster_stream_for_writes (
&session->client->cluster, session, NULL /* reply */, error);
server_stream =
mongoc_cluster_stream_for_writes (&session->client->cluster,
session,
NULL /* deprioritized servers */,
NULL /* reply */,
error);
if (!server_stream) {
ret = false;
GOTO (done);
Expand Down
74 changes: 52 additions & 22 deletions src/libmongoc/src/mongoc/mongoc-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1727,15 +1727,26 @@ _mongoc_client_retryable_write_command_with_stream (
_mongoc_write_error_get_type (reply) == MONGOC_WRITE_ERR_RETRY) {
bson_error_t ignored_error;

/* each write command may be retried at most once */
// The write command may be retried at most once.
is_retryable = false;

if (retry_server_stream) {
mongoc_server_stream_cleanup (retry_server_stream);
}
{
mongoc_deprioritized_servers_t *const ds =
mongoc_deprioritized_servers_new ();

retry_server_stream = mongoc_cluster_stream_for_writes (
&client->cluster, parts->assembled.session, NULL, &ignored_error);
mongoc_deprioritized_servers_add_if_sharded (
ds, server_stream->topology_type, server_stream->sd);

BSON_ASSERT (!retry_server_stream);
retry_server_stream =
mongoc_cluster_stream_for_writes (&client->cluster,
parts->assembled.session,
ds,
NULL,
&ignored_error);

mongoc_deprioritized_servers_destroy (ds);
}

if (retry_server_stream) {
parts->assembled.server_stream = retry_server_stream;
Expand Down Expand Up @@ -1820,16 +1831,29 @@ _mongoc_client_retryable_read_command_with_stream (
/* each read command may be retried at most once */
is_retryable = false;

if (retry_server_stream) {
mongoc_server_stream_cleanup (retry_server_stream);
}
{
mongoc_deprioritized_servers_t *const ds =
mongoc_deprioritized_servers_new ();

retry_server_stream =
mongoc_cluster_stream_for_reads (&client->cluster,
parts->read_prefs,
parts->assembled.session,
NULL,
&ignored_error);
if (retry_server_stream) {
mongoc_deprioritized_servers_add_if_sharded (
ds, retry_server_stream->topology_type, retry_server_stream->sd);
mongoc_server_stream_cleanup (retry_server_stream);
} else {
mongoc_deprioritized_servers_add_if_sharded (
ds, server_stream->topology_type, server_stream->sd);
}

retry_server_stream =
mongoc_cluster_stream_for_reads (&client->cluster,
parts->read_prefs,
parts->assembled.session,
ds,
NULL,
&ignored_error);

mongoc_deprioritized_servers_destroy (ds);
}

if (retry_server_stream) {
parts->assembled.server_stream = retry_server_stream;
Expand Down Expand Up @@ -1918,8 +1942,8 @@ mongoc_client_command_simple (mongoc_client_t *client,
* configuration. The generic command method SHOULD allow an optional read
* preference argument."
*/
server_stream =
mongoc_cluster_stream_for_reads (cluster, read_prefs, NULL, reply, error);
server_stream = mongoc_cluster_stream_for_reads (
cluster, read_prefs, NULL, NULL, reply, error);

if (server_stream) {
ret = _mongoc_client_command_with_stream (
Expand Down Expand Up @@ -2074,10 +2098,10 @@ _mongoc_client_command_with_opts (mongoc_client_t *client,
}
} else if (parts.is_write_command) {
server_stream =
mongoc_cluster_stream_for_writes (cluster, cs, reply_ptr, error);
mongoc_cluster_stream_for_writes (cluster, cs, NULL, reply_ptr, error);
} else {
server_stream =
mongoc_cluster_stream_for_reads (cluster, prefs, cs, reply_ptr, error);
server_stream = mongoc_cluster_stream_for_reads (
cluster, prefs, cs, NULL, reply_ptr, error);
}

if (!server_stream) {
Expand Down Expand Up @@ -2622,6 +2646,7 @@ mongoc_client_kill_cursor (mongoc_client_t *client, int64_t cursor_id)
MONGOC_SS_WRITE,
read_prefs,
NULL /* chosen read mode */,
NULL /* deprioritized servers */,
topology->local_threshold_msec);

if (selected_server) {
Expand Down Expand Up @@ -3060,8 +3085,13 @@ _mongoc_client_end_sessions (mongoc_client_t *client)

while (!mongoc_server_session_pool_is_empty (t->session_pool)) {
prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY_PREFERRED);
server_id = mongoc_topology_select_server_id (
t, MONGOC_SS_READ, prefs, NULL /* chosen read mode */, &error);
server_id =
mongoc_topology_select_server_id (t,
MONGOC_SS_READ,
prefs,
NULL /* chosen read mode */,
NULL /* deprioritized servers */,
&error);

mongoc_read_prefs_destroy (prefs);
if (!server_id) {
Expand Down
3 changes: 3 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-cluster-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "mongoc-scram-private.h"
#include "mongoc-cmd-private.h"
#include "mongoc-crypto-private.h"
#include "mongoc-deprioritized-servers-private.h"

BSON_BEGIN_DECLS

Expand Down Expand Up @@ -121,6 +122,7 @@ mongoc_server_stream_t *
mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
bson_error_t *error);

Expand All @@ -138,6 +140,7 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
mongoc_server_stream_t *
mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
bson_error_t *error);

Expand Down
25 changes: 18 additions & 7 deletions src/libmongoc/src/mongoc/mongoc-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -2794,6 +2794,7 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
mongoc_ss_optype_t optype,
const mongoc_read_prefs_t *read_prefs,
bool *must_use_primary,
const mongoc_deprioritized_servers_t *ds,
bson_error_t *error)
{
BSON_ASSERT (cs || true);
Expand All @@ -2808,14 +2809,14 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
server_id = cs->server_id;
if (!server_id) {
server_id = mongoc_topology_select_server_id (
topology, optype, read_prefs, must_use_primary, error);
topology, optype, read_prefs, must_use_primary, ds, error);
if (server_id) {
_mongoc_client_session_pin (cs, server_id);
}
}
} else {
server_id = mongoc_topology_select_server_id (
topology, optype, read_prefs, must_use_primary, error);
topology, optype, read_prefs, must_use_primary, ds, error);
/* Transactions Spec: Additionally, any non-transaction operation using a
* pinned ClientSession MUST unpin the session and the operation MUST
* perform normal server selection. */
Expand Down Expand Up @@ -2851,6 +2852,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
bool is_retryable,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
bson_error_t *error)
{
Expand All @@ -2870,7 +2872,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
BSON_ASSERT (cluster);

server_id = _mongoc_cluster_select_server_id (
cs, topology, optype, read_prefs, &must_use_primary, error);
cs, topology, optype, read_prefs, &must_use_primary, ds, error);

if (!server_id) {
if (reply) {
Expand All @@ -2883,7 +2885,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
if (!mongoc_cluster_check_interval (cluster, server_id)) {
/* Server Selection Spec: try once more */
server_id = _mongoc_cluster_select_server_id (
cs, topology, optype, read_prefs, &must_use_primary, error);
cs, topology, optype, read_prefs, &must_use_primary, ds, error);

if (!server_id) {
if (reply) {
Expand Down Expand Up @@ -2967,6 +2969,7 @@ mongoc_server_stream_t *
mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
bson_error_t *error)
{
Expand All @@ -2979,21 +2982,28 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
const bool is_retryable = mongoc_uri_get_option_as_bool (
cluster->uri, MONGOC_URI_RETRYREADS, MONGOC_DEFAULT_RETRYREADS);

return _mongoc_cluster_stream_for_optype (
cluster, MONGOC_SS_READ, prefs_override, cs, is_retryable, reply, error);
return _mongoc_cluster_stream_for_optype (cluster,
MONGOC_SS_READ,
prefs_override,
cs,
is_retryable,
ds,
reply,
error);
}

mongoc_server_stream_t *
mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
bson_error_t *error)
{
const bool is_retryable = mongoc_uri_get_option_as_bool (
cluster->uri, MONGOC_URI_RETRYWRITES, MONGOC_DEFAULT_RETRYWRITES);

return _mongoc_cluster_stream_for_optype (
cluster, MONGOC_SS_WRITE, NULL, cs, is_retryable, reply, error);
cluster, MONGOC_SS_WRITE, NULL, cs, is_retryable, ds, reply, error);
}

mongoc_server_stream_t *
Expand All @@ -3015,6 +3025,7 @@ mongoc_cluster_stream_for_aggr_with_write (
prefs_override,
cs,
is_retryable,
NULL,
reply,
error);
}
Expand Down
30 changes: 24 additions & 6 deletions src/libmongoc/src/mongoc/mongoc-collection.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ _mongoc_collection_write_command_execute (
ENTRY;

server_stream = mongoc_cluster_stream_for_writes (
&collection->client->cluster, cs, NULL, &result->error);
&collection->client->cluster, cs, NULL, NULL, &result->error);

if (!server_stream) {
/* result->error has been filled out */
Expand Down Expand Up @@ -97,6 +97,7 @@ _mongoc_collection_write_command_execute_idl (
server_stream =
mongoc_cluster_stream_for_writes (&collection->client->cluster,
crud->client_session,
NULL,
&reply,
&result->error);

Expand Down Expand Up @@ -815,7 +816,7 @@ mongoc_collection_estimated_document_count (
BSON_ASSERT_PARAM (coll);

server_stream = mongoc_cluster_stream_for_reads (
&coll->client->cluster, read_prefs, NULL, reply, error);
&coll->client->cluster, read_prefs, NULL, NULL, reply, error);

if (opts && bson_has_field (opts, "sessionId")) {
bson_set_error (error,
Expand Down Expand Up @@ -1625,7 +1626,7 @@ mongoc_collection_create_index_with_opts (mongoc_collection_t *collection,
bson_append_array_builder_end (&cmd, ar);

server_stream = mongoc_cluster_stream_for_writes (
&collection->client->cluster, parsed.client_session, reply, error);
&collection->client->cluster, parsed.client_session, NULL, reply, error);

if (!server_stream) {
reply_initialized = true;
Expand Down Expand Up @@ -2245,6 +2246,7 @@ _mongoc_collection_update_or_replace (mongoc_collection_t *collection,
server_stream =
mongoc_cluster_stream_for_writes (&collection->client->cluster,
update_opts->crud.client_session,
NULL,
reply,
error);

Expand Down Expand Up @@ -3396,7 +3398,7 @@ mongoc_collection_find_and_modify_with_opts (
}

server_stream = mongoc_cluster_stream_for_writes (
cluster, appended_opts.client_session, &ss_reply, error);
cluster, appended_opts.client_session, NULL, &ss_reply, error);

if (!server_stream) {
bson_concat (reply_ptr, &ss_reply);
Expand Down Expand Up @@ -3566,8 +3568,23 @@ mongoc_collection_find_and_modify_with_opts (

/* each write command may be retried at most once */
is_retryable = false;
retry_server_stream = mongoc_cluster_stream_for_writes (
cluster, parts.assembled.session, NULL /* reply */, &ignored_error);

{
mongoc_deprioritized_servers_t *const ds =
mongoc_deprioritized_servers_new ();

mongoc_deprioritized_servers_add_if_sharded (
ds, server_stream->topology_type, server_stream->sd);

retry_server_stream =
mongoc_cluster_stream_for_writes (cluster,
parts.assembled.session,
ds,
NULL /* reply */,
&ignored_error);

mongoc_deprioritized_servers_destroy (ds);
}

if (retry_server_stream) {
parts.assembled.server_stream = retry_server_stream;
Expand Down Expand Up @@ -3782,6 +3799,7 @@ mongoc_collection_create_indexes_with_opts (mongoc_collection_t *collection,
server_stream =
mongoc_cluster_stream_for_writes (&collection->client->cluster,
NULL /* mongoc_client_session_t */,
NULL /* deprioritized servers */,
reply_ptr,
error);
if (server_stream->sd->max_wire_version < WIRE_VERSION_4_4) {
Expand Down
Loading

0 comments on commit acc9ea8

Please sign in to comment.