Skip to content

Commit

Permalink
Merge pull request #1685 from Lakith-Rambukkanage/vMaster-websocket
Browse files Browse the repository at this point in the history
Fix web socket load testing issues
  • Loading branch information
Lakith-Rambukkanage committed Aug 17, 2023
2 parents 6041179 + 1db08d3 commit ae4d4ea
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public class InboundWebsocketSourceHandler extends ChannelInboundHandlerAdapter
private int port;
private boolean dispatchToCustomSequence;
private InboundWebsocketResponseSender responseSender;
private static ArrayList<String> contentTypes = new ArrayList<>();
private static ArrayList<String> otherSubprotocols = new ArrayList<>();
private ArrayList<String> contentTypes = new ArrayList<>();
private ArrayList<String> otherSubprotocols = new ArrayList<>();
private int clientBroadcastLevel;
private String outflowDispatchSequence;
private String outflowErrorSequence;
Expand All @@ -115,20 +115,14 @@ public class InboundWebsocketSourceHandler extends ChannelInboundHandlerAdapter
private InboundApiHandler inboundApiHandler = new InboundApiHandler();
private static final AttributeKey<Map<String, Object>> WSO2_PROPERTIES = AttributeKey.valueOf("WSO2_PROPERTIES");

static {
public InboundWebsocketSourceHandler() throws Exception {
contentTypes.add("application/xml");
contentTypes.add("application/json");
contentTypes.add("text/xml");
}

static {
otherSubprotocols.add("graphql-ws");
otherSubprotocols.add("graphql-transport-ws");
}

public InboundWebsocketSourceHandler() throws Exception {
}

public void setSubprotocolHandlers(ArrayList<AbstractSubprotocolHandler> subprotocolHandlers) {
this.subprotocolHandlers = subprotocolHandlers;
for (AbstractSubprotocolHandler handler : subprotocolHandlers) {
Expand Down Expand Up @@ -178,13 +172,17 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (isHealthCheckCall) {
return;
}
if (handshaker == null) {
//Handshake has not occurred for this channel. Hence, we can ignore the rest
return;
}
String endpointName = WebsocketEndpointManager.getInstance().getEndpointName(port, tenantDomain);
if (endpointName == null) {
int portWithOffset = port + portOffset;
handleException("Endpoint not found for port : " + portWithOffset + " tenant domain : " + tenantDomain);
}
WebsocketSubscriberPathManager.getInstance()
.addChannelContext(endpointName, subscriberPath.getPath(), wrappedContext);
.removeChannelContext(endpointName, subscriberPath.getPath(), wrappedContext);
MessageContext synCtx = getSynapseMessageContext(tenantDomain , ctx);
InboundEndpoint endpoint = synCtx.getConfiguration().getInboundEndpoint(endpointName);
synCtx.setProperty(InboundWebsocketConstants.CONNECTION_TERMINATE, new Boolean(true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,32 @@ public void removeChannelContext(String inboundName,
InboundWebsocketChannelContext ctx) {
ConcurrentHashMap<String, List<InboundWebsocketChannelContext>> subscriberPathMap =
inboundSubscriberPathMap.get(inboundName);
if (subscriberPathMap == null) {
if (log.isDebugEnabled()) {
log.debug("SubscriberPathMap does not exist for Inbound : " + inboundName
+ ", in the Thread,ID: " + Thread.currentThread().getName() + ","
+ Thread.currentThread().getId());
}
return;
}
List<InboundWebsocketChannelContext> listContext = subscriberPathMap.get(subscriberPath);
if (listContext == null) {
if (log.isDebugEnabled()) {
log.debug("ListContext does not exist for SubscriberPath : " + subscriberPath
+ ", in the Thread,ID: " + Thread.currentThread().getName() + ","
+ Thread.currentThread().getId());
}
return;
}
for (Object context : listContext.toArray()) {
if (context == null) {
if (log.isDebugEnabled()) {
log.debug("ListContext has null contents : " + listContext.toArray().toString()
+ ", in the Thread,ID: " + Thread.currentThread().getName() + ","
+ Thread.currentThread().getId());
}
continue;
}
if (((InboundWebsocketChannelContext) context).getChannelIdentifier()
.equals(ctx.getChannelIdentifier())) {
if (log.isDebugEnabled()) {
Expand Down

0 comments on commit ae4d4ea

Please sign in to comment.