Skip to content

Commit

Permalink
[improve](meta) serialize meta via gson part3
Browse files Browse the repository at this point in the history
Switch meta serialization to gson
Contains the following classes:

Database
SmallFileMgr$SmallFile
catalog.Table
loadv2.LoadJob
loadv2.LoadJobFinalOperation
RestoreJob
BackupJob

Co-authored-by: iszhangpch
  • Loading branch information
dataroaring committed Jun 21, 2024
1 parent ac7984e commit 14e7a64
Show file tree
Hide file tree
Showing 50 changed files with 674 additions and 753 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ public final class FeMetaVersion {
public static final int VERSION_134 = 134;
// For mate gson
public static final int VERSION_135 = 135;
// For mate gson
public static final int VERSION_136 = 136;
// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_135;
public static final int VERSION_CURRENT = VERSION_136;

// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
Expand Down
60 changes: 18 additions & 42 deletions fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
package org.apache.doris.backup;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;

Expand Down Expand Up @@ -169,53 +170,28 @@ public void setTypeRead(boolean isTypeRead) {
public abstract Status updateRepo(Repository repo);

public static AbstractJob read(DataInput in) throws IOException {
AbstractJob job = null;
JobType type = JobType.valueOf(Text.readString(in));
if (type == JobType.BACKUP) {
job = new BackupJob();
} else if (type == JobType.RESTORE) {
job = new RestoreJob();
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) {
AbstractJob job = null;
JobType type = JobType.valueOf(Text.readString(in));
if (type == JobType.BACKUP) {
job = new BackupJob();
} else if (type == JobType.RESTORE) {
job = new RestoreJob();
} else {
throw new IOException("Unknown job type: " + type.name());
}

job.setTypeRead(true);
job.readFields(in);
return job;
} else {
throw new IOException("Unknown job type: " + type.name());
return GsonUtils.GSON.fromJson(Text.readString(in), AbstractJob.class);
}

job.setTypeRead(true);
job.readFields(in);
return job;
}

@Override
public void write(DataOutput out) throws IOException {
// ATTN: must write type first
Text.writeString(out, type.name());

out.writeLong(repoId);
Text.writeString(out, label);
out.writeLong(jobId);
out.writeLong(dbId);
Text.writeString(out, dbName);

out.writeLong(createTime);
out.writeLong(finishedTime);
out.writeLong(timeoutMs);

if (!taskErrMsg.isEmpty()) {
out.writeBoolean(true);
// we only save at most 3 err msgs
int savedNum = Math.min(3, taskErrMsg.size());
out.writeInt(savedNum);
for (Map.Entry<Long, String> entry : taskErrMsg.entrySet()) {
if (savedNum == 0) {
break;
}
out.writeLong(entry.getKey());
Text.writeString(out, entry.getValue());
savedNum--;
}
Preconditions.checkState(savedNum == 0, savedNum);
} else {
out.writeBoolean(false);
}
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

@Deprecated
Expand Down
66 changes: 7 additions & 59 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
Expand All @@ -53,7 +55,6 @@
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
Expand All @@ -63,7 +64,6 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
Expand Down Expand Up @@ -977,64 +977,12 @@ private String getBackupObjs() {
}

public static BackupJob read(DataInput in) throws IOException {
BackupJob job = new BackupJob();
job.readFields(in);
return job;
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);

// table refs
out.writeInt(tableRefs.size());
for (TableRef tblRef : tableRefs) {
tblRef.write(out);
}

// state
Text.writeString(out, state.name());

// times
out.writeLong(snapshotFinishedTime);
out.writeLong(snapshotUploadFinishedTime);

// snapshot info
out.writeInt(snapshotInfos.size());
for (SnapshotInfo info : snapshotInfos.values()) {
info.write(out);
}

// backup meta
if (backupMeta == null) {
out.writeBoolean(false);
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) {
BackupJob job = new BackupJob();
job.readFields(in);
return job;
} else {
out.writeBoolean(true);
backupMeta.write(out);
}

// No need to persist job info. It is generated then write to file

// metaInfoFilePath and jobInfoFilePath
if (Strings.isNullOrEmpty(localMetaInfoFilePath)) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Text.writeString(out, localMetaInfoFilePath);
}

if (Strings.isNullOrEmpty(localJobInfoFilePath)) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Text.writeString(out, localJobInfoFilePath);
}

// write properties
out.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
return GsonUtils.GSON.fromJson(Text.readString(in), BackupJob.class);
}
}

Expand Down
33 changes: 21 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static BackupMeta fromFile(String filePath, int metaVersion) throws IOExc
metaContext.setMetaVersion(metaVersion);
metaContext.setThreadLocalInfo();
try (DataInputStream dis = new DataInputStream(new FileInputStream(file))) {
BackupMeta backupMeta = BackupMeta.read(dis);
BackupMeta backupMeta = BackupMeta.read(dis, metaVersion);
return backupMeta;
} finally {
MetaContext.remove();
Expand All @@ -110,22 +110,31 @@ public boolean compatibleWith(BackupMeta other) {
return false;
}

public static BackupMeta read(DataInput in, int metaVersion) throws IOException {
if (metaVersion < FeMetaVersion.VERSION_136) {
BackupMeta backupMeta = new BackupMeta();
backupMeta.readFields(in);
return backupMeta;
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, BackupMeta.class);
}
}

public static BackupMeta read(DataInput in) throws IOException {
BackupMeta backupMeta = new BackupMeta();
backupMeta.readFields(in);
return backupMeta;
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) {
BackupMeta backupMeta = new BackupMeta();
backupMeta.readFields(in);
return backupMeta;
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, BackupMeta.class);
}
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(tblNameMap.size());
for (Table table : tblNameMap.values()) {
table.write(out);
}
out.writeInt(resourceNameMap.size());
for (Resource resource : resourceNameMap.values()) {
resource.write(out);
}
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

public void readFields(DataInput in) throws IOException {
Expand Down
84 changes: 7 additions & 77 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.gson.GsonPostProcessable;
// import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
Expand Down Expand Up @@ -100,7 +100,6 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -2097,85 +2096,16 @@ private void setTableStateToNormal(Database db, boolean committed, boolean isRep
}
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);

Text.writeString(out, backupTimestamp);
jobInfo.write(out);
out.writeBoolean(allowLoad);

Text.writeString(out, state.name());

if (backupMeta != null) {
out.writeBoolean(true);
backupMeta.write(out);
public static RestoreJob read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) {
RestoreJob job = new RestoreJob();
job.readFields(in);
return job;
} else {
out.writeBoolean(false);
}

fileMapping.write(out);

out.writeLong(metaPreparedTime);
out.writeLong(snapshotFinishedTime);
out.writeLong(downloadFinishedTime);

replicaAlloc.write(out);

out.writeInt(restoredPartitions.size());
for (Pair<String, Partition> entry : restoredPartitions) {
Text.writeString(out, entry.first);
entry.second.write(out);
}

out.writeInt(restoredTbls.size());
for (Table tbl : restoredTbls) {
tbl.write(out);
}

out.writeInt(restoredVersionInfo.rowKeySet().size());
for (long tblId : restoredVersionInfo.rowKeySet()) {
out.writeLong(tblId);
out.writeInt(restoredVersionInfo.row(tblId).size());
for (Map.Entry<Long, Long> entry : restoredVersionInfo.row(tblId).entrySet()) {
out.writeLong(entry.getKey());
out.writeLong(entry.getValue());
// It is version hash in the past,
// but it useless but should compatible with old version so that write 0 here
out.writeLong(0L);
}
}

out.writeInt(snapshotInfos.rowKeySet().size());
for (long tabletId : snapshotInfos.rowKeySet()) {
out.writeLong(tabletId);
Map<Long, SnapshotInfo> map = snapshotInfos.row(tabletId);
out.writeInt(map.size());
for (Map.Entry<Long, SnapshotInfo> entry : map.entrySet()) {
out.writeLong(entry.getKey());
entry.getValue().write(out);
}
}

out.writeInt(restoredResources.size());
for (Resource resource : restoredResources) {
resource.write(out);
}

// write properties
out.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
return GsonUtils.GSON.fromJson(Text.readString(in), RestoreJob.class);
}
}

public static RestoreJob read(DataInput in) throws IOException {
RestoreJob job = new RestoreJob();
job.readFields(in);
return job;
}

@Deprecated
@Override
public void readFields(DataInput in) throws IOException {
Expand Down
Loading

0 comments on commit 14e7a64

Please sign in to comment.