diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java
index 2323703c6..b2eec0bbb 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/SchematizationTestUtils.java
@@ -16,6 +16,7 @@ public class SchematizationTestUtils {
     SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT64", "NUMBER");
     SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("FIRST_NAME", "VARCHAR");
     SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT32", "FLOAT");
+    SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("FLOAT_NAN", "FLOAT");
     SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT64", "FLOAT");
     SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("APPROVAL", "BOOLEAN");
     SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_STRING", "ARRAY");
@@ -53,7 +54,8 @@ public class SchematizationTestUtils {
     CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT32", 42L);
     CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT64", 42L);
     CONTENT_FOR_AVRO_TABLE_CREATION.put("FIRST_NAME", "zekai");
-    CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", Float.NaN);
+    CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", 0.99);
+    CONTENT_FOR_AVRO_TABLE_CREATION.put("FLOAT_NAN", Float.NaN);
     CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT64", 0.99);
     CONTENT_FOR_AVRO_TABLE_CREATION.put("APPROVAL", true);
     CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_STRING", "[\"a\",\"b\"]");
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
index e0dfa6a58..b1deb9644 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
@@ -884,7 +884,7 @@ public static void checkTableContentOneRow(String tableName, Map<String, Object> contentMap)
         assertDouble((Double) value, contentMap.get(columnName));
       } else {
         assert value.equals(contentMap.get(columnName))
-          : "expected: " + contentMap.get(columnName) + " actual: " + value;
+            : "expected: " + contentMap.get(columnName) + " actual: " + value;
       }
     } else {
       assert contentMap.get(columnName) == null : "value should be null";
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java
new file mode 100644
index 000000000..d6d4ed096
--- /dev/null
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2AvroSchematizationIT.java
@@ -0,0 +1,183 @@
+package com.snowflake.kafka.connector.internal.streaming;
+
+import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
+import static org.awaitility.Awaitility.await;
+
+import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
+import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
+import com.snowflake.kafka.connector.internal.SchematizationTestUtils;
+import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
+import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
+import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory;
+import com.snowflake.kafka.connector.internal.TestUtils;
+import io.confluent.connect.avro.AvroConverter;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class SnowflakeSinkServiceV2AvroSchematizationIT {
+
+  private static final int PARTITION = 0;
+  private static final int START_OFFSET = 0;
+
+  private String table;
+  private SnowflakeConnectionService conn;
+  private String topic;
+  private TopicPartition topicPartition;
+
+  private SnowflakeSinkService service;
+
+  @BeforeEach
+  void before() {
+    table = TestUtils.randomTableName();
+    topic = table;
+    conn = TestUtils.getConnectionServiceForStreaming();
+    topicPartition = new TopicPartition(topic, PARTITION);
+  }
+
+  @AfterEach
+  void after() {
+    service.closeAll();
+  }
+
+  @ParameterizedTest(name = "useSingleBuffer: {0}")
+  @MethodSource("singleBufferParameters")
+  public void testSchematizationWithTableCreationAndAvroInput(boolean useSingleBuffer)
+      throws Exception {
+    // given
+    conn.createTableWithOnlyMetadataColumn(table);
+    SinkRecord avroRecordValue = createSinkRecord();
+    service = createService(useSingleBuffer);
+
+    // when
+    // The first insert should fail and schema evolution will kick in to update the schema
+    service.insert(Collections.singletonList(avroRecordValue));
+
+    // then
+    waitUntilOffsetEquals(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
+    TestUtils.checkTableSchema(table, SchematizationTestUtils.SF_AVRO_SCHEMA_FOR_TABLE_CREATION);
+
+    // when
+    // Retrying the insert should succeed now with the updated schema
+    service.insert(Collections.singletonList(avroRecordValue));
+
+    // then
+    waitUntilOffsetEquals(START_OFFSET + 1);
+    TestUtils.checkTableContentOneRow(
+        table, SchematizationTestUtils.CONTENT_FOR_AVRO_TABLE_CREATION);
+  }
+
+  private SnowflakeSinkService createService(boolean useSingleBuffer) {
+    Map<String, String> config = prepareConfig(useSingleBuffer);
+    return SnowflakeSinkServiceFactory.builder(
+            conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+        .setRecordNumber(1)
+        .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+        .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
+        .addTask(table, new TopicPartition(topic, PARTITION))
+        .build();
+  }
+
+  private SinkRecord createSinkRecord() {
+    Schema schema = prepareSchema();
+    Struct data = prepareData(schema);
+    AvroConverter avroConverter = prepareAvroConverter();
+
+    byte[] converted = avroConverter.fromConnectData(topic, data.schema(), data);
+
+    SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted);
+
+    return new SinkRecord(
+        topic,
+        PARTITION,
+        Schema.STRING_SCHEMA,
+        "test",
+        avroInputValue.schema(),
+        avroInputValue.value(),
+        START_OFFSET);
+  }
+
+  private AvroConverter prepareAvroConverter() {
+    SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
+    AvroConverter avroConverter = new AvroConverter(schemaRegistry);
+    avroConverter.configure(
+        Collections.singletonMap("schema.registry.url", "http://fake-url"), false);
+    return avroConverter;
+  }
+
+  private Map<String, String> prepareConfig(boolean useSingleBuffer) {
+    Map<String, String> config = TestUtils.getConfForStreaming(useSingleBuffer);
+    config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");
+    config.put(
+        SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
+        "io.confluent.connect.avro.AvroConverter");
+    config.put(SnowflakeSinkConnectorConfig.VALUE_SCHEMA_REGISTRY_CONFIG_FIELD, "http://fake-url");
+    SnowflakeSinkConnectorConfig.setDefaultValues(config);
+    return config;
+  }
+
+  private Schema prepareSchema() {
+    SchemaBuilder schemaBuilder =
+        SchemaBuilder.struct()
+            .field("id_int8", Schema.INT8_SCHEMA)
+            .field("id_int8_optional", Schema.OPTIONAL_INT8_SCHEMA)
+            .field("id_int16", Schema.INT16_SCHEMA)
+            .field("ID_INT32", Schema.INT32_SCHEMA)
+            .field("id_int64", Schema.INT64_SCHEMA)
+            .field("first_name", Schema.STRING_SCHEMA)
+            .field("rating_float32", Schema.FLOAT32_SCHEMA)
+            .field("float_nan", Schema.FLOAT32_SCHEMA)
+            .field("rating_float64", Schema.FLOAT64_SCHEMA)
+            .field("approval", Schema.BOOLEAN_SCHEMA)
+            .field("info_array_string", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
+            .field("info_array_int", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
+            .field("info_array_json", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
+            .field(
+                "info_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build());
+    return schemaBuilder.build();
+  }
+
+  private Struct prepareData(Schema schema) {
+    return new Struct(schema)
+        .put("id_int8", (byte) 0)
+        .put("id_int16", (short) 42)
+        .put("ID_INT32", 42)
+        .put("id_int64", 42L)
+        .put("first_name", "zekai")
+        .put("rating_float32", 0.99f)
+        .put("float_nan", Float.NaN)
+        .put("rating_float64", 0.99d)
+        .put("approval", true)
+        .put("info_array_string", Arrays.asList("a", "b"))
+        .put("info_array_int", Arrays.asList(1, 2))
+        .put(
+            "info_array_json",
+            Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}"))
+        .put("info_map", Collections.singletonMap("field", 3));
+  }
+
+  private static Stream<Arguments> singleBufferParameters() {
+    return Stream.of(Arguments.of(false), Arguments.of(true));
+  }
+
+  private void waitUntilOffsetEquals(long expectedOffset) {
+    await()
+        .timeout(Duration.ofSeconds(60))
+        .until(() -> service.getOffset(new TopicPartition(topic, PARTITION)) == expectedOffset);
+  }
+}
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
index c07bd78b2..3f23a4fa6 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
@@ -1256,105 +1256,6 @@ public void testStreamingIngestionWithExactlyOnceSemanticsOverlappingOffsets(
     service2.closeAll();
   }
 
-  @ParameterizedTest(name = "useSingleBuffer: {0}")
-  @MethodSource("singleBufferParameters")
-  public void testSchematizationWithTableCreationAndAvroInput(boolean useSingleBuffer)
-      throws Exception {
-    conn = getConn(false);
-    Map<String, String> config = TestUtils.getConfForStreaming(useSingleBuffer);
-    config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");
-    config.put(
-        SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
-        "io.confluent.connect.avro.AvroConverter");
-    config.put(SnowflakeSinkConnectorConfig.VALUE_SCHEMA_REGISTRY_CONFIG_FIELD, "http://fake-url");
-    // get rid of these at the end
-    SnowflakeSinkConnectorConfig.setDefaultValues(config);
-    // avro
-    SchemaBuilder schemaBuilder =
-        SchemaBuilder.struct()
-            .field("id_int8", Schema.INT8_SCHEMA)
-            .field("id_int8_optional", Schema.OPTIONAL_INT8_SCHEMA)
-            .field("id_int16", Schema.INT16_SCHEMA)
-            .field("ID_INT32", Schema.INT32_SCHEMA)
-            .field("id_int64", Schema.INT64_SCHEMA)
-            .field("first_name", Schema.STRING_SCHEMA)
-            .field("rating_float32", Schema.FLOAT32_SCHEMA)
-            .field("rating_float64", Schema.FLOAT64_SCHEMA)
-            .field("approval", Schema.BOOLEAN_SCHEMA)
-            .field("info_array_string", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
-            .field("info_array_int", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
-            .field("info_array_json", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
-            .field(
-                "info_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build());
-
-    Struct original =
-        new Struct(schemaBuilder.build())
-            .put("id_int8", (byte) 0)
-            .put("id_int16", (short) 42)
-            .put("ID_INT32", 42)
-            .put("id_int64", 42L)
-            .put("first_name", "zekai")
-            .put("rating_float32", Float.NaN)
-            .put("rating_float64", 0.99d)
-            .put("approval", true)
-            .put("info_array_string", Arrays.asList("a", "b"))
-            .put("info_array_int", Arrays.asList(1, 2))
-            .put(
-                "info_array_json",
-                Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}"))
-            .put("info_map", Collections.singletonMap("field", 3));
-
-    SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
-    AvroConverter avroConverter = new AvroConverter(schemaRegistry);
-    avroConverter.configure(
-        Collections.singletonMap("schema.registry.url", "http://fake-url"), false);
-    byte[] converted = avroConverter.fromConnectData(topic, original.schema(), original);
-    conn.createTableWithOnlyMetadataColumn(table);
-
-    SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted);
-
-    long startOffset = 0;
-
-    SinkRecord avroRecordValue =
-        new SinkRecord(
-            topic,
-            partition,
-            Schema.STRING_SCHEMA,
-            "test",
-            avroInputValue.schema(),
-            avroInputValue.value(),
-            startOffset);
-
-    SnowflakeSinkService service =
-        SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
-            .setRecordNumber(1)
-            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
-            .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
-            .addTask(table, new TopicPartition(topic, partition))
-            .build();
-
-    // The first insert should fail and schema evolution will kick in to update the schema
-    service.insert(Collections.singletonList(avroRecordValue));
-    TestUtils.assertWithRetry(
-        () ->
-            service.getOffset(new TopicPartition(topic, partition))
-                == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE,
-        20,
-        5);
-
-    TestUtils.checkTableSchema(table, SchematizationTestUtils.SF_AVRO_SCHEMA_FOR_TABLE_CREATION);
-
-    // Retry the insert should succeed now with the updated schema
-    service.insert(Collections.singletonList(avroRecordValue));
-    TestUtils.assertWithRetry(
-        () -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5);
-
-    TestUtils.checkTableContentOneRow(
-        table, SchematizationTestUtils.CONTENT_FOR_AVRO_TABLE_CREATION);
-
-    service.closeAll();
-  }
-
   @ParameterizedTest(name = "useSingleBuffer: {0}")
   @MethodSource("singleBufferParameters")
   public void testSchematizationWithTableCreationAndJsonInput(boolean useSingleBuffer)
diff --git a/test/test_suit/test_schema_evolution_avro_sr.py b/test/test_suit/test_schema_evolution_avro_sr.py
index 0f8a997e6..cce02574a 100644
--- a/test/test_suit/test_schema_evolution_avro_sr.py
+++ b/test/test_suit/test_schema_evolution_avro_sr.py
@@ -69,7 +69,8 @@ def __init__(self, driver, nameSalt):
             'RATING_INT': 'NUMBER',
             'RATING_DOUBLE': 'FLOAT',
             'APPROVAL': 'BOOLEAN',
-            'RECORD_METADATA': 'VARIANT'
+            'SOME_FLOAT_NAN': 'FLOAT',
+            'RECORD_METADATA': 'VARIANT',
         }
 
         self.gold_columns = [columnName for columnName in self.gold_type]