Skip to content

Commit

Permalink
fixes the TODO to enable the stop flag
Browse files Browse the repository at this point in the history
Signed-off-by: Jordan Rash <jordan@synadia.com>
  • Loading branch information
jordan-rash committed Jan 7, 2025
1 parent 79041f6 commit f67be6f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 5 deletions.
9 changes: 7 additions & 2 deletions cmd/nex/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,16 @@ func (c CopyWorkload) Run(ctx context.Context, globals *Globals) error {
}

if c.StopOriginal {
// controller.UndeployWorkload(c.WorkloadId)
_ = 0 // need to stop c.WorkloadId here
_, err := controller.UndeployWorkload(globals.Namespace, c.WorkloadId)
if err != nil {
return err
}
}

fmt.Printf("Workload successfully copied! New workloadId: %s\n", startResp.Id)
if c.StopOriginal {
fmt.Printf("Workload %s stopped\n", c.WorkloadId)
}
return nil
}

Expand Down
8 changes: 5 additions & 3 deletions node/internal/actors/process_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
a.state = models.WorkloadStateError
}
}
a.state = models.WorkloadStateError
a.retryCounter++
if a.state != models.WorkloadStateStopped {
a.state = models.WorkloadStateError
a.retryCounter++
}
}

if a.retryCounter == a.retryCount {
Expand Down Expand Up @@ -232,7 +234,7 @@ func (a *processActor) KillOsProcess() error {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

for _ = range ticker.C {
for range ticker.C {
if !a.process.IsRunning() {
ticker.Stop()
a.logger.Debug("workload stopped gracefully", slog.String("id", a.id))
Expand Down
59 changes: 59 additions & 0 deletions test/copy_workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,62 @@ func TestMultipleCopyWorkload(t *testing.T) {
be.NilErr(t, nex1.Wait())
be.True(t, passed)
}

func TestCopyWorkloadWithStop(t *testing.T) {
workingDir := t.TempDir()

binPath := BuildTestBinary(t, "./testdata/forever/main.go", workingDir)

nexCli := buildNexCli(t, workingDir)

s := StartNatsServer(t, workingDir)
defer s.Shutdown()

nex1 := startNexNodeCmd(t, workingDir, Node1ServerSeed, Node1XKeySeed, s.ClientURL(), "node1", "nexus")
nex1.SysProcAttr = sysProcAttr()

nex2 := startNexNodeCmd(t, workingDir, "", "", s.ClientURL(), "node2", "nexus")
nex2.SysProcAttr = sysProcAttr()

be.NilErr(t, nex1.Start())
be.NilErr(t, nex2.Start())

go func() {
time.Sleep(500 * time.Millisecond)

origStdOut := new(bytes.Buffer)
origDeploy := exec.Command(nexCli, "workload", "run", "-s", s.ClientURL(), "--name", "tester", fmt.Sprintf("--node-tags=%s=%s", models.TagNodeName, "node1"), "file://"+binPath, fmt.Sprintf("--env=NATS_URL=%s", s.ClientURL()))
origDeploy.Stdout = origStdOut
err := origDeploy.Run()
be.NilErr(t, err)

re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started\n.*$`)
match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String()))
be.Equal(t, 2, len(match))
origWorkloadId := match[1]

copyStdOut := new(bytes.Buffer)
copyDeploy := exec.Command(nexCli, "workload", "copy", "-s", s.ClientURL(), origWorkloadId, fmt.Sprintf("--node-tags=%s=%s", models.TagNodeName, "node2"), "--stop")
copyDeploy.Stdout = copyStdOut
be.NilErr(t, copyDeploy.Run())

time.Sleep(500 * time.Millisecond)
node1InfoStdOut := new(bytes.Buffer)
node1InfoStdErr := new(bytes.Buffer)
node1Info := exec.Command(nexCli, "node", "info", "-s", s.ClientURL(), "--json", Node1ServerPublicKey)
node1Info.Stdout = node1InfoStdOut
node1Info.Stderr = node1InfoStdErr
be.NilErr(t, node1Info.Run())

resp := new(gen.NodeInfoResponseJson)
be.NilErr(t, json.Unmarshal(node1InfoStdOut.Bytes(), resp))

be.Equal(t, 0, len(resp.WorkloadSummaries))

be.NilErr(t, stopProcess(nex1.Process))
be.NilErr(t, stopProcess(nex2.Process))
}()

be.NilErr(t, nex1.Wait())
be.NilErr(t, nex2.Wait())
}

0 comments on commit f67be6f

Please sign in to comment.