Skip to content

Commit

Permalink
source-facebook-marketing: support multiple account ids
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex-Bair committed Nov 22, 2024
1 parent ffc9623 commit 9930d28
Show file tree
Hide file tree
Showing 28 changed files with 1,388 additions and 2,899 deletions.
4 changes: 4 additions & 0 deletions source-facebook-marketing/acmeCo/activities.schema.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
---
properties:
account_id:
type:
- "null"
- string
actor_id:
type:
- string
Expand Down
4 changes: 4 additions & 0 deletions source-facebook-marketing/acmeCo/videos.schema.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
---
type: object
properties:
account_id:
type:
- "null"
- string
id:
type: string
ad_breaks:
Expand Down
10 changes: 5 additions & 5 deletions source-facebook-marketing/connector_config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
account_id_sops: ENC[AES256_GCM,data:DuMXhQYiyxxthsOHllmw,iv:wNw1RFVBe/AWvY6uycliIY3nPpzBiz+P4UBDr6f4nLI=,tag:7s2DQ+ApVrcfjMBNqOK+/w==,type:int]
account_ids: "196765372540622"
credentials:
access_token_sops: ENC[AES256_GCM,data:Re73N0PHhhJs7Umo5ONjzSXdb4xKe+XtOBI3r1padMyQo5Bki3bW7l+1+/v4ifQuq4RypBwxQOAXdJfarwqLMW6vKCpxzq+kcbEHkvFd+NTJIWduYl8lC5bbFHT6uTrBNMxaTv/N7LNTwohBss+iKc9kf+Dp7dwpUqikOohnvOZ9y6C4+p2lvanOUpVfw7mGL52uW8VALPiLWh1hgbn6ZXlk7MOAjZAboIzf2bCzkzZQIiTz/4NYVLcmbFbPx8DRjrUd6XU1yeLsfXbT3CNRbAJyNWwDJN9vwKErGQIKcfsXsP5hVb0rpFuSW+WBvpJ0avCR6N8vvM8I8mc5W47EMuMTAxI=,iv:hWmqMuuIUBm8Vm30DVzv5jkesBAIuxZnCzn2h/ysUZ8=,tag:uKzEZpI3M4IWiXGXIqncZQ==,type:str]
auth_type: OAuth Credentials
Expand Down Expand Up @@ -28,13 +28,13 @@ custom_insights:
level: ad
name: ads_insights_publisher_platform
start_date: "2021-11-01T00:00:00Z"
time_increment: 1
time_increment: 360
fetch_thumbnail_images: false
include_deleted: true
insights_lookback_window: 28
max_batch_size: 50
page_size: 100
start_date: "2021-11-01T00:00:00Z"
start_date: "2024-11-01T00:00:00Z"
sops:
kms: []
gcp_kms:
Expand All @@ -44,8 +44,8 @@ sops:
azure_kv: []
hc_vault: []
age: []
lastmodified: "2024-02-05T23:41:33Z"
mac: ENC[AES256_GCM,data:ByAdg0VEy2xMpZXxAwa5oKB7CfG+eSA5ej/TJ2g9ZsIR+XhwJklI1XBW62VvahoGeC2AP9wYC/Mk7bP03914JZCz1v2gsbn6nL4WVuooIt0XMFgaquwWjoVrDa7Hy3iTHvr2Uwf4YcBWI/M3NwKpD/3f9TpNQRstrJq8pw6AC0k=,iv:Spd541rsKs5Si4AAdtxxB/muD9XiokkyEGBC8/wqAsg=,tag:eXTEjYrz/Vq4526D3E6whw==,type:str]
lastmodified: "2024-11-11T19:07:30Z"
mac: ENC[AES256_GCM,data:DnjkguTUM5RBYREQlUp9UJTbdQ1N8DNfTLWeF+ZYSPRzMGrfwYsAhrE/08GYy3/4mAJSOTvMVRnqc0FArZ5MxIl5ttgyEZpRWMA/Z2x7BQAlByvBO2FZcsv+PdmSbtThYO/9jLeAeCCyl6LUvguiUkHxCB4HMUdHTXbLCnQYe1w=,iv:nabT9Nmhz/hTHzM+8NLTFDNIQmciFAtpMxf9/P49b4A=,tag:qeFSf7/FvFI6md+fb7TVRg==,type:str]
pgp: []
encrypted_suffix: _sops
version: 3.8.1
1,830 changes: 1,006 additions & 824 deletions source-facebook-marketing/poetry.lock

Large diffs are not rendered by default.

14 changes: 8 additions & 6 deletions source-facebook-marketing/source_facebook_marketing/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import backoff
import pendulum
from cached_property import cached_property

from facebook_business import FacebookAdsApi
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.api import FacebookResponse
Expand Down Expand Up @@ -158,16 +158,18 @@ def call(
class API:
"""Simple wrapper around Facebook API"""

def __init__(self, account_id: str, access_token: str):
self._account_id = account_id
def __init__(self, access_token: str):
self._account_ids = {}
# design flaw in MyFacebookAdsApi requires such strange set of new default api instance
self.api = MyFacebookAdsApi.init(access_token=access_token, crash_log=False, api_version="v19.0")
FacebookAdsApi.set_default_api(self.api)

@cached_property
def account(self) -> AdAccount:
def get_account(self, account_id: str) -> AdAccount:
"""Find current account"""
return self._find_account(self._account_id)
if account_id in self._account_ids:
return self._account_ids[account_id]
self._account_ids[account_id] = self._find_account(account_id)
return self._account_ids[account_id]

@staticmethod
def _find_account(account_id: str) -> AdAccount:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
{
"properties": {
"account_id": {
"type": ["null", "string"]
},
"actor_id": {
"type": ["string"]
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
{
"type": "object",
"properties": {
"account_id": {
"type": ["null", "string"]
},
"id": {
"type": "string"
},
Expand Down
33 changes: 23 additions & 10 deletions source-facebook-marketing/source_facebook_marketing/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,17 @@ def _validate_and_transform(self, config: Mapping[str, Any]):
config.setdefault("action_breakdowns_allow_empty", False)
if config.get("end_date") == "":
config.pop("end_date")

# Backwards compatible support for configs with the older
# account_id field instead of the newer account_ids.
if not config.get("account_ids", ""):
config["account_ids"] = config.get("account_id")

config = ConnectorConfig.parse_obj(config)

# Convert account_ids to a list so it's easier to work with throughout the connector.
config.account_ids = [n for n in config.account_ids.split(",")]

config.start_date = pendulum.instance(config.start_date)
config.end_date = pendulum.instance(config.end_date)
return config
Expand All @@ -66,17 +76,10 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
if config.end_date < config.start_date:
return False, "end_date must be equal or after start_date."

api = API(account_id=config.account_id, access_token=config.credentials.access_token)
logger.info(f"Select account {api.account}")
api = API(access_token=config.credentials.access_token)
except (requests.exceptions.RequestException, ValidationError) as e:
return False, e

# make sure that we have valid combination of "action_breakdowns" and "breakdowns" parameters
for stream in self.get_custom_insights_streams(api, config):
try:
stream.check_breakdowns()
except facebook_business.exceptions.FacebookRequestError as e:
return False, e._api_error_message
return True, None

def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
Expand All @@ -89,19 +92,21 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
config.start_date = validate_start_date(config.start_date)
config.end_date = validate_end_date(config.start_date, config.end_date)

api = API(account_id=config.account_id, access_token=config.credentials.access_token)
api = API(access_token=config.credentials.access_token)

insights_args = dict(
api=api, start_date=config.start_date, end_date=config.end_date, insights_lookback_window=config.insights_lookback_window
api=api, start_date=config.start_date, end_date=config.end_date, insights_lookback_window=config.insights_lookback_window, account_ids=config.account_ids,
)
streams = [
AdAccount(
api=api,
account_ids=config.account_ids,
source_defined_primary_key=["account_id"],
),
AdSets(
api=api,
start_date=config.start_date,
account_ids=config.account_ids,
end_date=config.end_date,
include_deleted=config.include_deleted,
page_size=config.page_size,
Expand All @@ -111,6 +116,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
Ads(
api=api,
start_date=config.start_date,
account_ids=config.account_ids,
end_date=config.end_date,
include_deleted=config.include_deleted,
page_size=config.page_size,
Expand All @@ -120,6 +126,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
AdCreatives(
api=api,
fetch_thumbnail_images=config.fetch_thumbnail_images,
account_ids=config.account_ids,
page_size=config.page_size,
max_batch_size=config.max_batch_size,
source_defined_primary_key=["id"],
Expand All @@ -133,6 +140,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
AdsInsightsActionType(page_size=config.page_size, max_batch_size=config.max_batch_size, source_defined_primary_key=["id"], **insights_args),
Campaigns(
api=api,
account_ids=config.account_ids,
start_date=config.start_date,
end_date=config.end_date,
include_deleted=config.include_deleted,
Expand All @@ -142,13 +150,15 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
),
CustomConversions(
api=api,
account_ids=config.account_ids,
include_deleted=config.include_deleted,
page_size=config.page_size,
max_batch_size=config.max_batch_size,
source_defined_primary_key=["id"],
),
Images(
api=api,
account_ids=config.account_ids,
start_date=config.start_date,
end_date=config.end_date,
include_deleted=config.include_deleted,
Expand All @@ -158,6 +168,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
),
Videos(
api=api,
account_ids=config.account_ids,
start_date=config.start_date,
end_date=config.end_date,
include_deleted=config.include_deleted,
Expand All @@ -167,6 +178,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
),
Activities(
api=api,
account_ids=config.account_ids,
start_date=config.start_date,
end_date=config.end_date,
include_deleted=config.include_deleted,
Expand Down Expand Up @@ -206,6 +218,7 @@ def get_custom_insights_streams(self, api: API, config: ConnectorConfig) -> List
raise ValueError(mes)
stream = AdsInsights(
api=api,
account_ids=config.account_ids,
name=f"Custom{insight.name}",
fields=list(insight_fields),
breakdowns=list(set(insight.breakdowns)),
Expand Down
12 changes: 7 additions & 5 deletions source-facebook-marketing/source_facebook_marketing/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
ValidActionBreakdowns = Enum("ValidActionBreakdowns", AdsInsights.ActionBreakdowns.__dict__)
DATE_TIME_PATTERN = "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$"
EMPTY_PATTERN = "^$"
ACCOUNT_IDS_PATTERN = r"^\d[\d,]+$"


class InsightConfig(BaseModel):
Expand Down Expand Up @@ -131,15 +132,16 @@ class Config:
def schema_extra(schema: Dict[str, Any], model: Type["ConnectorConfig"]) -> None:
schema["properties"]["end_date"].pop("format")

account_id: str = Field(
title="Account ID",
account_ids: str = Field(
title="Account IDs",
order=0,
description=(
"The Facebook Ad account ID to use when pulling data from the Facebook Marketing API."
" Open your Meta Ads Manager. The Ad account ID number is in the account dropdown menu or in your browser's address bar. "
"A comma separated list of the Facebook Ad account ID(s) to use when pulling data from the Facebook Marketing API."
"Open your Meta Ads Manager. The Ad account ID number is in the account dropdown menu or in your browser's address bar. "
'See the <a href="https://www.facebook.com/business/help/1492627900875762">docs</a> for more information.'
),
examples=["111111111111111"],
pattern=ACCOUNT_IDS_PATTERN,
examples=["111111111111111","222222222222222,333333333333333"],
)

start_date: datetime = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ class InsightAsyncJobManager:
# limit is not reliable indicator of async workload capability we still have to use this parameter.
MAX_JOBS_IN_QUEUE = 100

def __init__(self, api: "API", jobs: Iterator[AsyncJob]):
def __init__(self, api: "API", jobs: Iterator[AsyncJob], account_id: str):
"""Init
:param api:
:param jobs:
"""
self._api = api
self._account_id = account_id
self._jobs = iter(jobs)
self._running_jobs = []

Expand Down Expand Up @@ -147,4 +148,4 @@ def _update_api_throttle_limit(self):
respond with empty list of data so api use "x-fb-ads-insights-throttle"
header to update current insights throttle limit.
"""
self._api.account.get_insights()
self._api.get_account(account_id=self._account_id).get_insights()
Loading

0 comments on commit 9930d28

Please sign in to comment.