Skip to content

Commit

Permalink
Merge pull request #1774 from grainier/latency-tracker-fix
Browse files Browse the repository at this point in the history
Fix latency/throughput tracker issues
  • Loading branch information
AnuGayan committed Mar 28, 2022
2 parents 6b804a6 + aa1c1dc commit ff12f2f
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -341,10 +341,13 @@ public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCond
SiddhiQueryContext siddhiQueryContext) {
try {
SnapshotService.getSkipStateStorageThreadLocal().set(true);
if (throughputTrackerFind != null &&
Level.BASIC.compareTo(siddhiQueryContext.getSiddhiAppContext().getRootMetricsLevel()) <= 0) {
throughputTrackerFind.eventIn();
}
if (latencyTrackerFind != null &&
Level.BASIC.compareTo(siddhiQueryContext.getSiddhiAppContext().getRootMetricsLevel()) <= 0) {
latencyTrackerFind.markIn();
throughputTrackerFind.eventIn();
}
if (!isDistributed && !isFirstEventArrived) {
// No need to initialise executors if it is distributed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public final void onEvent(Object eventObject, Object[] transportProperties, Stri
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTracker.eventIn();
}
if (throughputTracker != null &&
if (mapperLatencyTracker != null &&
Level.DETAIL.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
mapperLatencyTracker.markIn();
}
Expand All @@ -207,7 +207,7 @@ public final void onEvent(Object eventObject, Object[] transportProperties, Stri
receivedEventCounter.countEvents(eventObject);
}
} finally {
if (throughputTracker != null &&
if (mapperLatencyTracker != null &&
Level.DETAIL.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
mapperLatencyTracker.markOut();
}
Expand Down
112 changes: 62 additions & 50 deletions modules/siddhi-core/src/main/java/io/siddhi/core/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,19 +216,22 @@ private void handleStoreAddError(ComplexEventChunk addingEventChunk, boolean isF
}

public void addEvents(ComplexEventChunk<StreamEvent> addingEventChunk, int noOfEvents) {
if (latencyTrackerInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerInsert.markIn();
}
addingEventChunk.reset();
add(addingEventChunk);
if (latencyTrackerInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerInsert.markOut();
}
if (throughputTrackerInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTrackerInsert.eventsIn(noOfEvents);
try {
if (throughputTrackerInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTrackerInsert.eventsIn(noOfEvents);
}
if (latencyTrackerInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerInsert.markIn();
}
addingEventChunk.reset();
add(addingEventChunk);
} finally {
if (latencyTrackerInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerInsert.markOut();
}
}
}

Expand Down Expand Up @@ -335,18 +338,21 @@ protected void onDeleteError(ComplexEventChunk<StateEvent> deletingEventChunk, C

public void deleteEvents(ComplexEventChunk<StateEvent> deletingEventChunk, CompiledCondition compiledCondition,
int noOfEvents) {
if (latencyTrackerDelete != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerDelete.markIn();
}
delete(deletingEventChunk, compiledCondition);
if (latencyTrackerDelete != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerDelete.markOut();
}
if (throughputTrackerDelete != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTrackerDelete.eventsIn(noOfEvents);
try {
if (throughputTrackerDelete != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTrackerDelete.eventsIn(noOfEvents);
}
if (latencyTrackerDelete != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerDelete.markIn();
}
delete(deletingEventChunk, compiledCondition);
} finally {
if (latencyTrackerDelete != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerDelete.markOut();
}
}
}

Expand Down Expand Up @@ -418,18 +424,21 @@ protected void onUpdateError(ComplexEventChunk<StateEvent> updatingEventChunk, C
public void updateEvents(ComplexEventChunk<StateEvent> updatingEventChunk,
CompiledCondition compiledCondition,
CompiledUpdateSet compiledUpdateSet, int noOfEvents) {
if (latencyTrackerUpdate != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerUpdate.markIn();
}
update(updatingEventChunk, compiledCondition, compiledUpdateSet);
if (latencyTrackerUpdate != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerUpdate.markOut();
}
if (throughputTrackerUpdate != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTrackerUpdate.eventsIn(noOfEvents);
try {
if (throughputTrackerUpdate != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTrackerUpdate.eventsIn(noOfEvents);
}
if (latencyTrackerUpdate != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerUpdate.markIn();
}
update(updatingEventChunk, compiledCondition, compiledUpdateSet);
} finally {
if (latencyTrackerUpdate != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerUpdate.markOut();
}
}
}

Expand Down Expand Up @@ -506,19 +515,22 @@ public void updateOrAddEvents(ComplexEventChunk<StateEvent> updateOrAddingEventC
CompiledCondition compiledCondition,
CompiledUpdateSet compiledUpdateSet,
AddingStreamEventExtractor addingStreamEventExtractor, int noOfEvents) {
if (latencyTrackerUpdateOrInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerUpdateOrInsert.markIn();
}
updateOrAdd(updateOrAddingEventChunk, compiledCondition, compiledUpdateSet,
addingStreamEventExtractor);
if (latencyTrackerUpdateOrInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerUpdateOrInsert.markOut();
}
if (throughputTrackerUpdateOrInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTrackerUpdateOrInsert.eventsIn(noOfEvents);
try {
if (throughputTrackerUpdateOrInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTrackerUpdateOrInsert.eventsIn(noOfEvents);
}
if (latencyTrackerUpdateOrInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerUpdateOrInsert.markIn();
}
updateOrAdd(updateOrAddingEventChunk, compiledCondition, compiledUpdateSet,
addingStreamEventExtractor);
} finally {
if (latencyTrackerUpdateOrInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerUpdateOrInsert.markOut();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,15 @@ public void add(ComplexEventChunk complexEventChunk) {
if (throughputTrackerInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTrackerInsert.eventsIn(numberOfEvents);
}
if (latencyTrackerInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerInsert.markIn();
}
// Send to the window windowProcessor
windowProcessor.process(new ComplexEventChunk<>(firstEvent, currentEvent));
} finally {
if (throughputTrackerInsert != null &&
if (latencyTrackerInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerInsert.markOut();
}
Expand All @@ -261,11 +264,13 @@ public StreamEvent find(StateEvent matchingEvent, CompiledCondition compiledCond
try {
if (throughputTrackerFind != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
throughputTrackerFind.eventIn();
}
if (latencyTrackerFind != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerFind.markIn();
}
return ((FindableProcessor) this.internalWindowProcessor).find(matchingEvent, compiledCondition);
} finally {
if (throughputTrackerFind != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
if (latencyTrackerFind != null && Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerFind.markOut();
}
}
Expand Down Expand Up @@ -330,7 +335,7 @@ private class StreamPublishProcessor implements Processor {
}

public void process(ComplexEventChunk complexEventChunk) {
if (throughputTrackerInsert != null &&
if (latencyTrackerInsert != null &&
Level.BASIC.compareTo(siddhiAppContext.getRootMetricsLevel()) <= 0) {
latencyTrackerInsert.markOut();
}
Expand Down

0 comments on commit ff12f2f

Please sign in to comment.