From a08d922bccd78825e451bd9fc88de34db7209fcb Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Thu, 20 Jun 2024 14:34:12 +0800 Subject: [PATCH] fix --- .../org/apache/doris/load/loadv2/LoadJob.java | 5 + .../apache/doris/load/loadv2/LoadManager.java | 103 +++++++----------- 2 files changed, 42 insertions(+), 66 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 8b72574fe67006f..f0f3eeafce649f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -1029,6 +1029,11 @@ public boolean equals(Object obj) { && this.jobType.equals(other.jobType); } + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); + } + public void readFields(DataInput in) throws IOException { if (!isJobTypeRead) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 04663a4338a0e10..9d7c0f3bc778378 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -33,14 +33,12 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.Pair; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.UserException; -import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; @@ -51,7 +49,6 @@ import org.apache.doris.load.Load; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.CleanLabelOperationLog; -import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.thrift.TUniqueId; @@ -61,7 +58,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.gson.annotations.SerializedName; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.StopWatch; @@ -101,7 +97,6 @@ public class LoadManager implements Writable { private static final Logger LOG = LogManager.getLogger(LoadManager.class); - @SerializedName("itlj") protected Map idToLoadJob = Maps.newConcurrentMap(); protected Map>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap(); protected LoadJobScheduler loadJobScheduler; @@ -934,75 +929,51 @@ public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId @Override public void write(DataOutput out) throws IOException { long currentTimeMs = System.currentTimeMillis(); - idToLoadJob.entrySet().removeIf(entry -> entry.getValue().isExpired(currentTimeMs) - || entry.getValue() instanceof MiniLoadJob); - Text.writeString(out, GsonUtils.GSON.toJson(this)); + List loadJobs = + idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)) + .filter(t -> !(t instanceof MiniLoadJob)).collect(Collectors.toList()); + + out.writeInt(loadJobs.size()); + for (LoadJob loadJob : loadJobs) { + loadJob.write(out); + } } /** * Read from file. **/ public void readFields(DataInput in) throws IOException { - if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_136) { - LoadManager loadManager = GsonUtils.GSON.fromJson(Text.readString(in), LoadManager.class); - long currentTimeMs = System.currentTimeMillis(); - loadManager.idToLoadJob.entrySet().stream() - .filter(entry -> entry.getValue().getJobType() != EtlJobType.MINI) - .filter(entry -> !entry.getValue().isExpired(currentTimeMs)) - .forEach(entry -> { - idToLoadJob.put(entry.getKey(), entry.getValue()); - Map> map = dbIdToLabelToLoadJobs.get(entry.getValue().getDbId()); - if (map == null) { - map = Maps.newConcurrentMap(); - dbIdToLabelToLoadJobs.put(entry.getValue().getDbId(), map); - } - List jobs = map.get(entry.getValue().getLabel()); - if (jobs == null) { - jobs = Lists.newArrayList(); - map.put(entry.getValue().getLabel(), jobs); - } - jobs.add(entry.getValue()); - // The callback of load job which is replayed by image need to be registered - // in callback factory. The commit and visible txn will callback the unfinished load job. - // Otherwise, the load job always does not be completed while the txn is visible. - if (!entry.getValue().isCompleted()) { - Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(entry.getValue()); - } - }); - - } else { - long currentTimeMs = System.currentTimeMillis(); - int size = in.readInt(); - for (int i = 0; i < size; i++) { - LoadJob loadJob = LoadJob.read(in); - if (loadJob.isExpired(currentTimeMs)) { - continue; - } + long currentTimeMs = System.currentTimeMillis(); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + LoadJob loadJob = LoadJob.read(in); + if (loadJob.isExpired(currentTimeMs)) { + continue; + } - if (loadJob.getJobType() == EtlJobType.MINI) { - LOG.warn("skip mini load job {} in db {} as it is no longer supported", loadJob.getId(), - loadJob.getDbId()); - continue; - } - idToLoadJob.put(loadJob.getId(), loadJob); - Map> map = dbIdToLabelToLoadJobs.get(loadJob.getDbId()); - if (map == null) { - map = Maps.newConcurrentMap(); - dbIdToLabelToLoadJobs.put(loadJob.getDbId(), map); - } + if (loadJob.getJobType() == EtlJobType.MINI) { + LOG.warn("skip mini load job {} in db {} as it is no longer supported", loadJob.getId(), + loadJob.getDbId()); + continue; + } + idToLoadJob.put(loadJob.getId(), loadJob); + Map> map = dbIdToLabelToLoadJobs.get(loadJob.getDbId()); + if (map == null) { + map = Maps.newConcurrentMap(); + dbIdToLabelToLoadJobs.put(loadJob.getDbId(), map); + } - List jobs = map.get(loadJob.getLabel()); - if (jobs == null) { - jobs = Lists.newArrayList(); - map.put(loadJob.getLabel(), jobs); - } - jobs.add(loadJob); - // The callback of load job which is replayed by image need to be registered in callback factory. - // The commit and visible txn will callback the unfinished load job. - // Otherwise, the load job always does not be completed while the txn is visible. - if (!loadJob.isCompleted()) { - Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob); - } + List jobs = map.get(loadJob.getLabel()); + if (jobs == null) { + jobs = Lists.newArrayList(); + map.put(loadJob.getLabel(), jobs); + } + jobs.add(loadJob); + // The callback of load job which is replayed by image need to be registered in callback factory. + // The commit and visible txn will callback the unfinished load job. + // Otherwise, the load job always does not be completed while the txn is visible. + if (!loadJob.isCompleted()) { + Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob); } } }