From cb438288212c8e3d93c3405e86c1b9d804ca1f88 Mon Sep 17 00:00:00 2001
From: JAllen42
Date: Tue, 25 Jul 2023 11:49:20 +0100
Subject: [PATCH 1/2] Add UISJob to schema

---
 cylc/uiserver/schema.py | 126 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 125 insertions(+), 1 deletion(-)

diff --git a/cylc/uiserver/schema.py b/cylc/uiserver/schema.py
index 3deb6237..f965c133 100644
--- a/cylc/uiserver/schema.py
+++ b/cylc/uiserver/schema.py
@@ -32,6 +32,7 @@
     ID,
     SortArgs,
     Task,
+    Job,
     Mutations,
     Queries,
     process_resolver_info,
@@ -281,6 +282,53 @@ class Meta:
     result = GenericScalar()
 
 
+async def get_tasks(root, info, **kwargs):
+    if kwargs['live']:
+        return await get_nodes_all(root, info, **kwargs)
+
+    _, field_ids = process_resolver_info(root, info, kwargs)
+
+    if hasattr(kwargs, 'id'):
+        kwargs['ids'] = [kwargs.get('id')]
+    if field_ids:
+        if isinstance(field_ids, str):
+            field_ids = [field_ids]
+        elif isinstance(field_ids, dict):
+            field_ids = list(field_ids)
+        kwargs['ids'] = field_ids
+    elif field_ids == []:
+        return []
+
+    for arg in ('ids', 'exids'):
+        # live objects can be represented by a universal ID
+        kwargs[arg] = [Tokens(n_id, relative=True) for n_id in kwargs[arg]]
+    kwargs['workflows'] = [
+        Tokens(w_id) for w_id in kwargs['workflows']]
+    kwargs['exworkflows'] = [
+        Tokens(w_id) for w_id in kwargs['exworkflows']]
+
+    return await list_tasks(kwargs)
+
+
+async def list_tasks(args):
+    if not args['workflows']:
+        raise Exception('At least one workflow must be provided.')
+    from cylc.flow.rundb import CylcWorkflowDAO
+    from cylc.flow.pathutil import get_workflow_run_dir
+    from cylc.flow.workflow_files import WorkflowFiles
+    tasks = []
+    for workflow in args['workflows']:
+        db_file = get_workflow_run_dir(
+            workflow['workflow'],
+            WorkflowFiles.LogDir.DIRNAME,
+            "db"
+        )
+        with CylcWorkflowDAO(db_file, is_public=True) as dao:
+            conn = dao.connect()
+            tasks.extend(make_query(conn, workflow))
+    return tasks
+
+
 async def get_jobs(root, info, **kwargs):
     if kwargs['live']:
         return await get_nodes_all(root, info, **kwargs)
@@ -324,7 +372,7 @@ async def list_jobs(args):
         )
         with CylcWorkflowDAO(db_file, is_public=True) as dao:
             conn = dao.connect()
-            jobs.extend(make_query(conn, workflow))
+            jobs.extend(make_jobs_query(conn, workflow, args.get('task_name')))
     return jobs
 
 
@@ -451,6 +499,58 @@ def make_query(conn, workflow):
     return tasks
 
 
+def make_jobs_query(conn, workflow, task):
+
+    # TODO: support all arguments including states
+    # https://github.com/cylc/cylc-uiserver/issues/440
+    jobs = []
+    # Make this more secure
+    if task:
+        snippet = f" AND name = '{task}'"
+    else:
+        snippet = ''
+    for row in conn.execute(f'''
+SELECT
+    name,
+    cycle,
+    submit_num,
+    submit_status,
+    time_run,
+    time_run_exit,
+    job_id,
+    platform_name,
+    time_submit,
+    STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit) AS total_time,
+    STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run) AS run_time,
+    STRFTIME('%s', time_run) - STRFTIME('%s', time_submit) AS queue_time
+FROM
+    task_jobs
+WHERE
+    run_status = 0{snippet};
+'''):
+        jobs.append({
+            'id': workflow.duplicate(
+                cycle=row[1],
+                task=row[0],
+                job=row[2]
+            ),
+            'name': row[0],
+            'cycle_point': row[1],
+            'submit_num': row[2],
+            'state': row[3],
+            'started_time': row[4],
+            'finished_time': row[5],
+            'job_ID': row[6],
+            'platform': row[7],
+            'submitted_time': row[8],
+            'total_time': row[9],
+            'run_time': row[10],
+            'queue_time': row[11]
+        })
+
+    return jobs
+
+
 class UISTask(Task):
 
     platform = graphene.String()
@@ -484,6 +584,13 @@ class UISTask(Task):
     count = graphene.Int()
 
 
+class UISJob(Job):
+
+    total_time = graphene.Int()
+    queue_time = graphene.Int()
+    run_time = graphene.Int()
+
+
 class UISQueries(Queries):
 
     class LogFiles(graphene.ObjectType):
@@ -511,6 +618,22 @@ class LogFiles(graphene.ObjectType):
         description=Task._meta.description,
         live=graphene.Boolean(default_value=True),
         strip_null=STRIP_NULL_DEFAULT,
+        resolver=get_tasks,
+        workflows=graphene.List(ID, default_value=[]),
+        exworkflows=graphene.List(ID, default_value=[]),
+        ids=graphene.List(ID, default_value=[]),
+        exids=graphene.List(ID, default_value=[]),
+        mindepth=graphene.Int(default_value=-1),
+        maxdepth=graphene.Int(default_value=-1),
+        sort=SortArgs(default_value=None),
+
+    )
+
+    jobs = graphene.List(
+        UISJob,
+        description=Job._meta.description,
+        live=graphene.Boolean(default_value=True),
+        strip_null=STRIP_NULL_DEFAULT,
         resolver=get_jobs,
         workflows=graphene.List(ID, default_value=[]),
         exworkflows=graphene.List(ID, default_value=[]),
@@ -519,6 +642,7 @@ class LogFiles(graphene.ObjectType):
         mindepth=graphene.Int(default_value=-1),
         maxdepth=graphene.Int(default_value=-1),
         sort=SortArgs(default_value=None),
+        task_name=graphene.String()
     )
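Note on the derived duration columns added above: STRFTIME('%s', t) converts a
timestamp to Unix seconds, so total_time, run_time and queue_time are plain
subtractions, with total_time = queue_time + run_time. A minimal standalone
sketch of the same arithmetic follows; the in-memory table and the timestamps
in it are invented for illustration, not taken from a real Cylc run database
(only the column names time_submit, time_run and time_run_exit come from the
query in the patch):

    import sqlite3

    # Invented example: one job submitted at 11:00:00, started at 11:00:30,
    # finished at 11:02:00.
    conn = sqlite3.connect(':memory:')
    conn.execute(
        'CREATE TABLE task_jobs (time_submit, time_run, time_run_exit)')
    conn.execute(
        "INSERT INTO task_jobs VALUES "
        "('2023-07-25 11:00:00', '2023-07-25 11:00:30', '2023-07-25 11:02:00')"
    )
    total, run, queue = conn.execute('''
        SELECT
            STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit),
            STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run),
            STRFTIME('%s', time_run) - STRFTIME('%s', time_submit)
        FROM task_jobs
    ''').fetchone()
    # 90s running plus 30s queued gives 120s in total.
    assert (total, run, queue) == (120, 90, 30)
    assert total == queue + run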
From b779063710647c3ef7b51cc44a3f72fbdc7be3c0 Mon Sep 17 00:00:00 2001
From: JAllen42
Date: Mon, 2 Oct 2023 16:43:38 +0100
Subject: [PATCH 2/2] Change jobs query to use list of tasks

---
 cylc/uiserver/schema.py | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/cylc/uiserver/schema.py b/cylc/uiserver/schema.py
index f965c133..a6c5ef11 100644
--- a/cylc/uiserver/schema.py
+++ b/cylc/uiserver/schema.py
@@ -372,7 +372,7 @@ async def list_jobs(args):
         )
         with CylcWorkflowDAO(db_file, is_public=True) as dao:
             conn = dao.connect()
-            jobs.extend(make_jobs_query(conn, workflow, args.get('task_name')))
+            jobs.extend(make_jobs_query(conn, workflow, args.get('tasks')))
     return jobs
 
 
@@ -499,16 +499,19 @@ def make_query(conn, workflow):
     return tasks
 
 
-def make_jobs_query(conn, workflow, task):
+def make_jobs_query(conn, workflow, tasks):
 
     # TODO: support all arguments including states
     # https://github.com/cylc/cylc-uiserver/issues/440
     jobs = []
-    # Make this more secure
-    if task:
-        snippet = f" AND name = '{task}'"
+
+    # Create sql snippet used to limit which tasks are returned by query
+    if tasks:
+        where_clauses = "' OR name = '".join(tasks)
+        where_clauses = f" AND (name = '{where_clauses}')"
     else:
-        snippet = ''
+        where_clauses = ''
+
     for row in conn.execute(f'''
 SELECT
     name,
@@ -526,7 +529,8 @@ def make_jobs_query(conn, workflow, tasks):
 FROM
     task_jobs
 WHERE
-    run_status = 0{snippet};
+    run_status = 0
+    {where_clauses};
 '''):
         jobs.append({
             'id': workflow.duplicate(
@@ -642,7 +646,7 @@ class LogFiles(graphene.ObjectType):
         mindepth=graphene.Int(default_value=-1),
         maxdepth=graphene.Int(default_value=-1),
         sort=SortArgs(default_value=None),
-        task_name=graphene.String()
+        tasks=graphene.List(ID, default_value=[])
    )
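For reference, the join in PATCH 2/2 expands a list of task names into an OR
chain inside the WHERE clause. A quick standalone illustration of the snippet
it builds (the task names 'foo' and 'bar' are invented):

    tasks = ['foo', 'bar']
    where_clauses = "' OR name = '".join(tasks)
    where_clauses = f" AND (name = '{where_clauses}')"
    assert where_clauses == " AND (name = 'foo' OR name = 'bar')"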
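Both patches still interpolate task names directly into the SQL text, which
the "# Make this more secure" comment in PATCH 1/2 flags. A possible
hardening, sketched here on the assumption that the DAO connection behaves
like a standard sqlite3 connection supporting '?' placeholders;
make_jobs_where is a hypothetical helper, not part of these patches:

    def make_jobs_where(tasks):
        """Return (sql_snippet, params) for an optional task-name filter."""
        if not tasks:
            return '', []
        # One '?' placeholder per task name; the values are bound by the
        # driver, so quoting inside a task name cannot alter the SQL.
        placeholders = ', '.join('?' * len(tasks))
        return f' AND name IN ({placeholders})', list(tasks)

    # Inside make_jobs_query this would be used roughly as:
    #     snippet, params = make_jobs_where(tasks)
    #     conn.execute(f'SELECT ... WHERE run_status = 0{snippet};', params)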