Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account for CPU tuple cost in DecompressChunk #7551

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/gh_matrix_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import json
import os
import random
import subprocess
from ci_settings import (
PG14_EARLIEST,
Expand Down Expand Up @@ -309,11 +310,13 @@ def macos_config(overrides):
sys.exit(1)

if tests:
to_run = list(tests) * 20
random.shuffle(to_run)
m["include"].append(
build_debug_config(
{
"coverage": False,
"installcheck_args": f'TESTS="{" ".join(list(tests) * 20)}"',
"installcheck_args": f'TESTS="{" ".join(to_run)}"',
"name": "Flaky Check Debug",
"pg": PG16_LATEST,
"pginstallcheck": False,
Expand All @@ -324,7 +327,7 @@ def macos_config(overrides):
build_debug_config(
{
"coverage": False,
"installcheck_args": f'TESTS="{" ".join(list(tests) * 20)}"',
"installcheck_args": f'TESTS="{" ".join(to_run)}"',
"name": "Flaky Check Debug",
"pg": PG17_LATEST,
"pginstallcheck": False,
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/chunkwise_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ tsl_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_re
bool can_sort = grouping_is_sortable(parse->groupClause);

/* Is hashing possible ? */
bool can_hash = grouping_is_hashable(parse->groupClause) &&
bool can_hash = parse->groupClause != NIL && grouping_is_hashable(parse->groupClause) &&
!ts_is_gapfill_path(linitial(output_rel->pathlist)) && enable_hashagg;

Assert(extra != NULL);
Expand Down
8 changes: 5 additions & 3 deletions tsl/src/nodes/decompress_chunk/decompress_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -430,12 +430,14 @@ static void
cost_decompress_chunk(PlannerInfo *root, Path *path, Path *compressed_path)
{
/* startup_cost is cost before fetching first tuple */
if (compressed_path->rows > 0)
path->startup_cost = compressed_path->total_cost / compressed_path->rows;
const double compressed_rows = Max(1, compressed_path->rows);
path->startup_cost =
compressed_path->startup_cost +
(compressed_path->total_cost - compressed_path->startup_cost) / compressed_rows;

/* total_cost is cost for fetching all tuples */
path->rows = compressed_rows * TARGET_COMPRESSED_BATCH_SIZE;
path->total_cost = compressed_path->total_cost + path->rows * cpu_tuple_cost;
path->rows = compressed_path->rows * TARGET_COMPRESSED_BATCH_SIZE;
}

/* Smoothstep function S1 (the h01 cubic Hermite spline). */
Expand Down
25 changes: 19 additions & 6 deletions tsl/src/nodes/vector_agg/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,21 @@ vector_agg_exec(CustomScanState *node)

compressed_batch_set_compressed_tuple(dcontext, batch_state, compressed_slot);

/*
* This is required for the proper EXPLAIN output for the underlying
* DecompressChunk node. In normal Postgres plan execution, it is
* updated by InstrStopNode() and used in InstrEndLoop().
*/
if (dcontext->ps->instrument)
{
dcontext->ps->instrument->running = true;
}

/*
* Skip the batch if it was fully filtered out by the vectorized filters.
*/
if (batch_state->next_batch_row >= batch_state->total_batch_rows)
{
/* This batch was fully filtered out. */
continue;
}

Expand All @@ -302,13 +314,14 @@ vector_agg_exec(CustomScanState *node)
const int not_filtered_rows =
arrow_num_valid(batch_state->vector_qual_result, batch_state->total_batch_rows);
InstrCountFiltered1(dcontext->ps, batch_state->total_batch_rows - not_filtered_rows);

/*
* This is required for the proper EXPLAIN output for the underlying
* DecompressChunk node. In normal Postgres plan execution, it is
* updated by InstrStopNode() and used in InstrEndLoop().
*/
if (dcontext->ps->instrument)
{
/*
* These values are normally updated by InstrStopNode(), and are
* required so that the calculations in InstrEndLoop() run properly.
*/
dcontext->ps->instrument->running = true;
dcontext->ps->instrument->tuplecount += not_filtered_rows;
}

Expand Down
70 changes: 35 additions & 35 deletions tsl/test/expected/compress_auto_sparse_index.out
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ select count(compress_chunk(x)) from show_chunks('sparse') x;
1
(1 row)

explain select * from sparse where value = 1;
explain (costs off) select * from sparse where value = 1;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk (cost=0.27..20.20 rows=76000 width=12)
Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: (value = '1'::double precision)
-> Seq Scan on compress_hyper_2_2_chunk (cost=0.00..20.20 rows=76 width=76)
-> Seq Scan on compress_hyper_2_2_chunk
Filter: ((_ts_meta_v2_min_value <= '1'::double precision) AND (_ts_meta_v2_max_value >= '1'::double precision))
(4 rows)

Expand All @@ -40,12 +40,12 @@ select count(compress_chunk(decompress_chunk(x))) from show_chunks('sparse') x;
1
(1 row)

explain select * from sparse where value = 1;
QUERY PLAN
--------------------------------------------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk (cost=0.02..17.80 rows=780000 width=12)
explain (costs off) select * from sparse where value = 1;
QUERY PLAN
------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: (value = '1'::double precision)
-> Seq Scan on compress_hyper_2_3_chunk (cost=0.00..17.80 rows=780 width=76)
-> Seq Scan on compress_hyper_2_3_chunk
(3 rows)

reset timescaledb.auto_sparse_indexes;
Expand All @@ -55,33 +55,33 @@ select count(compress_chunk(decompress_chunk(x))) from show_chunks('sparse') x;
1
(1 row)

explain select * from sparse where value = 1;
explain (costs off) select * from sparse where value = 1;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk (cost=0.27..20.20 rows=76000 width=12)
Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: (value = '1'::double precision)
-> Seq Scan on compress_hyper_2_4_chunk (cost=0.00..20.20 rows=76 width=76)
-> Seq Scan on compress_hyper_2_4_chunk
Filter: ((_ts_meta_v2_min_value <= '1'::double precision) AND (_ts_meta_v2_max_value >= '1'::double precision))
(4 rows)

-- Should survive renames.
alter table sparse rename column value to wert;
explain select * from sparse where wert = 1;
explain (costs off) select * from sparse where wert = 1;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk (cost=0.27..20.20 rows=76000 width=12)
Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: (wert = '1'::double precision)
-> Seq Scan on compress_hyper_2_4_chunk (cost=0.00..20.20 rows=76 width=76)
-> Seq Scan on compress_hyper_2_4_chunk
Filter: ((_ts_meta_v2_min_wert <= '1'::double precision) AND (_ts_meta_v2_max_wert >= '1'::double precision))
(4 rows)

alter table sparse rename column wert to value;
explain select * from sparse where value = 1;
explain (costs off) select * from sparse where value = 1;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk (cost=0.27..20.20 rows=76000 width=12)
Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: (value = '1'::double precision)
-> Seq Scan on compress_hyper_2_4_chunk (cost=0.00..20.20 rows=76 width=76)
-> Seq Scan on compress_hyper_2_4_chunk
Filter: ((_ts_meta_v2_min_value <= '1'::double precision) AND (_ts_meta_v2_max_value >= '1'::double precision))
(4 rows)

Expand All @@ -94,12 +94,12 @@ select count(compress_chunk(decompress_chunk(x))) from show_chunks('sparse') x;
1
(1 row)

explain select * from sparse where value = 1;
QUERY PLAN
--------------------------------------------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk (cost=0.02..17.80 rows=780000 width=12)
explain (costs off) select * from sparse where value = 1;
QUERY PLAN
------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: (value = '1'::double precision)
-> Seq Scan on compress_hyper_2_5_chunk (cost=0.00..17.80 rows=780 width=76)
-> Seq Scan on compress_hyper_2_5_chunk
(3 rows)

-- Not for other index types.
Expand All @@ -111,12 +111,12 @@ select count(compress_chunk(decompress_chunk(x))) from show_chunks('sparse') x;
1
(1 row)

explain select * from sparse where value = 1;
QUERY PLAN
--------------------------------------------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk (cost=0.02..17.80 rows=780000 width=12)
explain (costs off) select * from sparse where value = 1;
QUERY PLAN
------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: (value = '1'::double precision)
-> Seq Scan on compress_hyper_2_6_chunk (cost=0.00..17.80 rows=780 width=76)
-> Seq Scan on compress_hyper_2_6_chunk
(3 rows)

-- When the chunk is recompressed without index, no sparse index is created.
Expand All @@ -127,12 +127,12 @@ select count(compress_chunk(decompress_chunk(x))) from show_chunks('sparse') x;
1
(1 row)

explain select * from sparse where value = 1;
QUERY PLAN
--------------------------------------------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk (cost=0.02..17.80 rows=780000 width=12)
explain (costs off) select * from sparse where value = 1;
QUERY PLAN
------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: (value = '1'::double precision)
-> Seq Scan on compress_hyper_2_7_chunk (cost=0.00..17.80 rows=780 width=76)
-> Seq Scan on compress_hyper_2_7_chunk
(3 rows)

-- Long column names.
Expand All @@ -149,12 +149,12 @@ select count(compress_chunk(x)) from show_chunks('sparse') x;
1
(1 row)

explain select * from sparse where Abcdef012345678_Bbcdef012345678_Cbcdef012345678_Dbcdef0 = 1;
explain (costs off) select * from sparse where Abcdef012345678_Bbcdef012345678_Cbcdef012345678_Dbcdef0 = 1;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------
Custom Scan (DecompressChunk) on _hyper_1_1_chunk (cost=3.48..10.45 rows=3000 width=264)
Custom Scan (DecompressChunk) on _hyper_1_1_chunk
Vectorized Filter: (abcdef012345678_bbcdef012345678_cbcdef012345678_dbcdef0 = 1)
-> Seq Scan on compress_hyper_2_8_chunk (cost=0.00..10.45 rows=3 width=2092)
-> Seq Scan on compress_hyper_2_8_chunk
Filter: ((_ts_meta_v2_min_9218_abcdef012345678_bbcdef012345678_cbcdef0 <= 1) AND (_ts_meta_v2_max_9218_abcdef012345678_bbcdef012345678_cbcdef0 >= 1))
(4 rows)

16 changes: 8 additions & 8 deletions tsl/test/expected/compression.out
Original file line number Diff line number Diff line change
Expand Up @@ -1859,17 +1859,17 @@ SELECT sum(cpu) FROM f_sensor_data;
Output: (PARTIAL sum(_hyper_37_73_chunk.cpu))
Workers Planned: 4
-> Parallel Append
-> Partial Aggregate
Output: PARTIAL sum(_hyper_37_73_chunk.cpu)
-> Parallel Seq Scan on _timescaledb_internal._hyper_37_73_chunk
Output: _hyper_37_73_chunk.cpu
-> Custom Scan (VectorAgg)
Output: (PARTIAL sum(_hyper_37_73_chunk.cpu))
Grouping Policy: all compressed batches
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_73_chunk
Output: _hyper_37_73_chunk.cpu
-> Parallel Seq Scan on _timescaledb_internal.compress_hyper_38_74_chunk
Output: compress_hyper_38_74_chunk._ts_meta_count, compress_hyper_38_74_chunk.sensor_id, compress_hyper_38_74_chunk._ts_meta_min_1, compress_hyper_38_74_chunk._ts_meta_max_1, compress_hyper_38_74_chunk."time", compress_hyper_38_74_chunk.cpu, compress_hyper_38_74_chunk.temperature
-> Partial Aggregate
Output: PARTIAL sum(_hyper_37_73_chunk.cpu)
-> Parallel Seq Scan on _timescaledb_internal._hyper_37_73_chunk
Output: _hyper_37_73_chunk.cpu
(17 rows)

:explain
Expand All @@ -1880,15 +1880,15 @@ SELECT * FROM f_sensor_data WHERE sensor_id > 100;
Output: _hyper_37_73_chunk."time", _hyper_37_73_chunk.sensor_id, _hyper_37_73_chunk.cpu, _hyper_37_73_chunk.temperature
Workers Planned: 3
-> Parallel Append
-> Parallel Index Scan using _hyper_37_73_chunk_f_sensor_data_time_sensor_id_idx on _timescaledb_internal._hyper_37_73_chunk
Output: _hyper_37_73_chunk."time", _hyper_37_73_chunk.sensor_id, _hyper_37_73_chunk.cpu, _hyper_37_73_chunk.temperature
Index Cond: (_hyper_37_73_chunk.sensor_id > 100)
-> Custom Scan (DecompressChunk) on _timescaledb_internal._hyper_37_73_chunk
Output: _hyper_37_73_chunk."time", _hyper_37_73_chunk.sensor_id, _hyper_37_73_chunk.cpu, _hyper_37_73_chunk.temperature
Filter: (_hyper_37_73_chunk.sensor_id > 100)
-> Parallel Index Scan using compress_hyper_38_74_chunk_sensor_id__ts_meta_min_1__ts_met_idx on _timescaledb_internal.compress_hyper_38_74_chunk
Output: compress_hyper_38_74_chunk._ts_meta_count, compress_hyper_38_74_chunk.sensor_id, compress_hyper_38_74_chunk._ts_meta_min_1, compress_hyper_38_74_chunk._ts_meta_max_1, compress_hyper_38_74_chunk."time", compress_hyper_38_74_chunk.cpu, compress_hyper_38_74_chunk.temperature
Index Cond: (compress_hyper_38_74_chunk.sensor_id > 100)
-> Parallel Index Scan using _hyper_37_73_chunk_f_sensor_data_time_sensor_id_idx on _timescaledb_internal._hyper_37_73_chunk
Output: _hyper_37_73_chunk."time", _hyper_37_73_chunk.sensor_id, _hyper_37_73_chunk.cpu, _hyper_37_73_chunk.temperature
Index Cond: (_hyper_37_73_chunk.sensor_id > 100)
(13 rows)

-- Test non-partial paths below append are not executed multiple times
Expand Down Expand Up @@ -2802,7 +2802,7 @@ COPY compressed_table (time,a,b,c) FROM stdin;
ERROR: duplicate key value violates unique constraint "_hyper_49_108_chunk_compressed_table_index"
\set ON_ERROR_STOP 1
COPY compressed_table (time,a,b,c) FROM stdin;
SELECT * FROM compressed_table;
SELECT * FROM compressed_table ORDER BY a;
time | a | b | c
------------------------------------+----+---+---
Thu Feb 29 01:00:00 2024 PST | 5 | 1 | 1
Expand Down
Loading
Loading