Skip to content

Commit

Permalink
when packing ancient append vecs, organize access to index
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington committed Mar 6, 2024
1 parent ce34f3f commit 974b0a4
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 20 deletions.
37 changes: 30 additions & 7 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> {
pub(crate) all_are_zero_lamports: bool,
/// index entries that need to be held in memory while shrink is in progress
/// These aren't read - they are just held so that entries cannot be flushed.
pub(crate) _index_entries_being_shrunk: Vec<AccountMapEntry<AccountInfo>>,
pub(crate) index_entries_being_shrunk: Vec<AccountMapEntry<AccountInfo>>,
}

pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
Expand Down Expand Up @@ -3750,6 +3750,7 @@ impl AccountsDb {
accounts: &'a [StoredAccountMeta<'a>],
stats: &ShrinkStats,
slot_to_shrink: Slot,
bin: Option<usize>,
) -> LoadAccountsIndexForShrink<'a, T> {
let count = accounts.len();
let mut alive_accounts = T::with_capacity(count, slot_to_shrink);
Expand All @@ -3760,8 +3761,23 @@ impl AccountsDb {
let mut index = 0;
let mut all_are_zero_lamports = true;
let mut index_entries_being_shrunk = Vec::with_capacity(accounts.len());
let bin_calc = &self.accounts_index.bin_calculator;
self.accounts_index.scan(
accounts.iter().map(|account| account.pubkey()),
accounts.iter().filter_map(|account| {
let pk = account.pubkey();
if let Some(bin) = bin {
if bin_calc.bin_from_pubkey(pk) == bin {
Some(pk)
}
else {
// skipped this one
index += 1;
None
}
} else {
Some(pk)
}
}),
|pubkey, slots_refs, entry| {
let mut result = AccountsIndexScanResult::OnlyKeepInMemoryIfDirty;
if let Some((slot_list, ref_count)) = slots_refs {
Expand Down Expand Up @@ -3793,7 +3809,7 @@ impl AccountsDb {
None,
true,
);
assert_eq!(index, std::cmp::min(accounts.len(), count));
// assert_eq!(index, std::cmp::min(accounts.len(), count));
stats.alive_accounts.fetch_add(alive, Ordering::Relaxed);
stats.dead_accounts.fetch_add(dead, Ordering::Relaxed);

Expand Down Expand Up @@ -3847,6 +3863,7 @@ impl AccountsDb {
store: &'a Arc<AccountStorageEntry>,
unique_accounts: &'b GetUniqueAccountsResult<'b>,
stats: &ShrinkStats,
bin: Option<usize>,
) -> ShrinkCollect<'b, T> {
let slot = store.slot();

Expand Down Expand Up @@ -3874,7 +3891,7 @@ impl AccountsDb {
mut unrefed_pubkeys,
all_are_zero_lamports,
mut index_entries_being_shrunk,
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot, bin);

// collect
alive_accounts_collect
Expand Down Expand Up @@ -3926,7 +3943,7 @@ impl AccountsDb {
alive_total_bytes,
total_starting_accounts: len,
all_are_zero_lamports: all_are_zero_lamports_collect.into_inner().unwrap(),
_index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(),
index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(),
}
}

Expand Down Expand Up @@ -3983,8 +4000,12 @@ impl AccountsDb {
let unique_accounts =
self.get_unique_accounts_from_storage_for_shrink(store, &self.shrink_stats);
debug!("do_shrink_slot_store: slot: {}", slot);
let shrink_collect =
self.shrink_collect::<AliveAccounts<'_>>(store, &unique_accounts, &self.shrink_stats);
let shrink_collect = self.shrink_collect::<AliveAccounts<'_>>(
store,
&unique_accounts,
&self.shrink_stats,
None,
);

// This shouldn't happen if alive_bytes/approx_stored_count are accurate
if Self::should_not_shrink(
Expand Down Expand Up @@ -4539,6 +4560,7 @@ impl AccountsDb {
old_storage,
&unique_accounts,
&self.shrink_ancient_stats.shrink_stats,
None,
);

// could follow what shrink does more closely
Expand Down Expand Up @@ -16980,6 +17002,7 @@ pub mod tests {
&storage,
&unique_accounts,
&ShrinkStats::default(),
None,
);
let expect_single_opposite_alive_account =
if append_opposite_alive_account {
Expand Down
94 changes: 81 additions & 13 deletions accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,18 +611,86 @@ impl AccountsDb {

// `shrink_collect` all accounts in the append vecs we want to combine.
// This also unrefs all dead accounts in those append vecs.
let mut accounts_to_combine = self.thread_pool_clean.install(|| {
accounts_per_storage
.par_iter()
.map(|(info, unique_accounts)| {
self.shrink_collect::<ShrinkCollectAliveSeparatedByRefs<'_>>(
&info.storage,
unique_accounts,
&self.shrink_ancient_stats.shrink_stats,
)
})
.collect::<Vec<_>>()
});
let mut accounts_to_combine =
self.thread_pool_clean.install(|| {
let mut result = Vec::default();
for bin in 0..self.accounts_index.bins() {
let this_bin = accounts_per_storage
.par_iter()
.map(|(info, unique_accounts)| {
self.shrink_collect::<ShrinkCollectAliveSeparatedByRefs<'_>>(
&info.storage,
unique_accounts,
&self.shrink_ancient_stats.shrink_stats,
Some(bin),
)
})
.collect::<Vec<_>>();
if result.is_empty() {
result = this_bin;
} else {
// accumulate results from each bin. We need 1 entry per slot
this_bin.into_iter().zip(result.iter_mut()).for_each(
|(mut this_bin, all_bins)| {
all_bins.alive_accounts.many_refs_old_alive.accounts.append(
&mut this_bin.alive_accounts.many_refs_old_alive.accounts,
);
all_bins.alive_accounts.many_refs_old_alive.bytes +=
this_bin.alive_accounts.many_refs_old_alive.bytes;
assert_eq!(
all_bins.alive_accounts.many_refs_old_alive.slot,
this_bin.alive_accounts.many_refs_old_alive.slot
);

all_bins
.alive_accounts
.many_refs_this_is_newest_alive
.accounts
.append(
&mut this_bin
.alive_accounts
.many_refs_this_is_newest_alive
.accounts,
);
all_bins.alive_accounts.many_refs_this_is_newest_alive.bytes +=
this_bin.alive_accounts.many_refs_this_is_newest_alive.bytes;
assert_eq!(
all_bins.alive_accounts.many_refs_this_is_newest_alive.slot,
this_bin.alive_accounts.many_refs_this_is_newest_alive.slot
);

all_bins
.alive_accounts
.one_ref
.accounts
.append(&mut this_bin.alive_accounts.one_ref.accounts);
all_bins.alive_accounts.one_ref.bytes +=
this_bin.alive_accounts.one_ref.bytes;
all_bins.alive_total_bytes += this_bin.alive_total_bytes;
assert_eq!(
all_bins.alive_accounts.one_ref.slot,
this_bin.alive_accounts.one_ref.slot
);

// slot, capacity, total_starting_accounts: same for all bins
assert_eq!(all_bins.slot, this_bin.slot);
assert_eq!(all_bins.capacity, this_bin.capacity);
assert_eq!(all_bins.total_starting_accounts, this_bin.total_starting_accounts);

all_bins.all_are_zero_lamports = all_bins.all_are_zero_lamports
&& this_bin.all_are_zero_lamports;
all_bins
.unrefed_pubkeys
.append(&mut this_bin.unrefed_pubkeys);
all_bins
.index_entries_being_shrunk
.append(&mut this_bin.index_entries_being_shrunk);
},
);
}
}
result
});

let mut remove = Vec::default();
for (i, (shrink_collect, (info, _unique_accounts))) in accounts_to_combine
Expand Down Expand Up @@ -3301,7 +3369,7 @@ pub mod tests {
alive_total_bytes: 0,
total_starting_accounts: 0,
all_are_zero_lamports: false,
_index_entries_being_shrunk: Vec::default(),
index_entries_being_shrunk: Vec::default(),
};
let accounts_to_combine = AccountsToCombine {
accounts_keep_slots: HashMap::default(),
Expand Down

0 comments on commit 974b0a4

Please sign in to comment.