ref(rust): Remove Python processors in favor of Rust ones #5327

Merged · Jan 8, 2024 · 20 commits (changes shown from 13 commits)
1 change: 0 additions & 1 deletion rust_snuba/src/processors/functions.rs
@@ -100,7 +100,6 @@ struct Function<'a> {
http_method: Option<&'a str>,
is_application: u8,
materialization_version: u8,
module: &'a str,
Member: how come the rust processor is changing too?

Contributor Author: This field is unused and was picked up by the Rust processor when the Python processor wasn't ingesting it.

name: &'a str,
package: &'a str,
platform: &'a str,
1 change: 1 addition & 0 deletions rust_snuba/src/processors/spans.rs
@@ -203,6 +203,7 @@ impl TryFrom<FromSpanMessage> for Span {
0
};
let status = from.sentry_tags.status.unwrap_or_default() as u8;

let (sentry_tag_keys, sentry_tag_values) = from.sentry_tags.to_keys_values();
let transaction_op = from.sentry_tags.transaction_op.unwrap_or_default();

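For context on the new call above: to_keys_values flattens the sentry tags map into two parallel arrays, matching the sentry_tags.key / sentry_tags.value Nested columns in ClickHouse. A minimal Python sketch of the idea (the sorting behavior is an assumption; the real helper lives in rust_snuba):

    # Hypothetical sketch of what a to_keys_values-style helper produces:
    # a map of tags becomes two parallel arrays, one of keys and one of
    # values, mirroring the sentry_tags.key / sentry_tags.value columns.
    from typing import Dict, List, Tuple

    def to_keys_values(tags: Dict[str, str]) -> Tuple[List[str], List[str]]:
        # Sorting keeps the output deterministic; whether the real Rust
        # helper sorts is an assumption.
        items = sorted(tags.items())
        return [k for k, _ in items], [v for _, v in items]

    keys, values = to_keys_values({"environment": "prod", "release": "1.0.0"})
    # keys   == ["environment", "release"]
    # values == ["prod", "1.0.0"]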
30 changes: 15 additions & 15 deletions snuba/datasets/configuration/spans/entities/spans.yaml
@@ -59,28 +59,25 @@ schema:
},
},
{
name: sentry_tags,
type: Nested,
name: _tags_hash_map,
type: Array,
args:
{
subcolumns:
[{ name: key, type: String }, { name: value, type: String }],
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [readonly],
},
},
{
name: measurements,
name: sentry_tags,
type: Nested,
args:
{
subcolumns:
[
{ name: key, type: String },
{ name: value, type: Float, args: { size: 64 } },
],
[{ name: key, type: String }, { name: value, type: String }],
},
},
{
name: _tags_hash_map,
name: _sentry_tags_hash_map,
type: Array,
args:
{
@@ -89,12 +86,15 @@
},
},
{
name: _sentry_tags_hash_map,
type: Array,
name: measurements,
type: Nested,
args:
{
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [readonly],
subcolumns:
[
{ name: key, type: String },
{ name: value, type: Float, args: { size: 64 } },
],
},
},
{ name: partition, type: UInt, args: { size: 16 } },
@@ -128,7 +128,7 @@ storages:
from_column_table: null
from_column_name: sentry_tags
to_nested_col_table: null
to_nested_col_name: tags
to_nested_col_name: sentry_tags
Contributor Author: We were fetching from tags instead of sentry_tags for the sentry_tags field.

value_subcolumn_name: value
nullable: false
- mapper: SubscriptableMapper
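To illustrate the mapper fix above: a SubscriptableMapper translates a subscript such as sentry_tags[environment] in a query into an array lookup on a Nested column, so pointing it at tags meant reads silently hit the wrong column. A rough sketch of the translation, with the exact expression shape an assumption:

    def subscript_expression(nested_col: str, key: str) -> str:
        # Hypothetical illustration of the ClickHouse expression a
        # SubscriptableMapper-style mapper resolves a subscript to.
        return f"{nested_col}.value[indexOf({nested_col}.key, '{key}')]"

    # Before the fix, sentry_tags[environment] resolved against tags:
    #   tags.value[indexOf(tags.key, 'environment')]
    # After the fix it resolves against sentry_tags:
    #   sentry_tags.value[indexOf(sentry_tags.key, 'environment')]
    print(subscript_expression("sentry_tags", "environment"))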
30 changes: 21 additions & 9 deletions snuba/datasets/configuration/spans/storages/spans.yaml
@@ -49,24 +49,33 @@ schema:
},
},
{
name: measurements,
name: _tags_hash_map,
type: Array,
args:
{
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [ readonly ],
},
},
{
name: sentry_tags,
type: Nested,
args:
{
subcolumns:
[
{ name: key, type: String },
{ name: value, type: Float, args: { size: 64 } },
],
[ { name: key, type: String }, { name: value, type: String } ],
},
},
{
name: _tags_hash_map,
type: Array,
name: measurements,
type: Nested,
args:
{
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [ readonly ],
subcolumns:
[
{ name: key, type: String },
{ name: value, type: Float, args: { size: 64 } },
],
},
},
{ name: partition, type: UInt, args: { size: 16 } },
@@ -112,6 +121,9 @@ query_processors:
- processor: ArrayJoinKeyValueOptimizer
args:
column_name: tags
- processor: ArrayJoinKeyValueOptimizer
args:
column_name: sentry_tags
- processor: ArrayJoinKeyValueOptimizer
args:
column_name: measurements
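The new ArrayJoinKeyValueOptimizer entry gives sentry_tags the same treatment tags and measurements already had. A hedged sketch of the kind of rewrite such an optimizer performs when a query array-joins a Nested column but filters on specific keys (the SQL shapes below are assumptions, shown as strings):

    # Hypothetical before/after for an ArrayJoinKeyValueOptimizer-style
    # pass. Unoptimized, every row fans out by every tag and most of the
    # joined rows are discarded by the filter.
    unoptimized = """
    SELECT arrayJoin(sentry_tags.key) AS tags_key,
           arrayJoin(sentry_tags.value) AS tags_value
    WHERE tags_key IN ('environment', 'release')
    """

    # Filtering the zipped key/value pairs before the join keeps the
    # fan-out limited to the keys the query actually asked for.
    optimized = """
    SELECT tupleElement(pairs, 1) AS tags_key,
           tupleElement(pairs, 2) AS tags_value
    FROM ... ARRAY JOIN arrayFilter(
        p -> tupleElement(p, 1) IN ('environment', 'release'),
        arrayZip(sentry_tags.key, sentry_tags.value)
    ) AS pairs
    """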
74 changes: 4 additions & 70 deletions snuba/datasets/processors/functions_processor.py
@@ -1,72 +1,6 @@
import uuid
from datetime import datetime
from typing import Any, Mapping, Optional
from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor

from sentry_relay.consts import SPAN_STATUS_NAME_TO_CODE

from snuba import environment
from snuba.consumers.types import KafkaMessageMetadata
from snuba.datasets.processors import DatasetMessageProcessor
from snuba.processor import InsertBatch, ProcessedMessage
from snuba.utils.metrics.wrapper import MetricsWrapper

metrics = MetricsWrapper(environment.metrics, "functions.processor")

UNKNOWN_SPAN_STATUS = 2
MAX_DEPTH = 1024


class FunctionsMessageProcessor(DatasetMessageProcessor):
def process_message(
self, message: Mapping[str, Any], metadata: KafkaMessageMetadata
) -> Optional[ProcessedMessage]:
status = message.get("transaction_status")
if status:
int_status = SPAN_STATUS_NAME_TO_CODE.get(
status,
UNKNOWN_SPAN_STATUS,
)
else:
int_status = UNKNOWN_SPAN_STATUS
functions_list = [
{
"project_id": message["project_id"],
"transaction_name": message["transaction_name"],
"timestamp": int(
message.get("timestamp", datetime.utcnow().timestamp())
),
"depth": 0, # deprecated
"parent_fingerprint": 0, # deprecated
"fingerprint": function["fingerprint"],
"name": function["function"], # to be removed
"function": function["function"],
"package": function.get("package", ""),
"module": function.get("module", ""),
"path": "", # deprecated
"is_application": 1 if function.get("in_app", True) else 0,
"platform": message["platform"],
"environment": message.get("environment"),
"release": message.get("release"),
"dist": message.get("dist"),
"transaction_op": message.get("transaction_op", ""),
"transaction_status": int_status,
"http_method": message.get("http_method"),
"browser_name": message.get("browser_name"),
"device_classification": message.get("device_class", 0),
"os_name": "", # deprecated
"os_version": "", # deprecated
"retention_days": message["retention_days"],
"durations": function["self_times_ns"],
"profile_id": str(uuid.UUID(message["profile_id"])),
"materialization_version": 0,
}
for function in message["functions"]
if function.get("function") and function.get("self_times_ns")
]

received = message.get("received")

return InsertBatch(
functions_list,
datetime.utcfromtimestamp(received) if received else None,
)
class FunctionsMessageProcessor(RustCompatProcessor):
def __init__(self) -> None:
super().__init__("FunctionsMessageProcessor")
130 changes: 4 additions & 126 deletions snuba/datasets/processors/metrics_summaries_processor.py
@@ -1,128 +1,6 @@
import time
import uuid
from copy import deepcopy
from datetime import datetime, timezone
from typing import Any, Mapping, MutableMapping, MutableSequence, Optional, Tuple
from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor

import structlog
from sentry_kafka_schemas.schema_types.snuba_spans_v1 import SpanEvent

from snuba import environment
from snuba.consumers.types import KafkaMessageMetadata
from snuba.datasets.events_format import (
EventTooOld,
enforce_retention,
extract_extra_tags,
)
from snuba.datasets.processors import DatasetMessageProcessor
from snuba.datasets.processors.spans_processor import RetentionDays
from snuba.processor import (
InsertBatch,
ProcessedMessage,
_as_dict_safe,
_ensure_valid_date,
)
from snuba.utils.metrics.wrapper import MetricsWrapper

logger = structlog.get_logger(__name__)

metrics = MetricsWrapper(environment.metrics, "metrics_summaries.processor")

MetricsSummaries = MutableSequence[MutableMapping[str, Any]]


class MetricsSummariesMessageProcessor(DatasetMessageProcessor):
"""
Message processor for writing metrics summary data to the metrics_summaries table.
"""

def __extract_timestamp(self, timestamp_ms: int) -> Tuple[int, int]:
# We are purposely using a naive datetime here to work with the rest of the codebase.
# We can be confident that clients are only sending UTC dates.
timestamp_sec = timestamp_ms / 1000
if _ensure_valid_date(datetime.utcfromtimestamp(timestamp_sec)) is None:
timestamp_sec = int(time.time())
return int(timestamp_sec), int(timestamp_ms % 1000)

@staticmethod
def _structure_and_validate_message(
message: SpanEvent,
) -> Optional[Tuple[SpanEvent, RetentionDays]]:
if not message.get("_metrics_summary"):
return None
try:
# We are purposely using a naive datetime here to work with the
# rest of the codebase. We can be confident that clients are only
# sending UTC dates.
retention_days = enforce_retention(
message.get("retention_days"),
datetime.utcfromtimestamp(message["start_timestamp_ms"] / 1000),
)
except EventTooOld:
return None

return message, retention_days

def _process_span_event(
self,
span_event: SpanEvent,
retention_days: Optional[RetentionDays],
) -> MetricsSummaries:
end_timestamp, _ = self.__extract_timestamp(
span_event["start_timestamp_ms"] + span_event["duration_ms"],
)
common_fields = {
"deleted": 0,
"end_timestamp": end_timestamp,
"project_id": span_event["project_id"],
"retention_days": retention_days,
"span_id": int(span_event["span_id"], 16),
"trace_id": str(uuid.UUID(span_event["trace_id"])),
}

processed_rows: MetricsSummaries = []
metrics_summary: Mapping[str, Any] = _as_dict_safe(
span_event.get("_metrics_summary", None)
)
for metric_mri, metric_values in metrics_summary.items():
for metric_value in metric_values:
processed: MutableMapping[str, Any] = deepcopy(common_fields)

tags: Mapping[str, Any] = _as_dict_safe(metric_value.get("tags", None))
processed["tags.key"], processed["tags.value"] = extract_extra_tags(
tags
)

processed["metric_mri"] = metric_mri
processed["count"] = int(metric_value["count"])

for key in {"min", "max", "sum"}:
processed[key] = float(metric_value[key])

processed_rows.append(processed)

return processed_rows

def process_message(
self,
message: SpanEvent,
metadata: KafkaMessageMetadata,
) -> Optional[ProcessedMessage]:
span_event, retention_days = self._structure_and_validate_message(message) or (
None,
None,
)
if not span_event:
return None
try:
processed_rows = self._process_span_event(span_event, retention_days)
except Exception:
metrics.increment("message_processing_error")
return None

received = (
datetime.fromtimestamp(span_event["received"], tz=timezone.utc)
if "received" in span_event
else None
)
return InsertBatch(rows=processed_rows, origin_timestamp=received)
class MetricsSummariesMessageProcessor(RustCompatProcessor):
def __init__(self) -> None:
super().__init__("MetricsSummariesMessageProcessor")
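For reference, the input shape the deleted code consumed can be read off it directly: a span event carrying a _metrics_summary map from metric MRI to a list of summaries. An illustrative payload (all values made up):

    # Illustrative span event fragment, reconstructed from the fields the
    # deleted processor read; values are invented for the example.
    span_event = {
        "project_id": 1,
        "span_id": "deadbeefdeadbeef",
        "trace_id": "2c5e9a5c-0b1a-4f3e-9e6b-1f2d3c4b5a69",
        "start_timestamp_ms": 1_700_000_000_000,
        "duration_ms": 250,
        "retention_days": 90,
        "_metrics_summary": {
            "d:transactions/duration@millisecond": [
                {
                    "count": 3,
                    "min": 10.0,
                    "max": 50.0,
                    "sum": 90.0,
                    "tags": {"environment": "prod"},
                }
            ]
        },
    }
    # Each (metric_mri, summary) pair became one row, with the common span
    # fields copied onto it and tags flattened into tags.key / tags.value.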