Allowing querying of historic job data #511

Status: Closed · wants to merge 2 commits
130 changes: 129 additions & 1 deletion cylc/uiserver/schema.py
@@ -32,6 +32,7 @@
ID,
SortArgs,
Task,
Job,
Mutations,
Queries,
process_resolver_info,
@@ -281,6 +282,53 @@ class Meta:
result = GenericScalar()


async def get_tasks(root, info, **kwargs):
if kwargs['live']:
return await get_nodes_all(root, info, **kwargs)

_, field_ids = process_resolver_info(root, info, kwargs)

    if 'id' in kwargs:  # membership test; hasattr() on a dict is always False here
kwargs['ids'] = [kwargs.get('id')]
if field_ids:
if isinstance(field_ids, str):
field_ids = [field_ids]
elif isinstance(field_ids, dict):
field_ids = list(field_ids)
kwargs['ids'] = field_ids
elif field_ids == []:
return []

for arg in ('ids', 'exids'):
# live objects can be represented by a universal ID
kwargs[arg] = [Tokens(n_id, relative=True) for n_id in kwargs[arg]]
kwargs['workflows'] = [
Tokens(w_id) for w_id in kwargs['workflows']]
kwargs['exworkflows'] = [
Tokens(w_id) for w_id in kwargs['exworkflows']]

return await list_tasks(kwargs)
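
The non-live branch above converts every ID into cylc-flow Tokens before querying. A minimal sketch of that parsing, with behaviour inferred from its usage in this diff (dict-style access, relative=True for cycle/task IDs) and placeholder names:

    from cylc.flow.id import Tokens

    workflow = Tokens('~me/my_workflow')            # full workflow ID
    task = Tokens('1234/my_task', relative=True)    # relative cycle/task ID

    workflow['workflow']         # 'my_workflow'
    task['cycle'], task['task']  # ('1234', 'my_task')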


async def list_tasks(args):
if not args['workflows']:
raise Exception('At least one workflow must be provided.')
from cylc.flow.rundb import CylcWorkflowDAO
from cylc.flow.pathutil import get_workflow_run_dir
from cylc.flow.workflow_files import WorkflowFiles
Review comment (Member): Do you intend to put these at the top? They would be imported every time the query/field is requested otherwise.

tasks = []
for workflow in args['workflows']:
db_file = get_workflow_run_dir(
workflow['workflow'],
WorkflowFiles.LogDir.DIRNAME,
"db"
)
with CylcWorkflowDAO(db_file, is_public=True) as dao:
conn = dao.connect()
tasks.extend(make_query(conn, workflow))
return tasks
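
For reference, the arguments passed to get_workflow_run_dir above resolve to the workflow's public database; a sketch assuming the default cylc-run location and a placeholder workflow name:

    from cylc.flow.pathutil import get_workflow_run_dir
    from cylc.flow.workflow_files import WorkflowFiles

    # e.g. /home/me/cylc-run/my_workflow/log/db
    db_file = get_workflow_run_dir(
        'my_workflow', WorkflowFiles.LogDir.DIRNAME, 'db')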


async def get_jobs(root, info, **kwargs):
if kwargs['live']:
return await get_nodes_all(root, info, **kwargs)
@@ -324,7 +372,7 @@ async def list_jobs(args):
)
with CylcWorkflowDAO(db_file, is_public=True) as dao:
conn = dao.connect()
-        jobs.extend(make_query(conn, workflow))
+        jobs.extend(make_jobs_query(conn, workflow, args.get('tasks')))
return jobs


@@ -451,6 +499,62 @@ def make_query(conn, workflow):
return tasks


def make_jobs_query(conn, workflow, tasks):

# TODO: support all arguments including states
# https://github.com/cylc/cylc-uiserver/issues/440
jobs = []

# Create sql snippet used to limit which tasks are returned by query
if tasks:
where_clauses = "' OR name = '".join(tasks)
where_clauses = f" AND (name = '{where_clauses}')"
else:
where_clauses = ''

for row in conn.execute(f'''
SELECT
name,
cycle,
submit_num,
submit_status,
time_run,
time_run_exit,
job_id,
platform_name,
time_submit,
STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_submit) AS total_time,
STRFTIME('%s', time_run_exit) - STRFTIME('%s', time_run) AS run_time,
STRFTIME('%s', time_run) - STRFTIME('%s', time_submit) AS queue_time
FROM
task_jobs
WHERE
run_status = 0
{where_clauses};
'''):
jobs.append({
'id': workflow.duplicate(
cycle=row[1],
task=row[0],
job=row[2]
),
'name': row[0],
'cycle_point': row[1],
'submit_num': row[2],
'state': row[3],
'started_time': row[4],
'finished_time': row[5],
'job_ID': row[6],
'platform': row[7],
'submitted_time': row[8],
'total_time': row[9],
'run_time': row[10],
'queue_time': row[11]
})

return jobs
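
The task filter above interpolates task names directly into the SQL string, which breaks on names containing quotes. A hypothetical parameterized variant (assuming conn is a standard sqlite3 connection, as CylcWorkflowDAO.connect() appears to return) would avoid that:

    def make_task_filter(tasks):
        """Return (sql_snippet, params) restricting the query to given task names."""
        if not tasks:
            return '', []
        placeholders = ', '.join('?' for _ in tasks)
        return f' AND name IN ({placeholders})', list(tasks)

    # usage inside make_jobs_query:
    #   clause, params = make_task_filter(tasks)
    #   for row in conn.execute(f'SELECT ... WHERE run_status = 0{clause};', params):
    #       ...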


class UISTask(Task):

platform = graphene.String()
@@ -484,6 +588,13 @@ class UISTask(Task):
count = graphene.Int()


class UISJob(Job):

total_time = graphene.Int()
queue_time = graphene.Int()
run_time = graphene.Int()
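
The three UISJob durations mirror the STRFTIME('%s', ...) subtractions in make_jobs_query: epoch-second differences between the stored timestamps. An equivalent computation in Python, assuming the run database stores ISO 8601 UTC timestamps:

    from datetime import datetime

    def durations(time_submit, time_run, time_run_exit):
        fmt = '%Y-%m-%dT%H:%M:%SZ'  # assumed timestamp format
        submit, run, exit_ = (
            datetime.strptime(t, fmt)
            for t in (time_submit, time_run, time_run_exit))
        return {
            'total_time': int((exit_ - submit).total_seconds()),
            'run_time': int((exit_ - run).total_seconds()),
            'queue_time': int((run - submit).total_seconds()),
        }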


class UISQueries(Queries):

class LogFiles(graphene.ObjectType):
@@ -511,6 +622,22 @@ class LogFiles(graphene.ObjectType):
description=Task._meta.description,
live=graphene.Boolean(default_value=True),
strip_null=STRIP_NULL_DEFAULT,
resolver=get_tasks,
workflows=graphene.List(ID, default_value=[]),
exworkflows=graphene.List(ID, default_value=[]),
ids=graphene.List(ID, default_value=[]),
exids=graphene.List(ID, default_value=[]),
mindepth=graphene.Int(default_value=-1),
maxdepth=graphene.Int(default_value=-1),
        sort=SortArgs(default_value=None),
    )

jobs = graphene.List(
UISJob,
description=Job._meta.description,
live=graphene.Boolean(default_value=True),
strip_null=STRIP_NULL_DEFAULT,
resolver=get_jobs,
workflows=graphene.List(ID, default_value=[]),
exworkflows=graphene.List(ID, default_value=[]),
@@ -519,6 +646,7 @@ class LogFiles(graphene.ObjectType):
mindepth=graphene.Int(default_value=-1),
maxdepth=graphene.Int(default_value=-1),
sort=SortArgs(default_value=None),
tasks=graphene.List(ID, default_value=[])
)
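
With the schema additions in place, a client could exercise the new historic path roughly like this (field names assume graphene's default camel-casing; the workflow and task names are placeholders):

    QUERY = '''
    query {
      jobs(live: false, workflows: ["my_workflow"], tasks: ["my_task"]) {
        name
        cyclePoint
        state
        totalTime
        runTime
        queueTime
      }
    }
    '''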

