From 332d3ec5bfa3e33a528a9ba5323540ee767e934f Mon Sep 17 00:00:00 2001 From: Luishfs Date: Thu, 14 Nov 2024 16:38:06 -0300 Subject: [PATCH] source-facebook-marketing: removing old tests --- estuary-cdk/common.Dockerfile | 2 +- source-facebook-marketing/test.flow.yaml | 100 ++-- source-facebook-marketing/tests/conftest.py | 58 --- source-facebook-marketing/tests/helpers.py | 5 - .../snapshots__discover__capture.stdout.json | 6 +- .../snapshots__spec__capture.stdout.json | 13 +- source-facebook-marketing/tests/test_api.py | 141 ------ .../tests/test_async_job.py | 454 ----------------- .../tests/test_async_job_manager.py | 176 ------- .../tests/test_base_insight_streams.py | 459 ------------------ .../tests/test_base_streams.py | 149 ------ .../tests/test_client.py | 253 ---------- .../tests/test_deep_merge.py | 42 -- .../tests/test_source.py | 153 ------ .../tests/test_streams.py | 110 ----- source-facebook-marketing/tests/test_utils.py | 51 -- source-facebook-marketing/tests/utils.py | 19 - 17 files changed, 61 insertions(+), 2130 deletions(-) delete mode 100644 source-facebook-marketing/tests/conftest.py delete mode 100644 source-facebook-marketing/tests/helpers.py delete mode 100644 source-facebook-marketing/tests/test_api.py delete mode 100644 source-facebook-marketing/tests/test_async_job.py delete mode 100644 source-facebook-marketing/tests/test_async_job_manager.py delete mode 100644 source-facebook-marketing/tests/test_base_insight_streams.py delete mode 100644 source-facebook-marketing/tests/test_base_streams.py delete mode 100644 source-facebook-marketing/tests/test_client.py delete mode 100644 source-facebook-marketing/tests/test_deep_merge.py delete mode 100644 source-facebook-marketing/tests/test_source.py delete mode 100644 source-facebook-marketing/tests/test_streams.py delete mode 100644 source-facebook-marketing/tests/test_utils.py delete mode 100644 source-facebook-marketing/tests/utils.py diff --git a/estuary-cdk/common.Dockerfile b/estuary-cdk/common.Dockerfile index f611ecad2d..49cd1270fa 100644 --- a/estuary-cdk/common.Dockerfile +++ b/estuary-cdk/common.Dockerfile @@ -1,5 +1,5 @@ # syntax=docker/dockerfile:1 -FROM python:3.12-slim as base +FROM python:3.11 as base FROM base as builder ARG CONNECTOR_NAME diff --git a/source-facebook-marketing/test.flow.yaml b/source-facebook-marketing/test.flow.yaml index 08f3be5ff5..9186a7bf84 100644 --- a/source-facebook-marketing/test.flow.yaml +++ b/source-facebook-marketing/test.flow.yaml @@ -27,62 +27,62 @@ captures: cursorField: - updated_time target: acmeCo/ads - # - resource: - # stream: ad_creatives - # syncMode: full_refresh - # target: acmeCo/ad_creatives - # - resource: - # stream: ads_insights - # syncMode: incremental - # cursorField: - # - date_start - # target: acmeCo/ads_insights - # - resource: - # stream: ads_insights_age_and_gender - # syncMode: incremental - # cursorField: - # - date_start - # target: acmeCo/ads_insights_age_and_gender - # - resource: - # stream: ads_insights_country - # syncMode: incremental - # cursorField: - # - date_start - # target: acmeCo/ads_insights_country - # - resource: - # stream: ads_insights_region - # syncMode: incremental - # cursorField: - # - date_start - # target: acmeCo/ads_insights_region - # - resource: - # stream: ads_insights_dma - # syncMode: incremental - # cursorField: - # - date_start - # target: acmeCo/ads_insights_dma - # - resource: - # stream: ads_insights_platform_and_device - # syncMode: incremental - # cursorField: - # - date_start - # target: acmeCo/ads_insights_platform_and_device - # - resource: - # stream: ads_insights_action_type - # syncMode: incremental - # cursorField: - # - date_start - # target: acmeCo/ads_insights_action_type + - resource: + stream: ad_creatives + syncMode: full_refresh + target: acmeCo/ad_creatives + - resource: + stream: ads_insights + syncMode: incremental + cursorField: + - date_start + target: acmeCo/ads_insights + - resource: + stream: ads_insights_age_and_gender + syncMode: incremental + cursorField: + - date_start + target: acmeCo/ads_insights_age_and_gender + - resource: + stream: ads_insights_country + syncMode: incremental + cursorField: + - date_start + target: acmeCo/ads_insights_country + - resource: + stream: ads_insights_region + syncMode: incremental + cursorField: + - date_start + target: acmeCo/ads_insights_region + - resource: + stream: ads_insights_dma + syncMode: incremental + cursorField: + - date_start + target: acmeCo/ads_insights_dma + - resource: + stream: ads_insights_platform_and_device + syncMode: incremental + cursorField: + - date_start + target: acmeCo/ads_insights_platform_and_device + - resource: + stream: ads_insights_action_type + syncMode: incremental + cursorField: + - date_start + target: acmeCo/ads_insights_action_type - resource: stream: campaigns syncMode: incremental cursorField: - updated_time target: acmeCo/campaigns - # - resource: - # stream: custom_conversions - # syncMode: full_refresh - # target: acmeCo/custom_conversions + - resource: + stream: custom_conversions + syncMode: full_refresh + target: acmeCo/custom_conversions - resource: stream: images syncMode: incremental diff --git a/source-facebook-marketing/tests/conftest.py b/source-facebook-marketing/tests/conftest.py deleted file mode 100644 index 3d9f3087c7..0000000000 --- a/source-facebook-marketing/tests/conftest.py +++ /dev/null @@ -1,58 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import pendulum -from facebook_business import FacebookAdsApi, FacebookSession -from pytest import fixture -from source_facebook_marketing.api import API - -FB_API_VERSION = FacebookAdsApi.API_VERSION - - -@fixture(autouse=True) -def time_sleep_mock(mocker): - time_mock = mocker.patch("time.sleep") - yield time_mock - - -@fixture(scope="session", name="account_id") -def account_id_fixture(): - return "unknown_account" - - -@fixture(scope="session", name="some_config") -def some_config_fixture(account_id): - return {"start_date": "2021-01-23T00:00:00Z", "account_ids": [f"{account_id}"], "access_token": "unknown_token"} - - -@fixture(autouse=True) -def mock_default_sleep_interval(mocker): - mocker.patch("source_facebook_marketing.streams.common.DEFAULT_SLEEP_INTERVAL", return_value=pendulum.duration(seconds=5)) - - -@fixture(name="fb_account_response") -def fb_account_response_fixture(account_id): - return { - "json": { - "data": [ - { - "account_id": account_id, - "id": f"act_{account_id}", - } - ], - "paging": {"cursors": {"before": "MjM4NDYzMDYyMTcyNTAwNzEZD", "after": "MjM4NDYzMDYyMTcyNTAwNzEZD"}}, - }, - "status_code": 200, - } - - -@fixture(name="api") -def api_fixture(some_config, requests_mock, fb_account_response): - api = API(access_token=some_config["access_token"]) - - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/me/adaccounts", [fb_account_response]) - requests_mock.register_uri( - "GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{some_config['account_ids'][0]}/", [fb_account_response] - ) - return api diff --git a/source-facebook-marketing/tests/helpers.py b/source-facebook-marketing/tests/helpers.py deleted file mode 100644 index db48914fde..0000000000 --- a/source-facebook-marketing/tests/helpers.py +++ /dev/null @@ -1,5 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -# diff --git a/source-facebook-marketing/tests/snapshots/snapshots__discover__capture.stdout.json b/source-facebook-marketing/tests/snapshots/snapshots__discover__capture.stdout.json index acfcc9ce13..1a22c75ec8 100644 --- a/source-facebook-marketing/tests/snapshots/snapshots__discover__capture.stdout.json +++ b/source-facebook-marketing/tests/snapshots/snapshots__discover__capture.stdout.json @@ -38852,11 +38852,7 @@ ] }, "key": [ - "/object_id", - "/actor_id", - "/application_id", - "/event_time", - "/event_type" + "/actor_id" ] }, { diff --git a/source-facebook-marketing/tests/snapshots/snapshots__spec__capture.stdout.json b/source-facebook-marketing/tests/snapshots/snapshots__spec__capture.stdout.json index 4346484d51..f90b529a1c 100644 --- a/source-facebook-marketing/tests/snapshots/snapshots__spec__capture.stdout.json +++ b/source-facebook-marketing/tests/snapshots/snapshots__spec__capture.stdout.json @@ -5,14 +5,19 @@ "title": "Source Facebook Marketing", "type": "object", "properties": { - "account_id": { - "title": "Account ID", + "account_ids": { + "title": "Account IDs", "description": "The Facebook Ad account ID to use when pulling data from the Facebook Marketing API. Open your Meta Ads Manager. The Ad account ID number is in the account dropdown menu or in your browser's address bar. See the docs for more information.", "order": 0, "examples": [ "111111111111111" ], - "type": "string" + "type": "array", + "items": { + "type": "string", + "pattern": "^[0-9]+$" + }, + "uniqueItems": true }, "start_date": { "title": "Start Date", @@ -406,7 +411,7 @@ } }, "required": [ - "account_id", + "account_ids", "start_date", "credentials" ] diff --git a/source-facebook-marketing/tests/test_api.py b/source-facebook-marketing/tests/test_api.py deleted file mode 100644 index 29b2ccbfaa..0000000000 --- a/source-facebook-marketing/tests/test_api.py +++ /dev/null @@ -1,141 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import pendulum -import pytest -import source_facebook_marketing -from facebook_business import FacebookAdsApi, FacebookSession -from facebook_business.adobjects.adaccount import AdAccount - -FB_API_VERSION = FacebookAdsApi.API_VERSION - - -class TestMyFacebookAdsApi: - @pytest.fixture - def fb_api(self): - return source_facebook_marketing.api.MyFacebookAdsApi.init(access_token="foo", crash_log=False) - - @pytest.mark.parametrize( - "max_rate,max_pause_interval,min_pause_interval,usage,pause_interval,expected_pause_interval", - [ - ( - 95, - pendulum.duration(minutes=5), - pendulum.duration(minutes=1), - 96, - pendulum.duration(minutes=6), - pendulum.duration(minutes=6), - ), - ( - 95, - pendulum.duration(minutes=5), - pendulum.duration(minutes=2), - 96, - pendulum.duration(minutes=1), - pendulum.duration(minutes=5), - ), - ( - 95, - pendulum.duration(minutes=5), - pendulum.duration(minutes=1), - 93, - pendulum.duration(minutes=4), - pendulum.duration(minutes=4), - ), - ], - ) - def test__compute_pause_interval( - self, mocker, fb_api, max_rate, max_pause_interval, min_pause_interval, usage, pause_interval, expected_pause_interval - ): - mocker.patch.object(fb_api, "MAX_RATE", max_rate) - mocker.patch.object(fb_api, "MAX_PAUSE_INTERVAL", max_pause_interval) - mocker.patch.object(fb_api, "MIN_PAUSE_INTERVAL", min_pause_interval) - computed_pause_interval = fb_api._compute_pause_interval(usage, pause_interval) - assert computed_pause_interval == expected_pause_interval - - @pytest.mark.parametrize( - "min_pause_interval,usages_pause_intervals,expected_output", - [ - ( - pendulum.duration(minutes=1), # min_pause_interval - [(5, pendulum.duration(minutes=6)), (7, pendulum.duration(minutes=5))], # usages_pause_intervals - (7, pendulum.duration(minutes=6)), # expected_output - ), - ( - pendulum.duration(minutes=10), # min_pause_interval - [(5, pendulum.duration(minutes=6)), (7, pendulum.duration(minutes=5))], # usages_pause_intervals - (7, pendulum.duration(minutes=10)), # expected_output - ), - ( - pendulum.duration(minutes=10), # min_pause_interval - [ # usages_pause_intervals - (9, pendulum.duration(minutes=6)), - ], - (9, pendulum.duration(minutes=10)), # expected_output - ), - ( - pendulum.duration(minutes=10), # min_pause_interval - [ # usages_pause_intervals - (-1, pendulum.duration(minutes=1)), - (-2, pendulum.duration(minutes=10)), - (-3, pendulum.duration(minutes=100)), - ], - (0, pendulum.duration(minutes=100)), # expected_output - ), - ], - ) - def test__get_max_usage_pause_interval_from_batch(self, mocker, fb_api, min_pause_interval, usages_pause_intervals, expected_output): - records = [ - {"headers": [{"name": "USAGE", "value": usage}, {"name": "PAUSE_INTERVAL", "value": pause_interval}]} - for usage, pause_interval in usages_pause_intervals - ] - - mock_parse_call_rate_header = mocker.Mock(side_effect=usages_pause_intervals) - mocker.patch.object(fb_api, "_parse_call_rate_header", mock_parse_call_rate_header) - mocker.patch.object(fb_api, "MIN_PAUSE_INTERVAL", min_pause_interval) - - output = fb_api._get_max_usage_pause_interval_from_batch(records) - fb_api._parse_call_rate_header.assert_called_with( - {"usage": usages_pause_intervals[-1][0], "pause_interval": usages_pause_intervals[-1][1]} - ) - assert output == expected_output - - @pytest.mark.parametrize( - "params,min_rate,usage,expect_sleep", - [ - (["batch"], 0, 1, True), - (["batch"], 0, 0, True), - (["batch"], 2, 1, False), - (["not_batch"], 0, 1, True), - (["not_batch"], 0, 0, True), - (["not_batch"], 2, 1, False), - ], - ) - def test__handle_call_rate_limit(self, mocker, fb_api, params, min_rate, usage, expect_sleep): - pause_interval = 1 - mock_response = mocker.Mock() - - mocker.patch.object(fb_api, "MIN_RATE", min_rate) - mocker.patch.object(fb_api, "_get_max_usage_pause_interval_from_batch", mocker.Mock(return_value=(usage, pause_interval))) - mocker.patch.object(fb_api, "_parse_call_rate_header", mocker.Mock(return_value=(usage, pause_interval))) - mocker.patch.object(fb_api, "_compute_pause_interval") - mocker.patch.object(source_facebook_marketing.api, "logger") - mocker.patch.object(source_facebook_marketing.api, "sleep") - assert fb_api._handle_call_rate_limit(mock_response, params) is None - if "batch" in params: - fb_api._get_max_usage_pause_interval_from_batch.assert_called_with(mock_response.json.return_value) - else: - fb_api._parse_call_rate_header.assert_called_with(mock_response.headers.return_value) - if expect_sleep: - fb_api._compute_pause_interval.assert_called_with(usage=usage, pause_interval=pause_interval) - source_facebook_marketing.api.sleep.assert_called_with(fb_api._compute_pause_interval.return_value.total_seconds()) - source_facebook_marketing.api.logger.warning.assert_called_with( - f"Utilization is too high ({usage})%, pausing for {fb_api._compute_pause_interval.return_value}" - ) - - def test_find_account(self, api, account_id, requests_mock): - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/", [{"json": {"id": "act_test"}}]) - account = api.get_account(account_id) - assert isinstance(account, AdAccount) - assert account.get_id() == "act_test" diff --git a/source-facebook-marketing/tests/test_async_job.py b/source-facebook-marketing/tests/test_async_job.py deleted file mode 100644 index 625ba3ad69..0000000000 --- a/source-facebook-marketing/tests/test_async_job.py +++ /dev/null @@ -1,454 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import copy -import time -from typing import Iterator - -import pendulum -import pytest -from facebook_business.adobjects.ad import Ad -from facebook_business.adobjects.adaccount import AdAccount -from facebook_business.adobjects.adreportrun import AdReportRun -from facebook_business.adobjects.adset import AdSet -from facebook_business.adobjects.adsinsights import AdsInsights -from facebook_business.adobjects.campaign import Campaign -from facebook_business.api import FacebookAdsApiBatch, FacebookBadObjectError -from source_facebook_marketing.api import MyFacebookAdsApi -from source_facebook_marketing.streams.async_job import InsightAsyncJob, ParentAsyncJob, Status, update_in_batch - - -@pytest.fixture(name="adreport") -def adreport_fixture(mocker, api): - ao = AdReportRun(fbid="123", api=api) - ao["report_run_id"] = "123" - mocker.patch.object(ao, "api_get", side_effect=ao.api_get) - mocker.patch.object(ao, "get_result", side_effect=ao.get_result) - return ao - - -@pytest.fixture(name="account") -def account_fixture(mocker, adreport): - account = mocker.Mock(spec=AdAccount) - account.get_insights.return_value = adreport - return account - - -@pytest.fixture(name="job") -def job_fixture(api, account): - params = { - "level": "ad", - "action_breakdowns": [], - "breakdowns": [], - "fields": ["field1", "field2"], - "time_increment": 1, - "action_attribution_windows": [], - } - interval = pendulum.Interval(pendulum.Date(2019, 1, 1), pendulum.Date(2019, 1, 1)) - - return InsightAsyncJob(edge_object=account, api=api, interval=interval, params=params) - - -@pytest.fixture(name="grouped_jobs") -def grouped_jobs_fixture(mocker): - return [mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False, elapsed_time=None) for _ in range(10)] - - -@pytest.fixture(name="parent_job") -def parent_job_fixture(api, grouped_jobs): - interval = pendulum.Interval(pendulum.Date(2019, 1, 1), pendulum.Date(2019, 1, 1)) - return ParentAsyncJob(api=api, jobs=grouped_jobs, interval=interval) - - -@pytest.fixture(name="started_job") -def started_job_fixture(job, adreport, mocker): - adreport["async_status"] = Status.RUNNING.value - adreport["async_percent_completion"] = 0 - mocker.patch.object(job, "update_job", wraps=job.update_job) - job.start() - - return job - - -@pytest.fixture(name="completed_job") -def completed_job_fixture(started_job, adreport): - adreport["async_status"] = Status.COMPLETED.value - adreport["async_percent_completion"] = 100 - started_job.update_job() - - return started_job - - -@pytest.fixture(name="late_job") -def late_job_fixture(started_job, adreport): - adreport["async_status"] = Status.COMPLETED.value - adreport["async_percent_completion"] = 100 - started_job.update_job() - - return started_job - - -@pytest.fixture(name="failed_job") -def failed_job_fixture(started_job, adreport): - adreport["async_status"] = Status.FAILED.value - adreport["async_percent_completion"] = 0 - started_job.update_job() - - return started_job - - -@pytest.fixture(name="api") -def api_fixture(mocker): - api = mocker.Mock(spec=MyFacebookAdsApi) - api.call().json.return_value = {} - api.call().error.return_value = False - - return api - - -@pytest.fixture(name="batch") -def batch_fixture(api, mocker): - batch = FacebookAdsApiBatch(api=api) - mocker.patch.object(batch, "execute", wraps=batch.execute) - api.new_batch.return_value = batch - - return batch - - -class TestUpdateInBatch: - """Test update_in_batch""" - - def test_less_jobs(self, api, started_job, batch): - """Should update all jobs when number of jobs less than max size of batch""" - jobs = [started_job for _ in range(49)] - - update_in_batch(api=api, jobs=jobs) - - assert started_job.update_job.call_count == 49 - assert len(api.new_batch.return_value) == 49 - batch.execute.assert_called_once() - - def test_more_jobs(self, api, started_job, batch): - """Should update all jobs when number of jobs greater than max size of batch""" - second_batch = copy.deepcopy(batch) - jobs = [started_job for _ in range(55)] - api.new_batch.return_value = None - api.new_batch.side_effect = [batch, second_batch] - - update_in_batch(api=api, jobs=jobs) - - assert started_job.update_job.call_count == 55 - assert len(batch) == 50 - batch.execute.assert_called_once() - assert len(second_batch) == 5 - second_batch.execute.assert_called_once() - - def test_failed_execution(self, api, started_job, batch): - """Should execute batch until there are no failed tasks""" - jobs = [started_job for _ in range(49)] - batch.execute.side_effect = [batch, batch, None] - - update_in_batch(api=api, jobs=jobs) - - assert started_job.update_job.call_count == 49 - assert len(api.new_batch.return_value) == 49 - assert batch.execute.call_count == 3 - - -class TestInsightAsyncJob: - """Test InsightAsyncJob class""" - - def test_start(self, job): - job.start() - - assert job._job - assert job.elapsed_time - - def test_start_already_started(self, job): - job.start() - - with pytest.raises(RuntimeError, match=r": Incorrect usage of start - the job already started, use restart instead"): - job.start() - - def test_restart(self, failed_job, api, adreport): - assert failed_job.attempt_number == 1 - - failed_job.restart() - - assert not failed_job.failed, "restart should reset fail flag" - assert failed_job.attempt_number == 2 - - def test_restart_when_job_not_failed(self, job, api): - job.start() - assert not job.failed - - with pytest.raises(RuntimeError, match=r": Incorrect usage of restart - only failed jobs can be restarted"): - job.restart() - - def test_restart_when_job_not_started(self, job): - with pytest.raises(RuntimeError, match=r": Incorrect usage of restart - only failed jobs can be restarted"): - job.restart() - - def test_update_job_not_started(self, job): - with pytest.raises(RuntimeError, match=r": Incorrect usage of the method - the job is not started"): - job.update_job() - - def test_update_job_on_completed_job(self, completed_job, adreport): - completed_job.update_job() - - adreport.api_get.assert_called_once() - - def test_update_job(self, started_job, adreport): - started_job.update_job() - - adreport.api_get.assert_called_once() - - def test_update_job_expired(self, started_job, adreport, mocker): - mocker.patch.object(started_job, "job_timeout", new=pendulum.Duration()) - - started_job.update_job() - assert started_job.failed - - def test_update_job_with_batch(self, started_job, adreport, mocker): - response = mocker.Mock() - - response.json.return_value = { - "id": "1128003977936306", - "account_id": "212551616838260", - "time_ref": 1642989751, - "time_completed": 1642989754, - "async_status": "Job Completed", - "async_percent_completion": 100, - "date_start": "2021-02-24", - "date_stop": "2021-02-24", - } - response.body.return_value = "Some error" - batch_mock = mocker.Mock(spec=FacebookAdsApiBatch) - - started_job.update_job(batch=batch_mock) - - adreport.api_get.assert_called_once() - args, kwargs = adreport.api_get.call_args - assert kwargs["batch"] == batch_mock - - kwargs["success"](response) - assert started_job.completed - - kwargs["failure"](response) - - def test_elapsed_time(self, job, api, adreport): - assert job.elapsed_time is None, "should be None for the job that is not started" - - job.start() - adreport["async_status"] = Status.COMPLETED.value - adreport["async_percent_completion"] = 0 - job.update_job() - - assert job.elapsed_time, "should be set for the job that is running" - - elapsed_1 = job.elapsed_time - time.sleep(1) - elapsed_2 = job.elapsed_time - - assert elapsed_2 == elapsed_1, "should not change after job completed" - - def test_completed_without_start(self, job, api, adreport): - assert not job.completed - assert not job.failed - - def test_completed_ok(self, completed_job, api, adreport): - assert completed_job.completed, "should return True if the job was completed" - assert not completed_job.failed, "failed should be set to False" - - def test_completed_failed(self, failed_job, api, adreport): - assert failed_job.completed - assert failed_job.failed - - def test_completed_skipped(self, failed_job, api, adreport): - adreport["async_status"] = Status.SKIPPED.value - assert failed_job.completed - assert failed_job.failed - - def test_failed_no(self, job): - assert not job.failed, "should return False for active job" - - def test_failed_yes(self, failed_job): - assert failed_job.failed, "should return True if the job previously failed" - - def test_str(self, api, account): - interval = pendulum.Interval(pendulum.Date(2010, 1, 1), pendulum.Date(2011, 1, 1)) - job = InsightAsyncJob( - edge_object=account, - api=api, - params={"breakdowns": [10, 20]}, - interval=interval, - ) - - assert str(job) == f"InsightAsyncJob(id=, {account}, time_range= 2011-01-01]>, breakdowns=[10, 20])" - - def test_get_result(self, job, adreport, api): - job.start() - api.call().json.return_value = {"data": [{"some_data": 123}, {"some_data": 77}]} - - result = job.get_result() - - adreport.get_result.assert_called_once() - assert len(result) == 2 - assert isinstance(result[0], AdsInsights) - assert result[0].export_all_data() == {"some_data": 123} - assert result[1].export_all_data() == {"some_data": 77} - - def test_get_result_retried(self, mocker, job, api): - job.start() - api.call().json.return_value = {"data": [{"some_data": 123}, {"some_data": 77}]} - ads_insights = AdsInsights(api=api) - ads_insights._set_data({"items": [{"some_data": 123}, {"some_data": 77}]}) - with mocker.patch( - "facebook_business.adobjects.objectparser.ObjectParser.parse_multiple", - side_effect=[FacebookBadObjectError("Bad data to set object data"), ads_insights], - ): - # in case this is not retried, an error will be raised - job.get_result() - - def test_get_result_when_job_is_not_started(self, job): - with pytest.raises(RuntimeError, match=r"Incorrect usage of get_result - the job is not started or failed"): - job.get_result() - - def test_get_result_when_job_is_failed(self, failed_job): - with pytest.raises(RuntimeError, match=r"Incorrect usage of get_result - the job is not started or failed"): - failed_job.get_result() - - @pytest.mark.parametrize( - ("edge_class", "next_edge_class", "id_field"), - [ - (AdAccount, Campaign, "campaign_id"), - (Campaign, AdSet, "adset_id"), - (AdSet, Ad, "ad_id"), - ], - ) - def test_split_job(self, mocker, api, edge_class, next_edge_class, id_field): - """Test that split will correctly downsize edge_object""" - today = pendulum.today().date() - start, end = today - pendulum.duration(days=365 * 3 + 20), today - pendulum.duration(days=365 * 3 + 10) - params = {"time_increment": 1, "breakdowns": []} - job = InsightAsyncJob(api=api, edge_object=edge_class(1), interval=pendulum.Interval(start, end), params=params) - mocker.patch.object(edge_class, "get_insights", return_value=[{id_field: 1}, {id_field: 2}, {id_field: 3}]) - - small_jobs = job.split_job() - - edge_class.get_insights.assert_called_once_with( - params={ - "breakdowns": [], - "fields": [id_field], - "level": next_edge_class.__name__.lower(), - "time_range": {"since": (today - pendulum.duration(months=37)).to_date_string(), "until": end.to_date_string()}, - } - ) - assert len(small_jobs) == 3 - assert all(j.interval == job.interval for j in small_jobs) - for i, small_job in enumerate(small_jobs, start=1): - assert small_job._params["time_range"] == job._params["time_range"] - assert str(small_job) == f"InsightAsyncJob(id=, {next_edge_class(i)}, time_range={job.interval}, breakdowns={[]})" - - def test_split_job_smallest(self, mocker, api): - """Test that split will correctly downsize edge_object""" - interval = pendulum.Interval(pendulum.Date(2010, 1, 1), pendulum.Date(2010, 1, 10)) - params = {"time_increment": 1, "breakdowns": []} - job = InsightAsyncJob(api=api, edge_object=Ad(1), interval=interval, params=params) - - with pytest.raises(ValueError, match="The job is already splitted to the smallest size."): - job.split_job() - - -class TestParentAsyncJob: - def test_start(self, parent_job, grouped_jobs): - parent_job.start() - for job in grouped_jobs: - job.start.assert_called_once() - - def test_restart(self, parent_job, grouped_jobs): - assert not parent_job.failed, "initially not failed" - - # fail some jobs - grouped_jobs[0].failed = True - grouped_jobs[0].attempt_number = 2 - grouped_jobs[5].failed = True - grouped_jobs[0].attempt_number = 2 - grouped_jobs[6].attempt_number = 3 - - assert parent_job.failed, "should be failed if any job failed" - parent_job.restart() - assert parent_job.failed - assert parent_job.attempt_number == 3, "restart should be max value of all jobs" - - def test_completed(self, parent_job, grouped_jobs): - assert not parent_job.completed, "initially not completed" - - # complete some jobs - grouped_jobs[0].completed = True - grouped_jobs[5].completed = True - - assert not parent_job.completed, "not completed until all jobs completed" - - # complete all jobs - for job in grouped_jobs: - job.completed = True - - assert parent_job.completed, "completed because all jobs completed" - - def test_update_job(self, parent_job, grouped_jobs, api, batch): - """Checks jobs status in advance and restart if some failed.""" - parent_job.update_job() - - # assert - for job in grouped_jobs: - job.update_job.assert_called_once_with(batch=batch) - - def test_get_result(self, parent_job, grouped_jobs): - """Retrieve result of the finished job.""" - for job in grouped_jobs: - job.get_result.return_value = [] - grouped_jobs[0].get_result.return_value = range(3, 8) - grouped_jobs[6].get_result.return_value = range(4, 11) - - generator = parent_job.get_result() - - assert isinstance(generator, Iterator) - assert list(generator) == list(range(3, 8)) + list(range(4, 11)) - - def test_split_job(self, parent_job, grouped_jobs, mocker): - grouped_jobs[0].failed = True - grouped_jobs[0].split_job.return_value = [mocker.Mock(spec=InsightAsyncJob), mocker.Mock(spec=InsightAsyncJob)] - grouped_jobs[5].failed = True - grouped_jobs[5].split_job.return_value = [ - mocker.Mock(spec=InsightAsyncJob), - mocker.Mock(spec=InsightAsyncJob), - mocker.Mock(spec=InsightAsyncJob), - ] - - small_jobs = parent_job.split_job() - - assert len(small_jobs) == len(grouped_jobs) + 5 - 2, "each failed job must be replaced with its split" - for i, job in enumerate(grouped_jobs): - if i in (0, 5): - job.split_job.assert_called_once() - else: - job.split_job.assert_not_called() - - def test_split_job_smallest(self, parent_job, grouped_jobs): - grouped_jobs[0].failed = True - grouped_jobs[0].split_job.side_effect = ValueError("Mocking smallest size") - - # arbitrarily testing this X times, the max attempts is handled by async_job_manager rather than the job itself. - count = 0 - while count < 10: - split_jobs = parent_job.split_job() - assert len(split_jobs) == len( - grouped_jobs - ), "attempted to split job at smallest size so should just restart job meaning same no. of jobs" - grouped_jobs[0].attempt_number += 1 - count += 1 - - def test_str(self, parent_job, grouped_jobs): - assert str(parent_job) == f"ParentAsyncJob({grouped_jobs[0]} ... {len(grouped_jobs) - 1} jobs more)" diff --git a/source-facebook-marketing/tests/test_async_job_manager.py b/source-facebook-marketing/tests/test_async_job_manager.py deleted file mode 100644 index 9134766e9f..0000000000 --- a/source-facebook-marketing/tests/test_async_job_manager.py +++ /dev/null @@ -1,176 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import pytest -from facebook_business.api import FacebookAdsApiBatch -from source_facebook_marketing.api import MyFacebookAdsApi -from source_facebook_marketing.streams.async_job import InsightAsyncJob, ParentAsyncJob -from source_facebook_marketing.streams.async_job_manager import InsightAsyncJobManager -from source_facebook_marketing.streams.common import JobException - - -@pytest.fixture(name="api") -def api_fixture(mocker): - api = mocker.Mock() - api.api.ads_insights_throttle = MyFacebookAdsApi.Throttle(0, 0) - api.api.new_batch.return_value = mocker.MagicMock(spec=FacebookAdsApiBatch) - return api - - -@pytest.fixture(name="time_mock") -def time_mock_fixture(mocker): - return mocker.patch("source_facebook_marketing.streams.async_job_manager.time") - - -@pytest.fixture(name="update_job_mock") -def update_job_mock_fixture(mocker): - return mocker.patch("source_facebook_marketing.streams.async_job_manager.update_in_batch") - - -class TestInsightAsyncManager: - def test_jobs_empty(self, api, some_config): - """Should work event without jobs""" - manager = InsightAsyncJobManager(api=api, jobs=[], account_id=some_config["account_ids"][0]) - jobs = list(manager.completed_jobs()) - assert not jobs - - def test_jobs_completed_immediately(self, api, mocker, time_mock, some_config): - """Manager should emmit jobs without waiting if they completed""" - jobs = [ - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False), - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False), - ] - manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0]) - completed_jobs = list(manager.completed_jobs()) - assert jobs == completed_jobs - time_mock.sleep.assert_not_called() - - def test_jobs_wait(self, api, mocker, time_mock, update_job_mock, some_config): - """Manager should return completed jobs and wait for others""" - - def update_job_behaviour(): - jobs[1].completed = True - yield - yield - jobs[0].completed = True - yield - - update_job_mock.side_effect = update_job_behaviour() - jobs = [ - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False), - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False), - ] - manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0]) - - job = next(manager.completed_jobs(), None) - assert job == jobs[1] - time_mock.sleep.assert_not_called() - - job = next(manager.completed_jobs(), None) - assert job == jobs[0] - time_mock.sleep.assert_called_with(InsightAsyncJobManager.JOB_STATUS_UPDATE_SLEEP_SECONDS) - - job = next(manager.completed_jobs(), None) - assert job is None - - def test_job_restarted(self, api, mocker, time_mock, update_job_mock, some_config): - """Manager should restart failed jobs""" - - def update_job_behaviour(): - jobs[1].failed = True - yield - jobs[1].failed = False - jobs[1].completed = True - yield - - update_job_mock.side_effect = update_job_behaviour() - jobs = [ - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True), - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False), - ] - manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0]) - - job = next(manager.completed_jobs(), None) - assert job == jobs[0] - jobs[1].restart.assert_called_once() - - job = next(manager.completed_jobs(), None) - assert job == jobs[1] - - job = next(manager.completed_jobs(), None) - assert job is None - - def test_job_split(self, api, mocker, time_mock, update_job_mock, some_config): - """Manager should split failed jobs when they fail second time""" - - def update_job_behaviour(): - jobs[1].failed = True - jobs[1].attempt_number = 2 - yield from range(10) - - update_job_mock.side_effect = update_job_behaviour() - jobs = [ - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True), - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False), - ] - sub_jobs = [ - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True), - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True), - ] - sub_jobs[0].get_result.return_value = [1, 2] - sub_jobs[1].get_result.return_value = [3, 4] - jobs[1].split_job.return_value = sub_jobs - manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0]) - - job = next(manager.completed_jobs(), None) - assert job == jobs[0] - jobs[1].split_job.assert_called_once() - - job = next(manager.completed_jobs(), None) - assert isinstance(job, ParentAsyncJob) - assert list(job.get_result()) == [1, 2, 3, 4] - - job = next(manager.completed_jobs(), None) - assert job is None - - def test_job_failed_too_many_times(self, api, mocker, time_mock, update_job_mock, some_config): - """Manager should fail when job failed too many times""" - - def update_job_behaviour(): - jobs[1].failed = True - jobs[1].attempt_number = InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS - yield from range(10) - - update_job_mock.side_effect = update_job_behaviour() - jobs = [ - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True), - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False), - ] - manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0]) - - with pytest.raises(JobException, match=f"{jobs[1]}: failed more than {InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS} times."): - next(manager.completed_jobs(), None) - - def test_nested_job_failed_too_many_times(self, api, mocker, time_mock, update_job_mock, some_config): - """Manager should fail when a nested job within a ParentAsyncJob failed too many times""" - - def update_job_behaviour(): - jobs[1].failed = True - sub_jobs[1].failed = True - sub_jobs[1].attempt_number = InsightAsyncJobManager.MAX_NUMBER_OF_ATTEMPTS - yield from range(10) - - update_job_mock.side_effect = update_job_behaviour() - sub_jobs = [ - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True), - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=False), - ] - jobs = [ - mocker.Mock(spec=InsightAsyncJob, attempt_number=1, failed=False, completed=True), - mocker.Mock(spec=ParentAsyncJob, _jobs=sub_jobs, attempt_number=1, failed=False, completed=False), - ] - manager = InsightAsyncJobManager(api=api, jobs=jobs, account_id=some_config["account_ids"][0]) - - with pytest.raises(JobException): - next(manager.completed_jobs(), None) \ No newline at end of file diff --git a/source-facebook-marketing/tests/test_base_insight_streams.py b/source-facebook-marketing/tests/test_base_insight_streams.py deleted file mode 100644 index 80b46c69b1..0000000000 --- a/source-facebook-marketing/tests/test_base_insight_streams.py +++ /dev/null @@ -1,459 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from datetime import datetime - -import pendulum -import pytest -from airbyte_cdk.models import SyncMode -from pendulum import duration -from source_facebook_marketing.streams import AdsInsights -from source_facebook_marketing.streams.async_job import AsyncJob, InsightAsyncJob - - -@pytest.fixture(name="api") -def api_fixture(mocker): - api = mocker.Mock() - api.api.ads_insights_throttle = (0, 0) - return api - - -@pytest.fixture(name="old_start_date") -def old_start_date_fixture(): - return pendulum.now() - duration(months=37 + 1) - - -@pytest.fixture(name="recent_start_date") -def recent_start_date_fixture(): - return pendulum.now() - duration(days=10) - - -@pytest.fixture(name="start_date") -def start_date_fixture(): - return pendulum.now() - duration(months=12) - - -@pytest.fixture(name="async_manager_mock") -def async_manager_mock_fixture(mocker): - mock = mocker.patch("source_facebook_marketing.streams.base_insight_streams.InsightAsyncJobManager") - mock.return_value = mock - return mock - - -@pytest.fixture(name="async_job_mock") -def async_job_mock_fixture(mocker): - mock = mocker.patch("source_facebook_marketing.streams.base_insight_streams.InsightAsyncJob") - mock.side_effect = lambda api, **kwargs: {"api": api, **kwargs} - - -class TestBaseInsightsStream: - def test_init(self, api, some_config): - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - insights_lookback_window=28, - ) - - assert not stream.breakdowns - assert stream.action_breakdowns == ["action_type", "action_target_id", "action_destination"] - assert stream.name == "ads_insights" - assert stream.primary_key == ["date_start", "account_id", "ad_id"] - assert stream.action_report_time == "mixed" - - def test_init_override(self, api, some_config): - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - name="CustomName", - breakdowns=["test1", "test2"], - action_breakdowns=["field1", "field2"], - insights_lookback_window=28, - ) - - assert stream.breakdowns == ["test1", "test2"] - assert stream.action_breakdowns == ["field1", "field2"] - assert stream.name == "custom_name" - assert stream.primary_key == ["date_start", "account_id", "ad_id", "test1", "test2"] - - def test_read_records_all(self, mocker, api, some_config): - """1. yield all from mock - 2. if read slice 2, 3 state not changed - if read slice 2, 3, 1 state changed to 3 - """ - job = mocker.Mock(spec=InsightAsyncJob) - job.get_result.return_value = [mocker.Mock(), mocker.Mock(), mocker.Mock()] - job.interval = pendulum.Period(pendulum.date(2010, 1, 1), pendulum.date(2010, 1, 1)) - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - insights_lookback_window=28, - ) - - records = list( - stream.read_records( - sync_mode=SyncMode.incremental, - stream_slice={"insight_job": job, "account_id": some_config["account_ids"][0]}, - ) - ) - - assert len(records) == 3 - - def test_read_records_random_order(self, mocker, api, some_config): - """1. yield all from mock - 2. if read slice 2, 3 state not changed - if read slice 2, 3, 1 state changed to 3 - """ - job = mocker.Mock(spec=AsyncJob) - job.get_result.return_value = [mocker.Mock(), mocker.Mock(), mocker.Mock()] - job.interval = pendulum.Period(pendulum.date(2010, 1, 1), pendulum.date(2010, 1, 1)) - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - insights_lookback_window=28, - ) - - records = list( - stream.read_records( - sync_mode=SyncMode.incremental, - stream_slice={"insight_job": job, "account_id": some_config["account_ids"][0]}, - ) - ) - - assert len(records) == 3 - - @pytest.mark.parametrize( - "state,result_state", - [ - # Old format - ( - { - AdsInsights.cursor_field: "2010-10-03", - "slices": [ - "2010-01-01", - "2010-01-02", - ], - "time_increment": 1, - }, - { - "unknown_account": { - AdsInsights.cursor_field: "2010-10-03", - "slices": { - "2010-01-01", - "2010-01-02", - }, - }, - "time_increment": 1, - }, - ), - ( - { - AdsInsights.cursor_field: "2010-10-03", - }, - { - "unknown_account": { - AdsInsights.cursor_field: "2010-10-03", - } - }, - ), - ( - { - "slices": [ - "2010-01-01", - "2010-01-02", - ] - }, - { - "unknown_account": { - "slices": { - "2010-01-01", - "2010-01-02", - } - } - }, - ), - # New format - nested with account_id - ( - { - "unknown_account": { - AdsInsights.cursor_field: "2010-10-03", - "slices": { - "2010-01-01", - "2010-01-02", - }, - }, - "time_increment": 1, - }, - None, - ), - ( - { - "unknown_account": { - AdsInsights.cursor_field: "2010-10-03", - } - }, - None, - ), - ( - { - "unknown_account": { - "slices": { - "2010-01-01", - "2010-01-02", - } - } - }, - None, - ), - ], - ) - def test_state(self, api, state, result_state, some_config): - """State setter/getter should work with all combinations""" - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - insights_lookback_window=28, - ) - - assert stream.state == {"time_increment": 1, "unknown_account": {"slices": set()}} - - stream.state = state - actual_state = stream.state - - result_state = state if not result_state else result_state - result_state[some_config["account_ids"][0]]["slices"] = result_state[some_config["account_ids"][0]].get("slices", set()) - result_state["time_increment"] = 1 - - assert actual_state == result_state - - def test_stream_slices_no_state(self, api, async_manager_mock, start_date, some_config): - """Stream will use start_date when there is not state""" - end_date = start_date + duration(weeks=2) - stream = AdsInsights( - api=api, account_ids=some_config["account_ids"], start_date=start_date, end_date=end_date, insights_lookback_window=28 - ) - async_manager_mock.completed_jobs.return_value = [1, 2, 3] - - slices = list(stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental)) - - assert slices == [ - {"account_id": "unknown_account", "insight_job": 1}, - {"account_id": "unknown_account", "insight_job": 2}, - {"account_id": "unknown_account", "insight_job": 3}, - ] - async_manager_mock.assert_called_once() - args, kwargs = async_manager_mock.call_args - generated_jobs = list(kwargs["jobs"]) - assert len(generated_jobs) == (end_date - start_date).days + 1 - assert generated_jobs[0].interval.start == start_date.date() - assert generated_jobs[1].interval.start == start_date.date() + duration(days=1) - - def test_stream_slices_no_state_close_to_now(self, api, async_manager_mock, recent_start_date, some_config): - """Stream will use start_date when there is not state and start_date within 28d from now""" - start_date = recent_start_date - end_date = pendulum.now() - stream = AdsInsights( - api=api, account_ids=some_config["account_ids"], start_date=start_date, end_date=end_date, insights_lookback_window=28 - ) - async_manager_mock.completed_jobs.return_value = [1, 2, 3] - - slices = list(stream.stream_slices(stream_state=None, sync_mode=SyncMode.incremental)) - - assert slices == [ - {"account_id": "unknown_account", "insight_job": 1}, - {"account_id": "unknown_account", "insight_job": 2}, - {"account_id": "unknown_account", "insight_job": 3}, - ] - async_manager_mock.assert_called_once() - args, kwargs = async_manager_mock.call_args - generated_jobs = list(kwargs["jobs"]) - assert len(generated_jobs) == (end_date - start_date).days + 1 - assert generated_jobs[0].interval.start == start_date.date() - assert generated_jobs[1].interval.start == start_date.date() + duration(days=1) - - def test_stream_slices_with_state(self, api, async_manager_mock, start_date, some_config): - """Stream will use cursor_value from state when there is state""" - end_date = start_date + duration(days=10) - cursor_value = start_date + duration(days=5) - state = {AdsInsights.cursor_field: cursor_value.date().isoformat()} - stream = AdsInsights( - api=api, account_ids=some_config["account_ids"], start_date=start_date, end_date=end_date, insights_lookback_window=28 - ) - async_manager_mock.completed_jobs.return_value = [1, 2, 3] - - slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental)) - - assert slices == [ - {"account_id": "unknown_account", "insight_job": 1}, - {"account_id": "unknown_account", "insight_job": 2}, - {"account_id": "unknown_account", "insight_job": 3}, - ] - async_manager_mock.assert_called_once() - args, kwargs = async_manager_mock.call_args - generated_jobs = list(kwargs["jobs"]) - assert len(generated_jobs) == (end_date - cursor_value).days - assert generated_jobs[0].interval.start == cursor_value.date() + duration(days=1) - assert generated_jobs[1].interval.start == cursor_value.date() + duration(days=2) - - def test_stream_slices_with_state_close_to_now(self, api, async_manager_mock, recent_start_date, some_config): - """Stream will use start_date when close to now and start_date close to now""" - start_date = recent_start_date - end_date = pendulum.now() - cursor_value = end_date - duration(days=1) - state = {AdsInsights.cursor_field: cursor_value.date().isoformat()} - stream = AdsInsights( - api=api, account_ids=some_config["account_ids"], start_date=start_date, end_date=end_date, insights_lookback_window=28 - ) - async_manager_mock.completed_jobs.return_value = [1, 2, 3] - - slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental)) - - assert slices == [ - {"account_id": "unknown_account", "insight_job": 1}, - {"account_id": "unknown_account", "insight_job": 2}, - {"account_id": "unknown_account", "insight_job": 3}, - ] - async_manager_mock.assert_called_once() - args, kwargs = async_manager_mock.call_args - generated_jobs = list(kwargs["jobs"]) - assert len(generated_jobs) == (end_date - start_date).days + 1 - assert generated_jobs[0].interval.start == start_date.date() - assert generated_jobs[1].interval.start == start_date.date() + duration(days=1) - - @pytest.mark.parametrize("state_format", ["old_format", "new_format"]) - def test_stream_slices_with_state_and_slices(self, api, async_manager_mock, start_date, some_config, state_format): - """Stream will use cursor_value from state, but will skip saved slices""" - end_date = start_date + duration(days=10) - cursor_value = start_date + duration(days=5) - - if state_format == "old_format": - state = { - AdsInsights.cursor_field: cursor_value.date().isoformat(), - "slices": [(cursor_value + duration(days=1)).date().isoformat(), (cursor_value + duration(days=3)).date().isoformat()], - } - else: - state = { - "unknown_account": { - AdsInsights.cursor_field: cursor_value.date().isoformat(), - "slices": [(cursor_value + duration(days=1)).date().isoformat(), (cursor_value + duration(days=3)).date().isoformat()], - } - } - stream = AdsInsights( - api=api, account_ids=some_config["account_ids"], start_date=start_date, end_date=end_date, insights_lookback_window=28 - ) - async_manager_mock.completed_jobs.return_value = [1, 2, 3] - - slices = list(stream.stream_slices(stream_state=state, sync_mode=SyncMode.incremental)) - - assert slices == [ - {"account_id": "unknown_account", "insight_job": 1}, - {"account_id": "unknown_account", "insight_job": 2}, - {"account_id": "unknown_account", "insight_job": 3}, - ] - async_manager_mock.assert_called_once() - args, kwargs = async_manager_mock.call_args - generated_jobs = list(kwargs["jobs"]) - assert len(generated_jobs) == (end_date - cursor_value).days - 2, "should be 2 slices short because of state" - assert generated_jobs[0].interval.start == cursor_value.date() + duration(days=2) - assert generated_jobs[1].interval.start == cursor_value.date() + duration(days=4) - - def test_get_json_schema(self, api, some_config): - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - insights_lookback_window=28, - ) - - schema = stream.get_json_schema() - - assert "device_platform" not in schema["properties"] - assert "country" not in schema["properties"] - assert not (set(stream.fields()) - set(schema["properties"].keys())), "all fields present in schema" - - def test_get_json_schema_custom(self, api, some_config): - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - breakdowns=["device_platform", "country"], - insights_lookback_window=28, - ) - - schema = stream.get_json_schema() - - assert "device_platform" in schema["properties"] - assert "country" in schema["properties"] - assert not (set(stream.fields()) - set(schema["properties"].keys())), "all fields present in schema" - - def test_fields(self, api, some_config): - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - insights_lookback_window=28, - ) - - fields = stream.fields() - - assert "account_id" in fields - assert "account_currency" in fields - assert "actions" in fields - - def test_fields_custom(self, api, some_config): - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - fields=["account_id", "account_currency"], - insights_lookback_window=28, - ) - - assert stream.fields() == ["account_id", "account_currency"] - schema = stream.get_json_schema() - assert schema["properties"].keys() == set(["account_currency", "account_id", stream.cursor_field, "date_stop", "ad_id"]) - - def test_level_custom(self, api, some_config): - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - fields=["account_id", "account_currency"], - insights_lookback_window=28, - level="adset", - ) - - assert stream.level == "adset" - - def test_breackdowns_fields_present_in_response_data(self, api, some_config): - stream = AdsInsights( - api=api, - account_ids=some_config["account_ids"], - start_date=datetime(2010, 1, 1), - end_date=datetime(2011, 1, 1), - breakdowns=["age", "gender"], - insights_lookback_window=28, - ) - - data = {"age": "0-100", "gender": "male"} - - assert stream._response_data_is_valid(data) - - data = {"id": "0000001", "name": "Pipenpodl Absakopalis"} - - assert not stream._response_data_is_valid(data) \ No newline at end of file diff --git a/source-facebook-marketing/tests/test_base_streams.py b/source-facebook-marketing/tests/test_base_streams.py deleted file mode 100644 index 9ba1676ec3..0000000000 --- a/source-facebook-marketing/tests/test_base_streams.py +++ /dev/null @@ -1,149 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from functools import partial -from typing import Any, Iterable, Mapping - -import pytest -from facebook_business import FacebookSession -from facebook_business.api import FacebookAdsApi, FacebookAdsApiBatch -from source_facebook_marketing.api import MyFacebookAdsApi -from source_facebook_marketing.streams.base_streams import FBMarketingIncrementalStream, FBMarketingStream - - -@pytest.fixture(name="mock_batch_responses") -def mock_batch_responses_fixture(requests_mock): - return partial(requests_mock.register_uri, "POST", f"{FacebookSession.GRAPH}/{FacebookAdsApi.API_VERSION}/") - - -@pytest.fixture(name="batch") -def batch_fixture(api, mocker): - batch = FacebookAdsApiBatch(api=api.api) - mocker.patch.object(batch, "execute", wraps=batch.execute) - mocker.patch.object(batch, "add_request", wraps=batch.add_request) - mocker.patch.object(MyFacebookAdsApi, "new_batch", return_value=batch) - return batch - - -class SomeTestStream(FBMarketingStream): - def list_objects(self, params: Mapping[str, Any]) -> Iterable: - yield from [] - - -class TestDateTimeValue: - def test_date_time_value(self): - record = { - "bla": "2023-01-19t20:38:59 0000", - "created_time": "2023-01-19t20:38:59 0000", - "creation_time": "2023-01-19t20:38:59 0000", - "updated_time": "2023-01-19t20:38:59 0000", - "event_time": "2023-01-19t20:38:59 0000", - "first_fired_time": "2023-01-19t20:38:59 0000", - "last_fired_time": "2023-01-19t20:38:59 0000", - "sub_list": [ - { - "bla": "2023-01-19t20:38:59 0000", - "created_time": "2023-01-19t20:38:59 0000", - "creation_time": "2023-01-19t20:38:59 0000", - "updated_time": "2023-01-19t20:38:59 0000", - "event_time": "2023-01-19t20:38:59 0000", - "first_fired_time": "2023-01-19t20:38:59 0000", - "last_fired_time": "2023-01-19t20:38:59 0000", - } - ], - "sub_entries1": { - "sub_entries2": { - "bla": "2023-01-19t20:38:59 0000", - "created_time": "2023-01-19t20:38:59 0000", - "creation_time": "2023-01-19t20:38:59 0000", - "updated_time": "2023-01-19t20:38:59 0000", - "event_time": "2023-01-19t20:38:59 0000", - "first_fired_time": "2023-01-19t20:38:59 0000", - "last_fired_time": "2023-01-19t20:38:59 0000", - } - }, - } - FBMarketingStream.fix_date_time(record) - assert { - "bla": "2023-01-19t20:38:59 0000", - "created_time": "2023-01-19T20:38:59+0000", - "creation_time": "2023-01-19T20:38:59+0000", - "updated_time": "2023-01-19T20:38:59+0000", - "event_time": "2023-01-19T20:38:59+0000", - "first_fired_time": "2023-01-19T20:38:59+0000", - "last_fired_time": "2023-01-19T20:38:59+0000", - "sub_list": [ - { - "bla": "2023-01-19t20:38:59 0000", - "created_time": "2023-01-19T20:38:59+0000", - "creation_time": "2023-01-19T20:38:59+0000", - "updated_time": "2023-01-19T20:38:59+0000", - "event_time": "2023-01-19T20:38:59+0000", - "first_fired_time": "2023-01-19T20:38:59+0000", - "last_fired_time": "2023-01-19T20:38:59+0000", - } - ], - "sub_entries1": { - "sub_entries2": { - "bla": "2023-01-19t20:38:59 0000", - "created_time": "2023-01-19T20:38:59+0000", - "creation_time": "2023-01-19T20:38:59+0000", - "updated_time": "2023-01-19T20:38:59+0000", - "event_time": "2023-01-19T20:38:59+0000", - "first_fired_time": "2023-01-19T20:38:59+0000", - "last_fired_time": "2023-01-19T20:38:59+0000", - } - }, - } == record - - -class ConcreteFBMarketingIncrementalStream(FBMarketingIncrementalStream): - cursor_field = "date" - - def list_objects(self, **kwargs): - return [] - - -@pytest.fixture -def incremental_class_instance(api): - return ConcreteFBMarketingIncrementalStream(api=api, account_ids=["123", "456", "789"], start_date=None, end_date=None) - - -class TestFBMarketingIncrementalStreamSliceAndState: - def test_stream_slices_multiple_accounts_with_state(self, incremental_class_instance): - stream_state = {"123": {"state_key": "state_value"}, "456": {"state_key": "another_state_value"}} - expected_slices = [ - {"account_id": "123", "stream_state": {"state_key": "state_value"}}, - {"account_id": "456", "stream_state": {"state_key": "another_state_value"}}, - {"account_id": "789", "stream_state": {}}, - ] - assert list(incremental_class_instance.stream_slices(stream_state)) == expected_slices - - def test_stream_slices_multiple_accounts_empty_state(self, incremental_class_instance): - expected_slices = [ - {"account_id": "123", "stream_state": {}}, - {"account_id": "456", "stream_state": {}}, - {"account_id": "789", "stream_state": {}}, - ] - assert list(incremental_class_instance.stream_slices()) == expected_slices - - def test_stream_slices_single_account_with_state(self, incremental_class_instance): - incremental_class_instance._account_ids = ["123"] - stream_state = {"state_key": "state_value"} - expected_slices = [{"account_id": "123", "stream_state": stream_state}] - assert list(incremental_class_instance.stream_slices(stream_state)) == expected_slices - - def test_stream_slices_single_account_empty_state(self, incremental_class_instance): - incremental_class_instance._account_ids = ["123"] - expected_slices = [{"account_id": "123", "stream_state": None}] - assert list(incremental_class_instance.stream_slices()) == expected_slices - - def test_get_updated_state(self, incremental_class_instance): - current_stream_state = {"123": {"date": "2021-01-15T00:00:00+00:00"}, "include_deleted": False} - latest_record = {"account_id": "123", "date": "2021-01-20T00:00:00+00:00"} - - expected_state = {"123": {"date": "2021-01-20T00:00:00+00:00", "include_deleted": False}, "include_deleted": False} - - new_state = incremental_class_instance.get_updated_state(current_stream_state, latest_record) - assert new_state == expected_state \ No newline at end of file diff --git a/source-facebook-marketing/tests/test_client.py b/source-facebook-marketing/tests/test_client.py deleted file mode 100644 index 0fc03e20d1..0000000000 --- a/source-facebook-marketing/tests/test_client.py +++ /dev/null @@ -1,253 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import json - -import pendulum -import pytest -from airbyte_cdk.models import SyncMode -from airbyte_cdk.utils import AirbyteTracedException -from facebook_business import FacebookAdsApi, FacebookSession -from facebook_business.exceptions import FacebookRequestError -from source_facebook_marketing.streams import Activities, AdAccount, AdCreatives, Campaigns, Videos - -FB_API_VERSION = FacebookAdsApi.API_VERSION - - -@pytest.fixture(name="fb_call_rate_response") -def fb_call_rate_response_fixture(): - error = { - "message": ( - "(#80000) There have been too many calls from this ad-account. Wait a bit and try again. " - "For more info, please refer to https://developers.facebook.com/docs/graph-api/overview/rate-limiting." - ), - "type": "OAuthException", - "code": 80000, - "error_subcode": 2446079, - "fbtrace_id": "this_is_fake_response", - } - - headers = {"x-app-usage": json.dumps({"call_count": 28, "total_time": 25, "total_cputime": 25})} - - return { - "json": { - "error": error, - }, - "status_code": 400, - "headers": headers, - } - - -@pytest.fixture(name="fb_call_amount_data_response") -def fb_call_amount_data_response_fixture(): - error = {"message": "Please reduce the amount of data you're asking for, then retry your request", "code": 1} - - return { - "json": { - "error": error, - }, - "status_code": 500, - } - - -class TestBackoff: - def test_limit_reached(self, mocker, requests_mock, api, fb_call_rate_response, account_id, some_config): - """Error once, check that we retry and not fail""" - # turn Campaigns into non batch mode to test non batch logic - campaign_responses = [ - fb_call_rate_response, - { - "json": {"data": [{"id": 1, "updated_time": "2020-09-25T00:00:00Z"}, {"id": 2, "updated_time": "2020-09-25T00:00:00Z"}]}, - "status_code": 200, - }, - ] - - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/campaigns", campaign_responses) - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/1/", [{"status_code": 200}]) - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/2/", [{"status_code": 200}]) - - stream = Campaigns(api=api, account_ids=[account_id], start_date=pendulum.now(), end_date=pendulum.now(), include_deleted=False) - try: - records = list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_state={}, stream_slice={"account_id": account_id})) - assert records - except FacebookRequestError: - pytest.fail("Call rate error has not being handled") - - def test_batch_limit_reached(self, requests_mock, api, fb_call_rate_response, account_id): - """Error once, check that we retry and not fail""" - responses = [ - fb_call_rate_response, - { - "json": { - "data": [ - { - "id": "123", - "object_type": "SHARE", - "status": "ACTIVE", - }, - { - "id": "1234", - "object_type": "SHARE", - "status": "ACTIVE", - }, - ], - "status_code": 200, - } - }, - ] - - batch_responses = [ - fb_call_rate_response, - { - "json": [ - {"body": json.dumps({"name": "creative 1"}), "code": 200, "headers": {}}, - {"body": json.dumps({"name": "creative 2"}), "code": 200, "headers": {}}, - ] - }, - ] - - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/adcreatives", responses) - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/", responses) - requests_mock.register_uri("POST", FacebookSession.GRAPH + f"/{FB_API_VERSION}/", batch_responses) - - stream = AdCreatives(api=api, account_ids=[account_id], include_deleted=False) - records = list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_state={}, stream_slice={"account_id": account_id})) - - assert records == [ - {"account_id": "unknown_account", "id": "123", "object_type": "SHARE", "status": "ACTIVE"}, - {"account_id": "unknown_account", "id": "1234", "object_type": "SHARE", "status": "ACTIVE"}, - ] - - @pytest.mark.parametrize( - "error_response", - [ - {"json": {"error": {}}, "status_code": 500}, - {"json": {"error": {"code": 104}}}, - {"json": {"error": {"code": 2}}, "status_code": 500}, - ], - ids=["server_error", "connection_reset_error", "temporary_oauth_error"], - ) - def test_common_error_retry(self, error_response, requests_mock, api, account_id): - """Error once, check that we retry and not fail""" - account_data = {"account_id": "unknown_account", "id": 1, "updated_time": "2020-09-25T00:00:00Z", "name": "Some name"} - responses = [ - error_response, - { - "json": account_data, - "status_code": 200, - }, - ] - - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/me/business_users", json={"data": []}) - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/", responses) - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/{account_data['id']}/", responses) - - stream = AdAccount(api=api, account_ids=[account_id]) - accounts = list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_state={}, stream_slice={"account_id": account_id})) - - assert accounts == [account_data] - - def test_limit_error_retry(self, fb_call_amount_data_response, requests_mock, api, account_id): - """Error every time, check limit parameter decreases by 2 times every new call""" - - res = requests_mock.register_uri( - "GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/campaigns", [fb_call_amount_data_response] - ) - - stream = Campaigns( - api=api, account_ids=[account_id], start_date=pendulum.now(), end_date=pendulum.now(), include_deleted=False, page_size=100 - ) - try: - list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_state={}, stream_slice={"account_id": account_id})) - except AirbyteTracedException: - assert [x.qs.get("limit")[0] for x in res.request_history] == ["100", "50", "25", "12", "6"] - - def test_limit_error_retry_revert_page_size(self, requests_mock, api, account_id): - """Error every time, check limit parameter decreases by 2 times every new call""" - - error = { - "json": { - "error": { - "message": "An unknown error occurred", - "code": 1, - } - }, - "status_code": 500, - } - success = { - "json": { - "data": [], - "paging": { - "cursors": { - "after": "test", - }, - "next": f"https://graph.facebook.com/{FB_API_VERSION}/act_{account_id}/activities?limit=31&after=test", - }, - }, - "status_code": 200, - } - - res = requests_mock.register_uri( - "GET", - FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/activities", - [error, success, error, success], - ) - - stream = Activities( - api=api, account_ids=[account_id], start_date=pendulum.now(), end_date=pendulum.now(), include_deleted=False, page_size=100 - ) - try: - list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_state={}, stream_slice={"account_id": account_id})) - except FacebookRequestError: - assert [x.qs.get("limit")[0] for x in res.request_history] == ["100", "50", "100", "50"] - - def test_start_date_not_provided(self, requests_mock, api, account_id): - success = { - "json": { - "data": [], - "paging": { - "cursors": { - "after": "test", - }, - "next": f"https://graph.facebook.com/{FB_API_VERSION}/act_{account_id}/activities?limit=31&after=test", - }, - }, - "status_code": 200, - } - - requests_mock.register_uri( - "GET", - FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/activities", - [success], - ) - - stream = Activities(api=api, account_ids=[account_id], start_date=None, end_date=None, include_deleted=False, page_size=100) - list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_state={}, stream_slice={"account_id": account_id})) - - def test_limit_error_retry_next_page(self, fb_call_amount_data_response, requests_mock, api, account_id): - """Unlike the previous test, this one tests the API call fail on the second or more page of a request.""" - base_url = FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/advideos" - - res = requests_mock.register_uri( - "GET", - base_url, - [ - { - "json": { - "data": [{"id": 1, "updated_time": "2020-09-25T00:00:00Z"}, {"id": 2, "updated_time": "2020-09-25T00:00:00Z"}], - "paging": {"next": f"{base_url}?after=after_page_1&limit=100"}, - }, - "status_code": 200, - }, - fb_call_amount_data_response, - ], - ) - - stream = Videos( - api=api, account_ids=[account_id], start_date=pendulum.now(), end_date=pendulum.now(), include_deleted=False, page_size=100 - ) - try: - list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_state={}, stream_slice={"account_id": account_id})) - except AirbyteTracedException: - assert [x.qs.get("limit")[0] for x in res.request_history] == ["100", "100", "50", "25", "12", "6"] \ No newline at end of file diff --git a/source-facebook-marketing/tests/test_deep_merge.py b/source-facebook-marketing/tests/test_deep_merge.py deleted file mode 100644 index ca0b43393f..0000000000 --- a/source-facebook-marketing/tests/test_deep_merge.py +++ /dev/null @@ -1,42 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -from copy import deepcopy - -from source_facebook_marketing.streams.common import deep_merge - - -def test_return_new_object(): - """Should return new object, arguments should not be modified""" - left = { - "key_1": { - "one": {"a", "b"}, - "two": "left_value", - }, - "key_2": [1, 2], - } - right = {"key_1": {"two": "right_value", "three": [1, 2, 3]}, "key_2": [3]} - expected_result = {"key_1": {"one": {"a", "b"}, "two": "right_value", "three": [1, 2, 3]}, "key_2": [1, 2, 3]} - - result = deep_merge(deepcopy(left), deepcopy(right)) - - assert left == left - assert right == right - assert result == expected_result - - -def test_sets(): - left = {1, 2, 3} - right = {4, 2, 1} - result = deep_merge(left, right) - - assert result == {1, 2, 3, 4} - - -def test_lists(): - left = [1, 2, 3] - right = [4, 2, 1] - result = deep_merge(left, right) - - assert result == [1, 2, 3, 4, 2, 1] diff --git a/source-facebook-marketing/tests/test_source.py b/source-facebook-marketing/tests/test_source.py deleted file mode 100644 index 9fa3f5924c..0000000000 --- a/source-facebook-marketing/tests/test_source.py +++ /dev/null @@ -1,153 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - - -from copy import deepcopy - -import pytest -from airbyte_cdk.models import AirbyteConnectionStatus, ConnectorSpecification, Status -from facebook_business import FacebookAdsApi, FacebookSession -from source_facebook_marketing import SourceFacebookMarketing -from source_facebook_marketing.spec import ConnectorConfig -from unittest.mock import call - -from .utils import command_check - - -@pytest.fixture(name="config") -def config_fixture(): - config = { - "account_ids": ["123"], - "credentials": { - "access_token": "TOKEN", - "client_id": "an-id", - "client_secret": "a-secret" - }, - "start_date": "2019-10-10T00:00:00Z", - "end_date": "2020-10-10T00:00:00Z", - } - - return config - - -@pytest.fixture -def config_gen(config): - def inner(**kwargs): - new_config = deepcopy(config) - # WARNING, no support deep dictionaries - new_config.update(kwargs) - return {k: v for k, v in new_config.items() if v is not ...} - - return inner - - -@pytest.fixture(name="api") -def api_fixture(mocker): - api_mock = mocker.patch("source_facebook_marketing.source.API") - api_mock.return_value = mocker.Mock(account=mocker.Mock(return_value=123)) - return api_mock - - -@pytest.fixture(name="logger_mock") -def logger_mock_fixture(mocker): - return mocker.patch("source_facebook_marketing.source.logger") - - -class TestSourceFacebookMarketing: - def test_check_connection_ok(self, api, config, logger_mock): - ok, error_msg = SourceFacebookMarketing().check_connection(logger_mock, config=config) - - assert ok - assert not error_msg - api.assert_called_once_with(account_id="123", access_token="TOKEN") - logger_mock.info.assert_called_once_with(f"Select account {api.return_value.account}") - - def test_check_connection_future_date_range(self, api, config, logger_mock): - config["start_date"] = "2219-10-10T00:00:00" - config["end_date"] = "2219-10-11T00:00:00" - assert SourceFacebookMarketing().check_connection(logger_mock, config=config) == ( - False, - "Date range can not be in the future.", - ) - - def test_check_connection_end_date_before_start_date(self, api, config, logger_mock): - config["start_date"] = "2019-10-10T00:00:00" - config["end_date"] = "2019-10-09T00:00:00" - assert SourceFacebookMarketing().check_connection(logger_mock, config=config) == ( - False, - "end_date must be equal or after start_date.", - ) - - def test_check_connection_empty_config(self, api, logger_mock): - config = {} - ok, error_msg = SourceFacebookMarketing().check_connection(logger_mock, config=config) - - assert not ok - assert error_msg - - def test_check_connection_invalid_config(self, api, config, logger_mock): - config.pop("start_date") - ok, error_msg = SourceFacebookMarketing().check_connection(logger_mock, config=config) - - assert not ok - assert error_msg - - def test_check_connection_exception(self, api, config, logger_mock): - api.side_effect = RuntimeError("Something went wrong!") - - with pytest.raises(RuntimeError, match="Something went wrong!"): - SourceFacebookMarketing().check_connection(logger_mock, config=config) - - def test_streams(self, config, api): - streams = SourceFacebookMarketing().streams(config) - - assert len(streams) == 16 - - def test_spec(self): - spec = SourceFacebookMarketing().spec() - - assert isinstance(spec, ConnectorSpecification) - - def test_get_custom_insights_streams(self, api, config): - config["custom_insights"] = [ - {"name": "test", "fields": ["account_id"], "breakdowns": ["ad_format_asset"], "action_breakdowns": ["action_device"]}, - ] - config = ConnectorConfig.parse_obj(config) - assert SourceFacebookMarketing().get_custom_insights_streams(api, config) - - def test_get_custom_insights_action_breakdowns_allow_empty(self, api, config): - config["custom_insights"] = [ - {"name": "test", "fields": ["account_id"], "breakdowns": ["ad_format_asset"], "action_breakdowns": []}, - ] - - config["action_breakdowns_allow_empty"] = False - streams = SourceFacebookMarketing().get_custom_insights_streams(api, ConnectorConfig.parse_obj(config)) - assert len(streams) == 1 - assert streams[0].breakdowns == ["ad_format_asset"] - assert streams[0].action_breakdowns == ["action_type", "action_target_id", "action_destination"] - - config["action_breakdowns_allow_empty"] = True - streams = SourceFacebookMarketing().get_custom_insights_streams(api, ConnectorConfig.parse_obj(config)) - assert len(streams) == 1 - assert streams[0].breakdowns == ["ad_format_asset"] - assert streams[0].action_breakdowns == [] - - -def test_check_config(config_gen, requests_mock): - requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FacebookAdsApi.API_VERSION}/act_123/", {}) - - source = SourceFacebookMarketing() - assert command_check(source, config_gen()) == AirbyteConnectionStatus(status=Status.SUCCEEDED, message=None) - - status = command_check(source, config_gen(start_date="2019-99-10T00:00:00Z")) - assert status.status == Status.FAILED - - status = command_check(source, config_gen(end_date="2019-99-10T00:00:00Z")) - assert status.status == Status.FAILED - - with pytest.raises(Exception): - assert command_check(source, config_gen(start_date=...)) - - assert command_check(source, config_gen(end_date=...)) == AirbyteConnectionStatus(status=Status.SUCCEEDED, message=None) - assert command_check(source, config_gen(end_date="")) == AirbyteConnectionStatus(status=Status.SUCCEEDED, message=None) diff --git a/source-facebook-marketing/tests/test_streams.py b/source-facebook-marketing/tests/test_streams.py deleted file mode 100644 index 569592805f..0000000000 --- a/source-facebook-marketing/tests/test_streams.py +++ /dev/null @@ -1,110 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import pendulum -import pytest -from pendulum import duration -from source_facebook_marketing.api import MyFacebookAdsApi -from source_facebook_marketing.streams import ( - AdsInsights, - AdsInsightsActionType, - AdsInsightsAgeAndGender, - AdsInsightsCountry, - AdsInsightsDma, - AdsInsightsPlatformAndDevice, - AdsInsightsRegion, -) -from source_facebook_marketing.streams.base_streams import FBMarketingStream -from source_facebook_marketing.streams.streams import fetch_thumbnail_data_url - - -def test_filter_all_statuses(api, mocker, some_config): - mocker.patch.multiple(FBMarketingStream, __abstractmethods__=set()) - expected = { - "filtering": [ - { - "field": "None.delivery_info", - "operator": "IN", - "value": [ - "active", - "archived", - "completed", - "limited", - "not_delivering", - "deleted", - "not_published", - "pending_review", - "permanently_deleted", - "recently_completed", - "recently_rejected", - "rejected", - "scheduled", - "inactive", - ], - } - ] - } - assert FBMarketingStream(api=api, account_ids=some_config["account_ids"])._filter_all_statuses() == expected - - -@pytest.mark.parametrize( - "url", ["https://graph.facebook.com", "https://graph.facebook.com?test=123%23%24%25%2A&test2=456", "https://graph.facebook.com?"] -) -def test_fetch_thumbnail_data_url(url, requests_mock): - requests_mock.get(url, status_code=200, headers={"content-type": "content-type"}, content=b"") - assert fetch_thumbnail_data_url(url) == "data:content-type;base64," - - -def test_parse_call_rate_header(): - headers = { - "x-business-use-case-usage": '{"test":[{"type":"ads_management","call_count":1,"total_cputime":1,' - '"total_time":1,"estimated_time_to_regain_access":1}]}' - } - assert MyFacebookAdsApi._parse_call_rate_header(headers) == (1, duration(minutes=1)) - - -@pytest.mark.parametrize( - "class_name, breakdowns, action_breakdowns", - [ - [AdsInsights, [], ["action_type", "action_target_id", "action_destination"]], - [AdsInsightsActionType, [], ["action_type"]], - [AdsInsightsAgeAndGender, ["age", "gender"], ["action_type", "action_target_id", "action_destination"]], - [AdsInsightsCountry, ["country"], ["action_type", "action_target_id", "action_destination"]], - [AdsInsightsDma, ["dma"], ["action_type", "action_target_id", "action_destination"]], - [AdsInsightsPlatformAndDevice, ["publisher_platform", "platform_position", "impression_device"], ["action_type"]], - [AdsInsightsRegion, ["region"], ["action_type", "action_target_id", "action_destination"]], - ], -) -def test_ads_insights_breakdowns(class_name, breakdowns, action_breakdowns, some_config): - kwargs = { - "api": None, - "account_ids": some_config["account_ids"], - "start_date": pendulum.now(), - "end_date": pendulum.now(), - "insights_lookback_window": 1, - } - stream = class_name(**kwargs) - assert stream.breakdowns == breakdowns - assert stream.action_breakdowns == action_breakdowns - - -def test_custom_ads_insights_breakdowns(some_config): - kwargs = { - "api": None, - "account_ids": some_config["account_ids"], - "start_date": pendulum.now(), - "end_date": pendulum.now(), - "insights_lookback_window": 1, - } - stream = AdsInsights(breakdowns=["mmm"], action_breakdowns=["action_destination"], **kwargs) - assert stream.breakdowns == ["mmm"] - assert stream.action_breakdowns == ["action_destination"] - - stream = AdsInsights(breakdowns=[], action_breakdowns=[], **kwargs) - assert stream.breakdowns == [] - assert stream.action_breakdowns == ["action_type", "action_target_id", "action_destination"] - - stream = AdsInsights(breakdowns=[], action_breakdowns=[], action_breakdowns_allow_empty=True, **kwargs) - assert stream.breakdowns == [] - assert stream.action_breakdowns == [] diff --git a/source-facebook-marketing/tests/test_utils.py b/source-facebook-marketing/tests/test_utils.py deleted file mode 100644 index 69e82932d7..0000000000 --- a/source-facebook-marketing/tests/test_utils.py +++ /dev/null @@ -1,51 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import freezegun -import pendulum -import pytest -from source_facebook_marketing.utils import DATA_RETENTION_PERIOD, validate_end_date, validate_start_date - -TODAY = pendulum.local(2023, 3, 31) - - -@pytest.mark.parametrize( - "field_name, date, expected_date, expected_messages", - [ - ( - "start_date", - TODAY.subtract(months=DATA_RETENTION_PERIOD - 1), - TODAY.subtract(months=DATA_RETENTION_PERIOD - 1), - [], - ), - ( - "start_date", - pendulum.local(2019, 1, 1), - pendulum.local(2020, 3, 1), - [ - f"The start date cannot be beyond 37 months from the current date. " - f"Set start date to {pendulum.local(2020, 3, 1)}." - ] - ), - ( - "start_date", - TODAY + pendulum.duration(months=1), - TODAY, - [f"The start date cannot be in the future. Set start date to today's date - {TODAY}."], - ), - ( - "end_date", - TODAY.subtract(months=DATA_RETENTION_PERIOD), - TODAY, - [f"The end date must be after start date. Set end date to {TODAY}."], - ), - ], -) -@freezegun.freeze_time("2023-03-31") -def test_date_validators(caplog, field_name, date, expected_date, expected_messages): - if field_name == "start_date": - assert validate_start_date(date) == expected_date - elif field_name == "end_date": - assert validate_end_date(expected_date, date) == expected_date - assert caplog.messages == expected_messages diff --git a/source-facebook-marketing/tests/utils.py b/source-facebook-marketing/tests/utils.py deleted file mode 100644 index bb97c9a1ae..0000000000 --- a/source-facebook-marketing/tests/utils.py +++ /dev/null @@ -1,19 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - - -from unittest import mock - -from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification -from airbyte_cdk.sources import Source -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config - - -def command_check(source: Source, config): - logger = mock.MagicMock() - connector_config, _ = split_config(config) - if source.check_config_against_spec: - source_spec: ConnectorSpecification = source.spec(logger) - check_config_against_spec_or_exit(connector_config, source_spec) - return source.check(logger, config)