Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: only subscribe to short lived subnets 2 slots in advanced #5810

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ export function getValidatorApi({
metrics?.production.producedAggregateParticipants.observe(aggregate.aggregationBits.getTrueBitIndexes().length);

return {
data: chain.attestationPool.getAggregate(slot, attestationDataRoot),
data: aggregate,
};
},

Expand Down
38 changes: 20 additions & 18 deletions packages/beacon-node/src/network/subnets/attnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ enum SubnetSource {
random = "random",
}

/** As monitored on goerli, we only need to subscribe 2 slots before aggregator dutied slot to get stable mesh peers */
const SLOTS_TO_SUBSCRIBE_IN_ADVANCE = 2;

/**
* Manage random (long lived) subnets and committee (short lived) subnets.
* - PeerManager uses attnetsService to know which peers are requried for duties
Expand Down Expand Up @@ -118,7 +121,6 @@ export class AttnetsService implements IAttnetsService {
addCommitteeSubscriptions(subscriptions: CommitteeSubscription[]): void {
const currentSlot = this.clock.currentSlot;
let addedknownValidators = false;
const subnetsToSubscribe: RequestedSubnet[] = [];

for (const {validatorIndex, subnet, slot, isAggregator} of subscriptions) {
// Add known validator
Expand All @@ -129,23 +131,10 @@ export class AttnetsService implements IAttnetsService {
this.committeeSubnets.request({subnet, toSlot: slot + 1});
if (isAggregator) {
// need exact slot here
subnetsToSubscribe.push({subnet, toSlot: slot});
this.aggregatorSlotSubnet.getOrDefault(slot).add(subnet);
}
}

// Trigger gossip subscription first, in batch
if (subnetsToSubscribe.length > 0) {
this.subscribeToSubnets(
subnetsToSubscribe.map((sub) => sub.subnet),
SubnetSource.committee
);
}
// Then, register the subscriptions
for (const subscription of subnetsToSubscribe) {
this.subscriptionsCommittee.request(subscription);
}

if (addedknownValidators) this.rebalanceRandomSubnets();
}

Expand Down Expand Up @@ -179,16 +168,29 @@ export class AttnetsService implements IAttnetsService {

/**
* Run per slot.
* - Subscribe to gossip subnets `${SLOTS_TO_SUBSCRIBE_IN_ADVANCE}` slots in advance
* - Unsubscribe from expired subnets
*/
private onSlot = (slot: Slot): void => {
private onSlot = (clockSlot: Slot): void => {
try {
for (const [dutiedSlot, subnets] of this.aggregatorSlotSubnet.entries()) {
if (dutiedSlot === clockSlot + SLOTS_TO_SUBSCRIBE_IN_ADVANCE) {
// Trigger gossip subscription first, in batch
if (subnets.size > 0) {
this.subscribeToSubnets(Array.from(subnets), SubnetSource.committee);
}
// Then, register the subscriptions
Array.from(subnets).map((subnet) => this.subscriptionsCommittee.request({subnet, toSlot: dutiedSlot}));
}
}

// For node >= 64 validators, we should consistently subscribe to all subnets
// it's important to check random subnets first
// See https://github.com/ChainSafe/lodestar/issues/4929
this.unsubscribeExpiredRandomSubnets(slot);
this.unsubscribeExpiredCommitteeSubnets(slot);
this.unsubscribeExpiredRandomSubnets(clockSlot);
this.unsubscribeExpiredCommitteeSubnets(clockSlot);
} catch (e) {
this.logger.error("Error on AttnetsService.onSlot", {slot}, e as Error);
this.logger.error("Error on AttnetsService.onSlot", {slot: clockSlot}, e as Error);
}
};

Expand Down
37 changes: 19 additions & 18 deletions packages/beacon-node/src/network/subnets/dllAttnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ enum SubnetSource {
longLived = "long_lived",
}

/** As monitored on goerli, we only need to subscribe 2 slots before aggregator dutied slot to get stable mesh peers */
const SLOTS_TO_SUBSCRIBE_IN_ADVANCE = 2;
dapplion marked this conversation as resolved.
Show resolved Hide resolved

/**
* Manage deleterministic long lived (DLL) subnets and short lived subnets.
* - PeerManager uses attnetsService to know which peers are required for duties and long lived subscriptions
Expand Down Expand Up @@ -103,29 +106,14 @@ export class DLLAttnetsService implements IAttnetsService {
* Called from the API when validator is a part of a committee.
*/
addCommitteeSubscriptions(subscriptions: CommitteeSubscription[]): void {
const subnetsToSubscribe: RequestedSubnet[] = [];

for (const {subnet, slot, isAggregator} of subscriptions) {
// the peer-manager heartbeat will help find the subnet
this.committeeSubnets.request({subnet, toSlot: slot + 1});
if (isAggregator) {
// need exact slot here
subnetsToSubscribe.push({subnet, toSlot: slot});
this.aggregatorSlotSubnet.getOrDefault(slot).add(subnet);
}
}

// Trigger gossip subscription first, in batch
if (subnetsToSubscribe.length > 0) {
this.subscribeToSubnets(
subnetsToSubscribe.map((sub) => sub.subnet),
SubnetSource.committee
);
}
// Then, register the subscriptions
for (const subscription of subnetsToSubscribe) {
this.shortLivedSubscriptions.request(subscription);
}
}

/**
Expand Down Expand Up @@ -167,12 +155,25 @@ export class DLLAttnetsService implements IAttnetsService {

/**
* Run per slot.
* - Subscribe to gossip subnets `${SLOTS_TO_SUBSCRIBE_IN_ADVANCE}` slots in advance
* - Unsubscribe from expired subnets
*/
private onSlot = (slot: Slot): void => {
private onSlot = (clockSlot: Slot): void => {
try {
this.unsubscribeExpiredCommitteeSubnets(slot);
for (const [dutiedSlot, subnets] of this.aggregatorSlotSubnet.entries()) {
if (dutiedSlot === clockSlot + SLOTS_TO_SUBSCRIBE_IN_ADVANCE) {
// Trigger gossip subscription first, in batch
if (subnets.size > 0) {
this.subscribeToSubnets(Array.from(subnets), SubnetSource.committee);
}
// Then, register the subscriptions
Array.from(subnets).map((subnet) => this.shortLivedSubscriptions.request({subnet, toSlot: dutiedSlot}));
}
}

this.unsubscribeExpiredCommitteeSubnets(clockSlot);
} catch (e) {
this.logger.error("Error on AttnetsService.onSlot", {slot}, e as Error);
this.logger.error("Error on AttnetsService.onSlot", {slot: clockSlot}, e as Error);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ describe("DLLAttnetsService", () => {
isAggregator: true,
};
service.addCommitteeSubscriptions([subscription]);
// it does not subscribe immediately
expect(gossipStub.subscribeTopic.callCount).to.be.equal(SUBNETS_PER_NODE);
sandbox.clock.tick(config.SECONDS_PER_SLOT * (subscription.slot - 2) * 1000);
// then subscribe 2 slots before dutied slot
expect(gossipStub.subscribeTopic.callCount).to.be.equal(SUBNETS_PER_NODE + 1);
// then unsubscribe after the expiration
sandbox.clock.tick(config.SECONDS_PER_SLOT * (subscription.slot + 1) * 1000);
Expand Down
Loading