diff --git a/bucket.go b/bucket.go index 151cf79..2e5d90b 100644 --- a/bucket.go +++ b/bucket.go @@ -36,13 +36,12 @@ type Bucket struct { name string // Bucket name collections collectionsMap // Collections, indexed by DataStoreName collectionFeeds map[sgbucket.DataStoreNameImpl][]*dcpFeed - mutex *sync.Mutex // mutex for synchronized access to Bucket - sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead) - expTimer *time.Timer // Schedules expiration of docs - nextExp *uint32 // Timestamp when expTimer will run (0 if never) - serial uint32 // Serial number for logging - inMemory bool // True if it's an in-memory database - closed bool // represents state when it is closed + mutex *sync.Mutex // mutex for synchronized access to Bucket + sqliteDB *sql.DB // SQLite database handle (do not access; call db() instead) + expManager *expiryManager // expiration manager for bucket + serial uint32 // Serial number for logging + inMemory bool // True if it's an in-memory database + closed bool // represents state when it is closed } type collectionsMap = map[sgbucket.DataStoreNameImpl]*Collection @@ -87,23 +86,19 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err traceEnter("OpenBucket", "%q, %d", urlStr, mode) defer func() { traceExit("OpenBucket", err, "ok") }() + ctx := context.TODO() u, err := encodeDBURL(urlStr) if err != nil { return nil, err } urlStr = u.String() - bucket := getCachedBucket(bucketName) + bucket, err := getCachedBucket(bucketName, urlStr, mode) + if err != nil { + return nil, err + } if bucket != nil { - if mode == CreateNew { - return nil, fs.ErrExist - } - if urlStr != bucket.url { - return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", bucketName, bucket.url, urlStr) - } - registerBucket(bucket) return bucket, nil - } query := u.Query() @@ -174,13 +169,13 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err collections: make(map[sgbucket.DataStoreNameImpl]*Collection), collectionFeeds: make(map[sgbucket.DataStoreNameImpl][]*dcpFeed), mutex: &sync.Mutex{}, - nextExp: new(uint32), inMemory: inMemory, serial: serial, } + bucket.expManager = newExpirationManager(bucket.doExpiration) defer func() { if err != nil { - _ = bucket.CloseAndDelete(context.TODO()) + _ = bucket.CloseAndDelete(ctx) } }() @@ -194,16 +189,23 @@ func OpenBucket(urlStr string, bucketName string, mode OpenMode) (b *Bucket, err if err = bucket.initializeSchema(bucketName); err != nil { return nil, err } - } else { - bucket.scheduleExpiration() } err = bucket.setName(bucketName) if err != nil { return nil, err } - registerBucket(bucket) - return bucket, err + exists, bucketCopy := registerBucket(bucket) + // someone else beat registered the bucket in the registry, that's OK we'll close ours + if exists { + bucket.Close(ctx) + } + // only schedule expiration if bucket is not new. This doesn't need to be locked because only one bucket will execute this code. + if vers != 0 { + bucket._scheduleExpiration() + } + + return bucketCopy, err } // Creates or re-opens a bucket, like OpenBucket. @@ -307,7 +309,7 @@ func (bucket *Bucket) initializeSchema(bucketName string) (err error) { return } -// Returns the database handle as a `queryable` interface value. +// db returns the database handle as a `queryable` interface value. // If the bucket has been closed, it returns a special `closedDB` value that will return // ErrBucketClosed from any call. func (bucket *Bucket) db() queryable { @@ -316,7 +318,7 @@ func (bucket *Bucket) db() queryable { return bucket._db() } -// Returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked the caller. +// db returns the database handle as a `queryable` interface value. This is the same as `db()` without locking. This is not safe to call without bucket.mutex being locked the caller. func (bucket *Bucket) _db() queryable { if bucket.closed { return closedDB{} @@ -380,8 +382,7 @@ func (b *Bucket) copy() *Bucket { collections: make(collectionsMap), mutex: b.mutex, sqliteDB: b.sqliteDB, - expTimer: b.expTimer, - nextExp: b.nextExp, + expManager: b.expManager, serial: b.serial, inMemory: b.inMemory, } diff --git a/bucket_api.go b/bucket_api.go index 3a26d5a..4aec92a 100644 --- a/bucket_api.go +++ b/bucket_api.go @@ -13,7 +13,6 @@ import ( "database/sql" "errors" "fmt" - "time" sgbucket "github.com/couchbase/sg-bucket" ) @@ -62,9 +61,7 @@ func (bucket *Bucket) Close(_ context.Context) { // _closeSqliteDB closes the underlying sqlite database and shuts down dcpFeeds. Must have a lock to call this function. func (bucket *Bucket) _closeSqliteDB() { - if bucket.expTimer != nil { - bucket.expTimer.Stop() - } + bucket.expManager.stop() for _, c := range bucket.collections { c.close() } @@ -174,6 +171,7 @@ func (bucket *Bucket) DropDataStore(name sgbucket.DataStoreName) error { return bucket.dropCollection(sc) } +// ListDataStores returns a list of the names of all data stores in the bucket. func (bucket *Bucket) ListDataStores() (result []sgbucket.DataStoreName, err error) { traceEnter("ListDataStores", "%s", bucket) defer func() { traceExit("ListDataStores", err, "%v", result) }() @@ -309,8 +307,8 @@ func (bucket *Bucket) dropCollection(name sgbucket.DataStoreNameImpl) error { //////// EXPIRATION (CUSTOM API): -// Returns the earliest expiration time of any document, or 0 if none. -func (bucket *Bucket) NextExpiration() (exp Exp, err error) { +// nextExpiration returns the earliest expiration time of any document, or 0 if none. +func (bucket *Bucket) nextExpiration() (exp Exp, err error) { var expVal sql.NullInt64 row := bucket.db().QueryRow(`SELECT min(exp) FROM documents WHERE exp > 0`) err = scan(row, &expVal) @@ -320,8 +318,8 @@ func (bucket *Bucket) NextExpiration() (exp Exp, err error) { return } -// Immediately deletes all expired documents in this bucket. -func (bucket *Bucket) ExpireDocuments() (int64, error) { +// expireDocuments immediately deletes all expired documents in this bucket. +func (bucket *Bucket) expireDocuments() (int64, error) { names, err := bucket.ListDataStores() if err != nil { return 0, err @@ -330,7 +328,7 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) { for _, name := range names { if coll, err := bucket.getCollection(name.(sgbucket.DataStoreNameImpl)); err != nil { return 0, err - } else if n, err := coll.ExpireDocuments(); err != nil { + } else if n, err := coll.expireDocuments(); err != nil { return 0, err } else { count += n @@ -339,45 +337,25 @@ func (bucket *Bucket) ExpireDocuments() (int64, error) { return count, nil } -func (bucket *Bucket) scheduleExpiration() { - if nextExp, err := bucket.NextExpiration(); err == nil && nextExp > 0 { - bucket.scheduleExpirationAtOrBefore(nextExp) - } -} - -func (bucket *Bucket) scheduleExpirationAtOrBefore(exp uint32) { - if exp > 0 { - bucket.mutex.Lock() - defer bucket.mutex.Unlock() - if exp < *bucket.nextExp || *bucket.nextExp == 0 { - bucket.nextExp = &exp - dur := expDuration(exp) - if dur < 0 { - dur = 0 - } - debug("EXP: Scheduling in %s", dur) - if bucket.expTimer == nil { - bucket.expTimer = time.AfterFunc(dur, bucket.doExpiration) - } else { - bucket.expTimer.Reset(dur) - } - } +// scheduleExpiration schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. This requires locking expiration manager. +func (bucket *Bucket) _scheduleExpiration() { + if nextExp, err := bucket.nextExpiration(); err == nil && nextExp > 0 { + bucket.expManager._scheduleExpirationAtOrBefore(nextExp) } } func (bucket *Bucket) doExpiration() { - bucket.mutex.Lock() - bucket.nextExp = func(x uint32) *uint32 { return &x }(0) - bucket.mutex.Unlock() + bucket.expManager._clearNext() debug("EXP: Running scheduled expiration...") - if n, err := bucket.ExpireDocuments(); err != nil { - logError("Bucket %s error expiring docs: %v", bucket, err) + if n, err := bucket.expireDocuments(); err != nil { + // If there's an error expiring docs, it means there is a programming error of a leaked expiration goroutine. + panic("Error expiring docs: " + err.Error()) } else if n > 0 { info("Bucket %s expired %d docs", bucket, n) } - bucket.scheduleExpiration() + bucket._scheduleExpiration() } // Completely removes all deleted documents (tombstones). diff --git a/bucket_registry.go b/bucket_registry.go index 32cae80..d6ed324 100644 --- a/bucket_registry.go +++ b/bucket_registry.go @@ -10,6 +10,8 @@ package rosmar import ( "context" + "fmt" + "io/fs" "sync" ) @@ -35,29 +37,38 @@ func init() { } } -// registryBucket adds a newly opened Bucket to the registry. -func (r *bucketRegistry) registerBucket(bucket *Bucket) { +// registerBucket adds a newly opened Bucket to the registry. Returns true if the bucket already exists, and a copy of the bucket to use. +func (r *bucketRegistry) registerBucket(bucket *Bucket) (bool, *Bucket) { name := bucket.GetName() - debug("registerBucket %v %s at %s", bucket, name, bucket.url) + debug("_registerBucket %v %s at %s", bucket, name, bucket.url) r.lock.Lock() defer r.lock.Unlock() + _, ok := r.buckets[name] if !ok { - b := bucket.copy() - r.buckets[name] = b + r.buckets[name] = bucket } r.bucketCount[name] += 1 + return ok, r.buckets[name].copy() } // getCachedBucket returns a bucket from the registry if it exists. -func (r *bucketRegistry) getCachedBucket(name string) *Bucket { +func (r *bucketRegistry) getCachedBucket(name, url string, mode OpenMode) (*Bucket, error) { r.lock.Lock() defer r.lock.Unlock() bucket := r.buckets[name] if bucket == nil { - return nil + return nil, nil + } + if mode == CreateNew { + return nil, fs.ErrExist + } + if url != bucket.url { + return nil, fmt.Errorf("bucket %q already exists at %q, will not open at %q", name, bucket.url, url) } - return bucket.copy() + + r.bucketCount[name] += 1 + return r.buckets[name].copy(), nil } // unregisterBucket removes a Bucket from the registry. Must be called before closing. @@ -111,14 +122,14 @@ func (r *bucketRegistry) getBucketNames() []string { return names } -// getCachedBucket returns an instance of a bucket. If there are other copies of this bucket already in memory, it will return this version. If this is an in memory bucket, the bucket will not be removed until deleteBucket is called. -func getCachedBucket(name string) *Bucket { - return cluster.getCachedBucket(name) +// getCachedBucket returns an instance of a bucket. If there are other copies of this bucket already in memory, it will return this version. If this is an in memory bucket, the bucket will not be removed until deleteBucket is called. Returns an error if the bucket can not be opened, but nil error and nil bucket if there is no registered bucket. +func getCachedBucket(name, url string, mode OpenMode) (*Bucket, error) { + return cluster.getCachedBucket(name, url, mode) } -// registryBucket adds a newly opened Bucket to the registry. -func registerBucket(bucket *Bucket) { - cluster.registerBucket(bucket) +// registryBucket adds a copy of a Bucket to the registry. Returns true if the bucket already exists. +func registerBucket(bucket *Bucket) (bool, *Bucket) { + return cluster.registerBucket(bucket) } // unregisterBucket removes a Bucket from the registry. Must be called before closing. diff --git a/bucket_test.go b/bucket_test.go index 4d53e3e..f7844be 100644 --- a/bucket_test.go +++ b/bucket_test.go @@ -355,7 +355,7 @@ func TestExpiration(t *testing.T) { bucket := makeTestBucket(t) c := bucket.DefaultDataStore() - exp, err := bucket.NextExpiration() + exp, err := bucket.nextExpiration() require.NoError(t, err) require.Equal(t, Exp(0), exp) @@ -367,7 +367,7 @@ func TestExpiration(t *testing.T) { requireAddRaw(t, c, "k3", 0, []byte("v3")) requireAddRaw(t, c, "k4", exp4, []byte("v4")) - exp, err = bucket.NextExpiration() + exp, err = bucket.nextExpiration() require.NoError(t, err) // Usually this will return exp2, but if this is slow enough that the expiration goroutine runs to expire document k2, it can return exp4. require.Contains(t, []Exp{exp2, exp4}, exp) @@ -375,9 +375,9 @@ func TestExpiration(t *testing.T) { log.Printf("... waiting 1 sec ...") time.Sleep(1 * time.Second) - exp, err = bucket.NextExpiration() + exp, err = bucket.nextExpiration() require.NoError(t, err) - assert.Equal(t, exp4, exp) + require.Equal(t, int(exp4), int(exp)) _, _, err = c.GetRaw("k1") assert.NoError(t, err) @@ -391,7 +391,7 @@ func TestExpiration(t *testing.T) { log.Printf("... waiting 2 secs ...") time.Sleep(2 * time.Second) - exp, err = bucket.NextExpiration() + exp, err = bucket.nextExpiration() require.NoError(t, err) assert.Equal(t, uint32(0), exp) @@ -403,6 +403,24 @@ func TestExpiration(t *testing.T) { assert.Equal(t, int64(2), n) } +func TestExpirationAfterClose(t *testing.T) { + t.Skip("Slow test useful for debugging issues with expiration") + bucket, err := OpenBucket(InMemoryURL, strings.ToLower(t.Name()), CreateNew) + ctx := testCtx(t) + defer func() { + assert.NoError(t, bucket.CloseAndDelete(ctx)) + }() + require.NoError(t, err) + c := bucket.DefaultDataStore() + + // set expiry long enough that Close will happen first + exp := Exp(time.Now().Add(1 * time.Second).Unix()) + requireAddRaw(t, c, "docID", exp, []byte("v1")) + bucket.Close(ctx) + // sleep to ensure we won't panic + time.Sleep(2 * time.Second) +} + func TestUriFromPathWindows(t *testing.T) { ensureNoLeaks(t) if runtime.GOOS != "windows" { diff --git a/collection.go b/collection.go index a2a776b..3375233 100644 --- a/collection.go +++ b/collection.go @@ -357,6 +357,7 @@ func (c *Collection) WriteCas(key string, flags int, exp Exp, cas CAS, val any, return } +// Remove creates a document tombstone. It removes the document's value and user xattrs. func (c *Collection) Remove(key string, cas CAS) (casOut CAS, err error) { traceEnter("Remove", "%q, 0x%x", key, cas) casOut, err = c.remove(key, &cas) @@ -364,6 +365,7 @@ func (c *Collection) Remove(key string, cas CAS) (casOut CAS, err error) { return } +// Delete creates a document tombstone. It removes the document's value and user xattrs. Equivalent to Remove without a CAS check. func (c *Collection) Delete(key string) (err error) { traceEnter("Delete", "%q", key) _, err = c.remove(key, nil) @@ -371,6 +373,7 @@ func (c *Collection) Delete(key string) (err error) { return err } +// remove creates a document tombstone. It removes the document's value and user xattrs. checkClosed will allow removing the document even the bucket instance is "closed". func (c *Collection) remove(key string, ifCas *CAS) (casOut CAS, err error) { err = c.withNewCas(func(txn *sql.Tx, newCas CAS) (e *event, err error) { // Get the doc, possibly checking cas: @@ -518,10 +521,10 @@ func (c *Collection) IsSupported(feature sgbucket.BucketStoreFeature) bool { //////// EXPIRATION -// Immediately deletes all expired documents in this collection. -func (c *Collection) ExpireDocuments() (count int64, err error) { - traceEnter("ExpireDocuments", "") - defer func() { traceExit("ExpireDocuments", err, "%d", count) }() +// _expireDocuments immediately deletes all expired documents in this collection. +func (c *Collection) expireDocuments() (count int64, err error) { + traceEnter("_expireDocuments", "") + defer func() { traceExit("_expireDocuments", err, "%d", count) }() // First find all the expired docs and collect their keys: exp := nowAsExpiry() diff --git a/expiry_manager.go b/expiry_manager.go new file mode 100644 index 0000000..3aad700 --- /dev/null +++ b/expiry_manager.go @@ -0,0 +1,107 @@ +// Copyright 2023-Present Couchbase, Inc. +// +// Use of this software is governed by the Business Source License included +// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified +// in that file, in accordance with the Business Source License, use of this +// software will be governed by the Apache License, Version 2.0, included in +// the file licenses/APL2.txt. + +package rosmar + +import ( + "sync" + "time" +) + +// expiryManager handles expiration for a given bucket. It stores a timer which will call expirationFunc to delete documents. The value of when the timer +type expiryManager struct { + mutex *sync.Mutex // mutex for synchronized access to expiryManager + timer *time.Timer // Schedules expiration of docs + nextExp *uint32 // Timestamp when expTimer will run (0 if never) + expirationFunc func() // Function to call when timer expires +} + +func newExpirationManager(expiractionFunc func()) *expiryManager { + var nextExp uint32 + return &expiryManager{ + mutex: &sync.Mutex{}, + nextExp: &nextExp, + expirationFunc: expiractionFunc, + } +} + +// stop stops existing timers and waits for any expiration processes to complete +func (e *expiryManager) stop() { + e.mutex.Lock() + defer e.mutex.Unlock() + if e.timer != nil { + e.timer.Stop() + } +} + +// _getNext returns the next expiration time, 0 if there is no scheduled expiration. +func (e *expiryManager) _getNext() uint32 { + return *e.nextExp +} + +// setNext sets the next expiration time and schedules an expiration to occur after that time. +func (e *expiryManager) setNext(exp uint32) { + e.mutex.Lock() + defer e.mutex.Unlock() + e._setNext(exp) +} + +// _clearNext clears the next expiration time. +func (e *expiryManager) _clearNext() { + var exp uint32 + e.nextExp = &exp +} + +// setNext sets the next expiration time and schedules an expiration to occur after that time. Requires caller to have acquired mutex. +func (e *expiryManager) _setNext(exp uint32) { + info("_setNext ", exp) + e.nextExp = &exp + if exp == 0 { + e.timer = nil + return + } + dur := expDuration(exp) + if dur < 0 { + dur = 0 + } + debug("EXP: Scheduling in %s", dur) + if e.timer == nil { + e.timer = time.AfterFunc(dur, e.runExpiry) + } else { + e.timer.Reset(dur) + } +} + +// scheduleExpirationAtOrBefore schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. +func (e *expiryManager) scheduleExpirationAtOrBefore(exp uint32) { + if exp == 0 { + return + } + e.mutex.Lock() + defer e.mutex.Unlock() + e._scheduleExpirationAtOrBefore(exp) +} + +// _scheduleExpirationAtOrBefore schedules the next expiration of documents to occur, from the minimum expiration value in the bucket. Requires the mutext to be held. +func (e *expiryManager) _scheduleExpirationAtOrBefore(exp uint32) { + if exp == 0 { + return + } + currentNextExp := e._getNext() + // if zero will unset the timer. + if currentNextExp == 0 || exp < currentNextExp { + e._setNext(exp) + } +} + +// runExpiry is called when the timer expires. It calls the expirationFunc and then reschedules the timer if necessary. +func (e *expiryManager) runExpiry() { + e.mutex.Lock() + defer e.mutex.Unlock() + e.expirationFunc() +} diff --git a/feeds.go b/feeds.go index d5799f4..0684dd7 100644 --- a/feeds.go +++ b/feeds.go @@ -168,7 +168,7 @@ func (c *Collection) postNewEvent(e *event) { feedEvent := e.asFeedEvent() c.postEvent(feedEvent) - c.bucket.scheduleExpirationAtOrBefore(e.exp) + c.bucket.expManager.scheduleExpirationAtOrBefore(e.exp) /* // Tell collections of other buckets on the same db file to post the event too: