diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java index 0fdb179c6ce..2ecd4a015dd 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java @@ -62,10 +62,10 @@ public class PythonDeephavenSession extends AbstractScriptSession scope; + private final PythonScriptSessionModule module; + private final ScriptFinder scriptFinder; /** * Create a Python ScriptSession. @@ -89,13 +89,12 @@ public PythonDeephavenSession( scope = pythonEvaluator.getScope(); executionContext.getQueryLibrary().importClass(org.jpy.PyObject.class); try (final SafeCloseable ignored = executionContext.open()) { - this.module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session") + module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session") .createProxy(CallableKind.FUNCTION, PythonScriptSessionModule.class); } - this.scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH); + scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH); publishInitial(); - /* * And now the user-defined initialization scripts, if any. */ @@ -115,13 +114,16 @@ public PythonDeephavenSession( public PythonDeephavenSession( final UpdateGraph updateGraph, final PythonScope scope) { super(updateGraph, NoOp.INSTANCE, null); + + evaluator = null; this.scope = (PythonScope) scope; try (final SafeCloseable ignored = executionContext.open()) { - this.module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session") + module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session") .createProxy(CallableKind.FUNCTION, PythonScriptSessionModule.class); } - this.evaluator = null; - this.scriptFinder = null; + scriptFinder = null; + + publishInitial(); } @Override @@ -280,24 +282,23 @@ public boolean hasVariableName(String name) { @Override public synchronized void setVariable(String name, @Nullable Object newValue) { - try (PythonSnapshot fromSnapshot = takeSnapshot()) { - final PyDictWrapper globals = scope.mainGlobals(); - if (newValue == null) { - try { - globals.delItem(name); - } catch (KeyError key) { - // ignore - } - } else { - if (!(newValue instanceof PyObject)) { - newValue = PythonObjectWrapper.wrap(newValue); - } - globals.setItem(name, newValue); + final PyDictWrapper globals = scope.mainGlobals(); + if (newValue == null) { + try { + globals.delItem(name); + } catch (KeyError key) { + // ignore } - try (PythonSnapshot toSnapshot = takeSnapshot()) { - applyDiff(fromSnapshot, toSnapshot, null); + } else { + if (!(newValue instanceof PyObject)) { + newValue = PythonObjectWrapper.wrap(newValue); } + globals.setItem(name, newValue); } + + // Observe changes from this "setVariable" (potentially capturing previous or concurrent external changes from + // other threads) + observeScopeChanges(); } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java b/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java index eaa6a3b8cad..c5ba344e25b 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/AbstractScriptSession.java @@ -66,6 +66,8 @@ private static void createOrClearDirectory(final File directory) { private final ObjectTypeLookup objectTypeLookup; private final Listener changeListener; + private S lastSnapshot; + protected AbstractScriptSession( UpdateGraph updateGraph, ObjectTypeLookup objectTypeLookup, @@ -96,31 +98,25 @@ public ExecutionContext getExecutionContext() { } protected synchronized void publishInitial() { - try (S empty = emptySnapshot(); S snapshot = takeSnapshot()) { - applyDiff(empty, snapshot, null); - } - } - - protected interface Snapshot extends SafeCloseable { - + lastSnapshot = emptySnapshot(); + observeScopeChanges(); } @Override - public synchronized SnapshotScope snapshot(@Nullable SnapshotScope previousIfPresent) { - // TODO deephaven-core#2453 this should be redone, along with other scope change handling - if (previousIfPresent != null) { - previousIfPresent.close(); - } - S snapshot = takeSnapshot(); - return () -> finishSnapshot(snapshot); + public void observeScopeChanges() { + observeAndCollectScopeChanges(null); } - private synchronized void finishSnapshot(S beforeSnapshot) { - try (beforeSnapshot; S afterSnapshot = takeSnapshot()) { - applyDiff(beforeSnapshot, afterSnapshot, null); + private synchronized Changes observeAndCollectScopeChanges(@Nullable final RuntimeException evaluationError) { + final S beforeSnapshot = lastSnapshot; + lastSnapshot = takeSnapshot(); + try (beforeSnapshot) { + return applyDiff(beforeSnapshot, lastSnapshot, evaluationError); } } + protected interface Snapshot extends SafeCloseable { + } protected abstract S emptySnapshot(); @@ -138,12 +134,14 @@ protected Changes applyDiff(S from, S to, RuntimeException e) { @Override public synchronized final Changes evaluateScript(final String script, final @Nullable String scriptName) { + // Observe any external changes that are not yet recorded + observeScopeChanges(); + RuntimeException evaluateErr = null; final Changes diff; // retain any objects which are created in the executed code, we'll release them when the script session // closes - try (S fromSnapshot = takeSnapshot(); - final SafeCloseable ignored = LivenessScopeStack.open(this, false)) { + try (final SafeCloseable ignored = LivenessScopeStack.open(this, false)) { try { // Actually evaluate the script; use the enclosing auth context, since AbstractScriptSession's @@ -154,9 +152,8 @@ public synchronized final Changes evaluateScript(final String script, final @Nul evaluateErr = err; } - try (S toSnapshot = takeSnapshot()) { - diff = applyDiff(fromSnapshot, toSnapshot, evaluateErr); - } + // Observe changes during this evaluation (potentially capturing external changes from other threads) + diff = observeAndCollectScopeChanges(evaluateErr); } return diff; @@ -226,18 +223,6 @@ public Changes evaluateScript(Path scriptPath) { } } - protected synchronized void notifyVariableChange(String name, @Nullable Object oldValue, - @Nullable Object newValue) { - if (changeListener == null) { - return; - } - Changes changes = new Changes(); - applyVariableChangeToDiff(changes, name, oldValue, newValue); - if (!changes.isEmpty()) { - changeListener.onScopeChanges(this, changes); - } - } - @Override protected void destroy() { super.destroy(); diff --git a/engine/table/src/main/java/io/deephaven/engine/util/DelegatingScriptSession.java b/engine/table/src/main/java/io/deephaven/engine/util/DelegatingScriptSession.java index 5a1c41c097f..f739a916a3f 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/DelegatingScriptSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/DelegatingScriptSession.java @@ -62,8 +62,8 @@ public ExecutionContext getExecutionContext() { } @Override - public SnapshotScope snapshot(@Nullable SnapshotScope previousIfPresent) { - return delegate.snapshot(previousIfPresent); + public void observeScopeChanges() { + delegate.observeScopeChanges(); } @NotNull diff --git a/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java b/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java index edb39bf3059..7e1dfdf925a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/GroovyDeephavenSession.java @@ -680,9 +680,11 @@ public boolean hasVariableName(String name) { @Override public void setVariable(String name, @Nullable Object newValue) { - final Object oldValue = getVariable(name, null); groovyShell.getContext().setVariable(NameValidator.validateQueryParameterName(name), newValue); - notifyVariableChange(name, oldValue, newValue); + + // Observe changes from this "setVariable" (potentially capturing previous or concurrent external changes from + // other threads) + observeScopeChanges(); } public Binding getBinding() { diff --git a/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java b/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java index 567d165dff9..850aed19068 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/NoLanguageDeephavenSession.java @@ -102,9 +102,9 @@ public boolean hasVariableName(String name) { @Override public void setVariable(String name, @Nullable Object newValue) { - Object oldValue = getVariable(name, null); variables.put(name, newValue); - notifyVariableChange(name, oldValue, newValue); + // changeListener is always null for NoLanguageDeephavenSession; we have no mechanism for reporting scope + // changes } @Override diff --git a/engine/table/src/main/java/io/deephaven/engine/util/ScriptSession.java b/engine/table/src/main/java/io/deephaven/engine/util/ScriptSession.java index d25dd99add6..d4dd02d1dc2 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/ScriptSession.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/ScriptSession.java @@ -9,7 +9,6 @@ import io.deephaven.engine.liveness.ReleasableLivenessManager; import io.deephaven.engine.util.scripts.ScriptPathLoader; import io.deephaven.engine.util.scripts.ScriptPathLoaderState; -import io.deephaven.util.SafeCloseable; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -87,26 +86,13 @@ interface Listener { } /** - * Tracks changes in the script session bindings until the SnapshotScope is closed. - * - * @return a new SnapshotScope, so that the caller can control when to stop tracking changes to bindings. + * Observe (and report via {@link Listener#onScopeChanges(ScriptSession, Changes) onScopeChanges}) any changes to + * this ScriptSession's {@link QueryScope} that may have been made externally, rather than during + * {@link #evaluateScript script evaluation}. + * + * @apiNote This method should be regarded as an unstable API */ - default SnapshotScope snapshot() { - return snapshot(null); - } - - /** - * Tracks changes in the script session bindings until the SnapshotScope is closed. - * - * This API should be considered unstable, see deephaven-core#2453. - * - * @param previousIfPresent if non-null, will be closed atomically with the new scope being opened. - * @return a new SnapshotScope, so that the caller can control when to stop tracking changes to bindings. - */ - SnapshotScope snapshot(@Nullable SnapshotScope previousIfPresent); - - interface SnapshotScope extends SafeCloseable { - } + void observeScopeChanges(); /** * Evaluates the script and manages liveness of objects that are exported to the user. This method should be called diff --git a/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java b/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java index c9aeda8f17d..467ecdb15ba 100644 --- a/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java +++ b/py/embedded-server/java-runtime/src/main/java/io/deephaven/python/server/EmbeddedServer.java @@ -28,11 +28,9 @@ import io.deephaven.server.runner.DeephavenApiServerModule; import io.deephaven.server.runner.MainHelper; import io.deephaven.server.session.ObfuscatingErrorTransformerModule; -import io.deephaven.server.util.Scheduler; import org.jpy.PyModule; import org.jpy.PyObject; -import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Provider; import javax.inject.Singleton; @@ -70,8 +68,6 @@ interface Builder extends JettyServerComponent.Builder scriptSession; // // this is a nice idea, but won't work, since this is the same instance that we had to disable via sysprop @@ -103,8 +99,6 @@ public EmbeddedServer(String host, Integer port, PyObject dict) throws IOExcepti .build() .injectFields(this); - checkGlobals(scriptSession.get(), null); - // We need to open the systemic execution context to permanently install the contexts for this thread. scriptSession.get().getExecutionContext().open(); } @@ -115,27 +109,6 @@ public void start() throws Exception { Bootstrap.printf("Server started on port %d%n", getPort()); } - private void checkGlobals(ScriptSession scriptSession, @Nullable ScriptSession.SnapshotScope lastSnapshot) { - // TODO deephaven-core#2453 make this more generic, ideally by pushing this in whole or part into script session - ScriptSession.SnapshotScope nextSnapshot; - try { - nextSnapshot = scriptSession.snapshot(lastSnapshot); - } catch (IllegalStateException e) { - if (e.getMessage().startsWith("Expected transition from=")) { - // We are limited in how we can track external changes, and the web IDE has made this change and - // already applied it. - // Take a fresh snapshot right away to continue polling - nextSnapshot = scriptSession.snapshot(); - } else { - throw e; - } - } - ScriptSession.SnapshotScope s = nextSnapshot; - scheduler.runAfterDelay(100, () -> { - checkGlobals(scriptSession, s); - }); - } - public int getPort() { return server.server().getPort(); } diff --git a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java index e0aae73137f..7c12f21dcc1 100644 --- a/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java +++ b/server/src/main/java/io/deephaven/server/runner/DeephavenApiServer.java @@ -4,6 +4,7 @@ package io.deephaven.server.runner; import io.deephaven.auth.AuthenticationRequestHandler; +import io.deephaven.configuration.Configuration; import io.deephaven.engine.context.ExecutionContext; import io.deephaven.engine.table.impl.OperationInitializationThreadPool; import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder; @@ -22,6 +23,7 @@ import io.deephaven.server.log.LogInit; import io.deephaven.server.plugin.PluginRegistration; import io.deephaven.server.session.SessionService; +import io.deephaven.server.util.Scheduler; import io.deephaven.uri.resolver.UriResolver; import io.deephaven.uri.resolver.UriResolvers; import io.deephaven.uri.resolver.UriResolversInstance; @@ -44,9 +46,14 @@ public class DeephavenApiServer { private static final Logger log = LoggerFactory.getLogger(DeephavenApiServer.class); + private static final long CHECK_SCOPE_CHANGES_INTERVAL_MILLIS = + Configuration.getInstance().getLongForClassWithDefault( + DeephavenApiServer.class, "checkScopeChangesIntervalMillis", 100); + private final GrpcServer server; private final UpdateGraph ug; private final LogInit logInit; + private final Scheduler scheduler; private final Provider scriptSessionProvider; private final PluginRegistration pluginRegistration; private final ApplicationInjector applicationInjector; @@ -61,6 +68,7 @@ public DeephavenApiServer( final GrpcServer server, @Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph ug, final LogInit logInit, + final Scheduler scheduler, final Provider scriptSessionProvider, final PluginRegistration pluginRegistration, final ApplicationInjector applicationInjector, @@ -72,6 +80,7 @@ public DeephavenApiServer( this.server = server; this.ug = ug; this.logInit = logInit; + this.scheduler = scheduler; this.scriptSessionProvider = scriptSessionProvider; this.pluginRegistration = pluginRegistration; this.applicationInjector = applicationInjector; @@ -127,7 +136,7 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time AbstractScriptSession.createScriptCache(); log.info().append("Initializing Script Session...").endl(); - scriptSessionProvider.get(); + checkScopeChanges(scriptSessionProvider.get()); pluginRegistration.registerAll(); log.info().append("Initializing Execution Context for Main Thread...").endl(); @@ -167,6 +176,13 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time return this; } + private void checkScopeChanges(ScriptSession scriptSession) { + scriptSession.observeScopeChanges(); + scheduler.runAfterDelay(CHECK_SCOPE_CHANGES_INTERVAL_MILLIS, () -> { + checkScopeChanges(scriptSession); + }); + } + /** * Blocks until the server exits. *