Skip to content

Commit

Permalink
Merge branch 'gh268-instrument-cluster' into gh47 (#47, #268)
Browse files Browse the repository at this point in the history
The instrument cluster is not complete yet but is sufficient to proceed so
small things can be added and tweaked later.
  • Loading branch information
climategadgets committed Aug 20, 2023
2 parents a4ddd66 + e5d7ee0 commit 1d970c7
Show file tree
Hide file tree
Showing 39 changed files with 1,057 additions and 790 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import net.sf.dz3.runtime.config.model.ZoneConfigurationParser;
import net.sf.dz3.runtime.config.mqtt.MqttConfigurationParser;
import net.sf.dz3.runtime.config.onewire.OnewireConfigurationParser;
import net.sf.dz3r.instrumentation.InstrumentCluster;
import net.sf.dz3r.instrumentation.Marker;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -88,12 +89,19 @@ public HccParsedConfig parse(HccRawConfig source) {
ctx.directors.close();
m.checkpoint("configured directors");

var ic = new InstrumentCluster(
ctx.sensors.getFlux(),
ctx.switches.getFlux(),
ctx.connectors.getFlux(),
ctx.collectors.getFlux(),
ctx.hvacDevices.getFlux());

// Need directors resolved by now

new WebUiConfigurationParser(ctx).parse(source.webUi());
new WebUiConfigurationParser(ctx, ic).parse(source.webUi());
m.checkpoint("configured WebUI");

new ConsoleConfigurationParser(ctx).parse(source.console());
new ConsoleConfigurationParser(ctx, ic).parse(source.instance(), source.console());
m.checkpoint("configured console");

logger.error("ConfigurationParser::parse(): NOT IMPLEMENTED");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import net.sf.dz3.runtime.config.ConfigurationContext;
import net.sf.dz3.runtime.config.ConfigurationContextAware;
import net.sf.dz3r.instrumentation.InstrumentCluster;
import net.sf.dz3r.signal.Signal;
import net.sf.dz3r.view.swing.ReactiveConsole;
import reactor.core.publisher.Flux;
Expand All @@ -13,11 +14,15 @@

public class ConsoleConfigurationParser extends ConfigurationContextAware {

public ConsoleConfigurationParser(ConfigurationContext context) {
private final InstrumentCluster ic;

public ConsoleConfigurationParser(ConfigurationContext context, InstrumentCluster ic) {
super(context);

this.ic = ic;
}

public ReactiveConsole parse(ConsoleConfig cf) {
public ReactiveConsole parse(String instance, ConsoleConfig cf) {

var directors = context
.directors
Expand All @@ -35,7 +40,7 @@ public ReactiveConsole parse(ConsoleConfig cf) {

// VT: NOTE: next step - just remove block() and make the constructor consume the fluxes, it iterates through them anyway

return new ReactiveConsole(directors, sensors, Optional.ofNullable(cf.units()).orElse(TemperatureUnit.C));
return new ReactiveConsole(instance, directors, sensors, ic, Optional.ofNullable(cf.units()).orElse(TemperatureUnit.C));
}

private boolean isConfigured(Set<String> sensors, Map.Entry<String, Flux<Signal<Double, Void>>> s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import net.sf.dz3.runtime.config.ConfigurationContext;
import net.sf.dz3.runtime.config.ConfigurationContextAware;
import net.sf.dz3r.instrumentation.InstrumentCluster;
import net.sf.dz3r.view.webui.v2.WebUI;

import java.util.Map;
Expand All @@ -10,7 +11,7 @@

public class WebUiConfigurationParser extends ConfigurationContextAware {

public WebUiConfigurationParser(ConfigurationContext context) {
public WebUiConfigurationParser(ConfigurationContext context, InstrumentCluster ic) {
super(context);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package net.sf.dz3r.instrumentation;

import net.sf.dz3r.device.actuator.HvacDevice;
import net.sf.dz3r.device.actuator.Switch;
import net.sf.dz3r.signal.Signal;
import net.sf.dz3r.signal.health.SystemStatus;
import net.sf.dz3r.view.Connector;
import net.sf.dz3r.view.MetricsCollector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.time.Instant;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;


/**
* Collection of all raw signal processors emitting a coherent "at a glance" system status.
*
* @author Copyright &copy; <a href="mailto:vt@homeclimatecontrol.com">Vadim Tkachenko 2001-2023
*/
public class InstrumentCluster {

private final Logger logger = LogManager.getLogger();

private final Flux<Map.Entry<String, Flux<Signal<Double, Void>>>> sensors;
private final Flux<Map.Entry<String, Switch<?>>> switches;
private final Flux<Map.Entry<String, Connector>> connectors;
private final Flux<Map.Entry<String, MetricsCollector>> collectors;
private final Flux<Map.Entry<String, HvacDevice>> hvacDevices;

private final Map<String, SensorStatusProcessor> sensorProcessors = new HashMap<>();
private final Map<String, SwitchStatusProcessor> switchProcessors = new HashMap<>();

/**
* Status accumulator.
*
* This object gets updated and then emitted every time an update comes.
*/
private final SystemStatus currentStatus = createEmptyStatus();

private final Sinks.Many<Signal<SystemStatus, Void>> statusSink = Sinks.many().multicast().onBackpressureBuffer();

public InstrumentCluster(
Flux<Map.Entry<String, Flux<Signal<Double, Void>>>> sensors,
Flux<Map.Entry<String, Switch<?>>> switches,
Flux<Map.Entry<String, Connector>> connectors,
Flux<Map.Entry<String, MetricsCollector>> collectors,
Flux<Map.Entry<String, HvacDevice>> hvacDevices
) {

this.sensors = sensors;
this.switches = switches;
this.connectors = connectors;
this.collectors = collectors;
this.hvacDevices = hvacDevices;
}

/**
* @return System status flux. A new item is emitted every time a particular entity's status is updated,
* the item can and must be treated as an incremental update, though it may at times represent full system status.
*/
public Flux<Signal<SystemStatus, Void>> getFlux() {

connectSensors();
connectSwitches();
connectHvacDevices();

logger.error("FIXME: NOT IMPLEMENTED: getFlux(SystemStatus)");

return statusSink.asFlux();
}

private void connectSensors() {
sensors
.map(kv -> {
var id = kv.getKey();
var p = sensorProcessors.computeIfAbsent(id, SensorStatusProcessor::new);

return new AbstractMap.SimpleEntry<>(id, p.compute(kv.getValue()));
})
.subscribe(kv -> {

String id = kv.getKey();
var status = kv.getValue();

status
.subscribe(s -> {

logger.debug("update/sensor: id={}, status={}", id, s);

// Update the accumulated status
currentStatus.sensors().put(id, s);

// Send an incremental update
var incrementalStatus = createEmptyStatus();
incrementalStatus.sensors().put(id, s);

statusSink.tryEmitNext(new Signal<>(Instant.now(), incrementalStatus));
});

});
}

private void connectSwitches() {

switches
.map(kv -> {
var id = kv.getKey();
var p = switchProcessors.computeIfAbsent(id, SwitchStatusProcessor::new);

return new AbstractMap.SimpleEntry<>(id, p.compute(kv.getValue().getFlux()));
})
.subscribe(kv -> {

String id = kv.getKey();
var status = kv.getValue();

status
.subscribe(s -> {

logger.debug("update/switch: id={}, status={}", id, s);

// Update the accumulated status
currentStatus.switches().put(id, s);

// Send an incremental update
var incrementalStatus = createEmptyStatus();
incrementalStatus.switches().put(id, s);

statusSink.tryEmitNext(new Signal<>(Instant.now(), incrementalStatus));
});
});
}

private void connectHvacDevices() {

// Unlike others, this status object gets passed directly without transformation

hvacDevices

// VT: FIXME: Ugly. This should not have been needed if the rest of the framework was put together correctly.
// + bucket list for https://github.com/home-climate-control/dz/issues/271

// PS: ... and it still doesn't work right, not all HVAC devices are displayed
// but only those for which updates are being sent.

.parallel()
.runOn(Schedulers.boundedElastic())

.map(kv -> new AbstractMap.SimpleEntry<>(kv.getKey(), kv.getValue().getFlux()))
.subscribe(kv -> {

String id = kv.getKey();
var status = kv.getValue();

status
.subscribe(s -> {

logger.debug("update/hvacDevice: id={}, status={}", id, s);

// Update the accumulated status
currentStatus.hvacDevices().put(id, s);

// Send an incremental update
var incrementalStatus = createEmptyStatus();
incrementalStatus.hvacDevices().put(id, s);

statusSink.tryEmitNext(new Signal<>(Instant.now(), incrementalStatus));
});
});

}

private SystemStatus createEmptyStatus() {
return new SystemStatus(
new TreeMap<>(),
new TreeMap<>(),
new TreeMap<>(),
new TreeMap<>(),
new TreeMap<>());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package net.sf.dz3r.instrumentation;

import net.sf.dz3r.signal.Signal;
import net.sf.dz3r.signal.SignalProcessor;
import net.sf.dz3r.signal.health.SensorStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import reactor.core.publisher.Flux;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;

/**
* Consumes individual sensor signal, emits sensor status.
*
* @author Copyright &copy; <a href="mailto:vt@homeclimatecontrol.com">Vadim Tkachenko 2001-2023
*/
public class SensorStatusProcessor implements SignalProcessor<Double, SensorStatus, Void> {

private final Logger logger = LogManager.getLogger();

private final String id;
private final SortedSet<Double> diffs = new TreeSet<>();

/**
* Last known non-error signal value.
*/
private Double lastKnown = null;

/**
* Calculated resolution. Is not affected by error signals, but may need to be adjusted for time windows at some point.
*/
private Double resolution = null;

public SensorStatusProcessor(String id) {
this.id = id;

logger.info("created sensor status processor for id={}", id);
}

@Override
public Flux<Signal<SensorStatus, Void>> compute(Flux<Signal<Double, Void>> in) {
return in.map(this::compute);
}

private Signal<SensorStatus, Void> compute(Signal<Double, Void> source) {

if (source.isError()) {
// Nothing else matters
lastKnown = null;
return new Signal<>(source.timestamp, null, null, source.status, source.error);
}

// VT: FIXME: Calculate signal stats

return new Signal<>(
source.timestamp,
new SensorStatus(
Optional.ofNullable(source.getValue())
.map(this::computeResolution)
.orElse(this.resolution),
Optional.empty()));
}

private Double computeResolution(Double value) {

ThreadContext.push("computeResolution#" + id);

try {

if (lastKnown != null) {

// Stored as a set and not as a value to possibly improve the algorithm in the future,
// including detecting noisy analog signals (might want to return NaN instead of null for that case)

var diff = round(Math.abs(value - lastKnown));

if (Double.compare(diff, 0.0) == 0) {
// Sorry, no cigar
return resolution;
}

diffs.add(diff);

if (diffs.size() > 50) {
logger.warn("Noisy signal? Trimming the tail: {}", diffs);

var i = ((TreeSet<?>) diffs).descendingIterator();
i.next();
i.remove();
}
}

logger.debug("diffs: {}", diffs);

lastKnown = value;
resolution = computeResolution(diffs);

return resolution;

} finally {
ThreadContext.pop();
}
}

private double round(Double d) {
return
BigDecimal.valueOf(d)
.setScale(3, RoundingMode.HALF_UP)
.doubleValue();
}

private Double computeResolution(SortedSet<Double> source) {

if (source.size() < 2) {
return null;
}

// Naïve version, assumes there will be the smallest difference, doesn't account for quickly changing signals,
// nor does it account for noisy signals (except trimming the set to a reasonable length)

return source.first();
}
}
Loading

0 comments on commit 1d970c7

Please sign in to comment.