api tests are passing, reworked pipeline interface a bit
volokluev committed Mar 20, 2024
1 parent 3d4395f commit 763507b
Showing 4 changed files with 134 additions and 39 deletions.
4 changes: 4 additions & 0 deletions snuba/datasets/plans/storage_plan_builder.py
@@ -26,6 +26,7 @@
ReadableTableStorage,
StorageNotAvailable,
)
from snuba.datasets.storages.storage_key import StorageKey
from snuba.query.allocation_policies import AllocationPolicy
from snuba.query.data_source.simple import Table
from snuba.query.logical import Query as LogicalQuery
@@ -93,6 +94,7 @@ def get_query_data_source(
allocation_policies: list[AllocationPolicy],
final: bool,
sampling_rate: Optional[float],
storage_key: StorageKey,
) -> Table:
assert isinstance(relational_source, TableSource)
return Table(
@@ -102,6 +104,7 @@
final=final,
sampling_rate=sampling_rate,
mandatory_conditions=relational_source.get_mandatory_conditions(),
storage_key=storage_key,
)


@@ -201,6 +204,7 @@ def build_and_rank_plans(
allocation_policies=storage.get_allocation_policies(),
final=query.get_final(),
sampling_rate=query.get_sample(),
storage_key=storage.get_storage_key(),
)
)

18 changes: 15 additions & 3 deletions snuba/pipeline/query_pipeline.py
@@ -111,15 +111,27 @@ class QueryPipelineStage(Generic[Tin, Tout]):
>>> return QueryPipelineResult(None, e)
"""

def _process_error(self, error: Exception) -> Union[Tout, Exception]:
"""default behaviour is to just pass through to the next stage of the pipeline
Can be overridden to do something else"""
return error

@abstractmethod
def _execute(self, input: QueryPipelineResult[Tin]) -> QueryPipelineResult[Tout]:
def _process_data(self, data: Tin) -> Tout:
raise NotImplementedError

def execute(self, input: QueryPipelineResult[Tin]) -> QueryPipelineResult[Tout]:
if input.error:
# Forward the error to next stage of pipeline
return QueryPipelineResult(None, input.error)
return self._execute(input)
res = self._process_error(input.error)
if isinstance(res, Exception):
return QueryPipelineResult(data=None, error=res)
else:
return QueryPipelineResult(data=res, error=None)
try:
return QueryPipelineResult(data=self._process_data(input.data), error=None)
except Exception as e:
return QueryPipelineResult(data=None, error=e)


class InvalidQueryPipelineResult(Exception):
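For illustration, a minimal sketch of a stage written against the reworked interface (the ParseIntStage below is hypothetical and not part of this commit): subclasses now implement _process_data, and may override _process_error, instead of overriding _execute; execute() turns raised exceptions into QueryPipelineResult errors and forwards incoming errors.

from snuba.pipeline.query_pipeline import QueryPipelineResult, QueryPipelineStage


class ParseIntStage(QueryPipelineStage[str, int]):
    """Hypothetical stage used only to illustrate the new interface."""

    def _process_data(self, data: str) -> int:
        # Any exception raised here is caught by execute() and returned
        # as QueryPipelineResult(data=None, error=e) instead of propagating.
        return int(data)


ok = ParseIntStage().execute(QueryPipelineResult(data="42", error=None))
assert ok.data == 42 and ok.error is None

bad = ParseIntStage().execute(QueryPipelineResult(data="not a number", error=None))
assert isinstance(bad.error, ValueError)

# An incoming error skips _process_data and is passed along unchanged
# (or transformed, if _process_error is overridden).
forwarded = ParseIntStage().execute(QueryPipelineResult(data=None, error=ValueError("upstream")))
assert forwarded.data is None and isinstance(forwarded.error, ValueError)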
2 changes: 2 additions & 0 deletions snuba/query/data_source/simple.py
@@ -7,6 +7,7 @@
from snuba.clickhouse.columns import ColumnSet as PhysicalColumnSet
from snuba.datasets.entities.entity_data_model import EntityColumnSet
from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.storages.storage_key import StorageKey
from snuba.query.allocation_policies import DEFAULT_PASSTHROUGH_POLICY, AllocationPolicy
from snuba.query.data_source import DataSource
from snuba.query.expressions import FunctionCall
@@ -58,6 +59,7 @@ class Table(SimpleDataSource):

table_name: str
schema: PhysicalColumnSet
storage_key: StorageKey
# By default a table has a regular passthrough policy.
# This is overridden by the query pipeline if there
# is one defined on the storage.
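Because the new storage_key field has no default, anything constructing a Table directly now has to supply one. A rough sketch under that assumption — the table name, empty schema, and storage key are illustrative, StorageKey is assumed to accept a registered name string, and the remaining Table fields are assumed to keep their defaults:

from snuba.clickhouse.columns import ColumnSet
from snuba.datasets.storages.storage_key import StorageKey
from snuba.query.data_source.simple import Table

# Illustrative values only; any registered storage key would do.
table = Table(
    table_name="errors_local",
    schema=ColumnSet([]),
    storage_key=StorageKey("errors"),
)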
149 changes: 113 additions & 36 deletions snuba/web/query.py
@@ -3,7 +3,6 @@
import logging
import textwrap
from dataclasses import replace
from functools import partial
from math import floor
from typing import Any, MutableMapping, Optional, Union

@@ -14,14 +13,14 @@
from snuba.attribution.attribution_info import AttributionInfo
from snuba.clickhouse.formatter.query import format_query
from snuba.clickhouse.query import Query
from snuba.query.logical import Query as LogicalQuery
from snuba.clickhouse.query_inspector import TablesCollector
from snuba.datasets.dataset import Dataset
from snuba.datasets.factory import get_dataset_name
from snuba.pipeline.query_pipeline import QueryPipelineResult
from snuba.query.composite import CompositeQuery
from snuba.query.data_source.simple import Table
from snuba.query.exceptions import QueryPlanException
from snuba.query.logical import Query as LogicalQuery
from snuba.query.query_settings import QuerySettings
from snuba.querylog import record_query
from snuba.querylog.query_metadata import (
@@ -136,72 +135,150 @@ def _run_query_pipeline(
"""


record_missing_use_case_id(request, dataset)
record_subscription_created_missing_tenant_ids(request)

from snuba.pipeline.query_pipeline import QueryPipelineStage
from snuba.datasets.plans.query_plan import QueryRunner
from snuba.pipeline.composite import CompositeExecutionPipeline
from snuba.clusters.cluster import Cluster
from snuba.datasets.entities.factory import get_entity
from snuba.pipeline.composite import CompositeExecutionPipeline
from snuba.pipeline.query_pipeline import QueryPipelineStage


class EntityAndStoragePipelineStage(QueryPipelineStage[Request, tuple[Query, QuerySettings, Reader, str]]):
def _execute(self, input: QueryPipelineResult[Request]) -> QueryPipelineResult[tuple[Query, QuerySettings, Reader, str]]:
class EntityAndStoragePipelineStage(
QueryPipelineStage[Request, tuple[Query, QuerySettings]]
):
def _process_data(self, data: Request) -> tuple[Query, QuerySettings]:
_clickhouse_query = None
_reader = None
_cluster_name = None
_query_settings = None

def _query_runner(
clickhouse_query: Union[Query, CompositeQuery[Table]],
query_settings: QuerySettings,
reader: Reader,
cluster_name: str,
) -> QueryResult:
nonlocal _clickhouse_query, _reader, _cluster_name, _query_settings
nonlocal _clickhouse_query, _query_settings
_clickhouse_query = clickhouse_query
_reader = reader
_query_settings = query_settings
_cluster_name = cluster_name
if isinstance(input.data.query, LogicalQuery):
entity = get_entity(input.data.query.get_from_clause().key)
execution_pipeline = entity.get_query_pipeline_builder().build_execution_pipeline(
request, _query_runner

if isinstance(data.query, LogicalQuery):
entity = get_entity(data.query.get_from_clause().key)
execution_pipeline = (
entity.get_query_pipeline_builder().build_execution_pipeline(
request, _query_runner
)
)
else:
execution_pipeline = CompositeExecutionPipeline(
input.data.query, input.data.query_settings, _query_runner
data.query, data.query_settings, _query_runner
)

execution_pipeline.execute()
return QueryPipelineResult(data=(_clickhouse_query, _query_settings, _reader, _cluster_name), error=None)


class ExecutionStage(QueryPipelineStage[tuple[Query, QuerySettings, Reader, str], QueryResult]):
def _execute(self, input: QueryPipelineResult[Query]) -> QueryPipelineResult[QueryResult]:
# run the query processors and stuff
return (_clickhouse_query, _query_settings)

from snuba.datasets.plans.cluster_selector import ColumnBasedStorageSliceSelector
from snuba.datasets.slicing import is_storage_set_sliced
from snuba.datasets.storage import ReadableTableStorage
from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.query import ProcessableQuery
from snuba.query.data_source.join import IndividualNode, JoinClause, JoinVisitor
from snuba.query.data_source.visitor import DataSourceVisitor

class StorageKeyJoinFinder(JoinVisitor[StorageKey, Table]):
"""
Produces all the viable ClickhouseQueryPlans for each subquery
in the join.
"""

def visit_individual_node(self, node: IndividualNode[Table]) -> StorageKey:
if isinstance(node.data_source, ProcessableQuery):
return node.data_source.get_from_clause().storage_key
else:
return node.data_source.storage_key

def visit_join_clause(self, node: JoinClause[Table]) -> StorageKey:
left_storage_key = node.left_node.accept(self)
right_storage_key = node.right_node.accept(self)
if is_storage_set_sliced(
get_storage(left_storage_key).get_storage_set_key()
):
return left_storage_key
return right_storage_key

class StorageKeyFinder(DataSourceVisitor[StorageKey, Table]):
"""
Given a query, finds the storage_key from which to get the cluster.
In the case of a join, it will select an arbitrary storage key from
the from_clause. Storages that are sliced are given higher priority.
"""

def _visit_simple_source(self, data_source: Table) -> StorageKey:
return data_source.storage_key

def _visit_join(self, data_source: JoinClause[Table]) -> StorageKey:
return data_source.accept(StorageKeyJoinFinder())

def _visit_simple_query(
self, data_source: ProcessableQuery[Table]
) -> StorageKey:
return data_source.get_from_clause().storage_key

def _visit_composite_query(
self, data_source: CompositeQuery[Table]
) -> StorageKey:
return self.visit(data_source.get_from_clause())

class ExecutionStage(QueryPipelineStage[tuple[Query, QuerySettings], QueryResult]):
def get_cluster(self, query: Query, settings: QuerySettings) -> Cluster:
storage_key = StorageKeyFinder().visit(query)
storage = get_storage(storage_key)
if is_storage_set_sliced(storage.get_storage_set_key()):
with sentry_sdk.start_span(
op="build_plan.sliced_storage", description="select_storage"
):
assert (
self.__partition_key_column_name is not None
), "partition key column name must be defined for a sliced storage"
assert isinstance(storage, ReadableTableStorage)
return ColumnBasedStorageSliceSelector(
storage=storage.get_storage_key(),
storage_set=storage.get_storage_set_key(),
partition_key_column_name=self.__partition_key_column_name,
).select_cluster(query, settings)
return storage.get_cluster()

def _process_data(self, data: tuple[Query, QuerySettings]) -> QueryResult:
clickhouse_query = data[0]
query_settings = data[1]
cluster = self.get_cluster(clickhouse_query, query_settings)
if request.query_settings.get_dry_run():
return _dry_run_query_runner(
clickhouse_query=input.data[0],
query_settings=input.data[1],
reader=Reader(input.data[0].get_from_clause().key),
cluster_name=input.data[0].get_from_clause().key.cluster,
clickhouse_query=data[0],
query_settings=data[1],
reader=Reader(data[0].get_from_clause().key),
cluster_name=data[0].get_from_clause().key.cluster,
)
else:
return _run_and_apply_column_names(
return _run_and_apply_column_names(
timer=timer,
query_metadata=query_metadata,
attribution_info=request.attribution_info,
robust=robust,
concurrent_queries_gauge=concurrent_queries_gauge,
clickhouse_query=input.data[0],
query_settings=input.data[1],
reader=input.data[2],
cluster_name=input.data[3]
clickhouse_query=clickhouse_query,
query_settings=query_settings,
reader=cluster.get_reader(),
cluster_name=cluster.get_clickhouse_cluster_name() or "",
)

clickhouse_query = EntityAndStoragePipelineStage().execute(QueryPipelineResult(data=request, error=None))
return ExecutionStage().execute(clickhouse_query)
clickhouse_query = EntityAndStoragePipelineStage().execute(
QueryPipelineResult(data=request, error=None)
)
res = ExecutionStage().execute(clickhouse_query)
if res.error:
raise res.error
return res.data


def record_missing_use_case_id(request: Request, dataset: Dataset) -> None:
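The tail of _run_query_pipeline above chains the stages by passing one stage's QueryPipelineResult into the next and raising any error only at the end. The same pattern, reduced to two hypothetical stages:

from snuba.pipeline.query_pipeline import QueryPipelineResult, QueryPipelineStage


class ParseStage(QueryPipelineStage[str, int]):
    # Hypothetical first stage: parse the input.
    def _process_data(self, data: str) -> int:
        return int(data)


class DoubleStage(QueryPipelineStage[int, int]):
    # Hypothetical second stage: transform the first stage's output.
    def _process_data(self, data: int) -> int:
        return data * 2


first = ParseStage().execute(QueryPipelineResult(data="21", error=None))
second = DoubleStage().execute(first)
if second.error:
    raise second.error
assert second.data == 42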
