diff --git a/eth/node.go b/eth/node.go index 801fa50..dfa45f0 100644 --- a/eth/node.go +++ b/eth/node.go @@ -58,6 +58,9 @@ type Node struct { connBeacon metric.Int64ObservableGauge connAge metric.Float64Histogram connMedianAge metric.Float64ObservableGauge + + // eventCallbacks contains a list of callbacks that are executed when an event is received + eventCallbacks []func(ctx context.Context, event *host.TraceEvent) } // NewNode initializes a new [Node] using the provided configuration. @@ -78,7 +81,6 @@ func NewNode(cfg *NodeConfig) (*Node, error) { var ds host.DataStream if cfg.AWSConfig != nil { - droppedTraces, err := cfg.Meter.Int64Counter("dropped_traces") if err != nil { return nil, fmt.Errorf("new dropped_traces counter: %w", err) @@ -106,9 +108,10 @@ func NewNode(cfg *NodeConfig) (*Node, error) { if err != nil { return nil, fmt.Errorf("new kinesis producer: %w", err) } - ds = p + + ds = host.NewKinesisDataStream(p) } else { - ds = host.NoopDataStream{} + ds = host.NewCallbackDataStream() } hostCfg := &host.Config{ @@ -199,15 +202,27 @@ func NewNode(cfg *NodeConfig) (*Node, error) { // finally, initialize hermes node n := &Node{ - cfg: cfg, - host: h, - ds: ds, - sup: suture.NewSimple("eth"), - reqResp: reqResp, - pubSub: pubSub, - pryClient: pryClient, - peerer: NewPeerer(h, pryClient), - disc: disc, + cfg: cfg, + host: h, + ds: ds, + sup: suture.NewSimple("eth"), + reqResp: reqResp, + pubSub: pubSub, + pryClient: pryClient, + peerer: NewPeerer(h, pryClient), + disc: disc, + eventCallbacks: []func(ctx context.Context, event *host.TraceEvent){}, + } + + if ds.Type() == host.DataStreamTypeCallback { + cbDs := ds.(*host.CallbackDataStream) + + cbDs.OnEvent(func(ctx context.Context, event *host.TraceEvent) { + for _, cb := range n.eventCallbacks { + cb(ctx, event) + } + }) + } // initialize custom prometheus metrics @@ -301,6 +316,11 @@ func (n *Node) initMetrics(cfg *NodeConfig) (err error) { return nil } +// OnEvent registers a callback that is executed when an event is received. +func (n *Node) OnEvent(cb func(ctx context.Context, event *host.TraceEvent)) { + n.eventCallbacks = append(n.eventCallbacks, cb) +} + // Start starts the listening process. func (n *Node) Start(ctx context.Context) error { defer logDeferErr(n.host.Close, "Failed closing libp2p host") @@ -473,45 +493,17 @@ func terminateSupervisorTreeOnErr(err error) error { // startDataStream starts the data stream and implements a graceful shutdown func (n *Node) startDataStream(ctx context.Context) (func(), error) { - dsCtx, dsCancel := context.WithCancel(context.Background()) + backgroundCtx := context.Background() go func() { - if err := n.ds.Start(dsCtx); err != nil { + if err := n.ds.Start(backgroundCtx); err != nil { slog.Warn("Failed to start data stream", tele.LogAttrError(err)) } }() cleanupFn := func() { - producer, ok := n.ds.(*gk.Producer) - if !ok { - dsCancel() - return - } - - slog.Info("Waiting for Kinesis producer to become idle", "timeout", "15s") - // wait until the producer is idle - timeoutCtx, timeoutCncl := context.WithTimeout(dsCtx, 15*time.Second) - if err := producer.WaitIdle(timeoutCtx); err != nil { - slog.Info("Error waiting for producer to become idle", tele.LogAttrError(err)) - } - timeoutCncl() - - // stop the producer - dsCancel() - - slog.Info("Stopped Kinesis producer, waiting for shutdown", "timeout", "5s") - // wait until the producer has stopped - timeoutCtx, timeoutCncl = context.WithTimeout(dsCtx, 5*time.Second) - if err := producer.WaitStopped(timeoutCtx); err != nil { - slog.Info("Error waiting for producer to stop", tele.LogAttrError(err)) - } - timeoutCncl() - } - - producer, ok := n.ds.(*gk.Producer) - if !ok { - return cleanupFn, nil + n.ds.Stop(ctx) } - return cleanupFn, producer.WaitIdle(ctx) + return cleanupFn, nil } diff --git a/eth/pubsub.go b/eth/pubsub.go index ef9e270..dcf9ccc 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -152,7 +152,9 @@ func (n *Node) FilterIncomingSubscriptions(id peer.ID, subs []*pubsubpb.RPC_SubO } func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) error { - genericBlock, err := p.getSignedBeaconBlockForForkDigest(msg.Data) + now := time.Now() + + genericBlock, root, err := p.getSignedBeaconBlockForForkDigest(msg.Data) if err != nil { return err } @@ -160,7 +162,6 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err slot := genericBlock.GetSlot() ProposerIndex := genericBlock.GetProposerIndex() - now := time.Now() slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) evt := &host.TraceEvent{ @@ -175,6 +176,7 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err "Seq": msg.GetSeqno(), "ValIdx": ProposerIndex, "Slot": slot, + "Root": root, "TimeInSlot": now.Sub(slotStart).Seconds(), }, } @@ -187,6 +189,8 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err } func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + if msg == nil || msg.Topic == nil || *msg.Topic == "" { return fmt.Errorf("nil message or topic") } @@ -218,7 +222,6 @@ func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) err payload["AggregatePos"] = attestation.AggregationBits.BitIndices()[0] } - now := time.Now() evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), @@ -234,6 +237,8 @@ func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) err } func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + ap := ðtypes.SignedAggregateAttestationAndProof{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, ap); err != nil { return fmt.Errorf("decode aggregate and proof message: %w", err) @@ -242,7 +247,7 @@ func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Messag evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -268,6 +273,8 @@ func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Messag } func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + ve := ðtypes.VoluntaryExit{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, ve); err != nil { return fmt.Errorf("decode voluntary exit message: %w", err) @@ -276,7 +283,7 @@ func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) err evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -296,6 +303,8 @@ func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) err } func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + as := ðtypes.AttesterSlashing{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, as); err != nil { return fmt.Errorf("decode attester slashing message: %w", err) @@ -304,7 +313,7 @@ func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub. evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -324,6 +333,8 @@ func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub. } func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + ps := ðtypes.ProposerSlashing{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, ps); err != nil { return fmt.Errorf("decode proposer slashing message: %w", err) @@ -332,7 +343,7 @@ func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub. evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -356,6 +367,8 @@ func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub. } func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + cp := ðtypes.SignedContributionAndProof{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, cp); err != nil { return fmt.Errorf("decode contribution and proof message: %w", err) @@ -364,7 +377,7 @@ func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pu evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -387,6 +400,8 @@ func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pu } func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + sc := ðtypes.SyncCommitteeMessage{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, sc); err != nil { return fmt.Errorf("decode sync committee message: %w", err) @@ -395,7 +410,7 @@ func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Mes evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -417,6 +432,8 @@ func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Mes } func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + pb := ðtypes.BLSToExecutionChange{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, pb); err != nil { return fmt.Errorf("decode bls to execution change message: %w", err) @@ -425,7 +442,7 @@ func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pub evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -446,6 +463,8 @@ func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pub } func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + switch p.cfg.ForkVersion { case DenebForkVersion: var blob ethtypes.BlobSidecar @@ -459,7 +478,6 @@ func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) err slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) proposerIndex := blob.GetSignedBlockHeader().GetHeader().GetProposerIndex() - now := time.Now() evt := &host.TraceEvent{ Type: "HANDLE_MESSAGE", PeerID: p.host.ID(), @@ -499,7 +517,7 @@ type GenericBeaconBlock interface { GetProposerIndex() primitives.ValidatorIndex } -func (p *PubSub) getSignedBeaconBlockForForkDigest(msgData []byte) (genericSbb GenericBeaconBlock, err error) { +func (p *PubSub) getSignedBeaconBlockForForkDigest(msgData []byte) (genericSbb GenericBeaconBlock, root [32]byte, err error) { // get the correct fork switch p.cfg.ForkVersion { @@ -507,48 +525,68 @@ func (p *PubSub) getSignedBeaconBlockForForkDigest(msgData []byte) (genericSbb G phase0Sbb := ethtypes.SignedBeaconBlock{} err = p.cfg.Encoder.DecodeGossip(msgData, &phase0Sbb) if err != nil { - return genericSbb, fmt.Errorf("decode beacon block gossip message: %w", err) + return genericSbb, [32]byte{}, fmt.Errorf("error decoding phase0 beacon block gossip message: %w", err) } genericSbb = phase0Sbb.GetBlock() - return genericSbb, err + root, err = phase0Sbb.Block.HashTreeRoot() + if err != nil { + return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) + } + return genericSbb, root, nil case AltairForkVersion: altairSbb := ethtypes.SignedBeaconBlockAltair{} err = p.cfg.Encoder.DecodeGossip(msgData, &altairSbb) if err != nil { - return genericSbb, fmt.Errorf("decode beacon block gossip message: %w", err) + return genericSbb, [32]byte{}, fmt.Errorf("error decoding altair beacon block gossip message: %w", err) } genericSbb = altairSbb.GetBlock() - return genericSbb, err + root, err = altairSbb.Block.HashTreeRoot() + if err != nil { + return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) + } + return genericSbb, root, nil case BellatrixForkVersion: BellatrixSbb := ethtypes.SignedBeaconBlockBellatrix{} err = p.cfg.Encoder.DecodeGossip(msgData, &BellatrixSbb) if err != nil { - return genericSbb, fmt.Errorf("decode beacon block gossip message: %w", err) + return genericSbb, [32]byte{}, fmt.Errorf("error decoding bellatrix beacon block gossip message: %w", err) } genericSbb = BellatrixSbb.GetBlock() - return genericSbb, err + root, err = BellatrixSbb.Block.HashTreeRoot() + if err != nil { + return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) + } + return genericSbb, root, nil case CapellaForkVersion: capellaSbb := ethtypes.SignedBeaconBlockCapella{} err = p.cfg.Encoder.DecodeGossip(msgData, &capellaSbb) if err != nil { - return genericSbb, fmt.Errorf("decode beacon block gossip message: %w", err) + return genericSbb, [32]byte{}, fmt.Errorf("error decoding capella beacon block gossip message: %w", err) } genericSbb = capellaSbb.GetBlock() - return genericSbb, err + root, err = capellaSbb.Block.HashTreeRoot() + if err != nil { + return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) + } + return genericSbb, root, nil case DenebForkVersion: denebSbb := ethtypes.SignedBeaconBlockDeneb{} err = p.cfg.Encoder.DecodeGossip(msgData, &denebSbb) if err != nil { - return genericSbb, fmt.Errorf("decode beacon block gossip message: %w", err) + return genericSbb, [32]byte{}, fmt.Errorf("error decoding deneb beacon block gossip message: %w", err) } genericSbb = denebSbb.GetBlock() - return genericSbb, err + root, err = denebSbb.Block.HashTreeRoot() + if err != nil { + return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) + } + return genericSbb, root, nil default: - return genericSbb, fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:]) + return genericSbb, [32]byte{}, fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:]) } } diff --git a/eth/reqresp.go b/eth/reqresp.go index 638931a..04b0519 100644 --- a/eth/reqresp.go +++ b/eth/reqresp.go @@ -249,10 +249,12 @@ func (r *ReqResp) wrapStreamHandler(ctx context.Context, name string, handler Co traceType := "HANDLE_STREAM" + protocol := string(s.Protocol()) + // Usual protocol string: /eth2/beacon_chain/req/metadata/2/ssz_snappy - parts := strings.Split(string(s.Protocol()), "/") + parts := strings.Split(protocol, "/") if len(parts) > 4 { - traceType = "HANDLE_" + strings.ToUpper(parts[4]) + traceType = hermeshost.EventTypeFromBeaconChainProtocol(protocol) } commonData := map[string]any{ diff --git a/host/callback.go b/host/callback.go new file mode 100644 index 0000000..c2df4db --- /dev/null +++ b/host/callback.go @@ -0,0 +1,63 @@ +package host + +import "context" + +var _ DataStream = (*CallbackDataStream)(nil) + +// CallbackDataStream is a simple implementation of DataStream that holds a callback function. +// Users of CallbackDataStream should ensure that the callback function does not block, +// as blocking can delay or disrupt the processing of subsequent events. +type CallbackDataStream struct { + cb func(ctx context.Context, event *TraceEvent) + + stopped bool +} + +// NewCallbackDataStream creates a new instance of CallbackDataStream. +func NewCallbackDataStream() *CallbackDataStream { + return &CallbackDataStream{ + cb: func(ctx context.Context, event *TraceEvent) { + // no-op + }, + stopped: true, + } +} + +// Type returns the type of the data stream, which is DataStreamTypeCallback. +func (c *CallbackDataStream) Type() DataStreamType { + return DataStreamTypeCallback +} + +// OnEvent sets the callback function that will be called when an event is received. +func (c *CallbackDataStream) OnEvent(onRecord func(ctx context.Context, event *TraceEvent)) { + c.cb = onRecord +} + +// Start begins the data stream's operations. +func (c *CallbackDataStream) Start(ctx context.Context) error { + c.stopped = false + + return ctx.Err() +} + +// Stop ends the data stream's operation. +func (c *CallbackDataStream) Stop(ctx context.Context) error { + c.stopped = true + + return ctx.Err() +} + +// PutRecord sends an event to the callback if the stream has not been stopped. +func (c *CallbackDataStream) PutRecord(ctx context.Context, event *TraceEvent) error { + if c.stopped { + return ctx.Err() + } + + if c.cb != nil && event != nil { + c.cb(ctx, event) + + return nil + } + + return ctx.Err() +} diff --git a/host/event_types.go b/host/event_types.go new file mode 100644 index 0000000..8fabecd --- /dev/null +++ b/host/event_types.go @@ -0,0 +1,14 @@ +package host + +import "strings" + +// EventTypeFromBeaconChainProtocol returns the event type for a given protocol string. +func EventTypeFromBeaconChainProtocol(protocol string) string { + // Usual protocol string: /eth2/beacon_chain/req/metadata/2/ssz_snappy + parts := strings.Split(protocol, "/") + if len(parts) > 4 { + return "HANDLE_" + strings.ToUpper(parts[4]) + } + + return "UNKNOWN" +} diff --git a/host/event_types_test.go b/host/event_types_test.go new file mode 100644 index 0000000..c62bba2 --- /dev/null +++ b/host/event_types_test.go @@ -0,0 +1,35 @@ +package host_test + +import ( + "testing" + + "github.com/probe-lab/hermes/host" +) + +func TestEventTypeFromBeaconChainProtocol(t *testing.T) { + tests := []struct { + name string + protocol string + expected string + }{ + { + name: "Valid protocol with metadata", + protocol: "/eth2/beacon_chain/req/metadata/2/ssz_snappy", + expected: "HANDLE_METADATA", + }, + { + name: "Invalid protocol", + protocol: "/invalid/protocol", + expected: "UNKNOWN", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := host.EventTypeFromBeaconChainProtocol(tt.protocol) + if result != tt.expected { + t.Errorf("EventTypeFromBeaconChainProtocol(%s) = %v, want %v", tt.protocol, result, tt.expected) + } + }) + } +} diff --git a/host/kinesis.go b/host/kinesis.go new file mode 100644 index 0000000..dad62f6 --- /dev/null +++ b/host/kinesis.go @@ -0,0 +1,82 @@ +package host + +import ( + "context" + "log/slog" + "time" + + gk "github.com/dennis-tra/go-kinesis" + "github.com/probe-lab/hermes/tele" +) + +type KinesisDataStream struct { + producer *gk.Producer + ctx context.Context + cancelFn context.CancelFunc +} + +var _ DataStream = (*KinesisDataStream)(nil) + +// NewKinesisDataStream creates a new instance of KinesisDataStream with a given producer. +func NewKinesisDataStream(p *gk.Producer) *KinesisDataStream { + return &KinesisDataStream{ + producer: p, + ctx: nil, + cancelFn: nil, + } +} + +// Type returns the type of the data stream +func (k *KinesisDataStream) Type() DataStreamType { + return DataStreamTypeKinesis +} + +// Start begins the data stream's operation. +func (k *KinesisDataStream) Start(ctx context.Context) error { + dsCtx, dsCancel := context.WithCancel(ctx) + + k.ctx = dsCtx + k.cancelFn = dsCancel + + if err := k.producer.Start(ctx); err != nil { + return err + } + + <-dsCtx.Done() + + return dsCtx.Err() +} + +// Stop ends the data stream. +func (k *KinesisDataStream) Stop(ctx context.Context) error { + // wait until the producer has stopped + timeoutCtx, timeoutCncl := context.WithTimeout(k.ctx, 15*time.Second) + if err := k.producer.WaitIdle(timeoutCtx); err != nil { + slog.Info("Error waiting for producer to become idle", tele.LogAttrError(err)) + } + timeoutCncl() + // stop the producer + k.cancelFn() + + slog.Info("Stopped Kinesis producer, waiting for shutdown", "timeout", "5s") + // wait until the producer has stopped + timeoutCtx, timeoutCncl = context.WithTimeout(k.ctx, 5*time.Second) + if err := k.producer.WaitStopped(timeoutCtx); err != nil { + slog.Info("Error waiting for producer to stop", tele.LogAttrError(err)) + } + + timeoutCncl() + + return k.producer.WaitIdle(ctx) +} + +// PutRecord sends an event to the Kinesis data stream. +func (k *KinesisDataStream) PutRecord(ctx context.Context, event *TraceEvent) error { + if event != nil { + kRecord := gk.Record(event) + + return k.producer.PutRecord(ctx, kRecord) + } + + return ctx.Err() +} diff --git a/host/producer.go b/host/producer.go index a92eb6f..8e6f755 100644 --- a/host/producer.go +++ b/host/producer.go @@ -2,24 +2,18 @@ package host import ( "context" - - gk "github.com/dennis-tra/go-kinesis" ) type DataStream interface { Start(ctx context.Context) error - PutRecord(ctx context.Context, record gk.Record) error + Stop(ctx context.Context) error + PutRecord(ctx context.Context, event *TraceEvent) error + Type() DataStreamType } -type NoopDataStream struct{} - -var _ DataStream = (*NoopDataStream)(nil) - -func (n NoopDataStream) Start(ctx context.Context) error { - <-ctx.Done() - return nil -} +type DataStreamType int -func (n NoopDataStream) PutRecord(ctx context.Context, record gk.Record) error { - return ctx.Err() -} +const ( + DataStreamTypeKinesis DataStreamType = iota + DataStreamTypeCallback +) diff --git a/host/rpc.go b/host/rpc.go index d4a7102..f8c1726 100644 --- a/host/rpc.go +++ b/host/rpc.go @@ -4,75 +4,75 @@ import ( "encoding/hex" "log/slog" - "github.com/libp2p/go-libp2p-pubsub/pb" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/peer" "github.com/probe-lab/hermes/tele" ) -type rpcMeta struct { +type RpcMeta struct { PeerID peer.ID - Subscriptions []rpcMetaSub `json:"Subs,omitempty"` - Messages []rpcMetaMsg `json:"Msgs,omitempty"` - Control *rpcMetaControl `json:"Control,omitempty"` + Subscriptions []RpcMetaSub `json:"Subs,omitempty"` + Messages []RpcMetaMsg `json:"Msgs,omitempty"` + Control *RpcMetaControl `json:"Control,omitempty"` } -type rpcMetaSub struct { +type RpcMetaSub struct { Subscribe bool TopicID string } -type rpcMetaMsg struct { +type RpcMetaMsg struct { MsgID string `json:"MsgID,omitempty"` Topic string `json:"Topic,omitempty"` } -type rpcMetaControl struct { - IHave []rpcControlIHave `json:"IHave,omitempty"` - IWant []rpcControlIWant `json:"IWant,omitempty"` - Graft []rpcControlGraft `json:"Graft,omitempty"` - Prune []rpcControlPrune `json:"Prune,omitempty"` +type RpcMetaControl struct { + IHave []RpcControlIHave `json:"IHave,omitempty"` + IWant []RpcControlIWant `json:"IWant,omitempty"` + Graft []RpcControlGraft `json:"Graft,omitempty"` + Prune []RpcControlPrune `json:"Prune,omitempty"` } -type rpcControlIHave struct { +type RpcControlIHave struct { TopicID string MsgIDs []string } -type rpcControlIWant struct { +type RpcControlIWant struct { MsgIDs []string } -type rpcControlGraft struct { +type RpcControlGraft struct { TopicID string } -type rpcControlPrune struct { +type RpcControlPrune struct { TopicID string PeerIDs []peer.ID } -func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *rpcMeta { - subs := make([]rpcMetaSub, len(meta.GetSubscription())) +func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *RpcMeta { + subs := make([]RpcMetaSub, len(meta.GetSubscription())) for i, subMeta := range meta.GetSubscription() { - subs[i] = rpcMetaSub{ + subs[i] = RpcMetaSub{ Subscribe: subMeta.GetSubscribe(), TopicID: subMeta.GetTopic(), } } - msgs := make([]rpcMetaMsg, len(meta.GetMessages())) + msgs := make([]RpcMetaMsg, len(meta.GetMessages())) for i, msg := range meta.GetMessages() { - msgs[i] = rpcMetaMsg{ + msgs[i] = RpcMetaMsg{ MsgID: hex.EncodeToString(msg.GetMessageID()), Topic: msg.GetTopic(), } } - controlMsg := &rpcMetaControl{ - IHave: make([]rpcControlIHave, len(meta.GetControl().GetIhave())), - IWant: make([]rpcControlIWant, len(meta.GetControl().GetIwant())), - Graft: make([]rpcControlGraft, len(meta.GetControl().GetGraft())), - Prune: make([]rpcControlPrune, len(meta.GetControl().GetPrune())), + controlMsg := &RpcMetaControl{ + IHave: make([]RpcControlIHave, len(meta.GetControl().GetIhave())), + IWant: make([]RpcControlIWant, len(meta.GetControl().GetIwant())), + Graft: make([]RpcControlGraft, len(meta.GetControl().GetGraft())), + Prune: make([]RpcControlPrune, len(meta.GetControl().GetPrune())), } for i, ihave := range meta.GetControl().GetIhave() { @@ -81,7 +81,7 @@ func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *rpcMeta { msgIDs[j] = hex.EncodeToString(msgID) } - controlMsg.IHave[i] = rpcControlIHave{ + controlMsg.IHave[i] = RpcControlIHave{ TopicID: ihave.GetTopic(), MsgIDs: msgIDs, } @@ -93,13 +93,13 @@ func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *rpcMeta { msgIDs[j] = hex.EncodeToString(msgID) } - controlMsg.IWant[i] = rpcControlIWant{ + controlMsg.IWant[i] = RpcControlIWant{ MsgIDs: msgIDs, } } for i, graft := range meta.GetControl().GetGraft() { - controlMsg.Graft[i] = rpcControlGraft{ + controlMsg.Graft[i] = RpcControlGraft{ TopicID: graft.GetTopic(), } } @@ -117,7 +117,7 @@ func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *rpcMeta { peerIDs[j] = peerID } - controlMsg.Prune[i] = rpcControlPrune{ + controlMsg.Prune[i] = RpcControlPrune{ TopicID: prune.GetTopic(), PeerIDs: peerIDs, } @@ -132,7 +132,7 @@ func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *rpcMeta { slog.Warn("Failed parsing peer ID", tele.LogAttrError(err)) } - return &rpcMeta{ + return &RpcMeta{ PeerID: pid, Subscriptions: subs, Messages: msgs,