From 72b52054b52b16cbd55699e30d9bfe9b7ca86d72 Mon Sep 17 00:00:00 2001
From: MeredithAnya
Date: Thu, 11 Apr 2024 12:17:19 -0700
Subject: [PATCH] ref(ch-upgrades): create dist tables functionality (#5737)

* ref(ch-upgrades): create dist tables functionality

* extra not needed

* err can be none

* fix typing

* fix help message, use ErrorCodes

* split out parse table logic

* add --old-cluster, --new-cluster

* remove CLUSTERS

* always upload schema

* Update snuba/cli/query_fetcher.py

Co-authored-by: Nikhar Saxena <84807402+nikhars@users.noreply.github.com>

---------

Co-authored-by: Nikhar Saxena <84807402+nikhars@users.noreply.github.com>
---
 snuba/cli/query_fetcher.py               | 27 ++++++++
 snuba/cli/query_replayer.py              | 87 ++++++++++++++++++++----
 snuba/clickhouse/upgrades/comparisons.py | 23 ++++---
 3 files changed, 116 insertions(+), 21 deletions(-)

diff --git a/snuba/cli/query_fetcher.py b/snuba/cli/query_fetcher.py
index 84f3af2b52..3cd709aaca 100644
--- a/snuba/cli/query_fetcher.py
+++ b/snuba/cli/query_fetcher.py
@@ -9,6 +9,8 @@
     FileFormat,
     FileManager,
     QueryInfoResult,
+    delete_local_file,
+    full_path,
 )
 from snuba.clusters.cluster import ClickhouseClientSettings
 from snuba.environment import setup_logging, setup_sentry
@@ -154,6 +156,28 @@ def get_table_names() -> Sequence[str]:
     else:
         table_names = [t for t in tables.split(",")]

+    def save_table_schema(table: str) -> None:
+        """
+        Fetches the table schema from the same node we are
+        fetching the queries from. The schemas may be needed
+        when running the replayer for the first time.
+
+        Uploads the schema to the same bucket as the queries,
+        under the following convention:
+
+            schemas/table_name.sql
+        """
+        filename = f"{table}.sql"
+        ((schema,),) = connection.execute(
+            f"SELECT create_table_query FROM system.tables WHERE name = '{table}'"
+        ).results
+        filepath = full_path(filename)
+        with open(filepath, "w", encoding="utf-8") as f:
+            f.write(schema)
+        logger.info(f"Uploading schema for {table}...")
+        uploader.upload_file(filepath, f"schemas/{filename}")
+        delete_local_file(filepath)
+
     # rounded to the hour
     now = datetime.utcnow().replace(
         microsecond=0,
@@ -164,6 +188,9 @@
     interval = timedelta(hours=1)

     for table in table_names:
+        # we'll use the table schema in the replayer, so
+        # save the create_table_query
+        save_table_schema(table)
         logger.info(f"Running fetcher for {table}...")
         start_time = window_hours_ago_ts
         files_saved = 0
diff --git a/snuba/cli/query_replayer.py b/snuba/cli/query_replayer.py
index 9154bc5eb8..897b7151db 100644
--- a/snuba/cli/query_replayer.py
+++ b/snuba/cli/query_replayer.py
@@ -1,3 +1,4 @@
+import re
 import time
 from datetime import datetime
 from typing import Optional, Tuple
@@ -5,9 +6,11 @@
 import click
 import sentry_sdk
 import structlog
+from clickhouse_driver.errors import ErrorCodes

 from snuba import settings
 from snuba.admin.notifications.slack.client import SlackClient
+from snuba.clickhouse.errors import ClickhouseError
 from snuba.clickhouse.native import ClickhousePool
 from snuba.clickhouse.upgrades.comparisons import (
     BlobGetter,
@@ -15,6 +18,8 @@
     FileManager,
     QueryInfoResult,
     QueryMeasurementResult,
+    delete_local_file,
+    full_path,
 )
 from snuba.clusters.cluster import ClickhouseClientSettings
 from snuba.environment import setup_logging, setup_sentry
@@ -56,6 +61,15 @@ def get_credentials(user: Optional[str], password: Optional[str]) -> Tuple[str,
     return (user or "default", password or "")


+def parse_table_name(err: ClickhouseError) -> str:
+    match = re.search(r"Table\s+(\S+)\s+doesn't", err.message)
+    if not match:
+        raise Exception("Couldn't parse table name.")
+    # err message will have the full database.table but we just want the table
+    _, table = match.group(1).strip().split(".")
+    return table
+
+
 @click.command()
 @click.option(
     "--clickhouse-host",
@@ -93,19 +107,19 @@ def get_credentials(user: Optional[str], password: Optional[str]) -> Tuple[str,
 )
 @click.option(
     "--override",
-    help="Option to override any previously re-run results",
+    help="Option to override any previously re-run results.",
     is_flag=True,
     default=False,
 )
 @click.option(
     "--notify",
-    help="Option to send saved csv file to slack",
+    help="Option to send saved csv file to slack.",
     is_flag=True,
     default=False,
 )
 @click.option(
     "--gcs-bucket",
-    help="Name of gcs bucket to save query files to",
+    help="Name of gcs bucket to save query files to.",
     required=True,
 )
 @click.option(
@@ -115,6 +129,15 @@ def get_credentials(user: Optional[str], password: Optional[str]) -> Tuple[str,
     required=True,
     default=30,
 )
+@click.option(
+    "--old-cluster",
+    help="Cluster name you copied the schemas from; it will be replaced by --new-cluster.",
+    default="snuba-test",
+)
+@click.option(
+    "--new-cluster",
+    help="Cluster name for the nodes you're re-running queries on; replaces --old-cluster.",
+)
 @click.option("--log-level", help="Logging level to use.")
 def query_replayer(
     *,
@@ -125,6 +148,8 @@ def query_replayer(
     notify: bool,
     gcs_bucket: str,
     wait_seconds: int,
+    old_cluster: Optional[str],
+    new_cluster: Optional[str],
     log_level: Optional[str] = None,
     clickhouse_user: Optional[str] = None,
     clickhouse_password: Optional[str] = None,
@@ -170,27 +195,65 @@ def get_version() -> str:
         blob_getter.get_name_diffs(("queries/", f"{results_directory}/"))
     )

+    def create_table(table: str) -> None:
+        """
+        If replaying a query fails because its table hasn't
+        been created yet, we attempt to create the table now.
+        We download the schema from our gcs bucket, replace
+        the old cluster name in the schema with the one passed
+        in the cli command (--new-cluster) and execute the
+        CREATE TABLE query.
+        """
+        if not (old_cluster and new_cluster):
+            raise Exception(
+                "Must have --old-cluster and --new-cluster if creating tables!"
+            )
+        filename = f"{table}.sql"
+        filepath = full_path(filename)
+        logger.info(f"Downloading schema for {table}...")
+        uploader.download_file(f"schemas/{filename}", filepath)
+        with open(filepath, encoding="utf-8") as f:
+            schema = f.read()
+        schema = schema.replace(old_cluster, new_cluster)
+        logger.info(f"Creating table {table}...")
+        connection.execute(schema)
+        delete_local_file(filepath)
+
     def rerun_queries_for_blob(blob: str) -> Tuple[int, int]:
         queries = file_manager.download(blob)
         reran_queries = 0
         total_queries = len(queries)
-        logger.info(f"Re-running queries for {blob}")
-        for q in queries:
-            assert isinstance(q, QueryInfoResult)
+
+        def _run_query(q: QueryInfoResult) -> Optional[ClickhouseError]:
             try:
                 connection.execute(
                     q.query_str,
                     query_id=q.query_id,
                 )
-                reran_queries += 1
-            except Exception as e:
-                logger.info(
-                    f"Re-ran {reran_queries}/{total_queries} queries before failing on {q.query_id}"
-                )
+            except Exception as err:
                 # capturing the exception so that we can debug,
                 # but not re-raising because we don't want one
                 # blob to prevent others from being processed
-                sentry_sdk.capture_exception(e)
+                sentry_sdk.capture_exception(err)
+                if isinstance(err, ClickhouseError):
+                    return err
+                else:
+                    return None
+
+            return None
+
+        logger.info(f"Re-running queries for {blob}")
+        for q in queries:
+            assert isinstance(q, QueryInfoResult)
+            err = _run_query(q)
+            if err and err.code == ErrorCodes.UNKNOWN_TABLE:
+                table = parse_table_name(err)
+                create_table(table)
+                # give it a second to create the table
+                time.sleep(1)
+                err = _run_query(q)
+            if not err:
+                reran_queries += 1

         logger.info(f"Re-ran {reran_queries}/{total_queries} queries")
         return (total_queries, reran_queries)
diff --git a/snuba/clickhouse/upgrades/comparisons.py b/snuba/clickhouse/upgrades/comparisons.py
index 5ab28c45c3..eae01234bf 100644
--- a/snuba/clickhouse/upgrades/comparisons.py
+++ b/snuba/clickhouse/upgrades/comparisons.py
@@ -1,5 +1,6 @@
 import csv
 import dataclasses
+import os
 from dataclasses import dataclass
 from datetime import datetime
 from typing import Dict, List, NamedTuple, Sequence, Tuple, Type, Union
@@ -100,6 +101,15 @@ def type_for_directory(directory: str) -> Type[Results]:
     return DIRECTORY_RESULT_TYPES[directory]


+def full_path(filename: str) -> str:
+    return f"/tmp/{filename}"
+
+
+def delete_local_file(full_path: str) -> None:
+    if os.path.isfile(full_path):
+        os.remove(full_path)
+
+
 class FileManager:
     def __init__(self, uploader: GCSUploader) -> None:
         self.uploader = uploader
@@ -122,14 +132,11 @@ def _format_blob_name(self, file_format: FileFormat) -> str:
         day = datetime.strftime(date, "%Y_%m_%d")
         return f"{directory}/{day}/{table}_{hour}.csv"

-    def _full_path(self, filename: str) -> str:
-        return f"/tmp/{filename}"
-
     def _save_to_csv(
         self, filename: str, results: Sequence[Results], header_row: bool = False
     ) -> None:
         result_type = self._result_type(filename)
-        with open(self._full_path(filename), mode="w") as file:
+        with open(full_path(filename), mode="w") as file:
             writer = csv.writer(file)
             if header_row:
                 fields = [f.name for f in dataclasses.fields(result_type)]  # mypy ig
@@ -137,12 +144,10 @@ def _save_to_csv(
             for row in results:
                 writer.writerow(dataclasses.astuple(row))

-        logger.debug(f"File {self._full_path(filename)} saved")
-
     def _download_from_csv(self, filename: str) -> Sequence[Results]:
         result_type = self._result_type(filename)
         results: List[Results] = []
-        with open(self._full_path(filename), mode="r") as csvfile:
+        with open(full_path(filename), mode="r") as csvfile:
             reader = csv.reader(csvfile)
             for row in reader:
                 result: Results
@@ -161,10 +166,10 @@ def _download_from_csv(self, filename: str) -> Sequence[Results]:
         return results

     def _save_to_gcs(self, filename: str, blob_name: str) -> None:
-        self.uploader.upload_file(self._full_path(filename), blob_name)
+        self.uploader.upload_file(full_path(filename), blob_name)

     def _download_from_gcs(self, blob_name: str, filename: str) -> None:
-        self.uploader.download_file(blob_name, self._full_path(filename))
+        self.uploader.download_file(blob_name, full_path(filename))

     def filename_from_blob_name(self, blob_name: str) -> str:
         return blob_name.replace("/", "_")
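
Reviewer note (not part of the patch): a minimal sketch of the table-name parsing that the replayer's new UNKNOWN_TABLE fallback relies on. The error text below is an assumed example of the ClickHouse "Table <db>.<table> doesn't exist" wording that the regex targets; in the patch the string comes from ClickhouseError.message, and the table name errors_local is invented for illustration.

import re


def parse_table_name(message: str) -> str:
    # same regex as parse_table_name() in query_replayer.py, applied
    # to a plain message string for illustration
    match = re.search(r"Table\s+(\S+)\s+doesn't", message)
    if not match:
        raise Exception("Couldn't parse table name.")
    # the match is database.table; keep only the table part
    _, table = match.group(1).strip().split(".")
    return table


# hypothetical error text, for illustration only
print(parse_table_name("DB::Exception: Table default.errors_local doesn't exist."))
# -> errors_local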
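
A second sketch, also not part of the patch, showing the schema round trip the two commands now share: query_fetcher stores each table's create_table_query in the bucket as schemas/<table>.sql, and the replayer's create_table downloads that file and swaps --old-cluster for --new-cluster before executing it. The CREATE TABLE text, table name, and new cluster name below are made up for the example.

# values that would come from --old-cluster / --new-cluster
old_cluster = "snuba-test"   # default in the patch
new_cluster = "snuba-new"    # hypothetical target cluster

# stand-in for the contents of schemas/errors_local.sql downloaded from GCS
schema = (
    "CREATE TABLE errors_local ON CLUSTER 'snuba-test' "
    "(event_id UUID) ENGINE = MergeTree ORDER BY event_id"
)

# the same substitution create_table() performs before connection.execute(schema)
print(schema.replace(old_cluster, new_cluster))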