Skip to content

Commit

Permalink
Merge pull request #37 from probe-lab/feat/support-idontwants
Browse files Browse the repository at this point in the history
Add trace support for IDONTWANT control messages
  • Loading branch information
cortze authored Sep 25, 2024
2 parents 226c724 + 614e34e commit f3c2886
Show file tree
Hide file tree
Showing 15 changed files with 367 additions and 155 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,5 @@ fabric.properties
*.sh
*.txt
.vscode
*/venv
*/venv
build/
24 changes: 21 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,30 @@ GIT_DIRTY := $(shell git diff --quiet || echo '-dirty')
GIT_SHA := $(shell git rev-parse --short HEAD)
GIT_TAG := ${GIT_SHA}${GIT_DIRTY}

GOCC=go
TARGET_PATH=./cmd/hermes
BIN_PATH=./build
BIN=./build/hermes
GIT_PACKAGE=github.com/probe-lab/hermes

.PHONY: install uninstall build clean tidy format docker docker-push

build:
$(GOCC) get $(TARGET_PATH)
$(GOCC) build -o $(BIN) $(TARGET_PATH)

install:
$(GOCC) install $(GIT_PACKAGE)

uninstall:
$(GOCC) clean $(GIT_PACKAGE)

format:
gofumpt -w -l .

docker:
docker build --platform linux/amd64 -t "${REPO_SERVER}/probelab:hermes-${GIT_TAG}" .

docker-push: docker
docker push "${REPO_SERVER}/probelab:hermes-${GIT_TAG}"
docker rmi "${REPO_SERVER}/probelab:hermes-${GIT_TAG}"

format:
gofumpt -w -l .
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ COMMANDS:
GLOBAL OPTIONS:
--help, -h show help
Kinesis Configuration:
DataStream Configuration:
--kinesis.region value The region of the AWS Kinesis Data Stream [$HERMES_KINESIS_REGION]
--kinesis.stream value The name of the AWS Kinesis Data Stream [$HERMES_KINESIS_DATA_STREAM]
--data.stream.type value Format where the traces will be submitted: logger, kinesis, or callback. (default: "logger") [$HERMES_DATA_STREAM_TYPE]
--kinesis.region value The region of the AWS Kinesis Data Stream [$HERMES_KINESIS_REGION]
--kinesis.stream value The name of the AWS Kinesis Data Stream [$HERMES_KINESIS_DATA_STREAM]
Logging Configuration:
Expand Down
36 changes: 26 additions & 10 deletions cmd/hermes/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel"

"github.com/probe-lab/hermes/host"
"github.com/probe-lab/hermes/tele"
)

const (
flagCategoryLogging = "Logging Configuration:"
flagCategoryTelemetry = "Telemetry Configuration:"
flagCategoryKinesis = "Kinesis Configuration:"
flagCategoryLogging = "Logging Configuration:"
flagCategoryTelemetry = "Telemetry Configuration:"
flagCategoryDataStream = "DataStream Configuration:"
)

var rootConfig = struct {
Expand All @@ -39,6 +40,7 @@ var rootConfig = struct {
TracingEnabled bool
TracingAddr string
TracingPort int
DataStreamType string
KinesisRegion string
KinesisStream string

Expand All @@ -61,6 +63,7 @@ var rootConfig = struct {
TracingEnabled: false,
TracingAddr: "localhost",
TracingPort: 4317, // default jaeger port
DataStreamType: host.DataStreamTypeLogger.String(),
KinesisRegion: "",
KinesisStream: "",

Expand Down Expand Up @@ -169,21 +172,29 @@ var rootFlags = []cli.Flag{
Destination: &rootConfig.TracingPort,
Category: flagCategoryTelemetry,
},
&cli.StringFlag{
Name: "data.stream.type",
EnvVars: []string{"HERMES_DATA_STREAM_TYPE"},
Usage: "Format where the traces will be submitted: logger, kinesis, or callback.",
Value: rootConfig.DataStreamType,
Destination: &rootConfig.DataStreamType,
Category: flagCategoryDataStream,
},
&cli.StringFlag{
Name: "kinesis.region",
EnvVars: []string{"HERMES_KINESIS_REGION"},
Usage: "The region of the AWS Kinesis Data Stream",
Value: rootConfig.KinesisRegion,
Destination: &rootConfig.KinesisRegion,
Category: flagCategoryKinesis,
Category: flagCategoryDataStream,
},
&cli.StringFlag{
Name: "kinesis.stream",
EnvVars: []string{"HERMES_KINESIS_DATA_STREAM"},
Usage: "The name of the AWS Kinesis Data Stream",
Value: rootConfig.KinesisStream,
Destination: &rootConfig.KinesisStream,
Category: flagCategoryKinesis,
Category: flagCategoryDataStream,
},
}

Expand Down Expand Up @@ -232,12 +243,17 @@ func rootBefore(c *cli.Context) error {
}

// if either parameter is set explicitly, we consider Kinesis to be enabled
if c.IsSet("kinesis.region") || c.IsSet("kinesis.stream") {
awsConfig, err := config.LoadDefaultConfig(c.Context, config.WithRegion(rootConfig.KinesisRegion))
if err != nil {
return fmt.Errorf("load AWS configuration: %w", err)
if c.IsSet("data.stream.type") {
dataStreamType := host.DataStreamtypeFromStr(c.String("data.stream.type"))
if dataStreamType == host.DataStreamTypeKinesis {
if c.IsSet("kinesis.region") || c.IsSet("kinesis.stream") {
awsConfig, err := config.LoadDefaultConfig(c.Context, config.WithRegion(rootConfig.KinesisRegion))
if err != nil {
return fmt.Errorf("load AWS configuration: %w", err)
}
rootConfig.awsConfig = &awsConfig
}
}
rootConfig.awsConfig = &awsConfig
}

return nil
Expand Down
2 changes: 2 additions & 0 deletions cmd/hermes/cmd_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/otel"

"github.com/probe-lab/hermes/eth"
"github.com/probe-lab/hermes/host"
"github.com/probe-lab/hermes/tele"
)

Expand Down Expand Up @@ -294,6 +295,7 @@ func cmdEthAction(c *cli.Context) error {
PrysmPortHTTP: ethConfig.PrysmPortHTTP,
PrysmPortGRPC: ethConfig.PrysmPortGRPC,
AWSConfig: rootConfig.awsConfig,
DataStreamType: host.DataStreamtypeFromStr(rootConfig.DataStreamType),
KinesisRegion: rootConfig.KinesisRegion,
KinesisStream: rootConfig.KinesisStream,
MaxPeers: ethConfig.MaxPeers,
Expand Down
12 changes: 10 additions & 2 deletions eth/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ func NewNode(cfg *NodeConfig) (*Node, error) {
initNetworkForkVersions(cfg.BeaconConfig)

var ds host.DataStream
if cfg.AWSConfig != nil {
switch cfg.DataStreamType {
case host.DataStreamTypeLogger:
ds = new(host.TraceLogger)

case host.DataStreamTypeKinesis:
droppedTraces, err := cfg.Meter.Int64Counter("dropped_traces")
if err != nil {
return nil, fmt.Errorf("new dropped_traces counter: %w", err)
Expand Down Expand Up @@ -111,8 +115,12 @@ func NewNode(cfg *NodeConfig) (*Node, error) {
}

ds = host.NewKinesisDataStream(p)
} else {

case host.DataStreamTypeCallback:
ds = host.NewCallbackDataStream()

default:
return nil, fmt.Errorf("not recognised data-stream (%s)", cfg.DataStreamType)
}

hostCfg := &host.Config{
Expand Down
24 changes: 14 additions & 10 deletions eth/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ type NodeConfig struct {
PrysmPortHTTP int
PrysmPortGRPC int

// The AWS Kinesis Data Stream configuration
AWSConfig *aws.Config // if set, we consider Kinesis to be enabled
KinesisRegion string
KinesisStream string
// The Data Stream configuration
DataStreamType host.DataStreamType
AWSConfig *aws.Config
KinesisRegion string
KinesisStream string

// The maximum number of peers our libp2p host can be connected to.
MaxPeers int
Expand Down Expand Up @@ -162,13 +163,16 @@ func (n *NodeConfig) Validate() error {
return fmt.Errorf("dialer count must be positive, got %d", n.DialConcurrency)
}

if n.AWSConfig != nil {
if n.KinesisStream == "" {
return fmt.Errorf("kinesis is enabled but stream is not set")
}
// ensure that if the data stream is AWS, the parameters where given
if n.DataStreamType == host.DataStreamTypeKinesis {
if n.AWSConfig != nil {
if n.KinesisStream == "" {
return fmt.Errorf("kinesis is enabled but stream is not set")
}

if n.KinesisRegion == "" {
return fmt.Errorf("kinesis is enabled but region is not set")
if n.KinesisRegion == "" {
return fmt.Errorf("kinesis is enabled but region is not set")
}
}
}

Expand Down
1 change: 0 additions & 1 deletion eth/prysm.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,5 +339,4 @@ func (p *PrysmClient) isOnNetwork(ctx context.Context, hermesForkDigest [4]byte)
return true, nil
}
return false, nil

}
2 changes: 1 addition & 1 deletion eth/topic_score_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func committeeCountPerSlot(activeValidators uint64) uint64 {
}

func slotCommitteeCount(activeValidatorCount uint64) uint64 {
var committeesPerSlot = activeValidatorCount / currentBeaconConfig.SecondsPerSlot / currentBeaconConfig.TargetCommitteeSize
committeesPerSlot := activeValidatorCount / currentBeaconConfig.SecondsPerSlot / currentBeaconConfig.TargetCommitteeSize
if committeesPerSlot > currentBeaconConfig.MaxCommitteesPerSlot {
return currentBeaconConfig.MaxCommitteesPerSlot
}
Expand Down
Loading

0 comments on commit f3c2886

Please sign in to comment.