Skip to content

Commit

Permalink
[Fix](trino-connector) refactor code of TrinoConnectorExternalTable (a…
Browse files Browse the repository at this point in the history
…pache#34496)

1. Fix the issue with the trino-connector accessing DeltaLake by caching
`ConnectorMetadata` and `ConnectorTransactionHandle`.

2. Refactor some code.
  • Loading branch information
BePPPower authored May 24, 2024
1 parent 49c12b6 commit fc27d7a
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
Expand Down Expand Up @@ -115,17 +113,9 @@ public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColu

@Override
protected void doInitialize() throws UserException {
ExternalTable table = (ExternalTable) desc.getTable();
if (table.isView()) {
throw new AnalysisException(
String.format("Querying external view '%s.%s' is not supported", table.getDbName(),
table.getName()));
}
computeColumnsFilter();
initBackendPolicy();
source = new PaimonSource((PaimonExternalTable) table, desc, columnNameToRange);
super.doInitialize();
source = new PaimonSource(desc);
Preconditions.checkNotNull(source);
initSchemaParams();
PaimonPredicateConverter paimonPredicateConverter = new PaimonPredicateConverter(
source.getPaimonTable().rowType());
predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts);
Expand Down Expand Up @@ -330,7 +320,7 @@ public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundExce

/**
 * Returns the table this scan node targets.
 */
@Override
public TableIf getTargetTable() {
// NOTE(review): this rendered diff shows both the replaced line (delegating to `source`)
// and its replacement (reading the table from the tuple descriptor `desc`) — confirm
// against the real file which one is kept.
return source.getTargetTable();
return desc.getTable();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,21 @@
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;

import org.apache.paimon.table.Table;

import java.util.Map;

public class PaimonSource {
private final PaimonExternalTable paimonExtTable;
private final Table originTable;

private final TupleDescriptor desc;

public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
this.paimonExtTable = table;
this.originTable = paimonExtTable.getPaimonTable();
public PaimonSource(TupleDescriptor desc) {
this.desc = desc;
this.paimonExtTable = (PaimonExternalTable) desc.getTable();
this.originTable = paimonExtTable.getPaimonTable();
}

public TupleDescriptor getDesc() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public Optional<SchemaCacheValue> initSchema() {
}
Map<String, ColumnMetadata> columnMetadataMap = columnMetadataMapBuilder.buildOrThrow();
return Optional.of(
new TrinoSchemaCacheValue(columns, connectorTableHandle, columnHandleMap, columnMetadataMap));
new TrinoSchemaCacheValue(columns, connectorMetadata, connectorTableHandle, connectorTransactionHandle,
columnHandleMap, columnMetadataMap));
}

@Override
Expand All @@ -160,11 +161,7 @@ public TTableDescriptor toThrift() {
return tTableDescriptor;
}

protected Type trinoConnectorTypeToDorisType(io.trino.spi.type.Type type) {
return trinoConnectorPrimitiveTypeToDorisType(type);
}

private Type trinoConnectorPrimitiveTypeToDorisType(io.trino.spi.type.Type type) {
private Type trinoConnectorTypeToDorisType(io.trino.spi.type.Type type) {
if (type instanceof BooleanType) {
return Type.BOOLEAN;
} else if (type instanceof TinyintType) {
Expand Down Expand Up @@ -201,19 +198,19 @@ private Type trinoConnectorPrimitiveTypeToDorisType(io.trino.spi.type.Type type)
TimestampWithTimeZoneType timestampWithTimeZoneType = (TimestampWithTimeZoneType) type;
return ScalarType.createDatetimeV2Type(timestampWithTimeZoneType.getPrecision());
} else if (type instanceof io.trino.spi.type.ArrayType) {
Type elementType = trinoConnectorPrimitiveTypeToDorisType(
Type elementType = trinoConnectorTypeToDorisType(
((io.trino.spi.type.ArrayType) type).getElementType());
return ArrayType.create(elementType, true);
} else if (type instanceof io.trino.spi.type.MapType) {
Type keyType = trinoConnectorPrimitiveTypeToDorisType(
Type keyType = trinoConnectorTypeToDorisType(
((io.trino.spi.type.MapType) type).getKeyType());
Type valueType = trinoConnectorPrimitiveTypeToDorisType(
Type valueType = trinoConnectorTypeToDorisType(
((io.trino.spi.type.MapType) type).getValueType());
return new MapType(keyType, valueType, true, true);
} else if (type instanceof RowType) {
ArrayList<StructField> dorisFields = Lists.newArrayList();
for (Field field : ((RowType) type).getFields()) {
Type childType = trinoConnectorPrimitiveTypeToDorisType(field.getType());
Type childType = trinoConnectorTypeToDorisType(field.getType());
if (field.getName().isPresent()) {
dorisFields.add(new StructField(field.getName().get(), childType));
} else {
Expand All @@ -233,6 +230,19 @@ public ConnectorTableHandle getConnectorTableHandle() {
.orElse(null);
}

/**
 * Returns the {@link ConnectorMetadata} stored in this table's schema cache value.
 *
 * <p>{@code makeSureInitialized()} is invoked first, then the schema cache value is looked up.
 *
 * @return the cached connector metadata, or {@code null} when no schema cache value is present
 */
public ConnectorMetadata getConnectorMetadata() {
    makeSureInitialized();
    return getSchemaCacheValue()
            .map(TrinoSchemaCacheValue.class::cast)
            .map(TrinoSchemaCacheValue::getConnectorMetadata)
            .orElse(null);
}

/**
 * Returns the {@link ConnectorTransactionHandle} stored in this table's schema cache value.
 *
 * <p>{@code makeSureInitialized()} is invoked first, then the schema cache value is looked up.
 *
 * @return the cached transaction handle, or {@code null} when no schema cache value is present
 */
public ConnectorTransactionHandle getConnectorTransactionHandle() {
    makeSureInitialized();
    return getSchemaCacheValue()
            .map(TrinoSchemaCacheValue.class::cast)
            .map(TrinoSchemaCacheValue::getConnectorTransactionHandle)
            .orElse(null);
}

public Map<String, ColumnHandle> getColumnHandleMap() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,69 @@

import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorTableHandle;
import lombok.Getter;
import lombok.Setter;
import io.trino.spi.connector.ConnectorTransactionHandle;

import java.util.List;
import java.util.Map;
import java.util.Optional;

@Getter
@Setter
/**
 * Schema cache value for a Trino-connector table.
 *
 * <p>In addition to the column schema handed to {@link SchemaCacheValue}, it holds the Trino
 * connector objects supplied at construction time: the connector metadata, the (optional)
 * table handle, the transaction handle, and the per-column handle and metadata maps.
 * All fields are mutable through the setters below.
 */
public class TrinoSchemaCacheValue extends SchemaCacheValue {

// Connector metadata stored alongside the schema.
private ConnectorMetadata connectorMetadata;
// Table handle; carried as an Optional, so it may be empty.
private Optional<ConnectorTableHandle> connectorTableHandle;
// Transaction handle stored alongside the schema.
private ConnectorTransactionHandle connectorTransactionHandle;
// Column handles keyed by a String (presumably the column name — TODO confirm).
private Map<String, ColumnHandle> columnHandleMap;
// Column metadata keyed by a String (presumably the column name — TODO confirm).
private Map<String, ColumnMetadata> columnMetadataMap;

/**
 * Creates a cache value from the column schema and the Trino connector objects.
 *
 * @param schema column list passed to the {@link SchemaCacheValue} constructor
 * @param connectorMetadata connector metadata to store
 * @param connectorTableHandle table handle to store (may be empty)
 * @param connectorTransactionHandle transaction handle to store
 * @param columnHandleMap column handles to store
 * @param columnMetadataMap column metadata to store
 */
// NOTE(review): the first header line below looks like the pre-change constructor signature
// left over from the rendered diff; the header that follows it is the current one — confirm.
public TrinoSchemaCacheValue(List<Column> schema, Optional<ConnectorTableHandle> connectorTableHandle,
public TrinoSchemaCacheValue(List<Column> schema, ConnectorMetadata connectorMetadata,
Optional<ConnectorTableHandle> connectorTableHandle, ConnectorTransactionHandle connectorTransactionHandle,
Map<String, ColumnHandle> columnHandleMap, Map<String, ColumnMetadata> columnMetadataMap) {
super(schema);
this.connectorMetadata = connectorMetadata;
this.connectorTableHandle = connectorTableHandle;
this.connectorTransactionHandle = connectorTransactionHandle;
this.columnHandleMap = columnHandleMap;
this.columnMetadataMap = columnMetadataMap;
}

/** Returns the stored connector metadata. */
public ConnectorMetadata getConnectorMetadata() {
return connectorMetadata;
}

/** Returns the stored table handle, which may be an empty Optional. */
public Optional<ConnectorTableHandle> getConnectorTableHandle() {
return connectorTableHandle;
}

/** Returns the stored transaction handle. */
public ConnectorTransactionHandle getConnectorTransactionHandle() {
return connectorTransactionHandle;
}

/** Returns the stored column-handle map (the internal map itself, not a copy). */
public Map<String, ColumnHandle> getColumnHandleMap() {
return columnHandleMap;
}

/** Returns the stored column-metadata map (the internal map itself, not a copy). */
public Map<String, ColumnMetadata> getColumnMetadataMap() {
return columnMetadataMap;
}

/** Replaces the stored connector metadata. */
public void setConnectorMetadata(ConnectorMetadata connectorMetadata) {
this.connectorMetadata = connectorMetadata;
}

/** Replaces the stored table handle. */
public void setConnectorTableHandle(Optional<ConnectorTableHandle> connectorTableHandle) {
this.connectorTableHandle = connectorTableHandle;
}

/** Replaces the stored transaction handle. */
public void setConnectorTransactionHandle(ConnectorTransactionHandle connectorTransactionHandle) {
this.connectorTransactionHandle = connectorTransactionHandle;
}

/** Replaces the stored column-handle map. */
public void setColumnHandleMap(Map<String, ColumnHandle> columnHandleMap) {
this.columnHandleMap = columnHandleMap;
}

/** Replaces the stored column-metadata map. */
public void setColumnMetadataMap(Map<String, ColumnMetadata> columnMetadataMap) {
this.columnMetadataMap = columnMetadataMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
Expand Down Expand Up @@ -66,13 +65,11 @@
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.transaction.IsolationLevel;
import io.trino.spi.type.TypeManager;
import io.trino.split.BufferingSplitSource;
import io.trino.split.ConnectorAwareSplitSource;
Expand Down Expand Up @@ -106,17 +103,8 @@ public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean needC

/**
 * Initializes this scan node: builds the {@code TrinoConnectorSource} from the tuple
 * descriptor and then converts the predicates via {@code convertPredicate()}.
 *
 * @throws UserException propagated from the initialization steps below
 */
@Override
protected void doInitialize() throws UserException {
// NOTE(review): this rendered diff interleaves the replaced body (explicit view check,
// computeColumnsFilter / initBackendPolicy / initSchemaParams and the two-argument source
// constructor) with its replacement (super.doInitialize() followed by the one-argument
// constructor) — confirm against the real file which statements remain.
TrinoConnectorExternalTable table = (TrinoConnectorExternalTable) desc.getTable();
// Views are rejected up front with an AnalysisException naming the database and table.
if (table.isView()) {
throw new AnalysisException(
String.format("Querying external view '%s.%s' is not supported", table.getDbName(),
table.getName()));
}

computeColumnsFilter();
initBackendPolicy();
source = new TrinoConnectorSource(desc, table);
initSchemaParams();
super.doInitialize();
source = new TrinoConnectorSource(desc);
convertPredicate();
}

Expand Down Expand Up @@ -144,27 +132,28 @@ protected void convertPredicate() throws UserException {
public List<Split> getSplits() throws UserException {
// Builds the Doris splits for this scan: begins a query on the connector metadata, applies
// push-down, drains the Trino split source into TrinoConnectorSplit objects, then cleans up
// the query and returns the collected splits.
// NOTE(review): this rendered diff interleaves replaced lines (beginning a transaction here and
// fetching metadata from the connector) with their replacements (reading the metadata and
// transaction handle from `source`) — confirm against the real file.

// 1. Get the necessary objects
Connector connector = source.getConnector();
ConnectorTransactionHandle connectorTransactionHandle = connector.beginTransaction(
IsolationLevel.READ_UNCOMMITTED, true, true);
source.setConnectorTransactionHandle(connectorTransactionHandle);
connectorMetadata = source.getConnectorMetadata();
ConnectorSession connectorSession = source.getTrinoSession().toConnectorSession(source.getCatalogHandle());

connectorMetadata = connector.getMetadata(connectorSession, connectorTransactionHandle);
// 2. Begin the query
connectorMetadata.beginQuery(connectorSession);
applyPushDown(connectorSession);

// 3. Get the split source (closed automatically by try-with-resources)
List<Split> splits = Lists.newArrayList();
try (SplitSource splitSource = getTrinoSplitSource(connector, source.getTrinoSession(),
connectorTransactionHandle, source.getTrinoConnectorTableHandle(), DynamicFilter.EMPTY)) {
source.getTrinoConnectorTableHandle(), DynamicFilter.EMPTY)) {
// 4. Fetch Trino splits batch by batch and convert them to Doris splits
while (!splitSource.isFinished()) {
for (io.trino.metadata.Split split : getNextSplitBatch(splitSource)) {
splits.add(new TrinoConnectorSplit(split.getConnectorSplit(), source.getConnectorName()));
}
}
}

// 5. Clean up the query
// This is necessary for the Hive connector.
// NOTE(review): cleanupQuery is not in a finally block, so it is not reached when the split
// enumeration above throws — consider whether that is intended.
connectorMetadata.cleanupQuery(connectorSession);
return splits;
}

Expand Down Expand Up @@ -216,8 +205,7 @@ private void applyPushDown(ConnectorSession connectorSession) {
// }
}

private SplitSource getTrinoSplitSource(Connector connector, Session session,
ConnectorTransactionHandle connectorTransactionHandle, ConnectorTableHandle table,
private SplitSource getTrinoSplitSource(Connector connector, Session session, ConnectorTableHandle table,
DynamicFilter dynamicFilter) {
ConnectorSplitManager splitManager = connector.getSplitManager();

Expand All @@ -227,8 +215,8 @@ private SplitSource getTrinoSplitSource(Connector connector, Session session,

ConnectorSession connectorSession = session.toConnectorSession(source.getCatalogHandle());
// Constraint is not used by Hive/BigQuery Connector
ConnectorSplitSource connectorSplitSource = splitManager.getSplits(connectorTransactionHandle, connectorSession,
table, dynamicFilter, constraint);
ConnectorSplitSource connectorSplitSource = splitManager.getSplits(source.getConnectorTransactionHandle(),
connectorSession, table, dynamicFilter, constraint);

SplitSource splitSource = new ConnectorAwareSplitSource(source.getCatalogHandle(), connectorSplitSource);
if (this.minScheduleSplitBatchSize > 1) {
Expand Down Expand Up @@ -386,7 +374,9 @@ public TFileAttributes getFileAttributes() throws UserException {

/**
 * Returns the table this scan node targets.
 */
@Override
public TableIf getTargetTable() {
// NOTE(review): the next line appears to be the replaced statement from the rendered diff;
// the comment and return that follow are its replacement — confirm against the real file.
return source.getTargetTable();
// Cannot use `source.getTargetTable()` here:
// `source` is still null when getTargetTable() is called.
return desc.getTable();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.connector.ConnectorName;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;

Expand All @@ -40,16 +41,19 @@ public class TrinoConnectorSource {
private final ConnectorName connectorName;
private ConnectorTransactionHandle connectorTransactionHandle;
private ConnectorTableHandle trinoConnectorTableHandle;
private ConnectorMetadata connectorMetadata;

public TrinoConnectorSource(TupleDescriptor desc, TrinoConnectorExternalTable table) {
public TrinoConnectorSource(TupleDescriptor desc) {
this.desc = desc;
this.trinoConnectorExtTable = table;
this.trinoConnectorExternalCatalog = (TrinoConnectorExternalCatalog) table.getCatalog();
this.trinoConnectorExtTable = (TrinoConnectorExternalTable) desc.getTable();
this.trinoConnectorExternalCatalog = (TrinoConnectorExternalCatalog) trinoConnectorExtTable.getCatalog();
this.catalogHandle = trinoConnectorExternalCatalog.getTrinoCatalogHandle();
this.trinoConnectorTableHandle = table.getConnectorTableHandle();
this.trinoConnectorTableHandle = trinoConnectorExtTable.getConnectorTableHandle();
this.connectorMetadata = trinoConnectorExtTable.getConnectorMetadata();
this.connectorTransactionHandle = trinoConnectorExtTable.getConnectorTransactionHandle();
this.trinoSession = trinoConnectorExternalCatalog.getTrinoSession();
this.connector = ((TrinoConnectorExternalCatalog) table.getCatalog()).getConnector();
this.connectorName = ((TrinoConnectorExternalCatalog) table.getCatalog()).getConnectorName();
this.connector = trinoConnectorExternalCatalog.getConnector();
this.connectorName = trinoConnectorExternalCatalog.getConnectorName();
}

public TupleDescriptor getDesc() {
Expand Down Expand Up @@ -88,8 +92,8 @@ public ConnectorName getConnectorName() {
return connectorName;
}

public void setConnectorTransactionHandle(ConnectorTransactionHandle connectorTransactionHandle) {
this.connectorTransactionHandle = connectorTransactionHandle;
public ConnectorMetadata getConnectorMetadata() {
return connectorMetadata;
}

public void setTrinoConnectorTableHandle(ConnectorTableHandle trinoConnectorExtTableHandle) {
Expand Down

0 comments on commit fc27d7a

Please sign in to comment.