Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(rust): Remove Python processors in favor of Rust ones #5327

Merged
merged 20 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ python-dateutil==2.8.2
python-rapidjson==1.8
redis==4.3.4
sentry-arroyo==2.15.3
sentry-kafka-schemas==0.1.38
sentry-kafka-schemas==0.1.41
sentry-redis-tools==0.1.7
sentry-relay==0.8.39
sentry-sdk==1.28.0
Expand Down
1 change: 0 additions & 1 deletion rust_snuba/src/processors/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ struct Function<'a> {
http_method: Option<&'a str>,
is_application: u8,
materialization_version: u8,
module: &'a str,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come the Rust processor is changing too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is unused and was picked up by the Rust processor when the Python processor wasn't ingesting it.

name: &'a str,
package: &'a str,
platform: &'a str,
Expand Down
24 changes: 8 additions & 16 deletions rust_snuba/src/processors/spans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct FromSentryTags {
transaction_method: Option<String>,
#[serde(rename(deserialize = "transaction.op"))]
transaction_op: Option<String>,
user: Option<String>,
#[serde(flatten)]
extra: BTreeMap<String, String>,
}
Expand Down Expand Up @@ -130,6 +131,10 @@ impl FromSentryTags {
tags.insert("status_code".into(), status_code.into());
}

if let Some(user) = &self.user {
tags.insert("user".into(), user.into());
}

for (key, value) in &self.extra {
tags.insert(key.into(), value.into());
}
Expand Down Expand Up @@ -203,10 +208,11 @@ impl TryFrom<FromSpanMessage> for Span {
0
};
let status = from.sentry_tags.status.unwrap_or_default() as u8;

let (sentry_tag_keys, sentry_tag_values) = from.sentry_tags.to_keys_values();
let transaction_op = from.sentry_tags.transaction_op.unwrap_or_default();

let (mut tag_keys, mut tag_values): (Vec<_>, Vec<_>) =
let (tag_keys, tag_values): (Vec<_>, Vec<_>) =
from.tags.unwrap_or_default().into_iter().unzip();

let (measurement_keys, measurement_values) = from
Expand All @@ -216,21 +222,6 @@ impl TryFrom<FromSpanMessage> for Span {
.map(|(k, v)| (k, v.value))
.unzip();

if let Some(http_method) = from.sentry_tags.http_method {
tag_keys.push("http.method".into());
tag_values.push(http_method);
}

if let Some(status_code) = from.sentry_tags.status_code {
tag_keys.push("status_code".into());
tag_values.push(status_code);
}

if let Some(transaction_method) = from.sentry_tags.transaction_method {
tag_keys.push("transaction.method".into());
tag_values.push(transaction_method);
}
Comment on lines -219 to -232
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are not meant to be in tags, only in sentry_tags; the Python processor's behavior here was incorrect.


let metrics_summary = match from._metrics_summary {
Value::Object(v) => serde_json::to_string(&v).unwrap_or_default(),
_ => "".into(),
Expand Down Expand Up @@ -270,6 +261,7 @@ impl TryFrom<FromSpanMessage> for Span {
trace_id: from.trace_id,
transaction_id: from.event_id,
transaction_op,
user: from.sentry_tags.user.unwrap_or_default(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting a user column value was somehow forgotten despite the column being there.

..Default::default()
})
}
Expand Down
30 changes: 15 additions & 15 deletions snuba/datasets/configuration/spans/entities/spans.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,28 +59,25 @@ schema:
},
},
{
name: sentry_tags,
type: Nested,
name: _tags_hash_map,
type: Array,
args:
{
subcolumns:
[{ name: key, type: String }, { name: value, type: String }],
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [readonly],
},
},
{
name: measurements,
name: sentry_tags,
type: Nested,
args:
{
subcolumns:
[
{ name: key, type: String },
{ name: value, type: Float, args: { size: 64 } },
],
[{ name: key, type: String }, { name: value, type: String }],
},
},
{
name: _tags_hash_map,
name: _sentry_tags_hash_map,
type: Array,
args:
{
Expand All @@ -89,12 +86,15 @@ schema:
},
},
{
name: _sentry_tags_hash_map,
type: Array,
name: measurements,
type: Nested,
args:
{
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [readonly],
subcolumns:
[
{ name: key, type: String },
{ name: value, type: Float, args: { size: 64 } },
],
},
},
{ name: partition, type: UInt, args: { size: 16 } },
Expand Down Expand Up @@ -128,7 +128,7 @@ storages:
from_column_table: null
from_column_name: sentry_tags
to_nested_col_table: null
to_nested_col_name: tags
to_nested_col_name: sentry_tags
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were fetching from tags instead of sentry_tags for the sentry_tags field.

value_subcolumn_name: value
nullable: false
- mapper: SubscriptableMapper
Expand Down
30 changes: 21 additions & 9 deletions snuba/datasets/configuration/spans/storages/spans.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,33 @@ schema:
},
},
{
name: measurements,
name: _tags_hash_map,
type: Array,
args:
{
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [ readonly ],
},
},
{
name: sentry_tags,
type: Nested,
args:
{
subcolumns:
[
{ name: key, type: String },
{ name: value, type: Float, args: { size: 64 } },
],
[ { name: key, type: String }, { name: value, type: String } ],
},
},
{
name: _tags_hash_map,
type: Array,
name: measurements,
type: Nested,
args:
{
inner_type: { type: UInt, args: { size: 64 } },
schema_modifiers: [ readonly ],
subcolumns:
[
{ name: key, type: String },
{ name: value, type: Float, args: { size: 64 } },
],
},
},
{ name: partition, type: UInt, args: { size: 16 } },
Expand Down Expand Up @@ -112,6 +121,9 @@ query_processors:
- processor: ArrayJoinKeyValueOptimizer
args:
column_name: tags
- processor: ArrayJoinKeyValueOptimizer
args:
column_name: sentry_tags
- processor: ArrayJoinKeyValueOptimizer
args:
column_name: measurements
Expand Down
74 changes: 4 additions & 70 deletions snuba/datasets/processors/functions_processor.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,6 @@
import uuid
from datetime import datetime
from typing import Any, Mapping, Optional
from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor

from sentry_relay.consts import SPAN_STATUS_NAME_TO_CODE

from snuba import environment
from snuba.consumers.types import KafkaMessageMetadata
from snuba.datasets.processors import DatasetMessageProcessor
from snuba.processor import InsertBatch, ProcessedMessage
from snuba.utils.metrics.wrapper import MetricsWrapper

metrics = MetricsWrapper(environment.metrics, "functions.processor")

UNKNOWN_SPAN_STATUS = 2
MAX_DEPTH = 1024


class FunctionsMessageProcessor(DatasetMessageProcessor):
def process_message(
self, message: Mapping[str, Any], metadata: KafkaMessageMetadata
) -> Optional[ProcessedMessage]:
status = message.get("transaction_status")
if status:
int_status = SPAN_STATUS_NAME_TO_CODE.get(
status,
UNKNOWN_SPAN_STATUS,
)
else:
int_status = UNKNOWN_SPAN_STATUS
functions_list = [
{
"project_id": message["project_id"],
"transaction_name": message["transaction_name"],
"timestamp": int(
message.get("timestamp", datetime.utcnow().timestamp())
),
"depth": 0, # deprecated
"parent_fingerprint": 0, # deprecated
"fingerprint": function["fingerprint"],
"name": function["function"], # to be removed
"function": function["function"],
"package": function.get("package", ""),
"module": function.get("module", ""),
"path": "", # deprecated
"is_application": 1 if function.get("in_app", True) else 0,
"platform": message["platform"],
"environment": message.get("environment"),
"release": message.get("release"),
"dist": message.get("dist"),
"transaction_op": message.get("transaction_op", ""),
"transaction_status": int_status,
"http_method": message.get("http_method"),
"browser_name": message.get("browser_name"),
"device_classification": message.get("device_class", 0),
"os_name": "", # deprecated
"os_version": "", # deprecated
"retention_days": message["retention_days"],
"durations": function["self_times_ns"],
"profile_id": str(uuid.UUID(message["profile_id"])),
"materialization_version": 0,
}
for function in message["functions"]
if function.get("function") and function.get("self_times_ns")
]

received = message.get("received")

return InsertBatch(
functions_list,
datetime.utcfromtimestamp(received) if received else None,
)
# Functions message processor expressed as a thin subclass of
# RustCompatProcessor: the only thing this class does is pass its own name to
# the base-class constructor.
# NOTE(review): presumably the base class uses that name to select the Rust
# processor of the same name — confirm in rust_compat_processor.
class FunctionsMessageProcessor(RustCompatProcessor):
def __init__(self) -> None:
# The string literal matches this class's name; keep the two in sync if either is renamed.
super().__init__("FunctionsMessageProcessor")
Loading
Loading