diff --git a/flow/tasks/os/exec.go b/flow/tasks/os/exec.go index 8dbdb957..82db6520 100644 --- a/flow/tasks/os/exec.go +++ b/flow/tasks/os/exec.go @@ -2,129 +2,205 @@ package os import ( "bytes" + "time" + "context" "fmt" "io" "os" "os/exec" "strings" + "sync" + "syscall" "cuelang.org/go/cue" - hofcontext "github.com/hofstadter-io/hof/flow/context" "github.com/hofstadter-io/hof/lib/cuetils" ) -type Exec struct{} +type Exec struct { + cmd *exec.Cmd + cancel context.CancelFunc + wg sync.WaitGroup +} func NewExec(val cue.Value) (hofcontext.Runner, error) { return &Exec{}, nil } -func (T *Exec) Run(ctx *hofcontext.Context) (interface{}, error) { +func (T *Exec) Run(hctx *hofcontext.Context) (interface{}, error) { + + v := hctx.Value + + // Create a cancellable context for process management + goctx, cancel := context.WithCancel(context.Background()) + T.cancel = cancel + // Add defer at start of Run method + defer func() { + if T.cancel != nil { + T.cancel() + } + }() - v := ctx.Value - var cmd *exec.Cmd + // Setup channels for process management + processComplete := make(chan struct{}, 1) + processError := make(chan error, 1) + // Setup return value with defaults + ret := map[string]interface{}{ + "exitcode": -1, + "success": false, + } + + // Lock CUE access and extract config init_schemas(v.Context()) - // unify with schema v = v.Unify(task_exec) if v.Err() != nil { return nil, cuetils.ExpandCueError(v.Err()) } - // TODO, rework how i/o works for exec + + // Extract all configuration under lock + hctx.CUELock.Lock() + cmds, dir, env, stdin, stdout, stderr, doExit, exterr := extractExecConfig(v) + hctx.CUELock.Unlock() + + if exterr != nil { + return nil, fmt.Errorf("os.Exec field: value extraction failed: %w", exterr) + } - // todo, check failure modes, fill, not return error? - // (in all tasks, really) + // Create and configure command with goctx + T.cmd = exec.CommandContext(goctx, cmds[0], cmds[1:]...) + T.cmd.Dir = dir + T.cmd.Env = env - var stdout, stderr io.Writer + // Setup I/O + if stdin != nil { + T.cmd.Stdin = stdin + } + if stdout != nil { + T.cmd.Stdout = stdout + } + if stderr != nil { + T.cmd.Stderr = stderr + } - ferr := func() error { - ctx.CUELock.Lock() - defer func() { - ctx.CUELock.Unlock() - }() - // get and create command - cmds, err := extractCmd(v) - if err != nil { - return err + // Launch process in background + T.wg.Add(1) + go func() { + defer T.wg.Done() + defer close(processComplete) + defer close(processError) + + // Start the process + if err := T.cmd.Start(); err != nil { + processError <- err + return } - cmd = exec.Command(cmds[0], cmds[1:]...) - // get dir / env for command - dir, err := extractDir(v) + // Wait for process completion + err := T.cmd.Wait() if err != nil { - return err + processError <- err } - cmd.Dir = dir + processComplete <- struct{}{} + }() - env, err := extractEnv(v) - if err != nil { - return err + // Wait for completion, error, or cancellation + select { + case <-processComplete: + // Update return value with process state + if T.cmd.ProcessState != nil { + ret["exitcode"] = T.cmd.ProcessState.ExitCode() + ret["success"] = T.cmd.ProcessState.Success() } - cmd.Env = env + ret, _ = fillIO(v, ret, stdout, stderr) - // setup i/o for command - var stdin io.Reader - stdin, stdout, stderr, err = extractIO(v) - if err != nil { - return err + case err := <-processError: + // Handle error according to user preferences + ret["error"] = err.Error() + if doExit { + return ret, err } - if stdin != nil { - cmd.Stdin = stdin - } - if stdout != nil { - cmd.Stdout = stdout - } - if stderr != nil { - cmd.Stderr = stderr - } + case <-goctx.Done(): + // Flow context cancelled (CTRL-C, timeout, etc) + T.cleanup() + ret["error"] = "flow cancelled" - return nil - }() - if ferr != nil { - return nil, ferr - } + // Capture any output that occurred before cancellation + ret, _ = fillIO(v, ret, stdout, stderr) - // (TODO): check for user's abort mode preference - doExit, err := extractExit(v) - if err != nil { - return nil, err + return ret, goctx.Err() } - // - // run command - // - rerr := cmd.Run() - - // TODO, how to run in the background and wait for signal? - - // build return value - ret := make(map[string]interface{}) + return ret, nil +} - // - // possibly fill stdout/stderr - // - ret, err = fillIO(v, ret, stdout, stderr) - if err != nil { - return nil, err +func (T *Exec) cleanup() { + // Cancel our process context + if T.cancel != nil { + T.cancel() } - // fill exit code / successful - ret["exitcode"] = cmd.ProcessState.ExitCode() - ret["success"] = cmd.ProcessState.Success() + // Try graceful shutdown first + if T.cmd != nil && T.cmd.Process != nil { + _ = T.cmd.Process.Signal(syscall.SIGTERM) + } + // Wait briefly for cleanup + done := make(chan struct{}) + go func() { + T.wg.Wait() + close(done) + }() - if rerr != nil { - ret["error"] = rerr.Error() - if doExit { - fmt.Printf("In %v\n%v", v.Path(), ret) - return ret, rerr - } else if ctx.ShowErrors { - fmt.Printf("In %v\n%v", v.Path(), ret) + select { + case <-done: + return + case <-time.After(3 * time.Second): + // Force kill if still running + if T.cmd != nil && T.cmd.Process != nil { + _ = T.cmd.Process.Kill() } } - return ret, nil +} + +func extractExecConfig(v cue.Value) ( + cmds []string, + dir string, + env []string, + stdin io.Reader, + stdout io.Writer, + stderr io.Writer, + doExit bool, + err error, +) { + // Extract command + cmds, err = extractCmd(v) + if err != nil { + return + } + + // Extract directory + dir, err = extractDir(v) + if err != nil { + return + } + + // Extract environment + env, err = extractEnv(v) + if err != nil { + return + } + + // Extract I/O configuration + stdin, stdout, stderr, err = extractIO(v) + if err != nil { + return + } + + // Extract exit behavior + doExit, err = extractExit(v) + return } func extractCmd(ex cue.Value) ([]string, error) { @@ -154,7 +230,7 @@ func extractCmd(ex cue.Value) ([]string, error) { cmds = append(cmds, c) } default: - return nil, fmt.Errorf("unsupported cmd type: ", val.IncompleteKind()) + return nil, fmt.Errorf("unsupported cmd type: %T", val.IncompleteKind()) } return cmds, nil @@ -213,89 +289,104 @@ func extractEnv(ex cue.Value) ([]string, error) { return nil, nil } +// extractIO handles configuring input/output streams for command execution based on CUE configuration. +// It supports several IO modes for each stream: +// - String input is treated as direct input (except "-" which means use stdin) +// - Bytes input is used directly +// - Boolean true connects to process standard streams +// - Null or unspecified uses sensible defaults func extractIO(ex cue.Value) (Stdin io.Reader, Stdout, Stderr io.Writer, err error) { - // handle stdin, - iv := ex.LookupPath(cue.ParsePath("stdin")) - if iv.Exists() { - switch iv.IncompleteKind() { - case cue.StringKind: - s, err := iv.String() - if err != nil { - return nil, nil, nil, err - } - if s == "-" { - // (BUG): works around centralized printing - Stdin = os.Stdin - } - Stdin = strings.NewReader(s) - - case cue.BytesKind: - b, err := iv.Bytes() - if err != nil { - return nil, nil, nil, err - } - Stdin = bytes.NewReader(b) - - case cue.BoolKind: - Stdin = os.Stdin - - case cue.NullKind: - // do nothing so no Stdin is set - - default: - return nil, nil, nil, fmt.Errorf("unsupported type %v for stdin", iv.IncompleteKind()) - } - } - - // handle Stdout - ov := ex.LookupPath(cue.ParsePath("stdout")) - if !ov.Exists() { - Stdout = os.Stdout - } else { - switch ov.IncompleteKind() { - // we want a bytes writer for Now - // will return the proper format when filling the value back - case cue.StringKind: - fallthrough - case cue.BytesKind: - Stdout = new(bytes.Buffer) - - case cue.BoolKind: - Stdin = os.Stdout - - case cue.NullKind: - // do nothing so no Stdin is set - - default: - return nil, nil, nil, fmt.Errorf("unsupported type %v for stdout", ov.IncompleteKind()) - } - } - - // handle Stderr - ev := ex.LookupPath(cue.ParsePath("stderr")) - if !ev.Exists() { - Stderr = os.Stderr - } else { - switch ev.IncompleteKind() { - // we want a bytes writer for Now - // will return the proper format when filling the value back - case cue.StringKind: - fallthrough - case cue.BytesKind: - Stderr = new(bytes.Buffer) - - case cue.BoolKind: - Stdin = os.Stderr - - case cue.NullKind: - // do nothing so no Stdin is set - - default: - return nil, nil, nil, fmt.Errorf("unsupported type %v for stderr", ev.IncompleteKind()) - } - } - return Stdin, Stdout, Stderr, nil + // handle stdin, + iv := ex.LookupPath(cue.ParsePath("stdin")) + if iv.Exists() { + switch iv.IncompleteKind() { + case cue.StringKind: + s, err := iv.String() + if err != nil { + return nil, nil, nil, err + } + // Special case: "-" means use standard input + // This allows tasks to specify they want to read from stdin + if s == "-" { + // (BUG): works around centralized printing + Stdin = os.Stdin + } else { + // Otherwise create a reader from the string content + Stdin = strings.NewReader(s) + } + + case cue.BytesKind: + b, err := iv.Bytes() + if err != nil { + return nil, nil, nil, err + } + // Create a reader directly from bytes + Stdin = bytes.NewReader(b) + + case cue.BoolKind: + Stdin = os.Stdin + + case cue.NullKind: + // do nothing so no Stdin is set + + default: + return nil, nil, nil, fmt.Errorf("unsupported type %v for stdin", iv.IncompleteKind()) + } + } + + // Handle stdout configuration + ov := ex.LookupPath(cue.ParsePath("stdout")) + if !ov.Exists() { + // Default to process stdout if not specified + Stdout = os.Stdout + } else { + switch ov.IncompleteKind() { + // we want a bytes writer for Now + // will return the proper format when filling the value back + case cue.StringKind: + fallthrough + case cue.BytesKind: + Stdout = new(bytes.Buffer) + + case cue.BoolKind: + Stdout = os.Stdout + + case cue.NullKind: + // do nothing so no Stdout is set + + default: + return nil, nil, nil, fmt.Errorf("unsupported type %v for stdout", ov.IncompleteKind()) + } + } + + // Handle stderr configuration + // This follows the same pattern as stdout + ev := ex.LookupPath(cue.ParsePath("stderr")) + if !ev.Exists() { + // Default to process stderr if not specified + Stderr = os.Stderr + } else { + switch ev.IncompleteKind() { + // we want a bytes writer for Now + // will return the proper format when filling the value back + case cue.StringKind: + fallthrough + case cue.BytesKind: + Stderr = new(bytes.Buffer) + + case cue.BoolKind: + Stderr = os.Stderr + + case cue.NullKind: + // do nothing so no Stderr is set + + default: + return nil, nil, nil, fmt.Errorf("unsupported type %v for stderr", ev.IncompleteKind()) + } + } + + return Stdin, Stdout, Stderr, nil } func fillIO(ex cue.Value, ret map[string]interface{}, Stdout, Stderr io.Writer) (map[string]interface{}, error) { diff --git a/flow/tasks/st/patch.go b/flow/tasks/st/patch.go index c76354bd..c5dd11d7 100644 --- a/flow/tasks/st/patch.go +++ b/flow/tasks/st/patch.go @@ -21,9 +21,9 @@ func (T *Patch) Run(ctx *hofcontext.Context) (interface{}, error) { v := ctx.Value o := v.LookupPath(cue.ParsePath("orig")) - n := v.LookupPath(cue.ParsePath("patch")) + p := v.LookupPath(cue.ParsePath("patch")) - r, err := structural.PatchValue(o, n, nil) + r, err := structural.PatchValue(p, o, nil) if err != nil { return nil, err }