diff --git a/rest/admin_api.go b/rest/admin_api.go index 3f6ce61b34..723439aa50 100644 --- a/rest/admin_api.go +++ b/rest/admin_api.go @@ -577,7 +577,7 @@ func (h *handler) handlePutDbConfig() (err error) { if err := updatedDbConfig.setup(h.ctx(), dbName, h.server.Config.Bootstrap, dbCreds, nil, false); err != nil { return err } - if err := h.server.ReloadDatabaseWithConfig(contextNoCancel, *updatedDbConfig, false); err != nil { + if err := h.server.ReloadDatabaseWithConfig(contextNoCancel, *updatedDbConfig); err != nil { return err } return base.HTTPErrorf(http.StatusCreated, "updated") @@ -635,7 +635,7 @@ func (h *handler) handlePutDbConfig() (err error) { } // Load the new dbConfig before we persist the update. - err = h.server.ReloadDatabaseWithConfig(contextNoCancel, tmpConfig, true) + err = h.server.ReloadDatabaseWithConfig(contextNoCancel, tmpConfig) if err != nil { return nil, err } @@ -746,7 +746,7 @@ func (h *handler) handleDeleteCollectionConfigSync() error { defer h.server.lock.Unlock() // TODO: Dynamic update instead of reload - if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil { + if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false); err != nil { return err } @@ -810,7 +810,7 @@ func (h *handler) handlePutCollectionConfigSync() error { defer h.server.lock.Unlock() // TODO: Dynamic update instead of reload - if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil { + if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false); err != nil { return err } @@ -905,7 +905,7 @@ func (h *handler) handleDeleteCollectionConfigImportFilter() error { defer h.server.lock.Unlock() // TODO: Dynamic update instead of reload - if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil { + if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false); err != nil { return err } @@ -970,7 +970,7 @@ func (h *handler) handlePutCollectionConfigImportFilter() error { defer h.server.lock.Unlock() // TODO: Dynamic update instead of reload - if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil { + if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false); err != nil { return err } diff --git a/rest/config.go b/rest/config.go index fa221f1e92..8e1767a898 100644 --- a/rest/config.go +++ b/rest/config.go @@ -1900,7 +1900,7 @@ func (sc *ServerContext) _applyConfig(nonContextStruct base.NonCancellableContex } // TODO: Dynamic update instead of reload - if err := sc._reloadDatabaseWithConfig(ctx, cnf, failFast, false); err != nil { + if err := sc._reloadDatabaseWithConfig(ctx, cnf, failFast); err != nil { // remove these entries we just created above if the database hasn't loaded properly return false, fmt.Errorf("couldn't reload database: %w", err) } diff --git a/rest/handler_config_database.go b/rest/handler_config_database.go index 9f46691808..d803fc0b84 100644 --- a/rest/handler_config_database.go +++ b/rest/handler_config_database.go @@ -84,7 +84,7 @@ func (h *handler) mutateDbConfig(mutator func(*DbConfig) error) error { defer h.server.lock.Unlock() // TODO: Dynamic update instead of reload - if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false, false); err != nil { + if err := h.server._reloadDatabaseWithConfig(h.ctx(), *updatedDbConfig, false); err != nil { return err } h.setEtag(updatedDbConfig.Version) diff --git a/rest/indextest/index_test.go b/rest/indextest/index_test.go index 8a0f44788f..98c76e1102 100644 --- a/rest/indextest/index_test.go +++ b/rest/indextest/index_test.go @@ -285,10 +285,9 @@ func TestAsyncInitWithResync(t *testing.T) { resp = rest.BootstrapAdminRequest(t, http.MethodDelete, "/"+dbName+"/", "") resp.RequireStatus(http.StatusOK) - rest.DropAllTestIndexes(t, tb) + rest.DropAllTestIndexesIncludingPrimary(t, tb) // Set testing callbacks for async initialization - collectionCount := int64(0) initStarted := make(chan error) unblockInit := make(chan error) @@ -658,9 +657,240 @@ func TestSyncOnline(t *testing.T) { } +// TestAsyncInitConfigUpdates verifies that a database with in-progress async +// index initialization can accept all config updates from the local node. +// (prior to CBG-4008, operations would block waiting for async init to complete) +func TestAsyncInitConfigUpdates(t *testing.T) { + if base.UnitTestUrlIsWalrus() { + t.Skip("This test only works against Couchbase Server") + } + base.TestRequiresCollections(t) + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP) + + sc, closeFn := rest.StartBootstrapServer(t) + defer closeFn() + + // Set testing callbacks for async initialization + collectionCount := int64(0) + initStarted := make(chan error) + unblockInit := make(chan error) + collectionCompleteCallback := func(dbName, collectionName string) { + count := atomic.AddInt64(&collectionCount, 1) + // On first collection, close initStarted channel + log.Printf("collection callback count: %v", count) + if count == 1 { + log.Printf("closing initStarted") + close(initStarted) + } + rest.WaitForChannel(t, unblockInit, "waiting for test to unblock initialization") + } + sc.DatabaseInitManager.SetCallbacks(collectionCompleteCallback, nil) + + ctx := base.TestCtx(t) + // Get a test bucket, and use it to create the database. + tb := base.GetTestBucket(t) + defer tb.Close(ctx) + + importFilter := "function(doc) { return true }" + syncFunc := "function(doc){ channel(doc.channels); }" + + dbConfig := makeDbConfig(t, tb, syncFunc, importFilter) + dbConfig.StartOffline = base.BoolPtr(true) + dbConfigPayload, err := json.Marshal(dbConfig) + require.NoError(t, err) + dbName := "db" + + keyspace := dbName + if len(dbConfig.Scopes) > 0 { + keyspaces := getRESTKeyspaces(dbName, dbConfig.Scopes) + keyspace = keyspaces[0] + } + + // Create database with offline=true + resp := rest.BootstrapAdminRequest(t, http.MethodPut, "/"+dbName+"/", string(dbConfigPayload)) + resp.RequireStatus(http.StatusCreated) + + // Wait for init to start before interacting with the db, validate db state is offline + rest.WaitForChannel(t, initStarted, "waiting for initialization to start") + log.Printf("initialization started") + waitAndRequireDBState(t, dbName, db.DBOffline) + + // Set up payloads for upserting db state + onlineConfigUpsert := rest.DbConfig{ + StartOffline: base.BoolPtr(false), + } + dbOnlineConfigPayload, err := json.Marshal(onlineConfigUpsert) + require.NoError(t, err) + + // Take the database online while async init is still in progress, verify state goes to Starting + resp = rest.BootstrapAdminRequest(t, http.MethodPost, "/"+dbName+"/_config", string(dbOnlineConfigPayload)) + resp.RequireStatus(http.StatusCreated) + waitAndRequireDBState(t, dbName, db.DBStarting) + + // Attempt to update import filter while in starting mode + importFilter = "function(doc){ return false; }" + resp = rest.BootstrapAdminRequest(t, http.MethodPut, "/"+keyspace+"/_config/import_filter", importFilter) + resp.RequireStatus(http.StatusOK) + + resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/import_filter", "") + resp.RequireResponse(http.StatusOK, importFilter) + + // Attempt to delete import filter while in starting mode + resp = rest.BootstrapAdminRequest(t, http.MethodDelete, "/"+keyspace+"/_config/import_filter", "") + resp.RequireStatus(http.StatusOK) + + resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/import_filter", "") + resp.RequireResponse(http.StatusOK, "") + + // Attempt to update sync function while in starting mode + syncFunc = "function(doc){ channel(doc.type); }" + resp = rest.BootstrapAdminRequest(t, http.MethodPut, "/"+keyspace+"/_config/sync", syncFunc) + resp.RequireStatus(http.StatusOK) + + resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/sync", "") + resp.RequireResponse(http.StatusOK, syncFunc) + + // Attempt to delete sync function while in starting mode + resp = rest.BootstrapAdminRequest(t, http.MethodDelete, "/"+keyspace+"/_config/sync", "") + resp.RequireStatus(http.StatusOK) + + resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/sync", "") + resp.RequireResponse(http.StatusOK, "") + + // Take the database back online while async init is still in progress, verify state goes to Starting + resp = rest.BootstrapAdminRequest(t, http.MethodPost, "/"+dbName+"/_config", string(dbOnlineConfigPayload)) + resp.RequireStatus(http.StatusCreated) + waitAndRequireDBState(t, dbName, db.DBStarting) + + // Unblock initialization, verify status goes to Online + close(unblockInit) + waitAndRequireDBState(t, dbName, db.DBOnline) + +} + +// TestAsyncInitRemoteConfigUpdates verifies that a database with in-progress async +// index initialization can accept config updates made on other nodes (arriving via polling) +// (prior to CBG-4008, operations would block waiting for async init to complete) +func TestAsyncInitRemoteConfigUpdates(t *testing.T) { + if base.UnitTestUrlIsWalrus() { + t.Skip("This test only works against Couchbase Server") + } + base.TestRequiresCollections(t) + base.SetUpTestLogging(t, base.LevelDebug, base.KeyHTTP) + + // enable config polling to allow testing of cross-node updates + bootstrapConfig := rest.BootstrapStartupConfigForTest(t) + bootstrapConfig.Bootstrap.ConfigUpdateFrequency = base.NewConfigDuration(1 * time.Second) + sc, closeFn := rest.StartServerWithConfig(t, &bootstrapConfig) + defer closeFn() + + // Set testing callbacks for async initialization + collectionCount := int64(0) + initStarted := make(chan error) + unblockInit := make(chan error) + collectionCompleteCallback := func(dbName, collectionName string) { + count := atomic.AddInt64(&collectionCount, 1) + // On first collection, close initStarted channel + log.Printf("collection callback count: %v", count) + if count == 1 { + log.Printf("closing initStarted") + close(initStarted) + } + rest.WaitForChannel(t, unblockInit, "waiting for test to unblock initialization") + } + sc.DatabaseInitManager.SetCallbacks(collectionCompleteCallback, nil) + + ctx := base.TestCtx(t) + // Get a test bucket, and use it to create the database. + tb := base.GetTestBucket(t) + defer tb.Close(ctx) + + importFilter := "function(doc) { return true }" + syncFunc := "function(doc){ channel(doc.channels); }" + + dbName := "db" + dbConfig := makeDbConfig(t, tb, syncFunc, importFilter) + dbConfig.Name = dbName + dbConfig.StartOffline = base.BoolPtr(true) + + keyspace := dbName + if len(dbConfig.Scopes) > 0 { + keyspaces := getRESTKeyspaces(dbName, dbConfig.Scopes) + keyspace = keyspaces[0] + } + + bucketName := tb.GetName() + groupID := sc.Config.Bootstrap.ConfigGroupID + + // Simulate creation of the database with offline=true on another node + version, err := rest.GenerateDatabaseConfigVersionID(ctx, "", &dbConfig) + require.NoError(t, err) + metadataID, err := sc.BootstrapContext.ComputeMetadataIDForDbConfig(ctx, &dbConfig) + require.NoError(t, err) + + databaseConfig := dbConfig.ToDatabaseConfig() + databaseConfig.Version = version + databaseConfig.MetadataID = metadataID + + _, err = sc.BootstrapContext.InsertConfig(ctx, bucketName, groupID, databaseConfig) + require.NoError(t, err) + + // Wait for init to start before interacting with the db, validate db state is offline + rest.WaitForChannel(t, initStarted, "waiting for initialization to start") + log.Printf("initialization started") + waitAndRequireDBState(t, dbName, db.DBOffline) + + log.Printf("keyspace: %v", keyspace) + + // Update the bucket config to bring the database online + _, err = sc.BootstrapContext.UpdateConfig(ctx, bucketName, groupID, dbName, func(bucketDbConfig *rest.DatabaseConfig) (updatedConfig *rest.DatabaseConfig, err error) { + bucketDbConfig.StartOffline = base.BoolPtr(false) + return bucketDbConfig, nil + }) + require.NoError(t, err) + waitAndRequireDBState(t, dbName, db.DBStarting) + + // Attempt to update import filter while in starting mode + importFilter = "function(doc){ return false; }" + resp := rest.BootstrapAdminRequest(t, http.MethodPut, "/"+keyspace+"/_config/import_filter", importFilter) + resp.RequireStatus(http.StatusOK) + + resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/import_filter", "") + resp.RequireResponse(http.StatusOK, importFilter) + + // Update the db config from a remote node, verify change is picked up while in starting state + _, err = sc.BootstrapContext.UpdateConfig(ctx, bucketName, groupID, dbName, func(bucketDbConfig *rest.DatabaseConfig) (updatedConfig *rest.DatabaseConfig, err error) { + _, scopeName, collectionName, err := rest.ParseKeyspace(keyspace) + require.NoError(t, err) + if scopeName == nil || collectionName == nil { + bucketDbConfig.ImportFilter = nil + } else { + bucketDbConfig.Scopes[*scopeName].Collections[*collectionName].ImportFilter = nil + } + return bucketDbConfig, nil + }) + require.NoError(t, err) + + // Need a wait loop here to wait for config polling to pick up the change + err = rest.WaitForConditionWithOptions(ctx, func() bool { + resp = rest.BootstrapAdminRequest(t, http.MethodGet, "/"+keyspace+"/_config/import_filter", "") + if resp.StatusCode == http.StatusOK && resp.Body == "" { + return true + } else { + log.Printf("Waiting for OK and empty filter, current status: %v, filter: %q", resp.StatusCode, resp.Body) + } + return (resp.StatusCode == http.StatusOK) && resp.Body == "" + }, 200, 100) + require.NoError(t, err) + + // Unblock initialization, verify status goes to Online + close(unblockInit) + waitAndRequireDBState(t, dbName, db.DBOnline) +} + func makeDbConfig(t *testing.T, tb *base.TestBucket, syncFunction string, importFilter string) rest.DbConfig { - scopesConfig := rest.GetCollectionsConfig(t, tb, 3) + scopesConfig := rest.GetCollectionsConfig(t, tb, 1) for scopeName, scope := range scopesConfig { for collectionName, _ := range scope.Collections { collectionConfig := &rest.CollectionConfig{} @@ -684,6 +914,7 @@ func makeDbConfig(t *testing.T, tb *base.TestBucket, syncFunction string, import NumIndexReplicas: &numIndexReplicas, EnableXattrs: &enableXattrs, Scopes: scopesConfig, + AutoImport: false, // disable import to streamline index tests and avoid teardown races } return dbConfig } diff --git a/rest/server_context.go b/rest/server_context.go index 41d7aac37d..79e9513894 100644 --- a/rest/server_context.go +++ b/rest/server_context.go @@ -421,14 +421,16 @@ func (sc *ServerContext) ReloadDatabase(ctx context.Context, reloadDbName string return dbContext, err } -func (sc *ServerContext) ReloadDatabaseWithConfig(nonContextStruct base.NonCancellableContext, config DatabaseConfig, asyncOnline bool) error { +func (sc *ServerContext) ReloadDatabaseWithConfig(nonContextStruct base.NonCancellableContext, config DatabaseConfig) error { sc.lock.Lock() defer sc.lock.Unlock() - return sc._reloadDatabaseWithConfig(nonContextStruct.Ctx, config, true, asyncOnline) + return sc._reloadDatabaseWithConfig(nonContextStruct.Ctx, config, true) } -func (sc *ServerContext) _reloadDatabaseWithConfig(ctx context.Context, config DatabaseConfig, failFast bool, asyncOnline bool) error { +func (sc *ServerContext) _reloadDatabaseWithConfig(ctx context.Context, config DatabaseConfig, failFast bool) error { sc._removeDatabase(ctx, config.Name) + // use async initialization whenever using persistent config + asyncOnline := sc.persistentConfig _, err := sc._getOrAddDatabaseFromConfig(ctx, config, getOrAddDatabaseConfigOptions{ useExisting: false, failFast: failFast, diff --git a/rest/utilities_testing.go b/rest/utilities_testing.go index 796725aeed..c813289257 100644 --- a/rest/utilities_testing.go +++ b/rest/utilities_testing.go @@ -813,6 +813,10 @@ func (rt *RestTester) WaitForCondition(successFunc func() bool) error { } func (rt *RestTester) WaitForConditionWithOptions(successFunc func() bool, maxNumAttempts, timeToSleepMs int) error { + return WaitForConditionWithOptions(rt.Context(), successFunc, maxNumAttempts, timeToSleepMs) +} + +func WaitForConditionWithOptions(ctx context.Context, successFunc func() bool, maxNumAttempts, timeToSleepMs int) error { waitForSuccess := func() (shouldRetry bool, err error, value interface{}) { if successFunc() { return false, nil, nil @@ -821,7 +825,7 @@ func (rt *RestTester) WaitForConditionWithOptions(successFunc func() bool, maxNu } sleeper := base.CreateSleeperFunc(maxNumAttempts, timeToSleepMs) - err, _ := base.RetryLoop(rt.Context(), "Wait for condition options", waitForSuccess, sleeper) + err, _ := base.RetryLoop(ctx, "Wait for condition options", waitForSuccess, sleeper) if err != nil { return err } @@ -2541,6 +2545,25 @@ func DropAllTestIndexes(t *testing.T, tb *base.TestBucket) { } } +func DropAllTestIndexesIncludingPrimary(t *testing.T, tb *base.TestBucket) { + + ctx := base.TestCtx(t) + n1qlStore, ok := base.AsN1QLStore(tb.GetMetadataStore()) + require.True(t, ok) + dropErr := base.DropAllIndexes(ctx, n1qlStore) + require.NoError(t, dropErr) + + dsNames := tb.GetNonDefaultDatastoreNames() + for i := 0; i < len(dsNames); i++ { + ds, err := tb.GetNamedDataStore(i) + require.NoError(t, err) + n1qlStore, ok := base.AsN1QLStore(ds) + require.True(t, ok) + dropErr := base.DropAllIndexes(ctx, n1qlStore) + require.NoError(t, dropErr) + } +} + func (sc *ServerContext) RequireInvalidDatabaseConfigNames(t *testing.T, expectedDbNames []string) { sc.invalidDatabaseConfigTracking.m.RLock() defer sc.invalidDatabaseConfigTracking.m.RUnlock()