From 45f9324925277a125184a0281f7db689abe27c2c Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 6 Jan 2025 11:50:18 +0800 Subject: [PATCH] fix --- .../flink/lookup/FileStoreLookupFunction.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java index 92f9b2d93124..bdd493c3f586 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java @@ -28,6 +28,7 @@ import org.apache.paimon.flink.utils.TableScanUtils; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.OutOfRangeException; @@ -162,15 +163,20 @@ private void open() throws Exception { int[] projection = projectFields.stream().mapToInt(fieldNames::indexOf).toArray(); FileStoreTable storeTable = (FileStoreTable) table; + LOG.info("Creating lookup table for {}.", table.name()); if (options.get(LOOKUP_CACHE_MODE) == LookupCacheMode.AUTO && new HashSet<>(table.primaryKeys()).equals(new HashSet<>(joinKeys))) { + String message = + String.format( + "{}={} and primary keys equal to join keys, trying to create PrimaryKeyPartialLookupTable.", + LOOKUP_CACHE_MODE.key(), + LookupCacheMode.AUTO); if (isRemoteServiceAvailable(storeTable)) { this.lookupTable = PrimaryKeyPartialLookupTable.createRemoteTable( storeTable, projection, joinKeys); LOG.info( - "Created lookup table for {}. Type is PrimaryKeyPartialLookupTable with remote service.", - table.name()); + "Remote service is available. Created PrimaryKeyPartialLookupTable with remote service."); } else { try { this.lookupTable = @@ -181,9 +187,12 @@ private void open() throws Exception { joinKeys, getRequireCachedBucketIds()); LOG.info( - "Created lookup table for {}. Type is PrimaryKeyPartialLookupTable with LocalQueryExecutor.", - table.name()); - } catch (UnsupportedOperationException ignore2) { + "Remote service isn't available. Created PrimaryKeyPartialLookupTable with LocalQueryExecutor."); + } catch (UnsupportedOperationException ignore) { + LOG.info( + "Remote service isn't available. Cannot create PrimaryKeyPartialLookupTable with LocalQueryExecutor " + + "because bucket mode isn't {}. Will create FullCacheLookupTable.", + BucketMode.HASH_FIXED); } } } @@ -199,10 +208,7 @@ private void open() throws Exception { joinKeys, getRequireCachedBucketIds()); this.lookupTable = FullCacheLookupTable.create(context, options.get(LOOKUP_CACHE_ROWS)); - LOG.info( - "Created lookup table for {}. Type is {}.", - table.name(), - lookupTable.getClass().getSimpleName()); + LOG.info("Created {}.", lookupTable.getClass().getSimpleName()); } if (partitionLoader != null) {