From 3a8dbe48c3e3a94e766a9d551aba2344d36bd429 Mon Sep 17 00:00:00 2001
From: Peyz <30936555+iszhangpch@users.noreply.github.com>
Date: Wed, 26 Jun 2024 19:02:34 +0800
Subject: [PATCH] [improvement](meta) Switch meta serialization to gson 4
 (#36568)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Proposed changes

Issue Number:

Switch meta serialization to gson.
Contains the following classes:
RoutineLoadJob
UserPropertyInfo
AnalyzeDeletionLog
CreateTableInfo
HbPackage
~~PartitionPersistInfo~~
ReplicaPersistInfo
RoutineLoadOperation

---------

Co-authored-by: zhangpeicheng
Co-authored-by: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com>
---
 .../apache/doris/common/FeMetaVersion.java    |   4 +-
 .../doris/analysis/ImportColumnDesc.java      |   3 +
 .../apache/doris/analysis/StorageBackend.java |   2 +
 .../org/apache/doris/backup/Repository.java   |  48 +++++--
 .../apache/doris/backup/RepositoryMgr.java    |  36 ++++--
 .../apache/doris/fs/PersistentFileSystem.java |  28 ++--
 .../load/loadv2/LoadJobFinalOperation.java    |  28 +---
 .../loadv2/MiniLoadTxnCommitAttachment.java   |  16 +--
 .../doris/load/routineload/KafkaProgress.java |  48 ++++---
 .../load/routineload/KafkaRoutineLoadJob.java |  25 +---
 .../RLTaskTxnCommitAttachment.java            |  13 +-
 .../load/routineload/RoutineLoadJob.java      | 120 ++++++++++--------
 .../load/routineload/RoutineLoadProgress.java |  11 +-
 .../routineload/RoutineLoadStatistic.java     |  10 +-
 .../mysql/privilege/UserPropertyInfo.java     |  32 +++--
 .../doris/persist/AnalyzeDeletionLog.java     |  17 ++-
 .../apache/doris/persist/CreateTableInfo.java |  17 ++-
 .../org/apache/doris/persist/HbPackage.java   |  22 +++-
 .../doris/persist/PartitionPersistInfo.java   |   1 +
 .../doris/persist/ReplicaPersistInfo.java     |  65 ++++++----
 .../doris/persist/RoutineLoadOperation.java   |  21 ++-
 .../persist/gson/GsonPreProcessable.java      |  24 ++++
 .../apache/doris/persist/gson/GsonUtils.java  |  39 +++++-
 .../org/apache/doris/qe/OriginStatement.java  |  10 +-
 .../org/apache/doris/task/LoadTaskInfo.java   |   3 +
 .../doris/transaction/TransactionState.java   |   1 +
 .../transaction/TxnCommitAttachment.java      |  48 ++++---
 .../RoutineLoadTaskSchedulerTest.java         |   3 +-
 .../transaction/GlobalTransactionMgrTest.java |   5 +-
 29 files changed, 407 insertions(+), 293 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonPreProcessable.java

diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
index 2c1f22735ffbee..7aa98625639636 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -94,9 +94,11 @@ public final class FeMetaVersion {
     public static final int VERSION_135 = 135;
     // For meta gson
     public static final int VERSION_136 = 136;
+    // For meta gson
+    public static final int VERSION_137 = 137;

     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_136;
+    public static final int VERSION_CURRENT = VERSION_137;

     // all logs meta version should >= the minimum version, so that we could remove many if clause, for example
     // if (FE_METAVERSION < VERSION_94) ...
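Every class touched below follows the same migration recipe: `write()` collapses to one gson JSON string, and `read()` gates on the image's journal version so snapshots written before VERSION_137 still replay through the legacy hand-written `readFields()`. A minimal sketch of that recipe, using a hypothetical `Foo` class (`Env`, `FeMetaVersion`, `Text`, `Writable` and `GsonUtils` are the real Doris classes used throughout this patch):

```java
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Foo implements Writable {
    @SerializedName("id")
    private long id;

    @Override
    public void write(DataOutput out) throws IOException {
        // New format: the whole object becomes one gson JSON string.
        Text.writeString(out, GsonUtils.GSON.toJson(this));
    }

    public static Foo read(DataInput in) throws IOException {
        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
            // Old images predate the gson format: replay the legacy binary layout.
            Foo foo = new Foo();
            foo.readFields(in);
            return foo;
        }
        return GsonUtils.GSON.fromJson(Text.readString(in), Foo.class);
    }

    @Deprecated
    private void readFields(DataInput in) throws IOException {
        id = in.readLong();
    }
}
```

Writers always emit the new format, so the legacy path is read-only and the old `readFields` methods are kept but marked `@Deprecated`.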
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
index 13040bdcb5cf89..e62fca2c27f7ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
@@ -21,9 +21,12 @@
 import org.apache.doris.catalog.Column;

 import com.google.common.base.Preconditions;
+import com.google.gson.annotations.SerializedName;

 public class ImportColumnDesc {
+    @SerializedName("cn")
     private String columnName;
+    @SerializedName("expr")
     private Expr expr;

     public ImportColumnDesc(ImportColumnDesc other) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 5287118a586761..b07725d2507175 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -27,6 +27,7 @@
 import org.apache.doris.thrift.TStorageBackendType;

 import com.google.common.base.Strings;
+import com.google.gson.annotations.SerializedName;
 import org.apache.commons.lang3.StringUtils;

 import java.util.Map;
@@ -153,6 +154,7 @@ public enum StorageType {
         STREAM("Stream load pipe"),
         AZURE("MicroSoft Azure Blob");

+        @SerializedName("desc")
         private final String description;

         StorageType(String description) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
index 5a7b0d044fba00..b571393efbbd81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java
@@ -24,12 +24,14 @@
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.PersistentFileSystem;
 import org.apache.doris.fs.remote.AzureFileSystem;
 import org.apache.doris.fs.remote.BrokerFileSystem;
@@ -37,12 +39,15 @@
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.fs.remote.S3FileSystem;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.system.Backend;

 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -91,7 +96,7 @@
  *         * __10023_seg2.dat.DNW231dnklawd
  *         * __10023.hdr.dnmwDDWI92dDko
  */
-public class Repository implements Writable {
+public class Repository implements Writable, GsonPostProcessable {
     public static final String PREFIX_REPO = "__palo_repository_";
     public static final String PREFIX_SNAPSHOT_DIR = "__ss_";
     public static final String PREFIX_DB = "__db_";
@@ -110,19 +115,25 @@ public class Repository implements Writable {
     private static final String PATH_DELIMITER = "/";
     private static final String CHECKSUM_SEPARATOR = ".";

+    @SerializedName("id")
     private long id;
+    @SerializedName("n")
     private String name;
     private String errMsg;
+    @SerializedName("ct")
     private long createTime;

     // If True, user can not backup data to this repo.
+    @SerializedName("iro")
     private boolean isReadOnly;

     // BOS location should start with "bos://your_bucket_name/"
     // and the specified bucket should exist.
+    @SerializedName("lo")
     private String location;

-    private RemoteFileSystem fileSystem;
+    @SerializedName("fs")
+    private PersistentFileSystem fileSystem;

     private Repository() {
         // for persist
@@ -184,9 +195,26 @@ public static String replaceFileNameWithChecksumFileName(String origPath, String
     }

     public static Repository read(DataInput in) throws IOException {
-        Repository repo = new Repository();
-        repo.readFields(in);
-        return repo;
+        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
+            Repository repo = new Repository();
+            repo.readFields(in);
+            return repo;
+        } else {
+            return GsonUtils.GSON.fromJson(Text.readString(in), Repository.class);
+        }
+    }
+
+    @Override
+    public void gsonPostProcess() {
+        StorageBackend.StorageType type = StorageBackend.StorageType.BROKER;
+        if (this.fileSystem.properties.containsKey(PersistentFileSystem.STORAGE_TYPE)) {
+            type = StorageBackend.StorageType.valueOf(
+                    this.fileSystem.properties.get(PersistentFileSystem.STORAGE_TYPE));
+            this.fileSystem.properties.remove(PersistentFileSystem.STORAGE_TYPE);
+        }
+        this.fileSystem = FileSystemFactory.get(this.fileSystem.getName(),
+                type,
+                this.fileSystem.getProperties());
     }

     public long getId() {
@@ -209,7 +237,7 @@ public String getErrorMsg() {
         return errMsg;
     }

-    public RemoteFileSystem getRemoteFileSystem() {
+    public PersistentFileSystem getRemoteFileSystem() {
         return fileSystem;
     }

@@ -836,14 +864,10 @@ private String allocLocalFileSuffix() {

     @Override
     public void write(DataOutput out) throws IOException {
-        out.writeLong(id);
-        Text.writeString(out, name);
-        out.writeBoolean(isReadOnly);
-        Text.writeString(out, location);
-        fileSystem.write(out);
-        out.writeLong(createTime);
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
     }

+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         id = in.readLong();
         name = Text.readString(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
index 0216779dcd39b3..853c1841449046 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RepositoryMgr.java
@@ -19,13 +19,18 @@
 import org.apache.doris.backup.Status.ErrCode;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.Daemon;
 import org.apache.doris.fs.remote.AzureFileSystem;
 import org.apache.doris.fs.remote.S3FileSystem;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;

 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

@@ -33,18 +38,19 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReentrantLock;

 /*
  * A manager to manage all backup repositories
  */
-public class RepositoryMgr extends Daemon implements Writable {
+public class RepositoryMgr extends Daemon implements Writable, GsonPostProcessable {
     private static final Logger LOG = LogManager.getLogger(RepositoryMgr.class);

     // all key should be in lower case
-    private Map<String, Repository> repoNameMap = Maps.newConcurrentMap();
-    private Map<Long, Repository> repoIdMap = Maps.newConcurrentMap();
+    @SerializedName("rn")
+    private ConcurrentMap<String, Repository> repoNameMap = Maps.newConcurrentMap();
+    private ConcurrentMap<Long, Repository> repoIdMap = Maps.newConcurrentMap();

     private ReentrantLock lock = new ReentrantLock();

@@ -153,19 +159,27 @@ public List<List<String>> getReposInfo() {
     }

     public static RepositoryMgr read(DataInput in) throws IOException {
-        RepositoryMgr mgr = new RepositoryMgr();
-        mgr.readFields(in);
-        return mgr;
+        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
+            RepositoryMgr mgr = new RepositoryMgr();
+            mgr.readFields(in);
+            return mgr;
+        } else {
+            return GsonUtils.GSON.fromJson(Text.readString(in), RepositoryMgr.class);
+        }
+    }

     @Override
     public void write(DataOutput out) throws IOException {
-        out.writeInt(repoNameMap.size());
-        for (Repository repo : repoNameMap.values()) {
-            repo.write(out);
-        }
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    @Override
+    public void gsonPostProcess() {
+        repoNameMap.forEach((n, repo) -> repoIdMap.put(repo.getId(), repo));
     }

+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         int size = in.readInt();
         for (int i = 0; i < size; i++) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java
index d2cc898766d980..9cb156036d9304 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java
@@ -19,24 +19,25 @@
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
-import org.apache.doris.fs.remote.RemoteFileSystem;
+import org.apache.doris.persist.gson.GsonPreProcessable;

 import com.google.common.collect.Maps;
+import com.google.gson.annotations.SerializedName;

 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;

 /**
  * Use for persistence, Repository will persist properties of file system.
  */
-public abstract class PersistentFileSystem implements FileSystem, Writable {
+public abstract class PersistentFileSystem implements FileSystem, GsonPreProcessable {
     public static final String STORAGE_TYPE = "_DORIS_STORAGE_TYPE_";

-    protected Map<String, String> properties = Maps.newHashMap();
-    protected String name;
-    protected StorageBackend.StorageType type;
+    @SerializedName("prop")
+    public Map<String, String> properties = Maps.newHashMap();
+    @SerializedName("n")
+    public String name;
+    public StorageBackend.StorageType type;

     public boolean needFullPath() {
         return type == StorageBackend.StorageType.S3
@@ -66,7 +67,8 @@ public StorageBackend.StorageType getStorageType() {
      * @param in persisted data
      * @return file system
      */
-    public static RemoteFileSystem read(DataInput in) throws IOException {
+    @Deprecated
+    public static PersistentFileSystem read(DataInput in) throws IOException {
         String name = Text.readString(in);
         Map<String, String> properties = Maps.newHashMap();
         StorageBackend.StorageType type = StorageBackend.StorageType.BROKER;
@@ -83,14 +85,8 @@ public static RemoteFileSystem read(DataInput in) throws IOException {
         return FileSystemFactory.get(name, type, properties);
     }

-    public void write(DataOutput out) throws IOException {
-        // must write type first
-        Text.writeString(out, name);
+    @Override
+    public void gsonPreProcess() {
         properties.put(STORAGE_TYPE, type.name());
-        out.writeInt(properties.size());
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
-            Text.writeString(out, entry.getKey());
-            Text.writeString(out, entry.getValue());
-        }
     }
 }
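The two gson hooks land on opposite sides of the round trip: `gsonPreProcess()` runs just before an object is written as JSON, `gsonPostProcess()` just after it is read back. PersistentFileSystem uses the pre hook to smuggle its unannotated `type` enum through the serialized `properties` map, and `Repository.gsonPostProcess()` (earlier in this patch) pops the marker back out and asks FileSystemFactory for the concrete subclass. A runnable toy sketch of that pairing with plain Gson; the `Disk` class and `HookDemo` driver are hypothetical stand-ins, and the hooks are invoked by hand here rather than by Doris's adapter factories:

```java
import com.google.gson.Gson;

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the PersistentFileSystem (pre) / Repository (post) pairing.
class Disk {
    static final String TYPE_KEY = "_TYPE_";
    Map<String, String> properties = new HashMap<>();
    transient String type;              // transient: gson never serializes it directly

    void gsonPreProcess() {
        // Stash the unserialized field into the serialized map before toJson.
        properties.put(TYPE_KEY, type);
    }

    void gsonPostProcess() {
        // Pop it back out and restore the field after fromJson.
        type = properties.remove(TYPE_KEY);
    }
}

public class HookDemo {
    public static void main(String[] args) {
        Gson gson = new Gson();
        Disk d = new Disk();
        d.type = "S3";
        d.gsonPreProcess();               // in Doris: PreProcessTypeAdapterFactory calls this
        String json = gson.toJson(d);     // {"properties":{"_TYPE_":"S3"}}
        Disk back = gson.fromJson(json, Disk.class);
        back.gsonPostProcess();           // in Doris: PostProcessTypeAdapterFactory calls this
        System.out.println(back.type);    // S3
    }
}
```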
"" : gson.toJson(properties)); - } - } - + @Deprecated public void readFields(DataInput in) throws IOException { super.readFields(in); id = in.readLong(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java index 3bb48f83c6204e..1f7e7f7b7a7462 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java @@ -24,7 +24,6 @@ import com.google.gson.annotations.SerializedName; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; public class MiniLoadTxnCommitAttachment extends TxnCommitAttachment { @@ -52,20 +51,7 @@ public String getErrorLogUrl() { return errorLogUrl; } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - out.writeLong(filteredRows); - out.writeLong(loadedRows); - if (errorLogUrl == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - Text.writeString(out, errorLogUrl); - } - - } - + @Deprecated public void readFields(DataInput in) throws IOException { super.readFields(in); filteredRows = in.readLong(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java index 864d5ee0ef7c98..cb6c36dc2e69bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaProgress.java @@ -31,11 +31,11 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * this is description of kafka routine load progress @@ -56,7 +56,7 @@ public class KafkaProgress extends RoutineLoadProgress { // (partition id, begin offset) // the offset saved here is the next offset need to be consumed @SerializedName(value = "pito") - private Map partitionIdToOffset = Maps.newConcurrentMap(); + private ConcurrentMap partitionIdToOffset = Maps.newConcurrentMap(); public KafkaProgress() { super(LoadDataSourceType.KAFKA); @@ -64,17 +64,24 @@ public KafkaProgress() { public KafkaProgress(TKafkaRLTaskProgress tKafkaRLTaskProgress) { super(LoadDataSourceType.KAFKA); - this.partitionIdToOffset = tKafkaRLTaskProgress.getPartitionCmtOffset(); + this.partitionIdToOffset = new ConcurrentHashMap<>(); + partitionIdToOffset.putAll(tKafkaRLTaskProgress.getPartitionCmtOffset()); } public KafkaProgress(Map partitionIdToOffset) { + super(LoadDataSourceType.KAFKA); + this.partitionIdToOffset = new ConcurrentHashMap<>(); + this.partitionIdToOffset.putAll(partitionIdToOffset); + } + + public KafkaProgress(ConcurrentMap partitionIdToOffset) { super(LoadDataSourceType.KAFKA); this.partitionIdToOffset = partitionIdToOffset; } - public Map getPartitionIdToOffset(List partitionIds) { - Map result = Maps.newHashMap(); - for (Map.Entry entry : partitionIdToOffset.entrySet()) { + public ConcurrentMap getPartitionIdToOffset(List partitionIds) { + ConcurrentMap result = Maps.newConcurrentMap(); + for (ConcurrentMap.Entry entry : partitionIdToOffset.entrySet()) { for (Integer partitionId : partitionIds) { if (entry.getKey().equals(partitionId)) { 
                     result.put(partitionId, entry.getValue());
@@ -92,7 +99,7 @@ public Long getOffsetByPartition(int kafkaPartition) {
         return partitionIdToOffset.get(kafkaPartition);
     }

-    public Map<Integer, Long> getOffsetByPartition() {
+    public ConcurrentMap<Integer, Long> getOffsetByPartition() {
         return partitionIdToOffset;
     }

@@ -109,8 +116,8 @@ public boolean hasPartition() {
     // OFFSET_END: user set offset = OFFSET_END, no committed msg
     // OFFSET_BEGINNING: user set offset = OFFSET_BEGINNING, no committed msg
     // other: current committed msg's offset
-    private void getReadableProgress(Map<Integer, String> showPartitionIdToOffset) {
-        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+    private void getReadableProgress(ConcurrentMap<Integer, String> showPartitionIdToOffset) {
+        for (ConcurrentMap.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
             if (entry.getValue() == 0) {
                 showPartitionIdToOffset.put(entry.getKey(), OFFSET_ZERO);
             } else if (entry.getValue() == -1) {
@@ -141,7 +148,7 @@ public void modifyOffset(List<Pair<Integer, Long>> kafkaPartitionOffsets) throws

     public List<Pair<Integer, String>> getPartitionOffsetPairs(boolean alreadyConsumed) {
         List<Pair<Integer, String>> pairs = Lists.newArrayList();
-        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+        for (ConcurrentMap.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
             if (entry.getValue() == 0) {
                 pairs.add(Pair.of(entry.getKey(), OFFSET_ZERO));
             } else if (entry.getValue() == -1) {
@@ -169,7 +176,7 @@ public List<Pair<Integer, String>> getPartitionOffsetPairs(boolean alreadyConsum
     // so the lag should be (4-2=)2.
     public Map<Integer, Long> getLag(Map<Integer, Long> partitionIdWithLatestOffsets) {
         Map<Integer, Long> lagMap = Maps.newHashMap();
-        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
+        for (ConcurrentMap.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
             if (partitionIdWithLatestOffsets.containsKey(entry.getKey())) {
                 long lag = partitionIdWithLatestOffsets.get(entry.getKey()) - entry.getValue();
                 lagMap.put(entry.getKey(), lag);
@@ -182,7 +189,7 @@ public Map<Integer, Long> getLag(Map<Integer, Long> partitionIdWithLatestOffsets

     @Override
     public String toString() {
-        Map<Integer, String> showPartitionIdToOffset = Maps.newHashMap();
+        ConcurrentMap<Integer, String> showPartitionIdToOffset = Maps.newConcurrentMap();
         getReadableProgress(showPartitionIdToOffset);
         return "KafkaProgress [partitionIdToOffset="
                 + Joiner.on("|").withKeyValueSeparator("_").join(showPartitionIdToOffset) + "]";
@@ -190,7 +197,7 @@ public String toString() {

     @Override
     public String toJsonString() {
-        Map<Integer, String> showPartitionIdToOffset = Maps.newHashMap();
+        ConcurrentMap<Integer, String> showPartitionIdToOffset = Maps.newConcurrentMap();
         getReadableProgress(showPartitionIdToOffset);
         Gson gson = new Gson();
         return gson.toJson(showPartitionIdToOffset);
@@ -208,20 +215,11 @@ public void update(RLTaskTxnCommitAttachment attachment) {
         }
     }

-    @Override
-    public void write(DataOutput out) throws IOException {
-        super.write(out);
-        out.writeInt(partitionIdToOffset.size());
-        for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
-            out.writeInt((Integer) entry.getKey());
-            out.writeLong((Long) entry.getValue());
-        }
-    }
-
+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         super.readFields(in);
         int size = in.readInt();
-        partitionIdToOffset = new HashMap<>();
+        partitionIdToOffset = new ConcurrentHashMap<>();
         for (int i = 0; i < size; i++) {
             partitionIdToOffset.put(in.readInt(), in.readLong());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index a1ceebd605c1cb..751516b559a8c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -54,6 +54,7 @@
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.BooleanUtils;
@@ -61,7 +62,6 @@
 import org.apache.logging.log4j.Logger;

 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -83,9 +83,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     public static final String KAFKA_FILE_CATALOG = "kafka";
     public static final String PROP_GROUP_ID = "group.id";

+    @SerializedName("bl")
     private String brokerList;
+    @SerializedName("tp")
     private String topic;

     // optional, user want to load partitions.
+    @SerializedName("cskp")
     private List<Integer> customKafkaPartitions = Lists.newArrayList();
     // current kafka partitions is the actual partition which will be fetched
     private List<Integer> currentKafkaPartitions = Lists.newArrayList();
@@ -95,6 +98,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     // We should check it by calling isOffsetForTimes() method before use it.
     private String kafkaDefaultOffSet = "";
     // kafka properties, property prefix will be mapped to kafka custom parameters, which can be extended in the future
+    @SerializedName("prop")
     private Map<String, String> customProperties = Maps.newHashMap();
     private Map<String, String> convertedCustomProperties = Maps.newHashMap();

@@ -638,24 +642,7 @@ protected Map<String, String> getCustomProperties() {
         return ret;
     }

-    @Override
-    public void write(DataOutput out) throws IOException {
-        super.write(out);
-        Text.writeString(out, brokerList);
-        Text.writeString(out, topic);
-
-        out.writeInt(customKafkaPartitions.size());
-        for (Integer partitionId : customKafkaPartitions) {
-            out.writeInt(partitionId);
-        }
-
-        out.writeInt(customProperties.size());
-        for (Map.Entry<String, String> property : customProperties.entrySet()) {
-            Text.writeString(out, "property." + property.getKey());
-            Text.writeString(out, property.getValue());
-        }
-    }
-
+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         super.readFields(in);
         brokerList = Text.readString(in);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
index 59ef6c7f1bfd86..af390ce094ad73 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RLTaskTxnCommitAttachment.java
@@ -26,7 +26,6 @@
 import com.google.gson.annotations.SerializedName;

 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;

 // {"progress": "", "backendId": "", "taskSignature": "", "numOfErrorData": "",
@@ -145,17 +144,7 @@ public String toString() {
                 + ", progress=" + progress.toString() + "]";
     }

-    @Override
-    public void write(DataOutput out) throws IOException {
-        super.write(out);
-        out.writeLong(filteredRows);
-        out.writeLong(loadedRows);
-        out.writeLong(unselectedRows);
-        out.writeLong(receivedBytes);
-        out.writeLong(taskExecutionTimeMs);
-        progress.write(out);
-    }
-
+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         super.readFields(in);
         filteredRows = in.readLong();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 348ac00f9da437..d870922e2d7e36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -53,12 +53,13 @@
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
 import org.apache.doris.persist.RoutineLoadOperation;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.planner.StreamLoadPlanner;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.SqlModeHelper;
-import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileType;
@@ -78,6 +79,7 @@
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.logging.log4j.LogManager;
@@ -102,7 +104,9 @@
  * The desireTaskConcurrentNum means that user expect the number of concurrent stream load
  * The routine load job support different streaming medium such as KAFKA
  */
-public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback implements Writable, LoadTaskInfo {
+public abstract class RoutineLoadJob
+        extends AbstractTxnStateChangeCallback
+        implements Writable, LoadTaskInfo, GsonPostProcessable {
     private static final Logger LOG = LogManager.getLogger(RoutineLoadJob.class);

     public static final long DEFAULT_MAX_ERROR_NUM = 0;
@@ -159,9 +163,13 @@ public boolean isFinalState() {
         }
     }

+    @SerializedName("id")
     protected long id;
+    @SerializedName("n")
     protected String name;
+    @SerializedName("dbid")
     protected long dbId;
+    @SerializedName("tbid")
     protected long tableId;
     // this code is used to verify be task request
     protected long authCode;
@@ -172,23 +180,29 @@ public boolean isFinalState() {
     protected Expr whereExpr; // optional
     protected Separator columnSeparator; // optional
     protected Separator lineDelimiter;
+    @SerializedName("dtcn")
     protected int desireTaskConcurrentNum; // optional
+    @SerializedName("st")
     protected JobState state = JobState.NEED_SCHEDULE;
     @Getter
+    @SerializedName("dsrc")
     protected LoadDataSourceType dataSourceType;
     // max number of error data in max batch rows * 10
     // maxErrorNum / (maxBatchRows * 10) = max error rate of routine load job
     // if current error rate is more than max error rate, the job will be paused
+    @SerializedName("men")
     protected long maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional
     protected double maxFilterRatio = DEFAULT_MAX_FILTER_RATIO;
     protected long execMemLimit = DEFAULT_EXEC_MEM_LIMIT;
     protected int sendBatchParallelism = DEFAULT_SEND_BATCH_PARALLELISM;
     protected boolean loadToSingleTablet = DEFAULT_LOAD_TO_SINGLE_TABLET;
     // include strict mode
+    @SerializedName("jp")
     protected Map<String, String> jobProperties = Maps.newHashMap();

     // sessionVariable's name -> sessionVariable's value
     // we persist these sessionVariables due to the session is not available when replaying the job.
+    @SerializedName("sv")
     protected Map<String, String> sessionVariables = Maps.newHashMap();

     /*
@@ -197,8 +211,11 @@ public boolean isFinalState() {
      * If a task can consume data from source at rate of 10MB/s, and 500B a row,
      * then we can process 100MB for 10 secs, which is 200000 rows
      */
+    @SerializedName("mbis")
     protected long maxBatchIntervalS = DEFAULT_MAX_INTERVAL_SECOND;
+    @SerializedName("mbr")
     protected long maxBatchRows = DEFAULT_MAX_BATCH_ROWS;
+    @SerializedName("mbsb")
     protected long maxBatchSizeBytes = DEFAULT_MAX_BATCH_SIZE;

     protected boolean isPartialUpdate = false;
@@ -222,6 +239,7 @@ public boolean isFinalState() {

     protected int currentTaskConcurrentNum;

+    @SerializedName("pg")
     protected RoutineLoadProgress progress;

     protected long latestResumeTimestamp; // the latest resume time
@@ -231,10 +249,14 @@ public boolean isFinalState() {
     protected ErrorReason pauseReason;
     protected ErrorReason cancelReason;

+    @SerializedName("cts")
     protected long createTimestamp = System.currentTimeMillis();
+    @SerializedName("pts")
     protected long pauseTimestamp = -1;
+    @SerializedName("ets")
     protected long endTimestamp = -1;

+    @SerializedName("js")
     protected RoutineLoadStatistic jobStatistic = new RoutineLoadStatistic();

     // The tasks belong to this job
@@ -245,10 +267,13 @@ public boolean isFinalState() {

     // this is the origin stmt of CreateRoutineLoadStmt, we use it to persist the RoutineLoadJob,
     // because we can not serialize the Expressions contained in job.
+    @SerializedName("ostmt")
     protected OriginStatement origStmt;
     // User who submit this job. Maybe null for the old version job(before v1.1)
+    @SerializedName("ui")
     protected UserIdentity userIdentity;

+    @SerializedName("cm")
     protected String comment = "";

     protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -261,6 +286,7 @@ public boolean isFinalState() {

     protected boolean isTypeRead = false;

+    @SerializedName("ccid")
     private String cloudClusterId;

     protected byte enclose = 0;
@@ -1799,68 +1825,54 @@ public boolean isFinal() {
     }

     public static RoutineLoadJob read(DataInput in) throws IOException {
-        RoutineLoadJob job = null;
-        LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in));
-        if (type == LoadDataSourceType.KAFKA) {
-            job = new KafkaRoutineLoadJob();
+        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
+            RoutineLoadJob job = null;
+            LoadDataSourceType type = LoadDataSourceType.valueOf(Text.readString(in));
+            if (type == LoadDataSourceType.KAFKA) {
+                job = new KafkaRoutineLoadJob();
+            } else {
+                throw new IOException("Unknown load data source type: " + type.name());
+            }
+
+            job.setTypeRead(true);
+            job.readFields(in);
+            return job;
         } else {
-            throw new IOException("Unknown load data source type: " + type.name());
+            return GsonUtils.GSON.fromJson(Text.readString(in), RoutineLoadJob.class);
         }
-
-        job.setTypeRead(true);
-        job.readFields(in);
-        return job;
     }

     @Override
     public void write(DataOutput out) throws IOException {
-        // ATTN: must write type first
-        Text.writeString(out, dataSourceType.name());
-
-        out.writeLong(id);
-        Text.writeString(out, name);
-        Text.writeString(out, SystemInfoService.DEFAULT_CLUSTER);
-        out.writeLong(dbId);
-        out.writeLong(tableId);
-        out.writeInt(desireTaskConcurrentNum);
-        Text.writeString(out, state.name());
-        out.writeLong(maxErrorNum);
-        out.writeLong(maxBatchIntervalS);
-        out.writeLong(maxBatchRows);
-        out.writeLong(maxBatchSizeBytes);
-        progress.write(out);
-
-        out.writeLong(createTimestamp);
-        out.writeLong(pauseTimestamp);
-        out.writeLong(endTimestamp);
-
-        this.jobStatistic.write(out);
-
-        origStmt.write(out);
-        out.writeInt(jobProperties.size());
-        for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
-            Text.writeString(out, entry.getKey());
-            Text.writeString(out, entry.getValue());
-        }
-
-        out.writeInt(sessionVariables.size());
-        for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
-            Text.writeString(out, entry.getKey());
-            Text.writeString(out, entry.getValue());
-        }
-
-        if (userIdentity == null) {
-            out.writeBoolean(false);
-        } else {
-            out.writeBoolean(true);
-            userIdentity.write(out);
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
+    }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        if (tableId == 0) {
+            isMultiTable = true;
         }
-        if (Config.isCloudMode()) {
-            Text.writeString(out, cloudClusterId);
+        jobProperties.forEach((k, v) -> {
+            if (k.equals(CreateRoutineLoadStmt.PARTIAL_COLUMNS)) {
+                isPartialUpdate = Boolean.parseBoolean(v);
+            }
+        });
+        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt),
+                Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
+        CreateRoutineLoadStmt stmt = null;
+        try {
+            stmt = (CreateRoutineLoadStmt) SqlParserUtils.getStmt(parser, origStmt.idx);
+            stmt.checkLoadProperties();
+            setRoutineLoadDesc(stmt.getRoutineLoadDesc());
+        } catch (Exception e) {
+            throw new IOException("error happens when parsing create routine load stmt: " + origStmt.originStmt, e);
+        }
+        if (userIdentity != null) {
+            userIdentity.setIsAnalyzed();
         }
-        Text.writeString(out, comment);
     }
+    @Deprecated
     protected void readFields(DataInput in) throws IOException {
         if (!isTypeRead) {
             dataSourceType = LoadDataSourceType.valueOf(Text.readString(in));
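Note what RoutineLoadJob's `gsonPostProcess()` above is for: analyzed objects such as `whereExpr` and the separators carry parse state that gson cannot round-trip, so only the original SQL (`origStmt`) is annotated for serialization and everything else is re-derived by re-parsing it after deserialization. A toy sketch of that "persist the SQL, not the parse tree" idea; `TinyJob` and its string-chopping hook are hypothetical, where the real code re-runs SqlParser:

```java
import org.apache.doris.persist.gson.GsonPostProcessable;

import com.google.gson.annotations.SerializedName;

import java.io.IOException;

// Hypothetical miniature of RoutineLoadJob's approach.
class TinyJob implements GsonPostProcessable {
    @SerializedName("stmt")
    String originStmt;            // persisted: the raw CREATE ROUTINE LOAD text

    transient String whereClause; // not persisted: re-derived on read

    @Override
    public void gsonPostProcess() throws IOException {
        // Doris re-parses origStmt with SqlParser here; this stand-in just
        // extracts the WHERE clause textually to show the shape of the hook.
        int pos = originStmt.toUpperCase().indexOf("WHERE");
        whereClause = pos < 0 ? null : originStmt.substring(pos);
    }
}
```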
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
index fad465c9277496..bb3b3e88daa77b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadProgress.java
@@ -18,15 +18,13 @@
 package org.apache.doris.load.routineload;

 import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;

 import com.google.gson.annotations.SerializedName;

 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;

-public abstract class RoutineLoadProgress implements Writable {
+public abstract class RoutineLoadProgress {
     @SerializedName(value = "ldst")
     protected LoadDataSourceType loadDataSourceType;
     protected boolean isTypeRead = false;
@@ -57,12 +55,7 @@ public static RoutineLoadProgress read(DataInput in) throws IOException {
         return progress;
     }

-    @Override
-    public void write(DataOutput out) throws IOException {
-        // ATTN: must write type first
-        Text.writeString(out, loadDataSourceType.name());
-    }
-
+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         if (!isTypeRead) {
             loadDataSourceType = LoadDataSourceType.valueOf(Text.readString(in));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
index 45c9cb1686cba7..ad10367f9823d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadStatistic.java
@@ -18,7 +18,6 @@
 package org.apache.doris.load.routineload;

 import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;

 import com.google.common.collect.Maps;
@@ -26,12 +25,11 @@
 import com.google.gson.annotations.SerializedName;

 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Set;

-public class RoutineLoadStatistic implements Writable {
+public class RoutineLoadStatistic {
     /*
      * The following variables are for statistics
      * currentErrorRows/currentTotalRows: the row statistics of current sampling period
@@ -68,12 +66,6 @@ public class RoutineLoadStatistic {
     // No need to persist, only for tracing txn of routine load job.
     public Set<Long> runningTxnIds = Sets.newHashSet();

-    @Override
-    public void write(DataOutput out) throws IOException {
-        String json = GsonUtils.GSON.toJson(this);
-        Text.writeString(out, json);
-    }
-
     public static RoutineLoadStatistic read(DataInput in) throws IOException {
         String json = Text.readString(in);
         return GsonUtils.GSON.fromJson(json, RoutineLoadStatistic.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyInfo.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyInfo.java
index e7ee45174833b4..e1616ea133752e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyInfo.java
@@ -17,21 +17,28 @@

 package org.apache.doris.mysql.privilege;

+import org.apache.doris.catalog.Env;
 import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;

 import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;

 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.List;

-public class UserPropertyInfo implements Writable {
+public class UserPropertyInfo implements Writable, GsonPostProcessable {

+    @SerializedName("u")
     private String user;
+    @SerializedName("prop")
     private List<Pair<String, String>> properties = Lists.newArrayList();

     private UserPropertyInfo() {
@@ -52,21 +59,26 @@ public List<Pair<String, String>> getProperties() {
     }

     public static UserPropertyInfo read(DataInput in) throws IOException {
-        UserPropertyInfo info = new UserPropertyInfo();
-        info.readFields(in);
-        return info;
+        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
+            UserPropertyInfo info = new UserPropertyInfo();
+            info.readFields(in);
+            return info;
+        } else {
+            return GsonUtils.GSON.fromJson(Text.readString(in), UserPropertyInfo.class);
+        }
+    }
+
+    @Override
+    public void gsonPostProcess() throws IOException {
+        user = ClusterNamespace.getNameFromFullName(user);
     }

     @Override
     public void write(DataOutput out) throws IOException {
-        Text.writeString(out, user);
-        out.writeInt(properties.size());
-        for (Pair<String, String> entry : properties) {
-            Text.writeString(out, entry.first);
-            Text.writeString(out, entry.second);
-        }
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
     }

+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         user = ClusterNamespace.getNameFromFullName(Text.readString(in));
         int size = in.readInt();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/AnalyzeDeletionLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/AnalyzeDeletionLog.java
index 7535aeb29b4068..86dcc506cc846e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/AnalyzeDeletionLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/AnalyzeDeletionLog.java
@@ -17,7 +17,13 @@

 package org.apache.doris.persist;

+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;

 import java.io.DataInput;
 import java.io.DataOutput;
@@ -25,6 +31,7 @@

 public class AnalyzeDeletionLog implements Writable {

+    @SerializedName("id")
@SerializedName("id") public final long id; public AnalyzeDeletionLog(long id) { @@ -33,10 +40,14 @@ public AnalyzeDeletionLog(long id) { @Override public void write(DataOutput out) throws IOException { - out.writeLong(id); + Text.writeString(out, GsonUtils.GSON.toJson(this)); } - public static AnalyzeDeletionLog read(DataInput dataInput) throws IOException { - return new AnalyzeDeletionLog(dataInput.readLong()); + public static AnalyzeDeletionLog read(DataInput in) throws IOException { + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) { + return new AnalyzeDeletionLog(in.readLong()); + } else { + return GsonUtils.GSON.fromJson(Text.readString(in), AnalyzeDeletionLog.class); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/CreateTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/CreateTableInfo.java index 7ed43e2371c0cb..1f2bcc15eb4252 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/CreateTableInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/CreateTableInfo.java @@ -17,8 +17,10 @@ package org.apache.doris.persist; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; import org.apache.doris.cluster.ClusterNamespace; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonPostProcessable; @@ -58,17 +60,22 @@ public Table getTable() { return table; } + @Override public void write(DataOutput out) throws IOException { - Text.writeString(out, dbName); - table.write(out); + Text.writeString(out, GsonUtils.GSON.toJson(this)); } public static CreateTableInfo read(DataInput in) throws IOException { - CreateTableInfo createTableInfo = new CreateTableInfo(); - createTableInfo.readFields(in); - return createTableInfo; + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) { + CreateTableInfo createTableInfo = new CreateTableInfo(); + createTableInfo.readFields(in); + return createTableInfo; + } else { + return GsonUtils.GSON.fromJson(Text.readString(in), CreateTableInfo.class); + } } + @Deprecated private void readFields(DataInput in) throws IOException { dbName = ClusterNamespace.getNameFromFullName(Text.readString(in)); table = Table.read(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/HbPackage.java b/fe/fe-core/src/main/java/org/apache/doris/persist/HbPackage.java index 17eb7bdc576adf..5b0c1a8e5c3abd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/HbPackage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/HbPackage.java @@ -17,10 +17,15 @@ package org.apache.doris.persist; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.system.HeartbeatResponse; import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; @@ -29,6 +34,7 @@ public class HbPackage implements Writable { + @SerializedName("hbr") private List hbResults = Lists.newArrayList(); public HbPackage() { @@ -44,19 +50,21 @@ public List getHbResults() { } public static HbPackage read(DataInput in) throws IOException { - HbPackage hbPackage = new HbPackage(); - hbPackage.readFields(in); - return hbPackage; + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) { + HbPackage 
+            hbPackage.readFields(in);
+            return hbPackage;
+        } else {
+            return GsonUtils.GSON.fromJson(Text.readString(in), HbPackage.class);
+        }
     }

     @Override
     public void write(DataOutput out) throws IOException {
-        out.writeInt(hbResults.size());
-        for (HeartbeatResponse heartbeatResult : hbResults) {
-            heartbeatResult.write(out);
-        }
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
     }

+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         int size = in.readInt();
         for (int i = 0; i < size; i++) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/PartitionPersistInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/PartitionPersistInfo.java
index 09d1b61c93667d..9617a4b8323f88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/PartitionPersistInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/PartitionPersistInfo.java
@@ -134,6 +134,7 @@ public static PartitionPersistInfo read(DataInput in) throws IOException {
         }
     }

+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         dbId = in.readLong();
         tableId = in.readLong();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java
index acf9ad5d736737..1b45bbb9bfc093 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/ReplicaPersistInfo.java
@@ -17,13 +17,20 @@

 package org.apache.doris.persist;

+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;

 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;

-public class ReplicaPersistInfo implements Writable {
+public class ReplicaPersistInfo implements Writable, GsonPostProcessable {

     public enum ReplicaOperationType {
         ADD(0),
@@ -42,6 +49,7 @@ public enum ReplicaOperationType {
         DEFAULT_OP(8),
         TABLET_INFO(9);

+        @SerializedName("v")
         private final int value;

         ReplicaOperationType(int value) {
@@ -81,29 +89,46 @@ public static ReplicaOperationType findByValue(int value) {
     }

     // required
+    @SerializedName("op")
     private ReplicaOperationType opType;
+    @SerializedName("dbid")
     private long dbId;
+    @SerializedName("tbid")
     private long tableId;
+    @SerializedName("pid")
     private long partitionId;
+    @SerializedName("ind")
     private long indexId;
+    @SerializedName("tblid")
     private long tabletId;

+    @SerializedName("repid")
     private long replicaId;
+    @SerializedName("beid")
     private long backendId;
+    @SerializedName("ver")
     private long version;
     @Deprecated
+    @SerializedName("verh")
     private long versionHash = 0L;
+    @SerializedName("sh")
     private int schemaHash = -1;
+    @SerializedName("ds")
     private long dataSize;
     private long remoteDataSize;
+    @SerializedName("rc")
     private long rowCount;

+    @SerializedName("lfv")
     private long lastFailedVersion = -1L;
     @Deprecated
+    @SerializedName("lfvh")
     private long lastFailedVersionHash = 0L;
+    @SerializedName("lsv")
     private long lastSuccessVersion = -1L;
     @Deprecated
+    @SerializedName("lsvh")
     private long lastSuccessVersionHash = 0L;

     public static ReplicaPersistInfo createForAdd(long dbId, long tableId, long partitionId, long indexId,
@@ -283,34 +308,28 @@ public long getLastSuccessVersion() {
     }
     public static ReplicaPersistInfo read(DataInput in) throws IOException {
-        ReplicaPersistInfo replicaInfo = new ReplicaPersistInfo();
-        replicaInfo.readFields(in);
-        return replicaInfo;
+        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
+            ReplicaPersistInfo replicaInfo = new ReplicaPersistInfo();
+            replicaInfo.readFields(in);
+            return replicaInfo;
+        } else {
+            return GsonUtils.GSON.fromJson(Text.readString(in), ReplicaPersistInfo.class);
+        }
     }

     @Override
     public void write(DataOutput out) throws IOException {
-        out.writeLong(dbId);
-        out.writeLong(tableId);
-        out.writeLong(partitionId);
-        out.writeLong(indexId);
-        out.writeLong(tabletId);
-        out.writeLong(backendId);
-        out.writeLong(replicaId);
-        out.writeLong(version);
-        out.writeLong(versionHash);
-        out.writeLong(dataSize);
-        out.writeLong(rowCount);
-
-        out.writeInt(opType.value);
-        out.writeLong(lastFailedVersion);
-        out.writeLong(lastFailedVersionHash);
-        out.writeLong(lastSuccessVersion);
-        out.writeLong(lastSuccessVersionHash);
-
-        out.writeInt(schemaHash);
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
     }

+    @Override
+    public void gsonPostProcess() throws IOException {
+        if (opType == null) {
+            throw new IOException("could not parse operation type from replica info");
+        }
+    }
+
+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         dbId = in.readLong();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java b/fe/fe-core/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java
index b8daa485da02fb..aa39788f4f02ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/RoutineLoadOperation.java
@@ -17,16 +17,23 @@

 package org.apache.doris.persist;

+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;

 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;

 public class RoutineLoadOperation implements Writable {
+    @SerializedName("id")
     private long id;
+    @SerializedName("js")
     private JobState jobState;

     private RoutineLoadOperation() {
@@ -46,17 +53,21 @@ public JobState getJobState() {
     }

     public static RoutineLoadOperation read(DataInput in) throws IOException {
-        RoutineLoadOperation operation = new RoutineLoadOperation();
-        operation.readFields(in);
-        return operation;
+        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
+            RoutineLoadOperation operation = new RoutineLoadOperation();
+            operation.readFields(in);
+            return operation;
+        } else {
+            return GsonUtils.GSON.fromJson(Text.readString(in), RoutineLoadOperation.class);
+        }
     }

     @Override
     public void write(DataOutput out) throws IOException {
-        out.writeLong(id);
-        Text.writeString(out, jobState.name());
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
     }

+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         id = in.readLong();
         jobState = JobState.valueOf(Text.readString(in));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonPreProcessable.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonPreProcessable.java
new file mode 100644
index 00000000000000..19a8450e62e307
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonPreProcessable.java
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist.gson;
+
+import java.io.IOException;
+
+public interface GsonPreProcessable {
+    public void gsonPreProcess() throws IOException;
+}
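The GsonUtils changes below also register RuntimeTypeAdapterFactory instances so abstract types like RoutineLoadJob and PersistentFileSystem serialize with a `clazz` discriminator naming the concrete subtype, e.g. `RuntimeTypeAdapterFactory.of(RoutineLoadJob.class, "clazz").registerSubtype(KafkaRoutineLoadJob.class, ...)`. A minimal self-contained sketch of what that registration buys, assuming the gson-extras copy of `RuntimeTypeAdapterFactory` on the classpath (Doris vendors its own); the `Animal`/`Dog` classes are hypothetical:

```java
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
// Ships in gson-extras; Doris keeps a vendored copy with the same API.
import com.google.gson.typeadapters.RuntimeTypeAdapterFactory;

abstract class Animal { }

class Dog extends Animal {
    String bark = "woof";
}

public class PolymorphismDemo {
    public static void main(String[] args) {
        // Tag serialized objects with a "clazz" field naming the concrete
        // subtype, mirroring the factories registered in GsonUtils below.
        RuntimeTypeAdapterFactory<Animal> factory = RuntimeTypeAdapterFactory
                .of(Animal.class, "clazz")
                .registerSubtype(Dog.class, Dog.class.getSimpleName());
        Gson gson = new GsonBuilder().registerTypeAdapterFactory(factory).create();

        // Serialize through the base type so the factory is consulted.
        String json = gson.toJson(new Dog(), Animal.class);
        System.out.println(json); // {"clazz":"Dog","bark":"woof"}

        Animal back = gson.fromJson(json, Animal.class);
        System.out.println(back.getClass().getSimpleName()); // Dog
    }
}
```

This is why the new format needs no hand-written "write type first" prologue: the discriminator travels inside the JSON itself.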
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 90a1c507011cdd..7e49fd4e96e104 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -168,9 +168,9 @@
 import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalCatalog;
 import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalDatabase;
 import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
+import org.apache.doris.fs.PersistentFileSystem;
 import org.apache.doris.fs.remote.BrokerFileSystem;
 import org.apache.doris.fs.remote.ObjFileSystem;
-import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.fs.remote.S3FileSystem;
 import org.apache.doris.fs.remote.dfs.DFSFileSystem;
 import org.apache.doris.fs.remote.dfs.JFSFileSystem;
@@ -189,7 +189,9 @@
 import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
 import org.apache.doris.load.routineload.AbstractDataSourceProperties;
 import org.apache.doris.load.routineload.KafkaProgress;
+import org.apache.doris.load.routineload.KafkaRoutineLoadJob;
 import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
+import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.load.routineload.RoutineLoadProgress;
 import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
 import org.apache.doris.load.sync.SyncJob;
@@ -540,8 +542,13 @@ public class GsonUtils {
             .registerDefaultSubtype(RoutineLoadProgress.class)
             .registerSubtype(KafkaProgress.class, KafkaProgress.class.getSimpleName());

-    private static RuntimeTypeAdapterFactory<RemoteFileSystem> remoteFileSystemTypeAdapterFactory
-            = RuntimeTypeAdapterFactory.of(RemoteFileSystem.class, "clazz")
+    private static RuntimeTypeAdapterFactory<RoutineLoadJob> routineLoadJobTypeAdapterFactory
+            = RuntimeTypeAdapterFactory.of(RoutineLoadJob.class, "clazz")
+            .registerDefaultSubtype(RoutineLoadJob.class)
+            .registerSubtype(KafkaRoutineLoadJob.class, KafkaRoutineLoadJob.class.getSimpleName());
+
+    private static RuntimeTypeAdapterFactory<PersistentFileSystem> remoteFileSystemTypeAdapterFactory
+            = RuntimeTypeAdapterFactory.of(PersistentFileSystem.class, "clazz")
             .registerSubtype(BrokerFileSystem.class, BrokerFileSystem.class.getSimpleName())
             .registerSubtype(DFSFileSystem.class, DFSFileSystem.class.getSimpleName())
             .registerSubtype(JFSFileSystem.class, JFSFileSystem.class.getSimpleName())
@@ -581,6 +588,7 @@ public class GsonUtils {
             // .registerTypeHierarchyAdapter(Expr.class, new ExprAdapter())
             .registerTypeHierarchyAdapter(Multimap.class, new GuavaMultimapAdapter())
             .registerTypeAdapterFactory(new PostProcessTypeAdapterFactory())
+            .registerTypeAdapterFactory(new PreProcessTypeAdapterFactory())
             .registerTypeAdapterFactory(new ExprAdapterFactory())
             .registerTypeAdapterFactory(exprAdapterFactory)
             .registerTypeAdapterFactory(columnTypeAdapterFactory)
@@ -603,6 +611,7 @@ public class GsonUtils {
             .registerTypeAdapterFactory(constraintTypeAdapterFactory)
             .registerTypeAdapterFactory(txnCommitAttachmentTypeAdapterFactory)
             .registerTypeAdapterFactory(routineLoadTypeAdapterFactory)
+            .registerTypeAdapterFactory(routineLoadJobTypeAdapterFactory)
             .registerTypeAdapterFactory(remoteFileSystemTypeAdapterFactory)
             .registerTypeAdapterFactory(jobBackupTypeAdapterFactory)
             .registerTypeAdapterFactory(loadJobTypeAdapterFactory)
@@ -904,6 +913,30 @@ public ImmutableList deserialize(final JsonElement json, final Type type,
         }
     }

+    public static class PreProcessTypeAdapterFactory implements TypeAdapterFactory {
+
+        public PreProcessTypeAdapterFactory() {
+        }
+
+        @Override
+        public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
+            TypeAdapter<T> delegate = gson.getDelegateAdapter(this, type);
+
+            return new TypeAdapter<T>() {
+                public void write(JsonWriter out, T value) throws IOException {
+                    if (value instanceof GsonPreProcessable) {
+                        ((GsonPreProcessable) value).gsonPreProcess();
+                    }
+                    delegate.write(out, value);
+                }
+
+                public T read(JsonReader reader) throws IOException {
+                    return delegate.read(reader);
+                }
+            };
+        }
+    }
+
     public static class PostProcessTypeAdapterFactory implements TypeAdapterFactory {

         public PostProcessTypeAdapterFactory() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java b/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java
index 0bbcd8dd3a83ab..5da34ae1c2e759 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java
@@ -18,20 +18,18 @@
 package org.apache.doris.qe;
 
 import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 
 /*
  * This class represents an origin statement
  * in multiple statements.
  */
-public class OriginStatement implements Writable {
+public class OriginStatement {
     // the origin stmt from client. this may includes more than one statement.
     // eg: "select 1; select 2; select 3"
     @SerializedName(value = "originStmt")
@@ -50,12 +48,6 @@ public static OriginStatement read(DataInput in) throws IOException {
         return GsonUtils.GSON.fromJson(json, OriginStatement.class);
     }
 
-    @Override
-    public void write(DataOutput out) throws IOException {
-        String json = GsonUtils.GSON.toJson(this);
-        Text.writeString(out, json);
-    }
-
     @Override
     public String toString() {
         return "OriginStatement{"
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 8f641070c40e71..d48d1702e1b2db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -27,6 +27,7 @@
 import org.apache.doris.thrift.TFileType;
 
 import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
 
 import java.util.List;
 
@@ -130,7 +131,9 @@ default int getStreamPerNode() {
     }
 
     class ImportColumnDescs {
+        @SerializedName("des")
         public List<ImportColumnDesc> descs = Lists.newArrayList();
+        @SerializedName("icdr")
         public boolean isColumnDescsRewrited = false;
 
         public List<String> getFileColNames() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
index c60081a4260e7e..74396538492a48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java
@@ -78,6 +78,7 @@ public enum LoadJobSourceType {
         ROUTINE_LOAD_TASK(4), // routine load task use this type
         BATCH_LOAD_JOB(5); // load job v2 for broker load
 
+        @SerializedName("f")
         private final int flag;
 
         private LoadJobSourceType(int flag) {
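The short `@SerializedName` keys trade readability inside the JSON for smaller edit-log entries, while the Java field names stay descriptive. A sketch of the effect, using an illustrative class rather than the real `ImportColumnDescs`:

```java
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

import java.util.ArrayList;
import java.util.List;

public class SerializedNameDemo {
    static class Descs {
        @SerializedName("des")
        List<String> descs = new ArrayList<>();
        @SerializedName("icdr")
        boolean isColumnDescsRewrited = false;
    }

    public static void main(String[] args) {
        // the keys are repeated in every persisted record, so "des"/"icdr"
        // instead of the full field names shrinks each serialized instance
        System.out.println(new Gson().toJson(new Descs())); // {"des":[],"icdr":false}
    }
}
```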
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
index 0ba52962ec177a..eebd1ec8a5cbda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java
@@ -17,11 +17,15 @@
 package org.apache.doris.transaction;
 
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.load.loadv2.LoadJobFinalOperation;
 import org.apache.doris.load.loadv2.MiniLoadTxnCommitAttachment;
 import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
+import org.apache.doris.persist.gson.GsonPostProcessable;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.thrift.TTxnCommitAttachment;
 import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
 
@@ -31,7 +35,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-public abstract class TxnCommitAttachment implements Writable {
+public abstract class TxnCommitAttachment implements Writable, GsonPostProcessable {
     @SerializedName(value = "sourceType")
     protected TransactionState.LoadJobSourceType sourceType;
     protected boolean isTypeRead = false;
@@ -58,32 +62,40 @@ public static TxnCommitAttachment fromThrift(TTxnCommitAttachment txnCommitAttac
     }
 
     public static TxnCommitAttachment read(DataInput in) throws IOException {
-        TxnCommitAttachment attachment = null;
-        LoadJobSourceType type = LoadJobSourceType.valueOf(Text.readString(in));
-        if (type == LoadJobSourceType.ROUTINE_LOAD_TASK) {
-            attachment = new RLTaskTxnCommitAttachment();
-        } else if (type == LoadJobSourceType.BATCH_LOAD_JOB) {
-            attachment = new LoadJobFinalOperation();
-        } else if (type == LoadJobSourceType.BACKEND_STREAMING) {
-            attachment = new MiniLoadTxnCommitAttachment();
-        } else if (type == LoadJobSourceType.FRONTEND) {
-            // spark load
-            attachment = new LoadJobFinalOperation();
+        if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_137) {
+            TxnCommitAttachment attachment = null;
+            LoadJobSourceType type = LoadJobSourceType.valueOf(Text.readString(in));
+            if (type == LoadJobSourceType.ROUTINE_LOAD_TASK) {
+                attachment = new RLTaskTxnCommitAttachment();
+            } else if (type == LoadJobSourceType.BATCH_LOAD_JOB) {
+                attachment = new LoadJobFinalOperation();
+            } else if (type == LoadJobSourceType.BACKEND_STREAMING) {
+                attachment = new MiniLoadTxnCommitAttachment();
+            } else if (type == LoadJobSourceType.FRONTEND) {
+                // spark load
+                attachment = new LoadJobFinalOperation();
+            } else {
+                throw new IOException("Unknown load job source type: " + type.name());
+            }
+
+            attachment.setTypeRead(true);
+            attachment.readFields(in);
+            return attachment;
         } else {
-            throw new IOException("Unknown load job source type: " + type.name());
+            return GsonUtils.GSON.fromJson(Text.readString(in), TxnCommitAttachment.class);
         }
+    }
 
-        attachment.setTypeRead(true);
-        attachment.readFields(in);
-        return attachment;
+    public void gsonPostProcess() {
+        setTypeRead(true);
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
-        // ATTN: must write type first
-        Text.writeString(out, sourceType.name());
+        Text.writeString(out, GsonUtils.GSON.toJson(this));
     }
 
+    @Deprecated
     public void readFields(DataInput in) throws IOException {
         if (!isTypeRead) {
             sourceType = LoadJobSourceType.valueOf(Text.readString(in));
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index c5bf509464e112..1548017b66115b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -39,6 +39,7 @@
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingDeque;
 
 public class RoutineLoadTaskSchedulerTest {
@@ -61,7 +62,7 @@ public void testRunOneCycle(@Injectable KafkaRoutineLoadJob kafkaRoutineLoadJob1
             MetaNotFoundException, AnalysisException, LabelAlreadyUsedException, BeginTransactionException {
         long beId = 100L;
 
-        Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+        ConcurrentMap<Integer, Long> partitionIdToOffset = Maps.newConcurrentMap();
         partitionIdToOffset.put(1, 100L);
         partitionIdToOffset.put(2, 200L);
         KafkaProgress kafkaProgress = new KafkaProgress();
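For reference, `gsonPostProcess()` is what replaces the old `isTypeRead` handshake on the new path: the post-process adapter factory runs it right after Gson materializes the object. Roughly (a simplified sketch of the read half of the existing `PostProcessTypeAdapterFactory`, mirroring the pre-process one above; `delegate` is the wrapped `TypeAdapter<T>`, and the write half is omitted):

```java
// simplified sketch, not a verbatim copy of PostProcessTypeAdapterFactory
public T read(JsonReader reader) throws IOException {
    T obj = delegate.read(reader);
    if (obj instanceof GsonPostProcessable) {
        // TxnCommitAttachment.gsonPostProcess() runs here and calls setTypeRead(true),
        // so the deprecated readFields() handshake is not needed on the Gson path
        ((GsonPostProcessable) obj).gsonPostProcess();
    }
    return obj;
}
```

Note the version gate in `read()` above: metadata written before `VERSION_137` still carries the source-type string followed by field-by-field data, so it goes through the legacy branch, while anything written at `VERSION_137` or later is a single JSON string.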
diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 797507e47dece6..03a29fb453f3cd 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -77,6 +77,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 public class GlobalTransactionMgrTest {
@@ -315,7 +316,7 @@ public void testCommitRoutineLoadTransaction(@Injectable TabletCommitInfo tablet
         Map<Long, TransactionState> idToTransactionState = Maps.newHashMap();
         idToTransactionState.put(1L, transactionState);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
-        Map<Integer, Long> oldKafkaProgressMap = Maps.newHashMap();
+        ConcurrentMap<Integer, Long> oldKafkaProgressMap = Maps.newConcurrentMap();
         oldKafkaProgressMap.put(1, 0L);
         KafkaProgress oldkafkaProgress = new KafkaProgress();
         Deencapsulation.setField(oldkafkaProgress, "partitionIdToOffset", oldKafkaProgressMap);
@@ -380,7 +381,7 @@ public void testCommitRoutineLoadTransactionWithErrorMax(@Injectable TabletCommi
         Map<Long, TransactionState> idToTransactionState = Maps.newHashMap();
         idToTransactionState.put(1L, transactionState);
         Deencapsulation.setField(routineLoadJob, "maxErrorNum", 10);
-        Map<Integer, Long> oldKafkaProgressMap = Maps.newHashMap();
+        ConcurrentMap<Integer, Long> oldKafkaProgressMap = Maps.newConcurrentMap();
        oldKafkaProgressMap.put(1, 0L);
         KafkaProgress oldkafkaProgress = new KafkaProgress();
         Deencapsulation.setField(oldkafkaProgress, "partitionIdToOffset", oldKafkaProgressMap);
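On the test changes: the maps injected via `Deencapsulation` switch to `ConcurrentMap`, which appears to match the declared type of `partitionIdToOffset` in `KafkaProgress` after this patch. Guava's `Maps.newConcurrentMap()` returns a `ConcurrentHashMap`, so the progress map can be touched from scheduler and serialization paths without external locking. A minimal standalone illustration:

```java
import com.google.common.collect.Maps;

import java.util.concurrent.ConcurrentMap;

public class ProgressMapDemo {
    public static void main(String[] args) {
        // same construction as the tests above; a ConcurrentHashMap under the hood
        ConcurrentMap<Integer, Long> partitionIdToOffset = Maps.newConcurrentMap();
        partitionIdToOffset.put(1, 0L);
        // atomic read-modify-write, safe under concurrent access
        partitionIdToOffset.merge(1, 100L, Long::sum);
        System.out.println(partitionIdToOffset); // {1=100}
    }
}
```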