From 12cdc28e4ff26988538c9d7c7d0ca43815dadf04 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 10 Apr 2024 13:28:44 +1000 Subject: [PATCH 01/14] feat: Add Callback data stream --- eth/node.go | 80 ++++++++++++++++++++------------------------ eth/pubsub.go | 2 +- eth/reqresp.go | 8 ++--- host/callback.go | 63 ++++++++++++++++++++++++++++++++++ host/flush_tracer.go | 2 +- host/host.go | 8 ++--- host/kinesis.go | 79 +++++++++++++++++++++++++++++++++++++++++++ host/producer.go | 22 +++++------- 8 files changed, 196 insertions(+), 68 deletions(-) create mode 100644 host/callback.go create mode 100644 host/kinesis.go diff --git a/eth/node.go b/eth/node.go index e3ce34a..724336b 100644 --- a/eth/node.go +++ b/eth/node.go @@ -59,6 +59,9 @@ type Node struct { connBeacon metric.Int64ObservableGauge connAge metric.Float64Histogram connMedianAge metric.Float64ObservableGauge + + // eventCallbacks contains a list of callbacks that are executed when an event is received + eventCallbacks []func(ctx context.Context, event *host.TraceEvent) } // NewNode initializes a new [Node] using the provided configuration. @@ -76,7 +79,6 @@ func NewNode(cfg *NodeConfig) (*Node, error) { var ds host.DataStream if cfg.AWSConfig != nil { - droppedTraces, err := cfg.Meter.Int64Counter("dropped_traces") if err != nil { return nil, fmt.Errorf("new dropped_traces counter: %w", err) @@ -104,9 +106,10 @@ func NewNode(cfg *NodeConfig) (*Node, error) { if err != nil { return nil, fmt.Errorf("new kinesis producer: %w", err) } - ds = p + + ds = host.NewKinesisDataStream(p) } else { - ds = host.NoopDataStream{} + ds = host.NewCallbackDataStream() } hostCfg := &host.Config{ @@ -185,15 +188,27 @@ func NewNode(cfg *NodeConfig) (*Node, error) { // finally, initialize hermes node n := &Node{ - cfg: cfg, - host: h, - ds: ds, - sup: suture.NewSimple("eth"), - reqResp: reqResp, - pubSub: pubSub, - pryClient: pryClient, - peerer: NewPeerer(h, pryClient), - disc: disc, + cfg: cfg, + host: h, + ds: ds, + sup: suture.NewSimple("eth"), + reqResp: reqResp, + pubSub: pubSub, + pryClient: pryClient, + peerer: NewPeerer(h, pryClient), + disc: disc, + eventCallbacks: []func(ctx context.Context, event *host.TraceEvent){}, + } + + if ds.Type() == host.DataStreamTypeCallback { + cbDs := ds.(*host.CallbackDataStream) + + cbDs.OnEvent(func(ctx context.Context, event *host.TraceEvent) { + for _, cb := range n.eventCallbacks { + cb(ctx, event) + } + }) + } // initialize custom prometheus metrics @@ -287,6 +302,11 @@ func (n *Node) initMetrics(cfg *NodeConfig) (err error) { return nil } +// OnEvent registers a callback that is executed when an event is received. +func (n *Node) OnEvent(cb func(ctx context.Context, event *host.TraceEvent)) { + n.eventCallbacks = append(n.eventCallbacks, cb) +} + // Start starts the listening process. func (n *Node) Start(ctx context.Context) error { defer logDeferErr(n.host.Close, "Failed closing libp2p host") @@ -453,45 +473,17 @@ func terminateSupervisorTreeOnErr(err error) error { // startDataStream starts the data stream and implements a graceful shutdown func (n *Node) startDataStream(ctx context.Context) (func(), error) { - dsCtx, dsCancel := context.WithCancel(context.Background()) + backgroundCtx := context.Background() go func() { - if err := n.ds.Start(dsCtx); err != nil { + if err := n.ds.Start(backgroundCtx); err != nil { slog.Warn("Failed to start data stream", tele.LogAttrError(err)) } }() cleanupFn := func() { - producer, ok := n.ds.(*gk.Producer) - if !ok { - dsCancel() - return - } - - slog.Info("Waiting for Kinesis producer to become idle", "timeout", "15s") - // wait until the producer is idle - timeoutCtx, timeoutCncl := context.WithTimeout(dsCtx, 15*time.Second) - if err := producer.WaitIdle(timeoutCtx); err != nil { - slog.Info("Error waiting for producer to become idle", tele.LogAttrError(err)) - } - timeoutCncl() - - // stop the producer - dsCancel() - - slog.Info("Stopped Kinesis producer, waiting for shutdown", "timeout", "5s") - // wait until the producer has stopped - timeoutCtx, timeoutCncl = context.WithTimeout(dsCtx, 5*time.Second) - if err := producer.WaitStopped(timeoutCtx); err != nil { - slog.Info("Error waiting for producer to stop", tele.LogAttrError(err)) - } - timeoutCncl() - } - - producer, ok := n.ds.(*gk.Producer) - if !ok { - return cleanupFn, nil + n.ds.Stop(ctx) } - return cleanupFn, producer.WaitIdle(ctx) + return cleanupFn, nil } diff --git a/eth/pubsub.go b/eth/pubsub.go index 5be46d4..fb283d0 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -156,7 +156,7 @@ func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Messag }, } - if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + if err := p.cfg.DataStream.PutEvent(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } diff --git a/eth/reqresp.go b/eth/reqresp.go index e775a4f..92892aa 100644 --- a/eth/reqresp.go +++ b/eth/reqresp.go @@ -274,7 +274,7 @@ func (r *ReqResp) wrapStreamHandler(ctx context.Context, name string, handler Co Payload: commonData, } - if err := r.cfg.DataStream.PutRecord(ctx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutEvent(ctx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -571,7 +571,7 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e } traceCtx := context.Background() - if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -627,7 +627,7 @@ func (r *ReqResp) Ping(ctx context.Context, pid peer.ID) (err error) { }, } traceCtx := context.Background() - if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -689,7 +689,7 @@ func (r *ReqResp) MetaData(ctx context.Context, pid peer.ID) (resp *pb.MetaDataV Payload: reqData, } traceCtx := context.Background() - if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } diff --git a/host/callback.go b/host/callback.go new file mode 100644 index 0000000..9e1cdd4 --- /dev/null +++ b/host/callback.go @@ -0,0 +1,63 @@ +package host + +import "context" + +var _ DataStream = (*CallbackDataStream)(nil) + +// CallbackDataStream is a simple implementation of DataStream that holds a callback function. +// Users of CallbackDataStream should ensure that the callback function does not block, +// as blocking can delay or disrupt the processing of subsequent events. +type CallbackDataStream struct { + cb func(ctx context.Context, event *TraceEvent) + + stopped bool +} + +// NewCallbackDataStream creates a new instance of CallbackDataStream. +func NewCallbackDataStream() *CallbackDataStream { + return &CallbackDataStream{ + cb: func(ctx context.Context, event *TraceEvent) { + // no-op + }, + stopped: true, + } +} + +// Type returns the type of the data stream, which is DataStreamTypeCallback. +func (c *CallbackDataStream) Type() DataStreamType { + return DataStreamTypeCallback +} + +// OnEvent sets the callback function that will be called when an event is received. +func (c *CallbackDataStream) OnEvent(onRecord func(ctx context.Context, event *TraceEvent)) { + c.cb = onRecord +} + +// Start begins the data stream's operations. +func (c *CallbackDataStream) Start(ctx context.Context) error { + c.stopped = false + + return ctx.Err() +} + +// Stop ends the data stream's operation. +func (c *CallbackDataStream) Stop(ctx context.Context) error { + c.stopped = true + + return ctx.Err() +} + +// PutEvent sends an event to the callback if the stream has not been stopped. +func (c *CallbackDataStream) PutEvent(ctx context.Context, event *TraceEvent) error { + if c.stopped { + return ctx.Err() + } + + if c.cb != nil && event != nil { + c.cb(ctx, event) + + return nil + } + + return ctx.Err() +} diff --git a/host/flush_tracer.go b/host/flush_tracer.go index de87c2b..bb8f110 100644 --- a/host/flush_tracer.go +++ b/host/flush_tracer.go @@ -63,7 +63,7 @@ func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payl ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) defer cancel() - if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { slog.Warn("Failed to put trace event payload", tele.LogAttrError(err)) return } diff --git a/host/host.go b/host/host.go index fa4aa45..855c066 100644 --- a/host/host.go +++ b/host/host.go @@ -9,9 +9,9 @@ import ( "sync/atomic" "time" - "github.com/hashicorp/golang-lru/v2" + lru "github.com/hashicorp/golang-lru/v2" "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p-pubsub" + pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -94,7 +94,7 @@ func (h *Host) Serve(ctx context.Context) error { }, } - if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { slog.Warn("Failed to put event payload", tele.LogAttrError(err)) return } @@ -275,7 +275,7 @@ func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler { }, } - if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } diff --git a/host/kinesis.go b/host/kinesis.go new file mode 100644 index 0000000..a9ce570 --- /dev/null +++ b/host/kinesis.go @@ -0,0 +1,79 @@ +package host + +import ( + "context" + "log/slog" + "time" + + gk "github.com/dennis-tra/go-kinesis" + "github.com/probe-lab/hermes/tele" +) + +type KinesisDataStream struct { + producer *gk.Producer + ctx context.Context + cancelFn context.CancelFunc +} + +var _ DataStream = (*KinesisDataStream)(nil) + +// NewKinesisDataStream creates a new instance of KinesisDataStream with a given producer. +func NewKinesisDataStream(p *gk.Producer) *KinesisDataStream { + return &KinesisDataStream{ + producer: p, + ctx: nil, + cancelFn: nil, + } +} + +// Type returns the type of the data stream +func (k *KinesisDataStream) Type() DataStreamType { + return DataStreamTypeKinesis +} + +// Start begins the data stream's operation. +func (k *KinesisDataStream) Start(ctx context.Context) error { + dsCtx, dsCancel := context.WithCancel(ctx) + + k.ctx = dsCtx + + k.cancelFn = dsCancel + + <-dsCtx.Done() + + return dsCtx.Err() +} + +// Stop ends the data stream. +func (k *KinesisDataStream) Stop(ctx context.Context) error { + // wait until the producer has stopped + timeoutCtx, timeoutCncl := context.WithTimeout(k.ctx, 15*time.Second) + if err := k.producer.WaitIdle(timeoutCtx); err != nil { + slog.Info("Error waiting for producer to become idle", tele.LogAttrError(err)) + } + + // stop the producer + k.cancelFn() + + slog.Info("Stopped Kinesis producer, waiting for shutdown", "timeout", "5s") + // wait until the producer has stopped + timeoutCtx, timeoutCncl = context.WithTimeout(k.ctx, 5*time.Second) + if err := k.producer.WaitStopped(timeoutCtx); err != nil { + slog.Info("Error waiting for producer to stop", tele.LogAttrError(err)) + } + + timeoutCncl() + + return k.producer.WaitIdle(ctx) +} + +// PutEvent sends an event to the Kinesis data stream. +func (k *KinesisDataStream) PutEvent(ctx context.Context, event *TraceEvent) error { + if event != nil { + kRecord := gk.Record(event) + + return k.producer.PutRecord(ctx, kRecord) + } + + return ctx.Err() +} diff --git a/host/producer.go b/host/producer.go index a92eb6f..874c1eb 100644 --- a/host/producer.go +++ b/host/producer.go @@ -2,24 +2,18 @@ package host import ( "context" - - gk "github.com/dennis-tra/go-kinesis" ) type DataStream interface { Start(ctx context.Context) error - PutRecord(ctx context.Context, record gk.Record) error + Stop(ctx context.Context) error + PutEvent(ctx context.Context, event *TraceEvent) error + Type() DataStreamType } -type NoopDataStream struct{} - -var _ DataStream = (*NoopDataStream)(nil) - -func (n NoopDataStream) Start(ctx context.Context) error { - <-ctx.Done() - return nil -} +type DataStreamType int -func (n NoopDataStream) PutRecord(ctx context.Context, record gk.Record) error { - return ctx.Err() -} +const ( + DataStreamTypeKinesis DataStreamType = iota + DataStreamTypeCallback +) From 44f1c766e322ff1c220d4623a956a3947837355d Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 11 Apr 2024 17:15:33 +1000 Subject: [PATCH 02/14] feat: Add event_type --- eth/node.go | 4 +-- eth/pubsub.go | 2 +- eth/reqresp.go | 14 +++++----- host/flush_tracer.go | 38 +++++++++++++-------------- host/host.go | 6 ++--- host/rpc.go | 62 ++++++++++++++++++++++---------------------- 6 files changed, 64 insertions(+), 62 deletions(-) diff --git a/eth/node.go b/eth/node.go index 724336b..a9592f5 100644 --- a/eth/node.go +++ b/eth/node.go @@ -88,9 +88,9 @@ func NewNode(cfg *NodeConfig) (*Node, error) { DroppedRecordF: func(ctx context.Context, record gk.Record) { tevt, ok := record.(*host.TraceEvent) if !ok { - droppedTraces.Add(ctx, 1, metric.WithAttributes(attribute.String("evt_type", "UNKNOWN"))) + droppedTraces.Add(ctx, 1, metric.WithAttributes(attribute.String("evt_type", string(host.EventTypeUnknown)))) } else { - droppedTraces.Add(ctx, 1, metric.WithAttributes(attribute.String("evt_type", tevt.Type))) + droppedTraces.Add(ctx, 1, metric.WithAttributes(attribute.String("evt_type", string(tevt.Type)))) } slog.Warn("Dropped record", "partition_key", record.PartitionKey(), "size", len(record.Data())) }, diff --git a/eth/pubsub.go b/eth/pubsub.go index fb283d0..b39be68 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -141,7 +141,7 @@ func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Messag slotStart := p.cfg.GenesisTime.Add(time.Duration(blockSlot) * p.cfg.SecondsPerSlot) evt := &host.TraceEvent{ - Type: "HANDLE_MESSAGE", + Type: host.EventTypeHandleAggregateAndProof, PeerID: p.host.ID(), Timestamp: now, Payload: map[string]any{ diff --git a/eth/reqresp.go b/eth/reqresp.go index 92892aa..b861db8 100644 --- a/eth/reqresp.go +++ b/eth/reqresp.go @@ -245,12 +245,14 @@ func (r *ReqResp) wrapStreamHandler(ctx context.Context, name string, handler Co } end := time.Now() - traceType := "HANDLE_STREAM" + traceType := hermeshost.EventTypeHandleStream + + protocol := string(s.Protocol()) // Usual protocol string: /eth2/beacon_chain/req/metadata/2/ssz_snappy - parts := strings.Split(string(s.Protocol()), "/") + parts := strings.Split(protocol, "/") if len(parts) > 4 { - traceType = "HANDLE_" + strings.ToUpper(parts[4]) + traceType = hermeshost.EventTypeFromBeaconChainProtocol(protocol) } commonData := map[string]any{ @@ -564,7 +566,7 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e } traceEvt := &hermeshost.TraceEvent{ - Type: "REQUEST_STATUS", + Type: hermeshost.EventTypeRequestStatus, PeerID: r.host.ID(), Timestamp: time.Now(), Payload: reqData, @@ -619,7 +621,7 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e func (r *ReqResp) Ping(ctx context.Context, pid peer.ID) (err error) { defer func() { traceEvt := &hermeshost.TraceEvent{ - Type: "REQUEST_PING", + Type: hermeshost.EventTypeRequestPing, PeerID: r.host.ID(), Timestamp: time.Now(), Payload: map[string]string{ @@ -683,7 +685,7 @@ func (r *ReqResp) MetaData(ctx context.Context, pid peer.ID) (resp *pb.MetaDataV } traceEvt := &hermeshost.TraceEvent{ - Type: "REQUEST_METADATA", + Type: hermeshost.EventTypeRequestMetadata, PeerID: r.host.ID(), Timestamp: time.Now(), Payload: reqData, diff --git a/host/flush_tracer.go b/host/flush_tracer.go index bb8f110..7d5e85f 100644 --- a/host/flush_tracer.go +++ b/host/flush_tracer.go @@ -17,7 +17,7 @@ import ( ) type TraceEvent struct { - Type string + Type EventType PeerID peer.ID Timestamp time.Time Payload any `json:"Data"` // cannot use field "Data" because of gk.Record method @@ -48,11 +48,11 @@ var _ gk.Record = (*TraceEvent)(nil) var _ pubsub.RawTracer = (*Host)(nil) -func (h *Host) FlushTrace(evtType string, payload any) { +func (h *Host) FlushTrace(evtType EventType, payload any) { h.FlushTraceWithTimestamp(evtType, time.Now(), payload) } -func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payload any) { +func (h *Host) FlushTraceWithTimestamp(evtType EventType, timestamp time.Time, payload any) { evt := &TraceEvent{ Type: evtType, PeerID: h.ID(), @@ -72,46 +72,46 @@ func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payl } func (h *Host) AddPeer(p peer.ID, proto protocol.ID) { - h.FlushTrace(pubsubpb.TraceEvent_ADD_PEER.String(), map[string]any{ + h.FlushTrace(EventTypeAddPeer, map[string]any{ "PeerID": p, "Protocol": proto, }) } func (h *Host) RemovePeer(p peer.ID) { - h.FlushTrace(pubsubpb.TraceEvent_REMOVE_PEER.String(), map[string]any{ + h.FlushTrace(EventTypeRemovePeer, map[string]any{ "PeerID": p, }) } func (h *Host) Join(topic string) { - h.FlushTrace(pubsubpb.TraceEvent_JOIN.String(), map[string]any{ + h.FlushTrace(EventTypeJoin, map[string]any{ "Topic": topic, }) } func (h *Host) Leave(topic string) { - h.FlushTrace(pubsubpb.TraceEvent_LEAVE.String(), map[string]any{ + h.FlushTrace(EventTypeLeave, map[string]any{ "Topic": topic, }) } func (h *Host) Graft(p peer.ID, topic string) { - h.FlushTrace(pubsubpb.TraceEvent_GRAFT.String(), map[string]any{ + h.FlushTrace(EventTypeGraft, map[string]any{ "PeerID": p, "Topic": topic, }) } func (h *Host) Prune(p peer.ID, topic string) { - h.FlushTrace(pubsubpb.TraceEvent_PRUNE.String(), map[string]any{ + h.FlushTrace(EventTypePrune, map[string]any{ "PeerID": p, "Topic": topic, }) } func (h *Host) ValidateMessage(msg *pubsub.Message) { - h.FlushTrace("VALIDATE_MESSAGE", map[string]any{ + h.FlushTrace(EventTypeValidateMessage, map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -122,7 +122,7 @@ func (h *Host) ValidateMessage(msg *pubsub.Message) { } func (h *Host) DeliverMessage(msg *pubsub.Message) { - h.FlushTrace(pubsubpb.TraceEvent_DELIVER_MESSAGE.String(), map[string]any{ + h.FlushTrace(EventTypeDeliverMessage, map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -133,7 +133,7 @@ func (h *Host) DeliverMessage(msg *pubsub.Message) { } func (h *Host) RejectMessage(msg *pubsub.Message, reason string) { - h.FlushTrace(pubsubpb.TraceEvent_REJECT_MESSAGE.String(), map[string]any{ + h.FlushTrace(EventTypeRejectMessage, map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -145,7 +145,7 @@ func (h *Host) RejectMessage(msg *pubsub.Message, reason string) { } func (h *Host) DuplicateMessage(msg *pubsub.Message) { - h.FlushTrace(pubsubpb.TraceEvent_DUPLICATE_MESSAGE.String(), map[string]any{ + h.FlushTrace(EventTypeDuplicateMessage, map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -156,7 +156,7 @@ func (h *Host) DuplicateMessage(msg *pubsub.Message) { } func (h *Host) ThrottlePeer(p peer.ID) { - h.FlushTrace("THROTTLE_PEER", map[string]any{ + h.FlushTrace(EventTypeThrottlePeer, map[string]any{ "PeerID": p, }) } @@ -174,7 +174,7 @@ func (h *Host) DropRPC(rpc *pubsub.RPC, p peer.ID) { } func (h *Host) UndeliverableMessage(msg *pubsub.Message) { - h.FlushTrace("UNDELIVERABLE_MESSAGE", map[string]any{ + h.FlushTrace(EventTypeUndeliverableMessage, map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -186,18 +186,18 @@ func (h *Host) Trace(evt *pubsubpb.TraceEvent) { ts := time.Unix(0, evt.GetTimestamp()) switch evt.GetType() { case pubsubpb.TraceEvent_PUBLISH_MESSAGE: - h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_PUBLISH_MESSAGE.String(), ts, map[string]any{ + h.FlushTraceWithTimestamp(EventTypePublishMessage, ts, map[string]any{ "MsgID": evt.GetPublishMessage().GetMessageID(), "Topic": evt.GetPublishMessage().GetTopic(), }) case pubsubpb.TraceEvent_RECV_RPC: payload := newRPCMeta(evt.GetRecvRPC().GetReceivedFrom(), evt.GetRecvRPC().GetMeta()) - h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_RECV_RPC.String(), ts, payload) + h.FlushTraceWithTimestamp(EventTypeRecvRPC, ts, payload) case pubsubpb.TraceEvent_SEND_RPC: payload := newRPCMeta(evt.GetSendRPC().GetSendTo(), evt.GetSendRPC().GetMeta()) - h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_SEND_RPC.String(), ts, payload) + h.FlushTraceWithTimestamp(EventTypeSendRPC, ts, payload) case pubsubpb.TraceEvent_DROP_RPC: payload := newRPCMeta(evt.GetDropRPC().GetSendTo(), evt.GetDropRPC().GetMeta()) - h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_DROP_RPC.String(), ts, payload) + h.FlushTraceWithTimestamp(EventTypeDropRPC, ts, payload) } } diff --git a/host/host.go b/host/host.go index 855c066..b299c20 100644 --- a/host/host.go +++ b/host/host.go @@ -72,7 +72,7 @@ func (h *Host) Serve(ctx context.Context) error { return fmt.Errorf("host started without gossip sub initialization: %w", suture.ErrTerminateSupervisorTree) } - eventHandler := func(n network.Network, c network.Conn, evtType string) { + eventHandler := func(n network.Network, c network.Conn, evtType EventType) { evt := &TraceEvent{ Type: evtType, PeerID: h.ID(), @@ -103,8 +103,8 @@ func (h *Host) Serve(ctx context.Context) error { } notifiee := &network.NotifyBundle{ - ConnectedF: func(n network.Network, c network.Conn) { eventHandler(n, c, "CONNECTED") }, - DisconnectedF: func(n network.Network, c network.Conn) { eventHandler(n, c, "DISCONNECTED") }, + ConnectedF: func(n network.Network, c network.Conn) { eventHandler(n, c, EventTypeConnected) }, + DisconnectedF: func(n network.Network, c network.Conn) { eventHandler(n, c, EventTypeDisconnected) }, } h.Host.Network().Notify(notifiee) diff --git a/host/rpc.go b/host/rpc.go index d4a7102..f8c1726 100644 --- a/host/rpc.go +++ b/host/rpc.go @@ -4,75 +4,75 @@ import ( "encoding/hex" "log/slog" - "github.com/libp2p/go-libp2p-pubsub/pb" + pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/peer" "github.com/probe-lab/hermes/tele" ) -type rpcMeta struct { +type RpcMeta struct { PeerID peer.ID - Subscriptions []rpcMetaSub `json:"Subs,omitempty"` - Messages []rpcMetaMsg `json:"Msgs,omitempty"` - Control *rpcMetaControl `json:"Control,omitempty"` + Subscriptions []RpcMetaSub `json:"Subs,omitempty"` + Messages []RpcMetaMsg `json:"Msgs,omitempty"` + Control *RpcMetaControl `json:"Control,omitempty"` } -type rpcMetaSub struct { +type RpcMetaSub struct { Subscribe bool TopicID string } -type rpcMetaMsg struct { +type RpcMetaMsg struct { MsgID string `json:"MsgID,omitempty"` Topic string `json:"Topic,omitempty"` } -type rpcMetaControl struct { - IHave []rpcControlIHave `json:"IHave,omitempty"` - IWant []rpcControlIWant `json:"IWant,omitempty"` - Graft []rpcControlGraft `json:"Graft,omitempty"` - Prune []rpcControlPrune `json:"Prune,omitempty"` +type RpcMetaControl struct { + IHave []RpcControlIHave `json:"IHave,omitempty"` + IWant []RpcControlIWant `json:"IWant,omitempty"` + Graft []RpcControlGraft `json:"Graft,omitempty"` + Prune []RpcControlPrune `json:"Prune,omitempty"` } -type rpcControlIHave struct { +type RpcControlIHave struct { TopicID string MsgIDs []string } -type rpcControlIWant struct { +type RpcControlIWant struct { MsgIDs []string } -type rpcControlGraft struct { +type RpcControlGraft struct { TopicID string } -type rpcControlPrune struct { +type RpcControlPrune struct { TopicID string PeerIDs []peer.ID } -func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *rpcMeta { - subs := make([]rpcMetaSub, len(meta.GetSubscription())) +func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *RpcMeta { + subs := make([]RpcMetaSub, len(meta.GetSubscription())) for i, subMeta := range meta.GetSubscription() { - subs[i] = rpcMetaSub{ + subs[i] = RpcMetaSub{ Subscribe: subMeta.GetSubscribe(), TopicID: subMeta.GetTopic(), } } - msgs := make([]rpcMetaMsg, len(meta.GetMessages())) + msgs := make([]RpcMetaMsg, len(meta.GetMessages())) for i, msg := range meta.GetMessages() { - msgs[i] = rpcMetaMsg{ + msgs[i] = RpcMetaMsg{ MsgID: hex.EncodeToString(msg.GetMessageID()), Topic: msg.GetTopic(), } } - controlMsg := &rpcMetaControl{ - IHave: make([]rpcControlIHave, len(meta.GetControl().GetIhave())), - IWant: make([]rpcControlIWant, len(meta.GetControl().GetIwant())), - Graft: make([]rpcControlGraft, len(meta.GetControl().GetGraft())), - Prune: make([]rpcControlPrune, len(meta.GetControl().GetPrune())), + controlMsg := &RpcMetaControl{ + IHave: make([]RpcControlIHave, len(meta.GetControl().GetIhave())), + IWant: make([]RpcControlIWant, len(meta.GetControl().GetIwant())), + Graft: make([]RpcControlGraft, len(meta.GetControl().GetGraft())), + Prune: make([]RpcControlPrune, len(meta.GetControl().GetPrune())), } for i, ihave := range meta.GetControl().GetIhave() { @@ -81,7 +81,7 @@ func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *rpcMeta { msgIDs[j] = hex.EncodeToString(msgID) } - controlMsg.IHave[i] = rpcControlIHave{ + controlMsg.IHave[i] = RpcControlIHave{ TopicID: ihave.GetTopic(), MsgIDs: msgIDs, } @@ -93,13 +93,13 @@ func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *rpcMeta { msgIDs[j] = hex.EncodeToString(msgID) } - controlMsg.IWant[i] = rpcControlIWant{ + controlMsg.IWant[i] = RpcControlIWant{ MsgIDs: msgIDs, } } for i, graft := range meta.GetControl().GetGraft() { - controlMsg.Graft[i] = rpcControlGraft{ + controlMsg.Graft[i] = RpcControlGraft{ TopicID: graft.GetTopic(), } } @@ -117,7 +117,7 @@ func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *rpcMeta { peerIDs[j] = peerID } - controlMsg.Prune[i] = rpcControlPrune{ + controlMsg.Prune[i] = RpcControlPrune{ TopicID: prune.GetTopic(), PeerIDs: peerIDs, } @@ -132,7 +132,7 @@ func newRPCMeta(pidBytes []byte, meta *pubsub_pb.TraceEvent_RPCMeta) *rpcMeta { slog.Warn("Failed parsing peer ID", tele.LogAttrError(err)) } - return &rpcMeta{ + return &RpcMeta{ PeerID: pid, Subscriptions: subs, Messages: msgs, From e67fb0f894e031e0748be6c0773bc47273d24df0 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Thu, 11 Apr 2024 17:15:40 +1000 Subject: [PATCH 03/14] feat: Add event_type --- host/event_types.go | 99 ++++++++++++++++++++++++++++++++++++++++ host/event_types_test.go | 35 ++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 host/event_types.go create mode 100644 host/event_types_test.go diff --git a/host/event_types.go b/host/event_types.go new file mode 100644 index 0000000..f9d8ba8 --- /dev/null +++ b/host/event_types.go @@ -0,0 +1,99 @@ +package host + +import "strings" + +// EventType represents the type of an event. +type EventType string + +const ( + // General events + EventTypeUnknown EventType = "UNKNOWN" + + // P2P events + EventTypeConnected EventType = "CONNECTED" + EventTypeDisconnected EventType = "DISCONNECTED" + EventTypeAddPeer EventType = "ADD_PEER" + EventTypeRemovePeer EventType = "REMOVE_PEER" + EventTypePublishMessage EventType = "PUBLISH_MESSAGE" + EventTypeRejectMessage EventType = "REJECT_MESSAGE" + EventTypeDuplicateMessage EventType = "DUPLICATE_MESSAGE" + EventTypeDeliverMessage EventType = "DELIVER_MESSAGE" + EventTypeRecvRPC EventType = "RECV_RPC" + EventTypeSendRPC EventType = "SEND_RPC" + EventTypeDropRPC EventType = "DROP_RPC" + EventTypeJoin EventType = "JOIN" + EventTypeLeave EventType = "LEAVE" + EventTypeGraft EventType = "GRAFT" + EventTypePrune EventType = "PRUNE" + EventTypeValidateMessage EventType = "VALIDATE_MESSAGE" + EventTypeThrottlePeer EventType = "THROTTLE_PEER" + EventTypeUndeliverableMessage EventType = "UNDELIVERABLE_MESSAGE" + + // HANDLE_ events + EventTypeHandleMessage EventType = "HANDLE_MESSAGE" + EventTypeHandleStream EventType = "HANDLE_STREAM" + EventTypeHandleStatus EventType = "HANDLE_STATUS" + EventTypeHandleMetadata EventType = "HANDLE_METADATA" + EventTypeHandleAggregateAndProof EventType = "HANDLE_AGGREGATE_AND_PROOF" + EventTypeHandleBlobSidecarsByRange EventType = "HANDLE_BLOB_SIDECARS_BY_RANGE" + EventTypeHandleBlobSidecarsByRoot EventType = "HANDLE_BLOB_SIDECARS_BY_ROOT" + EventTypeHandlePing EventType = "HANDLE_PING" + EventTypeHandleGoodbye EventType = "HANDLE_GOODBYE" + EventTypeHandleBeaconBlocksByRange EventType = "HANDLE_BEACON_BLOCKS_BY_RANGE" + EventTypeHandleBeaconBlocksByRoot EventType = "HANDLE_BEACON_BLOCKS_BY_ROOT" + + // REQUEST_ events + EventTypeRequestMetadata EventType = "REQUEST_METADATA" + EventTypeRequestStatus EventType = "REQUEST_STATUS" + EventTypeRequestPing EventType = "REQUEST_PING" +) + +// AllValidEvents returns all valid event types. +func AllValidEvents() []EventType { + return []EventType{ + EventTypeUnknown, + EventTypeConnected, + EventTypeDisconnected, + EventTypeAddPeer, + EventTypeRemovePeer, + EventTypePublishMessage, + EventTypeRejectMessage, + EventTypeDuplicateMessage, + EventTypeDeliverMessage, + EventTypeRecvRPC, + EventTypeSendRPC, + EventTypeDropRPC, + EventTypeJoin, + EventTypeLeave, + EventTypeGraft, + EventTypePrune, + EventTypeValidateMessage, + EventTypeThrottlePeer, + EventTypeUndeliverableMessage, + EventTypeHandleMessage, + EventTypeHandleStream, + EventTypeHandleStatus, + EventTypeHandleMetadata, + EventTypeHandleAggregateAndProof, + EventTypeHandleBlobSidecarsByRange, + EventTypeHandleBlobSidecarsByRoot, + EventTypeHandlePing, + EventTypeHandleGoodbye, + EventTypeHandleBeaconBlocksByRange, + EventTypeHandleBeaconBlocksByRoot, + EventTypeRequestMetadata, + EventTypeRequestStatus, + EventTypeRequestPing, + } +} + +// EventTypeFromBeaconChainProtocol returns the EventType for a given protocol string. +func EventTypeFromBeaconChainProtocol(protocol string) EventType { + // Usual protocol string: /eth2/beacon_chain/req/metadata/2/ssz_snappy + parts := strings.Split(protocol, "/") + if len(parts) > 4 { + return EventType("HANDLE_" + strings.ToUpper(parts[4])) + } + + return EventTypeUnknown +} diff --git a/host/event_types_test.go b/host/event_types_test.go new file mode 100644 index 0000000..53d6178 --- /dev/null +++ b/host/event_types_test.go @@ -0,0 +1,35 @@ +package host_test + +import ( + "testing" + + "github.com/probe-lab/hermes/host" +) + +func TestEventTypeFromBeaconChainProtocol(t *testing.T) { + tests := []struct { + name string + protocol string + expected host.EventType + }{ + { + name: "Valid protocol with metadata", + protocol: "/eth2/beacon_chain/req/metadata/2/ssz_snappy", + expected: host.EventTypeHandleMetadata, + }, + { + name: "Invalid protocol", + protocol: "/invalid/protocol", + expected: host.EventTypeUnknown, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := host.EventTypeFromBeaconChainProtocol(tt.protocol) + if result != tt.expected { + t.Errorf("EventTypeFromBeaconChainProtocol(%s) = %v, want %v", tt.protocol, result, tt.expected) + } + }) + } +} From 1564b5db6dc6293b432dfcfd1e9d18a591333fbc Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Tue, 16 Apr 2024 16:45:33 +1000 Subject: [PATCH 04/14] chore: EventNames --- host/event_types.go | 39 --------------------------------------- host/host.go | 2 +- 2 files changed, 1 insertion(+), 40 deletions(-) diff --git a/host/event_types.go b/host/event_types.go index f9d8ba8..c61611e 100644 --- a/host/event_types.go +++ b/host/event_types.go @@ -48,45 +48,6 @@ const ( EventTypeRequestPing EventType = "REQUEST_PING" ) -// AllValidEvents returns all valid event types. -func AllValidEvents() []EventType { - return []EventType{ - EventTypeUnknown, - EventTypeConnected, - EventTypeDisconnected, - EventTypeAddPeer, - EventTypeRemovePeer, - EventTypePublishMessage, - EventTypeRejectMessage, - EventTypeDuplicateMessage, - EventTypeDeliverMessage, - EventTypeRecvRPC, - EventTypeSendRPC, - EventTypeDropRPC, - EventTypeJoin, - EventTypeLeave, - EventTypeGraft, - EventTypePrune, - EventTypeValidateMessage, - EventTypeThrottlePeer, - EventTypeUndeliverableMessage, - EventTypeHandleMessage, - EventTypeHandleStream, - EventTypeHandleStatus, - EventTypeHandleMetadata, - EventTypeHandleAggregateAndProof, - EventTypeHandleBlobSidecarsByRange, - EventTypeHandleBlobSidecarsByRoot, - EventTypeHandlePing, - EventTypeHandleGoodbye, - EventTypeHandleBeaconBlocksByRange, - EventTypeHandleBeaconBlocksByRoot, - EventTypeRequestMetadata, - EventTypeRequestStatus, - EventTypeRequestPing, - } -} - // EventTypeFromBeaconChainProtocol returns the EventType for a given protocol string. func EventTypeFromBeaconChainProtocol(protocol string) EventType { // Usual protocol string: /eth2/beacon_chain/req/metadata/2/ssz_snappy diff --git a/host/host.go b/host/host.go index b299c20..9d0945f 100644 --- a/host/host.go +++ b/host/host.go @@ -263,7 +263,7 @@ func (h *Host) PrivateListenMaddr() (ma.Multiaddr, error) { func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler { return func(ctx context.Context, msg *pubsub.Message) error { evt := &TraceEvent{ - Type: "HANDLE_MESSAGE", + Type: EventTypeHandleMessage, PeerID: h.ID(), Timestamp: time.Now(), Payload: map[string]any{ From 5bcb536550d4044ab9484cd52b480a45781939aa Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 22 Apr 2024 12:40:36 +1000 Subject: [PATCH 05/14] Merge main --- host/host.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/host/host.go b/host/host.go index c60d051..73a53db 100644 --- a/host/host.go +++ b/host/host.go @@ -328,6 +328,6 @@ func (h *Host) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot) { traceCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - h.cfg.DataStream.PutRecord(traceCtx, trace) + h.cfg.DataStream.PutEvent(traceCtx, trace) } } From 1ecd5ab982f99c3f9d6042dc91384f44604e36b6 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 22 Apr 2024 19:39:37 +1000 Subject: [PATCH 06/14] feat: Raw EventType strings --- eth/node.go | 4 ++-- eth/pubsub.go | 2 +- eth/reqresp.go | 8 ++++---- host/event_types.go | 8 ++++---- host/event_types_test.go | 6 +++--- host/flush_tracer.go | 38 +++++++++++++++++++------------------- host/host.go | 8 ++++---- 7 files changed, 37 insertions(+), 37 deletions(-) diff --git a/eth/node.go b/eth/node.go index a4aec45..dfa45f0 100644 --- a/eth/node.go +++ b/eth/node.go @@ -90,9 +90,9 @@ func NewNode(cfg *NodeConfig) (*Node, error) { DroppedRecordF: func(ctx context.Context, record gk.Record) { tevt, ok := record.(*host.TraceEvent) if !ok { - droppedTraces.Add(ctx, 1, metric.WithAttributes(attribute.String("evt_type", string(host.EventTypeUnknown)))) + droppedTraces.Add(ctx, 1, metric.WithAttributes(attribute.String("evt_type", "UNKNOWN"))) } else { - droppedTraces.Add(ctx, 1, metric.WithAttributes(attribute.String("evt_type", string(tevt.Type)))) + droppedTraces.Add(ctx, 1, metric.WithAttributes(attribute.String("evt_type", tevt.Type))) } slog.Warn("Dropped record", "partition_key", record.PartitionKey(), "size", len(record.Data())) }, diff --git a/eth/pubsub.go b/eth/pubsub.go index 18f8385..8c1b37b 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -143,7 +143,7 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) evt := &host.TraceEvent{ - Type: host.EventTypeHandleAggregateAndProof, + Type: "HANDLE_BEACON_BLOCK", // TODO(sam.calder-mason): Change to BEACON_BLOCKS_BY PeerID: p.host.ID(), Timestamp: now, Payload: map[string]any{ diff --git a/eth/reqresp.go b/eth/reqresp.go index bc76c4d..0505188 100644 --- a/eth/reqresp.go +++ b/eth/reqresp.go @@ -247,7 +247,7 @@ func (r *ReqResp) wrapStreamHandler(ctx context.Context, name string, handler Co } end := time.Now() - traceType := hermeshost.EventTypeHandleStream + traceType := "HANDLE_STREAM" protocol := string(s.Protocol()) @@ -568,7 +568,7 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e } traceEvt := &hermeshost.TraceEvent{ - Type: hermeshost.EventTypeRequestStatus, + Type: "REQUEST_STATUS", PeerID: r.host.ID(), Timestamp: time.Now(), Payload: reqData, @@ -623,7 +623,7 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e func (r *ReqResp) Ping(ctx context.Context, pid peer.ID) (err error) { defer func() { traceEvt := &hermeshost.TraceEvent{ - Type: hermeshost.EventTypeRequestPing, + Type: "REQUEST_PING", PeerID: r.host.ID(), Timestamp: time.Now(), Payload: map[string]string{ @@ -687,7 +687,7 @@ func (r *ReqResp) MetaData(ctx context.Context, pid peer.ID) (resp *pb.MetaDataV } traceEvt := &hermeshost.TraceEvent{ - Type: hermeshost.EventTypeRequestMetadata, + Type: "REQUEST_METADATA", PeerID: r.host.ID(), Timestamp: time.Now(), Payload: reqData, diff --git a/host/event_types.go b/host/event_types.go index c61611e..1d7b189 100644 --- a/host/event_types.go +++ b/host/event_types.go @@ -48,13 +48,13 @@ const ( EventTypeRequestPing EventType = "REQUEST_PING" ) -// EventTypeFromBeaconChainProtocol returns the EventType for a given protocol string. -func EventTypeFromBeaconChainProtocol(protocol string) EventType { +// EventTypeFromBeaconChainProtocol returns the event type for a given protocol string. +func EventTypeFromBeaconChainProtocol(protocol string) string { // Usual protocol string: /eth2/beacon_chain/req/metadata/2/ssz_snappy parts := strings.Split(protocol, "/") if len(parts) > 4 { - return EventType("HANDLE_" + strings.ToUpper(parts[4])) + return "HANDLE_" + strings.ToUpper(parts[4]) } - return EventTypeUnknown + return "UNKNOWN" } diff --git a/host/event_types_test.go b/host/event_types_test.go index 53d6178..c62bba2 100644 --- a/host/event_types_test.go +++ b/host/event_types_test.go @@ -10,17 +10,17 @@ func TestEventTypeFromBeaconChainProtocol(t *testing.T) { tests := []struct { name string protocol string - expected host.EventType + expected string }{ { name: "Valid protocol with metadata", protocol: "/eth2/beacon_chain/req/metadata/2/ssz_snappy", - expected: host.EventTypeHandleMetadata, + expected: "HANDLE_METADATA", }, { name: "Invalid protocol", protocol: "/invalid/protocol", - expected: host.EventTypeUnknown, + expected: "UNKNOWN", }, } diff --git a/host/flush_tracer.go b/host/flush_tracer.go index 7d5e85f..4a346b2 100644 --- a/host/flush_tracer.go +++ b/host/flush_tracer.go @@ -17,7 +17,7 @@ import ( ) type TraceEvent struct { - Type EventType + Type string PeerID peer.ID Timestamp time.Time Payload any `json:"Data"` // cannot use field "Data" because of gk.Record method @@ -48,11 +48,11 @@ var _ gk.Record = (*TraceEvent)(nil) var _ pubsub.RawTracer = (*Host)(nil) -func (h *Host) FlushTrace(evtType EventType, payload any) { +func (h *Host) FlushTrace(evtType string, payload any) { h.FlushTraceWithTimestamp(evtType, time.Now(), payload) } -func (h *Host) FlushTraceWithTimestamp(evtType EventType, timestamp time.Time, payload any) { +func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payload any) { evt := &TraceEvent{ Type: evtType, PeerID: h.ID(), @@ -72,46 +72,46 @@ func (h *Host) FlushTraceWithTimestamp(evtType EventType, timestamp time.Time, p } func (h *Host) AddPeer(p peer.ID, proto protocol.ID) { - h.FlushTrace(EventTypeAddPeer, map[string]any{ + h.FlushTrace("ADD_PEER", map[string]any{ "PeerID": p, "Protocol": proto, }) } func (h *Host) RemovePeer(p peer.ID) { - h.FlushTrace(EventTypeRemovePeer, map[string]any{ + h.FlushTrace("REMOVE_PEER", map[string]any{ "PeerID": p, }) } func (h *Host) Join(topic string) { - h.FlushTrace(EventTypeJoin, map[string]any{ + h.FlushTrace("JOIN", map[string]any{ "Topic": topic, }) } func (h *Host) Leave(topic string) { - h.FlushTrace(EventTypeLeave, map[string]any{ + h.FlushTrace("LEAVE", map[string]any{ "Topic": topic, }) } func (h *Host) Graft(p peer.ID, topic string) { - h.FlushTrace(EventTypeGraft, map[string]any{ + h.FlushTrace("GRAFT", map[string]any{ "PeerID": p, "Topic": topic, }) } func (h *Host) Prune(p peer.ID, topic string) { - h.FlushTrace(EventTypePrune, map[string]any{ + h.FlushTrace("PRUNE", map[string]any{ "PeerID": p, "Topic": topic, }) } func (h *Host) ValidateMessage(msg *pubsub.Message) { - h.FlushTrace(EventTypeValidateMessage, map[string]any{ + h.FlushTrace("VALIDATE_MESSAGE", map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -122,7 +122,7 @@ func (h *Host) ValidateMessage(msg *pubsub.Message) { } func (h *Host) DeliverMessage(msg *pubsub.Message) { - h.FlushTrace(EventTypeDeliverMessage, map[string]any{ + h.FlushTrace("DELIVER_MESSAGE", map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -133,7 +133,7 @@ func (h *Host) DeliverMessage(msg *pubsub.Message) { } func (h *Host) RejectMessage(msg *pubsub.Message, reason string) { - h.FlushTrace(EventTypeRejectMessage, map[string]any{ + h.FlushTrace("REJECT_MESSAGE", map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -145,7 +145,7 @@ func (h *Host) RejectMessage(msg *pubsub.Message, reason string) { } func (h *Host) DuplicateMessage(msg *pubsub.Message) { - h.FlushTrace(EventTypeDuplicateMessage, map[string]any{ + h.FlushTrace("DUPLICATE_MESSAGE", map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -156,7 +156,7 @@ func (h *Host) DuplicateMessage(msg *pubsub.Message) { } func (h *Host) ThrottlePeer(p peer.ID) { - h.FlushTrace(EventTypeThrottlePeer, map[string]any{ + h.FlushTrace("THROTTLE_PEER", map[string]any{ "PeerID": p, }) } @@ -174,7 +174,7 @@ func (h *Host) DropRPC(rpc *pubsub.RPC, p peer.ID) { } func (h *Host) UndeliverableMessage(msg *pubsub.Message) { - h.FlushTrace(EventTypeUndeliverableMessage, map[string]any{ + h.FlushTrace("UNDELIVERABLE_MESSAGE", map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -186,18 +186,18 @@ func (h *Host) Trace(evt *pubsubpb.TraceEvent) { ts := time.Unix(0, evt.GetTimestamp()) switch evt.GetType() { case pubsubpb.TraceEvent_PUBLISH_MESSAGE: - h.FlushTraceWithTimestamp(EventTypePublishMessage, ts, map[string]any{ + h.FlushTraceWithTimestamp("PUBLISH_MESSAGE", ts, map[string]any{ "MsgID": evt.GetPublishMessage().GetMessageID(), "Topic": evt.GetPublishMessage().GetTopic(), }) case pubsubpb.TraceEvent_RECV_RPC: payload := newRPCMeta(evt.GetRecvRPC().GetReceivedFrom(), evt.GetRecvRPC().GetMeta()) - h.FlushTraceWithTimestamp(EventTypeRecvRPC, ts, payload) + h.FlushTraceWithTimestamp("RECV_RPC", ts, payload) case pubsubpb.TraceEvent_SEND_RPC: payload := newRPCMeta(evt.GetSendRPC().GetSendTo(), evt.GetSendRPC().GetMeta()) - h.FlushTraceWithTimestamp(EventTypeSendRPC, ts, payload) + h.FlushTraceWithTimestamp("SEND_RPC", ts, payload) case pubsubpb.TraceEvent_DROP_RPC: payload := newRPCMeta(evt.GetDropRPC().GetSendTo(), evt.GetDropRPC().GetMeta()) - h.FlushTraceWithTimestamp(EventTypeDropRPC, ts, payload) + h.FlushTraceWithTimestamp("DROP_RPC", ts, payload) } } diff --git a/host/host.go b/host/host.go index 73a53db..2a0d3e1 100644 --- a/host/host.go +++ b/host/host.go @@ -75,7 +75,7 @@ func (h *Host) Serve(ctx context.Context) error { return fmt.Errorf("host started without gossip sub initialization: %w", suture.ErrTerminateSupervisorTree) } - eventHandler := func(n network.Network, c network.Conn, evtType EventType) { + eventHandler := func(n network.Network, c network.Conn, evtType string) { evt := &TraceEvent{ Type: evtType, PeerID: h.ID(), @@ -106,8 +106,8 @@ func (h *Host) Serve(ctx context.Context) error { } notifiee := &network.NotifyBundle{ - ConnectedF: func(n network.Network, c network.Conn) { eventHandler(n, c, EventTypeConnected) }, - DisconnectedF: func(n network.Network, c network.Conn) { eventHandler(n, c, EventTypeDisconnected) }, + ConnectedF: func(n network.Network, c network.Conn) { eventHandler(n, c, "CONNECTED") }, + DisconnectedF: func(n network.Network, c network.Conn) { eventHandler(n, c, "DISCONNECTED") }, } h.Host.Network().Notify(notifiee) @@ -271,7 +271,7 @@ func (h *Host) PrivateListenMaddr() (ma.Multiaddr, error) { func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler { return func(ctx context.Context, msg *pubsub.Message) error { evt := &TraceEvent{ - Type: EventTypeHandleMessage, + Type: "HANDLE_MESSAGE", PeerID: h.ID(), Timestamp: time.Now(), Payload: map[string]any{ From c64d3e46f550cea908a1ec63d2db0ff288316487 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 22 Apr 2024 19:39:55 +1000 Subject: [PATCH 07/14] feat: Raw EventType strings --- host/event_types.go | 46 --------------------------------------------- 1 file changed, 46 deletions(-) diff --git a/host/event_types.go b/host/event_types.go index 1d7b189..8fabecd 100644 --- a/host/event_types.go +++ b/host/event_types.go @@ -2,52 +2,6 @@ package host import "strings" -// EventType represents the type of an event. -type EventType string - -const ( - // General events - EventTypeUnknown EventType = "UNKNOWN" - - // P2P events - EventTypeConnected EventType = "CONNECTED" - EventTypeDisconnected EventType = "DISCONNECTED" - EventTypeAddPeer EventType = "ADD_PEER" - EventTypeRemovePeer EventType = "REMOVE_PEER" - EventTypePublishMessage EventType = "PUBLISH_MESSAGE" - EventTypeRejectMessage EventType = "REJECT_MESSAGE" - EventTypeDuplicateMessage EventType = "DUPLICATE_MESSAGE" - EventTypeDeliverMessage EventType = "DELIVER_MESSAGE" - EventTypeRecvRPC EventType = "RECV_RPC" - EventTypeSendRPC EventType = "SEND_RPC" - EventTypeDropRPC EventType = "DROP_RPC" - EventTypeJoin EventType = "JOIN" - EventTypeLeave EventType = "LEAVE" - EventTypeGraft EventType = "GRAFT" - EventTypePrune EventType = "PRUNE" - EventTypeValidateMessage EventType = "VALIDATE_MESSAGE" - EventTypeThrottlePeer EventType = "THROTTLE_PEER" - EventTypeUndeliverableMessage EventType = "UNDELIVERABLE_MESSAGE" - - // HANDLE_ events - EventTypeHandleMessage EventType = "HANDLE_MESSAGE" - EventTypeHandleStream EventType = "HANDLE_STREAM" - EventTypeHandleStatus EventType = "HANDLE_STATUS" - EventTypeHandleMetadata EventType = "HANDLE_METADATA" - EventTypeHandleAggregateAndProof EventType = "HANDLE_AGGREGATE_AND_PROOF" - EventTypeHandleBlobSidecarsByRange EventType = "HANDLE_BLOB_SIDECARS_BY_RANGE" - EventTypeHandleBlobSidecarsByRoot EventType = "HANDLE_BLOB_SIDECARS_BY_ROOT" - EventTypeHandlePing EventType = "HANDLE_PING" - EventTypeHandleGoodbye EventType = "HANDLE_GOODBYE" - EventTypeHandleBeaconBlocksByRange EventType = "HANDLE_BEACON_BLOCKS_BY_RANGE" - EventTypeHandleBeaconBlocksByRoot EventType = "HANDLE_BEACON_BLOCKS_BY_ROOT" - - // REQUEST_ events - EventTypeRequestMetadata EventType = "REQUEST_METADATA" - EventTypeRequestStatus EventType = "REQUEST_STATUS" - EventTypeRequestPing EventType = "REQUEST_PING" -) - // EventTypeFromBeaconChainProtocol returns the event type for a given protocol string. func EventTypeFromBeaconChainProtocol(protocol string) string { // Usual protocol string: /eth2/beacon_chain/req/metadata/2/ssz_snappy From 256581a84e3485119bea231bff3903d606fb7285 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 24 Apr 2024 13:27:00 +1000 Subject: [PATCH 08/14] refactor: Add root hash calculation in getSignedBeaconBlockForForkDigest --- eth/pubsub.go | 52 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 15 deletions(-) diff --git a/eth/pubsub.go b/eth/pubsub.go index 8c1b37b..c6d5440 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -131,7 +131,9 @@ func (n *Node) FilterIncomingSubscriptions(id peer.ID, subs []*pubsubpb.RPC_SubO } func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) error { - genericBlock, err := p.getSignedBeaconBlockForForkDigest(msg.Data) + now := time.Now() + + genericBlock, root, err := p.getSignedBeaconBlockForForkDigest(msg.Data) if err != nil { return err } @@ -139,11 +141,10 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err slot := genericBlock.GetSlot() ProposerIndex := genericBlock.GetProposerIndex() - now := time.Now() slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) evt := &host.TraceEvent{ - Type: "HANDLE_BEACON_BLOCK", // TODO(sam.calder-mason): Change to BEACON_BLOCKS_BY + Type: "HANDLE_MESSAGE", PeerID: p.host.ID(), Timestamp: now, Payload: map[string]any{ @@ -154,6 +155,7 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err "Seq": msg.GetSeqno(), "ValIdx": ProposerIndex, "Slot": slot, + "Root": root, "TimeInSlot": now.Sub(slotStart).Seconds(), }, } @@ -174,7 +176,7 @@ type GenericBeaconBlock interface { GetProposerIndex() primitives.ValidatorIndex } -func (p *PubSub) getSignedBeaconBlockForForkDigest(msgData []byte) (genericSbb GenericBeaconBlock, err error) { +func (p *PubSub) getSignedBeaconBlockForForkDigest(msgData []byte) (genericSbb GenericBeaconBlock, root [32]byte, err error) { // get the correct fork switch p.cfg.ForkVersion { @@ -182,48 +184,68 @@ func (p *PubSub) getSignedBeaconBlockForForkDigest(msgData []byte) (genericSbb G phase0Sbb := ethtypes.SignedBeaconBlock{} err = p.cfg.Encoder.DecodeGossip(msgData, &phase0Sbb) if err != nil { - return genericSbb, fmt.Errorf("decode beacon block gossip message: %w", err) + return genericSbb, [32]byte{}, fmt.Errorf("error decoding phase0 beacon block gossip message: %w", err) } genericSbb = phase0Sbb.GetBlock() - return genericSbb, err + root, err = phase0Sbb.Block.HashTreeRoot() + if err != nil { + return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) + } + return genericSbb, root, nil case AltairForkVersion: altairSbb := ethtypes.SignedBeaconBlockAltair{} err = p.cfg.Encoder.DecodeGossip(msgData, &altairSbb) if err != nil { - return genericSbb, fmt.Errorf("decode beacon block gossip message: %w", err) + return genericSbb, [32]byte{}, fmt.Errorf("error decoding altair beacon block gossip message: %w", err) } genericSbb = altairSbb.GetBlock() - return genericSbb, err + root, err = altairSbb.Block.HashTreeRoot() + if err != nil { + return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) + } + return genericSbb, root, nil case BellatrixForkVersion: BellatrixSbb := ethtypes.SignedBeaconBlockBellatrix{} err = p.cfg.Encoder.DecodeGossip(msgData, &BellatrixSbb) if err != nil { - return genericSbb, fmt.Errorf("decode beacon block gossip message: %w", err) + return genericSbb, [32]byte{}, fmt.Errorf("error decoding bellatrix beacon block gossip message: %w", err) } genericSbb = BellatrixSbb.GetBlock() - return genericSbb, err + root, err = BellatrixSbb.Block.HashTreeRoot() + if err != nil { + return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) + } + return genericSbb, root, nil case CapellaForkVersion: capellaSbb := ethtypes.SignedBeaconBlockCapella{} err = p.cfg.Encoder.DecodeGossip(msgData, &capellaSbb) if err != nil { - return genericSbb, fmt.Errorf("decode beacon block gossip message: %w", err) + return genericSbb, [32]byte{}, fmt.Errorf("error decoding capella beacon block gossip message: %w", err) } genericSbb = capellaSbb.GetBlock() - return genericSbb, err + root, err = capellaSbb.Block.HashTreeRoot() + if err != nil { + return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) + } + return genericSbb, root, nil case DenebForkVersion: denebSbb := ethtypes.SignedBeaconBlockDeneb{} err = p.cfg.Encoder.DecodeGossip(msgData, &denebSbb) if err != nil { - return genericSbb, fmt.Errorf("decode beacon block gossip message: %w", err) + return genericSbb, [32]byte{}, fmt.Errorf("error decoding deneb beacon block gossip message: %w", err) } genericSbb = denebSbb.GetBlock() - return genericSbb, err + root, err = denebSbb.Block.HashTreeRoot() + if err != nil { + return genericSbb, [32]byte{}, fmt.Errorf("invalid hash tree root: %w", err) + } + return genericSbb, root, nil default: - return genericSbb, fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:]) + return genericSbb, [32]byte{}, fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:]) } } From d87881163f3be325282dd2613fa02c577e327c38 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Wed, 24 Apr 2024 15:56:12 +1000 Subject: [PATCH 09/14] feat: add "Timestamp" field to beacon block handling --- eth/pubsub.go | 1 + 1 file changed, 1 insertion(+) diff --git a/eth/pubsub.go b/eth/pubsub.go index c6d5440..2ed6786 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -157,6 +157,7 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err "Slot": slot, "Root": root, "TimeInSlot": now.Sub(slotStart).Seconds(), + "Timestamp": now, }, } From 9ea0ab5634cb0b4d2546590d5d7cd19fd9cd4225 Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 27 May 2024 14:05:28 +1000 Subject: [PATCH 10/14] refactor: Change method names from PutEvent to PutRecord --- eth/pubsub.go | 2 +- eth/reqresp.go | 8 ++++---- host/callback.go | 4 ++-- host/flush_tracer.go | 2 +- host/host.go | 6 +++--- host/kinesis.go | 4 ++-- host/producer.go | 2 +- 7 files changed, 14 insertions(+), 14 deletions(-) diff --git a/eth/pubsub.go b/eth/pubsub.go index 986aab3..f644c39 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -182,7 +182,7 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err }, } - if err := p.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } diff --git a/eth/reqresp.go b/eth/reqresp.go index 0505188..04b0519 100644 --- a/eth/reqresp.go +++ b/eth/reqresp.go @@ -278,7 +278,7 @@ func (r *ReqResp) wrapStreamHandler(ctx context.Context, name string, handler Co Payload: commonData, } - if err := r.cfg.DataStream.PutEvent(ctx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(ctx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -575,7 +575,7 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e } traceCtx := context.Background() - if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -631,7 +631,7 @@ func (r *ReqResp) Ping(ctx context.Context, pid peer.ID) (err error) { }, } traceCtx := context.Background() - if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -693,7 +693,7 @@ func (r *ReqResp) MetaData(ctx context.Context, pid peer.ID) (resp *pb.MetaDataV Payload: reqData, } traceCtx := context.Background() - if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } diff --git a/host/callback.go b/host/callback.go index 9e1cdd4..c2df4db 100644 --- a/host/callback.go +++ b/host/callback.go @@ -47,8 +47,8 @@ func (c *CallbackDataStream) Stop(ctx context.Context) error { return ctx.Err() } -// PutEvent sends an event to the callback if the stream has not been stopped. -func (c *CallbackDataStream) PutEvent(ctx context.Context, event *TraceEvent) error { +// PutRecord sends an event to the callback if the stream has not been stopped. +func (c *CallbackDataStream) PutRecord(ctx context.Context, event *TraceEvent) error { if c.stopped { return ctx.Err() } diff --git a/host/flush_tracer.go b/host/flush_tracer.go index 4a346b2..a82a8f6 100644 --- a/host/flush_tracer.go +++ b/host/flush_tracer.go @@ -63,7 +63,7 @@ func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payl ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) defer cancel() - if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("Failed to put trace event payload", tele.LogAttrError(err)) return } diff --git a/host/host.go b/host/host.go index 3149eba..e09dc24 100644 --- a/host/host.go +++ b/host/host.go @@ -97,7 +97,7 @@ func (h *Host) Serve(ctx context.Context) error { }, } - if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("Failed to put event payload", tele.LogAttrError(err)) return } @@ -284,7 +284,7 @@ func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler { }, } - if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } @@ -329,6 +329,6 @@ func (h *Host) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot) { traceCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - h.cfg.DataStream.PutEvent(traceCtx, trace) + h.cfg.DataStream.PutRecord(traceCtx, trace) } } diff --git a/host/kinesis.go b/host/kinesis.go index a9ce570..61163f7 100644 --- a/host/kinesis.go +++ b/host/kinesis.go @@ -67,8 +67,8 @@ func (k *KinesisDataStream) Stop(ctx context.Context) error { return k.producer.WaitIdle(ctx) } -// PutEvent sends an event to the Kinesis data stream. -func (k *KinesisDataStream) PutEvent(ctx context.Context, event *TraceEvent) error { +// PutRecord sends an event to the Kinesis data stream. +func (k *KinesisDataStream) PutRecord(ctx context.Context, event *TraceEvent) error { if event != nil { kRecord := gk.Record(event) diff --git a/host/producer.go b/host/producer.go index 874c1eb..8e6f755 100644 --- a/host/producer.go +++ b/host/producer.go @@ -7,7 +7,7 @@ import ( type DataStream interface { Start(ctx context.Context) error Stop(ctx context.Context) error - PutEvent(ctx context.Context, event *TraceEvent) error + PutRecord(ctx context.Context, event *TraceEvent) error Type() DataStreamType } From 46eca097008f09aff7957b7f14e18d09979bbd7c Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 27 May 2024 14:10:03 +1000 Subject: [PATCH 11/14] Tidying --- host/flush_tracer.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/host/flush_tracer.go b/host/flush_tracer.go index a82a8f6..de87c2b 100644 --- a/host/flush_tracer.go +++ b/host/flush_tracer.go @@ -72,39 +72,39 @@ func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payl } func (h *Host) AddPeer(p peer.ID, proto protocol.ID) { - h.FlushTrace("ADD_PEER", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_ADD_PEER.String(), map[string]any{ "PeerID": p, "Protocol": proto, }) } func (h *Host) RemovePeer(p peer.ID) { - h.FlushTrace("REMOVE_PEER", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_REMOVE_PEER.String(), map[string]any{ "PeerID": p, }) } func (h *Host) Join(topic string) { - h.FlushTrace("JOIN", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_JOIN.String(), map[string]any{ "Topic": topic, }) } func (h *Host) Leave(topic string) { - h.FlushTrace("LEAVE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_LEAVE.String(), map[string]any{ "Topic": topic, }) } func (h *Host) Graft(p peer.ID, topic string) { - h.FlushTrace("GRAFT", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_GRAFT.String(), map[string]any{ "PeerID": p, "Topic": topic, }) } func (h *Host) Prune(p peer.ID, topic string) { - h.FlushTrace("PRUNE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_PRUNE.String(), map[string]any{ "PeerID": p, "Topic": topic, }) @@ -122,7 +122,7 @@ func (h *Host) ValidateMessage(msg *pubsub.Message) { } func (h *Host) DeliverMessage(msg *pubsub.Message) { - h.FlushTrace("DELIVER_MESSAGE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_DELIVER_MESSAGE.String(), map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -133,7 +133,7 @@ func (h *Host) DeliverMessage(msg *pubsub.Message) { } func (h *Host) RejectMessage(msg *pubsub.Message, reason string) { - h.FlushTrace("REJECT_MESSAGE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_REJECT_MESSAGE.String(), map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -145,7 +145,7 @@ func (h *Host) RejectMessage(msg *pubsub.Message, reason string) { } func (h *Host) DuplicateMessage(msg *pubsub.Message) { - h.FlushTrace("DUPLICATE_MESSAGE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_DUPLICATE_MESSAGE.String(), map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -186,18 +186,18 @@ func (h *Host) Trace(evt *pubsubpb.TraceEvent) { ts := time.Unix(0, evt.GetTimestamp()) switch evt.GetType() { case pubsubpb.TraceEvent_PUBLISH_MESSAGE: - h.FlushTraceWithTimestamp("PUBLISH_MESSAGE", ts, map[string]any{ + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_PUBLISH_MESSAGE.String(), ts, map[string]any{ "MsgID": evt.GetPublishMessage().GetMessageID(), "Topic": evt.GetPublishMessage().GetTopic(), }) case pubsubpb.TraceEvent_RECV_RPC: payload := newRPCMeta(evt.GetRecvRPC().GetReceivedFrom(), evt.GetRecvRPC().GetMeta()) - h.FlushTraceWithTimestamp("RECV_RPC", ts, payload) + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_RECV_RPC.String(), ts, payload) case pubsubpb.TraceEvent_SEND_RPC: payload := newRPCMeta(evt.GetSendRPC().GetSendTo(), evt.GetSendRPC().GetMeta()) - h.FlushTraceWithTimestamp("SEND_RPC", ts, payload) + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_SEND_RPC.String(), ts, payload) case pubsubpb.TraceEvent_DROP_RPC: payload := newRPCMeta(evt.GetDropRPC().GetSendTo(), evt.GetDropRPC().GetMeta()) - h.FlushTraceWithTimestamp("DROP_RPC", ts, payload) + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_DROP_RPC.String(), ts, payload) } } From d55cf4aa51814d6d959a7733a8f605cf6928b34f Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 27 May 2024 15:09:26 +1000 Subject: [PATCH 12/14] Start kinesis producer --- eth/pubsub.go | 1 - host/kinesis.go | 5 ++++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/eth/pubsub.go b/eth/pubsub.go index f644c39..3950ba4 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -178,7 +178,6 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err "Slot": slot, "Root": root, "TimeInSlot": now.Sub(slotStart).Seconds(), - "Timestamp": now, }, } diff --git a/host/kinesis.go b/host/kinesis.go index 61163f7..2f0fa5e 100644 --- a/host/kinesis.go +++ b/host/kinesis.go @@ -36,9 +36,12 @@ func (k *KinesisDataStream) Start(ctx context.Context) error { dsCtx, dsCancel := context.WithCancel(ctx) k.ctx = dsCtx - k.cancelFn = dsCancel + if err := k.producer.Start(ctx); err != nil { + return err + } + <-dsCtx.Done() return dsCtx.Err() From 8b015a8758e1f4f222d66ca08b6c4f9203b47eea Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 27 May 2024 15:30:10 +1000 Subject: [PATCH 13/14] Ensure timestamps are as accurate as possible --- eth/pubsub.go | 34 +++++++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/eth/pubsub.go b/eth/pubsub.go index 3950ba4..e0ae077 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -189,6 +189,8 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err } func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + if msg == nil || msg.Topic == nil || *msg.Topic == "" { return fmt.Errorf("nil message or topic") } @@ -199,7 +201,6 @@ func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) err return fmt.Errorf("decode attestation gossip message: %w", err) } - now := time.Now() evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), @@ -226,6 +227,8 @@ func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) err } func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + ap := ðtypes.SignedAggregateAttestationAndProof{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, ap); err != nil { return fmt.Errorf("decode aggregate and proof message: %w", err) @@ -234,7 +237,7 @@ func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Messag evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -260,6 +263,8 @@ func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Messag } func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + ve := ðtypes.VoluntaryExit{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, ve); err != nil { return fmt.Errorf("decode voluntary exit message: %w", err) @@ -268,7 +273,7 @@ func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) err evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -288,6 +293,8 @@ func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) err } func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + as := ðtypes.AttesterSlashing{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, as); err != nil { return fmt.Errorf("decode attester slashing message: %w", err) @@ -296,7 +303,7 @@ func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub. evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -316,6 +323,8 @@ func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub. } func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + ps := ðtypes.ProposerSlashing{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, ps); err != nil { return fmt.Errorf("decode proposer slashing message: %w", err) @@ -324,7 +333,7 @@ func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub. evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -348,6 +357,8 @@ func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub. } func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + cp := ðtypes.SignedContributionAndProof{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, cp); err != nil { return fmt.Errorf("decode contribution and proof message: %w", err) @@ -356,7 +367,7 @@ func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pu evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -379,6 +390,8 @@ func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pu } func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + sc := ðtypes.SyncCommitteeMessage{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, sc); err != nil { return fmt.Errorf("decode sync committee message: %w", err) @@ -387,7 +400,7 @@ func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Mes evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -409,6 +422,8 @@ func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Mes } func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + pb := ðtypes.BLSToExecutionChange{} if err := p.cfg.Encoder.DecodeGossip(msg.Data, pb); err != nil { return fmt.Errorf("decode bls to execution change message: %w", err) @@ -417,7 +432,7 @@ func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pub evt := &host.TraceEvent{ Type: eventTypeHandleMessage, PeerID: p.host.ID(), - Timestamp: time.Now(), + Timestamp: now, Payload: map[string]any{ "PeerID": msg.ReceivedFrom.String(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -438,6 +453,8 @@ func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pub } func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + switch p.cfg.ForkVersion { case DenebForkVersion: var blob ethtypes.BlobSidecar @@ -451,7 +468,6 @@ func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) err slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) proposerIndex := blob.GetSignedBlockHeader().GetHeader().GetProposerIndex() - now := time.Now() evt := &host.TraceEvent{ Type: "HANDLE_MESSAGE", PeerID: p.host.ID(), From c71f0152e0937722908989397e2ad5ccd4e2838a Mon Sep 17 00:00:00 2001 From: Sam Calder-Mason Date: Mon, 3 Jun 2024 11:07:45 +1000 Subject: [PATCH 14/14] Update host/kinesis.go Co-authored-by: Mikel Cortes <45786396+cortze@users.noreply.github.com> --- host/kinesis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/host/kinesis.go b/host/kinesis.go index 2f0fa5e..dad62f6 100644 --- a/host/kinesis.go +++ b/host/kinesis.go @@ -54,7 +54,7 @@ func (k *KinesisDataStream) Stop(ctx context.Context) error { if err := k.producer.WaitIdle(timeoutCtx); err != nil { slog.Info("Error waiting for producer to become idle", tele.LogAttrError(err)) } - + timeoutCncl() // stop the producer k.cancelFn()