Skip to content

Commit

Permalink
ref(deletes): bulk delete consumer (#6510)
Browse files Browse the repository at this point in the history
**context**:
The work for bulk deleting in snuba has so far included the following:
* Added the kafka schemas -
getsentry/sentry-kafka-schemas#347
* Adding the new endpoint including producing to the topic -
#6440
* Created the topic in production for all env -
getsentry/ops#12711

**what's left**:
Now that we have the topics created we can finish up the consumer side.
- [ ] Add the consumer logic to snuba (This PR)
- [ ] Add the consumer deployment to S4S region in the ops repository
- [ ] Set up datadog alerts/metrics and other observability

**this PR**:
It's a bit of a larger PR but it can be reviewed in a couple sections:
* The main consumer logic and strategy
* This has the logic to create the strategy factory for the consumer and
composes the strategy steps. The `strategy.py` file has the details for
actually executing the delete query
* The `batching.py` file - I have an [arroyo
PR](getsentry/arroyo#390) that makes this file
obsolete but in the meantime I don't think it needs to block this PR
since it will be easy to remove after
* The formatters are going to be the only code that someone will need to
write in the future when deploying the deletions consumer for a
different storage. How one formats the conditions for the `DELETE` query
is up to that logic.
  • Loading branch information
MeredithAnya authored Nov 8, 2024
1 parent 20c5da7 commit 2d54fea
Show file tree
Hide file tree
Showing 12 changed files with 830 additions and 0 deletions.
1 change: 1 addition & 0 deletions devservices/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ services:
ENABLE_ISSUE_OCCURRENCE_CONSUMER: ${ENABLE_ISSUE_OCCURRENCE_CONSUMER:-}
ENABLE_AUTORUN_MIGRATION_SEARCH_ISSUES: 1
ENABLE_GROUP_ATTRIBUTES_CONSUMER: ${ENABLE_GROUP_ATTRIBUTES_CONSUMER:-}
ENABLE_LW_DELETIONS_CONSUMER: ${ENABLE_LW_DELETIONS_CONSUMER:-}
platform: linux/amd64
extra_hosts:
host.docker.internal: host-gateway
Expand Down
18 changes: 18 additions & 0 deletions snuba/cli/devserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,24 @@ def devserver(*, bootstrap: bool, workers: bool) -> None:
),
]

if settings.ENABLE_LW_DELETIONS_CONSUMER:
daemons += [
(
"lw-deletions-consumer",
[
"snuba",
"lw-deletions-consumer",
"--storage-name=search_issues",
"--consumer-group=search_issues_deletes_group",
"--max-rows-batch-size=10",
"--max-batch-time-ms=1000",
"--auto-offset-reset=latest",
"--no-strict-offset-reset",
"--log-level=debug",
],
),
]

manager = Manager()
for name, cmd in daemons:
manager.add_process(
Expand Down
174 changes: 174 additions & 0 deletions snuba/cli/lw_deletions_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import logging
import signal
from typing import Any, Optional, Sequence

import click
import sentry_sdk
from arroyo import configure_metrics
from arroyo.backends.kafka import KafkaPayload
from arroyo.processing import StreamProcessor

from snuba import environment, settings
from snuba.consumers.consumer_builder import (
ConsumerBuilder,
KafkaParameters,
ProcessingParameters,
)
from snuba.consumers.consumer_config import resolve_consumer_config
from snuba.datasets.deletion_settings import MAX_ROWS_TO_DELETE_DEFAULT
from snuba.datasets.storages.factory import get_writable_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.environment import setup_logging, setup_sentry
from snuba.lw_deletions.formatters import STORAGE_FORMATTER
from snuba.lw_deletions.strategy import LWDeletionsConsumerStrategyFactory
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.utils.streams.metrics_adapter import StreamMetricsAdapter
from snuba.web.bulk_delete_query import STORAGE_TOPIC

# A longer batch time for deletes is reasonable
# since we want fewer mutations
DEFAULT_DELETIONS_MAX_BATCH_TIME_MS = 60000 * 2

logger = logging.getLogger(__name__)


@click.command()
@click.option(
"--consumer-group",
help="Consumer group use for consuming the deletion topic.",
required=True,
)
@click.option(
"--bootstrap-server",
multiple=True,
help="Kafka bootstrap server to use for consuming.",
)
@click.option("--storage", help="Storage name to consume from", required=True)
@click.option(
"--max-rows-batch-size",
default=MAX_ROWS_TO_DELETE_DEFAULT,
type=int,
help="Max amount of rows to delete at one time.",
)
@click.option(
"--max-batch-time-ms",
default=DEFAULT_DELETIONS_MAX_BATCH_TIME_MS,
type=int,
help="Max duration to buffer messages in memory for.",
)
@click.option(
"--auto-offset-reset",
default="earliest",
type=click.Choice(["error", "earliest", "latest"]),
help="Kafka consumer auto offset reset.",
)
@click.option(
"--no-strict-offset-reset",
is_flag=True,
help="Forces the kafka consumer auto offset reset.",
)
@click.option(
"--queued-max-messages-kbytes",
default=settings.DEFAULT_QUEUED_MAX_MESSAGE_KBYTES,
type=int,
help="Maximum number of kilobytes per topic+partition in the local consumer queue.",
)
@click.option(
"--queued-min-messages",
default=settings.DEFAULT_QUEUED_MIN_MESSAGES,
type=int,
help="Minimum number of messages per topic+partition the local consumer queue should contain before messages are sent to kafka.",
)
@click.option("--log-level", help="Logging level to use.")
def lw_deletions_consumer(
*,
consumer_group: str,
bootstrap_server: Sequence[str],
storage: str,
max_rows_batch_size: int,
max_batch_time_ms: int,
auto_offset_reset: str,
no_strict_offset_reset: bool,
queued_max_messages_kbytes: int,
queued_min_messages: int,
log_level: str,
) -> None:
setup_logging(log_level)
setup_sentry()

logger.info("Consumer Starting")

sentry_sdk.set_tag("storage", storage)
shutdown_requested = False
consumer: Optional[StreamProcessor[KafkaPayload]] = None

def handler(signum: int, frame: Any) -> None:
nonlocal shutdown_requested
shutdown_requested = True

if consumer is not None:
consumer.signal_shutdown()

signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)

topic = STORAGE_TOPIC[storage]

while not shutdown_requested:
metrics_tags = {
"consumer_group": consumer_group,
"storage": storage,
}
metrics = MetricsWrapper(
environment.metrics, "lw_deletions_consumer", tags=metrics_tags
)
configure_metrics(StreamMetricsAdapter(metrics), force=True)
consumer_config = resolve_consumer_config(
storage_names=[storage],
raw_topic=topic.value,
commit_log_topic=None,
replacements_topic=None,
bootstrap_servers=bootstrap_server,
commit_log_bootstrap_servers=[],
replacement_bootstrap_servers=[],
slice_id=None,
max_batch_size=max_rows_batch_size,
max_batch_time_ms=max_batch_time_ms,
group_instance_id=consumer_group,
)

consumer_builder = ConsumerBuilder(
consumer_config=consumer_config,
kafka_params=KafkaParameters(
group_id=consumer_group,
auto_offset_reset=auto_offset_reset,
strict_offset_reset=not no_strict_offset_reset,
queued_max_messages_kbytes=queued_max_messages_kbytes,
queued_min_messages=queued_min_messages,
),
processing_params=ProcessingParameters(None, None, None),
max_batch_size=max_rows_batch_size,
max_batch_time_ms=max_batch_time_ms,
max_insert_batch_size=0,
max_insert_batch_time_ms=0,
metrics=metrics,
slice_id=None,
join_timeout=None,
enforce_schema=False,
metrics_tags=metrics_tags,
)

writable_storage = get_writable_storage(StorageKey(storage))
formatter = STORAGE_FORMATTER[storage]()
strategy_factory = LWDeletionsConsumerStrategyFactory(
max_batch_size=max_rows_batch_size,
max_batch_time_ms=max_batch_time_ms,
storage=writable_storage,
formatter=formatter,
metrics=metrics,
)

consumer = consumer_builder.build_lw_deletions_consumer(strategy_factory)

consumer.run()
consumer_builder.flush()
9 changes: 9 additions & 0 deletions snuba/consumers/consumer_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,15 @@ def build_dlq_consumer(
dlq_policy,
)

def build_lw_deletions_consumer(
self, strategy_factory: ProcessingStrategyFactory[KafkaPayload]
) -> StreamProcessor[KafkaPayload]:
return self.__build_consumer(
strategy_factory,
self.raw_topic,
self.__build_default_dlq_policy(),
)

def __build_default_dlq_policy(self) -> Optional[DlqPolicy[KafkaPayload]]:
"""
Default DLQ policy applies to the base consumer or the DLQ consumer when
Expand Down
Empty file added snuba/lw_deletions/__init__.py
Empty file.
157 changes: 157 additions & 0 deletions snuba/lw_deletions/batching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from __future__ import annotations

import time
from typing import Callable, Generic, MutableSequence, Optional, TypeVar, Union

from arroyo.processing.strategies.abstract import ProcessingStrategy
from arroyo.processing.strategies.buffer import Buffer
from arroyo.types import BaseValue, FilteredPayload, Message, TStrategyPayload

ValuesBatch = MutableSequence[BaseValue[TStrategyPayload]]


TPayload = TypeVar("TPayload")
TResult = TypeVar("TResult")


Accumulator = Callable[[TResult, BaseValue[TPayload]], TResult]


class ReduceRowsBuffer(Generic[TPayload, TResult]):
def __init__(
self,
accumulator: Accumulator[TResult, TPayload],
initial_value: Callable[[], TResult],
max_batch_size: int,
max_batch_time: float,
increment_by: Optional[Callable[[BaseValue[TPayload]], int]] = None,
):
self.accumulator = accumulator
self.initial_value = initial_value
self.max_batch_size = max_batch_size
self.max_batch_time = max_batch_time
self.increment_by = increment_by

self._buffer = initial_value()
self._buffer_size = 0
self._buffer_until = time.time() + max_batch_time

@property
def buffer(self) -> TResult:
return self._buffer

@property
def is_empty(self) -> bool:
return self._buffer_size == 0

@property
def is_ready(self) -> bool:
return (
self._buffer_size >= self.max_batch_size
or time.time() >= self._buffer_until
)

def append(self, message: BaseValue[TPayload]) -> None:
"""
Instead of increasing the buffer size based on the number
of messages, we use the `rows_to_delete` attribute in the
message payload so we can batch by the number of rows we
want to delete.
"""
self._buffer = self.accumulator(self._buffer, message)
if self.increment_by:
buffer_increment = self.increment_by(message)
else:
buffer_increment = 1
self._buffer_size += buffer_increment

def new(self) -> "ReduceRowsBuffer[TPayload, TResult]":
return ReduceRowsBuffer(
accumulator=self.accumulator,
initial_value=self.initial_value,
max_batch_size=self.max_batch_size,
max_batch_time=self.max_batch_time,
increment_by=self.increment_by,
)


class ReduceCustom(
ProcessingStrategy[Union[FilteredPayload, TPayload]], Generic[TPayload, TResult]
):
def __init__(
self,
max_batch_size: int,
max_batch_time: float,
accumulator: Accumulator[TResult, TPayload],
initial_value: Callable[[], TResult],
next_step: ProcessingStrategy[TResult],
increment_by: Optional[Callable[[BaseValue[TPayload]], int]] = None,
) -> None:
self.__buffer_step = Buffer(
buffer=ReduceRowsBuffer(
max_batch_size=max_batch_size,
max_batch_time=max_batch_time,
accumulator=accumulator,
initial_value=initial_value,
increment_by=increment_by,
),
next_step=next_step,
)

def submit(self, message: Message[Union[FilteredPayload, TPayload]]) -> None:
self.__buffer_step.submit(message)

def poll(self) -> None:
self.__buffer_step.poll()

def close(self) -> None:
self.__buffer_step.close()

def terminate(self) -> None:
self.__buffer_step.terminate()

def join(self, timeout: Optional[float] = None) -> None:
self.__buffer_step.join(timeout)


class BatchStepCustom(ProcessingStrategy[Union[FilteredPayload, TStrategyPayload]]):
def __init__(
self,
max_batch_size: int,
max_batch_time: float,
next_step: ProcessingStrategy[ValuesBatch[TStrategyPayload]],
increment_by: Optional[Callable[[BaseValue[TStrategyPayload]], int]] = None,
) -> None:
def accumulator(
result: ValuesBatch[TStrategyPayload], value: BaseValue[TStrategyPayload]
) -> ValuesBatch[TStrategyPayload]:
result.append(value)
return result

self.__reduce_step: ReduceCustom[
TStrategyPayload, ValuesBatch[TStrategyPayload]
] = ReduceCustom(
max_batch_size,
max_batch_time,
accumulator,
lambda: [],
next_step,
increment_by,
)

def submit(
self, message: Message[Union[FilteredPayload, TStrategyPayload]]
) -> None:
self.__reduce_step.submit(message)

def poll(self) -> None:
self.__reduce_step.poll()

def close(self) -> None:
self.__reduce_step.close()

def terminate(self) -> None:
self.__reduce_step.terminate()

def join(self, timeout: Optional[float] = None) -> None:
self.__reduce_step.join(timeout)
Loading

0 comments on commit 2d54fea

Please sign in to comment.