Skip to content

Commit

Permalink
Merge branch 'main' into gen-1322
Browse files Browse the repository at this point in the history
  • Loading branch information
Sachin-chaurasiya authored Sep 26, 2024
2 parents 3498e9d + 423e99f commit e49e5e7
Show file tree
Hide file tree
Showing 82 changed files with 2,670 additions and 1,160 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/py-cli-e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,8 @@ jobs:
with:
payload: |
{
"text": "🔥 Failed E2E Test for: ${{ matrix.e2e-test }} 🔥"
}
"text": "🔥 Failed E2E Test for: ${{ matrix.e2e-test }} 🔥\nLogs: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}\nCommit: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.sha }}"
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.E2E_SLACK_WEBHOOK }}
SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/py-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ on:
permissions:
contents: read

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
py-run-tests:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@
"kafka-python==2.0.2",
*plugins["pii-processor"],
"requests==2.31.0",
f"{DATA_DIFF['mysql']}==0.11.2",
f"{DATA_DIFF['mysql']}",
*plugins["deltalake"],
*plugins["datalake-gcs"],
*plugins["pgspider"],
Expand Down
6 changes: 4 additions & 2 deletions ingestion/src/_openmetadata_testutils/ometa.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
OM_JWT = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"


def int_admin_ometa(url: str = "http://localhost:8585/api") -> OpenMetadata:
def int_admin_ometa(
url: str = "http://localhost:8585/api", jwt: str = OM_JWT
) -> OpenMetadata:
"""Initialize the ometa connection with default admin:admin creds"""
server_config = OpenMetadataConnection(
hostPort=url,
authProvider=AuthProvider.openmetadata,
securityConfig=OpenMetadataJWTClientConfig(jwtToken=CustomSecretStr(OM_JWT)),
securityConfig=OpenMetadataJWTClientConfig(jwtToken=CustomSecretStr(jwt)),
)
metadata = OpenMetadata(server_config)
assert metadata.health_check()
Expand Down
25 changes: 25 additions & 0 deletions ingestion/src/metadata/examples/workflows/sigma.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
source:
type: sigma
serviceName: local_sigma
serviceConnection:
config:
type: Sigma
hostPort: https://api.sigmacomputing.com
clientId: client_id
clientSecret: client_secret
apiVersion: v2
sourceConfig:
config:
type: DashboardMetadata
lineageInformation:
dbServiceNames: [db_service_name]
sink:
type: metadata-rest
config: {}
workflowConfig:
loggerLevel: DEBUG # DEBUG, INFO, WARN or ERROR
openMetadataServerConfig:
hostPort: http://localhost:8585/api
authProvider: openmetadata
securityConfig:
jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
51 changes: 47 additions & 4 deletions ingestion/src/metadata/ingestion/ometa/mixins/suggestions_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,13 @@
To be used by OpenMetadata class
"""
from metadata.generated.schema.entity.feed.suggestion import Suggestion
from typing import Union

from metadata.generated.schema.entity.feed.suggestion import Suggestion, SuggestionType
from metadata.generated.schema.type import basic
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.logger import ometa_logger

logger = ometa_logger()
Expand All @@ -30,12 +35,50 @@ class OMetaSuggestionsMixin:
client: REST

def update_suggestion(self, suggestion: Suggestion) -> Suggestion:
"""
Update an existing Suggestion with new fields
"""
"""Update an existing Suggestion with new fields"""
resp = self.client.put(
f"{self.get_suffix(Suggestion)}/{str(suggestion.root.id.root)}",
data=suggestion.model_dump_json(),
)

return Suggestion(**resp)

def accept_suggestion(self, suggestion_id: Union[str, basic.Uuid]) -> None:
"""Accept a given suggestion"""
self.client.put(
f"{self.get_suffix(Suggestion)}/{model_str(suggestion_id)}/accept",
)

def reject_suggestion(self, suggestion_id: Union[str, basic.Uuid]) -> None:
"""Reject a given suggestion"""
self.client.put(
f"{self.get_suffix(Suggestion)}/{model_str(suggestion_id)}/reject",
)

def accept_all_suggestions(
self,
fqn: Union[str, FullyQualifiedEntityName],
user_id: Union[str, basic.Uuid],
suggestion_type: SuggestionType = SuggestionType.SuggestDescription,
) -> None:
"""Accept all suggestions"""
self.client.put(
f"{self.get_suffix(Suggestion)}/accept-all?"
f"userId={model_str(user_id)}&"
f"entityFQN={model_str(fqn)}&"
f"suggestionType={suggestion_type.value}",
)

def reject_all_suggestions(
self,
fqn: Union[str, FullyQualifiedEntityName],
user_id: Union[str, basic.Uuid],
suggestion_type: SuggestionType = SuggestionType.SuggestDescription,
) -> None:
"""Accept all suggestions"""
self.client.put(
f"{self.get_suffix(Suggestion)}/reject-all?"
f"userId={model_str(user_id)}&"
f"entityFQN={model_str(fqn)}&"
f"suggestionType={suggestion_type.value}",
)
6 changes: 5 additions & 1 deletion ingestion/src/metadata/ingestion/ometa/ometa_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from pydantic import BaseModel

from metadata.generated.schema.api.createBot import CreateBot
from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
CreateIngestionPipelineRequest,
)
Expand Down Expand Up @@ -172,13 +173,16 @@ def get_suffix(entity: Type[T]) -> str:

return route

def get_module_path(self, entity: Type[T]) -> str:
def get_module_path(self, entity: Type[T]) -> Optional[str]:
"""
Based on the entity, return the module path
it is found inside generated
"""
if issubclass(entity, CreateIngestionPipelineRequest):
return "services.ingestionPipelines"
if issubclass(entity, CreateBot):
# Bots schemas don't live inside any subdirectory
return None
return entity.__module__.split(".")[-2]

def get_create_entity_type(self, entity: Type[T]) -> Type[C]:
Expand Down
4 changes: 3 additions & 1 deletion ingestion/src/metadata/ingestion/ometa/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
CreateClassificationRequest,
)
from metadata.generated.schema.api.classification.createTag import CreateTagRequest
from metadata.generated.schema.api.createBot import CreateBot
from metadata.generated.schema.api.data.createAPICollection import (
CreateAPICollectionRequest,
)
Expand Down Expand Up @@ -213,7 +214,8 @@
User.__name__: "/users",
CreateUserRequest.__name__: "/users",
AuthenticationMechanism.__name__: "/users/auth-mechanism",
Bot.__name__: "/bots", # We won't allow bot creation from the client
Bot.__name__: "/bots",
CreateBot.__name__: "/bots",
# Roles
Role.__name__: "/roles",
CreateRoleRequest.__name__: "/roles",
Expand Down
Empty file.
187 changes: 187 additions & 0 deletions ingestion/src/metadata/ingestion/source/dashboard/sigma/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
REST Auth & Client for Sigma
"""

import traceback
from base64 import b64encode
from typing import List, Optional, Tuple

from metadata.generated.schema.entity.services.connections.dashboard.sigmaConnection import (
SigmaConnection,
)
from metadata.ingestion.ometa.client import REST, ClientConfig
from metadata.ingestion.source.dashboard.sigma.models import (
AuthToken,
EdgeSource,
EdgeSourceResponse,
Elements,
ElementsResponse,
NodeDetails,
OwnerDetails,
Workbook,
WorkbookDetails,
WorkBookPageResponse,
WorkBookResponseDetails,
)
from metadata.utils.constants import AUTHORIZATION_HEADER, UTF_8
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import utils_logger

logger = utils_logger()

HEADERS = {
"accept": "application/json",
"Content-type": "application/x-www-form-urlencoded",
}

TOKEN_PAYLOAD = {"grant_type": "client_credentials"}


class SigmaApiClient:
"""
REST Auth & Client for Sigma
"""

client: REST

def __init__(self, config: SigmaConnection):
self.config = config
token_api_key = str(
b64encode(
f"{self.config.clientId}:{self.config.clientSecret.get_secret_value()}".encode(
UTF_8
)
).decode(UTF_8)
)

token_config = ClientConfig(
base_url=clean_uri(config.hostPort),
api_version=config.apiVersion,
auth_header=AUTHORIZATION_HEADER,
extra_headers=HEADERS,
auth_token=lambda: (token_api_key, 0),
auth_token_mode="Basic",
)

self.token_client = REST(token_config)

client_config = ClientConfig(
base_url=clean_uri(config.hostPort),
api_version=config.apiVersion,
auth_token=self.get_auth_token,
auth_header=AUTHORIZATION_HEADER,
)

self.client = REST(client_config)

def get_auth_token(self) -> Tuple[str, int]:
"""
generate auth token
Returns:
Tuple[str, int]: A tuple containing the access_token (str) and expires_in (int)
"""
result = AuthToken.model_validate(
self.token_client.post("/auth/token", data=TOKEN_PAYLOAD)
)
return result.access_token, result.expires_in

def get_dashboards(self) -> Optional[List[Workbook]]:
"""
method to fetch dashboards from api
"""
result = WorkBookResponseDetails.model_validate(self.client.get("/workbooks"))
if result:
return result.entries

def get_dashboard_detail(self, workbook_id: str) -> Optional[WorkbookDetails]:
"""
method to fetch dashboard details from api
"""
try:
result = WorkbookDetails.model_validate(
self.client.get(f"/workbooks/{workbook_id}")
)
if result:
return result
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.error(
f"Error fetching Dashboard details for for workbook {workbook_id}: {exc}"
)
return None

def get_owner_detail(self, owner_id: str) -> Optional[OwnerDetails]:
"""
method to fetch dashboard owner details from api
"""
try:
result = OwnerDetails.model_validate(
self.client.get(f"/members/{owner_id}")
)
if result:
return result
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(f"Failed to fetch owner details for owner {owner_id}: {exc}")
return None

def get_chart_details(self, workbook_id: str) -> Optional[List[Elements]]:
"""
method to fetch dashboards chart details from api
"""
try:
elements_list = []
pages = WorkBookPageResponse.model_validate(
self.client.get(f"/workbooks/{workbook_id}/pages")
)
for page in pages.entries:
elements = ElementsResponse.model_validate(
self.client.get(
f"/workbooks/{workbook_id}/pages/{page.pageId}/elements"
)
)
elements_list.extend(elements.entries or [])
return elements_list
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to fetch chart details for workbook {workbook_id}: {exc}"
)
return None

def get_lineage_details(
self, workbook_id: str, element_id: str
) -> Optional[List[EdgeSource]]:
"""
method to fetch dashboards lineage details from api
"""
try:
source_nodes = []
edges_response = EdgeSourceResponse.model_validate(
self.client.get(
f"/workbooks/{workbook_id}/lineage/elements/{element_id}"
)
)
for node in edges_response.edges:
if node.node_id:
node_details = NodeDetails.model_validate(
self.client.get(f"/files/{node.node_id}")
)
source_nodes.append(node_details)
return source_nodes
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to fetch lineage details for workbook {workbook_id}: {exc}"
)
return None
Loading

0 comments on commit e49e5e7

Please sign in to comment.