-
Notifications
You must be signed in to change notification settings - Fork 98
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add connector version in RECORD_METADATA #766
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CLA signed.
Eng team for review
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/SnowflakeSinkConnectorConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/records/RecordService.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/records/SnowflakeMetadataConfig.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/records/SnowflakeMetadataConfig.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added some comments, we dont want behavior change so lets keep the flag default as false.
Thanks for contribution.
Thanks! @sfc-gh-japatel Good to merge |
assert result.get(META).has(RecordService.SF_CONNECTOR_VERSION); | ||
|
||
// test metadata configuration -- remove version | ||
metadataConfig = new SnowflakeMetadataConfig(versionConfig); | ||
service.setMetadataConfig(metadataConfig); | ||
result = mapper.readTree(service.getProcessedRecordForSnowpipe(record)); | ||
assert result.has(META); | ||
assert !result.get(META).has(RecordService.SF_CONNECTOR_VERSION); | ||
assert result.get(META).has(RecordService.OFFSET); | ||
assert result.get(META).has(RecordService.PARTITION); | ||
assert result.get(META).has(record.timestampType().name); | ||
assert result.get(META).has(RecordService.TOPIC); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
something doesn add up in this test.
default is false right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right. There was a bug and had to update the test. Fixed it in 1270dc6
@@ -114,6 +120,17 @@ public void testConfig() throws IOException { | |||
assert result.get(META).has(RecordService.PARTITION); | |||
assert result.get(META).has(record.timestampType().name); | |||
|
|||
// test metadata configuration -- remove version |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is for default, can you add a test for what happens when you add true to the metadata version?
Also, once you are done, if you dont mind, I would like to add an e2e test here to verify if this works fine.
Then we can push the code. (I will open another PR where you will be seen as co-author)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just updated the test - something to note: Checking for default value is a bit different than rest of the metadata fields as they are enabled by default. Here I'm explicitly checking the value of sf connector version config to be "true" for it to enable, it's disabled by default. So this test already tests "what happens when you add true to the metadata version"
String value2 = | ||
"{\"cricket\":{\"team\":{\"MI\":{\"players\":[{\"name\":\"John" | ||
+ " Doe\",\"age\":30},{\"name\":\"Jane Doe\",\"age\":30}]}}}}"; | ||
byte[] valueContents = (value2).getBytes(StandardCharsets.UTF_8); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need this?
@@ -163,6 +164,42 @@ public void testConfig() throws IOException { | |||
System.out.println("Config test success"); | |||
} | |||
|
|||
@Test | |||
public void testSfConnectorVersion() throws JsonProcessingException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test is failing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left comments.
if (config.containsKey(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION) | ||
&& config | ||
.get(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION) | ||
.equals(SnowflakeSinkConnectorConfig.SNOWFLAKE_METADATA_DEFAULT)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set this to true only when SNOWFLAKE_METADATA_SF_CONNECTOR_VERSION is true right? default is false now!
@cchandurkar, Can you confirm that you're working on a change? This issue became stale for a few months. |
This comes handy while comparing data after connector version upgrade. Especially useful if you want to identify rows impacted by a certain version of the connector that had a bug.