diff --git a/kgforge/core/archetypes/store.py b/kgforge/core/archetypes/store.py index b65435d5..b499fb04 100644 --- a/kgforge/core/archetypes/store.py +++ b/kgforge/core/archetypes/store.py @@ -155,7 +155,8 @@ def mapper(self) -> Type[Mapper]: # [C]RUD. def register( - self, data: Union[Resource, List[Resource]], schema_id: str = None + self, data: Union[Resource, List[Resource]], schema_id: str = None, + context: Optional[Context] = None ) -> None: # Replace None by self._register_many to switch to optimized bulk registration. run( @@ -167,9 +168,10 @@ def register( exception=RegistrationError, monitored_status="_synchronized", schema_id=schema_id, + context=context ) - def _register_many(self, resources: List[Resource], schema_id: str) -> None: + def _register_many(self, resources: List[Resource], schema_id: str, ) -> None: # Bulk registration could be optimized by overriding this method in the specialization. # POLICY Should reproduce self._register_one() and execution._run_one() behaviours. not_supported() @@ -323,7 +325,8 @@ def _download_one( # CR[U]D. def update( - self, data: Union[Resource, List[Resource]], schema_id: Optional[str] + self, data: Union[Resource, List[Resource]], schema_id: Optional[str], + context: Optional[Context] = None ) -> None: # Replace None by self._update_many to switch to optimized bulk update. run( @@ -336,15 +339,21 @@ def update( exception=UpdatingError, monitored_status="_synchronized", schema_id=schema_id, + context=context ) - def _update_many(self, resources: List[Resource], schema_id: Optional[str]) -> None: + def _update_many( + self, resources: List[Resource], schema_id: Optional[str], + context: Optional[Context] = None + ) -> None: # Bulk update could be optimized by overriding this method in the specialization. # POLICY Should reproduce self._update_one() and execution._run_one() behaviours. not_supported() @abstractmethod - def _update_one(self, resource: Resource, schema_id: Optional[str]) -> None: + def _update_one( + self, resource: Resource, schema_id: Optional[str], context: Optional[Context] = None + ) -> None: # POLICY Should notify of failures with exception UpdatingError including a message. # POLICY Resource _store_metadata should be set using wrappers.dict.wrap_dict(). # TODO This operation might be abstracted here when other stores will be implemented. diff --git a/kgforge/core/forge.py b/kgforge/core/forge.py index db28427c..b437984e 100644 --- a/kgforge/core/forge.py +++ b/kgforge/core/forge.py @@ -21,6 +21,7 @@ from rdflib import Graph from kgforge.core import Resource +from kgforge.core.commons import Context from kgforge.core.commons.files import load_file_as_byte from kgforge.core.archetypes import Mapping, Model, Resolver, Store from kgforge.core.commons.actions import LazyAction @@ -701,7 +702,8 @@ def download( # No @catch because the error handling is done by execution.run(). def register( - self, data: Union[Resource, List[Resource]], schema_id: Optional[str] = None + self, data: Union[Resource, List[Resource]], schema_id: Optional[str] = None, + context: Optional[Context] = None ) -> None: """ Store a resource or list of resources in the configured Store. @@ -710,11 +712,12 @@ def register( :param schema_id: an identifier of the schema the registered resources should conform to """ # self._store.mapper = self._store.mapper(self) - self._store.register(data, schema_id) + self._store.register(data, schema_id, context) # No @catch because the error handling is done by execution.run(). def update( - self, data: Union[Resource, List[Resource]], schema_id: Optional[str] = None + self, data: Union[Resource, List[Resource]], schema_id: Optional[str] = None, + context: Optional[Context] = None ) -> None: """ Update a resource or a list of resources in the configured Store. @@ -722,7 +725,7 @@ def update( :param data: the resources to update :param schema_id: an identifier of the schema the updated resources should conform to """ - self._store.update(data, schema_id) + self._store.update(data, schema_id, context) # No @catch because the error handling is done by execution.run(). def deprecate(self, data: Union[Resource, List[Resource]]) -> None: diff --git a/kgforge/specializations/stores/bluebrain_nexus.py b/kgforge/specializations/stores/bluebrain_nexus.py index 7294295c..e3d15c4a 100644 --- a/kgforge/specializations/stores/bluebrain_nexus.py +++ b/kgforge/specializations/stores/bluebrain_nexus.py @@ -134,7 +134,8 @@ def mapper(self) -> Optional[DictionaryMapper]: return DictionaryMapper def register( - self, data: Union[Resource, List[Resource]], schema_id: str = None + self, data: Union[Resource, List[Resource]], schema_id: str = None, context: + Optional[Context] = None ) -> None: run( self._register_one, @@ -145,9 +146,12 @@ def register( exception=RegistrationError, monitored_status="_synchronized", schema_id=schema_id, + context=context ) - def _register_many(self, resources: List[Resource], schema_id: str) -> None: + def _register_many( + self, resources: List[Resource], schema_id: str, context: Optional[Context] = None + ) -> None: def register_callback(task: Task): result = task.result() if isinstance(result.response, Exception): @@ -161,11 +165,11 @@ def register_callback(task: Task): else: result.resource.id = result.response["@id"] if not hasattr(result.resource, "context"): - context = self.model_context or self.context + context_value = context or self.model_context or self.context result.resource.context = ( - context.iri - if context.is_http_iri() - else context.document["@context"] + context_value.iri + if context_value.is_http_iri() + else context_value.document["@context"] ) self.service.synchronize_resource( result.resource, @@ -184,22 +188,28 @@ def register_callback(task: Task): execute_actions=True, ) params_register = copy.deepcopy(self.service.params.get("register", {})) + + context_value = context or self.model_context or self.context + self.service.batch_request( verified, BatchAction.CREATE, register_callback, RegistrationError, schema_id=schema_id, + context=context_value, params=params_register, ) - def _register_one(self, resource: Resource, schema_id: str) -> None: - context = self.model_context or self.context + def _register_one( + self, resource: Resource, schema_id: str, context: Optional[Context] = None + ) -> None: + context_value = context or self.model_context or self.context data = as_jsonld( resource, "compacted", False, - model_context=context, + model_context=context_value, metadata_context=None, context_resolver=self.service.resolve_context ) @@ -525,7 +535,10 @@ def _prepare_download_one( # CR[U]D. - def update(self, data: Union[Resource, List[Resource]], schema_id: str) -> None: + def update( + self, data: Union[Resource, List[Resource]], schema_id: str, + context: Optional[Context] = None + ) -> None: run( self._update_one, self._update_many, @@ -536,9 +549,12 @@ def update(self, data: Union[Resource, List[Resource]], schema_id: str) -> None: exception=UpdatingError, monitored_status="_synchronized", schema_id=schema_id, + context=context ) - def _update_many(self, resources: List[Resource], schema_id: str) -> None: + def _update_many( + self, resources: List[Resource], schema_id: str, context: Optional[Context] = None + ) -> None: update_callback = self.service.default_callback(self._update_many.__name__) verified = self.service.verify( resources, @@ -555,15 +571,18 @@ def _update_many(self, resources: List[Resource], schema_id: str) -> None: update_callback, UpdatingError, params=params_update, + context=context ) - def _update_one(self, resource: Resource, schema_id: str) -> None: - context = self.model_context or self.context + def _update_one( + self, resource: Resource, schema_id: str, context: Optional[Context] = None + ) -> None: + context_value = context or self.model_context or self.context data = as_jsonld( resource, "compacted", False, - model_context=context, + model_context=context_value, metadata_context=None, context_resolver=self.service.resolve_context ) diff --git a/kgforge/specializations/stores/demo_store.py b/kgforge/specializations/stores/demo_store.py index 997f9948..649a8f2c 100644 --- a/kgforge/specializations/stores/demo_store.py +++ b/kgforge/specializations/stores/demo_store.py @@ -49,7 +49,9 @@ def mapper(self) -> Type[Mapper]: # [C]RUD. - def _register_one(self, resource: Resource, schema_id: str) -> None: + def _register_one( + self, resource: Resource, schema_id: str, context: Optional[Context] = None + ) -> None: data = as_json(resource, expanded=False, store_metadata=False, model_context=None, metadata_context=None, context_resolver=None) try: @@ -75,7 +77,7 @@ def retrieve(self, id_: str, version: Optional[Union[int, str]], # CR[U]D. - def _update_one(self, resource: Resource, schema_id: str) -> None: + def _update_one(self, resource: Resource, schema_id: str, context: Optional[Context] = None) -> None: data = as_json(resource, expanded=False, store_metadata=False, model_context=None, metadata_context=None, context_resolver=None) try: diff --git a/tests/specializations/stores/test_demo_store.py b/tests/specializations/stores/test_demo_store.py index af9ee081..b6436b7f 100644 --- a/tests/specializations/stores/test_demo_store.py +++ b/tests/specializations/stores/test_demo_store.py @@ -53,7 +53,7 @@ def register(capsys, store, data, rc, err, msg): @when("I register the resource. An exception is raised. The printed report does mention an error: 'Exception: exception raised'.") def register_exception(monkeypatch, capsys, store, data): - def _register_one(_, x, schema_id): raise Exception("exception raised") + def _register_one(_, x, schema_id, context): raise Exception("exception raised") monkeypatch.setattr("kgforge.specializations.stores.demo_store.DemoStore._register_one", _register_one) store.register(data) out = capsys.readouterr().out[:-1]