Export job start/end times (#3) #4

Open · wants to merge 7 commits into base: dev
2 changes: 1 addition & 1 deletion docker-compose/project/enqueue.py
@@ -21,7 +21,7 @@ def main(custom_classes=False):

while True:
# Choose a random job
- job = random.choice((jobs.long_running_job, jobs.process_data))
+ job = random.choice((jobs.long_running_job, jobs.process_data, jobs.short_running_job))

# Choose a random queue
queue = random.choice(queues_list)
8 changes: 8 additions & 0 deletions docker-compose/project/jobs.py
@@ -14,6 +14,14 @@ def long_running_job(s=10):
return s


def short_running_job(s=10):
s = s/10
print(f'short_running_job: sleeping for {s} seconds')
time.sleep(s)

return s


def process_data(s=10):
print(f'process_data: sleeping for {s} seconds')
time.sleep(s)
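
For reference, a minimal hypothetical snippet (not part of this PR) for enqueuing the new job so that, once a worker finishes it, it lands in the queue's finished_job_registry and is picked up by the exporter. The queue name and import path are assumptions based on the project layout above.

```python
# Hypothetical usage sketch: assumes the docker-compose/project modules are
# importable and a worker is listening on the "default" queue.
from redis import Redis
from rq import Queue

import jobs  # the jobs.py module shown above

q = Queue('default', connection=Redis())
q.enqueue(jobs.short_running_job, 5)  # sleeps 5 / 10 = 0.5 seconds on the worker
```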
184 changes: 184 additions & 0 deletions grafana/rq-dashboard.json
@@ -1735,6 +1735,190 @@
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$DS_PROMETHEUS",
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 46
},
"hiddenSeries": false,
"id": 31,
"legend": {
"alignAsTable": true,
"avg": true,
"current": true,
"hideZero": true,
"max": true,
"min": true,
"rightSide": true,
"show": true,
"sort": "current",
"sortDesc": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 2,
"nullPointMode": "null as zero",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "avg(rq_timings_sum{job=\"$job\", instance=\"$instance\"}) by (func_name)",
"interval": "",
"legendFormat": "{{func_name}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Average completion time by job",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$DS_PROMETHEUS",
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 46
},
"hiddenSeries": false,
"id": 32,
"legend": {
"alignAsTable": true,
"avg": true,
"current": true,
"hideZero": true,
"max": true,
"min": true,
"rightSide": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 2,
"nullPointMode": "null as zero",
"options": {
"dataLinks": []
},
"percentage": false,
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "avg(rq_timings_sum{job=\"$job\", instance=\"$instance\"}) by (queue)",
"interval": "",
"legendFormat": "{{queue}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Average completion time by queue",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"refresh": "",
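
The two new panels plot `avg(rq_timings_sum{job="$job", instance="$instance"})` grouped by `func_name` and by `queue`. As a rough cross-check outside Grafana, here is a hedged Python sketch that reads the exporter's /metrics output directly and averages the rq_timings sums per func_name; the port (9726) and endpoint are assumptions, adjust them to your deployment.

```python
# Hypothetical sketch: average rq_timings_sum per func_name straight from the
# exporter's text exposition, mirroring the "by job" panel above.
from collections import defaultdict
from urllib.request import urlopen

from prometheus_client.parser import text_string_to_metric_families

text = urlopen('http://localhost:9726/metrics').read().decode()

sums = defaultdict(list)
for family in text_string_to_metric_families(text):
    if family.name != 'rq_timings':
        continue
    for sample in family.samples:
        if sample.name == 'rq_timings_sum':
            sums[sample.labels['func_name']].append(sample.value)

for func_name, values in sums.items():
    print(f'{func_name}: {sum(values) / len(values):.2f}s average')
```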
14 changes: 12 additions & 2 deletions rq_exporter/collector.py
@@ -7,9 +7,9 @@

from rq import Connection
from prometheus_client import Summary
- from prometheus_client.core import GaugeMetricFamily
+ from prometheus_client.core import GaugeMetricFamily, SummaryMetricFamily

- from .utils import get_workers_stats, get_jobs_by_queue
+ from .utils import get_workers_stats, get_jobs_by_queue, get_finished_registries_by_queue


logger = logging.getLogger(__name__)
@@ -51,6 +51,8 @@ def collect(self):
with Connection(self.connection):
rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues'])
rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by state', labels=['queue', 'status'])
# This measures runtime as job.ended_at - job.started_at, not from enqueued_at
rq_timings = SummaryMetricFamily('rq_timings', 'Sampled runtime of RQ jobs', labels=['queue', 'func_name'])

for worker in get_workers_stats(self.worker_class):
rq_workers.add_metric([worker['name'], worker['state'], ','.join(worker['queues'])], 1)
@@ -63,4 +63,12 @@ def collect(self):

yield rq_jobs

for (queue_name, registries) in get_finished_registries_by_queue(
self.connection, self.queue_class).items():
for (_, timings) in registries.items():
# Each sample is added individually with count_value=1; samples with timestamps too far in the past will be dropped by Prometheus
rq_timings.add_metric([queue_name, timings['func_name']], 1, timings['runtime'], timings['ended_at'])

yield rq_timings

logger.debug('RQ metrics collection finished')
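
For anyone unfamiliar with prometheus_client's custom-collector API, a small self-contained sketch of how a SummaryMetricFamily sample with an explicit timestamp gets exposed. The label values and runtime below are placeholders; the real collector above derives them from RQ's finished registries.

```python
# Standalone sketch of the SummaryMetricFamily mechanics used above; the queue,
# function name and runtime are placeholder values, not real job data.
import time

from prometheus_client import CollectorRegistry, generate_latest
from prometheus_client.core import SummaryMetricFamily


class TimingsSketchCollector:
    def collect(self):
        rq_timings = SummaryMetricFamily(
            'rq_timings', 'Sampled runtime of RQ jobs', labels=['queue', 'func_name'])
        # One finished job becomes one sample: count_value=1, sum_value=runtime
        rq_timings.add_metric(['default', 'jobs.short_running_job'],
                              1, 0.5, timestamp=time.time())
        yield rq_timings


registry = CollectorRegistry()
registry.register(TimingsSketchCollector())
# Prints rq_timings_count and rq_timings_sum lines in the text exposition format
print(generate_latest(registry).decode())
```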
67 changes: 66 additions & 1 deletion rq_exporter/utils.py
@@ -5,7 +5,7 @@

from redis import Redis
from rq import Queue, Worker
- from rq.job import JobStatus
+ from rq.job import JobStatus, Job


def get_redis_connection(host='localhost', port='6379', db='0',
@@ -69,6 +69,48 @@ def get_workers_stats(worker_class=None):
]


def get_job_timings(job):
"""Get stats for the RQ job.

Args:
job (rq.job.Job): an instance of an RQ job

Returns:
dict: Dictionary of basic runtime stats for the job

"""
# runtime is -1 only when ended_at is not set, i.e. the job did not come from a finished_job_registry
return {
'func_name': job.func_name,
'started_at': job.started_at.timestamp(),
'ended_at': job.ended_at.timestamp(),
'runtime': (job.ended_at - job.started_at).total_seconds() if job.ended_at else -1
}


def get_registry_timings(connection, job_registry, limit=3):
"""Get the timings for jobs in a Registry.

Args:
connection (redis.Redis): Redis connection used to fetch the jobs
job_registry (rq.registry.BaseRegistry): The RQ registry instance
limit (int): The max number of jobs to retrieve

Returns:
dict: Dictionary of job id to a dict of the job's stats

Raises:
redis.exceptions.RedisError: On Redis connection errors

"""
# RQ scores registry entries by (current time + ttl), so the tail of the sorted set holds the most recently finished jobs
job_ids = job_registry.get_job_ids(start=limit*-1, end=-1)
jobs = Job.fetch_many(job_ids, connection=connection)

return {
job.id: get_job_timings(job) for job in jobs
}


def get_queue_jobs(queue_name, queue_class=None):
"""Get the jobs by status of a Queue.

@@ -117,3 +117,26 @@ def get_jobs_by_queue(queue_class=None):
return {
q.name: get_queue_jobs(q.name, queue_class) for q in queues
}


def get_finished_registries_by_queue(connection, queue_class=None):
"""Get finished registries by queue.

Args:
connection (redis.Redis): Redis connection used to fetch the jobs
queue_class (type): RQ Queue class

Returns:
dict: Dictionary of queues with nested runtime stats

Raises:
redis.exceptions.RedisError: On Redis connection errors

"""
queue_class = queue_class if queue_class is not None else Queue

queues = queue_class.all()

return {
q.name: get_registry_timings(connection, q.finished_job_registry) for q in queues
}
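
A quick, hypothetical way to exercise the new helpers end to end; the connection details and queue contents are assumptions, and this script is not part of the PR.

```python
# Hypothetical manual check of the new utils; assumes a local Redis with some
# finished RQ jobs in its queues' finished_job_registry.
from redis import Redis
from rq import Connection

from rq_exporter.utils import get_finished_registries_by_queue

redis = Redis(host='localhost', port=6379, db=0)

with Connection(redis):  # Queue.all() resolves queues through this connection
    for queue_name, timings_by_job in get_finished_registries_by_queue(redis).items():
        for job_id, t in timings_by_job.items():
            print(f"{queue_name}/{t['func_name']} ({job_id}): "
                  f"{t['runtime']:.2f}s, ended at {t['ended_at']:.0f}")
```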