Skip to content

Commit

Permalink
Simplified ingest client markups (#12)
Browse files Browse the repository at this point in the history
* resource_uri changes

* markups in client_options

* resource_manager markups

* remove dependency on chrono - use time

* cache changes

* add missing dev dependency feature

* add some basic tests for caching implementation
  • Loading branch information
krishanjmistry authored Jan 26, 2024
1 parent ab17f7e commit b8ee019
Show file tree
Hide file tree
Showing 9 changed files with 256 additions and 151 deletions.
4 changes: 2 additions & 2 deletions azure-kusto-ingest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ azure_storage_blobs = "0.19"
azure_storage_queues = "0.19"

async-lock = "3"
chrono = { version = "0.4", default-features = false, features = ["serde"] }
rand = "0.8"
serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
thiserror = "1"
time = { version = "0.3", features = ["serde-human-readable", "macros"] }
url = "2"
uuid = { version = "1", features = ["v4", "serde"] }

[dev-dependencies]
tokio = { version = "1", features = ["macros"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
28 changes: 14 additions & 14 deletions azure-kusto-ingest/src/client_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,49 @@ use azure_core::ClientOptions;
/// Allows configurability of ClientOptions for the storage clients used within [QueuedIngestClient](crate::queued_ingest::QueuedIngestClient)
#[derive(Clone, Default)]
pub struct QueuedIngestClientOptions {
pub queue_service: ClientOptions,
pub blob_service: ClientOptions,
pub queue_service_options: ClientOptions,
pub blob_service_options: ClientOptions,
}

impl From<ClientOptions> for QueuedIngestClientOptions {
/// Creates a `QueuedIngestClientOptions` struct where the same [ClientOptions] are used for all services
fn from(client_options: ClientOptions) -> Self {
Self {
queue_service: client_options.clone(),
blob_service: client_options,
queue_service_options: client_options.clone(),
blob_service_options: client_options,
}
}
}

/// Builder for [QueuedIngestClientOptions]; call `build()` to create the [QueuedIngestClientOptions].
#[derive(Clone, Default)]
pub struct QueuedIngestClientOptionsBuilder {
queue_service: ClientOptions,
blob_service: ClientOptions,
queue_service_options: ClientOptions,
blob_service_options: ClientOptions,
}

impl QueuedIngestClientOptionsBuilder {
pub fn new() -> Self {
Self {
queue_service: ClientOptions::default(),
blob_service: ClientOptions::default(),
queue_service_options: ClientOptions::default(),
blob_service_options: ClientOptions::default(),
}
}

pub fn with_queue_service(mut self, queue_service: ClientOptions) -> Self {
self.queue_service = queue_service;
pub fn with_queue_service_options(mut self, queue_service_options: ClientOptions) -> Self {
self.queue_service_options = queue_service_options;
self
}

pub fn with_blob_service(mut self, blob_service: ClientOptions) -> Self {
self.blob_service = blob_service;
pub fn with_blob_service_options(mut self, blob_service_options: ClientOptions) -> Self {
self.blob_service_options = blob_service_options;
self
}

pub fn build(self) -> QueuedIngestClientOptions {
QueuedIngestClientOptions {
queue_service: self.queue_service,
blob_service: self.blob_service,
queue_service_options: self.queue_service_options,
blob_service_options: self.blob_service_options,
}
}
}
46 changes: 43 additions & 3 deletions azure-kusto-ingest/src/ingestion_blob_info.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use chrono::{DateTime, Utc};
use serde::Serialize;
use uuid::Uuid;

Expand All @@ -8,6 +7,18 @@ use crate::{
resource_manager::authorization_context::KustoIdentityToken,
};

use time::{
format_description::well_known::{iso8601, Iso8601},
OffsetDateTime,
};
/// The [DEFAULT](iso8601::Config::DEFAULT) ISO 8601 configuration that the `time` crate serializes with uses a 6 digit year.
/// This configuration is derived from the default with the six-digit year switched off, so a 4 digit year is used instead.
const CONFIG: iso8601::EncodedConfig = iso8601::Config::DEFAULT
.set_year_is_six_digits(false)
.encode();
/// ISO 8601 format parameterized by the customised [CONFIG] above.
const FORMAT: Iso8601<CONFIG> = Iso8601::<CONFIG>;
// Declares `kusto_ingest_iso8601_format` for `OffsetDateTime` values using `FORMAT`;
// fields in this file refer to it through `#[serde(with = "kusto_ingest_iso8601_format")]`.
time::serde::format_description!(kusto_ingest_iso8601_format, OffsetDateTime, FORMAT);

/// Message to be serialized as JSON and sent to the ingestion queue
///
/// Basing the ingestion message on
Expand Down Expand Up @@ -37,7 +48,9 @@ pub(crate) struct QueuedIngestionMessage {
/// If set to `true`, any server side aggregation will be skipped - thus overriding the batching policy. Default is `false`.
#[serde(skip_serializing_if = "Option::is_none")]
flush_immediately: Option<bool>,
source_message_creation_time: DateTime<Utc>,
#[serde(with = "kusto_ingest_iso8601_format")]
source_message_creation_time: OffsetDateTime,
// source_message_creation_time: DateTime<Utc>,
// Extra properties added to the ingestion command
additional_properties: AdditionalProperties,
}
Expand All @@ -61,7 +74,7 @@ impl QueuedIngestionMessage {
table_name: ingestion_properties.table_name.clone(),
retain_blob_on_success: ingestion_properties.retain_blob_on_success,
flush_immediately: ingestion_properties.flush_immediately,
source_message_creation_time: Utc::now(),
source_message_creation_time: OffsetDateTime::now_utc(),
additional_properties,
}
}
Expand All @@ -77,3 +90,30 @@ struct AdditionalProperties {
#[serde(rename = "format")]
data_format: DataFormat,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes a fixed instant through `kusto_ingest_iso8601_format` and checks the
    /// exact JSON text produced, including the four-digit year.
    #[test]
    fn time_custom_iso8601_serialization() {
        #[derive(Serialize, Debug)]
        struct TestTimeSerialize {
            #[serde(with = "kusto_ingest_iso8601_format")]
            customised_time_format: time::OffsetDateTime,
        }

        // A fixed instant keeps the expected output stable across runs.
        let fixed_instant =
            time::OffsetDateTime::from_unix_timestamp_nanos(1_234_567_890_123_456_789).unwrap();
        let test_message = TestTimeSerialize {
            customised_time_format: fixed_instant,
        };

        let expected_json = "{\"customised_time_format\":\"2009-02-13T23:31:30.123456789Z\"}";
        let actual_json = serde_json::to_string(&test_message).unwrap();

        assert_eq!(actual_json, expected_json);
    }
}
2 changes: 1 addition & 1 deletion azure-kusto-ingest/src/queued_ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl QueuedIngestClient {
blob_descriptor: BlobDescriptor,
ingestion_properties: IngestionProperties,
) -> Result<()> {
let queue_client = self.resource_manager.ingestion_queue().await?;
let queue_client = self.resource_manager.random_ingestion_queue().await?;

let auth_context = self.resource_manager.authorization_context().await?;

Expand Down
50 changes: 8 additions & 42 deletions azure-kusto-ingest/src/resource_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use self::{
ingest_client_resources::IngestClientResources,
};

use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};
use rand::{seq::SliceRandom, thread_rng};

pub const RESOURCE_REFRESH_PERIOD: Duration = Duration::from_secs(60 * 60);

Expand Down Expand Up @@ -60,9 +60,14 @@ impl ResourceManager {

/// Returns a [QueueClient] to ingest to.
/// This is a random selection from the list of ingestion queues
pub async fn ingestion_queue(&self) -> Result<QueueClient> {
pub async fn random_ingestion_queue(&self) -> Result<QueueClient> {
let ingestion_queues = self.ingestion_queues().await?;
let selected_queue = select_random_resource(ingestion_queues)?;

let mut rng = thread_rng();
let selected_queue = ingestion_queues
.choose(&mut rng)
.ok_or(ResourceManagerError::NoResourcesFound)?;

Ok(selected_queue.clone())
}

Expand All @@ -74,42 +79,3 @@ impl ResourceManager {
.map_err(ResourceManagerError::AuthorizationContextError)
}
}
/// Selects a random resource from the given list of resources
fn select_random_resource<T: Clone>(resources: Vec<T>) -> Result<T> {
let mut rng: StdRng = SeedableRng::from_entropy();
resources
.choose(&mut rng)
.ok_or(ResourceManagerError::NoResourcesFound)
.cloned()
}

#[cfg(test)]
mod select_random_resource_tests {
    use super::*;

    /// With exactly one candidate, that candidate is the only possible selection.
    #[test]
    fn single_resource() {
        const VALUE: i32 = 1;
        let picked = select_random_resource(vec![VALUE]).unwrap();
        assert_eq!(picked, VALUE);
    }

    /// Whatever is selected must be one of the supplied candidates.
    #[test]
    fn multiple_resources() {
        let candidates = vec![1, 2, 3, 4, 5];
        let picked = select_random_resource(candidates.clone()).unwrap();
        assert!(candidates.contains(&picked));
    }

    /// An empty candidate list yields the `NoResourcesFound` error.
    #[test]
    fn no_resources() {
        let outcome = select_random_resource(Vec::<i32>::new());
        assert!(matches!(
            outcome,
            Err(ResourceManagerError::NoResourcesFound)
        ));
    }
}
38 changes: 6 additions & 32 deletions azure-kusto-ingest/src/resource_manager/authorization_context.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use std::sync::Arc;

use async_lock::RwLock;
use azure_kusto_data::prelude::KustoClient;
use serde_json::Value;

use super::cache::{Cached, ThreadSafeCachedValue};
use super::cache::ThreadSafeCachedValue;
use super::utils::get_column_index;
use super::RESOURCE_REFRESH_PERIOD;

Expand Down Expand Up @@ -40,14 +37,14 @@ pub(crate) struct AuthorizationContext {
/// A client against a Kusto ingestion cluster
client: KustoClient,
/// Cache of the Kusto identity token
token_cache: ThreadSafeCachedValue<Option<KustoIdentityToken>>,
token_cache: ThreadSafeCachedValue<KustoIdentityToken>,
}

impl AuthorizationContext {
pub fn new(client: KustoClient) -> Self {
Self {
client,
token_cache: Arc::new(RwLock::new(Cached::new(None, RESOURCE_REFRESH_PERIOD))),
token_cache: ThreadSafeCachedValue::new(RESOURCE_REFRESH_PERIOD),
}
}

Expand Down Expand Up @@ -99,31 +96,8 @@ impl AuthorizationContext {

/// Fetches the latest Kusto identity token, either retrieving from cache if valid, or by executing a KQL query
pub(crate) async fn get(&self) -> Result<KustoIdentityToken> {
// first, try to get the resources from the cache by obtaining a read lock
{
let token_cache = self.token_cache.read().await;
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
}
}
}

// obtain a write lock to refresh the kusto response
let mut token_cache = self.token_cache.write().await;

// Again attempt to return from cache, check is done in case another thread
// refreshed the token while we were waiting on the write lock
if !token_cache.is_expired() {
if let Some(token) = token_cache.get() {
return Ok(token.clone());
}
}

// Fetch new token from Kusto, update the cache, and return the token
let token = self.query_kusto_identity_token().await?;
token_cache.update(Some(token.clone()));

Ok(token)
self.token_cache
.get(self.query_kusto_identity_token())
.await
}
}
Loading

0 comments on commit b8ee019

Please sign in to comment.