Streaming without ENABLE_SCHEMA_EVOLUTION; postgres JSON mapped to snowflake VARIANT #536

Open · wants to merge 14 commits into base: master
@@ -74,6 +74,13 @@ public class SnowflakeSinkConnectorConfig {
public static final String ENABLE_SCHEMATIZATION_CONFIG = "snowflake.enable.schematization";
public static final String ENABLE_SCHEMATIZATION_DEFAULT = "false";

public static final String SCHEMATIZATION_AUTO_CONFIG = "snowflake.schematization.auto";
public static final String SCHEMATIZATION_AUTO_DEFAULT = "true";
public static final String SCHEMATIZATION_AUTO_DISPLAY = "Use automatic schema evolution";
public static final String SCHEMATIZATION_AUTO_DOC =
"If true, use the Snowflake automatic schema evolution feature. "
+ "NOTE: you need to grant the EVOLVE SCHEMA privilege to " + SNOWFLAKE_USER;

// Proxy Info
private static final String PROXY_INFO = "Proxy Info";
public static final String JVM_PROXY_HOST = "jvm.proxy.host";
@@ -334,7 +341,10 @@ static ConfigDef newConfigDef() {
topicToTableValidator,
Importance.LOW,
"Map of topics to tables (optional). Format : comma-separated tuples, e.g."
+ " <topic-1>:<table-1>,<topic-2>:<table-2>,... ",
+ " <topic-1>:<table-1>,<topic-2>:<table-2>,... \n"
+ "Generic regex matching is possible using the following syntax:"
+ Utils.TOPIC_MATCHER_PREFIX + "^[^.]\\w+.\\w+.(.*):$1\n"
+ "NOTE: topics names cannot contain \":\" or \",\" so the regex should not contain these characters either\n",
CONNECTOR_CONFIG,
0,
ConfigDef.Width.NONE,
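For context, the .define(...) call that registers the new flag is not shown in the hunks above; the following is a hypothetical, self-contained sketch of how such a flag is typically registered with Kafka's ConfigDef, following the pattern of the existing entries. The class name and the shortened doc string are illustrative only.

import org.apache.kafka.common.config.ConfigDef;

public class AutoSchematizationConfigDefExample {
  public static void main(String[] args) {
    // Register the flag as a BOOLEAN with a default of true (matching SCHEMATIZATION_AUTO_DEFAULT).
    ConfigDef configDef =
        new ConfigDef()
            .define(
                "snowflake.schematization.auto",
                ConfigDef.Type.BOOLEAN,
                true,
                ConfigDef.Importance.LOW,
                "If true, use the Snowflake automatic schema evolution feature.");
    System.out.println(configDef.defaultValues()); // {snowflake.schematization.auto=true}
  }
}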
29 changes: 22 additions & 7 deletions src/main/java/com/snowflake/kafka/connector/Utils.java
@@ -511,6 +511,18 @@ public static String generateValidName(String topic, Map<String, String> topic2t
if (Utils.isValidSnowflakeObjectIdentifier(topic)) {
return topic;
}

// Resolve the topic through any REGEX_MATCHER> entries: strip the prefix, apply the regex to the
// topic name, and cache the resolved table name in the map so later lookups resolve directly.
for (Map.Entry<String, String> entry : topic2table.entrySet()) {
if (entry.getKey().startsWith(TOPIC_MATCHER_PREFIX)) {
String regex = entry.getKey().replaceFirst(TOPIC_MATCHER_PREFIX, "");
String finalTableName = topic.replaceAll(regex, entry.getValue());
if (Utils.isValidSnowflakeObjectIdentifier(finalTableName)) {
topic2table.put(topic, finalTableName);
return finalTableName;
}
}
}

int hash = Math.abs(topic.hashCode());

StringBuilder result = new StringBuilder();
@@ -538,6 +550,7 @@ public static String generateValidName(String topic, Map<String, String> topic2t
return result.toString();
}

public static final String TOPIC_MATCHER_PREFIX = "REGEX_MATCHER>";
public static Map<String, String> parseTopicToTableMap(String input) {
Map<String, String> topic2Table = new HashMap<>();
boolean isInvalid = false;
@@ -553,13 +566,15 @@ public static Map<String, String> parseTopicToTableMap(String input) {
String topic = tt[0].trim();
String table = tt[1].trim();

-      if (!isValidSnowflakeTableName(table)) {
-        LOGGER.error(
-            "table name {} should have at least 2 "
-                + "characters, start with _a-zA-Z, and only contains "
-                + "_$a-zA-z0-9",
-            table);
-        isInvalid = true;
+      // REGEX_MATCHER> entries are skipped here: their "table" value is a replacement pattern
+      // (e.g. $1), not a literal table name, so it cannot be validated up front.
+      if (!topic.startsWith(TOPIC_MATCHER_PREFIX)) {
+        if (!isValidSnowflakeTableName(table)) {
+          LOGGER.error(
+              "table name {} should have at least 2 "
+                  + "characters, start with _a-zA-Z, and only contains "
+                  + "_$a-zA-z0-9",
+              table);
+          isInvalid = true;
+        }
+      }
}

if (topic2Table.containsKey(topic)) {
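As an aside, here is a small, self-contained illustration (not part of the diff) of how a REGEX_MATCHER> entry resolves a topic to a table name, mirroring the loop added to generateValidName; the pattern and topic are the ones used in the config doc string and the new unit test.

public class RegexMatcherExample {
  public static void main(String[] args) {
    String prefix = "REGEX_MATCHER>";
    String entryKey = "REGEX_MATCHER>^[^.]\\w+.\\w+.(.*)";
    String entryValue = "$1";
    String topic = "postgres.public.testtbl";

    // Strip the marker prefix to obtain the regex, then apply the capture-group substitution.
    String regex = entryKey.replaceFirst(prefix, "");
    String tableName = topic.replaceAll(regex, entryValue);
    System.out.println(tableName); // prints "testtbl", a valid Snowflake identifier
  }
}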
@@ -281,5 +281,5 @@ public interface SnowflakeConnectionService {
*
* @param tableName table name
+ * @param autoSchematization whether to enable Snowflake server-side schema evolution (ENABLE_SCHEMA_EVOLUTION) on the new table
*/
- void createTableWithOnlyMetadataColumn(String tableName);
+ void createTableWithOnlyMetadataColumn(String tableName, boolean autoSchematization);
}
@@ -135,7 +135,7 @@ public void createTable(final String tableName) {
}

@Override
-  public void createTableWithOnlyMetadataColumn(final String tableName) {
+  public void createTableWithOnlyMetadataColumn(final String tableName, final boolean autoSchematization) {
checkConnection();
InternalUtils.assertNotEmpty("tableName", tableName);
String createTableQuery =
@@ -151,17 +151,19 @@ public void createTableWithOnlyMetadataColumn(final String tableName) {
throw SnowflakeErrors.ERROR_2007.getException(e);
}

-    // Enable schema evolution by default if the table is created by the connector
-    String enableSchemaEvolutionQuery =
-        "alter table identifier(?) set ENABLE_SCHEMA_EVOLUTION = true";
-    try {
-      PreparedStatement stmt = conn.prepareStatement(enableSchemaEvolutionQuery);
-      stmt.setString(1, tableName);
-      stmt.executeQuery();
-    } catch (SQLException e) {
-      // Skip the error given that schema evolution is still under PrPr
-      LOG_WARN_MSG(
-          "Enable schema evolution failed on table: {}, message: {}", tableName, e.getMessage());
+    if (autoSchematization) {
+      // Enable schema evolution by default if the table is created by the connector
+      String enableSchemaEvolutionQuery =
+          "alter table identifier(?) set ENABLE_SCHEMA_EVOLUTION = true";
+      try {
+        PreparedStatement stmt = conn.prepareStatement(enableSchemaEvolutionQuery);
+        stmt.setString(1, tableName);
+        stmt.executeQuery();
+      } catch (SQLException e) {
+        // Skip the error given that schema evolution is still under PrPr
+        LOG_WARN_MSG(
+            "Enable schema evolution failed on table: {}, message: {}", tableName, e.getMessage());
+      }
Comment on lines +154 to +166

sfc-gh-tzhang (Contributor):
This won't work because we rely on the schema evolution to create the table with the correct schema; if you don't want schema evolution on the table, you should create the table yourself and it will have schema evolution turned off by default.

acristu (Author), Feb 9, 2023:
Thank you for the reply; the schema is evolved here: https://github.com/streamkap-com/snowflake-kafka-connector/blob/63bc190dd75692e6423d3f50b25143dafaa40d1a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java#L653. The main point of this draft PR is to assess whether "manual schema evolution" from SchematizationUtils.evolveSchemaIfNeeded can be supported as an alternative to "automatic schema evolution". Our main concern is that "manual schema evolution" is currently done on the error path, if the insertRow fails.

We can contribute this "manual schema evolution" option: when setting snowflake.schematization.auto=false, the connector would use the Connect schema from the schema registry, or the schema embedded in the records, to evolve the Snowflake schema before inserting the data. But we wanted to see first if this approach is acceptable for you going forward.

sfc-gh-tzhang (Contributor), Feb 10, 2023:
> Our main concern is that "manual schema evolution" is currently done on the error path, if the insertRow fails.

This is by design; insertRow will always fail first because we just create a table with RECORD_METADATA only during connector start-up.

acristu (Author):
Ok, that should not be a problem; schema changes should not happen often. The problem is that without these proposed changes (https://github.com/streamkap-com/snowflake-kafka-connector/blob/63bc190dd75692e6423d3f50b25143dafaa40d1a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java#L264), we cannot enable schema evolution without setting ENABLE_SCHEMA_EVOLUTION. We would like to use only SchematizationUtils.evolveSchemaIfNeeded and not ENABLE_SCHEMA_EVOLUTION.
}

LOG_INFO_MSG("Created table {} with only RECORD_METADATA column", tableName);
@@ -19,13 +19,25 @@
import com.snowflake.kafka.connector.internal.SnowflakeErrors;
import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException;
import com.snowflake.kafka.connector.records.RecordService;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nonnull;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode;

import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
@@ -83,7 +95,15 @@ public static void evolveSchemaIfNeeded(

// Add columns if needed, ignore any exceptions since other task might be succeeded
if (extraColNames != null) {
-      Map<String, String> extraColumnsToType = getColumnTypes(record, extraColNames);
+      // Append the missing columns in the order the fields appear on the source:
+      // key schema fields first, then value schema fields.
+      List<String> fieldNamesOrderedAsOnSource = Stream.concat(
+          record.keySchema() != null ? record.keySchema().fields().stream().map(f -> f.name()) : Stream.empty(),
+          record.valueSchema() != null ? record.valueSchema().fields().stream().map(f -> f.name()) : Stream.empty()
+      ).collect(Collectors.toList());
+      List<String> extraColNamesOrderedAsOnSource = new ArrayList<>(extraColNames);
+      extraColNamesOrderedAsOnSource.sort(
+          Comparator.comparingInt(fieldNamesOrderedAsOnSource::indexOf));
+      Map<String, String> extraColumnsToType = getColumnTypes(record, extraColNamesOrderedAsOnSource);

try {
conn.appendColumnsToTable(tableName, extraColumnsToType);
} catch (SnowflakeKafkaConnectorException e) {
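To make the ordering step above concrete, here is a standalone sketch (illustrative field and column names, not part of the diff) of how the missing columns get sorted to follow the source field order:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ColumnOrderExample {
  public static void main(String[] args) {
    // Field order as it appears on the source: key schema fields first, then value schema fields.
    List<String> fieldNamesOrderedAsOnSource = Arrays.asList("id", "name", "created_at", "payload");
    // Columns that are missing in the Snowflake table, reported in arbitrary order.
    List<String> extraColNames = new ArrayList<>(Arrays.asList("payload", "id", "created_at"));

    extraColNames.sort(Comparator.comparingInt(fieldNamesOrderedAsOnSource::indexOf));
    System.out.println(extraColNames); // [id, created_at, payload]
  }
}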
@@ -109,7 +129,7 @@ static Map<String, String> getColumnTypes(SinkRecord record, List<String> column
if (columnNames == null) {
return new HashMap<>();
}
-    Map<String, String> columnToType = new HashMap<>();
+    // LinkedHashMap preserves the source-ordered insertion order computed by the caller
+    Map<String, String> columnToType = new LinkedHashMap<>();
Map<String, String> schemaMap = getSchemaMapFromRecord(record);
JsonNode recordNode = RecordService.convertToJson(record.valueSchema(), record.value());

@@ -145,7 +165,7 @@ private static Map<String, String> getSchemaMapFromRecord(SinkRecord record) {
Schema schema = record.valueSchema();
if (schema != null) {
for (Field field : schema.fields()) {
-        schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type()));
+        schemaMap.put(field.name(), convertToSnowflakeType(field.schema().type(), field.schema().name()));
}
}
return schemaMap;
@@ -158,7 +178,7 @@ private static String inferDataTypeFromJsonObject(JsonNode value) {
// only when the type of the value is unrecognizable for JAVA
throw SnowflakeErrors.ERROR_5021.getException("class: " + value.getClass());
}
-    return convertToSnowflakeType(schemaType);
+    return convertToSnowflakeType(schemaType, null);
}

/** Convert a json node type to kafka data type */
@@ -192,7 +212,25 @@ private static Type convertJsonNodeTypeToKafkaType(JsonNode value) {
}

/** Convert the kafka data type to Snowflake data type */
-  private static String convertToSnowflakeType(Type kafkaType) {
+  private static String convertToSnowflakeType(Type kafkaType, String semanticType) {
// Map Connect/Debezium logical (semantic) types first, before falling back to the physical type.
if (semanticType != null) {
switch (semanticType) {
case Decimal.LOGICAL_NAME:
return "DOUBLE";
case Time.LOGICAL_NAME:
case Timestamp.LOGICAL_NAME:
case "io.debezium.time.ZonedTimestamp":
case "io.debezium.time.ZonedTime":
case "io.debezium.time.MicroTime":
case "io.debezium.time.Timestamp":
case "io.debezium.time.MicroTimestamp":
return "TIMESTAMP";
case Date.LOGICAL_NAME:
case "io.debezium.time.Date":
return "DATE";
}
}

switch (kafkaType) {
case INT8:
return "BYTEINT";
Expand All @@ -209,6 +247,9 @@ private static String convertToSnowflakeType(Type kafkaType) {
case BOOLEAN:
return "BOOLEAN";
case STRING:
if (semanticType != null && semanticType.equals("io.debezium.data.Json")) {
Review comment on the line above:

sfc-gh-tzhang (Contributor):
What's the issue of using VARCHAR in this case?

acristu (Author):
By using VARCHAR you'd have to parse the JSON each time you wanted to pull out a field; with VARIANT you can use json.field directly, so this skips the extra processing.

sfc-gh-tzhang (Contributor):
I see, this makes sense. I guess this is a general issue for JSON values since they will all be mapped to STRING; I will see what we can do here, thanks!
return "VARIANT";
}
return "VARCHAR";
case BYTES:
return "BINARY";
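For context (not part of the diff): the semanticType argument corresponds to Schema#name() on the Connect schema, which is where Debezium attaches its logical type names while the physical type stays STRING or an integer type. A small self-contained sketch of what the new mapping keys off:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public class SemanticTypeExample {
  public static void main(String[] args) {
    // A Debezium timestamp column: physically a STRING, logically io.debezium.time.ZonedTimestamp.
    Schema zonedTs = SchemaBuilder.string().name("io.debezium.time.ZonedTimestamp").optional().build();
    System.out.println(zonedTs.type() + " -> " + zonedTs.name()); // STRING -> io.debezium.time.ZonedTimestamp

    // A Debezium JSON column: physically a STRING, logically io.debezium.data.Json.
    Schema json = SchemaBuilder.string().name("io.debezium.data.Json").optional().build();
    System.out.println(json.type() + " -> " + json.name()); // STRING -> io.debezium.data.Json

    // With this PR, the first would map to TIMESTAMP and the second to VARIANT instead of VARCHAR.
  }
}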
@@ -103,6 +103,7 @@ public class SnowflakeSinkServiceV2 implements SnowflakeSinkService {
private final String streamingIngestClientName;

private boolean enableSchematization;
private boolean autoSchematization;

/**
* Key is formulated in {@link #partitionChannelKey(String, int)} }
@@ -133,6 +134,8 @@ public SnowflakeSinkServiceV2(

this.enableSchematization =
this.recordService.setAndGetEnableSchematizationFromConfig(this.connectorConfig);
this.autoSchematization =
this.recordService.setAndGetAutoSchematizationFromConfig(this.connectorConfig);

this.taskId = connectorConfig.getOrDefault(Utils.TASK_ID, "-1");
this.streamingIngestClientName =
@@ -531,7 +534,7 @@ private void createTableIfNotExists(final String tableName) {
if (this.enableSchematization) {
// Always create the table with RECORD_METADATA only and rely on schema evolution to update
// the schema
-      this.conn.createTableWithOnlyMetadataColumn(tableName);
+      this.conn.createTableWithOnlyMetadataColumn(tableName, this.autoSchematization);
} else {
this.conn.createTable(tableName);
}
@@ -175,6 +175,7 @@ public class TopicPartitionChannel {

// Whether schematization has been enabled.
private final boolean enableSchematization;
private final boolean autoSchematization;

// Whether schema evolution could be done on this channel
private final boolean enableSchemaEvolution;
@@ -255,11 +256,14 @@ public TopicPartitionChannel(
/* Schematization related properties */
this.enableSchematization =
this.recordService.setAndGetEnableSchematizationFromConfig(sfConnectorConfig);
this.autoSchematization =
this.recordService.setAndGetAutoSchematizationFromConfig(sfConnectorConfig);
this.enableSchemaEvolution =
this.enableSchematization
&& this.conn != null
-            && this.conn.hasSchemaEvolutionPermission(
-                tableName, sfConnectorConfig.get(SNOWFLAKE_ROLE));
+            // With manual schematization (snowflake.schematization.auto=false), server-side schema
+            // evolution is not used, so the EVOLVE SCHEMA permission check is skipped.
+            && (!autoSchematization ||
+                this.conn.hasSchemaEvolutionPermission(
+                    tableName, sfConnectorConfig.get(SNOWFLAKE_ROLE)));
}
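A minimal sketch of the flag combinations the enableSchemaEvolution condition above produces; the parameter names are simplified stand-ins for the real fields, and the conn != null check is omitted for brevity:

public class SchemaEvolutionFlagExample {
  static boolean enableSchemaEvolution(
      boolean enableSchematization, boolean autoSchematization, boolean hasEvolveSchemaPrivilege) {
    return enableSchematization && (!autoSchematization || hasEvolveSchemaPrivilege);
  }

  public static void main(String[] args) {
    System.out.println(enableSchemaEvolution(true, true, true));   // true: server-side evolution
    System.out.println(enableSchemaEvolution(true, true, false));  // false: privilege missing
    System.out.println(enableSchemaEvolution(true, false, false)); // true: connector-side ("manual") evolution
    System.out.println(enableSchemaEvolution(false, true, true));  // false: schematization disabled
  }
}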

/**
@@ -71,6 +71,7 @@ public class RecordService extends EnableLogging {
static final String HEADERS = "headers";

private boolean enableSchematization = false;
private boolean autoSchematization = true;

// For each task, we require a separate instance of SimpleDataFormat, since they are not
// inherently thread safe
@@ -120,6 +121,24 @@ public boolean setAndGetEnableSchematizationFromConfig(
return this.enableSchematization;
}

/**
* Extract autoSchematization from the connector config and set the value for the recordService
*
* <p>The extracted boolean is returned for external usage.
*
* @param connectorConfig the connector config map
* @return a boolean indicating whether automatic (server-side) schema evolution is enabled; defaults to true
*/
public boolean setAndGetAutoSchematizationFromConfig(
final Map<String, String> connectorConfig) {
if (connectorConfig.containsKey(SnowflakeSinkConnectorConfig.SCHEMATIZATION_AUTO_CONFIG)) {
this.autoSchematization =
Boolean.parseBoolean(
connectorConfig.get(SnowflakeSinkConnectorConfig.SCHEMATIZATION_AUTO_CONFIG));
}
return this.autoSchematization;
}

/**
* Directly set the enableSchematization through param
*
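A hypothetical, standalone helper (not the connector's API) mirroring the behavior of setAndGetAutoSchematizationFromConfig above: the flag defaults to true when snowflake.schematization.auto is absent from the connector config.

import java.util.Collections;
import java.util.Map;

public class AutoSchematizationDefaultExample {
  static boolean autoSchematizationFromConfig(Map<String, String> connectorConfig) {
    // Same semantics as the method above: absent -> true, otherwise Boolean.parseBoolean of the value.
    return Boolean.parseBoolean(
        connectorConfig.getOrDefault("snowflake.schematization.auto", "true"));
  }

  public static void main(String[] args) {
    System.out.println(autoSchematizationFromConfig(Collections.emptyMap())); // true (default)
    System.out.println(
        autoSchematizationFromConfig(
            Collections.singletonMap("snowflake.schematization.auto", "false"))); // false
  }
}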
8 changes: 8 additions & 0 deletions src/test/java/com/snowflake/kafka/connector/UtilsTest.java
@@ -71,6 +71,14 @@ public void testTableName() {
assert Utils.tableName(topic, topic2table).equals("_12345_" + Math.abs(topic.hashCode()));
}

@Test
public void test_generateValidName() {
Map<String, String> topic2table = Utils.parseTopicToTableMap("ab@cd:abcd, REGEX_MATCHER>^[^.]\\w+.\\w+.(.*):$1, 1234:_1234");

assert Utils.tableName("postgres.public.testtbl", topic2table).equals("testtbl");
}


@Test
public void testTableFullName() {
assert Utils.isValidSnowflakeTableName("_1342dfsaf$");
@@ -938,7 +938,7 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception {
avroConverter.configure(
Collections.singletonMap("schema.registry.url", "http://fake-url"), false);
byte[] converted = avroConverter.fromConnectData(topic, original.schema(), original);
-    conn.createTableWithOnlyMetadataColumn(table);
+    conn.createTableWithOnlyMetadataColumn(table, true);

SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted);

Expand Down Expand Up @@ -1024,7 +1024,7 @@ public void testSchematizationWithTableCreationAndJsonInput() throws Exception {
JsonConverter jsonConverter = new JsonConverter();
jsonConverter.configure(config, false);
byte[] converted = jsonConverter.fromConnectData(topic, original.schema(), original);
-    conn.createTableWithOnlyMetadataColumn(table);
+    conn.createTableWithOnlyMetadataColumn(table, true);

SchemaAndValue jsonInputValue = jsonConverter.toConnectData(topic, converted);

Expand Down Expand Up @@ -1084,7 +1084,7 @@ public void testSchematizationSchemaEvolutionWithNonNullableColumn() throws Exce
JsonConverter jsonConverter = new JsonConverter();
jsonConverter.configure(config, false);
byte[] converted = jsonConverter.fromConnectData(topic, original.schema(), original);
-    conn.createTableWithOnlyMetadataColumn(table);
+    conn.createTableWithOnlyMetadataColumn(table, true);
createNonNullableColumn(table, "id_int8_non_nullable");

SchemaAndValue jsonInputValue = jsonConverter.toConnectData(topic, converted);