Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Jun 20, 2024
1 parent 96b8bd5 commit a08d922
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1029,6 +1029,11 @@ public boolean equals(Object obj) {
&& this.jobType.equals(other.jobType);
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

public void readFields(DataInput in) throws IOException {

if (!isJobTypeRead) {
Expand Down
103 changes: 37 additions & 66 deletions fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,12 @@
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
Expand All @@ -51,7 +49,6 @@
import org.apache.doris.load.Load;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.CleanLabelOperationLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.thrift.TUniqueId;
Expand All @@ -61,7 +58,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
Expand Down Expand Up @@ -101,7 +97,6 @@
public class LoadManager implements Writable {
private static final Logger LOG = LogManager.getLogger(LoadManager.class);

@SerializedName("itlj")
protected Map<Long, LoadJob> idToLoadJob = Maps.newConcurrentMap();
protected Map<Long, Map<String, List<LoadJob>>> dbIdToLabelToLoadJobs = Maps.newConcurrentMap();
protected LoadJobScheduler loadJobScheduler;
Expand Down Expand Up @@ -934,75 +929,51 @@ public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId
@Override
public void write(DataOutput out) throws IOException {
long currentTimeMs = System.currentTimeMillis();
idToLoadJob.entrySet().removeIf(entry -> entry.getValue().isExpired(currentTimeMs)
|| entry.getValue() instanceof MiniLoadJob);
Text.writeString(out, GsonUtils.GSON.toJson(this));
List<LoadJob> loadJobs =
idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs))
.filter(t -> !(t instanceof MiniLoadJob)).collect(Collectors.toList());

out.writeInt(loadJobs.size());
for (LoadJob loadJob : loadJobs) {
loadJob.write(out);
}
}

/**
* Read from file.
**/
public void readFields(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_136) {
LoadManager loadManager = GsonUtils.GSON.fromJson(Text.readString(in), LoadManager.class);
long currentTimeMs = System.currentTimeMillis();
loadManager.idToLoadJob.entrySet().stream()
.filter(entry -> entry.getValue().getJobType() != EtlJobType.MINI)
.filter(entry -> !entry.getValue().isExpired(currentTimeMs))
.forEach(entry -> {
idToLoadJob.put(entry.getKey(), entry.getValue());
Map<String, List<LoadJob>> map = dbIdToLabelToLoadJobs.get(entry.getValue().getDbId());
if (map == null) {
map = Maps.newConcurrentMap();
dbIdToLabelToLoadJobs.put(entry.getValue().getDbId(), map);
}
List<LoadJob> jobs = map.get(entry.getValue().getLabel());
if (jobs == null) {
jobs = Lists.newArrayList();
map.put(entry.getValue().getLabel(), jobs);
}
jobs.add(entry.getValue());
// The callback of load job which is replayed by image need to be registered
// in callback factory. The commit and visible txn will callback the unfinished load job.
// Otherwise, the load job always does not be completed while the txn is visible.
if (!entry.getValue().isCompleted()) {
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(entry.getValue());
}
});

} else {
long currentTimeMs = System.currentTimeMillis();
int size = in.readInt();
for (int i = 0; i < size; i++) {
LoadJob loadJob = LoadJob.read(in);
if (loadJob.isExpired(currentTimeMs)) {
continue;
}
long currentTimeMs = System.currentTimeMillis();
int size = in.readInt();
for (int i = 0; i < size; i++) {
LoadJob loadJob = LoadJob.read(in);
if (loadJob.isExpired(currentTimeMs)) {
continue;
}

if (loadJob.getJobType() == EtlJobType.MINI) {
LOG.warn("skip mini load job {} in db {} as it is no longer supported", loadJob.getId(),
loadJob.getDbId());
continue;
}
idToLoadJob.put(loadJob.getId(), loadJob);
Map<String, List<LoadJob>> map = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
if (map == null) {
map = Maps.newConcurrentMap();
dbIdToLabelToLoadJobs.put(loadJob.getDbId(), map);
}
if (loadJob.getJobType() == EtlJobType.MINI) {
LOG.warn("skip mini load job {} in db {} as it is no longer supported", loadJob.getId(),
loadJob.getDbId());
continue;
}
idToLoadJob.put(loadJob.getId(), loadJob);
Map<String, List<LoadJob>> map = dbIdToLabelToLoadJobs.get(loadJob.getDbId());
if (map == null) {
map = Maps.newConcurrentMap();
dbIdToLabelToLoadJobs.put(loadJob.getDbId(), map);
}

List<LoadJob> jobs = map.get(loadJob.getLabel());
if (jobs == null) {
jobs = Lists.newArrayList();
map.put(loadJob.getLabel(), jobs);
}
jobs.add(loadJob);
// The callback of load job which is replayed by image need to be registered in callback factory.
// The commit and visible txn will callback the unfinished load job.
// Otherwise, the load job always does not be completed while the txn is visible.
if (!loadJob.isCompleted()) {
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
}
List<LoadJob> jobs = map.get(loadJob.getLabel());
if (jobs == null) {
jobs = Lists.newArrayList();
map.put(loadJob.getLabel(), jobs);
}
jobs.add(loadJob);
// The callback of load job which is replayed by image need to be registered in callback factory.
// The commit and visible txn will callback the unfinished load job.
// Otherwise, the load job always does not be completed while the txn is visible.
if (!loadJob.isCompleted()) {
Env.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
}
}
}
Expand Down

0 comments on commit a08d922

Please sign in to comment.