Skip to content

Commit

Permalink
QueryPerformanceRecorder: Changes for CorePlus Integration (#4862)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind authored Nov 21, 2023
1 parent 18c6e75 commit f679f0c
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ public class PerformanceEntry extends BasePerformanceEntry implements TableListe
private final AuthContext authContext;
private final String updateGraphName;

private long intervalInvocationCount;
private long invocationCount;

private long intervalAdded;
private long intervalRemoved;
private long intervalModified;
private long intervalShifted;
private long rowsAdded;
private long rowsRemoved;
private long rowsModified;
private long rowsShifted;

private long maxTotalMemory;
private long minFreeMemory;
Expand Down Expand Up @@ -67,19 +67,19 @@ public final void onUpdateStart() {

public final void onUpdateStart(final RowSet added, final RowSet removed, final RowSet modified,
final RowSetShiftData shifted) {
intervalAdded += added.size();
intervalRemoved += removed.size();
intervalModified += modified.size();
intervalShifted += shifted.getEffectiveSize();
rowsAdded += added.size();
rowsRemoved += removed.size();
rowsModified += modified.size();
rowsShifted += shifted.getEffectiveSize();

onUpdateStart();
}

public final void onUpdateStart(long added, long removed, long modified, long shifted) {
intervalAdded += added;
intervalRemoved += removed;
intervalModified += modified;
intervalShifted += shifted;
rowsAdded += added;
rowsRemoved += removed;
rowsModified += modified;
rowsShifted += shifted;

onUpdateStart();
}
Expand All @@ -91,17 +91,17 @@ public final void onUpdateEnd() {
minFreeMemory = Math.min(minFreeMemory, Math.min(startSample.freeMemory, endSample.freeMemory));
collections += endSample.totalCollections - startSample.totalCollections;
collectionTimeMs += endSample.totalCollectionTimeMs - startSample.totalCollectionTimeMs;
++intervalInvocationCount;
++invocationCount;
}

void reset() {
baseEntryReset();
intervalInvocationCount = 0;
invocationCount = 0;

intervalAdded = 0;
intervalRemoved = 0;
intervalModified = 0;
intervalShifted = 0;
rowsAdded = 0;
rowsRemoved = 0;
rowsModified = 0;
rowsShifted = 0;

maxTotalMemory = 0;
minFreeMemory = Long.MAX_VALUE;
Expand All @@ -123,16 +123,16 @@ public LogOutput append(@NotNull final LogOutput logOutput) {
.append(", description='").append(description).append('\'')
.append(", callerLine='").append(callerLine).append('\'')
.append(", authContext=").append(authContext)
.append(", intervalUsageNanos=").append(getUsageNanos())
.append(", intervalCpuNanos=").append(getCpuNanos())
.append(", intervalUserCpuNanos=").append(getUserCpuNanos())
.append(", intervalInvocationCount=").append(intervalInvocationCount)
.append(", intervalAdded=").append(intervalAdded)
.append(", intervalRemoved=").append(intervalRemoved)
.append(", intervalModified=").append(intervalModified)
.append(", intervalShifted=").append(intervalShifted)
.append(", intervalAllocatedBytes=").append(getAllocatedBytes())
.append(", intervalPoolAllocatedBytes=").append(getPoolAllocatedBytes())
.append(", usageNanos=").append(getUsageNanos())
.append(", cpuNanos=").append(getCpuNanos())
.append(", userCpuNanos=").append(getUserCpuNanos())
.append(", invocationCount=").append(invocationCount)
.append(", rowsAdded=").append(rowsAdded)
.append(", rowsRemoved=").append(rowsRemoved)
.append(", rowsModified=").append(rowsModified)
.append(", rowsShifted=").append(rowsShifted)
.append(", allocatedBytes=").append(getAllocatedBytes())
.append(", poolAllocatedBytes=").append(getPoolAllocatedBytes())
.append(", maxTotalMemory=").append(maxTotalMemory)
.append(", minFreeMemory=").append(minFreeMemory)
.append(", collections=").append(collections)
Expand Down Expand Up @@ -175,20 +175,20 @@ public String getUpdateGraphName() {
return updateGraphName;
}

public long getIntervalAdded() {
return intervalAdded;
public long getRowsAdded() {
return rowsAdded;
}

public long getIntervalRemoved() {
return intervalRemoved;
public long getRowsRemoved() {
return rowsRemoved;
}

public long getIntervalModified() {
return intervalModified;
public long getRowsModified() {
return rowsModified;
}

public long getIntervalShifted() {
return intervalShifted;
public long getRowsShifted() {
return rowsShifted;
}

public long getMinFreeMemory() {
Expand All @@ -207,8 +207,8 @@ public long getCollectionTimeNanos() {
return DateTimeUtils.millisToNanos(collectionTimeMs);
}

public long getIntervalInvocationCount() {
return intervalInvocationCount;
public long getInvocationCount() {
return invocationCount;
}

/**
Expand All @@ -217,8 +217,7 @@ public long getIntervalInvocationCount() {
* @return if this nugget is significant enough to be logged, otherwise it is aggregated into the small update entry
*/
boolean shouldLogEntryInterval() {
return intervalInvocationCount > 0 &&
UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getUsageNanos());
return invocationCount > 0 && UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getUsageNanos());
}

public void accumulate(PerformanceEntry entry) {
Expand All @@ -231,12 +230,12 @@ public void accumulate(PerformanceEntry entry) {

collections += entry.getCollections();
collectionTimeMs += entry.collectionTimeMs;
intervalInvocationCount += entry.getIntervalInvocationCount();
invocationCount += entry.getInvocationCount();

intervalAdded += entry.getIntervalAdded();
intervalRemoved += entry.getIntervalRemoved();
intervalModified += entry.getIntervalModified();
intervalShifted += entry.getIntervalShifted();
rowsAdded += entry.getRowsAdded();
rowsRemoved += entry.getRowsRemoved();
rowsModified += entry.getRowsModified();
rowsShifted += entry.getRowsShifted();

super.accumulate(entry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package io.deephaven.engine.table.impl.perf;

import io.deephaven.auth.AuthContext;
import io.deephaven.base.clock.SystemClock;
import io.deephaven.base.clock.Clock;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.context.ExecutionContext;
Expand Down Expand Up @@ -50,8 +50,7 @@ public interface Factory {
* @param evaluationNumber A unique identifier for the query evaluation that triggered this nugget creation
* @param description The operation description
* @param sessionId The gRPC client session-id if applicable
* @param onCloseCallback A callback that is invoked when the nugget is closed. It returns whether the nugget
* should be logged.
* @param onCloseCallback A callback that is invoked when the nugget is closed.
* @return A new nugget
*/
default QueryPerformanceNugget createForQuery(
Expand All @@ -60,7 +59,7 @@ default QueryPerformanceNugget createForQuery(
@Nullable final String sessionId,
@NotNull final Consumer<QueryPerformanceNugget> onCloseCallback) {
return new QueryPerformanceNugget(evaluationNumber, NULL_LONG, NULL_INT, NULL_INT, NULL_INT, description,
sessionId, false, NULL_LONG, onCloseCallback);
sessionId, false, false, NULL_LONG, onCloseCallback);
}

/**
Expand All @@ -69,8 +68,7 @@ default QueryPerformanceNugget createForQuery(
* @param parentQuery The parent query nugget
* @param evaluationNumber A unique identifier for the sub-query evaluation that triggered this nugget creation
* @param description The operation description
* @param onCloseCallback A callback that is invoked when the nugget is closed. It returns whether the nugget
* should be logged.
* @param onCloseCallback A callback that is invoked when the nugget is closed.
* @return A new nugget
*/
default QueryPerformanceNugget createForSubQuery(
Expand All @@ -80,7 +78,7 @@ default QueryPerformanceNugget createForSubQuery(
@NotNull final Consumer<QueryPerformanceNugget> onCloseCallback) {
Assert.eqTrue(parentQuery.isQueryLevel(), "parentQuery.isQueryLevel()");
return new QueryPerformanceNugget(evaluationNumber, parentQuery.getEvaluationNumber(), NULL_INT, NULL_INT,
NULL_INT, description, parentQuery.getSessionId(), false, NULL_LONG, onCloseCallback);
NULL_INT, description, parentQuery.getSessionId(), false, false, NULL_LONG, onCloseCallback);
}

/**
Expand All @@ -89,8 +87,7 @@ default QueryPerformanceNugget createForSubQuery(
* @param parentQueryOrOperation The parent query / operation nugget
* @param operationNumber A query-unique identifier for the operation
* @param description The operation description
* @param onCloseCallback A callback that is invoked when the nugget is closed. It returns whether the nugget
* should be logged.
* @param onCloseCallback A callback that is invoked when the nugget is closed.
* @return A new nugget
*/
default QueryPerformanceNugget createForOperation(
Expand All @@ -115,17 +112,52 @@ default QueryPerformanceNugget createForOperation(
description,
parentQueryOrOperation.getSessionId(),
true, // operations are always user
false, // operations are not compilation
inputSize,
onCloseCallback);
}

/**
* Factory method for compilation-level nuggets.
*
* @param parentQueryOrOperation The parent query / operation nugget
* @param operationNumber A query-unique identifier for the operation
* @param description The compilation description
* @param onCloseCallback A callback that is invoked when the nugget is closed.
* @return A new nugget
*/
default QueryPerformanceNugget createForCompilation(
@NotNull final QueryPerformanceNugget parentQueryOrOperation,
final int operationNumber,
final String description,
@NotNull final Consumer<QueryPerformanceNugget> onCloseCallback) {
int depth = parentQueryOrOperation.getDepth();
if (depth == NULL_INT) {
depth = 0;
} else {
++depth;
}

return new QueryPerformanceNugget(
parentQueryOrOperation.getEvaluationNumber(),
parentQueryOrOperation.getParentEvaluationNumber(),
operationNumber,
parentQueryOrOperation.getOperationNumber(),
depth,
description,
parentQueryOrOperation.getSessionId(),
true, // compilations are always user
true, // compilations are .. compilations
0, // compilations have no input size
onCloseCallback);
}

/**
* Factory method for catch-all nuggets.
*
* @param parentQuery The parent query nugget
* @param operationNumber A query-unique identifier for the operation
* @param onCloseCallback A callback that is invoked when the nugget is closed. It returns whether the nugget
* should be logged.
* @param onCloseCallback A callback that is invoked when the nugget is closed.
* @return A new nugget
*/
default QueryPerformanceNugget createForCatchAll(
Expand All @@ -142,6 +174,7 @@ default QueryPerformanceNugget createForCatchAll(
QueryPerformanceRecorder.UNINSTRUMENTED_CODE_DESCRIPTION,
parentQuery.getSessionId(),
false, // catch all is not user
false, // catch all is not compilation
NULL_LONG,
onCloseCallback); // catch all has no input size
}
Expand All @@ -157,6 +190,7 @@ default QueryPerformanceNugget createForCatchAll(
private final String description;
private final String sessionId;
private final boolean isUser;
private final boolean isCompilation;
private final long inputSize;
private final Consumer<QueryPerformanceNugget> onCloseCallback;
private final AuthContext authContext;
Expand Down Expand Up @@ -185,6 +219,7 @@ default QueryPerformanceNugget createForCatchAll(
* @param depth Depth in the evaluation chain for the respective operation
* @param description The operation description
* @param isUser Whether this is a "user" nugget or one created by the system
* @param isCompilation Whether this is a compilation nugget
* @param inputSize The size of the input data
* @param onCloseCallback A callback that is invoked when the nugget is closed. It returns whether the nugget should
* be logged.
Expand All @@ -198,6 +233,7 @@ protected QueryPerformanceNugget(
@NotNull final String description,
@Nullable final String sessionId,
final boolean isUser,
final boolean isCompilation,
final long inputSize,
@NotNull final Consumer<QueryPerformanceNugget> onCloseCallback) {
this.evaluationNumber = evaluationNumber;
Expand All @@ -213,6 +249,7 @@ protected QueryPerformanceNugget(
}
this.sessionId = sessionId;
this.isUser = isUser;
this.isCompilation = isCompilation;
this.inputSize = inputSize;
this.onCloseCallback = onCloseCallback;

Expand All @@ -237,6 +274,7 @@ private QueryPerformanceNugget() {
description = null;
sessionId = null;
isUser = false;
isCompilation = false;
inputSize = NULL_LONG;
onCloseCallback = null;
authContext = null;
Expand All @@ -262,7 +300,7 @@ public synchronized void onBaseEntryStart() {
throw new IllegalStateException("Nugget was already started");
}
if (startClockEpochNanos == NULL_LONG) {
startClockEpochNanos = SystemClock.systemUTC().currentTimeNanos();
startClockEpochNanos = Clock.system().currentTimeNanos();
}
startMemorySample = new RuntimeMemory.Sample();
endMemorySample = new RuntimeMemory.Sample();
Expand Down Expand Up @@ -316,7 +354,7 @@ private void close(final QueryState closingState) {
}

onBaseEntryEnd();
endClockEpochNanos = SystemClock.systemUTC().currentTimeNanos();
endClockEpochNanos = Clock.system().currentTimeNanos();

final RuntimeMemory runtimeMemory = RuntimeMemory.getInstance();
runtimeMemory.read(endMemorySample);
Expand Down Expand Up @@ -373,6 +411,10 @@ public boolean isUser() {
return isUser;
}

public boolean isCompilation() {
return isCompilation;
}

public boolean isQueryLevel() {
return operationNumber == NULL_INT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,16 @@ default QueryPerformanceNugget getNugget(@NotNull String name) {
*/
QueryPerformanceNugget getNugget(@NotNull String name, long inputSize);

/**
* Create a nugget at the top of the user stack for a compilation task. May return a
* {@link QueryPerformanceNugget#DUMMY_NUGGET} if no recorder is installed.
*
* @param name the nugget name
* @return A new QueryPerformanceNugget to encapsulate the compilation. {@link QueryPerformanceNugget#close()} must
* be called on the nugget.
*/
QueryPerformanceNugget getCompilationNugget(@NotNull String name);

/**
* This is the nugget enclosing the current operation. It may belong to the dummy recorder, or a real one.
*
Expand Down Expand Up @@ -158,6 +168,16 @@ static QueryPerformanceRecorder newSubQuery(
return new QueryPerformanceRecorderImpl(description, null, parent, nuggetFactory);
}

/**
* Record a single-threaded operation's allocations as "pool" allocated memory attributable to the current thread.
*
* @param operation The operation to record allocation for
* @return The result of the operation.
*/
static <RESULT_TYPE> RESULT_TYPE recordPoolAllocation(@NotNull final Supplier<RESULT_TYPE> operation) {
return QueryPerformanceRecorderState.recordPoolAllocation(operation);
}

/**
* Return the query's current state
*
Expand Down
Loading

0 comments on commit f679f0c

Please sign in to comment.