Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(chainHead_v1): operationWaitingForContinue and chainHead_v1_continue #838

Merged
merged 2 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 142 additions & 48 deletions packages/core/src/rpc/rpc-spec/chainHead_v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ import { defaultLogger } from '../../logger.js'

const logger = defaultLogger.child({ name: 'rpc-chainHead_v1' })

const callbacks = new Map<string, (data: any) => void>()
type DescendantValuesParams = {
prefix: string
startKey: string
}
const following = new Map<
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One downside about global variables is that for some use cases, we need to handle chopsticks instances running at a same time (e.g. unit tests) so need to ensure they don't interfere with each other.
In this particular case, it is fine as everything is indexed by a unique random ID. The only drawback I can see is potential memory leak (i.e. destroy a chopsticks instance will not clear entry here). But that's not a very big deal here.
So this is ok for now. Maybe later refactor this into context somehow.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

string,
{
callback: (data: any) => void
pendingDescendantValues: Map<string, { hash: HexString; params: DescendantValuesParams[] }>
}
>()

async function afterResponse(fn: () => void) {
await new Promise((resolve) => setTimeout(resolve, 0))
Expand Down Expand Up @@ -61,11 +71,11 @@ export const chainHead_v1_follow: Handler<[boolean], string> = async (

const cleanup = () => {
context.chain.headState.unsubscribeHead(id)
callbacks.delete(id)
following.delete(id)
}

const callback = subscribe('chainHead_v1_followEvent', id, cleanup)
callbacks.set(id, callback)
following.set(id, { callback, pendingDescendantValues: new Map() })

afterResponse(async () => {
callback({
Expand Down Expand Up @@ -103,7 +113,7 @@ export const chainHead_v1_header: Handler<[string, HexString], HexString | null>
context,
[followSubscription, hash],
) => {
if (!callbacks.has(followSubscription)) return null
if (!following.has(followSubscription)) return null
const block = await context.chain.getBlock(hash)

return block ? (await block.header).toHex() : null
Expand Down Expand Up @@ -134,21 +144,21 @@ export const chainHead_v1_call: Handler<[string, HexString, string, HexString],
const block = await context.chain.getBlock(hash)

if (!block) {
callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationError',
operationId,
error: `Block ${hash} not found`,
})
} else {
try {
const resp = await block.call(method, [callParameters])
callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationCallDone',
operationId,
output: resp.result,
})
} catch (ex: any) {
callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationError',
operationId,
error: ex.message,
Expand All @@ -166,6 +176,47 @@ export interface StorageItemRequest {
type: 'value' | 'hash' | 'closestDescendantMerkleValue' | 'descendantsValues' | 'descendantsHashes'
}

const PAGE_SIZE = 1000
async function getDescendantValues(
block: Block,
params: DescendantValuesParams,
): Promise<{
items: Array<{
key: string
value?: HexString
}>
next: DescendantValuesParams | null
}> {
const keys = await block.getKeysPaged({
...params,
pageSize: PAGE_SIZE,
})

const items = await Promise.all(
keys.map((key) =>
block.get(key).then((value) => ({
key,
value,
})),
),
)

if (keys.length < PAGE_SIZE) {
return {
items,
next: null,
}
}

return {
items,
next: {
...params,
startKey: keys[PAGE_SIZE - 1],
},
}
}

/**
* Query the storage for a given block
*
Expand All @@ -183,7 +234,7 @@ export const chainHead_v1_storage: Handler<
afterResponse(async () => {
const block = await context.chain.getBlock(hash)
if (!block) {
callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationError',
operationId,
error: 'Block not found',
Expand All @@ -196,55 +247,50 @@ export const chainHead_v1_storage: Handler<
case 'value': {
const value = await block.get(sir.key)
if (value) {
callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationStorageItems',
operationId,
items: [{ key: sir.key, value }],
})
}
break
return null
}
case 'descendantsValues': {
// TODO expose pagination
const pageSize = 100
let startKey: string | null = '0x'
while (startKey) {
const keys = await block.getKeysPaged({
prefix: sir.key,
pageSize,
startKey,
})
startKey = keys[pageSize - 1] ?? null

const items = await Promise.all(
keys.map((key) =>
block.get(key).then((value) => ({
key,
value,
})),
),
)
callbacks.get(followSubscription)?.({
event: 'operationStorageItems',
operationId,
items,
})
break
}
break
const { items, next } = await getDescendantValues(block, { prefix: sir.key, startKey: '0x' })

following.get(followSubscription)?.callback({
event: 'operationStorageItems',
operationId,
items,
})

return next
}
default:
// TODO
console.warn(`Storage type not implemented ${sir.type}`)
return null
}
}

await Promise.all(items.map(handleStorageItemRequest))
const listResult = await Promise.all(items.map(handleStorageItemRequest))
const pending = listResult.filter((v) => v !== null)

callbacks.get(followSubscription)?.({
event: 'operationStorageDone',
operationId,
})
if (!pending.length) {
following.get(followSubscription)?.callback({
event: 'operationStorageDone',
operationId,
})
} else {
const follower = following.get(followSubscription)
if (follower) {
follower.pendingDescendantValues.set(operationId, { hash, params: pending })
follower.callback({
event: 'operationWaitingForContinue',
operationId,
})
}
}
})

return {
Expand All @@ -268,7 +314,7 @@ export const chainHead_v1_body: Handler<[string, HexString], OperationStarted |
context,
[followSubscription, hash],
) => {
if (!callbacks.has(followSubscription)) return limitReached
if (!following.has(followSubscription)) return limitReached
const block = await context.chain.getBlock(hash)
if (!block) {
throw new ResponseError(-32801, 'Block not found')
Expand All @@ -278,7 +324,7 @@ export const chainHead_v1_body: Handler<[string, HexString], OperationStarted |
afterResponse(async () => {
const body = await block.extrinsics

callbacks.get(followSubscription)?.({
following.get(followSubscription)?.callback({
event: 'operationBodyDone',
operationId,
value: body,
Expand All @@ -288,18 +334,66 @@ export const chainHead_v1_body: Handler<[string, HexString], OperationStarted |
return operationStarted(operationId)
}

// Currently no-ops, will come into play when pagination is implemented
/**
* Resume an operation paused through `operationWaitingForContinue`
*
* @param context
* @param params - [`followSubscription`, `operationId`]
*/
export const chainHead_v1_continue: Handler<[string, HexString], null> = async (
_context,
[_followSubscription, _operationId],
context,
[followSubscription, operationId],
) => {
const follower = following.get(followSubscription)
const pendingOp = follower?.pendingDescendantValues.get(operationId)
if (!pendingOp || !follower) {
throw new ResponseError(-32803, "Operation ID doesn't have anything pending")
}
const block = await context.chain.getBlock(pendingOp.hash)
if (!block) {
throw new ResponseError(-32801, 'Block not found')
}

afterResponse(async () => {
const handlePendingOperation = async (params: DescendantValuesParams) => {
const { items, next } = await getDescendantValues(block, params)

follower.callback({
event: 'operationStorageItems',
operationId,
items,
})

return next
}

const listResult = await Promise.all(pendingOp.params.map(handlePendingOperation))
const pending = listResult.filter((v) => v !== null)

if (!pending.length) {
follower.pendingDescendantValues.delete(operationId)
follower.callback({
event: 'operationStorageDone',
operationId,
})
} else {
follower.pendingDescendantValues.set(operationId, { hash: pendingOp.hash, params: pending })
follower.callback({
event: 'operationWaitingForContinue',
operationId,
})
}
})

return null
}

export const chainHead_v1_stopOperation: Handler<[string, HexString], null> = async (
_context,
[_followSubscription, _operationId],
[followSubscription, operationId],
) => {
following.get(followSubscription)?.pendingDescendantValues.delete(operationId)

return null
}

Expand Down
14 changes: 14 additions & 0 deletions packages/e2e/src/chainHead_v1.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,20 @@ describe('chainHead_v1 rpc', () => {
chainHead.unfollow()
})

it('runs through multiple pages of storage queries', async () => {
const chainHead = testApi.observableClient.chainHead$()

const receivedItems = await firstValueFrom(
chainHead.storage$(null, 'descendantsValues', (ctx) =>
ctx.dynamicBuilder.buildStorage('System', 'BlockHash').enc(),
),
)

expect(receivedItems.length).toEqual(1201)

chainHead.unfollow()
})

it('resolves the header for a specific block', async () => {
const chainHead = testApi.observableClient.chainHead$()

Expand Down
Loading