Skip to content

Commit

Permalink
chore: Almost never-ending stream of stored anchors (#2942)
Browse files Browse the repository at this point in the history
* wip listA

* chore: Get batching back

* chore: proper batch size

* chore: get clear back
  • Loading branch information
ukstv authored Sep 13, 2023
1 parent 60949ec commit e786c5a
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 23 deletions.
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
"dag-jose": "^4.0.0",
"dids": "^4.0.0",
"it-all": "^3.0.1",
"it-batch": "^3.0.1",
"it-first": "^3.0.1",
"knex": "^2.1.0",
"least-recent": "^1.0.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import tmp from 'tmp-promise'
import { createIPFS } from '@ceramicnetwork/ipfs-daemon'
import { AnchorResumingService } from '../anchor-resuming-service.js'
import type { CASResponse } from '@ceramicnetwork/codecs'
import all from 'it-all'

describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => {
jest.setTimeout(10000)
Expand Down Expand Up @@ -64,7 +65,7 @@ describe('resumeRunningStatesFromAnchorRequestStore(...) method', () => {
})
)

const loaded = (await ceramic.repository.anchorRequestStore.list()).map((result) =>
const loaded = (await all(ceramic.repository.anchorRequestStore.list())).reduce((acc, array) => acc.concat(array), []).map((result) =>
result.key.toString()
)
// LevelDB Store stores keys ordered lexicographically
Expand Down
17 changes: 6 additions & 11 deletions packages/core/src/state-management/anchor-resuming-service.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import type { StreamID } from '@ceramicnetwork/streamid'
import type { AnchorRequestStoreListResult } from '../store/anchor-request-store.js'
import type { Repository } from './repository.js'
import { LogStyle, type DiagnosticsLogger } from '@ceramicnetwork/common'
import { TaskQueue } from '../ancillary/task-queue.js'
Expand Down Expand Up @@ -57,21 +55,18 @@ export class AnchorResumingService {
this.logger.imp(`Resuming polling for streams with pending anchors`)

let numRestoredStreams = 0
let gt: StreamID | undefined = undefined
let batch = new Array<AnchorRequestStoreListResult>()
do {
batch = await repository.anchorRequestStore.list(RESUME_BATCH_SIZE, gt)
for (const listResult of batch) {
for await (const batch of repository.anchorRequestStore.list(RESUME_BATCH_SIZE)) {
for (const item of batch) {
if (this.#shouldBeClosed) return
this.resumeQ.add(async () => {
await repository.fromMemoryOrStore(listResult.key)
this.logger.verbose(`Resumed running state for stream id: ${listResult.key.toString()}`)
await repository.fromMemoryOrStore(item.key)
this.logger.verbose(`Resumed running state for stream id: ${item.key}`)
numRestoredStreams++
})
await this.delay(this.getDelay())
}
gt = batch[batch.length - 1]?.key
await this.resumeQ.onIdle()
} while (batch.length > 0 && !this.#shouldBeClosed)
}

this.logger.imp(
`Finished resuming polling for ${numRestoredStreams} streams which had pending anchors`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import tmp from 'tmp-promise'
import { StreamID } from '@ceramicnetwork/streamid'
import { createIPFS } from '@ceramicnetwork/ipfs-daemon'
import { createCeramic } from '../../__tests__/create-ceramic.js'
import first from 'it-first'
import all from 'it-all'

const MODEL_CONTENT_1: ModelDefinition = {
name: 'MyModel 1',
Expand Down Expand Up @@ -224,10 +226,12 @@ describe('LevelDB-backed AnchorRequestStore state store', () => {
{ key: streamId3, value: anchorRequestData3 },
].sort(sortByKeyStreamId)

const retrieved = await anchorRequestStore.list()
const retrieved = await all(anchorRequestStore.list()).then((batches) =>
batches.reduce((acc, array) => acc.concat(array), [])
)
expect(retrieved.sort(sortByKeyStreamId)).toEqual(sortedParams)

const retrievedWithLimit = await anchorRequestStore.list(1)
const retrievedWithLimit = await first(anchorRequestStore.list())
expect(retrievedWithLimit).toEqual(sortedParams.slice(0, 1))
})

Expand Down
25 changes: 16 additions & 9 deletions packages/core/src/store/anchor-request-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,25 @@ export class AnchorRequestStore extends ObjectStore<StreamID, AnchorRequestData>
return this.store.exists(generateKey(key), this.useCaseName)
}

async list(limit?: number, gt?: StreamID): Promise<Array<AnchorRequestStoreListResult>> {
return (
await this.store.find({
limit: limit,
async *list(batchSize = 1): AsyncIterable<Array<AnchorRequestStoreListResult>> {
let gt: StreamID | undefined = undefined
do {
const batch = await this.store.find({
limit: batchSize,
useCaseName: this.useCaseName,
gt: gt ? generateKey(gt) : undefined,
})
).map((result) => {
return {
key: StreamID.fromString(result.key),
value: deserializeAnchorRequestData(result.value),
if (batch.length > 0) {
gt = StreamID.fromString(batch[batch.length - 1].key)
yield batch.map((item) => {
return {
key: StreamID.fromString(item.key),
value: deserializeAnchorRequestData(item.value),
}
})
} else {
return
}
})
} while (true)
}
}

0 comments on commit e786c5a

Please sign in to comment.