Skip to content

Commit

Permalink
Remove Prometheus registry setup (#1495)
Browse files Browse the repository at this point in the history
* remove registry from prometheus calls

* Unregister metrics in all worker tests

* Add unregister to other worker test

* Get metric names with get_names() method

* fix method name

* fix method name 2

* only load metrics once in workers module

* clean unnecessary changes

* more cleaning unnecessary changes
  • Loading branch information
aarontp authored Jun 13, 2024
1 parent c351f9a commit 9da8379
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 31 deletions.
52 changes: 27 additions & 25 deletions turbinia/workers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from datetime import timedelta
from enum import IntEnum

from itertools import chain
import json
import logging
import os
Expand All @@ -33,7 +34,7 @@
import uuid
import filelock

from prometheus_client import CollectorRegistry, Counter, Histogram
from prometheus_client import Counter, Histogram
from turbinia import __version__, config
from turbinia.config import DATETIME_FORMAT
from turbinia.evidence import evidence_decode
Expand All @@ -46,6 +47,7 @@
from turbinia import log_and_report

from celery.exceptions import SoftTimeLimitExceeded
from prometheus_client import REGISTRY

METRICS = {}
# Set the maximum size that the report can be before truncating it. This is a
Expand All @@ -57,30 +59,30 @@

log = logging.getLogger(__name__)

registry = CollectorRegistry()
turbinia_worker_exception_failure = Counter(
'turbinia_worker_exception_failure',
'Total number Tasks failed due to uncaught exception', registry=registry)
turbinia_worker_tasks_started_total = Counter(
'turbinia_worker_tasks_started_total',
'Total number of started worker tasks', registry=registry)
turbinia_worker_tasks_completed_total = Counter(
'turbinia_worker_tasks_completed_total',
'Total number of completed worker tasks', registry=registry)
turbinia_worker_tasks_queued_total = Counter(
'turbinia_worker_tasks_queued_total', 'Total number of queued worker tasks',
registry=registry)
turbinia_worker_tasks_failed_total = Counter(
'turbinia_worker_tasks_failed_total', 'Total number of failed worker tasks',
registry=registry)
turbinia_worker_tasks_timeout_total = Counter(
'turbinia_worker_tasks_timeout_total',
'Total number of worker tasks timed out during dependency execution.',
registry=registry)
turbinia_worker_tasks_timeout_celery_soft = Counter(
'turbinia_worker_tasks_timeout_celery_soft',
'Total number of Tasks timed out due to Celery soft timeout',
registry=registry)
# Prevent re-registering metrics if module is loaded multiple times.
metric_names = list(chain.from_iterable(REGISTRY._collector_to_names.values()))
if 'turbinia_worker_exception_failure' not in metric_names:
turbinia_worker_exception_failure = Counter(
'turbinia_worker_exception_failure',
'Total number Tasks failed due to uncaught exception')
turbinia_worker_tasks_started_total = Counter(
'turbinia_worker_tasks_started_total',
'Total number of started worker tasks')
turbinia_worker_tasks_completed_total = Counter(
'turbinia_worker_tasks_completed_total',
'Total number of completed worker tasks')
turbinia_worker_tasks_queued_total = Counter(
'turbinia_worker_tasks_queued_total',
'Total number of queued worker tasks')
turbinia_worker_tasks_failed_total = Counter(
'turbinia_worker_tasks_failed_total',
'Total number of failed worker tasks')
turbinia_worker_tasks_timeout_total = Counter(
'turbinia_worker_tasks_timeout_total',
'Total number of worker tasks timed out during dependency execution.')
turbinia_worker_tasks_timeout_celery_soft = Counter(
'turbinia_worker_tasks_timeout_celery_soft',
'Total number of Tasks timed out due to Celery soft timeout')


class Priority(IntEnum):
Expand Down
8 changes: 2 additions & 6 deletions turbinia/workers/workers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def tearDown(self):
os.rmdir(directory)

os.rmdir(self.base_output_dir)
self.unregisterMetrics()

def setResults(
self, setup=None, run=None, validate_result=None, mock_run=True):
Expand Down Expand Up @@ -116,7 +117,7 @@ def setResults(

def unregisterMetrics(self):
"""Unset all the metrics to avoid duplicated timeseries error."""
for collector, names in tuple(REGISTRY._collector_to_names.items()):
for collector, _ in tuple(REGISTRY._collector_to_names.items()):
REGISTRY.unregister(collector)


Expand Down Expand Up @@ -148,7 +149,6 @@ def testTurbiniaTaskSerialize(self):
@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
def testTurbiniaTaskRunWrapper(self, _):
"""Test that the run wrapper executes task run."""
self.unregisterMetrics()
self.setResults()
self.result.closed = True
new_result = self.task.run_wrapper(self.evidence.__dict__)
Expand All @@ -160,7 +160,6 @@ def testTurbiniaTaskRunWrapper(self, _):
@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
def testTurbiniaTaskRunWrapperAutoClose(self, _):
"""Test that the run wrapper closes the task."""
self.unregisterMetrics()
self.setResults()
new_result = self.task.run_wrapper(self.evidence.__dict__)
new_result = TurbiniaTaskResult.deserialize(new_result)
Expand All @@ -171,7 +170,6 @@ def testTurbiniaTaskRunWrapperAutoClose(self, _):
@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
def testTurbiniaTaskRunWrapperBadResult(self, _, __):
"""Test that the run wrapper recovers from run returning bad result."""
self.unregisterMetrics()
bad_result = 'Not a TurbiniaTaskResult'
checked_result = TurbiniaTaskResult(base_output_dir=self.base_output_dir)
checked_result.setup(self.task)
Expand All @@ -186,7 +184,6 @@ def testTurbiniaTaskRunWrapperBadResult(self, _, __):
@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
def testTurbiniaTaskJobUnavailable(self, _):
"""Test that the run wrapper can fail if the job doesn't exist."""
self.unregisterMetrics()
self.setResults()
self.task.job_name = 'non_exist'
canary_status = (
Expand All @@ -199,7 +196,6 @@ def testTurbiniaTaskJobUnavailable(self, _):
@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
def testTurbiniaTaskRunWrapperExceptionThrown(self, _):
"""Test that the run wrapper recovers from run throwing an exception."""
self.unregisterMetrics()
self.setResults()
self.task.run = mock.MagicMock(side_effect=TurbiniaException)

Expand Down

0 comments on commit 9da8379

Please sign in to comment.