SNOW-1649161 Handle NaN value in schematization
sfc-gh-mbobowski committed Sep 5, 2024
1 parent 56f96bb commit bcf5125
Showing 9 changed files with 42 additions and 20 deletions.
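Background for the fix: with schematization enabled, the connector dumps each column value to a string via Jackson's writeValueAsString before handing the row to the Streaming Ingest SDK (visible in the first file below). Jackson's default generator settings quote non-finite numbers, so a NaN double is serialized as the JSON string "NaN", quotes included, rather than a bare NaN token. A minimal sketch of that default behavior; it uses plain com.fasterxml Jackson rather than the net.snowflake.client.jdbc.internal repackaging the connector imports, and the class name is invented for the demo:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.DoubleNode;

    public class NanSerializationDemo {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            DoubleNode nan = DoubleNode.valueOf(Double.NaN);
            // Prints "NaN" with the quotes: by default Jackson writes
            // non-finite numbers as quoted JSON strings.
            System.out.println(mapper.writeValueAsString(nan));
        }
    }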
@@ -41,6 +41,7 @@
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ArrayNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.NumericNode;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.ConnectSchema;
@@ -277,7 +278,7 @@ private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node)
} else if (columnNode.isNull()) {
columnValue = null;
} else {
-columnValue = MAPPER.writeValueAsString(columnNode);
+columnValue = writeValueAsStringOrNan(columnNode);
}
// while the value is always dumped into a string, the Streaming Ingest SDK
// will transform the value according to its type in the table
@@ -291,6 +292,14 @@ private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node)
return streamingIngestRow;
}

+private String writeValueAsStringOrNan(JsonNode columnNode) throws JsonProcessingException {
+  if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) {
+    return "NaN";
+  } else {
+    return MAPPER.writeValueAsString(columnNode);
+  }
+}

/** For now there are two columns one is content and other is metadata. Both are Json */
private static class SnowflakeTableRow {
// This can be a JsonNode but we will keep this as is.
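The helper above short-circuits the mapper for NaN and returns the bare literal instead of Jackson's quoted form; Snowflake FLOAT columns accept NaN as a special value, so the Streaming Ingest SDK can still coerce the string to the column type. A self-contained sketch of the same guard, again with unshaded Jackson and an invented class name:

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.JsonNodeFactory;
    import com.fasterxml.jackson.databind.node.NumericNode;

    public class NanGuardDemo {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Mirrors the writeValueAsStringOrNan helper from the diff above.
        static String writeValueAsStringOrNan(JsonNode columnNode) throws JsonProcessingException {
            if (columnNode instanceof NumericNode && ((NumericNode) columnNode).isNaN()) {
                return "NaN"; // bare token, not the quoted string Jackson would emit
            }
            return MAPPER.writeValueAsString(columnNode);
        }

        public static void main(String[] args) throws Exception {
            System.out.println(writeValueAsStringOrNan(JsonNodeFactory.instance.numberNode(Double.NaN))); // NaN
            System.out.println(writeValueAsStringOrNan(JsonNodeFactory.instance.numberNode(0.99))); // 0.99
        }
    }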
@@ -53,7 +53,7 @@ public class SchematizationTestUtils {
CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT32", 42L);
CONTENT_FOR_AVRO_TABLE_CREATION.put("ID_INT64", 42L);
CONTENT_FOR_AVRO_TABLE_CREATION.put("FIRST_NAME", "zekai");
-CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", 0.99);
+CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", Float.NaN);
CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT64", 0.99);
CONTENT_FOR_AVRO_TABLE_CREATION.put("APPROVAL", true);
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_STRING", "[\"a\",\"b\"]");
@@ -87,6 +87,7 @@
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.provider.Arguments;

public class TestUtils {
@@ -879,14 +880,27 @@ public static void checkTableContentOneRow(String tableName, Map<String, Object>
if ("RECORD_METADATA_PLACE_HOLDER".equals(contentMap.get(columnName))) {
continue;
}
-assert value.equals(contentMap.get(columnName))
-    : "expected: " + contentMap.get(columnName) + " actual: " + value;
+if (value instanceof Double) {
+  assertDouble((Double) value, contentMap.get(columnName));
+} else {
+  assert value.equals(contentMap.get(columnName))
+      : "expected: " + contentMap.get(columnName) + " actual: " + value;
+}
} else {
assert contentMap.get(columnName) == null : "value should be null";
}
}
}

+private static void assertDouble(Double actual, Object expected) {
+  if (expected instanceof Double) {
+    Assertions.assertEquals((Double) expected, actual);
+  }
+  if (expected instanceof Float) {
+    Assertions.assertEquals(Double.valueOf(expected.toString()), actual);
+  }
+}

public static SnowflakeStreamingIngestClient createStreamingClient(
Map<String, String> config, String clientName) {
Properties clientProperties = new Properties();
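A note on the assertDouble helper above: values read back from the table arrive as java.lang.Double, while the expected fixture may hold a Float such as Float.NaN. Two pitfalls motivate comparing through Double.valueOf(expected.toString()) rather than a plain cast: widening a float to double preserves the float's binary rounding error, and primitive NaN is never == to itself, whereas JUnit's assertEquals treats NaN as equal to NaN (it compares via Double.equals semantics). A JDK-only sketch of both, with an invented class name:

    public class FloatDoubleComparisonDemo {
        public static void main(String[] args) {
            Float f = 0.99f;
            System.out.println((double) f);                   // 0.9900000095367432, the rounding error survives widening
            System.out.println(Double.valueOf(f.toString())); // 0.99, the decimal string round-trip matches the fixture
            System.out.println(Double.NaN == Double.NaN);                      // false, primitive NaN is never ==
            System.out.println(Double.valueOf(Double.NaN).equals(Double.NaN)); // true, boxed equality treats NaN as equal
        }
    }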
@@ -1294,7 +1294,7 @@ public void testSchematizationWithTableCreationAndAvroInput(boolean useSingleBuf
.put("ID_INT32", 42)
.put("id_int64", 42L)
.put("first_name", "zekai")
.put("rating_float32", 0.99f)
.put("rating_float32", Float.NaN)
.put("rating_float64", 0.99d)
.put("approval", true)
.put("info_array_string", Arrays.asList("a", "b"))
1 change: 0 additions & 1 deletion test/run_test_confluent.sh
@@ -204,6 +204,5 @@ if [ $testError -ne 0 ]; then
RED='\033[0;31m'
NC='\033[0m' # No Color
echo -e "${RED} There is error above this line ${NC}"
-cat $APACHE_LOG_PATH/kc.log
error_exit "=== test_verify.py failed ==="
fi
8 changes: 5 additions & 3 deletions test/test_suit/test_avrosr_avrosr.py
@@ -25,7 +25,9 @@ def __init__(self, driver, nameSalt):
"fields":[
{"name":"id","type":"int"},
{"name":"firstName","type":"string"},
{"name":"time","type":"int"}
{"name":"time","type":"int"},
{"name":"someFloat","type":"float"},
{"name":"someFloatNaN","type":"float"}
]
}
"""
@@ -41,7 +43,7 @@ def send(self):
for e in range(100):
# avro data must follow the schema defined in ValueSchemaStr
key.append({"id": e})
value.append({"id": e, "firstName": "abc0", "time": 1835})
value.append({"id": e, "firstName": "abc0", "time": 1835, "someFloat": 21.37, "someFloatNaN": "NaN"})
self.driver.sendAvroSRData(
self.topic, value, self.valueSchema, key, self.keySchema)

@@ -58,7 +60,7 @@ def verify(self, round):
"Select * from {} limit 1".format(self.topic)).fetchone()
goldMeta = r'{"CreateTime":\d*,"key":{"id":0},"key_schema_id":\d*,"offset":0,"partition":0,"schema_id":\d*,' \
r'"topic":"travis_correct_avrosr_avrosr_\w*"}'
goldContent = r'{"firstName":"abc0","id":0,"time":1835}'
goldContent = r'{"firstName":"abc0","id":0,"someFloat":21.37,"someFloatNaN":"NaN","time":1835}'
self.driver.regexMatchOneLine(res, goldMeta, goldContent)

self.driver.verifyStageIsCleaned(self.topic)
6 changes: 4 additions & 2 deletions test/test_suit/test_schema_evolution_avro_sr.py
@@ -32,7 +32,8 @@ def __init__(self, driver, nameSalt):
self.records.append({
'PERFORMANCE_STRING': 'Excellent',
'RATING_DOUBLE': 0.99,
-'APPROVAL': True
+'APPROVAL': True,
+'SOME_FLOAT_NAN': "NaN"
})

self.ValueSchemaStr = []
@@ -56,7 +57,8 @@ def __init__(self, driver, nameSalt):
"fields":[
{"name":"RATING_DOUBLE","type":"float"},
{"name":"PERFORMANCE_STRING","type":"string"},
{"name":"APPROVAL","type":"boolean"}
{"name":"APPROVAL","type":"boolean"},
{"name":"SOME_FLOAT_NAN","type":"float"}
]
}
""")
12 changes: 4 additions & 8 deletions test/test_suit/test_snowpipe_streaming_string_avro_sr.py
@@ -21,7 +21,9 @@ def __init__(self, driver, nameSalt):
"fields":[
{"name":"id","type":"int"},
{"name":"firstName","type":"string"},
{"name":"time","type":"int"}
{"name":"time","type":"int"},
{"name":"someFloat","type":"float"},
{"name":"someFloatNaN","type":"float"}
]
}
"""
@@ -34,19 +36,13 @@ def getConfigFileName(self):
return self.fileName + ".json"

def send(self):
-# create topic with n partitions and only one replication factor
-print("Partition count:" + str(self.partitionNum))
-print("Topic:", self.topic)

-self.driver.describeTopic(self.topic)

for p in range(self.partitionNum):
print("Sending in Partition:" + str(p))
key = []
value = []
for e in range(self.recordNum):
value.append({"id": e, "firstName": "abc0", "time": 1835})
value.append({"id": e, "firstName": "abc0", "time": 1835, "someFloat": 21.37, "someFloatNaN": "NaN"})
self.driver.sendAvroSRData(self.topic, value, self.valueSchema, key=[], key_schema="", partition=p)
sleep(2)

2 changes: 1 addition & 1 deletion test/test_suites.py
@@ -164,7 +164,7 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS
test_instance=TestSnowpipeStreamingStringJsonDLQ(driver, nameSalt), clean=True, run_in_confluent=True,
run_in_apache=True
)),
("TestSnowpipeStreamingStringAvro", EndToEndTestSuite(
("TestSnowpipeStreamingStringAvroSR", EndToEndTestSuite(
test_instance=TestSnowpipeStreamingStringAvroSR(driver, nameSalt), clean=True, run_in_confluent=True,
run_in_apache=False
)),