Skip to content

Commit

Permalink
Fix ElasticSearch Test Connection & Deploy (#13061)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulixius9 authored Sep 8, 2023
1 parent a41326e commit 4e63387
Show file tree
Hide file tree
Showing 26 changed files with 496 additions and 105 deletions.
5 changes: 4 additions & 1 deletion ingestion/examples/sample_data/searchIndexes/service.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
"serviceConnection": {
"config": {
"type": "ElasticSearch",
"hostPort": "localhost:9200"
"hostPort": "http://localhost:9200"
}
},
"sourceConfig": {
"config": {
"type": "SearchMetadata"
}
}
}
3 changes: 2 additions & 1 deletion ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ def get_long_description():
"druid": {"pydruid>=0.6.5"},
"dynamodb": {VERSIONS["boto3"]},
"elasticsearch": {
"elasticsearch==7.13.1"
"elasticsearch==7.13.1",
"elasticsearch8~=8.9.0",
}, # also requires requests-aws4auth which is in base
"glue": {VERSIONS["boto3"]},
"great-expectations": {VERSIONS["great-expectations"]},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ source:
serviceConnection:
config:
type: ElasticSearch
hostPort: localhost:9200
hostPort: http://localhost:9200
sourceConfig:
config:
type: SearchMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""
from typing import Optional

from elasticsearch import Elasticsearch
from elasticsearch8 import Elasticsearch

from metadata.generated.schema.entity.automations.workflow import (
Workflow as AutomationWorkflow,
Expand All @@ -35,26 +35,34 @@ def get_connection(connection: ElasticsearchConnection) -> Elasticsearch:
"""
basic_auth = None
api_key = None
if isinstance(connection.authType, BasicAuthentication):
if (
isinstance(connection.authType, BasicAuthentication)
and connection.authType.username
):
basic_auth = (
connection.authType.username,
connection.authType.password.get_secret_value(),
connection.authType.password.get_secret_value()
if connection.authType.password
else None,
)

if isinstance(connection.authType, ApiAuthentication):
api_key = (
connection.authType.apiKeyId,
connection.authType.apiKey.get_secret_value(),
)
if connection.authType.apiKeyId and connection.authType.apiKey:
api_key = (
connection.authType.apiKeyId,
connection.authType.apiKey.get_secret_value(),
)
elif connection.authType.apiKey:
api_key = connection.authType.apiKey.get_secret_value()

if not connection.connectionArguments:
connection.connectionArguments = init_empty_connection_arguments()

return Elasticsearch(
[connection.hostPort],
basic_auth=basic_auth,
connection.hostPort,
http_auth=basic_auth,
api_key=api_key,
scheme=connection.scheme.value,
ca_certs=connection.caCert,
**connection.connectionArguments.__root__
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""
from typing import Any, Iterable, Optional

from elasticsearch import Elasticsearch
from elasticsearch8 import Elasticsearch

from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest,
Expand Down Expand Up @@ -67,7 +67,7 @@ def get_search_index_list(self) -> Iterable[dict]:
"""
index_list = self.client.indices.get_alias() or {}
for index in index_list.keys():
yield self.client.indices.get(index)
yield self.client.indices.get(index=str(index))

def get_search_index_name(self, search_index_details: dict) -> Optional[str]:
"""
Expand Down
13 changes: 10 additions & 3 deletions ingestion/src/metadata/ingestion/source/search/search_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
from metadata.generated.schema.api.data.createSearchIndex import (
CreateSearchIndexRequest,
)
from metadata.generated.schema.api.services.createSearchService import (
CreateSearchServiceRequest,
)
from metadata.generated.schema.entity.data.searchIndex import (
SearchIndex,
SearchIndexSampleData,
Expand Down Expand Up @@ -165,9 +168,13 @@ def get_search_index(self) -> Any:
continue
yield index_details

def yield_create_request_search_service(self, config: WorkflowSource):
yield self.metadata.get_create_service_from_source(
entity=SearchService, config=config
def yield_create_request_search_service(
self, config: WorkflowSource
) -> Iterable[Either[CreateSearchServiceRequest]]:
yield Either(
right=self.metadata.get_create_service_from_source(
entity=SearchService, config=config
)
)

def get_services(self) -> Iterable[WorkflowSource]:
Expand Down
2 changes: 1 addition & 1 deletion ingestion/tests/unit/topology/search/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
"username": "username",
"password": "password",
},
"hostPort": "localhost:9200",
"hostPort": "http://localhost:9200",
}
},
"sourceConfig": {"config": {"type": "SearchMetadata"}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from metadata.generated.schema.entity.services.metadataService import MetadataService
from metadata.generated.schema.entity.services.mlmodelService import MlModelService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.services.searchService import SearchService
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.ingestion.models.encoders import show_secrets_encoder
from metadata.ingestion.ometa.ometa_api import OpenMetadata
Expand Down Expand Up @@ -66,6 +67,17 @@

logger = workflow_logger()

ENTITY_CLASS_MAP = {
"databaseService": DatabaseService,
"pipelineService": PipelineService,
"dashboardService": DashboardService,
"messagingService": MessagingService,
"mlmodelService": MlModelService,
"metadataService": MetadataService,
"storageService": StorageService,
"searchService": SearchService,
}


class InvalidServiceException(Exception):
"""
Expand Down Expand Up @@ -119,7 +131,7 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:

service_type = ingestion_pipeline.service.type

entity_class = None
entity_class = ENTITY_CLASS_MAP.get(service_type)
try:
if service_type == "testSuite":
return WorkflowSource(
Expand All @@ -129,57 +141,15 @@ def build_source(ingestion_pipeline: IngestionPipeline) -> WorkflowSource:
serviceConnection=None, # retrieved from the test suite workflow using the `sourceConfig.config.entityFullyQualifiedName`
)

if service_type == "databaseService":
entity_class = DatabaseService
service: DatabaseService = metadata.get_by_name(
entity=entity_class,
fqn=ingestion_pipeline.service.name,
nullable=False,
)
elif service_type == "pipelineService":
entity_class = PipelineService
service: PipelineService = metadata.get_by_name(
entity=entity_class,
fqn=ingestion_pipeline.service.name,
nullable=False,
)
elif service_type == "dashboardService":
entity_class = DashboardService
service: DashboardService = metadata.get_by_name(
entity=entity_class,
fqn=ingestion_pipeline.service.name,
nullable=False,
)
elif service_type == "messagingService":
entity_class = MessagingService
service: MessagingService = metadata.get_by_name(
entity=entity_class,
fqn=ingestion_pipeline.service.name,
nullable=False,
)
elif service_type == "mlmodelService":
entity_class = MlModelService
service: MlModelService = metadata.get_by_name(
entity=entity_class,
fqn=ingestion_pipeline.service.name,
nullable=False,
)
elif service_type == "metadataService":
entity_class = MetadataService
service: MetadataService = metadata.get_by_name(
entity=entity_class,
fqn=ingestion_pipeline.service.name,
nullable=False,
)
elif service_type == "storageService":
entity_class = StorageService
service: StorageService = metadata.get_by_name(
entity=entity_class,
fqn=ingestion_pipeline.service.name,
nullable=False,
)
else:
if entity_class is None:
raise InvalidServiceException(f"Invalid Service Type: {service_type}")

service = metadata.get_by_name(
entity=entity_class,
fqn=ingestion_pipeline.service.name,
nullable=False,
)

except ValidationError as original_error:
try:
resp = metadata.client.get(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{% step srNumber=7 %}

{% stepDescription title="7. Configure Metadata Ingestion" %}

In this step we will configure the metadata ingestion pipeline.
Please follow the instructions below.

{% /stepDescription %}

{% stepVisualInfo %}

{% image
src="/images/v1.2.0/connectors/configure-metadata-ingestion-search.png"
alt="Configure Metadata Ingestion"
caption="Configure Metadata Ingestion Page" /%}

{% /stepVisualInfo %}

{% /step %}

{% extraContent parentTagName="stepsContainer" %}

#### Metadata Ingestion Options

- **Name**: This field refers to the name of ingestion pipeline, you can customize the name or use the generated name.
- **Search Index Filter Pattern (Optional)**: Use search index filter patterns to control whether or not to include search indexes as part of metadata ingestion.
    - **Include**: Explicitly include search indexes by adding a list of comma-separated regular expressions to the Include field. OpenMetadata will include all search indexes with names matching one or more of the supplied regular expressions. All other search indexes will be excluded.
    - **Exclude**: Explicitly exclude search indexes by adding a list of comma-separated regular expressions to the Exclude field. OpenMetadata will exclude all search indexes with names matching one or more of the supplied regular expressions. All other search indexes will be included.
- **Include Sample Data (toggle)**: Set the Ingest Sample Data toggle to control whether to ingest sample data as part of metadata ingestion.
- **Sample Size**: If include sample data is enabled, 10 records will be ingested by default. Using this field you can customize the size of sample data.
- **Enable Debug Log (toggle)**: Set the Enable Debug Log toggle to set the default log level to debug.


{% /extraContent %}
4 changes: 4 additions & 0 deletions openmetadata-docs/content/v1.2.0-SNAPSHOT/connectors/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,7 @@ the following docs to run the Ingestion Framework in any orchestrator externally

- [Amundsen](/connectors/metadata/amundsen)
- [Atlas](/connectors/metadata/atlas)

## Search Services

- [ElasticSearch](/connectors/search/elasticsearch)
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
---
title: ElasticSearch
slug: /connectors/search/elasticsearch
---

# ElasticSearch

| Feature | Status |
|------------|------------------------------|
| Search Indexes | {% icon iconName="check" /%} |
| Sample Data | {% icon iconName="check" /%} |
| Supported Versions | ElasticSearch 7.0 and above |
| Stage | BETA |

In this section, we provide guides and references to use the ElasticSearch connector.

Configure and schedule ElasticSearch metadata workflow from the OpenMetadata UI:

- [Requirements](#requirements)
- [Metadata Ingestion](#metadata-ingestion)

{% partial file="/v1.2.0/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/search/elasticsearch/yaml"} /%}

## Requirements

We extract ElasticSearch's metadata by using its [API](https://www.elastic.co/guide/en/elasticsearch/reference/current/rest-apis.html). To run this ingestion, you just need a user with permissions to the ElasticSearch instance.


## Metadata Ingestion

{% partial
file="/v1.2.0/connectors/metadata-ingestion-ui.md"
variables={
connector: "ElasticSearch",
selectServicePath: "/images/v1.2.0/connectors/elasticsearch/select-service.png",
addNewServicePath: "/images/v1.2.0/connectors/elasticsearch/add-new-service.png",
serviceConnectionPath: "/images/v1.2.0/connectors/elasticsearch/service-connection.png",
}
/%}

{% stepsContainer %}
{% extraContent parentTagName="stepsContainer" %}

#### Connection Details

- **Host and Port**: This parameter specifies the host and port of the ElasticSearch instance. This should be specified as a URI string in the format `http://hostname:port` or `https://hostname:port`. For example, you might set it to `https://localhost:9200`.
- **Authentication Types**:
1. Basic Authentication
- Username: Username to connect to ElasticSearch required when Basic Authentication is enabled on ElasticSearch.
- Password: Password of the user account to connect with ElasticSearch.
2. API Key Authentication
- API Key: API Key to connect to ElasticSearch required when API Key Authentication is enabled on ElasticSearch.
  - API Key ID: Enter the API Key ID in case of API Key Authentication if there is an API Key ID associated with the API Key; otherwise this field can be left blank.
- **Client Certificate Path**: In case SSL is enabled on your ElasticSearch instance and a CA certificate is required for authentication, specify the path of the certificate in this field. NOTE: In case of a Docker deployment, you need to make this certificate accessible to the OpenMetadata Ingestion Docker container; you can do this by copying the certificate into the Docker container or storing it in the volume associated with the OpenMetadata Ingestion container.
- **Connection Timeout in Seconds**: Connection timeout configuration for communicating with ElasticSearch APIs.

{% /extraContent %}

{% partial file="/v1.2.0/connectors/test-connection.md" /%}

{% partial file="/v1.2.0/connectors/search/configure-ingestion.md" /%}

{% partial file="/v1.2.0/connectors/ingestion-schedule-and-deploy.md" /%}

{% /stepsContainer %}

{% partial file="/v1.2.0/connectors/troubleshooting.md" /%}
Loading

0 comments on commit 4e63387

Please sign in to comment.