Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Jun 22, 2024
1 parent 8766431 commit 5685f7d
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -638,33 +637,6 @@ public TFunction toThrift(Type realReturnType, Type[] realArgTypes, Boolean[] re
return fn;
}

@Override
public void write(DataOutput output) throws IOException {
// 1. type
FunctionType.AGGREGATE.write(output);
// 2. parent
super.writeFields(output);
// 3. self's member
boolean hasInterType = intermediateType != null;
output.writeBoolean(hasInterType);
if (hasInterType) {
ColumnType.write(output, intermediateType);
}
IOUtils.writeOptionString(output, updateFnSymbol);
IOUtils.writeOptionString(output, initFnSymbol);
IOUtils.writeOptionString(output, serializeFnSymbol);
IOUtils.writeOptionString(output, mergeFnSymbol);
IOUtils.writeOptionString(output, getValueFnSymbol);
IOUtils.writeOptionString(output, removeFnSymbol);
IOUtils.writeOptionString(output, finalizeFnSymbol);
IOUtils.writeOptionString(output, symbolName);

output.writeBoolean(ignoresDistinct);
output.writeBoolean(isAnalyticFn);
output.writeBoolean(isAggregateFn);
output.writeBoolean(returnsNonNullOnEmpty);
}

public void readFields(DataInput input) throws IOException {
super.readFields(input);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
Expand Down Expand Up @@ -257,21 +256,6 @@ public String toSql(boolean ifNotExists) {
return sb.toString();
}

@Override
public void write(DataOutput output) throws IOException {
// 1. type
FunctionType.ALIAS.write(output);
// 2. parent
super.writeFields(output);
// 3. parameter
output.writeInt(parameters.size());
for (String p : parameters) {
Text.writeString(output, p);
}
// 4. expr
Expr.writeTo(originFunction, output);
}

@Override
public void readFields(DataInput input) throws IOException {
super.readFields(input);
Expand Down
25 changes: 21 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>,
// table family group map
private final Map<Long, Table> idToTable;
@SerializedName(value = "nameToTable")
private Map<String, Table> nameToTable;
private ConcurrentMap<String, Table> nameToTable;
// table name lower cast -> table name
private final Map<String, String> lowerCaseToTableName;

Expand All @@ -109,7 +109,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>,
@SerializedName(value = "replicaQuotaSize")
private volatile long replicaQuotaSize;

@SerializedName(value = "tq")
// from dbProperties;
private volatile long transactionQuotaSize;

private volatile boolean isDropped;
Expand All @@ -126,7 +126,7 @@ public enum DbState {
@SerializedName(value = "dbProperties")
private DatabaseProperty dbProperties = new DatabaseProperty();

@SerializedName(value = "bc")
// from dbProperties;
private BinlogConfig binlogConfig = new BinlogConfig();

public Database() {
Expand Down Expand Up @@ -602,6 +602,8 @@ public static Database read(DataInput in) throws IOException {

@Override
public void gsonPostProcess() throws IOException {
Preconditions.checkState(nameToTable.getClass() == ConcurrentMap.class,
"nameToTable should be ConcurrentMap");
fullQualifiedName = ClusterNamespace.getNameFromFullName(fullQualifiedName);
nameToTable.forEach((tn, tb) -> {
tb.setQualifiedDbName(fullQualifiedName);
Expand All @@ -612,11 +614,26 @@ public void gsonPostProcess() throws IOException {
lowerCaseToTableName.put(tn.toLowerCase(), tn);
});

if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_105) {
String txnQuotaStr = dbProperties.getOrDefault(TRANSACTION_QUOTA_SIZE,
String.valueOf(Config.max_running_txn_num_per_db));
transactionQuotaSize = Long.parseLong(txnQuotaStr);
binlogConfig = dbProperties.getBinlogConfig();
} else {
transactionQuotaSize = Config.default_db_max_running_txn_num == -1L
? Config.max_running_txn_num_per_db
: Config.default_db_max_running_txn_num;
}

for (ImmutableList<Function> functions : name2Function.values()) {
for (Function function : functions) {
try {
FunctionUtil.translateToNereids(null, function);
} catch (Exception e) {
LOG.warn("Nereids add function failed", e);
}
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
Expand All @@ -34,7 +36,7 @@
/**
* user define encryptKey in current db.
*/
public class DatabaseEncryptKey implements Writable {
public class DatabaseEncryptKey implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(DatabaseEncryptKey.class);
// user define encryptKey
// keyName -> encryptKey
Expand All @@ -55,4 +57,10 @@ public static DatabaseEncryptKey read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, DatabaseEncryptKey.class);
}

@Override
public void gsonPostProcess() throws IOException {
Preconditions.checkState(name2EncryptKey.getClass() == ConcurrentMap.class,
"name2EncryptKey should be ConcurrentMap, but is " + name2EncryptKey.getClass());
}
}
36 changes: 7 additions & 29 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.IOUtils;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.URI;
Expand Down Expand Up @@ -636,6 +635,7 @@ enum FunctionType {
AGGREGATE(2),
ALIAS(3);

@SerializedName("c")
private int code;

FunctionType(int code) {
Expand All @@ -660,43 +660,21 @@ public static FunctionType fromCode(int code) {
return null;
}

public void write(DataOutput output) throws IOException {
output.writeInt(code);
}

public static FunctionType read(DataInput input) throws IOException {
return fromCode(input.readInt());
}
}

protected void writeFields(DataOutput output) throws IOException {
output.writeLong(id);
name.write(output);
ColumnType.write(output, retType);
output.writeInt(argTypes.length);
for (Type type : argTypes) {
ColumnType.write(output, type);
}
output.writeBoolean(hasVarArgs);
output.writeBoolean(userVisible);
output.writeInt(binaryType.getValue());
// write library URL
String libUrl = "";
if (location != null) {
libUrl = location.getLocation();
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) {
return fromCode(input.readInt());
} else {
throw new IOException("FunctionType should not be serialized af");
}
}
IOUtils.writeOptionString(output, libUrl);
IOUtils.writeOptionString(output, checksum);
output.writeUTF(nullableMode.toString());
output.writeBoolean(isUDTFunction);
}

@Override
public void write(DataOutput output) throws IOException {
throw new Error("Origin function cannot be serialized");
}

public void readFields(DataInput input) throws IOException {
protected void readFields(DataInput input) throws IOException {
id = input.readLong();
name = FunctionName.read(input);
retType = ColumnType.read(input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -175,19 +174,6 @@ public static Function getFunction(Function desc, Function.CompareMode mode,
return Function.getFunction(fns, desc, mode);
}

public static void write(DataOutput out, ConcurrentMap<String, ImmutableList<Function>> name2Function)
throws IOException {
// write functions
out.writeInt(name2Function.size());
for (Entry<String, ImmutableList<Function>> entry : name2Function.entrySet()) {
Text.writeString(out, entry.getKey());
out.writeInt(entry.getValue().size());
for (Function function : entry.getValue()) {
function.write(out);
}
}
}

public static void readFields(DataInput in, String dbName,
ConcurrentMap<String, ImmutableList<Function>> name2Function)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
package org.apache.doris.catalog;

import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -35,23 +40,28 @@
* GlobalFunctionMgr will load all global functions at FE startup.
* Provides management of global functions such as add, drop and other operations
*/
public class GlobalFunctionMgr extends MetaObject {
public class GlobalFunctionMgr extends MetaObject implements GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(GlobalFunctionMgr.class);

// user define function
@SerializedName(value = "name2Function")
private ConcurrentMap<String, ImmutableList<Function>> name2Function = Maps.newConcurrentMap();

public static GlobalFunctionMgr read(DataInput in) throws IOException {
GlobalFunctionMgr globalFunctionMgr = new GlobalFunctionMgr();
globalFunctionMgr.readFields(in);
return globalFunctionMgr;
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_136) {
GlobalFunctionMgr globalFunctionMgr = new GlobalFunctionMgr();
globalFunctionMgr.readFields(in);
return globalFunctionMgr;
} else {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, GlobalFunctionMgr.class);
}
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
// write functions
FunctionUtil.write(out, name2Function);
Text.writeString(out, GsonUtils.GSON.toJson(name2Function));
}

@Override
Expand All @@ -60,6 +70,19 @@ public void readFields(DataInput in) throws IOException {
FunctionUtil.readFields(in, null, name2Function);
}

public void gsonPostProcess() throws IOException {
// translate function to nereids
for (ImmutableList<Function> functions : name2Function.values()) {
for (Function function : functions) {
try {
FunctionUtil.translateToNereids(null, function);
} catch (Exception e) {
LOG.warn("Nereids add function failed", e);
}
}
}
}

public synchronized void addFunction(Function function, boolean ifNotExists) throws UserException {
function.setGlobal(true);
function.checkWritable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.common.io.IOUtils;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.URI;
import org.apache.doris.thrift.TFunction;
Expand All @@ -33,7 +32,6 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -259,18 +257,6 @@ public TFunction toThrift(Type realReturnType, Type[] realArgTypes, Boolean[] re
return fn;
}

@Override
public void write(DataOutput output) throws IOException {
// 1. type
FunctionType.SCALAR.write(output);
// 2. parent
super.writeFields(output);
// 3.symbols
Text.writeString(output, symbolName);
IOUtils.writeOptionString(output, prepareFnSymbol);
IOUtils.writeOptionString(output, closeFnSymbol);
}

public void readFields(DataInput input) throws IOException {
super.readFields(input);
symbolName = Text.readString(input);
Expand Down

0 comments on commit 5685f7d

Please sign in to comment.