Skip to content

Commit

Permalink
refactor: 优化构造SQL性能 (#72)
Browse files Browse the repository at this point in the history
* refactor: 优化构造SQL性能

自定义条件时, 建议根据情况使用 `BatchSqlFragments` 或者 `SqlFragments#single,of` 或者 `SimpleSqlFragments.of`

* refactor: 优化BatchSqlFragments初始化指定容量

* refactor: 优化上下文逻辑
  • Loading branch information
zhou-hao authored May 31, 2024
1 parent d77de93 commit eabe284
Show file tree
Hide file tree
Showing 70 changed files with 1,704 additions and 865 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.hswebframework.ezorm.core.FeatureType;

import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
@Getter
@Setter
public class Term implements Serializable,Cloneable {
public class Term implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;

/**
Expand All @@ -35,13 +35,13 @@ public class Term implements Serializable,Cloneable {
/**
* 链接类型
*/
@Schema(description = "多个条件关联类型",defaultValue = "and")
@Schema(description = "多个条件关联类型", defaultValue = "and")
private Type type = Type.and;

/**
* 条件类型
*/
@Schema(description = "动态条件类型",defaultValue = "eq")
@Schema(description = "动态条件类型", defaultValue = "eq")
private String termType = TermType.eq;

/**
Expand All @@ -59,6 +59,17 @@ public class Term implements Serializable,Cloneable {
@Schema(description = "嵌套条件")
private List<Term> terms = new LinkedList<>();

/**
 * Static factory that builds a fully populated query {@link Term}.
 *
 * @param column   name of the column the condition applies to
 * @param termType condition operator (see {@code TermType}), e.g. {@code eq}
 * @param value    value to compare the column against
 * @param options  optional term options; appended to the term's option list
 * @return a new {@code Term} carrying the given column, operator, value and options
 */
public static Term of(String column,
                      String termType,
                      Object value,
                      String... options) {
    Term created = new Term();
    created.setColumn(column);
    created.setTermType(termType);
    created.setValue(value);
    // Same effect as getOptions().addAll(Arrays.asList(options)), without the wrapper list.
    Collections.addAll(created.getOptions(), options);
    return created;
}

public Term or(String term, Object value) {
return or(term, TermType.eq, value);
Expand Down
6 changes: 6 additions & 0 deletions hsweb-easy-orm-rdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.0.0-jre</version>
</dependency>

</dependencies>

<repositories>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -25,40 +26,53 @@ public JdbcReactiveSqlExecutor() {

@Override
public Mono<Integer> update(Publisher<SqlRequest> request) {

return getConnection()
return Mono
.deferContextual(ctx -> getConnection()
.flatMap(connection -> toFlux(request)
.map(sql -> doUpdate(connection, sql))
.reduce(Math::addExact))
.defaultIfEmpty(0);
.map(sql -> doUpdate(ctx.getOrDefault(Logger.class, log), connection, sql))
.reduce(Math::addExact))
.defaultIfEmpty(0));

}

@Override
public Mono<Void> execute(Publisher<SqlRequest> request) {
return getConnection()

return Mono
.deferContextual(ctx -> getConnection()
.flatMap(connection -> toFlux(request)
.doOnNext(sql -> doExecute(connection, sql))
.then());
.doOnNext(sql -> doExecute(ctx.getOrDefault(Logger.class, log), connection, sql))
.then()));
}

@Override
public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wrapper) {
return Flux
.create(sink -> {
@SuppressWarnings("all")
Disposable disposable = getConnection()
.deferContextual(ctx -> {
Logger logger = ctx.getOrDefault(Logger.class, log);
return Flux
.create(sink -> {
@SuppressWarnings("all")
Disposable disposable = getConnection()
.flatMap(connection -> toFlux(request)
.doOnNext(sql -> doSelect(connection, sql, consumer(wrapper, sink::next)))
.then())
.doOnNext(sql -> this
.doSelect(
logger,
connection,
sql,
consumer(wrapper, sink::next),
t -> sink.isCancelled()))
.then())
.subscribe((ignore) -> sink.complete(),
sink::error,
sink::complete,
Context.of(sink.contextView()));

sink.onCancel(disposable)
.onDispose(disposable);
});
sink.onCancel(disposable)
.onDispose(disposable);
});
});

}

protected Flux<SqlRequest> toFlux(Publisher<SqlRequest> request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.sql.*;
import java.util.List;
import java.util.function.Predicate;

import static org.hswebframework.ezorm.rdb.executor.jdbc.JdbcSqlExecutorHelper.*;
import static org.hswebframework.ezorm.rdb.utils.SqlUtils.printSql;
Expand All @@ -30,7 +31,7 @@ protected void releaseResultSet(ResultSet resultSet) {
}

@SneakyThrows
protected int doUpdate(Connection connection, SqlRequest request) {
protected int doUpdate(Logger logger, Connection connection, SqlRequest request) {
printSql(logger, request);
PreparedStatement statement = null;
try {
Expand Down Expand Up @@ -66,8 +67,12 @@ protected int doUpdate(Connection connection, SqlRequest request) {
}
}

/**
 * Backwards-compatible overload kept for existing callers/subclasses: runs the
 * update with this executor's own {@code logger} rather than a caller-supplied one.
 * Delegates to {@code doUpdate(Logger, Connection, SqlRequest)}.
 */
protected int doUpdate(Connection connection, SqlRequest request) {
return doUpdate(logger, connection, request);
}

@SneakyThrows
protected void doExecute(Connection connection, SqlRequest request) {
protected void doExecute(Logger logger, Connection connection, SqlRequest request) {
PreparedStatement statement = null;
try {
if (!request.isEmpty()) {
Expand Down Expand Up @@ -97,6 +102,10 @@ protected void doExecute(Connection connection, SqlRequest request) {
}
}

/**
 * Backwards-compatible overload kept for existing callers/subclasses: executes the
 * SQL with this executor's own {@code logger} rather than a caller-supplied one.
 * Delegates to {@code doExecute(Logger, Connection, SqlRequest)}.
 */
protected void doExecute(Connection connection, SqlRequest request) {
doExecute(logger, connection, request);
}

@SneakyThrows
protected Object getResultValue(ResultSetMetaData metaData, ResultSet set, int columnIndex) {

Expand All @@ -120,7 +129,11 @@ protected Object getResultValue(ResultSetMetaData metaData, ResultSet set, int c
}

@SneakyThrows
public <T, R> R doSelect(Connection connection, SqlRequest request, ResultWrapper<T, R> wrapper) {
protected <T, R> R doSelect(Logger logger,
Connection connection,
SqlRequest request,
ResultWrapper<T, R> wrapper,
Predicate<T> stopped) {
PreparedStatement statement = connection.prepareStatement(request.getSql());
try {
printSql(logger, request);
Expand All @@ -143,7 +156,7 @@ public <T, R> R doSelect(Connection connection, SqlRequest request, ResultWrappe
data = context.getRowInstance();
}
index++;
if (!wrapper.completedWrapRow(data)) {
if (!wrapper.completedWrapRow(data) || stopped.test(data)) {
break;
}
}
Expand All @@ -155,4 +168,10 @@ public <T, R> R doSelect(Connection connection, SqlRequest request, ResultWrappe
releaseStatement(statement);
}
}


/**
 * Backwards-compatible overload kept for existing callers: selects with this
 * executor's own {@code logger} and a never-stopping predicate ({@code t -> false}),
 * so every wrapped row is consumed. Delegates to the five-argument
 * {@code doSelect(Logger, Connection, SqlRequest, ResultWrapper, Predicate)}.
 */
@SneakyThrows
public <T, R> R doSelect(Connection connection, SqlRequest request, ResultWrapper<T, R> wrapper) {
return doSelect(logger, connection, request, wrapper, (t) -> false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
import org.hswebframework.ezorm.rdb.operator.dml.delete.DeleteOperator;
import org.slf4j.Logger;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.util.function.BiFunction;
import java.util.function.Function;

public class DefaultReactiveDelete extends DefaultDelete<ReactiveDelete> implements ReactiveDelete {

private final Logger logger;

private final Function<Context, Context> context;
public DefaultReactiveDelete(RDBTableMetadata tableMetadata,
DeleteOperator operator,
Logger logger,
Function<Context, Context> context,
ContextKeyValue<?>... keyValues) {
super(tableMetadata, operator, keyValues);
this.logger = logger;
this.context = context;
}

public BiFunction<ReactiveDelete, Mono<Integer>, Mono<Integer>> mapper = (reactiveDelete, integerMono) -> integerMono;
Expand All @@ -28,7 +29,7 @@ public Mono<Integer> execute() {
return mapper.apply(this, this
.doExecute()
.reactive()
.contextWrite(ctx -> ctx.put(Logger.class, logger)));
.contextWrite(context));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.util.function.BiFunction;
import java.util.function.Function;

import static org.hswebframework.ezorm.rdb.events.ContextKeys.source;
Expand All @@ -29,16 +29,16 @@

public class DefaultReactiveQuery<T> extends DefaultQuery<T, ReactiveQuery<T>> implements ReactiveQuery<T> {

private final Logger logger;
private final Function<Context, Context> context;

public DefaultReactiveQuery(TableOrViewMetadata tableMetadata,
EntityColumnMapping mapping,
DMLOperator operator,
ResultWrapper<T, ?> wrapper,
Logger logger,
Function<Context, Context> context,
ContextKeyValue<?>... keyValues) {
super(tableMetadata, mapping, operator, wrapper, keyValues);
this.logger = logger;
this.context = context;
}

@Override
Expand All @@ -55,7 +55,7 @@ public Flux<T> fetch() {
.when(param.isForUpdate(), QueryOperator::forUpdate)
.fetch(eventWrapper(tableMetadata, wrapper, executorType("reactive"), type("fetch")))
.reactive())
.contextWrite(ctx->ctx.put(Logger.class,logger));
.contextWrite(context);
}

@Override
Expand All @@ -73,7 +73,7 @@ public Mono<T> fetchOne() {
.fetch(eventWrapper(tableMetadata, wrapper, executorType("reactive"), type("fetchOne")))
.reactive()
.take(1))
.contextWrite(ctx-> ctx.put(Logger.class,logger))
.contextWrite(context)
.singleOrEmpty();
}

Expand Down Expand Up @@ -111,7 +111,7 @@ public Mono<Integer> count() {
.map(Number::intValue)
.reduce(Math::addExact)
.switchIfEmpty(Mono.just(0)))
.contextWrite(ctx-> ctx.put(Logger.class,logger))
.contextWrite(context)
.singleOrEmpty();
}

Expand Down
Loading

0 comments on commit eabe284

Please sign in to comment.