Skip to content

Commit

Permalink
CBG-4009 Support all config updates during async initialization (#6915)
Browse files Browse the repository at this point in the history
Backport to 3.1.8.
  • Loading branch information
adamcfraser authored Jun 24, 2024
1 parent 758f494 commit 66e5b09
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 15 deletions.
12 changes: 6 additions & 6 deletions rest/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (h *handler) handlePutDbConfig() (err error) {
if err := updatedDbConfig.setup(h.ctx(), dbName, h.server.Config.Bootstrap, dbCreds, nil, false); err != nil {
return err
}
if err := h.server.ReloadDatabaseWithConfig(contextNoCancel, *updatedDbConfig, false); err != nil {
if err := h.server.ReloadDatabaseWithConfig(contextNoCancel, *updatedDbConfig); err != nil {
return err
}
return base.HTTPErrorf(http.StatusCreated, "updated")
Expand Down Expand Up @@ -635,7 +635,7 @@ func (h *handler) handlePutDbConfig() (err error) {
}

// Load the new dbConfig before we persist the update.
err = h.server.ReloadDatabaseWithConfig(contextNoCancel, tmpConfig, true)
err = h.server.ReloadDatabaseWithConfig(contextNoCancel, tmpConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -746,7 +746,7 @@ func (h *handler) handleDeleteCollectionConfigSync() error {
defer h.server.lock.Unlock()

// TODO: Dynamic update instead of reload
if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil {
if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false); err != nil {
return err
}

Expand Down Expand Up @@ -810,7 +810,7 @@ func (h *handler) handlePutCollectionConfigSync() error {
defer h.server.lock.Unlock()

// TODO: Dynamic update instead of reload
if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil {
if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false); err != nil {
return err
}

Expand Down Expand Up @@ -905,7 +905,7 @@ func (h *handler) handleDeleteCollectionConfigImportFilter() error {
defer h.server.lock.Unlock()

// TODO: Dynamic update instead of reload
if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil {
if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false); err != nil {
return err
}

Expand Down Expand Up @@ -970,7 +970,7 @@ func (h *handler) handlePutCollectionConfigImportFilter() error {
defer h.server.lock.Unlock()

// TODO: Dynamic update instead of reload
if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil {
if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion rest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1900,7 +1900,7 @@ func (sc *ServerContext) _applyConfig(nonContextStruct base.NonCancellableContex
}

// TODO: Dynamic update instead of reload
if err := sc._reloadDatabaseWithConfig(ctx, cnf, failFast, false); err != nil {
if err := sc._reloadDatabaseWithConfig(ctx, cnf, failFast); err != nil {
// remove these entries we just created above if the database hasn't loaded properly
return false, fmt.Errorf("couldn't reload database: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion rest/handler_config_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (h *handler) mutateDbConfig(mutator func(*DbConfig) error) error {
defer h.server.lock.Unlock()

// TODO: Dynamic update instead of reload
if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil {
if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false); err != nil {
return err
}
h.setEtag(updatedDbConfig.Version)
Expand Down
237 changes: 234 additions & 3 deletions rest/indextest/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,10 +285,9 @@ func TestAsyncInitWithResync(t *testing.T) {
resp = rest.BootstrapAdminRequest(t, http.MethodDelete, "/"+dbName+"/", "")
resp.RequireStatus(http.StatusOK)

rest.DropAllTestIndexes(t, tb)
rest.DropAllTestIndexesIncludingPrimary(t, tb)

// Set testing callbacks for async initialization

collectionCount := int64(0)
initStarted := make(chan error)
unblockInit := make(chan error)
Expand Down Expand Up @@ -658,9 +657,240 @@ func TestSyncOnline(t *testing.T) {

}

// TestAsyncInitConfigUpdates verifies that a database with in-progress async
// index initialization can accept all config updates from the local node.
// (prior to CBG-4008, operations would block waiting for async init to complete)
func TestAsyncInitConfigUpdates(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("This test only works against Couchbase Server")
}
base.TestRequiresCollections(t)
base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP)

sc, closeFn := rest.StartBootstrapServer(t)
defer closeFn()

// Set testing callbacks for async initialization
collectionCount := int64(0)
initStarted := make(chan error)
unblockInit := make(chan error)
collectionCompleteCallback := func(dbName, collectionName string) {
count := atomic.AddInt64(&collectionCount, 1)
// On first collection, close initStarted channel
log.Printf("collection callback count: %v", count)
if count == 1 {
log.Printf("closing initStarted")
close(initStarted)
}
rest.WaitForChannel(t, unblockInit, "waiting for test to unblock initialization")
}
sc.DatabaseInitManager.SetCallbacks(collectionCompleteCallback, nil)

ctx := base.TestCtx(t)
// Get a test bucket, and use it to create the database.
tb := base.GetTestBucket(t)
defer tb.Close(ctx)

importFilter := "function(doc) { return true }"
syncFunc := "function(doc){ channel(doc.channels); }"

dbConfig := makeDbConfig(t, tb, syncFunc, importFilter)
dbConfig.StartOffline = base.BoolPtr(true)
dbConfigPayload, err := json.Marshal(dbConfig)
require.NoError(t, err)
dbName := "db"

keyspace := dbName
if len(dbConfig.Scopes) > 0 {
keyspaces := getRESTKeyspaces(dbName, dbConfig.Scopes)
keyspace = keyspaces[0]
}

// Create database with offline=true
resp := rest.BootstrapAdminRequest(t, http.MethodPut, "/"+dbName+"/", string(dbConfigPayload))
resp.RequireStatus(http.StatusCreated)

// Wait for init to start before interacting with the db, validate db state is offline
rest.WaitForChannel(t, initStarted, "waiting for initialization to start")
log.Printf("initialization started")
waitAndRequireDBState(t, dbName, db.DBOffline)

// Set up payloads for upserting db state
onlineConfigUpsert := rest.DbConfig{
StartOffline: base.BoolPtr(false),
}
dbOnlineConfigPayload, err := json.Marshal(onlineConfigUpsert)
require.NoError(t, err)

// Take the database online while async init is still in progress, verify state goes to Starting
resp = rest.BootstrapAdminRequest(t, http.MethodPost, "/"+dbName+"/_config", string(dbOnlineConfigPayload))
resp.RequireStatus(http.StatusCreated)
waitAndRequireDBState(t, dbName, db.DBStarting)

// Attempt to update import filter while in starting mode
importFilter = "function(doc){ return false; }"
resp = rest.BootstrapAdminRequest(t, http.MethodPut, "/"+keyspace+"/_config/import_filter", importFilter)
resp.RequireStatus(http.StatusOK)

resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/import_filter", "")
resp.RequireResponse(http.StatusOK, importFilter)

// Attempt to delete import filter while in starting mode
resp = rest.BootstrapAdminRequest(t, http.MethodDelete, "/"+keyspace+"/_config/import_filter", "")
resp.RequireStatus(http.StatusOK)

resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/import_filter", "")
resp.RequireResponse(http.StatusOK, "")

// Attempt to update sync function while in starting mode
syncFunc = "function(doc){ channel(doc.type); }"
resp = rest.BootstrapAdminRequest(t, http.MethodPut, "/"+keyspace+"/_config/sync", syncFunc)
resp.RequireStatus(http.StatusOK)

resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/sync", "")
resp.RequireResponse(http.StatusOK, syncFunc)

// Attempt to delete sync function while in starting mode
resp = rest.BootstrapAdminRequest(t, http.MethodDelete, "/"+keyspace+"/_config/sync", "")
resp.RequireStatus(http.StatusOK)

resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/sync", "")
resp.RequireResponse(http.StatusOK, "")

// Take the database back online while async init is still in progress, verify state goes to Starting
resp = rest.BootstrapAdminRequest(t, http.MethodPost, "/"+dbName+"/_config", string(dbOnlineConfigPayload))
resp.RequireStatus(http.StatusCreated)
waitAndRequireDBState(t, dbName, db.DBStarting)

// Unblock initialization, verify status goes to Online
close(unblockInit)
waitAndRequireDBState(t, dbName, db.DBOnline)

}

// TestAsyncInitRemoteConfigUpdates verifies that a database with in-progress async
// index initialization can accept config updates made on other nodes (arriving via polling)
// (prior to CBG-4008, operations would block waiting for async init to complete)
func TestAsyncInitRemoteConfigUpdates(t *testing.T) {
if base.UnitTestUrlIsWalrus() {
t.Skip("This test only works against Couchbase Server")
}
base.TestRequiresCollections(t)
base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP)

// enable config polling to allow testing of cross-node updates
bootstrapConfig := rest.BootstrapStartupConfigForTest(t)
bootstrapConfig.Bootstrap.ConfigUpdateFrequency = base.NewConfigDuration(1 * time.Second)
sc, closeFn := rest.StartServerWithConfig(t, &bootstrapConfig)
defer closeFn()

// Set testing callbacks for async initialization
collectionCount := int64(0)
initStarted := make(chan error)
unblockInit := make(chan error)
collectionCompleteCallback := func(dbName, collectionName string) {
count := atomic.AddInt64(&collectionCount, 1)
// On first collection, close initStarted channel
log.Printf("collection callback count: %v", count)
if count == 1 {
log.Printf("closing initStarted")
close(initStarted)
}
rest.WaitForChannel(t, unblockInit, "waiting for test to unblock initialization")
}
sc.DatabaseInitManager.SetCallbacks(collectionCompleteCallback, nil)

ctx := base.TestCtx(t)
// Get a test bucket, and use it to create the database.
tb := base.GetTestBucket(t)
defer tb.Close(ctx)

importFilter := "function(doc) { return true }"
syncFunc := "function(doc){ channel(doc.channels); }"

dbName := "db"
dbConfig := makeDbConfig(t, tb, syncFunc, importFilter)
dbConfig.Name = dbName
dbConfig.StartOffline = base.BoolPtr(true)

keyspace := dbName
if len(dbConfig.Scopes) > 0 {
keyspaces := getRESTKeyspaces(dbName, dbConfig.Scopes)
keyspace = keyspaces[0]
}

bucketName := tb.GetName()
groupID := sc.Config.Bootstrap.ConfigGroupID

// Simulate creation of the database with offline=true on another node
version, err := rest.GenerateDatabaseConfigVersionID(ctx, "", &dbConfig)
require.NoError(t, err)
metadataID, err := sc.BootstrapContext.ComputeMetadataIDForDbConfig(ctx, &dbConfig)
require.NoError(t, err)

databaseConfig := dbConfig.ToDatabaseConfig()
databaseConfig.Version = version
databaseConfig.MetadataID = metadataID

_, err = sc.BootstrapContext.InsertConfig(ctx, bucketName, groupID, databaseConfig)
require.NoError(t, err)

// Wait for init to start before interacting with the db, validate db state is offline
rest.WaitForChannel(t, initStarted, "waiting for initialization to start")
log.Printf("initialization started")
waitAndRequireDBState(t, dbName, db.DBOffline)

log.Printf("keyspace: %v", keyspace)

// Update the bucket config to bring the database online
_, err = sc.BootstrapContext.UpdateConfig(ctx, bucketName, groupID, dbName, func(bucketDbConfig *rest.DatabaseConfig) (updatedConfig *rest.DatabaseConfig, err error) {
bucketDbConfig.StartOffline = base.BoolPtr(false)
return bucketDbConfig, nil
})
require.NoError(t, err)
waitAndRequireDBState(t, dbName, db.DBStarting)

// Attempt to update import filter while in starting mode
importFilter = "function(doc){ return false; }"
resp := rest.BootstrapAdminRequest(t, http.MethodPut, "/"+keyspace+"/_config/import_filter", importFilter)
resp.RequireStatus(http.StatusOK)

resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/import_filter", "")
resp.RequireResponse(http.StatusOK, importFilter)

// Update the db config from a remote node, verify change is picked up while in starting state
_, err = sc.BootstrapContext.UpdateConfig(ctx, bucketName, groupID, dbName, func(bucketDbConfig *rest.DatabaseConfig) (updatedConfig *rest.DatabaseConfig, err error) {
_, scopeName, collectionName, err := rest.ParseKeyspace(keyspace)
require.NoError(t, err)
if scopeName == nil || collectionName == nil {
bucketDbConfig.ImportFilter = nil
} else {
bucketDbConfig.Scopes[*scopeName].Collections[*collectionName].ImportFilter = nil
}
return bucketDbConfig, nil
})
require.NoError(t, err)

// Need a wait loop here to wait for config polling to pick up the change
err = rest.WaitForConditionWithOptions(ctx, func() bool {
resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/import_filter", "")
if resp.StatusCode == http.StatusOK && resp.Body == "" {
return true
} else {
log.Printf("Waiting for OK and empty filter, current status: %v, filter: %q", resp.StatusCode, resp.Body)
}
return (resp.StatusCode == http.StatusOK) && resp.Body == ""
}, 200, 100)
require.NoError(t, err)

// Unblock initialization, verify status goes to Online
close(unblockInit)
waitAndRequireDBState(t, dbName, db.DBOnline)
}

func makeDbConfig(t *testing.T, tb *base.TestBucket, syncFunction string, importFilter string) rest.DbConfig {

scopesConfig := rest.GetCollectionsConfig(t, tb, 3)
scopesConfig := rest.GetCollectionsConfig(t, tb, 1)
for scopeName, scope := range scopesConfig {
for collectionName, _ := range scope.Collections {
collectionConfig := &rest.CollectionConfig{}
Expand All @@ -684,6 +914,7 @@ func makeDbConfig(t *testing.T, tb *base.TestBucket, syncFunction string, import
NumIndexReplicas: &numIndexReplicas,
EnableXattrs: &enableXattrs,
Scopes: scopesConfig,
AutoImport: false, // disable import to streamline index tests and avoid teardown races
}
return dbConfig
}
Expand Down
8 changes: 5 additions & 3 deletions rest/server_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,14 +421,16 @@ func (sc *ServerContext) ReloadDatabase(ctx context.Context, reloadDbName string
return dbContext, err
}

func (sc *ServerContext) ReloadDatabaseWithConfig(nonContextStruct base.NonCancellableContext, config DatabaseConfig, asyncOnline bool) error {
func (sc *ServerContext) ReloadDatabaseWithConfig(nonContextStruct base.NonCancellableContext, config DatabaseConfig) error {
sc.lock.Lock()
defer sc.lock.Unlock()
return sc._reloadDatabaseWithConfig(nonContextStruct.Ctx, config, true, asyncOnline)
return sc._reloadDatabaseWithConfig(nonContextStruct.Ctx, config, true)
}

func (sc *ServerContext) _reloadDatabaseWithConfig(ctx context.Context, config DatabaseConfig, failFast bool, asyncOnline bool) error {
func (sc *ServerContext) _reloadDatabaseWithConfig(ctx context.Context, config DatabaseConfig, failFast bool) error {
sc._removeDatabase(ctx, config.Name)
// use async initialization whenever using persistent config
asyncOnline := sc.persistentConfig
_, err := sc._getOrAddDatabaseFromConfig(ctx, config, getOrAddDatabaseConfigOptions{
useExisting: false,
failFast: failFast,
Expand Down
25 changes: 24 additions & 1 deletion rest/utilities_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,10 @@ func (rt *RestTester) WaitForCondition(successFunc func() bool) error {
}

func (rt *RestTester) WaitForConditionWithOptions(successFunc func() bool, maxNumAttempts, timeToSleepMs int) error {
return WaitForConditionWithOptions(rt.Context(), successFunc, maxNumAttempts, timeToSleepMs)
}

func WaitForConditionWithOptions(ctx context.Context, successFunc func() bool, maxNumAttempts, timeToSleepMs int) error {
waitForSuccess := func() (shouldRetry bool, err error, value interface{}) {
if successFunc() {
return false, nil, nil
Expand All @@ -821,7 +825,7 @@ func (rt *RestTester) WaitForConditionWithOptions(successFunc func() bool, maxNu
}

sleeper := base.CreateSleeperFunc(maxNumAttempts, timeToSleepMs)
err, _ := base.RetryLoop(rt.Context(), "Wait for condition options", waitForSuccess, sleeper)
err, _ := base.RetryLoop(ctx, "Wait for condition options", waitForSuccess, sleeper)
if err != nil {
return err
}
Expand Down Expand Up @@ -2541,6 +2545,25 @@ func DropAllTestIndexes(t *testing.T, tb *base.TestBucket) {
}
}

func DropAllTestIndexesIncludingPrimary(t *testing.T, tb *base.TestBucket) {

ctx := base.TestCtx(t)
n1qlStore, ok := base.AsN1QLStore(tb.GetMetadataStore())
require.True(t, ok)
dropErr := base.DropAllIndexes(ctx, n1qlStore)
require.NoError(t, dropErr)

dsNames := tb.GetNonDefaultDatastoreNames()
for i := 0; i < len(dsNames); i++ {
ds, err := tb.GetNamedDataStore(i)
require.NoError(t, err)
n1qlStore, ok := base.AsN1QLStore(ds)
require.True(t, ok)
dropErr := base.DropAllIndexes(ctx, n1qlStore)
require.NoError(t, dropErr)
}
}

func (sc *ServerContext) RequireInvalidDatabaseConfigNames(t *testing.T, expectedDbNames []string) {
sc.invalidDatabaseConfigTracking.m.RLock()
defer sc.invalidDatabaseConfigTracking.m.RUnlock()
Expand Down

0 comments on commit 66e5b09

Please sign in to comment.