Skip to content

Commit

Permalink
22329: Transactional support in direct client, MINOR
Browse files Browse the repository at this point in the history
Adds runtime options to the direct client.  If you create a trainee as

    Trainee(persistence='always', runtime={'transactional': True})

then every operation will append a log entry to the .caml file.  This is
faster per operation, and all of the data continues to be recorded on
disk, but the resulting files will be larger than always-persist mode
without the transactional option.
  • Loading branch information
dmaze committed Dec 18, 2024
1 parent 51662ce commit 1ce9f37
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 75 deletions.
4 changes: 2 additions & 2 deletions howso/client/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ def get_api(engine_path: t.Optional[Path | str] = None) -> EngineApi:
if result[0] == 1 and isinstance(result[1], dict):
return EngineApi(result[1]["payload"])
raise ValueError("Invalid response")
except Exception:
raise HowsoError('Failed to retrieve the Howso Engine API schema.')
except Exception as e:
raise HowsoError('Failed to retrieve the Howso Engine API schema.') from e
finally:
amlg.destroy_entity(entity_id)
del amlg
2 changes: 0 additions & 2 deletions howso/client/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,6 @@ def trainee(self, trainee_builder):
trainee = trainee_builder.create(features=features, overwrite_trainee=True)
try:
yield trainee
except Exception:
raise
finally:
trainee_builder.delete(trainee)

Expand Down
124 changes: 98 additions & 26 deletions howso/direct/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
TraineeVersion,
)
from howso.client.typing import LibraryType, Persistence
from howso.direct.schemas import DirectTrainee
from howso.utilities import internals

# Client version
Expand Down Expand Up @@ -407,16 +408,18 @@ def _auto_persist_trainee(self, trainee_id: str):
trainee_id : str
The ID of the Trainee to persist.
"""
try:
trainee = self.trainee_cache.get(trainee_id)
if trainee.persistence == 'always':
self.amlg.store_entity(
handle=trainee_id,
file_path=self.resolve_trainee_filepath(trainee_id)
)
except KeyError:
# Trainee not cached, ignore
pass
trainee = self.trainee_cache.get(trainee_id)
if trainee is None:
return
if trainee.persistence != 'always':
return
if getattr(trainee, 'transactional', False):
return

self.amlg.store_entity(
handle=trainee_id,
file_path=self.resolve_trainee_filepath(trainee_id)
)

def _store_session(self, trainee_id: str, session: Session):
"""Store session details in a Trainee."""
Expand All @@ -442,6 +445,20 @@ def _initialize_trainee(self, trainee_id: str):
# If tracing is enabled, log the trainee version
self.execute(trainee_id, "get_trainee_version", {})

def _initialize_transactional_trainee(self, trainee_id: str):
# Create a temporary trainee and initialize it in the normal way, then clone it with transactional mode on.
tmp_id = str(uuid.uuid4())
self._initialize_trainee(tmp_id)
try:
cloned = self.amlg.clone_entity(tmp_id, trainee_id,
file_path=self.resolve_trainee_filepath(trainee_id),
persist=True,
json_file_params='{"transactional":true,"flatten":true}')
if not cloned:
raise HowsoError(f'Failed to initialize the Trainee "{trainee_id}"')
finally:
self.amlg.destroy_entity(handle=tmp_id)

def _get_trainee_from_engine(self, trainee_id: str) -> Trainee:
"""
Retrieve the Howso Engine representation of a Trainee object.
Expand All @@ -468,12 +485,14 @@ def _get_trainee_from_engine(self, trainee_id: str) -> Trainee:
persistence = metadata.get('persistence', 'allow')
trainee_meta = metadata.get('metadata')
trainee_name = metadata.get('name')
transactional = metadata.get('transactional', False)

return Trainee(
return DirectTrainee(
name=trainee_name,
id=trainee_id,
persistence=persistence,
metadata=trainee_meta,
transactional=transactional
)

def _get_trainee_thread_count(self, trainee_id: str) -> int:
Expand Down Expand Up @@ -733,8 +752,16 @@ def create_trainee( # noqa: C901
.. deprecated:: 31.0
Pass via `runtime` instead.
runtime : TraineeRuntime, optional
(Not implemented in this client)
runtime : TraineeDirectRuntimeOptions, optional
Additional backend-specific settings.
* `transactional`: if true, and `persistence='always'`, then write
out an incremental update on each action rather than the entire
state. Generally results in faster operation at the cost of
increased disk utilization.
.. versionchanged:: 33.1
Supports the `transactional` parameter.
Returns
-------
Expand All @@ -750,6 +777,10 @@ def create_trainee( # noqa: C901
if features is None:
features = {}

if runtime is None:
runtime = {}
transactional = runtime.get('transactional', False)

if library_type is not None:
warnings.warn(
'The create trainee parameter `library_type` is deprecated and will be removed in '
Expand Down Expand Up @@ -795,13 +826,17 @@ def create_trainee( # noqa: C901
if self.configuration.verbose:
print('Creating new Trainee')
# Initialize Amalgam entity
self._initialize_trainee(trainee_id)
if transactional:
self._initialize_transactional_trainee(trainee_id)
else:
self._initialize_trainee(trainee_id)

# Store the metadata
trainee_metadata = dict(
name=name,
persistence=persistence,
metadata=metadata
metadata=metadata,
transactional=transactional
)
self.execute(trainee_id, "set_metadata", {"metadata": trainee_metadata})

Expand All @@ -812,11 +847,12 @@ def create_trainee( # noqa: C901
features = internals.postprocess_feature_attributes(features)

# Cache and return the trainee
new_trainee = Trainee(
new_trainee = DirectTrainee(
name=name,
persistence=persistence,
id=trainee_id,
metadata=metadata
metadata=metadata,
transactional=transactional
)
self.trainee_cache.set(new_trainee, feature_attributes=features)
return new_trainee
Expand All @@ -835,7 +871,7 @@ def update_trainee(self, trainee: Mapping | Trainee) -> Trainee:
Trainee
The `Trainee` object that was updated.
"""
instance = Trainee.from_dict(trainee) if isinstance(trainee, Mapping) else trainee
instance = DirectTrainee.from_dict(trainee) if isinstance(trainee, Mapping) else trainee

if not instance.id:
raise ValueError("A Trainee id is required.")
Expand All @@ -848,6 +884,7 @@ def update_trainee(self, trainee: Mapping | Trainee) -> Trainee:
'name': instance.name,
'metadata': instance.metadata,
'persistence': instance.persistence,
'transactional': getattr(instance, 'transactional', False)
}
self.execute(instance.id, "set_metadata", {"metadata": metadata})

Expand Down Expand Up @@ -1151,6 +1188,7 @@ def copy_trainee(
new_trainee_id: t.Optional[str] = None,
*,
library_type: t.Optional[LibraryType] = None,
persistence: t.Optional[Persistence] = None,
resources: t.Optional[Mapping[str, t.Any]] = None,
runtime: t.Optional[TraineeRuntimeOptions] = None
) -> Trainee:
Expand All @@ -1174,18 +1212,30 @@ def copy_trainee(
.. deprecated:: 31.0
Pass via `runtime` instead.
persistence : {"allow", "always", "never"}, optional
The requested persistence state of the Trainee. If not specified,
the new trainee will inherit the value from the original.
.. versionadded:: 33.1
resources : dict, optional
(Not Implemented) Customize the resources provisioned for the
Trainee instance. If not specified, the new trainee will inherit
the value from the original.
.. deprecated:: 31.0
Pass via `runtime` instead.
runtime : TraineeRuntimeOptions, optional
Library type, resource requirements, and other runtime settings
for the new Trainee instance. If not specified, the new trainee
will inherit the values from the original. Not used in this
client implementation.
runtime : TraineeDirectRuntimeOptions, optional
Additional backend-specific settings. If not specified, the new
trainee will inherit the values from the original.
* `transactional`: if true, and `persistence='always'`, then write
out an incremental update on each action rather than the entire
state. Generally results in faster operation at the cost of
increased disk utilization.
.. versionchanged:: 33.1
Supports the `transactional` parameter.
Returns
-------
Expand Down Expand Up @@ -1213,9 +1263,25 @@ def copy_trainee(
'The copy trainee parameter `resources` is deprecated and will be removed in '
'a future release. Please use `runtime` instead.', DeprecationWarning)

persistence = persistence or original_trainee.persistence
transactional = False
if persistence == 'always':
if runtime is not None and 'transactional' in runtime:
transactional = runtime['transactional']
else:
transactional = getattr(original_trainee, 'transactional', False)
if transactional:
persist = True
json_file_params = '{"transactional":true,"flatten":true}'
else:
persist = False
json_file_params = ""

is_cloned = self.amlg.clone_entity(
handle=trainee_id,
clone_handle=new_trainee_id,
persist=persist,
json_file_params=json_file_params
)
if not is_cloned:
raise HowsoError(
Expand All @@ -1224,13 +1290,15 @@ def copy_trainee(
f'binaries or camls, or a Trainee "{new_trainee_name}" already exists.')

# Create the copy trainee
new_trainee = deepcopy(original_trainee)
new_trainee = DirectTrainee.from_dict(original_trainee.to_dict())
new_trainee.name = new_trainee_name
new_trainee._id = new_trainee_id # type: ignore
new_trainee._transactional = transactional
metadata = {
'name': new_trainee.name,
'metadata': new_trainee.metadata,
'persistence': new_trainee.persistence,
'persistence': persistence,
'transactional': transactional
}
self.execute(new_trainee_id, "set_metadata", {"metadata": metadata})
# Add new trainee to cache
Expand Down Expand Up @@ -1344,18 +1412,22 @@ def persist_trainee(self, trainee_id: str):
if self.configuration.verbose:
print(f'Saving Trainee with id: {trainee_id}')

transactional = False
if trainee_id in self.trainee_cache:
trainee = self.trainee_cache.get(trainee_id)
if trainee.persistence == 'never':
raise AssertionError(
"Trainee is set to never persist. Update the trainee "
"persistence option to enable persistence.")
transactional = getattr(trainee, 'transactional', False)
# Enable auto persistence
trainee.persistence = 'always'

self.amlg.store_entity(
handle=trainee_id,
file_path=self.resolve_trainee_filepath(trainee_id)
file_path=self.resolve_trainee_filepath(trainee_id),
persist=transactional,
json_file_params='{"transactional":true,"flatten":true}' if transactional else ""
)

def begin_session(self, name: str | None = "default", metadata: t.Optional[Mapping] = None) -> Session:
Expand Down
6 changes: 6 additions & 0 deletions howso/direct/schemas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .trainee import DirectTrainee, TraineeDirectRuntimeOptions

__all__ = [
"DirectTrainee",
"TraineeDirectRuntimeOptions"
]
71 changes: 71 additions & 0 deletions howso/direct/schemas/trainee.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from __future__ import annotations

from collections.abc import Mapping
import typing as t
from uuid import UUID

from typing_extensions import NotRequired, ReadOnly

from ...client.schemas.trainee import Trainee, TraineeDict, TraineeRuntimeOptions
from ...client.typing import Persistence


class DirectTraineeDict(TraineeDict):
"""
Direct-client-specific trainee state.
.. versionadded:: 33.1
"""

transactional: bool


class DirectTrainee(Trainee):
"""
Direct-client-specific internal representation of a trainee.
.. versionadded:: 33.1
"""

attribute_map = dict(Trainee.attribute_map, transactional='transactional')

def __init__(
self,
id: str | UUID,
name: t.Optional[str] = None,
*,
metadata: t.Optional[Mapping] = None,
persistence: Persistence = 'allow',
project_id: t.Optional[str | UUID] = None,
transactional: bool = False
):
"""Initialize the Trainee instance."""
super().__init__(id, name, metadata=metadata, persistence=persistence, project_id=project_id)
self._transactional = transactional

@property
def transactional(self) -> bool:
"""
Whether this trainee is in transactional mode.
Returns
-------
bool
true if this trainee is running in transactional mode
"""
return self._transactional


class TraineeDirectRuntimeOptions(TraineeRuntimeOptions):
"""
Runtime options specific to the direct client.
.. versionadded:: 33.1
"""

transactional: ReadOnly[NotRequired[bool | None]]
"""Use transactional mode when `persistence='always'."""
Loading

0 comments on commit 1ce9f37

Please sign in to comment.