ref(rust): Remove Python processors in favor of Rust ones #5345

Merged · 22 commits · Jan 9, 2024

Commits:
892e640  ref(rust): Remove Python processors in favor of Rust ones (phacops, Jan 8, 2024)
e75f99a  Also remove the Querylog processor as it's Rust-only (phacops, Jan 8, 2024)
45e445d  ref(spans): Remove deprecated field from payload and set default valu… (phacops, Jan 8, 2024)
b22d1bd  Set parent_span_id as optional (phacops, Jan 8, 2024)
fdf2428  Fix test value (phacops, Jan 8, 2024)
0eeaeda  Set segment_id as optional as well (phacops, Jan 8, 2024)
0b461a4  Merge branch 'pierre/spans-remove-deprecated-group_raw-field' into pi… (phacops, Jan 8, 2024)
f9f5bdc  Fix sentry_tags column target in spans entity (phacops, Jan 8, 2024)
501779e  Remove unused module field for functions (phacops, Jan 8, 2024)
bccf645  Remove module field as it's not used or ingested (phacops, Jan 8, 2024)
2279cf1  Set origin_timestamp as None since the RustCompatProcessor does not s… (phacops, Jan 8, 2024)
89f2ad2  Merge branch 'master' into pierre/cleanup-python-processors (phacops, Jan 8, 2024)
e1b7fa3  Fix profiles tests (phacops, Jan 8, 2024)
3ae9adc  Fix spans payload tests (phacops, Jan 8, 2024)
43dc760  Fix spans processor tests (phacops, Jan 8, 2024)
a3affcd  ref: bump sentry-kafka-schemas to 0.1.41 (getsentry-bot, Jan 8, 2024)
b92c5d8  Merge branch 'bot/bump-version/sentry-kafka-schemas/0.1.41' into pier… (phacops, Jan 8, 2024)
361ea4d  Fix typing issues (phacops, Jan 8, 2024)
cecda72  Remove replays tests until they're fixed (phacops, Jan 8, 2024)
82c5600  Skip replay schema tests until they can pass (phacops, Jan 8, 2024)
7b222fe  Revert functions processor changes (phacops, Jan 9, 2024)
51d9f87  Merge branch 'master' into pierre/cleanup-python-processors-2 (phacops, Jan 9, 2024)

requirements.txt (2 changes: 1 addition & 1 deletion)

@@ -23,7 +23,7 @@ python-dateutil==2.8.2
 python-rapidjson==1.8
 redis==4.3.4
 sentry-arroyo==2.15.3
-sentry-kafka-schemas==0.1.38
+sentry-kafka-schemas==0.1.41
 sentry-redis-tools==0.1.7
 sentry-relay==0.8.39
 sentry-sdk==1.28.0

rust_snuba/src/processors/functions.rs (1 change: 0 additions & 1 deletion)

@@ -100,7 +100,6 @@ struct Function<'a> {
     http_method: Option<&'a str>,
     is_application: u8,
     materialization_version: u8,
-    module: &'a str,
     name: &'a str,
     package: &'a str,
     platform: &'a str,

rust_snuba/src/processors/spans.rs (24 changes: 8 additions & 16 deletions)

@@ -74,6 +74,7 @@ struct FromSentryTags {
     transaction_method: Option<String>,
     #[serde(rename(deserialize = "transaction.op"))]
     transaction_op: Option<String>,
+    user: Option<String>,
     #[serde(flatten)]
     extra: BTreeMap<String, String>,
 }
@@ -130,6 +131,10 @@ impl FromSentryTags {
             tags.insert("status_code".into(), status_code.into());
         }
 
+        if let Some(user) = &self.user {
+            tags.insert("user".into(), user.into());
+        }
+
         for (key, value) in &self.extra {
             tags.insert(key.into(), value.into());
         }
@@ -203,10 +208,11 @@ impl TryFrom<FromSpanMessage> for Span {
             0
         };
         let status = from.sentry_tags.status.unwrap_or_default() as u8;
 
+        let (sentry_tag_keys, sentry_tag_values) = from.sentry_tags.to_keys_values();
         let transaction_op = from.sentry_tags.transaction_op.unwrap_or_default();
 
-        let (mut tag_keys, mut tag_values): (Vec<_>, Vec<_>) =
+        let (tag_keys, tag_values): (Vec<_>, Vec<_>) =
             from.tags.unwrap_or_default().into_iter().unzip();
 
         let (measurement_keys, measurement_values) = from
@@ -216,21 +222,6 @@ impl TryFrom<FromSpanMessage> for Span {
             .map(|(k, v)| (k, v.value))
             .unzip();
 
-        if let Some(http_method) = from.sentry_tags.http_method {
-            tag_keys.push("http.method".into());
-            tag_values.push(http_method);
-        }
-
-        if let Some(status_code) = from.sentry_tags.status_code {
-            tag_keys.push("status_code".into());
-            tag_values.push(status_code);
-        }
-
-        if let Some(transaction_method) = from.sentry_tags.transaction_method {
-            tag_keys.push("transaction.method".into());
-            tag_values.push(transaction_method);
-        }
-
         let metrics_summary = match from._metrics_summary {
             Value::Object(v) => serde_json::to_string(&v).unwrap_or_default(),
             _ => "".into(),
@@ -270,6 +261,7 @@ impl TryFrom<FromSpanMessage> for Span {
             trace_id: from.trace_id,
             transaction_id: from.event_id,
             transaction_op,
+            user: from.sentry_tags.user.unwrap_or_default(),
             ..Default::default()
         })
     }
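
For context on the pattern this file now relies on: known sentry tags are typed struct fields, everything else falls into a catch-all map via serde's #[serde(flatten)], and to_keys_values() folds both into the parallel key/value vectors that a Nested(key, value) column stores. The diff above moves the last three hand-pushed tags (http.method, status_code, transaction.method) behind that single method. A minimal self-contained sketch of the shape, with simplified names and invented example values rather than the actual snuba types:

use std::collections::BTreeMap;

use serde::Deserialize;

// Sketch only: known sentry tags become typed fields; any key not listed
// here is captured by `extra` through #[serde(flatten)].
#[derive(Debug, Deserialize)]
struct SentryTags {
    #[serde(rename = "transaction.op")]
    transaction_op: Option<String>,
    user: Option<String>,
    #[serde(flatten)]
    extra: BTreeMap<String, String>,
}

impl SentryTags {
    // Fold typed fields and extras into one sorted map, then unzip it into
    // the parallel key/value vectors a Nested(key, value) column expects.
    fn to_keys_values(&self) -> (Vec<String>, Vec<String>) {
        let mut tags = BTreeMap::new();
        if let Some(op) = &self.transaction_op {
            tags.insert("transaction.op".to_string(), op.clone());
        }
        if let Some(user) = &self.user {
            tags.insert("user".to_string(), user.clone());
        }
        for (key, value) in &self.extra {
            tags.insert(key.clone(), value.clone());
        }
        tags.into_iter().unzip()
    }
}

fn main() {
    let json = r#"{"transaction.op":"http.server","user":"id:1","environment":"prod"}"#;
    let tags: SentryTags = serde_json::from_str(json).unwrap();
    let (keys, values) = tags.to_keys_values();
    assert_eq!(keys, vec!["environment", "transaction.op", "user"]);
    assert_eq!(values, vec!["prod", "http.server", "id:1"]);
}

Collecting into a BTreeMap before unzipping deduplicates keys and yields a deterministic sorted order, which is why typed fields and flattened extras can be merged in one place.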

snuba/datasets/configuration/spans/entities/spans.yaml (30 changes: 15 additions & 15 deletions)

@@ -59,28 +59,25 @@ schema:
             },
         },
         {
-          name: sentry_tags,
-          type: Nested,
+          name: _tags_hash_map,
+          type: Array,
           args:
             {
-              subcolumns:
-                [{ name: key, type: String }, { name: value, type: String }],
+              inner_type: { type: UInt, args: { size: 64 } },
+              schema_modifiers: [readonly],
             },
         },
         {
-          name: measurements,
+          name: sentry_tags,
           type: Nested,
           args:
             {
               subcolumns:
-                [
-                  { name: key, type: String },
-                  { name: value, type: Float, args: { size: 64 } },
-                ],
+                [{ name: key, type: String }, { name: value, type: String }],
             },
         },
         {
-          name: _tags_hash_map,
+          name: _sentry_tags_hash_map,
           type: Array,
           args:
             {
@@ -89,12 +86,15 @@ schema:
             },
         },
         {
-          name: _sentry_tags_hash_map,
-          type: Array,
+          name: measurements,
+          type: Nested,
           args:
             {
-              inner_type: { type: UInt, args: { size: 64 } },
-              schema_modifiers: [readonly],
+              subcolumns:
+                [
+                  { name: key, type: String },
+                  { name: value, type: Float, args: { size: 64 } },
+                ],
             },
         },
         { name: partition, type: UInt, args: { size: 16 } },
@@ -128,7 +128,7 @@ storages:
         from_column_table: null
         from_column_name: sentry_tags
        to_nested_col_table: null
-        to_nested_col_name: tags
+        to_nested_col_name: sentry_tags
         value_subcolumn_name: value
         nullable: false
       - mapper: SubscriptableMapper

snuba/datasets/configuration/spans/storages/spans.yaml (30 changes: 21 additions & 9 deletions)

@@ -49,24 +49,33 @@ schema:
           },
       },
       {
-        name: measurements,
+        name: _tags_hash_map,
+        type: Array,
+        args:
+          {
+            inner_type: { type: UInt, args: { size: 64 } },
+            schema_modifiers: [ readonly ],
+          },
+      },
+      {
+        name: sentry_tags,
         type: Nested,
         args:
           {
             subcolumns:
-              [
-                { name: key, type: String },
-                { name: value, type: Float, args: { size: 64 } },
-              ],
+              [ { name: key, type: String }, { name: value, type: String } ],
           },
       },
       {
-        name: _tags_hash_map,
-        type: Array,
+        name: measurements,
+        type: Nested,
         args:
           {
-            inner_type: { type: UInt, args: { size: 64 } },
-            schema_modifiers: [ readonly ],
+            subcolumns:
+              [
+                { name: key, type: String },
+                { name: value, type: Float, args: { size: 64 } },
+              ],
           },
       },
       { name: partition, type: UInt, args: { size: 16 } },
@@ -112,6 +121,9 @@ query_processors:
  - processor: ArrayJoinKeyValueOptimizer
    args:
      column_name: tags
+  - processor: ArrayJoinKeyValueOptimizer
+    args:
+      column_name: sentry_tags
  - processor: ArrayJoinKeyValueOptimizer
    args:
      column_name: measurements
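
The reshuffled columns keep the established layout: each tag set is a Nested(key, value) pair of parallel arrays plus a readonly hash-map Array for fast lookups, and the new sentry_tags column gets the same ArrayJoinKeyValueOptimizer treatment as tags and measurements. A ClickHouse lookup such as tags.value[indexOf(tags.key, 'user')] behaves, in spirit, like the function below. This is a hedged Rust illustration with a hypothetical helper name; snuba generates the actual SQL:

// Hypothetical helper, not snuba code: a key lookup over the parallel
// key/value arrays that a Nested(key, value) column stores.
fn tag_value<'a>(keys: &'a [String], values: &'a [String], key: &str) -> Option<&'a str> {
    keys.iter()
        .position(|k| k == key) // indexOf(tags.key, key)
        .map(|i| values[i].as_str()) // tags.value[i]
}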

snuba/datasets/processors/functions_processor.py (1 change: 0 additions & 1 deletion)

@@ -41,7 +41,6 @@ def process_message(
             "name": function["function"],  # to be removed
             "function": function["function"],
             "package": function.get("package", ""),
-            "module": function.get("module", ""),
             "path": "",  # deprecated
             "is_application": 1 if function.get("in_app", True) else 0,
             "platform": message["platform"],

snuba/datasets/processors/metrics_summaries_processor.py (130 changes: 4 additions & 126 deletions)

@@ -1,128 +1,6 @@
-import time
-import uuid
-from copy import deepcopy
-from datetime import datetime, timezone
-from typing import Any, Mapping, MutableMapping, MutableSequence, Optional, Tuple
+from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor
 
-import structlog
-from sentry_kafka_schemas.schema_types.snuba_spans_v1 import SpanEvent
-
-from snuba import environment
-from snuba.consumers.types import KafkaMessageMetadata
-from snuba.datasets.events_format import (
-    EventTooOld,
-    enforce_retention,
-    extract_extra_tags,
-)
-from snuba.datasets.processors import DatasetMessageProcessor
-from snuba.datasets.processors.spans_processor import RetentionDays
-from snuba.processor import (
-    InsertBatch,
-    ProcessedMessage,
-    _as_dict_safe,
-    _ensure_valid_date,
-)
-from snuba.utils.metrics.wrapper import MetricsWrapper
-
-logger = structlog.get_logger(__name__)
-
-metrics = MetricsWrapper(environment.metrics, "metrics_summaries.processor")
-
-MetricsSummaries = MutableSequence[MutableMapping[str, Any]]
-
-
-class MetricsSummariesMessageProcessor(DatasetMessageProcessor):
-    """
-    Message processor for writing metrics summary data to the metrics_summaries table.
-    """
-
-    def __extract_timestamp(self, timestamp_ms: int) -> Tuple[int, int]:
-        # We are purposely using a naive datetime here to work with the rest of the codebase.
-        # We can be confident that clients are only sending UTC dates.
-        timestamp_sec = timestamp_ms / 1000
-        if _ensure_valid_date(datetime.utcfromtimestamp(timestamp_sec)) is None:
-            timestamp_sec = int(time.time())
-        return int(timestamp_sec), int(timestamp_ms % 1000)
-
-    @staticmethod
-    def _structure_and_validate_message(
-        message: SpanEvent,
-    ) -> Optional[Tuple[SpanEvent, RetentionDays]]:
-        if not message.get("_metrics_summary"):
-            return None
-        try:
-            # We are purposely using a naive datetime here to work with the
-            # rest of the codebase. We can be confident that clients are only
-            # sending UTC dates.
-            retention_days = enforce_retention(
-                message.get("retention_days"),
-                datetime.utcfromtimestamp(message["start_timestamp_ms"] / 1000),
-            )
-        except EventTooOld:
-            return None
-
-        return message, retention_days
-
-    def _process_span_event(
-        self,
-        span_event: SpanEvent,
-        retention_days: Optional[RetentionDays],
-    ) -> MetricsSummaries:
-        end_timestamp, _ = self.__extract_timestamp(
-            span_event["start_timestamp_ms"] + span_event["duration_ms"],
-        )
-        common_fields = {
-            "deleted": 0,
-            "end_timestamp": end_timestamp,
-            "project_id": span_event["project_id"],
-            "retention_days": retention_days,
-            "span_id": int(span_event["span_id"], 16),
-            "trace_id": str(uuid.UUID(span_event["trace_id"])),
-        }
-
-        processed_rows: MetricsSummaries = []
-        metrics_summary: Mapping[str, Any] = _as_dict_safe(
-            span_event.get("_metrics_summary", None)
-        )
-        for metric_mri, metric_values in metrics_summary.items():
-            for metric_value in metric_values:
-                processed: MutableMapping[str, Any] = deepcopy(common_fields)
-
-                tags: Mapping[str, Any] = _as_dict_safe(metric_value.get("tags", None))
-                processed["tags.key"], processed["tags.value"] = extract_extra_tags(
-                    tags
-                )
-
-                processed["metric_mri"] = metric_mri
-                processed["count"] = int(metric_value["count"])
-
-                for key in {"min", "max", "sum"}:
-                    processed[key] = float(metric_value[key])
-
-                processed_rows.append(processed)
-
-        return processed_rows
-
-    def process_message(
-        self,
-        message: SpanEvent,
-        metadata: KafkaMessageMetadata,
-    ) -> Optional[ProcessedMessage]:
-        span_event, retention_days = self._structure_and_validate_message(message) or (
-            None,
-            None,
-        )
-        if not span_event:
-            return None
-        try:
-            processed_rows = self._process_span_event(span_event, retention_days)
-        except Exception:
-            metrics.increment("message_processing_error")
-            return None
 
-        received = (
-            datetime.fromtimestamp(span_event["received"], tz=timezone.utc)
-            if "received" in span_event
-            else None
-        )
-        return InsertBatch(rows=processed_rows, origin_timestamp=received)
+class MetricsSummariesMessageProcessor(RustCompatProcessor):
+    def __init__(self) -> None:
+        super().__init__("MetricsSummariesMessageProcessor")
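
The replacement class delegates by name to the Rust implementation: RustCompatProcessor dispatches to the rust_snuba processor registered under the string passed to super().__init__, so only a stub remains on the Python side. The row fan-out the deleted code performed, one output row per metric MRI and summary entry carrying count/min/max/sum plus flattened tags, now lives in rust_snuba. A rough sketch of that fan-out follows; MetricSummary, SummaryRow, and fan_out are invented names taken from the deleted Python above, not the real rust_snuba types:

use std::collections::BTreeMap;

use serde::Deserialize;

// Field names follow the deleted Python processor's expectations.
#[derive(Debug, Deserialize)]
struct MetricSummary {
    count: u64,
    min: f64,
    max: f64,
    sum: f64,
    #[serde(default)]
    tags: BTreeMap<String, String>,
}

#[derive(Debug)]
struct SummaryRow {
    metric_mri: String,
    count: u64,
    min: f64,
    max: f64,
    sum: f64,
    tag_keys: Vec<String>,
    tag_values: Vec<String>,
}

// One output row per (metric MRI, summary entry), mirroring the nested loop
// over `_metrics_summary` in the removed `_process_span_event`.
fn fan_out(metrics_summary: &BTreeMap<String, Vec<MetricSummary>>) -> Vec<SummaryRow> {
    let mut rows = Vec::new();
    for (metric_mri, summaries) in metrics_summary {
        for summary in summaries {
            let (tag_keys, tag_values) = summary
                .tags
                .iter()
                .map(|(k, v)| (k.clone(), v.clone()))
                .unzip();
            rows.push(SummaryRow {
                metric_mri: metric_mri.clone(),
                count: summary.count,
                min: summary.min,
                max: summary.max,
                sum: summary.sum,
                tag_keys,
                tag_values,
            });
        }
    }
    rows
}

fn main() {
    // Example payload with an invented MRI, just to exercise the fan-out.
    let json = r#"{"d:spans/duration@ms":[{"count":2,"min":1.0,"max":5.0,"sum":6.0,"tags":{"env":"prod"}}]}"#;
    let summaries: BTreeMap<String, Vec<MetricSummary>> = serde_json::from_str(json).unwrap();
    let rows = fan_out(&summaries);
    assert_eq!(rows.len(), 1);
    assert_eq!(rows[0].metric_mri, "d:spans/duration@ms");
}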