Skip to content

Commit

Permalink
feat: message decode for publicPOI, versionUpgrade, and simple
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Jul 25, 2023
1 parent b3329ce commit 67638f5
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 134 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/gen-binaries.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ name: Build and upload release binaries
on:
release:
types: [published]
# push:
# branches: [dev]
push:
branches: [dev]

jobs:
build-linux:
Expand Down
12 changes: 1 addition & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,31 @@ version = "0.0.1"
edition = "2021"

[dependencies]
graphcast_sdk = { package = "graphcast-sdk", git = "https://github.com/graphops/graphcast-sdk", branch = "hope/error-type-strings" }
graphcast-sdk = "0.4.0"
anyhow = "1.0"
axum = { version = "0.5", features = ["headers"] }
async-graphql = "4.0.16"
async-graphql-axum = "4.0.16"
autometrics = { version = "0.3.3", features = ["prometheus-exporter"] }
chrono = "0.4"
clap = { version = "3.2.25", features = ["derive", "env"] }
derive-getters = "0.2.1"
dotenv = "0.15"
ethers = "2.0.4"
ethers-contract = "2.0.4"
ethers-core = "2.0.4"
ethers-derive-eip712 = "1.0.2"
graphql_client = "0.9.0"
hex = "0.4.3"
metrics = "0.20.1"
metrics-exporter-prometheus = "0.11.0"
num-bigint = "0.4.3"
num-traits = "0.2.15"
once_cell = "1.17"
opentelemetry = { version = "0.18.0", features = ["rt-tokio"] }
partial_application = "0.2.1"
prometheus = "0.13.3"
prost = "0.11"
regex = "1.8.1"
reqwest = { version = "0.11.17", features = ["json"] }
serde = { version = "1.0.163", features = ["rc", "derive"] }
serde_derive = "1.0"
serde_json = "1.0.96"
sha3 = "0.10.8"
sqlx = { version = "0.6.3", features = ["runtime-tokio-native-tls", "postgres", "uuid", "time", "json", "offline"] }
thiserror = "1.0.40"
tracing = "0.1"
tracing-opentelemetry = "0.18.0"
tracing-subscriber = { version = "0.3", features = [
"env-filter",
"ansi",
Expand Down
84 changes: 13 additions & 71 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,15 @@ use graphcast_sdk::{
build_wallet,
callbook::CallBook,
graphcast_agent::{
message_typing::IdentityValidation, GraphcastAgent, GraphcastAgentConfig,
GraphcastAgentError,
},
graphql::{
client_network::query_network_subgraph, client_registry::query_registry, QueryError,
message_typing::IdentityValidation, GraphcastAgentConfig, GraphcastAgentError,
},
graphql::QueryError,
init_tracing, wallet_address,
};
use serde::{Deserialize, Serialize};
use tracing::info;

use crate::{active_allocation_hashes, syncing_deployment_hashes};
use crate::active_allocation_hashes;

#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default)]
pub enum CoverageLevel {
Expand Down Expand Up @@ -57,13 +54,6 @@ pub struct Config {
help = "Graph account corresponding to Graphcast operator"
)]
pub indexer_address: String,
#[clap(
long,
value_name = "ENDPOINT",
env = "GRAPH_NODE_STATUS_ENDPOINT",
help = "API endpoint to the Graph Node Status Endpoint"
)]
pub graph_node_endpoint: String,
#[clap(
long,
value_name = "KEY",
Expand Down Expand Up @@ -299,7 +289,7 @@ pub struct Config {
registered-indexer: must be registered at Graphcast Registry, correspond to and Indexer statisfying indexer minimum stake requirement, \n
indexer: must be registered at Graphcast Registry or is a Graph Account, correspond to and Indexer statisfying indexer minimum stake requirement"
)]
pub id_validation: Option<IdentityValidation>,
pub id_validation: IdentityValidation,
}

impl Config {
Expand Down Expand Up @@ -345,7 +335,8 @@ impl Config {
self.radio_name.clone(),
self.registry_subgraph.clone(),
self.network_subgraph.clone(),
self.graph_node_endpoint.clone(),
self.id_validation.clone(),
None,
Some(self.boot_node_addresses.clone()),
Some(self.graphcast_network.to_owned()),
Some(topics),
Expand All @@ -356,81 +347,32 @@ impl Config {
self.filter_protocol,
self.discv5_enrs.clone(),
self.discv5_port,
self.id_validation.clone(),
)
.await
}

pub async fn basic_info(&self) -> Result<(String, f32), QueryError> {
// Using unwrap directly as the query has been ran in the set-up validation
let wallet = build_wallet(
self.wallet_input()
.map_err(|e| QueryError::Other(e.into()))?,
)
.map_err(|e| QueryError::Other(e.into()))?;
// The query here must be Ok but so it is okay to panic here
// Alternatively, make validate_set_up return wallet, address, and stake
let my_address =
query_registry(self.registry_subgraph.to_string(), wallet_address(&wallet)).await?;
let my_stake =
query_network_subgraph(self.network_subgraph.to_string(), my_address.clone())
.await
.unwrap()
.indexer_stake();
info!(
my_address,
my_stake, "Initializing radio operator for indexer identity",
);
Ok((my_address, my_stake))
}

pub async fn create_graphcast_agent(&self) -> Result<GraphcastAgent, GraphcastAgentError> {
let config = self.to_graphcast_agent_config().await.unwrap();
GraphcastAgent::new(config).await
}

pub fn callbook(&self) -> CallBook {
CallBook::new(
self.graph_node_endpoint.clone(),
self.registry_subgraph.clone(),
self.network_subgraph.clone(),
None,
)
}

/// Generate a set of unique topics along with given static topics
#[autometrics]
pub async fn generate_topics(&self, indexer_address: String) -> Vec<String> {
pub async fn generate_topics(&self, indexer_address: &str) -> Vec<String> {
let static_topics = HashSet::from_iter(self.topics().to_vec());
let topics = match self.coverage {
CoverageLevel::Minimal => static_topics,
CoverageLevel::OnChain => {
let mut topics: HashSet<String> = active_allocation_hashes(
self.callbook().graph_network(),
indexer_address.clone(),
)
.await
.into_iter()
.collect();
topics.extend(static_topics);
topics
}
CoverageLevel::Comprehensive => {
let active_topics: HashSet<String> = active_allocation_hashes(
self.callbook().graph_network(),
indexer_address.clone(),
)
.await
.into_iter()
.collect();
let mut additional_topics: HashSet<String> =
syncing_deployment_hashes(self.graph_node_endpoint())
CoverageLevel::OnChain | CoverageLevel::Comprehensive => {
let mut topics: HashSet<String> =
active_allocation_hashes(self.callbook().graph_network(), indexer_address)
.await
.into_iter()
.collect();

additional_topics.extend(active_topics);
additional_topics.extend(static_topics);
additional_topics
topics.extend(static_topics);
topics
}
};
topics.into_iter().collect::<Vec<String>>()
Expand Down
7 changes: 4 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use graphcast_sdk::{

pub mod config;
pub mod db;
pub mod message_types;
pub mod metrics;
pub mod operator;
pub mod server;
Expand All @@ -39,9 +40,9 @@ pub fn radio_name() -> &'static str {
/// A vec of strings for subtopics
pub async fn active_allocation_hashes(
network_subgraph: &str,
indexer_address: String,
indexer_address: &str,
) -> Vec<String> {
query_network_subgraph(network_subgraph.to_string(), indexer_address)
query_network_subgraph(network_subgraph, indexer_address)
.await
.map(|result| result.indexer_allocations())
.unwrap_or_else(|e| {
Expand All @@ -56,7 +57,7 @@ pub async fn syncing_deployment_hashes(
graph_node_endpoint: &str,
// graphQL filter
) -> Vec<String> {
get_indexing_statuses(graph_node_endpoint.to_string())
get_indexing_statuses(graph_node_endpoint)
.await
.map_err(|e| -> Vec<String> {
error!(err = tracing::field::debug(&e), "Topic generation error");
Expand Down
2 changes: 0 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use dotenv::dotenv;
use graphcast_3la::{config::Config, operator::RadioOperator};

extern crate partial_application;

#[tokio::main]
async fn main() {
dotenv().ok();
Expand Down
82 changes: 82 additions & 0 deletions src/message_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use async_graphql::SimpleObject;
use ethers_contract::EthAbiType;
use ethers_core::types::transaction::eip712::Eip712;
use ethers_derive_eip712::*;
use prost::Message;
use serde::{Deserialize, Serialize};

#[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, PartialEq, SimpleObject)]
#[eip712(
name = "PublicPoiMessage",
version = "0",
chain_id = 1,
verifying_contract = "0xc944e90c64b2c07662a292be6244bdf05cda44a7"
)]
pub struct PublicPoiMessage {
#[prost(string, tag = "1")]
pub identifier: String,
#[prost(string, tag = "2")]
pub content: String,
//TODO: see if timestamp that comes with waku message can be used
/// nonce cached to check against the next incoming message
#[prost(int64, tag = "3")]
pub nonce: i64,
/// blockchain relevant to the message
#[prost(string, tag = "4")]
pub network: String,
/// block relevant to the message
#[prost(uint64, tag = "5")]
pub block_number: u64,
/// block hash generated from the block number
#[prost(string, tag = "6")]
pub block_hash: String,
/// Graph account sender
#[prost(string, tag = "7")]
pub graph_account: String,
}

/// Make a test radio type
#[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, SimpleObject)]
#[eip712(
name = "Graphcast Ping-Pong Radio",
version = "0",
chain_id = 1,
verifying_contract = "0xc944e90c64b2c07662a292be6244bdf05cda44a7"
)]
pub struct SimpleMessage {
#[prost(string, tag = "1")]
pub identifier: String,
#[prost(string, tag = "2")]
pub content: String,
}

#[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, PartialEq, SimpleObject)]
#[eip712(
name = "VersionUpgradeMessage",
version = "0",
chain_id = 1,
verifying_contract = "0xc944e90c64b2c07662a292be6244bdf05cda44a7"
)]
pub struct VersionUpgradeMessage {
// identify through the current subgraph deployment
#[prost(string, tag = "1")]
pub identifier: String,
// new version of the subgraph has a new deployment hash
#[prost(string, tag = "2")]
pub new_hash: String,
/// subgraph id shared by both versions of the subgraph deployment
#[prost(string, tag = "6")]
pub subgraph_id: String,
/// nonce cached to check against the next incoming message
#[prost(int64, tag = "3")]
pub nonce: i64,
/// blockchain relevant to the message
#[prost(string, tag = "4")]
pub network: String,
/// estimated timestamp for the usage to switch to the new version
#[prost(int64, tag = "5")]
pub migrate_time: i64,
/// Graph account sender - expect the sender to be subgraph owner
#[prost(string, tag = "7")]
pub graph_account: String,
}
38 changes: 35 additions & 3 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use prometheus::{IntCounterVec, IntGauge, Opts};
use std::{net::SocketAddr, str::FromStr};
use tracing::{debug, info};

// Received (and validated) messages counter
/// Received (and validated) messages counter
#[allow(dead_code)]
pub static VALIDATED_MESSAGES: Lazy<IntCounterVec> = Lazy::new(|| {
let m = IntCounterVec::new(
Expand All @@ -23,7 +23,7 @@ pub static VALIDATED_MESSAGES: Lazy<IntCounterVec> = Lazy::new(|| {
m
});

// Received invalid messages counter
/// Received invalid messages counter
#[allow(dead_code)]
pub static INVALIDATED_MESSAGES: Lazy<IntCounterVec> = Lazy::new(|| {
let m = IntCounterVec::new(
Expand All @@ -38,7 +38,7 @@ pub static INVALIDATED_MESSAGES: Lazy<IntCounterVec> = Lazy::new(|| {
m
});

// Received (and validated) messages counter
/// Received (and validated) messages counter
#[allow(dead_code)]
pub static CACHED_MESSAGES: Lazy<IntGauge> = Lazy::new(|| {
let m = IntGauge::with_opts(
Expand All @@ -51,6 +51,37 @@ pub static CACHED_MESSAGES: Lazy<IntGauge> = Lazy::new(|| {
m
});

/// Number of active peers discoverable by 3la
/// Updated periodically for the recently received messages
#[allow(dead_code)]
pub static ACTIVE_PEERS: Lazy<IntGauge> = Lazy::new(|| {
let m = IntGauge::with_opts(
Opts::new(
"active_peers",
"Number of discoverable active peers on network",
)
.namespace("graphcast")
.subsystem("3la"),
)
.expect("Failed to create active_peers gauges");
prometheus::register(Box::new(m.clone())).expect("Failed to register active_peers guage");
m
});

// /// Number of content topics with traffic
// /// Updated periodically for the recently received messages
// #[allow(dead_code)]
// pub static ACTIVE_CONTENT_TOPICS: Lazy<IntGauge> = Lazy::new(|| {
// let m = IntGauge::with_opts(
// Opts::new("active_content_topics", "Number of content topics being gossiped on network")
// .namespace("graphcast")
// .subsystem("3la"),
// )
// .expect("Failed to create active_content_topics gauges");
// prometheus::register(Box::new(m.clone())).expect("Failed to register active_content_topics guage");
// m
// });

#[allow(dead_code)]
pub static REGISTRY: Lazy<prometheus::Registry> = Lazy::new(prometheus::Registry::new);

Expand All @@ -70,6 +101,7 @@ pub fn start_metrics() {
Box::new(VALIDATED_MESSAGES.clone()),
Box::new(INVALIDATED_MESSAGES.clone()),
Box::new(CACHED_MESSAGES.clone()),
Box::new(ACTIVE_PEERS.clone()),
],
);
}
Expand Down
Loading

0 comments on commit 67638f5

Please sign in to comment.