Skip to content

Commit

Permalink
feat(NODE-3639): add a general stage to the aggregation pipeline buil…
Browse files Browse the repository at this point in the history
…der (#4079)
  • Loading branch information
prenaissance authored Apr 18, 2024
1 parent 2645513 commit 8fca1aa
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 35 deletions.
64 changes: 31 additions & 33 deletions src/cursor/aggregation_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,33 +86,45 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
);
}

/** Add a stage to the aggregation pipeline
* @example
* ```
* const documents = await users.aggregate().addStage({ $match: { name: /Mike/ } }).toArray();
* ```
* @example
* ```
* const documents = await users.aggregate()
* .addStage<{ name: string }>({ $project: { name: true } })
* .toArray(); // type of documents is { name: string }[]
* ```
*/
addStage(stage: Document): this;
addStage<T = Document>(stage: Document): AggregationCursor<T>;
addStage<T = Document>(stage: Document): AggregationCursor<T> {
assertUninitialized(this);
this[kPipeline].push(stage);
return this as unknown as AggregationCursor<T>;
}

/** Add a group stage to the aggregation pipeline */
group<T = TSchema>($group: Document): AggregationCursor<T>;
group($group: Document): this {
assertUninitialized(this);
this[kPipeline].push({ $group });
return this;
return this.addStage({ $group });
}

/** Add a limit stage to the aggregation pipeline */
limit($limit: number): this {
assertUninitialized(this);
this[kPipeline].push({ $limit });
return this;
return this.addStage({ $limit });
}

/** Add a match stage to the aggregation pipeline */
match($match: Document): this {
assertUninitialized(this);
this[kPipeline].push({ $match });
return this;
return this.addStage({ $match });
}

/** Add an out stage to the aggregation pipeline */
out($out: { db: string; coll: string } | string): this {
assertUninitialized(this);
this[kPipeline].push({ $out });
return this;
return this.addStage({ $out });
}

/**
Expand Down Expand Up @@ -157,50 +169,36 @@ export class AggregationCursor<TSchema = any> extends AbstractCursor<TSchema> {
* ```
*/
project<T extends Document = Document>($project: Document): AggregationCursor<T> {
assertUninitialized(this);
this[kPipeline].push({ $project });
return this as unknown as AggregationCursor<T>;
return this.addStage<T>({ $project });
}

/** Add a lookup stage to the aggregation pipeline */
lookup($lookup: Document): this {
assertUninitialized(this);
this[kPipeline].push({ $lookup });
return this;
return this.addStage({ $lookup });
}

/** Add a redact stage to the aggregation pipeline */
redact($redact: Document): this {
assertUninitialized(this);
this[kPipeline].push({ $redact });
return this;
return this.addStage({ $redact });
}

/** Add a skip stage to the aggregation pipeline */
skip($skip: number): this {
assertUninitialized(this);
this[kPipeline].push({ $skip });
return this;
return this.addStage({ $skip });
}

/** Add a sort stage to the aggregation pipeline */
sort($sort: Sort): this {
assertUninitialized(this);
this[kPipeline].push({ $sort });
return this;
return this.addStage({ $sort });
}

/** Add a unwind stage to the aggregation pipeline */
unwind($unwind: Document | string): this {
assertUninitialized(this);
this[kPipeline].push({ $unwind });
return this;
return this.addStage({ $unwind });
}

/** Add a geoNear stage to the aggregation pipeline */
geoNear($geoNear: Document): this {
assertUninitialized(this);
this[kPipeline].push({ $geoNear });
return this;
return this.addStage({ $geoNear });
}
}
68 changes: 66 additions & 2 deletions test/integration/crud/aggregation.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { expect } from 'chai';

import { MongoInvalidArgumentError } from '../../mongodb';
import { type MongoClient, MongoInvalidArgumentError } from '../../mongodb';
import { filterForCommands } from '../shared';

describe('Aggregation', function () {
let client;
let client: MongoClient;

beforeEach(async function () {
client = this.configuration.newClient();
Expand Down Expand Up @@ -939,4 +939,68 @@ describe('Aggregation', function () {
.finally(() => client.close());
}
});

it('should return identical results for array aggregations and builder aggregations', async function () {
const databaseName = this.configuration.db;
const db = client.db(databaseName);
const collection = db.collection(
'shouldReturnIdenticalResultsForArrayAggregationsAndBuilderAggregations'
);

const docs = [
{
title: 'this is my title',
author: 'bob',
posted: new Date(),
pageViews: 5,
tags: ['fun', 'good', 'fun'],
other: { foo: 5 },
comments: [
{ author: 'joe', text: 'this is cool' },
{ author: 'sam', text: 'this is bad' }
]
}
];

await collection.insertMany(docs, { writeConcern: { w: 1 } });
const arrayPipelineCursor = collection.aggregate([
{
$project: {
author: 1,
tags: 1
}
},
{ $unwind: '$tags' },
{
$group: {
_id: { tags: '$tags' },
authors: { $addToSet: '$author' }
}
},
{ $sort: { _id: -1 } }
]);

const builderPipelineCursor = collection
.aggregate()
.project({ author: 1, tags: 1 })
.unwind('$tags')
.group({ _id: { tags: '$tags' }, authors: { $addToSet: '$author' } })
.sort({ _id: -1 });

const builderGenericStageCursor = collection
.aggregate()
.addStage({ $project: { author: 1, tags: 1 } })
.addStage({ $unwind: '$tags' })
.addStage({ $group: { _id: { tags: '$tags' }, authors: { $addToSet: '$author' } } })
.addStage({ $sort: { _id: -1 } });

const expectedResults = [
{ _id: { tags: 'good' }, authors: ['bob'] },
{ _id: { tags: 'fun' }, authors: ['bob'] }
];

expect(await arrayPipelineCursor.toArray()).to.deep.equal(expectedResults);
expect(await builderPipelineCursor.toArray()).to.deep.equal(expectedResults);
expect(await builderGenericStageCursor.toArray()).to.deep.equal(expectedResults);
});
});

0 comments on commit 8fca1aa

Please sign in to comment.