Skip to content

Commit

Permalink
feat(eap): make mapContains work with EAP dataset (#6284)
Browse files Browse the repository at this point in the history
This hashes the key to pick the matching attribute bucket column before the query is sent to ClickHouse.
  • Loading branch information
colin-sentry authored Sep 10, 2024
1 parent 2d14165 commit d02cff4
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 5 deletions.
41 changes: 38 additions & 3 deletions snuba/query/processors/logical/hash_bucket_functions.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import Sequence

from snuba.query.expressions import Column, Expression, FunctionCall
from snuba.query.expressions import Column, Expression, FunctionCall, Literal
from snuba.query.logical import Query
from snuba.query.processors.logical import LogicalQueryProcessor
from snuba.query.query_settings import QuerySettings
from snuba.utils.constants import ATTRIBUTE_BUCKETS
from snuba.utils.hashes import fnv_1a


class HashBucketFunctionTransformer(LogicalQueryProcessor):
Expand All @@ -17,6 +18,8 @@ class HashBucketFunctionTransformer(LogicalQueryProcessor):
This transformer converts mapKeys(attr_str) to arrayConcat(mapKeys(attr_str_0), mapKeys(attr_str_1), ...)
and the same for mapValues
It converts mapContains(attr_str, 'blah') to mapContains(attr_str_{fnv_1a('blah') % ATTRIBUTE_BUCKETS}, 'blah')
"""

def __init__(
Expand All @@ -26,7 +29,7 @@ def __init__(
self.hash_bucket_names = hash_bucket_names

def process_query(self, query: Query, query_settings: QuerySettings) -> None:
def transform_expression(exp: Expression) -> Expression:
def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
if not isinstance(exp, FunctionCall):
return exp

Expand Down Expand Up @@ -62,4 +65,36 @@ def transform_expression(exp: Expression) -> Expression:
),
)

query.transform_expressions(transform_expression)
def transform_map_contains_expression(exp: Expression) -> Expression:
    """Redirect ``mapContains`` on a bucketed map column to a single bucket.

    The expression is rewritten only when it is a two-argument
    ``mapContains`` call whose first argument is a ``Column`` named in
    ``self.hash_bucket_names`` and whose second argument is a string
    ``Literal``. Anything else is returned untouched.

    The bucket index is the FNV-1a hash of the UTF-8 encoded key modulo
    ``ATTRIBUTE_BUCKETS``; the rewritten call targets the column
    ``{column_name}_{bucket_idx}`` and keeps the original alias and key.
    """
    # Only two-argument function calls can be a mapContains(column, key).
    if not (isinstance(exp, FunctionCall) and len(exp.parameters) == 2):
        return exp

    map_column, lookup_key = exp.parameters

    # The first argument has to be one of the configured bucketed columns.
    if not (
        isinstance(map_column, Column)
        and map_column.column_name in self.hash_bucket_names
    ):
        return exp

    if exp.function_name != "mapContains":
        return exp

    # The bucket can only be computed for a literal string key.
    if not (
        isinstance(lookup_key, Literal) and isinstance(lookup_key.value, str)
    ):
        return exp

    bucket_idx = fnv_1a(lookup_key.value.encode("utf-8")) % ATTRIBUTE_BUCKETS
    bucket_column = Column(
        None, None, f"{map_column.column_name}_{bucket_idx}"
    )
    return FunctionCall(
        alias=exp.alias,
        function_name=exp.function_name,
        parameters=(bucket_column, lookup_key),
    )

query.transform_expressions(transform_map_keys_and_values_expression)
query.transform_expressions(transform_map_contains_expression)
1 change: 0 additions & 1 deletion snuba/web/rpc/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression
if k.name in NORMALIZED_COLUMNS.keys():
return f.isNotNull(column(k.name))
if k.type == AttributeKey.Type.TYPE_STRING:
# TODO: this doesn't actually work yet, need to make mapContains work with hash mapper too
return f.mapContains(column("attr_str"), literal(k.name))
else:
return f.mapContains(column("attr_num"), literal(k.name))
Expand Down
32 changes: 31 additions & 1 deletion tests/query/processors/test_hash_bucket_functions_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from snuba.query import SelectedExpression
from snuba.query.data_source.simple import Entity as QueryEntity
from snuba.query.dsl import Functions as f
from snuba.query.dsl import binary_condition, literal
from snuba.query.dsl import binary_condition, column, literal
from snuba.query.expressions import Column, FunctionCall
from snuba.query.logical import Query
from snuba.query.processors.logical.hash_bucket_functions import (
Expand Down Expand Up @@ -184,6 +184,36 @@
),
),
),
(
Query(
QueryEntity(EntityKey.EAP_SPANS, ColumnSet([])),
selected_columns=[
SelectedExpression(
"unrelated",
Column(None, None, "column2"),
),
],
condition=binary_condition(
"or",
f.mapContains(column("attr_str"), literal("blah"), alias="x"),
f.mapContains(column("attr_strz"), literal("blah"), alias="z"),
),
),
Query(
QueryEntity(EntityKey.EAP_SPANS, ColumnSet([])),
selected_columns=[
SelectedExpression(
"unrelated",
Column(None, None, "column2"),
),
],
condition=binary_condition(
"or",
f.mapContains(column("attr_str_2"), literal("blah"), alias="x"),
f.mapContains(column("attr_strz"), literal("blah"), alias="z"),
),
),
),
]


Expand Down

0 comments on commit d02cff4

Please sign in to comment.