-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Polling efficiency #378
Polling efficiency #378
Conversation
bb477b3
to
b6f2d08
Compare
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java
Show resolved
Hide resolved
...ce-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClientTest.java
Outdated
Show resolved
Hide resolved
...urce-connector/src/test/java/io/aiven/kafka/connect/s3/source/utils/RecordProcessorTest.java
Outdated
Show resolved
Hide resolved
@@ -0,0 +1,439 @@ | |||
/* | |||
* Copyright 2024 Aiven Oy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably 2025 now
commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/S3ObjectIterator.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/AWSV2SourceClient.java
Outdated
Show resolved
Hide resolved
df11418
to
af45037
Compare
af45037
to
be775b0
Compare
...connector/src/integration-test/java/io/aiven/kafka/connect/s3/source/AwsIntegrationTest.java
Show resolved
Hide resolved
throw exception; | ||
} | ||
} else { | ||
// TODO validate that the iterator does not lose an S3Object. Add test to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
S3ObjectIterator is gone now, although we do have a test for rehydration now.
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/test/java/io/aiven/kafka/connect/s3/source/S3SourceTaskTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just two minor things, The rest looks good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM thank you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial review, on the abstract source task. Key question is about the defined poll
API and returning null and periodically returning control to the framework.
commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
Show resolved
Hide resolved
} | ||
|
||
@Override | ||
public final List<SourceRecord> poll() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For clarification: https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll--
This function does comply with the defined API and returns control from time to time to the caller. The question and difference to the defined API is that this does not return null
, but returns empty list when there is no data available.
commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
Outdated
Show resolved
Hide resolved
s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/S3SourceTask.java
Outdated
Show resolved
Hide resolved
...rce-connector/src/main/java/io/aiven/kafka/connect/s3/source/utils/SourceRecordIterator.java
Outdated
Show resolved
Hide resolved
logger.error("Error during poll(): {}", e.getMessage(), e); | ||
if (config.getErrorsTolerance() == ErrorsTolerance.NONE) { | ||
logger.error("Stopping Task"); | ||
return null; // NOPMD must return null in this case. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on this
"Poll this source task for new records. If no data is currently available, this method should block but return control to the caller regularly (by returning null) in order for the task to transition to the PAUSED state if requested to do so.
The task will be stopped on a separate thread, and when that happens this method is expected to unblock, quickly finish up any remaining processing, and return.
Returns:
a list of source records"
returning null will not stop the task. It returns control and says no recs are available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code change to rethrow the error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one minor thing clarify on poll / returning null.
…tractSourceTask.java Co-authored-by: Jarkko Jaakola <91882676+jjaakola-aiven@users.noreply.github.com>
…tractSourceTask.java Co-authored-by: Jarkko Jaakola <91882676+jjaakola-aiven@users.noreply.github.com>
…tractSourceTask.java Co-authored-by: Jarkko Jaakola <91882676+jjaakola-aiven@users.noreply.github.com>
…urce/S3SourceTask.java Co-authored-by: Jarkko Jaakola <91882676+jjaakola-aiven@users.noreply.github.com>
…tractSourceTask.java Co-authored-by: Jarkko Jaakola <91882676+jjaakola-aiven@users.noreply.github.com>
…urce/S3SourceTask.java Co-authored-by: Jarkko Jaakola <91882676+jjaakola-aiven@users.noreply.github.com>
…urce/utils/SourceRecordIterator.java Co-authored-by: Murali Basani <murali.basani@gmail.com>
…y' into polling_efficiency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My concerns addressed.
commons/src/main/java/io/aiven/kafka/connect/common/source/AbstractSourceTask.java
Show resolved
Hide resolved
} catch (RuntimeException e) { // NOPMD must catch runtime here. | ||
logger.error("Error during poll(): {}", e.getMessage(), e); | ||
if (config.getErrorsTolerance() == ErrorsTolerance.NONE) { | ||
logger.error("Stopping Task"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Claudenw Is this addressed ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it now re-throws the exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @Claudenw for the changes
Fixes for KCON-26 - Backoff when no data available.
Fixes for KCON-28 - Improve poll method
Creates an AbstractSourceTask in commons to handle response to poll and backoff calculations as well as start, stop. Implementations need to implement an Iterator that poll will call to retrieve data.
Private classes Timer and Backoff are created in AbstractSourceTask and may be moved out at a later date if needed elsewhere.
Changes made to configurations to support configuration extraction in AbstractSourceTask.
Modifications to S3SourceTask to operate under AbstractSourceTask.
Additional tests added