Skip to content

Commit

Permalink
Merge branch 'master' into deprecate-topic-partitions-count
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnagara authored May 20, 2024
2 parents 78a7299 + e79b1ee commit 9f0ffea
Show file tree
Hide file tree
Showing 14 changed files with 292 additions and 14 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ PyYAML==6.0
sqlparse==0.4.2
google-api-python-client==2.88.0
sentry-usage-accountant==0.0.10
freezegun==1.2.2
16 changes: 15 additions & 1 deletion snuba/admin/static/api_client.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import {
RunMigrationResult,
} from "SnubaAdmin/clickhouse_migrations/types";
import { TracingRequest, TracingResult } from "SnubaAdmin/tracing/types";
import { SnQLRequest, SnQLResult, SnubaDatasetName } from "SnubaAdmin/snql_to_sql/types";
import {
SnQLRequest,
SnQLResult,
SnubaDatasetName,
} from "SnubaAdmin/snql_to_sql/types";

import { KafkaTopicData } from "SnubaAdmin/kafka/types";
import { QuerylogRequest, QuerylogResult } from "SnubaAdmin/querylog/types";
Expand All @@ -33,10 +37,14 @@ import {
import { AllocationPolicy } from "SnubaAdmin/capacity_management/types";

import { ReplayInstruction, Topic } from "SnubaAdmin/dead_letter_queue/types";
import { AutoReplacementsBypassProjectsData } from "SnubaAdmin/auto_replacements_bypass_projects/types";

interface Client {
getSettings: () => Promise<Settings>;
getConfigs: () => Promise<Config[]>;
getAutoReplacementsBypassProjects: () => Promise<
AutoReplacementsBypassProjectsData[]
>;
createNewConfig: (
key: ConfigKey,
value: ConfigValue,
Expand Down Expand Up @@ -106,6 +114,12 @@ function Client() {
const url = baseUrl + "configs";
return fetch(url).then((resp) => resp.json());
},
getAutoReplacementsBypassProjects: () => {
const url = baseUrl + "auto-replacements-bypass-projects";
return fetch(url, {
headers: { "Content-Type": "application/json" },
}).then((resp) => resp.json());
},
createNewConfig: (
key: ConfigKey,
value: ConfigValue,
Expand Down
38 changes: 38 additions & 0 deletions snuba/admin/static/auto_replacements_bypass_projects/index.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import React, { useState, useEffect } from "react";
import { AutoReplacementsBypassProjectsData } from "SnubaAdmin/auto_replacements_bypass_projects/types";

import Client from "SnubaAdmin/api_client";
import { Table } from "../table";

function AutoReplacementsBypassProjects(props: { api: Client }) {
const [data, setData] = useState<AutoReplacementsBypassProjectsData[] | null>(
null
);

useEffect(() => {
props.api.getAutoReplacementsBypassProjects().then((res) => {
setData(res);
});
}, []);

if (!data) {
return null;
}

const rowData = data.map(({ projectID, expiry }) => {
return [projectID, expiry];
});

return (
<div>
<div>
<Table
headerData={["Project ID", "Expiration Time"]}
rowData={rowData}
/>
</div>
</div>
);
}

export default AutoReplacementsBypassProjects;
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { ReactNode } from "react";

type AutoReplacementsBypassProjectsData = {
projectID: number;
expiry: string;
};

export { AutoReplacementsBypassProjectsData };
6 changes: 6 additions & 0 deletions snuba/admin/static/data.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import RuntimeConfig from "SnubaAdmin/runtime_config";
import AutoReplacementsBypassProjects from "SnubaAdmin/auto_replacements_bypass_projects";
import AuditLog from "SnubaAdmin/runtime_config/auditlog";
import ClickhouseMigrations from "SnubaAdmin/clickhouse_migrations";
import ClickhouseQueries from "SnubaAdmin/clickhouse_queries";
Expand All @@ -16,6 +17,11 @@ import Welcome from "SnubaAdmin/welcome";
const NAV_ITEMS = [
{ id: "overview", display: "🤿 Snuba Admin", component: Welcome },
{ id: "config", display: "⚙️ Runtime Config", component: RuntimeConfig },
{
id: "auto-replacements-bypass-projects",
display: "👻 Replacements",
component: AutoReplacementsBypassProjects,
},
{
id: "capacity-management",
display: "🪫 Capacity Management",
Expand Down
1 change: 1 addition & 0 deletions snuba/admin/tool_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class AdminTools(Enum):

ALL = "all"
CONFIGURATION = "configuration"
AUTO_REPLACEMENTS_BYPASS_PROJECTS = "auto-replacements-bypass-projects"
SNQL_TO_SQL = "snql-to-sql"
SYSTEM_QUERIES = "system-queries"
MIGRATIONS = "clickhouse-migrations"
Expand Down
19 changes: 19 additions & 0 deletions snuba/admin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
from contextlib import redirect_stdout
from dataclasses import asdict
from datetime import datetime
from typing import Any, List, Mapping, Optional, Sequence, Tuple, cast

import sentry_sdk
Expand Down Expand Up @@ -69,6 +70,9 @@
from snuba.migrations.groups import MigrationGroup, get_group_readiness_state
from snuba.migrations.runner import MigrationKey, Runner
from snuba.query.exceptions import InvalidQueryException
from snuba.replacers.replacements_and_expiry import (
get_config_auto_replacements_bypass_projects,
)
from snuba.state.explain_meta import explain_cleanup, get_explain_meta
from snuba.utils.metrics.timer import Timer
from snuba.web.views import dataset_query
Expand Down Expand Up @@ -345,6 +349,21 @@ def kafka_topics() -> Response:
return make_response(jsonify(get_broker_data()), 200)


@application.route("/auto-replacements-bypass-projects")
@check_tool_perms(tools=[AdminTools.AUTO_REPLACEMENTS_BYPASS_PROJECTS])
def auto_replacements_bypass_projects() -> Response:
def serialize(project_id: int, expiry: datetime) -> Any:
return {"projectID": project_id, "expiry": str(expiry)}

data = [
serialize(project_id, expiry)
for [project_id, expiry] in get_config_auto_replacements_bypass_projects(
datetime.now()
).items()
]
return Response(json.dumps(data), 200, {"Content-Type": "application/json"})


# Sample cURL command:
#
# curl -X POST \
Expand Down
7 changes: 5 additions & 2 deletions snuba/replacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@
)
from snuba.datasets.storage import WritableTableStorage
from snuba.processor import InvalidMessageVersion
from snuba.redis import RedisClientKey, get_redis_client
from snuba.replacers.errors_replacer import Replacement as ErrorReplacement
from snuba.replacers.replacements_and_expiry import (
redis_client,
set_config_auto_replacements_bypass_projects,
)
from snuba.replacers.replacer_processor import (
Replacement,
ReplacementMessage,
Expand All @@ -52,7 +55,6 @@
logger = logging.getLogger("snuba.replacer")

executor = ThreadPoolExecutor()
redis_client = get_redis_client(RedisClientKey.REPLACEMENTS_STORE)
NODES_REFRESH_PERIOD = 10

RESET_CHECK_CONFIG = "consumer_groups_to_reset_offset_check"
Expand Down Expand Up @@ -589,6 +591,7 @@ def _attempt_emitting_metric_for_projects_exceeding_limit(
projects_exceeding_limit = (
self.__processing_time_counter.get_projects_exceeding_limit()
)
set_config_auto_replacements_bypass_projects(projects_exceeding_limit, end_time)
logger.info(
"projects_exceeding_limit = {}".format(
",".join(str(project_id) for project_id in projects_exceeding_limit)
Expand Down
17 changes: 14 additions & 3 deletions snuba/replacers/errors_replacer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
_hashify,
)
from snuba.replacers.projects_query_flags import ProjectsQueryFlags
from snuba.replacers.replacements_and_expiry import (
get_config_auto_replacements_bypass_projects,
)
from snuba.replacers.replacer_processor import Replacement as ReplacementBase
from snuba.replacers.replacer_processor import (
ReplacementMessage,
Expand Down Expand Up @@ -203,9 +206,17 @@ def process_message(
raise InvalidMessageType("Invalid message type: {}".format(type_))

if processed is not None:
bypass_projects = get_config("replacements_bypass_projects", "[]")
projects = json.loads(cast(str, bypass_projects))
if processed.get_project_id() in projects:
manual_bypass_projects = get_config("replacements_bypass_projects", "[]")
auto_bypass_projects = list(
get_config_auto_replacements_bypass_projects(datetime.now()).keys()
)
projects_to_skip = auto_bypass_projects
if manual_bypass_projects is not None:
try:
projects_to_skip.extend(json.loads(manual_bypass_projects))
except Exception as e:
logger.exception(e)
if processed.get_project_id() in projects_to_skip:
# For a persistent non rate limited logger
logger.info(
f"Skipping replacement for project. Data {message}, Partition: {message.metadata.partition_index}, Offset: {message.metadata.offset}",
Expand Down
71 changes: 71 additions & 0 deletions snuba/replacers/replacements_and_expiry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import annotations

import logging
import typing
from datetime import datetime, timedelta
from typing import Mapping, Sequence

logger = logging.getLogger(__name__)


from snuba.redis import RedisClientKey, get_redis_client
from snuba.state import get_int_config

redis_client = get_redis_client(RedisClientKey.REPLACEMENTS_STORE)
config_auto_replacements_bypass_projects_hash = (
"snuba-config-auto-replacements-bypass-projects-hash"
)

REPLACEMENTS_EXPIRY_WINDOW_MINUTES_KEY = "replacements_expiry_window_minutes"


def set_config_auto_replacements_bypass_projects(
new_project_ids: Sequence[int], curr_time: datetime
) -> None:
try:
projects_within_expiry = get_config_auto_replacements_bypass_projects(curr_time)
expiry_window = typing.cast(
int, get_int_config(key=REPLACEMENTS_EXPIRY_WINDOW_MINUTES_KEY, default=5)
)
pipeline = redis_client.pipeline()
for project_id in new_project_ids:
if project_id not in projects_within_expiry:
expiry = curr_time + timedelta(minutes=expiry_window)
pipeline.hset(
config_auto_replacements_bypass_projects_hash,
project_id,
expiry.isoformat(),
)
pipeline.execute()
except Exception as e:
logger.exception(e)


def _retrieve_projects_from_redis() -> Mapping[int, datetime]:
try:
return {
int(k.decode("utf-8")): datetime.fromisoformat(v.decode("utf-8"))
for k, v in redis_client.hgetall(
config_auto_replacements_bypass_projects_hash
).items()
}
except Exception as e:
logger.exception(e)
return {}


def get_config_auto_replacements_bypass_projects(
curr_time: datetime,
) -> Mapping[int, datetime]:
curr_projects = _retrieve_projects_from_redis()

valid_projects = {}
pipeline = redis_client.pipeline()
for project_id in curr_projects:
if curr_projects[project_id] < curr_time:
pipeline.hdel(config_auto_replacements_bypass_projects_hash, project_id)
else:
valid_projects[project_id] = curr_projects[project_id]
pipeline.execute()

return valid_projects
3 changes: 0 additions & 3 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,6 @@ class RedisClusters(TypedDict):
ENTITY_CONFIG_FILES_GLOB = f"{CONFIG_FILES_PATH}/**/entities/*.yaml"
DATASET_CONFIG_FILES_GLOB = f"{CONFIG_FILES_PATH}/**/dataset.yaml"

# Counter utility class window size in minutes
COUNTER_WINDOW_SIZE_MINUTES = 10


# Slicing Configuration

Expand Down
14 changes: 9 additions & 5 deletions snuba/utils/bucket_timer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import annotations

import typing
from collections import defaultdict
from datetime import datetime, timedelta
from typing import List, MutableMapping

from snuba import environment, settings, state
from snuba import environment, state
from snuba.state import get_int_config
from snuba.utils.metrics.wrapper import MetricsWrapper

metrics = MetricsWrapper(environment.metrics, "bucket_timer")
Expand All @@ -22,8 +24,6 @@ def ceil_minute(time: datetime) -> datetime:

Buckets = MutableMapping[datetime, MutableMapping[int, timedelta]]

COUNTER_WINDOW_SIZE = timedelta(minutes=settings.COUNTER_WINDOW_SIZE_MINUTES)


class Counter:
"""
Expand All @@ -39,11 +39,15 @@ def __init__(self, consumer_group: str) -> None:

percentage = state.get_config("project_quota_time_percentage", 1.0)
assert isinstance(percentage, float)
self.limit = COUNTER_WINDOW_SIZE * percentage
counter_window_size_minutes = typing.cast(
int, get_int_config(key="counter_window_size_minutes", default=10)
)
self.counter_window_size = timedelta(minutes=counter_window_size_minutes)
self.limit = self.counter_window_size * percentage

def __trim_expired_buckets(self, now: datetime) -> None:
current_minute = floor_minute(now)
window_start = current_minute - COUNTER_WINDOW_SIZE
window_start = current_minute - self.counter_window_size
new_buckets: Buckets = {}
for min, dict in self.buckets.items():
if min >= window_start:
Expand Down
1 change: 1 addition & 0 deletions tests/consumers/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def _generate_tests() -> Iterator[Case]:


@pytest.mark.parametrize("case", _generate_tests())
@pytest.mark.redis_db
def test_all_schemas(case: Case) -> None:
"""
"Assert" that no message processor crashes under the example payloads in
Expand Down
Loading

0 comments on commit 9f0ffea

Please sign in to comment.