feat(group-attributes): expose storage and entity, process messages (#4507)

* part 2 of setting up group_attributes

* type

* fix stuff after testing

* add simple tests

* typing, dlq incr

* fix test

* set literal

* fix test

* more tests

* dev

* project_id filter column
barkbarkimashark authored Jul 19, 2023
1 parent 17d74b9 commit 11ff698
Showing 13 changed files with 408 additions and 20 deletions.
16 changes: 16 additions & 0 deletions snuba/cli/devserver.py
@@ -440,6 +440,22 @@ def devserver(*, bootstrap: bool, workers: bool) -> None:
),
]

if settings.ENABLE_GROUP_ATTRIBUTES_CONSUMER:
daemons += [
(
"group-attributes-consumer",
[
"snuba",
"consumer",
"--auto-offset-reset=latest",
"--no-strict-offset-reset",
"--log-level=debug",
"--storage=group_attributes",
"--consumer-group=group_attributes_group",
],
),
]

manager = Manager()
for name, cmd in daemons:
manager.add_process(
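The daemon entry above corresponds to a standalone consumer invocation. A sketch using subprocess, with the flags copied verbatim from the daemon definition (this assumes a working local Kafka and ClickHouse, as the devserver does):

import subprocess

# Run the same consumer the devserver spawns, outside the process manager.
# Flags are copied from the daemon definition above.
subprocess.run(
    [
        "snuba",
        "consumer",
        "--auto-offset-reset=latest",
        "--no-strict-offset-reset",
        "--log-level=debug",
        "--storage=group_attributes",
        "--consumer-group=group_attributes_group",
    ],
    check=True,
)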
6 changes: 6 additions & 0 deletions snuba/datasets/configuration/group_attributes/dataset.yaml
@@ -0,0 +1,6 @@
version: v1
kind: dataset
name: group_attributes

entities:
- group_attributes
@@ -0,0 +1,56 @@
version: v1
kind: entity
name: group_attributes

schema:
[
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: group_id, type: UInt, args: { size: 64 } },

{ name: group_status, type: UInt, args: { size: 8 } },
{ name: group_substatus, type: UInt, args: { size: 8, schema_modifiers: [ nullable ] } },
{ name: group_first_seen, type: DateTime },
{ name: group_num_comments, type: UInt, args: { size: 64 } },

{ name: assignee_user_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },
{ name: assignee_team_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },

{ name: owner_suspect_commit_user_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },
{ name: owner_ownership_rule_user_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },
{ name: owner_ownership_rule_team_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },
{ name: owner_codeowners_user_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },
{ name: owner_codeowners_team_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },

{ name: deleted, type: UInt, args: { size: 8 } },
{ name: message_timestamp, type: DateTime },
{ name: partition, type: UInt, args: { size: 16 } },
{ name: offset, type: UInt, args: { size: 64 } },
]

storages:
- storage: group_attributes
is_writable: true

storage_selector:
selector: DefaultQueryStorageSelector

query_processors:
- processor: ReferrerRateLimiterProcessor
# - processor: ProjectReferrerRateLimiter
# args:
# project_column: project_id
# - processor: ProjectRateLimiterProcessor
# args:
# project_column: project_id
# - processor: ResourceQuotaProcessor
# args:
# project_field: project_id
- processor: BasicFunctionsProcessor

validate_data_model: error
validators:
- validator: EntityRequiredColumnValidator
args:
required_filter_columns: ["project_id"]

required_time_column: null
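With EntityRequiredColumnValidator requiring project_id and required_time_column set to null, a query against this entity must filter by project_id but needs no time range. A minimal sketch of a SnQL request body that should satisfy the validator (the request shape is an assumption, not part of this diff):

import json

# Hypothetical SnQL query against the new entity; the WHERE clause carries the
# mandatory project_id filter, and no time condition is needed.
body = json.dumps(
    {
        "query": (
            "MATCH (group_attributes) "
            "SELECT group_id, group_status, assignee_user_id "
            "WHERE project_id = 1"
        )
    }
)
print(body)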
@@ -0,0 +1,55 @@
version: v1
kind: writable_storage
name: group_attributes

storage:
key: group_attributes
set_key: group_attributes

readiness_state: limited

schema:
columns:
[
{ name: project_id, type: UInt, args: { size: 64 } },
{ name: group_id, type: UInt, args: { size: 64 } },

{ name: group_status, type: UInt, args: { size: 8 } },
{ name: group_substatus, type: UInt, args: { size: 8, schema_modifiers: [ nullable ] } },
{ name: group_first_seen, type: DateTime },
{ name: group_num_comments, type: UInt, args: { size: 64 } },

{ name: assignee_user_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },
{ name: assignee_team_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },

{ name: owner_suspect_commit_user_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },
{ name: owner_ownership_rule_user_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },
{ name: owner_ownership_rule_team_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },
{ name: owner_codeowners_user_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },
{ name: owner_codeowners_team_id, type: UInt, args: { size: 64, schema_modifiers: [ nullable ] } },

{ name: deleted, type: UInt, args: { size: 8 } },
{ name: message_timestamp, type: DateTime },
{ name: partition, type: UInt, args: { size: 16 } },
{ name: offset, type: UInt, args: { size: 64 } },
]
local_table_name: group_attributes_local
dist_table_name: group_attributes_dist

allocation_policies:
- name: PassthroughPolicy
args:
required_tenant_types:
- blank

query_processors:
- processor: TableRateLimit

mandatory_condition_checkers:
- condition: ProjectIdEnforcer

stream_loader:
processor:
name: GroupAttributesMessageProcessor
default_topic: group-attributes
dlq_topic: snuba-dead-letter-group-attributes
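The stream loader reads group-attributes and dead-letters failed messages to snuba-dead-letter-group-attributes. A sketch of validating a payload against the published schema before it reaches the processor, assuming the get_codec helper that recent sentry_kafka_schemas releases expose:

import json

from sentry_kafka_schemas import get_codec  # assumed helper; treat as illustrative

codec = get_codec("group-attributes")

# A complete snapshot, mirroring the group_created fixture in the new tests.
snapshot = {
    "group_deleted": False,
    "project_id": 1,
    "group_id": 1,
    "status": 0,
    "substatus": 7,
    "first_seen": "2023-02-27T15:40:12.223000Z",
    "num_comments": 0,
    "assignee_user_id": None,
    "assignee_team_id": None,
    "owner_suspect_commit_user_id": None,
    "owner_ownership_rule_user_id": None,
    "owner_ownership_rule_team_id": None,
    "owner_codeowners_user_id": None,
    "owner_codeowners_team_id": None,
    "timestamp": "2023-02-27T15:40:12.223000Z",
}
codec.decode(json.dumps(snapshot).encode("utf-8"), validate=True)  # raises on schema violations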
52 changes: 52 additions & 0 deletions snuba/datasets/processors/group_attributes_processor.py
@@ -0,0 +1,52 @@
from datetime import datetime
from typing import Optional

from sentry_kafka_schemas.schema_types.group_attributes_v1 import (
GroupAttributesSnapshot,
)

from snuba import environment, settings
from snuba.consumers.types import KafkaMessageMetadata
from snuba.datasets.processors import DatasetMessageProcessor
from snuba.processor import InsertBatch, ProcessedMessage
from snuba.utils.metrics.wrapper import MetricsWrapper

metrics = MetricsWrapper(environment.metrics, "group_attributes.processor")


class GroupAttributesMessageProcessor(DatasetMessageProcessor):
def process_message(
self, message: GroupAttributesSnapshot, metadata: KafkaMessageMetadata
) -> Optional[ProcessedMessage]:
return InsertBatch(
[
{
"project_id": message["project_id"],
"group_id": message["group_id"],
"group_status": message["status"],
"group_substatus": message["substatus"],
"group_first_seen": datetime.strptime(
message["first_seen"], settings.PAYLOAD_DATETIME_FORMAT
),
"group_num_comments": message["num_comments"],
"assignee_user_id": message["assignee_user_id"],
"assignee_team_id": message["assignee_team_id"],
"owner_suspect_commit_user_id": message[
"owner_suspect_commit_user_id"
],
"owner_ownership_rule_user_id": message[
"owner_ownership_rule_user_id"
],
"owner_ownership_rule_team_id": message[
"owner_ownership_rule_team_id"
],
"owner_codeowners_user_id": message["owner_codeowners_user_id"],
"owner_codeowners_team_id": message["owner_codeowners_team_id"],
"deleted": 1 if message["group_deleted"] else 0,
"message_timestamp": metadata.timestamp,
"partition": metadata.partition,
"offset": metadata.offset,
}
],
None,
)
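group_first_seen is parsed with datetime.strptime and settings.PAYLOAD_DATETIME_FORMAT. A quick standalone check of that conversion, assuming the format string shown here, which matches the first_seen values in the new tests:

from datetime import datetime

# Assumed value of settings.PAYLOAD_DATETIME_FORMAT; it matches the
# "first_seen" strings used in the test fixture below.
PAYLOAD_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

print(datetime.strptime("2023-02-27T15:40:12.223000Z", PAYLOAD_DATETIME_FORMAT))
# 2023-02-27 15:40:12.223000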
5 changes: 5 additions & 0 deletions snuba/settings/__init__.py
@@ -332,6 +332,11 @@ class RedisClusters(TypedDict):
# Enable spans ingestion
ENABLE_SPANS_CONSUMER = os.environ.get("ENABLE_SPANS_CONSUMER", False)

# Enable group attributes consumer
ENABLE_GROUP_ATTRIBUTES_CONSUMER = os.environ.get(
"ENABLE_GROUP_ATTRIBUTES_CONSUMER", False
)

# Cutoff time from UTC 00:00:00 to stop running optimize jobs to
# avoid spilling over to the next day.
OPTIMIZE_JOB_CUTOFF_TIME = 23
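As with ENABLE_SPANS_CONSUMER above, the flag comes straight from os.environ.get, so any non-empty string enables it, including "false"; the variable must be unset entirely to disable the consumer:

import os

os.environ["ENABLE_GROUP_ATTRIBUTES_CONSUMER"] = "false"
flag = os.environ.get("ENABLE_GROUP_ATTRIBUTES_CONSUMER", False)
print(bool(flag))  # True; the non-empty string "false" is still truthy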
2 changes: 2 additions & 0 deletions snuba/settings/validation.py
@@ -58,6 +58,7 @@ def validate_settings(locals: Mapping[str, Any]) -> None:
"profiles-call-tree",
"ingest-replay-events",
"generic-events",
"group-attributes",
"snuba-generic-events-commit-log",
"snuba-dead-letter-replays",
"snuba-generic-metrics",
@@ -72,6 +73,7 @@ def validate_settings(locals: Mapping[str, Any]) -> None:
"snuba-dead-letter-metrics-counters",
"snuba-dead-letter-generic-events",
"snuba-dead-letter-querylog",
"snuba-dead-letter-group-attributes",
"snuba-generic-events-commit-log",
}

3 changes: 3 additions & 0 deletions snuba/utils/streams/topics.py
@@ -48,6 +48,7 @@ class Topic(Enum):
GENERIC_METRICS_COUNTERS_COMMIT_LOG = "snuba-generic-metrics-counters-commit-log"
GENERIC_EVENTS = "generic-events"
GENERIC_EVENTS_COMMIT_LOG = "snuba-generic-events-commit-log"
GROUP_ATTRIBUTES = "group-attributes"

ATTRIBUTION = "snuba-attribution"
DEAD_LETTER_METRICS = "snuba-dead-letter-metrics"
@@ -59,6 +60,7 @@
DEAD_LETTER_REPLAYS = "snuba-dead-letter-replays"
DEAD_LETTER_GENERIC_EVENTS = "snuba-dead-letter-generic-events"
DEAD_LETTER_QUERYLOG = "snuba-dead-letter-querylog"
DEAD_LETTER_GROUP_ATTRIBUTES = "snuba-dead-letter-group-attributes"


def get_topic_creation_config(topic: Topic) -> Mapping[str, str]:
@@ -74,5 +76,6 @@
Topic.GENERIC_METRICS: {"message.timestamp.type": "LogAppendTime"},
Topic.GENERIC_EVENTS: {"message.timestamp.type": "LogAppendTime"},
Topic.QUERYLOG: {"max.message.bytes": "2000000"},
Topic.GROUP_ATTRIBUTES: {"message.timestamp.type": "LogAppendTime"},
}
return config.get(topic, {})
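A quick check that the new topic picks up the LogAppendTime override added above:

from snuba.utils.streams.topics import Topic, get_topic_creation_config

print(get_topic_creation_config(Topic.GROUP_ATTRIBUTES))
# {'message.timestamp.type': 'LogAppendTime'}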
2 changes: 1 addition & 1 deletion tests/admin/dead_letter_queue/test_dlq.py
@@ -2,4 +2,4 @@


def test_dlq() -> None:
assert len(get_dlq_topics()) == 7
assert len(get_dlq_topics()) == 8
37 changes: 18 additions & 19 deletions tests/datasets/test_dataset_factory.py
@@ -60,22 +60,21 @@ def test_get_dataset_factory_and_mapping_coupling() -> None:

def test_all_names() -> None:
factory._DS_FACTORY = None
assert set(get_enabled_dataset_names()) == set(
[
"discover",
"events",
"groupassignee",
"groupedmessage",
"metrics",
"outcomes",
"outcomes_raw",
"sessions",
"transactions",
"profiles",
"functions",
"generic_metrics",
"replays",
"search_issues",
"spans",
]
)
assert set(get_enabled_dataset_names()) == {
"discover",
"events",
"groupassignee",
"groupedmessage",
"metrics",
"outcomes",
"outcomes_raw",
"sessions",
"transactions",
"profiles",
"functions",
"generic_metrics",
"replays",
"search_issues",
"spans",
"group_attributes",
}
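With the name registered, the dataset also resolves through the factory. A minimal sketch; the exact helper name and location are assumptions based on Snuba's factory module:

from snuba.datasets.factory import get_dataset  # assumed helper location

dataset = get_dataset("group_attributes")
print(dataset)  # the dataset registered by the new config files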
1 change: 1 addition & 0 deletions tests/datasets/test_entity_factory.py
@@ -36,6 +36,7 @@
EntityKey.GENERIC_METRICS_COUNTERS,
EntityKey.GENERIC_ORG_METRICS_COUNTERS,
EntityKey.SPANS,
EntityKey.GROUP_ATTRIBUTES,
]


75 changes: 75 additions & 0 deletions tests/datasets/test_group_attributes_processor.py
@@ -0,0 +1,75 @@
from datetime import datetime
from typing import Optional

import pytest
from sentry_kafka_schemas.schema_types.group_attributes_v1 import (
GroupAttributesSnapshot,
)

from snuba import settings
from snuba.consumers.types import KafkaMessageMetadata
from snuba.datasets.processors.group_attributes_processor import (
GroupAttributesMessageProcessor,
)
from snuba.processor import ProcessedMessage
from snuba.writer import WriterTableRow


@pytest.fixture
def group_created() -> GroupAttributesSnapshot:
return {
"group_deleted": False,
"project_id": 1,
"group_id": 1,
"status": 0,
"substatus": 7,
"first_seen": "2023-02-27T15:40:12.223000Z",
"num_comments": 0,
"assignee_user_id": None,
"assignee_team_id": None,
"owner_suspect_commit_user_id": None,
"owner_ownership_rule_user_id": None,
"owner_ownership_rule_team_id": None,
"owner_codeowners_user_id": None,
"owner_codeowners_team_id": None,
"timestamp": "2023-02-27T15:40:12.223000Z",
}


class TestGroupAttributesMessageProcessor:
KAFKA_META = KafkaMessageMetadata(
offset=0, partition=0, timestamp=datetime(1970, 1, 1)
)

processor = GroupAttributesMessageProcessor()

def process_message(
self, message, kafka_meta: KafkaMessageMetadata = KAFKA_META
) -> Optional[ProcessedMessage]:
return self.processor.process_message(message, kafka_meta)

def processed_single_row(self, message) -> WriterTableRow:
return self.process_message(message).rows[0]

def test_group_created(self, group_created):
assert (
self.processed_single_row(group_created).items()
>= {
"project_id": 1,
"group_id": 1,
"group_status": 0,
"group_substatus": 7,
"group_first_seen": datetime.strptime(
group_created["first_seen"], settings.PAYLOAD_DATETIME_FORMAT
),
"group_num_comments": 0,
"assignee_user_id": None,
"assignee_team_id": None,
"owner_suspect_commit_user_id": None,
"owner_ownership_rule_user_id": None,
"owner_ownership_rule_team_id": None,
"owner_codeowners_user_id": None,
"owner_codeowners_team_id": None,
"deleted": 0,
}.items()
)
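The assertion works because dict views support set-like comparison: a.items() >= b.items() is a superset test over key/value pairs, which lets the expected dict omit the metadata columns (message_timestamp, partition, offset). For example:

row = {"project_id": 1, "deleted": 0, "offset": 42}
expected = {"project_id": 1, "deleted": 0}
assert row.items() >= expected.items()  # every expected pair appears in row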
