Skip to content

Commit

Permalink
feat(NODE-6060): set fire-and-forget protocol when writeConcern is w:…
Browse files Browse the repository at this point in the history
… 0 (#4219)
  • Loading branch information
aditi-khare-mongoDB committed Sep 17, 2024
1 parent 20396e1 commit 643a875
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 13 deletions.
26 changes: 18 additions & 8 deletions src/cmap/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ export class OpQueryRequest {
awaitData: boolean;
exhaust: boolean;
partial: boolean;
/** moreToCome is an OP_MSG only concept */
moreToCome = false;

constructor(
public databaseName: string,
Expand Down Expand Up @@ -407,13 +409,21 @@ const OPTS_EXHAUST_ALLOWED = 1 << 16;

/** @internal */
export interface OpMsgOptions {
requestId: number;
serializeFunctions: boolean;
ignoreUndefined: boolean;
checkKeys: boolean;
maxBsonSize: number;
moreToCome: boolean;
exhaustAllowed: boolean;
socketTimeoutMS?: number;
session?: ClientSession;
numberToSkip?: number;
numberToReturn?: number;
returnFieldSelector?: Document;
pre32Limit?: number;
serializeFunctions?: boolean;
ignoreUndefined?: boolean;
maxBsonSize?: number;
checkKeys?: boolean;
secondaryOk?: boolean;

requestId?: number;
moreToCome?: boolean;
exhaustAllowed?: boolean;
readPreference: ReadPreference;
}

Expand Down Expand Up @@ -465,7 +475,7 @@ export class OpMsgRequest {

// flags
this.checksumPresent = false;
this.moreToCome = options.moreToCome || false;
this.moreToCome = options.moreToCome ?? command.writeConcern?.w === 0;
this.exhaustAllowed =
typeof options.exhaustAllowed === 'boolean' ? options.exhaustAllowed : false;
}
Expand Down
8 changes: 6 additions & 2 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
zlibCompressionLevel: this.description.zlibCompressionLevel
});

if (options.noResponse) {
if (options.noResponse || message.moreToCome) {
yield MongoDBResponse.empty;
return;
}
Expand Down Expand Up @@ -527,7 +527,11 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
new CommandSucceededEvent(
this,
message,
options.noResponse ? undefined : (object ??= document.toObject(bsonOptions)),
options.noResponse
? undefined
: message.moreToCome
? { ok: 1 }
: (object ??= document.toObject(bsonOptions)),
started,
this.description.serverConnectionId
)
Expand Down
5 changes: 4 additions & 1 deletion src/write_concern.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ interface CommandWriteConcernOptions {
* @see https://www.mongodb.com/docs/manual/reference/write-concern/
*/
export class WriteConcern {
/** Request acknowledgment that the write operation has propagated to a specified number of mongod instances or to mongod instances with specified tags. */
/**
* Request acknowledgment that the write operation has propagated to a specified number of mongod instances or to mongod instances with specified tags.
* If w is 0 and is set on a write operation, the server will not send a response.
*/
readonly w?: W;
/** Request acknowledgment that the write operation has been written to the on-disk journal */
readonly journal?: boolean;
Expand Down
140 changes: 138 additions & 2 deletions test/integration/read-write-concern/write_concern.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import { expect } from 'chai';
import { on, once } from 'events';
import { gte } from 'semver';
import * as sinon from 'sinon';

import {
type Collection,
type CommandStartedEvent,
type CommandSucceededEvent,
type Db,
LEGACY_HELLO_COMMAND,
MongoClient
MongoClient,
OpMsgRequest
} from '../../mongodb';
import * as mock from '../../tools/mongodb-mock/index';
import { filterForCommands } from '../shared';
Expand Down Expand Up @@ -93,7 +97,7 @@ describe('Write Concern', function () {
});

afterEach(async function () {
await db.dropDatabase();
await db.dropDatabase({ writeConcern: { w: 'majority' } });
await client.close();
});

Expand Down Expand Up @@ -168,4 +172,136 @@ describe('Write Concern', function () {
});
});
});

describe('fire-and-forget protocol', function () {
context('when writeConcern = 0 and OP_MSG is used', function () {
const writeOperations: { name: string; command: any; expectedReturnVal: any }[] = [
{
name: 'insertOne',
command: client => client.db('test').collection('test').insertOne({ a: 1 }),
expectedReturnVal: { acknowledged: false }
},
{
name: 'insertMany',
command: client =>
client
.db('test')
.collection('test')
.insertMany([{ a: 1 }, { b: 2 }]),
expectedReturnVal: { acknowledged: false }
},
{
name: 'updateOne',
command: client =>
client
.db('test')
.collection('test')
.updateOne({ i: 128 }, { $set: { c: 2 } }),
expectedReturnVal: { acknowledged: false }
},
{
name: 'updateMany',
command: client =>
client
.db('test')
.collection('test')
.updateMany({ name: 'foobar' }, { $set: { name: 'fizzbuzz' } }),
expectedReturnVal: { acknowledged: false }
},
{
name: 'deleteOne',
command: client => client.db('test').collection('test').deleteOne({ a: 1 }),
expectedReturnVal: { acknowledged: false }
},
{
name: 'deleteMany',
command: client => client.db('test').collection('test').deleteMany({ name: 'foobar' }),
expectedReturnVal: { acknowledged: false }
},
{
name: 'replaceOne',
command: client => client.db('test').collection('test').replaceOne({ a: 1 }, { b: 2 }),
expectedReturnVal: { acknowledged: false }
},
{
name: 'removeUser',
command: client => client.db('test').removeUser('albert'),
expectedReturnVal: true
},
{
name: 'findAndModify',
command: client =>
client
.db('test')
.collection('test')
.findOneAndUpdate({}, { $setOnInsert: { a: 1 } }, { upsert: true }),
expectedReturnVal: null
},
{
name: 'dropDatabase',
command: client => client.db('test').dropDatabase(),
expectedReturnVal: true
},
{
name: 'dropCollection',
command: client => client.db('test').dropCollection('test'),
expectedReturnVal: true
},
{
name: 'dropIndexes',
command: client => client.db('test').collection('test').dropIndex('a'),
expectedReturnVal: { ok: 1 }
},
{
name: 'createIndexes',
command: client => client.db('test').collection('test').createIndex({ a: 1 }),
expectedReturnVal: 'a_1'
},
{
name: 'createCollection',
command: client => client.db('test').createCollection('test'),
expectedReturnVal: {}
}
];

for (const op of writeOperations) {
context(`when the write operation ${op.name} is run`, function () {
let client;
let spy;

beforeEach(async function () {
if (gte('3.6.0', this.configuration.version)) {
this.currentTest.skipReason = 'Test requires OP_MSG, needs to be on MongoDB 3.6+';
this.skip();
}
spy = sinon.spy(OpMsgRequest.prototype, 'toBin');
client = this.configuration.newClient({ monitorCommands: true, w: 0 });
await client.connect();
});

afterEach(function () {
sinon.restore();
client.close();
});

it('the request should have moreToCome bit set', async function () {
await op.command(client);
expect(spy.returnValues[spy.returnValues.length - 1][0][16]).to.equal(2);
});

it('the return value of the command should be nullish', async function () {
const result = await op.command(client);
expect(result).to.containSubset(op.expectedReturnVal);
});

it('commandSucceededEvent should have reply with only {ok: 1}', async function () {
const events: CommandSucceededEvent[] = [];
client.on('commandSucceeded', event => events.push(event));
await op.command(client);
expect(events[0]).to.containSubset({ reply: { ok: 1 } });
});
});
}
});
});
});
11 changes: 11 additions & 0 deletions test/unit/commands.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,14 @@ describe('class OpCompressedRequest', () => {
}
});
});

describe('OpMsgRequest', () => {
describe('#constructor', () => {
context('when writeConcern = 0', () => {
it('moreToCome is set to true', async () => {
const request = new OpMsgRequest('db', { a: 1, writeConcern: { w: 0 } }, {});
expect(request.moreToCome).to.be.true;
});
});
});
});

0 comments on commit 643a875

Please sign in to comment.