Skip to content

Commit

Permalink
consumer按照order排序
Browse files Browse the repository at this point in the history
  • Loading branch information
entropy-cloud committed Dec 14, 2024
1 parent c9b5c56 commit c3ec4f4
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package io.nop.batch.core;

public interface BatchConstants {
String VAR_RECORD = "record";
String VAR_ITEM = "item";

String VAR_BATCH_TASK_CTX = "batchTaskCtx";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@
import io.nop.batch.core.IBatchLoaderProvider;
import io.nop.batch.core.IBatchTaskContext;

import java.util.function.Function;
import java.util.function.BiFunction;

public class AdaptedBatchLoaderProvider<S> implements IBatchLoaderProvider<S> {
private final Function<IBatchLoader<S>, IBatchLoader<S>> adapter;
private final BiFunction<IBatchLoader<S>, IBatchTaskContext, IBatchLoader<S>> adapter;
private final IBatchLoaderProvider<S> provider;

public AdaptedBatchLoaderProvider(Function<IBatchLoader<S>, IBatchLoader<S>> adapter, IBatchLoaderProvider<S> provider) {
public AdaptedBatchLoaderProvider(BiFunction<IBatchLoader<S>, IBatchTaskContext, IBatchLoader<S>> adapter,
IBatchLoaderProvider<S> provider) {
this.adapter = adapter;
this.provider = provider;
}

@Override
public IBatchLoader<S> setup(IBatchTaskContext context) {
IBatchLoader<S> loader = provider.setup(context);
return adapter.apply(loader);
return adapter.apply(loader, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
*/
package io.nop.batch.core.split;

import io.nop.api.core.convert.ConvertHelper;
import io.nop.api.core.exceptions.NopException;
import io.nop.batch.core.BatchConstants;
import io.nop.core.lang.eval.EvalExprProvider;
import io.nop.core.lang.eval.IEvalAction;
import io.nop.core.lang.eval.IEvalScope;
import io.nop.dataset.record.IRecordTagger;

import java.util.Collection;
import java.util.Collections;

public class ExprRecordTagger<T, C> implements IRecordTagger<T, C> {
private final IEvalAction expr;
Expand All @@ -26,14 +27,11 @@ public ExprRecordTagger(IEvalAction expr) {
@Override
public Collection<String> getTags(T record, C context) {
IEvalScope scope = EvalExprProvider.newEvalScope();
scope.setLocalValue(null, BatchConstants.VAR_RECORD, record);
scope.setLocalValue(null, BatchConstants.VAR_ITEM, record);
Object value = expr.invoke(scope);
if (value == null)
return null;

if (value instanceof Collection)
return ((Collection<String>) value);

return Collections.singletonList(value.toString());
return ConvertHelper.toCsvSet(value, NopException::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.nop.batch.orm.support.OrmBatchRecordSnapshotBuilder;
import io.nop.commons.collections.OrderByComparator;
import io.nop.commons.concurrent.executor.GlobalExecutors;
import io.nop.commons.util.CollectionHelper;
import io.nop.commons.util.MathHelper;
import io.nop.commons.util.retry.IRetryPolicy;
import io.nop.core.lang.eval.IEvalAction;
Expand Down Expand Up @@ -213,14 +212,18 @@ private void buildTask(BatchTaskBuilder<Object, Object> builder, IBeanProvider b
IRecordTagger<Object, IBatchChunkContext> tagger = getTagger(beanContainer);
IRecordSplitter<Object, Object, IBatchChunkContext> splitter = tagger == null ? null : new RecordTagSplitter<>(tagger);

if (batchTaskModel.getConsumers().size() == 1) {
List<BatchConsumerModel> consumers = new ArrayList<>(batchTaskModel.getConsumers());
consumers.sort(BatchConsumerModel::compareTo);

if (consumers.size() == 1 && splitter == null) {
IBatchConsumerProvider<Object> writer = getWriter(batchTaskModel.getConsumers().get(0), beanContainer);
builder.consumer(writer);
} else {
Map<String, List<IBatchConsumerProvider<Object>>> map = new HashMap<>();
for (BatchConsumerModel consumerModel : batchTaskModel.getConsumers()) {
for (BatchConsumerModel consumerModel : consumers) {
IBatchConsumerProvider<Object> writer = getWriter(consumerModel, beanContainer);
map.computeIfAbsent(consumerModel.getForTag(), k -> new ArrayList<>()).add(writer);
String tag = consumerModel.getForTag();
map.computeIfAbsent(tag, k -> new ArrayList<>()).add(writer);
}

List<IBatchConsumerProvider<Object>> list = map.remove(null);
Expand All @@ -233,8 +236,8 @@ private void buildTask(BatchTaskBuilder<Object, Object> builder, IBeanProvider b

if (splitter != null) {
Map<String, IBatchConsumerProvider<Object>> consumerMap = new HashMap<>();
map.forEach((name, consumers) -> {
IBatchConsumerProvider<Object> writer = MultiBatchConsumerProvider.fromList(consumers);
map.forEach((name, consumerList) -> {
IBatchConsumerProvider<Object> writer = MultiBatchConsumerProvider.fromList(consumerList);
consumerMap.put(name, writer);
});

Expand Down Expand Up @@ -480,8 +483,8 @@ private IRecordTagger<Object, IBatchChunkContext> getTagger(IBeanProvider beanCo

if (taggerModel.getSource() != null)
return (record, ctx) ->
CollectionHelper.toCollection(taggerModel.getSource().call2(null,
record, ctx, ctx.getEvalScope()), true);
ConvertHelper.toCsvSet(taggerModel.getSource().call2(null,
record, ctx, ctx.getEvalScope()), NopException::new);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

import io.nop.batch.dsl.model._gen._BatchConsumerModel;

public class BatchConsumerModel extends _BatchConsumerModel{
public BatchConsumerModel(){
public class BatchConsumerModel extends _BatchConsumerModel implements Comparable<BatchConsumerModel> {
public BatchConsumerModel() {

}

@Override
public int compareTo(BatchConsumerModel o) {
return Integer.compare(getOrder(), o.getOrder());
}
}
4 changes: 4 additions & 0 deletions nop-xdefs/src/main/resources/_vfs/nop/schema/task/batch.xdef
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,14 @@
<adapter xdef:value="xpl-fn:(processor)=>any"/>
</processor>

<!-- 选择每一个item所对应的consumer,返回tag列表。 -->
<tagger bean="bean-name" xdef:name="BatchTaggerModel">
<source xdef:value="xpl-fn:(item,batchChunkCtx)=>Collection"/>
</tagger>

<!--
@forTag 用于匹配tagger所返回的标签。如果没有设置,则表示不受tagger匹配影响,总是消费item
-->
<consumer bean="bean-name" name="!var-name" order="!int=0" forTag="string"
xdef:unique-attr="name" xdef:name="BatchConsumerModel" xdef:ref="BatchListenersModel"
aggregator="bean-name" metaProvider="bean-name">
Expand Down

0 comments on commit c3ec4f4

Please sign in to comment.