Skip to content

Commit

Permalink
feat: fetching PHDs for DAC v5 prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
yahortsaryk committed Dec 24, 2024
1 parent 65714d5 commit 6dc9788
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 33 deletions.
209 changes: 178 additions & 31 deletions pallets/ddc-verification/src/aggregator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,35 @@ impl<'a> AggregatorClient<'a> {
}
}

pub fn partial_historical_document(
&self,
phd_id: String,
) -> Result<json::PHDResponse, http::Error> {
let mut url = format!("{}/activity/phd/{}", self.base_url, phd_id);
if self.verify_sig {
url = format!("{}?sign=true", url);
}
let response = self.get(&url, Accept::Any)?;

let body = response.body().collect::<Vec<u8>>();
if self.verify_sig {
let json_response: json::SignedJsonResponse<json::PHDResponse> =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

if !json_response.verify() {
log::debug!("bad signature, req: {:?}, resp: {:?}", url, json_response);
return Err(http::Error::Unknown); // TODO (khssnv): more specific error.
}

Ok(json_response.payload)
} else {
let json_response: json::PHDResponse =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

Ok(json_response)
}
}

pub fn traverse_bucket_sub_aggregate(
&self,
era_id: DdcEra,
Expand Down Expand Up @@ -561,10 +590,11 @@ pub(crate) mod json {
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Encode, Decode)]
#[allow(non_snake_case)]
pub struct EHDResponse {
pub ehdId: String,
pub pdhIds: Vec<String>,
#[serde(rename = "ehdId")]
pub ehd_id: String,
#[serde(rename = "pdhIds")]
pub pdh_ids: Vec<String>,
pub status: String,
pub customers: Vec<EHDCustomer>,
pub providers: Vec<EHDProvider>,
Expand All @@ -573,53 +603,170 @@ pub(crate) mod json {
#[derive(
    Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
/// Aggregated usage counters carried in an era historical document (EHD).
// Fields are snake_case with explicit serde renames matching the aggregator's
// JSON keys; the former `#[allow(non_snake_case)]` was stale and is removed.
pub struct EHDUsage {
    /// Total amount of stored bytes.
    #[serde(rename = "storedBytes")]
    pub stored_bytes: i64,
    /// Total amount of transferred bytes.
    #[serde(rename = "transferredBytes")]
    pub transferred_bytes: u64,
    /// Total number of puts.
    #[serde(rename = "puts")]
    pub number_of_puts: u64,
    /// Total number of gets.
    #[serde(rename = "gets")]
    pub number_of_gets: u64,
}

#[derive(
    Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
/// Per-customer consumed usage entry of an era historical document (EHD).
// Fields are snake_case with explicit serde renames matching the aggregator's
// JSON keys; the former `#[allow(non_snake_case)]` was stale and is removed.
pub struct EHDCustomer {
    /// Customer identifier.
    #[serde(rename = "customerId")]
    pub customer_id: String,
    /// Usage consumed by this customer.
    #[serde(rename = "consumedUsage")]
    pub consumed_usage: EHDUsage,
}

#[derive(
    Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
/// Per-provider provided usage entry of an era historical document (EHD),
/// including the per-node resource breakdown.
// Fields are snake_case with explicit serde renames matching the aggregator's
// JSON keys; the former `#[allow(non_snake_case)]` was stale and is removed.
pub struct EHDProvider {
    /// Provider identifier.
    #[serde(rename = "providerId")]
    pub provider_id: String,
    /// Usage provided by this provider in total.
    #[serde(rename = "providedUsage")]
    pub provided_usage: EHDUsage,
    /// Per-node resources backing the provided usage.
    pub nodes: Vec<EHDNodeResources>,
}

#[derive(
    Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
/// Per-node resource usage entry nested under an [`EHDProvider`].
// Fields are snake_case with explicit serde renames matching the aggregator's
// JSON keys; the former `#[allow(non_snake_case)]` was stale and is removed.
pub struct EHDNodeResources {
    /// Node key.
    #[serde(rename = "nodeId")]
    pub node_key: String,
    /// Total amount of stored bytes.
    #[serde(rename = "storedBytes")]
    pub stored_bytes: i64,
    /// Total amount of transferred bytes.
    #[serde(rename = "transferredBytes")]
    pub transferred_bytes: u64,
    /// Total number of puts.
    #[serde(rename = "puts")]
    pub number_of_puts: u64,
    /// Total number of gets.
    #[serde(rename = "gets")]
    pub number_of_gets: u64,
    /// Average latency reported for the node.
    // NOTE(review): units not visible here — presumably milliseconds; confirm
    // against the aggregator API.
    #[serde(rename = "avgLatency")]
    pub avg_latency: u64,
    /// Average bandwidth reported for the node.
    #[serde(rename = "avgBandwidth")]
    pub avg_bandwidth: u64,
}

/// Per-node usage aggregate carried inside a partial historical document
/// (see [`PHDResponse`]).
// Field order is load-bearing for SCALE `Encode`/`Decode` — do not reorder.
#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
pub struct PHDNodeAggregate {
/// Node key.
#[serde(rename = "nodeId")]
pub node_key: String,
/// Total amount of stored bytes.
#[serde(rename = "storedBytes")]
pub stored_bytes: i64,
/// Total amount of transferred bytes.
#[serde(rename = "transferredBytes")]
pub transferred_bytes: u64,
/// Total number of puts.
#[serde(rename = "numberOfPuts")]
pub number_of_puts: u64,
/// Total number of gets.
#[serde(rename = "numberOfGets")]
pub number_of_gets: u64,
/// Node performance metrics reported alongside the usage counters.
pub metrics: PHDNodeMetrics,
}

/// Performance metrics of a node as carried in a PHD node aggregate.
// NOTE(review): fields are private — they are (de)serialized and compared via
// the derived traits, but not read directly by code visible here.
#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
pub struct PHDNodeMetrics {
/// Request latency percentiles.
latency: LatencyMetrics,
/// Request success/failure counters.
availability: AvailabilityMetrics,
/// Bandwidth distribution buckets.
bandwidth: BandwidthMetrics,
}

/// Latency percentiles (p50 / p95 / p99 / p99.9) reported by a node.
// NOTE(review): units are not visible from this code — presumably
// milliseconds; confirm against the aggregator API.
#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
pub struct LatencyMetrics {
/// 50th percentile latency.
#[serde(rename = "p50")]
p_50: u64,
/// 95th percentile latency.
#[serde(rename = "p95")]
p_95: u64,
/// 99th percentile latency.
#[serde(rename = "p99")]
p_99: u64,
/// 99.9th percentile latency.
#[serde(rename = "p999")]
p_999: u64,
}

/// Request availability counters reported by a node.
#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
pub struct AvailabilityMetrics {
/// Total number of requests observed.
#[serde(rename = "totalRequests")]
total_requests: u64,
/// Number of requests that failed.
#[serde(rename = "failedRequests")]
failed_requests: u64,
}

/// Bandwidth distribution buckets reported by a node.
// NOTE(review): presumably each field counts requests/transfers that fell
// into the named throughput bucket, with `inf` covering everything above
// 100 Mbps — confirm against the aggregator API.
#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
pub struct BandwidthMetrics {
/// Bucket for up to 100 kbps.
#[serde(rename = "100kbps")]
kbps_100: u64,
/// Bucket for up to 1 Mbps.
#[serde(rename = "1Mbps")]
mbps_1: u64,
/// Bucket for up to 10 Mbps.
#[serde(rename = "10Mbps")]
mbps_10: u64,
/// Bucket for up to 100 Mbps.
#[serde(rename = "100Mbps")]
mbps_100: u64,
/// Bucket for everything above the named thresholds.
inf: u64,
}

/// Partial historical document (PHD) record returned by an aggregator's
/// `/activity/phd/{id}` endpoint.
// NOTE(review): the aggregates arrays use PascalCase JSON keys
// ("NodesAggregates" / "BucketsAggregates") unlike the camelCase used
// elsewhere — the renames below mirror the aggregator's payload as-is.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Encode, Decode)]
pub struct PHDResponse {
/// PHD identifier.
#[serde(rename = "phdId")]
pub phd_id: String,
/// Per-node usage aggregates.
#[serde(rename = "NodesAggregates")]
pub nodes_aggregates: Vec<PHDNodeAggregate>,
/// Per-bucket usage aggregates.
#[serde(rename = "BucketsAggregates")]
pub buckets_aggregates: Vec<PHDBucketAggregate>,
}

/// Per-bucket usage aggregate carried inside a [`PHDResponse`], with its
/// per-node sub-aggregates.
#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
pub struct PHDBucketAggregate {
/// Bucket identifier.
#[serde(rename = "bucketId")]
pub bucket_id: BucketId,
/// Total amount of stored bytes.
#[serde(rename = "storedBytes")]
pub stored_bytes: i64,
/// Total amount of transferred bytes.
#[serde(rename = "transferredBytes")]
pub transferred_bytes: u64,
/// Total number of puts.
#[serde(rename = "numberOfPuts")]
pub number_of_puts: u64,
/// Total number of gets.
#[serde(rename = "numberOfGets")]
pub number_of_gets: u64,
/// Per-node breakdown of this bucket's usage.
#[serde(rename = "subAggregates")]
pub sub_aggregates: Vec<PHDBucketSubAggregate>,

// TODO: remove from top-level bucket aggregation — the node key belongs to
// the sub-aggregates, not the bucket-level record.
#[serde(rename = "nodeId")]
pub node_key: String,
}

/// Per-node share of a bucket's usage, nested under [`PHDBucketAggregate`].
#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
pub struct PHDBucketSubAggregate {
/// Bucket identifier.
#[serde(rename = "bucketId")]
pub bucket_id: BucketId,
/// Node key.
#[serde(rename = "nodeId")]
pub node_key: String,
/// Total amount of stored bytes.
#[serde(rename = "storedBytes")]
pub stored_bytes: i64,
/// Total amount of transferred bytes.
#[serde(rename = "transferredBytes")]
pub transferred_bytes: u64,
/// Total number of puts.
#[serde(rename = "numberOfPuts")]
pub number_of_puts: u64,
/// Total number of gets.
#[serde(rename = "numberOfGets")]
pub number_of_gets: u64,
}
}
54 changes: 52 additions & 2 deletions pallets/ddc-verification/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ pub mod pallet {
pub const BUCKETS_AGGREGATES_FETCH_BATCH_SIZE: usize = 100;
pub const NODES_AGGREGATES_FETCH_BATCH_SIZE: usize = 10;
pub const OCW_MUTEX_ID: &[u8] = b"inspection";
pub const EHD_PROTOTYPE: DdcEra = 123;
pub const EHD_PROTOTYPE_ID: DdcEra = 123;

/// Delta usage of a bucket includes only the delta usage for the processing era reported by
/// collectors. This usage can be verified of unverified by inspectors.
Expand Down Expand Up @@ -1200,7 +1200,16 @@ pub mod pallet {
verification_account: &Account<T>,
signer: &Signer<T, T::OffchainIdentifierId>,
) -> Result<(), Vec<OCWError>> {
let ehd = Self::fetch_era_historical_document(cluster_id, EHD_PROTOTYPE)?;
let ehd = Self::fetch_era_historical_document(cluster_id, EHD_PROTOTYPE_ID)?;
log::info!("------ EHD ------> {:?}", ehd);

let mut phds = vec![];
for phd_id in ehd.pdh_ids {
let phd = Self::fetch_partial_historical_document(cluster_id, phd_id)?;
phds.push(phd);
}

log::info!("------ PHDs ------> {:?}", phds);

Ok(())
}
Expand Down Expand Up @@ -1246,6 +1255,47 @@ pub mod pallet {
Err(vec![OCWError::FailedToFetchEHD])
}

/// Fetch PHD record.
///
/// Queries the cluster's collector (DAC) nodes one by one and returns the
/// first PHD record that is successfully fetched.
///
/// Parameters:
/// - `cluster_id`: cluster id of a cluster
/// - `phd_id`: PHD identifier
pub(crate) fn fetch_partial_historical_document(
    cluster_id: &ClusterId,
    phd_id: String,
) -> Result<aggregator_client::json::PHDResponse, Vec<OCWError>> {
    let collectors = Self::get_dac_nodes(cluster_id).map_err(|_| {
        log::error!("❌ Error retrieving collectors for cluster {:?}", cluster_id);
        vec![OCWError::FailedToFetchDacNodes]
    })?;

    for (collector_key, collector_params) in collectors {
        // Skip collectors whose host is not valid UTF-8, as before.
        let host = match str::from_utf8(&collector_params.host) {
            Ok(host) => host,
            Err(_) => continue,
        };

        let base_url = format!("http://{}:{}", host, collector_params.http_port);
        let client = aggregator_client::AggregatorClient::new(
            &base_url,
            Duration::from_millis(RESPONSE_TIMEOUT),
            3,
            false, // no response signature verification for now
        );

        match client.partial_historical_document(phd_id.clone()) {
            // proceed with the first available PHD record for the prototype
            Ok(phd) => return Ok(phd),
            Err(_) => log::warn!(
                "Collector from cluster {:?} is unavailable while fetching PHD record or responded with unexpected body. Key: {:?} Host: {:?}",
                cluster_id,
                collector_key.get_hex(),
                String::from_utf8(collector_params.host)
            ),
        }
    }

    // NOTE(review): reuses the EHD fetch error variant — consider a
    // PHD-specific `OCWError` variant.
    Err(vec![OCWError::FailedToFetchEHD])
}

pub(crate) fn start_validation_phase(
cluster_id: &ClusterId,
verification_account: &Account<T>,
Expand Down

0 comments on commit 6dc9788

Please sign in to comment.