Skip to content

Commit

Permalink
Format Rust code using rustfmt
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions[bot] authored Jan 4, 2024
1 parent 71aaff7 commit 26ac4e3
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 42 deletions.
6 changes: 5 additions & 1 deletion azure-kusto-data/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,11 @@ impl KustoClient {
.rows
.into_iter()
.map(Row::into_result)
.map(|r| r.and_then(|v| serde_json::from_value::<T>(serde_json::Value::Array(v)).map_err(Error::from)))
.map(|r| {
r.and_then(|v| {
serde_json::from_value::<T>(serde_json::Value::Array(v)).map_err(Error::from)
})
})
.collect::<Result<Vec<T>>>()?;

Ok(results)
Expand Down
36 changes: 25 additions & 11 deletions azure-kusto-data/src/models/test_helpers.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
use crate::models::ColumnType;
use crate::models::v2::{Column, DataSetCompletion, DataSetHeader, DataTable, Frame, OneApiError, OneApiErrors, Row, TableCompletion, TableFragment, TableFragmentType, TableHeader, TableKind};
use crate::models::v2::ErrorReportingPlacement::EndOfTable;
use crate::models::v2::{
Column, DataSetCompletion, DataSetHeader, DataTable, Frame, OneApiError, OneApiErrors, Row,
TableCompletion, TableFragment, TableFragmentType, TableHeader, TableKind,
};
use crate::models::ColumnType;

const V2_VALID_FRAMES: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/validFrames.json"));
const V2_TWO_TABLES: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/twoTables.json"));
const V2_PARTIAL_ERROR: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/partialError.json"));
const V2_PARTIAL_ERROR_FULL_DATASET: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/inputs/v2/partialErrorFullDataset.json"));

const V2_VALID_FRAMES: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/inputs/v2/validFrames.json"
));
const V2_TWO_TABLES: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/inputs/v2/twoTables.json"
));
const V2_PARTIAL_ERROR: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/inputs/v2/partialError.json"
));
const V2_PARTIAL_ERROR_FULL_DATASET: &str = include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/inputs/v2/partialErrorFullDataset.json"
));

fn expected_v2_valid_frames() -> Vec<Frame> {
vec![
Expand Down Expand Up @@ -205,7 +219,6 @@ fn expected_v2_valid_frames() -> Vec<Frame> {
]
}


fn expected_v2_two_tables() -> Vec<Frame> {
vec![
Frame::DataSetHeader(DataSetHeader {
Expand Down Expand Up @@ -623,17 +636,18 @@ fn expected_v2_partial_error_full_dataset() -> Vec<Frame> {
]
}


pub fn v2_files_full() -> Vec<(&'static str, Vec<Frame>)> {
vec![
(V2_VALID_FRAMES, expected_v2_valid_frames()),
(V2_TWO_TABLES, expected_v2_two_tables()),
(V2_PARTIAL_ERROR, expected_v2_partial_error()),
(V2_PARTIAL_ERROR_FULL_DATASET, expected_v2_partial_error_full_dataset()),
(
V2_PARTIAL_ERROR_FULL_DATASET,
expected_v2_partial_error_full_dataset(),
),
]
}


pub fn v2_files_iterative() -> Vec<(&'static str, Vec<Frame>)> {
vec![
(V2_VALID_FRAMES, expected_v2_valid_frames()),
Expand Down
2 changes: 1 addition & 1 deletion azure-kusto-data/src/models/v2/known_tables.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{KustoDateTime, KustoDynamic, KustoGuid, KustoInt, KustoString};
use serde::{Deserialize, Serialize};
use crate::{KustoInt, KustoString, KustoDateTime, KustoDynamic, KustoGuid};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
Expand Down
13 changes: 9 additions & 4 deletions azure-kusto-data/src/models/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ mod errors;
mod frames;
mod known_tables;

use crate::error::{Error, Partial};
pub use consts::*;
pub use errors::*;
pub use frames::*;
pub use known_tables::*;
use crate::error::{Error, Partial};

/// A result of a V2 query.
/// Could be a table, a part of a table, or metadata about the dataset.
Expand Down Expand Up @@ -57,7 +57,12 @@ impl Into<Result<Vec<serde_json::Value>, Error>> for Row {
fn into(self) -> Result<Vec<serde_json::Value>, Error> {
match self {
Row::Values(v) => Ok(v),
Row::Error(e) => Err(e.errors.into_iter().map(Error::QueryApiError).collect::<Vec<_>>().into()),
Row::Error(e) => Err(e
.errors
.into_iter()
.map(Error::QueryApiError)
.collect::<Vec<_>>()
.into()),
}
}
}
Expand All @@ -78,7 +83,7 @@ impl DataTable {
Err(e) => match e {
Error::MultipleErrors(e) => errors.extend(e),
_ => errors.push(e),
}
},
}
}
match (values.len(), errors.len()) {
Expand All @@ -100,7 +105,7 @@ impl DataTable {
Err(e) => match e {
Error::MultipleErrors(e) => errors.extend(e),
_ => errors.push(e),
}
},
}
}

Expand Down
63 changes: 39 additions & 24 deletions azure-kusto-data/src/operations/v2.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::sync::Arc;
use crate::error::{Error::JsonError, Result};
use crate::models::v2;
use futures::{stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream, StreamExt, TryStreamExt, pin_mut};
use crate::models::v2::{DataTable, Frame, QueryCompletionInformation, QueryProperties, TableKind};
use futures::lock::Mutex;
use futures::{
pin_mut, stream, AsyncBufRead, AsyncBufReadExt, AsyncReadExt, Stream, StreamExt, TryStreamExt,
};
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use crate::models::v2::{DataTable, Frame, QueryCompletionInformation, QueryProperties, TableKind};

pub fn parse_frames_iterative(
reader: impl AsyncBufRead + Unpin,
Expand Down Expand Up @@ -38,22 +40,21 @@ pub async fn parse_frames_full(
return Ok(serde_json::from_slice(&buf)?);
}


/// Arc Mutex
type M<T> = Arc<Mutex<T>>;
/// Arc Mutex Option
type OM<T> = M<Option<T>>;

struct StreamingDataset {
header : OM<v2::DataSetHeader>,
completion : OM<v2::DataSetCompletion>,
query_properties : OM<Vec<QueryProperties>>,
query_completion_information : OM<Vec<QueryCompletionInformation>>,
results : Receiver<DataTable>
header: OM<v2::DataSetHeader>,
completion: OM<v2::DataSetCompletion>,
query_properties: OM<Vec<QueryProperties>>,
query_completion_information: OM<Vec<QueryCompletionInformation>>,
results: Receiver<DataTable>,
}

impl StreamingDataset {
fn new(stream: impl Stream<Item=Result<Frame>> + Send + 'static) -> Arc<Self> {
fn new(stream: impl Stream<Item = Result<Frame>> + Send + 'static) -> Arc<Self> {
let (tx, rx) = tokio::sync::mpsc::channel(1);
let res = StreamingDataset {
header: Arc::new(Mutex::new(None)),
Expand All @@ -72,7 +73,11 @@ impl StreamingDataset {
res
}

async fn populate_with_stream(&self, stream: impl Stream<Item = Result<Frame>>, tx: Sender<DataTable>) {
async fn populate_with_stream(
&self,
stream: impl Stream<Item = Result<Frame>>,
tx: Sender<DataTable>,
) {
pin_mut!(stream);

let mut current_table = Some(DataTable {
Expand All @@ -89,20 +94,30 @@ impl StreamingDataset {
match frame {
v2::Frame::DataSetHeader(header) => {
self.header.lock().await.replace(header);
},
}
v2::Frame::DataSetCompletion(completion) => {
self.completion.lock().await.replace(completion);
},
}
// TODO: properly handle errors/missing
v2::Frame::DataTable(table) if table.table_kind == TableKind::QueryProperties => {
self.query_properties.lock().await.replace(table.deserialize_values::<QueryProperties>().expect("failed to deserialize query properties"));
},
v2::Frame::DataTable(table) if table.table_kind == TableKind::QueryCompletionInformation => {
self.query_completion_information.lock().await.replace(table.deserialize_values::<QueryCompletionInformation>().expect("failed to deserialize query completion information"));
},
self.query_properties.lock().await.replace(
table
.deserialize_values::<QueryProperties>()
.expect("failed to deserialize query properties"),
);
}
v2::Frame::DataTable(table)
if table.table_kind == TableKind::QueryCompletionInformation =>
{
self.query_completion_information.lock().await.replace(
table
.deserialize_values::<QueryCompletionInformation>()
.expect("failed to deserialize query completion information"),
);
}
v2::Frame::DataTable(table) => {
tx.send(table).await.expect("failed to send table");
},
}
// TODO - handle errors
v2::Frame::TableHeader(table_header) => {
if let Some(table) = &mut current_table {
Expand Down Expand Up @@ -134,13 +149,13 @@ impl StreamingDataset {

#[cfg(test)]
mod tests {
use crate::models::test_helpers::{v2_files_full, v2_files_iterative};
use futures::io::Cursor;
use futures::StreamExt;
use crate::models::test_helpers::{v2_files_full, v2_files_iterative};

#[tokio::test]
async fn test_parse_frames_full() {
for (contents ,frames) in v2_files_full() {
for (contents, frames) in v2_files_full() {
println!("testing: {}", contents);
let reader = Cursor::new(contents.as_bytes());
let parsed_frames = super::parse_frames_full(reader).await.unwrap();
Expand All @@ -150,14 +165,14 @@ mod tests {

#[tokio::test]
async fn test_parse_frames_iterative() {
for (contents ,frames) in v2_files_iterative() {
for (contents, frames) in v2_files_iterative() {
println!("testing: {}", contents);
let reader = Cursor::new(contents.as_bytes());
let parsed_frames = super::parse_frames_iterative(reader)
.map(|f| f.expect("failed to parse frame"))
.collect::<Vec<_>>().await;
.collect::<Vec<_>>()
.await;
assert_eq!(parsed_frames, frames);
}
}
}

1 change: 0 additions & 1 deletion azure-kusto-data/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ enum KustoValue {
Dynamic(KustoDynamic),
}


impl FromStr for KustoString {
type Err = Infallible;

Expand Down

0 comments on commit 26ac4e3

Please sign in to comment.