Skip to content

Commit

Permalink
JSON configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith committed Mar 6, 2024
1 parent ef123cb commit a49987d
Show file tree
Hide file tree
Showing 145 changed files with 12,924 additions and 5 deletions.
2 changes: 1 addition & 1 deletion buildSrc/src/main/groovy/Classpaths.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class Classpaths {

static final String JACKSON_GROUP = 'com.fasterxml.jackson'
static final String JACKSON_NAME = 'jackson-bom'
static final String JACKSON_VERSION = '2.14.1'
static final String JACKSON_VERSION = '2.16.1'

static final String SSLCONTEXT_GROUP = 'io.github.hakky54'
static final String SSLCONTEXT_VERSION = '8.1.1'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.table.impl.NoSuchColumnException;
import io.deephaven.io.log.impl.LogOutputStringImpl;
import io.deephaven.qst.column.header.ColumnHeader;
import io.deephaven.qst.type.Type;
import org.jetbrains.annotations.NotNull;

import java.util.Map.Entry;
Expand Down Expand Up @@ -49,7 +50,7 @@ public static TableDefinition inferFrom(@NotNull final Map<String, ? extends Col
return new TableDefinition(definitions);
}

public static TableDefinition from(@NotNull final Iterable<ColumnHeader<?>> headers) {
public static TableDefinition from(@NotNull final Iterable<? extends ColumnHeader<?>> headers) {
final List<ColumnDefinition<?>> definitions = new ArrayList<>();
for (ColumnHeader<?> columnHeader : headers) {
final ColumnDefinition<?> columnDefinition = ColumnDefinition.from(columnHeader);
Expand All @@ -58,6 +59,30 @@ public static TableDefinition from(@NotNull final Iterable<ColumnHeader<?>> head
return new TableDefinition(definitions);
}

public static TableDefinition from(
final Collection<String> columnNames,
final Collection<Type<?>> columnTypes) {
final int size = columnNames.size();
if (columnTypes.size() != size) {
throw new IllegalArgumentException("todo");
}
final List<ColumnDefinition<?>> definitions = new ArrayList<>(size);
final Iterator<String> names = columnNames.iterator();
final Iterator<Type<?>> types = columnTypes.iterator();
while (names.hasNext()) {
if (!types.hasNext()) {
throw new IllegalStateException("Iterator / Collection error");
}
final String name = names.next();
final Type<?> type = types.next();
definitions.add(ColumnDefinition.of(name, type));
}
if (types.hasNext()) {
throw new IllegalStateException("Iterator / Collection error");
}
return new TableDefinition(definitions);
}

/**
* Convenience factory method for use with parallel arrays of column names and data types. All
* {@link ColumnDefinition column definitions} will have default {@link ColumnDefinition#getColumnType() component
Expand Down
3 changes: 3 additions & 0 deletions engine/processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ dependencies {
api project(':qst-type')
api project(':engine-chunk')

implementation project(':engine-time')
Classpaths.inheritImmutables(project)

Classpaths.inheritJUnitPlatform(project)
Classpaths.inheritAssertJ(project)
testImplementation 'org.junit.jupiter:junit-jupiter'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.processor;

import io.deephaven.annotations.BuildableStyle;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Immutable;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@Immutable
@BuildableStyle
public abstract class NamedObjectProcessor<T> {

public static <T> Builder<T> builder() {
return ImmutableNamedObjectProcessor.builder();
}

public static <T> NamedObjectProcessor<T> of(ObjectProcessor<T> processor, String... names) {
return NamedObjectProcessor.<T>builder().processor(processor).addColumnNames(names).build();
}

public static <T> NamedObjectProcessor<T> of(ObjectProcessor<T> processor, Iterable<String> names) {
return NamedObjectProcessor.<T>builder().processor(processor).addAllColumnNames(names).build();
}

public static <T> NamedObjectProcessor<T> prefix(ObjectProcessor<T> processor, String prefix) {
final int size = processor.size();
if (size == 1) {
return of(processor, prefix);
}
return of(processor, IntStream.range(0, size).mapToObj(ix -> prefix + "_" + ix).collect(Collectors.toList()));
}

public abstract ObjectProcessor<T> processor();

public abstract List<String> columnNames();

public interface Builder<T> {
Builder<T> processor(ObjectProcessor<T> processor);

Builder<T> addColumnNames(String element);

Builder<T> addColumnNames(String... elements);

Builder<T> addAllColumnNames(Iterable<String> elements);

NamedObjectProcessor<T> build();
}


public interface Provider {

/**
* Creates a named object processor that can process the input type {@code inputType}.
*
* @param inputType the input type
* @return the named object processor
* @param <T> the input type
*/
<T> NamedObjectProcessor<? super T> named(Class<T> inputType);
}

@Check
final void checkSizes() {
if (columnNames().size() != processor().size()) {
throw new IllegalArgumentException(
String.format("Unmatched sizes; columnNames().size()=%d, processor().size()=%d",
columnNames().size(), processor().size()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import io.deephaven.qst.type.ShortType;
import io.deephaven.qst.type.Type;

import java.time.Instant;
import java.util.List;
import java.util.function.Function;

/**
* An interface for processing data from one or more input objects into output chunks on a 1-to-1 input record to output
Expand Down Expand Up @@ -82,6 +82,47 @@ static <T> ObjectProcessor<T> noop(List<Type<?>> outputTypes, boolean fillWithNu
return new ObjectProcessorNoop<>(outputTypes, fillWithNullValue);
}

/**
* Creates a "combined" object processor. The resulting {@link ObjectProcessor#outputTypes()} is the combined
* in-order outputs from {@code processors} and the {@link ObjectProcessor#processAll(ObjectChunk, List)}
* implementation is the in-order processing of {@code processors}.
*
* @param processors the processors
* @return the combined object processor
* @param <T> the object type
*/
static <T> ObjectProcessor<T> combined(List<ObjectProcessor<? super T>> processors) {
return ObjectProcessorCombined.of(processors);
}

/**
* Creates a mapped object processor by applying the function {@code f} to the in chunk of type {@code T} to produce
* a mapped in chunk of type {@code R}, and then delegates to {@code delegate} for processing.
*
* @param f the function
* @param delegate the delegate
* @return the mapped object processor
* @param <T> the in type
* @param <R> the mapped in type
*/
static <T, R> ObjectProcessor<T> map(Function<? super T, ? extends R> f, ObjectProcessor<? super R> delegate) {
return ObjectProcessorMap.of(f, delegate);
}

/**
*
* @param inType the input type
* @return the object processor
* @param <T> the input type
*/
static <T> ObjectProcessor<T> simple(GenericType<T> inType) {
return ObjectProcessorSimple.of(inType);
}

static <T> ObjectProcessor<T> empty() {
return ObjectProcessorEmpty.of();
}

/**
* The relationship between {@link #outputTypes() output types} and the {@link #processAll(ObjectChunk, List)
* processAll out param} {@link WritableChunk#getChunkType()}.
Expand Down Expand Up @@ -141,6 +182,15 @@ static ChunkType chunkType(Type<?> type) {
return ObjectProcessorTypes.of(type);
}

/**
* The number of outputs. Equivalent to {@code outputTypes().size()}.
*
* @return the number of outputs
*/
default int size() {
return outputTypes().size();
}

/**
* The logical output types {@code this} instance processes. The size and types correspond to the expected size and
* {@link io.deephaven.chunk.ChunkType chunk types} for {@link #processAll(ObjectChunk, List)} as specified by
Expand Down Expand Up @@ -168,4 +218,16 @@ static ChunkType chunkType(Type<?> type) {
* at least {@code in.size()}
*/
void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> out);

interface Provider {

/**
* Creates an object processor that can process the input type {@code inputType}.
*
* @param inputType the input type
* @return the object processor
* @param <T> the input type
*/
<T> ObjectProcessor<? super T> processor(Class<T> inputType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.processor;

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.qst.type.Type;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ObjectProcessorCombined<T> implements ObjectProcessor<T> {

public static <T> ObjectProcessor<T> of(List<ObjectProcessor<? super T>> processors) {
final List<ObjectProcessor<? super T>> actual = processors.stream()
.flatMap(ObjectProcessorCombined::destructure)
.collect(Collectors.toUnmodifiableList());
if (actual.isEmpty()) {
return ObjectProcessor.empty();
}
if (actual.size() == 1) {
// noinspection unchecked
return (ObjectProcessor<T>) actual.get(0);
}
return new ObjectProcessorCombined<>(actual);
}

private static <T> Stream<ObjectProcessor<? super T>> destructure(ObjectProcessor<T> processor) {
return processor instanceof ObjectProcessorCombined
? ((ObjectProcessorCombined<T>) processor).processors.stream()
: (processor instanceof ObjectProcessorEmpty
? Stream.empty()
: Stream.of(processor));
}


private final List<ObjectProcessor<? super T>> processors;

private ObjectProcessorCombined(List<ObjectProcessor<? super T>> processors) {
this.processors = Objects.requireNonNull(processors);
}

@Override
public int size() {
return processors.stream().mapToInt(ObjectProcessor::size).sum();
}

@Override
public List<Type<?>> outputTypes() {
return processors.stream()
.map(ObjectProcessor::outputTypes)
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

@Override
public void processAll(ObjectChunk<? extends T, ?> in, List<WritableChunk<?>> out) {
int outIx = 0;
for (ObjectProcessor<? super T> processor : processors) {
final int toIx = outIx + processor.size();
processor.processAll(in, out.subList(outIx, toIx));
outIx = toIx;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
*/
package io.deephaven.processor;

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.qst.type.Type;

import java.util.List;

enum ObjectProcessorEmpty implements ObjectProcessor<Object> {
OBJECT_PROCESSOR_EMPTY;

public static <T> ObjectProcessor<T> of() {
// noinspection unchecked
return (ObjectProcessor<T>) OBJECT_PROCESSOR_EMPTY;
}

@Override
public List<Type<?>> outputTypes() {
return List.of();
}

@Override
public void processAll(ObjectChunk<?, ?> in, List<WritableChunk<?>> out) {

}
}
Loading

0 comments on commit a49987d

Please sign in to comment.