Skip to content

Commit

Permalink
Refactor remove_uncleaned_slots to reduce memory consumption
Browse files Browse the repository at this point in the history
  • Loading branch information
dmakarov committed Sep 19, 2024
1 parent 037838a commit ebf3039
Showing 1 changed file with 49 additions and 115 deletions.
164 changes: 49 additions & 115 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3069,32 +3069,24 @@ impl AccountsDb {
.collect()
}

/// Drain the given slots out of `uncleaned_pubkeys` and hand back their pubkeys.
///
/// Every slot in `uncleaned_slots` is removed from the `uncleaned_pubkeys` map. Slots that
/// have no entry in the map are skipped; for every slot that was present, the pubkeys stored
/// under it are returned as one inner `Vec`, in the order the slots were supplied.
fn remove_uncleaned_slots_and_collect_pubkeys(
    &self,
    uncleaned_slots: Vec<Slot>,
) -> Vec<Vec<Pubkey>> {
    let mut pubkeys_per_slot = Vec::new();
    for slot in uncleaned_slots {
        // `remove` yields the (slot, pubkeys) pair only when the slot was still tracked.
        if let Some((_slot, pubkeys)) = self.uncleaned_pubkeys.remove(&slot) {
            pubkeys_per_slot.push(pubkeys);
        }
    }
    pubkeys_per_slot
}

/// Remove uncleaned slots, up to a maximum slot, and return the collected pubkeys
///
fn remove_uncleaned_slots_and_collect_pubkeys_up_to_slot(
/// For each slot in the list of uncleaned slots, up to a maximum
/// slot (`max_slot_inclusive`), remove it from `uncleaned_pubkeys`
/// and move all the pubkeys to `candidates` for cleaning.
///
/// `candidates` is a slice of lock-protected maps. Each removed
/// pubkey is passed to `insert_pubkey`, which selects the map it
/// goes into. Slots that have no entry in `uncleaned_pubkeys` are
/// skipped.
fn remove_uncleaned_slots_up_to_slot_and_move_pubkeys(
&self,
max_slot_inclusive: Slot,
) -> Vec<Vec<Pubkey>> {
candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>],
) {
let uncleaned_slots = self.collect_uncleaned_slots_up_to_slot(max_slot_inclusive);
self.remove_uncleaned_slots_and_collect_pubkeys(uncleaned_slots)
for uncleaned_slot in uncleaned_slots.into_iter() {
if let Some((_removed_slot, removed_pubkeys)) =
self.uncleaned_pubkeys.remove(&uncleaned_slot)
{
removed_pubkeys
.iter()
.for_each(|pubkey| self.insert_pubkey(candidates, pubkey))
}
}
}

fn count_pubkeys(candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>]) -> u64 {
Expand All @@ -3104,10 +3096,19 @@ impl AccountsDb {
.sum::<usize>() as u64
}

/// Construct a vec of pubkeys for cleaning from:
/// uncleaned_pubkeys - the delta set of updated pubkeys in rooted slots from the last clean
/// dirty_stores - set of stores which had accounts removed or recently rooted
/// returns the minimum slot we encountered
/// Record `pubkey` as a cleaning candidate.
///
/// The bin is chosen by the accounts index bin calculator; the pubkey is then inserted into
/// `candidates[bin]` with a default `CleaningInfo`, replacing any entry already stored for
/// that pubkey.
///
/// # Panics
///
/// Panics if the bin index is out of range for `candidates` or if the bin's lock is poisoned.
fn insert_pubkey(&self, candidates: &[RwLock<HashMap<Pubkey, CleaningInfo>>], pubkey: &Pubkey) {
    let bin = self.accounts_index.bin_calculator.bin_from_pubkey(pubkey);
    candidates[bin]
        .write()
        .unwrap()
        .insert(*pubkey, CleaningInfo::default());
}

/// Construct a list of candidates for cleaning from:
/// - dirty_stores -- set of stores which had accounts
/// removed or recently rooted;
/// - uncleaned_pubkeys -- the delta set of updated pubkeys in
/// rooted slots from the last clean.
///
/// The function also returns the minimum slot we encountered.
fn construct_candidate_clean_keys(
&self,
max_clean_root_inclusive: Option<Slot>,
Expand Down Expand Up @@ -3138,12 +3139,6 @@ impl AccountsDb {
std::iter::repeat_with(|| RwLock::new(HashMap::<Pubkey, CleaningInfo>::new()))
.take(num_bins)
.collect();

let insert_pubkey = |pubkey: &Pubkey| {
let index = self.accounts_index.bin_calculator.bin_from_pubkey(pubkey);
let mut candidates_bin = candidates[index].write().unwrap();
candidates_bin.insert(*pubkey, CleaningInfo::default());
};
let dirty_ancient_stores = AtomicUsize::default();
let mut dirty_store_routine = || {
let chunk_size = 1.max(dirty_stores_len.saturating_div(rayon::current_num_threads()));
Expand All @@ -3156,7 +3151,9 @@ impl AccountsDb {
dirty_ancient_stores.fetch_add(1, Ordering::Relaxed);
}
oldest_dirty_slot = oldest_dirty_slot.min(*slot);
store.accounts.scan_pubkeys(insert_pubkey);
store
.accounts
.scan_pubkeys(|pubkey| self.insert_pubkey(&candidates, pubkey));
});
oldest_dirty_slot
})
Expand Down Expand Up @@ -3186,25 +3183,14 @@ impl AccountsDb {
timings.dirty_ancient_stores = dirty_ancient_stores.load(Ordering::Relaxed);

let mut collect_delta_keys = Measure::start("key_create");
let delta_keys =
self.remove_uncleaned_slots_and_collect_pubkeys_up_to_slot(max_slot_inclusive);
self.remove_uncleaned_slots_up_to_slot_and_move_pubkeys(max_slot_inclusive, &candidates);
collect_delta_keys.stop();
timings.collect_delta_keys_us += collect_delta_keys.as_us();

let mut delta_insert = Measure::start("delta_insert");
self.thread_pool_clean.install(|| {
delta_keys.par_iter().for_each(|keys| {
for key in keys {
insert_pubkey(key);
}
});
});
delta_insert.stop();
timings.delta_insert_us += delta_insert.as_us();

timings.delta_key_count = Self::count_pubkeys(&candidates);

// Check if we should purge any of the zero_lamport_accounts_to_purge_later, based on the
// Check if we should purge any of the
// zero_lamport_accounts_to_purge_later, based on the
// latest_full_snapshot_slot.
let latest_full_snapshot_slot = self.latest_full_snapshot_slot();
assert!(
Expand All @@ -3217,7 +3203,7 @@ impl AccountsDb {
let is_candidate_for_clean =
max_slot_inclusive >= *slot && latest_full_snapshot_slot >= *slot;
if is_candidate_for_clean {
insert_pubkey(pubkey);
self.insert_pubkey(&candidates, pubkey);
}
!is_candidate_for_clean
});
Expand Down Expand Up @@ -14810,64 +14796,6 @@ pub mod tests {
assert_eq!(uncleaned_slots3, [slot1, slot2, slot3]);
}

/// Removing a single slot must return exactly the pubkey stored under that slot and none of
/// the pubkeys that belong to the other slots.
#[test]
fn test_remove_uncleaned_slots_and_collect_pubkeys() {
    solana_logger::setup();
    let db = AccountsDb::new_single_for_tests();

    // Three slots, each paired with its own unique pubkey.
    let slots = [11, 222, 3333];
    let pubkeys = [
        Pubkey::new_unique(),
        Pubkey::new_unique(),
        Pubkey::new_unique(),
    ];
    let accounts = [
        AccountSharedData::new(0, 0, &pubkeys[0]),
        AccountSharedData::new(0, 0, &pubkeys[1]),
        AccountSharedData::new(0, 0, &pubkeys[2]),
    ];

    for ((slot, pubkey), account) in slots.iter().zip(pubkeys.iter()).zip(accounts.iter()) {
        db.store_for_tests(*slot, &[(pubkey, account)]);
    }

    db.add_root(slots[0]);
    // slot 2 is _not_ a root on purpose
    db.add_root(slots[2]);

    for (slot, pubkey) in slots.iter().zip(pubkeys.iter()) {
        db.uncleaned_pubkeys.insert(*slot, vec![*pubkey]);
    }

    // Remove the slots one at a time, flattening each result into a plain list of pubkeys.
    let removed_per_slot = slots
        .iter()
        .map(|slot| {
            db.remove_uncleaned_slots_and_collect_pubkeys(vec![*slot])
                .into_iter()
                .flatten()
                .collect::<Vec<_>>()
        })
        .collect::<Vec<_>>();

    // The result for slot `i` contains pubkey `j` if and only if `i == j`.
    for (i, removed) in removed_per_slot.iter().enumerate() {
        for (j, pubkey) in pubkeys.iter().enumerate() {
            assert_eq!(removed.contains(pubkey), i == j);
        }
    }
}

#[test]
fn test_remove_uncleaned_slots_and_collect_pubkeys_up_to_slot() {
solana_logger::setup();
Expand Down Expand Up @@ -14897,15 +14825,21 @@ pub mod tests {
db.uncleaned_pubkeys.insert(slot2, vec![pubkey2]);
db.uncleaned_pubkeys.insert(slot3, vec![pubkey3]);

let uncleaned_pubkeys = db
.remove_uncleaned_slots_and_collect_pubkeys_up_to_slot(slot3)
.into_iter()
.flatten()
.collect::<Vec<_>>();
let num_bins = db.accounts_index.bins();
let candidates: Box<_> =
std::iter::repeat_with(|| RwLock::new(HashMap::<Pubkey, CleaningInfo>::new()))
.take(num_bins)
.collect();
db.remove_uncleaned_slots_up_to_slot_and_move_pubkeys(slot3, &candidates);

assert!(uncleaned_pubkeys.contains(&pubkey1));
assert!(uncleaned_pubkeys.contains(&pubkey2));
assert!(uncleaned_pubkeys.contains(&pubkey3));
let candidates_contain = |pubkey: &Pubkey| {
candidates
.iter()
.any(|bin| bin.read().unwrap().contains(pubkey))
};
assert!(candidates_contain(&pubkey1));
assert!(candidates_contain(&pubkey2));
assert!(candidates_contain(&pubkey3));
}

#[test]
Expand Down

0 comments on commit ebf3039

Please sign in to comment.