Skip to content

Commit

Permalink
[wal] Correct how wal iterator handles edge of the mapping
Browse files Browse the repository at this point in the history
Wal iterator used to panic if it reaches close to the end of the current mapping as it can not read the header.
  • Loading branch information
andll committed Jan 9, 2024
1 parent 7e2fdd5 commit fc068df
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 14 deletions.
11 changes: 0 additions & 11 deletions mysticeti-core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,17 +576,6 @@ mod smoke_tests {
validator.stop().await;
}

// TODO: to make the test pass for now I am making the validators start with a fresh storage.
// Otherwise I get a storage related error like:
// panicked at 'range end index 16 out of range for slice of length 13', mysticeti-core/src/wal.rs:296:33
//
// when block store is restored during the `open` method.
//
// Switching to CoreOptions::production() to have a sync file on every block write the error goes away. This makes me
// believe that some partial writing is happening or something that makes the file perceived as corrupt while reloading it.
// Haven't investigated further but needs to look into it.
let tempdir = TempDir::new("validator_shutdown_and_start").unwrap();

// now start again the validators - no error (ex network port conflict) should arise
for i in 0..committee_size {
let authority = i as AuthorityIndex;
Expand Down
41 changes: 38 additions & 3 deletions mysticeti-core/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ impl WalReader {
let offset = offset(position.start);
let bytes = self.map_offset(offset)?;
let buf_offset = (position.start - offset) as usize;
let (crc, len, tag) = Self::read_header(&bytes[buf_offset..]);
eprintln!("bytes.len={}, buf_offset={}", bytes.len(), buf_offset);
let Some((crc, len, tag)) = Self::read_header(&bytes[buf_offset..]) else {
return Ok(None);
};
if len == 0 {
if crc == 0 {
return Ok(None);
Expand Down Expand Up @@ -291,11 +294,15 @@ impl WalReader {
}

#[inline]
fn read_header(bytes: &[u8]) -> (u64, u64, Tag /*(crc, len, tag)*/) {
// Parses header bytes, returns None if buffer is too small to contain header
fn read_header(bytes: &[u8]) -> Option<(u64, u64, Tag /*(crc, len, tag)*/)> {
if bytes.len() < HEADER_LEN_BYTES_USIZE {
return None;
}
let mut header = [0u8; HEADER_LEN_BYTES_USIZE];
header.copy_from_slice(&bytes[..HEADER_LEN_BYTES_USIZE]);
let header = u128::from_le_bytes(header);
split_header(header)
Some(split_header(header))
}
}

Expand Down Expand Up @@ -436,6 +443,34 @@ mod tests {
assert!(iter.next().is_none());
}

#[test]
// This test corner case when iterator reaches almost(but not quite) end of the mapping and there is not
// enough bytes in the mapping to read the header
fn test_wal_iterator_over_map_boundary() {
const SIZE: usize = (MAP_SIZE - HEADER_LEN_BYTES) as usize - 4;
let bytes = vec![5u8; SIZE];
let temp = tempdir::TempDir::new("test_wal").unwrap();
let file = temp.path().join("wal");
let (mut writer, _reader) = wal(&file).unwrap();
const TAG1: Tag = 1;
const TAG2: Tag = 2;
let pos1 = writer.write(TAG1, &bytes).unwrap();
let pos2 = writer.write(TAG2, &bytes).unwrap();
assert!(pos2.start >= MAP_SIZE);
drop(writer);
let (writer, reader) = wal(&file).unwrap();
let mut iterator = reader.iter_until(&writer);
let (rpos1, (tag1, bytes1)) = iterator.next().unwrap();
let (rpos2, (tag2, bytes2)) = iterator.next().unwrap();
assert!(iterator.next().is_none());
assert_eq!(rpos1, pos1);
assert_eq!(rpos2, pos2);
assert_eq!(tag1, TAG1);
assert_eq!(tag2, TAG2);
assert_eq!(bytes1.as_ref(), &bytes);
assert_eq!(bytes2.as_ref(), &bytes);
}

#[track_caller]
// Read from position, assert tag
fn rd(reader: &WalReader, pos: WalPosition, tag: Tag) -> Bytes {
Expand Down

0 comments on commit fc068df

Please sign in to comment.