From c981964245cc96485c22d56148c81266b0dbb448 Mon Sep 17 00:00:00 2001 From: Oliver Sanders Date: Fri, 13 Oct 2023 16:27:16 +0100 Subject: [PATCH] schema: add source workflows --- cylc/uiserver/schema.py | 73 +++++++-- cylc/uiserver/services/__init__.py | 15 ++ cylc/uiserver/services/source_workflows.py | 106 +++++++++++++ cylc/uiserver/tests/conftest.py | 8 + cylc/uiserver/tests/services/__init__.py | 14 ++ .../tests/services/test_source_workflows.py | 143 ++++++++++++++++++ 6 files changed, 344 insertions(+), 15 deletions(-) create mode 100644 cylc/uiserver/services/__init__.py create mode 100644 cylc/uiserver/services/source_workflows.py create mode 100644 cylc/uiserver/tests/services/__init__.py create mode 100644 cylc/uiserver/tests/services/test_source_workflows.py diff --git a/cylc/uiserver/schema.py b/cylc/uiserver/schema.py index 99933a74..674f4786 100644 --- a/cylc/uiserver/schema.py +++ b/cylc/uiserver/schema.py @@ -25,33 +25,43 @@ import graphene from graphene.types.generic import GenericScalar +from cylc.flow.data_store_mgr import ( + DELTA_ADDED, +) from cylc.flow.id import Tokens from cylc.flow.data_store_mgr import JOBS, TASKS from cylc.flow.rundb import CylcWorkflowDAO from cylc.flow.pathutil import get_workflow_run_dir from cylc.flow.workflow_files import WorkflowFiles from cylc.flow.network.schema import ( - NODE_MAP, CyclePoint, GenericResponse, - SortArgs, - Task, + ID, Job, Mutations, + NODE_MAP, Queries, - process_resolver_info, STRIP_NULL_DEFAULT, + SortArgs, Subscriptions, + Task, + Workflow, WorkflowID, _mut_field, + get_nodes_all, + get_workflows, + process_resolver_info, sstrip, - get_nodes_all ) from cylc.uiserver.resolvers import ( Resolvers, list_log_files, stream_log, ) +from cylc.uiserver.services.source_workflows import ( + list_source_workflows, + get_workflow_source, +) if TYPE_CHECKING: from graphql import ResolveInfo @@ -552,17 +562,37 @@ class UISJob(Job): run_time = graphene.Int() -class UISQueries(Queries): +class SourceWorkflow(graphene.ObjectType): + """A Cylc workflow source directory. + + This may or may not be located within the configured cylc source + directories. For workflows located outside of the configured + directories, the "name" field will allways be null. + """ + name = graphene.String( + description='The name of the source workflow' + ) + path = graphene.String( + description='The location of the source workflow.' + ) + + +class UISWorkflow(Workflow): + source = graphene.Field( + SourceWorkflow, + resolver=get_workflow_source, + ) + - class LogFiles(graphene.ObjectType): - # Example GraphiQL query: - # { - # logFiles(workflowID: "", task: "") { - # files - # } - # } - files = graphene.List(graphene.String) +class LogFiles(graphene.ObjectType): + files = graphene.List(graphene.String) + +class UISQueries(Queries): + source_workflows = graphene.List( + SourceWorkflow, + resolver=list_source_workflows, + ) log_files = graphene.Field( LogFiles, description='List available job logs', @@ -573,7 +603,6 @@ class LogFiles(graphene.ObjectType): ), resolver=list_log_files ) - tasks = graphene.List( UISTask, description=Task._meta.description, @@ -606,6 +635,20 @@ class LogFiles(graphene.ObjectType): tasks=graphene.List(graphene.ID, default_value=[]) ) + workflows = graphene.List( + UISWorkflow, + description=Workflow._meta.description, + ids=graphene.List(ID, default_value=[]), + exids=graphene.List(ID, default_value=[]), + # TODO: Change these defaults post #3500 in coordination with WUI + strip_null=graphene.Boolean(default_value=False), + delta_store=graphene.Boolean(default_value=False), + delta_type=graphene.String(default_value=DELTA_ADDED), + initial_burst=graphene.Boolean(default_value=True), + ignore_interval=graphene.Float(default_value=2.5), + resolver=get_workflows + ) + class UISSubscriptions(Subscriptions): # Example graphiql workflow log subscription: diff --git a/cylc/uiserver/services/__init__.py b/cylc/uiserver/services/__init__.py new file mode 100644 index 00000000..763e078d --- /dev/null +++ b/cylc/uiserver/services/__init__.py @@ -0,0 +1,15 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . diff --git a/cylc/uiserver/services/source_workflows.py b/cylc/uiserver/services/source_workflows.py new file mode 100644 index 00000000..2ee627d9 --- /dev/null +++ b/cylc/uiserver/services/source_workflows.py @@ -0,0 +1,106 @@ +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +"""Utilities relating to the listing and management of source workflows.""" + +from contextlib import suppress +from pathlib import Path +from typing import Optional, List, Dict + +from cylc.flow.cfgspec.glbl_cfg import glbl_cfg +from cylc.flow.id import Tokens +from cylc.flow.network.scan import scan_multi +from cylc.flow.pathutil import get_workflow_run_dir +from cylc.flow.workflow_files import get_workflow_source_dir + + +# the user's configured workflow source directories +SOURCE_DIRS: List[Path] = [ + Path(source_dir).expanduser() + for source_dir in glbl_cfg().get(['install', 'source dirs']) +] + + +SourceWorkflow = Dict + + +def _source_workflow(source_path: Path) -> SourceWorkflow: + """Return the fields required to resolve a SourceWorkflow. + + Args: + source_path: + Path to the source workflow directory. + + """ + return { + 'name': _get_source_workflow_name(source_path), + 'path': source_path, + } + + +def _blank_source_workflow() -> SourceWorkflow: + """Return a blank source workflow. + + This will be used for workflows which were not installed by "cylc install". + """ + + return {'name': None, 'path': None} + + +def _get_source_workflow_name(source_path: Path) -> Optional[str]: + """Return the "name" of the source workflow. + + This is the "name" that can be provided to the "cylc install" command. + + Args: + source_path: + Path to the source workflow directory. + + Returns: + The source workflow name if the source workflow is located within + a configured source directory, else None. + + """ + for source_dir in SOURCE_DIRS: + with suppress(ValueError): + return str(source_path.relative_to(source_dir)) + return None + + +def _get_workflow_source(workflow_id): + """Return the source workflow for the given workflow ID.""" + run_dir = get_workflow_run_dir(workflow_id) + source_dir, _symlink = get_workflow_source_dir(run_dir) + if source_dir: + return _source_workflow(Path(source_dir)) + return _blank_source_workflow() + + +async def list_source_workflows(*_) -> List[SourceWorkflow]: + """List source workflows located in the configured source directories.""" + ret = [] + async for flow in scan_multi(SOURCE_DIRS): + ret.append(_source_workflow(flow['path'])) + return ret + + +def get_workflow_source(data, _, **kwargs) -> Optional[SourceWorkflow]: + """Resolve the source for an installed workflow. + + If the source cannot be resolved, e.g. if the workflow was not installed by + "cylc install", then this will return None. + """ + workflow_id = Tokens(data.id)['workflow'] + return _get_workflow_source(workflow_id) diff --git a/cylc/uiserver/tests/conftest.py b/cylc/uiserver/tests/conftest.py index 54cfac5c..6c19f8c1 100644 --- a/cylc/uiserver/tests/conftest.py +++ b/cylc/uiserver/tests/conftest.py @@ -26,6 +26,7 @@ import zmq from jupyter_server.auth.identity import User +from _pytest.monkeypatch import MonkeyPatch from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.id import Tokens @@ -407,3 +408,10 @@ def _inner(cached=False): yield _mock_glbl_cfg rmtree(tmp_path) + + +@pytest.fixture(scope='module') +def mod_monkeypatch(): + monkeypatch = MonkeyPatch() + yield monkeypatch + monkeypatch.undo() diff --git a/cylc/uiserver/tests/services/__init__.py b/cylc/uiserver/tests/services/__init__.py new file mode 100644 index 00000000..5ad6d67e --- /dev/null +++ b/cylc/uiserver/tests/services/__init__.py @@ -0,0 +1,14 @@ +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . diff --git a/cylc/uiserver/tests/services/test_source_workflows.py b/cylc/uiserver/tests/services/test_source_workflows.py new file mode 100644 index 00000000..2a70f8fb --- /dev/null +++ b/cylc/uiserver/tests/services/test_source_workflows.py @@ -0,0 +1,143 @@ +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from cylc.uiserver.services.source_workflows import ( + _get_source_workflow_name, + _get_workflow_source, + list_source_workflows, +) + +import pytest + + +@pytest.fixture(scope='module') +def source_dirs(mod_monkeypatch, mod_tmp_path): + source_dir_a = mod_tmp_path / 'a' + source_dir_b = mod_tmp_path / 'b' + source_dir_a.mkdir() + source_dir_b.mkdir() + + # a:foo + foo = source_dir_a / 'foo' + foo.mkdir() + (foo / 'flow.cylc').touch() + + # a:bar/b1 + bar = source_dir_a / 'bar/b1' + bar.mkdir(parents=True) + (bar / 'flow.cylc').touch() + + # b:baz + baz = source_dir_b / 'baz' + baz.mkdir() + (baz / 'flow.cylc').touch() + + mod_monkeypatch.setattr( + 'cylc.uiserver.services.source_workflows.SOURCE_DIRS', + [source_dir_a, source_dir_b] + ) + + return [source_dir_a, source_dir_b] + + +def _make_installed_workflow(mod_tmp_path, id_, source_path, run_name='run1'): + root_run_dir = mod_tmp_path / id_ + if run_name: + run_dir = root_run_dir / run_name + else: + run_dir = root_run_dir + run_dir.mkdir(parents=True) + (run_dir / 'flow.cylc').touch() + if source_path: + install_dir = (root_run_dir / '_cylc-install') + install_dir.mkdir() + (install_dir / 'source').symlink_to(source_path) + return root_run_dir + + +@pytest.fixture(scope='module') +def installed_workflows(source_dirs, mod_monkeypatch, mod_tmp_path): + a, b, *_ = source_dirs + + def _get_workflow_run_dir(id_): + return mod_tmp_path / id_ + + mod_monkeypatch.setattr( + 'cylc.uiserver.services.source_workflows.get_workflow_run_dir', + _get_workflow_run_dir, + ) + + # ~user/foo/run1: normally installed workflow + _make_installed_workflow( + mod_tmp_path, + 'one', a / 'foo', + ) + + # ~user/bar: installed from source outside of SOURCE_DIRS + _make_installed_workflow( + mod_tmp_path, + 'two', + mod_tmp_path / 'somewhere-else', + run_name=None, + ) + + # ~user/baz: not installed + _make_installed_workflow( + mod_tmp_path, + 'three', + None, + ) + + +def test_get_source_workflow_name(source_dirs): + a, b, *_ = source_dirs + assert _get_source_workflow_name(a / 'foo') == 'foo' + assert _get_source_workflow_name(a / 'bar/b1') == 'bar/b1' + assert _get_source_workflow_name(b / 'baz') == 'baz' + + +async def test_list_source_workflows(source_dirs): + a, b, *_ = source_dirs + source_workflows = await list_source_workflows() + assert sorted(source_workflows, key=lambda x: x['name']) == [ + { + 'name': 'bar/b1', + 'path': a / 'bar/b1', + }, + { + 'name': 'baz', + 'path': b / 'baz', + }, + { + 'name': 'foo', + 'path': a / 'foo', + }, + ] + + +def test_get_workflow_source(source_dirs, installed_workflows): + a, b, *_ = source_dirs + assert _get_workflow_source('one') == { + 'name': 'foo', + 'path': a / 'foo', + } + assert _get_workflow_source('two') == { + 'name': None, + 'path': a.parent / 'somewhere-else', + } + assert _get_workflow_source('three') == { + 'name': None, + 'path': None, + }