From 4a7ee4c664053d6823381f057bdd7e92740fbe3c Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 14 Mar 2024 14:07:47 +0800 Subject: [PATCH] resolve comment Signed-off-by: glorv --- src/raft.rs | 3 +-- src/raft_log.rs | 47 ++++++++++++++++++++++++++--------------------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/raft.rs b/src/raft.rs index 05a2a978..6fcd4b61 100644 --- a/src/raft.rs +++ b/src/raft.rs @@ -334,7 +334,7 @@ impl Raft { r: RaftCore { id: c.id, read_states: Default::default(), - raft_log: RaftLog::new(store, logger.clone()), + raft_log: RaftLog::new(store, logger.clone(), c), max_inflight: c.max_inflight_msgs, max_msg_size: c.max_size_per_msg, pending_request_snapshot: INVALID_INDEX, @@ -367,7 +367,6 @@ impl Raft { max_committed_size_per_ready: c.max_committed_size_per_ready, }, }; - r.raft_log.apply_unpersisted_log_limit = c.apply_unpersisted_log_limit; confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?; let new_cs = r.post_conf_change(); if !raft_proto::conf_state_eq(&new_cs, conf_state) { diff --git a/src/raft_log.rs b/src/raft_log.rs index 5dcaa398..41fffc1c 100644 --- a/src/raft_log.rs +++ b/src/raft_log.rs @@ -19,6 +19,7 @@ use std::cmp; use slog::warn; use slog::Logger; +use crate::config::Config; use crate::eraftpb::{Entry, Snapshot}; use crate::errors::{Error, Result, StorageError}; use crate::log_unstable::Unstable; @@ -80,7 +81,7 @@ where impl RaftLog { /// Creates a new raft log with a given storage and tag. - pub fn new(store: T, logger: Logger) -> RaftLog { + pub fn new(store: T, logger: Logger, cfg: &Config) -> RaftLog { let first_index = store.first_index().unwrap(); let last_index = store.last_index().unwrap(); @@ -91,7 +92,7 @@ impl RaftLog { persisted: last_index, applied: first_index - 1, unstable: Unstable::new(last_index + 1, logger), - apply_unpersisted_log_limit: 0, + apply_unpersisted_log_limit: cfg.apply_unpersisted_log_limit, } } @@ -719,6 +720,7 @@ mod test { panic::{self, AssertUnwindSafe}, }; + use crate::config::Config; use crate::default_logger; use crate::eraftpb; use crate::errors::{Error, StorageError}; @@ -792,7 +794,7 @@ mod test { ]; for (i, &(ref ents, wconflict)) in tests.iter().enumerate() { let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents); let gconflict = raft_log.find_conflict(ents); if gconflict != wconflict { @@ -805,7 +807,7 @@ mod test { fn test_is_up_to_date() { let previous_ents = vec![new_entry(1, 1), new_entry(2, 2), new_entry(3, 3)]; let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); raft_log.append(&previous_ents); let tests = vec![ // greater term, ignore lastIndex @@ -854,7 +856,7 @@ mod test { for (i, &(ref ents, windex, ref wents, wunstable)) in tests.iter().enumerate() { let store = MemStorage::new(); store.wl().append(&previous_ents).expect("append failed"); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); let index = raft_log.append(ents); if index != windex { panic!("#{}: last_index = {}, want {}", i, index, windex); @@ -884,7 +886,7 @@ mod test { .append(&[new_entry(i, i)]) .expect("append failed"); } - let mut raft_log = RaftLog::new(storage, default_logger()); + let mut raft_log = RaftLog::new(storage, default_logger(), &Config::default()); for i in unstable_index..last_index { raft_log.append(&[new_entry(i + 1, i + 1)]); } @@ -931,7 +933,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(storagesnapi, 1)) .expect("apply failed."); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); raft_log.restore(new_snapshot(unstablesnapi, 1)); assert_eq!(raft_log.committed, unstablesnapi); assert_eq!(raft_log.persisted, storagesnapi); @@ -964,7 +966,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(offset, 1)) .expect("apply failed."); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); for i in 1..num { raft_log.append(&[new_entry(offset + i, i)]); } @@ -995,7 +997,7 @@ mod test { .expect("apply failed."); let entries = vec![new_entry(index + 1, term), new_entry(index + 2, term + 1)]; store.wl().append(&entries).expect(""); - let raft_log = RaftLog::new(store, default_logger()); + let raft_log = RaftLog::new(store, default_logger(), &Config::default()); assert_eq!(raft_log.all_entries(), entries); assert_eq!(index + 1, raft_log.first_index()); @@ -1064,7 +1066,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(snap_index, snap_term)) .expect(""); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); assert_eq!(raft_log.persisted, snap_index); raft_log.append(new_ents); let unstable = raft_log.unstable_entries().to_vec(); @@ -1082,7 +1084,7 @@ mod test { } } - let mut raft_log = RaftLog::new(MemStorage::new(), default_logger()); + let mut raft_log = RaftLog::new(MemStorage::new(), default_logger(), &Config::default()); raft_log.restore(new_snapshot(100, 1)); assert_eq!(raft_log.unstable.offset, 101); raft_log.append(&[new_entry(101, 1)]); @@ -1112,7 +1114,7 @@ mod test { .expect(""); // append unstable entries to raftlog - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents[(unstable - 1)..]); let ents = raft_log.unstable_entries().to_vec(); @@ -1163,7 +1165,7 @@ mod test { for (i, &(applied, persisted, committed, ref expect_entries)) in tests.iter().enumerate() { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&ents); let unstable = raft_log.unstable_entries().to_vec(); if let Some(e) = unstable.last() { @@ -1241,13 +1243,16 @@ mod test { (5, 5, 5, 0, None), (5, 7, 7, MAX, Some(&ents[2..4])), (7, 7, 7, MAX, None), + // applied can be higher than commited/persisted after restart. + (15, 9, 10, 0, None), + (15, 10, 10, MAX, None), ]; for (i, &(applied, persisted, committed, limit, ref expect_entries)) in tests.iter().enumerate() { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(3, 1)).expect(""); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.apply_unpersisted_log_limit = limit; raft_log.append(&ents); let unstable = raft_log.unstable_entries().to_vec(); @@ -1308,7 +1313,7 @@ mod test { .append(&[new_entry(offset + i, offset + i)]) .expect(""); } - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); for i in (num / 2)..num { raft_log.append(&[new_entry(offset + i, offset + i)]); } @@ -1447,7 +1452,7 @@ mod test { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(offset, 0)).unwrap(); store.wl().append(&entries(offset + 1, half)).unwrap(); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); raft_log.append(&entries(half, last)); // Test that scan() returns the same entries as slice(), on all inputs. @@ -1696,7 +1701,7 @@ mod test { tests.iter().enumerate() { let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents); raft_log.committed = commit; raft_log.persisted = persist; @@ -1751,7 +1756,7 @@ mod test { ]; for (i, &(commit, wcommit, wpanic)) in tests.iter().enumerate() { let store = MemStorage::new(); - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.append(&previous_ents); raft_log.committed = previous_commit; let has_panic = @@ -1788,7 +1793,7 @@ mod test { for i in 1u64..index { store.wl().append(&[new_entry(i, 0)]).expect(""); } - let mut raft_log = RaftLog::new(store, l.clone()); + let mut raft_log = RaftLog::new(store, l.clone(), &Config::default()); raft_log.maybe_commit(index - 1, 0); let committed = raft_log.committed; #[allow(deprecated)] @@ -1818,7 +1823,7 @@ mod test { .wl() .apply_snapshot(new_snapshot(offset, 0)) .expect(""); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); for i in 1u64..=num { raft_log.append(&[new_entry(i + offset, 0)]); } @@ -1868,7 +1873,7 @@ mod test { fn test_restore_snap() { let store = MemStorage::new(); store.wl().apply_snapshot(new_snapshot(100, 1)).expect(""); - let mut raft_log = RaftLog::new(store, default_logger()); + let mut raft_log = RaftLog::new(store, default_logger(), &Config::default()); assert_eq!(raft_log.committed, 100); assert_eq!(raft_log.persisted, 100); raft_log.restore(new_snapshot(200, 1));