diff --git a/cylc/uiserver/app.py b/cylc/uiserver/app.py index e821a6f2..999770a9 100644 --- a/cylc/uiserver/app.py +++ b/cylc/uiserver/app.py @@ -324,6 +324,16 @@ class CylcUIServer(ExtensionApp): ''', default_value=1 ) + max_threads = Int( + config=True, + help=''' + Set the maximum number of threads the Cylc UI Server can use. + + This determines the maximum number of active workflows that the + server can track. + ''', + default_value=100, + ) @validate('ui_build_dir') def _check_ui_build_dir_exists(self, proposed): @@ -384,7 +394,11 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.executor = ProcessPoolExecutor(max_workers=self.max_workers) self.workflows_mgr = WorkflowsManager(self, log=self.log) - self.data_store_mgr = DataStoreMgr(self.workflows_mgr, self.log) + self.data_store_mgr = DataStoreMgr( + self.workflows_mgr, + self.log, + self.max_threads, + ) # sub_status dictionary storing status of subscriptions self.sub_statuses = {} self.resolvers = Resolvers( diff --git a/cylc/uiserver/data_store_mgr.py b/cylc/uiserver/data_store_mgr.py index 101fc361..b7e526a0 100644 --- a/cylc/uiserver/data_store_mgr.py +++ b/cylc/uiserver/data_store_mgr.py @@ -73,21 +73,37 @@ def _inner(*args, **kwargs): # works for serial & async calls class DataStoreMgr: - """Manage the local data-store acquisition/updates for all workflows.""" + """Manage the local data-store acquisition/updates for all workflows. + + Args: + workflows_mgr: + Service that scans for workflows. + log: + Application logger. + max_threads: + Max number of threads to use for subscriptions. + + Note, this determines the maximum number of active workflows that + can be updated. + + This should be overridden for real use in the UIS app. The + default is here for test purposes. + + """ INIT_DATA_WAIT_TIME = 5. # seconds INIT_DATA_RETRY_DELAY = 0.5 # seconds RECONCILE_TIMEOUT = 5. # seconds PENDING_DELTA_CHECK_INTERVAL = 0.5 - def __init__(self, workflows_mgr, log): + def __init__(self, workflows_mgr, log, max_threads=10): self.workflows_mgr = workflows_mgr self.log = log self.data = {} self.w_subs: Dict[str, WorkflowSubscriber] = {} self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'} self.loop = None - self.executor = ThreadPoolExecutor() + self.executor = ThreadPoolExecutor(max_threads) self.delta_queues = {} @log_call