Merge branch 'master' into volo/eap/virtual_columns
volokluev committed Sep 12, 2024
2 parents d2e63df + bdfe450 commit 7077059
Showing 19 changed files with 438 additions and 10 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
@@ -30,6 +30,8 @@ RUN set -ex; \
runtimeDeps=' \
curl \
libjemalloc2 \
gdb \
heaptrack \
'; \
apt-get update; \
apt-get install -y $buildDeps $runtimeDeps --no-install-recommends; \
4 changes: 4 additions & 0 deletions docker_entrypoint.sh
@@ -18,4 +18,8 @@ else
printf "\n${help_result}"
fi

if [ -n "${ENABLE_HEAPTRACK:-}" ]; then
set -- heaptrack "$@"
fi

exec "$@"
10 changes: 8 additions & 2 deletions rust_snuba/src/processors/eap_spans.rs
@@ -52,7 +52,8 @@ struct EAPSpan {
name: String, //aka description

sampling_factor: f64,
sampling_weight: f64,
sampling_weight: f64, //remove eventually
sampling_weight_2: u64,
sign: u8, //1 for additions, -1 for deletions - for this worker it should be 1

#(
@@ -101,7 +102,8 @@ impl From<FromSpanMessage> for EAPSpan {
retention_days: from.retention_days,
name: from.description.unwrap_or_default(),

sampling_weight: 1.,
sampling_weight: 1., //remove eventually
sampling_weight_2: 1,
sampling_factor: 1.,
sign: 1,

@@ -153,6 +155,7 @@ impl From<FromSpanMessage> for EAPSpan {
if k == "client_sample_rate" && v.value != 0.0 {
res.sampling_factor = v.value;
res.sampling_weight = 1.0 / v.value;
res.sampling_weight_2 = (1.0 / v.value) as u64;
} else {
insert_num(k.clone(), v.value);
}
@@ -217,6 +220,9 @@ mod tests {
"measurements": {
"num_of_spans": {
"value": 50.0
},
"client_sample_rate": {
"value": 0.01
}
},
"organization_id": 1,
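To make the new column concrete, here is a hedged sketch of the sampling math above: client_sample_rate is the fraction of spans kept, so each stored span stands in for 1/rate original spans, and sampling_weight_2 is that weight truncated to an unsigned integer.

    def sampling_fields(client_sample_rate: float) -> tuple[float, float, int]:
        # Mirrors the Rust processor above (names match the EAPSpan struct).
        sampling_factor = client_sample_rate
        sampling_weight = 1.0 / client_sample_rate         # legacy Float64 column
        sampling_weight_2 = int(1.0 / client_sample_rate)  # new UInt64 column
        return sampling_factor, sampling_weight, sampling_weight_2

    # A 1% sample rate gives (0.01, 100.0, 100), matching the snapshot below.
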
@@ -19,8 +19,9 @@ expression: span
"exclusive_time_ms": 0.228,
"retention_days": 90,
"name": "/api/0/relays/projectconfigs/",
"sampling_factor": 1.0,
"sampling_weight": 1.0,
"sampling_factor": 0.01,
"sampling_weight": 100.0,
"sampling_weight_2": 100,
"sign": 1,
"attr_str_0": {
"relay_protocol_version": "3",
@@ -52,6 +52,7 @@ expression: snapshot_payload
"retention_days": 90,
"sampling_factor": 1.0,
"sampling_weight": 1.0,
"sampling_weight_2": 1,
"segment_id": 16045690984833335023,
"segment_name": "/organizations/:orgId/issues/",
"service": "1",
28 changes: 28 additions & 0 deletions snuba/cli/jobs.py
@@ -0,0 +1,28 @@
from typing import Tuple

import click

from snuba.manual_jobs.job_loader import JobLoader


@click.group()
def jobs() -> None:
pass


@jobs.command()
@click.argument("job_name")
@click.option(
"--dry_run",
default=True,
)
@click.argument("pairs", nargs=-1)
def run(*, job_name: str, dry_run: bool, pairs: Tuple[str, ...]) -> None:

kwargs = {}
for pair in pairs:
k, v = pair.split("=")
kwargs[k] = v

job_to_run = JobLoader.get_job_instance(job_name, dry_run, **kwargs)
job_to_run.execute()
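
A hypothetical invocation, assuming this click group is registered on the main snuba CLI entry point: the trailing key=value pairs are split and forwarded to the job as keyword arguments.

    snuba jobs run ToyJob --dry_run true key1=value1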
@@ -23,6 +23,7 @@ schema:
{ name: name, type: String },
{ name: sampling_factor, type: Float, args: { size: 64 } },
{ name: sampling_weight, type: Float, args: { size: 64 } },
{ name: sampling_weight_2, type: UInt, args: { size: 64 } },
{ name: sign, type: Int, args: { size: 8 } },
{ name: attr_str, type: Map, args: { key: { type: String }, value: { type: String } } },
{ name: attr_num, type: Map, args: { key: { type: String }, value: { type: Float, args: { size: 64 } } } },
@@ -39,6 +40,7 @@ storages:
from_col_name: timestamp
to_table_name: null
to_col_name: _sort_timestamp

subscriptables:
- mapper: SubscriptableHashBucketMapper
args:
@@ -29,6 +29,7 @@ schema:
{ name: name, type: String },
{ name: sampling_factor, type: Float, args: { size: 64 } },
{ name: sampling_weight, type: Float, args: { size: 64 } },
{ name: sampling_weight_2, type: UInt, args: { size: 64 } },
{ name: sign, type: Int, args: { size: 8 } },
{ name: attr_str_0, type: Map, args: { key: { type: String }, value: { type: String } } },
{ name: attr_str_1, type: Map, args: { key: { type: String }, value: { type: String } } },
20 changes: 20 additions & 0 deletions snuba/datasets/readiness_state.py
@@ -4,6 +4,26 @@


class ReadinessState(Enum):
"""
Readiness states are essentially feature flags for snuba datasets.
The readiness state defines whether or not a dataset is made available
in specific sentry environments.
Currently, sentry environments include the following:
* local/CI
* SaaS
* S4S
* Self-Hosted
* Single-Tenant
The following is a list of readiness states and the environments
they map to:
* limited -> local/CI
* experimental -> local/CI, S4S
* partial -> local/CI, SaaS, S4S
* deprecate -> local/CI, Self-Hosted
* complete -> local/CI, SaaS, S4S, Self-Hosted, Single-Tenant
"""

LIMITED = "limited"
DEPRECATE = "deprecate"
PARTIAL = "partial"
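A hedged illustration of the state-to-environment mapping described in the docstring (the environment names are the docstring's informal labels, not real identifiers from the codebase):

    ENVIRONMENTS_BY_READINESS_STATE = {
        "limited": {"local/CI"},
        "experimental": {"local/CI", "S4S"},
        "partial": {"local/CI", "SaaS", "S4S"},
        "deprecate": {"local/CI", "Self-Hosted"},
        "complete": {"local/CI", "SaaS", "S4S", "Self-Hosted", "Single-Tenant"},
    }

    def dataset_available(state: str, environment: str) -> bool:
        # A dataset is exposed in an environment iff its readiness state maps there.
        return environment in ENVIRONMENTS_BY_READINESS_STATE[state]
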
29 changes: 29 additions & 0 deletions snuba/manual_jobs/__init__.py
@@ -0,0 +1,29 @@
import os
from abc import ABC, abstractmethod
from typing import Any, cast

from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory


class Job(ABC, metaclass=RegisteredClass):
def __init__(self, dry_run: bool, **kwargs: Any) -> None:
self.dry_run = dry_run
for k, v in kwargs.items():
setattr(self, k, v)

@abstractmethod
def execute(self) -> None:
pass

@classmethod
def config_key(cls) -> str:
return cls.__name__

@classmethod
def get_from_name(cls, name: str) -> "Job":
return cast("Job", cls.class_from_name(name))


import_submodules_in_directory(
os.path.dirname(os.path.realpath(__file__)), "snuba.manual_jobs"
)
15 changes: 15 additions & 0 deletions snuba/manual_jobs/job_loader.py
@@ -0,0 +1,15 @@
from typing import Any, cast

from snuba.manual_jobs import Job


class JobLoader:
@staticmethod
def get_job_instance(class_name: str, dry_run: bool, **kwargs: Any) -> "Job":
job_type_class = Job.class_from_name(class_name)
if job_type_class is None:
raise Exception(
f"Job does not exist. Did you make a file {class_name}.py yet?"
)

return cast("Job", job_type_class(dry_run=dry_run, **kwargs))
19 changes: 19 additions & 0 deletions snuba/manual_jobs/toy_job.py
@@ -0,0 +1,19 @@
from typing import Any

import click

from snuba.manual_jobs import Job


class ToyJob(Job):
def __init__(self, dry_run: bool, **kwargs: Any):
super().__init__(dry_run, **kwargs)

def _build_query(self) -> str:
if self.dry_run:
return "dry run query"
else:
return "not dry run query"

def execute(self) -> None:
click.echo("executing query `" + self._build_query() + "`")
2 changes: 1 addition & 1 deletion snuba/pipeline/stages/query_processing.py
@@ -34,10 +34,10 @@ def _process_data(
if translated_storage_query:
return translated_storage_query

run_entity_validators(cast(EntityQuery, query), pipe_input.query_settings)
if isinstance(query, LogicalQuery) and isinstance(
query.get_from_clause(), Entity
):
run_entity_validators(cast(EntityQuery, query), pipe_input.query_settings)
return run_entity_processing_executor(query, pipe_input.query_settings)
elif isinstance(query, CompositeQuery):
# if we were not able to translate the storage query earlier and we got to this point, this is
41 changes: 38 additions & 3 deletions snuba/query/processors/logical/hash_bucket_functions.py
@@ -1,10 +1,11 @@
from typing import Sequence

from snuba.query.expressions import Column, Expression, FunctionCall
from snuba.query.expressions import Column, Expression, FunctionCall, Literal
from snuba.query.logical import Query
from snuba.query.processors.logical import LogicalQueryProcessor
from snuba.query.query_settings import QuerySettings
from snuba.utils.constants import ATTRIBUTE_BUCKETS
from snuba.utils.hashes import fnv_1a


class HashBucketFunctionTransformer(LogicalQueryProcessor):
@@ -17,6 +18,8 @@ class HashBucketFunctionTransformer(LogicalQueryProcessor):
This transformer converts mapKeys(attr_str) to arrayConcat(mapKeys(attr_str_0), mapKeys(attr_str_1), ...)
and the same for mapValues
It converts mapContains(attr_str, 'blah') to mapContains(attr_str_{hash('blah')%20}, 'blah')
"""

def __init__(
@@ -26,7 +29,7 @@ def __init__(
self.hash_bucket_names = hash_bucket_names

def process_query(self, query: Query, query_settings: QuerySettings) -> None:
def transform_expression(exp: Expression) -> Expression:
def transform_map_keys_and_values_expression(exp: Expression) -> Expression:
if not isinstance(exp, FunctionCall):
return exp

@@ -62,4 +65,36 @@ def transform_expression(exp: Expression) -> Expression:
),
)

query.transform_expressions(transform_expression)
def transform_map_contains_expression(exp: Expression) -> Expression:
if not isinstance(exp, FunctionCall):
return exp

if len(exp.parameters) != 2:
return exp

column = exp.parameters[0]
if not isinstance(column, Column):
return exp

if column.column_name not in self.hash_bucket_names:
return exp

if exp.function_name != "mapContains":
return exp

key = exp.parameters[1]
if not isinstance(key, Literal) or not isinstance(key.value, str):
return exp

bucket_idx = fnv_1a(key.value.encode("utf-8")) % ATTRIBUTE_BUCKETS
return FunctionCall(
alias=exp.alias,
function_name=exp.function_name,
parameters=(
Column(None, None, f"{column.column_name}_{bucket_idx}"),
key,
),
)

query.transform_expressions(transform_map_keys_and_values_expression)
query.transform_expressions(transform_map_contains_expression)
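
To see why the rewrite lands on a specific bucket, here is a sketch of the key-to-bucket computation, assuming snuba.utils.hashes.fnv_1a is the standard 32-bit FNV-1a and ATTRIBUTE_BUCKETS is 20 (per the "% 20" in the docstring):

    ATTRIBUTE_BUCKETS = 20  # assumed value of snuba.utils.constants.ATTRIBUTE_BUCKETS

    def fnv_1a(data: bytes) -> int:
        # Standard 32-bit FNV-1a: XOR each byte in, then multiply by the prime.
        h = 2166136261
        for byte in data:
            h ^= byte
            h = (h * 16777619) & 0xFFFFFFFF
        return h

    bucket = fnv_1a(b"blah") % ATTRIBUTE_BUCKETS
    # mapContains(attr_str, 'blah') becomes mapContains(attr_str_{bucket}, 'blah')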