diff --git a/packages/core/package.json b/packages/core/package.json index 0315c7bfda..1f742c07d2 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -75,6 +75,7 @@ "dag-jose": "^4.0.0", "dids": "^4.0.0", "it-all": "^3.0.1", + "it-batch": "^3.0.1", "it-first": "^3.0.1", "knex": "^2.1.0", "least-recent": "^1.0.3", diff --git a/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts b/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts index 9301636ef9..91b30c812f 100644 --- a/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts +++ b/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts @@ -8,6 +8,7 @@ import tmp from 'tmp-promise' import { createIPFS } from '@ceramicnetwork/ipfs-daemon' import { AnchorResumingService } from '../anchor-resuming-service.js' import type { CASResponse } from '@ceramicnetwork/codecs' +import all from 'it-all' describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => { jest.setTimeout(10000) @@ -64,7 +65,7 @@ describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => { }) ) - const loaded = (await ceramic.repository.anchorRequestStore.list()).map((result) => + const loaded = (await all(ceramic.repository.anchorRequestStore.list())).reduce((acc, array) => acc.concat(array), []).map((result) => result.key.toString() ) // LevelDB Store stores keys ordered lexicographically diff --git a/packages/core/src/state-management/anchor-resuming-service.ts b/packages/core/src/state-management/anchor-resuming-service.ts index 305fb8f339..846e71c14b 100644 --- a/packages/core/src/state-management/anchor-resuming-service.ts +++ b/packages/core/src/state-management/anchor-resuming-service.ts @@ -1,5 +1,3 @@ -import type { StreamID } from '@ceramicnetwork/streamid' -import type { AnchorRequestStoreListResult } from '../store/anchor-request-store.js' import type { Repository } from './repository.js' import { LogStyle, type DiagnosticsLogger } from '@ceramicnetwork/common' import { TaskQueue } from '../ancillary/task-queue.js' @@ -57,21 +55,18 @@ export class AnchorResumingService { this.logger.imp(`Resuming polling for streams with pending anchors`) let numRestoredStreams = 0 - let gt: StreamID | undefined = undefined - let batch = new Array() - do { - batch = await repository.anchorRequestStore.list(RESUME_BATCH_SIZE, gt) - for (const listResult of batch) { + for await (const batch of repository.anchorRequestStore.list(RESUME_BATCH_SIZE)) { + for (const item of batch) { + if (this.#shouldBeClosed) return this.resumeQ.add(async () => { - await repository.fromMemoryOrStore(listResult.key) - this.logger.verbose(`Resumed running state for stream id: ${listResult.key.toString()}`) + await repository.fromMemoryOrStore(item.key) + this.logger.verbose(`Resumed running state for stream id: ${item.key}`) numRestoredStreams++ }) await this.delay(this.getDelay()) } - gt = batch[batch.length - 1]?.key await this.resumeQ.onIdle() - } while (batch.length > 0 && !this.#shouldBeClosed) + } this.logger.imp( `Finished resuming polling for ${numRestoredStreams} streams which had pending anchors` diff --git a/packages/core/src/store/__tests__/level-anchor-request-store.test.ts b/packages/core/src/store/__tests__/level-anchor-request-store.test.ts index ced592d8ea..77985e12b0 100644 --- a/packages/core/src/store/__tests__/level-anchor-request-store.test.ts +++ b/packages/core/src/store/__tests__/level-anchor-request-store.test.ts @@ -12,6 +12,8 @@ import tmp from 'tmp-promise' import { StreamID } from '@ceramicnetwork/streamid' import { createIPFS } from '@ceramicnetwork/ipfs-daemon' import { createCeramic } from '../../__tests__/create-ceramic.js' +import first from 'it-first' +import all from 'it-all' const MODEL_CONTENT_1: ModelDefinition = { name: 'MyModel 1', @@ -224,10 +226,12 @@ describe('LevelDB-backed AnchorRequestStore state store', () => { { key: streamId3, value: anchorRequestData3 }, ].sort(sortByKeyStreamId) - const retrieved = await anchorRequestStore.list() + const retrieved = await all(anchorRequestStore.list()).then((batches) => + batches.reduce((acc, array) => acc.concat(array), []) + ) expect(retrieved.sort(sortByKeyStreamId)).toEqual(sortedParams) - const retrievedWithLimit = await anchorRequestStore.list(1) + const retrievedWithLimit = await first(anchorRequestStore.list()) expect(retrievedWithLimit).toEqual(sortedParams.slice(0, 1)) }) diff --git a/packages/core/src/store/anchor-request-store.ts b/packages/core/src/store/anchor-request-store.ts index ec5530b221..c7eec85e54 100644 --- a/packages/core/src/store/anchor-request-store.ts +++ b/packages/core/src/store/anchor-request-store.ts @@ -51,18 +51,25 @@ export class AnchorRequestStore extends ObjectStore return this.store.exists(generateKey(key), this.useCaseName) } - async list(limit?: number, gt?: StreamID): Promise> { - return ( - await this.store.find({ - limit: limit, + async *list(batchSize = 1): AsyncIterable> { + let gt: StreamID | undefined = undefined + do { + const batch = await this.store.find({ + limit: batchSize, useCaseName: this.useCaseName, gt: gt ? generateKey(gt) : undefined, }) - ).map((result) => { - return { - key: StreamID.fromString(result.key), - value: deserializeAnchorRequestData(result.value), + if (batch.length > 0) { + gt = StreamID.fromString(batch[batch.length - 1].key) + yield batch.map((item) => { + return { + key: StreamID.fromString(item.key), + value: deserializeAnchorRequestData(item.value), + } + }) + } else { + return } - }) + } while (true) } }