diff --git a/packages/core/src/__tests__/state-manager.test.ts b/packages/core/src/__tests__/state-manager.test.ts index 4d84020037..a9289a74a8 100644 --- a/packages/core/src/__tests__/state-manager.test.ts +++ b/packages/core/src/__tests__/state-manager.test.ts @@ -14,7 +14,6 @@ import { IpfsApi, SignatureStatus, Stream, - StreamUtils, TestUtils, AnchorCommit, } from '@ceramicnetwork/common' @@ -25,20 +24,16 @@ import { createIPFS } from '@ceramicnetwork/ipfs-daemon' import { createCeramic } from './create-ceramic.js' import { Ceramic } from '../ceramic.js' import { TileDocument } from '@ceramicnetwork/stream-tile' -import { streamFromState } from '../state-management/stream-from-state.js' import * as uint8arrays from 'uint8arrays' import * as sha256 from '@stablelib/sha256' -import { CommitID, StreamID } from '@ceramicnetwork/streamid' +import { StreamID } from '@ceramicnetwork/streamid' import { from, Subject, timer, firstValueFrom } from 'rxjs' import { concatMap, map } from 'rxjs/operators' import { MAX_RESPONSE_INTERVAL } from '../pubsub/message-bus.js' -import cloneDeep from 'lodash.clonedeep' -import { StateLink } from '../state-management/state-link.js' import { InMemoryAnchorService } from '../anchor/memory/in-memory-anchor-service.js' import { whenSubscriptionDone } from './when-subscription-done.util.js' import { CASResponse, AnchorRequestStatusName } from '@ceramicnetwork/codecs' -const FAKE_CID = CID.parse('bafybeig6xv5nwphfmvcnektpnojts33jqcuam7bmye2pb54adnrtccjlsu') const INITIAL_CONTENT = { abc: 123, def: 456 } const STRING_MAP_SCHEMA = { $schema: 'http://json-schema.org/draft-07/schema#', @@ -154,279 +149,6 @@ describe('anchor', () => { await ceramic2.close() }) - test('commit history and atCommit', async () => { - const stream = await TileDocument.create(ceramic, INITIAL_CONTENT) - stream.subscribe() - const streamState = await ceramic.repository.load(stream.id, {}) - - const commit0 = stream.allCommitIds[0] - expect(stream.commitId).toEqual(commit0) - expect(commit0.equals(CommitID.make(streamState.id, streamState.id.cid))).toBeTruthy() - - await TestUtils.anchorUpdate(ceramic, stream) - expect(stream.allCommitIds.length).toEqual(2) - expect(stream.anchorCommitIds.length).toEqual(1) - const commit1 = stream.allCommitIds[1] - expect(commit1.equals(commit0)).toBeFalsy() - expect(commit1).toEqual(stream.commitId) - expect(commit1).toEqual(stream.anchorCommitIds[0]) - - const newContent = { abc: 321, def: 456, gh: 987 } - const updateRec = await stream.makeCommit(ceramic, newContent) - await ceramic.repository.applyCommit(streamState.id, updateRec, { - anchor: true, - publish: false, - }) - expect(stream.allCommitIds.length).toEqual(3) - expect(stream.anchorCommitIds.length).toEqual(1) - const commit2 = stream.allCommitIds[2] - expect(commit2.equals(commit1)).toBeFalsy() - expect(commit2).toEqual(stream.commitId) - - await TestUtils.anchorUpdate(ceramic, stream) - expect(stream.allCommitIds.length).toEqual(4) - expect(stream.anchorCommitIds.length).toEqual(2) - const commit3 = stream.allCommitIds[3] - expect(commit3.equals(commit2)).toBeFalsy() - expect(commit3).toEqual(stream.commitId) - expect(commit3).toEqual(stream.anchorCommitIds[1]) - expect(stream.content).toEqual(newContent) - expect(stream.state.signature).toEqual(SignatureStatus.SIGNED) - expect(stream.state.anchorStatus).not.toEqual(AnchorStatus.NOT_REQUESTED) - expect(stream.state.log.length).toEqual(4) - - // Apply a final commit that does not get anchored - const finalContent = { foo: 'bar' } - const updateRec2 = await stream.makeCommit(ceramic, finalContent) - await ceramic.repository.applyCommit(streamState.id, updateRec2, { - anchor: true, - publish: false, - }) - - expect(stream.allCommitIds.length).toEqual(5) - expect(stream.anchorCommitIds.length).toEqual(2) - const commit4 = stream.allCommitIds[4] - expect(commit4.equals(commit3)).toBeFalsy() - expect(commit4).toEqual(stream.commitId) - expect(commit4.equals(stream.anchorCommitIds[1])).toBeFalsy() - expect(stream.state.log.length).toEqual(5) - - await TestUtils.waitForAnchor(stream, 3000) - - // Correctly check out a specific commit - const streamStateOriginal = cloneDeep(streamState.state) - const streamV0 = await ceramic.repository.stateManager.atCommit(streamState, commit0) - expect(streamV0.id.equals(commit0.baseID)).toBeTruthy() - expect(streamV0.value.log.length).toEqual(1) - expect(streamV0.value.metadata.controllers).toEqual(controllers) - expect(streamV0.value.content).toEqual(INITIAL_CONTENT) - expect(streamV0.value.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) - - const streamV1 = await ceramic.repository.stateManager.atCommit(streamState, commit1) - expect(streamV1.id.equals(commit1.baseID)).toBeTruthy() - expect(streamV1.value.log.length).toEqual(2) - expect(streamV1.value.metadata.controllers).toEqual(controllers) - expect(streamV1.value.content).toEqual(INITIAL_CONTENT) - expect(streamV1.value.anchorStatus).toEqual(AnchorStatus.ANCHORED) - - const streamV2 = await ceramic.repository.stateManager.atCommit(streamState, commit2) - expect(streamV2.id.equals(commit2.baseID)).toBeTruthy() - expect(streamV2.value.log.length).toEqual(3) - expect(streamV2.value.metadata.controllers).toEqual(controllers) - expect(streamV2.value.next.content).toEqual(newContent) - expect(streamV2.value.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) - - const streamV3 = await ceramic.repository.stateManager.atCommit(streamState, commit3) - expect(streamV3.id.equals(commit3.baseID)).toBeTruthy() - expect(streamV3.value.log.length).toEqual(4) - expect(streamV3.value.metadata.controllers).toEqual(controllers) - expect(streamV3.value.content).toEqual(newContent) - expect(streamV3.value.anchorStatus).toEqual(AnchorStatus.ANCHORED) - - const streamV4 = await ceramic.repository.stateManager.atCommit(streamState, commit4) - expect(streamV4.id.equals(commit4.baseID)).toBeTruthy() - expect(streamV4.value.log.length).toEqual(5) - expect(streamV4.value.metadata.controllers).toEqual(controllers) - expect(streamV4.value.next.content).toEqual(finalContent) - expect(streamV4.value.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) - - // Ensure that stateManager.atCommit does not mutate the passed in state object - expect(StreamUtils.serializeState(streamState.state)).toEqual( - StreamUtils.serializeState(streamStateOriginal) - ) - }) - - describe('atCommit', () => { - test('non-existing commit', async () => { - const stream = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false }) - const streamState = await ceramic.repository.load(stream.id, {}) - // Emulate loading a non-existing commit - const nonExistentCommitID = CommitID.make(stream.id, FAKE_CID) - const originalRetrieve = ceramic.dispatcher.retrieveCommit.bind(ceramic.dispatcher) - ceramic.dispatcher.retrieveCommit = jest.fn(async (cid: CID) => { - if (cid.equals(FAKE_CID)) { - return null - } else { - return originalRetrieve(cid) - } - }) - await expect( - ceramic.repository.stateManager.atCommit(streamState, nonExistentCommitID) - ).rejects.toThrow(`No commit found for CID ${nonExistentCommitID.commit?.toString()}`) - }) - - test('return read-only snapshot', async () => { - const stream1 = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { - anchor: false, - syncTimeoutSeconds: 0, - }) - await stream1.update({ abc: 321, def: 456, gh: 987 }) - await TestUtils.anchorUpdate(ceramic, stream1) - - const ceramic2 = await createCeramic(ipfs, { anchorOnRequest: false }) - const stream2 = await TileDocument.load(ceramic, stream1.id) - const streamState2 = await ceramic2.repository.load(stream2.id, { syncTimeoutSeconds: 0 }) - const snapshot = await ceramic2.repository.stateManager.atCommit( - streamState2, - stream1.commitId - ) - - expect(StreamUtils.statesEqual(snapshot.state, stream1.state)) - const snapshotStream = streamFromState( - ceramic2.context, - ceramic2._streamHandlers, - snapshot.value - ) - - // Snapshot is read-only - await expect(snapshotStream.update({ abc: 1010 })).rejects.toThrow( - 'Historical stream commits cannot be modified. Load the stream without specifying a commit to make updates.' - ) - - // We fast-forward streamState2, because the commit is legit - expect(streamState2.state).toEqual(stream1.state) - - await ceramic2.close() - }) - - test('return read-only snapshot: do not lose own anchor status', async () => { - // Prepare commits to play - const tile1 = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { - anchor: false, - syncTimeoutSeconds: 0, - }) - await tile1.update({ a: 1 }) - - // Let's pretend we have a stream in PENDING state - const pendingState = { - ...tile1.state, - anchorStatus: AnchorStatus.PENDING, - } - const base$ = new StateLink(pendingState) - // We request a snapshot at the latest commit - const snapshot = await ceramic.repository.stateManager.atCommit(base$, tile1.commitId) - // Do not fast-forward the base state: retain PENDING anchor status - expect(base$.state).toBe(pendingState) - // The snapshot is reported to be anchored though - expect(snapshot.state.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) - }) - - test('commit ahead of current state', async () => { - const stream = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false }) - const streamState = await ceramic.repository.load(stream.id, {}) - // Provide a new commit that the repository doesn't currently know about - const newContent = { abc: 321, def: 456, gh: 987 } - const updateCommit = await stream.makeCommit(ceramic, newContent) - const futureCommitCID = await ceramic.dispatcher.storeCommit(updateCommit) - const futureCommitID = CommitID.make(stream.id, futureCommitCID) - - // Now load the stream at a commitID ahead of what is currently in the state in the repository. - // The existing RunningState from the repository should also get updated - const state$ = await ceramic.repository.load(futureCommitID.baseID, {}) - const snapshot = await ceramic.repository.stateManager.atCommit(state$, futureCommitID) - expect(snapshot.value.next.content).toEqual(newContent) - expect(snapshot.value.log.length).toEqual(2) - expect(StreamUtils.serializeState(streamState.state)).toEqual( - StreamUtils.serializeState(snapshot.value) - ) - }) - }) - - test('handles basic conflict', async () => { - const stream1 = await TileDocument.create(ceramic, INITIAL_CONTENT) - stream1.subscribe() - const streamState1 = await ceramic.repository.load(stream1.id, {}) - const streamId = stream1.id - await TestUtils.anchorUpdate(ceramic, stream1) - const tipPreUpdate = stream1.tip - - const newContent = { abc: 321, def: 456, gh: 987 } - let updateRec = await stream1.makeCommit(ceramic, newContent) - await ceramic.repository.applyCommit(streamState1.id, updateRec, { - anchor: true, - publish: false, - }) - - await TestUtils.anchorUpdate(ceramic, stream1) - expect(stream1.content).toEqual(newContent) - const tipValidUpdate = stream1.tip - // create invalid change that happened after main change - - const initialState = await ceramic.repository.stateManager - .atCommit(streamState1, CommitID.make(streamId, streamId.cid)) - .then((stream) => stream.state) - const state$ = new RunningState(initialState, true) - ceramic.repository.add(state$) - await ceramic.repository._internals.handleTip(state$, tipPreUpdate) - await new Promise((resolve) => setTimeout(resolve, 1000)) - - const conflictingNewContent = { asdf: 2342 } - const stream2 = streamFromState( - ceramic.context, - ceramic._streamHandlers, - state$.value, - ceramic.repository.updates$ - ) - stream2.subscribe() - updateRec = await stream2.makeCommit(ceramic, conflictingNewContent) - await ceramic.repository.applyCommit(state$.id, updateRec, { - anchor: true, - publish: false, - }) - - await TestUtils.anchorUpdate(ceramic, stream2) - const tipInvalidUpdate = state$.tip - expect(stream2.content).toEqual(conflictingNewContent) - // loading tip from valid log to stream with invalid - // log results in valid state - await ceramic.repository._internals.handleTip(state$, tipValidUpdate) - expect(stream2.content).toEqual(newContent) - - // loading tip from invalid log to stream with valid - // log results in valid state - await ceramic.repository._internals.handleTip(streamState1, tipInvalidUpdate) - expect(stream1.content).toEqual(newContent) - - // Loading valid commit works - const streamState1Original = cloneDeep(streamState1.state) - const streamAtValidCommit = await ceramic.repository.stateManager.atCommit( - streamState1, - CommitID.make(streamId, tipValidUpdate) - ) - expect(streamAtValidCommit.value.content).toEqual(newContent) - - // Loading invalid commit fails - await expect( - ceramic.repository.stateManager.atCommit( - streamState1, - CommitID.make(streamId, tipInvalidUpdate) - ) - ).rejects.toThrow(/rejected by conflict resolution/) - - // Ensure that stateManager.atCommit does not mutate the passed in state object - expect(JSON.stringify(streamState1.state)).toEqual(JSON.stringify(streamState1Original)) - }, 10000) - test('enforces schema in update that assigns schema', async () => { const schemaDoc = await TileDocument.create(ceramic, STRING_MAP_SCHEMA) await TestUtils.anchorUpdate(ceramic, schemaDoc) diff --git a/packages/core/src/conflict-resolution.ts b/packages/core/src/conflict-resolution.ts index 8822926850..ca4d3025c6 100644 --- a/packages/core/src/conflict-resolution.ts +++ b/packages/core/src/conflict-resolution.ts @@ -374,6 +374,9 @@ export class ConflictResolution { // rejected it. const commitIndex = baseStateLog.findIndex(commitId.commit) if (commitIndex < 0) { + // Note this should never happen. Since we set `throwOnConflict` in the opts, applyTip() + // have thrown already if the commit was rejected by conflict resolution. It would indicate + // programming error if this Error was ever actually thrown. throw new Error( `Requested commit CID ${commitId.commit.toString()} not found in the log for stream ${commitId.baseID.toString()}` ) diff --git a/packages/core/src/dispatcher.ts b/packages/core/src/dispatcher.ts index 77ebd853dc..17c1524ea9 100644 --- a/packages/core/src/dispatcher.ts +++ b/packages/core/src/dispatcher.ts @@ -387,6 +387,8 @@ export class Dispatcher { if (retries > 0) { continue + } else { + throw new Error(`Timeout error while loading CID ${asCid.toString()} from IPFS: ${err}`) } } diff --git a/packages/core/src/state-management/__tests__/repository.test.ts b/packages/core/src/state-management/__tests__/repository.test.ts index ee1ca21def..44f61dd077 100644 --- a/packages/core/src/state-management/__tests__/repository.test.ts +++ b/packages/core/src/state-management/__tests__/repository.test.ts @@ -1,7 +1,9 @@ import { jest } from '@jest/globals' import { + AnchorStatus, CommitType, IpfsApi, + SignatureStatus, StreamState, StreamUtils, SyncOptions, @@ -13,8 +15,12 @@ import { createIPFS } from '@ceramicnetwork/ipfs-daemon' import { Repository } from '../repository.js' import { createCeramic } from '../../__tests__/create-ceramic.js' import { TileDocumentHandler } from '@ceramicnetwork/stream-tile-handler' -import { StreamID } from '@ceramicnetwork/streamid' +import { CommitID, StreamID } from '@ceramicnetwork/streamid' import { RunningState } from '../running-state.js' +import { streamFromState } from '../stream-from-state.js' +import cloneDeep from 'lodash.clonedeep' +import { CID } from 'multiformats/cid' +import { StateLink } from '../state-link.js' const STRING_MAP_SCHEMA = { $schema: 'http://json-schema.org/draft-07/schema#', @@ -189,6 +195,293 @@ describe('#load', () => { expect(syncSpy).toBeCalledTimes(2) }, 30000) + describe('loadAtCommit', () => { + const INITIAL_CONTENT = { abc: 123, def: 456 } + + test('commit history and loadAtCommit', async () => { + const stream = await TileDocument.create(ceramic, INITIAL_CONTENT) + stream.subscribe() + const streamState = await ceramic.repository.load(stream.id, {}) + + const commit0 = stream.allCommitIds[0] + expect(stream.commitId).toEqual(commit0) + expect(commit0.equals(CommitID.make(streamState.id, streamState.id.cid))).toBeTruthy() + + await TestUtils.anchorUpdate(ceramic, stream) + expect(stream.allCommitIds.length).toEqual(2) + expect(stream.anchorCommitIds.length).toEqual(1) + const commit1 = stream.allCommitIds[1] + expect(commit1.equals(commit0)).toBeFalsy() + expect(commit1).toEqual(stream.commitId) + expect(commit1).toEqual(stream.anchorCommitIds[0]) + + const newContent = { abc: 321, def: 456, gh: 987 } + const updateRec = await stream.makeCommit(ceramic, newContent) + await ceramic.repository.applyCommit(streamState.id, updateRec, { + anchor: true, + publish: false, + }) + expect(stream.allCommitIds.length).toEqual(3) + expect(stream.anchorCommitIds.length).toEqual(1) + const commit2 = stream.allCommitIds[2] + expect(commit2.equals(commit1)).toBeFalsy() + expect(commit2).toEqual(stream.commitId) + + await TestUtils.anchorUpdate(ceramic, stream) + expect(stream.allCommitIds.length).toEqual(4) + expect(stream.anchorCommitIds.length).toEqual(2) + const commit3 = stream.allCommitIds[3] + expect(commit3.equals(commit2)).toBeFalsy() + expect(commit3).toEqual(stream.commitId) + expect(commit3).toEqual(stream.anchorCommitIds[1]) + expect(stream.content).toEqual(newContent) + expect(stream.state.signature).toEqual(SignatureStatus.SIGNED) + expect(stream.state.anchorStatus).not.toEqual(AnchorStatus.NOT_REQUESTED) + expect(stream.state.log.length).toEqual(4) + + // Apply a final commit that does not get anchored + const finalContent = { foo: 'bar' } + const updateRec2 = await stream.makeCommit(ceramic, finalContent) + await ceramic.repository.applyCommit(streamState.id, updateRec2, { + anchor: true, + publish: false, + }) + + expect(stream.allCommitIds.length).toEqual(5) + expect(stream.anchorCommitIds.length).toEqual(2) + const commit4 = stream.allCommitIds[4] + expect(commit4.equals(commit3)).toBeFalsy() + expect(commit4).toEqual(stream.commitId) + expect(commit4.equals(stream.anchorCommitIds[1])).toBeFalsy() + expect(stream.state.log.length).toEqual(5) + + await TestUtils.anchorUpdate(ceramic, stream) + + // Correctly check out a specific commit + const streamStateOriginal = cloneDeep(streamState.state) + const streamV0 = await ceramic.repository.loadAtCommit(commit0, { syncTimeoutSeconds: 0 }) + expect(streamV0.id.equals(commit0.baseID)).toBeTruthy() + expect(streamV0.value.log.length).toEqual(1) + expect(streamV0.value.metadata.controllers).toEqual([ceramic.did.id]) + expect(streamV0.value.content).toEqual(INITIAL_CONTENT) + expect(streamV0.value.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + + // Ensure that loading at a CommitID does not mutate the state object that exists in the + // Repository's cache + expect(StreamUtils.serializeState(streamState.state)).toEqual( + StreamUtils.serializeState(streamStateOriginal) + ) + + const streamV1 = await ceramic.repository.loadAtCommit(commit1, { syncTimeoutSeconds: 0 }) + expect(streamV1.id.equals(commit1.baseID)).toBeTruthy() + expect(streamV1.value.log.length).toEqual(2) + expect(streamV1.value.metadata.controllers).toEqual([ceramic.did.id]) + expect(streamV1.value.content).toEqual(INITIAL_CONTENT) + expect(streamV1.value.anchorStatus).toEqual(AnchorStatus.ANCHORED) + + const streamV2 = await ceramic.repository.loadAtCommit(commit2, { syncTimeoutSeconds: 0 }) + expect(streamV2.id.equals(commit2.baseID)).toBeTruthy() + expect(streamV2.value.log.length).toEqual(3) + expect(streamV2.value.metadata.controllers).toEqual([ceramic.did.id]) + expect(streamV2.value.next.content).toEqual(newContent) + expect(streamV2.value.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + + const streamV3 = await ceramic.repository.loadAtCommit(commit3, { syncTimeoutSeconds: 0 }) + expect(streamV3.id.equals(commit3.baseID)).toBeTruthy() + expect(streamV3.value.log.length).toEqual(4) + expect(streamV3.value.metadata.controllers).toEqual([ceramic.did.id]) + expect(streamV3.value.content).toEqual(newContent) + expect(streamV3.value.anchorStatus).toEqual(AnchorStatus.ANCHORED) + + const streamV4 = await ceramic.repository.loadAtCommit(commit4, { syncTimeoutSeconds: 0 }) + expect(streamV4.id.equals(commit4.baseID)).toBeTruthy() + expect(streamV4.value.log.length).toEqual(5) + expect(streamV4.value.metadata.controllers).toEqual([ceramic.did.id]) + expect(streamV4.value.next.content).toEqual(finalContent) + expect(streamV4.value.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + }) + + test('non-existing commit', async () => { + const stream = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false }) + // Emulate loading a non-existing commit + const fakeCid = TestUtils.randomCID() + const nonExistentCommitID = CommitID.make(stream.id, fakeCid) + const originalRetrieve = ceramic.dispatcher.retrieveCommit.bind(ceramic.dispatcher) + ceramic.dispatcher.retrieveCommit = jest.fn(async (cid: CID) => { + if (cid.equals(fakeCid)) { + return null + } else { + return originalRetrieve(cid) + } + }) + await expect( + ceramic.repository.loadAtCommit(nonExistentCommitID, { syncTimeoutSeconds: 0 }) + ).rejects.toThrow(`No commit found for CID ${nonExistentCommitID.commit?.toString()}`) + }) + + test('loadAtCommit returns read-only snapshot', async () => { + const stream1 = await TileDocument.create(ceramic, { foo: 'bar' }, null, { + anchor: false, + }) + await stream1.update({ abc: 321, def: 456, gh: 987 }) + await TestUtils.anchorUpdate(ceramic, stream1) + + const ceramic2 = await createCeramic(ipfs, { anchorOnRequest: false }) + const streamState2 = await ceramic2.repository.load(stream1.id, { + sync: SyncOptions.NEVER_SYNC, + syncTimeoutSeconds: 0, + }) + expect(streamState2.state.log.length).toEqual(1) + const snapshot = await ceramic2.repository.loadAtCommit(stream1.commitId, { + syncTimeoutSeconds: 0, + }) + + expect(StreamUtils.statesEqual(snapshot.state, stream1.state)) + const snapshotStream = streamFromState( + ceramic2.context, + ceramic2._streamHandlers, + snapshot.value + ) + + // Snapshot is read-only + await expect(snapshotStream.update({ abc: 1010 })).rejects.toThrow( + 'Historical stream commits cannot be modified. Load the stream without specifying a commit to make updates.' + ) + + // We fast-forward streamState2, because the commit is legit + expect(streamState2.state).toEqual(stream1.state) + + await ceramic2.close() + }) + + test('return read-only snapshot: do not lose own anchor status', async () => { + // Prepare commits to play + const tile1 = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { + anchor: false, + syncTimeoutSeconds: 0, + }) + await tile1.update({ a: 1 }) + + // Let's pretend we have a stream in PENDING state + const pendingState = { + ...tile1.state, + anchorStatus: AnchorStatus.PENDING, + } + const base$ = new StateLink(pendingState) + + const loadSpy = jest.spyOn(ceramic.repository, 'load') + loadSpy.mockImplementationOnce(async () => { + return base$ as any as RunningState + }) + // We request a snapshot at the latest commit + const snapshot = await ceramic.repository.loadAtCommit(tile1.commitId, { + syncTimeoutSeconds: 0, + }) + // Do not fast-forward the base state: retain PENDING anchor status + expect(base$.state).toBe(pendingState) + // The snapshot is reported to be anchored though + expect(snapshot.state.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + }) + + test('commit ahead of current state', async () => { + const stream = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false }) + const streamState = await ceramic.repository.load(stream.id, {}) + // Provide a new commit that the repository doesn't currently know about + const newContent = { abc: 321, def: 456, gh: 987 } + const updateCommit = await stream.makeCommit(ceramic, newContent) + const futureCommitCID = await ceramic.dispatcher.storeCommit(updateCommit) + const futureCommitID = CommitID.make(stream.id, futureCommitCID) + + // Now load the stream at a commitID ahead of what is currently in the state in the repository. + // The existing RunningState from the repository should also get updated + const snapshot = await ceramic.repository.loadAtCommit(futureCommitID, { + syncTimeoutSeconds: 0, + }) + expect(snapshot.value.next.content).toEqual(newContent) + expect(snapshot.value.log.length).toEqual(2) + expect(StreamUtils.serializeState(streamState.state)).toEqual( + StreamUtils.serializeState(snapshot.value) + ) + }) + + test('handles basic conflict', async () => { + const stream1 = await TileDocument.create(ceramic, INITIAL_CONTENT) + stream1.subscribe() + const streamState1 = await ceramic.repository.load(stream1.id, {}) + const streamId = stream1.id + await TestUtils.anchorUpdate(ceramic, stream1) + const tipPreUpdate = stream1.tip + + const newContent = { abc: 321, def: 456, gh: 987 } + let updateRec = await stream1.makeCommit(ceramic, newContent) + await ceramic.repository.applyCommit(streamState1.id, updateRec, { + anchor: true, + publish: false, + }) + + await TestUtils.anchorUpdate(ceramic, stream1) + expect(stream1.content).toEqual(newContent) + const tipValidUpdate = stream1.tip + // create invalid change that happened after main change + + const initialState = await ceramic.repository + .loadAtCommit(CommitID.make(streamId, streamId.cid), { syncTimeoutSeconds: 0 }) + .then((stream) => stream.state) + const state$ = new RunningState(initialState, true) + ceramic.repository.add(state$) + await ceramic.repository._internals.handleTip(state$, tipPreUpdate) + await new Promise((resolve) => setTimeout(resolve, 1000)) + + const conflictingNewContent = { asdf: 2342 } + const stream2 = streamFromState( + ceramic.context, + ceramic._streamHandlers, + state$.value, + ceramic.repository.updates$ + ) + stream2.subscribe() + updateRec = await stream2.makeCommit(ceramic, conflictingNewContent) + await ceramic.repository.applyCommit(state$.id, updateRec, { + anchor: true, + publish: false, + }) + + await TestUtils.anchorUpdate(ceramic, stream2) + const tipInvalidUpdate = state$.tip + expect(stream2.content).toEqual(conflictingNewContent) + // loading tip from valid log to stream with invalid + // log results in valid state + await ceramic.repository._internals.handleTip(state$, tipValidUpdate) + expect(stream2.content).toEqual(newContent) + + // loading tip from invalid log to stream with valid + // log results in valid state + await ceramic.repository._internals.handleTip(streamState1, tipInvalidUpdate) + expect(stream1.content).toEqual(newContent) + + // Loading valid commit works + const streamState1Original = cloneDeep(streamState1.state) + const streamAtValidCommit = await ceramic.repository.loadAtCommit( + CommitID.make(streamId, tipValidUpdate), + { syncTimeoutSeconds: 0 } + ) + expect(streamAtValidCommit.value.content).toEqual(newContent) + + // Loading invalid commit fails + await expect( + ceramic.repository.loadAtCommit(CommitID.make(streamId, tipInvalidUpdate), { + syncTimeoutSeconds: 0, + }) + ).rejects.toThrow(/rejected by conflict resolution/) + + // Ensure that loading at a CommitID does not mutate the state object that exists in the + // Repository's cache + expect(StreamUtils.serializeState(streamState1.state)).toEqual( + StreamUtils.serializeState(streamState1Original) + ) + }, 10000) + }) + describe('sync: SYNC_ALWAYS', () => { describe('pinned', () => { test('revalidate current state, rewrite', async () => { diff --git a/packages/core/src/stream-loading/__tests__/stream-loader.test.ts b/packages/core/src/stream-loading/__tests__/stream-loader.test.ts index a5c72e0741..059fed9311 100644 --- a/packages/core/src/stream-loading/__tests__/stream-loader.test.ts +++ b/packages/core/src/stream-loading/__tests__/stream-loader.test.ts @@ -21,6 +21,8 @@ import { StreamLoader } from '../stream-loader.js' import { TipFetcher } from '../tip-fetcher.js' import { AnchorTimestampExtractor } from '../anchor-timestamp-extractor.js' import { InMemoryAnchorService } from '../../anchor/memory/in-memory-anchor-service.js' +import { CommitID } from '@ceramicnetwork/streamid' +import cloneDeep from 'lodash.clonedeep' const TOPIC = '/ceramic/test12345' const CONTENT0 = { step: 0 } @@ -126,4 +128,123 @@ describe('StreamLoader test', () => { ) }) }) + + describe('syncStream', () => { + test('basic stream sync', async () => { + const doc = await TileDocument.create(ceramic, CONTENT0) + + const state0 = await streamLoader.loadStream(doc.id, 3) + expectStatesEqualWithPendingAnchor(doc.state, state0) + + await TestUtils.anchorUpdate(ceramic, doc) + const state1 = await streamLoader.syncStream(state0, 3) + expect(state1).not.toEqual(state0) + expect(StreamUtils.serializeState(state1)).toEqual(StreamUtils.serializeState(doc.state)) + + await doc.update(CONTENT1) + const state2A = await streamLoader.syncStream(state0, 3) + const state2B = await streamLoader.syncStream(state1, 3) + expectStatesEqualWithPendingAnchor(doc.state, state2A) + expectStatesEqualWithPendingAnchor(doc.state, state2B) + expect(state2A).toEqual(state2B) + + await doc.update(CONTENT2) + await TestUtils.anchorUpdate(ceramic, doc) + + const state3A = await streamLoader.syncStream(state0, 3) + const state3B = await streamLoader.syncStream(state1, 3) + const state3C = await streamLoader.syncStream(state2A, 3) + expect(StreamUtils.serializeState(state3A)).toEqual(StreamUtils.serializeState(doc.state)) + expect(StreamUtils.serializeState(state3B)).toEqual(StreamUtils.serializeState(doc.state)) + expect(StreamUtils.serializeState(state3C)).toEqual(StreamUtils.serializeState(doc.state)) + }) + }) + + describe('stateAtCommit', () => { + test('basic ability to load stream at various CommitIDs', async () => { + const doc = await TileDocument.create(ceramic, CONTENT0) + await TestUtils.anchorUpdate(ceramic, doc) + await doc.update(CONTENT1) + await doc.update(CONTENT2) + await TestUtils.anchorUpdate(ceramic, doc) + const commits = doc.allCommitIds + expect(commits.length).toEqual(5) + for (const commit of commits) { + expect(doc.id.toString()).toEqual(commit.baseID.toString()) + } + + const stateV0 = await streamLoader.stateAtCommit(doc.state, commits[0]) + expect(stateV0.log.length).toEqual(1) + expect(stateV0.content).toEqual(CONTENT0) + expect(stateV0.next).toBeUndefined() + expect(stateV0.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + + const stateV1 = await streamLoader.stateAtCommit(doc.state, commits[1]) + expect(stateV1.log.length).toEqual(2) + expect(stateV1.content).toEqual(CONTENT0) + expect(stateV1.next).toBeUndefined() + expect(stateV1.anchorStatus).toEqual(AnchorStatus.ANCHORED) + + const stateV2 = await streamLoader.stateAtCommit(doc.state, commits[2]) + expect(stateV2.log.length).toEqual(3) + expect(stateV2.content).toEqual(CONTENT0) + expect(stateV2.next.content).toEqual(CONTENT1) + expect(stateV2.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + + const stateV3 = await streamLoader.stateAtCommit(doc.state, commits[3]) + expect(stateV3.log.length).toEqual(4) + expect(stateV3.content).toEqual(CONTENT0) + expect(stateV3.next.content).toEqual(CONTENT2) + expect(stateV3.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + + const stateV4 = await streamLoader.stateAtCommit(doc.state, commits[4]) + expect(stateV4.log.length).toEqual(5) + expect(stateV4.content).toEqual(CONTENT2) + expect(stateV4.next).toBeUndefined() + expect(stateV4.anchorStatus).toEqual(AnchorStatus.ANCHORED) + }) + + test('commit ahead of current state', async () => { + const stream = await TileDocument.create(ceramic, CONTENT0) + const initialState = cloneDeep(stream.state) + await stream.update(CONTENT1) + await TestUtils.anchorUpdate(ceramic, stream) + + // Now load the stream at a commitID ahead of what is currently in the state + const updatedState1 = await streamLoader.stateAtCommit(initialState, stream.allCommitIds[1]) + expect(updatedState1.log.length).toEqual(2) + expect(updatedState1.content).toEqual(CONTENT0) + expect(updatedState1.next.content).toEqual(CONTENT1) + expect(updatedState1.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED) + + const updatedState2 = await streamLoader.stateAtCommit(initialState, stream.allCommitIds[2]) + expect(updatedState2.log.length).toEqual(3) + expect(updatedState2.content).toEqual(CONTENT1) + expect(updatedState2.next).toBeUndefined() + expect(updatedState2.anchorStatus).toEqual(AnchorStatus.ANCHORED) + }) + + test('non-existent commit', async () => { + const doc = await TileDocument.create(ceramic, CONTENT0) + + const nonExistentCommitID = CommitID.make(doc.id, TestUtils.randomCID()) + + await expect(streamLoader.stateAtCommit(doc.state, nonExistentCommitID)).rejects.toThrow( + /Timeout error while loading CID/ + ) + }) + + test('throw if commit rejected by conflict resolution', async () => { + const stream = await TileDocument.create(ceramic, CONTENT0) + const conflictingUpdate = await stream.makeCommit(ceramic, CONTENT2) + await stream.update(CONTENT1) + await TestUtils.anchorUpdate(ceramic, stream) + + const conflictingUpdateCID = await dispatcher.storeCommit(conflictingUpdate) + + await expect( + streamLoader.stateAtCommit(stream.state, CommitID.make(stream.id, conflictingUpdateCID)) + ).rejects.toThrow(/rejected by conflict resolution/) + }) + }) }) diff --git a/packages/core/src/stream-loading/state-manipulator.ts b/packages/core/src/stream-loading/state-manipulator.ts index 12480a29d3..e1a9c070c1 100644 --- a/packages/core/src/stream-loading/state-manipulator.ts +++ b/packages/core/src/stream-loading/state-manipulator.ts @@ -10,6 +10,7 @@ import { StreamUtils, UnappliableStreamLog, } from '@ceramicnetwork/common' +import { CID } from 'multiformats/cid' import { HandlersMap } from '../handlers-map.js' import { LogSyncer } from './log-syncer.js' @@ -28,7 +29,7 @@ interface ApplyFullLogOpts { * the existing state. * @param throwOnConflict - if true, throws if the log to apply is rejected by conflict resolution. */ -interface ApplyLogToStateOpts { +export interface ApplyLogToStateOpts { throwOnInvalidCommit: boolean throwIfStale: boolean throwOnConflict: boolean @@ -228,19 +229,43 @@ export class StateManipulator { // Remote log was selected. We need to build the state that corresponds to the new log. // First get the stream state at the divergence point - const sharedLogWithoutTimestamps = await this.logSyncer.syncFullLog( - streamId, + const sharedState = await this.resetStateToCommit( + initialState, initialState.log[conflictIdx].cid ) + + // Now apply the new log to the shared state + return this._applyLog(handler, sharedState, logToApply, opts.throwOnInvalidCommit) + } + + /** + * Given a StreamState and the CID of a commit in that state's log, return a new StreamState + * representing the state of the Stream as of that commit CID. + * @param initialState + * @param commitCid - cid of a commit that must be in the log of 'initialState' + */ + async resetStateToCommit(initialState: StreamState, commitCid: CID): Promise { + const streamId = StreamUtils.streamIdFromState(initialState) + // It is invalid to call this function if the commitCid is not a part of the initialState log. + const commitIndex = initialState.log.findIndex((logEntry) => logEntry.cid.equals(commitCid)) + if (commitIndex < 0) { + // Note this should never happen - it would indicate a programmer bug if this Error was ever + // actually thrown. + throw new Error( + `Requested commit CID ${commitCid.toString()} not found in the log for stream ${streamId.toString()}` + ) + } + + const sharedLogWithoutTimestamps = await this.logSyncer.syncFullLog(streamId, commitCid) const sharedLogWithTimestamps = this._copyTrustedTimestamps( initialState.log, sharedLogWithoutTimestamps ) - const sharedState = await this._applyLog(handler, null, sharedLogWithTimestamps, true) - // Now apply the new log to the shared state - return this._applyLog(handler, sharedState, logToApply, opts.throwOnInvalidCommit) + const handler = this.streamTypeHandlers.get(initialState.type) + return this._applyLog(handler, null, sharedLogWithTimestamps, true) } + /** * Applies a log of new commits to an existing StreamState to get the updated StreamState * resulting from applying the log. It's possible that the new StreamState could be the same as diff --git a/packages/core/src/stream-loading/stream-loader.ts b/packages/core/src/stream-loading/stream-loader.ts index 09d1ca1171..238d2e2348 100644 --- a/packages/core/src/stream-loading/stream-loader.ts +++ b/packages/core/src/stream-loading/stream-loader.ts @@ -1,9 +1,10 @@ import { LogSyncer } from './log-syncer.js' import { TipFetcher } from './tip-fetcher.js' -import { StateManipulator } from './state-manipulator.js' +import { ApplyLogToStateOpts, StateManipulator } from './state-manipulator.js' import { AnchorTimestampExtractor } from './anchor-timestamp-extractor.js' import { DiagnosticsLogger, StreamState, StreamUtils } from '@ceramicnetwork/common' -import { StreamID } from '@ceramicnetwork/streamid' +import { CommitID, StreamID } from '@ceramicnetwork/streamid' +import { CID } from 'multiformats/cid' /** * Class to contain all the logic for loading a stream, including fetching the relevant commit @@ -45,6 +46,20 @@ export class StreamLoader { async syncStream(state: StreamState, syncTimeoutSecs: number): Promise { const streamID = StreamUtils.streamIdFromState(state) const tip = await this.tipFetcher.findTip(streamID, syncTimeoutSecs) + + return this._applyTipToState(state, tip, { + throwOnInvalidCommit: false, + throwIfStale: false, + throwOnConflict: false, + }) + } + + private async _applyTipToState( + state: StreamState, + tip: CID, + opts: ApplyLogToStateOpts + ): Promise { + const streamID = StreamUtils.streamIdFromState(state) const logWithoutTimestamps = await this.logSyncer.syncLogUntilMatch( streamID, tip, @@ -53,10 +68,30 @@ export class StreamLoader { const logWithTimestamps = await this.anchorTimestampExtractor.verifyAnchorAndApplyTimestamps( logWithoutTimestamps ) - return await this.stateManipulator.applyLogToState(state, logWithTimestamps, { - throwOnInvalidCommit: false, - throwIfStale: false, - throwOnConflict: false, - }) + return await this.stateManipulator.applyLogToState(state, logWithTimestamps, opts) + } + + /** + * Given the currently known about StreamState for a Stream, return the state of that stream + * at a specific CommitID. + * @param state + * @param commitId + */ + async stateAtCommit(initialState: StreamState, commitId: CommitID): Promise { + // Throw if any commit fails to apply as we are trying to load at a specific commit and want + // to error if we can't. + const opts = { throwOnInvalidCommit: true, throwOnConflict: true, throwIfStale: false } + + // If 'commit' is ahead of 'initialState', sync state up to 'commit' + const baseState = await this._applyTipToState(initialState, commitId.commit, opts) + + // If the commitId is now the tip, we're done. + if (baseState.log[baseState.log.length - 1].cid.equals(commitId.commit)) { + return baseState + } + + // If the requested commit is included in the log, but isn't the most recent commit, we need + // to reset the state to the state at the requested commit. + return this.stateManipulator.resetStateToCommit(baseState, commitId.commit) } }