Only retrieve updated query statement metrics (#19321)
* Cache digest text for MySQL

* WIP

* Metric

* Fix time

* Last seen

* WIP

* Fixed

* Clean

* Clean

* Feature flag

* Changelog

* Clean

* Handle case of skipped queries between runs

* Clean

* Clean
sethsamuel authored Jan 3, 2025
1 parent 36642e0 commit e1501f8
Showing 6 changed files with 121 additions and 12 deletions.
10 changes: 10 additions & 0 deletions mysql/assets/configuration/spec.yaml
@@ -423,6 +423,16 @@ files:
value:
type: number
example: 10
- name: only_query_recent_statements
description: |
Enable querying only for statements that have been run since the last collection. This may improve agent
performance and reduce database load. Enabling this option should not alter the total number of query
metrics available.
value:
type: boolean
example: false
display_default: false

- name: query_samples
description: Configure collection of query samples
options:
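
Taken together with the conf.yaml.example hunk further down, enabling the option in an instance config would look roughly like the sketch below. The host and credential values are placeholders, and dbm: true is included because statement metrics are a Database Monitoring feature; the query_metrics nesting matches the QueryMetrics model and the test changes later in this diff.

instances:
  - host: localhost
    username: datadog
    password: '<PASSWORD>'   # placeholder
    dbm: true                # statement metrics are a DBM feature
    query_metrics:
      only_query_recent_statements: true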
1 change: 1 addition & 0 deletions mysql/changelog.d/19321.added
@@ -0,0 +1 @@
Added an optional optimization to MySQL statement metrics collection that queries only for statements that have run since the last check collection.
1 change: 1 addition & 0 deletions mysql/datadog_checks/mysql/config_models/instance.py
@@ -131,6 +131,7 @@ class QueryMetrics(BaseModel):
)
collection_interval: Optional[float] = None
enabled: Optional[bool] = None
only_query_recent_statements: Optional[bool] = None


class QuerySamples(BaseModel):
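
Because the config models are generated from spec.yaml, the new field is just another optional boolean on QueryMetrics. A quick sanity check of the model behavior; the import path is inferred from the file path above, and the construction is an assumption rather than something exercised in this commit:

# Import path inferred from mysql/datadog_checks/mysql/config_models/instance.py
from datadog_checks.mysql.config_models.instance import QueryMetrics

qm = QueryMetrics(only_query_recent_statements=True)
assert qm.only_query_recent_statements is True
# Left unset, the field stays None; the check itself falls back to False via
# statement_metrics_config.get('only_query_recent_statements', False).
assert QueryMetrics().only_query_recent_statements is None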
7 changes: 7 additions & 0 deletions mysql/datadog_checks/mysql/data/conf.yaml.example
@@ -395,6 +395,13 @@ instances:
#
# collection_interval: 10

## @param only_query_recent_statements - boolean - optional - default: false
## Enable querying only for statements that have been run since the last collection. This may improve agent
## performance and reduce database load. Enabling this option should not alter the total number of query
## metrics available.
#
# only_query_recent_statements: false

## Configure collection of query samples
#
# query_samples:
103 changes: 92 additions & 11 deletions mysql/datadog_checks/mysql/statements.py
@@ -85,12 +85,19 @@ def __init__(self, check, config, connection_args):
self.log = get_check_logger()
self._state = StatementMetrics()
self._obfuscate_options = to_native_string(json.dumps(self._config.obfuscator_options))
# last_seen: the last query execution time seen by the check
# This is used to limit the queries to fetch from the performance schema to only the new ones
self._last_seen = '1970-01-01'
# full_statement_text_cache: limit the ingestion rate of full statement text events per query_signature
self._full_statement_text_cache = TTLCache(
maxsize=self._config.full_statement_text_cache_max_size,
ttl=60 * 60 / self._config.full_statement_text_samples_per_hour_per_query,
) # type: TTLCache

# statement_rows: cache of all rows for each digest, keyed by (schema_name, query_signature)
# This is used to cache the metrics for queries that have the same query_signature but different digests
self._statement_rows = {} # type: Dict[(str, str), Dict[str, PyMysqlRow]]

def _get_db_connection(self):
"""
lazy reconnect db
@@ -111,7 +118,14 @@ def _close_db_conn(self):
self._db = None

def run_job(self):
start = time.time()
self.collect_per_statement_metrics()
self._check.gauge(
"dd.mysql.statement_metrics.collect_metrics.elapsed_ms",
(time.time() - start) * 1000,
tags=self._check.tags + self._check._get_debug_tags(),
hostname=self._check.resolved_hostname,
)

@tracked_method(agent_check_getter=attrgetter('_check'))
def collect_per_statement_metrics(self):
@@ -134,12 +148,14 @@ def collect_per_statement_metrics(self):
)
return

rows = self._collect_per_statement_metrics()
if not rows:
return
# Omit internal tags for dbm payloads since those are only relevant to metrics processed directly
# by the agent
tags = [t for t in self._tags if not t.startswith('dd.internal')]

rows = self._collect_per_statement_metrics(tags)
if not rows:
# No rows to process, can skip the rest of the payload generation and avoid an empty payload
return
for event in self._rows_to_fqt_events(rows, tags):
self._check.database_monitoring_query_sample(json.dumps(event, default=default_json_event_encoding))
payload = {
@@ -156,20 +172,45 @@ def collect_per_statement_metrics(self):
'mysql_rows': rows,
}
self._check.database_monitoring_query_metrics(json.dumps(payload, default=default_json_event_encoding))
self._check.count(
self._check.gauge(
"dd.mysql.collect_per_statement_metrics.rows",
len(rows),
tags=tags + self._check._get_debug_tags(),
hostname=self._check.resolved_hostname,
)

def _collect_per_statement_metrics(self):
# type: () -> List[PyMysqlRow]
def _collect_per_statement_metrics(self, tags):
# type: (List[str]) -> List[PyMysqlRow]

self._get_statement_count(tags)

monotonic_rows = self._query_summary_per_statement()
self._check.gauge(
"dd.mysql.statement_metrics.query_rows",
len(monotonic_rows),
tags=tags + self._check._get_debug_tags(),
hostname=self._check.resolved_hostname,
)

monotonic_rows = self._filter_query_rows(monotonic_rows)
monotonic_rows = self._normalize_queries(monotonic_rows)
monotonic_rows = self._add_associated_rows(monotonic_rows)
rows = self._state.compute_derivative_rows(monotonic_rows, METRICS_COLUMNS, key=_row_key)
return rows

def _get_statement_count(self, tags):
with closing(self._get_db_connection().cursor(CommenterDictCursor)) as cursor:
cursor.execute("SELECT count(*) AS count from performance_schema.events_statements_summary_by_digest")

rows = cursor.fetchall() or [] # type: ignore
if rows:
self._check.gauge(
"dd.mysql.statement_metrics.events_statements_summary_by_digest.total_rows",
rows[0]['count'],
tags=tags + self._check._get_debug_tags(),
hostname=self._check.resolved_hostname,
)

def _query_summary_per_statement(self):
# type: () -> List[PyMysqlRow]
"""
@@ -178,6 +219,14 @@ def _query_summary_per_statement(self):
values to get the counts for the elapsed period. This is similar to monotonic_count, but
several fields must be further processed from the delta values.
"""
only_query_recent_statements = self._config.statement_metrics_config.get('only_query_recent_statements', False)
condition = (
"WHERE `last_seen` >= %s"
if only_query_recent_statements
else """WHERE `digest_text` NOT LIKE 'EXPLAIN %' OR `digest_text` IS NULL
ORDER BY `count_star` DESC
LIMIT 10000"""
)

sql_statement_summary = """\
SELECT `schema_name`,
@@ -193,19 +242,34 @@ def _query_summary_per_statement(self):
`sum_select_scan`,
`sum_select_full_join`,
`sum_no_index_used`,
`sum_no_good_index_used`
`sum_no_good_index_used`,
`last_seen`
FROM performance_schema.events_statements_summary_by_digest
WHERE `digest_text` NOT LIKE 'EXPLAIN %' OR `digest_text` IS NULL
ORDER BY `count_star` DESC
LIMIT 10000"""
{}
""".format(
condition
)

with closing(self._get_db_connection().cursor(CommenterDictCursor)) as cursor:
cursor.execute(sql_statement_summary)
args = [self._last_seen] if only_query_recent_statements else None
cursor.execute(sql_statement_summary, args)

rows = cursor.fetchall() or [] # type: ignore

if rows:
self._last_seen = max(row['last_seen'] for row in rows)

return rows

def _filter_query_rows(self, rows):
# type: (List[PyMysqlRow]) -> List[PyMysqlRow]
"""
Filter out rows that are EXPLAIN statements
"""
return [
row for row in rows if row['digest_text'] is None or not row['digest_text'].lower().startswith('explain')
]

def _normalize_queries(self, rows):
normalized_rows = []
for row in rows:
@@ -227,6 +291,23 @@ def _query_summary_per_statement(self):

return normalized_rows

def _add_associated_rows(self, rows):
"""
If two or more statements with different digests have the same query_signature, they are considered the same
statement. Because only one digest's row may have been updated between runs, we cache all the rows for each
digest, update the cache with any new rows, and then return all the rows for all the query_signatures.
Returning all rows guards against the case where a signature wasn't collected on the immediately previous run
but was present on runs before that.
"""
for row in rows:
key = (row['schema_name'], row['query_signature'])
if key not in self._statement_rows:
self._statement_rows[key] = {}
self._statement_rows[key][row['digest']] = row

return [row for statement_row in self._statement_rows.values() for row in statement_row.values()]

def _rows_to_fqt_events(self, rows, tags):
for row in rows:
query_cache_key = _row_key(row)
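
The statements.py changes above combine two ideas: a last_seen watermark, so each run fetches only digests executed since the previous run, and a per-(schema_name, query_signature) cache of digest rows, so a signature whose digests were not all updated this run is still emitted. A minimal, self-contained sketch of that pattern, with an in-memory list standing in for performance_schema.events_statements_summary_by_digest; the class and variable names are illustrative, not the check's API:

from typing import Dict, List, Tuple

Row = Dict[str, str]

class RecentStatementsSketch:
    def __init__(self):
        # Watermark: the newest last_seen value observed so far; epoch start
        # on the first run, matching the '1970-01-01' default in the diff.
        self._last_seen = '1970-01-01'
        # (schema_name, query_signature) -> {digest: row}
        self._statement_rows = {}  # type: Dict[Tuple[str, str], Dict[str, Row]]

    def _fetch_recent(self, table):
        # type: (List[Row]) -> List[Row]
        # Stand-in for: SELECT ... FROM events_statements_summary_by_digest
        #               WHERE `last_seen` >= %s
        # ISO-formatted timestamps compare correctly as strings.
        return [row for row in table if row['last_seen'] >= self._last_seen]

    def collect(self, table):
        # type: (List[Row]) -> List[Row]
        rows = self._fetch_recent(table)
        if rows:
            # Advance the watermark, exactly as the diff does after fetchall().
            self._last_seen = max(row['last_seen'] for row in rows)
        # Merge the fresh rows into the cache, then return every cached row so
        # signatures skipped this run (but seen earlier) are still reported.
        for row in rows:
            key = (row['schema_name'], row['query_signature'])
            self._statement_rows.setdefault(key, {})[row['digest']] = row
        return [r for digests in self._statement_rows.values() for r in digests.values()]

# Two runs: the second fetches nothing new but still reports the cached row.
sketch = RecentStatementsSketch()
run1 = [{'schema_name': 'testdb', 'query_signature': 'sig-1',
         'digest': 'digest-1', 'last_seen': '2025-01-03 12:00:00'}]
assert len(sketch.collect(run1)) == 1
assert len(sketch.collect([])) == 1

The real check then feeds the merged rows through StatementMetrics.compute_derivative_rows to turn the cumulative counters into per-interval deltas; that step is unchanged by this commit and omitted here.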
11 changes: 10 additions & 1 deletion mysql/tests/test_statements.py
@@ -103,10 +103,19 @@ def test_statement_samples_enabled_config(dbm_instance, statement_samples_key, s
)
@pytest.mark.parametrize("default_schema", [None, "testdb"])
@pytest.mark.parametrize("aurora_replication_role", [None, "writer", "reader"])
@pytest.mark.parametrize("only_query_recent_statements", [False, True])
@mock.patch.dict('os.environ', {'DDEV_SKIP_GENERIC_TAGS_CHECK': 'true'})
def test_statement_metrics(
aggregator, dd_run_check, dbm_instance, query, default_schema, datadog_agent, aurora_replication_role
aggregator,
dd_run_check,
dbm_instance,
query,
default_schema,
datadog_agent,
aurora_replication_role,
only_query_recent_statements,
):
dbm_instance['query_metrics']['only_query_recent_statements'] = only_query_recent_statements
mysql_check = MySql(common.CHECK_NAME, {}, [dbm_instance])

def run_query(q):
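
With the new parametrize line, every existing test_statement_metrics case now runs with the optimization both off and on. Assuming a provisioned MySQL test environment (the repository's ddev tooling normally supplies one), the cases can be selected with a standard pytest filter:

pytest mysql/tests/test_statements.py -k test_statement_metrics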
