Commit

ref(ch-upgrades): create dist tables functionality (#5737)
* ref(ch-upgrades): create dist tables functionality

* extra not needed

* err can be none

* fix typing

* fix help message, use ErrorCodes

* split out parse table logic

* add --old-cluster, --new-cluster

* remove CLUSTERS

* always upload schema

* Update snuba/cli/query_fetcher.py

Co-authored-by: Nikhar Saxena <84807402+nikhars@users.noreply.github.com>

---------

Co-authored-by: Nikhar Saxena <84807402+nikhars@users.noreply.github.com>
MeredithAnya and nikhars committed Apr 11, 2024
1 parent bf29fb5 commit 72b5205
Showing 3 changed files with 116 additions and 21 deletions.
27 changes: 27 additions & 0 deletions snuba/cli/query_fetcher.py
@@ -9,6 +9,8 @@
FileFormat,
FileManager,
QueryInfoResult,
delete_local_file,
full_path,
)
from snuba.clusters.cluster import ClickhouseClientSettings
from snuba.environment import setup_logging, setup_sentry
@@ -154,6 +156,28 @@ def get_table_names() -> Sequence[str]:
else:
table_names = [t for t in tables.split(",")]

def save_table_schema(table: str) -> None:
"""
Fetches the table schema from the same node we are
fetching the queries from. The schemas may be needed
when running the replayer for the first time.
The schema is always uploaded and put in the same bucket
under the following convention:
schemas/table_name.sql
"""
filename = f"{table}.sql"
((schema,),) = connection.execute(
f"SELECT create_table_query FROM system.tables WHERE name = '{table}'"
).results
filepath = full_path(filename)
with open(filepath, "w", encoding="utf-8") as f:
f.write(schema)
logger.info(f"Uploading schema for {table}...")
uploader.upload_file(filepath, f"schemas/{filename}")
delete_local_file(filepath)

# rounded to the hour
now = datetime.utcnow().replace(
microsecond=0,
@@ -164,6 +188,9 @@ def get_table_names() -> Sequence[str]:
interval = timedelta(hours=1)

for table in table_names:
# we'll use the table schema in the replayer, so
# save the create_table_query
save_table_schema(table)
logger.info(f"Running fetcher for {table}...")
start_time = window_hours_ago_ts
files_saved = 0
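For context on the new save_table_schema helper above: it reads create_table_query from ClickHouse's system.tables, stages the result under /tmp, and uploads it as schemas/<table>.sql. Below is a minimal standalone sketch of the same idea using the clickhouse_driver package that snuba already wraps; the host, port, and function name are illustrative assumptions, and the GCS upload is left as a comment rather than real snuba code.

from clickhouse_driver import Client  # assumed available; snuba wraps this driver


def dump_table_schema(table: str, host: str = "localhost", port: int = 9000) -> str:
    """Fetch the CREATE TABLE statement for `table` and write it to /tmp/<table>.sql."""
    client = Client(host=host, port=port)
    rows = client.execute(
        "SELECT create_table_query FROM system.tables WHERE name = %(name)s",
        {"name": table},
    )
    if not rows:
        raise ValueError(f"table {table!r} not found in system.tables")
    path = f"/tmp/{table}.sql"
    with open(path, "w", encoding="utf-8") as f:
        f.write(rows[0][0])
    # the real command then uploads the file, e.g. uploader.upload_file(path, f"schemas/{table}.sql")
    return path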
87 changes: 75 additions & 12 deletions snuba/cli/query_replayer.py
@@ -1,20 +1,25 @@
import re
import time
from datetime import datetime
from typing import Optional, Tuple

import click
import sentry_sdk
import structlog
from clickhouse_driver.errors import ErrorCodes

from snuba import settings
from snuba.admin.notifications.slack.client import SlackClient
from snuba.clickhouse.errors import ClickhouseError
from snuba.clickhouse.native import ClickhousePool
from snuba.clickhouse.upgrades.comparisons import (
BlobGetter,
FileFormat,
FileManager,
QueryInfoResult,
QueryMeasurementResult,
delete_local_file,
full_path,
)
from snuba.clusters.cluster import ClickhouseClientSettings
from snuba.environment import setup_logging, setup_sentry
@@ -56,6 +61,15 @@ def get_credentials(user: Optional[str], password: Optional[str]) -> Tuple[str,
return (user or "default", password or "")


def parse_table_name(err: ClickhouseError) -> str:
match = re.search(r"Table\s+(\S+)\s+doesn't", err.message)
if not match:
raise Exception("Couldn't parse table name.")
# the error message contains the full database.table, but we just want the table
_, table = match.group(1).strip().split(".")
return table


@click.command()
@click.option(
"--clickhouse-host",
@@ -93,19 +107,19 @@ def get_credentials(user: Optional[str], password: Optional[str]) -> Tuple[str,
)
@click.option(
"--override",
help="Option to override any previously re-run results",
help="Option to override any previously re-run results.",
is_flag=True,
default=False,
)
@click.option(
"--notify",
help="Option to send saved csv file to slack",
help="Option to send saved csv file to slack.",
is_flag=True,
default=False,
)
@click.option(
"--gcs-bucket",
help="Name of gcs bucket to save query files to",
help="Name of gcs bucket to save query files to.",
required=True,
)
@click.option(
@@ -115,6 +129,15 @@ def get_credentials(user: Optional[str], password: Optional[str]) -> Tuple[str,
required=True,
default=30,
)
@click.option(
"--old-cluster",
help="Cluster name you copied the schemas from, will be replaced by --new-cluster.",
default="snuba-test",
)
@click.option(
"--new-cluster",
help="Cluster name for the nodes you're re-running queries on, replaces --old-cluster.",
)
@click.option("--log-level", help="Logging level to use.")
def query_replayer(
*,
@@ -125,6 +148,8 @@ def query_replayer(
notify: bool,
gcs_bucket: str,
wait_seconds: int,
old_cluster: Optional[str],
new_cluster: Optional[str],
log_level: Optional[str] = None,
clickhouse_user: Optional[str] = None,
clickhouse_password: Optional[str] = None,
@@ -170,27 +195,65 @@ def get_version() -> str:
blob_getter.get_name_diffs(("queries/", f"{results_directory}/"))
)

def create_table(table: str) -> None:
"""
If we try to replay queries for tables that haven't
been created yet and fail, then we attempt to create
the table now. We download the schema from our gcs
bucket, replace the old cluster name in the schema
(--old-cluster) with the new one (--new-cluster),
and execute the CREATE TABLE query.
"""
if not (old_cluster and new_cluster):
raise Exception(
"Must have --old-cluster and --new-cluster if creating tables!"
)
filename = f"{table}.sql"
filepath = full_path(filename)
logger.info(f"Downloading schema for {table}...")
uploader.download_file(f"schemas/{filename}", filepath)
with open(filepath, encoding="utf-8") as f:
schema = f.read()
schema = schema.replace(old_cluster, new_cluster)
logger.info(f"Creating table {table}...")
connection.execute(schema)
delete_local_file(filepath)

def rerun_queries_for_blob(blob: str) -> Tuple[int, int]:
queries = file_manager.download(blob)
reran_queries = 0
total_queries = len(queries)
logger.info(f"Re-running queries for {blob}")
for q in queries:
assert isinstance(q, QueryInfoResult)

def _run_query(q: QueryInfoResult) -> Optional[ClickhouseError]:
try:
connection.execute(
q.query_str,
query_id=q.query_id,
)
reran_queries += 1
except Exception as e:
logger.info(
f"Re-ran {reran_queries}/{total_queries} queries before failing on {q.query_id}"
)
except Exception as err:
# capturing the execption so that we can debug,
# but not re-raising because we don't want one
# blob to prevent others from being processed
sentry_sdk.capture_exception(e)
sentry_sdk.capture_exception(err)
if isinstance(err, ClickhouseError):
return err
else:
return None
return None

logger.info(f"Re-running queries for {blob}")
for q in queries:
assert isinstance(q, QueryInfoResult)
err = _run_query(q)
if err and err.code == ErrorCodes.UNKNOWN_TABLE:
table = parse_table_name(err)
create_table(table)
# give it a second to create the table
time.sleep(1)
err = _run_query(q)
if not err:
reran_queries += 1

logger.info(f"Re-ran {reran_queries}/{total_queries} queries")
return (total_queries, reran_queries)

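The retry path added above depends on two small pieces: pulling the table name out of a ClickHouse UNKNOWN_TABLE error, and rewriting the downloaded schema with a plain schema.replace(old_cluster, new_cluster) before executing it. Below is a self-contained sketch of the parsing step; the sample error message is an assumption modelled on the wording the regex expects, and real messages vary between ClickHouse versions.

import re

# hypothetical example message; the regex in this commit expects the "doesn't" wording
SAMPLE = "DB::Exception: Table default.errors_dist doesn't exist."


def parse_table_name(message: str) -> str:
    """Return the bare table name from a "Table <db>.<table> doesn't exist" message."""
    match = re.search(r"Table\s+(\S+)\s+doesn't", message)
    if not match:
        raise ValueError("Couldn't parse table name.")
    _, table = match.group(1).split(".")
    return table


assert parse_table_name(SAMPLE) == "errors_dist"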
23 changes: 14 additions & 9 deletions snuba/clickhouse/upgrades/comparisons.py
@@ -1,5 +1,6 @@
import csv
import dataclasses
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, NamedTuple, Sequence, Tuple, Type, Union
@@ -100,6 +101,15 @@ def type_for_directory(directory: str) -> Type[Results]:
return DIRECTORY_RESULT_TYPES[directory]


def full_path(filename: str) -> str:
return f"/tmp/{filename}"


def delete_local_file(full_path: str) -> None:
if os.path.isfile(full_path):
os.remove(full_path)


class FileManager:
def __init__(self, uploader: GCSUploader) -> None:
self.uploader = uploader
@@ -122,27 +132,22 @@ def _format_blob_name(self, file_format: FileFormat) -> str:
day = datetime.strftime(date, "%Y_%m_%d")
return f"{directory}/{day}/{table}_{hour}.csv"

def _save_to_csv(
self, filename: str, results: Sequence[Results], header_row: bool = False
) -> None:
result_type = self._result_type(filename)
with open(self._full_path(filename), mode="w") as file:
with open(full_path(filename), mode="w") as file:
writer = csv.writer(file)
if header_row:
fields = [f.name for f in dataclasses.fields(result_type)] # mypy ig
writer.writerow(fields)
for row in results:
writer.writerow(dataclasses.astuple(row))

logger.debug(f"File {self._full_path(filename)} saved")

def _download_from_csv(self, filename: str) -> Sequence[Results]:
result_type = self._result_type(filename)
results: List[Results] = []
with open(self._full_path(filename), mode="r") as csvfile:
with open(full_path(filename), mode="r") as csvfile:
reader = csv.reader(csvfile)
for row in reader:
result: Results
@@ -161,10 +166,10 @@ def _download_from_csv(self, filename: str) -> Sequence[Results]:
return results

def _save_to_gcs(self, filename: str, blob_name: str) -> None:
self.uploader.upload_file(full_path(filename), blob_name)

def _download_from_gcs(self, blob_name: str, filename: str) -> None:
self.uploader.download_file(blob_name, full_path(filename))

def filename_from_blob_name(self, blob_name: str) -> str:
return blob_name.replace("/", "_")
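The new module-level helpers full_path and delete_local_file centralize the /tmp staging that FileManager uses for its CSV round trips. Below is a simplified sketch of that round trip under stated assumptions: Row is a hypothetical stand-in for snuba's QueryInfoResult, and the optional header-row handling is omitted.

import csv
import dataclasses
import os
from dataclasses import dataclass
from typing import List


def full_path(filename: str) -> str:
    # intermediate files are staged under /tmp, mirroring the helper added above
    return f"/tmp/{filename}"


def delete_local_file(path: str) -> None:
    if os.path.isfile(path):
        os.remove(path)


@dataclass
class Row:  # hypothetical stand-in for QueryInfoResult
    query_id: str
    query_str: str


def save_rows(filename: str, rows: List[Row]) -> str:
    path = full_path(filename)
    with open(path, mode="w", newline="") as f:
        writer = csv.writer(f)
        for row in rows:
            writer.writerow(dataclasses.astuple(row))
    return path


def load_rows(filename: str) -> List[Row]:
    with open(full_path(filename), mode="r", newline="") as f:
        return [Row(*fields) for fields in csv.reader(f)]


staged = save_rows("example.csv", [Row("abc123", "SELECT 1")])
assert load_rows("example.csv") == [Row("abc123", "SELECT 1")]
delete_local_file(staged)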

