fix data loss caused by atomic group being partially purged (#314)
* remember atomic group in memtable

Signed-off-by: tabokie <xy.tao@outlook.com>

* add tests

Signed-off-by: tabokie <xy.tao@outlook.com>

* patch changelog

Signed-off-by: tabokie <xy.tao@outlook.com>

---------

Signed-off-by: tabokie <xy.tao@outlook.com>
tabokie authored Jun 16, 2023
1 parent c9a95c8 commit 2c76690
Showing 5 changed files with 150 additions and 15 deletions.
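
In short: a rewrite too large for one batch is split into an atomic group whose pieces can land in several rewrite files, and purge previously only considered which files still held live entries, so part of a group could be purged while the rest was still needed — the partial purge named in the title, which loses data on recovery. The fix makes the memtable remember each group's file span and hold purging back to the group's first file. Below is a minimal, self-contained sketch of that guard; the free-standing function and its names are illustrative, while the real logic lives in `MemTable::min_file_seq` in the src/memtable.rs diff below.

/// Sketch of the purge guard this commit adds (illustrative form, not the
/// actual raft-engine API): the smallest rewrite-file sequence a memtable
/// still needs is clamped back to the start of an overlapping atomic group,
/// so the group can never be partially purged.
fn min_needed_seq(min_active_seq: u64, atomic_group: Option<(u64, u64)>) -> u64 {
    match atomic_group {
        // A live entry sits at or before the group's last file: pin the purge
        // watermark to the group's first file to keep the group whole.
        Some((start, end)) if min_active_seq <= end => min_active_seq.min(start),
        _ => min_active_seq,
    }
}

fn main() {
    // Group spans files 3..=7; a live entry in file 5 pins purging to file 3.
    assert_eq!(min_needed_seq(5, Some((3, 7))), 3);
    // Live entries strictly after the group leave the watermark alone.
    assert_eq!(min_needed_seq(9, Some((3, 7))), 9);
    // No atomic group recorded: nothing changes.
    assert_eq!(min_needed_seq(5, None), 5);
}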
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -6,14 +6,15 @@

* `LogBatch::put` returns a `Result<()>` instead of `()`. It errs when the key is reserved for internal use.
* Possible to specify a permission in `FileSystem::open`.
* Prometheus counter `raft_engine_log_file_count` no longer includes retired log files that are stashed for recycling. Those files are now tracked by a new counter `raft_engine_recycled_file_count`.

### Bug Fixes

* Fix data loss caused by an aborted rewrite operation. Downgrading to an earlier version without the fix may produce phantom Raft Groups or keys, i.e. ones that were never written but appear in queries.

### New Features

* Support preparing prefilled logs to enable log recycling at start-up.
* Support preparing prefilled logs to enable log recycling at start-up. The number of logs to prepare is controlled by `Config::prefill_limit`.
* Add a new configuration `spill-dir` to allow automatic placement of logs into an auxiliary directory when `dir` is full.
* Add a new method `Engine::fork` to duplicate an `Engine` to a new place, with a few disk file copies.

3 changes: 2 additions & 1 deletion src/engine.rs
@@ -2417,6 +2417,7 @@ pub(crate) mod tests {
dir: dir.path().to_str().unwrap().to_owned(),
// Make sure each file gets replayed individually.
recovery_threads: 100,
target_file_size: ReadableSize(1),
..Default::default()
};
let fs = Arc::new(ObfuscatedFileSystem::default());
@@ -2550,7 +2551,7 @@
assert!(data.remove(&rid), "{}", rid);
assert_eq!(engine.get(rid, &key).unwrap(), value);
}
assert!(data.is_empty());
assert!(data.is_empty(), "data loss {:?}", data);
}

#[test]
69 changes: 64 additions & 5 deletions src/memtable.rs
@@ -155,6 +155,16 @@ pub struct MemTable<A: AllocatorTrait> {
/// A map of active key value pairs.
kvs: BTreeMap<Vec<u8>, (Vec<u8>, FileId)>,

/// (start_seq, end_seq).
/// If there's an active entry stored before `end_seq`, it may belong to
/// this atomic group. To avoid losing that entry, we must not delete any
/// other entries in the group.
/// Only applies to the Rewrite queue. Each Raft Group has at most one
/// atomic group at a time: because atomic groups are only used for
/// rewrite-rewrite operations, a group always contains all the rewrite
/// entries of a Raft Group.
atomic_group: Option<(FileSeq, FileSeq)>,

/// Shared statistics.
global_stats: Arc<GlobalStats>,

@@ -183,6 +193,7 @@ impl<A: AllocatorTrait> MemTable<A> {
first_index: 0,
rewrite_count: 0,
kvs: BTreeMap::default(),
atomic_group: None,
global_stats,
_phantom: PhantomData,
}
@@ -216,6 +227,11 @@ impl<A: AllocatorTrait> MemTable<A> {
self.put(key.clone(), value.clone(), *file_id);
}

if let Some(g) = rhs.atomic_group.take() {
assert!(self.atomic_group.map_or(true, |(_, end)| end <= g.0));
self.atomic_group = Some(g);
}

let deleted = rhs.global_stats.deleted_rewrite_entries();
self.global_stats.add(LogQueue::Rewrite, deleted);
self.global_stats.delete(LogQueue::Rewrite, deleted);
@@ -251,6 +267,8 @@ impl<A: AllocatorTrait> MemTable<A> {
self.put(key.clone(), value.clone(), *file_id);
}

assert!(rhs.atomic_group.is_none());

let deleted = rhs.global_stats.deleted_rewrite_entries();
self.global_stats.add(LogQueue::Rewrite, deleted);
self.global_stats.delete(LogQueue::Rewrite, deleted);
@@ -526,6 +544,11 @@ impl<A: AllocatorTrait> MemTable<A> {
count as u64
}

pub fn apply_rewrite_atomic_group(&mut self, start: FileSeq, end: FileSeq) {
assert!(self.atomic_group.map_or(true, |(_, b)| b <= start));
self.atomic_group = Some((start, end));
}

/// Removes all entry indexes with index greater than or equal to `index`.
/// Assumes `index` <= `last`.
///
@@ -719,12 +742,20 @@ impl<A: AllocatorTrait> MemTable<A> {
Some(v.1.seq)
}
});
match (ents_min, kvs_min) {
(Some(ents_min), Some(kvs_min)) => Some(std::cmp::min(kvs_min, ents_min)),
(Some(ents_min), None) => Some(ents_min),
(None, Some(kvs_min)) => Some(kvs_min),
(None, None) => None,
let res = match (ents_min, kvs_min) {
(Some(ents_min), Some(kvs_min)) => std::cmp::min(kvs_min, ents_min),
(Some(ents_min), None) => ents_min,
(None, Some(kvs_min)) => kvs_min,
(None, None) => return None,
};
if queue == LogQueue::Rewrite {
if let Some((start, end)) = self.atomic_group {
if res <= end {
return Some(std::cmp::min(start, res));
}
}
}
Some(res)
}

#[inline]
@@ -1154,6 +1185,11 @@ impl<A: AllocatorTrait> MemTableAccessor<A> {
}
}

pub fn apply_rewrite_atomic_group(&self, raft: u64, start: FileSeq, end: FileSeq) {
let memtable = self.get_or_insert(raft);
memtable.write().apply_rewrite_atomic_group(start, end);
}

#[inline]
fn slot_index(id: u64) -> usize {
debug_assert!(MEMTABLE_SLOT_COUNT.is_power_of_two());
@@ -1171,6 +1207,8 @@ struct PendingAtomicGroup {
status: AtomicGroupStatus,
items: Vec<LogItem>,
tombstone_items: Vec<LogItem>,
start: FileSeq,
end: FileSeq,
}

pub struct MemTableRecoverContext<A: AllocatorTrait> {
@@ -1256,20 +1294,39 @@ impl<A: AllocatorTrait> MemTableRecoverContext<A> {
| (AtomicGroupStatus::Middle, AtomicGroupStatus::Middle) => {
group.items.append(&mut new_group.items);
group.tombstone_items.append(&mut new_group.tombstone_items);
assert!(group.end <= new_group.start);
group.end = new_group.end;
}
(AtomicGroupStatus::Middle, AtomicGroupStatus::End) => {
group.items.append(&mut new_group.items);
group.tombstone_items.append(&mut new_group.tombstone_items);
group.status = new_group.status;
assert!(group.end <= new_group.start);
group.end = new_group.end;
}
(AtomicGroupStatus::Begin, AtomicGroupStatus::End) => {
let mut group = groups.pop().unwrap();
let mut rids = HashSet::with_capacity(1);
for item in group
.items
.iter()
.chain(group.tombstone_items.iter())
.chain(new_group.items.iter())
.chain(new_group.tombstone_items.iter())
{
rids.insert(item.raft_group_id);
}
self.tombstone_items.append(&mut group.tombstone_items);
self.tombstone_items.append(&mut new_group.tombstone_items);
self.memtables
.replay_rewrite_writes(group.items.into_iter());
self.memtables
.replay_rewrite_writes(new_group.items.into_iter());
assert!(group.end <= new_group.start);
for rid in rids {
self.memtables
.apply_rewrite_atomic_group(rid, group.start, new_group.end);
}
}
}
if groups.is_empty() {
@@ -1329,6 +1386,8 @@ impl<A: AllocatorTrait> ReplayMachine for MemTableRecoverContext<A> {
status,
items,
tombstone_items: new_tombstones,
start: file_id.seq,
end: file_id.seq,
},
);
} else {
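
The recovery half of the change is visible in the `MemTableRecoverContext` hunks above: `PendingAtomicGroup` gains `start` and `end` fields seeded from the file that carried each piece, merging pieces asserts they are replayed in file order, and only when a `Begin` piece meets its `End` is the combined span installed for every Raft Group the group touched. A compact sketch of that merge discipline, using illustrative stand-in types rather than the crate's internals:

/// Sketch of the recovery-side merge: pieces of one atomic group are
/// replayed in file order, the tracked span grows with each piece, and only
/// a group that runs from Begin to End yields its full [start, end] span.
#[derive(Clone, Copy, PartialEq)]
enum Status {
    Begin,
    Middle,
    End,
}

struct Pending {
    status: Status,
    start: u64, // sequence of the file holding the group's first piece
    end: u64,   // sequence of the last file seen so far
}

/// Folds the next piece into the pending group; returns the whole span once
/// a group that began with `Begin` is closed by an `End` piece.
fn merge(group: &mut Pending, new: Pending) -> Option<(u64, u64)> {
    assert!(group.end <= new.start, "pieces must be replayed in file order");
    group.end = new.end;
    if group.status == Status::Begin && new.status == Status::End {
        Some((group.start, group.end))
    } else {
        None
    }
}

fn main() {
    let mut g = Pending { status: Status::Begin, start: 3, end: 3 };
    let mid = Pending { status: Status::Middle, start: 4, end: 4 };
    let end = Pending { status: Status::End, start: 5, end: 5 };
    assert_eq!(merge(&mut g, mid), None);
    assert_eq!(merge(&mut g, end), Some((3, 5)));
}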
42 changes: 35 additions & 7 deletions src/purge.rs
@@ -167,6 +167,22 @@ where
.unwrap();
}

pub fn must_purge_all_stale(&self) {
let _lk = self.force_rewrite_candidates.try_lock().unwrap();
self.pipe_log.rotate(LogQueue::Rewrite).unwrap();
self.rescan_memtables_and_purge_stale_files(
LogQueue::Rewrite,
self.pipe_log.file_span(LogQueue::Rewrite).1,
)
.unwrap();
self.pipe_log.rotate(LogQueue::Append).unwrap();
self.rescan_memtables_and_purge_stale_files(
LogQueue::Append,
self.pipe_log.file_span(LogQueue::Append).1,
)
.unwrap();
}

pub(crate) fn needs_rewrite_log_files(&self, queue: LogQueue) -> bool {
let (first_file, active_file) = self.pipe_log.file_span(queue);
if active_file == first_file {
@@ -281,7 +297,8 @@ where
&mut log_batch,
None, /* rewrite_watermark */
true, /* sync */
)
)?;
Ok(())
}

// Exclusive.
@@ -335,6 +352,7 @@ where

let mut previous_size = log_batch.approximate_size();
let mut atomic_group = None;
let mut atomic_group_start = None;
let mut current_entry_indexes = Vec::new();
let mut current_entries = Vec::new();
let mut current_size = 0;
@@ -380,7 +398,10 @@ where
)?;
current_size = 0;
previous_size = 0;
self.rewrite_impl(&mut log_batch, rewrite, false)?;
let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
if needs_atomicity && atomic_group_start.is_none() {
atomic_group_start = Some(handle.id.seq);
}
}
}
log_batch.add_raw_entries(region_id, current_entry_indexes, current_entries)?;
@@ -389,23 +410,30 @@ where
}
if let Some(g) = atomic_group.as_mut() {
g.end(&mut log_batch);
self.rewrite_impl(&mut log_batch, rewrite, false)?;
let handle = self.rewrite_impl(&mut log_batch, rewrite, false)?.unwrap();
self.memtables.apply_rewrite_atomic_group(
region_id,
atomic_group_start.unwrap(),
handle.id.seq,
);
} else if log_batch.approximate_size() > max_batch_bytes() {
self.rewrite_impl(&mut log_batch, rewrite, false)?;
}
}
self.rewrite_impl(&mut log_batch, rewrite, true)
self.rewrite_impl(&mut log_batch, rewrite, true)?;
Ok(())
}

fn rewrite_impl(
&self,
log_batch: &mut LogBatch,
rewrite_watermark: Option<FileSeq>,
sync: bool,
) -> Result<()> {
) -> Result<Option<FileBlockHandle>> {
if log_batch.is_empty() {
debug_assert!(sync);
return self.pipe_log.sync(LogQueue::Rewrite);
self.pipe_log.sync(LogQueue::Rewrite)?;
return Ok(None);
}
log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
let file_handle = self.pipe_log.append(LogQueue::Rewrite, log_batch)?;
@@ -430,7 +458,7 @@ where
.append
.observe(file_handle.len as f64);
}
Ok(())
Ok(Some(file_handle))
}
}

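
On the write side, `rewrite_impl` now returns the `FileBlockHandle` of the batch it appended, so the split-rewrite loop can learn which rewrite file each piece of an atomic group landed in: the first handle fixes the group's start, the handle of the batch carrying the `End` marker fixes its end, and the span is handed to the memtable via `apply_rewrite_atomic_group` before any purge can observe those files. A self-contained sketch of that bookkeeping with stand-in types (the real code threads handles from the pipe log):

/// Stand-in for the handle the rewrite pipeline returns for an appended batch.
#[derive(Clone, Copy)]
struct FileHandle {
    seq: u64,
}

/// Stub for `rewrite_impl`: appends one batch and reports where it landed.
/// Here every batch pretends to rotate into a fresh rewrite file.
fn append_batch(next_seq: &mut u64) -> FileHandle {
    let h = FileHandle { seq: *next_seq };
    *next_seq += 1;
    h
}

fn main() {
    let mut next_seq = 10;
    let mut group_start: Option<u64> = None;

    // An oversized rewrite is split into several batches of one atomic group.
    for _ in 0..3 {
        let h = append_batch(&mut next_seq);
        if group_start.is_none() {
            group_start = Some(h.seq); // the first batch fixes the start
        }
    }
    // The batch carrying the End marker fixes the group's end.
    let end = append_batch(&mut next_seq).seq;

    // The span [10, 13] would then be recorded so purge keeps these files
    // together, mirroring apply_rewrite_atomic_group(region_id, start, end).
    assert_eq!((group_start.unwrap(), end), (10, 13));
}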
48 changes: 47 additions & 1 deletion tests/failpoints/test_engine.rs
@@ -886,7 +886,7 @@ fn test_split_rewrite_batch_imp(regions: u64, region_size: u64, split_size: u64,
})
.unwrap();
let r = catch_unwind_silent(|| engine.purge_manager().must_rewrite_rewrite_queue());
fail::remove("atomic_group::begin");
fail::remove("atomic_group::add");
fail::remove("log_fd::write::err");
if r.is_ok() {
break;
@@ -972,6 +972,52 @@ fn test_split_rewrite_batch_with_only_kvs() {
}
}

// issue-315
#[test]
fn test_split_rewrite_batch_then_delete_some() {
let dir = tempfile::Builder::new()
.prefix("test_split_rewrite_batch_then_delete_some")
.tempdir()
.unwrap();
let _f = FailGuard::new("max_rewrite_batch_bytes", "return(1)");
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
target_file_size: ReadableSize(1),
..Default::default()
};
let mut log_batch = LogBatch::default();
let value = vec![b'y'; 8];

let rid = 1;
let engine = Engine::open(cfg.clone()).unwrap();
for i in 0..=5 {
append(&engine, rid, i * 2, i * 2 + 2, Some(&value));
engine.purge_manager().must_rewrite_append_queue(None, None);
}
engine.purge_manager().must_rewrite_rewrite_queue();
log_batch.add_command(rid, Command::Compact { index: 7 });
log_batch.delete(rid, b"last_index".to_vec());
engine.write(&mut log_batch, true).unwrap();
engine.purge_manager().must_purge_all_stale();

drop(engine);
let engine = Engine::open(cfg.clone()).unwrap();
// The Compact mark is dropped during `must_purge_all_stale`.
assert_eq!(engine.first_index(rid).unwrap(), 0);
assert_eq!(engine.last_index(rid).unwrap(), 11);

// Removes all rewrite entries.
log_batch.add_command(rid, Command::Compact { index: 100 });
engine.write(&mut log_batch, false).unwrap();
append(&engine, rid, 5, 11, Some(&value));
engine.purge_manager().must_rewrite_append_queue(None, None);
engine.purge_manager().must_purge_all_stale();
drop(engine);
let engine = Engine::open(cfg).unwrap();
assert_eq!(engine.first_index(rid).unwrap(), 5);
assert_eq!(engine.last_index(rid).unwrap(), 10);
}

#[test]
fn test_build_engine_with_recycling_and_multi_dirs() {
let dir = tempfile::Builder::new()
