Skip to content

Commit

Permalink
Some progress
Browse files Browse the repository at this point in the history
  • Loading branch information
AsafMah committed Jan 31, 2024
1 parent 26ac4e3 commit 49b853c
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 52 deletions.
30 changes: 29 additions & 1 deletion azure-kusto-data/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub enum Error {

/// Error in an external crate
#[error("Error in external crate {0}")]
ExternalError(String),
ExternalError(Box<dyn std::error::Error + Send + Sync>),

/// Error in HTTP
#[error("Error in HTTP: {0} {1}")]
Expand Down Expand Up @@ -60,6 +60,12 @@ pub enum Error {
MultipleErrors(Vec<Error>),
}

impl<T> Into<Partial<T>> for Error {
fn into(self) -> Partial<T> {
Err((None, self))
}
}

impl From<Vec<Error>> for Error {
fn from(errors: Vec<Error>) -> Self {
if errors.len() == 1 {
Expand Down Expand Up @@ -100,6 +106,9 @@ pub enum ParseError {
/// Raised when a dynamic value is failed to be parsed.
#[error("Error parsing dynamic: {0}")]
Dynamic(#[from] serde_json::Error),

#[error("Error parsing Frame: {0}")]
Frame(String),
}

/// Errors raised when parsing connection strings.
Expand Down Expand Up @@ -140,3 +149,22 @@ impl ConnectionStringError {
/// Result type for kusto operations.
pub type Result<T> = std::result::Result<T, Error>;
pub type Partial<T> = std::result::Result<T, (Option<T>, Error)>;

pub(crate) trait PartialExt<T> {
fn ignore_partial_results(self) -> Result<T>;
}

impl<T> PartialExt<T> for Partial<T> {
fn ignore_partial_results(self) -> Result<T> {
match self {
Ok(v) => Ok(v),
Err((_, e)) => Err(e),
}
}
}

impl<T: Send + Sync> From<tokio::sync::mpsc::error::SendError<T>> for Error {
fn from(e: tokio::sync::mpsc::error::SendError<T>) -> Self {
Error::ExternalError(Box::new(e))
}
}
4 changes: 2 additions & 2 deletions azure-kusto-data/src/models/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ fn expected_v2_partial_error_full_dataset() -> Vec<Frame> {
serde_json::Value::String("Visualization".to_string()),
serde_json::Value::String("{\"Visualization\":null,\"Title\":null,\"XColumn\":null,\"Series\":null,\"YColumns\":null,\"AnomalyColumns\":null,\"XTitle\":null,\"YTitle\":null,\"XAxis\":null,\"YAxis\":null,\"Legend\":null,\"YSplit\":null,\"Accumulate\":false,\"IsQuerySorted\":false,\"Kind\":null,\"Ymin\":\"NaN\",\"Ymax\":\"NaN\",\"Xmin\":null,\"Xmax\":null}".to_string()),
]),
Row::Error((OneApiErrors {
Row::Error(OneApiErrors {
errors: vec![OneApiError {
error_message: crate::models::v2::ErrorMessage {
code: "LimitsExceeded".to_string(),
Expand All @@ -601,7 +601,7 @@ fn expected_v2_partial_error_full_dataset() -> Vec<Frame> {
is_permanent: false,
},
}]
})),
}),
],
}),
Frame::DataSetCompletion(DataSetCompletion {
Expand Down
96 changes: 47 additions & 49 deletions azure-kusto-data/src/operations/v2.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::error::{Error::JsonError, Result};
use crate::error::{Error::JsonError, Partial, PartialExt, Result};
use crate::models::v2;
use crate::models::v2::{DataTable, Frame, QueryCompletionInformation, QueryProperties, TableKind};
use futures::lock::Mutex;
Expand All @@ -7,10 +7,11 @@ use futures::{
};

Check warning on line 7 in azure-kusto-data/src/operations/v2.rs

View workflow job for this annotation

GitHub Actions / clippy

unused import: `StreamExt`

warning: unused import: `StreamExt` --> azure-kusto-data/src/operations/v2.rs:7:75 | 7 | pin_mut, stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream, StreamExt, TryStreamExt, | ^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::error::ParseError;

pub fn parse_frames_iterative(
reader: impl AsyncBufRead + Unpin,
) -> impl Stream<Item = Result<v2::Frame>> {
) -> impl Stream<Item=Result<Frame>> {
let buf = Vec::with_capacity(4096);
stream::unfold((reader, buf), |(mut reader, mut buf)| async move {
buf.clear();
Expand All @@ -34,7 +35,7 @@ pub fn parse_frames_iterative(

pub async fn parse_frames_full(
mut reader: (impl AsyncBufRead + Send + Unpin),
) -> Result<Vec<v2::Frame>> {
) -> Result<Vec<Frame>> {
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await?;
return Ok(serde_json::from_slice(&buf)?);
Expand All @@ -50,11 +51,11 @@ struct StreamingDataset {
completion: OM<v2::DataSetCompletion>,
query_properties: OM<Vec<QueryProperties>>,
query_completion_information: OM<Vec<QueryCompletionInformation>>,
results: Receiver<DataTable>,
results: Receiver<Result<DataTable>>,
}

impl StreamingDataset {
fn new(stream: impl Stream<Item = Result<Frame>> + Send + 'static) -> Arc<Self> {
fn new(stream: impl Stream<Item=Result<Frame>> + Send + 'static) -> Arc<Self> {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let res = StreamingDataset {
header: Arc::new(Mutex::new(None)),
Expand All @@ -67,17 +68,19 @@ impl StreamingDataset {
let tokio_res = res.clone();
// TODO: to spawn a task we have to have a runtime. We wanted to be runtime independent, and that may still be a desire, but currently azure core isn't, so we might as well use tokio here.
tokio::spawn(async move {
tokio_res.populate_with_stream(stream, tx).await;
if let Err(e) = tokio_res.populate_with_stream(stream, tx).await {

Check failure on line 71 in azure-kusto-data/src/operations/v2.rs

View workflow job for this annotation

GitHub Actions / clippy

mismatched types

error[E0308]: mismatched types --> azure-kusto-data/src/operations/v2.rs:71:68 | 71 | if let Err(e) = tokio_res.populate_with_stream(stream, tx).await { | -------------------- ^^ expected `Sender<Result<DataTable, ...>>`, found `Sender<Result<DataTable, Error>>` | | | arguments to this method are incorrect | = note: expected struct `tokio::sync::mpsc::Sender<std::result::Result<_, (std::option::Option<models::v2::frames::DataTable>, error::Error)>>` found struct `tokio::sync::mpsc::Sender<std::result::Result<_, error::Error>>` note: method defined here --> azure-kusto-data/src/operations/v2.rs:79:14 | 79 | async fn populate_with_stream( | ^^^^^^^^^^^^^^^^^^^^ ... 82 | tx: Sender<Partial<DataTable>>, | ------------------------------
let _ = tx.send(Err(e)).await; // Best effort to send the error to the receiver
}
});

res
}

async fn populate_with_stream(
&self,
stream: impl Stream<Item = Result<Frame>>,
tx: Sender<DataTable>,
) {
stream: impl Stream<Item=Result<Frame>>,
tx: Sender<Partial<DataTable>>,
) -> Result<()> {
pin_mut!(stream);

let mut current_table = Some(DataTable {
Expand All @@ -89,59 +92,54 @@ impl StreamingDataset {
});

while let Some(frame) = stream.try_next().await.transpose() {
// TODO: handle errors
let frame = frame.expect("failed to read frame");
let frame = frame?;
match frame {
v2::Frame::DataSetHeader(header) => {
Frame::DataSetHeader(header) => {
self.header.lock().await.replace(header);
}
v2::Frame::DataSetCompletion(completion) => {
Frame::DataSetCompletion(completion) => {
self.completion.lock().await.replace(completion);
}
// TODO: properly handle errors/missing
v2::Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => {
self.query_properties.lock().await.replace(
table
.deserialize_values::<QueryProperties>()
.expect("failed to deserialize query properties"),
);
}
v2::Frame::DataTable(table)
if table.table_kind == TableKind::QueryCompletionInformation =>
{
self.query_completion_information.lock().await.replace(
table
.deserialize_values::<QueryCompletionInformation>()
.expect("failed to deserialize query completion information"),
);
}
v2::Frame::DataTable(table) => {
tx.send(table).await.expect("failed to send table");
}
// TODO - handle errors
v2::Frame::TableHeader(table_header) => {
if let Some(table) = &mut current_table {
table.table_id = table_header.table_id;
table.table_name = table_header.table_name.clone();
table.table_kind = table_header.table_kind;
table.columns = table_header.columns.clone();
Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => {
let mut query_properties = self.query_properties.lock().await;
match table.deserialize_values::<QueryProperties>().ignore_partial_results() {
Ok(v) => {query_properties.replace(v);},
Err(e) => tx.send(e.into()).await?,
}
}
v2::Frame::TableFragment(table_fragment) => {
if let Some(table) = &mut current_table {
table.rows.extend(table_fragment.rows);
Frame::DataTable(table)
if table.table_kind == TableKind::QueryCompletionInformation =>
{
let mut query_completion = self.query_completion_information.lock().await;
match table.deserialize_values::<QueryCompletionInformation>().ignore_partial_results() {
Ok(v) => {query_completion.replace(v);},
Err(e) => tx.send(e.into()).await?,
}
}
Frame::DataTable(table) => {
tx.send(Ok(table)).await?;
}
v2::Frame::TableCompletion(table_completion) => {
if let Some(table) = current_table.take() {
// TODO - handle errors

tx.send(table).await.expect("failed to send table");
}
Frame::TableHeader(table_header) => {
let mut table = current_table.take().ok_or(ParseError::Frame("Table is unexpectedly none".into()))?;
table.table_id = table_header.table_id;
table.table_name = table_header.table_name.clone();
table.table_kind = table_header.table_kind;
table.columns = table_header.columns.clone();
}
Frame::TableFragment(table_fragment) => {
current_table.take().ok_or(ParseError::Frame("TableFragment without TableHeader".into()))?
.rows.extend(table_fragment.rows);
}
Frame::TableCompletion(table_completion) => {
tx.send(
Ok(current_table.take().ok_or(ParseError::Frame("TableCompletion without TableHeader".into()))?)
).await?;
}
Frame::TableProgress(_) => {}
}
}

Ok(())
}
}

Expand Down

0 comments on commit 49b853c

Please sign in to comment.