Skip to content

Commit

Permalink
Fix for scope snapshot diff gaps (#4144)
Browse files Browse the repository at this point in the history
  • Loading branch information
rcaudy committed Jul 7, 2023
1 parent e369e89 commit 8bedc20
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ public class PythonDeephavenSession extends AbstractScriptSession<PythonSnapshot

public static String SCRIPT_TYPE = "Python";

private final PythonScriptSessionModule module;
private final ScriptFinder scriptFinder;
private final PythonEvaluator evaluator;
private final PythonScope<PyObject> scope;
private final PythonScriptSessionModule module;
private final ScriptFinder scriptFinder;

/**
* Create a Python ScriptSession.
Expand All @@ -89,13 +89,12 @@ public PythonDeephavenSession(
scope = pythonEvaluator.getScope();
executionContext.getQueryLibrary().importClass(org.jpy.PyObject.class);
try (final SafeCloseable ignored = executionContext.open()) {
this.module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session")
module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session")
.createProxy(CallableKind.FUNCTION, PythonScriptSessionModule.class);
}
this.scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH);
scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH);

publishInitial();

/*
* And now the user-defined initialization scripts, if any.
*/
Expand All @@ -115,13 +114,16 @@ public PythonDeephavenSession(
public PythonDeephavenSession(
final UpdateGraph updateGraph, final PythonScope<?> scope) {
super(updateGraph, NoOp.INSTANCE, null);

evaluator = null;
this.scope = (PythonScope<PyObject>) scope;
try (final SafeCloseable ignored = executionContext.open()) {
this.module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session")
module = (PythonScriptSessionModule) PyModule.importModule("deephaven.server.script_session")
.createProxy(CallableKind.FUNCTION, PythonScriptSessionModule.class);
}
this.evaluator = null;
this.scriptFinder = null;
scriptFinder = null;

publishInitial();
}

@Override
Expand Down Expand Up @@ -280,24 +282,23 @@ public boolean hasVariableName(String name) {

@Override
public synchronized void setVariable(String name, @Nullable Object newValue) {
try (PythonSnapshot fromSnapshot = takeSnapshot()) {
final PyDictWrapper globals = scope.mainGlobals();
if (newValue == null) {
try {
globals.delItem(name);
} catch (KeyError key) {
// ignore
}
} else {
if (!(newValue instanceof PyObject)) {
newValue = PythonObjectWrapper.wrap(newValue);
}
globals.setItem(name, newValue);
final PyDictWrapper globals = scope.mainGlobals();
if (newValue == null) {
try {
globals.delItem(name);
} catch (KeyError key) {
// ignore
}
try (PythonSnapshot toSnapshot = takeSnapshot()) {
applyDiff(fromSnapshot, toSnapshot, null);
} else {
if (!(newValue instanceof PyObject)) {
newValue = PythonObjectWrapper.wrap(newValue);
}
globals.setItem(name, newValue);
}

// Observe changes from this "setVariable" (potentially capturing previous or concurrent external changes from
// other threads)
observeScopeChanges();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ private static void createOrClearDirectory(final File directory) {
private final ObjectTypeLookup objectTypeLookup;
private final Listener changeListener;

private S lastSnapshot;

protected AbstractScriptSession(
UpdateGraph updateGraph,
ObjectTypeLookup objectTypeLookup,
Expand Down Expand Up @@ -96,31 +98,25 @@ public ExecutionContext getExecutionContext() {
}

protected synchronized void publishInitial() {
try (S empty = emptySnapshot(); S snapshot = takeSnapshot()) {
applyDiff(empty, snapshot, null);
}
}

protected interface Snapshot extends SafeCloseable {

lastSnapshot = emptySnapshot();
observeScopeChanges();
}

@Override
public synchronized SnapshotScope snapshot(@Nullable SnapshotScope previousIfPresent) {
// TODO deephaven-core#2453 this should be redone, along with other scope change handling
if (previousIfPresent != null) {
previousIfPresent.close();
}
S snapshot = takeSnapshot();
return () -> finishSnapshot(snapshot);
public void observeScopeChanges() {
observeAndCollectScopeChanges(null);
}

private synchronized void finishSnapshot(S beforeSnapshot) {
try (beforeSnapshot; S afterSnapshot = takeSnapshot()) {
applyDiff(beforeSnapshot, afterSnapshot, null);
private synchronized Changes observeAndCollectScopeChanges(@Nullable final RuntimeException evaluationError) {
final S beforeSnapshot = lastSnapshot;
lastSnapshot = takeSnapshot();
try (beforeSnapshot) {
return applyDiff(beforeSnapshot, lastSnapshot, evaluationError);
}
}

protected interface Snapshot extends SafeCloseable {
}

protected abstract S emptySnapshot();

Expand All @@ -138,12 +134,14 @@ protected Changes applyDiff(S from, S to, RuntimeException e) {

@Override
public synchronized final Changes evaluateScript(final String script, final @Nullable String scriptName) {
// Observe any external changes that are not yet recorded
observeScopeChanges();

RuntimeException evaluateErr = null;
final Changes diff;
// retain any objects which are created in the executed code, we'll release them when the script session
// closes
try (S fromSnapshot = takeSnapshot();
final SafeCloseable ignored = LivenessScopeStack.open(this, false)) {
try (final SafeCloseable ignored = LivenessScopeStack.open(this, false)) {

try {
// Actually evaluate the script; use the enclosing auth context, since AbstractScriptSession's
Expand All @@ -154,9 +152,8 @@ public synchronized final Changes evaluateScript(final String script, final @Nul
evaluateErr = err;
}

try (S toSnapshot = takeSnapshot()) {
diff = applyDiff(fromSnapshot, toSnapshot, evaluateErr);
}
// Observe changes during this evaluation (potentially capturing external changes from other threads)
diff = observeAndCollectScopeChanges(evaluateErr);
}

return diff;
Expand Down Expand Up @@ -226,18 +223,6 @@ public Changes evaluateScript(Path scriptPath) {
}
}

protected synchronized void notifyVariableChange(String name, @Nullable Object oldValue,
@Nullable Object newValue) {
if (changeListener == null) {
return;
}
Changes changes = new Changes();
applyVariableChangeToDiff(changes, name, oldValue, newValue);
if (!changes.isEmpty()) {
changeListener.onScopeChanges(this, changes);
}
}

@Override
protected void destroy() {
super.destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public ExecutionContext getExecutionContext() {
}

@Override
public SnapshotScope snapshot(@Nullable SnapshotScope previousIfPresent) {
return delegate.snapshot(previousIfPresent);
public void observeScopeChanges() {
delegate.observeScopeChanges();
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,9 +680,11 @@ public boolean hasVariableName(String name) {

@Override
public void setVariable(String name, @Nullable Object newValue) {
final Object oldValue = getVariable(name, null);
groovyShell.getContext().setVariable(NameValidator.validateQueryParameterName(name), newValue);
notifyVariableChange(name, oldValue, newValue);

// Observe changes from this "setVariable" (potentially capturing previous or concurrent external changes from
// other threads)
observeScopeChanges();
}

public Binding getBinding() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ public boolean hasVariableName(String name) {

@Override
public void setVariable(String name, @Nullable Object newValue) {
Object oldValue = getVariable(name, null);
variables.put(name, newValue);
notifyVariableChange(name, oldValue, newValue);
// changeListener is always null for NoLanguageDeephavenSession; we have no mechanism for reporting scope
// changes
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import io.deephaven.engine.liveness.ReleasableLivenessManager;
import io.deephaven.engine.util.scripts.ScriptPathLoader;
import io.deephaven.engine.util.scripts.ScriptPathLoaderState;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -87,26 +86,13 @@ interface Listener {
}

/**
* Tracks changes in the script session bindings until the SnapshotScope is closed.
*
* @return a new SnapshotScope, so that the caller can control when to stop tracking changes to bindings.
* Observe (and report via {@link Listener#onScopeChanges(ScriptSession, Changes) onScopeChanges}) any changes to
* this ScriptSession's {@link QueryScope} that may have been made externally, rather than during
* {@link #evaluateScript script evaluation}.
*
* @apiNote This method should be regarded as an unstable API
*/
default SnapshotScope snapshot() {
return snapshot(null);
}

/**
* Tracks changes in the script session bindings until the SnapshotScope is closed.
*
* This API should be considered unstable, see deephaven-core#2453.
*
* @param previousIfPresent if non-null, will be closed atomically with the new scope being opened.
* @return a new SnapshotScope, so that the caller can control when to stop tracking changes to bindings.
*/
SnapshotScope snapshot(@Nullable SnapshotScope previousIfPresent);

interface SnapshotScope extends SafeCloseable {
}
void observeScopeChanges();

/**
* Evaluates the script and manages liveness of objects that are exported to the user. This method should be called
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
import io.deephaven.server.runner.DeephavenApiServerModule;
import io.deephaven.server.runner.MainHelper;
import io.deephaven.server.session.ObfuscatingErrorTransformerModule;
import io.deephaven.server.util.Scheduler;
import org.jpy.PyModule;
import org.jpy.PyObject;

import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.inject.Singleton;
Expand Down Expand Up @@ -70,8 +68,6 @@ interface Builder extends JettyServerComponent.Builder<Builder, PythonServerComp
@Inject
DeephavenApiServer server;
@Inject
Scheduler scheduler;
@Inject
Provider<ScriptSession> scriptSession;

// // this is a nice idea, but won't work, since this is the same instance that we had to disable via sysprop
Expand Down Expand Up @@ -103,8 +99,6 @@ public EmbeddedServer(String host, Integer port, PyObject dict) throws IOExcepti
.build()
.injectFields(this);

checkGlobals(scriptSession.get(), null);

// We need to open the systemic execution context to permanently install the contexts for this thread.
scriptSession.get().getExecutionContext().open();
}
Expand All @@ -115,27 +109,6 @@ public void start() throws Exception {
Bootstrap.printf("Server started on port %d%n", getPort());
}

private void checkGlobals(ScriptSession scriptSession, @Nullable ScriptSession.SnapshotScope lastSnapshot) {
// TODO deephaven-core#2453 make this more generic, ideally by pushing this in whole or part into script session
ScriptSession.SnapshotScope nextSnapshot;
try {
nextSnapshot = scriptSession.snapshot(lastSnapshot);
} catch (IllegalStateException e) {
if (e.getMessage().startsWith("Expected transition from=")) {
// We are limited in how we can track external changes, and the web IDE has made this change and
// already applied it.
// Take a fresh snapshot right away to continue polling
nextSnapshot = scriptSession.snapshot();
} else {
throw e;
}
}
ScriptSession.SnapshotScope s = nextSnapshot;
scheduler.runAfterDelay(100, () -> {
checkGlobals(scriptSession, s);
});
}

public int getPort() {
return server.server().getPort();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.server.runner;

import io.deephaven.auth.AuthenticationRequestHandler;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.table.impl.OperationInitializationThreadPool;
import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
Expand All @@ -22,6 +23,7 @@
import io.deephaven.server.log.LogInit;
import io.deephaven.server.plugin.PluginRegistration;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.util.Scheduler;
import io.deephaven.uri.resolver.UriResolver;
import io.deephaven.uri.resolver.UriResolvers;
import io.deephaven.uri.resolver.UriResolversInstance;
Expand All @@ -44,9 +46,14 @@
public class DeephavenApiServer {
private static final Logger log = LoggerFactory.getLogger(DeephavenApiServer.class);

private static final long CHECK_SCOPE_CHANGES_INTERVAL_MILLIS =
Configuration.getInstance().getLongForClassWithDefault(
DeephavenApiServer.class, "checkScopeChangesIntervalMillis", 100);

private final GrpcServer server;
private final UpdateGraph ug;
private final LogInit logInit;
private final Scheduler scheduler;
private final Provider<ScriptSession> scriptSessionProvider;
private final PluginRegistration pluginRegistration;
private final ApplicationInjector applicationInjector;
Expand All @@ -61,6 +68,7 @@ public DeephavenApiServer(
final GrpcServer server,
@Named(PeriodicUpdateGraph.DEFAULT_UPDATE_GRAPH_NAME) final UpdateGraph ug,
final LogInit logInit,
final Scheduler scheduler,
final Provider<ScriptSession> scriptSessionProvider,
final PluginRegistration pluginRegistration,
final ApplicationInjector applicationInjector,
Expand All @@ -72,6 +80,7 @@ public DeephavenApiServer(
this.server = server;
this.ug = ug;
this.logInit = logInit;
this.scheduler = scheduler;
this.scriptSessionProvider = scriptSessionProvider;
this.pluginRegistration = pluginRegistration;
this.applicationInjector = applicationInjector;
Expand Down Expand Up @@ -127,7 +136,7 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time
AbstractScriptSession.createScriptCache();

log.info().append("Initializing Script Session...").endl();
scriptSessionProvider.get();
checkScopeChanges(scriptSessionProvider.get());
pluginRegistration.registerAll();

log.info().append("Initializing Execution Context for Main Thread...").endl();
Expand Down Expand Up @@ -167,6 +176,13 @@ public DeephavenApiServer run() throws IOException, ClassNotFoundException, Time
return this;
}

private void checkScopeChanges(ScriptSession scriptSession) {
scriptSession.observeScopeChanges();
scheduler.runAfterDelay(CHECK_SCOPE_CHANGES_INTERVAL_MILLIS, () -> {
checkScopeChanges(scriptSession);
});
}

/**
* Blocks until the server exits.
*
Expand Down

0 comments on commit 8bedc20

Please sign in to comment.