Skip to content

Commit

Permalink
chore: rebase main
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed May 15, 2024
1 parent 7cca788 commit 253110d
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 41 deletions.
1 change: 1 addition & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ pub enum Error {
#[snafu(display("Failed to build time range filters for value: {:?}", timestamp))]
BuildTimeRangeFilter {
timestamp: Timestamp,
#[snafu(implicit)]
location: Location,
},
}
Expand Down
86 changes: 45 additions & 41 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, warn};
use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use datafusion_common::arrow::array::BooleanArray;
use datafusion_common::arrow::buffer::BooleanBuffer;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_expr::Expr;
use datafusion_expr::{Expr, Operator};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use itertools::Itertools;
Expand Down Expand Up @@ -248,7 +246,7 @@ impl ParquetReaderBuilder {
};

if let Some(time_range) = &self.time_range {
predicate.extend(time_range_to_predicate(*time_range, &region_meta)?);
filters.extend(time_range_to_predicate(*time_range, &region_meta)?);
}

let codec = McmpRowCodec::new(
Expand Down Expand Up @@ -464,55 +462,47 @@ impl ParquetReaderBuilder {
fn time_range_to_predicate(
time_range: TimestampRange,
metadata: &RegionMetadataRef,
) -> Result<Vec<SimpleFilterEvaluator>> {
) -> Result<Vec<SimpleFilterContext>> {
let ts_col = metadata.time_index_column();
let ts_col_id = ts_col.column_id;

let ts_lit = |val: i64| match ts_col
.column_schema
.data_type
.as_timestamp()
.unwrap()
.unit()
{
TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None),
TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(Some(val), None),
TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None),
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None),
let ts_to_filter = |op: Operator, timestamp: &Timestamp| {
let value = match timestamp.unit() {
TimeUnit::Second => ScalarValue::TimestampSecond(Some(timestamp.value()), None),
TimeUnit::Millisecond => {
ScalarValue::TimestampMillisecond(Some(timestamp.value()), None)
}
TimeUnit::Microsecond => {
ScalarValue::TimestampMicrosecond(Some(timestamp.value()), None)
}
TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(timestamp.value()), None),
};
let evaluator = SimpleFilterEvaluator::new(ts_col.column_schema.name.clone(), value, op)
.context(error::BuildTimeRangeFilterSnafu {
timestamp: *timestamp,
})?;
Ok(SimpleFilterContext::new(
evaluator,
ts_col_id,
SemanticType::Timestamp,
ts_col.column_schema.data_type.clone(),
))
};

let predicates = match (time_range.start(), time_range.end()) {
(Some(start), Some(end)) => {
vec![
SimpleFilterEvaluator::new(
ts_col.column_schema.name.clone(),
ts_lit(start.value()),
Operator::GtEq,
)
.context(error::BuildTimeRangeFilterSnafu { timestamp: *start })?,
SimpleFilterEvaluator::new(
ts_col.column_schema.name.clone(),
ts_lit(end.value()),
Operator::Lt,
)
.context(error::BuildTimeRangeFilterSnafu { timestamp: *end })?,
ts_to_filter(Operator::GtEq, start)?,
ts_to_filter(Operator::Lt, end)?,
]
}

(Some(start), None) => {
vec![SimpleFilterEvaluator::new(
ts_col.column_schema.name.clone(),
ts_lit(start.value()),
Operator::GtEq,
)
.context(error::BuildTimeRangeFilterSnafu { timestamp: *start })?]
vec![ts_to_filter(Operator::GtEq, start)?]
}

(None, Some(end)) => {
vec![SimpleFilterEvaluator::new(
ts_col.column_schema.name.clone(),
ts_lit(end.value()),
Operator::Lt,
)
.context(error::BuildTimeRangeFilterSnafu { timestamp: *end })?]
vec![ts_to_filter(Operator::Lt, end)?]
}
(None, None) => {
vec![]
Expand Down Expand Up @@ -642,6 +632,20 @@ pub(crate) struct SimpleFilterContext {
}

impl SimpleFilterContext {
fn new(
filter: SimpleFilterEvaluator,
column_id: ColumnId,
semantic_type: SemanticType,
data_type: ConcreteDataType,
) -> Self {
Self {
filter,
column_id,
semantic_type,
data_type,
}
}

/// Creates a context for the `expr`.
///
/// Returns None if the column to filter doesn't exist in the SST metadata or the
Expand Down

0 comments on commit 253110d

Please sign in to comment.