Skip to content

Commit

Permalink
fix(hc): Support split db monolith mode via smarter delegators (#53365)
Browse files Browse the repository at this point in the history
Monolith mode delegators now *also* pick their implementation by
attempting to examine any open transactions and selecting for an
implementation that would align with the currently open tranaction.

This fixes the subtle issue for hybrid cloud services like the
log_service, as well as some helper methods on the organization_service,
that do not invoke RPC, but rather, *pick different models* locally
based on the silo mode. For Monolith mode, this means picking one model
that works on one db, even if being invoked inside the transaction of
the other db, threatening the transactional atomicity that is desired.
(`schedule_signal`, for instance, hopes to leave an outbox behind in the
same transaction as invocation).

The Sentry Options backend *may* also need this, but unclear. Going to
analyze split db tests after this lands and follow up.
  • Loading branch information
corps authored and armenzg committed Jul 24, 2023
1 parent aa1d06b commit d5388e3
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 32 deletions.
61 changes: 60 additions & 1 deletion src/sentry/services/hybrid_cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import pydantic
import sentry_sdk
from django.db import router
from django.db.models import Model
from typing_extensions import Self

from sentry.db.postgres.transactions import in_test_assert_no_transaction
Expand Down Expand Up @@ -226,6 +228,40 @@ def __getattr__(self, item: str) -> Any:
raise KeyError(f"No implementation found for {cur_mode}.")


class DelegatedByOpenTransaction(Generic[ServiceInterface]):
"""
It is possible to run monolith mode in a split database scenario -- in this case, the silo mode does not help
select the correct implementation to ensure non mingled transactions. This helper picks a backing implementation
by checking if an open transaction exists for the routing of the given model for a backend implementation.
If no transactions are open, it uses a given default implementation instead.
"""

_constructors: Mapping[Type[Model], Callable[[], ServiceInterface]]
_default: Callable[[], ServiceInterface]

def __init__(
self,
mapping: Mapping[Type[Model], Callable[[], ServiceInterface]],
default: Callable[[], ServiceInterface],
):
self._constructors = mapping
self._default = default

def __getattr__(self, item: str) -> Any:
from sentry.testutils.hybrid_cloud import simulated_transaction_watermarks

for model, constructor in self._constructors.items():
if (
simulated_transaction_watermarks.connection_transaction_depth_above_watermark(
using=router.db_for_write(model)
)
> 0
):
return getattr(constructor(), item)
return getattr(self._default(), item)


hc_test_stub: Any = threading.local()


Expand Down Expand Up @@ -299,8 +335,31 @@ def silo_mode_delegation(
"""
Simply creates a DelegatedBySiloMode from a mapping object, but casts it as a ServiceInterface matching
the mapping values.
In split database mode, it will also inject DelegatedByOpenTransaction in for the monolith mode implementation.
"""
return cast(ServiceInterface, DelegatedBySiloMode(mapping))

def delegator() -> ServiceInterface:
from sentry.models import Organization, User

return cast(
ServiceInterface,
DelegatedByOpenTransaction(
{
User: mapping[SiloMode.CONTROL],
Organization: mapping[SiloMode.REGION],
},
mapping[SiloMode.MONOLITH],
),
)

# We need to retain a closure around the original mapping passed in, so we'll use a new variable here
final_mapping: Mapping[SiloMode, Callable[[], ServiceInterface]] = {
SiloMode.MONOLITH: delegator,
**({k: v for k, v in mapping.items() if k != SiloMode.MONOLITH}),
}

return cast(ServiceInterface, DelegatedBySiloMode(final_mapping))


def coerce_id_from(m: object | int | None) -> int | None:
Expand Down
59 changes: 28 additions & 31 deletions src/sentry/services/hybrid_cloud/log/impl.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,43 @@
from __future__ import annotations

from django.db import IntegrityError, router
from django.db import IntegrityError

from sentry.db.postgres.transactions import django_test_transaction_water_mark
from sentry.models import AuditLogEntry, OutboxCategory, OutboxScope, RegionOutbox, UserIP
from sentry.services.hybrid_cloud.log import AuditLogEvent, LogService, UserIpEvent
from sentry.utils import metrics


class DatabaseBackedLogService(LogService):
def record_audit_log(self, *, event: AuditLogEvent) -> None:
with django_test_transaction_water_mark(router.db_for_write(RegionOutbox)):
entry = AuditLogEntry.from_event(event)
try:
entry.save()
except IntegrityError as e:
error_message = str(e)
if '"auth_user"' in error_message:
# It is possible that a user existed at the time of serialization but was deleted by the time of consumption
# in which case we follow the database's SET NULL on delete handling.
entry.actor_user_id = None
return self.record_audit_log(event=event)
else:
raise
entry = AuditLogEntry.from_event(event)
try:
entry.save()
except IntegrityError as e:
error_message = str(e)
if '"auth_user"' in error_message:
# It is possible that a user existed at the time of serialization but was deleted by the time of consumption
# in which case we follow the database's SET NULL on delete handling.
entry.actor_user_id = None
return self.record_audit_log(event=event)
else:
raise

def record_user_ip(self, *, event: UserIpEvent) -> None:
with django_test_transaction_water_mark(router.db_for_write(RegionOutbox)):
updated, created = UserIP.objects.create_or_update(
user_id=event.user_id,
ip_address=event.ip_address,
values=dict(
last_seen=event.last_seen,
country_code=event.country_code,
region_code=event.region_code,
),
)
if not created and not updated:
# This happens when there is an integrity error adding the UserIP -- such as when user is deleted,
# or the ip address does not match the db validation. This is expected and not an error condition
# in low quantities.
# TODO: Break the foreign key and simply remove this code path.
metrics.incr("hybrid_cloud.audit_log.user_ip_event.stale_event")
updated, created = UserIP.objects.create_or_update(
user_id=event.user_id,
ip_address=event.ip_address,
values=dict(
last_seen=event.last_seen,
country_code=event.country_code,
region_code=event.region_code,
),
)
if not created and not updated:
# This happens when there is an integrity error adding the UserIP -- such as when user is deleted,
# or the ip address does not match the db validation. This is expected and not an error condition
# in low quantities.
# TODO: Break the foreign key and simply remove this code path.
metrics.incr("hybrid_cloud.audit_log.user_ip_event.stale_event")

def find_last_log(
self, *, organization_id: int | None, target_object_id: int | None, event: int | None
Expand Down
42 changes: 42 additions & 0 deletions tests/sentry/db/test_transactions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from typing import Any

import pytest
from django.db import IntegrityError, router, transaction
from django.test import override_settings

from sentry.conf.server import env
from sentry.db.postgres.transactions import (
Expand All @@ -8,6 +11,8 @@
in_test_hide_transaction_boundary,
)
from sentry.models import Organization, User, outbox_context
from sentry.services.hybrid_cloud import silo_mode_delegation
from sentry.silo import SiloMode
from sentry.testutils import TestCase, TransactionTestCase
from sentry.testutils.factories import Factories
from sentry.testutils.hybrid_cloud import collect_transaction_queries
Expand Down Expand Up @@ -154,3 +159,40 @@ def test_bad_transaction_boundaries(self):
@django_db_all
def test_collect_transaction_queries(self):
super().test_collect_transaction_queries()


class FakeControlService:
def a(self) -> int:
return 1


class FakeRegionService:
def a(self) -> int:
return 2


@no_silo_test(stable=True)
class TestDelegatedByOpenTransaction(TestCase):
def test_selects_mode_in_transaction_or_default(self):
service: Any = silo_mode_delegation(
{
SiloMode.CONTROL: lambda: FakeControlService(),
SiloMode.REGION: lambda: FakeRegionService(),
SiloMode.MONOLITH: lambda: FakeRegionService(),
}
)

with override_settings(SILO_MODE=SiloMode.CONTROL):
assert service.a() == FakeControlService().a()
with transaction.atomic(router.db_for_write(User)):
assert service.a() == FakeControlService().a()

with override_settings(SILO_MODE=SiloMode.REGION):
assert service.a() == FakeRegionService().a()
with transaction.atomic(router.db_for_write(Organization)):
assert service.a() == FakeRegionService().a()

with override_settings(SILO_MODE=SiloMode.MONOLITH):
assert service.a() == FakeRegionService().a()
with transaction.atomic(router.db_for_write(User)):
assert service.a() == FakeControlService().a()

0 comments on commit d5388e3

Please sign in to comment.