diff --git a/docker-compose/project/enqueue.py b/docker-compose/project/enqueue.py index 29ab62d..9c10b1e 100644 --- a/docker-compose/project/enqueue.py +++ b/docker-compose/project/enqueue.py @@ -21,7 +21,7 @@ def main(custom_classes=False): while True: # Choose a random job - job = random.choice((jobs.long_running_job, jobs.process_data)) + job = random.choice((jobs.long_running_job, jobs.process_data, jobs.short_running_job)) # Choose a random queue queue = random.choice(queues_list) diff --git a/docker-compose/project/jobs.py b/docker-compose/project/jobs.py index 2a6ddad..2dd981b 100644 --- a/docker-compose/project/jobs.py +++ b/docker-compose/project/jobs.py @@ -14,6 +14,14 @@ def long_running_job(s=10): return s +def short_running_job(s=10): + s = s/10 + print(f'short_running_job: sleeping for {s} seconds') + time.sleep(s) + + return s + + def process_data(s=10): print(f'process_data: sleeping for {s} seconds') time.sleep(s) diff --git a/grafana/rq-dashboard.json b/grafana/rq-dashboard.json index f7db905..da9c698 100644 --- a/grafana/rq-dashboard.json +++ b/grafana/rq-dashboard.json @@ -1735,6 +1735,190 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$DS_PROMETHEUS", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 46 + }, + "hiddenSeries": false, + "id": 31, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideZero": true, + "max": true, + "min": true, + "rightSide": true, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "avg(rq_timings_sum{job=\"$job\", instance=\"$instance\"}) by (func_name)", + "interval": "", + "legendFormat": "{{func_name}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Average completion time by job", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$DS_PROMETHEUS", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 46 + }, + "hiddenSeries": false, + "id": 32, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideZero": true, + "max": true, + "min": true, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "avg(rq_timings_sum{job=\"$job\", instance=\"$instance\"}) 
by (queue)", + "interval": "", + "legendFormat": "{{queue}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Average completion time by queue", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "refresh": "", diff --git a/rq_exporter/collector.py b/rq_exporter/collector.py index ff8dfa8..1b92c77 100644 --- a/rq_exporter/collector.py +++ b/rq_exporter/collector.py @@ -7,9 +7,9 @@ from rq import Connection from prometheus_client import Summary -from prometheus_client.core import GaugeMetricFamily +from prometheus_client.core import GaugeMetricFamily, SummaryMetricFamily -from .utils import get_workers_stats, get_jobs_by_queue +from .utils import get_workers_stats, get_jobs_by_queue, get_finished_registries_by_queue logger = logging.getLogger(__name__) @@ -51,6 +51,8 @@ def collect(self): with Connection(self.connection): rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues']) rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by state', labels=['queue', 'status']) + # This measures runtime as job.ended_at - job.started_at, not from enqueued_at + rq_timings = SummaryMetricFamily('rq_timings', 'Sampled runtime of RQ jobs', labels=['queue', 'func_name']) for worker in get_workers_stats(self.worker_class): rq_workers.add_metric([worker['name'], worker['state'], ','.join(worker['queues'])], 1) @@ -63,4 +65,12 @@ def collect(self): yield rq_jobs + for (queue_name, registries) in get_finished_registries_by_queue( + self.connection, self.queue_class).items(): + for (_, timings) in registries.items(): + # Samples are added individually with count_value as 1, timestamps past Prometheus will be dropped + rq_timings.add_metric([queue_name, timings['func_name']], 1, timings['runtime'], timings['ended_at']) + + yield rq_timings + logger.debug('RQ metrics collection finished') diff --git a/rq_exporter/utils.py b/rq_exporter/utils.py index 7f5ab34..73e23e0 100644 --- a/rq_exporter/utils.py +++ b/rq_exporter/utils.py @@ -5,7 +5,7 @@ from redis import Redis from rq import Queue, Worker -from rq.job import JobStatus +from rq.job import JobStatus, Job def get_redis_connection(host='localhost', port='6379', db='0', @@ -69,6 +69,48 @@ def get_workers_stats(worker_class=None): ] +def get_job_timings(job): + """Get stats for the RQ job. + + Args: + job (rq.job.Job): an instance of a RQ job + + Returns: + dict: Dictionary of basic runtime stats for the job + + """ + # Runtime should never be -1 unless the job is not from a finished_job_registry + return { + 'func_name': job.func_name, + 'started_at': job.started_at.timestamp(), + 'ended_at': job.ended_at.timestamp(), + 'runtime': (job.ended_at - job.started_at).total_seconds() if job.ended_at else -1 + } + + +def get_registry_timings(connection, job_registry, limit=3): + """Get the timings for jobs in a Registry. 
+
+    Args:
+        connection (redis.Redis): Redis connection used to fetch the job data
+        job_registry (rq.registry.BaseRegistry): The RQ registry instance
+        limit (int): The max number of jobs to retrieve
+
+    Returns:
+        dict: Dictionary of job id to a dict of the job's timing stats
+
+    Raises:
+        redis.exceptions.RedisError: On Redis connection errors
+
+    """
+    # Registry entries are scored by expiry time (current time + result TTL), so the last `limit` ids are the most recently completed jobs
+    job_ids = job_registry.get_job_ids(start=-limit, end=-1)
+    jobs = Job.fetch_many(job_ids, connection=connection)
+
+    return {
+        job.id: get_job_timings(job) for job in jobs
+    }
+
+
 def get_queue_jobs(queue_name, queue_class=None):
     """Get the jobs by status of a Queue.

@@ -117,3 +160,26 @@ def get_jobs_by_queue(queue_class=None):
     return {
         q.name: get_queue_jobs(q.name, queue_class) for q in queues
     }
+
+
+def get_finished_registries_by_queue(connection, queue_class=None):
+    """Get the timings of recently finished jobs for each queue.
+
+    Args:
+        connection (redis.Redis): Redis connection used to fetch the jobs
+        queue_class (type): RQ Queue class
+
+    Returns:
+        dict: Dictionary of queue names to the timing stats of their recently finished jobs
+
+    Raises:
+        redis.exceptions.RedisError: On Redis connection errors
+
+    """
+    queue_class = queue_class if queue_class is not None else Queue
+
+    queues = queue_class.all()
+
+    return {
+        q.name: get_registry_timings(connection, q.finished_job_registry) for q in queues
+    }
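Note: the snippet below is an illustrative sketch, not part of the diff. It shows how the output of the new get_finished_registries_by_queue helper maps onto the rq_timings summary samples emitted by the collector. It assumes a Redis instance on localhost:6379 with at least one finished RQ job, and that the rq_exporter package is importable.

    from redis import Redis
    from rq import Connection

    from rq_exporter.utils import get_finished_registries_by_queue

    # Assumes a local Redis with at least one finished RQ job (hypothetical setup)
    connection = Redis(host='localhost', port=6379)

    with Connection(connection):
        for queue_name, jobs in get_finished_registries_by_queue(connection).items():
            for job_id, timings in jobs.items():
                # Each entry becomes one rq_timings sample: labels (queue, func_name),
                # a count of 1 and a sum equal to the job's runtime in seconds
                print(queue_name, job_id, timings['func_name'], timings['runtime'])

Only the most recent jobs per queue are reported (the limit argument of get_registry_timings, 3 by default), so the dashboard's "Average completion time" panels reflect recently finished jobs rather than the full history.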