diff --git a/howso/client/api.py b/howso/client/api.py index 4ee6feff..c1b63458 100644 --- a/howso/client/api.py +++ b/howso/client/api.py @@ -135,8 +135,8 @@ def get_api(engine_path: t.Optional[Path | str] = None) -> EngineApi: if result[0] == 1 and isinstance(result[1], dict): return EngineApi(result[1]["payload"]) raise ValueError("Invalid response") - except Exception: - raise HowsoError('Failed to retrieve the Howso Engine API schema.') + except Exception as e: + raise HowsoError('Failed to retrieve the Howso Engine API schema.') from e finally: amlg.destroy_entity(entity_id) del amlg diff --git a/howso/client/tests/test_client.py b/howso/client/tests/test_client.py index 697f17dd..16ea6263 100644 --- a/howso/client/tests/test_client.py +++ b/howso/client/tests/test_client.py @@ -195,8 +195,6 @@ def trainee(self, trainee_builder): trainee = trainee_builder.create(features=features, overwrite_trainee=True) try: yield trainee - except Exception: - raise finally: trainee_builder.delete(trainee) diff --git a/howso/direct/client.py b/howso/direct/client.py index 3fe91693..01b54053 100644 --- a/howso/direct/client.py +++ b/howso/direct/client.py @@ -41,6 +41,7 @@ TraineeVersion, ) from howso.client.typing import LibraryType, Persistence +from howso.direct.schemas import DirectTrainee from howso.utilities import internals # Client version @@ -407,16 +408,18 @@ def _auto_persist_trainee(self, trainee_id: str): trainee_id : str The ID of the Trainee to persist. 
""" - try: - trainee = self.trainee_cache.get(trainee_id) - if trainee.persistence == 'always': - self.amlg.store_entity( - handle=trainee_id, - file_path=self.resolve_trainee_filepath(trainee_id) - ) - except KeyError: - # Trainee not cached, ignore - pass + trainee = self.trainee_cache.get(trainee_id) + if trainee is None: + return + if trainee.persistence != 'always': + return + if getattr(trainee, 'transactional', False): + return + + self.amlg.store_entity( + handle=trainee_id, + file_path=self.resolve_trainee_filepath(trainee_id) + ) def _store_session(self, trainee_id: str, session: Session): """Store session details in a Trainee.""" @@ -442,6 +445,20 @@ def _initialize_trainee(self, trainee_id: str): # If tracing is enabled, log the trainee version self.execute(trainee_id, "get_trainee_version", {}) + def _initialize_transactional_trainee(self, trainee_id: str): + # Create a temporary trainee and initialize it in the normal way, then clone it with transactional mode on. + tmp_id = str(uuid.uuid4()) + self._initialize_trainee(tmp_id) + try: + cloned = self.amlg.clone_entity(tmp_id, trainee_id, + file_path=self.resolve_trainee_filepath(trainee_id), + persist=True, + json_file_params='{"transactional":true,"flatten":true}') + if not cloned: + raise HowsoError(f'Failed to initialize the Trainee "{trainee_id}"') + finally: + self.amlg.destroy_entity(handle=tmp_id) + def _get_trainee_from_engine(self, trainee_id: str) -> Trainee: """ Retrieve the Howso Engine representation of a Trainee object. 
@@ -468,12 +485,14 @@ def _get_trainee_from_engine(self, trainee_id: str) -> Trainee: persistence = metadata.get('persistence', 'allow') trainee_meta = metadata.get('metadata') trainee_name = metadata.get('name') + transactional = metadata.get('transactional', False) - return Trainee( + return DirectTrainee( name=trainee_name, id=trainee_id, persistence=persistence, metadata=trainee_meta, + transactional=transactional ) def _get_trainee_thread_count(self, trainee_id: str) -> int: @@ -733,8 +752,16 @@ def create_trainee( # noqa: C901 .. deprecated:: 31.0 Pass via `runtime` instead. - runtime : TraineeRuntime, optional - (Not implemented in this client) + runtime : TraineeDirectRuntimeOptions, optional + Additional backend-specific settings. + + * `transactional`: if true, and `persistence='always'`, then write + out an incremental update on each action rather than the entire + state. Generally results in faster operation at the cost of + increased disk utilization. + + .. versionchanged:: 32.1 + Supports the `transactional` parameter. 
Returns ------- @@ -750,6 +777,10 @@ def create_trainee( # noqa: C901 if features is None: features = {} + if runtime is None: + runtime = {} + transactional = runtime.get('transactional', False) + if library_type is not None: warnings.warn( 'The create trainee parameter `library_type` is deprecated and will be removed in ' @@ -795,13 +826,17 @@ def create_trainee( # noqa: C901 if self.configuration.verbose: print('Creating new Trainee') # Initialize Amalgam entity - self._initialize_trainee(trainee_id) + if transactional: + self._initialize_transactional_trainee(trainee_id) + else: + self._initialize_trainee(trainee_id) # Store the metadata trainee_metadata = dict( name=name, persistence=persistence, - metadata=metadata + metadata=metadata, + transactional=transactional ) self.execute(trainee_id, "set_metadata", {"metadata": trainee_metadata}) @@ -812,11 +847,12 @@ def create_trainee( # noqa: C901 features = internals.postprocess_feature_attributes(features) # Cache and return the trainee - new_trainee = Trainee( + new_trainee = DirectTrainee( name=name, persistence=persistence, id=trainee_id, - metadata=metadata + metadata=metadata, + transactional=transactional ) self.trainee_cache.set(new_trainee, feature_attributes=features) return new_trainee @@ -835,7 +871,7 @@ def update_trainee(self, trainee: Mapping | Trainee) -> Trainee: Trainee The `Trainee` object that was updated. 
""" - instance = Trainee.from_dict(trainee) if isinstance(trainee, Mapping) else trainee + instance = DirectTrainee.from_dict(trainee) if isinstance(trainee, Mapping) else trainee if not instance.id: raise ValueError("A Trainee id is required.") @@ -848,6 +884,7 @@ def update_trainee(self, trainee: Mapping | Trainee) -> Trainee: 'name': instance.name, 'metadata': instance.metadata, 'persistence': instance.persistence, + 'transactional': getattr(instance, 'transactional', False) } self.execute(instance.id, "set_metadata", {"metadata": metadata}) @@ -1151,6 +1188,7 @@ def copy_trainee( new_trainee_id: t.Optional[str] = None, *, library_type: t.Optional[LibraryType] = None, + persistence: t.Optional[Persistence] = None, resources: t.Optional[Mapping[str, t.Any]] = None, runtime: t.Optional[TraineeRuntimeOptions] = None ) -> Trainee: @@ -1174,6 +1212,12 @@ def copy_trainee( .. deprecated:: 31.0 Pass via `runtime` instead. + persistence : {"allow", "always", "never"}, optional + The requested persistence state of the Trainee. If not specified, + the new trainee will inherit the value from the original. + + .. versionadded:: 32.1 + resources : dict, optional (Not Implemented) Customize the resources provisioned for the Trainee instance. If not specified, the new trainee will inherit @@ -1181,11 +1225,17 @@ def copy_trainee( .. deprecated:: 31.0 Pass via `runtime` instead. - runtime : TraineeRuntimeOptions, optional - Library type, resource requirements, and other runtime settings - for the new Trainee instance. If not specified, the new trainee - will inherit the values from the original. Not used in this - client implementation. + runtime : TraineeDirectRuntimeOptions, optional + Additional backend-specific settings. If not specified, the new + trainee will inherit the values from the original. + + * `transactional`: if true, and `persistence='always'`, then write + out an incremental update on each action rather than the entire + state. 
Generally results in faster operation at the cost of + increased disk utilization. + + .. versionchanged:: 32.1 + Supports the `transactional` parameter. Returns ------- @@ -1213,9 +1263,19 @@ def copy_trainee( 'The copy trainee parameter `resources` is deprecated and will be removed in ' 'a future release. Please use `runtime` instead.', DeprecationWarning) + transactional = runtime is not None and runtime.get('transactional', False) + if transactional: + persist = True + json_file_params = '{"transactional":true,"flatten":true}' + else: + persist = False + json_file_params = "" + is_cloned = self.amlg.clone_entity( handle=trainee_id, clone_handle=new_trainee_id, + persist=persist, + json_file_params=json_file_params ) if not is_cloned: raise HowsoError( @@ -1225,12 +1285,15 @@ def copy_trainee( # Create the copy trainee new_trainee = deepcopy(original_trainee) + new_trainee = DirectTrainee.from_dict(original_trainee.to_dict()) new_trainee.name = new_trainee_name new_trainee._id = new_trainee_id # type: ignore + new_trainee._transactional = transactional metadata = { 'name': new_trainee.name, 'metadata': new_trainee.metadata, - 'persistence': new_trainee.persistence, + 'persistence': persistence or new_trainee.persistence, + 'transactional': transactional } self.execute(new_trainee_id, "set_metadata", {"metadata": metadata}) # Add new trainee to cache diff --git a/howso/direct/schemas/__init__.py b/howso/direct/schemas/__init__.py new file mode 100644 index 00000000..c9110122 --- /dev/null +++ b/howso/direct/schemas/__init__.py @@ -0,0 +1,6 @@ +from .trainee import DirectTrainee, TraineeDirectRuntimeOptions + +__all__ = [ + "DirectTrainee", + "TraineeDirectRuntimeOptions" +] diff --git a/howso/direct/schemas/trainee.py b/howso/direct/schemas/trainee.py new file mode 100644 index 00000000..2d61f120 --- /dev/null +++ b/howso/direct/schemas/trainee.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +from collections.abc import Mapping +import typing as t +from 
uuid import UUID + +from typing_extensions import NotRequired, ReadOnly + +from ...client.schemas.trainee import Trainee, TraineeDict, TraineeRuntimeOptions +from ...client.typing import Persistence + + +class DirectTraineeDict(TraineeDict): + """ + Direct-client-specific trainee state. + + .. versionadded:: 32.1 + + """ + + transactional: bool + + +class DirectTrainee(Trainee): + """ + Direct-client-specific internal representation of a trainee. + + .. versionadded:: 32.1 + + """ + + attribute_map = dict(Trainee.attribute_map, transactional='transactional') + + def __init__( + self, + id: str | UUID, + name: t.Optional[str] = None, + *, + metadata: t.Optional[Mapping] = None, + persistence: Persistence = 'allow', + project_id: t.Optional[str | UUID] = None, + transactional: bool = False + ): + """Initialize the Trainee instance.""" + super().__init__(id, name, metadata=metadata, persistence=persistence, project_id=project_id) + self._transactional = transactional + + @property + def transactional(self) -> bool: + """ + Whether this trainee is in transactional mode. + + Returns + ------- + bool + true if this trainee is running in transactional mode + + """ + return self._transactional + + +class TraineeDirectRuntimeOptions(TraineeRuntimeOptions): + """ + Runtime options specific to the direct client. + + .. 
versionadded:: 32.1 + + """ + + transactional: ReadOnly[NotRequired[bool | None]] + """Use transactional mode when `persistence='always'`.""" diff --git a/howso/direct/tests/test_standalone.py b/howso/direct/tests/test_standalone.py index 090bad4e..2a12c34d 100644 --- a/howso/direct/tests/test_standalone.py +++ b/howso/direct/tests/test_standalone.py @@ -1,15 +1,18 @@ +from pathlib import Path + import pytest from amalgam.api import Amalgam from howso.direct import HowsoDirectClient +from howso.direct.schemas.trainee import TraineeDirectRuntimeOptions from howso.utilities.testing import get_configurationless_test_client @pytest.fixture -def client(): +def client(tmp_path: Path): """Direct client instance using latest binaries.""" return get_configurationless_test_client(client_class=HowsoDirectClient, - verbose=True, trace=True) + verbose=True, trace=True, default_persist_path=tmp_path) def test_direct_client(client: HowsoDirectClient): @@ -27,3 +30,41 @@ def test_direct_client(client: HowsoDirectClient): def test_check_name_valid_for_save(client, filename, truthiness): """Ensure that the internal function `check_name_valid_for_save` works.""" assert client.check_name_valid_for_save(filename, clobber=True)[0] == truthiness + + +def test_persistence_always(client: HowsoDirectClient, tmp_path: Path): + """Test that persist-always mode creates a file on disk.""" + trainee = client.create_trainee(persistence='always') + trainee_path = tmp_path / f"{trainee.id}.caml" + client.set_feature_attributes(trainee.id, {"f": {"type": "nominal"}}) + assert trainee_path.exists() + + +def test_persistence_always_shrinks(client: HowsoDirectClient, tmp_path: Path): + """Test that persist-always mode rewrites a file to maybe be smaller.""" + trainee = client.create_trainee(persistence='always') + trainee_path = tmp_path / f"{trainee.id}.caml" + client.set_feature_attributes(trainee.id, {"feature_1": {"type": "nominal"}, + "other_unrelated_feature": {"type": "continuous"}}) + old_size = 
trainee_path.stat().st_size + client.set_feature_attributes(trainee.id, {"feature_1": {"type": "nominal"}}) + new_size = trainee_path.stat().st_size + # We've deleted a feature so the file should be smaller + assert new_size < old_size + + +def test_persistence_always_transactional_grows(client: HowsoDirectClient, tmp_path: Path): + """Test that transactional mode makes a file larger.""" + trainee = client.create_trainee(persistence='always', runtime=TraineeDirectRuntimeOptions(transactional=True)) + trainee_path = tmp_path / f"{trainee.id}.caml" + client.set_feature_attributes(trainee.id, {"feature_1": {"type": "nominal"}, + "other_unrelated_feature": {"type": "continuous"}}) + old_size = trainee_path.stat().st_size + client.set_feature_attributes(trainee.id, {"feature_1": {"type": "nominal"}}) + # Transactional mode always makes the file larger + new_size = trainee_path.stat().st_size + assert new_size > old_size + client.persist_trainee(trainee.id) + # But now saving should compact the file + new_new_size = trainee_path.stat().st_size + assert new_new_size < old_size diff --git a/howso/engine/tests/test_engine.py b/howso/engine/tests/test_engine.py index 7e794a53..f55d75e9 100644 --- a/howso/engine/tests/test_engine.py +++ b/howso/engine/tests/test_engine.py @@ -1,9 +1,12 @@ from pathlib import Path +import typing as t from pandas.testing import assert_frame_equal import pytest from howso.client.exceptions import HowsoError +from howso.client.schemas.trainee import TraineeRuntimeOptions +from howso.direct.client import HowsoDirectClient from howso.engine import ( delete_trainee, load_trainee, @@ -23,8 +26,6 @@ def trainee(self, data, features): try: yield t - except Exception: - raise finally: t.delete() @@ -212,14 +213,34 @@ def test_save_load_bad_load(self): ): load_trainee(file_path=file_path) - def test_delete_method_standalone_good(self, trainee): + @pytest.mark.parametrize("transactional", [False, True]) + def test_always_persist_load(self, tmp_path: Path, 
data, features, transactional: bool): + """Test that an auto-persist trainee can be reloaded.""" + runtime = {} + if transactional: + runtime['transactional'] = True + trainee_runtime = t.cast(TraineeRuntimeOptions, runtime) + trainee = Trainee(features=features, persistence="always", runtime=trainee_runtime) + try: + trainee.train(data) + file_path = Path(t.cast(HowsoDirectClient, trainee.client).resolve_trainee_filepath(trainee.id)) + save_path = tmp_path / "save.caml" + save_path.write_bytes(file_path.read_bytes()) + finally: + trainee.delete() + + load_example_trainee = load_trainee(file_path=save_path, persistence="always", runtime=trainee_runtime) + try: + assert load_example_trainee.get_num_training_cases() == 150 + finally: + load_example_trainee.delete() + + def test_delete_method_standalone_good(self, trainee, tmp_path: Path): """Test the standalone trainee deletion method for both strings and Path.""" - # Non-default directory - directory_path = Path('test_directory') # Path and string file path - Path_file_path = directory_path.joinpath('Path_save_load_trainee.caml') - string_file_path = str(directory_path.joinpath('string_save_load_trainee.caml')) + Path_file_path = tmp_path / 'Path_save_load_trainee.caml' + string_file_path = str(tmp_path / 'string_save_load_trainee.caml') # Save two trainees to test deletion trainee.save(file_path=Path_file_path) @@ -232,19 +253,13 @@ def test_delete_method_standalone_good(self, trainee): # Checks to make sure directory is empty assert not any(Path_file_path.parents[0].iterdir()) - # Cleanup - directory_path.rmdir() - - def test_delete_method_trainee_good_save(self, trainee): + def test_delete_method_trainee_good_save(self, trainee, tmp_path: Path): """Test the Trainee deletion function method for saved trainee, should delete from last saved location.""" - # Non-default directory - directory_path = Path('test_directory') - trainee_name = 'delete_trainee' delete_example_trainee = trainee.copy(name=trainee_name) # Path 
and string file path - file_path = directory_path.joinpath(f'Path_{trainee_name}.caml') + file_path = tmp_path / f'Path_{trainee_name}.caml' # Save trainee to test deletion delete_example_trainee.save(file_path=file_path) @@ -255,19 +270,13 @@ def test_delete_method_trainee_good_save(self, trainee): # Checks to make sure directory is empty assert not any(file_path.parents[0].iterdir()) - # Cleanup - directory_path.rmdir() - - def test_delete_method_trainee_load_good(self, trainee): + def test_delete_method_trainee_load_good(self, trainee, tmp_path: Path): """Test the Trainee deletion function method for loaded trainee, should delete from loaded location.""" - # Non-default directory - directory_path = Path('test_directory') - trainee_name = 'delete_trainee' delete_example_trainee = trainee.copy(name=trainee_name) # Path and string file path - file_path = directory_path.joinpath(f'Path_{trainee_name}.caml') + file_path = tmp_path / f'Path_{trainee_name}.caml' delete_example_trainee.save(file_path=file_path) @@ -281,9 +290,6 @@ def test_delete_method_trainee_load_good(self, trainee): # remove from memory delete_example_trainee.delete() - # Cleanup - directory_path.rmdir() - def test_delete_method_standalone_bad(self): """Test attempting to delete non-existant trainee.""" directory_path = Path('test_directory') diff --git a/howso/engine/trainee.py b/howso/engine/trainee.py index 6af27434..a2d0c03c 100644 --- a/howso/engine/trainee.py +++ b/howso/engine/trainee.py @@ -453,6 +453,7 @@ def copy( name: t.Optional[str] = None, *, library_type: t.Optional[LibraryType] = None, + persistence: t.Optional[Persistence] = None, project: t.Optional[str | BaseProject] = None, resources: t.Optional[Mapping[str, t.Any]] = None, runtime: t.Optional[TraineeRuntimeOptions] = None @@ -471,6 +472,12 @@ def copy( .. deprecated:: 31.0 Pass via `runtime` instead. + persistence : {"allow", "always", "never"}, optional + The requested persistence state of the Trainee. 
If not specified, + the new trainee will inherit the value from the original. + + .. versionadded:: 32.1 + project : str or Project, optional The instance or id of the project to use for the new trainee. resources : dict, optional @@ -480,11 +487,17 @@ def copy( .. deprecated:: 31.0 Pass via `runtime` instead. - runtime : TraineeRuntimeOptions, optional - Runtime settings for this trainee, including resource requirements. - Takes precedence over `library_type` and `resources`, if either - option is set. If not specified, the new trainee will inherit the - value from the original. + runtime : TraineeDirectRuntimeOptions, optional + Additional backend-specific settings. If not specified, the new + trainee will inherit the values from the original. + + * `transactional`: if true, and `persistence='always'`, then write + out an incremental update on each action rather than the entire + state. Generally results in faster operation at the cost of + increased disk utilization. + + .. versionchanged:: 32.1 + Supports the `transactional` parameter. Returns ------- @@ -500,6 +513,7 @@ def copy( "trainee_id": self.id, "new_trainee_name": name, "library_type": library_type, + "persistence": persistence, "resources": resources, "runtime": runtime, } @@ -3694,9 +3708,7 @@ def from_schema( """ if isinstance(schema, cls) and client is None: return schema - trainee_dict = schema.to_dict() - trainee_dict['client'] = client - return cls.from_dict(trainee_dict) + return cls.from_dict(dict(schema.to_dict(), client=client)) @classmethod def from_dict(cls, schema: Mapping) -> Trainee: @@ -3992,7 +4004,10 @@ def delete_trainee( def load_trainee( file_path: PathLike, - client: t.Optional[AbstractHowsoClient] = None + client: t.Optional[AbstractHowsoClient] = None, + *, + persistence: Persistence = 'allow', + runtime: t.Optional[TraineeRuntimeOptions] = None ) -> Trainee: """ Load an existing trainee from disk. 
@@ -4015,6 +4030,15 @@ def load_trainee( client : AbstractHowsoClient, optional The Howso client instance to use. Must have local disk access. + persistence : {"allow", "always", "never"}, default "allow" + The requested persistence state of the trainee. + + .. versionadded:: 32.1 + + runtime : TraineeRuntimeOptions, optional + Runtime settings for this trainee, including resource requirements. + + .. versionadded:: 32.1 Returns ------- @@ -4022,6 +4046,7 @@ def load_trainee( The trainee instance. """ client = client or get_client() + runtime = runtime or {} if not isinstance(client, LocalSaveableProtocol): raise HowsoError("The current client does not support loading a Trainee from file.") @@ -4037,7 +4062,7 @@ def load_trainee( # file name. if file_path.suffix: # Check to make sure sure `.caml` file is provided - if file_path.suffix.lower() != '.caml': + if file_path.suffix.lower() not in ('.caml', '.amlg'): raise HowsoError( 'Filepath with a non `.caml` extension was provided.' ) @@ -4055,10 +4080,19 @@ def load_trainee( raise HowsoError( f'The specified directory "{file_path.parents[0]}" does not exist.') - status = client.amlg.load_entity( - handle=trainee_id, - file_path=str(file_path) - ) + if persistence == 'always' and runtime.get('transactional', False): + status = client.amlg.load_entity( + handle=trainee_id, + file_path=str(file_path), + persist=True, + json_file_params=('{"transactional":true,"flatten":true,"execute_on_load":true,' + '"require_version_compatibility":true}') + ) + else: + status = client.amlg.load_entity( + handle=trainee_id, + file_path=str(file_path) + ) if not status.loaded: raise HowsoError(f"Trainee from file '{file_path}' not found.")