diff --git a/Cargo.toml b/Cargo.toml index 303c71e..b4a2833 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] -members = ["azure-kusto-data"] +members = ["azure-kusto-data", "azure-kusto-ingest"] resolver = "2" diff --git a/azure-kusto-ingest/Cargo.toml b/azure-kusto-ingest/Cargo.toml new file mode 100644 index 0000000..e322d71 --- /dev/null +++ b/azure-kusto-ingest/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "azure-kusto-ingest" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +azure-kusto-data = { path = "../azure-kusto-data", default-features = false } +# Azure SDK for Rust crates versions must be kept in sync +azure_core = "0.19" +azure_storage = "0.19" +azure_storage_blobs = "0.19" +azure_storage_queues = "0.19" + +async-lock = "3" +rand = "0.8" +serde = { version = "1", features = ["serde_derive"] } +serde_json = "1" +thiserror = "1" +time = { version = "0.3", features = ["serde-human-readable", "macros"] } +url = "2" +uuid = { version = "1", features = ["v4", "serde"] } + +[dev-dependencies] +tokio = { version = "1", features = ["macros", "rt-multi-thread"] } diff --git a/azure-kusto-ingest/examples/ingest_from_blob.rs b/azure-kusto-ingest/examples/ingest_from_blob.rs new file mode 100644 index 0000000..d69f126 --- /dev/null +++ b/azure-kusto-ingest/examples/ingest_from_blob.rs @@ -0,0 +1,61 @@ +use std::env; + +use azure_kusto_data::prelude::{ConnectionString, KustoClient, KustoClientOptions}; +use azure_kusto_ingest::data_format::DataFormat; +use azure_kusto_ingest::descriptors::{BlobAuth, BlobDescriptor}; +use azure_kusto_ingest::ingestion_properties::IngestionProperties; +use azure_kusto_ingest::queued_ingest::QueuedIngestClient; + +/// Example of ingesting data into Kusto from Azure Blob Storage using managed identities. +/// This example enforces that the Kusto cluster has a system assigned managed identity with access to the storage account +/// +/// There are some steps that need to be taken to allow for managed identities to work: +/// - Permissions as the ingestor to initiate ingestion +/// https://learn.microsoft.com/en-us/azure/data-explorer/kusto/api/netfx/kusto-ingest-client-permissions +/// - Permissions for Kusto to access storage +/// https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-managed-identity +#[tokio::main] +async fn main() -> Result<(), Box> { + let cluster_ingest_uri = env::var("KUSTO_INGEST_URI").expect("Must define KUSTO_INGEST_URI"); + let user_mi_object_id = + env::var("KUSTO_USER_MI_OBJECT_ID").expect("Must define KUSTO_USER_MI_OBJECT_ID"); + + // Create a Kusto client with managed identity authentication via the user assigned identity + let kusto_client = KustoClient::new( + ConnectionString::with_managed_identity_auth(cluster_ingest_uri, Some(user_mi_object_id)), + KustoClientOptions::default(), + )?; + + // Create a queued ingest client + let queued_ingest_client = QueuedIngestClient::new(kusto_client); + + // Define ingestion properties + let ingestion_properties = IngestionProperties { + database_name: env::var("KUSTO_DATABASE_NAME").expect("Must define KUSTO_DATABASE_NAME"), + table_name: env::var("KUSTO_TABLE_NAME").expect("Must define KUSTO_TABLE_NAME"), + // Don't delete the blob on successful ingestion + retain_blob_on_success: Some(true), + // File format of the blob is Parquet + data_format: DataFormat::Parquet, + // Assume the server side default for flush_immediately + flush_immediately: None, + }; + + // Define the blob to ingest from + let blob_uri = env::var("BLOB_URI").expect("Must define BLOB_URI"); + // Define the size of the blob if known, this improves ingestion performance as Kusto does not need to access the blob to determine the size + let blob_size: Option = match env::var("BLOB_SIZE") { + Ok(blob_size) => Some(blob_size.parse().expect("BLOB_SIZE must be a valid u64")), + Err(_) => None, + }; + + // Create the blob descriptor, also specifying that the blob should be accessed using the system assigned managed identity of the Kusto cluster + let blob_descriptor = BlobDescriptor::new(blob_uri, blob_size, None) + .with_blob_auth(BlobAuth::SystemAssignedManagedIdentity); + + let _ = queued_ingest_client + .ingest_from_blob(blob_descriptor, ingestion_properties) + .await?; + + Ok(()) +} diff --git a/azure-kusto-ingest/src/client_options.rs b/azure-kusto-ingest/src/client_options.rs new file mode 100644 index 0000000..b8677eb --- /dev/null +++ b/azure-kusto-ingest/src/client_options.rs @@ -0,0 +1,51 @@ +use azure_core::ClientOptions; + +/// Allows configurability of ClientOptions for the storage clients used within [QueuedIngestClient](crate::queued_ingest::QueuedIngestClient) +#[derive(Clone, Default)] +pub struct QueuedIngestClientOptions { + pub queue_service_options: ClientOptions, + pub blob_service_options: ClientOptions, +} + +impl From for QueuedIngestClientOptions { + /// Creates a `QueuedIngestClientOptions` struct where the same [ClientOptions] are used for all services + fn from(client_options: ClientOptions) -> Self { + Self { + queue_service_options: client_options.clone(), + blob_service_options: client_options, + } + } +} + +/// Builder for [QueuedIngestClientOptions], call `build()` to create the [QueuedIngestClientOptions] +#[derive(Clone, Default)] +pub struct QueuedIngestClientOptionsBuilder { + queue_service_options: ClientOptions, + blob_service_options: ClientOptions, +} + +impl QueuedIngestClientOptionsBuilder { + pub fn new() -> Self { + Self { + queue_service_options: ClientOptions::default(), + blob_service_options: ClientOptions::default(), + } + } + + pub fn with_queue_service_options(mut self, queue_service_options: ClientOptions) -> Self { + self.queue_service_options = queue_service_options; + self + } + + pub fn with_blob_service_options(mut self, blob_service_options: ClientOptions) -> Self { + self.blob_service_options = blob_service_options; + self + } + + pub fn build(self) -> QueuedIngestClientOptions { + QueuedIngestClientOptions { + queue_service_options: self.queue_service_options, + blob_service_options: self.blob_service_options, + } + } +} diff --git a/azure-kusto-ingest/src/data_format.rs b/azure-kusto-ingest/src/data_format.rs new file mode 100644 index 0000000..6dd52e6 --- /dev/null +++ b/azure-kusto-ingest/src/data_format.rs @@ -0,0 +1,37 @@ +use serde::Serialize; + +/// All data formats supported by Kusto. +/// Default is [DataFormat::CSV] +#[derive(Serialize, Clone, Debug, Default, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum DataFormat { + ApacheAvro, + Avro, + #[default] + CSV, + JSON, + MultiJSON, + ORC, + Parquet, + PSV, + RAW, + SCSV, + SOHsv, + SingleJSON, + SStream, + TSV, + TSVe, + TXT, + W3CLOGFILE, +} + +// Unit tests +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn data_format_default() { + assert_eq!(DataFormat::default(), DataFormat::CSV); + } +} diff --git a/azure-kusto-ingest/src/descriptors.rs b/azure-kusto-ingest/src/descriptors.rs new file mode 100644 index 0000000..44cfc38 --- /dev/null +++ b/azure-kusto-ingest/src/descriptors.rs @@ -0,0 +1,148 @@ +use uuid::Uuid; + +/// Encapsulates the information related to a blob that is required to ingest from a blob +#[derive(Debug, Clone)] +pub struct BlobDescriptor { + uri: String, + pub(crate) size: Option, + pub(crate) source_id: Uuid, + /// Authentication information for the blob; when [None], the uri is passed through as is + blob_auth: Option, +} + +impl BlobDescriptor { + /// Create a new BlobDescriptor. + /// + /// Parameters: + /// - `uri`: the uri of the blob to ingest from, note you can use the optional helper method `with_blob_auth` to add authentication information to the uri + /// - `size`: although the size is not required, providing it is recommended as it allows Kusto to better plan the ingestion process + /// - `source_id`: optional, useful if tracking ingestion status, if not provided, a random uuid will be generated + pub fn new(uri: impl Into, size: Option, source_id: Option) -> Self { + let source_id = match source_id { + Some(source_id) => source_id, + None => Uuid::new_v4(), + }; + + Self { + uri: uri.into(), + size, + source_id, + blob_auth: None, + } + } + + /// Mutator to modify the authentication information of the BlobDescriptor + pub fn with_blob_auth(mut self, blob_auth: BlobAuth) -> Self { + self.blob_auth = Some(blob_auth); + self + } + + /// Returns the uri with the authentication information concatenated, ready to be serialized into the ingestion message + pub(crate) fn uri(&self) -> String { + match &self.blob_auth { + Some(BlobAuth::SASToken(sas_token)) => { + format!("{}?{}", self.uri, sas_token.as_str()) + } + Some(BlobAuth::UserAssignedManagedIdentity(object_id)) => { + format!("{};managed_identity={}", self.uri, object_id) + } + Some(BlobAuth::SystemAssignedManagedIdentity) => { + format!("{};managed_identity=system", self.uri) + } + None => self.uri.to_string(), + } + } +} + +/// Helper for adding authentication information to a blob path in the format expected by Kusto +#[derive(Clone)] +pub enum BlobAuth { + /// adds `?` to the blob path + SASToken(String), + /// adds `;managed_identity=` to the blob path + UserAssignedManagedIdentity(String), + /// adds `;managed_identity=system` to the blob path + SystemAssignedManagedIdentity, +} + +/// Custom impl of Debug to avoid leaking sensitive information +impl std::fmt::Debug for BlobAuth { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + BlobAuth::SASToken(_) => f.debug_struct("SASToken").finish(), + BlobAuth::UserAssignedManagedIdentity(object_id) => f + .debug_struct("UserAssignedManagedIdentity") + .field("object_id", object_id) + .finish(), + BlobAuth::SystemAssignedManagedIdentity => { + f.debug_struct("SystemAssignedManagedIdentity").finish() + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn blob_descriptor_with_no_auth_modification() { + let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob"; + let blob_descriptor = BlobDescriptor::new(uri, None, None); + + assert_eq!(blob_descriptor.uri(), uri); + } + + #[test] + fn blob_descriptor_with_sas_token() { + let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob"; + let sas_token = "my_sas_token"; + let blob_descriptor = BlobDescriptor::new(uri, None, None) + .with_blob_auth(BlobAuth::SASToken(sas_token.to_string())); + + assert_eq!(blob_descriptor.uri(), format!("{uri}?{sas_token}")); + } + + #[test] + fn blob_descriptor_with_user_assigned_managed_identity() { + let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob"; + let object_id = "my_object_id"; + let blob_descriptor = BlobDescriptor::new(uri, None, None) + .with_blob_auth(BlobAuth::UserAssignedManagedIdentity(object_id.to_string())); + + assert_eq!( + blob_descriptor.uri(), + format!("{uri};managed_identity={object_id}") + ); + } + + #[test] + fn blob_descriptor_with_system_assigned_managed_identity() { + let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob"; + let blob_descriptor = BlobDescriptor::new(uri, None, None) + .with_blob_auth(BlobAuth::SystemAssignedManagedIdentity); + + assert_eq!( + blob_descriptor.uri(), + format!("{uri};managed_identity=system") + ); + } + + #[test] + fn blob_descriptor_with_size() { + let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob"; + let size = 123; + let blob_descriptor = BlobDescriptor::new(uri, Some(size), None); + + assert_eq!(blob_descriptor.size, Some(size)); + } + + #[test] + fn blob_descriptor_with_source_id() { + let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob"; + let source_id = Uuid::new_v4(); + let blob_descriptor = BlobDescriptor::new(uri, None, Some(source_id)); + + assert_eq!(blob_descriptor.source_id, source_id); + } +} diff --git a/azure-kusto-ingest/src/error.rs b/azure-kusto-ingest/src/error.rs new file mode 100644 index 0000000..cfa9462 --- /dev/null +++ b/azure-kusto-ingest/src/error.rs @@ -0,0 +1,20 @@ +//! Defines [Error] for representing failures in various operations. + +/// Error type for kusto ingestion operations. +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// Error raised when failing to obtain ingestion resources. + #[error("Error obtaining ingestion resources: {0}")] + ResourceManagerError(#[from] super::resource_manager::ResourceManagerError), + + /// Error relating to (de-)serialization of JSON data + #[error("Error in JSON serialization/deserialization: {0}")] + JsonError(#[from] serde_json::Error), + + /// Error occurring within core azure crates + #[error("Error in azure-core: {0}")] + AzureError(#[from] azure_core::error::Error), +} + +/// Result type for kusto ingest operations. +pub type Result = std::result::Result; diff --git a/azure-kusto-ingest/src/ingestion_blob_info.rs b/azure-kusto-ingest/src/ingestion_blob_info.rs new file mode 100644 index 0000000..ef554fa --- /dev/null +++ b/azure-kusto-ingest/src/ingestion_blob_info.rs @@ -0,0 +1,119 @@ +use serde::Serialize; +use uuid::Uuid; + +use crate::{ + data_format::DataFormat, descriptors::BlobDescriptor, + ingestion_properties::IngestionProperties, + resource_manager::authorization_context::KustoIdentityToken, +}; + +use time::{ + format_description::well_known::{iso8601, Iso8601}, + OffsetDateTime, +}; +/// The [DEFAULT](iso8601::Config::DEFAULT) ISO8601 format that the time crate serializes to uses a 6 digit year, +/// Here we create our own serializer function that uses a 4 digit year which is exposed as `kusto_ingest_iso8601_format` +const CONFIG: iso8601::EncodedConfig = iso8601::Config::DEFAULT + .set_year_is_six_digits(false) + .encode(); +const FORMAT: Iso8601 = Iso8601::; +time::serde::format_description!(kusto_ingest_iso8601_format, OffsetDateTime, FORMAT); + +/// Message to be serialized as JSON and sent to the ingestion queue +/// +/// Basing the ingestion message on +/// https://learn.microsoft.com/en-us/azure/data-explorer/kusto/api/netfx/kusto-ingest-client-rest#ingestion-message-internal-structure +#[derive(Serialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub(crate) struct QueuedIngestionMessage { + /// Message identifier for this upload + id: Uuid, + /// Path (URI) to the blob. + /// This should include any SAS token required to access the blob, or hints to use managed identity auth. + /// Extra permissions are required if the `RetainBlobOnSuccess` option is not true so that the ingestion service can delete the blob once it has completed ingesting the data. + blob_path: String, + // Name of the Kusto database the data will ingest into + database_name: String, + // Name of the Kusto table the the data will ingest into + table_name: String, + /// Size of the uncompressed data in bytes. + /// Providing this value allows the ingestion service to optimize ingestion by potentially aggregating multiple blobs. + /// Although this property is optional, it is recommended to provide the size as otherwise the service will access the blob just to retrieve the size. + #[serde(skip_serializing_if = "Option::is_none")] + raw_data_size: Option, + /// If set to `true`, the blob won't be deleted once ingestion is successfully completed. + /// Default is `false` when this property is not specified. Note that this has implications on permissions required against the blob. + #[serde(skip_serializing_if = "Option::is_none")] + retain_blob_on_success: Option, + /// If set to `true`, any server side aggregation will be skipped - thus overriding the batching policy. Default is `false`. + #[serde(skip_serializing_if = "Option::is_none")] + flush_immediately: Option, + #[serde(with = "kusto_ingest_iso8601_format")] + source_message_creation_time: OffsetDateTime, + // source_message_creation_time: DateTime, + // Extra properties added to the ingestion command + additional_properties: AdditionalProperties, +} + +impl QueuedIngestionMessage { + pub(crate) fn new( + blob_descriptor: &BlobDescriptor, + ingestion_properties: &IngestionProperties, + authorization_context: KustoIdentityToken, + ) -> Self { + let additional_properties = AdditionalProperties { + authorization_context, + data_format: ingestion_properties.data_format.clone(), + }; + + Self { + id: blob_descriptor.source_id, + blob_path: blob_descriptor.uri(), + raw_data_size: blob_descriptor.size, + database_name: ingestion_properties.database_name.clone(), + table_name: ingestion_properties.table_name.clone(), + retain_blob_on_success: ingestion_properties.retain_blob_on_success, + flush_immediately: ingestion_properties.flush_immediately, + source_message_creation_time: OffsetDateTime::now_utc(), + additional_properties, + } + } +} + +/// Additional properties to be added to the ingestion message +/// This struct is modelled on: https://learn.microsoft.com/en-us/azure/data-explorer/ingestion-properties +#[derive(Serialize, Clone, Debug)] +struct AdditionalProperties { + /// Authorization string obtained from Kusto to allow for ingestion + #[serde(rename = "authorizationContext")] + authorization_context: KustoIdentityToken, + #[serde(rename = "format")] + data_format: DataFormat, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn time_custom_iso8601_serialization() { + #[derive(Serialize, Debug)] + struct TestTimeSerialize { + #[serde(with = "kusto_ingest_iso8601_format")] + customised_time_format: time::OffsetDateTime, + } + + let test_message = TestTimeSerialize { + customised_time_format: time::OffsetDateTime::from_unix_timestamp_nanos( + 1_234_567_890_123_456_789, + ) + .unwrap(), + }; + + let serialized_message = serde_json::to_string(&test_message).unwrap(); + assert_eq!( + serialized_message, + "{\"customised_time_format\":\"2009-02-13T23:31:30.123456789Z\"}" + ); + } +} diff --git a/azure-kusto-ingest/src/ingestion_properties.rs b/azure-kusto-ingest/src/ingestion_properties.rs new file mode 100644 index 0000000..1c99141 --- /dev/null +++ b/azure-kusto-ingest/src/ingestion_properties.rs @@ -0,0 +1,18 @@ +use crate::data_format::DataFormat; + +/// Properties of ingestion that can be used when ingesting data into Kusto allowing for customisation of the ingestion process +#[derive(Clone, Debug, Default)] +pub struct IngestionProperties { + /// Name of the database to ingest into + pub database_name: String, + /// Name of the table to ingest into + pub table_name: String, + /// Whether the blob is retained after ingestion. + /// Note that the default when not provided is `false`, meaning that Kusto will attempt to delete the blob upon ingestion. + /// This will only be successful if provided sufficient permissions on the blob + pub retain_blob_on_success: Option, + /// Format of the data being ingested + pub data_format: DataFormat, + /// If set to `true`, any aggregation will be skipped. Default is `false` + pub flush_immediately: Option, +} diff --git a/azure-kusto-ingest/src/lib.rs b/azure-kusto-ingest/src/lib.rs new file mode 100644 index 0000000..27eb0cb --- /dev/null +++ b/azure-kusto-ingest/src/lib.rs @@ -0,0 +1,8 @@ +pub mod client_options; +pub mod data_format; +pub mod descriptors; +pub mod error; +pub(crate) mod ingestion_blob_info; +pub mod ingestion_properties; +pub mod queued_ingest; +pub(crate) mod resource_manager; diff --git a/azure-kusto-ingest/src/queued_ingest.rs b/azure-kusto-ingest/src/queued_ingest.rs new file mode 100644 index 0000000..125cadb --- /dev/null +++ b/azure-kusto-ingest/src/queued_ingest.rs @@ -0,0 +1,62 @@ +use std::sync::Arc; + +use crate::error::Result; +use azure_core::base64; +use azure_kusto_data::prelude::KustoClient; + +use crate::client_options::QueuedIngestClientOptions; +use crate::descriptors::BlobDescriptor; +use crate::ingestion_blob_info::QueuedIngestionMessage; +use crate::ingestion_properties::IngestionProperties; +use crate::resource_manager::ResourceManager; + +/// Client for ingesting data into Kusto using the queued flavour of ingestion +#[derive(Clone)] +pub struct QueuedIngestClient { + resource_manager: Arc, +} + +impl QueuedIngestClient { + /// Creates a new client from the given [KustoClient]. + /// + /// **WARNING**: the [KustoClient] must be created with a connection string that points to the ingestion endpoint + pub fn new(kusto_client: KustoClient) -> Self { + Self::new_with_client_options(kusto_client, QueuedIngestClientOptions::default()) + } + + /// Creates a new client from the given [KustoClient] and [QueuedIngestClientOptions] + /// This allows for customisation of the [ClientOptions] used for the storage clients + /// + /// **WARNING**: the [KustoClient] must be created with a connection string that points to the ingestion endpoint + pub fn new_with_client_options( + kusto_client: KustoClient, + options: QueuedIngestClientOptions, + ) -> Self { + Self { + resource_manager: Arc::new(ResourceManager::new(kusto_client, options)), + } + } + + /// Ingest a file into Kusto from Azure Blob Storage + pub async fn ingest_from_blob( + &self, + blob_descriptor: BlobDescriptor, + ingestion_properties: IngestionProperties, + ) -> Result<()> { + let queue_client = self.resource_manager.random_ingestion_queue().await?; + + let auth_context = self.resource_manager.authorization_context().await?; + + let message = + QueuedIngestionMessage::new(&blob_descriptor, &ingestion_properties, auth_context); + + let message = serde_json::to_string(&message)?; + + // Base64 encode the ingestion message + let message = base64::encode(&message); + + let _resp = queue_client.put_message(message).await?; + + Ok(()) + } +} diff --git a/azure-kusto-ingest/src/resource_manager.rs b/azure-kusto-ingest/src/resource_manager.rs new file mode 100644 index 0000000..396b339 --- /dev/null +++ b/azure-kusto-ingest/src/resource_manager.rs @@ -0,0 +1,81 @@ +use std::{sync::Arc, time::Duration}; + +pub mod authorization_context; +pub mod cache; +pub mod ingest_client_resources; +pub mod resource_uri; +pub mod utils; + +use azure_kusto_data::prelude::KustoClient; + +use azure_storage_queues::QueueClient; + +use crate::client_options::QueuedIngestClientOptions; + +use self::{ + authorization_context::{AuthorizationContext, KustoIdentityToken}, + ingest_client_resources::IngestClientResources, +}; + +use rand::{seq::SliceRandom, thread_rng}; + +pub const RESOURCE_REFRESH_PERIOD: Duration = Duration::from_secs(60 * 60); + +#[derive(Debug, thiserror::Error)] +pub enum ResourceManagerError { + #[error("Failed to obtain ingestion resources: {0}")] + IngestClientResourcesError(#[from] ingest_client_resources::IngestionResourceError), + + #[error("Failed to obtain authorization token: {0}")] + AuthorizationContextError(#[from] authorization_context::KustoIdentityTokenError), + + #[error("Failed to select a resource - no resources found")] + NoResourcesFound, +} + +type Result = std::result::Result; + +/// ResourceManager is a struct that keeps track of all the resources required for ingestion using the queued flavour +pub struct ResourceManager { + ingest_client_resources: Arc, + authorization_context: Arc, +} + +impl ResourceManager { + /// Creates a new ResourceManager from the given [KustoClient] and the [QueuedIngestClientOptions] as provided by the user + pub fn new(client: KustoClient, client_options: QueuedIngestClientOptions) -> Self { + Self { + ingest_client_resources: Arc::new(IngestClientResources::new( + client.clone(), + client_options, + )), + authorization_context: Arc::new(AuthorizationContext::new(client)), + } + } + + /// Returns the latest [QueueClient]s ready for posting ingestion messages to + async fn ingestion_queues(&self) -> Result> { + Ok(self.ingest_client_resources.get().await?.ingestion_queues) + } + + /// Returns a [QueueClient] to ingest to. + /// This is a random selection from the list of ingestion queues + pub async fn random_ingestion_queue(&self) -> Result { + let ingestion_queues = self.ingestion_queues().await?; + + let mut rng = thread_rng(); + let selected_queue = ingestion_queues + .choose(&mut rng) + .ok_or(ResourceManagerError::NoResourcesFound)?; + + Ok(selected_queue.clone()) + } + + /// Returns the latest [KustoIdentityToken] to be added as an authorization context to ingestion messages + pub async fn authorization_context(&self) -> Result { + self.authorization_context + .get() + .await + .map_err(ResourceManagerError::AuthorizationContextError) + } +} diff --git a/azure-kusto-ingest/src/resource_manager/authorization_context.rs b/azure-kusto-ingest/src/resource_manager/authorization_context.rs new file mode 100644 index 0000000..91eeca6 --- /dev/null +++ b/azure-kusto-ingest/src/resource_manager/authorization_context.rs @@ -0,0 +1,103 @@ +use azure_kusto_data::prelude::KustoClient; +use serde_json::Value; + +use super::cache::ThreadSafeCachedValue; +use super::utils::get_column_index; +use super::RESOURCE_REFRESH_PERIOD; + +pub(crate) type KustoIdentityToken = String; + +const AUTHORIZATION_CONTEXT: &str = "AuthorizationContext"; + +#[derive(thiserror::Error, Debug)] +pub enum KustoIdentityTokenError { + #[error("Kusto expected 1 table in results, found {0}")] + ExpectedOneTable(usize), + + #[error("Kusto expected 1 row in table, found {0}")] + ExpectedOneRow(usize), + + #[error("Column {0} not found in table")] + ColumnNotFound(String), + + #[error("Invalid JSON response from Kusto: {0:?}")] + InvalidJSONResponse(Value), + + #[error("Token is empty")] + EmptyToken, + + #[error(transparent)] + KustoError(#[from] azure_kusto_data::error::Error), +} + +type Result = std::result::Result; +/// Logic to obtain a Kusto identity token from the management endpoint. This auth token is a temporary token +#[derive(Debug, Clone)] +pub(crate) struct AuthorizationContext { + /// A client against a Kusto ingestion cluster + client: KustoClient, + /// Cache of the Kusto identity token + token_cache: ThreadSafeCachedValue, +} + +impl AuthorizationContext { + pub fn new(client: KustoClient) -> Self { + Self { + client, + token_cache: ThreadSafeCachedValue::new(RESOURCE_REFRESH_PERIOD), + } + } + + /// Executes a KQL query to get the Kusto identity token from the management endpoint + async fn query_kusto_identity_token(&self) -> Result { + let results = self + .client + .execute_command("NetDefaultDB", ".get kusto identity token", None) + .await?; + + // Check that there is only 1 table in the results returned by the query + let table = match &results.tables[..] { + [a] => a, + _ => { + return Err(KustoIdentityTokenError::ExpectedOneTable( + results.tables.len(), + )) + } + }; + + // Check that a column in this table actually exists called `AuthorizationContext` + let index = get_column_index(table, AUTHORIZATION_CONTEXT).ok_or( + KustoIdentityTokenError::ColumnNotFound(AUTHORIZATION_CONTEXT.into()), + )?; + + // Check that there is only 1 row in the table, and that the value in the first row at the given index is not empty + let token = match &table.rows[..] { + [row] => row + .get(index) + .ok_or(KustoIdentityTokenError::ColumnNotFound( + AUTHORIZATION_CONTEXT.into(), + ))?, + _ => return Err(KustoIdentityTokenError::ExpectedOneRow(table.rows.len())), + }; + + // Convert the JSON string into a Rust string + let token = token + .as_str() + .ok_or(KustoIdentityTokenError::InvalidJSONResponse( + token.to_owned(), + ))?; + + if token.chars().all(char::is_whitespace) { + return Err(KustoIdentityTokenError::EmptyToken); + } + + Ok(token.to_string()) + } + + /// Fetches the latest Kusto identity token, either retrieving from cache if valid, or by executing a KQL query + pub(crate) async fn get(&self) -> Result { + self.token_cache + .get(self.query_kusto_identity_token()) + .await + } +} diff --git a/azure-kusto-ingest/src/resource_manager/cache.rs b/azure-kusto-ingest/src/resource_manager/cache.rs new file mode 100644 index 0000000..0b024e1 --- /dev/null +++ b/azure-kusto-ingest/src/resource_manager/cache.rs @@ -0,0 +1,183 @@ +use std::{ + error::Error, + future::Future, + sync::Arc, + time::{Duration, Instant}, +}; + +use async_lock::RwLock; + +/// Wrapper around a value that allows for storing when the value was last updated, +/// as well as the period after which it should be refreshed (i.e. expired) +#[derive(Debug, Clone)] +pub struct Cached { + inner: T, + last_updated: Instant, + refresh_period: Duration, +} + +impl Cached { + pub fn new(inner: T, refresh_period: Duration) -> Self { + Self { + inner, + last_updated: Instant::now(), + refresh_period, + } + } + + pub fn get(&self) -> &T { + &self.inner + } + + pub fn is_expired(&self) -> bool { + self.last_updated.elapsed() >= self.refresh_period + } + + pub fn update(&mut self, inner: T) { + self.inner = inner; + self.last_updated = Instant::now(); + } +} + +#[derive(Debug, Clone)] +pub struct ThreadSafeCachedValue +where + T: Clone, +{ + cache: Arc>>>, +} + +impl ThreadSafeCachedValue { + pub fn new(refresh_period: Duration) -> Self { + Self { + cache: Arc::new(RwLock::new(Cached::new(None, refresh_period))), + } + } + + /// Fetches the latest value, either retrieving from cache if valid, or by executing the callback + pub async fn get(&self, callback: F) -> Result + where + F: Future>, + { + // First, try to get a value from the cache by obtaining a read lock + { + let cache = self.cache.read().await; + if !cache.is_expired() { + if let Some(cached_value) = cache.get() { + return Ok(cached_value.clone()); + } + } + } + + // Obtain a write lock to refresh the cached value + let mut cache = self.cache.write().await; + + // Again attempt to return from cache, check is done in case another thread + // refreshed the cached value while we were waiting on the write lock and its now valid + if !cache.is_expired() { + if let Some(cached_value) = cache.get() { + return Ok(cached_value.clone()); + } + } + + // Fetch new value by executing the callback, update the cache, and return the value + let fetched_value = callback.await?; + cache.update(Some(fetched_value.clone())); + + Ok(fetched_value) + } +} + +#[cfg(test)] +mod cached_tests { + use super::*; + use std::time::Duration; + + #[test] + fn test_cached_get() { + let value = "hello"; + let cached_string = Cached::new(value.to_string(), Duration::from_secs(60)); + + assert_eq!(cached_string.get(), value); + } + + #[test] + fn test_cached_is_expired() { + let value = "hello"; + let mut cached_string = Cached::new(value.to_string(), Duration::from_secs(60)); + + assert!(!cached_string.is_expired()); + + cached_string.last_updated = Instant::now() - Duration::from_secs(61); + + assert!(cached_string.is_expired()); + } + + #[test] + fn test_cached_update() { + let value = "hello"; + let mut cached_string = Cached::new(value.to_string(), Duration::from_secs(60)); + + assert_eq!(cached_string.get(), value); + + let new_value = "world"; + cached_string.update(new_value.to_string()); + + assert!(!cached_string.is_expired()); + assert_eq!(cached_string.get(), new_value); + } +} + +#[cfg(test)] +mod thread_safe_cached_value_tests { + use super::*; + use std::{fmt::Error, sync::Mutex}; + + #[derive(Debug)] + struct MockToken { + get_token_call_count: Mutex, + } + + impl MockToken { + fn new() -> Self { + Self { + get_token_call_count: Mutex::new(0), + } + } + + async fn get_new_token(&self) -> Result { + // Include an incrementing counter in the token to track how many times the token has been refreshed + let mut call_count = self.get_token_call_count.lock().unwrap(); + *call_count += 1; + Ok(call_count.clone()) + } + } + + #[tokio::test] + async fn returns_same_value_if_unexpired() -> Result<(), Error> { + let cache = ThreadSafeCachedValue::new(Duration::from_secs(300)); + let mock_token = MockToken::new(); + + let token1 = cache.get(mock_token.get_new_token()).await?; + let token2 = cache.get(mock_token.get_new_token()).await?; + + assert_eq!(token1, 1); + assert_eq!(token2, 1); + Ok(()) + } + + #[tokio::test] + async fn returns_new_value_if_expired() -> Result<(), Error> { + let cache = ThreadSafeCachedValue::new(Duration::from_millis(1)); + let mock_token = MockToken::new(); + + let token1 = cache.get(mock_token.get_new_token()).await?; + // Sleep to ensure the token expires + tokio::time::sleep(Duration::from_secs(1)).await; + let token2 = cache.get(mock_token.get_new_token()).await?; + + assert_eq!(token1, 1); + assert_eq!(token2, 2); + Ok(()) + } +} diff --git a/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs b/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs new file mode 100644 index 0000000..0043d28 --- /dev/null +++ b/azure-kusto-ingest/src/resource_manager/ingest_client_resources.rs @@ -0,0 +1,149 @@ +use crate::client_options::QueuedIngestClientOptions; + +use super::{ + cache::ThreadSafeCachedValue, + resource_uri::{ClientFromResourceUri, ResourceUri}, + utils, RESOURCE_REFRESH_PERIOD, +}; + +use azure_core::ClientOptions; +use azure_kusto_data::{models::TableV1, prelude::KustoClient}; +use azure_storage_blobs::prelude::ContainerClient; +use azure_storage_queues::QueueClient; +use serde_json::Value; + +#[derive(Debug, thiserror::Error)] +pub enum IngestionResourceError { + #[error("{column_name} column is missing in the table")] + ColumnNotFoundError { column_name: String }, + + #[error("Response returned from Kusto could not be parsed as a string: {0}")] + ParseAsStringError(Value), + + #[error("No {0} resources found in the table")] + NoResourcesFound(String), + + #[error(transparent)] + KustoError(#[from] azure_kusto_data::error::Error), + + #[error(transparent)] + ResourceUriError(#[from] super::resource_uri::ResourceUriError), + + #[error("Kusto expected a table containing ingestion resource results, found no tables")] + NoTablesFound, +} + +type Result = std::result::Result; + +fn get_column_index(table: &TableV1, column_name: &str) -> Result { + utils::get_column_index(table, column_name).ok_or(IngestionResourceError::ColumnNotFoundError { + column_name: column_name.to_string(), + }) +} + +/// Helper to get a resource URI from a table, erroring if there are no resources of the given name +fn get_resource_by_name(table: &TableV1, resource_name: String) -> Result> { + let storage_root_index = get_column_index(table, "StorageRoot")?; + let resource_type_name_index = get_column_index(table, "ResourceTypeName")?; + + let resource_uris: Vec> = table + .rows + .iter() + .filter(|r| r[resource_type_name_index] == resource_name) + .map(|r| { + let x = r[storage_root_index].as_str().ok_or( + IngestionResourceError::ParseAsStringError(r[storage_root_index].clone()), + )?; + ResourceUri::try_from(x).map_err(IngestionResourceError::ResourceUriError) + }) + .collect(); + + if resource_uris.is_empty() { + return Err(IngestionResourceError::NoResourcesFound(resource_name)); + } + + resource_uris.into_iter().collect() +} + +/// Helper to turn a vector of resource URIs into a vector of Azure clients of type T with the provided [ClientOptions] +fn create_clients_vec(resource_uris: &[ResourceUri], client_options: &ClientOptions) -> Vec +where + T: ClientFromResourceUri, +{ + resource_uris + .iter() + .map(|uri| T::create_client(uri.clone(), client_options.clone())) + .collect() +} + +/// Storage of the clients required for ingestion +#[derive(Debug, Clone)] +pub struct InnerIngestClientResources { + pub ingestion_queues: Vec, + pub temp_storage_containers: Vec, +} + +impl TryFrom<(&TableV1, &QueuedIngestClientOptions)> for InnerIngestClientResources { + type Error = IngestionResourceError; + + /// Attempts to create a new InnerIngestClientResources from the given [TableV1] and [QueuedIngestClientOptions] + fn try_from( + (table, client_options): (&TableV1, &QueuedIngestClientOptions), + ) -> std::result::Result { + let secured_ready_for_aggregation_queues = + get_resource_by_name(table, "SecuredReadyForAggregationQueue".to_string())?; + let temp_storage = get_resource_by_name(table, "TempStorage".to_string())?; + + Ok(Self { + ingestion_queues: create_clients_vec( + &secured_ready_for_aggregation_queues, + &client_options.queue_service_options, + ), + temp_storage_containers: create_clients_vec( + &temp_storage, + &client_options.blob_service_options, + ), + }) + } +} + +pub struct IngestClientResources { + /// A client against a Kusto ingestion cluster + client: KustoClient, + /// Cache of the ingest client resources + resources_cache: ThreadSafeCachedValue, + /// Options to customise the storage clients + client_options: QueuedIngestClientOptions, +} + +impl IngestClientResources { + pub fn new(client: KustoClient, client_options: QueuedIngestClientOptions) -> Self { + Self { + client, + resources_cache: ThreadSafeCachedValue::new(RESOURCE_REFRESH_PERIOD), + client_options, + } + } + + /// Executes a KQL management query that retrieves resource URIs for the various Azure resources used for ingestion + async fn query_ingestion_resources(&self) -> Result { + let results = self + .client + .execute_command("NetDefaultDB", ".get ingestion resources", None) + .await?; + + let new_resources = results + .tables + .first() + .ok_or(IngestionResourceError::NoTablesFound)?; + + InnerIngestClientResources::try_from((new_resources, &self.client_options)) + } + + /// Gets the latest resources either from cache, or fetching from Kusto and updating the cached resources + pub async fn get(&self) -> Result { + self.resources_cache + .get(self.query_ingestion_resources()) + .await + } +} diff --git a/azure-kusto-ingest/src/resource_manager/resource_uri.rs b/azure-kusto-ingest/src/resource_manager/resource_uri.rs new file mode 100644 index 0000000..615f930 --- /dev/null +++ b/azure-kusto-ingest/src/resource_manager/resource_uri.rs @@ -0,0 +1,266 @@ +use azure_core::ClientOptions; +use azure_storage::StorageCredentials; +use azure_storage_blobs::prelude::{ClientBuilder, ContainerClient}; +use azure_storage_queues::{QueueClient, QueueServiceClientBuilder}; +use url::Url; + +#[derive(Debug, thiserror::Error)] +pub enum ResourceUriError { + #[error("URI scheme must be 'https', was '{0}'")] + InvalidScheme(String), + + #[error("URI host must be a domain")] + InvalidHost, + + #[error("Object name is missing in the URI")] + MissingObjectName, + + #[error("SAS token is missing in the URI as a query parameter")] + MissingSasToken, + + #[error("Account name is missing in the URI")] + MissingAccountName, + + #[error(transparent)] + ParseError(#[from] url::ParseError), + + #[error(transparent)] + AzureError(#[from] azure_core::Error), +} + +/// Parsing logic of resource URIs as returned by the Kusto management endpoint +#[derive(Debug, Clone)] +pub(crate) struct ResourceUri { + pub(crate) service_uri: String, + pub(crate) object_name: String, + pub(crate) account_name: String, + pub(crate) sas_token: StorageCredentials, +} + +impl TryFrom<&str> for ResourceUri { + type Error = ResourceUriError; + + fn try_from(uri: &str) -> Result { + let parsed_uri = Url::parse(uri)?; + + match parsed_uri.scheme() { + "https" => {} + other_scheme => return Err(ResourceUriError::InvalidScheme(other_scheme.to_string())), + }; + + let host_string = match parsed_uri.host() { + Some(url::Host::Domain(host_string)) => host_string, + _ => return Err(ResourceUriError::InvalidHost), + }; + + let service_uri = String::from("https://") + host_string; + + // WIBNI: better parsing that this conforms to a storage resource URI, + // perhaps then ResourceUri could take a type like ResourceUri or ResourceUri + let (account_name, _service_endpoint) = host_string + .split_once('.') + .ok_or(ResourceUriError::MissingAccountName)?; + + let object_name = match parsed_uri.path_segments() { + Some(mut path_segments) => { + let object_name = match path_segments.next() { + Some(object_name) if !object_name.is_empty() => object_name, + _ => return Err(ResourceUriError::MissingObjectName), + }; + // Ensure there is only one path segment (i.e. the object name) + if path_segments.next().is_some() { + return Err(ResourceUriError::MissingObjectName); + }; + object_name + } + None => return Err(ResourceUriError::MissingObjectName), + }; + + let sas_token = parsed_uri + .query() + .ok_or(ResourceUriError::MissingSasToken)?; + + let sas_token = StorageCredentials::sas_token(sas_token)?; + + Ok(Self { + service_uri, + object_name: object_name.to_string(), + account_name: account_name.to_string(), + sas_token, + }) + } +} + +/// Trait to be used to create an Azure client from a resource URI with configurability of ClientOptions +pub(crate) trait ClientFromResourceUri { + fn create_client(resource_uri: ResourceUri, client_options: ClientOptions) -> Self; +} + +impl ClientFromResourceUri for QueueClient { + fn create_client(resource_uri: ResourceUri, client_options: ClientOptions) -> Self { + QueueServiceClientBuilder::with_location( + azure_storage::CloudLocation::Custom { + uri: resource_uri.service_uri, + account: resource_uri.account_name, + }, + resource_uri.sas_token, + ) + .client_options(client_options) + .build() + .queue_client(resource_uri.object_name) + } +} + +impl ClientFromResourceUri for ContainerClient { + fn create_client(resource_uri: ResourceUri, client_options: ClientOptions) -> Self { + ClientBuilder::with_location( + azure_storage::CloudLocation::Custom { + uri: resource_uri.service_uri, + account: resource_uri.account_name, + }, + resource_uri.sas_token, + ) + .client_options(client_options) + .container_client(resource_uri.object_name) + } +} + +#[cfg(test)] +mod tests { + use azure_storage::StorageCredentialsInner; + + use super::*; + use std::convert::TryFrom; + + #[test] + fn resource_uri_try_from() { + let uri = "https://storageaccountname.blob.core.windows.com/containerobjectname?sas=token"; + let resource_uri = ResourceUri::try_from(uri).unwrap(); + + assert_eq!( + resource_uri.service_uri, + "https://storageaccountname.blob.core.windows.com" + ); + assert_eq!(resource_uri.object_name, "containerobjectname"); + + let storage_credential_inner = std::sync::Arc::into_inner(resource_uri.sas_token.0) + .unwrap() + .into_inner(); + assert!(matches!( + storage_credential_inner, + StorageCredentialsInner::SASToken(_) + )); + + if let StorageCredentialsInner::SASToken(sas_vec) = storage_credential_inner { + assert_eq!(sas_vec.len(), 1); + assert_eq!(sas_vec[0].0, "sas"); + assert_eq!(sas_vec[0].1, "token"); + } + } + + #[test] + fn invalid_scheme() { + let uri = "http://storageaccountname.blob.core.windows.com/containerobjectname?sas=token"; + let resource_uri = ResourceUri::try_from(uri); + + assert!(resource_uri.is_err()); + assert!(matches!( + resource_uri.unwrap_err(), + ResourceUriError::InvalidScheme(_) + )); + } + + #[test] + fn missing_host_str() { + let uri = "https:"; + let resource_uri = ResourceUri::try_from(uri); + println!("{:#?}", resource_uri); + + assert!(resource_uri.is_err()); + assert!(matches!( + resource_uri.unwrap_err(), + ResourceUriError::ParseError(_) + )); + } + + #[test] + fn invalid_host_ipv4() { + let uri = "https://127.0.0.1/containerobjectname?sas=token"; + let resource_uri = ResourceUri::try_from(uri); + + assert!(resource_uri.is_err()); + assert!(matches!( + resource_uri.unwrap_err(), + ResourceUriError::InvalidHost + )); + } + + #[test] + fn invalid_host_ipv6() { + let uri = "https://[3FFE:FFFF:0::CD30]/containerobjectname?sas=token"; + let resource_uri = ResourceUri::try_from(uri); + println!("{:#?}", resource_uri); + + assert!(resource_uri.is_err()); + assert!(matches!( + resource_uri.unwrap_err(), + ResourceUriError::InvalidHost + )); + } + + #[test] + fn missing_object_name() { + let uri = "https://storageaccountname.blob.core.windows.com/?sas=token"; + let resource_uri = ResourceUri::try_from(uri); + println!("{:#?}", resource_uri); + + assert!(resource_uri.is_err()); + assert!(matches!( + resource_uri.unwrap_err(), + ResourceUriError::MissingObjectName + )); + } + + #[test] + fn missing_sas_token() { + let uri = "https://storageaccountname.blob.core.windows.com/containerobjectname"; + let resource_uri = ResourceUri::try_from(uri); + println!("{:#?}", resource_uri); + + assert!(resource_uri.is_err()); + assert!(matches!( + resource_uri.unwrap_err(), + ResourceUriError::MissingSasToken + )); + } + + #[test] + fn queue_client_from_resource_uri() { + let resource_uri = ResourceUri { + service_uri: "https://mystorageaccount.queue.core.windows.net".to_string(), + object_name: "queuename".to_string(), + account_name: "mystorageaccount".to_string(), + sas_token: StorageCredentials::sas_token("sas=token").unwrap(), + }; + + let client_options = ClientOptions::default(); + let queue_client = QueueClient::create_client(resource_uri, client_options); + + assert_eq!(queue_client.queue_name(), "queuename"); + } + + #[test] + fn container_client_from_resource_uri() { + let resource_uri = ResourceUri { + service_uri: "https://mystorageaccount.blob.core.windows.net".to_string(), + object_name: "containername".to_string(), + account_name: "mystorageaccount".to_string(), + sas_token: StorageCredentials::sas_token("sas=token").unwrap(), + }; + + let client_options = ClientOptions::default(); + let container_client = ContainerClient::create_client(resource_uri, client_options); + + assert_eq!(container_client.container_name(), "containername"); + } +} diff --git a/azure-kusto-ingest/src/resource_manager/utils.rs b/azure-kusto-ingest/src/resource_manager/utils.rs new file mode 100644 index 0000000..ebf5616 --- /dev/null +++ b/azure-kusto-ingest/src/resource_manager/utils.rs @@ -0,0 +1,10 @@ +use azure_kusto_data::models::TableV1; + +/// Helper to get a column index from a table +// TODO: this could be moved upstream into Kusto Data +pub fn get_column_index(table: &TableV1, column_name: &str) -> Option { + table + .columns + .iter() + .position(|c| c.column_name == column_name) +}