diff --git a/snuba/admin/audit_log/action.py b/snuba/admin/audit_log/action.py index d83ebf981b..24cd15f18d 100644 --- a/snuba/admin/audit_log/action.py +++ b/snuba/admin/audit_log/action.py @@ -13,6 +13,7 @@ class AuditLogAction(Enum): REVERSED_MIGRATION_COMPLETED = "reversed.migration.completed" RAN_MIGRATION_FAILED = "ran.migration.failed" REVERSED_MIGRATION_FAILED = "reversed.migration.failed" + FORCE_MIGRATION_OVERWRITE = "force.migration.overwrite" ALLOCATION_POLICY_UPDATE = "allocation_policy.update" ALLOCATION_POLICY_DELETE = "allocation_policy.delete" DLQ_REPLAY = "dlq.replay" diff --git a/snuba/admin/views.py b/snuba/admin/views.py index 10a7085182..7fc89a1406 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -74,6 +74,7 @@ from snuba.migrations.errors import InactiveClickhouseReplica, MigrationError from snuba.migrations.groups import MigrationGroup, get_group_readiness_state from snuba.migrations.runner import MigrationKey, Runner +from snuba.migrations.status import Status from snuba.query.exceptions import InvalidQueryException from snuba.query.query_settings import HTTPQuerySettings from snuba.replacers.replacements_and_expiry import ( @@ -233,6 +234,34 @@ def reverse_migration(group: str, migration_id: str) -> Response: ) +@application.route( + "/migrations//overwrite//status/", + methods=["POST"], +) +@check_tool_perms(tools=[AdminTools.MIGRATIONS]) +def force_overwrite_migration_status( + group: str, migration_id: str, new_status: str +) -> Response: + try: + migration_group = MigrationGroup(group) + except ValueError as err: + logger.error(err, exc_info=True) + return make_response(jsonify({"error": "Group not found"}), 400) + + runner.force_overwrite_status(migration_group, migration_id, Status(new_status)) + user = request.headers.get(USER_HEADER_KEY) + + audit_log.record( + user or "", + AuditLogAction.FORCE_MIGRATION_OVERWRITE, + {"group": group, "migration": migration_id, "new_status": new_status}, + notify=True, + ) + + res = {"status": "OK"} + return make_response(jsonify(res), 200) + + @check_migration_perms def run_or_reverse_migration(group: str, action: str, migration_id: str) -> Response: try: diff --git a/snuba/migrations/runner.py b/snuba/migrations/runner.py index 05636be477..445e5f4ee6 100644 --- a/snuba/migrations/runner.py +++ b/snuba/migrations/runner.py @@ -68,12 +68,14 @@ class MigrationDetails(NamedTuple): class Runner: def __init__(self) -> None: - migrations_cluster = get_cluster(StorageSetKey.MIGRATIONS) + self.__migrations_cluster = get_cluster(StorageSetKey.MIGRATIONS) self.__table_name = ( - LOCAL_TABLE_NAME if migrations_cluster.is_single_node() else DIST_TABLE_NAME + LOCAL_TABLE_NAME + if self.__migrations_cluster.is_single_node() + else DIST_TABLE_NAME ) - self.__connection = migrations_cluster.get_query_connection( + self.__connection = self.__migrations_cluster.get_query_connection( ClickhouseClientSettings.MIGRATE ) @@ -115,6 +117,21 @@ def get_status( return Status.NOT_STARTED, None + def force_overwrite_status( + self, group: MigrationGroup, migration_id: str, new_status: Status + ) -> None: + """Sometimes a migration gets blocked or times out for whatever reason. + This function is used to overwrite the state in the snuba table keeping + track of migration so we can try again""" + local_node = self.__migrations_cluster.get_local_nodes()[0] + local_node_connection = self.__migrations_cluster.get_node_connection( + ClickhouseClientSettings.MIGRATE, local_node + ) + + local_node_connection.execute( + f"ALTER TABLE {LOCAL_TABLE_NAME} UPDATE status='{new_status.value}' WHERE migration_id='{migration_id}'" + ) + def show_all( self, groups: Optional[Sequence[str]] = None ) -> List[Tuple[MigrationGroup, List[MigrationDetails]]]: diff --git a/tests/admin/test_api.py b/tests/admin/test_api.py index 691dd756ad..d37153181f 100644 --- a/tests/admin/test_api.py +++ b/tests/admin/test_api.py @@ -582,6 +582,28 @@ def test_prod_snql_query_invalid_query(admin_api: FlaskClient) -> None: ) +@pytest.mark.redis_db +@pytest.mark.clickhouse_db +def test_force_overwrite(admin_api: FlaskClient) -> None: + migration_id = "0009_add_message" + migrations = json.loads(admin_api.get("/migrations/search_issues/list").data) + downgraded_migration = [ + m for m in migrations if m.get("migration_id") == migration_id + ][0] + assert downgraded_migration["status"] == "completed" + + response = admin_api.post( + f"/migrations/search_issues/overwrite/{migration_id}/status/not_started", + headers={"Referer": "https://snuba-admin.getsentry.net/"}, + ) + assert response.status_code == 200 + migrations = json.loads(admin_api.get("/migrations/search_issues/list").data) + downgraded_migration = [ + m for m in migrations if m.get("migration_id") == migration_id + ][0] + assert downgraded_migration["status"] == "not_started" + + @pytest.mark.redis_db @pytest.mark.clickhouse_db def test_prod_snql_query_valid_query(admin_api: FlaskClient) -> None: