diff --git a/src/sentry/models/organization.py b/src/sentry/models/organization.py index c89da812ab841..7fb430cfc83c0 100644 --- a/src/sentry/models/organization.py +++ b/src/sentry/models/organization.py @@ -29,6 +29,7 @@ sane_repr, ) from sentry.db.models.utils import slugify_instance +from sentry.db.postgres.transactions import in_test_hide_transaction_boundary from sentry.locks import locks from sentry.models.options.option import OptionMixin from sentry.models.organizationmember import OrganizationMember @@ -323,7 +324,8 @@ def get_owners(self) -> Sequence[RpcUser]: "user_id", flat=True ) - return user_service.get_many(filter={"user_ids": list(owners)}) + with in_test_hide_transaction_boundary(): + return user_service.get_many(filter={"user_ids": list(owners)}) def get_default_owner(self) -> RpcUser: if not hasattr(self, "_default_owner"): diff --git a/src/sentry/services/hybrid_cloud/auth/impl.py b/src/sentry/services/hybrid_cloud/auth/impl.py index 3c262be31d9d8..635224141071c 100644 --- a/src/sentry/services/hybrid_cloud/auth/impl.py +++ b/src/sentry/services/hybrid_cloud/auth/impl.py @@ -193,7 +193,7 @@ def authenticate(self, *, request: AuthenticationRequest) -> MiddlewareAuthentic elif fake_request.user is not None and not fake_request.user.is_anonymous: with transaction.atomic(using=router.db_for_read(User)): result.user = self._load_auth_user(fake_request.user) - transaction.set_rollback(True) + transaction.set_rollback(True, using=router.db_for_read(User)) if SiloMode.single_process_silo_mode(): connections.close_all() diff --git a/src/sentry/silo/patches/silo_aware_transaction_patch.py b/src/sentry/silo/patches/silo_aware_transaction_patch.py index f22fa6ec479a3..5f671bbbbd53b 100644 --- a/src/sentry/silo/patches/silo_aware_transaction_patch.py +++ b/src/sentry/silo/patches/silo_aware_transaction_patch.py @@ -16,6 +16,10 @@ class MismatchedSiloTransactionError(Exception): pass +class TransactionMissingDBException(Exception): + pass + + def _get_db_for_model_if_available(model: Type["Model"]) -> Optional[str]: from sentry.db.router import SiloConnectionUnavailableError @@ -28,36 +32,36 @@ def _get_db_for_model_if_available(model: Type["Model"]) -> Optional[str]: def siloed_atomic( using: Optional[str] = None, savepoint: bool = True, durable: bool = False ) -> Atomic: - using = determine_using_by_silo_mode(using) + validate_transaction_using_for_silo_mode(using) return _default_atomic_impl(using=using, savepoint=savepoint, durable=durable) def siloed_get_connection(using: Optional[str] = None) -> BaseDatabaseWrapper: - using = determine_using_by_silo_mode(using) + validate_transaction_using_for_silo_mode(using) return _default_get_connection(using=using) def siloed_on_commit(func: Callable[..., Any], using: Optional[str] = None) -> None: - using = determine_using_by_silo_mode(using) + validate_transaction_using_for_silo_mode(using) return _default_on_commit(func, using) -def determine_using_by_silo_mode(using: Optional[str]) -> str: +def validate_transaction_using_for_silo_mode(using: Optional[str]): from sentry.models import ControlOutbox, RegionOutbox from sentry.silo import SiloMode + if using is None: + raise TransactionMissingDBException("'using' must be specified when creating a transaction") + current_silo_mode = SiloMode.get_current_mode() control_db = _get_db_for_model_if_available(ControlOutbox) region_db = _get_db_for_model_if_available(RegionOutbox) - if not using: - using = region_db if current_silo_mode == SiloMode.REGION else control_db - assert using - both_silos_route_to_same_db = control_db == region_db if both_silos_route_to_same_db or current_silo_mode == SiloMode.MONOLITH: - pass + return + elif using == control_db and current_silo_mode != SiloMode.CONTROL: raise MismatchedSiloTransactionError( f"Cannot use transaction.atomic({using}) except in Control Mode" @@ -67,7 +71,6 @@ def determine_using_by_silo_mode(using: Optional[str]) -> str: raise MismatchedSiloTransactionError( f"Cannot use transaction.atomic({using}) except in Region Mode" ) - return using def patch_silo_aware_atomic(): diff --git a/src/sentry/silo/safety.py b/src/sentry/silo/safety.py index ff3d535a551f5..6c013f157aa47 100644 --- a/src/sentry/silo/safety.py +++ b/src/sentry/silo/safety.py @@ -8,7 +8,9 @@ from django.db.transaction import get_connection -from sentry.silo.patches.silo_aware_transaction_patch import determine_using_by_silo_mode +from sentry.silo.patches.silo_aware_transaction_patch import ( + validate_transaction_using_for_silo_mode, +) _fence_re = re.compile(r"select\s*\'(?Pstart|end)_role_override", re.IGNORECASE) _fencing_counters: MutableMapping[str, int] = defaultdict(int) @@ -19,7 +21,7 @@ def match_fence_query(query: str) -> Optional[re.Match[str]]: @contextlib.contextmanager -def unguarded_write(using: str | None = None, *args: Any, **kwargs: Any): +def unguarded_write(using: str, *args: Any, **kwargs: Any): """ Used to indicate that the wrapped block is safe to do mutations on outbox backed records. @@ -37,7 +39,8 @@ def unguarded_write(using: str | None = None, *args: Any, **kwargs: Any): yield return - using = determine_using_by_silo_mode(using) + validate_transaction_using_for_silo_mode(using) + _fencing_counters[using] += 1 with get_connection(using).cursor() as conn: diff --git a/src/sentry/testutils/factories.py b/src/sentry/testutils/factories.py index c6e2abd5d6bd7..0296d93261acc 100644 --- a/src/sentry/testutils/factories.py +++ b/src/sentry/testutils/factories.py @@ -21,7 +21,6 @@ from django.utils.text import slugify from sentry.constants import SentryAppInstallationStatus, SentryAppStatus -from sentry.db.postgres.transactions import in_test_hide_transaction_boundary from sentry.event_manager import EventManager from sentry.incidents.logic import ( create_alert_rule, @@ -377,10 +376,9 @@ def create_project(organization=None, teams=None, fire_project_created=False, ** for team in teams: project.add_team(team) if fire_project_created: - with in_test_hide_transaction_boundary(): - project_created.send( - project=project, user=AnonymousUser(), default_rules=True, sender=Factories - ) + project_created.send( + project=project, user=AnonymousUser(), default_rules=True, sender=Factories + ) return project @staticmethod diff --git a/src/sentry/testutils/hybrid_cloud.py b/src/sentry/testutils/hybrid_cloud.py index 72c58372dc4e3..54b7abc4678cf 100644 --- a/src/sentry/testutils/hybrid_cloud.py +++ b/src/sentry/testutils/hybrid_cloud.py @@ -22,6 +22,7 @@ from django.db import connections, transaction from django.db.backends.base.base import BaseDatabaseWrapper +from sentry.db.postgres.transactions import in_test_transaction_enforcement from sentry.models.organizationmember import OrganizationMember from sentry.models.organizationmembermapping import OrganizationMemberMapping from sentry.services.hybrid_cloud import DelegatedBySiloMode, hc_test_stub @@ -182,6 +183,9 @@ def __init__(self, alias: str): self.alias = alias def __call__(self, execute: Callable[..., Any], *params: Any) -> Any: + if not in_test_transaction_enforcement.enabled: + return execute(*params) + open_transactions = simulated_transaction_watermarks.connections_above_watermark() # If you are hitting this, it means you have two open transactions working in differing databases at the same # time. This is problematic in general for a variety of reasons -- it will never be possible to atomically diff --git a/src/sentry/web/frontend/unsubscribe_notifications.py b/src/sentry/web/frontend/unsubscribe_notifications.py index dc817aa5714dc..961e992b94233 100644 --- a/src/sentry/web/frontend/unsubscribe_notifications.py +++ b/src/sentry/web/frontend/unsubscribe_notifications.py @@ -1,6 +1,6 @@ import abc -from django.db import transaction +from django.db import router, transaction from django.http import Http404, HttpResponse, HttpResponseRedirect from django.utils.decorators import method_decorator from django.views.decorators.cache import never_cache @@ -19,7 +19,7 @@ class UnsubscribeBaseView(BaseView, metaclass=abc.ABCMeta): @never_cache @signed_auth_required_m def handle(self, request: Request, **kwargs) -> HttpResponse: - with transaction.atomic(): + with transaction.atomic(using=router.db_for_write(OrganizationMember)): if not getattr(request, "user_from_signed_request", False): raise Http404 diff --git a/tests/sentry/silo/test_silo_aware_transaction_patch.py b/tests/sentry/silo/test_silo_aware_transaction_patch.py index 60c2295f59002..0e5a8e4ac6ca8 100644 --- a/tests/sentry/silo/test_silo_aware_transaction_patch.py +++ b/tests/sentry/silo/test_silo_aware_transaction_patch.py @@ -8,6 +8,7 @@ from sentry.silo import SiloMode from sentry.silo.patches.silo_aware_transaction_patch import ( MismatchedSiloTransactionError, + TransactionMissingDBException, siloed_atomic, ) from sentry.testutils import TestCase @@ -18,31 +19,16 @@ def is_running_in_split_db_mode() -> bool: class TestSiloAwareTransactionPatchInSingleDbMode(TestCase): - @pytest.mark.skipif(is_running_in_split_db_mode(), reason="only runs in single db mode") - def test_routes_to_correct_db_in_control_silo(self): - with override_settings(SILO_MODE=SiloMode.CONTROL): - transaction_in_test = siloed_atomic() - assert transaction_in_test.using == "default" - - @pytest.mark.skipif(is_running_in_split_db_mode(), reason="only runs in single db mode") - def test_routes_to_correct_db_in_region_silo(self): - - with override_settings(SILO_MODE=SiloMode.REGION): - transaction_in_test = siloed_atomic() - assert transaction_in_test.using == "default" - def test_correctly_accepts_using_for_atomic(self): transaction_in_test = siloed_atomic(using="foobar") assert transaction_in_test.using == "foobar" + def test_accepts_cross_silo_atomics_in_monolith_mode(self): + siloed_atomic(using=router.db_for_write(Organization)) + siloed_atomic(using=router.db_for_write(OrganizationMapping)) -class TestSiloAwareTransactionPatchInSplitDbMode(TestCase): - @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode") - def test_routes_to_correct_db_in_control_silo(self): - with override_settings(SILO_MODE=SiloMode.REGION): - transaction_in_test = siloed_atomic() - assert transaction_in_test.using == "default" +class TestSiloAwareTransactionPatchInSplitDbMode(TestCase): @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode") def test_fails_if_silo_mismatch_with_using_in_region_silo(self): with override_settings(SILO_MODE=SiloMode.REGION), pytest.raises( @@ -50,15 +36,24 @@ def test_fails_if_silo_mismatch_with_using_in_region_silo(self): ): siloed_atomic(using=router.db_for_write(OrganizationMapping)) - @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode") - def test_routes_to_correct_db_in_region_silo(self): - with override_settings(SILO_MODE=SiloMode.CONTROL): - transaction_in_test = siloed_atomic() - assert transaction_in_test.using == "control" - @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode") def test_fails_if_silo_mismatch_with_using_in_control_silo(self): with override_settings(SILO_MODE=SiloMode.CONTROL), pytest.raises( MismatchedSiloTransactionError ): siloed_atomic(using=router.db_for_write(Organization)) + + @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode") + def test_fails_if_no_using_provided(self): + with pytest.raises(TransactionMissingDBException): + siloed_atomic() + + @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode") + def test_accepts_control_silo_routing_in_control_silo(self): + with override_settings(SILO_MODE=SiloMode.CONTROL): + siloed_atomic(using=router.db_for_write(OrganizationMapping)) + + @pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode") + def test_accepts_control_silo_routing_in_region_silo(self): + with override_settings(SILO_MODE=SiloMode.REGION): + siloed_atomic(using=router.db_for_write(Organization))