diff --git a/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketSourceHandler.java b/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketSourceHandler.java index 91041bdd45e..559bcdc81d7 100644 --- a/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketSourceHandler.java +++ b/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketSourceHandler.java @@ -101,8 +101,8 @@ public class InboundWebsocketSourceHandler extends ChannelInboundHandlerAdapter private int port; private boolean dispatchToCustomSequence; private InboundWebsocketResponseSender responseSender; - private static ArrayList contentTypes = new ArrayList<>(); - private static ArrayList otherSubprotocols = new ArrayList<>(); + private ArrayList contentTypes = new ArrayList<>(); + private ArrayList otherSubprotocols = new ArrayList<>(); private int clientBroadcastLevel; private String outflowDispatchSequence; private String outflowErrorSequence; @@ -115,20 +115,14 @@ public class InboundWebsocketSourceHandler extends ChannelInboundHandlerAdapter private InboundApiHandler inboundApiHandler = new InboundApiHandler(); private static final AttributeKey> WSO2_PROPERTIES = AttributeKey.valueOf("WSO2_PROPERTIES"); - static { + public InboundWebsocketSourceHandler() throws Exception { contentTypes.add("application/xml"); contentTypes.add("application/json"); contentTypes.add("text/xml"); - } - - static { otherSubprotocols.add("graphql-ws"); otherSubprotocols.add("graphql-transport-ws"); } - public InboundWebsocketSourceHandler() throws Exception { - } - public void setSubprotocolHandlers(ArrayList subprotocolHandlers) { this.subprotocolHandlers = subprotocolHandlers; for (AbstractSubprotocolHandler handler : subprotocolHandlers) { @@ -178,13 +172,17 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (isHealthCheckCall) { return; } + if (handshaker == null) { + //Handshake has not occurred for this channel. Hence, we can ignore the rest + return; + } String endpointName = WebsocketEndpointManager.getInstance().getEndpointName(port, tenantDomain); if (endpointName == null) { int portWithOffset = port + portOffset; handleException("Endpoint not found for port : " + portWithOffset + " tenant domain : " + tenantDomain); } WebsocketSubscriberPathManager.getInstance() - .addChannelContext(endpointName, subscriberPath.getPath(), wrappedContext); + .removeChannelContext(endpointName, subscriberPath.getPath(), wrappedContext); MessageContext synCtx = getSynapseMessageContext(tenantDomain , ctx); InboundEndpoint endpoint = synCtx.getConfiguration().getInboundEndpoint(endpointName); synCtx.setProperty(InboundWebsocketConstants.CONNECTION_TERMINATE, new Boolean(true)); diff --git a/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketSubscriberPathManager.java b/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketSubscriberPathManager.java index 6e11a19a90b..e281d156586 100644 --- a/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketSubscriberPathManager.java +++ b/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketSubscriberPathManager.java @@ -75,8 +75,32 @@ public void removeChannelContext(String inboundName, InboundWebsocketChannelContext ctx) { ConcurrentHashMap> subscriberPathMap = inboundSubscriberPathMap.get(inboundName); + if (subscriberPathMap == null) { + if (log.isDebugEnabled()) { + log.debug("SubscriberPathMap does not exist for Inbound : " + inboundName + + ", in the Thread,ID: " + Thread.currentThread().getName() + "," + + Thread.currentThread().getId()); + } + return; + } List listContext = subscriberPathMap.get(subscriberPath); + if (listContext == null) { + if (log.isDebugEnabled()) { + log.debug("ListContext does not exist for SubscriberPath : " + subscriberPath + + ", in the Thread,ID: " + Thread.currentThread().getName() + "," + + Thread.currentThread().getId()); + } + return; + } for (Object context : listContext.toArray()) { + if (context == null) { + if (log.isDebugEnabled()) { + log.debug("ListContext has null contents : " + listContext.toArray().toString() + + ", in the Thread,ID: " + Thread.currentThread().getName() + "," + + Thread.currentThread().getId()); + } + continue; + } if (((InboundWebsocketChannelContext) context).getChannelIdentifier() .equals(ctx.getChannelIdentifier())) { if (log.isDebugEnabled()) {