diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutor.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutor.java index a7ad3a3a..106169dd 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutor.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcReactiveSqlExecutor.java @@ -5,6 +5,7 @@ import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor; import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper; import org.reactivestreams.Publisher; +import org.slf4j.Logger; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -25,40 +26,53 @@ public JdbcReactiveSqlExecutor() { @Override public Mono update(Publisher request) { - - return getConnection() + return Mono + .deferContextual(ctx -> getConnection() .flatMap(connection -> toFlux(request) - .map(sql -> doUpdate(connection, sql)) - .reduce(Math::addExact)) - .defaultIfEmpty(0); + .map(sql -> doUpdate(ctx.getOrDefault(Logger.class, log), connection, sql)) + .reduce(Math::addExact)) + .defaultIfEmpty(0)); } @Override public Mono execute(Publisher request) { - return getConnection() + + return Mono + .deferContextual(ctx -> getConnection() .flatMap(connection -> toFlux(request) - .doOnNext(sql -> doExecute(connection, sql)) - .then()); + .doOnNext(sql -> doExecute(ctx.getOrDefault(Logger.class, log), connection, sql)) + .then())); } @Override public Flux select(Publisher request, ResultWrapper wrapper) { return Flux - .create(sink -> { - @SuppressWarnings("all") - Disposable disposable = getConnection() + .deferContextual(ctx -> { + Logger logger = ctx.getOrDefault(Logger.class, log); + return Flux + .create(sink -> { + @SuppressWarnings("all") + Disposable disposable = getConnection() .flatMap(connection -> toFlux(request) - .doOnNext(sql -> doSelect(connection, sql, consumer(wrapper, sink::next))) - .then()) + .doOnNext(sql -> this + .doSelect( + logger, + connection, + sql, + consumer(wrapper, sink::next), + t -> sink.isCancelled())) + .then()) .subscribe((ignore) -> sink.complete(), sink::error, sink::complete, Context.of(sink.contextView())); - sink.onCancel(disposable) - .onDispose(disposable); - }); + sink.onCancel(disposable) + .onDispose(disposable); + }); + }); + } protected Flux toFlux(Publisher request) { diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcSqlExecutor.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcSqlExecutor.java index ba313522..d04f5145 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcSqlExecutor.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/executor/jdbc/JdbcSqlExecutor.java @@ -10,6 +10,7 @@ import java.sql.*; import java.util.List; +import java.util.function.Predicate; import static org.hswebframework.ezorm.rdb.executor.jdbc.JdbcSqlExecutorHelper.*; import static org.hswebframework.ezorm.rdb.utils.SqlUtils.printSql; @@ -30,7 +31,7 @@ protected void releaseResultSet(ResultSet resultSet) { } @SneakyThrows - protected int doUpdate(Connection connection, SqlRequest request) { + protected int doUpdate(Logger logger, Connection connection, SqlRequest request) { printSql(logger, request); PreparedStatement statement = null; try { @@ -66,8 +67,12 @@ protected int doUpdate(Connection connection, SqlRequest request) { } } + protected int doUpdate(Connection connection, SqlRequest request) { + return doUpdate(logger, connection, request); + } + @SneakyThrows - protected void doExecute(Connection connection, SqlRequest request) { + protected void doExecute(Logger logger, Connection connection, SqlRequest request) { PreparedStatement statement = null; try { if (!request.isEmpty()) { @@ -97,6 +102,10 @@ protected void doExecute(Connection connection, SqlRequest request) { } } + protected void doExecute(Connection connection, SqlRequest request) { + doExecute(logger, connection, request); + } + @SneakyThrows protected Object getResultValue(ResultSetMetaData metaData, ResultSet set, int columnIndex) { @@ -120,7 +129,11 @@ protected Object getResultValue(ResultSetMetaData metaData, ResultSet set, int c } @SneakyThrows - public R doSelect(Connection connection, SqlRequest request, ResultWrapper wrapper) { + protected R doSelect(Logger logger, + Connection connection, + SqlRequest request, + ResultWrapper wrapper, + Predicate stopped) { PreparedStatement statement = connection.prepareStatement(request.getSql()); try { printSql(logger, request); @@ -143,7 +156,7 @@ public R doSelect(Connection connection, SqlRequest request, ResultWrappe data = context.getRowInstance(); } index++; - if (!wrapper.completedWrapRow(data)) { + if (!wrapper.completedWrapRow(data) || stopped.test(data)) { break; } } @@ -155,4 +168,10 @@ public R doSelect(Connection connection, SqlRequest request, ResultWrappe releaseStatement(statement); } } + + + @SneakyThrows + public R doSelect(Connection connection, SqlRequest request, ResultWrapper wrapper) { + return doSelect(logger, connection, request, wrapper, (t) -> false); + } } diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveDelete.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveDelete.java index 723ddd98..b33d08b2 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveDelete.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveDelete.java @@ -6,19 +6,20 @@ import org.hswebframework.ezorm.rdb.operator.dml.delete.DeleteOperator; import org.slf4j.Logger; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import java.util.function.BiFunction; +import java.util.function.Function; public class DefaultReactiveDelete extends DefaultDelete implements ReactiveDelete { - private final Logger logger; - + private final Function context; public DefaultReactiveDelete(RDBTableMetadata tableMetadata, DeleteOperator operator, - Logger logger, + Function context, ContextKeyValue... keyValues) { super(tableMetadata, operator, keyValues); - this.logger = logger; + this.context = context; } public BiFunction, Mono> mapper = (reactiveDelete, integerMono) -> integerMono; @@ -28,7 +29,7 @@ public Mono execute() { return mapper.apply(this, this .doExecute() .reactive() - .contextWrite(ctx -> ctx.put(Logger.class, logger))); + .contextWrite(context)); } @Override diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveQuery.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveQuery.java index 6c0eb9f2..3023bf39 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveQuery.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveQuery.java @@ -15,8 +15,8 @@ import org.slf4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.context.Context; -import java.util.function.BiFunction; import java.util.function.Function; import static org.hswebframework.ezorm.rdb.events.ContextKeys.source; @@ -29,16 +29,16 @@ public class DefaultReactiveQuery extends DefaultQuery> implements ReactiveQuery { - private final Logger logger; + private final Function context; public DefaultReactiveQuery(TableOrViewMetadata tableMetadata, EntityColumnMapping mapping, DMLOperator operator, ResultWrapper wrapper, - Logger logger, + Function context, ContextKeyValue... keyValues) { super(tableMetadata, mapping, operator, wrapper, keyValues); - this.logger = logger; + this.context = context; } @Override @@ -55,7 +55,7 @@ public Flux fetch() { .when(param.isForUpdate(), QueryOperator::forUpdate) .fetch(eventWrapper(tableMetadata, wrapper, executorType("reactive"), type("fetch"))) .reactive()) - .contextWrite(ctx->ctx.put(Logger.class,logger)); + .contextWrite(context); } @Override @@ -73,7 +73,7 @@ public Mono fetchOne() { .fetch(eventWrapper(tableMetadata, wrapper, executorType("reactive"), type("fetchOne"))) .reactive() .take(1)) - .contextWrite(ctx-> ctx.put(Logger.class,logger)) + .contextWrite(context) .singleOrEmpty(); } @@ -111,7 +111,7 @@ public Mono count() { .map(Number::intValue) .reduce(Math::addExact) .switchIfEmpty(Mono.just(0))) - .contextWrite(ctx-> ctx.put(Logger.class,logger)) + .contextWrite(context) .singleOrEmpty(); } diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveRepository.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveRepository.java index dfdf084f..64bb68dc 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveRepository.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveRepository.java @@ -15,8 +15,10 @@ import org.slf4j.Logger; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import java.util.Collection; +import java.util.function.Function; import java.util.function.Supplier; public class DefaultReactiveRepository extends DefaultRepository implements ReactiveRepository { @@ -26,9 +28,9 @@ public class DefaultReactiveRepository extends DefaultRepository implem public DefaultReactiveRepository(DatabaseOperator operator, String table, Class type, ResultWrapper wrapper) { this(operator, () -> operator - .getMetadata() - .getTable(table) - .orElseThrow(() -> new UnsupportedOperationException("table [" + table + "] doesn't exist")), type, wrapper); + .getMetadata() + .getTable(table) + .orElseThrow(() -> new UnsupportedOperationException("table [" + table + "] doesn't exist")), type, wrapper); } public DefaultReactiveRepository(DatabaseOperator operator, RDBTableMetadata table, Class type, ResultWrapper wrapper) { @@ -53,7 +55,7 @@ public Mono newInstance() { @Override public Mono findById(Mono primaryKey) { return primaryKey - .flatMap(k -> createQuery().where(getIdColumn(), k).fetchOne()); + .flatMap(k -> createQuery().where(getIdColumn(), k).fetchOne()); } @Override @@ -75,84 +77,78 @@ public Mono deleteById(Publisher key) { @Override public Mono updateById(K id, Mono data) { return data - .flatMap(_data -> createUpdate() - .where(getIdColumn(), id) - .set(_data) - .execute()); + .flatMap(_data -> createUpdate() + .where(getIdColumn(), id) + .set(_data) + .execute()); } @Override public Mono save(Publisher data) { return Flux - .from(data) - .collectList() - .filter(CollectionUtils::isNotEmpty) - .flatMap(list -> doSave(list).reactive().as(this::setupLogger)) - .defaultIfEmpty(SaveResult.of(0, 0)); + .from(data) + .collectList() + .filter(CollectionUtils::isNotEmpty) + .flatMap(list -> doSave(list).reactive().contextWrite(this::applyContext)) + .defaultIfEmpty(SaveResult.of(0, 0)); } @Override public Mono insert(Publisher data) { return Flux - .from(data) - .buffer(100) - .as(this::insertBatch); + .from(data) + .buffer(100) + .as(this::insertBatch); } @Override public Mono insertBatch(Publisher> data) { return Flux - .from(data) - .filter(CollectionUtils::isNotEmpty) - .flatMap(e -> doInsert(e).reactive()) - .reduce(Math::addExact) - .defaultIfEmpty(0) - .as(this::setupLogger); + .from(data) + .filter(CollectionUtils::isNotEmpty) + .flatMap(e -> doInsert(e).reactive()) + .reduce(Math::addExact) + .defaultIfEmpty(0) + .contextWrite(this::applyContext); } @Override public ReactiveQuery createQuery() { return new DefaultReactiveQuery<>(getTable() - , mapping - , operator.dml() - , wrapper - , logger - , getDefaultContextKeyValue()); + , mapping + , operator.dml() + , wrapper + , this::applyContext + , getDefaultContextKeyValue()); } @Override public ReactiveUpdate createUpdate() { return new DefaultReactiveUpdate<>( - getTable() - , operator.dml().update(getTable().getFullName()) - , mapping - , logger - , getDefaultContextKeyValue()); + getTable() + , operator.dml().update(getTable().getFullName()) + , mapping + , this::applyContext + , getDefaultContextKeyValue()); } @Override public ReactiveDelete createDelete() { return new DefaultReactiveDelete(getTable() - , operator.dml().delete(getTable().getFullName()) - , logger - , getDefaultContextKeyValue() + , operator.dml().delete(getTable().getFullName()) + , this::applyContext + , getDefaultContextKeyValue() ); } - - private Mono setupLogger(Mono async) { - return async.contextWrite(ctx -> ctx.put(Logger.class, logger)); - } - - private Flux setupLogger(Flux async) { - return async.contextWrite(ctx -> ctx.put(Logger.class, logger)); + protected Context applyContext(Context context) { + return context.put(Logger.class, logger); } - @Override public QueryOperator nativeQuery() { return operator - .dml() - .query(getTable()); + .dml() + .query(getTable()); } } diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveUpdate.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveUpdate.java index f606f48e..cbe32664 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveUpdate.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/DefaultReactiveUpdate.java @@ -7,20 +7,22 @@ import org.hswebframework.ezorm.rdb.operator.dml.update.UpdateOperator; import org.slf4j.Logger; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import java.util.function.BiFunction; +import java.util.function.Function; public class DefaultReactiveUpdate extends DefaultUpdate> implements ReactiveUpdate { - private final Logger logger; + private final Function context; public DefaultReactiveUpdate(RDBTableMetadata table, UpdateOperator operator, EntityColumnMapping mapping, - Logger logger, + Function context, ContextKeyValue... keyValues) { super(table, operator, mapping, keyValues); - this.logger = logger; + this.context = context; } @@ -29,8 +31,8 @@ public DefaultReactiveUpdate(RDBTableMetadata table, @Override public Mono execute() { return mapper.apply(this, doExecute() - .reactive() - .contextWrite(ctx->ctx.put(Logger.class,logger))); + .reactive() + .contextWrite(context)); } @Override diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/record/RecordReactiveRepository.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/record/RecordReactiveRepository.java index e8ad3315..2f931d99 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/record/RecordReactiveRepository.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/mapping/defaults/record/RecordReactiveRepository.java @@ -6,23 +6,38 @@ import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata; import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata; import org.hswebframework.ezorm.rdb.operator.DatabaseOperator; +import org.slf4j.Logger; +import reactor.core.publisher.Mono; +import reactor.util.context.Context; import java.util.function.Supplier; public class RecordReactiveRepository extends DefaultReactiveRepository { public RecordReactiveRepository(DatabaseOperator operator, String table) { - this(operator,()->operator.getMetadata().getTable(table).orElseThrow(()->new UnsupportedOperationException("table [" + table + "] doesn't exist"))); + this(operator, () -> operator + .getMetadata() + .getTable(table) + .orElseThrow(() -> new UnsupportedOperationException("table [" + table + "] doesn't exist"))); } public RecordReactiveRepository(DatabaseOperator operator, Supplier table) { - super(operator, table, Record.class, RecordResultWrapper.of(SimpleColumnMapping.of(DefaultRecord.class,table))); + super(operator, table, Record.class, RecordResultWrapper.of(SimpleColumnMapping.of(DefaultRecord.class, table))); + } + + + @Override + protected Context applyContext(Context context) { + if (context.hasKey(Logger.class)) { + return context; + } + return super.applyContext(context); } @Override protected void initMapping(Class entityType) { - this.mapping = SimpleColumnMapping.of(entityType,tableSupplier); + this.mapping = SimpleColumnMapping.of(entityType, tableSupplier); defaultContextKeyValue.add(MappingContextKeys.columnMapping(mapping)); } diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/builder/DefaultQuerySqlBuilder.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/builder/DefaultQuerySqlBuilder.java index dda17b1b..f136e314 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/builder/DefaultQuerySqlBuilder.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/builder/DefaultQuerySqlBuilder.java @@ -6,10 +6,7 @@ import org.hswebframework.ezorm.rdb.metadata.RDBFeatureType; import org.hswebframework.ezorm.rdb.metadata.RDBSchemaMetadata; import org.hswebframework.ezorm.rdb.metadata.TableOrViewMetadata; -import org.hswebframework.ezorm.rdb.operator.builder.fragments.BlockSqlFragments; -import org.hswebframework.ezorm.rdb.operator.builder.fragments.NativeSql; -import org.hswebframework.ezorm.rdb.operator.builder.fragments.PrepareSqlFragments; -import org.hswebframework.ezorm.rdb.operator.builder.fragments.SqlFragments; +import org.hswebframework.ezorm.rdb.operator.builder.fragments.*; import org.hswebframework.ezorm.rdb.operator.builder.fragments.query.QuerySqlBuilder; import org.hswebframework.ezorm.rdb.operator.dml.query.QueryOperatorParameter; import org.hswebframework.ezorm.rdb.operator.dml.query.SelectColumn; @@ -67,35 +64,34 @@ protected Optional groupBy(QueryOperatorParameter parameter, Table if (CollectionUtils.isEmpty(groupBy)) { return Optional.empty(); } - PrepareSqlFragments sql = PrepareSqlFragments.of(); - + BatchSqlFragments sql = new BatchSqlFragments(groupBy.size() * 2 - 1, 0); + int idx = 0; for (SelectColumn column : groupBy) { + if (idx++ > 0) { + sql.add(SqlFragments.COMMA); + } if (column instanceof NativeSql) { sql.addSql(((NativeSql) column).getSql()) .addParameter(((NativeSql) column).getParameters()); } else { - RDBColumnMetadata columnMetadata = metadata - .getColumn(column.getColumn()) - .orElseThrow(() -> new IllegalArgumentException("unknown column " + column.getColumn())); - String fullName = columnMetadata.getFullName(); + RDBColumnMetadata columnMetadata = metadata.getColumnNow(column.getColumn()); + String fullName = columnMetadata.getFullName(); String function = column.getFunction(); if (function != null) { - sql.addFragments( - metadata - .findFeature(createFeatureId(function)) - .map(fragment -> fragment.create(fullName, columnMetadata, column)) - .filter(SqlFragments::isNotEmpty) - .orElseThrow(() -> new UnsupportedOperationException("unsupported function:" + column)) - ); + SqlFragments func = metadata + .findFeatureNow(createFeatureId(function)) + .create(fullName, columnMetadata, column); + if (func.isEmpty()) { + throw new UnsupportedOperationException("unsupported function:" + function); + } + sql.add(func); } else { sql.addSql(fullName); } } - sql.addSql(","); } - sql.removeLastSql(); return Optional.of(sql); } diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/dml/QueryOperator.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/dml/QueryOperator.java index d0de7afb..4cedb367 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/dml/QueryOperator.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/dml/QueryOperator.java @@ -260,7 +260,6 @@ public final QueryOperator orderBy(SortOrderSupplier... operators) { public abstract QueryOperator orderBy(SortOrder... operators); - //todo 暂未支持 public abstract QueryOperator groupBy(Operator... operators); public abstract QueryOperator groupBy(SelectColumn... operators); diff --git a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/dml/query/QueryOperatorParameter.java b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/dml/query/QueryOperatorParameter.java index c3f56dc4..4b63ca4b 100644 --- a/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/dml/query/QueryOperatorParameter.java +++ b/hsweb-easy-orm-rdb/src/main/java/org/hswebframework/ezorm/rdb/operator/dml/query/QueryOperatorParameter.java @@ -20,13 +20,13 @@ public class QueryOperatorParameter { private String fromAlias; - private List where = new ArrayList<>(); + private List where = new ArrayList<>(5); private List joins = new ArrayList<>(); - private List orderBy = new ArrayList<>(); + private List orderBy = new ArrayList<>(2); - private List groupBy = new ArrayList<>(); + private List groupBy = new ArrayList<>(2); private List having = new ArrayList<>();