Skip to content

Commit

Permalink
Extract variable api from ScriptSession, let ScriptSession guard reads (
Browse files Browse the repository at this point in the history
deephaven#4970)

This patch moves from assuming that the ScriptSession owns the
ExecContext for locking (with UpdateGraph) and variable access (with
QueryScope).

Previously, the groovy script session used a lock to interact with its
query scope, but python did not under the incorrect assumption that the
GIL would guard those reads. Instead, ScriptSession implementations
ensure that reads will be safe.

Partial deephaven#4040
  • Loading branch information
niloc132 authored Jan 22, 2024
1 parent fd429ff commit d3dae15
Show file tree
Hide file tree
Showing 34 changed files with 711 additions and 958 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import io.deephaven.plugin.type.ObjectTypeLookup.NoOp;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.annotations.ScriptApi;
import io.deephaven.util.annotations.VisibleForTesting;
import io.deephaven.util.thread.NamingThreadFactory;
import io.deephaven.util.thread.ThreadInitializationFactory;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jpy.KeyError;
import org.jpy.PyDictWrapper;
import org.jpy.PyInputMode;
import org.jpy.PyLib;
import org.jpy.PyLib.CallableKind;
import org.jpy.PyModule;
import org.jpy.PyObject;
Expand All @@ -41,14 +41,13 @@
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -150,13 +149,6 @@ public Thread newThread(@NotNull Runnable r) {
}
}

@Override
@VisibleForTesting
public QueryScope newQueryScope() {
// depend on the GIL instead of local synchronization
return new UnsynchronizedScriptSessionQueryScope(this);
}

/**
* Finds the specified script; and runs it as a file, or if it is a stream writes it to a temporary file in order to
* run it.
Expand All @@ -183,20 +175,14 @@ private void runScript(String script) throws IOException {
}
}

@NotNull
@SuppressWarnings("unchecked")
@Override
public Object getVariable(String name) throws QueryScope.MissingVariableException {
return scope
protected <T> T getVariable(String name) throws QueryScope.MissingVariableException {
return (T) scope
.getValue(name)
.orElseThrow(() -> new QueryScope.MissingVariableException("No variable for: " + name));
.orElseThrow(() -> new QueryScope.MissingVariableException("Missing variable " + name));
}

@Override
public <T> T getVariable(String name, T defaultValue) {
return scope
.<T>getValueUnchecked(name)
.orElse(defaultValue);
}

@SuppressWarnings("unused")
@ScriptApi
Expand Down Expand Up @@ -224,13 +210,6 @@ protected void evaluate(String command, String scriptName) {
}
}

@Override
public Map<String, Object> getVariables() {
final Map<String, Object> outMap = new LinkedHashMap<>();
scope.getEntriesMap().forEach((key, value) -> outMap.put(key, maybeUnwrap(value)));
return outMap;
}

protected static class PythonSnapshot implements Snapshot, SafeCloseable {

private final PyDictWrapper dict;
Expand Down Expand Up @@ -270,59 +249,62 @@ protected Changes createDiff(PythonSnapshot from, PythonSnapshot to, RuntimeExce
final String name = change.call(String.class, "__getitem__", int.class, 0);
final PyObject fromValue = change.call(PyObject.class, "__getitem__", int.class, 1);
final PyObject toValue = change.call(PyObject.class, "__getitem__", int.class, 2);
applyVariableChangeToDiff(diff, name, maybeUnwrap(fromValue), maybeUnwrap(toValue));
applyVariableChangeToDiff(diff, name, unwrapObject(fromValue), unwrapObject(toValue));
}
return diff;
}
}

private Object maybeUnwrap(Object o) {
if (o instanceof PyObject) {
return maybeUnwrap((PyObject) o);
}
return o;
}

private Object maybeUnwrap(PyObject o) {
if (o == null) {
return null;
}
final Object javaObject = module.javaify(o);
if (javaObject != null) {
return javaObject;
}
return o;
}

@Override
public Set<String> getVariableNames() {
return Collections.unmodifiableSet(scope.getKeys().collect(Collectors.toSet()));
protected Set<String> getVariableNames(Predicate<String> allowName) {
return PyLib.ensureGil(() -> scope.getKeys().filter(allowName).collect(Collectors.toUnmodifiableSet()));
}

@Override
public boolean hasVariableName(String name) {
protected boolean hasVariable(String name) {
return scope.containsKey(name);
}

@Override
public synchronized void setVariable(String name, @Nullable Object newValue) {
final PyDictWrapper globals = scope.mainGlobals();
if (newValue == null) {
try {
globals.delItem(name);
} catch (KeyError key) {
// ignore
}
} else {
if (!(newValue instanceof PyObject)) {
newValue = PythonObjectWrapper.wrap(newValue);
protected synchronized Object setVariable(String name, @Nullable Object newValue) {
Object old = PyLib.ensureGil(() -> {
final PyDictWrapper globals = scope.mainGlobals();

if (newValue == null) {
try {
return globals.unwrap().callMethod("pop", name);
} catch (KeyError key) {
return null;
}
} else {
Object wrapped;
if (newValue instanceof PyObject) {
wrapped = newValue;
} else {
wrapped = PythonObjectWrapper.wrap(newValue);
}
// This isn't thread safe, we're relying on the GIL being kind to us (as we have historically done).
// There is no built-in for "replace a variable and return the old one".
Object prev = globals.get(name);
globals.setItem(name, wrapped);
return prev;
}
globals.setItem(name, newValue);
}
});

// Observe changes from this "setVariable" (potentially capturing previous or concurrent external changes from
// other threads)
observeScopeChanges();

// This doesn't return the same Java instance of PyObject, so we won't decref it properly, but
// again, that is consistent with how we've historically treated these references.
return old;
}

@Override
protected Map<String, Object> getAllValues() {
return PyLib
.ensureGil(() -> scope.getEntries()
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@
*/
package io.deephaven.engine.context;

import io.deephaven.engine.liveness.LivenessReferent;
import org.jetbrains.annotations.NotNull;

import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

public class EmptyQueryScope extends QueryScope {
public class EmptyQueryScope implements QueryScope {
public final static EmptyQueryScope INSTANCE = new EmptyQueryScope();

private EmptyQueryScope() {}
Expand All @@ -22,7 +28,7 @@ public boolean hasParamName(String name) {
}

@Override
protected <T> QueryScopeParam<T> createParam(String name) throws MissingVariableException {
public <T> QueryScopeParam<T> createParam(String name) throws MissingVariableException {
throw new MissingVariableException("Missing variable " + name);
}

Expand All @@ -42,7 +48,37 @@ public <T> void putParam(String name, T value) {
}

@Override
public void putObjectFields(Object object) {
throw new IllegalStateException("EmptyQueryScope cannot create parameters");
public Map<String, Object> toMap() {
return Collections.emptyMap();
}

@Override
public boolean tryManage(@NotNull LivenessReferent referent) {
throw new UnsupportedOperationException("tryManage");
}

@Override
public boolean tryUnmanage(@NotNull LivenessReferent referent) {
throw new UnsupportedOperationException("tryUnmanage");
}

@Override
public boolean tryUnmanage(@NotNull Stream<? extends LivenessReferent> referents) {
throw new UnsupportedOperationException("tryUnmanage");
}

@Override
public boolean tryRetainReference() {
throw new UnsupportedOperationException("tryRetainReference");
}

@Override
public void dropReference() {
throw new UnsupportedOperationException("dropReference");
}

@Override
public WeakReference<? extends LivenessReferent> getWeakReference() {
throw new UnsupportedOperationException("getWeakReference");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,22 @@ public ExecutionContext withOperationInitializer(final OperationInitializer oper
operationInitializer);
}

/**
* Returns, or creates, an execution context with the given value for {@code queryScope} and existing values for the
* other members.
*
* @param queryScope the query scope to use instead
* @return the execution context
*/
public ExecutionContext withQueryScope(QueryScope queryScope) {
if (queryScope == this.queryScope) {
return this;
}
return new ExecutionContext(isSystemic, authContext, queryLibrary, queryScope, queryCompiler, updateGraph,
operationInitializer);
}


/**
* Execute runnable within this execution context.
*/
Expand Down Expand Up @@ -333,7 +349,7 @@ public Builder emptyQueryScope() {
*/
@ScriptApi
public Builder newQueryScope() {
this.queryScope = new QueryScope.StandaloneImpl();
this.queryScope = new StandaloneQueryScope();
return this;
}

Expand Down Expand Up @@ -371,7 +387,7 @@ public Builder captureQueryScopeVars(String... vars) {
return newQueryScope();
}

this.queryScope = new QueryScope.StandaloneImpl();
this.queryScope = new StandaloneQueryScope();

final QueryScopeParam<?>[] params = getContext().getQueryScope().getParams(Arrays.asList(vars));
for (final QueryScopeParam<?> param : params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
*/
package io.deephaven.engine.context;

import io.deephaven.engine.liveness.LivenessReferent;
import io.deephaven.util.ExecutionContextRegistrationException;
import org.jetbrains.annotations.NotNull;

import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.Set;
import java.util.stream.Stream;

public class PoisonedQueryScope extends QueryScope {
public class PoisonedQueryScope implements QueryScope {

public static final PoisonedQueryScope INSTANCE = new PoisonedQueryScope();

Expand All @@ -28,7 +33,7 @@ public boolean hasParamName(String name) {
}

@Override
protected <T> QueryScopeParam<T> createParam(String name) throws MissingVariableException {
public <T> QueryScopeParam<T> createParam(String name) throws MissingVariableException {
return fail();
}

Expand All @@ -48,7 +53,37 @@ public <T> void putParam(String name, T value) {
}

@Override
public void putObjectFields(Object object) {
public Map<String, Object> toMap() {
return fail();
}

@Override
public boolean tryManage(@NotNull LivenessReferent referent) {
return fail();
}

@Override
public boolean tryUnmanage(@NotNull LivenessReferent referent) {
return fail();
}

@Override
public boolean tryUnmanage(@NotNull Stream<? extends LivenessReferent> referents) {
return fail();
}

@Override
public boolean tryRetainReference() {
return fail();
}

@Override
public void dropReference() {
fail();
}

@Override
public WeakReference<? extends LivenessReferent> getWeakReference() {
return fail();
}
}
Loading

0 comments on commit d3dae15

Please sign in to comment.