diff --git a/snuba/admin/clickhouse/profile_events.py b/snuba/admin/clickhouse/profile_events.py new file mode 100644 index 00000000000..44533cf54f5 --- /dev/null +++ b/snuba/admin/clickhouse/profile_events.py @@ -0,0 +1,98 @@ +import json +import socket +import time +from typing import Dict, List, cast + +import structlog +from flask import g + +from snuba.admin.clickhouse.system_queries import run_system_query_on_host_with_sql +from snuba.admin.clickhouse.tracing import QueryTraceData, TraceOutput +from snuba.utils.constants import ( + PROFILE_EVENTS_MAX_ATTEMPTS, + PROFILE_EVENTS_MAX_WAIT_SECONDS, +) + +logger = structlog.get_logger().bind(module=__name__) + + +def gather_profile_events(query_trace: TraceOutput, storage: str) -> None: + """ + Gathers profile events for each query trace and updates the query_trace object with results. + Uses exponential backoff when polling for results. + + Args: + query_trace: TraceOutput object to update with profile events + storage: Storage identifier + """ + profile_events_raw_sql = "SELECT ProfileEvents FROM system.query_log WHERE query_id = '{}' AND type = 'QueryFinish'" + + for query_trace_data in parse_trace_for_query_ids(query_trace): + sql = profile_events_raw_sql.format(query_trace_data.query_id) + logger.info( + "Gathering profile event using host: {}, port = {}, storage = {}, sql = {}, g.user = {}".format( + query_trace_data.host, query_trace_data.port, storage, sql, g.user + ) + ) + + system_query_result = None + attempt = 0 + wait_time = 1 + while attempt < PROFILE_EVENTS_MAX_ATTEMPTS: + system_query_result = run_system_query_on_host_with_sql( + query_trace_data.host, + int(query_trace_data.port), + storage, + sql, + False, + g.user, + ) + + if system_query_result.results: + break + + wait_time = min(wait_time * 2, PROFILE_EVENTS_MAX_WAIT_SECONDS) + time.sleep(wait_time) + attempt += 1 + + if system_query_result is not None and len(system_query_result.results) > 0: + 
query_trace.profile_events_meta.append(system_query_result.meta) + query_trace.profile_events_profile = cast( + Dict[str, int], system_query_result.profile + ) + columns = system_query_result.meta + if columns: + res = {} + res["column_names"] = [name for name, _ in columns] + res["rows"] = [] + for query_result in system_query_result.results: + if query_result[0]: + res["rows"].append(json.dumps(query_result[0])) + query_trace.profile_events_results[query_trace_data.node_name] = res + + +def hostname_resolves(hostname: str) -> bool: + try: + socket.gethostbyname(hostname) + except socket.error: + return False + else: + return True + + +def parse_trace_for_query_ids(trace_output: TraceOutput) -> List[QueryTraceData]: + summarized_trace_output = trace_output.summarized_trace_output + node_name_to_query_id = { + node_name: query_summary.query_id + for node_name, query_summary in summarized_trace_output.query_summaries.items() + } + logger.info("node to query id mapping: {}".format(node_name_to_query_id)) + return [ + QueryTraceData( + host=node_name if hostname_resolves(node_name) else "127.0.0.1", + port=9000, + query_id=query_id, + node_name=node_name, + ) + for node_name, query_id in node_name_to_query_id.items() + ] diff --git a/snuba/admin/views.py b/snuba/admin/views.py index 675786e2608..d9447e470e7 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -1,13 +1,11 @@ from __future__ import annotations import io -import socket import sys -import time from contextlib import redirect_stdout from dataclasses import asdict from datetime import datetime -from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Type, cast +from typing import Any, List, Mapping, Optional, Sequence, Tuple, Type, cast import sentry_sdk import simplejson as json @@ -33,17 +31,14 @@ ) from snuba.admin.clickhouse.predefined_querylog_queries import QuerylogQuery from snuba.admin.clickhouse.predefined_system_queries import SystemQuery +from 
snuba.admin.clickhouse.profile_events import gather_profile_events from snuba.admin.clickhouse.querylog import describe_querylog_schema, run_querylog_query from snuba.admin.clickhouse.system_queries import ( UnauthorizedForSudo, run_system_query_on_host_with_sql, ) from snuba.admin.clickhouse.trace_log_parsing import summarize_trace_output -from snuba.admin.clickhouse.tracing import ( - QueryTraceData, - TraceOutput, - run_query_and_get_trace, -) +from snuba.admin.clickhouse.tracing import TraceOutput, run_query_and_get_trace from snuba.admin.dead_letter_queue import get_dlq_topics from snuba.admin.kafka.topics import get_broker_data from snuba.admin.migrations_policies import ( @@ -531,83 +526,6 @@ def clickhouse_trace_query() -> Response: ) -def gather_profile_events(query_trace: TraceOutput, storage: str) -> None: - """ - Gathers profile events for each query trace and updates the query_trace object with results. - - Args: - query_trace: TraceOutput object to update with profile events - storage: Storage identifier - """ - profile_events_raw_sql = "SELECT ProfileEvents FROM system.query_log WHERE query_id = '{}' AND type = 'QueryFinish'" - for query_trace_data in parse_trace_for_query_ids(query_trace): - sql = profile_events_raw_sql.format(query_trace_data.query_id) - logger.info( - "Gathering profile event using host: {}, port = {}, storage = {}, sql = {}, g.user = {}".format( - query_trace_data.host, query_trace_data.port, storage, sql, g.user - ) - ) - system_query_result, counter = None, 0 - while counter < 30: - # There is a race between the trace query and the 'SELECT ProfileEvents...' query. ClickHouse does not immediately - # return the rows for 'SELECT ProfileEvents...' query. To make it return rows, sleep between the query executions. 
- system_query_result = run_system_query_on_host_with_sql( - query_trace_data.host, - int(query_trace_data.port), - storage, - sql, - False, - g.user, - ) - if not system_query_result.results: - time.sleep(1) - counter += 1 - else: - break - - if system_query_result is not None and len(system_query_result.results) > 0: - query_trace.profile_events_meta.append(system_query_result.meta) - query_trace.profile_events_profile = cast( - Dict[str, int], system_query_result.profile - ) - columns = system_query_result.meta - if columns: - res = {} - res["column_names"] = [name for name, _ in columns] - res["rows"] = [] - for query_result in system_query_result.results: - if query_result[0]: - res["rows"].append(json.dumps(query_result[0])) - query_trace.profile_events_results[query_trace_data.node_name] = res - - -def hostname_resolves(hostname: str) -> bool: - try: - socket.gethostbyname(hostname) - except socket.error: - return False - else: - return True - - -def parse_trace_for_query_ids(trace_output: TraceOutput) -> List[QueryTraceData]: - summarized_trace_output = trace_output.summarized_trace_output - node_name_to_query_id = { - node_name: query_summary.query_id - for node_name, query_summary in summarized_trace_output.query_summaries.items() - } - logger.info("node to query id mapping: {}".format(node_name_to_query_id)) - return [ - QueryTraceData( - host=node_name if hostname_resolves(node_name) else "127.0.0.1", - port=9000, - query_id=query_id, - node_name=node_name, - ) - for node_name, query_id in node_name_to_query_id.items() - ] - - @application.route("/rpc_summarize_trace_with_profile", methods=["POST"]) @check_tool_perms(tools=[AdminTools.QUERY_TRACING]) def summarize_trace_with_profile() -> Response: diff --git a/snuba/utils/constants.py b/snuba/utils/constants.py index 8793ad1fb96..624f1e589ad 100644 --- a/snuba/utils/constants.py +++ b/snuba/utils/constants.py @@ -11,3 +11,11 @@ # on spans are bucketed into different columns # This will affect migrations 
and querying. ATTRIBUTE_BUCKETS = 20 + +# Maximum number of attempts to fetch profile events +PROFILE_EVENTS_MAX_ATTEMPTS = ( + 4 # Will result in ~30 seconds total wait time with exponential backoff (sleeps of 2+4+8+16s) +) + +# Maximum wait time between attempts in seconds +PROFILE_EVENTS_MAX_WAIT_SECONDS = 16 diff --git a/tests/clickhouse/test_profile_events.py b/tests/clickhouse/test_profile_events.py new file mode 100644 index 00000000000..711b6dd292c --- /dev/null +++ b/tests/clickhouse/test_profile_events.py @@ -0,0 +1,137 @@ +from unittest.mock import MagicMock, patch + +import pytest +from flask import g, json + +from snuba.admin.clickhouse.profile_events import ( + gather_profile_events, + hostname_resolves, + parse_trace_for_query_ids, +) +from snuba.admin.clickhouse.tracing import QueryTraceData + + +def test_hostname_resolves(): + assert hostname_resolves("localhost") == True + assert hostname_resolves("invalid-hostname-that-doesnt-exist-123.local") == False + + +def test_parse_trace_for_query_ids(): + trace_output = MagicMock() + trace_output.summarized_trace_output.query_summaries = { + "host1": MagicMock(query_id="query1"), + "host2": MagicMock(query_id="query2"), + } + + with patch( + "snuba.admin.clickhouse.profile_events.hostname_resolves" + ) as mock_resolve: + mock_resolve.return_value = True + result = parse_trace_for_query_ids(trace_output) + + assert len(result) == 2 + assert result[0] == QueryTraceData( + host="host1", port=9000, query_id="query1", node_name="host1" + ) + assert result[1] == QueryTraceData( + host="host2", port=9000, query_id="query2", node_name="host2" + ) + + mock_resolve.return_value = False + result = parse_trace_for_query_ids(trace_output) + + assert len(result) == 2 + assert result[0] == QueryTraceData( + host="127.0.0.1", port=9000, query_id="query1", node_name="host1" + ) + assert result[1] == QueryTraceData( + host="127.0.0.1", port=9000, query_id="query2", node_name="host2" + ) + + +@pytest.mark.clickhouse_db +def
test_gather_profile_events(): + trace_output = MagicMock() + trace_output.summarized_trace_output.query_summaries = { + "host1": MagicMock(query_id="query1"), + } + trace_output.profile_events_meta = [] + trace_output.profile_events_results = {} + + mock_system_query_result = MagicMock() + mock_system_query_result.results = [("profile_events",)] + mock_system_query_result.meta = [("column1", "type1")] + mock_system_query_result.profile = {"profile_key": 123} + + with patch( + "snuba.admin.clickhouse.profile_events.run_system_query_on_host_with_sql" + ) as mock_query: + mock_query.return_value = mock_system_query_result + with patch( + "snuba.admin.clickhouse.profile_events.hostname_resolves", return_value=True + ): + from flask import Flask + + app = Flask(__name__) + with app.app_context(): + g.user = "test_user" + gather_profile_events(trace_output, "test_storage") + + mock_query.assert_called_once_with( + "host1", + 9000, + "test_storage", + "SELECT ProfileEvents FROM system.query_log WHERE query_id = 'query1' AND type = 'QueryFinish'", + False, + "test_user", + ) + + assert trace_output.profile_events_meta == [ + mock_system_query_result.meta + ] + assert ( + trace_output.profile_events_profile + == mock_system_query_result.profile + ) + assert trace_output.profile_events_results["host1"] == { + "column_names": ["column1"], + "rows": [json.dumps("profile_events")], + } + + +@pytest.mark.clickhouse_db +def test_gather_profile_events_retry_logic(): + trace_output = MagicMock() + trace_output.summarized_trace_output.query_summaries = { + "host1": MagicMock(query_id="query1"), + } + + empty_result = MagicMock() + empty_result.results = [] + + success_result = MagicMock() + success_result.results = [("profile_events",)] + success_result.meta = [("column1", "type1")] + success_result.profile = {"profile_key": 123} + + with patch( + "snuba.admin.clickhouse.profile_events.run_system_query_on_host_with_sql" + ) as mock_query: + mock_query.side_effect = [empty_result, 
empty_result, success_result] + with patch( + "snuba.admin.clickhouse.profile_events.hostname_resolves", return_value=True + ): + with patch("time.sleep") as mock_sleep: + from flask import Flask + + app = Flask(__name__) + with app.app_context(): + g.user = "test_user" + + gather_profile_events(trace_output, "test_storage") + + assert mock_query.call_count == 3 + assert mock_sleep.call_count == 2 + + assert mock_sleep.call_args_list[0][0][0] == 2 + assert mock_sleep.call_args_list[1][0][0] == 4