Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fly] Missing observations #570

Merged
merged 1 commit into from
Jul 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions deploy/fly/env/production.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ NAME=wormscan-fly
REPLICAS=5
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=512Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_LIMITS_CPU=700m
RESOURCES_REQUESTS_MEMORY=384Mi
RESOURCES_REQUESTS_CPU=250m
RESOURCES_REQUESTS_CPU=500m
SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=mainnet
PPROF_ENABLED=false
MAX_HEALTH_TIME_SECONDS=90
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
METRICS_ENABLED=true
OBSERVATIONS_CHANNEL_SIZE=2500
VAAS_CHANNEL_SIZE=150
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
7 changes: 6 additions & 1 deletion deploy/fly/env/staging.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ PPROF_ENABLED=true
MAX_HEALTH_TIME_SECONDS=90
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
METRICS_ENABLED=true
OBSERVATIONS_CHANNEL_SIZE=2500
VAAS_CHANNEL_SIZE=150
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
7 changes: 6 additions & 1 deletion deploy/fly/env/test.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ PPROF_ENABLED=false
MAX_HEALTH_TIME_SECONDS=300
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
METRICS_ENABLED=true
OBSERVATIONS_CHANNEL_SIZE=50
VAAS_CHANNEL_SIZE=50
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
10 changes: 10 additions & 0 deletions deploy/fly/fly-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ spec:
value: "{{ .ALERT_ENABLED }}"
- name: METRICS_ENABLED
value: "{{ .METRICS_ENABLED }}"
- name: OBSERVATIONS_CHANNEL_SIZE
value: "{{ .OBSERVATIONS_CHANNEL_SIZE }}"
- name: VAAS_CHANNEL_SIZE
value: "{{ .VAAS_CHANNEL_SIZE }}"
- name: HEARTBEATS_CHANNEL_SIZE
value: "{{ .HEARTBEATS_CHANNEL_SIZE }}"
- name: GOVERNOR_CONFIG_CHANNEL_SIZE
value: "{{ .GOVERNOR_CONFIG_CHANNEL_SIZE }}"
- name: GOVERNOR_STATUS_CHANNEL_SIZE
value: "{{ .GOVERNOR_STATUS_CHANNEL_SIZE }}"
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}
Expand Down
23 changes: 23 additions & 0 deletions fly/config/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package config

import (
"context"
"fmt"
"os"
"strconv"

"github.com/joho/godotenv"
"github.com/sethvargo/go-envconfig"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
)
Expand Down Expand Up @@ -119,3 +122,23 @@ func GetPrefix() string {
prefix := p2pNetwork.Enviroment + "-" + GetEnvironment()
return prefix
}

type Configuration struct {
ObservationsChannelSize int `env:"OBSERVATIONS_CHANNEL_SIZE,required"`
VaasChannelSize int `env:"VAAS_CHANNEL_SIZE,required"`
HeartbeatsChannelSize int `env:"HEARTBEATS_CHANNEL_SIZE,required"`
GovernorConfigChannelSize int `env:"GOVERNOR_CONFIG_CHANNEL_SIZE,required"`
GovernorStatusChannelSize int `env:"GOVERNOR_STATUS_CHANNEL_SIZE,required"`
}

// New creates a configuration with the values from .env file and environment variables.
func New(ctx context.Context) (*Configuration, error) {
_ = godotenv.Load(".env", "../.env")

var configuration Configuration
if err := envconfig.Process(ctx, &configuration); err != nil {
return nil, err
}

return &configuration, nil
}
1 change: 1 addition & 0 deletions fly/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/joho/godotenv v1.4.0
github.com/libp2p/go-libp2p-core v0.20.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/sethvargo/go-envconfig v0.9.0
github.com/stretchr/testify v1.8.1
github.com/test-go/testify v1.1.4
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8
Expand Down
2 changes: 2 additions & 0 deletions fly/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2724,6 +2724,8 @@ github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfP
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/serialx/hashring v0.0.0-20190422032157-8b2912629002/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc=
github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE=
github.com/sethvargo/go-envconfig v0.9.0/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YIlt8GMUX6yyNFs0=
github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs=
github.com/shirou/gopsutil v0.0.0-20190901111213-e4ec7b275ada/go.mod h1:WWnYX4lzhCH5h/3YBfyVA3VbLYjlMZZAQcW9ojMexNc=
github.com/shirou/gopsutil v2.20.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
Expand Down
25 changes: 18 additions & 7 deletions fly/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"log"
"strconv"
"strings"

Expand Down Expand Up @@ -215,6 +216,12 @@ func main() {
fmt.Println("No .env file found")
}

// load configuration
cfg, err := config.New(rootCtx)
if err != nil {
log.Fatal("Error creating config", err)
}

// Node's main lifecycle context.
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()
Expand Down Expand Up @@ -280,25 +287,25 @@ func main() {
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 50)
obsvC := make(chan *gossipv1.SignedObservation, cfg.ObservationsChannelSize)

// Inbound observation requests
// Inbound observation requests - we don't add a environment because we are going to delete this channel
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)

// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, cfg.VaasChannelSize)

// Heartbeat updates
heartbeatC := make(chan *gossipv1.Heartbeat, 50)
heartbeatC := make(chan *gossipv1.Heartbeat, cfg.HeartbeatsChannelSize)

// Guardian set state managed by processor
gst := common.NewGuardianSetState(heartbeatC)

// Governor cfg
govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, 50)
govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, cfg.GovernorConfigChannelSize)

// Governor status
govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, 50)
govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, cfg.GovernorStatusChannelSize)

// Bootstrap guardian set, otherwise heartbeats would be skipped
// TODO: fetch this and probably figure out how to update it live
Expand Down Expand Up @@ -575,12 +582,16 @@ func discardMessages[T any](ctx context.Context, obsvReqC chan T) {
// filterObservation filter observation by enviroment.
func filterObservationByEnv(o *gossipv1.SignedObservation, enviroment string) bool {
if enviroment == domain.P2pTestNet {
// filter pyth message in test enviroment (for solana and pyth chain).
// filter pyth message in testnet gossip network (for solana and pyth chain).
if strings.Contains((o.GetMessageId()), "1/f346195ac02f37d60d4db8ffa6ef74cb1be3550047543a4a9ee9acf4d78697b0") ||
strings.HasPrefix("26/", o.GetMessageId()) {
return true
}
}
// filter pyth message in mainnet gossip network (for pyth chain).
if enviroment == domain.P2pMainNet && strings.HasPrefix("26/", o.GetMessageId()) {
return true
}
return false
}

Expand Down
Loading