Skip to content

Commit

Permalink
feat: Added support to write vector/array of big decimals to parquet (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Sep 6, 2024
1 parent b748158 commit 483a72f
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,7 @@

import io.deephaven.util.SafeCloseable;

import java.util.Iterator;
import java.util.PrimitiveIterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* Interface for {@link SafeCloseable closeable} {@link PrimitiveIterator primitive iterators}.
Expand Down
150 changes: 107 additions & 43 deletions engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,37 @@
//
package io.deephaven.engine.util;

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.primitive.iterator.CloseableIterator;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.vectors.ObjectVectorColumnWrapper;
import io.deephaven.vector.ObjectVector;
import io.deephaven.vector.Vector;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.math.BigDecimal;
import java.util.Iterator;
import java.util.Properties;

/**
* Utilities to support BigDecimal exhaust.
*
* <p>
* Parquet and Avro decimal types make a whole column decimal type have a fixed precision and scale; BigDecimal columns
* in Deephaven are, each value, arbitrary precision (its own precision and scale).
*
* <p>
* For static tables, it is possible to compute overall precision and scale values that fit every existing value. For
* refreshing tables, we need the user to tell us.
*/
public class BigDecimalUtils {
private static final PrecisionAndScale EMPTY_TABLE_PRECISION_AND_SCALE = new PrecisionAndScale(1, 1);
private static final int TARGET_CHUNK_SIZE = 4096;

public static final int INVALID_PRECISION_OR_SCALE = -1;

private static final PrecisionAndScale EMPTY_TABLE_PRECISION_AND_SCALE = new PrecisionAndScale(1, 1);
private static final int INIT_MAX_PRECISION_MINUS_SCALE = -1;
private static final int INIT_MAX_SCALE = -1;

/**
* Immutable way to store and pass precision and scale values.
*/
Expand All @@ -44,67 +50,125 @@ public PrecisionAndScale(final int precision, final int scale) {
/**
* Compute an overall precision and scale that would fit all existing values in a table.
*
* @param t a Deephaven table
* @param colName a Column for {@code t}, which should be of {@code BigDecimal} type
* @return a {@code PrecisionAndScale} object result.
* @param table A Deephaven table
* @param colName Column for {@code table}, which should be of {@code BigDecimal} {@link ColumnSource#getType type}
* or {@link ColumnSource#getComponentType component type}
* @return A {@link PrecisionAndScale} object result.
*/
public static PrecisionAndScale computePrecisionAndScale(
final Table t, final String colName) {
final ColumnSource<BigDecimal> src = t.getColumnSource(colName, BigDecimal.class);
return computePrecisionAndScale(t.getRowSet(), src);
final Table table,
final String colName) {
final ColumnSource<?> src = table.getColumnSource(colName);
return computePrecisionAndScale(table.getRowSet(), src);
}

/**
* Compute an overall precision and scale that would fit all existing values in a column source. Note that this
* requires a full table scan to ensure the correct values are determined.
*
* @param rowSet The rowset for the provided column
* @param source a {@code ColumnSource} of {@code BigDecimal} type
* @return a {@code PrecisionAndScale} object result.
* @param columnSource A {@code ColumnSource} of {@code BigDecimal} {@link ColumnSource#getType type} or
* {@link ColumnSource#getComponentType component type}
* @return A {@link PrecisionAndScale} object result.
*/
public static PrecisionAndScale computePrecisionAndScale(
final RowSet rowSet,
final ColumnSource<BigDecimal> source) {
final ColumnSource<?> columnSource) {
if (rowSet.isEmpty()) {
return EMPTY_TABLE_PRECISION_AND_SCALE;
}

// We will walk the entire table to determine the max(precision - scale) and
// max(scale), which corresponds to max(digits left of the decimal point), max(digits right of the decimal
// point). Then we convert to (precision, scale) before returning.
int maxPrecisionMinusScale = -1;
int maxScale = -1;
try (final ChunkSource.GetContext context = source.makeGetContext(TARGET_CHUNK_SIZE);
final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) {
while (it.hasMore()) {
final RowSequence rowSeq = it.getNextRowSequenceWithLength(TARGET_CHUNK_SIZE);
final ObjectChunk<BigDecimal, ? extends Values> chunk =
source.getChunk(context, rowSeq).asObjectChunk();
for (int i = 0; i < chunk.size(); ++i) {
final BigDecimal x = chunk.get(i);
if (x == null) {
continue;
}

final int precision = x.precision();
final int scale = x.scale();
final int precisionMinusScale = precision - scale;
if (precisionMinusScale > maxPrecisionMinusScale) {
maxPrecisionMinusScale = precisionMinusScale;
}
if (scale > maxScale) {
maxScale = scale;
}
final BigDecimalParameters result = new BigDecimalParameters(INIT_MAX_PRECISION_MINUS_SCALE, INIT_MAX_SCALE);
final ObjectVector<?> columnVector = new ObjectVectorColumnWrapper<>(columnSource, rowSet);
try (final CloseableIterator<?> columnIterator = columnVector.iterator()) {
final Class<?> columnType = columnSource.getType();
if (columnType == BigDecimal.class) {
// noinspection unchecked
processFlatColumn((Iterator<BigDecimal>) columnIterator, result);
} else if (columnSource.getComponentType() == BigDecimal.class) {
if (columnType.isArray()) {
// noinspection unchecked
processArrayColumn((Iterator<BigDecimal[]>) columnIterator, result);
} else if (Vector.class.isAssignableFrom(columnType)) {
// noinspection unchecked
processVectorColumn((Iterator<ObjectVector<BigDecimal>>) columnIterator, result);
}
} else {
throw new IllegalArgumentException("Column source is not of type BigDecimal or an array/vector of " +
"BigDecimal, but of type " + columnType + " and component type " +
columnSource.getComponentType());
}
}

// If these are < 0, then every value we visited was null
if (maxPrecisionMinusScale < 0 && maxScale < 0) {
// If these are same as initial values, then every value we visited was null
if (result.maxPrecisionMinusScale == INIT_MAX_PRECISION_MINUS_SCALE && result.maxScale == INIT_MAX_SCALE) {
return EMPTY_TABLE_PRECISION_AND_SCALE;
}

return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale);
return new PrecisionAndScale(result.maxPrecisionMinusScale + result.maxScale, result.maxScale);
}

private static class BigDecimalParameters {
private int maxPrecisionMinusScale;
private int maxScale;

private BigDecimalParameters(final int maxPrecisionMinusScale, final int maxScale) {
this.maxPrecisionMinusScale = maxPrecisionMinusScale;
this.maxScale = maxScale;
}

/**
* Update the maximum values for the parameters based on the given value.
*/
private void updateMaximum(@Nullable final BigDecimal value) {
if (value == null) {
return;
}
final int precision = value.precision();
final int scale = value.scale();
final int precisionMinusScale = precision - scale;
if (precisionMinusScale > maxPrecisionMinusScale) {
maxPrecisionMinusScale = precisionMinusScale;
}
if (scale > maxScale) {
maxScale = scale;
}
}
}

private static void processFlatColumn(
@NotNull final Iterator<BigDecimal> columnIterator,
@NotNull final BigDecimalParameters result) {
columnIterator.forEachRemaining(result::updateMaximum);
}

private static void processVectorColumn(
@NotNull final Iterator<ObjectVector<BigDecimal>> columnIterator,
@NotNull final BigDecimalParameters result) {
columnIterator.forEachRemaining(values -> {
if (values == null) {
return;
}
try (final CloseableIterator<BigDecimal> valuesIterator = values.iterator()) {
valuesIterator.forEachRemaining(result::updateMaximum);
}
});
}

private static void processArrayColumn(
@NotNull final Iterator<BigDecimal[]> columnIterator,
@NotNull final BigDecimalParameters result) {
columnIterator.forEachRemaining(values -> {
if (values == null) {
return;
}
for (final BigDecimal value : values) {
result.updateMaximum(value);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public static PrecisionAndScale getPrecisionAndScale(
@NotNull final Map<String, Map<ParquetCacheTags, Object>> computedCache,
@NotNull final String columnName,
@NotNull final RowSet rowSet,
@NotNull Supplier<ColumnSource<BigDecimal>> columnSourceSupplier) {
@NotNull final Supplier<ColumnSource<?>> columnSourceSupplier) {
return (PrecisionAndScale) computedCache
.computeIfAbsent(columnName, unusedColumnName -> new HashMap<>())
.computeIfAbsent(ParquetCacheTags.DECIMAL_ARGS,
Expand Down Expand Up @@ -152,7 +152,7 @@ static TypeInfo bigDecimalTypeInfo(
final String columnName = column.getName();
// noinspection unchecked
final PrecisionAndScale precisionAndScale = getPrecisionAndScale(
computedCache, columnName, rowSet, () -> (ColumnSource<BigDecimal>) columnSourceMap.get(columnName));
computedCache, columnName, rowSet, () -> columnSourceMap.get(columnName));
final Set<Class<?>> clazzes = Set.of(BigDecimal.class);
return new TypeInfo() {
@Override
Expand All @@ -175,14 +175,9 @@ static TypeInfo getTypeInfo(
final RowSet rowSet,
final Map<String, ? extends ColumnSource<?>> columnSourceMap,
@NotNull final ParquetInstructions instructions) {
if (BigDecimal.class.equals(column.getDataType())) {
if (column.getDataType() == BigDecimal.class || column.getComponentType() == BigDecimal.class) {
return bigDecimalTypeInfo(computedCache, column, rowSet, columnSourceMap);
}
if (BigDecimal.class.equals(column.getComponentType())) {
throw new UnsupportedOperationException("Writing arrays/vector columns for big decimals is currently not " +
"supported");
// TODO(deephaven-core#4612): Add support for this
}
return lookupTypeInfo(column, instructions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ static <DATA_TYPE> TransferObject<?> create(
@NotNull final Map<String, Map<ParquetCacheTags, Object>> computedCache,
@NotNull final String columnName,
@NotNull final ColumnSource<DATA_TYPE> columnSource) {
Class<DATA_TYPE> columnType = columnSource.getType();
final Class<DATA_TYPE> columnType = columnSource.getType();
if (columnType == int.class) {
return IntTransfer.create(columnSource, tableRowSet, instructions.getTargetPageSize());
}
Expand Down Expand Up @@ -84,13 +84,11 @@ static <DATA_TYPE> TransferObject<?> create(
return new CodecTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize());
}
if (columnType == BigDecimal.class) {
// noinspection unchecked
final ColumnSource<BigDecimal> bigDecimalColumnSource = (ColumnSource<BigDecimal>) columnSource;
final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale(
computedCache, columnName, tableRowSet, () -> bigDecimalColumnSource);
computedCache, columnName, tableRowSet, () -> columnSource);
final ObjectCodec<BigDecimal> codec = new BigDecimalParquetBytesCodec(
precisionAndScale.precision, precisionAndScale.scale);
return new CodecTransfer<>(bigDecimalColumnSource, codec, tableRowSet, instructions.getTargetPageSize());
return new CodecTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize());
}
if (columnType == BigInteger.class) {
return new CodecTransfer<>(columnSource, new BigIntegerParquetBytesCodec(), tableRowSet,
Expand Down Expand Up @@ -136,6 +134,13 @@ static <DATA_TYPE> TransferObject<?> create(
if (componentType == String.class) {
return new StringArrayTransfer(columnSource, tableRowSet, instructions.getTargetPageSize());
}
if (componentType == BigDecimal.class) {
final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale(
computedCache, columnName, tableRowSet, () -> columnSource);
final ObjectCodec<BigDecimal> codec = new BigDecimalParquetBytesCodec(
precisionAndScale.precision, precisionAndScale.scale);
return new CodecArrayTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize());
}
if (componentType == BigInteger.class) {
return new CodecArrayTransfer<>(columnSource, new BigIntegerParquetBytesCodec(),
tableRowSet, instructions.getTargetPageSize());
Expand All @@ -152,7 +157,7 @@ static <DATA_TYPE> TransferObject<?> create(
if (componentType == LocalDateTime.class) {
return new LocalDateTimeArrayTransfer(columnSource, tableRowSet, instructions.getTargetPageSize());
}
// TODO(deephaven-core#4612): Handle arrays of BigDecimal and if explicit codec provided
// TODO(deephaven-core#4612): Handle if explicit codec provided
}
if (Vector.class.isAssignableFrom(columnType)) {
if (componentType == int.class) {
Expand Down Expand Up @@ -182,6 +187,13 @@ static <DATA_TYPE> TransferObject<?> create(
if (componentType == String.class) {
return new StringVectorTransfer(columnSource, tableRowSet, instructions.getTargetPageSize());
}
if (componentType == BigDecimal.class) {
final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale(
computedCache, columnName, tableRowSet, () -> columnSource);
final ObjectCodec<BigDecimal> codec = new BigDecimalParquetBytesCodec(
precisionAndScale.precision, precisionAndScale.scale);
return new CodecVectorTransfer<>(columnSource, codec, tableRowSet, instructions.getTargetPageSize());
}
if (componentType == BigInteger.class) {
return new CodecVectorTransfer<>(columnSource, new BigIntegerParquetBytesCodec(),
tableRowSet, instructions.getTargetPageSize());
Expand All @@ -198,7 +210,7 @@ static <DATA_TYPE> TransferObject<?> create(
if (componentType == LocalDateTime.class) {
return new LocalDateTimeVectorTransfer(columnSource, tableRowSet, instructions.getTargetPageSize());
}
// TODO(deephaven-core#4612): Handle vectors of BigDecimal and if explicit codec provided
// TODO(deephaven-core#4612): Handle if explicit codec provided
}

// Go with the default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1832,29 +1832,6 @@ private static Table arrayToVectorTable(final Table table) {
return arrayToVectorFormulas.isEmpty() ? table : table.updateView(arrayToVectorFormulas);
}

@Test
public void testBigDecimalArrayColumn() {
final Table bdArrayTable = TableTools.emptyTable(10000).select(Selectable.from(List.of(
"someBigDecimalArrayColumn = new java.math.BigDecimal[] {i % 10 == 0 ? null : " +
"java.math.BigDecimal.valueOf(ii).stripTrailingZeros()}")));
final File dest = new File(rootFile + File.separator + "testBigDecimalArrayColumn.parquet");
try {
ParquetTools.writeTable(bdArrayTable, dest.getPath());
fail("Expected exception because writing arrays of big decimal column types is not supported");
} catch (final RuntimeException e) {
assertTrue(e.getCause() instanceof UnsupportedOperationException);
}

// Convert array to vector table
final Table bdVectorTable = arrayToVectorTable(bdArrayTable);
try {
ParquetTools.writeTable(bdVectorTable, dest.getPath());
fail("Expected exception because writing vectors of big decimal column types is not supported");
} catch (final RuntimeException e) {
assertTrue(e.getCause() instanceof UnsupportedOperationException);
}
}

@Test
public void testArrayColumns() {
ArrayList<String> columns =
Expand All @@ -1869,6 +1846,7 @@ public void testArrayColumns() {
"someByteArrayColumn = new byte[] {i % 10 == 0 ? null : (byte)i}",
"someCharArrayColumn = new char[] {i % 10 == 0 ? null : (char)i}",
"someTimeArrayColumn = new Instant[] {i % 10 == 0 ? null : (Instant)DateTimeUtils.now() + i}",
"someBigDecimalArrayColumn = new java.math.BigDecimal[] {i % 10 == 0 ? null : java.math.BigDecimal.valueOf(ii).stripTrailingZeros()}",
"someBiArrayColumn = new java.math.BigInteger[] {i % 10 == 0 ? null : java.math.BigInteger.valueOf(i)}",
"someDateArrayColumn = new java.time.LocalDate[] {i % 10 == 0 ? null : java.time.LocalDate.ofEpochDay(i)}",
"someTimeArrayColumn = new java.time.LocalTime[] {i % 10 == 0 ? null : java.time.LocalTime.of(i%24, i%60, (i+10)%60)}",
Expand Down
5 changes: 4 additions & 1 deletion py/server/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,8 @@ def get_table_with_array_data(self):
"someCharArrayColumn = new char[] {i % 10 == 0 ? null : (char)i}",
"someTimeArrayColumn = new Instant[] {i % 10 == 0 ? null : (Instant)DateTimeUtils.now() + i}",
"someBiColumn = new java.math.BigInteger[] {i % 10 == 0 ? null : java.math.BigInteger.valueOf(i)}",
"someBdColumn = new java.math.BigDecimal[] {i % 10 == 0 ? null : " +
"java.math.BigDecimal.valueOf(ii).stripTrailingZeros()}",
"nullStringArrayColumn = new String[] {(String)null}",
"nullIntArrayColumn = new int[] {(int)null}",
"nullLongArrayColumn = new long[] {(long)null}",
Expand All @@ -240,7 +242,8 @@ def get_table_with_array_data(self):
"nullByteArrayColumn = new byte[] {(byte)null}",
"nullCharArrayColumn = new char[] {(char)null}",
"nullTimeArrayColumn = new Instant[] {(Instant)null}",
"nullBiColumn = new java.math.BigInteger[] {(java.math.BigInteger)null}"
"nullBiColumn = new java.math.BigInteger[] {(java.math.BigInteger)null}",
"nullBdColumn = new java.math.BigDecimal[] {(java.math.BigDecimal)null}"
])
return dh_table

Expand Down

0 comments on commit 483a72f

Please sign in to comment.