Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(hybrid-cloud): Removes transaction silo routing from patch code #53080

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/sentry/models/organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
sane_repr,
)
from sentry.db.models.utils import slugify_instance
from sentry.db.postgres.transactions import in_test_hide_transaction_boundary
from sentry.locks import locks
from sentry.models.options.option import OptionMixin
from sentry.models.organizationmember import OrganizationMember
Expand Down Expand Up @@ -323,7 +324,8 @@ def get_owners(self) -> Sequence[RpcUser]:
"user_id", flat=True
)

return user_service.get_many(filter={"user_ids": list(owners)})
with in_test_hide_transaction_boundary():
return user_service.get_many(filter={"user_ids": list(owners)})

def get_default_owner(self) -> RpcUser:
if not hasattr(self, "_default_owner"):
Expand Down
2 changes: 1 addition & 1 deletion src/sentry/services/hybrid_cloud/auth/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def authenticate(self, *, request: AuthenticationRequest) -> MiddlewareAuthentic
elif fake_request.user is not None and not fake_request.user.is_anonymous:
with transaction.atomic(using=router.db_for_read(User)):
result.user = self._load_auth_user(fake_request.user)
transaction.set_rollback(True)
transaction.set_rollback(True, using=router.db_for_read(User))
if SiloMode.single_process_silo_mode():
connections.close_all()

Expand Down
23 changes: 13 additions & 10 deletions src/sentry/silo/patches/silo_aware_transaction_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ class MismatchedSiloTransactionError(Exception):
pass


class TransactionMissingDBException(Exception):
pass


def _get_db_for_model_if_available(model: Type["Model"]) -> Optional[str]:
from sentry.db.router import SiloConnectionUnavailableError

Expand All @@ -28,36 +32,36 @@ def _get_db_for_model_if_available(model: Type["Model"]) -> Optional[str]:
def siloed_atomic(
using: Optional[str] = None, savepoint: bool = True, durable: bool = False
) -> Atomic:
using = determine_using_by_silo_mode(using)
validate_transaction_using_for_silo_mode(using)
return _default_atomic_impl(using=using, savepoint=savepoint, durable=durable)


def siloed_get_connection(using: Optional[str] = None) -> BaseDatabaseWrapper:
using = determine_using_by_silo_mode(using)
validate_transaction_using_for_silo_mode(using)
return _default_get_connection(using=using)


def siloed_on_commit(func: Callable[..., Any], using: Optional[str] = None) -> None:
using = determine_using_by_silo_mode(using)
validate_transaction_using_for_silo_mode(using)
return _default_on_commit(func, using)


def determine_using_by_silo_mode(using: Optional[str]) -> str:
def validate_transaction_using_for_silo_mode(using: Optional[str]):
from sentry.models import ControlOutbox, RegionOutbox
from sentry.silo import SiloMode

if using is None:
raise TransactionMissingDBException("'using' must be specified when creating a transaction")

current_silo_mode = SiloMode.get_current_mode()
control_db = _get_db_for_model_if_available(ControlOutbox)
region_db = _get_db_for_model_if_available(RegionOutbox)

if not using:
using = region_db if current_silo_mode == SiloMode.REGION else control_db
assert using

both_silos_route_to_same_db = control_db == region_db

if both_silos_route_to_same_db or current_silo_mode == SiloMode.MONOLITH:
pass
return

elif using == control_db and current_silo_mode != SiloMode.CONTROL:
raise MismatchedSiloTransactionError(
f"Cannot use transaction.atomic({using}) except in Control Mode"
Expand All @@ -67,7 +71,6 @@ def determine_using_by_silo_mode(using: Optional[str]) -> str:
raise MismatchedSiloTransactionError(
f"Cannot use transaction.atomic({using}) except in Region Mode"
)
return using


def patch_silo_aware_atomic():
Expand Down
9 changes: 6 additions & 3 deletions src/sentry/silo/safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

from django.db.transaction import get_connection

from sentry.silo.patches.silo_aware_transaction_patch import determine_using_by_silo_mode
from sentry.silo.patches.silo_aware_transaction_patch import (
validate_transaction_using_for_silo_mode,
)

_fence_re = re.compile(r"select\s*\'(?P<operation>start|end)_role_override", re.IGNORECASE)
_fencing_counters: MutableMapping[str, int] = defaultdict(int)
Expand All @@ -19,7 +21,7 @@ def match_fence_query(query: str) -> Optional[re.Match[str]]:


@contextlib.contextmanager
def unguarded_write(using: str | None = None, *args: Any, **kwargs: Any):
def unguarded_write(using: str, *args: Any, **kwargs: Any):
"""
Used to indicate that the wrapped block is safe to do
mutations on outbox backed records.
Expand All @@ -37,7 +39,8 @@ def unguarded_write(using: str | None = None, *args: Any, **kwargs: Any):
yield
return

using = determine_using_by_silo_mode(using)
validate_transaction_using_for_silo_mode(using)

_fencing_counters[using] += 1

with get_connection(using).cursor() as conn:
Expand Down
8 changes: 3 additions & 5 deletions src/sentry/testutils/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from django.utils.text import slugify

from sentry.constants import SentryAppInstallationStatus, SentryAppStatus
from sentry.db.postgres.transactions import in_test_hide_transaction_boundary
from sentry.event_manager import EventManager
from sentry.incidents.logic import (
create_alert_rule,
Expand Down Expand Up @@ -377,10 +376,9 @@ def create_project(organization=None, teams=None, fire_project_created=False, **
for team in teams:
project.add_team(team)
if fire_project_created:
with in_test_hide_transaction_boundary():
project_created.send(
project=project, user=AnonymousUser(), default_rules=True, sender=Factories
)
project_created.send(
project=project, user=AnonymousUser(), default_rules=True, sender=Factories
)
return project

@staticmethod
Expand Down
4 changes: 4 additions & 0 deletions src/sentry/testutils/hybrid_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from django.db import connections, transaction
from django.db.backends.base.base import BaseDatabaseWrapper

from sentry.db.postgres.transactions import in_test_transaction_enforcement
from sentry.models.organizationmember import OrganizationMember
from sentry.models.organizationmembermapping import OrganizationMemberMapping
from sentry.services.hybrid_cloud import DelegatedBySiloMode, hc_test_stub
Expand Down Expand Up @@ -182,6 +183,9 @@ def __init__(self, alias: str):
self.alias = alias

def __call__(self, execute: Callable[..., Any], *params: Any) -> Any:
if not in_test_transaction_enforcement.enabled:
return execute(*params)

open_transactions = simulated_transaction_watermarks.connections_above_watermark()
# If you are hitting this, it means you have two open transactions working in differing databases at the same
# time. This is problematic in general for a variety of reasons -- it will never be possible to atomically
Expand Down
4 changes: 2 additions & 2 deletions src/sentry/web/frontend/unsubscribe_notifications.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import abc

from django.db import transaction
from django.db import router, transaction
from django.http import Http404, HttpResponse, HttpResponseRedirect
from django.utils.decorators import method_decorator
from django.views.decorators.cache import never_cache
Expand All @@ -19,7 +19,7 @@ class UnsubscribeBaseView(BaseView, metaclass=abc.ABCMeta):
@never_cache
@signed_auth_required_m
def handle(self, request: Request, **kwargs) -> HttpResponse:
with transaction.atomic():
with transaction.atomic(router.db_for_write(OrganizationMember)):
if not getattr(request, "user_from_signed_request", False):
raise Http404

Expand Down
45 changes: 20 additions & 25 deletions tests/sentry/silo/test_silo_aware_transaction_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from sentry.silo import SiloMode
from sentry.silo.patches.silo_aware_transaction_patch import (
MismatchedSiloTransactionError,
TransactionMissingDBException,
siloed_atomic,
)
from sentry.testutils import TestCase
Expand All @@ -18,47 +19,41 @@ def is_running_in_split_db_mode() -> bool:


class TestSiloAwareTransactionPatchInSingleDbMode(TestCase):
@pytest.mark.skipif(is_running_in_split_db_mode(), reason="only runs in single db mode")
def test_routes_to_correct_db_in_control_silo(self):
with override_settings(SILO_MODE=SiloMode.CONTROL):
transaction_in_test = siloed_atomic()
assert transaction_in_test.using == "default"

@pytest.mark.skipif(is_running_in_split_db_mode(), reason="only runs in single db mode")
def test_routes_to_correct_db_in_region_silo(self):

with override_settings(SILO_MODE=SiloMode.REGION):
transaction_in_test = siloed_atomic()
assert transaction_in_test.using == "default"

def test_correctly_accepts_using_for_atomic(self):
transaction_in_test = siloed_atomic(using="foobar")
assert transaction_in_test.using == "foobar"

def test_accepts_cross_silo_atomics_in_monolith_mode(self):
siloed_atomic(using=router.db_for_write(Organization))
siloed_atomic(using=router.db_for_write(OrganizationMapping))

class TestSiloAwareTransactionPatchInSplitDbMode(TestCase):
@pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
def test_routes_to_correct_db_in_control_silo(self):
with override_settings(SILO_MODE=SiloMode.REGION):
transaction_in_test = siloed_atomic()
assert transaction_in_test.using == "default"

class TestSiloAwareTransactionPatchInSplitDbMode(TestCase):
@pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
def test_fails_if_silo_mismatch_with_using_in_region_silo(self):
with override_settings(SILO_MODE=SiloMode.REGION), pytest.raises(
MismatchedSiloTransactionError
):
siloed_atomic(using=router.db_for_write(OrganizationMapping))

@pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
def test_routes_to_correct_db_in_region_silo(self):
with override_settings(SILO_MODE=SiloMode.CONTROL):
transaction_in_test = siloed_atomic()
assert transaction_in_test.using == "control"

@pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
def test_fails_if_silo_mismatch_with_using_in_control_silo(self):
with override_settings(SILO_MODE=SiloMode.CONTROL), pytest.raises(
MismatchedSiloTransactionError
):
siloed_atomic(using=router.db_for_write(Organization))

@pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
def test_fails_if_no_using_provided(self):
with pytest.raises(TransactionMissingDBException):
siloed_atomic()

@pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
def test_accepts_control_silo_routing_in_control_silo(self):
with override_settings(SILO_MODE=SiloMode.CONTROL):
siloed_atomic(using=router.db_for_write(OrganizationMapping))

@pytest.mark.skipif(not is_running_in_split_db_mode(), reason="only runs in split db mode")
def test_accepts_control_silo_routing_in_region_silo(self):
with override_settings(SILO_MODE=SiloMode.REGION):
siloed_atomic(using=router.db_for_write(Organization))