diff --git a/fixtures/backup/model_dependencies/detailed.json b/fixtures/backup/model_dependencies/detailed.json index b7c16cbbb8bb0b..85f0d35084596d 100644 --- a/fixtures/backup/model_dependencies/detailed.json +++ b/fixtures/backup/model_dependencies/detailed.json @@ -5099,6 +5099,10 @@ [ "file", "relocation" + ], + [ + "kind", + "relocation" ] ] }, diff --git a/migrations_lockfile.txt b/migrations_lockfile.txt index 41e71a776cc07c..f79b4f0b7449cf 100644 --- a/migrations_lockfile.txt +++ b/migrations_lockfile.txt @@ -10,6 +10,6 @@ hybridcloud: 0016_add_control_cacheversion nodestore: 0002_nodestore_no_dictfield remote_subscriptions: 0003_drop_remote_subscription replays: 0004_index_together -sentry: 0739_backfill_group_info_to_group_attributes +sentry: 0740_one_relocation_file_kind_per_relocation social_auth: 0002_default_auto_field uptime: 0004_projectuptimesubscription_mode diff --git a/src/sentry/api/endpoints/relocations/index.py b/src/sentry/api/endpoints/relocations/index.py index f7f790c61162c7..33e286da3bf0b4 100644 --- a/src/sentry/api/endpoints/relocations/index.py +++ b/src/sentry/api/endpoints/relocations/index.py @@ -29,7 +29,7 @@ from sentry.search.utils import tokenize_query from sentry.signals import relocation_link_promo_code from sentry.slug.patterns import ORG_SLUG_PATTERN -from sentry.tasks.relocation import uploading_complete +from sentry.tasks.relocation import uploading_start from sentry.users.services.user.model import RpcUser from sentry.users.services.user.service import user_service from sentry.utils.db import atomic_transaction @@ -277,7 +277,7 @@ def post(self, request: Request) -> Response: relocation_link_promo_code.send_robust( relocation_uuid=relocation.uuid, promo_code=promo_code, sender=self.__class__ ) - uploading_complete.delay(relocation.uuid) + uploading_start.delay(relocation.uuid) try: analytics.record( "relocation.created", diff --git a/src/sentry/api/endpoints/relocations/retry.py b/src/sentry/api/endpoints/relocations/retry.py index f76eee062d2014..d452e1dc6e29e4 100644 --- a/src/sentry/api/endpoints/relocations/retry.py +++ b/src/sentry/api/endpoints/relocations/retry.py @@ -22,7 +22,7 @@ from sentry.models.files.file import File from sentry.models.relocation import Relocation, RelocationFile from sentry.signals import relocation_retry_link_promo_code -from sentry.tasks.relocation import uploading_complete +from sentry.tasks.relocation import uploading_start from sentry.users.services.user.service import user_service from sentry.utils.db import atomic_transaction @@ -125,7 +125,7 @@ def post(self, request: Request, relocation_uuid: str) -> Response: kind=RelocationFile.Kind.RAW_USER_DATA.value, ) - uploading_complete.delay(new_relocation.uuid) + uploading_start.delay(new_relocation.uuid) try: analytics.record( "relocation.created", diff --git a/src/sentry/migrations/0740_one_relocation_file_kind_per_relocation.py b/src/sentry/migrations/0740_one_relocation_file_kind_per_relocation.py new file mode 100644 index 00000000000000..3dbac2f24130fb --- /dev/null +++ b/src/sentry/migrations/0740_one_relocation_file_kind_per_relocation.py @@ -0,0 +1,32 @@ +# Generated by Django 5.0.6 on 2024-07-08 22:33 + +from django.db import migrations + +from sentry.new_migrations.migrations import CheckedMigration + + +class Migration(CheckedMigration): + # This flag is used to mark that a migration shouldn't be automatically run in production. + # This should only be used for operations where it's safe to run the migration after your + # code has deployed. So this should not be used for most operations that alter the schema + # of a table. + # Here are some things that make sense to mark as post deployment: + # - Large data migrations. Typically we want these to be run manually so that they can be + # monitored and not block the deploy for a long period of time while they run. + # - Adding indexes to large tables. Since this can take a long time, we'd generally prefer to + # run this outside deployments so that we don't block them. Note that while adding an index + # is a schema change, it's completely safe to run the operation after the code has deployed. + # Once deployed, run these manually via: https://develop.sentry.dev/database-migrations/#migration-deployment + + is_post_deployment = False + + dependencies = [ + ("sentry", "0739_backfill_group_info_to_group_attributes"), + ] + + operations = [ + migrations.AlterUniqueTogether( + name="relocationfile", + unique_together={("relocation", "file"), ("relocation", "kind")}, + ), + ] diff --git a/src/sentry/models/outbox.py b/src/sentry/models/outbox.py index 92e70e0ae321d4..ec63973369f3b2 100644 --- a/src/sentry/models/outbox.py +++ b/src/sentry/models/outbox.py @@ -97,6 +97,9 @@ class OutboxCategory(IntEnum): ISSUE_COMMENT_UPDATE = 34 EXTERNAL_ACTOR_UPDATE = 35 + RELOCATION_EXPORT_REQUEST = 36 + RELOCATION_EXPORT_REPLY = 37 + @classmethod def as_choices(cls): return [(i.value, i.value) for i in cls] @@ -343,6 +346,9 @@ class OutboxScope(IntEnum): }, ) SUBSCRIPTION_SCOPE = scope_categories(9, {OutboxCategory.SUBSCRIPTION_UPDATE}) + RELOCATION_SCOPE = scope_categories( + 10, {OutboxCategory.RELOCATION_EXPORT_REQUEST, OutboxCategory.RELOCATION_EXPORT_REPLY} + ) def __str__(self): return self.name diff --git a/src/sentry/models/relocation.py b/src/sentry/models/relocation.py index 56380fb10d5684..a66a229f85f1d7 100644 --- a/src/sentry/models/relocation.py +++ b/src/sentry/models/relocation.py @@ -201,18 +201,18 @@ class Kind(Enum): # # TODO(getsentry/team-ospo#216): Add a normalization step to the relocation flow NORMALIZED_USER_DATA = 2 - # (Deprecated) The global configuration we're going to validate against - pulled from the - # live Sentry instance, not supplied by the user. + # The global configuration we're going to validate against - pulled from the live Sentry + # instance, not supplied by the user. # - # TODO(getsentry/team-ospo#216): Deprecated, since we no longer store these in main bucket. - # Remove in the future. + # Note: These files are only ever stored in the relocation-specific GCP bucket, never in the + # main filestore, so in practice no DB entry should have this value set. BASELINE_CONFIG_VALIDATION_DATA = 3 # (Deprecated) The colliding users we're going to validate against - pulled from the live # Sentry instance, not supplied by the user. However, to determine what is a "colliding # user", we must inspect the user-provided data. # - # TODO(getsentry/team-ospo#216): Deprecated, since we no longer store these in main bucket. - # Remove in the future. + # Note: These files are only ever stored in the relocation-specific GCP bucket, never in the + # main filestore, so in practice no DB entry should have this value set. COLLIDING_USERS_VALIDATION_DATA = 4 # TODO(getsentry/team-ospo#190): Could we dedup this with a mixin in the future? @@ -242,7 +242,7 @@ def to_filename(self, ext: str): __repr__ = sane_repr("relocation", "file") class Meta: - unique_together = (("relocation", "file"),) + unique_together = (("relocation", "file"), ("relocation", "kind")) app_label = "sentry" db_table = "sentry_relocationfile" diff --git a/src/sentry/receivers/outbox/control.py b/src/sentry/receivers/outbox/control.py index ccd0604c88b545..567466a850dc88 100644 --- a/src/sentry/receivers/outbox/control.py +++ b/src/sentry/receivers/outbox/control.py @@ -5,6 +5,7 @@ are drained. Receivers are expected to make local state changes (tombstones) and perform RPC calls to propagate changes to relevant region(s). """ + from __future__ import annotations import logging @@ -17,6 +18,7 @@ from sentry.hybridcloud.rpc.caching import region_caching_service from sentry.issues.services.issue import issue_service from sentry.models.apiapplication import ApiApplication +from sentry.models.files.utils import get_relocation_storage from sentry.models.integrations.integration import Integration from sentry.models.integrations.sentry_app import SentryApp from sentry.models.integrations.sentry_app_installation import SentryAppInstallation @@ -24,6 +26,7 @@ from sentry.models.outbox import OutboxCategory, process_control_outbox from sentry.organizations.services.organization import RpcOrganizationSignal, organization_service from sentry.receivers.outbox import maybe_process_tombstone +from sentry.relocation.services.relocation_export.service import region_relocation_export_service from sentry.sentry_apps.services.app.service import get_by_application_id, get_installation logger = logging.getLogger(__name__) @@ -127,3 +130,47 @@ def process_issue_email_reply(shard_identifier: int, payload: Any, **kwds): from_email=payload["from_email"], text=payload["text"], ) + + +# See the comment on /src/sentry/tasks/relocation.py::uploading_start for a detailed description of +# how this outbox drain handler fits into the entire SAAS->SAAS relocation workflow. +@receiver(process_control_outbox, sender=OutboxCategory.RELOCATION_EXPORT_REQUEST) +def process_relocation_request_new_export(payload: Mapping[str, Any], **kwds): + encrypt_with_public_key = ( + payload["encrypt_with_public_key"].encode("utf-8") + if isinstance(payload["encrypt_with_public_key"], str) + else payload["encrypt_with_public_key"] + ) + region_relocation_export_service.request_new_export( + relocation_uuid=payload["relocation_uuid"], + requesting_region_name=payload["requesting_region_name"], + replying_region_name=payload["replying_region_name"], + org_slug=payload["org_slug"], + encrypt_with_public_key=encrypt_with_public_key, + ) + + +# See the comment on /src/sentry/tasks/relocation.py::uploading_start for a detailed description of +# how this outbox drain handler fits into the entire SAAS->SAAS relocation workflow. +@receiver(process_control_outbox, sender=OutboxCategory.RELOCATION_EXPORT_REPLY) +def process_relocation_reply_with_export(payload: Mapping[str, Any], **kwds): + # We expect the `ProxyRelocationExportService::reply_with_export` implementation to have written + # the export data to the control silo's local relocation-specific GCS bucket. Here, we just read + # it into memory and attempt the RPC call back to the requesting region. + uuid = payload["relocation_uuid"] + slug = payload["org_slug"] + relocation_storage = get_relocation_storage() + path = f"runs/{uuid}/saas_to_saas_export/{slug}.tar" + try: + encrypted_contents = relocation_storage.open(path) + except Exception: + raise FileNotFoundError("Could not open SaaS -> SaaS export in proxy relocation bucket.") + + with encrypted_contents: + region_relocation_export_service.reply_with_export( + relocation_uuid=payload["relocation_uuid"], + requesting_region_name=payload["requesting_region_name"], + replying_region_name=payload["replying_region_name"], + org_slug=payload["org_slug"], + encrypted_contents=encrypted_contents.read(), + ) diff --git a/src/sentry/receivers/outbox/region.py b/src/sentry/receivers/outbox/region.py index 5044ffa2370de4..d3db0173d02438 100644 --- a/src/sentry/receivers/outbox/region.py +++ b/src/sentry/receivers/outbox/region.py @@ -5,6 +5,7 @@ are drained. Receivers are expected to make local state changes (tombstones) and perform RPC calls to propagate changes to Control Silo. """ + from __future__ import annotations from typing import Any @@ -20,10 +21,12 @@ update_organization_mapping_from_instance, ) from sentry.models.authproviderreplica import AuthProviderReplica +from sentry.models.files.utils import get_relocation_storage from sentry.models.organization import Organization from sentry.models.outbox import OutboxCategory, process_region_outbox from sentry.models.project import Project from sentry.receivers.outbox import maybe_process_tombstone +from sentry.relocation.services.relocation_export.service import control_relocation_export_service from sentry.types.region import get_local_region @@ -71,3 +74,28 @@ def process_disable_auth_provider(object_identifier: int, shard_identifier: int, # Deprecated auth_service.disable_provider(provider_id=object_identifier) AuthProviderReplica.objects.filter(auth_provider_id=object_identifier).delete() + + +# See the comment on /src/sentry/tasks/relocation.py::uploading_start for a detailed description of +# how this outbox drain handler fits into the entire SAAS->SAAS relocation workflow. +@receiver(process_region_outbox, sender=OutboxCategory.RELOCATION_EXPORT_REPLY) +def process_relocation_reply_with_export(payload: Any, **kwds): + uuid = payload["relocation_uuid"] + slug = payload["org_slug"] + relocation_storage = get_relocation_storage() + path = f"runs/{uuid}/saas_to_saas_export/{slug}.tar" + try: + encrypted_contents = relocation_storage.open(path) + except Exception: + raise FileNotFoundError( + "Could not open SaaS -> SaaS export in export-side relocation bucket." + ) + + with encrypted_contents: + control_relocation_export_service.reply_with_export( + relocation_uuid=uuid, + requesting_region_name=payload["requesting_region_name"], + replying_region_name=payload["replying_region_name"], + org_slug=slug, + encrypted_contents=encrypted_contents.read(), + ) diff --git a/src/sentry/relocation/services/relocation_export/__init__.py b/src/sentry/relocation/services/relocation_export/__init__.py new file mode 100644 index 00000000000000..2a9746c30ef42c --- /dev/null +++ b/src/sentry/relocation/services/relocation_export/__init__.py @@ -0,0 +1,2 @@ +from .model import * # noqa +from .service import * # noqa diff --git a/src/sentry/relocation/services/relocation_export/impl.py b/src/sentry/relocation/services/relocation_export/impl.py new file mode 100644 index 00000000000000..da99753fdb8fb4 --- /dev/null +++ b/src/sentry/relocation/services/relocation_export/impl.py @@ -0,0 +1,205 @@ +# Please do not use +# from __future__ import annotations +# in modules such as this one where hybrid cloud data models or service classes are +# defined, because we want to reflect on type annotations and avoid forward references. + +import logging +from datetime import UTC, datetime +from io import BytesIO +from uuid import UUID + +from django.db import router +from sentry_sdk import capture_exception + +from sentry.models.files.file import File +from sentry.models.files.utils import get_relocation_storage +from sentry.models.outbox import ControlOutbox, OutboxCategory, OutboxScope +from sentry.models.relocation import Relocation, RelocationFile +from sentry.relocation.services.relocation_export.model import ( + RelocationExportReplyWithExportParameters, + RelocationExportRequestNewExportParameters, +) +from sentry.relocation.services.relocation_export.service import ( + ControlRelocationExportService, + RegionRelocationExportService, +) +from sentry.utils.db import atomic_transaction +from sentry.utils.relocation import RELOCATION_BLOB_SIZE, RELOCATION_FILE_TYPE, uuid_to_identifier + +logger = logging.getLogger(__name__) + + +class DBBackedRelocationExportService(RegionRelocationExportService): + def request_new_export( + self, + *, + relocation_uuid: str, + requesting_region_name: str, + replying_region_name: str, + org_slug: str, + encrypt_with_public_key: bytes, + ) -> None: + from sentry.tasks.relocation import fulfill_cross_region_export_request + + logger_data = { + "uuid": relocation_uuid, + "requesting_region_name": requesting_region_name, + "replying_region_name": replying_region_name, + "org_slug": org_slug, + "encrypted_contents_size": len(encrypt_with_public_key), + } + logger.info("SaaS -> SaaS request received in exporting region", extra=logger_data) + + # This task will do the actual work of performing the export and saving it to this regions + # "relocation" GCS bucket. It is annotated with the appropriate retry, back-off etc logic + # for robustness' sake. The last action performed by this task is to call an instance of + # `ControlRelocationExportService.reply_with_export` via a manually-scheduled + # `RegionOutbox`, which will handle the task of asynchronously delivering the encrypted, + # newly-exported bytes. + fulfill_cross_region_export_request.apply_async( + args=[ + relocation_uuid, + requesting_region_name, + replying_region_name, + org_slug, + encrypt_with_public_key, + int(round(datetime.now(tz=UTC).timestamp())), + ] + ) + logger.info("SaaS -> SaaS exporting task scheduled", extra=logger_data) + + def reply_with_export( + self, + *, + relocation_uuid: str, + requesting_region_name: str, + replying_region_name: str, + org_slug: str, + encrypted_contents: bytes, + ) -> None: + from sentry.tasks.relocation import uploading_complete + + with atomic_transaction( + using=( + router.db_for_write(Relocation), + router.db_for_write(RelocationFile), + router.db_for_write(File), + ) + ): + logger_data = { + "uuid": relocation_uuid, + "requesting_region_name": requesting_region_name, + "replying_region_name": replying_region_name, + "org_slug": org_slug, + "encrypted_contents_size": len(encrypted_contents), + } + logger.info("SaaS -> SaaS reply received in triggering region", extra=logger_data) + + try: + relocation: Relocation = Relocation.objects.get(uuid=relocation_uuid) + except Relocation.DoesNotExist as e: + logger.exception("Could not locate Relocation model by UUID: %s", relocation_uuid) + capture_exception(e) + return + + fp = BytesIO(encrypted_contents) + file = File.objects.create(name="raw-relocation-data.tar", type=RELOCATION_FILE_TYPE) + file.putfile(fp, blob_size=RELOCATION_BLOB_SIZE, logger=logger) + logger.info("SaaS -> SaaS relocation underlying File created", extra=logger_data) + + # This write ensures that the entire chain triggered by `uploading_start` remains + # idempotent, since only one (relocation_uuid, relocation_file_kind) pairing can exist + # in that database's table at a time. If we try to write a second, it will fail due to + # that unique constraint. + RelocationFile.objects.create( + relocation=relocation, + file=file, + kind=RelocationFile.Kind.RAW_USER_DATA.value, + ) + logger.info("SaaS -> SaaS relocation RelocationFile saved", extra=logger_data) + + uploading_complete.apply_async(args=[relocation.uuid]) + logger.info("SaaS -> SaaS relocation next task scheduled", extra=logger_data) + + +class ProxyingRelocationExportService(ControlRelocationExportService): + def request_new_export( + self, + *, + relocation_uuid: str, + requesting_region_name: str, + replying_region_name: str, + org_slug: str, + encrypt_with_public_key: bytes, + ) -> None: + logger_data = { + "uuid": relocation_uuid, + "requesting_region_name": requesting_region_name, + "replying_region_name": replying_region_name, + "org_slug": org_slug, + "encrypt_with_public_key_size": len(encrypt_with_public_key), + } + logger.info("SaaS -> SaaS request received on proxy", extra=logger_data) + + # Saving this outbox "proxies" the request to the correct region. + identifier = uuid_to_identifier(UUID(relocation_uuid)) + payload = RelocationExportRequestNewExportParameters( + relocation_uuid=relocation_uuid, + requesting_region_name=requesting_region_name, + replying_region_name=replying_region_name, + org_slug=org_slug, + encrypt_with_public_key=encrypt_with_public_key, + ).dict() + ControlOutbox( + region_name=replying_region_name, + shard_scope=OutboxScope.RELOCATION_SCOPE, + category=OutboxCategory.RELOCATION_EXPORT_REQUEST, + shard_identifier=identifier, + object_identifier=identifier, + payload=payload, + ).save() + logger.info("SaaS -> SaaS request proxy outbox saved", extra=logger_data) + + def reply_with_export( + self, + *, + relocation_uuid: str, + requesting_region_name: str, + replying_region_name: str, + org_slug: str, + encrypted_contents: bytes, + ) -> None: + logger_data = { + "uuid": relocation_uuid, + "requesting_region_name": requesting_region_name, + "replying_region_name": replying_region_name, + "org_slug": org_slug, + "encrypt_with_public_key_size": len(encrypted_contents), + } + logger.info("SaaS -> SaaS reply received on proxy", extra=logger_data) + + # Save the payload into the control silo's "relocation" GCS bucket. This bucket is only used + # for temporary storage of `encrypted_contents` being shuffled between regions like this. + path = f"runs/{relocation_uuid}/saas_to_saas_export/{org_slug}.tar" + relocation_storage = get_relocation_storage() + fp = BytesIO(encrypted_contents) + relocation_storage.save(path, fp) + logger.info("SaaS -> SaaS export contents retrieved", extra=logger_data) + + # Saving this outbox "proxies" the reply to the correct region. + identifier = uuid_to_identifier(UUID(relocation_uuid)) + payload = RelocationExportReplyWithExportParameters( + relocation_uuid=relocation_uuid, + requesting_region_name=requesting_region_name, + replying_region_name=replying_region_name, + org_slug=org_slug, + ).dict() + ControlOutbox( + region_name=requesting_region_name, + shard_scope=OutboxScope.RELOCATION_SCOPE, + category=OutboxCategory.RELOCATION_EXPORT_REPLY, + shard_identifier=identifier, + object_identifier=identifier, + payload=payload, + ).save() + logger.info("SaaS -> SaaS reply proxy outbox saved", extra=logger_data) diff --git a/src/sentry/relocation/services/relocation_export/model.py b/src/sentry/relocation/services/relocation_export/model.py new file mode 100644 index 00000000000000..0c2102f381a429 --- /dev/null +++ b/src/sentry/relocation/services/relocation_export/model.py @@ -0,0 +1,17 @@ +import pydantic + + +class RelocationExportRequestNewExportParameters(pydantic.BaseModel): + relocation_uuid: str + requesting_region_name: str + replying_region_name: str + org_slug: str + encrypt_with_public_key: bytes + + +class RelocationExportReplyWithExportParameters(pydantic.BaseModel): + relocation_uuid: str + requesting_region_name: str + replying_region_name: str + org_slug: str + # encrypted_contents excluded, as receivers are expected to manually read them from filestore. diff --git a/src/sentry/relocation/services/relocation_export/service.py b/src/sentry/relocation/services/relocation_export/service.py new file mode 100644 index 00000000000000..a4c713f8f9a2d3 --- /dev/null +++ b/src/sentry/relocation/services/relocation_export/service.py @@ -0,0 +1,144 @@ +# Please do not use +# from __future__ import annotations +# in modules such as this one where hybrid cloud data models or service classes are +# defined, because we want to reflect on type annotations and avoid forward references. + +from abc import abstractmethod +from dataclasses import dataclass + +from sentry.hybridcloud.rpc.resolvers import ByRegionName +from sentry.hybridcloud.rpc.service import RpcService, regional_rpc_method, rpc_method +from sentry.silo.base import SiloMode + + +@dataclass(frozen=True) +class ByRequestingRegionName(ByRegionName): + parameter_name: str = "requesting_region_name" + + +@dataclass(frozen=True) +class ByReplyingRegionName(ByRegionName): + parameter_name: str = "replying_region_name" + + +# See the comment on /src/sentry/tasks/relocation.py::uploading_start for a detailed description of +# how this service fits into the entire SAAS->SAAS relocation workflow. +class RegionRelocationExportService(RpcService): + """ + Service that implements asynchronous relocation export request and reply methods. The request + method is sent by the requesting region to the exporting region, with + `ControlRelocationExportService` acting as a middleman proxy. + """ + + key = "region_relocation_export" + local_mode = SiloMode.REGION + + @classmethod + def get_local_implementation(cls) -> RpcService: + from sentry.relocation.services.relocation_export.impl import ( + DBBackedRelocationExportService, + ) + + return DBBackedRelocationExportService() + + @regional_rpc_method(resolve=ByReplyingRegionName()) + @abstractmethod + def request_new_export( + self, + *, + relocation_uuid: str, + requesting_region_name: str, + replying_region_name: str, + org_slug: str, + encrypt_with_public_key: bytes, + ) -> None: + """ + This helper method exists to facilitate calling `export_in_organization_scope` from one + region to another. It performs the `export_in_organization_scope` call in the target region, + but instead of merely writing the output to a file locally, it takes the bytes of the + resulting tarball and (asynchronously, via a `RegionOutbox` handler that calls + `reply_with_export`) sends them back over the wire using the `reply_with_export` method. + + This method always produces encrypted exports, so the caller must supply the correct public + key in string form. + """ + pass + + @regional_rpc_method(resolve=ByRequestingRegionName()) + @abstractmethod + def reply_with_export( + self, + *, + relocation_uuid: str, + requesting_region_name: str, + replying_region_name: str, + org_slug: str, + encrypted_contents: bytes, + ) -> None: + """ + This method is responsible for asynchronously sending an already generated and locally-saved + export tarball back from the exporting region to the region that requested the export. + """ + pass + + +# See the comment on /src/sentry/tasks/relocation.py::uploading_start for a detailed description of +# how this service fits into the entire SAAS->SAAS relocation workflow. +class ControlRelocationExportService(RpcService): + """ + Service that proxies asynchronous relocation export request and reply methods. The requesting + and exporting region use this service as a middleman to enable inter-region communication. The + actual export logic is contained in the `RegionRelocationExportService` that this proxy serves + to connect. + """ + + key = "control_relocation_export" + local_mode = SiloMode.CONTROL + + @classmethod + def get_local_implementation(cls) -> RpcService: + from sentry.relocation.services.relocation_export.impl import ( + ProxyingRelocationExportService, + ) + + return ProxyingRelocationExportService() + + @rpc_method + @abstractmethod + def request_new_export( + self, + *, + relocation_uuid: str, + requesting_region_name: str, + replying_region_name: str, + org_slug: str, + encrypt_with_public_key: bytes, + ) -> None: + """ + This helper method is a proxy handler for the `request_new_export` method, durably + forwarding it from the requesting region to the exporting region by writing a retryable + `ControlOutbox` entry. + """ + pass + + @rpc_method + @abstractmethod + def reply_with_export( + self, + *, + relocation_uuid: str, + requesting_region_name: str, + replying_region_name: str, + org_slug: str, + encrypted_contents: bytes, + ) -> None: + """ + This helper method is a proxy handler for the `reply_with_export` method, durably forwarding + it from the requesting region to the exporting region by writing a retryable `ControlOutbox` + entry. + """ + pass + + +region_relocation_export_service = RegionRelocationExportService.create_delegation() +control_relocation_export_service = ControlRelocationExportService.create_delegation() diff --git a/src/sentry/tasks/relocation.py b/src/sentry/tasks/relocation.py index 865b23addf5a29..c1ebc6853ec9cc 100644 --- a/src/sentry/tasks/relocation.py +++ b/src/sentry/tasks/relocation.py @@ -8,6 +8,7 @@ from io import BytesIO from string import Template from typing import Any +from uuid import UUID from zipfile import ZipFile import yaml @@ -25,11 +26,16 @@ from sentry.backup.crypto import ( GCPKMSDecryptor, GCPKMSEncryptor, + LocalFileEncryptor, get_default_crypto_key_version, unwrap_encrypted_export_tarball, ) from sentry.backup.dependencies import NormalizedModelName, get_model -from sentry.backup.exports import export_in_config_scope, export_in_user_scope +from sentry.backup.exports import ( + export_in_config_scope, + export_in_organization_scope, + export_in_user_scope, +) from sentry.backup.helpers import ImportFlags from sentry.backup.imports import import_in_organization_scope from sentry.models.files.file import File @@ -38,6 +44,7 @@ from sentry.models.lostpasswordhash import LostPasswordHash as LostPasswordHash from sentry.models.organization import Organization from sentry.models.organizationmember import OrganizationMember +from sentry.models.outbox import OutboxCategory, OutboxScope, RegionOutbox from sentry.models.relocation import ( Relocation, RelocationFile, @@ -47,9 +54,14 @@ ) from sentry.models.user import User from sentry.organizations.services.organization import organization_service +from sentry.relocation.services.relocation_export.model import ( + RelocationExportReplyWithExportParameters, +) +from sentry.relocation.services.relocation_export.service import control_relocation_export_service from sentry.signals import relocated, relocation_redeem_promo_code from sentry.silo.base import SiloMode from sentry.tasks.base import instrumented_task +from sentry.types.region import get_local_region from sentry.users.services.lost_password_hash import lost_password_hash_service from sentry.users.services.user.service import user_service from sentry.utils import json @@ -65,6 +77,7 @@ retry_task_or_fail_relocation, send_relocation_update_email, start_relocation_task, + uuid_to_identifier, ) logger = logging.getLogger(__name__) @@ -75,6 +88,7 @@ MEDIUM_TIME_LIMIT = 60 * 5 # 5 minutes SLOW_TIME_LIMIT = 60 * 60 # 1 hour DEFAULT_VALIDATION_TIMEOUT = timedelta(minutes=60) +CROSS_REGION_EXPORT_TIMEOUT = timedelta(minutes=60) # All pre and post processing tasks have the same number of retries. A "fast" task is one that almost always completes in <=5 minutes, and does relatively little bulk writing to the database. MAX_FAST_TASK_RETRIES = 3 @@ -97,6 +111,10 @@ # Various error strings that we want to surface to users, grouped by step. ERR_UPLOADING_FAILED = "Internal error during file upload." +ERR_UPLOADING_NO_SAAS_TO_SAAS_ORG_SLUG = "SAAS->SAAS relocations must specify an org slug." +ERR_UPLOADING_CROSS_REGION_TIMEOUT = Template( + "Cross-region relocation export request timed out after $delta." +) ERR_PREPROCESSING_DECRYPTION = """Could not decrypt the imported JSON - are you sure you used the correct public key?""" @@ -131,9 +149,294 @@ ERR_COMPLETED_INTERNAL = "Internal error during relocation wrap-up." -# TODO(getsentry/team-ospo#216): We should split this task in two, one for "small" imports of say -# <=10MB, and one for large imports >10MB. Then we should limit the number of daily executions of -# the latter. +@instrumented_task( + name="sentry.relocation.uploading_start", + queue="relocation", + autoretry_for=(Exception,), + max_retries=MAX_FAST_TASK_RETRIES, + retry_backoff=RETRY_BACKOFF, + retry_backoff_jitter=True, + soft_time_limit=FAST_TIME_LIMIT, +) +def uploading_start(uuid: str, replying_region_name: str | None, org_slug: str | None) -> None: + """ + The very first action in the relocation pipeline. In the case of a `SAAS_TO_SAAS` relocation, it + will trigger the export of the requested organization from the region it currently live in. If + this is a `SELF_HOSTED` relocation, this task is a no-op that merely auto-triggers the next step + in the chain, `upload_complete`. + + In the case of a `SAAS_TO_SAAS` relocation, we'll need to export an organization from the + exporting region (ER) back to the requesting region (RR - where this method is running). Because + region-to-region messaging is forbidden, all of this messaging will need to be proxied via the + control silo (CS). Thus, to accomplish this export-and-copy operation, we'll need to use + sequenced RPC calls to fault-tolerantly execute code in these three siloed locations. + + Due to of our system architecture, the sequence of remote functions executed can be a bit hard + to follow, so it is diagrammed and listed below. Each function executed is given a sequential + numerical identifier and is annotated with information about where to find the source and which + silo it will be executed in: + + + | Requesting | | Control | | Exporting | + | (RR) | | (CS) | | (ER) | + |============| |============| |============| + | | | | | | + | 01 | | | | | + | |-----02---->| | | | + | | | 03 | | | + | | | 04 | | | + | | | |-----05---->| | + | | | | | 06 | + | | | | | 07 | + | | | | | 08 | + | | | |<----09-----| | + | | | 10 | | | + | | | 11 | | | + | |<----12-----| | | | + | 13 | | | | | + | | | | | | + + + 01. (RR) .../tasks/relocation.py::uploading_start: This first function grabs this (aka the + "requesting" region) region's public key, then sends an RPC call to the control silo, + requesting an export of some `org_slug` from the `replying_region_name` in which is lives. + 02. The `ProxyingRelocationExportService::request_new_export` call is sent over the wire from + the requesting region to the control silo. + 03. (CS) .../relocation_export/impl.py::ProxyingRelocationExportService::request_new_export: The + request RPC call is received, and is immediately packaged into a `ControlOutbox`, so that we + may robustly forward it to the exporting region. This task successfully completing causes + the RPC to successfully return to the sender, allowing the calling `uploading_start` celery + task to finish successfully as well. + 04. (CS) .../receiver/outbox/control.py::process_relocation_request_new_export: Whenever an + outbox draining attempt occurs, this code will be called to forward the proxied call into + the exporting region. + 05. The `DBBackedExportService::request_new_export` call is sent over the wire from the control + silo to the exporting region. + 06. (ER) .../relocation_export/impl.py::DBBackedRelocationExportService::request_new_export: The + request RPC call is received, and immediately schedules the + `fulfill_cross_region_export_request` celery task, which uses an exponential backoff + algorithm to try and create an encrypted tarball containing an export of the requested org + slug. + 07. (ER) .../tasks/relocation.py::fulfill_cross_region_export_request: This celery task performs + the actual export operation locally in the exporting region. This data is written as a file + to this region's relocation-specific GCS bucket, and the response is immediately packaged + into a `RegionOutbox`, so that we may robustly attempt to send it at drain-time. + 08. (ER) .../receiver/outbox/region.py::process_relocation_reply_with_export: Whenever an outbox + draining attempt occurs, this code will be called to read the saved export data from the + local GCS bucket, package it into an RPC call, and send it back to the proxy. + 09. The `ProxyingRelocationExportService::reply_with_export` call is sent over the wire from the + exporting region back to the control silo. + 10. (CS) .../relocation_export/impl.py::ProxyingRelocationExportService::reply_with_export: The + request RPC call is received, and is immediately packaged into a `ControlOutbox`, so that we + may robustly forward it back to the requesting region. To ensure robustness, the export data + is saved to a local file, so that outbox drain attempts can read it locally without needing + to make their own nested RPB calls. + 11. (CS) .../receiver/outbox/control.py::process_relocation_reply_with_export: Whenever an + outbox draining attempt occurs, this code will be called to read the export data from the + local relocation-specific GCS bucket, then forward it into the requesting region. + 12. The `DBBackedExportService::reply_with_export` call is sent over the wire from the control + silo back to the requesting region. + 13. (RR) .../relocation_export/impl.py::DBBackedRelocationExportService::reply_with_export: + We've made it all the way back! The export data gets saved to a `RelocationFile` associated + with the `Relocation` that originally triggered `uploading_start`, and the next task in the + sequence (`uploading_complete`) is scheduled. + """ + + relocation: Relocation | None + attempts_left: int + (relocation, attempts_left) = start_relocation_task( + uuid=uuid, + task=OrderedTask.UPLOADING_START, + allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS, + ) + if relocation is None: + return + + with retry_task_or_fail_relocation( + relocation, + OrderedTask.UPLOADING_START, + attempts_left, + ERR_UPLOADING_FAILED, + ): + # If SAAS->SAAS, kick off an export on the source region. In this case, we will not schedule + # an `uploading_complete` task - this region's `RelocationExportService.reply_with_export` + # method will wait for a reply from the work we've scheduled here, which will be in charge + # of writing the `RelocationFile` from the exported data and kicking off + # `uploading_complete` with the export data from the source region. + if relocation.provenance == Relocation.Provenance.SAAS_TO_SAAS: + if not org_slug: + return fail_relocation( + relocation, + OrderedTask.UPLOADING_START, + ERR_UPLOADING_NO_SAAS_TO_SAAS_ORG_SLUG, + ) + + # We want to encrypt this organization from the other region using this region's public + # key. + public_key_pem = GCPKMSEncryptor.from_crypto_key_version( + get_default_crypto_key_version() + ).get_public_key_pem() + + # Send out the cross-region request. + control_relocation_export_service.request_new_export( + relocation_uuid=uuid, + requesting_region_name=get_local_region().name, + replying_region_name=replying_region_name, + org_slug=org_slug, + encrypt_with_public_key=public_key_pem, + ) + + # Make sure we're not waiting forever for our cross-region check to come back. After a + # reasonable amount of time, go ahead and fail the relocation. + cross_region_export_timeout_check.apply_async( + args=[uuid], + countdown=CROSS_REGION_EXPORT_TIMEOUT * 60, + ) + return + + # If this is a regular self-hosted relocation, we have nothing to do here, so just move on + # to the next step. + uploading_complete.apply_async(args=[uuid]) + + +@instrumented_task( + name="sentry.relocation.fulfill_cross_region_export_request", + queue="relocation", + autoretry_for=(Exception,), + max_retries=MAX_FAST_TASK_RETRIES, + retry_backoff=RETRY_BACKOFF, + retry_backoff_jitter=True, + # Setting `acks_late` here allows us to retry the potentially long-lived task if the k8s pod if + # the worker received SIGKILL/TERM/QUIT. We have a timeout check in the task itself to make sure + # it does not loop indefinitely. + acks_late=True, + soft_time_limit=MEDIUM_TIME_LIMIT, + silo_mode=SiloMode.REGION, +) +def fulfill_cross_region_export_request( + uuid: str, + requesting_region_name: str, + replying_region_name: str, + org_slug: str, + encrypt_with_public_key: bytes, + # Unix timestamp, in seconds. + scheduled_at: int, +) -> None: + """ + Unlike most other tasks in this file, this one is not an `OrderedTask` intended to be + sequentially executed as part of the relocation pipeline. Instead, its job is to export an + already existing organization from an adjacent region. That means it is triggered (via RPC - the + `relocation_export` service for more) on that region from the `uploading_start` task, which then + waits for the exporting region to issue an RPC call back with the data. Once that replying RPC + call is received with the encrypted export in tow, it will trigger the next step in the + `SAAS_TO_SAAS` relocation's pipeline, namely `uploading_complete`. + """ + + # Because we use `acks_late`, we need to be careful to prevent infinite scheduling due to some + # persistent bug, like an error in the export logic. So, if `CROSS_REGION_EXPORT_TIMEOUT` time + # has elapsed, always fail this task. Note that we don't report proactively back this failure, + # and instead wait for the timeout check to pick it up on the other end. + scheduled_at_dt = datetime.fromtimestamp(scheduled_at, tz=UTC) + if scheduled_at_dt + CROSS_REGION_EXPORT_TIMEOUT < datetime.now(tz=UTC): + logger.error( + "Cross region relocation fulfillment timeout", + extra={ + "uuid": uuid, + "requesting_region_name": requesting_region_name, + "replying_region_name": replying_region_name, + "org_slug": org_slug, + "encrypted_contents_size": len(encrypt_with_public_key), + "scheduled_at": scheduled_at, + }, + ) + return + + log_gcp_credentials_details(logger) + path = f"runs/{uuid}/saas_to_saas_export/{org_slug}.tar" + relocation_storage = get_relocation_storage() + fp = BytesIO() + export_in_organization_scope( + fp, + encryptor=LocalFileEncryptor(BytesIO(encrypt_with_public_key)), + org_filter={org_slug}, + printer=LoggingPrinter(uuid), + ) + fp.seek(0) + relocation_storage.save(path, fp) + + identifier = uuid_to_identifier(UUID(uuid)) + RegionOutbox( + shard_scope=OutboxScope.RELOCATION_SCOPE, + category=OutboxCategory.RELOCATION_EXPORT_REPLY, + shard_identifier=identifier, + object_identifier=identifier, + payload=RelocationExportReplyWithExportParameters( + relocation_uuid=uuid, + requesting_region_name=requesting_region_name, + replying_region_name=replying_region_name, + org_slug=org_slug, + ).dict(), + ).save() + + +@instrumented_task( + name="sentry.relocation.cross_region_export_timeout_check", + queue="relocation", + autoretry_for=(Exception,), + max_retries=MAX_FAST_TASK_RETRIES, + retry_backoff=RETRY_BACKOFF, + retry_backoff_jitter=True, + soft_time_limit=FAST_TIME_LIMIT, + silo_mode=SiloMode.REGION, +) +def cross_region_export_timeout_check( + uuid: str, +) -> None: + """ + Not part of the primary `OrderedTask` queue. This task is only used to ensure that cross-region + export requests don't hang indefinitely. + """ + + try: + relocation: Relocation = Relocation.objects.get(uuid=uuid) + except Relocation.DoesNotExist: + logger.exception("Could not locate Relocation model by UUID: %s", uuid) + return + + logger_data = {"uuid": relocation.uuid, "task": "cross_region_export_timeout_check"} + logger.info( + "Cross region timeout check: started", + extra=logger_data, + ) + + # We've moved past the `UPLOADING_START` step, so the cross-region response was received, one + # way or another. + if relocation.latest_task != OrderedTask.UPLOADING_START.name: + logger.info( + "Cross region timeout check: no timeout detected", + extra=logger_data, + ) + return + + # Another nested exception handler could have already failed this relocation - in this case, do + # nothing. + if relocation.status == Relocation.Status.FAILURE.value: + logger.info( + "Cross region timeout check: task already failed", + extra=logger_data, + ) + return + + reason = ERR_UPLOADING_CROSS_REGION_TIMEOUT.substitute(delta=CROSS_REGION_EXPORT_TIMEOUT) + logger_data["reason"] = reason + logger.error( + "Cross region timeout check: timeout detected", + extra=logger_data, + ) + + return fail_relocation(relocation, OrderedTask.UPLOADING_START, reason) + + @instrumented_task( name="sentry.relocation.uploading_complete", queue="relocation", @@ -1025,7 +1328,7 @@ def validating_complete(uuid: str, build_id: str) -> None: autoretry_for=(Exception,), # At first blush, it would seem that retrying a failed import will leave a bunch of "abandoned" # data from the previous one, but that is not actually the case: because we use this relocation - # UUID as the `import_uuid` for the `import...` call, we'll be able to re-use all of the + # UUID as the `import_uuid` for the `import_in...` call, we'll be able to re-use all of the # already-written import chunks (and, by extension, their models). This is due to each import # write operation atomically checking the relevant `ImportChunk` table for collisions at # database write time. So it will attempt to write a new copy, realize that this `(import_uuid, @@ -1353,6 +1656,7 @@ def completed(uuid: str) -> None: TASK_MAP: dict[OrderedTask, Task] = { OrderedTask.NONE: Task(), + OrderedTask.UPLOADING_START: uploading_start, OrderedTask.UPLOADING_COMPLETE: uploading_complete, OrderedTask.PREPROCESSING_SCAN: preprocessing_scan, OrderedTask.PREPROCESSING_TRANSFER: preprocessing_transfer, diff --git a/src/sentry/testutils/region.py b/src/sentry/testutils/region.py index e78b6701b9c71c..0251c1e81887a3 100644 --- a/src/sentry/testutils/region.py +++ b/src/sentry/testutils/region.py @@ -53,6 +53,16 @@ def swap_to_default_region(self): with override_settings(SENTRY_REGION=self._tmp_state.default_region.name): yield + @contextmanager + def swap_to_region_by_name(self, region_name: str): + """Swap to the specified region when entering region mode.""" + + region = self.get_by_name(region_name) + if region is None: + raise Exception("specified swap region not found") + with override_settings(SENTRY_REGION=region.name): + yield + @property def regions(self) -> frozenset[Region]: return self._tmp_state.regions diff --git a/src/sentry/testutils/silo.py b/src/sentry/testutils/silo.py index f959caab382fcc..d27fadbd52be06 100644 --- a/src/sentry/testutils/silo.py +++ b/src/sentry/testutils/silo.py @@ -337,7 +337,9 @@ def apply(self, decorated_obj: Any) -> Any: @contextmanager -def assume_test_silo_mode(desired_silo: SiloMode, can_be_monolith: bool = True) -> Any: +def assume_test_silo_mode( + desired_silo: SiloMode, can_be_monolith: bool = True, region_name: str | None = None +) -> Any: """Potential swap the silo mode in a test class or factory, useful for creating multi SiloMode models and executing test code in a special silo context. In monolith mode, this context manager has no effect. @@ -356,8 +358,12 @@ def assume_test_silo_mode(desired_silo: SiloMode, can_be_monolith: bool = True) with override_settings(SILO_MODE=desired_silo): if desired_silo == SiloMode.REGION: region_dir = get_test_env_directory() - with region_dir.swap_to_default_region(): - yield + if region_name is None: + with region_dir.swap_to_default_region(): + yield + else: + with region_dir.swap_to_region_by_name(region_name): + yield else: with override_settings(SENTRY_REGION=None): yield diff --git a/src/sentry/utils/relocation.py b/src/sentry/utils/relocation.py index e53aea7db3e65a..c547719d41e069 100644 --- a/src/sentry/utils/relocation.py +++ b/src/sentry/utils/relocation.py @@ -7,6 +7,7 @@ from functools import lru_cache from string import Template from typing import Any +from uuid import UUID from django.utils import timezone @@ -27,26 +28,31 @@ # weird out-of-order executions. @unique class OrderedTask(Enum): + # Note: the numerical values should always be in execution order (ie, the order the tasks should + # be completed in). It is safe to edit the numbers assigned to any given task, as we only store + # tasks in the database by name. NONE = 0 - UPLOADING_COMPLETE = 1 - PREPROCESSING_SCAN = 2 - PREPROCESSING_TRANSFER = 3 - PREPROCESSING_BASELINE_CONFIG = 4 - PREPROCESSING_COLLIDING_USERS = 5 - PREPROCESSING_COMPLETE = 6 - VALIDATING_START = 7 - VALIDATING_POLL = 8 - VALIDATING_COMPLETE = 9 - IMPORTING = 10 - POSTPROCESSING = 11 - NOTIFYING_USERS = 12 - NOTIFYING_OWNER = 13 - COMPLETED = 14 + UPLOADING_START = 1 + UPLOADING_COMPLETE = 2 + PREPROCESSING_SCAN = 3 + PREPROCESSING_TRANSFER = 4 + PREPROCESSING_BASELINE_CONFIG = 5 + PREPROCESSING_COLLIDING_USERS = 6 + PREPROCESSING_COMPLETE = 7 + VALIDATING_START = 8 + VALIDATING_POLL = 9 + VALIDATING_COMPLETE = 10 + IMPORTING = 11 + POSTPROCESSING = 12 + NOTIFYING_USERS = 13 + NOTIFYING_OWNER = 14 + COMPLETED = 15 # Match each `OrderedTask` to the `Relocation.Step` it is part of. TASK_TO_STEP: dict[OrderedTask, Relocation.Step] = { OrderedTask.NONE: Relocation.Step.UNKNOWN, + OrderedTask.UPLOADING_START: Relocation.Step.UPLOADING, OrderedTask.UPLOADING_COMPLETE: Relocation.Step.UPLOADING, OrderedTask.PREPROCESSING_SCAN: Relocation.Step.PREPROCESSING, OrderedTask.PREPROCESSING_TRANSFER: Relocation.Step.PREPROCESSING, @@ -688,3 +694,10 @@ def create_cloudbuild_validation_step( timeout=str(timeout) + "s", wait_for=make_cloudbuild_step_args(3, wait_for), ) + + +def uuid_to_identifier(uuid: UUID) -> int: + """ + Take a UUID object and generated a unique-enough 64-bit identifier from it's final 64 bits. + """ + return uuid.int & ((1 << 63) - 1) diff --git a/tests/sentry/api/endpoints/relocations/test_index.py b/tests/sentry/api/endpoints/relocations/test_index.py index 465a2da6da4fcb..c181acd66ec1b0 100644 --- a/tests/sentry/api/endpoints/relocations/test_index.py +++ b/tests/sentry/api/endpoints/relocations/test_index.py @@ -297,10 +297,10 @@ def tmp_keys(self, tmp_dir: str) -> tuple[Path, Path]: return (tmp_priv_key_path, tmp_pub_key_path) @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 1}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_good_simple( self, - uploading_complete_mock: Mock, + uploading_start_mock: Mock, relocation_link_promo_code_signal_mock: Mock, analytics_record_mock: Mock, ): @@ -341,7 +341,7 @@ def test_good_simple( assert Relocation.objects.count() == relocation_count + 1 assert RelocationFile.objects.count() == relocation_file_count + 1 - assert uploading_complete_mock.call_count == 1 + assert uploading_start_mock.call_count == 1 assert analytics_record_mock.call_count == 1 analytics_record_mock.assert_called_with( @@ -359,10 +359,10 @@ def test_good_simple( ) @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 1}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_good_promo_code( self, - uploading_complete_mock: Mock, + uploading_start_mock: Mock, relocation_link_promo_code_signal_mock: Mock, analytics_record_mock: Mock, ): @@ -404,7 +404,7 @@ def test_good_promo_code( assert Relocation.objects.count() == relocation_count + 1 assert RelocationFile.objects.count() == relocation_file_count + 1 - assert uploading_complete_mock.call_count == 1 + assert uploading_start_mock.call_count == 1 assert analytics_record_mock.call_count == 1 analytics_record_mock.assert_called_with( @@ -428,10 +428,10 @@ def test_good_promo_code( "relocation.autopause": "IMPORTING", } ) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_good_with_valid_autopause_option( self, - uploading_complete_mock: Mock, + uploading_start_mock: Mock, relocation_link_promo_code_signal_mock: Mock, analytics_record_mock: Mock, ): @@ -458,7 +458,7 @@ def test_good_with_valid_autopause_option( assert response.data["step"] == Relocation.Step.UPLOADING.name assert response.data["scheduledPauseAtStep"] == Relocation.Step.IMPORTING.name - assert uploading_complete_mock.call_count == 1 + assert uploading_start_mock.call_count == 1 assert analytics_record_mock.call_count == 1 analytics_record_mock.assert_called_with( @@ -482,10 +482,10 @@ def test_good_with_valid_autopause_option( "relocation.autopause": "DOESNOTEXIST", } ) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_good_with_invalid_autopause_option( self, - uploading_complete_mock: Mock, + uploading_start_mock: Mock, relocation_link_promo_code_signal_mock: Mock, analytics_record_mock: Mock, ): @@ -512,7 +512,7 @@ def test_good_with_invalid_autopause_option( assert response.data["step"] == Relocation.Step.UPLOADING.name assert response.data["scheduledPauseAtStep"] is None - assert uploading_complete_mock.call_count == 1 + assert uploading_start_mock.call_count == 1 assert analytics_record_mock.call_count == 1 analytics_record_mock.assert_called_with( "relocation.created", @@ -531,10 +531,10 @@ def test_good_with_invalid_autopause_option( @override_options( {"relocation.enabled": False, "relocation.daily-limit.small": 1, "staff.ga-rollout": True} ) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_good_staff_when_feature_disabled( self, - uploading_complete_mock: Mock, + uploading_start_mock: Mock, relocation_link_promo_code_signal_mock: Mock, analytics_record_mock: Mock, ): @@ -574,7 +574,7 @@ def test_good_staff_when_feature_disabled( assert Relocation.objects.count() == relocation_count + 1 assert RelocationFile.objects.count() == relocation_file_count + 1 - assert uploading_complete_mock.call_count == 1 + assert uploading_start_mock.call_count == 1 assert analytics_record_mock.call_count == 1 analytics_record_mock.assert_called_with( @@ -592,10 +592,10 @@ def test_good_staff_when_feature_disabled( ) @override_options({"relocation.enabled": False, "relocation.daily-limit.small": 1}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_good_superuser_when_feature_disabled( self, - uploading_complete_mock: Mock, + uploading_start_mock: Mock, relocation_link_promo_code_signal_mock: Mock, analytics_record_mock: Mock, ): @@ -635,7 +635,7 @@ def test_good_superuser_when_feature_disabled( assert Relocation.objects.count() == relocation_count + 1 assert RelocationFile.objects.count() == relocation_file_count + 1 - assert uploading_complete_mock.call_count == 1 + assert uploading_start_mock.call_count == 1 assert analytics_record_mock.call_count == 1 analytics_record_mock.assert_called_with( @@ -715,10 +715,10 @@ def test_bad_expired_superuser_when_feature_disabled( ]: @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 1}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_good_valid_org_slugs( self, - uploading_complete_mock: Mock, + uploading_start_mock: Mock, relocation_link_promo_code_signal_mock: Mock, analytics_record_mock: Mock, org_slugs=org_slugs, @@ -750,7 +750,7 @@ def test_good_valid_org_slugs( assert Relocation.objects.count() == relocation_count + 1 assert RelocationFile.objects.count() == relocation_file_count + 1 assert Relocation.objects.get(owner_id=self.owner.id).want_org_slugs == expected - assert uploading_complete_mock.call_count == 1 + assert uploading_start_mock.call_count == 1 assert analytics_record_mock.call_count == 1 analytics_record_mock.assert_called_with( @@ -775,12 +775,12 @@ def test_good_valid_org_slugs( ]: @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 1}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_bad_invalid_org_slugs( self, analytics_record_mock: Mock, relocation_link_promo_code_signal_mock: Mock, - uploading_complete_mock: Mock, + uploading_start_mock: Mock, org_slugs=org_slugs, invalid_org_slug=invalid_org_slug, ): @@ -813,7 +813,7 @@ def test_bad_invalid_org_slugs( ) assert Relocation.objects.count() == relocation_count assert RelocationFile.objects.count() == relocation_file_count - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 assert analytics_record_mock.call_count == 0 assert relocation_link_promo_code_signal_mock.call_count == 0 diff --git a/tests/sentry/api/endpoints/relocations/test_retry.py b/tests/sentry/api/endpoints/relocations/test_retry.py index 1dc182f32e8bc2..9b5ee1408f9cbe 100644 --- a/tests/sentry/api/endpoints/relocations/test_retry.py +++ b/tests/sentry/api/endpoints/relocations/test_retry.py @@ -85,8 +85,8 @@ def setUp(self): ) @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 2}) - @patch("sentry.tasks.relocation.uploading_complete.delay") - def test_good_simple(self, uploading_complete_mock: Mock, analytics_record_mock: Mock): + @patch("sentry.tasks.relocation.uploading_start.delay") + def test_good_simple(self, uploading_start_mock: Mock, analytics_record_mock: Mock): self.login_as(user=self.owner, superuser=False) relocation_count = Relocation.objects.count() relocation_file_count = RelocationFile.objects.count() @@ -123,7 +123,7 @@ def test_good_simple(self, uploading_complete_mock: Mock, analytics_record_mock: assert RelocationFile.objects.count() == relocation_file_count + 1 assert File.objects.count() == file_count - assert uploading_complete_mock.call_count == 1 + assert uploading_start_mock.call_count == 1 analytics_record_mock.assert_called_with( "relocation.created", @@ -135,9 +135,9 @@ def test_good_simple(self, uploading_complete_mock: Mock, analytics_record_mock: @override_options( {"relocation.enabled": False, "relocation.daily-limit.small": 2, "staff.ga-rollout": True} ) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_good_staff_when_feature_disabled( - self, uploading_complete_mock: Mock, analytics_record_mock: Mock + self, uploading_start_mock: Mock, analytics_record_mock: Mock ): self.login_as(user=self.staff_user, staff=True) relocation_count = Relocation.objects.count() @@ -163,7 +163,7 @@ def test_good_staff_when_feature_disabled( assert RelocationFile.objects.count() == relocation_file_count + 1 assert File.objects.count() == file_count - assert uploading_complete_mock.call_count == 1 + assert uploading_start_mock.call_count == 1 analytics_record_mock.assert_called_with( "relocation.created", @@ -173,9 +173,9 @@ def test_good_staff_when_feature_disabled( ) @override_options({"relocation.enabled": False, "relocation.daily-limit.small": 2}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_good_superuser_when_feature_disabled( - self, uploading_complete_mock: Mock, analytics_record_mock: Mock + self, uploading_start_mock: Mock, analytics_record_mock: Mock ): self.login_as(user=self.superuser, superuser=True) relocation_count = Relocation.objects.count() @@ -201,7 +201,7 @@ def test_good_superuser_when_feature_disabled( assert RelocationFile.objects.count() == relocation_file_count + 1 assert File.objects.count() == file_count - assert uploading_complete_mock.call_count == 1 + assert uploading_start_mock.call_count == 1 analytics_record_mock.assert_called_with( "relocation.created", @@ -211,9 +211,9 @@ def test_good_superuser_when_feature_disabled( ) @override_options({"relocation.enabled": False, "relocation.daily-limit.small": 2}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_bad_without_superuser_when_feature_disabled( - self, uploading_complete_mock: Mock, analytics_record_mock: Mock + self, uploading_start_mock: Mock, analytics_record_mock: Mock ): self.login_as(user=self.owner, superuser=False) relocation_count = Relocation.objects.count() @@ -233,12 +233,12 @@ def test_bad_without_superuser_when_feature_disabled( assert RelocationFile.objects.count() == relocation_file_count assert File.objects.count() == file_count - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 @override_options({"relocation.enabled": False, "relocation.daily-limit.small": 2}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_bad_expired_superuser_when_feature_disabled( - self, uploading_complete_mock: Mock, analytics_record_mock: Mock + self, uploading_start_mock: Mock, analytics_record_mock: Mock ): self.login_as(user=self.owner, superuser=True) relocation_count = Relocation.objects.count() @@ -258,25 +258,25 @@ def test_bad_expired_superuser_when_feature_disabled( assert RelocationFile.objects.count() == relocation_file_count assert File.objects.count() == file_count - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 analytics_record_mock.assert_not_called() @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 2}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_bad_relocation_not_found( - self, uploading_complete_mock: Mock, analytics_record_mock: Mock + self, uploading_start_mock: Mock, analytics_record_mock: Mock ): self.login_as(user=self.owner, superuser=False) self.get_error_response(str(uuid4().hex), status_code=404) - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 analytics_record_mock.assert_not_called() @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 2}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_bad_relocation_file_not_found( - self, uploading_complete_mock: Mock, analytics_record_mock: Mock + self, uploading_start_mock: Mock, analytics_record_mock: Mock ): self.login_as(user=self.owner, superuser=False) RelocationFile.objects.all().delete() @@ -284,26 +284,26 @@ def test_bad_relocation_file_not_found( response = self.get_error_response(self.relocation.uuid, status_code=400) assert response.data.get("detail") == ERR_FILE_NO_LONGER_EXISTS - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 analytics_record_mock.assert_not_called() @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 2}) - @patch("sentry.tasks.relocation.uploading_complete.delay") - def test_bad_file_not_found(self, uploading_complete_mock: Mock, analytics_record_mock: Mock): + @patch("sentry.tasks.relocation.uploading_start.delay") + def test_bad_file_not_found(self, uploading_start_mock: Mock, analytics_record_mock: Mock): self.login_as(user=self.owner, superuser=False) File.objects.all().delete() response = self.get_error_response(self.relocation.uuid, status_code=400) assert response.data.get("detail") == ERR_FILE_NO_LONGER_EXISTS - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 @override_options( {"relocation.enabled": True, "relocation.daily-limit.small": 2, "staff.ga-rollout": True} ) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_bad_staff_owner_not_found( - self, uploading_complete_mock: Mock, analytics_record_mock: Mock + self, uploading_start_mock: Mock, analytics_record_mock: Mock ): self.login_as(user=self.staff_user, staff=True) with assume_test_silo_mode(SiloMode.CONTROL): @@ -312,13 +312,13 @@ def test_bad_staff_owner_not_found( response = self.get_error_response(self.relocation.uuid, status_code=400) assert response.data.get("detail") == ERR_OWNER_NO_LONGER_EXISTS - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 analytics_record_mock.assert_not_called() @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 2}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_bad_superuser_owner_not_found( - self, uploading_complete_mock: Mock, analytics_record_mock: Mock + self, uploading_start_mock: Mock, analytics_record_mock: Mock ): self.login_as(user=self.superuser, superuser=True) with assume_test_silo_mode(SiloMode.CONTROL): @@ -327,18 +327,18 @@ def test_bad_superuser_owner_not_found( response = self.get_error_response(self.relocation.uuid, status_code=400) assert response.data.get("detail") == ERR_OWNER_NO_LONGER_EXISTS - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 analytics_record_mock.assert_not_called() @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 1}) - @patch("sentry.tasks.relocation.uploading_complete.delay") - def test_bad_throttled(self, uploading_complete_mock: Mock, analytics_record_mock: Mock): + @patch("sentry.tasks.relocation.uploading_start.delay") + def test_bad_throttled(self, uploading_start_mock: Mock, analytics_record_mock: Mock): self.login_as(user=self.owner, superuser=False) response = self.get_error_response(self.relocation.uuid, status_code=429) assert response.data.get("detail") == ERR_THROTTLED_RELOCATION - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 analytics_record_mock.assert_not_called() for stat in [ @@ -347,9 +347,9 @@ def test_bad_throttled(self, uploading_complete_mock: Mock, analytics_record_moc ]: @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 2}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_bad_relocation_still_ongoing( - self, uploading_complete_mock: Mock, analytics_record_mock: Mock, stat=stat + self, uploading_start_mock: Mock, analytics_record_mock: Mock, stat=stat ): self.login_as(user=self.owner, superuser=False) self.relocation.status = stat.value @@ -361,7 +361,7 @@ def test_bad_relocation_still_ongoing( assert response.data.get("detail") == ERR_NOT_RETRYABLE_STATUS.substitute( status=stat.name ) - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 analytics_record_mock.assert_not_called() for stat in [ @@ -370,9 +370,9 @@ def test_bad_relocation_still_ongoing( ]: @override_options({"relocation.enabled": True, "relocation.daily-limit.small": 3}) - @patch("sentry.tasks.relocation.uploading_complete.delay") + @patch("sentry.tasks.relocation.uploading_start.delay") def test_bad_owner_has_another_active_relocation( - self, uploading_complete_mock: Mock, analytics_record_mock: Mock, stat=stat + self, uploading_start_mock: Mock, analytics_record_mock: Mock, stat=stat ): self.login_as(user=self.owner, superuser=False) Relocation.objects.create( @@ -393,5 +393,5 @@ def test_bad_owner_has_another_active_relocation( response = self.get_error_response(self.relocation.uuid, status_code=409) assert response.data.get("detail") == ERR_DUPLICATE_RELOCATION - assert uploading_complete_mock.call_count == 0 + assert uploading_start_mock.call_count == 0 analytics_record_mock.assert_not_called() diff --git a/tests/sentry/tasks/test_relocation.py b/tests/sentry/tasks/test_relocation.py index 78383ae85eb617..14e89003334452 100644 --- a/tests/sentry/tasks/test_relocation.py +++ b/tests/sentry/tasks/test_relocation.py @@ -1,3 +1,4 @@ +from datetime import timedelta from functools import cached_property from io import BytesIO from pathlib import Path @@ -9,6 +10,7 @@ import pytest import yaml from django.core.files.storage import Storage +from django.test import override_settings from google.cloud.devtools.cloudbuild_v1 import Build from google_crc32c import value as crc32c @@ -39,6 +41,7 @@ ValidationStatus, ) from sentry.models.user import User +from sentry.relocation.services.relocation_export.service import control_relocation_export_service from sentry.silo.base import SiloMode from sentry.tasks.relocation import ( ERR_NOTIFYING_INTERNAL, @@ -52,7 +55,9 @@ ERR_PREPROCESSING_NO_USERS, ERR_PREPROCESSING_TOO_MANY_ORGS, ERR_PREPROCESSING_TOO_MANY_USERS, + ERR_UPLOADING_CROSS_REGION_TIMEOUT, ERR_UPLOADING_FAILED, + ERR_UPLOADING_NO_SAAS_TO_SAAS_ORG_SLUG, ERR_VALIDATING_INTERNAL, ERR_VALIDATING_MAX_RUNS, MAX_FAST_TASK_ATTEMPTS, @@ -71,6 +76,7 @@ preprocessing_scan, preprocessing_transfer, uploading_complete, + uploading_start, validating_complete, validating_poll, validating_start, @@ -78,13 +84,22 @@ from sentry.testutils.cases import TestCase, TransactionTestCase from sentry.testutils.factories import get_fixture_path from sentry.testutils.helpers.backups import FakeKeyManagementServiceClient, generate_rsa_key_pair -from sentry.testutils.helpers.task_runner import BurstTaskRunner, BurstTaskRunnerRetryError -from sentry.testutils.silo import assume_test_silo_mode +from sentry.testutils.helpers.task_runner import ( + BurstTaskRunner, + BurstTaskRunnerRetryError, + TaskRunner, +) +from sentry.testutils.outbox import outbox_runner +from sentry.testutils.silo import assume_test_silo_mode, create_test_regions, region_silo_test from sentry.utils import json from sentry.utils.relocation import RELOCATION_BLOB_SIZE, RELOCATION_FILE_TYPE, OrderedTask IMPORT_JSON_FILE_PATH = get_fixture_path("backup", "fresh-install.json") +REQUESTING_TEST_REGION = "requesting" +EXPORTING_TEST_REGION = "exporting" +SAAS_TO_SAAS_TEST_REGIONS = create_test_regions(REQUESTING_TEST_REGION, EXPORTING_TEST_REGION) + class FakeCloudBuildClient: """ @@ -107,7 +122,9 @@ def setUp(self): is_staff=False, is_active=True, ) - self.existing_org = self.create_organization(name=self.requested_org_slug) + self.existing_org = self.create_organization( + name=self.requested_org_slug, owner=self.existing_org_owner + ) self.owner = self.create_user( email="owner@example.com", is_superuser=False, is_staff=False, is_active=True @@ -119,7 +136,7 @@ def setUp(self): self.relocation: Relocation = Relocation.objects.create( creator_id=self.superuser.id, owner_id=self.owner.id, - want_org_slugs=["testing"], + want_org_slugs=[self.requested_org_slug], step=Relocation.Step.UPLOADING.value, ) self.relocation_file = RelocationFile.objects.create( @@ -175,6 +192,8 @@ def swap_file( def mock_kms_client(self, fake_kms_client: FakeKeyManagementServiceClient): fake_kms_client.asymmetric_decrypt.call_count = 0 fake_kms_client.get_public_key.call_count = 0 + if not hasattr(self, "tarball"): + _ = self.file unwrapped = unwrap_encrypted_export_tarball(BytesIO(self.tarball)) plaintext_dek = LocalFileDecryptor.from_bytes( @@ -210,9 +229,303 @@ def mock_message_builder(self, fake_message_builder: Mock): fake_message_builder.return_value.send_async.return_value = Mock() +@patch( + "sentry.backup.crypto.KeyManagementServiceClient", + new_callable=lambda: FakeKeyManagementServiceClient, +) +@patch("sentry.utils.relocation.MessageBuilder") +@patch("sentry.tasks.relocation.uploading_complete.apply_async") +@region_silo_test(regions=SAAS_TO_SAAS_TEST_REGIONS) +class UploadingStartTest(RelocationTaskTestCase): + def setUp(self): + self.owner = self.create_user( + email="owner@example.com", is_superuser=False, is_staff=False, is_active=True + ) + self.superuser = self.create_user( + email="superuser@example.com", is_superuser=True, is_staff=True, is_active=True + ) + self.login_as(user=self.superuser, superuser=True) + + with assume_test_silo_mode(SiloMode.REGION, region_name=EXPORTING_TEST_REGION): + self.requested_org_slug = "testing" + self.existing_org_owner = self.create_user( + email="existing_org_owner@example.com", + is_superuser=False, + is_staff=False, + is_active=True, + ) + self.existing_org = self.create_organization( + name=self.requested_org_slug, owner=self.existing_org_owner + ) + + with assume_test_silo_mode(SiloMode.REGION, region_name=REQUESTING_TEST_REGION): + self.relocation: Relocation = Relocation.objects.create( + creator_id=self.superuser.id, + owner_id=self.owner.id, + want_org_slugs=[self.requested_org_slug], + step=Relocation.Step.UPLOADING.value, + latest_task=OrderedTask.UPLOADING_START.name, + provenance=Relocation.Provenance.SAAS_TO_SAAS, + ) + self.uuid = str(self.relocation.uuid) + + @override_settings( + SENTRY_MONOLITH_REGION=REQUESTING_TEST_REGION, SENTRY_REGION=REQUESTING_TEST_REGION + ) + @patch("sentry.tasks.relocation.cross_region_export_timeout_check.apply_async") + def test_success_saas_to_saas( + self, + cross_region_export_timeout_check_mock: Mock, + uploading_complete_mock: Mock, + fake_message_builder: Mock, + fake_kms_client: FakeKeyManagementServiceClient, + ): + self.mock_message_builder(fake_message_builder) + self.mock_kms_client(fake_kms_client) + + assert not RelocationFile.objects.filter(relocation=self.relocation).exists() + with assume_test_silo_mode(SiloMode.REGION, region_name=REQUESTING_TEST_REGION): + uploading_start(self.uuid, EXPORTING_TEST_REGION, self.requested_org_slug) + + assert uploading_complete_mock.call_count == 0 + with outbox_runner(): + pass + + assert uploading_complete_mock.call_count == 1 + assert cross_region_export_timeout_check_mock.call_count == 1 + assert fake_message_builder.call_count == 0 + assert fake_kms_client.get_public_key.call_count == 1 + assert fake_kms_client.asymmetric_decrypt.call_count == 0 + + assert RelocationFile.objects.filter( + relocation=self.relocation, + kind=RelocationFile.Kind.RAW_USER_DATA.value, + ).exists() + + @override_settings( + SENTRY_MONOLITH_REGION=REQUESTING_TEST_REGION, SENTRY_REGION=REQUESTING_TEST_REGION + ) + @patch("sentry.tasks.relocation.cross_region_export_timeout_check.apply_async") + def test_success_saas_to_saas_racing( + self, + cross_region_export_timeout_check_mock: Mock, + uploading_complete_mock: Mock, + fake_message_builder: Mock, + fake_kms_client: FakeKeyManagementServiceClient, + ): + self.mock_message_builder(fake_message_builder) + self.mock_kms_client(fake_kms_client) + + assert not RelocationFile.objects.filter(relocation=self.relocation).exists() + with assume_test_silo_mode(SiloMode.REGION, region_name=REQUESTING_TEST_REGION): + uploading_start(self.uuid, EXPORTING_TEST_REGION, self.requested_org_slug) + + # Create a racing call, due to ex: outbox retry. These must be deduped when + # receiving the reply back in the requesting region. + control_relocation_export_service.request_new_export( + relocation_uuid=str(self.uuid), + requesting_region_name=REQUESTING_TEST_REGION, + replying_region_name=EXPORTING_TEST_REGION, + org_slug=self.requested_org_slug, + encrypt_with_public_key=fake_kms_client.get_public_key().pem.encode(), + ) + + assert uploading_complete_mock.call_count == 0 + with outbox_runner(): + pass + + assert uploading_complete_mock.call_count == 1 + assert cross_region_export_timeout_check_mock.call_count == 1 + assert fake_message_builder.call_count == 0 + assert fake_kms_client.get_public_key.call_count == 2 + assert fake_kms_client.asymmetric_decrypt.call_count == 0 + + assert ( + RelocationFile.objects.filter( + relocation=self.relocation, + kind=RelocationFile.Kind.RAW_USER_DATA.value, + ).count() + == 1 + ) + + @patch("sentry.tasks.relocation.cross_region_export_timeout_check.apply_async") + def test_success_self_hosted( + self, + cross_region_export_timeout_check_mock: Mock, + uploading_complete_mock: Mock, + fake_message_builder: Mock, + fake_kms_client: FakeKeyManagementServiceClient, + ): + self.mock_message_builder(fake_message_builder) + self.mock_kms_client(fake_kms_client) + + self.relocation.provenance = Relocation.Provenance.SELF_HOSTED + self.relocation.save() + + assert not RelocationFile.objects.filter(relocation=self.relocation).exists() + with assume_test_silo_mode(SiloMode.REGION, region_name=REQUESTING_TEST_REGION): + uploading_start(self.uuid, None, None) + + assert uploading_complete_mock.call_count == 1 + with outbox_runner(): + pass + + assert uploading_complete_mock.call_count == 1 + assert cross_region_export_timeout_check_mock.call_count == 0 + assert fake_message_builder.call_count == 0 + assert fake_kms_client.get_public_key.call_count == 0 + assert fake_kms_client.asymmetric_decrypt.call_count == 0 + + assert not RelocationFile.objects.filter(relocation=self.relocation).exists() + + @patch("sentry.tasks.relocation.cross_region_export_timeout_check.apply_async") + def test_retry_if_attempts_left( + self, + cross_region_export_timeout_check_mock: Mock, + uploading_complete_mock: Mock, + fake_message_builder: Mock, + fake_kms_client: FakeKeyManagementServiceClient, + ): + self.mock_message_builder(fake_message_builder) + self.mock_kms_client(fake_kms_client) + + # An exception being raised will trigger a retry in celery. + with pytest.raises(Exception): + fake_kms_client.get_public_key.side_effect = Exception("Test") + with assume_test_silo_mode(SiloMode.REGION, region_name=REQUESTING_TEST_REGION): + uploading_start(self.uuid, EXPORTING_TEST_REGION, self.requested_org_slug) + + assert uploading_complete_mock.call_count == 0 + assert cross_region_export_timeout_check_mock.call_count == 0 + assert fake_message_builder.call_count == 0 + assert fake_kms_client.get_public_key.call_count == 1 + assert fake_kms_client.asymmetric_decrypt.call_count == 0 + + relocation = Relocation.objects.get(uuid=self.uuid) + assert relocation.status == Relocation.Status.IN_PROGRESS.value + assert not relocation.failure_reason + + @patch("sentry.tasks.relocation.cross_region_export_timeout_check.apply_async") + def test_fail_if_no_attempts_left( + self, + cross_region_export_timeout_check_mock: Mock, + uploading_complete_mock: Mock, + fake_message_builder: Mock, + fake_kms_client: FakeKeyManagementServiceClient, + ): + self.mock_message_builder(fake_message_builder) + self.mock_kms_client(fake_kms_client) + self.relocation.latest_task = OrderedTask.UPLOADING_START.name + self.relocation.latest_task_attempts = MAX_FAST_TASK_RETRIES + self.relocation.save() + + with pytest.raises(Exception): + fake_kms_client.get_public_key.side_effect = Exception("Test") + with assume_test_silo_mode(SiloMode.REGION, region_name=REQUESTING_TEST_REGION): + uploading_start(self.uuid, EXPORTING_TEST_REGION, self.requested_org_slug) + + assert fake_message_builder.call_count == 1 + assert fake_message_builder.call_args.kwargs["type"] == "relocation.failed" + fake_message_builder.return_value.send_async.assert_called_once_with( + to=[self.owner.email, self.superuser.email] + ) + + assert uploading_complete_mock.call_count == 0 + assert cross_region_export_timeout_check_mock.call_count == 0 + assert fake_kms_client.get_public_key.call_count == 1 + assert fake_kms_client.asymmetric_decrypt.call_count == 0 + + relocation = Relocation.objects.get(uuid=self.uuid) + assert relocation.status == Relocation.Status.FAILURE.value + assert relocation.failure_reason == ERR_UPLOADING_FAILED + + @patch("sentry.tasks.relocation.cross_region_export_timeout_check.apply_async") + def test_fail_no_org_slug_when_saas_to_saas( + self, + cross_region_export_timeout_check_mock: Mock, + uploading_complete_mock: Mock, + fake_message_builder: Mock, + fake_kms_client: FakeKeyManagementServiceClient, + ): + self.mock_message_builder(fake_message_builder) + self.mock_kms_client(fake_kms_client) + + assert not RelocationFile.objects.filter(relocation=self.relocation).exists() + with assume_test_silo_mode(SiloMode.REGION, region_name=REQUESTING_TEST_REGION): + # Will fail, because we do not supply an `org_slug` argument for a `SAAS_TO_SAAS` + # relocation. + uploading_start(self.uuid, EXPORTING_TEST_REGION, None) + + assert uploading_complete_mock.call_count == 0 + with outbox_runner(): + pass + + assert uploading_complete_mock.call_count == 0 + assert cross_region_export_timeout_check_mock.call_count == 0 + assert fake_message_builder.call_count == 1 + assert fake_kms_client.get_public_key.call_count == 0 + assert fake_kms_client.asymmetric_decrypt.call_count == 0 + + relocation = Relocation.objects.get(uuid=self.uuid) + assert relocation.status == Relocation.Status.FAILURE.value + assert relocation.latest_notified == Relocation.EmailKind.FAILED.value + assert relocation.failure_reason == ERR_UPLOADING_NO_SAAS_TO_SAAS_ORG_SLUG + assert not RelocationFile.objects.filter(relocation=self.relocation).exists() + + # -1 minutes guarantees a timeout, even during synchronous execution. + @patch("sentry.tasks.relocation.CROSS_REGION_EXPORT_TIMEOUT", timedelta(minutes=-1)) + def test_fail_due_to_timeout( + self, + uploading_complete_mock: Mock, + fake_message_builder: Mock, + fake_kms_client: FakeKeyManagementServiceClient, + ): + self.mock_message_builder(fake_message_builder) + self.mock_kms_client(fake_kms_client) + + assert not RelocationFile.objects.filter(relocation=self.relocation).exists() + with ( + TaskRunner(), + assume_test_silo_mode(SiloMode.REGION, region_name=REQUESTING_TEST_REGION), + ): + uploading_start(self.uuid, EXPORTING_TEST_REGION, self.requested_org_slug) + + assert uploading_complete_mock.call_count == 0 + with outbox_runner(): + pass + + # No reply due to server-side timeout. + assert uploading_complete_mock.call_count == 0 + + # Ensure that the relocation has been marked as failed via the timeout handler on the + # client-side. + relocation = Relocation.objects.get(uuid=self.uuid) + assert relocation.status == Relocation.Status.FAILURE.value + assert relocation.latest_notified == Relocation.EmailKind.FAILED.value + assert relocation.failure_reason == ERR_UPLOADING_CROSS_REGION_TIMEOUT.substitute( + delta=timedelta(minutes=-1) + ) + assert fake_message_builder.call_count == 1 + assert fake_message_builder.call_args.kwargs["type"] == "relocation.failed" + fake_message_builder.return_value.send_async.assert_called_once_with( + to=[self.owner.email, self.superuser.email] + ) + + assert fake_kms_client.get_public_key.call_count == 1 + assert fake_kms_client.asymmetric_decrypt.call_count == 0 + + assert not RelocationFile.objects.filter(relocation=self.relocation).exists() + + @patch("sentry.utils.relocation.MessageBuilder") @patch("sentry.tasks.relocation.preprocessing_scan.apply_async") class UploadingCompleteTest(RelocationTaskTestCase): + def setUp(self): + super().setUp() + self.relocation.step = Relocation.Step.UPLOADING.value + self.relocation.latest_task = OrderedTask.UPLOADING_START.name + self.relocation.save() + def test_success( self, preprocessing_scan_mock: Mock, @@ -2322,7 +2635,7 @@ def test_valid_no_retries( org_count = Organization.objects.filter(slug__startswith="testing").count() with BurstTaskRunner() as burst: - uploading_complete(self.relocation.uuid) + uploading_start(self.relocation.uuid, None, None) with patch.object( LostPasswordHash, "send_relocate_account_email" @@ -2371,7 +2684,7 @@ def test_valid_max_retries( org_count = Organization.objects.filter(slug__startswith="testing").count() with BurstTaskRunner() as burst: - uploading_complete(self.relocation.uuid) + uploading_start(self.relocation.uuid, None, None) with patch.object( LostPasswordHash, "send_relocate_account_email" @@ -2419,7 +2732,7 @@ def test_invalid_no_retries( org_count = Organization.objects.filter(slug__startswith="testing").count() with BurstTaskRunner() as burst: - uploading_complete(self.relocation.uuid) + uploading_start(self.relocation.uuid, None, None) with patch.object( LostPasswordHash, "send_relocate_account_email" @@ -2469,7 +2782,7 @@ def test_invalid_max_retries( org_count = Organization.objects.filter(slug__startswith="testing").count() with BurstTaskRunner() as burst: - uploading_complete(self.relocation.uuid) + uploading_start(self.relocation.uuid, None, None) with patch.object( LostPasswordHash, "send_relocate_account_email" diff --git a/tests/sentry/utils/test_relocation.py b/tests/sentry/utils/test_relocation.py index 70e724575241fc..4a564b305eb5f5 100644 --- a/tests/sentry/utils/test_relocation.py +++ b/tests/sentry/utils/test_relocation.py @@ -116,7 +116,7 @@ def test_bad_task_attempts_exhausted(self, fake_message_builder: Mock): def test_good_first_task(self, fake_message_builder: Mock): self.mock_message_builder(fake_message_builder) - (rel, attempts_left) = start_relocation_task(self.uuid, OrderedTask.UPLOADING_COMPLETE, 3) + (rel, attempts_left) = start_relocation_task(self.uuid, OrderedTask.UPLOADING_START, 3) assert fake_message_builder.call_count == 0 assert attempts_left == 2