diff --git a/README.md b/README.md
index 33e465e..3b396bf 100644
--- a/README.md
+++ b/README.md
@@ -68,12 +68,10 @@ _**PyPI**_
 
 (back to top)
 
 ### Usage
-- *In our notebooks containing DLT Jobs the imports changes slightly as below and also the extra decorator
-`@dltwithdebug(globals())` is added to the functions*
 ```python
 # Imports
-from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
+from dlt_with_debug import pipeline_id, showoutput
 
 if pipeline_id:
   import dlt
@@ -83,8 +81,7 @@ _**PyPI**_
 
 
-# Now define your dlt code with one extra decorator "@dltwithdebug(globals())" added to it
-@dlt.create_table(comment = "dlt pipeline example")
-@dltwithdebug(globals())
+# Now define your dlt code with the standard decorator
+@dlt.table(comment = "dlt pipeline example")
 def click_raw_bz():
   return (
     spark.read.option("header","true").csv("dbfs:/FileStore/souvikpratiher/click.csv")
@@ -96,11 +93,6 @@ _**PyPI**_
 # Get the output data to a dataframe
 df = click_raw_bz()
 ```
-> **Note**:
-> 1. Use the `dlt.create_table()` API instead of `dlt.table()` as `dlt.table()` sometimes gets mixed with `spark.table()`
-in the global namespace.
-> 2. Always pass the `globals()` namespace to `dltwithdebug` decorator like this `@dltwithdebug(globals())`
-
 
 (back to top)
 
@@ -123,7 +115,7 @@ from pyspark.sql.types import *
 
-# dltwithdebug as that's the entry point to interactive DLT workflows
+# the @dltwithdebug decorator is no longer needed for interactive DLT workflows
 # pipeline_id to ensure we import the dlt package based on environment
 # showoutput is a helper function for seeing the output result along with expectation metrics if any is specified
-from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
+from dlt_with_debug import pipeline_id, showoutput
 
 if pipeline_id:
   import dlt
@@ -138,13 +130,12 @@ Cmd 4
 
 ```python
-# Notice we are using dlt.create_table instead of dlt.table
-@dlt.create_table(
+# dlt.table now works directly in interactive notebooks
+@dlt.table(
   comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
   table_properties={
     "quality": "bronze"
   }
 )
-@dltwithdebug(globals())
 def clickstream_raw():
   return (
     spark.read.option("inferSchema", "true").json(json_path)
@@ -161,7 +152,7 @@ showoutput(clickstream_raw)
 
 Cmd 6
 ```python
-@dlt.create_table(
+@dlt.table(
   comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
   table_properties={
     "quality": "silver"
@@ -170,7 +161,6 @@ Cmd 6
 @dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
 @dlt.expect_or_fail("valid_count", "click_count > 0")
 @dlt.expect_all({'valid_prev_page_id': "previous_page_id IS NOT NULL"})
-@dltwithdebug(globals())
 def clickstream_clean():
   return (
     dlt.read("clickstream_raw")
@@ -208,7 +198,7 @@ showoutput(clickstream_clean)
 
 #### Table syntax
 ```python
-@dlt.create_table( # <-- Notice we are using the dlt.create_table() instead of dlt.table()
+@dlt.table(
   name="<name>",
   comment="<comment>",
   spark_conf={"<key>" : "<value>"},
@@ -223,7 +213,6 @@ showoutput(clickstream_clean)
 @dlt.expect_all
 @dlt.expect_all_or_drop
 @dlt.expect_all_or_fail
-@dltwithdebug(globals()) # <-- This dltwithdebug(globals()) needs to be added
 def <function_name>():
   return (<query>)
 ```
@@ -231,7 +220,7 @@ def <function_name>():
 
 #### View syntax
 ```python
-@dlt.create_view( # <-- Notice we are using the dlt.create_view() instead of dlt.view()
+@dlt.view(
   name="<name>",
   comment="<comment>")
 @dlt.expect
@@ -240,7 +229,6 @@ def <function_name>():
 @dlt.expect_all
 @dlt.expect_all_or_drop
 @dlt.expect_all_or_fail
-@dltwithdebug(globals()) # <-- This dltwithdebug(globals()) needs to be added
 def <function_name>():
   return (<query>)
 ```
@@ -253,8 +241,7 @@ showoutput(function_name) # <-- showoutput(function_name)
 # The name of the function which is wrapped by the dltdecorators
 # For example:
-# @dlt.create_table()
-# @dltwithdebug(globals())
+# @dlt.table()
 # def step_one():
 #   return spark.read.csv()
 
@@ -268,7 +255,7 @@ showoutput(function_name) # <-- showoutput(function_name)
-# dltwithdebug as that's the entry point to interactive DLT workflows
+# the @dltwithdebug decorator is no longer needed for interactive DLT workflows
 # pipeline_id to ensure we import the dlt package based on environment
 # showoutput is a helper function for seeing the output result along with expectation metrics if any is specified
-from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
+from dlt_with_debug import pipeline_id, showoutput
 
 if pipeline_id:
   import dlt
@@ -286,9 +273,8 @@ As of now the following DLT APIs are covered for interactive use:
 
 - `dlt.read`
 - `dlt.read_stream`
-- `dlt.create_table`
-- `dlt.create_view`
-- `dlt.table` <-- This one sometimes gets overridden with `spark.table` so use `dlt.create_table` instead.
+- `dlt.table`
+- `dlt.view`
 - `dlt.expect`
 - `dlt.expect_or_fail`
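Taken together, the README hunks above collapse the interactive workflow to the stock `dlt` decorator names. A condensed sketch of the post-change notebook flow (the CSV path is the README's own example; `spark` is the notebook-provided session; the `else` import of the placeholder module is elided in the hunks, so that line is an assumption):

```python
from dlt_with_debug import pipeline_id, showoutput

if pipeline_id:          # running inside a real DLT pipeline
    import dlt
else:                    # interactive notebook (assumed placeholder import)
    from dlt_with_debug import dlt_signatures as dlt

@dlt.table(comment="dlt pipeline example")
def click_raw_bz():
    return (
        spark.read.option("header", "true")
        .csv("dbfs:/FileStore/souvikpratiher/click.csv")
    )

showoutput(click_raw_bz)  # displays the frame plus any expectation metrics
df = click_raw_bz()       # or grab the output as a regular dataframe
```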
diff --git a/demo_notebook/dlt_debug_hybrid_demo.py b/demo_notebook/dlt_debug_hybrid_demo.py
index 309c937..65a6e34 100644
--- a/demo_notebook/dlt_debug_hybrid_demo.py
+++ b/demo_notebook/dlt_debug_hybrid_demo.py
@@ -6,7 +6,7 @@
 from pyspark.sql.functions import *
 from pyspark.sql.types import *
 
-from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
+from dlt_with_debug import pipeline_id, showoutput
 
 if pipeline_id:
   import dlt
@@ -19,13 +19,12 @@
 
 # COMMAND ----------
 
-@dlt.create_table(
+@dlt.table(
   comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
   table_properties={
     "quality": "bronze"
   }
 )
-@dltwithdebug(globals())
 def clickstream_raw():
   return (
     spark.read.option("inferSchema", "true").json(json_path)
@@ -37,7 +36,7 @@ def clickstream_raw():
 
 # COMMAND ----------
 
-@dlt.create_table(
+@dlt.table(
   comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
   table_properties={
     "quality": "silver"
@@ -46,7 +45,6 @@ def clickstream_raw():
 @dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
 @dlt.expect_or_fail("valid_count", "click_count > 0")
 @dlt.expect_all({'valid_prev_page_id': "previous_page_id IS NOT NULL"})
-@dltwithdebug(globals())
 def clickstream_clean():
   return (
     dlt.read("clickstream_raw")
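The demo's expectation decorators print an impact summary when the notebook runs interactively. The arithmetic behind that message lives in `get_name_inv_statement` (see the dlt_signatures diff below); here it is replayed with illustrative counts:

```python
# Illustrative numbers only -- in the package, total and count come from
# func().count() and func().filter(inv).count() on the real dataframe.
name = "valid_count"   # expectation name from the demo notebook
total = 200            # all records returned by the table function
count = 190            # records satisfying the invariant "click_count > 0"

affected = total - count
pct = round((affected / total) * 100, 2)
print(f"Expectation `{name}` will affect {affected} records "
      f"which is {pct}% of total {total} records")
# -> Expectation `valid_count` will affect 10 records which is 5.0% of total 200 records
```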
diff --git a/dlt_with_debug/dlt_signatures.py b/dlt_with_debug/dlt_signatures.py
index 396ff91..2bc67e1 100644
--- a/dlt_with_debug/dlt_signatures.py
+++ b/dlt_with_debug/dlt_signatures.py
@@ -1,12 +1,25 @@
 """
 This file contains the empty placeholder signatures of the dlt APIs
 """
+import functools
 from functools import wraps
-from dlt_with_debug.helpers import undecorated
+from inspect import getsource
+from typing import Dict, Any
+
+from dlt_with_debug.helpers import undecorated, remove_dltwithdebug_decorator
 import builtins as orig
+from pyspark.sql import SparkSession
+
+spark = SparkSession.getActiveSession()
+pipeline_id = spark.conf.get("pipelines.id", None)
+
 g_ns_for_placeholders = globals()
-addglobals = lambda x: g_ns_for_placeholders.update(x)
+
+
+def add_globals(x: Dict[str, Any]):
+    g_ns_for_placeholders.update(x)
+
 
 def read(arg):
     return g_ns_for_placeholders[arg]()
@@ -16,6 +29,27 @@ def read_stream(arg):
     return g_ns_for_placeholders[arg]()
 
 
+@functools.lru_cache
+def add_function_to_ns(f, alternative_name=None):
+    # IPython is available inside Databricks notebooks
+    import IPython
+    g_ns = IPython.get_ipython().user_ns
+    if pipeline_id:
+        return
+    elif f.__name__ in g_ns_for_placeholders or (
+            alternative_name is not None and alternative_name in g_ns_for_placeholders):
+        return
+    else:
+        f_undec = undecorated(f)
+        code = getsource(f_undec)
+        parsed_code = remove_dltwithdebug_decorator(code)
+        add_globals(g_ns)
+        exec(parsed_code, g_ns)
+        if alternative_name is not None:
+            g_ns_for_placeholders[alternative_name] = g_ns_for_placeholders[f.__name__]
+        return
+
+
 def table(name=None,
           comment=None,
           spark_conf=None,
@@ -27,12 +61,14 @@ def table(name=None,
     def true_decorator(f):
         @wraps(f)
         def wrapped(*args, **kwargs):
+            add_function_to_ns(f, name)
             return f(*args, **kwargs)
 
         return wrapped
 
     return true_decorator
 
+
 create_table = table
 
 
@@ -41,19 +77,22 @@ def view(name=None,
     def true_decorator(f):
         @wraps(f)
         def wrapped(*args, **kwargs):
+            add_function_to_ns(f, name)
             return f(*args, **kwargs)
 
         return wrapped
 
     return true_decorator
 
+
 create_view = view
 
 
-def get_name_inv_statement(f,name,inv):
+
+def get_name_inv_statement(f, name, inv):
     func = undecorated(f)
     count = func().filter(inv).count()
     total = func().count()
-    stmt = f"Expectation `{name}` will affect {total-count} records which is {orig.round(((total-count)/total)*100,2)}% of total {total} records"
+    stmt = f"Expectation `{name}` will affect {total - count} records which is {orig.round(((total - count) / total) * 100, 2)}% of total {total} records"
     return stmt
@@ -63,7 +102,7 @@ def true_decorator(f):
         @wraps(f)
         def wrapped(*args, **kwargs):
             if name:
-                stmt = "'expect' "+get_name_inv_statement(f,name,inv)
+                stmt = "'expect' " + get_name_inv_statement(f, name, inv)
                 print(stmt)
             return f(*args, **kwargs)
@@ -78,7 +117,7 @@ def true_decorator(f):
         @wraps(f)
         def wrapped(*args, **kwargs):
             if name:
-                stmt = "'expect_or_drop' "+get_name_inv_statement(f,name,inv)
+                stmt = "'expect_or_drop' " + get_name_inv_statement(f, name, inv)
                 print(stmt)
             return f(*args, **kwargs)
@@ -93,7 +132,7 @@ def true_decorator(f):
         @wraps(f)
         def wrapped(*args, **kwargs):
             if name:
-                stmt = "'expect_or_fail' "+get_name_inv_statement(f,name,inv)
+                stmt = "'expect_or_fail' " + get_name_inv_statement(f, name, inv)
                 print(stmt)
             return f(*args, **kwargs)
@@ -102,7 +141,7 @@ def wrapped(*args, **kwargs):
     return true_decorator
 
 
-def get_expectations_statement(f,expectations):
+def get_expectations_statement(f, expectations):
     func = undecorated(f)
     expec_lst = list(expectations.values())
     expec_lst = ["(" + str(i) + ")" for i in expec_lst]
@@ -110,7 +149,7 @@
     count = func().filter(expec_cond).count()
     total = func().count()
     expec_txt = " AND ".join(list(expectations.keys()))
-    stmt = f"Expectations `{expec_txt}` will affect {total-count} records which is {orig.round(((total-count) / total) * 100, 2)}% of total {total} records"
+    stmt = f"Expectations `{expec_txt}` will affect {total - count} records which is {orig.round(((total - count) / total) * 100, 2)}% of total {total} records"
     return stmt
@@ -119,7 +158,7 @@ def true_decorator(f):
         @wraps(f)
         def wrapped(*args, **kwargs):
             if expectations:
-                stmt = "'expect_all' "+get_expectations_statement(f,expectations)
+                stmt = "'expect_all' " + get_expectations_statement(f, expectations)
                 print(stmt)
             return f(*args, **kwargs)
@@ -133,7 +172,7 @@ def true_decorator(f):
         @wraps(f)
         def wrapped(*args, **kwargs):
             if expectations:
-                stmt = "'expect_all_or_drop' "+get_expectations_statement(f,expectations)
+                stmt = "'expect_all_or_drop' " + get_expectations_statement(f, expectations)
                 print(stmt)
             return f(*args, **kwargs)
@@ -147,7 +186,7 @@ def true_decorator(f):
         @wraps(f)
         def wrapped(*args, **kwargs):
             if expectations:
-                stmt = "'expect_all_or_fail' "+get_expectations_statement(f,expectations)
+                stmt = "'expect_all_or_fail' " + get_expectations_statement(f, expectations)
                 print(stmt)
             return f(*args, **kwargs)
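The heart of the new `add_function_to_ns` above is a register-once side effect keyed on the function object, so repeated calls to a decorated function do not re-exec its source. Stripped of the IPython and Spark specifics, the pattern looks roughly like this (a plain dict stands in for the shared namespace; all names here are illustrative):

```python
import functools

REGISTRY = {}  # stands in for g_ns_for_placeholders


@functools.lru_cache  # each (function, name) pair is processed at most once
def register(f, alternative_name=None):
    if f.__name__ in REGISTRY or alternative_name in REGISTRY:
        return
    REGISTRY[f.__name__] = f
    if alternative_name is not None:
        REGISTRY[alternative_name] = f  # allow lookups under the table's name


def table(name=None, **_):
    def deco(f):
        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            register(f, name)  # side effect: make f resolvable by read()
            return f(*args, **kwargs)
        return wrapped
    return deco


def read(arg):
    return REGISTRY[arg]()  # mirrors dlt_signatures.read


@table(name="numbers")
def numbers():
    return [1, 2, 3]


print(numbers())        # [1, 2, 3] -- first call also registers "numbers"
print(read("numbers"))  # [1, 2, 3]
```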
diff --git a/dlt_with_debug/v2.py b/dlt_with_debug/v2.py
index de6117f..af018cd 100644
--- a/dlt_with_debug/v2.py
+++ b/dlt_with_debug/v2.py
@@ -1,7 +1,7 @@
 from functools import wraps
 from inspect import getsource
 from dlt_with_debug.helpers import undecorated, remove_dltwithdebug_decorator
-from dlt_with_debug.dlt_signatures import addglobals
+from dlt_with_debug.dlt_signatures import add_globals, table
 from pyspark.sql import SparkSession
 
 spark = SparkSession.getActiveSession()
@@ -9,24 +9,27 @@
 
 
 def dltwithdebug(g_ns):
-    def true_decorator(f):
-        @wraps(f)
-        def wrapped(*args, **kwargs):
-            if pipeline_id:
-                return f(*args, **kwargs)
-            else:
-                f_undec = undecorated(f)
-                code = getsource(f_undec)
-                parsed_code = remove_dltwithdebug_decorator(code)
-                addglobals(g_ns)
-                exec(parsed_code, g_ns)
-                return f(*args, **kwargs)
-        return wrapped
-    return true_decorator
+    def true_decorator(f):
+        @wraps(f)
+        def wrapped(*args, **kwargs):
+            if pipeline_id:
+                return f(*args, **kwargs)
+            else:
+                f_undec = undecorated(f)
+                code = getsource(f_undec)
+                parsed_code = remove_dltwithdebug_decorator(code)
+                add_globals(g_ns)
+                exec(parsed_code, g_ns)
+                return f(*args, **kwargs)
+
+        return wrapped
+
+    return true_decorator
+
 
 def showoutput(f):
-    if not pipeline_id:
-        df = f()
-        df.display()
-    else:
-        None
+    if not pipeline_id:
+        df = f()
+        df.display()
+    # re-register the placeholder `table` after display so dlt.table keeps working
+    add_globals({"table": table})
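The closing change in `showoutput` guards against namespace clobbering: per the comment in the diff, `add_globals(g_ns)` copies the whole notebook namespace into the placeholder module's globals, so anything bound to the name `table` there (a user variable, or a stray `spark.table`) would shadow the placeholder decorator. A toy reproduction of the clobber-and-restore, with made-up values:

```python
# The placeholder module's globals before any user code is merged in.
placeholders = {"table": "placeholder-decorator"}

# A notebook namespace that happens to bind the name `table`.
user_ns = {"table": "spark-table-or-user-variable"}

placeholders.update(user_ns)  # what add_globals(g_ns) effectively does
assert placeholders["table"] == "spark-table-or-user-variable"  # shadowed!

placeholders.update({"table": "placeholder-decorator"})  # showoutput's fix
assert placeholders["table"] == "placeholder-decorator"
```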