-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixes based on review of PR 317 #329
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,10 +39,10 @@ | |
import io.aiven.kafka.connect.s3.source.config.S3SourceConfig; | ||
import io.aiven.kafka.connect.s3.source.input.Transformer; | ||
import io.aiven.kafka.connect.s3.source.input.TransformerFactory; | ||
import io.aiven.kafka.connect.s3.source.utils.AivenS3SourceRecord; | ||
import io.aiven.kafka.connect.s3.source.utils.FileReader; | ||
import io.aiven.kafka.connect.s3.source.utils.OffsetManager; | ||
import io.aiven.kafka.connect.s3.source.utils.RecordProcessor; | ||
import io.aiven.kafka.connect.s3.source.utils.S3SourceRecord; | ||
import io.aiven.kafka.connect.s3.source.utils.SourceRecordIterator; | ||
import io.aiven.kafka.connect.s3.source.utils.Version; | ||
|
||
|
@@ -72,7 +72,7 @@ public class S3SourceTask extends SourceTask { | |
private S3SourceConfig s3SourceConfig; | ||
private AmazonS3 s3Client; | ||
|
||
private Iterator<AivenS3SourceRecord> sourceRecordIterator; | ||
private Iterator<S3SourceRecord> sourceRecordIterator; | ||
private Optional<Converter> keyConverter; | ||
|
||
private Converter valueConverter; | ||
|
@@ -144,24 +144,35 @@ private void prepareReaderFromOffsetStorageReader() { | |
|
||
@Override | ||
public List<SourceRecord> poll() throws InterruptedException { | ||
LOGGER.info("Polling again"); | ||
LOGGER.info("Polling for new records..."); | ||
synchronized (pollLock) { | ||
final List<SourceRecord> results = new ArrayList<>(s3SourceConfig.getInt(MAX_POLL_RECORDS)); | ||
|
||
if (connectorStopped.get()) { | ||
LOGGER.info("Connector has been stopped. Returning empty result list."); | ||
return results; | ||
} | ||
|
||
while (!connectorStopped.get()) { | ||
try { | ||
LOGGER.info("Number of records sent {}", extractSourceRecords(results).size()); | ||
extractSourceRecords(results); | ||
LOGGER.info("Number of records extracted and sent: {}", results.size()); | ||
return results; | ||
} catch (AmazonS3Exception | DataException exception) { | ||
if (handleException(exception)) { | ||
} catch (AmazonS3Exception exception) { | ||
if (exception.isRetryable()) { | ||
LOGGER.warn("Retryable error encountered during polling. Waiting before retrying...", | ||
exception); | ||
pollLock.wait(ERROR_BACKOFF); | ||
|
||
prepareReaderFromOffsetStorageReader(); | ||
} else { | ||
LOGGER.warn("Non-retryable AmazonS3Exception occurred. Stopping polling.", exception); | ||
return null; // NOPMD | ||
} | ||
} catch (DataException exception) { | ||
LOGGER.warn("DataException occurred during polling. No retries will be attempted.", exception); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this correct? The way I read it, it will immediately retry the while loop without waiting (both before and after the change). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, we are ignoring the exception here with a warning. We shall revisit this section in handle malformed records ticket I believe. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the explanation! 👍 |
||
} catch (final Throwable t) { // NOPMD | ||
// This task has failed, so close any resources (may be reopened if needed) before throwing | ||
LOGGER.error("Unexpected error encountered. Closing resources and stopping task.", t); | ||
closeResources(); | ||
throw t; | ||
} | ||
|
@@ -170,23 +181,6 @@ public List<SourceRecord> poll() throws InterruptedException { | |
} | ||
} | ||
|
||
private boolean handleException(final RuntimeException exception) throws InterruptedException { | ||
if (exception instanceof AmazonS3Exception) { | ||
if (((AmazonS3Exception) exception).isRetryable()) { | ||
LOGGER.warn("Retryable error while polling. Will sleep and try again.", exception); | ||
Thread.sleep(ERROR_BACKOFF); | ||
|
||
prepareReaderFromOffsetStorageReader(); | ||
} else { | ||
return true; | ||
} | ||
} | ||
if (exception instanceof DataException) { | ||
LOGGER.warn("DataException. Will NOT try again.", exception); | ||
} | ||
return false; | ||
} | ||
|
||
private List<SourceRecord> extractSourceRecords(final List<SourceRecord> results) throws InterruptedException { | ||
waitForObjects(); | ||
if (connectorStopped.get()) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should it really be
S_3
? This even complicates search in the code.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah thats a fair point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will anyways have more reviews on parent pr. I will make sure this is addressed. There are a couple of more vars like these.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And then we will end up with a PR with hundreds(if not thousands) of comments?