Skip to content

Commit

Permalink
Remove non-traced events
Browse files Browse the repository at this point in the history
  • Loading branch information
samcm committed Apr 17, 2024
1 parent 05cf749 commit 5544910
Show file tree
Hide file tree
Showing 17 changed files with 1,627 additions and 5,111 deletions.
311 changes: 0 additions & 311 deletions pkg/proto/libp2p/trace.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package libp2p

import (
"encoding/hex"
"fmt"
"time"

Expand All @@ -21,29 +20,6 @@ func EventTypeFromHermesEventType(e host.EventType) EventType {
return EventType(0) // Return an Unknown EventType if not found
}

// Helper function to convert a Hermes TraceEvent to a libp2p PublishMessage
func TraceEventToPublishMessage(event *host.TraceEvent) (*PublishMessage, error) {
payload, ok := event.Payload.(map[string]any)
if !ok {
return nil, fmt.Errorf("invalid payload type for PublishMessage")
}

messageID, ok := payload["MsgID"].(string)
if !ok {
return nil, fmt.Errorf("msgID is required for PublishMessage")
}

topic, ok := payload["Topic"].(string)
if !ok {
return nil, fmt.Errorf("topic is required for PublishMessage")
}

return &PublishMessage{
MessageId: wrapperspb.String(messageID),
Topic: wrapperspb.String(topic),
}, nil
}

// Helper function to convert a Hermes TraceEvent to a libp2p AddPeer
func TraceEventToAddPeer(event *host.TraceEvent) (*AddPeer, error) {
payload, ok := event.Payload.(map[string]any)
Expand Down Expand Up @@ -118,227 +94,6 @@ func TraceEventToLeave(event *host.TraceEvent) (*Leave, error) {
}, nil
}

// Helper function to convert a Hermes TraceEvent to a libp2p Graft
func TraceEventToGraft(event *host.TraceEvent) (*Graft, error) {
payload, ok := event.Payload.(map[string]any)
if !ok {
return nil, fmt.Errorf("invalid payload type for Graft")
}

peerID, ok := payload["PeerID"].(peer.ID)
if !ok {
return nil, fmt.Errorf("peerID is required for Graft")
}

topic, ok := payload["Topic"].(string)
if !ok {
return nil, fmt.Errorf("topic is required for Graft")
}

return &Graft{
PeerId: wrapperspb.String(peerID.String()),
Topic: wrapperspb.String(topic),
}, nil
}

// Helper function to convert a Hermes TraceEvent to a libp2p Prune
func TraceEventToPrune(event *host.TraceEvent) (*Prune, error) {
payload, ok := event.Payload.(map[string]any)
if !ok {
return nil, fmt.Errorf("invalid payload type for Prune")
}

peerID, ok := payload["PeerID"].(peer.ID)
if !ok {
return nil, fmt.Errorf("peerID is required for Prune")
}

topic, ok := payload["Topic"].(string)
if !ok {
return nil, fmt.Errorf("topic is required for Prune")
}

return &Prune{
PeerId: wrapperspb.String(peerID.String()),
Topic: wrapperspb.String(topic),
}, nil
}

// Helper function to convert a Hermes TraceEvent to a libp2p DeliverMessage
func TraceEventToDeliverMessage(event *host.TraceEvent) (*DeliverMessage, error) {
payload, ok := event.Payload.(map[string]any)
if !ok {
return nil, fmt.Errorf("invalid payload type for DeliverMessage")
}

messageID, ok := payload["MsgID"].(string)
if !ok {
return nil, fmt.Errorf("msgID is required for DeliverMessage")
}

topic, ok := payload["Topic"].(string)
if !ok {
return nil, fmt.Errorf("topic is required for DeliverMessage")
}

receivedFrom, ok := payload["PeerID"].(peer.ID)
if !ok {
return nil, fmt.Errorf("peerID is required for DeliverMessage")
}

return &DeliverMessage{
MessageId: wrapperspb.String(messageID),
Topic: wrapperspb.String(topic),
PeerId: wrapperspb.String(receivedFrom.String()),
}, nil
}

// Helper function to convert a Hermes TraceEvent to a libp2p RejectMessage
func TraceEventToRejectMessage(event *host.TraceEvent) (*RejectMessage, error) {
payload, ok := event.Payload.(map[string]any)
if !ok {
return nil, fmt.Errorf("invalid payload type for RejectMessage")
}

messageID, ok := payload["MsgID"].(string)
if !ok {
return nil, fmt.Errorf("msgID is required for RejectMessage")
}

receivedFrom, ok := payload["PeerID"].(peer.ID)
if !ok {
return nil, fmt.Errorf("peerID is required for RejectMessage")
}

reason, ok := payload["Reason"].(string)
if !ok {
return nil, fmt.Errorf("reason is required for RejectMessage")
}

topic, ok := payload["Topic"].(string)
if !ok {
return nil, fmt.Errorf("topic is required for RejectMessage")
}

msgSize, ok := payload["MsgSize"].(int)
if !ok {
return nil, fmt.Errorf("msgSize is required for RejectMessage")
}

seqNo, ok := payload["SeqNo"].([]byte)
if !ok {
return nil, fmt.Errorf("seqNo is required for RejectMessage")
}

return &RejectMessage{
MessageId: wrapperspb.String(messageID),
PeerId: wrapperspb.String(receivedFrom.String()),
Reason: wrapperspb.String(reason),
Topic: wrapperspb.String(topic),
MessageSize: wrapperspb.UInt32(uint32(msgSize)),
SeqNo: wrapperspb.String(hex.EncodeToString(seqNo)),
}, nil
}

// Helper function to convert a Hermes TraceEvent to a libp2p DuplicateMessage
func TraceEventToDuplicateMessage(event *host.TraceEvent) (*DuplicateMessage, error) {
payload, ok := event.Payload.(map[string]any)
if !ok {
return nil, fmt.Errorf("invalid payload type for DuplicateMessage")
}

messageID, ok := payload["MsgID"].(string)
if !ok {
return nil, fmt.Errorf("msgID is required for DuplicateMessage")
}

receivedFrom, ok := payload["PeerID"].(peer.ID)
if !ok {
return nil, fmt.Errorf("peerID is required for DuplicateMessage")
}

topic, ok := payload["Topic"].(string)
if !ok {
return nil, fmt.Errorf("topic is required for DuplicateMessage")
}

local, ok := payload["Local"].(bool)
if !ok {
return nil, fmt.Errorf("local flag is required for DuplicateMessage")
}

msgSize, ok := payload["MsgSize"].(int)
if !ok {
return nil, fmt.Errorf("msgSize is required for DuplicateMessage")
}

seqNo, ok := payload["SeqNo"].([]byte)
if !ok {
return nil, fmt.Errorf("seqNo is required for DuplicateMessage")
}

return &DuplicateMessage{
MessageId: wrapperspb.String(messageID),
PeerId: wrapperspb.String(receivedFrom.String()),
Topic: wrapperspb.String(topic),
MessageSize: wrapperspb.UInt32(uint32(msgSize)),
SeqNo: wrapperspb.String(hex.EncodeToString(seqNo)),
Local: wrapperspb.Bool(local),
}, nil
}

// Helper function to convert a Hermes TraceEvent to a libp2p ThrottlePeer
func TraceEventToThrottlePeer(event *host.TraceEvent) (*ThrottlePeer, error) {
payload, ok := event.Payload.(map[string]any)
if !ok {
return nil, fmt.Errorf("invalid payload type for ThrottlePeer")
}

peerID, ok := payload["PeerID"].(peer.ID)
if !ok {
return nil, fmt.Errorf("peerID is required for ThrottlePeer")
}

return &ThrottlePeer{
PeerId: wrapperspb.String(peerID.String()),
}, nil
}

// Helper function to convert a Hermes TraceEvent to a libp2p UndeliverableMessage
func TraceEventToUndeliverableMessage(event *host.TraceEvent) (*UndeliverableMessage, error) {
payload, ok := event.Payload.(map[string]any)
if !ok {
return nil, fmt.Errorf("invalid payload type for UndeliverableMessage")
}

messageID, ok := payload["MsgID"].(string)
if !ok {
return nil, fmt.Errorf("msgID is required for UndeliverableMessage")
}

receivedFrom, ok := payload["PeerID"].(peer.ID)
if !ok {
return nil, fmt.Errorf("PeerID is required for UndeliverableMessage")
}

topic, ok := payload["Topic"].(string)
if !ok {
return nil, fmt.Errorf("topic is required for UndeliverableMessage")
}

local, ok := payload["Local"].(bool)
if !ok {
return nil, fmt.Errorf("local flag is required for UndeliverableMessage")
}

return &UndeliverableMessage{
MessageId: wrapperspb.String(messageID),
PeerId: wrapperspb.String(receivedFrom.String()),
Topic: wrapperspb.String(topic),
Local: wrapperspb.Bool(local),
}, nil
}

// Helper function to convert a Hermes TraceEvent to a libp2p RecvRPC
func TraceEventToRecvRPC(event *host.TraceEvent) (*RecvRPC, error) {
payload, ok := event.Payload.(*host.RpcMeta)
Expand Down Expand Up @@ -379,26 +134,6 @@ func TraceEventToSendRPC(event *host.TraceEvent) (*SendRPC, error) {
return r, nil
}

// Helper function to convert a Hermes TraceEvent to a libp2p RecvRPC
func TraceEventToDropRPC(event *host.TraceEvent) (*DropRPC, error) {
payload, ok := event.Payload.(*host.RpcMeta)
if !ok {
return nil, fmt.Errorf("invalid payload type for rpc")
}

r := &DropRPC{
SendTo: wrapperspb.String(payload.PeerID.String()),
Meta: &RPCMeta{
PeerId: wrapperspb.String(payload.PeerID.String()),
Messages: convertRPCMessages(payload.Messages),
Subscriptions: convertRPCSubscriptions(payload.Subscriptions),
Control: convertRPCControl(payload.Control),
},
}

return r, nil
}

func convertRPCMessages(messages []host.RpcMetaMsg) []*MessageMeta {
ourMessages := make([]*MessageMeta, len(messages))

Expand Down Expand Up @@ -552,49 +287,3 @@ func TraceEventToDisconnected(event *host.TraceEvent) (*Disconnected, error) {
Transient: wrapperspb.Bool(payload.Transient),
}, nil
}

func TraceEventToValidateMessage(event *host.TraceEvent) (*ValidateMessage, error) {
payload, ok := event.Payload.(map[string]any)
if !ok {
return nil, fmt.Errorf("invalid payload type for ValidateMessage")
}

messageID, ok := payload["MsgID"].(string)
if !ok {
return nil, fmt.Errorf("messageID is required for ValidateMessage")
}

receivedFrom, ok := payload["PeerID"].(peer.ID)
if !ok {
return nil, fmt.Errorf("PeerID is required for ValidateMessage")
}

topic, ok := payload["Topic"].(string)
if !ok {
return nil, fmt.Errorf("topic is required for ValidateMessage")
}

local, ok := payload["Local"].(bool)
if !ok {
return nil, fmt.Errorf("local flag is required for ValidateMessage")
}

msgSize, ok := payload["MsgSize"].(int)
if !ok {
return nil, fmt.Errorf("msgSize is required for ValidateMessage")
}

seqNo, ok := payload["SeqNo"].([]byte)
if !ok {
return nil, fmt.Errorf("seqNo is required for ValidateMessage")
}

return &ValidateMessage{
MessageId: wrapperspb.String(messageID),
PeerId: wrapperspb.String(receivedFrom.String()),
Topic: wrapperspb.String(topic),
Local: wrapperspb.Bool(local),
MessageSize: wrapperspb.UInt32(uint32(msgSize)),
SeqNo: wrapperspb.String(hex.EncodeToString(seqNo)),
}, nil
}
Loading

0 comments on commit 5544910

Please sign in to comment.