Skip to content

Commit

Permalink
Remove use of anyhow and replace with thiserror, other markups (#7)
Browse files Browse the repository at this point in the history
* Use thiserror

* use scopes over explicit use of drop

* update syntax

* remove std::fmt::Debug

* qualify thiserror
  • Loading branch information
krishanjmistry authored Jan 18, 2024
1 parent 6145aad commit f0dddce
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 92 deletions.
2 changes: 1 addition & 1 deletion azure-kusto-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ azure_storage = "0.17"
azure_storage_blobs = "0.17"
azure_storage_queues = "0.17"

anyhow = "1"
chrono = { version = "0.4", default-features = false, features = ["serde"] }
rand = "0.8"
serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["full"] }
tracing = { version = "0.1", default-features = false, features = ["std"] }
url = "2"
Expand Down
9 changes: 5 additions & 4 deletions azure-kusto-ingest/examples/ingest_from_blob.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::env;

use anyhow::Result;
use azure_kusto_data::prelude::{ConnectionString, KustoClient, KustoClientOptions};
use azure_kusto_ingest::data_format::DataFormat;
use azure_kusto_ingest::descriptors::{BlobAuth, BlobDescriptor};
Expand All @@ -16,7 +15,7 @@ use azure_kusto_ingest::queued_ingest::QueuedIngestClient;
/// - Permissions for Kusto to access storage
/// https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-managed-identity
#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cluster_ingest_uri = env::var("KUSTO_INGEST_URI").expect("Must define KUSTO_INGEST_URI");
let user_mi_object_id =
env::var("KUSTO_USER_MI_OBJECT_ID").expect("Must define KUSTO_USER_MI_OBJECT_ID");
Expand Down Expand Up @@ -54,7 +53,9 @@ async fn main() -> Result<()> {
let blob_descriptor = BlobDescriptor::new(blob_uri, blob_size, None)
.with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);

queued_ingest_client
let _ = queued_ingest_client
.ingest_from_blob(blob_descriptor, ingestion_properties)
.await
.await?;

Ok(())
}
20 changes: 20 additions & 0 deletions azure-kusto-ingest/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
//! Defines [Error] for representing failures in various operations.
/// Error type for kusto ingestion operations.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error raised when failing to obtain ingestion resources.
#[error("Error obtaining ingestion resources: {0}")]
ResourceManagerError(#[from] super::resource_manager::ResourceManagerError),

/// Error relating to (de-)serialization of JSON data
#[error("Error in JSON serialization/deserialization: {0}")]
JsonError(#[from] serde_json::Error),

/// Error occurring within core azure crates
#[error("Error in azure-core: {0}")]
AzureError(#[from] azure_core::error::Error),
}

/// Result type for kusto ingest operations.
pub type Result<T> = std::result::Result<T, Error>;
1 change: 1 addition & 0 deletions azure-kusto-ingest/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod client_options;
pub mod data_format;
pub mod descriptors;
pub mod error;
pub(crate) mod ingestion_blob_info;
pub mod ingestion_properties;
pub mod queued_ingest;
Expand Down
18 changes: 4 additions & 14 deletions azure-kusto-ingest/src/queued_ingest.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use std::sync::Arc;

use anyhow::Result;
use crate::error::Result;
use azure_core::base64;
use azure_kusto_data::prelude::KustoClient;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use tracing::debug;

use crate::client_options::QueuedIngestClientOptions;
Expand Down Expand Up @@ -47,8 +44,8 @@ impl QueuedIngestClient {
blob_descriptor: BlobDescriptor,
ingestion_properties: IngestionProperties,
) -> Result<()> {
let ingestion_queues = self.resource_manager.ingestion_queues().await?;
debug!("ingestion queues: {:#?}", ingestion_queues);
let queue_client = self.resource_manager.ingestion_queue().await?;
debug!("ingestion queues: {:#?}", queue_client);

let auth_context = self.resource_manager.authorization_context().await?;
debug!("auth_context: {:#?}\n", auth_context);
Expand All @@ -57,14 +54,7 @@ impl QueuedIngestClient {
QueuedIngestionMessage::new(&blob_descriptor, &ingestion_properties, auth_context);
debug!("message: {:#?}\n", message);

// Pick a random queue from the queue clients returned by the resource manager
let mut rng: StdRng = SeedableRng::from_entropy();
let queue_client = ingestion_queues
.choose(&mut rng)
.ok_or(anyhow::anyhow!("Failed to pick a random queue"))?;
debug!("randomly seeded queue_client: {:#?}\n", queue_client);

let message = serde_json::to_string(&message).unwrap();
let message = serde_json::to_string(&message)?;
debug!("message as string: {}\n", message);

// Base64 encode the ingestion message
Expand Down
71 changes: 68 additions & 3 deletions azure-kusto-ingest/src/resource_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub mod ingest_client_resources;
pub mod resource_uri;
pub mod utils;

use anyhow::Result;
use azure_kusto_data::prelude::KustoClient;

use azure_storage_queues::QueueClient;
Expand All @@ -18,8 +17,24 @@ use self::{
ingest_client_resources::IngestClientResources,
};

use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};

pub const RESOURCE_REFRESH_PERIOD: Duration = Duration::from_secs(60 * 60);

#[derive(Debug, thiserror::Error)]
pub enum ResourceManagerError {
#[error("Failed to obtain ingestion resources: {0}")]
IngestClientResourcesError(#[from] ingest_client_resources::IngestionResourceError),

#[error("Failed to obtain authorization token: {0}")]
AuthorizationContextError(#[from] authorization_context::KustoIdentityTokenError),

#[error("Failed to select a resource - no resources found")]
NoResourcesFound,
}

type Result<T> = std::result::Result<T, ResourceManagerError>;

/// ResourceManager is a struct that keeps track of all the resources required for ingestion using the queued flavour
pub struct ResourceManager {
ingest_client_resources: Arc<IngestClientResources>,
Expand All @@ -39,12 +54,62 @@ impl ResourceManager {
}

/// Returns the latest [QueueClient]s ready for posting ingestion messages to
pub async fn ingestion_queues(&self) -> Result<Vec<QueueClient>> {
async fn ingestion_queues(&self) -> Result<Vec<QueueClient>> {
Ok(self.ingest_client_resources.get().await?.ingestion_queues)
}

/// Returns a [QueueClient] to ingest to.
/// This is a random selection from the list of ingestion queues
pub async fn ingestion_queue(&self) -> Result<QueueClient> {
let ingestion_queues = self.ingestion_queues().await?;
let selected_queue = select_random_resource(ingestion_queues)?;
Ok(selected_queue.clone())
}

/// Returns the latest [KustoIdentityToken] to be added as an authorization context to ingestion messages
pub async fn authorization_context(&self) -> Result<KustoIdentityToken> {
self.authorization_context.get().await
self.authorization_context
.get()
.await
.map_err(ResourceManagerError::AuthorizationContextError)
}
}
/// Selects a random resource from the given list of resources
fn select_random_resource<T: Clone>(resources: Vec<T>) -> Result<T> {
let mut rng: StdRng = SeedableRng::from_entropy();
resources
.choose(&mut rng)
.ok_or(ResourceManagerError::NoResourcesFound)
.cloned()
}

#[cfg(test)]
mod select_random_resource_tests {
use super::*;

#[test]
fn single_resource() {
const VALUE: i32 = 1;
let resources = vec![VALUE];
let selected_resource = select_random_resource(resources).unwrap();
assert!(selected_resource == VALUE)
}

#[test]
fn multiple_resources() {
let resources = vec![1, 2, 3, 4, 5];
let selected_resource = select_random_resource(resources.clone()).unwrap();
assert!(resources.contains(&selected_resource));
}

#[test]
fn no_resources() {
let resources: Vec<i32> = vec![];
let selected_resource = select_random_resource(resources);
assert!(selected_resource.is_err());
assert!(matches!(
selected_resource.unwrap_err(),
ResourceManagerError::NoResourcesFound
))
}
}
78 changes: 51 additions & 27 deletions azure-kusto-ingest/src/resource_manager/authorization_context.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use anyhow::Result;
use azure_kusto_data::prelude::KustoClient;
use serde_json::Value;
use tokio::sync::RwLock;

use super::cache::{Cached, ThreadSafeCachedValue};
Expand All @@ -10,6 +10,30 @@ use super::RESOURCE_REFRESH_PERIOD;

pub(crate) type KustoIdentityToken = String;

const AUTHORIZATION_CONTEXT: &str = "AuthorizationContext";

#[derive(thiserror::Error, Debug)]
pub enum KustoIdentityTokenError {
#[error("Kusto expected 1 table in results, found {0}")]
ExpectedOneTable(usize),

#[error("Kusto expected 1 row in table, found {0}")]
ExpectedOneRow(usize),

#[error("Column {0} not found in table")]
ColumnNotFound(String),

#[error("Invalid JSON response from Kusto: {0:?}")]
InvalidJSONResponse(Value),

#[error("Token is empty")]
EmptyToken,

#[error(transparent)]
KustoError(#[from] azure_kusto_data::error::Error),
}

type Result<T> = std::result::Result<T, KustoIdentityTokenError>;
/// Logic to obtain a Kusto identity token from the management endpoint. This auth token is a temporary token
#[derive(Debug, Clone)]
pub(crate) struct AuthorizationContext {
Expand Down Expand Up @@ -38,54 +62,54 @@ impl AuthorizationContext {
let table = match &results.tables[..] {
[a] => a,
_ => {
return Err(anyhow::anyhow!(
"Kusto Expected 1 table in results, found {}",
results.tables.len()
return Err(KustoIdentityTokenError::ExpectedOneTable(
results.tables.len(),
))
}
};

// Check that a column in this table actually exists called `AuthorizationContext`
let index = get_column_index(table, "AuthorizationContext")?;
let index = get_column_index(table, AUTHORIZATION_CONTEXT).ok_or(
KustoIdentityTokenError::ColumnNotFound(AUTHORIZATION_CONTEXT.into()),
)?;

// Check that there is only 1 row in the table, and that the value in the first row at the given index is not empty
let token = match &table.rows[..] {
[row] => row.get(index).ok_or(anyhow::anyhow!(
"Kusto response did not contain a value in the first row at position {}",
index
))?,
_ => {
return Err(anyhow::anyhow!(
"Kusto expected 1 row in results, found {}",
table.rows.len()
))
}
[row] => row
.get(index)
.ok_or(KustoIdentityTokenError::ColumnNotFound(
AUTHORIZATION_CONTEXT.into(),
))?,
_ => return Err(KustoIdentityTokenError::ExpectedOneRow(table.rows.len())),
};

// Convert the JSON string into a Rust string
let token = token.as_str().ok_or(anyhow::anyhow!(
"Kusto response did not contain a string value: {:?}",
token
))?;
let token = token
.as_str()
.ok_or(KustoIdentityTokenError::InvalidJSONResponse(
token.to_owned(),
))?;

if token.chars().all(char::is_whitespace) {
return Err(anyhow::anyhow!("Kusto identity token is empty"));
return Err(KustoIdentityTokenError::EmptyToken);
}

Ok(token.to_string())
}

/// Fetches the latest Kusto identity token, either retrieving from cache if valid, or by executing a KQL query
pub(crate) async fn get(&self) -> Result<KustoIdentityToken> {
// Attempt to get the token from the cache
let token_cache = self.token_cache.read().await;
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
// first, try to get the resources from the cache by obtaining a read lock
{
let token_cache = self.token_cache.read().await;
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
}
}
}
// Drop the read lock and get a write lock to refresh the token
drop(token_cache);

// obtain a write lock to refresh the kusto response
let mut token_cache = self.token_cache.write().await;

// Again attempt to return from cache, check is done in case another thread
Expand Down
Loading

0 comments on commit f0dddce

Please sign in to comment.