Skip to content

Commit

Permalink
fix(inc-602): make migration connection checks readiness-state aware (#…
Browse files Browse the repository at this point in the history
…5414)

In INC-602, we ran into an issue where we had to remove a bunch of connection checks because one cluster was down and that ended up blocking deploys. This happened because the functions 

```python
check_clickhouse_connections()
check_for_inactive_replicas()
```

both checked ALL clusters, regardless of readiness state. Even though we downgraded the migration groups and the storage, the checks would still fail.

### What this PR does

`check_clickhouse_connections` now has a mandatory `clusters` argument to specify which clusters to check. Before it simply assumed all clusters.

`check_for_inactive_replicas` now has a mandatory `storage_keys` argument to specify which storages to check. Before it simply assumed all storages.

In the CLI and admin commands, the appropriate mandatory arguments are filtered by readiness states.
  • Loading branch information
volokluev committed Jan 26, 2024
1 parent 27aec85 commit 56a37e2
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 34 deletions.
10 changes: 6 additions & 4 deletions snuba/admin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@
get_dataset,
get_enabled_dataset_names,
)
from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.factory import get_all_storage_keys, get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.migrations.connect import check_for_inactive_replicas
from snuba.migrations.errors import InactiveClickhouseReplica, MigrationError
from snuba.migrations.groups import MigrationGroup
from snuba.migrations.groups import MigrationGroup, get_group_readiness_state
from snuba.migrations.runner import MigrationKey, Runner
from snuba.query.exceptions import InvalidQueryException
from snuba.state.explain_meta import explain_cleanup, get_explain_meta
Expand Down Expand Up @@ -221,7 +221,7 @@ def run_or_reverse_migration(group: str, action: str, migration_id: str) -> Resp
except ValueError as err:
logger.error(err, exc_info=True)
return make_response(jsonify({"error": "Group not found"}), 400)

readiness_state = get_group_readiness_state(migration_group)
migration_key = MigrationKey(migration_group, migration_id)

def str_to_bool(s: str) -> bool:
Expand All @@ -242,7 +242,9 @@ def do_action() -> None:
else AuditLogAction.REVERSED_MIGRATION_STARTED,
{"migration": str(migration_key), "force": force, "fake": fake},
)
check_for_inactive_replicas()
check_for_inactive_replicas(
get_all_storage_keys(readiness_states=[readiness_state])
)

if action == "run":
runner.run_migration(migration_key, force=force, fake=fake, dry_run=dry_run)
Expand Down
6 changes: 4 additions & 2 deletions snuba/cli/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import click
from confluent_kafka import KafkaException

from snuba.clusters.cluster import CLUSTERS
from snuba.datasets.storages.factory import get_all_storage_keys
from snuba.environment import setup_logging
from snuba.migrations.connect import (
check_clickhouse_connections,
Expand Down Expand Up @@ -87,6 +89,6 @@ def bootstrap(
create_topics(client, [t for t in Topic])

if migrate:
check_clickhouse_connections()
check_for_inactive_replicas()
check_clickhouse_connections(CLUSTERS)
check_for_inactive_replicas(get_all_storage_keys())
Runner().run_all(force=True)
69 changes: 51 additions & 18 deletions snuba/cli/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
from snuba.clusters.cluster import CLUSTERS, ClickhouseNodeType
from snuba.clusters.storage_sets import StorageSetKey
from snuba.datasets.readiness_state import ReadinessState
from snuba.datasets.storages.factory import get_all_storage_keys
from snuba.environment import setup_logging
from snuba.migrations.connect import (
check_clickhouse_connections,
check_for_inactive_replicas,
get_clickhouse_clusters_for_migration_group,
get_clusters_for_readiness_states,
)
from snuba.migrations.errors import MigrationError
from snuba.migrations.groups import MigrationGroup, get_group_readiness_state
Expand All @@ -30,7 +33,7 @@ def list() -> None:
Lists migrations and their statuses
"""
setup_logging()
check_clickhouse_connections()
check_clickhouse_connections(CLUSTERS)
runner = Runner()
for group, group_migrations in runner.show_all():
readiness_state = get_group_readiness_state(group)
Expand Down Expand Up @@ -84,9 +87,21 @@ def migrate(
Blocking migrations will not be run unless --force is passed.
"""

readiness_states = (
[ReadinessState(state) for state in readiness_state]
if readiness_state
else None
)

setup_logging(log_level)
check_clickhouse_connections()
check_for_inactive_replicas()
clusters_to_check = (
get_clusters_for_readiness_states(readiness_states, CLUSTERS)
if readiness_states
else CLUSTERS
)
check_clickhouse_connections(clusters_to_check)
check_for_inactive_replicas(get_all_storage_keys(readiness_states=readiness_states))
runner = Runner()

try:
Expand All @@ -101,11 +116,7 @@ def migrate(
force=force,
fake=fake,
group=migration_group,
readiness_states=(
[ReadinessState(state) for state in readiness_state]
if readiness_state
else None
),
readiness_states=readiness_states,
check_dangerous=check_dangerous,
)
except MigrationError as e:
Expand Down Expand Up @@ -143,12 +154,18 @@ def run(
Migrations that are already in an in-progress or completed status will not be run.
"""
setup_logging(log_level)
migration_group = MigrationGroup(group)
readiness_state = get_group_readiness_state(migration_group)
if not dry_run:
check_clickhouse_connections()
check_for_inactive_replicas()
# just check the connection for the migration that's being run
check_clickhouse_connections(
get_clickhouse_clusters_for_migration_group(migration_group)
)
check_for_inactive_replicas(
get_all_storage_keys(readiness_states=[readiness_state])
)

runner = Runner()
migration_group = MigrationGroup(group)
migration_key = MigrationKey(migration_group, migration_id)

if dry_run:
Expand Down Expand Up @@ -197,12 +214,17 @@ def reverse(
--force is required to reverse an already completed migration.
--fake marks a migration as reversed without doing anything.
"""
migration_group = MigrationGroup(group)
readiness_state = get_group_readiness_state(migration_group)
setup_logging(log_level)
if not dry_run:
check_clickhouse_connections()
check_for_inactive_replicas()
check_clickhouse_connections(
get_clickhouse_clusters_for_migration_group(migration_group)
)
check_for_inactive_replicas(
get_all_storage_keys(readiness_states=[readiness_state])
)
runner = Runner()
migration_group = MigrationGroup(group)
migration_key = MigrationKey(migration_group, migration_id)

if dry_run:
Expand Down Expand Up @@ -245,13 +267,24 @@ def reverse_in_progress(
--fake marks migrations as reversed without doing anything.
"""
setup_logging(log_level)
migration_group = MigrationGroup(group) if group else None
readiness_state = (
get_group_readiness_state(migration_group) if migration_group else None
)
if not dry_run:
check_clickhouse_connections()
check_for_inactive_replicas()
clusters_to_check = (
CLUSTERS
if not migration_group
else get_clickhouse_clusters_for_migration_group(migration_group)
)
check_clickhouse_connections(clusters_to_check)
check_for_inactive_replicas(
get_all_storage_keys(
readiness_states=[readiness_state] if readiness_state else None
)
)
runner = Runner()

migration_group = MigrationGroup(group) if group else None

if dry_run:
runner.reverse_in_progress(group=migration_group, dry_run=True)
return
Expand Down
14 changes: 12 additions & 2 deletions snuba/datasets/storages/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from snuba import settings
from snuba.datasets.cdc.cdcstorage import CdcStorage
from snuba.datasets.configuration.storage_builder import build_storage_from_config
from snuba.datasets.readiness_state import ReadinessState
from snuba.datasets.storage import ReadableTableStorage, Storage, WritableTableStorage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.datasets.storages.validator import StorageValidator
Expand Down Expand Up @@ -109,8 +110,17 @@ def get_cdc_storage_keys() -> list[StorageKey]:
return _storage_factory().get_cdc_storage_keys()


def get_all_storage_keys() -> list[StorageKey]:
return _storage_factory().get_all_storage_keys()
def get_all_storage_keys(
    readiness_states: list[ReadinessState] | None = None,
) -> list[StorageKey]:
    """Return every registered storage key, optionally filtered by readiness.

    When ``readiness_states`` is None (the default) all storage keys are
    returned unchanged. Otherwise only the keys of storages whose readiness
    state is one of the provided states are included.
    """
    keys = _storage_factory().get_all_storage_keys()
    if readiness_states is None:
        return keys
    # Enum members are hashable, so a set gives the same membership result.
    wanted = set(readiness_states)
    return [key for key in keys if get_storage(key).get_readiness_state() in wanted]


def get_config_built_storages() -> dict[StorageKey, Storage]:
Expand Down
53 changes: 46 additions & 7 deletions snuba/migrations/connect.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,76 @@
import re
import time
from typing import Sequence, Tuple
from typing import List, Sequence, Tuple

import structlog
from packaging import version

from snuba.clickhouse.native import ClickhousePool
from snuba.clusters.cluster import (
CLUSTERS,
ClickhouseClientSettings,
ClickhouseCluster,
ClickhouseNode,
UndefinedClickhouseCluster,
get_cluster,
)
from snuba.clusters.storage_sets import DEV_STORAGE_SETS
from snuba.datasets.readiness_state import ReadinessState
from snuba.datasets.storages.factory import get_all_storage_keys, get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.migrations.clickhouse import (
CLICKHOUSE_SERVER_MAX_VERSION,
CLICKHOUSE_SERVER_MIN_VERSION,
)
from snuba.migrations.errors import InactiveClickhouseReplica, InvalidClickhouseVersion
from snuba.migrations.groups import (
MigrationGroup,
get_group_readiness_state_from_storage_set,
get_storage_set_keys,
)
from snuba.settings import ENABLE_DEV_FEATURES
from snuba.utils.types import ColumnStatesMapType

logger = structlog.get_logger().bind(module=__name__)


def get_clickhouse_clusters_for_migration_group(
    migration_group: MigrationGroup,
) -> List[ClickhouseCluster]:
    """Return the distinct clusters backing the storage sets of a migration group."""
    clusters: set[ClickhouseCluster] = set()
    for storage_set_key in get_storage_set_keys(migration_group):
        clusters.add(get_cluster(storage_set_key))
    return list(clusters)


def get_clusters_for_readiness_states(
    readiness_states: Sequence[ReadinessState], clusters: Sequence[ClickhouseCluster]
) -> Sequence[ClickhouseCluster]:
    """Given a set of clusters, return just the ones that serve storage_sets corresponding to the provided readiness states

    Storage sets do not have readiness states, but the migration groups those
    storage sets are related to do, so each cluster is matched via the
    readiness states of the groups behind its storage sets.

    E.g.

    # Get all the configured clusters which have partial or complete readiness states located on them
    from snuba.clusters.cluster import CLUSTERS
    get_clusters_for_readiness_states(
        [ReadinessState.PARTIAL, ReadinessState.COMPLETE],
        CLUSTERS
    )
    """
    # Hoisted out of the loop: the wanted states never change per cluster.
    wanted_states = set(readiness_states)
    res = []
    for cluster in clusters:
        # Readiness states of every migration group served by this cluster.
        cluster_readiness_states = {
            get_group_readiness_state_from_storage_set(storage_set_key)
            for storage_set_key in cluster.get_storage_set_keys()
        }
        if wanted_states.intersection(cluster_readiness_states):
            res.append(cluster)
    return res


def check_clickhouse_connections(
clusters: Sequence[ClickhouseCluster] = CLUSTERS,
clusters: Sequence[ClickhouseCluster],
) -> None:
"""
Ensure that we can establish a connection with every cluster.
Expand Down Expand Up @@ -115,13 +157,10 @@ def _get_all_nodes_for_storage(
return (local_nodes, distributed_nodes, query_node)


def check_for_inactive_replicas() -> None:
def check_for_inactive_replicas(storage_keys: List[StorageKey]) -> None:
"""
Checks for inactive replicas and raise InactiveClickhouseReplica if any are found.
"""

storage_keys = _get_all_storage_keys()

checked_nodes = set()
inactive_replica_info = []
for storage_key in storage_keys:
Expand Down
4 changes: 4 additions & 0 deletions snuba/migrations/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,10 @@ def get_group_loader(group: MigrationGroup) -> GroupLoader:
return _REGISTERED_MIGRATION_GROUPS[group].loader


def get_storage_set_keys(group: MigrationGroup) -> Set[StorageSetKey]:
    """Return the storage set keys registered for the given migration group."""
    registered_group = _REGISTERED_MIGRATION_GROUPS[group]
    return registered_group.storage_set_keys


def get_group_readiness_state_from_storage_set(
storage_set_key: StorageSetKey,
) -> ReadinessState:
Expand Down
Loading

0 comments on commit 56a37e2

Please sign in to comment.