[improvement](meta) serialize meta via gson part2
iszhangpch handles most of the work.
dataroaring handles upgrading mtmv by introducing GsonUtils134.
dataroaring committed Jun 18, 2024
1 parent ab18c0f commit 5b8f573
Showing 14 changed files with 1,143 additions and 203 deletions.
@@ -90,8 +90,10 @@ public final class FeMetaVersion {
public static final int VERSION_133 = 133;
// For meta gson
public static final int VERSION_134 = 134;
// For meta gson
public static final int VERSION_135 = 135;
// note: when incrementing the meta version, assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_134;
public static final int VERSION_CURRENT = VERSION_135;

// all logs' meta version should be >= the minimum version, so that we can remove many if clauses, for example
// if (FE_METAVERSION < VERSION_94) ...
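The read() changes below all hinge on this constant: code replaying a journal compares the version that wrote the metadata against VERSION_135 and picks the matching decoder. A minimal standalone sketch of the gate (MetaVersionGate, pickDecoder, and journalVersion are illustrative stand-ins for Doris's Env.getCurrentEnvJournalVersion(), not real code):

    public class MetaVersionGate {
        static final int VERSION_134 = 134;
        static final int VERSION_135 = 135;
        // a forgotten bump here would send new-format images down the legacy path
        static final int VERSION_CURRENT = VERSION_135;

        // stand-in for Env.getCurrentEnvJournalVersion()
        static int journalVersion = VERSION_CURRENT;

        static String pickDecoder() {
            // the same shape as every read() changed in this commit
            return journalVersion < VERSION_135 ? "legacy field-by-field binary" : "gson json";
        }

        public static void main(String[] args) {
            System.out.println(pickDecoder()); // gson json
        }
    }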
@@ -17,7 +17,13 @@

package org.apache.doris.alter;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
@@ -32,6 +38,7 @@
*/
public class BatchAlterJobPersistInfo implements Writable {

@SerializedName("l")
private List<AlterJobV2> alterJobV2List;

public BatchAlterJobPersistInfo(List<AlterJobV2> alterJobV2List) {
@@ -40,19 +47,20 @@ public BatchAlterJobPersistInfo(List<AlterJobV2> alterJobV2List) {

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(alterJobV2List.size());
for (AlterJobV2 alterJobV2 : alterJobV2List) {
alterJobV2.write(out);
}
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

    public static BatchAlterJobPersistInfo read(DataInput in) throws IOException {
        int size = in.readInt();
        List<AlterJobV2> alterJobV2List = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            alterJobV2List.add(AlterJobV2.read(in));
        }
        return new BatchAlterJobPersistInfo(alterJobV2List);
    }
    public static BatchAlterJobPersistInfo read(DataInput in) throws IOException {
        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_135) {
            int size = in.readInt();
            List<AlterJobV2> alterJobV2List = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                alterJobV2List.add(AlterJobV2.read(in));
            }
            return new BatchAlterJobPersistInfo(alterJobV2List);
        } else {
            return GsonUtils.GSON.fromJson(Text.readString(in), BatchAlterJobPersistInfo.class);
        }
    }

public List<AlterJobV2> getAlterJobV2List() {
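For reference, a minimal sketch of what the new on-disk record looks like, assuming a plain Gson instance in place of GsonUtils.GSON (which also registers Doris-specific adapters) and a hypothetical BatchInfo class standing in for BatchAlterJobPersistInfo:

    import com.google.gson.Gson;
    import com.google.gson.annotations.SerializedName;

    import java.util.Arrays;
    import java.util.List;

    public class BatchInfoDemo {
        static class BatchInfo {
            @SerializedName("l")
            List<String> jobs; // stand-in for List<AlterJobV2>

            BatchInfo(List<String> jobs) {
                this.jobs = jobs;
            }
        }

        public static void main(String[] args) {
            Gson gson = new Gson();
            // the whole object becomes one JSON string instead of a
            // length-prefixed sequence of binary fields
            String json = gson.toJson(new BatchInfo(Arrays.asList("jobA", "jobB")));
            System.out.println(json); // {"l":["jobA","jobB"]}
            BatchInfo back = gson.fromJson(json, BatchInfo.class);
            System.out.println(back.jobs); // [jobA, jobB]
        }
    }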
@@ -40,6 +40,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -84,13 +85,15 @@
*/
public class TableRef implements ParseNode, Writable {
private static final Logger LOG = LogManager.getLogger(TableRef.class);
@SerializedName("n")
protected TableName name;
// Legal aliases of this table ref. Contains the explicit alias as its sole element if
// there is one. Otherwise, contains the two implicit aliases. Implicit aliases are set
// in the c'tor of the corresponding resolved table ref (subclasses of TableRef) during
// analysis. By convention, for table refs with multiple implicit aliases, aliases_[0]
// contains the fully-qualified implicit alias to ensure that aliases_[0] always
// uniquely identifies this table ref regardless of whether it has an explicit alias.
@SerializedName("a")
protected String[] aliases;
protected List<Long> sampleTabletIds;
// Indicates whether this table ref is given an explicit alias,
@@ -128,6 +131,7 @@ public class TableRef implements ParseNode, Writable {
protected List<TupleId> correlatedTupleIds = Lists.newArrayList();
// analysis output
protected TupleDescriptor desc;
@SerializedName("p")
private PartitionNames partitionNames = null;
private ArrayList<String> joinHints;
private ArrayList<String> sortHints;
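A small sketch of the naming trade-off in these annotations (Ref and its field values are illustrative, not the real TableRef layout): one-letter @SerializedName keys keep each persisted table ref compact at the cost of self-describing JSON, and unannotated fields such as sampleTabletIds appear to be deliberately left out of the persisted form:

    import com.google.gson.Gson;
    import com.google.gson.annotations.SerializedName;

    public class CompactKeysDemo {
        static class Ref {
            @SerializedName("n") String name = "db1.t1";
            @SerializedName("a") String[] aliases = {"t1"};
            @SerializedName("p") String partitionNames = "p20240618";
        }

        public static void main(String[] args) {
            // short keys shrink every serialized ref in the metadata image
            System.out.println(new Gson().toJson(new Ref()));
            // {"n":"db1.t1","a":["t1"],"p":"p20240618"}
        }
    }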
87 changes: 46 additions & 41 deletions fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java
@@ -18,12 +18,15 @@
package org.apache.doris.backup;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.gson.JsonObject;
import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
@@ -43,12 +46,14 @@ public enum JobType {
BACKUP, RESTORE
}

@SerializedName("t")
protected JobType type;

// must be set right before job's running
protected Env env;
// repo will be set at first run()
protected Repository repo;
@SerializedName("rid")
protected long repoId;

/*
@@ -57,16 +62,23 @@ public enum JobType {
* And each time this method is called, the snapshot tasks will be sent with (maybe) different
* version and version hash. So we have to use different job id to identify the tasks in different batches.
*/
@SerializedName("jid")
protected long jobId = -1;

@SerializedName("l")
protected String label;
@SerializedName("dbid")
protected long dbId;
@SerializedName("dbn")
protected String dbName;

protected Status status = Status.OK;

@SerializedName("ct")
protected long createTime = -1;
@SerializedName("ft")
protected long finishedTime = -1;
@SerializedName("to")
protected long timeoutMs;

// task signature -> <finished num / total num>
@@ -75,6 +87,7 @@ public enum JobType {
protected boolean isTypeRead = false;

// save err msg of tasks
@SerializedName("msg")
protected Map<Long, String> taskErrMsg = Maps.newHashMap();

protected AbstractJob(JobType type) {
@@ -158,55 +171,47 @@ public void setTypeRead(boolean isTypeRead) {
public abstract Status updateRepo(Repository repo);

    public static AbstractJob read(DataInput in) throws IOException {
        AbstractJob job = null;
        JobType type = JobType.valueOf(Text.readString(in));
        if (type == JobType.BACKUP) {
            job = new BackupJob();
        } else if (type == JobType.RESTORE) {
            job = new RestoreJob();
        } else {
            throw new IOException("Unknown job type: " + type.name());
        }

        job.setTypeRead(true);
        job.readFields(in);
        return job;
    }
    public static AbstractJob read(DataInput in) throws IOException {
        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_135) {
            AbstractJob job = null;
            JobType type = JobType.valueOf(Text.readString(in));
            if (type == JobType.BACKUP) {
                job = new BackupJob();
            } else if (type == JobType.RESTORE) {
                job = new RestoreJob();
            } else {
                throw new IOException("Unknown job type: " + type.name());
            }

            job.setTypeRead(true);
            job.readFields(in);
            return job;
        } else {
            String json = Text.readString(in);
            JsonObject jsonObject = GsonUtils.GSON.fromJson(json, JsonObject.class);
            JobType type = JobType.valueOf(jsonObject.get("t").getAsString());
            switch (type) {
                case BACKUP:
                    return GsonUtils.GSON.fromJson(json, BackupJob.class);
                case RESTORE:
                    return GsonUtils.GSON.fromJson(json, RestoreJob.class);
                default:
                    throw new IOException("Unknown job type: " + type.name());
            }
        }
    }
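The post-135 branch above is a two-pass decode: parse once into a JsonObject to peek at the type tag "t", then parse the same string again into the concrete subclass. A self-contained sketch that runs outside Doris (BackupDemo and RestoreDemo are hypothetical stand-ins for BackupJob and RestoreJob, and a plain Gson replaces GsonUtils.GSON):

    import com.google.gson.Gson;
    import com.google.gson.JsonObject;
    import com.google.gson.annotations.SerializedName;

    public class PolymorphicReadDemo {
        enum JobType { BACKUP, RESTORE }

        abstract static class Job {
            @SerializedName("t")
            JobType type;
        }

        static class BackupDemo extends Job {
            @SerializedName("st")
            String state = "PENDING";
        }

        static class RestoreDemo extends Job {
        }

        static final Gson GSON = new Gson();

        static Job read(String json) {
            // pass 1: only look at the type tag
            JsonObject obj = GSON.fromJson(json, JsonObject.class);
            JobType type = JobType.valueOf(obj.get("t").getAsString());
            // pass 2: decode the full payload into the concrete class
            switch (type) {
                case BACKUP:
                    return GSON.fromJson(json, BackupDemo.class);
                case RESTORE:
                    return GSON.fromJson(json, RestoreDemo.class);
                default:
                    throw new IllegalArgumentException("Unknown job type: " + type);
            }
        }

        public static void main(String[] args) {
            BackupDemo job = new BackupDemo();
            job.type = JobType.BACKUP;
            String json = GSON.toJson(job); // {"st":"PENDING","t":"BACKUP"}
            System.out.println(read(json).getClass().getSimpleName()); // BackupDemo
        }
    }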

    @Override
    public void write(DataOutput out) throws IOException {
        // ATTN: must write type first
        Text.writeString(out, type.name());

        out.writeLong(repoId);
        Text.writeString(out, label);
        out.writeLong(jobId);
        out.writeLong(dbId);
        Text.writeString(out, dbName);

        out.writeLong(createTime);
        out.writeLong(finishedTime);
        out.writeLong(timeoutMs);

        if (!taskErrMsg.isEmpty()) {
            out.writeBoolean(true);
            // we only save at most 3 err msgs
            int savedNum = Math.min(3, taskErrMsg.size());
            out.writeInt(savedNum);
            for (Map.Entry<Long, String> entry : taskErrMsg.entrySet()) {
                if (savedNum == 0) {
                    break;
                }
                out.writeLong(entry.getKey());
                Text.writeString(out, entry.getValue());
                savedNum--;
            }
            Preconditions.checkState(savedNum == 0, savedNum);
        } else {
            out.writeBoolean(false);
        }
    }
    @Override
    public void write(DataOutput out) throws IOException {
        if (taskErrMsg != null) {
            Map<Long, String> newMsg = Maps.newHashMap();
            taskErrMsg.entrySet().stream().limit(3)
                    .forEach(e -> newMsg.put(e.getKey(), e.getValue()));
            taskErrMsg = newMsg;
        }
        Text.writeString(out, GsonUtils.GSON.toJson(this));
    }
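Note that the new write() preserves the old format's cap of at most three persisted error messages by shrinking the map before toJson() runs. A standalone sketch of that step, with JDK maps in place of Guava's Maps.newHashMap:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TruncateErrMsgDemo {
        public static void main(String[] args) {
            Map<Long, String> taskErrMsg = new LinkedHashMap<>();
            for (long id = 1; id <= 5; id++) {
                taskErrMsg.put(id, "error on task " + id);
            }

            // keep only the first 3 entries, mirroring the old binary format's limit
            Map<Long, String> newMsg = new LinkedHashMap<>();
            taskErrMsg.entrySet().stream().limit(3)
                    .forEach(e -> newMsg.put(e.getKey(), e.getValue()));

            System.out.println(newMsg.size()); // 3
        }
    }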

@Deprecated
public void readFields(DataInput in) throws IOException {
if (!isTypeRead) {
type = JobType.valueOf(Text.readString(in));
77 changes: 18 additions & 59 deletions fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -35,10 +35,12 @@
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskExecutor;
@@ -53,16 +55,15 @@
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
@@ -92,18 +93,24 @@ public enum BackupJobState {
}

// all objects which need backup
@SerializedName("ref")
private List<TableRef> tableRefs = Lists.newArrayList();

@SerializedName("st")
private BackupJobState state;

@SerializedName("sft")
private long snapshotFinishedTime = -1;
@SerializedName("suft")
private long snapshotUploadFinishedTime = -1;

// save task id map to the backend it be executed
private Map<Long, Long> unfinishedTaskIds = Maps.newConcurrentMap();
// tablet id -> snapshot info
@SerializedName("si")
private Map<Long, SnapshotInfo> snapshotInfos = Maps.newConcurrentMap();
// save all related table[partition] info
@SerializedName("meta")
private BackupMeta backupMeta;
// job info file content
private BackupJobInfo jobInfo;
@@ -112,9 +119,12 @@ public enum BackupJobState {
// after job is done, this dir should be deleted
private Path localJobDirPath = null;
// save the local file path of meta info and job info file
@SerializedName("mifp")
private String localMetaInfoFilePath = null;
@SerializedName("jifp")
private String localJobInfoFilePath = null;
// backup properties && table commit seq with table id
@SerializedName("prop")
private Map<String, String> properties = Maps.newHashMap();

private byte[] metaInfoBytes = null;
@@ -967,67 +977,16 @@ private String getBackupObjs() {
}

    public static BackupJob read(DataInput in) throws IOException {
        BackupJob job = new BackupJob();
        job.readFields(in);
        return job;
    }
    public static BackupJob read(DataInput in) throws IOException {
        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_135) {
            BackupJob job = new BackupJob();
            job.readFields(in);
            return job;
        } else {
            return GsonUtils.GSON.fromJson(Text.readString(in), BackupJob.class);
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        super.write(out);

        // table refs
        out.writeInt(tableRefs.size());
        for (TableRef tblRef : tableRefs) {
            tblRef.write(out);
        }

        // state
        Text.writeString(out, state.name());

        // times
        out.writeLong(snapshotFinishedTime);
        out.writeLong(snapshotUploadFinishedTime);

        // snapshot info
        out.writeInt(snapshotInfos.size());
        for (SnapshotInfo info : snapshotInfos.values()) {
            info.write(out);
        }

        // backup meta
        if (backupMeta == null) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            backupMeta.write(out);
        }

        // No need to persist job info. It is generated then write to file

        // metaInfoFilePath and jobInfoFilePath
        if (Strings.isNullOrEmpty(localMetaInfoFilePath)) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            Text.writeString(out, localMetaInfoFilePath);
        }

        if (Strings.isNullOrEmpty(localJobInfoFilePath)) {
            out.writeBoolean(false);
        } else {
            out.writeBoolean(true);
            Text.writeString(out, localJobInfoFilePath);
        }

        // write properties
        out.writeInt(properties.size());
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            Text.writeString(out, entry.getKey());
            Text.writeString(out, entry.getValue());
        }
    }

@Deprecated
public void readFields(DataInput in) throws IOException {
super.readFields(in);

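The deleted BackupJob.write() above is the payoff of the migration: Gson walks nested maps and objects on its own, so the hand-rolled length-prefixed encoding of snapshotInfos, properties, and the file paths collapses into the single toJson() call in AbstractJob.write(). A sketch with simplified stand-in types (not the real Doris classes):

    import com.google.gson.Gson;
    import com.google.gson.annotations.SerializedName;

    import java.util.HashMap;
    import java.util.Map;

    public class NestedStateDemo {
        static class SnapshotInfo {
            @SerializedName("p") String path;
            SnapshotInfo(String path) { this.path = path; }
        }

        static class Job {
            @SerializedName("si") Map<Long, SnapshotInfo> snapshotInfos = new HashMap<>();
            @SerializedName("prop") Map<String, String> properties = new HashMap<>();
        }

        public static void main(String[] args) {
            Job job = new Job();
            job.snapshotInfos.put(10001L, new SnapshotInfo("/snapshots/t1"));
            job.properties.put("timeout", "3600");

            // nested maps and objects serialize without any hand-written loops
            String json = new Gson().toJson(job);
            System.out.println(json);
            // {"si":{"10001":{"p":"/snapshots/t1"}},"prop":{"timeout":"3600"}}

            Job back = new Gson().fromJson(json, Job.class);
            System.out.println(back.snapshotInfos.get(10001L).path); // /snapshots/t1
        }
    }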
