Skip to content

Commit

Permalink
Import data points exactly once
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstantin Deev committed Aug 20, 2023
1 parent bceba9a commit b7b90b6
Showing 1 changed file with 45 additions and 12 deletions.
57 changes: 45 additions & 12 deletions custom_components/eyeonwater/sensor.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,64 @@
"""Support for EyeOnWater sensors."""
import logging
import datetime
import pytz

from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.components.recorder import get_instance

from homeassistant.core import callback
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from homeassistant.components.recorder.models import StatisticData, StatisticMetaData
from homeassistant.components.recorder.statistics import async_import_statistics
from homeassistant.components.recorder.statistics import async_import_statistics, get_last_statistics

from .const import DATA_COORDINATOR, DATA_SMART_METER, DOMAIN, WATER_METER_NAME
from .eow import Meter

_LOGGER = logging.getLogger(__name__)
_LOGGER.addHandler(logging.StreamHandler())

def get_statistics_id(meter) -> str:
return f"sensor.water_meter_{meter.meter_id}"

async def get_last_imported_time(hass, meter):
# https://github.com/home-assistant/core/blob/74e2d5c5c312cf3ba154b5206ceb19ba884c6fb4/homeassistant/components/tibber/sensor.py#L11

statistic_id = get_statistics_id(meter)


last_stats = await get_instance(hass).async_add_executor_job(
get_last_statistics, hass, 1, statistic_id, True, set(["start", "sum"])
)
_LOGGER.warning(f"last_stats {last_stats}")

if last_stats:
date = last_stats[statistic_id][0]["start"]
_LOGGER.warning(f"date {date}")
date = datetime.datetime.fromtimestamp(date)
_LOGGER.warning(f"date {date}")
date = pytz.UTC.localize(date)
_LOGGER.warning(f"date {date}")

return date
return None

async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up the EyeOnWater sensors."""
coordinator = hass.data[DOMAIN][config_entry.entry_id][DATA_COORDINATOR]
meters = hass.data[DOMAIN][config_entry.entry_id][DATA_SMART_METER].meters

sensors = []
for meter in meters:
sensors.append(EyeOnWaterSensor(meter, coordinator))
last_imported_time = await get_last_imported_time(hass=hass, meter=meter)
sensors.append(EyeOnWaterSensor(meter, last_imported_time, coordinator))

async_add_entities(sensors, False)

Expand All @@ -41,7 +71,7 @@ class EyeOnWaterSensor(CoordinatorEntity, SensorEntity):
_attr_device_class = SensorDeviceClass.WATER
_attr_state_class = SensorStateClass.TOTAL_INCREASING

def __init__(self, meter: Meter, coordinator: DataUpdateCoordinator) -> None:
def __init__(self, meter: Meter, last_imported_time, coordinator: DataUpdateCoordinator) -> None:
"""Initialize the sensor."""
super().__init__(coordinator)
self.meter = meter
Expand All @@ -54,7 +84,7 @@ def __init__(self, meter: Meter, coordinator: DataUpdateCoordinator) -> None:
name=f"{WATER_METER_NAME} {self.meter.meter_id}",
)
self._last_historical_data = []
self._last_imported_time = None
self._last_imported_time = last_imported_time

@property
def available(self):
Expand All @@ -80,11 +110,16 @@ def _state_update(self):

self._last_historical_data = self.meter.last_historical_data.copy()
if self._last_imported_time:
_LOGGER.warning(f"_last_imported_time {self._last_imported_time} - self._last_historical_data {self._last_historical_data[-1]['start']}")
self._last_historical_data = list(filter(lambda r: r["start"] > self._last_imported_time, self._last_historical_data))
_LOGGER.warning(f"{len(self._last_historical_data)} data points will be imported")


if self._last_historical_data:
self.import_historical_data()

self._last_imported_time = self._last_historical_data[-1]["start"]

self.import_historical_data()

self.async_write_ha_state()

Expand All @@ -104,9 +139,11 @@ def import_historical_data(self):
"""Import historical data for today and past N days."""

if not self._last_historical_data:
_LOGGER.info("There is no new historical data")
_LOGGER.warning("There is no new historical data")
# Nothing to import
return

_LOGGER.warning(f"{len(self._last_historical_data)} data points will be imported")

statistics = [
StatisticData(
Expand All @@ -116,13 +153,8 @@ def import_historical_data(self):
for row in self._last_historical_data
]

# # Do not import last 3 hours
# statistics = statistics[:-3]
# if not statistics:
# return

name = f"{WATER_METER_NAME} {self.meter.meter_id}"
statistic_id = name = f"sensor.water_meter_{self.meter.meter_id}"
statistic_id = get_statistics_id(self.meter)

metadata = StatisticMetaData(
has_mean=False,
Expand All @@ -133,3 +165,4 @@ def import_historical_data(self):
unit_of_measurement=self.meter.native_unit_of_measurement,
)
async_import_statistics(self.hass, metadata, statistics)

0 comments on commit b7b90b6

Please sign in to comment.