Skip to content

Commit

Permalink
Implement Open Addressing to support AsOfJoin (deephaven#2396)
Browse files Browse the repository at this point in the history
* implemented static and incremental OA hashing for asOf joins
  • Loading branch information
lbooker42 committed Jun 1, 2022
1 parent f326929 commit fe9c382
Show file tree
Hide file tree
Showing 36 changed files with 6,088 additions and 205 deletions.
8 changes: 5 additions & 3 deletions engine/table/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,11 @@ spotless {
'**/incopenagg/gen/*.java',
'**/staticagg/gen/*.java',
'**/staticopenagg/gen/*.java',
'**/naturaljoin/staticopen/gen/*.java',
'**/naturaljoin/incopen/gen/*.java',
'**/naturaljoin/rightincopen/gen/*.java',
'**/naturaljoin/typed/staticopen/gen/*.java',
'**/naturaljoin/typed/incopen/gen/*.java',
'**/naturaljoin/typed/rightincopen/gen/*.java',
'**/asofjoin/typed/rightincopen/gen/*.java',
'**/asofjoin/typed/staticopen/gen/*.java',
'src/main/java/io/deephaven/engine/table/impl/SymbolTableCombiner.java',
'src/main/java/io/deephaven/libs/GroovyStaticImports.java',
'src/test/java/**/*Sample.java'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,36 @@

import io.deephaven.base.Pair;
import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.*;
import io.deephaven.chunk.attributes.Any;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.*;
import io.deephaven.engine.rowset.*;
import io.deephaven.chunk.sized.SizedBooleanChunk;
import io.deephaven.chunk.sized.SizedChunk;
import io.deephaven.chunk.sized.SizedLongChunk;
import io.deephaven.chunk.util.hashing.ChunkEquals;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.asofjoin.RightIncrementalAsOfJoinStateManagerTypedBase;
import io.deephaven.engine.table.impl.asofjoin.RightIncrementalHashedAsOfJoinStateManager;
import io.deephaven.engine.table.impl.asofjoin.StaticAsOfJoinStateManagerTypedBase;
import io.deephaven.engine.table.impl.asofjoin.StaticHashedAsOfJoinStateManager;
import io.deephaven.engine.table.impl.by.typed.TypedHasherFactory;
import io.deephaven.engine.table.impl.join.BucketedChunkedAjMergedListener;
import io.deephaven.engine.table.impl.join.JoinListenerRecorder;
import io.deephaven.engine.table.impl.join.ZeroKeyChunkedAjMergedListener;
import io.deephaven.engine.table.impl.sort.LongSortKernel;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.chunk.*;
import io.deephaven.chunk.sized.SizedBooleanChunk;
import io.deephaven.chunk.sized.SizedChunk;
import io.deephaven.chunk.sized.SizedLongChunk;
import io.deephaven.engine.table.impl.ssa.ChunkSsaStamp;
import io.deephaven.engine.table.impl.ssa.SegmentedSortedArray;
import io.deephaven.engine.table.impl.ssa.SsaSsaStamp;
import io.deephaven.engine.table.impl.util.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
import io.deephaven.engine.table.impl.util.SingleValueRowRedirection;
import io.deephaven.engine.table.impl.util.SizedSafeCloseable;
import io.deephaven.engine.table.impl.util.WritableRowRedirection;
import io.deephaven.engine.table.impl.util.compact.CompactKernel;
import io.deephaven.engine.table.impl.util.compact.LongCompactKernel;
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.SafeCloseableList;
import org.apache.commons.lang3.mutable.MutableInt;
Expand All @@ -34,6 +42,10 @@
import java.util.function.Supplier;

public class AsOfJoinHelper {
static boolean USE_TYPED_STATE_MANAGER =
Configuration.getInstance().getBooleanWithDefault(
"AsOfJoinHelper.useTypedStateManager",
true);

private AsOfJoinHelper() {} // static use only

Expand Down Expand Up @@ -93,10 +105,12 @@ static Table asOfJoin(JoinControl control, QueryTable leftTable, QueryTable righ
if (leftTable.isRefreshing()) {
return bothIncrementalAj(control, leftTable, rightTable, columnsToMatch, columnsToAdd, order,
disallowExactMatch, stampPair,
leftSources, rightSources, leftStampSource, rightStampSource, rowRedirection);
originalLeftSources, leftSources, rightSources, leftStampSource, originalRightStampSource,
rightStampSource, rowRedirection);
}
return rightTickingLeftStaticAj(control, leftTable, rightTable, columnsToMatch, columnsToAdd, order,
disallowExactMatch, stampPair, leftSources, rightSources, leftStampSource, rightStampSource,
disallowExactMatch, stampPair, originalLeftSources, leftSources, rightSources, leftStampSource,
originalRightStampSource, rightStampSource,
rowRedirection);
} else {
return rightStaticAj(control, leftTable, rightTable, columnsToMatch, columnsToAdd, order,
Expand All @@ -108,7 +122,7 @@ static Table asOfJoin(JoinControl control, QueryTable leftTable, QueryTable righ
@NotNull
private static Table rightStaticAj(JoinControl control,
QueryTable leftTable,
Table rightTable,
QueryTable rightTable,
MatchPair[] columnsToMatch,
MatchPair[] columnsToAdd,
SortingOrder order,
Expand All @@ -121,6 +135,7 @@ private static Table rightStaticAj(JoinControl control,
ColumnSource<?> originalRightStampSource,
ColumnSource<?> rightStampSource,
WritableRowRedirection rowRedirection) {

final LongArraySource slots = new LongArraySource();
final int slotCount;

Expand Down Expand Up @@ -157,8 +172,23 @@ private static Table rightStaticAj(JoinControl control,
leftGrouping = rightGrouping = null;
}

final StaticChunkedAsOfJoinStateManager asOfJoinStateManager =
new StaticChunkedAsOfJoinStateManager(leftSources, size, originalLeftSources);
final StaticHashedAsOfJoinStateManager asOfJoinStateManager;
if (buildLeft) {
asOfJoinStateManager = USE_TYPED_STATE_MANAGER
? TypedHasherFactory.make(StaticAsOfJoinStateManagerTypedBase.class,
leftSources, originalLeftSources, size,
control.getMaximumLoadFactor(), control.getTargetLoadFactor())
: new StaticChunkedAsOfJoinStateManager(leftSources,
size, originalLeftSources);
} else {
asOfJoinStateManager = USE_TYPED_STATE_MANAGER
? TypedHasherFactory.make(StaticAsOfJoinStateManagerTypedBase.class,
leftSources, originalLeftSources, size,
control.getMaximumLoadFactor(), control.getTargetLoadFactor())
: new StaticChunkedAsOfJoinStateManager(leftSources,
size, originalLeftSources);
}


final Pair<ArrayBackedColumnSource<?>, ObjectArraySource<RowSet>> leftGroupedSources;
final int leftGroupingSize;
Expand Down Expand Up @@ -228,6 +258,8 @@ private static Table rightStaticAj(JoinControl control,
arrayValuesCache = null;
if (rightGroupedSources != null) {
asOfJoinStateManager.convertRightGrouping(slots, slotCount, rightGroupedSources.getSecond());
} else {
asOfJoinStateManager.convertRightBuildersToIndex(slots, slotCount);
}
}

Expand Down Expand Up @@ -453,7 +485,7 @@ private static void processLeftSlotWithRightCache(AsOfStampContext stampContext,
* If the asOfJoinStateManager is null, it means we are passing in the leftRowSet. If the leftRowSet is null; we are
* passing in the asOfJoinStateManager and should obtain the leftRowSet from the state manager if necessary.
*/
private static void getCachedLeftStampsAndKeys(RightIncrementalChunkedAsOfJoinStateManager asOfJoinStateManager,
private static void getCachedLeftStampsAndKeys(RightIncrementalHashedAsOfJoinStateManager asOfJoinStateManager,
RowSet leftRowSet,
ColumnSource<?> leftStampSource,
SizedSafeCloseable<ChunkSource.FillContext> fillContext,
Expand Down Expand Up @@ -517,9 +549,11 @@ private static Table rightTickingLeftStaticAj(JoinControl control,
SortingOrder order,
boolean disallowExactMatch,
MatchPair stampPair,
ColumnSource<?>[] originalLeftSources,
ColumnSource<?>[] leftSources,
ColumnSource<?>[] rightSources,
ColumnSource<?> leftStampSource,
ColumnSource<?> originalRightStampSource,
ColumnSource<?> rightStampSource,
WritableRowRedirection rowRedirection) {
if (leftTable.isRefreshing()) {
Expand All @@ -534,8 +568,12 @@ private static Table rightTickingLeftStaticAj(JoinControl control,
final ChunkSsaStamp chunkSsaStamp = ChunkSsaStamp.make(stampChunkType, reverse);

final int tableSize = control.initialBuildSize();
final RightIncrementalChunkedAsOfJoinStateManager asOfJoinStateManager =
new RightIncrementalChunkedAsOfJoinStateManager(leftSources, tableSize);

final RightIncrementalHashedAsOfJoinStateManager asOfJoinStateManager = USE_TYPED_STATE_MANAGER
? TypedHasherFactory.make(RightIncrementalAsOfJoinStateManagerTypedBase.class,
leftSources, originalLeftSources, tableSize,
control.getMaximumLoadFactor(), control.getTargetLoadFactor())
: new RightIncrementalChunkedAsOfJoinStateManager(leftSources, tableSize, originalLeftSources);

final LongArraySource slots = new LongArraySource();
final int slotCount = asOfJoinStateManager.buildFromLeftSide(leftTable.getRowSet(), leftSources, slots);
Expand Down Expand Up @@ -900,9 +938,11 @@ private static Table bothIncrementalAj(JoinControl control,
SortingOrder order,
boolean disallowExactMatch,
MatchPair stampPair,
ColumnSource<?>[] originalLeftSources,
ColumnSource<?>[] leftSources,
ColumnSource<?>[] rightSources,
ColumnSource<?> leftStampSource,
ColumnSource<?> originalRightStampSource,
ColumnSource<?> rightStampSource,
WritableRowRedirection rowRedirection) {
final boolean reverse = order == SortingOrder.Descending;
Expand All @@ -912,8 +952,13 @@ private static Table bothIncrementalAj(JoinControl control,
SegmentedSortedArray.makeFactory(stampChunkType, reverse, control.rightSsaNodeSize());
final SsaSsaStamp ssaSsaStamp = SsaSsaStamp.make(stampChunkType, reverse);

final RightIncrementalChunkedAsOfJoinStateManager asOfJoinStateManager =
new RightIncrementalChunkedAsOfJoinStateManager(leftSources, control.initialBuildSize());
final int tableSize = control.initialBuildSize();

final RightIncrementalHashedAsOfJoinStateManager asOfJoinStateManager = USE_TYPED_STATE_MANAGER
? TypedHasherFactory.make(RightIncrementalAsOfJoinStateManagerTypedBase.class,
leftSources, originalLeftSources, tableSize,
control.getMaximumLoadFactor(), control.getTargetLoadFactor())
: new RightIncrementalChunkedAsOfJoinStateManager(leftSources, tableSize, originalLeftSources);

final LongArraySource slots = new LongArraySource();
int slotCount = asOfJoinStateManager.buildFromLeftSide(leftTable.getRowSet(), leftSources, slots);
Expand Down Expand Up @@ -989,11 +1034,11 @@ public SegmentedSortedArray apply(RowSet leftIndex) {
// them into an ssa
final byte state = asOfJoinStateManager.getState(slot);
if ((state
& RightIncrementalChunkedAsOfJoinStateManager.ENTRY_RIGHT_MASK) == RightIncrementalChunkedAsOfJoinStateManager.ENTRY_RIGHT_IS_EMPTY) {
& RightIncrementalHashedAsOfJoinStateManager.ENTRY_RIGHT_MASK) == RightIncrementalHashedAsOfJoinStateManager.ENTRY_RIGHT_IS_EMPTY) {
continue;
}
if ((state
& RightIncrementalChunkedAsOfJoinStateManager.ENTRY_LEFT_MASK) == RightIncrementalChunkedAsOfJoinStateManager.ENTRY_LEFT_IS_EMPTY) {
& RightIncrementalHashedAsOfJoinStateManager.ENTRY_LEFT_MASK) == RightIncrementalHashedAsOfJoinStateManager.ENTRY_LEFT_IS_EMPTY) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.deephaven.engine.table.impl;

import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.sources.LongSparseArraySource;
import io.deephaven.engine.table.impl.util.ContiguousWritableRowRedirection;
import io.deephaven.engine.table.impl.util.LongColumnSourceWritableRowRedirection;
import io.deephaven.engine.table.impl.util.WritableRowRedirection;
import io.deephaven.engine.table.impl.util.WritableRowRedirectionLockFree;

import java.util.Arrays;
import java.util.Objects;
import java.util.function.LongUnaryOperator;
import java.util.stream.Collectors;

/**
 * Abstract base for right-incremental as-of join state managers.
 *
 * <p>
 * This class only holds the key column sources that are consulted when rendering a key for an error message, and
 * provides the helper that performs that rendering.
 */
public abstract class RightIncrementalAsOfJoinStateManager {
    /**
     * Alias for {@link RowSequence#NULL_ROW_KEY}.
     *
     * NOTE(review): presumably used by subclasses to mark "no right-side entry"; confirm against the implementations.
     */
    public static final long NO_RIGHT_ENTRY_VALUE = RowSequence.NULL_ROW_KEY;

    /** Column sources read by {@link #extractKeyStringFromSourceTable(long)} to build a printable key. */
    protected final ColumnSource<?>[] keySourcesForErrorMessages;

    /**
     * Stores the supplied sources for later use in error messages; the array is kept by reference, not copied.
     *
     * @param keySourcesForErrorMessages the key column sources to read when formatting a key
     */
    protected RightIncrementalAsOfJoinStateManager(ColumnSource<?>[] keySourcesForErrorMessages) {
        this.keySourcesForErrorMessages = keySourcesForErrorMessages;
    }

    /**
     * Produce a pretty key for error messages.
     *
     * <p>
     * With exactly one key source the bare value is returned; otherwise the values from every key source are joined
     * with {@code ", "} and wrapped in square brackets.
     *
     * @param leftKey the row key to look up in each key source
     * @return the formatted key
     */
    protected String extractKeyStringFromSourceTable(long leftKey) {
        final int sourceCount = keySourcesForErrorMessages.length;
        if (sourceCount == 1) {
            // A single key column is printed as-is, without the surrounding brackets.
            return Objects.toString(keySourcesForErrorMessages[0].get(leftKey));
        }

        final StringBuilder rendered = new StringBuilder("[");
        for (int ii = 0; ii < sourceCount; ++ii) {
            if (ii > 0) {
                rendered.append(", ");
            }
            rendered.append(Objects.toString(keySourcesForErrorMessages[ii].get(leftKey)));
        }
        return rendered.append("]").toString();
    }
}
Loading

0 comments on commit fe9c382

Please sign in to comment.