Skip to content

Commit

Permalink
KAFKA-414: External secrets in connection.uri attribute runs into val…
Browse files Browse the repository at this point in the history
…idation error during connector deployment if config providers is not set on the connect worker level (#163)



Co-authored-by: Asif Mahmud <rasifmahmud16@gmail.com>
  • Loading branch information
jagadishmdb and rasifmahmud authored May 29, 2024
1 parent 6e43b1d commit 8c1a9b2
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
10 changes: 6 additions & 4 deletions src/main/java/com/mongodb/kafka/connect/MongoSinkConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,18 @@ public ConfigDef config() {

@Override
public Config validate(final Map<String, String> connectorConfigs) {
Config rawConfig = super.validate(connectorConfigs);

MongoSinkConfig sinkConfig;
try {
sinkConfig = new MongoSinkConfig(connectorConfigs);
} catch (Exception e) {
return rawConfig;
return super.validate(connectorConfigs);
}

final Config config = ConfigHelper.evaluateConfigValues(rawConfig, sinkConfig);
final Map<String, String> resolvedConnectorConfigs =
ConfigHelper.evaluateConfigValues(connectorConfigs, sinkConfig);

final Config tempConfig = super.validate(resolvedConnectorConfigs);
final Config config = ConfigHelper.evaluateConfigValues(tempConfig, sinkConfig);

validateCanConnect(sinkConfig, config, CONNECTION_URI_CONFIG)
.ifPresent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,18 @@ public Class<? extends Task> taskClass() {

@Override
public Config validate(final Map<String, String> connectorConfigs) {
Config rawConfig = super.validate(connectorConfigs);
MongoSourceConfig sourceConfig;
try {
sourceConfig = new MongoSourceConfig(connectorConfigs);
} catch (Exception e) {
return rawConfig;
return super.validate(connectorConfigs);
}

final Config config = ConfigHelper.evaluateConfigValues(rawConfig, sourceConfig);
final Map<String, String> resolvedConnectorConfigs =
ConfigHelper.evaluateConfigValues(connectorConfigs, sourceConfig);

final Config tempConfig = super.validate(resolvedConnectorConfigs);
final Config config = ConfigHelper.evaluateConfigValues(tempConfig, sourceConfig);

validateCanConnect(sourceConfig, config, MongoSourceConfig.CONNECTION_URI_CONFIG)
.ifPresent(
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/mongodb/kafka/connect/util/ConfigHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import static java.lang.String.format;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -205,4 +206,18 @@ public static Config evaluateConfigValues(
});
return rawConfig;
}

public static Map<String, String> evaluateConfigValues(
final Map<String, String> rawConfigs, final AbstractConfig resolvedConfig) {
Map<String, String> resolvedRawConfigs = new HashMap<>(rawConfigs);
final Map<String, Object> originals = resolvedConfig.originals();
rawConfigs.forEach(
(key, val) -> {
Object ev = originals.get(key);
if (ev instanceof String) {
resolvedRawConfigs.put(key, (String) ev);
}
});
return resolvedRawConfigs;
}
}

0 comments on commit 8c1a9b2

Please sign in to comment.