Skip to content

Commit

Permalink
[improvement](meta) serialize meta via gson part3
Browse files Browse the repository at this point in the history
Switch meta serialization to gson
Contains the following classes:

Database
SmallFileMgr$SmallFile
catalog.Table
loadv2.LoadJob
loadv2.LoadJobFinalOperation

Co-authored-by: iszhangpch
  • Loading branch information
dataroaring committed Jun 19, 2024
1 parent 100023f commit 42da483
Show file tree
Hide file tree
Showing 41 changed files with 441 additions and 563 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ public final class FeMetaVersion {
public static final int VERSION_134 = 134;
// For meta gson
public static final int VERSION_135 = 135;
// For meta gson
public static final int VERSION_136 = 136;
// note: when incrementing the meta version, assign the latest version to VERSION_CURRENT
public static final int VERSION_CURRENT = VERSION_135;
public static final int VERSION_CURRENT = VERSION_136;

// the meta version of all logs should be >= the minimum version, so that we can remove many if clauses, for example
// if (FE_METAVERSION < VERSION_94) ...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -68,16 +69,24 @@ public class AggregateFunction extends Function {
public static ImmutableSet<String> SUPPORT_ORDER_BY_AGGREGATE_FUNCTION_NAME_SET = ImmutableSet.of("group_concat");

// Set if different from retType_, null otherwise.
@SerializedName("it")
private Type intermediateType;

// The symbols inside the binary at location_ that implement this particular function.
// Any of them can be null if it is not required.
@SerializedName("ufs")
private String updateFnSymbol;
@SerializedName("ifs")
private String initFnSymbol;
@SerializedName("sfs")
private String serializeFnSymbol;
@SerializedName("mfs")
private String mergeFnSymbol;
@SerializedName("gvfs")
private String getValueFnSymbol;
@SerializedName("rfs")
private String removeFnSymbol;
@SerializedName("ffs")
private String finalizeFnSymbol;

private static String BE_BUILTINS_CLASS = "AggregateFunctions";
Expand All @@ -86,26 +95,31 @@ public class AggregateFunction extends Function {
// e.g. min(distinct col) == min(col).
// TODO: currently it is not possible for user functions to specify this. We should
// extend the create aggregate function stmt to allow additional metadata like this.
@SerializedName("igd")
private boolean ignoresDistinct;

// True if this function can appear within an analytic expr (fn() OVER(...)).
// TODO: Instead of manually setting this flag for all builtin aggregate functions
// we should identify this property from the function itself (e.g., based on which
// functions of the UDA API are implemented).
// Currently, there is no reliable way of doing that.
@SerializedName("isAn")
private boolean isAnalyticFn;

// True if this function can be used for aggregation (without an OVER() clause).
@SerializedName("isAg")
private boolean isAggregateFn;

// True if this function returns a non-null value on an empty input. It is used
// primarily during the rewrite of scalar subqueries.
// TODO: Instead of manually setting this flag, we should identify this
// property from the function itself (e.g. evaluating the function on an
// empty input in BE).
@SerializedName("rnno")
private boolean returnsNonNullOnEmpty;

// used by java-udaf to point to the user-defined class
@SerializedName("sn")
private String symbolName;

// only used for serialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -59,7 +60,9 @@ public class AliasFunction extends Function {

private static final String DIGITAL_MASKING = "digital_masking";

@SerializedName("of")
private Expr originFunction;
@SerializedName("pm")
private List<String> parameters = new ArrayList<>();
private List<String> typeDefParams = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
Expand All @@ -39,7 +40,9 @@
* The job checks the privilege by auth info.
*/
public class AuthorizationInfo implements Writable {
@SerializedName(value = "dn")
private String dbName;
@SerializedName(value = "tn")
private Set<String> tableNameList;

// only for persist
Expand Down
26 changes: 7 additions & 19 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/BrokerTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
Expand All @@ -46,11 +46,16 @@ public class BrokerTable extends Table {
private static final String COLUMN_SEPARATOR = "column_separator";
private static final String LINE_DELIMITER = "line_delimiter";
private static final String FILE_FORMAT = "format";
@SerializedName("bn")
private String brokerName;
@SerializedName("ps")
private List<String> paths;
@SerializedName("cs")
private String columnSeparator;
@SerializedName("ld")
private String lineDelimiter;
private String fileFormat;
@SerializedName("bp")
private Map<String, String> brokerProperties;

public BrokerTable() {
Expand Down Expand Up @@ -209,24 +214,7 @@ public TTableDescriptor toThrift() {
return tTableDescriptor;
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);

Text.writeString(out, brokerName);
out.writeInt(paths.size());
for (String path : paths) {
Text.writeString(out, path);
}
Text.writeString(out, columnSeparator);
Text.writeString(out, lineDelimiter);
out.writeInt(brokerProperties.size());
for (Map.Entry<String, String> prop : brokerProperties.entrySet()) {
Text.writeString(out, prop.getKey());
Text.writeString(out, prop.getValue());
}
}

@Deprecated
public void readFields(DataInput in) throws IOException {
super.readFields(in);

Expand Down
57 changes: 29 additions & 28 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.SystemInfoService;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.JsonObject;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -108,6 +108,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
@SerializedName(value = "replicaQuotaSize")
private volatile long replicaQuotaSize;

@SerializedName(value = "tq")
private volatile long transactionQuotaSize;

private volatile boolean isDropped;
Expand All @@ -124,6 +125,7 @@ public enum DbState {
@SerializedName(value = "dbProperties")
private DatabaseProperty dbProperties = new DatabaseProperty();

@SerializedName(value = "bc")
private BinlogConfig binlogConfig = new BinlogConfig();

public Database() {
Expand Down Expand Up @@ -587,9 +589,24 @@ public int getMaxReplicationNum() {
}

public static Database read(DataInput in) throws IOException {
Database db = new Database();
db.readFields(in);
return db;
LOG.info("read db from journal {}", in);
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) {
Database db = new Database();
db.readFields(in);
return db;
} else {
Database db = GsonUtils.GSON.fromJson(Text.readString(in), Database.class);
db.fullQualifiedName = ClusterNamespace.getNameFromFullName(db.fullQualifiedName);
db.nameToTable.forEach((tn, tb) -> {
tb.setQualifiedDbName(db.fullQualifiedName);
if (tb instanceof MTMV) {
Env.getCurrentEnv().getMtmvService().registerMTMV((MTMV) tb, db.id);
}
db.idToTable.put(tb.getId(), tb);
db.lowerCaseToTableName.put(tn.toLowerCase(), tn);
});
return db;
}
}

@Override
Expand All @@ -605,33 +622,16 @@ public String getSignature(int signatureVersion) {

@Override
public void write(DataOutput out) throws IOException {
super.write(out);

out.writeLong(id);
Text.writeString(out, ClusterNamespace.getNameFromFullName(fullQualifiedName));
// write tables
discardHudiTable();
int numTables = nameToTable.size();
out.writeInt(numTables);
for (Map.Entry<String, Table> entry : nameToTable.entrySet()) {
entry.getValue().write(out);
}

out.writeLong(dataQuotaBytes);
Text.writeString(out, SystemInfoService.DEFAULT_CLUSTER);
Text.writeString(out, dbState.name());
Text.writeString(out, attachDbName);

// write functions
FunctionUtil.write(out, name2Function);

// write encryptKeys
dbEncryptKey.write(out);

out.writeLong(replicaQuotaSize);
dbProperties.write(out);
String json = GsonUtils.GSON.toJson(this);
JsonObject jsonObject = GsonUtils.GSON.fromJson(json, JsonObject.class);
String fqn = ClusterNamespace.getNameFromFullName(jsonObject.get("fullQualifiedName").getAsString());
jsonObject.remove("fullQualifiedName");
jsonObject.addProperty("fullQualifiedName", fqn);
Text.writeString(out, GsonUtils.GSON.toJson(jsonObject));
}


private void discardHudiTable() {
Iterator<Entry<String, Table>> iterator = nameToTable.entrySet().iterator();
while (iterator.hasNext()) {
Expand All @@ -650,6 +650,7 @@ public void analyze() {
}
}

@Deprecated
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
Expand Down
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -2082,10 +2082,12 @@ public long loadFrontends(DataInputStream dis, long checksum) throws IOException
}

public long loadBackends(DataInputStream dis, long checksum) throws IOException {
LOG.info("start loading backends from image");
return systemInfo.loadBackends(dis, checksum);
}

public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlException {
LOG.info("start loading db from image");
return getInternalCatalog().loadDb(dis, checksum);
}

Expand Down Expand Up @@ -2312,6 +2314,7 @@ public long loadPolicy(DataInputStream in, long checksum) throws IOException {
* Load catalogs through file.
**/
public long loadCatalog(DataInputStream in, long checksum) throws IOException {
LOG.info("start loading catalog from image");
CatalogMgr mgr = CatalogMgr.read(in);
// When multi catalog is enabled for the first time, "mgr" will be null.
// So ignore it and use the default catalog manager.
Expand Down
18 changes: 5 additions & 13 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.codec.digest.DigestUtils;
Expand All @@ -36,7 +37,6 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -65,6 +65,7 @@ public class EsTable extends Table {
// Here we have a slightly conservative value of 20, but at the same time
// we also provide configurable parameters for expert-using
// @see `MAX_DOCVALUE_FIELDS`
@Getter
private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;
private String hosts;
private String[] seeds;
Expand All @@ -77,6 +78,7 @@ public class EsTable extends Table {
private String mappingType = null;
// only save the partition definition, save the partition key,
// partition list is got from es cluster dynamically and is saved in esTableState
@SerializedName("pi")
private PartitionInfo partitionInfo;
private EsTablePartitions esTablePartitions;

Expand All @@ -101,6 +103,7 @@ public class EsTable extends Table {
private boolean includeHiddenIndex = Boolean.parseBoolean(EsResource.INCLUDE_HIDDEN_INDEX_DEFAULT_VALUE);

// tableContext is used for being convenient to persist some configuration parameters uniformly
@SerializedName("tc")
private Map<String, String> tableContext = new HashMap<>();

// record the latest and recently exception when sync ES table metadata (mapping, shard location)
Expand Down Expand Up @@ -261,18 +264,7 @@ public String getSignature(int signatureVersion) {
return md5;
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(tableContext.size());
for (Map.Entry<String, String> entry : tableContext.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
Text.writeString(out, partitionInfo.getType().name());
partitionInfo.write(out);
}

@Deprecated
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
Expand Down
Loading

0 comments on commit 42da483

Please sign in to comment.