diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java index c9992b984d1..9392a00bec3 100644 --- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java +++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/UnionSourceManager.java @@ -4,6 +4,7 @@ package io.deephaven.engine.table.impl.sources; import io.deephaven.base.verify.Assert; +import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.rowset.*; import io.deephaven.engine.rowset.RowSetFactory; import io.deephaven.engine.table.*; @@ -61,6 +62,7 @@ public class UnionSourceManager { private final MergedListener mergedListener; private final ConstituentChangesListenerRecorder constituentChangesListener; private final UpdateCommitter updateCommitter; + private final ExecutionContext executionContext; public UnionSourceManager(@NotNull final PartitionedTable partitionedTable) { constituentChangesPermitted = partitionedTable.constituentChangesPermitted(); @@ -99,11 +101,17 @@ public UnionSourceManager(@NotNull final PartitionedTable partitionedTable) { updateCommitter = new UpdateCommitter<>(this, partitionedTable.table().getUpdateGraph(), usm -> usm.unionRedirection.copyCurrToPrev()); + + executionContext = ExecutionContext.newBuilder() + .markSystemic() + .captureUpdateGraph() + .build(); } else { listenerRecorders = null; mergedListener = null; constituentChangesListener = null; updateCommitter = null; + executionContext = null; } try (final Stream initialConstituents = currConstituents()) { @@ -244,7 +252,8 @@ private MergedUnionListener( protected void process() { final TableUpdate constituentChanges = getAndCheckConstituentChanges(); final TableUpdate downstream; - try (final ChangeProcessingContext context = new ChangeProcessingContext(constituentChanges)) { + try (final SafeCloseable ignored = executionContext.open(); + final ChangeProcessingContext context = new ChangeProcessingContext(constituentChanges)) { downstream = context.processChanges(); } catch (Throwable ex) { propagateError(false, ex, entry);