Improve profile events gathering with exponential backoff
This change improves the reliability and efficiency of gathering profile events from ClickHouse by:

- Moving the profile events logic to a dedicated module (snuba/admin/clickhouse/profile_events.py)
- Implementing exponential backoff for polling profile events, with configurable limits
- Replacing the previous fixed strategy (30 attempts with 1-second delays) with exponential backoff
- Adding the configuration constants PROFILE_EVENTS_MAX_ATTEMPTS (4) and PROFILE_EVENTS_MAX_WAIT_SECONDS (16); the resulting wait schedule is sketched below
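A rough sketch of the retry schedule these settings produce (illustrative only; it reuses the constant values from this commit and is not part of the change):

```python
# Illustrative sketch: reproduces the wait schedule of the new polling loop.
PROFILE_EVENTS_MAX_ATTEMPTS = 4
PROFILE_EVENTS_MAX_WAIT_SECONDS = 16

wait_time = 1
waits = []
for _ in range(PROFILE_EVENTS_MAX_ATTEMPTS):
    # After each empty result the wait doubles, capped at the configured maximum.
    wait_time = min(wait_time * 2, PROFILE_EVENTS_MAX_WAIT_SECONDS)
    waits.append(wait_time)

print(waits)       # [2, 4, 8, 16]
print(sum(waits))  # 30 -> up to ~30 seconds of sleep if every attempt comes back empty
```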
nachivrn committed Nov 5, 2024
1 parent 02955c9 commit 04df25a
Showing 4 changed files with 246 additions and 85 deletions.
98 changes: 98 additions & 0 deletions snuba/admin/clickhouse/profile_events.py
@@ -0,0 +1,98 @@
import json
import socket
import time
from typing import Dict, List, cast

import structlog
from flask import g

from snuba.admin.clickhouse.system_queries import run_system_query_on_host_with_sql
from snuba.admin.clickhouse.tracing import QueryTraceData, TraceOutput
from snuba.utils.constants import (
PROFILE_EVENTS_MAX_ATTEMPTS,
PROFILE_EVENTS_MAX_WAIT_SECONDS,
)

logger = structlog.get_logger().bind(module=__name__)


def gather_profile_events(query_trace: TraceOutput, storage: str) -> None:
"""
Gathers profile events for each query trace and updates the query_trace object with results.
Uses exponential backoff when polling for results.
Args:
query_trace: TraceOutput object to update with profile events
storage: Storage identifier
"""
profile_events_raw_sql = "SELECT ProfileEvents FROM system.query_log WHERE query_id = '{}' AND type = 'QueryFinish'"

for query_trace_data in parse_trace_for_query_ids(query_trace):
sql = profile_events_raw_sql.format(query_trace_data.query_id)
logger.info(
"Gathering profile event using host: {}, port = {}, storage = {}, sql = {}, g.user = {}".format(
query_trace_data.host, query_trace_data.port, storage, sql, g.user
)
)

system_query_result = None
attempt = 0
wait_time = 1
while attempt < PROFILE_EVENTS_MAX_ATTEMPTS:
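        # There is a race between the trace query and the 'SELECT ProfileEvents...' query:
        # ClickHouse does not immediately surface rows in system.query_log, so poll for
        # results and back off between executions.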
system_query_result = run_system_query_on_host_with_sql(
query_trace_data.host,
int(query_trace_data.port),
storage,
sql,
False,
g.user,
)

if system_query_result.results:
break

wait_time = min(wait_time * 2, PROFILE_EVENTS_MAX_WAIT_SECONDS)
time.sleep(wait_time)
attempt += 1

if system_query_result is not None and len(system_query_result.results) > 0:
query_trace.profile_events_meta.append(system_query_result.meta)
query_trace.profile_events_profile = cast(
Dict[str, int], system_query_result.profile
)
columns = system_query_result.meta
if columns:
res = {}
res["column_names"] = [name for name, _ in columns]
res["rows"] = []
for query_result in system_query_result.results:
if query_result[0]:
res["rows"].append(json.dumps(query_result[0]))
query_trace.profile_events_results[query_trace_data.node_name] = res


def hostname_resolves(hostname: str) -> bool:
try:
socket.gethostbyname(hostname)
except socket.error:
return False
else:
return True


def parse_trace_for_query_ids(trace_output: TraceOutput) -> List[QueryTraceData]:
summarized_trace_output = trace_output.summarized_trace_output
node_name_to_query_id = {
node_name: query_summary.query_id
for node_name, query_summary in summarized_trace_output.query_summaries.items()
}
logger.info("node to query id mapping: {}".format(node_name_to_query_id))
return [
QueryTraceData(
host=node_name if hostname_resolves(node_name) else "127.0.0.1",
port=9000,
query_id=query_id,
node_name=node_name,
)
for node_name, query_id in node_name_to_query_id.items()
]
88 changes: 3 additions & 85 deletions snuba/admin/views.py
@@ -1,13 +1,11 @@
from __future__ import annotations

import io
import socket
import sys
import time
from contextlib import redirect_stdout
from dataclasses import asdict
from datetime import datetime
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, Type, cast
from typing import Any, List, Mapping, Optional, Sequence, Tuple, Type, cast

import sentry_sdk
import simplejson as json
@@ -33,17 +31,14 @@
)
from snuba.admin.clickhouse.predefined_querylog_queries import QuerylogQuery
from snuba.admin.clickhouse.predefined_system_queries import SystemQuery
from snuba.admin.clickhouse.profile_events import gather_profile_events
from snuba.admin.clickhouse.querylog import describe_querylog_schema, run_querylog_query
from snuba.admin.clickhouse.system_queries import (
UnauthorizedForSudo,
run_system_query_on_host_with_sql,
)
from snuba.admin.clickhouse.trace_log_parsing import summarize_trace_output
from snuba.admin.clickhouse.tracing import (
QueryTraceData,
TraceOutput,
run_query_and_get_trace,
)
from snuba.admin.clickhouse.tracing import TraceOutput, run_query_and_get_trace
from snuba.admin.dead_letter_queue import get_dlq_topics
from snuba.admin.kafka.topics import get_broker_data
from snuba.admin.migrations_policies import (
@@ -531,83 +526,6 @@ def clickhouse_trace_query() -> Response:
)


def gather_profile_events(query_trace: TraceOutput, storage: str) -> None:
"""
Gathers profile events for each query trace and updates the query_trace object with results.
Args:
query_trace: TraceOutput object to update with profile events
storage: Storage identifier
"""
profile_events_raw_sql = "SELECT ProfileEvents FROM system.query_log WHERE query_id = '{}' AND type = 'QueryFinish'"
for query_trace_data in parse_trace_for_query_ids(query_trace):
sql = profile_events_raw_sql.format(query_trace_data.query_id)
logger.info(
"Gathering profile event using host: {}, port = {}, storage = {}, sql = {}, g.user = {}".format(
query_trace_data.host, query_trace_data.port, storage, sql, g.user
)
)
system_query_result, counter = None, 0
while counter < 30:
# There is a race between the trace query and the 'SELECT ProfileEvents...' query. ClickHouse does not immediately
# return the rows for 'SELECT ProfileEvents...' query. To make it return rows, sleep between the query executions.
system_query_result = run_system_query_on_host_with_sql(
query_trace_data.host,
int(query_trace_data.port),
storage,
sql,
False,
g.user,
)
if not system_query_result.results:
time.sleep(1)
counter += 1
else:
break

if system_query_result is not None and len(system_query_result.results) > 0:
query_trace.profile_events_meta.append(system_query_result.meta)
query_trace.profile_events_profile = cast(
Dict[str, int], system_query_result.profile
)
columns = system_query_result.meta
if columns:
res = {}
res["column_names"] = [name for name, _ in columns]
res["rows"] = []
for query_result in system_query_result.results:
if query_result[0]:
res["rows"].append(json.dumps(query_result[0]))
query_trace.profile_events_results[query_trace_data.node_name] = res


def hostname_resolves(hostname: str) -> bool:
try:
socket.gethostbyname(hostname)
except socket.error:
return False
else:
return True


def parse_trace_for_query_ids(trace_output: TraceOutput) -> List[QueryTraceData]:
summarized_trace_output = trace_output.summarized_trace_output
node_name_to_query_id = {
node_name: query_summary.query_id
for node_name, query_summary in summarized_trace_output.query_summaries.items()
}
logger.info("node to query id mapping: {}".format(node_name_to_query_id))
return [
QueryTraceData(
host=node_name if hostname_resolves(node_name) else "127.0.0.1",
port=9000,
query_id=query_id,
node_name=node_name,
)
for node_name, query_id in node_name_to_query_id.items()
]


@application.route("/rpc_summarize_trace_with_profile", methods=["POST"])
@check_tool_perms(tools=[AdminTools.QUERY_TRACING])
def summarize_trace_with_profile() -> Response:
8 changes: 8 additions & 0 deletions snuba/utils/constants.py
@@ -11,3 +11,11 @@
# on spans are bucketed into different columns
# This will affect migrations and querying.
ATTRIBUTE_BUCKETS = 20

# Maximum number of attempts to fetch profile events
PROFILE_EVENTS_MAX_ATTEMPTS = (
    4  # Results in up to ~30 seconds of total wait (2 + 4 + 8 + 16) with exponential backoff
)

# Maximum wait time between attempts in seconds
PROFILE_EVENTS_MAX_WAIT_SECONDS = 16
137 changes: 137 additions & 0 deletions tests/clickhouse/test_profile_events.py
@@ -0,0 +1,137 @@
from unittest.mock import MagicMock, patch

import pytest
from flask import g, json

from snuba.admin.clickhouse.profile_events import (
gather_profile_events,
hostname_resolves,
parse_trace_for_query_ids,
)
from snuba.admin.clickhouse.tracing import QueryTraceData


def test_hostname_resolves():
    assert hostname_resolves("localhost")
    assert not hostname_resolves("invalid-hostname-that-doesnt-exist-123.local")


def test_parse_trace_for_query_ids():
trace_output = MagicMock()
trace_output.summarized_trace_output.query_summaries = {
"host1": MagicMock(query_id="query1"),
"host2": MagicMock(query_id="query2"),
}

with patch(
"snuba.admin.clickhouse.profile_events.hostname_resolves"
) as mock_resolve:
mock_resolve.return_value = True
result = parse_trace_for_query_ids(trace_output)

assert len(result) == 2
assert result[0] == QueryTraceData(
host="host1", port=9000, query_id="query1", node_name="host1"
)
assert result[1] == QueryTraceData(
host="host2", port=9000, query_id="query2", node_name="host2"
)

mock_resolve.return_value = False
result = parse_trace_for_query_ids(trace_output)

assert len(result) == 2
assert result[0] == QueryTraceData(
host="127.0.0.1", port=9000, query_id="query1", node_name="host1"
)
assert result[1] == QueryTraceData(
host="127.0.0.1", port=9000, query_id="query2", node_name="host2"
)


@pytest.mark.clickhouse_db
def test_gather_profile_events():
trace_output = MagicMock()
trace_output.summarized_trace_output.query_summaries = {
"host1": MagicMock(query_id="query1"),
}
trace_output.profile_events_meta = []
trace_output.profile_events_results = {}

mock_system_query_result = MagicMock()
mock_system_query_result.results = [("profile_events",)]
mock_system_query_result.meta = [("column1", "type1")]
mock_system_query_result.profile = {"profile_key": 123}

with patch(
"snuba.admin.clickhouse.profile_events.run_system_query_on_host_with_sql"
) as mock_query:
mock_query.return_value = mock_system_query_result
with patch(
"snuba.admin.clickhouse.profile_events.hostname_resolves", return_value=True
):
from flask import Flask

app = Flask(__name__)
with app.app_context():
g.user = "test_user"
gather_profile_events(trace_output, "test_storage")

mock_query.assert_called_once_with(
"host1",
9000,
"test_storage",
"SELECT ProfileEvents FROM system.query_log WHERE query_id = 'query1' AND type = 'QueryFinish'",
False,
"test_user",
)

assert trace_output.profile_events_meta == [
mock_system_query_result.meta
]
assert (
trace_output.profile_events_profile
== mock_system_query_result.profile
)
assert trace_output.profile_events_results["host1"] == {
"column_names": ["column1"],
"rows": [json.dumps("profile_events")],
}


@pytest.mark.clickhouse_db
def test_gather_profile_events_retry_logic():
trace_output = MagicMock()
trace_output.summarized_trace_output.query_summaries = {
"host1": MagicMock(query_id="query1"),
}

empty_result = MagicMock()
empty_result.results = []

success_result = MagicMock()
success_result.results = [("profile_events",)]
success_result.meta = [("column1", "type1")]
success_result.profile = {"profile_key": 123}

with patch(
"snuba.admin.clickhouse.profile_events.run_system_query_on_host_with_sql"
) as mock_query:
mock_query.side_effect = [empty_result, empty_result, success_result]
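        # Two empty results force two backoff sleeps (2s, then 4s) before the third
        # query returns rows and the loop exits early.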
with patch(
"snuba.admin.clickhouse.profile_events.hostname_resolves", return_value=True
):
with patch("time.sleep") as mock_sleep:
from flask import Flask

app = Flask(__name__)
with app.app_context():
g.user = "test_user"

gather_profile_events(trace_output, "test_storage")

assert mock_query.call_count == 3
assert mock_sleep.call_count == 2

assert mock_sleep.call_args_list[0][0][0] == 2
assert mock_sleep.call_args_list[1][0][0] == 4
