Skip to content
This repository has been archived by the owner on Mar 27, 2024. It is now read-only.

feat: Added GetBulkAsRawMap function to the MongoDB API #259

Merged
merged 1 commit into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 91 additions & 13 deletions component/storage/mongodb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,39 @@ func (s *Store) GetBulk(keys ...string) ([][]byte, error) {
return allValues, nil
}

// GetBulkAsRawMap fetches the values associated with the given keys and returns the documents (as maps).
// If no data exists under a given key, then nil is returned for that value. It is not considered an error.
// Depending on the implementation, this method may be faster than calling Get for each key individually.
// If any of the given keys are empty, then an error will be returned.
func (s *Store) GetBulkAsRawMap(keys ...string) ([]map[string]interface{}, error) {
if len(keys) == 0 {
return nil, errors.New("keys slice must contain at least one key")
}

for _, key := range keys {
if key == "" {
return nil, errors.New("key cannot be empty")
}
}

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), s.timeout)
defer cancel()

cursor, err := s.coll.Find(ctxWithTimeout, bson.M{"_id": bson.D{
{Key: "$in", Value: keys},
}})
if err != nil {
return nil, fmt.Errorf("failed to run Find command in MongoDB: %w", err)
}

allValues, err := s.collectBulkGetResultsAsRawMap(keys, cursor)
if err != nil {
return nil, err
}

return allValues, nil
}

// Query does a query for data as defined by the documentation in storage.Store (the interface).
// This implementation also supports querying for data tagged with multiple tag name + value pairs (using AND logic).
// To do this, separate the tag name + value pairs using &&. You can still omit one or both of the tag values
Expand All @@ -691,12 +724,12 @@ func (s *Store) Query(expression string, options ...storage.QueryOption) (storag
return &Iterator{}, errInvalidQueryExpressionFormat
}

filter, err := prepareFilter(strings.Split(expression, "&&"), false)
filter, err := PrepareFilter(strings.Split(expression, "&&"), false)
if err != nil {
return nil, err
}

findOptions := s.createMongoDBFindOptions(options)
findOptions := s.CreateMongoDBFindOptions(options)

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), s.timeout)
defer cancel()
Expand Down Expand Up @@ -933,6 +966,30 @@ func (s *Store) collectBulkGetResults(keys []string, cursor *mongo.Cursor) ([][]
return allValues, nil
}

func (s *Store) collectBulkGetResultsAsRawMap(keys []string, cursor *mongo.Cursor) ([]map[string]interface{}, error) {
allValues := make([]map[string]interface{}, len(keys))

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), s.timeout)
defer cancel()

for cursor.Next(ctxWithTimeout) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something to note here: since this context is getting reused, all the cursor.Next calls need to complete before the timeout. Might be an issue if there's a large result set.

I see that I also did the same thing in the collectBulkGetResults method. Perhaps it's not urgent right now, but might be worth reworking in the future to use a new context for each call.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something to note here: since this context is getting reused, all the cursor.Next calls need to complete before the timeout. Might be an issue if there's a large result set.

I see that I also did the same thing in the collectBulkGetResults method. Perhaps it's not urgent right now, but might be worth reworking in the future to use a new context for each call.

It depends on how timeout is defined. If timeout is defined as the timeout for the entire function, i.e. GetBulk, then this is correct. Otherwise, you'd have a variable timeout for GetBulk depending on how many items in the result set.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably need to define the timeout more explicitly. Right now here's how I defined it above the WithTimeout function:

// WithTimeout is an option for specifying the timeout for all calls to MongoDB.
// The timeout is 10 seconds by default.

When I created that option, I was thinking the timeout was per remote call to MongoDB (which a cursor.Next would be, depending on the current page). So far in the other areas of the code, I create a new timeout for every possible MongoDB call (with the exception of the bulk results collection as mentioned above). But I now see a problem with that approach. Some calls to MongoDB can take longer than others (e.g. some Find calls that execute queries). I think ideally the caller should be able to pass in their own timeout to each call, which is tricky with the current interface. I think I should create a followup issue for me to address this in the future. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, ideally the caller should be providing the context.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #262

key, value, err := getKeyAndRawMapFromMongoDBResult(cursor)
if err != nil {
return nil, fmt.Errorf("failed to get value from MongoDB result: %w", err)
}

for i := 0; i < len(keys); i++ {
if key == keys[i] {
allValues[i] = value

break
}
}
}

return allValues, nil
}

func (s *Store) executeBulkWriteCommand(models []mongo.WriteModel, atLeastOneInsertOneModel bool) error {
var attemptsMade int

Expand Down Expand Up @@ -993,7 +1050,8 @@ func (s *Store) executeBulkWriteCommand(models []mongo.WriteModel, atLeastOneIns
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(s.timeBetweenRetries), s.maxRetries))
}

func (s *Store) createMongoDBFindOptions(options []storage.QueryOption) *mongooptions.FindOptions {
// CreateMongoDBFindOptions converts the given storage options into MongoDB options.
func (s *Store) CreateMongoDBFindOptions(options []storage.QueryOption) *mongooptions.FindOptions {
queryOptions := getQueryOptions(options)

findOptions := mongooptions.Find()
Expand Down Expand Up @@ -1261,6 +1319,25 @@ func getKeyAndValueFromMongoDBResult(decoder decoder) (key string, value []byte,
return data.Key, valueBytes, nil
}

func getKeyAndRawMapFromMongoDBResult(decoder decoder) (key string, doc map[string]interface{}, err error) {
doc, errGetDataWrapper := getValueAsRawMapFromMongoDBResult(decoder)
if errGetDataWrapper != nil {
return "", nil, fmt.Errorf("failed to get data wrapper from MongoDB result: %w", errGetDataWrapper)
}

id, ok := doc["_id"]
if !ok {
return "", nil, fmt.Errorf("no _id field in document")
}

key, ok = id.(string)
if !ok {
return "", nil, fmt.Errorf("_id field in document is not a string")
}

return key, doc, nil
}

func getTagsFromMongoDBResult(decoder decoder) ([]storage.Tag, error) {
data, err := getDataWrapperFromMongoDBResult(decoder)
if err != nil {
Expand Down Expand Up @@ -1307,7 +1384,8 @@ func getQueryOptions(options []storage.QueryOption) storage.QueryOptions {
return queryOptions
}

func prepareFilter(expressions []string, isJSONQuery bool) (bson.D, error) {
// PrepareFilter converts the expression into a MongoDB filter.
func PrepareFilter(expressions []string, isJSONQuery bool) (bson.D, error) {
operands := make(bson.D, len(expressions))

for i, exp := range expressions {
Expand All @@ -1332,6 +1410,14 @@ func prepareSingleOperand(expression string, isJSONQuery bool) (bson.E, error) {
return bson.E{}, err
}

var key string

if isJSONQuery {
key = splitExpression[0]
} else {
key = fmt.Sprintf("tags.%s", splitExpression[0])
}

if operator == "$lt" || operator == "$lte" || operator == "$gt" || operator == "$gte" {
value, err := strconv.Atoi(splitExpression[1])
if err != nil {
Expand All @@ -1343,14 +1429,6 @@ func prepareSingleOperand(expression string, isJSONQuery bool) (bson.E, error) {
{Key: operator, Value: value},
}

var key string

if isJSONQuery {
key = splitExpression[0]
} else {
key = fmt.Sprintf("tags.%s", splitExpression[0])
}

operand := bson.E{
Key: key,
Value: filterValue,
Expand All @@ -1368,7 +1446,7 @@ func prepareSingleOperand(expression string, isJSONQuery bool) (bson.E, error) {
}

operand := bson.E{
Key: fmt.Sprintf("tags.%s", splitExpression[0]),
Key: key,
Value: filterValue,
}

Expand Down
47 changes: 47 additions & 0 deletions component/storage/mongodb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func doAllTests(t *testing.T, connString string) {
testBatchIsNewKeyError(t, connString)
testPing(t, connString)
testGetAsRawMap(t, connString)
testGetBulkAsRawMap(t, connString)
testCustomIndexAndQuery(t, connString)
testDocumentReplacementAndMarshalling(t, connString)
}
Expand Down Expand Up @@ -1397,6 +1398,52 @@ func testGetAsRawMap(t *testing.T, connString string) {
"unexpected retrieved test data")
}

func testGetBulkAsRawMap(t *testing.T, connString string) {
t.Helper()

provider, err := mongodb.NewProvider(connString)
require.NoError(t, err)

storeName := randomStoreName()

store, err := provider.OpenStore(storeName)
require.NoError(t, err)

var ok bool

mongoDBStore, ok := store.(*mongodb.Store)
require.True(t, ok)

_, err = mongoDBStore.GetBulkAsRawMap("TestKey1", "")
require.EqualError(t, err, "key cannot be empty")

testData1 := map[string]interface{}{
"field1": "value1",
"field2": int64(2),
"field3": true,
}

testData2 := map[string]interface{}{
"field1": "value1",
"field2": int64(2),
"field3": true,
}

require.NoError(t, mongoDBStore.PutAsJSON("TestKey1", testData1))
require.NoError(t, mongoDBStore.PutAsJSON("TestKey2", testData2))

retrievedTestData, err := mongoDBStore.GetBulkAsRawMap("TestKey1", "TestKey2")
require.NoError(t, err)
require.Len(t, retrievedTestData, 2)

// The retrieved test data should be the same as the input test data, except that there's an _id field now.
testData1["_id"] = "TestKey1"
testData2["_id"] = "TestKey2"

require.True(t, reflect.DeepEqual(testData1, retrievedTestData[0]), "unexpected retrieved test data")
require.True(t, reflect.DeepEqual(testData2, retrievedTestData[1]), "unexpected retrieved test data")
}

func testCustomIndexAndQuery(t *testing.T, connString string) {
t.Helper()
t.Run("Using individual PutAsJSON calls", func(t *testing.T) {
Expand Down