Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: SudhanshuBawane <sudhanshu.bawane.ctr@sumologic.com>
  • Loading branch information
SudhanshuBawane committed Feb 29, 2024
1 parent 6238877 commit 02d1133
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 18 deletions.
9 changes: 7 additions & 2 deletions backend/agentd/agentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,13 @@ func (a *Agentd) handleUserEvent(event store.WatchEventUserConfig) error {
if event.User == nil {
return errors.New("nil entry received from the user config watcher")
}
a.bus.Publish("userChanges", event.User.Username)

topic := messaging.UserConfigTopic(event.User.GetMetadata().Namespace, event.User.GetMetadata().Name)
if err := a.bus.Publish(topic, &event); err != nil {
return err
}
//a.bus.Publish("userChanges", event.User.Username)
logger.WithField("topic", topic).
Debug("successfully published an user config update to the bus")
return nil
}

Expand Down
140 changes: 133 additions & 7 deletions backend/agentd/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
)

const (
UserNotFound = "not found"

deletedEventSentinel = -1

// Time to wait before force close on connection.
Expand Down Expand Up @@ -227,6 +229,10 @@ func NewSession(ctx context.Context, cfg SessionConfig) (*Session, error) {
subscriptions: make(chan messaging.Subscription, 1),
updatesChannel: make(chan interface{}, 10),
},
userConfig: &userConfig{
subscription: make(chan messaging.Subscription, 1),
updatesChannel: make(chan interface{}, 10),
},
}

// Optionally subscribe to burial notifications
Expand Down Expand Up @@ -356,17 +362,65 @@ func (s *Session) sender() {
for {
var msg *transport.Message
select {
//2608 ---- user -----
//---- user -----//
case u := <-s.userReceiver.ch:
user, ok := u.(corev2.User)
user, ok := u.(*corev2.User)
if !ok {

logger.WithField("key", ok)
}

if user.Disabled && user.Username == s.user {
return
}
// -----entity -------

//case u := <-s.userConfig.updatesChannel:
// watchEvent, ok := u.(*store.WatchEventUserConfig)
// fmt.Println("========== usrConfig Updates ========", watchEvent)
// if !ok {
// logger.Errorf("session received unexoected struct : %T", u)
// continue
// }
//
// if watchEvent.User.Disabled && watchEvent.User.Username == s.user {
// return
// }
// //fmt.Println("========== usrConfig Updates ========", watchEvent)
////// Handle the delete/disable event
////switch watchEvent.Action {
////case store.WatchDelete:
//// return
////}
//
//if watchEvent.User == nil {
// logger.Error("session received nil user in watch event")
//}
////
//lagger := logger.WithFields(logrus.Fields{
// "action": watchEvent.Action.String(),
// "user": watchEvent.User.GetMetadata().Name,
// "namespace": watchEvent.User.GetMetadata().Namespace,
//})
//lagger.Debug("user update received")
//
//configReq := storev2.NewResourceRequestFromV2Resource(s.ctx, watchEvent.User)
//wrapper, err := storev2.WrapResource(watchEvent.User)
//if err != nil {
// lagger.WithError(err).Error("could not warp the user config")
// continue
//}
//
//if err := s.storev2.CreateOrUpdate(configReq, wrapper); err != nil {
// sessionErrorCounter.WithLabelValues(err.Error()).Inc()
// lagger.WithError(err).Error("could not update the user config")
//}

//bytes, err := s.marshal(watchEvent.User)
//if err != nil {
// lagger.WithError(err).Error("session failed to serialize user config")
//}
//msg = transport.NewMessage(transport.MessageTypeUserConfig, bytes)

// ---- entity ----//
case e := <-s.entityConfig.updatesChannel:
watchEvent, ok := e.(*store.WatchEventEntityConfig)
if !ok {
Expand Down Expand Up @@ -495,6 +549,8 @@ func (s *Session) sender() {
// 3. Start goroutine that waits for context cancellation, and shuts down service.
func (s *Session) Start() (err error) {
defer close(s.entityConfig.subscriptions)
defer close(s.userConfig.subscription)

sessionCounter.WithLabelValues(s.cfg.Namespace).Inc()
s.wg = &sync.WaitGroup{}
s.wg.Add(2)
Expand All @@ -518,21 +574,84 @@ func (s *Session) Start() (err error) {
"namespace": s.cfg.Namespace,
})

// Subscribe the agent to its entity_config topic
// Subscribe the agent to its entity_config and user_config topic
topic := messaging.EntityConfigTopic(s.cfg.Namespace, s.cfg.AgentName)
userTopic := messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName)
lager.WithField("topic", topic).Debug("subscribing to topic")
logger.WithField("topic", userTopic).Debug("subscribing to topic")
// Get a unique name for the agent, which will be used as the consumer of the
// bus, in order to avoid problems with an agent reconnecting before its
// session is ended
agentName := agentUUID(s.cfg.Namespace, s.cfg.AgentName)

// Determine if user already exits
userSubscription, usrErr := s.bus.Subscribe(userTopic, agentName, s.userConfig)
if usrErr != nil {
lager.WithError(err).Error("error starting subscription")
return err
}
s.userConfig.subscription <- userSubscription
usrReq := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev2.User{}).StoreName())
usrWrapper, err := s.storev2.Get(usrReq)
if err != nil {
// Just exit but don't send error about absence of user config
var errNotFound *store.ErrNotFound
if !errors.As(err, &errNotFound) {
lager.WithError(err).Error("error querying the user config")
return err
}
lager.Debug("no user config found")

// Indicate to the agent that this user does not exist
meta := corev2.NewObjectMeta(UserNotFound, s.cfg.Namespace)
watchEvent := &store.WatchEventUserConfig{
User: &corev2.User{
Username: s.user,
},
Action: store.WatchCreate,
Metadata: &meta,
}
err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent)
if err != nil {
lager.WithError(err).Error("error publishing user config")
return err
}
} else {
// A user config already exists, therefore we should the stored user subscriptions
// rather than what the agent provided us for the subscriptions
lager.Debug("an user config was found")

var storedUserConfig corev2.User
err = usrWrapper.UnwrapInto(&storedUserConfig)
if err != nil {
lager.WithError(err).Error("error unwrapping user config")
return err
}

// Remove the managed_by label if the value is sensu-agent, in case of disabled user
if storedUserConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" {
delete(storedUserConfig.GetMetadata().Labels, corev2.ManagedByLabel)
}

// Send back this user config to the agent so it uses that rather than it's local config
watchEvent := &store.WatchEventUserConfig{
Action: store.WatchUpdate,
User: &storedUserConfig,
}
err = s.bus.Publish(messaging.UserConfigTopic(s.cfg.Namespace, s.cfg.AgentName), watchEvent)
if err != nil {
lager.WithError(err).Error("error publishing user config")
return err
}
}

// Determine if the entity already exists
subscription, err := s.bus.Subscribe(topic, agentName, s.entityConfig)
if err != nil {
lager.WithError(err).Error("error starting subscription")
return err
}
s.entityConfig.subscriptions <- subscription

// Determine if the entity already exists
req := storev2.NewResourceRequest(s.ctx, s.cfg.Namespace, s.cfg.AgentName, (&corev3.EntityConfig{}).StoreName())
wrapper, err := s.storev2.Get(req)
if err != nil {
Expand Down Expand Up @@ -624,6 +743,7 @@ func (s *Session) stop() {
}
}()
defer close(s.entityConfig.updatesChannel)
defer close(s.userConfig.updatesChannel)
defer close(s.checkChannel)

sessionCounter.WithLabelValues(s.cfg.Namespace).Dec()
Expand All @@ -648,6 +768,12 @@ func (s *Session) stop() {
}
}

for sub := range s.userConfig.subscription {
if err := sub.Cancel(); err != nil {
logger.WithError(err).Error("unable to unsubscribe from message bus")
}
}

// Unsubscribe the session from every configured check subscriptions
s.unsubscribe(s.cfg.Subscriptions)
}
Expand Down
9 changes: 2 additions & 7 deletions backend/agentd/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/gogo/protobuf/proto"
corev2 "github.com/sensu/core/v2"
corev3 "github.com/sensu/core/v3"
"github.com/sensu/sensu-go/agent"
"github.com/sensu/sensu-go/backend/store"
etcdstore "github.com/sensu/sensu-go/backend/store/etcd"
storev2 "github.com/sensu/sensu-go/backend/store/v2"
Expand Down Expand Up @@ -110,12 +109,8 @@ func GetUserConfigWatcher(ctx context.Context, client *clientv3.Client) <-chan s
}

// Remove the managed_by label if the value is sensu-agent, in case the user is disabled
//if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" {
// delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel)
//}

if userConfig.Disabled {
agent.GracefulShutdown(cancel)
if userConfig.GetMetadata().Labels[corev2.ManagedByLabel] == "sensu-agent" {
delete(userConfig.GetMetadata().Labels, corev2.ManagedByLabel)
}

ch <- store.WatchEventUserConfig{
Expand Down
6 changes: 6 additions & 0 deletions backend/messaging/message_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const (
// to agents
TopicEntityConfig = "sensu:entity-config"

TopicUserConfig = "sensu:user-config"

// TopicEvent is the topic for events that have been written to Etcd and
// normalized by eventd.
TopicEvent = "sensu:event"
Expand Down Expand Up @@ -104,6 +106,10 @@ func EntityConfigTopic(namespace, name string) string {
return fmt.Sprintf("%s:%s:%s", TopicEntityConfig, namespace, name)
}

func UserConfigTopic(namespace, name string) string {
return fmt.Sprintf("%s:%s:%s", TopicUserConfig, namespace, name)
}

// SubscriptionTopic is a helper to determine the proper topic name for a
// subscription based on the namespace
func SubscriptionTopic(namespace, sub string) string {
Expand Down
7 changes: 5 additions & 2 deletions backend/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

corev2 "github.com/sensu/core/v2"

Check failure on line 8 in backend/store/store.go

View workflow job for this annotation

GitHub Actions / staticcheck (project)

package "github.com/sensu/core/v2" is being imported more than once (ST1019)
v2 "github.com/sensu/core/v2"

Check failure on line 9 in backend/store/store.go

View workflow job for this annotation

GitHub Actions / staticcheck (project)

other import of "github.com/sensu/core/v2"
corev3 "github.com/sensu/core/v3"
"github.com/sensu/sensu-go/backend/store/patch"
"github.com/sensu/sensu-go/types"
Expand Down Expand Up @@ -159,8 +160,10 @@ type WatchEventEntityConfig struct {
// WatchEventUserConfig contains and updated entity config and the action that
// occurred during this modification
type WatchEventUserConfig struct {
User *corev2.User
Action WatchActionType
User *corev2.User
Action WatchActionType
Metadata *v2.ObjectMeta `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata"`
//Subscriptions []string `protobuf:"bytes,4,rep,name=subscriptions,proto3" json:"subscriptions"`
}

// Store is used to abstract the durable storage used by the Sensu backend
Expand Down

0 comments on commit 02d1133

Please sign in to comment.