Skip to content

Commit

Permalink
RUST-1585 Do not perform server selection to determine sessions suppo…
Browse files Browse the repository at this point in the history
…rt (#854)
  • Loading branch information
isabelatkinson authored Apr 17, 2023
1 parent 573aa54 commit d551dab
Show file tree
Hide file tree
Showing 18 changed files with 213 additions and 273 deletions.
2 changes: 1 addition & 1 deletion src/client/csfle/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
error::{Error, Result},
operation::{RawOutput, RunCommand},
options::ReadConcern,
runtime::{AsyncStream, Process, TlsConfig},
runtime::{process::Process, AsyncStream, TlsConfig},
Client,
Namespace,
};
Expand Down
52 changes: 11 additions & 41 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,7 @@ use crate::{
Retryability,
},
options::{ChangeStreamOptions, SelectionCriteria},
sdam::{
HandshakePhase,
SelectedServer,
ServerType,
SessionSupportStatus,
TopologyType,
TransactionSupportStatus,
},
sdam::{HandshakePhase, SelectedServer, ServerType, TopologyType, TransactionSupportStatus},
selection_criteria::ReadPreference,
ClusterTime,
};
Expand Down Expand Up @@ -349,8 +342,16 @@ impl Client {
}
};

if session.is_none() {
implicit_session = self.start_implicit_session(&op).await?;
if !conn.supports_sessions() && session.is_some() {
return Err(ErrorKind::SessionsNotSupported.into());
}

if conn.supports_sessions()
&& session.is_none()
&& op.supports_sessions()
&& op.is_acknowledged()
{
implicit_session = Some(ClientSession::new(self.clone(), None, true).await);
session = implicit_session.as_mut();
}

Expand Down Expand Up @@ -790,19 +791,6 @@ impl Client {
})
}

/// Start an implicit session if the operation and write concern are compatible with sessions.
async fn start_implicit_session<T: Operation>(&self, op: &T) -> Result<Option<ClientSession>> {
match self.get_session_support_status().await? {
SessionSupportStatus::Supported {
logical_session_timeout,
} if op.supports_sessions() && op.is_acknowledged() => Ok(Some(
self.start_session_with_timeout(logical_session_timeout, None, true)
.await,
)),
_ => Ok(None),
}
}

async fn select_data_bearing_server(&self, operation_name: &str) -> Result<()> {
let topology_type = self.inner.topology.topology_type();
let criteria = SelectionCriteria::Predicate(Arc::new(move |server_info| {
Expand All @@ -814,24 +802,6 @@ impl Client {
Ok(())
}

/// Gets whether the topology supports sessions, and if so, returns the topology's logical
/// session timeout. If it has yet to be determined if the topology supports sessions, this
/// method will perform a server selection that will force that determination to be made.
pub(crate) async fn get_session_support_status(&self) -> Result<SessionSupportStatus> {
let initial_status = self.inner.topology.session_support_status();

// Need to guarantee that we're connected to at least one server that can determine if
// sessions are supported or not.
match initial_status {
SessionSupportStatus::Undetermined => {
self.select_data_bearing_server(crate::client::SESSIONS_SUPPORT_OP_NAME)
.await?;
Ok(self.inner.topology.session_support_status())
}
_ => Ok(initial_status),
}
}

/// Gets whether the topology supports transactions. If it has yet to be determined if the
/// topology supports transactions, this method will perform a server selection that will force
/// that determination to be made.
Expand Down
54 changes: 7 additions & 47 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::{
SessionOptions,
},
results::DatabaseSpecification,
sdam::{server_selection, SelectedServer, SessionSupportStatus, Topology},
sdam::{server_selection, SelectedServer, Topology},
ClientSession,
};

Expand All @@ -56,8 +56,6 @@ pub(crate) use session::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
use session::{ServerSession, ServerSessionPool};

const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
// TODO: RUST-1585 Remove this constant.
pub(crate) const SESSIONS_SUPPORT_OP_NAME: &str = "Check sessions support status";

/// This is the main entry point for the API. A `Client` is used to connect to a MongoDB cluster.
/// By default, it will monitor the topology of the cluster, keeping track of any changes, such
Expand Down Expand Up @@ -367,14 +365,7 @@ impl Client {
if let Some(ref options) = options {
options.validate()?;
}
match self.get_session_support_status().await? {
SessionSupportStatus::Supported {
logical_session_timeout,
} => Ok(self
.start_session_with_timeout(logical_session_timeout, options, false)
.await),
_ => Err(ErrorKind::SessionsNotSupported.into()),
}
Ok(ClientSession::new(self.clone(), options, false).await)
}

/// Starts a new [`ChangeStream`] that receives events for all changes in the cluster. The
Expand Down Expand Up @@ -428,42 +419,11 @@ impl Client {
.await
}

/// Check in a server session to the server session pool.
/// If the session is expired or dirty, or the topology no longer supports sessions, the session
/// will be discarded.
/// Check in a server session to the server session pool. The session will be discarded if it is
/// expired or dirty.
pub(crate) async fn check_in_server_session(&self, session: ServerSession) {
let session_support_status = self.inner.topology.session_support_status();
if let SessionSupportStatus::Supported {
logical_session_timeout,
} = session_support_status
{
self.inner
.session_pool
.check_in(session, logical_session_timeout)
.await;
}
}

/// Starts a `ClientSession`.
///
/// This method will attempt to re-use server sessions from the pool which are not about to
/// expire according to the provided logical session timeout. If no such sessions are
/// available, a new one will be created.
pub(crate) async fn start_session_with_timeout(
&self,
logical_session_timeout: Option<Duration>,
options: Option<SessionOptions>,
is_implicit: bool,
) -> ClientSession {
ClientSession::new(
self.inner
.session_pool
.check_out(logical_session_timeout)
.await,
self.clone(),
options,
is_implicit,
)
let timeout = self.inner.topology.logical_session_timeout();
self.inner.session_pool.check_in(session, timeout).await;
}

#[cfg(test)]
Expand Down Expand Up @@ -601,7 +561,7 @@ impl Client {
}

#[cfg(test)]
pub(crate) fn topology(&self) -> &crate::sdam::Topology {
pub(crate) fn topology(&self) -> &Topology {
&self.inner.topology
}

Expand Down
8 changes: 5 additions & 3 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,15 @@ pub(crate) enum TransactionPin {
}

impl ClientSession {
/// Creates a new `ClientSession` wrapping the provided server session.
pub(crate) fn new(
server_session: ServerSession,
/// Creates a new `ClientSession` by checking out a corresponding `ServerSession` from the
/// provided client's session pool.
pub(crate) async fn new(
client: Client,
options: Option<SessionOptions>,
is_implicit: bool,
) -> Self {
let timeout = client.inner.topology.logical_session_timeout();
let server_session = client.inner.session_pool.check_out(timeout).await;
Self {
client,
server_session,
Expand Down
2 changes: 2 additions & 0 deletions src/client/session/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ async fn pool_is_lifo() {
let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await;

let client = TestClient::new().await;
// Wait for the implicit sessions created in TestClient::new to be returned to the pool.
runtime::delay_for(Duration::from_millis(500)).await;

if client.is_standalone() {
return;
Expand Down
8 changes: 8 additions & 0 deletions src/cmap/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,14 @@ impl Connection {
pub(crate) fn is_streaming(&self) -> bool {
self.more_to_come
}

/// Whether the connection supports sessions.
pub(crate) fn supports_sessions(&self) -> bool {
self.stream_description
.as_ref()
.and_then(|sd| sd.logical_session_timeout)
.is_some()
}
}

impl Drop for Connection {
Expand Down
9 changes: 5 additions & 4 deletions src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ mod http;
#[cfg(feature = "async-std-runtime")]
mod interval;
mod join_handle;
#[cfg(feature = "in-use-encryption-unstable")]
mod process;
#[cfg(any(
feature = "in-use-encryption-unstable",
all(test, not(feature = "sync"), not(feature = "tokio-sync"))
))]
pub(crate) mod process;
mod resolver;
pub(crate) mod stream;
mod sync_read_ext;
Expand All @@ -16,8 +19,6 @@ mod worker_handle;

use std::{future::Future, net::SocketAddr, time::Duration};

#[cfg(feature = "in-use-encryption-unstable")]
pub(crate) use self::process::Process;
pub(crate) use self::{
acknowledged_message::AcknowledgedMessage,
join_handle::AsyncJoinHandle,
Expand Down
Loading

0 comments on commit d551dab

Please sign in to comment.