Skip to content

Commit

Permalink
Fix snuba admin's query tracing to connect to right storage and query…
Browse files Browse the repository at this point in the history
… nodes
  • Loading branch information
onkar committed Sep 5, 2024
1 parent 401125d commit de3dd51
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 14 deletions.
75 changes: 61 additions & 14 deletions snuba/admin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

import io
import sys
import time
from contextlib import redirect_stdout
from dataclasses import asdict
from datetime import datetime
from typing import Any, List, Mapping, Optional, Sequence, Tuple, cast
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, cast

import sentry_sdk
import simplejson as json
Expand Down Expand Up @@ -501,11 +502,43 @@ def clickhouse_trace_query() -> Response:
for query_trace_data in parse_trace_for_query_ids(query_trace, storage):
sql = profile_events_raw_sql.format(query_trace_data.query_id)
logger.info(
"Profile event gathering host: {}, port = {}, storage = {}, sql = {}, g.user = {}".format(
"Gathering profile event using host: {}, port = {}, storage = {}, sql = {}, g.user = {}".format(
query_trace_data.host, query_trace_data.port, storage, sql, g.user
)
)
# TODO: Onkar to add the profile event logic later.
system_query_result, counter = None, 0
while counter < 60:
# There is a race between the trace query and the 'SELECT ProfileEvents...' query. ClickHouse does not immediately
# return the rows for 'SELECT ProfileEvents...' query. To make it return rows, sleep between the query executions.
system_query_result = run_system_query_on_host_with_sql(
query_trace_data.host,
int(query_trace_data.port),
storage,
sql,
False,
g.user,
)
if not system_query_result.results:
time.sleep(1)
counter += 1
else:
break

if system_query_result is not None and len(system_query_result.results) > 0:
query_trace.profile_events_meta.append(system_query_result.meta)
query_trace.profile_events_profile = cast(
Dict[str, int], system_query_result.profile
)
columns = system_query_result.meta
if columns:
res = {}
res["column_names"] = [name for name, _ in columns]
res["rows"] = []
for query_result in system_query_result.results:
if query_result[0]:
res["rows"].append(json.dumps(query_result[0]))
query_trace.profile_events_results[query_trace_data.node_name] = res

return make_response(jsonify(asdict(query_trace)), 200)
except InvalidCustomQuery as err:
return make_response(
Expand All @@ -520,13 +553,15 @@ def clickhouse_trace_query() -> Response:
400,
)
except ClickhouseError as err:
logger.error(err, exc_info=True)
details = {
"type": "clickhouse",
"message": str(err),
"code": err.code,
}
return make_response(jsonify({"error": details}), 400)
except Exception as err:
logger.error(err, exc_info=True)
return make_response(
jsonify({"error": {"type": "unknown", "message": str(err)}}),
500,
Expand All @@ -536,7 +571,7 @@ def clickhouse_trace_query() -> Response:
def parse_trace_for_query_ids(
trace_output: TraceOutput, storage_key: str
) -> List[QueryTraceData]:
result = []
traces = []
summarized_trace_output = trace_output.summarized_trace_output
storage_info = get_storage_info()
matched = next(
Expand All @@ -545,16 +580,28 @@ def parse_trace_for_query_ids(
if matched is not None:
local_nodes = matched.get("local_nodes", [])
query_node = matched.get("query_node", None)
result = [
QueryTraceData(
host=local_nodes[0].get("host") if local_nodes else query_node.get("host"), # type: ignore
port=local_nodes[0].get("port") if local_nodes else query_node.get("port"), # type: ignore
query_id=query_summary.query_id,
node_name=node_name,
)
for node_name, query_summary in summarized_trace_output.query_summaries.items()
]
return result
for node_name, query_summary in summarized_trace_output.query_summaries.items():
if local_nodes:
for local_node in local_nodes:
traces.append(
QueryTraceData(
host=local_node.get("host"), # type: ignore
port=local_node.get("port"), # type: ignore
query_id=query_summary.query_id,
node_name=node_name,
)
)
if query_node:
traces.append(
QueryTraceData(
host=query_node.get("host"), # type: ignore
port=query_node.get("port"), # type: ignore
query_id=query_summary.query_id,
node_name=node_name,
)
)

return traces


@application.route("/clickhouse_querylog_query", methods=["POST"])
Expand Down
1 change: 1 addition & 0 deletions tests/admin/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ def test_query_trace(admin_api: FlaskClient) -> None:
data = json.loads(response.data)
assert "<Debug> executeQuery" in data["trace_output"]
assert "summarized_trace_output" in data
assert "profile_events_results" in data


@pytest.mark.redis_db
Expand Down

0 comments on commit de3dd51

Please sign in to comment.