Skip to content

Commit

Permalink
lock before stdin.scan
Browse files Browse the repository at this point in the history
if workers of gohangout is more than one, all goroutines
read stdin at the same time.

maybe one got the first char, and following chars cat captured
by other gorotines
  • Loading branch information
childe committed Aug 15, 2024
1 parent 8dcc4d3 commit 851f288
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
6 changes: 3 additions & 3 deletions input/input_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type InputBox struct {

promCounter prometheus.Counter

shutdownWhenNil bool
exit func()
shutdownWhenNil bool
exit func()

addFields map[field_setter.FieldSetter]value_render.ValueRender
}
Expand All @@ -44,7 +44,7 @@ func NewInputBox(input topology.Input, inputConfig map[interface{}]interface{},

promCounter: topology.GetPromCounter(inputConfig),

exit: exit,
exit: exit,
}
if add_fields, ok := inputConfig["add_fields"]; ok {
b.addFields = make(map[field_setter.FieldSetter]value_render.ValueRender)
Expand Down
14 changes: 8 additions & 6 deletions input/stdin_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package input
import (
"bufio"
"os"
"sync"
"time"

"github.com/childe/gohangout/codec"
Expand All @@ -15,7 +16,7 @@ type StdinInput struct {
decoder codec.Decoder

scanner *bufio.Scanner
messages chan []byte
scanLock sync.Mutex

stop bool
}
Expand All @@ -30,17 +31,18 @@ func newStdinInput(config map[interface{}]interface{}) topology.Input {
codertype = v.(string)
}
p := &StdinInput{

config: config,
decoder: codec.NewDecoder(codertype),
scanner: bufio.NewScanner(os.Stdin),
messages: make(chan []byte, 10),
config: config,
decoder: codec.NewDecoder(codertype),
scanner: bufio.NewScanner(os.Stdin),
}

return p
}

func (p *StdinInput) ReadOneEvent() map[string]interface{} {
p.scanLock.Lock()
defer p.scanLock.Unlock()

if p.scanner.Scan() {
t := p.scanner.Bytes()
msg := make([]byte, len(t))
Expand Down

0 comments on commit 851f288

Please sign in to comment.