Skip to content

Commit

Permalink
when packing ancient append vecs, organize access to index
Browse files — browse the repository at this point in the history
  • Branch information
jeffwashington committed Mar 6, 2024
1 parent ce34f3f commit 850d625
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 23 deletions.
41 changes: 31 additions & 10 deletions accounts-db/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ pub(crate) struct ShrinkCollect<'a, T: ShrinkCollectRefs<'a>> {
pub(crate) all_are_zero_lamports: bool,
/// index entries that need to be held in memory while shrink is in progress
/// These aren't read - they are just held so that entries cannot be flushed.
pub(crate) _index_entries_being_shrunk: Vec<AccountMapEntry<AccountInfo>>,
pub(crate) index_entries_being_shrunk: Vec<AccountMapEntry<AccountInfo>>,
}

pub const ACCOUNTS_DB_CONFIG_FOR_TESTING: AccountsDbConfig = AccountsDbConfig {
Expand Down Expand Up @@ -3750,22 +3750,37 @@ impl AccountsDb {
accounts: &'a [StoredAccountMeta<'a>],
stats: &ShrinkStats,
slot_to_shrink: Slot,
bin: Option<usize>,
) -> LoadAccountsIndexForShrink<'a, T> {
let count = accounts.len();
let mut alive_accounts = T::with_capacity(count, slot_to_shrink);
let mut unrefed_pubkeys = Vec::with_capacity(count);

let mut alive = 0;
let mut dead = 0;
let mut index = 0;
let index = AtomicUsize::default();
let mut all_are_zero_lamports = true;
let mut index_entries_being_shrunk = Vec::with_capacity(accounts.len());
let bin_calc = &self.accounts_index.bin_calculator;
self.accounts_index.scan(
accounts.iter().map(|account| account.pubkey()),
accounts.iter().filter_map(|account| {
let pk = account.pubkey();
if let Some(bin) = bin {
if bin_calc.bin_from_pubkey(pk) == bin {
Some(pk)
} else {
// skipped this one
index.fetch_add(1, Ordering::Relaxed);
None
}
} else {
Some(pk)
}
}),
|pubkey, slots_refs, entry| {
let mut result = AccountsIndexScanResult::OnlyKeepInMemoryIfDirty;
if let Some((slot_list, ref_count)) = slots_refs {
let stored_account = &accounts[index];
let stored_account = &accounts[index.fetch_add(1, Ordering::Relaxed)];
let is_alive = slot_list.iter().any(|(slot, _acct_info)| {
// if the accounts index contains an entry at this slot, then the append vec we're asking about contains this item and thus, it is alive at this slot
*slot == slot_to_shrink
Expand All @@ -3787,13 +3802,12 @@ impl AccountsDb {
alive += 1;
}
}
index += 1;
result
},
None,
true,
);
assert_eq!(index, std::cmp::min(accounts.len(), count));
// assert_eq!(index, std::cmp::min(accounts.len(), count));
stats.alive_accounts.fetch_add(alive, Ordering::Relaxed);
stats.dead_accounts.fetch_add(dead, Ordering::Relaxed);

Expand Down Expand Up @@ -3847,6 +3861,7 @@ impl AccountsDb {
store: &'a Arc<AccountStorageEntry>,
unique_accounts: &'b GetUniqueAccountsResult<'b>,
stats: &ShrinkStats,
bin: Option<usize>,
) -> ShrinkCollect<'b, T> {
let slot = store.slot();

Expand Down Expand Up @@ -3874,7 +3889,7 @@ impl AccountsDb {
mut unrefed_pubkeys,
all_are_zero_lamports,
mut index_entries_being_shrunk,
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot);
} = self.load_accounts_index_for_shrink(stored_accounts, stats, slot, bin);

// collect
alive_accounts_collect
Expand Down Expand Up @@ -3926,7 +3941,7 @@ impl AccountsDb {
alive_total_bytes,
total_starting_accounts: len,
all_are_zero_lamports: all_are_zero_lamports_collect.into_inner().unwrap(),
_index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(),
index_entries_being_shrunk: index_entries_being_shrunk_outer.into_inner().unwrap(),
}
}

Expand Down Expand Up @@ -3983,8 +3998,12 @@ impl AccountsDb {
let unique_accounts =
self.get_unique_accounts_from_storage_for_shrink(store, &self.shrink_stats);
debug!("do_shrink_slot_store: slot: {}", slot);
let shrink_collect =
self.shrink_collect::<AliveAccounts<'_>>(store, &unique_accounts, &self.shrink_stats);
let shrink_collect = self.shrink_collect::<AliveAccounts<'_>>(
store,
&unique_accounts,
&self.shrink_stats,
None,
);

// This shouldn't happen if alive_bytes/approx_stored_count are accurate
if Self::should_not_shrink(
Expand Down Expand Up @@ -4539,6 +4558,7 @@ impl AccountsDb {
old_storage,
&unique_accounts,
&self.shrink_ancient_stats.shrink_stats,
None,
);

// could follow what shrink does more closely
Expand Down Expand Up @@ -16980,6 +17000,7 @@ pub mod tests {
&storage,
&unique_accounts,
&ShrinkStats::default(),
None,
);
let expect_single_opposite_alive_account =
if append_opposite_alive_account {
Expand Down
94 changes: 81 additions & 13 deletions accounts-db/src/ancient_append_vecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -611,18 +611,86 @@ impl AccountsDb {

// `shrink_collect` all accounts in the append vecs we want to combine.
// This also unrefs all dead accounts in those append vecs.
let mut accounts_to_combine = self.thread_pool_clean.install(|| {
accounts_per_storage
.par_iter()
.map(|(info, unique_accounts)| {
self.shrink_collect::<ShrinkCollectAliveSeparatedByRefs<'_>>(
&info.storage,
unique_accounts,
&self.shrink_ancient_stats.shrink_stats,
)
})
.collect::<Vec<_>>()
});
let mut accounts_to_combine =
self.thread_pool_clean.install(|| {
let mut result = Vec::default();
for bin in 0..self.accounts_index.bins() {
let this_bin = accounts_per_storage
.par_iter()
.map(|(info, unique_accounts)| {
self.shrink_collect::<ShrinkCollectAliveSeparatedByRefs<'_>>(
&info.storage,
unique_accounts,
&self.shrink_ancient_stats.shrink_stats,
Some(bin),
)
})
.collect::<Vec<_>>();
if result.is_empty() {
result = this_bin;
} else {
// accumulate results from each bin. We need 1 entry per slot
this_bin.into_iter().zip(result.iter_mut()).for_each(
|(mut this_bin, all_bins)| {
all_bins.alive_accounts.many_refs_old_alive.accounts.append(
&mut this_bin.alive_accounts.many_refs_old_alive.accounts,
);
all_bins.alive_accounts.many_refs_old_alive.bytes +=
this_bin.alive_accounts.many_refs_old_alive.bytes;
assert_eq!(
all_bins.alive_accounts.many_refs_old_alive.slot,
this_bin.alive_accounts.many_refs_old_alive.slot
);

all_bins
.alive_accounts
.many_refs_this_is_newest_alive
.accounts
.append(
&mut this_bin
.alive_accounts
.many_refs_this_is_newest_alive
.accounts,
);
all_bins.alive_accounts.many_refs_this_is_newest_alive.bytes +=
this_bin.alive_accounts.many_refs_this_is_newest_alive.bytes;
assert_eq!(
all_bins.alive_accounts.many_refs_this_is_newest_alive.slot,
this_bin.alive_accounts.many_refs_this_is_newest_alive.slot
);

all_bins
.alive_accounts
.one_ref
.accounts
.append(&mut this_bin.alive_accounts.one_ref.accounts);
all_bins.alive_accounts.one_ref.bytes +=
this_bin.alive_accounts.one_ref.bytes;
all_bins.alive_total_bytes += this_bin.alive_total_bytes;
assert_eq!(
all_bins.alive_accounts.one_ref.slot,
this_bin.alive_accounts.one_ref.slot
);

// slot, capacity, total_starting_accounts: same for all bins
assert_eq!(all_bins.slot, this_bin.slot);
assert_eq!(all_bins.capacity, this_bin.capacity);
assert_eq!(all_bins.total_starting_accounts, this_bin.total_starting_accounts);

all_bins.all_are_zero_lamports = all_bins.all_are_zero_lamports
&& this_bin.all_are_zero_lamports;
all_bins
.unrefed_pubkeys
.append(&mut this_bin.unrefed_pubkeys);
all_bins
.index_entries_being_shrunk
.append(&mut this_bin.index_entries_being_shrunk);
},
);
}
}
result
});

let mut remove = Vec::default();
for (i, (shrink_collect, (info, _unique_accounts))) in accounts_to_combine
Expand Down Expand Up @@ -3301,7 +3369,7 @@ pub mod tests {
alive_total_bytes: 0,
total_starting_accounts: 0,
all_are_zero_lamports: false,
_index_entries_being_shrunk: Vec::default(),
index_entries_being_shrunk: Vec::default(),
};
let accounts_to_combine = AccountsToCombine {
accounts_keep_slots: HashMap::default(),
Expand Down

0 comments on commit 850d625

Please sign in to comment.