Commit

[Flink] Bump Flink to 1.17.2
nastra committed Jul 11, 2024
1 parent d04b3cd commit fe78f25
Showing 4 changed files with 46 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -50,7 +50,7 @@ connectServer / sparkVersion := getSparkVersion()

// Dependent library versions
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
val flinkVersion = "1.16.1"
val flinkVersion = "1.17.2"
val hadoopVersion = "3.3.4"
val scalaTestVersion = "3.2.15"
val scalaTestVersionForConnectors = "3.0.8"

@@ -8,7 +8,6 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
- import io.delta.flink.internal.table.DeltaCatalogTableHelper.DeltaMetastoreTable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.Schema;
@@ -218,8 +217,8 @@ public void createTable(DeltaCatalogBaseTable catalogTable, boolean ignoreIfExists
}

// Add table to metastore
- DeltaMetastoreTable metastoreTable =
-     DeltaCatalogTableHelper.prepareMetastoreTable(table, deltaTablePath);
+ ResolvedCatalogTable metastoreTable =
+     DeltaCatalogTableHelper.prepareResolvedMetastoreTable(table, deltaTablePath);
this.decoratedCatalog.createTable(tableCatalogPath, metastoreTable, ignoreIfExists);
} else {
// Table does not exist on filesystem, we have to create a new _delta_log
@@ -237,8 +236,8 @@ public void createTable(DeltaCatalogBaseTable catalogTable, boolean ignoreIfExists
Operation.Name.CREATE_TABLE
);

- DeltaMetastoreTable metastoreTable =
-     DeltaCatalogTableHelper.prepareMetastoreTable(table, deltaTablePath);
+ ResolvedCatalogTable metastoreTable =
+     DeltaCatalogTableHelper.prepareResolvedMetastoreTable(table, deltaTablePath);

// add table to metastore
this.decoratedCatalog.createTable(tableCatalogPath, metastoreTable, ignoreIfExists);

@@ -25,6 +25,7 @@
import org.apache.flink.table.catalog.Column.PhysicalColumn;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
+ import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
@@ -227,7 +228,10 @@ public static Map<String, String> filterMetastoreDdlOptions(Map<String, String>
/**
* Prepare catalog table to store in metastore. This table will have only selected
* options from DDL and an empty schema.
+ *
+ * @deprecated Use {@link #prepareResolvedMetastoreTable(CatalogBaseTable, String)} instead.
*/
+ @Deprecated
public static DeltaMetastoreTable prepareMetastoreTable(
CatalogBaseTable table,
String deltaTablePath) {
@@ -258,6 +262,42 @@ public static DeltaMetastoreTable prepareMetastoreTable(
);
}

+ /**
+  * Prepare catalog table to store in metastore. This table will have only selected
+  * options from DDL and an empty schema.
+  */
+ public static ResolvedCatalogTable prepareResolvedMetastoreTable(
+         CatalogBaseTable table,
+         String deltaTablePath) {
+     // Store only the path, table name and connector type in the metastore.
+     // Computed and metadata columns are not supported.
+     Map<String, String> optionsToStoreInMetastore = new HashMap<>();
+     optionsToStoreInMetastore.put(FactoryUtil.CONNECTOR.key(),
+         DeltaDynamicTableFactory.DELTA_CONNECTOR_IDENTIFIER);
+     optionsToStoreInMetastore.put(DeltaTableConnectorOptions.TABLE_PATH.key(),
+         deltaTablePath);
+
+     // Flink's Hive catalog calls the deprecated CatalogTable::getSchema method and applies
+     // a null check on the result. The default implementation of that method returns null,
+     // and the DefaultCatalogTable returned by CatalogTable.of() does not override it, hence
+     // we wrap the table so that getSchema returns an empty TableSchema instead of null.
+     return new ResolvedCatalogTable(
+         CatalogTable.of(
+             // By design, don't store the schema in the metastore. Watermarks and primary
+             // keys will also not be stored in the metastore and are for now not supported
+             // by the Delta connector SQL.
+             Schema.newBuilder().build(),
+             table.getComment(),
+             Collections.emptyList(),
+             optionsToStoreInMetastore
+         ),
+         ResolvedSchema.of()
+     );
+ }


/**
* Validates DDL options against existing delta table properties. If there is any mismatch (i.e.
* same key, different value) and `allowOverride` is set to false throws an exception. Else,
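
The comment inside the new helper above explains the motivation for the wrapper: Flink's Hive catalog still null-checks the result of the deprecated getSchema() call, and a table built with a bare CatalogTable.of(...) would return null there. Below is a minimal, self-contained sketch of that wrapping technique using only Flink's table API; the class name, connector identifier and table path are illustrative stand-ins rather than values taken from this commit.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;

public class EmptySchemaMetastoreTableSketch {

    public static void main(String[] args) {
        // Only the connector type and table path go to the metastore; the real schema
        // stays in the Delta log. Both values below are illustrative stand-ins.
        Map<String, String> options = new HashMap<>();
        options.put("connector", "delta");
        options.put("table-path", "/tmp/delta/example-table");

        // An intentionally empty schema: no columns, watermarks or primary keys are
        // stored in the metastore.
        CatalogTable table = CatalogTable.of(
            Schema.newBuilder().build(),
            "example comment",
            Collections.emptyList(),
            options);

        // Wrapping the table in a ResolvedCatalogTable with an empty ResolvedSchema makes the
        // deprecated getSchema() return an empty, non-null TableSchema, which is what catalogs
        // that still null-check it (such as Flink's Hive catalog) expect.
        ResolvedCatalogTable metastoreTable =
            new ResolvedCatalogTable(table, ResolvedSchema.of());

        System.out.println(metastoreTable.getOptions());
        System.out.println(metastoreTable.getSchema());
    }
}

Registering such an object with a metastore-backed catalog through Catalog#createTable, as the updated DeltaCatalog call sites above now do, stores only these options rather than a copy of the table schema.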

@@ -11,7 +11,7 @@
import io.delta.flink.source.internal.DeltaSourceOptions;
import io.delta.flink.source.internal.builder.DeltaSourceBuilderBase;
import org.apache.hadoop.conf.Configuration;
- import org.codehaus.janino.util.Producer;
+ import org.codehaus.commons.compiler.util.Producer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
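
The only change in this test is the package from which janino's Producer is imported; with the Flink 1.17 dependency bump it now comes from org.codehaus.commons.compiler.util instead of org.codehaus.janino.util. A minimal sketch of how such a producer is typically used, assuming the relocated interface keeps the same single-method, supplier-style shape (T produce()) as the old org.codehaus.janino.util.Producer; the path value is purely illustrative.

// Sketch only: assumes Producer still exposes exactly one method, T produce(),
// as the old org.codehaus.janino.util.Producer did.
import org.codehaus.commons.compiler.util.Producer;

public class ProducerImportSketch {

    // A test-style helper that defers creating its value until it is asked for.
    static String describe(Producer<String> pathProducer) {
        return "source path: " + pathProducer.produce();
    }

    public static void main(String[] args) {
        // Hypothetical table path, used only for illustration.
        Producer<String> lazyPath = () -> "/tmp/delta/source-table";
        System.out.println(describe(lazyPath));
    }
}

Because only the package changed, any existing Producer usages in the test keep compiling once the import is updated, which is why the diff touches nothing but this line.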
