From f8c831c1c59729a591753fcd68f96f858020e467 Mon Sep 17 00:00:00 2001 From: Faisal Memon Date: Tue, 10 Oct 2023 11:34:51 -0700 Subject: [PATCH] Populate and prune attested node events (#4527) Signed-off-by: Faisal Memon --- pkg/common/telemetry/names.go | 3 + .../telemetry/server/datastore/event.go | 12 ++ .../telemetry/server/datastore/wrapper.go | 12 ++ .../server/datastore/wrapper_test.go | 16 ++ pkg/server/datastore/datastore.go | 13 ++ pkg/server/datastore/sqlstore/sqlstore.go | 91 ++++++++++- .../datastore/sqlstore/sqlstore_test.go | 142 +++++++++++++++++- pkg/server/endpoints/endpoints.go | 5 +- test/fakes/fakedatastore/fakedatastore.go | 14 ++ 9 files changed, 295 insertions(+), 13 deletions(-) diff --git a/pkg/common/telemetry/names.go b/pkg/common/telemetry/names.go index 691f531055..5ea0bcb4af 100644 --- a/pkg/common/telemetry/names.go +++ b/pkg/common/telemetry/names.go @@ -672,6 +672,9 @@ const ( // to add clarity Node = "node" + // NodeEvent functionality related to a node entity or type being created, updated, or deleted + NodeEvent = "node_event" + // Notifier functionality related to some notifying entity; should be used with other tags // to add clarity Notifier = "notifier" diff --git a/pkg/common/telemetry/server/datastore/event.go b/pkg/common/telemetry/server/datastore/event.go index 7b6dac31ce..07418b68f0 100644 --- a/pkg/common/telemetry/server/datastore/event.go +++ b/pkg/common/telemetry/server/datastore/event.go @@ -15,3 +15,15 @@ func StartListRegistrationEntriesEventsCall(m telemetry.Metrics) *telemetry.Call func StartPruneRegistrationEntriesEventsCall(m telemetry.Metrics) *telemetry.CallCounter { return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Prune) } + +// StartListAttestedNodesEventsCall return metric +// for server's datastore, on listing attested node events. +func StartListAttestedNodesEventsCall(m telemetry.Metrics) *telemetry.CallCounter { + return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.List) +} + +// StartPruneAttestedNodesEventsCall return metric +// for server's datastore, on pruning attested node events. +func StartPruneAttestedNodesEventsCall(m telemetry.Metrics) *telemetry.CallCounter { + return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Prune) +} diff --git a/pkg/common/telemetry/server/datastore/wrapper.go b/pkg/common/telemetry/server/datastore/wrapper.go index fc38e4fda2..830cca6058 100644 --- a/pkg/common/telemetry/server/datastore/wrapper.go +++ b/pkg/common/telemetry/server/datastore/wrapper.go @@ -144,6 +144,12 @@ func (w metricsWrapper) ListAttestedNodes(ctx context.Context, req *datastore.Li return w.ds.ListAttestedNodes(ctx, req) } +func (w metricsWrapper) ListAttestedNodesEvents(ctx context.Context, req *datastore.ListAttestedNodesEventsRequest) (_ *datastore.ListAttestedNodesEventsResponse, err error) { + callCounter := StartListAttestedNodesEventsCall(w.m) + defer callCounter.Done(&err) + return w.ds.ListAttestedNodesEvents(ctx, req) +} + func (w metricsWrapper) ListBundles(ctx context.Context, req *datastore.ListBundlesRequest) (_ *datastore.ListBundlesResponse, err error) { callCounter := StartListBundleCall(w.m) defer callCounter.Done(&err) @@ -186,6 +192,12 @@ func (w metricsWrapper) CountRegistrationEntries(ctx context.Context) (_ int32, return w.ds.CountRegistrationEntries(ctx) } +func (w metricsWrapper) PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) (err error) { + callCounter := StartPruneAttestedNodesEventsCall(w.m) + defer callCounter.Done(&err) + return w.ds.PruneAttestedNodesEvents(ctx, olderThan) +} + func (w metricsWrapper) PruneBundle(ctx context.Context, trustDomainID string, expiresBefore time.Time) (_ bool, err error) { callCounter := StartPruneBundleCall(w.m) defer callCounter.Done(&err) diff --git a/pkg/common/telemetry/server/datastore/wrapper_test.go b/pkg/common/telemetry/server/datastore/wrapper_test.go index 3bac96b450..bda355b937 100644 --- a/pkg/common/telemetry/server/datastore/wrapper_test.go +++ b/pkg/common/telemetry/server/datastore/wrapper_test.go @@ -126,6 +126,10 @@ func TestWithMetrics(t *testing.T) { key: "datastore.node.list", methodName: "ListAttestedNodes", }, + { + key: "datastore.node_event.list", + methodName: "ListAttestedNodesEvents", + }, { key: "datastore.bundle.list", methodName: "ListBundles", @@ -146,6 +150,10 @@ func TestWithMetrics(t *testing.T) { key: "datastore.federation_relationship.list", methodName: "ListFederationRelationships", }, + { + key: "datastore.node_event.prune", + methodName: "PruneAttestedNodesEvents", + }, { key: "datastore.bundle.prune", methodName: "PruneBundle", @@ -374,6 +382,10 @@ func (ds *fakeDataStore) ListAttestedNodes(context.Context, *datastore.ListAttes return &datastore.ListAttestedNodesResponse{}, ds.err } +func (ds *fakeDataStore) ListAttestedNodesEvents(context.Context, *datastore.ListAttestedNodesEventsRequest) (*datastore.ListAttestedNodesEventsResponse, error) { + return &datastore.ListAttestedNodesEventsResponse{}, ds.err +} + func (ds *fakeDataStore) ListBundles(context.Context, *datastore.ListBundlesRequest) (*datastore.ListBundlesResponse, error) { return &datastore.ListBundlesResponse{}, ds.err } @@ -390,6 +402,10 @@ func (ds *fakeDataStore) ListRegistrationEntriesEvents(context.Context, *datasto return &datastore.ListRegistrationEntriesEventsResponse{}, ds.err } +func (ds *fakeDataStore) PruneAttestedNodesEvents(context.Context, time.Duration) error { + return ds.err +} + func (ds *fakeDataStore) PruneBundle(context.Context, string, time.Time) (bool, error) { return false, ds.err } diff --git a/pkg/server/datastore/datastore.go b/pkg/server/datastore/datastore.go index 623d810195..e249fded6b 100644 --- a/pkg/server/datastore/datastore.go +++ b/pkg/server/datastore/datastore.go @@ -52,6 +52,10 @@ type DataStore interface { ListAttestedNodes(context.Context, *ListAttestedNodesRequest) (*ListAttestedNodesResponse, error) UpdateAttestedNode(context.Context, *common.AttestedNode, *common.AttestedNodeMask) (*common.AttestedNode, error) + // Nodes Events + ListAttestedNodesEvents(ctx context.Context, req *ListAttestedNodesEventsRequest) (*ListAttestedNodesEventsResponse, error) + PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) error + // Node selectors GetNodeSelectors(ctx context.Context, spiffeID string, dataConsistency DataConsistency) ([]*common.Selector, error) ListNodeSelectors(context.Context, *ListNodeSelectorsRequest) (*ListNodeSelectorsResponse, error) @@ -154,6 +158,15 @@ type ListAttestedNodesResponse struct { Pagination *Pagination } +type ListAttestedNodesEventsRequest struct { + GreaterThanEventID uint +} + +type ListAttestedNodesEventsResponse struct { + SpiffeIDs []string + FirstEventID uint +} + type ListBundlesRequest struct { Pagination *Pagination } diff --git a/pkg/server/datastore/sqlstore/sqlstore.go b/pkg/server/datastore/sqlstore/sqlstore.go index 03edaa37fc..bc3e97fd6a 100644 --- a/pkg/server/datastore/sqlstore/sqlstore.go +++ b/pkg/server/datastore/sqlstore/sqlstore.go @@ -258,7 +258,10 @@ func (ds *Plugin) CreateAttestedNode(ctx context.Context, node *common.AttestedN if err = ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) { attestedNode, err = createAttestedNode(tx, node) - return err + if err != nil { + return err + } + return createAttestedNodeEvent(tx, node.SpiffeId) }); err != nil { return nil, err } @@ -305,7 +308,10 @@ func (ds *Plugin) ListAttestedNodes(ctx context.Context, func (ds *Plugin) UpdateAttestedNode(ctx context.Context, n *common.AttestedNode, mask *common.AttestedNodeMask) (node *common.AttestedNode, err error) { if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) { node, err = updateAttestedNode(tx, n, mask) - return err + if err != nil { + return err + } + return createAttestedNodeEvent(tx, n.SpiffeId) }); err != nil { return nil, err } @@ -316,13 +322,35 @@ func (ds *Plugin) UpdateAttestedNode(ctx context.Context, n *common.AttestedNode func (ds *Plugin) DeleteAttestedNode(ctx context.Context, spiffeID string) (attestedNode *common.AttestedNode, err error) { if err = ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) { attestedNode, err = deleteAttestedNodeAndSelectors(tx, spiffeID) - return err + if err != nil { + return err + } + return createAttestedNodeEvent(tx, spiffeID) }); err != nil { return nil, err } return attestedNode, nil } +// ListAttestedNodesEvents lists all attested node events +func (ds *Plugin) ListAttestedNodesEvents(ctx context.Context, req *datastore.ListAttestedNodesEventsRequest) (resp *datastore.ListAttestedNodesEventsResponse, err error) { + if err = ds.withReadTx(ctx, func(tx *gorm.DB) (err error) { + resp, err = listAttestedNodesEvents(tx, req) + return err + }); err != nil { + return nil, err + } + return resp, nil +} + +// PruneAttestedNodesEvents deletes all attested node events older than a specified duration (i.e. more than 24 hours old) +func (ds *Plugin) PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) (err error) { + return ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) { + err = pruneAttestedNodesEvents(tx, olderThan) + return err + }) +} + // SetNodeSelectors sets node (agent) selectors by SPIFFE ID, deleting old selectors first func (ds *Plugin) SetNodeSelectors(ctx context.Context, spiffeID string, selectors []*common.Selector) (err error) { return ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) { @@ -1418,6 +1446,57 @@ func listAttestedNodes(ctx context.Context, db *sqlDB, log logrus.FieldLogger, r } } +func createAttestedNodeEvent(tx *gorm.DB, spiffeID string) error { + if !fflag.IsSet(fflag.FlagEventsBasedCache) { + return nil + } + + newAttestedNodeEvent := AttestedNodeEvent{ + SpiffeID: spiffeID, + } + + if err := tx.Create(&newAttestedNodeEvent).Error; err != nil { + return sqlError.Wrap(err) + } + + return nil +} + +func listAttestedNodesEvents(tx *gorm.DB, req *datastore.ListAttestedNodesEventsRequest) (*datastore.ListAttestedNodesEventsResponse, error) { + if !fflag.IsSet(fflag.FlagEventsBasedCache) { + return &datastore.ListAttestedNodesEventsResponse{}, nil + } + + var events []AttestedNodeEvent + if err := tx.Find(&events, "id > ?", req.GreaterThanEventID).Order("id asc").Error; err != nil { + return nil, sqlError.Wrap(err) + } + + resp := &datastore.ListAttestedNodesEventsResponse{ + SpiffeIDs: make([]string, 0, len(events)), + } + for _, event := range events { + resp.SpiffeIDs = append(resp.SpiffeIDs, event.SpiffeID) + } + if len(events) > 0 { + resp.FirstEventID = events[0].ID + } + + return resp, nil +} + +func pruneAttestedNodesEvents(tx *gorm.DB, olderThan time.Duration) error { + if !fflag.IsSet(fflag.FlagEventsBasedCache) { + return nil + } + + if err := tx.Where("created_at < ?", time.Now().Add(-olderThan)).Delete(&AttestedNodeEvent{}).Error; err != nil { + return sqlError.Wrap(err) + } + + return nil +} + // filterNodesBySelectorSet filters nodes based on provided selectors func filterNodesBySelectorSet(nodes []*common.AttestedNode, selectors []*common.Selector) []*common.AttestedNode { type selectorKey struct { @@ -3654,11 +3733,13 @@ func listRegistrationEntriesEvents(tx *gorm.DB, req *datastore.ListRegistrationE } var events []RegisteredEntryEvent - if err := tx.Find(&events, "id > ?", req.GreaterThanEventID).Error; err != nil { + if err := tx.Find(&events, "id > ?", req.GreaterThanEventID).Order("id asc").Error; err != nil { return nil, sqlError.Wrap(err) } - resp := &datastore.ListRegistrationEntriesEventsResponse{} + resp := &datastore.ListRegistrationEntriesEventsResponse{ + EntryIDs: make([]string, 0, len(events)), + } for _, event := range events { resp.EntryIDs = append(resp.EntryIDs, event.EntryID) } diff --git a/pkg/server/datastore/sqlstore/sqlstore_test.go b/pkg/server/datastore/sqlstore/sqlstore_test.go index 6485997f21..f9524ff1b5 100644 --- a/pkg/server/datastore/sqlstore/sqlstore_test.go +++ b/pkg/server/datastore/sqlstore/sqlstore_test.go @@ -1451,6 +1451,134 @@ func (s *PluginSuite) TestDeleteAttestedNode() { }) } +func (s *PluginSuite) TestListAttestedNodesEvents() { + var expectedSpiffeIDs []string + + // Create an attested node + node1, err := s.ds.CreateAttestedNode(ctx, &common.AttestedNode{ + SpiffeId: "foo", + AttestationDataType: "aws-tag", + CertSerialNumber: "badcafe", + CertNotAfter: time.Now().Add(time.Hour).Unix(), + }) + s.Require().NoError(err) + expectedSpiffeIDs = append(expectedSpiffeIDs, node1.SpiffeId) + + resp, err := s.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{}) + s.Require().NoError(err) + s.Require().Equal(expectedSpiffeIDs, resp.SpiffeIDs) + + // Create second attested node + node2, err := s.ds.CreateAttestedNode(ctx, &common.AttestedNode{ + SpiffeId: "bar", + AttestationDataType: "aws-tag", + CertSerialNumber: "badcafe", + CertNotAfter: time.Now().Add(time.Hour).Unix(), + }) + s.Require().NoError(err) + expectedSpiffeIDs = append(expectedSpiffeIDs, node2.SpiffeId) + + resp, err = s.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{}) + s.Require().NoError(err) + s.Require().Equal(expectedSpiffeIDs, resp.SpiffeIDs) + + // Update first attested node + updatedNode, err := s.ds.UpdateAttestedNode(ctx, node1, nil) + s.Require().NoError(err) + expectedSpiffeIDs = append(expectedSpiffeIDs, updatedNode.SpiffeId) + + resp, err = s.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{}) + s.Require().NoError(err) + s.Require().Equal(expectedSpiffeIDs, resp.SpiffeIDs) + + // Delete second atttested node + deletedNode, err := s.ds.DeleteAttestedNode(ctx, node2.SpiffeId) + s.Require().NoError(err) + expectedSpiffeIDs = append(expectedSpiffeIDs, deletedNode.SpiffeId) + + resp, err = s.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{}) + s.Require().NoError(err) + s.Require().Equal(expectedSpiffeIDs, resp.SpiffeIDs) + + // Check filtering events by id + tests := []struct { + name string + greaterThanEventID uint + expectedSpiffeIDs []string + expectedFirstEventID uint + }{ + { + name: "All Events", + greaterThanEventID: 0, + expectedFirstEventID: 1, + expectedSpiffeIDs: []string{node1.SpiffeId, node2.SpiffeId, node1.SpiffeId, node2.SpiffeId}, + }, + { + name: "Half of the Events", + greaterThanEventID: 2, + expectedFirstEventID: 3, + expectedSpiffeIDs: []string{node1.SpiffeId, node2.SpiffeId}, + }, + { + name: "None of the Events", + greaterThanEventID: 4, + expectedFirstEventID: 0, + expectedSpiffeIDs: []string{}, + }, + } + for _, test := range tests { + s.T().Run(test.name, func(t *testing.T) { + resp, err = s.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{ + GreaterThanEventID: test.greaterThanEventID, + }) + s.Require().NoError(err) + s.Require().Equal(test.expectedFirstEventID, resp.FirstEventID) + s.Require().Equal(test.expectedSpiffeIDs, resp.SpiffeIDs) + }) + } +} + +func (s *PluginSuite) TestPruneAttestedNodesEvents() { + node, err := s.ds.CreateAttestedNode(ctx, &common.AttestedNode{ + SpiffeId: "foo", + AttestationDataType: "aws-tag", + CertSerialNumber: "badcafe", + CertNotAfter: time.Now().Add(time.Hour).Unix(), + }) + s.Require().NoError(err) + + resp, err := s.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{}) + s.Require().NoError(err) + s.Require().Equal(node.SpiffeId, resp.SpiffeIDs[0]) + + for _, tt := range []struct { + name string + olderThan time.Duration + expectedSpiffeIDs []string + }{ + { + name: "Don't prune valid events", + olderThan: 1 * time.Hour, + expectedSpiffeIDs: []string{node.SpiffeId}, + }, + { + name: "Prune old events", + olderThan: 0 * time.Second, + expectedSpiffeIDs: []string{}, + }, + } { + s.T().Run(tt.name, func(t *testing.T) { + s.Require().Eventuallyf(func() bool { + err = s.ds.PruneAttestedNodesEvents(ctx, tt.olderThan) + s.Require().NoError(err) + resp, err := s.ds.ListAttestedNodesEvents(ctx, &datastore.ListAttestedNodesEventsRequest{}) + s.Require().NoError(err) + return reflect.DeepEqual(tt.expectedSpiffeIDs, resp.SpiffeIDs) + }, 10*time.Second, 50*time.Millisecond, "Failed to prune entries correctly") + }) + } +} + func (s *PluginSuite) TestNodeSelectors() { foo1 := []*common.Selector{ {Type: "FOO1", Value: "1"}, @@ -3791,25 +3919,25 @@ func (s *PluginSuite) TestListRegistrationEntriesEvents() { name string greaterThanEventID uint expectedEntryIDs []string - expectedFirstEntryID uint + expectedFirstEventID uint }{ { name: "All Events", greaterThanEventID: 0, - expectedFirstEntryID: 1, + expectedFirstEventID: 1, expectedEntryIDs: []string{entry1.EntryId, entry2.EntryId, entry1.EntryId, entry2.EntryId}, }, { name: "Half of the Events", greaterThanEventID: 2, - expectedFirstEntryID: 3, + expectedFirstEventID: 3, expectedEntryIDs: []string{entry1.EntryId, entry2.EntryId}, }, { name: "None of the Events", greaterThanEventID: 4, - expectedFirstEntryID: 0, - expectedEntryIDs: nil, + expectedFirstEventID: 0, + expectedEntryIDs: []string{}, }, } for _, test := range tests { @@ -3818,7 +3946,7 @@ func (s *PluginSuite) TestListRegistrationEntriesEvents() { GreaterThanEventID: test.greaterThanEventID, }) s.Require().NoError(err) - s.Require().Equal(test.expectedFirstEntryID, resp.FirstEventID) + s.Require().Equal(test.expectedFirstEventID, resp.FirstEventID) s.Require().Equal(test.expectedEntryIDs, resp.EntryIDs) }) } @@ -3851,7 +3979,7 @@ func (s *PluginSuite) TestPruneRegistrationEntriesEvents() { { name: "Prune old events", olderThan: 0 * time.Second, - expectedEntryIDs: nil, + expectedEntryIDs: []string{}, }, } { s.T().Run(tt.name, func(t *testing.T) { diff --git a/pkg/server/endpoints/endpoints.go b/pkg/server/endpoints/endpoints.go index e4502b7da9..bb316c0f21 100644 --- a/pkg/server/endpoints/endpoints.go +++ b/pkg/server/endpoints/endpoints.go @@ -117,7 +117,10 @@ func New(ctx context.Context, c Config) (*Endpoints, error) { pruneEventsFn := func(ctx context.Context, olderThan time.Duration) error { ds := c.Catalog.GetDataStore() - return ds.PruneRegistrationEntriesEvents(ctx, olderThan) + if err := ds.PruneRegistrationEntriesEvents(ctx, olderThan); err != nil { + return err + } + return ds.PruneAttestedNodesEvents(ctx, olderThan) } if c.CacheReloadInterval == 0 { diff --git a/test/fakes/fakedatastore/fakedatastore.go b/test/fakes/fakedatastore/fakedatastore.go index 6cf18b8081..d89ad6acc9 100644 --- a/test/fakes/fakedatastore/fakedatastore.go +++ b/test/fakes/fakedatastore/fakedatastore.go @@ -163,6 +163,20 @@ func (s *DataStore) DeleteAttestedNode(ctx context.Context, spiffeID string) (*c return s.ds.DeleteAttestedNode(ctx, spiffeID) } +func (s *DataStore) ListAttestedNodesEvents(ctx context.Context, req *datastore.ListAttestedNodesEventsRequest) (*datastore.ListAttestedNodesEventsResponse, error) { + if err := s.getNextError(); err != nil { + return nil, err + } + return s.ds.ListAttestedNodesEvents(ctx, req) +} + +func (s *DataStore) PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) error { + if err := s.getNextError(); err != nil { + return err + } + return s.ds.PruneAttestedNodesEvents(ctx, olderThan) +} + func (s *DataStore) TaintX509CA(ctx context.Context, trustDomainID string, publicKeyToTaint crypto.PublicKey) error { if err := s.getNextError(); err != nil { return err