Skip to content

Commit

Permalink
Merge pull request #330 from CorvusYe/master
Browse files Browse the repository at this point in the history
feat: session pool supports creating through `spaceFromParam`
  • Loading branch information
CorvusYe authored Dec 17, 2024
2 parents ccfb8d0 + cc64f7b commit 5074616
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 132 deletions.
14 changes: 7 additions & 7 deletions ngbatis-demo/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ nebula:
check-fixed-rate: 300000
# space name needs to be informed through annotations(@Space) or xml(space="test")
# default false(false: Session pool map will not be initialized)
use-session-pool: false
hosts: 127.0.0.1:19669
use-session-pool: true
hosts: 139.9.187.207:9669
username: root
password: bmVidWxh
password: U3RhclNoYWRvd18wOTE5
space: test
pool-config:
min-conns-size: 0
max-conns-size: 10
timeout: 6000
idle-time: 0
interval-idle: -1
wait-time: 6000
wait-time: 0
min-cluster-health-rate: 1.0
enable-ssl: false

# 开启 nGQL 输出
#logging:
# level:
# org.nebula.contrib: DEBUG
logging:
level:
org.nebula.contrib: DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void spaceFromParam() {
try {
repository.spaceFromParam(spaceName);
} catch (Exception e) {
Assert.isTrue(e instanceof QueryException && e.getMessage().contains("SpaceNotFound"));
assertSpaceFailed(e);
}
}

Expand All @@ -258,10 +258,17 @@ public void dynamicSpaceWithPage() {
page.setPageNo(1);
repository.dynamicSpaceWithPage(page, spaceName);
} catch (Exception e) {
System.out.println(e.getMessage());
Assert.isTrue(e instanceof QueryException && e.getMessage().contains("SpaceNotFound"));
assertSpaceFailed(e);
}
}

void assertSpaceFailed(Exception e) {
e.printStackTrace();
String message = e.getMessage();
Assert.isTrue(e instanceof QueryException &&
(message.contains("SpaceNotFound") || (message.contains("create session failed.")))
);
}

@Test
public void insertWithTimestamp() {
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/org/nebula/contrib/ngbatis/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ public SessionDispatcher getDispatcher() {
* @return SessionPool
*/
public SessionPool getSessionPool(String spaceName) {
return mapperContext.getNebulaSessionPoolMap().get(spaceName);
SessionPool sessionPool = mapperContext.getNebulaSessionPoolMap().get(spaceName);
if (sessionPool == null) {
sessionPool = dispatcher.initSessionPool(spaceName);
mapperContext.getNebulaSessionPoolMap().put(spaceName, sessionPool);
}
return sessionPool;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
import static org.nebula.contrib.ngbatis.proxy.NebulaDaoBasicExt.entityTypeAndIdType;
import static org.nebula.contrib.ngbatis.proxy.NebulaDaoBasicExt.vertexName;

import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.SessionPool;
import com.vesoft.nebula.client.graph.SessionPoolConfig;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Resource;
import org.nebula.contrib.ngbatis.config.NebulaJdbcProperties;
import org.nebula.contrib.ngbatis.config.NgbatisConfig;
import org.nebula.contrib.ngbatis.config.ParseCfgProps;
import org.nebula.contrib.ngbatis.io.DaoResourceLoader;
import org.nebula.contrib.ngbatis.models.ClassModel;
Expand Down Expand Up @@ -86,8 +83,6 @@ public MapperContext mapperContext(NebulaPool nebulaPool) {
context.setNebulaPoolConfig(nebulaJdbcProperties.getPoolConfig());
figureTagTypeMapping(interfaces.values(), context.getTagTypeMapping());

setNebulaSessionPool(context);

registerBean(context);
return context;
}
Expand Down Expand Up @@ -201,63 +196,25 @@ public NebulaPool nebulaPool() {
/**
* create and init Nebula SessionPool
*/
public void setNebulaSessionPool(MapperContext context) {
NgbatisConfig ngbatisConfig = nebulaJdbcProperties.getNgbatis();
if (ngbatisConfig.getUseSessionPool() == null || !ngbatisConfig.getUseSessionPool()) {
return;
}

context.getSpaceNameSet().add(nebulaJdbcProperties.getSpace());
Map<String, SessionPool> nebulaSessionPoolMap = context.getNebulaSessionPoolMap();
for (String spaceName : context.getSpaceNameSet()) {
SessionPool sessionPool = initSessionPool(spaceName);
if (sessionPool == null) {
log.error("{} session pool init failed.", spaceName);
continue;
}
nebulaSessionPoolMap.put(spaceName, sessionPool);
}
@Deprecated
public void setNebulaSessionPool(MapperContext context) throws Exception {
throw new Exception(
"Deprecated method, "
+ "please use IntervalCheckSessionDispatcher.setNebulaSessionPool() instead."
);
}

/**
* session pool create and init
* @param spaceName nebula space name
* @return inited SessionPool
*/
public SessionPool initSessionPool(String spaceName) {
final NgbatisConfig ngbatisConfig = nebulaJdbcProperties.getNgbatis();
NebulaPoolConfig poolConfig = nebulaJdbcProperties.getPoolConfig();

SessionPoolConfig sessionPoolConfig = new SessionPoolConfig(
nebulaJdbcProperties.getHostAddresses(),
spaceName,
nebulaJdbcProperties.getUsername(),
nebulaJdbcProperties.getPassword()
).setUseHttp2(poolConfig.isUseHttp2())
.setEnableSsl(poolConfig.isEnableSsl())
.setSslParam(poolConfig.getSslParam())
.setCustomHeaders(poolConfig.getCustomHeaders())
.setWaitTime(poolConfig.getWaitTime())
.setTimeout(poolConfig.getTimeout());

if (poolConfig.getMinConnSize() <= 0) {
sessionPoolConfig.setMinSessionSize(1);
} else {
sessionPoolConfig.setMinSessionSize(poolConfig.getMinConnSize());
}
sessionPoolConfig.setMaxSessionSize(poolConfig.getMaxConnSize());
sessionPoolConfig.setTimeout(poolConfig.getTimeout());
sessionPoolConfig.setWaitTime(poolConfig.getWaitTime());
if (null != ngbatisConfig.getSessionLifeLength()) {
int cleanTime = (int) (ngbatisConfig.getSessionLifeLength() / 1000);
sessionPoolConfig.setCleanTime(cleanTime);
}
if (null != ngbatisConfig.getCheckFixedRate()) {
int healthCheckTime = (int) (ngbatisConfig.getCheckFixedRate() / 1000);
sessionPoolConfig.setHealthCheckTime(healthCheckTime);
}

return new SessionPool(sessionPoolConfig);
@Deprecated
public SessionPool initSessionPool(String spaceName) throws Exception {
throw new Exception(
"Deprecated method, "
+ "please use SessionDispatcher.initSessionPool() instead."
);
}

@Override
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/org/nebula/contrib/ngbatis/SessionDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import static org.apache.commons.lang3.StringUtils.isNotBlank;

import com.vesoft.nebula.client.graph.SessionPool;
import com.vesoft.nebula.client.graph.data.ResultSet;
import java.util.Map;
import org.nebula.contrib.ngbatis.config.NgbatisConfig;
import org.nebula.contrib.ngbatis.models.MapperContext;
import org.nebula.contrib.ngbatis.session.LocalSession;
Expand Down Expand Up @@ -68,4 +71,25 @@ static boolean useSessionPool() {
NgbatisConfig ngbatisConfig = MapperContext.newInstance().getNgbatisConfig();
return ngbatisConfig != null && ngbatisConfig.getUseSessionPool();
}

/**
* 按 spaceName 初始化 sessionPool
*
* @param spaceName 可以是启动时不存在的空间名
* @return 初始化后的 sessionPool
*/
SessionPool initSessionPool(String spaceName);

/**
* 处理会话
* @param localSession 本地会话
* @param result 结果集,主要获取成功与否
*/
void handleSession(LocalSession localSession, ResultSet result);

ResultSet executeWithParameter(
String gql,
Map<String, Object> params,
String space,
Map<String, Object> extraReturn);
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class EnvConfig {
@Autowired(required = false)
private PkGenerator pkGenerator;

@Autowired
private SessionDispatcher sessionDispatcher;

/**
Expand All @@ -80,8 +81,6 @@ public class EnvConfig {
*/
@Bean
public Env getEnv() {
properties.setPoolConfig(MapperContext.newInstance().getNebulaPoolConfig());
sessionDispatcher = new IntervalCheckSessionDispatcher(properties);
return new Env(
textResolver,
resultResolver,
Expand Down
83 changes: 19 additions & 64 deletions src/main/java/org/nebula/contrib/ngbatis/proxy/MapperProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,8 @@ public static Object invoke(ClassModel classModel, MethodModel methodModel, Obje

Map<String,Object> parasForDb = argsResolver.resolve(methodModel, args);
final long step1 = System.currentTimeMillis();
NgbatisConfig ngbatisConfig = MapperContext.newInstance().getNgbatisConfig();
if (ngbatisConfig == null || !ngbatisConfig.getUseSessionPool()) {
query = executeWithParameter(classModel, methodModel, gql, parasForDb, argMap);
} else {
query = executeBySessionPool(classModel, methodModel, gql, parasForDb, argMap);
}

query = executeWithParameter(classModel, methodModel, gql, parasForDb, argMap);

final long step2 = System.currentTimeMillis();
if (!query.isSucceeded()) {
Expand Down Expand Up @@ -206,31 +202,29 @@ private static Object pageSupport(ClassModel classModel, Method method, Object[]
* @param params 待执行脚本的参数所需的参数
* @return nebula-graph 的未被 orm 操作的原始结果集
*/
public static ResultSet executeWithParameter(ClassModel cm, MethodModel mm, String gql,
Map<String, Object> params, Map<String, Object> paramsForTemplate) {
LocalSession localSession = null;
Session session = null;
public static ResultSet executeWithParameter(
ClassModel cm, MethodModel mm, String gql,
Map<String, Object> params,
Map<String, Object> paramsForTemplate) {

ResultSet result = null;
String proxyClass = null;
String proxyMethod = null;
String localSessionSpace = null;
String autoSwitch = null;

SessionDispatcher dispatcher = ENV.getDispatcher();
Map<String, Object> extraReturn = new HashMap<>();

try {
localSession = dispatcher.poll();
if (log.isDebugEnabled()) {
proxyClass = cm.getNamespace().getName();
proxyMethod = mm.getId();
localSessionSpace = localSession.getCurrentSpace();
}

String currentSpace = getSpace(cm, mm, paramsForTemplate);
String[] qlAndSpace = qlWithSpace(localSession, gql, currentSpace);
gql = qlAndSpace[1];
autoSwitch = qlAndSpace[0] == null ? "" : qlAndSpace[0];
session = localSession.getSession();
result = session.executeWithParameter(gql, params);
localSession.setCurrentSpace(getSpace(result));
result = dispatcher.executeWithParameter(
gql, params, currentSpace, extraReturn
);

if (result.isSucceeded()) {
return result;
} else {
Expand All @@ -243,15 +237,18 @@ public static ResultSet executeWithParameter(ClassModel cm, MethodModel mm, Stri
throw new QueryException("数据查询失败:" + e.getMessage(), e);
} finally {
if (log.isDebugEnabled()) {
Object autoSwitch = extraReturn.get("autoSwitch");
Object localSessionSpace = extraReturn.get("localSessionSpace");
boolean noNeedSwitch = isEmpty(autoSwitch);
autoSwitch = (isEmpty(autoSwitch) ? "" : autoSwitch);
log.debug("\n\t- proxyMethod: {}#{}"
+ "\n\t- session space: {}"
+ (isEmpty(autoSwitch) ? "\n\t- {}" : "\n\t- auto switch to: {}")
+ (noNeedSwitch ? "\n\t- {}" : "\n\t- auto switch to: {}")
+ "\n\t- nGql:{}"
+ "\n\t- params: {}"
+ "\n\t- result:{}",
proxyClass, proxyMethod, localSessionSpace, autoSwitch, gql, paramsForTemplate, result);
}
handleSession(dispatcher, localSession, result);
}
}

Expand Down Expand Up @@ -304,38 +301,6 @@ public static ResultSet executeBySessionPool(ClassModel cm, MethodModel mm, Stri
}
}

private static void handleSession(SessionDispatcher dispatcher,
LocalSession localSession, ResultSet result) {
if (localSession != null) {
boolean sessionError = ResultSetUtil.isSessionError(result);
if (sessionError || dispatcher.timeToRelease(localSession)) {
dispatcher.release(localSession);
} else {
dispatcher.offer(localSession);
}
}
}

private static String[] qlWithSpace(LocalSession localSession, String gql, String currentSpace)
throws IOErrorException, BindSpaceFailedException {
String[] qlAndSpace = new String[2];
gql = gql.trim();
String sessionSpace = localSession.getCurrentSpace();
boolean sameSpace = Objects.equals(sessionSpace, currentSpace);
if (!sameSpace && currentSpace != null) {
qlAndSpace[0] = currentSpace;
Session session = localSession.getSession();
ResultSet execute = session.execute(String.format("USE `%s`", currentSpace));
if (!execute.isSucceeded()) {
throw new BindSpaceFailedException(
String.format(" %s \"%s\"", execute.getErrorMessage(), currentSpace)
);
}
}
qlAndSpace[1] = String.format("\n\t\t%s", gql);
return qlAndSpace;
}

/**
* 获取当前语句所执行的目标space。
* @param cm 当前接口的类模型
Expand Down Expand Up @@ -370,16 +335,6 @@ public static String getSpace(
return space;
}

/**
* 从结果集中获取当前的 space
* @param result 脚本执行之后的结果集
* @return 结果集所对应的 space
*/
private static String getSpace(ResultSet result) {
String spaceName = result.getSpaceName();
return isBlank(spaceName) ? null : spaceName;
}

public static Logger getLog() {
return log;
}
Expand Down
Loading

0 comments on commit 5074616

Please sign in to comment.