diff --git a/core/core.go b/core/core.go index cbc8cbd4a..1e3037080 100755 --- a/core/core.go +++ b/core/core.go @@ -116,6 +116,8 @@ type NetworkDriver interface { CreateEndpoint(id string) error UpdateEndpointGroup(id string) error DeleteEndpoint(id string) error + CreateRemoteEndpoint(id string) error + DeleteRemoteEndpoint(id string) error CreateHostAccPort(portName, globalIP string, nw int) (string, error) DeleteHostAccPort(id string) error AddPeerHost(node ServiceInfo) error @@ -139,6 +141,8 @@ type NetworkDriver interface { // Set global config GlobalConfigUpdate(inst InstanceInfo) error InspectNameserver() ([]byte, error) + AddPolicyRule(id string) error + DelPolicyRule(id string) error } // WatchState is used to provide a difference between core.State structs by diff --git a/drivers/fakenetepdriver.go b/drivers/fakenetepdriver.go index 07ea71095..09596cdb9 100755 --- a/drivers/fakenetepdriver.go +++ b/drivers/fakenetepdriver.go @@ -45,6 +45,16 @@ func (d *FakeNetEpDriver) DeleteEndpoint(id string) (err error) { return core.Errorf("Not implemented") } +// CreateRemoteEndpoint is not implemented. +func (d *FakeNetEpDriver) CreateRemoteEndpoint(id string) error { + return core.Errorf("Not implemented") +} + +// DeleteRemoteEndpoint is not implemented. +func (d *FakeNetEpDriver) DeleteRemoteEndpoint(id string) (err error) { + return core.Errorf("Not implemented") +} + // CreateHostAccPort is not implemented. func (d *FakeNetEpDriver) CreateHostAccPort(id, a string, nw int) (string, error) { return "", core.Errorf("Not implemented") @@ -123,3 +133,13 @@ func (d *FakeNetEpDriver) GlobalConfigUpdate(inst core.InstanceInfo) error { func (d *FakeNetEpDriver) InspectNameserver() ([]byte, error) { return []byte{}, core.Errorf("Not implemented") } + +// AddPolicyRule is not implemented +func (d *FakeNetEpDriver) AddPolicyRule(id string) error { + return core.Errorf("Not implemented") +} + +// DelPolicyRule is not implemented +func (d *FakeNetEpDriver) DelPolicyRule(id string) error { + return core.Errorf("Not implemented") +} diff --git a/drivers/ovsdriver.go b/drivers/ovsdriver.go index 1e7d27ca4..df7a5a32b 100644 --- a/drivers/ovsdriver.go +++ b/drivers/ovsdriver.go @@ -556,6 +556,19 @@ func (d *OvsDriver) DeleteEndpoint(id string) error { return nil } +// CreateRemoteEndpoint creates a remote endpoint by named identifier +func (d *OvsDriver) CreateRemoteEndpoint(id string) error { + + log.Debug("OVS driver ignoring remote EP create as it uses its own EP sync") + return nil +} + +// DeleteRemoteEndpoint deletes a remote endpoint by named identifier +func (d *OvsDriver) DeleteRemoteEndpoint(id string) error { + log.Debug("OVS driver ignoring remote EP delete as it uses its own EP sync") + return nil +} + // AddPeerHost adds VTEPs if necessary func (d *OvsDriver) AddPeerHost(node core.ServiceInfo) error { // Nothing to do if this is our own IP @@ -834,7 +847,6 @@ func (d *OvsDriver) GlobalConfigUpdate(inst core.InstanceInfo) error { func (d *OvsDriver) InspectNameserver() ([]byte, error) { if d.nameServer == nil { return []byte{}, nil - } ns, err := d.nameServer.InspectState() @@ -846,3 +858,15 @@ func (d *OvsDriver) InspectNameserver() ([]byte, error) { return jsonState, nil } + +// AddPolicyRule creates a policy rule +func (d *OvsDriver) AddPolicyRule(id string) error { + log.Debug("OVS driver ignoring PolicyRule create as it uses ofnet sync") + return nil +} + +// DelPolicyRule deletes a policy rule +func (d *OvsDriver) DelPolicyRule(id string) error { + log.Debug("OVS driver ignoring PolicyRule delete as it uses ofnet sync") + return nil +} diff --git a/mgmtfn/k8splugin/kubeClient_test.go b/mgmtfn/k8splugin/kubeClient_test.go index b3f3e7384..83bd33d0c 100644 --- a/mgmtfn/k8splugin/kubeClient_test.go +++ b/mgmtfn/k8splugin/kubeClient_test.go @@ -111,6 +111,16 @@ func (d *KubeTestNetDrv) DeleteEndpoint(id string) (err error) { return nil } +// CreateRemoteEndpoint is not implemented. +func (d *KubeTestNetDrv) CreateRemoteEndpoint(id string) error { + return core.Errorf("Not implemented") +} + +// DeleteRemoteEndpoint is not implemented. +func (d *KubeTestNetDrv) DeleteRemoteEndpoint(id string) (err error) { + return core.Errorf("Not implemented") +} + // AddPeerHost is not implemented. func (d *KubeTestNetDrv) AddPeerHost(node core.ServiceInfo) error { return nil @@ -176,6 +186,16 @@ func (d *KubeTestNetDrv) InspectNameserver() ([]byte, error) { return []byte{}, core.Errorf("Not implemented") } +// AddPolicyRule is not implemented +func (d *KubeTestNetDrv) AddPolicyRule(id string) error { + return core.Errorf("Not implemented") +} + +// DelPolicyRule is not implemented +func (d *KubeTestNetDrv) DelPolicyRule(id string) error { + return core.Errorf("Not implemented") +} + // AddSvcSpec is implemented. func (d *KubeTestNetDrv) AddSvcSpec(svcName string, spec *core.ServiceSpec) error { d.services[svcName] = spec diff --git a/netmaster/mastercfg/policyRuleState.go b/netmaster/mastercfg/policyRuleState.go new file mode 100755 index 000000000..0479f076a --- /dev/null +++ b/netmaster/mastercfg/policyRuleState.go @@ -0,0 +1,84 @@ +/*** +Copyright 2017 Cisco Systems Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mastercfg + +import ( + "encoding/json" + "fmt" + + "github.com/contiv/netplugin/core" + "github.com/contiv/ofnet" +) + +const ( + policyRuleConfigPathPrefix = StateConfigPath + "policyRule/" + policyRuleConfigPath = policyRuleConfigPathPrefix + "%s" +) + +// CfgPolicyRule implements the State interface for policy rules +type CfgPolicyRule struct { + core.CommonState + ofnet.OfnetPolicyRule +} + +// Write the state. +func (s *CfgPolicyRule) Write() error { + key := fmt.Sprintf(policyRuleConfigPath, s.RuleId) + return s.StateDriver.WriteState(key, s, json.Marshal) +} + +// Read the state for a given identifier. +func (s *CfgPolicyRule) Read(id string) error { + key := fmt.Sprintf(policyRuleConfigPath, id) + return s.StateDriver.ReadState(key, s, json.Unmarshal) +} + +// ReadAll reads all state objects for the policy rules. +func (s *CfgPolicyRule) ReadAll() ([]core.State, error) { + return s.StateDriver.ReadAllState(policyRuleConfigPathPrefix, s, json.Unmarshal) +} + +// WatchAll fills a channel on each state event related to policy rules. +func (s *CfgPolicyRule) WatchAll(rsps chan core.WatchState) error { + return s.StateDriver.WatchAllState(policyRuleConfigPathPrefix, s, json.Unmarshal, + rsps) +} + +// Clear removes the state. +func (s *CfgPolicyRule) Clear() error { + key := fmt.Sprintf(policyRuleConfigPath, s.RuleId) + return s.StateDriver.ClearState(key) +} + +// addPolicyRuleState adds policy rule to state store +func addPolicyRuleState(ofnetRule *ofnet.OfnetPolicyRule) error { + ruleCfg := &CfgPolicyRule{} + ruleCfg.StateDriver = stateStore + ruleCfg.OfnetPolicyRule = (*ofnetRule) + + // Save the rule + return ruleCfg.Write() +} + +// delPolicyRuleState deletes policy rule from state store +func delPolicyRuleState(ofnetRule *ofnet.OfnetPolicyRule) error { + ruleCfg := &CfgPolicyRule{} + ruleCfg.StateDriver = stateStore + ruleCfg.OfnetPolicyRule = (*ofnetRule) + + // Delete the rule + return ruleCfg.Clear() +} diff --git a/netmaster/mastercfg/policyRuleState_test.go b/netmaster/mastercfg/policyRuleState_test.go new file mode 100644 index 000000000..a3199ed66 --- /dev/null +++ b/netmaster/mastercfg/policyRuleState_test.go @@ -0,0 +1,119 @@ +/*** +Copyright 2014 Cisco Systems Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mastercfg + +import ( + "testing" + + "github.com/contiv/netplugin/core" +) + +const ( + testRuleID = "testPolicyRule" + ruleCfgKey = policyRuleConfigPathPrefix + testRuleID +) + +type testRuleStateDriver struct{} + +var policyRuleStateDriver = &testRuleStateDriver{} + +func (d *testRuleStateDriver) Init(instInfo *core.InstanceInfo) error { + return core.Errorf("Shouldn't be called!") +} + +func (d *testRuleStateDriver) Deinit() { +} + +func (d *testRuleStateDriver) Write(key string, value []byte) error { + return core.Errorf("Shouldn't be called!") +} + +func (d *testRuleStateDriver) Read(key string) ([]byte, error) { + return []byte{}, core.Errorf("Shouldn't be called!") +} + +func (d *testRuleStateDriver) ReadAll(baseKey string) ([][]byte, error) { + return [][]byte{}, core.Errorf("Shouldn't be called!") +} + +func (d *testRuleStateDriver) WatchAll(baseKey string, rsps chan [2][]byte) error { + return core.Errorf("not supported") +} + +func (d *testRuleStateDriver) validateKey(key string) error { + if key != ruleCfgKey { + return core.Errorf("Unexpected key. recvd: %s expected: %s ", + key, ruleCfgKey) + } + + return nil +} + +func (d *testRuleStateDriver) ClearState(key string) error { + return d.validateKey(key) +} + +func (d *testRuleStateDriver) ReadState(key string, value core.State, + unmarshal func([]byte, interface{}) error) error { + return d.validateKey(key) +} + +func (d *testRuleStateDriver) ReadAllState(key string, value core.State, + unmarshal func([]byte, interface{}) error) ([]core.State, error) { + return nil, core.Errorf("Shouldn't be called!") +} + +func (d *testRuleStateDriver) WatchAllState(baseKey string, sType core.State, + unmarshal func([]byte, interface{}) error, rsps chan core.WatchState) error { + return core.Errorf("not supported") +} + +func (d *testRuleStateDriver) WriteState(key string, value core.State, + marshal func(interface{}) ([]byte, error)) error { + return d.validateKey(key) +} + +func TestCfgPolicyRuleRead(t *testing.T) { + ruleCfg := &CfgPolicyRule{} + ruleCfg.StateDriver = policyRuleStateDriver + + err := ruleCfg.Read(testRuleID) + if err != nil { + t.Fatalf("read config state failed. Error: %s", err) + } +} + +func TestCfgPolicyRuleWrite(t *testing.T) { + ruleCfg := &CfgPolicyRule{} + ruleCfg.StateDriver = policyRuleStateDriver + ruleCfg.RuleId = testRuleID + + err := ruleCfg.Write() + if err != nil { + t.Fatalf("write config state failed. Error: %s", err) + } +} + +func TestCfgPolicyRuleClear(t *testing.T) { + ruleCfg := &CfgPolicyRule{} + ruleCfg.StateDriver = policyRuleStateDriver + ruleCfg.RuleId = testRuleID + + err := ruleCfg.Clear() + if err != nil { + t.Fatalf("clear config state failed. Error: %s", err) + } +} diff --git a/netmaster/mastercfg/policyState.go b/netmaster/mastercfg/policyState.go index 1f0f26db6..da5b733c8 100644 --- a/netmaster/mastercfg/policyState.go +++ b/netmaster/mastercfg/policyState.go @@ -293,6 +293,13 @@ func (gp *EpgPolicy) createOfnetRule(rule *contivModel.Rule, dir string) (*ofnet return nil, err } + // Send AddRule to netplugin agents + err = addPolicyRuleState(ofnetRule) + if err != nil { + log.Errorf("Error creating rule {%+v}. Err: %v", ofnetRule, err) + return nil, err + } + log.Infof("Added rule {%+v} to policyDB", ofnetRule) return ofnetRule, nil @@ -371,6 +378,12 @@ func (gp *EpgPolicy) DelRule(rule *contivModel.Rule) error { if err != nil { log.Errorf("Error deleting the ofnet rule {%+v}. Err: %v", ofnetRule, err) } + + // Send DelRule to netplugin agents + err = delPolicyRuleState(ofnetRule) + if err != nil { + log.Errorf("Error deleting the ofnet rule {%+v}. Err: %v", ofnetRule, err) + } } // delete the cache diff --git a/netplugin/agent/agent.go b/netplugin/agent/agent.go index 5830cb403..d6585d97b 100644 --- a/netplugin/agent/agent.go +++ b/netplugin/agent/agent.go @@ -206,6 +206,8 @@ func (ag *Agent) HandleEvents() error { go handleBgpEvents(ag.netPlugin, opts, recvErr) + go handleEndpointEvents(ag.netPlugin, opts, recvErr) + go handleEpgEvents(ag.netPlugin, opts, recvErr) go handleServiceLBEvents(ag.netPlugin, opts, recvErr) @@ -214,6 +216,8 @@ func (ag *Agent) HandleEvents() error { go handleGlobalCfgEvents(ag.netPlugin, opts, recvErr) + go handlePolicyRuleEvents(ag.netPlugin, opts, recvErr) + if ag.pluginConfig.Instance.PluginMode == "docker" { go ag.monitorDockerEvents(recvErr) } else if ag.pluginConfig.Instance.PluginMode == "kubernetes" { diff --git a/netplugin/agent/state_event.go b/netplugin/agent/state_event.go index c39571690..0950c4d49 100644 --- a/netplugin/agent/state_event.go +++ b/netplugin/agent/state_event.go @@ -35,7 +35,7 @@ const ( contivVxGWName = "contivh1" ) -func skipHost(vtepIP, homingHost, myHostLabel string) bool { +func checkRemoteHost(vtepIP, homingHost, myHostLabel string) bool { return (vtepIP == "" && homingHost != myHostLabel || vtepIP != "" && homingHost == myHostLabel) } @@ -207,9 +207,9 @@ func processNetEvent(netPlugin *plugin.NetPlugin, nwCfg *mastercfg.CfgNetworkSta } } if err != nil { - log.Errorf("Network operation %s failed. Error: %s", operStr, err) + log.Errorf("Network %s operation %s failed. Error: %s", nwCfg.ID, operStr, err) } else { - log.Infof("Network operation %s succeeded", operStr) + log.Infof("Network %s operation %s succeeded", nwCfg.ID, operStr) } return @@ -231,15 +231,17 @@ func processEpState(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, epID st log.Errorf("Failed to read config for ep '%s' \n", epID) return err } - // if the endpoint is not for this host, ignore it - if skipHost(epCfg.VtepIP, epCfg.HomingHost, opts.HostLabel) { - log.Infof("skipping mismatching host for ep %s. EP's host %s (my host: %s)", - epID, epCfg.HomingHost, opts.HostLabel) - return nil + eptype := "local" + if checkRemoteHost(epCfg.VtepIP, epCfg.HomingHost, opts.HostLabel) { + eptype = "remote" } // Create the endpoint - err = netPlugin.CreateEndpoint(epID) + if eptype == "local" { + err = netPlugin.CreateEndpoint(epID) + } else { + err = netPlugin.CreateRemoteEndpoint(epID) + } if err != nil { log.Errorf("Endpoint operation create failed. Error: %s", err) return err @@ -250,6 +252,38 @@ func processEpState(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, epID st return err } +// processRemoteEpState updates endpoint state +func processRemoteEpState(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, epCfg *mastercfg.CfgEndpointState, isDelete bool) error { + // take a lock to ensure we are programming one event at a time. + netPlugin.Lock() + defer netPlugin.Unlock() + + if !checkRemoteHost(epCfg.VtepIP, epCfg.HomingHost, opts.HostLabel) { + // Skip local endpoint update, as they are handled directly in dockplugin + return nil + } + + if isDelete { + // Delete remote endpoint + err := netPlugin.DeleteRemoteEndpoint(epCfg.ID) + if err != nil { + log.Errorf("Endpoint %s delete operation failed. Error: %s", epCfg.ID, err) + return err + } + log.Infof("Endpoint %s delete operation succeeded", epCfg.ID) + } else { + // Create remote endpoint + err := netPlugin.CreateRemoteEndpoint(epCfg.ID) + if err != nil { + log.Errorf("Endpoint %s create operation failed. Error: %s", epCfg.ID, err) + return err + } + log.Infof("Endpoint %s create operation succeeded", epCfg.ID) + } + + return nil +} + //processBgpEvent processes Bgp neighbor add/delete events func processBgpEvent(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, hostID string, isDelete bool) error { var err error @@ -446,6 +480,41 @@ func processSvcProviderUpdEvent(netPlugin *plugin.NetPlugin, svcProvider *master return nil } +// processPolicyRuleState updates policy rule state +func processPolicyRuleState(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, ruleID string, isDelete bool) error { + netPlugin.Lock() + defer netPlugin.Unlock() + + // read policy config + ruleCfg := &mastercfg.CfgPolicyRule{} + ruleCfg.StateDriver = netPlugin.StateDriver + + err := ruleCfg.Read(ruleID) + if err != nil { + log.Errorf("Failed to read config for policy rule '%s' \n", ruleID) + return err + } + if isDelete { + // Delete endpoint + err = netPlugin.DelPolicyRule(ruleID) + if err != nil { + log.Errorf("PolicyRule %s delete operation failed. Error: %s", ruleID, err) + return err + } + log.Infof("PolicyRule %s delete operation succeeded", ruleID) + } else { + // Create endpoint + err = netPlugin.AddPolicyRule(ruleID) + if err != nil { + log.Errorf("PolicyRule %s create operation failed. Error: %s", ruleID, err) + return err + } + log.Infof("PolicyRule %s create operation succeeded", ruleID) + } + + return err +} + func processStateEvent(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, rsps chan core.WatchState) { for { // block on change notifications @@ -507,6 +576,10 @@ func processStateEvent(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, rsps processNetEvent(netPlugin, nwCfg, isDelete, opts) } } + if epCfg, ok := currentState.(*mastercfg.CfgEndpointState); ok { + log.Infof("Received %q for Endpoint: %q", eventStr, epCfg.ID) + processRemoteEpState(netPlugin, opts, epCfg, isDelete) + } if bgpCfg, ok := currentState.(*mastercfg.CfgBgpState); ok { log.Infof("Received %q for Bgp: %q", eventStr, bgpCfg.Hostname) processBgpEvent(netPlugin, opts, bgpCfg.Hostname, isDelete) @@ -526,6 +599,10 @@ func processStateEvent(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, rsps svcProvider.ServiceName, svcProvider.Providers) processSvcProviderUpdEvent(netPlugin, svcProvider, isDelete) } + if ruleCfg, ok := currentState.(*mastercfg.CfgPolicyRule); ok { + log.Infof("Received %q for PolicyRule: %q", eventStr, ruleCfg.RuleId) + processPolicyRuleState(netPlugin, opts, ruleCfg.RuleId, isDelete) + } } } @@ -548,6 +625,15 @@ func handleBgpEvents(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, recvEr log.Errorf("Error from handleBgpEvents") } +func handleEndpointEvents(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, retErr chan error) { + rsps := make(chan core.WatchState) + go processStateEvent(netPlugin, opts, rsps) + cfg := mastercfg.CfgEndpointState{} + cfg.StateDriver = netPlugin.StateDriver + retErr <- cfg.WatchAll(rsps) + log.Errorf("Error from handleEndpointEvents") +} + func handleEpgEvents(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, recvErr chan error) { rsps := make(chan core.WatchState) @@ -586,3 +672,12 @@ func handleGlobalCfgEvents(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, recvErr <- cfg.WatchAll(rsps) log.Errorf("Error from handleGlobalCfgEvents") } + +func handlePolicyRuleEvents(netPlugin *plugin.NetPlugin, opts core.InstanceInfo, retErr chan error) { + rsps := make(chan core.WatchState) + go processStateEvent(netPlugin, opts, rsps) + cfg := mastercfg.CfgPolicyRule{} + cfg.StateDriver = netPlugin.StateDriver + retErr <- cfg.WatchAll(rsps) + log.Errorf("Error from handlePolicyRuleEvents") +} diff --git a/netplugin/plugin/netplugin.go b/netplugin/plugin/netplugin.go index 258e31fa3..592fe86bd 100755 --- a/netplugin/plugin/netplugin.go +++ b/netplugin/plugin/netplugin.go @@ -149,6 +149,16 @@ func (p *NetPlugin) DeleteEndpoint(id string) error { return p.NetworkDriver.DeleteEndpoint(id) } +// CreateRemoteEndpoint creates an endpoint for a given ID. +func (p *NetPlugin) CreateRemoteEndpoint(id string) error { + return p.NetworkDriver.CreateRemoteEndpoint(id) +} + +// DeleteRemoteEndpoint destroys an endpoint for an ID. +func (p *NetPlugin) DeleteRemoteEndpoint(id string) error { + return p.NetworkDriver.DeleteRemoteEndpoint(id) +} + // CreateHostAccPort creates a host access port func (p *NetPlugin) CreateHostAccPort(portName, globalIP string) (string, error) { p.Lock() @@ -325,3 +335,13 @@ func (p *NetPlugin) DelSvcSpec(svcName string, spec *core.ServiceSpec) { defer p.Unlock() p.NetworkDriver.DelSvcSpec(svcName, spec) } + +// AddPolicyRule creates a policy rule +func (p *NetPlugin) AddPolicyRule(id string) error { + return p.NetworkDriver.AddPolicyRule(id) +} + +// DelPolicyRule creates a policy rule +func (p *NetPlugin) DelPolicyRule(id string) error { + return p.NetworkDriver.DelPolicyRule(id) +}