Skip to content

Commit

Permalink
fix(p2p): allocate tunnels only when needed (#3259)
Browse files Browse the repository at this point in the history
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
  • Loading branch information
mudler authored Aug 17, 2024
1 parent 1ed5af1 commit 27b03a5
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 19 deletions.
4 changes: 2 additions & 2 deletions core/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {

os.Setenv("LLAMACPP_GRPC_SERVERS", tunnelEnvVar)
log.Debug().Msgf("setting LLAMACPP_GRPC_SERVERS to %s", tunnelEnvVar)
}); err != nil {
}, true); err != nil {
return err
}
}
Expand All @@ -153,7 +153,7 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
return err
}

if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil); err != nil {
if err := p2p.ServiceDiscoverer(context.Background(), node, token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.FederatedID), nil, false); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/p2p/federated_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (f *FederatedServer) Start(ctx context.Context) error {

if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel NodeData) {
log.Debug().Msgf("Discovered node: %s", tunnel.ID)
}); err != nil {
}, true); err != nil {
return err
}

Expand Down
33 changes: 18 additions & 15 deletions core/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,11 @@ func allocateLocalService(ctx context.Context, node *node.Node, listenAddr, serv

// This is the main of the server (which keeps the env variable updated)
// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData)) error {
func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID string, discoveryFunc func(serviceID string, node NodeData), allocate bool) error {
if servicesID == "" {
servicesID = defaultServicesID
}
tunnels, err := discoveryTunnels(ctx, n, token, servicesID)
tunnels, err := discoveryTunnels(ctx, n, token, servicesID, allocate)
if err != nil {
return err
}
Expand All @@ -170,7 +170,7 @@ func ServiceDiscoverer(ctx context.Context, n *node.Node, token, servicesID stri
return nil
}

func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string) (chan NodeData, error) {
func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID string, allocate bool) (chan NodeData, error) {
tunnels := make(chan NodeData)

err := n.Start(ctx)
Expand Down Expand Up @@ -209,7 +209,7 @@ func discoveryTunnels(ctx context.Context, n *node.Node, token, servicesID strin
zlog.Error().Msg("cannot unmarshal node data")
continue
}
ensureService(ctx, n, nd, k)
ensureService(ctx, n, nd, k, allocate)
muservice.Lock()
if _, ok := service[nd.Name]; ok {
tunnels <- service[nd.Name].NodeData
Expand All @@ -231,7 +231,7 @@ type nodeServiceData struct {
var service = map[string]nodeServiceData{}
var muservice sync.Mutex

func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string) {
func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string, allocate bool) {
muservice.Lock()
defer muservice.Unlock()
if ndService, found := service[nd.Name]; !found {
Expand All @@ -240,22 +240,25 @@ func ensureService(ctx context.Context, n *node.Node, nd *NodeData, sserv string
zlog.Debug().Msgf("Node %s is offline", nd.ID)
return
}

newCtxm, cancel := context.WithCancel(ctx)
// Start the service
port, err := freeport.GetFreePort()
if err != nil {
zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID)
return
}
if allocate {
// Start the service
port, err := freeport.GetFreePort()
if err != nil {
zlog.Error().Err(err).Msgf("Could not allocate a free port for %s", nd.ID)
return
}

tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
nd.TunnelAddress = tunnelAddress
tunnelAddress := fmt.Sprintf("127.0.0.1:%d", port)
nd.TunnelAddress = tunnelAddress
go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
}
service[nd.Name] = nodeServiceData{
NodeData: *nd,
CancelFunc: cancel,
}
go allocateLocalService(newCtxm, n, tunnelAddress, sserv)
zlog.Debug().Msgf("Starting service %s on %s", sserv, tunnelAddress)
} else {
// Check if the service is still alive
// if not cancel the context
Expand Down
2 changes: 1 addition & 1 deletion core/p2p/p2p_disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (f *FederatedServer) Start(ctx context.Context) error {
return fmt.Errorf("not implemented")
}

func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData)) error {
func ServiceDiscoverer(ctx context.Context, node *node.Node, token, servicesID string, fn func(string, NodeData), allocate bool) error {
return fmt.Errorf("not implemented")
}

Expand Down

0 comments on commit 27b03a5

Please sign in to comment.