Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove use of anyhow and replace with thiserror, other markups #7

Merged
merged 5 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion azure-kusto-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ azure_storage = "0.17"
azure_storage_blobs = "0.17"
azure_storage_queues = "0.17"

anyhow = "1"
chrono = { version = "0.4", default-features = false, features = ["serde"] }
krishanjmistry marked this conversation as resolved.
Show resolved Hide resolved
rand = "0.8"
serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["full"] }
tracing = { version = "0.1", default-features = false, features = ["std"] }
url = "2"
Expand Down
9 changes: 5 additions & 4 deletions azure-kusto-ingest/examples/ingest_from_blob.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::env;

use anyhow::Result;
use azure_kusto_data::prelude::{ConnectionString, KustoClient, KustoClientOptions};
use azure_kusto_ingest::data_format::DataFormat;
use azure_kusto_ingest::descriptors::{BlobAuth, BlobDescriptor};
Expand All @@ -16,7 +15,7 @@ use azure_kusto_ingest::queued_ingest::QueuedIngestClient;
/// - Permissions for Kusto to access storage
/// https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-managed-identity
#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cluster_ingest_uri = env::var("KUSTO_INGEST_URI").expect("Must define KUSTO_INGEST_URI");
let user_mi_object_id =
env::var("KUSTO_USER_MI_OBJECT_ID").expect("Must define KUSTO_USER_MI_OBJECT_ID");
Expand Down Expand Up @@ -54,7 +53,9 @@ async fn main() -> Result<()> {
let blob_descriptor = BlobDescriptor::new(blob_uri, blob_size, None)
.with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);

queued_ingest_client
let _ = queued_ingest_client
.ingest_from_blob(blob_descriptor, ingestion_properties)
.await
.await?;

Ok(())
}
23 changes: 23 additions & 0 deletions azure-kusto-ingest/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
//! Defines [Error] for representing failures in various operations.
use std::fmt::Debug;
krishanjmistry marked this conversation as resolved.
Show resolved Hide resolved

use thiserror;

/// Error type for kusto ingestion operations.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error raised when failing to obtain ingestion resources.
#[error("Error obtaining ingestion resources: {0}")]
ResourceManagerError(#[from] super::resource_manager::ResourceManagerError),

/// Error relating to (de-)serialization of JSON data
#[error("Error in JSON serialization/deserialization: {0}")]
JsonError(#[from] serde_json::Error),

/// Error occurring within core azure crates
#[error("Error in azure-core: {0}")]
AzureError(#[from] azure_core::error::Error),
}

/// Result type for kusto ingest operations.
pub type Result<T> = std::result::Result<T, Error>;
1 change: 1 addition & 0 deletions azure-kusto-ingest/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod client_options;
pub mod data_format;
pub mod descriptors;
pub mod error;
pub(crate) mod ingestion_blob_info;
pub mod ingestion_properties;
pub mod queued_ingest;
Expand Down
18 changes: 4 additions & 14 deletions azure-kusto-ingest/src/queued_ingest.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use std::sync::Arc;

use anyhow::Result;
use crate::error::Result;
use azure_core::base64;
use azure_kusto_data::prelude::KustoClient;
use rand::rngs::StdRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use tracing::debug;

use crate::client_options::QueuedIngestClientOptions;
Expand Down Expand Up @@ -47,8 +44,8 @@ impl QueuedIngestClient {
blob_descriptor: BlobDescriptor,
ingestion_properties: IngestionProperties,
) -> Result<()> {
let ingestion_queues = self.resource_manager.ingestion_queues().await?;
debug!("ingestion queues: {:#?}", ingestion_queues);
let queue_client = self.resource_manager.ingestion_queue().await?;
debug!("ingestion queues: {:#?}", queue_client);

let auth_context = self.resource_manager.authorization_context().await?;
debug!("auth_context: {:#?}\n", auth_context);
Expand All @@ -57,14 +54,7 @@ impl QueuedIngestClient {
QueuedIngestionMessage::new(&blob_descriptor, &ingestion_properties, auth_context);
debug!("message: {:#?}\n", message);

// Pick a random queue from the queue clients returned by the resource manager
let mut rng: StdRng = SeedableRng::from_entropy();
let queue_client = ingestion_queues
.choose(&mut rng)
.ok_or(anyhow::anyhow!("Failed to pick a random queue"))?;
debug!("randomly seeded queue_client: {:#?}\n", queue_client);

let message = serde_json::to_string(&message).unwrap();
let message = serde_json::to_string(&message)?;
debug!("message as string: {}\n", message);

// Base64 encode the ingestion message
Expand Down
72 changes: 69 additions & 3 deletions azure-kusto-ingest/src/resource_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ pub mod ingest_client_resources;
pub mod resource_uri;
pub mod utils;

use anyhow::Result;
use azure_kusto_data::prelude::KustoClient;

use azure_storage_queues::QueueClient;
use thiserror::Error;

use crate::client_options::QueuedIngestClientOptions;

Expand All @@ -18,8 +18,24 @@ use self::{
ingest_client_resources::IngestClientResources,
};

use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};

pub const RESOURCE_REFRESH_PERIOD: Duration = Duration::from_secs(60 * 60);

#[derive(Debug, Error)]
pub enum ResourceManagerError {
#[error("Failed to obtain ingestion resources: {0}")]
IngestClientResourcesError(#[from] ingest_client_resources::IngestionResourceError),

#[error("Failed to obtain authorization token: {0}")]
AuthorizationContextError(#[from] authorization_context::KustoIdentityTokenError),

#[error("Failed to select a resource - no resources found")]
NoResourcesFound,
}

type Result<T> = std::result::Result<T, ResourceManagerError>;

/// ResourceManager is a struct that keeps track of all the resources required for ingestion using the queued flavour
pub struct ResourceManager {
ingest_client_resources: Arc<IngestClientResources>,
Expand All @@ -39,12 +55,62 @@ impl ResourceManager {
}

/// Returns the latest [QueueClient]s ready for posting ingestion messages to
pub async fn ingestion_queues(&self) -> Result<Vec<QueueClient>> {
async fn ingestion_queues(&self) -> Result<Vec<QueueClient>> {
Ok(self.ingest_client_resources.get().await?.ingestion_queues)
}

/// Returns a [QueueClient] to ingest to.
/// This is a random selection from the list of ingestion queues
pub async fn ingestion_queue(&self) -> Result<QueueClient> {
let ingestion_queues = self.ingestion_queues().await?;
let selected_queue = select_random_resource(ingestion_queues)?;
Ok(selected_queue.clone())
}

/// Returns the latest [KustoIdentityToken] to be added as an authorization context to ingestion messages
pub async fn authorization_context(&self) -> Result<KustoIdentityToken> {
self.authorization_context.get().await
self.authorization_context
.get()
.await
.map_err(ResourceManagerError::AuthorizationContextError)
}
}
/// Selects a random resource from the given list of resources
fn select_random_resource<T: Clone>(resources: Vec<T>) -> Result<T> {
let mut rng: StdRng = SeedableRng::from_entropy();
resources
.choose(&mut rng)
.ok_or(ResourceManagerError::NoResourcesFound)
.cloned()
}

#[cfg(test)]
mod select_random_resource_tests {
use super::*;

#[test]
fn single_resource() {
const VALUE: i32 = 1;
let resources = vec![VALUE];
let selected_resource = select_random_resource(resources).unwrap();
assert!(selected_resource == VALUE)
}

#[test]
fn multiple_resources() {
let resources = vec![1, 2, 3, 4, 5];
let selected_resource = select_random_resource(resources.clone()).unwrap();
assert!(resources.contains(&selected_resource));
}

#[test]
fn no_resources() {
let resources: Vec<i32> = vec![];
let selected_resource = select_random_resource(resources);
assert!(selected_resource.is_err());
assert!(matches!(
selected_resource.unwrap_err(),
ResourceManagerError::NoResourcesFound
))
}
}
79 changes: 52 additions & 27 deletions azure-kusto-ingest/src/resource_manager/authorization_context.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
use std::sync::Arc;

use anyhow::Result;
use azure_kusto_data::prelude::KustoClient;
use serde_json::Value;
use tokio::sync::RwLock;

use super::cache::{Cached, ThreadSafeCachedValue};
use super::utils::get_column_index;
use super::RESOURCE_REFRESH_PERIOD;
use thiserror::Error;

pub(crate) type KustoIdentityToken = String;

const AUTHORIZATION_CONTEXT: &str = "AuthorizationContext";

#[derive(Error, Debug)]
pub enum KustoIdentityTokenError {
#[error("Kusto expected 1 table in results, found {0}")]
ExpectedOneTable(usize),

#[error("Kusto expected 1 row in table, found {0}")]
ExpectedOneRow(usize),

#[error("Column {0} not found in table")]
ColumnNotFound(String),

#[error("Invalid JSON response from Kusto: {0:?}")]
InvalidJSONResponse(Value),

#[error("Token is empty")]
EmptyToken,

#[error(transparent)]
KustoError(#[from] azure_kusto_data::error::Error),
}

type Result<T> = std::result::Result<T, KustoIdentityTokenError>;
/// Logic to obtain a Kusto identity token from the management endpoint. This auth token is a temporary token
#[derive(Debug, Clone)]
pub(crate) struct AuthorizationContext {
Expand Down Expand Up @@ -38,54 +63,54 @@ impl AuthorizationContext {
let table = match &results.tables[..] {
[a] => a,
_ => {
return Err(anyhow::anyhow!(
"Kusto Expected 1 table in results, found {}",
results.tables.len()
return Err(KustoIdentityTokenError::ExpectedOneTable(
results.tables.len(),
))
}
};

// Check that a column in this table actually exists called `AuthorizationContext`
let index = get_column_index(table, "AuthorizationContext")?;
let index = get_column_index(table, AUTHORIZATION_CONTEXT).ok_or(
KustoIdentityTokenError::ColumnNotFound(AUTHORIZATION_CONTEXT.into()),
)?;

// Check that there is only 1 row in the table, and that the value in the first row at the given index is not empty
let token = match &table.rows[..] {
[row] => row.get(index).ok_or(anyhow::anyhow!(
"Kusto response did not contain a value in the first row at position {}",
index
))?,
_ => {
return Err(anyhow::anyhow!(
"Kusto expected 1 row in results, found {}",
table.rows.len()
))
}
[row] => row
.get(index)
.ok_or(KustoIdentityTokenError::ColumnNotFound(
AUTHORIZATION_CONTEXT.into(),
))?,
_ => return Err(KustoIdentityTokenError::ExpectedOneRow(table.rows.len())),
};

// Convert the JSON string into a Rust string
let token = token.as_str().ok_or(anyhow::anyhow!(
"Kusto response did not contain a string value: {:?}",
token
))?;
let token = token
.as_str()
.ok_or(KustoIdentityTokenError::InvalidJSONResponse(
token.to_owned(),
))?;

if token.chars().all(char::is_whitespace) {
return Err(anyhow::anyhow!("Kusto identity token is empty"));
return Err(KustoIdentityTokenError::EmptyToken);
}

Ok(token.to_string())
}

/// Fetches the latest Kusto identity token, either retrieving from cache if valid, or by executing a KQL query
pub(crate) async fn get(&self) -> Result<KustoIdentityToken> {
// Attempt to get the token from the cache
let token_cache = self.token_cache.read().await;
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
// first, try to get the resources from the cache by obtaining a read lock
{
let token_cache = self.token_cache.read().await;
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
}
}
}
// Drop the read lock and get a write lock to refresh the token
drop(token_cache);

// obtain a write lock to refresh the kusto response
let mut token_cache = self.token_cache.write().await;

// Again attempt to return from cache, check is done in case another thread
Expand Down
Loading
Loading