[12/n][dagster-airbyte] Implement sync_and_poll method in AirbyteCloudClient #26431
@deprecated(breaking_version="1.10", additional_warn_text="Use `AirbyteJobStatusType` instead.")
class AirbyteState:
This enum is renamed to `AirbyteJobStatusType`, which reflects Airbyte's ontology and our naming convention in other integrations.
The old name is kept for backcompat, because it was exposed in `__init__.py`, and is to be removed in 1.10.
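A minimal sketch of the rename-with-alias idea, using only the standard library rather than Dagster's `@deprecated` decorator. The member values and the `_DeprecatedAlias` helper are assumptions made for illustration and are not taken from the PR.

```python
import warnings
from enum import Enum


class AirbyteJobStatusType(str, Enum):
    """New name. The member values here are assumed for this sketch."""

    RUNNING = "running"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"
    FAILED = "failed"


class _DeprecatedAlias:
    """Hypothetical helper: forwards attribute access to the new enum and warns."""

    def __init__(self, target, old_name, breaking_version):
        self._target = target
        self._old_name = old_name
        self._breaking_version = breaking_version

    def __getattr__(self, name):
        # Only reached for names not stored on the alias itself,
        # i.e. for enum members such as SUCCEEDED.
        warnings.warn(
            f"`{self._old_name}` is deprecated and will be removed in "
            f"{self._breaking_version}. Use `{self._target.__name__}` instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return getattr(self._target, name)


# Old import path keeps working, but every use emits a DeprecationWarning.
AirbyteState = _DeprecatedAlias(AirbyteJobStatusType, "AirbyteState", "1.10")
```

With this in place, `AirbyteState.SUCCEEDED` resolves to `AirbyteJobStatusType.SUCCEEDED`, so existing callers keep working until the alias is removed.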
Is 1.10 the right version to cite here, since dagster-airbyte is on a pre-1.0 release?
@@ -27,7 +27,9 @@
 TEST_STREAM_NAME = "test_stream"
 TEST_SELECTED = True
 TEST_JSON_SCHEMA = {}
-TEST_JOB_ID = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
+TEST_JOB_ID = 12345
Fixes an error carried over from the example in Airbyte's API documentation. The JSON example there uses a UUID as the job ID, but the documentation states that the ID is an int.
@@ -50,7 +50,12 @@ def test_trigger_connection_fail() -> None:
 @responses.activate
 @pytest.mark.parametrize(
     "state",
-    [AirbyteState.SUCCEEDED, AirbyteState.CANCELLED, AirbyteState.ERROR, "unrecognized"],
+    [
Nothing changes in the legacy tests except the name of the enum.
)

time.sleep(poll_interval)
poll_job_details = self.get_job_details(job.id)
nit: why make the caller initialize the job from the details? Feels like we should just return it directly.
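To make the discussion concrete, here is a rough sketch of a sync-and-poll loop in which the fetch helper returns a job object directly, as suggested. It is not the PR's implementation: the `Job` dataclass, the callables passed in, the status strings, and the use of `RuntimeError` in place of Dagster's `Failure` are all assumptions.

```python
import time
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Job:
    """Hypothetical job record; the real client models more fields."""

    id: int
    status: str


IN_PROGRESS = {"pending", "running"}  # assumed status strings


def sync_and_poll(
    start_job: Callable[[], Job],
    get_job: Callable[[int], Job],
    cancel_job: Callable[[int], object],
    poll_interval: float = 10.0,
    cancel_on_termination: bool = True,
) -> Job:
    job = start_job()
    settled = False  # True once the job reached a recognized terminal state
    try:
        while job.status in IN_PROGRESS:
            time.sleep(poll_interval)
            job = get_job(job.id)  # returns a Job, no re-wrapping by the caller
        if job.status == "succeeded":
            settled = True
            return job
        if job.status in ("failed", "error"):
            settled = True
            raise RuntimeError("Job failed")
        if job.status == "cancelled":
            settled = True
            raise RuntimeError("Job was cancelled")
        raise RuntimeError(f"Job {job.id} is in an unexpected state: {job.status}")
    finally:
        # Cancel on interruption or on an unrecognized state, if requested.
        if cancel_on_termination and not settled:
            cancel_job(job.id)
```

Because `get_job` hands back a `Job`, the loop body stays a single assignment and the status checks read the same object throughout.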
if status == TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE:
    base_api_mocks.add(
        method=responses.DELETE,
        url=test_job_api_url,
        status=200,
        json=get_job_details_sample(status=AirbyteJobStatusType.CANCELLED),
    )
confused by this - why is this necessary?
We have another test dedicated to `cancel_on_termination`, so let's remove this part of the test when testing statuses. `responses.RequestsMock` yells if a mocked request is not called in a test, so it must be added only when the job is cancelled.
if status in [AirbyteJobStatusType.ERROR, AirbyteJobStatusType.FAILED]:
    with pytest.raises(Failure, match="Job failed"):
        client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0)

elif status == AirbyteJobStatusType.CANCELLED:
    with pytest.raises(Failure, match="Job was cancelled"):
        client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0)

elif status == TEST_UNRECOGNIZED_AIRBYTE_JOB_STATUS_TYPE:
    with pytest.raises(Failure, match="unexpected state"):
        client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0)
    assert_rest_api_call(
        call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.DELETE
    )

else:
    result = client.sync_and_poll(connection_id=TEST_CONNECTION_ID, poll_interval=0)
    assert result == AirbyteOutput(
        job_details=get_job_details_sample(AirbyteJobStatusType.SUCCEEDED),
        connection_details=SAMPLE_CONNECTION_DETAILS,
    )
nit: this could all be part of the parameterization. I actually had to do something similar recently, and I found an `optional_pytest_raise` to be really useful here: https://github.com/dagster-io/dagster/pull/26545/files#diff-bb4b77564796719ee9a5eaf83ecd67f54f4f16b8b7724f9348d1a38dac76800d
Updated in 5df711b
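For readers following the thread, the idea of moving the expected outcome into the parameterization might look roughly like the sketch below. It is dependency-free and does not reproduce the linked `optional_pytest_raise` helper, whose actual signature is not shown here. The `Failure` stand-in, the `raises` helper, the `fake_sync_and_poll` function, and the status strings are all hypothetical. In a pytest suite, `CASES` would feed `@pytest.mark.parametrize` and `pytest.raises(Failure, match=...)` would replace the hand-rolled `raises`.

```python
import re
from contextlib import contextmanager, nullcontext


class Failure(Exception):
    """Stand-in for dagster's Failure so the sketch has no dependencies."""


@contextmanager
def raises(exc_type, match):
    """Minimal stand-in for pytest.raises(exc_type, match=...)."""
    try:
        yield
    except exc_type as exc:
        assert re.search(match, str(exc)), f"{exc!r} does not match {match!r}"
    else:
        raise AssertionError(f"expected {exc_type.__name__} to be raised")


def optional_raises(match):
    """Expect a Failure when a pattern is given, otherwise expect success."""
    return raises(Failure, match) if match else nullcontext()


# One row per status: (status, expected failure message or None).
CASES = [
    ("succeeded", None),
    ("failed", "Job failed"),
    ("error", "Job failed"),
    ("cancelled", "Job was cancelled"),
    ("unrecognized", "unexpected state"),
]


def fake_sync_and_poll(status):
    """Hypothetical function under test, mimicking the messages in the diff."""
    if status in ("failed", "error"):
        raise Failure("Job failed")
    if status == "cancelled":
        raise Failure("Job was cancelled")
    if status != "succeeded":
        raise Failure(f"Job is in an unexpected state: {status}")
    return status


def check_case(status, expected_match):
    # The test body has no branching; the table carries the expectation.
    with optional_raises(expected_match):
        fake_sync_and_poll(status)
```

The design choice is that each row states its own expectation, so adding a status means adding a row rather than another `elif` branch.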
if cancel_on_termination:
    assert_rest_api_call(
        call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.DELETE
    )
else:
    # If we don't cancel on termination, the last call will be a call to fetch the job details
    assert_rest_api_call(
        call=base_api_mocks.calls[-1], endpoint=test_job_endpoint, method=responses.GET
    )
I really don't love these complex ternaries in the test body. They're just really hard to read; I think this should be part of the parameterization.
That's a good call - updated in 5df711b
Some feedback / comments but nothing review blocking
Summary & Motivation
Implement the full sync and poll process in `AirbyteCloudClient`. This will be used in a subsequent PR in `AirbyteCloudWorkspace.sync_and_poll` to materialize Airbyte assets.

How I Tested These Changes
Additional tests with BK