* The events can act only as observers. *
@@ -33,7 +34,7 @@ public interface InterceptHandler {
Class>[] ALL_MESSAGE_TYPES = {InterceptConnectMessage.class, InterceptDisconnectMessage.class,
InterceptConnectionLostMessage.class, InterceptPublishMessage.class, InterceptSubscribeMessage.class,
- InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class};
+ InterceptUnsubscribeMessage.class, InterceptAcknowledgedMessage.class, InterceptExceptionMessage.class};
/**
* @return the identifier of this intercept handler.
@@ -65,4 +66,6 @@ public interface InterceptHandler {
void onUnsubscribe(InterceptUnsubscribeMessage msg);
void onMessageAcknowledged(InterceptAcknowledgedMessage msg);
+
+ void onSessionLoopError(Throwable error);
}
diff --git a/moquette-0.17/broker/src/main/java/io/moquette/interception/Interceptor.java b/moquette-0.17/broker/src/main/java/io/moquette/interception/Interceptor.java
index e6950981..7055bcea 100644
--- a/moquette-0.17/broker/src/main/java/io/moquette/interception/Interceptor.java
+++ b/moquette-0.17/broker/src/main/java/io/moquette/interception/Interceptor.java
@@ -18,6 +18,7 @@
import io.moquette.interception.messages.InterceptAcknowledgedMessage;
import io.moquette.broker.subscriptions.Subscription;
+import io.moquette.interception.messages.InterceptExceptionMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
@@ -47,6 +48,8 @@ public interface Interceptor {
void notifyMessageAcknowledged(InterceptAcknowledgedMessage msg);
+ void notifyLoopException(InterceptExceptionMessage th);
+
void addInterceptHandler(InterceptHandler interceptHandler);
void removeInterceptHandler(InterceptHandler interceptHandler);
diff --git a/moquette-0.17/broker/src/main/java/io/moquette/interception/messages/InterceptExceptionMessage.java b/moquette-0.17/broker/src/main/java/io/moquette/interception/messages/InterceptExceptionMessage.java
new file mode 100644
index 00000000..fda676ec
--- /dev/null
+++ b/moquette-0.17/broker/src/main/java/io/moquette/interception/messages/InterceptExceptionMessage.java
@@ -0,0 +1,13 @@
+package io.moquette.interception.messages;
+
+public class InterceptExceptionMessage implements InterceptMessage {
+ private Throwable error;
+
+ public InterceptExceptionMessage(Throwable error) {
+ this.error = error;
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+}
diff --git a/moquette-0.17/broker/src/main/java/io/moquette/persistence/H2Builder.java b/moquette-0.17/broker/src/main/java/io/moquette/persistence/H2Builder.java
index fd72e757..641c5cd6 100644
--- a/moquette-0.17/broker/src/main/java/io/moquette/persistence/H2Builder.java
+++ b/moquette-0.17/broker/src/main/java/io/moquette/persistence/H2Builder.java
@@ -1,14 +1,18 @@
package io.moquette.persistence;
-import io.moquette.BrokerConstants;
import io.moquette.broker.IQueueRepository;
import io.moquette.broker.IRetainedRepository;
+import io.moquette.broker.ISessionsRepository;
import io.moquette.broker.ISubscriptionsRepository;
-import io.moquette.broker.config.IConfig;
import org.h2.mvstore.MVStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Clock;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -19,21 +23,31 @@ public class H2Builder {
private final String storePath;
private final int autosaveInterval; // in seconds
private final ScheduledExecutorService scheduler;
+ private final Clock clock;
private MVStore mvStore;
- public H2Builder(IConfig props, ScheduledExecutorService scheduler) {
- this.storePath = props.getProperty(BrokerConstants.PERSISTENT_STORE_PROPERTY_NAME, "");
- final String autosaveProp = props.getProperty(BrokerConstants.AUTOSAVE_INTERVAL_PROPERTY_NAME, "30");
- this.autosaveInterval = Integer.parseInt(autosaveProp);
+ public H2Builder(ScheduledExecutorService scheduler, Path storePath, int autosaveInterval, Clock clock) {
+ this.storePath = storePath.resolve("moquette_store.h2").toAbsolutePath().toString();
+ this.autosaveInterval = autosaveInterval;
this.scheduler = scheduler;
+ this.clock = clock;
}
@SuppressWarnings("FutureReturnValueIgnored")
public H2Builder initStore() {
- LOG.info("Initializing H2 store");
+ LOG.info("Initializing H2 store to {}", storePath);
if (storePath == null || storePath.isEmpty()) {
throw new IllegalArgumentException("H2 store path can't be null or empty");
}
+
+ if (!Files.exists(Paths.get(storePath))) {
+ try {
+ Files.createFile(Paths.get(storePath));
+ } catch (IOException ex) {
+ throw new IllegalArgumentException("Error creating " + storePath + " file", ex);
+ }
+ }
+
mvStore = new MVStore.Builder()
.fileName(storePath)
.autoCommitDisabled()
@@ -62,4 +76,8 @@ public IQueueRepository queueRepository() {
public IRetainedRepository retainedRepository() {
return new H2RetainedRepository(mvStore);
}
+
+ public ISessionsRepository sessionsRepository() {
+ return new H2SessionsRepository(mvStore, clock);
+ }
}
diff --git a/moquette-0.17/broker/src/main/java/io/moquette/persistence/H2QueueRepository.java b/moquette-0.17/broker/src/main/java/io/moquette/persistence/H2QueueRepository.java
index 90eb967c..8f27b92c 100644
--- a/moquette-0.17/broker/src/main/java/io/moquette/persistence/H2QueueRepository.java
+++ b/moquette-0.17/broker/src/main/java/io/moquette/persistence/H2QueueRepository.java
@@ -20,11 +20,7 @@
import io.moquette.broker.SessionRegistry.EnqueuedMessage;
import org.h2.mvstore.MVStore;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
public class H2QueueRepository implements IQueueRepository {
@@ -52,4 +48,9 @@ public boolean containsQueue(String queueName) {
public SessionMessageQueue