Skip to content

Commit

Permalink
RSCBC-63: Refactor analytics options ownership model
Browse files Browse the repository at this point in the history
  • Loading branch information
chvck committed Jan 15, 2025
1 parent d435288 commit e6aaf4a
Show file tree
Hide file tree
Showing 13 changed files with 623 additions and 126 deletions.
5 changes: 1 addition & 4 deletions sdk/couchbase-core/src/agent_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,7 @@ impl Agent {
self.inner.search.query(opts).await
}

pub async fn analytics<'a>(
&self,
opts: &AnalyticsOptions<'a>,
) -> Result<AnalyticsResultStream> {
pub async fn analytics<'a>(&self, opts: AnalyticsOptions<'a>) -> Result<AnalyticsResultStream> {
self.inner.analytics.query(opts).await
}

Expand Down
3 changes: 2 additions & 1 deletion sdk/couchbase-core/src/analyticscomponent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl<C: Client> AnalyticsComponent<C> {

pub async fn query<'a>(
&self,
opts: &AnalyticsOptions<'a>,
opts: AnalyticsOptions<'a>,
) -> error::Result<AnalyticsResultStream> {
let retry = if let Some(retry_strategy) = opts.retry_strategy.clone() {
retry_strategy
Expand All @@ -95,6 +95,7 @@ impl<C: Client> AnalyticsComponent<C> {
let retry_info = RetryInfo::new(opts.read_only.unwrap_or_default(), retry);

let endpoint = opts.endpoint.clone();

let copts = opts.into();

orchestrate_retries(self.retry_manager.clone(), retry_info, async || {
Expand Down
97 changes: 87 additions & 10 deletions sdk/couchbase-core/src/analyticsoptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,31 @@ use crate::analyticsx;
use crate::analyticsx::query_options::ScanConsistency;
use crate::httpx::request::OnBehalfOfInfo;
use crate::retry::RetryStrategy;
use serde_json::value::RawValue;
use serde_json::value::Value;
use std::collections::HashMap;
use std::sync::Arc;
use typed_builder::TypedBuilder;

#[derive(Debug, Clone, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AnalyticsOptions<'a> {
pub client_context_id: Option<&'a str>,
pub priority: Option<i32>,
pub query_context: Option<&'a str>,
pub read_only: Option<bool>,
pub scan_consistency: Option<ScanConsistency>,
#[builder(!default)]
pub statement: &'a str,

pub args: Option<&'a [&'a RawValue]>,
pub named_args: Option<&'a HashMap<&'a str, &'a RawValue>>,
pub raw: Option<&'a HashMap<&'a str, &'a RawValue>>,
pub args: Option<&'a [Value]>,
pub named_args: Option<&'a HashMap<String, Value>>,
pub raw: Option<&'a HashMap<String, Value>>,

pub on_behalf_of: Option<&'a OnBehalfOfInfo>,
pub endpoint: Option<String>,
pub retry_strategy: Option<Arc<dyn RetryStrategy>>,
}

impl<'a> From<&AnalyticsOptions<'a>> for analyticsx::query_options::QueryOptions<'a> {
fn from(opts: &AnalyticsOptions<'a>) -> Self {
impl<'a> From<AnalyticsOptions<'a>> for analyticsx::query_options::QueryOptions<'a> {
fn from(opts: AnalyticsOptions<'a>) -> Self {
Self {
client_context_id: opts.client_context_id,
priority: opts.priority,
Expand All @@ -44,3 +41,83 @@ impl<'a> From<&AnalyticsOptions<'a>> for analyticsx::query_options::QueryOptions
}
}
}

impl<'a> AnalyticsOptions<'a> {
pub fn new(statement: &'a str) -> Self {
Self {
client_context_id: None,
priority: None,
query_context: None,
read_only: None,
scan_consistency: None,
statement,
args: None,
named_args: None,
raw: None,
on_behalf_of: None,
endpoint: None,
retry_strategy: None,
}
}

pub fn client_context_id(mut self, client_context_id: impl Into<Option<&'a str>>) -> Self {
self.client_context_id = client_context_id.into();
self
}

pub fn priority(mut self, priority: impl Into<Option<i32>>) -> Self {
self.priority = priority.into();
self
}

pub fn query_context(mut self, query_context: impl Into<Option<&'a str>>) -> Self {
self.query_context = query_context.into();
self
}

pub fn read_only(mut self, read_only: impl Into<Option<bool>>) -> Self {
self.read_only = read_only.into();
self
}

pub fn scan_consistency(
mut self,
scan_consistency: impl Into<Option<ScanConsistency>>,
) -> Self {
self.scan_consistency = scan_consistency.into();
self
}

pub fn args(mut self, args: impl Into<Option<&'a [Value]>>) -> Self {
self.args = args.into();
self
}

pub fn named_args(mut self, named_args: impl Into<Option<&'a HashMap<String, Value>>>) -> Self {
self.named_args = named_args.into();
self
}

pub fn raw(mut self, raw: impl Into<Option<&'a HashMap<String, Value>>>) -> Self {
self.raw = raw.into();
self
}

pub fn on_behalf_of(mut self, on_behalf_of: impl Into<Option<&'a OnBehalfOfInfo>>) -> Self {
self.on_behalf_of = on_behalf_of.into();
self
}

pub fn endpoint(mut self, endpoint: impl Into<Option<String>>) -> Self {
self.endpoint = endpoint.into();
self
}

pub fn retry_strategy(
mut self,
retry_strategy: impl Into<Option<Arc<dyn RetryStrategy>>>,
) -> Self {
self.retry_strategy = retry_strategy.into();
self
}
}
34 changes: 31 additions & 3 deletions sdk/couchbase-core/src/analyticsx/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,40 @@ impl<C: Client> Analytics<C> {
let statement = opts.statement;
let on_behalf_of = opts.on_behalf_of;

let body = serde_json::to_vec(opts).map_err(|e| {
let mut body = serde_json::to_value(opts).map_err(|e| {
Error::new_generic_error(
e.to_string(),
&self.endpoint,
statement,
client_context_id.clone().map(|s| s.to_string()),
client_context_id.clone(),
)
})?;

// Unwrap is fine, we know this is an object.
let mut body_obj = body.as_object_mut().unwrap();
if let Some(named_args) = &opts.named_args {
for (k, v) in named_args.iter() {
let key = if k.starts_with("$") {
k.clone()
} else {
format!("${}", k)
};
body_obj.insert(key, v.clone());
}
}

if let Some(raw) = &opts.raw {
for (k, v) in raw.iter() {
body_obj.insert(k.to_string(), v.clone());
}
}

let body = serde_json::to_vec(body_obj).map_err(|e| {
Error::new_generic_error(
e.to_string(),
&self.endpoint,
statement,
client_context_id.clone(),
)
})?;

Expand Down Expand Up @@ -116,6 +144,6 @@ impl<C: Client> Analytics<C> {
)
})?;

QueryRespReader::new(res, &self.endpoint, statement, client_context_id).await
QueryRespReader::new(res, &self.endpoint, statement, client_context_id.clone()).await
}
}
83 changes: 73 additions & 10 deletions sdk/couchbase-core/src/analyticsx/query_options.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::httpx::request::OnBehalfOfInfo;
use serde::Serialize;
use serde_json::value::RawValue;
use serde_json::Value;
use std::collections::HashMap;
use typed_builder::TypedBuilder;

#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
Expand All @@ -12,8 +11,7 @@ pub enum ScanConsistency {
RequestPlus,
}

#[derive(Debug, Clone, Default, Serialize, TypedBuilder)]
#[builder(field_defaults(setter(into)))]
#[derive(Debug, Clone, Serialize)]
#[non_exhaustive]
pub struct QueryOptions<'a> {
#[serde(skip_serializing_if = "Option::is_none")]
Expand All @@ -26,16 +24,81 @@ pub struct QueryOptions<'a> {
pub read_only: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub scan_consistency: Option<ScanConsistency>,
#[builder(!default)]
pub statement: &'a str,

#[serde(skip_serializing_if = "Option::is_none")]
pub args: Option<&'a [&'a RawValue]>,
#[serde(skip_serializing_if = "Option::is_none", flatten)]
pub named_args: Option<&'a HashMap<&'a str, &'a RawValue>>,
#[serde(skip_serializing_if = "Option::is_none", flatten)]
pub raw: Option<&'a HashMap<&'a str, &'a RawValue>>,
pub args: Option<&'a [Value]>,
#[serde(skip_serializing)]
pub named_args: Option<&'a HashMap<String, Value>>,
#[serde(skip_serializing)]
pub raw: Option<&'a HashMap<String, Value>>,

#[serde(skip_serializing)]
pub on_behalf_of: Option<&'a OnBehalfOfInfo>,
}

impl<'a> QueryOptions<'a> {
pub fn new(statement: &'a str) -> Self {
Self {
client_context_id: None,
priority: None,
query_context: None,
read_only: None,
scan_consistency: None,
statement,

args: None,
named_args: None,
raw: None,
on_behalf_of: None,
}
}

pub fn client_context_id(mut self, client_context_id: impl Into<Option<&'a str>>) -> Self {
self.client_context_id = client_context_id.into();
self
}

pub fn priority(mut self, priority: impl Into<Option<i32>>) -> Self {
self.priority = priority.into();
self
}

pub fn query_context(mut self, query_context: impl Into<Option<&'a str>>) -> Self {
self.query_context = query_context.into();
self
}

pub fn read_only(mut self, read_only: impl Into<Option<bool>>) -> Self {
self.read_only = read_only.into();
self
}

pub fn scan_consistency(
mut self,
scan_consistency: impl Into<Option<ScanConsistency>>,
) -> Self {
self.scan_consistency = scan_consistency.into();
self
}

pub fn args(mut self, args: impl Into<Option<&'a [Value]>>) -> Self {
self.args = args.into();
self
}

pub fn named_args(mut self, named_args: impl Into<Option<&'a HashMap<String, Value>>>) -> Self {
self.named_args = named_args.into();
self
}

pub fn raw(mut self, raw: impl Into<Option<&'a HashMap<String, Value>>>) -> Self {
self.raw = raw.into();
self
}

pub fn on_behalf_of(mut self, on_behalf_of: impl Into<Option<&'a OnBehalfOfInfo>>) -> Self {
self.on_behalf_of = on_behalf_of.into();
self
}
}
2 changes: 1 addition & 1 deletion sdk/couchbase-core/src/retrybesteffort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_trait::async_trait;

use crate::retry::{RetryAction, RetryInfo, RetryReason, RetryStrategy};

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BestEffortRetryStrategy<Calc> {
backoff_calc: Calc,
}
Expand Down
51 changes: 51 additions & 0 deletions sdk/couchbase-core/tests/analytics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use crate::common::default_agent_options::create_default_options;
use crate::common::test_config::setup_tests;
use couchbase_core::agent::Agent;
use couchbase_core::analyticsoptions::AnalyticsOptions;
use couchbase_core::analyticsx::query_respreader::Status;
use futures::StreamExt;
use serde_json::Value;

mod common;

#[tokio::test]
async fn test_analytics_basic() {
setup_tests().await;

let agent_opts = create_default_options().await;

let agent = Agent::new(agent_opts).await.unwrap();
let opts = AnalyticsOptions::new("FROM RANGE(0, 999) AS i SELECT *");

let mut res = agent.analytics(opts).await.unwrap();

let mut rows = vec![];
while let Some(row) = res.next().await {
rows.push(row.unwrap());
}

assert_eq!(1000, rows.len());

let row = rows.first().unwrap();

let row_value: Value = serde_json::from_slice(row).unwrap();
let row_obj = row_value.as_object().unwrap();

assert_eq!(0, row_obj.get("i").unwrap().as_u64().unwrap());

let meta = res.metadata().unwrap();

assert!(meta.request_id.is_some());
assert!(meta.client_context_id.is_none());
assert_eq!(Status::Success, meta.status.clone().unwrap());
assert!(meta.warnings.is_empty());

assert!(!meta.metrics.elapsed_time.is_zero());
assert!(!meta.metrics.execution_time.is_zero());
assert_eq!(1000, meta.metrics.result_count);
assert_ne!(0, meta.metrics.result_size);
assert_eq!(0, meta.metrics.error_count);
assert_eq!(0, meta.metrics.warning_count);

assert_eq!("{\"*\":\"*\"}", meta.signature.as_ref().unwrap().get());
}
Loading

0 comments on commit e6aaf4a

Please sign in to comment.