Skip to content

Commit

Permalink
refactor: make all struct to topology
Browse files Browse the repository at this point in the history
  • Loading branch information
childe committed Dec 11, 2019
1 parent f8699a9 commit 8e8e191
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 177 deletions.
125 changes: 0 additions & 125 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,11 @@ package filter

import (
"plugin"
"reflect"

"github.com/childe/gohangout/condition_filter"
"github.com/childe/gohangout/field_deleter"
"github.com/childe/gohangout/field_setter"
"github.com/childe/gohangout/topology"
"github.com/childe/gohangout/value_render"
"github.com/golang/glog"
)

func BuildFilterBoxes(config map[string]interface{}) []*FilterBox {
if _, ok := config["filters"]; !ok {
return nil
}

filtersI := config["filters"].([]interface{})
filters := make([]topology.Filter, len(filtersI))

for i := 0; i < len(filters); i++ {
for filterTypeI, filterConfigI := range filtersI[i].(map[interface{}]interface{}) {
filterType := filterTypeI.(string)
glog.Infof("filter type: %s", filterType)
filterConfig := filterConfigI.(map[interface{}]interface{})
glog.Infof("filter config: %v", filterConfig)

filterPlugin := BuildFilter(filterType, filterConfig)

filters[i] = filterPlugin
}
}

boxes := make([]*FilterBox, len(filters))
for i := 0; i < len(filters); i++ {
for _, cfg := range filtersI[i].(map[interface{}]interface{}) {
boxes[i] = NewFilterBox(cfg.(map[interface{}]interface{}))
boxes[i].Filter = filters[i]
}
}

return boxes
}

func BuildFilter(filterType string, config map[interface{}]interface{}) topology.Filter {
switch filterType {
case "Add":
Expand Down Expand Up @@ -115,91 +78,3 @@ func BuildFilter(filterType string, config map[interface{}]interface{}) topology
return newFunc.(func(map[interface{}]interface{}) interface{})(config).(topology.Filter)
}
}

type FilterBox struct {
Filter topology.Filter

conditionFilter *condition_filter.ConditionFilter

config map[interface{}]interface{}

failTag string
removeFields []field_deleter.FieldDeleter
addFields map[field_setter.FieldSetter]value_render.ValueRender
}

func NewFilterBox(config map[interface{}]interface{}) *FilterBox {
f := FilterBox{
config: config,
conditionFilter: condition_filter.NewConditionFilter(config),
}

if v, ok := config["failTag"]; ok {
f.failTag = v.(string)
} else {
f.failTag = ""
}

if remove_fields, ok := config["remove_fields"]; ok {
f.removeFields = make([]field_deleter.FieldDeleter, 0)
for _, field := range remove_fields.([]interface{}) {
f.removeFields = append(f.removeFields, field_deleter.NewFieldDeleter(field.(string)))
}
} else {
f.removeFields = nil
}

if add_fields, ok := config["add_fields"]; ok {
f.addFields = make(map[field_setter.FieldSetter]value_render.ValueRender)
for k, v := range add_fields.(map[interface{}]interface{}) {
fieldSetter := field_setter.NewFieldSetter(k.(string))
if fieldSetter == nil {
glog.Fatalf("could build field setter from %s", k.(string))
}
f.addFields[fieldSetter] = value_render.GetValueRender(v.(string))
}
} else {
f.addFields = nil
}
return &f
}

func (f *FilterBox) PostProcess(event map[string]interface{}, success bool) map[string]interface{} {
if success {
if f.removeFields != nil {
for _, d := range f.removeFields {
d.Delete(event)
}
}
for fs, v := range f.addFields {
event = fs.SetField(event, v.Render(event), "", false)
}
} else {
if f.failTag != "" {
if tags, ok := event["tags"]; ok {
if reflect.TypeOf(tags).Kind() == reflect.String {
event["tags"] = []string{tags.(string), f.failTag}
} else if reflect.TypeOf(tags).Kind() == reflect.Array {
event["tags"] = append(tags.([]interface{}), f.failTag)
} else {
}
} else {
event["tags"] = f.failTag
}
}
}
return event
}

func (b *FilterBox) Process(event map[string]interface{}) map[string]interface{} {
var rst bool

if b.conditionFilter.Pass(event) {
event, rst = b.Filter.Filter(event)
if event == nil {
return nil
}
event = b.PostProcess(event, rst)
}
return event
}
6 changes: 3 additions & 3 deletions filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
type FiltersFilter struct {
config map[interface{}]interface{}
processorNode *topology.ProcessorNode
filterBoxes []*FilterBox
filterBoxes []*topology.FilterBox
}

func NewFiltersFilter(config map[interface{}]interface{}) *FiltersFilter {
Expand All @@ -23,7 +23,7 @@ func NewFiltersFilter(config map[interface{}]interface{}) *FiltersFilter {
_config[k.(string)] = v
}

f.filterBoxes = BuildFilterBoxes(_config)
f.filterBoxes = topology.BuildFilterBoxes(_config, BuildFilter)
if len(f.filterBoxes) == 0 {
glog.Fatal("no filters configured in Filters")
}
Expand All @@ -40,7 +40,7 @@ func (f *FiltersFilter) Filter(event map[string]interface{}) (map[string]interfa
}

func (f *FiltersFilter) SetBelongTo(next topology.Processor) {
var b *FilterBox = f.filterBoxes[len(f.filterBoxes)-1]
var b *topology.FilterBox = f.filterBoxes[len(f.filterBoxes)-1]
v := reflect.ValueOf(b.Filter)
fun := v.MethodByName("SetBelongTo")
if fun.IsValid() {
Expand Down
10 changes: 5 additions & 5 deletions input/input_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type InputBox struct {
config map[string]interface{} // whole config
input topology.Input
outputsInAllWorker [][]*output.OutputBox
outputsInAllWorker [][]*topology.OutputBox
stop bool
once sync.Once
shutdownChan chan bool
Expand Down Expand Up @@ -49,17 +49,17 @@ func (box *InputBox) beat(workerIdx int) {
}

func (box *InputBox) buildTopology(workerIdx int) *topology.ProcessorNode {
outputs := output.BuildOutputs(box.config)
outputs := topology.BuildOutputs(box.config, output.BuildOutput)
box.outputsInAllWorker[workerIdx] = outputs

var outputProcessor topology.Processor
if len(outputs) == 1 {
outputProcessor = outputs[0]
} else {
outputProcessor = (output.OutputsProcessor)(outputs)
outputProcessor = (topology.OutputsProcessor)(outputs)
}

filterBoxes := filter.BuildFilterBoxes(box.config)
filterBoxes := topology.BuildFilterBoxes(box.config, filter.BuildFilter)

// Set BelongTo
for i, b := range filterBoxes {
Expand All @@ -83,7 +83,7 @@ func (box *InputBox) buildTopology(workerIdx int) *topology.ProcessorNode {
}

func (box *InputBox) Beat(worker int) {
box.outputsInAllWorker = make([][]*output.OutputBox, worker)
box.outputsInAllWorker = make([][]*topology.OutputBox, worker)
for i := 0; i < worker; i++ {
go box.beat(i)
}
Expand Down
46 changes: 2 additions & 44 deletions output/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,7 @@ import (
"github.com/golang/glog"
)

type OutputBox struct {
topology.Output
*condition_filter.ConditionFilter
}

func BuildOutputs(config map[string]interface{}) []*OutputBox {
rst := make([]*OutputBox, 0)

for _, outputI := range config["outputs"].([]interface{}) {
// len(outputI) is 1
for outputTypeI, outputConfigI := range outputI.(map[interface{}]interface{}) {
outputType := outputTypeI.(string)
glog.Infof("output type: %s", outputType)
outputConfig := outputConfigI.(map[interface{}]interface{})
glog.Infof("output config: %v", outputConfig)
outputPlugin := BuildOutput(outputType, outputConfig)
rst = append(rst, outputPlugin)
}
}
return rst
}

func BuildOutput(outputType string, config map[interface{}]interface{}) *OutputBox {
func BuildOutput(outputType string, config map[interface{}]interface{}) *topology.OutputBox {
var output topology.Output
switch outputType {
case "Dot":
Expand Down Expand Up @@ -59,28 +37,8 @@ func BuildOutput(outputType string, config map[interface{}]interface{}) *OutputB
output = newFunc.(func(map[interface{}]interface{}) interface{})(config).(topology.Output)
}

return &OutputBox{
return &topology.OutputBox{
output,
condition_filter.NewConditionFilter(config),
}
}

// Process implement Processor interface
func (p *OutputBox) Process(event map[string]interface{}) map[string]interface{} {
if p.Pass(event) {
p.Emit(event)
}
return nil
}

type OutputsProcessor []*OutputBox

// Process implement Processor interface
func (p OutputsProcessor) Process(event map[string]interface{}) map[string]interface{} {
for _, o := range ([]*OutputBox)(p) {
if o.Pass(event) {
o.Emit(event)
}
}
return nil
}
Loading

0 comments on commit 8e8e191

Please sign in to comment.