Skip to content

Commit

Permalink
NO-SNOW: Preserve the old data type that goes into an ARRAY column for Schematization (snowflakedb#730)
Browse files Browse the repository at this point in the history

Before this change, every element in the array was ingested as a STRING; this change preserves the original data type from the source. For example, when the input is [1, 2], the ingested value will now be [1, 2] instead of ["1", "2"].

Forked from snowflakedb#727, with additional tests
  • Loading branch information
sfc-gh-tzhang authored and khsoneji committed Dec 4, 2023
1 parent 1218471 commit 227251c
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException;
Expand Down Expand Up @@ -287,14 +285,7 @@ private Map<String, Object> getMapFromJsonNodeForStreamingIngest(JsonNode node)
String columnName = columnNames.next();
JsonNode columnNode = node.get(columnName);
Object columnValue;
if (columnNode.isArray()) {
List<String> itemList = new ArrayList<>();
ArrayNode arrayNode = (ArrayNode) columnNode;
for (JsonNode e : arrayNode) {
itemList.add(e.isTextual() ? e.textValue() : MAPPER.writeValueAsString(e));
}
columnValue = itemList;
} else if (columnNode.isTextual()) {
if (columnNode.isTextual()) {
columnValue = columnNode.textValue();
} else if (columnNode.isNull()) {
columnValue = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ public class SchematizationTestUtils {
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT32", "FLOAT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RATING_FLOAT64", "FLOAT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("APPROVAL", "BOOLEAN");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY", "ARRAY");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_STRING", "ARRAY");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_INT", "ARRAY");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_ARRAY_JSON", "ARRAY");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("INFO_MAP", "VARIANT");
SF_AVRO_SCHEMA_FOR_TABLE_CREATION.put("RECORD_METADATA", "VARIANT");
}
Expand Down Expand Up @@ -54,7 +56,11 @@ public class SchematizationTestUtils {
CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT32", 0.99);
CONTENT_FOR_AVRO_TABLE_CREATION.put("RATING_FLOAT64", 0.99);
CONTENT_FOR_AVRO_TABLE_CREATION.put("APPROVAL", true);
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY", "[\"a\",\"b\"]");
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_STRING", "[\"a\",\"b\"]");
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_ARRAY_INT", "[1,2]");
CONTENT_FOR_AVRO_TABLE_CREATION.put(
"INFO_ARRAY_JSON",
"[null,\"{\\\"a\\\":1,\\\"b\\\":null,\\\"c\\\":null,\\\"d\\\":\\\"89asda9s0a\\\"}\"]");
CONTENT_FOR_AVRO_TABLE_CREATION.put("INFO_MAP", "{\"field\":3}");
CONTENT_FOR_AVRO_TABLE_CREATION.put("RECORD_METADATA", "RECORD_METADATA_PLACE_HOLDER");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,9 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception {
.field("rating_float32", Schema.FLOAT32_SCHEMA)
.field("rating_float64", Schema.FLOAT64_SCHEMA)
.field("approval", Schema.BOOLEAN_SCHEMA)
.field("info_array", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
.field("info_array_string", SchemaBuilder.array(Schema.STRING_SCHEMA).build())
.field("info_array_int", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
.field("info_array_json", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).build())
.field(
"info_map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build());

Expand All @@ -1244,7 +1246,11 @@ public void testSchematizationWithTableCreationAndAvroInput() throws Exception {
.put("rating_float32", 0.99f)
.put("rating_float64", 0.99d)
.put("approval", true)
.put("info_array", Arrays.asList("a", "b"))
.put("info_array_string", Arrays.asList("a", "b"))
.put("info_array_int", Arrays.asList(1, 2))
.put(
"info_array_json",
Arrays.asList(null, "{\"a\": 1, \"b\": null, \"c\": null, \"d\": \"89asda9s0a\"}"))
.put("info_map", Collections.singletonMap("field", 3));

SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,26 @@ public void testSchematizationStringField() throws JsonProcessingException {
assert got.get("\"ANSWER\"").equals("42");
}

@Test
public void testSchematizationArrayOfObject() throws JsonProcessingException {
  // Verifies that, with schematization enabled, an array-of-objects field is
  // surfaced under the upper-cased quoted column name as a single serialized
  // JSON array string (element objects are not flattened or re-typed).
  RecordService recordService = new RecordService();
  recordService.setEnableSchematization(true);

  SnowflakeJsonConverter converter = new SnowflakeJsonConverter();
  String value =
      "{\"players\":[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]}";
  // Convert the raw JSON bytes into Connect schema + value, as the connector would.
  SchemaAndValue schemaAndValue =
      converter.toConnectData(topic, value.getBytes(StandardCharsets.UTF_8));

  SinkRecord sinkRecord =
      new SinkRecord(
          topic,
          partition,
          Schema.STRING_SCHEMA,
          "string",
          schemaAndValue.schema(),
          schemaAndValue.value(),
          partition);

  Map<String, Object> processed = recordService.getProcessedRecordForStreamingIngest(sinkRecord);
  Object players = processed.get("\"PLAYERS\"");
  assert players.equals(
      "[{\"name\":\"John Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]");
}

@Test
public void testColumnNameFormatting() throws JsonProcessingException {
RecordService service = new RecordService();
Expand Down

0 comments on commit 227251c

Please sign in to comment.