ref(rust): Remove Python processors in favor of Rust ones #5327

Merged · 20 commits · Jan 8, 2024
74 changes: 4 additions & 70 deletions snuba/datasets/processors/functions_processor.py
@@ -1,72 +1,6 @@
-import uuid
-from datetime import datetime
-from typing import Any, Mapping, Optional
+from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor

-from sentry_relay.consts import SPAN_STATUS_NAME_TO_CODE

-from snuba import environment
-from snuba.consumers.types import KafkaMessageMetadata
-from snuba.datasets.processors import DatasetMessageProcessor
-from snuba.processor import InsertBatch, ProcessedMessage
-from snuba.utils.metrics.wrapper import MetricsWrapper

-metrics = MetricsWrapper(environment.metrics, "functions.processor")

-UNKNOWN_SPAN_STATUS = 2
-MAX_DEPTH = 1024


-class FunctionsMessageProcessor(DatasetMessageProcessor):
-    def process_message(
-        self, message: Mapping[str, Any], metadata: KafkaMessageMetadata
-    ) -> Optional[ProcessedMessage]:
-        status = message.get("transaction_status")
-        if status:
-            int_status = SPAN_STATUS_NAME_TO_CODE.get(
-                status,
-                UNKNOWN_SPAN_STATUS,
-            )
-        else:
-            int_status = UNKNOWN_SPAN_STATUS
-        functions_list = [
-            {
-                "project_id": message["project_id"],
-                "transaction_name": message["transaction_name"],
-                "timestamp": int(
-                    message.get("timestamp", datetime.utcnow().timestamp())
-                ),
-                "depth": 0,  # deprecated
-                "parent_fingerprint": 0,  # deprecated
-                "fingerprint": function["fingerprint"],
-                "name": function["function"],  # to be removed
-                "function": function["function"],
-                "package": function.get("package", ""),
-                "module": function.get("module", ""),
-                "path": "",  # deprecated
-                "is_application": 1 if function.get("in_app", True) else 0,
-                "platform": message["platform"],
-                "environment": message.get("environment"),
-                "release": message.get("release"),
-                "dist": message.get("dist"),
-                "transaction_op": message.get("transaction_op", ""),
-                "transaction_status": int_status,
-                "http_method": message.get("http_method"),
-                "browser_name": message.get("browser_name"),
-                "device_classification": message.get("device_class", 0),
-                "os_name": "",  # deprecated
-                "os_version": "",  # deprecated
-                "retention_days": message["retention_days"],
-                "durations": function["self_times_ns"],
-                "profile_id": str(uuid.UUID(message["profile_id"])),
-                "materialization_version": 0,
-            }
-            for function in message["functions"]
-            if function.get("function") and function.get("self_times_ns")
-        ]
-
-        received = message.get("received")
-
-        return InsertBatch(
-            functions_list,
-            datetime.utcfromtimestamp(received) if received else None,
-        )
+class FunctionsMessageProcessor(RustCompatProcessor):
+    def __init__(self) -> None:
+        super().__init__("FunctionsMessageProcessor")
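All three processors in this PR collapse to the same thin shim, so it may help to sketch the pattern once. The following is a self-contained illustration only: the real `RustCompatProcessor` (in `snuba/datasets/processors/rust_compat_processor.py`) forwards `process_message` into the Rust consumer by processor name, whereas the in-process registry below is a hypothetical stand-in.

```python
from typing import Any, Callable, Dict, Mapping, Optional

# Hypothetical stand-in for the Rust side: processors looked up by name.
# In the real module this lookup crosses into the Rust consumer code.
_RUST_PROCESSORS: Dict[str, Callable[[Mapping[str, Any]], Optional[dict]]] = {
    "FunctionsMessageProcessor": lambda msg: {"project_id": msg["project_id"]},
}


class RustCompatProcessor:
    """Shim base class: delegates processing to an implementation keyed by name."""

    def __init__(self, processor_name: str) -> None:
        # The name must match a processor registered on the Rust side.
        self.__process = _RUST_PROCESSORS[processor_name]

    def process_message(self, message: Mapping[str, Any]) -> Optional[dict]:
        return self.__process(message)


class FunctionsMessageProcessor(RustCompatProcessor):
    def __init__(self) -> None:
        super().__init__("FunctionsMessageProcessor")


processor = FunctionsMessageProcessor()
print(processor.process_message({"project_id": 1}))  # {'project_id': 1}
```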
130 changes: 4 additions & 126 deletions snuba/datasets/processors/metrics_summaries_processor.py
@@ -1,128 +1,6 @@
-import time
-import uuid
-from copy import deepcopy
-from datetime import datetime, timezone
-from typing import Any, Mapping, MutableMapping, MutableSequence, Optional, Tuple
+from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor

-import structlog
-from sentry_kafka_schemas.schema_types.snuba_spans_v1 import SpanEvent

-from snuba import environment
-from snuba.consumers.types import KafkaMessageMetadata
-from snuba.datasets.events_format import (
-    EventTooOld,
-    enforce_retention,
-    extract_extra_tags,
-)
-from snuba.datasets.processors import DatasetMessageProcessor
-from snuba.datasets.processors.spans_processor import RetentionDays
-from snuba.processor import (
-    InsertBatch,
-    ProcessedMessage,
-    _as_dict_safe,
-    _ensure_valid_date,
-)
-from snuba.utils.metrics.wrapper import MetricsWrapper

-logger = structlog.get_logger(__name__)

-metrics = MetricsWrapper(environment.metrics, "metrics_summaries.processor")

-MetricsSummaries = MutableSequence[MutableMapping[str, Any]]


-class MetricsSummariesMessageProcessor(DatasetMessageProcessor):
-    """
-    Message processor for writing metrics summary data to the metrics_summaries table.
-    """
-
-    def __extract_timestamp(self, timestamp_ms: int) -> Tuple[int, int]:
-        # We are purposely using a naive datetime here to work with the rest of the codebase.
-        # We can be confident that clients are only sending UTC dates.
-        timestamp_sec = timestamp_ms / 1000
-        if _ensure_valid_date(datetime.utcfromtimestamp(timestamp_sec)) is None:
-            timestamp_sec = int(time.time())
-        return int(timestamp_sec), int(timestamp_ms % 1000)
-
-    @staticmethod
-    def _structure_and_validate_message(
-        message: SpanEvent,
-    ) -> Optional[Tuple[SpanEvent, RetentionDays]]:
-        if not message.get("_metrics_summary"):
-            return None
-        try:
-            # We are purposely using a naive datetime here to work with the
-            # rest of the codebase. We can be confident that clients are only
-            # sending UTC dates.
-            retention_days = enforce_retention(
-                message.get("retention_days"),
-                datetime.utcfromtimestamp(message["start_timestamp_ms"] / 1000),
-            )
-        except EventTooOld:
-            return None
-
-        return message, retention_days
-
-    def _process_span_event(
-        self,
-        span_event: SpanEvent,
-        retention_days: Optional[RetentionDays],
-    ) -> MetricsSummaries:
-        end_timestamp, _ = self.__extract_timestamp(
-            span_event["start_timestamp_ms"] + span_event["duration_ms"],
-        )
-        common_fields = {
-            "deleted": 0,
-            "end_timestamp": end_timestamp,
-            "project_id": span_event["project_id"],
-            "retention_days": retention_days,
-            "span_id": int(span_event["span_id"], 16),
-            "trace_id": str(uuid.UUID(span_event["trace_id"])),
-        }
-
-        processed_rows: MetricsSummaries = []
-        metrics_summary: Mapping[str, Any] = _as_dict_safe(
-            span_event.get("_metrics_summary", None)
-        )
-        for metric_mri, metric_values in metrics_summary.items():
-            for metric_value in metric_values:
-                processed: MutableMapping[str, Any] = deepcopy(common_fields)
-
-                tags: Mapping[str, Any] = _as_dict_safe(metric_value.get("tags", None))
-                processed["tags.key"], processed["tags.value"] = extract_extra_tags(
-                    tags
-                )
-
-                processed["metric_mri"] = metric_mri
-                processed["count"] = int(metric_value["count"])
-
-                for key in {"min", "max", "sum"}:
-                    processed[key] = float(metric_value[key])
-
-                processed_rows.append(processed)
-
-        return processed_rows
-
-    def process_message(
-        self,
-        message: SpanEvent,
-        metadata: KafkaMessageMetadata,
-    ) -> Optional[ProcessedMessage]:
-        span_event, retention_days = self._structure_and_validate_message(message) or (
-            None,
-            None,
-        )
-        if not span_event:
-            return None
-        try:
-            processed_rows = self._process_span_event(span_event, retention_days)
-        except Exception:
-            metrics.increment("message_processing_error")
-            return None
-
-        received = (
-            datetime.fromtimestamp(span_event["received"], tz=timezone.utc)
-            if "received" in span_event
-            else None
-        )
-        return InsertBatch(rows=processed_rows, origin_timestamp=received)
+class MetricsSummariesMessageProcessor(RustCompatProcessor):
+    def __init__(self) -> None:
+        super().__init__("MetricsSummariesMessageProcessor")
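For reviewers checking parity: the behavior the Rust port has to reproduce here is the fan-out of `_metrics_summary` into one row per (MRI, value) pair, with tags flattened into parallel `tags.key`/`tags.value` arrays. A standalone sketch of that row shape follows; the `flatten_tags` helper is illustrative and assumes sorted keys, while the real helper is `extract_extra_tags` from `snuba.datasets.events_format`.

```python
from typing import Any, List, Mapping, Tuple


def flatten_tags(tags: Mapping[str, Any]) -> Tuple[List[str], List[str]]:
    # Illustrative: assumes tags are emitted as sorted parallel arrays.
    keys = sorted(tags)
    return keys, [str(tags[k]) for k in keys]


metrics_summary = {
    "c:sentry.events.outcomes@none": [
        {"count": 3, "min": 1.0, "max": 2.0, "sum": 4.0, "tags": {"outcome": "accepted"}},
    ],
}

rows = []
for metric_mri, metric_values in metrics_summary.items():
    for metric_value in metric_values:
        row: dict = {"metric_mri": metric_mri, "count": int(metric_value["count"])}
        row["tags.key"], row["tags.value"] = flatten_tags(metric_value["tags"])
        for key in ("min", "max", "sum"):
            row[key] = float(metric_value[key])
        rows.append(row)

print(rows)
# [{'metric_mri': 'c:sentry.events.outcomes@none', 'count': 3,
#   'tags.key': ['outcome'], 'tags.value': ['accepted'],
#   'min': 1.0, 'max': 2.0, 'sum': 4.0}]
```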
77 changes: 4 additions & 73 deletions snuba/datasets/processors/profiles_processor.py
@@ -1,75 +1,6 @@
-from datetime import datetime
-from typing import Any, Mapping, Optional
-from uuid import UUID
+from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor

-from snuba import environment
-from snuba.consumers.types import KafkaMessageMetadata
-from snuba.datasets.events_format import EventTooOld, enforce_retention
-from snuba.datasets.processors import DatasetMessageProcessor
-from snuba.processor import InsertBatch, ProcessedMessage
-from snuba.utils.metrics.wrapper import MetricsWrapper

-metrics = MetricsWrapper(environment.metrics, "profiles.processor")


-class ProfilesMessageProcessor(DatasetMessageProcessor):
-    def process_message(
-        self, message: Mapping[str, Any], metadata: KafkaMessageMetadata
-    ) -> Optional[ProcessedMessage]:
-        try:
-            received = datetime.utcfromtimestamp(message["received"])
-            retention_days = enforce_retention(
-                message["retention_days"],
-                received,
-            )
-            processed = _normalize(
-                message,
-                metadata,
-                retention_days,
-            )
-        except EventTooOld:
-            metrics.increment("event_too_old")
-            return None
-        except IndexError:
-            metrics.increment("invalid_transaction")
-            return None
-        except ValueError:
-            metrics.increment("invalid_uuid")
-            return None
-        except KeyError:
-            metrics.increment("missing_field")
-            return None
-        return InsertBatch([processed], received)
-
-
-def _normalize(
-    message: Mapping[str, Any],
-    metadata: KafkaMessageMetadata,
-    retention_days: int,
-) -> Mapping[str, Any]:
-    return {
-        "android_api_level": message.get("android_api_level"),
-        "architecture": message.get("architecture", "unknown"),
-        "device_classification": message.get("device_classification", ""),
-        "device_locale": message["device_locale"],
-        "device_manufacturer": message["device_manufacturer"],
-        "device_model": message["device_model"],
-        "device_os_build_number": message.get("device_os_build_number"),
-        "device_os_name": message["device_os_name"],
-        "device_os_version": message["device_os_version"],
-        "duration_ns": message["duration_ns"],
-        "environment": message.get("environment"),
-        "offset": metadata.offset,
-        "organization_id": message["organization_id"],
-        "partition": metadata.partition,
-        "platform": message["platform"],
-        "profile_id": str(UUID(message["profile_id"])),
-        "project_id": message["project_id"],
-        "received": message["received"],
-        "retention_days": retention_days,
-        "trace_id": str(UUID(message["trace_id"])),
-        "transaction_id": str(UUID(message["transaction_id"])),
-        "transaction_name": message["transaction_name"],
-        "version_code": message["version_code"],
-        "version_name": message["version_name"],
-    }
+class ProfilesMessageProcessor(RustCompatProcessor):
+    def __init__(self) -> None:
+        super().__init__("ProfilesMessageProcessor")
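One subtlety worth preserving in the Rust port: the removed Python normalizer validated largely through exceptions, each mapped to a metric (`ValueError` → `invalid_uuid`, `KeyError` → `missing_field`, `EventTooOld` → `event_too_old`). A minimal standalone illustration of the UUID part of that contract; `normalize_profile_id` is a hypothetical helper for this note, not snuba API.

```python
from uuid import UUID


def normalize_profile_id(message: dict) -> str:
    # str(UUID(...)) canonicalizes any accepted hex form to dashed lowercase,
    # matching the old processor's str(UUID(message["profile_id"])).
    return str(UUID(message["profile_id"]))


print(normalize_profile_id({"profile_id": "A6E38E6C24E34AAFA3BAA59B5A8BD467"}))
# -> a6e38e6c-24e3-4aaf-a3ba-a59b5a8bd467

try:
    normalize_profile_id({"profile_id": "not-a-uuid"})
except ValueError:
    print("invalid_uuid")   # metrics.increment("invalid_uuid") in the old code

try:
    normalize_profile_id({})
except KeyError:
    print("missing_field")  # metrics.increment("missing_field") in the old code
```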