Skip to content

Commit

Permalink
feat(compaction): per table vnode on compactor side (#19059)
Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k authored Oct 31, 2024
1 parent 8457de2 commit 6defe4c
Show file tree
Hide file tree
Showing 25 changed files with 804 additions and 622 deletions.
2 changes: 1 addition & 1 deletion src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ pub async fn compute_node_serve(
compactor_context,
hummock_meta_client.clone(),
storage.sstable_object_id_manager().clone(),
storage.filter_key_extractor_manager().clone(),
storage.compaction_catalog_manager_ref().clone(),
);
sub_tasks.push((handle, shutdown_sender));
}
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/hummock/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ pub async fn setup_compute_env_with_metric(
compactor_streams_change_tx,
)
.await;

let fake_host_address = HostAddress {
host: "127.0.0.1".to_string(),
port,
Expand Down
28 changes: 22 additions & 6 deletions src/storage/benches/bench_compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

Expand All @@ -32,6 +33,7 @@ use risingwave_object_store::object::object_metrics::ObjectStoreMetrics;
use risingwave_object_store::object::{InMemObjectStore, ObjectStore, ObjectStoreImpl};
use risingwave_pb::hummock::compact_task::PbTaskType;
use risingwave_pb::hummock::PbTableSchema;
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::compactor::compactor_runner::compact_and_build_sst;
use risingwave_storage::hummock::compactor::{
ConcatSstableIterator, DummyCompactionFilter, TaskConfig, TaskProgress,
Expand Down Expand Up @@ -133,8 +135,13 @@ async fn build_table(
policy: CachePolicy::Fill(CacheContext::Default),
},
);
let mut builder =
SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt);
let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
);
let value = b"1234567890123456789";
let mut full_key = test_key_of(0, epoch, TableId::new(0));
let table_key_len = full_key.user_key.table_key.len();
Expand Down Expand Up @@ -177,8 +184,14 @@ async fn build_table_2(
policy: CachePolicy::Fill(CacheContext::Default),
},
);
let mut builder =
SstableBuilder::<_, Xor16FilterBuilder>::for_test(sstable_object_id, writer, opt);

let table_id_to_vnode = HashMap::from_iter(vec![(table_id, VirtualNode::COUNT_FOR_TEST)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
);
let mut full_key = test_key_of(0, epoch, TableId::new(table_id));
let table_key_len = full_key.user_key.table_key.len();

Expand Down Expand Up @@ -273,8 +286,11 @@ async fn compact<I: HummockIterator<Direction = Forward>>(
bloom_false_positive: 0.001,
..Default::default()
};
let mut builder =
CapacitySplitTableBuilder::for_test(LocalTableBuilderFactory::new(32, sstable_store, opt));
let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);
let mut builder = CapacitySplitTableBuilder::for_test(
LocalTableBuilderFactory::new(32, sstable_store, opt),
compaction_catalog_agent_ref,
);

let task_config = task_config.unwrap_or_else(|| TaskConfig {
key_range: KeyRange::inf(),
Expand Down
16 changes: 15 additions & 1 deletion src/storage/benches/bench_multi_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::env;
use std::ops::Range;
use std::sync::atomic::AtomicU64;
Expand All @@ -24,11 +25,13 @@ use foyer::{Engine, HybridCacheBuilder};
use rand::random;
use risingwave_common::catalog::TableId;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
use risingwave_hummock_sdk::key::{FullKey, UserKey};
use risingwave_hummock_sdk::sstable_info::SstableInfo;
use risingwave_object_store::object::{
InMemObjectStore, ObjectStore, ObjectStoreImpl, S3ObjectStore,
};
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::iterator::{ConcatIterator, ConcatIteratorInner, HummockIterator};
use risingwave_storage::hummock::multi_builder::{CapacitySplitTableBuilder, TableBuilderFactory};
use risingwave_storage::hummock::value::HummockValue;
Expand Down Expand Up @@ -83,7 +86,11 @@ impl<F: SstableWriterFactory> TableBuilderFactory for LocalTableBuilderFactory<F
.create_sst_writer(id, writer_options)
.await
.unwrap();
let builder = SstableBuilder::for_test(id, writer, self.options.clone());
let table_id_to_vnode = HashMap::from_iter(vec![(
TableId::default().into(),
VirtualNode::COUNT_FOR_TEST,
)]);
let builder = SstableBuilder::for_test(id, writer, self.options.clone(), table_id_to_vnode);

Ok(builder)
}
Expand Down Expand Up @@ -192,6 +199,8 @@ fn bench_builder(

let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });

let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);

let mut group = c.benchmark_group("bench_multi_builder");
group
.sample_size(SAMPLE_COUNT)
Expand All @@ -205,6 +214,7 @@ fn bench_builder(
StreamingSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
})
});
Expand All @@ -217,6 +227,7 @@ fn bench_builder(
BatchSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
})
});
Expand Down Expand Up @@ -249,13 +260,16 @@ fn bench_table_scan(c: &mut Criterion) {
let object_store = Arc::new(ObjectStoreImpl::InMem(store));
let sstable_store = runtime.block_on(async { generate_sstable_store(object_store).await });

let compaction_catalog_agent_ref = CompactionCatalogAgent::for_test(vec![0]);

let ssts = runtime.block_on(async {
build_tables(CapacitySplitTableBuilder::for_test(
LocalTableBuilderFactory::new(
1,
BatchSstableWriterFactory::new(sstable_store.clone()),
get_builder_options(capacity_mb),
),
compaction_catalog_agent_ref.clone(),
))
.await
});
Expand Down
29 changes: 9 additions & 20 deletions src/storage/compactor/src/compactor_observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::ObserverState;
use risingwave_pb::catalog::Table;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::SubscribeResponse;
use risingwave_storage::filter_key_extractor::{
FilterKeyExtractorImpl, FilterKeyExtractorManagerRef,
};
use risingwave_storage::compaction_catalog_manager::CompactionCatalogManagerRef;

pub struct CompactorObserverNode {
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
compaction_catalog_manager: CompactionCatalogManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
version: u64,
}
Expand Down Expand Up @@ -83,36 +78,30 @@ impl ObserverState for CompactorObserverNode {

impl CompactorObserverNode {
pub fn new(
filter_key_extractor_manager: FilterKeyExtractorManagerRef,
compaction_catalog_manager: CompactionCatalogManagerRef,
system_params_manager: LocalSystemParamsManagerRef,
) -> Self {
Self {
filter_key_extractor_manager,
compaction_catalog_manager,
system_params_manager,
version: 0,
}
}

fn handle_catalog_snapshot(&mut self, tables: Vec<Table>) {
let all_filter_key_extractors: HashMap<u32, Arc<FilterKeyExtractorImpl>> = tables
.iter()
.map(|t| (t.id, Arc::new(FilterKeyExtractorImpl::from_table(t))))
.collect();
self.filter_key_extractor_manager
.sync(all_filter_key_extractors);
self.compaction_catalog_manager
.sync(tables.into_iter().map(|t| (t.id, t)).collect());
}

fn handle_catalog_notification(&mut self, operation: Operation, table_catalog: Table) {
match operation {
Operation::Add | Operation::Update => {
self.filter_key_extractor_manager.update(
table_catalog.id,
Arc::new(FilterKeyExtractorImpl::from_table(&table_catalog)),
);
self.compaction_catalog_manager
.update(table_catalog.id, table_catalog);
}

Operation::Delete => {
self.filter_key_extractor_manager.remove(table_catalog.id);
self.compaction_catalog_manager.remove(table_catalog.id);
}

_ => panic!("receive an unsupported notify {:?}", operation),
Expand Down
14 changes: 6 additions & 8 deletions src/storage/compactor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use risingwave_pb::common::WorkerType;
use risingwave_pb::compactor::compactor_service_server::CompactorServiceServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_rpc_client::{GrpcCompactorProxyClient, MetaClient};
use risingwave_storage::filter_key_extractor::{
FilterKeyExtractorManager, RemoteTableAccessor, RpcFilterKeyExtractorManager,
use risingwave_storage::compaction_catalog_manager::{
CompactionCatalogManager, RemoteTableAccessor,
};
use risingwave_storage::hummock::compactor::{
new_compaction_await_tree_reg_ref, CompactionAwaitTreeRegRef, CompactionExecutor,
Expand Down Expand Up @@ -212,12 +212,13 @@ pub async fn compactor_serve(
compactor_metrics,
) = prepare_start_parameters(config.clone(), system_params_reader.clone()).await;

let filter_key_extractor_manager = Arc::new(RpcFilterKeyExtractorManager::new(Box::new(
let compaction_catalog_manager_ref = Arc::new(CompactionCatalogManager::new(Box::new(
RemoteTableAccessor::new(meta_client.clone()),
)));

let system_params_manager = Arc::new(LocalSystemParamsManager::new(system_params_reader));
let compactor_observer_node = CompactorObserverNode::new(
filter_key_extractor_manager.clone(),
compaction_catalog_manager_ref.clone(),
system_params_manager.clone(),
);
let observer_manager =
Expand All @@ -234,9 +235,6 @@ pub async fn compactor_serve(
hummock_meta_client.clone(),
storage_opts.sstable_id_remote_fetch_number,
));
let filter_key_extractor_manager = FilterKeyExtractorManager::RpcFilterKeyExtractorManager(
filter_key_extractor_manager.clone(),
);

let compaction_executor = Arc::new(CompactionExecutor::new(
opts.compaction_worker_threads_number,
Expand All @@ -263,7 +261,7 @@ pub async fn compactor_serve(
compactor_context.clone(),
hummock_meta_client.clone(),
sstable_object_id_manager.clone(),
filter_key_extractor_manager.clone(),
compaction_catalog_manager_ref,
),
];

Expand Down
12 changes: 5 additions & 7 deletions src/storage/hummock_test/src/bin/replay/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use risingwave_hummock_trace::{
use risingwave_meta::hummock::test_utils::setup_compute_env;
use risingwave_meta::hummock::MockHummockMetaClient;
use risingwave_object_store::object::build_remote_object_store;
use risingwave_storage::filter_key_extractor::{
FakeRemoteTableAccessor, RpcFilterKeyExtractorManager,
use risingwave_storage::compaction_catalog_manager::{
CompactionCatalogManager, FakeRemoteTableAccessor,
};
use risingwave_storage::hummock::{HummockStorage, SstableStore, SstableStoreConfig};
use risingwave_storage::monitor::{CompactorMetrics, HummockStateStoreMetrics, ObjectStoreMetrics};
Expand Down Expand Up @@ -166,16 +166,14 @@ async fn create_replay_hummock(r: Record, args: &Args) -> Result<impl GlobalRepl
)
};

let key_filter_manager = Arc::new(RpcFilterKeyExtractorManager::new(Box::new(
FakeRemoteTableAccessor {},
)));

let storage = HummockStorage::new(
storage_opts,
sstable_store,
hummock_meta_client.clone(),
notification_client,
key_filter_manager,
Arc::new(CompactionCatalogManager::new(Box::new(
FakeRemoteTableAccessor {},
))),
state_store_metrics,
compactor_metrics,
None,
Expand Down
Loading

0 comments on commit 6defe4c

Please sign in to comment.