From 547e75cf216b8c1407ae1b9d26157e1d9d8bf906 Mon Sep 17 00:00:00 2001 From: malakaganga Date: Wed, 1 Nov 2023 15:17:08 +0530 Subject: [PATCH] Support invalidating connections when related local entry changed I introduced an enhancement to the way we handle connections in relation to local entries.A mechanism was established where the key associated with a localEntry is passed from the InvokeMediator to the TemplateMediator then further propagated to the TemplateContext, ensuring that the specific localEntry is accessible from connection handling code. With the availability of the localEntry key, each active connection(dynamic/static) has been bound to its respective localEntry in a map. So when a localEntry is undeployed or removed, this change is detected by the Synapse Observer, and the connections associated with that specific localEntry are invalidated. Fixes: https://github.com/wso2/micro-integrator/issues/3002 --- .../core/connection/ConnectionHandler.java | 125 +++++++++++++++++- .../LocalEntryUndeployCallBack.java | 26 ++++ .../LocalEntryUndeployObserver.java | 59 +++++++++ .../carbon/connector/core/util/Constants.java | 1 + 4 files changed, 207 insertions(+), 4 deletions(-) create mode 100644 components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployCallBack.java create mode 100644 components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployObserver.java diff --git a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/ConnectionHandler.java b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/ConnectionHandler.java index 4d48122bd95..e13a805b37f 100644 --- a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/ConnectionHandler.java +++ b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/ConnectionHandler.java @@ -19,29 +19,38 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.synapse.MessageContext; +import org.apache.synapse.config.SynapseConfiguration; import org.wso2.carbon.connector.core.ConnectException; import org.wso2.carbon.connector.core.pool.Configuration; import org.wso2.carbon.connector.core.pool.ConnectionFactory; import org.wso2.carbon.connector.core.pool.ConnectionPool; +import org.wso2.carbon.connector.core.util.ConnectorUtils; +import org.wso2.carbon.connector.core.util.Constants; import java.time.Instant; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import static java.lang.String.format; /** * Handles the connections */ -public class ConnectionHandler { +public class ConnectionHandler implements LocalEntryUndeployCallBack { private static final Log log = LogFactory.getLog(ConnectionHandler.class); private static final ConnectionHandler handler; // Stores connections/connection pools against connection code name // defined as : private final Map connectionMap; + private final Map connectionLocalEntryMap; + private final ConcurrentHashMap observerMap = new ConcurrentHashMap(); + private SynapseConfiguration synapseConfiguration = null; private ConnectionFactory connectionFactory = null; private Configuration configuration = null; @@ -52,8 +61,8 @@ public class ConnectionHandler { } private ConnectionHandler() { - this.connectionMap = new ConcurrentHashMap<>(); + this.connectionLocalEntryMap = new ConcurrentHashMap<>(); } /** @@ -66,6 +75,43 @@ public static ConnectionHandler getConnectionHandler() { return handler; } + /** + * Initialize local entry connection mapping + * + * @param connector Name of the connector + * @param connectionName Name of the connection + * @param messageContext Message Context + */ + public void initializeLocalEntryConnectionMapping(String connector, String connectionName, + MessageContext messageContext) { + String localEntryName = (String) ConnectorUtils. + lookupTemplateParamater(messageContext, Constants.INIT_CONFIG_KEY); + String uniqueConnectionName = getCode(connector, connectionName); + if (localEntryName != null && !connectionLocalEntryMap.containsKey(uniqueConnectionName)) { + connectionLocalEntryMap.put(uniqueConnectionName, localEntryName); + if (!observerMap.containsKey(localEntryName)) { + LocalEntryUndeployObserver localEntryUndeployObserver = new LocalEntryUndeployObserver(localEntryName); + localEntryUndeployObserver.setCallback(this); // Set the callback reference + SynapseConfiguration synapseConfig = messageContext.getEnvironment().getSynapseConfiguration(); + observerMap.put(localEntryName, localEntryUndeployObserver); + synapseConfig.registerObserver(localEntryUndeployObserver); + this.synapseConfiguration = synapseConfig; + } + } + } + + @Override + public void onLocalEntryUndeploy(String localEntryKey) { + if (localEntryKey != null && connectionLocalEntryMap.containsValue(localEntryKey)) { + removeLocalEntryConnections(localEntryKey); + connectionLocalEntryMap.values().removeIf(value -> value.equals(localEntryKey)); + LocalEntryUndeployObserver localEntryUndeployObserver = this.observerMap.remove(localEntryKey); + if (synapseConfiguration != null) { + this.synapseConfiguration.unregisterObserver(localEntryUndeployObserver); + } + } + } + /** * Creates a new connection pool and stores the connection * @@ -73,10 +119,41 @@ public static ConnectionHandler getConnectionHandler() { * @param connectionName Name of the connection * @param factory Connection Factory that defines how to create connections * @param configuration Configurations for the connection pool + * @param messageContext Message Context */ public void createConnection(String connector, String connectionName, ConnectionFactory factory, - Configuration configuration) { + Configuration configuration, MessageContext messageContext) { + initializeLocalEntryConnectionMapping(connector, connectionName, messageContext); + this.connectionFactory = factory; + this.configuration = configuration; + ConnectionPool pool = new ConnectionPool(connectionFactory, configuration); + connectionMap.putIfAbsent(getCode(connector, connectionName), pool); + } + + /** + * Stores a new single connection + * + * @param connector Name of the connector + * @param connectionName Name of the connection + * @param connection Connection to be stored + * @param messageContext Message Context + */ + public void createConnection(String connector, String connectionName, Connection connection + , MessageContext messageContext) { + initializeLocalEntryConnectionMapping(connector, connectionName, messageContext); + connectionMap.putIfAbsent(getCode(connector, connectionName), connection); + } + /** + * Creates a new connection pool and stores the connection + * + * @param connector Name of the connector + * @param connectionName Name of the connection + * @param factory Connection Factory that defines how to create connections + * @param configuration Configurations for the connection pool + */ + public void createConnection(String connector, String connectionName, ConnectionFactory factory, + Configuration configuration) { this.connectionFactory = factory; this.configuration = configuration; ConnectionPool pool = new ConnectionPool(connectionFactory, configuration); @@ -91,7 +168,6 @@ public void createConnection(String connector, String connectionName, Connection * @param connection Connection to be stored */ public void createConnection(String connector, String connectionName, Connection connection) { - connectionMap.putIfAbsent(getCode(connector, connectionName), connection); } @@ -196,6 +272,26 @@ public void shutdownConnections(String connector) { } } + /** + * remove local entry and associate connections from local entry store and connection map + * @param localEntryName + */ + public void removeLocalEntryConnections(String localEntryName) { + Set keysToRemove = connectionLocalEntryMap.entrySet().stream() + .filter(entry -> localEntryName.equals(entry.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + + // Now close each connection and remove the entry from the connectionMap + keysToRemove.forEach(key -> { + Object connection = connectionMap.get(key); + if (connection != null) { + closeConnection(connection); + connectionMap.remove(key); + } + }); + } + /** * Check if a connection exists for the connector by the same connection name * @@ -230,6 +326,27 @@ private void closeConnection(String conName, Object connectionObj) { } } + /** + * Closes the connection. + * + * @param connectionObj Connection Object + */ + private void closeConnection(Object connectionObj) { + if (connectionObj instanceof ConnectionPool) { + try { + ((ConnectionPool) connectionObj).close(); + } catch (ConnectException e) { + log.error("Failed to close connection pool. ", e); + } + } else if (connectionObj instanceof Connection) { + try { + ((Connection) connectionObj).close(); + } catch (ConnectException e) { + log.error("Failed to close connection ", e); + } + } + } + /** * Retrieves the connection code defined as : * diff --git a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployCallBack.java b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployCallBack.java new file mode 100644 index 00000000000..527c1630aae --- /dev/null +++ b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployCallBack.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.wso2.carbon.connector.core.connection; + +public interface LocalEntryUndeployCallBack { + /** + * Listen for local entry un deploy events + * and cleanup connections originated by that local entry. + */ + void onLocalEntryUndeploy(String localEntryKey); +} diff --git a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployObserver.java b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployObserver.java new file mode 100644 index 00000000000..3ff85eab26e --- /dev/null +++ b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/connection/LocalEntryUndeployObserver.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 Inc. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.wso2.carbon.connector.core.connection; + + +import org.apache.synapse.config.AbstractSynapseObserver; +import org.apache.synapse.config.Entry; + +/** + * Listen for local entry un deploy events + * and cleanup connections originated by that local entry. + */ +public class LocalEntryUndeployObserver extends AbstractSynapseObserver { + private final String localEntryName; + + private LocalEntryUndeployCallBack callback; + + public LocalEntryUndeployObserver(String localEntryName) { + this.localEntryName = localEntryName; + } + + @Override + public void entryRemoved(Entry entry) { + if (this.callback != null && entry.getKey().equals(localEntryName)) { + this.callback.onLocalEntryUndeploy(entry.getKey()); + } + } + public void setCallback(LocalEntryUndeployCallBack callback) { + this.callback = callback; + } + + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + LocalEntryUndeployObserver localEntryUndeployObserver = (LocalEntryUndeployObserver) o; + return localEntryName.equals(localEntryUndeployObserver.localEntryName); + } + + public int hashCode() { + return localEntryName.hashCode(); + } + +} diff --git a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/util/Constants.java b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/util/Constants.java index b014f987036..95b5c6d1e0c 100644 --- a/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/util/Constants.java +++ b/components/mediation-connector/org.wso2.carbon.connector.core/src/main/java/org/wso2/carbon/connector/core/util/Constants.java @@ -29,4 +29,5 @@ public class Constants { public static final String MAX_EVICTION_TIME = "minEvictionTime"; public static final String EVICTION_CHECK_INTERVAL = "evictionCheckInterval"; public static final String EXHAUSTED_ACTION = "exhaustedAction"; + public static final String INIT_CONFIG_KEY = "INIT_CONFIG_KEY"; }