Skip to content

Commit

Permalink
[issue #10148]Replace sync forward request with async request (#10158)
Browse files Browse the repository at this point in the history
* Replace sync forward request with async request in DistroFilter. issue #10148

* extract method for config default headers

* add env switch for async distro forward.

* Fixed code review problems:
1. Move async forward switch from sys module to naming module.
2. use nacos code style to format code.

* Fixed nacos code checkstyle:
1. one import per Class
2. add javadoc

* In order to avoid additional overhead, move switch from GlobalConfig to ClientConfig and cache the env switch.

* Move switch from ClientConfig to DistroConfig.

* Removed unused import.

* Add test for async forward for DistroFilter

* Add license

* rename test method name

* Should enable async forward

* fixed test

* set async forward request switch to true in test

* fixed test: create and set property with MockEnvironment

* fixed check style

* move MockEnvironment init to BeforeClass

* add setter for asyncForwardRequest switch
  • Loading branch information
yuyijq authored Apr 20, 2023
1 parent bbf1bd8 commit a273705
Show file tree
Hide file tree
Showing 11 changed files with 362 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -352,15 +352,29 @@ public <T> void postForm(String url, Header header, Map<String, String> bodyValu
responseType, callback);
}

/**
* async general http request.
*
* <p>{@code responseType} can be an RestResult or RestResult data {@code T} type.
*
* <p>{@code callback} Result callback execution,
* if you need response headers, you can convert the received RestResult to HttpRestResult.
*
* @param url url
* @param httpMethod http header param
* @param requestEntity http body param
* @param responseType return type
* @param callback callback {@link Callback#onReceive(com.alibaba.nacos.common.model.RestResult)}
*/
@SuppressWarnings("unchecked")
private <T> void execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type type,
public <T> void execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType,
Callback<T> callback) {
try {
URI uri = HttpUtils.buildUri(url, requestEntity.getQuery());
if (logger.isDebugEnabled()) {
logger.debug("HTTP method: {}, url: {}, body: {}", httpMethod, uri, requestEntity.getBody());
}
ResponseHandler<T> responseHandler = super.selectResponseHandler(type);
ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType);
clientRequest.execute(uri, httpMethod, requestEntity, responseHandler, callback);
} catch (Exception e) {
// When an exception occurs, use Callback to pass it instead of throw it directly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class DistroConfig extends AbstractDynamicConfig {

private long loadDataTimeoutMillis = DistroConstants.DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS;

private boolean asyncDistroForward = DistroConstants.DEFAULT_ASYNC_DISTRO_FORWARD_VALUE;

private DistroConfig() {
super(DISTRO);
resetConfig();
Expand All @@ -65,6 +67,9 @@ protected void getConfigFromEnv() {
DistroConstants.DEFAULT_DATA_LOAD_RETRY_DELAY_MILLISECONDS);
loadDataTimeoutMillis = EnvUtil.getProperty(DistroConstants.DATA_LOAD_TIMEOUT_MILLISECONDS, Long.class,
DistroConstants.DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS);
asyncDistroForward = EnvUtil.getProperty(DistroConstants.NACOS_ASYNC_DISTRO_FORWARD_NAME, Boolean.class,
DistroConstants.DEFAULT_ASYNC_DISTRO_FORWARD_VALUE);

}

public static DistroConfig getInstance() {
Expand Down Expand Up @@ -127,6 +132,14 @@ public void setLoadDataTimeoutMillis(long loadDataTimeoutMillis) {
this.loadDataTimeoutMillis = loadDataTimeoutMillis;
}

public boolean isAsyncDistroForward() {
return asyncDistroForward;
}

public void setAsyncDistroForward(boolean asyncDistroForward) {
this.asyncDistroForward = asyncDistroForward;
}

@Override
protected String printConfig() {
return "DistroConfig{" + "syncDelayMillis=" + syncDelayMillis + ", syncTimeoutMillis=" + syncTimeoutMillis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@ public class DistroConstants {

public static final long DEFAULT_DATA_LOAD_TIMEOUT_MILLISECONDS = 30000L;

public static final String NACOS_ASYNC_DISTRO_FORWARD_NAME = "nacos.async.distro.forward";

public static final boolean DEFAULT_ASYNC_DISTRO_FORWARD_VALUE = false;

}
8 changes: 8 additions & 0 deletions naming/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,14 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-netty</artifactId>
</dependency>
<dependency>
<groupId>org.mock-server</groupId>
<artifactId>mockserver-client-java</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,5 +109,4 @@ public final class Constants {
* Min value of instance weight.
*/
public static final double MIN_WEIGHT_VALUE = 0.00D;

}
42 changes: 37 additions & 5 deletions naming/src/main/java/com/alibaba/nacos/naming/misc/HttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.alibaba.nacos.common.http.param.Header;
import com.alibaba.nacos.common.http.param.MediaType;
import com.alibaba.nacos.common.http.param.Query;
import com.alibaba.nacos.common.model.RequestHttpEntity;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.HttpMethod;
import com.alibaba.nacos.common.utils.VersionUtils;
Expand Down Expand Up @@ -102,11 +103,7 @@ public static RestResult<String> request(String url, List<String> headers, Map<S
if (CollectionUtils.isNotEmpty(headers)) {
header.addAll(headers);
}
header.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
header.addParam(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
header.addParam(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
header.addParam(HttpHeaderConsts.REQUEST_SOURCE_HEADER, EnvUtil.getLocalAddress());
header.addParam(HttpHeaderConsts.ACCEPT_CHARSET, encoding);
configDefaultHeaders(header, encoding);
AuthHeaderUtil.addIdentityToHeader(header);

HttpClientConfig httpClientConfig = HttpClientConfig.builder().setConTimeOutMillis(connectTimeout)
Expand All @@ -122,6 +119,14 @@ public static RestResult<String> request(String url, List<String> headers, Map<S
return RestResult.<String>builder().withCode(500).withMsg(e.toString()).build();
}
}

This comment has been minimized.

Copy link
@ANHKIET0806

ANHKIET0806 May 9, 2023

100


private static void configDefaultHeaders(Header header, String encoding) {
header.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
header.addParam(HttpHeaderConsts.CLIENT_VERSION_HEADER, VersionUtils.version);
header.addParam(HttpHeaderConsts.USER_AGENT_HEADER, UtilsAndCommons.SERVER_VERSION);
header.addParam(HttpHeaderConsts.REQUEST_SOURCE_HEADER, EnvUtil.getLocalAddress());
header.addParam(HttpHeaderConsts.ACCEPT_CHARSET, encoding);
}

/**
* Request http get method by async.
Expand Down Expand Up @@ -202,6 +207,33 @@ public static void asyncHttpRequest(String url, List<String> headers, Map<String
}
}

/**
* Do http request by async with request body.
*
* @param url request url
* @param headers request headers
* @param paramValues request params
* @param body request body
* @param callback async callback func
* @param method http method
* @throws Exception exception when request
*/
public static void asyncHttpRequest(String url, List<String> headers, Map<String, String> paramValues, String body,
Callback<String> callback, String method) throws Exception {

Query query = Query.newInstance().initParams(paramValues);
query.addParam(FieldsConstants.ENCODING, ENCODING);
query.addParam(FieldsConstants.NOFIX, NOFIX);

Header header = Header.newInstance();
if (CollectionUtils.isNotEmpty(headers)) {
header.addAll(headers);
}
configDefaultHeaders(header, "UTF-8");
AuthHeaderUtil.addIdentityToHeader(header);
ASYNC_REST_TEMPLATE.execute(url, method, new RequestHttpEntity(header, query, body), String.class, callback);
}

/**
* Request http post method by async with large body.
*
Expand Down
69 changes: 58 additions & 11 deletions naming/src/main/java/com/alibaba/nacos/naming/web/DistroFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package com.alibaba.nacos.naming.web;

import com.alibaba.nacos.common.constant.HttpHeaderConsts;
import com.alibaba.nacos.common.http.Callback;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.IoUtils;
import com.alibaba.nacos.core.code.ControllerMethodsCache;
import com.alibaba.nacos.core.distributed.distro.DistroConfig;
import com.alibaba.nacos.core.utils.ReuseHttpServletRequest;
import com.alibaba.nacos.core.utils.WebUtils;
import com.alibaba.nacos.naming.core.DistroMapper;
Expand All @@ -30,6 +32,7 @@
import com.alibaba.nacos.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;

import javax.servlet.AsyncContext;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
Expand Down Expand Up @@ -117,7 +120,6 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
}

final String targetServer = distroMapper.mapSrv(distroTag);

List<String> headerList = new ArrayList<>(16);
Enumeration<String> headers = req.getHeaderNames();
while (headers.hasMoreElements()) {
Expand All @@ -129,26 +131,71 @@ public void doFilter(ServletRequest servletRequest, ServletResponse servletRespo
final String body = IoUtils.toString(req.getInputStream(), StandardCharsets.UTF_8.name());
final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());

RestResult<String> result = HttpClient
.request(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList, paramsValue, body,
PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, StandardCharsets.UTF_8.name(), req.getMethod());
String data = result.ok() ? result.getData() : result.getMessage();
try {
WebUtils.response(resp, data, result.getCode());
} catch (Exception ignore) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
if (!DistroConfig.getInstance().isAsyncDistroForward()) {
syncForward(req, resp, urlString, targetServer, headerList, paramsValue, body);
} else {
asyncForward(req, resp, urlString, targetServer, headerList, paramsValue, body);
}
} catch (AccessControlException e) {
resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
} catch (NoSuchMethodException e) {
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
"no such api:" + req.getMethod() + ":" + req.getRequestURI());
} catch (Exception e) {
onError(resp, e);
}
}

private void syncForward(ReuseHttpServletRequest req, HttpServletResponse resp, String urlString,
String targetServer, List<String> headerList, Map<String, String> paramsValue, String body) {
RestResult<String> result = HttpClient.request(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList,
paramsValue, body, PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, StandardCharsets.UTF_8.name(),
req.getMethod());
String data = result.ok() ? result.getData() : result.getMessage();
try {
WebUtils.response(resp, data, result.getCode());
} catch (Exception ignore) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + targetServer + urlString);
}
}

private void asyncForward(HttpServletRequest req, HttpServletResponse resp, String urlString, String targetServer,
List<String> headerList, Map<String, String> paramsValue, String body) throws Exception {
final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(PROXY_READ_TIMEOUT);
HttpClient.asyncHttpRequest(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList, paramsValue, body,
new Callback<String>() {
@Override
public void onReceive(RestResult<String> result) {
String data = result.ok() ? result.getData() : result.getMessage();
try {
WebUtils.response(resp, data, result.getCode());
} catch (Exception ignore) {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + targetServer + urlString);
}
asyncContext.complete();
}

@Override
public void onError(Throwable e) {
DistroFilter.this.onError(resp, e);
asyncContext.complete();
}

@Override
public void onCancel() {

}
}, req.getMethod());
}

private void onError(HttpServletResponse response, Throwable e) {
try {
Loggers.SRV_LOG.warn("[DISTRO-FILTER] Server failed: ", e);
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Server failed, " + ExceptionUtil.getAllExceptionMsg(e));
} catch (Exception ignore) {
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public FilterRegistrationBean distroFilterRegistration() {
registration.setFilter(distroFilter());
registration.addUrlPatterns(UTL_PATTERNS);
registration.setName(DISTRO_FILTER);
registration.setAsyncSupported(true);
registration.setOrder(7);
return registration;
}
Expand Down
Loading

0 comments on commit a273705

Please sign in to comment.