Skip to content

Commit

Permalink
Merge pull request #279 from FlowFuse/213-datastore
Browse files Browse the repository at this point in the history
Extract historical state mgmt into a datastore
  • Loading branch information
joepavitt authored Oct 18, 2023
2 parents 0cc3f14 + dac0ffe commit 1bc99cb
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 25 deletions.
1 change: 1 addition & 0 deletions docs/.vitepress/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ export default ({ mode }) => {
items: [
{ text: 'Repo Structure', link: '/contributing/guides/repo' },
{ text: 'Events Architecture', link: '/contributing/guides/events' },
{ text: 'State Management', link: '/contributing/guides/state-management' },
{ text: 'Layout Managers', link: '/contributing/guides/layouts' },
{ text: 'Registering Widgets', link: '/contributing/guides/registration' }
]
Expand Down
Binary file modified docs/assets/images/events-architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
50 changes: 50 additions & 0 deletions docs/contributing/guides/state-management.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# State Management

Dashboard 2.0 conducts state management through the use of a shared server-side data store. It provides four core functions for interaction with the store.

The store can be injected into a widget using:

```js
const datastore = require('<path>/<to>/store/index.js')
```


## `datastore.save`

When a widget receives a message, the default `node.on('input')` handler will store the received message, mapped to the widget's id into the datastore using:

```js
datastore.save(node.id, msg)
```

This will store the latest message received by the widget, which can be retrieved by that same widget on load using:

## `datastore.get`

When a widget is initialised, it will attempt to retrieve the latest message from the datastore using:

```js
datastore.get(node.id)
```

This ensures, on refresh of the client, or when new clients connect after data has been geenrated, that the state is presented consistently.

## `datastore.append`

With `.append`, we can store multiple messages against the same widget, representing a history of state, rather than a single point reference to the _last_ value only.

```js
datastore.append(node.id, msg)
```

This is used in `ui-chart` to store the history of data points, where each data point could have been an individual message received by the widget.

## `datastore.clear`

When a widget is removed from the Editor, we can clear the datastore of any messages stored against that widget using:

```js
datastore.clear(node.id)
```

This ensures that we don't have any stale data in the datastore, and that we don't have any data stored against widgets that no longer exist in the Editor.
19 changes: 13 additions & 6 deletions nodes/config/ui_base.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
const path = require('path')

const v = require('../../package.json').version
const datastore = require('../store/index.js')
const { appendTopic } = require('../utils/index.js')

// from: https://stackoverflow.com/a/28592528/3016654
Expand Down Expand Up @@ -346,7 +347,7 @@ module.exports = function (RED) {
if (!wNode) {
return // widget does not exist any more (e.g. deleted from NR and deployed BUT the ui page was not refreshed)
}
let msg = wNode._msg || {}
let msg = datastore.get(id) || {}
async function defaultHandler (value) {
msg.payload = value

Expand All @@ -355,7 +356,7 @@ module.exports = function (RED) {
if (widgetEvents?.beforeSend) {
msg = await widgetEvents.beforeSend(msg)
}
wNode._msg = msg
datastore.save(id, msg)
wNode.send(msg) // send the msg onwards
}

Expand All @@ -366,6 +367,7 @@ module.exports = function (RED) {
const handler = typeof (widgetEvents.onChange) === 'function' ? widgetEvents.onChange : defaultHandler
await handler(value)
} catch (error) {
console.log(error)
let errorHandler = typeof (widgetEvents.onError) === 'function' ? widgetEvents.onError : null
errorHandler = errorHandler || (typeof wNode.error === 'function' ? wNode.error : node.error)
errorHandler && errorHandler(error)
Expand All @@ -381,7 +383,7 @@ module.exports = function (RED) {
}
async function handler () {
// replicate receiving an input, so the widget can handle accordingly
const msg = wNode._msg
const msg = datastore.get(id)
if (msg) {
// only emit something if we have something to send
// and only to this connection, not all connected clients
Expand Down Expand Up @@ -489,8 +491,8 @@ module.exports = function (RED) {
order: widgetConfig.order || 0
},
state: {
enabled: widgetNode._msg?.enabled || true,
visible: widgetNode._msg?.visible || true
enabled: datastore.get(widgetConfig.id)?.enabled || true,
visible: datastore.get(widgetConfig.id)?.visible || true
},
hooks: widgetEvents
}
Expand Down Expand Up @@ -586,7 +588,7 @@ module.exports = function (RED) {
// msg could be null if the beforeSend errors and returns null
if (msg) {
// store the latest msg passed to node
wNode._msg = msg
datastore.save(widgetNode.id, msg)

if (widgetConfig.topic || widgetConfig.topicType) {
msg = await appendTopic(RED, widgetConfig, wNode, msg)
Expand Down Expand Up @@ -617,6 +619,11 @@ module.exports = function (RED) {

// when a widget is "closed" remove it from this Base Node's knowledge
widgetNode.on('close', function (removed, done) {
if (removed) {
// widget has been removed from the Editor
// clear any data from datastore
datastore.clear(widgetNode.id)
}
node.deregister(null, null, widgetNode)
done()
})
Expand Down
34 changes: 34 additions & 0 deletions nodes/store/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const data = {}

const getters = {
// given a widget id, return the latest msg received
msg (id) {
return data[id]
}
}

const setters = {
// remove data associated to a given widget
clear (id) {
delete data[id]
},
// given a widget id, and msg, store that latest value
save (id, msg) {
data[id] = msg
},
// given a widget id, and msg, store in an array of history of values
// useful for charting widgets
append (id, msg) {
if (!data[id]) {
data[id] = []
}
data[id].push(msg)
}
}

module.exports = {
get: getters.msg,
save: setters.save,
append: setters.append,
clear: setters.clear
}
34 changes: 21 additions & 13 deletions nodes/widgets/ui_chart.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const datastore = require('../store/index.js')

module.exports = function (RED) {
function ChartNode (config) {
const node = this
Expand Down Expand Up @@ -51,16 +53,16 @@ module.exports = function (RED) {
},
onInput: function (msg, send, done) {
// use our own custom onInput in order to store history of msg payloads
if (!node._msg) {
node._msg = []
if (!datastore.get(node.id)) {
datastore.save(node.id, [])
}
if (Array.isArray(msg.payload) && !msg.payload.length) {
// clear history
node._msg = []
datastore.save(node.id, [])
} else {
if (!Array.isArray(msg.payload)) {
// quick clone of msg, and store in history
node._msg.push({
datastore.append(node.id, {
...msg
})
} else {
Expand All @@ -72,40 +74,45 @@ module.exports = function (RED) {
payload,
_datapoint: msg._datapoint[i]
}
node._msg.push(m)
datastore.append(node.id, m)
})
}

const maxPoints = parseInt(config.removeOlderPoints)

if (config.xAxisType === 'category') {
// filters the node._msg array so that we keep just the latest msg with each category
// TODO: CARRY ON FROM HERE
const _msg = datastore.get(node.id)

// filters the ._msg array so that we keep just the latest msg with each category/series
const seen = {}
node._msg.forEach((msg, index) => {
_msg.forEach((msg, index) => {
// loop through and record the latest index seen for each topic/label
seen[msg.topic] = index
})
const indices = Object.values(seen)
node._msg = node._msg.filter((msg, index) => {
datastore(node.id, _msg.filter((msg, index) => {
// return only the msgs with the latest index for each topic/label
return indices.includes(index)
})
}))
} else if (maxPoints && config.removeOlderPoints) {
// account for multiple lines?
// client-side does this for _each_ line
// remove older points
const lineCounts = {}
const _msg = datastore.get(node.id)
// trawl through in reverse order, and only keep the latest points (up to maxPoints) for each label
for (let i = node._msg.length - 1; i >= 0; i--) {
const msg = node._msg[i]
for (let i = _msg.length - 1; i >= 0; i--) {
const msg = _msg[i]
const label = msg.topic
lineCounts[label] = lineCounts[label] || 0
if (lineCounts[label] >= maxPoints) {
node._msg.splice(i, 1)
_msg.splice(i, 1)
} else {
lineCounts[label]++
}
}
datastore.save(node.id, _msg)
}

if (config.xAxisType === 'time' && config.removeOlder && config.removeOlderUnit) {
Expand All @@ -114,14 +121,15 @@ module.exports = function (RED) {
const removeOlderUnit = parseFloat(config.removeOlderUnit)
const ago = (removeOlder * removeOlderUnit) * 1000 // milliseconds ago
const cutoff = (new Date()).getTime() - ago
node._msg = node._msg.filter((msg) => {
const _msg = datastore.get(node.id).filter((msg) => {
let timestamp = msg._datapoint.x
// is x already a millisecond timestamp?
if (typeof (msg._datapoint.x) === 'string') {
timestamp = (new Date(msg._datapoint.x)).getTime()
}
return timestamp > cutoff
})
datastore.save(node.id, _msg)
}

// check sizing limits
Expand Down
7 changes: 4 additions & 3 deletions nodes/widgets/ui_switch.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const datastore = require('../store/index.js')
const { appendTopic } = require('../utils/index.js')

module.exports = function (RED) {
Expand All @@ -19,7 +20,7 @@ module.exports = function (RED) {
onChange: async function (value) {
// ensure we have latest instance of the widget's node
const wNode = RED.nodes.getNode(node.id)
const msg = wNode._msg || {}
const msg = datastore.get(node.id) || {}

node.status({
fill: value ? 'green' : 'red',
Expand All @@ -32,7 +33,7 @@ module.exports = function (RED) {
const off = RED.util.evaluateNodeProperty(config.offvalue, config.offvalueType, wNode)
msg.payload = value ? on : off

wNode._msg = msg
datastore.save(node.id, msg)

// simulate Node-RED node receiving an input
wNode.send(msg)
Expand All @@ -55,7 +56,7 @@ module.exports = function (RED) {
}
if (!error) {
// store the latest msg passed to node
wNode._msg = msg
datastore.save(node.id, msg)

node.status({
fill: msg.payload ? 'green' : 'red',
Expand Down
6 changes: 3 additions & 3 deletions nodes/widgets/ui_text_input.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const datastore = require('../store/index.js')

module.exports = function (RED) {
function TextInputNode (config) {
const node = this
Expand All @@ -14,10 +16,8 @@ module.exports = function (RED) {
const evts = {
onChange: true,
onInput: function (msg, send) {
// ensure we have latest instance of the widget's node
const wNode = RED.nodes.getNode(node.id)
// store the latest msg passed to node
wNode._msg = msg
datastore.save(node.id, msg)
// only send msg on if we have passthru enabled
if (config.passthru) {
send(msg)
Expand Down

0 comments on commit 1bc99cb

Please sign in to comment.