Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Persisted aggregation initialization #1840

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ public synchronized void initialiseExecutors() {
if (isPersistedAggregation) {
for (int i = incrementalDurations.size() - 1; i > 0; i--) {
if (lastData != null && !(IncrementalTimeConverterUtil.isAggregationDataCompleteAgainstTime(lastData,
incrementalDurations.get(i), timeZone) && isAggregationDataComplete(incrementalDurations.get(i),
incrementalDurations.get(i - 1), endOFLatestEventTimestamp, lastData))) {
incrementalDurations.get(i), timeZone) || isAggregationDataComplete(incrementalDurations.get(i),
incrementalDurations.get(i - 1), endOFLatestEventTimestamp))) {
recreateState(lastData, incrementalDurations.get(i),
aggregationTables.get(incrementalDurations.get(i - 1)), i == 1);
} else if (lastData == null && !isAggregationDataComplete(incrementalDurations.get(i),
incrementalDurations.get(i - 1), endOFLatestEventTimestamp, null)) {
incrementalDurations.get(i - 1), endOFLatestEventTimestamp)) {
recreateState(null, incrementalDurations.get(i),
aggregationTables.get(incrementalDurations.get(i - 1)), i == 1);
}
Expand All @@ -154,6 +154,8 @@ public synchronized void initialiseExecutors() {
events = onDemandQueryRuntime.execute();
if (events != null) {
lastData = (Long) events[events.length - 1].getData(0);
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
.getNextEmitTime(lastData, incrementalDurations.get(i - 1), timeZone);
} else {
lastData = null;
}
Expand Down Expand Up @@ -228,6 +230,10 @@ private boolean isStatePresentForAggregationDuration(TimePeriod.Duration recreat

private void recreateState(Long lastData, TimePeriod.Duration recreateForDuration,
Table recreateFromTable, boolean isBeforeRoot) {
if (log.isDebugEnabled()) {
log.debug("Start initialising state for aggregation: " + aggregationDefinition.getId() +
" Duration: " + recreateForDuration);
}
OnDemandQuery onDemandQuery;
if (lastData != null) {
endOFLatestEventTimestamp = IncrementalTimeConverterUtil
Expand Down Expand Up @@ -268,6 +274,10 @@ private void recreateState(Long lastData, TimePeriod.Duration recreateForDuratio

}
}
if (log.isDebugEnabled()) {
log.debug("Completed initialising state for aggregation: " + aggregationDefinition.getId() +
" Duration " + recreateForDuration);
}
}

private OnDemandQuery getOnDemandQuery(Table table, boolean isLargestGranularity, Long endOFLatestEventTimestamp,
Expand Down Expand Up @@ -319,22 +329,20 @@ private OnDemandQuery getOnDemandQuery(Table table, boolean isLargestGranularity
}

private boolean isAggregationDataComplete(TimePeriod.Duration parentDuration, TimePeriod.Duration childDuration,
Long endOFLatestEventTimestamp, Long lastData) {
Long endOFLatestEventTimestamp) {
OnDemandQuery onDemandQuery = getOnDemandQuery(aggregationTables.get(childDuration), true,
endOFLatestEventTimestamp, true, OrderByAttribute.Order.ASC);
onDemandQuery.setType(OnDemandQuery.OnDemandQueryType.FIND);
OnDemandQueryRuntime onDemandQueryRuntime = OnDemandQueryParser.parse(onDemandQuery, null, siddhiAppContext,
tableMap, windowMap, aggregationMap);
Event[] events = onDemandQueryRuntime.execute();
if (lastData == null && events == null) {
if (events == null) {
return true;
} else if (lastData == null
&& (Long) events[events.length - 1].getData(0) >= IncrementalTimeConverterUtil.getStartTimeOfAggregates(
} else if ((Long) events[events.length - 1].getData(0) >= IncrementalTimeConverterUtil.getStartTimeOfAggregates(
System.currentTimeMillis(), parentDuration, timeZone)) {
return true;

}

return events == null || events.length == 0;
return false;
}
}