Skip to content

Commit

Permalink
feat(chainHead_v1): operationWaitingForContinue and chainHead_v1_cont…
Browse files Browse the repository at this point in the history
…inue (#838)

* feat(chainHead_v1): operationWaitingForContinue and chainHead_v1_continue

* fix inconsistent page size
  • Loading branch information
voliva authored Oct 16, 2024
1 parent e375a9f commit ac3d39e
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 48 deletions.
190 changes: 142 additions & 48 deletions packages/core/src/rpc/rpc-spec/chainHead_v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ import { defaultLogger } from '../../logger.js'

const logger = defaultLogger.child({ name: 'rpc-chainHead_v1' })

const callbacks = new Map<string, (data: any) => void>()
type DescendantValuesParams = {
prefix: string
startKey: string
}
const following = new Map<
string,
{
callback: (data: any) => void
pendingDescendantValues: Map<string, { hash: HexString; params: DescendantValuesParams[] }>
}
>()

async function afterResponse(fn: () => void) {
await new Promise((resolve) => setTimeout(resolve, 0))
Expand Down Expand Up @@ -61,11 +71,11 @@ export const chainHead_v1_follow: Handler<[boolean], string> = async (

const cleanup = () => {
context.chain.headState.unsubscribeHead(id)
callbacks.delete(id)
following.delete(id)
}

const callback = subscribe('chainHead_v1_followEvent', id, cleanup)
callbacks.set(id, callback)
following.set(id, { callback, pendingDescendantValues: new Map() })

afterResponse(async () => {
callback({
Expand Down Expand Up @@ -103,7 +113,7 @@ export const chainHead_v1_header: Handler<[string, HexString], HexString | null>
context,
[followSubscription, hash],
) => {
if (!callbacks.has(followSubscription)) return null
if (!following.has(followSubscription)) return null
const block = await context.chain.getBlock(hash)

return block ? (await block.header).toHex() : null
Expand Down Expand Up @@ -134,21 +144,21 @@ export const chainHead_v1_call: Handler<[string, HexString, string, HexString],
const block = await context.chain.getBlock(hash)

if (!block) {
callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationError',
operationId,
error: `Block ${hash} not found`,
})
} else {
try {
const resp = await block.call(method, [callParameters])
callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationCallDone',
operationId,
output: resp.result,
})
} catch (ex: any) {
callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationError',
operationId,
error: ex.message,
Expand All @@ -166,6 +176,47 @@ export interface StorageItemRequest {
type: 'value' | 'hash' | 'closestDescendantMerkleValue' | 'descendantsValues' | 'descendantsHashes'
}

const PAGE_SIZE = 1000
async function getDescendantValues(
block: Block,
params: DescendantValuesParams,
): Promise<{
items: Array<{
key: string
value?: HexString
}>
next: DescendantValuesParams | null
}> {
const keys = await block.getKeysPaged({
...params,
pageSize: PAGE_SIZE,
})

const items = await Promise.all(
keys.map((key) =>
block.get(key).then((value) => ({
key,
value,
})),
),
)

if (keys.length < PAGE_SIZE) {
return {
items,
next: null,
}
}

return {
items,
next: {
...params,
startKey: keys[PAGE_SIZE - 1],
},
}
}

/**
* Query the storage for a given block
*
Expand All @@ -183,7 +234,7 @@ export const chainHead_v1_storage: Handler<
afterResponse(async () => {
const block = await context.chain.getBlock(hash)
if (!block) {
callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationError',
operationId,
error: 'Block not found',
Expand All @@ -196,55 +247,50 @@ export const chainHead_v1_storage: Handler<
case 'value': {
const value = await block.get(sir.key)
if (value) {
callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationStorageItems',
operationId,
items: [{ key: sir.key, value }],
})
}
break
return null
}
case 'descendantsValues': {
// TODO expose pagination
const pageSize = 100
let startKey: string | null = '0x'
while (startKey) {
const keys = await block.getKeysPaged({
prefix: sir.key,
pageSize,
startKey,
})
startKey = keys[pageSize - 1] ?? null

const items = await Promise.all(
keys.map((key) =>
block.get(key).then((value) => ({
key,
value,
})),
),
)
callbacks.get(followSubscription)?.({
event: 'operationStorageItems',
operationId,
items,
})
break
}
break
const { items, next } = await getDescendantValues(block, { prefix: sir.key, startKey: '0x' })

following.get(followSubscription)?.callback({
event: 'operationStorageItems',
operationId,
items,
})

return next
}
default:
// TODO
console.warn(`Storage type not implemented ${sir.type}`)
return null
}
}

await Promise.all(items.map(handleStorageItemRequest))
const listResult = await Promise.all(items.map(handleStorageItemRequest))
const pending = listResult.filter((v) => v !== null)

callbacks.get(followSubscription)?.({
event: 'operationStorageDone',
operationId,
})
if (!pending.length) {
following.get(followSubscription)?.callback({
event: 'operationStorageDone',
operationId,
})
} else {
const follower = following.get(followSubscription)
if (follower) {
follower.pendingDescendantValues.set(operationId, { hash, params: pending })
follower.callback({
event: 'operationWaitingForContinue',
operationId,
})
}
}
})

return {
Expand All @@ -268,7 +314,7 @@ export const chainHead_v1_body: Handler<[string, HexString], OperationStarted |
context,
[followSubscription, hash],
) => {
if (!callbacks.has(followSubscription)) return limitReached
if (!following.has(followSubscription)) return limitReached
const block = await context.chain.getBlock(hash)
if (!block) {
throw new ResponseError(-32801, 'Block not found')
Expand All @@ -278,7 +324,7 @@ export const chainHead_v1_body: Handler<[string, HexString], OperationStarted |
afterResponse(async () => {
const body = await block.extrinsics

callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationBodyDone',
operationId,
value: body,
Expand All @@ -288,18 +334,66 @@ export const chainHead_v1_body: Handler<[string, HexString], OperationStarted |
return operationStarted(operationId)
}

// Currently no-ops, will come into play when pagination is implemented
/**
* Resume an operation paused through `operationWaitingForContinue`
*
* @param context
* @param params - [`followSubscription`, `operationId`]
*/
export const chainHead_v1_continue: Handler<[string, HexString], null> = async (
_context,
[_followSubscription, _operationId],
context,
[followSubscription, operationId],
) => {
const follower = following.get(followSubscription)
const pendingOp = follower?.pendingDescendantValues.get(operationId)
if (!pendingOp || !follower) {
throw new ResponseError(-32803, "Operation ID doesn't have anything pending")
}
const block = await context.chain.getBlock(pendingOp.hash)
if (!block) {
throw new ResponseError(-32801, 'Block not found')
}

afterResponse(async () => {
const handlePendingOperation = async (params: DescendantValuesParams) => {
const { items, next } = await getDescendantValues(block, params)

follower.callback({
event: 'operationStorageItems',
operationId,
items,
})

return next
}

const listResult = await Promise.all(pendingOp.params.map(handlePendingOperation))
const pending = listResult.filter((v) => v !== null)

if (!pending.length) {
follower.pendingDescendantValues.delete(operationId)
follower.callback({
event: 'operationStorageDone',
operationId,
})
} else {
follower.pendingDescendantValues.set(operationId, { hash: pendingOp.hash, params: pending })
follower.callback({
event: 'operationWaitingForContinue',
operationId,
})
}
})

return null
}

export const chainHead_v1_stopOperation: Handler<[string, HexString], null> = async (
_context,
[_followSubscription, _operationId],
[followSubscription, operationId],
) => {
following.get(followSubscription)?.pendingDescendantValues.delete(operationId)

return null
}

Expand Down
14 changes: 14 additions & 0 deletions packages/e2e/src/chainHead_v1.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,20 @@ describe('chainHead_v1 rpc', () => {
chainHead.unfollow()
})

it('runs through multiple pages of storage queries', async () => {
const chainHead = testApi.observableClient.chainHead$()

const receivedItems = await firstValueFrom(
chainHead.storage$(null, 'descendantsValues', (ctx) =>
ctx.dynamicBuilder.buildStorage('System', 'BlockHash').enc(),
),
)

expect(receivedItems.length).toEqual(1201)

chainHead.unfollow()
})

it('resolves the header for a specific block', async () => {
const chainHead = testApi.observableClient.chainHead$()

Expand Down

0 comments on commit ac3d39e

Please sign in to comment.