From 7278bf3de898b136adaf34917a1a4783eac65d2e Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sat, 17 Aug 2024 08:28:52 +0200 Subject: [PATCH] chore: allow to disable gallery endpoints, improve p2p connection handling (#3256) * Add more debug messages Signed-off-by: Ettore Di Giacinto * feat: allow to disable gallery endpoints Signed-off-by: Ettore Di Giacinto * improve p2p messaging Signed-off-by: Ettore Di Giacinto * improve error handling Signed-off-by: Ettore Di Giacinto * Make sure to close the listening socket when context is exhausted Signed-off-by: Ettore Di Giacinto --------- Signed-off-by: Ettore Di Giacinto --- core/cli/run.go | 5 +++++ core/config/application_config.go | 6 ++++++ core/http/routes/localai.go | 21 +++++++++++---------- core/p2p/federated.go | 8 ++++---- core/p2p/federated_server.go | 10 ++++------ core/p2p/p2p.go | 29 ++++++++++++++--------------- 6 files changed, 44 insertions(+), 35 deletions(-) diff --git a/core/cli/run.go b/core/cli/run.go index 707f6afbcc0a..b2f73ef935a7 100644 --- a/core/cli/run.go +++ b/core/cli/run.go @@ -64,6 +64,7 @@ type RunCMD struct { EnableWatchdogBusy bool `env:"LOCALAI_WATCHDOG_BUSY,WATCHDOG_BUSY" default:"false" help:"Enable watchdog for stopping backends that are busy longer than the watchdog-busy-timeout" group:"backends"` WatchdogBusyTimeout string `env:"LOCALAI_WATCHDOG_BUSY_TIMEOUT,WATCHDOG_BUSY_TIMEOUT" default:"5m" help:"Threshold beyond which a busy backend should be stopped" group:"backends"` Federated bool `env:"LOCALAI_FEDERATED,FEDERATED" help:"Enable federated instance" group:"federated"` + DisableGalleryEndpoint bool `env:"LOCALAI_DISABLE_GALLERY_ENDPOINT,DISABLE_GALLERY_ENDPOINT" help:"Disable the gallery endpoints" group:"api"` } func (r *RunCMD) Run(ctx *cliContext.Context) error { @@ -164,6 +165,10 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error { opts = append(opts, config.DisableWebUI) } + if r.DisableGalleryEndpoint { + opts = append(opts, config.DisableGalleryEndpoint) + } + if idleWatchDog || busyWatchDog { opts = append(opts, config.EnableWatchDog) if idleWatchDog { diff --git a/core/config/application_config.go b/core/config/application_config.go index 6e8c46e1ba31..947c4f136ba5 100644 --- a/core/config/application_config.go +++ b/core/config/application_config.go @@ -57,6 +57,8 @@ type ApplicationConfig struct { ModelsURL []string WatchDogBusyTimeout, WatchDogIdleTimeout time.Duration + + DisableGalleryEndpoint bool } type AppOption func(*ApplicationConfig) @@ -131,6 +133,10 @@ var EnableWatchDogIdleCheck = func(o *ApplicationConfig) { o.WatchDogIdle = true } +var DisableGalleryEndpoint = func(o *ApplicationConfig) { + o.DisableGalleryEndpoint = true +} + var EnableWatchDogBusyCheck = func(o *ApplicationConfig) { o.WatchDog = true o.WatchDogBusy = true diff --git a/core/http/routes/localai.go b/core/http/routes/localai.go index 9c4200101583..105991e85904 100644 --- a/core/http/routes/localai.go +++ b/core/http/routes/localai.go @@ -21,17 +21,18 @@ func RegisterLocalAIRoutes(app *fiber.App, app.Get("/swagger/*", swagger.HandlerDefault) // default // LocalAI API endpoints + if !appConfig.DisableGalleryEndpoint { + modelGalleryEndpointService := localai.CreateModelGalleryEndpointService(appConfig.Galleries, appConfig.ModelPath, galleryService) + app.Post("/models/apply", auth, modelGalleryEndpointService.ApplyModelGalleryEndpoint()) + app.Post("/models/delete/:name", auth, modelGalleryEndpointService.DeleteModelGalleryEndpoint()) - modelGalleryEndpointService := localai.CreateModelGalleryEndpointService(appConfig.Galleries, appConfig.ModelPath, galleryService) - app.Post("/models/apply", auth, modelGalleryEndpointService.ApplyModelGalleryEndpoint()) - app.Post("/models/delete/:name", auth, modelGalleryEndpointService.DeleteModelGalleryEndpoint()) - - app.Get("/models/available", auth, modelGalleryEndpointService.ListModelFromGalleryEndpoint()) - app.Get("/models/galleries", auth, modelGalleryEndpointService.ListModelGalleriesEndpoint()) - app.Post("/models/galleries", auth, modelGalleryEndpointService.AddModelGalleryEndpoint()) - app.Delete("/models/galleries", auth, modelGalleryEndpointService.RemoveModelGalleryEndpoint()) - app.Get("/models/jobs/:uuid", auth, modelGalleryEndpointService.GetOpStatusEndpoint()) - app.Get("/models/jobs", auth, modelGalleryEndpointService.GetAllStatusEndpoint()) + app.Get("/models/available", auth, modelGalleryEndpointService.ListModelFromGalleryEndpoint()) + app.Get("/models/galleries", auth, modelGalleryEndpointService.ListModelGalleriesEndpoint()) + app.Post("/models/galleries", auth, modelGalleryEndpointService.AddModelGalleryEndpoint()) + app.Delete("/models/galleries", auth, modelGalleryEndpointService.RemoveModelGalleryEndpoint()) + app.Get("/models/jobs/:uuid", auth, modelGalleryEndpointService.GetOpStatusEndpoint()) + app.Get("/models/jobs", auth, modelGalleryEndpointService.GetAllStatusEndpoint()) + } app.Post("/tts", auth, localai.TTSEndpoint(cl, ml, appConfig)) diff --git a/core/p2p/federated.go b/core/p2p/federated.go index 8e468ef68edc..454ddc1b357f 100644 --- a/core/p2p/federated.go +++ b/core/p2p/federated.go @@ -80,7 +80,7 @@ func (fs *FederatedServer) SelectLeastUsedServer() string { fs.Lock() defer fs.Unlock() - log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table") + log.Debug().Any("request_table", fs.requestTable).Msgf("SelectLeastUsedServer()") // cycle over requestTable and find the entry with the lower number // if there are multiple entries with the same number, select one randomly @@ -93,7 +93,7 @@ func (fs *FederatedServer) SelectLeastUsedServer() string { minKey = k } } - log.Debug().Any("requests_served", min).Msgf("Selected tunnel %s", minKey) + log.Debug().Any("requests_served", min).Any("request_table", fs.requestTable).Msgf("Selected tunnel %s", minKey) return minKey } @@ -104,7 +104,7 @@ func (fs *FederatedServer) RecordRequest(nodeID string) { // increment the counter for the nodeID in the requestTable fs.requestTable[nodeID]++ - log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table") + log.Debug().Any("request_table", fs.requestTable).Any("request", nodeID).Msgf("Recording request") } func (fs *FederatedServer) ensureRecordExist(nodeID string) { @@ -114,5 +114,5 @@ func (fs *FederatedServer) ensureRecordExist(nodeID string) { fs.requestTable[nodeID] = 0 } - log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table") + log.Debug().Any("request_table", fs.requestTable).Any("request", nodeID).Msgf("Ensure record exists") } diff --git a/core/p2p/federated_server.go b/core/p2p/federated_server.go index 18080b2f775c..acd6e7bf9031 100644 --- a/core/p2p/federated_server.go +++ b/core/p2p/federated_server.go @@ -46,7 +46,10 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { return err } // ll.Info("Binding local port on", srcaddr) - + go func() { + <-ctx.Done() + l.Close() + }() ledger, _ := node.Ledger() // Announce ourselves so nodes accepts our connection @@ -54,17 +57,12 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { ctx, 10*time.Second, func() { - // Retrieve current ID for ip in the blockchain - //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) - // If mismatch, update the blockchain - //if !found { updatedMap := map[string]interface{}{} updatedMap[node.Host().ID().String()] = &types.User{ PeerID: node.Host().ID().String(), Timestamp: time.Now().String(), } ledger.Add(protocol.UsersLedgerKey, updatedMap) - // } }, ) diff --git a/core/p2p/p2p.go b/core/p2p/p2p.go index af2106be2bc4..758cb621b059 100644 --- a/core/p2p/p2p.go +++ b/core/p2p/p2p.go @@ -51,6 +51,11 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv zlog.Error().Err(err).Msg("Error listening") return err } + go func() { + <-ctx.Done() + l.Close() + }() + // ll.Info("Binding local port on", srcaddr) ledger, _ := node.Ledger() @@ -60,17 +65,12 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv ctx, 10*time.Second, func() { - // Retrieve current ID for ip in the blockchain - //_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String()) - // If mismatch, update the blockchain - //if !found { updatedMap := map[string]interface{}{} updatedMap[node.Host().ID().String()] = &types.User{ PeerID: node.Host().ID().String(), Timestamp: time.Now().String(), } ledger.Add(protocol.UsersLedgerKey, updatedMap) - // } }, ) @@ -197,14 +197,13 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin return default: time.Sleep(5 * time.Second) - zlog.Debug().Msg("Searching for workers") data := ledger.LastBlock().Storage[servicesID] zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data") for k, v := range data { - zlog.Info().Msgf("Found worker %s", k) + zlog.Debug().Msgf("New worker found in the ledger data '%s'", k) nd := &NodeData{} if err := v.Unmarshal(nd); err != nil { zlog.Error().Msg("cannot unmarshal node data") @@ -245,8 +244,10 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string // Start the service port, err := freeport.GetFreePort() if err != nil { - fmt.Print(err) + zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID) + return } + tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port) nd.TunnelAddress = tunnelAddress service[nd.Name] = nodeServiceData{ @@ -310,10 +311,6 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er ctx, 20*time.Second, func() { - // Retrieve current ID for ip in the blockchain - //_, found := ledger.GetKey("services_localai", name) - // If mismatch, update the blockchain - //if !found { updatedMap := map[string]interface{}{} updatedMap[name] = &NodeData{ Name: name, @@ -321,7 +318,6 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er ID: nodeID(name), } ledger.Add(servicesID, updatedMap) - // } }, ) @@ -354,7 +350,10 @@ func newNodeOpts(token string) ([]node.Option, error) { if loglevel == "" { loglevel = "info" } - + libp2ploglevel := os.Getenv("LOCALAI_LIBP2P_LOGLEVEL") + if libp2ploglevel == "" { + libp2ploglevel = "info" + } c := config.Config{ Limit: config.ResourceLimit{ Enable: noLimits, @@ -363,7 +362,7 @@ func newNodeOpts(token string) ([]node.Option, error) { NetworkToken: token, LowProfile: false, LogLevel: loglevel, - Libp2pLogLevel: "fatal", + Libp2pLogLevel: libp2ploglevel, Ledger: config.Ledger{ SyncInterval: defaultInterval, AnnounceInterval: defaultInterval,