Skip to content

Commit

Permalink
feat: Add functionality to StreamLoader to load at a CommitID (#2950)
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody authored Sep 13, 2023
1 parent 510e811 commit 8103038
Show file tree
Hide file tree
Showing 7 changed files with 494 additions and 293 deletions.
280 changes: 1 addition & 279 deletions packages/core/src/__tests__/state-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {
IpfsApi,
SignatureStatus,
Stream,
StreamUtils,
TestUtils,
AnchorCommit,
} from '@ceramicnetwork/common'
Expand All @@ -25,20 +24,16 @@ import { createIPFS } from '@ceramicnetwork/ipfs-daemon'
import { createCeramic } from './create-ceramic.js'
import { Ceramic } from '../ceramic.js'
import { TileDocument } from '@ceramicnetwork/stream-tile'
import { streamFromState } from '../state-management/stream-from-state.js'
import * as uint8arrays from 'uint8arrays'
import * as sha256 from '@stablelib/sha256'
import { CommitID, StreamID } from '@ceramicnetwork/streamid'
import { StreamID } from '@ceramicnetwork/streamid'
import { from, Subject, timer, firstValueFrom } from 'rxjs'
import { concatMap, map } from 'rxjs/operators'
import { MAX_RESPONSE_INTERVAL } from '../pubsub/message-bus.js'
import cloneDeep from 'lodash.clonedeep'
import { StateLink } from '../state-management/state-link.js'
import { InMemoryAnchorService } from '../anchor/memory/in-memory-anchor-service.js'
import { whenSubscriptionDone } from './when-subscription-done.util.js'
import { CASResponse, AnchorRequestStatusName } from '@ceramicnetwork/codecs'

const FAKE_CID = CID.parse('bafybeig6xv5nwphfmvcnektpnojts33jqcuam7bmye2pb54adnrtccjlsu')
const INITIAL_CONTENT = { abc: 123, def: 456 }
const STRING_MAP_SCHEMA = {
$schema: 'http://json-schema.org/draft-07/schema#',
Expand Down Expand Up @@ -154,279 +149,6 @@ describe('anchor', () => {
await ceramic2.close()
})

test('commit history and atCommit', async () => {
const stream = await TileDocument.create<any>(ceramic, INITIAL_CONTENT)
stream.subscribe()
const streamState = await ceramic.repository.load(stream.id, {})

const commit0 = stream.allCommitIds[0]
expect(stream.commitId).toEqual(commit0)
expect(commit0.equals(CommitID.make(streamState.id, streamState.id.cid))).toBeTruthy()

await TestUtils.anchorUpdate(ceramic, stream)
expect(stream.allCommitIds.length).toEqual(2)
expect(stream.anchorCommitIds.length).toEqual(1)
const commit1 = stream.allCommitIds[1]
expect(commit1.equals(commit0)).toBeFalsy()
expect(commit1).toEqual(stream.commitId)
expect(commit1).toEqual(stream.anchorCommitIds[0])

const newContent = { abc: 321, def: 456, gh: 987 }
const updateRec = await stream.makeCommit(ceramic, newContent)
await ceramic.repository.applyCommit(streamState.id, updateRec, {
anchor: true,
publish: false,
})
expect(stream.allCommitIds.length).toEqual(3)
expect(stream.anchorCommitIds.length).toEqual(1)
const commit2 = stream.allCommitIds[2]
expect(commit2.equals(commit1)).toBeFalsy()
expect(commit2).toEqual(stream.commitId)

await TestUtils.anchorUpdate(ceramic, stream)
expect(stream.allCommitIds.length).toEqual(4)
expect(stream.anchorCommitIds.length).toEqual(2)
const commit3 = stream.allCommitIds[3]
expect(commit3.equals(commit2)).toBeFalsy()
expect(commit3).toEqual(stream.commitId)
expect(commit3).toEqual(stream.anchorCommitIds[1])
expect(stream.content).toEqual(newContent)
expect(stream.state.signature).toEqual(SignatureStatus.SIGNED)
expect(stream.state.anchorStatus).not.toEqual(AnchorStatus.NOT_REQUESTED)
expect(stream.state.log.length).toEqual(4)

// Apply a final commit that does not get anchored
const finalContent = { foo: 'bar' }
const updateRec2 = await stream.makeCommit(ceramic, finalContent)
await ceramic.repository.applyCommit(streamState.id, updateRec2, {
anchor: true,
publish: false,
})

expect(stream.allCommitIds.length).toEqual(5)
expect(stream.anchorCommitIds.length).toEqual(2)
const commit4 = stream.allCommitIds[4]
expect(commit4.equals(commit3)).toBeFalsy()
expect(commit4).toEqual(stream.commitId)
expect(commit4.equals(stream.anchorCommitIds[1])).toBeFalsy()
expect(stream.state.log.length).toEqual(5)

await TestUtils.waitForAnchor(stream, 3000)

// Correctly check out a specific commit
const streamStateOriginal = cloneDeep(streamState.state)
const streamV0 = await ceramic.repository.stateManager.atCommit(streamState, commit0)
expect(streamV0.id.equals(commit0.baseID)).toBeTruthy()
expect(streamV0.value.log.length).toEqual(1)
expect(streamV0.value.metadata.controllers).toEqual(controllers)
expect(streamV0.value.content).toEqual(INITIAL_CONTENT)
expect(streamV0.value.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)

const streamV1 = await ceramic.repository.stateManager.atCommit(streamState, commit1)
expect(streamV1.id.equals(commit1.baseID)).toBeTruthy()
expect(streamV1.value.log.length).toEqual(2)
expect(streamV1.value.metadata.controllers).toEqual(controllers)
expect(streamV1.value.content).toEqual(INITIAL_CONTENT)
expect(streamV1.value.anchorStatus).toEqual(AnchorStatus.ANCHORED)

const streamV2 = await ceramic.repository.stateManager.atCommit(streamState, commit2)
expect(streamV2.id.equals(commit2.baseID)).toBeTruthy()
expect(streamV2.value.log.length).toEqual(3)
expect(streamV2.value.metadata.controllers).toEqual(controllers)
expect(streamV2.value.next.content).toEqual(newContent)
expect(streamV2.value.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)

const streamV3 = await ceramic.repository.stateManager.atCommit(streamState, commit3)
expect(streamV3.id.equals(commit3.baseID)).toBeTruthy()
expect(streamV3.value.log.length).toEqual(4)
expect(streamV3.value.metadata.controllers).toEqual(controllers)
expect(streamV3.value.content).toEqual(newContent)
expect(streamV3.value.anchorStatus).toEqual(AnchorStatus.ANCHORED)

const streamV4 = await ceramic.repository.stateManager.atCommit(streamState, commit4)
expect(streamV4.id.equals(commit4.baseID)).toBeTruthy()
expect(streamV4.value.log.length).toEqual(5)
expect(streamV4.value.metadata.controllers).toEqual(controllers)
expect(streamV4.value.next.content).toEqual(finalContent)
expect(streamV4.value.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)

// Ensure that stateManager.atCommit does not mutate the passed in state object
expect(StreamUtils.serializeState(streamState.state)).toEqual(
StreamUtils.serializeState(streamStateOriginal)
)
})

describe('atCommit', () => {
test('non-existing commit', async () => {
const stream = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false })
const streamState = await ceramic.repository.load(stream.id, {})
// Emulate loading a non-existing commit
const nonExistentCommitID = CommitID.make(stream.id, FAKE_CID)
const originalRetrieve = ceramic.dispatcher.retrieveCommit.bind(ceramic.dispatcher)
ceramic.dispatcher.retrieveCommit = jest.fn(async (cid: CID) => {
if (cid.equals(FAKE_CID)) {
return null
} else {
return originalRetrieve(cid)
}
})
await expect(
ceramic.repository.stateManager.atCommit(streamState, nonExistentCommitID)
).rejects.toThrow(`No commit found for CID ${nonExistentCommitID.commit?.toString()}`)
})

test('return read-only snapshot', async () => {
const stream1 = await TileDocument.create<any>(ceramic, INITIAL_CONTENT, null, {
anchor: false,
syncTimeoutSeconds: 0,
})
await stream1.update({ abc: 321, def: 456, gh: 987 })
await TestUtils.anchorUpdate(ceramic, stream1)

const ceramic2 = await createCeramic(ipfs, { anchorOnRequest: false })
const stream2 = await TileDocument.load(ceramic, stream1.id)
const streamState2 = await ceramic2.repository.load(stream2.id, { syncTimeoutSeconds: 0 })
const snapshot = await ceramic2.repository.stateManager.atCommit(
streamState2,
stream1.commitId
)

expect(StreamUtils.statesEqual(snapshot.state, stream1.state))
const snapshotStream = streamFromState<TileDocument>(
ceramic2.context,
ceramic2._streamHandlers,
snapshot.value
)

// Snapshot is read-only
await expect(snapshotStream.update({ abc: 1010 })).rejects.toThrow(
'Historical stream commits cannot be modified. Load the stream without specifying a commit to make updates.'
)

// We fast-forward streamState2, because the commit is legit
expect(streamState2.state).toEqual(stream1.state)

await ceramic2.close()
})

test('return read-only snapshot: do not lose own anchor status', async () => {
// Prepare commits to play
const tile1 = await TileDocument.create<any>(ceramic, INITIAL_CONTENT, null, {
anchor: false,
syncTimeoutSeconds: 0,
})
await tile1.update({ a: 1 })

// Let's pretend we have a stream in PENDING state
const pendingState = {
...tile1.state,
anchorStatus: AnchorStatus.PENDING,
}
const base$ = new StateLink(pendingState)
// We request a snapshot at the latest commit
const snapshot = await ceramic.repository.stateManager.atCommit(base$, tile1.commitId)
// Do not fast-forward the base state: retain PENDING anchor status
expect(base$.state).toBe(pendingState)
// The snapshot is reported to be anchored though
expect(snapshot.state.anchorStatus).toEqual(AnchorStatus.NOT_REQUESTED)
})

test('commit ahead of current state', async () => {
const stream = await TileDocument.create(ceramic, INITIAL_CONTENT, null, { anchor: false })
const streamState = await ceramic.repository.load(stream.id, {})
// Provide a new commit that the repository doesn't currently know about
const newContent = { abc: 321, def: 456, gh: 987 }
const updateCommit = await stream.makeCommit(ceramic, newContent)
const futureCommitCID = await ceramic.dispatcher.storeCommit(updateCommit)
const futureCommitID = CommitID.make(stream.id, futureCommitCID)

// Now load the stream at a commitID ahead of what is currently in the state in the repository.
// The existing RunningState from the repository should also get updated
const state$ = await ceramic.repository.load(futureCommitID.baseID, {})
const snapshot = await ceramic.repository.stateManager.atCommit(state$, futureCommitID)
expect(snapshot.value.next.content).toEqual(newContent)
expect(snapshot.value.log.length).toEqual(2)
expect(StreamUtils.serializeState(streamState.state)).toEqual(
StreamUtils.serializeState(snapshot.value)
)
})
})

test('handles basic conflict', async () => {
const stream1 = await TileDocument.create(ceramic, INITIAL_CONTENT)
stream1.subscribe()
const streamState1 = await ceramic.repository.load(stream1.id, {})
const streamId = stream1.id
await TestUtils.anchorUpdate(ceramic, stream1)
const tipPreUpdate = stream1.tip

const newContent = { abc: 321, def: 456, gh: 987 }
let updateRec = await stream1.makeCommit(ceramic, newContent)
await ceramic.repository.applyCommit(streamState1.id, updateRec, {
anchor: true,
publish: false,
})

await TestUtils.anchorUpdate(ceramic, stream1)
expect(stream1.content).toEqual(newContent)
const tipValidUpdate = stream1.tip
// create invalid change that happened after main change

const initialState = await ceramic.repository.stateManager
.atCommit(streamState1, CommitID.make(streamId, streamId.cid))
.then((stream) => stream.state)
const state$ = new RunningState(initialState, true)
ceramic.repository.add(state$)
await ceramic.repository._internals.handleTip(state$, tipPreUpdate)
await new Promise((resolve) => setTimeout(resolve, 1000))

const conflictingNewContent = { asdf: 2342 }
const stream2 = streamFromState<TileDocument>(
ceramic.context,
ceramic._streamHandlers,
state$.value,
ceramic.repository.updates$
)
stream2.subscribe()
updateRec = await stream2.makeCommit(ceramic, conflictingNewContent)
await ceramic.repository.applyCommit(state$.id, updateRec, {
anchor: true,
publish: false,
})

await TestUtils.anchorUpdate(ceramic, stream2)
const tipInvalidUpdate = state$.tip
expect(stream2.content).toEqual(conflictingNewContent)
// loading tip from valid log to stream with invalid
// log results in valid state
await ceramic.repository._internals.handleTip(state$, tipValidUpdate)
expect(stream2.content).toEqual(newContent)

// loading tip from invalid log to stream with valid
// log results in valid state
await ceramic.repository._internals.handleTip(streamState1, tipInvalidUpdate)
expect(stream1.content).toEqual(newContent)

// Loading valid commit works
const streamState1Original = cloneDeep(streamState1.state)
const streamAtValidCommit = await ceramic.repository.stateManager.atCommit(
streamState1,
CommitID.make(streamId, tipValidUpdate)
)
expect(streamAtValidCommit.value.content).toEqual(newContent)

// Loading invalid commit fails
await expect(
ceramic.repository.stateManager.atCommit(
streamState1,
CommitID.make(streamId, tipInvalidUpdate)
)
).rejects.toThrow(/rejected by conflict resolution/)

// Ensure that stateManager.atCommit does not mutate the passed in state object
expect(JSON.stringify(streamState1.state)).toEqual(JSON.stringify(streamState1Original))
}, 10000)

test('enforces schema in update that assigns schema', async () => {
const schemaDoc = await TileDocument.create(ceramic, STRING_MAP_SCHEMA)
await TestUtils.anchorUpdate(ceramic, schemaDoc)
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/conflict-resolution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ export class ConflictResolution {
// rejected it.
const commitIndex = baseStateLog.findIndex(commitId.commit)
if (commitIndex < 0) {
// Note this should never happen. Since we set `throwOnConflict` in the opts, applyTip()
// have thrown already if the commit was rejected by conflict resolution. It would indicate
// programming error if this Error was ever actually thrown.
throw new Error(
`Requested commit CID ${commitId.commit.toString()} not found in the log for stream ${commitId.baseID.toString()}`
)
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ export class Dispatcher {

if (retries > 0) {
continue
} else {
throw new Error(`Timeout error while loading CID ${asCid.toString()} from IPFS: ${err}`)
}
}

Expand Down
Loading

0 comments on commit 8103038

Please sign in to comment.