Skip to content

Commit

Permalink
feat(admin): create an api to overwrite a snuba migration state (#6238)
Browse files Browse the repository at this point in the history
Yes this is kind of a hack but asking OPS to run these commands every
time we mess up a migration is driving me and @phacops insane
  • Loading branch information
volokluev authored Aug 29, 2024
1 parent 50f1a67 commit afde219
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 3 deletions.
1 change: 1 addition & 0 deletions snuba/admin/audit_log/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class AuditLogAction(Enum):
REVERSED_MIGRATION_COMPLETED = "reversed.migration.completed"
RAN_MIGRATION_FAILED = "ran.migration.failed"
REVERSED_MIGRATION_FAILED = "reversed.migration.failed"
FORCE_MIGRATION_OVERWRITE = "force.migration.overwrite"
ALLOCATION_POLICY_UPDATE = "allocation_policy.update"
ALLOCATION_POLICY_DELETE = "allocation_policy.delete"
DLQ_REPLAY = "dlq.replay"
Expand Down
29 changes: 29 additions & 0 deletions snuba/admin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
from snuba.migrations.errors import InactiveClickhouseReplica, MigrationError
from snuba.migrations.groups import MigrationGroup, get_group_readiness_state
from snuba.migrations.runner import MigrationKey, Runner
from snuba.migrations.status import Status
from snuba.query.exceptions import InvalidQueryException
from snuba.query.query_settings import HTTPQuerySettings
from snuba.replacers.replacements_and_expiry import (
Expand Down Expand Up @@ -233,6 +234,34 @@ def reverse_migration(group: str, migration_id: str) -> Response:
)


@application.route(
"/migrations/<group>/overwrite/<migration_id>/status/<new_status>",
methods=["POST"],
)
@check_tool_perms(tools=[AdminTools.MIGRATIONS])
def force_overwrite_migration_status(
group: str, migration_id: str, new_status: str
) -> Response:
try:
migration_group = MigrationGroup(group)
except ValueError as err:
logger.error(err, exc_info=True)
return make_response(jsonify({"error": "Group not found"}), 400)

runner.force_overwrite_status(migration_group, migration_id, Status(new_status))
user = request.headers.get(USER_HEADER_KEY)

audit_log.record(
user or "",
AuditLogAction.FORCE_MIGRATION_OVERWRITE,
{"group": group, "migration": migration_id, "new_status": new_status},
notify=True,
)

res = {"status": "OK"}
return make_response(jsonify(res), 200)


@check_migration_perms
def run_or_reverse_migration(group: str, action: str, migration_id: str) -> Response:
try:
Expand Down
23 changes: 20 additions & 3 deletions snuba/migrations/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ class MigrationDetails(NamedTuple):

class Runner:
def __init__(self) -> None:
migrations_cluster = get_cluster(StorageSetKey.MIGRATIONS)
self.__migrations_cluster = get_cluster(StorageSetKey.MIGRATIONS)
self.__table_name = (
LOCAL_TABLE_NAME if migrations_cluster.is_single_node() else DIST_TABLE_NAME
LOCAL_TABLE_NAME
if self.__migrations_cluster.is_single_node()
else DIST_TABLE_NAME
)

self.__connection = migrations_cluster.get_query_connection(
self.__connection = self.__migrations_cluster.get_query_connection(
ClickhouseClientSettings.MIGRATE
)

Expand Down Expand Up @@ -115,6 +117,21 @@ def get_status(

return Status.NOT_STARTED, None

def force_overwrite_status(
self, group: MigrationGroup, migration_id: str, new_status: Status
) -> None:
"""Sometimes a migration gets blocked or times out for whatever reason.
This function is used to overwrite the state in the snuba table keeping
track of migration so we can try again"""
local_node = self.__migrations_cluster.get_local_nodes()[0]
local_node_connection = self.__migrations_cluster.get_node_connection(
ClickhouseClientSettings.MIGRATE, local_node
)

local_node_connection.execute(
f"ALTER TABLE {LOCAL_TABLE_NAME} UPDATE status='{new_status.value}' WHERE migration_id='{migration_id}'"
)

def show_all(
self, groups: Optional[Sequence[str]] = None
) -> List[Tuple[MigrationGroup, List[MigrationDetails]]]:
Expand Down
22 changes: 22 additions & 0 deletions tests/admin/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,28 @@ def test_prod_snql_query_invalid_query(admin_api: FlaskClient) -> None:
)


@pytest.mark.redis_db
@pytest.mark.clickhouse_db
def test_force_overwrite(admin_api: FlaskClient) -> None:
migration_id = "0009_add_message"
migrations = json.loads(admin_api.get("/migrations/search_issues/list").data)
downgraded_migration = [
m for m in migrations if m.get("migration_id") == migration_id
][0]
assert downgraded_migration["status"] == "completed"

response = admin_api.post(
f"/migrations/search_issues/overwrite/{migration_id}/status/not_started",
headers={"Referer": "https://snuba-admin.getsentry.net/"},
)
assert response.status_code == 200
migrations = json.loads(admin_api.get("/migrations/search_issues/list").data)
downgraded_migration = [
m for m in migrations if m.get("migration_id") == migration_id
][0]
assert downgraded_migration["status"] == "not_started"


@pytest.mark.redis_db
@pytest.mark.clickhouse_db
def test_prod_snql_query_valid_query(admin_api: FlaskClient) -> None:
Expand Down

0 comments on commit afde219

Please sign in to comment.