Skip to content

Commit

Permalink
fix: properly clean tiered state upon flush (#3281)
Browse files Browse the repository at this point in the history
* fix: properly clean tiered state upon flush

The bug was around I/O-pending entries that were not properly cleaned during flush.
This PR simplifies the logic around tiered-storage handling during flush: the
cleanup is now always performed in the synchronous part of the command.

In addition, this PR improves error logging in tests if dragonfly process exits with an error.
Finally, a test is added that makes sure pending tiered items are cleaned up during the flush call.

Fixes #3252
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored Jul 8, 2024
1 parent 4fd6ba6 commit fba902d
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 107 deletions.
39 changes: 20 additions & 19 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -708,14 +708,10 @@ void DbSlice::FlushSlots(cluster::SlotRanges slot_ranges) {
}

void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
// Async cleanup can only be performed if no tiered entries exist
bool async_cleanup = true;
for (DbIndex index : indexes) {
async_cleanup &= db_arr_[index]->stats.tiered_entries == 0;
}
bool clear_tiered = owner_->tiered_storage() != nullptr;

if (!async_cleanup)
ClearEntriesOnFlush(indexes, db_arr_, false);
if (clear_tiered)
ClearOffloadedEntries(indexes, db_arr_);

DbTableArray flush_db_arr(db_arr_.size());
for (DbIndex index : indexes) {
Expand All @@ -729,9 +725,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
}

CHECK(fetched_items_.empty());
auto cb = [this, async_cleanup, indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
if (async_cleanup)
ClearEntriesOnFlush(indexes, flush_db_arr, true);
auto cb = [this, indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
flush_db_arr.clear();
ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
ServerState::kGlibcmalloc);
Expand Down Expand Up @@ -1408,24 +1402,31 @@ void DbSlice::InvalidateSlotWatches(const cluster::SlotSet& slot_ids) {
}
}

void DbSlice::ClearEntriesOnFlush(absl::Span<const DbIndex> indices, const DbTableArray& db_arr,
bool async) {
for (auto index : indices) {
void DbSlice::ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr) {
// Currently being used only for tiered storage.
TieredStorage* tiered_storage = shard_owner()->tiered_storage();
string scratch;
for (DbIndex index : indices) {
const auto& db_ptr = db_arr[index];
if (!db_ptr || db_ptr->stats.tiered_entries == 0)
if (!db_ptr)
continue;

// Delete all tiered entries
PrimeTable::Cursor cursor;
do {
cursor = db_ptr->prime.Traverse(cursor, [&](PrimeIterator it) {
if (it->second.IsExternal())
PerformDeletion(it, db_ptr.get());
if (it->second.IsExternal()) {
tiered_storage->Delete(index, &it->second);
} else if (it->second.HasIoPending()) {
tiered_storage->CancelStash(index, it->first.GetSlice(&scratch), &it->second);
}
});
} while (cursor && db_ptr->stats.tiered_entries > 0);
} while (cursor);

// Wait for delete operations to finish in sync
while (!async && db_ptr->stats.tiered_entries > 0) {
// Wait for delete operations to finish in sync.
// TODO: the logic inside tiered_storage that updates tiered_entries is somewhat fragile.
// To revisit it, otherwise we may have deadlocks around this code.
while (db_ptr->stats.tiered_entries > 0) {
LOG_EVERY_T(ERROR, 0.5) << "Long wait for tiered entry delete on flush";
ThisFiber::SleepFor(1ms);
}
Expand Down
6 changes: 2 additions & 4 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -494,10 +494,8 @@ class DbSlice {
// Invalidate all watched keys for given slots. Used on FlushSlots.
void InvalidateSlotWatches(const cluster::SlotSet& slot_ids);

// Properly clear db_arr before deleting it. If async is set, it's called from a detached fiber
// after swapping the db.
void ClearEntriesOnFlush(absl::Span<const DbIndex> indices, const DbTableArray& db_arr,
bool async);
// Clear tiered storage entries for the specified indices.
void ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr);

void PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table);

Expand Down
7 changes: 7 additions & 0 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,13 @@ TEST_F(DflyEngineTest, MemcacheFlags) {
ASSERT_EQ(resp, "OK");
MCResponse resp2 = RunMC(MP::GET, "key");
EXPECT_THAT(resp2, ElementsAre("VALUE key 42 3", "bar", "END"));

ASSERT_EQ(Run("resp", {"flushdb"}), "OK");
pp_->AwaitFiberOnAll([](auto*) {
if (auto* shard = EngineShard::tlocal(); shard) {
EXPECT_EQ(shard->db_slice().GetDBTable(0)->mcflag.size(), 0u);
}
});
}

TEST_F(DflyEngineTest, LimitMemory) {
Expand Down
121 changes: 64 additions & 57 deletions src/server/tiered_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,11 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
cache_fetched_ = absl::GetFlag(FLAGS_tiered_storage_cache_fetched);
}

// Called before overriding value with segment
void RecordAdded(DbTableStats* stats, const PrimeValue& pv, tiering::DiskSegment segment) {
stats->AddTypeMemoryUsage(pv.ObjType(), -pv.MallocUsed());
stats->tiered_entries++;
stats->tiered_used_bytes += segment.length;
}

// Called after setting new value in place of previous segment
void RecordDeleted(DbTableStats* stats, const PrimeValue& pv, tiering::DiskSegment segment) {
stats->AddTypeMemoryUsage(pv.ObjType(), pv.MallocUsed());
stats->tiered_entries--;
stats->tiered_used_bytes -= segment.length;
}

// Find entry by key in db_slice and store external segment in place of original value.
// Update memory stats
void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
if (auto pv = Find(key); pv) {
RecordAdded(db_slice_->MutableStats(key.first), *pv, segment);
RecordAdded(db_slice_->MutableStats(key.first), *pv, segment.length);

pv->SetIoPending(false);
pv->SetExternal(segment.offset, segment.length);
Expand Down Expand Up @@ -113,9 +99,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
if (!value.empty())
pv->SetString(value);

RecordDeleted(db_slice_->MutableStats(dbid), *pv, segment);

(value.empty() ? stats_.total_deletes : stats_.total_fetches)++;
RecordDeleted(db_slice_->MutableStats(dbid), *pv, segment.length);
}

// Find entry by key and store it's up-to-date value in place of external segment.
Expand All @@ -129,25 +113,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
}

// Load all values from bin by their hashes
void Defragment(tiering::DiskSegment segment, string_view value) {
// Note: Bin could've already been deleted, in that case DeleteBin returns an empty list
for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) {
// Search for key with the same hash and value pointing to the same segment.
// If it still exists, it must correspond to the value stored in this bin
auto predicate = [sub_segment = sub_segment](const PrimeKey& key, const PrimeValue& probe) {
return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == sub_segment;
};
auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate);
if (!IsValid(it))
continue;

stats_.total_defrags++;

// Cut out relevant part of value and restore it to memory
string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length);
SetInMemory(&it->second, dbid, sub_value, sub_segment);
}
}
void Defragment(tiering::DiskSegment segment, string_view value);

void ReportStashed(EntryId id, tiering::DiskSegment segment, error_code ec) override {
if (ec) {
Expand All @@ -159,21 +125,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
}

bool ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment,
bool modified) override {
if (id == EntryId{kFragmentedBin}) { // Generally we read whole bins only for defrag
Defragment(segment, value);
return true; // delete
}

if (!modified && !cache_fetched_)
return false;

if (SliceSnapshot::IsSnaphotInProgress())
return false;

SetInMemory(get<OpManager::KeyRef>(id), value, segment);
return true;
}
bool modified) override;

bool ReportDelete(tiering::DiskSegment segment) override {
if (OccupiesWholePages(segment.length))
Expand Down Expand Up @@ -203,17 +155,70 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
return IsValid(it) ? &it->second : nullptr;
}

// Account for a value migrating from memory to disk: one more tiered entry,
// `tiered_len` more bytes on disk, and the value's in-memory footprint is
// subtracted from the per-type memory stats.
// Called before the value is overridden with a disk segment.
void RecordAdded(DbTableStats* stats, const PrimeValue& pv, size_t tiered_len) {
  stats->tiered_entries++;
  stats->tiered_used_bytes += tiered_len;
  stats->AddTypeMemoryUsage(pv.ObjType(), -pv.MallocUsed());
}

// Account for a value returning from disk to memory: one fewer tiered entry,
// `tiered_len` fewer bytes on disk, and the value's in-memory footprint is
// added back to the per-type memory stats.
// Called after the new value has been set in place of the previous segment.
void RecordDeleted(DbTableStats* stats, const PrimeValue& pv, size_t tiered_len) {
  stats->tiered_entries--;
  stats->tiered_used_bytes -= tiered_len;
  stats->AddTypeMemoryUsage(pv.ObjType(), pv.MallocUsed());
}

bool cache_fetched_ = false;

struct {
size_t total_stashes = 0, total_fetches = 0, total_cancels = 0, total_deletes = 0;
size_t total_defrags = 0; // included in total_fetches
size_t total_stashes = 0, total_cancels = 0, total_fetches = 0;
size_t total_defrags = 0;
} stats_;

TieredStorage* ts_;
DbSlice* db_slice_;
};

// Restore all live values of a small bin back into memory, given the bin's raw
// on-disk contents (`value`) read from `segment`, and drop the bin's bookkeeping.
void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, string_view value) {
  // Note: Bin could've already been deleted, in that case DeleteBin returns an empty list
  for (auto [dbid, hash, sub_segment] : ts_->bins_->DeleteBin(segment, value)) {
    // Search for key with the same hash and value pointing to the same segment.
    // If it still exists, it must correspond to the value stored in this bin
    auto predicate = [sub_segment = sub_segment](const PrimeKey& key, const PrimeValue& probe) {
      return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == sub_segment;
    };
    auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate);
    // The entry may have been deleted or overwritten since the bin was stashed.
    if (!IsValid(it))
      continue;

    stats_.total_defrags++;

    // Cut out relevant part of value and restore it to memory
    string_view sub_value = value.substr(sub_segment.offset - segment.offset, sub_segment.length);
    SetInMemory(&it->second, dbid, sub_value, sub_segment);
  }
}

// Handle a completed disk read. Returns true iff the disk segment should be
// deleted (because the value was restored to memory or the bin was defragmented).
bool TieredStorage::ShardOpManager::ReportFetched(EntryId id, string_view value,
                                                  tiering::DiskSegment segment, bool modified) {
  stats_.total_fetches++;

  // Whole bins are read only in order to defragment them.
  if (id == EntryId{kFragmentedBin}) {
    Defragment(segment, value);
    return true;  // the bin's segment can be deleted
  }

  // Keep the value offloaded unless it was modified or caching of reads is on.
  const bool keep_offloaded = !modified && !cache_fetched_;
  if (keep_offloaded)
    return false;

  // Uploading during snapshotting would break the snapshot's view of the data.
  if (SliceSnapshot::IsSnaphotInProgress())
    return false;

  SetInMemory(get<OpManager::KeyRef>(id), value, segment);
  return true;
}

TieredStorage::TieredStorage(DbSlice* db_slice, size_t max_size)
: op_manager_{make_unique<ShardOpManager>(this, db_slice, max_size)},
bins_{make_unique<tiering::SmallBins>()} {
Expand Down Expand Up @@ -276,7 +281,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
return false;

// This invariant should always hold because ShouldStash tests for IoPending flag.
CHECK(!bins_->IsPending(dbid, key));
DCHECK(!bins_->IsPending(dbid, key));

// TODO: When we are low on memory we should introduce a back-pressure, to avoid OOMs
// with a lot of underutilized disk space.
Expand Down Expand Up @@ -310,9 +315,11 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {

// Delete the offloaded (external) value: free its disk segment and clear the
// external state from `value`, restoring it to an empty in-memory string.
// Defect fixed: the block contained both the pre-change statements
// (op_manager_->Delete / SetInMemory(value, dbid, "", segment)) and their
// replacements — diff residue that would double-issue the deletion. Only the
// post-commit statements are kept.
void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
  DCHECK(value->IsExternal());
  ++stats_.total_deletes;

  tiering::DiskSegment segment = value->GetExternalSlice();
  op_manager_->DeleteOffloaded(segment);
  // Empty string_view: the value becomes a plain empty in-memory string.
  op_manager_->SetInMemory(value, dbid, string_view{}, segment);
}

void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
Expand Down
1 change: 1 addition & 0 deletions src/server/tiered_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class TieredStorage {
unsigned write_depth_limit_ = 10;
struct {
uint64_t stash_overflow_cnt = 0;
uint64_t total_deletes = 0;
} stats_;
};

Expand Down
Loading

0 comments on commit fba902d

Please sign in to comment.