Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(hybrid-cloud): Adds silo modes to most unmarked tasks #54086

Merged
merged 9 commits into from
Aug 8, 2023
10 changes: 9 additions & 1 deletion src/sentry/data_export/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import celery
import sentry_sdk

from sentry.silo import SiloMode

# XXX(mdtro): backwards compatible imports for celery 4.4.7, remove after upgrade to 5.2.7
if celery.version_info >= (5, 2):
from celery import current_task
Expand Down Expand Up @@ -53,6 +55,7 @@
default_retry_delay=60,
max_retries=3,
acks_late=True,
silo_mode=SiloMode.REGION,
)
def assemble_download(
data_export_id,
Expand Down Expand Up @@ -297,7 +300,12 @@ def store_export_chunk_as_blob(data_export, bytes_written, fileobj, blob_size=DE
return 0


@instrumented_task(name="sentry.data_export.tasks.merge_blobs", queue="data_export", acks_late=True)
@instrumented_task(
name="sentry.data_export.tasks.merge_blobs",
queue="data_export",
acks_late=True,
silo_mode=SiloMode.REGION,
)
def merge_export_blobs(data_export_id, **kwargs):
with sentry_sdk.start_span(op="merge"):
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
)
from sentry.models import Organization, Project
from sentry.sentry_metrics import indexer
from sentry.silo import SiloMode
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
from sentry.snuba.referrer import Referrer
Expand All @@ -71,6 +72,7 @@
max_retries=5,
soft_time_limit=2 * 60 * 60,
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def boost_low_volume_projects(context: TaskContext) -> None:
Expand All @@ -91,6 +93,7 @@ def boost_low_volume_projects(context: TaskContext) -> None:
max_retries=5,
soft_time_limit=25 * 60,
time_limit=2 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task
def boost_low_volume_projects_of_org(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
)
from sentry.models import Organization
from sentry.sentry_metrics import indexer
from sentry.silo import SiloMode
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
from sentry.snuba.referrer import Referrer
Expand Down Expand Up @@ -92,6 +93,7 @@ class ProjectTransactionsTotals(TypedDict, total=True):
max_retries=5,
soft_time_limit=2 * 60 * 60,
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def boost_low_volume_transactions(context: TaskContext) -> None:
Expand Down Expand Up @@ -146,6 +148,7 @@ def boost_low_volume_transactions(context: TaskContext) -> None:
max_retries=5,
soft_time_limit=25 * 60,
time_limit=2 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task
def boost_low_volume_transactions_of_project(project_transactions: ProjectTransactions) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/dynamic_sampling/tasks/collect_orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from sentry.dynamic_sampling.tasks.logging import log_task_execution, log_task_timeout
from sentry.dynamic_sampling.tasks.task_context import TaskContext
from sentry.dynamic_sampling.tasks.utils import dynamic_sampling_task
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task


Expand All @@ -16,6 +17,7 @@
max_retries=5,
soft_time_limit=2 * 60 * 60,
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task
def collect_orgs() -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/dynamic_sampling/tasks/recalibrate_orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from sentry.dynamic_sampling.tasks.task_context import TaskContext
from sentry.dynamic_sampling.tasks.utils import dynamic_sampling_task_with_context
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task

# Since we are using a granularity of 60 (minute granularity), we want to have a higher time upper limit for executing
Expand All @@ -51,6 +52,7 @@ def __init__(self, org_id, message):
max_retries=5,
soft_time_limit=2 * 60 * 60, # 2hours
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def recalibrate_orgs(context: TaskContext) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/dynamic_sampling/tasks/sliding_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from sentry.dynamic_sampling.tasks.task_context import TaskContext
from sentry.dynamic_sampling.tasks.utils import dynamic_sampling_task_with_context
from sentry.sentry_metrics import indexer
from sentry.silo import SiloMode
from sentry.snuba.dataset import Dataset, EntityKey
from sentry.snuba.metrics.naming_layer.mri import TransactionMRI
from sentry.snuba.referrer import Referrer
Expand All @@ -57,6 +58,7 @@
max_retries=5,
soft_time_limit=2 * 60 * 60, # 2 hours
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def sliding_window(context: TaskContext) -> None:
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/dynamic_sampling/tasks/sliding_window_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from sentry.dynamic_sampling.tasks.task_context import TaskContext
from sentry.dynamic_sampling.tasks.utils import dynamic_sampling_task_with_context
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task


Expand All @@ -30,6 +31,7 @@
max_retries=5,
soft_time_limit=2 * 60 * 60, # 2 hours
time_limit=2 * 60 * 60 + 5,
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def sliding_window_org(context: TaskContext) -> None:
Expand Down
9 changes: 8 additions & 1 deletion src/sentry/incidents/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sentry.models import Project
from sentry.services.hybrid_cloud.user import RpcUser
from sentry.services.hybrid_cloud.user.service import user_service
from sentry.silo import SiloMode
from sentry.snuba.dataset import Dataset
from sentry.snuba.models import QuerySubscription
from sentry.snuba.query_subscriptions.consumer import register_subscriber
Expand All @@ -36,7 +37,11 @@
SUBSCRIPTION_METRICS_LOGGER = "subscription_metrics_logger"


@instrumented_task(name="sentry.incidents.tasks.send_subscriber_notifications", queue="incidents")
@instrumented_task(
name="sentry.incidents.tasks.send_subscriber_notifications",
queue="incidents",
silo_mode=SiloMode.REGION,
)
def send_subscriber_notifications(activity_id: int) -> None:
from sentry.incidents.logic import get_incident_subscribers, unsubscribe_from_incident

Expand Down Expand Up @@ -169,6 +174,7 @@ def handle_snuba_query_update(
queue="incidents",
default_retry_delay=60,
max_retries=5,
silo_mode=SiloMode.REGION,
)
def handle_trigger_action(
action_id: int,
Expand Down Expand Up @@ -215,6 +221,7 @@ def handle_trigger_action(
queue="incidents",
default_retry_delay=60,
max_retries=2,
silo_mode=SiloMode.REGION,
)
def auto_resolve_snapshot_incidents(alert_rule_id: int, **kwargs: Any) -> None:
from sentry.incidents.logic import update_incident_status
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/issues/forecasts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from sentry.issues.escalating_group_forecast import EscalatingGroupForecast
from sentry.issues.escalating_issues_alg import generate_issue_forecast, standard_version
from sentry.models import Group
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -65,6 +66,7 @@ def generate_and_save_forecasts(groups: Sequence[Group]) -> None:
@instrumented_task(
name="sentry.tasks.weekly_escalating_forecast.generate_and_save_missing_forecasts",
queue="weekly_escalating_forecast",
silo_mode=SiloMode.REGION,
)
def generate_and_save_missing_forecasts(group_id: int) -> None:
"""
Expand Down
15 changes: 13 additions & 2 deletions src/sentry/monitors/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from django.utils import timezone

from sentry.constants import ObjectStatus
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics

Expand Down Expand Up @@ -40,7 +41,12 @@
SUBTITLE_DATETIME_FORMAT = "%b %d, %I:%M %p"


@instrumented_task(name="sentry.monitors.tasks.check_missing", time_limit=15, soft_time_limit=10)
@instrumented_task(
name="sentry.monitors.tasks.check_missing",
time_limit=15,
soft_time_limit=10,
silo_mode=SiloMode.REGION,
)
def check_missing(current_datetime=None):
if current_datetime is None:
current_datetime = timezone.now()
Expand Down Expand Up @@ -106,7 +112,12 @@ def check_missing(current_datetime=None):
logger.exception("Exception in check_monitors - mark missed")


@instrumented_task(name="sentry.monitors.tasks.check_timeout", time_limit=15, soft_time_limit=10)
@instrumented_task(
name="sentry.monitors.tasks.check_timeout",
time_limit=15,
soft_time_limit=10,
silo_mode=SiloMode.REGION,
)
def check_timeout(current_datetime=None):
if current_datetime is None:
current_datetime = timezone.now()
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/profiles/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from sentry.profiles.java import deobfuscate_signature
from sentry.profiles.utils import get_from_profiling_service
from sentry.signals import first_profile_received
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import json, metrics
from sentry.utils.outcomes import Outcome, track_outcome
Expand All @@ -46,6 +47,7 @@ class VroomTimeout(Exception):
acks_late=True,
task_time_limit=60,
task_acks_on_failure_or_timeout=False,
silo_mode=SiloMode.REGION,
)
def process_profile_task(
profile: Optional[Profile] = None,
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/replays/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sentry.replays.lib.storage import FilestoreBlob, StorageBlob
from sentry.replays.models import ReplayRecordingSegment
from sentry.replays.usecases.reader import fetch_segments_metadata
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import json
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
Expand All @@ -22,6 +23,7 @@
queue="replays.delete_replay",
default_retry_delay=5,
max_retries=5,
silo_mode=SiloMode.REGION,
)
def delete_recording_segments(project_id: int, replay_id: str, **kwargs: Any) -> None:
"""Asynchronously delete a replay."""
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/snuba/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
SUBSCRIPTION_STATUS_MAX_AGE = timedelta(minutes=10)


# TODO(hybrid-cloud): Mark this as region silo only once testing/decorator
# interaction is cleaned up
@instrumented_task(
name="sentry.snuba.tasks.create_subscription_in_snuba",
queue="subscriptions",
Expand Down
5 changes: 4 additions & 1 deletion src/sentry/tasks/activity.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging

from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils.safe import safe_execute
from sentry.utils.sdk import bind_organization_context
Expand Down Expand Up @@ -27,7 +28,9 @@ def get_activity_notifiers(project):


@instrumented_task(
name="sentry.tasks.activity.send_activity_notifications", queue="activity.notify"
name="sentry.tasks.activity.send_activity_notifications",
queue="activity.notify",
silo_mode=SiloMode.REGION,
)
def send_activity_notifications(activity_id):
from sentry.models import Activity, Organization
Expand Down
13 changes: 11 additions & 2 deletions src/sentry/tasks/assemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
ReleaseArtifactBundle,
)
from sentry.models.releasefile import ReleaseArchive, update_artifact_index
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils.db import atomic_transaction
Expand Down Expand Up @@ -191,7 +192,11 @@ def set_assemble_status(task, scope, checksum, state, detail=None):
default_cache.set(cache_key, (state, detail), 600)


@instrumented_task(name="sentry.tasks.assemble.assemble_dif", queue="assemble")
@instrumented_task(
name="sentry.tasks.assemble.assemble_dif",
queue="assemble",
silo_mode=SiloMode.REGION,
)
def assemble_dif(project_id, name, checksum, chunks, debug_id=None, **kwargs):
"""
Assembles uploaded chunks into a ``ProjectDebugFile``.
Expand Down Expand Up @@ -802,7 +807,11 @@ def prepare_post_assembler(
)


@instrumented_task(name="sentry.tasks.assemble.assemble_artifacts", queue="assemble")
@instrumented_task(
name="sentry.tasks.assemble.assemble_artifacts",
queue="assemble",
silo_mode=SiloMode.REGION,
)
def assemble_artifacts(
org_id,
version,
Expand Down
5 changes: 5 additions & 0 deletions src/sentry/tasks/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from sentry.services.hybrid_cloud.organization import organization_service
from sentry.services.hybrid_cloud.user import RpcUser
from sentry.services.hybrid_cloud.user.service import user_service
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task, retry
from sentry.utils.audit import create_audit_entry_from_user
from sentry.utils.email import MessageBuilder
Expand All @@ -21,6 +22,7 @@
logger = logging.getLogger("sentry.auth")


# TODO(hybrid-cloud): Remove cross-silo DB accesses, or review control silo usages
@instrumented_task(name="sentry.tasks.send_sso_link_emails", queue="auth")
def email_missing_links(org_id: int, actor_id: int, provider_key: str, **kwargs):
try:
Expand All @@ -42,6 +44,7 @@ def email_missing_links(org_id: int, actor_id: int, provider_key: str, **kwargs)
member.send_sso_link_email(user.id, provider)


# TODO(hybrid-cloud): Remove cross-silo DB accesses, or review control silo usages
@instrumented_task(name="sentry.tasks.email_unlink_notifications", queue="auth")
def email_unlink_notifications(org_id: int, actor_id: int, provider_key: str):
try:
Expand Down Expand Up @@ -164,6 +167,7 @@ def call_to_action(self, org: Organization, user: RpcUser, member: OrganizationM
queue="auth",
default_retry_delay=60 * 5,
max_retries=5,
silo_mode=SiloMode.REGION,
)
@retry
def remove_2fa_non_compliant_members(org_id, actor_id=None, actor_key_id=None, ip_address=None):
Expand Down Expand Up @@ -220,6 +224,7 @@ def call_to_action(self, org: Organization, user: RpcUser, member: OrganizationM
queue="auth",
default_retry_delay=60 * 5,
max_retries=5,
silo_mode=SiloMode.REGION,
)
@retry
def remove_email_verification_non_compliant_members(
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/tasks/auto_enable_codecov.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from sentry import audit_log, features
from sentry.integrations.utils.codecov import has_codecov_integration
from sentry.models.organization import Organization, OrganizationStatus
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils.audit import create_system_audit_entry
from sentry.utils.query import RangeQuerySetWrapper
Expand All @@ -14,6 +15,7 @@
name="sentry.tasks.auto_enable_codecov.enable_for_org",
queue="auto_enable_codecov",
max_retries=0,
silo_mode=SiloMode.REGION,
)
def enable_for_org(dry_run: bool = False) -> None:
"""
Expand Down
Loading
Loading