Skip to content

Commit

Permalink
Avoid expensive db queries at startup to check if data is migrated (h…
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Mar 20, 2024
1 parent 417b491 commit 06f356a
Show file tree
Hide file tree
Showing 7 changed files with 403 additions and 50 deletions.
65 changes: 25 additions & 40 deletions homeassistant/components/recorder/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@

from . import migration, statistics
from .const import (
CONTEXT_ID_AS_BINARY_SCHEMA_VERSION,
DB_WORKER_PREFIX,
DOMAIN,
ESTIMATED_QUEUE_ITEM_SIZE,
EVENT_TYPE_IDS_SCHEMA_VERSION,
KEEPALIVE_TIME,
LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION,
MARIADB_PYMYSQL_URL_PREFIX,
Expand All @@ -58,7 +56,6 @@
QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY,
SQLITE_MAX_BIND_VARS,
SQLITE_URL_PREFIX,
STATES_META_SCHEMA_VERSION,
STATISTICS_ROWS_SCHEMA_VERSION,
SupportedDialect,
)
Expand All @@ -78,14 +75,15 @@
StatisticsShortTerm,
)
from .executor import DBInterruptibleThreadPoolExecutor
from .migration import (
EntityIDMigration,
EventsContextIDMigration,
EventTypeIDMigration,
StatesContextIDMigration,
)
from .models import DatabaseEngine, StatisticData, StatisticMetaData, UnsupportedDialect
from .pool import POOL_SIZE, MutexPool, RecorderPool
from .queries import (
has_entity_ids_to_migrate,
has_event_type_to_migrate,
has_events_context_ids_to_migrate,
has_states_context_ids_to_migrate,
)
from .queries import get_migration_changes
from .table_managers.event_data import EventDataManager
from .table_managers.event_types import EventTypeManager
from .table_managers.recorder_runs import RecorderRunsManager
Expand All @@ -101,17 +99,13 @@
CommitTask,
CompileMissingStatisticsTask,
DatabaseLockTask,
EntityIDMigrationTask,
EntityIDPostMigrationTask,
EventIdMigrationTask,
EventsContextIDMigrationTask,
EventTypeIDMigrationTask,
ImportStatisticsTask,
KeepAliveTask,
PerodicCleanupTask,
PurgeTask,
RecorderTask,
StatesContextIDMigrationTask,
StatisticsTask,
StopTask,
SynchronizeTask,
Expand Down Expand Up @@ -783,44 +777,35 @@ def _run(self) -> None:

def _activate_and_set_db_ready(self) -> None:
"""Activate the table managers or schedule migrations and mark the db as ready."""
with session_scope(session=self.get_session(), read_only=True) as session:
with session_scope(session=self.get_session()) as session:
# Prime the statistics meta manager as soon as possible
# since we want the frontend queries to avoid a thundering
# herd of queries to find the statistics meta data if
# there are a lot of statistics graphs on the frontend.
if self.schema_version >= STATISTICS_ROWS_SCHEMA_VERSION:
schema_version = self.schema_version
if schema_version >= STATISTICS_ROWS_SCHEMA_VERSION:
self.statistics_meta_manager.load(session)

if (
self.schema_version < CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
or execute_stmt_lambda_element(
session, has_states_context_ids_to_migrate()
)
):
self.queue_task(StatesContextIDMigrationTask())
migration_changes: dict[str, int] = {
row[0]: row[1]
for row in execute_stmt_lambda_element(session, get_migration_changes())
}

if (
self.schema_version < CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
or execute_stmt_lambda_element(
session, has_events_context_ids_to_migrate()
)
):
self.queue_task(EventsContextIDMigrationTask())

if (
self.schema_version < EVENT_TYPE_IDS_SCHEMA_VERSION
or execute_stmt_lambda_element(session, has_event_type_to_migrate())
):
self.queue_task(EventTypeIDMigrationTask())
for migrator_cls in (StatesContextIDMigration, EventsContextIDMigration):
migrator = migrator_cls(session, schema_version, migration_changes)
if migrator.needs_migrate():
self.queue_task(migrator.task())

migrator = EventTypeIDMigration(session, schema_version, migration_changes)
if migrator.needs_migrate():
self.queue_task(migrator.task())
else:
_LOGGER.debug("Activating event_types manager as all data is migrated")
self.event_type_manager.active = True

if (
self.schema_version < STATES_META_SCHEMA_VERSION
or execute_stmt_lambda_element(session, has_entity_ids_to_migrate())
):
self.queue_task(EntityIDMigrationTask())
migrator = EntityIDMigration(session, schema_version, migration_changes)
if migrator.needs_migrate():
self.queue_task(migrator.task())
else:
_LOGGER.debug("Activating states_meta manager as all data is migrated")
self.states_meta_manager.active = True
Expand Down
11 changes: 11 additions & 0 deletions homeassistant/components/recorder/db_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class Base(DeclarativeBase):
TABLE_STATISTICS_META = "statistics_meta"
TABLE_STATISTICS_RUNS = "statistics_runs"
TABLE_STATISTICS_SHORT_TERM = "statistics_short_term"
TABLE_MIGRATION_CHANGES = "migration_changes"

STATISTICS_TABLES = ("statistics", "statistics_short_term")

Expand All @@ -100,6 +101,7 @@ class Base(DeclarativeBase):
TABLE_EVENT_TYPES,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
TABLE_MIGRATION_CHANGES,
TABLE_STATES_META,
TABLE_STATISTICS,
TABLE_STATISTICS_META,
Expand Down Expand Up @@ -771,6 +773,15 @@ def to_native(self, validate_entity_id: bool = True) -> Self:
return self


class MigrationChanges(Base):
    """Representation of migration changes.

    Each row pairs the identifier of a data migration with a version
    number stored for it.
    """

    __tablename__ = TABLE_MIGRATION_CHANGES

    # Identifier of the migration; primary key, so at most one row per id.
    migration_id: Mapped[str] = mapped_column(String(255), primary_key=True)
    # Version number stored alongside the migration id.
    version: Mapped[int] = mapped_column(SmallInteger)


class SchemaChanges(Base):
"""Representation of schema version changes."""

Expand Down
135 changes: 130 additions & 5 deletions homeassistant/components/recorder/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Callable, Iterable
import contextlib
from dataclasses import dataclass, replace as dataclass_replace
Expand All @@ -25,6 +26,7 @@
from sqlalchemy.orm.session import Session
from sqlalchemy.schema import AddConstraint, DropConstraint
from sqlalchemy.sql.expression import true
from sqlalchemy.sql.lambdas import StatementLambdaElement

from homeassistant.core import HomeAssistant
from homeassistant.util.enum import try_parse_enum
Expand All @@ -46,7 +48,12 @@
correct_db_schema as statistics_correct_db_schema,
validate_db_schema as statistics_validate_db_schema,
)
from .const import SupportedDialect
from .const import (
CONTEXT_ID_AS_BINARY_SCHEMA_VERSION,
EVENT_TYPE_IDS_SCHEMA_VERSION,
STATES_META_SCHEMA_VERSION,
SupportedDialect,
)
from .db_schema import (
CONTEXT_ID_BIN_MAX_LENGTH,
DOUBLE_PRECISION_TYPE_SQL,
Expand All @@ -60,6 +67,7 @@
Base,
Events,
EventTypes,
MigrationChanges,
SchemaChanges,
States,
StatesMeta,
Expand All @@ -80,18 +88,28 @@
find_states_context_ids_to_migrate,
find_unmigrated_short_term_statistics_rows,
find_unmigrated_statistics_rows,
has_entity_ids_to_migrate,
has_event_type_to_migrate,
has_events_context_ids_to_migrate,
has_states_context_ids_to_migrate,
has_used_states_event_ids,
migrate_single_short_term_statistics_row_to_timestamp,
migrate_single_statistics_row_to_timestamp,
)
from .statistics import get_start_time
from .tasks import (
CommitTask,
EntityIDMigrationTask,
EventsContextIDMigrationTask,
EventTypeIDMigrationTask,
PostSchemaMigrationTask,
RecorderTask,
StatesContextIDMigrationTask,
StatisticsTimestampMigrationCleanupTask,
)
from .util import (
database_job_retry_wrapper,
execute_stmt_lambda_element,
get_index_by_name,
retryable_database_job,
session_scope,
Expand Down Expand Up @@ -1478,7 +1496,8 @@ def migrate_states_context_ids(instance: Recorder) -> bool:
)
# If there is more work to do return False
# so that we can be called again
is_done = not states
if is_done := not states:
_mark_migration_done(session, StatesContextIDMigration)

if is_done:
_drop_index(session_maker, "states", "ix_states_context_id")
Expand Down Expand Up @@ -1515,7 +1534,8 @@ def migrate_events_context_ids(instance: Recorder) -> bool:
)
# If there is more work to do return False
# so that we can be called again
is_done = not events
if is_done := not events:
_mark_migration_done(session, EventsContextIDMigration)

if is_done:
_drop_index(session_maker, "events", "ix_events_context_id")
Expand Down Expand Up @@ -1580,7 +1600,8 @@ def migrate_event_type_ids(instance: Recorder) -> bool:

# If there is more work to do return False
# so that we can be called again
is_done = not events
if is_done := not events:
_mark_migration_done(session, EventTypeIDMigration)

if is_done:
instance.event_type_manager.active = True
Expand Down Expand Up @@ -1654,7 +1675,8 @@ def migrate_entity_ids(instance: Recorder) -> bool:

# If there is more work to do return False
# so that we can be called again
is_done = not states
if is_done := not states:
_mark_migration_done(session, EntityIDMigration)

_LOGGER.debug("Migrating entity_ids done=%s", is_done)
return is_done
Expand Down Expand Up @@ -1757,3 +1779,106 @@ def initialize_database(session_maker: Callable[[], Session]) -> bool:
except Exception as err: # pylint: disable=broad-except
_LOGGER.exception("Error when initialise database: %s", err)
return False


class BaseRunTimeMigration(ABC):
    """Shared logic for deciding whether a run time migration is pending.

    Subclasses provide ``migration_id``, ``task`` and
    ``needs_migrate_query``; they may also override
    ``required_schema_version`` and ``migration_version``.
    """

    required_schema_version = 0
    migration_version = 1
    migration_id: str
    task: Callable[[], RecorderTask]

    def __init__(
        self, session: Session, schema_version: int, migration_changes: dict[str, int]
    ) -> None:
        """Store the session, schema version and known migration changes."""
        self.migration_changes = migration_changes
        self.session = session
        self.schema_version = schema_version

    @abstractmethod
    def needs_migrate_query(self) -> StatementLambdaElement:
        """Return the query to check if the migration needs to run."""

    def needs_migrate(self) -> bool:
        """Return True when the migration still has to run.

        When the answer is False and the migration changes mapping did not
        already say so, the migration is marked as done in the database so
        that the data check can be skipped next time.
        """
        # A schema older than the required one always needs the migration.
        if self.schema_version < self.required_schema_version:
            return True

        # Fast path: the migration changes mapping already records this
        # migration at the current version or newer.
        recorded_version = self.migration_changes.get(self.migration_id, -1)
        if recorded_version >= self.migration_version:
            return False

        # Slow path: nothing recorded, so the data itself has to be checked.
        pending = execute_stmt_lambda_element(
            self.session, self.needs_migrate_query()
        )
        if pending:
            return True

        # Nothing left to migrate; record that so the fast path applies later.
        _mark_migration_done(self.session, type(self))
        return False


class StatesContextIDMigration(BaseRunTimeMigration):
    """Migration to migrate states context_ids to binary format."""

    # Schemas older than this version are always treated as needing migration.
    required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
    # Key used for this migration in the migration changes table.
    migration_id = "state_context_id_as_binary"
    # Task class instantiated via ``task()`` when the migration has to run.
    task = StatesContextIDMigrationTask

    def needs_migrate_query(self) -> StatementLambdaElement:
        """Return the query to check if the migration needs to run."""
        return has_states_context_ids_to_migrate()


class EventsContextIDMigration(BaseRunTimeMigration):
    """Migration to migrate events context_ids to binary format."""

    # Schemas older than this version are always treated as needing migration.
    required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
    # Key used for this migration in the migration changes table.
    migration_id = "event_context_id_as_binary"
    # Task class instantiated via ``task()`` when the migration has to run.
    task = EventsContextIDMigrationTask

    def needs_migrate_query(self) -> StatementLambdaElement:
        """Return the query to check if the migration needs to run."""
        return has_events_context_ids_to_migrate()


class EventTypeIDMigration(BaseRunTimeMigration):
    """Migration to migrate event_type to event_type_ids."""

    # Schemas older than this version are always treated as needing migration.
    required_schema_version = EVENT_TYPE_IDS_SCHEMA_VERSION
    # Key used for this migration in the migration changes table.
    migration_id = "event_type_id_migration"
    # Task class instantiated via ``task()`` when the migration has to run.
    task = EventTypeIDMigrationTask

    def needs_migrate_query(self) -> StatementLambdaElement:
        """Return the query to check if the migration needs to run."""
        return has_event_type_to_migrate()


class EntityIDMigration(BaseRunTimeMigration):
    """Migration to migrate entity_ids to states_meta."""

    # Schemas older than this version are always treated as needing migration.
    required_schema_version = STATES_META_SCHEMA_VERSION
    # Key used for this migration in the migration changes table.
    migration_id = "entity_id_migration"
    # Task class instantiated via ``task()`` when the migration has to run.
    task = EntityIDMigrationTask

    def needs_migrate_query(self) -> StatementLambdaElement:
        """Return the query to check if the migration needs to run."""
        return has_entity_ids_to_migrate()


def _mark_migration_done(
    session: Session, migration: type[BaseRunTimeMigration]
) -> None:
    """Record in the database that the given migration is done.

    A ``MigrationChanges`` row carrying the migration's id and its
    current version is merged into ``session``.
    """
    completed = MigrationChanges(
        migration_id=migration.migration_id,
        version=migration.migration_version,
    )
    session.merge(completed)
8 changes: 8 additions & 0 deletions homeassistant/components/recorder/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
EventData,
Events,
EventTypes,
MigrationChanges,
RecorderRuns,
StateAttributes,
States,
Expand Down Expand Up @@ -812,6 +813,13 @@ def find_states_context_ids_to_migrate(max_bind_vars: int) -> StatementLambdaEle
)


def get_migration_changes() -> StatementLambdaElement:
    """Query the database for previous migration changes.

    The returned statement selects the ``migration_id`` and ``version``
    columns of ``MigrationChanges``; no filter is applied.
    """
    return lambda_stmt(
        lambda: select(MigrationChanges.migration_id, MigrationChanges.version)
    )


def find_event_types_to_purge() -> StatementLambdaElement:
"""Find event_type_ids to purge."""
return lambda_stmt(
Expand Down
Loading

0 comments on commit 06f356a

Please sign in to comment.