From 29bd659ac36b743f40c6602213ff66254b3df763 Mon Sep 17 00:00:00 2001 From: Casey Davenport Date: Fri, 2 Dec 2016 13:19:48 -0800 Subject: [PATCH 1/3] Support Apply for weps to set status.PodIP --- lib/backend/k8s/k8s.go | 30 ++++++++++++++++++++++++++++-- lib/backend/k8s/k8s_fv_test.go | 4 +++- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/lib/backend/k8s/k8s.go b/lib/backend/k8s/k8s.go index 5c9a1bc4a..09458bb77 100644 --- a/lib/backend/k8s/k8s.go +++ b/lib/backend/k8s/k8s.go @@ -114,8 +114,13 @@ func (c *KubeClient) Update(d *model.KVPair) (*model.KVPair, error) { // Set an existing entry in the datastore. This ignores whether an entry already // exists. func (c *KubeClient) Apply(d *model.KVPair) (*model.KVPair, error) { - log.Infof("Ignoring 'Apply' for %s", d.Key) - return d, nil + switch d.Key.(type) { + case model.WorkloadEndpointKey: + return c.applyWorkloadEndpoint(d) + default: + log.Infof("Ignoring 'Apply' for %s", d.Key) + return d, nil + } } // Delete an entry in the datastore. This is a no-op when using the k8s backend. @@ -214,6 +219,27 @@ func (c *KubeClient) getProfile(k model.ProfileKey) (*model.KVPair, error) { return c.converter.namespaceToProfile(namespace) } +// applyWorkloadEndpoint patches the existing Pod to include an IP address, if +// one has been set on the workload endpoint. +func (c *KubeClient) applyWorkloadEndpoint(k *model.KVPair) (*model.KVPair, error) { + ips := k.Value.(*model.WorkloadEndpoint).IPv4Nets + if len(ips) > 0 { + ns, name := c.converter.parseWorkloadID(k.Key.(model.WorkloadEndpointKey).WorkloadID) + pod, err := c.clientSet.Pods(ns).Get(name) + if err != nil { + return nil, err + } + pod.Status.PodIP = ips[0].IP.String() + pod, err = c.clientSet.Pods(ns).Update(pod) + if err != nil { + return nil, err + } + log.Debugf("Successfully applied pod: %+v", pod) + return c.converter.podToWorkloadEndpoint(pod) + } + return k, nil +} + // listWorkloadEndpoints lists WorkloadEndpoints from the k8s API based on existing Pods. func (c *KubeClient) listWorkloadEndpoints(l model.WorkloadEndpointListOptions) ([]*model.KVPair, error) { // If a workload is provided, we can do an exact lookup of this diff --git a/lib/backend/k8s/k8s_fv_test.go b/lib/backend/k8s/k8s_fv_test.go index 39049db86..b93edc673 100644 --- a/lib/backend/k8s/k8s_fv_test.go +++ b/lib/backend/k8s/k8s_fv_test.go @@ -283,7 +283,9 @@ var _ = Describe("Test Syncer API for Kubernetes backend", func() { Expect(len(weps)).To(Equal(1)) // Perform a Get and ensure no error in the Calico API. - _, err = c.Get(model.WorkloadEndpointKey{WorkloadID: fmt.Sprintf("default.%s", pod.ObjectMeta.Name)}) + wep, err := c.Get(model.WorkloadEndpointKey{WorkloadID: fmt.Sprintf("default.%s", pod.ObjectMeta.Name)}) + Expect(err).NotTo(HaveOccurred()) + _, err = c.Apply(wep) Expect(err).NotTo(HaveOccurred()) }) From 93d0e155f084dcbfa18a685db4369a6a98e376a8 Mon Sep 17 00:00:00 2001 From: Casey Davenport Date: Fri, 2 Dec 2016 15:41:51 -0800 Subject: [PATCH 2/3] PodStatus instead of Pod --- lib/backend/k8s/k8s.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/backend/k8s/k8s.go b/lib/backend/k8s/k8s.go index 09458bb77..9d7d1e706 100644 --- a/lib/backend/k8s/k8s.go +++ b/lib/backend/k8s/k8s.go @@ -224,13 +224,15 @@ func (c *KubeClient) getProfile(k model.ProfileKey) (*model.KVPair, error) { func (c *KubeClient) applyWorkloadEndpoint(k *model.KVPair) (*model.KVPair, error) { ips := k.Value.(*model.WorkloadEndpoint).IPv4Nets if len(ips) > 0 { + log.Debugf("Applying workload with IPs: %+v", ips) ns, name := c.converter.parseWorkloadID(k.Key.(model.WorkloadEndpointKey).WorkloadID) pod, err := c.clientSet.Pods(ns).Get(name) if err != nil { return nil, err } pod.Status.PodIP = ips[0].IP.String() - pod, err = c.clientSet.Pods(ns).Update(pod) + log.Debugf("Pod IP: %+v", pod.Status.PodIP) + pod, err = c.clientSet.Pods(ns).UpdateStatus(pod) if err != nil { return nil, err } From 9f1acb4210c0d6c050abda012372c29ba8c543b7 Mon Sep 17 00:00:00 2001 From: Casey Davenport Date: Mon, 5 Dec 2016 17:15:12 -0800 Subject: [PATCH 3/3] Add error conversion from k8s errors -> libcalico errors --- lib/backend/k8s/k8s.go | 21 +++++++++--------- lib/backend/k8s/util.go | 47 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 11 deletions(-) create mode 100644 lib/backend/k8s/util.go diff --git a/lib/backend/k8s/k8s.go b/lib/backend/k8s/k8s.go index 9d7d1e706..7d785a738 100644 --- a/lib/backend/k8s/k8s.go +++ b/lib/backend/k8s/k8s.go @@ -77,13 +77,13 @@ func NewKubeClient(kc *KubeConfig) (*KubeClient, error) { config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( &loadingRules, configOverrides).ClientConfig() if err != nil { - return nil, err + return nil, k8sErrorToCalico(err, nil) } // Create the clientset cs, err := kubernetes.NewForConfig(config) if err != nil { - return nil, err + return nil, k8sErrorToCalico(err, nil) } log.Debugf("Created k8s clientSet: %+v", cs) return &KubeClient{clientSet: cs}, nil @@ -187,7 +187,7 @@ func (c *KubeClient) listProfiles(l model.ProfileListOptions) ([]*model.KVPair, // Otherwise, enumerate all. namespaces, err := c.clientSet.Namespaces().List(k8sapi.ListOptions{}) if err != nil { - return nil, err + return nil, k8sErrorToCalico(err, l) } // For each Namespace, return a profile. @@ -213,7 +213,7 @@ func (c *KubeClient) getProfile(k model.ProfileKey) (*model.KVPair, error) { } namespace, err := c.clientSet.Namespaces().Get(namespaceName) if err != nil { - return nil, err + return nil, k8sErrorToCalico(err, k) } return c.converter.namespaceToProfile(namespace) @@ -228,13 +228,12 @@ func (c *KubeClient) applyWorkloadEndpoint(k *model.KVPair) (*model.KVPair, erro ns, name := c.converter.parseWorkloadID(k.Key.(model.WorkloadEndpointKey).WorkloadID) pod, err := c.clientSet.Pods(ns).Get(name) if err != nil { - return nil, err + return nil, k8sErrorToCalico(err, k.Key) } pod.Status.PodIP = ips[0].IP.String() - log.Debugf("Pod IP: %+v", pod.Status.PodIP) pod, err = c.clientSet.Pods(ns).UpdateStatus(pod) if err != nil { - return nil, err + return nil, k8sErrorToCalico(err, k.Key) } log.Debugf("Successfully applied pod: %+v", pod) return c.converter.podToWorkloadEndpoint(pod) @@ -265,7 +264,7 @@ func (c *KubeClient) listWorkloadEndpoints(l model.WorkloadEndpointListOptions) // We don't yet support hostname, orchestratorID, for the k8s backend. pods, err := c.clientSet.Pods("").List(k8sapi.ListOptions{}) if err != nil { - return nil, err + return nil, k8sErrorToCalico(err, l) } // For each Pod, return a workload endpoint. @@ -293,7 +292,7 @@ func (c *KubeClient) getWorkloadEndpoint(k model.WorkloadEndpointKey) (*model.KV pod, err := c.clientSet.Pods(namespace).Get(podName) if err != nil { - return nil, err + return nil, k8sErrorToCalico(err, k) } // Decide if this pod should be displayed. @@ -322,7 +321,7 @@ func (c *KubeClient) listPolicies(l model.PolicyListOptions) ([]*model.KVPair, e Timeout(10 * time.Second). Do().Into(&networkPolicies) if err != nil { - return nil, err + return nil, k8sErrorToCalico(err, l) } // For each policy, turn it into a Policy and generate the list. @@ -354,7 +353,7 @@ func (c *KubeClient) getPolicy(k model.PolicyKey) (*model.KVPair, error) { Timeout(10 * time.Second). Do().Into(&networkPolicy) if err != nil { - return nil, err + return nil, k8sErrorToCalico(err, k) } return c.converter.networkPolicyToPolicy(&networkPolicy) } diff --git a/lib/backend/k8s/util.go b/lib/backend/k8s/util.go new file mode 100644 index 000000000..46a92058e --- /dev/null +++ b/lib/backend/k8s/util.go @@ -0,0 +1,47 @@ +// Copyright (c) 2016 Tigera, Inc. All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package k8s + +import ( + "github.com/projectcalico/libcalico-go/lib/errors" + + kerrors "k8s.io/client-go/pkg/api/errors" +) + +// k8sErrorToCalico returns the equivalent libcalico error for the given +// kubernetes error. +func k8sErrorToCalico(ke error, id interface{}) error { + if kerrors.IsAlreadyExists(ke) { + return errors.ErrorResourceAlreadyExists{ + Err: ke, + Identifier: id, + } + } + if kerrors.IsNotFound(ke) { + return errors.ErrorResourceDoesNotExist{ + Err: ke, + Identifier: id, + } + } + if kerrors.IsForbidden(ke) || kerrors.IsUnauthorized(ke) { + return errors.ErrorConnectionUnauthorized{ + Err: ke, + } + } + return errors.ErrorDatastoreError{ + Err: ke, + Identifier: id, + } +}