Skip to content

Commit

Permalink
Add config option for cache stores (#5716)
Browse files Browse the repository at this point in the history
  • Loading branch information
encalypto authored Nov 25, 2024
1 parent 4ff59df commit 7164866
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 17 deletions.
6 changes: 5 additions & 1 deletion chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,11 @@ where
logger,
);

state.entity_cache.set(key, entity)?;
state.entity_cache.set(
key,
entity,
Some(&mut state.write_capacity_remaining),
)?;
}
ParsedChanges::Delete(entity_key) => {
let entity_type = entity_key.entity_type.cheap_clone();
Expand Down
2 changes: 1 addition & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,7 +1646,7 @@ async fn update_proof_of_indexing(
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
entity_cache.set(key, poi)
entity_cache.set(key, poi, None)
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");
Expand Down
22 changes: 20 additions & 2 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::components::store::{self as s, Entity, EntityOperation};
use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::prelude::ENV_VARS;
use crate::prelude::{CacheWeight, ENV_VARS};
use crate::schema::{EntityKey, InputSchema};
use crate::util::intern::Error as InternError;
use crate::util::lfu_cache::{EvictStats, LfuCache};
Expand Down Expand Up @@ -349,10 +349,28 @@ impl EntityCache {
/// with existing data. The entity will be validated against the
/// subgraph schema, and any errors will result in an `Err` being
/// returned.
pub fn set(&mut self, key: EntityKey, entity: Entity) -> Result<(), anyhow::Error> {
pub fn set(
&mut self,
key: EntityKey,
entity: Entity,
write_capacity_remaining: Option<&mut usize>,
) -> Result<(), anyhow::Error> {
// check the validate for derived fields
let is_valid = entity.validate(&key).is_ok();

if let Some(write_capacity_remaining) = write_capacity_remaining {
let weight = entity.weight();

if !self.current.contains_key(&key) && weight > *write_capacity_remaining {
return Err(anyhow!(
"exceeded block write limit when writing entity `{}`",
key.entity_id,
));
}

*write_capacity_remaining -= weight;
}

self.entity_op(key.clone(), EntityOp::Update(entity));

// The updates we were given are not valid by themselves; force a
Expand Down
8 changes: 7 additions & 1 deletion graph/src/components/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub struct BlockState {
in_handler: bool,

pub metrics: BlockStateMetrics,

pub write_capacity_remaining: usize,
}

impl BlockState {
Expand All @@ -94,6 +96,7 @@ impl BlockState {
processed_data_sources: Vec::new(),
in_handler: false,
metrics: BlockStateMetrics::new(),
write_capacity_remaining: ENV_VARS.block_write_capacity,
}
}
}
Expand All @@ -111,6 +114,7 @@ impl BlockState {
processed_data_sources,
in_handler,
metrics,
write_capacity_remaining,
} = self;

match in_handler {
Expand All @@ -121,7 +125,9 @@ impl BlockState {
entity_cache.extend(other.entity_cache);
processed_data_sources.extend(other.processed_data_sources);
persisted_data_sources.extend(other.persisted_data_sources);
metrics.extend(other.metrics)
metrics.extend(other.metrics);
*write_capacity_remaining =
write_capacity_remaining.saturating_sub(other.write_capacity_remaining);
}

pub fn has_errors(&self) -> bool {
Expand Down
5 changes: 5 additions & 0 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ pub struct EnvVars {
///
/// Defaults to an empty list, which means that this feature is enabled for all chains;
pub firehose_disable_extended_blocks_for_chains: Vec<String>,

pub block_write_capacity: usize,
}

impl EnvVars {
Expand Down Expand Up @@ -327,6 +329,7 @@ impl EnvVars {
Self::firehose_disable_extended_blocks_for_chains(
inner.firehose_disable_extended_blocks_for_chains,
),
block_write_capacity: inner.block_write_capacity.0,
})
}

Expand Down Expand Up @@ -488,6 +491,8 @@ struct Inner {
graphman_server_auth_token: Option<String>,
#[envconfig(from = "GRAPH_NODE_FIREHOSE_DISABLE_EXTENDED_BLOCKS_FOR_CHAINS")]
firehose_disable_extended_blocks_for_chains: Option<String>,
#[envconfig(from = "GRAPH_NODE_BLOCK_WRITE_CAPACITY", default = "4_000_000_000")]
block_write_capacity: NoUnderscores<usize>,
}

#[derive(Clone, Debug)]
Expand Down
4 changes: 3 additions & 1 deletion runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,9 @@ impl HostExports {

state.metrics.track_entity_write(&entity_type, &entity);

state.entity_cache.set(key, entity)?;
state
.entity_cache
.set(key, entity, Some(&mut state.write_capacity_remaining))?;

Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -874,8 +874,7 @@ impl DeploymentStore {
}
}

/// Methods that back the trait `graph::components::Store`, but have small
/// variations in their signatures
/// Methods that back the trait `WritableStore`, but have small variations in their signatures
impl DeploymentStore {
pub(crate) async fn block_ptr(&self, site: Arc<Site>) -> Result<Option<BlockPtr>, StoreError> {
let site = site.cheap_clone();
Expand Down
22 changes: 13 additions & 9 deletions store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,14 @@ fn insert_modifications() {

let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai" };
let mogwai_key = make_band_key("mogwai");
cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap();
cache
.set(mogwai_key.clone(), mogwai_data.clone(), None)
.unwrap();

let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros" };
let sigurros_key = make_band_key("sigurros");
cache
.set(sigurros_key.clone(), sigurros_data.clone())
.set(sigurros_key.clone(), sigurros_data.clone(), None)
.unwrap();

let result = cache.as_modifications(0);
Expand Down Expand Up @@ -253,12 +255,14 @@ fn overwrite_modifications() {

let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 };
let mogwai_key = make_band_key("mogwai");
cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap();
cache
.set(mogwai_key.clone(), mogwai_data.clone(), None)
.unwrap();

let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994 };
let sigurros_key = make_band_key("sigurros");
cache
.set(sigurros_key.clone(), sigurros_data.clone())
.set(sigurros_key.clone(), sigurros_data.clone(), None)
.unwrap();

let result = cache.as_modifications(0);
Expand Down Expand Up @@ -289,12 +293,12 @@ fn consecutive_modifications() {
let update_data =
entity! { SCHEMA => id: "mogwai", founded: 1995, label: "Rock Action Records" };
let update_key = make_band_key("mogwai");
cache.set(update_key, update_data).unwrap();
cache.set(update_key, update_data, None).unwrap();

// Then, just reset the "label".
let update_data = entity! { SCHEMA => id: "mogwai", label: Value::Null };
let update_key = make_band_key("mogwai");
cache.set(update_key.clone(), update_data).unwrap();
cache.set(update_key.clone(), update_data, None).unwrap();

// We expect a single overwrite modification for the above that leaves "id"
// and "name" untouched, sets "founded" and removes the "label" field.
Expand Down Expand Up @@ -715,7 +719,7 @@ fn scoped_get() {
let account5 = ACCOUNT_TYPE.parse_id("5").unwrap();
let wallet5 = create_wallet_entity("5", &account5, 100);
let key5 = WALLET_TYPE.parse_key("5").unwrap();
cache.set(key5.clone(), wallet5.clone()).unwrap();
cache.set(key5.clone(), wallet5.clone(), None).unwrap();

// For the new entity, we can retrieve it with either scope
let act5 = cache.get(&key5, GetScope::InBlock).unwrap();
Expand All @@ -736,7 +740,7 @@ fn scoped_get() {
// But if it gets updated, it becomes visible with either scope
let mut wallet1 = wallet1;
wallet1.set("balance", 70).unwrap();
cache.set(key1.clone(), wallet1.clone()).unwrap();
cache.set(key1.clone(), wallet1.clone(), None).unwrap();
let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref()));
let act1 = cache.get(&key1, GetScope::Store).unwrap();
Expand Down Expand Up @@ -783,6 +787,6 @@ fn no_interface_mods() {

let entity = entity! { LOAD_RELATED_SUBGRAPH => id: "1", balance: 100 };

cache.set(key, entity).unwrap_err();
cache.set(key, entity, None).unwrap_err();
})
}

0 comments on commit 7164866

Please sign in to comment.