From 8562731fbb2b5b8b3ec91fc73413741615507d0e Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 14 Nov 2024 18:48:03 -0500 Subject: [PATCH] Update Fivetran client and tests --- .../dagster_fivetran/resources.py | 60 ++++++------------- .../dagster_fivetran/translator.py | 15 +++++ .../experimental/conftest.py | 36 +++++++++-- .../experimental/test_resources.py | 44 +++++++++++--- 4 files changed, 101 insertions(+), 54 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 5e9adc22b9436..0433a7209049b 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -33,6 +33,7 @@ from dagster_fivetran.translator import ( DagsterFivetranTranslator, FivetranConnector, + FivetranConnectorScheduleType, FivetranDestination, FivetranSchemaConfig, FivetranWorkspaceData, @@ -595,43 +596,28 @@ def get_groups(self) -> Mapping[str, Any]: """ return self._make_request("GET", "groups") - # TODO: update - def update_connector( - self, connector_id: str, properties: Optional[Mapping[str, Any]] = None - ) -> Mapping[str, Any]: - """Updates properties of a Fivetran Connector. - - Args: - connector_id (str): The Fivetran Connector ID. You can retrieve this value from the - "Setup" tab of a given connector in the Fivetran UI. - properties (Dict[str, Any]): The properties to be updated. For a comprehensive list of - properties, see the [Fivetran docs](https://fivetran.com/docs/rest-api/connectors#modifyaconnector). - - Returns: - Dict[str, Any]: Parsed json data representing the API response. - """ - return self._make_connector_request( - method="PATCH", endpoint=connector_id, data=json.dumps(properties) - ) - - # TODO: update - def update_schedule_type( - self, connector_id: str, schedule_type: Optional[str] = None + def update_schedule_type_for_connector( + self, connector_id: str, schedule_type: str ) -> Mapping[str, Any]: """Updates the schedule type property of the connector to either "auto" or "manual". Args: connector_id (str): The Fivetran Connector ID. You can retrieve this value from the "Setup" tab of a given connector in the Fivetran UI. - schedule_type (Optional[str]): Either "auto" (to turn the schedule on) or "manual" (to + schedule_type (str): Either "auto" (to turn the schedule on) or "manual" (to turn it off). Returns: Dict[str, Any]: Parsed json data representing the API response. """ - if schedule_type not in ["auto", "manual"]: - check.failed(f"schedule_type must be either 'auto' or 'manual': got '{schedule_type}'") - return self.update_connector(connector_id, properties={"schedule_type": schedule_type}) + if not FivetranConnectorScheduleType.has_value(schedule_type): + check.failed( + f"The schedule_type for a connector must be in {FivetranConnectorScheduleType.values()}: " + f"got '{schedule_type}'" + ) + return self._make_connector_request( + method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type}) + ) def start_sync(self, connector_id: str) -> None: """Initiates a sync of a Fivetran connector. @@ -640,9 +626,6 @@ def start_sync(self, connector_id: str) -> None: connector_id (str): The Fivetran Connector ID. You can retrieve this value from the "Setup" tab of a given connector in the Fivetran UI. - Returns: - Dict[str, Any]: Parsed json data representing the connector details API response after - the sync is started. """ request_fn = partial( self._make_connector_request, method="POST", endpoint=f"{connector_id}/force" @@ -659,10 +642,6 @@ def start_resync( "Setup" tab of a given connector in the Fivetran UI. resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API. An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7 - - Returns: - Dict[str, Any]: Parsed json data representing the connector details API response after - the resync is started. """ request_fn = partial( self._make_connector_request, @@ -679,7 +658,7 @@ def start_resync( def _start_sync(self, request_fn: Callable, connector_id: str) -> None: if self.disable_schedule_on_trigger: self._log.info("Disabling Fivetran sync schedule.") - self.update_schedule_type(connector_id, "manual") + self.update_schedule_type_for_connector(connector_id, "manual") connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) ) @@ -712,7 +691,7 @@ def poll_sync( initial_last_sync_completion (datetime.datetime): The timestamp of the last completed sync (successful or otherwise) for this connector, prior to running this method. poll_interval (float): The time (in seconds) that will be waited between successive polls. - poll_timeout (float): The maximum time that will waited before this operation is timed + poll_timeout (float): The maximum time that will wait before this operation is timed out. By default, this will never time out. Returns: @@ -744,19 +723,16 @@ def poll_sync( # Sleep for the configured time interval before polling again. time.sleep(poll_interval) - raw_connector_details = self.get_connector_details(connector_id) - connector = FivetranConnector.from_connector_details( - connector_details=self.get_connector_details(connector_id) - ) + post_raw_connector_details = self.get_connector_details(connector_id) if not curr_last_sync_succeeded: raise Failure( f"Sync for connector '{connector_id}' failed!", metadata={ - "connector_details": MetadataValue.json(raw_connector_details), + "connector_details": MetadataValue.json(post_raw_connector_details), "log_url": MetadataValue.url(connector.url), }, ) - return raw_connector_details + return post_raw_connector_details def sync_and_poll( self, @@ -770,7 +746,7 @@ def sync_and_poll( connector_id (str): The Fivetran Connector ID. You can retrieve this value from the "Setup" tab of a given connector in the Fivetran UI. poll_interval (float): The time (in seconds) that will be waited between successive polls. - poll_timeout (float): The maximum time that will waited before this operation is timed + poll_timeout (float): The maximum time that will wait before this operation is timed out. By default, this will never time out. Returns: diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 1a51805f59a18..1c92fe760c8fc 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -25,6 +25,21 @@ class FivetranConnectorTableProps(NamedTuple): service: Optional[str] +class FivetranConnectorScheduleType(Enum): + """Enum representing each schedule type for a connector in Fivetran's ontology.""" + + AUTO = "auto" + MANUAL = "manual" + + @classmethod + def has_value(cls, value) -> bool: + return value in cls._value2member_map_ + + @classmethod + def values(cls) -> Sequence[str]: + return list(cls._value2member_map_.keys()) + + class FivetranConnectorSetupStateType(Enum): """Enum representing each setup state for a connector in Fivetran's ontology.""" diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 82b44013d6a67..2c72c311c6efc 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -61,7 +61,7 @@ }, "config": {"property1": {}, "property2": {}}, "daily_sync_time": "14:00", - "succeeded_at": "2024-12-01T15:43:29.013729Z", + "succeeded_at": "2024-12-01T15:45:29.013729Z", "sync_frequency": 360, "group_id": "my_group_destination_id", "connected_by": "user_id", @@ -75,7 +75,7 @@ ], "source_sync_details": {}, "service_version": 0, - "created_at": "2024-12-01T15:43:29.013729Z", + "created_at": "2024-12-01T15:41:29.013729Z", "failed_at": "2024-12-01T15:43:29.013729Z", "private_link_id": "string", "proxy_agent_id": "string", @@ -181,7 +181,7 @@ "rescheduled_for": "2024-12-01T15:43:29.013729Z", }, "daily_sync_time": "14:00", - "succeeded_at": "2024-03-17T12:31:40.870504Z", + "succeeded_at": "2024-12-01T15:45:29.013729Z", "sync_frequency": 1440, "group_id": "my_group_destination_id", "connected_by": "user_id", @@ -195,8 +195,8 @@ ], "source_sync_details": {}, "service_version": 0, - "created_at": "2023-12-01T15:43:29.013729Z", - "failed_at": "2024-04-01T18:13:25.043659Z", + "created_at": "2024-12-01T15:41:29.013729Z", + "failed_at": "2024-12-01T15:43:29.013729Z", "private_link_id": "private_link_id", "proxy_agent_id": "proxy_agent_id", "networking_method": "Directly", @@ -380,6 +380,8 @@ }, } +SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} + TEST_ACCOUNT_ID = "test_account_id" TEST_API_KEY = "test_api_key" TEST_API_SECRET = "test_api_secret" @@ -453,4 +455,28 @@ def all_api_mocks_fixture( json=SAMPLE_CONNECTOR_DETAILS, status=200, ) + fetch_workspace_data_api_mocks.add( + method=responses.PATCH, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + json=SAMPLE_CONNECTOR_DETAILS, + status=200, + ) + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/force", + json=SAMPLE_SUCCESS_MESSAGE, + status=200, + ) + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/resync", + json=SAMPLE_SUCCESS_MESSAGE, + status=200, + ) + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas/tables/resync", + json=SAMPLE_SUCCESS_MESSAGE, + status=200, + ) yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 4a7b9db5298c2..457ecc6070a4e 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,5 +1,7 @@ import responses +from dagster._vendored.dateutil import parser from dagster_fivetran import FivetranWorkspace +from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( TEST_ACCOUNT_ID, @@ -19,17 +21,45 @@ def test_basic_resource_request( ) client = resource.get_client() - client.get_connector_details(connector_id=connector_id) client.get_connectors_for_group(group_id=group_id) client.get_destination_details(destination_id=destination_id) client.get_groups() client.get_schema_config_for_connector(connector_id=connector_id) - assert len(all_api_mocks.calls) == 5 - + assert len(all_api_mocks.calls) == 4 assert "Basic" in all_api_mocks.calls[0].request.headers["Authorization"] + assert group_id in all_api_mocks.calls[0].request.url + assert destination_id in all_api_mocks.calls[1].request.url + assert "groups" in all_api_mocks.calls[2].request.url + assert f"{connector_id}/schemas" in all_api_mocks.calls[3].request.url + + # reset calls + all_api_mocks.calls.reset() + client.get_connector_details(connector_id=connector_id) + client.update_schedule_type_for_connector(connector_id=connector_id, schedule_type="auto") + + assert len(all_api_mocks.calls) == 2 assert connector_id in all_api_mocks.calls[0].request.url - assert group_id in all_api_mocks.calls[1].request.url - assert destination_id in all_api_mocks.calls[2].request.url - assert "groups" in all_api_mocks.calls[3].request.url - assert f"{connector_id}/schemas" in all_api_mocks.calls[4].request.url + assert connector_id in all_api_mocks.calls[1].request.url + assert all_api_mocks.calls[1].request.method == "PATCH" + + all_api_mocks.calls.reset() + client.start_sync(connector_id=connector_id) + assert len(all_api_mocks.calls) == 4 + assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url + + all_api_mocks.calls.reset() + client.start_resync(connector_id=connector_id, resync_parameters=None) + assert len(all_api_mocks.calls) == 4 + assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url + + all_api_mocks.calls.reset() + client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]}) + assert len(all_api_mocks.calls) == 4 + assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url + + all_api_mocks.calls.reset() + client.poll_sync( + connector_id=connector_id, initial_last_sync_completion=parser.parse(MIN_TIME_STR) + ) + assert len(all_api_mocks.calls) == 2