diff --git a/ingestion/src/metadata/examples/workflows/rest.yaml b/ingestion/src/metadata/examples/workflows/rest.yaml new file mode 100644 index 000000000000..1846d309eb90 --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/rest.yaml @@ -0,0 +1,19 @@ +source: + type: rest + serviceName: openapi_rest + serviceConnection: + config: + type: REST + openAPISchemaURL: https://docs.open-metadata.org/swagger.json + sourceConfig: + config: + type: ApiMetadata +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" diff --git a/ingestion/src/metadata/ingestion/source/api/api_service.py b/ingestion/src/metadata/ingestion/source/api/api_service.py new file mode 100644 index 000000000000..38bddd260829 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/api/api_service.py @@ -0,0 +1,206 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Base class for ingesting api services
"""
from abc import ABC, abstractmethod
from typing import Any, Iterable, Set

from pydantic import Field
from typing_extensions import Annotated

from metadata.generated.schema.api.data.createAPICollection import (
    CreateAPICollectionRequest,
)
from metadata.generated.schema.api.data.createAPIEndpoint import (
    CreateAPIEndpointRequest,
)
from metadata.generated.schema.entity.data.apiCollection import APICollection
from metadata.generated.schema.entity.data.apiEndpoint import APIEndpoint
from metadata.generated.schema.entity.services.apiService import (
    ApiService,
    ApiServiceConnection,
)
from metadata.generated.schema.metadataIngestion.apiServiceMetadataPipeline import (
    ApiServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.api.delete import delete_entity_from_source
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.topology import (
    NodeStage,
    ServiceTopology,
    TopologyContextManager,
    TopologyNode,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class ApiServiceTopology(ServiceTopology):
    """
    Defines the hierarchy in API Services.
    service -> ApiCollection -> ApiEndpoint

    We could have a topology validator. We can only consume
    data that has been produced by any parent node.
    """

    # Root node: creates/updates the ApiService entity itself, then hands
    # control to the "api_collection" child node.
    # NOTE: the `producer`/`processor`/`post_process` strings must match
    # method names defined on ApiServiceSource below — the topology runner
    # resolves them by name at runtime.
    root: Annotated[
        TopologyNode, Field(description="Root node for the topology")
    ] = TopologyNode(
        producer="get_services",
        stages=[
            NodeStage(
                type_=ApiService,
                context="api_service",
                processor="yield_create_request_api_service",
                overwrite=False,
                must_return=True,
                cache_entities=True,
            ),
        ],
        children=["api_collection"],
        post_process=["mark_api_collections_as_deleted"],
    )
    # Per-collection node: for each collection produced by
    # `get_api_collections`, first emit the APICollection entity, then the
    # APIEndpoints that belong to it. Both stages consume the service context.
    api_collection: Annotated[
        TopologyNode, Field(description="API Collection Processing Node")
    ] = TopologyNode(
        producer="get_api_collections",
        stages=[
            NodeStage(
                type_=APICollection,
                context="api_collections",
                processor="yield_api_collection",
                consumer=["api_service"],
                use_cache=True,
            ),
            NodeStage(
                type_=APIEndpoint,
                context="api_endpoints",
                processor="yield_api_endpoint",
                consumer=["api_service"],
                use_cache=True,
            ),
        ],
    )


class ApiServiceSource(TopologyRunnerMixin, Source, ABC):
    """
    Base class for API services.
    It implements the topology and context
    """

    source_config: ApiServiceMetadataPipeline
    config: WorkflowSource
    # Big union of types we want to fetch dynamically
    service_connection: ApiServiceConnection.model_fields["config"].annotation

    topology = ApiServiceTopology()
    context = TopologyContextManager(topology)
    # FQNs of the collections/endpoints seen during this run; consulted by
    # mark_api_collections_as_deleted to soft-delete anything missing.
    api_collection_source_state: Set = set()
    api_endpoint_source_state: Set = set()

    def __init__(
        self,
        config: WorkflowSource,
        metadata: OpenMetadata,
    ):
        """
        Build the source: resolve the service connection from the workflow
        config, open the connection, and run the connection test up front
        so a bad config fails fast.
        """
        super().__init__()
        self.config = config
        self.metadata = metadata
        self.service_connection = self.config.serviceConnection.root.config
        self.source_config: ApiServiceMetadataPipeline = self.config.sourceConfig.config
        self.connection = get_connection(self.service_connection)

        # Flag the connection for the test connection
        self.connection_obj = self.connection
        self.test_connection()

        self.client = self.connection

    @property
    def name(self) -> str:
        """Service type name (e.g. "Rest") taken from the connection config."""
        return self.service_connection.type.name

    def get_services(self) -> Iterable[WorkflowSource]:
        """Topology root producer: a single-element iterable with the config."""
        yield self.config

    def yield_create_request_api_service(self, config: WorkflowSource):
        """Build the create/update request for the ApiService entity itself."""
        yield Either(
            right=self.metadata.get_create_service_from_source(
                entity=ApiService, config=config
            )
        )

    @abstractmethod
    def get_api_collections(self, *args, **kwargs) -> Iterable[Any]:
        """
        Method to list all collections to process.
        Here is where filtering happens
        """

    @abstractmethod
    def yield_api_collection(
        self, *args, **kwargs
    ) -> Iterable[Either[CreateAPICollectionRequest]]:
        """Method to return api collection Entities"""

    @abstractmethod
    def yield_api_endpoint(
        self, *args, **kwargs
    ) -> Iterable[Either[CreateAPIEndpointRequest]]:
        """Method to return api endpoint Entities"""

    def close(self):
        """By default, nothing to close"""

    def test_connection(self) -> None:
        """Run the service-specific connection test against the open connection."""
        test_connection_fn = get_test_connection_fn(self.service_connection)
        test_connection_fn(self.metadata, self.connection_obj, self.service_connection)

    def mark_api_collections_as_deleted(self) -> Iterable[Either[DeleteEntity]]:
        """Method to mark the api collection as deleted"""
        # Only collections NOT registered in api_collection_source_state
        # during this run are soft-deleted, scoped to this service.
        if self.source_config.markDeletedApiCollections:
            yield from delete_entity_from_source(
                metadata=self.metadata,
                entity_type=APICollection,
                entity_source_state=self.api_collection_source_state,
                mark_deleted_entity=self.source_config.markDeletedApiCollections,
                params={"service": self.context.get().api_service},
            )

    def register_record(self, collection_request: CreateAPICollectionRequest) -> None:
        """
        Mark the api collection record as scanned and update
        the api_collection_source_state
        """
        api_collection_fqn = fqn.build(
            self.metadata,
            entity_type=APICollection,
            service_name=collection_request.service.root,
            api_collection_name=collection_request.name.root,
        )

        self.api_collection_source_state.add(api_collection_fqn)

    def prepare(self):
        """By default, nothing to prepare"""
diff --git 
a/ingestion/src/metadata/ingestion/source/api/rest/connection.py b/ingestion/src/metadata/ingestion/source/api/rest/connection.py new file mode 100644 index 000000000000..f76872ae78e1 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/api/rest/connection.py @@ -0,0 +1,60 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Source connection handler +""" +from typing import Optional + +import requests + +# from metadata.ingestion.source.pipeline.flink.client import FlinkClient +from requests.models import Response + +from metadata.generated.schema.entity.automations.workflow import ( + Workflow as AutomationWorkflow, +) +from metadata.generated.schema.entity.services.connections.apiService.restConnection import ( + RESTConnection, +) +from metadata.ingestion.connections.test_connections import test_connection_steps +from metadata.ingestion.ometa.ometa_api import OpenMetadata + + +def get_connection(connection: RESTConnection) -> Response: + """ + Create connection + """ + return requests.get(connection.openAPISchemaURL) + + +def test_connection( + metadata: OpenMetadata, + client: Response, + service_connection: RESTConnection, + automation_workflow: Optional[AutomationWorkflow] = None, +) -> None: + """ + Test connection. 
This can be executed either as part + of a metadata workflow or during an Automation Workflow + """ + + def custom_url_exec(): + return [] if client.status_code == 200 else None + + test_fn = {"CheckURL": custom_url_exec} + + test_connection_steps( + metadata=metadata, + test_fn=test_fn, + service_type=service_connection.type.value, + automation_workflow=automation_workflow, + ) diff --git a/ingestion/src/metadata/ingestion/source/api/rest/metadata.py b/ingestion/src/metadata/ingestion/source/api/rest/metadata.py new file mode 100644 index 000000000000..60348b07976d --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/api/rest/metadata.py @@ -0,0 +1,230 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+"""REST source module""" + +import traceback +from typing import Any, Iterable, List, Optional + +from pydantic import AnyUrl + +from metadata.generated.schema.api.data.createAPICollection import ( + CreateAPICollectionRequest, +) +from metadata.generated.schema.api.data.createAPIEndpoint import ( + CreateAPIEndpointRequest, +) +from metadata.generated.schema.entity.data.apiCollection import APICollection +from metadata.generated.schema.entity.data.apiEndpoint import ApiRequestMethod +from metadata.generated.schema.entity.services.connections.apiService.restConnection import ( + RESTConnection, +) +from metadata.generated.schema.entity.services.ingestionPipelines.status import ( + StackTraceError, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.generated.schema.type.apiSchema import APISchema +from metadata.generated.schema.type.basic import ( + EntityName, + FullyQualifiedEntityName, + Markdown, +) +from metadata.generated.schema.type.schema import DataTypeTopic, FieldModel +from metadata.ingestion.api.models import Either +from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.api.api_service import ApiServiceSource +from metadata.utils import fqn +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class RestSource(ApiServiceSource): + """ + Source implementation to ingest REST data. 
+ + We will iterate on the registered collections, endpoints + and prepare an iterator of + """ + + def __init__(self, config: WorkflowSource, metadata: OpenMetadata): + super().__init__(config, metadata) + + @classmethod + def create( + cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None + ): + config: WorkflowSource = WorkflowSource.model_validate(config_dict) + connection: RESTConnection = config.serviceConnection.root.config + if not isinstance(connection, RESTConnection): + raise InvalidSourceException( + f"Expected RESTConnection, but got {connection}" + ) + return cls(config, metadata) + + def get_api_collections(self, *args, **kwargs) -> Iterable[Any]: + """ + Method to list all collections to process. + Here is where filtering happens + """ + json_response = self.connection.json() + if not type(json_response) == dict: + return None + + for collection in json_response.get("tags", []): + if not collection.get("name"): + continue + yield collection + + def yield_api_collection( + self, collection: dict + ) -> Iterable[Either[CreateAPICollectionRequest]]: + """Method to return api collection Entities""" + try: + collection_request = CreateAPICollectionRequest( + name=EntityName(collection.get("name")), + displayName=collection.get("name"), + description=Markdown(collection.get("description")) + if collection.get("description") + else None, + service=FullyQualifiedEntityName(self.context.get().api_service), + endpointURL=AnyUrl( + f"{self.config.serviceConnection.root.config.openAPISchemaURL}#tag/{collection.get('name')}" + ), + ) + yield Either(right=collection_request) + self.register_record(collection_request=collection_request) + except Exception as exc: + yield Either( + left=StackTraceError( + name=collection.get("name"), + error=f"Error creating collection: {exc}", + stackTrace=traceback.format_exc(), + ) + ) + + def yield_api_endpoint( + self, collection: dict + ) -> Iterable[Either[CreateAPIEndpointRequest]]: + """Method to 
return api endpoint Entities""" + filtered_endpoints = self._filter_collection_endpoints(collection) + for path, methods in filtered_endpoints.items(): + for method_type, info in methods.items(): + try: + yield Either( + right=CreateAPIEndpointRequest( + name=EntityName(info.get("operationId")), + description=Markdown(info.get("description")) + if info.get("description") + else None, + apiCollection=FullyQualifiedEntityName( + fqn.build( + self.metadata, + entity_type=APICollection, + service_name=self.context.get().api_service, + api_collection_name=collection.get("name"), + ) + ), + endpointURL=AnyUrl( + f"{self.config.serviceConnection.root.config.openAPISchemaURL}#operation/{info.get('operationId')}", + ), + requestMethod=self._get_api_request_method(method_type), + requestSchema=self._get_request_schema(info), + responseSchema=self._get_response_schema(info), + ) + ) + except Exception as exc: # pylint: disable=broad-except + yield Either( + left=StackTraceError( + name=collection.get("name"), + error=f"Error creating API Endpoint [{info.get('operationId')}]: {exc}", + stackTrace=traceback.format_exc(), + ) + ) + + def _filter_collection_endpoints(self, collection: dict) -> dict: + """""" + collection_name = collection.get("name") + json_response = self.connection.json().get("paths", {}) + + filtered_paths = {} + for path, methods in json_response.items(): + for method_type, info in methods.items(): + if collection_name in info.get("tags", []): + # path & methods are part of collection + filtered_paths.update({path: methods}) + break + return filtered_paths + + def _get_api_request_method(self, method_type: str) -> Optional[str]: + """fetch endpoint request method""" + try: + return ApiRequestMethod[method_type.upper()] + except KeyError as err: + logger.info(f"Keyerror while fetching request method: {err}") + return None + + def _get_request_schema(self, info: dict) -> Optional[APISchema]: + """fetch request schema""" + schema_ref = ( + 
info.get("requestBody", {}) + .get("content", {}) + .get("application/json", {}) + .get("schema", {}) + .get("$ref") + ) + if not schema_ref: + logger.info("No request schema found for the endpoint") + return None + return self._process_schema(schema_ref) + + def _get_response_schema(self, info: dict) -> Optional[APISchema]: + """fetch response schema""" + schema_ref = ( + info.get("responses", {}) + .get("200", {}) + .get("content", {}) + .get("application/json", {}) + .get("schema", {}) + .get("$ref", {}) + ) + if not schema_ref: + logger.info("No response schema found for the endpoint") + return None + return self._process_schema(schema_ref) + + def _process_schema(self, schema_ref: str) -> Optional[List[APISchema]]: + """process schema""" + try: + schema_ref = schema_ref.split("/")[-1] + schema_fields = ( + self.connection.json().get("components").get("schemas").get(schema_ref) + ) + + fetched_fields = [] + for key, val in schema_fields.get("properties", {}).items(): + dtype = val.get("type") + if not dtype: + continue + fetched_fields.append( + FieldModel( + name=key, + dataType=DataTypeTopic[dtype.upper()] + if dtype.upper() in DataTypeTopic.__members__ + else DataTypeTopic.UNKNOWN, + ) + ) + return APISchema(schemaFields=fetched_fields) + except Exception as err: + logger.info(f"Error while processing request schema: {err}") + return None diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/api/rest.json b/openmetadata-service/src/main/resources/json/data/testConnections/api/rest.json new file mode 100644 index 000000000000..3d46582ffe81 --- /dev/null +++ b/openmetadata-service/src/main/resources/json/data/testConnections/api/rest.json @@ -0,0 +1,14 @@ +{ + "name": "REST", + "displayName": "REST Test Connection", + "description": "This Test Connection validates the schema provided for openapi", + "steps": [ + { + "name": "CheckURL", + "description": "Checks if url is valid open api json schema url or not", + "errorMessage": "Failed 
to validate the url, please check the url", + "shortCircuit": true, + "mandatory": true + } + ] +} \ No newline at end of file diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/apiServiceMetadataPipeline.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/apiServiceMetadataPipeline.json new file mode 100644 index 000000000000..2a3b45af4760 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/apiServiceMetadataPipeline.json @@ -0,0 +1,40 @@ +{ + "$id": "https://open-metadata.org/schema/metadataIngestion/apiServiceMetadataPipeline.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ApiServiceMetadataPipeline", + "description": "ApiService Metadata Pipeline Configuration.", + "type": "object", + "definitions": { + "ApiMetadataConfigType": { + "description": "Api Source Config Metadata Pipeline type", + "type": "string", + "enum": ["ApiMetadata"], + "default": "ApiMetadata" + } + }, + "properties": { + "type": { + "description": "Pipeline type", + "$ref": "#/definitions/ApiMetadataConfigType", + "default": "ApiMetadata" + }, + "apiCollectionFilterPattern": { + "description": "Regex to only fetch api collections with names matching the pattern.", + "$ref": "../type/filterPattern.json#/definitions/filterPattern", + "title": "API Collection Filter Pattern" + }, + "markDeletedApiCollections": { + "description": "Optional configuration to soft delete api collections in OpenMetadata if the source collections are deleted. Also, if the collection is deleted, all the associated entities like endpoints, etc., with that collection will be deleted", + "type": "boolean", + "default": true, + "title": "Mark Deleted Api Collection" + }, + "overrideMetadata":{ + "title": "Override Metadata", + "description": "Set the 'Override Metadata' toggle to control whether to override the existing metadata in the OpenMetadata server with the metadata fetched from the source. 
If the toggle is set to true, the metadata fetched from the source will override the existing metadata in the OpenMetadata server. If the toggle is set to false, the metadata fetched from the source will not override the existing metadata in the OpenMetadata server. This is applicable for fields like description, tags, owner and displayName", + "type": "boolean", + "default": false + } + }, + "additionalProperties": false +} \ No newline at end of file