-
Notifications
You must be signed in to change notification settings - Fork 0
RPC Protocol
RPC Protocol is a built-in protocol allowing transferring any objects serialized as JSON in RPC style.
This means that for each transfer sent you can receive response.
NOTE: For JSON serialization used the Jackson library. Github
The RPC protocol works the same on both, the server side and the client side, so server also can send request to the client and get response.
For sending request we need to define interface with methods on each request (we are call it origin). It can look like this.
interface MyFirstOrigin {
void sendString(String s);
void sendNumbers(int[] arr);
}
And to receive and handle requests, we need define on the other side a class with methods for each request (we are call it endpoint).
class MyFirstEndpoint {
public void sendString(String s) {
System.out.println(s);
}
public void sendNumbers(int[] arr) {
System.out.println(Arrays.toString(arr));
}
}
So how the protocol can guess which method and which class will be called?
We need to define identificators for class and each method (we are call it path).
For this is used annotatins com.github.tix320.sonder.api.common.rpc.Origin and com.github.tix320.sonder.api.common.rpc.Endpoint.
@Origin("firstRPC")
interface MyFirstOrigin {
@Origin("sendString")
void sendString(String s);
@Origin // you can omit the path, in which case the method name will be taken i.e. "sendNumbers"
void sendNumbers(int[] arr);
}
@Endpoint("firstRPC")
class MyFirstEndpoint {
@Endpoint("sendString")
public void sendString(String s) {
System.out.println(s);
}
@Endpoint // The same logic here
public void sendNumbers(int[] arr) {
System.out.println(Arrays.toString(arr));
}
}
To send response from endpoint just need change the return type:
@Endpoint("sendString")
public int sendString(String s) {
System.out.println(s);
return s.length();
}
In origin side there is small complexity due the asynchronous communication. Method must return not the exact type of data, but the MonoObservable object.
@Origin("sendString")
MonoObservable<Integer> sendString(String s);
Complete example:
MyFirstOrigin myOrigin = ...;
myOrigin.sendString("my_first_rpc_call").subscribe(length -> {
System.out.println(length);
assert length == 17;
});
The protocol also allows subscription to any resource. By invoking some endpoint you will subscribe to stream of objects.
In that case both sides must annotate method with com.github.tix320.sonder.api.common.rpc.Subscription annotation and make return type Observable.
@Origin
@Subscription
Observable<Integer> subscribeToNews();
import com.github.tix320.kiwi.api.reactive.publisher.Publisher;
@Endpoint("sendString")
@Subscription
public Observable<Integer> subscribeToNews() {
Publisher<Integer> publisher = Publisher.simple();
CompletableFuture.runAsync(() -> {
for (int i = 0; i < 5; i++) {
publisher.publish(i);
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
publisher.complete();
});
return publisher.asObservable();
}
Usage:
MyFirstOrigin myOrigin = ...;
myOrigin.subscribeToNews().subscribe(number -> {
System.out.println(number );
});
// Output 1 , 2, 3, 4, 5 with 1sec interval
On the server side to send request to certain client, you need specify clientId in method call, adding parameter with type long and annotate it with @ClientId.
@Origin
MonoObservable<Integer> sendString(String s, @ClientId long clientId);
In endpoints you can inject source clientId similary.
@Endpoint
public int sendString(String s, @ClientId long sourceClientId) {
System.out.printf("Received from %s: %s", sourceClientId, s);
return s.length();
}
The @ClientId parameter is not part of transfer content in both cases, so we are call it extra paramter.
You can define your own custom extra parameters by implementing interfaces com.github.tix320.sonder.api.common.rpc.extra.OriginExtraArgExtractor and com.github.tix320.sonder.api.common.rpc.extra.EndpointExtraArgInjector for origins and endpoints respectievly.
Below is implementation of ClientId extra param for endpoint:
import java.lang.reflect.Method;
import com.github.tix320.sonder.api.common.communication.Headers;
import com.github.tix320.sonder.api.common.rpc.extra.ClientID;
import com.github.tix320.sonder.api.common.rpc.extra.EndpointExtraArgInjector;
import com.github.tix320.sonder.api.common.rpc.extra.ExtraParamDefinition;
public final class EndpointClientIdInjector implements EndpointExtraArgInjector<ClientID, Long> {
@Override
public ExtraParamDefinition<ClientID, Long> getParamDefinition() {
return new ExtraParamDefinition<>(ClientID.class, long.class, false);
}
@Override
public Long extract(Method method, ClientID annotation, Headers headers) {
return headers.getNonNullLong(Headers.SOURCE_ID);
}
}
To create RPC protocol instance you need com.github.tix320.sonder.api.common.rpc.RPCProtocolBuilder.
But for server and client instances builders are different, so you can get builder by SonderClient.getRPCProtocolBuilder() and SonderServer.getRPCProtocolBuilder().
RPCProtocolBuilder builder = SonderServer.getRPCProtocolBuilder();
Methods of builder:
- registerOriginInterfaces(Class<?>... classes) - Register origin interfaces annotated with @Origin.
- registerEndpointClasses(Class<?>... classes) - Register endpoint classes annotated with @Endpoint.
- registerEndpointInstances(List instances) - Register already created endpoint class instances.
- scanOriginPackages(String... packagesToScan) - Scan packages and retrieve origin interfaces annotated with @Origin.
- scanEndpointPackages(String... packagesToScan) - Scan packages and retrieve endpoint classes annotated with @Endpoint.
- scanEndpointPackages(List packagesToScan, Function<Class<?>, Object> factory) - The same as previous with instance custom factory.
- processOriginInstances(Consumer<Map<Class<?>, Object>> consumer) - Function to apply already registered origin interface instances. So you need call this after all origin interface registrations.
- registerOriginExtraArgExtractor(OriginExtraArgExtractor, ?>... extractors) - Register extra arg extractor for origins specified above Extra Parameters.
- registerEndpointExtraArgInjector(EndpointExtraArgInjector, ?>... injectors) - Register extra arg injector for endpoints specified above Extra Parameters.
Example:
RPCProtocol protocol = SonderServer.getRPCProtocolBuilder()
.scanOriginPackages("my.origin.package")
.scanEndpointPackages(List.of("my.endpoint.package"), clazz -> createInstanceOfClass(clazz))
.processOriginInstances(System.out::println)
.registerEndpointExtraArgInjector(new MyExtraArgInjector())
.build();
SonderServer sonderServer = SonderServer.forAddress(new InetSocketAddress(8888))
.registerProtocol(protocol)
.build();