Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
josephcummings committed Mar 11, 2024
1 parent bf8f11c commit 2dce385
Show file tree
Hide file tree
Showing 43 changed files with 322 additions and 132 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -171,5 +171,5 @@ fabric.properties
# Avoid ignoring Gradle wrapper jar file (.jar files are usually ignored)
!gradle-wrapper.jar

# Generated certs
# Generated certificates
certs/
2 changes: 2 additions & 0 deletions db-client-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ dependencies {
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"
implementation 'org.slf4j:slf4j-api:2.0.9'
implementation "org.bouncycastle:bcprov-jdk18on:1.77"
implementation "org.bouncycastle:bcpkix-jdk18on:1.77"

testImplementation "org.junit.jupiter:junit-jupiter-api:${junitVersion}"
testImplementation "org.junit.jupiter:junit-jupiter-params:${junitVersion}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public AbortProjection(final GrpcClient client, final String projectionName, fin
}

public CompletableFuture execute() {
return this.client.run(channel -> {
return this.client.run(options.getUserCertificate(), channel -> {
Projectionmanagement.DisableReq.Options.Builder optionsBuilder =
Projectionmanagement.DisableReq.Options.newBuilder()
.setName(this.projectionName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ protected Persistent.CreateReq.Settings.Builder createSettings(){

@SuppressWarnings({"unchecked", "deprecation"})
public CompletableFuture execute() {
return this.client.runWithArgs(args -> {
return this.client.runWithArgs(options.getUserCertificate(), args -> {
CompletableFuture result = new CompletableFuture();
PersistentSubscriptionsGrpc.PersistentSubscriptionsStub client =
GrpcUtils.configureStub(PersistentSubscriptionsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public AbstractDeletePersistentSubscription(GrpcClient client, String group, Del

@SuppressWarnings("unchecked")
public CompletableFuture execute() {
return this.client.runWithArgs(args -> {
return this.client.runWithArgs(options.getUserCertificate(), args -> {
CompletableFuture result = new CompletableFuture();
PersistentSubscriptionsGrpc.PersistentSubscriptionsStub client =
GrpcUtils.configureStub(PersistentSubscriptionsGrpc.newStub(args.getChannel()), this.client.getSettings(), this.options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void subscribe(Subscriber<? super ReadMessage> subscriber) {
subscriber.onSubscribe(readSubscription);

CompletableFuture<ReadSubscription> result = new CompletableFuture<>();
this.client.run(channel -> {
this.client.run(options.getUserCertificate(), channel -> {
StreamsOuterClass.ReadReq request = StreamsOuterClass.ReadReq.newBuilder()
.setOptions(createOptions())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ protected AbstractRegularSubscription(GrpcClient client, OptionsBase options) {

@SuppressWarnings("unchecked")
public CompletableFuture<Subscription> execute() {
return this.client.run(channel -> {
return this.client.run(options.getUserCertificate(), channel -> {
StreamsOuterClass.ReadReq readReq = StreamsOuterClass.ReadReq.newBuilder()
.setOptions(createOptions())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public AbstractSubscribePersistentSubscription(GrpcClient connection, String gro
protected abstract Persistent.ReadReq.Options.Builder createOptions();

public CompletableFuture<PersistentSubscription> execute() {
return this.connection.runWithArgs(args -> {
return this.connection.runWithArgs(options.getUserCertificate(), args -> {
PersistentSubscriptionsGrpc.PersistentSubscriptionsStub client =
GrpcUtils.configureStub(PersistentSubscriptionsGrpc.newStub(args.getChannel()), this.connection.getSettings(), this.options);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ protected Persistent.UpdateReq.Settings.Builder createSettings() {

@SuppressWarnings("unchecked")
public CompletableFuture execute() {
return this.connection.runWithArgs(args -> {
return this.connection.runWithArgs(options.getUserCertificate(), args -> {
CompletableFuture result = new CompletableFuture();
PersistentSubscriptionsGrpc.PersistentSubscriptionsStub client =
GrpcUtils.configureStub(PersistentSubscriptionsGrpc.newStub(args.getChannel()), this.connection.getSettings(), this.options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public AppendToStream(GrpcClient client, String streamName, Iterator<EventData>
}

public CompletableFuture<WriteResult> execute() {
return this.client.run(channel -> {
return this.client.run(options.getUserCertificate(), channel -> {
CompletableFuture<WriteResult> result = new CompletableFuture<>();
StreamsOuterClass.AppendReq.Options.Builder options = this.options.getExpectedRevision().applyOnWire(StreamsOuterClass.AppendReq.Options.newBuilder()
.setStreamIdentifier(Shared.StreamIdentifier.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,45 +25,45 @@ class ClusterDiscovery implements Discovery {
}
}

private static CompletableFuture<Optional<ClusterInfo.Member>> attemptDiscovery(NodeSelector selector, ConnectionState factory, InetSocketAddress seed) {
factory.connect(seed);
private static CompletableFuture<Optional<ClusterInfo.Member>> attemptDiscovery(NodeSelector selector, ConnectionState factory, InetSocketAddress seed, UserCertificate userCertificate) {
factory.connect(seed, userCertificate);
GossipClient client = new GossipClient(factory.getSettings(), factory.getCurrentChannel());
return client.read().thenApply(info -> {
if (factory.getLastConnectedEndpoint() != null) {
info.getMembers().removeIf(member -> member.getHttpEndpoint().equals(factory.getLastConnectedEndpoint()));
if (factory.getPreviousEndpoint() != null) {
info.getMembers().removeIf(member -> member.getHttpEndpoint().equals(factory.getPreviousEndpoint()));
}

return selector.determineBestFitNode(info);
});
}

@Override
public CompletableFuture<Void> run(ConnectionState state) {
return CompletableFuture.runAsync(() -> discover(state));
public CompletableFuture<Void> run(ConnectionState state, UserCertificate userCertificate) {
return CompletableFuture.runAsync(() -> discover(state, userCertificate));
}

void discover(ConnectionState state) {
void discover(ConnectionState state, UserCertificate userCertificate) {
List<InetSocketAddress> candidates = new ArrayList<>(this.seeds);

if (candidates.size() > 1) {
Collections.shuffle(candidates);

if (state.getLastConnectedEndpoint() != null) {
candidates.removeIf(candidate -> candidate.equals(state.getLastConnectedEndpoint()));
if (state.getPreviousEndpoint() != null) {
candidates.removeIf(candidate -> candidate.equals(state.getPreviousEndpoint()));
}
}

for (InetSocketAddress seed : candidates) {
logger.debug("Using seed node [{}] for cluster node discovery.", seed);
try {
Optional<ClusterInfo.Member> optionalMember = attemptDiscovery(this.nodeSelector, state, seed)
Optional<ClusterInfo.Member> optionalMember = attemptDiscovery(this.nodeSelector, state, seed, userCertificate)
.get(state.getSettings().getGossipTimeout(), TimeUnit.MILLISECONDS);

if (optionalMember.isPresent()) {
ClusterInfo.Member member = optionalMember.get();

if (!member.getHttpEndpoint().equals(state.getLastConnectedEndpoint())) {
state.connect(member.getHttpEndpoint());
if (!member.getHttpEndpoint().equals(state.getPreviousEndpoint())) {
state.connect(member.getHttpEndpoint(), userCertificate);
}

logger.debug("Selected cluster node [{}] in state [{}] for connection attempt.", member.getHttpEndpoint(), member.getState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -99,15 +100,26 @@ private void forceExit() {
this.forceExit(null);
}

private void tryCreateChannel(InetSocketAddress endpoint, WorkItem workItem, UserCertificate userCertificate) {
try {
this.createChannel(this.channelId, endpoint, userCertificate);
} catch (RuntimeException e) {
workItem.accept(null, e);
throw e;
}
}

@Override
public void createChannel(UUID previousId, InetSocketAddress candidate) {
public void createChannel(UUID previousId, InetSocketAddress candidate, UserCertificate userCertificate) {
if (this.closed.get()) {
logger.warn("Channel creation request ignored, the connection to endpoint [{}] is already closed", this.connection.getLastConnectedEndpoint());
logger.warn("Channel creation request ignored, the connection to endpoint [{}] is already closed", this.connection.getPreviousEndpoint());
return;
}

if (!this.channelId.equals(previousId)) {
logger.debug("Skipping connection attempt as new connection to endpoint [{}] has already been created.", this.connection.getLastConnectedEndpoint());
// Skip creating channel if a new one has already been created
// unless a new ssl context must be created (new user certificate).
if (!this.channelId.equals(previousId) && !this.connection.shouldRecreateSslContext(userCertificate)) {
logger.debug("Skipping connection attempt as new connection to endpoint [{}] has already been created.", this.connection.getPreviousEndpoint());
return;
}

Expand All @@ -122,12 +134,12 @@ public void createChannel(UUID previousId, InetSocketAddress candidate) {

// Node selection.
if (candidate != null) {
this.connection.connect(candidate);
this.connection.connect(candidate, userCertificate);
logger.debug("Prepared channel to proposed leader candidate [{}]", candidate);
} else {
try {
// TODO - Should we consider a discovery timeout?
this.discovery.run(this.connection).get();
this.discovery.run(this.connection, userCertificate).get();
} catch (InterruptedException e) {
forceExit(e);
} catch (ExecutionException e) {
Expand All @@ -143,7 +155,7 @@ public void createChannel(UUID previousId, InetSocketAddress candidate) {
if (this.loadServerFeatures()) {
this.channelId = UUID.randomUUID();
this.connection.confirmChannel();
logger.info("Connection to endpoint [{}] created successfully", this.connection.getLastConnectedEndpoint());
logger.info("Connection to endpoint [{}] created successfully", this.connection.getPreviousEndpoint());
break;
}

Expand All @@ -157,27 +169,33 @@ public void createChannel(UUID previousId, InetSocketAddress candidate) {
@Override
public void process(RunWorkItem args) {
if (this.closed.get()) {
logger.warn("Receive a command request but the connection to endpoint [{}] is already closed", this.connection.getLastConnectedEndpoint());
logger.warn("Receive a command request but the connection to endpoint [{}] is already closed", this.connection.getPreviousEndpoint());
args.getItem().accept(null, new ConnectionShutdownException());
return;
}

// Always fall back to the default user certificate provided on the connection string (if any)
UserCertificate userCertificateOrDefault =
args.getUserCertificateOrDefault(this.settings.getDefaultUserCertificate());

// It's possible we haven't connected yet.
if (this.connection.getCurrentChannel() == null) {
logger.debug("Channel is not resolved yet, connecting...");
tryCreateChannel(null, args.getItem(), userCertificateOrDefault);
}

try {
this.createChannel(this.channelId, null);
} catch (RuntimeException e) {
args.getItem().accept(null, e);
throw e;
}
if (this.connection.shouldRecreateSslContext(userCertificateOrDefault)) {
logger.debug("Creating new channel with new user certificate and SSL context...");
tryCreateChannel(
this.connection.getPreviousEndpoint(),
args.getItem(),
userCertificateOrDefault);
}

WorkItemArgs workArgs = new WorkItemArgs(
this.channelId,
this.connection.getCurrentChannel(),
this.connection.getLastConnectedEndpoint(),
this.connection.getPreviousEndpoint(),
this.serverInfo);

args.getItem().accept(workArgs, null);
Expand All @@ -190,11 +208,11 @@ public void shutdown(Shutdown args) {
return;
}

logger.info("Received a shutdown request, closing connection to endpoint [{}]", this.connection.getLastConnectedEndpoint());
logger.info("Received a shutdown request, closing connection to endpoint [{}]", this.connection.getPreviousEndpoint());
this.closed.set(true);
this.connection.shutdown();
this.drainPendingRequests();
logger.info("Connection to endpoint [{}] was closed successfully", this.connection.getLastConnectedEndpoint());
logger.info("Connection to endpoint [{}] was closed successfully", this.connection.getPreviousEndpoint());
args.complete();
throw new ConnectionShutdownException();
}
Expand Down
Loading

0 comments on commit 2dce385

Please sign in to comment.