-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes based on review of PR 317 #329
Conversation
return results; | ||
} | ||
|
||
while (!connectorStopped.get()) { | ||
try { | ||
LOGGER.info("Number of records sent {}", extractSourceRecords(results).size()); | ||
final int sizeOfExtractedRecs = extractSourceRecords(results).size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can get away without the extra variable as results is mutated in extract records so something like the below might be a little cleaner.
extractSourceRecords(results);
LOGGER.info("Number of records extracted and sent: {}", results.size());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yes. Updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM Thank you.
return null; // NOPMD | ||
} | ||
} catch (DataException exception) { | ||
LOGGER.warn("DataException occurred during polling. No retries will be attempted.", exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this correct? The way I read it, it will immediately retry the while loop without waiting (both before and after the change).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we are ignoring the exception here with a warning. We shall revisit this section in handle malformed records ticket I believe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation! 👍
@@ -59,6 +59,8 @@ | |||
public interface IntegrationBase { | |||
|
|||
String DOCKER_IMAGE_KAFKA = "confluentinc/cp-kafka:7.7.0"; | |||
String PLUGINS_S_3_SOURCE_CONNECTOR_FOR_APACHE_KAFKA = "plugins/s3-source-connector-for-apache-kafka/"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it really be S_3
? This even complicates search in the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah thats a fair point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will anyways have more reviews on parent pr. I will make sure this is addressed. There are a couple of more vars like these.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And then we will end up with a PR with hundreds(if not thousands) of comments?
Fixes based on review of #317