diff --git a/README.md b/README.md index b00a573..9997a90 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![GoDoc](https://pkg.go.dev/badge/github.com/probe-lab/hermes)](https://pkg.go.dev/github.com/probe-lab/hermes) Hermes is a GossipSub listener and tracer. It subscribes to all relevant pubsub topics -and traces all protocol interactions. As of `2024-03-27`, Hermes supports the Ethereum +and traces all protocol interactions. As of `2024-05-21`, Hermes supports the Ethereum network. ## Table of Contents @@ -21,6 +21,7 @@ network. - [Telemetry](#telemetry) - [Metrics](#metrics) - [Tracing](#tracing) + - [Differences with other tools](#differences-with-other-tools) - [Maintainers](#maintainers) - [Contributing](#contributing) - [License](#license) @@ -199,6 +200,20 @@ You can find the UI at [`http://localhost:16686`](http://localhost:16686). Port Run Hermes with the `--tracing` flag. To change the address of the trace collector, you can also specify `--tracing.addr` and `--tracing.port`. +## Differences with other tools +Hermes jumps to the web3/blockchain/libp2p ecosystem despite a large variety of tools around it, such as the many existing network crawlers or light clients for most mature networks. Although at first sight it might look like a competitor to those, there was still a large incentive to develop it. Here, we describe the gap that Hermes fills as well as the use-cases that it is suitable for. +Hermes was designed to behave as a light node in each supported network, where, in addition to being an honest participant in the network and supporting all the protocols and RPC endpoints, it also allows streaming of custom internal events (mostly libp2p-related). + +Hermes avoids being based on a custom fork of existing full/light clients, which would come with non-negligible maintenance baggage and would complicate having control of events. + +Currently available similar tools: + +### Armiarma Crawler from MigaLabs vs Hermes from ProbeLab +Although both Hermes and Armiarma seem to be focusing on the same goals at first sight, they have significant differences in their use cases and their targets in data collection and metrics. + +[Armiarma](https://github.com/migalabs/armiarma) is a network crawler that relies on running discv5 peer discovery service and a libp2p host 24/7 to establish connections. However, significant modifications have been made to connect to as many peers as possible (custom peering module). This way, it tries to identify as many peers as possible in the network periodically. Thus, its focus is mainly on opening and identifying as many peers as possible, rather than maintaining stable connections to other peers in the network. + +On the other hand, although Hermes also relies on a continuously running discv5 and libp2p host, it uses the libp2p connection manager for a different purpose to Armiarma (which is to connect to as many peers as possible). In the case of Hermes, the connection manager is used to decide who it connects to (i.e., simulating the behaviour of a standard node). Furthermore, it backs up some of the RPC, which requires keeping the chain db calls on a trusted node. This way, it behaves like a light node to the network, which is honest and beneficial for the rest of the network, allowing us to track all desired networking events from stable connections, while at the same time having a highly customizable tracing system. ## Maintainers diff --git a/cmd/hermes/cmd_eth.go b/cmd/hermes/cmd_eth.go index 6609144..e3b11cf 100644 --- a/cmd/hermes/cmd_eth.go +++ b/cmd/hermes/cmd_eth.go @@ -33,6 +33,10 @@ var ethConfig = &struct { DialConcurrency int DialTimeout time.Duration MaxPeers int + GenesisSSZURL string + ConfigURL string + BootnodesURL string + DepositContractBlockURL string }{ PrivateKeyStr: "", // unset means it'll be generated Chain: params.MainnetName, @@ -48,6 +52,10 @@ var ethConfig = &struct { DialConcurrency: 16, DialTimeout: 5 * time.Second, MaxPeers: 30, // arbitrary + GenesisSSZURL: "", + ConfigURL: "", + BootnodesURL: "", + DepositContractBlockURL: "", } var cmdEth = &cli.Command{ @@ -167,6 +175,34 @@ var cmdEthFlags = []cli.Flag{ Value: ethConfig.MaxPeers, Destination: ðConfig.MaxPeers, }, + &cli.StringFlag{ + Name: "genesis.ssz.url", + EnvVars: []string{"HERMES_ETH_GENESIS_SSZ_URL"}, + Usage: "The .ssz URL from which to fetch the genesis data, requires 'chain=devnet'", + Value: ethConfig.GenesisSSZURL, + Destination: ðConfig.GenesisSSZURL, + }, + &cli.StringFlag{ + Name: "config.yaml.url", + EnvVars: []string{"HERMES_ETH_CONFIG_URL"}, + Usage: "The .yaml URL from which to fetch the beacon chain config, requires 'chain=devnet'", + Value: ethConfig.ConfigURL, + Destination: ðConfig.ConfigURL, + }, + &cli.StringFlag{ + Name: "bootnodes.yaml.url", + EnvVars: []string{"HERMES_ETH_BOOTNODES_URL"}, + Usage: "The .yaml URL from which to fetch the bootnode ENRs, requires 'chain=devnet'", + Value: ethConfig.BootnodesURL, + Destination: ðConfig.BootnodesURL, + }, + &cli.StringFlag{ + Name: "deposit-contract-block.txt.url", + EnvVars: []string{"HERMES_ETH_DEPOSIT_CONTRACT_BLOCK_URL"}, + Usage: "The .txt URL from which to fetch the deposit contract block. Requires 'chain=devnet'", + Value: ethConfig.DepositContractBlockURL, + Destination: ðConfig.DepositContractBlockURL, + }, } func cmdEthAction(c *cli.Context) error { @@ -176,20 +212,45 @@ func cmdEthAction(c *cli.Context) error { // Print hermes configuration for debugging purposes printEthConfig() - // Extract chain configuration parameters based on the given chain name - genConfig, netConfig, beaConfig, err := eth.GetConfigsByNetworkName(ethConfig.Chain) - if err != nil { - return fmt.Errorf("get config for %s: %w", ethConfig.Chain, err) + var config *eth.NetworkConfig + // Derive network configuration + if ethConfig.Chain != params.DevnetName { + slog.Info("Deriving known network config:", "chain", ethConfig.Chain) + + c, err := eth.DeriveKnownNetworkConfig(c.Context, ethConfig.Chain) + if err != nil { + return fmt.Errorf("derive network config: %w", err) + } + + config = c + } else { + slog.Info("Deriving devnet network config") + + c, err := eth.DeriveDevnetConfig(c.Context, eth.DevnetOptions{ + ConfigURL: ethConfig.ConfigURL, + BootnodesURL: ethConfig.BootnodesURL, + DepositContractBlockURL: ethConfig.DepositContractBlockURL, + GenesisSSZURL: ethConfig.GenesisSSZURL, + }) + if err != nil { + return fmt.Errorf("failed to derive devnet network config: %w", err) + } + config = c } - genesisRoot := genConfig.GenesisValidatorRoot - genesisTime := genConfig.GenesisTime + // Overriding configuration so that functions like ComputForkDigest take the + // correct input data from the global configuration. + params.OverrideBeaconConfig(config.Beacon) + params.OverrideBeaconNetworkConfig(config.Network) + + genesisRoot := config.Genesis.GenesisValidatorRoot + genesisTime := config.Genesis.GenesisTime // compute fork version and fork digest currentSlot := slots.Since(genesisTime) currentEpoch := slots.ToEpoch(currentSlot) - currentForkVersion, err := eth.GetCurrentForkVersion(currentEpoch, beaConfig) + currentForkVersion, err := eth.GetCurrentForkVersion(currentEpoch, config.Beacon) if err != nil { return fmt.Errorf("compute fork version for epoch %d: %w", currentEpoch, err) } @@ -201,13 +262,13 @@ func cmdEthAction(c *cli.Context) error { // Overriding configuration so that functions like ComputForkDigest take the // correct input data from the global configuration. - params.OverrideBeaconConfig(beaConfig) - params.OverrideBeaconNetworkConfig(netConfig) + params.OverrideBeaconConfig(config.Beacon) + params.OverrideBeaconNetworkConfig(config.Network) cfg := ð.NodeConfig{ - GenesisConfig: genConfig, - NetworkConfig: netConfig, - BeaconConfig: beaConfig, + GenesisConfig: config.Genesis, + NetworkConfig: config.Network, + BeaconConfig: config.Beacon, ForkDigest: forkDigest, ForkVersion: currentForkVersion, PrivateKeyStr: ethConfig.PrivateKeyStr, diff --git a/cmd/hermes/cmd_eth_chains.go b/cmd/hermes/cmd_eth_chains.go index 35a6c31..dda4b56 100644 --- a/cmd/hermes/cmd_eth_chains.go +++ b/cmd/hermes/cmd_eth_chains.go @@ -27,28 +27,27 @@ func cmdEthChainsAction(c *cli.Context) error { slog.Info("Supported chains:") for _, chain := range chains { - - genConfig, _, beaConfig, err := eth.GetConfigsByNetworkName(chain) + config, err := eth.DeriveKnownNetworkConfig(c.Context, chain) if err != nil { return fmt.Errorf("get config for %s: %w", chain, err) } slog.Info(chain) forkVersions := [][]byte{ - beaConfig.GenesisForkVersion, - beaConfig.AltairForkVersion, - beaConfig.BellatrixForkVersion, - beaConfig.CapellaForkVersion, - beaConfig.DenebForkVersion, + config.Beacon.GenesisForkVersion, + config.Beacon.AltairForkVersion, + config.Beacon.BellatrixForkVersion, + config.Beacon.CapellaForkVersion, + config.Beacon.DenebForkVersion, } for _, forkVersion := range forkVersions { - epoch, found := beaConfig.ForkVersionSchedule[[4]byte(forkVersion)] + epoch, found := config.Beacon.ForkVersionSchedule[[4]byte(forkVersion)] if !found { return fmt.Errorf("fork version schedule not found for %x", forkVersion) } - forkName, found := beaConfig.ForkVersionNames[[4]byte(forkVersion)] + forkName, found := config.Beacon.ForkVersionNames[[4]byte(forkVersion)] if !found { return fmt.Errorf("fork version name not found for %x", forkVersion) } @@ -57,7 +56,7 @@ func cmdEthChainsAction(c *cli.Context) error { continue } - digest, err := signing.ComputeForkDigest(forkVersion, genConfig.GenesisValidatorRoot) + digest, err := signing.ComputeForkDigest(forkVersion, config.Genesis.GenesisValidatorRoot) if err != nil { return err } diff --git a/eth/fetch.go b/eth/fetch.go new file mode 100644 index 0000000..70971bc --- /dev/null +++ b/eth/fetch.go @@ -0,0 +1,121 @@ +package eth + +import ( + "context" + "encoding/binary" + "io" + "net/http" + + "github.com/prysmaticlabs/prysm/v5/config/params" + "gopkg.in/yaml.v3" +) + +// FetchConfigFromURL fetches the beacon chain config from a given URL. +func FetchConfigFromURL(ctx context.Context, url string) (*params.BeaconChainConfig, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, err + } + + response, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer response.Body.Close() + + data, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + config := params.MainnetConfig().Copy() + + out, err := params.UnmarshalConfig(data, config) + if err != nil { + return nil, err + } + + return out, nil +} + +// FetchBootnodeENRsFromURL fetches the bootnode ENRs from a given URL. +func FetchBootnodeENRsFromURL(ctx context.Context, url string) ([]string, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return nil, err + } + + response, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer response.Body.Close() + + data, err := io.ReadAll(response.Body) + if err != nil { + return nil, err + } + + var enrs []string + err = yaml.Unmarshal(data, &enrs) + if err != nil { + return nil, err + } + + return enrs, nil +} + +// FetchDepositContractBlockFromURL fetches the deposit contract block from a given URL. +func FetchDepositContractBlockFromURL(ctx context.Context, url string) (uint64, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return 0, err + } + + response, err := http.DefaultClient.Do(req) + if err != nil { + return 0, err + } + defer response.Body.Close() + + data, err := io.ReadAll(response.Body) + if err != nil { + return 0, err + } + + var block uint64 + + err = yaml.Unmarshal(data, &block) + if err != nil { + return 0, err + } + + return block, nil +} + +// FetchGenesisDetailsFromURL fetches the genesis time and validators root from a given URL. +func FetchGenesisDetailsFromURL(ctx context.Context, url string) (uint64, [32]byte, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return 0, [32]byte{}, err + } + + response, err := http.DefaultClient.Do(req) + if err != nil { + return 0, [32]byte{}, err + } + defer response.Body.Close() + + // Read only the first 40 bytes (8 bytes for GenesisTime + 32 bytes for GenesisValidatorsRoot) + data := make([]byte, 40) + _, err = io.ReadFull(response.Body, data) + if err != nil { + return 0, [32]byte{}, err + } + + genesisTime := binary.LittleEndian.Uint64(data[:8]) + var genesisValidatorsRoot [32]byte + copy(genesisValidatorsRoot[:], data[8:]) + + return genesisTime, genesisValidatorsRoot, nil +} diff --git a/eth/genesis.go b/eth/genesis.go index 8df71c8..86220c3 100644 --- a/eth/genesis.go +++ b/eth/genesis.go @@ -44,21 +44,6 @@ type GenesisConfig struct { GenesisTime time.Time // Time at Genesis } -// GetConfigsByNetworkName returns the GenesisConfig, NetworkConfig, -// BeaconChainConfig and any error based on the input network name -func GetConfigsByNetworkName(net string) (*GenesisConfig, *params.NetworkConfig, *params.BeaconChainConfig, error) { - switch net { - case params.MainnetName: - return GenesisConfigs[net], params.BeaconNetworkConfig(), params.MainnetConfig(), nil - case params.SepoliaName: - return GenesisConfigs[net], params.BeaconNetworkConfig(), params.SepoliaConfig(), nil - case params.HoleskyName: - return GenesisConfigs[net], params.BeaconNetworkConfig(), params.HoleskyConfig(), nil - default: - return nil, nil, nil, fmt.Errorf("network %s not found", net) - } -} - var GenesisConfigs = map[string]*GenesisConfig{ params.MainnetName: { GenesisValidatorRoot: hexToBytes("4b363db94e286120d76eb905340fdd4e54bfe9f06bf33ff6cf5ad27f511bfe95"), diff --git a/eth/network_config.go b/eth/network_config.go new file mode 100644 index 0000000..154289a --- /dev/null +++ b/eth/network_config.go @@ -0,0 +1,120 @@ +package eth + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/prysmaticlabs/prysm/v5/config/params" +) + +type NetworkConfig struct { + Genesis *GenesisConfig + Network *params.NetworkConfig + Beacon *params.BeaconChainConfig +} + +func DeriveKnownNetworkConfig(ctx context.Context, network string) (*NetworkConfig, error) { + if network == params.DevnetName { + return nil, errors.New("network devnet not supported - use DeriveDevnetConfig instead") + } + + defaultBeaconNetworkConfig := params.BeaconNetworkConfig() + + switch network { + case params.MainnetName: + return &NetworkConfig{ + Genesis: GenesisConfigs[network], + Beacon: params.MainnetConfig(), + Network: defaultBeaconNetworkConfig, + }, nil + case params.SepoliaName: + return &NetworkConfig{ + Genesis: GenesisConfigs[network], + Beacon: params.SepoliaConfig(), + Network: defaultBeaconNetworkConfig, + }, nil + case params.HoleskyName: + return &NetworkConfig{ + Genesis: GenesisConfigs[network], + Beacon: params.HoleskyConfig(), + Network: defaultBeaconNetworkConfig, + }, nil + case params.DevnetName: + return nil, errors.New("network devnet not supported") + default: + return nil, fmt.Errorf("network %s not found", network) + } +} + +type DevnetOptions struct { + ConfigURL string + BootnodesURL string + DepositContractBlockURL string + GenesisSSZURL string +} + +func (o *DevnetOptions) Validate() error { + if o.ConfigURL == "" { + return errors.New("config URL is required") + } + + if o.BootnodesURL == "" { + return errors.New("bootnodes URL is required") + } + + if o.DepositContractBlockURL == "" { + return errors.New("deposit contract block URL is required") + } + + if o.GenesisSSZURL == "" { + return errors.New("genesis SSZ URL is required") + } + + return nil +} + +func DeriveDevnetConfig(ctx context.Context, options DevnetOptions) (*NetworkConfig, error) { + if err := options.Validate(); err != nil { + return nil, fmt.Errorf("invalid options: %w", err) + } + + // Fetch the beacon chain config from the provided URL + beaconConfig, err := FetchConfigFromURL(ctx, options.ConfigURL) + if err != nil { + return nil, fmt.Errorf("fetch beacon config: %w", err) + } + + // Fetch bootnode ENRs from the provided URL + bootnodeENRs, err := FetchBootnodeENRsFromURL(ctx, options.BootnodesURL) + if err != nil { + return nil, fmt.Errorf("fetch bootnode ENRs: %w", err) + } + + // Fetch deposit contract block from the provided URL + depositContractBlock, err := FetchDepositContractBlockFromURL(ctx, options.DepositContractBlockURL) + if err != nil { + return nil, fmt.Errorf("fetch deposit contract block: %w", err) + } + + // Fetch genesis details from the provided URL + genesisTime, genesisValidatorsRoot, err := FetchGenesisDetailsFromURL(ctx, options.GenesisSSZURL) + if err != nil { + return nil, fmt.Errorf("fetch genesis details: %w", err) + } + + network := params.BeaconNetworkConfig() + + network.BootstrapNodes = bootnodeENRs + network.ContractDeploymentBlock = depositContractBlock + + return &NetworkConfig{ + Genesis: &GenesisConfig{ + GenesisTime: time.Unix(int64(genesisTime), 0), + GenesisValidatorRoot: genesisValidatorsRoot[:], + }, + Network: network, + Beacon: beaconConfig, + }, nil +} diff --git a/eth/node.go b/eth/node.go index dfa45f0..12ebf2c 100644 --- a/eth/node.go +++ b/eth/node.go @@ -185,7 +185,7 @@ func NewNode(cfg *NodeConfig) (*Node, error) { } // initialize the custom Prysm client to communicate with its API - pryClient, err := NewPrysmClient(cfg.PrysmHost, cfg.PrysmPortHTTP, cfg.PrysmPortGRPC, cfg.DialTimeout) + pryClient, err := NewPrysmClient(cfg.PrysmHost, cfg.PrysmPortHTTP, cfg.PrysmPortGRPC, cfg.DialTimeout, cfg.GenesisConfig) if err != nil { return nil, fmt.Errorf("new prysm client") } diff --git a/eth/node_config.go b/eth/node_config.go index 185211f..f582de5 100644 --- a/eth/node_config.go +++ b/eth/node_config.go @@ -381,10 +381,10 @@ func desiredPubSubBaseTopics() []string { p2p.GossipAttesterSlashingMessage, p2p.GossipProposerSlashingMessage, p2p.GossipContributionAndProofMessage, - // p2p.GossipSyncCommitteeMessage, + p2p.GossipSyncCommitteeMessage, p2p.GossipBlsToExecutionChangeMessage, + p2p.GossipBlobSidecarMessage, p2p.GossipDataColumnSidecarMessage, - // p2p.GossipBlobSidecarMessage, } } @@ -451,7 +451,7 @@ func (n *NodeConfig) composeEthTopic(base string, encoder encoder.NetworkEncodin return fmt.Sprintf(base, n.ForkDigest) + encoder.ProtocolSuffix() } -func (n *NodeConfig) composeSubnettedEthTopic(base string, encoder encoder.NetworkEncoding, subnet uint64) string { +func (n *NodeConfig) composeEthTopicWithSubnet(base string, encoder encoder.NetworkEncoding, subnet uint64) string { return fmt.Sprintf(base, n.ForkDigest, subnet) + encoder.ProtocolSuffix() } @@ -467,8 +467,8 @@ func (n *NodeConfig) getDesiredFullTopics(encoder encoder.NetworkEncoding) []str } subnets, withSubnets := hasSubnets(topicBase) if withSubnets { - for subnet := uint64(1); subnet <= subnets; subnet++ { - fullTopics = append(fullTopics, n.composeSubnettedEthTopic(topicFormat, encoder, subnet)) + for subnet := uint64(0); subnet < subnets; subnet++ { + fullTopics = append(fullTopics, n.composeEthTopicWithSubnet(topicFormat, encoder, subnet)) } } else { fullTopics = append(fullTopics, n.composeEthTopic(topicFormat, encoder)) @@ -482,8 +482,9 @@ func (n *NodeConfig) getDefaultTopicScoreParams(encoder encoder.NetworkEncoding, desiredTopics := n.getDesiredFullTopics(encoder) topicScores := make(map[string]*pubsub.TopicScoreParams, len(desiredTopics)) for _, topic := range desiredTopics { - params := topicToScoreParamsMapper(topic, activeValidators) - topicScores[topic] = params + if params := topicToScoreParamsMapper(topic, activeValidators); params != nil { + topicScores[topic] = params + } } return topicScores } diff --git a/eth/prysm.go b/eth/prysm.go index 7d0ce96..6920494 100644 --- a/eth/prysm.go +++ b/eth/prysm.go @@ -38,9 +38,10 @@ type PrysmClient struct { tracer trace.Tracer beaconClient eth.BeaconChainClient beaconApiClient *apiCli.Client + genesis *GenesisConfig } -func NewPrysmClient(host string, portHTTP int, portGRPC int, timeout time.Duration) (*PrysmClient, error) { +func NewPrysmClient(host string, portHTTP int, portGRPC int, timeout time.Duration, genesis *GenesisConfig) (*PrysmClient, error) { tracer := otel.GetTracerProvider().Tracer("prysm_client") conn, err := grpc.Dial(fmt.Sprintf("%s:%d", host, portGRPC), @@ -63,6 +64,7 @@ func NewPrysmClient(host string, portHTTP int, portGRPC int, timeout time.Durati beaconApiClient: apiCli, timeout: timeout, tracer: tracer, + genesis: genesis, }, nil } @@ -320,26 +322,17 @@ func (p *PrysmClient) isOnNetwork(ctx context.Context, hermesForkDigest [4]byte) } span.End() }() + // this checks whether the local fork_digest at hermes matches the one that the remote node keeps // request the genesis - nodeCnf, err := p.beaconApiClient.GetConfigSpec(ctx) - if err != nil { - return false, fmt.Errorf("request prysm node config to compose forkdigest: %w", err) - } - cnf := nodeCnf.Data.(map[string]interface{}) - genesisConf, _, _, err := GetConfigsByNetworkName(cnf["CONFIG_NAME"].(string)) - if err != nil { - return false, fmt.Errorf("not identified network from trusted node (%s): %w", cnf["CONFIG_NAME"].(string), err) - } - nodeFork, err := p.beaconApiClient.GetFork(ctx, apiCli.StateOrBlockId("head")) if err != nil { return false, fmt.Errorf("request beacon fork to compose forkdigest: %w", err) } - forkDigest, err := signing.ComputeForkDigest(nodeFork.CurrentVersion, genesisConf.GenesisValidatorRoot) + forkDigest, err := signing.ComputeForkDigest(nodeFork.CurrentVersion, p.genesis.GenesisValidatorRoot) if err != nil { - return false, fmt.Errorf("create fork digest (%s, %x): %w", hex.EncodeToString(nodeFork.CurrentVersion), genesisConf.GenesisValidatorRoot, err) + return false, fmt.Errorf("create fork digest (%s, %x): %w", hex.EncodeToString(nodeFork.CurrentVersion), p.genesis.GenesisValidatorRoot, err) } // check if our version is within the versions of the node if forkDigest == hermesForkDigest { diff --git a/eth/prysm_test.go b/eth/prysm_test.go index 6f14862..7d0753c 100644 --- a/eth/prysm_test.go +++ b/eth/prysm_test.go @@ -86,7 +86,7 @@ func TestPrysmClient_AddTrustedPeer(t *testing.T) { port, err := strconv.Atoi(serverURL.Port()) require.NoError(t, err) - p, err := NewPrysmClient(serverURL.Hostname(), port, 0, time.Second) + p, err := NewPrysmClient(serverURL.Hostname(), port, 0, time.Second, nil) require.NoError(t, err) err = p.AddTrustedPeer(context.Background(), pid, maddr) @@ -150,7 +150,7 @@ func TestPrysmClient_RemoveTrustedPeer(t *testing.T) { port, err := strconv.Atoi(serverURL.Port()) require.NoError(t, err) - p, err := NewPrysmClient(serverURL.Hostname(), port, 0, time.Second) + p, err := NewPrysmClient(serverURL.Hostname(), port, 0, time.Second, nil) require.NoError(t, err) err = p.RemoveTrustedPeer(context.Background(), pid) diff --git a/eth/pubsub.go b/eth/pubsub.go index d65c1a7..265c963 100644 --- a/eth/pubsub.go +++ b/eth/pubsub.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "github.com/ethereum/go-ethereum/common/hexutil" pubsub "github.com/libp2p/go-libp2p-pubsub" pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb" "github.com/libp2p/go-libp2p/core/peer" @@ -21,6 +22,8 @@ import ( "github.com/probe-lab/hermes/tele" ) +const eventTypeHandleMessage = "HANDLE_MESSAGE" + type PubSubConfig struct { Topics []string ForkVersion ForkVersion @@ -113,6 +116,24 @@ func (p *PubSub) mapPubSubTopicWithHandlers(topic string) host.TopicHandler { return p.handleBeaconDataColumnSidecar case strings.Contains(topic, p2p.GossipBlockMessage): return p.handleBeaconBlock + case strings.Contains(topic, p2p.GossipAggregateAndProofMessage): + return p.handleAggregateAndProof + case strings.Contains(topic, p2p.GossipAttestationMessage): + return p.handleAttestation + case strings.Contains(topic, p2p.GossipExitMessage): + return p.handleExitMessage + case strings.Contains(topic, p2p.GossipAttesterSlashingMessage): + return p.handleAttesterSlashingMessage + case strings.Contains(topic, p2p.GossipProposerSlashingMessage): + return p.handleProposerSlashingMessage + case strings.Contains(topic, p2p.GossipContributionAndProofMessage): + return p.handleContributtionAndProofMessage + case strings.Contains(topic, p2p.GossipSyncCommitteeMessage): + return p.handleSyncCommitteeMessage + case strings.Contains(topic, p2p.GossipBlsToExecutionChangeMessage): + return p.handleBlsToExecutionChangeMessage + case strings.Contains(topic, p2p.GossipBlobSidecarMessage): + return p.handleBlobSidecar default: return p.host.TracedTopicHandler(host.NoopHandler) } @@ -159,7 +180,7 @@ func (p *PubSub) handleBeaconAttestation(ctx context.Context, msg *pubsub.Messag }, } - if err := p.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } @@ -190,7 +211,7 @@ func (p *PubSub) handleBeaconDataColumnSidecar(ctx context.Context, msg *pubsub. }, } - if err := p.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } @@ -212,7 +233,7 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) evt := &host.TraceEvent{ - Type: "HANDLE_MESSAGE", + Type: eventTypeHandleMessage, PeerID: p.host.ID(), Timestamp: now, Payload: map[string]any{ @@ -225,17 +246,336 @@ func (p *PubSub) handleBeaconBlock(ctx context.Context, msg *pubsub.Message) err "Slot": slot, "Root": root, "TimeInSlot": now.Sub(slotStart).Seconds(), - "Timestamp": now, }, } - if err := p.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleAttestation(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + if msg == nil || msg.Topic == nil || *msg.Topic == "" { + return fmt.Errorf("nil message or topic") + } + + attestation := ethtypes.Attestation{} + err := p.cfg.Encoder.DecodeGossip(msg.Data, &attestation) + if err != nil { + return fmt.Errorf("decode attestation gossip message: %w", err) + } + + payload := map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "CommIdx": attestation.GetData().GetCommitteeIndex(), + "Slot": attestation.GetData().GetSlot(), + "BeaconBlockRoot": attestation.GetData().GetBeaconBlockRoot(), + "Source": attestation.GetData().GetSource(), + "Target": attestation.GetData().GetTarget(), + } + + // If the attestation only has one aggregation bit set, we can add an additional field to the payload + // that denotes _which_ aggregation bit is set. This is required to determine which validator created the attestation. + // In the pursuit of reducing the amount of data stored in the data stream we omit this field if the attestation is + // aggregated. + if attestation.GetAggregationBits().Count() == 1 { + payload["AggregatePos"] = attestation.AggregationBits.BitIndices()[0] + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: payload, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } return nil } +func (p *PubSub) handleAggregateAndProof(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + ap := ðtypes.SignedAggregateAttestationAndProof{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, ap); err != nil { + return fmt.Errorf("decode aggregate and proof message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Sig": hexutil.Encode(ap.GetSignature()), + "AggIdx": ap.GetMessage().GetAggregatorIndex(), + "SelectionProof": hexutil.Encode(ap.GetMessage().GetSelectionProof()), + // There are other details in the SignedAggregateAttestationAndProof message, add them when needed. + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn( + "failed putting topic handler event", + "topic", msg.GetTopic(), + "err", tele.LogAttrError(err), + ) + } + + return nil +} + +func (p *PubSub) handleExitMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + ve := ðtypes.VoluntaryExit{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, ve); err != nil { + return fmt.Errorf("decode voluntary exit message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Epoch": ve.GetEpoch(), + "ValIdx": ve.GetValidatorIndex(), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting voluntary exit event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleAttesterSlashingMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + as := ðtypes.AttesterSlashing{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, as); err != nil { + return fmt.Errorf("decode attester slashing message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Att1_indices": as.GetAttestation_1().GetAttestingIndices(), + "Att2_indices": as.GetAttestation_2().GetAttestingIndices(), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting attester slashing event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleProposerSlashingMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + ps := ðtypes.ProposerSlashing{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, ps); err != nil { + return fmt.Errorf("decode proposer slashing message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Header1_Slot": ps.GetHeader_1().GetHeader().GetSlot(), + "Header1_ProposerIndex": ps.GetHeader_1().GetHeader().GetProposerIndex(), + "Header1_StateRoot": hexutil.Encode(ps.GetHeader_1().GetHeader().GetStateRoot()), + "Header2_Slot": ps.GetHeader_2().GetHeader().GetSlot(), + "Header2_ProposerIndex": ps.GetHeader_2().GetHeader().GetProposerIndex(), + "Header2_StateRoot": hexutil.Encode(ps.GetHeader_2().GetHeader().GetStateRoot()), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting proposer slashing event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleContributtionAndProofMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + cp := ðtypes.SignedContributionAndProof{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, cp); err != nil { + return fmt.Errorf("decode contribution and proof message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Sig": hexutil.Encode(cp.GetSignature()), + "AggIdx": cp.GetMessage().GetAggregatorIndex(), + "Contrib_Slot": cp.GetMessage().GetContribution().GetSlot(), + "Contrib_SubCommitteeIdx": cp.GetMessage().GetContribution().GetSubcommitteeIndex(), + "Contrib_BlockRoot": cp.GetMessage().GetContribution().GetBlockRoot(), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting contribution and proof event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleSyncCommitteeMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + sc := ðtypes.SyncCommitteeMessage{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, sc); err != nil { + return fmt.Errorf("decode sync committee message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Slot": sc.GetSlot(), + "ValIdx": sc.GetValidatorIndex(), + "BlockRoot": hexutil.Encode(sc.GetBlockRoot()), + "Signature": hexutil.Encode(sc.GetSignature()), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting sync committee event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleBlsToExecutionChangeMessage(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + pb := ðtypes.BLSToExecutionChange{} + if err := p.cfg.Encoder.DecodeGossip(msg.Data, pb); err != nil { + return fmt.Errorf("decode bls to execution change message: %w", err) + } + + evt := &host.TraceEvent{ + Type: eventTypeHandleMessage, + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "ValIdx": pb.GetValidatorIndex(), + "FromBlsPubkey": hexutil.Encode(pb.GetFromBlsPubkey()), + "ToExecutionAddress": hexutil.Encode(pb.GetToExecutionAddress()), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting bls to execution change event", tele.LogAttrError(err)) + } + + return nil +} + +func (p *PubSub) handleBlobSidecar(ctx context.Context, msg *pubsub.Message) error { + now := time.Now() + + switch p.cfg.ForkVersion { + case DenebForkVersion: + var blob ethtypes.BlobSidecar + err := p.cfg.Encoder.DecodeGossip(msg.Data, &blob) + if err != nil { + slog.Error("decode blob sidecar gossip message", tele.LogAttrError(err)) + return err + } + + slot := blob.GetSignedBlockHeader().GetHeader().GetSlot() + slotStart := p.cfg.GenesisTime.Add(time.Duration(slot) * p.cfg.SecondsPerSlot) + proposerIndex := blob.GetSignedBlockHeader().GetHeader().GetProposerIndex() + + evt := &host.TraceEvent{ + Type: "HANDLE_MESSAGE", + PeerID: p.host.ID(), + Timestamp: now, + Payload: map[string]any{ + "PeerID": msg.ReceivedFrom.String(), + "MsgID": hex.EncodeToString([]byte(msg.ID)), + "MsgSize": len(msg.Data), + "Topic": msg.GetTopic(), + "Seq": msg.GetSeqno(), + "Slot": slot, + "ValIdx": proposerIndex, + "index": blob.GetIndex(), + "TimeInSlot": now.Sub(slotStart).Seconds(), + "StateRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetStateRoot()), + "BodyRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetBodyRoot()), + "ParentRoot": hexutil.Encode(blob.GetSignedBlockHeader().GetHeader().GetParentRoot()), + }, + } + + if err := p.cfg.DataStream.PutRecord(ctx, evt); err != nil { + slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) + } + default: + return fmt.Errorf("non recognized fork-version: %d", p.cfg.ForkVersion[:]) + } + + return nil +} + type GenericSignedBeaconBlock interface { GetBlock() GenericBeaconBlock } diff --git a/eth/reqresp.go b/eth/reqresp.go index 0505188..04b0519 100644 --- a/eth/reqresp.go +++ b/eth/reqresp.go @@ -278,7 +278,7 @@ func (r *ReqResp) wrapStreamHandler(ctx context.Context, name string, handler Co Payload: commonData, } - if err := r.cfg.DataStream.PutEvent(ctx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(ctx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -575,7 +575,7 @@ func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, e } traceCtx := context.Background() - if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -631,7 +631,7 @@ func (r *ReqResp) Ping(ctx context.Context, pid peer.ID) (err error) { }, } traceCtx := context.Background() - if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } @@ -693,7 +693,7 @@ func (r *ReqResp) MetaData(ctx context.Context, pid peer.ID) (resp *pb.MetaDataV Payload: reqData, } traceCtx := context.Background() - if err := r.cfg.DataStream.PutEvent(traceCtx, traceEvt); err != nil { + if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil { slog.Warn("failed to put record", tele.LogAttrError(err)) } diff --git a/go.mod b/go.mod index 3e46406..4fdd9e8 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/kinesis v1.27.3 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 github.com/dennis-tra/go-kinesis v0.0.0-20240326083914-7acf5f8dc24e - github.com/ethereum/go-ethereum v1.13.14 + github.com/ethereum/go-ethereum v1.13.15 github.com/ferranbt/fastssz v0.1.3 github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 @@ -34,6 +34,7 @@ require ( golang.org/x/time v0.5.0 google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 + gopkg.in/yaml.v3 v3.0.1 ) require ( @@ -202,7 +203,6 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apimachinery v0.20.0 // indirect k8s.io/client-go v0.20.0 // indirect k8s.io/klog/v2 v2.80.0 // indirect diff --git a/go.sum b/go.sum index 1abfd14..4121e8f 100644 --- a/go.sum +++ b/go.sum @@ -249,8 +249,8 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/ethereum/c-kzg-4844 v1.0.1-0.20240422190800-13be436f5927 h1:ffWmm0RUR2+VqJsCkf94HqgEwZi2fgbm2iq+O/GdJNI= github.com/ethereum/c-kzg-4844 v1.0.1-0.20240422190800-13be436f5927/go.mod h1:VewdlzQmpT5QSrVhbBuGoCdFJkpaJlO1aQputP83wc0= -github.com/ethereum/go-ethereum v1.13.14 h1:EwiY3FZP94derMCIam1iW4HFVrSgIcpsu0HwTQtm6CQ= -github.com/ethereum/go-ethereum v1.13.14/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU= +github.com/ethereum/go-ethereum v1.13.15 h1:U7sSGYGo4SPjP6iNIifNoyIAiNjrmQkz6EwQG+/EZWo= +github.com/ethereum/go-ethereum v1.13.15/go.mod h1:TN8ZiHrdJwSe8Cb6x+p0hs5CxhJZPbqB7hHkaUXcmIU= github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/ferranbt/fastssz v0.1.3 h1:ZI+z3JH05h4kgmFXdHuR1aWYsgrg7o+Fw7/NCzM16Mo= diff --git a/host/callback.go b/host/callback.go index 9e1cdd4..c2df4db 100644 --- a/host/callback.go +++ b/host/callback.go @@ -47,8 +47,8 @@ func (c *CallbackDataStream) Stop(ctx context.Context) error { return ctx.Err() } -// PutEvent sends an event to the callback if the stream has not been stopped. -func (c *CallbackDataStream) PutEvent(ctx context.Context, event *TraceEvent) error { +// PutRecord sends an event to the callback if the stream has not been stopped. +func (c *CallbackDataStream) PutRecord(ctx context.Context, event *TraceEvent) error { if c.stopped { return ctx.Err() } diff --git a/host/flush_tracer.go b/host/flush_tracer.go index 4a346b2..de87c2b 100644 --- a/host/flush_tracer.go +++ b/host/flush_tracer.go @@ -63,7 +63,7 @@ func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payl ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) defer cancel() - if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("Failed to put trace event payload", tele.LogAttrError(err)) return } @@ -72,39 +72,39 @@ func (h *Host) FlushTraceWithTimestamp(evtType string, timestamp time.Time, payl } func (h *Host) AddPeer(p peer.ID, proto protocol.ID) { - h.FlushTrace("ADD_PEER", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_ADD_PEER.String(), map[string]any{ "PeerID": p, "Protocol": proto, }) } func (h *Host) RemovePeer(p peer.ID) { - h.FlushTrace("REMOVE_PEER", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_REMOVE_PEER.String(), map[string]any{ "PeerID": p, }) } func (h *Host) Join(topic string) { - h.FlushTrace("JOIN", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_JOIN.String(), map[string]any{ "Topic": topic, }) } func (h *Host) Leave(topic string) { - h.FlushTrace("LEAVE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_LEAVE.String(), map[string]any{ "Topic": topic, }) } func (h *Host) Graft(p peer.ID, topic string) { - h.FlushTrace("GRAFT", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_GRAFT.String(), map[string]any{ "PeerID": p, "Topic": topic, }) } func (h *Host) Prune(p peer.ID, topic string) { - h.FlushTrace("PRUNE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_PRUNE.String(), map[string]any{ "PeerID": p, "Topic": topic, }) @@ -122,7 +122,7 @@ func (h *Host) ValidateMessage(msg *pubsub.Message) { } func (h *Host) DeliverMessage(msg *pubsub.Message) { - h.FlushTrace("DELIVER_MESSAGE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_DELIVER_MESSAGE.String(), map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -133,7 +133,7 @@ func (h *Host) DeliverMessage(msg *pubsub.Message) { } func (h *Host) RejectMessage(msg *pubsub.Message, reason string) { - h.FlushTrace("REJECT_MESSAGE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_REJECT_MESSAGE.String(), map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -145,7 +145,7 @@ func (h *Host) RejectMessage(msg *pubsub.Message, reason string) { } func (h *Host) DuplicateMessage(msg *pubsub.Message) { - h.FlushTrace("DUPLICATE_MESSAGE", map[string]any{ + h.FlushTrace(pubsubpb.TraceEvent_DUPLICATE_MESSAGE.String(), map[string]any{ "PeerID": msg.ReceivedFrom, "Topic": msg.GetTopic(), "MsgID": hex.EncodeToString([]byte(msg.ID)), @@ -186,18 +186,18 @@ func (h *Host) Trace(evt *pubsubpb.TraceEvent) { ts := time.Unix(0, evt.GetTimestamp()) switch evt.GetType() { case pubsubpb.TraceEvent_PUBLISH_MESSAGE: - h.FlushTraceWithTimestamp("PUBLISH_MESSAGE", ts, map[string]any{ + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_PUBLISH_MESSAGE.String(), ts, map[string]any{ "MsgID": evt.GetPublishMessage().GetMessageID(), "Topic": evt.GetPublishMessage().GetTopic(), }) case pubsubpb.TraceEvent_RECV_RPC: payload := newRPCMeta(evt.GetRecvRPC().GetReceivedFrom(), evt.GetRecvRPC().GetMeta()) - h.FlushTraceWithTimestamp("RECV_RPC", ts, payload) + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_RECV_RPC.String(), ts, payload) case pubsubpb.TraceEvent_SEND_RPC: payload := newRPCMeta(evt.GetSendRPC().GetSendTo(), evt.GetSendRPC().GetMeta()) - h.FlushTraceWithTimestamp("SEND_RPC", ts, payload) + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_SEND_RPC.String(), ts, payload) case pubsubpb.TraceEvent_DROP_RPC: payload := newRPCMeta(evt.GetDropRPC().GetSendTo(), evt.GetDropRPC().GetMeta()) - h.FlushTraceWithTimestamp("DROP_RPC", ts, payload) + h.FlushTraceWithTimestamp(pubsubpb.TraceEvent_DROP_RPC.String(), ts, payload) } } diff --git a/host/host.go b/host/host.go index 2a0d3e1..e09dc24 100644 --- a/host/host.go +++ b/host/host.go @@ -97,7 +97,7 @@ func (h *Host) Serve(ctx context.Context) error { }, } - if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("Failed to put event payload", tele.LogAttrError(err)) return } @@ -270,6 +270,7 @@ func (h *Host) PrivateListenMaddr() (ma.Multiaddr, error) { func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler { return func(ctx context.Context, msg *pubsub.Message) error { + slog.Debug("Handling gossip message", "topic", msg.GetTopic()) evt := &TraceEvent{ Type: "HANDLE_MESSAGE", PeerID: h.ID(), @@ -283,7 +284,7 @@ func (h *Host) TracedTopicHandler(handler TopicHandler) TopicHandler { }, } - if err := h.cfg.DataStream.PutEvent(ctx, evt); err != nil { + if err := h.cfg.DataStream.PutRecord(ctx, evt); err != nil { slog.Warn("failed putting topic handler event", tele.LogAttrError(err)) } @@ -328,6 +329,6 @@ func (h *Host) UpdatePeerScore(scores map[peer.ID]*pubsub.PeerScoreSnapshot) { traceCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - h.cfg.DataStream.PutEvent(traceCtx, trace) + h.cfg.DataStream.PutRecord(traceCtx, trace) } } diff --git a/host/kinesis.go b/host/kinesis.go index a9ce570..dad62f6 100644 --- a/host/kinesis.go +++ b/host/kinesis.go @@ -36,9 +36,12 @@ func (k *KinesisDataStream) Start(ctx context.Context) error { dsCtx, dsCancel := context.WithCancel(ctx) k.ctx = dsCtx - k.cancelFn = dsCancel + if err := k.producer.Start(ctx); err != nil { + return err + } + <-dsCtx.Done() return dsCtx.Err() @@ -51,7 +54,7 @@ func (k *KinesisDataStream) Stop(ctx context.Context) error { if err := k.producer.WaitIdle(timeoutCtx); err != nil { slog.Info("Error waiting for producer to become idle", tele.LogAttrError(err)) } - + timeoutCncl() // stop the producer k.cancelFn() @@ -67,8 +70,8 @@ func (k *KinesisDataStream) Stop(ctx context.Context) error { return k.producer.WaitIdle(ctx) } -// PutEvent sends an event to the Kinesis data stream. -func (k *KinesisDataStream) PutEvent(ctx context.Context, event *TraceEvent) error { +// PutRecord sends an event to the Kinesis data stream. +func (k *KinesisDataStream) PutRecord(ctx context.Context, event *TraceEvent) error { if event != nil { kRecord := gk.Record(event) diff --git a/host/producer.go b/host/producer.go index 874c1eb..8e6f755 100644 --- a/host/producer.go +++ b/host/producer.go @@ -7,7 +7,7 @@ import ( type DataStream interface { Start(ctx context.Context) error Stop(ctx context.Context) error - PutEvent(ctx context.Context, event *TraceEvent) error + PutRecord(ctx context.Context, event *TraceEvent) error Type() DataStreamType }