diff --git a/deploy/fly/env/production.env b/deploy/fly/env/production.env index 09c6852ef..1b551e965 100644 --- a/deploy/fly/env/production.env +++ b/deploy/fly/env/production.env @@ -4,9 +4,9 @@ NAME=wormscan-fly REPLICAS=5 IMAGE_NAME= RESOURCES_LIMITS_MEMORY=512Mi -RESOURCES_LIMITS_CPU=500m +RESOURCES_LIMITS_CPU=700m RESOURCES_REQUESTS_MEMORY=384Mi -RESOURCES_REQUESTS_CPU=250m +RESOURCES_REQUESTS_CPU=500m SQS_URL= SQS_AWS_REGION= P2P_NETWORK=mainnet @@ -14,4 +14,9 @@ PPROF_ENABLED=false MAX_HEALTH_TIME_SECONDS=90 AWS_IAM_ROLE= ALERT_ENABLED=false -METRICS_ENABLED=true \ No newline at end of file +METRICS_ENABLED=true +OBSERVATIONS_CHANNEL_SIZE=2500 +VAAS_CHANNEL_SIZE=150 +HEARTBEATS_CHANNEL_SIZE=50 +GOVERNOR_CONFIG_CHANNEL_SIZE=50 +GOVERNOR_STATUS_CHANNEL_SIZE=50 \ No newline at end of file diff --git a/deploy/fly/env/staging.env b/deploy/fly/env/staging.env index 9f3b06cda..2666c0176 100644 --- a/deploy/fly/env/staging.env +++ b/deploy/fly/env/staging.env @@ -14,4 +14,9 @@ PPROF_ENABLED=true MAX_HEALTH_TIME_SECONDS=90 AWS_IAM_ROLE= ALERT_ENABLED=false -METRICS_ENABLED=true \ No newline at end of file +METRICS_ENABLED=true +OBSERVATIONS_CHANNEL_SIZE=2500 +VAAS_CHANNEL_SIZE=150 +HEARTBEATS_CHANNEL_SIZE=50 +GOVERNOR_CONFIG_CHANNEL_SIZE=50 +GOVERNOR_STATUS_CHANNEL_SIZE=50 \ No newline at end of file diff --git a/deploy/fly/env/test.env b/deploy/fly/env/test.env index aff9e9c62..00a1109f1 100644 --- a/deploy/fly/env/test.env +++ b/deploy/fly/env/test.env @@ -14,4 +14,9 @@ PPROF_ENABLED=false MAX_HEALTH_TIME_SECONDS=300 AWS_IAM_ROLE= ALERT_ENABLED=false -METRICS_ENABLED=true \ No newline at end of file +METRICS_ENABLED=true +OBSERVATIONS_CHANNEL_SIZE=50 +VAAS_CHANNEL_SIZE=50 +HEARTBEATS_CHANNEL_SIZE=50 +GOVERNOR_CONFIG_CHANNEL_SIZE=50 +GOVERNOR_STATUS_CHANNEL_SIZE=50 \ No newline at end of file diff --git a/deploy/fly/fly-service.yaml b/deploy/fly/fly-service.yaml index 6987c8610..ed65b4adb 100644 --- a/deploy/fly/fly-service.yaml +++ b/deploy/fly/fly-service.yaml @@ -83,6 +83,16 @@ spec: value: "{{ .ALERT_ENABLED }}" - name: METRICS_ENABLED value: "{{ .METRICS_ENABLED }}" + - name: OBSERVATIONS_CHANNEL_SIZE + value: "{{ .OBSERVATIONS_CHANNEL_SIZE }}" + - name: VAAS_CHANNEL_SIZE + value: "{{ .VAAS_CHANNEL_SIZE }}" + - name: HEARTBEATS_CHANNEL_SIZE + value: "{{ .HEARTBEATS_CHANNEL_SIZE }}" + - name: GOVERNOR_CONFIG_CHANNEL_SIZE + value: "{{ .GOVERNOR_CONFIG_CHANNEL_SIZE }}" + - name: GOVERNOR_STATUS_CHANNEL_SIZE + value: "{{ .GOVERNOR_STATUS_CHANNEL_SIZE }}" resources: limits: memory: {{ .RESOURCES_LIMITS_MEMORY }} diff --git a/fly/config/config.go b/fly/config/config.go index a41b71595..88f2aed1b 100644 --- a/fly/config/config.go +++ b/fly/config/config.go @@ -1,10 +1,13 @@ package config import ( + "context" "fmt" "os" "strconv" + "github.com/joho/godotenv" + "github.com/sethvargo/go-envconfig" "github.com/wormhole-foundation/wormhole-explorer/common/client/alert" "github.com/wormhole-foundation/wormhole-explorer/common/domain" ) @@ -119,3 +122,23 @@ func GetPrefix() string { prefix := p2pNetwork.Enviroment + "-" + GetEnvironment() return prefix } + +type Configuration struct { + ObservationsChannelSize int `env:"OBSERVATIONS_CHANNEL_SIZE,required"` + VaasChannelSize int `env:"VAAS_CHANNEL_SIZE,required"` + HeartbeatsChannelSize int `env:"HEARTBEATS_CHANNEL_SIZE,required"` + GovernorConfigChannelSize int `env:"GOVERNOR_CONFIG_CHANNEL_SIZE,required"` + GovernorStatusChannelSize int `env:"GOVERNOR_STATUS_CHANNEL_SIZE,required"` +} + +// New creates a configuration with the values from .env file and environment variables. +func New(ctx context.Context) (*Configuration, error) { + _ = godotenv.Load(".env", "../.env") + + var configuration Configuration + if err := envconfig.Process(ctx, &configuration); err != nil { + return nil, err + } + + return &configuration, nil +} diff --git a/fly/go.mod b/fly/go.mod index 8e8ad83ba..c3ec7ee19 100644 --- a/fly/go.mod +++ b/fly/go.mod @@ -17,6 +17,7 @@ require ( github.com/joho/godotenv v1.4.0 github.com/libp2p/go-libp2p-core v0.20.0 github.com/patrickmn/go-cache v2.1.0+incompatible + github.com/sethvargo/go-envconfig v0.9.0 github.com/stretchr/testify v1.8.1 github.com/test-go/testify v1.1.4 github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 diff --git a/fly/go.sum b/fly/go.sum index 74af6e2e3..4c4cc5568 100644 --- a/fly/go.sum +++ b/fly/go.sum @@ -2724,6 +2724,8 @@ github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfP github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/serialx/hashring v0.0.0-20190422032157-8b2912629002/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc= +github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE= +github.com/sethvargo/go-envconfig v0.9.0/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YIlt8GMUX6yyNFs0= github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs= github.com/shirou/gopsutil v0.0.0-20190901111213-e4ec7b275ada/go.mod h1:WWnYX4lzhCH5h/3YBfyVA3VbLYjlMZZAQcW9ojMexNc= github.com/shirou/gopsutil v2.20.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= diff --git a/fly/main.go b/fly/main.go index 4ecf94207..5cdd53ceb 100644 --- a/fly/main.go +++ b/fly/main.go @@ -3,6 +3,7 @@ package main import ( "context" "flag" + "log" "strconv" "strings" @@ -215,6 +216,12 @@ func main() { fmt.Println("No .env file found") } + // load configuration + cfg, err := config.New(rootCtx) + if err != nil { + log.Fatal("Error creating config", err) + } + // Node's main lifecycle context. rootCtx, rootCtxCancel = context.WithCancel(context.Background()) defer rootCtxCancel() @@ -280,25 +287,25 @@ func main() { sendC := make(chan []byte) // Inbound observations - obsvC := make(chan *gossipv1.SignedObservation, 50) + obsvC := make(chan *gossipv1.SignedObservation, cfg.ObservationsChannelSize) - // Inbound observation requests + // Inbound observation requests - we don't add a environment because we are going to delete this channel obsvReqC := make(chan *gossipv1.ObservationRequest, 50) // Inbound signed VAAs - signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50) + signedInC := make(chan *gossipv1.SignedVAAWithQuorum, cfg.VaasChannelSize) // Heartbeat updates - heartbeatC := make(chan *gossipv1.Heartbeat, 50) + heartbeatC := make(chan *gossipv1.Heartbeat, cfg.HeartbeatsChannelSize) // Guardian set state managed by processor gst := common.NewGuardianSetState(heartbeatC) // Governor cfg - govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, 50) + govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, cfg.GovernorConfigChannelSize) // Governor status - govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, 50) + govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, cfg.GovernorStatusChannelSize) // Bootstrap guardian set, otherwise heartbeats would be skipped // TODO: fetch this and probably figure out how to update it live @@ -575,12 +582,16 @@ func discardMessages[T any](ctx context.Context, obsvReqC chan T) { // filterObservation filter observation by enviroment. func filterObservationByEnv(o *gossipv1.SignedObservation, enviroment string) bool { if enviroment == domain.P2pTestNet { - // filter pyth message in test enviroment (for solana and pyth chain). + // filter pyth message in testnet gossip network (for solana and pyth chain). if strings.Contains((o.GetMessageId()), "1/f346195ac02f37d60d4db8ffa6ef74cb1be3550047543a4a9ee9acf4d78697b0") || strings.HasPrefix("26/", o.GetMessageId()) { return true } } + // filter pyth message in mainnet gossip network (for pyth chain). + if enviroment == domain.P2pMainNet && strings.HasPrefix("26/", o.GetMessageId()) { + return true + } return false }