Skip to content

Commit

Permalink
[improvement](meta) Switch meta serialization to gson 4 (apache#36568)
Browse files Browse the repository at this point in the history
## Proposed changes

Issue Number:

Switch meta serialization to gson
Contains the following classes:

RoutineLoadJob
UserPropertyInfo
AnalyzeDeletionLog
CreateTableInfo
HbPackage
~~PartitionPersistInfo~~
ReplicaPersistInfo
RoutineLoadOperation

---------

Co-authored-by: zhangpeicheng <zhangpeicheng@meituan.com>
Co-authored-by: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com>
  • Loading branch information
3 people committed Jun 26, 2024
1 parent d39b8e1 commit 3a8dbe4
Show file tree
Hide file tree
Showing 29 changed files with 407 additions and 293 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,11 @@ public final class FeMetaVersion {
public static final int VERSION_135 = 135;
// For mate gson
public static final int VERSION_136 = 136;
// For mate gson
public static final int VERSION_137 = 137;

// note: when increment meta version, should assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_136;
public static final int VERSION_CURRENT = VERSION_137;

// all logs meta version should >= the minimum version, so that we could remove many if clause, for example
// if (FE_METAVERSION < VERSION_94) ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import org.apache.doris.catalog.Column;

import com.google.common.base.Preconditions;
import com.google.gson.annotations.SerializedName;

public class ImportColumnDesc {
@SerializedName("cn")
private String columnName;
@SerializedName("expr")
private Expr expr;

public ImportColumnDesc(ImportColumnDesc other) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.thrift.TStorageBackendType;

import com.google.common.base.Strings;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;

import java.util.Map;
Expand Down Expand Up @@ -153,6 +154,7 @@ public enum StorageType {
STREAM("Stream load pipe"),
AZURE("MicroSoft Azure Blob");

@SerializedName("desc")
private final String description;

StorageType(String description) {
Expand Down
48 changes: 36 additions & 12 deletions fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,30 @@
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.Backend;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -91,7 +96,7 @@
* * __10023_seg2.dat.DNW231dnklawd
* * __10023.hdr.dnmwDDWI92dDko
*/
public class Repository implements Writable {
public class Repository implements Writable, GsonPostProcessable {
public static final String PREFIX_REPO = "__palo_repository_";
public static final String PREFIX_SNAPSHOT_DIR = "__ss_";
public static final String PREFIX_DB = "__db_";
Expand All @@ -110,19 +115,25 @@ public class Repository implements Writable {
private static final String PATH_DELIMITER = "/";
private static final String CHECKSUM_SEPARATOR = ".";

@SerializedName("id")
private long id;
@SerializedName("n")
private String name;
private String errMsg;
@SerializedName("ct")
private long createTime;

// If True, user can not backup data to this repo.
@SerializedName("iro")
private boolean isReadOnly;

// BOS location should start with "bos://your_bucket_name/"
// and the specified bucket should exist.
@SerializedName("lo")
private String location;

private RemoteFileSystem fileSystem;
@SerializedName("fs")
private PersistentFileSystem fileSystem;

private Repository() {
// for persist
Expand Down Expand Up @@ -184,9 +195,26 @@ public static String replaceFileNameWithChecksumFileName(String origPath, String
}

public static Repository read(DataInput in) throws IOException {
Repository repo = new Repository();
repo.readFields(in);
return repo;
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
Repository repo = new Repository();
repo.readFields(in);
return repo;
} else {
return GsonUtils.GSON.fromJson(Text.readString(in), Repository.class);
}
}

@Override
public void gsonPostProcess() {
StorageBackend.StorageType type = StorageBackend.StorageType.BROKER;
if (this.fileSystem.properties.containsKey(PersistentFileSystem.STORAGE_TYPE)) {
type = StorageBackend.StorageType.valueOf(
this.fileSystem.properties.get(PersistentFileSystem.STORAGE_TYPE));
this.fileSystem.properties.remove(PersistentFileSystem.STORAGE_TYPE);
}
this.fileSystem = FileSystemFactory.get(this.fileSystem.getName(),
type,
this.fileSystem.getProperties());
}

public long getId() {
Expand All @@ -209,7 +237,7 @@ public String getErrorMsg() {
return errMsg;
}

public RemoteFileSystem getRemoteFileSystem() {
public PersistentFileSystem getRemoteFileSystem() {
return fileSystem;
}

Expand Down Expand Up @@ -836,14 +864,10 @@ private String allocLocalFileSuffix() {

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(id);
Text.writeString(out, name);
out.writeBoolean(isReadOnly);
Text.writeString(out, location);
fileSystem.write(out);
out.writeLong(createTime);
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

@Deprecated
public void readFields(DataInput in) throws IOException {
id = in.readLong();
name = Text.readString(in);
Expand Down
36 changes: 25 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,38 @@

import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

/*
* A manager to manage all backup repositories
*/
public class RepositoryMgr extends Daemon implements Writable {
public class RepositoryMgr extends Daemon implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(RepositoryMgr.class);

// all key should be in lower case
private Map<String, Repository> repoNameMap = Maps.newConcurrentMap();
private Map<Long, Repository> repoIdMap = Maps.newConcurrentMap();
@SerializedName("rn")
private ConcurrentMap<String, Repository> repoNameMap = Maps.newConcurrentMap();
private ConcurrentMap<Long, Repository> repoIdMap = Maps.newConcurrentMap();

private ReentrantLock lock = new ReentrantLock();

Expand Down Expand Up @@ -153,19 +159,27 @@ public List<List<String>> getReposInfo() {
}

public static RepositoryMgr read(DataInput in) throws IOException {
RepositoryMgr mgr = new RepositoryMgr();
mgr.readFields(in);
return mgr;
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
RepositoryMgr mgr = new RepositoryMgr();
mgr.readFields(in);
return mgr;
} else {
return GsonUtils.GSON.fromJson(Text.readString(in), RepositoryMgr.class);
}

}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(repoNameMap.size());
for (Repository repo : repoNameMap.values()) {
repo.write(out);
}
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

@Override
public void gsonPostProcess() {
repoNameMap.forEach((n, repo) -> repoIdMap.put(repo.getId(), repo));
}

@Deprecated
public void readFields(DataInput in) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,25 @@

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.persist.gson.GsonPreProcessable;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;

/**
* Use for persistence, Repository will persist properties of file system.
*/
public abstract class PersistentFileSystem implements FileSystem, Writable {
public abstract class PersistentFileSystem implements FileSystem, GsonPreProcessable {
public static final String STORAGE_TYPE = "_DORIS_STORAGE_TYPE_";
protected Map<String, String> properties = Maps.newHashMap();
protected String name;
protected StorageBackend.StorageType type;
@SerializedName("prop")
public Map<String, String> properties = Maps.newHashMap();
@SerializedName("n")
public String name;
public StorageBackend.StorageType type;

public boolean needFullPath() {
return type == StorageBackend.StorageType.S3
Expand Down Expand Up @@ -66,7 +67,8 @@ public StorageBackend.StorageType getStorageType() {
* @param in persisted data
* @return file systerm
*/
public static RemoteFileSystem read(DataInput in) throws IOException {
@Deprecated
public static PersistentFileSystem read(DataInput in) throws IOException {
String name = Text.readString(in);
Map<String, String> properties = Maps.newHashMap();
StorageBackend.StorageType type = StorageBackend.StorageType.BROKER;
Expand All @@ -83,14 +85,8 @@ public static RemoteFileSystem read(DataInput in) throws IOException {
return FileSystemFactory.get(name, type, properties);
}

public void write(DataOutput out) throws IOException {
// must write type first
Text.writeString(out, name);
@Override
public void gsonPreProcess() {
properties.put(STORAGE_TYPE, type.name());
out.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.gson.reflect.TypeToken;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -56,8 +55,11 @@ public class LoadJobFinalOperation extends TxnCommitAttachment implements Writab
@SerializedName(value = "fm")
private FailMsg failMsg;
// only used for copy into
@SerializedName("cid")
private String copyId = "";
@SerializedName("lfp")
private String loadFilePaths = "";
@SerializedName("prop")
private Map<String, String> properties = new HashMap<>();

public LoadJobFinalOperation() {
Expand Down Expand Up @@ -125,29 +127,7 @@ public Map<String, String> getProperties() {
return properties;
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeLong(id);
loadingStatus.write(out);
out.writeInt(progress);
out.writeLong(loadStartTimestamp);
out.writeLong(finishTimestamp);
Text.writeString(out, jobState.name());
if (failMsg == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
failMsg.write(out);
}
if (Config.isCloudMode()) {
Text.writeString(out, copyId);
Text.writeString(out, loadFilePaths);
Gson gson = new Gson();
Text.writeString(out, properties == null ? "" : gson.toJson(properties));
}
}

@Deprecated
public void readFields(DataInput in) throws IOException {
super.readFields(in);
id = in.readLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class MiniLoadTxnCommitAttachment extends TxnCommitAttachment {
Expand Down Expand Up @@ -52,20 +51,7 @@ public String getErrorLogUrl() {
return errorLogUrl;
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeLong(filteredRows);
out.writeLong(loadedRows);
if (errorLogUrl == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
Text.writeString(out, errorLogUrl);
}

}

@Deprecated
public void readFields(DataInput in) throws IOException {
super.readFields(in);
filteredRows = in.readLong();
Expand Down
Loading

0 comments on commit 3a8dbe4

Please sign in to comment.