Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add rpc component of dubbo #3833

Open
wants to merge 9 commits into
base: 2023.x
Choose a base branch
from

Conversation

CheerWinLi
Copy link

@CheerWinLi CheerWinLi commented Aug 24, 2024

Describe what this PR does / why we need it

about the thoughts on issue #3750

Describe how you did it

不同RPC协议的本质区别在于序列化和数据格式,在原有的springcloud的REST/HTTP协议上添加dubbo(triple)和grpc协议,sca体系中,微服务之间的调用多为feign调用,为了不改变sca体系中client跟sever原有的rest风格(使用springboot)开发习惯,feign进行http请求关键的部分在Feign.Client中,我们实现Feign.Client,在execute方法中,建立与server的netty连接,通过这个netty我们将http请求涉及的(url,body,method,headers,封装为HttpMetadata对象)通过rpc协议(triple,grpc)进行传输,在server侧根据获取到的httpMetadata对象构建http请求,交由server侧的spring调用链(dispatcherServlet)进行处理

The fundamental differences between different RPC (Remote Procedure Call) protocols lie in their serialization and data formats. The existing Spring Cloud framework, which primarily uses the REST/HTTP protocols, can be extended by adding support for protocols like Dubbo (Triple) and gRPC, In the SCA system, the invocation between microservices is mostly done through Feign calls. In order to not change the original REST style (using SpringBoot) development habits of the client and server in the SCA system, the key part of Feign's HTTP request is in the Feign.Client. We implement the Feign.Client, and in the execute method, we establish a Netty connection with the server. Through this Netty, we encapsulate the HTTP request-related information (URL, body, method, headers) into an HttpMetadata object, and then transmit it through the RPC protocol (Triple, gRPC). On the server side, we construct the HTTP request based on the obtained HttpMetadata object and pass it to the Spring call chain for processing(dispatcherServlet)

SpringCloudAlibaba介绍

Spring Cloud Alibaba(下文简称为SCA) 致力于提供微服务开发的一站式解决方案。此项目包含开发分布式应用服务的必需组件,方便开发者通过 Spring Cloud 编程模型轻松使用这些组件来开发分布式应用服务。 依托 SCA,您只需要添加一些注解和少量配置,就可以将 Spring Cloud 应用接入阿里分布式应用解决方案,通过阿里中间件来迅速搭建分布式应用系统。

SpringCloudAlibaba-rpc-starter介绍:

为 SCA 提供业内主流的rpc(dubbo/grpc)接入能力

方案选型:

在SCA体系中,服务的producer跟consumer使用http协议进行通信(一般是openfeign),与

在java体系中,业内主流的rpc框架底层大多数使用基于netty的tcp协议进行传输,而不同rpc协议之间的本质区别在于它们的数据格式跟序列化方式的不同

基于这样的认知,我们在producer侧调用netty.bind,同时在producer进行服务注册的时候将对应的rpc.netty.port添加到注册中心的metadata上


@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(prefix = RpcProperties.PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class RpcRegistryConfiguration {

	@Autowired
	private NacosRegistration nacosRegistration;
	@Autowired
	private RpcProperties rpcProperties;

	@PostConstruct
	public void init() {
		Map<String, String> metadata = new HashMap<>();
		metadata.put(RpcProperties.NETTY_PORT_PREFIX, String.valueOf(rpcProperties.getPort()));
		nacosRegistration.getMetadata().putAll(metadata);
	}

	@Bean
	@ConditionalOnMissingBean
	public RpcNettyServerListener initRpcNettyServerListener() {
		return new RpcNettyServerListener();
	}
}

producer跟consumer之间的通信为了适应sca体系,使用的仍是openfeign调用

在consumer侧对Feign.Client进行实现,对execute方法进行重写

其参数Request中包含了一个http请求的基本参数(method,url,body,headers),将这些封装到HttpMetadata中

@Override
	public Response execute(Request request, Request.Options options) {
		String url = urlResolver.resolveOriginalUrl(loadBalancerClient, request.url());
		this.client = urlResolver.getClient(url);
		HttpMetadata httpMetadata = initHttpMetadata(
				urlResolver.getPathFromUrl(request.url()),
				request.httpMethod().name(),
				request.headers(),
				request.body()
		);
		HttpRpcResponse httpRpcResponse = null;
		try {
			CompletableFuture<Object> future = client.request(httpMetadata);
			httpRpcResponse = (HttpRpcResponse) future.get();
		}
		catch (RemotingException | InterruptedException | ExecutionException e) {
			throw new RuntimeException(e);
		}
		return Response.builder()
				.status(httpRpcResponse.getStatusCode())
				.reason(httpRpcResponse.getReasonPhrase())
				.headers(httpRpcResponse.getHeaders())
				.body(httpRpcResponse.getBody())
				.request(request)
				.build();
	}

同时consumer根据Feign注解中的value值从loadbalance中获取到producer的服务信息,从中解析出rpc.netty.port,进行consumer的netty.connet,完成了netty的连接

public String resolveOriginalUrl(LoadBalancerClient loadBalancerClient, String url) {
		URL result = null;
		try {
			result = new URL(url);
		}
		catch (MalformedURLException e) {
			throw new RuntimeException(e);
		}
		String serviceId = result.getHost();
		ServiceInstance choose = loadBalancerClient.choose(serviceId);
		int nettyPort = Integer.parseInt(choose.getMetadata().get(RpcProperties.NETTY_PORT_PREFIX));
		return "exchange://" + choose.getHost() + ":" + nettyPort;
	}

	private void initLocalClient(String url) {

		org.apache.dubbo.common.URL targetUrl = org.apache.dubbo.common.URL.valueOf(url);
		try {
			clientMap.put(url, Exchangers.connect(targetUrl, new ExchangeHandlerDispatcher() {
						@Override
						public void received(Channel channel, Object message) {
							super.received(channel, message);
						}
					}
			));
		}
		catch (RemotingException e) {
			throw new RuntimeException(e);
		}
	}

这时只需要让netty的encoderHandler跟decoderHandler对应上rpc协议即可,所以方案中选用了Dubbo的Exchanger,默认传输的协议是Dubbo,使用的是Hessian2序列化。

将上述封装好的HttpMetadata通过netty传输到producer侧,根据httpMetadata的参数重新构建出HttpServletRequest,通过DispacherServlet进行Spring服务链的调用,完成服务通信

private void initLocalServer(String host, int port) {
		URL url = URL.valueOf("exchange://" + host + ":" + port);
		try {
			this.server = Exchangers.bind(url, new ExchangeHandlerAdapter(FrameworkModel.defaultModel()) {
						@Override
						public CompletableFuture<Object> reply(ExchangeChannel channel, Object msg) {
							HttpMetadata httpMetadata = (HttpMetadata) msg;
							String url = httpMetadata.getUrl();
							System.out.println(url);
							MockHttpServletRequest request = MockHttpServletRequestConverter.getMockHttpServletRequest(httpMetadata, url);
							MockHttpServletResponse response = new MockHttpServletResponse();
							try {
								dispatcherServlet.service(request, response);
							}
							catch (ServletException | IOException e) {
								throw new RuntimeException(e);
							}
							HttpRpcResponse httpRpcResponse = new HttpRpcResponse();
							httpRpcResponse.setStatusCode(response.getStatus());
							httpRpcResponse.setBody(response.getContentAsByteArray());
							httpRpcResponse.setReasonPhrase(response.getErrorMessage());
							httpRpcResponse.setHeaders(convertHeaders(response));
							return CompletableFuture.completedFuture(httpRpcResponse);
						}
					}
			);
		}
		catch (RemotingException e) {
			throw new RuntimeException(e);
		}
	}

通过这种方式,在producer跟consumer都属于sca体系下实现了rpc协议的接入

@CLAassistant
Copy link

CLAassistant commented Aug 24, 2024

CLA assistant check
All committers have signed the CLA.

@yuluo-yx yuluo-yx added area/dubbo Tagged as dubbo-related issue/pr area/spring cloud area/glcc glcc 赛题 labels Aug 25, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/dubbo Tagged as dubbo-related issue/pr area/glcc glcc 赛题 area/spring cloud
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants