Skip to content

Commit

Permalink
Enable primary_key for custom queries in source-google-ads
Browse files Browse the repository at this point in the history
We have a customer who is trying to add custom queries to the source-google-ads connector. The errors was caused by the lack of primary keys for the CustomQueryMixin stream, this PR enables it.
  • Loading branch information
Z33DD committed Mar 19, 2024
1 parent f395ab4 commit 29956f0
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 43 deletions.
57 changes: 44 additions & 13 deletions source-google-ads/source_google_ads/custom_query_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ def __init__(self, config, **kwargs):
@property
def primary_key(self) -> str:
"""
The primary_key option is disabled. Config should not provide the primary key.
The primary_key option is disabled. Config should not
provide the primary key.
It will be ignored if provided.
If you need to enable it, uncomment the next line instead of `return None` and modify your config
If you need to enable it, uncomment the next line instead of
`return None` and modify your config
"""
# return self.config.get("primary_key") or None
return None
return self.config.get("primary_key") or None
# return None

@property
def name(self):
Expand All @@ -43,7 +45,18 @@ def get_json_schema(self) -> Dict[str, Any]:
"properties": {},
"additionalProperties": True,
}
# full list {'ENUM', 'STRING', 'DATE', 'DOUBLE', 'RESOURCE_NAME', 'INT32', 'INT64', 'BOOLEAN', 'MESSAGE'}
# full list
# {
# "ENUM",
# "STRING",
# "DATE",
# "DOUBLE",
# "RESOURCE_NAME",
# "INT32",
# "INT64",
# "BOOLEAN",
# "MESSAGE",
# }

google_datatype_mapping = {
"INT64": "integer",
Expand All @@ -60,12 +73,19 @@ def get_json_schema(self) -> Dict[str, Any]:

for field in fields:
node = google_schema.get(field)
# Data type return in enum format: "GoogleAdsFieldDataType.<data_type>"
# Data type return in enum format:
# "GoogleAdsFieldDataType.<data_type>"
google_data_type = node.data_type.name
if google_data_type == "ENUM":
field_value = {"type": "string", "enum": list(node.enum_values)}
field_value = {
"type": "string",
"enum": list(node.enum_values),
}
if node.is_repeated:
field_value = {"type": ["null", "array"], "items": field_value}
field_value = {
"type": ["null", "array"],
"items": field_value,
}
elif google_data_type == "MESSAGE":
# Represents protobuf message and could be anything, set custom
# attribute "protobuf_message" to convert it to a string (or
Expand All @@ -77,7 +97,10 @@ def get_json_schema(self) -> Dict[str, Any]:
output_type = ["string", "null"]
field_value = {"type": output_type, "protobuf_message": True}
else:
output_type = [google_datatype_mapping.get(google_data_type, "string"), "null"]
output_type = [
google_datatype_mapping.get(google_data_type, "string"),
"null",
]
field_value = {"type": output_type}
if google_data_type == "DATE":
field_value["format"] = "date"
Expand All @@ -89,14 +112,22 @@ def get_json_schema(self) -> Dict[str, Any]:

class IncrementalCustomQuery(CustomQueryMixin, IncrementalGoogleAdsStream):
def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
start_date, end_date = stream_slice["start_date"], stream_slice["end_date"]
query = self.insert_segments_date_expr(self.config["query"], start_date, end_date)
start_date = stream_slice["start_date"]
end_date = stream_slice["end_date"]
query = self.insert_segments_date_expr(
self.config["query"], start_date, end_date
)
return str(query)

@staticmethod
def insert_segments_date_expr(query: GAQL, start_date: str, end_date: str) -> GAQL:
def insert_segments_date_expr(
query: GAQL,
start_date: str,
end_date: str,
) -> GAQL:
"""
Insert segments.date condition to break query into slices for incremental stream.
Insert segments.date condition to break query into
slices for incremental stream.
:param query Origin user defined query
:param start_date start date for metric (inclusive)
:param end_date end date for metric (inclusive)
Expand Down
106 changes: 76 additions & 30 deletions source-google-ads/tests/test_custom_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,42 @@

from unittest.mock import MagicMock

from source_google_ads.custom_query_stream import CustomQueryMixin, IncrementalCustomQuery
from source_google_ads.custom_query_stream import (
CustomQueryMixin,
IncrementalCustomQuery,
)
from source_google_ads.utils import GAQL

TERMS = [
"ad_group.resource_name",
"ad_group.status",
"ad_group.target_cpa_micros",
"ad_group.target_cpm_micros",
"ad_group.target_roas",
"ad_group.targeting_setting.target_restrictions",
"ad_group.tracking_url_template",
"ad_group.type",
"ad_group.url_custom_parameters",
"campaign.accessible_bidding_strategy",
"campaign.ad_serving_optimization_status",
"campaign.advertising_channel_type",
"campaign.advertising_channel_sub_type",
"campaign.app_campaign_setting.app_id",
"campaign.app_campaign_setting.app_store",
]


def test_custom_query():
input_q = """SELECT ad_group.resource_name, ad_group.status, ad_group.target_cpa_micros, ad_group.target_cpm_micros,
ad_group.target_roas, ad_group.targeting_setting.target_restrictions, ad_group.tracking_url_template, ad_group.type,
ad_group.url_custom_parameters, campaign.accessible_bidding_strategy, campaign.ad_serving_optimization_status,
campaign.advertising_channel_type, campaign.advertising_channel_sub_type, campaign.app_campaign_setting.app_id,
campaign.app_campaign_setting.app_store FROM search_term_view"""
output_q = IncrementalCustomQuery.insert_segments_date_expr(GAQL.parse(input_q), "1980-01-01", "1980-01-01")
assert (
str(output_q)
== """SELECT ad_group.resource_name, ad_group.status, ad_group.target_cpa_micros, ad_group.target_cpm_micros, ad_group.target_roas, ad_group.targeting_setting.target_restrictions, ad_group.tracking_url_template, ad_group.type, ad_group.url_custom_parameters, campaign.accessible_bidding_strategy, campaign.ad_serving_optimization_status, campaign.advertising_channel_type, campaign.advertising_channel_sub_type, campaign.app_campaign_setting.app_id, campaign.app_campaign_setting.app_store, segments.date FROM search_term_view WHERE segments.date BETWEEN '1980-01-01' AND '1980-01-01'"""
input_q = f"SELECT {', '.join(TERMS)} FROM search_term_view"
output_q = IncrementalCustomQuery.insert_segments_date_expr(
GAQL.parse(input_q), "1980-01-01", "1980-01-01"
)
expected = (
f"SELECT {', '.join(TERMS + ['segments.date'])} "
"FROM search_term_view "
"WHERE segments.date BETWEEN '1980-01-01' AND '1980-01-01'"
)
assert str(output_q) == expected


class Obj:
Expand All @@ -27,29 +48,54 @@ def __init__(self, **entries):


def test_get_json_schema():
query_object = MagicMock(return_value={
'a': Obj(data_type=Obj(name='ENUM'), is_repeated=False, enum_values=['a', 'aa']),
'b': Obj(data_type=Obj(name='ENUM'), is_repeated=True, enum_values=['b', 'bb']),
'c': Obj(data_type=Obj(name='MESSAGE'), is_repeated=False),
'd': Obj(data_type=Obj(name='MESSAGE'), is_repeated=True),
'e': Obj(data_type=Obj(name='STRING')),
'f': Obj(data_type=Obj(name='DATE')),
})
instance = CustomQueryMixin(config={'query': Obj(fields=['a', 'b', 'c', 'd', 'e', 'f'])})
query_object = MagicMock(
return_value={
"a": Obj(
data_type=Obj(name="ENUM"),
is_repeated=False,
enum_values=["a", "aa"],
),
"b": Obj(
data_type=Obj(name="ENUM"),
is_repeated=True,
enum_values=["b", "bb"],
),
"c": Obj(
data_type=Obj(name="MESSAGE"),
is_repeated=False,
),
"d": Obj(
data_type=Obj(name="MESSAGE"),
is_repeated=True,
),
"e": Obj(data_type=Obj(name="STRING")),
"f": Obj(data_type=Obj(name="DATE")),
}
)
instance = CustomQueryMixin(
config={
"query": Obj(
fields=["a", "b", "c", "d", "e", "f"],
)
}
)
instance.cursor_field = None
instance.google_ads_client = Obj(get_fields_metadata=query_object)
schema = instance.get_json_schema()

assert schema == {
'$schema': 'http://json-schema.org/draft-07/schema#',
'additionalProperties': True,
'type': 'object',
'properties': {
'a': {'type': 'string', 'enum': ['a', 'aa']},
'b': {'type': ['null', 'array'], 'items': {'type': 'string', 'enum': ['b', 'bb']}},
'c': {'type': ['string', 'null'], 'protobuf_message': True},
'd': {'type': ['array', 'null'], 'protobuf_message': True},
'e': {'type': ['string', 'null']},
'f': {'type': ['string', 'null'], 'format': 'date'},
}
"$schema": "http://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"a": {"type": "string", "enum": ["a", "aa"]},
"b": {
"type": ["null", "array"],
"items": {"type": "string", "enum": ["b", "bb"]},
},
"c": {"type": ["string", "null"], "protobuf_message": True},
"d": {"type": ["array", "null"], "protobuf_message": True},
"e": {"type": ["string", "null"]},
"f": {"type": ["string", "null"], "format": "date"},
},
}

0 comments on commit 29956f0

Please sign in to comment.