Commit: Added tests for deephaven-examples

malhotrashivam committed Oct 11, 2024
1 parent e4c1ddc commit 4f8aa29

Showing 4 changed files with 95 additions and 137 deletions.

ParquetFileReader.java
@@ -25,8 +25,7 @@
import static io.deephaven.base.FileUtils.convertToURI;

/**
* Top level accessor for a parquet file which can read both from a file path string or a CLI style file URI,
* ex."s3://bucket/key".
 * Top level accessor for a parquet file which can read from a CLI style file URI, ex. "s3://bucket/key".
*/
public class ParquetFileReader {
private static final int FOOTER_LENGTH_SIZE = 4;
@@ -39,25 +38,7 @@ public class ParquetFileReader {
* If reading a single parquet file, root URI is the URI of the file, else the parent directory for a metadata file
*/
private final URI rootURI;
private final MessageType type;

/**
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
* {@link UncheckedIOException}.
*
* @param parquetFile The parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
* @return The new {@link ParquetFileReader}
*/
public static ParquetFileReader create(
@NotNull final File parquetFile,
@NotNull final SeekableChannelsProvider channelsProvider) {
try {
return new ParquetFileReader(convertToURI(parquetFile, false), channelsProvider);
} catch (final IOException e) {
throw new UncheckedIOException("Failed to create Parquet file reader: " + parquetFile, e);
}
}
private final MessageType schema;

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
@@ -91,7 +72,6 @@ private ParquetFileReader(
// Construct a new file URI for the parent directory
rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
rootURI = parquetFileURI;
}
try (
@@ -102,7 +82,7 @@ private ParquetFileReader(
fileMetaData = Util.readFileMetaData(in);
}
}
type = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
schema = fromParquetSchema(fileMetaData.schema, fileMetaData.column_orders);
}

/**
@@ -148,87 +128,6 @@ public SeekableChannelsProvider getChannelsProvider() {
return channelsProvider;
}

private Set<String> columnsWithDictionaryUsedOnEveryDataPage = null;

/**
* Get the name of all columns that we can know for certain (a) have a dictionary, and (b) use the dictionary on all
* data pages.
*
* @return A set of parquet column names that satisfies the required condition.
*/
@SuppressWarnings("unused")
public Set<String> getColumnsWithDictionaryUsedOnEveryDataPage() {
if (columnsWithDictionaryUsedOnEveryDataPage == null) {
columnsWithDictionaryUsedOnEveryDataPage =
calculateColumnsWithDictionaryUsedOnEveryDataPage();
}
return columnsWithDictionaryUsedOnEveryDataPage;
}

/**
* True only if we are certain every data page in this column chunk uses dictionary encoding; note false also covers
* the "we can't tell" case.
*/
private static boolean columnChunkUsesDictionaryOnEveryPage(final ColumnChunk columnChunk) {
final ColumnMetaData columnMeta = columnChunk.getMeta_data();
if (columnMeta.encoding_stats == null) {
return false; // this is false as "don't know".
}
for (PageEncodingStats encodingStat : columnMeta.encoding_stats) {
if (encodingStat.page_type != PageType.DATA_PAGE
&& encodingStat.page_type != PageType.DATA_PAGE_V2) {
// skip non-data pages.
continue;
}
// this is a data page.
if (encodingStat.encoding != Encoding.PLAIN_DICTIONARY
&& encodingStat.encoding != Encoding.RLE_DICTIONARY) {
return false;
}
}
return true;
}

private Set<String> calculateColumnsWithDictionaryUsedOnEveryDataPage() {
final Set<String> result = new HashSet<>(fileMetaData.getSchemaSize());
final List<RowGroup> rowGroups = fileMetaData.getRow_groups();
final Iterator<RowGroup> riter = rowGroups.iterator();
if (!riter.hasNext()) {
// For an empty file we say all columns satisfy the property.
for (SchemaElement se : fileMetaData.getSchema()) {
if (!se.isSetNum_children()) { // We want only the leaves.
result.add(se.getName());
}
}
return result;
}
// On the first pass, for row group zero, we are going to add all columns to the set
// that satisfy the restriction.
// On later passes after zero, we will remove any column that does not satisfy
// the restriction.
final RowGroup rg0 = riter.next();
for (ColumnChunk columnChunk : rg0.columns) {
if (columnChunkUsesDictionaryOnEveryPage(columnChunk)) {
final String parquetColumnName = columnChunk.getMeta_data().path_in_schema.get(0);
result.add(parquetColumnName);
}
}

while (riter.hasNext()) {
final RowGroup rowGroup = riter.next();
for (ColumnChunk columnChunk : rowGroup.columns) {
final String parquetColumnName = columnChunk.getMeta_data().path_in_schema.get(0);
if (!result.contains(parquetColumnName)) {
continue;
}
if (!columnChunkUsesDictionaryOnEveryPage(columnChunk)) {
result.remove(parquetColumnName);
}
}
}
return result;
}

/**
* Create a {@link RowGroupReader} object for provided row group number
*
@@ -239,7 +138,7 @@ public RowGroupReader getRowGroup(final int groupNumber, final String version) {
fileMetaData.getRow_groups().get(groupNumber),
channelsProvider,
rootURI,
type,
schema,
getSchema(),
version);
}
@@ -463,24 +362,7 @@ private static LogicalTypeAnnotation getLogicalTypeFromConvertedType(
}
}

/**
* Helper method to determine if a logical type is adjusted to UTC.
*
* @param logicalType the logical type to check
* @return true if the logical type is a timestamp adjusted to UTC, false otherwise
*/
private static boolean isAdjustedToUTC(final LogicalType logicalType) {
if (logicalType.getSetField() == LogicalType._Fields.TIMESTAMP) {
return logicalType.getTIMESTAMP().isAdjustedToUTC;
}
return false;
}

public MessageType getSchema() {
return type;
}

public int rowGroupCount() {
return fileMetaData.getRow_groups().size();
return schema;
}
}
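
The net effect of the ParquetFileReader changes is that callers now go through the URI-based factory only, and the parsed Parquet schema is held in a field named schema rather than type. A minimal caller-side sketch, assuming the URI overload keeps the create(...) name and the SeekableChannelsProvider parameter that the removed File overload had (not part of this commit):

// Sketch only. The create(URI, SeekableChannelsProvider) signature is inferred from the
// remaining javadoc; obtaining a SeekableChannelsProvider is elided here.
static MessageType readSchema(final URI parquetFileURI, final SeekableChannelsProvider provider) {
    final ParquetFileReader reader = ParquetFileReader.create(parquetFileURI, provider);
    return reader.getSchema(); // now backed by the renamed 'schema' field
}
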
ParquetDeephavenExamplesTest.java
@@ -0,0 +1,77 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.parquet.table;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.testutil.junit4.EngineCleanup;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

import java.nio.file.Path;

import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static org.junit.Assume.assumeTrue;

/**
* Assumes that there is already a checkout of <a href="https://github.com/deephaven/examples">deephaven-examples</a>.
* This is currently meant to be run locally.
*/
public class ParquetDeephavenExamplesTest {

private static final Path CHECKOUT_ROOT = null; // Path.of("/path/to/deephaven-examples");

@Rule
public final EngineCleanup framework = new EngineCleanup();

@BeforeClass
public static void beforeClass() {
assumeTrue(CHECKOUT_ROOT != null);
}

@Test
public void pems() {
read("Pems/parquet/pems");
}

@Test
public void crypto() {
read("CryptoCurrencyHistory/Parquet/crypto.parquet");
read("CryptoCurrencyHistory/Parquet/crypto_sept7.parquet");
read("CryptoCurrencyHistory/Parquet/crypto_sept8.parquet");
read("CryptoCurrencyHistory/Parquet/CryptoTrades_20210922.parquet");
read("CryptoCurrencyHistory/Parquet/FakeCryptoTrades_20230209.parquet");
}

@Test
public void taxi() {
read("Taxi/parquet/taxi.parquet");
}

@Test
public void sensorData() {
read("SensorData/parquet/SensorData_gzip.parquet");
}

@Test
public void grades() {
assertTableEquals(
read("ParquetExamples/grades"),
read("ParquetExamples/grades_meta"));
assertTableEquals(
read("ParquetExamples/grades_flat").sort("Name", "Class"),
read("ParquetExamples/grades_flat_meta").sort("Name", "Class"));
assertTableEquals(
read("ParquetExamples/grades_kv").sort("Name", "Class"),
read("ParquetExamples/grades_kv_meta").sort("Name", "Class"));
}

private static Table read(String name) {
final String path = CHECKOUT_ROOT
.resolve(name)
.toUri()
.toString();
return ParquetTools.readTable(path).select();
}
}
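
Note that the test class is skipped by default: beforeClass() calls assumeTrue(CHECKOUT_ROOT != null), so with the constant left at null every test in the class is ignored rather than failed. Running it locally only requires pointing CHECKOUT_ROOT at a checkout of deephaven-examples; a sketch with a hypothetical path:

// Hypothetical local configuration; replace the placeholder with your own checkout location.
private static final Path CHECKOUT_ROOT = Path.of("/home/me/deephaven-examples");
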

CredentialsTest.java
@@ -3,12 +3,11 @@
//
package io.deephaven.extensions.s3;


import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

public class CredentialsTest {
class CredentialsTest {

@Test
void defaultCredentials() {

S3InstructionsTest.java
@@ -12,10 +12,10 @@
import java.time.Duration;
import java.util.Optional;

import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.fail;

public class S3InstructionsTest {
class S3InstructionsTest {

@Test
void defaults() {
@@ -63,7 +63,7 @@ void testMinMaxConcurrentRequests() {
.regionName("some-region")
.maxConcurrentRequests(-1)
.build();
fail("Expected exception");
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("maxConcurrentRequests");
}
@@ -76,7 +76,7 @@ void tooSmallMaxConcurrentRequests() {
.regionName("some-region")
.maxConcurrentRequests(0)
.build();
fail("Expected exception");
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("maxConcurrentRequests");
}
@@ -99,7 +99,7 @@ void tooSmallReadAheadCount() {
.regionName("some-region")
.readAheadCount(-1)
.build();
fail("Expected exception");
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("readAheadCount");
}
@@ -122,7 +122,7 @@ void tooSmallFragmentSize() {
.regionName("some-region")
.fragmentSize(8 * (1 << 10) - 1)
.build();
fail("Expected exception");
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("fragmentSize");
}
@@ -145,7 +145,7 @@ void badCredentials() {
.regionName("some-region")
.credentials(new Credentials() {})
.build();
fail("Expected exception");
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("credentials");
}
@@ -158,7 +158,7 @@ void tooSmallWritePartSize() {
.regionName("some-region")
.writePartSize(1024)
.build();
fail("Expected exception");
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("writePartSize");
}
@@ -171,7 +171,7 @@ void tooSmallNumConcurrentWriteParts() {
.regionName("some-region")
.numConcurrentWriteParts(0)
.build();
fail("Expected exception");
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("numConcurrentWriteParts");
}
@@ -185,7 +185,7 @@ void tooLargeNumConcurrentWriteParts() {
.numConcurrentWriteParts(1001)
.maxConcurrentRequests(1000)
.build();
fail("Expected exception");
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("numConcurrentWriteParts");
}
@@ -239,7 +239,7 @@ void testBadConfigFilePath() {
.configFilePath("/some/random/path")
.build()
.aggregatedProfileFile();
fail("Expected exception");
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
assertThat(e).hasMessageContaining("/some/random/path");
}
@@ -252,7 +252,7 @@ void testBadCredentialsFilePath() {
.credentialsFilePath("/some/random/path")
.build()
.aggregatedProfileFile();
fail("Expected exception");
failBecauseExceptionWasNotThrown(IllegalStateException.class);
} catch (IllegalStateException e) {
assertThat(e).hasMessageContaining("/some/random/path");
}
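
The assertion change repeated throughout S3InstructionsTest replaces the generic fail("Expected exception") with AssertJ's failBecauseExceptionWasNotThrown, which names the exception type that was expected when the builder call unexpectedly succeeds. An equivalent AssertJ form that collapses the try/catch entirely, shown only as a sketch of an alternative and not what the commit adopts:

// Alternative AssertJ style (sketch, not part of this commit).
assertThatThrownBy(() -> S3Instructions.builder()
        .regionName("some-region")
        .maxConcurrentRequests(-1)
        .build())
        .isInstanceOf(IllegalArgumentException.class)
        .hasMessageContaining("maxConcurrentRequests");
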
