From 6351f91b6e5bece7fab38a5ead8dd9f619284060 Mon Sep 17 00:00:00 2001 From: reata Date: Mon, 1 May 2023 15:11:08 +0800 Subject: [PATCH 01/10] fix: missing column lineage for select distinct (#365) --- sqllineage/core/parser/sqlparse/handlers/source.py | 4 ++++ tests/test_columns.py | 10 ++++++++++ 2 files changed, 14 insertions(+) diff --git a/sqllineage/core/parser/sqlparse/handlers/source.py b/sqllineage/core/parser/sqlparse/handlers/source.py index 18c03f52..11210180 100644 --- a/sqllineage/core/parser/sqlparse/handlers/source.py +++ b/sqllineage/core/parser/sqlparse/handlers/source.py @@ -47,6 +47,10 @@ def _indicate(self, token: Token) -> bool: if token.normalized in ("UNION", "UNION ALL"): self.union_barriers.append((len(self.columns), len(self.tables))) + if self.column_flag is True and bool(token.normalized == "DISTINCT"): + # special handling for SELECT DISTINCT + return True + if any(re.match(regex, token.normalized) for regex in self.SOURCE_TABLE_TOKENS): self.column_flag = False return True diff --git a/tests/test_columns.py b/tests/test_columns.py index d777bf4c..1b085c00 100644 --- a/tests/test_columns.py +++ b/tests/test_columns.py @@ -52,6 +52,16 @@ def test_select_column_wildcard(): ) +def test_select_distinct_column(): + sql = """INSERT INTO tab1 +SELECT DISTINCT col1 +FROM tab2""" + assert_column_lineage_equal( + sql, + [(ColumnQualifierTuple("col1", "tab2"), ColumnQualifierTuple("col1", "tab1"))], + ) + + def test_select_column_using_function(): sql = """INSERT INTO tab1 SELECT max(col1), From c568aed460da3ec72443e57c806ea36781f8bf0e Mon Sep 17 00:00:00 2001 From: reata Date: Mon, 1 May 2023 15:45:04 +0800 Subject: [PATCH 02/10] fix: parenthesis around arithmetic operation (#366) --- .../core/parser/sqlparse/handlers/source.py | 2 +- tests/test_columns.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sqllineage/core/parser/sqlparse/handlers/source.py b/sqllineage/core/parser/sqlparse/handlers/source.py index 11210180..f7cc0a3f 100644 --- a/sqllineage/core/parser/sqlparse/handlers/source.py +++ b/sqllineage/core/parser/sqlparse/handlers/source.py @@ -96,7 +96,7 @@ def _handle_table(self, token: Token, holder: SubQueryLineageHolder) -> None: ) def _handle_column(self, token: Token) -> None: - column_token_types = (Identifier, Function, Operation, Case) + column_token_types = (Identifier, Function, Operation, Case, Parenthesis) if isinstance(token, column_token_types) or token.ttype is Wildcard: column_tokens = [token] elif isinstance(token, IdentifierList): diff --git a/tests/test_columns.py b/tests/test_columns.py index 1b085c00..fcf0a598 100644 --- a/tests/test_columns.py +++ b/tests/test_columns.py @@ -223,6 +223,22 @@ def test_select_column_using_expression(): def test_select_column_using_expression_in_parenthesis(): sql = """INSERT INTO tab1 +SELECT (col1 + col2) +FROM tab2""" + assert_column_lineage_equal( + sql, + [ + ( + ColumnQualifierTuple("col1", "tab2"), + ColumnQualifierTuple("(col1 + col2)", "tab1"), + ), + ( + ColumnQualifierTuple("col2", "tab2"), + ColumnQualifierTuple("(col1 + col2)", "tab1"), + ), + ], + ) + sql = """INSERT INTO tab1 SELECT (col1 + col2) AS col3 FROM tab2""" assert_column_lineage_equal( From c7be506430fb3d3bcbfc949d62fb16fd0ed6f757 Mon Sep 17 00:00:00 2001 From: reata Date: Mon, 1 May 2023 17:15:10 +0800 Subject: [PATCH 03/10] feat: support postgres style type cast (#367) --- sqllineage/core/parser/sqlfluff/models.py | 18 +++++++++++++++++- tests/test_columns.py | 10 ++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sqllineage/core/parser/sqlfluff/models.py b/sqllineage/core/parser/sqlfluff/models.py index 2e52eaac..ecd4e4a6 100644 --- a/sqllineage/core/parser/sqlfluff/models.py +++ b/sqllineage/core/parser/sqlfluff/models.py @@ -24,6 +24,7 @@ "when_clause", "else_clause", "select_clause_element", + "cast_expression", ] SOURCE_COLUMN_SEGMENT_TYPE = NON_IDENTIFIER_OR_COLUMN_SEGMENT_TYPE + [ @@ -113,7 +114,22 @@ def of(column: BaseSegment, **kwargs) -> Column: for sub_segment in sub_segments: if sub_segment.type == "column_reference": column_name = get_identifier(sub_segment) - + elif sub_segment.type == "expression": + # special handling for postgres style type cast, col as target column name instead of col::type + if len(sub2_segments := retrieve_segments(sub_segment)) == 1: + if ( + sub2_segment := sub2_segments[0] + ).type == "cast_expression": + if ( + len( + sub3_segments := retrieve_segments(sub2_segment) + ) + == 2 + ): + if ( + sub3_segment := sub3_segments[0] + ).type == "column_reference": + column_name = get_identifier(sub3_segment) return Column( column.raw if column_name is None else column_name, source_columns=source_columns, diff --git a/tests/test_columns.py b/tests/test_columns.py index fcf0a598..d4bba137 100644 --- a/tests/test_columns.py +++ b/tests/test_columns.py @@ -734,6 +734,16 @@ def test_cast_using_constant(): assert_column_lineage_equal(sql) +def test_postgres_style_type_cast(): + sql = """INSERT INTO tab1 +SELECT col1::timestamp +FROM tab2""" + assert_column_lineage_equal( + sql, + [(ColumnQualifierTuple("col1", "tab2"), ColumnQualifierTuple("col1", "tab1"))], + ) + + def test_window_function_in_subquery(): sql = """INSERT INTO tab1 SELECT rn FROM ( From c6488ddff010da5275a05d12c4fb05a6f453e3e6 Mon Sep 17 00:00:00 2001 From: reata Date: Sat, 13 May 2023 20:54:56 +0800 Subject: [PATCH 04/10] fix: CTE at the start of query in DML (#369) --- .../parser/sqlfluff/extractors/dml_insert_extractor.py | 8 ++++++++ tests/test_cte.py | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py b/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py index 3d3a4757..dd2202af 100644 --- a/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py +++ b/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py @@ -59,6 +59,14 @@ def extract( for current_handler in handlers: current_handler.handle(segment, holder) + if segment.type == "with_compound_statement": + from .cte_extractor import DmlCteExtractor + + holder |= DmlCteExtractor(self.dialect).extract( + segment, + AnalyzerContext(prev_cte=holder.cte, prev_write=holder.write), + ) + if segment.type == "select_statement": holder |= DmlSelectExtractor(self.dialect).extract( segment, diff --git a/tests/test_cte.py b/tests/test_cte.py index 1fd979f5..19345050 100644 --- a/tests/test_cte.py +++ b/tests/test_cte.py @@ -54,3 +54,11 @@ def test_with_insert(): {"tab2"}, {"tab3"}, ) + + +def test_with_insert_in_query(): + assert_table_lineage_equal( + "INSERT INTO tab3 WITH tab1 AS (SELECT * FROM tab2) SELECT * FROM tab1", + {"tab2"}, + {"tab3"}, + ) From 8882d4afa29a87e1bcef0b688d7035bcbbab4fd6 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Mon, 29 May 2023 20:14:55 +0530 Subject: [PATCH 05/10] support create table..clone and alter table...swap (#372) * support create table..clone and alter table...swap * fix test name * add bigquery test * address review comments * merge tests * black * docs: add comment for skipping sqlparse test --------- Co-authored-by: reata --- .../extractors/ddl_alter_extractor.py | 7 +++- .../core/parser/sqlfluff/handlers/target.py | 14 ++++---- tests/test_others_dialect_specific.py | 32 +++++++++++++++++++ 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/sqllineage/core/parser/sqlfluff/extractors/ddl_alter_extractor.py b/sqllineage/core/parser/sqlfluff/extractors/ddl_alter_extractor.py index 83a8a217..30d6ccdd 100644 --- a/sqllineage/core/parser/sqlfluff/extractors/ddl_alter_extractor.py +++ b/sqllineage/core/parser/sqlfluff/extractors/ddl_alter_extractor.py @@ -13,6 +13,8 @@ class DdlAlterExtractor(LineageHolderExtractor): DDL Alter queries lineage extractor """ + SOURCE_KEYWORDS = {"EXCHANGE", "SWAP"} + SUPPORTED_STMT_TYPES = [ "alter_table_statement", "rename_statement", @@ -44,7 +46,10 @@ def extract( if any(k.raw_upper == "RENAME" for k in keywords): if statement.type == "alter_table_statement" and len(tables) == 2: holder.add_rename(tables[0], tables[1]) - if any(k.raw_upper == "EXCHANGE" for k in keywords) and len(tables) == 2: + if ( + any(k.raw_upper in self.SOURCE_KEYWORDS for k in keywords) + and len(tables) == 2 + ): holder.add_write(tables[0]) holder.add_read(tables[1]) return holder diff --git a/sqllineage/core/parser/sqlfluff/handlers/target.py b/sqllineage/core/parser/sqlfluff/handlers/target.py index e4d3aa4e..a1a1052e 100644 --- a/sqllineage/core/parser/sqlfluff/handlers/target.py +++ b/sqllineage/core/parser/sqlfluff/handlers/target.py @@ -22,7 +22,7 @@ class TargetHandler(ConditionalSegmentBaseHandler): def __init__(self) -> None: self.indicator = False - self.prev_token_like = False + self.prev_token_read = False self.prev_token_from = False TARGET_KEYWORDS = ( @@ -35,7 +35,7 @@ def __init__(self) -> None: "DIRECTORY", ) - LIKE_KEYWORD = "LIKE" + READ_KEYWORDS = {"LIKE", "CLONE"} FROM_KEYWORD = "FROM" @@ -44,17 +44,17 @@ def _init_tokens(self, segment: BaseSegment) -> None: Check if the segment is a 'LIKE' or 'FROM' keyword :param segment: segment to be use for checking """ - if segment.raw_upper == self.LIKE_KEYWORD: - self.prev_token_like = True + if segment.raw_upper in self.READ_KEYWORDS: + self.prev_token_read = True if segment.raw_upper == self.FROM_KEYWORD: self.prev_token_from = True def _reset_tokens(self) -> None: """ - Set 'prev_token_like' and 'prev_token_like' variable to False + Set 'prev_token_from' and 'prev_token_read' variable to False """ - self.prev_token_like = False + self.prev_token_read = False self.prev_token_from = False def indicate(self, segment: BaseSegment) -> bool: @@ -81,7 +81,7 @@ def handle(self, segment: BaseSegment, holder: SubQueryLineageHolder) -> None: """ if segment.type == "table_reference": write_obj = SqlFluffTable.of(segment) - if self.prev_token_like: + if self.prev_token_read: holder.add_read(write_obj) else: holder.add_write(write_obj) diff --git a/tests/test_others_dialect_specific.py b/tests/test_others_dialect_specific.py index 3ca5c9b7..0aac6d03 100644 --- a/tests/test_others_dialect_specific.py +++ b/tests/test_others_dialect_specific.py @@ -94,6 +94,38 @@ def test_alter_table_exchange_partition(dialect: str): ) +@pytest.mark.parametrize("dialect", ["snowflake", "bigquery"]) +def test_create_clone(dialect: str): + """ + Language manual: + https://cloud.google.com/bigquery/docs/table-clones-create + https://docs.snowflake.com/en/sql-reference/sql/create-clone + Note clone is not a keyword in sqlparse, we'll skip testing for it. + """ + assert_table_lineage_equal( + "create table tab2 CLONE tab1;", + {"tab1"}, + {"tab2"}, + dialect=dialect, + test_sqlparse=False, + ) + + +@pytest.mark.parametrize("dialect", ["snowflake"]) +def test_alter_table_swap_partition(dialect: str): + """ + See https://docs.snowflake.com/en/sql-reference/sql/alter-table for language manual + Note swap is not a keyword in sqlparse, we'll skip testing for it. + """ + assert_table_lineage_equal( + "alter table tab1 swap with tab2", + {"tab2"}, + {"tab1"}, + dialect=dialect, + test_sqlparse=False, + ) + + @pytest.mark.parametrize("dialect", ["databricks", "sparksql"]) def test_refresh_table(dialect: str): assert_table_lineage_equal("refresh table tab1", None, None, dialect) From 10bec6d1a40f2c68c826ddccd1a96d9853c41440 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Sun, 11 Jun 2023 17:08:45 +0530 Subject: [PATCH 06/10] feat: Improve column level lineage for insert & create queries (sqlfluff) (#371) * Improve column level lineage for insert & create queries * fix format * fix flake * fix typing * fix black * fix type * Address review comments: add target_column as property and add target columns as graph edge * fix mypy: handle subquery type * refactor: simplify SourceHandlerMixin logic * refactor: simplify index store and query * refactor: simplify DmlInsertExtractor * test: move sqlfluff specific test cases to test_sqlfluff.py * test: add a parenthesized union test case --------- Co-authored-by: reata --- sqllineage/core/holders.py | 33 ++++- sqllineage/core/models.py | 9 +- sqllineage/core/parser/__init__.py | 12 +- .../extractors/dml_insert_extractor.py | 84 ++++++------ .../extractors/lineage_holder_extractor.py | 4 + .../core/parser/sqlfluff/handlers/target.py | 16 +++ sqllineage/utils/constant.py | 4 + tests/test_insert.py | 18 +-- tests/test_others.py | 10 -- tests/test_sqlfluff.py | 128 ++++++++++++++++++ 10 files changed, 243 insertions(+), 75 deletions(-) diff --git a/sqllineage/core/holders.py b/sqllineage/core/holders.py index 7434522c..24837cfc 100644 --- a/sqllineage/core/holders.py +++ b/sqllineage/core/holders.py @@ -1,11 +1,11 @@ import itertools -from typing import Set, Tuple, Union +from typing import List, Set, Tuple, Union import networkx as nx from networkx import DiGraph from sqllineage.core.models import Column, Path, SubQuery, Table -from sqllineage.utils.constant import EdgeType, NodeTag +from sqllineage.utils.constant import EdgeTag, EdgeType, NodeTag DATASET_CLASSES = (Path, Table) @@ -84,6 +84,35 @@ def cte(self) -> Set[SubQuery]: def add_cte(self, value) -> None: self._property_setter(value, NodeTag.CTE) + @property + def target_columns(self) -> List[Column]: + """ + in case of DML with column specified, like INSERT INTO tab1 (col1, col2) SELECT ... + target_columns tell us that tab1 has column col1 and col2 in that order. + """ + tgt_cols = [] + if self.write: + tgt_tbl = list(self.write)[0] + tgt_col_with_idx: List[Tuple[Column, int]] = sorted( + [ + (col, attr.get(EdgeTag.INDEX, 0)) + for tbl, col, attr in self.graph.out_edges(tgt_tbl, data=True) + if attr["type"] == EdgeType.HAS_COLUMN + ], + key=lambda x: x[1], + ) + tgt_cols = [x[0] for x in tgt_col_with_idx] + return tgt_cols + + def add_target_column(self, *tgt_cols: Column) -> None: + if self.write: + tgt_tbl = list(self.write)[0] + for idx, tgt_col in enumerate(tgt_cols): + tgt_col.parent = tgt_tbl + self.graph.add_edge( + tgt_tbl, tgt_col, type=EdgeType.HAS_COLUMN, **{EdgeTag.INDEX: idx} + ) + def add_column_lineage(self, src: Column, tgt: Column) -> None: self.graph.add_edge(src, tgt, type=EdgeType.LINEAGE) self.graph.add_edge(tgt.parent, tgt, type=EdgeType.HAS_COLUMN) diff --git a/sqllineage/core/models.py b/sqllineage/core/models.py index 4df575c8..c5eb25f2 100644 --- a/sqllineage/core/models.py +++ b/sqllineage/core/models.py @@ -225,21 +225,22 @@ class AnalyzerContext: Data class to hold the analyzer context """ - subquery: Optional[SubQuery] - prev_cte: Optional[Set[SubQuery]] - prev_write: Optional[Set[Union[SubQuery, Table]]] - def __init__( self, subquery: Optional[SubQuery] = None, prev_cte: Optional[Set[SubQuery]] = None, prev_write: Optional[Set[Union[SubQuery, Table]]] = None, + target_columns=None, ): """ :param subquery: subquery :param prev_cte: previous CTE queries :param prev_write: previous written tables + :param target_columns: previous target columns """ + if target_columns is None: + target_columns = [] self.subquery = subquery self.prev_cte = prev_cte self.prev_write = prev_write + self.target_columns = target_columns diff --git a/sqllineage/core/parser/__init__.py b/sqllineage/core/parser/__init__.py index 3b2a1422..2719084e 100644 --- a/sqllineage/core/parser/__init__.py +++ b/sqllineage/core/parser/__init__.py @@ -21,17 +21,23 @@ def end_of_query_cleanup(self, holder: SubQueryLineageHolder) -> None: ) col_grp = self.columns[prev_col_barrier:col_barrier] tbl_grp = self.tables[prev_tbl_barrier:tbl_barrier] - tgt_tbl = None if holder.write: if len(holder.write) > 1: raise SQLLineageException tgt_tbl = list(holder.write)[0] - if tgt_tbl: - for tgt_col in col_grp: + for idx in range(len(col_grp)): + tgt_col = col_grp[idx] tgt_col.parent = tgt_tbl for src_col in tgt_col.to_source_columns( self.get_alias_mapping_from_table_group(tbl_grp, holder) ): + if len(target_columns := holder.target_columns) == len(col_grp): + # example query: create view test (col3) select col1 as col2 from tab + # without target_columns = [col3] information, by default src_col = col1 and tgt_col = col2 + # when target_columns exist and length matches, we want tgt_col = col3 instead of col2 + # for invalid query: create view test (col3, col4) select col1 as col2 from tab, + # when the length doesn't match, we fall back to default behavior + tgt_col = target_columns[idx] holder.add_column_lineage(src_col, tgt_col) @classmethod diff --git a/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py b/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py index dd2202af..4e4943c1 100644 --- a/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py +++ b/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py @@ -1,3 +1,5 @@ +from typing import Optional + from sqlfluff.core.parser import BaseSegment from sqllineage.core.holders import SubQueryLineageHolder @@ -9,7 +11,7 @@ LineageHolderExtractor, ) from sqllineage.core.parser.sqlfluff.models import SqlFluffSubQuery -from sqllineage.core.parser.sqlfluff.utils import retrieve_segments +from sqllineage.core.parser.sqlfluff.utils import get_child, is_union, retrieve_segments class DmlInsertExtractor(LineageHolderExtractor): @@ -48,14 +50,8 @@ def extract( holder = self._init_holder(context) - subqueries = [] segments = retrieve_segments(statement) for segment in segments: - for sq in self.parse_subquery(segment): - # Collecting subquery on the way, hold on parsing until last - # so that each handler don't have to worry about what's inside subquery - subqueries.append(sq) - for current_handler in handlers: current_handler.handle(segment, holder) @@ -66,40 +62,46 @@ def extract( segment, AnalyzerContext(prev_cte=holder.cte, prev_write=holder.write), ) + elif segment.type == "bracketed" and ( + self.parse_subquery(segment) or is_union(segment) + ): + # note regular subquery within SELECT statement is handled by DmlSelectExtractor, this is only to handle + # top-level subquery in DML like: 1) create table foo as (subquery); 2) insert into foo (subquery) + # subquery here isn't added as read source, and it inherits DML-level target_columns if parsed + if subquery_segment := get_child(segment, "select_statement"): + self._extract_select(holder, subquery_segment) + elif subquery_segment := get_child(segment, "set_expression"): + self._extract_set(holder, subquery_segment) + elif segment.type == "select_statement": + self._extract_select(holder, segment) + elif segment.type == "set_expression": + self._extract_set(holder, segment) + else: + for conditional_handler in conditional_handlers: + if conditional_handler.indicate(segment): + conditional_handler.handle(segment, holder) - if segment.type == "select_statement": - holder |= DmlSelectExtractor(self.dialect).extract( - segment, - AnalyzerContext( - SqlFluffSubQuery.of(segment, None), - prev_cte=holder.cte, - prev_write=holder.write, - ), - ) - continue - - if segment.type == "set_expression": - sub_segments = retrieve_segments(segment) - for sub_segment in sub_segments: - if sub_segment.type == "select_statement": - holder |= DmlSelectExtractor(self.dialect).extract( - sub_segment, - AnalyzerContext( - SqlFluffSubQuery.of(segment, None), - prev_cte=holder.cte, - prev_write=holder.write, - ), - ) - continue - - for conditional_handler in conditional_handlers: - if conditional_handler.indicate(segment): - conditional_handler.handle(segment, holder) + return holder - # By recursively extracting each subquery of the parent and merge, we're doing Depth-first search - for sq in subqueries: - holder |= DmlSelectExtractor(self.dialect).extract( - sq.query, AnalyzerContext(sq, holder.cte) - ) + def _extract_set(self, holder: SubQueryLineageHolder, set_segment: BaseSegment): + for sub_segment in retrieve_segments(set_segment): + if sub_segment.type == "select_statement": + self._extract_select(holder, sub_segment, set_segment) - return holder + def _extract_select( + self, + holder: SubQueryLineageHolder, + select_segment: BaseSegment, + set_segment: Optional[BaseSegment] = None, + ): + holder |= DmlSelectExtractor(self.dialect).extract( + select_segment, + AnalyzerContext( + SqlFluffSubQuery.of( + set_segment if set_segment else select_segment, None + ), + prev_cte=holder.cte, + prev_write=holder.write, + target_columns=holder.target_columns, + ), + ) diff --git a/sqllineage/core/parser/sqlfluff/extractors/lineage_holder_extractor.py b/sqllineage/core/parser/sqlfluff/extractors/lineage_holder_extractor.py index b6ec5ea0..337a56b4 100644 --- a/sqllineage/core/parser/sqlfluff/extractors/lineage_holder_extractor.py +++ b/sqllineage/core/parser/sqlfluff/extractors/lineage_holder_extractor.py @@ -130,4 +130,8 @@ def _init_holder(context: AnalyzerContext) -> SubQueryLineageHolder: # If within subquery, then manually add subquery as target table holder.add_write(context.subquery) + if context.target_columns: + # target columns can be referred while creating column level lineage + holder.add_target_column(*context.target_columns) + return holder diff --git a/sqllineage/core/parser/sqlfluff/handlers/target.py b/sqllineage/core/parser/sqlfluff/handlers/target.py index a1a1052e..39684121 100644 --- a/sqllineage/core/parser/sqlfluff/handlers/target.py +++ b/sqllineage/core/parser/sqlfluff/handlers/target.py @@ -5,6 +5,7 @@ from sqllineage.core.parser.sqlfluff.handlers.base import ConditionalSegmentBaseHandler from sqllineage.core.parser.sqlfluff.holder_utils import retrieve_holder_data_from from sqllineage.core.parser.sqlfluff.models import ( + SqlFluffColumn, SqlFluffTable, ) from sqllineage.core.parser.sqlfluff.utils import ( @@ -135,3 +136,18 @@ def handle(self, segment: BaseSegment, holder: SubQueryLineageHolder) -> None: ) if read: holder.add_read(read) + + elif segment.type == "bracketed": + """ + In case of bracketed column reference, add these target columns to holder + so that when we compute the column level lineage + we keep these columns into consideration + """ + sub_segments = retrieve_segments(segment) + if all( + sub_segment.type == "column_reference" for sub_segment in sub_segments + ): + # target columns only apply to bracketed column references + holder.add_target_column( + *[SqlFluffColumn.of(sub_segment) for sub_segment in sub_segments] + ) diff --git a/sqllineage/utils/constant.py b/sqllineage/utils/constant.py index 16659626..3fdc6f4e 100644 --- a/sqllineage/utils/constant.py +++ b/sqllineage/utils/constant.py @@ -11,6 +11,10 @@ class NodeTag: SELFLOOP = "selfloop" +class EdgeTag: + INDEX = "index" + + @unique class EdgeType(Enum): LINEAGE = 1 diff --git a/tests/test_insert.py b/tests/test_insert.py index d94864ce..595fb1ac 100644 --- a/tests/test_insert.py +++ b/tests/test_insert.py @@ -5,21 +5,9 @@ def test_insert_into(): assert_table_lineage_equal("INSERT INTO tab1 VALUES (1, 2)", set(), {"tab1"}) -def test_insert_into_with_columns(): +def test_insert_into_select(): assert_table_lineage_equal( - "INSERT INTO tab1 (col1, col2) SELECT * FROM tab2;", {"tab2"}, {"tab1"} - ) - - -def test_insert_into_with_columns_and_select(): - assert_table_lineage_equal( - "INSERT INTO tab1 (col1, col2) SELECT * FROM tab2", {"tab2"}, {"tab1"} - ) - - -def test_insert_into_with_columns_and_select_union(): - assert_table_lineage_equal( - "INSERT INTO tab1 (col1, col2) SELECT * FROM tab2 UNION SELECT * FROM tab3", - {"tab2", "tab3"}, + "INSERT INTO tab1 SELECT * FROM tab2;", + {"tab2"}, {"tab1"}, ) diff --git a/tests/test_others.py b/tests/test_others.py index a5e7349d..8f3fc109 100644 --- a/tests/test_others.py +++ b/tests/test_others.py @@ -38,22 +38,12 @@ def test_create_as(): ) -def test_create_as_with_parenthesis_around_select_statement(): - sql = "CREATE TABLE tab1 AS (SELECT * FROM tab2)" - assert_table_lineage_equal(sql, {"tab2"}, {"tab1"}) - - def test_create_as_with_parenthesis_around_table_name(): assert_table_lineage_equal( "CREATE TABLE tab1 AS SELECT * FROM (tab2)", {"tab2"}, {"tab1"} ) -def test_create_as_with_parenthesis_around_both(): - sql = "CREATE TABLE tab1 AS (SELECT * FROM (tab2))" - assert_table_lineage_equal(sql, {"tab2"}, {"tab1"}) - - def test_create_like(): assert_table_lineage_equal("CREATE TABLE tab1 LIKE tab2", {"tab2"}, {"tab1"}) diff --git a/tests/test_sqlfluff.py b/tests/test_sqlfluff.py index 386c0710..6fde533d 100644 --- a/tests/test_sqlfluff.py +++ b/tests/test_sqlfluff.py @@ -102,3 +102,131 @@ def test_non_reserved_keyword_as_column_name(): ], test_sqlparse=False, ) + + +# For top-level query parenthesis in DML, we don't treat it as subquery. +# sqlparse has some problems identifying these subqueries. +# note the table-level lineage works, only column-level lineage breaks for sqlparse +def test_create_as_with_parenthesis_around_select_statement(): + sql = "CREATE TABLE tab1 AS (SELECT * FROM tab2)" + assert_table_lineage_equal(sql, {"tab2"}, {"tab1"}, test_sqlparse=False) + + +def test_create_as_with_parenthesis_around_both(): + sql = "CREATE TABLE tab1 AS (SELECT * FROM (tab2))" + assert_table_lineage_equal(sql, {"tab2"}, {"tab1"}, test_sqlparse=False) + + +# specify columns in CREATE statement, sqlparse would parse my_view as function call +def test_view_with_subquery_custom_columns(): + # select as subquery + sql = "CREATE VIEW my_view (random1,random2) AS (SELECT col1,col2 FROM tbl)" + assert_column_lineage_equal( + sql, + [ + ( + ColumnQualifierTuple("col1", "tbl"), + ColumnQualifierTuple("random1", "my_view"), + ), + ( + ColumnQualifierTuple("col2", "tbl"), + ColumnQualifierTuple("random2", "my_view"), + ), + ], + test_sqlparse=False, + ) + sql = "CREATE VIEW my_view (random1,random2) AS SELECT col1,col2 FROM tbl" + assert_column_lineage_equal( + sql, + [ + ( + ColumnQualifierTuple("col1", "tbl"), + ColumnQualifierTuple("random1", "my_view"), + ), + ( + ColumnQualifierTuple("col2", "tbl"), + ColumnQualifierTuple("random2", "my_view"), + ), + ], + test_sqlparse=False, + ) + + +def test_create_view_with_same_columns(): + sql = "CREATE VIEW my_view (col1,col2) AS (SELECT col1,col2 FROM tbl)" + assert_column_lineage_equal( + sql, + [ + ( + ColumnQualifierTuple("col1", "tbl"), + ColumnQualifierTuple("col1", "my_view"), + ), + ( + ColumnQualifierTuple("col2", "tbl"), + ColumnQualifierTuple("col2", "my_view"), + ), + ], + test_sqlparse=False, + ) + + +# specify columns in INSERT statement, sqlparse would parse my_view as function call +def test_insert_into_with_columns(): + # table lineage works for sqlparse + assert_table_lineage_equal( + "INSERT INTO tab1 (col1, col2) SELECT * FROM tab2;", + {"tab2"}, + {"tab1"}, + test_sqlparse=False, + ) + + +def test_insert_into_with_columns_and_select_union(): + assert_table_lineage_equal( + "INSERT INTO tab1 (col1, col2) SELECT * FROM tab2 UNION SELECT * FROM tab3", + {"tab2", "tab3"}, + {"tab1"}, + test_sqlparse=False, + ) + assert_table_lineage_equal( + "INSERT INTO tab1 (col1, col2) (SELECT * FROM tab2 UNION SELECT * FROM tab3)", + {"tab2", "tab3"}, + {"tab1"}, + test_sqlparse=False, + ) + + +def test_insert_with_custom_columns(): + # test with query as subquery + sql = "INSERT INTO tgt_tbl(random1, random2) (SELECT col1,col2 FROM src_tbl)" + assert_column_lineage_equal( + sql, + [ + ( + ColumnQualifierTuple("col1", "src_tbl"), + ColumnQualifierTuple("random1", "tgt_tbl"), + ), + ( + ColumnQualifierTuple("col2", "src_tbl"), + ColumnQualifierTuple("random2", "tgt_tbl"), + ), + ], + test_sqlparse=False, + ) + + # test with plain query + sql = "INSERT INTO tgt_tbl(random1, random2) SELECT col1,col2 FROM src_tbl_new" + assert_column_lineage_equal( + sql, + [ + ( + ColumnQualifierTuple("col1", "src_tbl_new"), + ColumnQualifierTuple("random1", "tgt_tbl"), + ), + ( + ColumnQualifierTuple("col2", "src_tbl_new"), + ColumnQualifierTuple("random2", "tgt_tbl"), + ), + ], + test_sqlparse=False, + ) From 8ed0e5275f2dbef4801f6461fbfb7925d634907f Mon Sep 17 00:00:00 2001 From: reata Date: Sun, 11 Jun 2023 20:17:57 +0800 Subject: [PATCH 07/10] fix: send lineage request with dialect from localStorage (#388) --- sqllineagejs/src/features/editor/Editor.js | 6 +++--- sqllineagejs/src/features/editor/editorSlice.js | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sqllineagejs/src/features/editor/Editor.js b/sqllineagejs/src/features/editor/Editor.js index 8c4ee8ab..cc4f1b5d 100644 --- a/sqllineagejs/src/features/editor/Editor.js +++ b/sqllineagejs/src/features/editor/Editor.js @@ -39,11 +39,11 @@ export function Editor(props) { dispatch(setDagLevel("table")); if (file === null) { dispatch(setEditable(true)); - dispatch(fetchDAG({"e": editorState.contentComposed, "dialect": dialect})) + dispatch(fetchDAG({"e": editorState.contentComposed})) } else { dispatch(setEditable(false)); dispatch(fetchContent({"f": file})); - dispatch(fetchDAG({"f": file, "dialect": dialect})); + dispatch(fetchDAG({"f": file})); } } } @@ -54,7 +54,7 @@ export function Editor(props) { editor.onDidBlurEditorText(() => { if (!editor.getOption(readOnly)) { dispatch(setContentComposed(editor.getValue())); - dispatch(fetchDAG({"e": editor.getValue(), "dialect": dialect})); + dispatch(fetchDAG({"e": editor.getValue()})); } }) editor.onKeyDown(() => { diff --git a/sqllineagejs/src/features/editor/editorSlice.js b/sqllineagejs/src/features/editor/editorSlice.js index fd7b49d0..cfdda95c 100644 --- a/sqllineagejs/src/features/editor/editorSlice.js +++ b/sqllineagejs/src/features/editor/editorSlice.js @@ -22,6 +22,10 @@ export const fetchContent = createAsyncThunk('editor/fetchContent', async (paylo }) export const fetchDAG = createAsyncThunk('dag/fetchDAG', async (payload) => { + let dialect = localStorage.getItem("dialect"); + if (dialect !== null) { + payload["dialect"] = dialect + } return await client.post(assemble_absolute_endpoint("/lineage"), payload); }) From e31d830791de8af988e91eb79a7bc9922bb405d2 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Wed, 21 Jun 2023 19:09:33 +0530 Subject: [PATCH 08/10] python 3.7 compatible changes --- sqllineage/core/parser/__init__.py | 4 ++-- .../extractors/dml_insert_extractor.py | 10 +++++---- sqllineage/core/parser/sqlfluff/models.py | 21 +++++++------------ 3 files changed, 15 insertions(+), 20 deletions(-) diff --git a/sqllineage/core/parser/__init__.py b/sqllineage/core/parser/__init__.py index 2719084e..670c223d 100644 --- a/sqllineage/core/parser/__init__.py +++ b/sqllineage/core/parser/__init__.py @@ -31,13 +31,13 @@ def end_of_query_cleanup(self, holder: SubQueryLineageHolder) -> None: for src_col in tgt_col.to_source_columns( self.get_alias_mapping_from_table_group(tbl_grp, holder) ): - if len(target_columns := holder.target_columns) == len(col_grp): + if len(holder.target_columns) == len(col_grp): # example query: create view test (col3) select col1 as col2 from tab # without target_columns = [col3] information, by default src_col = col1 and tgt_col = col2 # when target_columns exist and length matches, we want tgt_col = col3 instead of col2 # for invalid query: create view test (col3, col4) select col1 as col2 from tab, # when the length doesn't match, we fall back to default behavior - tgt_col = target_columns[idx] + tgt_col = holder.target_columns[idx] holder.add_column_lineage(src_col, tgt_col) @classmethod diff --git a/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py b/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py index 4e4943c1..4e7700b9 100644 --- a/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py +++ b/sqllineage/core/parser/sqlfluff/extractors/dml_insert_extractor.py @@ -68,10 +68,12 @@ def extract( # note regular subquery within SELECT statement is handled by DmlSelectExtractor, this is only to handle # top-level subquery in DML like: 1) create table foo as (subquery); 2) insert into foo (subquery) # subquery here isn't added as read source, and it inherits DML-level target_columns if parsed - if subquery_segment := get_child(segment, "select_statement"): - self._extract_select(holder, subquery_segment) - elif subquery_segment := get_child(segment, "set_expression"): - self._extract_set(holder, subquery_segment) + subquery_segment_select = get_child(segment, "select_statement") + subquery_segment_set = get_child(segment, "set_expression") + if subquery_segment_select: + self._extract_select(holder, subquery_segment_select) + elif subquery_segment_set: + self._extract_set(holder, subquery_segment_set) elif segment.type == "select_statement": self._extract_select(holder, segment) elif segment.type == "set_expression": diff --git a/sqllineage/core/parser/sqlfluff/models.py b/sqllineage/core/parser/sqlfluff/models.py index ecd4e4a6..4d353cfb 100644 --- a/sqllineage/core/parser/sqlfluff/models.py +++ b/sqllineage/core/parser/sqlfluff/models.py @@ -116,20 +116,13 @@ def of(column: BaseSegment, **kwargs) -> Column: column_name = get_identifier(sub_segment) elif sub_segment.type == "expression": # special handling for postgres style type cast, col as target column name instead of col::type - if len(sub2_segments := retrieve_segments(sub_segment)) == 1: - if ( - sub2_segment := sub2_segments[0] - ).type == "cast_expression": - if ( - len( - sub3_segments := retrieve_segments(sub2_segment) - ) - == 2 - ): - if ( - sub3_segment := sub3_segments[0] - ).type == "column_reference": - column_name = get_identifier(sub3_segment) + sub2_segments = retrieve_segments(sub_segment) + if len(sub2_segments) == 1: + if (sub2_segments[0]).type == "cast_expression": + sub3_segments = retrieve_segments(sub2_segments[0]) + if len(retrieve_segments(sub2_segments[0])) == 2: + if (sub3_segments[0]).type == "column_reference": + column_name = get_identifier(sub3_segments[0]) return Column( column.raw if column_name is None else column_name, source_columns=source_columns, From b54bcbd776dab37a28ef8ab406d25d67d2b0089e Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Fri, 23 Jun 2023 00:03:27 +0530 Subject: [PATCH 09/10] skip 3.7 test for macos --- .github/workflows/python-package.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 7f46c12e..68369b6a 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -30,6 +30,12 @@ jobs: - name: Install run: pip install tox codecov - name: Script - run: tox -e py + run: | + # Add conditional check to skip the step + if [ "${{ matrix.os }}" != "macos-latest" ] || [ "${{ matrix.python-version }}" != "3.7" ]; then + tox -e py + else + echo "Skipping Script step for macOS-latest with Python 3.7" + fi - name: After Success run: codecov From 82d118b419a109a5f6af1c2c5726594729db9329 Mon Sep 17 00:00:00 2001 From: ulixius9 Date: Fri, 23 Jun 2023 00:18:25 +0530 Subject: [PATCH 10/10] Add shell in workflow --- .github/workflows/python-package.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 68369b6a..a1ec774b 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -37,5 +37,6 @@ jobs: else echo "Skipping Script step for macOS-latest with Python 3.7" fi + shell: bash - name: After Success run: codecov