Skip to content

Commit

Permalink
fix: add direct-channel again (as test file)
Browse files Browse the repository at this point in the history
  • Loading branch information
shifty11 committed Mar 5, 2024
1 parent bd46f32 commit c0fef82
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 14 deletions.
3 changes: 3 additions & 0 deletions protocol/core/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ RUN yarn install
# Copy source files
COPY ./src ./src

# Override the entrypoint to use the docker entrypoint
COPY ./src/entrypoint_docker.ts ./src/entrypoint.ts

# Build the docker core
RUN yarn run build:binaries

Expand Down
1 change: 1 addition & 0 deletions protocol/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"scripts": {
"build": "rimraf dist && yarn format && tsc",
"build:binaries": "yarn build && rimraf out && pkg package.json",
"build:docker": "docker build -t kyvejs/protocol .",
"test": "jest",
"format": "prettier --write . --ignore-path ./.prettierignore",
"doc": "typedoc"
Expand Down
Empty file added protocol/core/src/entrypoint.ts
Empty file.
10 changes: 10 additions & 0 deletions protocol/core/src/entrypoint_docker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// This is the entry point for the protocol if used with grpc (within docker)
import { ProtocolConfig } from "./types";
import { Validator } from "./index";

const config: Partial<ProtocolConfig> = {
host: process.env.HOST || "locahost",
port: parseInt(process.env.PORT || "50051"),
useGrpc: true,
};
new Validator(config).bootstrap();
16 changes: 7 additions & 9 deletions protocol/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import {
waitForUploadInterval,
} from "./methods";

import { ICacheProvider, IMetrics, IRuntime } from "./types";
import { ICacheProvider, IMetrics, IRuntime, ProtocolConfig } from "./types";
import { IDLE_TIME, sleep, standardizeError } from "./utils";

import { SupportedChains } from "@kyvejs/sdk/dist/constants";
Expand Down Expand Up @@ -163,12 +163,11 @@ export class Validator {
* runtime class here in order to run the
*
* @method constructor
* @param host the host of the protocol core
* @param port the port of the protocol core
* @param {ProtocolConfig} protocolConfig that defines the runtime
*/
constructor(host: string, port: number) {
constructor(protocolConfig: Partial<ProtocolConfig>) {
// set provided runtime
this.runtime = new GrpcRuntime(host, port);
this.runtime = new GrpcRuntime(protocolConfig);

// set @kyvejs/protocol version
this.protocolVersion = protocolVersion;
Expand Down Expand Up @@ -350,7 +349,6 @@ export * from "./types";
// export utils
export * from "./utils";

new Validator(
process.env.HOST || "locahost",
parseInt(process.env.PORT || "50051")
).bootstrap();
// entrypoint import has to be here because of circular dependency
// overwrite the entrypoint for docker builds
import "./entrypoint";
12 changes: 9 additions & 3 deletions protocol/core/src/runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
RuntimeServiceClient,
DataItem,
} from "../proto/kyverdk/runtime/v1/runtime";
import { IRuntime } from "../types";
import { IRuntime, ProtocolConfig } from "../types";
import { ClientOptions } from "@grpc/grpc-js";

// config is a serialized string
Expand All @@ -24,13 +24,19 @@ export default class GrpcRuntime implements IRuntime {

public config!: IConfig;

constructor(host: string, port: number) {
constructor(protocolConfig: Partial<ProtocolConfig>) {
const options: Partial<ClientOptions> = {
"grpc.max_send_message_length": maxMessageSize,
"grpc.max_receive_message_length": maxMessageSize,
};
if (!protocolConfig.useGrpc) {
if (protocolConfig.channelOverride === undefined) {
throw new Error("protocolConfig.channelOverride is undefined");
}
options.channelOverride = protocolConfig.channelOverride;
}
this.grpcClient = new RuntimeServiceClient(
`${host || "localhost"}:${port || 50051}`,
`${protocolConfig.host || "localhost"}:${protocolConfig.port || 50051}`,
grpc.credentials.createInsecure(),
options
);
Expand Down
8 changes: 8 additions & 0 deletions protocol/core/src/types/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import * as grpc from "@grpc/grpc-js";

export interface ProtocolConfig {
host: string;
port: number;
useGrpc: boolean;
channelOverride: grpc.Channel | undefined;
}
1 change: 1 addition & 0 deletions protocol/core/src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./interfaces";
export * from "./metrics";
export * from "./config";
2 changes: 1 addition & 1 deletion protocol/core/test/checks/is_pool_active.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { isPoolActive, setupMetrics } from "../../src/methods";
import { register } from "prom-client";
import { newTestValidator } from "../mocks/runtime.mock";
import { genesis_pool } from "../mocks/constants";
import { PoolStatus } from "../../../types/lcd/kyve/pool/v1beta1/pool";
import { PoolStatus } from "@kyvejs/types/lcd/kyve/pool/v1beta1/pool";

/*
Expand Down
158 changes: 158 additions & 0 deletions protocol/core/test/mocks/grpcChannel.mock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import * as grpc from "@grpc/grpc-js";
import { Metadata, ServiceError } from "@grpc/grpc-js";
import { ChannelRef } from "@grpc/grpc-js/build/src/channelz";
import { ServerSurfaceCall } from "@grpc/grpc-js/build/src/server-call";
import { Call, MessageContext } from "@grpc/grpc-js/build/src/call-interface";

import * as runtime from "../../src/proto/kyverdk/runtime/v1/runtime";
import * as Buffer from "buffer";

export class DirectCall implements Call {
private listener: grpc.InterceptingListener;
private method: any;
private requestType: any;
private responseType: any;

constructor(method: any, requestType: any, responseType: any) {
this.listener = {
onReceiveMessage(_message: any) {},
onReceiveStatus(_status: grpc.StatusObject) {},
onReceiveMetadata(_metadata: Metadata) {},
};
this.method = method;
this.requestType = requestType;
this.responseType = responseType;
}

cancelWithStatus(_status: grpc.status, _details: string): void {}

getCallNumber(): number {
return 0;
}

getPeer(): string {
return "";
}

halfClose(): void {}

sendMessageWithContext(_context: MessageContext, message: Buffer): void {
const payload = this.requestType.decode(message);

// calls the grpc method directly without using grpc
this.method(
{ request: payload },
(error: ServiceError | null, response: any) => {
if (error) {
this.listener.onReceiveStatus(error);
return;
}

const value = this.responseType.create(response);
const msg = Buffer.Buffer.from(
this.responseType.encode(value).finish()
);

this.listener.onReceiveMessage(msg);
this.listener.onReceiveStatus({
code: grpc.status.OK,
details: "",
metadata: new Metadata(),
});
}
);
}

setCredentials(_credentials: grpc.CallCredentials): void {}

start(_metadata: Metadata, listener: grpc.InterceptingListener): void {
this.listener = listener;
}

startRead(): void {}
}

/*
* DirectChannel can be used to call grpc methods directly without using grpc.
* This is used to run the runtime service in the same process as the client.
* It emulates the legacy mode of the runtime service without grpc/docker.
*
* Requirements:
* - The integration must be written in typescript as well
* - The integration must provide the server method to the protocol
* - The grpc/proto naming convention must be the following:
* - Method name: <methodName>
* - Request type: <MethodName>Request
* - Response type: <MethodName>Response
*
* Example:
* ```proto
* rpc GetRuntimeName(GetRuntimeNameRequest) returns (GetRuntimeNameResponse);
* ```
* ```typescript
* getRuntimeName(request: GetRuntimeNameRequest): GetRuntimeNameResponse {}
* ```
*/
// @ts-ignore
export class DirectChannel implements grpc.Channel {
private services: {};

constructor(services: {}) {
this.services = services;
}

close(): void {}

createCall(
call: string,
_deadline: grpc.Deadline,
_host: string | null | undefined,
_parentCall: ServerSurfaceCall | null,
_propagateFlags: number | null | undefined
): Call {
// Extract the method name from the path
// Example: /kyverdk.runtime.v1.RuntimeService/GetRuntimeName -> GetRuntimeName
const callName = call.split("/")[2];

// Convert the method name to camelCase
const camelCaseCallName =
callName.charAt(0).toLowerCase() + callName.slice(1);

// Get the request type from the runtime service
// Example: GetRuntimeName -> GetRuntimeNameRequest
// @ts-ignore
const requestType: any = runtime[`${callName}Request`];

// Get the response type from the runtime service
// Example: GetRuntimeName -> GetRuntimeNameResponse
// @ts-ignore
const responseType: any = runtime[`${callName}Response`];

// @ts-ignore
const method = this.services[camelCaseCallName];

// console.debug(
// `Call method ${method.name} ${callName}Request -> ${callName}Response`
// );

return new DirectCall(method, requestType, responseType);
}

getChannelzRef(): ChannelRef {
return { id: 0, kind: "channel", name: "" };
}

getConnectivityState(_tryToConnect: boolean): grpc.connectivityState {
return grpc.connectivityState.READY;
}

getTarget(): string {
return "";
}

watchConnectivityState(
_currentState: grpc.connectivityState,
_deadline: Date | number,
_callback: (error?: Error) => void
): void {}
}
4 changes: 3 additions & 1 deletion protocol/core/test/mocks/runtime.mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
PrevalidateDataItemResponse,
RuntimeServiceServer,
} from "../../src/proto/kyverdk/runtime/v1/runtime";
import { DirectChannel } from "./grpcChannel.mock";

export const TestRuntime = jest.fn().mockImplementation(() => {
return {
Expand Down Expand Up @@ -52,7 +53,8 @@ export const newTestValidator = (): Validator => {
};
const config: Partial<ProtocolConfig> = {
useGrpc: false,
services: runtimeServices,
// @ts-ignore
channelOverride: new DirectChannel(runtimeServices),
};
const v = new Validator(config);
v["runtime"] = runtime;
Expand Down

0 comments on commit c0fef82

Please sign in to comment.