diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java index 003ced7ead735b..3ab5b3ec657dbd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java @@ -20,12 +20,10 @@ import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LocationPath; -import org.apache.doris.datasource.ExternalTable; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalTable; @@ -115,17 +113,9 @@ public PaimonScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColu @Override protected void doInitialize() throws UserException { - ExternalTable table = (ExternalTable) desc.getTable(); - if (table.isView()) { - throw new AnalysisException( - String.format("Querying external view '%s.%s' is not supported", table.getDbName(), - table.getName())); - } - computeColumnsFilter(); - initBackendPolicy(); - source = new PaimonSource((PaimonExternalTable) table, desc, columnNameToRange); + super.doInitialize(); + source = new PaimonSource(desc); Preconditions.checkNotNull(source); - initSchemaParams(); PaimonPredicateConverter paimonPredicateConverter = new PaimonPredicateConverter( source.getPaimonTable().rowType()); predicates = paimonPredicateConverter.convertToPaimonExpr(conjuncts); @@ -330,7 +320,7 @@ public List getPathPartitionKeys() throws DdlException, MetaNotFoundExce @Override public TableIf getTargetTable() { - return source.getTargetTable(); + return desc.getTable(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java index 9ac44537e8aeec..da948d2b063e2e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java @@ -23,12 +23,10 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalTable; import org.apache.doris.datasource.property.constants.PaimonProperties; -import org.apache.doris.planner.ColumnRange; import org.apache.doris.thrift.TFileAttributes; import org.apache.paimon.table.Table; -import java.util.Map; public class PaimonSource { private final PaimonExternalTable paimonExtTable; @@ -36,11 +34,10 @@ public class PaimonSource { private final TupleDescriptor desc; - public PaimonSource(PaimonExternalTable table, TupleDescriptor desc, - Map columnNameToRange) { - this.paimonExtTable = table; - this.originTable = paimonExtTable.getPaimonTable(); + public PaimonSource(TupleDescriptor desc) { this.desc = desc; + this.paimonExtTable = (PaimonExternalTable) desc.getTable(); + this.originTable = paimonExtTable.getPaimonTable(); } public TupleDescriptor getDesc() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java index 3cba264861e256..27f9b8086a9cef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoConnectorExternalTable.java @@ -143,7 +143,8 @@ public Optional initSchema() { } Map columnMetadataMap = columnMetadataMapBuilder.buildOrThrow(); return Optional.of( - new TrinoSchemaCacheValue(columns, connectorTableHandle, columnHandleMap, columnMetadataMap)); + new TrinoSchemaCacheValue(columns, connectorMetadata, connectorTableHandle, connectorTransactionHandle, + columnHandleMap, columnMetadataMap)); } @Override @@ -160,11 +161,7 @@ public TTableDescriptor toThrift() { return tTableDescriptor; } - protected Type trinoConnectorTypeToDorisType(io.trino.spi.type.Type type) { - return trinoConnectorPrimitiveTypeToDorisType(type); - } - - private Type trinoConnectorPrimitiveTypeToDorisType(io.trino.spi.type.Type type) { + private Type trinoConnectorTypeToDorisType(io.trino.spi.type.Type type) { if (type instanceof BooleanType) { return Type.BOOLEAN; } else if (type instanceof TinyintType) { @@ -201,19 +198,19 @@ private Type trinoConnectorPrimitiveTypeToDorisType(io.trino.spi.type.Type type) TimestampWithTimeZoneType timestampWithTimeZoneType = (TimestampWithTimeZoneType) type; return ScalarType.createDatetimeV2Type(timestampWithTimeZoneType.getPrecision()); } else if (type instanceof io.trino.spi.type.ArrayType) { - Type elementType = trinoConnectorPrimitiveTypeToDorisType( + Type elementType = trinoConnectorTypeToDorisType( ((io.trino.spi.type.ArrayType) type).getElementType()); return ArrayType.create(elementType, true); } else if (type instanceof io.trino.spi.type.MapType) { - Type keyType = trinoConnectorPrimitiveTypeToDorisType( + Type keyType = trinoConnectorTypeToDorisType( ((io.trino.spi.type.MapType) type).getKeyType()); - Type valueType = trinoConnectorPrimitiveTypeToDorisType( + Type valueType = trinoConnectorTypeToDorisType( ((io.trino.spi.type.MapType) type).getValueType()); return new MapType(keyType, valueType, true, true); } else if (type instanceof RowType) { ArrayList dorisFields = Lists.newArrayList(); for (Field field : ((RowType) type).getFields()) { - Type childType = trinoConnectorPrimitiveTypeToDorisType(field.getType()); + Type childType = trinoConnectorTypeToDorisType(field.getType()); if (field.getName().isPresent()) { dorisFields.add(new StructField(field.getName().get(), childType)); } else { @@ -233,6 +230,19 @@ public ConnectorTableHandle getConnectorTableHandle() { .orElse(null); } + public ConnectorMetadata getConnectorMetadata() { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((TrinoSchemaCacheValue) value).getConnectorMetadata()).orElse(null); + } + + public ConnectorTransactionHandle getConnectorTransactionHandle() { + makeSureInitialized(); + Optional schemaCacheValue = getSchemaCacheValue(); + return schemaCacheValue.map(value -> ((TrinoSchemaCacheValue) value).getConnectorTransactionHandle()) + .orElse(null); + } + public Map getColumnHandleMap() { makeSureInitialized(); Optional schemaCacheValue = getSchemaCacheValue(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java index dc629190cbd09e..43bbe76c3b303b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/TrinoSchemaCacheValue.java @@ -22,27 +22,69 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorTableHandle; -import lombok.Getter; -import lombok.Setter; +import io.trino.spi.connector.ConnectorTransactionHandle; import java.util.List; import java.util.Map; import java.util.Optional; -@Getter -@Setter public class TrinoSchemaCacheValue extends SchemaCacheValue { - + private ConnectorMetadata connectorMetadata; private Optional connectorTableHandle; + private ConnectorTransactionHandle connectorTransactionHandle; private Map columnHandleMap; private Map columnMetadataMap; - public TrinoSchemaCacheValue(List schema, Optional connectorTableHandle, + public TrinoSchemaCacheValue(List schema, ConnectorMetadata connectorMetadata, + Optional connectorTableHandle, ConnectorTransactionHandle connectorTransactionHandle, Map columnHandleMap, Map columnMetadataMap) { super(schema); + this.connectorMetadata = connectorMetadata; + this.connectorTableHandle = connectorTableHandle; + this.connectorTransactionHandle = connectorTransactionHandle; + this.columnHandleMap = columnHandleMap; + this.columnMetadataMap = columnMetadataMap; + } + + public ConnectorMetadata getConnectorMetadata() { + return connectorMetadata; + } + + public Optional getConnectorTableHandle() { + return connectorTableHandle; + } + + public ConnectorTransactionHandle getConnectorTransactionHandle() { + return connectorTransactionHandle; + } + + public Map getColumnHandleMap() { + return columnHandleMap; + } + + public Map getColumnMetadataMap() { + return columnMetadataMap; + } + + public void setConnectorMetadata(ConnectorMetadata connectorMetadata) { + this.connectorMetadata = connectorMetadata; + } + + public void setConnectorTableHandle(Optional connectorTableHandle) { this.connectorTableHandle = connectorTableHandle; + } + + public void setConnectorTransactionHandle(ConnectorTransactionHandle connectorTransactionHandle) { + this.connectorTransactionHandle = connectorTransactionHandle; + } + + public void setColumnHandleMap(Map columnHandleMap) { this.columnHandleMap = columnHandleMap; + } + + public void setColumnMetadataMap(Map columnMetadataMap) { this.columnMetadataMap = columnMetadataMap; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java index 9c61d54614a070..2b09e30026c5ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java @@ -27,7 +27,6 @@ import org.apache.doris.common.UserException; import org.apache.doris.datasource.FileQueryScanNode; import org.apache.doris.datasource.TableFormatType; -import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable; import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; @@ -66,13 +65,11 @@ import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorSplitSource; import io.trino.spi.connector.ConnectorTableHandle; -import io.trino.spi.connector.ConnectorTransactionHandle; import io.trino.spi.connector.Constraint; import io.trino.spi.connector.ConstraintApplicationResult; import io.trino.spi.connector.DynamicFilter; import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.predicate.TupleDomain; -import io.trino.spi.transaction.IsolationLevel; import io.trino.spi.type.TypeManager; import io.trino.split.BufferingSplitSource; import io.trino.split.ConnectorAwareSplitSource; @@ -106,17 +103,8 @@ public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean needC @Override protected void doInitialize() throws UserException { - TrinoConnectorExternalTable table = (TrinoConnectorExternalTable) desc.getTable(); - if (table.isView()) { - throw new AnalysisException( - String.format("Querying external view '%s.%s' is not supported", table.getDbName(), - table.getName())); - } - - computeColumnsFilter(); - initBackendPolicy(); - source = new TrinoConnectorSource(desc, table); - initSchemaParams(); + super.doInitialize(); + source = new TrinoConnectorSource(desc); convertPredicate(); } @@ -144,12 +132,9 @@ protected void convertPredicate() throws UserException { public List getSplits() throws UserException { // 1. Get necessary objects Connector connector = source.getConnector(); - ConnectorTransactionHandle connectorTransactionHandle = connector.beginTransaction( - IsolationLevel.READ_UNCOMMITTED, true, true); - source.setConnectorTransactionHandle(connectorTransactionHandle); + connectorMetadata = source.getConnectorMetadata(); ConnectorSession connectorSession = source.getTrinoSession().toConnectorSession(source.getCatalogHandle()); - connectorMetadata = connector.getMetadata(connectorSession, connectorTransactionHandle); // 2. Begin query connectorMetadata.beginQuery(connectorSession); applyPushDown(connectorSession); @@ -157,7 +142,7 @@ public List getSplits() throws UserException { // 3. get splitSource List splits = Lists.newArrayList(); try (SplitSource splitSource = getTrinoSplitSource(connector, source.getTrinoSession(), - connectorTransactionHandle, source.getTrinoConnectorTableHandle(), DynamicFilter.EMPTY)) { + source.getTrinoConnectorTableHandle(), DynamicFilter.EMPTY)) { // 4. get trino.Splits and convert it to doris.Splits while (!splitSource.isFinished()) { for (io.trino.metadata.Split split : getNextSplitBatch(splitSource)) { @@ -165,6 +150,10 @@ public List getSplits() throws UserException { } } } + + // 4. Clear query + // It is necessary for hive connector + connectorMetadata.cleanupQuery(connectorSession); return splits; } @@ -216,8 +205,7 @@ private void applyPushDown(ConnectorSession connectorSession) { // } } - private SplitSource getTrinoSplitSource(Connector connector, Session session, - ConnectorTransactionHandle connectorTransactionHandle, ConnectorTableHandle table, + private SplitSource getTrinoSplitSource(Connector connector, Session session, ConnectorTableHandle table, DynamicFilter dynamicFilter) { ConnectorSplitManager splitManager = connector.getSplitManager(); @@ -227,8 +215,8 @@ private SplitSource getTrinoSplitSource(Connector connector, Session session, ConnectorSession connectorSession = session.toConnectorSession(source.getCatalogHandle()); // Constraint is not used by Hive/BigQuery Connector - ConnectorSplitSource connectorSplitSource = splitManager.getSplits(connectorTransactionHandle, connectorSession, - table, dynamicFilter, constraint); + ConnectorSplitSource connectorSplitSource = splitManager.getSplits(source.getConnectorTransactionHandle(), + connectorSession, table, dynamicFilter, constraint); SplitSource splitSource = new ConnectorAwareSplitSource(source.getCatalogHandle(), connectorSplitSource); if (this.minScheduleSplitBatchSize > 1) { @@ -386,7 +374,9 @@ public TFileAttributes getFileAttributes() throws UserException { @Override public TableIf getTargetTable() { - return source.getTargetTable(); + // can not use `source.getTargetTable()` + // because source is null when called getTargetTable + return desc.getTable(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java index 85e36517baedb1..20dcf996595a48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorSource.java @@ -27,6 +27,7 @@ import io.trino.connector.ConnectorName; import io.trino.spi.connector.CatalogHandle; import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTransactionHandle; @@ -40,16 +41,19 @@ public class TrinoConnectorSource { private final ConnectorName connectorName; private ConnectorTransactionHandle connectorTransactionHandle; private ConnectorTableHandle trinoConnectorTableHandle; + private ConnectorMetadata connectorMetadata; - public TrinoConnectorSource(TupleDescriptor desc, TrinoConnectorExternalTable table) { + public TrinoConnectorSource(TupleDescriptor desc) { this.desc = desc; - this.trinoConnectorExtTable = table; - this.trinoConnectorExternalCatalog = (TrinoConnectorExternalCatalog) table.getCatalog(); + this.trinoConnectorExtTable = (TrinoConnectorExternalTable) desc.getTable(); + this.trinoConnectorExternalCatalog = (TrinoConnectorExternalCatalog) trinoConnectorExtTable.getCatalog(); this.catalogHandle = trinoConnectorExternalCatalog.getTrinoCatalogHandle(); - this.trinoConnectorTableHandle = table.getConnectorTableHandle(); + this.trinoConnectorTableHandle = trinoConnectorExtTable.getConnectorTableHandle(); + this.connectorMetadata = trinoConnectorExtTable.getConnectorMetadata(); + this.connectorTransactionHandle = trinoConnectorExtTable.getConnectorTransactionHandle(); this.trinoSession = trinoConnectorExternalCatalog.getTrinoSession(); - this.connector = ((TrinoConnectorExternalCatalog) table.getCatalog()).getConnector(); - this.connectorName = ((TrinoConnectorExternalCatalog) table.getCatalog()).getConnectorName(); + this.connector = trinoConnectorExternalCatalog.getConnector(); + this.connectorName = trinoConnectorExternalCatalog.getConnectorName(); } public TupleDescriptor getDesc() { @@ -88,8 +92,8 @@ public ConnectorName getConnectorName() { return connectorName; } - public void setConnectorTransactionHandle(ConnectorTransactionHandle connectorTransactionHandle) { - this.connectorTransactionHandle = connectorTransactionHandle; + public ConnectorMetadata getConnectorMetadata() { + return connectorMetadata; } public void setTrinoConnectorTableHandle(ConnectorTableHandle trinoConnectorExtTableHandle) {