Skip to content

Commit

Permalink
Re-introduce atomics from #6176
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen authored and neilalexander committed Dec 13, 2024
1 parent dbb3907 commit aca44bf
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
4 changes: 4 additions & 0 deletions server/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
if err := s.enableJetStreamClustering(); err != nil {
return err
}
// Set our atomic bool to clustered.
s.jsClustered.Store(true)
}

// Mark when we are up and running.
Expand Down Expand Up @@ -965,6 +967,8 @@ func (s *Server) shutdownJetStream() {
cc.c = nil
}
cc.meta = nil
// Set our atomic bool to false.
s.jsClustered.Store(false)
}
js.mu.Unlock()

Expand Down
6 changes: 1 addition & 5 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,7 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
}

func (s *Server) JetStreamIsClustered() bool {
js := s.getJetStream()
if js == nil {
return false
}
return js.isClustered()
return s.jsClustered.Load()
}

func (s *Server) JetStreamIsLeader() bool {
Expand Down
14 changes: 7 additions & 7 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ type Server struct {
listenerErr error
gacc *Account
sys *internal
sysAcc atomic.Pointer[Account]
js atomic.Pointer[jetStream]
isMetaLeader atomic.Bool
jsClustered atomic.Bool
accounts sync.Map
tmpAccounts sync.Map // Temporarily stores accounts that are being built
activeAccounts int32
Expand Down Expand Up @@ -1281,6 +1283,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
if err == nil && s.sys != nil && acc != s.sys.account {
// sys.account.clients (including internal client)/respmap/etc... are transferred separately
s.sys.account = acc
s.sysAcc.Store(acc)
}
if err != nil {
return awcsti, fmt.Errorf("error resolving system account: %v", err)
Expand Down Expand Up @@ -1636,13 +1639,7 @@ func (s *Server) SetSystemAccount(accName string) error {

// SystemAccount returns the system account if set.
func (s *Server) SystemAccount() *Account {
var sacc *Account
s.mu.RLock()
if s.sys != nil {
sacc = s.sys.account
}
s.mu.RUnlock()
return sacc
return s.sysAcc.Load()
}

// GlobalAccount returns the global account.
Expand Down Expand Up @@ -1714,6 +1711,9 @@ func (s *Server) setSystemAccount(acc *Account) error {
s.sys.wg.Add(1)
s.mu.Unlock()

// Store in atomic for fast lookup.
s.sysAcc.Store(acc)

// Register with the account.
s.sys.client.registerWithAccount(acc)

Expand Down

0 comments on commit aca44bf

Please sign in to comment.