diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index a54d30d294e4a62..6dc4c5028802d58 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -92,8 +92,10 @@ public final class FeMetaVersion { public static final int VERSION_134 = 134; // For mate gson public static final int VERSION_135 = 135; + // For mate gson + public static final int VERSION_136 = 136; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_135; + public static final int VERSION_CURRENT = VERSION_136; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java index b29938eb9f4fec6..41f28dfe281cee6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/AbstractJob.java @@ -18,11 +18,12 @@ package org.apache.doris.backup; import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; -import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.gson.annotations.SerializedName; @@ -169,53 +170,28 @@ public void setTypeRead(boolean isTypeRead) { public abstract Status updateRepo(Repository repo); public static AbstractJob read(DataInput in) throws IOException { - AbstractJob job = null; - JobType type = JobType.valueOf(Text.readString(in)); - if (type == JobType.BACKUP) { - job = new BackupJob(); - } else if (type == JobType.RESTORE) { - job = new RestoreJob(); + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { + AbstractJob job = null; + JobType type = JobType.valueOf(Text.readString(in)); + if (type == JobType.BACKUP) { + job = new BackupJob(); + } else if (type == JobType.RESTORE) { + job = new RestoreJob(); + } else { + throw new IOException("Unknown job type: " + type.name()); + } + + job.setTypeRead(true); + job.readFields(in); + return job; } else { - throw new IOException("Unknown job type: " + type.name()); + return GsonUtils.GSON.fromJson(Text.readString(in), AbstractJob.class); } - - job.setTypeRead(true); - job.readFields(in); - return job; } @Override public void write(DataOutput out) throws IOException { - // ATTN: must write type first - Text.writeString(out, type.name()); - - out.writeLong(repoId); - Text.writeString(out, label); - out.writeLong(jobId); - out.writeLong(dbId); - Text.writeString(out, dbName); - - out.writeLong(createTime); - out.writeLong(finishedTime); - out.writeLong(timeoutMs); - - if (!taskErrMsg.isEmpty()) { - out.writeBoolean(true); - // we only save at most 3 err msgs - int savedNum = Math.min(3, taskErrMsg.size()); - out.writeInt(savedNum); - for (Map.Entry entry : taskErrMsg.entrySet()) { - if (savedNum == 0) { - break; - } - out.writeLong(entry.getKey()); - Text.writeString(out, entry.getValue()); - savedNum--; - } - Preconditions.checkState(savedNum == 0, savedNum); - } else { - out.writeBoolean(false); - } + 
Text.writeString(out, GsonUtils.GSON.toJson(this)); } @Deprecated diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java index 89b5ccccb6ee804..d0f02d85ff96cb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java @@ -35,10 +35,12 @@ import org.apache.doris.catalog.Tablet; import org.apache.doris.catalog.View; import org.apache.doris.common.Config; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.property.S3ClientBEProperties; import org.apache.doris.persist.BarrierLog; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; import org.apache.doris.task.AgentTaskExecutor; @@ -53,7 +55,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; -import com.google.common.base.Strings; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; @@ -63,7 +64,6 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.nio.file.FileVisitOption; @@ -977,64 +977,12 @@ private String getBackupObjs() { } public static BackupJob read(DataInput in) throws IOException { - BackupJob job = new BackupJob(); - job.readFields(in); - return job; - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - // table refs - out.writeInt(tableRefs.size()); - for (TableRef tblRef : tableRefs) { - tblRef.write(out); - } - - // state - Text.writeString(out, state.name()); - - // times - out.writeLong(snapshotFinishedTime); - out.writeLong(snapshotUploadFinishedTime); - - // snapshot info - out.writeInt(snapshotInfos.size()); - for (SnapshotInfo info : snapshotInfos.values()) { - info.write(out); - } - - // backup meta - if (backupMeta == null) { - out.writeBoolean(false); + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { + BackupJob job = new BackupJob(); + job.readFields(in); + return job; } else { - out.writeBoolean(true); - backupMeta.write(out); - } - - // No need to persist job info. 
It is generated then write to file - - // metaInfoFilePath and jobInfoFilePath - if (Strings.isNullOrEmpty(localMetaInfoFilePath)) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - Text.writeString(out, localMetaInfoFilePath); - } - - if (Strings.isNullOrEmpty(localJobInfoFilePath)) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - Text.writeString(out, localJobInfoFilePath); - } - - // write properties - out.writeInt(properties.size()); - for (Map.Entry entry : properties.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue()); + return GsonUtils.GSON.fromJson(Text.readString(in), BackupJob.class); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java index e9a1866a59b458c..8def02a4456b829 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java @@ -88,7 +88,7 @@ public static BackupMeta fromFile(String filePath, int metaVersion) throws IOExc metaContext.setMetaVersion(metaVersion); metaContext.setThreadLocalInfo(); try (DataInputStream dis = new DataInputStream(new FileInputStream(file))) { - BackupMeta backupMeta = BackupMeta.read(dis); + BackupMeta backupMeta = BackupMeta.read(dis, metaVersion); return backupMeta; } finally { MetaContext.remove(); @@ -110,22 +110,31 @@ public boolean compatibleWith(BackupMeta other) { return false; } + public static BackupMeta read(DataInput in, int metaVersion) throws IOException { + if (metaVersion < FeMetaVersion.VERSION_136) { + BackupMeta backupMeta = new BackupMeta(); + backupMeta.readFields(in); + return backupMeta; + } else { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, BackupMeta.class); + } + } + public static BackupMeta read(DataInput in) throws IOException { - BackupMeta backupMeta = new BackupMeta(); - backupMeta.readFields(in); - return backupMeta; + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { + BackupMeta backupMeta = new BackupMeta(); + backupMeta.readFields(in); + return backupMeta; + } else { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, BackupMeta.class); + } } @Override public void write(DataOutput out) throws IOException { - out.writeInt(tblNameMap.size()); - for (Table table : tblNameMap.values()) { - table.write(out); - } - out.writeInt(resourceNameMap.size()); - for (Resource resource : resourceNameMap.values()) { - resource.write(out); - } + Text.writeString(out, GsonUtils.GSON.toJson(this)); } public void readFields(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index 903e2ef95efe80e..9b870af8b9dfad3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -67,7 +67,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.property.S3ClientBEProperties; import org.apache.doris.persist.gson.GsonPostProcessable; -// import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.resource.Tag; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; @@ -100,7 +100,6 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; -import 
java.io.DataOutput; import java.io.IOException; import java.util.HashMap; import java.util.List; @@ -2097,85 +2096,16 @@ private void setTableStateToNormal(Database db, boolean committed, boolean isRep } } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - Text.writeString(out, backupTimestamp); - jobInfo.write(out); - out.writeBoolean(allowLoad); - - Text.writeString(out, state.name()); - - if (backupMeta != null) { - out.writeBoolean(true); - backupMeta.write(out); + public static RestoreJob read(DataInput in) throws IOException { + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { + RestoreJob job = new RestoreJob(); + job.readFields(in); + return job; } else { - out.writeBoolean(false); - } - - fileMapping.write(out); - - out.writeLong(metaPreparedTime); - out.writeLong(snapshotFinishedTime); - out.writeLong(downloadFinishedTime); - - replicaAlloc.write(out); - - out.writeInt(restoredPartitions.size()); - for (Pair entry : restoredPartitions) { - Text.writeString(out, entry.first); - entry.second.write(out); - } - - out.writeInt(restoredTbls.size()); - for (Table tbl : restoredTbls) { - tbl.write(out); - } - - out.writeInt(restoredVersionInfo.rowKeySet().size()); - for (long tblId : restoredVersionInfo.rowKeySet()) { - out.writeLong(tblId); - out.writeInt(restoredVersionInfo.row(tblId).size()); - for (Map.Entry entry : restoredVersionInfo.row(tblId).entrySet()) { - out.writeLong(entry.getKey()); - out.writeLong(entry.getValue()); - // It is version hash in the past, - // but it useless but should compatible with old version so that write 0 here - out.writeLong(0L); - } - } - - out.writeInt(snapshotInfos.rowKeySet().size()); - for (long tabletId : snapshotInfos.rowKeySet()) { - out.writeLong(tabletId); - Map map = snapshotInfos.row(tabletId); - out.writeInt(map.size()); - for (Map.Entry entry : map.entrySet()) { - out.writeLong(entry.getKey()); - entry.getValue().write(out); - } - } - - out.writeInt(restoredResources.size()); - for (Resource resource : restoredResources) { - resource.write(out); - } - - // write properties - out.writeInt(properties.size()); - for (Map.Entry entry : properties.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue()); + return GsonUtils.GSON.fromJson(Text.readString(in), RestoreJob.class); } } - public static RestoreJob read(DataInput in) throws IOException { - RestoreJob job = new RestoreJob(); - job.readFields(in); - return job; - } - @Deprecated @Override public void readFields(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateFunction.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateFunction.java index 55df22331972413..504b88d5410aa3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AggregateFunction.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -68,16 +69,24 @@ public class AggregateFunction extends Function { public static ImmutableSet SUPPORT_ORDER_BY_AGGREGATE_FUNCTION_NAME_SET = ImmutableSet.of("group_concat"); // Set if different from retType_, null otherwise. 
+ @SerializedName("it") private Type intermediateType; // The symbol inside the binary at location_ that contains this particular. // They can be null if it is not required. + @SerializedName("ufs") private String updateFnSymbol; + @SerializedName("ifs") private String initFnSymbol; + @SerializedName("sfs") private String serializeFnSymbol; + @SerializedName("mfs") private String mergeFnSymbol; + @SerializedName("gvfs") private String getValueFnSymbol; + @SerializedName("rfs") private String removeFnSymbol; + @SerializedName("ffs") private String finalizeFnSymbol; private static String BE_BUILTINS_CLASS = "AggregateFunctions"; @@ -86,6 +95,7 @@ public class AggregateFunction extends Function { // e.g. min(distinct col) == min(col). // TODO: currently it is not possible for user functions to specify this. We should // extend the create aggregate function stmt to allow additional metadata like this. + @SerializedName("igd") private boolean ignoresDistinct; // True if this function can appear within an analytic expr (fn() OVER(...)). @@ -93,9 +103,11 @@ public class AggregateFunction extends Function { // we should identify this property from the function itself (e.g., based on which // functions of the UDA API are implemented). // Currently, there is no reliable way of doing that. + @SerializedName("isAn") private boolean isAnalyticFn; // True if this function can be used for aggregation (without an OVER() clause). + @SerializedName("isAg") private boolean isAggregateFn; // True if this function returns a non-null value on an empty input. It is used @@ -103,9 +115,11 @@ public class AggregateFunction extends Function { // TODO: Instead of manually setting this flag, we should identify this // property from the function itself (e.g. evaluating the function on an // empty input in BE). + @SerializedName("rnno") private boolean returnsNonNullOnEmpty; // use for java-udaf to point the class of user define + @SerializedName("sn") private String symbolName; // only used for serialization diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AliasFunction.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AliasFunction.java index 882689dbfa8a12d..b6bf3be83e2a2ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AliasFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AliasFunction.java @@ -35,6 +35,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -59,7 +60,9 @@ public class AliasFunction extends Function { private static final String DIGITAL_MASKING = "digital_masking"; + @SerializedName("of") private Expr originFunction; + @SerializedName("pm") private List parameters = new ArrayList<>(); private List typeDefParams = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthorizationInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthorizationInfo.java index aed195647df3f0f..7742e1f7cf29e24 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthorizationInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthorizationInfo.java @@ -22,6 +22,7 @@ import com.google.common.collect.Sets; import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; @@ -39,7 +40,9 @@ * The job checks the privilege by auth info. 
*/ public class AuthorizationInfo implements Writable { + @SerializedName(value = "dn") private String dbName; + @SerializedName(value = "tn") private Set tableNameList; // only for persist diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerTable.java index 2d827bd50a04e29..cc9c17ead4f8067 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerTable.java @@ -26,12 +26,12 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringEscapeUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; @@ -46,11 +46,16 @@ public class BrokerTable extends Table { private static final String COLUMN_SEPARATOR = "column_separator"; private static final String LINE_DELIMITER = "line_delimiter"; private static final String FILE_FORMAT = "format"; + @SerializedName("bn") private String brokerName; + @SerializedName("ps") private List paths; + @SerializedName("cs") private String columnSeparator; + @SerializedName("ld") private String lineDelimiter; private String fileFormat; + @SerializedName("bp") private Map brokerProperties; public BrokerTable() { @@ -209,24 +214,7 @@ public TTableDescriptor toThrift() { return tTableDescriptor; } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - Text.writeString(out, brokerName); - out.writeInt(paths.size()); - for (String path : paths) { - Text.writeString(out, path); - } - Text.writeString(out, columnSeparator); - Text.writeString(out, lineDelimiter); - out.writeInt(brokerProperties.size()); - for (Map.Entry prop : brokerProperties.entrySet()) { - Text.writeString(out, prop.getKey()); - Text.writeString(out, prop.getValue()); - } - } - + @Deprecated public void readFields(DataInput in) throws IOException { super.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java index 0fab32a4924c3bc..f06f8262aa228c3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java @@ -35,13 +35,14 @@ import org.apache.doris.common.util.Util; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.persist.CreateTableInfo; +import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.system.SystemInfoService; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.gson.JsonObject; import com.google.gson.annotations.SerializedName; import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; @@ -76,7 +77,7 @@ * if the table has never been loaded * if the table loading failed on the * previous attempt */ -public class Database extends MetaObject implements Writable, DatabaseIf { +public class Database extends MetaObject implements Writable, DatabaseIf
, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(Database.class); private static final String TRANSACTION_QUOTA_SIZE = "transactionQuotaSize"; @@ -108,6 +109,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf
@SerializedName(value = "replicaQuotaSize") private volatile long replicaQuotaSize; + @SerializedName(value = "tq") private volatile long transactionQuotaSize; private volatile boolean isDropped; @@ -124,6 +126,7 @@ public enum DbState { @SerializedName(value = "dbProperties") private DatabaseProperty dbProperties = new DatabaseProperty(); + @SerializedName(value = "bc") private BinlogConfig binlogConfig = new BinlogConfig(); public Database() { @@ -587,9 +590,33 @@ public int getMaxReplicationNum() { } public static Database read(DataInput in) throws IOException { - Database db = new Database(); - db.readFields(in); - return db; + LOG.info("read db from journal {}", in); + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { + Database db = new Database(); + db.readFields(in); + return db; + } else { + return GsonUtils.GSON.fromJson(Text.readString(in), Database.class); + } + } + + @Override + public void gsonPostProcess() throws IOException { + fullQualifiedName = ClusterNamespace.getNameFromFullName(fullQualifiedName); + nameToTable.forEach((tn, tb) -> { + tb.setQualifiedDbName(fullQualifiedName); + if (tb instanceof MTMV) { + Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) tb, id); + } + idToTable.put(tb.getId(), tb); + lowerCaseToTableName.put(tn.toLowerCase(), tn); + }); + + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) { + transactionQuotaSize = Config.default_db_max_running_txn_num == -1L + ? Config.max_running_txn_num_per_db + : Config.default_db_max_running_txn_num; + } } @Override @@ -605,33 +632,16 @@ public String getSignature(int signatureVersion) { @Override public void write(DataOutput out) throws IOException { - super.write(out); - - out.writeLong(id); - Text.writeString(out, ClusterNamespace.getNameFromFullName(fullQualifiedName)); - // write tables discardHudiTable(); - int numTables = nameToTable.size(); - out.writeInt(numTables); - for (Map.Entry entry : nameToTable.entrySet()) { - entry.getValue().write(out); - } - - out.writeLong(dataQuotaBytes); - Text.writeString(out, SystemInfoService.DEFAULT_CLUSTER); - Text.writeString(out, dbState.name()); - Text.writeString(out, attachDbName); - - // write functions - FunctionUtil.write(out, name2Function); - - // write encryptKeys - dbEncryptKey.write(out); - - out.writeLong(replicaQuotaSize); - dbProperties.write(out); + String json = GsonUtils.GSON.toJson(this); + JsonObject jsonObject = GsonUtils.GSON.fromJson(json, JsonObject.class); + String fqn = ClusterNamespace.getNameFromFullName(jsonObject.get("fullQualifiedName").getAsString()); + jsonObject.remove("fullQualifiedName"); + jsonObject.addProperty("fullQualifiedName", fqn); + Text.writeString(out, GsonUtils.GSON.toJson(jsonObject)); } + private void discardHudiTable() { Iterator> iterator = nameToTable.entrySet().iterator(); while (iterator.hasNext()) { @@ -650,6 +660,7 @@ public void analyze() { } } + @Deprecated @Override public void readFields(DataInput in) throws IOException { super.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 4b33b2079b4fdf4..b06f803df1d80ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -2085,10 +2085,12 @@ public long loadFrontends(DataInputStream dis, long checksum) throws IOException } public long loadBackends(DataInputStream dis, long checksum) throws IOException { + LOG.info("start loading 
backends from image"); return systemInfo.loadBackends(dis, checksum); } public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlException { + LOG.info("start loading db from image"); return getInternalCatalog().loadDb(dis, checksum); } @@ -2315,6 +2317,7 @@ public long loadPolicy(DataInputStream in, long checksum) throws IOException { * Load catalogs through file. **/ public long loadCatalog(DataInputStream in, long checksum) throws IOException { + LOG.info("start loading catalog from image"); CatalogMgr mgr = CatalogMgr.read(in); // When enable the multi catalog in the first time, the "mgr" will be a null value. // So ignore it to use default catalog manager. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java index 0926554f022342c..f31d1bd82d50a56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java @@ -28,6 +28,7 @@ import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; +import com.google.gson.annotations.SerializedName; import lombok.Getter; import lombok.Setter; import org.apache.commons.codec.digest.DigestUtils; @@ -36,7 +37,6 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -65,6 +65,7 @@ public class EsTable extends Table { // Here we have a slightly conservative value of 20, but at the same time // we also provide configurable parameters for expert-using // @see `MAX_DOCVALUE_FIELDS` + @Getter private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20; private String hosts; private String[] seeds; @@ -77,6 +78,7 @@ public class EsTable extends Table { private String mappingType = null; // only save the partition definition, save the partition key, // partition list is got from es cluster dynamically and is saved in esTableState + @SerializedName("pi") private PartitionInfo partitionInfo; private EsTablePartitions esTablePartitions; @@ -101,6 +103,7 @@ public class EsTable extends Table { private boolean includeHiddenIndex = Boolean.parseBoolean(EsResource.INCLUDE_HIDDEN_INDEX_DEFAULT_VALUE); // tableContext is used for being convenient to persist some configuration parameters uniformly + @SerializedName("tc") private Map tableContext = new HashMap<>(); // record the latest and recently exception when sync ES table metadata (mapping, shard location) @@ -261,18 +264,7 @@ public String getSignature(int signatureVersion) { return md5; } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - out.writeInt(tableContext.size()); - for (Map.Entry entry : tableContext.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue()); - } - Text.writeString(out, partitionInfo.getType().name()); - partitionInfo.write(out); - } - + @Deprecated @Override public void readFields(DataInput in) throws IOException { super.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java index 4d9c97e8dd9b4e5..753b62efa6d0081 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java @@ -35,6 +35,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import 
com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; import org.apache.commons.io.output.NullOutputStream; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -107,38 +108,49 @@ public enum NullableMode { public static final long UNIQUE_FUNCTION_ID = 0; // Function id, every function has a unique id. Now all built-in functions' id is 0 + @SerializedName("id") private long id = 0; // User specified function name e.g. "Add" + @SerializedName("n") private FunctionName name; + @SerializedName("rt") private Type retType; // Array of parameter types. empty array if this function does not have parameters. + @SerializedName("at") private Type[] argTypes; // If true, this function has variable arguments. // TODO: we don't currently support varargs with no fixed types. i.e. fn(...) + @SerializedName("hva") private boolean hasVarArgs; // If true (default), this function is called directly by the user. For operators, // this is false. If false, it also means the function is not visible from // 'show functions'. + @SerializedName("uv") private boolean userVisible; // Absolute path in HDFS for the binary that contains this function. // e.g. /udfs/udfs.jar + @SerializedName("l") private URI location; + @SerializedName("bt") private TFunctionBinaryType binaryType; private Function nestedFunction = null; + @SerializedName("nm") protected NullableMode nullableMode = NullableMode.DEPEND_ON_ARGUMENT; protected boolean vectorized = true; // library's checksum to make sure all backends use one library to serve user's request + @SerializedName("cs") protected String checksum = ""; // If true, this function is global function protected boolean isGlobal = false; // If true, this function is table function, mainly used by java-udtf + @SerializedName("isU") protected boolean isUDTFunction = false; // Only used for serialization diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java index d1cf421aa8648e7..717c69f667c8039 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java @@ -29,10 +29,10 @@ import com.google.common.base.Strings; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -49,8 +49,11 @@ public class HiveTable extends Table { private static final String PROPERTY_ERROR_MSG = "Hive table properties('%s'='%s')" + " is illegal or not supported. 
Please check it"; + @SerializedName("hdb") private String hiveDb; + @SerializedName("ht") private String hiveTable; + @SerializedName("hp") private Map hiveProperties = Maps.newHashMap(); public static final String HIVE_DB = "database"; @@ -172,19 +175,7 @@ private void validate(Map properties) throws DdlException { } } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - Text.writeString(out, hiveDb); - Text.writeString(out, hiveTable); - out.writeInt(hiveProperties.size()); - for (Map.Entry entry : hiveProperties.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue()); - } - } - + @Deprecated public void readFields(DataInput in) throws IOException { super.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java index d81b82fd3eb111b..34fe4ba095ad898 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java @@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import lombok.Setter; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.collections.map.CaseInsensitiveMap; @@ -38,7 +39,6 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.Collections; import java.util.List; @@ -65,23 +65,36 @@ public class JdbcTable extends Table { private static final String DRIVER_URL = "driver_url"; private static final String CHECK_SUM = "checksum"; private static Map TABLE_TYPE_MAP; + @SerializedName("rn") private String resourceName; + @SerializedName("etn") private String externalTableName; // real name only for jdbc catalog + @SerializedName("rdn") private String remoteDatabaseName; + @SerializedName("rtn") private String remoteTableName; + @SerializedName("rcn") private Map remoteColumnNames; + @SerializedName("jtn") private String jdbcTypeName; + @SerializedName("jurl") private String jdbcUrl; + @SerializedName("jusr") private String jdbcUser; + @SerializedName("jpwd") private String jdbcPasswd; + @SerializedName("dc") private String driverClass; + @SerializedName("du") private String driverUrl; + @SerializedName("cs") private String checkSum; + @SerializedName("cid") private long catalogId = -1; private int connectionPoolMinSize; @@ -235,37 +248,7 @@ public TTableDescriptor toThrift() { return tTableDescriptor; } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - Map serializeMap = Maps.newHashMap(); - serializeMap.put(CATALOG_ID, String.valueOf(catalogId)); - serializeMap.put(TABLE, externalTableName); - serializeMap.put(RESOURCE, resourceName); - serializeMap.put(TABLE_TYPE, jdbcTypeName); - serializeMap.put(URL, jdbcUrl); - serializeMap.put(USER, jdbcUser); - serializeMap.put(PASSWORD, jdbcPasswd); - serializeMap.put(DRIVER_CLASS, driverClass); - serializeMap.put(DRIVER_URL, driverUrl); - serializeMap.put(CHECK_SUM, checkSum); - serializeMap.put(REMOTE_DATABASE, remoteDatabaseName); - serializeMap.put(REMOTE_TABLE, remoteTableName); - serializeMap.put(REMOTE_COLUMNS, objectMapper.writeValueAsString(remoteColumnNames)); - - int size = (int) serializeMap.values().stream().filter(v -> { - return v != null; - }).count(); - 
out.writeInt(size); - - for (Map.Entry kv : serializeMap.entrySet()) { - if (kv.getValue() != null) { - Text.writeString(out, kv.getKey()); - Text.writeString(out, kv.getValue()); - } - } - } - + @Deprecated @Override public void readFields(DataInput in) throws IOException { super.readFields(in); @@ -362,8 +345,8 @@ public String getSignature(int signatureVersion) { @Override public JdbcTable clone() { - JdbcTable copied = new JdbcTable(); - if (!DeepCopy.copy(this, copied, JdbcTable.class, FeConstants.meta_version)) { + JdbcTable copied = DeepCopy.copy(this, JdbcTable.class, FeConstants.meta_version); + if (copied == null) { LOG.warn("failed to copy jdbc table: " + getName()); return null; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java index ce8d4303cbec2d9..09d8494ea2c5b91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionInfo.java @@ -35,7 +35,6 @@ import com.google.common.collect.Maps; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -136,29 +135,7 @@ public void checkPartitionItemListsConflict(List list1, ListUtil.checkListsConflict(list1, list2); } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - // partition columns - out.writeInt(partitionColumns.size()); - for (Column column : partitionColumns) { - column.write(out); - } - - out.writeInt(idToItem.size()); - for (Map.Entry entry : idToItem.entrySet()) { - out.writeLong(entry.getKey()); - entry.getValue().write(out); - } - - out.writeInt(idToTempItem.size()); - for (Map.Entry entry : idToTempItem.entrySet()) { - out.writeLong(entry.getKey()); - entry.getValue().write(out); - } - - } - + @Deprecated public void readFields(DataInput in) throws IOException { super.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java index dafdcdc49f5393d..937ef6a561e7668 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; import java.io.DataInput; import java.io.DataOutput; @@ -37,6 +38,7 @@ public class ListPartitionItem extends PartitionItem { public static final ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList()); + @SerializedName(value = "partitionKeys") private final List partitionKeys; private boolean isDefaultPartition = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java index f238cf2f1abce5b..b0e158f89d21dde 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MTMV.java @@ -56,7 +56,6 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.Map; import java.util.Map.Entry; @@ -449,12 +448,7 @@ public void writeMvUnlock() { this.mvRwLock.writeLock().unlock(); } - @Override - public void write(DataOutput out) throws IOException { - 
super.write(out); - Text.writeString(out, GsonUtils.GSON.toJson(this)); - } - + @Deprecated @Override public void readFields(DataInput in) throws IOException { super.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java index d2a15d3b1643d03..a6c27cc16cad2c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetadataViewer.java @@ -297,7 +297,7 @@ private static List> getDataSkew(String dbName, String tblName, Par for (Partition p : olapTable.getPartitions()) { allPartionNames.put(p.getName(), false); } - for (Partition p : olapTable.getTempPartitions()) { + for (Partition p : olapTable.getAllTempPartitions()) { allPartionNames.put(p.getName(), true); } } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java index b815dbf3410eaaa..f8f5265c88f7e38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java @@ -27,12 +27,12 @@ import com.google.common.base.Strings; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.List; import java.util.Map; @@ -49,13 +49,21 @@ public class MysqlTable extends Table { private static final String MYSQL_TABLE = "table"; private static final String MYSQL_CHARSET = "charset"; + @SerializedName("ocrn") private String odbcCatalogResourceName; + @SerializedName("h") private String host; + @SerializedName("p") private String port; + @SerializedName("un") private String userName; + @SerializedName("pwd") private String passwd; + @SerializedName("mdn") private String mysqlDatabaseName; + @SerializedName("mtn") private String mysqlTableName; + @SerializedName("c") private String charset; public MysqlTable() { @@ -239,32 +247,7 @@ public String getSignature(int signatureVersion) { return md5; } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - Map serializeMap = Maps.newHashMap(); - serializeMap.put(ODBC_CATALOG_RESOURCE, odbcCatalogResourceName); - serializeMap.put(MYSQL_HOST, host); - serializeMap.put(MYSQL_PORT, port); - serializeMap.put(MYSQL_USER, userName); - serializeMap.put(MYSQL_PASSWORD, passwd); - serializeMap.put(MYSQL_DATABASE, mysqlDatabaseName); - serializeMap.put(MYSQL_TABLE, mysqlTableName); - serializeMap.put(MYSQL_CHARSET, charset); - - int size = (int) serializeMap.values().stream().filter(v -> { - return v != null; - }).count(); - out.writeInt(size); - for (Map.Entry kv : serializeMap.entrySet()) { - if (kv.getValue() != null) { - Text.writeString(out, kv.getKey()); - Text.writeString(out, kv.getValue()); - } - } - } - + @Deprecated public void readFields(DataInput in) throws IOException { super.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java index 76444b03deedc45..eb54ddf73aebefc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java @@ -30,12 
+30,12 @@ import com.google.common.base.Strings; import com.google.common.collect.Maps; +import com.google.gson.annotations.SerializedName; import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -71,16 +71,27 @@ public class OdbcTable extends Table { TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap); } + @SerializedName("ocrn") private String odbcCatalogResourceName; + @SerializedName("h") private String host; + @SerializedName("p") private String port; + @SerializedName("un") private String userName; + @SerializedName("pwd") private String passwd; + @SerializedName("odn") private String odbcDatabaseName; + @SerializedName("otn") private String odbcTableName; + @SerializedName("d") private String driver; + @SerializedName("ottn") private String odbcTableTypeName; + @SerializedName("c") private String charset; + @SerializedName("ep") private String extraParam; private Map resourceProperties; @@ -366,8 +377,8 @@ public TOdbcTableType getOdbcTableType() { @Override public OdbcTable clone() { - OdbcTable copied = new OdbcTable(); - if (!DeepCopy.copy(this, copied, OdbcTable.class, FeConstants.meta_version)) { + OdbcTable copied = DeepCopy.copy(this, OdbcTable.class, FeConstants.meta_version); + if (copied == null) { LOG.warn("failed to copy odbc table: " + getName()); return null; } @@ -422,37 +433,7 @@ public String getSignature(int signatureVersion) { return md5; } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - Map serializeMap = Maps.newHashMap(); - - serializeMap.put(ODBC_CATALOG_RESOURCE, odbcCatalogResourceName); - serializeMap.put(ODBC_HOST, host); - serializeMap.put(ODBC_PORT, port); - serializeMap.put(ODBC_USER, userName); - serializeMap.put(ODBC_PASSWORD, passwd); - serializeMap.put(ODBC_DATABASE, odbcDatabaseName); - serializeMap.put(ODBC_TABLE, odbcTableName); - serializeMap.put(ODBC_DRIVER, driver); - serializeMap.put(ODBC_TYPE, odbcTableTypeName); - serializeMap.put(ODBC_CHARSET, charset); - serializeMap.put(ODBC_EXTRA_PARAM, extraParam); - - int size = (int) serializeMap.values().stream().filter(v -> { - return v != null; - }).count(); - out.writeInt(size); - - for (Map.Entry kv : serializeMap.entrySet()) { - if (kv.getValue() != null) { - Text.writeString(out, kv.getKey()); - Text.writeString(out, kv.getValue()); - } - } - } - + @Deprecated public void readFields(DataInput in) throws IOException { super.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 0b3cd21f9e6d030..5e5a4f849ff02a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -54,6 +54,8 @@ import org.apache.doris.mtmv.MTMVRelatedTableIf; import org.apache.doris.mtmv.MTMVSnapshotIf; import org.apache.doris.mtmv.MTMVVersionSnapshot; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.StmtExecutor; @@ -89,6 +91,7 @@ import com.google.common.collect.Sets; import com.google.gson.annotations.SerializedName; import lombok.Getter; +import lombok.Setter; import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -111,7 +114,7 @@ * Internal representation of tableFamilyGroup-related metadata. A OlaptableFamilyGroup contains several tableFamily. * Note: when you add a new olap table property, you should modify TableProperty class */ -public class OlapTable extends Table implements MTMVRelatedTableIf { +public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(OlapTable.class); public enum OlapTableState { @@ -143,27 +146,32 @@ public enum OlapTableState { @SerializedName("keysType") private KeysType keysType; + @Setter @SerializedName("partitionInfo") private PartitionInfo partitionInfo; @SerializedName("idToPartition") + @Getter private Map idToPartition = new HashMap<>(); + // handled in postgsonprocess + @Getter private Map nameToPartition = Maps.newTreeMap(); @SerializedName(value = "distributionInfo") private DistributionInfo defaultDistributionInfo; // all info about temporary partitions are save in "tempPartitions" + @Getter @SerializedName(value = "tempPartitions") private TempPartitions tempPartitions = new TempPartitions(); // bloom filter columns @SerializedName(value = "bfColumns") - private Set bfColumns; + @SerializedName(value = "bfFpp") private double bfFpp; - @SerializedName(value = "colocateGroup") + @SerializedName(value = "colocateGroup") private String colocateGroup; private boolean hasSequenceCol; @@ -184,6 +192,7 @@ public enum OlapTableState { @SerializedName(value = "tableProperty") private TableProperty tableProperty; + @SerializedName(value = "aIncg") private AutoIncrementGenerator autoIncrementGenerator; private volatile Statistics statistics = new Statistics(); @@ -1172,7 +1181,7 @@ public Collection getPartitions() { } // get only temp partitions - public Collection getTempPartitions() { + public Collection getAllTempPartitions() { return tempPartitions.getAllPartitions(); } @@ -1536,82 +1545,7 @@ public boolean isPartitionDistributed() { @Override public void write(DataOutput out) throws IOException { - super.write(out); - - // state - Text.writeString(out, state.name()); - - // indices' schema - int counter = indexNameToId.size(); - out.writeInt(counter); - for (Map.Entry entry : indexNameToId.entrySet()) { - String indexName = entry.getKey(); - long indexId = entry.getValue(); - Text.writeString(out, indexName); - out.writeLong(indexId); - indexIdToMeta.get(indexId).write(out); - } - - Text.writeString(out, keysType.name()); - Text.writeString(out, partitionInfo.getType().name()); - partitionInfo.write(out); - Text.writeString(out, defaultDistributionInfo.getType().name()); - defaultDistributionInfo.write(out); - - // partitions - int partitionCount = idToPartition.size(); - out.writeInt(partitionCount); - for (Partition partition : idToPartition.values()) { - partition.write(out); - } - - // bloom filter columns - if (bfColumns == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - out.writeInt(bfColumns.size()); - for (String bfColumn : bfColumns) { - Text.writeString(out, bfColumn); - } - out.writeDouble(bfFpp); - } - - // colocateTable - if (colocateGroup == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - Text.writeString(out, colocateGroup); - } - - out.writeLong(baseIndexId); - - // write indexes - if (indexes != null) { - out.writeBoolean(true); - indexes.write(out); - } else { - out.writeBoolean(false); - } - - // tableProperty - if (tableProperty 
== null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - tableProperty.write(out); - } - - // autoIncrementGenerator - if (autoIncrementGenerator == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - autoIncrementGenerator.write(out); - } - - tempPartitions.write(out); + Text.writeString(out, GsonUtils.GSON.toJson(this)); } @Override @@ -1645,6 +1579,7 @@ public void readFields(DataInput in) throws IOException { // partition and distribution info keysType = KeysType.valueOf(Text.readString(in)); + // useless code see pr 3036 // add the correct keys type in tmp index meta for (MaterializedIndexMeta indexMeta : tmpIndexMetaList) { indexMeta.setKeysType(keysType); @@ -1733,9 +1668,56 @@ public void readFields(DataInput in) throws IOException { rebuildFullSchema(); } + @Override + public void gsonPostProcess() throws IOException { + + // HACK: the index id in MaterializedIndexMeta is not equals to the index id + // saved in OlapTable, because the table restore from snapshot is not reset + // the MaterializedIndexMeta correctly. + // for each index, reset the index id in MaterializedIndexMeta + for (Map.Entry entry : indexIdToMeta.entrySet()) { + long indexId = entry.getKey(); + MaterializedIndexMeta indexMeta = entry.getValue(); + if (indexMeta.getIndexId() != indexId) { + LOG.warn("HACK: the index id {} in materialized index meta of {} is not equals" + + " to the index saved in table {} ({}), reset it to {}", + indexMeta.getIndexId(), indexNameToId.get(indexId), name, id, indexId); + indexMeta.resetIndexIdForRestore(indexId); + } + } + + // for each idToPartition, add partition to nameToPartition + for (Partition partition : idToPartition.values()) { + nameToPartition.put(partition.getName(), partition); + } + + if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_124 + && autoIncrementGenerator != null) { + autoIncrementGenerator.setEditLog(Env.getCurrentEnv().getEditLog()); + } + if (isAutoBucket()) { + defaultDistributionInfo.markAutoBucket(); + } + RangePartitionInfo tempRangeInfo = tempPartitions.getPartitionInfo(); + if (tempRangeInfo != null) { + for (long partitionId : tempRangeInfo.getIdToItem(false).keySet()) { + this.partitionInfo.addPartition(partitionId, true, + tempRangeInfo.getItem(partitionId), tempRangeInfo.getDataProperty(partitionId), + tempRangeInfo.getReplicaAllocation(partitionId), tempRangeInfo.getIsInMemory(partitionId), + tempRangeInfo.getIsMutable(partitionId)); + } + } + tempPartitions.unsetPartitionInfo(); + + // In the present, the fullSchema could be rebuilt by schema change while the properties is changed by MV. + // After that, some properties of fullSchema and nameToColumn may be not same as properties of base columns. + // So, here we need to rebuild the fullSchema to ensure the correctness of the properties. 
+ rebuildFullSchema(); + } + public OlapTable selectiveCopy(Collection reservedPartitions, IndexExtState extState, boolean isForBackup) { - OlapTable copied = new OlapTable(); - if (!DeepCopy.copy(this, copied, OlapTable.class, FeConstants.meta_version)) { + OlapTable copied = DeepCopy.copy(this, OlapTable.class, FeConstants.meta_version); + if (copied == null) { LOG.warn("failed to copy olap table: " + getName()); return null; } @@ -1797,6 +1779,15 @@ public OlapTable selectiveCopy(Collection reservedPartitions, IndexExtSt return copied; } + public static OlapTable read(DataInput in) throws IOException { + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { + OlapTable t = new OlapTable(); + t.readFields(in); + return t; + } + return (OlapTable) GsonUtils.GSON.fromJson(Text.readString(in), OlapTable.class); + } + /* * this method is currently used for truncating table(partitions). * the new partition has new id, so we need to change all 'id-related' members diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java index 5077649de98a953..10c01f5f2dea353 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java @@ -30,6 +30,7 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TTabletType; @@ -61,9 +62,12 @@ public class PartitionInfo implements Writable { @SerializedName("Type") protected PartitionType type; // partition columns for list and range partitions + @SerializedName("pc") protected List partitionColumns = Lists.newArrayList(); // formal partition id -> partition item + @SerializedName("IdToItem") protected Map idToItem = Maps.newHashMap(); + @SerializedName("IdToTempItem") // temp partition id -> partition item protected Map idToTempItem = Maps.newHashMap(); // partition id -> data property @@ -75,6 +79,7 @@ public class PartitionInfo implements Writable { @SerializedName("IdToReplicaAllocation") protected Map idToReplicaAllocation; // true if the partition has multi partition columns + @SerializedName("isM") protected boolean isMultiColumnPartition = false; @SerializedName("IdToInMemory") @@ -357,9 +362,13 @@ public void addPartition(long partitionId, DataProperty dataProperty, } public static PartitionInfo read(DataInput in) throws IOException { - PartitionInfo partitionInfo = new PartitionInfo(); - partitionInfo.readFields(in); - return partitionInfo; + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { + PartitionInfo partitionInfo = new PartitionInfo(); + partitionInfo.readFields(in); + return partitionInfo; + } else { + return GsonUtils.GSON.fromJson(Text.readString(in), PartitionInfo.class); + } } public boolean isMultiColumnPartition() { @@ -424,40 +433,10 @@ public void resetPartitionIdForRestore( @Override public void write(DataOutput out) throws IOException { - Text.writeString(out, type.name()); - - Preconditions.checkState(idToDataProperty.size() == idToReplicaAllocation.size()); - Preconditions.checkState(idToInMemory.keySet().equals(idToReplicaAllocation.keySet())); - out.writeInt(idToDataProperty.size()); - for (Map.Entry entry : idToDataProperty.entrySet()) { - out.writeLong(entry.getKey()); - if 
(entry.getValue().equals(DataProperty.DEFAULT_HDD_DATA_PROPERTY)) { - out.writeBoolean(true); - } else { - out.writeBoolean(false); - entry.getValue().write(out); - } - - idToReplicaAllocation.get(entry.getKey()).write(out); - out.writeBoolean(idToInMemory.get(entry.getKey())); - if (Config.isCloudMode()) { - // HACK: the origin implementation of the cloud mode has code likes: - // - // out.writeBoolean(idToPersistent.get(entry.getKey())); - // - // keep the compatibility here. - out.writeBoolean(false); - } - } - int size = partitionExprs.size(); - out.writeInt(size); - for (int i = 0; i < size; ++i) { - Expr e = this.partitionExprs.get(i); - Expr.writeTo(e, out); - } - out.writeBoolean(isAutoCreatePartitions); + Text.writeString(out, GsonUtils.GSON.toJson(this)); } + @Deprecated public void readFields(DataInput in) throws IOException { type = PartitionType.valueOf(Text.readString(in)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java index 0426759fe4f1d67..446797f7e2bae2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java @@ -41,6 +41,8 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; import com.google.gson.JsonElement; import com.google.gson.JsonParseException; import com.google.gson.JsonPrimitive; @@ -524,7 +526,9 @@ public int hashCode() { return hashCode + ret; } - public static class PartitionKeySerializer implements JsonSerializer { + // added by ccr, and we have to follow. + public static class PartitionKeySerializer + implements JsonSerializer, JsonDeserializer { @Override public JsonElement serialize(PartitionKey partitionKey, java.lang.reflect.Type reflectType, JsonSerializationContext context) { @@ -596,6 +600,82 @@ public JsonElement serialize(PartitionKey partitionKey, java.lang.reflect.Type r return result; } + + @Override + public PartitionKey deserialize(JsonElement json, java.lang.reflect.Type typeOfT, + JsonDeserializationContext context) throws JsonParseException { + PartitionKey partitionKey = new PartitionKey(); + JsonArray jsonArray = json.getAsJsonArray(); + for (int i = 0; i < jsonArray.size(); i++) { + JsonArray typeAndKey = jsonArray.get(i).getAsJsonArray(); + PrimitiveType type = PrimitiveType.valueOf(typeAndKey.get(0).getAsString()); + if (type == PrimitiveType.NULL_TYPE) { + String realType = typeAndKey.get(1).getAsString(); + type = PrimitiveType.valueOf(realType); + partitionKey.types.add(type); + partitionKey.keys.add(NullLiteral.create(Type.fromPrimitiveType(type))); + continue; + } + LiteralExpr literal = null; + partitionKey.types.add(type); + if (typeAndKey.get(1).getAsString().equals("MAX_VALUE")) { + literal = MaxLiteral.MAX_VALUE; + } else { + switch (type) { + case TINYINT: + case SMALLINT: + case INT: + case BIGINT: { + long value = typeAndKey.get(1).getAsLong(); + literal = new IntLiteral(value); + } + break; + case LARGEINT: { + String value = typeAndKey.get(1).getAsString(); + try { + literal = new LargeIntLiteral(value); + } catch (AnalysisException e) { + throw new JsonParseException("LargeIntLiteral deserialize failed: " + e.getMessage()); + } + } + break; + case DATE: + case DATETIME: + case DATEV2: + case DATETIMEV2: { + String value = typeAndKey.get(1).getAsString(); + try 
{ + literal = new DateLiteral(value); + } catch (AnalysisException e) { + throw new JsonParseException("DateLiteral deserialize failed: " + e.getMessage()); + } + } + break; + case CHAR: + case VARCHAR: + case STRING: { + String value = typeAndKey.get(1).getAsString(); + literal = new StringLiteral(value); + } + break; + case BOOLEAN: { + boolean value = typeAndKey.get(1).getAsBoolean(); + literal = new BoolLiteral(value); + } + break; + default: + throw new JsonParseException( + "type[" + type.name() + "] not supported: "); + } + } + if (type != PrimitiveType.DATETIMEV2) { + literal.setType(Type.fromPrimitiveType(type)); + } + + partitionKey.keys.add(literal); + } + return partitionKey; + } } // for test diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java index caa77f660bd8fad..57c99e20a0cf535 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionInfo.java @@ -34,7 +34,6 @@ import com.google.common.collect.Range; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -210,29 +209,7 @@ public static PartitionInfo read(DataInput in) throws IOException { return partitionInfo; } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - - // partition columns - out.writeInt(partitionColumns.size()); - for (Column column : partitionColumns) { - column.write(out); - } - - out.writeInt(idToItem.size()); - for (Map.Entry entry : idToItem.entrySet()) { - out.writeLong(entry.getKey()); - entry.getValue().write(out); - } - - out.writeInt(idToTempItem.size()); - for (Map.Entry entry : idToTempItem.entrySet()) { - out.writeLong(entry.getKey()); - entry.getValue().write(out); - } - } - + @Deprecated public void readFields(DataInput in) throws IOException { super.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java index bb7ddabbaa4b7b6..34f82a00e0e06f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java @@ -23,12 +23,14 @@ import org.apache.doris.mtmv.MTMVUtil; import com.google.common.collect.Range; +import com.google.gson.annotations.SerializedName; import java.io.DataOutput; import java.io.IOException; import java.util.Optional; public class RangePartitionItem extends PartitionItem { + @SerializedName(value = "range") private Range partitionKeyRange; public static final Range DUMMY_RANGE; public static final RangePartitionItem DUMMY_ITEM; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java index 7d8368646f2b49a..e5a86816fa43183 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java @@ -28,6 +28,7 @@ import com.google.common.collect.Maps; import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -48,8 +49,11 @@ public class ScalarFunction extends Function { private static final Logger LOG = 
LogManager.getLogger(ScalarFunction.class); // The name inside the binary at location_ that contains this particular // function. e.g. org.example.MyUdf.class. + @SerializedName("sn") private String symbolName; + @SerializedName("pfs") private String prepareFnSymbol; + @SerializedName("cfs") private String closeFnSymbol; // Only used for serialization diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SinglePartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SinglePartitionInfo.java index 4a24135954324bb..56c17f5a2b3c870 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SinglePartitionInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SinglePartitionInfo.java @@ -18,7 +18,6 @@ package org.apache.doris.catalog; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; public class SinglePartitionInfo extends PartitionInfo { @@ -31,13 +30,4 @@ public static PartitionInfo read(DataInput in) throws IOException { partitionInfo.readFields(in); return partitionInfo; } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - } - - public void readFields(DataInput in) throws IOException { - super.readFields(in); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index a7667ec2e314266..77b6632fa6af216 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -30,6 +30,7 @@ import org.apache.doris.common.util.QueryableReentrantReadWriteLock; import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; @@ -64,7 +65,7 @@ /** * Internal representation of table-related metadata. A table contains several partitions. */ -public abstract class Table extends MetaObject implements Writable, TableIf { +public abstract class Table extends MetaObject implements Writable, TableIf, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(Table.class); // empirical value. 
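Editor's note: making Table implement GsonPostProcessable is what lets the Gson read path rebuild derived, never-persisted state (the name-to-column map and the compound-key flags in gsonPostProcess below, and the rebuilt caches in TableProperty.gsonPostProcess later in this patch) after fromJson. The patch does not show how the hook is invoked, so the following is only a hypothetical, self-contained sketch of that wiring; PostProcessable, PostProcessFactory and TableLike are made-up names, not Doris classes.

    // Hypothetical sketch only: a delegating TypeAdapterFactory that runs a post-read hook,
    // mimicking the role GsonPostProcessable plays in this patch.
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.google.gson.TypeAdapter;
    import com.google.gson.TypeAdapterFactory;
    import com.google.gson.reflect.TypeToken;
    import com.google.gson.stream.JsonReader;
    import com.google.gson.stream.JsonWriter;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    interface PostProcessable {
        void postProcess();
    }

    // After Gson materializes an object, invoke its hook so the object can rebuild
    // transient indexes that are never written to the metadata image.
    class PostProcessFactory implements TypeAdapterFactory {
        @Override
        public <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) {
            TypeAdapter<T> delegate = gson.getDelegateAdapter(this, type);
            return new TypeAdapter<T>() {
                @Override
                public void write(JsonWriter out, T value) throws IOException {
                    delegate.write(out, value);
                }

                @Override
                public T read(JsonReader in) throws IOException {
                    T obj = delegate.read(in);
                    if (obj instanceof PostProcessable) {
                        ((PostProcessable) obj).postProcess();
                    }
                    return obj;
                }
            };
        }
    }

    class TableLike implements PostProcessable {
        Map<String, Long> columnIdByName = new HashMap<>();           // persisted
        transient Map<Long, String> columnNameById = new HashMap<>(); // derived, rebuilt on read

        @Override
        public void postProcess() {
            columnIdByName.forEach((name, id) -> columnNameById.put(id, name));
        }

        public static void main(String[] args) {
            Gson gson = new GsonBuilder().registerTypeAdapterFactory(new PostProcessFactory()).create();
            TableLike t = new TableLike();
            t.columnIdByName.put("k1", 10L);
            TableLike replayed = gson.fromJson(gson.toJson(t), TableLike.class);
            System.out.println(replayed.columnNameById); // {10=k1}
        }
    }

Rebuilding derived lookups in a post-read hook keeps the persisted JSON small and avoids serializing state that can always be recomputed from fullSchema.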
@@ -84,7 +85,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf { @SerializedName(value = "createTime") protected long createTime; protected QueryableReentrantReadWriteLock rwLock; - // Used for queuing commit transaction tasks to avoid fdb transaction conflicts, + // Used for queuing commit transaction tasks to avoid fdb transaction conflicts, // especially to reduce conflicts when obtaining delete bitmap update locks for // MoW table protected ReentrantLock commitLock; @@ -441,59 +442,61 @@ public TTableDescriptor toThrift() { } public static Table read(DataInput in) throws IOException { - Table table = null; - TableType type = TableType.valueOf(Text.readString(in)); - if (type == TableType.OLAP) { - table = new OlapTable(); - } else if (type == TableType.MATERIALIZED_VIEW) { - table = new MTMV(); - } else if (type == TableType.ODBC) { - table = new OdbcTable(); - } else if (type == TableType.MYSQL) { - table = new MysqlTable(); - } else if (type == TableType.VIEW) { - table = new View(); - } else if (type == TableType.BROKER) { - table = new BrokerTable(); - } else if (type == TableType.ELASTICSEARCH) { - table = new EsTable(); - } else if (type == TableType.HIVE) { - table = new HiveTable(); - } else if (type == TableType.JDBC) { - table = new JdbcTable(); + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { + Table table = null; + TableType type = TableType.valueOf(Text.readString(in)); + if (type == TableType.OLAP) { + table = new OlapTable(); + } else if (type == TableType.MATERIALIZED_VIEW) { + table = new MTMV(); + } else if (type == TableType.ODBC) { + table = new OdbcTable(); + } else if (type == TableType.MYSQL) { + table = new MysqlTable(); + } else if (type == TableType.VIEW) { + table = new View(); + } else if (type == TableType.BROKER) { + table = new BrokerTable(); + } else if (type == TableType.ELASTICSEARCH) { + table = new EsTable(); + } else if (type == TableType.HIVE) { + table = new HiveTable(); + } else if (type == TableType.JDBC) { + table = new JdbcTable(); + } else { + throw new IOException("Unknown table type: " + type.name()); + } + + table.setTypeRead(true); + table.readFields(in); + return table; } else { - throw new IOException("Unknown table type: " + type.name()); + return GsonUtils.GSON.fromJson(Text.readString(in), Table.class); } - - table.setTypeRead(true); - table.readFields(in); - return table; } @Override - public void write(DataOutput out) throws IOException { - // ATTN: must write type first - Text.writeString(out, type.name()); - - // write last check time - super.write(out); - - out.writeLong(id); - Text.writeString(out, name); + public void gsonPostProcess() throws IOException { + List<Column> keys = Lists.newArrayList(); - // base schema - int columnCount = fullSchema.size(); - out.writeInt(columnCount); for (Column column : fullSchema) { - column.write(out); + if (column.isKey()) { + keys.add(column); + } + this.nameToColumn.put(column.getName(), column); } - Text.writeString(out, comment); - // write table attributes - Text.writeString(out, GsonUtils.GSON.toJson(tableAttributes)); - // write create time - out.writeLong(createTime); + if (keys.size() > 1) { + keys.forEach(key -> key.setCompoundKey(true)); + hasCompoundKey = true; + } + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, GsonUtils.GSON.toJson(this)); } + @Deprecated public void readFields(DataInput in) throws IOException { if (!isTypeRead) { type = TableType.valueOf(Text.readString(in)); diff 
--git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index 479251ab9d99819..808b63fadc8001d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -24,6 +24,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.persist.OperationType; +import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TCompressionType; import org.apache.doris.thrift.TInvertedIndexStorageFormat; @@ -54,7 +55,7 @@ * Different properties is recognized by prefix such as dynamic_partition * If there is different type properties is added, write a method such as buildDynamicProperty to build it. */ -public class TableProperty implements Writable { +public class TableProperty implements Writable, GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(TableProperty.class); @SerializedName(value = "properties") @@ -627,46 +628,50 @@ public void write(DataOutput out) throws IOException { } public static TableProperty read(DataInput in) throws IOException { - TableProperty tableProperty = GsonUtils.GSON.fromJson(Text.readString(in), TableProperty.class) - .executeBuildDynamicProperty() - .buildInMemory() - .buildMinLoadReplicaNum() - .buildStorageMedium() - .buildStorageFormat() - .buildInvertedIndexStorageFormat() - .buildDataSortInfo() - .buildCompressionType() - .buildStoragePolicy() - .buildIsBeingSynced() - .buildBinlogConfig() - .buildEnableLightSchemaChange() - .buildStoreRowColumn() - .buildRowStoreColumns() - .buildSkipWriteIndexOnLoad() - .buildCompactionPolicy() - .buildTimeSeriesCompactionGoalSizeMbytes() - .buildTimeSeriesCompactionFileCountThreshold() - .buildTimeSeriesCompactionTimeThresholdSeconds() - .buildDisableAutoCompaction() - .buildEnableSingleReplicaCompaction() - .buildTimeSeriesCompactionEmptyRowsetsThreshold() - .buildTimeSeriesCompactionLevelThreshold() - .buildTTLSeconds(); + TableProperty tableProperty = GsonUtils.GSON.fromJson(Text.readString(in), TableProperty.class); + return tableProperty; + } + + public void gsonPostProcess() throws IOException { + executeBuildDynamicProperty(); + buildInMemory(); + buildMinLoadReplicaNum(); + buildStorageMedium(); + buildStorageFormat(); + buildInvertedIndexStorageFormat(); + buildDataSortInfo(); + buildCompressionType(); + buildStoragePolicy(); + buildIsBeingSynced(); + buildBinlogConfig(); + buildEnableLightSchemaChange(); + buildStoreRowColumn(); + buildRowStoreColumns(); + buildSkipWriteIndexOnLoad(); + buildCompactionPolicy(); + buildTimeSeriesCompactionGoalSizeMbytes(); + buildTimeSeriesCompactionFileCountThreshold(); + buildTimeSeriesCompactionTimeThresholdSeconds(); + buildDisableAutoCompaction(); + buildEnableSingleReplicaCompaction(); + buildTimeSeriesCompactionEmptyRowsetsThreshold(); + buildTimeSeriesCompactionLevelThreshold(); + buildTTLSeconds(); + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) { // get replica num from property map and create replica allocation - String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM); + String repNum = properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM); if (!Strings.isNullOrEmpty(repNum)) { ReplicaAllocation replicaAlloc = new ReplicaAllocation(Short.valueOf(repNum)); - 
tableProperty.properties.put("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, + properties.put("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, replicaAlloc.toCreateStmt()); } else { - tableProperty.properties.put("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, + properties.put("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, ReplicaAllocation.DEFAULT_ALLOCATION.toCreateStmt()); } } - tableProperty.removeDuplicateReplicaNumProperty(); - tableProperty.buildReplicaAllocation(); - return tableProperty; + removeDuplicateReplicaNumProperty(); + buildReplicaAllocation(); } // For some historical reason, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/View.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/View.java index 612d29a995e49dc..4d2558f8748ab8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/View.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/View.java @@ -22,20 +22,23 @@ import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.common.FeConstants; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.UserException; import org.apache.doris.common.io.DeepCopy; import org.apache.doris.common.io.Text; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.Util; +import org.apache.doris.persist.gson.GsonPostProcessable; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.gson.annotations.SerializedName; import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.io.StringReader; import java.lang.ref.SoftReference; @@ -49,7 +52,7 @@ * Refreshing or invalidating a view will reload the view's definition but will not * affect the metadata of the underlying tables (if any). */ -public class View extends Table { +public class View extends Table implements GsonPostProcessable { private static final Logger LOG = LogManager.getLogger(View.class); // The original SQL-string given as view definition. Set during analysis. @@ -69,6 +72,7 @@ public class View extends Table { // // Corresponds to Hive's viewExpandedText, but is not identical to the SQL // Hive would produce in view creation. 
+ @SerializedName("ivd") private String inlineViewDef; // for persist @@ -228,8 +232,8 @@ public String getSignature(int signatureVersion) { @Override public View clone() { - View copied = new View(); - if (!DeepCopy.copy(this, copied, View.class, FeConstants.meta_version)) { + View copied = DeepCopy.copy(this, View.class, FeConstants.meta_version); + if (copied == null) { LOG.warn("failed to copy view: " + getName()); return null; } @@ -237,17 +241,25 @@ public View clone() { return copied; } + public static View read(DataInput in) throws IOException { + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { + View t = new View(); + t.readFields(in); + return t; + } + return GsonUtils.GSON.fromJson(Text.readString(in), View.class); + } + public void resetIdsForRestore(Env env) { id = env.getNextId(); } @Override - public void write(DataOutput out) throws IOException { - super.write(out); - Text.writeString(out, originalViewDef); - Text.writeString(out, inlineViewDef); + public void gsonPostProcess() throws IOException { + originalViewDef = ""; } + @Deprecated public void readFields(DataInput in) throws IOException { super.readFields(in); // just do not want to modify the meta version, so leave originalViewDef here but set it as empty diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyJob.java index 35574561a52ba5c..e54c5a66a506694 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/load/CopyJob.java @@ -48,13 +48,13 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; import com.google.gson.reflect.TypeToken; import lombok.Getter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -80,11 +80,15 @@ public class CopyJob extends CloudBrokerLoadJob { private String pattern; @Getter private ObjectInfo objectInfo; + + @SerializedName("cid") @Getter private String copyId; @Getter private boolean forceCopy; + @SerializedName("lfp") private String loadFilePaths = ""; + @SerializedName("pty") private Map properties = new HashMap<>(); private volatile boolean abortedCopy = false; private boolean isReplay = false; @@ -232,15 +236,6 @@ protected LoadJobFinalOperation getLoadJobFinalOperation() { finishTimestamp, state, failMsg, copyId, loadFilePaths, properties); } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - Text.writeString(out, copyId); - Text.writeString(out, loadFilePaths); - Gson gson = new Gson(); - Text.writeString(out, properties == null ? "" : gson.toJson(properties)); - } - @Override public void readFields(DataInput in) throws IOException { super.readFields(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java index 19e2ac6b6bbd35b..b36bc5eb022ffec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/PartitionsProcDir.java @@ -240,7 +240,7 @@ private List> getPartitionInfos() throws AnalysisException { .map(Map.Entry::getKey).collect(Collectors.toList()); } else { Collection partitions = isTempPartition - ? 
olapTable.getTempPartitions() : olapTable.getPartitions(); + ? olapTable.getAllTempPartitions() : olapTable.getPartitions(); partitionIds = partitions.stream().map(Partition::getId).collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/RangeUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/RangeUtils.java index 70ee1a3d6845153..3e32c2570f27c6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/RangeUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/RangeUtils.java @@ -27,6 +27,8 @@ import com.google.common.collect.RangeMap; import com.google.common.collect.TreeRangeMap; import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; import com.google.gson.JsonElement; import com.google.gson.JsonNull; import com.google.gson.JsonObject; @@ -225,7 +227,8 @@ public static void checkRangeConflict(List baseRanges, } } - public static class RangeSerializer implements JsonSerializer> { + public static class RangeSerializer implements + JsonSerializer>, JsonDeserializer> { @Override public JsonElement serialize(Range range, Type type, JsonSerializationContext context) { JsonArray result = new JsonArray(); @@ -254,5 +257,37 @@ public JsonElement serialize(Range range, Type type, JsonSerializa return result; } + + @Override + public Range deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) { + JsonArray jsonArray = json.getAsJsonArray(); + PartitionKey lowerBound = null; + BoundType lowerBoundType = BoundType.CLOSED; + PartitionKey upperBound = null; + BoundType upperBoundType = BoundType.CLOSED; + + if (!jsonArray.get(0).isJsonNull()) { + JsonObject lowerBoundObject = jsonArray.get(0).getAsJsonObject(); + lowerBoundType = BoundType.valueOf(lowerBoundObject.get("type").getAsString()); + lowerBound = context.deserialize(lowerBoundObject.get("value"), PartitionKey.class); + } + + if (!jsonArray.get(1).isJsonNull()) { + JsonObject upperBoundObject = jsonArray.get(1).getAsJsonObject(); + upperBoundType = BoundType.valueOf(upperBoundObject.get("type").getAsString()); + upperBound = context.deserialize(upperBoundObject.get("value"), PartitionKey.class); + } + + if (lowerBound == null && upperBound == null) { + return null; + } + if (lowerBound == null) { + return Range.upTo(upperBound, upperBoundType); + } + if (upperBound == null) { + return Range.downTo(lowerBound, lowerBoundType); + } + return Range.range(lowerBound, lowerBoundType, upperBound, upperBoundType); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/SmallFileMgr.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/SmallFileMgr.java index a6552d3ace7da9e..0fc63a8e3274760 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/SmallFileMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/SmallFileMgr.java @@ -24,14 +24,17 @@ import org.apache.doris.cloud.security.SecurityChecker; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import com.google.common.base.Strings; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Table; +import com.google.gson.annotations.SerializedName; import 
org.apache.commons.codec.binary.Hex; import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; @@ -61,13 +64,21 @@ public class SmallFileMgr implements Writable { public static final Logger LOG = LogManager.getLogger(SmallFileMgr.class); public static class SmallFile implements Writable { + @SerializedName("dbid") public long dbId; + @SerializedName("log") public String catalog; + @SerializedName("n") public String name; + @SerializedName("id") public long id; + @SerializedName("ctn") public String content; + @SerializedName("s") public long size; + @SerializedName("md5") public String md5; + @SerializedName("isC") public boolean isContent; private SmallFile() { @@ -87,9 +98,13 @@ public SmallFile(Long dbId, String catalogName, String fileName, Long id, String } public static SmallFile read(DataInput in) throws IOException { - SmallFile smallFile = new SmallFile(); - smallFile.readFields(in); - return smallFile; + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) { + SmallFile smallFile = new SmallFile(); + smallFile.readFields(in); + return smallFile; + } else { + return GsonUtils.GSON.fromJson(Text.readString(in), SmallFile.class); + } } public byte[] getContentBytes() { @@ -101,14 +116,7 @@ public byte[] getContentBytes() { @Override public void write(DataOutput out) throws IOException { - out.writeLong(dbId); - Text.writeString(out, catalog); - Text.writeString(out, name); - out.writeLong(id); - Text.writeString(out, content); - out.writeLong(size); - Text.writeString(out, md5); - out.writeBoolean(isContent); + Text.writeString(out, GsonUtils.GSON.toJson(this)); } public void readFields(DataInput in) throws IOException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/URI.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/URI.java index 5e064bf91eacfb1..71ba4a069d88e6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/URI.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/URI.java @@ -20,6 +20,7 @@ import org.apache.doris.common.AnalysisException; import com.google.common.base.Strings; +import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; import java.util.TreeMap; @@ -35,6 +36,7 @@ public class URI { private static final String QUERY_ITEM_DELIM = "&"; private static final String QUERY_KV_DELIM = "="; + @SerializedName("l") private final String location; private String scheme; private String authority; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 57ea0e7c8b6f831..5d78b3252c015d1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -214,8 +214,8 @@ public class InternalCatalog implements CatalogIf { private static final Logger LOG = LogManager.getLogger(InternalCatalog.class); private QueryableReentrantLock lock = new QueryableReentrantLock(true); - private ConcurrentHashMap idToDb = new ConcurrentHashMap<>(); - private ConcurrentHashMap fullNameToDb = new ConcurrentHashMap<>(); + private transient ConcurrentHashMap idToDb = new ConcurrentHashMap<>(); + private transient ConcurrentHashMap fullNameToDb = new ConcurrentHashMap<>(); // Add transient to fix gson issue. 
@Getter @@ -3283,7 +3283,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } // if table currently has no partitions, this sql like empty command and do nothing, should return directly. // but if truncate whole table, the temporary partitions also need drop - if (origPartitions.isEmpty() && (!truncateEntireTable || olapTable.getTempPartitions().isEmpty())) { + if (origPartitions.isEmpty() && (!truncateEntireTable || olapTable.getAllTempPartitions().isEmpty())) { LOG.info("finished to truncate table {}, no partition contains data, do nothing", tblRef.getName().toSql()); return; @@ -3460,7 +3460,7 @@ private List truncateTableInternal(OlapTable olapTable, List oldPartitionsIds = oldPartitions.stream().map(Partition::getId).collect(Collectors.toSet()); - for (Partition partition : olapTable.getTempPartitions()) { + for (Partition partition : olapTable.getAllTempPartitions()) { if (!oldPartitionsIds.contains(partition.getId())) { oldPartitions.add(partition); } @@ -3576,8 +3576,7 @@ public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlEx int dbCount = dis.readInt(); long newChecksum = checksum ^ dbCount; for (long i = 0; i < dbCount; ++i) { - Database db = new Database(); - db.readFields(dis); + Database db = Database.read(dis); newChecksum ^= db.getId(); Database dbPrev = fullNameToDb.get(db.getFullName()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 72b3a96c4eb9be7..d133676b9063505 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -45,6 +45,7 @@ import org.apache.doris.load.BrokerFileGroupAggInfo; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.plugin.audit.AuditEvent; import org.apache.doris.plugin.audit.LoadAuditEvent; import org.apache.doris.qe.ConnectContext; @@ -58,6 +59,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -81,10 +83,12 @@ public abstract class BulkLoadJob extends LoadJob { private static final Logger LOG = LogManager.getLogger(BulkLoadJob.class); // input params + @SerializedName(value = "bd") protected BrokerDesc brokerDesc; // this param is used to persist the expr of columns // the origin stmt is persisted instead of columns expr // the expr of columns will be reanalyzed when the log is replayed + @SerializedName(value = "os") private OriginStatement originStmt; // include broker desc and data desc @@ -93,6 +97,7 @@ public abstract class BulkLoadJob extends LoadJob { // sessionVariable's name -> sessionVariable's value // we persist these sessionVariables due to the session is not available when replaying the job. 
+ @SerializedName(value = "svs") protected Map sessionVariables = Maps.newHashMap(); public BulkLoadJob(EtlJobType jobType) { @@ -308,23 +313,16 @@ protected void replayTxnAttachment(TransactionState txnState) { unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment()); } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - brokerDesc.write(out); - originStmt.write(out); - - out.writeInt(sessionVariables.size()); - for (Map.Entry entry : sessionVariables.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue()); - } - } - public OriginStatement getOriginStmt() { return this.originStmt; } + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } + public void readFields(DataInput in) throws IOException { super.readFields(in); brokerDesc = BrokerDesc.read(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java index 19814036ec40a06..5ee63f1fd7455b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/InsertLoadJob.java @@ -31,9 +31,9 @@ import com.google.common.base.Strings; import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.util.Set; @@ -43,7 +43,7 @@ * The state of insert load job is always finished, so it will never be scheduled by JobScheduler. */ public class InsertLoadJob extends LoadJob { - + @SerializedName("tid") private long tableId; // only for log replay @@ -116,12 +116,7 @@ public Set getTableNames() throws MetaNotFoundException { } @Override - public void write(DataOutput out) throws IOException { - super.write(out); - out.writeLong(tableId); - } - - public void readFields(DataInput in) throws IOException { + protected void readFields(DataInput in) throws IOException { super.readFields(in); tableId = in.readLong(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index f28394957bd7768..5f06f6317712480 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -90,20 +90,30 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements public static final String DPP_ABNORMAL_ALL = "dpp.abnorm.ALL"; public static final String UNSELECTED_ROWS = "unselected.rows"; + @SerializedName("id") protected long id; // input params + @SerializedName("did") protected long dbId; + @SerializedName("lb") protected String label; + @SerializedName("st") protected JobState state = JobState.PENDING; protected EtlJobType jobType; // the auth info could be null when load job is created before commit named 'Persist auth info in load job' + @SerializedName("ai") protected AuthorizationInfo authorizationInfo; + @SerializedName("ct") protected long createTimestamp = System.currentTimeMillis(); + @SerializedName("lst") protected long loadStartTimestamp = -1; + @SerializedName("ft") protected long finishTimestamp = -1; + @SerializedName("txid") protected long transactionId; + @SerializedName("fm") protected FailMsg failMsg; protected Map idToTasks = 
Maps.newConcurrentMap(); protected Set finishedTaskIds = Sets.newHashSet(); @@ -127,6 +137,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected LoadStatistic loadStatistic = new LoadStatistic(); // This map is used to save job property. + @SerializedName("jp") private Map jobProperties = Maps.newHashMap(); // only for persistence param. see readFields() for usage @@ -134,8 +145,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements protected List errorTabletInfos = Lists.newArrayList(); + @SerializedName("ui") protected UserIdentity userInfo = UserIdentity.UNKNOWN; + @SerializedName("cmt") protected String comment = ""; protected List tWorkloadGroups = null; @@ -831,6 +844,10 @@ public void getJobInfo(Load.JobInfo jobInfo) throws DdlException { } public static LoadJob read(DataInput in) throws IOException { + if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_136) { + return GsonUtils.GSON.fromJson(Text.readString(in), LoadJob.class); + } + LoadJob job = null; EtlJobType type = EtlJobType.valueOf(Text.readString(in)); if (type == EtlJobType.BROKER) { @@ -1014,47 +1031,10 @@ public boolean equals(Object obj) { @Override public void write(DataOutput out) throws IOException { - // Add the type of load secondly - Text.writeString(out, jobType.name()); - - out.writeLong(id); - out.writeLong(dbId); - Text.writeString(out, label); - Text.writeString(out, state.name()); - out.writeLong(createTimestamp); - out.writeLong(loadStartTimestamp); - out.writeLong(finishTimestamp); - if (failMsg == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - failMsg.write(out); - } - out.writeInt(progress); - loadingStatus.write(out); - out.writeLong(transactionId); - if (authorizationInfo == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - authorizationInfo.write(out); - } - - out.writeInt(this.jobProperties.size()); - for (Map.Entry entry : jobProperties.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, String.valueOf(entry.getValue())); - } - if (userInfo == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - userInfo.write(out); - } - Text.writeString(out, comment); + Text.writeString(out, GsonUtils.GSON.toJson(this)); } - public void readFields(DataInput in) throws IOException { + protected void readFields(DataInput in) throws IOException { if (!isJobTypeRead) { jobType = EtlJobType.valueOf(Text.readString(in)); @@ -1202,11 +1182,19 @@ protected String getTimeZone() { } public int getLoadParallelism() { - return (int) jobProperties.get(LoadStmt.LOAD_PARALLELISM); + if (jobProperties.get(LoadStmt.LOAD_PARALLELISM).getClass() == Integer.class) { + return (int) jobProperties.get(LoadStmt.LOAD_PARALLELISM); + } else { + return ((Long) jobProperties.get(LoadStmt.LOAD_PARALLELISM)).intValue(); + } } public int getSendBatchParallelism() { - return (int) jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM); + if (jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM).getClass() == Integer.class) { + return (int) jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM); + } else { + return ((Long) jobProperties.get(LoadStmt.SEND_BATCH_PARALLELISM)).intValue(); + } } public LoadTask.Priority getPriority() { @@ -1231,6 +1219,8 @@ public boolean isExpired(long currentTimeMs) { expireTime = Config.streaming_label_keep_max_second; } + LOG.info("state {}, expireTime {}, currentTimeMs {}, finishTimestamp {}", + state, expireTime, currentTimeMs, 
getFinishTimestamp()); return (currentTimeMs - getFinishTimestamp()) / 1000 > expireTime; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 9d7c0f3bc778378..1026689d8f1448f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -933,8 +933,10 @@ public void write(DataOutput out) throws IOException { idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)) .filter(t -> !(t instanceof MiniLoadJob)).collect(Collectors.toList()); + LOG.info("write load job size: {}", loadJobs.size()); out.writeInt(loadJobs.size()); for (LoadJob loadJob : loadJobs) { + LOG.info("write load job: {}", loadJob.getId()); loadJob.write(out); } } @@ -945,6 +947,7 @@ public void write(DataOutput out) throws IOException { public void readFields(DataInput in) throws IOException { long currentTimeMs = System.currentTimeMillis(); int size = in.readInt(); + LOG.info("load job num {} ", size); for (int i = 0; i < size; i++) { LoadJob loadJob = LoadJob.read(in); if (loadJob.isExpired(currentTimeMs)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index 514a38ccb51bb6f..fdc0b45c46b2564 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -23,9 +23,11 @@ import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.io.Text; import org.apache.doris.load.EtlJobType; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.transaction.TransactionState; import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,6 +40,7 @@ public class MiniLoadJob extends LoadJob { private static final Logger LOG = LogManager.getLogger(MiniLoadJob.class); + @SerializedName("tn") private String tableName; private long tableId; @@ -67,8 +70,8 @@ protected void replayTxnAttachment(TransactionState txnState) { @Override public void write(DataOutput out) throws IOException { - super.write(out); - Text.writeString(out, tableName); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java index 85a923d19faefde..a6327ff02a934dc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java @@ -49,7 +49,7 @@ public class SparkLoadAppHandle implements Writable { @SerializedName("startTime") private long startTime; @SerializedName("finalStatus") - private FinalApplicationStatus finalStatus; + private String finalStatus; @SerializedName("trackingUrl") private String trackingUrl; @SerializedName("user") @@ -137,7 +137,7 @@ public long getStartTime() { } public FinalApplicationStatus getFinalStatus() { - return this.finalStatus; + return FinalApplicationStatus.valueOf(this.finalStatus); } public String getUrl() { @@ -177,7 +177,7 @@ public void setStartTime(long startTime) { } public void setFinalStatus(FinalApplicationStatus status) { - 
this.finalStatus = status; + this.finalStatus = status.toString(); this.fireEvent(true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 1d1dc426fda72b1..1f1c71d7a903d2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -100,7 +100,6 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; -import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -122,20 +121,26 @@ public class SparkLoadJob extends BulkLoadJob { // --- members below need persist --- // create from resourceDesc when job created + @SerializedName(value = "sr") private SparkResource sparkResource; // members below updated when job state changed to etl + @SerializedName(value = "est") private long etlStartTimestamp = -1; // for spark yarn + @SerializedName(value = "appid") private String appId = ""; // spark job outputPath + @SerializedName(value = "etlop") private String etlOutputPath = ""; // members below updated when job state changed to loading // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, etlFileSize) } + @SerializedName(value = "tm2fi") private Map> tabletMetaToFileInfo = Maps.newHashMap(); // --- members below not persist --- private ResourceDesc resourceDesc; // for spark standalone + @SerializedName(value = "slah") private SparkLoadAppHandle sparkLoadAppHandle = new SparkLoadAppHandle(); // for straggler wait long time to commit transaction private long quorumFinishTimestamp = -1; @@ -782,22 +787,6 @@ public void clearSparkLauncherLog() { } } - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - sparkResource.write(out); - sparkLoadAppHandle.write(out); - out.writeLong(etlStartTimestamp); - Text.writeString(out, appId); - Text.writeString(out, etlOutputPath); - out.writeInt(tabletMetaToFileInfo.size()); - for (Map.Entry> entry : tabletMetaToFileInfo.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue().first); - out.writeLong(entry.getValue().second); - } - } - public void readFields(DataInput in) throws IOException { super.readFields(in); sparkResource = (SparkResource) Resource.read(in); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index 79775a2ef24663a..cf34404d94768cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -80,6 +80,7 @@ import org.apache.doris.catalog.JdbcResource; import org.apache.doris.catalog.JdbcTable; import org.apache.doris.catalog.ListPartitionInfo; +import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.MapType; import org.apache.doris.catalog.MultiRowType; @@ -90,9 +91,11 @@ import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.RandomDistributionInfo; import org.apache.doris.catalog.RangePartitionInfo; +import org.apache.doris.catalog.RangePartitionItem; import org.apache.doris.catalog.Replica; import 
org.apache.doris.catalog.Resource; import org.apache.doris.catalog.S3Resource; @@ -114,6 +117,8 @@ import org.apache.doris.cloud.catalog.CloudReplica; import org.apache.doris.cloud.catalog.CloudTablet; import org.apache.doris.cloud.datasource.CloudInternalCatalog; +import org.apache.doris.cloud.load.CloudBrokerLoadJob; +import org.apache.doris.cloud.load.CopyJob; import org.apache.doris.common.Config; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.util.RangeUtils; @@ -168,9 +173,15 @@ import org.apache.doris.fs.remote.dfs.OFSFileSystem; import org.apache.doris.job.extensions.insert.InsertJob; import org.apache.doris.job.extensions.mtmv.MTMVJob; +import org.apache.doris.load.loadv2.BrokerLoadJob; +import org.apache.doris.load.loadv2.BulkLoadJob; +import org.apache.doris.load.loadv2.InsertLoadJob; +import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo; import org.apache.doris.load.loadv2.LoadJobFinalOperation; +import org.apache.doris.load.loadv2.MiniLoadJob; import org.apache.doris.load.loadv2.MiniLoadTxnCommitAttachment; +import org.apache.doris.load.loadv2.SparkLoadJob; import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo; import org.apache.doris.load.routineload.AbstractDataSourceProperties; import org.apache.doris.load.routineload.KafkaProgress; @@ -215,6 +226,7 @@ import com.google.gson.JsonSerializationContext; import com.google.gson.JsonSerializer; import com.google.gson.ReflectionAccessFilter; +import com.google.gson.ToNumberPolicy; import com.google.gson.TypeAdapter; import com.google.gson.TypeAdapterFactory; import com.google.gson.annotations.SerializedName; @@ -252,7 +264,6 @@ * See the following "GuavaTableAdapter" and "GuavaMultimapAdapter" for example. */ public class GsonUtils { - // runtime adapter for class "Type" private static RuntimeTypeAdapterFactory columnTypeAdapterFactory = RuntimeTypeAdapterFactory @@ -531,9 +542,26 @@ public class GsonUtils { .registerSubtype(BackupJob.class, BackupJob.class.getSimpleName()) .registerSubtype(RestoreJob.class, RestoreJob.class.getSimpleName()); + private static RuntimeTypeAdapterFactory loadJobTypeAdapterFactory + = RuntimeTypeAdapterFactory.of(LoadJob.class, "clazz") + .registerSubtype(BrokerLoadJob.class, BrokerLoadJob.class.getSimpleName()) + .registerSubtype(BulkLoadJob.class, BulkLoadJob.class.getSimpleName()) + .registerSubtype(CloudBrokerLoadJob.class, CloudBrokerLoadJob.class.getSimpleName()) + .registerSubtype(CopyJob.class, CopyJob.class.getSimpleName()) + .registerSubtype(InsertLoadJob.class, InsertLoadJob.class.getSimpleName()) + .registerSubtype(MiniLoadJob.class, MiniLoadJob.class.getSimpleName()) + .registerSubtype(SparkLoadJob.class, SparkLoadJob.class.getSimpleName()); + + private static RuntimeTypeAdapterFactory partitionItemTypeAdapterFactory + = RuntimeTypeAdapterFactory.of(PartitionItem.class, "clazz") + .registerSubtype(ListPartitionItem.class, ListPartitionItem.class.getSimpleName()) + .registerSubtype(RangePartitionItem.class, RangePartitionItem.class.getSimpleName()); + // the builder of GSON instance. // Add any other adapters if necessary. 
- private static final GsonBuilder GSON_BUILDER = new GsonBuilder().addSerializationExclusionStrategy( + private static final GsonBuilder GSON_BUILDER = new GsonBuilder() + .setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE) + .addSerializationExclusionStrategy( new HiddenAnnotationExclusionStrategy()).enableComplexMapKeySerialization() .addReflectionAccessFilter(ReflectionAccessFilter.BLOCK_INACCESSIBLE_JAVA) .registerTypeHierarchyAdapter(Table.class, new GuavaTableAdapter()) @@ -563,6 +591,8 @@ public class GsonUtils { .registerTypeAdapterFactory(routineLoadTypeAdapterFactory) .registerTypeAdapterFactory(remoteFileSystemTypeAdapterFactory) .registerTypeAdapterFactory(jobBackupTypeAdapterFactory) + .registerTypeAdapterFactory(loadJobTypeAdapterFactory) + .registerTypeAdapterFactory(partitionItemTypeAdapterFactory) .registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer()) .registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter()) .registerTypeAdapter(PartitionKey.class, new PartitionKey.PartitionKeySerializer()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/RuntimeTypeAdapterFactory.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/RuntimeTypeAdapterFactory.java index ee780f0de1b95e7..ef44fccbfb6a735 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/RuntimeTypeAdapterFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/RuntimeTypeAdapterFactory.java @@ -183,6 +183,7 @@ public final class RuntimeTypeAdapterFactory implements TypeAdapterFactory { private final String typeFieldName; private final Map> labelToSubtype = new LinkedHashMap>(); private final Map, String> subtypeToLabel = new LinkedHashMap, String>(); + private final boolean maintainType; private Class defaultType = null; diff --git a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java index bb9cbe0d717e184..177522638eaeb62 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/alter/AlterTest.java @@ -446,7 +446,7 @@ public void testConflictAlterOperations() throws Exception { // add temp partition when dynamic partition is enable stmt = "alter table test.tbl1 add temporary partition tp3 values less than('2020-04-01') distributed by hash(k2) buckets 4 PROPERTIES ('replication_num' = '1')"; alterTable(stmt, false); - Assert.assertEquals(1, tbl.getTempPartitions().size()); + Assert.assertEquals(1, tbl.getAllTempPartitions().size()); // disable the dynamic partition stmt = "alter table test.tbl1 set ('dynamic_partition.enable' = 'false')"; @@ -547,7 +547,7 @@ public void testAlterDateV2Operations() throws Exception { stmt = "alter table test.tbl6 add temporary partition tp3 values less than('2020-04-01 00:00:00') distributed" + " by hash(k2) buckets 4 PROPERTIES ('replication_num' = '1')"; alterTable(stmt, false); - Assert.assertEquals(1, tbl.getTempPartitions().size()); + Assert.assertEquals(1, tbl.getAllTempPartitions().size()); // disable the dynamic partition stmt = "alter table test.tbl6 set ('dynamic_partition.enable' = 'false')"; diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java index b5ee8331c595415..c70ea93d5e48b63 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DatabaseTest.java @@ -243,12 
+243,10 @@ public void testSerialization() throws Exception { // 2. Read objects from file DataInputStream dis = new DataInputStream(Files.newInputStream(path)); - Database rDb1 = new Database(); - rDb1.readFields(dis); + Database rDb1 = Database.read(dis); Assert.assertEquals(rDb1, db1); - Database rDb2 = new Database(); - rDb2.readFields(dis); + Database rDb2 = Database.read(dis); Assert.assertEquals(rDb2, db2); // 3. delete files diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/RangePartitionInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/RangePartitionInfoTest.java index 2db7338b8e0bc0b..e1112c8cb383c7b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/RangePartitionInfoTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/RangePartitionInfoTest.java @@ -25,9 +25,15 @@ import org.apache.doris.common.DdlException; import com.google.common.collect.Lists; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -442,4 +448,28 @@ public void testFixedRange8() throws DdlException, AnalysisException { partitionInfo.handleNewSinglePartitionDesc(singlePartitionDesc, partitionId++, false); } } + + @Test + public void testSerialization() throws IOException, AnalysisException, DdlException { + // 1. Write objects to file + final Path path = Files.createTempFile("rangePartitionInfo", "tmp"); + DataOutputStream out = new DataOutputStream(Files.newOutputStream(path)); + + partitionInfo = new RangePartitionInfo(partitionColumns); + + partitionInfo.write(out); + out.flush(); + out.close(); + + // 2. Read objects from file + DataInputStream in = new DataInputStream(Files.newInputStream(path)); + + RangePartitionInfo partitionInfo2 = (RangePartitionInfo) PartitionInfo.read(in); + + Assert.assertEquals(partitionInfo.getType(), partitionInfo2.getType()); + + // 3. delete files + in.close(); + Files.delete(path); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TempPartitionTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TempPartitionTest.java index 390f83622fe9e2a..19b191a4cd2fada 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TempPartitionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TempPartitionTest.java @@ -1283,9 +1283,9 @@ private void testSerializeOlapTable(OlapTable tbl) throws IOException, AnalysisE // 2. Read objects from file DataInputStream in = new DataInputStream(new FileInputStream(file)); - OlapTable readTbl = (OlapTable) Table.read(in); + OlapTable readTbl = OlapTable.read(in); Assert.assertEquals(tbl.getId(), readTbl.getId()); - Assert.assertEquals(tbl.getTempPartitions().size(), readTbl.getTempPartitions().size()); + Assert.assertEquals(tbl.getAllTempPartitions().size(), readTbl.getAllTempPartitions().size()); file.delete(); }
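Editor's note on the two Gson tweaks above that look unrelated to the persistence format: the builder now sets ToNumberPolicy.LONG_OR_DOUBLE, and getLoadParallelism()/getSendBatchParallelism() now accept either Integer or Long. Both follow from jobProperties holding Object values: once the map round-trips through JSON, Gson has to pick a Java type for bare numbers (Double with the default policy, Long with LONG_OR_DOUBLE), so a value that was an Integer before replay comes back as a Long afterwards. A small self-contained sketch of that behavior follows; NumberPolicyDemo and the property name are illustrative only, not Doris code.

    // Shows what Gson returns for a JSON integer inside a Map<String, Object>
    // with the default number policy versus ToNumberPolicy.LONG_OR_DOUBLE.
    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;
    import com.google.gson.ToNumberPolicy;
    import com.google.gson.reflect.TypeToken;

    import java.lang.reflect.Type;
    import java.util.HashMap;
    import java.util.Map;

    public class NumberPolicyDemo {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put("load_parallelism", 1);                 // an Integer while the job is in memory

            String json = new Gson().toJson(props);           // {"load_parallelism":1}
            Type mapType = new TypeToken<Map<String, Object>>() {}.getType();

            Map<String, Object> defaults = new Gson().fromJson(json, mapType);
            Map<String, Object> longOrDouble = new GsonBuilder()
                    .setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
                    .create()
                    .fromJson(json, mapType);

            System.out.println(defaults.get("load_parallelism").getClass());      // class java.lang.Double
            System.out.println(longOrDouble.get("load_parallelism").getClass());  // class java.lang.Long

            // Hence the defensive reads after replay: accept Integer (in-memory) or Long (replayed).
            Object v = longOrDouble.get("load_parallelism");
            int parallelism = (v instanceof Integer) ? (Integer) v : ((Long) v).intValue();
            System.out.println(parallelism);                                      // 1
        }
    }

Without the LONG_OR_DOUBLE policy the replayed value would come back as a Double and the old blind (int) cast would throw a ClassCastException, which is presumably why the number strategy and the Integer/Long checks were introduced together in this patch.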