Skip to content

Commit

Permalink
Fix #12779: Add support for SearchIndexes for ElasticSearch and OpenSearch (#12782)
Browse files Browse the repository at this point in the history

* Fix #12779: Add support for SearchIndexes for ElasticSearch and OpenSearch

* Fix #12779: Add support for SearchIndexes for ElasticSearch and OpenSearch

* Fix #12779: Add support for SearchIndexes for ElasticSearch and OpenSearch

* Rebase fixes with main

* Add Sample Data

* lint fix

* remove unused import

* Fix service count test

---------

Co-authored-by: ulixius9 <mayursingal9@gmail.com>
  • Loading branch information
harshach and ulixius9 authored Aug 10, 2023
1 parent 60ff64c commit 7aaf654
Show file tree
Hide file tree
Showing 34 changed files with 3,129 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -1,27 +1,3 @@
-- Create the domain entity table (skipped when the table already exists).
-- The JSON document in the json column is the source of truth: id, name,
-- updatedAt and updatedBy are generated columns extracted from it with ->>,
-- which returns the unquoted value at the given JSON path.
CREATE TABLE IF NOT EXISTS domain_entity (
-- Only id is declared STORED. The other generated columns carry no STORED
-- keyword, so MySQL applies its default (VIRTUAL) to them.
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
-- fqnHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the fully qualified name -- confirm against the application code.
fqnHash VARCHAR(256) NOT NULL,
json JSON NOT NULL,
updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL,
PRIMARY KEY (id),
-- At most one row per fqnHash value.
UNIQUE (fqnHash)
);

-- Create the data product entity table (skipped when the table already exists).
-- The JSON document in the json column is the source of truth: id, name,
-- updatedAt and updatedBy are generated columns extracted from it with ->>,
-- which returns the unquoted value at the given JSON path.
CREATE TABLE IF NOT EXISTS data_product_entity (
-- Only id is declared STORED. The other generated columns carry no STORED
-- keyword, so MySQL applies its default (VIRTUAL) to them.
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
-- fqnHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the fully qualified name -- confirm against the application code.
fqnHash VARCHAR(256) NOT NULL,
json JSON NOT NULL,
updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL,
PRIMARY KEY (id),
-- At most one row per fqnHash value.
UNIQUE (fqnHash)
);

-- Rename includeTempTables with includeTransTables
UPDATE dbservice_entity
SET json = JSON_REMOVE(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,54 @@
-- query_entity and event_subscription_entity do not support soft delete, so
-- their deleted column is not needed and is dropped here.
-- NOTE(review): there is no existence guard, so each statement fails if the
-- column is already gone.
ALTER TABLE query_entity DROP COLUMN deleted;
ALTER TABLE event_subscription_entity DROP COLUMN deleted;

-- Create the domain entity table (skipped when the table already exists).
-- The JSON document in the json column is the source of truth: id, name,
-- updatedAt and updatedBy are generated columns extracted from it with ->>,
-- which returns the unquoted value at the given JSON path.
CREATE TABLE IF NOT EXISTS domain_entity (
-- Only id is declared STORED. The other generated columns carry no STORED
-- keyword, so MySQL applies its default (VIRTUAL) to them.
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
-- fqnHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the fully qualified name -- confirm against the application code.
fqnHash VARCHAR(256) NOT NULL,
json JSON NOT NULL,
updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL,
PRIMARY KEY (id),
-- At most one row per fqnHash value.
UNIQUE (fqnHash)
);

-- Create the data product entity table (skipped when the table already exists).
-- The JSON document in the json column is the source of truth: id, name,
-- updatedAt and updatedBy are generated columns extracted from it with ->>,
-- which returns the unquoted value at the given JSON path.
CREATE TABLE IF NOT EXISTS data_product_entity (
-- Only id is declared STORED. The other generated columns carry no STORED
-- keyword, so MySQL applies its default (VIRTUAL) to them.
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
-- fqnHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the fully qualified name -- confirm against the application code.
fqnHash VARCHAR(256) NOT NULL,
json JSON NOT NULL,
updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL,
PRIMARY KEY (id),
-- At most one row per fqnHash value.
UNIQUE (fqnHash)
);

-- Create the search service entity table (skipped when the table already exists).
-- The JSON document in the json column is the source of truth: id, name,
-- serviceType, updatedAt, updatedBy and deleted are generated columns
-- extracted from it by JSON path.
CREATE TABLE IF NOT EXISTS search_service_entity (
-- Only id is declared STORED. The other generated columns carry no STORED
-- keyword, so MySQL applies its default (VIRTUAL) to them.
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
-- nameHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the service name -- confirm against the application code.
nameHash VARCHAR(256) NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
serviceType VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.serviceType') NOT NULL,
json JSON NOT NULL,
updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL,
-- deleted uses -> (extraction without unquoting) and has no NOT NULL
-- constraint, so it is NULL when the document has no deleted key.
deleted BOOLEAN GENERATED ALWAYS AS (json -> '$.deleted'),
PRIMARY KEY (id),
-- At most one row per nameHash value.
UNIQUE (nameHash)
);

-- Create the search index entity table (skipped when the table already exists).
-- The JSON document in the json column is the source of truth: id, name,
-- updatedAt, updatedBy and deleted are generated columns extracted from it
-- by JSON path.
CREATE TABLE IF NOT EXISTS search_index_entity (
-- Only id is declared STORED. The other generated columns carry no STORED
-- keyword, so MySQL applies its default (VIRTUAL) to them.
id VARCHAR(36) GENERATED ALWAYS AS (json ->> '$.id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.name') NOT NULL,
-- fqnHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the fully qualified name -- confirm against the application code.
fqnHash VARCHAR(256) NOT NULL,
json JSON NOT NULL,
updatedAt BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.updatedAt') NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> '$.updatedBy') NOT NULL,
-- deleted uses -> (extraction without unquoting) and has no NOT NULL
-- constraint, so it is NULL when the document has no deleted key.
deleted BOOLEAN GENERATED ALWAYS AS (json -> '$.deleted'),
PRIMARY KEY (id),
-- At most one row per fqnHash value.
UNIQUE (fqnHash)
);
Original file line number Diff line number Diff line change
@@ -1,28 +1,4 @@
-- Create the domain entity table (skipped when the table already exists).
-- The JSONB document in the json column is the source of truth: id, name,
-- updatedAt and updatedBy are STORED generated columns extracted from it
-- with ->>, which returns the named field as text.
CREATE TABLE IF NOT EXISTS domain_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL,
-- fqnHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the fully qualified name -- confirm against the application code.
fqnHash VARCHAR(256) NOT NULL,
json JSONB NOT NULL,
-- The ->> operator yields text, hence the explicit cast to bigint.
updatedAt BIGINT GENERATED ALWAYS AS ((json ->> 'updatedAt')::bigint) STORED NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> 'updatedBy') STORED NOT NULL,
PRIMARY KEY (id),
-- At most one row per fqnHash value.
UNIQUE (fqnHash)
);

-- Create the data product entity table (skipped when the table already exists).
-- The JSONB document in the json column is the source of truth: id, name,
-- updatedAt and updatedBy are STORED generated columns extracted from it
-- with ->>, which returns the named field as text.
CREATE TABLE IF NOT EXISTS data_product_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL,
-- fqnHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the fully qualified name -- confirm against the application code.
fqnHash VARCHAR(256) NOT NULL,
json JSONB NOT NULL,
-- The ->> operator yields text, hence the explicit cast to bigint.
updatedAt BIGINT GENERATED ALWAYS AS ((json ->> 'updatedAt')::bigint) STORED NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> 'updatedBy') STORED NOT NULL,
PRIMARY KEY (id),
-- At most one row per fqnHash value.
UNIQUE (fqnHash)
);

-- Rename includeTempTables in snowflake to includeTransientTables
-- Rename includeTempTables in snowflake to includeTransientTables

UPDATE dbservice_entity
SET json = jsonb_set(json::jsonb #- '{connection,config,includeTempTables}', '{connection,config,includeTransientTables}',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,54 @@
-- query_entity and event_subscription_entity do not support soft delete, so
-- their deleted column is not needed and is dropped here.
-- NOTE(review): there is no existence guard, so each statement fails if the
-- column is already gone.
ALTER TABLE query_entity DROP COLUMN deleted;
ALTER TABLE event_subscription_entity DROP COLUMN deleted;

-- Create the domain entity table (skipped when the table already exists).
-- The JSONB document in the json column is the source of truth: id, name,
-- updatedAt and updatedBy are STORED generated columns extracted from it
-- with ->>, which returns the named field as text.
CREATE TABLE IF NOT EXISTS domain_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL,
-- fqnHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the fully qualified name -- confirm against the application code.
fqnHash VARCHAR(256) NOT NULL,
json JSONB NOT NULL,
-- The ->> operator yields text, hence the explicit cast to bigint.
updatedAt BIGINT GENERATED ALWAYS AS ((json ->> 'updatedAt')::bigint) STORED NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> 'updatedBy') STORED NOT NULL,
PRIMARY KEY (id),
-- At most one row per fqnHash value.
UNIQUE (fqnHash)
);

-- Create the data product entity table (skipped when the table already exists).
-- The JSONB document in the json column is the source of truth: id, name,
-- updatedAt and updatedBy are STORED generated columns extracted from it
-- with ->>, which returns the named field as text.
CREATE TABLE IF NOT EXISTS data_product_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL,
-- fqnHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the fully qualified name -- confirm against the application code.
fqnHash VARCHAR(256) NOT NULL,
json JSONB NOT NULL,
-- The ->> operator yields text, hence the explicit cast to bigint.
updatedAt BIGINT GENERATED ALWAYS AS ((json ->> 'updatedAt')::bigint) STORED NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> 'updatedBy') STORED NOT NULL,
PRIMARY KEY (id),
-- At most one row per fqnHash value.
UNIQUE (fqnHash)
);

-- Create the search service entity table (skipped when the table already exists).
-- The JSONB document in the json column is the source of truth: id, name,
-- serviceType, updatedAt, updatedBy and deleted are STORED generated columns
-- extracted from it with ->>, which returns the named field as text.
CREATE TABLE IF NOT EXISTS search_service_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
-- nameHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the service name -- confirm against the application code.
nameHash VARCHAR(256) NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL,
serviceType VARCHAR(256) GENERATED ALWAYS AS (json ->> 'serviceType') STORED NOT NULL,
json JSONB NOT NULL,
-- The ->> operator yields text, hence the explicit cast to bigint.
updatedAt BIGINT GENERATED ALWAYS AS ((json ->> 'updatedAt')::bigint) STORED NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> 'updatedBy') STORED NOT NULL,
-- deleted has no NOT NULL constraint, so it is NULL when the document has
-- no deleted key. The text value is cast to boolean.
deleted BOOLEAN GENERATED ALWAYS AS ((json ->> 'deleted')::boolean) STORED,
PRIMARY KEY (id),
-- At most one row per nameHash value.
UNIQUE (nameHash)
);

-- Create the search index entity table (skipped when the table already exists).
-- The JSONB document in the json column is the source of truth: id, name,
-- updatedAt, updatedBy and deleted are STORED generated columns extracted
-- from it with ->>, which returns the named field as text.
CREATE TABLE IF NOT EXISTS search_index_entity (
id VARCHAR(36) GENERATED ALWAYS AS (json ->> 'id') STORED NOT NULL,
name VARCHAR(256) GENERATED ALWAYS AS (json ->> 'name') STORED NOT NULL,
-- fqnHash is a regular column supplied by the writer, not derived from json.
-- Presumably a hash of the fully qualified name -- confirm against the application code.
fqnHash VARCHAR(256) NOT NULL,
json JSONB NOT NULL,
-- The ->> operator yields text, hence the explicit cast to bigint.
updatedAt BIGINT GENERATED ALWAYS AS ((json ->> 'updatedAt')::bigint) STORED NOT NULL,
updatedBy VARCHAR(256) GENERATED ALWAYS AS (json ->> 'updatedBy') STORED NOT NULL,
-- deleted has no NOT NULL constraint, so it is NULL when the document has
-- no deleted key. The text value is cast to boolean.
deleted BOOLEAN GENERATED ALWAYS AS ((json ->> 'deleted')::boolean) STORED,
PRIMARY KEY (id),
-- At most one row per fqnHash value.
UNIQUE (fqnHash)
);
76 changes: 76 additions & 0 deletions ingestion/examples/sample_data/searchIndexes/searchIndexes.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{
"searchIndexes": [
{
"id": "e093dd27-390e-4360-8efd-e4d63ec167a9",
"name": "table_search_index",
"displayName": "TableSearchIndex",
"fullyQualifiedName": "elasticsearch_sample.table_search_index",
"description": "Table Search Index",
"version": 0.1,
"updatedAt": 1638354087591,
"serviceType": "ElasticSearch",
"fields": [
{
"name": "name",
"dataType": "TEXT",
"dataTypeDisplay": "text",
"description": "Table Entity Name.",
"tags": []
},
{
"name": "displayName",
"dataType": "TEXT",
"dataTypeDisplay": "text",
"description": "Table Entity DisplayName.",
"tags": []
},
{
"name": "description",
"dataType": "TEXT",
"dataTypeDisplay": "text",
"description": "Table Entity Description.",
"tags": []
},
{
"name": "columns",
"dataType": "NESTED",
"dataTypeDisplay": "nested",
"description": "Table Columns.",
"tags": [],
"children": [
{
"name": "name",
"dataType": "TEXT",
"dataTypeDisplay": "text",
"description": "Column Name.",
"tags": []
},
{
"name": "displayName",
"dataType": "TEXT",
"dataTypeDisplay": "text",
"description": "Column DisplayName.",
"tags": []
},
{
"name": "description",
"dataType": "TEXT",
"dataTypeDisplay": "text",
"description": "Column Description.",
"tags": []
}
]
},
{
"name": "databaseSchema",
"dataType": "TEXT",
"dataTypeDisplay": "text",
"description": "Database Schema that this table belongs to.",
"tags": []
}
],
"tags": [],
"followers": []
}
]
}
12 changes: 12 additions & 0 deletions ingestion/examples/sample_data/searchIndexes/service.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"type": "elasticsearch",
"serviceName": "elasticsearch_sample",
"serviceConnection": {
"config": {
"type": "ElasticSearch",
"hostPort": "localhost:9200"
}
},
"sourceConfig": {
}
}
10 changes: 10 additions & 0 deletions ingestion/src/metadata/ingestion/api/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
PipelineConnection,
PipelineServiceType,
)
from metadata.generated.schema.entity.services.searchService import (
SearchConnection,
SearchServiceType,
)
from metadata.generated.schema.entity.services.storageService import (
StorageConnection,
StorageServiceType,
Expand Down Expand Up @@ -74,6 +78,10 @@
PipelineMetadataConfigType,
PipelineServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.searchServiceMetadataPipeline import (
SearchMetadataConfigType,
SearchServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.storageServiceMetadataPipeline import (
StorageMetadataConfigType,
StorageServiceMetadataPipeline,
Expand Down Expand Up @@ -102,6 +110,7 @@
**{service: PipelineConnection for service in PipelineServiceType.__members__},
**{service: MlModelConnection for service in MlModelServiceType.__members__},
**{service: StorageConnection for service in StorageServiceType.__members__},
**{service: SearchConnection for service in SearchServiceType.__members__},
}

SOURCE_CONFIG_CLASS_MAP = {
Expand All @@ -113,6 +122,7 @@
MlModelMetadataConfigType.MlModelMetadata.value: MlModelServiceMetadataPipeline,
DatabaseMetadataConfigType.DatabaseMetadata.value: DatabaseServiceMetadataPipeline,
StorageMetadataConfigType.StorageMetadata.value: StorageServiceMetadataPipeline,
SearchMetadataConfigType.SearchMetadata.value: SearchServiceMetadataPipeline,
}


Expand Down
15 changes: 11 additions & 4 deletions ingestion/src/metadata/ingestion/ometa/ometa_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.query import Query
from metadata.generated.schema.entity.data.report import Report
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.policies.policy import Policy
Expand All @@ -67,6 +68,7 @@
from metadata.generated.schema.entity.services.metadataService import MetadataService
from metadata.generated.schema.entity.services.mlmodelService import MlModelService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.services.searchService import SearchService
from metadata.generated.schema.entity.services.storageService import StorageService
from metadata.generated.schema.entity.teams.role import Role
from metadata.generated.schema.entity.teams.team import Team
Expand Down Expand Up @@ -358,6 +360,12 @@ def get_suffix(self, entity: Type[T]) -> str: # pylint: disable=R0911,R0912
):
return "/containers"

if issubclass(
entity,
get_args(Union[SearchIndex, self.get_create_entity_type(SearchIndex)]),
):
return "/searchIndexes"

if issubclass(
entity, get_args(Union[Workflow, self.get_create_entity_type(Workflow)])
):
Expand Down Expand Up @@ -422,11 +430,9 @@ def get_suffix(self, entity: Type[T]) -> str: # pylint: disable=R0911,R0912

if issubclass(
entity,
get_args(
Union[StorageService, self.get_create_entity_type(StorageService)]
),
get_args(Union[SearchService, self.get_create_entity_type(SearchService)]),
):
return "/services/storageServices"
return "/services/searchServices"

if issubclass(
entity,
Expand Down Expand Up @@ -533,6 +539,7 @@ def get_entity_from_create(self, create: Type[C]) -> Type[T]:
.replace("testsuite", "testSuite")
.replace("testdefinition", "testDefinition")
.replace("testcase", "testCase")
.replace("searchindex", "searchIndex")
)

class_path = ".".join(
Expand Down
3 changes: 3 additions & 0 deletions ingestion/src/metadata/ingestion/ometa/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def format_name(name: str) -> str:
return re.sub(r"[" + subs + "]", "_", name)


# pylint: disable=too-many-return-statements
def get_entity_type(
entity: Union[Type[T], str],
) -> str:
Expand All @@ -54,6 +55,8 @@ def get_entity_type(
return class_name.replace("testsuite", "testSuite")
if "databaseschema" in class_name:
return class_name.replace("databaseschema", "databaseSchema")
if "searchindex" in class_name:
return class_name.replace("searchindex", "searchIndex")

return class_name

Expand Down
Loading

0 comments on commit 7aaf654

Please sign in to comment.