diff --git a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java index c7c80b27080..f37f58e3907 100644 --- a/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java +++ b/Integrations/src/main/java/io/deephaven/integrations/python/PythonDeephavenSession.java @@ -24,6 +24,7 @@ import io.deephaven.util.SafeCloseable; import io.deephaven.util.annotations.ScriptApi; import io.deephaven.util.annotations.VisibleForTesting; +import io.deephaven.util.thread.NamingThreadFactory; import io.deephaven.util.thread.ThreadInitializationFactory; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -44,6 +45,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -93,6 +97,7 @@ public PythonDeephavenSession( } scriptFinder = new ScriptFinder(DEFAULT_SCRIPT_PATH); + registerJavaExecutor(threadInitializationFactory); publishInitial(); /* * And now the user-defined initialization scripts, if any. @@ -122,9 +127,26 @@ public PythonDeephavenSession(final UpdateGraph updateGraph, } scriptFinder = null; + registerJavaExecutor(threadInitializationFactory); publishInitial(); } + private void registerJavaExecutor(ThreadInitializationFactory threadInitializationFactory) { + // TODO (deephaven-core#4040) Temporary exec service until we have cleaner startup wiring + try (PyModule pyModule = PyModule.importModule("deephaven.server.executors"); + final PythonDeephavenThreadsModule module = pyModule.createProxy(PythonDeephavenThreadsModule.class)) { + NamingThreadFactory threadFactory = new NamingThreadFactory(PythonDeephavenSession.class, "serverThread") { + @Override + public Thread newThread(@NotNull Runnable r) { + return super.newThread(threadInitializationFactory.createInitializer(r)); + } + }; + ExecutorService executorService = Executors.newFixedThreadPool(1, threadFactory); + module._register_named_java_executor("serial", executorService::submit); + module._register_named_java_executor("concurrent", executorService::submit); + } + } + @Override @VisibleForTesting public QueryScope newQueryScope() { @@ -327,4 +349,10 @@ interface PythonScriptSessionModule extends Closeable { void close(); } + + interface PythonDeephavenThreadsModule extends Closeable { + void close(); + + void _register_named_java_executor(String executorName, Consumer execute); + } } diff --git a/py/server/deephaven/server/__init__.py b/py/server/deephaven/server/__init__.py index b5af5621f70..cb8603f5850 100644 --- a/py/server/deephaven/server/__init__.py +++ b/py/server/deephaven/server/__init__.py @@ -1,6 +1,3 @@ # # Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending # - -# Packages under the deephaven.server heading are not meant to be called externally - it exists as a convenient place -# for the server to execute implementation logic via python diff --git a/py/server/deephaven/server/executors.py b/py/server/deephaven/server/executors.py new file mode 100644 index 00000000000..b655621468a --- /dev/null +++ b/py/server/deephaven/server/executors.py @@ -0,0 +1,63 @@ +# +# Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending +# +""" +Support for running operations on JVM server threads, so that they can be given work from python. Initially, there +are two executors, "serial" and "concurrent". Any task that will take an exclusive UGP lock should use the serial +executor, otherwise the concurrent executor should be used. In the future there may be a "fast" executor, for use +when there is no chance of using either lock. +""" + +from typing import Callable, Dict, List +import jpy +from deephaven.jcompat import j_runnable +from deephaven import DHError + + +_executors: Dict[str, Callable[[Callable[[], None]], None]] = {} + + +def has_executor(executor_name: str) -> bool: + """ + Returns True if an executor exists with that name. + """ + return executor_name in executor_names() + + +def executor_names() -> List[str]: + """ + Returns: the List of known executor names + """ + return list(_executors.keys()) + + +def submit_task(executor_name: str, task: Callable[[], None]) -> None: + """ + Submits a task to run on a named executor. If no such executor exists, raises KeyError. + + Typically, tasks should not block on other threads. Ensure tasks never block on other tasks submitted to the same executor. + + Args: + executor_name (str): the name of the executor to submit the task to + task (Callable[[], None]): the function to run on the named executor + + Raises: + KeyError if the executor name + """ + _executors[executor_name](task) + + +def _register_named_java_executor(executor_name: str, java_executor: jpy.JType) -> None: + """ + Provides a Java executor for user code to submit tasks to. Called during server startup. + + Args: + executor_name (str): the name of the executor to register + java_executor (jpy.JType): a Java Consumer instance + + Raises: + DHError + """ + if executor_name in executor_names(): + raise DHError(f"Executor with name {executor_name} already registered") + _executors[executor_name] = lambda task: java_executor.accept(j_runnable(task))