From 2811d7bd3e3b70ecc982b02dc742e967769f5d29 Mon Sep 17 00:00:00 2001 From: childe Date: Sun, 20 Oct 2024 12:30:15 +0800 Subject: [PATCH 1/2] fix(reload): stop and then create new inputs --- gohangout.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/gohangout.go b/gohangout.go index 436759a..03bc40c 100644 --- a/gohangout.go +++ b/gohangout.go @@ -114,15 +114,14 @@ func reload() { klog.Errorf("could not parse config, ignore reload: %v", err) return } + klog.Info("stop old inputs") + inputs.stop() + boxes, err := buildPluginLink(gohangoutConfig) if err != nil { klog.Errorf("build plugin link error, ignore reload: %v", err) return } - - klog.Info("stop old inputs") - inputs.stop() - inputs = gohangoutInputs(boxes) klog.Info("start new inputs") go inputs.start() From 540982b4bd61f42a69b0ec33eb72bb0a55b05cde Mon Sep 17 00:00:00 2001 From: childe Date: Sun, 20 Oct 2024 19:33:20 +0800 Subject: [PATCH 2/2] close all connections to client when shutdonw --- input/input_box.go | 2 +- input/tcp_input.go | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/input/input_box.go b/input/input_box.go index 65bf60d..33a24e9 100644 --- a/input/input_box.go +++ b/input/input_box.go @@ -157,6 +157,6 @@ func (box *InputBox) shutdown() { // Shutdown shutdowns the inputs and outputs func (box *InputBox) Shutdown() { - box.shutdown() box.stop = true + box.shutdown() } diff --git a/input/tcp_input.go b/input/tcp_input.go index e077d54..d342606 100644 --- a/input/tcp_input.go +++ b/input/tcp_input.go @@ -19,6 +19,8 @@ type TCPInput struct { l net.Listener messages chan []byte stop bool + + connections []net.Conn } func readLine(scanner *bufio.Scanner, c net.Conn, messages chan<- []byte) { @@ -86,6 +88,7 @@ func newTCPInput(config map[interface{}]interface{}) topology.Input { } klog.Error(err) } else { + p.connections = append(p.connections, conn) scanner := bufio.NewScanner(conn) if v, ok := config["max_length"]; ok { max := v.(int) @@ -109,5 +112,8 @@ func (p *TCPInput) ReadOneEvent() map[string]interface{} { func (p *TCPInput) Shutdown() { p.stop = true p.l.Close() + for _, conn := range p.connections { + conn.Close() + } close(p.messages) }