Skip to content

Commit

Permalink
fix panic when reusing a was-empty batch (#267)
Browse files Browse the repository at this point in the history
* add a test case

Signed-off-by: tabokie <xy.tao@outlook.com>

* fix the bug

Signed-off-by: tabokie <xy.tao@outlook.com>

* update changelog

Signed-off-by: tabokie <xy.tao@outlook.com>

* Address comment

Signed-off-by: Xinye Tao <xy.tao@outlook.com>

Signed-off-by: tabokie <xy.tao@outlook.com>
Signed-off-by: Xinye Tao <xy.tao@outlook.com>
  • Loading branch information
tabokie authored Sep 8, 2022
1 parent 618eea7 commit 087d36b
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

* Unconditionally tolerate `fallocate` failures as a fix to its portability issue. Errors other than `EOPNOTSUPP` will still emit a warning.
* Avoid leaving fractured write after failure by reseeking the file writer. Panic if the reseek fails as well.
* Fix panic when an empty batch is written to engine and then reused.

### New Features

Expand Down
66 changes: 43 additions & 23 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,12 @@ where
/// bytes. If `sync` is true, the write will be followed by a call to
/// `fdatasync` on the log file.
pub fn write(&self, log_batch: &mut LogBatch, mut sync: bool) -> Result<usize> {
if log_batch.is_empty() {
return Ok(0);
}
let start = Instant::now();
let len = log_batch.finish_populate(self.cfg.batch_compression_threshold.0 as usize)?;
debug_assert!(len > 0);
let block_handle = {
let mut writer = Writer::new(log_batch, sync);
// Snapshot and clear the current perf context temporarily, so the write group
Expand All @@ -151,16 +155,7 @@ where
writer.entered_time = Some(now);
sync |= writer.sync;
let log_batch = writer.mut_payload();
let res = if !log_batch.is_empty() {
self.pipe_log.append(LogQueue::Append, log_batch)
} else {
// TODO(tabokie): use Option<FileBlockHandle> instead.
Ok(FileBlockHandle {
id: FileId::new(LogQueue::Append, 0),
offset: 0,
len: 0,
})
};
let res = self.pipe_log.append(LogQueue::Append, log_batch);
writer.set_output(res);
}
perf_context!(log_write_duration).observe_since(now);
Expand All @@ -185,20 +180,17 @@ where
set_perf_context(perf_context);
writer.finish()?
};

let mut now = Instant::now();
if len > 0 {
log_batch.finish_write(block_handle);
self.memtables.apply_append_writes(log_batch.drain());
for listener in &self.listeners {
listener.post_apply_memtables(block_handle.id);
}
let end = Instant::now();
let apply_duration = end.saturating_duration_since(now);
ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64());
perf_context!(apply_duration).observe(apply_duration);
now = end;
}
log_batch.finish_write(block_handle);
self.memtables.apply_append_writes(log_batch.drain());
for listener in &self.listeners {
listener.post_apply_memtables(block_handle.id);
}
let end = Instant::now();
let apply_duration = end.saturating_duration_since(now);
ENGINE_WRITE_APPLY_DURATION_HISTOGRAM.observe(apply_duration.as_secs_f64());
perf_context!(apply_duration).observe(apply_duration);
now = end;
ENGINE_WRITE_DURATION_HISTOGRAM.observe(now.saturating_duration_since(start).as_secs_f64());
ENGINE_WRITE_SIZE_HISTOGRAM.observe(len as f64);
Ok(len)
Expand Down Expand Up @@ -1393,6 +1385,34 @@ mod tests {
);
}

#[test]
fn test_empty_batch() {
let dir = tempfile::Builder::new()
.prefix("test_empty_batch")
.tempdir()
.unwrap();
let cfg = Config {
dir: dir.path().to_str().unwrap().to_owned(),
..Default::default()
};
let engine =
RaftLogEngine::open_with_file_system(cfg, Arc::new(ObfuscatedFileSystem::default()))
.unwrap();
let data = vec![b'x'; 16];
let cases = [[false, false], [false, true], [true, true]];
for (i, writes) in cases.iter().enumerate() {
let rid = i as u64;
let mut batch = LogBatch::default();
for &has_data in writes {
if has_data {
batch.put(rid, b"key".to_vec(), data.clone());
}
engine.write(&mut batch, true).unwrap();
assert!(batch.is_empty());
}
}
}

#[test]
fn test_dirty_recovery() {
let dir = tempfile::Builder::new()
Expand Down

0 comments on commit 087d36b

Please sign in to comment.