From b7b90b670bcb85d8a268283911c90ec46d27da65 Mon Sep 17 00:00:00 2001 From: Konstantin Deev Date: Sat, 19 Aug 2023 20:07:19 -0500 Subject: [PATCH] Import data points exactly once --- custom_components/eyeonwater/sensor.py | 57 ++++++++++++++++++++------ 1 file changed, 45 insertions(+), 12 deletions(-) diff --git a/custom_components/eyeonwater/sensor.py b/custom_components/eyeonwater/sensor.py index ae95579..ab54afa 100644 --- a/custom_components/eyeonwater/sensor.py +++ b/custom_components/eyeonwater/sensor.py @@ -1,11 +1,15 @@ """Support for EyeOnWater sensors.""" import logging +import datetime +import pytz from homeassistant.components.sensor import ( SensorDeviceClass, SensorEntity, SensorStateClass, ) +from homeassistant.components.recorder import get_instance + from homeassistant.core import callback from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.update_coordinator import ( @@ -13,7 +17,7 @@ DataUpdateCoordinator, ) from homeassistant.components.recorder.models import StatisticData, StatisticMetaData -from homeassistant.components.recorder.statistics import async_import_statistics +from homeassistant.components.recorder.statistics import async_import_statistics, get_last_statistics from .const import DATA_COORDINATOR, DATA_SMART_METER, DOMAIN, WATER_METER_NAME from .eow import Meter @@ -21,6 +25,31 @@ _LOGGER = logging.getLogger(__name__) _LOGGER.addHandler(logging.StreamHandler()) +def get_statistics_id(meter) -> str: + return f"sensor.water_meter_{meter.meter_id}" + +async def get_last_imported_time(hass, meter): + # https://github.com/home-assistant/core/blob/74e2d5c5c312cf3ba154b5206ceb19ba884c6fb4/homeassistant/components/tibber/sensor.py#L11 + + statistic_id = get_statistics_id(meter) + + + last_stats = await get_instance(hass).async_add_executor_job( + get_last_statistics, hass, 1, statistic_id, True, set(["start", "sum"]) + ) + _LOGGER.warning(f"last_stats {last_stats}") + + if last_stats: + date = last_stats[statistic_id][0]["start"] + _LOGGER.warning(f"date {date}") + date = datetime.datetime.fromtimestamp(date) + _LOGGER.warning(f"date {date}") + date = pytz.UTC.localize(date) + _LOGGER.warning(f"date {date}") + + return date + return None + async def async_setup_entry(hass, config_entry, async_add_entities): """Set up the EyeOnWater sensors.""" coordinator = hass.data[DOMAIN][config_entry.entry_id][DATA_COORDINATOR] @@ -28,7 +57,8 @@ async def async_setup_entry(hass, config_entry, async_add_entities): sensors = [] for meter in meters: - sensors.append(EyeOnWaterSensor(meter, coordinator)) + last_imported_time = await get_last_imported_time(hass=hass, meter=meter) + sensors.append(EyeOnWaterSensor(meter, last_imported_time, coordinator)) async_add_entities(sensors, False) @@ -41,7 +71,7 @@ class EyeOnWaterSensor(CoordinatorEntity, SensorEntity): _attr_device_class = SensorDeviceClass.WATER _attr_state_class = SensorStateClass.TOTAL_INCREASING - def __init__(self, meter: Meter, coordinator: DataUpdateCoordinator) -> None: + def __init__(self, meter: Meter, last_imported_time, coordinator: DataUpdateCoordinator) -> None: """Initialize the sensor.""" super().__init__(coordinator) self.meter = meter @@ -54,7 +84,7 @@ def __init__(self, meter: Meter, coordinator: DataUpdateCoordinator) -> None: name=f"{WATER_METER_NAME} {self.meter.meter_id}", ) self._last_historical_data = [] - self._last_imported_time = None + self._last_imported_time = last_imported_time @property def available(self): @@ -80,11 +110,16 @@ def _state_update(self): self._last_historical_data = self.meter.last_historical_data.copy() if self._last_imported_time: + _LOGGER.warning(f"_last_imported_time {self._last_imported_time} - self._last_historical_data {self._last_historical_data[-1]['start']}") self._last_historical_data = list(filter(lambda r: r["start"] > self._last_imported_time, self._last_historical_data)) + _LOGGER.warning(f"{len(self._last_historical_data)} data points will be imported") + + if self._last_historical_data: + self.import_historical_data() + self._last_imported_time = self._last_historical_data[-1]["start"] - self.import_historical_data() self.async_write_ha_state() @@ -104,9 +139,11 @@ def import_historical_data(self): """Import historical data for today and past N days.""" if not self._last_historical_data: - _LOGGER.info("There is no new historical data") + _LOGGER.warning("There is no new historical data") # Nothing to import return + + _LOGGER.warning(f"{len(self._last_historical_data)} data points will be imported") statistics = [ StatisticData( @@ -116,13 +153,8 @@ def import_historical_data(self): for row in self._last_historical_data ] - # # Do not import last 3 hours - # statistics = statistics[:-3] - # if not statistics: - # return - name = f"{WATER_METER_NAME} {self.meter.meter_id}" - statistic_id = name = f"sensor.water_meter_{self.meter.meter_id}" + statistic_id = get_statistics_id(self.meter) metadata = StatisticMetaData( has_mean=False, @@ -133,3 +165,4 @@ def import_historical_data(self): unit_of_measurement=self.meter.native_unit_of_measurement, ) async_import_statistics(self.hass, metadata, statistics) +