Skip to content

Commit

Permalink
chore: allow to disable gallery endpoints, improve p2p connection han…
Browse files Browse the repository at this point in the history
…dling (#3256)

* Add more debug messages

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat: allow to disable gallery endpoints

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* improve p2p messaging

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* improve error handling

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* Make sure to close the listening socket when the context is done

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
  • Loading branch information
mudler authored Aug 17, 2024
1 parent d6b3fbb commit 7278bf3
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 35 deletions.
5 changes: 5 additions & 0 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type RunCMD struct {
EnableWatchdogBusy bool `env:"LOCALAI_WATCHDOG_BUSY,WATCHDOG_BUSY" default:"false" help:"Enable watchdog for stopping backends that are busy longer than the watchdog-busy-timeout" group:"backends"`
WatchdogBusyTimeout string `env:"LOCALAI_WATCHDOG_BUSY_TIMEOUT,WATCHDOG_BUSY_TIMEOUT" default:"5m" help:"Threshold beyond which a busy backend should be stopped" group:"backends"`
Federated bool `env:"LOCALAI_FEDERATED,FEDERATED" help:"Enable federated instance" group:"federated"`
DisableGalleryEndpoint bool `env:"LOCALAI_DISABLE_GALLERY_ENDPOINT,DISABLE_GALLERY_ENDPOINT" help:"Disable the gallery endpoints" group:"api"`
}

func (r *RunCMD) Run(ctx *cliContext.Context) error {
Expand Down Expand Up @@ -164,6 +165,10 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
opts = append(opts, config.DisableWebUI)
}

if r.DisableGalleryEndpoint {
opts = append(opts, config.DisableGalleryEndpoint)
}

if idleWatchDog || busyWatchDog {
opts = append(opts, config.EnableWatchDog)
if idleWatchDog {
Expand Down
6 changes: 6 additions & 0 deletions core/config/application_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type ApplicationConfig struct {
ModelsURL []string

WatchDogBusyTimeout, WatchDogIdleTimeout time.Duration

DisableGalleryEndpoint bool
}

// AppOption is a function that receives a pointer to an ApplicationConfig.
// Functions with this signature (for example DisableGalleryEndpoint) set
// individual fields on the configuration they are given.
type AppOption func(*ApplicationConfig)
Expand Down Expand Up @@ -131,6 +133,10 @@ var EnableWatchDogIdleCheck = func(o *ApplicationConfig) {
o.WatchDogIdle = true
}

// DisableGalleryEndpoint is an option function that sets the
// DisableGalleryEndpoint flag of the given ApplicationConfig to true.
var DisableGalleryEndpoint = func(cfg *ApplicationConfig) {
	cfg.DisableGalleryEndpoint = true
}

var EnableWatchDogBusyCheck = func(o *ApplicationConfig) {
o.WatchDog = true
o.WatchDogBusy = true
Expand Down
21 changes: 11 additions & 10 deletions core/http/routes/localai.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ func RegisterLocalAIRoutes(app *fiber.App,
app.Get("/swagger/*", swagger.HandlerDefault) // default

// LocalAI API endpoints
if !appConfig.DisableGalleryEndpoint {
modelGalleryEndpointService := localai.CreateModelGalleryEndpointService(appConfig.Galleries, appConfig.ModelPath, galleryService)
app.Post("/models/apply", auth, modelGalleryEndpointService.ApplyModelGalleryEndpoint())
app.Post("/models/delete/:name", auth, modelGalleryEndpointService.DeleteModelGalleryEndpoint())

modelGalleryEndpointService := localai.CreateModelGalleryEndpointService(appConfig.Galleries, appConfig.ModelPath, galleryService)
app.Post("/models/apply", auth, modelGalleryEndpointService.ApplyModelGalleryEndpoint())
app.Post("/models/delete/:name", auth, modelGalleryEndpointService.DeleteModelGalleryEndpoint())

app.Get("/models/available", auth, modelGalleryEndpointService.ListModelFromGalleryEndpoint())
app.Get("/models/galleries", auth, modelGalleryEndpointService.ListModelGalleriesEndpoint())
app.Post("/models/galleries", auth, modelGalleryEndpointService.AddModelGalleryEndpoint())
app.Delete("/models/galleries", auth, modelGalleryEndpointService.RemoveModelGalleryEndpoint())
app.Get("/models/jobs/:uuid", auth, modelGalleryEndpointService.GetOpStatusEndpoint())
app.Get("/models/jobs", auth, modelGalleryEndpointService.GetAllStatusEndpoint())
app.Get("/models/available", auth, modelGalleryEndpointService.ListModelFromGalleryEndpoint())
app.Get("/models/galleries", auth, modelGalleryEndpointService.ListModelGalleriesEndpoint())
app.Post("/models/galleries", auth, modelGalleryEndpointService.AddModelGalleryEndpoint())
app.Delete("/models/galleries", auth, modelGalleryEndpointService.RemoveModelGalleryEndpoint())
app.Get("/models/jobs/:uuid", auth, modelGalleryEndpointService.GetOpStatusEndpoint())
app.Get("/models/jobs", auth, modelGalleryEndpointService.GetAllStatusEndpoint())
}

app.Post("/tts", auth, localai.TTSEndpoint(cl, ml, appConfig))

Expand Down
8 changes: 4 additions & 4 deletions core/p2p/federated.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (fs *FederatedServer) SelectLeastUsedServer() string {
fs.Lock()
defer fs.Unlock()

log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
log.Debug().Any("request_table", fs.requestTable).Msgf("SelectLeastUsedServer()")

// cycle over requestTable and find the entry with the lowest number
// if there are multiple entries with the same number, select one randomly
Expand All @@ -93,7 +93,7 @@ func (fs *FederatedServer) SelectLeastUsedServer() string {
minKey = k
}
}
log.Debug().Any("requests_served", min).Msgf("Selected tunnel %s", minKey)
log.Debug().Any("requests_served", min).Any("request_table", fs.requestTable).Msgf("Selected tunnel %s", minKey)

return minKey
}
Expand All @@ -104,7 +104,7 @@ func (fs *FederatedServer) RecordRequest(nodeID string) {
// increment the counter for the nodeID in the requestTable
fs.requestTable[nodeID]++

log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
log.Debug().Any("request_table", fs.requestTable).Any("request", nodeID).Msgf("Recording request")
}

func (fs *FederatedServer) ensureRecordExist(nodeID string) {
Expand All @@ -114,5 +114,5 @@ func (fs *FederatedServer) ensureRecordExist(nodeID string) {
fs.requestTable[nodeID] = 0
}

log.Debug().Any("request_table", fs.requestTable).Msgf("Current request table")
log.Debug().Any("request_table", fs.requestTable).Any("request", nodeID).Msgf("Ensure record exists")
}
10 changes: 4 additions & 6 deletions core/p2p/federated_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,23 @@ func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error {
return err
}
// ll.Info("Binding local port on", srcaddr)

go func() {
<-ctx.Done()
l.Close()
}()
ledger, _ := node.Ledger()

// Announce ourselves so nodes accept our connection
ledger.Announce(
ctx,
10*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
// }
},
)

Expand Down
29 changes: 14 additions & 15 deletions core/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
zlog.Error().Err(err).Msg("Error listening")
return err
}
go func() {
<-ctx.Done()
l.Close()
}()

// ll.Info("Binding local port on", srcaddr)

ledger, _ := node.Ledger()
Expand All @@ -60,17 +65,12 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv
ctx,
10*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey(protocol.UsersLedgerKey, node.Host().ID().String())
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[node.Host().ID().String()] = &types.User{
PeerID: node.Host().ID().String(),
Timestamp: time.Now().String(),
}
ledger.Add(protocol.UsersLedgerKey, updatedMap)
// }
},
)

Expand Down Expand Up @@ -197,14 +197,13 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
return
default:
time.Sleep(5 * time.Second)
zlog.Debug().Msg("Searching for workers")

data := ledger.LastBlock().Storage[servicesID]

zlog.Debug().Any("data", ledger.LastBlock().Storage).Msg("Ledger data")

for k, v := range data {
zlog.Info().Msgf("Found worker %s", k)
zlog.Debug().Msgf("New worker found in the ledger data '%s'", k)
nd := &NodeData{}
if err := v.Unmarshal(nd); err != nil {
zlog.Error().Msg("cannot unmarshal node data")
Expand Down Expand Up @@ -245,8 +244,10 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string
// Start the service
port, err := freeport.GetFreePort()
if err != nil {
fmt.Print(err)
zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID)
return
}

tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
nd.TunnelAddress = tunnelAddress
service[nd.Name] = nodeServiceData{
Expand Down Expand Up @@ -310,18 +311,13 @@ func ExposeService(ctx context.Context, host, port, token, servicesID string) er
ctx,
20*time.Second,
func() {
// Retrieve current ID for ip in the blockchain
//_, found := ledger.GetKey("services_localai", name)
// If mismatch, update the blockchain
//if !found {
updatedMap := map[string]interface{}{}
updatedMap[name] = &NodeData{
Name: name,
LastSeen: time.Now(),
ID: nodeID(name),
}
ledger.Add(servicesID, updatedMap)
// }
},
)

Expand Down Expand Up @@ -354,7 +350,10 @@ func newNodeOpts(token string) ([]node.Option, error) {
if loglevel == "" {
loglevel = "info"
}

libp2ploglevel := os.Getenv("LOCALAI_LIBP2P_LOGLEVEL")
if libp2ploglevel == "" {
libp2ploglevel = "info"
}
c := config.Config{
Limit: config.ResourceLimit{
Enable: noLimits,
Expand All @@ -363,7 +362,7 @@ func newNodeOpts(token string) ([]node.Option, error) {
NetworkToken: token,
LowProfile: false,
LogLevel: loglevel,
Libp2pLogLevel: "fatal",
Libp2pLogLevel: libp2ploglevel,
Ledger: config.Ledger{
SyncInterval: defaultInterval,
AnnounceInterval: defaultInterval,
Expand Down

0 comments on commit 7278bf3

Please sign in to comment.