Skip to content

Commit

Permalink
refactor: 优化上下文逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed May 30, 2024
1 parent 834426f commit 41649b7
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -25,40 +26,53 @@ public JdbcReactiveSqlExecutor() {

@Override
public Mono<Integer> update(Publisher<SqlRequest> request) {

return getConnection()
return Mono
.deferContextual(ctx -> getConnection()
.flatMap(connection -> toFlux(request)
.map(sql -> doUpdate(connection, sql))
.reduce(Math::addExact))
.defaultIfEmpty(0);
.map(sql -> doUpdate(ctx.getOrDefault(Logger.class, log), connection, sql))
.reduce(Math::addExact))
.defaultIfEmpty(0));

}

@Override
public Mono<Void> execute(Publisher<SqlRequest> request) {
return getConnection()

return Mono
.deferContextual(ctx -> getConnection()
.flatMap(connection -> toFlux(request)
.doOnNext(sql -> doExecute(connection, sql))
.then());
.doOnNext(sql -> doExecute(ctx.getOrDefault(Logger.class, log), connection, sql))
.then()));
}

@Override
public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wrapper) {
return Flux
.create(sink -> {
@SuppressWarnings("all")
Disposable disposable = getConnection()
.deferContextual(ctx -> {
Logger logger = ctx.getOrDefault(Logger.class, log);
return Flux
.create(sink -> {
@SuppressWarnings("all")
Disposable disposable = getConnection()
.flatMap(connection -> toFlux(request)
.doOnNext(sql -> doSelect(connection, sql, consumer(wrapper, sink::next)))
.then())
.doOnNext(sql -> this
.doSelect(
logger,
connection,
sql,
consumer(wrapper, sink::next),
t -> sink.isCancelled()))
.then())
.subscribe((ignore) -> sink.complete(),
sink::error,
sink::complete,
Context.of(sink.contextView()));

sink.onCancel(disposable)
.onDispose(disposable);
});
sink.onCancel(disposable)
.onDispose(disposable);
});
});

}

protected Flux<SqlRequest> toFlux(Publisher<SqlRequest> request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.sql.*;
import java.util.List;
import java.util.function.Predicate;

import static org.hswebframework.ezorm.rdb.executor.jdbc.JdbcSqlExecutorHelper.*;
import static org.hswebframework.ezorm.rdb.utils.SqlUtils.printSql;
Expand All @@ -30,7 +31,7 @@ protected void releaseResultSet(ResultSet resultSet) {
}

@SneakyThrows
protected int doUpdate(Connection connection, SqlRequest request) {
protected int doUpdate(Logger logger, Connection connection, SqlRequest request) {
printSql(logger, request);
PreparedStatement statement = null;
try {
Expand Down Expand Up @@ -66,8 +67,12 @@ protected int doUpdate(Connection connection, SqlRequest request) {
}
}

protected int doUpdate(Connection connection, SqlRequest request) {
return doUpdate(logger, connection, request);
}

@SneakyThrows
protected void doExecute(Connection connection, SqlRequest request) {
protected void doExecute(Logger logger, Connection connection, SqlRequest request) {
PreparedStatement statement = null;
try {
if (!request.isEmpty()) {
Expand Down Expand Up @@ -97,6 +102,10 @@ protected void doExecute(Connection connection, SqlRequest request) {
}
}

protected void doExecute(Connection connection, SqlRequest request) {
doExecute(logger, connection, request);
}

@SneakyThrows
protected Object getResultValue(ResultSetMetaData metaData, ResultSet set, int columnIndex) {

Expand All @@ -120,7 +129,11 @@ protected Object getResultValue(ResultSetMetaData metaData, ResultSet set, int c
}

@SneakyThrows
public <T, R> R doSelect(Connection connection, SqlRequest request, ResultWrapper<T, R> wrapper) {
protected <T, R> R doSelect(Logger logger,
Connection connection,
SqlRequest request,
ResultWrapper<T, R> wrapper,
Predicate<T> stopped) {
PreparedStatement statement = connection.prepareStatement(request.getSql());
try {
printSql(logger, request);
Expand All @@ -143,7 +156,7 @@ public <T, R> R doSelect(Connection connection, SqlRequest request, ResultWrappe
data = context.getRowInstance();
}
index++;
if (!wrapper.completedWrapRow(data)) {
if (!wrapper.completedWrapRow(data) || stopped.test(data)) {
break;
}
}
Expand All @@ -155,4 +168,10 @@ public <T, R> R doSelect(Connection connection, SqlRequest request, ResultWrappe
releaseStatement(statement);
}
}


@SneakyThrows
public <T, R> R doSelect(Connection connection, SqlRequest request, ResultWrapper<T, R> wrapper) {
return doSelect(logger, connection, request, wrapper, (t) -> false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,20 @@
import org.hswebframework.ezorm.rdb.operator.dml.delete.DeleteOperator;
import org.slf4j.Logger;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.util.function.BiFunction;
import java.util.function.Function;

public class DefaultReactiveDelete extends DefaultDelete<ReactiveDelete> implements ReactiveDelete {

private final Logger logger;

private final Function<Context, Context> context;
public DefaultReactiveDelete(RDBTableMetadata tableMetadata,
DeleteOperator operator,
Logger logger,
Function<Context, Context> context,
ContextKeyValue<?>... keyValues) {
super(tableMetadata, operator, keyValues);
this.logger = logger;
this.context = context;
}

public BiFunction<ReactiveDelete, Mono<Integer>, Mono<Integer>> mapper = (reactiveDelete, integerMono) -> integerMono;
Expand All @@ -28,7 +29,7 @@ public Mono<Integer> execute() {
return mapper.apply(this, this
.doExecute()
.reactive()
.contextWrite(ctx -> ctx.put(Logger.class, logger)));
.contextWrite(context));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.util.function.BiFunction;
import java.util.function.Function;

import static org.hswebframework.ezorm.rdb.events.ContextKeys.source;
Expand All @@ -29,16 +29,16 @@

public class DefaultReactiveQuery<T> extends DefaultQuery<T, ReactiveQuery<T>> implements ReactiveQuery<T> {

private final Logger logger;
private final Function<Context, Context> context;

public DefaultReactiveQuery(TableOrViewMetadata tableMetadata,
EntityColumnMapping mapping,
DMLOperator operator,
ResultWrapper<T, ?> wrapper,
Logger logger,
Function<Context, Context> context,
ContextKeyValue<?>... keyValues) {
super(tableMetadata, mapping, operator, wrapper, keyValues);
this.logger = logger;
this.context = context;
}

@Override
Expand All @@ -55,7 +55,7 @@ public Flux<T> fetch() {
.when(param.isForUpdate(), QueryOperator::forUpdate)
.fetch(eventWrapper(tableMetadata, wrapper, executorType("reactive"), type("fetch")))
.reactive())
.contextWrite(ctx->ctx.put(Logger.class,logger));
.contextWrite(context);
}

@Override
Expand All @@ -73,7 +73,7 @@ public Mono<T> fetchOne() {
.fetch(eventWrapper(tableMetadata, wrapper, executorType("reactive"), type("fetchOne")))
.reactive()
.take(1))
.contextWrite(ctx-> ctx.put(Logger.class,logger))
.contextWrite(context)
.singleOrEmpty();
}

Expand Down Expand Up @@ -111,7 +111,7 @@ public Mono<Integer> count() {
.map(Number::intValue)
.reduce(Math::addExact)
.switchIfEmpty(Mono.just(0)))
.contextWrite(ctx-> ctx.put(Logger.class,logger))
.contextWrite(context)
.singleOrEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import java.util.Collection;
import java.util.function.Function;
import java.util.function.Supplier;

public class DefaultReactiveRepository<E, K> extends DefaultRepository<E> implements ReactiveRepository<E, K> {
Expand All @@ -26,9 +28,9 @@ public class DefaultReactiveRepository<E, K> extends DefaultRepository<E> implem
public DefaultReactiveRepository(DatabaseOperator operator, String table, Class<E> type, ResultWrapper<E, ?> wrapper) {
this(operator,
() -> operator
.getMetadata()
.getTable(table)
.orElseThrow(() -> new UnsupportedOperationException("table [" + table + "] doesn't exist")), type, wrapper);
.getMetadata()
.getTable(table)
.orElseThrow(() -> new UnsupportedOperationException("table [" + table + "] doesn't exist")), type, wrapper);
}

public DefaultReactiveRepository(DatabaseOperator operator, RDBTableMetadata table, Class<E> type, ResultWrapper<E, ?> wrapper) {
Expand All @@ -53,7 +55,7 @@ public Mono<E> newInstance() {
@Override
public Mono<E> findById(Mono<K> primaryKey) {
return primaryKey
.flatMap(k -> createQuery().where(getIdColumn(), k).fetchOne());
.flatMap(k -> createQuery().where(getIdColumn(), k).fetchOne());
}

@Override
Expand All @@ -75,84 +77,78 @@ public Mono<Integer> deleteById(Publisher<K> key) {
@Override
public Mono<Integer> updateById(K id, Mono<E> data) {
return data
.flatMap(_data -> createUpdate()
.where(getIdColumn(), id)
.set(_data)
.execute());
.flatMap(_data -> createUpdate()
.where(getIdColumn(), id)
.set(_data)
.execute());
}

@Override
public Mono<SaveResult> save(Publisher<E> data) {
return Flux
.from(data)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> doSave(list).reactive().as(this::setupLogger))
.defaultIfEmpty(SaveResult.of(0, 0));
.from(data)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> doSave(list).reactive().contextWrite(this::applyContext))
.defaultIfEmpty(SaveResult.of(0, 0));
}

@Override
public Mono<Integer> insert(Publisher<E> data) {
return Flux
.from(data)
.buffer(100)
.as(this::insertBatch);
.from(data)
.buffer(100)
.as(this::insertBatch);
}

@Override
public Mono<Integer> insertBatch(Publisher<? extends Collection<E>> data) {
return Flux
.from(data)
.filter(CollectionUtils::isNotEmpty)
.flatMap(e -> doInsert(e).reactive())
.reduce(Math::addExact)
.defaultIfEmpty(0)
.as(this::setupLogger);
.from(data)
.filter(CollectionUtils::isNotEmpty)
.flatMap(e -> doInsert(e).reactive())
.reduce(Math::addExact)
.defaultIfEmpty(0)
.contextWrite(this::applyContext);
}

@Override
public ReactiveQuery<E> createQuery() {
return new DefaultReactiveQuery<>(getTable()
, mapping
, operator.dml()
, wrapper
, logger
, getDefaultContextKeyValue());
, mapping
, operator.dml()
, wrapper
, this::applyContext
, getDefaultContextKeyValue());
}

@Override
public ReactiveUpdate<E> createUpdate() {
return new DefaultReactiveUpdate<>(
getTable()
, operator.dml().update(getTable().getFullName())
, mapping
, logger
, getDefaultContextKeyValue());
getTable()
, operator.dml().update(getTable().getFullName())
, mapping
, this::applyContext
, getDefaultContextKeyValue());
}

@Override
public ReactiveDelete createDelete() {
return new DefaultReactiveDelete(getTable()
, operator.dml().delete(getTable().getFullName())
, logger
, getDefaultContextKeyValue()
, operator.dml().delete(getTable().getFullName())
, this::applyContext
, getDefaultContextKeyValue()
);
}


private <T> Mono<T> setupLogger(Mono<T> async) {
return async.contextWrite(ctx -> ctx.put(Logger.class, logger));
}

private <T> Flux<T> setupLogger(Flux<T> async) {
return async.contextWrite(ctx -> ctx.put(Logger.class, logger));
protected Context applyContext(Context context) {
return context.put(Logger.class, logger);
}


@Override
public QueryOperator nativeQuery() {
return operator
.dml()
.query(getTable());
.dml()
.query(getTable());
}
}
Loading

0 comments on commit 41649b7

Please sign in to comment.