Skip to content

Commit

Permalink
Support invalidating connections when related local entry changed
Browse files Browse the repository at this point in the history
I introduced an enhancement to the way we handle connections
 in relation to local entries.A mechanism was established where the key
 associated with a localEntry is passed from the InvokeMediator
 to the TemplateMediator then further propagated to the TemplateContext,
 ensuring that the specific localEntry is accessible from connection handling code.

 With the availability of the localEntry key, each active connection(dynamic/static) has been
 bound to its respective localEntry in a map.

 So when a localEntry is undeployed or removed, this change is detected by
 the Synapse Observer, and the connections associated with that specific localEntry
 are invalidated.

 Fixes: wso2/micro-integrator#3002
  • Loading branch information
malakaganga committed Nov 10, 2023
1 parent 27d717d commit 547e75c
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,38 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.config.SynapseConfiguration;
import org.wso2.carbon.connector.core.ConnectException;
import org.wso2.carbon.connector.core.pool.Configuration;
import org.wso2.carbon.connector.core.pool.ConnectionFactory;
import org.wso2.carbon.connector.core.pool.ConnectionPool;
import org.wso2.carbon.connector.core.util.ConnectorUtils;
import org.wso2.carbon.connector.core.util.Constants;

import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static java.lang.String.format;

/**
* Handles the connections
*/
public class ConnectionHandler {
public class ConnectionHandler implements LocalEntryUndeployCallBack {

private static final Log log = LogFactory.getLog(ConnectionHandler.class);
private static final ConnectionHandler handler;
// Stores connections/connection pools against connection code name
// defined as <connector_name>:<connection_name>
private final Map<String, Object> connectionMap;
private final Map<String, String> connectionLocalEntryMap;
private final ConcurrentHashMap<String, LocalEntryUndeployObserver> observerMap = new ConcurrentHashMap();
private SynapseConfiguration synapseConfiguration = null;
private ConnectionFactory connectionFactory = null;
private Configuration configuration = null;

Expand All @@ -52,8 +61,8 @@ public class ConnectionHandler {
}

private ConnectionHandler() {

this.connectionMap = new ConcurrentHashMap<>();
this.connectionLocalEntryMap = new ConcurrentHashMap<>();
}

/**
Expand All @@ -66,17 +75,85 @@ public static ConnectionHandler getConnectionHandler() {
return handler;
}

/**
* Initialize local entry connection mapping
*
* @param connector Name of the connector
* @param connectionName Name of the connection
* @param messageContext Message Context
*/
public void initializeLocalEntryConnectionMapping(String connector, String connectionName,
MessageContext messageContext) {
String localEntryName = (String) ConnectorUtils.
lookupTemplateParamater(messageContext, Constants.INIT_CONFIG_KEY);
String uniqueConnectionName = getCode(connector, connectionName);
if (localEntryName != null && !connectionLocalEntryMap.containsKey(uniqueConnectionName)) {
connectionLocalEntryMap.put(uniqueConnectionName, localEntryName);
if (!observerMap.containsKey(localEntryName)) {
LocalEntryUndeployObserver localEntryUndeployObserver = new LocalEntryUndeployObserver(localEntryName);
localEntryUndeployObserver.setCallback(this); // Set the callback reference
SynapseConfiguration synapseConfig = messageContext.getEnvironment().getSynapseConfiguration();
observerMap.put(localEntryName, localEntryUndeployObserver);
synapseConfig.registerObserver(localEntryUndeployObserver);
this.synapseConfiguration = synapseConfig;
}
}
}

@Override
public void onLocalEntryUndeploy(String localEntryKey) {
if (localEntryKey != null && connectionLocalEntryMap.containsValue(localEntryKey)) {
removeLocalEntryConnections(localEntryKey);
connectionLocalEntryMap.values().removeIf(value -> value.equals(localEntryKey));
LocalEntryUndeployObserver localEntryUndeployObserver = this.observerMap.remove(localEntryKey);
if (synapseConfiguration != null) {
this.synapseConfiguration.unregisterObserver(localEntryUndeployObserver);
}
}
}

/**
* Creates a new connection pool and stores the connection
*
* @param connector Name of the connector
* @param connectionName Name of the connection
* @param factory Connection Factory that defines how to create connections
* @param configuration Configurations for the connection pool
* @param messageContext Message Context
*/
public void createConnection(String connector, String connectionName, ConnectionFactory factory,
Configuration configuration) {
Configuration configuration, MessageContext messageContext) {
initializeLocalEntryConnectionMapping(connector, connectionName, messageContext);
this.connectionFactory = factory;
this.configuration = configuration;
ConnectionPool pool = new ConnectionPool(connectionFactory, configuration);
connectionMap.putIfAbsent(getCode(connector, connectionName), pool);
}

/**
* Stores a new single connection
*
* @param connector Name of the connector
* @param connectionName Name of the connection
* @param connection Connection to be stored
* @param messageContext Message Context
*/
public void createConnection(String connector, String connectionName, Connection connection
, MessageContext messageContext) {
initializeLocalEntryConnectionMapping(connector, connectionName, messageContext);
connectionMap.putIfAbsent(getCode(connector, connectionName), connection);
}

/**
* Creates a new connection pool and stores the connection
*
* @param connector Name of the connector
* @param connectionName Name of the connection
* @param factory Connection Factory that defines how to create connections
* @param configuration Configurations for the connection pool
*/
public void createConnection(String connector, String connectionName, ConnectionFactory factory,
Configuration configuration) {
this.connectionFactory = factory;
this.configuration = configuration;
ConnectionPool pool = new ConnectionPool(connectionFactory, configuration);
Expand All @@ -91,7 +168,6 @@ public void createConnection(String connector, String connectionName, Connection
* @param connection Connection to be stored
*/
public void createConnection(String connector, String connectionName, Connection connection) {

connectionMap.putIfAbsent(getCode(connector, connectionName), connection);
}

Expand Down Expand Up @@ -196,6 +272,26 @@ public void shutdownConnections(String connector) {
}
}

/**
* remove local entry and associate connections from local entry store and connection map
* @param localEntryName
*/
public void removeLocalEntryConnections(String localEntryName) {
Set<String> keysToRemove = connectionLocalEntryMap.entrySet().stream()
.filter(entry -> localEntryName.equals(entry.getValue()))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());

// Now close each connection and remove the entry from the connectionMap
keysToRemove.forEach(key -> {
Object connection = connectionMap.get(key);
if (connection != null) {
closeConnection(connection);
connectionMap.remove(key);
}
});
}

/**
* Check if a connection exists for the connector by the same connection name
*
Expand Down Expand Up @@ -230,6 +326,27 @@ private void closeConnection(String conName, Object connectionObj) {
}
}

/**
* Closes the connection.
*
* @param connectionObj Connection Object
*/
private void closeConnection(Object connectionObj) {
if (connectionObj instanceof ConnectionPool) {
try {
((ConnectionPool) connectionObj).close();
} catch (ConnectException e) {
log.error("Failed to close connection pool. ", e);
}
} else if (connectionObj instanceof Connection) {
try {
((Connection) connectionObj).close();
} catch (ConnectException e) {
log.error("Failed to close connection ", e);
}
}
}

/**
* Retrieves the connection code defined as <connector_name>:<connection_name>
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.connector.core.connection;

public interface LocalEntryUndeployCallBack {
/**
* Listen for local entry un deploy events
* and cleanup connections originated by that local entry.
*/
void onLocalEntryUndeploy(String localEntryKey);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.connector.core.connection;


import org.apache.synapse.config.AbstractSynapseObserver;
import org.apache.synapse.config.Entry;

/**
* Listen for local entry un deploy events
* and cleanup connections originated by that local entry.
*/
public class LocalEntryUndeployObserver extends AbstractSynapseObserver {
private final String localEntryName;

private LocalEntryUndeployCallBack callback;

public LocalEntryUndeployObserver(String localEntryName) {
this.localEntryName = localEntryName;
}

@Override
public void entryRemoved(Entry entry) {
if (this.callback != null && entry.getKey().equals(localEntryName)) {
this.callback.onLocalEntryUndeploy(entry.getKey());
}
}
public void setCallback(LocalEntryUndeployCallBack callback) {
this.callback = callback;
}

public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LocalEntryUndeployObserver localEntryUndeployObserver = (LocalEntryUndeployObserver) o;
return localEntryName.equals(localEntryUndeployObserver.localEntryName);
}

public int hashCode() {
return localEntryName.hashCode();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ public class Constants {
public static final String MAX_EVICTION_TIME = "minEvictionTime";
public static final String EVICTION_CHECK_INTERVAL = "evictionCheckInterval";
public static final String EXHAUSTED_ACTION = "exhaustedAction";
public static final String INIT_CONFIG_KEY = "INIT_CONFIG_KEY";
}

0 comments on commit 547e75c

Please sign in to comment.