From d818e8f08625dbc44bac95598293e86cfac9e8a1 Mon Sep 17 00:00:00 2001
From: Daniel Szoke <7881302+szokeasaurusrex@users.noreply.github.com>
Date: Fri, 7 Jun 2024 15:13:49 -0400
Subject: [PATCH] Revert "Refactor the Celery Beat integration (#3105)" (#3144)

This reverts commit c80cad1e6e17790f02b29115013014d3b4bebd3c, which
appears to have introduced a regression preventing checkins from being
sent when a cron job is finished.
---
 sentry_sdk/integrations/celery/__init__.py    |  17 +-
 sentry_sdk/integrations/celery/beat.py        | 166 +++++++++--------
 sentry_sdk/scope.py                           |   7 +-
 .../celery/test_update_celery_task_headers.py | 168 +++---------------
 4 files changed, 134 insertions(+), 224 deletions(-)

diff --git a/sentry_sdk/integrations/celery/__init__.py b/sentry_sdk/integrations/celery/__init__.py
index 72de43beb4..46e8002218 100644
--- a/sentry_sdk/integrations/celery/__init__.py
+++ b/sentry_sdk/integrations/celery/__init__.py
@@ -70,9 +70,10 @@ def __init__(
         self.monitor_beat_tasks = monitor_beat_tasks
         self.exclude_beat_tasks = exclude_beat_tasks
 
-        _patch_beat_apply_entry()
-        _patch_redbeat_maybe_due()
-        _setup_celery_beat_signals()
+        if monitor_beat_tasks:
+            _patch_beat_apply_entry()
+            _patch_redbeat_maybe_due()
+            _setup_celery_beat_signals()
 
     @staticmethod
     def setup_once():
@@ -166,11 +167,11 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
     """
     updated_headers = original_headers.copy()
     with capture_internal_exceptions():
-        # if span is None (when the task was started by Celery Beat)
-        # this will return the trace headers from the scope.
-        headers = dict(
-            Scope.get_isolation_scope().iter_trace_propagation_headers(span=span)
-        )
+        headers = {}
+        if span is not None:
+            headers = dict(
+                Scope.get_current_scope().iter_trace_propagation_headers(span=span)
+            )
 
         if monitor_beat_tasks:
             headers.update(
diff --git a/sentry_sdk/integrations/celery/beat.py b/sentry_sdk/integrations/celery/beat.py
index d9a1ca1854..060045eb37 100644
--- a/sentry_sdk/integrations/celery/beat.py
+++ b/sentry_sdk/integrations/celery/beat.py
@@ -1,4 +1,3 @@
-from functools import wraps
 import sentry_sdk
 from sentry_sdk.crons import capture_checkin, MonitorStatus
 from sentry_sdk.integrations import DidNotEnable
@@ -114,108 +113,133 @@ def _get_monitor_config(celery_schedule, app, monitor_name):
     return monitor_config
 
 
-def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
-    # type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
+def _patch_beat_apply_entry():
+    # type: () -> None
     """
-    Add Sentry Crons information to the schedule_entry headers.
+    Makes sure that the Sentry Crons information is set in the Celery Beat task's
+    headers so that is is monitored with Sentry Crons.
+
+    This is only called by Celery Beat. After apply_entry is called
+    Celery will call apply_async to put the task in the queue.
     """
-    if not integration.monitor_beat_tasks:
-        return
+    from sentry_sdk.integrations.celery import CeleryIntegration
 
-    monitor_name = schedule_entry.name
+    original_apply_entry = Scheduler.apply_entry
 
-    task_should_be_excluded = match_regex_list(
-        monitor_name, integration.exclude_beat_tasks
-    )
-    if task_should_be_excluded:
-        return
+    def sentry_apply_entry(*args, **kwargs):
+        # type: (*Any, **Any) -> None
+        scheduler, schedule_entry = args
+        app = scheduler.app
 
-    celery_schedule = schedule_entry.schedule
-    app = scheduler.app
+        celery_schedule = schedule_entry.schedule
+        monitor_name = schedule_entry.name
 
-    monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
+        integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
+        if integration is None:
+            return original_apply_entry(*args, **kwargs)
 
-    is_supported_schedule = bool(monitor_config)
-    if not is_supported_schedule:
-        return
+        if match_regex_list(monitor_name, integration.exclude_beat_tasks):
+            return original_apply_entry(*args, **kwargs)
 
-    headers = schedule_entry.options.pop("headers", {})
-    headers.update(
-        {
-            "sentry-monitor-slug": monitor_name,
-            "sentry-monitor-config": monitor_config,
-        }
-    )
+        # Tasks started by Celery Beat start a new Trace
+        scope = Scope.get_isolation_scope()
+        scope.set_new_propagation_context()
+        scope._name = "celery-beat"
 
-    check_in_id = capture_checkin(
-        monitor_slug=monitor_name,
-        monitor_config=monitor_config,
-        status=MonitorStatus.IN_PROGRESS,
-    )
-    headers.update({"sentry-monitor-check-in-id": check_in_id})
+        monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
 
-    # Set the Sentry configuration in the options of the ScheduleEntry.
-    # Those will be picked up in `apply_async` and added to the headers.
-    schedule_entry.options["headers"] = headers
+        is_supported_schedule = bool(monitor_config)
+        if is_supported_schedule:
+            headers = schedule_entry.options.pop("headers", {})
+            headers.update(
+                {
+                    "sentry-monitor-slug": monitor_name,
+                    "sentry-monitor-config": monitor_config,
+                }
+            )
 
+            check_in_id = capture_checkin(
+                monitor_slug=monitor_name,
+                monitor_config=monitor_config,
+                status=MonitorStatus.IN_PROGRESS,
+            )
+            headers.update({"sentry-monitor-check-in-id": check_in_id})
+
+            # Set the Sentry configuration in the options of the ScheduleEntry.
+            # Those will be picked up in `apply_async` and added to the headers.
+            schedule_entry.options["headers"] = headers
+
+        return original_apply_entry(*args, **kwargs)
+
+    Scheduler.apply_entry = sentry_apply_entry
+
+
+def _patch_redbeat_maybe_due():
+    # type: () -> None
+
+    if RedBeatScheduler is None:
+        return
 
-def _wrap_beat_scheduler(f):
-    # type: (Callable[..., Any]) -> Callable[..., Any]
-    """
-    Makes sure that:
-    - a new Sentry trace is started for each task started by Celery Beat and
-      it is propagated to the task.
-    - the Sentry Crons information is set in the Celery Beat task's
-      headers so that is is monitored with Sentry Crons.
-
-    After the patched function is called,
-    Celery Beat will call apply_async to put the task in the queue.
-    """
     from sentry_sdk.integrations.celery import CeleryIntegration
 
-    @wraps(f)
-    def sentry_patched_scheduler(*args, **kwargs):
+    original_maybe_due = RedBeatScheduler.maybe_due
+
+    def sentry_maybe_due(*args, **kwargs):
         # type: (*Any, **Any) -> None
+        scheduler, schedule_entry = args
+        app = scheduler.app
+
+        celery_schedule = schedule_entry.schedule
+        monitor_name = schedule_entry.name
+
         integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
         if integration is None:
-            return f(*args, **kwargs)
+            return original_maybe_due(*args, **kwargs)
+
+        task_should_be_excluded = match_regex_list(
+            monitor_name, integration.exclude_beat_tasks
+        )
+        if task_should_be_excluded:
+            return original_maybe_due(*args, **kwargs)
 
         # Tasks started by Celery Beat start a new Trace
         scope = Scope.get_isolation_scope()
         scope.set_new_propagation_context()
         scope._name = "celery-beat"
 
-        scheduler, schedule_entry = args
-        _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)
-
-        return f(*args, **kwargs)
+        monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
 
-    return sentry_patched_scheduler
+        is_supported_schedule = bool(monitor_config)
+        if is_supported_schedule:
+            headers = schedule_entry.options.pop("headers", {})
+            headers.update(
+                {
+                    "sentry-monitor-slug": monitor_name,
+                    "sentry-monitor-config": monitor_config,
+                }
+            )
 
+            check_in_id = capture_checkin(
+                monitor_slug=monitor_name,
+                monitor_config=monitor_config,
+                status=MonitorStatus.IN_PROGRESS,
+            )
+            headers.update({"sentry-monitor-check-in-id": check_in_id})
 
-def _patch_beat_apply_entry():
-    # type: () -> None
-    Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)
+            # Set the Sentry configuration in the options of the ScheduleEntry.
+            # Those will be picked up in `apply_async` and added to the headers.
+            schedule_entry.options["headers"] = headers
 
+        return original_maybe_due(*args, **kwargs)
 
-def _patch_redbeat_maybe_due():
-    # type: () -> None
-    if RedBeatScheduler is None:
-        return
-
-    RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due)
+    RedBeatScheduler.maybe_due = sentry_maybe_due
 
 
 def _setup_celery_beat_signals():
     # type: () -> None
-    from sentry_sdk.integrations.celery import CeleryIntegration
-
-    integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
-
-    if integration is not None and integration.monitor_beat_tasks:
-        task_success.connect(crons_task_success)
-        task_failure.connect(crons_task_failure)
-        task_retry.connect(crons_task_retry)
+    task_success.connect(crons_task_success)
+    task_failure.connect(crons_task_failure)
+    task_retry.connect(crons_task_retry)
 
 
 def crons_task_success(sender, **kwargs):
diff --git a/sentry_sdk/scope.py b/sentry_sdk/scope.py
index 7e458e6d14..156c84e204 100644
--- a/sentry_sdk/scope.py
+++ b/sentry_sdk/scope.py
@@ -603,10 +603,9 @@ def iter_headers(self):
     def iter_trace_propagation_headers(self, *args, **kwargs):
         # type: (Any, Any) -> Generator[Tuple[str, str], None, None]
         """
-        Return HTTP headers which allow propagation of trace data.
-
-        If a span is given, the trace data will taken from the span.
-        If no span is given, the trace data is taken from the scope.
+        Return HTTP headers which allow propagation of trace data. Data taken
+        from the span representing the request, if available, or the current
+        span on the scope if not.
         """
         client = Scope.get_client()
         if not client.options.get("propagate_traces"):
diff --git a/tests/integrations/celery/test_update_celery_task_headers.py b/tests/integrations/celery/test_update_celery_task_headers.py
index a2c5fe3632..e94379f763 100644
--- a/tests/integrations/celery/test_update_celery_task_headers.py
+++ b/tests/integrations/celery/test_update_celery_task_headers.py
@@ -1,5 +1,4 @@
 from copy import copy
-import itertools
 import pytest
 
 from unittest import mock
@@ -24,18 +23,17 @@ def test_monitor_beat_tasks(monitor_beat_tasks):
     headers = {}
     span = None
 
-    outgoing_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)
+    updated_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)
 
     assert headers == {}  # left unchanged
 
     if monitor_beat_tasks:
-        assert outgoing_headers["sentry-monitor-start-timestamp-s"] == mock.ANY
-        assert (
-            outgoing_headers["headers"]["sentry-monitor-start-timestamp-s"] == mock.ANY
-        )
+        assert updated_headers == {
+            "headers": {"sentry-monitor-start-timestamp-s": mock.ANY},
+            "sentry-monitor-start-timestamp-s": mock.ANY,
+        }
     else:
-        assert "sentry-monitor-start-timestamp-s" not in outgoing_headers
-        assert "sentry-monitor-start-timestamp-s" not in outgoing_headers["headers"]
+        assert updated_headers == headers
 
 
 @pytest.mark.parametrize("monitor_beat_tasks", [True, False, None, "", "bla", 1, 0])
@@ -46,44 +44,35 @@ def test_monitor_beat_tasks_with_headers(monitor_beat_tasks):
     }
     span = None
 
-    outgoing_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)
-
-    assert headers == {
-        "blub": "foo",
-        "sentry-something": "bar",
-    }  # left unchanged
+    updated_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)
 
     if monitor_beat_tasks:
-        assert outgoing_headers["blub"] == "foo"
-        assert outgoing_headers["sentry-something"] == "bar"
-        assert outgoing_headers["sentry-monitor-start-timestamp-s"] == mock.ANY
-        assert outgoing_headers["headers"]["sentry-something"] == "bar"
-        assert (
-            outgoing_headers["headers"]["sentry-monitor-start-timestamp-s"] == mock.ANY
-        )
+        assert updated_headers == {
+            "blub": "foo",
+            "sentry-something": "bar",
+            "headers": {
+                "sentry-monitor-start-timestamp-s": mock.ANY,
+                "sentry-something": "bar",
+            },
+            "sentry-monitor-start-timestamp-s": mock.ANY,
+        }
     else:
-        assert outgoing_headers["blub"] == "foo"
-        assert outgoing_headers["sentry-something"] == "bar"
-        assert "sentry-monitor-start-timestamp-s" not in outgoing_headers
-        assert "sentry-monitor-start-timestamp-s" not in outgoing_headers["headers"]
+        assert updated_headers == headers
 
 
 def test_span_with_transaction(sentry_init):
     sentry_init(enable_tracing=True)
     headers = {}
-    monitor_beat_tasks = False
 
     with sentry_sdk.start_transaction(name="test_transaction") as transaction:
         with sentry_sdk.start_span(op="test_span") as span:
-            outgoing_headers = _update_celery_task_headers(
-                headers, span, monitor_beat_tasks
-            )
+            updated_headers = _update_celery_task_headers(headers, span, False)
 
-            assert outgoing_headers["sentry-trace"] == span.to_traceparent()
-            assert outgoing_headers["headers"]["sentry-trace"] == span.to_traceparent()
-            assert outgoing_headers["baggage"] == transaction.get_baggage().serialize()
+            assert updated_headers["sentry-trace"] == span.to_traceparent()
+            assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
+            assert updated_headers["baggage"] == transaction.get_baggage().serialize()
             assert (
-                outgoing_headers["headers"]["baggage"]
+                updated_headers["headers"]["baggage"]
                 == transaction.get_baggage().serialize()
             )
 
@@ -97,10 +86,10 @@ def test_span_with_transaction_custom_headers(sentry_init):
 
     with sentry_sdk.start_transaction(name="test_transaction") as transaction:
         with sentry_sdk.start_span(op="test_span") as span:
-            outgoing_headers = _update_celery_task_headers(headers, span, False)
+            updated_headers = _update_celery_task_headers(headers, span, False)
 
-            assert outgoing_headers["sentry-trace"] == span.to_traceparent()
-            assert outgoing_headers["headers"]["sentry-trace"] == span.to_traceparent()
+            assert updated_headers["sentry-trace"] == span.to_traceparent()
+            assert updated_headers["headers"]["sentry-trace"] == span.to_traceparent()
 
             incoming_baggage = Baggage.from_incoming_header(headers["baggage"])
             combined_baggage = copy(transaction.get_baggage())
@@ -115,112 +104,9 @@ def test_span_with_transaction_custom_headers(sentry_init):
                     if x is not None and x != ""
                 ]
             )
-            assert outgoing_headers["baggage"] == combined_baggage.serialize(
+            assert updated_headers["baggage"] == combined_baggage.serialize(
                 include_third_party=True
             )
-            assert outgoing_headers["headers"]["baggage"] == combined_baggage.serialize(
+            assert updated_headers["headers"]["baggage"] == combined_baggage.serialize(
                 include_third_party=True
             )
-
-
-@pytest.mark.parametrize("monitor_beat_tasks", [True, False])
-def test_celery_trace_propagation_default(sentry_init, monitor_beat_tasks):
-    """
-    The celery integration does not check the traces_sample_rate.
-    By default traces_sample_rate is None which means "do not propagate traces".
-    But the celery integration does not check this value.
-    The Celery integration has its own mechanism to propagate traces:
-    https://docs.sentry.io/platforms/python/integrations/celery/#distributed-traces
-    """
-    sentry_init()
-
-    headers = {}
-    span = None
-
-    scope = sentry_sdk.Scope.get_isolation_scope()
-
-    outgoing_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)
-
-    assert outgoing_headers["sentry-trace"] == scope.get_traceparent()
-    assert outgoing_headers["headers"]["sentry-trace"] == scope.get_traceparent()
-    assert outgoing_headers["baggage"] == scope.get_baggage().serialize()
-    assert outgoing_headers["headers"]["baggage"] == scope.get_baggage().serialize()
-
-    if monitor_beat_tasks:
-        assert "sentry-monitor-start-timestamp-s" in outgoing_headers
-        assert "sentry-monitor-start-timestamp-s" in outgoing_headers["headers"]
-    else:
-        assert "sentry-monitor-start-timestamp-s" not in outgoing_headers
-        assert "sentry-monitor-start-timestamp-s" not in outgoing_headers["headers"]
-
-
-@pytest.mark.parametrize(
-    "traces_sample_rate,monitor_beat_tasks",
-    list(itertools.product([None, 0, 0.0, 0.5, 1.0, 1, 2], [True, False])),
-)
-def test_celery_trace_propagation_traces_sample_rate(
-    sentry_init, traces_sample_rate, monitor_beat_tasks
-):
-    """
-    The celery integration does not check the traces_sample_rate.
-    By default traces_sample_rate is None which means "do not propagate traces".
-    But the celery integration does not check this value.
-    The Celery integration has its own mechanism to propagate traces:
-    https://docs.sentry.io/platforms/python/integrations/celery/#distributed-traces
-    """
-    sentry_init(traces_sample_rate=traces_sample_rate)
-
-    headers = {}
-    span = None
-
-    scope = sentry_sdk.Scope.get_isolation_scope()
-
-    outgoing_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)
-
-    assert outgoing_headers["sentry-trace"] == scope.get_traceparent()
-    assert outgoing_headers["headers"]["sentry-trace"] == scope.get_traceparent()
-    assert outgoing_headers["baggage"] == scope.get_baggage().serialize()
-    assert outgoing_headers["headers"]["baggage"] == scope.get_baggage().serialize()
-
-    if monitor_beat_tasks:
-        assert "sentry-monitor-start-timestamp-s" in outgoing_headers
-        assert "sentry-monitor-start-timestamp-s" in outgoing_headers["headers"]
-    else:
-        assert "sentry-monitor-start-timestamp-s" not in outgoing_headers
-        assert "sentry-monitor-start-timestamp-s" not in outgoing_headers["headers"]
-
-
-@pytest.mark.parametrize(
-    "enable_tracing,monitor_beat_tasks",
-    list(itertools.product([None, True, False], [True, False])),
-)
-def test_celery_trace_propagation_enable_tracing(
-    sentry_init, enable_tracing, monitor_beat_tasks
-):
-    """
-    The celery integration does not check the traces_sample_rate.
-    By default traces_sample_rate is None which means "do not propagate traces".
-    But the celery integration does not check this value.
-    The Celery integration has its own mechanism to propagate traces:
-    https://docs.sentry.io/platforms/python/integrations/celery/#distributed-traces
-    """
-    sentry_init(enable_tracing=enable_tracing)
-
-    headers = {}
-    span = None
-
-    scope = sentry_sdk.Scope.get_isolation_scope()
-
-    outgoing_headers = _update_celery_task_headers(headers, span, monitor_beat_tasks)
-
-    assert outgoing_headers["sentry-trace"] == scope.get_traceparent()
-    assert outgoing_headers["headers"]["sentry-trace"] == scope.get_traceparent()
-    assert outgoing_headers["baggage"] == scope.get_baggage().serialize()
-    assert outgoing_headers["headers"]["baggage"] == scope.get_baggage().serialize()
-
-    if monitor_beat_tasks:
-        assert "sentry-monitor-start-timestamp-s" in outgoing_headers
-        assert "sentry-monitor-start-timestamp-s" in outgoing_headers["headers"]
-    else:
-        assert "sentry-monitor-start-timestamp-s" not in outgoing_headers
-        assert "sentry-monitor-start-timestamp-s" not in outgoing_headers["headers"]
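
Note (not part of the patch): a minimal, hypothetical usage sketch of the opt-in that the guarded calls in celery/__init__.py serve. The monitor_beat_tasks and exclude_beat_tasks parameters come from the diff above; the DSN and the task-name pattern are placeholders.

    import sentry_sdk
    from sentry_sdk.integrations.celery import CeleryIntegration

    sentry_sdk.init(
        dsn="https://<public_key>@<org>.ingest.sentry.io/<project_id>",  # placeholder DSN
        integrations=[
            CeleryIntegration(
                # Opt in to Sentry Crons check-ins for Celery Beat tasks.
                monitor_beat_tasks=True,
                # Optionally skip tasks whose names match these regexes (illustrative value).
                exclude_beat_tasks=["unmonitored-.*"],
            ),
        ],
    )

With this configuration, the patched scheduler records an in-progress check-in when Beat schedules a task, and the signal handlers connected in _setup_celery_beat_signals() report success, failure, or retry when the task finishes, which is the behavior this revert restores.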