Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BC-6130 add createAndWait and extendAndWait #1

Closed
wants to merge 11 commits into from
30 changes: 29 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.6.4</quarkus.platform.version>
<quarkus.platform.version>3.6.5</quarkus.platform.version>
<skipITs>true</skipITs>
<surefire-plugin.version>3.1.2</surefire-plugin.version>
</properties>
Expand All @@ -38,6 +38,34 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mongodb-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-mongodb-panache</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-mockito</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.25.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mongodb</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive-jackson</artifactId>
Expand Down
157 changes: 22 additions & 135 deletions src/main/java/de/svs/Namespace.java
Original file line number Diff line number Diff line change
@@ -1,148 +1,35 @@
package de.svs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.result.UpdateResult;
import de.svs.status.NamespaceStatus;
import io.quarkus.qute.CheckedTemplate;
import io.quarkus.qute.TemplateInstance;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.unchecked.Unchecked;
import jakarta.inject.Inject;
import jakarta.ws.rs.*;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import org.bson.Document;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.RestStreamElementType;
import io.quarkus.mongodb.panache.PanacheMongoEntityBase;
import io.quarkus.mongodb.panache.common.MongoEntity;
import org.bson.codecs.pojo.annotations.BsonId;
import org.bson.types.ObjectId;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.mongodb.client.model.Filters.*;
import static com.mongodb.client.model.Updates.set;
@MongoEntity(collection = "namespaces")
public class Namespace extends PanacheMongoEntityBase {
@BsonId
public ObjectId id = new ObjectId();
public String name;
public Instant activatedUntil;

@Path("/namespace")
public class Namespace {

private static final Logger logger = Logger.getLogger(Namespace.class);

@ConfigProperty(name = "namespace.mongodb.name", defaultValue = "keda")
String mongoDbName;

@Inject
MongoClient mongoClient;

@Inject
Sse sse;

@Inject
ObjectMapper objectMapper;

@ConfigProperty(name = "namespace.activationHours", defaultValue = "48")
int activationHours;

@ConfigProperty(name = "externalHostName", defaultValue = "localhost")
String externalHostName;

@ConfigProperty(name = "baseDomain")
String baseDomain;

@CheckedTemplate
public static class Templates {
public static native TemplateInstance namespace(String host, String defaultNamespace, String message, boolean pollNamespace);
public static Optional<Namespace> findByName(String name) {
return find("name", name).singleResultOptional();
}


@GET
@Produces(MediaType.TEXT_HTML)
public TemplateInstance get(@QueryParam("namespace") Optional<String> namespace, @QueryParam("redirected-from-503") Optional<Boolean> gotRedirectedFrom503) {
return Templates.namespace(this.externalHostName,
namespace.orElse(""),
gotRedirectedFrom503.filter(Boolean::booleanValue)
.map(b -> "You got here because your namespace appears to be deactivated")
.orElse(null),
false);
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Namespace that = (Namespace) o;
return Objects.equals(id, that.id);
}

@POST
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
@Produces(MediaType.TEXT_HTML)
public TemplateInstance post(@FormParam("namespace") String namespace, @FormParam("action") Action action) {
logger.info("post called " + namespace + " " + action);

final Instant activatedUntil = switch (action) {
case ACTIVATE -> Instant.now().plus(activationHours, ChronoUnit.HOURS);
case DEACTIVATE -> Instant.EPOCH;
};

UpdateResult updateResult = getCollection().updateOne(
and(eq("name", namespace), exists("activatedUntil", true)),
set("activatedUntil", activatedUntil));

final boolean pollNamespace;
final String message;
if (updateResult.getMatchedCount() > 0) {
if (action == Action.ACTIVATE) {
message = "namespace " + namespace + " is now activated until " + activatedUntil;
pollNamespace = true;
} else {
message = "namespace " + namespace + " has been deactivated";
pollNamespace = false;
}
} else {
message = namespace + " not found";
pollNamespace = false;
}
logger.info(message);
return Templates.namespace(this.externalHostName, namespace, message, pollNamespace);
}

@PUT
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public NamespaceDto createNamespaceEntry(NamespaceDto dto) {
Instant activatedUntil = Instant.now().plus(activationHours, ChronoUnit.HOURS);
getCollection().updateOne(
and(eq("name", dto.getName())),
set("activatedUntil", activatedUntil),
new UpdateOptions().upsert(true));
dto.setActivatedUntil(activatedUntil);
logger.info("namespace " + dto.getName() + " is now activated until " + activatedUntil);
return dto;
}

@Path("/status")
@GET()
@RestStreamElementType(MediaType.TEXT_PLAIN)
public Multi<OutboundSseEvent> status(@QueryParam("namespace") String namespace) {
AtomicBoolean finalMessageReceived = new AtomicBoolean();

return Multi.createBy()
.repeating()
.supplier(Unchecked.supplier(() -> new NamespaceStatus(objectMapper, baseDomain).get(namespace)))
.withDelay(Duration.ofSeconds(2))
.until(outboundSseEvent -> finalMessageReceived.getAndSet(outboundSseEvent.finalMessage()))
.map(Unchecked.function(statusDto -> sse.newEventBuilder()
.name("namespace-status")
.data(String.class, objectMapper.writeValueAsString(statusDto))
.build()))
.select()
.first(10);
@Override
public int hashCode() {
return Objects.hash(id);
}

private MongoCollection<Document> getCollection() {
MongoCollection<Document> namespaces = mongoClient.getDatabase(mongoDbName).getCollection("namespaces");
namespaces.createIndex(eq("name", 1), new IndexOptions().unique(true));
return namespaces;
}

}
47 changes: 47 additions & 0 deletions src/main/java/de/svs/NamespaceActivationWaiter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package de.svs;

import com.fasterxml.jackson.databind.ObjectMapper;
import de.svs.status.NamespaceStatus;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.unchecked.Unchecked;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

@ApplicationScoped
public class NamespaceActivationWaiter {
@Inject
ObjectMapper objectMapper;

@Inject
Sse sse;

@ConfigProperty(name = "baseDomain")
String baseDomain;

@ConfigProperty(name = "waiter.delayInSeconds", defaultValue = "2")
int delayInSeconds;

Multi<OutboundSseEvent> waitForNamespaceToBecomeAvailable(String namespace, int maxWaitTimeInSeconds) {
int delayInSeconds = 2;
int maxTries = maxWaitTimeInSeconds / delayInSeconds;
AtomicBoolean finalMessageReceived = new AtomicBoolean();

return Multi.createBy()
.repeating()
.supplier(Unchecked.supplier(() -> new NamespaceStatus(objectMapper, baseDomain).get(namespace)))
.withDelay(Duration.ofSeconds(delayInSeconds))
.until(outboundSseEvent -> finalMessageReceived.getAndSet(outboundSseEvent.finalMessage()))
.map(Unchecked.function(statusDto -> sse.newEventBuilder()
.name("namespace-status")
.data(String.class, objectMapper.writeValueAsString(statusDto))
.build()))
.select()
.first(maxTries);
}
}
Loading