Skip to content

Commit

Permalink
Separate derived compression settings
Browse files Browse the repository at this point in the history
  • Loading branch information
lucagiac81 committed Jan 11, 2024
1 parent 45a3eca commit 0f12bbe
Show file tree
Hide file tree
Showing 17 changed files with 105 additions and 90 deletions.
2 changes: 1 addition & 1 deletion db/blob/blob_file_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ BlobFileBuilder::BlobFileBuilder(
immutable_options_(immutable_options),
min_blob_size_(mutable_cf_options->min_blob_size),
blob_file_size_(mutable_cf_options->blob_file_size),
blob_compressor_(mutable_cf_options->blob_compressor),
blob_compressor_(mutable_cf_options->derived_blob_compressor),
prepopulate_blob_cache_(mutable_cf_options->prepopulate_blob_cache),
file_options_(file_options),
db_id_(std::move(db_id)),
Expand Down
24 changes: 13 additions & 11 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,18 @@ void GetIntTblPropCollectorFactory(
Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
MutableCFOptions moptions(cf_options);
ImmutableCFOptions ioptions(cf_options);
if (moptions.compressor && !moptions.compressor->Supported()) {
if (moptions.derived_compressor &&
!moptions.derived_compressor->Supported()) {
return Status::InvalidArgument("Compression type " +
moptions.compressor->GetId() +
moptions.derived_compressor->GetId() +
" is not linked with the binary.");
} else if (moptions.bottommost_compressor &&
!moptions.bottommost_compressor->Supported()) {
return Status::InvalidArgument("Compression type " +
moptions.bottommost_compressor->GetId() +
" is not linked with the binary.");
} else if (!moptions.compressor_per_level.empty()) {
for (const auto& compressor : moptions.compressor_per_level) {
} else if (moptions.derived_bottommost_compressor &&
!moptions.derived_bottommost_compressor->Supported()) {
return Status::InvalidArgument(
"Compression type " + moptions.derived_bottommost_compressor->GetId() +
" is not linked with the binary.");
} else if (!moptions.derived_compressor_per_level.empty()) {
for (const auto& compressor : moptions.derived_compressor_per_level) {
if (compressor == nullptr) {
return Status::InvalidArgument("Compression type is invalid.");
} else if (!compressor->Supported()) {
Expand Down Expand Up @@ -170,9 +171,10 @@ Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) {
"should be nonzero if we're using zstd's dictionary generator.");
}
}
if (moptions.blob_compressor && !moptions.blob_compressor->Supported()) {
if (moptions.derived_blob_compressor &&
!moptions.derived_blob_compressor->Supported()) {
return Status::InvalidArgument("Blob compression type " +
moptions.blob_compressor->GetId() +
moptions.derived_blob_compressor->GetId() +
" is not linked with the binary.");
}
return Status::OK();
Expand Down
13 changes: 7 additions & 6 deletions db/compaction/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ std::shared_ptr<Compressor> GetCompressor(const VersionStorageInfo* vstorage,
// If bottommost_compression is set and we are compacting to the
// bottommost level then we should use it.
bool bottom_level = (level >= (vstorage->num_non_empty_levels() - 1));
if (moptions.bottommost_compressor != nullptr && bottom_level) {
return moptions.bottommost_compressor;
if (moptions.derived_bottommost_compressor != nullptr && bottom_level) {
return moptions.derived_bottommost_compressor;
}
// If the user has specified a different compression for each level,
// then pick the compression for that level.
if (!moptions.compressor_per_level.empty()) {
if (!moptions.derived_compressor_per_level.empty()) {
// It is possible for level_ to be -1; in that case, we use level
// 0's compression. This occurs mostly in backwards compatibility
// situations when the builder doesn't know what level the file
Expand All @@ -101,7 +101,8 @@ std::shared_ptr<Compressor> GetCompressor(const VersionStorageInfo* vstorage,
assert(level == 0 || level >= base_level);
int lvl = std::max(0, level - base_level + 1);
int idx = std::min(
static_cast<int>(moptions.compressor_per_level.size()) - 1, lvl);
static_cast<int>(moptions.derived_compressor_per_level.size()) - 1,
lvl);
// If not specified directly by the user, compressors in
// derived_compressor_per_level are instantiated using compression_opts. If
// the user enabled bottommost_compression_opts, we need to create a new
// compressor
Expand All @@ -112,10 +113,10 @@ std::shared_ptr<Compressor> GetCompressor(const VersionStorageInfo* vstorage,
moptions.compression_per_level[idx],
moptions.bottommost_compression_opts);
} else {
return moptions.compressor_per_level[idx];
return moptions.derived_compressor_per_level[idx];
}
} else {
return moptions.compressor;
return moptions.derived_compressor;
}
}

Expand Down
4 changes: 2 additions & 2 deletions db/compaction/compaction_picker_fifo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
{comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */,
0 /* max compaction bytes, not applicable */,
0 /* output path ID */, mutable_cf_options.compressor,
0 /* output path ID */, mutable_cf_options.derived_compressor,
Temperature::kUnknown, 0 /* max_subcompactions */, {},
/* is manual */ false, /* trim_ts */ "",
vstorage->CompactionScore(0),
Expand Down Expand Up @@ -416,7 +416,7 @@ Compaction* FIFOCompactionPicker::PickTemperatureChangeCompaction(
vstorage, ioptions_, mutable_cf_options, mutable_db_options,
std::move(inputs), 0, 0 /* output file size limit */,
0 /* max compaction bytes, not applicable */, 0 /* output path ID */,
mutable_cf_options.compressor, compaction_target_temp,
mutable_cf_options.derived_compressor, compaction_target_temp,
/* max_subcompactions */ 0, {}, /* is manual */ false, /* trim_ts */ "",
vstorage->CompactionScore(0),
/* is deletion compaction */ false, /* l0_files_might_overlap */ true,
Expand Down
8 changes: 4 additions & 4 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ std::shared_ptr<Compressor> GetCompressionFlush(
// latency overhead is not offset by saving much space.
if (ioptions.compaction_style == kCompactionStyleUniversal) {
if (moptions.compaction_options_universal.compression_size_percent < 0) {
return moptions.compressor;
return moptions.derived_compressor;
} else {
return BuiltinCompressor::GetCompressor(kNoCompression);
}
} else if (moptions.compressor_per_level.empty()) {
return moptions.compressor;
} else if (moptions.derived_compressor_per_level.empty()) {
return moptions.derived_compressor;
} else {
return moptions.compressor_per_level[0];
return moptions.derived_compressor_per_level[0];
}
}

Expand Down
7 changes: 4 additions & 3 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1931,8 +1931,9 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
->compaction_style) /* output file size limit, not applicable */
,
LLONG_MAX /* max compaction bytes, not applicable */,
0 /* output path ID, not applicable */, mutable_cf_options.compressor,
Temperature::kUnknown, 0 /* max_subcompactions, not applicable */,
0 /* output path ID, not applicable */,
mutable_cf_options.derived_compressor, Temperature::kUnknown,
0 /* max_subcompactions, not applicable */,
{} /* grandparents, not applicable */, false /* is manual */,
"" /* trim_ts */, -1 /* score, not applicable */,
false /* is deletion compaction, not applicable */,
Expand Down Expand Up @@ -4155,7 +4156,7 @@ void DBImpl::BuildCompactionJobInfo(
newf.first, file_number, meta.oldest_blob_file_number});
}
compaction_job_info->blob_compression_type =
c->mutable_cf_options()->blob_compressor->GetCompressionType();
c->mutable_cf_options()->derived_blob_compressor->GetCompressionType();

// Update BlobFilesInfo.
for (const auto& blob_file : c->edit()->GetBlobFileAdditions()) {
Expand Down
5 changes: 3 additions & 2 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -550,8 +550,9 @@ void ExternalSstFileIngestionJob::CreateEquivalentFileIngestingCompactions() {
*/
,
LLONG_MAX /* max compaction bytes, not applicable */,
0 /* output path ID, not applicable */, mutable_cf_options.compressor,
Temperature::kUnknown, 0 /* max_subcompaction, not applicable */,
0 /* output path ID, not applicable */,
mutable_cf_options.derived_compressor, Temperature::kUnknown,
0 /* max_subcompaction, not applicable */,
{} /* grandparents, not applicable */, false /* is manual */,
"" /* trim_ts */, -1 /* score, not applicable */,
false /* is deletion compaction, not applicable */,
Expand Down
2 changes: 1 addition & 1 deletion db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
info->table_properties = table_properties_;
info->flush_reason = flush_reason_;
info->blob_compression_type =
mutable_cf_options_.blob_compressor->GetCompressionType();
mutable_cf_options_.derived_blob_compressor->GetCompressionType();

// Update BlobFilesInfo.
for (const auto& blob_file : edit_->GetBlobFileAdditions()) {
Expand Down
8 changes: 4 additions & 4 deletions db/table_properties_collector_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ void MakeBuilder(
std::unique_ptr<FSWritableFile> wf(new test::StringSink);
writable->reset(
new WritableFileWriter(std::move(wf), "" /* don't care */, EnvOptions()));
TableBuilderOptions tboptions(ioptions, moptions, internal_comparator,
int_tbl_prop_collector_factories,
moptions.compressor, kTestColumnFamilyId,
kTestColumnFamilyName, kTestLevel);
TableBuilderOptions tboptions(
ioptions, moptions, internal_comparator, int_tbl_prop_collector_factories,
moptions.derived_compressor, kTestColumnFamilyId, kTestColumnFamilyName,
kTestLevel);
builder->reset(NewTableBuilder(tboptions, writable->get()));
}
} // namespace
Expand Down
37 changes: 25 additions & 12 deletions options/cf_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1211,41 +1211,54 @@ void MutableCFOptions::RefreshDerivedOptions(int num_levels,
}
}

derived_compressor.reset();
if (compressor == nullptr) {
compressor =
derived_compressor =
BuiltinCompressor::GetCompressor(compression, compression_opts);
}
if (compressor == nullptr) {
compressor = BuiltinCompressor::GetCompressor(kSnappyCompression);
if (derived_compressor == nullptr) {
derived_compressor = BuiltinCompressor::GetCompressor(kSnappyCompression);
}
} else {
derived_compressor = compressor;
}

derived_bottommost_compressor.reset();
if (bottommost_compressor == nullptr) {
if (bottommost_compression != kDisableCompressionOption) {
if (bottommost_compression_opts.enabled) {
bottommost_compressor = BuiltinCompressor::GetCompressor(
derived_bottommost_compressor = BuiltinCompressor::GetCompressor(
bottommost_compression, bottommost_compression_opts);
} else {
bottommost_compressor = BuiltinCompressor::GetCompressor(
derived_bottommost_compressor = BuiltinCompressor::GetCompressor(
bottommost_compression, compression_opts);
}
}
} else {
derived_bottommost_compressor = bottommost_compressor;
}

derived_blob_compressor.reset();
if (blob_compressor == nullptr) {
if (blob_compression_type != kDisableCompressionOption) {
blob_compressor = BuiltinCompressor::GetCompressor(blob_compression_type,
compression_opts);
derived_blob_compressor = BuiltinCompressor::GetCompressor(
blob_compression_type, compression_opts);
}
}
if (blob_compressor == nullptr) {
blob_compressor = BuiltinCompressor::GetCompressor(kNoCompression);
if (derived_blob_compressor == nullptr) {
derived_blob_compressor =
BuiltinCompressor::GetCompressor(kNoCompression);
}
} else {
derived_blob_compressor = blob_compressor;
}

derived_compressor_per_level.clear();
if (compressor_per_level.empty()) {
for (auto type : compression_per_level) {
compressor_per_level.push_back(
derived_compressor_per_level.push_back(
BuiltinCompressor::GetCompressor(type, compression_opts));
}
} else {
derived_compressor_per_level = compressor_per_level;
}
}

Expand Down
4 changes: 4 additions & 0 deletions options/cf_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ struct MutableCFOptions {
// Derived options
// Per-level target file size.
std::vector<uint64_t> max_file_size;
std::shared_ptr<Compressor> derived_compressor;
std::shared_ptr<Compressor> derived_bottommost_compressor;
std::shared_ptr<Compressor> derived_blob_compressor;
std::vector<std::shared_ptr<Compressor>> derived_compressor_per_level;
};

uint64_t MultiplyCheckOverflow(uint64_t op1, double op2);
Expand Down
26 changes: 15 additions & 11 deletions options/options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,14 +188,17 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
MutableCFOptions moptions(*this);
ConfigOptions config_options;

if (!moptions.compressor_per_level.empty()) {
for (unsigned int i = 0; i < moptions.compressor_per_level.size(); i++) {
ROCKS_LOG_HEADER(log, " Options.compression[%d]: %s", i,
moptions.compressor_per_level[i]->GetId().c_str());
if (!moptions.derived_compressor_per_level.empty()) {
for (unsigned int i = 0; i < moptions.derived_compressor_per_level.size();
i++) {
ROCKS_LOG_HEADER(
log, " Options.compression[%d]: %s", i,
moptions.derived_compressor_per_level[i]->GetId().c_str());
}
} else if (moptions.compressor) {
ROCKS_LOG_HEADER(log, " Options.compression: %s",
moptions.compressor->ToString(config_options).c_str());
} else if (moptions.derived_compressor) {
ROCKS_LOG_HEADER(
log, " Options.compression: %s",
moptions.derived_compressor->ToString(config_options).c_str());
} else {
ROCKS_LOG_HEADER(
log, " Options.compression: %s",
Expand Down Expand Up @@ -228,10 +231,11 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
"%" PRIu64,
compression_opts.max_dict_buffer_bytes);
}
if (moptions.bottommost_compressor) {
if (moptions.derived_bottommost_compressor) {
ROCKS_LOG_HEADER(
log, " Options.bottommost_compression: %s",
moptions.bottommost_compressor->ToString(config_options).c_str());
moptions.derived_bottommost_compressor->ToString(config_options)
.c_str());
} else {
ROCKS_LOG_HEADER(
log, " Options.bottommost_compression: %s",
Expand Down Expand Up @@ -451,9 +455,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log,
" Options.blob_file_size: %" PRIu64,
blob_file_size);
if (moptions.blob_compressor) {
if (moptions.derived_blob_compressor) {
ROCKS_LOG_HEADER(log, " Options.blob_compression: %s",
moptions.blob_compressor->GetId().c_str());
moptions.derived_blob_compressor->GetId().c_str());
} else {
ROCKS_LOG_HEADER(
log, " Options.blob_compression_type: %s",
Expand Down
2 changes: 2 additions & 0 deletions options/options_settable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
sizeof(std::vector<std::shared_ptr<Compressor>>)},
{offsetof(struct MutableCFOptions, max_file_size),
sizeof(std::vector<uint64_t>)},
{offsetof(struct MutableCFOptions, derived_compressor_per_level),
sizeof(std::vector<std::shared_ptr<Compressor>>)},
};

// For all memory used for options, pre-fill every char. Otherwise, the
Expand Down
2 changes: 1 addition & 1 deletion table/block_based/data_block_hash_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2,
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(
ioptions, moptions, internal_comparator,
&int_tbl_prop_collector_factories, moptions.compressor,
&int_tbl_prop_collector_factories, moptions.derived_compressor,
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
column_family_name, level_),
file_writer.get()));
Expand Down
12 changes: 6 additions & 6 deletions table/sst_file_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,14 @@ Status SstFileWriter::Open(const std::string& file_path) {
sst_file->SetIOPriority(r->io_priority);

std::shared_ptr<Compressor> compressor;
if (r->mutable_cf_options.bottommost_compressor != nullptr) {
compressor = r->mutable_cf_options.bottommost_compressor;
} else if (r->mutable_cf_options.compressor_per_level.empty()) {
compressor = r->mutable_cf_options.compressor;
if (r->mutable_cf_options.derived_bottommost_compressor != nullptr) {
compressor = r->mutable_cf_options.derived_bottommost_compressor;
} else if (r->mutable_cf_options.derived_compressor_per_level.empty()) {
compressor = r->mutable_cf_options.derived_compressor;
} else {
// Use the compression of the last level if we have per level compression
auto levels = r->mutable_cf_options.compressor_per_level.size();
compressor = r->mutable_cf_options.compressor_per_level[levels - 1];
auto levels = r->mutable_cf_options.derived_compressor_per_level.size();
compressor = r->mutable_cf_options.derived_compressor_per_level[levels - 1];
}

IntTblPropCollectorFactories int_tbl_prop_collector_factories;
Expand Down
16 changes: 9 additions & 7 deletions table/table_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ class TableConstructor : public Constructor {
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, internal_comparator,
&int_tbl_prop_collector_factories,
moptions.compressor, kUnknownColumnFamily,
moptions.derived_compressor, kUnknownColumnFamily,
column_family_name, level_),
file_writer_.get()));

Expand Down Expand Up @@ -3995,9 +3995,10 @@ TEST_P(BlockBasedTableTest, NoFileChecksum) {
f.CreateWritableFile();
std::unique_ptr<TableBuilder> builder;
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(
ioptions, moptions, *comparator, &int_tbl_prop_collector_factories,
moptions.compressor, kUnknownColumnFamily, column_family_name, level),
TableBuilderOptions(ioptions, moptions, *comparator,
&int_tbl_prop_collector_factories,
moptions.derived_compressor, kUnknownColumnFamily,
column_family_name, level),
f.GetFileWriter()));
ASSERT_OK(f.ResetTableBuilder(std::move(builder)));
f.AddKVtoKVMap(1000);
Expand Down Expand Up @@ -4030,9 +4031,10 @@ TEST_P(BlockBasedTableTest, Crc32cFileChecksum) {
f.SetFileChecksumGenerator(checksum_crc32c_gen1.release());
std::unique_ptr<TableBuilder> builder;
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(
ioptions, moptions, *comparator, &int_tbl_prop_collector_factories,
moptions.compressor, kUnknownColumnFamily, column_family_name, level),
TableBuilderOptions(ioptions, moptions, *comparator,
&int_tbl_prop_collector_factories,
moptions.derived_compressor, kUnknownColumnFamily,
column_family_name, level),
f.GetFileWriter()));
ASSERT_OK(f.ResetTableBuilder(std::move(builder)));
f.AddKVtoKVMap(1000);
Expand Down
Loading

0 comments on commit 0f12bbe

Please sign in to comment.