Skip to content

Commit

Permalink
DAOS-11551 control: Flush c output buffer. (#12632) (#12650)
Browse files Browse the repository at this point in the history
Update go code to flush and wait for completion on shutdown.

This avoids loosing any output from the c helper code.

Signed-off-by: Ashley Pittman <ashley.m.pittman@intel.com>
  • Loading branch information
ashleypittman authored Jul 18, 2023
1 parent bea574e commit 2c372ab
Showing 1 changed file with 12 additions and 18 deletions.
30 changes: 12 additions & 18 deletions src/control/cmd/daos/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"fmt"
"io"
"os"
"time"
"unsafe"

"github.com/google/uuid"
Expand Down Expand Up @@ -138,24 +137,22 @@ func freeString(str *C.char) {
C.free(unsafe.Pointer(str))
}

func createWriteStream(ctx context.Context, prefix string, printLn func(line string)) (*C.FILE, func(), error) {
// Create a FILE object for the handler to use for
// printing output or errors, and call the callback
// for each line.
func createWriteStream(ctx context.Context, printLn func(line string)) (*C.FILE, func(), error) {
// Create a FILE object for the handler to use for printing output or errors, and call the
// callback for each line.
r, w, err := os.Pipe()
if err != nil {
return nil, nil, err
}
done := make(chan bool, 1)

stream, err := fd2FILE(w.Fd(), "w")
if err != nil {
return nil, nil, err
}

go func(ctx context.Context, prefix string) {
if prefix != "" {
prefix = ": "
}
go func(ctx context.Context) {
defer close(done)

rdr := bufio.NewReader(r)
for {
Expand All @@ -165,21 +162,20 @@ func createWriteStream(ctx context.Context, prefix string, printLn func(line str
default:
line, err := rdr.ReadString('\n')
if err != nil {
if !(errors.Is(err, io.EOF) || errors.Is(err, os.ErrClosed)) {
if !errors.Is(err, io.EOF) {
printLn(fmt.Sprintf("read err: %s", err))
}
return
}
printLn(fmt.Sprintf("%s%s", prefix, line))
printLn(line)
}
}
}(ctx, prefix)
}(ctx)

return stream, func() {
C.fflush(stream)
C.fclose(stream)
r.Close()
w.Close()
<-done
}, nil
}

Expand Down Expand Up @@ -220,15 +216,15 @@ func allocCmdArgs(log logging.Logger) (ap *C.struct_cmd_args_s, cleanFn func(),
ap.sysname = C.CString(build.DefaultSystemName)

ctx, cancel := context.WithCancel(context.Background())
outStream, outCleanup, err := createWriteStream(ctx, "", log.Info)
outStream, outCleanup, err := createWriteStream(ctx, log.Info)
if err != nil {
freeCmdArgs(ap)
cancel()
return nil, nil, err
}
ap.outstream = outStream

errStream, errCleanup, err := createWriteStream(ctx, "handler", log.Error)
errStream, errCleanup, err := createWriteStream(ctx, log.Error)
if err != nil {
outCleanup()
freeCmdArgs(ap)
Expand All @@ -241,8 +237,6 @@ func allocCmdArgs(log logging.Logger) (ap *C.struct_cmd_args_s, cleanFn func(),
outCleanup()
errCleanup()
freeCmdArgs(ap)
// Give the streams a chance to flush.
time.Sleep(250 * time.Millisecond)
cancel()
}, nil
}
Expand Down

0 comments on commit 2c372ab

Please sign in to comment.