Skip to content

Commit

Permalink
Revert "ref(rust): Remove Python processors in favor of Rust ones (#5327
Browse files Browse the repository at this point in the history
)"

This reverts commit 98b9810.

Co-authored-by: phacops <336345+phacops@users.noreply.github.com>
  • Loading branch information
getsentry-bot and phacops committed Jan 9, 2024
1 parent e3bf2f3 commit dd479de
Show file tree
Hide file tree
Showing 17 changed files with 735 additions and 207 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ python-dateutil==2.8.2
python-rapidjson==1.8
redis==4.3.4
sentry-arroyo==2.15.3
sentry-kafka-schemas==0.1.41
sentry-kafka-schemas==0.1.38
sentry-redis-tools==0.1.7
sentry-relay==0.8.39
sentry-sdk==1.28.0
Expand Down
1 change: 1 addition & 0 deletions rust_snuba/src/processors/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ struct Function<'a> {
http_method: Option<&'a str>,
is_application: u8,
materialization_version: u8,
module: &'a str,
name: &'a str,
package: &'a str,
platform: &'a str,
Expand Down
24 changes: 16 additions & 8 deletions rust_snuba/src/processors/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ struct FromSentryTags {
transaction_method: Option<String>,
#[serde(rename(deserialize = "transaction.op"))]
transaction_op: Option<String>,
user: Option<String>,
#[serde(flatten)]
extra: BTreeMap<String, String>,
}
Expand Down Expand Up @@ -131,10 +130,6 @@ impl FromSentryTags {
tags.insert("status_code".into(), status_code.into());
}

if let Some(user) = &self.user {
tags.insert("user".into(), user.into());
}

for (key, value) in &self.extra {
tags.insert(key.into(), value.into());
}
Expand Down Expand Up @@ -208,11 +203,10 @@ impl TryFrom<FromSpanMessage> for Span {
0
};
let status = from.sentry_tags.status.unwrap_or_default() as u8;

let (sentry_tag_keys, sentry_tag_values) = from.sentry_tags.to_keys_values();
let transaction_op = from.sentry_tags.transaction_op.unwrap_or_default();

let (tag_keys, tag_values): (Vec<_>, Vec<_>) =
let (mut tag_keys, mut tag_values): (Vec<_>, Vec<_>) =
from.tags.unwrap_or_default().into_iter().unzip();

let (measurement_keys, measurement_values) = from
Expand All @@ -222,6 +216,21 @@ impl TryFrom<FromSpanMessage> for Span {
.map(|(k, v)| (k, v.value))
.unzip();

if let Some(http_method) = from.sentry_tags.http_method {
tag_keys.push("http.method".into());
tag_values.push(http_method);
}

if let Some(status_code) = from.sentry_tags.status_code {
tag_keys.push("status_code".into());
tag_values.push(status_code);
}

if let Some(transaction_method) = from.sentry_tags.transaction_method {
tag_keys.push("transaction.method".into());
tag_values.push(transaction_method);
}

let metrics_summary = match from._metrics_summary {
Value::Object(v) => serde_json::to_string(&v).unwrap_or_default(),
_ => "".into(),
Expand Down Expand Up @@ -261,7 +270,6 @@ impl TryFrom<FromSpanMessage> for Span {
trace_id: from.trace_id,
transaction_id: from.event_id,
transaction_op,
user: from.sentry_tags.user.unwrap_or_default(),
..Default::default()
})
}
Expand Down
30 changes: 15 additions & 15 deletions snuba/datasets/configuration/spans/entities/spans.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,28 @@ schema:
},
},
{
name: _tags_hash_map,
type: Array,
name: sentry_tags,
type: Nested,
args:
{
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [readonly],
subcolumns:
[{ name: key, type: String }, { name: value, type: String }],
},
},
{
name: sentry_tags,
name: measurements,
type: Nested,
args:
{
subcolumns:
[{ name: key, type: String }, { name: value, type: String }],
[
{ name: key, type: String },
{ name: value, type: Float, args: { size: 64 } },
],
},
},
{
name: _sentry_tags_hash_map,
name: _tags_hash_map,
type: Array,
args:
{
Expand All @@ -86,15 +89,12 @@ schema:
},
},
{
name: measurements,
type: Nested,
name: _sentry_tags_hash_map,
type: Array,
args:
{
subcolumns:
[
{ name: key, type: String },
{ name: value, type: Float, args: { size: 64 } },
],
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [readonly],
},
},
{ name: partition, type: UInt, args: { size: 16 } },
Expand Down Expand Up @@ -128,7 +128,7 @@ storages:
from_column_table: null
from_column_name: sentry_tags
to_nested_col_table: null
to_nested_col_name: sentry_tags
to_nested_col_name: tags
value_subcolumn_name: value
nullable: false
- mapper: SubscriptableMapper
Expand Down
30 changes: 9 additions & 21 deletions snuba/datasets/configuration/spans/storages/spans.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,6 @@ schema:
[ { name: key, type: String }, { name: value, type: String } ],
},
},
{
name: _tags_hash_map,
type: Array,
args:
{
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [ readonly ],
},
},
{
name: sentry_tags,
type: Nested,
args:
{
subcolumns:
[ { name: key, type: String }, { name: value, type: String } ],
},
},
{
name: measurements,
type: Nested,
Expand All @@ -78,6 +60,15 @@ schema:
],
},
},
{
name: _tags_hash_map,
type: Array,
args:
{
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [ readonly ],
},
},
{ name: partition, type: UInt, args: { size: 16 } },
{ name: offset, type: UInt, args: { size: 64 } },
{ name: retention_days, type: UInt, args: { size: 16 } },
Expand Down Expand Up @@ -121,9 +112,6 @@ query_processors:
- processor: ArrayJoinKeyValueOptimizer
args:
column_name: tags
- processor: ArrayJoinKeyValueOptimizer
args:
column_name: sentry_tags
- processor: ArrayJoinKeyValueOptimizer
args:
column_name: measurements
Expand Down
74 changes: 70 additions & 4 deletions snuba/datasets/processors/functions_processor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,72 @@
from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor
import uuid
from datetime import datetime
from typing import Any, Mapping, Optional

from sentry_relay.consts import SPAN_STATUS_NAME_TO_CODE

class FunctionsMessageProcessor(RustCompatProcessor):
def __init__(self) -> None:
super().__init__("FunctionsMessageProcessor")
from snuba import environment
from snuba.consumers.types import KafkaMessageMetadata
from snuba.datasets.processors import DatasetMessageProcessor
from snuba.processor import InsertBatch, ProcessedMessage
from snuba.utils.metrics.wrapper import MetricsWrapper

metrics = MetricsWrapper(environment.metrics, "functions.processor")

UNKNOWN_SPAN_STATUS = 2
MAX_DEPTH = 1024


class FunctionsMessageProcessor(DatasetMessageProcessor):
def process_message(
self, message: Mapping[str, Any], metadata: KafkaMessageMetadata
) -> Optional[ProcessedMessage]:
status = message.get("transaction_status")
if status:
int_status = SPAN_STATUS_NAME_TO_CODE.get(
status,
UNKNOWN_SPAN_STATUS,
)
else:
int_status = UNKNOWN_SPAN_STATUS
functions_list = [
{
"project_id": message["project_id"],
"transaction_name": message["transaction_name"],
"timestamp": int(
message.get("timestamp", datetime.utcnow().timestamp())
),
"depth": 0, # deprecated
"parent_fingerprint": 0, # deprecated
"fingerprint": function["fingerprint"],
"name": function["function"], # to be removed
"function": function["function"],
"package": function.get("package", ""),
"module": function.get("module", ""),
"path": "", # deprecated
"is_application": 1 if function.get("in_app", True) else 0,
"platform": message["platform"],
"environment": message.get("environment"),
"release": message.get("release"),
"dist": message.get("dist"),
"transaction_op": message.get("transaction_op", ""),
"transaction_status": int_status,
"http_method": message.get("http_method"),
"browser_name": message.get("browser_name"),
"device_classification": message.get("device_class", 0),
"os_name": "", # deprecated
"os_version": "", # deprecated
"retention_days": message["retention_days"],
"durations": function["self_times_ns"],
"profile_id": str(uuid.UUID(message["profile_id"])),
"materialization_version": 0,
}
for function in message["functions"]
if function.get("function") and function.get("self_times_ns")
]

received = message.get("received")

return InsertBatch(
functions_list,
datetime.utcfromtimestamp(received) if received else None,
)
Loading

0 comments on commit dd479de

Please sign in to comment.