From e58a7a43ad6cc250673dd7cdaeada358e39e2bf5 Mon Sep 17 00:00:00 2001 From: Mingkuan Li Date: Fri, 26 Feb 2021 17:09:56 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DSetBelongTo=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E9=80=92=E5=BD=92=E8=B0=83=E7=94=A8=E6=8E=A5=E4=B8=8B=E6=9D=A5?= =?UTF-8?q?=E7=9A=84Processor=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- input/input_box.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/input/input_box.go b/input/input_box.go index e1c161ec..5e1e1eaf 100644 --- a/input/input_box.go +++ b/input/input_box.go @@ -82,24 +82,24 @@ func (box *InputBox) buildTopology(workerIdx int) *topology.ProcessorNode { filterBoxes := topology.BuildFilterBoxes(box.config, filter.BuildFilter) + var firstNode *topology.ProcessorNode + for _, b := range filterBoxes { + firstNode = topology.AppendProcessorsToLink(firstNode, b) + } + firstNode = topology.AppendProcessorsToLink(firstNode, outputProcessor) + // Set BelongTo - for i, b := range filterBoxes { + var node *topology.ProcessorNode + node = firstNode + for _, b := range filterBoxes { + node = node.Next v := reflect.ValueOf(b.Filter) f := v.MethodByName("SetBelongTo") if f.IsValid() { - if i == len(filterBoxes)-1 { - f.Call([]reflect.Value{reflect.ValueOf(outputProcessor)}) - } else { - f.Call([]reflect.Value{reflect.ValueOf(filterBoxes[i+1])}) - } + f.Call([]reflect.Value{reflect.ValueOf(node)}) } } - var firstNode *topology.ProcessorNode - for _, b := range filterBoxes { - firstNode = topology.AppendProcessorsToLink(firstNode, b) - } - firstNode = topology.AppendProcessorsToLink(firstNode, outputProcessor) return firstNode }