[Kernel] Parquet writer TableClient APIs and default implementation (delta-io#2626)

Add the following API to `ParquetHandler` to support writing Parquet files.

```
    /**
     * Write the given data batches to Parquet files. Try to keep each Parquet file close to the
     * given size. If the current file exceeds this size, close the current file and start
     * writing to a new file.
     * <p>
     *
     * @param directoryPath Path to the directory where the Parquet files should be written.
     * @param dataIter      Iterator of data batches to write.
     * @param maxFileSize   Target maximum size of the created Parquet file in bytes.
     * @param statsColumns  List of columns to collect statistics for. The statistics collection is
     *                      optional. If the implementation does not support statistics collection,
     *                      it is ok to return no statistics.
     * @return an iterator of {@link DataFileStatus} containing the status of the written files.
     * Each status contains the file path and the optionally collected statistics for the file.
     * It is the responsibility of the caller to close the iterator.
     *
     * @throws IOException if an I/O error occurs during the file writing. This may leave some files
     *                     already written in the directory. It is the responsibility of the caller
     *                     to clean up.
     * @since 3.2.0
     */
    CloseableIterator<DataFileStatus> writeParquetFiles(
            String directoryPath,
            CloseableIterator<FilteredColumnarBatch> dataIter,
            long maxFileSize,
            List<Column> statsColumns) throws IOException;
```

The default implementation of the above interface uses the `parquet-mr` library.
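
For illustration, here is a minimal sketch of how a connector might call the new API through the default handler. It assumes the `DefaultTableClient` entry point from the `kernel-defaults` module and a hypothetical `getDataToWrite()` helper; neither is part of this change.

```
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;

import io.delta.kernel.client.ParquetHandler;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.DataFileStatus;

public class WriteParquetExample {
    public static void main(String[] args) throws Exception {
        // Assumption: the default TableClient exposes the default ParquetHandler implementation.
        ParquetHandler parquetHandler =
            DefaultTableClient.create(new Configuration()).getParquetHandler();

        // Both iterators are CloseableIterators; try-with-resources closes them when done.
        try (CloseableIterator<FilteredColumnarBatch> dataIter = getDataToWrite();
             CloseableIterator<DataFileStatus> results = parquetHandler.writeParquetFiles(
                 "/tmp/delta-table-data",     // target directory (made up)
                 dataIter,
                 128L * 1024 * 1024,          // target maximum file size in bytes
                 Collections.emptyList())) {  // no statistics columns requested
            while (results.hasNext()) {
                DataFileStatus file = results.next();
                System.out.println(file.getPath() + " (" + file.getSize() + " bytes)");
            }
        }
    }

    // Hypothetical helper: how the batches are produced is connector-specific.
    private static CloseableIterator<FilteredColumnarBatch> getDataToWrite() {
        throw new UnsupportedOperationException("connector-specific");
    }
}
```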

## How was this patch tested?
Added support for all Delta types except `timestamp_ntz`. Tested writing different data types with variations of nesting levels, null/non-null values, and target file sizes.

## Followup work
* Support 2-level structures for array and map type data writing
* Support INT64 format timestamp writing
* Decimal legacy format (always binary) support
* Uniform support to add field ids for intermediate elements in `MAP` and `LIST` data types.
vkorukanti authored Feb 14, 2024
1 parent 665aa7d commit 4ecfa45
Showing 14 changed files with 1,997 additions and 149 deletions.
@@ -17,15 +17,16 @@
package io.delta.kernel.client;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.*;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import io.delta.kernel.utils.*;

/**
* Provides Parquet file related functionalities to Delta Kernel. Connectors can leverage this
@@ -44,25 +45,53 @@ public interface ParquetHandler {
* {@link StructField#METADATA_ROW_INDEX_COLUMN_NAME} and the field is a metadata column
* {@link StructField#isMetadataColumn()} the column must be populated with the file row index.
* <p>
* How does a column in {@code physicalSchema} match to the column in the Parquet file?
* If the {@link StructField} has a field id in the {@code metadata} with key `parquet.field.id`
* the column is attempted to match by id. If the column is not found by id, the column is
* matched by name. When trying to find the column in Parquet by name,
* first case-sensitive match is used. If not found then a case-insensitive match is attempted.
* How does a column in {@code physicalSchema} match to the column in the Parquet file? If the
* {@link StructField} has a field id in the {@code metadata} with key `parquet.field.id` the
* column is attempted to match by id. If the column is not found by id, the column is matched
* by name. When trying to find the column in Parquet by name, first case-sensitive match is
* used. If not found then a case-insensitive match is attempted.
*
* @param fileIter Iterator of files to read data from.
* @param physicalSchema Select list of columns to read from the Parquet file.
* @param predicate Optional predicate which the Parquet reader can optionally use to prune
* rows that don't satisfy the predicate. Because pruning is optional and
* may be incomplete, caller is still responsible apply the predicate on
* the data returned by this method.
* @return an iterator of {@link ColumnarBatch}s containing the data in columnar format.
* It is the responsibility of the caller to close the iterator. The data returned is in the
* same as the order of files given in {@code scanFileIter}.
* @param predicate Optional predicate which the Parquet reader can optionally use to
* prune rows that don't satisfy the predicate. Because pruning is
* optional and may be incomplete, caller is still responsible for applying the
* predicate on the data returned by this method.
* @return an iterator of {@link ColumnarBatch}s containing the data in columnar format. It is
* the responsibility of the caller to close the iterator. The data returned is in the same
* order as the files given in {@code fileIter}.
* @throws IOException if an I/O error occurs during the read.
*/
CloseableIterator<ColumnarBatch> readParquetFiles(
CloseableIterator<FileStatus> fileIter,
StructType physicalSchema,
Optional<Predicate> predicate) throws IOException;
CloseableIterator<FileStatus> fileIter,
StructType physicalSchema,
Optional<Predicate> predicate) throws IOException;

/**
* Write the given data batches to Parquet files. Try to keep each Parquet file close to the
* given size. If the current file exceeds this size, close the current file and start writing
* to a new file.
* <p>
*
* @param directoryPath Location where the data files should be written.
* @param dataIter Iterator of data batches to write. It is the responsibility of the caller
* to close the iterator.
* @param maxFileSize Target maximum size of the created Parquet file in bytes.
* @param statsColumns List of columns to collect statistics for. The statistics collection is
* optional. If the implementation does not support statistics collection,
* it is ok to return no statistics.
* @return an iterator of {@link DataFileStatus} containing the status of the written files.
* Each status contains the file path and the optionally collected statistics for the file.
* It is the responsibility of the caller to close the iterator.
*
* @throws IOException if an I/O error occurs during the file writing. This may leave some files
* already written in the directory. It is the responsibility of the caller
* to clean up.
* @since 3.2.0
*/
CloseableIterator<DataFileStatus> writeParquetFiles(
String directoryPath,
CloseableIterator<FilteredColumnarBatch> dataIter,
long maxFileSize,
List<Column> statsColumns) throws IOException;
}
@@ -0,0 +1,94 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.utils;

import java.util.Collections;
import java.util.Map;

import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Literal;

/**
* Statistics about a data file in a Delta Lake table.
*/
public class DataFileStatistics {
private final long numRecords;
private final Map<Column, Literal> minValues;
private final Map<Column, Literal> maxValues;
private final Map<Column, Long> nullCounts;

/**
* Create a new instance of {@link DataFileStatistics}.
*
* @param numRecords Number of records in the data file.
* @param minValues Map of column to minimum value of it in the data file. If the data file has
* all nulls for the column, the value will be null or not present in the
* map.
* @param maxValues Map of column to maximum value of it in the data file. If the data file has
* all nulls for the column, the value will be null or not present in the
* map.
* @param nullCounts Map of column to number of nulls in the data file.
*/
public DataFileStatistics(
long numRecords,
Map<Column, Literal> minValues,
Map<Column, Literal> maxValues,
Map<Column, Long> nullCounts) {
this.numRecords = numRecords;
this.minValues = Collections.unmodifiableMap(minValues);
this.maxValues = Collections.unmodifiableMap(maxValues);
this.nullCounts = Collections.unmodifiableMap(nullCounts);
}

/**
* Get the number of records in the data file.
*
* @return Number of records in the data file.
*/
public long getNumRecords() {
return numRecords;
}

/**
* Get the minimum values of the columns in the data file. The map may contain statistics for
* only a subset of columns in the data file.
*
* @return Map of column to minimum value of it in the data file.
*/
public Map<Column, Literal> getMinValues() {
return minValues;
}

/**
* Get the maximum values of the columns in the data file. The map may contain statistics for
* only a subset of columns in the data file.
*
* @return Map of column to maximum value of it in the data file.
*/
public Map<Column, Literal> getMaxValues() {
return maxValues;
}

/**
* Get the number of nulls of columns in the data file. The map may contain statistics for only
* a subset of columns in the data file.
*
* @return Map of column to number of nulls in the data file.
*/
public Map<Column, Long> getNullCounts() {
return nullCounts;
}
}
@@ -0,0 +1,54 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.utils;

import java.util.Optional;

/**
* Extends {@link FileStatus} to include additional details such as column level statistics
* of the data file in the Delta Lake table.
*/
public class DataFileStatus extends FileStatus {

private final Optional<DataFileStatistics> statistics;

/**
* Create a new instance of {@link DataFileStatus}.
*
* @param path Fully qualified file path.
* @param size File size in bytes.
* @param modificationTime Last modification time of the file in epoch milliseconds.
* @param statistics Optional column and file level statistics in the data file.
*/
public DataFileStatus(
String path,
long size,
long modificationTime,
Optional<DataFileStatistics> statistics) {
super(path, size, modificationTime);
this.statistics = statistics;
}

/**
* Get the statistics of the data file encapsulated in this object.
*
* @return Statistics of the file.
*/
public Optional<DataFileStatistics> getStatistics() {
return statistics;
}
}
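
For illustration, a minimal sketch of how the two new classes above fit together; the column name, values, and file path are made-up placeholders rather than anything produced by this change.

```
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Literal;
import io.delta.kernel.utils.DataFileStatistics;
import io.delta.kernel.utils.DataFileStatus;

public class DataFileStatusExample {
    public static void main(String[] args) {
        Column idCol = new Column("id");

        // Per-column statistics collected while writing one file (values are made up).
        Map<Column, Literal> minValues = Collections.singletonMap(idCol, Literal.ofLong(1L));
        Map<Column, Literal> maxValues = Collections.singletonMap(idCol, Literal.ofLong(1000L));
        Map<Column, Long> nullCounts = Collections.singletonMap(idCol, 0L);

        DataFileStatistics stats =
            new DataFileStatistics(1000L /* numRecords */, minValues, maxValues, nullCounts);

        // File-level status the writer would return for one output file.
        DataFileStatus fileStatus = new DataFileStatus(
            "/tmp/delta-table-data/part-00000.parquet", // path (made up)
            4_096_000L,                                 // size in bytes
            System.currentTimeMillis(),                 // modification time in epoch millis
            Optional.of(stats));

        System.out.println(fileStatus.getPath() + ", records = "
            + fileStatus.getStatistics().map(DataFileStatistics::getNumRecords).orElse(0L));
    }
}
```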
@@ -31,7 +31,7 @@ public class FileStatus {
private final long size;
private final long modificationTime;

private FileStatus(
protected FileStatus(
String path,
long size,
long modificationTime) {
@@ -16,20 +16,23 @@
package io.delta.kernel.defaults.client;

import java.io.IOException;
import java.util.List;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import io.delta.kernel.client.ParquetHandler;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.*;
import io.delta.kernel.expressions.Column;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
import io.delta.kernel.utils.*;

import io.delta.kernel.internal.util.Utils;

import io.delta.kernel.defaults.internal.parquet.ParquetBatchReader;
import io.delta.kernel.defaults.internal.parquet.ParquetFileWriter;

/**
* Default implementation of {@link ParquetHandler} based on Hadoop APIs.
@@ -87,4 +90,15 @@ public ColumnarBatch next() {
}
};
}

@Override
public CloseableIterator<DataFileStatus> writeParquetFiles(
String directoryPath,
CloseableIterator<FilteredColumnarBatch> dataIter,
long maxFileSize,
List<Column> statsColumns) throws IOException {
ParquetFileWriter batchWriter =
new ParquetFileWriter(hadoopConf, new Path(directoryPath), maxFileSize, statsColumns);
return batchWriter.write(dataIter);
}
}
@@ -16,6 +16,9 @@
package io.delta.kernel.defaults.internal;

import java.time.LocalDate;
import java.util.concurrent.TimeUnit;

import io.delta.kernel.internal.util.Tuple2;

public class DefaultKernelUtils {
private static final LocalDate EPOCH = LocalDate.ofEpochDay(0);
@@ -39,6 +42,18 @@ public static long fromJulianDay(int days, long nanos) {
nanos / DateTimeConstants.NANOS_PER_MICROS;
}

/**
* Returns the Julian day and remaining nanoseconds from the given number of microseconds.
*
* Note: supports timestamps since 4717 BC (without negative nanoseconds), compatible with Hive.
*/
public static Tuple2<Integer, Long> toJulianDay(long micros) {
long julianUs = micros + JULIAN_DAY_OF_EPOCH * DateTimeConstants.MICROS_PER_DAY;
long days = julianUs / DateTimeConstants.MICROS_PER_DAY;
long us = julianUs % DateTimeConstants.MICROS_PER_DAY;
return new Tuple2<>((int) days, TimeUnit.MICROSECONDS.toNanos(us));
}

public static long millisToMicros(long millis) {
return Math.multiplyExact(millis, DateTimeConstants.MICROS_PER_MILLIS);
}
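
For illustration, a small sketch of the intended round trip between the new `toJulianDay` and the existing `fromJulianDay`. It assumes `Tuple2` exposes its components as `_1`/`_2`, as elsewhere in the kernel internals.

```
import io.delta.kernel.defaults.internal.DefaultKernelUtils;
import io.delta.kernel.internal.util.Tuple2;

public class JulianDayRoundTrip {
    public static void main(String[] args) {
        // Microseconds since the Unix epoch for an arbitrary timestamp.
        long micros = 1_700_000_000_123_456L;

        // Split into (Julian day, nanoseconds within the day) ...
        Tuple2<Integer, Long> julian = DefaultKernelUtils.toJulianDay(micros);

        // ... and convert back to microseconds.
        long roundTripped = DefaultKernelUtils.fromJulianDay(julian._1, julian._2);

        // The conversion is exact at microsecond precision, so this should print true.
        System.out.println(micros == roundTripped);
    }
}
```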
@@ -116,7 +116,7 @@ public int getSize() {

private void checkColumnOrdinal(int ordinal) {
if (ordinal < 0 || ordinal >= columnVectors.size()) {
throw new IllegalArgumentException("invalid column ordinal");
throw new IllegalArgumentException("invalid column ordinal: " + ordinal);
}
}
}