From f0dddceca75a9530333fb26d52d7bc35833d5f67 Mon Sep 17 00:00:00 2001 From: Krishan Date: Thu, 18 Jan 2024 20:01:17 +0000 Subject: [PATCH] Remove use of anyhow and replace with thiserror, other markups (#7) * Use thiserror * use scopes over explicit use of drop * update syntax * remove std::fmt::Debug * qualify thiserror --- azure-kusto-ingest/Cargo.toml | 2 +- .../examples/ingest_from_blob.rs | 9 ++- azure-kusto-ingest/src/error.rs | 20 +++++ azure-kusto-ingest/src/lib.rs | 1 + azure-kusto-ingest/src/queued_ingest.rs | 18 +---- azure-kusto-ingest/src/resource_manager.rs | 71 ++++++++++++++++- .../resource_manager/authorization_context.rs | 78 ++++++++++++------- .../ingest_client_resources.rs | 74 ++++++++++++------ .../src/resource_manager/resource_uri.rs | 48 ++++++++---- .../src/resource_manager/utils.rs | 9 +-- 10 files changed, 238 insertions(+), 92 deletions(-) create mode 100644 azure-kusto-ingest/src/error.rs diff --git a/azure-kusto-ingest/Cargo.toml b/azure-kusto-ingest/Cargo.toml index 7ed7692..b389b9d 100644 --- a/azure-kusto-ingest/Cargo.toml +++ b/azure-kusto-ingest/Cargo.toml @@ -13,11 +13,11 @@ azure_storage = "0.17" azure_storage_blobs = "0.17" azure_storage_queues = "0.17" -anyhow = "1" chrono = { version = "0.4", default-features = false, features = ["serde"] } rand = "0.8" serde = { version = "1", features = ["serde_derive"] } serde_json = "1" +thiserror = "1" tokio = { version = "1", features = ["full"] } tracing = { version = "0.1", default-features = false, features = ["std"] } url = "2" diff --git a/azure-kusto-ingest/examples/ingest_from_blob.rs b/azure-kusto-ingest/examples/ingest_from_blob.rs index ceae4b9..d69f126 100644 --- a/azure-kusto-ingest/examples/ingest_from_blob.rs +++ b/azure-kusto-ingest/examples/ingest_from_blob.rs @@ -1,6 +1,5 @@ use std::env; -use anyhow::Result; use azure_kusto_data::prelude::{ConnectionString, KustoClient, KustoClientOptions}; use azure_kusto_ingest::data_format::DataFormat; use 
azure_kusto_ingest::descriptors::{BlobAuth, BlobDescriptor}; @@ -16,7 +15,7 @@ use azure_kusto_ingest::queued_ingest::QueuedIngestClient; /// - Permissions for Kusto to access storage /// https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-managed-identity #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> Result<(), Box> { let cluster_ingest_uri = env::var("KUSTO_INGEST_URI").expect("Must define KUSTO_INGEST_URI"); let user_mi_object_id = env::var("KUSTO_USER_MI_OBJECT_ID").expect("Must define KUSTO_USER_MI_OBJECT_ID"); @@ -54,7 +53,9 @@ async fn main() -> Result<()> { let blob_descriptor = BlobDescriptor::new(blob_uri, blob_size, None) .with_blob_auth(BlobAuth::SystemAssignedManagedIdentity); - queued_ingest_client + let _ = queued_ingest_client .ingest_from_blob(blob_descriptor, ingestion_properties) - .await + .await?; + + Ok(()) } diff --git a/azure-kusto-ingest/src/error.rs b/azure-kusto-ingest/src/error.rs new file mode 100644 index 0000000..cfa9462 --- /dev/null +++ b/azure-kusto-ingest/src/error.rs @@ -0,0 +1,20 @@ +//! Defines [Error] for representing failures in various operations. + +/// Error type for kusto ingestion operations. +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// Error raised when failing to obtain ingestion resources. + #[error("Error obtaining ingestion resources: {0}")] + ResourceManagerError(#[from] super::resource_manager::ResourceManagerError), + + /// Error relating to (de-)serialization of JSON data + #[error("Error in JSON serialization/deserialization: {0}")] + JsonError(#[from] serde_json::Error), + + /// Error occurring within core azure crates + #[error("Error in azure-core: {0}")] + AzureError(#[from] azure_core::error::Error), +} + +/// Result type for kusto ingest operations. 
+pub type Result = std::result::Result; diff --git a/azure-kusto-ingest/src/lib.rs b/azure-kusto-ingest/src/lib.rs index eecc220..27eb0cb 100644 --- a/azure-kusto-ingest/src/lib.rs +++ b/azure-kusto-ingest/src/lib.rs @@ -1,6 +1,7 @@ pub mod client_options; pub mod data_format; pub mod descriptors; +pub mod error; pub(crate) mod ingestion_blob_info; pub mod ingestion_properties; pub mod queued_ingest; diff --git a/azure-kusto-ingest/src/queued_ingest.rs b/azure-kusto-ingest/src/queued_ingest.rs index 13e9fff..a732fee 100644 --- a/azure-kusto-ingest/src/queued_ingest.rs +++ b/azure-kusto-ingest/src/queued_ingest.rs @@ -1,11 +1,8 @@ use std::sync::Arc; -use anyhow::Result; +use crate::error::Result; use azure_core::base64; use azure_kusto_data::prelude::KustoClient; -use rand::rngs::StdRng; -use rand::seq::SliceRandom; -use rand::SeedableRng; use tracing::debug; use crate::client_options::QueuedIngestClientOptions; @@ -47,8 +44,8 @@ impl QueuedIngestClient { blob_descriptor: BlobDescriptor, ingestion_properties: IngestionProperties, ) -> Result<()> { - let ingestion_queues = self.resource_manager.ingestion_queues().await?; - debug!("ingestion queues: {:#?}", ingestion_queues); + let queue_client = self.resource_manager.ingestion_queue().await?; + debug!("ingestion queues: {:#?}", queue_client); let auth_context = self.resource_manager.authorization_context().await?; debug!("auth_context: {:#?}\n", auth_context); @@ -57,14 +54,7 @@ impl QueuedIngestClient { QueuedIngestionMessage::new(&blob_descriptor, &ingestion_properties, auth_context); debug!("message: {:#?}\n", message); - // Pick a random queue from the queue clients returned by the resource manager - let mut rng: StdRng = SeedableRng::from_entropy(); - let queue_client = ingestion_queues - .choose(&mut rng) - .ok_or(anyhow::anyhow!("Failed to pick a random queue"))?; - debug!("randomly seeded queue_client: {:#?}\n", queue_client); - - let message = serde_json::to_string(&message).unwrap(); + let message = 
serde_json::to_string(&message)?; debug!("message as string: {}\n", message); // Base64 encode the ingestion message diff --git a/azure-kusto-ingest/src/resource_manager.rs b/azure-kusto-ingest/src/resource_manager.rs index a91c041..24c2f08 100644 --- a/azure-kusto-ingest/src/resource_manager.rs +++ b/azure-kusto-ingest/src/resource_manager.rs @@ -6,7 +6,6 @@ pub mod ingest_client_resources; pub mod resource_uri; pub mod utils; -use anyhow::Result; use azure_kusto_data::prelude::KustoClient; use azure_storage_queues::QueueClient; @@ -18,8 +17,24 @@ use self::{ ingest_client_resources::IngestClientResources, }; +use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng}; + pub const RESOURCE_REFRESH_PERIOD: Duration = Duration::from_secs(60 * 60); +#[derive(Debug, thiserror::Error)] +pub enum ResourceManagerError { + #[error("Failed to obtain ingestion resources: {0}")] + IngestClientResourcesError(#[from] ingest_client_resources::IngestionResourceError), + + #[error("Failed to obtain authorization token: {0}")] + AuthorizationContextError(#[from] authorization_context::KustoIdentityTokenError), + + #[error("Failed to select a resource - no resources found")] + NoResourcesFound, +} + +type Result = std::result::Result; + /// ResourceManager is a struct that keeps track of all the resources required for ingestion using the queued flavour pub struct ResourceManager { ingest_client_resources: Arc, @@ -39,12 +54,62 @@ impl ResourceManager { } /// Returns the latest [QueueClient]s ready for posting ingestion messages to - pub async fn ingestion_queues(&self) -> Result> { + async fn ingestion_queues(&self) -> Result> { Ok(self.ingest_client_resources.get().await?.ingestion_queues) } + /// Returns a [QueueClient] to ingest to. 
+ /// This is a random selection from the list of ingestion queues + pub async fn ingestion_queue(&self) -> Result { + let ingestion_queues = self.ingestion_queues().await?; + let selected_queue = select_random_resource(ingestion_queues)?; + Ok(selected_queue.clone()) + } + /// Returns the latest [KustoIdentityToken] to be added as an authorization context to ingestion messages pub async fn authorization_context(&self) -> Result { - self.authorization_context.get().await + self.authorization_context + .get() + .await + .map_err(ResourceManagerError::AuthorizationContextError) + } +} +/// Selects a random resource from the given list of resources +fn select_random_resource(resources: Vec) -> Result { + let mut rng: StdRng = SeedableRng::from_entropy(); + resources + .choose(&mut rng) + .ok_or(ResourceManagerError::NoResourcesFound) + .cloned() +} + +#[cfg(test)] +mod select_random_resource_tests { + use super::*; + + #[test] + fn single_resource() { + const VALUE: i32 = 1; + let resources = vec![VALUE]; + let selected_resource = select_random_resource(resources).unwrap(); + assert!(selected_resource == VALUE) + } + + #[test] + fn multiple_resources() { + let resources = vec![1, 2, 3, 4, 5]; + let selected_resource = select_random_resource(resources.clone()).unwrap(); + assert!(resources.contains(&selected_resource)); + } + + #[test] + fn no_resources() { + let resources: Vec = vec![]; + let selected_resource = select_random_resource(resources); + assert!(selected_resource.is_err()); + assert!(matches!( + selected_resource.unwrap_err(), + ResourceManagerError::NoResourcesFound + )) } } diff --git a/azure-kusto-ingest/src/resource_manager/authorization_context.rs b/azure-kusto-ingest/src/resource_manager/authorization_context.rs index d8709d6..b27892e 100644 --- a/azure-kusto-ingest/src/resource_manager/authorization_context.rs +++ b/azure-kusto-ingest/src/resource_manager/authorization_context.rs @@ -1,7 +1,7 @@ use std::sync::Arc; -use anyhow::Result; use 
azure_kusto_data::prelude::KustoClient; +use serde_json::Value; use tokio::sync::RwLock; use super::cache::{Cached, ThreadSafeCachedValue}; @@ -10,6 +10,30 @@ use super::RESOURCE_REFRESH_PERIOD; pub(crate) type KustoIdentityToken = String; +const AUTHORIZATION_CONTEXT: &str = "AuthorizationContext"; + +#[derive(thiserror::Error, Debug)] +pub enum KustoIdentityTokenError { + #[error("Kusto expected 1 table in results, found {0}")] + ExpectedOneTable(usize), + + #[error("Kusto expected 1 row in table, found {0}")] + ExpectedOneRow(usize), + + #[error("Column {0} not found in table")] + ColumnNotFound(String), + + #[error("Invalid JSON response from Kusto: {0:?}")] + InvalidJSONResponse(Value), + + #[error("Token is empty")] + EmptyToken, + + #[error(transparent)] + KustoError(#[from] azure_kusto_data::error::Error), +} + +type Result = std::result::Result; /// Logic to obtain a Kusto identity token from the management endpoint. This auth token is a temporary token #[derive(Debug, Clone)] pub(crate) struct AuthorizationContext { @@ -38,38 +62,36 @@ impl AuthorizationContext { let table = match &results.tables[..] { [a] => a, _ => { - return Err(anyhow::anyhow!( - "Kusto Expected 1 table in results, found {}", - results.tables.len() + return Err(KustoIdentityTokenError::ExpectedOneTable( + results.tables.len(), )) } }; // Check that a column in this table actually exists called `AuthorizationContext` - let index = get_column_index(table, "AuthorizationContext")?; + let index = get_column_index(table, AUTHORIZATION_CONTEXT).ok_or( + KustoIdentityTokenError::ColumnNotFound(AUTHORIZATION_CONTEXT.into()), + )?; // Check that there is only 1 row in the table, and that the value in the first row at the given index is not empty let token = match &table.rows[..] 
{ - [row] => row.get(index).ok_or(anyhow::anyhow!( - "Kusto response did not contain a value in the first row at position {}", - index - ))?, - _ => { - return Err(anyhow::anyhow!( - "Kusto expected 1 row in results, found {}", - table.rows.len() - )) - } + [row] => row + .get(index) + .ok_or(KustoIdentityTokenError::ColumnNotFound( + AUTHORIZATION_CONTEXT.into(), + ))?, + _ => return Err(KustoIdentityTokenError::ExpectedOneRow(table.rows.len())), }; // Convert the JSON string into a Rust string - let token = token.as_str().ok_or(anyhow::anyhow!( - "Kusto response did not contain a string value: {:?}", - token - ))?; + let token = token + .as_str() + .ok_or(KustoIdentityTokenError::InvalidJSONResponse( + token.to_owned(), + ))?; if token.chars().all(char::is_whitespace) { - return Err(anyhow::anyhow!("Kusto identity token is empty")); + return Err(KustoIdentityTokenError::EmptyToken); } Ok(token.to_string()) @@ -77,15 +99,17 @@ impl AuthorizationContext { /// Fetches the latest Kusto identity token, either retrieving from cache if valid, or by executing a KQL query pub(crate) async fn get(&self) -> Result { - // Attempt to get the token from the cache - let token_cache = self.token_cache.read().await; - if !token_cache.is_expired() { - if let Some(token) = token_cache.get() { - return Ok(token.clone()); + // first, try to get the token from the cache by obtaining a read lock + { + let token_cache = self.token_cache.read().await; + if !token_cache.is_expired() { + if let Some(token) = token_cache.get() { + return Ok(token.clone()); + } } } - // Drop the read lock and get a write lock to refresh the token - drop(token_cache); + + // obtain a write lock to refresh the token let mut token_cache = self.token_cache.write().await; // Again attempt to return from cache, check is done in case another thread diff --git a/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs b/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs index 
d1a85d2..c59e4aa 100644 --- a/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs +++ b/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs @@ -5,16 +5,44 @@ use crate::client_options::QueuedIngestClientOptions; use super::{ cache::{Cached, ThreadSafeCachedValue}, resource_uri::{ClientFromResourceUri, ResourceUri}, - utils::get_column_index, - RESOURCE_REFRESH_PERIOD, + utils, RESOURCE_REFRESH_PERIOD, }; -use anyhow::Result; use azure_core::ClientOptions; use azure_kusto_data::{models::TableV1, prelude::KustoClient}; use azure_storage_blobs::prelude::ContainerClient; use azure_storage_queues::QueueClient; +use serde_json::Value; use tokio::sync::RwLock; +#[derive(Debug, thiserror::Error)] +pub enum IngestionResourceError { + #[error("{column_name} column is missing in the table")] + ColumnNotFoundError { column_name: String }, + + #[error("Response returned from Kusto could not be parsed as a string: {0}")] + ParseAsStringError(Value), + + #[error("No {0} resources found in the table")] + NoResourcesFound(String), + + #[error(transparent)] + KustoError(#[from] azure_kusto_data::error::Error), + + #[error(transparent)] + ResourceUriError(#[from] super::resource_uri::ResourceUriError), + + #[error("Kusto expected a table containing ingestion resource results, found no tables")] + NoTablesFound, +} + +type Result = std::result::Result; + +fn get_column_index(table: &TableV1, column_name: &str) -> Result { + utils::get_column_index(table, column_name).ok_or(IngestionResourceError::ColumnNotFoundError { + column_name: column_name.to_string(), + }) +} + /// Helper to get a resource URI from a table, erroring if there are no resources of the given name fn get_resource_by_name(table: &TableV1, resource_name: String) -> Result> { let storage_root_index = get_column_index(table, "StorageRoot")?; @@ -25,18 +53,15 @@ fn get_resource_by_name(table: &TableV1, resource_name: String) -> Result for InnerIngestClientResources { - type Error = 
anyhow::Error; + type Error = IngestionResourceError; /// Attempts to create a new InnerIngestClientResources from the given [TableV1] and [QueuedIngestClientOptions] - fn try_from((table, client_options): (&TableV1, &QueuedIngestClientOptions)) -> Result { + fn try_from( + (table, client_options): (&TableV1, &QueuedIngestClientOptions), + ) -> std::result::Result { let secured_ready_for_aggregation_queues = get_resource_by_name(table, "SecuredReadyForAggregationQueue".to_string())?; let temp_storage = get_resource_by_name(table, "TempStorage".to_string())?; @@ -104,24 +131,27 @@ impl IngestClientResources { .execute_command("NetDefaultDB", ".get ingestion resources", None) .await?; - let new_resources = results.tables.first().ok_or(anyhow::anyhow!( - "Kusto expected a table containing ingestion resource results, found no tables", - ))?; + let new_resources = results + .tables + .first() + .ok_or(IngestionResourceError::NoTablesFound)?; InnerIngestClientResources::try_from((new_resources, &self.client_options)) } /// Gets the latest resources either from cache, or fetching from Kusto and updating the cached resources pub async fn get(&self) -> Result { - let resources = self.resources.read().await; - if !resources.is_expired() { - if let Some(inner_value) = resources.get() { - return Ok(inner_value.clone()); + // first, try to get the resources from the cache by obtaining a read lock + { + let resources = self.resources.read().await; + if !resources.is_expired() { + if let Some(inner_value) = resources.get() { + return Ok(inner_value.clone()); + } } } - // otherwise, drop the read lock and get a write lock to refresh the kusto response - drop(resources); + // obtain a write lock to refresh the kusto response let mut resources = self.resources.write().await; // check again in case another thread refreshed while we were waiting on the write lock diff --git a/azure-kusto-ingest/src/resource_manager/resource_uri.rs 
b/azure-kusto-ingest/src/resource_manager/resource_uri.rs index 41f2287..f41d1ff 100644 --- a/azure-kusto-ingest/src/resource_manager/resource_uri.rs +++ b/azure-kusto-ingest/src/resource_manager/resource_uri.rs @@ -4,7 +4,23 @@ use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient}; use azure_storage_queues::{QueueClient, QueueServiceClientBuilder}; use url::Url; -use anyhow::Result; +#[derive(Debug, thiserror::Error)] +pub enum ResourceUriError { + #[error("URI scheme must be 'https', was '{0}'")] + InvalidScheme(String), + + #[error("Object name is missing in the URI")] + MissingObjectName, + + #[error("SAS token is missing in the URI as a query parameter")] + MissingSasToken, + + #[error(transparent)] + ParseError(#[from] url::ParseError), + + #[error(transparent)] + AzureError(#[from] azure_core::Error), +} /// Parsing logic of resource URIs as returned by the Kusto management endpoint #[derive(Debug, Clone)] @@ -15,18 +31,14 @@ pub(crate) struct ResourceUri { } impl TryFrom<&str> for ResourceUri { - type Error = anyhow::Error; + type Error = ResourceUriError; - fn try_from(uri: &str) -> Result { + fn try_from(uri: &str) -> Result { let parsed_uri = Url::parse(uri)?; let scheme = match parsed_uri.scheme() { "https" => "https".to_string(), - other_scheme => { - return Err(anyhow::anyhow!( - "URI scheme must be 'https', was '{other_scheme}'" - )) - } + other_scheme => return Err(ResourceUriError::InvalidScheme(other_scheme.to_string())), }; let service_uri = scheme @@ -36,17 +48,13 @@ impl TryFrom<&str> for ResourceUri { .expect("Url::parse should always return a host for a URI"); let object_name = match parsed_uri.path().trim_start().trim_start_matches('/') { - "" => return Err(anyhow::anyhow!("Object name is missing in the URI")), + "" => return Err(ResourceUriError::MissingObjectName), name => name.to_string(), }; let sas_token = match parsed_uri.query() { Some(query) => query.to_string(), - None => { - return Err(anyhow::anyhow!( - "SAS token is 
missing in the URI as a query parameter" - )) - } + None => return Err(ResourceUriError::MissingSasToken), }; let sas_token = StorageCredentials::sas_token(sas_token)?; @@ -138,6 +146,10 @@ mod tests { println!("{:#?}", resource_uri); assert!(resource_uri.is_err()); + assert!(matches!( + resource_uri.unwrap_err(), + ResourceUriError::ParseError(_) + )); } #[test] @@ -147,6 +159,10 @@ mod tests { println!("{:#?}", resource_uri); assert!(resource_uri.is_err()); + assert!(matches!( + resource_uri.unwrap_err(), + ResourceUriError::MissingObjectName + )); } #[test] @@ -156,6 +172,10 @@ mod tests { println!("{:#?}", resource_uri); assert!(resource_uri.is_err()); + assert!(matches!( + resource_uri.unwrap_err(), + ResourceUriError::MissingSasToken + )); } #[test] diff --git a/azure-kusto-ingest/src/resource_manager/utils.rs b/azure-kusto-ingest/src/resource_manager/utils.rs index 62a7167..ebf5616 100644 --- a/azure-kusto-ingest/src/resource_manager/utils.rs +++ b/azure-kusto-ingest/src/resource_manager/utils.rs @@ -1,15 +1,10 @@ -use anyhow::Result; use azure_kusto_data::models::TableV1; /// Helper to get a column index from a table -// TODO: this could be moved upstream into Kusto Data - would likely result in a change to the API of this function to return an Option -pub fn get_column_index(table: &TableV1, column_name: &str) -> Result { +// TODO: this could be moved upstream into Kusto Data +pub fn get_column_index(table: &TableV1, column_name: &str) -> Option { table .columns .iter() .position(|c| c.column_name == column_name) - .ok_or(anyhow::anyhow!( - "{} column is missing in the table", - column_name - )) }