diff --git a/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketSourceHandler.java b/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketSourceHandler.java index 91041bdd45e..a05f351d15e 100644 --- a/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketSourceHandler.java +++ b/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/InboundWebsocketSourceHandler.java @@ -178,13 +178,17 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { if (isHealthCheckCall) { return; } + if (handshaker == null) { + //Handshake has not occurred for this channel. Hence, we can ignore the rest + return; + } String endpointName = WebsocketEndpointManager.getInstance().getEndpointName(port, tenantDomain); if (endpointName == null) { int portWithOffset = port + portOffset; handleException("Endpoint not found for port : " + portWithOffset + " tenant domain : " + tenantDomain); } WebsocketSubscriberPathManager.getInstance() - .addChannelContext(endpointName, subscriberPath.getPath(), wrappedContext); + .removeChannelContext(endpointName, subscriberPath.getPath(), wrappedContext); MessageContext synCtx = getSynapseMessageContext(tenantDomain , ctx); InboundEndpoint endpoint = synCtx.getConfiguration().getInboundEndpoint(endpointName); synCtx.setProperty(InboundWebsocketConstants.CONNECTION_TERMINATE, new Boolean(true)); diff --git a/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketSubscriberPathManager.java b/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketSubscriberPathManager.java index 6e11a19a90b..e281d156586 100644 --- a/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketSubscriberPathManager.java +++ b/components/inbound-endpoints/org.wso2.carbon.inbound.endpoint/src/main/java/org/wso2/carbon/inbound/endpoint/protocol/websocket/management/WebsocketSubscriberPathManager.java @@ -75,8 +75,32 @@ public void removeChannelContext(String inboundName, InboundWebsocketChannelContext ctx) { ConcurrentHashMap> subscriberPathMap = inboundSubscriberPathMap.get(inboundName); + if (subscriberPathMap == null) { + if (log.isDebugEnabled()) { + log.debug("SubscriberPathMap does not exist for Inbound : " + inboundName + + ", in the Thread,ID: " + Thread.currentThread().getName() + "," + + Thread.currentThread().getId()); + } + return; + } List listContext = subscriberPathMap.get(subscriberPath); + if (listContext == null) { + if (log.isDebugEnabled()) { + log.debug("ListContext does not exist for SubscriberPath : " + subscriberPath + + ", in the Thread,ID: " + Thread.currentThread().getName() + "," + + Thread.currentThread().getId()); + } + return; + } for (Object context : listContext.toArray()) { + if (context == null) { + if (log.isDebugEnabled()) { + log.debug("ListContext has null contents : " + listContext.toArray().toString() + + ", in the Thread,ID: " + Thread.currentThread().getName() + "," + + Thread.currentThread().getId()); + } + continue; + } if (((InboundWebsocketChannelContext) context).getChannelIdentifier() .equals(ctx.getChannelIdentifier())) { if (log.isDebugEnabled()) {