diff --git a/component/storage/mongodb/store.go b/component/storage/mongodb/store.go index 3c0d7d3e..19c562ff 100644 --- a/component/storage/mongodb/store.go +++ b/component/storage/mongodb/store.go @@ -674,6 +674,39 @@ func (s *Store) GetBulk(keys ...string) ([][]byte, error) { return allValues, nil } +// GetBulkAsRawMap fetches the values associated with the given keys and returns the documents (as maps). +// If no data exists under a given key, then nil is returned for that value. It is not considered an error. +// Depending on the implementation, this method may be faster than calling Get for each key individually. +// If any of the given keys are empty, then an error will be returned. +func (s *Store) GetBulkAsRawMap(keys ...string) ([]map[string]interface{}, error) { + if len(keys) == 0 { + return nil, errors.New("keys slice must contain at least one key") + } + + for _, key := range keys { + if key == "" { + return nil, errors.New("key cannot be empty") + } + } + + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), s.timeout) + defer cancel() + + cursor, err := s.coll.Find(ctxWithTimeout, bson.M{"_id": bson.D{ + {Key: "$in", Value: keys}, + }}) + if err != nil { + return nil, fmt.Errorf("failed to run Find command in MongoDB: %w", err) + } + + allValues, err := s.collectBulkGetResultsAsRawMap(keys, cursor) + if err != nil { + return nil, err + } + + return allValues, nil +} + // Query does a query for data as defined by the documentation in storage.Store (the interface). // This implementation also supports querying for data tagged with multiple tag name + value pairs (using AND logic). // To do this, separate the tag name + value pairs using &&. You can still omit one or both of the tag values @@ -691,12 +724,12 @@ func (s *Store) Query(expression string, options ...storage.QueryOption) (storag return &Iterator{}, errInvalidQueryExpressionFormat } - filter, err := prepareFilter(strings.Split(expression, "&&"), false) + filter, err := PrepareFilter(strings.Split(expression, "&&"), false) if err != nil { return nil, err } - findOptions := s.createMongoDBFindOptions(options) + findOptions := s.CreateMongoDBFindOptions(options) ctxWithTimeout, cancel := context.WithTimeout(context.Background(), s.timeout) defer cancel() @@ -933,6 +966,30 @@ func (s *Store) collectBulkGetResults(keys []string, cursor *mongo.Cursor) ([][] return allValues, nil } +func (s *Store) collectBulkGetResultsAsRawMap(keys []string, cursor *mongo.Cursor) ([]map[string]interface{}, error) { + allValues := make([]map[string]interface{}, len(keys)) + + ctxWithTimeout, cancel := context.WithTimeout(context.Background(), s.timeout) + defer cancel() + + for cursor.Next(ctxWithTimeout) { + key, value, err := getKeyAndRawMapFromMongoDBResult(cursor) + if err != nil { + return nil, fmt.Errorf("failed to get value from MongoDB result: %w", err) + } + + for i := 0; i < len(keys); i++ { + if key == keys[i] { + allValues[i] = value + + break + } + } + } + + return allValues, nil +} + func (s *Store) executeBulkWriteCommand(models []mongo.WriteModel, atLeastOneInsertOneModel bool) error { var attemptsMade int @@ -993,7 +1050,8 @@ func (s *Store) executeBulkWriteCommand(models []mongo.WriteModel, atLeastOneIns }, backoff.WithMaxRetries(backoff.NewConstantBackOff(s.timeBetweenRetries), s.maxRetries)) } -func (s *Store) createMongoDBFindOptions(options []storage.QueryOption) *mongooptions.FindOptions { +// CreateMongoDBFindOptions converts the given storage options into MongoDB options. +func (s *Store) CreateMongoDBFindOptions(options []storage.QueryOption) *mongooptions.FindOptions { queryOptions := getQueryOptions(options) findOptions := mongooptions.Find() @@ -1261,6 +1319,25 @@ func getKeyAndValueFromMongoDBResult(decoder decoder) (key string, value []byte, return data.Key, valueBytes, nil } +func getKeyAndRawMapFromMongoDBResult(decoder decoder) (key string, doc map[string]interface{}, err error) { + doc, errGetDataWrapper := getValueAsRawMapFromMongoDBResult(decoder) + if errGetDataWrapper != nil { + return "", nil, fmt.Errorf("failed to get data wrapper from MongoDB result: %w", errGetDataWrapper) + } + + id, ok := doc["_id"] + if !ok { + return "", nil, fmt.Errorf("no _id field in document") + } + + key, ok = id.(string) + if !ok { + return "", nil, fmt.Errorf("_id field in document is not a string") + } + + return key, doc, nil +} + func getTagsFromMongoDBResult(decoder decoder) ([]storage.Tag, error) { data, err := getDataWrapperFromMongoDBResult(decoder) if err != nil { @@ -1307,7 +1384,8 @@ func getQueryOptions(options []storage.QueryOption) storage.QueryOptions { return queryOptions } -func prepareFilter(expressions []string, isJSONQuery bool) (bson.D, error) { +// PrepareFilter converts the expression into a MongoDB filter. +func PrepareFilter(expressions []string, isJSONQuery bool) (bson.D, error) { operands := make(bson.D, len(expressions)) for i, exp := range expressions { @@ -1332,6 +1410,14 @@ func prepareSingleOperand(expression string, isJSONQuery bool) (bson.E, error) { return bson.E{}, err } + var key string + + if isJSONQuery { + key = splitExpression[0] + } else { + key = fmt.Sprintf("tags.%s", splitExpression[0]) + } + if operator == "$lt" || operator == "$lte" || operator == "$gt" || operator == "$gte" { value, err := strconv.Atoi(splitExpression[1]) if err != nil { @@ -1343,14 +1429,6 @@ func prepareSingleOperand(expression string, isJSONQuery bool) (bson.E, error) { {Key: operator, Value: value}, } - var key string - - if isJSONQuery { - key = splitExpression[0] - } else { - key = fmt.Sprintf("tags.%s", splitExpression[0]) - } - operand := bson.E{ Key: key, Value: filterValue, @@ -1368,7 +1446,7 @@ func prepareSingleOperand(expression string, isJSONQuery bool) (bson.E, error) { } operand := bson.E{ - Key: fmt.Sprintf("tags.%s", splitExpression[0]), + Key: key, Value: filterValue, } diff --git a/component/storage/mongodb/store_test.go b/component/storage/mongodb/store_test.go index c2ed6a18..97a866ef 100644 --- a/component/storage/mongodb/store_test.go +++ b/component/storage/mongodb/store_test.go @@ -294,6 +294,7 @@ func doAllTests(t *testing.T, connString string) { testBatchIsNewKeyError(t, connString) testPing(t, connString) testGetAsRawMap(t, connString) + testGetBulkAsRawMap(t, connString) testCustomIndexAndQuery(t, connString) testDocumentReplacementAndMarshalling(t, connString) } @@ -1397,6 +1398,52 @@ func testGetAsRawMap(t *testing.T, connString string) { "unexpected retrieved test data") } +func testGetBulkAsRawMap(t *testing.T, connString string) { + t.Helper() + + provider, err := mongodb.NewProvider(connString) + require.NoError(t, err) + + storeName := randomStoreName() + + store, err := provider.OpenStore(storeName) + require.NoError(t, err) + + var ok bool + + mongoDBStore, ok := store.(*mongodb.Store) + require.True(t, ok) + + _, err = mongoDBStore.GetBulkAsRawMap("TestKey1", "") + require.EqualError(t, err, "key cannot be empty") + + testData1 := map[string]interface{}{ + "field1": "value1", + "field2": int64(2), + "field3": true, + } + + testData2 := map[string]interface{}{ + "field1": "value1", + "field2": int64(2), + "field3": true, + } + + require.NoError(t, mongoDBStore.PutAsJSON("TestKey1", testData1)) + require.NoError(t, mongoDBStore.PutAsJSON("TestKey2", testData2)) + + retrievedTestData, err := mongoDBStore.GetBulkAsRawMap("TestKey1", "TestKey2") + require.NoError(t, err) + require.Len(t, retrievedTestData, 2) + + // The retrieved test data should be the same as the input test data, except that there's an _id field now. + testData1["_id"] = "TestKey1" + testData2["_id"] = "TestKey2" + + require.True(t, reflect.DeepEqual(testData1, retrievedTestData[0]), "unexpected retrieved test data") + require.True(t, reflect.DeepEqual(testData2, retrievedTestData[1]), "unexpected retrieved test data") +} + func testCustomIndexAndQuery(t *testing.T, connString string) { t.Helper() t.Run("Using individual PutAsJSON calls", func(t *testing.T) {