diff --git a/dev/test-studio/sanity.config.ts b/dev/test-studio/sanity.config.ts index f2b4bf85270..d9e64b6064b 100644 --- a/dev/test-studio/sanity.config.ts +++ b/dev/test-studio/sanity.config.ts @@ -220,6 +220,15 @@ export default defineConfig([ plugins: [sharedSettings()], basePath: '/playground', }, + { + name: 'listener-events', + title: 'Listener events debug', + subtitle: 'Listener events debugging', + projectId: 'ppsg7ml5', + dataset: 'data-loss', + plugins: [sharedSettings()], + basePath: '/listener-events', + }, { name: 'playground-partial-indexing', title: 'Test Studio (playground-partial-indexing)', diff --git a/packages/sanity/src/core/store/_legacy/document/debug.ts b/packages/sanity/src/core/store/_legacy/document/debug.ts new file mode 100644 index 00000000000..85d41b1e4ea --- /dev/null +++ b/packages/sanity/src/core/store/_legacy/document/debug.ts @@ -0,0 +1,3 @@ +import createDebug from 'debug' + +export const debug = createDebug('sanity:document-store') diff --git a/packages/sanity/src/core/store/_legacy/document/getPairListener.ts b/packages/sanity/src/core/store/_legacy/document/getPairListener.ts index aadbc5384c0..e0f0fc9f173 100644 --- a/packages/sanity/src/core/store/_legacy/document/getPairListener.ts +++ b/packages/sanity/src/core/store/_legacy/document/getPairListener.ts @@ -2,9 +2,12 @@ import {type SanityClient} from '@sanity/client' import {type SanityDocument} from '@sanity/types' import {groupBy} from 'lodash' -import {defer, type Observable, of as observableOf, of, timer} from 'rxjs' -import {concatMap, map, mergeMap, scan} from 'rxjs/operators' +import {defer, merge, type Observable, of, throwError, timer} from 'rxjs' +import {catchError, concatMap, filter, map, mergeMap, scan, share} from 'rxjs/operators' +import {LISTENER_RESET_DELAY} from '../../../preview/constants' +import {shareReplayLatest} from '../../../preview/utils/shareReplayLatest' +import {debug} from './debug' import { type IdPair, type MutationEvent, @@ -12,6 +15,7 @@ import { type ReconnectEvent, type WelcomeEvent, } from './types' +import {OutOfSyncError, sequentializeListenerEvents} from './utils/sequentializeListenerEvents' interface Snapshots { draft: SanityDocument | null @@ -28,6 +32,13 @@ export interface InitialSnapshotEvent { /** @internal */ export interface PairListenerOptions { tag?: string + + /** + * Called when we recover from sync error + * Meant for error tracking / telemetry purposes + * @param error - the {@link OutOfSyncError} recovered from + */ + onSyncErrorRecovery?(error: OutOfSyncError): void } /** @internal */ @@ -65,9 +76,10 @@ export function getPairListener( options: PairListenerOptions = {}, ): Observable { const {publishedId, draftId} = idPair - return defer( - () => - client.observable.listen( + + const sharedEvents = defer(() => + client.observable + .listen( `*[_id == $publishedId || _id == $draftId]`, { publishedId, @@ -79,20 +91,35 @@ export function getPairListener( effectFormat: 'mendoza', tag: options.tag || 'document.pair-listener', }, - ) as Observable, - ).pipe( - concatMap((event) => - event.type === 'welcome' + ) + .pipe( + //filter((event) => Math.random() < 0.99 || event.type !== 'mutation'), + shareReplayLatest({ + predicate: (event) => event.type === 'welcome' || event.type === 'reconnect', + resetOnRefCountZero: () => timer(LISTENER_RESET_DELAY), + }), + ), + ) as Observable + + const pairEvents$ = sharedEvents.pipe( + concatMap((event) => { + return event.type === 'welcome' ? fetchInitialDocumentSnapshots().pipe( - concatMap((snapshots) => [ - createSnapshotEvent(draftId, snapshots.draft), - createSnapshotEvent(publishedId, snapshots.published), + mergeMap(({draft, published}) => [ + createSnapshotEvent(draftId, draft), + createSnapshotEvent(publishedId, published), ]), ) - : observableOf(event), - ), + : of(event) + }), scan( - (acc: {next: ListenerEvent[]; buffer: ListenerEvent[]}, msg) => { + ( + acc: { + next: (InitialSnapshotEvent | ListenerEvent)[] + buffer: (InitialSnapshotEvent | ListenerEvent)[] + }, + msg, + ) => { // we only care about mutation events if (!isMutationEvent(msg)) { return {next: [msg], buffer: []} @@ -124,9 +151,39 @@ export function getPairListener( ), // note: this flattens the array, and in the case of an empty array, no event will be pushed downstream mergeMap((v) => v.next), - concatMap((result) => - (window as any).SLOW ? timer(10000).pipe(map(() => result)) : of(result), + share(), + ) + + const draftEvents$ = pairEvents$.pipe( + filter((event) => + event.type === 'mutation' || event.type === 'snapshot' ? event.documentId === draftId : true, + ), + sequentializeListenerEvents(), + ) + + const publishedEvents$ = pairEvents$.pipe( + filter((event) => + event.type === 'mutation' || event.type === 'snapshot' + ? event.documentId === publishedId + : true, ), + sequentializeListenerEvents(), + ) + + return merge(draftEvents$, publishedEvents$).pipe( + catchError((err, caught$) => { + if (err instanceof OutOfSyncError) { + debug('Recovering from OutOfSyncError: %s', OutOfSyncError.name) + if (typeof options?.onSyncErrorRecovery === 'function') { + options?.onSyncErrorRecovery(err) + } else { + console.error(err) + } + // this will retry immediately + return caught$ + } + return throwError(() => err) + }), ) function fetchInitialDocumentSnapshots(): Observable { diff --git a/packages/sanity/src/core/store/_legacy/document/types.ts b/packages/sanity/src/core/store/_legacy/document/types.ts index 19e0b005ebe..e848dca31f5 100644 --- a/packages/sanity/src/core/store/_legacy/document/types.ts +++ b/packages/sanity/src/core/store/_legacy/document/types.ts @@ -14,6 +14,8 @@ export interface MutationEvent { mutations: MutationPayload[] effects: {apply: unknown; revert: unknown} + previousRev: string + resultRev: string transactionTotalEvents: number transactionCurrentEvent: number visibility: 'transaction' | 'query' diff --git a/packages/sanity/src/core/store/_legacy/document/utils/__test__/eventChainUtils.test.ts b/packages/sanity/src/core/store/_legacy/document/utils/__test__/eventChainUtils.test.ts new file mode 100644 index 00000000000..94ef846e713 --- /dev/null +++ b/packages/sanity/src/core/store/_legacy/document/utils/__test__/eventChainUtils.test.ts @@ -0,0 +1,44 @@ +import {describe, expect, it} from '@jest/globals' + +import {discardChainTo, toOrderedChains} from '../eventChainUtils' +import {mutationEvent} from './test-utils' + +describe(toOrderedChains.name, () => { + it('returns a list of chains', () => { + const events = [ + // missing + mutationEvent({previousRev: 'a', resultRev: 'b', mutations: []}), + mutationEvent({previousRev: 'b', resultRev: 'c', mutations: []}), + mutationEvent({previousRev: 'c', resultRev: 'd', mutations: []}), + mutationEvent({previousRev: 'd', resultRev: 'e', mutations: []}), + mutationEvent({previousRev: 'e', resultRev: 'f', mutations: []}), + + // mutationEvent({previousRev: 'g', resultRev: 'h', mutations: []}), // missing + mutationEvent({previousRev: 'h', resultRev: 'i', mutations: []}), + mutationEvent({previousRev: 'i', resultRev: 'j', mutations: []}), + mutationEvent({previousRev: 'j', resultRev: 'k', mutations: []}), + mutationEvent({previousRev: 'k', resultRev: 'l', mutations: []}), + mutationEvent({previousRev: 'l', resultRev: 'm', mutations: []}), + ] + const [first, second] = toOrderedChains(events) + + expect(first.map((ev) => ev.resultRev)).toEqual(['b', 'c', 'd', 'e', 'f']) + expect(second.map((ev) => ev.resultRev)).toEqual(['i', 'j', 'k', 'l', 'm']) + }) +}) + +describe(discardChainTo.name, () => { + it('discards mutation events in the chain up to the provided revision', () => { + const events = [ + mutationEvent({previousRev: 'a', resultRev: 'b', mutations: []}), + mutationEvent({previousRev: 'b', resultRev: 'c', mutations: []}), + mutationEvent({previousRev: 'c', resultRev: 'd', mutations: []}), + mutationEvent({previousRev: 'd', resultRev: 'e', mutations: []}), + mutationEvent({previousRev: 'e', resultRev: 'f', mutations: []}), + ] + const [discarded, applicable] = discardChainTo(events, 'd') + // Note, it's still in the order received + expect(discarded.map((ev) => ev.resultRev)).toEqual(['b', 'c', 'd']) + expect(applicable.map((ev) => ev.resultRev)).toEqual(['e', 'f']) + }) +}) diff --git a/packages/sanity/src/core/store/_legacy/document/utils/__test__/sequentializeEvents.test.ts b/packages/sanity/src/core/store/_legacy/document/utils/__test__/sequentializeEvents.test.ts new file mode 100644 index 00000000000..87ea9fd485d --- /dev/null +++ b/packages/sanity/src/core/store/_legacy/document/utils/__test__/sequentializeEvents.test.ts @@ -0,0 +1,240 @@ +/* eslint-disable no-nested-ternary */ +import {expect, test} from '@jest/globals' +import {from, lastValueFrom} from 'rxjs' +import {toArray} from 'rxjs/operators' + +import {type MutationPayload} from '../../buffered-doc' +import {type ListenerEvent} from '../../getPairListener' +import {type MutationEvent} from '../../types' +import { + DeadlineExceededError, + MaxBufferExceededError, + sequentializeListenerEvents, +} from '../sequentializeListenerEvents' + +function mutationEvent({ + previousRev, + resultRev, + mutations, +}: { + previousRev: string + resultRev: string + mutations: MutationPayload[] +}): MutationEvent { + return { + type: 'mutation', + documentId: 'test', + transactionId: resultRev, + effects: {revert: [], apply: []}, + mutations, + previousRev: previousRev, + resultRev: resultRev, + transition: 'update', + transactionCurrentEvent: 1, + transactionTotalEvents: 1, + visibility: 'transaction', + } +} + +test("it accumulates events that doesn't apply in a chain starting at the current head revision", async () => { + const events = from([ + { + type: 'snapshot', + documentId: 'test', + document: { + _rev: 'one', + _id: 'test', + _type: 'test', + name: 'initial', + _createdAt: '2024-10-02T06:40:16.414Z', + _updatedAt: '2024-10-02T06:40:16.414Z', + }, + }, + // this has the snapshot revision as it's previous and should be passed on as-is + mutationEvent({ + previousRev: 'one', + resultRev: 'two', + mutations: [{patch: {set: {name: 'OK'}}}], + }), + // this is part of an unbroken chain, but received out of order + mutationEvent({ + previousRev: 'four', + resultRev: 'five', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + // this is part of an unbroken chain, but received out of order + mutationEvent({ + previousRev: 'three', + resultRev: 'four', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + // we have a complete unbroken chain when receiving this + mutationEvent({ + previousRev: 'two', + resultRev: 'three', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + ] satisfies ListenerEvent[]) + + expect( + (await lastValueFrom(events.pipe(sequentializeListenerEvents(), toArray()))).map((event) => { + return [ + event.type, + event.type === 'mutation' + ? event.previousRev + : event.type === 'snapshot' + ? event.document?._rev + : null, + ] + }), + ).toEqual([ + ['snapshot', 'one'], + ['mutation', 'one'], + ['mutation', 'two'], + ['mutation', 'three'], + ['mutation', 'four'], + ]) +}) + +test('it ignores events already applied to the current head revision', async () => { + const events = from([ + { + type: 'snapshot', + documentId: 'test', + document: { + _rev: 'one', + _id: 'test', + _type: 'test', + name: 'initial', + _createdAt: '2024-10-02T06:40:16.414Z', + _updatedAt: '2024-10-02T06:40:16.414Z', + }, + }, + // this is already applied to the snapshot emitted above and should be ignored + mutationEvent({ + previousRev: 'minus-one', + resultRev: 'zero', + mutations: [{patch: {set: {name: 'SHOULD BE IGNORED'}}}], + }), + // this is already applied to the snapshot emitted above and should be ignored + mutationEvent({ + previousRev: 'zero', + resultRev: 'one', + mutations: [{patch: {set: {name: 'SHOULD ALSO BE IGNORED'}}}], + }), + // this has the snapshot revision as it's previous and should be applied + mutationEvent({ + previousRev: 'one', + resultRev: 'two', + mutations: [{patch: {set: {name: 'SHOULD BE APPLIED'}}}], + }), + ] satisfies ListenerEvent[]) + + expect( + (await lastValueFrom(events.pipe(sequentializeListenerEvents(), toArray()))).map((event) => { + return event?.type === 'mutation' ? event.mutations : event?.type + }), + ).toEqual(['snapshot', [{patch: {set: {name: 'SHOULD BE APPLIED'}}}]]) +}) + +test('it throws an MaxBufferExceededError if the buffer exceeds `maxBuffer`', async () => { + const events = from([ + { + type: 'snapshot', + documentId: 'test', + document: { + _rev: 'one', + _id: 'test', + _type: 'test', + name: 'initial', + _createdAt: '2024-10-02T06:40:16.414Z', + _updatedAt: '2024-10-02T06:40:16.414Z', + }, + }, + // this has the snapshot revision as it's previous and should be passed on as-is + mutationEvent({ + previousRev: 'one', + resultRev: 'two', + mutations: [{patch: {set: {name: 'OK'}}}], + }), + // this is part of an unbroken chain, but received out of order + mutationEvent({ + previousRev: 'four', + resultRev: 'five', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + // this breaks the chain + mutationEvent({ + previousRev: 'six', + resultRev: 'seven', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + // this is part of an unbroken chain, but received out of order + mutationEvent({ + previousRev: 'three', + resultRev: 'four', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + // we have a complete unbroken chain when receiving this + mutationEvent({ + previousRev: 'two', + resultRev: 'three', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + ] satisfies ListenerEvent[]) + + await expect( + lastValueFrom(events.pipe(sequentializeListenerEvents({maxBufferSize: 3}), toArray())), + ).rejects.toThrowError(MaxBufferExceededError) +}) + +test('it throws an OutOfSyncError if the buffer exceeds `maxBuffer`', async () => { + const events = from([ + { + type: 'snapshot', + documentId: 'test', + document: { + _rev: 'one', + _id: 'test', + _type: 'test', + name: 'initial', + _createdAt: '2024-10-02T06:40:16.414Z', + _updatedAt: '2024-10-02T06:40:16.414Z', + }, + }, + // this has the snapshot revision as it's previous and should be passed on as-is + mutationEvent({ + previousRev: 'one', + resultRev: 'two', + mutations: [{patch: {set: {name: 'OK'}}}], + }), + // this is part of an unbroken chain, but received out of order + mutationEvent({ + previousRev: 'four', + resultRev: 'five', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + // this breaks the chain + mutationEvent({ + previousRev: 'six', + resultRev: 'seven', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + // this is part of an unbroken chain, but received out of order + mutationEvent({ + previousRev: 'three', + resultRev: 'four', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + // we have a complete unbroken chain when receiving this + mutationEvent({ + previousRev: 'two', + resultRev: 'three', + mutations: [{patch: {set: {name: 'Out of order'}}}], + }), + ] satisfies ListenerEvent[]) + + await expect( + lastValueFrom(events.pipe(sequentializeListenerEvents({resolveChainDeadline: 100}), toArray())), + ).rejects.toThrowError(DeadlineExceededError) +}) diff --git a/packages/sanity/src/core/store/_legacy/document/utils/__test__/test-utils.ts b/packages/sanity/src/core/store/_legacy/document/utils/__test__/test-utils.ts new file mode 100644 index 00000000000..671850f99ed --- /dev/null +++ b/packages/sanity/src/core/store/_legacy/document/utils/__test__/test-utils.ts @@ -0,0 +1,26 @@ +import {type MutationPayload} from '../../buffered-doc' +import {type MutationEvent} from '../../types' + +export function mutationEvent({ + previousRev, + resultRev, + mutations, +}: { + previousRev: string + resultRev: string + mutations: MutationPayload[] +}): MutationEvent { + return { + type: 'mutation', + documentId: 'test', + transactionId: resultRev, + effects: {revert: [], apply: []}, + mutations, + previousRev: previousRev, + resultRev: resultRev, + transition: 'update', + transactionCurrentEvent: 1, + transactionTotalEvents: 1, + visibility: 'transaction', + } +} diff --git a/packages/sanity/src/core/store/_legacy/document/utils/eventChainUtils.ts b/packages/sanity/src/core/store/_legacy/document/utils/eventChainUtils.ts new file mode 100644 index 00000000000..59c27b91236 --- /dev/null +++ b/packages/sanity/src/core/store/_legacy/document/utils/eventChainUtils.ts @@ -0,0 +1,41 @@ +import {type MutationEvent} from '../types' + +export function discardChainTo(chain: MutationEvent[], revision: string | undefined) { + const revisionIndex = chain.findIndex((event) => event.resultRev === revision) + + return split(chain, revisionIndex + 1) +} + +function split(array: T[], index: number) { + if (index < 0) { + return [[], array] + } + return [array.slice(0, index), array.slice(index)] +} + +export function toOrderedChains(events: T[]) { + const parents: Record = {} + + events.forEach((event) => { + parents[event.resultRev] = events.find((other) => other.resultRev === event.previousRev) + }) + + // get entries without a parent (if there's more than one, we have a problem) + const orphans = Object.entries(parents).filter(([, parent]) => { + return !parent + })! + + return orphans.map((orphan) => { + const [headRev] = orphan + + let current = events.find((event) => event.resultRev === headRev) + + const sortedList: T[] = [] + while (current) { + sortedList.push(current) + // eslint-disable-next-line no-loop-func + current = events.find((event) => event.previousRev === current?.resultRev) + } + return sortedList + }) +} diff --git a/packages/sanity/src/core/store/_legacy/document/utils/sequentializeListenerEvents.ts b/packages/sanity/src/core/store/_legacy/document/utils/sequentializeListenerEvents.ts new file mode 100644 index 00000000000..ecca27712e2 --- /dev/null +++ b/packages/sanity/src/core/store/_legacy/document/utils/sequentializeListenerEvents.ts @@ -0,0 +1,189 @@ +import {partition} from 'lodash' +import {concat, type Observable, of, switchMap, throwError, timer} from 'rxjs' +import {mergeMap, scan} from 'rxjs/operators' + +import {debug} from '../debug' +import {type ListenerEvent} from '../getPairListener' +import {type MutationEvent} from '../types' +import {discardChainTo, toOrderedChains} from './eventChainUtils' + +interface ListenerSequenceState { + /** + * Tracks the latest revision from the server that can be applied locally + * Once we receive a mutation event that has a `previousRev` that equals `base.revision` + * we will move `base.revision` to the event's `resultRev` + * `base.revision` will be undefined if document doesn't exist. + * `base` is `undefined` until the snapshot event is received + */ + base: {revision: string | undefined} | undefined + /** + * Array of events to pass on to the stream, e.g. when mutation applies to current head revision, or a chain is complete + */ + emitEvents: ListenerEvent[] + /** + * Buffer to keep track of events that doesn't line up in a [previousRev, resultRev] -- [previousRev, resultRev] sequence + * This can happen if events arrive out of order, or if an event in the middle for some reason gets lost + */ + buffer: MutationEvent[] +} + +const DEFAULT_MAX_BUFFER_SIZE = 20 +const DEFAULT_DEADLINE_MS = 30000 + +const EMPTY_ARRAY: never[] = [] + +/** + * Takes an input observable of listener events that might arrive out of order, and emits them in sequence + * If we receive mutation events that doesn't line up in [previousRev, resultRev] pairs we'll put them in a buffer and + * check if we have an unbroken chain every time we receive a new event + * + * If the buffer grows beyond `maxBufferSize`, or if `resolveChainDeadline` milliseconds passes before the chain resolves + * an OutOfSyncError will be thrown on the stream + * + * @internal + */ +export function sequentializeListenerEvents(options?: { + maxBufferSize?: number + resolveChainDeadline?: number +}) { + const {resolveChainDeadline = DEFAULT_DEADLINE_MS, maxBufferSize = DEFAULT_MAX_BUFFER_SIZE} = + options || {} + + return (input$: Observable): Observable => { + return input$.pipe( + scan( + (state: ListenerSequenceState, event: ListenerEvent): ListenerSequenceState => { + if (event.type === 'mutation' && !state.base) { + throw new Error('Invalid state. Cannot create a sequence without a base') + } + if (event.type === 'snapshot') { + // When receiving a new snapshot, we can safely discard the current orphaned and chainable buffers + return { + base: {revision: event.document?._rev}, + buffer: EMPTY_ARRAY, + emitEvents: [event], + } + } + + if (event.type === 'mutation') { + // Note: the buffer may have multiple holes in it (this is a worst case scenario, and probably not likely, but still), + // so we need to consider all possible chains + // `toOrderedChains` will return all detected chains and each of the returned chains will be orderered + // Once we have a list of chains, we can then discard any chain that leads up to the current revision + // since they are already applied on the document + const orderedChains = toOrderedChains(state.buffer.concat(event)).map((chain) => { + // in case the chain leads up to the current revision + const [discarded, rest] = discardChainTo(chain, state.base!.revision) + if (discarded.length > 0) { + debug('Discarded %d mutations already applied to document', discarded.length) + } + return rest + }) + + const [applicableChains, _nextBuffer] = partition(orderedChains, (chain) => { + // note: there can be at most one applicable chain + return state.base!.revision === chain[0]?.previousRev + }) + + const nextBuffer = _nextBuffer.flat() + if (applicableChains.length > 1) { + throw new Error('Expected at most one applicable chain') + } + if (applicableChains.length > 0 && applicableChains[0].length > 0) { + // we now have a continuous chain that can apply on the base revision + // Move current base revision to the last mutation event in the applicable chain + const lastMutation = applicableChains[0].at(-1)! + const nextBaseRevision = + // special case: if the mutation deletes the document it technically has no revision, despite + // resultRev pointing at a transaction id. + lastMutation.transition === 'disappear' ? undefined : lastMutation?.resultRev + return { + base: {revision: nextBaseRevision}, + emitEvents: applicableChains[0], + buffer: nextBuffer, + } + } + + if ( + nextBuffer.length >= + ((globalThis as any).__sanity_debug_maxBufferSize ?? maxBufferSize) + ) { + throw new MaxBufferExceededError( + `Too many unchainable mutation events: ${state.buffer.length}`, + state, + ) + } + return { + ...state, + buffer: nextBuffer, + emitEvents: EMPTY_ARRAY, + } + } + // Any other event (e.g. 'reconnect' is passed on verbatim) + return {...state, emitEvents: [event]} + }, + { + emitEvents: EMPTY_ARRAY, + base: undefined, + buffer: EMPTY_ARRAY, + }, + ), + switchMap((state) => { + const deadline = + (globalThis as any).__sanity_debug_resolveChainDeadline ?? resolveChainDeadline + + if (state.buffer.length > 0) { + debug( + "Detected %d listener event(s) that can't be applied in sequence. This could be due to events arriving out of order. Will throw an error if chain can't be resolved within %dms", + state.buffer.length, + deadline, + ) + return concat( + of(state), + timer(deadline).pipe( + mergeMap(() => + throwError(() => { + return new DeadlineExceededError( + `Did not resolve chain within a deadline of ${resolveChainDeadline}ms`, + state, + ) + }), + ), + ), + ) + } + return of(state) + }), + mergeMap((state) => { + // this will simply flatten the list of events into individual emissions + // if the flushEvents array is empty, nothing will be emitted + return state.emitEvents + }), + ) + } +} + +export class OutOfSyncError extends Error { + /** + * Attach state to the error for debugging/reporting + */ + state: ListenerSequenceState + constructor(message: string, state: ListenerSequenceState) { + super(message) + this.name = 'OutOfSyncError' + this.state = state + } +} + +export class DeadlineExceededError extends OutOfSyncError { + constructor(message: string, state: ListenerSequenceState) { + super(message, state) + this.name = 'DeadlineExceededError' + } +} +export class MaxBufferExceededError extends OutOfSyncError { + constructor(message: string, state: ListenerSequenceState) { + super(message, state) + this.name = 'MaxBufferExceededError' + } +}