Skip to content

Commit

Permalink
fix test con. pyformat changes
Browse files Browse the repository at this point in the history
  • Loading branch information
harshsoni2024 committed Sep 6, 2024
1 parent b96cc4e commit 0551381
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 20 deletions.
14 changes: 11 additions & 3 deletions ingestion/src/metadata/ingestion/source/api/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
from typing import Optional

import requests

# from metadata.ingestion.source.pipeline.flink.client import FlinkClient
from requests.models import Response

from metadata.generated.schema.entity.automations.workflow import (
Expand All @@ -29,6 +27,12 @@
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class SchemaURLError(Exception):
"""
Class to indicate schema url is invalid
"""


def get_connection(connection: RESTConnection) -> Response:
"""
Create connection
Expand All @@ -48,7 +52,11 @@ def test_connection(
"""

def custom_url_exec():
return [] if client.status_code == 200 else None
if client.status_code == 200:
return []
raise SchemaURLError(
f"Failed to get access to provided schema url. Please check with url and its permissions"
)

test_fn = {"CheckURL": custom_url_exec}

Expand Down
42 changes: 25 additions & 17 deletions ingestion/src/metadata/ingestion/source/api/rest/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ def get_api_collections(self, *args, **kwargs) -> Iterable[Any]:
Method to list all collections to process.
Here is where filtering happens
"""
json_response = self.connection.json()
if not type(json_response) == dict:
return None
json_response = {}
try:
json_response = self.connection.json()
except Exception as err:
logger.error(f"Error while fetching collections from schema URL :{err}")

for collection in json_response.get("tags", []):
if not collection.get("name"):
Expand Down Expand Up @@ -117,7 +119,7 @@ def yield_api_endpoint(
self, collection: dict
) -> Iterable[Either[CreateAPIEndpointRequest]]:
"""Method to return api endpoint Entities"""
filtered_endpoints = self._filter_collection_endpoints(collection)
filtered_endpoints = self._filter_collection_endpoints(collection) or {}
for path, methods in filtered_endpoints.items():
for method_type, info in methods.items():
try:
Expand Down Expand Up @@ -152,19 +154,25 @@ def yield_api_endpoint(
)
)

def _filter_collection_endpoints(self, collection: dict) -> dict:
""""""
collection_name = collection.get("name")
json_response = self.connection.json().get("paths", {})

filtered_paths = {}
for path, methods in json_response.items():
for method_type, info in methods.items():
if collection_name in info.get("tags", []):
# path & methods are part of collection
filtered_paths.update({path: methods})
break
return filtered_paths
def _filter_collection_endpoints(self, collection: dict) -> Optional[dict]:
"""filter endpoints related to specific collection"""
try:
collection_name = collection.get("name")
json_response = self.connection.json().get("paths", {})

filtered_paths = {}
for path, methods in json_response.items():
for method_type, info in methods.items():
if collection_name in info.get("tags", []):
# path & methods are part of collection
filtered_paths.update({path: methods})
break
return filtered_paths
except Exception as err:
logger.info(
f"Error while filtering endpoints for collection {collection_name}"
)
return None

def _get_api_request_method(self, method_type: str) -> Optional[str]:
"""fetch endpoint request method"""
Expand Down

0 comments on commit 0551381

Please sign in to comment.