Skip to content

Commit

Permalink
Merge pull request #67 from kubescape/nodes
Browse files Browse the repository at this point in the history
add logic to sync nodes and avoid too frequent updates
  • Loading branch information
matthyx authored Mar 1, 2024
2 parents f39ea81 + 259d579 commit 4a38bcf
Show file tree
Hide file tree
Showing 9 changed files with 1,414 additions and 32 deletions.
40 changes: 32 additions & 8 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"regexp"
"slices"
"strconv"
"strings"
Expand All @@ -31,6 +32,13 @@ import (

const envMultiplier = "EVENT_MULTIPLIER"

var fieldsToRemove = map[string][][]string{
"default": {},
"/v1/nodes": {{"status", "conditions"}},
}

var emptyPatch = regexp.MustCompile(`\{"metadata":\{"resourceVersion":"(\d+)"\}\}`)

type BatchProcessingFunc func(context.Context, *Client, domain.BatchItems) error

// resourceVersionGetter is an interface used to get resource version from events.
Expand Down Expand Up @@ -282,6 +290,10 @@ func (c *Client) callPutOrPatch(ctx context.Context, id domain.KindName, baseObj
if err != nil {
return fmt.Errorf("create merge patch: %w", err)
}
// skip patch containing only resource version
if emptyPatch.Match(patch) {
return nil
}
err = c.callbacks.PatchObject(ctx, id, checksum, patch)
if err != nil {
return fmt.Errorf("send patch object: %w", err)
Expand Down Expand Up @@ -329,7 +341,7 @@ func (c *Client) GetObject(ctx context.Context, id domain.KindName, baseObject [
if err != nil {
return fmt.Errorf("get resource: %w", err)
}
newObject, err := utils.FilterAndMarshal(obj)
newObject, err := c.filterAndMarshal(obj)
if err != nil {
return fmt.Errorf("marshal resource: %w", err)
}
Expand All @@ -353,7 +365,7 @@ func (c *Client) patchObject(ctx context.Context, id domain.KindName, checksum s
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
}
object, err := utils.FilterAndMarshal(obj)
object, err := c.filterAndMarshal(obj)
if err != nil {
return nil, fmt.Errorf("marshal resource: %w", err)
}
Expand Down Expand Up @@ -421,7 +433,7 @@ func (c *Client) verifyObject(id domain.KindName, newChecksum string) ([]byte, e
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
}
object, err := utils.FilterAndMarshal(obj)
object, err := c.filterAndMarshal(obj)
if err != nil {
return nil, fmt.Errorf("marshal resource: %w", err)
}
Expand Down Expand Up @@ -456,7 +468,7 @@ func (c *Client) getExistingStorageObjects(ctx context.Context) (string, error)
if c.multiplier > 0 {
c.multiplyVerifyObject(ctx, id, obj)
} else {
newObject, err := utils.FilterAndMarshal(obj)
newObject, err := c.filterAndMarshal(obj)
if err != nil {
logger.L().Ctx(ctx).Error("cannot marshal object", helpers.Error(err), helpers.String("id", id.String()))
continue
Expand Down Expand Up @@ -485,7 +497,7 @@ func (c *Client) multiplyVerifyObject(ctx context.Context, id domain.KindName, o
obj.SetLabels(labels)
}
}
newObject, err := utils.FilterAndMarshal(obj)
newObject, err := c.filterAndMarshal(obj)
if err != nil {
logger.L().Ctx(ctx).Error("cannot marshal object", helpers.Error(err), helpers.String("id", id.String()))
continue
Expand All @@ -498,15 +510,27 @@ func (c *Client) multiplyVerifyObject(ctx context.Context, id domain.KindName, o
}
}

func (c *Client) filterAndMarshal(d *unstructured.Unstructured) ([]byte, error) {
utils.RemoveManagedFields(d)
fields, ok := fieldsToRemove[c.kind.String()]
if !ok {
fields = fieldsToRemove["default"]
}
if err := utils.RemoveSpecificFields(d, fields); err != nil {
return nil, fmt.Errorf("remove specific fields: %w", err)
}
return d.MarshalJSON()
}

func (c *Client) getObjectFromUnstructured(d *unstructured.Unstructured) ([]byte, error) {
if c.res.Group == "spdx.softwarecomposition.kubescape.io" {
obj, err := c.getResource(d.GetNamespace(), d.GetName())
if err != nil {
return nil, fmt.Errorf("get resource: %w", err)
}
return utils.FilterAndMarshal(obj)
return c.filterAndMarshal(obj)
}
return utils.FilterAndMarshal(d)
return c.filterAndMarshal(d)
}

// Batch processing functions
Expand Down Expand Up @@ -612,7 +636,7 @@ func reconcileBatchProcessingFunc(ctx context.Context, c *Client, items domain.B
ResourceVersion: resourceVersion,
}

newObject, marshalErr := utils.FilterAndMarshal(&item)
newObject, marshalErr := c.filterAndMarshal(&item)
if marshalErr != nil {
err = multierr.Append(err, fmt.Errorf("marshal resource: %w", marshalErr))
continue
Expand Down
76 changes: 74 additions & 2 deletions adapters/incluster/v1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package incluster

import (
"context"
"testing"
"time"

"github.com/kinbiko/jsonassert"
"github.com/kubescape/synchronizer/domain"
"github.com/kubescape/synchronizer/utils"
"github.com/stretchr/testify/assert"
Expand All @@ -18,8 +22,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/utils/ptr"
"testing"
"time"
)

var (
Expand Down Expand Up @@ -123,3 +125,73 @@ func TestClient_watchRetry(t *testing.T) {
})
}
}

func TestClient_filterAndMarshal(t *testing.T) {
type fields struct {
client dynamic.Interface
account string
cluster string
kind *domain.Kind
multiplier int
callbacks domain.Callbacks
res schema.GroupVersionResource
ShadowObjects map[string][]byte
Strategy domain.Strategy
batchProcessingFunc map[domain.BatchType]BatchProcessingFunc
}
tests := []struct {
name string
fields fields
obj *unstructured.Unstructured
want []byte
wantErr bool
}{
{
name: "filter pod (no modifications)",
fields: fields{
kind: domain.KindFromString(context.TODO(), "/v1/pods"),
},
obj: utils.FileToUnstructured("../../../utils/testdata/pod.json"),
want: utils.FileContent("../../../utils/testdata/pod.json"),
},
{
name: "filter node",
fields: fields{
kind: domain.KindFromString(context.TODO(), "/v1/nodes"),
},
obj: utils.FileToUnstructured("../../../utils/testdata/node.json"),
want: utils.FileContent("testdata/nodeFiltered.json"),
},
{
name: "filter networkPolicy",
fields: fields{
kind: domain.KindFromString(context.TODO(), "networking.k8s.io/v1/NetworkPolicy"),
},
obj: utils.FileToUnstructured("../../../utils/testdata/networkPolicy.json"),
want: utils.FileContent("../../../utils/testdata/networkPolicyCleaned.json"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
client: tt.fields.client,
account: tt.fields.account,
cluster: tt.fields.cluster,
kind: tt.fields.kind,
multiplier: tt.fields.multiplier,
callbacks: tt.fields.callbacks,
res: tt.fields.res,
ShadowObjects: tt.fields.ShadowObjects,
Strategy: tt.fields.Strategy,
batchProcessingFunc: tt.fields.batchProcessingFunc,
}
got, err := c.filterAndMarshal(tt.obj)
if (err != nil) != tt.wantErr {
t.Errorf("filterAndMarshal() error = %v, wantErr %v", err, tt.wantErr)
return
}
ja := jsonassert.New(t)
ja.Assertf(string(got), string(tt.want))
})
}
}
Loading

0 comments on commit 4a38bcf

Please sign in to comment.