diff --git a/.env.example b/.env.example index e4a49a6..926d5af 100644 --- a/.env.example +++ b/.env.example @@ -17,3 +17,8 @@ INFLUX_ORG=solectrus # Customize InfluxDB storage INFLUX_BUCKET=my-solectrus-bucket INFLUX_MEASUREMENT=shelly + +# Optional: Minimal mode to reduce data sent to InfluxDB +# When enabled, it minimizes the data sent to InfluxDB +# by skipping consecutive records with zero power values. +# INFLUX_MODE=minimal diff --git a/lib/config.rb b/lib/config.rb index 52df569..e62228d 100644 --- a/lib/config.rb +++ b/lib/config.rb @@ -14,12 +14,14 @@ influx_org influx_bucket influx_measurement + influx_mode ].freeze DEFAULTS = { influx_schema: :http, influx_port: 8086, influx_measurement: 'Consumer', + influx_mode: :default, }.freeze Config = @@ -118,6 +120,7 @@ def validate_influx_settings! end validate_url!(influx_url) + validate_mode!(influx_mode) end def validate_url!(url) @@ -126,6 +129,10 @@ def validate_url!(url) (uri.is_a?(URI::HTTP) && uri.host.present?) || throw("URL is invalid: #{url}") end + def validate_mode!(mode) + %i[default minimal].include?(mode) || throw("INFLUX_MODE is invalid: #{mode}") + end + def self.from_env(options = {}) new( { @@ -139,6 +146,7 @@ def self.from_env(options = {}) influx_org: ENV.fetch('INFLUX_ORG'), influx_bucket: ENV.fetch('INFLUX_BUCKET', nil), influx_measurement: ENV.fetch('INFLUX_MEASUREMENT', nil), + influx_mode: ENV.fetch('INFLUX_MODE', nil)&.to_sym, }.merge(options), ) end diff --git a/lib/shelly_pull.rb b/lib/shelly_pull.rb index 16aff16..36b4bca 100644 --- a/lib/shelly_pull.rb +++ b/lib/shelly_pull.rb @@ -3,14 +3,35 @@ def initialize(config:, queue:) @queue = queue @config = config @count = 0 + @last_power = -1 end attr_reader :config, :queue, :count def next record = config.adapter.solectrus_record(@count += 1) - queue << record + queue << record if should_queue?(record) record end + + private + + def should_queue?(record) + case config.influx_mode + when :default + # Always queue this record + true + + when :minimal + # Only queue this record if the power is non-zero or if it just changed to zero + result = record.power.nonzero? || @last_power.nonzero? + config.logger.info "Ignoring record ##{record.id} (no power)" unless result + + # Remember the last power + @last_power = record.power + + result + end + end end diff --git a/spec/lib/config_spec.rb b/spec/lib/config_spec.rb index c23bf88..5332572 100644 --- a/spec/lib/config_spec.rb +++ b/spec/lib/config_spec.rb @@ -7,6 +7,7 @@ influx_token: 'this.is.just.an.example', influx_org: 'solectrus', influx_bucket: 'Consumer', + influx_mode: :minimal, } end @@ -51,6 +52,12 @@ end.to raise_error(Exception, /INFLUX_TOKEN is missing/) end + it 'raises an error for invalid INFLUX_MODE' do + expect do + described_class.new(valid_options.merge(influx_mode: 'foo')) + end.to raise_error(Exception, /MODE is invalid/) + end + it 'initializes with valid options' do expect { described_class.new(valid_options) }.not_to raise_error end @@ -138,5 +145,9 @@ it 'returns correct influx_measurement' do expect(config.influx_measurement).to eq('Consumer') end + + it 'returns correct influx_mode' do + expect(config.influx_mode).to eq(:minimal) + end end end diff --git a/spec/lib/shelly_pull_spec.rb b/spec/lib/shelly_pull_spec.rb index 5596a11..c995fcf 100644 --- a/spec/lib/shelly_pull_spec.rb +++ b/spec/lib/shelly_pull_spec.rb @@ -31,5 +31,45 @@ expect(queue.length).to eq(0) end end + + context 'when minimal mode' do + let(:config) { Config.from_env(influx_mode: :minimal) } + + it 'queues record if power is non-zero' do + record1 = SolectrusRecord.new(id: 1, time: 0, payload: { power: 1 }) + allow(config.adapter).to receive(:solectrus_record).and_return(record1) + shelly_pull.next + + record2 = SolectrusRecord.new(id: 2, time: 0, payload: { power: 2 }) + allow(config.adapter).to receive(:solectrus_record).and_return(record2) + shelly_pull.next + + expect(queue.length).to eq(2) + end + + it 'queues record if power just changed to zero' do + record1 = SolectrusRecord.new(id: 3, time: 0, payload: { power: 1 }) + allow(config.adapter).to receive(:solectrus_record).and_return(record1) + shelly_pull.next + + record2 = SolectrusRecord.new(id: 4, time: 0, payload: { power: 0 }) + allow(config.adapter).to receive(:solectrus_record).and_return(record2) + shelly_pull.next + + expect(queue.length).to eq(2) + end + + it 'does not queue record if power is zero' do + record1 = SolectrusRecord.new(id: 5, time: 0, payload: { power: 0 }) + allow(config.adapter).to receive(:solectrus_record).and_return(record1) + shelly_pull.next + + record2 = SolectrusRecord.new(id: 6, time: 0, payload: { power: 0 }) + allow(config.adapter).to receive(:solectrus_record).and_return(record2) + shelly_pull.next + + expect(queue.length).to eq(1) + end + end end end