Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
dataroaring committed Jun 24, 2024
1 parent 14a763f commit f1cfdde
Show file tree
Hide file tree
Showing 16 changed files with 109 additions and 143 deletions.
17 changes: 10 additions & 7 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
package org.apache.doris.analysis;

import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.datasource.property.constants.BosProperties;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TFileType;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -49,6 +53,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
// just for multi load
public static final String MULTI_LOAD_BROKER = "__DORIS_MULTI_LOAD_BROKER__";
public static final String MULTI_LOAD_BROKER_BACKEND_KEY = "__DORIS_MULTI_LOAD_BROKER_BACKEND__";
@SerializedName("cts3")
private boolean convertedToS3 = false;

// Only used for recovery
Expand Down Expand Up @@ -132,13 +137,8 @@ public StorageBackend.StorageType storageType() {

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, name);
properties.put(PersistentFileSystem.STORAGE_TYPE, storageType.name());
out.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}

public void readFields(DataInput in) throws IOException {
Expand All @@ -163,6 +163,9 @@ public void readFields(DataInput in) throws IOException {
}

public static BrokerDesc read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_136) {
return GsonUtils.GSON.fromJson(Text.readString(in), BrokerDesc.class);
}
BrokerDesc desc = new BrokerDesc();
desc.readFields(in);
return desc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.load.EtlJobType;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;

import java.util.Map;

Expand All @@ -38,7 +39,9 @@
// "spark.yarn.queue" = "queue0"
// )
public class ResourceDesc {
@SerializedName("nm")
protected String name;
@SerializedName("prop")
protected Map<String, String> properties;
/**
* TODO(tsy): transfer to LoadType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.analysis;

import com.google.gson.annotations.SerializedName;

import java.util.Map;


Expand All @@ -30,7 +32,7 @@
* The broker's StorageBackend.StorageType desc
*/
public class StorageDesc extends ResourceDesc {

@SerializedName("st")
protected StorageBackend.StorageType storageType;

public StorageDesc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TStorageMedium;

import com.google.common.base.Preconditions;
Expand All @@ -42,6 +44,7 @@
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.collect.Table.Cell;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -58,17 +61,20 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CatalogRecycleBin extends MasterDaemon implements Writable {
public class CatalogRecycleBin extends MasterDaemon implements Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(CatalogRecycleBin.class);
private static final int DEFAULT_INTERVAL_SECONDS = 30; // 30 seconds
// erase meta at least after minEraseLatency milliseconds
// to avoid erase log ahead of drop log
private static final long minEraseLatency = 10 * 60 * 1000; // 10 min

@SerializedName(value = "itd")
private Map<Long, RecycleDatabaseInfo> idToDatabase;
@SerializedName(value = "itt")
private Map<Long, RecycleTableInfo> idToTable;
@SerializedName(value = "itp")
private Map<Long, RecyclePartitionInfo> idToPartition;

@SerializedName(value = "itr")
private Map<Long, Long> idToRecycleTime;

public CatalogRecycleBin() {
Expand Down Expand Up @@ -1281,33 +1287,21 @@ public synchronized Map<Long, Pair<Long, Long>> getDbToRecycleSize() {
// this class is not protected by any lock, will throw ConcurrentModificationException.
@Override
public synchronized void write(DataOutput out) throws IOException {
int count = idToDatabase.size();
out.writeInt(count);
for (Map.Entry<Long, RecycleDatabaseInfo> entry : idToDatabase.entrySet()) {
out.writeLong(entry.getKey());
entry.getValue().write(out);
}
Text.writeString(out, GsonUtils.GSON.toJson(this));
}

count = idToTable.size();
out.writeInt(count);
for (Map.Entry<Long, RecycleTableInfo> entry : idToTable.entrySet()) {
out.writeLong(entry.getKey());
entry.getValue().write(out);
public static CatalogRecycleBin read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_136) {
return GsonUtils.GSON.fromJson(Text.readString(in), CatalogRecycleBin.class);
}

count = idToPartition.size();
out.writeInt(count);
for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
out.writeLong(entry.getKey());
entry.getValue().write(out);
}
CatalogRecycleBin bin = new CatalogRecycleBin();
bin.readFields(in);
return bin;
}

count = idToRecycleTime.size();
out.writeInt(count);
for (Map.Entry<Long, Long> entry : idToRecycleTime.entrySet()) {
out.writeLong(entry.getKey());
out.writeLong(entry.getValue());
}
public void gsonPostProcess() throws IOException {
updateDbInfoForLowerVersion();
}

public void readFields(DataInput in) throws IOException {
Expand Down Expand Up @@ -1368,9 +1362,12 @@ private void updateDbInfoForLowerVersion() {
}
}

public class RecycleDatabaseInfo implements Writable {
public class RecycleDatabaseInfo {
@SerializedName("db")
private Database db;
@SerializedName("tns")
private Set<String> tableNames;
@SerializedName("tis")
private Set<Long> tableIds;

public RecycleDatabaseInfo() {
Expand Down Expand Up @@ -1400,23 +1397,6 @@ public Set<Long> setTableIds(Set<Long> tableIds) {
return this.tableIds = tableIds;
}

@Override
public void write(DataOutput out) throws IOException {
db.write(out);

int count = tableNames.size();
out.writeInt(count);
for (String tableName : tableNames) {
Text.writeString(out, tableName);
}

count = tableIds.size();
out.writeInt(count);
for (long tableId : tableIds) {
out.writeLong(tableId);
}
}

public void readFields(DataInput in) throws IOException {
db = Database.read(in);

Expand All @@ -1435,8 +1415,10 @@ public void readFields(DataInput in) throws IOException {
}
}

public class RecycleTableInfo implements Writable {
public class RecycleTableInfo {
@SerializedName("did")
private long dbId;
@SerializedName("t")
private Table table;

public RecycleTableInfo() {
Expand All @@ -1456,27 +1438,30 @@ public Table getTable() {
return table;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(dbId);
table.write(out);
}

public void readFields(DataInput in) throws IOException {
dbId = in.readLong();
table = Table.read(in);
}
}

public class RecyclePartitionInfo implements Writable {
public class RecyclePartitionInfo {
@SerializedName("did")
private long dbId;
@SerializedName("tid")
private long tableId;
@SerializedName("p")
private Partition partition;
@SerializedName("r")
private Range<PartitionKey> range;
@SerializedName("lpi")
private PartitionItem listPartitionItem;
@SerializedName("dp")
private DataProperty dataProperty;
@SerializedName("ra")
private ReplicaAllocation replicaAlloc;
@SerializedName("im")
private boolean isInMemory;
@SerializedName("mu")
private boolean isMutable = true;

public RecyclePartitionInfo() {
Expand Down Expand Up @@ -1534,27 +1519,6 @@ public boolean isMutable() {
return isMutable;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(dbId);
out.writeLong(tableId);
partition.write(out);
RangeUtils.writeRange(out, range);
listPartitionItem.write(out);
dataProperty.write(out);
replicaAlloc.write(out);
out.writeBoolean(isInMemory);
if (Config.isCloudMode()) {
// HACK: the origin implementation of the cloud mode has code likes:
//
// out.writeBoolean(isPersistent);
//
// keep the compatibility here.
out.writeBoolean(false);
}
out.writeBoolean(isMutable);
}

public void readFields(DataInput in) throws IOException {
dbId = in.readLong();
tableId = in.readLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ public void gsonPostProcess() throws IOException {
for (ImmutableList<Function> functions : name2Function.values()) {
for (Function function : functions) {
try {
FunctionUtil.translateToNereids(null, function);
FunctionUtil.translateToNereids(this.getFullName(), function);
} catch (Exception e) {
LOG.warn("Nereids add function failed", e);
}
Expand Down
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -2208,7 +2208,7 @@ public long loadTransactionState(DataInputStream dis, long checksum) throws IOEx
}

public long loadRecycleBin(DataInputStream dis, long checksum) throws IOException {
recycleBin.readFields(dis);
recycleBin = CatalogRecycleBin.read(dis);
// add tablet in Recycle bin to TabletInvertedIndex
recycleBin.addTabletToInvertedIndex();
// create DatabaseTransactionMgr for db in recycle bin.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.ListUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -65,6 +68,10 @@ public ListPartitionInfo(boolean isAutoCreatePartitions, ArrayList<Expr> exprs,
}

public static PartitionInfo read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_136) {
return GsonUtils.GSON.fromJson(Text.readString(in), ListPartitionInfo.class);
}

PartitionInfo partitionInfo = new ListPartitionInfo();
partitionInfo.readFields(in);
return partitionInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.gson.annotations.SerializedName;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -40,6 +39,7 @@ public class ListPartitionItem extends PartitionItem {

@SerializedName(value = "partitionKeys")
private final List<PartitionKey> partitionKeys;
@SerializedName(value = "idp")
private boolean isDefaultPartition = false;

public ListPartitionItem(List<PartitionKey> partitionKeys) {
Expand Down Expand Up @@ -122,14 +122,6 @@ public boolean isGreaterThanSpecifiedTime(int pos, Optional<String> dateFormatOp
return false;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeInt(partitionKeys.size());
for (PartitionKey partitionKey : partitionKeys) {
partitionKey.write(out);
}
}

@Override
public int compareTo(PartitionItem other) {
int thisKeyLen = this.partitionKeys.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@

import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Writable;

import java.util.Comparator;
import java.util.Map;
import java.util.Optional;

public abstract class PartitionItem implements Comparable<PartitionItem>, Writable {
public abstract class PartitionItem implements Comparable<PartitionItem> {
public static final Comparator<Map.Entry<Long, PartitionItem>> ITEM_MAP_ENTRY_COMPARATOR =
Comparator.comparing(o -> ((ListPartitionItem) o.getValue()).getItems().iterator().next());

Expand Down
Loading

0 comments on commit f1cfdde

Please sign in to comment.