Skip to content

Commit

Permalink
Add support for filters and JSON-RPC batched requests (#75)
Browse files Browse the repository at this point in the history
* initial filter support

* better error handling

* support batched JSON RPC requests

* support uninstalling filters

* add chain -> filters tests

* rpc -> filters test

* tweak filters behaviour to match the spec

* lint fix
  • Loading branch information
krzkaczor authored Mar 1, 2020
1 parent fd6ae79 commit 540ebe3
Show file tree
Hide file tree
Showing 10 changed files with 295 additions and 30 deletions.
47 changes: 46 additions & 1 deletion packages/chain/src/Chain.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Address, Hash, Quantity, bnToQuantity, HexData, bufferToHexData, bufferToAddress } from './model'
import { Address, Hash, Quantity, bnToQuantity, HexData, bufferToHexData, bufferToAddress, numberToQuantity, quantityToNumber } from './model'
import {
Tag,
RpcTransactionRequest,
Expand All @@ -19,6 +19,8 @@ import { Transaction } from 'ethereumjs-tx'
import { EventEmitter } from './utils/EventEmitter'
// eslint-disable-next-line no-restricted-imports
import { InterpreterStep } from 'ethereumts-vm/dist/evm/interpreter'
import { assert, SafeDictionary } from 'ts-essentials'
import { ChainFilter } from './model/ChainFilter'

export interface TransactionEvent {
to?: Address,
Expand Down Expand Up @@ -206,4 +208,47 @@ export class Chain {
data: tx.data?.length > 0 ? bufferToHexData(tx.data) : undefined,
}
}

private filters: SafeDictionary<ChainFilter> = {}
private filtersCount = 0;
async createNewBlockFilter (): Promise<Quantity> {
const currentId = numberToQuantity(this.filtersCount++)

const block = await this.vm.getLatestBlock()
this.filters[currentId] = { type: 'block', lastSeenBlock: quantityToNumber(block.number) }

return currentId
}

async getFilterChanges (id: Quantity): Promise<Hash[]> {
const filter = this.filters[id]
if (!filter) {
throw new Error(`Filter with ${id} doesnt exist`)
}

assert(filter.type === 'block')

const latestBlockNumber = quantityToNumber(await this.vm.getBlockNumber())

const newBlockHashes: Hash[] = []
for (let i = filter.lastSeenBlock; i <= latestBlockNumber; i++) {
const block = await this.vm.getBlock(numberToQuantity(i))
newBlockHashes.push(block.hash)
}

filter.lastSeenBlock = latestBlockNumber + 1

return newBlockHashes
}

uninstallFilter (id: Quantity): boolean {
const filter = this.filters[id]

if (!filter) {
return false
}

this.filters[id] = undefined
return true
}
}
7 changes: 7 additions & 0 deletions packages/chain/src/model/ChainFilter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* Internal stored representation of blockchain filter
*/
export interface ChainFilter {
type: 'block',
lastSeenBlock: number,
}
79 changes: 79 additions & 0 deletions packages/chain/test/Chain/filters.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { Chain, numberToQuantity } from '../../src'
import { expect } from 'chai'

async function createChain (): Promise<Chain> {
const chain = new Chain()
await chain.init()

return chain
}

describe('Chain -> filters', () => {
it('creates filters', async () => {
const chain = await createChain()

const id = await chain.createNewBlockFilter()

expect(id).to.be.eq(numberToQuantity(0))
})

it('gets changes right after creating a filter', async () => {
const chain = await createChain()

const id = await chain.createNewBlockFilter()

const changes = await chain.getFilterChanges(id)

expect(changes).to.have.length(1)
expect(changes[0]).to.be.string
})

it('gets changes from existing filters', async () => {
const chain = await createChain()

// this block shouldn't matter
await chain.mineBlock()

const id = await chain.createNewBlockFilter()

await chain.mineBlock()

const changes = await chain.getFilterChanges(id)

expect(changes).to.have.length(2)
})

it('gets changes from existing filters from the last pool', async () => {
const chain = await createChain()

const id = await chain.createNewBlockFilter()
await chain.getFilterChanges(id)
const changes = await chain.getFilterChanges(id)

expect(changes).to.have.length(0)
})

it('throws on not-existing filters', async () => {
const chain = await createChain()

const changesPromise = chain.getFilterChanges(numberToQuantity(1))

expect(changesPromise).to.be.rejected
})

it('uninstalls existing filters', async () => {
const chain = await createChain()

const id = await chain.createNewBlockFilter()

expect(await chain.uninstallFilter(id)).to.be.true
// it should reject since this filter doesnt exist anymore
expect(chain.getFilterChanges(id)).to.be.rejected
})

it('uninstalls not-existing filters', async () => {
const chain = await createChain()

expect(await chain.uninstallFilter(numberToQuantity(1))).to.be.false
})
})
24 changes: 16 additions & 8 deletions packages/node/src/middleware/rpcRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,28 @@ import bodyParser from 'body-parser'
import { asyncMiddleware } from './utils'

import { RPCExecutorType, rpcCommandsDescription } from '../rpc/schema'
import { sanitizeRPCEnvelope, sanitizeRPC, executeRPC, respondRPC } from '../rpc/middlewares'
import { parseRPCRequest, sanitizeRPC, executeRPC, respondRPC, JsonRpcResultEnvelope } from '../rpc/middlewares'

export function rpcRouter (rpcExecutor: RPCExecutorType) {
const router = Router()

router.use(bodyParser.json({ type: '*/*' }))

router.post('/', asyncMiddleware(async (req, res) => {
const envelope = await sanitizeRPCEnvelope(req.body)
const parameters = await sanitizeRPC(rpcCommandsDescription, envelope)
const result = await executeRPC(rpcExecutor, envelope.method, parameters)
const response = respondRPC(rpcCommandsDescription, envelope, result)
res.status(200).send(response)
}))
router.post(
'/',
asyncMiddleware(async (req, res) => {
const request = await parseRPCRequest(req.body)

const responses: JsonRpcResultEnvelope[] = []
for (const envelope of request.envelopes) {
const parameters = await sanitizeRPC(rpcCommandsDescription, envelope)
const result = await executeRPC(rpcExecutor, envelope.method, parameters)
responses.push(respondRPC(rpcCommandsDescription, envelope, result))
}

res.status(200).send(request.batched ? responses : responses[0])
}),
)

return router
}
42 changes: 27 additions & 15 deletions packages/node/src/rpc/middlewares.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,43 @@ const jsonRpcEnvelope = t.type({
params: t.any,
})

type RpcEnvelope = t.TypeOf<typeof jsonRpcEnvelope>
const jsonRpcRequest = t.union([jsonRpcEnvelope, t.array(jsonRpcEnvelope)])

export async function sanitizeRPCEnvelope (body: unknown) {
const result = jsonRpcEnvelope.decode(body)
type JsonRpcRequestEnvelope = t.TypeOf<typeof jsonRpcEnvelope>

export interface JsonRpcResultEnvelope {
jsonrpc: string,
id: number | string,
result: any,
}

// NOTE: one request can have multiple envelopes (batched requests)
export async function parseRPCRequest (
body: unknown,
): Promise<{ envelopes: JsonRpcRequestEnvelope[], batched: boolean }> {
const result = jsonRpcRequest.decode(body)
if (isLeft(result)) {
d(`Error during parsing RPC request. ${JSON.stringify(body)}`)
throw new IOTSError(result)
}
return result.right

if (result.right instanceof Array) {
d(`Batched RPC requests: ${result.right.length}`)
return { envelopes: result.right, batched: true }
}

return { envelopes: [result.right], batched: false }
}

export function sanitizeRPC<T extends t.Any> (
schema: RPCSchema,
{ method, params }: RpcEnvelope,
{ method, params }: JsonRpcRequestEnvelope,
): t.OutputOf<T> {
const rpcDescription = schema[method]
d(`--> RPC call ${method}`)
d(`--> RPC call data ${JSON.stringify(params)}`)
if (!rpcDescription) {
throw new NotFoundHttpError([
`RPC method: ${method} called with ${JSON.stringify(params)} not found`,
])
throw new NotFoundHttpError([`RPC method: ${method} called with ${JSON.stringify(params)} not found`])
}
// we need to normalize empty arrays to undefineds
const normalizedParams = Array.isArray(params) && params.length === 0 ? undefined : params
Expand All @@ -53,11 +69,7 @@ export function sanitizeRPC<T extends t.Any> (
throw new IOTSError(res)
}

export function executeRPC (
executors: RPCExecutors,
method: string,
params: unknown,
) {
export function executeRPC (executors: RPCExecutors, method: string, params: unknown) {
const executor = executors[method]
assert(executor, `Couldn't find executor for ${method}`)

Expand All @@ -69,9 +81,9 @@ export function executeRPC (

export function respondRPC (
schema: RPCSchema,
{ method, id }: RpcEnvelope,
{ method, id }: JsonRpcRequestEnvelope,
result: unknown,
) {
): JsonRpcResultEnvelope {
const rpcDescription = schema[method]
d(`<-- RES: ${JSON.stringify(result)}`)
assert(rpcDescription, `Couldn't find rpc description for ${method}`)
Expand Down
15 changes: 14 additions & 1 deletion packages/node/src/rpc/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ const log = t.type({
topics: t.array(hash),
transactionHash: hash,
transactionIndex: quantity,
// type: t.literal('mined'),
})

// https://github.com/ethereum/wiki/wiki/JSON-RPC#returns-31
Expand Down Expand Up @@ -147,6 +146,20 @@ export const rpcCommandsDescription = {
parameters: t.undefined,
returns: t.array(address),
},
eth_newBlockFilter: {
parameters: t.undefined,
returns: quantity,
},
eth_getFilterChanges: {
parameters: t.tuple([quantity]),
// note: currently supports only block filters
returns: t.array(hash),
},
eth_uninstallFilter: {
parameters: t.tuple([quantity]),
// note: currently supports only block filters
returns: t.boolean,
},

// ganache compatibility
// docs: https://github.com/trufflesuite/ganache-cli#custom-methods
Expand Down
9 changes: 9 additions & 0 deletions packages/node/src/services/rpcExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ export const createRpcExecutor = (
eth_accounts: () => {
return walletManager.getWallets().map(w => makeAddress(w.address))
},
eth_newBlockFilter: () => {
return chain.createNewBlockFilter()
},
eth_getFilterChanges: ([filterId]) => {
return chain.getFilterChanges(filterId)
},
eth_uninstallFilter: ([filterId]) => {
return chain.uninstallFilter(filterId)
},

// ganache compatibility
evm_increaseTime: ([n]) => {
Expand Down
28 changes: 23 additions & 5 deletions packages/node/test/middleware/errorHandler/rpc-errors.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,33 @@ describe('RPC/errors', () => {
.send({ jsonrpc: '2.0', method: 'net_version', params: [] })

expect(res).to.have.status(400)
expect(res.body).to.be.deep.eq({
expect(res.body).to.containSubset({
jsonrpc: '2.0',
error: {
code: -32600,
message: 'BadRequest',
// details are skipped
},
id: null,
})
})

it('throws error on malformed batched envelope', async () => {
const app = await buildTestApp()
const res = await request(app)
.post('/')
.send([
{ jsonrpc: '2.0', id: 1, method: 'net_version', params: [] },
{ jsonrpc: '2.0', method: 'net_version', params: [] },
])

expect(res).to.have.status(400)
expect(res.body).to.containSubset({
jsonrpc: '2.0',
error: {
code: -32600,
message: 'BadRequest',
details: [
'Invalid value undefined supplied to : { jsonrpc: "2.0", id: (number | string), method: string, params: any }/id: (number | string)/0: number',
'Invalid value undefined supplied to : { jsonrpc: "2.0", id: (number | string), method: string, params: any }/id: (number | string)/1: string',
],
// details are skipped
},
id: null,
})
Expand Down
Loading

0 comments on commit 540ebe3

Please sign in to comment.