Skip to content

Commit

Permalink
Ryan's feedback and bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Sep 17, 2024
1 parent 28b25d1 commit 8876208
Show file tree
Hide file tree
Showing 57 changed files with 913 additions and 734 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ public final boolean get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
return data[offset + index] == QueryConstants.NULL_BOOLEAN;
// region isNull
public final boolean isNull(int index) {
return false;
}
// endregion isNull

@Override
public BooleanChunk<ATTR> slice(int offset, int capacity) {
Expand Down
4 changes: 3 additions & 1 deletion engine/chunk/src/main/java/io/deephaven/chunk/ByteChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ public final byte get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_BYTE;
}
// endregion isNull

@Override
public ByteChunk<ATTR> slice(int offset, int capacity) {
Expand Down
4 changes: 3 additions & 1 deletion engine/chunk/src/main/java/io/deephaven/chunk/CharChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,11 @@ public final char get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_CHAR;
}
// endregion isNull

@Override
public CharChunk<ATTR> slice(int offset, int capacity) {
Expand Down
6 changes: 0 additions & 6 deletions engine/chunk/src/main/java/io/deephaven/chunk/Chunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,6 @@ default void copyToBuffer(int srcOffset, @NotNull Buffer destBuffer, int destOff
*/
int size();

/**
* @return whether The value offset is null
* @param index The index to check
*/
boolean isNullAt(int index);

/**
* @return The underlying chunk type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ public final double get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_DOUBLE;
}
// endregion isNull

@Override
public DoubleChunk<ATTR> slice(int offset, int capacity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ public final float get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_FLOAT;
}
// endregion isNull

@Override
public FloatChunk<ATTR> slice(int offset, int capacity) {
Expand Down
4 changes: 3 additions & 1 deletion engine/chunk/src/main/java/io/deephaven/chunk/IntChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ public final int get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_INT;
}
// endregion isNull

@Override
public IntChunk<ATTR> slice(int offset, int capacity) {
Expand Down
4 changes: 3 additions & 1 deletion engine/chunk/src/main/java/io/deephaven/chunk/LongChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ public final long get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_LONG;
}
// endregion isNull

@Override
public LongChunk<ATTR> slice(int offset, int capacity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ public final T get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
// region isNull
public final boolean isNull(int index) {
return data[offset + index] == null;
}
// endregion isNull

@Override
public ObjectChunk<T, ATTR> slice(int offset, int capacity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,11 @@ public final short get(int index) {
return data[offset + index];
}

public final boolean isNullAt(int index) {
// region isNull
public final boolean isNull(int index) {
return data[offset + index] == QueryConstants.NULL_SHORT;
}
// endregion isNull

@Override
public ShortChunk<ATTR> slice(int offset, int capacity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2613,7 +2613,6 @@ private Table snapshotIncrementalInternal(final Table base, final boolean doInit
new ListenerRecorder("snapshotIncremental (triggerTable)", this, resultTable);
addUpdateListener(triggerListenerRecorder);

dropColumns(getColumnSourceMap().keySet());
final SnapshotIncrementalListener listener =
new SnapshotIncrementalListener(this, resultTable, resultColumns,
baseListenerRecorder, triggerListenerRecorder, baseTable, triggerColumns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ public static ChunkType maybeConvertToWritablePrimitiveChunkType(@NotNull final
}
if (dataType == Instant.class) {
// Note that storing ZonedDateTime as a primitive is lossy on the time zone.
// TODO (https://github.com/deephaven/deephaven-core/issues/5241): Inconsistent handling of ZonedDateTime
return ChunkType.Long;
}
return ChunkType.fromElementType(dataType);
Expand All @@ -284,6 +285,7 @@ public static Class<?> maybeConvertToPrimitiveDataType(@NotNull final Class<?> d
}
if (dataType == Instant.class || dataType == ZonedDateTime.class) {
// Note: not all ZonedDateTime sources are convertible to long, so this doesn't match column source behavior
// TODO (https://github.com/deephaven/deephaven-core/issues/5241): Inconsistent handling of ZonedDateTime
return long.class;
}
return dataType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.impl.ExternalizableRowSetUtils;
import io.deephaven.engine.table.impl.util.BarrageMessage;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
import io.deephaven.extensions.barrage.chunk.ChunkWriter;
import io.deephaven.extensions.barrage.chunk.SingleElementListHeaderWriter;
import io.deephaven.extensions.barrage.util.ExposedByteArrayOutputStream;
Expand Down Expand Up @@ -75,7 +74,7 @@ public class BarrageMessageWriterImpl implements BarrageMessageWriter {
public interface RecordBatchMessageView extends MessageView {
boolean isViewport();

ChunkReader.Options options();
BarrageOptions options();

RowSet addRowOffsets();

Expand Down Expand Up @@ -354,7 +353,7 @@ public boolean isViewport() {
}

@Override
public ChunkReader.Options options() {
public BarrageOptions options() {
return options;
}

Expand Down Expand Up @@ -533,7 +532,7 @@ public boolean isViewport() {
}

@Override
public ChunkReader.Options options() {
public BarrageOptions options() {
return options;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage;

import io.deephaven.util.QueryConstants;

public interface BarrageOptions {
/**
* @return whether we encode the validity buffer to express null values or {@link QueryConstants QueryConstants'}
* NULL values.
*/
boolean useDeephavenNulls();

/**
* @return the conversion mode to use for object columns
*/
ColumnConversionMode columnConversionMode();

/**
* @return the ideal number of records to send per record batch
*/
int batchSize();

/**
* @return the maximum number of bytes that should be sent in a single message.
*/
int maxMessageSize();

/**
* Some Flight clients cannot handle modifications that have irregular column counts. These clients request that the
* server wrap all columns in a list to enable each column having a variable length.
*
* @return true if the columns should be wrapped in a list
*/
default boolean columnsAsList() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
import com.google.flatbuffers.FlatBufferBuilder;
import io.deephaven.annotations.BuildableStyle;
import io.deephaven.barrage.flatbuf.BarrageSnapshotRequest;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;

@Immutable
@BuildableStyle
public abstract class BarrageSnapshotOptions implements ChunkReader.Options {
public abstract class BarrageSnapshotOptions implements BarrageOptions {
public static Builder builder() {
return ImmutableBarrageSnapshotOptions.builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@
import com.google.flatbuffers.FlatBufferBuilder;
import io.deephaven.annotations.BuildableStyle;
import io.deephaven.barrage.flatbuf.BarrageSubscriptionRequest;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;

@Immutable
@BuildableStyle
public abstract class BarrageSubscriptionOptions implements ChunkReader.Options {
public abstract class BarrageSubscriptionOptions implements BarrageOptions {

public static Builder builder() {
return ImmutableBarrageSubscriptionOptions.builder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.extensions.barrage;

import org.apache.arrow.flatbuf.Field;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* Describes type info used by factory implementations when creating a ChunkReader.
*/
public class BarrageTypeInfo {
/**
* Factory method to create a TypeInfo instance.
*
* @param type the Java type to be read into the chunk
* @param componentType the Java type of nested components
* @param arrowField the Arrow type to be read into the chunk
* @return a TypeInfo instance
*/
public static BarrageTypeInfo make(
@NotNull final Class<?> type,
@Nullable final Class<?> componentType,
@NotNull final Field arrowField) {
return new BarrageTypeInfo(type, componentType, arrowField);
}

private final Class<?> type;
@Nullable
private final Class<?> componentType;
private final Field arrowField;

public BarrageTypeInfo(
@NotNull final Class<?> type,
@Nullable final Class<?> componentType,
@NotNull final Field arrowField) {
this.type = type;
this.componentType = componentType;
this.arrowField = arrowField;
}

public Class<?> type() {
return type;
}

@Nullable
public Class<?> componentType() {
return componentType;
}

public Field arrowField() {
return arrowField;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.extensions.barrage.chunk.ChunkReader;
import io.deephaven.extensions.barrage.chunk.ChunkWriter;
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -41,7 +40,7 @@ public ChunkWriter.Context<SourceChunkType>[] chunks() {
return contexts;
}

public ChunkWriter.DrainableColumn empty(@NotNull final ChunkReader.Options options) throws IOException {
public ChunkWriter.DrainableColumn empty(@NotNull final BarrageOptions options) throws IOException {
return writer.getEmptyInputStream(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import java.util.function.Function;
import java.util.function.IntFunction;

public abstract class BaseChunkReader<ReadChunkType extends WritableChunk<Values>>
implements ChunkReader<ReadChunkType> {
public abstract class BaseChunkReader<READ_CHUNK_TYPE extends WritableChunk<Values>>
implements ChunkReader<READ_CHUNK_TYPE> {

protected static <T extends WritableChunk<Values>> T castOrCreateChunk(
final WritableChunk<Values> outChunk,
Expand Down
Loading

0 comments on commit 8876208

Please sign in to comment.