Skip to content

Commit

Permalink
Merge pull request #570 from oliver-sanders/569
Browse files Browse the repository at this point in the history
data store mgr: Increase the max number of workflows to 100
  • Loading branch information
oliver-sanders authored Mar 27, 2024
2 parents 71ca652 + 32aadc5 commit 1e72de8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 4 deletions.
16 changes: 15 additions & 1 deletion cylc/uiserver/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,16 @@ class CylcUIServer(ExtensionApp):
''',
default_value=1
)
max_threads = Int(
config=True,
help='''
Set the maximum number of threads the Cylc UI Server can use.
This determines the maximum number of active workflows that the
server can track.
''',
default_value=100,
)

@validate('ui_build_dir')
def _check_ui_build_dir_exists(self, proposed):
Expand Down Expand Up @@ -384,7 +394,11 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ProcessPoolExecutor(max_workers=self.max_workers)
self.workflows_mgr = WorkflowsManager(self, log=self.log)
self.data_store_mgr = DataStoreMgr(self.workflows_mgr, self.log)
self.data_store_mgr = DataStoreMgr(
self.workflows_mgr,
self.log,
self.max_threads,
)
# sub_status dictionary storing status of subscriptions
self.sub_statuses = {}
self.resolvers = Resolvers(
Expand Down
22 changes: 19 additions & 3 deletions cylc/uiserver/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,37 @@ def _inner(*args, **kwargs): # works for serial & async calls


class DataStoreMgr:
"""Manage the local data-store acquisition/updates for all workflows."""
"""Manage the local data-store acquisition/updates for all workflows.
Args:
workflows_mgr:
Service that scans for workflows.
log:
Application logger.
max_threads:
Max number of threads to use for subscriptions.
Note, this determines the maximum number of active workflows that
can be updated.
This should be overridden for real use in the UIS app. The
default is here for test purposes.
"""

INIT_DATA_WAIT_TIME = 5. # seconds
INIT_DATA_RETRY_DELAY = 0.5 # seconds
RECONCILE_TIMEOUT = 5. # seconds
PENDING_DELTA_CHECK_INTERVAL = 0.5

def __init__(self, workflows_mgr, log):
def __init__(self, workflows_mgr, log, max_threads=10):
self.workflows_mgr = workflows_mgr
self.log = log
self.data = {}
self.w_subs: Dict[str, WorkflowSubscriber] = {}
self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
self.loop = None
self.executor = ThreadPoolExecutor()
self.executor = ThreadPoolExecutor(max_threads)
self.delta_queues = {}

@log_call
Expand Down

0 comments on commit 1e72de8

Please sign in to comment.