Skip to content

Commit

Permalink
resolve comment
Browse files Browse the repository at this point in the history
Signed-off-by: glorv <glorvs@163.com>
  • Loading branch information
glorv committed Mar 14, 2024
1 parent 1fd38fe commit 4a7ee4c
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 23 deletions.
3 changes: 1 addition & 2 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ impl<T: Storage> Raft<T> {
r: RaftCore {
id: c.id,
read_states: Default::default(),
raft_log: RaftLog::new(store, logger.clone()),
raft_log: RaftLog::new(store, logger.clone(), c),
max_inflight: c.max_inflight_msgs,
max_msg_size: c.max_size_per_msg,
pending_request_snapshot: INVALID_INDEX,
Expand Down Expand Up @@ -367,7 +367,6 @@ impl<T: Storage> Raft<T> {
max_committed_size_per_ready: c.max_committed_size_per_ready,
},
};
r.raft_log.apply_unpersisted_log_limit = c.apply_unpersisted_log_limit;
confchange::restore(&mut r.prs, r.r.raft_log.last_index(), conf_state)?;
let new_cs = r.post_conf_change();
if !raft_proto::conf_state_eq(&new_cs, conf_state) {
Expand Down
47 changes: 26 additions & 21 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::cmp;
use slog::warn;
use slog::Logger;

use crate::config::Config;
use crate::eraftpb::{Entry, Snapshot};
use crate::errors::{Error, Result, StorageError};
use crate::log_unstable::Unstable;
Expand Down Expand Up @@ -80,7 +81,7 @@ where

impl<T: Storage> RaftLog<T> {
/// Creates a new raft log with a given storage and tag.
pub fn new(store: T, logger: Logger) -> RaftLog<T> {
pub fn new(store: T, logger: Logger, cfg: &Config) -> RaftLog<T> {
let first_index = store.first_index().unwrap();
let last_index = store.last_index().unwrap();

Expand All @@ -91,7 +92,7 @@ impl<T: Storage> RaftLog<T> {
persisted: last_index,
applied: first_index - 1,
unstable: Unstable::new(last_index + 1, logger),
apply_unpersisted_log_limit: 0,
apply_unpersisted_log_limit: cfg.apply_unpersisted_log_limit,
}
}

Expand Down Expand Up @@ -719,6 +720,7 @@ mod test {
panic::{self, AssertUnwindSafe},
};

use crate::config::Config;
use crate::default_logger;
use crate::eraftpb;
use crate::errors::{Error, StorageError};
Expand Down Expand Up @@ -792,7 +794,7 @@ mod test {
];
for (i, &(ref ents, wconflict)) in tests.iter().enumerate() {
let store = MemStorage::new();
let mut raft_log = RaftLog::new(store, l.clone());
let mut raft_log = RaftLog::new(store, l.clone(), &Config::default());
raft_log.append(&previous_ents);
let gconflict = raft_log.find_conflict(ents);
if gconflict != wconflict {
Expand All @@ -805,7 +807,7 @@ mod test {
fn test_is_up_to_date() {
let previous_ents = vec![new_entry(1, 1), new_entry(2, 2), new_entry(3, 3)];
let store = MemStorage::new();
let mut raft_log = RaftLog::new(store, default_logger());
let mut raft_log = RaftLog::new(store, default_logger(), &Config::default());
raft_log.append(&previous_ents);
let tests = vec![
// greater term, ignore lastIndex
Expand Down Expand Up @@ -854,7 +856,7 @@ mod test {
for (i, &(ref ents, windex, ref wents, wunstable)) in tests.iter().enumerate() {
let store = MemStorage::new();
store.wl().append(&previous_ents).expect("append failed");
let mut raft_log = RaftLog::new(store, l.clone());
let mut raft_log = RaftLog::new(store, l.clone(), &Config::default());
let index = raft_log.append(ents);
if index != windex {
panic!("#{}: last_index = {}, want {}", i, index, windex);
Expand Down Expand Up @@ -884,7 +886,7 @@ mod test {
.append(&[new_entry(i, i)])
.expect("append failed");
}
let mut raft_log = RaftLog::new(storage, default_logger());
let mut raft_log = RaftLog::new(storage, default_logger(), &Config::default());
for i in unstable_index..last_index {
raft_log.append(&[new_entry(i + 1, i + 1)]);
}
Expand Down Expand Up @@ -931,7 +933,7 @@ mod test {
.wl()
.apply_snapshot(new_snapshot(storagesnapi, 1))
.expect("apply failed.");
let mut raft_log = RaftLog::new(store, default_logger());
let mut raft_log = RaftLog::new(store, default_logger(), &Config::default());
raft_log.restore(new_snapshot(unstablesnapi, 1));
assert_eq!(raft_log.committed, unstablesnapi);
assert_eq!(raft_log.persisted, storagesnapi);
Expand Down Expand Up @@ -964,7 +966,7 @@ mod test {
.wl()
.apply_snapshot(new_snapshot(offset, 1))
.expect("apply failed.");
let mut raft_log = RaftLog::new(store, default_logger());
let mut raft_log = RaftLog::new(store, default_logger(), &Config::default());
for i in 1..num {
raft_log.append(&[new_entry(offset + i, i)]);
}
Expand Down Expand Up @@ -995,7 +997,7 @@ mod test {
.expect("apply failed.");
let entries = vec![new_entry(index + 1, term), new_entry(index + 2, term + 1)];
store.wl().append(&entries).expect("");
let raft_log = RaftLog::new(store, default_logger());
let raft_log = RaftLog::new(store, default_logger(), &Config::default());

assert_eq!(raft_log.all_entries(), entries);
assert_eq!(index + 1, raft_log.first_index());
Expand Down Expand Up @@ -1064,7 +1066,7 @@ mod test {
.wl()
.apply_snapshot(new_snapshot(snap_index, snap_term))
.expect("");
let mut raft_log = RaftLog::new(store, l.clone());
let mut raft_log = RaftLog::new(store, l.clone(), &Config::default());
assert_eq!(raft_log.persisted, snap_index);
raft_log.append(new_ents);
let unstable = raft_log.unstable_entries().to_vec();
Expand All @@ -1082,7 +1084,7 @@ mod test {
}
}

let mut raft_log = RaftLog::new(MemStorage::new(), default_logger());
let mut raft_log = RaftLog::new(MemStorage::new(), default_logger(), &Config::default());
raft_log.restore(new_snapshot(100, 1));
assert_eq!(raft_log.unstable.offset, 101);
raft_log.append(&[new_entry(101, 1)]);
Expand Down Expand Up @@ -1112,7 +1114,7 @@ mod test {
.expect("");

// append unstable entries to raftlog
let mut raft_log = RaftLog::new(store, l.clone());
let mut raft_log = RaftLog::new(store, l.clone(), &Config::default());
raft_log.append(&previous_ents[(unstable - 1)..]);

let ents = raft_log.unstable_entries().to_vec();
Expand Down Expand Up @@ -1163,7 +1165,7 @@ mod test {
for (i, &(applied, persisted, committed, ref expect_entries)) in tests.iter().enumerate() {
let store = MemStorage::new();
store.wl().apply_snapshot(new_snapshot(3, 1)).expect("");
let mut raft_log = RaftLog::new(store, l.clone());
let mut raft_log = RaftLog::new(store, l.clone(), &Config::default());
raft_log.append(&ents);
let unstable = raft_log.unstable_entries().to_vec();
if let Some(e) = unstable.last() {
Expand Down Expand Up @@ -1241,13 +1243,16 @@ mod test {
(5, 5, 5, 0, None),
(5, 7, 7, MAX, Some(&ents[2..4])),
(7, 7, 7, MAX, None),
// applied can be higher than commited/persisted after restart.
(15, 9, 10, 0, None),
(15, 10, 10, MAX, None),
];
for (i, &(applied, persisted, committed, limit, ref expect_entries)) in
tests.iter().enumerate()
{
let store = MemStorage::new();
store.wl().apply_snapshot(new_snapshot(3, 1)).expect("");
let mut raft_log = RaftLog::new(store, l.clone());
let mut raft_log = RaftLog::new(store, l.clone(), &Config::default());
raft_log.apply_unpersisted_log_limit = limit;
raft_log.append(&ents);
let unstable = raft_log.unstable_entries().to_vec();
Expand Down Expand Up @@ -1308,7 +1313,7 @@ mod test {
.append(&[new_entry(offset + i, offset + i)])
.expect("");
}
let mut raft_log = RaftLog::new(store, default_logger());
let mut raft_log = RaftLog::new(store, default_logger(), &Config::default());
for i in (num / 2)..num {
raft_log.append(&[new_entry(offset + i, offset + i)]);
}
Expand Down Expand Up @@ -1447,7 +1452,7 @@ mod test {
let store = MemStorage::new();
store.wl().apply_snapshot(new_snapshot(offset, 0)).unwrap();
store.wl().append(&entries(offset + 1, half)).unwrap();
let mut raft_log = RaftLog::new(store, default_logger());
let mut raft_log = RaftLog::new(store, default_logger(), &Config::default());
raft_log.append(&entries(half, last));

// Test that scan() returns the same entries as slice(), on all inputs.
Expand Down Expand Up @@ -1696,7 +1701,7 @@ mod test {
tests.iter().enumerate()
{
let store = MemStorage::new();
let mut raft_log = RaftLog::new(store, l.clone());
let mut raft_log = RaftLog::new(store, l.clone(), &Config::default());
raft_log.append(&previous_ents);
raft_log.committed = commit;
raft_log.persisted = persist;
Expand Down Expand Up @@ -1751,7 +1756,7 @@ mod test {
];
for (i, &(commit, wcommit, wpanic)) in tests.iter().enumerate() {
let store = MemStorage::new();
let mut raft_log = RaftLog::new(store, l.clone());
let mut raft_log = RaftLog::new(store, l.clone(), &Config::default());
raft_log.append(&previous_ents);
raft_log.committed = previous_commit;
let has_panic =
Expand Down Expand Up @@ -1788,7 +1793,7 @@ mod test {
for i in 1u64..index {
store.wl().append(&[new_entry(i, 0)]).expect("");
}
let mut raft_log = RaftLog::new(store, l.clone());
let mut raft_log = RaftLog::new(store, l.clone(), &Config::default());
raft_log.maybe_commit(index - 1, 0);
let committed = raft_log.committed;
#[allow(deprecated)]
Expand Down Expand Up @@ -1818,7 +1823,7 @@ mod test {
.wl()
.apply_snapshot(new_snapshot(offset, 0))
.expect("");
let mut raft_log = RaftLog::new(store, default_logger());
let mut raft_log = RaftLog::new(store, default_logger(), &Config::default());
for i in 1u64..=num {
raft_log.append(&[new_entry(i + offset, 0)]);
}
Expand Down Expand Up @@ -1868,7 +1873,7 @@ mod test {
fn test_restore_snap() {
let store = MemStorage::new();
store.wl().apply_snapshot(new_snapshot(100, 1)).expect("");
let mut raft_log = RaftLog::new(store, default_logger());
let mut raft_log = RaftLog::new(store, default_logger(), &Config::default());
assert_eq!(raft_log.committed, 100);
assert_eq!(raft_log.persisted, 100);
raft_log.restore(new_snapshot(200, 1));
Expand Down

0 comments on commit 4a7ee4c

Please sign in to comment.