Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Warp Integration #856

Merged
merged 29 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 5 additions & 153 deletions chain/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"github.com/ava-labs/avalanchego/snow/choices"
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"
"github.com/ava-labs/avalanchego/x/merkledb"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -51,8 +49,7 @@ type StatefulBlock struct {
// or [Verify], which reduces the amount of time we are
// blocking the consensus engine from voting on the block,
// starting the verification of another block, etc.
StateRoot ids.ID `json:"stateRoot"`
WarpResults set.Bits64 `json:"warpResults"`
StateRoot ids.ID `json:"stateRoot"`

size int

Expand All @@ -73,16 +70,6 @@ func (b *StatefulBlock) ID() (ids.ID, error) {
return utils.ToID(blk), nil
}

// warpJob is used to signal to a listner that a *warp.Message has been
// verified.
type warpJob struct {
msg *warp.Message
signers int
verifiedChan chan bool
verified bool
warpNum int
}

func NewGenesisBlock(root ids.ID) *StatefulBlock {
return &StatefulBlock{
// We set the genesis block timestamp to be after the ProposerVM fork activation.
Expand Down Expand Up @@ -112,11 +99,6 @@ type StatelessBlock struct {
bytes []byte
txsSet set.Set[ids.ID]

warpMessages map[ids.ID]*warpJob
containsWarp bool // this allows us to avoid allocating a map when we build
bctx *block.Context
vdrState validators.State

results []*Result
feeManager *fees.Manager

Expand Down Expand Up @@ -179,7 +161,6 @@ func (b *StatelessBlock) populateTxs(ctx context.Context) error {
// Confirm no transaction duplicates and setup
// AWM processing
b.txsSet = set.NewSet[ids.ID](len(b.Txs))
b.warpMessages = map[ids.ID]*warpJob{}
for _, tx := range b.Txs {
// Ensure there are no duplicate transactions
if b.txsSet.Contains(tx.ID()) {
Expand All @@ -195,29 +176,6 @@ func (b *StatelessBlock) populateTxs(ctx context.Context) error {
}
batchVerifier.Add(txDigest, tx.Auth)
}

// Check if we need the block context to verify the block (which contains
// an Avalanche Warp Message)
//
// Instead of erroring out if a warp message is invalid, we mark the
// verification as skipped and include it in the verification result so
// that a fee can still be deducted.
if tx.WarpMessage != nil {
if len(b.warpMessages) == MaxWarpMessages {
return ErrTooManyWarpMessages
}
signers, err := tx.WarpMessage.Signature.NumSigners()
if err != nil {
return err
}
b.warpMessages[tx.ID()] = &warpJob{
msg: tx.WarpMessage,
signers: signers,
verifiedChan: make(chan bool, 1),
warpNum: len(b.warpMessages),
}
b.containsWarp = true
}
}
return nil
}
Expand Down Expand Up @@ -287,9 +245,6 @@ func (b *StatelessBlock) initializeBuilt(
b.txsSet = set.NewSet[ids.ID](len(b.Txs))
for _, tx := range b.Txs {
b.txsSet.Add(tx.ID())
if tx.WarpMessage != nil {
b.containsWarp = true
}
}
return nil
}
Expand All @@ -298,12 +253,12 @@ func (b *StatelessBlock) initializeBuilt(
func (b *StatelessBlock) ID() ids.ID { return b.id }

// implements "block.WithVerifyContext"
func (b *StatelessBlock) ShouldVerifyWithContext(context.Context) (bool, error) {
return b.containsWarp, nil
func (*StatelessBlock) ShouldVerifyWithContext(context.Context) (bool, error) {
return false, nil
}

// implements "block.WithVerifyContext"
func (b *StatelessBlock) VerifyWithContext(ctx context.Context, bctx *block.Context) error {
func (b *StatelessBlock) VerifyWithContext(ctx context.Context, _ *block.Context) error {
wlawt marked this conversation as resolved.
Show resolved Hide resolved
start := time.Now()
defer func() {
b.vm.RecordBlockVerify(time.Since(start))
Expand All @@ -316,15 +271,11 @@ func (b *StatelessBlock) VerifyWithContext(ctx context.Context, bctx *block.Cont
attribute.Int("txs", len(b.Txs)),
attribute.Int64("height", int64(b.Hght)),
attribute.Bool("stateReady", stateReady),
attribute.Int64("pchainHeight", int64(bctx.PChainHeight)),
attribute.Bool("built", b.Processed()),
),
)
defer span.End()

// Persist the context in case we need it during Accept
b.bctx = bctx

// Proceed with normal verification
return b.verify(ctx, stateReady)
}
Expand Down Expand Up @@ -407,33 +358,6 @@ func (b *StatelessBlock) verify(ctx context.Context, stateReady bool) error {
return nil
}

// verifyWarpMessage will attempt to verify a given warp message provided by an
// Action.
func (b *StatelessBlock) verifyWarpMessage(ctx context.Context, r Rules, msg *warp.Message) bool {
// We do not check the validity of [SourceChainID] because a VM could send
// itself a message to trigger a chain upgrade.
allowed, num, denom := r.GetWarpConfig(msg.SourceChainID)
if !allowed {
b.vm.Logger().
Warn("unable to verify warp message", zap.Stringer("warpID", msg.ID()), zap.Error(ErrDisabledChainID))
return false
}
if err := msg.Signature.Verify(
ctx,
&msg.UnsignedMessage,
r.NetworkID(),
b.vdrState,
b.bctx.PChainHeight,
num,
denom,
); err != nil {
b.vm.Logger().
Warn("unable to verify warp message", zap.Stringer("warpID", msg.ID()), zap.Error(err))
return false
}
return true
}

// innerVerify executes the block on top of the provided [VerifyContext].
//
// Invariants:
Expand Down Expand Up @@ -515,64 +439,11 @@ func (b *StatelessBlock) innerVerify(ctx context.Context, vctx VerifyContext) er
}
}

// Start validating warp messages, if they exist
var invalidWarpResult bool
if b.containsWarp {
if b.bctx == nil {
log.Error(
"missing verify block context",
zap.Uint64("height", b.Hght),
zap.Stringer("id", b.ID()),
)
return ErrMissingBlockContext
}
_, warpVerifySpan := b.vm.Tracer().Start(ctx, "StatelessBlock.verifyWarpMessages") //nolint:spancheck
b.vdrState = b.vm.ValidatorState()
go func() {
defer warpVerifySpan.End()
// We don't use [b.vm.Workers] here because we need the warp verification
// results during normal execution. If we added a job to the workers queue,
// it would get executed after all signatures. Additionally, BLS
// Multi-Signature verification is already parallelized so we should just
// do one at a time to avoid overwhelming the CPU.
for txID, msg := range b.warpMessages {
if ctx.Err() != nil {
return
}
blockVerified := b.WarpResults.Contains(uint(msg.warpNum))
if b.vm.IsBootstrapped() && !invalidWarpResult {
start := time.Now()
verified := b.verifyWarpMessage(ctx, r, msg.msg)
msg.verifiedChan <- verified
msg.verified = verified
log.Info(
"processed warp message",
zap.Stringer("txID", txID),
zap.Bool("verified", verified),
zap.Int("signers", msg.signers),
zap.Duration("t", time.Since(start)),
)
if blockVerified != verified {
invalidWarpResult = true
}
} else {
// When we are bootstrapping, we just use the result in the block.
//
// We also use the result in the block when we have found
// a verification mismatch (our verify result is different than the
// block) to avoid doing extra work.
msg.verifiedChan <- blockVerified
msg.verified = blockVerified
}
}
}()
}

// Compute next unit prices to use
feeKey := FeeKey(b.vm.StateManager().FeeKey())
feeRaw, err := parentView.GetValue(ctx, feeKey)
if err != nil {
return err //nolint:spancheck
return err
}
parentFeeManager := fees.NewManager(feeRaw)
feeManager, err := parentFeeManager.ComputeNext(parentTimestamp, b.Tmstmp, r)
Expand All @@ -589,23 +460,6 @@ func (b *StatelessBlock) innerVerify(ctx context.Context, vctx VerifyContext) er
b.results = results
b.feeManager = feeManager

// Ensure warp results are correct
if invalidWarpResult {
return ErrWarpResultMismatch
}
numWarp := len(b.warpMessages)
if numWarp > MaxWarpMessages {
return ErrTooManyWarpMessages
}
var warpResultsLimit set.Bits64
warpResultsLimit.Add(uint(numWarp))
if b.WarpResults >= warpResultsLimit {
// If the value of [WarpResults] is greater than the value of uint64 with
// a 1-bit shifted [numWarp] times, then there are unused bits set to
// 1 (which should is not allowed).
return ErrWarpResultMismatch
}

// Update chain metadata
heightKeyStr := string(heightKey)
timestampKeyStr := string(timestampKey)
Expand Down Expand Up @@ -999,7 +853,6 @@ func (b *StatefulBlock) Marshal() ([]byte, error) {
}

p.PackID(b.StateRoot)
p.PackUint64(uint64(b.WarpResults))
bytes := p.Bytes()
if err := p.Err(); err != nil {
return nil, err
Expand Down Expand Up @@ -1034,7 +887,6 @@ func UnmarshalBlock(raw []byte, parser Parser) (*StatefulBlock, error) {
}

p.UnpackID(false, &b.StateRoot)
b.WarpResults = set.Bits64(p.UnpackUint64(false))

// Ensure no leftover bytes
if !p.Empty() {
Expand Down
54 changes: 1 addition & 53 deletions chain/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,11 @@ func BuildBlock(
cache = map[string]*fetchData{}

blockLock sync.RWMutex
warpAdded = uint(0)
start = time.Now()
txsAttempted = 0
results = []*Result{}

vdrState = vm.ValidatorState()
sm = vm.StateManager()
sm = vm.StateManager()

// prepareStreamLock ensures we don't overwrite stream prefetching spawned
// asynchronously.
Expand Down Expand Up @@ -174,18 +172,6 @@ func BuildBlock(
continue
}

// Ensure we can process if transaction includes a warp message
if tx.WarpMessage != nil && blockContext == nil {
log.Debug(
"dropping pending warp message because no context provided",
zap.Stringer("txID", tx.ID()),
)
restorableLock.Lock()
restorable = append(restorable, tx)
restorableLock.Unlock()
continue
}

stateKeys, err := tx.StateKeys(sm)
if err != nil {
// Drop bad transaction and continue
Expand Down Expand Up @@ -287,35 +273,6 @@ func BuildBlock(
return nil
}

// Verify warp message, if it exists
//
// We don't drop invalid warp messages because we must collect fees for
// the work the sender made us do (otherwise this would be a DoS).
//
// We wait as long as possible to verify the signature to ensure we don't
// spend unnecessary time on an invalid tx.
var warpErr error
if tx.WarpMessage != nil {
// We do not check the validity of [SourceChainID] because a VM could send
// itself a message to trigger a chain upgrade.
allowed, num, denom := r.GetWarpConfig(tx.WarpMessage.SourceChainID)
if allowed {
warpErr = tx.WarpMessage.Signature.Verify(
ctx, &tx.WarpMessage.UnsignedMessage, r.NetworkID(),
vdrState, blockContext.PChainHeight, num, denom,
)
} else {
warpErr = ErrDisabledChainID
}
if warpErr != nil {
log.Warn(
"warp verification failed",
zap.Stringer("txID", tx.ID()),
zap.Error(warpErr),
)
}
}

// If execution works, keep moving forward with new state
//
// Note, these calculations must match block verification exactly
Expand Down Expand Up @@ -347,7 +304,6 @@ func BuildBlock(
r,
tsv,
nextTime,
tx.WarpMessage != nil && warpErr == nil,
)
if err != nil {
// Returning an error here should be avoided at all costs (can be a DoS). Rather,
Expand All @@ -357,7 +313,6 @@ func BuildBlock(
return err
}

// Need to atomically check there aren't too many warp messages and add to block
blockLock.Lock()
defer blockLock.Unlock()

Expand Down Expand Up @@ -385,13 +340,6 @@ func BuildBlock(
tsv.Commit()
b.Txs = append(b.Txs, tx)
results = append(results, result)
if tx.WarpMessage != nil {
if warpErr == nil {
// Add a bit if the warp message was verified
b.WarpResults.Add(warpAdded)
}
warpAdded++
}
return nil
})
}
Expand Down
Loading
Loading