Skip to content

Commit

Permalink
Modify the graph interface to make possible select the type of operat…
Browse files Browse the repository at this point in the history
…ion to perform (read or write)
  • Loading branch information
madchicken committed Dec 19, 2024
1 parent 79722b5 commit abf4dd8
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 10 deletions.
117 changes: 107 additions & 10 deletions lib/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ enum ConnectionPoolManager {
}

impl ConnectionPoolManager {
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
async fn get(&self, operation: Option<Operation>) -> Result<ManagedConnection> {
match self {
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
Expand All @@ -37,6 +38,15 @@ impl ConnectionPoolManager {
}
}

#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
async fn get(&self) -> Result<ManagedConnection> {
match self {
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
Routed(manager) => manager.get(operation).await,
Normal(pool) => pool.get().await.map_err(crate::Error::from),
}
}

fn backoff(&self) -> ExponentialBackoff {
match self {
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
Expand Down Expand Up @@ -134,7 +144,19 @@ impl Graph {
///
/// Transactions will not be automatically retried on any failure.
pub async fn start_txn(&self) -> Result<Txn> {
self.impl_start_txn_on(self.config.db.clone()).await
self.impl_start_txn_on(self.config.db.clone(), Operation::Write)
.await
}

/// Starts a new transaction on the configured database specifying the desired operation.
/// All queries that needs to be run/executed within the transaction
/// should be executed using either [`Txn::run`] or [`Txn::execute`]
///
/// Transactions will not be automatically retried on any failure.
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
pub async fn start_txn_as(&self, operation: Operation) -> Result<Txn> {
self.impl_start_txn_on(self.config.db.clone(), operation)
.await
}

/// Starts a new transaction on the provided database.
Expand All @@ -143,11 +165,16 @@ impl Graph {
///
/// Transactions will not be automatically retried on any failure.
pub async fn start_txn_on(&self, db: impl Into<Database>) -> Result<Txn> {
self.impl_start_txn_on(Some(db.into())).await
self.impl_start_txn_on(Some(db.into()), Operation::Write)
.await
}

async fn impl_start_txn_on(&self, db: Option<Database>) -> Result<Txn> {
let connection = self.pool.get(Some(Operation::Write)).await?;
#[allow(unused_variables)]
async fn impl_start_txn_on(&self, db: Option<Database>, operation: Operation) -> Result<Txn> {
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
let connection = self.pool.get(Some(operation)).await?;
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
let connection = self.pool.get().await?;
Txn::new(db, self.config.fetch_size, connection).await
}

Expand All @@ -163,6 +190,22 @@ impl Graph {
///
/// use [`Graph::execute`] when you are interested in the result stream
pub async fn run(&self, q: Query) -> Result<()> {
self.impl_run_on(self.config.db.clone(), q, Operation::Write)
.await
}

/// Runs a READ ONLY query on the configured database using a connection from the connection pool,
/// It doesn't return any [`DetachedRowStream`] as the `run` abstraction discards any stream.
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
///
/// Use [`Graph::run`] for cases where you just want a write operation
///
/// use [`Graph::execute`] when you are interested in the result stream
pub async fn run_read(&self, q: Query) -> Result<()> {
self.impl_run_on(self.config.db.clone(), q, Operation::Read)
.await
}
Expand All @@ -178,10 +221,26 @@ impl Graph {
/// Use [`Graph::run`] for cases where you just want a write operation
///
/// use [`Graph::execute`] when you are interested in the result stream
pub async fn run_on(&self, db: impl Into<Database>, q: Query) -> Result<()> {
self.impl_run_on(Some(db.into()), q, Operation::Read).await
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
pub async fn run_on(
&self,
db: impl Into<Database>,
q: Query,
operation: Operation,
) -> Result<()> {
self.impl_run_on(Some(db.into()), q, operation).await
}

#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
pub async fn run_on(
&self,
db: impl Into<Database>,
q: Query,
) -> Result<()> {
self.impl_run_on(Some(db.into()), q, Operation::Write).await
}

#[allow(unused_variables)]
async fn impl_run_on(
&self,
db: Option<Database>,
Expand All @@ -196,7 +255,10 @@ impl Graph {
let db = db.as_deref();
let operation = operation.clone();
async move {
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
let mut connection = pool.get(Some(operation)).await?;
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
let mut connection = pool.get().await?;
query.run_retryable(db, &mut connection).await
}
},
Expand All @@ -205,7 +267,7 @@ impl Graph {
.await
}

/// Executes a query on the configured database and returns a [`DetachedRowStream`]
/// Executes a READ/WRITE query on the configured database and returns a [`DetachedRowStream`]
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
Expand All @@ -216,17 +278,48 @@ impl Graph {
.await
}

/// Executes a query on the provided database and returns a [`DetachedRowStream`]
/// Executes a query READ on the configured database and returns a [`DetachedRowStream`]
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
pub async fn execute_on(&self, db: impl Into<Database>, q: Query) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q, Operation::Write)
pub async fn execute_read(&self, q: Query) -> Result<DetachedRowStream> {
self.impl_execute_on(self.config.db.clone(), q, Operation::Read)
.await
}

/// Executes a query on the provided database and returns a [`DetachedRowStream`]
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
/// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry.
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
pub async fn execute_on(
&self,
db: impl Into<Database>,
q: Query,
operation: Operation,
) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q, operation).await
}

/// Executes a query on the provided database and returns a [`DetachedRowStream`]
///
/// This operation retires the query on certain failures.
/// All errors with the `Transient` error class as well as a few other error classes are considered retryable.
/// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted.
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
pub async fn execute_on(
&self,
db: impl Into<Database>,
q: Query,
) -> Result<DetachedRowStream> {
self.impl_execute_on(Some(db.into()), q, Operation::Write).await
}

#[allow(unused_variables)]
async fn impl_execute_on(
&self,
db: Option<Database>,
Expand All @@ -240,9 +333,13 @@ impl Graph {
let fetch_size = self.config.fetch_size;
let query = &q;
let db = db.as_deref();
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
let operation = operation.clone();
async move {
#[cfg(feature = "unstable-bolt-protocol-impl-v2")]
let connection = pool.get(Some(operation)).await?;
#[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))]
let connection = pool.get().await?;
query.execute_retryable(db, fetch_size, connection).await
}
},
Expand Down
1 change: 1 addition & 0 deletions lib/src/routing/routed_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ impl RoutedConnectionManager {
.load_balancing_strategy
.select_reader(available_servers.as_slice()),
} {
debug!("requesting connection for server: {:?}", server);
if let Some(pool) = self.registry.get_pool(&server) {
match pool.get().await {
Ok(connection) => return Ok(connection),
Expand Down

0 comments on commit abf4dd8

Please sign in to comment.