Skip to content
This repository has been archived by the owner on Oct 5, 2021. It is now read-only.

Commit

Permalink
Merge pull request #524 from erandacr/StaleConFix_Automation
Browse files Browse the repository at this point in the history
Stale con fix automation
  • Loading branch information
Buddhima committed May 20, 2016
2 parents 1abc39b + 9809238 commit a33fe21
Show file tree
Hide file tree
Showing 4 changed files with 332 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.esb.jms.transport.test;

import org.apache.activemq.broker.TransportConnector;
import org.apache.axiom.om.OMElement;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.wso2.carbon.esb.jms.utils.JMSBroker;
import org.wso2.carbon.integration.common.admin.client.LogViewerClient;
import org.wso2.carbon.logging.view.stub.types.carbon.LogEvent;
import org.wso2.esb.integration.common.utils.ESBIntegrationTest;
import org.wso2.esb.integration.common.utils.JMSEndpointManager;
import org.wso2.esb.integration.common.utils.Utils;
import org.wso2.esb.integration.common.utils.clients.axis2client.AxisServiceClient;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

/**
* This class tests the JMS sender side stale connections handling in unexpected broker shutdowns, when the connection
* caching is unabled.
* Initially 15 messages will be sent, which can fill the cached connection map (10) and after a broker restart another
* message will be sent. If this message get into default fault sequence, the test will be failed.
*/
public class JMSSenderStaleConnectionsTestCase extends ESBIntegrationTest {
/* this will be printed on default fault sequence when the exception is thrown */
private final String exceptedErrorLog = "<soapenv:Envelope xmlns:soapenv=\"http://schemas.xmlsoap.org/soap/"
+ "envelope/\"><soapenv:Body><ns:getQuote xmlns:ns=\"http://services.samples\"><ns:request><ns:symbol>"
+ "JMS</ns:symbol></ns:request></ns:getQuote></soapenv:Body></soapenv:Envelope>";
private LogViewerClient logViewerClient;
private JMSBroker jmsBroker;

@BeforeClass(alwaysRun = true)
protected void init() throws Exception {
super.init();

/* Initialize the JMS Broker */
List<TransportConnector> tcpConnectors = new ArrayList<>();
tcpConnectors.add(getTCPConnector());
this.jmsBroker = new JMSBroker("JMSConnectionCaching", tcpConnectors);

startBroker();

/* uploadSynapseConfig (Proxy) */
OMElement synapse = esbUtils.loadResource("artifacts/ESB/jms/transport/JMSSenderStaleConnectionsTestProxy.xml");
updateESBConfiguration(JMSEndpointManager.setConfigurations(synapse));

logViewerClient = new LogViewerClient(contextUrls.getBackEndUrl(), getSessionCookie());
}

@Test(groups = {
"wso2.esb"
},
description = "Test for JMS sender side stale connections handling")
public void staleConnectionsTestJMSProxy() throws Exception {

int beforeLogCount = logViewerClient.getAllSystemLogs().length;
AxisServiceClient client = new AxisServiceClient();

boolean isExceptionThrown = false;

for (int i = 0; i < 15; i++) {
client.sendRobust(Utils.getStockQuoteRequest("JMS"),
getProxyServiceURLHttp("JMSSenderStaleConnectionsTestProxy"), "getQuote");
}

/* restart the JMS broker */
stopBroker();
startBroker();

/* send another message after broker restart */
client.sendRobust(Utils.getStockQuoteRequest("JMS"),
getProxyServiceURLHttp("JMSSenderStaleConnectionsTestProxy"), "getQuote");

LogEvent[] logs = logViewerClient.getAllSystemLogs();

for (int i = 0; i < (logs.length - beforeLogCount); i++) {
if (logs[i].getMessage().contains(exceptedErrorLog)) {
isExceptionThrown = true;
break;
}
}
Assert.assertTrue(!isExceptionThrown, "Sender Side Stale connections handling test passed");
}

@AfterClass(alwaysRun = true)
public void destroy() throws Exception {
super.cleanup();
stopBroker();
}

/**
* Start the JMS broker (ActiveMQ)
*/
private void startBroker() {
if (jmsBroker != null && !jmsBroker.isBrokerStarted()) {
Assert.assertTrue(this.jmsBroker.start(), "JMS Broker(ActiveMQ) stating failed");
}
}

/**
* Stop the JMS broker (ActiveMQ)
*/
private void stopBroker() {
if (jmsBroker != null && jmsBroker.isBrokerStarted()) {
Assert.assertTrue(jmsBroker.stop(), "JMS Broker(ActiveMQ) Stopping failed");
}
}

private TransportConnector getTCPConnector() {
TransportConnector tcp = new TransportConnector();
tcp.setName("tcp");
try {
tcp.setUri(new URI("tcp://127.0.0.1:61626"));
} catch (URISyntaxException e) {
log.error("Error while setting tcp uri :tcp://127.0.0.1:61626", e);
}
return tcp;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Copyright (c) 2016, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.wso2.carbon.esb.jms.utils;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.automation.engine.FrameworkConstants;
import org.wso2.carbon.automation.extensions.servers.jmsserver.controller.config.JMSBrokerConfiguration;

import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

/**
* This class is similar to org.wso2.carbon.automation.extensions.servers.jmsserver.controller.JMSBrokerController.
* Since the above mentioned class doesn't allow creation of multiple JMS broker controllers, this class is
* implemented temporary to serve that purpose.
* Jira issue: https://wso2.org/jira/browse/TA-1009
*/
public class JMSBroker {
private static final Log log = LogFactory.getLog(JMSBroker.class);
private String serverName;
private List<TransportConnector> transportConnectors;
private BrokerService broker;
private boolean isBrokerStarted = false;

/**
* Constructor to defined broker transport
*
* @param serverName name for the server
* @param configuration Transport configurations which should expose by the server
*/
public JMSBroker(String serverName, JMSBrokerConfiguration configuration) {
this.serverName = serverName;
this.transportConnectors = new ArrayList<TransportConnector>();
TransportConnector connector = new TransportConnector();
connector.setName("tcp");
try {
connector.setUri(new URI(configuration.getProviderURL()));
} catch (URISyntaxException e) {
log.error("Invalid URI", e);
}
transportConnectors.add(connector);

}

/**
* Constructor to defined broker transport
*
* @param serverName name of the server
* @param transportConnectors transport configurations which should expose by the server
*/
public JMSBroker(String serverName, List<TransportConnector> transportConnectors) {
this.serverName = serverName;
this.transportConnectors = transportConnectors;
}

/**
* Return the server name defined from constructor
*
* @return name of the broker service
*/
public String getServerName() {
return serverName;
}

/**
* starting ActiveMQ embedded broker
*
* @return true if the broker is registered successfully
*/
public boolean start() {
try {
log.info("JMSServerController: Preparing to start JMS Broker: " + serverName);
broker = new BrokerService();
// configure the broker

broker.setBrokerName(serverName);
log.info(broker.getBrokerDataDirectory());
broker.setDataDirectory(System.getProperty(FrameworkConstants.CARBON_HOME) +
File.separator + broker.getBrokerDataDirectory());
broker.setTransportConnectors(transportConnectors);
broker.setPersistent(true);

broker.start();
setBrokerStatus(true);
log.info("JMSServerController: Broker is Successfully started. continuing tests");
return true;
} catch (Exception e) {
log.error("JMSServerController: There was an error starting JMS broker: " + serverName, e);
return false;
}
}

/**
* Stopping ActiveMQ embedded broker
*
* @return true if broker is successfully stopped
*/
public boolean stop() {
try {
log.info(" ************* Stopping **************");
if (broker.isStarted()) {
broker.stop();
for (TransportConnector transportConnector : transportConnectors) {
transportConnector.stop();
}
setBrokerStatus(false);
}
return true;
} catch (Exception e) {
log.error("Error while shutting down the broker", e);
return false;
}
}

/**
* getting the broker status
*
* @return true if the broker is started
*/
public boolean isBrokerStarted() {
return isBrokerStarted;
}

/**
* introduced to get rid of find bugs warning
*
* @param status
*/
private void setBrokerStatus(boolean status) {
isBrokerStarted = status;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://ws.apache.org/ns/synapse">
<registry provider="org.wso2.carbon.mediation.registry.WSO2Registry">
<parameter name="cachableDuration">15000</parameter>
</registry>
<proxy name="JMSSenderStaleConnectionsTestProxy"
transports="http"
startOnLoad="true"
trace="disable">
<description/>
<target>
<inSequence>
<property name="OUT_ONLY" value="true"/>
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
<send>
<endpoint>
<address uri="jms:/?transport.jms.ConnectionFactory=myQueueConnectionFactory2"/>
</endpoint>
</send>
</inSequence>
<outSequence>
<drop/>
</outSequence>
</target>
</proxy>
</definitions>
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,17 @@
<!--<transportSender name="local" class="org.apache.axis2.transport.local.NonBlockingLocalTransportSender"/>-->

<!-- uncomment this and configure to use connection pools for sending messages> -->
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender">
<parameter name="myQueueConnectionFactory2" locked="false">
<parameter name="java.naming.factory.initial" locked="false">org.apache.activemq.jndi.ActiveMQInitialContextFactory</parameter>
<parameter name="java.naming.provider.url" locked="false">tcp://localhost:61626</parameter>
<parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
<parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
<parameter name="transport.jms.Destination" locked="false">dynamicQueues/StaleConnectionTest</parameter>
<parameter name="transport.jms.DestinationType" locked="false">queue</parameter>
<parameter name="transport.jms.CacheLevel" locked="false">connection</parameter>
</parameter>
</transportSender>

<!--transportSender name="vfs" class="org.apache.synapse.transport.vfs.VFSTransportSender"/-->

Expand Down

0 comments on commit a33fe21

Please sign in to comment.