From 82ae3b26e3c7024e99404d5ea919d21533210ef4 Mon Sep 17 00:00:00 2001 From: Sergey Ukustov Date: Fri, 8 Sep 2023 17:55:36 +0300 Subject: [PATCH 1/4] wip listA --- packages/core/package.json | 1 + .../__tests__/anchor-resuming-service.test.ts | 3 +- .../anchor-resuming-service.ts | 29 +++++++++---------- .../level-anchor-request-store.test.ts | 6 ++-- .../core/src/store/anchor-request-store.ts | 24 +++++++++------ 5 files changed, 35 insertions(+), 28 deletions(-) diff --git a/packages/core/package.json b/packages/core/package.json index 24e7927bc0..b0285691b1 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -74,6 +74,7 @@ "dag-jose": "^4.0.0", "dids": "^4.0.0", "it-all": "^3.0.1", + "it-batch": "^3.0.1", "it-first": "^3.0.1", "knex": "^2.1.0", "least-recent": "^1.0.3", diff --git a/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts b/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts index 9301636ef9..c1a31937e0 100644 --- a/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts +++ b/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts @@ -8,6 +8,7 @@ import tmp from 'tmp-promise' import { createIPFS } from '@ceramicnetwork/ipfs-daemon' import { AnchorResumingService } from '../anchor-resuming-service.js' import type { CASResponse } from '@ceramicnetwork/codecs' +import all from 'it-all' describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => { jest.setTimeout(10000) @@ -64,7 +65,7 @@ describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => { }) ) - const loaded = (await ceramic.repository.anchorRequestStore.list()).map((result) => + const loaded = (await all(ceramic.repository.anchorRequestStore.list())).map((result) => result.key.toString() ) // LevelDB Store stores keys ordered lexicographically diff --git a/packages/core/src/state-management/anchor-resuming-service.ts b/packages/core/src/state-management/anchor-resuming-service.ts index 305fb8f339..3accc9b9eb 100644 --- a/packages/core/src/state-management/anchor-resuming-service.ts +++ b/packages/core/src/state-management/anchor-resuming-service.ts @@ -1,5 +1,3 @@ -import type { StreamID } from '@ceramicnetwork/streamid' -import type { AnchorRequestStoreListResult } from '../store/anchor-request-store.js' import type { Repository } from './repository.js' import { LogStyle, type DiagnosticsLogger } from '@ceramicnetwork/common' import { TaskQueue } from '../ancillary/task-queue.js' @@ -57,21 +55,20 @@ export class AnchorResumingService { this.logger.imp(`Resuming polling for streams with pending anchors`) let numRestoredStreams = 0 - let gt: StreamID | undefined = undefined - let batch = new Array() - do { - batch = await repository.anchorRequestStore.list(RESUME_BATCH_SIZE, gt) - for (const listResult of batch) { - this.resumeQ.add(async () => { - await repository.fromMemoryOrStore(listResult.key) - this.logger.verbose(`Resumed running state for stream id: ${listResult.key.toString()}`) - numRestoredStreams++ - }) - await this.delay(this.getDelay()) + let n = 0 + for await (const item of repository.anchorRequestStore.list()) { + this.resumeQ.add(async () => { + await repository.fromMemoryOrStore(item.key) + this.logger.verbose(`Resumed running state for stream id: ${item.key}`) + numRestoredStreams++ + }) + await this.delay(this.getDelay()) + n += 1 + if (n >= RESUME_BATCH_SIZE) { + await this.resumeQ.onIdle() + n = 0 } - gt = batch[batch.length - 1]?.key - await this.resumeQ.onIdle() - } while (batch.length > 0 && !this.#shouldBeClosed) + } this.logger.imp( `Finished resuming polling for ${numRestoredStreams} streams which had pending anchors` diff --git a/packages/core/src/store/__tests__/level-anchor-request-store.test.ts b/packages/core/src/store/__tests__/level-anchor-request-store.test.ts index ced592d8ea..8aa8243e2b 100644 --- a/packages/core/src/store/__tests__/level-anchor-request-store.test.ts +++ b/packages/core/src/store/__tests__/level-anchor-request-store.test.ts @@ -12,6 +12,8 @@ import tmp from 'tmp-promise' import { StreamID } from '@ceramicnetwork/streamid' import { createIPFS } from '@ceramicnetwork/ipfs-daemon' import { createCeramic } from '../../__tests__/create-ceramic.js' +import first from 'it-first' +import all from 'it-all' const MODEL_CONTENT_1: ModelDefinition = { name: 'MyModel 1', @@ -224,10 +226,10 @@ describe('LevelDB-backed AnchorRequestStore state store', () => { { key: streamId3, value: anchorRequestData3 }, ].sort(sortByKeyStreamId) - const retrieved = await anchorRequestStore.list() + const retrieved = await all(anchorRequestStore.list()) expect(retrieved.sort(sortByKeyStreamId)).toEqual(sortedParams) - const retrievedWithLimit = await anchorRequestStore.list(1) + const retrievedWithLimit = await first(anchorRequestStore.list()) expect(retrievedWithLimit).toEqual(sortedParams.slice(0, 1)) }) diff --git a/packages/core/src/store/anchor-request-store.ts b/packages/core/src/store/anchor-request-store.ts index ec5530b221..f2ac572590 100644 --- a/packages/core/src/store/anchor-request-store.ts +++ b/packages/core/src/store/anchor-request-store.ts @@ -51,18 +51,24 @@ export class AnchorRequestStore extends ObjectStore return this.store.exists(generateKey(key), this.useCaseName) } - async list(limit?: number, gt?: StreamID): Promise> { - return ( - await this.store.find({ - limit: limit, + async *list(): AsyncIterable { + let hasMore = true + let gt: StreamID | undefined = undefined + do { + const [found] = await this.store.find({ + limit: 1, useCaseName: this.useCaseName, gt: gt ? generateKey(gt) : undefined, }) - ).map((result) => { - return { - key: StreamID.fromString(result.key), - value: deserializeAnchorRequestData(result.value), + if (found) { + gt = StreamID.fromString(found.key) + yield { + key: gt, + value: deserializeAnchorRequestData(found.value), + } + } else { + hasMore = false } - }) + } while (hasMore) } } From d4e08a517b328395eda2e19bd1f319a16fd2d933 Mon Sep 17 00:00:00 2001 From: Sergey Ukustov Date: Mon, 11 Sep 2023 15:42:44 +0300 Subject: [PATCH 2/4] chore: Get batching back --- .../__tests__/anchor-resuming-service.test.ts | 2 +- .../anchor-resuming-service.ts | 24 ++++++++---------- .../level-anchor-request-store.test.ts | 4 ++- .../core/src/store/anchor-request-store.ts | 25 ++++++++++--------- 4 files changed, 28 insertions(+), 27 deletions(-) diff --git a/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts b/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts index c1a31937e0..91b30c812f 100644 --- a/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts +++ b/packages/core/src/state-management/__tests__/anchor-resuming-service.test.ts @@ -65,7 +65,7 @@ describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => { }) ) - const loaded = (await all(ceramic.repository.anchorRequestStore.list())).map((result) => + const loaded = (await all(ceramic.repository.anchorRequestStore.list())).reduce((acc, array) => acc.concat(array), []).map((result) => result.key.toString() ) // LevelDB Store stores keys ordered lexicographically diff --git a/packages/core/src/state-management/anchor-resuming-service.ts b/packages/core/src/state-management/anchor-resuming-service.ts index 3accc9b9eb..7cb3d7b5a6 100644 --- a/packages/core/src/state-management/anchor-resuming-service.ts +++ b/packages/core/src/state-management/anchor-resuming-service.ts @@ -55,19 +55,17 @@ export class AnchorResumingService { this.logger.imp(`Resuming polling for streams with pending anchors`) let numRestoredStreams = 0 - let n = 0 - for await (const item of repository.anchorRequestStore.list()) { - this.resumeQ.add(async () => { - await repository.fromMemoryOrStore(item.key) - this.logger.verbose(`Resumed running state for stream id: ${item.key}`) - numRestoredStreams++ - }) - await this.delay(this.getDelay()) - n += 1 - if (n >= RESUME_BATCH_SIZE) { - await this.resumeQ.onIdle() - n = 0 + for await (const batch of repository.anchorRequestStore.list()) { + for (const item of batch) { + if (this.#shouldBeClosed) return + this.resumeQ.add(async () => { + await repository.fromMemoryOrStore(item.key) + this.logger.verbose(`Resumed running state for stream id: ${item.key}`) + numRestoredStreams++ + }) + await this.delay(this.getDelay()) } + await this.resumeQ.onIdle() } this.logger.imp( @@ -78,7 +76,7 @@ export class AnchorResumingService { async close(): Promise { this.logger.debug('Closing AnchorResumingService') this.#shouldBeClosed = true - this.resumeQ.clear() + await this.resumeQ.onIdle() this.logger.debug('Waiting for remaining AnchorResumingService tasks to stop') await this.resumeQ.onIdle() this.logger.debug('AnchorResumingService closed') diff --git a/packages/core/src/store/__tests__/level-anchor-request-store.test.ts b/packages/core/src/store/__tests__/level-anchor-request-store.test.ts index 8aa8243e2b..77985e12b0 100644 --- a/packages/core/src/store/__tests__/level-anchor-request-store.test.ts +++ b/packages/core/src/store/__tests__/level-anchor-request-store.test.ts @@ -226,7 +226,9 @@ describe('LevelDB-backed AnchorRequestStore state store', () => { { key: streamId3, value: anchorRequestData3 }, ].sort(sortByKeyStreamId) - const retrieved = await all(anchorRequestStore.list()) + const retrieved = await all(anchorRequestStore.list()).then((batches) => + batches.reduce((acc, array) => acc.concat(array), []) + ) expect(retrieved.sort(sortByKeyStreamId)).toEqual(sortedParams) const retrievedWithLimit = await first(anchorRequestStore.list()) diff --git a/packages/core/src/store/anchor-request-store.ts b/packages/core/src/store/anchor-request-store.ts index f2ac572590..c7eec85e54 100644 --- a/packages/core/src/store/anchor-request-store.ts +++ b/packages/core/src/store/anchor-request-store.ts @@ -51,24 +51,25 @@ export class AnchorRequestStore extends ObjectStore return this.store.exists(generateKey(key), this.useCaseName) } - async *list(): AsyncIterable { - let hasMore = true + async *list(batchSize = 1): AsyncIterable> { let gt: StreamID | undefined = undefined do { - const [found] = await this.store.find({ - limit: 1, + const batch = await this.store.find({ + limit: batchSize, useCaseName: this.useCaseName, gt: gt ? generateKey(gt) : undefined, }) - if (found) { - gt = StreamID.fromString(found.key) - yield { - key: gt, - value: deserializeAnchorRequestData(found.value), - } + if (batch.length > 0) { + gt = StreamID.fromString(batch[batch.length - 1].key) + yield batch.map((item) => { + return { + key: StreamID.fromString(item.key), + value: deserializeAnchorRequestData(item.value), + } + }) } else { - hasMore = false + return } - } while (hasMore) + } while (true) } } From 99424a453d34c8be5aadc373d5ad00af9e240196 Mon Sep 17 00:00:00 2001 From: Sergey Ukustov Date: Mon, 11 Sep 2023 15:52:31 +0300 Subject: [PATCH 3/4] chore: proper batch size --- packages/core/src/state-management/anchor-resuming-service.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/core/src/state-management/anchor-resuming-service.ts b/packages/core/src/state-management/anchor-resuming-service.ts index 7cb3d7b5a6..49be06d757 100644 --- a/packages/core/src/state-management/anchor-resuming-service.ts +++ b/packages/core/src/state-management/anchor-resuming-service.ts @@ -55,7 +55,7 @@ export class AnchorResumingService { this.logger.imp(`Resuming polling for streams with pending anchors`) let numRestoredStreams = 0 - for await (const batch of repository.anchorRequestStore.list()) { + for await (const batch of repository.anchorRequestStore.list(RESUME_BATCH_SIZE)) { for (const item of batch) { if (this.#shouldBeClosed) return this.resumeQ.add(async () => { @@ -76,7 +76,6 @@ export class AnchorResumingService { async close(): Promise { this.logger.debug('Closing AnchorResumingService') this.#shouldBeClosed = true - await this.resumeQ.onIdle() this.logger.debug('Waiting for remaining AnchorResumingService tasks to stop') await this.resumeQ.onIdle() this.logger.debug('AnchorResumingService closed') From 6f736eb46e8cf713e03fe69248fed20186434c9f Mon Sep 17 00:00:00 2001 From: Sergey Ukustov Date: Wed, 13 Sep 2023 14:16:25 +0300 Subject: [PATCH 4/4] chore: get clear back --- packages/core/src/state-management/anchor-resuming-service.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/core/src/state-management/anchor-resuming-service.ts b/packages/core/src/state-management/anchor-resuming-service.ts index 49be06d757..846e71c14b 100644 --- a/packages/core/src/state-management/anchor-resuming-service.ts +++ b/packages/core/src/state-management/anchor-resuming-service.ts @@ -76,6 +76,7 @@ export class AnchorResumingService { async close(): Promise { this.logger.debug('Closing AnchorResumingService') this.#shouldBeClosed = true + this.resumeQ.clear() this.logger.debug('Waiting for remaining AnchorResumingService tasks to stop') await this.resumeQ.onIdle() this.logger.debug('AnchorResumingService closed')