diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputManager.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputManager.java index d113fa3946..07edf4d508 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputManager.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/InputManager.java @@ -52,11 +52,18 @@ public InputManager(SiddhiAppContext siddhiAppContext, public InputHandler getInputHandler(String streamId) { InputHandler inputHandler = inputHandlerMap.get(streamId); if (inputHandler == null) { - InputHandler newInputHandler = constructInputHandler(streamId); - if (this.isConnected) { - newInputHandler.connect(); + synchronized (this) { + inputHandler = inputHandlerMap.get(streamId); + if (inputHandler == null) { + InputHandler newInputHandler = constructInputHandler(streamId); + if (this.isConnected) { + newInputHandler.connect(); + } + return newInputHandler; + } else { + return inputHandler; + } } - return newInputHandler; } else { return inputHandler; } @@ -78,7 +85,7 @@ public synchronized void disconnect() { this.isConnected = false; } - public InputHandler constructInputHandler(String streamId) { + public synchronized InputHandler constructInputHandler(String streamId) { InputHandler inputHandler = new InputHandler(streamId, inputHandlerMap.size(), inputEntryValve, siddhiAppContext); StreamJunction streamJunction = streamJunctionMap.get(streamId);