diff --git a/input/input_box.go b/input/input_box.go index e1c161ec..5e1e1eaf 100644 --- a/input/input_box.go +++ b/input/input_box.go @@ -82,24 +82,24 @@ func (box *InputBox) buildTopology(workerIdx int) *topology.ProcessorNode { filterBoxes := topology.BuildFilterBoxes(box.config, filter.BuildFilter) + var firstNode *topology.ProcessorNode + for _, b := range filterBoxes { + firstNode = topology.AppendProcessorsToLink(firstNode, b) + } + firstNode = topology.AppendProcessorsToLink(firstNode, outputProcessor) + // Set BelongTo - for i, b := range filterBoxes { + var node *topology.ProcessorNode + node = firstNode + for _, b := range filterBoxes { + node = node.Next v := reflect.ValueOf(b.Filter) f := v.MethodByName("SetBelongTo") if f.IsValid() { - if i == len(filterBoxes)-1 { - f.Call([]reflect.Value{reflect.ValueOf(outputProcessor)}) - } else { - f.Call([]reflect.Value{reflect.ValueOf(filterBoxes[i+1])}) - } + f.Call([]reflect.Value{reflect.ValueOf(node)}) } } - var firstNode *topology.ProcessorNode - for _, b := range filterBoxes { - firstNode = topology.AppendProcessorsToLink(firstNode, b) - } - firstNode = topology.AppendProcessorsToLink(firstNode, outputProcessor) return firstNode }