Skip to content

Commit

Permalink
feat: aggregate metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
pete-eiger committed Mar 27, 2024
1 parent 92e0f0a commit ab4dd49
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 11 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions migrations/20240325124203_aggregates.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Revert migration 20240325124203: remove the daily aggregates table.
DROP TABLE IF EXISTS aggregates;
5 changes: 5 additions & 0 deletions migrations/20240325124203_aggregates.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Daily aggregate snapshots: each row stores one JSONB array of
-- per-indexer stats, keyed by the unix timestamp (seconds) at which
-- the snapshot was taken.
CREATE TABLE IF NOT EXISTS aggregates
(
    -- Unix timestamp (seconds) of the aggregation run; one row per run.
    timestamp BIGINT PRIMARY KEY,
    -- Serialized Vec<IndexerStats> produced by the radio operator.
    data JSONB NOT NULL
);
57 changes: 52 additions & 5 deletions src/db/resolver.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_graphql::{OutputType, SimpleObject};
use chrono::Utc;
use serde::{de::DeserializeOwned, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sqlx::{postgres::PgQueryResult, types::Json, FromRow, PgPool, Row as SqliteRow};
use std::ops::Deref;
use tracing::trace;
Expand All @@ -21,11 +21,11 @@ pub struct MessageID {
}

#[allow(dead_code)]
#[derive(FromRow, SimpleObject, Serialize, Debug, Clone)]
#[derive(FromRow, SimpleObject, Serialize, Deserialize, Debug, Clone)]
pub struct IndexerStats {
graph_account: String,
message_count: i64,
subgraphs_count: i64,
pub graph_account: String,
pub message_count: i64,
pub subgraphs_count: i64,
}

// Define graphql type for the Row in Messages
Expand Down Expand Up @@ -352,6 +352,53 @@ pub async fn get_indexer_stats(
Ok(stats)
}

/// Persist one aggregate snapshot: the full set of per-indexer stats,
/// serialized to JSON and keyed by `timestamp` (unix seconds).
///
/// If a row with the same timestamp already exists the insert is a no-op
/// (`ON CONFLICT ... DO NOTHING`), so re-running an aggregation for the
/// same instant never overwrites previously stored data.
///
/// # Errors
/// Returns an error if serialization fails or the database query fails.
pub async fn insert_aggregate(
    pool: &PgPool,
    timestamp: i64,
    data: Vec<IndexerStats>,
) -> anyhow::Result<()> {
    // Propagate serialization failures instead of panicking (was `.unwrap()`).
    let data_json = serde_json::to_value(data)?;
    sqlx::query!(
        "INSERT INTO aggregates (timestamp, data) VALUES ($1, $2) ON CONFLICT (timestamp) DO NOTHING",
        timestamp,
        data_json
    )
    .execute(pool)
    .await?;

    Ok(())
}

/// Fetch all aggregate snapshots newer than `since_timestamp` and flatten
/// them into a single list of per-indexer stats.
///
/// Rows whose JSON payload fails to deserialize contribute no entries
/// (they are skipped rather than aborting the whole query).
///
/// # Errors
/// Returns an error if the database query fails.
pub async fn fetch_aggregates(
    pool: &PgPool,
    since_timestamp: i64,
) -> Result<Vec<IndexerStats>, anyhow::Error> {
    let aggregates = sqlx::query_as!(
        Aggregate,
        "SELECT timestamp, data FROM aggregates WHERE timestamp > $1",
        since_timestamp
    )
    .fetch_all(pool)
    .await?; // sqlx::Error converts into anyhow::Error via `?`

    let results = aggregates
        .into_iter()
        .flat_map(|agg| {
            // Malformed payloads yield an empty list instead of panicking.
            serde_json::from_value::<Vec<IndexerStats>>(agg.data).unwrap_or_default()
        })
        .collect();

    Ok(results)
}

/// Row shape for the `aggregates` table: one JSONB snapshot of
/// `Vec<IndexerStats>` stored per unix-second timestamp.
#[allow(dead_code)]
#[derive(sqlx::FromRow, Debug)]
struct Aggregate {
    // Unix timestamp (seconds); primary key of the `aggregates` table.
    timestamp: i64,
    // Raw JSONB payload; deserialized to Vec<IndexerStats> by `fetch_aggregates`.
    data: serde_json::Value,
}

#[cfg(test)]
mod tests {
use crate::message_types::PublicPoiMessage;
Expand Down
29 changes: 27 additions & 2 deletions src/operator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::anyhow;
use chrono::Utc;
use graphcast_sdk::WakuMessage;
use sqlx::postgres::PgPoolOptions;
use sqlx::{Pool, Postgres};
Expand All @@ -9,11 +10,13 @@ use std::thread::{self, JoinHandle};
use std::time::Duration;
use tokio::runtime::Runtime;
use tokio::time::{interval, sleep, timeout};
use tracing::{debug, info, trace, warn};
use tracing::{debug, error, info, trace, warn};

use graphcast_sdk::graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent};

use crate::db::resolver::{count_messages, prune_old_messages, retain_max_storage};
use crate::db::resolver::{
count_messages, get_indexer_stats, insert_aggregate, prune_old_messages, retain_max_storage,
};
use crate::metrics::{CONNECTED_PEERS, GOSSIP_PEERS, PRUNED_MESSAGES, RECEIVED_MESSAGES};
use crate::{
config::Config,
Expand Down Expand Up @@ -110,6 +113,7 @@ impl RadioOperator {

let mut network_update_interval = interval(Duration::from_secs(600));
let mut summary_interval = interval(Duration::from_secs(180));
let mut daily_aggregate_interval = interval(Duration::from_secs(86400)); // 24 hours

let iteration_timeout = Duration::from_secs(180);
let update_timeout = Duration::from_secs(5);
Expand Down Expand Up @@ -214,6 +218,27 @@ impl RadioOperator {
}
}
},
_ = daily_aggregate_interval.tick() => {
if skip_iteration.load(Ordering::SeqCst) {
skip_iteration.store(false, Ordering::SeqCst);
continue;
}

let pool = &self.db;
let from_timestamp = (Utc::now() - Duration::from_secs(86400)).timestamp();

match get_indexer_stats(pool, None, from_timestamp).await {
Ok(stats) => {
error!("{:?}", stats);
match insert_aggregate(pool, Utc::now().timestamp(), stats).await {
Ok(_) => warn!("Successfully inserted daily aggregate."),
Err(e) => warn!("Failed to insert daily aggregate: {:?}", e),
}
},
Err(e) => warn!("Failed to fetch indexer stats: {:?}", e),
}
},

else => break,
}

Expand Down
82 changes: 78 additions & 4 deletions src/server/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use async_graphql::{Context, EmptySubscription, Object, OutputType, Schema, SimpleObject};

use chrono::Utc;
use serde::{de::DeserializeOwned, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use sqlx::{Pool, Postgres};
use std::{sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use thiserror::Error;
use tracing::error;

use crate::{
config::Config,
db::resolver::{
delete_message_all, delete_message_by_id, get_indexer_stats, list_active_indexers,
list_messages, list_rows, message_by_id, IndexerStats,
delete_message_all, delete_message_by_id, fetch_aggregates, get_indexer_stats,
list_active_indexers, list_messages, list_rows, message_by_id, IndexerStats,
},
operator::radio_types::RadioPayloadMessage,
};
Expand All @@ -35,6 +40,20 @@ impl RadioContext {
}
}

/// GraphQL response type for the indexer-summary query: per-indexer
/// activity rolled up from the stored daily aggregates.
#[derive(Serialize, SimpleObject)]
pub struct Summary {
    // Distinct `graph_account`s observed in the requested window.
    active_indexers: Vec<String>,
    // Total message count per `graph_account`, summed across snapshots.
    total_message_count: HashMap<String, i64>,
    // Mean `subgraphs_count` per `graph_account` across snapshots.
    average_subgraphs_count: HashMap<String, f64>,
}

/// Per-indexer stats shape for (de)serializing aggregate payloads.
/// NOTE(review): not referenced anywhere in the visible portion of this
/// file — confirm it is used elsewhere before relying on it.
#[derive(Serialize, Deserialize, Clone)]
pub struct AggregatedIndexerStats {
    graph_account: String,
    message_count: i64,
    subgraphs_count: i64,
}

// Unified query object for resolvers
#[derive(Default)]
pub struct QueryRoot;
Expand Down Expand Up @@ -127,6 +146,61 @@ impl QueryRoot {
message_by_id(pool, id).await?.get_message();
Ok(msg)
}

async fn get_indexer_summary(
&self,
ctx: &Context<'_>,
days: i32,
) -> Result<Summary, HttpServiceError> {
let pool = ctx.data_unchecked::<Pool<Postgres>>();

let since_timestamp =
(Utc::now() - chrono::Duration::try_days(days.into()).unwrap()).timestamp();
let aggregates = fetch_aggregates(pool, since_timestamp)
.await
.map_err(|e| HttpServiceError::Others(e.into()))?;

let mut active_indexers_set = HashSet::new();
let mut total_message_count: HashMap<String, i64> = HashMap::new();
let mut total_subgraphs_count: HashMap<String, i64> = HashMap::new();

let mut subgraphs_counts = HashMap::new();

for stat in aggregates {
active_indexers_set.insert(stat.graph_account.clone());
*total_message_count
.entry(stat.graph_account.clone())
.or_default() += stat.message_count;
*total_subgraphs_count
.entry(stat.graph_account.clone())
.or_default() += stat.subgraphs_count;
subgraphs_counts
.entry(stat.graph_account.clone())
.or_insert_with(Vec::new)
.push(stat.subgraphs_count);
}

let average_subgraphs_count: HashMap<String, f64> = total_subgraphs_count
.iter()
.map(|(key, &total_count)| {
let count = subgraphs_counts.get(key).map_or(1, |counts| counts.len());
(
key.clone(),
if count > 0 {
total_count as f64 / count as f64
} else {
0.0
},
)
})
.collect();

Ok(Summary {
active_indexers: active_indexers_set.into_iter().collect(),
total_message_count,
average_subgraphs_count,
})
}
}

// Unified query object for resolvers
Expand Down

0 comments on commit ab4dd49

Please sign in to comment.