Skip to content

Commit

Permalink
integration with data hub works
Browse files Browse the repository at this point in the history
unit tests completely broken
  • Loading branch information
DC2-DanielKrueger committed Nov 20, 2024
1 parent 9f1906b commit 7272bd0
Show file tree
Hide file tree
Showing 11 changed files with 358 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import com.hivemq.adapter.sdk.api.eventsv2.Event;
import com.hivemq.extension.sdk.api.annotations.NotNull;

import java.util.concurrent.CompletableFuture;

public interface EventServiceHandling {

@NotNull ListenableFuture<EventServiceResult> apply(final @NotNull Event coolEvent);
@NotNull
EventServiceResult apply(final @NotNull Event event);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.hivemq.bootstrap.factories;

import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.mqtt.message.publish.PUBLISH;

Expand All @@ -23,11 +24,21 @@ public class EventServiceResult {
private final boolean preventPublish;
private final @Nullable PUBLISH createdPublish;

public EventServiceResult(boolean preventPublish, @Nullable PUBLISH createdPublish) {
private EventServiceResult(final boolean preventPublish, @Nullable final PUBLISH createdPublish) {
this.preventPublish = preventPublish;
this.createdPublish = createdPublish;
}


public static @NotNull EventServiceResult preventPublishing() {
return new EventServiceResult(true, null);
}

public static @NotNull EventServiceResult allowPublishing(final @NotNull PUBLISH createdPublish) {
return new EventServiceResult(false, createdPublish);
}


public boolean isPreventPublish() {
return preventPublish;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2019-present HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.edge.impl.events.v2;

import com.google.common.base.Preconditions;
import com.hivemq.adapter.sdk.api.eventsv2.Event;
import com.hivemq.adapter.sdk.api.eventsv2.EventsService;
import com.hivemq.bootstrap.factories.EventServiceHandling;
import com.hivemq.bootstrap.factories.EventServiceHandlingProvider;
import com.hivemq.bootstrap.factories.EventServiceResult;
import com.hivemq.configuration.service.ConfigurationService;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.extension.sdk.api.services.publish.Publish;
import com.hivemq.extension.sdk.api.services.publish.PublishService;
import com.hivemq.extensions.services.builder.PublishBuilderImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.List;

/**
* SPI delegate which wraps multiple (chained) implementations and
* manages the event listeners
*
* @author Simon L Johnson
*/
@Singleton
public class EventServiceDelegateImpl implements EventsService {

private final @NotNull InMemoryEvent eventStore;
private final @NotNull EventServiceHandlingProvider eventServiceHandlingProvider;
private final @NotNull PublishService publishService;
private final @NotNull ConfigurationService configurationService;

@Inject
public EventServiceDelegateImpl(
final @NotNull InMemoryEvent eventStore,
final @NotNull EventServiceHandlingProvider eventServiceHandlingProvider,
final @NotNull PublishService publishService,
final @NotNull ConfigurationService configurationService) {
this.eventServiceHandlingProvider = eventServiceHandlingProvider;
this.publishService = publishService;
this.configurationService = configurationService;
Preconditions.checkNotNull(eventStore);
this.eventStore = eventStore;
}

@Override
public void publish(final @NotNull Event event) {

final EventServiceHandling eventServiceHandling = eventServiceHandlingProvider.get();
if (eventServiceHandling != null) {
final EventServiceResult eventServiceResult = eventServiceHandling.apply(event);
if (!eventServiceResult.isPreventPublish()) {
// publish can not be null if preventPublish is false
assert eventServiceResult.getModifiedPublish() != null;
final Publish publish =
new PublishBuilderImpl(configurationService).fromPublish(eventServiceResult.getModifiedPublish())
.build();
publishService.publish(publish);
}
}
eventStore.storeEvent(event);
}

@Override
public @NotNull List<Event> readEvents(
final @Nullable Long sinceTimestamp, final @Nullable Integer limit) {
return eventStore.readEvents(sinceTimestamp, limit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2019-present HiveMQ GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hivemq.edge.impl.events.v2;

import com.hivemq.adapter.sdk.api.eventsv2.Event;
import com.hivemq.configuration.service.InternalConfigurations;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.util.RollingList;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Simple rolling list implementation optimized for fast writes but slow reads as requires sort on read. (Handled
* out of lock).
*
* @author Simon L Johnson
*/
@Singleton
public class InMemoryEvent{
private final @NotNull RollingList<Event> inMemoryEventList;
private final @NotNull ReadWriteLock lock = new ReentrantReadWriteLock();

@Inject
public InMemoryEvent() {
//optimize for quick write slow read (sort)
this(InternalConfigurations.EDGE_RUNTIME_MAX_EVENTS_IN_INMEMORY_LIST.get());
}

public InMemoryEvent(final int max) {
this.inMemoryEventList = new RollingList<>(max);
}

public void storeEvent(final @NotNull Event event) {
Lock writeLock = lock.writeLock();
try {
writeLock.lock();
inMemoryEventList.add(event);
} finally {
writeLock.unlock();
}
}

public @NotNull List<Event> readEvents(@Nullable Long since, @Nullable Integer limit) {
Lock readLock = lock.writeLock();
List<Event> events;
try {
readLock.lock();
events = new ArrayList<>(inMemoryEventList);
} finally {
readLock.unlock();
}
Stream<Event> stream = events.stream().sorted(Comparator.comparing(Event::getCreated).reversed());
if (since != null) {
stream = stream.filter(event -> since < event.getCreated().getTime());
}
if (limit != null) {
stream = stream.limit(limit);
}
return stream.collect(Collectors.toUnmodifiableList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.hivemq.edge.modules.adapters.impl;

import com.hivemq.adapter.sdk.api.events.EventService;
import com.hivemq.adapter.sdk.api.eventsv2.EventsService;
import com.hivemq.adapter.sdk.api.services.ModuleServices;
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterPublishService;
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterWritingService;
Expand All @@ -28,15 +29,18 @@ public class ModuleServicesImpl implements ModuleServices {

private final @NotNull ProtocolAdapterPublishService adapterPublishService;
private final @NotNull EventService eventService;
private final @NotNull EventsService eventServiceV2;
private final @NotNull ProtocolAdapterWritingService protocolAdapterWritingService;

@Inject
public ModuleServicesImpl(
final @NotNull ProtocolAdapterPublishService adapterPublishService,
final @NotNull EventService eventService,
final @NotNull EventsService eventServiceV2,
final @NotNull InternalProtocolAdapterWritingService protocolAdapterWritingService) {
this.adapterPublishService = adapterPublishService;
this.eventService = eventService;
this.eventServiceV2 = eventServiceV2;
this.protocolAdapterWritingService = protocolAdapterWritingService;
}

Expand All @@ -50,6 +54,11 @@ public ModuleServicesImpl(
return eventService;
}

@Override
public @NotNull EventsService eventServiceV2() {
return eventServiceV2;
}

@Override
public @NotNull ProtocolAdapterWritingService protocolAdapterWritingService() {
return protocolAdapterWritingService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.hivemq.adapter.sdk.api.ProtocolAdapter;
import com.hivemq.adapter.sdk.api.ProtocolAdapterPublishBuilder;
import com.hivemq.adapter.sdk.api.events.EventService;
import com.hivemq.adapter.sdk.api.eventsv2.EventsService;
import com.hivemq.adapter.sdk.api.services.ModuleServices;
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterPublishService;
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterWritingService;
Expand All @@ -29,15 +30,18 @@ public class ModuleServicesPerModuleImpl implements ModuleServices {

private final @NotNull ProtocolAdapterPublishServicePerAdapter adapterPublishServicePerAdapter;
private final @NotNull EventService eventService;
private final @NotNull EventsService eventsService;
private final @NotNull ProtocolAdapterWritingService protocolAdapterWritingService;

public ModuleServicesPerModuleImpl(
final @NotNull ProtocolAdapterPublishService adapterPublishService,
final @NotNull EventService eventService,
final @NotNull EventsService eventsService,
final @NotNull ProtocolAdapterWritingService protocolAdapterWritingService
) {
this.eventService = eventService;
this.adapterPublishServicePerAdapter = new ProtocolAdapterPublishServicePerAdapter(adapterPublishService);
this.eventsService = eventsService;
this.protocolAdapterWritingService = protocolAdapterWritingService;
}

Expand All @@ -51,6 +55,11 @@ public ModuleServicesPerModuleImpl(
return eventService;
}

@Override
public @NotNull EventsService eventServiceV2() {
return eventsService;
}

public void setAdapter(final @NotNull ProtocolAdapter protocolAdapter) {
this.adapterPublishServicePerAdapter.setAdapter(protocolAdapter);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.hivemq.edge.modules.ioc;

import com.hivemq.adapter.sdk.api.events.EventService;
import com.hivemq.adapter.sdk.api.eventsv2.EventsService;
import com.hivemq.adapter.sdk.api.services.ModuleServices;
import com.hivemq.adapter.sdk.api.services.ProtocolAdapterPublishService;
import com.hivemq.edge.impl.events.EventServiceDelegateImpl;
Expand Down Expand Up @@ -55,6 +56,10 @@ public abstract class ModulesModule {
@Singleton
abstract @NotNull EventService eventService(@NotNull EventServiceDelegateImpl eventServiceDelegate);

@Binds
@Singleton
abstract @NotNull EventsService eventsService(@NotNull com.hivemq.edge.impl.events.v2.EventServiceDelegateImpl eventServiceDelegate);

@Binds
@Singleton
abstract @NotNull EventStore eventStore(@NotNull InMemoryEventImpl inMemoryEvent);
Expand Down
Loading

0 comments on commit 7272bd0

Please sign in to comment.