Skip to content

Commit

Permalink
feat: fetching EHD for DAC v5 prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
yahortsaryk committed Dec 24, 2024
1 parent 423844b commit 65714d5
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 9 deletions.
92 changes: 92 additions & 0 deletions pallets/ddc-verification/src/aggregator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,35 @@ impl<'a> AggregatorClient<'a> {
}
}

pub fn era_historical_document(
&self,
era_id: DdcEra,
) -> Result<json::EHDResponse, http::Error> {
let mut url = format!("{}/activity/ehd/{}", self.base_url, era_id);
if self.verify_sig {
url = format!("{}?sign=true", url);
}
let response = self.get(&url, Accept::Any)?;

let body = response.body().collect::<Vec<u8>>();
if self.verify_sig {
let json_response: json::SignedJsonResponse<json::EHDResponse> =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

if !json_response.verify() {
log::debug!("bad signature, req: {:?}, resp: {:?}", url, json_response);
return Err(http::Error::Unknown); // TODO (khssnv): more specific error.
}

Ok(json_response.payload)
} else {
let json_response: json::EHDResponse =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

Ok(json_response)
}
}

pub fn traverse_bucket_sub_aggregate(
&self,
era_id: DdcEra,
Expand Down Expand Up @@ -530,4 +559,67 @@ pub(crate) mod json {
#[serde_as(as = "Base64")]
pub signature: Vec<u8>,
}

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Encode, Decode)]
#[allow(non_snake_case)]
pub struct EHDResponse {
pub ehdId: String,
pub pdhIds: Vec<String>,
pub status: String,
pub customers: Vec<EHDCustomer>,
pub providers: Vec<EHDProvider>,
}

#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
#[allow(non_snake_case)]
pub struct EHDUsage {
/// Total amount of stored bytes.
pub storedBytes: i64,
/// Total amount of transferred bytes.
pub transferredBytes: u64,
/// Total number of puts.
pub puts: u64,
/// Total number of gets.
pub gets: u64,
}

#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
#[allow(non_snake_case)]
pub struct EHDCustomer {
pub customerId: String,
pub consumedUsage: EHDUsage,
}

#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
#[allow(non_snake_case)]
pub struct EHDProvider {
pub providerId: String,
pub providedUsage: EHDUsage,
pub nodes: Vec<EHDNodeResources>,
}

#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
#[allow(non_snake_case)]
pub struct EHDNodeResources {
/// Node Key.
pub nodeId: String,
/// Total amount of stored bytes.
pub storedBytes: i64,
/// Total amount of transferred bytes.
pub transferredBytes: u64,
/// Total number of puts.
pub puts: u64,
/// Total number of gets.
pub gets: u64,
pub avgLatency: u64,
pub avgBandwidth: u64,
}
}
88 changes: 80 additions & 8 deletions pallets/ddc-verification/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub mod pallet {
pub const BUCKETS_AGGREGATES_FETCH_BATCH_SIZE: usize = 100;
pub const NODES_AGGREGATES_FETCH_BATCH_SIZE: usize = 10;
pub const OCW_MUTEX_ID: &[u8] = b"inspection";
pub const EHD_PROTOTYPE: DdcEra = 123;

/// Delta usage of a bucket includes only the delta usage for the processing era reported by
/// collectors. This usage can be verified of unverified by inspectors.
Expand Down Expand Up @@ -424,6 +425,7 @@ pub mod pallet {
EmptyConsistentGroup,
FailedToFetchVerifiedDeltaUsage,
FailedToFetchVerifiedPayableUsage,
FailedToFetchEHD,
}

/// Consensus Errors
Expand Down Expand Up @@ -548,6 +550,7 @@ pub mod pallet {
EmptyConsistentGroup,
FailedToFetchVerifiedDeltaUsage,
FailedToFetchVerifiedPayableUsage,
FailedToFetchEHD,
}

#[pallet::error]
Expand Down Expand Up @@ -868,17 +871,32 @@ pub mod pallet {
for cluster_id in clusters_ids {
let mut errors: Vec<OCWError> = Vec::new();

let validation_result =
Self::start_validation_phase(&cluster_id, &verification_account, &signer);
//
// ========= DAC v4 =========
//

if let Err(errs) = validation_result {
errors.extend(errs);
}
// let validation_result =
// Self::start_validation_phase(&cluster_id, &verification_account, &signer);

// if let Err(errs) = validation_result {
// errors.extend(errs);
// }

// let payouts_result =
// Self::start_payouts_phase(&cluster_id, &verification_account, &signer);

// if let Err(errs) = payouts_result {
// errors.extend(errs);
// }

let payouts_result =
Self::start_payouts_phase(&cluster_id, &verification_account, &signer);
//
// ========= DAC v5 =========
//

if let Err(errs) = payouts_result {
let inspection_result =
Self::start_inspection_phase(&cluster_id, &verification_account, &signer);

if let Err(errs) = inspection_result {
errors.extend(errs);
}

Expand Down Expand Up @@ -1177,6 +1195,57 @@ pub mod pallet {
)))
}

pub(crate) fn start_inspection_phase(
cluster_id: &ClusterId,
verification_account: &Account<T>,
signer: &Signer<T, T::OffchainIdentifierId>,
) -> Result<(), Vec<OCWError>> {
let ehd = Self::fetch_era_historical_document(cluster_id, EHD_PROTOTYPE)?;

Ok(())
}

/// Fetch EHD record.
///
/// Parameters:
/// - `cluster_id`: cluster id of a cluster
/// - `era_id`: era id
pub(crate) fn fetch_era_historical_document(
cluster_id: &ClusterId,
era_id: DdcEra,
) -> Result<aggregator_client::json::EHDResponse, Vec<OCWError>> {
let collectors = Self::get_dac_nodes(cluster_id).map_err(|_| {
log::error!("❌ Error retrieving collectors for cluster {:?}", cluster_id);
vec![OCWError::FailedToFetchDacNodes]
})?;

for (collector_key, collector_params) in collectors {
if let Ok(host) = str::from_utf8(&collector_params.host) {
let base_url = format!("http://{}:{}", host, collector_params.http_port);
let client = aggregator_client::AggregatorClient::new(
&base_url,
Duration::from_millis(RESPONSE_TIMEOUT),
3,
false, // no response signature verification for now
);

if let Ok(ehd) = client.era_historical_document(era_id) {
// proceed with the first available EHD record for the prototype
return Ok(ehd);
} else {
log::warn!(
"Collector from cluster {:?} is unavailable while fetching EHD record or responded with unexpected body. Key: {:?} Host: {:?}",
cluster_id,
collector_key.get_hex(),
String::from_utf8(collector_params.host)
);
}
}
}

Err(vec![OCWError::FailedToFetchEHD])
}

pub(crate) fn start_validation_phase(
cluster_id: &ClusterId,
verification_account: &Account<T>,
Expand Down Expand Up @@ -4123,6 +4192,9 @@ pub mod pallet {
OCWError::FailedToFetchVerifiedPayableUsage => {
Self::deposit_event(Event::FailedToFetchVerifiedPayableUsage);
},
OCWError::FailedToFetchEHD => {
Self::deposit_event(Event::FailedToFetchEHD);
},
}
}

Expand Down
2 changes: 1 addition & 1 deletion runtime/cere-dev/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1331,7 +1331,7 @@ impl pallet_ddc_verification::Config for Runtime {
type CustomerVisitor = pallet_ddc_customers::Pallet<Runtime>;
const MAX_MERKLE_NODE_IDENTIFIER: u16 = 3;
type Currency = Balances;
const VERIFY_AGGREGATOR_RESPONSE_SIGNATURE: bool = true;
const VERIFY_AGGREGATOR_RESPONSE_SIGNATURE: bool = false;
type BucketsStorageUsageProvider = DdcCustomers;
type NodesStorageUsageProvider = DdcNodes;
#[cfg(feature = "runtime-benchmarks")]
Expand Down

0 comments on commit 65714d5

Please sign in to comment.