Skip to content

Commit

Permalink
[fly] Missing observations (#570)
Browse files Browse the repository at this point in the history
Add environment variables for buffer channels (observations y vaas)
Increment size of buffer channels for observations and vaas
Align k8s resources in fly

Co-authored-by: walker-16 <agpazos85@gmail.com>
  • Loading branch information
ftocal and walker-16 committed Jul 24, 2023
1 parent 1d512db commit 94307b8
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 12 deletions.
11 changes: 8 additions & 3 deletions deploy/fly/env/production.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ NAME=wormscan-fly
REPLICAS=5
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=512Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_LIMITS_CPU=700m
RESOURCES_REQUESTS_MEMORY=384Mi
RESOURCES_REQUESTS_CPU=250m
RESOURCES_REQUESTS_CPU=500m
SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=mainnet
PPROF_ENABLED=false
MAX_HEALTH_TIME_SECONDS=90
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
METRICS_ENABLED=true
OBSERVATIONS_CHANNEL_SIZE=2500
VAAS_CHANNEL_SIZE=150
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
7 changes: 6 additions & 1 deletion deploy/fly/env/staging.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ PPROF_ENABLED=true
MAX_HEALTH_TIME_SECONDS=90
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
METRICS_ENABLED=true
OBSERVATIONS_CHANNEL_SIZE=2500
VAAS_CHANNEL_SIZE=150
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
7 changes: 6 additions & 1 deletion deploy/fly/env/test.env
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ PPROF_ENABLED=false
MAX_HEALTH_TIME_SECONDS=300
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
METRICS_ENABLED=true
OBSERVATIONS_CHANNEL_SIZE=50
VAAS_CHANNEL_SIZE=50
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50
10 changes: 10 additions & 0 deletions deploy/fly/fly-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ spec:
value: "{{ .ALERT_ENABLED }}"
- name: METRICS_ENABLED
value: "{{ .METRICS_ENABLED }}"
- name: OBSERVATIONS_CHANNEL_SIZE
value: "{{ .OBSERVATIONS_CHANNEL_SIZE }}"
- name: VAAS_CHANNEL_SIZE
value: "{{ .VAAS_CHANNEL_SIZE }}"
- name: HEARTBEATS_CHANNEL_SIZE
value: "{{ .HEARTBEATS_CHANNEL_SIZE }}"
- name: GOVERNOR_CONFIG_CHANNEL_SIZE
value: "{{ .GOVERNOR_CONFIG_CHANNEL_SIZE }}"
- name: GOVERNOR_STATUS_CHANNEL_SIZE
value: "{{ .GOVERNOR_STATUS_CHANNEL_SIZE }}"
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}
Expand Down
23 changes: 23 additions & 0 deletions fly/config/config.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package config

import (
"context"
"fmt"
"os"
"strconv"

"github.com/joho/godotenv"
"github.com/sethvargo/go-envconfig"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
)
Expand Down Expand Up @@ -119,3 +122,23 @@ func GetPrefix() string {
prefix := p2pNetwork.Enviroment + "-" + GetEnvironment()
return prefix
}

type Configuration struct {
ObservationsChannelSize int `env:"OBSERVATIONS_CHANNEL_SIZE,required"`
VaasChannelSize int `env:"VAAS_CHANNEL_SIZE,required"`
HeartbeatsChannelSize int `env:"HEARTBEATS_CHANNEL_SIZE,required"`
GovernorConfigChannelSize int `env:"GOVERNOR_CONFIG_CHANNEL_SIZE,required"`
GovernorStatusChannelSize int `env:"GOVERNOR_STATUS_CHANNEL_SIZE,required"`
}

// New creates a configuration with the values from .env file and environment variables.
func New(ctx context.Context) (*Configuration, error) {
_ = godotenv.Load(".env", "../.env")

var configuration Configuration
if err := envconfig.Process(ctx, &configuration); err != nil {
return nil, err
}

return &configuration, nil
}
1 change: 1 addition & 0 deletions fly/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/joho/godotenv v1.4.0
github.com/libp2p/go-libp2p-core v0.20.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/sethvargo/go-envconfig v0.9.0
github.com/stretchr/testify v1.8.1
github.com/test-go/testify v1.1.4
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8
Expand Down
2 changes: 2 additions & 0 deletions fly/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2724,6 +2724,8 @@ github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfP
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/serialx/hashring v0.0.0-20190422032157-8b2912629002/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc=
github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE=
github.com/sethvargo/go-envconfig v0.9.0/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YIlt8GMUX6yyNFs0=
github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs=
github.com/shirou/gopsutil v0.0.0-20190901111213-e4ec7b275ada/go.mod h1:WWnYX4lzhCH5h/3YBfyVA3VbLYjlMZZAQcW9ojMexNc=
github.com/shirou/gopsutil v2.20.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
Expand Down
25 changes: 18 additions & 7 deletions fly/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"log"
"strconv"
"strings"

Expand Down Expand Up @@ -215,6 +216,12 @@ func main() {
fmt.Println("No .env file found")
}

// load configuration
cfg, err := config.New(rootCtx)
if err != nil {
log.Fatal("Error creating config", err)
}

// Node's main lifecycle context.
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()
Expand Down Expand Up @@ -280,25 +287,25 @@ func main() {
sendC := make(chan []byte)

// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 50)
obsvC := make(chan *gossipv1.SignedObservation, cfg.ObservationsChannelSize)

// Inbound observation requests
// Inbound observation requests - we don't add a environment because we are going to delete this channel
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)

// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, cfg.VaasChannelSize)

// Heartbeat updates
heartbeatC := make(chan *gossipv1.Heartbeat, 50)
heartbeatC := make(chan *gossipv1.Heartbeat, cfg.HeartbeatsChannelSize)

// Guardian set state managed by processor
gst := common.NewGuardianSetState(heartbeatC)

// Governor cfg
govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, 50)
govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, cfg.GovernorConfigChannelSize)

// Governor status
govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, 50)
govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, cfg.GovernorStatusChannelSize)

// Bootstrap guardian set, otherwise heartbeats would be skipped
// TODO: fetch this and probably figure out how to update it live
Expand Down Expand Up @@ -575,12 +582,16 @@ func discardMessages[T any](ctx context.Context, obsvReqC chan T) {
// filterObservation filter observation by enviroment.
func filterObservationByEnv(o *gossipv1.SignedObservation, enviroment string) bool {
if enviroment == domain.P2pTestNet {
// filter pyth message in test enviroment (for solana and pyth chain).
// filter pyth message in testnet gossip network (for solana and pyth chain).
if strings.Contains((o.GetMessageId()), "1/f346195ac02f37d60d4db8ffa6ef74cb1be3550047543a4a9ee9acf4d78697b0") ||
strings.HasPrefix("26/", o.GetMessageId()) {
return true
}
}
// filter pyth message in mainnet gossip network (for pyth chain).
if enviroment == domain.P2pMainNet && strings.HasPrefix("26/", o.GetMessageId()) {
return true
}
return false
}

Expand Down

0 comments on commit 94307b8

Please sign in to comment.