Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: 优化批量新增保存时重复数据的处理逻辑:合并相同的数据. #56

Merged
merged 1 commit into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,8 @@ public Mono<SaveResult> save(Publisher<E> data) {
public Mono<Integer> insert(Publisher<E> data) {
return Flux
.from(data)
.flatMap(e -> doInsert(e).reactive().as(this::setupLogger))
.reduce(Math::addExact)
.defaultIfEmpty(0);
.buffer(100)
.as(this::insertBatch);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,60 @@ protected void initMapping(Class<E> entityType) {
defaultContextKeyValue.add(MappingContextKeys.columnMapping(mapping));
}

protected Collection<E> tryMergeDuplicate(Collection<E> data) {
if (data.isEmpty()) {
return data;
}
Map<Object, E> merging = new HashMap<>(data.size());
List<E> merged = new ArrayList<>(data.size());
for (E datum : data) {
Object id = getProperty(datum, getIdColumn());
if (id == null) {
merged.add(datum);
} else {
merging.compute(id, (_id, old) -> {
if (old != null) {
return merge(old, datum);
}
return datum;
});
}
}
merged.addAll(merging.values());
return merged;
}

protected E merge(E older, E newer) {
ObjectPropertyOperator opt = GlobalConfig.getPropertyOperator();
for (String property : getProperties()) {
Object newerVal = opt.getProperty(newer, property).orElse(null);
if (newerVal != null) {
continue;
}
opt.getProperty(older, property)
.ifPresent(olderValue -> opt.setProperty(newer, property, olderValue));

}
return newer;
}

private Object getProperty(E data, String property) {
return GlobalConfig
.getPropertyOperator()
.getProperty(data, property)
.orElse(null);
}

protected SaveResultOperator doSave(Collection<E> data) {
Collection<E> _data = tryMergeDuplicate(data);
RDBTableMetadata table = getTable();
UpsertOperator upsert = operator.dml().upsert(table.getFullName());

return EventResultOperator.create(
() -> {
upsert.columns(getProperties());
List<String> ignore = new ArrayList<>();
for (E e : data) {
for (E e : _data) {
upsert.values(Stream.of(getProperties())
.map(property -> getInsertColumnValue(e, property, (prop, val) -> ignore.add(prop)))
.toArray());
Expand All @@ -134,7 +179,7 @@ protected SaveResultOperator doSave(Collection<E> data) {
table,
MappingEventTypes.save_before,
MappingEventTypes.save_after,
getDefaultContextKeyValue(instance(data),
getDefaultContextKeyValue(instance(_data),
type("batch"),
tableMetadata(table),
upsert(upsert))
Expand Down Expand Up @@ -176,7 +221,7 @@ private Object getInsertColumnValue(E data, String property, BiConsumer<String,
if (value != null) {
whenDefaultValue.accept(property, value);
//回填
if(!(value instanceof NativeSql)){
if (!(value instanceof NativeSql)) {
GlobalConfig.getPropertyOperator().setProperty(data, property, value);
}
}
Expand All @@ -191,14 +236,15 @@ private Object getInsertColumnValue(E data, String property) {
}

protected InsertResultOperator doInsert(Collection<E> batch) {
Collection<E> _data = tryMergeDuplicate(batch);
RDBTableMetadata table = getTable();
InsertOperator insert = operator.dml().insert(table.getFullName());

return EventResultOperator.create(
() -> {
insert.columns(getProperties());

for (E e : batch) {
for (E e : _data) {
insert.values(Stream.of(getProperties())
.map(property -> getInsertColumnValue(e, property))
.toArray());
Expand All @@ -210,7 +256,7 @@ protected InsertResultOperator doInsert(Collection<E> batch) {
MappingEventTypes.insert_before,
MappingEventTypes.insert_after,
getDefaultContextKeyValue(
instance(batch),
instance(_data),
type("batch"),
tableMetadata(table),
insert(insert))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,48 @@ public void testReactivePager() {

}

@Test
public void testInsertMerge(){

BasicTestEntity first= BasicTestEntity
.builder()
.id("test_merge")
.balance(1000L)
.name("first")
.createTime(new Date())
.tags(Arrays.asList("a", "b", "c", "d"))
.state((byte) 1)
.stateEnum(StateEnum.enabled)
.build();

BasicTestEntity second= BasicTestEntity
.builder()
.id("test_merge")
.balance(1000L)
.name("second")
.createTime(new Date())
.tags(Arrays.asList("a", "b", "c", "d"))
.state((byte) 1)
.stateEnum(StateEnum.enabled)
.build();

repository
.insert(Flux.just(first,second))
.as(StepVerifier::create)
.expectNext(1)
.verifyComplete();

repository
.createQuery()
.where(BasicTestEntity::getId,first.getId())
.select("id","name")
.fetch()
.map(BasicTestEntity::getName)
.as(StepVerifier::create)
.expectNext(second.getName())
.verifyComplete();
}

@Test
public void testInsertDuplicate() {
//10次insert
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public void test() {



sqlExecutor.execute(SqlRequests.of("create index test_index on test_index_parser (age)"));
sqlExecutor.execute(SqlRequests.of("create index test_index_0 on test_index_parser (age)"));

sqlExecutor.execute(SqlRequests.of("create unique index test_index_2 on test_index_parser (name)"));

Expand Down
Loading