Skip to content

Commit

Permalink
Sending incremental update instead of complete (#268)
Browse files Browse the repository at this point in the history
  • Loading branch information
climategadgets committed Aug 4, 2023
1 parent bce92d8 commit b12a941
Showing 1 changed file with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,7 @@ public class InstrumentCluster {
*
* This object gets updated and then emitted every time an update comes.
*/
private final SystemStatus currentStatus = new SystemStatus(
new TreeMap<>(),
new TreeMap<>(),
new TreeMap<>(),
new TreeMap<>(),
new TreeMap<>());
private final SystemStatus currentStatus = createEmptyStatus();

private final Sinks.Many<Signal<SystemStatus, Void>> statusSink = Sinks.many().multicast().onBackpressureBuffer();

Expand Down Expand Up @@ -79,16 +74,22 @@ public Flux<Signal<SystemStatus, Void>> getFlux() {
})
.subscribe(kv -> {

var id = kv.getKey();
String id = kv.getKey();
var status = kv.getValue();

status
.subscribe(s -> {

logger.info("update: id={}, status={}", id, s);
currentStatus.sensors().put(String.valueOf(id), s.getValue());

statusSink.tryEmitNext(new Signal<>(Instant.now(), currentStatus));
// Update the accumulated status
currentStatus.sensors().put(id, s.getValue());

// Send an incremental update
var incrementalStatus = createEmptyStatus();
incrementalStatus.sensors().put(id, s.getValue());

statusSink.tryEmitNext(new Signal<>(Instant.now(), incrementalStatus));
});

});
Expand All @@ -97,4 +98,13 @@ public Flux<Signal<SystemStatus, Void>> getFlux() {

return statusSink.asFlux();
}

private SystemStatus createEmptyStatus() {
return new SystemStatus(
new TreeMap<>(),
new TreeMap<>(),
new TreeMap<>(),
new TreeMap<>(),
new TreeMap<>());
}
}

0 comments on commit b12a941

Please sign in to comment.