From 8627bc2dd4371f3a7629bf0b24a024b1c001d3f3 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sat, 10 Aug 2024 20:50:57 +0200 Subject: [PATCH] feat(explorer): relax token deletion with error threshold (#3211) feat(explorer): relax token deletion with error threashold Signed-off-by: Ettore Di Giacinto --- core/cli/explorer.go | 9 ++++---- core/explorer/discovery.go | 43 ++++++++++++++++++++++++++++++-------- 2 files changed, 39 insertions(+), 13 deletions(-) diff --git a/core/cli/explorer.go b/core/cli/explorer.go index 0fcde7283e0b..f3e3618de18d 100644 --- a/core/cli/explorer.go +++ b/core/cli/explorer.go @@ -10,9 +10,10 @@ import ( ) type ExplorerCMD struct { - Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"` - PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"` - ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"` + Address string `env:"LOCALAI_ADDRESS,ADDRESS" default:":8080" help:"Bind address for the API server" group:"api"` + PoolDatabase string `env:"LOCALAI_POOL_DATABASE,POOL_DATABASE" default:"explorer.json" help:"Path to the pool database" group:"api"` + ConnectionTimeout string `env:"LOCALAI_CONNECTION_TIMEOUT,CONNECTION_TIMEOUT" default:"2m" help:"Connection timeout for the explorer" group:"api"` + ConnectionErrorThreshold int `env:"LOCALAI_CONNECTION_ERROR_THRESHOLD,CONNECTION_ERROR_THRESHOLD" default:"3" help:"Connection failure threshold for the explorer" group:"api"` } func (e *ExplorerCMD) Run(ctx *cliContext.Context) error { @@ -26,7 +27,7 @@ func (e *ExplorerCMD) Run(ctx *cliContext.Context) error { if err != nil { return err } - ds := explorer.NewDiscoveryServer(db, dur) + ds := explorer.NewDiscoveryServer(db, dur, e.ConnectionErrorThreshold) go ds.Start(context.Background()) appHTTP := http.Explorer(db, ds) diff --git a/core/explorer/discovery.go b/core/explorer/discovery.go index 73281dc0787b..dc2b6e88dffe 100644 --- a/core/explorer/discovery.go +++ b/core/explorer/discovery.go @@ -15,9 +15,11 @@ import ( type DiscoveryServer struct { sync.Mutex - database *Database - networkState *NetworkState - connectionTime time.Duration + database *Database + networkState *NetworkState + connectionTime time.Duration + failures map[string]int + errorThreshold int } type NetworkState struct { @@ -32,16 +34,20 @@ func (s *DiscoveryServer) NetworkState() *NetworkState { // NewDiscoveryServer creates a new DiscoveryServer with the given Database. // it keeps the db state in sync with the network state -func NewDiscoveryServer(db *Database, dur time.Duration) *DiscoveryServer { +func NewDiscoveryServer(db *Database, dur time.Duration, failureThreshold int) *DiscoveryServer { if dur == 0 { dur = 50 * time.Second } + if failureThreshold == 0 { + failureThreshold = 3 + } return &DiscoveryServer{ database: db, connectionTime: dur, networkState: &NetworkState{ Networks: map[string]Network{}, }, + errorThreshold: failureThreshold, } } @@ -66,21 +72,21 @@ func (s *DiscoveryServer) runBackground() { n, err := p2p.NewNode(token) if err != nil { log.Err(err).Msg("Failed to create node") - s.database.Delete(token) + s.failedToken(token) continue } err = n.Start(c) if err != nil { log.Err(err).Msg("Failed to start node") - s.database.Delete(token) + s.failedToken(token) continue } ledger, err := n.Ledger() if err != nil { log.Err(err).Msg("Failed to start ledger") - s.database.Delete(token) + s.failedToken(token) continue } @@ -114,8 +120,27 @@ func (s *DiscoveryServer) runBackground() { } s.Unlock() } else { - log.Info().Any("network", token).Msg("No workers found in the network. Removing it from the database") - s.database.Delete(token) + s.failedToken(token) + } + } + + s.deleteFailedConnections() +} + +func (s *DiscoveryServer) failedToken(token string) { + s.Lock() + defer s.Unlock() + s.failures[token]++ +} + +func (s *DiscoveryServer) deleteFailedConnections() { + s.Lock() + defer s.Unlock() + for k, v := range s.failures { + if v > s.errorThreshold { + log.Info().Any("network", k).Msg("Network has been removed from the database") + s.database.Delete(k) + delete(s.failures, k) } } }