
create rc-processing-transaction #79200

Merged
merged 67 commits into master from rc-processing-split-pt1 on Oct 22, 2024
Commits
bd10897
create rc-processing-transaction
kneeyo1 Oct 16, 2024
8ab9f15
add to all
kneeyo1 Oct 16, 2024
1059d02
Merge branch 'master' into rc-processing-split-pt1
kneeyo1 Oct 16, 2024
987cf21
add to everywhere transactions uses eventprocessing
kneeyo1 Oct 16, 2024
96b5134
Update src/sentry/conf/server.py
kneeyo1 Oct 16, 2024
021461b
update to just read/write to transactions
kneeyo1 Oct 16, 2024
6302093
cleanup
kneeyo1 Oct 16, 2024
c15dc45
transaction
kneeyo1 Oct 16, 2024
31c3521
Update src/sentry/conf/server.py
kneeyo1 Oct 16, 2024
9d7fa8b
cleanup
kneeyo1 Oct 17, 2024
2a1aece
added some better ways to split up errors and transactions
kneeyo1 Oct 17, 2024
db433f0
Merge branch 'master' into rc-processing-split-pt1
kneeyo1 Oct 17, 2024
ff63406
adding event type to post process
kneeyo1 Oct 18, 2024
9a95123
Merge branch 'master' into rc-processing-split-pt1
kneeyo1 Oct 18, 2024
0950893
some cleanup
kneeyo1 Oct 18, 2024
23cb250
Merge branch 'master' into rc-processing-split-pt1
kneeyo1 Oct 18, 2024
b3da377
fix the arg
kneeyo1 Oct 18, 2024
142101c
typing
kneeyo1 Oct 18, 2024
76af9df
update to use eventstream type
kneeyo1 Oct 20, 2024
96cfee4
testcases
kneeyo1 Oct 21, 2024
12b70ee
fixed
kneeyo1 Oct 21, 2024
8d1fd28
dont overrwrite
kneeyo1 Oct 21, 2024
1c3016f
Update src/sentry/post_process_forwarder/post_process_forwarder.py
kneeyo1 Oct 21, 2024
0eb1845
fix(feedback): keep oldest date_added for duplicate user reports (#79…
aliu39 Oct 18, 2024
54665c6
Revert "feat(sentry-sdk): Enable HTTP2 transport" (#79391)
ellisonmarks Oct 19, 2024
a98f387
feat(dynamic-sampling): Settings for sample rate (#79341)
ArthurKnaus Oct 21, 2024
e4a34c0
feat(metrics): Register MRI for spans/count_per_root_project (#78992)
jan-auer Oct 21, 2024
a3b5f0f
chore(sentryapps) Add more assertions for refresh tokens (#79256)
markstory Oct 21, 2024
9f1bf83
ref(onboarding): Remove empty code snippets comments for react (#79407)
priscilawebdev Oct 21, 2024
c1983e4
ref(dashboards): Convert `WidgetCard` to functional component (#79273)
gggritso Oct 21, 2024
0255356
ref(grouping): Create _process_frames function (#79375)
armenzg Oct 21, 2024
0cf8c08
fix(dashboards): Don't show Add an Equation when dataset incompatible…
narsaynorath Oct 21, 2024
08d58ff
fix(sentryapps) Stop passing event to celery task v2 (#79042)
markstory Oct 21, 2024
15b7ce9
fix(unmerge): Allow members to unmerge (#79125)
jangjodi Oct 21, 2024
990a5b8
fix(trace) convert autogrouped timestamps to ms (#79373)
JonasBa Oct 21, 2024
c3f40ca
fix(trace) zoom on column click (#79414)
JonasBa Oct 21, 2024
948487b
ref(similarity): Remove message from similarity tab (#78423)
jangjodi Oct 21, 2024
57c503d
fix(discover): Environment overrides for homepage query (#79416)
shruthilayaj Oct 21, 2024
dadb28a
chore(insights): Set MongoDB feature badge to `new` (#79363)
0Calories Oct 21, 2024
a4bf728
feat(query-builder): Improve value suggestions so that they show up m…
malwilley Oct 21, 2024
5c55aa3
ref(trace) preserve context when zooming into spans (#78902)
JonasBa Oct 21, 2024
ff7a9b5
fix(flamegraph) add reset filter overlay filter (#79001)
JonasBa Oct 21, 2024
f430872
fix(trace) adjust timeline if the span timeline is exceeded (#79371)
JonasBa Oct 21, 2024
bb751c3
fix(trace): dont allow expanding and collapsing root node (#79390)
JonasBa Oct 21, 2024
6f2821c
ref(onboarding): Update verify code for svelte and react (#79409)
priscilawebdev Oct 21, 2024
d20263c
cleanup(group-attributes): Remove the option to send updates to kafka…
snigdhas Oct 21, 2024
e1274d5
ref(onboarding): Improve vanilla option visibility (#79400)
priscilawebdev Oct 21, 2024
b672ffb
feat(occurrences): Add rate limiter to issues occurrence consumer (#…
roggenkemper Oct 21, 2024
8c111e1
Merge branch 'master' into rc-processing-split-pt1
kneeyo1 Oct 21, 2024
a980a65
more corrections
kneeyo1 Oct 21, 2024
0900e9d
more corrections
kneeyo1 Oct 21, 2024
3747f85
more cleanup
kneeyo1 Oct 21, 2024
6649290
last type
kneeyo1 Oct 21, 2024
1112e9b
fix the type
kneeyo1 Oct 21, 2024
0ce361a
retype because can only pickle the str
kneeyo1 Oct 21, 2024
8ed4bdd
convert to str
kneeyo1 Oct 21, 2024
0019895
fixed tyuping issuese
kneeyo1 Oct 21, 2024
fb4cb87
updated to use consumertype as str and pass along consumer type to pr…
kneeyo1 Oct 21, 2024
430bfce
addressed comments, cleaned up some types
kneeyo1 Oct 22, 2024
e6c758b
cleaned up test
kneeyo1 Oct 22, 2024
18af4f6
fix test cases again
kneeyo1 Oct 22, 2024
51fab09
fixed test
kneeyo1 Oct 22, 2024
59da8fd
back to enum
kneeyo1 Oct 22, 2024
c108e3e
Merge branch 'master' into rc-processing-split-pt1
kneeyo1 Oct 22, 2024
7e9e4ec
added small test case fix
kneeyo1 Oct 22, 2024
0f36dc2
make sure is string
kneeyo1 Oct 22, 2024
3a0aa4b
forgot this one
kneeyo1 Oct 22, 2024
7 changes: 7 additions & 0 deletions src/sentry/conf/server.py
@@ -1666,6 +1666,13 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
)
SENTRY_EVENT_PROCESSING_STORE_OPTIONS: dict[str, str] = {}

# Transactions processing backend
# If these are set, transactions will be written to a different processing store
# than errors. If these are set to None, events (errors) and transactions will
# both write to the EVENT_PROCESSING_STORE.
SENTRY_TRANSACTION_PROCESSING_STORE: str | None = None
SENTRY_TRANSACTION_PROCESSING_STORE_OPTIONS: dict[str, str] = {}

# The internal Django cache is still used in many places
# TODO(dcramer): convert uses over to Sentry's backend
CACHES = {"default": {"BACKEND": "django.core.cache.backends.dummy.DummyCache"}}
10 changes: 10 additions & 0 deletions src/sentry/consumers/__init__.py
@@ -23,6 +23,7 @@
validate_consumer_definition,
)
from sentry.consumers.validate_schema import ValidateSchema
from sentry.eventstream.types import EventStreamEventType
from sentry.ingest.types import ConsumerType
from sentry.utils.imports import import_string
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
@@ -373,20 +374,29 @@ def ingest_transactions_options() -> list[click.Option]:
"synchronize_commit_log_topic_default": "snuba-generic-events-commit-log",
"synchronize_commit_group_default": "generic_events_group",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
"static_args": {
"eventstream_type": EventStreamEventType.Generic.value,
},
},
"post-process-forwarder-transactions": {
"topic": Topic.TRANSACTIONS,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-transactions-commit-log",
"synchronize_commit_group_default": "transactions_group",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
"static_args": {
"eventstream_type": EventStreamEventType.Transaction.value,
},
},
"post-process-forwarder-errors": {
"topic": Topic.EVENTS,
"strategy_factory": "sentry.eventstream.kafka.dispatch.EventPostProcessForwarderStrategyFactory",
"synchronize_commit_log_topic_default": "snuba-commit-log",
"synchronize_commit_group_default": "snuba-consumers",
"click_options": _POST_PROCESS_FORWARDER_OPTIONS,
"static_args": {
"eventstream_type": EventStreamEventType.Error.value,
},
},
"process-spans": {
"topic": Topic.SNUBA_SPANS,
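Each forwarder variant now carries its stream type in `static_args`, so a single strategy factory class can serve all three consumers. A reduced sketch of how the definitions differ (consumer keys copied from the diff; topics, strategies, and click options are elided):

```python
from enum import Enum


class EventStreamEventType(Enum):
    Error = "error"
    Transaction = "transaction"
    Generic = "generic"


# Only the static_args are shown; the rest of each definition is omitted.
KAFKA_CONSUMERS = {
    "post-process-forwarder": {
        "static_args": {"eventstream_type": EventStreamEventType.Generic.value},
    },
    "post-process-forwarder-transactions": {
        "static_args": {"eventstream_type": EventStreamEventType.Transaction.value},
    },
    "post-process-forwarder-errors": {
        "static_args": {"eventstream_type": EventStreamEventType.Error.value},
    },
}
```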
15 changes: 14 additions & 1 deletion src/sentry/eventstore/processing/__init__.py
@@ -9,5 +9,18 @@
settings.SENTRY_EVENT_PROCESSING_STORE_OPTIONS,
)

transaction_processing_store = LazyServiceWrapper(
EventProcessingStore,
(
settings.SENTRY_EVENT_PROCESSING_STORE
if settings.SENTRY_TRANSACTION_PROCESSING_STORE is None
else settings.SENTRY_TRANSACTION_PROCESSING_STORE
),
(
settings.SENTRY_EVENT_PROCESSING_STORE_OPTIONS
if settings.SENTRY_TRANSACTION_PROCESSING_STORE_OPTIONS is None
else settings.SENTRY_TRANSACTION_PROCESSING_STORE_OPTIONS
),
)

__all__ = ["event_processing_store"]
__all__ = ["event_processing_store", "transaction_processing_store"]
16 changes: 5 additions & 11 deletions src/sentry/eventstream/base.py
@@ -3,7 +3,6 @@
import logging
from collections.abc import Collection, Mapping, MutableMapping, Sequence
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Any, Optional, TypedDict, cast

from sentry.issues.issue_occurrence import IssueOccurrence
@@ -12,6 +11,8 @@
from sentry.utils.cache import cache_key_for_event
from sentry.utils.services import Service

from .types import EventStreamEventType

logger = logging.getLogger(__name__)


@@ -36,16 +37,6 @@ class GroupState(TypedDict):
GroupStates = Sequence[GroupState]


class EventStreamEventType(Enum):
"""
We have 3 broad categories of event types that we care about in eventstream.
"""

Error = "error" # error, default, various security errors
Transaction = "transaction" # transactions
Generic = "generic" # generic events ingested via the issue platform


class EventStream(Service):
__all__ = (
"insert",
@@ -69,6 +60,7 @@ def __init__(self, **options: Any) -> None:

def _dispatch_post_process_group_task(
self,
eventstream_type: str,
event_id: str,
project_id: int,
group_id: int | None,
@@ -88,6 +80,7 @@

post_process_group.apply_async(
kwargs={
"eventstream_type": eventstream_type,
"is_new": is_new,
"is_regression": is_regression,
"is_new_group_environment": is_new_group_environment,
@@ -133,6 +126,7 @@ def insert(
group_states: GroupStates | None = None,
) -> None:
self._dispatch_post_process_group_task(
self._get_event_type(event).value,
event.event_id,
event.project_id,
event.group_id,
3 changes: 2 additions & 1 deletion src/sentry/eventstream/kafka/backend.py
@@ -12,8 +12,9 @@

from sentry import options
from sentry.conf.types.kafka_definition import Topic
from sentry.eventstream.base import EventStreamEventType, GroupStates
from sentry.eventstream.base import GroupStates
from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream
from sentry.eventstream.types import EventStreamEventType
from sentry.killswitches import killswitch_matches_context
from sentry.utils import json
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
10 changes: 6 additions & 4 deletions src/sentry/eventstream/kafka/dispatch.py
@@ -34,6 +34,7 @@ def _sampled_eventstream_timer(instance: str) -> Generator[None, None, None]:


def dispatch_post_process_group_task(
eventstream_type: str,
event_id: str,
project_id: int,
group_id: int | None,
@@ -53,6 +54,7 @@

post_process_group.apply_async(
kwargs={
"eventstream_type": eventstream_type,
"is_new": is_new,
"is_regression": is_regression,
"is_new_group_environment": is_new_group_environment,
@@ -83,15 +85,15 @@ def _get_task_kwargs(message: Message[KafkaPayload]) -> Mapping[str, Any] | None:
return get_task_kwargs_for_message(message.payload.value)


def _get_task_kwargs_and_dispatch(message: Message[KafkaPayload]) -> None:
def _get_task_kwargs_and_dispatch(eventstream_type: str, message: Message[KafkaPayload]) -> None:
task_kwargs = _get_task_kwargs(message)
if not task_kwargs:
return None

dispatch_post_process_group_task(**task_kwargs)
dispatch_post_process_group_task(eventstream_type=eventstream_type, **task_kwargs)


class EventPostProcessForwarderStrategyFactory(PostProcessForwarderStrategyFactory):
@staticmethod
def _dispatch_function(message: Message[KafkaPayload]) -> None:
return _get_task_kwargs_and_dispatch(message)
def _dispatch_function(eventstream_type: str, message: Message[KafkaPayload]) -> None:
return _get_task_kwargs_and_dispatch(eventstream_type, message)
4 changes: 3 additions & 1 deletion src/sentry/eventstream/snuba.py
@@ -11,7 +11,8 @@
from sentry import quotas
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.eventstore.models import GroupEvent
from sentry.eventstream.base import EventStream, EventStreamEventType, GroupStates
from sentry.eventstream.base import EventStream, GroupStates
from sentry.eventstream.types import EventStreamEventType
from sentry.utils import json, snuba
from sentry.utils.safe import get_path
from sentry.utils.sdk import set_current_event_project
@@ -469,6 +470,7 @@ def insert(
**kwargs,
)
self._dispatch_post_process_group_task(
self._get_event_type(event).value,
event.event_id,
event.project_id,
event.group_id,
11 changes: 11 additions & 0 deletions src/sentry/eventstream/types.py
@@ -0,0 +1,11 @@
from enum import Enum
Comment from the PR author: pulled this out of base.py to avoid circular imports
class EventStreamEventType(Enum):
"""
We have 3 broad categories of event types that we care about in eventstream.
"""

Error = "error" # error, default, various security errors
Transaction = "transaction" # transactions
Generic = "generic" # generic events ingested via the issue platform
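A later commit notes the task kwargs "can only pickle the str", so the enum's value travels across the Celery boundary as a plain string and is rehydrated on the other side. A small round-trip sketch using the enum defined above:

```python
from enum import Enum


class EventStreamEventType(Enum):
    """We have 3 broad categories of event types that we care about in eventstream."""

    Error = "error"  # error, default, various security errors
    Transaction = "transaction"  # transactions
    Generic = "generic"  # generic events ingested via the issue platform


# Serialize as a plain string for the task kwargs...
wire_value = EventStreamEventType.Transaction.value
# ...and restore the enum member by value on the consuming side.
restored = EventStreamEventType(wire_value)
```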
7 changes: 5 additions & 2 deletions src/sentry/ingest/consumer/attachment_event.py
@@ -7,6 +7,7 @@
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message

from sentry.ingest.types import ConsumerType
from sentry.models.project import Project
from sentry.utils import metrics

@@ -23,7 +24,9 @@


def decode_and_process_chunks(
raw_message: Message[KafkaPayload], consumer_type: str, reprocess_only_stuck_events: bool
raw_message: Message[KafkaPayload],
consumer_type: str,
reprocess_only_stuck_events: bool,
) -> IngestMessage | None:
"""
The first pass for the `attachments` topic:
@@ -90,7 +93,7 @@ def process_attachments_and_events(
if not reprocess_only_stuck_events:
process_individual_attachment(message, project)
elif message_type == "event":
process_event(message, project, reprocess_only_stuck_events)
process_event(ConsumerType.Events, message, project, reprocess_only_stuck_events)
elif message_type == "user_report":
process_userreport(message, project)
else:
15 changes: 11 additions & 4 deletions src/sentry/ingest/consumer/processors.py
@@ -12,8 +12,9 @@
from sentry import eventstore, features
from sentry.attachments import CachedAttachment, attachment_cache
from sentry.event_manager import EventManager, save_attachment
from sentry.eventstore.processing import event_processing_store
from sentry.eventstore.processing import event_processing_store, transaction_processing_store
from sentry.feedback.usecases.create_feedback import FeedbackCreationSource, is_in_feedback_denylist
from sentry.ingest.types import ConsumerType
from sentry.ingest.userreport import Conflict, save_userreport
from sentry.killswitches import killswitch_matches_context
from sentry.models.project import Project
@@ -73,13 +74,14 @@ def process_transaction_no_celery(
data = dict(data.items())

with sentry_sdk.start_span(op="event_processing_store.store"):
cache_key = event_processing_store.store(data)
cache_key = transaction_processing_store.store(data)
save_attachments(attachments, cache_key)


@trace_func(name="ingest_consumer.process_event")
@metrics.wraps("ingest_consumer.process_event")
def process_event(
consumer_type: str,
message: IngestMessage,
project: Project,
reprocess_only_stuck_events: bool = False,
@@ -95,6 +97,11 @@ def process_event(
remote_addr = message.get("remote_addr")
attachments = message.get("attachments") or ()

if consumer_type == ConsumerType.Transactions:
processing_store = transaction_processing_store
else:
processing_store = event_processing_store

sentry_sdk.set_extra("event_id", event_id)
sentry_sdk.set_extra("len_attachments", len(attachments))

@@ -183,7 +190,7 @@ def process_event(
# process and consume the event from the `processing_store`, whereby getting it "unstuck".
if reprocess_only_stuck_events:
with sentry_sdk.start_span(op="event_processing_store.exists"):
if not event_processing_store.exists(data):
if not processing_store.exists(data):
return

# The no_celery_mode version of the transactions consumer skips one trip to rc-processing
Expand All @@ -193,7 +200,7 @@ def process_event(
cache_key = None
else:
with metrics.timer("ingest_consumer._store_event"):
cache_key = event_processing_store.store(data)
cache_key = processing_store.store(data)
save_attachments(attachments, cache_key)

try:
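The branch added to `process_event` keys the store off the consumer type rather than the event payload. A stand-in sketch of that selection (stores modeled as dicts; the real `ConsumerType` lives in `sentry.ingest.types`):

```python
class ConsumerType:
    # Stand-in mirroring the values referenced in the diff.
    Events = "events"
    Transactions = "transactions"
    Attachments = "attachments"


# Stand-ins for event_processing_store / transaction_processing_store.
event_processing_store: dict[str, dict] = {}
transaction_processing_store: dict[str, dict] = {}


def pick_store(consumer_type: str) -> dict[str, dict]:
    # Same shape as the branch in process_event: transactions get their own
    # store, every other consumer keeps using the event store.
    if consumer_type == ConsumerType.Transactions:
        return transaction_processing_store
    return event_processing_store
```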
5 changes: 4 additions & 1 deletion src/sentry/ingest/consumer/simple_event.py
@@ -5,6 +5,7 @@
from arroyo.dlq import InvalidMessage
from arroyo.types import BrokerValue, Message

from sentry.ingest.types import ConsumerType
from sentry.models.project import Project
from sentry.utils import metrics

@@ -59,7 +60,9 @@ def process_simple_event_message(
logger.exception("Project for ingested event does not exist: %s", project_id)
return

return process_event(message, project, reprocess_only_stuck_events, no_celery_mode)
return process_event(
ConsumerType.Events, message, project, reprocess_only_stuck_events, no_celery_mode
)

except Exception as exc:
# If the retriable exception was raised, we should not DLQ
9 changes: 6 additions & 3 deletions src/sentry/post_process_forwarder/post_process_forwarder.py
@@ -1,6 +1,7 @@
import logging
from abc import ABC, abstractmethod
from collections.abc import Mapping
from functools import partial

from arroyo.backends.kafka import KafkaPayload
from arroyo.processing.strategies import (
@@ -19,11 +20,12 @@
class PostProcessForwarderStrategyFactory(ProcessingStrategyFactory[KafkaPayload], ABC):
@staticmethod
@abstractmethod
def _dispatch_function(message: Message[KafkaPayload]) -> None:
def _dispatch_function(eventstream_type: str, message: Message[KafkaPayload]) -> None:
raise NotImplementedError()

def __init__(
self,
eventstream_type: str,
mode: str,
num_processes: int,
input_block_size: int,
@@ -32,6 +34,7 @@ def __init__(
max_batch_time: int,
concurrency: int,
) -> None:
self.eventstream_type = eventstream_type
self.mode = mode
self.input_block_size = input_block_size
self.output_block_size = output_block_size
@@ -49,15 +52,15 @@ def create_with_partitions(
if self.mode == "multithreaded":
logger.info("Starting multithreaded post process forwarder")
return RunTaskInThreads(
processing_function=self._dispatch_function,
processing_function=partial(self._dispatch_function, self.eventstream_type),
concurrency=self.concurrency,
max_pending_futures=self.max_pending_futures,
next_step=CommitOffsets(commit),
)
elif self.mode == "multiprocess":
logger.info("Starting multiprocess post process forwarder")
return run_task_with_multiprocessing(
function=self._dispatch_function,
function=partial(self._dispatch_function, self.eventstream_type),
next_step=CommitOffsets(commit),
max_batch_size=self.max_batch_size,
max_batch_time=self.max_batch_time,