From a85174b2297e65991b8b62a078c640a03fae3248 Mon Sep 17 00:00:00 2001 From: Charles Dixon Date: Mon, 22 Jul 2024 16:24:04 +0100 Subject: [PATCH] Rework memdx error model Motivation ---------- The memdx error model does not currently satisfy what we need from it. We need to be able to attach cluster configs and error context to errors sometimes. Changes -------- Rework memdx error model to include more types of error and the top level error be a struct. --- sdk/couchbase-core/src/cbconfig/mod.rs | 16 +- sdk/couchbase-core/src/error.rs | 16 +- sdk/couchbase-core/src/kvclient.rs | 2 +- .../src/memdx/auth_mechanism.rs | 12 +- sdk/couchbase-core/src/memdx/client.rs | 29 +-- sdk/couchbase-core/src/memdx/codec.rs | 11 +- sdk/couchbase-core/src/memdx/connection.rs | 22 +-- sdk/couchbase-core/src/memdx/dispatcher.rs | 8 +- sdk/couchbase-core/src/memdx/error.rs | 168 ++++++++++++++---- sdk/couchbase-core/src/memdx/magic.rs | 9 +- .../src/memdx/op_auth_saslauto.rs | 32 ++-- .../src/memdx/op_auth_saslbyname.rs | 4 +- .../src/memdx/op_auth_saslplain.rs | 15 +- .../src/memdx/op_auth_saslscram.rs | 33 ++-- sdk/couchbase-core/src/memdx/op_bootstrap.rs | 12 +- sdk/couchbase-core/src/memdx/opcode.rs | 13 +- sdk/couchbase-core/src/memdx/ops_core.rs | 26 +-- sdk/couchbase-core/src/memdx/ops_crud.rs | 84 ++++----- sdk/couchbase-core/src/memdx/pendingop.rs | 27 ++- sdk/couchbase-core/src/memdx/response.rs | 85 +++++---- sdk/couchbase-core/src/memdx/sync_helpers.rs | 6 +- 21 files changed, 383 insertions(+), 247 deletions(-) diff --git a/sdk/couchbase-core/src/cbconfig/mod.rs b/sdk/couchbase-core/src/cbconfig/mod.rs index f7ea03ac..3034e72c 100644 --- a/sdk/couchbase-core/src/cbconfig/mod.rs +++ b/sdk/couchbase-core/src/cbconfig/mod.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use serde::Deserialize; -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct VBucketServerMap { #[serde(alias = "hashAlgorithm")] pub hash_algorithm: String, @@ -14,13 +14,13 @@ pub struct VBucketServerMap { pub vbucket_map: Vec>, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct ConfigDDocs { #[serde(alias = "uri")] pub uri: String, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, Clone)] pub struct TerseExtNodePorts { #[serde(alias = "kv")] pub kv: Option, @@ -61,7 +61,7 @@ pub struct TerseExtNodePorts { pub backup_ssl: Option, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct TerseExtNodeAltAddresses { #[serde(alias = "ports")] pub ports: TerseExtNodePorts, @@ -69,7 +69,7 @@ pub struct TerseExtNodeAltAddresses { pub hostname: String, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct TerseNodePorts { #[serde(alias = "direct")] pub direct: Option, @@ -77,7 +77,7 @@ pub struct TerseNodePorts { pub proxy: Option, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct TerseNodeConfig { #[serde(alias = "couchbaseApiBase")] pub couchbase_api_base: Option, @@ -87,7 +87,7 @@ pub struct TerseNodeConfig { pub ports: Option, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct TerseNodeExtConfig { #[serde(alias = "services")] pub services: Option, @@ -99,7 +99,7 @@ pub struct TerseNodeExtConfig { pub alternate_addresses: HashMap, } -#[derive(Deserialize, Debug)] +#[derive(Deserialize, Debug, Clone)] pub struct TerseConfig { #[serde(alias = "rev")] pub rev: i64, diff --git a/sdk/couchbase-core/src/error.rs b/sdk/couchbase-core/src/error.rs index 6272d1b9..ba932bae 100644 --- a/sdk/couchbase-core/src/error.rs +++ b/sdk/couchbase-core/src/error.rs @@ -1,22 +1,22 @@ use std::fmt::Display; use crate::error::CoreError::{Dispatch, Placeholder, PlaceholderMemdxWrapper}; -use crate::memdx::error::MemdxError; +use crate::memdx::error::{Error, ErrorKind}; -#[derive(thiserror::Error, Debug, Eq, PartialEq)] +#[derive(thiserror::Error, Debug)] pub enum CoreError { #[error("Dispatch error {0}")] - Dispatch(MemdxError), + Dispatch(Error), #[error("Placeholder error {0}")] Placeholder(String), #[error("Placeholder memdx wrapper error {0}")] - PlaceholderMemdxWrapper(MemdxError), + PlaceholderMemdxWrapper(Error), } -impl From for CoreError { - fn from(value: MemdxError) -> Self { - match value { - MemdxError::Dispatch(_) => Dispatch(value), +impl From for CoreError { + fn from(value: Error) -> Self { + match value.kind.as_ref() { + ErrorKind::Dispatch(_) => Dispatch(value), _ => PlaceholderMemdxWrapper(value), } } diff --git a/sdk/couchbase-core/src/kvclient.rs b/sdk/couchbase-core/src/kvclient.rs index 870f672e..4662d497 100644 --- a/sdk/couchbase-core/src/kvclient.rs +++ b/sdk/couchbase-core/src/kvclient.rs @@ -176,7 +176,7 @@ where } let (connection_close_tx, mut connection_close_rx) = - oneshot::channel::>(); + oneshot::channel::>(); let memdx_client_opts = DispatcherOptions { on_connection_close_handler: Some(connection_close_tx), orphan_handler: opts.orphan_handler, diff --git a/sdk/couchbase-core/src/memdx/auth_mechanism.rs b/sdk/couchbase-core/src/memdx/auth_mechanism.rs index 9a9da2b5..68174c22 100644 --- a/sdk/couchbase-core/src/memdx/auth_mechanism.rs +++ b/sdk/couchbase-core/src/memdx/auth_mechanism.rs @@ -1,4 +1,4 @@ -use crate::memdx::error::MemdxError; +use crate::memdx::error::{Error, ErrorKind}; #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] pub enum AuthMechanism { @@ -22,7 +22,7 @@ impl From for Vec { } impl TryFrom<&str> for AuthMechanism { - type Error = MemdxError; + type Error = Error; fn try_from(value: &str) -> Result { let mech = match value { @@ -31,10 +31,10 @@ impl TryFrom<&str> for AuthMechanism { "SCRAM-SHA256" => AuthMechanism::ScramSha256, "SCRAM-SHA512" => AuthMechanism::ScramSha512, _ => { - return Err(MemdxError::Protocol(format!( - "Unknown auth mechanism {}", - value - ))); + return Err(ErrorKind::Protocol { + msg: format!("Unknown auth mechanism {}", value), + } + .into()); } }; diff --git a/sdk/couchbase-core/src/memdx/client.rs b/sdk/couchbase-core/src/memdx/client.rs index 1472e34b..880c0544 100644 --- a/sdk/couchbase-core/src/memdx/client.rs +++ b/sdk/couchbase-core/src/memdx/client.rs @@ -26,12 +26,12 @@ use crate::memdx::client_response::ClientResponse; use crate::memdx::codec::KeyValueCodec; use crate::memdx::connection::{Connection, ConnectionType}; use crate::memdx::dispatcher::{Dispatcher, DispatcherOptions}; -use crate::memdx::error::{CancellationErrorKind, MemdxError}; +use crate::memdx::error; +use crate::memdx::error::{CancellationErrorKind, Error, ErrorKind}; use crate::memdx::packet::{RequestPacket, ResponsePacket}; use crate::memdx::pendingop::ClientPendingOp; -pub type MemdxResult = std::result::Result; -type ResponseSender = Sender>; +type ResponseSender = Sender>; type OpaqueMap = HashMap>; pub(crate) type CancellationSender = UnboundedSender<(u32, CancellationErrorKind)>; @@ -41,7 +41,7 @@ struct ReadLoopOptions { pub local_addr: Option, pub peer_addr: Option, pub orphan_handler: Arc>, - pub on_connection_close_tx: Option>>, + pub on_connection_close_tx: Option>>, pub on_client_close_rx: Receiver<()>, } @@ -90,7 +90,10 @@ impl Client { for entry in opaque_map.iter() { entry .1 - .send(Err(MemdxError::ClosedInFlight)) + .send(Err(ErrorKind::Cancelled( + CancellationErrorKind::ClosedInFlight, + ) + .into())) .await .unwrap_or_default(); } @@ -100,7 +103,7 @@ impl Client { stream: FramedRead, KeyValueCodec>, op_cancel_rx: UnboundedReceiver<(u32, CancellationErrorKind)>, opaque_map: MutexGuard<'_, OpaqueMap>, - on_connection_close_tx: Option>>, + on_connection_close_tx: Option>>, ) { drop(stream); drop(op_cancel_rx); @@ -138,7 +141,7 @@ impl Client { drop(map); sender - .send(Err(MemdxError::Cancelled(cancel_info.1))) + .send(Err(ErrorKind::Cancelled(cancel_info.1).into())) .await .unwrap(); } else { @@ -300,7 +303,7 @@ impl Dispatcher for Client { } } - async fn dispatch(&self, mut packet: RequestPacket) -> MemdxResult { + async fn dispatch(&self, mut packet: RequestPacket) -> error::Result { let (response_tx, response_rx) = mpsc::channel(1); let opaque = self.register_handler(Arc::new(response_tx)).await; packet.opaque = Some(opaque); @@ -323,14 +326,14 @@ impl Dispatcher for Client { let mut map = requests.lock().await; map.remove(&opaque); - Err(MemdxError::Dispatch(e.kind())) + Err(ErrorKind::Dispatch(Arc::new(e)).into()) } } } - async fn close(&self) -> MemdxResult<()> { + async fn close(&self) -> error::Result<()> { if self.closed.swap(true, Ordering::SeqCst) { - return Err(MemdxError::Closed); + return Err(ErrorKind::Closed.into()); } let mut close_err = None; @@ -354,7 +357,7 @@ impl Dispatcher for Client { Self::drain_opaque_map(map).await; if let Some(e) = close_err { - return Err(MemdxError::from(e)); + return Err(ErrorKind::Io(Arc::new(e)).into()); } Ok(()) @@ -405,7 +408,7 @@ mod tests { .expect("Could not connect"); let (orphan_tx, mut orphan_rx) = unbounded_channel::(); - let (close_tx, mut close_rx) = oneshot::channel::>(); + let (close_tx, mut close_rx) = oneshot::channel::>(); tokio::spawn(async move { loop { diff --git a/sdk/couchbase-core/src/memdx/codec.rs b/sdk/couchbase-core/src/memdx/codec.rs index 23b36fa2..a110cb3f 100644 --- a/sdk/couchbase-core/src/memdx/codec.rs +++ b/sdk/couchbase-core/src/memdx/codec.rs @@ -3,8 +3,7 @@ use std::io; use tokio_util::bytes::{Buf, BufMut, BytesMut}; use tokio_util::codec::{Decoder, Encoder}; -use crate::memdx::error::MemdxError; -use crate::memdx::error::MemdxError::Protocol; +use crate::memdx::error::{Error, ErrorKind}; use crate::memdx::magic::Magic; use crate::memdx::opcode::OpCode; use crate::memdx::packet::{RequestPacket, ResponsePacket}; @@ -17,7 +16,7 @@ pub struct KeyValueCodec(()); impl Decoder for KeyValueCodec { type Item = ResponsePacket; - type Error = MemdxError; + type Error = Error; fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { let buf_len = buf.len(); @@ -28,7 +27,11 @@ impl Decoder for KeyValueCodec { let total_body_len = match buf[8..12].try_into() { Ok(v) => u32::from_be_bytes(v), - Err(e) => return Err(Protocol(e.to_string())), + Err(e) => { + return Err(Error { + kind: ErrorKind::Protocol { msg: e.to_string() }.into(), + }) + } } as usize; if buf_len < (HEADER_SIZE + total_body_len) { diff --git a/sdk/couchbase-core/src/memdx/connection.rs b/sdk/couchbase-core/src/memdx/connection.rs index 826998fa..16ffff8e 100644 --- a/sdk/couchbase-core/src/memdx/connection.rs +++ b/sdk/couchbase-core/src/memdx/connection.rs @@ -12,8 +12,8 @@ use tokio_rustls::rustls::client::danger::{ use tokio_rustls::rustls::pki_types::{CertificateDer, IpAddr, ServerName, UnixTime}; use tokio_rustls::TlsConnector; -use crate::memdx::client::MemdxResult; -use crate::memdx::error::MemdxError; +use crate::memdx::error::ErrorKind; +use crate::memdx::error::Result; #[derive(Debug, Default)] pub struct TlsConfig { @@ -42,7 +42,7 @@ pub struct Connection { } impl Connection { - pub async fn connect(addr: SocketAddr, opts: ConnectOptions) -> MemdxResult { + pub async fn connect(addr: SocketAddr, opts: ConnectOptions) -> Result { let remote_addr = addr.to_string(); if let Some(tls_config) = opts.tls_config { @@ -55,19 +55,19 @@ impl Connection { } else if let Some(roots) = tls_config.root_certs { builder.with_root_certificates(roots).with_no_client_auth() } else { - return Err(MemdxError::Generic( - "If tls config is specified then roots or accept_all_certs must be specified" + return Err(ErrorKind::InvalidArgument { + msg: "If tls config is specified then roots or accept_all_certs must be specified" .to_string(), - )); + }.into()); }; let tcp_socket = timeout_at(opts.deadline, TcpStream::connect(remote_addr)) .await? - .map_err(|e| MemdxError::Connect(e.kind()))?; + .map_err(|e| ErrorKind::Connect(Arc::new(e)))?; tcp_socket .set_nodelay(false) - .map_err(|e| MemdxError::Connect(e.kind()))?; + .map_err(|e| ErrorKind::Connect(Arc::new(e)))?; let local_addr = match tcp_socket.local_addr() { Ok(addr) => Some(addr), @@ -84,7 +84,7 @@ impl Connection { connector.connect(ServerName::IpAddress(IpAddr::from(addr.ip())), tcp_socket), ) .await? - .map_err(|e| MemdxError::Connect(e.kind()))?; + .map_err(|e| ErrorKind::Connect(Arc::new(e)))?; Ok(Connection { inner: ConnectionType::Tls(socket), @@ -94,10 +94,10 @@ impl Connection { } else { let socket = timeout_at(opts.deadline, TcpStream::connect(remote_addr)) .await? - .map_err(|e| MemdxError::Connect(e.kind()))?; + .map_err(|e| ErrorKind::Connect(Arc::new(e)))?; socket .set_nodelay(false) - .map_err(|e| MemdxError::Connect(e.kind()))?; + .map_err(|e| ErrorKind::Connect(Arc::new(e)))?; let local_addr = match socket.local_addr() { Ok(addr) => Some(addr), diff --git a/sdk/couchbase-core/src/memdx/dispatcher.rs b/sdk/couchbase-core/src/memdx/dispatcher.rs index a108f673..98d0e9e7 100644 --- a/sdk/couchbase-core/src/memdx/dispatcher.rs +++ b/sdk/couchbase-core/src/memdx/dispatcher.rs @@ -4,20 +4,20 @@ use async_trait::async_trait; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::oneshot; -use crate::memdx::client::MemdxResult; use crate::memdx::connection::Connection; +use crate::memdx::error::Result; use crate::memdx::packet::{RequestPacket, ResponsePacket}; use crate::memdx::pendingop::ClientPendingOp; #[derive(Debug)] pub struct DispatcherOptions { pub orphan_handler: Arc>, - pub on_connection_close_handler: Option>>, + pub on_connection_close_handler: Option>>, } #[async_trait] pub trait Dispatcher: Send + Sync { fn new(conn: Connection, opts: DispatcherOptions) -> Self; - async fn dispatch(&self, packet: RequestPacket) -> MemdxResult; - async fn close(&self) -> MemdxResult<()>; + async fn dispatch(&self, packet: RequestPacket) -> Result; + async fn close(&self) -> Result<()>; } diff --git a/sdk/couchbase-core/src/memdx/error.rs b/sdk/couchbase-core/src/memdx/error.rs index 56589ba0..df925fed 100644 --- a/sdk/couchbase-core/src/memdx/error.rs +++ b/sdk/couchbase-core/src/memdx/error.rs @@ -1,22 +1,85 @@ -use std::fmt::{Display, Formatter}; +use std::fmt::{Display, Formatter, Pointer}; use std::io; +use std::sync::Arc; +use thiserror::Error; use tokio::time::error::Elapsed; +use crate::memdx::error; +use crate::memdx::status::Status; use crate::scram::ScramError; -#[derive(thiserror::Error, Debug, Eq, PartialEq)] -pub enum MemdxError { - #[error("Connect failed {0}")] - Connect(io::ErrorKind), - #[error("Dispatch failed {0}")] - Dispatch(io::ErrorKind), +pub type Result = std::result::Result; + +#[derive(Clone, Debug, Error)] +#[error("{kind}")] +#[non_exhaustive] +pub struct Error { + /// Taken from serde_json: This `Box` allows us to keep the size of `Error` as small as possible. + /// A larger `Error` type was substantially slower due to all the functions + /// that pass around `Result`. + pub kind: Box, +} + +#[derive(Clone, Debug, Error)] +#[non_exhaustive] +pub enum ErrorKind { + #[error("{0:?}")] + Server(ServerError), + #[error("Dispatch failed {0:?}")] + #[non_exhaustive] + Dispatch(Arc), + #[error("Protocol error {msg}")] + #[non_exhaustive] + Protocol { msg: String }, + #[error("Connect error {0}")] + Connect(Arc), #[error("Request cancelled {0}")] Cancelled(CancellationErrorKind), + #[error("Connection closed")] + Closed, + #[error("Unknown bucket name")] + UnknownBucketName, + #[error("Unknown IO error {0:?}")] + #[non_exhaustive] + Io(Arc), + #[error("Invalid argument {msg}")] + #[non_exhaustive] + InvalidArgument { msg: String }, +} + +#[derive(Clone, Debug, Error, PartialEq, Eq)] +#[error("Server error: {kind}")] +#[non_exhaustive] +pub struct ServerError { + pub kind: ServerErrorKind, + pub config: Option>, + pub context: Option, +} + +impl ServerError { + pub(crate) fn new(kind: ServerErrorKind) -> Self { + Self { + kind, + config: None, + context: None, + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +#[non_exhaustive] +pub struct ServerErrorContext { + pub text: String, + pub error_ref: String, + pub manifest_rev: u64, +} + +#[derive(Error, Clone, Debug, Eq, Hash, PartialEq)] +#[non_exhaustive] +pub enum ServerErrorKind { #[error("Not my vbucket")] NotMyVbucket, - #[error("Protocol error {0}")] - Protocol(String), #[error("Key exists")] KeyExists, #[error("Key not found")] @@ -27,60 +90,105 @@ pub enum MemdxError { Locked, #[error("Too big")] TooBig, - #[error("Collections not enabled")] - CollectionsNotEnabled, #[error("Unknown collection id")] UnknownCollectionID, #[error("Unknown bucket name")] UnknownBucketName, #[error("Access error")] Access, - #[error("Auth error")] - Auth(String), - #[error("Connection closed")] - Closed, + #[error("Auth error {msg}")] + Auth { msg: String }, #[error("Config not set")] ConfigNotSet, - #[error("Closed in flight")] - ClosedInFlight, - #[error("{0}")] - Generic(String), - #[error("Unknown error {0}")] - Unknown(String), + #[error("Server status unexpected for operation: {status}")] + UnknownStatus { status: Status }, } #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[non_exhaustive] pub enum CancellationErrorKind { Timeout, RequestCancelled, + ClosedInFlight, +} + +impl Error { + pub(crate) fn new_protocol_error(msg: &str) -> Self { + Self { + kind: Box::new(ErrorKind::Protocol { msg: msg.into() }), + } + } + + pub fn has_server_config(&self) -> Option<&Vec> { + if let ErrorKind::Server(ServerError { config, .. }) = self.kind.as_ref() { + config.as_ref() + } else { + None + } + } + + pub fn has_server_error_context(&self) -> Option<&ServerErrorContext> { + if let ErrorKind::Server(ServerError { context, .. }) = self.kind.as_ref() { + context.as_ref() + } else { + None + } + } } impl Display for CancellationErrorKind { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let txt = match self { - CancellationErrorKind::Timeout => "timeout", - CancellationErrorKind::RequestCancelled => "request cancelled", + CancellationErrorKind::Timeout => "Timeout", + CancellationErrorKind::RequestCancelled => "Request cancelled", + CancellationErrorKind::ClosedInFlight => "Closed in flight", }; write!(f, "{}", txt) } } -// TODO: improve this. -impl From for MemdxError { +impl From for Error +where + ErrorKind: From, +{ + fn from(err: E) -> Self { + Self { + kind: Box::new(err.into()), + } + } +} + +impl From for Error { + fn from(kind: ServerErrorKind) -> Self { + Self { + kind: Box::new(ErrorKind::Server(ServerError::new(kind))), + } + } +} + +impl From for Error { fn from(value: io::Error) -> Self { - MemdxError::Unknown(value.to_string()) + Self { + kind: Box::new(ErrorKind::Io(Arc::new(value))), + } } } -impl From for MemdxError { +impl From for Error { fn from(value: ScramError) -> Self { - Self::Auth(value.to_string()) + Self { + kind: Box::new(ErrorKind::Server(ServerError::new(ServerErrorKind::Auth { + msg: value.to_string(), + }))), + } } } -impl From for MemdxError { +impl From for Error { fn from(_value: Elapsed) -> Self { - Self::Cancelled(CancellationErrorKind::Timeout) + Self { + kind: Box::new(ErrorKind::Cancelled(CancellationErrorKind::Timeout)), + } } } diff --git a/sdk/couchbase-core/src/memdx/magic.rs b/sdk/couchbase-core/src/memdx/magic.rs index e9858e4d..8cbc2bad 100644 --- a/sdk/couchbase-core/src/memdx/magic.rs +++ b/sdk/couchbase-core/src/memdx/magic.rs @@ -1,6 +1,6 @@ use std::fmt::{Debug, Display}; -use crate::memdx::error::MemdxError; +use crate::memdx::error::{Error, ErrorKind}; #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum Magic { @@ -36,7 +36,7 @@ impl From for u8 { } impl TryFrom for Magic { - type Error = MemdxError; + type Error = Error; fn try_from(value: u8) -> Result { let magic = match value { @@ -45,7 +45,10 @@ impl TryFrom for Magic { 0x08 => Magic::ReqExt, 0x18 => Magic::ResExt, _ => { - return Err(MemdxError::Protocol(format!("unknown magic {}", value))); + return Err(ErrorKind::Protocol { + msg: format!("unknown magic {}", value), + } + .into()); } }; diff --git a/sdk/couchbase-core/src/memdx/op_auth_saslauto.rs b/sdk/couchbase-core/src/memdx/op_auth_saslauto.rs index d95c47f3..0a5845f1 100644 --- a/sdk/couchbase-core/src/memdx/op_auth_saslauto.rs +++ b/sdk/couchbase-core/src/memdx/op_auth_saslauto.rs @@ -3,11 +3,10 @@ use std::cmp::PartialEq; use tokio::time::Instant; use crate::memdx::auth_mechanism::AuthMechanism; -use crate::memdx::client::MemdxResult; use crate::memdx::dispatcher::Dispatcher; +use crate::memdx::error::{ErrorKind, ServerErrorKind}; use crate::memdx::error::CancellationErrorKind::{RequestCancelled, Timeout}; -use crate::memdx::error::MemdxError; -use crate::memdx::error::MemdxError::Generic; +use crate::memdx::error::Result; use crate::memdx::op_auth_saslbyname::{ OpSASLAuthByNameEncoder, OpsSASLAuthByName, SASLAuthByNameOptions, }; @@ -31,7 +30,7 @@ pub trait OpSASLAutoEncoder: OpSASLAuthByNameEncoder { &self, dispatcher: &D, request: SASLListMechsRequest, - ) -> impl std::future::Future>> + ) -> impl std::future::Future>> where D: Dispatcher; } @@ -46,15 +45,16 @@ impl OpsSASLAuthAuto { dispatcher: &D, deadline: Instant, opts: SASLAuthAutoOptions, - ) -> MemdxResult<()> + ) -> Result<()> where E: OpSASLAutoEncoder, D: Dispatcher, { if opts.enabled_mechs.is_empty() { - return Err(Generic( - "Must specify at least one allowed authentication mechanism".to_string(), - )); + return Err(ErrorKind::InvalidArgument { + msg: "Must specify at least one allowed authentication mechanism".to_string(), + } + .into()); } let mut op = encoder @@ -81,10 +81,11 @@ impl OpsSASLAuthAuto { { Ok(()) => Ok(()), Err(e) => { - if e == MemdxError::Cancelled(Timeout) - || e == MemdxError::Cancelled(RequestCancelled) - { - return Err(e); + match e.kind.as_ref() { + ErrorKind::Cancelled(Timeout) | ErrorKind::Cancelled(RequestCancelled) => { + return Err(e); + } + _ => {} } // There is no obvious way to differentiate between a mechanism being unsupported @@ -105,10 +106,13 @@ impl OpsSASLAuthAuto { let selected_mech = match selected_mech { Some(mech) => mech, None => { - return Err(Generic(format!( + return Err(ServerErrorKind::Auth { + msg: format!( "No supported auth mechanism was found (enabled: {:?}, server: {:?})", opts.enabled_mechs, server_mechs - ))); + ), + } + .into()); } }; diff --git a/sdk/couchbase-core/src/memdx/op_auth_saslbyname.rs b/sdk/couchbase-core/src/memdx/op_auth_saslbyname.rs index 445c44e1..deaa2508 100644 --- a/sdk/couchbase-core/src/memdx/op_auth_saslbyname.rs +++ b/sdk/couchbase-core/src/memdx/op_auth_saslbyname.rs @@ -1,8 +1,8 @@ use tokio::time::Instant; use crate::memdx::auth_mechanism::AuthMechanism; -use crate::memdx::client::MemdxResult; use crate::memdx::dispatcher::Dispatcher; +use crate::memdx::error::Result; use crate::memdx::op_auth_saslplain::{OpSASLPlainEncoder, OpsSASLAuthPlain, SASLAuthPlainOptions}; use crate::memdx::op_auth_saslscram::{OpSASLScramEncoder, OpsSASLAuthScram, SASLAuthScramOptions}; @@ -27,7 +27,7 @@ impl OpsSASLAuthByName { encoder: &E, dispatcher: &D, opts: SASLAuthByNameOptions, - ) -> MemdxResult<()> + ) -> Result<()> where E: OpSASLAuthByNameEncoder, D: Dispatcher, diff --git a/sdk/couchbase-core/src/memdx/op_auth_saslplain.rs b/sdk/couchbase-core/src/memdx/op_auth_saslplain.rs index d9c7af24..16bc73a5 100644 --- a/sdk/couchbase-core/src/memdx/op_auth_saslplain.rs +++ b/sdk/couchbase-core/src/memdx/op_auth_saslplain.rs @@ -1,9 +1,9 @@ use tokio::time::Instant; use crate::memdx::auth_mechanism::AuthMechanism; -use crate::memdx::client::MemdxResult; use crate::memdx::dispatcher::Dispatcher; -use crate::memdx::error::MemdxError; +use crate::memdx::error::ErrorKind; +use crate::memdx::error::Result; use crate::memdx::pendingop::{run_op_future_with_deadline, StandardPendingOp}; use crate::memdx::request::SASLAuthRequest; use crate::memdx::response::SASLAuthResponse; @@ -13,7 +13,7 @@ pub trait OpSASLPlainEncoder { &self, dispatcher: &D, req: SASLAuthRequest, - ) -> impl std::future::Future>> + ) -> impl std::future::Future>> where D: Dispatcher; } @@ -44,7 +44,7 @@ impl OpsSASLAuthPlain { encoder: &E, dispatcher: &D, opts: SASLAuthPlainOptions, - ) -> MemdxResult<()> + ) -> Result<()> where E: OpSASLPlainEncoder, D: Dispatcher, @@ -64,9 +64,10 @@ impl OpsSASLAuthPlain { run_op_future_with_deadline(opts.deadline, encoder.sasl_auth(dispatcher, req)).await?; if resp.needs_more_steps { - return Err(MemdxError::Protocol( - "Server did not accept auth when the client expected".to_string(), - )); + return Err(ErrorKind::Protocol { + msg: "Server did not accept auth when the client expected".to_string(), + } + .into()); } Ok(()) diff --git a/sdk/couchbase-core/src/memdx/op_auth_saslscram.rs b/sdk/couchbase-core/src/memdx/op_auth_saslscram.rs index 70fe828f..31282d79 100644 --- a/sdk/couchbase-core/src/memdx/op_auth_saslscram.rs +++ b/sdk/couchbase-core/src/memdx/op_auth_saslscram.rs @@ -4,9 +4,9 @@ use sha2::{Sha256, Sha512}; use tokio::time::Instant; use crate::memdx::auth_mechanism::AuthMechanism; -use crate::memdx::client::MemdxResult; use crate::memdx::dispatcher::Dispatcher; -use crate::memdx::error::MemdxError; +use crate::memdx::error::ErrorKind; +use crate::memdx::error::Result; use crate::memdx::op_auth_saslplain::OpSASLPlainEncoder; use crate::memdx::pendingop::{run_op_future_with_deadline, StandardPendingOp}; use crate::memdx::request::{SASLAuthRequest, SASLStepRequest}; @@ -18,7 +18,7 @@ pub trait OpSASLScramEncoder: OpSASLPlainEncoder { &self, dispatcher: &D, request: SASLStepRequest, - ) -> impl std::future::Future>> + ) -> impl std::future::Future>> where D: Dispatcher; } @@ -52,7 +52,7 @@ impl OpsSASLAuthScram { encoder: &E, dispatcher: &D, opts: SASLAuthScramOptions, - ) -> MemdxResult<()> + ) -> Result<()> where E: OpSASLScramEncoder, D: Dispatcher, @@ -86,9 +86,10 @@ impl OpsSASLAuthScram { run_op_future_with_deadline(opts.deadline, encoder.sasl_step(dispatcher, req)).await?; if resp.needs_more_steps { - return Err(MemdxError::Protocol( - "Server did not accept auth when the client expected".to_string(), - )); + return Err(ErrorKind::Protocol { + msg: "Server did not accept auth when the client expected".to_string(), + } + .into()); } Ok(()) @@ -99,7 +100,7 @@ impl OpsSASLAuthScram { encoder: &E, dispatcher: &D, opts: SASLAuthScramOptions, - ) -> MemdxResult<()> + ) -> Result<()> where E: OpSASLScramEncoder, D: Dispatcher, @@ -133,9 +134,10 @@ impl OpsSASLAuthScram { run_op_future_with_deadline(opts.deadline, encoder.sasl_step(dispatcher, req)).await?; if resp.needs_more_steps { - return Err(MemdxError::Protocol( - "Server did not accept auth when the client expected".to_string(), - )); + return Err(ErrorKind::Protocol { + msg: "Server did not accept auth when the client expected".to_string(), + } + .into()); } Ok(()) @@ -146,7 +148,7 @@ impl OpsSASLAuthScram { encoder: &E, dispatcher: &D, opts: SASLAuthScramOptions, - ) -> MemdxResult<()> + ) -> Result<()> where E: OpSASLScramEncoder, D: Dispatcher, @@ -179,9 +181,10 @@ impl OpsSASLAuthScram { run_op_future_with_deadline(opts.deadline, encoder.sasl_step(dispatcher, req)).await?; if resp.needs_more_steps { - return Err(MemdxError::Protocol( - "Server did not accept auth when the client expected".to_string(), - )); + return Err(ErrorKind::Protocol { + msg: "Server did not accept auth when the client expected".to_string(), + } + .into()); } Ok(()) diff --git a/sdk/couchbase-core/src/memdx/op_bootstrap.rs b/sdk/couchbase-core/src/memdx/op_bootstrap.rs index 962a645b..44effdb9 100644 --- a/sdk/couchbase-core/src/memdx/op_bootstrap.rs +++ b/sdk/couchbase-core/src/memdx/op_bootstrap.rs @@ -2,9 +2,9 @@ use log::warn; use tokio::select; use tokio::time::{Instant, sleep}; -use crate::memdx::client::MemdxResult; use crate::memdx::dispatcher::Dispatcher; use crate::memdx::error::CancellationErrorKind; +use crate::memdx::error::Result; use crate::memdx::op_auth_saslauto::{OpSASLAutoEncoder, OpsSASLAuthAuto, SASLAuthAutoOptions}; use crate::memdx::op_auth_saslplain::OpSASLPlainEncoder; use crate::memdx::pendingop::{PendingOp, run_op_future_with_deadline, StandardPendingOp}; @@ -22,7 +22,7 @@ pub trait OpBootstrapEncoder { &self, dispatcher: &D, request: HelloRequest, - ) -> impl std::future::Future>> + ) -> impl std::future::Future>> where D: Dispatcher; @@ -30,7 +30,7 @@ pub trait OpBootstrapEncoder { &self, dispatcher: &D, request: GetErrorMapRequest, - ) -> impl std::future::Future>> + ) -> impl std::future::Future>> where D: Dispatcher; @@ -38,7 +38,7 @@ pub trait OpBootstrapEncoder { &self, dispatcher: &D, request: SelectBucketRequest, - ) -> impl std::future::Future>> + ) -> impl std::future::Future>> where D: Dispatcher; @@ -46,7 +46,7 @@ pub trait OpBootstrapEncoder { &self, dispatcher: &D, request: GetClusterConfigRequest, - ) -> impl std::future::Future>> + ) -> impl std::future::Future>> where D: Dispatcher; } @@ -70,7 +70,7 @@ impl OpBootstrap { encoder: E, dispatcher: &D, opts: BootstrapOptions, - ) -> MemdxResult + ) -> Result where E: OpBootstrapEncoder + OpSASLAutoEncoder, D: Dispatcher, diff --git a/sdk/couchbase-core/src/memdx/opcode.rs b/sdk/couchbase-core/src/memdx/opcode.rs index f74771e7..57d3fc80 100644 --- a/sdk/couchbase-core/src/memdx/opcode.rs +++ b/sdk/couchbase-core/src/memdx/opcode.rs @@ -1,6 +1,6 @@ use std::fmt::{Display, Formatter}; -use crate::memdx::error::MemdxError; +use crate::memdx::error::Error; #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum OpCode { @@ -14,6 +14,7 @@ pub enum OpCode { SASLAuth, SASLListMechs, SASLStep, + Unknown(u8), } impl From for u8 { @@ -29,12 +30,13 @@ impl From for u8 { OpCode::SelectBucket => 0x89, OpCode::GetClusterConfig => 0xb5, OpCode::GetErrorMap => 0xfe, + OpCode::Unknown(code) => code, } } } impl TryFrom for OpCode { - type Error = MemdxError; + type Error = Error; fn try_from(value: u8) -> Result { let code = match value { @@ -48,9 +50,7 @@ impl TryFrom for OpCode { 0x89 => OpCode::SelectBucket, 0xb5 => OpCode::GetClusterConfig, 0xfe => OpCode::GetErrorMap, - _ => { - return Err(MemdxError::Protocol(format!("unknown opcode {}", value))); - } + _ => OpCode::Unknown(value), }; Ok(code) @@ -70,6 +70,9 @@ impl Display for OpCode { OpCode::SASLAuth => "SASL auth", OpCode::SASLListMechs => "SASL list mechanisms", OpCode::SASLStep => "SASL step", + OpCode::Unknown(code) => { + return write!(f, "x{:02x}", code); + } }; write!(f, "{}", txt) } diff --git a/sdk/couchbase-core/src/memdx/ops_core.rs b/sdk/couchbase-core/src/memdx/ops_core.rs index df076f12..b5b73d29 100644 --- a/sdk/couchbase-core/src/memdx/ops_core.rs +++ b/sdk/couchbase-core/src/memdx/ops_core.rs @@ -2,9 +2,9 @@ use std::io::Write; use byteorder::{BigEndian, WriteBytesExt}; -use crate::memdx::client::MemdxResult; use crate::memdx::dispatcher::Dispatcher; -use crate::memdx::error::MemdxError; +use crate::memdx::error::{Error, ServerErrorKind}; +use crate::memdx::error::Result; use crate::memdx::magic::Magic; use crate::memdx::op_auth_saslauto::OpSASLAutoEncoder; use crate::memdx::op_auth_saslbyname::OpSASLAuthByNameEncoder; @@ -27,14 +27,14 @@ use crate::memdx::status::Status; pub struct OpsCore {} impl OpsCore { - pub(crate) fn decode_error(resp: &ResponsePacket) -> MemdxError { + pub(crate) fn decode_error(resp: &ResponsePacket) -> Error { let status = resp.status; if status == Status::NotMyVbucket { - MemdxError::NotMyVbucket + ServerErrorKind::NotMyVbucket.into() } else if status == Status::TmpFail { - MemdxError::TmpFail + ServerErrorKind::TmpFail.into() } else { - MemdxError::Unknown(format!("{}", status)) + ServerErrorKind::UnknownStatus { status }.into() } // TODO: decode error context @@ -46,7 +46,7 @@ impl OpBootstrapEncoder for OpsCore { &self, dispatcher: &D, request: HelloRequest, - ) -> MemdxResult> + ) -> Result> where D: Dispatcher, { @@ -77,7 +77,7 @@ impl OpBootstrapEncoder for OpsCore { &self, dispatcher: &D, request: GetErrorMapRequest, - ) -> MemdxResult> + ) -> Result> where D: Dispatcher, { @@ -106,7 +106,7 @@ impl OpBootstrapEncoder for OpsCore { &self, dispatcher: &D, request: SelectBucketRequest, - ) -> MemdxResult> + ) -> Result> where D: Dispatcher, { @@ -135,7 +135,7 @@ impl OpBootstrapEncoder for OpsCore { &self, dispatcher: &D, _request: GetClusterConfigRequest, - ) -> MemdxResult> + ) -> Result> where D: Dispatcher, { @@ -163,7 +163,7 @@ impl OpSASLPlainEncoder for OpsCore { &self, dispatcher: &D, request: SASLAuthRequest, - ) -> MemdxResult> + ) -> Result> where D: Dispatcher, { @@ -196,7 +196,7 @@ impl OpSASLAutoEncoder for OpsCore { &self, dispatcher: &D, _request: SASLListMechsRequest, - ) -> MemdxResult> + ) -> Result> where D: Dispatcher, { @@ -224,7 +224,7 @@ impl OpSASLScramEncoder for OpsCore { &self, dispatcher: &D, request: SASLStepRequest, - ) -> MemdxResult> + ) -> Result> where D: Dispatcher, { diff --git a/sdk/couchbase-core/src/memdx/ops_crud.rs b/sdk/couchbase-core/src/memdx/ops_crud.rs index 88cd5afd..ef28d453 100644 --- a/sdk/couchbase-core/src/memdx/ops_crud.rs +++ b/sdk/couchbase-core/src/memdx/ops_crud.rs @@ -3,10 +3,10 @@ use std::time::Duration; use byteorder::{BigEndian, WriteBytesExt}; use bytes::{BufMut, BytesMut}; -use crate::memdx::client::MemdxResult; use crate::memdx::dispatcher::Dispatcher; use crate::memdx::durability_level::{DurabilityLevel, DurabilityLevelSettings}; -use crate::memdx::error::MemdxError; +use crate::memdx::error::{Error, ErrorKind, ServerErrorKind}; +use crate::memdx::error::Result; use crate::memdx::ext_frame_code::{ExtReqFrameCode, ExtResFrameCode}; use crate::memdx::magic::Magic; use crate::memdx::opcode::OpCode; @@ -30,7 +30,7 @@ impl OpsCrud { &self, dispatcher: &D, request: SetRequest, - ) -> MemdxResult> + ) -> Result> where D: Dispatcher, { @@ -76,7 +76,7 @@ impl OpsCrud { &self, dispatcher: &D, request: GetRequest, - ) -> MemdxResult> + ) -> Result> where D: Dispatcher, { @@ -110,10 +110,13 @@ impl OpsCrud { Ok(StandardPendingOp::new(pending_op)) } - fn encode_collection_and_key(&self, collection_id: u32, key: Vec) -> MemdxResult> { + fn encode_collection_and_key(&self, collection_id: u32, key: Vec) -> Result> { if !self.collections_enabled { if collection_id != 0 { - return Err(MemdxError::CollectionsNotEnabled); + return Err(ErrorKind::InvalidArgument { + msg: "collections are not enabled".to_string(), + } + .into()); } return Ok(key); @@ -129,15 +132,15 @@ impl OpsCrud { preserve_expiry: Option, on_behalf_of: Option, buf: &mut Vec, - ) -> MemdxResult { + ) -> Result { if let Some(obo) = on_behalf_of { append_ext_frame(ExtReqFrameCode::OnBehalfOf, obo.into_bytes(), buf)?; } if let Some(dura) = durability_level { if !self.durability_enabled { - return Err(MemdxError::Protocol( - "Cannot use synchronous durability when its not enabled".to_string(), + return Err(Error::new_protocol_error( + "Cannot use synchronous durability when its not enabled", )); } @@ -145,15 +148,15 @@ impl OpsCrud { append_ext_frame(ExtReqFrameCode::Durability, dura_buf, buf)?; } else if durability_timeout.is_some() { - return Err(MemdxError::Protocol( - "Cannot encode durability timeout without durability level".to_string(), + return Err(Error::new_protocol_error( + "Cannot encode durability timeout without durability level", )); } if preserve_expiry.is_some() { if !self.preserve_expiry_enabled { - return Err(MemdxError::Protocol( - "Cannot use preserve expiry when its not enabled".to_string(), + return Err(Error::new_protocol_error( + "Cannot use preserve expiry when its not enabled", )); } @@ -162,8 +165,8 @@ impl OpsCrud { let magic = if !buf.is_empty() { if !self.ext_frames_enabled { - return Err(MemdxError::Protocol( - "Cannot use framing extras when its not enabled".to_string(), + return Err(Error::new_protocol_error( + "Cannot use framing extras when its not enabled", )); } @@ -175,10 +178,10 @@ impl OpsCrud { Ok(magic) } - pub(crate) fn decode_common_status(status: Status) -> MemdxResult<()> { + pub(crate) fn decode_common_status(status: Status) -> Result<()> { let err = match status { - Status::CollectionUnknown => MemdxError::UnknownCollectionID, - Status::AccessError => MemdxError::Access, + Status::CollectionUnknown => ServerErrorKind::UnknownCollectionID.into(), + Status::AccessError => ServerErrorKind::Access.into(), _ => { return Ok(()); } @@ -187,7 +190,7 @@ impl OpsCrud { Err(err) } - pub(crate) fn decode_common_error(resp: &ResponsePacket) -> MemdxError { + pub(crate) fn decode_common_error(resp: &ResponsePacket) -> Error { if let Err(e) = Self::decode_common_status(resp.status) { return e; }; @@ -196,7 +199,7 @@ impl OpsCrud { } } -pub(crate) fn decode_res_ext_frames(buf: &[u8]) -> MemdxResult> { +pub(crate) fn decode_res_ext_frames(buf: &[u8]) -> Result> { let mut server_duration_data = None; iter_ext_frames(buf, |code, data| { @@ -212,10 +215,10 @@ pub(crate) fn decode_res_ext_frames(buf: &[u8]) -> MemdxResult> Ok(None) } -pub fn decode_ext_frame(buf: &[u8]) -> MemdxResult<(ExtResFrameCode, Vec, usize)> { +pub fn decode_ext_frame(buf: &[u8]) -> Result<(ExtResFrameCode, Vec, usize)> { if buf.is_empty() { - return Err(MemdxError::Protocol( - "Framing extras protocol error".to_string(), + return Err(Error::new_protocol_error( + "Framing extras new_protocol_error error", )); } @@ -229,7 +232,7 @@ pub fn decode_ext_frame(buf: &[u8]) -> MemdxResult<(ExtResFrameCode, Vec, us if u_frame_code == 15 { if buf.len() < buf_pos + 1 { - return Err(MemdxError::Protocol("Unexpected eof".to_string())); + return Err(Error::new_protocol_error("Unexpected eof")); } let frame_code_ext = buf[buf_pos]; @@ -240,7 +243,7 @@ pub fn decode_ext_frame(buf: &[u8]) -> MemdxResult<(ExtResFrameCode, Vec, us if frame_len == 15 { if buf.len() < buf_pos + 1 { - return Err(MemdxError::Protocol("Unexpected eof".to_string())); + return Err(Error::new_protocol_error("Unexpected eof")); } let frame_len_ext = buf[buf_pos]; @@ -250,7 +253,7 @@ pub fn decode_ext_frame(buf: &[u8]) -> MemdxResult<(ExtResFrameCode, Vec, us let u_frame_len = frame_len as usize; if buf.len() < buf_pos + u_frame_len { - return Err(MemdxError::Protocol("unexpected eof".to_string())); + return Err(Error::new_protocol_error("unexpected eof")); } let frame_body = &buf[buf_pos..buf_pos + u_frame_len]; @@ -259,10 +262,7 @@ pub fn decode_ext_frame(buf: &[u8]) -> MemdxResult<(ExtResFrameCode, Vec, us Ok((frame_code, frame_body.to_vec(), buf_pos)) } -fn iter_ext_frames( - buf: &[u8], - mut cb: impl FnMut(ExtResFrameCode, Vec), -) -> MemdxResult> { +fn iter_ext_frames(buf: &[u8], mut cb: impl FnMut(ExtResFrameCode, Vec)) -> Result> { if !buf.is_empty() { let (frame_code, frame_body, buf_pos) = decode_ext_frame(buf)?; @@ -278,7 +278,7 @@ pub fn append_ext_frame( frame_code: ExtReqFrameCode, frame_body: Vec, buf: &mut Vec, -) -> MemdxResult<()> { +) -> Result<()> { let frame_len = frame_body.len(); let buf_len = buf.len(); @@ -290,8 +290,8 @@ pub fn append_ext_frame( *hdr_byte_ptr = (*hdr_byte_ptr as u16 | ((u_frame_code & 0x0f) << 4)) as u8; } else { if u_frame_code - 15 >= 15 { - return Err(MemdxError::Protocol( - "Extframe code too large to encode".to_string(), + return Err(Error::new_protocol_error( + "Extframe code too large to encode", )); } @@ -331,7 +331,7 @@ pub fn make_uleb128_32(key: Vec, collection_id: u32) -> Vec { fn encode_durability_ext_frame( level: DurabilityLevel, timeout: Option, -) -> MemdxResult> { +) -> Result> { if timeout.is_none() { return Ok(vec![level.into()]); } @@ -340,8 +340,8 @@ fn encode_durability_ext_frame( let mut timeout_millis = timeout.as_millis(); if timeout_millis > 65535 { - return Err(MemdxError::Protocol( - "Cannot encode durability timeout greater than 65535 milliseconds".to_string(), + return Err(Error::new_protocol_error( + "Cannot encode durability timeout greater than 65535 milliseconds", )); } @@ -356,10 +356,10 @@ fn encode_durability_ext_frame( Ok(buf) } -pub(crate) fn decode_server_duration_ext_frame(mut data: Vec) -> MemdxResult { +pub(crate) fn decode_server_duration_ext_frame(mut data: Vec) -> Result { if data.len() != 2 { - return Err(MemdxError::Protocol( - "Invalid server duration extframe length".to_string(), + return Err(Error::new_protocol_error( + "Invalid server duration extframe length", )); } @@ -371,7 +371,7 @@ pub(crate) fn decode_server_duration_ext_frame(mut data: Vec) -> MemdxResult pub(crate) fn decode_durability_level_ext_frame( data: &mut Vec, -) -> MemdxResult { +) -> Result { if data.len() == 1 { let durability = DurabilityLevel::from(data.remove(0)); @@ -386,7 +386,7 @@ pub(crate) fn decode_durability_level_ext_frame( )); } - Err(MemdxError::Protocol( - "Invalid durability extframe length".to_string(), + Err(Error::new_protocol_error( + "Invalid durability extframe length", )) } diff --git a/sdk/couchbase-core/src/memdx/pendingop.rs b/sdk/couchbase-core/src/memdx/pendingop.rs index 58437d70..89f22052 100644 --- a/sdk/couchbase-core/src/memdx/pendingop.rs +++ b/sdk/couchbase-core/src/memdx/pendingop.rs @@ -8,14 +8,14 @@ use tokio::sync::mpsc::Receiver; use tokio::time::{Instant, timeout_at}; use crate::memdx::client::CancellationSender; -use crate::memdx::client::MemdxResult; use crate::memdx::client_response::ClientResponse; -use crate::memdx::error::CancellationErrorKind; -use crate::memdx::error::MemdxError::{Cancelled, Closed}; +use crate::memdx::error::{CancellationErrorKind, ErrorKind}; +use crate::memdx::error::Error; +use crate::memdx::error::Result; use crate::memdx::response::TryFromClientResponse; pub trait PendingOp { - fn recv(&mut self) -> impl std::future::Future> + fn recv(&mut self) -> impl std::future::Future> where T: TryFromClientResponse; fn cancel(&mut self, e: CancellationErrorKind); @@ -28,14 +28,14 @@ pub(crate) trait OpCanceller { pub struct ClientPendingOp { opaque: u32, cancel_chan: CancellationSender, - response_receiver: Receiver>, + response_receiver: Receiver>, } impl ClientPendingOp { pub(crate) fn new( opaque: u32, cancel_chan: CancellationSender, - response_receiver: Receiver>, + response_receiver: Receiver>, ) -> Self { ClientPendingOp { opaque, @@ -44,10 +44,10 @@ impl ClientPendingOp { } } - pub async fn recv(&mut self) -> MemdxResult { + pub async fn recv(&mut self) -> Result { match self.response_receiver.recv().await { Some(r) => r, - None => Err(Closed), + None => Err(ErrorKind::Closed.into()), } } @@ -76,7 +76,7 @@ impl StandardPendingOp { } impl PendingOp for StandardPendingOp { - async fn recv(&mut self) -> MemdxResult { + async fn recv(&mut self) -> Result { let packet = self.wrapped.recv().await?; T::try_from(packet) @@ -87,19 +87,16 @@ impl PendingOp for StandardPendingOp { } } -pub(super) async fn run_op_future_with_deadline( - deadline: Instant, - fut: F, -) -> MemdxResult +pub(super) async fn run_op_future_with_deadline(deadline: Instant, fut: F) -> Result where O: PendingOp, - F: Future>, + F: Future>, T: TryFromClientResponse, { let mut op = match timeout_at(deadline, fut).await { Ok(op) => op?, Err(_e) => { - return Err(Cancelled(CancellationErrorKind::Timeout)); + return Err(ErrorKind::Cancelled(CancellationErrorKind::Timeout).into()); } }; diff --git a/sdk/couchbase-core/src/memdx/response.rs b/sdk/couchbase-core/src/memdx/response.rs index 8c9b05ea..42c53f90 100644 --- a/sdk/couchbase-core/src/memdx/response.rs +++ b/sdk/couchbase-core/src/memdx/response.rs @@ -5,14 +5,14 @@ use byteorder::{BigEndian, ReadBytesExt}; use crate::memdx::auth_mechanism::AuthMechanism; use crate::memdx::client_response::ClientResponse; -use crate::memdx::error::MemdxError; +use crate::memdx::error::{Error, ErrorKind, ServerErrorKind}; use crate::memdx::hello_feature::HelloFeature; use crate::memdx::ops_core::OpsCore; use crate::memdx::ops_crud::{decode_res_ext_frames, OpsCrud}; use crate::memdx::status::Status; pub trait TryFromClientResponse: Sized { - fn try_from(resp: ClientResponse) -> Result; + fn try_from(resp: ClientResponse) -> Result; } #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] @@ -21,7 +21,7 @@ pub struct HelloResponse { } impl TryFromClientResponse for HelloResponse { - fn try_from(resp: ClientResponse) -> Result { + fn try_from(resp: ClientResponse) -> Result { let packet = resp.packet(); let status = packet.status; if status != Status::Success { @@ -31,7 +31,10 @@ impl TryFromClientResponse for HelloResponse { let mut features: Vec = Vec::new(); if let Some(value) = &packet.value { if value.len() % 2 != 0 { - return Err(MemdxError::Protocol("invalid hello features length".into())); + return Err(ErrorKind::Protocol { + msg: "invalid hello features length".into(), + } + .into()); } let mut cursor = Cursor::new(value); @@ -53,7 +56,7 @@ pub struct GetErrorMapResponse { } impl TryFromClientResponse for GetErrorMapResponse { - fn try_from(resp: ClientResponse) -> Result { + fn try_from(resp: ClientResponse) -> Result { let packet = resp.packet(); let status = packet.status; if status != Status::Success { @@ -72,12 +75,12 @@ impl TryFromClientResponse for GetErrorMapResponse { pub struct SelectBucketResponse {} impl TryFromClientResponse for SelectBucketResponse { - fn try_from(resp: ClientResponse) -> Result { + fn try_from(resp: ClientResponse) -> Result { let packet = resp.packet(); let status = packet.status; if status != Status::Success { if status == Status::AccessError || status == Status::KeyNotFound { - return Err(MemdxError::UnknownBucketName); + return Err(ErrorKind::UnknownBucketName.into()); } return Err(OpsCore::decode_error(packet)); } @@ -93,7 +96,7 @@ pub struct SASLAuthResponse { } impl TryFromClientResponse for SASLAuthResponse { - fn try_from(resp: ClientResponse) -> Result { + fn try_from(resp: ClientResponse) -> Result { let packet = resp.packet(); let status = packet.status; if status == Status::SASLAuthContinue { @@ -124,7 +127,7 @@ pub struct SASLStepResponse { } impl TryFromClientResponse for SASLStepResponse { - fn try_from(resp: ClientResponse) -> Result { + fn try_from(resp: ClientResponse) -> Result { let packet = resp.packet(); let status = packet.status; if status != Status::Success { @@ -145,7 +148,7 @@ pub struct SASLListMechsResponse { } impl TryFromClientResponse for SASLListMechsResponse { - fn try_from(resp: ClientResponse) -> Result { + fn try_from(resp: ClientResponse) -> Result { let packet = resp.packet(); let status = packet.status; if status != Status::Success { @@ -154,7 +157,7 @@ impl TryFromClientResponse for SASLListMechsResponse { // ns_server has not posted a configuration for the bucket to kv_engine yet. We // transform this into a ErrTmpFail as we make the assumption that the // SelectBucket will have failed if this was anything but a transient issue. - return Err(MemdxError::ConfigNotSet); + return Err(ServerErrorKind::ConfigNotSet.into()); } return Err(OpsCore::decode_error(packet)); } @@ -164,7 +167,7 @@ impl TryFromClientResponse for SASLListMechsResponse { let mechs_list_string = match String::from_utf8(value) { Ok(v) => v, Err(e) => { - return Err(MemdxError::Protocol(e.to_string())); + return Err(ErrorKind::Protocol { msg: e.to_string() }.into()); } }; let mechs_list_split = mechs_list_string.split(' '); @@ -201,7 +204,7 @@ impl GetClusterConfigResponse { } impl TryFromClientResponse for GetClusterConfigResponse { - fn try_from(resp: ClientResponse) -> Result { + fn try_from(resp: ClientResponse) -> Result { let packet = resp.packet(); let status = packet.status; if status != Status::Success { @@ -211,16 +214,17 @@ impl TryFromClientResponse for GetClusterConfigResponse { let host = match resp.local_addr() { Some(addr) => addr.ip().to_string(), None => { - return Err(MemdxError::Generic( - "Failed to identify memd hostname for $HOST replacement".to_string(), - )) + return Err(ErrorKind::Protocol { + msg: "Failed to identify memd hostname for $HOST replacement".to_string(), + } + .into()); } }; // TODO: Clone, maybe also inefficient? let value = match std::str::from_utf8(packet.value.clone().unwrap_or_default().as_slice()) { Ok(v) => v.to_string(), - Err(e) => "".to_string(), + Err(_e) => "".to_string(), }; let out = value.replace("$HOST", host.as_ref()); @@ -252,35 +256,36 @@ pub struct SetResponse { } impl TryFromClientResponse for SetResponse { - fn try_from(resp: ClientResponse) -> Result { + fn try_from(resp: ClientResponse) -> Result { let packet = resp.packet(); let status = packet.status; if status == Status::TooBig { - return Err(MemdxError::TooBig); + return Err(ServerErrorKind::TooBig.into()); } else if status == Status::Locked { - return Err(MemdxError::Locked); + return Err(ServerErrorKind::Locked.into()); } else if status == Status::KeyExists { - return Err(MemdxError::KeyExists); + return Err(ServerErrorKind::KeyExists.into()); } else if status != Status::Success { - return Err(MemdxError::Unknown( - OpsCrud::decode_common_error(resp.packet()).to_string(), - )); + return Err(OpsCrud::decode_common_error(resp.packet())); } let mutation_token = if let Some(extras) = &packet.extras { if extras.len() != 16 { - return Err(MemdxError::Protocol("Bad extras length".to_string())); + return Err(ErrorKind::Protocol { + msg: "Bad extras length".to_string(), + } + .into()); } let mut extras = Cursor::new(extras); Some(MutationToken { vbuuid: extras .read_u64::() - .map_err(|e| MemdxError::Unknown(e.to_string()))?, + .map_err(|e| Error::from(ErrorKind::Protocol { msg: e.to_string() }))?, seqno: extras .read_u64::() - .map_err(|e| MemdxError::Unknown(e.to_string()))?, + .map_err(|e| Error::from(ErrorKind::Protocol { msg: e.to_string() }))?, }) } else { None @@ -310,29 +315,35 @@ pub struct GetResponse { } impl TryFromClientResponse for GetResponse { - fn try_from(resp: ClientResponse) -> Result { + fn try_from(resp: ClientResponse) -> Result { let packet = resp.packet(); let status = packet.status; if status == Status::KeyNotFound { - return Err(MemdxError::KeyNotFound); + return Err(ServerErrorKind::KeyNotFound.into()); } else if status != Status::Success { - return Err(MemdxError::Unknown( - OpsCrud::decode_common_error(resp.packet()).to_string(), - )); + return Err(OpsCrud::decode_common_error(resp.packet())); } let flags = if let Some(extras) = &packet.extras { if extras.len() != 4 { - return Err(MemdxError::Protocol("Bad extras length".to_string())); + return Err(ErrorKind::Protocol { + msg: "Bad extras length".to_string(), + } + .into()); } let mut extras = Cursor::new(extras); - extras - .read_u32::() - .map_err(|e| MemdxError::Unknown(e.to_string()))? + extras.read_u32::().map_err(|e| { + Error::from(ErrorKind::Protocol { + msg: "Bad extras length".to_string(), + }) + })? } else { - return Err(MemdxError::Protocol("Bad extras length".to_string())); + return Err(ErrorKind::Protocol { + msg: "Bad extras length".to_string(), + } + .into()); }; let server_duration = if let Some(f) = &packet.framing_extras { diff --git a/sdk/couchbase-core/src/memdx/sync_helpers.rs b/sdk/couchbase-core/src/memdx/sync_helpers.rs index 38b481b8..61f924e4 100644 --- a/sdk/couchbase-core/src/memdx/sync_helpers.rs +++ b/sdk/couchbase-core/src/memdx/sync_helpers.rs @@ -1,13 +1,13 @@ use std::future::Future; -use crate::memdx::client::MemdxResult; +use crate::memdx::error::Result; use crate::memdx::pendingop::{PendingOp, StandardPendingOp}; use crate::memdx::response::TryFromClientResponse; -pub async fn sync_unary_call(fut: Fut) -> MemdxResult +pub async fn sync_unary_call(fut: Fut) -> Result where RespT: TryFromClientResponse, - Fut: Future>>, + Fut: Future>>, { let mut op = fut.await?;