Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trace support for IDONTWANT control messages #37

Merged
merged 9 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,5 @@ fabric.properties
*.sh
*.txt
.vscode
*/venv
*/venv
build/
24 changes: 21 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,30 @@ GIT_DIRTY := $(shell git diff --quiet || echo '-dirty')
GIT_SHA := $(shell git rev-parse --short HEAD)
GIT_TAG := ${GIT_SHA}${GIT_DIRTY}

GOCC=go
TARGET_PATH=./cmd/hermes
BIN_PATH=./build
BIN=./build/hermes
GIT_PACKAGE=github.com/probe-lab/hermes

.PHONY: install uninstall build clean tidy format docker docker-push

build:
$(GOCC) get $(TARGET_PATH)
$(GOCC) build -o $(BIN) $(TARGET_PATH)

install:
$(GOCC) install $(GIT_PACKAGE)

uninstall:
$(GOCC) clean $(GIT_PACKAGE)

format:
gofumpt -w -l .

docker:
docker build --platform linux/amd64 -t "${REPO_SERVER}/probelab:hermes-${GIT_TAG}" .

docker-push: docker
docker push "${REPO_SERVER}/probelab:hermes-${GIT_TAG}"
docker rmi "${REPO_SERVER}/probelab:hermes-${GIT_TAG}"

format:
gofumpt -w -l .
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ COMMANDS:
GLOBAL OPTIONS:
--help, -h show help

Kinesis Configuration:
DataStream Configuration:

--kinesis.region value The region of the AWS Kinesis Data Stream [$HERMES_KINESIS_REGION]
--kinesis.stream value The name of the AWS Kinesis Data Stream [$HERMES_KINESIS_DATA_STREAM]
--data.stream.type value Format where the traces will be submitted: logger, kinesis, or callback. (default: "logger") [$HERMES_DATA_STREAM_TYPE]
--kinesis.region value The region of the AWS Kinesis Data Stream [$HERMES_KINESIS_REGION]
--kinesis.stream value The name of the AWS Kinesis Data Stream [$HERMES_KINESIS_DATA_STREAM]

Logging Configuration:

Expand Down
36 changes: 26 additions & 10 deletions cmd/hermes/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ import (
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel"

"github.com/probe-lab/hermes/host"
"github.com/probe-lab/hermes/tele"
)

const (
flagCategoryLogging = "Logging Configuration:"
flagCategoryTelemetry = "Telemetry Configuration:"
flagCategoryKinesis = "Kinesis Configuration:"
flagCategoryLogging = "Logging Configuration:"
flagCategoryTelemetry = "Telemetry Configuration:"
flagCategoryDataStream = "DataStream Configuration:"
)

var rootConfig = struct {
Expand All @@ -39,6 +40,7 @@ var rootConfig = struct {
TracingEnabled bool
TracingAddr string
TracingPort int
DataStreamType string
KinesisRegion string
KinesisStream string

Expand All @@ -61,6 +63,7 @@ var rootConfig = struct {
TracingEnabled: false,
TracingAddr: "localhost",
TracingPort: 4317, // default jaeger port
DataStreamType: host.DataStreamTypeLogger.String(),
KinesisRegion: "",
KinesisStream: "",

Expand Down Expand Up @@ -169,21 +172,29 @@ var rootFlags = []cli.Flag{
Destination: &rootConfig.TracingPort,
Category: flagCategoryTelemetry,
},
&cli.StringFlag{
Name: "data.stream.type",
EnvVars: []string{"HERMES_DATA_STREAM_TYPE"},
Usage: "Format where the traces will be submitted: logger, kinesis, or callback.",
Value: rootConfig.DataStreamType,
Destination: &rootConfig.DataStreamType,
Category: flagCategoryDataStream,
},
&cli.StringFlag{
Name: "kinesis.region",
EnvVars: []string{"HERMES_KINESIS_REGION"},
Usage: "The region of the AWS Kinesis Data Stream",
Value: rootConfig.KinesisRegion,
Destination: &rootConfig.KinesisRegion,
Category: flagCategoryKinesis,
Category: flagCategoryDataStream,
},
&cli.StringFlag{
Name: "kinesis.stream",
EnvVars: []string{"HERMES_KINESIS_DATA_STREAM"},
Usage: "The name of the AWS Kinesis Data Stream",
Value: rootConfig.KinesisStream,
Destination: &rootConfig.KinesisStream,
Category: flagCategoryKinesis,
Category: flagCategoryDataStream,
},
}

Expand Down Expand Up @@ -232,12 +243,17 @@ func rootBefore(c *cli.Context) error {
}

// if either parameter is set explicitly, we consider Kinesis to be enabled
if c.IsSet("kinesis.region") || c.IsSet("kinesis.stream") {
awsConfig, err := config.LoadDefaultConfig(c.Context, config.WithRegion(rootConfig.KinesisRegion))
if err != nil {
return fmt.Errorf("load AWS configuration: %w", err)
if c.IsSet("data.stream.type") {
dataStreamType := host.DataStreamtypeFromStr(c.String("data.stream.type"))
if dataStreamType == host.DataStreamTypeKinesis {
if c.IsSet("kinesis.region") || c.IsSet("kinesis.stream") {
awsConfig, err := config.LoadDefaultConfig(c.Context, config.WithRegion(rootConfig.KinesisRegion))
if err != nil {
return fmt.Errorf("load AWS configuration: %w", err)
}
rootConfig.awsConfig = &awsConfig
}
}
rootConfig.awsConfig = &awsConfig
}

return nil
Expand Down
2 changes: 2 additions & 0 deletions cmd/hermes/cmd_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/otel"

"github.com/probe-lab/hermes/eth"
"github.com/probe-lab/hermes/host"
"github.com/probe-lab/hermes/tele"
)

Expand Down Expand Up @@ -294,6 +295,7 @@ func cmdEthAction(c *cli.Context) error {
PrysmPortHTTP: ethConfig.PrysmPortHTTP,
PrysmPortGRPC: ethConfig.PrysmPortGRPC,
AWSConfig: rootConfig.awsConfig,
DataStreamType: host.DataStreamtypeFromStr(rootConfig.DataStreamType),
KinesisRegion: rootConfig.KinesisRegion,
KinesisStream: rootConfig.KinesisStream,
MaxPeers: ethConfig.MaxPeers,
Expand Down
12 changes: 10 additions & 2 deletions eth/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@ func NewNode(cfg *NodeConfig) (*Node, error) {
initNetworkForkVersions(cfg.BeaconConfig)

var ds host.DataStream
if cfg.AWSConfig != nil {
switch cfg.DataStreamType {
case host.DataStreamTypeLogger:
ds = new(host.TraceLogger)

case host.DataStreamTypeKinesis:
droppedTraces, err := cfg.Meter.Int64Counter("dropped_traces")
if err != nil {
return nil, fmt.Errorf("new dropped_traces counter: %w", err)
Expand Down Expand Up @@ -111,8 +115,12 @@ func NewNode(cfg *NodeConfig) (*Node, error) {
}

ds = host.NewKinesisDataStream(p)
} else {

case host.DataStreamTypeCallback:
ds = host.NewCallbackDataStream()

default:
return nil, fmt.Errorf("not recognised data-stream (%s)", cfg.DataStreamType)
}

hostCfg := &host.Config{
Expand Down
24 changes: 14 additions & 10 deletions eth/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ type NodeConfig struct {
PrysmPortHTTP int
PrysmPortGRPC int

// The AWS Kinesis Data Stream configuration
AWSConfig *aws.Config // if set, we consider Kinesis to be enabled
KinesisRegion string
KinesisStream string
// The Data Stream configuration
DataStreamType host.DataStreamType
AWSConfig *aws.Config
KinesisRegion string
KinesisStream string

// The maximum number of peers our libp2p host can be connected to.
MaxPeers int
Expand Down Expand Up @@ -162,13 +163,16 @@ func (n *NodeConfig) Validate() error {
return fmt.Errorf("dialer count must be positive, got %d", n.DialConcurrency)
}

if n.AWSConfig != nil {
if n.KinesisStream == "" {
return fmt.Errorf("kinesis is enabled but stream is not set")
}
// ensure that if the data stream is AWS, the parameters where given
if n.DataStreamType == host.DataStreamTypeKinesis {
if n.AWSConfig != nil {
if n.KinesisStream == "" {
return fmt.Errorf("kinesis is enabled but stream is not set")
}

if n.KinesisRegion == "" {
return fmt.Errorf("kinesis is enabled but region is not set")
if n.KinesisRegion == "" {
return fmt.Errorf("kinesis is enabled but region is not set")
}
}
}

Expand Down
1 change: 0 additions & 1 deletion eth/prysm.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,5 +339,4 @@ func (p *PrysmClient) isOnNetwork(ctx context.Context, hermesForkDigest [4]byte)
return true, nil
}
return false, nil

}
2 changes: 1 addition & 1 deletion eth/topic_score_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func committeeCountPerSlot(activeValidators uint64) uint64 {
}

func slotCommitteeCount(activeValidatorCount uint64) uint64 {
var committeesPerSlot = activeValidatorCount / currentBeaconConfig.SecondsPerSlot / currentBeaconConfig.TargetCommitteeSize
committeesPerSlot := activeValidatorCount / currentBeaconConfig.SecondsPerSlot / currentBeaconConfig.TargetCommitteeSize
if committeesPerSlot > currentBeaconConfig.MaxCommitteesPerSlot {
return currentBeaconConfig.MaxCommitteesPerSlot
}
Expand Down
Loading
Loading