Skip to content

Commit

Permalink
ref(deletes): add use_bulk_deletes runtime config (#6560)
Browse files Browse the repository at this point in the history
Adding a temporary runtime config (`use_bulk_deletes`) to allow for
delete queries to go through the bulk delete pipeline, until we are
ready to fully move over to using it.

I choose a runtime config because I didn't want sentry to have to be
aware of which implementation was used (bulk deletes or not).

We should only enable this when we have the consumers running in S4S
  • Loading branch information
MeredithAnya authored Nov 12, 2024
1 parent d79be79 commit 0d81bb9
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 6 deletions.
19 changes: 14 additions & 5 deletions snuba/web/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from snuba.redis import all_redis_clients
from snuba.request.exceptions import InvalidJsonRequestException, JsonDecodeException
from snuba.request.schema import RequestSchema
from snuba.state import get_int_config
from snuba.state.rate_limit import RateLimitExceeded
from snuba.subscriptions.codecs import SubscriptionDataCodec
from snuba.subscriptions.data import PartitionId
Expand All @@ -72,6 +73,7 @@
from snuba.utils.metrics.timer import Timer
from snuba.utils.metrics.util import with_span
from snuba.web import QueryException, QueryTooLongException
from snuba.web.bulk_delete_query import delete_from_storage as bulk_delete_from_storage
from snuba.web.constants import get_http_status_for_clickhouse_error
from snuba.web.converters import DatasetConverter, EntityConverter, StorageConverter
from snuba.web.delete_query import (
Expand Down Expand Up @@ -330,11 +332,18 @@ def storage_delete(
try:
schema = RequestSchema.build(HTTPQuerySettings, is_delete=True)
request_parts = schema.validate(body)
payload = delete_from_storage(
storage,
request_parts.query["query"]["columns"],
request_parts.attribution_info,
)
if get_int_config("use_bulk_deletes"):
payload = bulk_delete_from_storage(
storage,
request_parts.query["query"]["columns"],
request_parts.attribution_info,
)
else:
payload = delete_from_storage(
storage,
request_parts.query["query"]["columns"],
request_parts.attribution_info,
)
except (
InvalidJsonRequestException,
DeletesNotEnabledError,
Expand Down
55 changes: 54 additions & 1 deletion tests/web/test_bulk_delete_query.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import time
from typing import Any, Mapping, Optional
from typing import Any, Callable, Mapping, Optional, Tuple, Union
from unittest.mock import Mock, patch

import pytest
Expand All @@ -10,6 +10,9 @@
from confluent_kafka.admin import AdminClient

from snuba import settings
from snuba.core.initialize import initialize_snuba
from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.entities.factory import get_entity
from snuba.datasets.storages.factory import get_writable_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.query.exceptions import InvalidQueryException
Expand All @@ -19,6 +22,9 @@
from snuba.utils.streams.topics import Topic
from snuba.web.bulk_delete_query import delete_from_storage
from snuba.web.delete_query import DeletesNotEnabledError
from tests.base import BaseApiTest
from tests.datasets.configuration.utils import ConfigurationTest
from tests.test_api import SimpleAPITest

CONSUMER_CONFIG = {
"bootstrap.servers": settings.BROKER_CONFIG["bootstrap.servers"],
Expand Down Expand Up @@ -123,3 +129,50 @@ def test_delete_invalid_column_name() -> None:

with pytest.raises(InvalidQueryException):
delete_from_storage(storage, conditions, attr_info)


class TestSimpleBulkDeleteApi(SimpleAPITest, BaseApiTest, ConfigurationTest):
@pytest.fixture
def test_entity(self) -> Union[str, Tuple[str, str]]:
return "search_issues"

@pytest.fixture
def test_app(self) -> Any:
return self.app

def setup_method(self, test_method: Callable[..., Any]) -> None:
super().setup_method(test_method)
initialize_snuba()
self.events_storage = get_entity(EntityKey.SEARCH_ISSUES).get_writable_storage()
assert self.events_storage is not None

def delete_query(
self,
group_id: int,
) -> Any:
return self.app.delete(
"/search_issues",
data=rapidjson.dumps(
{
"query": {"columns": {"group_id": [group_id], "project_id": [3]}},
"debug": True,
"tenant_ids": {"referrer": "test", "organization_id": 1},
}
),
headers={"referer": "test"},
)

@patch("snuba.web.views.delete_from_storage", return_value={})
@patch("snuba.web.views.bulk_delete_from_storage", return_value={})
def test_bulk_delete_runtime_config(
self, mock_bulk_delete: Mock, mock_delete: Mock
) -> None:
set_config("read_through_cache.short_circuit", 1)

self.delete_query(1)
mock_bulk_delete.assert_not_called()
mock_delete.assert_called_once()

set_config("use_bulk_deletes", 1)
self.delete_query(3)
mock_bulk_delete.assert_called_once()

0 comments on commit 0d81bb9

Please sign in to comment.