Skip to content

Commit

Permalink
Rework memdx error model
Browse files Browse the repository at this point in the history
Motivation
----------
The memdx error model does not currently satisfy what we need from
it. We need to be able to attach cluster configs and error context
to errors sometimes.

Changes
--------
Rework memdx error model to include more types of error and the
top level error be a struct.
  • Loading branch information
chvck committed Jul 24, 2024
1 parent 4edc02d commit 56fc8c7
Show file tree
Hide file tree
Showing 21 changed files with 383 additions and 247 deletions.
16 changes: 8 additions & 8 deletions sdk/couchbase-core/src/cbconfig/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;

use serde::Deserialize;

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct VBucketServerMap {
#[serde(alias = "hashAlgorithm")]
pub hash_algorithm: String,
Expand All @@ -14,13 +14,13 @@ pub struct VBucketServerMap {
pub vbucket_map: Vec<Vec<i16>>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct ConfigDDocs {
#[serde(alias = "uri")]
pub uri: String,
}

#[derive(Deserialize, Debug, Default)]
#[derive(Deserialize, Debug, Default, Clone)]
pub struct TerseExtNodePorts {
#[serde(alias = "kv")]
pub kv: Option<i64>,
Expand Down Expand Up @@ -61,23 +61,23 @@ pub struct TerseExtNodePorts {
pub backup_ssl: Option<i64>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct TerseExtNodeAltAddresses {
#[serde(alias = "ports")]
pub ports: TerseExtNodePorts,
#[serde(alias = "hostname")]
pub hostname: String,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct TerseNodePorts {
#[serde(alias = "direct")]
pub direct: Option<u16>,
#[serde(alias = "proxy")]
pub proxy: Option<u16>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct TerseNodeConfig {
#[serde(alias = "couchbaseApiBase")]
pub couchbase_api_base: Option<String>,
Expand All @@ -87,7 +87,7 @@ pub struct TerseNodeConfig {
pub ports: Option<TerseNodePorts>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct TerseNodeExtConfig {
#[serde(alias = "services")]
pub services: Option<TerseExtNodePorts>,
Expand All @@ -99,7 +99,7 @@ pub struct TerseNodeExtConfig {
pub alternate_addresses: HashMap<String, TerseExtNodeAltAddresses>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct TerseConfig {
#[serde(alias = "rev")]
pub rev: i64,
Expand Down
16 changes: 8 additions & 8 deletions sdk/couchbase-core/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use std::fmt::Display;

use crate::error::CoreError::{Dispatch, Placeholder, PlaceholderMemdxWrapper};
use crate::memdx::error::MemdxError;
use crate::memdx::error::{Error, ErrorKind};

#[derive(thiserror::Error, Debug, Eq, PartialEq)]
#[derive(thiserror::Error, Debug)]
pub enum CoreError {
#[error("Dispatch error {0}")]
Dispatch(MemdxError),
Dispatch(Error),
#[error("Placeholder error {0}")]
Placeholder(String),
#[error("Placeholder memdx wrapper error {0}")]
PlaceholderMemdxWrapper(MemdxError),
PlaceholderMemdxWrapper(Error),
}

impl From<MemdxError> for CoreError {
fn from(value: MemdxError) -> Self {
match value {
MemdxError::Dispatch(_) => Dispatch(value),
impl From<Error> for CoreError {
fn from(value: Error) -> Self {
match value.kind.as_ref() {
ErrorKind::Dispatch(_) => Dispatch(value),
_ => PlaceholderMemdxWrapper(value),
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/couchbase-core/src/kvclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ where
}

let (connection_close_tx, mut connection_close_rx) =
oneshot::channel::<crate::memdx::client::MemdxResult<()>>();
oneshot::channel::<crate::memdx::error::Result<()>>();
let memdx_client_opts = DispatcherOptions {
on_connection_close_handler: Some(connection_close_tx),
orphan_handler: opts.orphan_handler,
Expand Down
12 changes: 6 additions & 6 deletions sdk/couchbase-core/src/memdx/auth_mechanism.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::memdx::error::MemdxError;
use crate::memdx::error::{Error, ErrorKind};

#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum AuthMechanism {
Expand All @@ -22,7 +22,7 @@ impl From<AuthMechanism> for Vec<u8> {
}

impl TryFrom<&str> for AuthMechanism {
type Error = MemdxError;
type Error = Error;

fn try_from(value: &str) -> Result<Self, Self::Error> {
let mech = match value {
Expand All @@ -31,10 +31,10 @@ impl TryFrom<&str> for AuthMechanism {
"SCRAM-SHA256" => AuthMechanism::ScramSha256,
"SCRAM-SHA512" => AuthMechanism::ScramSha512,
_ => {
return Err(MemdxError::Protocol(format!(
"Unknown auth mechanism {}",
value
)));
return Err(ErrorKind::Protocol {
msg: format!("Unknown auth mechanism {}", value),
}
.into());
}
};

Expand Down
29 changes: 16 additions & 13 deletions sdk/couchbase-core/src/memdx/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use crate::memdx::client_response::ClientResponse;
use crate::memdx::codec::KeyValueCodec;
use crate::memdx::connection::{Connection, ConnectionType};
use crate::memdx::dispatcher::{Dispatcher, DispatcherOptions};
use crate::memdx::error::{CancellationErrorKind, MemdxError};
use crate::memdx::error;
use crate::memdx::error::{CancellationErrorKind, Error, ErrorKind};
use crate::memdx::packet::{RequestPacket, ResponsePacket};
use crate::memdx::pendingop::ClientPendingOp;

pub type MemdxResult<T> = std::result::Result<T, MemdxError>;
type ResponseSender = Sender<MemdxResult<ClientResponse>>;
type ResponseSender = Sender<error::Result<ClientResponse>>;
type OpaqueMap = HashMap<u32, Arc<ResponseSender>>;
pub(crate) type CancellationSender = UnboundedSender<(u32, CancellationErrorKind)>;

Expand All @@ -41,7 +41,7 @@ struct ReadLoopOptions {
pub local_addr: Option<SocketAddr>,
pub peer_addr: Option<SocketAddr>,
pub orphan_handler: Arc<UnboundedSender<ResponsePacket>>,
pub on_connection_close_tx: Option<oneshot::Sender<MemdxResult<()>>>,
pub on_connection_close_tx: Option<oneshot::Sender<error::Result<()>>>,
pub on_client_close_rx: Receiver<()>,
}

Expand Down Expand Up @@ -90,7 +90,10 @@ impl Client {
for entry in opaque_map.iter() {
entry
.1
.send(Err(MemdxError::ClosedInFlight))
.send(Err(ErrorKind::Cancelled(
CancellationErrorKind::ClosedInFlight,
)
.into()))
.await
.unwrap_or_default();
}
Expand All @@ -100,7 +103,7 @@ impl Client {
stream: FramedRead<ReadHalf<TcpStream>, KeyValueCodec>,
op_cancel_rx: UnboundedReceiver<(u32, CancellationErrorKind)>,
opaque_map: MutexGuard<'_, OpaqueMap>,
on_connection_close_tx: Option<oneshot::Sender<MemdxResult<()>>>,
on_connection_close_tx: Option<oneshot::Sender<error::Result<()>>>,
) {
drop(stream);
drop(op_cancel_rx);
Expand Down Expand Up @@ -138,7 +141,7 @@ impl Client {
drop(map);

sender
.send(Err(MemdxError::Cancelled(cancel_info.1)))
.send(Err(ErrorKind::Cancelled(cancel_info.1).into()))
.await
.unwrap();
} else {
Expand Down Expand Up @@ -300,7 +303,7 @@ impl Dispatcher for Client {
}
}

async fn dispatch(&self, mut packet: RequestPacket) -> MemdxResult<ClientPendingOp> {
async fn dispatch(&self, mut packet: RequestPacket) -> error::Result<ClientPendingOp> {
let (response_tx, response_rx) = mpsc::channel(1);
let opaque = self.register_handler(Arc::new(response_tx)).await;
packet.opaque = Some(opaque);
Expand All @@ -323,14 +326,14 @@ impl Dispatcher for Client {
let mut map = requests.lock().await;
map.remove(&opaque);

Err(MemdxError::Dispatch(e.kind()))
Err(ErrorKind::Dispatch(Arc::new(e)).into())
}
}
}

async fn close(&self) -> MemdxResult<()> {
async fn close(&self) -> error::Result<()> {
if self.closed.swap(true, Ordering::SeqCst) {
return Err(MemdxError::Closed);
return Err(ErrorKind::Closed.into());
}

let mut close_err = None;
Expand All @@ -354,7 +357,7 @@ impl Dispatcher for Client {
Self::drain_opaque_map(map).await;

if let Some(e) = close_err {
return Err(MemdxError::from(e));
return Err(ErrorKind::Io(Arc::new(e)).into());
}

Ok(())
Expand Down Expand Up @@ -405,7 +408,7 @@ mod tests {
.expect("Could not connect");

let (orphan_tx, mut orphan_rx) = unbounded_channel::<ResponsePacket>();
let (close_tx, mut close_rx) = oneshot::channel::<crate::memdx::client::MemdxResult<()>>();
let (close_tx, mut close_rx) = oneshot::channel::<crate::memdx::error::Result<()>>();

tokio::spawn(async move {
loop {
Expand Down
11 changes: 7 additions & 4 deletions sdk/couchbase-core/src/memdx/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use std::io;
use tokio_util::bytes::{Buf, BufMut, BytesMut};
use tokio_util::codec::{Decoder, Encoder};

use crate::memdx::error::MemdxError;
use crate::memdx::error::MemdxError::Protocol;
use crate::memdx::error::{Error, ErrorKind};
use crate::memdx::magic::Magic;
use crate::memdx::opcode::OpCode;
use crate::memdx::packet::{RequestPacket, ResponsePacket};
Expand All @@ -17,7 +16,7 @@ pub struct KeyValueCodec(());

impl Decoder for KeyValueCodec {
type Item = ResponsePacket;
type Error = MemdxError;
type Error = Error;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let buf_len = buf.len();
Expand All @@ -28,7 +27,11 @@ impl Decoder for KeyValueCodec {

let total_body_len = match buf[8..12].try_into() {
Ok(v) => u32::from_be_bytes(v),
Err(e) => return Err(Protocol(e.to_string())),
Err(e) => {
return Err(Error {
kind: ErrorKind::Protocol { msg: e.to_string() }.into(),
})
}
} as usize;

if buf_len < (HEADER_SIZE + total_body_len) {
Expand Down
22 changes: 11 additions & 11 deletions sdk/couchbase-core/src/memdx/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use tokio_rustls::rustls::client::danger::{
use tokio_rustls::rustls::pki_types::{CertificateDer, IpAddr, ServerName, UnixTime};
use tokio_rustls::TlsConnector;

use crate::memdx::client::MemdxResult;
use crate::memdx::error::MemdxError;
use crate::memdx::error::ErrorKind;
use crate::memdx::error::Result;

#[derive(Debug, Default)]
pub struct TlsConfig {
Expand Down Expand Up @@ -42,7 +42,7 @@ pub struct Connection {
}

impl Connection {
pub async fn connect(addr: SocketAddr, opts: ConnectOptions) -> MemdxResult<Connection> {
pub async fn connect(addr: SocketAddr, opts: ConnectOptions) -> Result<Connection> {
let remote_addr = addr.to_string();

if let Some(tls_config) = opts.tls_config {
Expand All @@ -55,19 +55,19 @@ impl Connection {
} else if let Some(roots) = tls_config.root_certs {
builder.with_root_certificates(roots).with_no_client_auth()
} else {
return Err(MemdxError::Generic(
"If tls config is specified then roots or accept_all_certs must be specified"
return Err(ErrorKind::InvalidArgument {
msg: "If tls config is specified then roots or accept_all_certs must be specified"
.to_string(),
));
}.into());
};

let tcp_socket = timeout_at(opts.deadline, TcpStream::connect(remote_addr))
.await?
.map_err(|e| MemdxError::Connect(e.kind()))?;
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;

tcp_socket
.set_nodelay(false)
.map_err(|e| MemdxError::Connect(e.kind()))?;
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;

let local_addr = match tcp_socket.local_addr() {
Ok(addr) => Some(addr),
Expand All @@ -84,7 +84,7 @@ impl Connection {
connector.connect(ServerName::IpAddress(IpAddr::from(addr.ip())), tcp_socket),
)
.await?
.map_err(|e| MemdxError::Connect(e.kind()))?;
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;

Ok(Connection {
inner: ConnectionType::Tls(socket),
Expand All @@ -94,10 +94,10 @@ impl Connection {
} else {
let socket = timeout_at(opts.deadline, TcpStream::connect(remote_addr))
.await?
.map_err(|e| MemdxError::Connect(e.kind()))?;
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;
socket
.set_nodelay(false)
.map_err(|e| MemdxError::Connect(e.kind()))?;
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;

let local_addr = match socket.local_addr() {
Ok(addr) => Some(addr),
Expand Down
8 changes: 4 additions & 4 deletions sdk/couchbase-core/src/memdx/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ use async_trait::async_trait;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;

use crate::memdx::client::MemdxResult;
use crate::memdx::connection::Connection;
use crate::memdx::error::Result;
use crate::memdx::packet::{RequestPacket, ResponsePacket};
use crate::memdx::pendingop::ClientPendingOp;

#[derive(Debug)]
pub struct DispatcherOptions {
pub orphan_handler: Arc<UnboundedSender<ResponsePacket>>,
pub on_connection_close_handler: Option<oneshot::Sender<MemdxResult<()>>>,
pub on_connection_close_handler: Option<oneshot::Sender<Result<()>>>,
}

#[async_trait]
pub trait Dispatcher: Send + Sync {
fn new(conn: Connection, opts: DispatcherOptions) -> Self;
async fn dispatch(&self, packet: RequestPacket) -> MemdxResult<ClientPendingOp>;
async fn close(&self) -> MemdxResult<()>;
async fn dispatch(&self, packet: RequestPacket) -> Result<ClientPendingOp>;
async fn close(&self) -> Result<()>;
}
Loading

0 comments on commit 56fc8c7

Please sign in to comment.