Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Jun 20, 2024
1 parent 9a9c371 commit fc14d93
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 23 deletions.
35 changes: 35 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.gson.JsonObject;
import com.google.gson.annotations.SerializedName;
Expand Down Expand Up @@ -211,6 +212,40 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

// for restore job
public void writeFields(DataOutput out) throws IOException {
// ATTN: must write type first
Text.writeString(out, type.name());

out.writeLong(repoId);
Text.writeString(out, label);
out.writeLong(jobId);
out.writeLong(dbId);
Text.writeString(out, dbName);

out.writeLong(createTime);
out.writeLong(finishedTime);
out.writeLong(timeoutMs);

if (!taskErrMsg.isEmpty()) {
out.writeBoolean(true);
// we only save at most 3 err msgs
int savedNum = Math.min(3, taskErrMsg.size());
out.writeInt(savedNum);
for (Map.Entry<Long, String> entry : taskErrMsg.entrySet()) {
if (savedNum == 0) {
break;
}
out.writeLong(entry.getKey());
Text.writeString(out, entry.getValue());
savedNum--;
}
Preconditions.checkState(savedNum == 0, savedNum);
} else {
out.writeBoolean(false);
}
}

@Deprecated
public void readFields(DataInput in) throws IOException {
if (!isTypeRead) {
Expand Down
86 changes: 63 additions & 23 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -2097,37 +2097,77 @@ private void setTableStateToNormal(Database db, boolean committed, boolean isRep
}
}

@Override
public void write(DataOutput out) throws IOException {
// ATTN: must write type first
Text.writeString(out, type.name());
super.writeFields(out);

out.writeLong(repoId);
Text.writeString(out, label);
out.writeLong(jobId);
out.writeLong(dbId);
Text.writeString(out, dbName);
Text.writeString(out, backupTimestamp);
jobInfo.write(out);
out.writeBoolean(allowLoad);

out.writeLong(createTime);
out.writeLong(finishedTime);
out.writeLong(timeoutMs);
Text.writeString(out, state.name());

if (!taskErrMsg.isEmpty()) {
if (backupMeta != null) {
out.writeBoolean(true);
// we only save at most 3 err msgs
int savedNum = Math.min(3, taskErrMsg.size());
out.writeInt(savedNum);
for (Map.Entry<Long, String> entry : taskErrMsg.entrySet()) {
if (savedNum == 0) {
break;
}
out.writeLong(entry.getKey());
Text.writeString(out, entry.getValue());
savedNum--;
}
Preconditions.checkState(savedNum == 0, savedNum);
backupMeta.write(out);
} else {
out.writeBoolean(false);
}

fileMapping.write(out);

out.writeLong(metaPreparedTime);
out.writeLong(snapshotFinishedTime);
out.writeLong(downloadFinishedTime);

replicaAlloc.write(out);

out.writeInt(restoredPartitions.size());
for (Pair<String, Partition> entry : restoredPartitions) {
Text.writeString(out, entry.first);
entry.second.write(out);
}

out.writeInt(restoredTbls.size());
for (Table tbl : restoredTbls) {
tbl.write(out);
}

out.writeInt(restoredVersionInfo.rowKeySet().size());
for (long tblId : restoredVersionInfo.rowKeySet()) {
out.writeLong(tblId);
out.writeInt(restoredVersionInfo.row(tblId).size());
for (Map.Entry<Long, Long> entry : restoredVersionInfo.row(tblId).entrySet()) {
out.writeLong(entry.getKey());
out.writeLong(entry.getValue());
// It is version hash in the past,
// but it useless but should compatible with old version so that write 0 here
out.writeLong(0L);
}
}

out.writeInt(snapshotInfos.rowKeySet().size());
for (long tabletId : snapshotInfos.rowKeySet()) {
out.writeLong(tabletId);
Map<Long, SnapshotInfo> map = snapshotInfos.row(tabletId);
out.writeInt(map.size());
for (Map.Entry<Long, SnapshotInfo> entry : map.entrySet()) {
out.writeLong(entry.getKey());
entry.getValue().write(out);
}
}

out.writeInt(restoredResources.size());
for (Resource resource : restoredResources) {
resource.write(out);
}

// write properties
out.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}

public static RestoreJob read(DataInput in) throws IOException {
Expand Down

0 comments on commit fc14d93

Please sign in to comment.