From aa1c1dcac6cab51672a67360031ecaa7b930967c Mon Sep 17 00:00:00 2001 From: Grainier Perera Date: Mon, 28 Mar 2022 12:13:04 +0530 Subject: [PATCH] Fix latency/throughput tracker issues --- .../core/aggregation/AggregationRuntime.java | 5 +- .../stream/input/source/SourceMapper.java | 4 +- .../main/java/io/siddhi/core/table/Table.java | 112 ++++++++++-------- .../java/io/siddhi/core/window/Window.java | 11 +- 4 files changed, 76 insertions(+), 56 deletions(-) diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java index ccadf7dfec..7140e6b56e 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/aggregation/AggregationRuntime.java @@ -341,10 +341,13 @@ public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCond SiddhiQueryContext siddhiQueryContext) { try { SnapshotService.getSkipStateStorageThreadLocal().set(true); + if (throughputTrackerFind != null && + Level.BASIC.compareTo(siddhiQueryContext.getSiddhiAppContext().getRootMetricsLevel()) <= 0) { + throughputTrackerFind.eventIn(); + } if (latencyTrackerFind != null && Level.BASIC.compareTo(siddhiQueryContext.getSiddhiAppContext().getRootMetricsLevel()) <= 0) { latencyTrackerFind.markIn(); - throughputTrackerFind.eventIn(); } if (!isDistributed && !isFirstEventArrived) { // No need to initialise executors if it is distributed diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/SourceMapper.java b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/SourceMapper.java index e7b497d723..f2b1e1cc56 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/SourceMapper.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/stream/input/source/SourceMapper.java @@ -194,7 +194,7 @@ public final void onEvent(Object eventObject, Object[] transportProperties, Stri Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { throughputTracker.eventIn(); } - if (throughputTracker != null && + if (mapperLatencyTracker != null && Level.DETAIL.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { mapperLatencyTracker.markIn(); } @@ -207,7 +207,7 @@ public final void onEvent(Object eventObject, Object[] transportProperties, Stri receivedEventCounter.countEvents(eventObject); } } finally { - if (throughputTracker != null && + if (mapperLatencyTracker != null && Level.DETAIL.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { mapperLatencyTracker.markOut(); } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/table/Table.java b/modules/siddhi-core/src/main/java/io/siddhi/core/table/Table.java index 44e4e02594..04f154e2dc 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/table/Table.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/table/Table.java @@ -216,19 +216,22 @@ private void handleStoreAddError(ComplexEventChunk addingEventChunk, boolean isF } public void addEvents(ComplexEventChunk addingEventChunk, int noOfEvents) { - if (latencyTrackerInsert != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - latencyTrackerInsert.markIn(); - } - addingEventChunk.reset(); - add(addingEventChunk); - if (latencyTrackerInsert != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - latencyTrackerInsert.markOut(); - } - if (throughputTrackerInsert != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - throughputTrackerInsert.eventsIn(noOfEvents); + try { + if (throughputTrackerInsert != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + throughputTrackerInsert.eventsIn(noOfEvents); + } + if (latencyTrackerInsert != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + latencyTrackerInsert.markIn(); + } + addingEventChunk.reset(); + add(addingEventChunk); + } finally { + if (latencyTrackerInsert != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + latencyTrackerInsert.markOut(); + } } } @@ -335,18 +338,21 @@ protected void onDeleteError(ComplexEventChunk deletingEventChunk, C public void deleteEvents(ComplexEventChunk deletingEventChunk, CompiledCondition compiledCondition, int noOfEvents) { - if (latencyTrackerDelete != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - latencyTrackerDelete.markIn(); - } - delete(deletingEventChunk, compiledCondition); - if (latencyTrackerDelete != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - latencyTrackerDelete.markOut(); - } - if (throughputTrackerDelete != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - throughputTrackerDelete.eventsIn(noOfEvents); + try { + if (throughputTrackerDelete != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + throughputTrackerDelete.eventsIn(noOfEvents); + } + if (latencyTrackerDelete != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + latencyTrackerDelete.markIn(); + } + delete(deletingEventChunk, compiledCondition); + } finally { + if (latencyTrackerDelete != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + latencyTrackerDelete.markOut(); + } } } @@ -418,18 +424,21 @@ protected void onUpdateError(ComplexEventChunk updatingEventChunk, C public void updateEvents(ComplexEventChunk updatingEventChunk, CompiledCondition compiledCondition, CompiledUpdateSet compiledUpdateSet, int noOfEvents) { - if (latencyTrackerUpdate != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - latencyTrackerUpdate.markIn(); - } - update(updatingEventChunk, compiledCondition, compiledUpdateSet); - if (latencyTrackerUpdate != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - latencyTrackerUpdate.markOut(); - } - if (throughputTrackerUpdate != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - throughputTrackerUpdate.eventsIn(noOfEvents); + try { + if (throughputTrackerUpdate != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + throughputTrackerUpdate.eventsIn(noOfEvents); + } + if (latencyTrackerUpdate != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + latencyTrackerUpdate.markIn(); + } + update(updatingEventChunk, compiledCondition, compiledUpdateSet); + } finally { + if (latencyTrackerUpdate != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + latencyTrackerUpdate.markOut(); + } } } @@ -506,19 +515,22 @@ public void updateOrAddEvents(ComplexEventChunk updateOrAddingEventC CompiledCondition compiledCondition, CompiledUpdateSet compiledUpdateSet, AddingStreamEventExtractor addingStreamEventExtractor, int noOfEvents) { - if (latencyTrackerUpdateOrInsert != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - latencyTrackerUpdateOrInsert.markIn(); - } - updateOrAdd(updateOrAddingEventChunk, compiledCondition, compiledUpdateSet, - addingStreamEventExtractor); - if (latencyTrackerUpdateOrInsert != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - latencyTrackerUpdateOrInsert.markOut(); - } - if (throughputTrackerUpdateOrInsert != null && - Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { - throughputTrackerUpdateOrInsert.eventsIn(noOfEvents); + try { + if (throughputTrackerUpdateOrInsert != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + throughputTrackerUpdateOrInsert.eventsIn(noOfEvents); + } + if (latencyTrackerUpdateOrInsert != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + latencyTrackerUpdateOrInsert.markIn(); + } + updateOrAdd(updateOrAddingEventChunk, compiledCondition, compiledUpdateSet, + addingStreamEventExtractor); + } finally { + if (latencyTrackerUpdateOrInsert != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + latencyTrackerUpdateOrInsert.markOut(); + } } } diff --git a/modules/siddhi-core/src/main/java/io/siddhi/core/window/Window.java b/modules/siddhi-core/src/main/java/io/siddhi/core/window/Window.java index 8d1d5908d2..3568abbaad 100644 --- a/modules/siddhi-core/src/main/java/io/siddhi/core/window/Window.java +++ b/modules/siddhi-core/src/main/java/io/siddhi/core/window/Window.java @@ -238,12 +238,15 @@ public void add(ComplexEventChunk complexEventChunk) { if (throughputTrackerInsert != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { throughputTrackerInsert.eventsIn(numberOfEvents); + } + if (latencyTrackerInsert != null && + Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { latencyTrackerInsert.markIn(); } // Send to the window windowProcessor windowProcessor.process(new ComplexEventChunk<>(firstEvent, currentEvent)); } finally { - if (throughputTrackerInsert != null && + if (latencyTrackerInsert != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { latencyTrackerInsert.markOut(); } @@ -261,11 +264,13 @@ public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCond try { if (throughputTrackerFind != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { throughputTrackerFind.eventIn(); + } + if (latencyTrackerFind != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { latencyTrackerFind.markIn(); } return ((FindableProcessor) this.internalWindowProcessor).find(matchingEvent, compiledCondition); } finally { - if (throughputTrackerFind != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { + if (latencyTrackerFind != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { latencyTrackerFind.markOut(); } } @@ -330,7 +335,7 @@ private class StreamPublishProcessor implements Processor { } public void process(ComplexEventChunk complexEventChunk) { - if (throughputTrackerInsert != null && + if (latencyTrackerInsert != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) { latencyTrackerInsert.markOut(); }