Skip to content

Commit

Permalink
Merge pull request #839 from gkvijay/vpp-integ
Browse files Browse the repository at this point in the history
Added RemoteEndpoint/PolicyRule notifications in netplugin for VPP driver
  • Loading branch information
jojimt authored Apr 21, 2017
2 parents f6c22c9 + 67a1e8f commit b1fbada
Show file tree
Hide file tree
Showing 10 changed files with 413 additions and 10 deletions.
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ type NetworkDriver interface {
CreateEndpoint(id string) error
UpdateEndpointGroup(id string) error
DeleteEndpoint(id string) error
CreateRemoteEndpoint(id string) error
DeleteRemoteEndpoint(id string) error
CreateHostAccPort(portName, globalIP string, nw int) (string, error)
DeleteHostAccPort(id string) error
AddPeerHost(node ServiceInfo) error
Expand All @@ -139,6 +141,8 @@ type NetworkDriver interface {
// Set global config
GlobalConfigUpdate(inst InstanceInfo) error
InspectNameserver() ([]byte, error)
AddPolicyRule(id string) error
DelPolicyRule(id string) error
}

// WatchState is used to provide a difference between core.State structs by
Expand Down
20 changes: 20 additions & 0 deletions drivers/fakenetepdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ func (d *FakeNetEpDriver) DeleteEndpoint(id string) (err error) {
return core.Errorf("Not implemented")
}

// CreateRemoteEndpoint is not implemented.
func (d *FakeNetEpDriver) CreateRemoteEndpoint(id string) error {
return core.Errorf("Not implemented")
}

// DeleteRemoteEndpoint is not implemented.
func (d *FakeNetEpDriver) DeleteRemoteEndpoint(id string) (err error) {
return core.Errorf("Not implemented")
}

// CreateHostAccPort is not implemented.
func (d *FakeNetEpDriver) CreateHostAccPort(id, a string, nw int) (string, error) {
return "", core.Errorf("Not implemented")
Expand Down Expand Up @@ -123,3 +133,13 @@ func (d *FakeNetEpDriver) GlobalConfigUpdate(inst core.InstanceInfo) error {
func (d *FakeNetEpDriver) InspectNameserver() ([]byte, error) {
return []byte{}, core.Errorf("Not implemented")
}

// AddPolicyRule is not implemented
func (d *FakeNetEpDriver) AddPolicyRule(id string) error {
return core.Errorf("Not implemented")
}

// DelPolicyRule is not implemented
func (d *FakeNetEpDriver) DelPolicyRule(id string) error {
return core.Errorf("Not implemented")
}
26 changes: 25 additions & 1 deletion drivers/ovsdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,19 @@ func (d *OvsDriver) DeleteEndpoint(id string) error {
return nil
}

// CreateRemoteEndpoint creates a remote endpoint by named identifier
func (d *OvsDriver) CreateRemoteEndpoint(id string) error {

log.Debug("OVS driver ignoring remote EP create as it uses its own EP sync")
return nil
}

// DeleteRemoteEndpoint deletes a remote endpoint by named identifier
func (d *OvsDriver) DeleteRemoteEndpoint(id string) error {
log.Debug("OVS driver ignoring remote EP delete as it uses its own EP sync")
return nil
}

// AddPeerHost adds VTEPs if necessary
func (d *OvsDriver) AddPeerHost(node core.ServiceInfo) error {
// Nothing to do if this is our own IP
Expand Down Expand Up @@ -834,7 +847,6 @@ func (d *OvsDriver) GlobalConfigUpdate(inst core.InstanceInfo) error {
func (d *OvsDriver) InspectNameserver() ([]byte, error) {
if d.nameServer == nil {
return []byte{}, nil

}

ns, err := d.nameServer.InspectState()
Expand All @@ -846,3 +858,15 @@ func (d *OvsDriver) InspectNameserver() ([]byte, error) {

return jsonState, nil
}

// AddPolicyRule creates a policy rule
func (d *OvsDriver) AddPolicyRule(id string) error {
log.Debug("OVS driver ignoring PolicyRule create as it uses ofnet sync")
return nil
}

// DelPolicyRule deletes a policy rule
func (d *OvsDriver) DelPolicyRule(id string) error {
log.Debug("OVS driver ignoring PolicyRule delete as it uses ofnet sync")
return nil
}
20 changes: 20 additions & 0 deletions mgmtfn/k8splugin/kubeClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ func (d *KubeTestNetDrv) DeleteEndpoint(id string) (err error) {
return nil
}

// CreateRemoteEndpoint is not implemented.
func (d *KubeTestNetDrv) CreateRemoteEndpoint(id string) error {
return core.Errorf("Not implemented")
}

// DeleteRemoteEndpoint is not implemented.
func (d *KubeTestNetDrv) DeleteRemoteEndpoint(id string) (err error) {
return core.Errorf("Not implemented")
}

// AddPeerHost is not implemented.
func (d *KubeTestNetDrv) AddPeerHost(node core.ServiceInfo) error {
return nil
Expand Down Expand Up @@ -176,6 +186,16 @@ func (d *KubeTestNetDrv) InspectNameserver() ([]byte, error) {
return []byte{}, core.Errorf("Not implemented")
}

// AddPolicyRule is not implemented
func (d *KubeTestNetDrv) AddPolicyRule(id string) error {
return core.Errorf("Not implemented")
}

// DelPolicyRule is not implemented
func (d *KubeTestNetDrv) DelPolicyRule(id string) error {
return core.Errorf("Not implemented")
}

// AddSvcSpec is implemented.
func (d *KubeTestNetDrv) AddSvcSpec(svcName string, spec *core.ServiceSpec) error {
d.services[svcName] = spec
Expand Down
84 changes: 84 additions & 0 deletions netmaster/mastercfg/policyRuleState.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/***
Copyright 2017 Cisco Systems Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mastercfg

import (
"encoding/json"
"fmt"

"github.com/contiv/netplugin/core"
"github.com/contiv/ofnet"
)

const (
policyRuleConfigPathPrefix = StateConfigPath + "policyRule/"
policyRuleConfigPath = policyRuleConfigPathPrefix + "%s"
)

// CfgPolicyRule implements the State interface for policy rules
type CfgPolicyRule struct {
core.CommonState
ofnet.OfnetPolicyRule
}

// Write the state.
func (s *CfgPolicyRule) Write() error {
key := fmt.Sprintf(policyRuleConfigPath, s.RuleId)
return s.StateDriver.WriteState(key, s, json.Marshal)
}

// Read the state for a given identifier.
func (s *CfgPolicyRule) Read(id string) error {
key := fmt.Sprintf(policyRuleConfigPath, id)
return s.StateDriver.ReadState(key, s, json.Unmarshal)
}

// ReadAll reads all state objects for the policy rules.
func (s *CfgPolicyRule) ReadAll() ([]core.State, error) {
return s.StateDriver.ReadAllState(policyRuleConfigPathPrefix, s, json.Unmarshal)
}

// WatchAll fills a channel on each state event related to policy rules.
func (s *CfgPolicyRule) WatchAll(rsps chan core.WatchState) error {
return s.StateDriver.WatchAllState(policyRuleConfigPathPrefix, s, json.Unmarshal,
rsps)
}

// Clear removes the state.
func (s *CfgPolicyRule) Clear() error {
key := fmt.Sprintf(policyRuleConfigPath, s.RuleId)
return s.StateDriver.ClearState(key)
}

// addPolicyRuleState adds policy rule to state store
func addPolicyRuleState(ofnetRule *ofnet.OfnetPolicyRule) error {
ruleCfg := &CfgPolicyRule{}
ruleCfg.StateDriver = stateStore
ruleCfg.OfnetPolicyRule = (*ofnetRule)

// Save the rule
return ruleCfg.Write()
}

// delPolicyRuleState deletes policy rule from state store
func delPolicyRuleState(ofnetRule *ofnet.OfnetPolicyRule) error {
ruleCfg := &CfgPolicyRule{}
ruleCfg.StateDriver = stateStore
ruleCfg.OfnetPolicyRule = (*ofnetRule)

// Delete the rule
return ruleCfg.Clear()
}
119 changes: 119 additions & 0 deletions netmaster/mastercfg/policyRuleState_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/***
Copyright 2014 Cisco Systems Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mastercfg

import (
"testing"

"github.com/contiv/netplugin/core"
)

const (
testRuleID = "testPolicyRule"
ruleCfgKey = policyRuleConfigPathPrefix + testRuleID
)

type testRuleStateDriver struct{}

var policyRuleStateDriver = &testRuleStateDriver{}

func (d *testRuleStateDriver) Init(instInfo *core.InstanceInfo) error {
return core.Errorf("Shouldn't be called!")
}

func (d *testRuleStateDriver) Deinit() {
}

func (d *testRuleStateDriver) Write(key string, value []byte) error {
return core.Errorf("Shouldn't be called!")
}

func (d *testRuleStateDriver) Read(key string) ([]byte, error) {
return []byte{}, core.Errorf("Shouldn't be called!")
}

func (d *testRuleStateDriver) ReadAll(baseKey string) ([][]byte, error) {
return [][]byte{}, core.Errorf("Shouldn't be called!")
}

func (d *testRuleStateDriver) WatchAll(baseKey string, rsps chan [2][]byte) error {
return core.Errorf("not supported")
}

func (d *testRuleStateDriver) validateKey(key string) error {
if key != ruleCfgKey {
return core.Errorf("Unexpected key. recvd: %s expected: %s ",
key, ruleCfgKey)
}

return nil
}

func (d *testRuleStateDriver) ClearState(key string) error {
return d.validateKey(key)
}

func (d *testRuleStateDriver) ReadState(key string, value core.State,
unmarshal func([]byte, interface{}) error) error {
return d.validateKey(key)
}

func (d *testRuleStateDriver) ReadAllState(key string, value core.State,
unmarshal func([]byte, interface{}) error) ([]core.State, error) {
return nil, core.Errorf("Shouldn't be called!")
}

func (d *testRuleStateDriver) WatchAllState(baseKey string, sType core.State,
unmarshal func([]byte, interface{}) error, rsps chan core.WatchState) error {
return core.Errorf("not supported")
}

func (d *testRuleStateDriver) WriteState(key string, value core.State,
marshal func(interface{}) ([]byte, error)) error {
return d.validateKey(key)
}

func TestCfgPolicyRuleRead(t *testing.T) {
ruleCfg := &CfgPolicyRule{}
ruleCfg.StateDriver = policyRuleStateDriver

err := ruleCfg.Read(testRuleID)
if err != nil {
t.Fatalf("read config state failed. Error: %s", err)
}
}

func TestCfgPolicyRuleWrite(t *testing.T) {
ruleCfg := &CfgPolicyRule{}
ruleCfg.StateDriver = policyRuleStateDriver
ruleCfg.RuleId = testRuleID

err := ruleCfg.Write()
if err != nil {
t.Fatalf("write config state failed. Error: %s", err)
}
}

func TestCfgPolicyRuleClear(t *testing.T) {
ruleCfg := &CfgPolicyRule{}
ruleCfg.StateDriver = policyRuleStateDriver
ruleCfg.RuleId = testRuleID

err := ruleCfg.Clear()
if err != nil {
t.Fatalf("clear config state failed. Error: %s", err)
}
}
13 changes: 13 additions & 0 deletions netmaster/mastercfg/policyState.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ func (gp *EpgPolicy) createOfnetRule(rule *contivModel.Rule, dir string) (*ofnet
return nil, err
}

// Send AddRule to netplugin agents
err = addPolicyRuleState(ofnetRule)
if err != nil {
log.Errorf("Error creating rule {%+v}. Err: %v", ofnetRule, err)
return nil, err
}

log.Infof("Added rule {%+v} to policyDB", ofnetRule)

return ofnetRule, nil
Expand Down Expand Up @@ -371,6 +378,12 @@ func (gp *EpgPolicy) DelRule(rule *contivModel.Rule) error {
if err != nil {
log.Errorf("Error deleting the ofnet rule {%+v}. Err: %v", ofnetRule, err)
}

// Send DelRule to netplugin agents
err = delPolicyRuleState(ofnetRule)
if err != nil {
log.Errorf("Error deleting the ofnet rule {%+v}. Err: %v", ofnetRule, err)
}
}

// delete the cache
Expand Down
4 changes: 4 additions & 0 deletions netplugin/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ func (ag *Agent) HandleEvents() error {

go handleBgpEvents(ag.netPlugin, opts, recvErr)

go handleEndpointEvents(ag.netPlugin, opts, recvErr)

go handleEpgEvents(ag.netPlugin, opts, recvErr)

go handleServiceLBEvents(ag.netPlugin, opts, recvErr)
Expand All @@ -214,6 +216,8 @@ func (ag *Agent) HandleEvents() error {

go handleGlobalCfgEvents(ag.netPlugin, opts, recvErr)

go handlePolicyRuleEvents(ag.netPlugin, opts, recvErr)

if ag.pluginConfig.Instance.PluginMode == "docker" {
go ag.monitorDockerEvents(recvErr)
} else if ag.pluginConfig.Instance.PluginMode == "kubernetes" {
Expand Down
Loading

0 comments on commit b1fbada

Please sign in to comment.