Skip to content

Commit

Permalink
Added SessionFactoryCreator and related helpers (deephaven#5386)
Browse files Browse the repository at this point in the history
This is the java client related changes to support deephaven#5374. Note, that we may need to prioritize deephaven/web-client-ui#1947 as follow-up.
  • Loading branch information
devinrsmith authored Apr 24, 2024
1 parent 8becc41 commit 66cede6
Show file tree
Hide file tree
Showing 48 changed files with 1,035 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.deephaven.annotations;

import org.immutables.value.Value;
import org.immutables.value.Value.Immutable;
import org.immutables.value.Value.Style.ImplementationVisibility;

import java.lang.annotation.ElementType;
Expand All @@ -12,8 +13,8 @@
import java.lang.annotation.Target;

/**
* A style for objects that should declare a builder interface to use for construction. Recommended for objects with
* more than two fields, or default fields.
* A style for objects that should declare a builder interface to use for construction. Disables
* {@link Immutable#copy()}. Recommended for objects with more than two fields, or default fields.
*/
@Target({ElementType.TYPE, ElementType.PACKAGE})
@Retention(RetentionPolicy.CLASS)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.annotations;

import org.immutables.value.Value;
import org.immutables.value.Value.Style.ImplementationVisibility;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* A style for objects that should declare a builder interface to use for construction and allows copy. Recommended for
* objects with more than two fields, or default fields. If copy is not needed, prefer {@link BuildableStyle}.
*/
@Target({ElementType.TYPE, ElementType.PACKAGE})
@Retention(RetentionPolicy.CLASS)
@Value.Style(visibility = ImplementationVisibility.PACKAGE,
defaults = @Value.Immutable(copy = true),
strictBuilder = true,
weakInterning = true,
jdkOnly = true,
includeHashCode = "getClass().hashCode()")
public @interface CopyableStyle {
// Note: this produces ImmutableX.builder()s for the implementation classes
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
//
package io.deephaven.extensions.s3;

import io.deephaven.annotations.CopyableStyle;
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.log.LogOutputAppendable;
import io.deephaven.configuration.Configuration;
import org.immutables.value.Value;
import org.immutables.value.Value.Check;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;
Expand All @@ -22,12 +22,7 @@
* documented in this class may change in the future. As such, callers may wish to explicitly set the values.
*/
@Immutable
// Almost the same as BuildableStyle, but has copy-ability to support withReadAheadCount
@Value.Style(visibility = Value.Style.ImplementationVisibility.PACKAGE,
defaults = @Value.Immutable(copy = true),
strictBuilder = true,
weakInterning = true,
jdkOnly = true)
@CopyableStyle
public abstract class S3Instructions implements LogOutputAppendable {

private final static int DEFAULT_MAX_CONCURRENT_REQUESTS = 50;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@
@Subcomponent(modules = {SessionImplModule.class, FlightSessionModule.class, BarrageSessionModule.class})
public interface BarrageSubcomponent extends BarrageSessionFactory {

@Override
BarrageSession newBarrageSession();

@Override
ManagedChannel managedChannel();

@Module(subcomponents = {BarrageSubcomponent.class})
interface DeephavenClientSubcomponentModule {

}

@Subcomponent.Builder
interface Builder extends BarrageSessionFactoryBuilder {
interface Builder {
Builder managedChannel(@BindsInstance ManagedChannel channel);

Builder scheduler(@BindsInstance ScheduledExecutorService scheduler);
Expand Down
2 changes: 1 addition & 1 deletion java-client/barrage-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ plugins {
}

dependencies {
implementation project(':java-client-barrage-dagger')
implementation project(':java-client-barrage')
implementation project(':java-client-example-utilities')

Classpaths.inheritJUnitPlatform(project)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
package io.deephaven.client.examples;

import io.deephaven.client.impl.BarrageSession;
import io.deephaven.client.impl.BarrageSessionFactory;
import io.deephaven.client.impl.BarrageSubcomponent.Builder;
import io.deephaven.client.impl.DaggerDeephavenBarrageRoot;
import io.deephaven.client.impl.BarrageSessionFactoryConfig;
import io.deephaven.client.impl.BarrageSessionFactoryConfig.Factory;
import io.deephaven.client.impl.SessionConfig;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.updategraph.impl.PeriodicUpdateGraph;
import io.deephaven.util.SafeCloseable;
Expand Down Expand Up @@ -34,10 +34,14 @@ abstract class BarrageClientExampleBase implements Callable<Void> {
public final Void call() throws Exception {
final BufferAllocator bufferAllocator = new RootAllocator();
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
ManagedChannel managedChannel = ConnectOptions.open(connectOptions);

final Factory factory = BarrageSessionFactoryConfig.builder()
.clientConfig(ConnectOptions.options(connectOptions).config())
.allocator(bufferAllocator)
.scheduler(scheduler)
.build()
.factory();
Runtime.getRuntime()
.addShutdownHook(new Thread(() -> onShutdown(scheduler, managedChannel)));
.addShutdownHook(new Thread(() -> onShutdown(scheduler, factory.managedChannel())));

// Note that a DEFAULT update graph is required for engine operation. Users may wish to create additional update
// graphs for their own purposes, but the DEFAULT must be created first.
Expand All @@ -54,31 +58,28 @@ public final Void call() throws Exception {
.newQueryLibrary()
.setUpdateGraph(updateGraph)
.build();

final Builder builder = DaggerDeephavenBarrageRoot.create().factoryBuilder()
.managedChannel(managedChannel)
.scheduler(scheduler)
.allocator(bufferAllocator);
if (authenticationOptions != null) {
authenticationOptions.ifPresent(builder::authenticationTypeAndValue);
}
final BarrageSessionFactory barrageFactory = builder.build();
final BarrageSession deephavenSession = barrageFactory.newBarrageSession();
try {
try (final SafeCloseable ignored = executionContext.open()) {
try (
final BarrageSession deephavenSession = factory.newBarrageSession(sessionConfig());
final SafeCloseable ignored = executionContext.open()) {
try {
execute(deephavenSession);
} finally {
deephavenSession.close();
deephavenSession.session().closeFuture().get(5, TimeUnit.SECONDS);
}
} finally {
deephavenSession.session().closeFuture().get(5, TimeUnit.SECONDS);
}

scheduler.shutdownNow();
managedChannel.shutdownNow();
factory.managedChannel().shutdownNow();
return null;
}

private SessionConfig sessionConfig() {
final SessionConfig.Builder builder = SessionConfig.builder();
if (authenticationOptions != null) {
authenticationOptions.ifPresent(builder::authenticationTypeAndValue);
}
return builder.build();
}

private static void onShutdown(final ScheduledExecutorService scheduler,
final ManagedChannel managedChannel) {
scheduler.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,23 @@

public class BarrageSession extends FlightSession implements BarrageSubscription.Factory, BarrageSnapshot.Factory {

/**
* Creates a barrage session. Closing the barrage session does <b>not</b> close {@code channel}.
*
* @param session the session
* @param incomingAllocator the incoming allocator
* @param channel the managed channel
* @return the barrage session
*/
public static BarrageSession of(
SessionImpl session, BufferAllocator incomingAllocator, ManagedChannel channel) {
final FlightClient client = FlightGrpcUtilsExtension.createFlightClientWithSharedChannel(
incomingAllocator, channel, Collections.singletonList(new SessionMiddleware(session)));
return new BarrageSession(session, client, channel);
return new BarrageSession(session, client);
}

protected BarrageSession(
final SessionImpl session, final FlightClient client, final ManagedChannel channel) {
final SessionImpl session, final FlightClient client) {
super(session, client);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@
//
package io.deephaven.client.impl;

import io.grpc.ManagedChannel;

public interface BarrageSessionFactory {

/**
* Creates a new {@link BarrageSession}. Closing the session does <b>not</b> close the {@link #managedChannel()}.
*
* @return the new barrage session
*/
BarrageSession newBarrageSession();

/**
* The {@link ManagedChannel} associated with {@code this} factory. Use {@link ManagedChannel#shutdown()} when
* {@code this} factory and sessions are no longer needed.
*
* @return the managed channel
*/
ManagedChannel managedChannel();
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
//
// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
//
package io.deephaven.client.impl;

import io.deephaven.annotations.BuildableStyle;
import io.grpc.ManagedChannel;
import org.apache.arrow.memory.BufferAllocator;
import org.immutables.value.Value.Default;
import org.immutables.value.Value.Immutable;

import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;

@Immutable
@BuildableStyle
public abstract class BarrageSessionFactoryConfig {
private static final SessionConfig SESSION_CONFIG_EMPTY = SessionConfig.builder().build();

public static Builder builder() {
return ImmutableBarrageSessionFactoryConfig.builder();
}

/**
* The client configuration.
*/
public abstract ClientConfig clientConfig();

/**
* The client channel factory. By default is {@link ClientChannelFactory#defaultInstance()}.
*/
@Default
public ClientChannelFactory clientChannelFactory() {
return ClientChannelFactory.defaultInstance();
}

/**
* The default session config, used by the factory when {@link SessionConfig} is not provided. By default is
* {@code SessionConfig.builder().build()}.
*/
@Default
public SessionConfig sessionConfig() {
return SESSION_CONFIG_EMPTY;
}

/**
* The scheduler, used by the factory when {@link SessionConfig#scheduler()} is not set.
*/
public abstract ScheduledExecutorService scheduler();

/**
* The allocator.
*/
public abstract BufferAllocator allocator();

/**
* Creates a new factory with a new {@link ManagedChannel}.
*
* @return the factory
*/
public final Factory factory() {
return new Factory(SessionFactoryConfig.builder()
.clientConfig(clientConfig())
.clientChannelFactory(clientChannelFactory())
.sessionConfig(sessionConfig())
.scheduler(scheduler())
.build()
.factory());
}

public final class Factory implements BarrageSessionFactory {
private final SessionFactoryConfig.Factory factory;

private Factory(SessionFactoryConfig.Factory factory) {
this.factory = Objects.requireNonNull(factory);
}

@Override
public BarrageSession newBarrageSession() {
final Session session = factory.newSession();
return BarrageSession.of((SessionImpl) session, allocator(), factory.managedChannel());
}

/**
* Creates a new {@link BarrageSession} with {@code sessionConfig}. Closing the session does <b>not</b> close
* the {@link #managedChannel()}.
*
* @param sessionConfig the session config
* @return the new barrage session
*/
public BarrageSession newBarrageSession(SessionConfig sessionConfig) {
final Session session = factory.newSession(sessionConfig);
return BarrageSession.of((SessionImpl) session, allocator(), factory.managedChannel());
}

@Override
public ManagedChannel managedChannel() {
return factory.managedChannel();
}
}

// ------------------------------------------------

public interface Builder {

Builder clientConfig(ClientConfig clientConfig);

Builder clientChannelFactory(ClientChannelFactory clientChannelFactory);

Builder sessionConfig(SessionConfig sessionConfig);

Builder scheduler(ScheduledExecutorService scheduler);

Builder allocator(BufferAllocator allocator);

BarrageSessionFactoryConfig build();
}
}
Loading

0 comments on commit 66cede6

Please sign in to comment.