From e340e3877e3b3ce6067aff4d314e969dcd40e34f Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Thu, 7 Sep 2023 15:13:46 -0400 Subject: [PATCH] feat: Implement StateManipulator with cleaner, more efficient conflict resolution --- .../src/__tests__/conflict-resolution.test.ts | 2 +- packages/core/src/ceramic.ts | 7 +- .../__tests__/state-manipulator.test.ts | 514 ++++++++++++++++++ .../__tests__/stream-loader.test.ts | 129 +++++ .../src/stream-loading/state-manipulator.ts | 255 ++++++++- .../core/src/stream-loading/stream-loader.ts | 10 +- 6 files changed, 905 insertions(+), 12 deletions(-) create mode 100644 packages/core/src/stream-loading/__tests__/state-manipulator.test.ts create mode 100644 packages/core/src/stream-loading/__tests__/stream-loader.test.ts diff --git a/packages/core/src/__tests__/conflict-resolution.test.ts b/packages/core/src/__tests__/conflict-resolution.test.ts index 3721eec7e2..0b14e1e52e 100644 --- a/packages/core/src/__tests__/conflict-resolution.test.ts +++ b/packages/core/src/__tests__/conflict-resolution.test.ts @@ -155,7 +155,7 @@ describe('pickLogToAccept', () => { log: [{ cid: cids[4] }, { cid: cids[0], timestamp: 10 }], } as unknown as StreamState - // When anchored in the same blockchain, same block, and with same log lengths, we should choose the one with + // When anchored in the same blockchain, same block, and with different log lengths, we should choose the one with // longer log length expect(await pickLogToAccept(state1, state2)).toEqual(state1) expect(await pickLogToAccept(state2, state1)).toEqual(state1) diff --git a/packages/core/src/ceramic.ts b/packages/core/src/ceramic.ts index 3e89fe8e22..e7ee06b3fb 100644 --- a/packages/core/src/ceramic.ts +++ b/packages/core/src/ceramic.ts @@ -265,7 +265,12 @@ export class Ceramic implements CeramicApi { ) const tipFetcher = new TipFetcher(this.dispatcher.messageBus) const logSyncer = new LogSyncer(this.dispatcher) - const stateManipulator = new StateManipulator() + const stateManipulator = new StateManipulator( + this._logger, + this._streamHandlers, + this.context, + logSyncer + ) const streamLoader = new StreamLoader( this._logger, tipFetcher, diff --git a/packages/core/src/stream-loading/__tests__/state-manipulator.test.ts b/packages/core/src/stream-loading/__tests__/state-manipulator.test.ts new file mode 100644 index 0000000000..71cf6f5dc0 --- /dev/null +++ b/packages/core/src/stream-loading/__tests__/state-manipulator.test.ts @@ -0,0 +1,514 @@ +import { beforeAll, jest } from '@jest/globals' +import { Dispatcher } from '../../dispatcher.js' +import { createIPFS, swarmConnect } from '@ceramicnetwork/ipfs-daemon' +import { createDispatcher } from '../../__tests__/create-dispatcher.js' +import { + AnchorStatus, + AppliableStreamLog, + CommitData, + CommitType, + Context, + IpfsApi, + LogEntry, + LoggerProvider, + TestUtils, +} from '@ceramicnetwork/common' +import { Ceramic } from '../../ceramic.js' +import { createCeramic } from '../../__tests__/create-ceramic.js' +import { TileDocument } from '@ceramicnetwork/stream-tile' +import { LogSyncer } from '../log-syncer.js' +import { StateManipulator } from '../state-manipulator.js' +import { HandlersMap } from '../../handlers-map.js' +import cloneDeep from 'lodash.clonedeep' +import { CID } from 'multiformats/cid' + +const TOPIC = '/ceramic/test12345' +const CONTENT0 = { step: 0 } +const CONTENT1 = { step: 1 } +const CONTENT2 = { step: 2 } + +function makeAppliable(log: Array): AppliableStreamLog { + return { commits: log, timestampStatus: 'validated' } +} + +describe('StateManipulator test', () => { + jest.setTimeout(1000 * 30) + + let dispatcher: Dispatcher + let dispatcherIpfs: IpfsApi + let logSyncer: LogSyncer + let stateManipulator: StateManipulator + + let doc: TileDocument + let commits: Array + + let ceramicIpfs: IpfsApi + let ceramic: Ceramic + + beforeAll(async () => { + ceramicIpfs = await createIPFS() + ceramic = await createCeramic(ceramicIpfs) + + dispatcherIpfs = await createIPFS() + dispatcher = await createDispatcher(dispatcherIpfs, TOPIC) + // speed up how quickly the dispatcher gives up on loading a non-existent commit from ipfs. + dispatcher._ipfsTimeout = 1000 + + const logger = new LoggerProvider().getDiagnosticsLogger() + logSyncer = new LogSyncer(dispatcher) + const handlers = new HandlersMap(logger) + stateManipulator = new StateManipulator( + logger, + handlers, + { did: ceramic.did, api: ceramic } as Context, + logSyncer + ) + + await swarmConnect(dispatcherIpfs, ceramicIpfs) + + // Create a standard stream and log to use throughout tests. + doc = await TileDocument.create(ceramic, CONTENT0) + await TestUtils.anchorUpdate(ceramic, doc) + await doc.update(CONTENT1) + await doc.update(CONTENT2) + await TestUtils.anchorUpdate(ceramic, doc) + + commits = (await logSyncer.syncFullLog(doc.id, doc.tip)).commits + }) + + afterAll(async () => { + await dispatcher.close() + await ceramic.close() + + // Wait for pubsub unsubscribe to be processed + // TODO(1963): Remove this once dispatcher.close() won't resolve until the pubsub unsubscribe + // has been processed + await TestUtils.delay(5000) + + await dispatcherIpfs.stop() + await ceramicIpfs.stop() + }) + + test('applyFullLog - normal case', async () => { + const state0 = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits.slice(0, 1)), + { throwOnInvalidCommit: true } + ) + expect(state0.log.length).toEqual(1) + expect(state0.content).toEqual(CONTENT0) + expect(state0.next).toBeUndefined() + expect(state0.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + + const state1 = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits.slice(0, 2)), + { throwOnInvalidCommit: true } + ) + expect(state1.log.length).toEqual(2) + expect(state1.content).toEqual(CONTENT0) + expect(state1.next).toBeUndefined() + expect(state1.anchorStatus).toEqual(AnchorStatus.ANCHORED) + + const state2 = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits.slice(0, 3)), + { throwOnInvalidCommit: true } + ) + expect(state2.log.length).toEqual(3) + expect(state2.content).toEqual(CONTENT0) + expect(state2.next.content).toEqual(CONTENT1) + expect(state2.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + + const state3 = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits.slice(0, 4)), + { throwOnInvalidCommit: true } + ) + expect(state3.log.length).toEqual(4) + expect(state3.content).toEqual(CONTENT0) + expect(state3.next.content).toEqual(CONTENT2) + expect(state3.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + + const state4 = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits.slice(0, 5)), + { throwOnInvalidCommit: true } + ) + expect(state4.log.length).toEqual(5) + expect(state4.content).toEqual(CONTENT2) + expect(state4.next).toBeUndefined() + expect(state4.anchorStatus).toEqual(AnchorStatus.ANCHORED) + }) + + test('applyFullLog - invalid commit', async () => { + const invalidCommit = cloneDeep(commits[3]) + invalidCommit.commit.prev = TestUtils.randomCID() // not a valid link to the stream log + const logWithInvalidCommit = commits.slice(0, 3).concat([invalidCommit, commits[4]]) + + // Will throw if 'throwOnInvalidCommit' is true + await expect( + stateManipulator.applyFullLog(doc.id.type, makeAppliable(logWithInvalidCommit), { + throwOnInvalidCommit: true, + }) + ).rejects.toThrow(/Commit doesn't properly point to previous commit in log/) + + // Will return what state it was able to successfully build if 'throwOnInvalidCommit' is false + const state = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(logWithInvalidCommit), + { + throwOnInvalidCommit: false, + } + ) + expect(state.log.length).toEqual(3) + expect(state.content).toEqual(CONTENT0) + expect(state.next.content).toEqual(CONTENT1) + expect(state.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + }) + + test('applyLogToState - clean apply to end of log', async () => { + // Make genesis state + const state0 = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits.slice(0, 1)), + { throwOnInvalidCommit: true } + ) + + // apply second and third commit + const state2 = await stateManipulator.applyLogToState( + state0, + makeAppliable(commits.slice(1, 3)), + { throwOnInvalidCommit: true, throwOnConflict: true, throwIfStale: true } + ) + expect(state2.log.length).toEqual(3) + expect(state2.content).toEqual(CONTENT0) + expect(state2.next.content).toEqual(CONTENT1) + expect(state2.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + + // apply rest of log + const state4 = await stateManipulator.applyLogToState(state2, makeAppliable(commits.slice(3)), { + throwOnInvalidCommit: true, + throwOnConflict: true, + throwIfStale: true, + }) + expect(state4.log.length).toEqual(5) + expect(state4.content).toEqual(CONTENT2) + expect(state4.next).toBeUndefined() + expect(state4.anchorStatus).toEqual(AnchorStatus.ANCHORED) + }) + + describe('conflict resolution', () => { + let conflictingCommits: Array + + beforeAll(async () => { + conflictingCommits = cloneDeep(commits.slice(3)) + conflictingCommits[0].commit.header.randomField = 'this changes the hash of the commit!' + conflictingCommits[0].cid = await dispatcher.storeCommit(conflictingCommits[0].commit) + conflictingCommits[1].commit.prev = conflictingCommits[0].cid + conflictingCommits[1].cid = await dispatcher.storeCommit(conflictingCommits[1].commit) + + expect(conflictingCommits[0].cid.toString()).not.toEqual(commits[3].cid.toString()) + expect(conflictingCommits[1].cid.toString()).not.toEqual(commits[4].cid.toString()) + }) + + afterEach(() => { + // Reset any changes to timestamps made by the test + const clearTimestamps = function (log: Array) { + for (const commit of log) { + commit.timestamp = undefined + } + } + clearTimestamps(commits) + clearTimestamps(conflictingCommits) + }) + + test('Conflicting history is valid', async () => { + // this test is mostly a sanity check of the fixture setup, to ensure that the alternate log + // history (which will be used more in subsequent tests) is appliable. + const state = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits.slice(0, 3).concat(conflictingCommits)), + { + throwOnInvalidCommit: true, + } + ) + + expect(state.log.length).toEqual(5) + expect(state.content).toEqual(CONTENT2) + expect(state.log[0].cid.toString()).toEqual(commits[0].cid.toString()) + expect(state.log[1].cid.toString()).toEqual(commits[1].cid.toString()) + expect(state.log[2].cid.toString()).toEqual(commits[2].cid.toString()) + expect(state.log[3].cid.toString()).not.toEqual(commits[3].cid.toString()) + expect(state.log[4].cid.toString()).not.toEqual(commits[4].cid.toString()) + expect(state.log[3].cid.toString()).toEqual(conflictingCommits[0].cid.toString()) + expect(state.log[4].cid.toString()).toEqual(conflictingCommits[1].cid.toString()) + }) + + test('throw if stale', async () => { + const currentState = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits), + { + throwOnInvalidCommit: true, + } + ) + + // If the new long doesn't cleanly build on the old and 'throwIfStale' is true, we'll always + // throw before even considering conflict resolution rules. + await expect( + stateManipulator.applyLogToState(currentState, makeAppliable(conflictingCommits), { + throwOnInvalidCommit: true, + throwIfStale: true, + throwOnConflict: false, + }) + ).rejects.toThrow(/rejected because it builds on stale state/) + }) + + test('current state wins conflict resolution', async () => { + // Make current state be anchored earlier so it wins conflict resolution + const now = new Date().getTime() + const genesisTime = now - 10000 + const earlierAnchorTime = now - 5000 + const laterAnchorTime = now - 3000 + + commits[0].timestamp = genesisTime + commits[1].timestamp = genesisTime + commits[2].timestamp = earlierAnchorTime + commits[3].timestamp = earlierAnchorTime + commits[4].timestamp = earlierAnchorTime + + conflictingCommits[0].timestamp = laterAnchorTime + conflictingCommits[1].timestamp = laterAnchorTime + + const currentState = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits), + { + throwOnInvalidCommit: true, + } + ) + + const stateAfterApply = await stateManipulator.applyLogToState( + currentState, + makeAppliable(conflictingCommits), + { throwOnInvalidCommit: true, throwIfStale: false, throwOnConflict: false } + ) + // State should be unchanged since existing state was anchored first + expect(stateAfterApply).toEqual(currentState) + }) + + test('new history wins conflict resolution', async () => { + // Make remote log be anchored earlier so it wins conflict resolution + const now = new Date().getTime() + const genesisTime = now - 10000 + const earlierAnchorTime = now - 5000 + const laterAnchorTime = now - 3000 + + commits[0].timestamp = genesisTime + commits[1].timestamp = genesisTime + commits[2].timestamp = laterAnchorTime + commits[3].timestamp = laterAnchorTime + commits[4].timestamp = laterAnchorTime + + conflictingCommits[0].timestamp = earlierAnchorTime + conflictingCommits[1].timestamp = earlierAnchorTime + + const currentState = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits), + { + throwOnInvalidCommit: true, + } + ) + + const stateAfterApply = await stateManipulator.applyLogToState( + currentState, + makeAppliable(conflictingCommits), + { throwOnInvalidCommit: true, throwIfStale: false, throwOnConflict: true } + ) + + // State should be updated since the new log was anchored first + //expect(stateAfterApply).not.toEqual(currentState) + expect(stateAfterApply.log[3].cid.toString()).toEqual(conflictingCommits[0].cid.toString()) + expect(stateAfterApply.log[4].cid.toString()).toEqual(conflictingCommits[1].cid.toString()) + }) + + test('throwOnConflict', async () => { + // If new history does not win conflict resolution, throw + const now = new Date().getTime() + const genesisTime = now - 10000 + const earlierAnchorTime = now - 5000 + const laterAnchorTime = now - 3000 + + commits[0].timestamp = genesisTime + commits[1].timestamp = genesisTime + commits[2].timestamp = earlierAnchorTime + commits[3].timestamp = earlierAnchorTime + commits[4].timestamp = earlierAnchorTime + + conflictingCommits[0].timestamp = laterAnchorTime + conflictingCommits[1].timestamp = laterAnchorTime + + const currentState = await stateManipulator.applyFullLog( + doc.id.type, + makeAppliable(commits), + { + throwOnInvalidCommit: true, + } + ) + + await expect( + stateManipulator.applyLogToState(currentState, makeAppliable(conflictingCommits), { + throwOnInvalidCommit: true, + throwIfStale: false, + throwOnConflict: true, + }) + ).rejects.toThrow(/rejected by conflict resolution/) + }) + + describe('pickLogToAccept', () => { + // Targeted tests of the core conflict resolution logic. + + let cids: Array + + beforeAll(() => { + cids = [ + TestUtils.randomCID(), + TestUtils.randomCID(), + TestUtils.randomCID(), + TestUtils.randomCID(), + TestUtils.randomCID(), + TestUtils.randomCID(), + TestUtils.randomCID(), + ] + cids.sort(function (cid1, cid2) { + if (cid1.bytes < cid2.bytes) { + return -1 + } else if (cid1.bytes > cid2.bytes) { + return 1 + } else { + return 0 + } + }) + }) + + test('Neither log is anchored, same log lengths', async () => { + const log1: Array = [ + { cid: cids[1], type: CommitType.SIGNED }, + { cid: cids[2], type: CommitType.SIGNED }, + ] + const log2: Array = [ + { cid: cids[4], type: CommitType.SIGNED }, + { cid: cids[0], type: CommitType.SIGNED }, + ] + + // When neither log is anchored and log lengths are the same we should pick the log whose last entry has the + // smaller CID. + expect(stateManipulator._pickLogToAccept(log1, log2)).toEqual(log2) + expect(stateManipulator._pickLogToAccept(log2, log1)).toEqual(log2) + }) + + test('Neither log is anchored, different log lengths', async () => { + const log1: Array = [ + { cid: cids[1], type: CommitType.SIGNED }, + { cid: cids[2], type: CommitType.SIGNED }, + { cid: cids[3], type: CommitType.SIGNED }, + ] + const log2: Array = [ + { cid: cids[4], type: CommitType.SIGNED }, + { cid: cids[0], type: CommitType.SIGNED }, + ] + + // When neither log is anchored and log lengths are different we should pick the log with + // greater length + expect(stateManipulator._pickLogToAccept(log1, log2)).toEqual(log1) + expect(stateManipulator._pickLogToAccept(log2, log1)).toEqual(log1) + }) + + test('One log anchored before the other', async () => { + const log1: Array = [{ cid: cids[1], type: CommitType.SIGNED }] + const log2: Array = [{ cid: cids[2], type: CommitType.ANCHOR }] + + // When only one of the logs has been anchored, we pick the anchored one + expect(stateManipulator._pickLogToAccept(log1, log2)).toEqual(log2) + expect(stateManipulator._pickLogToAccept(log2, log1)).toEqual(log2) + }) + + test('Both logs anchored in different blocks', async () => { + const now = new Date().getTime() + const earlierTime = now - 10000 + const laterTime = now - 5000 + const log1: Array = [ + { cid: cids[0], type: CommitType.SIGNED, timestamp: laterTime }, + { cid: cids[1], type: CommitType.ANCHOR, timestamp: laterTime }, + ] + const log2: Array = [ + { cid: cids[2], type: CommitType.SIGNED, timestamp: earlierTime }, + { cid: cids[3], type: CommitType.ANCHOR, timestamp: earlierTime }, + ] + + // When both logs are anchored, take the one anchored in the earlier block. + expect(stateManipulator._pickLogToAccept(log1, log2)).toEqual(log2) + expect(stateManipulator._pickLogToAccept(log2, log1)).toEqual(log2) + }) + + test('Both logs anchored in same block blocks - different log lengths', async () => { + const timestamp = new Date().getTime() + const log1: Array = [ + { cid: cids[1], type: CommitType.SIGNED, timestamp }, + { cid: cids[2], type: CommitType.SIGNED, timestamp }, + { cid: cids[3], type: CommitType.ANCHOR, timestamp }, + ] + const log2: Array = [ + { cid: cids[4], type: CommitType.SIGNED, timestamp }, + { cid: cids[0], type: CommitType.ANCHOR, timestamp }, + ] + + // When anchored in the same block, and with different log lengths, we should choose the one + // with longer log length + expect(stateManipulator._pickLogToAccept(log1, log2)).toEqual(log1) + expect(stateManipulator._pickLogToAccept(log2, log1)).toEqual(log1) + }) + + test('Both logs anchored in same block blocks - different log lengths - multiple anchors', async () => { + const now = new Date().getTime() + const earlierTime = now - 10000 + const laterTime = now - 5000 + const log1: Array = [ + { cid: cids[1], type: CommitType.SIGNED, timestamp: earlierTime }, + { cid: cids[2], type: CommitType.SIGNED, timestamp: earlierTime }, + { cid: cids[3], type: CommitType.ANCHOR, timestamp: earlierTime }, + ] + const log2: Array = [ + { cid: cids[4], type: CommitType.SIGNED, timestamp: earlierTime }, + { cid: cids[0], type: CommitType.ANCHOR, timestamp: earlierTime }, + { cid: cids[5], type: CommitType.SIGNED, timestamp: laterTime }, + { cid: cids[6], type: CommitType.ANCHOR, timestamp: laterTime }, + ] + + // When first anchor is in the same block, break tie only with log length until first anchor, + // log length after the first anchor is irrelevant. + expect(stateManipulator._pickLogToAccept(log1, log2)).toEqual(log1) + expect(stateManipulator._pickLogToAccept(log2, log1)).toEqual(log1) + }) + + test('Both logs anchored in the same block with the same log lengths', async () => { + const timestamp = new Date().getTime() + const log1: Array = [ + { cid: cids[1], type: CommitType.SIGNED, timestamp }, + { cid: cids[2], type: CommitType.ANCHOR, timestamp }, + ] + const log2: Array = [ + { cid: cids[4], type: CommitType.SIGNED, timestamp }, + { cid: cids[0], type: CommitType.ANCHOR, timestamp }, + ] + + // When anchored in the same block, and with same log lengths, we should use + // the fallback mechanism of picking the log whose last entry has the smaller CID + expect(stateManipulator._pickLogToAccept(log1, log2)).toEqual(log2) + expect(stateManipulator._pickLogToAccept(log2, log1)).toEqual(log2) + }) + }) + }) +}) diff --git a/packages/core/src/stream-loading/__tests__/stream-loader.test.ts b/packages/core/src/stream-loading/__tests__/stream-loader.test.ts new file mode 100644 index 0000000000..a5c72e0741 --- /dev/null +++ b/packages/core/src/stream-loading/__tests__/stream-loader.test.ts @@ -0,0 +1,129 @@ +import { jest } from '@jest/globals' +import { Dispatcher } from '../../dispatcher.js' +import { createIPFS, swarmConnect } from '@ceramicnetwork/ipfs-daemon' +import { createDispatcher } from '../../__tests__/create-dispatcher.js' +import { + AnchorStatus, + Context, + IpfsApi, + LoggerProvider, + StreamState, + StreamUtils, + TestUtils, +} from '@ceramicnetwork/common' +import { Ceramic } from '../../ceramic.js' +import { createCeramic } from '../../__tests__/create-ceramic.js' +import { TileDocument } from '@ceramicnetwork/stream-tile' +import { LogSyncer } from '../log-syncer.js' +import { StateManipulator } from '../state-manipulator.js' +import { HandlersMap } from '../../handlers-map.js' +import { StreamLoader } from '../stream-loader.js' +import { TipFetcher } from '../tip-fetcher.js' +import { AnchorTimestampExtractor } from '../anchor-timestamp-extractor.js' +import { InMemoryAnchorService } from '../../anchor/memory/in-memory-anchor-service.js' + +const TOPIC = '/ceramic/test12345' +const CONTENT0 = { step: 0 } +const CONTENT1 = { step: 1 } +const CONTENT2 = { step: 2 } + +function expectStatesEqualWithPendingAnchor( + stateWithPendingAnchor: StreamState, + stateWithoutPendingAnchor: StreamState +) { + expect(stateWithPendingAnchor.anchorStatus).toEqual(AnchorStatus.PENDING) + expect(stateWithoutPendingAnchor.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + delete stateWithPendingAnchor.anchorStatus + delete stateWithoutPendingAnchor.anchorStatus + expect(StreamUtils.serializeState(stateWithoutPendingAnchor)).toEqual( + StreamUtils.serializeState(stateWithPendingAnchor) + ) +} + +describe('StreamLoader test', () => { + jest.setTimeout(1000 * 30) + + let dispatcher: Dispatcher + let dispatcherIpfs: IpfsApi + let streamLoader: StreamLoader + + let ceramicIpfs: IpfsApi + let ceramic: Ceramic + + beforeAll(async () => { + ceramicIpfs = await createIPFS() + ceramic = await createCeramic(ceramicIpfs, { pubsubTopic: TOPIC }) + + dispatcherIpfs = await createIPFS() + dispatcher = await createDispatcher(dispatcherIpfs, TOPIC) + // speed up how quickly the dispatcher gives up on loading a non-existent commit from ipfs. + dispatcher._ipfsTimeout = 1000 + + const logger = new LoggerProvider().getDiagnosticsLogger() + const tipFetcher = new TipFetcher(dispatcher.messageBus) + const logSyncer = new LogSyncer(dispatcher) + const anchorTimestampExtractor = new AnchorTimestampExtractor( + logger, + dispatcher, + ceramic.context.anchorService as InMemoryAnchorService + ) + const handlers = new HandlersMap(logger) + const stateManipulator = new StateManipulator( + logger, + handlers, + { did: ceramic.did, api: ceramic } as Context, + logSyncer + ) + streamLoader = new StreamLoader( + logger, + tipFetcher, + logSyncer, + anchorTimestampExtractor, + stateManipulator + ) + + await swarmConnect(dispatcherIpfs, ceramicIpfs) + }) + + afterAll(async () => { + await dispatcher.close() + await ceramic.close() + + // Wait for pubsub unsubscribe to be processed + // TODO(1963): Remove this once dispatcher.close() won't resolve until the pubsub unsubscribe + // has been processed + await TestUtils.delay(5000) + + await dispatcherIpfs.stop() + await ceramicIpfs.stop() + }) + + describe('loadStream', () => { + test('basic stream load', async () => { + const doc = await TileDocument.create(ceramic, CONTENT0) + + const loadedState0 = await streamLoader.loadStream(doc.id, 3) + expectStatesEqualWithPendingAnchor(doc.state, loadedState0) + + await TestUtils.anchorUpdate(ceramic, doc) + const loadedState1 = await streamLoader.loadStream(doc.id, 3) + expect(StreamUtils.serializeState(loadedState1)).toEqual( + StreamUtils.serializeState(doc.state) + ) + + await doc.update(CONTENT1) + const loadedState2 = await streamLoader.loadStream(doc.id, 3) + expectStatesEqualWithPendingAnchor(doc.state, loadedState2) + + await doc.update(CONTENT2) + const loadedState3 = await streamLoader.loadStream(doc.id, 3) + expectStatesEqualWithPendingAnchor(doc.state, loadedState3) + + await TestUtils.anchorUpdate(ceramic, doc) + const loadedState4 = await streamLoader.loadStream(doc.id, 3) + expect(StreamUtils.serializeState(loadedState4)).toEqual( + StreamUtils.serializeState(doc.state) + ) + }) + }) +}) diff --git a/packages/core/src/stream-loading/state-manipulator.ts b/packages/core/src/stream-loading/state-manipulator.ts index d9bd81a28b..2aa762916a 100644 --- a/packages/core/src/stream-loading/state-manipulator.ts +++ b/packages/core/src/stream-loading/state-manipulator.ts @@ -1,4 +1,38 @@ -import { AppliableStreamLog, StreamState } from '@ceramicnetwork/common' +import { + AppliableStreamLog, + CommitType, + Context, + DiagnosticsLogger, + LogEntry, + Stream, + StreamHandler, + StreamState, + StreamUtils, + UnappliableStreamLog, +} from '@ceramicnetwork/common' +import { HandlersMap } from '../handlers-map.js' +import { LogSyncer } from './log-syncer.js' + +/** + * @param throwOnInvalidCommit - if true, throws if there is an error applying a commit, otherwise + * returns the state that was built so far. + */ +interface ApplyFullLogOpts { + throwOnInvalidCommit: boolean +} + +/** + * @param throwOnInvalidCommit - if true, throws if there is an error applying a commit, otherwise + * returns the state that was built so far. + * @param throwIfStale - if true, throws if the log to apply does not build directly on top of + * the existing state. + * @param throwOnConflict - if true, throws if the log to apply is rejected by conflict resolution. + */ +interface ApplyLogToStateOpts { + throwOnInvalidCommit: boolean + throwIfStale: boolean + throwOnConflict: boolean +} /** * Entirely stateless class for applying logs of Stream commits to StreamStates. Handles all @@ -6,22 +40,227 @@ import { AppliableStreamLog, StreamState } from '@ceramicnetwork/common' * persistence of the state information. */ export class StateManipulator { + constructor( + private readonly logger: DiagnosticsLogger, + private readonly streamTypeHandlers: HandlersMap, + private readonly context: Context, + private readonly logSyncer: LogSyncer + ) {} + + async _applyLog( + handler: StreamHandler, + state: StreamState | null, + log: AppliableStreamLog, + throwOnInvalidCommit: boolean + ): Promise { + for (const commit of log.commits) { + try { + state = await handler.applyCommit(commit, this.context, state) + } catch (err) { + if (throwOnInvalidCommit || state == null) { + throw err + } else { + return state + } + } + } + return state + } + /** * Applies a complete Stream log (including a Genesis commit) in order to create a StreamState. - * @param log + * @param streamType - the StreamType that this stream should be interpreted as. + * @param log - list of commits to apply to create the stream, starting with the genesis commit + * @param opts - options to control behavior during log application. + */ + async applyFullLog( + streamType: number, + log: AppliableStreamLog, + opts: ApplyFullLogOpts + ): Promise { + if (log.commits.length < 1) { + throw new Error(`Log must contain at least one commit to apply`) // this should be impossible + } + const handler = this.streamTypeHandlers.get(streamType) + + return this._applyLog(handler, null, log, opts.throwOnInvalidCommit) + } + + /** + * Transforms an UnappliableStreamLog to an AppliableStreamLog by getting the timestamp + * information from the log of an existing StreamState for the same stream. Because the source + * log comes from a StreamState that has already had its anchor commits validated, there is no + * need to re-do the work of anchor commit validation. + * Note the source log could be longer than the destination log and may contain anchors that + * are not a part of the destination log - it is important that we ignore timestamps from anchors + * that are not present in the destination log. + * @param source - a log of LogEntries (which contains less information than a full CommitData) + * from a StreamState for the stream in question. Must be at least as long as 'dest', and the + * entries must correspond 1-1 to the entries of 'dest' (up to the length of 'dest'). Must come + * from a StreamState where the timestamps from the anchor commits has already been validated. + * @param dest - a freshly synced log containing a subset of the entries of 'source', but with + * full CommitDatas that have all the information necessary to apply the commits, but are + * missing timestamps extracted from the AnchorCommits. */ - async applyFullLog(log: AppliableStreamLog): Promise { - throw new Error(`Not yet implemented`) + _copyTrustedTimestamps(source: Array, dest: UnappliableStreamLog): AppliableStreamLog { + let timestamp = null + for (let i = dest.commits.length - 1; i >= 0; i--) { + if (source[i].type == CommitType.ANCHOR) { + timestamp = source[i].timestamp + } + if (!source[i].cid.equals(dest.commits[i].cid)) { + // this should be impossible and would indicate programmer error if it happened. + // Included only as a sanity check. + throw new Error(`Source and dest logs don't correspond!`) + } + + dest.commits[i].expirationTime = source[i].expirationTime + dest.commits[i].timestamp = timestamp + } + + return { commits: dest.commits, timestampStatus: 'validated' } + } + + _findAnchorIndex(log: Array): number { + return log.findIndex((logEntry) => logEntry.type == CommitType.ANCHOR) } + /** + * Given two different Stream logs representing two different conflicting histories of the same + * Stream, pick which history to accept, in accordance with our conflict resolution strategy. + * The inputted logs should contain only the new commits past the divergence point between the + * two histories - there should be no commits in common between the two input logs. + * @param log1 + * @param log2 + * @returns the log that is selected by the conflict resolution rules. + */ + _pickLogToAccept(log1: Array, log2: Array): Array { + const firstAnchorIndexForLog1 = this._findAnchorIndex(log1) + const firstAnchorIndexForLog2 = this._findAnchorIndex(log2) + const isLog1Anchored = firstAnchorIndexForLog1 >= 0 + const isLog2Anchored = firstAnchorIndexForLog2 >= 0 + + // When one of the logs is anchored but not the other, take the one that is anchored + if (isLog1Anchored != isLog2Anchored) { + return isLog1Anchored ? log1 : log2 + } + + if (isLog1Anchored && isLog2Anchored) { + // When both logs are anchored, take the one anchored first. + const anchorTimestamp1 = log1[firstAnchorIndexForLog1].timestamp + const anchorTimestamp2 = log2[firstAnchorIndexForLog2].timestamp + if (anchorTimestamp1 < anchorTimestamp2) { + return log1 + } else if (anchorTimestamp2 < anchorTimestamp1) { + return log2 + } + } + + // When both logs are anchored in the same block (or neither log is anchored), compare log + // lengths until that anchor (or the end of the log if not anchored) and choose the one with + // longer length. + // TODO(CDB-2746) - it's kind of dumb that we only consider the log up until the first anchor. + // This is basically a holdover from the way conflict resolution was originally implemented, but + // changing it now would be a breaking change. + const relevantLength1 = isLog1Anchored ? firstAnchorIndexForLog1 + 1 : log1.length + const relevantLength2 = isLog2Anchored ? firstAnchorIndexForLog2 + 1 : log2.length + + if (relevantLength1 > relevantLength2) { + return log1 + } else if (relevantLength1 < relevantLength2) { + return log2 + } + + // If we got this far, that means that we don't have sufficient information to make a good + // decision about which log to choose. The most common way this can happen is that neither log + // is anchored, although it can also happen if both are anchored but in the same blockNumber or + // blockTimestamp. At this point, the decision of which log to take is arbitrary, but we want it + // to still be deterministic. Therefore, we take the log whose last entry has the lowest CID. + return log1[log1.length - 1].cid.bytes < log2[log2.length - 1].cid.bytes ? log1 : log2 + } + + async _applyLogToState_noCacaoVerification( + initialState: StreamState, + logToApply: AppliableStreamLog, + opts: ApplyLogToStateOpts + ): Promise { + if (logToApply.commits.length == 0) { + return initialState + } + + const handler = this.streamTypeHandlers.get(initialState.type) + + const firstNewCommit = logToApply.commits[0].commit + const initialTip = initialState.log[initialState.log.length - 1].cid + if (firstNewCommit.prev.equals(initialTip)) { + // the new log starts where the previous one ended + return this._applyLog(handler, initialState, logToApply, opts.throwOnInvalidCommit) + } + + // we have a conflict since prev is in the log of the local state, but isn't the tip + // BEGIN CONFLICT RESOLUTION + const conflictingTip = logToApply.commits[logToApply.commits.length - 1].cid + const streamId = StreamUtils.streamIdFromState(initialState) + if (opts.throwIfStale) { + // If this tip came from a client-initiated request and it doesn't build off the node's + // current local state, that means the client has a stale view of the data. Even if the new + // commit would win the arbitrary conflict resolution with the local state, that just + // increases the likelihood of lost writes. Clients should always at least be in sync with + // their Ceramic node when authoring new writes. + throw new Error( + `Commit to stream ${streamId.toString()} rejected because it builds on stale state. Calling 'sync()' on the stream handle will synchronize the stream state in the client with that on the Ceramic node. Rejected commit CID: ${conflictingTip}. Current tip: ${initialTip}` + ) + } + + // Index of the last entry that is shared between both histories + const conflictIdx = initialState.log.findIndex((entry) => entry.cid.equals(firstNewCommit.prev)) + const localConflictingLog = initialState.log.slice(conflictIdx + 1) + const selectedLog = this._pickLogToAccept(localConflictingLog, logToApply.commits) + if (selectedLog == localConflictingLog) { + if (opts.throwOnConflict) { + throw new Error( + `Commit to stream ${streamId.toString()} rejected by conflict resolution. Rejected commit CID: ${conflictingTip.toString()}. Current tip: ${initialTip.toString()}` + ) + } + return initialState + } + + // Remote log was selected. We need to build the state that corresponds to the new log. + + // First get the stream state at the divergence point + const sharedLogWithoutTimestamps = await this.logSyncer.syncFullLog( + streamId, + initialState.log[conflictIdx].cid + ) + const sharedLogWithTimestamps = this._copyTrustedTimestamps( + initialState.log, + sharedLogWithoutTimestamps + ) + const sharedState = await this._applyLog(handler, null, sharedLogWithTimestamps, true) + + // Now apply the new log to the shared state + return this._applyLog(handler, sharedState, logToApply, opts.throwOnInvalidCommit) + } /** * Applies a log of new commits to an existing StreamState to get the updated StreamState * resulting from applying the log. It's possible that the new StreamState could be the same as * the old one if the log is rejected due to conflict resolution. - * @param state - * @param log + * @param initialState - current StreamState to apply commits onto + * @param logToApply - list of commits to apply to the given StreamState + * @param opts - options to control behavior during log application. */ - async applyLogToState(state: StreamState, log: AppliableStreamLog): Promise { - throw new Error(`Not yet implemented`) + async applyLogToState( + initialState: StreamState, + logToApply: AppliableStreamLog, + opts: ApplyLogToStateOpts + ): Promise { + const state = await this._applyLogToState_noCacaoVerification(initialState, logToApply, opts) + + // The initial state may have included commits that were valid previously but have since had + // their CACAOs expire. Before returning the state back to the caller we should double-check + // that it is based all on valid commits without expired CACAOs. + StreamUtils.checkForCacaoExpiration(state) + + return state } } diff --git a/packages/core/src/stream-loading/stream-loader.ts b/packages/core/src/stream-loading/stream-loader.ts index 7646706e1e..09d1ca1171 100644 --- a/packages/core/src/stream-loading/stream-loader.ts +++ b/packages/core/src/stream-loading/stream-loader.ts @@ -31,7 +31,9 @@ export class StreamLoader { const logWithTimestamps = await this.anchorTimestampExtractor.verifyAnchorAndApplyTimestamps( logWithoutTimestamps ) - return await this.stateManipulator.applyFullLog(logWithTimestamps) + return this.stateManipulator.applyFullLog(streamID.type, logWithTimestamps, { + throwOnInvalidCommit: false, + }) } /** @@ -51,6 +53,10 @@ export class StreamLoader { const logWithTimestamps = await this.anchorTimestampExtractor.verifyAnchorAndApplyTimestamps( logWithoutTimestamps ) - return await this.stateManipulator.applyLogToState(state, logWithTimestamps) + return await this.stateManipulator.applyLogToState(state, logWithTimestamps, { + throwOnInvalidCommit: false, + throwIfStale: false, + throwOnConflict: false, + }) } }