Skip to content

Commit

Permalink
Populate and prune attested node events (#4527)
Browse files Browse the repository at this point in the history
Signed-off-by: Faisal Memon <fymemon@yahoo.com>
  • Loading branch information
faisal-memon authored Oct 10, 2023
1 parent 144bf61 commit f8c831c
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 13 deletions.
3 changes: 3 additions & 0 deletions pkg/common/telemetry/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,9 @@ const (
// to add clarity
Node = "node"

// NodeEvent functionality related to a node entity or type being created, updated, or deleted
NodeEvent = "node_event"

// Notifier functionality related to some notifying entity; should be used with other tags
// to add clarity
Notifier = "notifier"
Expand Down
12 changes: 12 additions & 0 deletions pkg/common/telemetry/server/datastore/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,15 @@ func StartListRegistrationEntriesEventsCall(m telemetry.Metrics) *telemetry.Call
func StartPruneRegistrationEntriesEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.RegistrationEntryEvent, telemetry.Prune)
}

// StartListAttestedNodesEventsCall return metric
// for server's datastore, on listing attested node events.
func StartListAttestedNodesEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.List)
}

// StartPruneAttestedNodesEventsCall return metric
// for server's datastore, on pruning attested node events.
func StartPruneAttestedNodesEventsCall(m telemetry.Metrics) *telemetry.CallCounter {
return telemetry.StartCall(m, telemetry.Datastore, telemetry.NodeEvent, telemetry.Prune)
}
12 changes: 12 additions & 0 deletions pkg/common/telemetry/server/datastore/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,12 @@ func (w metricsWrapper) ListAttestedNodes(ctx context.Context, req *datastore.Li
return w.ds.ListAttestedNodes(ctx, req)
}

func (w metricsWrapper) ListAttestedNodesEvents(ctx context.Context, req *datastore.ListAttestedNodesEventsRequest) (_ *datastore.ListAttestedNodesEventsResponse, err error) {
callCounter := StartListAttestedNodesEventsCall(w.m)
defer callCounter.Done(&err)
return w.ds.ListAttestedNodesEvents(ctx, req)
}

func (w metricsWrapper) ListBundles(ctx context.Context, req *datastore.ListBundlesRequest) (_ *datastore.ListBundlesResponse, err error) {
callCounter := StartListBundleCall(w.m)
defer callCounter.Done(&err)
Expand Down Expand Up @@ -186,6 +192,12 @@ func (w metricsWrapper) CountRegistrationEntries(ctx context.Context) (_ int32,
return w.ds.CountRegistrationEntries(ctx)
}

func (w metricsWrapper) PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) (err error) {
callCounter := StartPruneAttestedNodesEventsCall(w.m)
defer callCounter.Done(&err)
return w.ds.PruneAttestedNodesEvents(ctx, olderThan)
}

func (w metricsWrapper) PruneBundle(ctx context.Context, trustDomainID string, expiresBefore time.Time) (_ bool, err error) {
callCounter := StartPruneBundleCall(w.m)
defer callCounter.Done(&err)
Expand Down
16 changes: 16 additions & 0 deletions pkg/common/telemetry/server/datastore/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func TestWithMetrics(t *testing.T) {
key: "datastore.node.list",
methodName: "ListAttestedNodes",
},
{
key: "datastore.node_event.list",
methodName: "ListAttestedNodesEvents",
},
{
key: "datastore.bundle.list",
methodName: "ListBundles",
Expand All @@ -146,6 +150,10 @@ func TestWithMetrics(t *testing.T) {
key: "datastore.federation_relationship.list",
methodName: "ListFederationRelationships",
},
{
key: "datastore.node_event.prune",
methodName: "PruneAttestedNodesEvents",
},
{
key: "datastore.bundle.prune",
methodName: "PruneBundle",
Expand Down Expand Up @@ -374,6 +382,10 @@ func (ds *fakeDataStore) ListAttestedNodes(context.Context, *datastore.ListAttes
return &datastore.ListAttestedNodesResponse{}, ds.err
}

func (ds *fakeDataStore) ListAttestedNodesEvents(context.Context, *datastore.ListAttestedNodesEventsRequest) (*datastore.ListAttestedNodesEventsResponse, error) {
return &datastore.ListAttestedNodesEventsResponse{}, ds.err
}

func (ds *fakeDataStore) ListBundles(context.Context, *datastore.ListBundlesRequest) (*datastore.ListBundlesResponse, error) {
return &datastore.ListBundlesResponse{}, ds.err
}
Expand All @@ -390,6 +402,10 @@ func (ds *fakeDataStore) ListRegistrationEntriesEvents(context.Context, *datasto
return &datastore.ListRegistrationEntriesEventsResponse{}, ds.err
}

func (ds *fakeDataStore) PruneAttestedNodesEvents(context.Context, time.Duration) error {
return ds.err
}

func (ds *fakeDataStore) PruneBundle(context.Context, string, time.Time) (bool, error) {
return false, ds.err
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/server/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type DataStore interface {
ListAttestedNodes(context.Context, *ListAttestedNodesRequest) (*ListAttestedNodesResponse, error)
UpdateAttestedNode(context.Context, *common.AttestedNode, *common.AttestedNodeMask) (*common.AttestedNode, error)

// Nodes Events
ListAttestedNodesEvents(ctx context.Context, req *ListAttestedNodesEventsRequest) (*ListAttestedNodesEventsResponse, error)
PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) error

// Node selectors
GetNodeSelectors(ctx context.Context, spiffeID string, dataConsistency DataConsistency) ([]*common.Selector, error)
ListNodeSelectors(context.Context, *ListNodeSelectorsRequest) (*ListNodeSelectorsResponse, error)
Expand Down Expand Up @@ -154,6 +158,15 @@ type ListAttestedNodesResponse struct {
Pagination *Pagination
}

type ListAttestedNodesEventsRequest struct {
GreaterThanEventID uint
}

type ListAttestedNodesEventsResponse struct {
SpiffeIDs []string
FirstEventID uint
}

type ListBundlesRequest struct {
Pagination *Pagination
}
Expand Down
91 changes: 86 additions & 5 deletions pkg/server/datastore/sqlstore/sqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,10 @@ func (ds *Plugin) CreateAttestedNode(ctx context.Context, node *common.AttestedN

if err = ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) {
attestedNode, err = createAttestedNode(tx, node)
return err
if err != nil {
return err
}
return createAttestedNodeEvent(tx, node.SpiffeId)
}); err != nil {
return nil, err
}
Expand Down Expand Up @@ -305,7 +308,10 @@ func (ds *Plugin) ListAttestedNodes(ctx context.Context,
func (ds *Plugin) UpdateAttestedNode(ctx context.Context, n *common.AttestedNode, mask *common.AttestedNodeMask) (node *common.AttestedNode, err error) {
if err = ds.withReadModifyWriteTx(ctx, func(tx *gorm.DB) (err error) {
node, err = updateAttestedNode(tx, n, mask)
return err
if err != nil {
return err
}
return createAttestedNodeEvent(tx, n.SpiffeId)
}); err != nil {
return nil, err
}
Expand All @@ -316,13 +322,35 @@ func (ds *Plugin) UpdateAttestedNode(ctx context.Context, n *common.AttestedNode
func (ds *Plugin) DeleteAttestedNode(ctx context.Context, spiffeID string) (attestedNode *common.AttestedNode, err error) {
if err = ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) {
attestedNode, err = deleteAttestedNodeAndSelectors(tx, spiffeID)
return err
if err != nil {
return err
}
return createAttestedNodeEvent(tx, spiffeID)
}); err != nil {
return nil, err
}
return attestedNode, nil
}

// ListAttestedNodesEvents lists all attested node events
func (ds *Plugin) ListAttestedNodesEvents(ctx context.Context, req *datastore.ListAttestedNodesEventsRequest) (resp *datastore.ListAttestedNodesEventsResponse, err error) {
if err = ds.withReadTx(ctx, func(tx *gorm.DB) (err error) {
resp, err = listAttestedNodesEvents(tx, req)
return err
}); err != nil {
return nil, err
}
return resp, nil
}

// PruneAttestedNodesEvents deletes all attested node events older than a specified duration (i.e. more than 24 hours old)
func (ds *Plugin) PruneAttestedNodesEvents(ctx context.Context, olderThan time.Duration) (err error) {
return ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) {
err = pruneAttestedNodesEvents(tx, olderThan)
return err
})
}

// SetNodeSelectors sets node (agent) selectors by SPIFFE ID, deleting old selectors first
func (ds *Plugin) SetNodeSelectors(ctx context.Context, spiffeID string, selectors []*common.Selector) (err error) {
return ds.withWriteTx(ctx, func(tx *gorm.DB) (err error) {
Expand Down Expand Up @@ -1418,6 +1446,57 @@ func listAttestedNodes(ctx context.Context, db *sqlDB, log logrus.FieldLogger, r
}
}

func createAttestedNodeEvent(tx *gorm.DB, spiffeID string) error {
if !fflag.IsSet(fflag.FlagEventsBasedCache) {
return nil
}

newAttestedNodeEvent := AttestedNodeEvent{
SpiffeID: spiffeID,
}

if err := tx.Create(&newAttestedNodeEvent).Error; err != nil {
return sqlError.Wrap(err)
}

return nil
}

func listAttestedNodesEvents(tx *gorm.DB, req *datastore.ListAttestedNodesEventsRequest) (*datastore.ListAttestedNodesEventsResponse, error) {
if !fflag.IsSet(fflag.FlagEventsBasedCache) {
return &datastore.ListAttestedNodesEventsResponse{}, nil
}

var events []AttestedNodeEvent
if err := tx.Find(&events, "id > ?", req.GreaterThanEventID).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)
}

resp := &datastore.ListAttestedNodesEventsResponse{
SpiffeIDs: make([]string, 0, len(events)),
}
for _, event := range events {
resp.SpiffeIDs = append(resp.SpiffeIDs, event.SpiffeID)
}
if len(events) > 0 {
resp.FirstEventID = events[0].ID
}

return resp, nil
}

func pruneAttestedNodesEvents(tx *gorm.DB, olderThan time.Duration) error {
if !fflag.IsSet(fflag.FlagEventsBasedCache) {
return nil
}

if err := tx.Where("created_at < ?", time.Now().Add(-olderThan)).Delete(&AttestedNodeEvent{}).Error; err != nil {
return sqlError.Wrap(err)
}

return nil
}

// filterNodesBySelectorSet filters nodes based on provided selectors
func filterNodesBySelectorSet(nodes []*common.AttestedNode, selectors []*common.Selector) []*common.AttestedNode {
type selectorKey struct {
Expand Down Expand Up @@ -3654,11 +3733,13 @@ func listRegistrationEntriesEvents(tx *gorm.DB, req *datastore.ListRegistrationE
}

var events []RegisteredEntryEvent
if err := tx.Find(&events, "id > ?", req.GreaterThanEventID).Error; err != nil {
if err := tx.Find(&events, "id > ?", req.GreaterThanEventID).Order("id asc").Error; err != nil {
return nil, sqlError.Wrap(err)
}

resp := &datastore.ListRegistrationEntriesEventsResponse{}
resp := &datastore.ListRegistrationEntriesEventsResponse{
EntryIDs: make([]string, 0, len(events)),
}
for _, event := range events {
resp.EntryIDs = append(resp.EntryIDs, event.EntryID)
}
Expand Down
Loading

0 comments on commit f8c831c

Please sign in to comment.