Skip to content

Commit

Permalink
[v2.2] Port #22508 and prereqs (ContainerRuntime: Process incoming ba…
Browse files Browse the repository at this point in the history
…tches op-by-op instead of waiting for the whole batch) (#22654)

## Description

Porting the following commits from `main` to `release/client/2.2`:

* ce0a14c (Offline: Model empty batches
differently, exposing the empty Grouped Batch message to more of the
system)
* e024b86 (ContainerRuntime: Refactor
batch processing code to support either op-by-op or batch-all-at-once
semantics)
* 709f085 (ContainerRuntime: Process
incoming batches op-by-op instead of waiting for the whole batch)

## Reviewer Guidance

There was a ton of churn between 2.2 and 2.3 in this code (subset of
[these changes to container-runtime
dir](https://github.com/microsoft/FluidFramework/commits/release/client/2.3/packages/runtime/container-runtime?since=2024-08-17)),
so the cherry-pick turned into more like a manual re-implementing of the
change. In some ways it's a smaller diff compared to what needed to
happen in `main`.

So please review carefully as you would an original PR.
  • Loading branch information
markfields authored Oct 1, 2024
1 parent 1d1d489 commit 567feff
Show file tree
Hide file tree
Showing 8 changed files with 469 additions and 234 deletions.
16 changes: 16 additions & 0 deletions .changeset/metal-hornets-travel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
"@fluidframework/container-runtime": minor
---
---
"section": "fix"
---
Restored old op processing behavior around batched ops to avoid potential regression

There's a theoretical risk of indeterminate behavior due to a recent change to how batches of ops are processed.
This fix reverses that change.

Pull Request #21785 updated the ContainerRuntime to hold onto the messages in an incoming batch until they've all arrived, and only then process the set of messages.

While the batch is being processed, the DeltaManager and ContainerRuntime's view of the latest sequence numbers will be
out of sync. This may have unintended side effects, so out of an abundance of caution we're reversing this behavior until
we can add the proper protections to ensure the system stays properly in sync.
48 changes: 34 additions & 14 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ import { IBatchMetadata, ISavedOpMetadata } from "./metadata.js";
import {
BatchId,
BatchMessage,
BatchStartInfo,
ensureContentsDeserialized,
IBatch,
IBatchCheckpoint,
Expand All @@ -180,7 +181,6 @@ import {
OpSplitter,
Outbox,
RemoteMessageProcessor,
type InboundBatch,
} from "./opLifecycle/index.js";
import { pkgVersion } from "./packageVersion.js";
import {
Expand Down Expand Up @@ -2666,18 +2666,24 @@ export class ContainerRuntime
if (hasModernRuntimeMessageEnvelope) {
// If the message has the modern message envelope, then process it here.
// Here we unpack the message (decompress, unchunk, and/or ungroup) into a batch of messages with ContainerMessageType
const inboundBatch = this.remoteMessageProcessor.process(messageCopy, logLegacyCase);
if (inboundBatch === undefined) {
const inboundResult = this.remoteMessageProcessor.process(messageCopy, logLegacyCase);
if (inboundResult === undefined) {
// This means the incoming message is an incomplete part of a message or batch
// and we need to process more messages before the rest of the system can understand it.
return;
}

// Reach out to PendingStateManager to zip localOpMetadata into the message list if it's a local batch
const messagesWithPendingState = this.pendingStateManager.processInboundBatch(
inboundBatch,
const messagesWithPendingState = this.pendingStateManager.processInboundMessages(
inboundResult,
local,
);
if (inboundResult.type !== "fullBatch") {
assert(
messagesWithPendingState.length === 1,
"Partial batch should have exactly one message",
);
}
if (messagesWithPendingState.length > 0) {
messagesWithPendingState.forEach(({ message, localOpMetadata }) => {
const msg: MessageWithContext = {
Expand All @@ -2690,7 +2696,13 @@ export class ContainerRuntime
this.ensureNoDataModelChanges(() => this.processRuntimeMessage(msg));
});
} else {
this.ensureNoDataModelChanges(() => this.processEmptyBatch(inboundBatch, local));
assert(
inboundResult.type === "fullBatch",
"Empty batch is always considered a full batch",
);
this.ensureNoDataModelChanges(() =>
this.processEmptyBatch(inboundResult.batchStart, local),
);
}
} else {
// Check if message.type is one of values in ContainerMessageType
Expand Down Expand Up @@ -2781,19 +2793,27 @@ export class ContainerRuntime
}

/**
* Process an empty batch, which will execute expected actions while processing even if there are no messages.
* This is a separate function because the processCore function expects at least one message to process.
* It is expected to happen only when the outbox produces an empty batch due to a resubmit flow.
* Process an empty batch, which will execute expected actions while processing even if there are no inner runtime messages.
*
* @remarks - Empty batches are produced by the outbox on resubmit when the resubmit flow resulted in no runtime messages.
* This can happen if changes from a remote client "cancel out" the pending changes being resubmited by this client.
* We submit an empty batch if "offline load" (aka rehydrating from stashed state) is enabled,
* to ensure we account for this batch when comparing batchIds, checking for a forked container.
* Otherwise, we would not realize this container has forked in the case where it did fork, and a batch became empty but wasn't submitted as such.
*/
private processEmptyBatch(emptyBatch: InboundBatch, local: boolean) {
const { emptyBatchSequenceNumber: sequenceNumber, batchStartCsn } = emptyBatch;
assert(sequenceNumber !== undefined, 0x9fa /* emptyBatchSequenceNumber must be defined */);
this.emit("batchBegin", { sequenceNumber });
private processEmptyBatch(emptyBatch: BatchStartInfo, local: boolean) {
const { keyMessage, batchStartCsn } = emptyBatch;
this.scheduleManager.beforeOpProcessing(keyMessage);

this._processedClientSequenceNumber = batchStartCsn;
if (!this.hasPendingMessages()) {
this.updateDocumentDirtyState(false);
}
this.emit("batchEnd", undefined, { sequenceNumber });

// We emit this event but say isRuntimeMessage is false, because there are no actual runtime messages here being processed.
// But someone listening to this event expecting to be notified whenever a message arrives would want to know about this.
this.emit("op", keyMessage, false /* isRuntimeMessage */);
this.scheduleManager.afterOpProcessing(undefined /* error */, keyMessage);
if (local) {
this.resetReconnectCount();
}
Expand Down
3 changes: 2 additions & 1 deletion packages/runtime/container-runtime/src/opLifecycle/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ export { OpDecompressor } from "./opDecompressor.js";
export { OpSplitter, splitOp, isChunkedMessage } from "./opSplitter.js";
export {
ensureContentsDeserialized,
InboundBatch,
InboundMessageResult,
BatchStartInfo,
RemoteMessageProcessor,
unpackRuntimeMessage,
} from "./remoteMessageProcessor.js";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,56 @@ import { OpDecompressor } from "./opDecompressor.js";
import { OpGroupingManager, isGroupedBatch } from "./opGroupingManager.js";
import { OpSplitter, isChunkedMessage } from "./opSplitter.js";

/** Messages being received as a batch, with details needed to process the batch */
export interface InboundBatch {
/** Messages in this batch */
readonly messages: InboundSequencedContainerRuntimeMessage[];
/** Info about the batch we learn when we process the first message */
export interface BatchStartInfo {
/** Batch ID, if present */
readonly batchId: string | undefined;
/** clientId that sent this batch. Used to compute Batch ID if needed */
readonly clientId: string;
/**
* Client Sequence Number of the first message in the batch.
* Client Sequence Number of the Grouped Batch message, or the first message in the ungrouped batch.
* Used to compute Batch ID if needed
*
* @remarks For chunked batches, this is the CSN of the "representative" chunk (the final chunk).
* For grouped batches, clientSequenceNumber on messages is overwritten, so we track this original value here.
*/
readonly batchStartCsn: number;
/** For an empty batch (with no messages), we need to remember the empty grouped batch's sequence number */
readonly emptyBatchSequenceNumber?: number;
/**
* The first message in the batch, or if the batch is empty, the empty grouped batch message
* Used for accessing the sequence numbers for the (start of the) batch.
*
* @remarks Do not use clientSequenceNumber here, use batchStartCsn instead.
*/
readonly keyMessage: ISequencedDocumentMessage;
}

/**
* Result of processing the next inbound message.
* Depending on the message and configuration of RemoteMessageProcessor, the result may be:
* - A full batch of messages (including a single-message batch)
* - The first message of a multi-message batch
* - The next message in a multi-message batch
*/
export type InboundMessageResult =
| {
type: "fullBatch";
messages: InboundSequencedContainerRuntimeMessage[];
batchStart: BatchStartInfo;
length: number;
}
| {
type: "batchStartingMessage";
batchStart: BatchStartInfo;
nextMessage: InboundSequencedContainerRuntimeMessage;
length?: never;
}
| {
type: "nextBatchMessage";
batchEnd?: boolean;
nextMessage: InboundSequencedContainerRuntimeMessage;
length?: never;
};

function assertHasClientId(
message: ISequencedDocumentMessage,
): asserts message is ISequencedDocumentMessage & { clientId: string } {
Expand All @@ -57,12 +87,7 @@ function assertHasClientId(
* @internal
*/
export class RemoteMessageProcessor {
/**
* The current batch being received, with details needed to process it.
*
* @remarks If undefined, we are expecting the next message to start a new batch.
*/
private batchInProgress: InboundBatch | undefined;
private batchInProgress: boolean = false;

constructor(
private readonly opSplitter: OpSplitter,
Expand Down Expand Up @@ -100,7 +125,7 @@ export class RemoteMessageProcessor {
public process(
remoteMessageCopy: ISequencedDocumentMessage,
logLegacyCase: (codePath: string) => void,
): InboundBatch | undefined {
): InboundMessageResult | undefined {
let message = remoteMessageCopy;

assertHasClientId(message);
Expand Down Expand Up @@ -129,80 +154,84 @@ export class RemoteMessageProcessor {
}

if (isGroupedBatch(message)) {
// We should be awaiting a new batch (batchInProgress undefined)
assert(
this.batchInProgress === undefined,
0x9d3 /* Grouped batch interrupting another batch */,
);
// We should be awaiting a new batch (batchInProgress false)
assert(!this.batchInProgress, 0x9d3 /* Grouped batch interrupting another batch */);
const batchId = asBatchMetadata(message.metadata)?.batchId;
const groupedMessages = this.opGroupingManager.ungroupOp(message).map(unpack);

return {
type: "fullBatch",
messages: groupedMessages, // Will be [] for an empty batch
batchStartCsn: message.clientSequenceNumber,
clientId,
batchId,
// If the batch is empty, we need to return the sequence number aside
emptyBatchSequenceNumber:
groupedMessages.length === 0 ? message.sequenceNumber : undefined,
batchStart: {
batchStartCsn: message.clientSequenceNumber,
clientId,
batchId,
keyMessage: groupedMessages[0] ?? message, // For an empty batch, this is the empty grouped batch message. Needed for sequence numbers for this batch
},
length: groupedMessages.length, // Will be 0 for an empty batch
};
}

// Do a final unpack of runtime messages in case the message was not grouped, compressed, or chunked
unpackRuntimeMessage(message, logLegacyCase);

const { batchEnded } = this.addMessageToBatch(
return this.getResultBasedOnBatchMetadata(
message as InboundSequencedContainerRuntimeMessage & { clientId: string },
);

if (!batchEnded) {
// batch not yet complete
return undefined;
}

const completedBatch = this.batchInProgress;
this.batchInProgress = undefined;
return completedBatch;
}

/**
* Add the given message to the current batch, and indicate whether the batch is now complete.
*
* @returns batchEnded: true if the batch is now complete, batchEnded: false if more messages are expected
* Now that the message has been "unwrapped" as to any virtualization (grouping, compression, chunking),
* inspect the batch metadata flag and determine what kind of result to return.
*/
private addMessageToBatch(
private getResultBasedOnBatchMetadata(
message: InboundSequencedContainerRuntimeMessage & { clientId: string },
): { batchEnded: boolean } {
): InboundMessageResult {
const batchMetadataFlag = asBatchMetadata(message.metadata)?.batch;
if (this.batchInProgress === undefined) {
if (!this.batchInProgress) {
// We are waiting for a new batch
assert(batchMetadataFlag !== false, 0x9d5 /* Unexpected batch end marker */);

// Start of a new multi-message batch
if (batchMetadataFlag === true) {
this.batchInProgress = {
messages: [message],
batchId: asBatchMetadata(message.metadata)?.batchId,
clientId: message.clientId,
batchStartCsn: message.clientSequenceNumber,
this.batchInProgress = true;
return {
type: "batchStartingMessage",
batchStart: {
batchId: asBatchMetadata(message.metadata)?.batchId,
clientId: message.clientId,
batchStartCsn: message.clientSequenceNumber,
keyMessage: message,
},
nextMessage: message,
};

return { batchEnded: false };
}

// Single-message batch (Since metadata flag is undefined)
this.batchInProgress = {
return {
type: "fullBatch",
messages: [message],
batchStartCsn: message.clientSequenceNumber,
clientId: message.clientId,
batchId: asBatchMetadata(message.metadata)?.batchId,
batchStart: {
batchStartCsn: message.clientSequenceNumber,
clientId: message.clientId,
batchId: asBatchMetadata(message.metadata)?.batchId,
keyMessage: message,
},
length: 1,
};
return { batchEnded: true };
}
assert(batchMetadataFlag !== true, 0x9d6 /* Unexpected batch start marker */);

this.batchInProgress.messages.push(message);
// Clear batchInProgress state if the batch is ending
if (batchMetadataFlag === false) {
this.batchInProgress = false;
}

return { batchEnded: batchMetadataFlag === false };
return {
type: "nextBatchMessage",
nextMessage: message,
batchEnd: batchMetadataFlag === false,
};
}
}

Expand Down
Loading

0 comments on commit 567feff

Please sign in to comment.