Commit 931bd31: Refactor VM handling in machine manager
kthomas committed Feb 6, 2024
1 parent 6734e5e commit 931bd31
Showing 5 changed files with 34 additions and 57 deletions.
5 changes: 3 additions & 2 deletions internal/node/agentcomms.go

@@ -18,7 +18,7 @@ func (mgr *MachineManager) handleAgentLog(m *nats.Msg) {
 	tokens := strings.Split(m.Subject, ".")
 	vmId := tokens[1]
 
-	vm, ok := mgr.allVms[vmId]
+	vm, ok := mgr.allVMs[vmId]
 	if !ok {
 		mgr.log.Warn("Received a log message from an unknown VM.")
 		return
@@ -60,7 +60,7 @@ func (mgr *MachineManager) handleAgentEvent(m *nats.Msg) {
 	tokens := strings.Split(m.Subject, ".")
 	vmId := tokens[1]
 
-	vm, ok := mgr.allVms[vmId]
+	vm, ok := mgr.allVMs[vmId]
 	if !ok {
 		mgr.log.Warn("Received an event from a VM we don't know about. Rejecting.")
 		return
@@ -109,3 +109,4 @@ func logPublishSubject(namespace string, node string, workload string, vm string
 	// $NEX.logs.{namespace}.{node}.{workload name}.{vm}
 	return fmt.Sprintf("%s.%s.%s.%s.%s", LogSubjectPrefix, namespace, node, workload, vm)
 }
+

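The hunks above show handlers recovering a VM ID by splitting the inbound NATS subject, while logPublishSubject builds the outbound subject documented in its comment. Below is a minimal standalone sketch of that round trip; the concrete value of LogSubjectPrefix is an assumption for illustration, not taken from this commit.

package main

import (
	"fmt"
	"strings"
)

// Assumed value for illustration; the real constant lives elsewhere in the nex codebase.
const LogSubjectPrefix = "$NEX.logs"

// Mirrors the helper in agentcomms.go: $NEX.logs.{namespace}.{node}.{workload name}.{vm}
func logPublishSubject(namespace, node, workload, vm string) string {
	return fmt.Sprintf("%s.%s.%s.%s.%s", LogSubjectPrefix, namespace, node, workload, vm)
}

func main() {
	subject := logPublishSubject("default", "node-1", "echoservice", "vm-abc123")
	fmt.Println(subject) // $NEX.logs.default.node-1.echoservice.vm-abc123

	// A consumer recovers individual fields by splitting on ".", the same way
	// handleAgentLog extracts the VM ID from its own subject.
	tokens := strings.Split(subject, ".")
	fmt.Println(tokens[2], tokens[5]) // namespace, vm
}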
7 changes: 4 additions & 3 deletions internal/node/controlapi.go

@@ -201,7 +201,7 @@ func (api *ApiListener) handleDeploy(m *nats.Msg) {
 		return
 	}
 
-	runningVM := <-api.mgr.warmVms
+	runningVM := <-api.mgr.warmVMs
 	workloadName := request.DecodedClaims.Subject
 
 	api.log.
@@ -255,7 +255,7 @@ func (api *ApiListener) handlePing(m *nats.Msg) {
 		NodeId:          api.nodeId,
 		Version:         Version(),
 		Uptime:          myUptime(now.Sub(api.start)),
-		RunningMachines: len(api.mgr.allVms),
+		RunningMachines: len(api.mgr.allVMs),
 		Tags:            api.config.Tags,
 	}, nil)
 
@@ -284,7 +284,7 @@ func (api *ApiListener) handleInfo(m *nats.Msg) {
 		Uptime:                 myUptime(now.Sub(api.start)),
 		Tags:                   api.config.Tags,
 		SupportedWorkloadTypes: api.config.WorkloadTypes,
-		Machines:               summarizeMachines(&api.mgr.allVms, namespace),
+		Machines:               summarizeMachines(&api.mgr.allVMs, namespace),
 		Memory:                 stats,
 	}, nil)
 
@@ -376,3 +376,4 @@ func extractNamespace(subject string) (string, error) {
 	}
 	return tokens[2], nil
 }
+

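handleDeploy checks a pre-provisioned VM out of the warm pool with a plain channel receive, so a deploy never pays for a cold boot. Below is a self-contained sketch of that checkout pattern with a timeout guard added; the names (checkout, fakeVM) are illustrative and not from the nex codebase, and the real handler blocks on an unguarded receive.

package main

import (
	"errors"
	"fmt"
	"time"
)

type fakeVM struct{ id string }

// checkout pulls a warm VM off the pool, or fails if none arrives in time.
func checkout(pool chan *fakeVM, timeout time.Duration) (*fakeVM, error) {
	select {
	case vm := <-pool: // same shape as: runningVM := <-api.mgr.warmVMs
		return vm, nil
	case <-time.After(timeout):
		return nil, errors.New("no warm VM available")
	}
}

func main() {
	pool := make(chan *fakeVM, 2)
	pool <- &fakeVM{id: "vm-1"}

	vm, err := checkout(pool, time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println("deploying to", vm.id)
}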
64 changes: 23 additions & 41 deletions internal/node/machine_mgr.go

@@ -45,8 +45,8 @@ type MachineManager struct {
 	ctx context.Context
 	t   *Telemetry
 
-	allVms  map[string]*runningFirecracker
-	warmVms chan *runningFirecracker
+	allVMs  map[string]*runningFirecracker
+	warmVMs chan *runningFirecracker
 
 	handshakes       map[string]string
 	handshakeTimeout time.Duration // TODO: make configurable...
@@ -84,8 +84,8 @@ func NewMachineManager(
 		t:   telemetry,
 		ctx: ctx,
 
-		allVms:  make(map[string]*runningFirecracker),
-		warmVms: make(chan *runningFirecracker, config.MachinePoolSize-1),
+		allVMs:  make(map[string]*runningFirecracker),
+		warmVMs: make(chan *runningFirecracker, config.MachinePoolSize-1),
 	}
 
 	_, err := m.ncInternal.Subscribe("agentint.handshake", m.handleHandshake)
@@ -130,13 +130,11 @@ func (m *MachineManager) Start() {
 			go m.awaitHandshake(vm.vmmID)
 			m.log.Info("Adding new VM to warm pool", slog.Any("ip", vm.ip), slog.String("vmid", vm.vmmID))
 
-			// If the pool is full, this line will block until a slot is available.
-			m.warmVms <- vm
-
-			// This gets executed when another goroutine pulls a vm out of the warmVms channel and unblocks
-			m.allVms[vm.vmmID] = vm
-
+			m.allVMs[vm.vmmID] = vm
 			m.t.vmCounter.Add(m.ctx, 1)
+
+			// If the pool is full, this line will block until a slot is available.
+			m.warmVMs <- vm
 		}
 	}
 }
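The Start() hunk above moves the allVMs registration ahead of the potentially blocking send into warmVMs, so concurrent handlers can find the VM even while it sits in the pool. A toy sketch of that ordering; vmRecord and the surrounding names are illustrative stand-ins, not the real types.

package main

import "fmt"

type vmRecord struct{ id string }

func main() {
	all := map[string]*vmRecord{}
	warm := make(chan *vmRecord, 1)

	vm := &vmRecord{id: "vm-1"}

	// Register before the send: if the send blocks because the pool is full,
	// lookup paths can still resolve the VM by ID in the map.
	all[vm.id] = vm
	warm <- vm

	fmt.Println("registered:", all["vm-1"].id, "pooled:", len(warm))
}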
@@ -265,29 +263,10 @@ func (m *MachineManager) DeployWorkload(vm *runningFirecracker, request *agentap
 func (m *MachineManager) Stop() error {
 	if atomic.AddUint32(&m.closing, 1) == 1 {
 		m.log.Info("Virtual machine manager stopping")
-		close(m.warmVms)
-
-		for _, vm := range m.allVms {
-			vm.shutdown()
-			_ = m.publishMachineStopped(vm) // FIXME-- this is confusing to be here as well as in StopMachine()
-		}
-
-		// Now empty the leftovers in the pool
-		for vm := range m.warmVms {
-			vm.shutdown()
-
-			if vm.deployRequest != nil {
-				m.t.workloadCounter.Add(m.ctx, -1, metric.WithAttributes(attribute.String("workload_type", *vm.deployRequest.WorkloadType)))
-				m.t.workloadCounter.Add(m.ctx, -1, metric.WithAttributes(attribute.String("workload_type", *vm.deployRequest.WorkloadType)), metric.WithAttributes(attribute.String("namespace", vm.namespace)))
-				m.t.deployedByteCounter.Add(m.ctx, vm.deployRequest.TotalBytes*-1)
-				m.t.deployedByteCounter.Add(m.ctx, vm.deployRequest.TotalBytes*-1, metric.WithAttributes(attribute.String("namespace", vm.namespace)))
-			}
-
-			m.t.vmCounter.Add(m.ctx, -1)
-			m.t.allocatedVCPUCounter.Add(m.ctx, *vm.machine.Cfg.MachineCfg.VcpuCount*-1)
-			m.t.allocatedVCPUCounter.Add(m.ctx, *vm.machine.Cfg.MachineCfg.VcpuCount*-1, metric.WithAttributes(attribute.String("namespace", vm.namespace)))
-			m.t.allocatedMemoryCounter.Add(m.ctx, *vm.machine.Cfg.MachineCfg.MemSizeMib*-1)
-			m.t.allocatedMemoryCounter.Add(m.ctx, *vm.machine.Cfg.MachineCfg.MemSizeMib*-1, metric.WithAttributes(attribute.String("namespace", vm.namespace)))
+		close(m.warmVMs)
+
+		for _, vm := range m.allVMs {
+			m.StopMachine(vm.vmmID)
 		}
 
 		m.cleanSockets()

GitHub Actions / lint, check failure on line 269 in internal/node/machine_mgr.go: Error return value of `m.StopMachine` is not checked (errcheck)
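The lint annotation above flags the unchecked return value of m.StopMachine inside the shutdown loop. One way to satisfy errcheck while keeping shutdown best-effort is to log and continue, sketched below as a hypothetical helper on the same type; this is not the fix the commit ships, and it assumes only fields visible in this diff (allVMs, vmmID, m.log).

// Hedged sketch: check the error errcheck complains about, but do not let
// one failed VM abort shutdown of the rest.
func (m *MachineManager) stopAllMachines() {
	for _, vm := range m.allVMs {
		if err := m.StopMachine(vm.vmmID); err != nil {
			m.log.Warn("Failed to stop machine",
				slog.String("vmid", vm.vmmID), slog.Any("err", err))
		}
	}
}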
@@ -298,25 +277,28 @@ func (m *MachineManager) Stop() error {
 
 // Stops a single machine. Will return an error if called with a non-existent workload/vm ID
 func (m *MachineManager) StopMachine(vmID string) error {
-	vm, exists := m.allVms[vmID]
+	vm, exists := m.allVMs[vmID]
 	if !exists {
 		return fmt.Errorf("failed to stop machine %s", vmID)
 	}
 
-	// we do a request here to allow graceful shutdown of the workload being undeployed
-	subject := fmt.Sprintf("agentint.%s.undeploy", vm.vmmID)
-	_, err := m.ncInternal.Request(subject, []byte{}, 500*time.Millisecond)
-	if err != nil {
-		return err
+	if vm.deployRequest != nil {
+		// we do a request here to allow graceful shutdown of the workload being undeployed
+		subject := fmt.Sprintf("agentint.%s.undeploy", vm.vmmID)
+		_, err := m.ncInternal.Request(subject, []byte{}, 500*time.Millisecond) // FIXME-- allow this timeout to be configurable... 500ms is likely not enough
+		if err != nil {
+			return err
+		}
 	}
 
 	vm.shutdown()
-	delete(m.allVms, vmID)
+	delete(m.allVMs, vmID)
 
 	_ = m.publishMachineStopped(vm)
 
 	if vm.deployRequest != nil {
+		m.t.workloadCounter.Add(m.ctx, -1, metric.WithAttributes(attribute.String("namespace", vm.namespace)))
 		m.t.workloadCounter.Add(m.ctx, -1, metric.WithAttributes(attribute.String("workload_type", *vm.deployRequest.WorkloadType)))
 		m.t.workloadCounter.Add(m.ctx, -1, metric.WithAttributes(attribute.String("workload_type", *vm.deployRequest.WorkloadType)), metric.WithAttributes(attribute.String("namespace", vm.namespace)))
 		m.t.deployedByteCounter.Add(m.ctx, vm.deployRequest.TotalBytes*-1)
 		m.t.deployedByteCounter.Add(m.ctx, vm.deployRequest.TotalBytes*-1, metric.WithAttributes(attribute.String("namespace", vm.namespace)))
@@ -332,7 +314,7 @@ func (m *MachineManager) StopMachine(vmID string) error {
 
 // Looks up a virtual machine by workload/vm ID. Returns nil if machine doesn't exist
 func (m *MachineManager) LookupMachine(vmId string) *runningFirecracker {
-	vm, exists := m.allVms[vmId]
+	vm, exists := m.allVMs[vmId]
 	if !exists {
 		return nil
 	}
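StopMachine now issues the agentint.{vmId}.undeploy request only when a workload was actually deployed, and the inline FIXME flags the hardcoded 500ms timeout. A standalone sketch of the same NATS request-reply pattern with the timeout lifted into a parameter; the server URL and vmID below are placeholders.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

// requestUndeploy asks the agent to gracefully undeploy its workload and
// waits for a reply, mirroring the request made in StopMachine.
func requestUndeploy(nc *nats.Conn, vmID string, timeout time.Duration) error {
	subject := fmt.Sprintf("agentint.%s.undeploy", vmID)
	// Request blocks until the agent replies or the timeout elapses.
	_, err := nc.Request(subject, []byte{}, timeout)
	return err
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	if err := requestUndeploy(nc, "vm-abc123", 3*time.Second); err != nil {
		log.Printf("undeploy request failed: %v", err)
	}
}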
4 changes: 2 additions & 2 deletions internal/node/nodeproxy.go

@@ -72,10 +72,10 @@ func (m *MachineManagerProxy) Telemetry() *Telemetry {
 }
 
 func (m *MachineManagerProxy) VMs() map[string]*runningFirecracker {
-	return m.m.allVms
+	return m.m.allVMs
 }
 
 func (m *MachineManagerProxy) PoolVMs() chan *runningFirecracker {
-	return m.m.warmVms
+	return m.m.warmVMs
 }

11 changes: 2 additions & 9 deletions internal/node/running_vm.go

@@ -50,15 +50,8 @@ func (vm *runningFirecracker) shutdown() {
 		vm.log.Error("Failed to stop firecracker VM", slog.Any("err", err))
 	}
 
-	err = os.Remove(vm.machine.Cfg.SocketPath)
-	if err != nil {
-		if !errors.Is(err, fs.ErrExist) {
-			vm.log.Warn("Failed to delete firecracker socket", slog.Any("err", err))
-		}
-	}
-
-	// NOTE: we're not deleting the firecracker machine logs ... they're in a tempfs so they'll eventually
-	// go away but we might want them kept around for troubleshooting
+	_ = os.Remove(vm.machine.Cfg.SocketPath)
+	_ = os.Remove(fmt.Sprintf("%s.log", vm.machine.Cfg.SocketPath))
 
 	rootFs := getRootFsPath(vm.vmmID)
 	err = os.Remove(rootFs)
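The rewritten shutdown path discards removal errors for the control socket and its log file entirely. Note that the deleted code checked fs.ErrExist, where fs.ErrNotExist was presumably intended, so the old warning was effectively unconditional; the rewrite sidesteps that. A small standalone sketch of a middle ground that ignores only "file does not exist" and logs anything else; the paths here are placeholders.

package main

import (
	"errors"
	"io/fs"
	"log/slog"
	"os"
)

// removeIfPresent deletes a file best-effort: a missing file is fine,
// any other failure is logged for troubleshooting.
func removeIfPresent(path string) {
	if err := os.Remove(path); err != nil && !errors.Is(err, fs.ErrNotExist) {
		slog.Warn("failed to remove file", slog.String("path", path), slog.Any("err", err))
	}
}

func main() {
	removeIfPresent("/tmp/firecracker.sock")
	removeIfPresent("/tmp/firecracker.sock.log")
}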
