diff --git a/snuba/datasets/processors/transactions_processor.py b/snuba/datasets/processors/transactions_processor.py
index 781cf501df..879dd1fa9b 100644
--- a/snuba/datasets/processors/transactions_processor.py
+++ b/snuba/datasets/processors/transactions_processor.py
@@ -421,6 +421,13 @@ def _sanitize_contexts(
     transaction_ctx.pop("trace_id", None)
     transaction_ctx.pop("span_id", None)
 
+    # Only top level scalar values within a context are written to the table. `data` is
+    # always a dict, so pop it from the context and move some values into the top level.
+    transaction_data = transaction_ctx.pop("data", {})
+    if "thread.id" in transaction_data:
+        # The thread.id can be either a str/int. Make sure to always convert to a str.
+        transaction_ctx["thread_id"] = str(transaction_data["thread.id"])
+
     # The hash and exclusive_time is being stored in the spans columns
     # so there is no need to store it again in the context array.
     transaction_ctx.pop("hash", None)
@@ -432,7 +439,7 @@
     if app_ctx is not None:
         app_ctx.pop("start_type", None)
 
-    # The profile_id and replay_id are promoted as columns, so no need to store them
+    # The profile_id, profiler_id and replay_id are promoted as columns, so no need to store them
     # again in the context array
     profile_ctx = sanitized_context.get("profile", {})
     profile_ctx.pop("profile_id", None)
diff --git a/tests/datasets/test_transaction_processor.py b/tests/datasets/test_transaction_processor.py
index 9ae424e9e1..4acc621906 100644
--- a/tests/datasets/test_transaction_processor.py
+++ b/tests/datasets/test_transaction_processor.py
@@ -46,9 +46,28 @@ class TransactionEvent:
     has_app_ctx: bool = True
     profile_id: Optional[str] = None
     profiler_id: Optional[str] = None
+    thread_id: Optional[int | str] = None
     replay_id: Optional[str] = None
     received: Optional[float] = None
 
+    def get_trace_context(self) -> Optional[Mapping[str, Any]]:
+        context = {
+            "sampled": True,
+            "trace_id": self.trace_id,
+            "op": self.op,
+            "type": "trace",
+            "span_id": self.span_id,
+            "status": self.status,
+            "hash": "a" * 16,
+            "exclusive_time": 1.2345,
+        }
+
+        if self.thread_id is not None:
+            data = context.setdefault("data", {})
+            data["thread.id"] = self.thread_id
+
+        return context
+
     def get_app_context(self) -> Optional[Mapping[str, str]]:
         if self.has_app_ctx:
             return {"start_type": self.app_start_type}
@@ -161,16 +180,7 @@ def serialize(self) -> Tuple[int, str, Mapping[str, Any]]:
                 }
             },
             "contexts": {
-                "trace": {
-                    "sampled": True,
-                    "trace_id": self.trace_id,
-                    "op": self.op,
-                    "type": "trace",
-                    "span_id": self.span_id,
-                    "status": self.status,
-                    "hash": "a" * 16,
-                    "exclusive_time": 1.2345,
-                },
+                "trace": self.get_trace_context(),
                 "app": self.get_app_context(),
                 "experiments": {"test1": 1, "test2": 2},
                 "profile": self.get_profile_context(),
@@ -249,6 +259,7 @@ def build_result(self, meta: KafkaMessageMetadata) -> Mapping[str, Any]:
                 "trace.sampled",
                 "trace.op",
                 "trace.status",
+                "trace.thread_id",
                 "geo.country_code",
                 "geo.region",
                 "geo.city",
@@ -258,6 +269,7 @@
                 "True",
                 self.op,
                 self.status,
+                str(self.thread_id),
                 self.geo["country_code"],
                 self.geo["region"],
                 self.geo["city"],
@@ -345,6 +357,7 @@ def __get_transaction_event(self) -> TransactionEvent:
             transaction_source="url",
             profile_id="046852d24483455c8c44f0c8fbf496f9",
             profiler_id="822301ff8bdb4daca920ddf2f993b1ff",
+            thread_id=123,
             replay_id="d2731f8ed8934c6fa5253e450915aa12",
         )
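
For review purposes, here is a minimal standalone sketch of the behavior the processor hunk adds: the nested `data` dict is popped from the trace context and `thread.id`, which may arrive as an int or a str, is promoted to a top-level `thread_id` string so it can land in the scalar context columns. The helper name `sanitize_trace_context` is illustrative only, not the actual snuba function.

```python
# Hypothetical, self-contained sketch of the new trace-context sanitization;
# sanitize_trace_context is an illustrative name, not the real snuba function.
from typing import Any, MutableMapping


def sanitize_trace_context(transaction_ctx: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    # Only top-level scalar values are written to the contexts columns, so the
    # nested `data` dict is removed and selected values are hoisted.
    transaction_data = transaction_ctx.pop("data", {})
    if "thread.id" in transaction_data:
        # thread.id may be an int or a str; always store it as a str.
        transaction_ctx["thread_id"] = str(transaction_data["thread.id"])
    return transaction_ctx


ctx: MutableMapping[str, Any] = {
    "trace_id": "a" * 32,
    "op": "http.server",
    "data": {"thread.id": 123},
}
sanitize_trace_context(ctx)
assert ctx == {"trace_id": "a" * 32, "op": "http.server", "thread_id": "123"}
```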