From aca44bfa4da09e2272beed3c31b254857ee84634 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Fri, 13 Dec 2024 09:36:57 +0100 Subject: [PATCH] Re-introduce atomics from #6176 Signed-off-by: Maurice van Veen --- server/jetstream.go | 4 ++++ server/jetstream_cluster.go | 6 +----- server/server.go | 14 +++++++------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/server/jetstream.go b/server/jetstream.go index e3f073fa95f..02920e76a4b 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -461,6 +461,8 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error { if err := s.enableJetStreamClustering(); err != nil { return err } + // Set our atomic bool to clustered. + s.jsClustered.Store(true) } // Mark when we are up and running. @@ -965,6 +967,8 @@ func (s *Server) shutdownJetStream() { cc.c = nil } cc.meta = nil + // Set our atomic bool to false. + s.jsClustered.Store(false) } js.mu.Unlock() diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 66b41f2499a..8f08b1e502a 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -224,11 +224,7 @@ func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) { } func (s *Server) JetStreamIsClustered() bool { - js := s.getJetStream() - if js == nil { - return false - } - return js.isClustered() + return s.jsClustered.Load() } func (s *Server) JetStreamIsLeader() bool { diff --git a/server/server.go b/server/server.go index e6f6c728d42..81013d1e1b9 100644 --- a/server/server.go +++ b/server/server.go @@ -141,8 +141,10 @@ type Server struct { listenerErr error gacc *Account sys *internal + sysAcc atomic.Pointer[Account] js atomic.Pointer[jetStream] isMetaLeader atomic.Bool + jsClustered atomic.Bool accounts sync.Map tmpAccounts sync.Map // Temporarily stores accounts that are being built activeAccounts int32 @@ -1281,6 +1283,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error) if err == nil && s.sys != nil && acc != s.sys.account { // sys.account.clients (including internal client)/respmap/etc... are transferred separately s.sys.account = acc + s.sysAcc.Store(acc) } if err != nil { return awcsti, fmt.Errorf("error resolving system account: %v", err) @@ -1636,13 +1639,7 @@ func (s *Server) SetSystemAccount(accName string) error { // SystemAccount returns the system account if set. func (s *Server) SystemAccount() *Account { - var sacc *Account - s.mu.RLock() - if s.sys != nil { - sacc = s.sys.account - } - s.mu.RUnlock() - return sacc + return s.sysAcc.Load() } // GlobalAccount returns the global account. @@ -1714,6 +1711,9 @@ func (s *Server) setSystemAccount(acc *Account) error { s.sys.wg.Add(1) s.mu.Unlock() + // Store in atomic for fast lookup. + s.sysAcc.Store(acc) + // Register with the account. s.sys.client.registerWithAccount(acc)