Skip to content

Commit

Permalink
raft: paginate the unapplied config changes scan (#530)
Browse files Browse the repository at this point in the history
Fix potentially unlimited memory usage spike possible in raft.hup() which reads
all unapplied committed entries in order to check that there are no unapplied
config changes. This PR paginates this scan so that the spike is limited to
MaxCommittedSizePerReady. It also terminates the scan early if a config change
has been found.

It is ported from etcd-io/raft#32.

Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus authored Oct 17, 2023
1 parent a541303 commit f60fb9e
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 55 deletions.
101 changes: 49 additions & 52 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1240,15 +1240,6 @@ impl<T: Storage> Raft<T> {
trace!(self.logger, "EXIT become_leader");
}

fn num_pending_conf(&self, ents: &[Entry]) -> usize {
ents.iter()
.filter(|e| {
e.get_entry_type() == EntryType::EntryConfChange
|| e.get_entry_type() == EntryType::EntryConfChangeV2
})
.count()
}

// Campaign to attempt to become a leader.
//
// If prevote is enabled, this is handled as well.
Expand Down Expand Up @@ -1517,43 +1508,27 @@ impl<T: Storage> Raft<T> {
return;
}

// Scan all unapplied committed entries to find a config change.
// Paginate the scan, to avoid a potentially unlimited memory spike.
//
// If there is a pending snapshot, its index will be returned by
// `maybe_first_index`. Note that snapshot updates configuration
// already, so as long as pending entries don't contain conf change
// it's safe to start campaign.
let first_index = match self.raft_log.unstable.maybe_first_index() {
let low = match self.raft_log.unstable.maybe_first_index() {
Some(idx) => idx,
None => self.raft_log.applied + 1,
};

let ents = self
.raft_log
.slice(
first_index,
self.raft_log.committed + 1,
None,
GetEntriesContext(GetEntriesFor::TransferLeader),
)
.unwrap_or_else(|e| {
fatal!(
self.logger,
"unexpected error getting unapplied entries [{}, {}): {:?}",
first_index,
self.raft_log.committed + 1,
e
);
});
let n = self.num_pending_conf(&ents);
if n != 0 {
let high = self.raft_log.committed + 1;
let ctx = GetEntriesContext(GetEntriesFor::TransferLeader);
if self.has_unapplied_conf_changes(low, high, ctx) {
warn!(
self.logger,
"cannot campaign at term {term} since there are still {pending_changes} pending \
configuration changes to apply",
term = self.term,
pending_changes = n;
"cannot campaign at term {} since there are still pending configuration changes to apply", self.term
);
return;
}

info!(
self.logger,
"starting a new election";
Expand All @@ -1568,6 +1543,40 @@ impl<T: Storage> Raft<T> {
}
}

fn has_unapplied_conf_changes(&self, lo: u64, hi: u64, context: GetEntriesContext) -> bool {
if self.raft_log.applied >= self.raft_log.committed {
// in fact applied == committed
return false;
}
let mut found = false;
// Reuse the max_committed_size_per_ready limit because it is used for
// similar purposes (limiting the read of unapplied committed entries)
// when raft sends entries via the Ready struct for application.
// TODO(pavelkalinnikov): find a way to budget memory/bandwidth for this scan
// outside the raft package.
let page_size = self.max_committed_size_per_ready;
if let Err(err) = self.raft_log.scan(lo, hi, page_size, context, |ents| {
for e in ents {
if e.get_entry_type() == EntryType::EntryConfChange
|| e.get_entry_type() == EntryType::EntryConfChangeV2
{
found = true;
return false;
}
}
true
}) {
fatal!(
self.logger,
"error scanning unapplied entries [{}, {}): {:?}",
lo,
hi,
err
);
}
found
}

fn log_vote_approve(&self, m: &Message) {
info!(
self.logger,
Expand Down Expand Up @@ -2186,24 +2195,12 @@ impl<T: Storage> Raft<T> {
return;
}

let ents = self
.raft_log
.slice(
last_commit + 1,
self.raft_log.committed + 1,
None,
GetEntriesContext(GetEntriesFor::CommitByVote),
)
.unwrap_or_else(|e| {
fatal!(
self.logger,
"unexpected error getting unapplied entries [{}, {}): {:?}",
last_commit + 1,
self.raft_log.committed + 1,
e
);
});
if self.num_pending_conf(&ents) != 0 {
// Scan all unapplied committed entries to find a config change.
// Paginate the scan, to avoid a potentially unlimited memory spike.
let low = last_commit + 1;
let high = self.raft_log.committed + 1;
let ctx = GetEntriesContext(GetEntriesFor::CommitByVote);
if self.has_unapplied_conf_changes(low, high, ctx) {
// The candidate doesn't have to step down in theory, here just for best
// safety as we assume quorum won't change during election.
let term = self.term;
Expand Down
125 changes: 125 additions & 0 deletions src/raft_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,41 @@ impl<T: Storage> RaftLog<T> {
}
}

// scan visits all log entries in the [lo, hi) range, returning them via the
// given callback. The callback can be invoked multiple times, with consecutive
// sub-ranges of the requested range. Returns up to page_size bytes worth of
// entries at a time. May return more if a single entry size exceeds the limit.
//
// The entries in [lo, hi) must exist, otherwise scan() eventually returns an
// error.
//
// If the callback returns false, scan terminates.
pub(crate) fn scan<F>(
&self,
mut lo: u64,
hi: u64,
page_size: u64,
context: GetEntriesContext,
mut v: F,
) -> Result<()>
where
F: FnMut(Vec<Entry>) -> bool,
{
while lo < hi {
let ents = self.slice(lo, hi, page_size, context)?;
if ents.is_empty() {
return Err(Error::Store(StorageError::Other(
format!("got 0 entries in [{}, {})", lo, hi).into(),
)));
}
lo += ents.len() as u64;
if !v(ents) {
return Ok(());
}
}
Ok(())
}

/// Grabs a slice of entries from the raft. Unlike a rust slice pointer, these are
/// returned by value. The result is truncated to the max_size in bytes.
pub fn slice(
Expand Down Expand Up @@ -672,6 +707,7 @@ mod test {
use crate::errors::{Error, StorageError};
use crate::raft_log::{self, RaftLog};
use crate::storage::{GetEntriesContext, MemStorage};
use crate::NO_LIMIT;
use protobuf::Message as PbMessage;

fn new_entry(index: u64, term: u64) -> eraftpb::Entry {
Expand Down Expand Up @@ -1283,6 +1319,95 @@ mod test {
}
}

fn ents_size(ents: &[eraftpb::Entry]) -> u64 {
let mut size = 0;
for ent in ents {
size += ent.compute_size() as u64;
}
size
}

#[test]
fn test_scan() {
let offset = 47;
let num = 20;
let last = offset + num;
let half = offset + num / 2;
let entries = |from, to| {
let mut ents = vec![];
for i in from..to {
ents.push(new_entry(i, i));
}
ents
};
let entry_size = ents_size(&entries(half, half + 1));

let store = MemStorage::new();
store.wl().apply_snapshot(new_snapshot(offset, 0)).unwrap();
store.wl().append(&entries(offset + 1, half)).unwrap();
let mut raft_log = RaftLog::new(store, default_logger());
raft_log.append(&entries(half, last));

// Test that scan() returns the same entries as slice(), on all inputs.
for page_size in [0, 1, 10, 100, entry_size, entry_size + 1] {
for lo in offset + 1..last {
for hi in lo..=last {
let mut got = vec![];
raft_log
.scan(lo, hi, page_size, GetEntriesContext::empty(false), |e| {
assert!(
e.len() == 1 || ents_size(&e) < page_size,
"{} {} {}",
e.len(),
ents_size(&e),
page_size
);
got.extend(e);
true
})
.unwrap();
let want = raft_log
.slice(lo, hi, NO_LIMIT, GetEntriesContext::empty(false))
.unwrap();
assert_eq!(
got, want,
"scan() and slice() mismatch on [{}, {}) @ {}",
lo, hi, page_size
);
}
}
}

// Test that the callback early return.
let mut iters = 0;
raft_log
.scan(offset + 1, half, 0, GetEntriesContext::empty(false), |_| {
iters += 1;
if iters == 2 {
return false;
}
true
})
.unwrap();
assert_eq!(iters, 2);

// Test that we max out the limit, and not just always return a single entry.
// NB: this test works only because the requested range length is even.
raft_log
.scan(
offset + 1,
offset + 11,
entry_size * 2,
GetEntriesContext::empty(false),
|e| {
assert_eq!(e.len(), 2);
assert_eq!(entry_size * 2, ents_size(&e));
true
},
)
.unwrap();
}

/// `test_log_maybe_append` ensures:
/// If the given (index, term) matches with the existing log:
/// 1. If an existing entry conflicts with a new one (same index
Expand Down
6 changes: 3 additions & 3 deletions src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl RaftState {
}

/// Records the context of the caller who calls entries() of Storage trait.
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub struct GetEntriesContext(pub(crate) GetEntriesFor);

impl GetEntriesContext {
Expand All @@ -76,7 +76,7 @@ impl GetEntriesContext {
}
}

#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub(crate) enum GetEntriesFor {
// for sending entries to followers
SendAppend {
Expand Down Expand Up @@ -120,7 +120,7 @@ pub trait Storage {
/// Storage should check context.can_async() first and decide whether to fetch entries asynchronously
/// based on its own implementation. If the entries are fetched asynchronously, storage should return
/// LogTemporarilyUnavailable, and application needs to call `on_entries_fetched(context)` to trigger
/// re-fetch of the entries after the storage finishes fetching the entries.
/// re-fetch of the entries after the storage finishes fetching the entries.
///
/// # Panics
///
Expand Down

0 comments on commit f60fb9e

Please sign in to comment.