Skip to content

Commit

Permalink
Merge branch 'main' into issue-11002-v1
Browse files Browse the repository at this point in the history
  • Loading branch information
harshach authored Dec 16, 2024
2 parents 85c9eb1 + 254fce4 commit 415d463
Show file tree
Hide file tree
Showing 61 changed files with 1,454 additions and 322 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Individual scopes

# Review from UI owners for changes around UI code
/openmetadata-ui/src/main/resources/ui/ @ShaileshParmar11 @karanh37 @chirag-madlani @Sachin-chaurasiya
/openmetadata-ui/src/main/resources/ui/ @open-metadata/ui

# Review from Backend owners for changes around Backend code
/openmetadata-service/ @open-metadata/backend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@
)
from metadata.ingestion.connections.test_connections import test_connection_db_common
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.oracle.queries import CHECK_ACCESS_TO_ALL
from metadata.ingestion.source.database.oracle.queries import (
CHECK_ACCESS_TO_ALL,
ORACLE_GET_SCHEMA,
ORACLE_GET_STORED_PACKAGES,
)
from metadata.utils.constants import THREE_MIN
from metadata.utils.logger import ingestion_logger

Expand Down Expand Up @@ -131,6 +135,12 @@ def get_connection(connection: OracleConnection) -> Engine:
)


class OraclePackageAccessError(Exception):
"""
Raised when the Oracle stored-package access check fails, for example
when the schema lookup needed to build the package query raises an error
"""


def test_connection(
metadata: OpenMetadata,
engine: Engine,
Expand All @@ -143,7 +153,19 @@ def test_connection(
of a metadata workflow or during an Automation Workflow
"""

test_conn_queries = {"CheckAccess": CHECK_ACCESS_TO_ALL}
def test_oracle_package_access(engine):
    """
    Build the query used for the "PackageAccess" connection test step.

    Looks up a schema name with ORACLE_GET_SCHEMA and substitutes it into
    the ``{schema}`` placeholder of ORACLE_GET_STORED_PACKAGES.

    Args:
        engine: Engine whose ``execute`` runs the schema lookup.

    Returns:
        The stored-packages query with the schema name filled in.

    Raises:
        OraclePackageAccessError: if the lookup or the formatting fails.
    """
    try:
        schema_name = engine.execute(ORACLE_GET_SCHEMA).scalar()
        return ORACLE_GET_STORED_PACKAGES.format(schema=schema_name)
    except Exception as exc:
        # Chain explicitly so the driver error is kept as __cause__
        # instead of showing up as an incidental "during handling" context.
        raise OraclePackageAccessError(
            f"Failed to access Oracle stored packages: {exc}"
        ) from exc

test_conn_queries = {
"CheckAccess": CHECK_ACCESS_TO_ALL,
"PackageAccess": test_oracle_package_access(engine),
}

return test_connection_db_common(
metadata=metadata,
Expand Down
55 changes: 36 additions & 19 deletions ingestion/src/metadata/ingestion/source/database/oracle/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from metadata.generated.schema.entity.data.storedProcedure import (
Language,
StoredProcedureCode,
StoredProcedureType,
)
from metadata.generated.schema.entity.data.table import TableType
from metadata.generated.schema.entity.services.connections.database.oracleConnection import (
Expand All @@ -45,10 +46,11 @@
TableNameAndType,
)
from metadata.ingestion.source.database.oracle.models import (
FetchProcedureList,
OracleStoredProcedure,
FetchObjectList,
OracleStoredObject,
)
from metadata.ingestion.source.database.oracle.queries import (
ORACLE_GET_STORED_PACKAGES,
ORACLE_GET_STORED_PROCEDURES,
)
from metadata.ingestion.source.database.oracle.utils import (
Expand Down Expand Up @@ -181,48 +183,63 @@ def get_schema_definition(
logger.warning(f"Failed to fetch Schema definition for {table_name}: {exc}")
return None

def process_result(self, data: FetchProcedureList):
def process_result(self, data: FetchObjectList):
"""Process data as per our stored procedure format"""
result_dict = {}

for row in data:
owner, name, line, text = row

owner, name, line, text, procedure_type = row
key = (owner, name)
if key not in result_dict:
result_dict[key] = {"lines": [], "text": ""}
result_dict[key] = {"lines": [], "text": "", "procedure_type": ""}
result_dict[key]["lines"].append(line)
result_dict[key]["text"] += text
result_dict[key]["procedure_type"] = procedure_type

# Return the concatenated text for each procedure name, ordered by line
return result_dict

def get_stored_procedures(self) -> Iterable[OracleStoredProcedure]:
def _get_stored_procedures_internal(
    self, query: str
) -> Iterable[OracleStoredObject]:
    """
    Run ``query`` against the current schema and yield one
    OracleStoredObject per (owner, name) pair found in the result.

    ``query`` must contain a ``{schema}`` placeholder; it is filled with the
    upper-cased schema name taken from the topology context.
    """
    schema_name = self.context.get().database_schema.upper()
    rows: FetchObjectList = self.engine.execute(
        query.format(schema=schema_name)
    ).all()
    # process_result groups the raw source lines by (owner, name)
    grouped = self.process_result(data=rows)
    for (owner, name), details in grouped.items():
        yield OracleStoredObject(
            name=name,
            definition=details["text"],
            owner=owner,
            procedure_type=details["procedure_type"],
        )

def get_stored_procedures(self) -> Iterable[OracleStoredObject]:
"""List Oracle Stored Procedures"""
if self.source_config.includeStoredProcedures:
results: FetchProcedureList = self.engine.execute(
ORACLE_GET_STORED_PROCEDURES.format(
schema=self.context.get().database_schema.upper()
)
).all()
results = self.process_result(data=results)
for row in results.items():
stored_procedure = OracleStoredProcedure(
name=row[0][1], definition=row[1]["text"], owner=row[0][0]
)
yield stored_procedure
yield from self._get_stored_procedures_internal(
ORACLE_GET_STORED_PROCEDURES
)
yield from self._get_stored_procedures_internal(ORACLE_GET_STORED_PACKAGES)

def yield_stored_procedure(
self, stored_procedure: OracleStoredProcedure
self, stored_procedure: OracleStoredObject
) -> Iterable[Either[CreateStoredProcedureRequest]]:
"""Prepare the stored procedure payload"""

try:
stored_procedure_request = CreateStoredProcedureRequest(
name=EntityName(stored_procedure.name),
storedProcedureCode=StoredProcedureCode(
language=Language.SQL,
code=stored_procedure.definition,
),
storedProcedureType=(
StoredProcedureType.StoredPackage
if stored_procedure.procedure_type == "StoredPackage"
else StoredProcedureType.StoredProcedure
),
owners=self.metadata.get_reference_by_name(
name=stored_procedure.owner.lower(), is_owner=True
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pydantic import BaseModel, Field


class OracleStoredProcedure(BaseModel):
class OracleStoredObject(BaseModel):
"""Oracle Stored Procedure list query results"""

name: str
Expand All @@ -15,9 +15,10 @@ class OracleStoredProcedure(BaseModel):
None, description="Will only be informed for non-SQL routines."
)
owner: str
procedure_type: Optional[str] = Field(None, alias="procedure_type")


class FetchProcedure(BaseModel):
class FetchObject(BaseModel):
"""Oracle Fetch Stored Procedure Raw Model"""

owner: Optional[str] = None
Expand All @@ -26,5 +27,5 @@ class FetchProcedure(BaseModel):
text: str


class FetchProcedureList(BaseModel):
__name__: List[FetchProcedure]
class FetchObjectList(BaseModel):
__name__: List[FetchObject]
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,34 @@
OWNER,
NAME,
LINE,
TEXT
TEXT,
'StoredProcedure' as procedure_type
FROM
DBA_SOURCE
WHERE
type = 'PROCEDURE' and owner = '{schema}'
"""
)
# Returns the alphabetically first username listed in ALL_USERS.
# The sort lives in an inline view because Oracle assigns ROWNUM before
# ORDER BY is applied: filtering on ROWNUM and sorting in the same query
# block would pick an arbitrary row first and then "sort" that single row.
ORACLE_GET_SCHEMA = """
SELECT SCHEMA_NAME
FROM (
    SELECT USERNAME AS SCHEMA_NAME
    FROM ALL_USERS
    ORDER BY USERNAME
)
WHERE ROWNUM = 1
"""
# Source lines of package specs and bodies (TYPE 'PACKAGE' / 'PACKAGE BODY')
# read from DBA_SOURCE for a single owner. Every row carries the constant
# 'StoredPackage' in the procedure_type column. The {schema} placeholder is
# filled in by callers through str.format(schema=...).
ORACLE_GET_STORED_PACKAGES = textwrap.dedent(
"""
SELECT
OWNER,
NAME,
LINE,
TEXT,
'StoredPackage' as procedure_type
FROM
DBA_SOURCE
WHERE TYPE IN ('PACKAGE', 'PACKAGE BODY') AND owner = '{schema}'
"""
)
# Probe query that reads at most one table_name from DBA_TABLES; it is the
# query registered under the "CheckAccess" key of the connection test.
CHECK_ACCESS_TO_ALL = "SELECT table_name FROM DBA_TABLES where ROWNUM < 2"
ORACLE_GET_STORED_PROCEDURE_QUERIES = textwrap.dedent(
"""
Expand Down
45 changes: 42 additions & 3 deletions ingestion/tests/unit/topology/database/test_oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.storedProcedure import StoredProcedureCode
from metadata.generated.schema.entity.data.storedProcedure import (
StoredProcedureCode,
StoredProcedureType,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
Expand All @@ -40,7 +43,7 @@
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.oracle.metadata import OracleSource
from metadata.ingestion.source.database.oracle.models import OracleStoredProcedure
from metadata.ingestion.source.database.oracle.models import OracleStoredObject

mock_oracle_config = {
"source": {
Expand Down Expand Up @@ -103,10 +106,18 @@
),
)

MOCK_STORED_PROCEDURE = OracleStoredProcedure(
MOCK_STORED_PROCEDURE = OracleStoredObject(
name="sample_procedure",
definition="SAMPLE_SQL_TEXT",
owner="sample_stored_prcedure_owner",
procedure_type="StoredProcedure",
)

# Stored-package fixture (procedure_type="StoredPackage") passed to
# yield_stored_procedure by test_yield_stored_package.
MOCK_STORED_PACKAGE = OracleStoredObject(
name="sample_package",
definition="SAMPLE_SQL_TEXT",
owner="sample_stored_package_owner",
procedure_type="StoredPackage",
)

EXPECTED_DATABASE = [
Expand Down Expand Up @@ -154,6 +165,28 @@
owners=None,
tags=None,
storedProcedureCode=StoredProcedureCode(language="SQL", code="SAMPLE_SQL_TEXT"),
storedProcedureType=StoredProcedureType.StoredProcedure,
databaseSchema=FullyQualifiedEntityName(
"oracle_source_test.sample_database.sample_schema"
),
extension=None,
dataProducts=None,
sourceUrl=None,
domain=None,
lifeCycle=None,
sourceHash=None,
)
]

EXPECTED_STORED_PACKAGE = [
CreateStoredProcedureRequest(
name=EntityName("sample_package"),
displayName=None,
description=None,
owners=None,
tags=None,
storedProcedureCode=StoredProcedureCode(language="SQL", code="SAMPLE_SQL_TEXT"),
storedProcedureType=StoredProcedureType.StoredPackage,
databaseSchema=FullyQualifiedEntityName(
"oracle_source_test.sample_database.sample_schema"
),
Expand Down Expand Up @@ -221,3 +254,9 @@ def test_yield_stored_procedure(self):
either.right
for either in self.oracle.yield_stored_procedure(MOCK_STORED_PROCEDURE)
]

# Feeds the package fixture through yield_stored_procedure and compares the
# `right` value of every yielded Either with EXPECTED_STORED_PACKAGE.
def test_yield_stored_package(self):
assert EXPECTED_STORED_PACKAGE == [
either.right
for either in self.oracle.yield_stored_procedure(MOCK_STORED_PACKAGE)
]
Loading

0 comments on commit 415d463

Please sign in to comment.