diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs index 2b47e4e57b8..ed7016216d5 100644 --- a/chain/substreams/src/trigger.rs +++ b/chain/substreams/src/trigger.rs @@ -225,7 +225,11 @@ where logger, ); - state.entity_cache.set(key, entity)?; + state.entity_cache.set( + key, + entity, + Some(&mut state.write_capacity_remaining), + )?; } ParsedChanges::Delete(entity_key) => { let entity_type = entity_key.entity_type.cheap_clone(); diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index a0269863614..fe1ac3a020b 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -1646,7 +1646,7 @@ async fn update_proof_of_indexing( data.push((entity_cache.schema.poi_block_time(), block_time)); } let poi = entity_cache.make_entity(data)?; - entity_cache.set(key, poi) + entity_cache.set(key, poi, None) } let _section_guard = stopwatch.start_section("update_proof_of_indexing"); diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index dfaae80f76a..a34767d54d2 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -8,7 +8,7 @@ use crate::cheap_clone::CheapClone; use crate::components::store::write::EntityModification; use crate::components::store::{self as s, Entity, EntityOperation}; use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator}; -use crate::prelude::ENV_VARS; +use crate::prelude::{CacheWeight, ENV_VARS}; use crate::schema::{EntityKey, InputSchema}; use crate::util::intern::Error as InternError; use crate::util::lfu_cache::{EvictStats, LfuCache}; @@ -349,10 +349,28 @@ impl EntityCache { /// with existing data. The entity will be validated against the /// subgraph schema, and any errors will result in an `Err` being /// returned. - pub fn set(&mut self, key: EntityKey, entity: Entity) -> Result<(), anyhow::Error> { + pub fn set( + &mut self, + key: EntityKey, + entity: Entity, + write_capacity_remaining: Option<&mut usize>, + ) -> Result<(), anyhow::Error> { // check the validate for derived fields let is_valid = entity.validate(&key).is_ok(); + if let Some(write_capacity_remaining) = write_capacity_remaining { + let weight = entity.weight(); + + if !self.current.contains_key(&key) && weight > *write_capacity_remaining { + return Err(anyhow!( + "exceeded block write limit when writing entity `{}`", + key.entity_id, + )); + } + + *write_capacity_remaining -= weight; + } + self.entity_op(key.clone(), EntityOp::Update(entity)); // The updates we were given are not valid by themselves; force a diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index 470e50334d3..5609b2ac8f4 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -81,6 +81,8 @@ pub struct BlockState { in_handler: bool, pub metrics: BlockStateMetrics, + + pub write_capacity_remaining: usize, } impl BlockState { @@ -94,6 +96,7 @@ impl BlockState { processed_data_sources: Vec::new(), in_handler: false, metrics: BlockStateMetrics::new(), + write_capacity_remaining: ENV_VARS.block_write_capacity, } } } @@ -111,6 +114,7 @@ impl BlockState { processed_data_sources, in_handler, metrics, + write_capacity_remaining, } = self; match in_handler { @@ -121,7 +125,9 @@ impl BlockState { entity_cache.extend(other.entity_cache); processed_data_sources.extend(other.processed_data_sources); persisted_data_sources.extend(other.persisted_data_sources); - metrics.extend(other.metrics) + metrics.extend(other.metrics); + *write_capacity_remaining = + write_capacity_remaining.saturating_sub(other.write_capacity_remaining); } pub fn has_errors(&self) -> bool { diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index f1533afad99..b97e44ef9a1 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -238,6 +238,8 @@ pub struct EnvVars { /// /// Defaults to an empty list, which means that this feature is enabled for all chains; pub firehose_disable_extended_blocks_for_chains: Vec, + + pub block_write_capacity: usize, } impl EnvVars { @@ -327,6 +329,7 @@ impl EnvVars { Self::firehose_disable_extended_blocks_for_chains( inner.firehose_disable_extended_blocks_for_chains, ), + block_write_capacity: inner.block_write_capacity.0, }) } @@ -488,6 +491,8 @@ struct Inner { graphman_server_auth_token: Option, #[envconfig(from = "GRAPH_NODE_FIREHOSE_DISABLE_EXTENDED_BLOCKS_FOR_CHAINS")] firehose_disable_extended_blocks_for_chains: Option, + #[envconfig(from = "GRAPH_NODE_BLOCK_WRITE_CAPACITY", default = "4_000_000_000")] + block_write_capacity: NoUnderscores, } #[derive(Clone, Debug)] diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index 4d050db23de..a793f5b7cc6 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -350,7 +350,9 @@ impl HostExports { state.metrics.track_entity_write(&entity_type, &entity); - state.entity_cache.set(key, entity)?; + state + .entity_cache + .set(key, entity, Some(&mut state.write_capacity_remaining))?; Ok(()) } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index def46ce9244..f5b2825f63f 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -874,8 +874,7 @@ impl DeploymentStore { } } -/// Methods that back the trait `graph::components::Store`, but have small -/// variations in their signatures +/// Methods that back the trait `WritableStore`, but have small variations in their signatures impl DeploymentStore { pub(crate) async fn block_ptr(&self, site: Arc) -> Result, StoreError> { let site = site.cheap_clone(); diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index b90283f6c93..2f41a006172 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -209,12 +209,14 @@ fn insert_modifications() { let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai" }; let mogwai_key = make_band_key("mogwai"); - cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap(); + cache + .set(mogwai_key.clone(), mogwai_data.clone(), None) + .unwrap(); let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros" }; let sigurros_key = make_band_key("sigurros"); cache - .set(sigurros_key.clone(), sigurros_data.clone()) + .set(sigurros_key.clone(), sigurros_data.clone(), None) .unwrap(); let result = cache.as_modifications(0); @@ -253,12 +255,14 @@ fn overwrite_modifications() { let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 }; let mogwai_key = make_band_key("mogwai"); - cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap(); + cache + .set(mogwai_key.clone(), mogwai_data.clone(), None) + .unwrap(); let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994 }; let sigurros_key = make_band_key("sigurros"); cache - .set(sigurros_key.clone(), sigurros_data.clone()) + .set(sigurros_key.clone(), sigurros_data.clone(), None) .unwrap(); let result = cache.as_modifications(0); @@ -289,12 +293,12 @@ fn consecutive_modifications() { let update_data = entity! { SCHEMA => id: "mogwai", founded: 1995, label: "Rock Action Records" }; let update_key = make_band_key("mogwai"); - cache.set(update_key, update_data).unwrap(); + cache.set(update_key, update_data, None).unwrap(); // Then, just reset the "label". let update_data = entity! { SCHEMA => id: "mogwai", label: Value::Null }; let update_key = make_band_key("mogwai"); - cache.set(update_key.clone(), update_data).unwrap(); + cache.set(update_key.clone(), update_data, None).unwrap(); // We expect a single overwrite modification for the above that leaves "id" // and "name" untouched, sets "founded" and removes the "label" field. @@ -715,7 +719,7 @@ fn scoped_get() { let account5 = ACCOUNT_TYPE.parse_id("5").unwrap(); let wallet5 = create_wallet_entity("5", &account5, 100); let key5 = WALLET_TYPE.parse_key("5").unwrap(); - cache.set(key5.clone(), wallet5.clone()).unwrap(); + cache.set(key5.clone(), wallet5.clone(), None).unwrap(); // For the new entity, we can retrieve it with either scope let act5 = cache.get(&key5, GetScope::InBlock).unwrap(); @@ -736,7 +740,7 @@ fn scoped_get() { // But if it gets updated, it becomes visible with either scope let mut wallet1 = wallet1; wallet1.set("balance", 70).unwrap(); - cache.set(key1.clone(), wallet1.clone()).unwrap(); + cache.set(key1.clone(), wallet1.clone(), None).unwrap(); let act1 = cache.get(&key1, GetScope::InBlock).unwrap(); assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref())); let act1 = cache.get(&key1, GetScope::Store).unwrap(); @@ -783,6 +787,6 @@ fn no_interface_mods() { let entity = entity! { LOAD_RELATED_SUBGRAPH => id: "1", balance: 100 }; - cache.set(key, entity).unwrap_err(); + cache.set(key, entity, None).unwrap_err(); }) }