diff --git a/cylc/uiserver/schema.py b/cylc/uiserver/schema.py
index 3deb6237..a6c5ef11 100644
--- a/cylc/uiserver/schema.py
+++ b/cylc/uiserver/schema.py
@@ -32,6 +32,7 @@
     ID,
     SortArgs,
     Task,
+    Job,
     Mutations,
     Queries,
     process_resolver_info,
@@ -281,6 +282,53 @@ class Meta:
     result = GenericScalar()


+async def get_tasks(root, info, **kwargs):
+    if kwargs['live']:
+        return await get_nodes_all(root, info, **kwargs)
+
+    _, field_ids = process_resolver_info(root, info, kwargs)
+
+    if 'id' in kwargs:
+        kwargs['ids'] = [kwargs.get('id')]
+    if field_ids:
+        if isinstance(field_ids, str):
+            field_ids = [field_ids]
+        elif isinstance(field_ids, dict):
+            field_ids = list(field_ids)
+        kwargs['ids'] = field_ids
+    elif field_ids == []:
+        return []
+
+    for arg in ('ids', 'exids'):
+        # live objects can be represented by a universal ID
+        kwargs[arg] = [Tokens(n_id, relative=True) for n_id in kwargs[arg]]
+    kwargs['workflows'] = [
+        Tokens(w_id) for w_id in kwargs['workflows']]
+    kwargs['exworkflows'] = [
+        Tokens(w_id) for w_id in kwargs['exworkflows']]
+
+    return await list_tasks(kwargs)
+
+
+async def list_tasks(args):
+    if not args['workflows']:
+        raise Exception('At least one workflow must be provided.')
+    from cylc.flow.rundb import CylcWorkflowDAO
+    from cylc.flow.pathutil import get_workflow_run_dir
+    from cylc.flow.workflow_files import WorkflowFiles
+    tasks = []
+    for workflow in args['workflows']:
+        db_file = get_workflow_run_dir(
+            workflow['workflow'],
+            WorkflowFiles.LogDir.DIRNAME,
+            "db"
+        )
+        with CylcWorkflowDAO(db_file, is_public=True) as dao:
+            conn = dao.connect()
+            tasks.extend(make_query(conn, workflow))
+    return tasks
+
+
 async def get_jobs(root, info, **kwargs):
     if kwargs['live']:
         return await get_nodes_all(root, info, **kwargs)
@@ -324,7 +372,7 @@ async def list_jobs(args):
         )
         with CylcWorkflowDAO(db_file, is_public=True) as dao:
             conn = dao.connect()
-            jobs.extend(make_query(conn, workflow))
+            jobs.extend(make_jobs_query(conn, workflow, args.get('tasks')))
     return jobs


@@ -451,6 +499,62 @@
     return tasks


+def make_jobs_query(conn, workflow, tasks):
+
+    # TODO: support all arguments including states
+    # https://github.com/cylc/cylc-uiserver/issues/440
+    jobs = []
+
+    # Create SQL snippet used to limit which tasks are returned by the query
+    if tasks:
+        where_clauses = "' OR name = '".join(tasks)
+        where_clauses = f" AND (name = '{where_clauses}')"
+    else:
+        where_clauses = ''
+
+    for row in conn.execute(f'''
+SELECT
+    name,
+    cycle,
+    submit_num,
+    submit_status,
+    time_run,
+    time_run_exit,
+    job_id,
+    platform_name,
+    time_submit,
+    STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit) AS total_time,
+    STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run) AS run_time,
+    STRFTIME('%s', time_run) - STRFTIME('%s', time_submit) AS queue_time
+FROM
+    task_jobs
+WHERE
+    run_status = 0
+    {where_clauses};
+'''):
+        jobs.append({
+            'id': workflow.duplicate(
+                cycle=row[1],
+                task=row[0],
+                job=row[2]
+            ),
+            'name': row[0],
+            'cycle_point': row[1],
+            'submit_num': row[2],
+            'state': row[3],
+            'started_time': row[4],
+            'finished_time': row[5],
+            'job_ID': row[6],
+            'platform': row[7],
+            'submitted_time': row[8],
+            'total_time': row[9],
+            'run_time': row[10],
+            'queue_time': row[11]
+        })
+
+    return jobs
+
+
 class UISTask(Task):

     platform = graphene.String()
@@ -484,6 +588,13 @@
     count = graphene.Int()


+class UISJob(Job):
+
+    total_time = graphene.Int()
+    queue_time = graphene.Int()
+    run_time = graphene.Int()
+
+
 class UISQueries(Queries):

     class LogFiles(graphene.ObjectType):
@@ -511,6 +622,22 @@ class LogFiles(graphene.ObjectType):
         description=Task._meta.description,
         live=graphene.Boolean(default_value=True),
         strip_null=STRIP_NULL_DEFAULT,
+        resolver=get_tasks,
+        workflows=graphene.List(ID, default_value=[]),
+        exworkflows=graphene.List(ID, default_value=[]),
+        ids=graphene.List(ID, default_value=[]),
+        exids=graphene.List(ID, default_value=[]),
+        mindepth=graphene.Int(default_value=-1),
+        maxdepth=graphene.Int(default_value=-1),
+        sort=SortArgs(default_value=None),
+
+    )
+
+    jobs = graphene.List(
+        UISJob,
+        description=Job._meta.description,
+        live=graphene.Boolean(default_value=True),
+        strip_null=STRIP_NULL_DEFAULT,
         resolver=get_jobs,
         workflows=graphene.List(ID, default_value=[]),
         exworkflows=graphene.List(ID, default_value=[]),
@@ -519,6 +646,7 @@ class LogFiles(graphene.ObjectType):
         mindepth=graphene.Int(default_value=-1),
         maxdepth=graphene.Int(default_value=-1),
         sort=SortArgs(default_value=None),
+        tasks=graphene.List(ID, default_value=[])
     )
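For context, the new `tasks` field mirrors the existing `jobs` machinery: with `live: false` the `get_tasks` resolver bypasses the in-memory data store and reads rows from each workflow's public database via `make_query`. A hypothetical query against this schema might look like the following sketch (the workflow ID and field selection are illustrative, and field names assume graphene's default camelCase conversion):

```graphql
# Sketch only: fetch historical task data from the workflow database
# rather than the live store (live: false).
query {
  tasks(live: false, workflows: ["~user/workflow"]) {
    name
    platform
    count
  }
}
```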
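The `jobs` field additionally gains a `tasks` argument, which `make_jobs_query` folds into the SQL WHERE clause to restrict results by task name. A sketch of a query exercising it, selecting the three new `UISJob` timing fields (integer seconds, computed in SQL from the submit, run-start and run-exit timestamps; names again assume camelCase conversion):

```graphql
# Sketch only: succeeded jobs for tasks "foo" and "bar", with timings.
query {
  jobs(live: false, workflows: ["~user/workflow"], tasks: ["foo", "bar"]) {
    id
    platform
    totalTime
    queueTime
    runTime
  }
}
```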