From 30dfd08e16fe536094905ca9d9837c32395c5a54 Mon Sep 17 00:00:00 2001 From: Kevin Le Date: Thu, 10 Sep 2020 16:08:34 -0600 Subject: [PATCH 1/7] Initial draft of timings gathering --- rq_exporter/collector.py | 12 ++++++-- rq_exporter/utils.py | 63 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 72 insertions(+), 3 deletions(-) diff --git a/rq_exporter/collector.py b/rq_exporter/collector.py index ff8dfa8..ac12303 100644 --- a/rq_exporter/collector.py +++ b/rq_exporter/collector.py @@ -7,9 +7,9 @@ from rq import Connection from prometheus_client import Summary -from prometheus_client.core import GaugeMetricFamily +from prometheus_client.core import GaugeMetricFamily, SummaryMetricFamily -from .utils import get_workers_stats, get_jobs_by_queue +from .utils import get_workers_stats, get_jobs_by_queue, get_finished_registries_by_queue logger = logging.getLogger(__name__) @@ -51,6 +51,7 @@ def collect(self): with Connection(self.connection): rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues']) rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by state', labels=['queue', 'status']) + rq_timings = SummaryMetricFamily('rq_timings', 'RQ jobs by state', labels=['queue', 'func_name', 'job_id']) for worker in get_workers_stats(self.worker_class): rq_workers.add_metric([worker['name'], worker['state'], ','.join(worker['queues'])], 1) @@ -63,4 +64,11 @@ def collect(self): yield rq_jobs + for (queue_name, registries) in get_finished_registries_by_queue( + self.connection, self.queue_class).items(): + for (_, timings) in registries.items(): + rq_timings.add_metric([queue_name, timings['func_name']], 1, timings['runtime'], timings['ended_at']) + + yield rq_timings + logger.debug('RQ metrics collection finished') diff --git a/rq_exporter/utils.py b/rq_exporter/utils.py index 7f5ab34..9b3b7ed 100644 --- a/rq_exporter/utils.py +++ b/rq_exporter/utils.py @@ -5,7 +5,7 @@ from redis import Redis from rq import Queue, Worker -from rq.job import JobStatus +from rq.job import JobStatus, Job def get_redis_connection(host='localhost', port='6379', db='0', @@ -69,6 +69,44 @@ def get_workers_stats(worker_class=None): ] +def get_job_timings(job): + """ + + Returns: + dict: + + """ + # Runtime should never be -1 unless the job is not from a finished_job_registry + return { + 'func_name': job.func_name, + 'started_at': job.started_at.timestamp(), + 'ended_at': job.ended_at.timestamp(), + 'runtime': (job.ended_at - job.started_at).total_seconds() if job.ended_at else -1 + } + + +def get_registry_timings(connection, job_registry, limit=-1): + """Get the timings for jobs in a Registry. + + Args: + job_registry (rq.BaseRegistry): The RQ Registry instance + limit (int): The max number of jobs to retrieve + + Returns: + dict: + + Raises: + redis.exceptions.RedisError: On Redis connection errors + + """ + job_ids = job_registry.get_job_ids(end=limit) + jobs = Job.fetch_many(job_ids, connection=connection) + + return { + job.id: get_job_timings(job) for job in jobs + } + + def get_queue_jobs(queue_name, queue_class=None): """Get the jobs by status of a Queue. @@ -117,3 +155,26 @@ def get_jobs_by_queue(queue_class=None): return { q.name: get_queue_jobs(q.name, queue_class) for q in queues } + + +def get_finished_registries_by_queue(connection, queue_class=None): + """Get finished registries by queue. + + Args: + connection : this is required to fetch jobs + queue_class (type): RQ Queue class + + Returns: + dict: Dictionary of job count by status for each queue + + Raises: + redis.exceptions.RedisError: On Redis connection errors + + """ + queue_class = queue_class if queue_class is not None else Queue + + queues = queue_class.all() + + return { + q.name: get_registry_timings(connection, q.finished_job_registry) for q in queues + } \ No newline at end of file From 7ed6d3eedb8927bdd3fd70a722d9d0ff8373d867 Mon Sep 17 00:00:00 2001 From: Kevin Le Date: Mon, 14 Sep 2020 10:07:45 -0600 Subject: [PATCH 2/7] Add a short_running_job to enqueue --- docker-compose/project/enqueue.py | 2 +- docker-compose/project/jobs.py | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/docker-compose/project/enqueue.py b/docker-compose/project/enqueue.py index 29ab62d..9c10b1e 100644 --- a/docker-compose/project/enqueue.py +++ b/docker-compose/project/enqueue.py @@ -21,7 +21,7 @@ def main(custom_classes=False): while True: # Choose a random job - job = random.choice((jobs.long_running_job, jobs.process_data)) + job = random.choice((jobs.long_running_job, jobs.process_data, jobs.short_running_job)) # Choose a random queue queue = random.choice(queues_list) diff --git a/docker-compose/project/jobs.py b/docker-compose/project/jobs.py index 2a6ddad..edbc3d6 100644 --- a/docker-compose/project/jobs.py +++ b/docker-compose/project/jobs.py @@ -14,6 +14,14 @@ def long_running_job(s=10): return s +def short_running_job(s=10): + s = s/10 + print(f'long_running_job: sleeping for {s} seconds') + time.sleep(s) + + return s + + def process_data(s=10): print(f'process_data: sleeping for {s} seconds') time.sleep(s) From 0de7c8286a6f942774bbfa26e6db411121b8a294 Mon Sep 17 00:00:00 2001 From: Kevin Le Date: Mon, 14 Sep 2020 10:31:58 -0600 Subject: [PATCH 3/7] Add 2 panels on runtime to grafana --- grafana/rq-dashboard.json | 184 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 184 insertions(+) diff --git a/grafana/rq-dashboard.json b/grafana/rq-dashboard.json index f7db905..da9c698 100644 --- a/grafana/rq-dashboard.json +++ b/grafana/rq-dashboard.json @@ -1735,6 +1735,190 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$DS_PROMETHEUS", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 46 + }, + "hiddenSeries": false, + "id": 31, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideZero": true, + "max": true, + "min": true, + "rightSide": true, + "show": true, + "sort": "current", + "sortDesc": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "avg(rq_timings_sum{job=\"$job\", instance=\"$instance\"}) by (func_name)", + "interval": "", + "legendFormat": "{{func_name}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Average completion time by job", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$DS_PROMETHEUS", + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 46 + }, + "hiddenSeries": false, + "id": 32, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideZero": true, + "max": true, + "min": true, + "rightSide": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 2, + "nullPointMode": "null as zero", + "options": { + "dataLinks": [] + }, + "percentage": false, + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "avg(rq_timings_sum{job=\"$job\", instance=\"$instance\"}) by (queue)", + "interval": "", + "legendFormat": "{{queue}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Average completion time by queue", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "s", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "refresh": "", From b6bb82ad4271f21f8a58d5af380defb463b49251 Mon Sep 17 00:00:00 2001 From: Kevin Le Date: Mon, 14 Sep 2020 12:06:01 -0600 Subject: [PATCH 4/7] Change default timings limit to 3 --- rq_exporter/utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rq_exporter/utils.py b/rq_exporter/utils.py index 9b3b7ed..e23884d 100644 --- a/rq_exporter/utils.py +++ b/rq_exporter/utils.py @@ -85,7 +85,7 @@ def get_job_timings(job): } -def get_registry_timings(connection, job_registry, limit=-1): +def get_registry_timings(connection, job_registry, limit=3): """Get the timings for jobs in a Registry. Args: @@ -99,7 +99,7 @@ def get_registry_timings(connection, job_registry, limit=-1): redis.exceptions.RedisError: On Redis connection errors """ - job_ids = job_registry.get_job_ids(end=limit) + job_ids = job_registry.get_job_ids(start=limit*-1, end=-1) jobs = Job.fetch_many(job_ids, connection=connection) return { From 19b419194134363500394336b2c8c88ed875c574 Mon Sep 17 00:00:00 2001 From: Kevin Le Date: Mon, 14 Sep 2020 15:02:31 -0600 Subject: [PATCH 5/7] Comments to runtime code --- rq_exporter/collector.py | 6 ++++-- rq_exporter/utils.py | 14 +++++++++----- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/rq_exporter/collector.py b/rq_exporter/collector.py index ac12303..e65691d 100644 --- a/rq_exporter/collector.py +++ b/rq_exporter/collector.py @@ -51,7 +51,8 @@ def collect(self): with Connection(self.connection): rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues']) rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by state', labels=['queue', 'status']) - rq_timings = SummaryMetricFamily('rq_timings', 'RQ jobs by state', labels=['queue', 'func_name', 'job_id']) + # This measures runtime as job.ended_at - job.started_at, not from enqueued_at + rq_timings = SummaryMetricFamily('rq_timings', 'Sampled runtime of RQ jobs', labels=['queue', 'func_name', 'job_id']) for worker in get_workers_stats(self.worker_class): rq_workers.add_metric([worker['name'], worker['state'], ','.join(worker['queues'])], 1) @@ -65,8 +66,9 @@ def collect(self): yield rq_jobs for (queue_name, registries) in get_finished_registries_by_queue( - self.connection, self.queue_class).items(): + self.connection, self.queue_class).items(): for (_, timings) in registries.items(): + # Samples are added individually with count_value as 1, timestamps past Prometheus will be dropped rq_timings.add_metric([queue_name, timings['func_name']], 1, timings['runtime'], timings['ended_at']) yield rq_timings diff --git a/rq_exporter/utils.py b/rq_exporter/utils.py index e23884d..73e23e0 100644 --- a/rq_exporter/utils.py +++ b/rq_exporter/utils.py @@ -70,10 +70,13 @@ def get_workers_stats(worker_class=None): def get_job_timings(job): - """ + """Get stats for the RQ job. + + Args: + job (rq.job.Job): an instance of a RQ job Returns: - dict: + dict: Dictionary of basic runtime stats for the job """ # Runtime should never be -1 unless the job is not from a finished_job_registry @@ -93,12 +96,13 @@ def get_registry_timings(connection, job_registry, limit=3): limit (int): The max number of jobs to retrieve Returns: - dict: + dict: Dictionary of job id to a dict of the job's stats Raises: redis.exceptions.RedisError: On Redis connection errors """ + # Jobs are added in RQ with zscore of current time + ttl, fetch by last completed job_ids = job_registry.get_job_ids(start=limit*-1, end=-1) jobs = Job.fetch_many(job_ids, connection=connection) @@ -165,7 +169,7 @@ def get_finished_registries_by_queue(connection, queue_class=None): queue_class (type): RQ Queue class Returns: - dict: Dictionary of job count by status for each queue + dict: Dictionary of queues with nested runtime stats Raises: redis.exceptions.RedisError: On Redis connection errors @@ -177,4 +181,4 @@ def get_finished_registries_by_queue(connection, queue_class=None): return { q.name: get_registry_timings(connection, q.finished_job_registry) for q in queues - } \ No newline at end of file + } From ebeb049597f02f3f83306a4e2f5462264ed07eca Mon Sep 17 00:00:00 2001 From: Kevin Le Date: Mon, 14 Sep 2020 20:12:59 -0600 Subject: [PATCH 6/7] Update jobs.py --- docker-compose/project/jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose/project/jobs.py b/docker-compose/project/jobs.py index edbc3d6..2dd981b 100644 --- a/docker-compose/project/jobs.py +++ b/docker-compose/project/jobs.py @@ -16,7 +16,7 @@ def long_running_job(s=10): def short_running_job(s=10): s = s/10 - print(f'long_running_job: sleeping for {s} seconds') + print(f'short_running_job: sleeping for {s} seconds') time.sleep(s) return s From 540b4e4f3ee670eda5f9df72d2279f4ebec9ca2e Mon Sep 17 00:00:00 2001 From: Kevin Le Date: Mon, 14 Sep 2020 20:13:55 -0600 Subject: [PATCH 7/7] Update collector.py --- rq_exporter/collector.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq_exporter/collector.py b/rq_exporter/collector.py index e65691d..1b92c77 100644 --- a/rq_exporter/collector.py +++ b/rq_exporter/collector.py @@ -52,7 +52,7 @@ def collect(self): rq_workers = GaugeMetricFamily('rq_workers', 'RQ workers', labels=['name', 'state', 'queues']) rq_jobs = GaugeMetricFamily('rq_jobs', 'RQ jobs by state', labels=['queue', 'status']) # This measures runtime as job.ended_at - job.started_at, not from enqueued_at - rq_timings = SummaryMetricFamily('rq_timings', 'Sampled runtime of RQ jobs', labels=['queue', 'func_name', 'job_id']) + rq_timings = SummaryMetricFamily('rq_timings', 'Sampled runtime of RQ jobs', labels=['queue', 'func_name']) for worker in get_workers_stats(self.worker_class): rq_workers.add_metric([worker['name'], worker['state'], ','.join(worker['queues'])], 1)