From 0db83cb8ffedf9c2cce1d2898e4e2b8b264ef329 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Fri, 17 Nov 2023 15:44:23 -0500 Subject: [PATCH 1/7] CBG-3271 make sure expiration works on a closed bucket Make sure the following works: - open bucket - write doc with expiry of 1 sec - close bucket - wait more than 1 sec The doc should expire, because another caller can re-open foo, and would expect the document to be deleted. This is akin to metadata purge interval. Implementation: - Abstracted ExpirationManager into a separate struct / file for readability. - ExpirationManager exists on every Bucket, since expiration is set based on a CRUD operation, which could occur on any copy of the bucket. - The functions for expiration use a new function _underlyingDB which returns a queryable object that is never closed. --- bucket.go | 35 ++++++++------ bucket_api.go | 72 +++++++++++++---------------- bucket_test.go | 27 +++++++++-- collection.go | 35 +++++++++----- designdoc.go | 4 +- expiration_manager.go | 104 ++++++++++++++++++++++++++++++++++++++++++ feeds.go | 2 +- views.go | 2 +- 8 files changed, 205 insertions(+), 76 deletions(-) create mode 100644 expiration_manager.go diff --git a/bucket.go b/bucket.go index 151cf79..32277e6 100644 --- a/bucket.go +++ b/bucket.go @@ -36,13 +36,12 @@ type Bucket struct { name string // Bucket name collections collectionsMap // Collections, indexed by DataStoreName collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed - mutex *sync.Mutex // mutex for synchronized access to Bucket - sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead) - expTimer *time.Timer // Schedules expiration of docs - nextExp *uint32 // Timestamp when expTimer will run (0 if never) - serial uint32 // Serial number for logging - inMemory bool // True if it's an in-memory database - closed bool // represents state when it is closed + mutex *sync.Mutex // mutex for synchronized access to Bucket + sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead) + expManager *expirationManager // expiration manager for bucket + serial uint32 // Serial number for logging + inMemory bool // True if it's an in-memory database + closed bool // represents state when it is closed } type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection @@ -174,10 +173,10 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err collections: make(map[sgbucket.DataStoreNameImpl]*Collection), collectionFeeds: make(map[sgbucket.DataStoreNameImpl][]*dcpFeed), mutex: &sync.Mutex{}, - nextExp: new(uint32), inMemory: inMemory, serial: serial, } + bucket.expManager = newExpirationManager(bucket.doExpiration) defer func() { if err != nil { _ = bucket.CloseAndDelete(context.TODO()) @@ -307,7 +306,7 @@ func (bucket *Bucket) initializeSchema(bucketName string) (err error) { return } -// Returns the database handle as a `queryable` interface value. +// db returns the database handle as a `queryable` interface value. // If the bucket has been closed, it returns a special `closedDB` value that will return // ErrBucketClosed from any call. func (bucket *Bucket) db() queryable { @@ -316,7 +315,7 @@ func (bucket *Bucket) db() queryable { return bucket._db() } -// Returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked the caller. +// db returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked the caller. func (bucket *Bucket) _db() queryable { if bucket.closed { return closedDB{} @@ -324,8 +323,17 @@ func (bucket *Bucket) _db() queryable { return bucket.sqliteDB } +// _underlyingDB returns the database handle as a `queryable` interface value. This should only be used for bucket maintanence operations that should occur regardless of if bucket is open or closed. This is not safe to call without bucket.mutex being locked the caller. +func (bucket *Bucket) _underlyingDB() queryable { + if bucket.sqliteDB == nil { + logError("bucket.sqliteDB is nil for _underlyingDB call. This function is being called after bucket is closed.") + return closedDB{} + } + return bucket.sqliteDB +} + // Runs a function within a SQLite transaction. -func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error { +func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error, checkClosedBucket bucketClosedCheck) error { // SQLite allows only a single writer, so use a mutex to avoid BUSY and LOCKED errors. // However, these errors can still occur (somehow?), so we retry if we get one. // --Update, 25 July 2023: After adding "_txlock=immediate" to the DB options when opening, @@ -333,7 +341,7 @@ func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error { bucket.mutex.Lock() defer bucket.mutex.Unlock() - if bucket.closed { + if checkClosedBucket == true && bucket.closed { return ErrBucketClosed } @@ -380,8 +388,7 @@ func (b *Bucket) copy() *Bucket { collections: make(collectionsMap), mutex: b.mutex, sqliteDB: b.sqliteDB, - expTimer: b.expTimer, - nextExp: b.nextExp, + expManager: b.expManager, serial: b.serial, inMemory: b.inMemory, } diff --git a/bucket_api.go b/bucket_api.go index 3a26d5a..7c2f3a1 100644 --- a/bucket_api.go +++ b/bucket_api.go @@ -13,11 +13,17 @@ import ( "database/sql" "errors" "fmt" - "time" sgbucket "github.com/couchbase/sg-bucket" ) +type bucketClosedCheck bool + +const ( + checkBucketClosed bucketClosedCheck = true + skipCheckBucketClosed bucketClosedCheck = false +) + func (bucket *Bucket) String() string { return fmt.Sprintf("B#%d", bucket.serial) } @@ -62,9 +68,7 @@ func (bucket *Bucket) Close(_ context.Context) { // _closeSqliteDB closes the underlying sqlite database and shuts down dcpFeeds. Must have a lock to call this function. func (bucket *Bucket) _closeSqliteDB() { - if bucket.expTimer != nil { - bucket.expTimer.Stop() - } + bucket.expManager.stop() for _, c := range bucket.collections { c.close() } @@ -174,10 +178,16 @@ func (bucket *Bucket) DropDataStore(name sgbucket.DataStoreName) error { return bucket.dropCollection(sc) } +// ListDataStores returns a list of the names of all data stores in the bucket. func (bucket *Bucket) ListDataStores() (result []sgbucket.DataStoreName, err error) { - traceEnter("ListDataStores", "%s", bucket) - defer func() { traceExit("ListDataStores", err, "%v", result) }() - rows, err := bucket.db().Query(`SELECT id, scope, name FROM collections ORDER BY id`) + return bucket.listDataStores(bucket.db()) +} + +// listDataStores returns a list of the names of all data stores in the bucket, given a specific db handle. +func (bucket *Bucket) listDataStores(db queryable) (result []sgbucket.DataStoreName, err error) { + traceEnter("listDataStores", "%s", bucket) + defer func() { traceExit("listDataStores", err, "%v", result) }() + rows, err := db.Query(`SELECT id, scope, name FROM collections ORDER BY id`) if err != nil { return nil, err } @@ -309,8 +319,8 @@ func (bucket *Bucket) dropCollection(name sgbucket.DataStoreNameImpl) error { //////// EXPIRATION (CUSTOM API): -// Returns the earliest expiration time of any document, or 0 if none. -func (bucket *Bucket) NextExpiration() (exp Exp, err error) { +// nextExpiration returns the earliest expiration time of any document, or 0 if none. +func (bucket *Bucket) nextExpiration() (exp Exp, err error) { var expVal sql.NullInt64 row := bucket.db().QueryRow(`SELECT min(exp) FROM documents WHERE exp > 0`) err = scan(row, &expVal) @@ -320,9 +330,9 @@ func (bucket *Bucket) NextExpiration() (exp Exp, err error) { return } -// Immediately deletes all expired documents in this bucket. -func (bucket *Bucket) ExpireDocuments() (int64, error) { - names, err := bucket.ListDataStores() +// expireDocuments immediately deletes all expired documents in this bucket. +func (bucket *Bucket) expireDocuments() (int64, error) { + names, err := bucket.listDataStores(bucket._underlyingDB()) if err != nil { return 0, err } @@ -330,7 +340,7 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) { for _, name := range names { if coll, err := bucket.getCollection(name.(sgbucket.DataStoreNameImpl)); err != nil { return 0, err - } else if n, err := coll.ExpireDocuments(); err != nil { + } else if n, err := coll.expireDocuments(); err != nil { return 0, err } else { count += n @@ -339,40 +349,20 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) { return count, nil } +// scheduleExpiration schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. func (bucket *Bucket) scheduleExpiration() { - if nextExp, err := bucket.NextExpiration(); err == nil && nextExp > 0 { - bucket.scheduleExpirationAtOrBefore(nextExp) - } -} - -func (bucket *Bucket) scheduleExpirationAtOrBefore(exp uint32) { - if exp > 0 { - bucket.mutex.Lock() - defer bucket.mutex.Unlock() - if exp < *bucket.nextExp || *bucket.nextExp == 0 { - bucket.nextExp = &exp - dur := expDuration(exp) - if dur < 0 { - dur = 0 - } - debug("EXP: Scheduling in %s", dur) - if bucket.expTimer == nil { - bucket.expTimer = time.AfterFunc(dur, bucket.doExpiration) - } else { - bucket.expTimer.Reset(dur) - } - } + if nextExp, err := bucket.nextExpiration(); err == nil && nextExp > 0 { + bucket.expManager._scheduleExpirationAtOrBefore(nextExp) } } func (bucket *Bucket) doExpiration() { - bucket.mutex.Lock() - bucket.nextExp = func(x uint32) *uint32 { return &x }(0) - bucket.mutex.Unlock() + bucket.expManager._clearNext() debug("EXP: Running scheduled expiration...") - if n, err := bucket.ExpireDocuments(); err != nil { - logError("Bucket %s error expiring docs: %v", bucket, err) + if n, err := bucket.expireDocuments(); err != nil { + // If there's an error expiring docs, it means there is a programming error of a leaked expiration goroutine. + panic("Error expiring docs: " + err.Error()) } else if n > 0 { info("Bucket %s expired %d docs", bucket, n) } @@ -389,7 +379,7 @@ func (bucket *Bucket) PurgeTombstones() (count int64, err error) { count, err = result.RowsAffected() } return err - }) + }, true) traceExit("PurgeTombstones", err, "%d", count) return } diff --git a/bucket_test.go b/bucket_test.go index 4d53e3e..0a70129 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -355,7 +355,7 @@ func TestExpiration(t *testing.T) { bucket := makeTestBucket(t) c := bucket.DefaultDataStore() - exp, err := bucket.NextExpiration() + exp, err := bucket.nextExpiration() require.NoError(t, err) require.Equal(t, Exp(0), exp) @@ -367,7 +367,7 @@ func TestExpiration(t *testing.T) { requireAddRaw(t, c, "k3", 0, []byte("v3")) requireAddRaw(t, c, "k4", exp4, []byte("v4")) - exp, err = bucket.NextExpiration() + exp, err = bucket.nextExpiration() require.NoError(t, err) // Usually this will return exp2, but if this is slow enough that the expiration goroutine runs to expire document k2, it can return exp4. require.Contains(t, []Exp{exp2, exp4}, exp) @@ -375,9 +375,9 @@ func TestExpiration(t *testing.T) { log.Printf("... waiting 1 sec ...") time.Sleep(1 * time.Second) - exp, err = bucket.NextExpiration() + exp, err = bucket.nextExpiration() require.NoError(t, err) - assert.Equal(t, exp4, exp) + require.Equal(t, int(exp4), int(exp)) _, _, err = c.GetRaw("k1") assert.NoError(t, err) @@ -391,7 +391,7 @@ func TestExpiration(t *testing.T) { log.Printf("... waiting 2 secs ...") time.Sleep(2 * time.Second) - exp, err = bucket.NextExpiration() + exp, err = bucket.nextExpiration() require.NoError(t, err) assert.Equal(t, uint32(0), exp) @@ -403,6 +403,23 @@ func TestExpiration(t *testing.T) { assert.Equal(t, int64(2), n) } +func TestExpirationAfterClose(t *testing.T) { + bucket, err := OpenBucket(InMemoryURL, strings.ToLower(t.Name()), CreateNew) + ctx := testCtx(t) + defer func() { + assert.NoError(t, bucket.CloseAndDelete(ctx)) + }() + require.NoError(t, err) + c := bucket.DefaultDataStore() + + // set expiry long enough that Close will happen first + exp := Exp(time.Now().Add(1 * time.Second).Unix()) + requireAddRaw(t, c, "docID", exp, []byte("v1")) + bucket.Close(ctx) + // sleep to ensure we won't panic + time.Sleep(2 * time.Second) +} + func TestUriFromPathWindows(t *testing.T) { ensureNoLeaks(t) if runtime.GOOS != "windows" { diff --git a/collection.go b/collection.go index a2a776b..93c62b9 100644 --- a/collection.go +++ b/collection.go @@ -357,22 +357,26 @@ func (c *Collection) WriteCas(key string, flags int, exp Exp, cas CAS, val any, return } +// Remove creates a document tombstone. It removes the document's value and user xattrs. func (c *Collection) Remove(key string, cas CAS) (casOut CAS, err error) { traceEnter("Remove", "%q, 0x%x", key, cas) - casOut, err = c.remove(key, &cas) + casOut, err = c.remove(key, &cas, checkBucketClosed) traceExit("Remove", err, "0x%x", casOut) return } +// Delete creates a document tombstone. It removes the document's value and user xattrs. Equivalent to Remove without a CAS check. func (c *Collection) Delete(key string) (err error) { traceEnter("Delete", "%q", key) - _, err = c.remove(key, nil) + _, err = c.remove(key, nil, checkBucketClosed) traceExit("Delete", err, "ok") return err } -func (c *Collection) remove(key string, ifCas *CAS) (casOut CAS, err error) { - err = c.withNewCas(func(txn *sql.Tx, newCas CAS) (e *event, err error) { +// remove creates a document tombstone. It removes the document's value and user xattrs. checkClosed will allow removing the document even the bucket instance is "closed". +func (c *Collection) remove(key string, ifCas *CAS, checkClosed bucketClosedCheck) (casOut CAS, err error) { + fmt.Println("remove", checkClosed) + err = c.withNewCasAndBucketClosedCheck(func(txn *sql.Tx, newCas CAS) (e *event, err error) { // Get the doc, possibly checking cas: var cas CAS var rawXattrs []byte @@ -416,7 +420,7 @@ func (c *Collection) remove(key string, ifCas *CAS) (casOut CAS, err error) { } casOut = newCas return - }) + }, checkClosed) return } @@ -518,14 +522,14 @@ func (c *Collection) IsSupported(feature sgbucket.BucketStoreFeature) bool { //////// EXPIRATION -// Immediately deletes all expired documents in this collection. -func (c *Collection) ExpireDocuments() (count int64, err error) { - traceEnter("ExpireDocuments", "") - defer func() { traceExit("ExpireDocuments", err, "%d", count) }() +// _expireDocuments immediately deletes all expired documents in this collection. +func (c *Collection) expireDocuments() (count int64, err error) { + traceEnter("_expireDocuments", "") + defer func() { traceExit("_expireDocuments", err, "%d", count) }() // First find all the expired docs and collect their keys: exp := nowAsExpiry() - rows, err := c.db().Query(`SELECT key FROM documents + rows, err := c.bucket._underlyingDB().Query(`SELECT key FROM documents WHERE collection = ?1 AND exp > 0 AND exp <= ?2`, c.id, exp) if err != nil { return @@ -546,8 +550,10 @@ func (c *Collection) ExpireDocuments() (count int64, err error) { // will get its own db connection, and if the db only supports one connection (i.e. in-memory) // having both queries active would deadlock.) for _, key := range keys { - if c.Delete(key) == nil { + _, err = c.remove(key, nil, skipCheckBucketClosed) + if err == nil { count++ + } else { } } return @@ -581,6 +587,11 @@ func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) { // Runs a function within a SQLite transaction, passing it a new CAS to assign to the // document being modified. The function returns an event to be posted. func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)) error { + return c.withNewCasAndBucketClosedCheck(fn, checkBucketClosed) +} + +// withNewCasAndBucketClosedCheck runs a function within a SQLite transaction like withNewCas. This allows the caller to bypass the bucket closed status, suitable for functions that need to run on the underlying bucket object. +func (c *Collection) withNewCasAndBucketClosedCheck(fn func(txn *sql.Tx, newCas CAS) (*event, error), checkClosed bucketClosedCheck) error { var e *event err := c.bucket.inTransaction(func(txn *sql.Tx) error { newCas, err := c.bucket.getLastCas(txn) @@ -592,7 +603,7 @@ func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error) } } return err - }) + }, checkClosed) if err == nil && e != nil { c.postNewEvent(e) } diff --git a/designdoc.go b/designdoc.go index eca21e8..7914ac6 100644 --- a/designdoc.go +++ b/designdoc.go @@ -103,7 +103,7 @@ func (c *Collection) PutDDoc(_ context.Context, designDoc string, ddoc *sgbucket } c.forgetCachedViews(designDoc) return nil - }) + }, checkBucketClosed) traceExit("PutDDoc", err, "ok") return err } @@ -121,7 +121,7 @@ func (c *Collection) DeleteDDoc(designDoc string) error { } } return err - }) + }, checkBucketClosed) traceExit("DeleteDDoc", err, "ok") return err } diff --git a/expiration_manager.go b/expiration_manager.go new file mode 100644 index 0000000..32bb18f --- /dev/null +++ b/expiration_manager.go @@ -0,0 +1,104 @@ +// Copyright 2023-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package rosmar + +import ( + "sync" + "time" +) + +// expirationManager handles expiration for a given bucket. It stores a timer which will call expirationFunc to delete documents. The value of when the timer +type expirationManager struct { + mutex *sync.Mutex // mutex for synchronized access to expirationManager + timer *time.Timer // Schedules expiration of docs + nextExp *uint32 // Timestamp when expTimer will run (0 if never) + expirationFunc func() // Function to call when timer expires +} + +func newExpirationManager(expiractionFunc func()) *expirationManager { + var nextExp uint32 + return &expirationManager{ + mutex: &sync.Mutex{}, + nextExp: &nextExp, + expirationFunc: expiractionFunc, + } +} + +// stop stops existing timers and waits for any expiration processes to complete +func (e *expirationManager) stop() { + e.mutex.Lock() + defer e.mutex.Unlock() + if e.timer != nil { + e.timer.Stop() + } +} + +// _getNext returns the next expiration time, 0 if there is no scheduled expiration. +func (e *expirationManager) _getNext() uint32 { + return *e.nextExp +} + +// setNext sets the next expiration time and schedules an expiration to occur after that time. +func (e *expirationManager) setNext(exp uint32) { + e.mutex.Lock() + defer e.mutex.Unlock() + e._setNext(exp) +} + +// _clearNext clears the next expiration time. +func (e *expirationManager) _clearNext() { + var exp uint32 + e.nextExp = &exp +} + +// setNext sets the next expiration time and schedules an expiration to occur after that time. Requires caller to have acquired mutex. +func (e *expirationManager) _setNext(exp uint32) { + info("_setNext ", exp) + e.nextExp = &exp + if exp == 0 { + e.timer = nil + return + } + dur := expDuration(exp) + if dur < 0 { + dur = 0 + } + debug("EXP: Scheduling in %s", dur) + if e.timer == nil { + e.timer = time.AfterFunc(dur, e.runExpiry) + } else { + e.timer.Reset(dur) + } +} + +// scheduleExpirationAtOrBefore schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. +func (e *expirationManager) scheduleExpirationAtOrBefore(exp uint32) { + e.mutex.Lock() + defer e.mutex.Unlock() + e._scheduleExpirationAtOrBefore(exp) +} + +// _scheduleExpirationAtOrBefore schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. Requires the mutext to be held. +func (e *expirationManager) _scheduleExpirationAtOrBefore(exp uint32) { + if exp == 0 { + return + } + currentNextExp := e._getNext() + // if zero will unset the timer. + if currentNextExp == 0 || exp < currentNextExp { + e._setNext(exp) + } +} + +// runExpiry is called when the timer expires. It calls the expirationFunc and then reschedules the timer if necessary. +func (e *expirationManager) runExpiry() { + e.mutex.Lock() + defer e.mutex.Unlock() + e.expirationFunc() +} diff --git a/feeds.go b/feeds.go index d5799f4..5c52947 100644 --- a/feeds.go +++ b/feeds.go @@ -168,7 +168,7 @@ func (c *Collection) postNewEvent(e *event) { feedEvent := e.asFeedEvent() c.postEvent(feedEvent) - c.bucket.scheduleExpirationAtOrBefore(e.exp) + c.bucket.expManager._scheduleExpirationAtOrBefore(e.exp) /* // Tell collections of other buckets on the same db file to post the event too: diff --git a/views.go b/views.go index 3e779ab..f18a2e6 100644 --- a/views.go +++ b/views.go @@ -306,7 +306,7 @@ func (c *Collection) updateView(ctx context.Context, designDoc string, viewName view.lastCas = latestCas } return err - }) + }, checkBucketClosed) return view, err } From ed119622aeb83acc3898122f36c29cd252cbe5d9 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Fri, 17 Nov 2023 17:03:38 -0500 Subject: [PATCH 2/7] Make expirationManager work on canonical bucket --- bucket.go | 15 +++------------ bucket_api.go | 22 +++++----------------- bucket_registry.go | 3 +-- collection.go | 24 ++++++++---------------- designdoc.go | 4 ++-- views.go | 2 +- 6 files changed, 20 insertions(+), 50 deletions(-) diff --git a/bucket.go b/bucket.go index 32277e6..9125430 100644 --- a/bucket.go +++ b/bucket.go @@ -202,7 +202,7 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err } registerBucket(bucket) - return bucket, err + return bucket.copy(), err } // Creates or re-opens a bucket, like OpenBucket. @@ -323,17 +323,8 @@ func (bucket *Bucket) _db() queryable { return bucket.sqliteDB } -// _underlyingDB returns the database handle as a `queryable` interface value. This should only be used for bucket maintanence operations that should occur regardless of if bucket is open or closed. This is not safe to call without bucket.mutex being locked the caller. -func (bucket *Bucket) _underlyingDB() queryable { - if bucket.sqliteDB == nil { - logError("bucket.sqliteDB is nil for _underlyingDB call. This function is being called after bucket is closed.") - return closedDB{} - } - return bucket.sqliteDB -} - // Runs a function within a SQLite transaction. -func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error, checkClosedBucket bucketClosedCheck) error { +func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error) error { // SQLite allows only a single writer, so use a mutex to avoid BUSY and LOCKED errors. // However, these errors can still occur (somehow?), so we retry if we get one. // --Update, 25 July 2023: After adding "_txlock=immediate" to the DB options when opening, @@ -341,7 +332,7 @@ func (bucket *Bucket) inTransaction(fn func(txn *sql.Tx) error, checkClosedBucke bucket.mutex.Lock() defer bucket.mutex.Unlock() - if checkClosedBucket == true && bucket.closed { + if bucket.closed { return ErrBucketClosed } diff --git a/bucket_api.go b/bucket_api.go index 7c2f3a1..70cacb7 100644 --- a/bucket_api.go +++ b/bucket_api.go @@ -17,13 +17,6 @@ import ( sgbucket "github.com/couchbase/sg-bucket" ) -type bucketClosedCheck bool - -const ( - checkBucketClosed bucketClosedCheck = true - skipCheckBucketClosed bucketClosedCheck = false -) - func (bucket *Bucket) String() string { return fmt.Sprintf("B#%d", bucket.serial) } @@ -180,14 +173,9 @@ func (bucket *Bucket) DropDataStore(name sgbucket.DataStoreName) error { // ListDataStores returns a list of the names of all data stores in the bucket. func (bucket *Bucket) ListDataStores() (result []sgbucket.DataStoreName, err error) { - return bucket.listDataStores(bucket.db()) -} - -// listDataStores returns a list of the names of all data stores in the bucket, given a specific db handle. -func (bucket *Bucket) listDataStores(db queryable) (result []sgbucket.DataStoreName, err error) { - traceEnter("listDataStores", "%s", bucket) - defer func() { traceExit("listDataStores", err, "%v", result) }() - rows, err := db.Query(`SELECT id, scope, name FROM collections ORDER BY id`) + traceEnter("ListDataStores", "%s", bucket) + defer func() { traceExit("ListDataStores", err, "%v", result) }() + rows, err := bucket.db().Query(`SELECT id, scope, name FROM collections ORDER BY id`) if err != nil { return nil, err } @@ -332,7 +320,7 @@ func (bucket *Bucket) nextExpiration() (exp Exp, err error) { // expireDocuments immediately deletes all expired documents in this bucket. func (bucket *Bucket) expireDocuments() (int64, error) { - names, err := bucket.listDataStores(bucket._underlyingDB()) + names, err := bucket.ListDataStores() if err != nil { return 0, err } @@ -379,7 +367,7 @@ func (bucket *Bucket) PurgeTombstones() (count int64, err error) { count, err = result.RowsAffected() } return err - }, true) + }) traceExit("PurgeTombstones", err, "%d", count) return } diff --git a/bucket_registry.go b/bucket_registry.go index 32cae80..36dd9b1 100644 --- a/bucket_registry.go +++ b/bucket_registry.go @@ -43,8 +43,7 @@ func (r *bucketRegistry) registerBucket(bucket *Bucket) { defer r.lock.Unlock() _, ok := r.buckets[name] if !ok { - b := bucket.copy() - r.buckets[name] = b + r.buckets[name] = bucket } r.bucketCount[name] += 1 } diff --git a/collection.go b/collection.go index 93c62b9..3375233 100644 --- a/collection.go +++ b/collection.go @@ -360,7 +360,7 @@ func (c *Collection) WriteCas(key string, flags int, exp Exp, cas CAS, val any, // Remove creates a document tombstone. It removes the document's value and user xattrs. func (c *Collection) Remove(key string, cas CAS) (casOut CAS, err error) { traceEnter("Remove", "%q, 0x%x", key, cas) - casOut, err = c.remove(key, &cas, checkBucketClosed) + casOut, err = c.remove(key, &cas) traceExit("Remove", err, "0x%x", casOut) return } @@ -368,15 +368,14 @@ func (c *Collection) Remove(key string, cas CAS) (casOut CAS, err error) { // Delete creates a document tombstone. It removes the document's value and user xattrs. Equivalent to Remove without a CAS check. func (c *Collection) Delete(key string) (err error) { traceEnter("Delete", "%q", key) - _, err = c.remove(key, nil, checkBucketClosed) + _, err = c.remove(key, nil) traceExit("Delete", err, "ok") return err } // remove creates a document tombstone. It removes the document's value and user xattrs. checkClosed will allow removing the document even the bucket instance is "closed". -func (c *Collection) remove(key string, ifCas *CAS, checkClosed bucketClosedCheck) (casOut CAS, err error) { - fmt.Println("remove", checkClosed) - err = c.withNewCasAndBucketClosedCheck(func(txn *sql.Tx, newCas CAS) (e *event, err error) { +func (c *Collection) remove(key string, ifCas *CAS) (casOut CAS, err error) { + err = c.withNewCas(func(txn *sql.Tx, newCas CAS) (e *event, err error) { // Get the doc, possibly checking cas: var cas CAS var rawXattrs []byte @@ -420,7 +419,7 @@ func (c *Collection) remove(key string, ifCas *CAS, checkClosed bucketClosedChec } casOut = newCas return - }, checkClosed) + }) return } @@ -529,7 +528,7 @@ func (c *Collection) expireDocuments() (count int64, err error) { // First find all the expired docs and collect their keys: exp := nowAsExpiry() - rows, err := c.bucket._underlyingDB().Query(`SELECT key FROM documents + rows, err := c.db().Query(`SELECT key FROM documents WHERE collection = ?1 AND exp > 0 AND exp <= ?2`, c.id, exp) if err != nil { return @@ -550,10 +549,8 @@ func (c *Collection) expireDocuments() (count int64, err error) { // will get its own db connection, and if the db only supports one connection (i.e. in-memory) // having both queries active would deadlock.) for _, key := range keys { - _, err = c.remove(key, nil, skipCheckBucketClosed) - if err == nil { + if c.Delete(key) == nil { count++ - } else { } } return @@ -587,11 +584,6 @@ func (c *Collection) setLastCas(txn *sql.Tx, cas CAS) (err error) { // Runs a function within a SQLite transaction, passing it a new CAS to assign to the // document being modified. The function returns an event to be posted. func (c *Collection) withNewCas(fn func(txn *sql.Tx, newCas CAS) (*event, error)) error { - return c.withNewCasAndBucketClosedCheck(fn, checkBucketClosed) -} - -// withNewCasAndBucketClosedCheck runs a function within a SQLite transaction like withNewCas. This allows the caller to bypass the bucket closed status, suitable for functions that need to run on the underlying bucket object. -func (c *Collection) withNewCasAndBucketClosedCheck(fn func(txn *sql.Tx, newCas CAS) (*event, error), checkClosed bucketClosedCheck) error { var e *event err := c.bucket.inTransaction(func(txn *sql.Tx) error { newCas, err := c.bucket.getLastCas(txn) @@ -603,7 +595,7 @@ func (c *Collection) withNewCasAndBucketClosedCheck(fn func(txn *sql.Tx, newCas } } return err - }, checkClosed) + }) if err == nil && e != nil { c.postNewEvent(e) } diff --git a/designdoc.go b/designdoc.go index 7914ac6..eca21e8 100644 --- a/designdoc.go +++ b/designdoc.go @@ -103,7 +103,7 @@ func (c *Collection) PutDDoc(_ context.Context, designDoc string, ddoc *sgbucket } c.forgetCachedViews(designDoc) return nil - }, checkBucketClosed) + }) traceExit("PutDDoc", err, "ok") return err } @@ -121,7 +121,7 @@ func (c *Collection) DeleteDDoc(designDoc string) error { } } return err - }, checkBucketClosed) + }) traceExit("DeleteDDoc", err, "ok") return err } diff --git a/views.go b/views.go index f18a2e6..3e779ab 100644 --- a/views.go +++ b/views.go @@ -306,7 +306,7 @@ func (c *Collection) updateView(ctx context.Context, designDoc string, viewName view.lastCas = latestCas } return err - }, checkBucketClosed) + }) return view, err } From 5539ee519d4d1b9493b09086444850ad3cb0da90 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Fri, 17 Nov 2023 17:09:23 -0500 Subject: [PATCH 3/7] Switch to expiryManager in standardization of terminology --- bucket.go | 12 +++++----- expiration_manager.go => expiry_manager.go | 26 +++++++++++----------- 2 files changed, 19 insertions(+), 19 deletions(-) rename expiration_manager.go => expiry_manager.go (77%) diff --git a/bucket.go b/bucket.go index 9125430..98bf11b 100644 --- a/bucket.go +++ b/bucket.go @@ -36,12 +36,12 @@ type Bucket struct { name string // Bucket name collections collectionsMap // Collections, indexed by DataStoreName collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed - mutex *sync.Mutex // mutex for synchronized access to Bucket - sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead) - expManager *expirationManager // expiration manager for bucket - serial uint32 // Serial number for logging - inMemory bool // True if it's an in-memory database - closed bool // represents state when it is closed + mutex *sync.Mutex // mutex for synchronized access to Bucket + sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead) + expManager *expiryManager // expiration manager for bucket + serial uint32 // Serial number for logging + inMemory bool // True if it's an in-memory database + closed bool // represents state when it is closed } type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection diff --git a/expiration_manager.go b/expiry_manager.go similarity index 77% rename from expiration_manager.go rename to expiry_manager.go index 32bb18f..f6d9138 100644 --- a/expiration_manager.go +++ b/expiry_manager.go @@ -13,17 +13,17 @@ import ( "time" ) -// expirationManager handles expiration for a given bucket. It stores a timer which will call expirationFunc to delete documents. The value of when the timer -type expirationManager struct { - mutex *sync.Mutex // mutex for synchronized access to expirationManager +// expiryManager handles expiration for a given bucket. It stores a timer which will call expirationFunc to delete documents. The value of when the timer +type expiryManager struct { + mutex *sync.Mutex // mutex for synchronized access to expiryManager timer *time.Timer // Schedules expiration of docs nextExp *uint32 // Timestamp when expTimer will run (0 if never) expirationFunc func() // Function to call when timer expires } -func newExpirationManager(expiractionFunc func()) *expirationManager { +func newExpirationManager(expiractionFunc func()) *expiryManager { var nextExp uint32 - return &expirationManager{ + return &expiryManager{ mutex: &sync.Mutex{}, nextExp: &nextExp, expirationFunc: expiractionFunc, @@ -31,7 +31,7 @@ func newExpirationManager(expiractionFunc func()) *expirationManager { } // stop stops existing timers and waits for any expiration processes to complete -func (e *expirationManager) stop() { +func (e *expiryManager) stop() { e.mutex.Lock() defer e.mutex.Unlock() if e.timer != nil { @@ -40,25 +40,25 @@ func (e *expirationManager) stop() { } // _getNext returns the next expiration time, 0 if there is no scheduled expiration. -func (e *expirationManager) _getNext() uint32 { +func (e *expiryManager) _getNext() uint32 { return *e.nextExp } // setNext sets the next expiration time and schedules an expiration to occur after that time. -func (e *expirationManager) setNext(exp uint32) { +func (e *expiryManager) setNext(exp uint32) { e.mutex.Lock() defer e.mutex.Unlock() e._setNext(exp) } // _clearNext clears the next expiration time. -func (e *expirationManager) _clearNext() { +func (e *expiryManager) _clearNext() { var exp uint32 e.nextExp = &exp } // setNext sets the next expiration time and schedules an expiration to occur after that time. Requires caller to have acquired mutex. -func (e *expirationManager) _setNext(exp uint32) { +func (e *expiryManager) _setNext(exp uint32) { info("_setNext ", exp) e.nextExp = &exp if exp == 0 { @@ -78,14 +78,14 @@ func (e *expirationManager) _setNext(exp uint32) { } // scheduleExpirationAtOrBefore schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. -func (e *expirationManager) scheduleExpirationAtOrBefore(exp uint32) { +func (e *expiryManager) scheduleExpirationAtOrBefore(exp uint32) { e.mutex.Lock() defer e.mutex.Unlock() e._scheduleExpirationAtOrBefore(exp) } // _scheduleExpirationAtOrBefore schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. Requires the mutext to be held. -func (e *expirationManager) _scheduleExpirationAtOrBefore(exp uint32) { +func (e *expiryManager) _scheduleExpirationAtOrBefore(exp uint32) { if exp == 0 { return } @@ -97,7 +97,7 @@ func (e *expirationManager) _scheduleExpirationAtOrBefore(exp uint32) { } // runExpiry is called when the timer expires. It calls the expirationFunc and then reschedules the timer if necessary. -func (e *expirationManager) runExpiry() { +func (e *expiryManager) runExpiry() { e.mutex.Lock() defer e.mutex.Unlock() e.expirationFunc() From 509051072b87a42784296fe7f3b3c895e2d9639b Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Fri, 17 Nov 2023 17:11:52 -0500 Subject: [PATCH 4/7] Skip slow test --- bucket_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/bucket_test.go b/bucket_test.go index 0a70129..f7844be 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -404,6 +404,7 @@ func TestExpiration(t *testing.T) { } func TestExpirationAfterClose(t *testing.T) { + t.Skip("Slow test useful for debugging issues with expiration") bucket, err := OpenBucket(InMemoryURL, strings.ToLower(t.Name()), CreateNew) ctx := testCtx(t) defer func() { From d5e2377e565cf59733923b2ae54dc9a946a7fb0b Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Fri, 17 Nov 2023 18:09:15 -0500 Subject: [PATCH 5/7] Address race conditions --- expiry_manager.go | 3 +++ feeds.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/expiry_manager.go b/expiry_manager.go index f6d9138..3aad700 100644 --- a/expiry_manager.go +++ b/expiry_manager.go @@ -79,6 +79,9 @@ func (e *expiryManager) _setNext(exp uint32) { // scheduleExpirationAtOrBefore schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. func (e *expiryManager) scheduleExpirationAtOrBefore(exp uint32) { + if exp == 0 { + return + } e.mutex.Lock() defer e.mutex.Unlock() e._scheduleExpirationAtOrBefore(exp) diff --git a/feeds.go b/feeds.go index 5c52947..0684dd7 100644 --- a/feeds.go +++ b/feeds.go @@ -168,7 +168,7 @@ func (c *Collection) postNewEvent(e *event) { feedEvent := e.asFeedEvent() c.postEvent(feedEvent) - c.bucket.expManager._scheduleExpirationAtOrBefore(e.exp) + c.bucket.expManager.scheduleExpirationAtOrBefore(e.exp) /* // Tell collections of other buckets on the same db file to post the event too: From 515e69fe5c2fa774db89ef7c637105e5c448e563 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Mon, 20 Nov 2023 17:47:45 -0500 Subject: [PATCH 6/7] make sure there is no race in opening a bucket close a duplicate bucket if two callers are using OpenBucket at the same time --- bucket.go | 25 +++++++++++++++++++------ bucket_api.go | 6 +++--- bucket_registry.go | 26 +++++++++++++++++++++----- 3 files changed, 43 insertions(+), 14 deletions(-) diff --git a/bucket.go b/bucket.go index 98bf11b..800156d 100644 --- a/bucket.go +++ b/bucket.go @@ -86,6 +86,7 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err traceEnter("OpenBucket", "%q, %d", urlStr, mode) defer func() { traceExit("OpenBucket", err, "ok") }() + ctx := context.TODO() u, err := encodeDBURL(urlStr) if err != nil { return nil, err @@ -94,13 +95,18 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err bucket := getCachedBucket(bucketName) if bucket != nil { + defer func() { + if err != nil { + bucket.Close(ctx) + } + }() + if mode == CreateNew { return nil, fs.ErrExist } if urlStr != bucket.url { return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", bucketName, bucket.url, urlStr) } - registerBucket(bucket) return bucket, nil } @@ -179,7 +185,7 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err bucket.expManager = newExpirationManager(bucket.doExpiration) defer func() { if err != nil { - _ = bucket.CloseAndDelete(context.TODO()) + _ = bucket.CloseAndDelete(ctx) } }() @@ -193,16 +199,23 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err if err = bucket.initializeSchema(bucketName); err != nil { return nil, err } - } else { - bucket.scheduleExpiration() } err = bucket.setName(bucketName) if err != nil { return nil, err } - registerBucket(bucket) - return bucket.copy(), err + exists, bucketCopy := registerBucket(bucket) + // someone else beat registered the bucket in the registry, that's OK we'll close ours + if exists { + bucket.Close(ctx) + } + // only schedule expiration if bucket is not new. This doesn't need to be locked because only one bucket will execute this code. + if vers != 0 { + bucket._scheduleExpiration() + } + + return bucketCopy, err } // Creates or re-opens a bucket, like OpenBucket. diff --git a/bucket_api.go b/bucket_api.go index 70cacb7..4aec92a 100644 --- a/bucket_api.go +++ b/bucket_api.go @@ -337,8 +337,8 @@ func (bucket *Bucket) expireDocuments() (int64, error) { return count, nil } -// scheduleExpiration schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. -func (bucket *Bucket) scheduleExpiration() { +// scheduleExpiration schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. This requires locking expiration manager. +func (bucket *Bucket) _scheduleExpiration() { if nextExp, err := bucket.nextExpiration(); err == nil && nextExp > 0 { bucket.expManager._scheduleExpirationAtOrBefore(nextExp) } @@ -355,7 +355,7 @@ func (bucket *Bucket) doExpiration() { info("Bucket %s expired %d docs", bucket, n) } - bucket.scheduleExpiration() + bucket._scheduleExpiration() } // Completely removes all deleted documents (tombstones). diff --git a/bucket_registry.go b/bucket_registry.go index 36dd9b1..561207a 100644 --- a/bucket_registry.go +++ b/bucket_registry.go @@ -35,17 +35,26 @@ func init() { } } -// registryBucket adds a newly opened Bucket to the registry. -func (r *bucketRegistry) registerBucket(bucket *Bucket) { +// registryBucket adds a newly opened Bucket to the registry. Returns true if the bucket already exists, and a copy of the bucket to use. +func (r *bucketRegistry) registerBucket(bucket *Bucket) (bool, *Bucket) { name := bucket.GetName() debug("registerBucket %v %s at %s", bucket, name, bucket.url) r.lock.Lock() defer r.lock.Unlock() + return r._registerBucket(bucket) +} + +// _registryBucket adds a newly opened Bucket to the registry. Returns true if the bucket already exists, and a copy of the bucket to use. +func (r *bucketRegistry) _registerBucket(bucket *Bucket) (bool, *Bucket) { + name := bucket.GetName() + debug("_registerBucket %v %s at %s", bucket, name, bucket.url) + _, ok := r.buckets[name] if !ok { r.buckets[name] = bucket } r.bucketCount[name] += 1 + return ok, r.buckets[name].copy() } // getCachedBucket returns a bucket from the registry if it exists. @@ -56,7 +65,9 @@ func (r *bucketRegistry) getCachedBucket(name string) *Bucket { if bucket == nil { return nil } - return bucket.copy() + // return a copy of the bucket + _, bucket = r._registerBucket(bucket) + return bucket } // unregisterBucket removes a Bucket from the registry. Must be called before closing. @@ -115,8 +126,13 @@ func getCachedBucket(name string) *Bucket { return cluster.getCachedBucket(name) } -// registryBucket adds a newly opened Bucket to the registry. -func registerBucket(bucket *Bucket) { +// registryBucket adds a copy of a Bucket to the registry. Returns true if the bucket already exists. +func registerBucket(bucket *Bucket) (bool, *Bucket) { + return cluster.registerBucket(bucket) +} + +// registryNewBucket adds a newly opened Bucket to the registry. +func registerNewBucket(bucket *Bucket) { cluster.registerBucket(bucket) } From 30107c54fe7d7f1173146d78efc8750083173d80 Mon Sep 17 00:00:00 2001 From: Tor Colvin Date: Tue, 21 Nov 2023 15:14:08 -0500 Subject: [PATCH 7/7] Move more bucket registration and error handling into getCachedBucket --- bucket.go | 18 ++++-------------- bucket_registry.go | 40 ++++++++++++++++++---------------------- 2 files changed, 22 insertions(+), 36 deletions(-) diff --git a/bucket.go b/bucket.go index 800156d..2e5d90b 100644 --- a/bucket.go +++ b/bucket.go @@ -93,22 +93,12 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err } urlStr = u.String() - bucket := getCachedBucket(bucketName) + bucket, err := getCachedBucket(bucketName, urlStr, mode) + if err != nil { + return nil, err + } if bucket != nil { - defer func() { - if err != nil { - bucket.Close(ctx) - } - }() - - if mode == CreateNew { - return nil, fs.ErrExist - } - if urlStr != bucket.url { - return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", bucketName, bucket.url, urlStr) - } return bucket, nil - } query := u.Query() diff --git a/bucket_registry.go b/bucket_registry.go index 561207a..d6ed324 100644 --- a/bucket_registry.go +++ b/bucket_registry.go @@ -10,6 +10,8 @@ package rosmar import ( "context" + "fmt" + "io/fs" "sync" ) @@ -35,19 +37,12 @@ func init() { } } -// registryBucket adds a newly opened Bucket to the registry. Returns true if the bucket already exists, and a copy of the bucket to use. +// registerBucket adds a newly opened Bucket to the registry. Returns true if the bucket already exists, and a copy of the bucket to use. func (r *bucketRegistry) registerBucket(bucket *Bucket) (bool, *Bucket) { name := bucket.GetName() - debug("registerBucket %v %s at %s", bucket, name, bucket.url) + debug("_registerBucket %v %s at %s", bucket, name, bucket.url) r.lock.Lock() defer r.lock.Unlock() - return r._registerBucket(bucket) -} - -// _registryBucket adds a newly opened Bucket to the registry. Returns true if the bucket already exists, and a copy of the bucket to use. -func (r *bucketRegistry) _registerBucket(bucket *Bucket) (bool, *Bucket) { - name := bucket.GetName() - debug("_registerBucket %v %s at %s", bucket, name, bucket.url) _, ok := r.buckets[name] if !ok { @@ -58,16 +53,22 @@ func (r *bucketRegistry) _registerBucket(bucket *Bucket) (bool, *Bucket) { } // getCachedBucket returns a bucket from the registry if it exists. -func (r *bucketRegistry) getCachedBucket(name string) *Bucket { +func (r *bucketRegistry) getCachedBucket(name, url string, mode OpenMode) (*Bucket, error) { r.lock.Lock() defer r.lock.Unlock() bucket := r.buckets[name] if bucket == nil { - return nil + return nil, nil + } + if mode == CreateNew { + return nil, fs.ErrExist } - // return a copy of the bucket - _, bucket = r._registerBucket(bucket) - return bucket + if url != bucket.url { + return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", name, bucket.url, url) + } + + r.bucketCount[name] += 1 + return r.buckets[name].copy(), nil } // unregisterBucket removes a Bucket from the registry. Must be called before closing. @@ -121,9 +122,9 @@ func (r *bucketRegistry) getBucketNames() []string { return names } -// getCachedBucket returns an instance of a bucket. If there are other copies of this bucket already in memory, it will return this version. If this is an in memory bucket, the bucket will not be removed until deleteBucket is called. -func getCachedBucket(name string) *Bucket { - return cluster.getCachedBucket(name) +// getCachedBucket returns an instance of a bucket. If there are other copies of this bucket already in memory, it will return this version. If this is an in memory bucket, the bucket will not be removed until deleteBucket is called. Returns an error if the bucket can not be opened, but nil error and nil bucket if there is no registered bucket. +func getCachedBucket(name, url string, mode OpenMode) (*Bucket, error) { + return cluster.getCachedBucket(name, url, mode) } // registryBucket adds a copy of a Bucket to the registry. Returns true if the bucket already exists. @@ -131,11 +132,6 @@ func registerBucket(bucket *Bucket) (bool, *Bucket) { return cluster.registerBucket(bucket) } -// registryNewBucket adds a newly opened Bucket to the registry. -func registerNewBucket(bucket *Bucket) { - cluster.registerBucket(bucket) -} - // unregisterBucket removes a Bucket from the registry. Must be called before closing. func unregisterBucket(bucket *Bucket) { cluster.unregisterBucket(bucket)