Skip to content

Commit

Permalink
node: (cleanup) remove injectC in favor of using msgC directly
Browse files Browse the repository at this point in the history
  • Loading branch information
tbjump authored and tbjump committed Aug 25, 2023
1 parent 1d79769 commit 1d2e26c
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 82 deletions.
28 changes: 25 additions & 3 deletions node/pkg/adminrpc/adminserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
"github.com/holiman/uint256"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/exp/slices"

"github.com/certusone/wormhole/node/pkg/db"
Expand All @@ -35,10 +37,18 @@ import (
sdktypes "github.com/cosmos/cosmos-sdk/types"
)

var (
vaaInjectionsTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "wormhole_vaa_injections_total",
Help: "Total number of injected VAA queued for broadcast",
})
)

type nodePrivilegedService struct {
nodev1.UnimplementedNodePrivilegedServiceServer
db *db.Database
injectC chan<- *vaa.VAA
injectC chan<- *common.MessagePublication
obsvReqSendC chan<- *gossipv1.ObservationRequest
logger *zap.Logger
signedInC chan<- *gossipv1.SignedVAAWithQuorum
Expand All @@ -52,7 +62,7 @@ type nodePrivilegedService struct {

func NewPrivService(
db *db.Database,
injectC chan<- *vaa.VAA,
injectC chan<- *common.MessagePublication,
obsvReqSendC chan<- *gossipv1.ObservationRequest,
logger *zap.Logger,
signedInC chan<- *gossipv1.SignedVAAWithQuorum,
Expand Down Expand Up @@ -609,7 +619,19 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
zap.String("digest", digest.String()),
)

s.injectC <- v
vaaInjectionsTotal.Inc()

s.injectC <- &common.MessagePublication{
TxHash: ethcommon.Hash{},
Timestamp: v.Timestamp,
Nonce: v.Nonce,
Sequence: v.Sequence,
ConsistencyLevel: v.ConsistencyLevel,
EmitterChain: v.EmitterChain,
EmitterAddress: v.EmitterAddress,
Payload: v.Payload,
Unreliable: false,
}

digests[i] = digest.Bytes()
}
Expand Down
3 changes: 1 addition & 2 deletions node/pkg/node/adminServiceRunnable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/certusone/wormhole/node/pkg/publicrpc"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"

ethcommon "github.com/ethereum/go-ethereum/common"
Expand All @@ -28,7 +27,7 @@ import (
func adminServiceRunnable(
logger *zap.Logger,
socketPath string,
injectC chan<- *vaa.VAA,
injectC chan<- *common.MessagePublication,
signedInC chan<- *gossipv1.SignedVAAWithQuorum,
obsvReqSendC chan<- *gossipv1.ObservationRequest,
db *db.Database,
Expand Down
4 changes: 0 additions & 4 deletions node/pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/certusone/wormhole/node/pkg/reporter"
"github.com/certusone/wormhole/node/pkg/supervisor"

"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -67,8 +66,6 @@ type G struct {
obsvReqC channelPair[*gossipv1.ObservationRequest]
// Outbound observation requests
obsvReqSendC channelPair[*gossipv1.ObservationRequest]
// Injected VAAs (manually generated rather than created via observation)
injectC channelPair[*vaa.VAA]
// acctC is the channel where messages will be put after they reached quorum in the accountant.
acctC channelPair[*common.MessagePublication]
}
Expand Down Expand Up @@ -96,7 +93,6 @@ func (g *G) initializeBasic(logger *zap.Logger, rootCtxCancel context.CancelFunc
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestOutboundBufferSize)
g.obsvReqSendC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize)
g.injectC = makeChannelPair[*vaa.VAA](0)
g.acctC = makeChannelPair[*common.MessagePublication](accountant.MsgChannelCapacity)

// Guardian set state managed by processor
Expand Down
11 changes: 9 additions & 2 deletions node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
zap.Stringer("msgChainId", msg.EmitterChain),
zap.Stringer("watcherChainId", chainId),
)
} else if msg.EmitterAddress == vaa.GovernanceEmitter && msg.EmitterChain == vaa.GovernanceChain {
logger.Error(
"EMERGENCY: PLEASE REPORT THIS IMMEDIATELY! A Solana message was emitted from the governance emitter. This should never be possible.",
zap.Stringer("emitter_chain", msg.EmitterChain),
zap.Stringer("emitter_address", msg.EmitterAddress),
zap.Uint32("nonce", msg.Nonce),
zap.Stringer("txhash", msg.TxHash),
zap.Time("timestamp", msg.Timestamp))
} else {
g.msgC.writeC <- msg
}
Expand Down Expand Up @@ -379,7 +387,7 @@ func GuardianOptionAdminService(socketPath string, ethRpc *string, ethContract *
adminService, err := adminServiceRunnable(
logger,
socketPath,
g.injectC.writeC,
g.msgC.writeC,
g.signedInC.writeC,
g.obsvReqSendC.writeC,
g.db,
Expand Down Expand Up @@ -482,7 +490,6 @@ func GuardianOptionProcessor() *GuardianOption {
g.gossipSendC,
g.obsvC,
g.obsvReqSendC.writeC,
g.injectC.readC,
g.signedInC.readC,
g.gk,
g.gst,
Expand Down
46 changes: 0 additions & 46 deletions node/pkg/processor/injection.go

This file was deleted.

16 changes: 1 addition & 15 deletions node/pkg/processor/message.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package processor

import (
"context"
"encoding/hex"

"github.com/mr-tron/base58"
Expand All @@ -14,7 +13,6 @@ import (

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/reporter"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)

Expand All @@ -39,7 +37,7 @@ var (

// handleMessage processes a message received from a chain and instantiates our deterministic copy of the VAA. An
// event may be received multiple times and must be handled in an idempotent fashion.
func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublication) {
func (p *Processor) handleMessage(k *common.MessagePublication) {
if p.gs == nil {
p.logger.Warn("dropping observation since we haven't initialized our guardian set yet",
zap.Stringer("emitter_chain", k.EmitterChain),
Expand Down Expand Up @@ -82,18 +80,6 @@ func (p *Processor) handleMessage(ctx context.Context, k *common.MessagePublicat
Unreliable: k.Unreliable,
}

// A governance message should never be emitted on-chain
if v.EmitterAddress == vaa.GovernanceEmitter && v.EmitterChain == vaa.GovernanceChain {
supervisor.Logger(ctx).Error(
"EMERGENCY: PLEASE REPORT THIS IMMEDIATELY! A Solana message was emitted from the governance emitter. This should never be possible.",
zap.Stringer("emitter_chain", k.EmitterChain),
zap.Stringer("emitter_address", k.EmitterAddress),
zap.Uint32("nonce", k.Nonce),
zap.Stringer("txhash", k.TxHash),
zap.Time("timestamp", k.Timestamp))
return
}

// Generate digest of the unsigned VAA.
digest := v.SigningDigest()

Expand Down
13 changes: 3 additions & 10 deletions node/pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,6 @@ type Processor struct {
// signedInC is a channel of inbound signed VAA observations from p2p
signedInC <-chan *gossipv1.SignedVAAWithQuorum

// injectC is a channel of VAAs injected locally.
injectC <-chan *vaa.VAA

// gk is the node's guardian private key
gk *ecdsa.PrivateKey

Expand Down Expand Up @@ -158,7 +155,6 @@ func NewProcessor(
gossipSendC chan<- []byte,
obsvC chan *common.MsgWithTimeStamp[gossipv1.SignedObservation],
obsvReqSendC chan<- *gossipv1.ObservationRequest,
injectC <-chan *vaa.VAA,
signedInC <-chan *gossipv1.SignedVAAWithQuorum,
gk *ecdsa.PrivateKey,
gst *common.GuardianSetState,
Expand All @@ -176,7 +172,6 @@ func NewProcessor(
obsvC: obsvC,
obsvReqSendC: obsvReqSendC,
signedInC: signedInC,
injectC: injectC,
gk: gk,
gst: gst,
db: db,
Expand Down Expand Up @@ -237,7 +232,7 @@ func (p *Processor) Run(ctx context.Context) error {
continue
}
}
p.handleMessage(ctx, k)
p.handleMessage(k)

case k := <-p.acctReadC:
if p.acct == nil {
Expand All @@ -247,9 +242,7 @@ func (p *Processor) Run(ctx context.Context) error {
if !p.acct.IsMessageCoveredByAccountant(k) {
return fmt.Errorf("accountant published a message that is not covered by it: `%s`", k.MessageIDString())
}
p.handleMessage(ctx, k)
case v := <-p.injectC:
p.handleInjection(ctx, v)
p.handleMessage(k)
case m := <-p.obsvC:
observationChanDelay.Observe(float64(time.Since(m.Timestamp).Microseconds()))
p.handleObservation(ctx, m)
Expand Down Expand Up @@ -280,7 +273,7 @@ func (p *Processor) Run(ctx context.Context) error {
continue
}
}
p.handleMessage(ctx, k)
p.handleMessage(k)
}
}
}
Expand Down

0 comments on commit 1d2e26c

Please sign in to comment.