Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDRIVER-4099 Support mongos redirection during retryable operations #1529

Merged
merged 9 commits into from
Feb 13, 2024
2 changes: 2 additions & 0 deletions src/libmongoc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ set (SOURCES ${SOURCES}
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-cursor-array.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-database.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-error.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-deprioritized-servers.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-flags.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-find-and-modify.c
${PROJECT_SOURCE_DIR}/src/mongoc/mongoc-generation-map.c
Expand Down Expand Up @@ -1039,6 +1040,7 @@ set (test-libmongoc-sources
${PROJECT_SOURCE_DIR}/tests/test-mongoc-read-concern.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-read-prefs.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-read-write-concern.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryability-helpers.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryable-reads.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-retryable-writes.c
${PROJECT_SOURCE_DIR}/tests/test-mongoc-sample-commands.c
Expand Down
2 changes: 1 addition & 1 deletion src/libmongoc/src/mongoc/mongoc-bulk-operation.c
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ mongoc_bulk_operation_execute (mongoc_bulk_operation_t *bulk, /* IN */
error);
} else {
server_stream = mongoc_cluster_stream_for_writes (
cluster, bulk->session, reply, error);
cluster, bulk->session, NULL, reply, error);
}

if (!server_stream) {
Expand Down
8 changes: 6 additions & 2 deletions src/libmongoc/src/mongoc/mongoc-change-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,12 @@ _make_cursor (mongoc_change_stream_t *stream)
goto cleanup;
}

server_stream = mongoc_cluster_stream_for_reads (
&stream->client->cluster, stream->read_prefs, cs, &reply, &stream->err);
server_stream = mongoc_cluster_stream_for_reads (&stream->client->cluster,
stream->read_prefs,
cs,
NULL,
&reply,
&stream->err);
if (!server_stream) {
bson_destroy (&stream->err_doc);
bson_copy_to (&reply, &stream->err_doc);
Expand Down
8 changes: 6 additions & 2 deletions src/libmongoc/src/mongoc/mongoc-client-session.c
Original file line number Diff line number Diff line change
Expand Up @@ -1104,8 +1104,12 @@ mongoc_client_session_start_transaction (mongoc_client_session_t *session,
BSON_ASSERT (session);

ret = true;
server_stream = mongoc_cluster_stream_for_writes (
&session->client->cluster, session, NULL /* reply */, error);
server_stream =
mongoc_cluster_stream_for_writes (&session->client->cluster,
session,
NULL /* deprioritized servers */,
NULL /* reply */,
error);
if (!server_stream) {
ret = false;
GOTO (done);
Expand Down
74 changes: 52 additions & 22 deletions src/libmongoc/src/mongoc/mongoc-client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1727,15 +1727,26 @@ _mongoc_client_retryable_write_command_with_stream (
_mongoc_write_error_get_type (reply) == MONGOC_WRITE_ERR_RETRY) {
bson_error_t ignored_error;

/* each write command may be retried at most once */
// The write command may be retried at most once.
is_retryable = false;

if (retry_server_stream) {
mongoc_server_stream_cleanup (retry_server_stream);
}
{
mongoc_deprioritized_servers_t *const ds =
mongoc_deprioritized_servers_new ();

retry_server_stream = mongoc_cluster_stream_for_writes (
&client->cluster, parts->assembled.session, NULL, &ignored_error);
mongoc_deprioritized_servers_add_if_sharded (
ds, server_stream->topology_type, server_stream->sd);

BSON_ASSERT (!retry_server_stream);
retry_server_stream =
mongoc_cluster_stream_for_writes (&client->cluster,
parts->assembled.session,
ds,
NULL,
&ignored_error);

mongoc_deprioritized_servers_destroy (ds);
}

if (retry_server_stream) {
parts->assembled.server_stream = retry_server_stream;
Expand Down Expand Up @@ -1820,16 +1831,29 @@ _mongoc_client_retryable_read_command_with_stream (
/* each read command may be retried at most once */
is_retryable = false;

if (retry_server_stream) {
mongoc_server_stream_cleanup (retry_server_stream);
}
{
mongoc_deprioritized_servers_t *const ds =
mongoc_deprioritized_servers_new ();

retry_server_stream =
mongoc_cluster_stream_for_reads (&client->cluster,
parts->read_prefs,
parts->assembled.session,
NULL,
&ignored_error);
if (retry_server_stream) {
mongoc_deprioritized_servers_add_if_sharded (
ds, retry_server_stream->topology_type, retry_server_stream->sd);
mongoc_server_stream_cleanup (retry_server_stream);
} else {
mongoc_deprioritized_servers_add_if_sharded (
ds, server_stream->topology_type, server_stream->sd);
}
kevinAlbs marked this conversation as resolved.
Show resolved Hide resolved

retry_server_stream =
mongoc_cluster_stream_for_reads (&client->cluster,
parts->read_prefs,
parts->assembled.session,
ds,
NULL,
&ignored_error);

mongoc_deprioritized_servers_destroy (ds);
}

if (retry_server_stream) {
parts->assembled.server_stream = retry_server_stream;
Expand Down Expand Up @@ -1918,8 +1942,8 @@ mongoc_client_command_simple (mongoc_client_t *client,
* configuration. The generic command method SHOULD allow an optional read
* preference argument."
*/
server_stream =
mongoc_cluster_stream_for_reads (cluster, read_prefs, NULL, reply, error);
server_stream = mongoc_cluster_stream_for_reads (
cluster, read_prefs, NULL, NULL, reply, error);

if (server_stream) {
ret = _mongoc_client_command_with_stream (
Expand Down Expand Up @@ -2074,10 +2098,10 @@ _mongoc_client_command_with_opts (mongoc_client_t *client,
}
} else if (parts.is_write_command) {
server_stream =
mongoc_cluster_stream_for_writes (cluster, cs, reply_ptr, error);
mongoc_cluster_stream_for_writes (cluster, cs, NULL, reply_ptr, error);
} else {
server_stream =
mongoc_cluster_stream_for_reads (cluster, prefs, cs, reply_ptr, error);
server_stream = mongoc_cluster_stream_for_reads (
cluster, prefs, cs, NULL, reply_ptr, error);
}

if (!server_stream) {
Expand Down Expand Up @@ -2622,6 +2646,7 @@ mongoc_client_kill_cursor (mongoc_client_t *client, int64_t cursor_id)
MONGOC_SS_WRITE,
read_prefs,
NULL /* chosen read mode */,
NULL /* deprioritized servers */,
topology->local_threshold_msec);

if (selected_server) {
Expand Down Expand Up @@ -3060,8 +3085,13 @@ _mongoc_client_end_sessions (mongoc_client_t *client)

while (!mongoc_server_session_pool_is_empty (t->session_pool)) {
prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY_PREFERRED);
server_id = mongoc_topology_select_server_id (
t, MONGOC_SS_READ, prefs, NULL /* chosen read mode */, &error);
server_id =
mongoc_topology_select_server_id (t,
MONGOC_SS_READ,
prefs,
NULL /* chosen read mode */,
NULL /* deprioritized servers */,
&error);

mongoc_read_prefs_destroy (prefs);
if (!server_id) {
Expand Down
3 changes: 3 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-cluster-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "mongoc-scram-private.h"
#include "mongoc-cmd-private.h"
#include "mongoc-crypto-private.h"
#include "mongoc-deprioritized-servers-private.h"

BSON_BEGIN_DECLS

Expand Down Expand Up @@ -121,6 +122,7 @@ mongoc_server_stream_t *
mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
bson_error_t *error);

Expand All @@ -138,6 +140,7 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
mongoc_server_stream_t *
mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
bson_error_t *error);

Expand Down
25 changes: 18 additions & 7 deletions src/libmongoc/src/mongoc/mongoc-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -2794,6 +2794,7 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
mongoc_ss_optype_t optype,
const mongoc_read_prefs_t *read_prefs,
bool *must_use_primary,
const mongoc_deprioritized_servers_t *ds,
bson_error_t *error)
{
BSON_ASSERT (cs || true);
Expand All @@ -2808,14 +2809,14 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
server_id = cs->server_id;
if (!server_id) {
server_id = mongoc_topology_select_server_id (
topology, optype, read_prefs, must_use_primary, error);
topology, optype, read_prefs, must_use_primary, ds, error);
if (server_id) {
_mongoc_client_session_pin (cs, server_id);
}
}
} else {
server_id = mongoc_topology_select_server_id (
topology, optype, read_prefs, must_use_primary, error);
topology, optype, read_prefs, must_use_primary, ds, error);
/* Transactions Spec: Additionally, any non-transaction operation using a
* pinned ClientSession MUST unpin the session and the operation MUST
* perform normal server selection. */
Expand Down Expand Up @@ -2851,6 +2852,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
bool is_retryable,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
bson_error_t *error)
{
Expand All @@ -2870,7 +2872,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
BSON_ASSERT (cluster);

server_id = _mongoc_cluster_select_server_id (
cs, topology, optype, read_prefs, &must_use_primary, error);
cs, topology, optype, read_prefs, &must_use_primary, ds, error);

if (!server_id) {
if (reply) {
Expand All @@ -2883,7 +2885,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
if (!mongoc_cluster_check_interval (cluster, server_id)) {
/* Server Selection Spec: try once more */
server_id = _mongoc_cluster_select_server_id (
cs, topology, optype, read_prefs, &must_use_primary, error);
cs, topology, optype, read_prefs, &must_use_primary, ds, error);

if (!server_id) {
if (reply) {
Expand Down Expand Up @@ -2967,6 +2969,7 @@ mongoc_server_stream_t *
mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
bson_error_t *error)
{
Expand All @@ -2979,21 +2982,28 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
const bool is_retryable = mongoc_uri_get_option_as_bool (
cluster->uri, MONGOC_URI_RETRYREADS, MONGOC_DEFAULT_RETRYREADS);

return _mongoc_cluster_stream_for_optype (
cluster, MONGOC_SS_READ, prefs_override, cs, is_retryable, reply, error);
return _mongoc_cluster_stream_for_optype (cluster,
MONGOC_SS_READ,
prefs_override,
cs,
is_retryable,
ds,
reply,
error);
}

mongoc_server_stream_t *
mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster,
mongoc_client_session_t *cs,
const mongoc_deprioritized_servers_t *ds,
bson_t *reply,
bson_error_t *error)
{
const bool is_retryable = mongoc_uri_get_option_as_bool (
cluster->uri, MONGOC_URI_RETRYWRITES, MONGOC_DEFAULT_RETRYWRITES);

return _mongoc_cluster_stream_for_optype (
cluster, MONGOC_SS_WRITE, NULL, cs, is_retryable, reply, error);
cluster, MONGOC_SS_WRITE, NULL, cs, is_retryable, ds, reply, error);
}

mongoc_server_stream_t *
Expand All @@ -3015,6 +3025,7 @@ mongoc_cluster_stream_for_aggr_with_write (
prefs_override,
cs,
is_retryable,
NULL,
reply,
error);
}
Expand Down
30 changes: 24 additions & 6 deletions src/libmongoc/src/mongoc/mongoc-collection.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ _mongoc_collection_write_command_execute (
ENTRY;

server_stream = mongoc_cluster_stream_for_writes (
&collection->client->cluster, cs, NULL, &result->error);
&collection->client->cluster, cs, NULL, NULL, &result->error);

if (!server_stream) {
/* result->error has been filled out */
Expand Down Expand Up @@ -97,6 +97,7 @@ _mongoc_collection_write_command_execute_idl (
server_stream =
mongoc_cluster_stream_for_writes (&collection->client->cluster,
crud->client_session,
NULL,
&reply,
&result->error);

Expand Down Expand Up @@ -815,7 +816,7 @@ mongoc_collection_estimated_document_count (
BSON_ASSERT_PARAM (coll);

server_stream = mongoc_cluster_stream_for_reads (
&coll->client->cluster, read_prefs, NULL, reply, error);
&coll->client->cluster, read_prefs, NULL, NULL, reply, error);

if (opts && bson_has_field (opts, "sessionId")) {
bson_set_error (error,
Expand Down Expand Up @@ -1625,7 +1626,7 @@ mongoc_collection_create_index_with_opts (mongoc_collection_t *collection,
bson_append_array_builder_end (&cmd, ar);

server_stream = mongoc_cluster_stream_for_writes (
&collection->client->cluster, parsed.client_session, reply, error);
&collection->client->cluster, parsed.client_session, NULL, reply, error);

if (!server_stream) {
reply_initialized = true;
Expand Down Expand Up @@ -2245,6 +2246,7 @@ _mongoc_collection_update_or_replace (mongoc_collection_t *collection,
server_stream =
mongoc_cluster_stream_for_writes (&collection->client->cluster,
update_opts->crud.client_session,
NULL,
reply,
error);

Expand Down Expand Up @@ -3396,7 +3398,7 @@ mongoc_collection_find_and_modify_with_opts (
}

server_stream = mongoc_cluster_stream_for_writes (
cluster, appended_opts.client_session, &ss_reply, error);
cluster, appended_opts.client_session, NULL, &ss_reply, error);

if (!server_stream) {
bson_concat (reply_ptr, &ss_reply);
Expand Down Expand Up @@ -3566,8 +3568,23 @@ mongoc_collection_find_and_modify_with_opts (

/* each write command may be retried at most once */
is_retryable = false;
retry_server_stream = mongoc_cluster_stream_for_writes (
cluster, parts.assembled.session, NULL /* reply */, &ignored_error);

{
mongoc_deprioritized_servers_t *const ds =
mongoc_deprioritized_servers_new ();

mongoc_deprioritized_servers_add_if_sharded (
ds, server_stream->topology_type, server_stream->sd);

retry_server_stream =
mongoc_cluster_stream_for_writes (cluster,
parts.assembled.session,
ds,
NULL /* reply */,
&ignored_error);

mongoc_deprioritized_servers_destroy (ds);
}

if (retry_server_stream) {
parts.assembled.server_stream = retry_server_stream;
Expand Down Expand Up @@ -3782,6 +3799,7 @@ mongoc_collection_create_indexes_with_opts (mongoc_collection_t *collection,
server_stream =
mongoc_cluster_stream_for_writes (&collection->client->cluster,
NULL /* mongoc_client_session_t */,
NULL /* deprioritized servers */,
reply_ptr,
error);
if (server_stream->sd->max_wire_version < WIRE_VERSION_4_4) {
Expand Down
Loading