table_definition will be optional for parquet batch_write (deepha…
malhotrashivam authored Jun 10, 2024
1 parent 6512b0e commit df2733c
Showing 4 changed files with 136 additions and 30 deletions.
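For orientation, here is a minimal sketch of the call pattern this commit enables for the Python API. The tables and file names below are illustrative and not taken from the commit; the behavior shown (omitting table_definition when the sources share a definition) is what the changes below implement.

    from deephaven import empty_table
    from deephaven.parquet import batch_write

    # Two tables with identical column names and types.
    t1 = empty_table(5).update(["x = i", "y = (double) i"])
    t2 = empty_table(5).update(["x = i * 2", "y = (double) (i * 2)"])

    # After this change, table_definition may be omitted: the common definition
    # implied by the sources is used. If the sources had different definitions,
    # this call would raise a DHError unless table_definition were supplied.
    batch_write([t1, t2], ["t1.parquet", "t2.parquet"])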
@@ -851,8 +851,9 @@ private static Collection<List<String>> indexedColumnNames(@NotNull final Table

/**
* Write out tables to disk. Data indexes to write are determined by those already present on the first source or
* those provided through {@link ParquetInstructions.Builder#addIndexColumns}. The {@link TableDefinition} to use
* for writing must be provided as part of {@link ParquetInstructions}.
* those provided through {@link ParquetInstructions.Builder#addIndexColumns}. If all source tables have the same
* definition, this method will use the common definition for writing. Otherwise, a definition must be provided
* through the {@code writeInstructions}.
*
* @param sources The tables to write
* @param destinations The destination paths or URIs. Any non-existing directories in the paths provided are
@@ -864,19 +865,35 @@ public static void writeTables(
@NotNull final Table[] sources,
@NotNull final String[] destinations,
@NotNull final ParquetInstructions writeInstructions) {
final Collection<List<String>> indexColumns =
writeInstructions.getIndexColumns().orElseGet(() -> indexedColumnNames(sources));
final TableDefinition definition = writeInstructions.getTableDefinition().orElseThrow(
() -> new IllegalArgumentException("Table definition must be provided"));
if (sources.length == 0) {
throw new IllegalArgumentException("No source tables provided for writing");
}
if (sources.length != destinations.length) {
throw new IllegalArgumentException("Number of sources and destinations must match");
}
final TableDefinition definition;
if (writeInstructions.getTableDefinition().isPresent()) {
definition = writeInstructions.getTableDefinition().get();
} else {
final TableDefinition firstDefinition = sources[0].getDefinition();
for (int idx = 1; idx < sources.length; idx++) {
if (!firstDefinition.equals(sources[idx].getDefinition())) {
throw new IllegalArgumentException(
"Table definition must be provided when writing multiple tables " +
"with different definitions");
}
}
definition = firstDefinition;
}
final File[] destinationFiles = new File[destinations.length];
for (int i = 0; i < destinations.length; i++) {
final URI destinationURI = convertToURI(destinations[i], false);
for (int idx = 0; idx < destinations.length; idx++) {
final URI destinationURI = convertToURI(destinations[idx], false);
if (!FILE_URI_SCHEME.equals(destinationURI.getScheme())) {
throw new IllegalArgumentException(
"Only file URI scheme is supported for writing parquet files, found" +
"non-file URI: " + destinations[i]);
"non-file URI: " + destinations[idx]);
}
destinationFiles[i] = new File(destinationURI);
destinationFiles[idx] = new File(destinationURI);
}
final File metadataRootDir;
if (writeInstructions.generateMetadataFiles()) {
@@ -893,7 +910,8 @@ public static void writeTables(
} else {
metadataRootDir = null;
}

final Collection<List<String>> indexColumns =
writeInstructions.getIndexColumns().orElseGet(() -> indexedColumnNames(sources));
final Map<String, Map<ParquetCacheTags, Object>> computedCache =
buildComputedCache(() -> PartitionedTableFactory.ofTables(definition, sources).merge(), definition);
// We do not have any additional schema for partitioning columns in this case. Schema for all columns will be
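As a side note, the definition-resolution rule added above can be summarized in a few lines. The following Python sketch is illustrative only (the function name resolve_definition is invented for this note and is not part of the commit); it mirrors the Java logic: prefer an explicitly provided definition, otherwise require every source table to share one common definition.

    from typing import Optional, Sequence

    def resolve_definition(source_definitions: Sequence[object],
                           explicit_definition: Optional[object] = None) -> object:
        # Prefer the definition supplied through the write instructions.
        if explicit_definition is not None:
            return explicit_definition
        if not source_definitions:
            raise ValueError("No source tables provided for writing")
        # Otherwise, all source tables must agree on a single definition.
        first = source_definitions[0]
        for definition in source_definitions[1:]:
            if definition != first:
                raise ValueError("Table definition must be provided when writing "
                                 "multiple tables with different definitions")
        return first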
@@ -711,7 +711,7 @@ public void flatPartitionedParquetWithMetadataTest() throws IOException {
// Write without any metadata files
writeTables(new Table[] {someTable, someTable},
new String[] {firstDataFile.getPath(), secondDataFile.getPath()},
ParquetInstructions.EMPTY.withTableDefinition(someTable.getDefinition()));
ParquetInstructions.EMPTY);
final Table source = readTable(parentDir.getPath()).select();

// Now write with metadata files
@@ -722,7 +722,7 @@ public void flatPartitionedParquetWithMetadataTest() throws IOException {
.build();
writeTables(new Table[] {someTable, someTable},
new String[] {firstDataFile.getPath(), secondDataFile.getPath()},
writeInstructions.withTableDefinition(someTable.getDefinition()));
writeInstructions);

final Table fromDisk = readTable(parentDir.getPath());
assertTableEquals(source, fromDisk);
@@ -753,7 +753,7 @@ public void flatPartitionedParquetWithMetadataTest() throws IOException {
try {
writeTables(new Table[] {someTable, someTable},
new String[] {firstDataFile.getPath(), updatedSecondDataFile.getPath()},
writeInstructions.withTableDefinition(someTable.getDefinition()));
writeInstructions);
fail("Expected exception when writing the metadata files for tables with different parent directories");
} catch (final RuntimeException expected) {
}
@@ -774,8 +774,7 @@ public void flatPartitionedParquetWithBigDecimalMetadataTest() throws IOException {
.setGenerateMetadataFiles(true)
.build();
final Table[] sources = new Table[] {firstTable, secondTable};
writeTables(sources, new String[] {firstDataFile.getPath(), secondDataFile.getPath()},
writeInstructions.withTableDefinition(firstTable.getDefinition()));
writeTables(sources, new String[] {firstDataFile.getPath(), secondDataFile.getPath()}, writeInstructions);

// Merge the tables and compute the precision and scale as per the union of the two tables
final Table expected =
@@ -1782,7 +1781,7 @@ private interface TestParquetTableWriter {
(sourceTable, destFile) -> writeTable(sourceTable, destFile.getPath());
private static final TestParquetTableWriter MULTI_WRITER =
(table, destFile) -> writeTables(new Table[] {table}, new String[] {destFile.getPath()},
ParquetInstructions.EMPTY.withTableDefinition(table.getDefinition()));
ParquetInstructions.EMPTY);

/**
* Verify that the parent directory contains the expected parquet files and index files in the right directory
Expand Down Expand Up @@ -2014,6 +2013,77 @@ public void writeMultiTableExceptionTest() {
assertEquals(0, parentDir.list().length);
}

@Test
public void writeMultiTableDefinitionTest() {
// Create an empty parent directory
final File parentDir = new File(rootFile, "tempDir");
parentDir.mkdir();

final int numRows = 5;
final Table firstTable = TableTools.emptyTable(numRows)
.updateView("A = Long.toString(ii)", "B=(long)ii");
final File firstDestFile = new File(parentDir, "firstTable.parquet");

final Table secondTable = TableTools.emptyTable(numRows)
.updateView("A = Long.toString(ii*5)", "B=(long)(ii*5)");
final File secondDestFile = new File(parentDir, "secondTable.parquet");

final Table[] tablesToSave = new Table[] {firstTable, secondTable};
final String[] destinations = new String[] {firstDestFile.getPath(), secondDestFile.getPath()};

try {
writeTables(tablesToSave, new String[] {firstDestFile.getPath()},
ParquetInstructions.EMPTY.withTableDefinition(firstTable.getDefinition()));
TestCase.fail("Exception expected becuase of mismatch in number of tables and destinations");
} catch (final IllegalArgumentException expected) {
}

// Writing a single table without definition should work
writeTables(new Table[] {firstTable}, new String[] {firstDestFile.getPath()}, ParquetInstructions.EMPTY);
checkSingleTable(firstTable, firstDestFile);
assertTrue(firstDestFile.delete());

// Writing a single table with definition should work
writeTables(new Table[] {firstTable}, new String[] {firstDestFile.getPath()},
ParquetInstructions.EMPTY.withTableDefinition(firstTable.view("A").getDefinition()));
checkSingleTable(firstTable.view("A"), firstDestFile);
assertTrue(firstDestFile.delete());

// Writing multiple tables which have the same definition should work
writeTables(tablesToSave, destinations, ParquetInstructions.EMPTY);
checkSingleTable(firstTable, firstDestFile);
checkSingleTable(secondTable, secondDestFile);
assertTrue(firstDestFile.delete());
assertTrue(secondDestFile.delete());

// Writing multiple tables which have different definitions should not work
final Table thirdTable = TableTools.emptyTable(numRows)
.updateView("A = Long.toString(ii*10)", "B=(int)(ii*10)");
final File thirdDestFile = new File(parentDir, "thirdTable.parquet");
try {
writeTables(new Table[] {firstTable, thirdTable},
new String[] {firstDestFile.getPath(), thirdDestFile.getPath()},
ParquetInstructions.EMPTY);
TestCase.fail("Exception expected becuase of mismatch in table definitions");
} catch (final IllegalArgumentException expected) {
}

// Taking views with the same definition should work
writeTables(new Table[] {firstTable.view("A"), thirdTable.view("A")},
new String[] {firstDestFile.getPath(), thirdDestFile.getPath()}, ParquetInstructions.EMPTY);
checkSingleTable(firstTable.view("A"), firstDestFile);
checkSingleTable(thirdTable.view("A"), thirdDestFile);
assertTrue(firstDestFile.delete());
assertTrue(thirdDestFile.delete());

// Providing a definition should work
writeTables(new Table[] {firstTable, thirdTable},
new String[] {firstDestFile.getPath(), thirdDestFile.getPath()},
ParquetInstructions.EMPTY.withTableDefinition(firstTable.view("A").getDefinition()));
checkSingleTable(firstTable.view("A"), firstDestFile);
checkSingleTable(thirdTable.view("A"), thirdDestFile);
}

@Test
public void writingParquetFilesWithSpacesInName() {
final String parentDirName = "tempDir";
@@ -2287,8 +2357,7 @@ public void writeMultiTableIndexTest() {

Table[] tablesToSave = new Table[] {firstTable, secondTable};
final String[] destinations = new String[] {firstDestFile.getPath(), secondDestFile.getPath()};
writeTables(tablesToSave, destinations,
ParquetInstructions.EMPTY.withTableDefinition(firstTable.getDefinition()));
writeTables(tablesToSave, destinations, ParquetInstructions.EMPTY);

String firstIndexFilePath = ".dh_metadata/indexes/vvv/index_vvv_firstTable.parquet";
String secondIndexFilePath = ".dh_metadata/indexes/vvv/index_vvv_secondTable.parquet";
9 changes: 5 additions & 4 deletions py/server/deephaven/parquet.py
@@ -397,7 +397,7 @@ def write_partitioned(
def batch_write(
tables: List[Table],
paths: List[str],
table_definition: Union[Dict[str, DType], List[Column]],
table_definition: Optional[Union[Dict[str, DType], List[Column]]] = None,
col_instructions: Optional[List[ColumnInstruction]] = None,
compression_codec_name: Optional[str] = None,
max_dictionary_keys: Optional[int] = None,
@@ -415,9 +415,10 @@ def batch_write(
paths (List[str]): the destination paths. Any non-existing directories in the paths provided are
created. If there is an error, any intermediate directories previously created are removed; note this makes
this method unsafe for concurrent use
table_definition (Union[Dict[str, DType], List[Column]]): the table definition to use for writing, instead of
the definitions implied by the tables. This definition can be used to skip some columns or add additional
columns with null values.
table_definition (Optional[Union[Dict[str, DType], List[Column]]]): the table definition to use for writing.
This definition can be used to skip some columns or add additional columns with null values. Defaults to
None, in which case the common definition implied by the tables is used when all tables share the same
definition; otherwise, this parameter must be specified.
col_instructions (Optional[List[ColumnInstruction]]): instructions for customizations while writing
compression_codec_name (Optional[str]): the compression codec to use. Allowed values include "UNCOMPRESSED",
"SNAPPY", "GZIP", "LZO", "LZ4", "LZ4_RAW", "ZSTD", etc. If not specified, defaults to "SNAPPY".
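To illustrate the updated table_definition behavior documented above, here is a short hedged example of both modes; the tables and file names are invented for illustration. When the sources share a definition, table_definition can be omitted; an explicit definition still works as before and can narrow the written columns.

    from deephaven import dtypes, empty_table
    from deephaven.parquet import batch_write

    t1 = empty_table(3).update(["x = i", "y = (double) (i / 10.0)"])
    t2 = empty_table(3).update(["x = i * 2", "y = (double) (i / 5.0)"])

    # Same definitions: table_definition may be omitted.
    batch_write([t1, t2], ["a.parquet", "b.parquet"])

    # Explicit definition: write only column x from both tables.
    batch_write([t1, t2], ["a.parquet", "b.parquet"],
                table_definition={"x": dtypes.int32})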
32 changes: 25 additions & 7 deletions py/server/tests/test_parquet.py
@@ -688,13 +688,7 @@ def test_write_with_index_columns(self):
shutil.rmtree(".dh_metadata")

second_table = empty_table(10).update(formulas=["x=i*5", "y=(double)(i/5.0)", "z=(double)(i*i*i)"])
table_definition = {
"x": dtypes.int32,
"y": dtypes.double,
"z": dtypes.double,
}
batch_write([first_table, second_table], ["X.parquet", "Y.parquet"], index_columns=[["x"], ["y", "z"]],
table_definition=table_definition)
batch_write([first_table, second_table], ["X.parquet", "Y.parquet"], index_columns=[["x"], ["y", "z"]])
from_disk_first_table = read("X.parquet")
self.assert_table_equals(first_table, from_disk_first_table)
from_disk_second_table = read("Y.parquet")
@@ -752,5 +746,29 @@ def test_v2_pages_helper(dh_table):
dh_table2 = self.get_table_with_array_data()
test_v2_pages_helper(dh_table2)

def test_batch_write_definition_handling(self):
table = empty_table(3).update(
formulas=["x=i", "y=(double)(i/10.0)", "z=(double)(i*i)"]
)
table2 = empty_table(3).update(
formulas=["x=i*2", "y=(double)(i/5.0)", "z=(double)(i*i*i)"]
)
# Should succeed because both tables have the same definition
batch_write([table, table2], ["X.parquet", "Y.parquet"])
self.assert_table_equals(read("X.parquet"), table)
self.assert_table_equals(read("Y.parquet"), table2)

table_definition = {
"x": dtypes.int32,
"y": dtypes.double,
}
batch_write([table, table2], ["X.parquet", "Y.parquet"], table_definition=table_definition)
self.assert_table_equals(read("X.parquet"), table.view(["x", "y"]))
self.assert_table_equals(read("Y.parquet"), table2.view(["x", "y"]))

# Fails because we don't provide a table definition and the tables have different definitions
with self.assertRaises(DHError):
batch_write([table, table2.view(["x", "y"])], ["X.parquet", "Y.parquet"])

if __name__ == '__main__':
unittest.main()
