diff --git a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/ConnectionHandler.java b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/ConnectionHandler.java index 4d48122bd95..e13a805b37f 100644 --- a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/ConnectionHandler.java +++ b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/ConnectionHandler.java @@ -19,29 +19,38 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.synapse.MessageContext; +import org.apache.synapse.config.SynapseConfiguration; import org.wso2.carbon.connector.core.ConnectException; import org.wso2.carbon.connector.core.pool.Configuration; import org.wso2.carbon.connector.core.pool.ConnectionFactory; import org.wso2.carbon.connector.core.pool.ConnectionPool; +import org.wso2.carbon.connector.core.util.ConnectorUtils; +import org.wso2.carbon.connector.core.util.Constants; import java.time.Instant; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import static java.lang.String.format; /** * Handles the connections */ -public class ConnectionHandler { +public class ConnectionHandler implements LocalEntryUndeployCallBack { private static final Log log = LogFactory.getLog(ConnectionHandler.class); private static final ConnectionHandler handler; // Stores connections/connection pools against connection code name // defined as : private final Map connectionMap; + private final Map connectionLocalEntryMap; + private final ConcurrentHashMap observerMap = new ConcurrentHashMap(); + private SynapseConfiguration synapseConfiguration = null; private ConnectionFactory connectionFactory = null; private Configuration configuration = null; @@ -52,8 +61,8 @@ public class ConnectionHandler { } private ConnectionHandler() { - this.connectionMap = new ConcurrentHashMap<>(); + this.connectionLocalEntryMap = new ConcurrentHashMap<>(); } /** @@ -66,6 +75,43 @@ public static ConnectionHandler getConnectionHandler() { return handler; } + /** + * Initialize local entry connection mapping + * + * @param connector Name of the connector + * @param connectionName Name of the connection + * @param messageContext Message Context + */ + public void initializeLocalEntryConnectionMapping(String connector, String connectionName, + MessageContext messageContext) { + String localEntryName = (String) ConnectorUtils. + lookupTemplateParamater(messageContext, Constants.INIT_CONFIG_KEY); + String uniqueConnectionName = getCode(connector, connectionName); + if (localEntryName != null && !connectionLocalEntryMap.containsKey(uniqueConnectionName)) { + connectionLocalEntryMap.put(uniqueConnectionName, localEntryName); + if (!observerMap.containsKey(localEntryName)) { + LocalEntryUndeployObserver localEntryUndeployObserver = new LocalEntryUndeployObserver(localEntryName); + localEntryUndeployObserver.setCallback(this); // Set the callback reference + SynapseConfiguration synapseConfig = messageContext.getEnvironment().getSynapseConfiguration(); + observerMap.put(localEntryName, localEntryUndeployObserver); + synapseConfig.registerObserver(localEntryUndeployObserver); + this.synapseConfiguration = synapseConfig; + } + } + } + + @Override + public void onLocalEntryUndeploy(String localEntryKey) { + if (localEntryKey != null && connectionLocalEntryMap.containsValue(localEntryKey)) { + removeLocalEntryConnections(localEntryKey); + connectionLocalEntryMap.values().removeIf(value -> value.equals(localEntryKey)); + LocalEntryUndeployObserver localEntryUndeployObserver = this.observerMap.remove(localEntryKey); + if (synapseConfiguration != null) { + this.synapseConfiguration.unregisterObserver(localEntryUndeployObserver); + } + } + } + /** * Creates a new connection pool and stores the connection * @@ -73,10 +119,41 @@ public static ConnectionHandler getConnectionHandler() { * @param connectionName Name of the connection * @param factory Connection Factory that defines how to create connections * @param configuration Configurations for the connection pool + * @param messageContext Message Context */ public void createConnection(String connector, String connectionName, ConnectionFactory factory, - Configuration configuration) { + Configuration configuration, MessageContext messageContext) { + initializeLocalEntryConnectionMapping(connector, connectionName, messageContext); + this.connectionFactory = factory; + this.configuration = configuration; + ConnectionPool pool = new ConnectionPool(connectionFactory, configuration); + connectionMap.putIfAbsent(getCode(connector, connectionName), pool); + } + + /** + * Stores a new single connection + * + * @param connector Name of the connector + * @param connectionName Name of the connection + * @param connection Connection to be stored + * @param messageContext Message Context + */ + public void createConnection(String connector, String connectionName, Connection connection + , MessageContext messageContext) { + initializeLocalEntryConnectionMapping(connector, connectionName, messageContext); + connectionMap.putIfAbsent(getCode(connector, connectionName), connection); + } + /** + * Creates a new connection pool and stores the connection + * + * @param connector Name of the connector + * @param connectionName Name of the connection + * @param factory Connection Factory that defines how to create connections + * @param configuration Configurations for the connection pool + */ + public void createConnection(String connector, String connectionName, ConnectionFactory factory, + Configuration configuration) { this.connectionFactory = factory; this.configuration = configuration; ConnectionPool pool = new ConnectionPool(connectionFactory, configuration); @@ -91,7 +168,6 @@ public void createConnection(String connector, String connectionName, Connection * @param connection Connection to be stored */ public void createConnection(String connector, String connectionName, Connection connection) { - connectionMap.putIfAbsent(getCode(connector, connectionName), connection); } @@ -196,6 +272,26 @@ public void shutdownConnections(String connector) { } } + /** + * remove local entry and associate connections from local entry store and connection map + * @param localEntryName + */ + public void removeLocalEntryConnections(String localEntryName) { + Set keysToRemove = connectionLocalEntryMap.entrySet().stream() + .filter(entry -> localEntryName.equals(entry.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + // Now close each connection and remove the entry from the connectionMap + keysToRemove.forEach(key -> { + Object connection = connectionMap.get(key); + if (connection != null) { + closeConnection(connection); + connectionMap.remove(key); + } + }); + } + /** * Check if a connection exists for the connector by the same connection name * @@ -230,6 +326,27 @@ private void closeConnection(String conName, Object connectionObj) { } } + /** + * Closes the connection. + * + * @param connectionObj Connection Object + */ + private void closeConnection(Object connectionObj) { + if (connectionObj instanceof ConnectionPool) { + try { + ((ConnectionPool) connectionObj).close(); + } catch (ConnectException e) { + log.error("Failed to close connection pool. ", e); + } + } else if (connectionObj instanceof Connection) { + try { + ((Connection) connectionObj).close(); + } catch (ConnectException e) { + log.error("Failed to close connection ", e); + } + } + } + /** * Retrieves the connection code defined as : * diff --git a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployCallBack.java b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployCallBack.java new file mode 100644 index 00000000000..527c1630aae --- /dev/null +++ b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployCallBack.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.wso2.carbon.connector.core.connection; + +public interface LocalEntryUndeployCallBack { + /** + * Listen for local entry un deploy events + * and cleanup connections originated by that local entry. + */ + void onLocalEntryUndeploy(String localEntryKey); +} diff --git a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployObserver.java b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployObserver.java new file mode 100644 index 00000000000..3ff85eab26e --- /dev/null +++ b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployObserver.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.carbon.connector.core.connection; + + +import org.apache.synapse.config.AbstractSynapseObserver; +import org.apache.synapse.config.Entry; + +/** + * Listen for local entry un deploy events + * and cleanup connections originated by that local entry. + */ +public class LocalEntryUndeployObserver extends AbstractSynapseObserver { + private final String localEntryName; + + private LocalEntryUndeployCallBack callback; + + public LocalEntryUndeployObserver(String localEntryName) { + this.localEntryName = localEntryName; + } + + @Override + public void entryRemoved(Entry entry) { + if (this.callback != null && entry.getKey().equals(localEntryName)) { + this.callback.onLocalEntryUndeploy(entry.getKey()); + } + } + public void setCallback(LocalEntryUndeployCallBack callback) { + this.callback = callback; + } + + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LocalEntryUndeployObserver localEntryUndeployObserver = (LocalEntryUndeployObserver) o; + return localEntryName.equals(localEntryUndeployObserver.localEntryName); + } + + public int hashCode() { + return localEntryName.hashCode(); + } + +} diff --git a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/util/Constants.java b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/util/Constants.java index b014f987036..95b5c6d1e0c 100644 --- a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/util/Constants.java +++ b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/util/Constants.java @@ -29,4 +29,5 @@ public class Constants { public static final String MAX_EVICTION_TIME = "minEvictionTime"; public static final String EVICTION_CHECK_INTERVAL = "evictionCheckInterval"; public static final String EXHAUSTED_ACTION = "exhaustedAction"; + public static final String INIT_CONFIG_KEY = "INIT_CONFIG_KEY"; }