diff --git a/packages/core/src/rpc/rpc-spec/chainHead_v1.ts b/packages/core/src/rpc/rpc-spec/chainHead_v1.ts index 946b5f7b..8f3e28dd 100644 --- a/packages/core/src/rpc/rpc-spec/chainHead_v1.ts +++ b/packages/core/src/rpc/rpc-spec/chainHead_v1.ts @@ -5,7 +5,17 @@ import { defaultLogger } from '../../logger.js' const logger = defaultLogger.child({ name: 'rpc-chainHead_v1' }) -const callbacks = new Map void>() +type DescendantValuesParams = { + prefix: string + startKey: string +} +const following = new Map< + string, + { + callback: (data: any) => void + pendingDescendantValues: Map + } +>() async function afterResponse(fn: () => void) { await new Promise((resolve) => setTimeout(resolve, 0)) @@ -61,11 +71,11 @@ export const chainHead_v1_follow: Handler<[boolean], string> = async ( const cleanup = () => { context.chain.headState.unsubscribeHead(id) - callbacks.delete(id) + following.delete(id) } const callback = subscribe('chainHead_v1_followEvent', id, cleanup) - callbacks.set(id, callback) + following.set(id, { callback, pendingDescendantValues: new Map() }) afterResponse(async () => { callback({ @@ -103,7 +113,7 @@ export const chainHead_v1_header: Handler<[string, HexString], HexString | null> context, [followSubscription, hash], ) => { - if (!callbacks.has(followSubscription)) return null + if (!following.has(followSubscription)) return null const block = await context.chain.getBlock(hash) return block ? (await block.header).toHex() : null @@ -134,7 +144,7 @@ export const chainHead_v1_call: Handler<[string, HexString, string, HexString], const block = await context.chain.getBlock(hash) if (!block) { - callbacks.get(followSubscription)?.({ + following.get(followSubscription)?.callback({ event: 'operationError', operationId, error: `Block ${hash} not found`, @@ -142,13 +152,13 @@ export const chainHead_v1_call: Handler<[string, HexString, string, HexString], } else { try { const resp = await block.call(method, [callParameters]) - callbacks.get(followSubscription)?.({ + following.get(followSubscription)?.callback({ event: 'operationCallDone', operationId, output: resp.result, }) } catch (ex: any) { - callbacks.get(followSubscription)?.({ + following.get(followSubscription)?.callback({ event: 'operationError', operationId, error: ex.message, @@ -166,6 +176,47 @@ export interface StorageItemRequest { type: 'value' | 'hash' | 'closestDescendantMerkleValue' | 'descendantsValues' | 'descendantsHashes' } +const PAGE_SIZE = 1000 +async function getDescendantValues( + block: Block, + params: DescendantValuesParams, +): Promise<{ + items: Array<{ + key: string + value?: HexString + }> + next: DescendantValuesParams | null +}> { + const keys = await block.getKeysPaged({ + ...params, + pageSize: PAGE_SIZE, + }) + + const items = await Promise.all( + keys.map((key) => + block.get(key).then((value) => ({ + key, + value, + })), + ), + ) + + if (keys.length < PAGE_SIZE) { + return { + items, + next: null, + } + } + + return { + items, + next: { + ...params, + startKey: keys[PAGE_SIZE - 1], + }, + } +} + /** * Query the storage for a given block * @@ -183,7 +234,7 @@ export const chainHead_v1_storage: Handler< afterResponse(async () => { const block = await context.chain.getBlock(hash) if (!block) { - callbacks.get(followSubscription)?.({ + following.get(followSubscription)?.callback({ event: 'operationError', operationId, error: 'Block not found', @@ -196,55 +247,50 @@ export const chainHead_v1_storage: Handler< case 'value': { const value = await block.get(sir.key) if (value) { - callbacks.get(followSubscription)?.({ + following.get(followSubscription)?.callback({ event: 'operationStorageItems', operationId, items: [{ key: sir.key, value }], }) } - break + return null } case 'descendantsValues': { - // TODO expose pagination - const pageSize = 100 - let startKey: string | null = '0x' - while (startKey) { - const keys = await block.getKeysPaged({ - prefix: sir.key, - pageSize, - startKey, - }) - startKey = keys[pageSize - 1] ?? null - - const items = await Promise.all( - keys.map((key) => - block.get(key).then((value) => ({ - key, - value, - })), - ), - ) - callbacks.get(followSubscription)?.({ - event: 'operationStorageItems', - operationId, - items, - }) - break - } - break + const { items, next } = await getDescendantValues(block, { prefix: sir.key, startKey: '0x' }) + + following.get(followSubscription)?.callback({ + event: 'operationStorageItems', + operationId, + items, + }) + + return next } default: // TODO console.warn(`Storage type not implemented ${sir.type}`) + return null } } - await Promise.all(items.map(handleStorageItemRequest)) + const listResult = await Promise.all(items.map(handleStorageItemRequest)) + const pending = listResult.filter((v) => v !== null) - callbacks.get(followSubscription)?.({ - event: 'operationStorageDone', - operationId, - }) + if (!pending.length) { + following.get(followSubscription)?.callback({ + event: 'operationStorageDone', + operationId, + }) + } else { + const follower = following.get(followSubscription) + if (follower) { + follower.pendingDescendantValues.set(operationId, { hash, params: pending }) + follower.callback({ + event: 'operationWaitingForContinue', + operationId, + }) + } + } }) return { @@ -268,7 +314,7 @@ export const chainHead_v1_body: Handler<[string, HexString], OperationStarted | context, [followSubscription, hash], ) => { - if (!callbacks.has(followSubscription)) return limitReached + if (!following.has(followSubscription)) return limitReached const block = await context.chain.getBlock(hash) if (!block) { throw new ResponseError(-32801, 'Block not found') @@ -278,7 +324,7 @@ export const chainHead_v1_body: Handler<[string, HexString], OperationStarted | afterResponse(async () => { const body = await block.extrinsics - callbacks.get(followSubscription)?.({ + following.get(followSubscription)?.callback({ event: 'operationBodyDone', operationId, value: body, @@ -288,18 +334,66 @@ export const chainHead_v1_body: Handler<[string, HexString], OperationStarted | return operationStarted(operationId) } -// Currently no-ops, will come into play when pagination is implemented +/** + * Resume an operation paused through `operationWaitingForContinue` + * + * @param context + * @param params - [`followSubscription`, `operationId`] + */ export const chainHead_v1_continue: Handler<[string, HexString], null> = async ( - _context, - [_followSubscription, _operationId], + context, + [followSubscription, operationId], ) => { + const follower = following.get(followSubscription) + const pendingOp = follower?.pendingDescendantValues.get(operationId) + if (!pendingOp || !follower) { + throw new ResponseError(-32803, "Operation ID doesn't have anything pending") + } + const block = await context.chain.getBlock(pendingOp.hash) + if (!block) { + throw new ResponseError(-32801, 'Block not found') + } + + afterResponse(async () => { + const handlePendingOperation = async (params: DescendantValuesParams) => { + const { items, next } = await getDescendantValues(block, params) + + follower.callback({ + event: 'operationStorageItems', + operationId, + items, + }) + + return next + } + + const listResult = await Promise.all(pendingOp.params.map(handlePendingOperation)) + const pending = listResult.filter((v) => v !== null) + + if (!pending.length) { + follower.pendingDescendantValues.delete(operationId) + follower.callback({ + event: 'operationStorageDone', + operationId, + }) + } else { + follower.pendingDescendantValues.set(operationId, { hash: pendingOp.hash, params: pending }) + follower.callback({ + event: 'operationWaitingForContinue', + operationId, + }) + } + }) + return null } export const chainHead_v1_stopOperation: Handler<[string, HexString], null> = async ( _context, - [_followSubscription, _operationId], + [followSubscription, operationId], ) => { + following.get(followSubscription)?.pendingDescendantValues.delete(operationId) + return null } diff --git a/packages/e2e/src/chainHead_v1.test.ts b/packages/e2e/src/chainHead_v1.test.ts index 5cc8943d..c3a6e34a 100644 --- a/packages/e2e/src/chainHead_v1.test.ts +++ b/packages/e2e/src/chainHead_v1.test.ts @@ -82,6 +82,20 @@ describe('chainHead_v1 rpc', () => { chainHead.unfollow() }) + it('runs through multiple pages of storage queries', async () => { + const chainHead = testApi.observableClient.chainHead$() + + const receivedItems = await firstValueFrom( + chainHead.storage$(null, 'descendantsValues', (ctx) => + ctx.dynamicBuilder.buildStorage('System', 'BlockHash').enc(), + ), + ) + + expect(receivedItems.length).toEqual(1201) + + chainHead.unfollow() + }) + it('resolves the header for a specific block', async () => { const chainHead = testApi.observableClient.chainHead$()