Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the polling of agbot checks the secrets update #4154

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions agreementbot/agreementbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewAgreementBotWorker(name string, cfg *config.HorizonConfig, db persistenc
newMessagesToProcess: false,
nodeSearch: NewNodeSearch(),
secretProvider: s,
secretUpdateManager: NewSecretUpdateManager(),
secretUpdateManager: NewSecretUpdateManager(cfg.AgreementBot.SecretsUpdateCheckInterval, cfg.AgreementBot.SecretsUpdateCheckInterval, cfg.AgreementBot.SecretsUpdateCheckMaxInterval, cfg.AgreementBot.SecretsUpdateCheckIncrement),
}

patternManager = NewPatternManager()
Expand Down Expand Up @@ -1634,7 +1634,7 @@ func (w *AgreementBotWorker) secretsProviderMaintenance() int {

// This function is called by the secrets update sub worker to learn about secrets that have been updated.
func (w *AgreementBotWorker) secretsUpdate() int {

nextRunWait := w.secretUpdateManager.PollInterval
secretUpdates, err := w.secretUpdateManager.CheckForUpdates(w.secretProvider, w.db)
if err != nil {
glog.Errorf(AWlogString(err))
Expand All @@ -1643,10 +1643,13 @@ func (w *AgreementBotWorker) secretsUpdate() int {
// Send out an event with the changed secrets and affected policies in it.
if secretUpdates != nil && secretUpdates.Length() != 0 {
w.Messages() <- events.NewSecretUpdatesMessage(events.UPDATED_SECRETS, secretUpdates)
nextRunWait = w.secretUpdateManager.AdjustSecretsPollingInterval(secretUpdates.Length())
} else {
nextRunWait = w.secretUpdateManager.AdjustSecretsPollingInterval(0)
}
}

return 0
return nextRunWait
}

func (w *AgreementBotWorker) monitorHAGroupNMPUpdates() int {
Expand Down
46 changes: 42 additions & 4 deletions agreementbot/secret_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,50 @@ import (

// The main component which holds secret updates for the governance functions.
type SecretUpdateManager struct {
PendingUpdates []*events.SecretUpdates // Secret update events that need to be processed
PULock sync.Mutex // The lock that protects the list of pending secret updates.
PendingUpdates []*events.SecretUpdates // Secret update events that need to be processed
PollInterval int // Number of seconds to pull secret update
PollMinInterval int
PollMaxInterval int
PollIntervalIncrement int
PULock sync.Mutex // The lock that protects the list of pending secret updates.
}

func NewSecretUpdateManager() *SecretUpdateManager {
return new(SecretUpdateManager)
func NewSecretUpdateManager(pollInterval int, pollMinInterval int, pollMaxInterval int, pollIntervalIncrement int) *SecretUpdateManager {
sum := &SecretUpdateManager{
PendingUpdates: make([]*events.SecretUpdates, 0),
PollInterval: pollInterval, // 60s
PollMinInterval: pollMinInterval, // 60s
PollMaxInterval: pollMaxInterval, // 300s
PollIntervalIncrement: pollIntervalIncrement, // 30s
}
return sum
}

func (sm *SecretUpdateManager) GetPollInterval() int {
return sm.PollInterval
}

func (sm *SecretUpdateManager) SetPollInterval(interval int) {
sm.PULock.Lock()
defer sm.PULock.Unlock()
sm.PollInterval = interval
}

func (sm *SecretUpdateManager) AdjustSecretsPollingInterval(numOfSecretUpdate int) int {
if numOfSecretUpdate == 0 {
// no update, increase the poll interval
sm.PollInterval += sm.PollIntervalIncrement
if sm.PollInterval > sm.PollMaxInterval {
sm.PollInterval = sm.PollMaxInterval
}
} else {
// if there were changes, set interval to min
sm.PollInterval = sm.PollMinInterval
}

glog.V(5).Infof(smlogString(fmt.Sprintf("AdjustSecretsPollingInterval to %v, numOfSecretUpdate is: %v", sm.PollInterval, numOfSecretUpdate)))

return sm.PollInterval
}

func (sm *SecretUpdateManager) GetNextUpdateEvent() (su *events.SecretUpdates) {
Expand Down
47 changes: 31 additions & 16 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ type AGConfig struct {
RetryLookBackWindow uint64 // The time window (in seconds) used by the agbot to look backward in time for node changes when node agreements are retried.
PolicySearchOrder bool // When true, search policies from most recently changed to least recently changed.
Vault VaultConfig // The hashicorp vault config to connect to and fetch secrets from.
SecretsUpdateCheck int // The number of seconds between checks for updated secrets.
SecretsUpdateCheckInterval int // The number of seconds between checks for updated secrets. Default is 60
SecretsUpdateCheckMaxInterval int // As the runtime increases the SecretsUpdateCheckInterval, this value is the maximum that value can attain.
SecretsUpdateCheckIncrement int // The number of seconds to increment the SecretsUpdateCheckInterval when its time to increase the poll interval.
CSSDestinationBatchSize int // The max number of destination updates to send to CSS in a single update.
}

Expand Down Expand Up @@ -188,7 +190,15 @@ func (c *HorizonConfig) GetSecretsManagerFilePath() string {
}

func (c *HorizonConfig) GetSecretsUpdateCheck() int {
return c.AgreementBot.SecretsUpdateCheck
return c.AgreementBot.SecretsUpdateCheckInterval
}

func (c *HorizonConfig) GetSecretsUpdateCheckMaxInterval() int {
return c.AgreementBot.SecretsUpdateCheckMaxInterval
}

func (c *HorizonConfig) GetSecretsUpdateCheckIncrement() int {
return c.AgreementBot.SecretsUpdateCheckIncrement
}

func (c *HorizonConfig) GetAgbotCSSURL() string {
Expand Down Expand Up @@ -397,18 +407,20 @@ func Read(file string) (*HorizonConfig, error) {
K8sCRInstallTimeoutS: K8sCRInstallTimeoutS_DEFAULT,
},
AgreementBot: AGConfig{
MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT,
AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT,
AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT,
MessageQueueScale: AgbotMessageQueueScale_DEFAULT,
QueueHistorySize: AgbotQueueHistorySize_DEFAULT,
ErrRescanS: AgbotErrRescan_DEFAULT,
FullRescanS: AgbotFullRescan_DEFAULT,
MaxExchangeChanges: AgbotMaxChanges_DEFAULT,
RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT,
PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT,
SecretsUpdateCheck: SecretsUpdateCheck_DEFAULT,
CSSDestinationBatchSize: AgbotCSSDestinationBatchSize_DEFAULT,
MessageKeyCheck: AgbotMessageKeyCheck_DEFAULT,
AgreementBatchSize: AgbotAgreementBatchSize_DEFAULT,
AgreementQueueSize: AgbotAgreementQueueSize_DEFAULT,
MessageQueueScale: AgbotMessageQueueScale_DEFAULT,
QueueHistorySize: AgbotQueueHistorySize_DEFAULT,
ErrRescanS: AgbotErrRescan_DEFAULT,
FullRescanS: AgbotFullRescan_DEFAULT,
MaxExchangeChanges: AgbotMaxChanges_DEFAULT,
RetryLookBackWindow: AgbotRetryLookBackWindow_DEFAULT,
PolicySearchOrder: AgbotPolicySearchOrder_DEFAULT,
SecretsUpdateCheckInterval: SecretsUpdateCheck_DEFAULT,
SecretsUpdateCheckMaxInterval: SecretsUpdateCheckMaxInterval_DEFAULT,
SecretsUpdateCheckIncrement: SecretsUpdateCheckIncrement_DEFAULT,
CSSDestinationBatchSize: AgbotCSSDestinationBatchSize_DEFAULT,
},
}

Expand Down Expand Up @@ -595,7 +607,10 @@ func (agc *AGConfig) String() string {
", MaxExchangeChanges: %v"+
", RetryLookBackWindow: %v"+
", PolicySearchOrder: %v"+
", Vault: {%v}",
", Vault: {%v}"+
", SecretsUpdateCheckInterval: %v"+
", SecretsUpdateCheckMaxInterval: %v"+
", SecretsUpdateCheckIncrement: %v",
agc.TxLostDelayTolerationSeconds, agc.AgreementWorkers, agc.DBPath, agc.Postgresql.String(),
agc.PartitionStale, agc.ProtocolTimeoutS, agc.AgreementTimeoutS, agc.NoDataIntervalS, agc.ActiveAgreementsURL,
agc.ActiveAgreementsUser, mask, agc.PolicyPath, agc.NewContractIntervalS, agc.ProcessGovernanceIntervalS,
Expand All @@ -604,7 +619,7 @@ func (agc *AGConfig) String() string {
agc.SecureAPIListenHost, agc.SecureAPIListenPort, agc.SecureAPIServerCert, agc.SecureAPIServerKey,
agc.PurgeArchivedAgreementHours, agc.CheckUpdatedPolicyS, agc.CSSURL, agc.CSSSSLCert, agc.CSSDestinationBatchSize, agc.AgreementBatchSize,
agc.AgreementQueueSize, agc.MessageQueueScale, agc.QueueHistorySize, agc.FullRescanS, agc.ErrRescanS, agc.MaxExchangeChanges,
agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault)
agc.RetryLookBackWindow, agc.PolicySearchOrder, agc.Vault, agc.SecretsUpdateCheckInterval, agc.SecretsUpdateCheckMaxInterval, agc.SecretsUpdateCheckIncrement)
}

func (c *VaultConfig) String() string {
Expand Down
6 changes: 6 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,5 +138,11 @@ const K8sCRInstallTimeoutS_DEFAULT = 180
// Time between secret update checks
const SecretsUpdateCheck_DEFAULT = 60

// Max interval between secret update checks
const SecretsUpdateCheckMaxInterval_DEFAULT = 180

// The Default secrets check increment size
const SecretsUpdateCheckIncrement_DEFAULT = 30

// Batch destination size to send to CSS
const AgbotCSSDestinationBatchSize_DEFAULT = 200
Loading