Commit

SNOW-1649161 Rewrite test
sfc-gh-mbobowski committed Sep 5, 2024
1 parent bcf5125 commit 9fcd013
Showing 5 changed files with 190 additions and 102 deletions.
@@ -16,6 +16,7 @@ public class SchematizationTestUtils {
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("ID_INT64", "NUMBER");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("FIRST_NAME", "VARCHAR");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT32", "FLOAT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("FLOAT_NAN", "FLOAT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT64", "FLOAT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("APPROVAL", "BOOLEAN");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_STRING", "ARRAY");
@@ -53,7 +54,8 @@ public class SchematizationTestUtils {
CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT32", 42L);
CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT64", 42L);
CONTENT_FOR_AVRO_TABLE_CREATION.put("FIRST_NAME", "zekai");
CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", Float.NaN);
CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", 0.99);
CONTENT_FOR_AVRO_TABLE_CREATION.put("FLOAT_NAN", Float.NaN);
CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT64", 0.99);
CONTENT_FOR_AVRO_TABLE_CREATION.put("APPROVAL", true);
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_STRING", "[\"a\",\"b\"]");
@@ -884,7 +884,7 @@ public static void checkTableContentOneRow(String tableName, Map<String, Object>
assertDouble((Double) value, contentMap.get(columnName));
} else {
assert value.equals(contentMap.get(columnName))
: "expected: " + contentMap.get(columnName) + " actual: " + value;
: "expected: " + contentMap.get(columnName) + " actual: " + value;
}
} else {
assert contentMap.get(columnName) == null : "value should be null";
@@ -0,0 +1,184 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
import static org.awaitility.Awaitility.await;

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.SchematizationTestUtils;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory;
import com.snowflake.kafka.connector.internal.TestUtils;
import io.confluent.connect.avro.AvroConverter;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public class SnowflakeSinkServiceV2AvroSchematizationIT {

private static final int PARTITION = 0;
private static final int START_OFFSET = 0;

private String table;
private SnowflakeConnectionService conn;
private String topic;
private TopicPartition topicPartition;

private SnowflakeSinkService service;

@BeforeEach
void before() {
table = TestUtils.randomTableName();
topic = table;
conn = TestUtils.getConnectionServiceForStreaming();
topicPartition = new TopicPartition(topic, PARTITION);
}

@AfterEach
void after() {
service.closeAll();
}

@ParameterizedTest(name = "useSingleBuffer: {0}")
@MethodSource("singleBufferParameters")
public void testSchematizationWithTableCreationAndAvroInput(boolean useSingleBuffer)
throws Exception {
// given
conn.createTableWithOnlyMetadataColumn(table);
SinkRecord avroRecordValue = createSinkRecord();
service = createService(useSingleBuffer);

// when
// The first insert should fail and schema evolution will kick in to update the schema
service.insert(Collections.singletonList(avroRecordValue));

// then
waitUntilOffsetEquals(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);
TestUtils.checkTableSchema(table, SchematizationTestUtils.SF_AVRO_SCHEMA_FOR_TABLE_CREATION);

// when
// Retry the insert should succeed now with the updated schema
service.insert(Collections.singletonList(avroRecordValue));

// then
waitUntilOffsetEquals(START_OFFSET + 1);
TestUtils.checkTableContentOneRow(
table, SchematizationTestUtils.CONTENT_FOR_AVRO_TABLE_CREATION);
}

private SnowflakeSinkService createService(boolean useSingleBuffer) {
Map<String, String> config = prepareConfig(useSingleBuffer);
return SnowflakeSinkServiceFactory.builder(
conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(1)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.addTask(table, new TopicPartition(topic, PARTITION))
.build();
}

private SinkRecord createSinkRecord() {
Schema schema = prepareSchema();
Struct data = prepareData(schema);
AvroConverter avroConverter = prepareAvroConverter();

byte[] converted = avroConverter.fromConnectData(topic, data.schema(), data);
conn.createTableWithOnlyMetadataColumn(table);

SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted);

return new SinkRecord(
topic,
PARTITION,
Schema.STRING_SCHEMA,
"test",
avroInputValue.schema(),
avroInputValue.value(),
START_OFFSET);
}

private AvroConverter prepareAvroConverter() {
SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
AvroConverter avroConverter = new AvroConverter(schemaRegistry);
avroConverter.configure(
Collections.singletonMap("schema.registry.url", "http://fake-url"), false);
return avroConverter;
}

private Map<String, String> prepareConfig(boolean useSingleBuffer) {
Map<String, String> config = TestUtils.getConfForStreaming(useSingleBuffer);
config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");
config.put(
SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
"io.confluent.connect.avro.AvroConverter");
config.put(SnowflakeSinkConnectorConfig.VALUE_SCHEMA_REGISTRY_CONFIG_FIELD, "http://fake-url");
SnowflakeSinkConnectorConfig.setDefaultValues(config);
return config;
}

private Schema prepareSchema() {
SchemaBuilder schemaBuilder =
SchemaBuilder.struct()
.field("id_int8", Schema.INT8_SCHEMA)
.field("id_int8_optional", Schema.OPTIONAL_INT8_SCHEMA)
.field("id_int16", Schema.INT16_SCHEMA)
.field("ID_INT32", Schema.INT32_SCHEMA)
.field("id_int64", Schema.INT64_SCHEMA)
.field("first_name", Schema.STRING_SCHEMA)
.field("rating_float32", Schema.FLOAT32_SCHEMA)
.field("float_nan", Schema.FLOAT32_SCHEMA)
.field("rating_float64", Schema.FLOAT64_SCHEMA)
.field("approval", Schema.BOOLEAN_SCHEMA)
.field("info_array_string", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
.field("info_array_int", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
.field("info_array_json", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
.field(
"info_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build());
return schemaBuilder.build();
}

private Struct prepareData(Schema schema) {
return new Struct(schema)
.put("id_int8", (byte) 0)
.put("id_int16", (short) 42)
.put("ID_INT32", 42)
.put("id_int64", 42L)
.put("first_name", "zekai")
.put("rating_float32", 0.99f)
.put("float_nan", Float.NaN)
.put("rating_float64", 0.99d)
.put("approval", true)
.put("info_array_string", Arrays.asList("a", "b"))
.put("info_array_int", Arrays.asList(1, 2))
.put(
"info_array_json",
Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}"))
.put("info_map", Collections.singletonMap("field", 3));
}

private static Stream<Arguments> singleBufferParameters() {
return Stream.of(Arguments.of(false), Arguments.of(true));
}

private void waitUntilOffsetEquals(long expectedOffset) {
await()
.timeout(Duration.ofSeconds(60))
.until(() -> service.getOffset(new TopicPartition(topic, PARTITION)) == expectedOffset);
}
}
@@ -1256,105 +1256,6 @@ public void testStreamingIngestionWithExactlyOnceSemanticsOverlappingOffsets(
service2.closeAll();
}

@ParameterizedTest(name = "useSingleBuffer: {0}")
@MethodSource("singleBufferParameters")
public void testSchematizationWithTableCreationAndAvroInput(boolean useSingleBuffer)
throws Exception {
conn = getConn(false);
Map<String, String> config = TestUtils.getConfForStreaming(useSingleBuffer);
config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");
config.put(
SnowflakeSinkConnectorConfig.VALUE_CONVERTER_CONFIG_FIELD,
"io.confluent.connect.avro.AvroConverter");
config.put(SnowflakeSinkConnectorConfig.VALUE_SCHEMA_REGISTRY_CONFIG_FIELD, "http://fake-url");
// get rid of these at the end
SnowflakeSinkConnectorConfig.setDefaultValues(config);
// avro
SchemaBuilder schemaBuilder =
SchemaBuilder.struct()
.field("id_int8", Schema.INT8_SCHEMA)
.field("id_int8_optional", Schema.OPTIONAL_INT8_SCHEMA)
.field("id_int16", Schema.INT16_SCHEMA)
.field("ID_INT32", Schema.INT32_SCHEMA)
.field("id_int64", Schema.INT64_SCHEMA)
.field("first_name", Schema.STRING_SCHEMA)
.field("rating_float32", Schema.FLOAT32_SCHEMA)
.field("rating_float64", Schema.FLOAT64_SCHEMA)
.field("approval", Schema.BOOLEAN_SCHEMA)
.field("info_array_string", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
.field("info_array_int", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
.field("info_array_json", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
.field(
"info_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build());

Struct original =
new Struct(schemaBuilder.build())
.put("id_int8", (byte) 0)
.put("id_int16", (short) 42)
.put("ID_INT32", 42)
.put("id_int64", 42L)
.put("first_name", "zekai")
.put("rating_float32", Float.NaN)
.put("rating_float64", 0.99d)
.put("approval", true)
.put("info_array_string", Arrays.asList("a", "b"))
.put("info_array_int", Arrays.asList(1, 2))
.put(
"info_array_json",
Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}"))
.put("info_map", Collections.singletonMap("field", 3));

SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
AvroConverter avroConverter = new AvroConverter(schemaRegistry);
avroConverter.configure(
Collections.singletonMap("schema.registry.url", "http://fake-url"), false);
byte[] converted = avroConverter.fromConnectData(topic, original.schema(), original);
conn.createTableWithOnlyMetadataColumn(table);

SchemaAndValue avroInputValue = avroConverter.toConnectData(topic, converted);

long startOffset = 0;

SinkRecord avroRecordValue =
new SinkRecord(
topic,
partition,
Schema.STRING_SCHEMA,
"test",
avroInputValue.schema(),
avroInputValue.value(),
startOffset);

SnowflakeSinkService service =
SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(1)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.addTask(table, new TopicPartition(topic, partition))
.build();

// The first insert should fail and schema evolution will kick in to update the schema
service.insert(Collections.singletonList(avroRecordValue));
TestUtils.assertWithRetry(
() ->
service.getOffset(new TopicPartition(topic, partition))
== NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE,
20,
5);

TestUtils.checkTableSchema(table, SchematizationTestUtils.SF_AVRO_SCHEMA_FOR_TABLE_CREATION);

// Retry the insert should succeed now with the updated schema
service.insert(Collections.singletonList(avroRecordValue));
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, partition)) == startOffset + 1, 20, 5);

TestUtils.checkTableContentOneRow(
table, SchematizationTestUtils.CONTENT_FOR_AVRO_TABLE_CREATION);

service.closeAll();
}

@ParameterizedTest(name = "useSingleBuffer: {0}")
@MethodSource("singleBufferParameters")
public void testSchematizationWithTableCreationAndJsonInput(boolean useSingleBuffer)
test/test_suit/test_schema_evolution_avro_sr.py: 3 changes (2 additions & 1 deletion)
@@ -69,7 +69,8 @@ def __init__(self, driver, nameSalt):
'RATING_INT': 'NUMBER',
'RATING_DOUBLE': 'FLOAT',
'APPROVAL': 'BOOLEAN',
- 'RECORD_METADATA': 'VARIANT'
+ 'SOME_FLOAT_NAN': 'FLOAT',
+ 'RECORD_METADATA': 'VARIANT',
}

self.gold_columns = [columnName for columnName in self.gold_type]
