Revert "Refactor the Celery Beat integration (#3105)" (#3144)
This reverts commit c80cad1, which appears to have introduced a regression preventing checkins from being sent when a cron job is finished.
szokeasaurusrex authored Jun 7, 2024
1 parent c2af1b0 commit d818e8f
Showing 4 changed files with 134 additions and 224 deletions.
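For context on the regression described above: Sentry Crons expects a check-in to be opened when Beat dispatches the scheduled task and to be closed, with the same check-in ID, once the task finishes. A minimal sketch of that lifecycle using the public capture_checkin API (the monitor slug below is a hypothetical example):

    from sentry_sdk.crons import capture_checkin, MonitorStatus

    # Opened when Celery Beat dispatches the scheduled task.
    check_in_id = capture_checkin(
        monitor_slug="my-scheduled-task",  # hypothetical slug
        status=MonitorStatus.IN_PROGRESS,
    )

    # ... the cron job runs ...

    # Closing check-in; per the commit message, this is what stopped being sent.
    capture_checkin(
        monitor_slug="my-scheduled-task",
        check_in_id=check_in_id,  # ties the close to the open check-in
        status=MonitorStatus.OK,  # MonitorStatus.ERROR on failure
    )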
17 changes: 9 additions & 8 deletions sentry_sdk/integrations/celery/__init__.py
@@ -70,9 +70,10 @@ def __init__(
         self.monitor_beat_tasks = monitor_beat_tasks
         self.exclude_beat_tasks = exclude_beat_tasks

-        _patch_beat_apply_entry()
-        _patch_redbeat_maybe_due()
-        _setup_celery_beat_signals()
+        if monitor_beat_tasks:
+            _patch_beat_apply_entry()
+            _patch_redbeat_maybe_due()
+            _setup_celery_beat_signals()

     @staticmethod
     def setup_once():
@@ -166,11 +167,11 @@ def _update_celery_task_headers(original_headers, span, monitor_beat_tasks):
     """
     updated_headers = original_headers.copy()
     with capture_internal_exceptions():
-        # if span is None (when the task was started by Celery Beat)
-        # this will return the trace headers from the scope.
-        headers = dict(
-            Scope.get_isolation_scope().iter_trace_propagation_headers(span=span)
-        )
+        headers = {}
+        if span is not None:
+            headers = dict(
+                Scope.get_current_scope().iter_trace_propagation_headers(span=span)
+            )

         if monitor_beat_tasks:
             headers.update(
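The hunk above restores gating the Beat patching on the monitor_beat_tasks option, so the schedulers are only wrapped when beat monitoring was requested. A hedged sketch of how that option is typically enabled (the DSN and exclude pattern are placeholders):

    import sentry_sdk
    from sentry_sdk.integrations.celery import CeleryIntegration

    sentry_sdk.init(
        dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
        integrations=[
            CeleryIntegration(
                monitor_beat_tasks=True,  # install the Beat patches shown above
                exclude_beat_tasks=["unmonitored-.*"],  # optional regexes of task names to skip
            )
        ],
    )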
166 changes: 95 additions & 71 deletions sentry_sdk/integrations/celery/beat.py
@@ -1,4 +1,3 @@
-from functools import wraps
 import sentry_sdk
 from sentry_sdk.crons import capture_checkin, MonitorStatus
 from sentry_sdk.integrations import DidNotEnable
@@ -114,108 +113,133 @@ def _get_monitor_config(celery_schedule, app, monitor_name):
     return monitor_config


-def _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration):
-    # type: (Any, Any, sentry_sdk.integrations.celery.CeleryIntegration) -> None
-    """
-    Add Sentry Crons information to the schedule_entry headers.
-    """
-    if not integration.monitor_beat_tasks:
-        return
-
-    monitor_name = schedule_entry.name
-
-    task_should_be_excluded = match_regex_list(
-        monitor_name, integration.exclude_beat_tasks
-    )
-    if task_should_be_excluded:
-        return
-
-    celery_schedule = schedule_entry.schedule
-    app = scheduler.app
-
-    monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
-
-    is_supported_schedule = bool(monitor_config)
-    if not is_supported_schedule:
-        return
-
-    headers = schedule_entry.options.pop("headers", {})
-    headers.update(
-        {
-            "sentry-monitor-slug": monitor_name,
-            "sentry-monitor-config": monitor_config,
-        }
-    )
-
-    check_in_id = capture_checkin(
-        monitor_slug=monitor_name,
-        monitor_config=monitor_config,
-        status=MonitorStatus.IN_PROGRESS,
-    )
-    headers.update({"sentry-monitor-check-in-id": check_in_id})
-
-    # Set the Sentry configuration in the options of the ScheduleEntry.
-    # Those will be picked up in `apply_async` and added to the headers.
-    schedule_entry.options["headers"] = headers
-
-
-def _wrap_beat_scheduler(f):
-    # type: (Callable[..., Any]) -> Callable[..., Any]
-    """
-    Makes sure that:
-    - a new Sentry trace is started for each task started by Celery Beat and
-      it is propagated to the task.
-    - the Sentry Crons information is set in the Celery Beat task's
-      headers so that is is monitored with Sentry Crons.
-
-    After the patched function is called,
-    Celery Beat will call apply_async to put the task in the queue.
-    """
-    from sentry_sdk.integrations.celery import CeleryIntegration
-
-    @wraps(f)
-    def sentry_patched_scheduler(*args, **kwargs):
-        # type: (*Any, **Any) -> None
-        integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
-        if integration is None:
-            return f(*args, **kwargs)
-
-        # Tasks started by Celery Beat start a new Trace
-        scope = Scope.get_isolation_scope()
-        scope.set_new_propagation_context()
-        scope._name = "celery-beat"
-
-        scheduler, schedule_entry = args
-        _apply_crons_data_to_schedule_entry(scheduler, schedule_entry, integration)
-
-        return f(*args, **kwargs)
-
-    return sentry_patched_scheduler
-
-
-def _patch_beat_apply_entry():
-    # type: () -> None
-    Scheduler.apply_entry = _wrap_beat_scheduler(Scheduler.apply_entry)
-
-
-def _patch_redbeat_maybe_due():
-    # type: () -> None
-    if RedBeatScheduler is None:
-        return
-
-    RedBeatScheduler.maybe_due = _wrap_beat_scheduler(RedBeatScheduler.maybe_due)
-
-
-def _setup_celery_beat_signals():
-    # type: () -> None
-    from sentry_sdk.integrations.celery import CeleryIntegration
-
-    integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
-
-    if integration is not None and integration.monitor_beat_tasks:
-        task_success.connect(crons_task_success)
-        task_failure.connect(crons_task_failure)
-        task_retry.connect(crons_task_retry)
+def _patch_beat_apply_entry():
+    # type: () -> None
+    """
+    Makes sure that the Sentry Crons information is set in the Celery Beat task's
+    headers so that is is monitored with Sentry Crons.
+
+    This is only called by Celery Beat. After apply_entry is called
+    Celery will call apply_async to put the task in the queue.
+    """
+    from sentry_sdk.integrations.celery import CeleryIntegration
+
+    original_apply_entry = Scheduler.apply_entry
+
+    def sentry_apply_entry(*args, **kwargs):
+        # type: (*Any, **Any) -> None
+        scheduler, schedule_entry = args
+        app = scheduler.app
+
+        celery_schedule = schedule_entry.schedule
+        monitor_name = schedule_entry.name
+
+        integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
+        if integration is None:
+            return original_apply_entry(*args, **kwargs)
+
+        if match_regex_list(monitor_name, integration.exclude_beat_tasks):
+            return original_apply_entry(*args, **kwargs)
+
+        # Tasks started by Celery Beat start a new Trace
+        scope = Scope.get_isolation_scope()
+        scope.set_new_propagation_context()
+        scope._name = "celery-beat"
+
+        monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
+
+        is_supported_schedule = bool(monitor_config)
+        if is_supported_schedule:
+            headers = schedule_entry.options.pop("headers", {})
+            headers.update(
+                {
+                    "sentry-monitor-slug": monitor_name,
+                    "sentry-monitor-config": monitor_config,
+                }
+            )
+
+            check_in_id = capture_checkin(
+                monitor_slug=monitor_name,
+                monitor_config=monitor_config,
+                status=MonitorStatus.IN_PROGRESS,
+            )
+            headers.update({"sentry-monitor-check-in-id": check_in_id})
+
+            # Set the Sentry configuration in the options of the ScheduleEntry.
+            # Those will be picked up in `apply_async` and added to the headers.
+            schedule_entry.options["headers"] = headers
+
+        return original_apply_entry(*args, **kwargs)
+
+    Scheduler.apply_entry = sentry_apply_entry
+
+
+def _patch_redbeat_maybe_due():
+    # type: () -> None
+
+    if RedBeatScheduler is None:
+        return
+
+    from sentry_sdk.integrations.celery import CeleryIntegration
+
+    original_maybe_due = RedBeatScheduler.maybe_due
+
+    def sentry_maybe_due(*args, **kwargs):
+        # type: (*Any, **Any) -> None
+        scheduler, schedule_entry = args
+        app = scheduler.app
+
+        celery_schedule = schedule_entry.schedule
+        monitor_name = schedule_entry.name
+
+        integration = sentry_sdk.get_client().get_integration(CeleryIntegration)
+        if integration is None:
+            return original_maybe_due(*args, **kwargs)
+
+        task_should_be_excluded = match_regex_list(
+            monitor_name, integration.exclude_beat_tasks
+        )
+        if task_should_be_excluded:
+            return original_maybe_due(*args, **kwargs)
+
+        # Tasks started by Celery Beat start a new Trace
+        scope = Scope.get_isolation_scope()
+        scope.set_new_propagation_context()
+        scope._name = "celery-beat"
+
+        monitor_config = _get_monitor_config(celery_schedule, app, monitor_name)
+
+        is_supported_schedule = bool(monitor_config)
+        if is_supported_schedule:
+            headers = schedule_entry.options.pop("headers", {})
+            headers.update(
+                {
+                    "sentry-monitor-slug": monitor_name,
+                    "sentry-monitor-config": monitor_config,
+                }
+            )
+
+            check_in_id = capture_checkin(
+                monitor_slug=monitor_name,
+                monitor_config=monitor_config,
+                status=MonitorStatus.IN_PROGRESS,
+            )
+            headers.update({"sentry-monitor-check-in-id": check_in_id})
+
+            # Set the Sentry configuration in the options of the ScheduleEntry.
+            # Those will be picked up in `apply_async` and added to the headers.
+            schedule_entry.options["headers"] = headers
+
+        return original_maybe_due(*args, **kwargs)
+
+    RedBeatScheduler.maybe_due = sentry_maybe_due
+
+
+def _setup_celery_beat_signals():
+    # type: () -> None
+    task_success.connect(crons_task_success)
+    task_failure.connect(crons_task_failure)
+    task_retry.connect(crons_task_retry)


 def crons_task_success(sender, **kwargs):
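The signal handlers connected in _setup_celery_beat_signals (crons_task_success and friends, truncated above) are what close the IN_PROGRESS check-in opened by sentry_apply_entry and sentry_maybe_due. A hedged sketch of that closing step, assuming the monitor headers propagated via apply_async are reachable from the task request; the handler name and header lookup here are illustrative, not the file's actual code:

    from sentry_sdk.crons import capture_checkin, MonitorStatus

    def crons_task_success_sketch(sender, **kwargs):
        # `sender` is the Celery task; assume the Sentry monitor headers set by
        # the scheduler patch are reachable from its request (illustrative lookup).
        headers = getattr(sender.request, "headers", None) or {}

        if "sentry-monitor-slug" not in headers:
            return  # task was not scheduled with Crons monitoring

        capture_checkin(
            monitor_slug=headers["sentry-monitor-slug"],
            monitor_config=headers.get("sentry-monitor-config"),
            check_in_id=headers["sentry-monitor-check-in-id"],
            status=MonitorStatus.OK,  # the failure handler would use MonitorStatus.ERROR
        )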
7 changes: 3 additions & 4 deletions sentry_sdk/scope.py
@@ -603,10 +603,9 @@ def iter_headers(self):
     def iter_trace_propagation_headers(self, *args, **kwargs):
         # type: (Any, Any) -> Generator[Tuple[str, str], None, None]
         """
-        Return HTTP headers which allow propagation of trace data.
-
-        If a span is given, the trace data will taken from the span.
-        If no span is given, the trace data is taken from the scope.
+        Return HTTP headers which allow propagation of trace data. Data taken
+        from the span representing the request, if available, or the current
+        span on the scope if not.
         """
         client = Scope.get_client()
         if not client.options.get("propagate_traces"):
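The scope.py hunk above restores the docstring describing how trace headers are chosen: from the given span if available, otherwise from the current span on the scope. A hedged usage sketch (assumes sentry_sdk.init has already been called with tracing enabled; the transaction name is illustrative, and the header names shown are the ones Sentry typically propagates):

    import sentry_sdk
    from sentry_sdk import Scope

    with sentry_sdk.start_transaction(name="beat-example"):
        # Yields propagation headers such as "sentry-trace" and "baggage"
        # for attaching to an outgoing request or task.
        headers = dict(Scope.get_current_scope().iter_trace_propagation_headers())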
