Skip to content

Commit

Permalink
Merge pull request #124 from WideLee/master
Browse files Browse the repository at this point in the history
修复SetBelongTo无法递归调用接下来的Processor的问题
  • Loading branch information
childe committed Feb 28, 2021
2 parents daf6e62 + e58a7a4 commit 476ec49
Showing 1 changed file with 11 additions and 11 deletions.
22 changes: 11 additions & 11 deletions input/input_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,24 @@ func (box *InputBox) buildTopology(workerIdx int) *topology.ProcessorNode {

filterBoxes := topology.BuildFilterBoxes(box.config, filter.BuildFilter)

var firstNode *topology.ProcessorNode
for _, b := range filterBoxes {
firstNode = topology.AppendProcessorsToLink(firstNode, b)
}
firstNode = topology.AppendProcessorsToLink(firstNode, outputProcessor)

// Set BelongTo
for i, b := range filterBoxes {
var node *topology.ProcessorNode
node = firstNode
for _, b := range filterBoxes {
node = node.Next
v := reflect.ValueOf(b.Filter)
f := v.MethodByName("SetBelongTo")
if f.IsValid() {
if i == len(filterBoxes)-1 {
f.Call([]reflect.Value{reflect.ValueOf(outputProcessor)})
} else {
f.Call([]reflect.Value{reflect.ValueOf(filterBoxes[i+1])})
}
f.Call([]reflect.Value{reflect.ValueOf(node)})
}
}

var firstNode *topology.ProcessorNode
for _, b := range filterBoxes {
firstNode = topology.AppendProcessorsToLink(firstNode, b)
}
firstNode = topology.AppendProcessorsToLink(firstNode, outputProcessor)
return firstNode
}

Expand Down

0 comments on commit 476ec49

Please sign in to comment.