[9/n][dagster-fivetran] Implement base sync methods in FivetranClient #25911

Open
wants to merge 16 commits into
base: master
from

Conversation

maximearmstrong
Contributor

@maximearmstrong maximearmstrong commented Nov 13, 2024

Summary & Motivation

This PR reworks legacy sync methods and implements them in the FivetranClient:

  • update_schedule_type_for_connector is added based on legacy update_schedule_type and update_connector
  • _start_sync is based on the legacy start_sync and start_resync
    • it avoids code duplication
    • start_resync will be added in a subsequent PR
    • the order of some steps has been reversed - we verify that a connector is syncable before updating the state of its schedule
  • start_sync is added based on legacy start_sync
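The reordering described in the `_start_sync` bullet can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: `FakeFivetranClient`, its `calls` log, the `paused` flag, and `_validate_syncable` are hypothetical stand-ins, and switching the schedule to "manual" on trigger is assumed. Only the names `_start_sync`, `start_sync`, and `update_schedule_type_for_connector` come from the description above.

```python
from typing import Callable, List


class FakeFivetranClient:
    """Hypothetical in-memory stand-in that records calls instead of hitting the API."""

    def __init__(self, paused: bool) -> None:
        self.paused = paused
        self.calls: List[str] = []

    def _validate_syncable(self, connector_id: str) -> None:
        # Hypothetical check: fetch connector details and fail if it cannot be synced.
        self.calls.append(f"get:{connector_id}")
        if self.paused:
            raise RuntimeError(f"Connector '{connector_id}' is paused and cannot be synced.")

    def update_schedule_type_for_connector(self, connector_id: str, schedule_type: str) -> None:
        self.calls.append(f"schedule:{connector_id}:{schedule_type}")

    def _start_sync(self, request_fn: Callable[[], None], connector_id: str) -> None:
        # Step 1: check syncability first, so a failure leaves the schedule untouched.
        self._validate_syncable(connector_id)
        # Step 2: only then change the schedule state ("manual" is an assumption).
        self.update_schedule_type_for_connector(connector_id, "manual")
        # Step 3: trigger the sync itself.
        request_fn()

    def start_sync(self, connector_id: str) -> None:
        self._start_sync(
            request_fn=lambda: self.calls.append(f"force:{connector_id}"),
            connector_id=connector_id,
        )
```

With this ordering, a paused connector produces a single recorded call (the details lookup) and its schedule is never modified.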

Tests mock the request API calls and make sure that all calls are made.

How I Tested These Changes

Additional unit tests with BK

Comment on lines 618 to 634
def update_schedule_type(
    self, connector_id: str, schedule_type: Optional[str] = None
) -> Mapping[str, Any]:
    """Updates the schedule type property of the connector to either "auto" or "manual".

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        schedule_type (Optional[str]): Either "auto" (to turn the schedule on) or "manual" (to
            turn it off).

    Returns:
        Dict[str, Any]: Parsed json data representing the API response.
    """
    if schedule_type not in ["auto", "manual"]:
        check.failed(f"schedule_type must be either 'auto' or 'manual': got '{schedule_type}'")
    return self.update_connector(connector_id, properties={"schedule_type": schedule_type})

The schedule_type parameter is marked as Optional[str] but the validation logic will raise an error if None is passed. Consider either removing the Optional type hint or adding a None check before the schedule_type validation to align the type hints with the actual behavior.

Spotted by Graphite Reviewer

Contributor

this is a good one.
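One way to act on the suggestion above is to drop `Optional` so the annotation matches what the validation accepts. The sketch below is self-contained and hypothetical: `validate_schedule_type` and `VALID_SCHEDULE_TYPES` are not names from the PR, and the function raises `ValueError` where the legacy code calls `check.failed`.

```python
from typing import Tuple

VALID_SCHEDULE_TYPES: Tuple[str, ...] = ("auto", "manual")


def validate_schedule_type(schedule_type: str) -> str:
    """Return schedule_type unchanged if valid; the type hint no longer admits None."""
    if schedule_type not in VALID_SCHEDULE_TYPES:
        raise ValueError(
            f"schedule_type must be either 'auto' or 'manual': got '{schedule_type}'"
        )
    return schedule_type
```

The alternative mentioned in the comment, keeping `Optional` and handling `None` explicitly before the membership check, would need a decision about what `None` means, which the thread does not specify.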

@maximearmstrong maximearmstrong marked this pull request as ready for review November 15, 2024 14:46
@maximearmstrong maximearmstrong self-assigned this Nov 15, 2024
Base automatically changed from maxime/rework-fivetran-8 to master November 15, 2024 14:53
Comment on lines 34 to 40
@classmethod
def has_value(cls, value) -> bool:
    return value in cls._value2member_map_

@classmethod
def values(cls) -> Sequence[str]:
    return list(cls._value2member_map_.keys())
Contributor

highly surprised you need to add these

Contributor Author

Enum lets you list members and use __contains__ with the symbolic names, but it has no public function that exposes _value2member_map_. These helpers avoid accessing _value2member_map_ directly in the resource.

Contributor

I see - so this is entirely due to the difference in casing between key and value? In that case maybe we should just make them match and add a comment

Contributor Author

Updated FivetranConnectorScheduleType to subclass both str and Enum in 8e607df. Callsites have been updated too.
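For readers unfamiliar with the pattern, subclassing both `str` and `Enum` can be sketched as below. `ScheduleType` is a hypothetical stand-in for `FivetranConnectorScheduleType`; the member names `AUTO` and `MANUAL` are assumptions, while the values "auto" and "manual" come from the legacy docstring quoted earlier in this conversation. `is_valid_schedule_type` is likewise illustrative only.

```python
from enum import Enum


class ScheduleType(str, Enum):
    """Hypothetical stand-in for FivetranConnectorScheduleType."""

    AUTO = "auto"
    MANUAL = "manual"


def is_valid_schedule_type(value: str) -> bool:
    # Looking a member up by value is part of Enum's public API,
    # so no access to the private _value2member_map_ is needed.
    try:
        ScheduleType(value)
    except ValueError:
        return False
    return True
```

Because each member is also a `str` instance, `ScheduleType.AUTO == "auto"` holds, so call sites can compare members against plain strings without extra helper methods.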

)
self._start_sync(request_fn=request_fn, connector_id=connector_id)

def _start_sync(self, request_fn: Callable, connector_id: str) -> None:
Contributor

nit: Can we be more explicit about the callable signature pls

Contributor Author

Updated in d341d4b
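As an illustration of what a more explicit callable annotation can look like, here is a minimal sketch. The actual signature adopted in d341d4b is not shown in this conversation, so the `RequestFn` alias (a zero-argument callable returning parsed JSON), the `make_request` helper, and the return value of `start_sync` below are all assumptions.

```python
from functools import partial
from typing import Any, Callable, Mapping

# Assumed shape: a fully bound callable that takes no arguments
# and returns the parsed JSON response.
RequestFn = Callable[[], Mapping[str, Any]]


def make_request(method: str, endpoint: str) -> Mapping[str, Any]:
    """Hypothetical stand-in for the client's HTTP helper; it only echoes its inputs."""
    return {"method": method, "endpoint": endpoint}


def start_sync(request_fn: RequestFn, connector_id: str) -> Mapping[str, Any]:
    # With an explicit alias, a type checker can verify this zero-argument call.
    response = request_fn()
    return {"connector_id": connector_id, **response}


# Bind the arguments up front so the callable matches RequestFn.
force_sync: RequestFn = partial(make_request, "POST", "my_connector/force")
```

Compared with a bare `Callable`, the alias documents both the arguments the callee will pass and the type it expects back.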

Contributor

@dpeng817 dpeng817 left a comment

I'm finding this PR hard to review:

  • a ton of methods are getting added, but it seems like only the base layer (start_sync, start_resync, poll_sync) are under test, and the assertion is kinda bare. See additional comments regarding testing
  • I don't know what the actual public API surface area is here that we're introducing, or what it's going to look like when it's used. For example, all this stuff about the previous_sync_completed_at: I was at first very concerned that this was a pretty awkward surface area, but then I inferred (and I could be wrong) that this isn't actually exposed to users. But I did that through context clues.
  • In general, I don't really understand what's coming from legacy code and what's new. I think some comments elucidating that could be quite useful.

More broadly, this is likely our last chance to make significant changes to these APIs until 2.0. So I think we should try our best to fix any changes in the APIs now rather than later.

Comment on lines 48 to 69
all_api_mocks.calls.reset()
client.start_sync(connector_id=connector_id)
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url

# resync calls
all_api_mocks.calls.reset()
client.start_resync(connector_id=connector_id, resync_parameters=None)
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url

# resync calls with parameters
all_api_mocks.calls.reset()
client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]})
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url

# poll calls
all_api_mocks.calls.reset()
client.poll_sync(
    connector_id=connector_id, previous_sync_completed_at=parser.parse(MIN_TIME_STR)
)
Contributor

I'm finding these tests pretty insufficient. This is maybe OK for the underlying start_xsync functions, but sync_and_poll and resync_and_poll have pretty complicated logic, and I think the state changes there are worth testing.

Contributor Author

@maximearmstrong maximearmstrong Nov 18, 2024

I can add tests for sync_and_poll and resync_and_poll - the initial goal was to test the full behavior with the FivetranWorkspace.sync_and_poll method that will be introduced in PR 11 of this stack.

Contributor

I tend to prefer PRs to be relatively atomic in their additions + tests - so I think yea makes sense to add some tests here as well.

For what it's worth, with only 9/n and 10/n open, there was no way for me to know that context. I think it's useful to try to think about things from that perspective when writing up PR description, adding comments etc.

Contributor Author

That's totally fair - I will update the tests here and make sure to apply this feedback in the future as well.

Contributor

sounds good! Thanks
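To make the idea of testing state changes concrete, here is a minimal sketch of a poll loop whose state source is injected, so a test can replay a scripted sequence of connector states. Everything here is hypothetical: `poll_until_synced`, `scripted`, and `max_polls` are not names from the PR, the comparison against `previous_sync_completed_at` is an assumed reading of that parameter, and a real loop would also sleep between polls.

```python
from datetime import datetime, timezone
from typing import Callable, Iterator, Optional


def poll_until_synced(
    fetch_last_sync: Callable[[], Optional[datetime]],
    previous_sync_completed_at: datetime,
    max_polls: int,
) -> datetime:
    """Hypothetical poll loop: return once a sync newer than the previous one is seen."""
    for _ in range(max_polls):
        last_sync = fetch_last_sync()
        if last_sync is not None and last_sync > previous_sync_completed_at:
            return last_sync
    raise TimeoutError(f"No new sync completed after {max_polls} polls.")


def scripted(states: Iterator[Optional[datetime]]) -> Callable[[], Optional[datetime]]:
    """Build a fake fetcher that replays a fixed sequence of connector states."""
    return lambda: next(states)


# Example states a test could replay: no sync yet, the old sync, then a new one.
OLD = datetime(2024, 11, 1, tzinfo=timezone.utc)
NEW = datetime(2024, 11, 18, tzinfo=timezone.utc)
```

A test can replay `[None, OLD, NEW]` and assert that the loop returns `NEW`, then replay `[OLD, OLD]` with `max_polls=2` to assert the timeout path.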

@maximearmstrong
Contributor Author

a ton of methods are getting added, but it seems like only the base layer (start_sync, start_resync, poll_sync) are under test, and the assertion is kinda bare. See additional comments regarding testing

@dpeng817 The focus of this PR is mainly to take what we had in the legacy code, implement it in the new FivetranClient, and rework the methods to:

  • make the code cleaner
  • fix the behavior where errors had been introduced; for example, the schedule state of the connector was previously updated before we confirmed that the connector was syncable.

I don't know what the actual public API surface area is here that we're introducing, or what it's going to look like when it's used. For example, all this stuff about the previous_sync_completed_at: I was at first very concerned that this was a pretty awkward surface area, but then I inferred (and I could be wrong) that this isn't actually exposed to users. But I did that through context clues.

In most cases, users will call fivetran_workspace.sync_and_poll() (PR 11 in this stack), which syncs and polls the connector and materializes the assets.

In general, I don't really understand what's coming from legacy code and what's new. I think some comments elucidating that could be quite useful.

I can add comments to clarify where changes are made.

More broadly, this is likely our last chance to make significant changes to these APIs until 2.0. So I think we should try our best to fix any changes in the APIs now rather than later.

The first effort with this stack of PRs is to reproduce the current behavior using the new API pattern. Once we have that, the goal would be to work with a design partner that is already using dagster-fivetran, make the transition from the previous pattern to the new one, then improve the behavior with the design partner.

"""
schedule_types = {s for s in FivetranConnectorScheduleType}
if schedule_type not in schedule_types:
    check.failed(
Contributor

nit: include a reference to the connector in question

Contributor Author

Done in c236554

disable_schedule_on_trigger: bool = Field(
    default=True,
    description=(
        "Specifies if you would like any connector that is sync'd using this "  # TODO: update description
Contributor

nit: remove todo

Contributor Author

Done in 5a8ea97

Contributor

@dpeng817 dpeng817 left a comment

Thanks for the respin here! I found it much easier to interpret now that I also have all the context.

@@ -9,6 +10,8 @@

from dagster_fivetran.utils import get_fivetran_connector_table_name, metadata_for_table

MIN_TIME_STR = "0001-01-01 00:00:00+00"
Contributor

doesn't look like this gets used anywhere - can we delete this?

Contributor Author

Good catch - I forgot to move it to the PR for the poll method

Contributor Author

Done in 7dde6fb

def is_paused(self) -> bool:
    return self.paused

def assert_syncable(self) -> bool:
Contributor

nit: maybe validate_syncable? assert is a Python keyword, and we aren't using it here.

Contributor Author

Done in 8e3c5a9
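A rough sketch of how the renamed method might read is given below. Only `is_paused`, `paused`, and the `-> bool` return type appear in the diff snippet above; the `ConnectorSketch` class, its `id` and `connected` fields, and the choice of `RuntimeError` are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConnectorSketch:
    """Hypothetical, trimmed-down connector model; only `paused` appears in the diff."""

    id: str
    paused: bool
    connected: bool  # assumed field, not taken from the PR

    def is_paused(self) -> bool:
        return self.paused

    def validate_syncable(self) -> bool:
        # Raise with the connector ID so failures are traceable; return True otherwise.
        if not self.connected:
            raise RuntimeError(f"Connector '{self.id}' is not connected.")
        if self.is_paused():
            raise RuntimeError(f"Connector '{self.id}' is paused.")
        return True
```

The `validate_` prefix signals that the method raises on failure without suggesting that the `assert` statement is involved.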

2 participants