diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java index 67240956..2ece623b 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java @@ -347,8 +347,8 @@ public void validateCredentials() { } } else { final BasicAWSCredentials awsCredentials = getAwsCredentials(); - final AwsBasicCredentials awsV2Credentials = getAwsV2Credentials(); - if (awsCredentials == null && awsV2Credentials == null) { + final AwsBasicCredentials awsCredentialsV2 = getAwsCredentialsV2(); + if (awsCredentials == null && awsCredentialsV2 == null) { LOGGER.info( "Connector use {} as credential Provider, " + "when configuration for {{}, {}} OR {{}, {}} are absent", @@ -435,7 +435,7 @@ public BasicAWSCredentials getAwsCredentials() { return null; } - public AwsBasicCredentials getAwsV2Credentials() { + public AwsBasicCredentials getAwsCredentialsV2() { if (Objects.nonNull(cfg.getPassword(AWS_ACCESS_KEY_ID_CONFIG)) && Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) { @@ -467,7 +467,7 @@ public Region getAwsS3Region() { } } - public software.amazon.awssdk.regions.Region getAwsV2S3Region() { + public software.amazon.awssdk.regions.Region getAwsS3RegionV2() { // we have priority of properties if old one not set or both old and new one set // the new property value will be selected if (Objects.nonNull(cfg.getString(AWS_S3_REGION_CONFIG))) { @@ -515,7 +515,7 @@ public AWSCredentialsProvider getCustomCredentialsProvider() { return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class); } - public AwsCredentialsProvider getCustomV2CredentialsProvider() { + public AwsCredentialsProvider getCustomCredentialsProviderV2() { return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AwsCredentialsProvider.class); } diff --git a/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java b/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java index 52e7d4b9..167d872a 100644 --- a/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java +++ b/s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java @@ -69,9 +69,9 @@ public AwsCredentialsProvider getAwsV2Provider(final S3ConfigFragment config) { if (config.hasAwsStsRole()) { return getV2StsProvider(config); } - final AwsBasicCredentials awsCredentials = config.getAwsV2Credentials(); + final AwsBasicCredentials awsCredentials = config.getAwsCredentialsV2(); if (Objects.isNull(awsCredentials)) { - return config.getCustomV2CredentialsProvider(); + return config.getCustomCredentialsProviderV2(); } return StaticCredentialsProvider.create(awsCredentials); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java index 451311cd..13ff4d69 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java @@ -45,7 +45,8 @@ public S3Client createAmazonS3Client(final S3SourceConfig config) { if (Objects.isNull(config.getAwsS3EndPoint())) { return S3Client.builder() .overrideConfiguration(clientOverrideConfiguration) - .overrideConfiguration(o -> o.retryStrategy(r -> r.backoffStrategy(backoffStrategy))) + .overrideConfiguration(o -> o.retryStrategy( + r -> r.backoffStrategy(backoffStrategy).maxAttempts(config.getS3RetryBackoffMaxRetries()))) .region(config.getAwsS3Region()) .credentialsProvider(credentialFactory.getAwsV2Provider(config.getS3ConfigFragment())) .build(); diff --git a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java index d92f448c..23dc69e9 100644 --- a/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java +++ b/s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3SourceConfig.java @@ -86,7 +86,7 @@ public AwsStsEndpointConfig getStsEndpointConfig() { } public AwsBasicCredentials getAwsCredentials() { - return s3ConfigFragment.getAwsV2Credentials(); + return s3ConfigFragment.getAwsCredentialsV2(); } public String getAwsS3EndPoint() { @@ -94,7 +94,7 @@ public String getAwsS3EndPoint() { } public Region getAwsS3Region() { - return s3ConfigFragment.getAwsV2S3Region(); + return s3ConfigFragment.getAwsS3RegionV2(); } public String getAwsS3BucketName() {