Skip to content

Commit

Permalink
(3.1.1 backport) CBG-3108 don't log non SG indexes (#6317) (#6330)
Browse files Browse the repository at this point in the history
- add context based logging to include bucket and collection information
  • Loading branch information
torcolvin authored Jun 30, 2023
1 parent 709bf6c commit d26ad31
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 155 deletions.
126 changes: 71 additions & 55 deletions base/bucket_n1ql_test.go

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions base/cluster_n1ql.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,24 +64,24 @@ func (cl *ClusterOnlyN1QLStore) BucketName() string {
return cl.bucketName
}

func (cl *ClusterOnlyN1QLStore) BuildDeferredIndexes(indexSet []string) error {
return BuildDeferredIndexes(cl, indexSet)
func (cl *ClusterOnlyN1QLStore) BuildDeferredIndexes(ctx context.Context, indexSet []string) error {
return BuildDeferredIndexes(ctx, cl, indexSet)
}

func (cl *ClusterOnlyN1QLStore) CreateIndex(indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
return CreateIndex(cl, indexName, expression, filterExpression, options)
func (cl *ClusterOnlyN1QLStore) CreateIndex(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
return CreateIndex(ctx, cl, indexName, expression, filterExpression, options)
}

func (cl *ClusterOnlyN1QLStore) CreatePrimaryIndex(indexName string, options *N1qlIndexOptions) error {
return CreatePrimaryIndex(cl, indexName, options)
func (cl *ClusterOnlyN1QLStore) CreatePrimaryIndex(ctx context.Context, indexName string, options *N1qlIndexOptions) error {
return CreatePrimaryIndex(ctx, cl, indexName, options)
}

func (cl *ClusterOnlyN1QLStore) ExplainQuery(statement string, params map[string]interface{}) (plan map[string]interface{}, err error) {
return ExplainQuery(cl, statement, params)
}

func (cl *ClusterOnlyN1QLStore) DropIndex(indexName string) error {
return DropIndex(cl, indexName)
func (cl *ClusterOnlyN1QLStore) DropIndex(ctx context.Context, indexName string) error {
return DropIndex(ctx, cl, indexName)
}

// IndexMetaKeyspaceID returns the value of keyspace_id for the system:indexes table for the collection.
Expand Down Expand Up @@ -190,12 +190,12 @@ func (cl *ClusterOnlyN1QLStore) runQuery(statement string, n1qlOptions *gocb.Que
return queryResults, err
}

func (cl *ClusterOnlyN1QLStore) WaitForIndexesOnline(indexNames []string, failfast bool) error {
return WaitForIndexesOnline(cl.cluster, cl.bucketName, cl.scopeName, cl.collectionName, indexNames, failfast)
func (cl *ClusterOnlyN1QLStore) WaitForIndexesOnline(ctx context.Context, indexNames []string, failfast bool) error {
return WaitForIndexesOnline(ctx, cl.cluster, cl.bucketName, cl.scopeName, cl.collectionName, indexNames, failfast)
}

func (cl *ClusterOnlyN1QLStore) GetIndexMeta(indexName string) (exists bool, meta *IndexMeta, err error) {
return GetIndexMeta(cl, indexName)
func (cl *ClusterOnlyN1QLStore) GetIndexMeta(ctx context.Context, indexName string) (exists bool, meta *IndexMeta, err error) {
return GetIndexMeta(ctx, cl, indexName)
}

func (cl *ClusterOnlyN1QLStore) IsErrNoResults(err error) bool {
Expand Down
25 changes: 12 additions & 13 deletions base/collection_n1ql.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,31 +117,30 @@ func (c *Collection) ExplainQuery(statement string, params map[string]interface{
return ExplainQuery(c, statement, params)
}

func (c *Collection) CreateIndex(indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
return CreateIndex(c, indexName, expression, filterExpression, options)
func (c *Collection) CreateIndex(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
return CreateIndex(ctx, c, indexName, expression, filterExpression, options)
}

func (c *Collection) CreatePrimaryIndex(indexName string, options *N1qlIndexOptions) error {
return CreatePrimaryIndex(c, indexName, options)
func (c *Collection) CreatePrimaryIndex(ctx context.Context, indexName string, options *N1qlIndexOptions) error {
return CreatePrimaryIndex(ctx, c, indexName, options)
}

// WaitForIndexesOnline takes set of indexes and watches them till they're online.
func (c *Collection) WaitForIndexesOnline(indexNames []string, failfast bool) error {
return WaitForIndexesOnline(c.Bucket.cluster, c.BucketName(), c.ScopeName(), c.CollectionName(), indexNames, failfast)
func (c *Collection) WaitForIndexesOnline(ctx context.Context, indexNames []string, failfast bool) error {
return WaitForIndexesOnline(ctx, c.Bucket.cluster, c.BucketName(), c.ScopeName(), c.CollectionName(), indexNames, failfast)
}

func (c *Collection) GetIndexMeta(indexName string) (exists bool, meta *IndexMeta, err error) {
return GetIndexMeta(c, indexName)
func (c *Collection) GetIndexMeta(ctx context.Context, indexName string) (exists bool, meta *IndexMeta, err error) {
return GetIndexMeta(ctx, c, indexName)
}

// DropIndex drops the specified index from the current bucket.
func (c *Collection) DropIndex(indexName string) error {
return DropIndex(c, indexName)
func (c *Collection) DropIndex(ctx context.Context, indexName string) error {
return DropIndex(ctx, c, indexName)
}

// Issues a build command for any deferred sync gateway indexes associated with the bucket.
func (c *Collection) BuildDeferredIndexes(indexSet []string) error {
return BuildDeferredIndexes(c, indexSet)
func (c *Collection) BuildDeferredIndexes(ctx context.Context, indexSet []string) error {
return BuildDeferredIndexes(ctx, c, indexSet)
}

func (b *GocbV2Bucket) runQuery(statement string, n1qlOptions *gocb.QueryOptions) (*gocb.QueryResult, error) {
Expand Down
78 changes: 42 additions & 36 deletions base/collection_n1ql_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"strings"
"time"

"golang.org/x/exp/slices"

"github.com/couchbase/gocb/v2"
sgbucket "github.com/couchbase/sg-bucket"
pkgerrors "github.com/pkg/errors"
Expand All @@ -36,20 +38,20 @@ const (
// N1QLStore defines the set of operations Sync Gateway uses to manage and interact with N1QL
type N1QLStore interface {
GetName() string
BuildDeferredIndexes(indexSet []string) error
CreateIndex(indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error
CreatePrimaryIndex(indexName string, options *N1qlIndexOptions) error
DropIndex(indexName string) error
BuildDeferredIndexes(ctx context.Context, indexSet []string) error
CreateIndex(ctx context.Context, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error
CreatePrimaryIndex(ctx context.Context, indexName string, options *N1qlIndexOptions) error
DropIndex(ctx context.Context, indexName string) error
ExplainQuery(statement string, params map[string]interface{}) (plan map[string]interface{}, err error)
GetIndexMeta(indexName string) (exists bool, meta *IndexMeta, err error)
GetIndexMeta(ctx context.Context, indexName string) (exists bool, meta *IndexMeta, err error)
Query(statement string, params map[string]interface{}, consistency ConsistencyMode, adhoc bool) (results sgbucket.QueryResultIterator, err error)
IsErrNoResults(error) bool
EscapedKeyspace() string
IndexMetaBucketID() string
IndexMetaScopeID() string
IndexMetaKeyspaceID() string
BucketName() string
WaitForIndexesOnline(indexNames []string, failfast bool) error
WaitForIndexesOnline(ctx context.Context, indexNames []string, failfast bool) error

// executeQuery performs the specified query without any built-in retry handling and returns the resultset
executeQuery(statement string) (sgbucket.QueryResultIterator, error)
Expand Down Expand Up @@ -90,7 +92,7 @@ func ExplainQuery(store N1QLStore, statement string, params map[string]interface
//
// CreateIndex("myIndex", "field1, field2, nested.field", "field1 > 0", N1qlIndexOptions{numReplica:1})
// CREATE INDEX myIndex on myBucket(field1, field2, nested.field) WHERE field1 > 0 WITH {"numReplica":1}
func CreateIndex(store N1QLStore, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
func CreateIndex(ctx context.Context, store N1QLStore, indexName string, expression string, filterExpression string, options *N1qlIndexOptions) error {
createStatement := fmt.Sprintf("CREATE INDEX `%s` ON %s(%s)", indexName, store.EscapedKeyspace(), expression)

// Add filter expression, when present
Expand All @@ -101,7 +103,7 @@ func CreateIndex(store N1QLStore, indexName string, expression string, filterExp
// Replace any KeyspaceQueryToken references in the index expression
createStatement = strings.Replace(createStatement, KeyspaceQueryToken, store.EscapedKeyspace(), -1)

createErr := createIndex(store, indexName, createStatement, options)
createErr := createIndex(ctx, store, indexName, createStatement, options)
if createErr != nil {
if strings.Contains(createErr.Error(), "already exists") || strings.Contains(createErr.Error(), "duplicate index name") {
return ErrAlreadyExists
Expand All @@ -110,12 +112,12 @@ func CreateIndex(store N1QLStore, indexName string, expression string, filterExp
return createErr
}

func CreatePrimaryIndex(store N1QLStore, indexName string, options *N1qlIndexOptions) error {
func CreatePrimaryIndex(ctx context.Context, store N1QLStore, indexName string, options *N1qlIndexOptions) error {
createStatement := fmt.Sprintf("CREATE PRIMARY INDEX `%s` ON %s", indexName, store.EscapedKeyspace())
return createIndex(store, indexName, createStatement, options)
return createIndex(ctx, store, indexName, createStatement, options)
}

func createIndex(store N1QLStore, indexName string, createStatement string, options *N1qlIndexOptions) error {
func createIndex(ctx context.Context, store N1QLStore, indexName string, createStatement string, options *N1qlIndexOptions) error {

if options != nil {
withClause, marshalErr := JSONMarshal(options)
Expand All @@ -125,35 +127,35 @@ func createIndex(store N1QLStore, indexName string, createStatement string, opti
createStatement = fmt.Sprintf(`%s with %s`, createStatement, withClause)
}

DebugfCtx(context.TODO(), KeyQuery, "Attempting to create index using statement: [%s]", UD(createStatement))
DebugfCtx(ctx, KeyQuery, "Attempting to create index using statement: [%s]", UD(createStatement))

err := store.executeStatement(createStatement)
if err == nil {
return nil
}

if IsIndexerRetryIndexError(err) {
InfofCtx(context.TODO(), KeyQuery, "Indexer error creating index - waiting for server background retry. Error:%v", err)
InfofCtx(ctx, KeyQuery, "Indexer error creating index - waiting for server background retry. Error:%v", err)
// Wait for bucket to be created in background before returning
return waitForIndexExistence(store, indexName, true)
return waitForIndexExistence(ctx, store, indexName, true)
}

if IsCreateDuplicateIndexError(err) {
InfofCtx(context.TODO(), KeyQuery, "Duplicate index creation in progress - waiting for index readiness. Error:%v", err)
InfofCtx(ctx, KeyQuery, "Duplicate index creation in progress - waiting for index readiness. Error:%v", err)
// Wait for bucket to be created in background before returning
return waitForIndexExistence(store, indexName, true)
return waitForIndexExistence(ctx, store, indexName, true)
}

return pkgerrors.WithStack(RedactErrorf("Error creating index with statement: %s. Error: %v", UD(createStatement), err))
}

// Waits for index to exist/not exist. Used in response to background create/drop processing by server.
func waitForIndexExistence(store N1QLStore, indexName string, shouldExist bool) error {
func waitForIndexExistence(ctx context.Context, store N1QLStore, indexName string, shouldExist bool) error {

worker := func() (shouldRetry bool, err error, value interface{}) {
// GetIndexMeta has its own error retry handling,
// but keep the retry logic up here for checking if the index exists.
exists, _, err := store.GetIndexMeta(indexName)
exists, _, err := store.GetIndexMeta(ctx, indexName)
if err != nil {
return false, err, nil
}
Expand All @@ -175,7 +177,7 @@ func waitForIndexExistence(store N1QLStore, indexName string, shouldExist bool)
}

// BuildDeferredIndexes issues a build command for any deferred sync gateway indexes associated with the bucket.
func BuildDeferredIndexes(s N1QLStore, indexSet []string) error {
func BuildDeferredIndexes(ctx context.Context, s N1QLStore, indexSet []string) error {

if len(indexSet) == 0 {
return nil
Expand Down Expand Up @@ -218,15 +220,15 @@ func BuildDeferredIndexes(s N1QLStore, indexSet []string) error {
return nil
}

InfofCtx(context.TODO(), KeyQuery, "Building deferred indexes: %v", deferredIndexes)
buildErr := buildIndexes(s, deferredIndexes)
InfofCtx(ctx, KeyQuery, "Building deferred indexes: %v", deferredIndexes)
buildErr := buildIndexes(ctx, s, deferredIndexes)
return buildErr
}

// BuildIndexes executes a BUILD INDEX statement in the current bucket, using the form:
//
// BUILD INDEX ON `bucket.Name`(`index1`, `index2`, ...)
func buildIndexes(s N1QLStore, indexNames []string) error {
func buildIndexes(ctx context.Context, s N1QLStore, indexNames []string) error {
if len(indexNames) == 0 {
return nil
}
Expand All @@ -241,7 +243,7 @@ func buildIndexes(s N1QLStore, indexNames []string) error {
if IsIndexerRetryBuildError(err) {
InfofCtx(context.TODO(), KeyQuery, "Indexer error creating index - waiting for background build. Error:%v", err)
// Wait for bucket to be created in background before returning
return s.WaitForIndexesOnline(indexNames, false)
return s.WaitForIndexesOnline(ctx, indexNames, false)
}

return err
Expand All @@ -263,13 +265,13 @@ type getIndexMetaRetryValues struct {
meta *IndexMeta
}

func GetIndexMeta(store N1QLStore, indexName string) (exists bool, meta *IndexMeta, err error) {
func GetIndexMeta(ctx context.Context, store N1QLStore, indexName string) (exists bool, meta *IndexMeta, err error) {

worker := func() (shouldRetry bool, err error, value interface{}) {
exists, meta, err := getIndexMetaWithoutRetry(store, indexName)
if err != nil {
// retry
WarnfCtx(context.TODO(), "Error from GetIndexMeta for index %s: %v will retry", indexName, err)
WarnfCtx(ctx, "Error from GetIndexMeta for index %s: %v will retry", indexName, err)
return true, err, nil
}
return false, nil, getIndexMetaRetryValues{
Expand Down Expand Up @@ -318,7 +320,7 @@ func getIndexMetaWithoutRetry(store N1QLStore, indexName string) (exists bool, m
}

// DropIndex drops the specified index from the current bucket.
func DropIndex(store N1QLStore, indexName string) error {
func DropIndex(ctx context.Context, store N1QLStore, indexName string) error {
statement := fmt.Sprintf("DROP INDEX default:%s.`%s`", store.EscapedKeyspace(), indexName)

err := store.executeStatement(statement)
Expand All @@ -327,9 +329,9 @@ func DropIndex(store N1QLStore, indexName string) error {
}

if IsIndexerRetryIndexError(err) {
InfofCtx(context.TODO(), KeyQuery, "Indexer error dropping index - waiting for server background retry. Error:%v", err)
InfofCtx(ctx, KeyQuery, "Indexer error dropping index - waiting for server background retry. Error:%v", err)
// Wait for bucket to be dropped in background before returning
return waitForIndexExistence(store, indexName, false)
return waitForIndexExistence(ctx, store, indexName, false)
}

return err
Expand Down Expand Up @@ -516,8 +518,7 @@ func IndexMetaKeyspaceID(bucketName, scopeName, collectionName string) string {
}

// WaitForIndexesOnline takes set of indexes and watches them till they're online.
func WaitForIndexesOnline(cluster *gocb.Cluster, bucketName, scopeName, collectionName string, indexNames []string, failfast bool) error {
logCtx := context.TODO()
func WaitForIndexesOnline(ctx context.Context, cluster *gocb.Cluster, bucketName, scopeName, collectionName string, indexNames []string, failfast bool) error {
mgr := cluster.QueryIndexes()
maxNumAttempts := 180
if failfast {
Expand All @@ -542,17 +543,22 @@ func WaitForIndexesOnline(cluster *gocb.Cluster, bucketName, scopeName, collecti
}
// check each of the current indexes state, add to map once finished to make sure each index online is only being logged once
for i := 0; i < len(currIndexes); i++ {
if currIndexes[i].State == IndexStateOnline {
if !onlineIndexes[currIndexes[i].Name] {
InfofCtx(logCtx, KeyAll, "Index %s is online", MD(currIndexes[i].Name))
onlineIndexes[currIndexes[i].Name] = true
name := currIndexes[i].Name
// use slices.Contains since the number of indexes is expected to be small
if currIndexes[i].State == IndexStateOnline && slices.Contains(indexNames, name) {
if !onlineIndexes[name] {
InfofCtx(ctx, KeyAll, "Index %s is online", MD(name))
onlineIndexes[name] = true
}
}
}
// check online index against indexes we watch to have online, increase counter as each comes online
var offlineIndexes []string
for _, listVal := range indexNames {
if onlineIndexes[listVal] {
watchedOnlineIndexCount++
} else {
offlineIndexes = append(offlineIndexes, listVal)
}
}

Expand All @@ -562,9 +568,9 @@ func WaitForIndexesOnline(cluster *gocb.Cluster, bucketName, scopeName, collecti
retryCount++
shouldContinue, sleepMs := retrySleeper(retryCount)
if !shouldContinue {
return fmt.Errorf("error waiting for indexes for bucket %s....", MD(bucketName))
return fmt.Errorf("error waiting for indexes %s ...", strings.Join(offlineIndexes, ", "))
}
InfofCtx(logCtx, KeyAll, "Indexes for bucket %s not ready - retrying...", MD(bucketName))
InfofCtx(ctx, KeyAll, "Indexes %s not ready - retrying...", strings.Join(offlineIndexes, ", "))
time.Sleep(time.Millisecond * time.Duration(sleepMs))
}
}
Expand Down
4 changes: 2 additions & 2 deletions base/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,10 @@ func DropAllIndexes(ctx context.Context, n1QLStore N1QLStore) error {
defer wg.Done()

InfofCtx(ctx, KeySGTest, "Dropping index %s on bucket %s...", indexToDrop, n1QLStore.GetName())
dropErr := n1QLStore.DropIndex(indexToDrop)
dropErr := n1QLStore.DropIndex(ctx, indexToDrop)
if dropErr != nil {
// Retry dropping index if first try fails before returning error
dropRetry := n1QLStore.DropIndex(indexToDrop)
dropRetry := n1QLStore.DropIndex(ctx, indexToDrop)
if dropRetry != nil {
asyncErrors <- dropErr
ErrorfCtx(ctx, "...failed to drop index %s on bucket %s: %s", indexToDrop, n1QLStore.GetName(), dropErr)
Expand Down
2 changes: 1 addition & 1 deletion db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func (dbCtx *DatabaseContext) RemoveObsoleteIndexes(ctx context.Context, preview
errs = errs.Append(errors.New(err))
continue
}
collectionRemovedIndexes, err := removeObsoleteIndexes(n1qlStore, previewOnly, dbCtx.UseXattrs(), dbCtx.UseViews(), sgIndexes)
collectionRemovedIndexes, err := removeObsoleteIndexes(ctx, n1qlStore, previewOnly, dbCtx.UseXattrs(), dbCtx.UseViews(), sgIndexes)
if err != nil {
errs = errs.Append(err)
continue
Expand Down
5 changes: 3 additions & 2 deletions db/functions/function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,12 +684,13 @@ func setupTestDBForBucketWithOptions(t testing.TB, tBucket base.Bucket, dbcOptio

// createPrimaryIndex returns true if there was no index created before
func createPrimaryIndex(t *testing.T, n1qlStore base.N1QLStore) bool {
hasPrimary, _, err := base.GetIndexMeta(n1qlStore, base.PrimaryIndexName)
ctx := base.TestCtx(t)
hasPrimary, _, err := base.GetIndexMeta(ctx, n1qlStore, base.PrimaryIndexName)
assert.NoError(t, err)
if hasPrimary {
return false
}
err = n1qlStore.CreatePrimaryIndex(base.PrimaryIndexName, nil)
err = n1qlStore.CreatePrimaryIndex(ctx, base.PrimaryIndexName, nil)
assert.NoError(t, err)
return true
}
2 changes: 1 addition & 1 deletion db/functions/graphql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func TestUserGraphQLWithN1QL(t *testing.T) {

if createdPrimaryIdx {
defer func() {
err := n1qlStore.DropIndex(base.PrimaryIndexName)
err := n1qlStore.DropIndex(base.TestCtx(t), base.PrimaryIndexName)
require.NoError(t, err)
}()

Expand Down
Loading

0 comments on commit d26ad31

Please sign in to comment.