Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queued ingest client with support for ingesting from blobs #29

Merged
merged 47 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 40 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
846dd72
Initial commit of queued ingestion client
krishanjmistry Aug 18, 2023
b67607d
Initial logic for caching resources
krishanjmistry Aug 18, 2023
d7eae30
More on caching logic
krishanjmistry Aug 21, 2023
b9f2e05
Allow configurability of ClientOptions for different service types
krishanjmistry Aug 23, 2023
0469d58
cargo fmt
krishanjmistry Aug 23, 2023
ffecd4b
cargo clippy fixes exc. unused
krishanjmistry Aug 23, 2023
8f86a5f
minor improvements
krishanjmistry Aug 23, 2023
a5ec8b8
various minor improvements upon testing
krishanjmistry Aug 23, 2023
ede9088
improvements
krishanjmistry Aug 24, 2023
6054520
improve validation in authorization context
krishanjmistry Aug 25, 2023
82ee449
move client options to separate file + other improvements
krishanjmistry Aug 25, 2023
2510919
remove duplicate definition of KustoIdentityToken
krishanjmistry Aug 25, 2023
eb955f8
move ingest client resource around
krishanjmistry Aug 25, 2023
df96e97
simplify
krishanjmistry Aug 29, 2023
f9da7f2
add some unit tests
krishanjmistry Aug 30, 2023
5cb3a23
remove use of URL in blob descriptor
krishanjmistry Aug 30, 2023
9b4a9b2
add tests for descriptors
krishanjmistry Aug 30, 2023
691c255
choose a random queue + more changes
krishanjmistry Aug 30, 2023
c250a67
simplify again to provide basic queued ingestion client supporting on…
krishanjmistry Aug 30, 2023
2951b13
remove more stuff
krishanjmistry Aug 30, 2023
90f968a
add basic example + more tidying
krishanjmistry Aug 30, 2023
10d1d9c
change to import from qualification
krishanjmistry Aug 30, 2023
0983454
change rng to allow multiple threads
krishanjmistry Aug 31, 2023
b72d2a9
handle authorization context token correctly
krishanjmistry Aug 31, 2023
e00f943
initial markups
krishanjmistry Sep 1, 2023
15288e5
markup: remove accessor methods on ResourceUri
krishanjmistry Sep 1, 2023
f03b21b
more markups
krishanjmistry Sep 1, 2023
0387e31
renaming
krishanjmistry Sep 4, 2023
9773174
convert println into initial debug logs
krishanjmistry Sep 4, 2023
65b9483
cleanup commented code related to container clients
krishanjmistry Sep 4, 2023
af220f6
update function names
krishanjmistry Sep 4, 2023
9f0244a
renames and add warning about using ingestion endpoint
krishanjmistry Sep 5, 2023
26dc69a
comments
krishanjmistry Sep 5, 2023
d477e2b
markups
krishanjmistry Sep 5, 2023
909bd03
use env vars in example
krishanjmistry Sep 5, 2023
44f9e08
Update Azure SDK to 0.15 and remove time 0.1.45 (#3)
krishanjmistry Sep 12, 2023
1f9cfa5
Update azure deps to 0.15
samtarver Sep 19, 2023
95af3f5
Merge pull request #4 from krishanjmistry/update-azure-to-0.15
samtarver Sep 19, 2023
74dfd6a
Azure SDK deps to 0.16 (#5)
krishanjmistry Nov 6, 2023
6145aad
Azure SDK deps to 0.17 (#6)
krishanjmistry Nov 7, 2023
f0dddce
Remove use of anyhow and replace with thiserror, other markups (#7)
krishanjmistry Jan 18, 2024
67ec88e
Merge remote-tracking branch 'origin/main' into simplified-ingest-client
krishanjmistry Jan 18, 2024
df61098
Update to Azure 0.19 (#9)
krishanjmistry Jan 18, 2024
4b28b5c
Use async-lock rather than tokio for RwLock (#10)
krishanjmistry Jan 18, 2024
ab17f7e
Remove debug logs (#11)
krishanjmistry Jan 18, 2024
b8ee019
Simplified ingest client markups (#12)
krishanjmistry Jan 26, 2024
4774e29
Merge branch 'main' into simplified-ingest-client
krishanjmistry Apr 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[workspace]
members = ["azure-kusto-data"]
members = ["azure-kusto-data", "azure-kusto-ingest"]
4 changes: 2 additions & 2 deletions azure-kusto-data/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ categories = ["api-bindings"]
[dependencies]
arrow-array = { version = "42", optional = true }
arrow-schema = { version = "42", optional = true }
azure_core = { version = "0.13", features = [
azure_core = { version = "0.17", features = [
"enable_reqwest",
"enable_reqwest_gzip",
] }
azure_identity = "0.13.0"
azure_identity = "0.17"
async-trait = "0.1.64"
async-convert = "1.0.0"
bytes = "1.4"
Expand Down
24 changes: 24 additions & 0 deletions azure-kusto-ingest/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "azure-kusto-ingest"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
azure-kusto-data = {path = "../azure-kusto-data"}
# Azure SDK for Rust crates versions must be kept in sync
azure_core = "0.17"
azure_storage = "0.17"
azure_storage_blobs = "0.17"
azure_storage_queues = "0.17"

anyhow = "1"
chrono = { version = "0.4", default-features = false, features = ["serde"] }
krishanjmistry marked this conversation as resolved.
Show resolved Hide resolved
rand = "0.8"
serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
tracing = { version = "0.1", default-features = false, features = ["std"] }
url = "2"
uuid = {version = "1", features = ["v4", "serde"]}
60 changes: 60 additions & 0 deletions azure-kusto-ingest/examples/ingest_from_blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::env;

use anyhow::Result;
use azure_kusto_data::prelude::{ConnectionString, KustoClient, KustoClientOptions};
use azure_kusto_ingest::data_format::DataFormat;
use azure_kusto_ingest::descriptors::{BlobAuth, BlobDescriptor};
use azure_kusto_ingest::ingestion_properties::IngestionProperties;
use azure_kusto_ingest::queued_ingest::QueuedIngestClient;

/// Example of ingesting data into Kusto from Azure Blob Storage using managed identities.
/// This example enforces that the Kusto cluster has a system assigned managed identity with access to the storage account
///
/// There are some steps that need to be taken to allow for managed identities to work:
/// - Permissions as the ingestor to initiate ingestion
/// https://learn.microsoft.com/en-us/azure/data-explorer/kusto/api/netfx/kusto-ingest-client-permissions
/// - Permissions for Kusto to access storage
/// https://learn.microsoft.com/en-us/azure/data-explorer/ingest-data-managed-identity
#[tokio::main]
async fn main() -> Result<()> {
let cluster_ingest_uri = env::var("KUSTO_INGEST_URI").expect("Must define KUSTO_INGEST_URI");
let user_mi_object_id =
env::var("KUSTO_USER_MI_OBJECT_ID").expect("Must define KUSTO_USER_MI_OBJECT_ID");

// Create a Kusto client with managed identity authentication via the user assigned identity
let kusto_client = KustoClient::new(
ConnectionString::with_managed_identity_auth(cluster_ingest_uri, Some(user_mi_object_id)),
KustoClientOptions::default(),
)?;

// Create a queued ingest client
let queued_ingest_client = QueuedIngestClient::new(kusto_client);

// Define ingestion properties
let ingestion_properties = IngestionProperties {
database_name: env::var("KUSTO_DATABASE_NAME").expect("Must define KUSTO_DATABASE_NAME"),
table_name: env::var("KUSTO_TABLE_NAME").expect("Must define KUSTO_TABLE_NAME"),
// Don't delete the blob on successful ingestion
retain_blob_on_success: Some(true),
// File format of the blob is Parquet
data_format: DataFormat::Parquet,
// Assume the server side default for flush_immediately
flush_immediately: None,
};

// Define the blob to ingest from
let blob_uri = env::var("BLOB_URI").expect("Must define BLOB_URI");
// Define the size of the blob if known, this improves ingestion performance as Kusto does not need to access the blob to determine the size
let blob_size: Option<u64> = match env::var("BLOB_SIZE") {
Ok(blob_size) => Some(blob_size.parse().expect("BLOB_SIZE must be a valid u64")),
Err(_) => None,
};

// Create the blob descriptor, also specifying that the blob should be accessed using the system assigned managed identity of the Kusto cluster
let blob_descriptor = BlobDescriptor::new(blob_uri, blob_size, None)
.with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);

queued_ingest_client
.ingest_from_blob(blob_descriptor, ingestion_properties)
.await
}
51 changes: 51 additions & 0 deletions azure-kusto-ingest/src/client_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use azure_core::ClientOptions;

/// Allows configurability of ClientOptions for the storage clients used within [QueuedIngestClient](crate::queued_ingest::QueuedIngestClient)
#[derive(Clone, Default)]
pub struct QueuedIngestClientOptions {
pub queue_service: ClientOptions,
krishanjmistry marked this conversation as resolved.
Show resolved Hide resolved
pub blob_service: ClientOptions,
}

impl From<ClientOptions> for QueuedIngestClientOptions {
/// Creates a `QueuedIngestClientOptions` struct where the same [ClientOptions] are used for all services
fn from(client_options: ClientOptions) -> Self {
Self {
queue_service: client_options.clone(),
blob_service: client_options,
}
}
}

/// Builder for [QueuedIngestClientOptions], call `build()` to create the [QueuedIngestClientOptions]
#[derive(Clone, Default)]
pub struct QueuedIngestClientOptionsBuilder {
queue_service: ClientOptions,
blob_service: ClientOptions,
}

impl QueuedIngestClientOptionsBuilder {
pub fn new() -> Self {
Self {
queue_service: ClientOptions::default(),
blob_service: ClientOptions::default(),
}
}

pub fn with_queue_service(mut self, queue_service: ClientOptions) -> Self {
self.queue_service = queue_service;
self
}

pub fn with_blob_service(mut self, blob_service: ClientOptions) -> Self {
self.blob_service = blob_service;
self
}

pub fn build(self) -> QueuedIngestClientOptions {
QueuedIngestClientOptions {
queue_service: self.queue_service,
blob_service: self.blob_service,
}
}
}
37 changes: 37 additions & 0 deletions azure-kusto-ingest/src/data_format.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use serde::Serialize;

/// All data formats supported by Kusto.
/// Default is [DataFormat::CSV]
#[derive(Serialize, Clone, Debug, Default, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum DataFormat {
ApacheAvro,
Avro,
#[default]
CSV,
JSON,
MultiJSON,
ORC,
Parquet,
PSV,
RAW,
SCSV,
SOHsv,
SingleJSON,
SStream,
TSV,
TSVe,
TXT,
W3CLOGFILE,
}

// Unit tests
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn data_format_default() {
assert_eq!(DataFormat::default(), DataFormat::CSV);
}
}
148 changes: 148 additions & 0 deletions azure-kusto-ingest/src/descriptors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use uuid::Uuid;

/// Encapsulates the information related to a blob that is required to ingest from a blob
#[derive(Debug, Clone)]
pub struct BlobDescriptor {
uri: String,
pub(crate) size: Option<u64>,
pub(crate) source_id: Uuid,
/// Authentication information for the blob; when [None], the uri is passed through as is
blob_auth: Option<BlobAuth>,
}

impl BlobDescriptor {
/// Create a new BlobDescriptor.
///
/// Parameters:
/// - `uri`: the uri of the blob to ingest from, note you can use the optional helper method `with_blob_auth` to add authentication information to the uri
/// - `size`: although the size is not required, providing it is recommended as it allows Kusto to better plan the ingestion process
/// - `source_id`: optional, useful if tracking ingestion status, if not provided, a random uuid will be generated
pub fn new(uri: impl Into<String>, size: Option<u64>, source_id: Option<Uuid>) -> Self {
let source_id = match source_id {
Some(source_id) => source_id,
None => Uuid::new_v4(),
};

Self {
uri: uri.into(),
size,
source_id,
blob_auth: None,
}
}

/// Mutator to modify the authentication information of the BlobDescriptor
pub fn with_blob_auth(mut self, blob_auth: BlobAuth) -> Self {
self.blob_auth = Some(blob_auth);
self
}

/// Returns the uri with the authentication information concatenated, ready to be serialized into the ingestion message
pub(crate) fn uri(&self) -> String {
match &self.blob_auth {
Some(BlobAuth::SASToken(sas_token)) => {
format!("{}?{}", self.uri, sas_token.as_str())
}
Some(BlobAuth::UserAssignedManagedIdentity(object_id)) => {
format!("{};managed_identity={}", self.uri, object_id)
}
Some(BlobAuth::SystemAssignedManagedIdentity) => {
format!("{};managed_identity=system", self.uri)
}
None => self.uri.to_string(),
}
}
}

/// Helper for adding authentication information to a blob path in the format expected by Kusto
#[derive(Clone)]
pub enum BlobAuth {
/// adds `?<sas_token>` to the blob path
SASToken(String),
/// adds `;managed_identity=<identity>` to the blob path
UserAssignedManagedIdentity(String),
/// adds `;managed_identity=system` to the blob path
SystemAssignedManagedIdentity,
}

/// Custom impl of Debug to avoid leaking sensitive information
impl std::fmt::Debug for BlobAuth {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BlobAuth::SASToken(_) => f.debug_struct("SASToken").finish(),
BlobAuth::UserAssignedManagedIdentity(object_id) => f
.debug_struct("UserAssignedManagedIdentity")
.field("object_id", object_id)
.finish(),
BlobAuth::SystemAssignedManagedIdentity => {
f.debug_struct("SystemAssignedManagedIdentity").finish()
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn blob_descriptor_with_no_auth_modification() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let blob_descriptor = BlobDescriptor::new(uri, None, None);

assert_eq!(blob_descriptor.uri(), uri);
}

#[test]
fn blob_descriptor_with_sas_token() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let sas_token = "my_sas_token";
let blob_descriptor = BlobDescriptor::new(uri, None, None)
.with_blob_auth(BlobAuth::SASToken(sas_token.to_string()));

assert_eq!(blob_descriptor.uri(), format!("{uri}?{sas_token}"));
}

#[test]
fn blob_descriptor_with_user_assigned_managed_identity() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let object_id = "my_object_id";
let blob_descriptor = BlobDescriptor::new(uri, None, None)
.with_blob_auth(BlobAuth::UserAssignedManagedIdentity(object_id.to_string()));

assert_eq!(
blob_descriptor.uri(),
format!("{uri};managed_identity={object_id}")
);
}

#[test]
fn blob_descriptor_with_system_assigned_managed_identity() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let blob_descriptor = BlobDescriptor::new(uri, None, None)
.with_blob_auth(BlobAuth::SystemAssignedManagedIdentity);

assert_eq!(
blob_descriptor.uri(),
format!("{uri};managed_identity=system")
);
}

#[test]
fn blob_descriptor_with_size() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let size = 123;
let blob_descriptor = BlobDescriptor::new(uri, Some(size), None);

assert_eq!(blob_descriptor.size, Some(size));
}

#[test]
fn blob_descriptor_with_source_id() {
let uri = "https://mystorageaccount.blob.core.windows.net/mycontainer/myblob";
let source_id = Uuid::new_v4();
let blob_descriptor = BlobDescriptor::new(uri, None, Some(source_id));

assert_eq!(blob_descriptor.source_id, source_id);
}
}
Loading