Skip to content

Commit

Permalink
Merge pull request #1690 from malakaganga/fix_localentry
Browse files Browse the repository at this point in the history
Support invalidating connections when related local entry changed
  • Loading branch information
malakaganga committed Nov 10, 2023
2 parents 27d717d + 547e75c commit e5db85f
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,38 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.config.SynapseConfiguration;
import org.wso2.carbon.connector.core.ConnectException;
import org.wso2.carbon.connector.core.pool.Configuration;
import org.wso2.carbon.connector.core.pool.ConnectionFactory;
import org.wso2.carbon.connector.core.pool.ConnectionPool;
import org.wso2.carbon.connector.core.util.ConnectorUtils;
import org.wso2.carbon.connector.core.util.Constants;

import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static java.lang.String.format;

/**
* Handles the connections
*/
public class ConnectionHandler {
public class ConnectionHandler implements LocalEntryUndeployCallBack {

private static final Log log = LogFactory.getLog(ConnectionHandler.class);
private static final ConnectionHandler handler;
// Stores connections/connection pools against connection code name
// defined as <connector_name>:<connection_name>
private final Map<String, Object> connectionMap;
private final Map<String, String> connectionLocalEntryMap;
private final ConcurrentHashMap<String, LocalEntryUndeployObserver> observerMap = new ConcurrentHashMap();
private SynapseConfiguration synapseConfiguration = null;
private ConnectionFactory connectionFactory = null;
private Configuration configuration = null;

Expand All @@ -52,8 +61,8 @@ public class ConnectionHandler {
}

private ConnectionHandler() {

this.connectionMap = new ConcurrentHashMap<>();
this.connectionLocalEntryMap = new ConcurrentHashMap<>();
}

/**
Expand All @@ -66,17 +75,85 @@ public static ConnectionHandler getConnectionHandler() {
return handler;
}

/**
* Initialize local entry connection mapping
*
* @param connector Name of the connector
* @param connectionName Name of the connection
* @param messageContext Message Context
*/
public void initializeLocalEntryConnectionMapping(String connector, String connectionName,
MessageContext messageContext) {
String localEntryName = (String) ConnectorUtils.
lookupTemplateParamater(messageContext, Constants.INIT_CONFIG_KEY);
String uniqueConnectionName = getCode(connector, connectionName);
if (localEntryName != null && !connectionLocalEntryMap.containsKey(uniqueConnectionName)) {
connectionLocalEntryMap.put(uniqueConnectionName, localEntryName);
if (!observerMap.containsKey(localEntryName)) {
LocalEntryUndeployObserver localEntryUndeployObserver = new LocalEntryUndeployObserver(localEntryName);
localEntryUndeployObserver.setCallback(this); // Set the callback reference
SynapseConfiguration synapseConfig = messageContext.getEnvironment().getSynapseConfiguration();
observerMap.put(localEntryName, localEntryUndeployObserver);
synapseConfig.registerObserver(localEntryUndeployObserver);
this.synapseConfiguration = synapseConfig;
}
}
}

@Override
public void onLocalEntryUndeploy(String localEntryKey) {
if (localEntryKey != null && connectionLocalEntryMap.containsValue(localEntryKey)) {
removeLocalEntryConnections(localEntryKey);
connectionLocalEntryMap.values().removeIf(value -> value.equals(localEntryKey));
LocalEntryUndeployObserver localEntryUndeployObserver = this.observerMap.remove(localEntryKey);
if (synapseConfiguration != null) {
this.synapseConfiguration.unregisterObserver(localEntryUndeployObserver);
}
}
}

/**
* Creates a new connection pool and stores the connection
*
* @param connector Name of the connector
* @param connectionName Name of the connection
* @param factory Connection Factory that defines how to create connections
* @param configuration Configurations for the connection pool
* @param messageContext Message Context
*/
public void createConnection(String connector, String connectionName, ConnectionFactory factory,
Configuration configuration) {
Configuration configuration, MessageContext messageContext) {
initializeLocalEntryConnectionMapping(connector, connectionName, messageContext);
this.connectionFactory = factory;
this.configuration = configuration;
ConnectionPool pool = new ConnectionPool(connectionFactory, configuration);
connectionMap.putIfAbsent(getCode(connector, connectionName), pool);
}

/**
* Stores a new single connection
*
* @param connector Name of the connector
* @param connectionName Name of the connection
* @param connection Connection to be stored
* @param messageContext Message Context
*/
public void createConnection(String connector, String connectionName, Connection connection
, MessageContext messageContext) {
initializeLocalEntryConnectionMapping(connector, connectionName, messageContext);
connectionMap.putIfAbsent(getCode(connector, connectionName), connection);
}

/**
* Creates a new connection pool and stores the connection
*
* @param connector Name of the connector
* @param connectionName Name of the connection
* @param factory Connection Factory that defines how to create connections
* @param configuration Configurations for the connection pool
*/
public void createConnection(String connector, String connectionName, ConnectionFactory factory,
Configuration configuration) {
this.connectionFactory = factory;
this.configuration = configuration;
ConnectionPool pool = new ConnectionPool(connectionFactory, configuration);
Expand All @@ -91,7 +168,6 @@ public void createConnection(String connector, String connectionName, Connection
* @param connection Connection to be stored
*/
public void createConnection(String connector, String connectionName, Connection connection) {

connectionMap.putIfAbsent(getCode(connector, connectionName), connection);
}

Expand Down Expand Up @@ -196,6 +272,26 @@ public void shutdownConnections(String connector) {
}
}

/**
* remove local entry and associate connections from local entry store and connection map
* @param localEntryName
*/
public void removeLocalEntryConnections(String localEntryName) {
Set<String> keysToRemove = connectionLocalEntryMap.entrySet().stream()
.filter(entry -> localEntryName.equals(entry.getValue()))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());

// Now close each connection and remove the entry from the connectionMap
keysToRemove.forEach(key -> {
Object connection = connectionMap.get(key);
if (connection != null) {
closeConnection(connection);
connectionMap.remove(key);
}
});
}

/**
* Check if a connection exists for the connector by the same connection name
*
Expand Down Expand Up @@ -230,6 +326,27 @@ private void closeConnection(String conName, Object connectionObj) {
}
}

/**
* Closes the connection.
*
* @param connectionObj Connection Object
*/
private void closeConnection(Object connectionObj) {
if (connectionObj instanceof ConnectionPool) {
try {
((ConnectionPool) connectionObj).close();
} catch (ConnectException e) {
log.error("Failed to close connection pool. ", e);
}
} else if (connectionObj instanceof Connection) {
try {
((Connection) connectionObj).close();
} catch (ConnectException e) {
log.error("Failed to close connection ", e);
}
}
}

/**
* Retrieves the connection code defined as <connector_name>:<connection_name>
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.wso2.carbon.connector.core.connection;

public interface LocalEntryUndeployCallBack {
/**
* Listen for local entry un deploy events
* and cleanup connections originated by that local entry.
*/
void onLocalEntryUndeploy(String localEntryKey);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2023, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.connector.core.connection;


import org.apache.synapse.config.AbstractSynapseObserver;
import org.apache.synapse.config.Entry;

/**
* Listen for local entry un deploy events
* and cleanup connections originated by that local entry.
*/
public class LocalEntryUndeployObserver extends AbstractSynapseObserver {
private final String localEntryName;

private LocalEntryUndeployCallBack callback;

public LocalEntryUndeployObserver(String localEntryName) {
this.localEntryName = localEntryName;
}

@Override
public void entryRemoved(Entry entry) {
if (this.callback != null && entry.getKey().equals(localEntryName)) {
this.callback.onLocalEntryUndeploy(entry.getKey());
}
}
public void setCallback(LocalEntryUndeployCallBack callback) {
this.callback = callback;
}

public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LocalEntryUndeployObserver localEntryUndeployObserver = (LocalEntryUndeployObserver) o;
return localEntryName.equals(localEntryUndeployObserver.localEntryName);
}

public int hashCode() {
return localEntryName.hashCode();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ public class Constants {
public static final String MAX_EVICTION_TIME = "minEvictionTime";
public static final String EVICTION_CHECK_INTERVAL = "evictionCheckInterval";
public static final String EXHAUSTED_ACTION = "exhaustedAction";
public static final String INIT_CONFIG_KEY = "INIT_CONFIG_KEY";
}

0 comments on commit e5db85f

Please sign in to comment.