Parquet: Replicate TransferObject and Update Statistics
nbauernfeind committed Sep 20, 2023
1 parent bf579e0 commit 1838705
Showing 19 changed files with 1,187 additions and 489 deletions.
@@ -5,7 +5,6 @@

import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;
import io.deephaven.util.annotations.FinalDefault;
import org.apache.parquet.column.statistics.Statistics;

import java.io.IOException;
MappedSchema.java
@@ -22,7 +22,7 @@
class MappedSchema {

static MappedSchema create(
final Map<String, Map<ParquetTableWriter.CacheTags, Object>> computedCache,
final Map<String, Map<ParquetCacheTags, Object>> computedCache,
final TableDefinition definition,
final RowSet rowSet,
final Map<String, ? extends ColumnSource<?>> columnSourceMap,
ParquetCacheTags.java
@@ -0,0 +1,8 @@
/**
* Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.table;

public enum ParquetCacheTags {
DECIMAL_ARGS
}
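The enum above replaces the nested ParquetTableWriter.CacheTags, so MappedSchema and TypeInfos can key the per-column computed cache without referencing the writer. A minimal, self-contained sketch of how such a two-level cache is used; expensiveScan() is a hypothetical stand-in for computePrecisionAndScale:

import java.util.HashMap;
import java.util.Map;

public class ComputedCacheSketch {
    // Local stand-in for the enum above; DECIMAL_ARGS caches a column's precision/scale.
    enum ParquetCacheTags { DECIMAL_ARGS }

    public static void main(String[] args) {
        // Outer key: column name. Inner key: which derived value was cached for it.
        final Map<String, Map<ParquetCacheTags, Object>> computedCache = new HashMap<>();

        // The first lookup computes; later lookups for the same column and tag reuse the result.
        final Object precisionAndScale = computedCache
                .computeIfAbsent("Price", unusedColumnName -> new HashMap<>())
                .computeIfAbsent(ParquetCacheTags.DECIMAL_ARGS, unusedTag -> expensiveScan());

        System.out.println(precisionAndScale);
    }

    // Hypothetical stand-in for computePrecisionAndScale(rowSet, columnSource).
    private static Object expensiveScan() {
        return "precision=10, scale=2";
    }
}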


TypeInfos.java
@@ -34,7 +34,7 @@
* Contains the necessary information to convert a Deephaven table into a Parquet table: both the schema translation
* and the data translation.
*/
class TypeInfos {
public class TypeInfos {
private static final TypeInfo[] TYPE_INFOS = new TypeInfo[] {
IntType.INSTANCE,
LongType.INSTANCE,
@@ -105,14 +105,14 @@ static Pair<String, String> getCodecAndArgs(
return new ImmutablePair<>(SerializableCodec.class.getName(), null);
}

static PrecisionAndScale getPrecisionAndScale(
@NotNull final Map<String, Map<ParquetTableWriter.CacheTags, Object>> computedCache,
public static PrecisionAndScale getPrecisionAndScale(
@NotNull final Map<String, Map<ParquetCacheTags, Object>> computedCache,
@NotNull final String columnName,
@NotNull final RowSet rowSet,
@NotNull Supplier<ColumnSource<BigDecimal>> columnSourceSupplier) {
return (PrecisionAndScale) computedCache
.computeIfAbsent(columnName, unusedColumnName -> new HashMap<>())
.computeIfAbsent(ParquetTableWriter.CacheTags.DECIMAL_ARGS,
.computeIfAbsent(ParquetCacheTags.DECIMAL_ARGS,
uct -> parquetCompatible(computePrecisionAndScale(rowSet, columnSourceSupplier.get())));
}

@@ -130,7 +130,7 @@ private static PrecisionAndScale parquetCompatible(PrecisionAndScale pas) {
}

static TypeInfo bigDecimalTypeInfo(
final Map<String, Map<ParquetTableWriter.CacheTags, Object>> computedCache,
final Map<String, Map<ParquetCacheTags, Object>> computedCache,
@NotNull final ColumnDefinition<?> column,
final RowSet rowSet,
final Map<String, ? extends ColumnSource<?>> columnSourceMap) {
@@ -157,7 +157,7 @@ public PrimitiveBuilder<PrimitiveType> getBuilder(boolean required, boolean repe
}

static TypeInfo getTypeInfo(
final Map<String, Map<ParquetTableWriter.CacheTags, Object>> computedCache,
final Map<String, Map<ParquetCacheTags, Object>> computedCache,
@NotNull final ColumnDefinition<?> column,
final RowSet rowSet,
final Map<String, ? extends ColumnSource<?>> columnSourceMap,
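getPrecisionAndScale, now public, scans a BigDecimal column once per write and caches the result under DECIMAL_ARGS. A sketch of one plausible way to derive a precision/scale pair that covers every value in a column; this is an assumption about the shape of the computation, not necessarily the body of computePrecisionAndScale:

import java.math.BigDecimal;

public class PrecisionAndScaleSketch {
    public static void main(String[] args) {
        final BigDecimal[] column = {
                new BigDecimal("123.45"), new BigDecimal("0.001"), new BigDecimal("9876")};
        int maxScale = 0;
        int maxIntegerDigits = 0;
        for (final BigDecimal value : column) {
            maxScale = Math.max(maxScale, Math.max(value.scale(), 0));
            // precision() - scale() is the digit count left of the decimal point.
            maxIntegerDigits = Math.max(maxIntegerDigits, value.precision() - value.scale());
        }
        // The covering precision spans the widest integer part plus the widest fraction.
        final int precision = maxIntegerDigits + maxScale;
        // Parquet DECIMAL requires precision >= 1 and 0 <= scale <= precision;
        // parquetCompatible(...) above presumably clamps to such bounds.
        System.out.println("precision=" + precision + ", scale=" + maxScale); // precision=7, scale=3
    }
}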
BooleanTransfer.java
@@ -0,0 +1,78 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.table.transfer;

import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import org.apache.parquet.column.statistics.Statistics;
import org.jetbrains.annotations.NotNull;

import java.nio.ByteBuffer;

public class BooleanTransfer implements TransferObject<ByteBuffer> {

private final ColumnSource<?> columnSource;
private final ChunkSource.GetContext context;
private ByteChunk<? extends Values> chunk;
private final ByteBuffer buffer;

public BooleanTransfer(
@NotNull ColumnSource<?> columnSource,
int targetSize) {
this.columnSource = columnSource;
this.buffer = ByteBuffer.allocate(targetSize);
this.context = columnSource.makeGetContext(targetSize);
}

@Override
public void fetchData(@NotNull final RowSequence rs) {
chunk = columnSource.getChunk(context, rs).asByteChunk();
}

@Override
public int transferAllToBuffer() {
return transferOnePageToBuffer();
}

@Override
public int transferOnePageToBuffer() {
if (!hasMoreDataToBuffer()) {
return 0;
}
buffer.clear();
// Assuming that all the fetched data will fit in one page. This is because the page count is accurately
// calculated for non-variable-width types; see ParquetTableWriter.getTargetRowsPerPage for details.
copyAllFromChunkToBuffer();
buffer.flip();
int ret = chunk.size();
chunk = null;
return ret;
}

private void copyAllFromChunkToBuffer() {
for (int chunkIdx = 0; chunkIdx < chunk.size(); ++chunkIdx) {
buffer.put(chunk.get(chunkIdx));
}
}

@Override
public boolean hasMoreDataToBuffer() {
return chunk != null;
}

@Override
public ByteBuffer getBuffer() {
return buffer;
}

@Override
public void close() {
context.close();
}

public <T extends Comparable<T>> void updateStatistics(@NotNull final Statistics<T> stats) {}
}
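BooleanTransfer is the simplest shape of the new TransferObject implementations: fetch a chunk, copy it into a buffer that is flipped for reading, and report how many rows were transferred (its updateStatistics is a no-op). A runnable sketch of the writer-side paging loop against a simplified mirror of the interface; PagedTransfer and the anonymous array-backed implementation are illustrative stand-ins, not Deephaven API:

import java.nio.IntBuffer;
import java.util.Arrays;

// Simplified mirror of the TransferObject contract in this commit (the real interface
// also has fetchData(RowSequence), updateStatistics(...), and close()).
interface PagedTransfer<B> {
    int transferOnePageToBuffer(); // rows buffered by this call, 0 once drained
    boolean hasMoreDataToBuffer();
    B getBuffer(); // flipped and ready to read after transferOnePageToBuffer()
}

public class TransferLoopSketch {
    public static void main(String[] args) {
        final int[] data = {1, 2, 3, 4, 5, 6, 7};
        final PagedTransfer<IntBuffer> transfer = new PagedTransfer<IntBuffer>() {
            private int offset = 0;
            private final IntBuffer buffer = IntBuffer.allocate(3); // page size of 3 rows

            @Override
            public int transferOnePageToBuffer() {
                buffer.clear();
                final int rows = Math.min(buffer.capacity(), data.length - offset);
                buffer.put(data, offset, rows);
                offset += rows;
                buffer.flip();
                return rows;
            }

            @Override
            public boolean hasMoreDataToBuffer() {
                return offset < data.length;
            }

            @Override
            public IntBuffer getBuffer() {
                return buffer;
            }
        };

        // Writer-side loop: drain one page at a time, as the Parquet writer would.
        while (transfer.hasMoreDataToBuffer()) {
            final int rows = transfer.transferOnePageToBuffer();
            final int[] page = new int[rows];
            transfer.getBuffer().get(page);
            System.out.println(Arrays.toString(page)); // [1, 2, 3] then [4, 5, 6] then [7]
        }
    }
}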
ByteTransfer.java
@@ -0,0 +1,107 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
/*
* ---------------------------------------------------------------------------------------------------------------------
* AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit IntTransfer and regenerate
* ---------------------------------------------------------------------------------------------------------------------
*/
package io.deephaven.parquet.table.transfer;

import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.QueryConstants;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.jetbrains.annotations.NotNull;

import java.nio.Buffer;
import java.nio.IntBuffer;

public class ByteTransfer implements TransferObject<IntBuffer> {

private final ColumnSource<?> columnSource;
private final ChunkSource.GetContext context;
private final IntBuffer buffer;
private ByteChunk<? extends Values> chunk;
private byte minValue = QueryConstants.NULL_BYTE;
private byte maxValue = QueryConstants.NULL_BYTE;

public ByteTransfer(
@NotNull final ColumnSource<?> columnSource,
final int targetSize) {
this.columnSource = columnSource;
this.buffer = IntBuffer.allocate(targetSize);
context = columnSource.makeGetContext(targetSize);
}

@Override
final public void fetchData(@NotNull final RowSequence rs) {
chunk = columnSource.getChunk(context, rs).asByteChunk();
}

@Override
final public int transferAllToBuffer() {
return transferOnePageToBuffer();
}

@Override
final public int transferOnePageToBuffer() {
if (!hasMoreDataToBuffer()) {
return 0;
}
buffer.clear();
// Assuming that all the fetched data will fit in one page. This is because the page count is accurately
// calculated for non-variable-width types; see ParquetTableWriter.getTargetRowsPerPage for details.
copyAllFromChunkToBuffer();
buffer.flip();
int ret = chunk.size();
chunk = null;
return ret;
}

/**
* Helper method to copy all data from {@code this.chunk} to {@code this.buffer}. The buffer should be cleared
* before calling this method and is positioned for a {@link Buffer#flip()} after the call.
*/
private void copyAllFromChunkToBuffer() {
for (int chunkIdx = 0; chunkIdx < chunk.size(); ++chunkIdx) {
byte value = chunk.get(chunkIdx);
if (value != QueryConstants.NULL_BYTE) {
if (minValue == QueryConstants.NULL_BYTE) {
minValue = maxValue = value;
} else if (value < minValue) {
minValue = value;
} else if (value > maxValue) {
maxValue = value;
}
}
buffer.put(value);
}
}

@Override
final public boolean hasMoreDataToBuffer() {
return (chunk != null);
}

@Override
final public IntBuffer getBuffer() {
return buffer;
}

@Override
final public void close() {
context.close();
}

@Override
public <T extends Comparable<T>> void updateStatistics(@NotNull final Statistics<T> stats) {
if (minValue != QueryConstants.NULL_BYTE) {
((IntStatistics) stats).setMinMax(minValue, maxValue);
}
}
}
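ByteTransfer widens bytes into an IntBuffer and records min/max into IntStatistics, since Parquet stores 8-bit integers with an INT32 physical type. The null handling is the interesting part: QueryConstants.NULL_BYTE (assumed here to be Byte.MIN_VALUE) marks nulls, which are buffered as-is but excluded from statistics. A self-contained sketch of that sentinel-aware min/max scan:

public class NullAwareMinMaxSketch {
    // Assumed value of io.deephaven.util.QueryConstants.NULL_BYTE.
    private static final byte NULL_BYTE = Byte.MIN_VALUE;

    public static void main(String[] args) {
        final byte[] column = {5, NULL_BYTE, -3, 42, NULL_BYTE};
        byte min = NULL_BYTE;
        byte max = NULL_BYTE;
        for (final byte value : column) {
            if (value == NULL_BYTE) {
                continue; // nulls are buffered for the page but never enter statistics
            }
            if (min == NULL_BYTE) {
                min = max = value; // the first non-null value seeds both bounds
            } else if (value < min) {
                min = value;
            } else if (value > max) {
                max = value;
            }
        }
        // For an all-null page min stays NULL_BYTE, and updateStatistics skips setMinMax.
        System.out.println("min=" + min + " max=" + max); // prints: min=-3 max=42
    }
}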
CharTransfer.java
@@ -0,0 +1,107 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
/*
* ---------------------------------------------------------------------------------------------------------------------
* AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY - for any changes edit IntTransfer and regenerate
* ---------------------------------------------------------------------------------------------------------------------
*/
package io.deephaven.parquet.table.transfer;

import io.deephaven.chunk.CharChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.QueryConstants;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.jetbrains.annotations.NotNull;

import java.nio.Buffer;
import java.nio.IntBuffer;

public class CharTransfer implements TransferObject<IntBuffer> {

private final ColumnSource<?> columnSource;
private final ChunkSource.GetContext context;
private final IntBuffer buffer;
private CharChunk<? extends Values> chunk;
private char minValue = QueryConstants.NULL_CHAR;
private char maxValue = QueryConstants.NULL_CHAR;

public CharTransfer(
@NotNull final ColumnSource<?> columnSource,
final int targetSize) {
this.columnSource = columnSource;
this.buffer = IntBuffer.allocate(targetSize);
context = columnSource.makeGetContext(targetSize);
}

@Override
final public void fetchData(@NotNull final RowSequence rs) {
chunk = columnSource.getChunk(context, rs).asCharChunk();
}

@Override
final public int transferAllToBuffer() {
return transferOnePageToBuffer();
}

@Override
final public int transferOnePageToBuffer() {
if (!hasMoreDataToBuffer()) {
return 0;
}
buffer.clear();
// Assuming that all the fetched data will fit in one page. This is because the page count is accurately
// calculated for non-variable-width types; see ParquetTableWriter.getTargetRowsPerPage for details.
copyAllFromChunkToBuffer();
buffer.flip();
int ret = chunk.size();
chunk = null;
return ret;
}

/**
* Helper method to copy all data from {@code this.chunk} to {@code this.buffer}. The buffer should be cleared
* before calling this method and is positioned for a {@link Buffer#flip()} after the call.
*/
private void copyAllFromChunkToBuffer() {
for (int chunkIdx = 0; chunkIdx < chunk.size(); ++chunkIdx) {
char value = chunk.get(chunkIdx);
if (value != QueryConstants.NULL_CHAR) {
if (minValue == QueryConstants.NULL_CHAR) {
minValue = maxValue = value;
} else if (value < minValue) {
minValue = value;
} else if (value > maxValue) {
maxValue = value;
}
}
buffer.put(value);
}
}

@Override
final public boolean hasMoreDataToBuffer() {
return (chunk != null);
}

@Override
final public IntBuffer getBuffer() {
return buffer;
}

@Override
final public void close() {
context.close();
}

@Override
public <T extends Comparable<T>> void updateStatistics(@NotNull final Statistics<T> stats) {
if (minValue != QueryConstants.NULL_CHAR) {
((IntStatistics) stats).setMinMax(minValue, maxValue);
}
}
}
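CharTransfer follows the same template as ByteTransfer. One point worth making explicit is why reusing IntStatistics is safe for char data: char is an unsigned 16-bit type, so widening to int maps it onto 0..65535 and preserves ordering. A two-line check, assuming nothing beyond standard Java:

public class CharWideningSketch {
    public static void main(String[] args) {
        final char lo = Character.MIN_VALUE; // '\u0000'
        final char hi = Character.MAX_VALUE; // '\uffff'
        // Unsigned widening keeps min <= max under int comparison,
        // which is what makes IntStatistics min/max valid for char columns.
        System.out.println((int) lo + " .. " + (int) hi); // prints: 0 .. 65535
    }
}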
CodecTransfer.java
@@ -0,0 +1,27 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.parquet.table.transfer;

import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.codec.ObjectCodec;
import org.apache.parquet.io.api.Binary;
import org.jetbrains.annotations.NotNull;

public class CodecTransfer<T> extends EncodedTransfer<T> {
private final ObjectCodec<? super T> codec;

public CodecTransfer(
@NotNull final ColumnSource<?> columnSource,
@NotNull final ObjectCodec<? super T> codec,
final int maxValuesPerPage,
final int targetPageSize) {
super(columnSource, maxValuesPerPage, targetPageSize);
this.codec = codec;
}

@Override
Binary encodeToBinary(T value) {
return Binary.fromConstantByteArray(codec.encode(value));
}
}
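CodecTransfer defers to an ObjectCodec to turn each value into bytes, then wraps the result without copying via Binary.fromConstantByteArray (safe because the freshly encoded array is never mutated afterwards). A minimal sketch with a hypothetical UTF-8 string encoder standing in for a real ObjectCodec:

import org.apache.parquet.io.api.Binary;

import java.nio.charset.StandardCharsets;

public class CodecEncodeSketch {
    // Hypothetical stand-in for ObjectCodec<String>.encode(value) -> byte[].
    static byte[] encode(final String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // fromConstantByteArray skips a defensive copy; the caller promises
        // never to modify the array it hands over.
        final Binary binary = Binary.fromConstantByteArray(encode("deephaven"));
        System.out.println(binary.length() + " bytes"); // prints: 9 bytes
    }
}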