Skip to content

Commit

Permalink
Kerem/eth subscribe (#113)
Browse files Browse the repository at this point in the history
* reflect-metadata

* rpc-methods wip

* rpc-methods wip

* wspair -> wscontext

* remove @Inject when unnecessary

* using autoBindInjectable

* defaultScope=singleton + skipBaseClassChecks

* config factory logging

* zodifying subscription rpc methods

* wdatatojson to util

* EthSubscribeRpcParamsType

* subscriptions/index.ts

* outbound subscription factory

* todo added

* created an eth_unsubscribe method

* created 2 util ws functions

* introducing context.abort

* better logging for the outbound subs factory

* outbound subscription

* ws-context handles subscription sharing + unsubscribes

* docs(changeset): Introducing subscription sharing

* removed todo

* removed @Inject

* linting fix

* todo converted

* more todos
  • Loading branch information
mechanical-turk authored Aug 5, 2024
1 parent 9175b64 commit a9a2496
Show file tree
Hide file tree
Showing 25 changed files with 1,218 additions and 287 deletions.
5 changes: 5 additions & 0 deletions .changeset/metal-tables-knock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@whatsgood/nexus": minor
---

Introducing subscription sharing
4 changes: 2 additions & 2 deletions packages/nexus/src/auth/authorization-service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import { NexusConfig } from "@src/nexus-config";

const AUTH_KEY_QUERY_PARAM_NAME = "key";
Expand All @@ -7,7 +7,7 @@ const AUTH_KEY_QUERY_PARAM_NAME = "key";
export class AuthorizationService {
private readonly authKey?: string;

constructor(@inject(NexusConfig) config: NexusConfig) {
constructor(config: NexusConfig) {
this.authKey = config.authKey;
}

Expand Down
6 changes: 5 additions & 1 deletion packages/nexus/src/dependency-injection/container.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import { Container } from "inversify";

export const container = new Container();
export const container = new Container({
autoBindInjectable: true,
skipBaseClassChecks: true,
defaultScope: "Singleton",
});
5 changes: 1 addition & 4 deletions packages/nexus/src/example/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import * as http from "node:http";
import { Nexus } from "@src/nexus";
import { NodeProvider } from "@src/node-provider";
import { CHAIN } from "@src/default-chains";
import { Chain } from "@src/chain";
import { Nexus, NodeProvider, CHAIN, Chain } from "@src/index";

// const llamaRpcNodeProvider = new NodeProvider({
// name: "llama-rpc",
Expand Down
10 changes: 4 additions & 6 deletions packages/nexus/src/http/http-controller.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Logger } from "pino";
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import { NexusConfig } from "@src/nexus-config";
import { RpcRequestPayloadSchema } from "@src/rpc-schema";
import type { RpcResponse } from "@src/rpc-response";
Expand All @@ -25,13 +25,11 @@ export class HttpController {
private readonly logger: Logger;

constructor(
@inject(NexusConfig) private readonly config: NexusConfig,
@inject(LoggerFactory) private readonly loggerFactory: LoggerFactory,
@inject(HttpRelayHandler)
private readonly config: NexusConfig,
private readonly loggerFactory: LoggerFactory,
private readonly httpRelayHandler: HttpRelayHandler,
@inject(NexusMiddlewareHandler)
private readonly middlewareHandler: NexusMiddlewareHandler,
@inject(EventBus) private readonly eventBus: EventBus
private readonly eventBus: EventBus
) {
this.logger = this.loggerFactory.get(HttpController.name);
}
Expand Down
3 changes: 1 addition & 2 deletions packages/nexus/src/http/http-relay-handler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import type { NexusRpcContext } from "@src/nexus-rpc-context";
import { NodeEndpointPoolFactory } from "@src/node-endpoint";
import type { NodeRpcResponseFailure } from "@src/node-endpoint/node-rpc-response";
Expand All @@ -15,7 +15,6 @@ import {
@injectable()
export class HttpRelayHandler {
constructor(
@inject(NodeEndpointPoolFactory)
private readonly nodeEndpointPoolFactory: NodeEndpointPoolFactory
) {}

Expand Down
4 changes: 2 additions & 2 deletions packages/nexus/src/logging/logger-factory.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { Logger } from "pino";
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import { NexusConfig } from "@src/nexus-config";

@injectable()
export class LoggerFactory {
constructor(@inject(NexusConfig) private readonly config: NexusConfig) {}
constructor(private readonly config: NexusConfig) {}

public get(name: string, options: Record<string, any> = {}): Logger {
// TODO: redact node provider url, and start logging providers directly
Expand Down
7 changes: 2 additions & 5 deletions packages/nexus/src/middleware/nexus-middleware-handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Logger } from "pino";
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import { errSerialize } from "@src/utils";
import type { NexusRpcContext } from "@src/nexus-rpc-context";
import { NexusConfig } from "@src/nexus-config";
Expand All @@ -12,10 +12,7 @@ export class NexusMiddlewareHandler {
private readonly middleware: NexusMiddleware[];
private readonly logger: Logger;

constructor(
@inject(NexusConfig) config: NexusConfig,
@inject(LoggerFactory) loggerFactory: LoggerFactory
) {
constructor(config: NexusConfig, loggerFactory: LoggerFactory) {
this.middleware = [...config.middleware, authMiddleware];
this.logger = loggerFactory.get(NexusMiddlewareHandler.name);
}
Expand Down
6 changes: 4 additions & 2 deletions packages/nexus/src/nexus-config/nexus-config-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ export interface NexusConfigOptions {
export class NexusConfigFactory {
private readonly options: NexusConfigOptions;
private readonly envConfig: EnvConfig;
private readonly baseLogger: Logger;
private readonly logger: Logger;

constructor(options?: NexusConfigOptions) {
this.options = options || {};
this.envConfig = getEnvConfig(this.getEnv());
this.logger = this.getLogger();
this.baseLogger = this.getLogger();
this.logger = this.baseLogger.child({ name: NexusConfigFactory.name });
}

private getEnv() {
Expand All @@ -63,7 +65,7 @@ export class NexusConfigFactory {
chains: new Map(uniqueChains.map((chain) => [chain.chainId, chain])),
relay: this.getRelayConfig(),
port: this.getPort(),
logger: this.logger,
logger: this.baseLogger,
middleware: this.getMiddleware(),
authKey: this.getAuthKey(),
});
Expand Down
36 changes: 9 additions & 27 deletions packages/nexus/src/nexus/nexus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,17 @@ import type {
} from "@whatwg-node/server";
import { createServerAdapter } from "@whatwg-node/server";
import type { Logger } from "pino";
import { decorate, inject, injectable } from "inversify";
import { EventEmitter } from "eventemitter3";
import { injectable } from "inversify";
import {
NexusConfig,
NexusConfigFactory,
type NexusConfigOptions,
} from "@src/nexus-config";
import { HttpController } from "@src/http";
import { WsPairHandler, WsRpcServer } from "@src/websockets";
import { WsContextHandler, WsRpcServer } from "@src/websockets";
import { LoggerFactory } from "@src/logging";
import { EventBus } from "@src/events";
import { container } from "@src/dependency-injection";
import { HttpRelayHandler } from "@src/http/http-relay-handler";
import { NodeEndpointPoolFactory } from "@src/node-endpoint";
import { NexusMiddlewareHandler } from "@src/middleware";
import { AuthorizationService } from "@src/auth";

decorate(injectable(), EventEmitter); // TODO: put this somewhere else

export type NexusServerInstance = ServerAdapter<unknown, Nexus>;

Expand All @@ -33,11 +26,11 @@ export class Nexus implements ServerAdapterBaseObject<unknown> {
public readonly on: EventBus["on"];

constructor(
@inject(HttpController) private readonly controller: HttpController,
@inject(WsPairHandler) private readonly wsPairHandler: WsPairHandler,
@inject(LoggerFactory) loggerFactory: LoggerFactory,
@inject(EventBus) eventBus: EventBus,
@inject(NexusConfig) config: NexusConfig
private readonly controller: HttpController,
private readonly wsContextHandler: WsContextHandler,
loggerFactory: LoggerFactory,
eventBus: EventBus,
config: NexusConfig
) {
this.logger = loggerFactory.get(Nexus.name);
this.on = eventBus.on.bind(eventBus);
Expand All @@ -52,8 +45,8 @@ export class Nexus implements ServerAdapterBaseObject<unknown> {
public ws(httpServer: NodeHttpServer) {
const wsServer = container.resolve(WsRpcServer);

wsServer.on("connection", (pair) => {
this.wsPairHandler.handleConnection(pair);
wsServer.on("connection", (context) => {
this.wsContextHandler.handleConnection(context);
});

httpServer.on("upgrade", wsServer.handleUpgrade.bind(wsServer));
Expand All @@ -64,17 +57,6 @@ export class Nexus implements ServerAdapterBaseObject<unknown> {
const config = nexusConfigFactory.getNexusConfig();

container.bind(NexusConfig).toConstantValue(config);
container.bind(HttpController).toSelf().inSingletonScope();
container.bind(Nexus).toSelf().inSingletonScope();
container.bind(LoggerFactory).toSelf().inSingletonScope();
container.bind(WsPairHandler).toSelf().inSingletonScope();
container.bind(WsRpcServer).toSelf().inSingletonScope();
container.bind(EventBus).toSelf().inSingletonScope();
container.bind(HttpRelayHandler).toSelf().inSingletonScope();
container.bind(NodeEndpointPoolFactory).toSelf().inSingletonScope();
container.bind(NexusMiddlewareHandler).toSelf().inSingletonScope();
container.bind(EventEmitter).toSelf().inSingletonScope();
container.bind(AuthorizationService).toSelf().inSingletonScope();

const nexus = container.resolve(Nexus);

Expand Down
10 changes: 6 additions & 4 deletions packages/nexus/src/node-endpoint/node-endpoint-pool-factory.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { inject, injectable } from "inversify";
import { injectable } from "inversify";
import type { Chain } from "@src/chain";
import type { NodeProvider } from "@src/node-provider";
import { LoggerFactory } from "@src/logging";
Expand All @@ -15,8 +15,8 @@ export class NodeEndpointPoolFactory {
public readonly ws: Map<Chain, NodeEndpointPool>;

constructor(
@inject(NexusConfig) private readonly config: NexusConfig,
@inject(LoggerFactory) private readonly loggerFactory: LoggerFactory // TODO: this should not be a property.
private readonly config: NexusConfig,
private readonly loggerFactory: LoggerFactory
) {
this.nodeProviders = config.nodeProviders;
this.http = this.createChainToEndpointPoolMap("http");
Expand Down Expand Up @@ -63,7 +63,9 @@ export class NodeEndpointPoolFactory {
nodeProvider,
})
),
loggerFactory: this.loggerFactory,
logger: this.loggerFactory.get(NodeEndpointPool.name, {
chain,
}),
})
);
}
Expand Down
5 changes: 2 additions & 3 deletions packages/nexus/src/node-endpoint/node-endpoint-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import type { Logger } from "pino";
import type { Chain } from "@src/chain";
import type { RpcRequestPayloadType } from "@src/rpc-schema";
import { generatorOf, take, weightedShuffleGenerator } from "@src/utils";
import type { LoggerFactory } from "@src/logging";
import type { NexusConfig } from "@src/nexus-config";
import type { RelayConfig } from "./relay-config";
import {
Expand All @@ -28,12 +27,12 @@ export class NodeEndpointPool {
chain: Chain;
config: NexusConfig;
nodeEndpoints: NodeEndpoint[];
loggerFactory: LoggerFactory;
logger: Logger;
}) {
this.chain = params.chain;
this.nodeEndpoints = params.nodeEndpoints;
this.config = params.config.relay;
this.logger = params.loggerFactory.get(NodeEndpointPool.name);
this.logger = params.logger;

if (this.config.failure.kind === "cycle-requests") {
this.maxRelayAttempts = this.config.failure.maxAttempts;
Expand Down
57 changes: 57 additions & 0 deletions packages/nexus/src/rpc-methods/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { z } from "zod";
import {
RpcRequestPayloadSchema,
RpcResponseSuccessPayloadSchema,
} from "@src/rpc-schema";

export const eth_subscribe_newHeads = RpcRequestPayloadSchema.extend({

Check warning on line 7 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / ci / Continuous Integration

Identifier 'eth_subscribe_newHeads' is not in camel case
method: z.literal("eth_subscribe"),
params: z.tuple([
z.literal("newHeads"),
// TODO: can this method have more params?
]),
});

export const eth_subscribe_newPendingTransactions =

Check warning on line 15 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / ci / Continuous Integration

Identifier 'eth_subscribe_newPendingTransactions' is not in camel case
RpcRequestPayloadSchema.extend({
method: z.literal("eth_subscribe"),
params: z.tuple([
z.literal("newPendingTransactions"),
// TODO: can this method have more params?
]),
});

export const eth_subscribe = z.union([

Check warning on line 24 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / ci / Continuous Integration

Identifier 'eth_subscribe' is not in camel case
eth_subscribe_newHeads,

Check warning on line 25 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / ci / Continuous Integration

Identifier 'eth_subscribe_newHeads' is not in camel case
eth_subscribe_newPendingTransactions,

Check warning on line 26 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / ci / Continuous Integration

Identifier 'eth_subscribe_newPendingTransactions' is not in camel case
]);

export type EthSubscribeRpcPayloadType = z.infer<typeof eth_subscribe>;

Check warning on line 29 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / ci / Continuous Integration

Identifier 'eth_subscribe' is not in camel case
export type EthSubscribeRpcParamsType = EthSubscribeRpcPayloadType["params"];

export const eth_unsubscribe = RpcRequestPayloadSchema.extend({

Check warning on line 32 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / ci / Continuous Integration

Identifier 'eth_unsubscribe' is not in camel case
method: z.literal("eth_unsubscribe"),
params: z.tuple([z.string()]),
});

export const eth_subscribeSuccessResponsePayloadSchema =

Check warning on line 37 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / ci / Continuous Integration

Identifier 'eth_subscribeSuccessResponsePayloadSchema' is not in camel case
RpcResponseSuccessPayloadSchema.extend({
result: z.string(),
});

export type EthSubscribeSuccessResponsePayloadType = z.infer<
typeof eth_subscribeSuccessResponsePayloadSchema

Check warning on line 43 in packages/nexus/src/rpc-methods/index.ts

View workflow job for this annotation

GitHub Actions / ci / Continuous Integration

Identifier 'eth_subscribeSuccessResponsePayloadSchema' is not in camel case
>;

export const eth_subscriptionPayloadSchema = z.object({
jsonrpc: z.literal("2.0"),
method: z.literal("eth_subscription"),
params: z.object({
subscription: z.string(),
result: z.unknown(), // TODO: do better narrowing here
}),
});

export type EthSubscriptionPayloadType = z.infer<
typeof eth_subscriptionPayloadSchema
>;
Loading

0 comments on commit a9a2496

Please sign in to comment.