From 29956f00a035321712bbc7b108dc8b519da87fa6 Mon Sep 17 00:00:00 2001 From: Pedro Date: Tue, 19 Mar 2024 16:07:17 -0300 Subject: [PATCH] Enable primary_key for custom queries in source-google-ads We have a customer who is trying to add custom queries to the source-google-ads connector. The errors were caused by the lack of primary keys for the CustomQueryMixin stream; this PR enables them. --- .../source_google_ads/custom_query_stream.py | 57 +++++++--- source-google-ads/tests/test_custom_query.py | 106 +++++++++++++----- 2 files changed, 120 insertions(+), 43 deletions(-) diff --git a/source-google-ads/source_google_ads/custom_query_stream.py b/source-google-ads/source_google_ads/custom_query_stream.py index 2cf6066c48..7603429e30 100644 --- a/source-google-ads/source_google_ads/custom_query_stream.py +++ b/source-google-ads/source_google_ads/custom_query_stream.py @@ -17,12 +17,14 @@ def __init__(self, config, **kwargs): @property def primary_key(self) -> str: """ - The primary_key option is disabled. Config should not provide the primary key. + The primary_key option is disabled. Config should not + provide the primary key. It will be ignored if provided. 
- If you need to enable it, uncomment the next line instead of `return None` and modify your config + If you need to enable it, uncomment the next line instead of + `return None` and modify your config """ - # return self.config.get("primary_key") or None - return None + return self.config.get("primary_key") or None + # return None @property def name(self): @@ -43,7 +45,18 @@ def get_json_schema(self) -> Dict[str, Any]: "properties": {}, "additionalProperties": True, } - # full list {'ENUM', 'STRING', 'DATE', 'DOUBLE', 'RESOURCE_NAME', 'INT32', 'INT64', 'BOOLEAN', 'MESSAGE'} + # full list + # { + # "ENUM", + # "STRING", + # "DATE", + # "DOUBLE", + # "RESOURCE_NAME", + # "INT32", + # "INT64", + # "BOOLEAN", + # "MESSAGE", + # } google_datatype_mapping = { "INT64": "integer", @@ -60,12 +73,19 @@ def get_json_schema(self) -> Dict[str, Any]: for field in fields: node = google_schema.get(field) - # Data type return in enum format: "GoogleAdsFieldDataType." + # Data type return in enum format: + # "GoogleAdsFieldDataType." 
google_data_type = node.data_type.name if google_data_type == "ENUM": - field_value = {"type": "string", "enum": list(node.enum_values)} + field_value = { + "type": "string", + "enum": list(node.enum_values), + } if node.is_repeated: - field_value = {"type": ["null", "array"], "items": field_value} + field_value = { + "type": ["null", "array"], + "items": field_value, + } elif google_data_type == "MESSAGE": # Represents protobuf message and could be anything, set custom # attribute "protobuf_message" to convert it to a string (or @@ -77,7 +97,10 @@ def get_json_schema(self) -> Dict[str, Any]: output_type = ["string", "null"] field_value = {"type": output_type, "protobuf_message": True} else: - output_type = [google_datatype_mapping.get(google_data_type, "string"), "null"] + output_type = [ + google_datatype_mapping.get(google_data_type, "string"), + "null", + ] field_value = {"type": output_type} if google_data_type == "DATE": field_value["format"] = "date" @@ -89,14 +112,22 @@ def get_json_schema(self) -> Dict[str, Any]: class IncrementalCustomQuery(CustomQueryMixin, IncrementalGoogleAdsStream): def get_query(self, stream_slice: Mapping[str, Any] = None) -> str: - start_date, end_date = stream_slice["start_date"], stream_slice["end_date"] - query = self.insert_segments_date_expr(self.config["query"], start_date, end_date) + start_date = stream_slice["start_date"] + end_date = stream_slice["end_date"] + query = self.insert_segments_date_expr( + self.config["query"], start_date, end_date + ) return str(query) @staticmethod - def insert_segments_date_expr(query: GAQL, start_date: str, end_date: str) -> GAQL: + def insert_segments_date_expr( + query: GAQL, + start_date: str, + end_date: str, + ) -> GAQL: """ - Insert segments.date condition to break query into slices for incremental stream. + Insert segments.date condition to break query into + slices for incremental stream. 
:param query Origin user defined query :param start_date start date for metric (inclusive) :param end_date end date for metric (inclusive) diff --git a/source-google-ads/tests/test_custom_query.py b/source-google-ads/tests/test_custom_query.py index dc5c1f086a..98f3dba5c0 100644 --- a/source-google-ads/tests/test_custom_query.py +++ b/source-google-ads/tests/test_custom_query.py @@ -4,21 +4,42 @@ from unittest.mock import MagicMock -from source_google_ads.custom_query_stream import CustomQueryMixin, IncrementalCustomQuery +from source_google_ads.custom_query_stream import ( + CustomQueryMixin, + IncrementalCustomQuery, +) from source_google_ads.utils import GAQL +TERMS = [ + "ad_group.resource_name", + "ad_group.status", + "ad_group.target_cpa_micros", + "ad_group.target_cpm_micros", + "ad_group.target_roas", + "ad_group.targeting_setting.target_restrictions", + "ad_group.tracking_url_template", + "ad_group.type", + "ad_group.url_custom_parameters", + "campaign.accessible_bidding_strategy", + "campaign.ad_serving_optimization_status", + "campaign.advertising_channel_type", + "campaign.advertising_channel_sub_type", + "campaign.app_campaign_setting.app_id", + "campaign.app_campaign_setting.app_store", +] + def test_custom_query(): - input_q = """SELECT ad_group.resource_name, ad_group.status, ad_group.target_cpa_micros, ad_group.target_cpm_micros, - ad_group.target_roas, ad_group.targeting_setting.target_restrictions, ad_group.tracking_url_template, ad_group.type, - ad_group.url_custom_parameters, campaign.accessible_bidding_strategy, campaign.ad_serving_optimization_status, - campaign.advertising_channel_type, campaign.advertising_channel_sub_type, campaign.app_campaign_setting.app_id, - campaign.app_campaign_setting.app_store FROM search_term_view""" - output_q = IncrementalCustomQuery.insert_segments_date_expr(GAQL.parse(input_q), "1980-01-01", "1980-01-01") - assert ( - str(output_q) - == """SELECT ad_group.resource_name, ad_group.status, 
ad_group.target_cpa_micros, ad_group.target_cpm_micros, ad_group.target_roas, ad_group.targeting_setting.target_restrictions, ad_group.tracking_url_template, ad_group.type, ad_group.url_custom_parameters, campaign.accessible_bidding_strategy, campaign.ad_serving_optimization_status, campaign.advertising_channel_type, campaign.advertising_channel_sub_type, campaign.app_campaign_setting.app_id, campaign.app_campaign_setting.app_store, segments.date FROM search_term_view WHERE segments.date BETWEEN '1980-01-01' AND '1980-01-01'""" + input_q = f"SELECT {', '.join(TERMS)} FROM search_term_view" + output_q = IncrementalCustomQuery.insert_segments_date_expr( + GAQL.parse(input_q), "1980-01-01", "1980-01-01" + ) + expected = ( + f"SELECT {', '.join(TERMS + ['segments.date'])} " + "FROM search_term_view " + "WHERE segments.date BETWEEN '1980-01-01' AND '1980-01-01'" ) + assert str(output_q) == expected class Obj: @@ -27,29 +48,54 @@ def __init__(self, **entries): def test_get_json_schema(): - query_object = MagicMock(return_value={ - 'a': Obj(data_type=Obj(name='ENUM'), is_repeated=False, enum_values=['a', 'aa']), - 'b': Obj(data_type=Obj(name='ENUM'), is_repeated=True, enum_values=['b', 'bb']), - 'c': Obj(data_type=Obj(name='MESSAGE'), is_repeated=False), - 'd': Obj(data_type=Obj(name='MESSAGE'), is_repeated=True), - 'e': Obj(data_type=Obj(name='STRING')), - 'f': Obj(data_type=Obj(name='DATE')), - }) - instance = CustomQueryMixin(config={'query': Obj(fields=['a', 'b', 'c', 'd', 'e', 'f'])}) + query_object = MagicMock( + return_value={ + "a": Obj( + data_type=Obj(name="ENUM"), + is_repeated=False, + enum_values=["a", "aa"], + ), + "b": Obj( + data_type=Obj(name="ENUM"), + is_repeated=True, + enum_values=["b", "bb"], + ), + "c": Obj( + data_type=Obj(name="MESSAGE"), + is_repeated=False, + ), + "d": Obj( + data_type=Obj(name="MESSAGE"), + is_repeated=True, + ), + "e": Obj(data_type=Obj(name="STRING")), + "f": Obj(data_type=Obj(name="DATE")), + } + ) + instance = 
CustomQueryMixin( + config={ + "query": Obj( + fields=["a", "b", "c", "d", "e", "f"], + ) + } + ) instance.cursor_field = None instance.google_ads_client = Obj(get_fields_metadata=query_object) schema = instance.get_json_schema() assert schema == { - '$schema': 'http://json-schema.org/draft-07/schema#', - 'additionalProperties': True, - 'type': 'object', - 'properties': { - 'a': {'type': 'string', 'enum': ['a', 'aa']}, - 'b': {'type': ['null', 'array'], 'items': {'type': 'string', 'enum': ['b', 'bb']}}, - 'c': {'type': ['string', 'null'], 'protobuf_message': True}, - 'd': {'type': ['array', 'null'], 'protobuf_message': True}, - 'e': {'type': ['string', 'null']}, - 'f': {'type': ['string', 'null'], 'format': 'date'}, - } + "$schema": "http://json-schema.org/draft-07/schema#", + "additionalProperties": True, + "type": "object", + "properties": { + "a": {"type": "string", "enum": ["a", "aa"]}, + "b": { + "type": ["null", "array"], + "items": {"type": "string", "enum": ["b", "bb"]}, + }, + "c": {"type": ["string", "null"], "protobuf_message": True}, + "d": {"type": ["array", "null"], "protobuf_message": True}, + "e": {"type": ["string", "null"]}, + "f": {"type": ["string", "null"], "format": "date"}, + }, }