Skip to content

Commit

Permalink
core: do not allow multiple active runners for a subgraph (#5715)
Browse files Browse the repository at this point in the history
  • Loading branch information
isum authored Nov 25, 2024
1 parent 6b48bfd commit 1e7732c
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 12 deletions.
4 changes: 4 additions & 0 deletions core/src/subgraph/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ impl SubgraphKeepAlive {
self.sg_metrics.running_count.inc();
}
}

/// Reports whether `deployment_id` currently has an entry in `alive_map`.
///
/// Takes the read side of the lock around `alive_map` only for the duration of the lookup.
///
/// # Panics
///
/// Panics if acquiring the read lock fails (the `unwrap()` on `read()`), which presumably
/// means the lock was poisoned by a panicking writer.
pub fn contains(&self, deployment_id: &DeploymentId) -> bool {
    let alive = self.alive_map.read().unwrap();
    alive.contains_key(deployment_id)
}
}

// The context keeps track of mutable in-memory state that is retained across blocks.
Expand Down
38 changes: 32 additions & 6 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;

use crate::polling_monitor::{ArweaveService, IpfsService};
use crate::subgraph::context::{IndexingContext, SubgraphKeepAlive};
use crate::subgraph::inputs::IndexingInputs;
Expand All @@ -22,6 +25,7 @@ use tokio::task;

use super::context::OffchainMonitor;
use super::SubgraphTriggerProcessor;
use crate::subgraph::runner::SubgraphRunnerError;

#[derive(Clone)]
pub struct SubgraphInstanceManager<S: SubgraphStore> {
Expand All @@ -35,6 +39,18 @@ pub struct SubgraphInstanceManager<S: SubgraphStore> {
arweave_service: ArweaveService,
static_filters: bool,
env_vars: Arc<EnvVars>,

/// By design, there should be only one subgraph runner process per subgraph, but the current
/// implementation does not completely prevent multiple runners from being active at the same
/// time, and we have already had a [bug][0] due to this limitation. Investigating that bug
/// was quite complicated: all the logs looked the same, so there was no way to know that they
/// were coming from two different runners. Ideally, the implementation should be refactored
/// to make it stricter, but until then, we keep this counter. It is incremented each time a
/// new runner is started, and the value it had before the increment is embedded in each log
/// of the started runner as `runner_index`, to make debugging future issues easier.
///
/// [0]: https://github.com/graphprotocol/graph-node/issues/5452
subgraph_start_counter: Arc<AtomicU64>,
}

#[async_trait]
Expand All @@ -45,7 +61,11 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
) {
let runner_index = self.subgraph_start_counter.fetch_add(1, Ordering::SeqCst);

let logger = self.logger_factory.subgraph_logger(&loc);
let logger = logger.new(o!("runner_index" => runner_index));

let err_logger = logger.clone();
let instance_manager = self.cheap_clone();

Expand Down Expand Up @@ -185,6 +205,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
static_filters,
env_vars,
arweave_service,
subgraph_start_counter: Arc::new(AtomicU64::new(0)),
}
}

Expand Down Expand Up @@ -491,13 +512,18 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
// it has a dedicated OS thread so the OS will handle the preemption. See
// https://github.com/tokio-rs/tokio/issues/3493.
graph::spawn_thread(deployment.to_string(), move || {
if let Err(e) = graph::block_on(task::unconstrained(runner.run())) {
error!(
&logger,
"Subgraph instance failed to run: {}",
format!("{:#}", e)
);
match graph::block_on(task::unconstrained(runner.run())) {
Ok(()) => {}
Err(SubgraphRunnerError::Duplicate) => {
// We do not need to unregister metrics because they are unique per subgraph
// and another runner is still active.
return;
}
Err(err) => {
error!(&logger, "Subgraph instance failed to run: {:#}", err);
}
}

subgraph_metrics_unregister.unregister(registry);
});

Expand Down
49 changes: 44 additions & 5 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ where
pub metrics: RunnerMetrics,
}

/// Reasons for which a subgraph runner's `run` / `run_inner` can terminate with an error.
#[derive(Debug, thiserror::Error)]
pub enum SubgraphRunnerError {
/// The runner found its block stream canceled while its deployment was still present in
/// the set of live instances, i.e. another (newer) runner for the same deployment appears
/// to be active. The instance manager treats this as a quiet exit: it logs no error and
/// does not unregister the subgraph metrics, since the other runner still uses them.
#[error("subgraph runner terminated because a newer one was active")]
Duplicate,

/// Any other failure. `transparent` forwards `Display` and `source()` to the wrapped
/// error, and `#[from]` generates `From<Error>` so that `?` can lift a plain `Error`.
#[error(transparent)]
Unknown(#[from] Error),
}

impl<C, T> SubgraphRunner<C, T>
where
C: Blockchain,
Expand Down Expand Up @@ -109,7 +118,7 @@ where

#[cfg(debug_assertions)]
pub async fn run_for_test(self, break_on_restart: bool) -> Result<Self, Error> {
self.run_inner(break_on_restart).await
self.run_inner(break_on_restart).await.map_err(Into::into)
}

fn is_static_filters_enabled(&self) -> bool {
Expand Down Expand Up @@ -166,11 +175,11 @@ where
self.build_filter()
}

pub async fn run(self) -> Result<(), Error> {
pub async fn run(self) -> Result<(), SubgraphRunnerError> {
self.run_inner(false).await.map(|_| ())
}

async fn run_inner(mut self, break_on_restart: bool) -> Result<Self, Error> {
async fn run_inner(mut self, break_on_restart: bool) -> Result<Self, SubgraphRunnerError> {
// If a subgraph failed for deterministic reasons, before start indexing, we first
// revert the deployment head. It should lead to the same result since the error was
// deterministic.
Expand Down Expand Up @@ -246,15 +255,39 @@ where
// TODO: move cancel handle to the Context
// This will require some code refactor in how the BlockStream is created
let block_start = Instant::now();
match self

let action = self
.handle_stream_event(event, &block_stream_cancel_handle)
.await
.map(|res| {
self.metrics
.subgraph
.observe_block_processed(block_start.elapsed(), res.block_finished());
res
})? {
})?;

// It is possible that the subgraph was unassigned, but the runner was in
// a retry delay state and did not observe the cancel signal.
if block_stream_cancel_handle.is_canceled() {
// It is also possible that the runner was in a retry delay state while
// the subgraph was reassigned and a new runner was started.
if self.ctx.instances.contains(&self.inputs.deployment.id) {
warn!(
self.logger,
"Terminating the subgraph runner because a newer one is active. \
Possible reassignment detected while the runner was in a non-cancellable pending state",
);
return Err(SubgraphRunnerError::Duplicate);
}

warn!(
self.logger,
"Terminating the subgraph runner because subgraph was unassigned",
);
return Ok(self);
}

match action {
Action::Continue => continue,
Action::Stop => {
info!(self.logger, "Stopping subgraph");
Expand Down Expand Up @@ -1579,6 +1612,12 @@ where
}
}

impl From<StoreError> for SubgraphRunnerError {
fn from(err: StoreError) -> Self {
Self::Unknown(err.into())
}
}

/// Transform the proof of indexing changes into entity updates that will be
/// inserted when as_modifications is called.
async fn update_proof_of_indexing(
Expand Down
3 changes: 2 additions & 1 deletion store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use graph::prelude::{
SubgraphStore as _, BLOCK_NUMBER_MAX,
};
use graph::schema::{EntityKey, EntityType, InputSchema};
use graph::slog::{info, warn};
use graph::slog::{debug, info, warn};
use graph::tokio::select;
use graph::tokio::sync::Notify;
use graph::tokio::task::JoinHandle;
Expand Down Expand Up @@ -936,6 +936,7 @@ impl Queue {
// Graceful shutdown. We also handled the request
// successfully
queue.queue.pop().await;
debug!(logger, "Subgraph writer has processed a stop request");
return;
}
Ok(Err(e)) => {
Expand Down

0 comments on commit 1e7732c

Please sign in to comment.