Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): upload dataset items to data service via http using client #642

Merged
merged 6 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/lib/client/dmod/client/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.8.2'
__version__ = '0.8.3'
17 changes: 14 additions & 3 deletions python/lib/client/dmod/client/dmod_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .request_clients import DataServiceClient, JobClient
from .client_config import ClientConfig
from pathlib import Path
from typing import List, Optional, Type, Union
from typing import List, Literal, Optional, Type, Union

from functools import reduce
from .domain_detectors import ClientDataCollectionDomainDetector, ClientUniversalItemDomainDetector
Expand Down Expand Up @@ -174,7 +174,11 @@ def _get_transport_client(self, **kwargs) -> TransportLayerClient:
def client_config(self) -> ClientConfig:
return self._client_config

async def data_service_action(self, action: str, **kwargs) -> ResultIndicator:
async def data_service_action(
self,
action: Literal["create", "delete", "upload", "download", "list", "items", "state"],
**kwargs,
) -> ResultIndicator:
"""
Perform a supported data service action.

Expand Down Expand Up @@ -222,10 +226,17 @@ async def data_service_action(self, action: str, **kwargs) -> ResultIndicator:
@property
def data_service_client(self) -> DataServiceClient:
if self._data_service_client is None:
# NOTE: request service is bypassed if data service config has been specified and is active
if self.client_config.data_service is not None and self.client_config.data_service.active:
t_client_type = determine_transport_client_type(self.client_config.data_service.endpoint_protocol)
t_client = t_client_type(**self.client_config.data_service.dict())
self._data_service_client = DataServiceClient(t_client, self._auth_client)
# use http client where applicable when bypassing request service
import aiohttp
host = self.client_config.data_service.endpoint_host
port = self.client_config.data_service.endpoint_port
proto = "http" if t_client.ssl_context is None else "https"
http_client = aiohttp.ClientSession(f"{proto}://{host}:{port}")
self._data_service_client = DataServiceClient(t_client, self._auth_client, http_client=http_client)
else:
self._data_service_client = DataServiceClient(self._get_transport_client(), self._auth_client)
return self._data_service_client
Expand Down
71 changes: 69 additions & 2 deletions python/lib/client/dmod/client/request_clients.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from abc import ABC, abstractmethod
import aiohttp
import mimetypes
import ssl
from dmod.communication import (AuthClient, InvalidMessageResponse, ManagementAction, NGENRequest, NGENRequestResponse,
NgenCalibrationRequest, NgenCalibrationResponse, TransportLayerClient)
from dmod.communication.client import ConnectionContextClient
Expand Down Expand Up @@ -351,6 +354,58 @@ def uses_auth(self) -> bool:
pass


class HttpDataTransferAgent(DataTransferAgent):
def __init__(self, http_client: aiohttp.ClientSession, ssl_context: Optional[ssl.SSLContext] = None):
self.http_client = http_client
self.ssl_context = ssl_context

async def download_dataset_item(self, dataset_name: str, item_name: str, dest: Path):
raise NotImplemented

async def upload_dataset_item(
self, dataset_name: str, item_name: str, source: Path
) -> DatasetManagementResponse:
if not source.is_file():
return DatasetManagementResponse(
success=False,
reason="Dataset Upload File Not Found",
message=f"File {source!s} does not exist",
)

content_type, _ = mimetypes.guess_type(source)
form_data = aiohttp.FormData()
with source.open() as fp:
form_data.add_field(
"obj",
fp,
filename=item_name,
content_type=content_type,
)
response = await self.http_client.post(
"/add_object",
data=form_data,
params={
"dataset_name": dataset_name,
"object_name": item_name,
},
ssl=self.ssl_context,
)
if not response.ok:
return DatasetManagementResponse(
success=False,
reason="Dataset Item Upload Failed",
message=f"Status Code: {response.status}",
)

return DatasetManagementResponse(success=True, message="Dataset Item Upload Successful", reason="Success")

@property
def uses_auth(self) -> bool:
# NOTE: At this point in time, the dataset service's rest endpoints do
# not support auth
return False
aaraney marked this conversation as resolved.
Show resolved Hide resolved


class SimpleDataTransferAgent(DataTransferAgent):

def __init__(self, transport_client: TransportLayerClient, auth_client: Optional[AuthClient] = None, *args, **kwargs):
Expand Down Expand Up @@ -559,10 +614,18 @@ def extract_dataset_names(cls, response: DatasetManagementResponse) -> List[str]
else:
return response.data.datasets

def __init__(self, transport_client: TransportLayerClient, auth_client: Optional[AuthClient] = None, *args, **kwargs):
def __init__(
self,
transport_client: TransportLayerClient,
auth_client: Optional[AuthClient] = None,
*args,
http_client: Optional[aiohttp.ClientSession] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any particular reason this was ordered this way with respect to *args and **kwargs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, great question! This forces http_client to be provided as a keyword argument. If there were to be a subclass of DataServiceClient that added arguments to it's __init__ method, adding the http_client as a keyword or positional argument is a breaking change. Subclasses that add a positional / keyword argument to their __init__ method could be now inadvertently mis-initialized.

**kwargs,
):
super().__init__(*args, **kwargs)
self._transport_client: TransportLayerClient = transport_client
self._auth_client: Optional[AuthClient] = auth_client
self._http_client: Optional[aiohttp.ClientSession] = http_client

async def _process_request(self, request: DatasetManagementMessage) -> DatasetManagementResponse:
"""
Expand Down Expand Up @@ -918,7 +981,11 @@ async def upload_to_dataset(self, dataset_name: str, paths: Union[Path, List[Pat
An indicator of whether uploading was successful
"""
# TODO: see if we can perhaps have multiple agents and thread pool if multiplexing is available
tx_agent = SimpleDataTransferAgent(transport_client=self._transport_client, auth_client=self._auth_client)
# NOTE: prefer http client if available
if self._http_client is not None:
tx_agent = HttpDataTransferAgent(http_client=self._http_client, ssl_context=self._transport_client.ssl_context)
else:
tx_agent = SimpleDataTransferAgent(transport_client=self._transport_client, auth_client=self._auth_client)
if isinstance(paths, Path):
paths = [paths]

Expand Down
2 changes: 1 addition & 1 deletion python/lib/client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
license='',
include_package_data=True,
#install_requires=['websockets', 'jsonschema'],vi
install_requires=['dmod-core>=0.16.0', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.19.1',
install_requires=['dmod-core>=0.16.0', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.19.2',
'dmod-externalrequests>=0.6.0', 'dmod-modeldata>=0.12.0'],
packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test'])
)
2 changes: 1 addition & 1 deletion python/lib/communication/dmod/communication/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.19.1'
__version__ = '0.19.2'
12 changes: 12 additions & 0 deletions python/lib/communication/dmod/communication/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ async def async_recv(self) -> str:
"""
pass

@property
def ssl_context(self) -> Optional[ssl.SSLContext]:
"""
Client ssl context, if present.

Returns
-------
Optional[ssl.SSLContext]
Configured ssl context
"""
return self._client_ssl_context


class AuthClient:
"""
Expand Down
Loading