Skip to content

Commit

Permalink
source-google-analytics-data-api: add multiple property ids
Browse files Browse the repository at this point in the history
  • Loading branch information
Luishfs committed Oct 30, 2024
1 parent 642ad33 commit dae9ad6
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 62 deletions.
7 changes: 4 additions & 3 deletions source-google-analytics-data-api/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ credentials:
client_secret_sops: ENC[AES256_GCM,data:lTaxOBRDGT1FzNcwhEjPZzB1UJhOdjpFsWviEEMfXkkCdd4=,iv:SzIH9vkXctgh9EO/NAeUfPgZBWRv/0QQyBoVG+b6AQE=,tag:OYcbeln0r4QQhhZWndWuwg==,type:str]
refresh_token_sops: ENC[AES256_GCM,data:S4bW08hyZuMnB+VIExQHon5cAED+CKUw015ZLjw1NGKHwJqYNuhMwhEDwS0Vr4TCq/a5/eJa7vVbJldGZbLvo8b3aJtOSXLmylOt66Gq4TZYXoE1DVAQqIJMAlQg/fO59UsoSXkt2g==,iv:GmgFm8/zEs2/EX8nipgJ65DO2JjGqpqUqTuxbzEpnF4=,tag:1oXaulNxwVcc7VDTn+LwiA==,type:str]
date_ranges_start_date: "2024-04-01"
property_id: "431462875"
property_ids:
- "431462875"
window_in_days: 1
sops:
kms: []
Expand All @@ -16,8 +17,8 @@ sops:
azure_kv: []
hc_vault: []
age: []
lastmodified: "2024-04-08T20:17:08Z"
mac: ENC[AES256_GCM,data:4cGuunX3pZZu90iOAYn70+BijSebkzHogwGsb0tmCmkSSuDUTecS76lwUtWoEzG3mUpjtA5QdkTlcuG699tL7anjVQp/nm6UONaQM1YKHgGpYjZsiYuZaOrD92v8LKhEIbNxzlU+se0/ahRUKrQzYeSLX4Juprr50H4SQG6PZ4A=,iv:oO6vY8QqzU17j2Ou6DAiiZTSFu4fCedFMRpOyAFfLs4=,tag:wrD0ttB5AoNRg0hvVn2itw==,type:str]
lastmodified: "2024-10-28T17:05:38Z"
mac: ENC[AES256_GCM,data:eRNatGgoun98ysvSSMINp6vAPL+S6R8npa6ae9fup2BmC2VPcyOl2pP4FBDz6geIgXOT57fggcWPh/Bdb/Ft4ugH78CIa4WHBWl2/jfhtwTWl8sCetJa18EGHqaqzhkvZIX7Ak26pXMCdv817fm0k30ZLlbeyOUU3eexmD7DpTQ=,iv:QnIpctu6M4AvZ44DGcKfxxO6QWKrwiJffK37ze7aW98=,tag:Y3eYu+S9oBmOXF+QDmA+Fw==,type:str]
pgp: []
encrypted_suffix: _sops
version: 3.8.1
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import logging
from typing import Any, List, Mapping

from airbyte_cdk.config_observation import create_connector_config_control_message
from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.message import InMemoryMessageRepository, MessageRepository

logger = logging.getLogger("airbyte_logger")


class MigrateCustomReports:
    """
    Migrate the connector config at runtime from the single-property format
    to the multi-property format.

    Starting from `1.3.0`, the properties are configured under `property_ids` as:
        > List(["<property_id 1>", "<property_id 2>", ..., "<property_id n>"])
    instead of, in `1.2.0`, under `property_id` as:
        > JSON STR: "<property_id>"

    The legacy `property_id` key is removed from the config once its value has
    been merged into `property_ids`.
    """

    # Shared, class-level repository used to queue the CONTROL message before printing it.
    message_repository: MessageRepository = InMemoryMessageRepository()
    # Legacy key holding a single property id.
    migrate_from_key: str = "property_id"
    # New key holding the list of property ids.
    migrate_to_key: str = "property_ids"

    @classmethod
    def should_migrate(cls, config: Mapping[str, Any]) -> bool:
        """
        Determine whether the config requires migration.

        Returns:
            > True, if the legacy `property_id` key is present in the config
            > False, otherwise.
        """
        return cls.migrate_from_key in config

    @classmethod
    def transform_to_array(cls, config: Mapping[str, Any], source: Source = None) -> Mapping[str, Any]:
        """
        Move the legacy `property_id` value into the `property_ids` list.

        The config is modified in place and also returned. `source` is accepted
        for signature compatibility and is not used.

        Raises:
            KeyError: if the legacy key is not present in the config.
        """
        legacy_value = config.pop(cls.migrate_from_key)
        # a missing or null `property_ids` is treated as an empty list
        property_ids = config.get(cls.migrate_to_key) or []
        # a blank legacy value carries no property id, so it must not end up in the list;
        # an id that is already listed is not duplicated
        if legacy_value not in (None, "") and legacy_value not in property_ids:
            property_ids.append(legacy_value)
        config[cls.migrate_to_key] = property_ids
        return config

    @classmethod
    def modify_and_save(cls, config_path: str, source: Source, config: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Migrate the config, write it back to `config_path` through
        `source.write_config` and return the migrated config.
        """
        migrated_config = cls.transform_to_array(config, source)
        source.write_config(migrated_config, config_path)
        return migrated_config

    @classmethod
    def emit_control_message(cls, migrated_config: Mapping[str, Any]) -> None:
        """
        Queue a connector-config CONTROL message for the migrated config and
        print every queued message to stdout.
        """
        cls.message_repository.emit_message(create_connector_config_control_message(migrated_config))
        # Drain the queue through the public API: the repository is shared at class level,
        # so messages left in the queue would be printed again by a later call.
        for message in cls.message_repository.consume_queue():
            print(message.json(exclude_unset=True))

    @classmethod
    def migrate(cls, args: List[str], source: Source) -> None:
        """
        Check the input args and, when a config is provided and needs migration,
        transform it, save it and emit the CONTROL message.
        """
        config_path = AirbyteEntrypoint(source).extract_config(args)
        # proceed only if `--config` arg is provided
        if not config_path:
            return

        config = source.read_config(config_path)
        if not cls.should_migrate(config):
            return

        cls.emit_control_message(cls.modify_and_save(config_path, source, config))
Original file line number Diff line number Diff line change
Expand Up @@ -441,63 +441,77 @@ def get_authenticator(self, config: Mapping[str, Any]):
return authenticator_class(**get_credentials(credentials))

def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json"))
try:
config = self._validate_and_transform(config, report_names={r["name"] for r in reports})
except ConfigurationError as e:
return False, str(e)
config["authenticator"] = self.get_authenticator(config)
for property_id in config["property_ids"]:
reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json"))
try:
config = self._validate_and_transform(config, report_names={r["name"] for r in reports})
except ConfigurationError as e:
return False, str(e)
config["authenticator"] = self.get_authenticator(config)

metadata = None
try:
stream = GoogleAnalyticsDataApiMetadataStream(config=config, authenticator=config["authenticator"])
metadata = next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
except HTTPError as e:
error_list = [HTTPStatus.BAD_REQUEST, HTTPStatus.FORBIDDEN]
if e.response.status_code in error_list:
internal_message = f"Incorrect Property ID: {config['property_id']}"
property_id_docs_url = (
"https://developers.google.com/analytics/devguides/reporting/data/v1/property-id#what_is_my_property_id"
)
message = f"Access was denied to the property ID entered. Check your access to the Property ID or use Google Analytics {property_id_docs_url} to find your Property ID."

wrong_property_id_error = AirbyteTracedException(
message=message, internal_message=internal_message, failure_type=FailureType.config_error
)
raise wrong_property_id_error

if not metadata:
return False, "failed to get metadata, over quota, try later"

dimensions = {d["apiName"] for d in metadata["dimensions"]}
metrics = {d["apiName"] for d in metadata["metrics"]}

for report in config["custom_reports"]:
invalid_dimensions = set(report["dimensions"]) - dimensions
if invalid_dimensions:
invalid_dimensions = ", ".join(invalid_dimensions)
return False, WRONG_DIMENSIONS.format(fields=invalid_dimensions, report_name=report["name"])
invalid_metrics = set(report["metrics"]) - metrics
if invalid_metrics:
invalid_metrics = ", ".join(invalid_metrics)
return False, WRONG_METRICS.format(fields=invalid_metrics, report_name=report["name"])
report_stream = self.instantiate_report_class(report, config)
# check if custom_report dimensions + metrics can be combined and report generated
stream_slice = next(report_stream.stream_slices(sync_mode=SyncMode.full_refresh))
next(report_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice), None)
return True, None
_config = config.copy()
_config["property_id"] = property_id

metadata = None
try:
# explicitly setting small page size for the check operation not to cause OOM issues
stream = GoogleAnalyticsDataApiMetadataStream(config=_config, authenticator=_config["authenticator"])
metadata = next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
except HTTPError as e:
error_list = [HTTPStatus.BAD_REQUEST, HTTPStatus.FORBIDDEN]
if e.response.status_code in error_list:
internal_message = f"Incorrect Property ID: {property_id}"
property_id_docs_url = (
"https://developers.google.com/analytics/devguides/reporting/data/v1/property-id#what_is_my_property_id"
)
message = f"Access was denied to the property ID entered. Check your access to the Property ID or use Google Analytics {property_id_docs_url} to find your Property ID."

wrong_property_id_error = AirbyteTracedException(
message=message, internal_message=internal_message, failure_type=FailureType.config_error
)
raise wrong_property_id_error

if not metadata:
return False, "Failed to get metadata, over quota, try later"

dimensions = {d["apiName"] for d in metadata["dimensions"]}
metrics = {d["apiName"] for d in metadata["metrics"]}

for report in _config["custom_reports"]:
# Check if custom report dimensions supported. Compare them with dimensions provided by GA API
invalid_dimensions = set(report["dimensions"]) - dimensions
if invalid_dimensions:
invalid_dimensions = ", ".join(invalid_dimensions)
return False, WRONG_DIMENSIONS.format(fields=invalid_dimensions, report_name=report["name"])

# Check if custom report metrics supported. Compare them with metrics provided by GA API
invalid_metrics = set(report["metrics"]) - metrics
if invalid_metrics:
invalid_metrics = ", ".join(invalid_metrics)
return False, WRONG_METRICS.format(fields=invalid_metrics, report_name=report["name"])

report_stream = self.instantiate_report_class(report, _config, page_size=100)
# check if custom_report dimensions + metrics can be combined and report generated
stream_slice = next(report_stream.stream_slices(sync_mode=SyncMode.full_refresh))
next(report_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice), None)

return True, None

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json"))
config = self._validate_and_transform(config, report_names={r["name"] for r in reports})
config["authenticator"] = self.get_authenticator(config)
return [self.instantiate_report_class(report, config) for report in reports + config["custom_reports"]]
return [stream for report in reports + config["custom_reports"] for stream in self.instantiate_report_streams(report, config)]

@staticmethod
def instantiate_report_class(report: dict, config: Mapping[str, Any]) -> GoogleAnalyticsDataApiBaseStream:
def instantiate_report_streams(self, report: dict, config: Mapping[str, Any], **extra_kwargs) -> GoogleAnalyticsDataApiBaseStream:
    """
    Yield one report stream per configured property id.

    This is a generator: for every entry of `config["property_ids"]` it yields
    the result of `instantiate_report_class` called with a copy of `config`
    in which `property_id` is set to that entry.

    `extra_kwargs` are forwarded unchanged to `instantiate_report_class`.
    """
    for property_id in config["property_ids"]:
        # build a per-property config without touching the caller's mapping
        property_config = {**config, "property_id": property_id}
        # forward the extra keyword arguments instead of silently dropping them
        yield self.instantiate_report_class(report=report, config=property_config, **extra_kwargs)

def instantiate_report_class(self, report: dict, config: Mapping[str, Any], **extra_kwargs) -> GoogleAnalyticsDataApiBaseStream:
cohort_spec = report.get("cohortSpec")
pivots = report.get("pivots")
stream_config = {
**config,
"metrics": report["metrics"],
"dimensions": report["dimensions"],
**config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"title": "Google Analytics Data API Spec",
"type": "object",
"required": [
"property_id",
"property_ids",
"date_ranges_start_date",
"credentials"
],
Expand Down Expand Up @@ -109,12 +109,15 @@
"order": 4,
"type": "integer"
},
"property_id": {
"property_ids": {
"description": "A Google Analytics GA4 property identifier whose events are tracked. Specified in the URL path and not the body such as \"123...\". See <a href=\"https://developers.google.com/analytics/devguides/reporting/data/v1/property-id#what_is_my_property_id\">the docs</a> for more details.",
"order": 1,
"pattern": "^[0-9]*$",
"title": "Property ID",
"type": "string"
"type": "array",
"items": {
"type": "string",
"pattern": "^[0-9]*$"
},
"examples": [["1738294", "5729978930"]]
},
"date_ranges_start_date": {
"description": "The start date from which to replicate report data in the format YYYY-MM-DD. Data generated before this date will not be included in the report. Not applied to custom Cohort reports.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,20 @@
"title": "Start Date",
"type": "string"
},
"property_id": {
"property_ids": {
"description": "A Google Analytics GA4 property identifier whose events are tracked. Specified in the URL path and not the body such as \"123...\". See <a href=\"https://developers.google.com/analytics/devguides/reporting/data/v1/property-id#what_is_my_property_id\">the docs</a> for more details.",
"examples": [
[
"1738294",
"5729978930"
]
],
"items": {
"pattern": "^[0-9]*$",
"type": "string"
},
"order": 1,
"pattern": "^[0-9]*$",
"title": "Property ID",
"type": "string"
"type": "array"
},
"window_in_days": {
"default": 1,
Expand Down
6 changes: 3 additions & 3 deletions source-google-analytics-data-api/tests/test_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
def patch_base_class():
return {
"config": {
"property_id": "108176369",
"property_ids": ["108176369"],
"credentials": {"auth_type": "Service", "credentials_json": json_credentials},
"date_ranges_start_date": datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=1)), "%Y-%m-%d"),
}
Expand All @@ -43,7 +43,7 @@ def patch_base_class():
@pytest.fixture
def config():
return {
"property_id": "108176369",
"property_ids": ["108176369"],
"credentials": {"auth_type": "Service", "credentials_json": json_credentials},
"date_ranges_start_date": datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=1)), "%Y-%m-%d"),
"custom_reports": json.dumps([{
Expand Down Expand Up @@ -118,7 +118,7 @@ def test_check(requests_mock, config_gen, config_values, is_successful, message)
assert source.check(logger, config_gen(**config_values)) == AirbyteConnectionStatus(status=is_successful, message=message)
if not is_successful:
with pytest.raises(AirbyteTracedException) as e:
source.check(logger, config_gen(property_id="UA-11111111"))
source.check(logger, config_gen(property_ids=["UA-11111111"]))
assert e.value.failure_type == FailureType.config_error


Expand Down
1 change: 1 addition & 0 deletions source-google-analytics-data-api/tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def patch_base_class(mocker):

return {
"config": {
"property_ids": ["496180525"],
"property_id": "496180525",
"credentials": {"auth_type": "Service", "credentials_json": json_credentials},
"dimensions": ["date", "deviceCategory", "operatingSystem", "browser"],
Expand Down

0 comments on commit dae9ad6

Please sign in to comment.