Skip to content

Commit

Permalink
Merge branch 'develop' into stateManagerPrivate-cdb-2751
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody authored Sep 14, 2023
2 parents 91d24a4 + 21c4df6 commit d3c0b16
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 90 deletions.
11 changes: 11 additions & 0 deletions packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import {
type CeramicNetworkOptions,
} from './initialization/network-options.js'
import { usableAnchorChains, DEFAULT_ANCHOR_SERVICE_URLS } from './initialization/anchoring.js'
import { StreamUpdater } from './stream-loading/stream-updater.js'

const DEFAULT_CACHE_LIMIT = 500 // number of streams stored in the cache
const DEFAULT_QPS_LIMIT = 10 // Max number of pubsub query messages that can be published per second without rate limiting
Expand Down Expand Up @@ -249,6 +250,8 @@ export class Ceramic implements CeramicApi {

// This initialization block below has to be redone.
// Things below should be passed here as `modules` variable.
// TODO(CDB-2749): Hide 'anchorTimestampExtractor', 'tipFetcher', 'logSyncer', and
// 'stateManipulator' as implementation details of StreamLoader and StreamUpdater.
const anchorTimestampExtractor = new AnchorTimestampExtractor(
this._logger,
this.dispatcher,
Expand Down Expand Up @@ -276,6 +279,13 @@ export class Ceramic implements CeramicApi {
anchorTimestampExtractor,
stateManipulator
)
const streamUpdater = new StreamUpdater(
this._logger,
this.dispatcher,
logSyncer,
anchorTimestampExtractor,
stateManipulator
)
const pinStore = modules.pinStoreFactory.createPinStore()
const localIndex = new LocalIndexApi(
params.indexingConfig,
Expand All @@ -294,6 +304,7 @@ export class Ceramic implements CeramicApi {
conflictResolution: conflictResolution,
indexing: localIndex,
streamLoader,
streamUpdater,
})
this.syncApi = new SyncApi(
{
Expand Down
146 changes: 79 additions & 67 deletions packages/core/src/state-management/repository-internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,83 @@ export class RepositoryInternals {
return this.processAnchorResponse(state$, anchorStatus$)
}

/**
* Handle CASResponse and update state$.
*
* @param state$ - RunningState instance to update.
* @param casResponse - response from CAS.
* @return boolean - `true` if polling should stop, `false` if polling continues
*/
async handleAnchorResponse(state$: RunningState, casResponse: CASResponse): Promise<boolean> {
// We don't want to change a stream's state due to changes to the anchor
// status of a commit that is no longer the tip of the stream, so we early return
// in most cases when receiving a response to an old anchor request.
// The one exception is if the CASResponse indicates that the old commit
// is now anchored, in which case we still want to try to process the anchor commit
// and let the stream's conflict resolution mechanism decide whether or not to update
// the stream's state.
const status = casResponse.status
switch (status) {
case AnchorRequestStatusName.READY:
case AnchorRequestStatusName.PENDING: {
if (!casResponse.cid.equals(state$.tip)) return
const next = {
...state$.value,
anchorStatus: AnchorStatus.PENDING,
}
state$.next(next)
await this._updateStateIfPinned(state$)
return false
}
case AnchorRequestStatusName.PROCESSING: {
if (!casResponse.cid.equals(state$.tip)) return
state$.next({ ...state$.value, anchorStatus: AnchorStatus.PROCESSING })
await this._updateStateIfPinned(state$)
return false
}
case AnchorRequestStatusName.COMPLETED: {
if (casResponse.cid.equals(state$.tip)) {
await this.#anchorRequestStore.remove(state$.id)
}
await this._handleAnchorCommit(
state$,
casResponse.cid,
casResponse.anchorCommit.cid,
casResponse.witnessCar
)
return true
}
case AnchorRequestStatusName.FAILED: {
this.#logger.warn(
`Anchor failed for commit ${casResponse.cid} of stream ${casResponse.streamId}: ${casResponse.message}`
)

// if this is the anchor response for the tip update the state
if (casResponse.cid.equals(state$.tip)) {
state$.next({ ...state$.value, anchorStatus: AnchorStatus.FAILED })
await this.#anchorRequestStore.remove(state$.id)
}
// we stop the polling as this is a terminal state
return true
}
case AnchorRequestStatusName.REPLACED: {
this.#logger.verbose(
`Anchor request for commit ${casResponse.cid} of stream ${casResponse.streamId} is replaced`
)

// If this is the tip and the node received a REPLACED response for it the node has gotten into a weird state.
// Hopefully this should resolve through updates that will be received shortly or through syncing the stream.
if (casResponse.cid.equals(state$.tip)) {
await this.#anchorRequestStore.remove(state$.id)
}

return true
}
default:
throw new UnreachableCaseError(status, 'Unknown anchoring state')
}
}

processAnchorResponse(
state$: RunningState,
anchorStatus$: Observable<CASResponse>
Expand All @@ -306,73 +383,8 @@ export class RepositoryInternals {
.pipe(
takeUntil(stopSignal),
concatMap(async (asr) => {
// We don't want to change a stream's state due to changes to the anchor
// status of a commit that is no longer the tip of the stream, so we early return
// in most cases when receiving a response to an old anchor request.
// The one exception is if the CASResponse indicates that the old commit
// is now anchored, in which case we still want to try to process the anchor commit
// and let the stream's conflict resolution mechanism decide whether or not to update
// the stream's state.
const status = asr.status
switch (status) {
case AnchorRequestStatusName.READY:
case AnchorRequestStatusName.PENDING: {
if (!asr.cid.equals(state$.tip)) return
const next = {
...state$.value,
anchorStatus: AnchorStatus.PENDING,
}
state$.next(next)
await this._updateStateIfPinned(state$)
return
}
case AnchorRequestStatusName.PROCESSING: {
if (!asr.cid.equals(state$.tip)) return
state$.next({ ...state$.value, anchorStatus: AnchorStatus.PROCESSING })
await this._updateStateIfPinned(state$)
return
}
case AnchorRequestStatusName.COMPLETED: {
if (asr.cid.equals(state$.tip)) {
await this.#anchorRequestStore.remove(state$.id)
}

await this._handleAnchorCommit(state$, asr.cid, asr.anchorCommit.cid, asr.witnessCar)

stopSignal.next()
return
}
case AnchorRequestStatusName.FAILED: {
this.#logger.warn(
`Anchor failed for commit ${asr.cid} of stream ${asr.streamId}: ${asr.message}`
)

// if this is the anchor response for the tip update the state
if (asr.cid.equals(state$.tip)) {
state$.next({ ...state$.value, anchorStatus: AnchorStatus.FAILED })
await this.#anchorRequestStore.remove(state$.id)
}
// we stop the polling as this is a terminal state
stopSignal.next()
return
}
case AnchorRequestStatusName.REPLACED: {
this.#logger.verbose(
`Anchor request for commit ${asr.cid} of stream ${asr.streamId} is replaced`
)

// If this is the tip and the node received a REPLACED response for it the node has gotten into a weird state.
// Hopefully this should resolve through updates that will be received shortly or through syncing the stream.
if (asr.cid.equals(state$.tip)) {
await this.#anchorRequestStore.remove(state$.id)
}

stopSignal.next()
return
}
default:
throw new UnreachableCaseError(status, 'Unknown anchoring state')
}
const shouldStop = await this.handleAnchorResponse(state$, asr)
if (shouldStop) stopSignal.next()
}),
catchError((error) => {
// TODO: Combine these two log statements into one line so that they can't get split up in the
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/state-management/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import { ServiceMetrics as Metrics } from '@ceramicnetwork/observability'
import { RepositoryInternals } from './repository-internals.js'
import { StreamLoader } from '../stream-loading/stream-loader.js'
import { OperationType } from './operation-type.js'
import { StreamUpdater } from '../stream-loading/stream-updater.js'
import { CID } from 'multiformats/cid'

const CACHE_EVICTED_MEMORY = 'cache_eviction_memory'
Expand All @@ -46,6 +47,7 @@ export type RepositoryDependencies = {
conflictResolution: ConflictResolution
indexing: LocalIndexApi
streamLoader: StreamLoader
streamUpdater: StreamUpdater
}

/**
Expand Down
35 changes: 35 additions & 0 deletions packages/core/src/stream-loading/apply-tip-helper.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { LogSyncer } from './log-syncer.js'
import { ApplyLogToStateOpts, StateManipulator } from './state-manipulator.js'
import { AnchorTimestampExtractor } from './anchor-timestamp-extractor.js'
import { StreamState, StreamUtils } from '@ceramicnetwork/common'
import { CID } from 'multiformats/cid'

/**
* Helper function for taking a StreamState and a tip CID and returning the new StreamState that
* results from applying that tip to the state.
* @param logSyncer
* @param anchorTimestampExtractor
* @param stateManipulator
* @param state
* @param tip
* @param opts
*/
export async function applyTipToState(
logSyncer: LogSyncer,
anchorTimestampExtractor: AnchorTimestampExtractor,
stateManipulator: StateManipulator,
state: StreamState,
tip: CID,
opts: ApplyLogToStateOpts
): Promise<StreamState> {
const streamID = StreamUtils.streamIdFromState(state)
const logWithoutTimestamps = await logSyncer.syncLogUntilMatch(
streamID,
tip,
state.log.map((logEntry) => logEntry.cid)
)
const logWithTimestamps = await anchorTimestampExtractor.verifyAnchorAndApplyTimestamps(
logWithoutTimestamps
)
return stateManipulator.applyLogToState(state, logWithTimestamps, opts)
}
43 changes: 20 additions & 23 deletions packages/core/src/stream-loading/stream-loader.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { LogSyncer } from './log-syncer.js'
import { TipFetcher } from './tip-fetcher.js'
import { ApplyLogToStateOpts, StateManipulator } from './state-manipulator.js'
import { StateManipulator } from './state-manipulator.js'
import { AnchorTimestampExtractor } from './anchor-timestamp-extractor.js'
import { DiagnosticsLogger, StreamState, StreamUtils } from '@ceramicnetwork/common'
import { CommitID, StreamID } from '@ceramicnetwork/streamid'
import { CID } from 'multiformats/cid'
import { applyTipToState } from './apply-tip-helper.js'

/**
* Class to contain all the logic for loading a stream, including fetching the relevant commit
Expand Down Expand Up @@ -47,28 +47,18 @@ export class StreamLoader {
const streamID = StreamUtils.streamIdFromState(state)
const tip = await this.tipFetcher.findTip(streamID, syncTimeoutSecs)

return this._applyTipToState(state, tip, {
throwOnInvalidCommit: false,
throwIfStale: false,
throwOnConflict: false,
})
}

private async _applyTipToState(
state: StreamState,
tip: CID,
opts: ApplyLogToStateOpts
): Promise<StreamState> {
const streamID = StreamUtils.streamIdFromState(state)
const logWithoutTimestamps = await this.logSyncer.syncLogUntilMatch(
streamID,
return applyTipToState(
this.logSyncer,
this.anchorTimestampExtractor,
this.stateManipulator,
state,
tip,
state.log.map((logEntry) => logEntry.cid)
{
throwOnInvalidCommit: false,
throwIfStale: false,
throwOnConflict: false,
}
)
const logWithTimestamps = await this.anchorTimestampExtractor.verifyAnchorAndApplyTimestamps(
logWithoutTimestamps
)
return await this.stateManipulator.applyLogToState(state, logWithTimestamps, opts)
}

/**
Expand All @@ -83,7 +73,14 @@ export class StreamLoader {
const opts = { throwOnInvalidCommit: true, throwOnConflict: true, throwIfStale: false }

// If 'commit' is ahead of 'initialState', sync state up to 'commit'
const baseState = await this._applyTipToState(initialState, commitId.commit, opts)
const baseState = await applyTipToState(
this.logSyncer,
this.anchorTimestampExtractor,
this.stateManipulator,
initialState,
commitId.commit,
opts
)

// If the commitId is now the tip, we're done.
if (baseState.log[baseState.log.length - 1].cid.equals(commitId.commit)) {
Expand Down
75 changes: 75 additions & 0 deletions packages/core/src/stream-loading/stream-updater.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { LogSyncer } from './log-syncer.js'
import { StateManipulator } from './state-manipulator.js'
import { AnchorTimestampExtractor } from './anchor-timestamp-extractor.js'
import { CeramicCommit, DiagnosticsLogger, StreamState, StreamUtils } from '@ceramicnetwork/common'
import { StreamID } from '@ceramicnetwork/streamid'
import { CID } from 'multiformats/cid'
import { applyTipToState } from './apply-tip-helper.js'

interface CommitStorer {
storeCommit(data: any, streamId?: StreamID): Promise<CID>
}

/**
* Class to contain all the logic for updating new commits to a stream. It notably however does not
* manage the persistence of the state information, nor updating the cache or indexing. It is
* purely stateless.
*/
export class StreamUpdater {
constructor(
private readonly logger: DiagnosticsLogger,
private readonly commitStorer: CommitStorer,
private readonly logSyncer: LogSyncer,
private readonly anchorTimestampExtractor: AnchorTimestampExtractor,
private readonly stateManipulator: StateManipulator
) {}

/**
* Applies a tip that was learned about via the p2p network (ie from pubsub, ReCon, HDS, etc) to
* the given StreamState. Because it came from the network, we cannot trust that the tip is
* actually a valid tip for the stream. If it is not, we just return the same StreamState
* unmodified.
* @param state
* @param tip
*/
async applyTipFromNetwork(state: StreamState, tip: CID): Promise<StreamState> {
return applyTipToState(
this.logSyncer,
this.anchorTimestampExtractor,
this.stateManipulator,
state,
tip,
{
throwOnInvalidCommit: false,
throwIfStale: false,
throwOnConflict: false,
}
)
}

/**
* Apply a commit that came in from an active application request via the HTTP client. Because
* this write comes in from an active application session, we apply stricter rules to it and
* throw errors in cases that might indicate an application bug - for instance if the write
* is built on stale state in the client relative to what the server knows is the most current
* state for the Stream.
* @param state
* @param commit
*/
async applyCommitFromUser(state: StreamState, commit: CeramicCommit): Promise<StreamState> {
const tip = await this.commitStorer.storeCommit(commit, StreamUtils.streamIdFromState(state))

return applyTipToState(
this.logSyncer,
this.anchorTimestampExtractor,
this.stateManipulator,
state,
tip,
{
throwOnInvalidCommit: true,
throwIfStale: true,
throwOnConflict: true,
}
)
}
}

0 comments on commit d3c0b16

Please sign in to comment.