SNOW-1161484: Use insertRows instead of insertRow for schematization #796
Conversation
@@ -1059,7 +1059,7 @@ private SnowflakeStreamingIngestChannel openChannelForTable() {
         .setDBName(this.sfConnectorConfig.get(Utils.SF_DATABASE))
         .setSchemaName(this.sfConnectorConfig.get(Utils.SF_SCHEMA))
         .setTableName(this.tableName)
-        .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
+        .setOnErrorOption(OpenChannelRequest.OnErrorOption.SKIP_BATCH)
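
For context, a minimal sketch of what this switch changes, assuming the Ingest SDK's streaming API (the client variable and the database/schema/table names are placeholders, not the connector's actual wiring):

// Sketch only: illustrates OnErrorOption semantics.
SnowflakeStreamingIngestChannel channel =
    client.openChannel(
        OpenChannelRequest.builder("MY_CHANNEL")
            .setDBName("MY_DB")
            .setSchemaName("MY_SCHEMA")
            .setTableName("MY_TABLE")
            // CONTINUE ingests the valid rows and only reports the invalid ones.
            // SKIP_BATCH skips the whole batch when any row fails validation, so the
            // caller can rebuild the batch without the bad rows and insert it again.
            .setOnErrorOption(OpenChannelRequest.OnErrorOption.SKIP_BATCH)
            .build());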
Will this be a BCR?
I don't think so; there is no behavior difference as far as the customer is concerned.
Could you consider using the checklist in the PR? Let me know if you want to change something in the template. Also, is SKIP_BATCH needed for schematization, or is it needed in general? If it is needed in general, I would like it to be separated from this PR.
I would merge this into the next release if it is ready, but it's up to you.
private StreamingBuffer rebuildBufferWithoutErrorRows(
    StreamingBuffer streamingBufferToInsert,
    List<InsertValidationResponse.InsertError> insertErrors) {
  StreamingBuffer buffer = new StreamingBuffer();
  // Walk the original buffer and the row-index-ordered error list in lockstep,
  // copying over only the rows that did not fail validation.
  int errorIdx = 0;
  for (long rowIdx = 0; rowIdx < streamingBufferToInsert.getNumOfRecords(); rowIdx++) {
    if (errorIdx < insertErrors.size() && rowIdx == insertErrors.get(errorIdx).getRowIndex()) {
      errorIdx++;
    } else {
      buffer.insert(streamingBufferToInsert.getSinkRecord(rowIdx));
    }
  }
  return buffer;
}
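
For illustration, a rough sketch of how such a method fits the SKIP_BATCH insert path; getData and getLastOffset are assumed helper names, not necessarily the connector's exact API:

// Hedged sketch of the rebuild-and-retry flow.
InsertValidationResponse response =
    channel.insertRows(buffer.getData(), Long.toString(buffer.getLastOffset()));
if (response.hasErrors()) {
  // Under SKIP_BATCH nothing from this batch was ingested, so rebuilding the
  // buffer without the failing rows and inserting again cannot duplicate data.
  StreamingBuffer goodRows =
      rebuildBufferWithoutErrorRows(buffer, response.getInsertErrors());
  response = channel.insertRows(goodRows.getData(), Long.toString(goodRows.getLastOffset()));
}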
Will this result in the validation error you added recently?
@@ -576,6 +584,22 @@ InsertRowsResponse insertBufferedRecords(StreamingBuffer streamingBufferToInsert
     return response;
   }

+  /** Building a new buffer which contains only the good rows from the original buffer */
+  private StreamingBuffer rebuildBufferWithoutErrorRows(
Is there test coverage for this new method?
Assert.assertTrue(topicPartitionChannel.isPartitionBufferEmpty());
Assert.assertEquals(0, kafkaRecordErrorReporter.getReportedRecords().size());

// Do it again without any schematization error, and we should have row in DLQ
I am confused about the comment. Do you mean we should not have a row in the DLQ if there is no schematization error? Or do you mean to verify the DLQ results with something along these lines: Assert.assertEquals(>1, kafkaRecordErrorReporter.getReportedRecords().size());?
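
For reference, since >1 is not valid Java, a runnable form of that suggested check might look like this (reusing the test's kafkaRecordErrorReporter fixture):

// Hypothetical assertion: at least one record was reported to the DLQ.
Assert.assertTrue(kafkaRecordErrorReporter.getReportedRecords().size() >= 1);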
I am worried about this change not having enough tests; do you feel the existing tests are enough?
Also, I highly encourage filling out the PR checklist, at least for important changes. Thanks.
Talked offline; we don't need to split the PRs. SKIP_BATCH is needed and there is no behavior change.
Left a couple of comments. I don't have any more concerns and will approve in the next iteration! Thank you, this will lower schematization latency 🥇
@sfc-gh-rcheng @sfc-gh-tzhang this should be merged for the 2.2.2 release.
I'm also a little concerned about this big a behavior change without additional tests, but it looks like @sfc-gh-japatel discussed this with Toby offline.
With the latest ingest-sdk merged, we should be able to update the ingest-sdk version and merge this PR?
@sfc-gh-tzhang the ingest-sdk update to 2.1.1 has been merged. Can this be merged now too?
if (extraColNames == null && nonNullableColumns == null) {
  InsertValidationResponse.InsertError newInsertError =
      new InsertValidationResponse.InsertError(
          insertError.getRowContent(), originalSinkRecordIdx);
  newInsertError.setException(insertError.getException());
  newInsertError.setExtraColNames(insertError.getExtraColNames());
  newInsertError.setMissingNotNullColNames(insertError.getMissingNotNullColNames());
  // Simply added to the final response if it's not schema related errors
  finalResponse.addError(insertError);
@sfc-gh-tzhang why did you remove this part? IMO we still need it to properly add the rows to the rebuilt buffer and also to send only the non-schema errors to the DLQ.
Closed in favor of #866 due to merge conflicts.
Overview
SNOW-1161484
- Better performance
- If KC is configured with a longer buffer flush time, everything is ingested as one batch, so the SDK's 1-second flush or 32 MB channel size limit won't apply (see the sketch below).
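
A minimal sketch of the difference, assuming the Ingest SDK channel API (the rows and offset variables and the offsetTokenFor helper are illustrative):

// Before: one SDK call per Kafka record, so per-row overhead dominates.
for (Map<String, Object> row : rows) {
  channel.insertRow(row, offsetTokenFor(row)); // offsetTokenFor is a made-up helper
}

// After: one SDK call for the whole buffered batch; the records are ingested
// together rather than trickling through the SDK's internal flush thresholds.
channel.insertRows(rows, lastOffsetToken);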
Pre-review checklist
- This change is param protected by snowflake.ingestion.method.
  - Yes - Added end-to-end and unit tests.
  - No - Suggest why it is not param protected.