Skip to content

Commit

Permalink
CBG-3553 add bucket.Close() when opening the database (#6546) (#6548)
Browse files Browse the repository at this point in the history
* CBG-3553 add bucket.Close() when opening the database

- close the DatabaseContext if it's not managed by the database if it hasn't been added to
- changed newSequenceAllocator to not leak a goroutine

* Don't create db automatically

* Improve error message

* Don't close dbcontext if already registered

* Remove test which did not test anything
  • Loading branch information
torcolvin authored Oct 23, 2023
1 parent 42eefb9 commit f63068e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 6 deletions.
7 changes: 5 additions & 2 deletions db/sequence_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@ func newSequenceAllocator(ctx context.Context, datastore base.DataStore, dbStats

// The reserveNotify channel manages communication between the releaseSequenceMonitor goroutine and _reserveSequenceRange invocations.
s.reserveNotify = make(chan struct{}, 1)
_, err := s.lastSequence(ctx) // just reads latest sequence from bucket
if err != nil {
return nil, err
}
go func() {
defer base.FatalPanicHandler()
s.releaseSequenceMonitor(ctx)
}()
_, err := s.lastSequence(ctx) // just reads latest sequence from bucket
return s, err
}

Expand Down Expand Up @@ -152,7 +155,7 @@ func (s *sequenceAllocator) lastSequence(ctx context.Context) (uint64, error) {
s.dbStats.SequenceGetCount.Add(1)
last, err := s.getSequence()
if err != nil {
base.WarnfCtx(ctx, "Error from Get in getSequence(): %v", err)
return 0, fmt.Errorf("Couldn't get sequence from bucket: %w", err)
}
return last, err
}
Expand Down
2 changes: 1 addition & 1 deletion rest/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (h *handler) handleCreateDB() error {
if errors.Is(err, base.ErrAuthError) {
return base.HTTPErrorf(http.StatusForbidden, "auth failure using provided bucket credentials for database %s", base.MD(config.Name))
}
return err
return base.HTTPErrorf(http.StatusInternalServerError, "couldn't load database: %v", err)
}
}

Expand Down
23 changes: 20 additions & 3 deletions rest/server_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,9 @@ func GetBucketSpec(ctx context.Context, config *DatabaseConfig, serverConfig *St
// lock to see if it's already been added by another process. If so, returns either the
// existing DatabaseContext or an error based on the useExisting flag.
// Pass in a bucketFromBucketSpecFn to replace the default ConnectToBucket function. This will cause the failFast argument to be ignored
func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config DatabaseConfig, options getOrAddDatabaseConfigOptions) (*db.DatabaseContext, error) {
func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config DatabaseConfig, options getOrAddDatabaseConfigOptions) (dbcontext *db.DatabaseContext, returnedError error) {
var bucket base.Bucket

// Generate bucket spec and validate whether db already exists
spec, err := GetBucketSpec(ctx, &config, sc.Config)
if err != nil {
Expand All @@ -490,6 +492,22 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config
if dbName == "" {
dbName = spec.BucketName
}
defer func() {
if returnedError == nil {
return
}
// database exists in global map, management is deferred to REST api
_, dbRegistered := sc.databases_[dbName]
if dbRegistered {
return
}
if dbcontext != nil {
dbcontext.Close(ctx) // will close underlying bucket
} else if bucket != nil {
bucket.Close(ctx)
}
}()

if spec.Server == "" {
spec.Server = sc.Config.Bootstrap.Server
}
Expand Down Expand Up @@ -530,7 +548,6 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config
base.MD(dbName), base.MD(spec.BucketName), base.SD(base.DefaultPool), base.SD(spec.Server))

// the connectToBucketFn is used for testing seam
var bucket base.Bucket
if options.connectToBucketFn != nil {
// the connectToBucketFn is used for testing seam
bucket, err = options.connectToBucketFn(ctx, spec, options.failFast)
Expand Down Expand Up @@ -821,7 +838,7 @@ func (sc *ServerContext) _getOrAddDatabaseFromConfig(ctx context.Context, config
}

// Create the DB Context
dbcontext, err := db.NewDatabaseContext(ctx, dbName, bucket, autoImport, contextOptions)
dbcontext, err = db.NewDatabaseContext(ctx, dbName, bucket, autoImport, contextOptions)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit f63068e

Please sign in to comment.