Skip to content

Commit

Permalink
optimize ServiceInstancePriorityLoadBalancer
Browse files Browse the repository at this point in the history
  • Loading branch information
peacewong committed Nov 22, 2023
1 parent 3070308 commit 0c1c4ed
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,41 @@

package org.apache.linkis.rpc.conf;

import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.apache.linkis.rpc.BaseRPCSender;
import org.apache.linkis.rpc.constant.RpcConstant;
import org.apache.linkis.server.BDPJettyServerHelper;
import org.apache.linkis.server.Message;
import org.apache.linkis.server.security.SSOUtils$;
import org.apache.linkis.server.security.SecurityFilter$;

import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;

import java.util.Enumeration;

import org.springframework.stereotype.Component;
import scala.Tuple2;

import feign.RequestInterceptor;
import feign.RequestTemplate;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;


@Configuration
public class FeignConfig implements RequestInterceptor {
@Component
public class FeignRequestInterceptor implements RequestInterceptor {

@Override
public void apply(RequestTemplate requestTemplate) {
ServletRequestAttributes attributes =
(ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (null != attributes) {
HttpServletRequest request = attributes.getRequest();
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String name = headerNames.nextElement();
String value = request.getHeader(name);
if (name.equalsIgnoreCase("content-length")) {
continue;
}
requestTemplate.header(name, value);
}
requestTemplate.header(
RpcConstant.LINKIS_LOAD_BALANCER_TYPE, RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC);
Map<String, Collection<String>> headers = new HashMap<>(requestTemplate.headers());
headers.put(
RpcConstant.LINKIS_LOAD_BALANCER_TYPE, Arrays.asList(RpcConstant.LINKIS_LOAD_BALANCER_TYPE_RPC));
Tuple2<String, String> userTicketKV =
SSOUtils$.MODULE$.getUserTicketKV(SecurityFilter$.MODULE$.OTHER_SYSTEM_IGNORE_UM_USER());
requestTemplate.header(userTicketKV._1, userTicketKV._2);
headers.put(userTicketKV._1, Arrays.asList(userTicketKV._2));
try {
String body = new String(requestTemplate.body(), org.apache.linkis.common.conf.Configuration.BDP_ENCODING().getValue());
Message message = BDPJettyServerHelper.gson().fromJson(body, Message.class);
headers.put(RpcConstant.FIXED_INSTANCE, Arrays.asList(BaseRPCSender.getFixedInstanceInfo(message)));
requestTemplate.headers(headers);
} catch (UnsupportedEncodingException e) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,5 @@
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.context.annotation.Configuration;

@Configuration
@LoadBalancerClients(defaultConfiguration = {LinkisLoadBalancerClientConfiguration.class})

public class GatewayLoadBalancerConfiguration {}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
package org.apache.linkis.rpc.loadbalancer;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.ReactorLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

@Configuration
@LoadBalancerClients(defaultConfiguration = {LinkisLoadBalancerClientConfiguration.class})
public class LinkisLoadBalancerClientConfiguration {
@Bean
public ReactorLoadBalancer<ServiceInstance> customLoadBalancer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,10 @@ && isRPC(linkisLoadBalancerType)
serviceInstanceResponse = getInstanceResponse(instances, clientIp);
if (null == serviceInstanceResponse) {
try {
this.wait(5000L);
eurekaClientCacheManualRefresher.refresh();
Thread.sleep(5000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
break;
}
}

Expand All @@ -144,7 +141,7 @@ private Response<ServiceInstance> getInstanceResponse(
List<ServiceInstance> instances, String clientIp) {
if (instances.isEmpty()) {
log.warn("No servers available for service: " + serviceId);
return new EmptyResponse();
return null;
}
int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;

Expand All @@ -165,15 +162,10 @@ private Response<ServiceInstance> getInstanceResponse(
break;
}
}
// if (null == chooseInstance) {
// throw new NoInstanceExistsException(
// LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorCode(),
// MessageFormat.format(
// LinkisRpcErrorCodeSummary.APPLICATION_IS_NOT_EXISTS.getErrorDesc(),
// clientIp,
// serviceId));
// } else {
return new DefaultResponse(chooseInstance);
// }
if (null == chooseInstance) {
return null;
} else {
return new DefaultResponse(chooseInstance);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,20 @@

package org.apache.linkis.rpc

import org.apache.commons.lang3.StringUtils
import org.apache.linkis.DataWorkCloudApplication
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.exception.WarnException
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.protocol.Protocol
import org.apache.linkis.rpc.conf.DynamicFeignClient
import org.apache.linkis.rpc.conf.RPCConfiguration.{
BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX,
BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_MAX,
BDP_RPC_SENDER_ASYN_QUEUE_CAPACITY
}
import org.apache.linkis.rpc.conf.RPCConfiguration.{BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_FREE_TIME_MAX, BDP_RPC_SENDER_ASYN_CONSUMER_THREAD_MAX, BDP_RPC_SENDER_ASYN_QUEUE_CAPACITY}
import org.apache.linkis.rpc.constant.RpcConstant
import org.apache.linkis.rpc.interceptor._
import org.apache.linkis.rpc.transform.{RPCConsumer, RPCProduct}
import org.apache.linkis.server.Message

import java.util

import scala.concurrent.duration.Duration
import scala.runtime.BoxedUnit

Expand Down Expand Up @@ -74,6 +71,11 @@ private[rpc] class BaseRPCSender extends Sender with Logging {

private[rpc] def getApplicationName = name


def getSenderInstance(): String = {
null
}

protected def newRPC: RPCReceiveRemote = {
getDynamicFeignClient.getFeignClient(classOf[RPCReceiveRemote], name)
}
Expand All @@ -87,6 +89,9 @@ private[rpc] class BaseRPCSender extends Sender with Logging {

override def ask(message: Any): Any = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
if (StringUtils.isNotBlank(getSenderInstance())) {
BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance())
}
BaseRPCSender.addInstanceInfo(msg.getData)
val response = getRPC.receiveAndReply(msg)
RPCConsumer.getRPCConsumer.toObject(response)
Expand All @@ -95,13 +100,19 @@ private[rpc] class BaseRPCSender extends Sender with Logging {
override def ask(message: Any, timeout: Duration): Any = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
msg.data("duration", timeout.toMillis)
if (StringUtils.isNotBlank(getSenderInstance())) {
BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance())
}
BaseRPCSender.addInstanceInfo(msg.getData)
val response = getRPC.receiveAndReplyInMills(msg)
RPCConsumer.getRPCConsumer.toObject(response)
}

private def sendIt(message: Any, op: Message => Message): Unit = execute(message) {
val msg = RPCProduct.getRPCProduct.toMessage(message)
if (StringUtils.isNotBlank(getSenderInstance())) {
BaseRPCSender.addFixedInstanceInfo(msg.getData, getSenderInstance())
}
BaseRPCSender.addInstanceInfo(msg.getData)
RPCConsumer.getRPCConsumer.toObject(op(msg)) match {
case w: WarnException => logger.warn("RPC requests an alarm!(RPC请求出现告警!)", w)
Expand Down Expand Up @@ -172,4 +183,16 @@ private[rpc] object BaseRPCSender extends Logging {
ServiceInstance(name, instance)
}

def addFixedInstanceInfo(map: util.Map[String, Object], fixedInstance: String): Unit = {
map.put(RpcConstant.FIXED_INSTANCE, fixedInstance)
}

def getFixedInstanceInfo(message: Message): String = {
if (null != message && null != message.getData) {
message.getData.getOrDefault(RpcConstant.FIXED_INSTANCE, null).asInstanceOf[String]
} else {
null
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,11 @@ private[rpc] class SpringMVCRPCSender private[rpc] (
s"RPCSender(${serviceInstance.getApplicationName})"
} else s"RPCSender($getApplicationName, ${serviceInstance.getInstance})"

override def getSenderInstance(): String = {
if (null != serviceInstance) {
serviceInstance.getInstance
} else {
null
}
}
}

0 comments on commit 0c1c4ed

Please sign in to comment.