
Commit

Merge remote-tracking branch 'origin' into bump-arroyo
ayirr7 committed Sep 16, 2024
2 parents dd317b8 + b43bf24 commit 28a5fbb
Showing 15 changed files with 320 additions and 74 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
@@ -30,6 +30,8 @@ RUN set -ex; \
runtimeDeps=' \
curl \
libjemalloc2 \
gdb \
heaptrack \
'; \
apt-get update; \
apt-get install -y $buildDeps $runtimeDeps --no-install-recommends; \
4 changes: 4 additions & 0 deletions docker_entrypoint.sh
@@ -18,4 +18,8 @@ else
printf "\n${help_result}"
fi

if [ -n "${ENABLE_HEAPTRACK:-}" ]; then
set -- heaptrack "$@"
fi

exec "$@"
2 changes: 1 addition & 1 deletion requirements.txt
@@ -45,4 +45,4 @@ sqlparse==0.4.2
google-api-python-client==2.88.0
sentry-usage-accountant==0.0.10
freezegun==1.2.2
sentry-protos==0.1.16
sentry-protos==0.1.21
10 changes: 8 additions & 2 deletions rust_snuba/src/processors/eap_spans.rs
@@ -52,7 +52,8 @@ struct EAPSpan {
name: String, //aka description

sampling_factor: f64,
sampling_weight: f64,
sampling_weight: f64, //remove eventually
sampling_weight_2: u64,
sign: u8, //1 for additions, -1 for deletions - for this worker it should be 1

#(
@@ -101,7 +102,8 @@ impl From<FromSpanMessage> for EAPSpan {
retention_days: from.retention_days,
name: from.description.unwrap_or_default(),

sampling_weight: 1.,
sampling_weight: 1., //remove eventually
sampling_weight_2: 1,
sampling_factor: 1.,
sign: 1,

@@ -153,6 +155,7 @@ impl From<FromSpanMessage> for EAPSpan {
if k == "client_sample_rate" && v.value != 0.0 {
res.sampling_factor = v.value;
res.sampling_weight = 1.0 / v.value;
res.sampling_weight_2 = (1.0 / v.value) as u64;
} else {
insert_num(k.clone(), v.value);
}
@@ -217,6 +220,9 @@ mod tests {
"measurements": {
"num_of_spans": {
"value": 50.0
},
"client_sample_rate": {
"value": 0.01
}
},
"organization_id": 1,
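
For illustration only (not part of this commit): a short Python sketch of the sampling-field derivation added above. With the test payload's client_sample_rate of 0.01 it reproduces the values in the snapshot that follows; the helper name is hypothetical.

def derive_sampling_fields(client_sample_rate: float) -> tuple[float, float, int]:
    # Defaults mirror the EAPSpan initialisation: factor 1.0, weight 1.0, weight_2 1.
    sampling_factor, sampling_weight, sampling_weight_2 = 1.0, 1.0, 1
    if client_sample_rate != 0.0:
        sampling_factor = client_sample_rate
        sampling_weight = 1.0 / client_sample_rate         # Float64 column, to be removed eventually
        sampling_weight_2 = int(1.0 / client_sample_rate)  # new UInt64 column
    return sampling_factor, sampling_weight, sampling_weight_2

print(derive_sampling_fields(0.01))  # (0.01, 100.0, 100)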
@@ -19,8 +19,9 @@ expression: span
"exclusive_time_ms": 0.228,
"retention_days": 90,
"name": "/api/0/relays/projectconfigs/",
"sampling_factor": 1.0,
"sampling_weight": 1.0,
"sampling_factor": 0.01,
"sampling_weight": 100.0,
"sampling_weight_2": 100,
"sign": 1,
"attr_str_0": {
"relay_protocol_version": "3",
@@ -52,6 +52,7 @@ expression: snapshot_payload
"retention_days": 90,
"sampling_factor": 1.0,
"sampling_weight": 1.0,
"sampling_weight_2": 1,
"segment_id": 16045690984833335023,
"segment_name": "/organizations/:orgId/issues/",
"service": "1",
@@ -22,7 +22,8 @@ schema:
{ name: retention_days, type: UInt, args: { size: 16 } },
{ name: name, type: String },
{ name: sampling_factor, type: Float, args: { size: 64 } },
{ name: sampling_weight, type: UInt, args: { size: 64 } },
{ name: sampling_weight, type: Float, args: { size: 64 } },
{ name: sampling_weight_2, type: UInt, args: { size: 64 } },
{ name: sign, type: Int, args: { size: 8 } },
{ name: attr_str, type: Map, args: { key: { type: String }, value: { type: String } } },
{ name: attr_num, type: Map, args: { key: { type: String }, value: { type: Float, args: { size: 64 } } } },
@@ -39,12 +40,7 @@ storages:
from_col_name: timestamp
to_table_name: null
to_col_name: _sort_timestamp
- mapper: ColumnToColumn
args:
from_table_name: null
from_col_name: sampling_weight
to_table_name: null
to_col_name: sampling_weight_2

subscriptables:
- mapper: SubscriptableHashBucketMapper
args:
@@ -114,6 +114,12 @@ query_processors:
prewhere_candidates:
[span_id, trace_id, segment_name]
- processor: TupleUnaliaser
- processor: ClickhouseSettingsOverride
args:
settings:
max_memory_usage: 5000000000
max_rows_to_group_by: 1000000
group_by_overflow_mode: any

mandatory_condition_checkers:
- condition: OrgIdEnforcer
20 changes: 20 additions & 0 deletions snuba/datasets/readiness_state.py
@@ -4,6 +4,26 @@


class ReadinessState(Enum):
"""
Readiness states are essentially feature flags for snuba datasets.
The readiness state defines whether or not a dataset is made available
in specific sentry environments.
Currently, sentry environments include the following:
* local/CI
* SaaS
* S4S
* Self-Hosted
* Single-Tenant
The following is a list of readiness states and the environments
they map to:
* limited -> local/CI
* experimental -> local/CI, S4S
* partial -> local/CI, SaaS, S4S
* deprecate -> local/CI, Self-Hosted
* complete -> local/CI, SaaS, S4S, Self-Hosted, Single-Tenant
"""

LIMITED = "limited"
DEPRECATE = "deprecate"
PARTIAL = "partial"
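
For illustration only (not part of this commit): a minimal Python restatement of the readiness-state-to-environment mapping documented in the new docstring; the dictionary and helper below are hypothetical and only encode that list.

READINESS_ENVIRONMENTS = {
    "limited": {"local/CI"},
    "experimental": {"local/CI", "S4S"},
    "partial": {"local/CI", "SaaS", "S4S"},
    "deprecate": {"local/CI", "Self-Hosted"},
    "complete": {"local/CI", "SaaS", "S4S", "Self-Hosted", "Single-Tenant"},
}

def is_dataset_available(readiness_state: str, environment: str) -> bool:
    # A dataset is made available only in the environments its readiness state maps to.
    return environment in READINESS_ENVIRONMENTS.get(readiness_state, set())

assert is_dataset_available("experimental", "S4S")
assert not is_dataset_available("limited", "SaaS")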
74 changes: 72 additions & 2 deletions snuba/web/rpc/common.py
@@ -1,7 +1,10 @@
from typing import Final, Mapping, Set
from typing import Final, Mapping, Sequence, Set

from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import (
AttributeKey,
VirtualColumnContext,
)
from sentry_protos.snuba.v1alpha.trace_item_filter_pb2 import (
ComparisonFilter,
TraceItemFilter,
@@ -130,6 +133,73 @@ def attribute_key_to_expression(attr_key: AttributeKey) -> Expression:
)


def apply_virtual_columns(
query: Query, virtual_column_contexts: Sequence[VirtualColumnContext]
) -> None:
"""Injects virtual column mappings into the clickhouse query. Works with NORMALIZED_COLUMNS on the table or
dynamic columns in attr_str
attr_num not supported because mapping on floats is a bad idea
Example:
SELECT
project_name AS `project_name`,
attr_str['release'] AS `release`,
attr_str['sentry.sdk.name'] AS `sentry.sdk.name`,
... rest of query
contexts:
[ {from_column_name: project_id, to_column_name: project_name, value_map: {1: "sentry", 2: "snuba"}} ]
Query will be transformed into:
SELECT
-- see the project name column transformed and the value mapping injected
transform( CAST( project_id, 'String'), array( '1', '2'), array( 'sentry', 'snuba'), 'unknown') AS `project_name`,
--
attr_str['release'] AS `release`,
attr_str['sentry.sdk.name'] AS `sentry.sdk.name`,
... rest of query
"""

if not virtual_column_contexts:
return

mapped_column_to_context = {c.to_column_name: c for c in virtual_column_contexts}

def transform_expressions(expression: Expression) -> Expression:
# virtual columns will show up as `attr_str[virtual_column_name]` or `attr_num[virtual_column_name]`
if not isinstance(expression, SubscriptableReference):
return expression

if expression.column.column_name != "attr_str":
return expression
context = mapped_column_to_context.get(str(expression.key.value))
if context:
attribute_expression = attribute_key_to_expression(
AttributeKey(
name=context.from_column_name,
type=NORMALIZED_COLUMNS.get(
context.from_column_name, AttributeKey.TYPE_STRING
),
)
)
return f.transform(
f.CAST(attribute_expression, "String"),
literals_array(None, [literal(k) for k in context.value_map.keys()]),
literals_array(None, [literal(v) for v in context.value_map.values()]),
literal("unknown"),
alias=context.to_column_name,
)

return expression

query.transform_expressions(transform_expressions)


def trace_item_filters_to_expression(item_filter: TraceItemFilter) -> Expression:
"""
Trace Item Filters are things like (span.id=12345 AND start_timestamp >= "june 4th, 2024")
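
For illustration only (not part of this commit): a sketch of the context a caller might pass to apply_virtual_columns, using the field names shown in the docstring and code above; value_map is assumed to be a string-to-string map, matching the CAST to String in the generated transform().

from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import VirtualColumnContext

# Mirrors the docstring example: derive a project_name column from project_id,
# with 'unknown' used by transform() for any unmapped value.
context = VirtualColumnContext(
    from_column_name="project_id",
    to_column_name="project_name",
    value_map={"1": "sentry", "2": "snuba"},
)

# apply_virtual_columns(query, [context]) then rewrites any attr_str[project_name]
# reference in the query into the transform(...) expression shown in the docstring.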
2 changes: 2 additions & 0 deletions snuba/web/rpc/span_samples.py
@@ -25,6 +25,7 @@
from snuba.utils.metrics.timer import Timer
from snuba.web.query import run_query
from snuba.web.rpc.common import (
apply_virtual_columns,
attribute_key_to_expression,
base_conditions_and,
trace_item_filters_to_expression,
@@ -71,6 +72,7 @@ def _build_query(request: SpanSamplesRequest) -> Query:
limit=request.limit,
)
treeify_or_and_conditions(res)
apply_virtual_columns(res, request.virtual_column_contexts)
return res


58 changes: 24 additions & 34 deletions snuba/web/rpc/tags_list.py
@@ -2,9 +2,10 @@
from typing import List, Optional

from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
TagsListRequest,
TagsListResponse,
TraceItemAttributesRequest,
TraceItemAttributesResponse,
)
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey

from snuba.clickhouse.formatter.nodes import FormattedQuery, StringNode
from snuba.datasets.schemas.tables import TableSource
@@ -15,15 +16,17 @@


def tags_list_query(
request: TagsListRequest, _timer: Optional[Timer] = None
) -> TagsListResponse:
str_storage = get_storage(StorageKey("spans_str_attrs"))
num_storage = get_storage(StorageKey("spans_num_attrs"))

str_data_source = str_storage.get_schema().get_data_source()
assert isinstance(str_data_source, TableSource)
num_data_source = num_storage.get_schema().get_data_source()
assert isinstance(num_data_source, TableSource)
request: TraceItemAttributesRequest, _timer: Optional[Timer] = None
) -> TraceItemAttributesResponse:
if request.type == AttributeKey.Type.TYPE_STRING:
storage = get_storage(StorageKey("spans_str_attrs"))
elif request.type == AttributeKey.Type.TYPE_FLOAT:
storage = get_storage(StorageKey("spans_num_attrs"))
else:
return TraceItemAttributesResponse(tags=[])

data_source = storage.get_schema().get_data_source()
assert isinstance(data_source, TableSource)

if request.limit > 1000:
raise BadSnubaRPCRequestException("Limit can be at most 1000")
@@ -35,39 +38,26 @@ def tags_list_query(
)

query = f"""
SELECT * FROM (
SELECT DISTINCT attr_key, 'str' as type, timestamp
FROM {str_data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
UNION ALL
SELECT DISTINCT attr_key, 'num' as type, timestamp
FROM {num_data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
)
SELECT DISTINCT attr_key, timestamp
FROM {data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
ORDER BY attr_key
LIMIT {request.limit} OFFSET {request.offset}
"""

cluster = str_storage.get_cluster()
cluster = storage.get_cluster()
reader = cluster.get_reader()
result = reader.execute(FormattedQuery([StringNode(query)]))

tags: List[TagsListResponse.Tag] = []
tags: List[TraceItemAttributesResponse.Tag] = []
for row in result.get("data", []):
tags.append(
TagsListResponse.Tag(
TraceItemAttributesResponse.Tag(
name=row["attr_key"],
type={
"str": TagsListResponse.TYPE_STRING,
"num": TagsListResponse.TYPE_NUMBER,
}[row["type"]],
type=request.type,
)
)

return TagsListResponse(tags=tags)
return TraceItemAttributesResponse(tags=tags)
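
For illustration only (not part of this commit): a hypothetical request for string attributes, built with the field names used in the function above; the meta timestamps are assumed to be protobuf Timestamps, and the concrete values are illustrative.

from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import TraceItemAttributesRequest
from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey
from snuba.web.rpc.tags_list import tags_list_query

req = TraceItemAttributesRequest(
    meta=RequestMeta(
        organization_id=1,
        project_ids=[1],
        start_timestamp=Timestamp(seconds=1726000000),
        end_timestamp=Timestamp(seconds=1726500000),
    ),
    type=AttributeKey.Type.TYPE_STRING,  # routes the query to the spans_str_attrs storage
    limit=100,
    offset=0,
)
response = tags_list_query(req)  # TraceItemAttributesResponse with .tags entries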
6 changes: 4 additions & 2 deletions snuba/web/views.py
@@ -40,7 +40,9 @@
AggregateBucketRequest,
)
from sentry_protos.snuba.v1alpha.endpoint_span_samples_pb2 import SpanSamplesRequest
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import TagsListRequest
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
TraceItemAttributesRequest,
)
from werkzeug import Response as WerkzeugResponse
from werkzeug.exceptions import InternalServerError

@@ -288,7 +290,7 @@ def rpc(*, name: str, timer: Timer) -> Response:
] = {
"AggregateBucketRequest": (timeseries_query, AggregateBucketRequest),
"SpanSamplesRequest": (span_samples_query, SpanSamplesRequest),
"TagsListRequest": (tags_list_query, TagsListRequest),
"TraceItemAttributesRequest": (tags_list_query, TraceItemAttributesRequest),
}
try:
endpoint, req_class = rpcs[name]