Merge branch 'main' into ISSUE-19095-s2
IceS2 authored Dec 20, 2024
2 parents 2653b33 + 6522e3b commit e946dff
Showing 259 changed files with 4,656 additions and 2,047 deletions.
6 changes: 3 additions & 3 deletions ingestion/src/metadata/ingestion/api/topology_runner.py
@@ -418,9 +418,9 @@ def yield_and_update_context(
if not entity:
# Safe access to Entity Request name
raise MissingExpectedEntityAckException(
f"Missing ack back from [{stage.type_.__name__}: {entity_fqn}] - "
"Possible causes are changes in the server Fernet key or mismatched JSON Schemas "
"for the service connection."
f"We are trying to create a [{stage.type_.__name__}] with FQN [{entity_fqn}],"
" but we got no Entity back from the API. Checking for errors in the OpenMetadata Sink could help"
" validate if the Entity was properly created or not."
)

self.context.get().update_context_name(stage=stage, right=right)
24 changes: 19 additions & 5 deletions ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py
@@ -13,6 +13,7 @@
To be used by OpenMetadata class
"""
import json
from functools import lru_cache
from typing import Optional, Type

@@ -46,16 +47,29 @@ def email_search_query_es(entity: Type[T]) -> str:
)

@staticmethod
def name_search_query_es(entity: Type[T]) -> str:
def name_search_query_es(entity: Type[T], name: str, from_: int, size: int) -> str:
"""
Allow for more flexible lookup following what the UI is doing when searching users.
We don't want to stick to `q=name:{name}` since, if a user is named `random.user`
but looked up as `Random User`, we still want to find the match.
Search should only look in name and displayName fields and should not return bots.
"""
query_filter = {
"query": {
"query_string": {
"query": f"{name} AND isBot:false",
"fields": ["name", "displayName"],
"default_operator": "AND",
"fuzziness": "AUTO",
}
}
}

return (
"/search/query?q={name} AND isBot:false&from={from_}&size={size}&index="
+ ES_INDEX_MAP[entity.__name__]
f"""/search/query?query_filter={json.dumps(query_filter)}"""
f"&from={from_}&size={size}&index=" + ES_INDEX_MAP[entity.__name__]
)

def _search_by_email(
@@ -103,8 +117,8 @@ def _search_by_name(
fields: Optional field list to pass to ES request
"""
if name:
query_string = self.name_search_query_es(entity=entity).format(
name=name, from_=from_count, size=size
query_string = self.name_search_query_es(
entity=entity, name=name, from_=from_count, size=size
)
return self.get_entity_from_es(
entity=entity, query_string=query_string, fields=fields
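Note (illustration, not part of the commit): the reworked name lookup sends a JSON query_filter instead of a raw q=name:{name} query, so a fuzzy match over both name and displayName still finds `random.user` when the lookup text is `Random User`. A minimal standalone sketch of the URL it builds, using a hypothetical index name in place of the real ES_INDEX_MAP entry:

import json

ES_INDEX_MAP = {"User": "user_search_index"}  # hypothetical stand-in for the real mapping


def name_search_query_es(entity_name: str, name: str, from_: int, size: int) -> str:
    # Same shape as the new mixin code: a query_string over name/displayName, excluding bots
    query_filter = {
        "query": {
            "query_string": {
                "query": f"{name} AND isBot:false",
                "fields": ["name", "displayName"],
                "default_operator": "AND",
                "fuzziness": "AUTO",
            }
        }
    }
    return (
        f"/search/query?query_filter={json.dumps(query_filter)}"
        f"&from={from_}&size={size}&index={ES_INDEX_MAP[entity_name]}"
    )


print(name_search_query_es("User", "random.user", 0, 10))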
ingestion/src/metadata/ingestion/source/dashboard/powerbi/client.py
@@ -127,7 +127,13 @@ def fetch_all_org_dashboards(
List[PowerBIDashboard]
"""
try:
response_data = self.client.get(f"/myorg/groups/{group_id}/dashboards")
admin = "admin/" if self.config.useAdminApis else ""
response_data = self.client.get(
f"/myorg/{admin}groups/{group_id}/dashboards"
)
if not response_data:
logger.debug(f"No dashboards found for workspace_id: {group_id}")
return None
response = DashboardsResponse(**response_data)
return response.value
except Exception as exc: # pylint: disable=broad-except
@@ -142,7 +148,11 @@ def fetch_all_org_reports(self, group_id: str) -> Optional[List[PowerBIReport]]:
List[PowerBIReport]
"""
try:
response_data = self.client.get(f"/myorg/groups/{group_id}/reports")
admin = "admin/" if self.config.useAdminApis else ""
response_data = self.client.get(f"/myorg/{admin}groups/{group_id}/reports")
if not response_data:
logger.debug(f"No reports found for workspace_id: {group_id}")
return None
response = ReportsResponse(**response_data)
return response.value
except Exception as exc: # pylint: disable=broad-except
@@ -157,7 +167,11 @@ def fetch_all_org_datasets(self, group_id: str) -> Optional[List[Dataset]]:
List[Dataset]
"""
try:
response_data = self.client.get(f"/myorg/groups/{group_id}/datasets")
admin = "admin/" if self.config.useAdminApis else ""
response_data = self.client.get(f"/myorg/{admin}groups/{group_id}/datasets")
if not response_data:
logger.debug(f"No datasets found for workspace_id: {group_id}")
return None
response = DatasetResponse(**response_data)
return response.value
except Exception as exc: # pylint: disable=broad-except
@@ -174,9 +188,13 @@ def fetch_all_org_tiles(
List[Tile]
"""
try:
admin = "admin/" if self.config.useAdminApis else ""
response_data = self.client.get(
f"/myorg/groups/{group_id}/dashboards/{dashboard_id}/tiles"
f"/myorg/{admin}dashboards/{dashboard_id}/tiles"
)
if not response_data:
logger.debug(f"No dashboard tiles found for workspace_id: {group_id}")
return None
response = TilesResponse(**response_data)
return response.value
except Exception as exc: # pylint: disable=broad-except
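Note (illustration, not part of the commit): each fetch method now picks between the standard and Admin PowerBI REST routes based on useAdminApis, and returns None early when the API sends back an empty payload. A minimal sketch of the URL selection, with a hypothetical group id:

def org_dashboards_path(group_id: str, use_admin_apis: bool) -> str:
    # Prepend "admin/" when the connection is configured to use the PowerBI Admin APIs
    admin = "admin/" if use_admin_apis else ""
    return f"/myorg/{admin}groups/{group_id}/dashboards"


print(org_dashboards_path("f3a1", use_admin_apis=False))  # -> /myorg/groups/f3a1/dashboards
print(org_dashboards_path("f3a1", use_admin_apis=True))   # -> /myorg/admin/groups/f3a1/dashboards

The tiles endpoint follows the same toggle but addresses the dashboard directly (/myorg/{admin}dashboards/{dashboard_id}/tiles) rather than going through the group.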
209 changes: 74 additions & 135 deletions ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py
@@ -92,12 +92,6 @@ def __init__(
self.datamodel_file_mappings = []

def prepare(self):
if self.service_connection.useAdminApis:
groups = self.get_admin_workspace_data()
else:
groups = self.get_org_workspace_data()
if groups:
self.workspace_data = self.get_filtered_workspaces(groups)
return super().prepare()

def close(self):
@@ -123,91 +117,6 @@ def get_filtered_workspaces(self, groups: List[Group]) -> List[Group]:
filtered_groups.append(group)
return filtered_groups

def get_org_workspace_data(self) -> Optional[List[Group]]:
"""
fetch all the group workspace ids
"""
groups = self.client.api_client.fetch_all_workspaces()
for group in groups:
# add the dashboards to the groups
group.dashboards.extend(
self.client.api_client.fetch_all_org_dashboards(group_id=group.id) or []
)
for dashboard in group.dashboards:
# add the tiles to the dashboards
dashboard.tiles.extend(
self.client.api_client.fetch_all_org_tiles(
group_id=group.id, dashboard_id=dashboard.id
)
or []
)

# add the reports to the groups
group.reports.extend(
self.client.api_client.fetch_all_org_reports(group_id=group.id) or []
)

# add the datasets to the groups
group.datasets.extend(
self.client.api_client.fetch_all_org_datasets(group_id=group.id) or []
)
for dataset in group.datasets:
# add the tables to the datasets
dataset.tables.extend(
self.client.api_client.fetch_dataset_tables(
group_id=group.id, dataset_id=dataset.id
)
or []
)
return groups

def get_admin_workspace_data(self) -> Optional[List[Group]]:
"""
fetch all the workspace ids
"""
groups = []
workspaces = self.client.api_client.fetch_all_workspaces()
if workspaces:
workspace_id_list = [workspace.id for workspace in workspaces]

# Start the scan of the available workspaces for dashboard metadata
workspace_paginated_list = [
workspace_id_list[i : i + self.pagination_entity_per_page]
for i in range(
0, len(workspace_id_list), self.pagination_entity_per_page
)
]
count = 1
for workspace_ids_chunk in workspace_paginated_list:
logger.info(
f"Scanning {count}/{len(workspace_paginated_list)} set of workspaces"
)
workspace_scan = self.client.api_client.initiate_workspace_scan(
workspace_ids_chunk
)

# Keep polling the scan status endpoint to check if scan is succeeded
workspace_scan_status = self.client.api_client.wait_for_scan_complete(
scan_id=workspace_scan.id
)
if workspace_scan_status:
response = self.client.api_client.fetch_workspace_scan_result(
scan_id=workspace_scan.id
)
groups.extend(
[
active_workspace
for active_workspace in response.workspaces
if active_workspace.state == "Active"
]
)
else:
logger.error("Error in fetching dashboards and charts")
count += 1
else:
logger.error("Unable to fetch any PowerBI workspaces")
return groups or None

@classmethod
def create(
cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
@@ -220,12 +129,51 @@ def create(
)
return cls(config, metadata)

def prepare_workspace_data(self, workspace: Group):
"""prepare one workspace data at a time"""
# add the dashboards to the groups
workspace.dashboards.extend(
self.client.api_client.fetch_all_org_dashboards(group_id=workspace.id) or []
)
for dashboard in workspace.dashboards:
# add the tiles to the dashboards
dashboard.tiles.extend(
self.client.api_client.fetch_all_org_tiles(
group_id=workspace.id, dashboard_id=dashboard.id
)
or []
)

# add the reports to the groups
workspace.reports.extend(
self.client.api_client.fetch_all_org_reports(group_id=workspace.id) or []
)

# add the datasets to the groups
workspace.datasets.extend(
self.client.api_client.fetch_all_org_datasets(group_id=workspace.id) or []
)
for dataset in workspace.datasets:
# add the tables to the datasets
dataset.tables.extend(
self.client.api_client.fetch_dataset_tables(
group_id=workspace.id, dataset_id=dataset.id
)
or []
)

def get_dashboard(self) -> Any:
"""
Method to iterate through dashboard lists filter dashboards & yield dashboard details
"""
for workspace in self.workspace_data:
# fetch all workspaces/groups & apply filter pattern
all_workspaces = self.client.api_client.fetch_all_workspaces()
all_workspaces = self.get_filtered_workspaces(all_workspaces)
for workspace in all_workspaces:
# prepare additional data for specific workspace (datasets, reports, dashboards)
self.prepare_workspace_data(workspace)
self.context.get().workspace = workspace
self.workspace_data.append(workspace)
for dashboard in self.get_dashboards_list():
try:
dashboard_details = self.get_dashboard_details(dashboard)
@@ -309,54 +257,45 @@ def _get_chart_url(
f"{workspace_id}/{chart_url_postfix}"
)

def list_datamodels(self) -> Iterable[Dataset]:
def yield_datamodel(
self, dashboard_details: Union[PowerBIDashboard, PowerBIReport]
) -> Iterable[Either[CreateDashboardRequest]]:
"""
Get All the Powerbi Datasets
Method to yield datamodel for each workspace
"""
if self.source_config.includeDataModels:
workspace_datasets = self.context.get().workspace.datasets
for dataset in workspace_datasets:
if filter_by_datamodel(
self.source_config.dataModelFilterPattern, dataset.name
):
self.status.filter(dataset.name, "Data model filtered out.")
continue
try:
for workspace in self.workspace_data:
for dataset in workspace.datasets or []:
if filter_by_datamodel(
self.source_config.dataModelFilterPattern, dataset.name
):
self.status.filter(dataset.name, "Data model filtered out.")
continue
yield dataset
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(f"Unexpected error fetching PowerBI datasets - {err}")

def yield_bulk_datamodel(
self, dataset: Dataset
) -> Iterable[Either[CreateDashboardDataModelRequest]]:
"""
Method to fetch DataModels in bulk
"""
try:
data_model_request = CreateDashboardDataModelRequest(
name=EntityName(dataset.id),
displayName=dataset.name,
description=Markdown(dataset.description)
if dataset.description
else None,
service=FullyQualifiedEntityName(self.context.get().dashboard_service),
dataModelType=DataModelType.PowerBIDataModel.value,
serviceType=DashboardServiceType.PowerBI.value,
columns=self._get_column_info(dataset),
project=self._fetch_dataset_workspace(dataset_id=dataset.id),
)
yield Either(right=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)
data_model_request = CreateDashboardDataModelRequest(
name=EntityName(dataset.id),
displayName=dataset.name,
description=Markdown(dataset.description)
if dataset.description
else None,
service=FullyQualifiedEntityName(
self.context.get().dashboard_service
),
dataModelType=DataModelType.PowerBIDataModel.value,
serviceType=DashboardServiceType.PowerBI.value,
columns=self._get_column_info(dataset),
project=self.get_project_name(dashboard_details),
)
yield Either(right=data_model_request)
self.register_record_datamodel(datamodel_request=data_model_request)

except Exception as exc:
yield Either(
left=StackTraceError(
name=dataset.name,
error=f"Error yielding Data Model [{dataset.name}]: {exc}",
stackTrace=traceback.format_exc(),
except Exception as exc:
yield Either(
left=StackTraceError(
name=dataset.name,
error=f"Error yielding Data Model [{dataset.name}]: {exc}",
stackTrace=traceback.format_exc(),
)
)
)

def _get_child_columns(self, table: PowerBiTable) -> List[Column]:
"""
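Note (illustration, not part of the commit): workspace hydration moves out of prepare() and into get_dashboard(), so dashboards, tiles, reports, datasets and tables are fetched one workspace at a time rather than all upfront; each processed workspace is still appended to self.workspace_data for later reuse. A rough sketch of the new control flow, with a placeholder client standing in for the real PowerBI API client:

from typing import Iterable, List


def iter_workspaces(client, filter_workspaces) -> Iterable:
    # Fetch and filter all workspaces, then enrich each one lazily as it is processed
    workspaces: List = filter_workspaces(client.fetch_all_workspaces() or [])
    for workspace in workspaces:
        workspace.dashboards.extend(client.fetch_all_org_dashboards(group_id=workspace.id) or [])
        workspace.reports.extend(client.fetch_all_org_reports(group_id=workspace.id) or [])
        workspace.datasets.extend(client.fetch_all_org_datasets(group_id=workspace.id) or [])
        yield workspace  # downstream stages read the current workspace from the context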
ingestion/src/metadata/ingestion/source/database/db2/metadata.py
@@ -13,6 +13,7 @@
from typing import Iterable, Optional

from ibm_db_sa.base import ischema_names
from ibm_db_sa.reflection import DB2Reflector, OS390Reflector
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.row import LegacyRow
from sqlalchemy.sql.sqltypes import BOOLEAN
@@ -26,6 +27,7 @@
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.db2.utils import get_unique_constraints
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()
@@ -34,6 +36,10 @@
ischema_names.update({"BOOLEAN": BOOLEAN})


DB2Reflector.get_unique_constraints = get_unique_constraints
OS390Reflector.get_unique_constraints = get_unique_constraints


class Db2Source(CommonDbSourceService):
"""
Implements the necessary methods to extract
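Note (illustration, not part of the commit): the two assignments above patch get_unique_constraints onto both ibm_db_sa reflectors at import time, so every SQLAlchemy Inspector created afterwards reflects unique constraints through the custom helper. A minimal sketch of the pattern, with a stub body rather than the real implementation from db2/utils.py:

from ibm_db_sa.reflection import DB2Reflector, OS390Reflector


def get_unique_constraints(self, connection, table_name, schema=None, **kw):
    # Stub: the real helper looks up unique constraints in the DB2 catalog and
    # returns a list of dicts shaped like {"name": ..., "column_names": [...]}
    return []


DB2Reflector.get_unique_constraints = get_unique_constraints
OS390Reflector.get_unique_constraints = get_unique_constraints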
