Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework memdx error model #162

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions sdk/couchbase-core/src/cbconfig/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;

use serde::Deserialize;

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct VBucketServerMap {
#[serde(alias = "hashAlgorithm")]
pub hash_algorithm: String,
Expand All @@ -14,13 +14,13 @@ pub struct VBucketServerMap {
pub vbucket_map: Vec<Vec<i16>>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct ConfigDDocs {
#[serde(alias = "uri")]
pub uri: String,
}

#[derive(Deserialize, Debug, Default)]
#[derive(Deserialize, Debug, Default, Clone)]
pub struct TerseExtNodePorts {
#[serde(alias = "kv")]
pub kv: Option<i64>,
Expand Down Expand Up @@ -61,23 +61,23 @@ pub struct TerseExtNodePorts {
pub backup_ssl: Option<i64>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct TerseExtNodeAltAddresses {
#[serde(alias = "ports")]
pub ports: TerseExtNodePorts,
#[serde(alias = "hostname")]
pub hostname: String,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct TerseNodePorts {
#[serde(alias = "direct")]
pub direct: Option<u16>,
#[serde(alias = "proxy")]
pub proxy: Option<u16>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct TerseNodeConfig {
#[serde(alias = "couchbaseApiBase")]
pub couchbase_api_base: Option<String>,
Expand All @@ -87,7 +87,7 @@ pub struct TerseNodeConfig {
pub ports: Option<TerseNodePorts>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct TerseNodeExtConfig {
#[serde(alias = "services")]
pub services: Option<TerseExtNodePorts>,
Expand All @@ -99,7 +99,7 @@ pub struct TerseNodeExtConfig {
pub alternate_addresses: HashMap<String, TerseExtNodeAltAddresses>,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
pub struct TerseConfig {
#[serde(alias = "rev")]
pub rev: i64,
Expand Down
16 changes: 8 additions & 8 deletions sdk/couchbase-core/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use std::fmt::Display;

use crate::error::CoreError::{Dispatch, Placeholder, PlaceholderMemdxWrapper};
use crate::memdx::error::MemdxError;
use crate::memdx::error::{Error, ErrorKind};
Matt-Woz marked this conversation as resolved.
Show resolved Hide resolved

#[derive(thiserror::Error, Debug, Eq, PartialEq)]
#[derive(thiserror::Error, Debug)]
pub enum CoreError {
#[error("Dispatch error {0}")]
Dispatch(MemdxError),
Dispatch(Error),
#[error("Placeholder error {0}")]
Placeholder(String),
#[error("Placeholder memdx wrapper error {0}")]
PlaceholderMemdxWrapper(MemdxError),
PlaceholderMemdxWrapper(Error),
}

impl From<MemdxError> for CoreError {
fn from(value: MemdxError) -> Self {
match value {
MemdxError::Dispatch(_) => Dispatch(value),
impl From<Error> for CoreError {
fn from(value: Error) -> Self {
match value.kind.as_ref() {
ErrorKind::Dispatch(_) => Dispatch(value),
_ => PlaceholderMemdxWrapper(value),
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/couchbase-core/src/kvclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ where
}

let (connection_close_tx, mut connection_close_rx) =
oneshot::channel::<crate::memdx::client::MemdxResult<()>>();
oneshot::channel::<crate::memdx::error::Result<()>>();
let memdx_client_opts = DispatcherOptions {
on_connection_close_handler: Some(connection_close_tx),
orphan_handler: opts.orphan_handler,
Expand Down
12 changes: 6 additions & 6 deletions sdk/couchbase-core/src/memdx/auth_mechanism.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::memdx::error::MemdxError;
use crate::memdx::error::{Error, ErrorKind};

#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum AuthMechanism {
Expand All @@ -22,7 +22,7 @@ impl From<AuthMechanism> for Vec<u8> {
}

impl TryFrom<&str> for AuthMechanism {
type Error = MemdxError;
type Error = Error;

fn try_from(value: &str) -> Result<Self, Self::Error> {
let mech = match value {
Expand All @@ -31,10 +31,10 @@ impl TryFrom<&str> for AuthMechanism {
"SCRAM-SHA256" => AuthMechanism::ScramSha256,
"SCRAM-SHA512" => AuthMechanism::ScramSha512,
_ => {
return Err(MemdxError::Protocol(format!(
"Unknown auth mechanism {}",
value
)));
return Err(ErrorKind::Protocol {
msg: format!("Unknown auth mechanism {}", value),
}
.into());
}
};

Expand Down
29 changes: 16 additions & 13 deletions sdk/couchbase-core/src/memdx/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use crate::memdx::client_response::ClientResponse;
use crate::memdx::codec::KeyValueCodec;
use crate::memdx::connection::{Connection, ConnectionType};
use crate::memdx::dispatcher::{Dispatcher, DispatcherOptions};
use crate::memdx::error::{CancellationErrorKind, MemdxError};
use crate::memdx::error;
use crate::memdx::error::{CancellationErrorKind, Error, ErrorKind};
use crate::memdx::packet::{RequestPacket, ResponsePacket};
use crate::memdx::pendingop::ClientPendingOp;

pub type MemdxResult<T> = std::result::Result<T, MemdxError>;
type ResponseSender = Sender<MemdxResult<ClientResponse>>;
type ResponseSender = Sender<error::Result<ClientResponse>>;
type OpaqueMap = HashMap<u32, Arc<ResponseSender>>;
pub(crate) type CancellationSender = UnboundedSender<(u32, CancellationErrorKind)>;

Expand All @@ -41,7 +41,7 @@ struct ReadLoopOptions {
pub local_addr: Option<SocketAddr>,
pub peer_addr: Option<SocketAddr>,
pub orphan_handler: Arc<UnboundedSender<ResponsePacket>>,
pub on_connection_close_tx: Option<oneshot::Sender<MemdxResult<()>>>,
pub on_connection_close_tx: Option<oneshot::Sender<error::Result<()>>>,
pub on_client_close_rx: Receiver<()>,
}

Expand Down Expand Up @@ -90,7 +90,10 @@ impl Client {
for entry in opaque_map.iter() {
entry
.1
.send(Err(MemdxError::ClosedInFlight))
.send(Err(ErrorKind::Cancelled(
CancellationErrorKind::ClosedInFlight,
)
.into()))
.await
.unwrap_or_default();
}
Expand All @@ -100,7 +103,7 @@ impl Client {
stream: FramedRead<ReadHalf<TcpStream>, KeyValueCodec>,
op_cancel_rx: UnboundedReceiver<(u32, CancellationErrorKind)>,
opaque_map: MutexGuard<'_, OpaqueMap>,
on_connection_close_tx: Option<oneshot::Sender<MemdxResult<()>>>,
on_connection_close_tx: Option<oneshot::Sender<error::Result<()>>>,
) {
drop(stream);
drop(op_cancel_rx);
Expand Down Expand Up @@ -138,7 +141,7 @@ impl Client {
drop(map);

sender
.send(Err(MemdxError::Cancelled(cancel_info.1)))
.send(Err(ErrorKind::Cancelled(cancel_info.1).into()))
.await
.unwrap();
} else {
Expand Down Expand Up @@ -300,7 +303,7 @@ impl Dispatcher for Client {
}
}

async fn dispatch(&self, mut packet: RequestPacket) -> MemdxResult<ClientPendingOp> {
async fn dispatch(&self, mut packet: RequestPacket) -> error::Result<ClientPendingOp> {
let (response_tx, response_rx) = mpsc::channel(1);
let opaque = self.register_handler(Arc::new(response_tx)).await;
packet.opaque = Some(opaque);
Expand All @@ -323,14 +326,14 @@ impl Dispatcher for Client {
let mut map = requests.lock().await;
map.remove(&opaque);

Err(MemdxError::Dispatch(e.kind()))
Err(ErrorKind::Dispatch(Arc::new(e)).into())
}
}
}

async fn close(&self) -> MemdxResult<()> {
async fn close(&self) -> error::Result<()> {
if self.closed.swap(true, Ordering::SeqCst) {
return Err(MemdxError::Closed);
return Err(ErrorKind::Closed.into());
}

let mut close_err = None;
Expand All @@ -354,7 +357,7 @@ impl Dispatcher for Client {
Self::drain_opaque_map(map).await;

if let Some(e) = close_err {
return Err(MemdxError::from(e));
return Err(ErrorKind::Io(Arc::new(e)).into());
}

Ok(())
Expand Down Expand Up @@ -405,7 +408,7 @@ mod tests {
.expect("Could not connect");

let (orphan_tx, mut orphan_rx) = unbounded_channel::<ResponsePacket>();
let (close_tx, mut close_rx) = oneshot::channel::<crate::memdx::client::MemdxResult<()>>();
let (close_tx, mut close_rx) = oneshot::channel::<crate::memdx::error::Result<()>>();

tokio::spawn(async move {
loop {
Expand Down
11 changes: 7 additions & 4 deletions sdk/couchbase-core/src/memdx/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use std::io;
use tokio_util::bytes::{Buf, BufMut, BytesMut};
use tokio_util::codec::{Decoder, Encoder};

use crate::memdx::error::MemdxError;
use crate::memdx::error::MemdxError::Protocol;
use crate::memdx::error::{Error, ErrorKind};
use crate::memdx::magic::Magic;
use crate::memdx::opcode::OpCode;
use crate::memdx::packet::{RequestPacket, ResponsePacket};
Expand All @@ -17,7 +16,7 @@ pub struct KeyValueCodec(());

impl Decoder for KeyValueCodec {
type Item = ResponsePacket;
type Error = MemdxError;
type Error = Error;

fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let buf_len = buf.len();
Expand All @@ -28,7 +27,11 @@ impl Decoder for KeyValueCodec {

let total_body_len = match buf[8..12].try_into() {
Ok(v) => u32::from_be_bytes(v),
Err(e) => return Err(Protocol(e.to_string())),
Err(e) => {
return Err(Error {
kind: ErrorKind::Protocol { msg: e.to_string() }.into(),
})
}
} as usize;

if buf_len < (HEADER_SIZE + total_body_len) {
Expand Down
22 changes: 11 additions & 11 deletions sdk/couchbase-core/src/memdx/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use tokio_rustls::rustls::client::danger::{
use tokio_rustls::rustls::pki_types::{CertificateDer, IpAddr, ServerName, UnixTime};
use tokio_rustls::TlsConnector;

use crate::memdx::client::MemdxResult;
use crate::memdx::error::MemdxError;
use crate::memdx::error::ErrorKind;
use crate::memdx::error::Result;

#[derive(Debug, Default)]
pub struct TlsConfig {
Expand Down Expand Up @@ -42,7 +42,7 @@ pub struct Connection {
}

impl Connection {
pub async fn connect(addr: SocketAddr, opts: ConnectOptions) -> MemdxResult<Connection> {
pub async fn connect(addr: SocketAddr, opts: ConnectOptions) -> Result<Connection> {
let remote_addr = addr.to_string();

if let Some(tls_config) = opts.tls_config {
Expand All @@ -55,19 +55,19 @@ impl Connection {
} else if let Some(roots) = tls_config.root_certs {
builder.with_root_certificates(roots).with_no_client_auth()
} else {
return Err(MemdxError::Generic(
"If tls config is specified then roots or accept_all_certs must be specified"
return Err(ErrorKind::InvalidArgument {
msg: "If tls config is specified then roots or accept_all_certs must be specified"
.to_string(),
));
}.into());
};

let tcp_socket = timeout_at(opts.deadline, TcpStream::connect(remote_addr))
.await?
.map_err(|e| MemdxError::Connect(e.kind()))?;
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;

tcp_socket
.set_nodelay(false)
.map_err(|e| MemdxError::Connect(e.kind()))?;
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;

let local_addr = match tcp_socket.local_addr() {
Ok(addr) => Some(addr),
Expand All @@ -84,7 +84,7 @@ impl Connection {
connector.connect(ServerName::IpAddress(IpAddr::from(addr.ip())), tcp_socket),
)
.await?
.map_err(|e| MemdxError::Connect(e.kind()))?;
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;

Ok(Connection {
inner: ConnectionType::Tls(socket),
Expand All @@ -94,10 +94,10 @@ impl Connection {
} else {
let socket = timeout_at(opts.deadline, TcpStream::connect(remote_addr))
.await?
.map_err(|e| MemdxError::Connect(e.kind()))?;
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;
socket
.set_nodelay(false)
.map_err(|e| MemdxError::Connect(e.kind()))?;
.map_err(|e| ErrorKind::Connect(Arc::new(e)))?;

let local_addr = match socket.local_addr() {
Ok(addr) => Some(addr),
Expand Down
8 changes: 4 additions & 4 deletions sdk/couchbase-core/src/memdx/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ use async_trait::async_trait;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;

use crate::memdx::client::MemdxResult;
use crate::memdx::connection::Connection;
use crate::memdx::error::Result;
use crate::memdx::packet::{RequestPacket, ResponsePacket};
use crate::memdx::pendingop::ClientPendingOp;

#[derive(Debug)]
pub struct DispatcherOptions {
pub orphan_handler: Arc<UnboundedSender<ResponsePacket>>,
pub on_connection_close_handler: Option<oneshot::Sender<MemdxResult<()>>>,
pub on_connection_close_handler: Option<oneshot::Sender<Result<()>>>,
}

#[async_trait]
pub trait Dispatcher: Send + Sync {
fn new(conn: Connection, opts: DispatcherOptions) -> Self;
async fn dispatch(&self, packet: RequestPacket) -> MemdxResult<ClientPendingOp>;
async fn close(&self) -> MemdxResult<()>;
async fn dispatch(&self, packet: RequestPacket) -> Result<ClientPendingOp>;
async fn close(&self) -> Result<()>;
}
Loading
Loading