Sync minimal flow data #597

Open · wants to merge 4 commits into base: master
6 changes: 6 additions & 0 deletions cylc/uiserver/app.py
@@ -460,6 +460,12 @@ def initialize_settings(self):
self.scan_interval * 1000
).start()

# configure the sync level expiry check
ioloop.PeriodicCallback(
self.data_store_mgr.check_query_sync_level_expiries,
self.data_store_mgr.SYNC_LEVEL_TIMER_INTERVAL * 1000
).start()

def initialize_handlers(self):
self.authobj = self.set_auth()
self.set_sub_server()
246 changes: 211 additions & 35 deletions cylc/uiserver/data_store_mgr.py
@@ -36,7 +36,7 @@
from copy import deepcopy
from pathlib import Path
import time
from typing import Dict, Optional, Set
from typing import Dict, Iterable, List, Optional, Set

from cylc.flow.exceptions import WorkflowStopped
from cylc.flow.id import Tokens
@@ -56,6 +56,29 @@
from .utils import fmt_call
from .workflows_mgr import workflow_request

MIN_LEVEL = 'min'
MAX_LEVEL = 'max'
SUBSCRIPTION_LEVELS = {
MIN_LEVEL: {
'topics': {WORKFLOW.encode('utf-8'), b'shutdown'},
'criteria': {
'fragments': {
'AddedDelta',
'WorkflowData',
'UpdatedDelta'
},
Comment on lines +65 to +69 (@oliver-sanders, Member, May 30, 2024):

I don't think that we can reliably determine the topic(s) we need to subscribe to by looking at the GraphQL fragments alone:

  • We have to handle queries as well as subscriptions.
  • Subscriptions might not use fragments at all.
  • Without looking inside the fragments, we can't tell what they are looking at anyway.
  • We can have subscriptions that request only data present at the UIS but require no data from the scheduler (e.g. analysis view queries and log file subscriptions).

The difficulty in coming up with a reliable mechanism for this is why I didn't go ahead with the code I wrote for #568 (which worked, but just wasn't production grade). I think there are fundamentally two approaches:

  1. Parse the query before it is resolved (inefficient and has security implications).
  2. Add hooks into the resolvers (awkward).

To achieve (2), we could "spy" on the resolvers to see which resolvers a query/subscription is going to hit, then subscribe to the corresponding topics, wait for the data to arrive, and return the response, i.e.:

async def resolve_task_proxies(...):
    await subscribe_topic('tasks')
    ...

@oliver-sanders (Member), Jun 17, 2024:

@dwsutherland, I had a poke at the info object and found that it contains the complete parsed query.

This should allow us to determine what the query/subscription is looking at without having to rely on the client/user writing their query to use fragments named in a particular way or for the UIS to treat queries differently to subscriptions. It should also allow for finer grained protobuf subscriptions if we decide to go down that route (e.g. only subscribe to tasks & families to satisfy the tree view, add edges to the subscription to satisfy the graph view).

I had a quick go at using this to extract the requested field types. It's crude, but it seems to be good enough to determine whether a query/subscription is requesting top-level Cylc types (e.g. tasks, taskProxies, edges), which is all we need to know for this purpose.

from graphql import language


TOP_LEVEL_THINGGIES = {
    # TODO: populate from somewhere else?
    'tasks',
    'taskProxies',
    'families',
    'familyProxies',
    'edges',
}


def iter_top_level_types(info):
    """Loop through top-level Cylc types requested in a query/subscription."""
    ret = set()
    for selection_set in iter_selection_sets(info):
        for selection in selection_set.selections:
            # TODO: check this handles query nesting correctly, it appears to
            if isinstance(selection, language.ast.Field):
                # TODO: it looks like selection items are always fields?
                field_name = selection.name.value
                if field_name in TOP_LEVEL_THINGGIES:
                    if field_name not in ret:
                        ret.add(field_name)
                        yield field_name


def iter_selection_sets(info):
    """Loop through top-level selection sets in the query definitions."""
    if hasattr(info, 'operation'):
        yield info.operation.selection_set
    if hasattr(info, 'fragments'):
        for fragment in info.fragments.values():
            yield fragment.selection_set
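
A minimal sketch of how this helper might be wired into a resolver (hypothetical: the context access and argument handling below are illustrative; only set_query_sync_levels is a method actually added in this PR):

async def resolve_task_proxies(root, info, **args):
    # Hypothetical hook: escalate the data-store sync level for the
    # workflows a query touches before resolving it.
    data_store_mgr = info.context['resolvers'].data_store_mgr  # illustrative
    w_ids = args.get('workflows', [])  # illustrative
    if any(iter_top_level_types(info)):
        # the query requests scheduler-backed types, so ask for max-level data
        await data_store_mgr.set_query_sync_levels(w_ids)
    # ... continue with normal resolution ...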

@dwsutherland (Member, Author):

I agree we can do this...
IMO this can just be a refinement change: the bones are ready in this PR, and we can build on it by changing the criteria from fragments to field sets.

But we're running out of time for 8.3.0, so the cylc-flow end will have to be bumped to 8.4.

Reply (Member):

So long as this doesn't require changes to cylc-flow (which are likely necessary for back-compat), this can go into a cylc-uiserver release at any point (no need to wait for any particular cylc-flow release).

},
'request': 'pb_workflow_only',
},
MAX_LEVEL: {
'topics': {ALL_DELTAS.encode('utf-8'), b'shutdown'},
'criteria': {'fragments': set()},
'request': 'pb_entire_workflow',
},
}
# expiry interval post query
QUERY_SYNC_EXPIRY = 60


def log_call(fcn):
"""Decorator for data store methods we want to log."""
@@ -95,13 +118,24 @@ class DataStoreMgr:
INIT_DATA_RETRY_DELAY = 0.5 # seconds
RECONCILE_TIMEOUT = 5. # seconds
PENDING_DELTA_CHECK_INTERVAL = 0.5
SYNC_LEVEL_TIMER_INTERVAL = 30

def __init__(self, workflows_mgr, log, max_threads=10):
self.workflows_mgr = workflows_mgr
self.log = log
self.data = {}
self.w_subs: Dict[str, WorkflowSubscriber] = {}
self.topics = {ALL_DELTAS.encode('utf-8'), b'shutdown'}
# graphql subscription level
self.sync_level_graphql_subs = {
MIN_LEVEL: set(),
MAX_LEVEL: set()
}
# workflow graphql subscription by level
self.workflow_sync_level_graphql_subs = {}
# workflow graphql query timers
self.workflow_query_sync_timers = {}
# resultant workflow sync level
self.workflow_sync_level = {}
self.loop = None
self.executor = ThreadPoolExecutor(max_threads)
self.delta_queues = {}
@@ -126,6 +160,18 @@ async def register_workflow(self, w_id: str, is_active: bool) -> None:
status_msg=self._get_status_msg(w_id, is_active),
)

# setup sync subscriber level sets
self.workflow_sync_level_graphql_subs[w_id] = {
MIN_LEVEL: set(),
MAX_LEVEL: set()
}

# set query sync timer
self.workflow_query_sync_timers[w_id] = 0.0

# set workflow sync level
self.workflow_sync_level[w_id] = MIN_LEVEL

@log_call
async def unregister_workflow(self, w_id):
"""Remove a workflow from the data store entirely.
@@ -161,26 +207,46 @@ async def connect_workflow(self, w_id, contact_data):

self.delta_queues[w_id] = {}

level = MIN_LEVEL
if self.workflow_sync_level_graphql_subs[w_id][MAX_LEVEL]:
level = MAX_LEVEL

# Might be options other than threads to achieve
# non-blocking subscriptions, but this works.
self.executor.submit(
self._start_subscription,
w_id,
contact_data['name'],
contact_data[CFF.HOST],
contact_data[CFF.PUBLISH_PORT]
contact_data[CFF.PUBLISH_PORT],
SUBSCRIPTION_LEVELS[level]['topics']
)

result = await self.workflow_data_update(w_id, level)

if result:
# don't update the contact data until we have successfully updated
self._update_contact(w_id, contact_data)

@log_call
async def workflow_data_update(
self,
w_id: str,
level: str,
):
# for some reason mypy doesn't like non-fstring...
successful_updates = await self._workflow_update(
[w_id],
f'{SUBSCRIPTION_LEVELS[level]["request"]}'
)
successful_updates = await self._entire_workflow_update(ids=[w_id])

if w_id not in successful_updates:
# something went wrong, undo any changes to allow for subsequent
# connection attempts
self.log.info(f'failed to connect to {w_id}')
self.disconnect_workflow(w_id)
return False
else:
# don't update the contact data until we have successfully updated
self._update_contact(w_id, contact_data)
return True

@log_call
def disconnect_workflow(self, w_id, update_contact=True):
@@ -236,23 +302,30 @@ def _purge_workflow(self, w_id):
del self.data[w_id]
if w_id in self.delta_queues:
del self.delta_queues[w_id]

def _start_subscription(self, w_id, reg, host, port):
if w_id in self.workflow_sync_level_graphql_subs:
del self.workflow_sync_level_graphql_subs[w_id]
if w_id in self.workflow_query_sync_timers:
del self.workflow_query_sync_timers[w_id]
if w_id in self.workflow_sync_level:
del self.workflow_sync_level[w_id]

def _start_subscription(self, w_id, reg, host, port, topics):
"""Instantiate and run subscriber data-store sync.

Args:
w_id (str): Workflow external ID.
reg (str): Registered workflow name.
host (str): Hostname of target workflow.
port (int): Port of target workflow.
topics set(str): set of topics to subscribe to.

"""
self.w_subs[w_id] = WorkflowSubscriber(
reg,
host=host,
port=port,
context=self.workflows_mgr.context,
topics=self.topics
topics=topics
)
self.w_subs[w_id].loop.run_until_complete(
self.w_subs[w_id].subscribe(
@@ -283,8 +356,18 @@ def _update_workflow_data(self, topic, delta, w_id):
# close connections
self.disconnect_workflow(w_id)
return
self._apply_all_delta(w_id, delta)
self._delta_store_to_queues(w_id, topic, delta)
elif topic == WORKFLOW:
if self.workflow_sync_level_graphql_subs[w_id][MAX_LEVEL]:
return
self._apply_delta(w_id, WORKFLOW, delta)
# might seem clunky, but as with contact update, making it look
# like an ALL_DELTA avoids changing the resolver in cylc-flow
all_deltas = DELTAS_MAP[ALL_DELTAS]()
all_deltas.workflow.CopyFrom(delta)
self._delta_store_to_queues(w_id, ALL_DELTAS, all_deltas)
else:
self._apply_all_delta(w_id, delta)
self._delta_store_to_queues(w_id, topic, delta)

def _clear_data_field(self, w_id, field_name):
if field_name == WORKFLOW:
@@ -295,22 +378,26 @@ def _clear_data_field(self, w_id, field_name):
def _apply_all_delta(self, w_id, delta):
"""Apply the AllDeltas delta."""
for field, sub_delta in delta.ListFields():
delta_time = getattr(sub_delta, 'time', 0.0)
# If the workflow has reloaded clear the data before
# delta application.
if sub_delta.reloaded:
self._clear_data_field(w_id, field.name)
self.data[w_id]['delta_times'][field.name] = 0.0
# hard to catch errors in a threaded async app, so use try-except.
try:
# Apply the delta if newer than the previously applied.
if delta_time >= self.data[w_id]['delta_times'][field.name]:
apply_delta(field.name, sub_delta, self.data[w_id])
self.data[w_id]['delta_times'][field.name] = delta_time
if not sub_delta.reloaded:
self._reconcile_update(field.name, sub_delta, w_id)
except Exception as exc:
self.log.exception(exc)
self._apply_delta(w_id, field.name, sub_delta)

def _apply_delta(self, w_id, name, delta):
"""Apply delta."""
delta_time = getattr(delta, 'time', 0.0)
# If the workflow has reloaded clear the data before
# delta application.
if delta.reloaded:
self._clear_data_field(w_id, name)
self.data[w_id]['delta_times'][name] = 0.0
# hard to catch errors in a threaded async app, so use try-except.
try:
# Apply the delta if newer than the previously applied.
if delta_time >= self.data[w_id]['delta_times'][name]:
apply_delta(name, delta, self.data[w_id])
self.data[w_id]['delta_times'][name] = delta_time
if not delta.reloaded:
self._reconcile_update(name, delta, w_id)
except Exception as exc:
self.log.exception(exc)

def _delta_store_to_queues(self, w_id, topic, delta):
# Queue delta for graphql subscription resolving
@@ -368,20 +455,15 @@ def _reconcile_update(self, topic, delta, w_id):
except Exception as exc:
self.log.exception(exc)

async def _entire_workflow_update(
self, ids: Optional[list] = None
async def _workflow_update(
self, ids: List[str], req_method: str,
) -> Set[str]:
"""Update entire local data-store of workflow(s).

Args:
ids: List of workflow external IDs.

"""
if ids is None:
ids = []

# Request new data
req_method = 'pb_entire_workflow'

requests = {
w_id: workflow_request(
@@ -502,3 +584,97 @@ def _get_status_msg(self, w_id: str, is_active: bool) -> str:
else:
# the workflow has not yet run
return 'not yet run'

async def _update_subscription_level(self, w_id, level):
"""Update level of data subscribed to."""
sub = self.w_subs.get(w_id)
if sub:
stop_topics = sub.topics.difference(
SUBSCRIPTION_LEVELS[level]['topics']
)
start_topics = SUBSCRIPTION_LEVELS[level]['topics'].difference(
sub.topics
)
for stop_topic in stop_topics:
sub.unsubscribe_topic(stop_topic)
# Doing this after unsubscribe and before subscribe
# to make sure old topics stop and new data is in place.
await self.workflow_data_update(w_id, level)
for start_topic in start_topics:
sub.subscribe_topic(start_topic)
self.workflow_sync_level[w_id] = level

def graphql_sub_interrogate(self, sub_id, info):
"""Scope data requirements."""
fragments = set(info.fragments.keys())
minimal = (
(
fragments
<= SUBSCRIPTION_LEVELS[MIN_LEVEL]['criteria']['fragments']
)
and bool(fragments)
)
Comment on lines +610 to +616 (Member):

I'm not sure how Protobuf subscriptions could be escalated back to the minimum level after GraphQL subscriptions complete under this model.

Reply from @dwsutherland (Member, Author), May 30, 2024:

They already do: once the subscription ends, switching between workflows in the UI will do it, or closing the browser (obviously).

if minimal:
self.sync_level_graphql_subs[MIN_LEVEL].add(sub_id)
return
self.sync_level_graphql_subs[MAX_LEVEL].add(sub_id)

async def graphql_sub_data_match(self, w_id, sub_id):
"""Match store data level to requested graphql subscription."""
sync_level_wsubs = self.workflow_sync_level_graphql_subs[w_id]
if sub_id in self.sync_level_graphql_subs[MAX_LEVEL]:
no_max = not sync_level_wsubs[MAX_LEVEL]
sync_level_wsubs[MAX_LEVEL].add(sub_id)
if no_max:
await self._update_subscription_level(w_id, MAX_LEVEL)
else:
sync_level_wsubs[MIN_LEVEL].add(sub_id)

async def graphql_sub_discard(self, sub_id):
"""Discard graphql subscription references."""
level = MIN_LEVEL
if sub_id in self.sync_level_graphql_subs[MAX_LEVEL]:
level = MAX_LEVEL
self.sync_level_graphql_subs[level].discard(sub_id)
for w_id in self.workflow_sync_level_graphql_subs:
self.workflow_sync_level_graphql_subs[w_id][level].discard(
sub_id
)
# if there are no more max level subscriptions after removal
# of a max level sub, downgrade to min.
if (
not self.workflow_sync_level_graphql_subs[w_id][level]
and level is MAX_LEVEL
and self.workflow_query_sync_timers[w_id] < time.time()
):
await self._update_subscription_level(w_id, MIN_LEVEL)

async def set_query_sync_levels(
self,
w_ids: Iterable[str],
level: Optional[str] = None,
expire_delay: Optional[float] = None,
):
"""Set a workflow sync level."""
if level is None:
level = MAX_LEVEL
if expire_delay is None:
expire_delay = QUERY_SYNC_EXPIRY
expire_time = time.time() + expire_delay
for w_id in w_ids:
self.workflow_query_sync_timers[w_id] = expire_time
if self.workflow_sync_level[w_id] is level:
# Already required level
continue
await self._update_subscription_level(w_id, level)

async def check_query_sync_level_expiries(self):
"""Check for and downgrade expired sub levels."""
for w_id, expiry in self.workflow_query_sync_timers.items():
if (
w_id in self.w_subs
and self.workflow_sync_level[w_id] is not MIN_LEVEL
and not self.workflow_sync_level_graphql_subs[w_id][MAX_LEVEL]
and expiry < time.time()
):
await self._update_subscription_level(w_id, MIN_LEVEL)
11 changes: 6 additions & 5 deletions cylc/uiserver/resolvers.py
@@ -45,7 +45,7 @@
WorkflowFilesError,
)
from cylc.flow.id import Tokens
from cylc.flow.network.resolvers import BaseResolvers
from cylc.flow.network.resolvers import BaseResolvers, workflow_filter
from cylc.flow.scripts.clean import CleanOptions
from cylc.flow.scripts.clean import run

@@ -497,11 +497,12 @@ async def mutator(
)
}
w_ids = [
flow[WORKFLOW].id
for flow in await self.get_workflows_data(w_args)]
workflow[WORKFLOW].id
for workflow in self.data_store_mgr.data.values()
if workflow_filter(workflow, w_args)
]
if not w_ids:
return [{
'response': (False, 'No matching workflows')}]
return [{'response': (False, 'No matching workflows')}]
# Pass the request to the workflow GraphQL endpoints
_, variables, _, _ = info.context.get( # type: ignore[union-attr]
'graphql_params'