From 49b853ccc0fb7fdf10f1f71f0a08c9eaf4240616 Mon Sep 17 00:00:00 2001 From: asafmahlev Date: Wed, 31 Jan 2024 17:20:57 +0200 Subject: [PATCH] Some progress --- azure-kusto-data/src/error.rs | 30 ++++++- azure-kusto-data/src/models/test_helpers.rs | 4 +- azure-kusto-data/src/operations/v2.rs | 96 ++++++++++----------- 3 files changed, 78 insertions(+), 52 deletions(-) diff --git a/azure-kusto-data/src/error.rs b/azure-kusto-data/src/error.rs index 5554f3c..456a142 100644 --- a/azure-kusto-data/src/error.rs +++ b/azure-kusto-data/src/error.rs @@ -13,7 +13,7 @@ pub enum Error { /// Error in an external crate #[error("Error in external crate {0}")] - ExternalError(String), + ExternalError(Box), /// Error in HTTP #[error("Error in HTTP: {0} {1}")] @@ -60,6 +60,12 @@ pub enum Error { MultipleErrors(Vec), } +impl Into> for Error { + fn into(self) -> Partial { + Err((None, self)) + } +} + impl From> for Error { fn from(errors: Vec) -> Self { if errors.len() == 1 { @@ -100,6 +106,9 @@ pub enum ParseError { /// Raised when a dynamic value is failed to be parsed. #[error("Error parsing dynamic: {0}")] Dynamic(#[from] serde_json::Error), + + #[error("Error parsing Frame: {0}")] + Frame(String), } /// Errors raised when parsing connection strings. @@ -140,3 +149,22 @@ impl ConnectionStringError { /// Result type for kusto operations. pub type Result = std::result::Result; pub type Partial = std::result::Result, Error)>; + +pub(crate) trait PartialExt { + fn ignore_partial_results(self) -> Result; +} + +impl PartialExt for Partial { + fn ignore_partial_results(self) -> Result { + match self { + Ok(v) => Ok(v), + Err((_, e)) => Err(e), + } + } +} + +impl From> for Error { + fn from(e: tokio::sync::mpsc::error::SendError) -> Self { + Error::ExternalError(Box::new(e)) + } +} diff --git a/azure-kusto-data/src/models/test_helpers.rs b/azure-kusto-data/src/models/test_helpers.rs index 9394745..533e5a9 100644 --- a/azure-kusto-data/src/models/test_helpers.rs +++ b/azure-kusto-data/src/models/test_helpers.rs @@ -577,7 +577,7 @@ fn expected_v2_partial_error_full_dataset() -> Vec { serde_json::Value::String("Visualization".to_string()), serde_json::Value::String("{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}".to_string()), ]), - Row::Error((OneApiErrors { + Row::Error(OneApiErrors { errors: vec![OneApiError { error_message: crate::models::v2::ErrorMessage { code: "LimitsExceeded".to_string(), @@ -601,7 +601,7 @@ fn expected_v2_partial_error_full_dataset() -> Vec { is_permanent: false, }, }] - })), + }), ], }), Frame::DataSetCompletion(DataSetCompletion { diff --git a/azure-kusto-data/src/operations/v2.rs b/azure-kusto-data/src/operations/v2.rs index 2e23509..1153461 100644 --- a/azure-kusto-data/src/operations/v2.rs +++ b/azure-kusto-data/src/operations/v2.rs @@ -1,4 +1,4 @@ -use crate::error::{Error::JsonError, Result}; +use crate::error::{Error::JsonError, Partial, PartialExt, Result}; use crate::models::v2; use crate::models::v2::{DataTable, Frame, QueryCompletionInformation, QueryProperties, TableKind}; use futures::lock::Mutex; @@ -7,10 +7,11 @@ use futures::{ }; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; +use crate::error::ParseError; pub fn parse_frames_iterative( reader: impl AsyncBufRead + Unpin, -) -> impl Stream> { +) -> impl Stream> { let buf = Vec::with_capacity(4096); stream::unfold((reader, buf), |(mut reader, mut buf)| async move { buf.clear(); @@ -34,7 +35,7 @@ pub fn parse_frames_iterative( pub async fn parse_frames_full( mut reader: (impl AsyncBufRead + Send + Unpin), -) -> Result> { +) -> Result> { let mut buf = Vec::new(); reader.read_to_end(&mut buf).await?; return Ok(serde_json::from_slice(&buf)?); @@ -50,11 +51,11 @@ struct StreamingDataset { completion: OM, query_properties: OM>, query_completion_information: OM>, - results: Receiver, + results: Receiver>, } impl StreamingDataset { - fn new(stream: impl Stream> + Send + 'static) -> Arc { + fn new(stream: impl Stream> + Send + 'static) -> Arc { let (tx, rx) = tokio::sync::mpsc::channel(1); let res = StreamingDataset { header: Arc::new(Mutex::new(None)), @@ -67,7 +68,9 @@ impl StreamingDataset { let tokio_res = res.clone(); // TODO: to spawn a task we have to have a runtime. We wanted to be runtime independent, and that may still be a desire, but currently azure core isn't, so we might as well use tokio here. tokio::spawn(async move { - tokio_res.populate_with_stream(stream, tx).await; + if let Err(e) = tokio_res.populate_with_stream(stream, tx).await { + let _ = tx.send(Err(e)).await; // Best effort to send the error to the receiver + } }); res @@ -75,9 +78,9 @@ impl StreamingDataset { async fn populate_with_stream( &self, - stream: impl Stream>, - tx: Sender, - ) { + stream: impl Stream>, + tx: Sender>, + ) -> Result<()> { pin_mut!(stream); let mut current_table = Some(DataTable { @@ -89,59 +92,54 @@ impl StreamingDataset { }); while let Some(frame) = stream.try_next().await.transpose() { - // TODO: handle errors - let frame = frame.expect("failed to read frame"); + let frame = frame?; match frame { - v2::Frame::DataSetHeader(header) => { + Frame::DataSetHeader(header) => { self.header.lock().await.replace(header); } - v2::Frame::DataSetCompletion(completion) => { + Frame::DataSetCompletion(completion) => { self.completion.lock().await.replace(completion); } - // TODO: properly handle errors/missing - v2::Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => { - self.query_properties.lock().await.replace( - table - .deserialize_values::() - .expect("failed to deserialize query properties"), - ); - } - v2::Frame::DataTable(table) - if table.table_kind == TableKind::QueryCompletionInformation => - { - self.query_completion_information.lock().await.replace( - table - .deserialize_values::() - .expect("failed to deserialize query completion information"), - ); - } - v2::Frame::DataTable(table) => { - tx.send(table).await.expect("failed to send table"); - } - // TODO - handle errors - v2::Frame::TableHeader(table_header) => { - if let Some(table) = &mut current_table { - table.table_id = table_header.table_id; - table.table_name = table_header.table_name.clone(); - table.table_kind = table_header.table_kind; - table.columns = table_header.columns.clone(); + Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => { + let mut query_properties = self.query_properties.lock().await; + match table.deserialize_values::().ignore_partial_results() { + Ok(v) => {query_properties.replace(v);}, + Err(e) => tx.send(e.into()).await?, } } - v2::Frame::TableFragment(table_fragment) => { - if let Some(table) = &mut current_table { - table.rows.extend(table_fragment.rows); + Frame::DataTable(table) + if table.table_kind == TableKind::QueryCompletionInformation => + { + let mut query_completion = self.query_completion_information.lock().await; + match table.deserialize_values::().ignore_partial_results() { + Ok(v) => {query_completion.replace(v);}, + Err(e) => tx.send(e.into()).await?, + } } + Frame::DataTable(table) => { + tx.send(Ok(table)).await?; } - v2::Frame::TableCompletion(table_completion) => { - if let Some(table) = current_table.take() { - // TODO - handle errors - - tx.send(table).await.expect("failed to send table"); - } + Frame::TableHeader(table_header) => { + let mut table = current_table.take().ok_or(ParseError::Frame("Table is unexpectedly none".into()))?; + table.table_id = table_header.table_id; + table.table_name = table_header.table_name.clone(); + table.table_kind = table_header.table_kind; + table.columns = table_header.columns.clone(); + } + Frame::TableFragment(table_fragment) => { + current_table.take().ok_or(ParseError::Frame("TableFragment without TableHeader".into()))? + .rows.extend(table_fragment.rows); + } + Frame::TableCompletion(table_completion) => { + tx.send( + Ok(current_table.take().ok_or(ParseError::Frame("TableCompletion without TableHeader".into()))?) + ).await?; } Frame::TableProgress(_) => {} } } + + Ok(()) } }