Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional context to register/update operations #357

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions kgforge/core/archetypes/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ def mapper(self) -> Type[Mapper]:
# [C]RUD.

def register(
self, data: Union[Resource, List[Resource]], schema_id: str = None
self, data: Union[Resource, List[Resource]], schema_id: str = None,
context: Optional[Context] = None
) -> None:
# Replace None by self._register_many to switch to optimized bulk registration.
run(
Expand All @@ -167,9 +168,10 @@ def register(
exception=RegistrationError,
monitored_status="_synchronized",
schema_id=schema_id,
context=context
)

def _register_many(self, resources: List[Resource], schema_id: str) -> None:
def _register_many(self, resources: List[Resource], schema_id: str, ) -> None:
# Bulk registration could be optimized by overriding this method in the specialization.
# POLICY Should reproduce self._register_one() and execution._run_one() behaviours.
not_supported()
Expand Down Expand Up @@ -323,7 +325,8 @@ def _download_one(
# CR[U]D.

def update(
self, data: Union[Resource, List[Resource]], schema_id: Optional[str]
self, data: Union[Resource, List[Resource]], schema_id: Optional[str],
context: Optional[Context] = None
) -> None:
# Replace None by self._update_many to switch to optimized bulk update.
run(
Expand All @@ -336,15 +339,21 @@ def update(
exception=UpdatingError,
monitored_status="_synchronized",
schema_id=schema_id,
context=context
)

def _update_many(self, resources: List[Resource], schema_id: Optional[str]) -> None:
def _update_many(
self, resources: List[Resource], schema_id: Optional[str],
context: Optional[Context] = None
) -> None:
# Bulk update could be optimized by overriding this method in the specialization.
# POLICY Should reproduce self._update_one() and execution._run_one() behaviours.
not_supported()

@abstractmethod
def _update_one(self, resource: Resource, schema_id: Optional[str]) -> None:
def _update_one(
self, resource: Resource, schema_id: Optional[str], context: Optional[Context] = None
) -> None:
# POLICY Should notify of failures with exception UpdatingError including a message.
# POLICY Resource _store_metadata should be set using wrappers.dict.wrap_dict().
# TODO This operation might be abstracted here when other stores will be implemented.
Expand Down
11 changes: 7 additions & 4 deletions kgforge/core/forge.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from rdflib import Graph

from kgforge.core import Resource
from kgforge.core.commons import Context
from kgforge.core.commons.files import load_file_as_byte
from kgforge.core.archetypes import Mapping, Model, Resolver, Store
from kgforge.core.commons.actions import LazyAction
Expand Down Expand Up @@ -701,7 +702,8 @@ def download(

# No @catch because the error handling is done by execution.run().
def register(
self, data: Union[Resource, List[Resource]], schema_id: Optional[str] = None
self, data: Union[Resource, List[Resource]], schema_id: Optional[str] = None,
context: Optional[Context] = None
) -> None:
"""
Store a resource or list of resources in the configured Store.
Expand All @@ -710,19 +712,20 @@ def register(
:param schema_id: an identifier of the schema the registered resources should conform to
"""
# self._store.mapper = self._store.mapper(self)
self._store.register(data, schema_id)
self._store.register(data, schema_id, context)

# No @catch because the error handling is done by execution.run().
def update(
self, data: Union[Resource, List[Resource]], schema_id: Optional[str] = None
self, data: Union[Resource, List[Resource]], schema_id: Optional[str] = None,
context: Optional[Context] = None
) -> None:
"""
Update a resource or a list of resources in the configured Store.

:param data: the resources to update
:param schema_id: an identifier of the schema the updated resources should conform to
"""
self._store.update(data, schema_id)
self._store.update(data, schema_id, context)

# No @catch because the error handling is done by execution.run().
def deprecate(self, data: Union[Resource, List[Resource]]) -> None:
Expand Down
47 changes: 33 additions & 14 deletions kgforge/specializations/stores/bluebrain_nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ def mapper(self) -> Optional[DictionaryMapper]:
return DictionaryMapper

def register(
self, data: Union[Resource, List[Resource]], schema_id: str = None
self, data: Union[Resource, List[Resource]], schema_id: str = None, context:
Optional[Context] = None
) -> None:
run(
self._register_one,
Expand All @@ -145,9 +146,12 @@ def register(
exception=RegistrationError,
monitored_status="_synchronized",
schema_id=schema_id,
context=context
)

def _register_many(self, resources: List[Resource], schema_id: str) -> None:
def _register_many(
self, resources: List[Resource], schema_id: str, context: Optional[Context] = None
) -> None:
def register_callback(task: Task):
result = task.result()
if isinstance(result.response, Exception):
Expand All @@ -161,11 +165,11 @@ def register_callback(task: Task):
else:
result.resource.id = result.response["@id"]
if not hasattr(result.resource, "context"):
context = self.model_context or self.context
context_value = context or self.model_context or self.context
result.resource.context = (
context.iri
if context.is_http_iri()
else context.document["@context"]
context_value.iri
if context_value.is_http_iri()
else context_value.document["@context"]
)
self.service.synchronize_resource(
result.resource,
Expand All @@ -184,22 +188,28 @@ def register_callback(task: Task):
execute_actions=True,
)
params_register = copy.deepcopy(self.service.params.get("register", {}))

context_value = context or self.model_context or self.context

self.service.batch_request(
verified,
BatchAction.CREATE,
register_callback,
RegistrationError,
schema_id=schema_id,
context=context_value,
params=params_register,
)

def _register_one(self, resource: Resource, schema_id: str) -> None:
context = self.model_context or self.context
def _register_one(
self, resource: Resource, schema_id: str, context: Optional[Context] = None
) -> None:
context_value = context or self.model_context or self.context
data = as_jsonld(
resource,
"compacted",
False,
model_context=context,
model_context=context_value,
metadata_context=None,
context_resolver=self.service.resolve_context
)
Expand Down Expand Up @@ -525,7 +535,10 @@ def _prepare_download_one(

# CR[U]D.

def update(self, data: Union[Resource, List[Resource]], schema_id: str) -> None:
def update(
self, data: Union[Resource, List[Resource]], schema_id: str,
context: Optional[Context] = None
) -> None:
run(
self._update_one,
self._update_many,
Expand All @@ -536,9 +549,12 @@ def update(self, data: Union[Resource, List[Resource]], schema_id: str) -> None:
exception=UpdatingError,
monitored_status="_synchronized",
schema_id=schema_id,
context=context
)

def _update_many(self, resources: List[Resource], schema_id: str) -> None:
def _update_many(
self, resources: List[Resource], schema_id: str, context: Optional[Context] = None
) -> None:
update_callback = self.service.default_callback(self._update_many.__name__)
verified = self.service.verify(
resources,
Expand All @@ -555,15 +571,18 @@ def _update_many(self, resources: List[Resource], schema_id: str) -> None:
update_callback,
UpdatingError,
params=params_update,
context=context
)

def _update_one(self, resource: Resource, schema_id: str) -> None:
context = self.model_context or self.context
def _update_one(
self, resource: Resource, schema_id: str, context: Optional[Context] = None
) -> None:
context_value = context or self.model_context or self.context
data = as_jsonld(
resource,
"compacted",
False,
model_context=context,
model_context=context_value,
metadata_context=None,
context_resolver=self.service.resolve_context
)
Expand Down
6 changes: 4 additions & 2 deletions kgforge/specializations/stores/demo_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def mapper(self) -> Type[Mapper]:

# [C]RUD.

def _register_one(self, resource: Resource, schema_id: str) -> None:
def _register_one(
self, resource: Resource, schema_id: str, context: Optional[Context] = None
) -> None:
data = as_json(resource, expanded=False, store_metadata=False, model_context=None,
metadata_context=None, context_resolver=None)
try:
Expand All @@ -75,7 +77,7 @@ def retrieve(self, id_: str, version: Optional[Union[int, str]],

# CR[U]D.

def _update_one(self, resource: Resource, schema_id: str) -> None:
def _update_one(self, resource: Resource, schema_id: str, context: Optional[Context] = None) -> None:
data = as_json(resource, expanded=False, store_metadata=False, model_context=None,
metadata_context=None, context_resolver=None)
try:
Expand Down
2 changes: 1 addition & 1 deletion tests/specializations/stores/test_demo_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def register(capsys, store, data, rc, err, msg):

@when("I register the resource. An exception is raised. The printed report does mention an error: 'Exception: exception raised'.")
def register_exception(monkeypatch, capsys, store, data):
def _register_one(_, x, schema_id): raise Exception("exception raised")
def _register_one(_, x, schema_id, context): raise Exception("exception raised")
monkeypatch.setattr("kgforge.specializations.stores.demo_store.DemoStore._register_one", _register_one)
store.register(data)
out = capsys.readouterr().out[:-1]
Expand Down
Loading