Skip to content

Commit

Permalink
add graph-indexed header (#5710)
Browse files Browse the repository at this point in the history
This adds a `graph-indexed` header to query responses. The header value contains the block hash, number, and timestamp for the most recently processed block in the subgraph. This avoids the need to rewrite all queries to include `_meta { block { hash number timestamp } }` in either the indexer-service or gateway.

Related: edgeandnode/gateway#900, graphprotocol/indexer-rs#494
  • Loading branch information
Theodus authored Nov 25, 2024
1 parent 1e7732c commit 4ff59df
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 9 deletions.
2 changes: 1 addition & 1 deletion graph/src/data/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod trace;
pub use self::cache_status::CacheStatus;
pub use self::error::{QueryError, QueryExecutionError};
pub use self::query::{Query, QueryTarget, QueryVariables};
pub use self::result::{QueryResult, QueryResults};
pub use self::result::{LatestBlockInfo, QueryResult, QueryResults};
pub use self::trace::Trace;
33 changes: 28 additions & 5 deletions graph/src/data/query/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::cheap_clone::CheapClone;
use crate::components::server::query::ServerResponse;
use crate::data::value::Object;
use crate::derive::CacheWeight;
use crate::prelude::{r, CacheWeight, DeploymentHash};
use crate::prelude::{r, BlockHash, BlockNumber, CacheWeight, DeploymentHash};
use http_body_util::Full;
use hyper::header::{
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
Expand Down Expand Up @@ -48,20 +48,37 @@ where
ser.end()
}

fn serialize_block_hash<S>(data: &BlockHash, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&data.to_string())
}

pub type Data = Object;

#[derive(Debug)]
/// A collection of query results that is serialized as a single result.
pub struct QueryResults {
results: Vec<Arc<QueryResult>>,
pub trace: Trace,
pub indexed_block: Option<LatestBlockInfo>,
}

#[derive(Debug, Serialize)]
pub struct LatestBlockInfo {
#[serde(serialize_with = "serialize_block_hash")]
pub hash: BlockHash,
pub number: BlockNumber,
pub timestamp: Option<u64>,
}

impl QueryResults {
pub fn empty(trace: Trace) -> Self {
pub fn empty(trace: Trace, indexed_block: Option<LatestBlockInfo>) -> Self {
QueryResults {
results: Vec::new(),
trace,
indexed_block,
}
}

Expand Down Expand Up @@ -155,6 +172,7 @@ impl From<Data> for QueryResults {
QueryResults {
results: vec![Arc::new(x.into())],
trace: Trace::None,
indexed_block: None,
}
}
}
Expand All @@ -164,6 +182,7 @@ impl From<QueryResult> for QueryResults {
QueryResults {
results: vec![Arc::new(x)],
trace: Trace::None,
indexed_block: None,
}
}
}
Expand All @@ -173,6 +192,7 @@ impl From<Arc<QueryResult>> for QueryResults {
QueryResults {
results: vec![x],
trace: Trace::None,
indexed_block: None,
}
}
}
Expand All @@ -182,6 +202,7 @@ impl From<QueryExecutionError> for QueryResults {
QueryResults {
results: vec![Arc::new(x.into())],
trace: Trace::None,
indexed_block: None,
}
}
}
Expand All @@ -191,6 +212,7 @@ impl From<Vec<QueryExecutionError>> for QueryResults {
QueryResults {
results: vec![Arc::new(x.into())],
trace: Trace::None,
indexed_block: None,
}
}
}
Expand All @@ -205,14 +227,16 @@ impl QueryResults {
pub fn as_http_response(&self) -> ServerResponse {
let json = serde_json::to_string(&self).unwrap();
let attestable = self.results.iter().all(|r| r.is_attestable());
let indexed_block = serde_json::to_string(&self.indexed_block).unwrap();
Response::builder()
.status(200)
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(CONTENT_TYPE, "application/json")
.header(ACCESS_CONTROL_ALLOW_HEADERS, "Content-Type, User-Agent")
.header(ACCESS_CONTROL_ALLOW_METHODS, "GET, OPTIONS, POST")
.header(CONTENT_TYPE, "application/json")
.header("Graph-Attestable", attestable.to_string())
.header("graph-attestable", attestable.to_string())
.header("graph-indexed", indexed_block)
.body(Full::from(json))
.unwrap()
}
Expand Down Expand Up @@ -386,8 +410,7 @@ fn multiple_data_items() {
let obj1 = make_obj("key1", "value1");
let obj2 = make_obj("key2", "value2");

let trace = Trace::None;
let mut res = QueryResults::empty(trace);
let mut res = QueryResults::empty(Trace::None, None);
res.append(obj1, CacheStatus::default());
res.append(obj2, CacheStatus::default());

Expand Down
19 changes: 17 additions & 2 deletions graphql/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use graph::{
};
use graph::{data::graphql::load_manager::LoadManager, prelude::QueryStoreManager};
use graph::{
data::query::{QueryResults, QueryTarget},
data::query::{LatestBlockInfo, QueryResults, QueryTarget},
prelude::QueryStore,
};

Expand Down Expand Up @@ -117,6 +117,20 @@ where
let network = Some(store.network_name().to_string());
let schema = store.api_schema()?;

let latest_block = match store.block_ptr().await.ok().flatten() {
Some(block) => Some(LatestBlockInfo {
timestamp: store
.block_number_with_timestamp_and_parent_hash(&block.hash)
.await
.ok()
.flatten()
.and_then(|(_, t, _)| t),
hash: block.hash,
number: block.number,
}),
None => None,
};

// Test only, see c435c25decbc4ad7bbbadf8e0ced0ff2
#[cfg(debug_assertions)]
let state = INITIAL_DEPLOYMENT_STATE_FOR_TESTS
Expand Down Expand Up @@ -148,7 +162,8 @@ where
let by_block_constraint =
StoreResolver::locate_blocks(store.as_ref(), &state, &query).await?;
let mut max_block = 0;
let mut result: QueryResults = QueryResults::empty(query.root_trace(do_trace));
let mut result: QueryResults =
QueryResults::empty(query.root_trace(do_trace), latest_block);
let mut query_res_futures: Vec<_> = vec![];
let setup_elapsed = execute_start.elapsed();

Expand Down
2 changes: 1 addition & 1 deletion store/test-store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ async fn execute_subgraph_query_internal(
100,
graphql_metrics(),
));
let mut result = QueryResults::empty(query.root_trace(trace));
let mut result = QueryResults::empty(query.root_trace(trace), None);
let deployment = query.schema.id().clone();
let store = STORE
.clone()
Expand Down

0 comments on commit 4ff59df

Please sign in to comment.