Skip to content

Commit

Permalink
chore(hybrid-cloud): Adds silo modes to most unmarked tasks (#54086)
Browse files Browse the repository at this point in the history
Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com>
  • Loading branch information
GabeVillalobos and getsantry[bot] authored Aug 8, 2023
1 parent 38cdf23 commit 2d2c17b
Show file tree
Hide file tree
Showing 69 changed files with 285 additions and 32 deletions.
10 changes: 9 additions & 1 deletion src/sentry/data_export/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import celery
import sentry_sdk

from sentry.silo import SiloMode

# XXX(mdtro): backwards compatible imports for celery 4.4.7, remove after upgrade to 5.2.7
if celery.version_info >= (5, 2):
from celery import current_task
Expand Down Expand Up @@ -53,6 +55,7 @@
default_retry_delay=60,
max_retries=3,
acks_late=True,
silo_mode=SiloMode.REGION,
)
def assemble_download(
data_export_id,
Expand Down Expand Up @@ -297,7 +300,12 @@ def store_export_chunk_as_blob(data_export, bytes_written, fileobj, blob_size=DE
return 0


@instrumented_task(name="sentry.data_export.tasks.merge_blobs", queue="data_export", acks_late=True)
@instrumented_task(
name="sentry.data_export.tasks.merge_blobs",
queue="data_export",
acks_late=True,
silo_mode=SiloMode.REGION,
)
def merge_export_blobs(data_export_id, **kwargs):
with sentry_sdk.start_span(op="merge"):
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
)
from sentry.models import Organization, Project
from sentry.sentry_metrics import indexer
from sentry.silo import SiloMode
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
from sentry.snuba.referrer import Referrer
Expand All @@ -71,6 +72,7 @@
max_retries=5,
soft_time_limit=2 * 60 * 60,
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def boost_low_volume_projects(context: TaskContext) -> None:
Expand All @@ -91,6 +93,7 @@ def boost_low_volume_projects(context: TaskContext) -> None:
max_retries=5,
soft_time_limit=25 * 60,
time_limit=2 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task
def boost_low_volume_projects_of_org(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
)
from sentry.models import Organization
from sentry.sentry_metrics import indexer
from sentry.silo import SiloMode
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
from sentry.snuba.referrer import Referrer
Expand Down Expand Up @@ -92,6 +93,7 @@ class ProjectTransactionsTotals(TypedDict, total=True):
max_retries=5,
soft_time_limit=2 * 60 * 60,
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def boost_low_volume_transactions(context: TaskContext) -> None:
Expand Down Expand Up @@ -146,6 +148,7 @@ def boost_low_volume_transactions(context: TaskContext) -> None:
max_retries=5,
soft_time_limit=25 * 60,
time_limit=2 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task
def boost_low_volume_transactions_of_project(project_transactions: ProjectTransactions) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/dynamic_sampling/tasks/collect_orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from sentry.dynamic_sampling.tasks.logging import log_task_execution, log_task_timeout
from sentry.dynamic_sampling.tasks.task_context import TaskContext
from sentry.dynamic_sampling.tasks.utils import dynamic_sampling_task
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task


Expand All @@ -16,6 +17,7 @@
max_retries=5,
soft_time_limit=2 * 60 * 60,
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task
def collect_orgs() -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/dynamic_sampling/tasks/recalibrate_orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from sentry.dynamic_sampling.tasks.task_context import TaskContext
from sentry.dynamic_sampling.tasks.utils import dynamic_sampling_task_with_context
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task

# Since we are using a granularity of 60 (minute granularity), we want to have a higher time upper limit for executing
Expand All @@ -51,6 +52,7 @@ def __init__(self, org_id, message):
max_retries=5,
soft_time_limit=2 * 60 * 60, # 2hours
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def recalibrate_orgs(context: TaskContext) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/dynamic_sampling/tasks/sliding_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from sentry.dynamic_sampling.tasks.task_context import TaskContext
from sentry.dynamic_sampling.tasks.utils import dynamic_sampling_task_with_context
from sentry.sentry_metrics import indexer
from sentry.silo import SiloMode
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
from sentry.snuba.referrer import Referrer
Expand All @@ -57,6 +58,7 @@
max_retries=5,
soft_time_limit=2 * 60 * 60, # 2 hours
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def sliding_window(context: TaskContext) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/dynamic_sampling/tasks/sliding_window_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from sentry.dynamic_sampling.tasks.task_context import TaskContext
from sentry.dynamic_sampling.tasks.utils import dynamic_sampling_task_with_context
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task


Expand All @@ -30,6 +31,7 @@
max_retries=5,
soft_time_limit=2 * 60 * 60, # 2 hours
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def sliding_window_org(context: TaskContext) -> None:
Expand Down
9 changes: 8 additions & 1 deletion src/sentry/incidents/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sentry.models import Project
from sentry.services.hybrid_cloud.user import RpcUser
from sentry.services.hybrid_cloud.user.service import user_service
from sentry.silo import SiloMode
from sentry.snuba.dataset import Dataset
from sentry.snuba.models import QuerySubscription
from sentry.snuba.query_subscriptions.consumer import register_subscriber
Expand All @@ -36,7 +37,11 @@
SUBSCRIPTION_METRICS_LOGGER = "subscription_metrics_logger"


@instrumented_task(name="sentry.incidents.tasks.send_subscriber_notifications", queue="incidents")
@instrumented_task(
name="sentry.incidents.tasks.send_subscriber_notifications",
queue="incidents",
silo_mode=SiloMode.REGION,
)
def send_subscriber_notifications(activity_id: int) -> None:
from sentry.incidents.logic import get_incident_subscribers, unsubscribe_from_incident

Expand Down Expand Up @@ -172,6 +177,7 @@ def handle_snuba_query_update(
queue="incidents",
default_retry_delay=60,
max_retries=5,
silo_mode=SiloMode.REGION,
)
def handle_trigger_action(
action_id: int,
Expand Down Expand Up @@ -218,6 +224,7 @@ def handle_trigger_action(
queue="incidents",
default_retry_delay=60,
max_retries=2,
silo_mode=SiloMode.REGION,
)
def auto_resolve_snapshot_incidents(alert_rule_id: int, **kwargs: Any) -> None:
from sentry.incidents.logic import update_incident_status
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/issues/forecasts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from sentry.issues.escalating_group_forecast import EscalatingGroupForecast
from sentry.issues.escalating_issues_alg import generate_issue_forecast, standard_version
from sentry.models import Group
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -65,6 +66,7 @@ def generate_and_save_forecasts(groups: Sequence[Group]) -> None:
@instrumented_task(
name="sentry.tasks.weekly_escalating_forecast.generate_and_save_missing_forecasts",
queue="weekly_escalating_forecast",
silo_mode=SiloMode.REGION,
)
def generate_and_save_missing_forecasts(group_id: int) -> None:
"""
Expand Down
15 changes: 13 additions & 2 deletions src/sentry/monitors/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.utils import timezone

from sentry.constants import ObjectStatus
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics

Expand Down Expand Up @@ -40,7 +41,12 @@
SUBTITLE_DATETIME_FORMAT = "%b %d, %I:%M %p"


@instrumented_task(name="sentry.monitors.tasks.check_missing", time_limit=15, soft_time_limit=10)
@instrumented_task(
name="sentry.monitors.tasks.check_missing",
time_limit=15,
soft_time_limit=10,
silo_mode=SiloMode.REGION,
)
def check_missing(current_datetime=None):
if current_datetime is None:
current_datetime = timezone.now()
Expand Down Expand Up @@ -106,7 +112,12 @@ def check_missing(current_datetime=None):
logger.exception("Exception in check_monitors - mark missed")


@instrumented_task(name="sentry.monitors.tasks.check_timeout", time_limit=15, soft_time_limit=10)
@instrumented_task(
name="sentry.monitors.tasks.check_timeout",
time_limit=15,
soft_time_limit=10,
silo_mode=SiloMode.REGION,
)
def check_timeout(current_datetime=None):
if current_datetime is None:
current_datetime = timezone.now()
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/profiles/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sentry.profiles.java import deobfuscate_signature
from sentry.profiles.utils import get_from_profiling_service
from sentry.signals import first_profile_received
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import json, metrics
from sentry.utils.outcomes import Outcome, track_outcome
Expand All @@ -46,6 +47,7 @@ class VroomTimeout(Exception):
acks_late=True,
task_time_limit=60,
task_acks_on_failure_or_timeout=False,
silo_mode=SiloMode.REGION,
)
def process_profile_task(
profile: Optional[Profile] = None,
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/replays/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sentry.replays.lib.storage import FilestoreBlob, StorageBlob
from sentry.replays.models import ReplayRecordingSegment
from sentry.replays.usecases.reader import fetch_segments_metadata
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import json
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
Expand All @@ -22,6 +23,7 @@
queue="replays.delete_replay",
default_retry_delay=5,
max_retries=5,
silo_mode=SiloMode.REGION,
)
def delete_recording_segments(project_id: int, replay_id: str, **kwargs: Any) -> None:
"""Asynchronously delete a replay."""
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/snuba/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
SUBSCRIPTION_STATUS_MAX_AGE = timedelta(minutes=10)


# TODO(hybrid-cloud): Mark this as region silo only once testing/decorator
# interaction is cleaned up
@instrumented_task(
name="sentry.snuba.tasks.create_subscription_in_snuba",
queue="subscriptions",
Expand Down
5 changes: 4 additions & 1 deletion src/sentry/tasks/activity.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils.safe import safe_execute
from sentry.utils.sdk import bind_organization_context
Expand Down Expand Up @@ -27,7 +28,9 @@ def get_activity_notifiers(project):


@instrumented_task(
name="sentry.tasks.activity.send_activity_notifications", queue="activity.notify"
name="sentry.tasks.activity.send_activity_notifications",
queue="activity.notify",
silo_mode=SiloMode.REGION,
)
def send_activity_notifications(activity_id):
from sentry.models import Activity, Organization
Expand Down
13 changes: 11 additions & 2 deletions src/sentry/tasks/assemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ReleaseArtifactBundle,
)
from sentry.models.releasefile import ReleaseArchive, update_artifact_index
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils.db import atomic_transaction
Expand Down Expand Up @@ -191,7 +192,11 @@ def set_assemble_status(task, scope, checksum, state, detail=None):
default_cache.set(cache_key, (state, detail), 600)


@instrumented_task(name="sentry.tasks.assemble.assemble_dif", queue="assemble")
@instrumented_task(
name="sentry.tasks.assemble.assemble_dif",
queue="assemble",
silo_mode=SiloMode.REGION,
)
def assemble_dif(project_id, name, checksum, chunks, debug_id=None, **kwargs):
"""
Assembles uploaded chunks into a ``ProjectDebugFile``.
Expand Down Expand Up @@ -802,7 +807,11 @@ def prepare_post_assembler(
)


@instrumented_task(name="sentry.tasks.assemble.assemble_artifacts", queue="assemble")
@instrumented_task(
name="sentry.tasks.assemble.assemble_artifacts",
queue="assemble",
silo_mode=SiloMode.REGION,
)
def assemble_artifacts(
org_id,
version,
Expand Down
5 changes: 5 additions & 0 deletions src/sentry/tasks/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from sentry.services.hybrid_cloud.organization import organization_service
from sentry.services.hybrid_cloud.user import RpcUser
from sentry.services.hybrid_cloud.user.service import user_service
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task, retry
from sentry.utils.audit import create_audit_entry_from_user
from sentry.utils.email import MessageBuilder
Expand All @@ -21,6 +22,7 @@
logger = logging.getLogger("sentry.auth")


# TODO(hybrid-cloud): Remove cross-silo DB accesses, or review control silo usages
@instrumented_task(name="sentry.tasks.send_sso_link_emails", queue="auth")
def email_missing_links(org_id: int, actor_id: int, provider_key: str, **kwargs):
try:
Expand All @@ -42,6 +44,7 @@ def email_missing_links(org_id: int, actor_id: int, provider_key: str, **kwargs)
member.send_sso_link_email(user.id, provider)


# TODO(hybrid-cloud): Remove cross-silo DB accesses, or review control silo usages
@instrumented_task(name="sentry.tasks.email_unlink_notifications", queue="auth")
def email_unlink_notifications(org_id: int, actor_id: int, provider_key: str):
try:
Expand Down Expand Up @@ -164,6 +167,7 @@ def call_to_action(self, org: Organization, user: RpcUser, member: OrganizationM
queue="auth",
default_retry_delay=60 * 5,
max_retries=5,
silo_mode=SiloMode.REGION,
)
@retry
def remove_2fa_non_compliant_members(org_id, actor_id=None, actor_key_id=None, ip_address=None):
Expand Down Expand Up @@ -220,6 +224,7 @@ def call_to_action(self, org: Organization, user: RpcUser, member: OrganizationM
queue="auth",
default_retry_delay=60 * 5,
max_retries=5,
silo_mode=SiloMode.REGION,
)
@retry
def remove_email_verification_non_compliant_members(
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/tasks/auto_enable_codecov.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from sentry import audit_log, features
from sentry.integrations.utils.codecov import has_codecov_integration
from sentry.models.organization import Organization, OrganizationStatus
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils.audit import create_system_audit_entry
from sentry.utils.query import RangeQuerySetWrapper
Expand All @@ -14,6 +15,7 @@
name="sentry.tasks.auto_enable_codecov.enable_for_org",
queue="auto_enable_codecov",
max_retries=0,
silo_mode=SiloMode.REGION,
)
def enable_for_org(dry_run: bool = False) -> None:
"""
Expand Down
Loading

0 comments on commit 2d2c17b

Please sign in to comment.