diff --git a/snuba/admin/views.py b/snuba/admin/views.py index f6a75b2ed4..b891c04baa 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -62,11 +62,11 @@ get_dataset, get_enabled_dataset_names, ) -from snuba.datasets.storages.factory import get_storage +from snuba.datasets.storages.factory import get_all_storage_keys, get_storage from snuba.datasets.storages.storage_key import StorageKey from snuba.migrations.connect import check_for_inactive_replicas from snuba.migrations.errors import InactiveClickhouseReplica, MigrationError -from snuba.migrations.groups import MigrationGroup +from snuba.migrations.groups import MigrationGroup, get_group_readiness_state from snuba.migrations.runner import MigrationKey, Runner from snuba.query.exceptions import InvalidQueryException from snuba.state.explain_meta import explain_cleanup, get_explain_meta @@ -221,7 +221,7 @@ def run_or_reverse_migration(group: str, action: str, migration_id: str) -> Resp except ValueError as err: logger.error(err, exc_info=True) return make_response(jsonify({"error": "Group not found"}), 400) - + readiness_state = get_group_readiness_state(migration_group) migration_key = MigrationKey(migration_group, migration_id) def str_to_bool(s: str) -> bool: @@ -242,7 +242,9 @@ def do_action() -> None: else AuditLogAction.REVERSED_MIGRATION_STARTED, {"migration": str(migration_key), "force": force, "fake": fake}, ) - check_for_inactive_replicas() + check_for_inactive_replicas( + get_all_storage_keys(readiness_states=[readiness_state]) + ) if action == "run": runner.run_migration(migration_key, force=force, fake=fake, dry_run=dry_run) diff --git a/snuba/cli/bootstrap.py b/snuba/cli/bootstrap.py index 150191fdae..88786047c0 100644 --- a/snuba/cli/bootstrap.py +++ b/snuba/cli/bootstrap.py @@ -5,6 +5,8 @@ import click from confluent_kafka import KafkaException +from snuba.clusters.cluster import CLUSTERS +from snuba.datasets.storages.factory import get_all_storage_keys from snuba.environment import setup_logging from snuba.migrations.connect import ( check_clickhouse_connections, @@ -87,6 +89,6 @@ def bootstrap( create_topics(client, [t for t in Topic]) if migrate: - check_clickhouse_connections() - check_for_inactive_replicas() + check_clickhouse_connections(CLUSTERS) + check_for_inactive_replicas(get_all_storage_keys()) Runner().run_all(force=True) diff --git a/snuba/cli/migrations.py b/snuba/cli/migrations.py index 1a55ec26a0..2d3f59a388 100644 --- a/snuba/cli/migrations.py +++ b/snuba/cli/migrations.py @@ -6,10 +6,13 @@ from snuba.clusters.cluster import CLUSTERS, ClickhouseNodeType from snuba.clusters.storage_sets import StorageSetKey from snuba.datasets.readiness_state import ReadinessState +from snuba.datasets.storages.factory import get_all_storage_keys from snuba.environment import setup_logging from snuba.migrations.connect import ( check_clickhouse_connections, check_for_inactive_replicas, + get_clickhouse_clusters_for_migration_group, + get_clusters_for_readiness_states, ) from snuba.migrations.errors import MigrationError from snuba.migrations.groups import MigrationGroup, get_group_readiness_state @@ -30,7 +33,7 @@ def list() -> None: Lists migrations and their statuses """ setup_logging() - check_clickhouse_connections() + check_clickhouse_connections(CLUSTERS) runner = Runner() for group, group_migrations in runner.show_all(): readiness_state = get_group_readiness_state(group) @@ -84,9 +87,21 @@ def migrate( Blocking migrations will not be run unless --force is passed. """ + + readiness_states = ( + [ReadinessState(state) for state in readiness_state] + if readiness_state + else None + ) + setup_logging(log_level) - check_clickhouse_connections() - check_for_inactive_replicas() + clusters_to_check = ( + get_clusters_for_readiness_states(readiness_states, CLUSTERS) + if readiness_states + else CLUSTERS + ) + check_clickhouse_connections(clusters_to_check) + check_for_inactive_replicas(get_all_storage_keys(readiness_states=readiness_states)) runner = Runner() try: @@ -101,11 +116,7 @@ def migrate( force=force, fake=fake, group=migration_group, - readiness_states=( - [ReadinessState(state) for state in readiness_state] - if readiness_state - else None - ), + readiness_states=readiness_states, check_dangerous=check_dangerous, ) except MigrationError as e: @@ -143,12 +154,18 @@ def run( Migrations that are already in an in-progress or completed status will not be run. """ setup_logging(log_level) + migration_group = MigrationGroup(group) + readiness_state = get_group_readiness_state(migration_group) if not dry_run: - check_clickhouse_connections() - check_for_inactive_replicas() + # just check the connection for the migration that's being run + check_clickhouse_connections( + get_clickhouse_clusters_for_migration_group(migration_group) + ) + check_for_inactive_replicas( + get_all_storage_keys(readiness_states=[readiness_state]) + ) runner = Runner() - migration_group = MigrationGroup(group) migration_key = MigrationKey(migration_group, migration_id) if dry_run: @@ -197,12 +214,17 @@ def reverse( --force is required to reverse an already completed migration. --fake marks a migration as reversed without doing anything. """ + migration_group = MigrationGroup(group) + readiness_state = get_group_readiness_state(migration_group) setup_logging(log_level) if not dry_run: - check_clickhouse_connections() - check_for_inactive_replicas() + check_clickhouse_connections( + get_clickhouse_clusters_for_migration_group(migration_group) + ) + check_for_inactive_replicas( + get_all_storage_keys(readiness_states=[readiness_state]) + ) runner = Runner() - migration_group = MigrationGroup(group) migration_key = MigrationKey(migration_group, migration_id) if dry_run: @@ -245,13 +267,24 @@ def reverse_in_progress( --fake marks migrations as reversed without doing anything. """ setup_logging(log_level) + migration_group = MigrationGroup(group) if group else None + readiness_state = ( + get_group_readiness_state(migration_group) if migration_group else None + ) if not dry_run: - check_clickhouse_connections() - check_for_inactive_replicas() + clusters_to_check = ( + CLUSTERS + if not migration_group + else get_clickhouse_clusters_for_migration_group(migration_group) + ) + check_clickhouse_connections(clusters_to_check) + check_for_inactive_replicas( + get_all_storage_keys( + readiness_states=[readiness_state] if readiness_state else None + ) + ) runner = Runner() - migration_group = MigrationGroup(group) if group else None - if dry_run: runner.reverse_in_progress(group=migration_group, dry_run=True) return diff --git a/snuba/datasets/storages/factory.py b/snuba/datasets/storages/factory.py index bcb89e2750..dc79b02715 100644 --- a/snuba/datasets/storages/factory.py +++ b/snuba/datasets/storages/factory.py @@ -8,6 +8,7 @@ from snuba import settings from snuba.datasets.cdc.cdcstorage import CdcStorage from snuba.datasets.configuration.storage_builder import build_storage_from_config +from snuba.datasets.readiness_state import ReadinessState from snuba.datasets.storage import ReadableTableStorage, Storage, WritableTableStorage from snuba.datasets.storages.storage_key import StorageKey from snuba.datasets.storages.validator import StorageValidator @@ -109,8 +110,17 @@ def get_cdc_storage_keys() -> list[StorageKey]: return _storage_factory().get_cdc_storage_keys() -def get_all_storage_keys() -> list[StorageKey]: - return _storage_factory().get_all_storage_keys() +def get_all_storage_keys( + readiness_states: list[ReadinessState] | None = None, +) -> list[StorageKey]: + all_storage_keys = _storage_factory().get_all_storage_keys() + if readiness_states is None: + return all_storage_keys + return [ + s + for s in all_storage_keys + if get_storage(s).get_readiness_state() in readiness_states + ] def get_config_built_storages() -> dict[StorageKey, Storage]: diff --git a/snuba/migrations/connect.py b/snuba/migrations/connect.py index 51c0c04117..d9598b1ff2 100644 --- a/snuba/migrations/connect.py +++ b/snuba/migrations/connect.py @@ -1,19 +1,20 @@ import re import time -from typing import Sequence, Tuple +from typing import List, Sequence, Tuple import structlog from packaging import version from snuba.clickhouse.native import ClickhousePool from snuba.clusters.cluster import ( - CLUSTERS, ClickhouseClientSettings, ClickhouseCluster, ClickhouseNode, UndefinedClickhouseCluster, + get_cluster, ) from snuba.clusters.storage_sets import DEV_STORAGE_SETS +from snuba.datasets.readiness_state import ReadinessState from snuba.datasets.storages.factory import get_all_storage_keys, get_storage from snuba.datasets.storages.storage_key import StorageKey from snuba.migrations.clickhouse import ( @@ -21,14 +22,55 @@ CLICKHOUSE_SERVER_MIN_VERSION, ) from snuba.migrations.errors import InactiveClickhouseReplica, InvalidClickhouseVersion +from snuba.migrations.groups import ( + MigrationGroup, + get_group_readiness_state_from_storage_set, + get_storage_set_keys, +) from snuba.settings import ENABLE_DEV_FEATURES from snuba.utils.types import ColumnStatesMapType logger = structlog.get_logger().bind(module=__name__) +def get_clickhouse_clusters_for_migration_group( + migration_group: MigrationGroup, +) -> List[ClickhouseCluster]: + storage_set_keys = get_storage_set_keys(migration_group) + return list({get_cluster(storage_set_key) for storage_set_key in storage_set_keys}) + + +def get_clusters_for_readiness_states( + readiness_states: Sequence[ReadinessState], clusters: Sequence[ClickhouseCluster] +) -> Sequence[ClickhouseCluster]: + """Given a set of clusters, return just the ones that serve storage_sets corresponding to the provided readiness states + Storage sets do not have readiness states but the migration groups those storage sets are related to do + + E.g. + + # Get all the configured clusters which have partial or complete readiness states located on them + from snuba.clusters.cluster import CLUSTERS + + get_clusters_for_readiness_states( + [ReadinessState.PARTIAL, readiness_state.COMPLETE], + CLUSTERS + ) + + """ + res = [] + for cluster in clusters: + storage_sets = cluster.get_storage_set_keys() + cluster_readiness_states = { + get_group_readiness_state_from_storage_set(storage_set_key) + for storage_set_key in storage_sets + } + if set(readiness_states).intersection(cluster_readiness_states): + res.append(cluster) + return res + + def check_clickhouse_connections( - clusters: Sequence[ClickhouseCluster] = CLUSTERS, + clusters: Sequence[ClickhouseCluster], ) -> None: """ Ensure that we can establish a connection with every cluster. @@ -115,13 +157,10 @@ def _get_all_nodes_for_storage( return (local_nodes, distributed_nodes, query_node) -def check_for_inactive_replicas() -> None: +def check_for_inactive_replicas(storage_keys: List[StorageKey]) -> None: """ Checks for inactive replicas and raise InactiveClickhouseReplica if any are found. """ - - storage_keys = _get_all_storage_keys() - checked_nodes = set() inactive_replica_info = [] for storage_key in storage_keys: diff --git a/snuba/migrations/groups.py b/snuba/migrations/groups.py index b46a1429ff..c9b6142c8a 100644 --- a/snuba/migrations/groups.py +++ b/snuba/migrations/groups.py @@ -198,6 +198,10 @@ def get_group_loader(group: MigrationGroup) -> GroupLoader: return _REGISTERED_MIGRATION_GROUPS[group].loader +def get_storage_set_keys(group: MigrationGroup) -> Set[StorageSetKey]: + return _REGISTERED_MIGRATION_GROUPS[group].storage_set_keys + + def get_group_readiness_state_from_storage_set( storage_set_key: StorageSetKey, ) -> ReadinessState: diff --git a/tests/migrations/test_connect.py b/tests/migrations/test_connect.py new file mode 100644 index 0000000000..20d039ffcf --- /dev/null +++ b/tests/migrations/test_connect.py @@ -0,0 +1,129 @@ +from __future__ import annotations + +from functools import reduce +from typing import Any + +import pytest + +from snuba.clusters import cluster +from snuba.clusters.storage_sets import StorageSetKey +from snuba.datasets.readiness_state import ReadinessState +from snuba.migrations.connect import ( + get_clickhouse_clusters_for_migration_group, + get_clusters_for_readiness_states, +) +from snuba.migrations.groups import MigrationGroup + +_ALL_STORAGE_SET_KEYS = set([s.value for s in StorageSetKey]) +_REMAINING_STORAGE_SET_KEYS = _ALL_STORAGE_SET_KEYS - {"events", "querylog"} + +_QUERYLOG_CLUSTER = cluster.ClickhouseCluster( + host="host_1", + port=1, + user="", + password="", + database="default", + http_port=420, + storage_sets={ + "querylog", + }, + single_node=True, +) + +_EVENTS_CLUSTER = cluster.ClickhouseCluster( + host="host_2", + port=2, + user="", + password="", + database="default", + http_port=420, + storage_sets={ + "events", + }, + single_node=True, +) + +_REST_CLUSTER = cluster.ClickhouseCluster( + host="host_3", + port=3, + user="", + password="", + database="default", + http_port=420, + storage_sets=_REMAINING_STORAGE_SET_KEYS, + single_node=True, +) + + +TEST_CLUSTERS = [ + _QUERYLOG_CLUSTER, + _EVENTS_CLUSTER, + _REST_CLUSTER, +] + + +@pytest.fixture +def override_cluster( + monkeypatch: pytest.MonkeyPatch, +) -> Any: + with monkeypatch.context() as m: + m.setattr(cluster, "CLUSTERS", TEST_CLUSTERS) + m.setattr( + cluster, + "_STORAGE_SET_CLUSTER_MAP", + { + StorageSetKey.QUERYLOG: _QUERYLOG_CLUSTER, + StorageSetKey.EVENTS: _EVENTS_CLUSTER, + **{ + StorageSetKey(s): _REST_CLUSTER for s in _REMAINING_STORAGE_SET_KEYS + }, + }, + ) + yield + + +def test_get_clickhouse_clusters_for_migration_group(override_cluster: Any) -> None: + clusters = get_clickhouse_clusters_for_migration_group(MigrationGroup.QUERYLOG) + assert len(clusters) == 1 + assert clusters[0] == _QUERYLOG_CLUSTER + + +@pytest.mark.parametrize( + ["readiness_states", "clusters", "expected_clusters", "expected_storage_set_keys"], + [ + pytest.param( + [ReadinessState.PARTIAL], + [_QUERYLOG_CLUSTER, _EVENTS_CLUSTER], + [_QUERYLOG_CLUSTER], + set([StorageSetKey.QUERYLOG]), + id="partial only", + ), + pytest.param( + [ReadinessState.COMPLETE], + [_QUERYLOG_CLUSTER, _EVENTS_CLUSTER], + [_EVENTS_CLUSTER], + set([StorageSetKey.EVENTS]), + id="complete only", + ), + pytest.param( + [ReadinessState.COMPLETE, ReadinessState.PARTIAL], + [_QUERYLOG_CLUSTER, _EVENTS_CLUSTER], + [_QUERYLOG_CLUSTER, _EVENTS_CLUSTER], + set([StorageSetKey.EVENTS, StorageSetKey.QUERYLOG]), + id="complete and partial", + ), + ], +) +def test_get_clusters_for_readiness_states( + readiness_states: list[ReadinessState], + clusters: list[cluster.ClickhouseCluster], + expected_clusters: list[cluster.ClickhouseCluster], + expected_storage_set_keys: set[ReadinessState], +) -> None: + + result_clusters = get_clusters_for_readiness_states(readiness_states, clusters) + assert result_clusters == expected_clusters + assert ( + reduce(set.union, [rc.get_storage_set_keys() for rc in result_clusters]) + == expected_storage_set_keys + ) diff --git a/tests/migrations/test_runner.py b/tests/migrations/test_runner.py index bf099a7ed7..f54d7675f8 100644 --- a/tests/migrations/test_runner.py +++ b/tests/migrations/test_runner.py @@ -499,7 +499,7 @@ def test_check_inactive_replica() -> None: mock_conn.execute.return_value = inactive_replica_query_result with pytest.raises(InactiveClickhouseReplica) as exc: - check_for_inactive_replicas() + check_for_inactive_replicas([storage_key]) assert exc.value.args[0] == ( f"Storage {storage_key.value} has inactive replicas for table bad_table_1 "