Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Almost never-ending stream of stored anchors #2942

Merged
merged 4 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
"dag-jose": "^4.0.0",
"dids": "^4.0.0",
"it-all": "^3.0.1",
"it-batch": "^3.0.1",
"it-first": "^3.0.1",
"knex": "^2.1.0",
"least-recent": "^1.0.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import tmp from 'tmp-promise'
import { createIPFS } from '@ceramicnetwork/ipfs-daemon'
import { AnchorResumingService } from '../anchor-resuming-service.js'
import type { CASResponse } from '@ceramicnetwork/codecs'
import all from 'it-all'

describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => {
jest.setTimeout(10000)
Expand Down Expand Up @@ -64,7 +65,7 @@ describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => {
})
)

const loaded = (await ceramic.repository.anchorRequestStore.list()).map((result) =>
const loaded = (await all(ceramic.repository.anchorRequestStore.list())).reduce((acc, array) => acc.concat(array), []).map((result) =>
result.key.toString()
)
// LevelDB Store stores keys ordered lexicographically
Expand Down
17 changes: 6 additions & 11 deletions packages/core/src/state-management/anchor-resuming-service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import type { StreamID } from '@ceramicnetwork/streamid'
import type { AnchorRequestStoreListResult } from '../store/anchor-request-store.js'
import type { Repository } from './repository.js'
import { LogStyle, type DiagnosticsLogger } from '@ceramicnetwork/common'
import { TaskQueue } from '../ancillary/task-queue.js'
Expand Down Expand Up @@ -57,21 +55,18 @@ export class AnchorResumingService {
this.logger.imp(`Resuming polling for streams with pending anchors`)

let numRestoredStreams = 0
let gt: StreamID | undefined = undefined
let batch = new Array<AnchorRequestStoreListResult>()
do {
batch = await repository.anchorRequestStore.list(RESUME_BATCH_SIZE, gt)
for (const listResult of batch) {
for await (const batch of repository.anchorRequestStore.list(RESUME_BATCH_SIZE)) {
for (const item of batch) {
if (this.#shouldBeClosed) return
this.resumeQ.add(async () => {
await repository.fromMemoryOrStore(listResult.key)
this.logger.verbose(`Resumed running state for stream id: ${listResult.key.toString()}`)
await repository.fromMemoryOrStore(item.key)
this.logger.verbose(`Resumed running state for stream id: ${item.key}`)
numRestoredStreams++
})
await this.delay(this.getDelay())
}
gt = batch[batch.length - 1]?.key
await this.resumeQ.onIdle()
} while (batch.length > 0 && !this.#shouldBeClosed)
}

this.logger.imp(
`Finished resuming polling for ${numRestoredStreams} streams which had pending anchors`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import tmp from 'tmp-promise'
import { StreamID } from '@ceramicnetwork/streamid'
import { createIPFS } from '@ceramicnetwork/ipfs-daemon'
import { createCeramic } from '../../__tests__/create-ceramic.js'
import first from 'it-first'
import all from 'it-all'

const MODEL_CONTENT_1: ModelDefinition = {
name: 'MyModel 1',
Expand Down Expand Up @@ -224,10 +226,12 @@ describe('LevelDB-backed AnchorRequestStore state store', () => {
{ key: streamId3, value: anchorRequestData3 },
].sort(sortByKeyStreamId)

const retrieved = await anchorRequestStore.list()
const retrieved = await all(anchorRequestStore.list()).then((batches) =>
batches.reduce((acc, array) => acc.concat(array), [])
)
expect(retrieved.sort(sortByKeyStreamId)).toEqual(sortedParams)

const retrievedWithLimit = await anchorRequestStore.list(1)
const retrievedWithLimit = await first(anchorRequestStore.list())
expect(retrievedWithLimit).toEqual(sortedParams.slice(0, 1))
})

Expand Down
25 changes: 16 additions & 9 deletions packages/core/src/store/anchor-request-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,25 @@ export class AnchorRequestStore extends ObjectStore<StreamID, AnchorRequestData>
return this.store.exists(generateKey(key), this.useCaseName)
}

async list(limit?: number, gt?: StreamID): Promise<Array<AnchorRequestStoreListResult>> {
return (
await this.store.find({
limit: limit,
async *list(batchSize = 1): AsyncIterable<Array<AnchorRequestStoreListResult>> {
let gt: StreamID | undefined = undefined
do {
const batch = await this.store.find({
limit: batchSize,
useCaseName: this.useCaseName,
gt: gt ? generateKey(gt) : undefined,
})
).map((result) => {
return {
key: StreamID.fromString(result.key),
value: deserializeAnchorRequestData(result.value),
if (batch.length > 0) {
gt = StreamID.fromString(batch[batch.length - 1].key)
yield batch.map((item) => {
return {
key: StreamID.fromString(item.key),
value: deserializeAnchorRequestData(item.value),
}
})
} else {
return
}
})
} while (true)
}
}