Removed the dltwithdebug and globals() requirements. Added … #5

Open · wants to merge 1 commit into base: main

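In short, notebooks no longer need the extra `@dltwithdebug(globals())` decorator or the `dlt.create_table`/`dlt.create_view` aliases; the standard `dlt` decorators now work directly. A minimal before/after sketch of the notebook-side change, based on the README example in the diff below (imports omitted for brevity, see the Usage section):

```python
# Before this change: extra decorator plus the create_table alias were required
@dlt.create_table(comment = "dlt pipeline example")
@dltwithdebug(globals())
def click_raw_bz():
    return spark.read.option("header","true").csv("dbfs:/FileStore/souvikpratiher/click.csv")

# After this change: the standard DLT decorator is enough
@dlt.table(comment = "dlt pipeline example")
def click_raw_bz():
    return spark.read.option("header","true").csv("dbfs:/FileStore/souvikpratiher/click.csv")
```
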
37 changes: 12 additions & 25 deletions README.md
@@ -68,12 +68,10 @@ _**PyPI**_
<p align="right">(<a href="#top">back to top</a>)</p>

### Usage
- *In our notebooks containing DLT jobs the imports change slightly, as shown below, and the extra decorator `@dltwithdebug(globals())` is added to the functions*

```python
# Imports
from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
from dlt_with_debug import pipeline_id, showoutput

if pipeline_id:
import dlt
@@ -83,8 +81,7 @@ _**PyPI**_

# Now define your dlt code as you normally would; no extra decorator is needed any more

@dlt.create_table(comment = "dlt pipeline example")
@dltwithdebug(globals())
@dlt.table(comment = "dlt pipeline example")
def click_raw_bz():
return (
spark.read.option("header","true").csv("dbfs:/FileStore/souvikpratiher/click.csv")
@@ -96,11 +93,6 @@ _**PyPI**_
# Get the output data to a dataframe
df = click_raw_bz()
```
> **Note**:
> 1. Use the `dlt.create_table()` API instead of `dlt.table()`, as `dlt.table()` sometimes gets mixed up with `spark.table()` in the global namespace.
> 2. Always pass the `globals()` namespace to the `dltwithdebug` decorator, like this: `@dltwithdebug(globals())`
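
To preview the result interactively, together with any expectation metrics, the imported `showoutput` helper can be called on the same function; a short usage sketch for the example above:

```python
# Preview the click_raw_bz output together with any expectation metrics
showoutput(click_raw_bz)
```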


<p align="right">(<a href="#top">back to top</a>)</p>

@@ -123,7 +115,7 @@ from pyspark.sql.types import *
# pipeline_id to ensure we import the dlt package based on environment
# showoutput is a helper function for seeing the output result along with expectation metrics if any is specified
from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
from dlt_with_debug import pipeline_id, showoutput

if pipeline_id:
import dlt
@@ -138,13 +130,12 @@ Cmd 4
```python
# Notice we are now using the standard dlt.table decorator

@dlt.create_table(
@dlt.table(
comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
@dltwithdebug(globals())
def clickstream_raw():
return (
spark.read.option("inferSchema", "true").json(json_path)
@@ -161,7 +152,7 @@ showoutput(clickstream_raw)

Cmd 6
```python
@dlt.create_table(
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
@@ -170,7 +161,6 @@ Cmd 6
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
@dlt.expect_all({'valid_prev_page_id': "previous_page_id IS NOT NULL"})
@dltwithdebug(globals())
def clickstream_clean():
return (
dlt.read("clickstream_raw")
@@ -208,7 +198,7 @@ showoutput(clickstream_clean)
#### Table syntax

```python
@dlt.create_table( # <-- Notice we are using the dlt.create_table() instead of dlt.table()
@dlt.table(
name="<name>",
comment="<comment>",
spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
@@ -223,15 +213,14 @@ showoutput(clickstream_clean)
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
@dltwithdebug(globals()) # <-- This dltwithdebug(globals()) needs to be added
def <function-name>():
return (<query>)
```
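
For illustration, a minimal concrete instance of the table template above (the table name, expectation, and CSV path are hypothetical; `spark` is the ambient notebook session):

```python
@dlt.table(
    name="orders_bronze",
    comment="Raw orders ingested from a CSV landing zone")
@dlt.expect("valid_order_id", "order_id IS NOT NULL")
def orders_bronze():
    return spark.read.option("header", "true").csv("dbfs:/FileStore/example/orders.csv")

showoutput(orders_bronze)  # preview the rows and the expectation metrics
```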

#### View syntax

```python
@dlt.create_view( # <-- Notice we are using the dlt.create_view() instead of dlt.view()
@dlt.view(
name="<name>",
comment="<comment>")
@dlt.expect
@@ -240,7 +229,6 @@ def <function-name>():
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
@dltwithdebug(globals()) # <-- This dltwithdebug(globals()) needs to be added
def <function-name>():
return (<query>)
```
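
And a similarly hypothetical instance of the view template, reading from the table sketched above:

```python
@dlt.view(
    name="orders_recent_vw",
    comment="Orders from the last 7 days")
@dlt.expect("valid_order_date", "order_date IS NOT NULL")
def orders_recent_vw():
    return dlt.read("orders_bronze").where("order_date >= date_sub(current_date(), 7)")

showoutput(orders_recent_vw)
```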
@@ -253,8 +241,7 @@ showoutput(function_name) # <-- showoutput(function_name)
# The name of the function which is wrapped by the dlt decorators

# For example:
# @dlt.create_table()
# @dltwithdebug(globals())
# @dlt.table()
# def step_one():
# return spark.read.csv()

@@ -268,7 +255,7 @@ showoutput(function_name) # <-- showoutput(function_name)
# pipeline_id to ensure we import the dlt package based on environment
# showoutput is a helper function for seeing the output result along with expectation metrics if any is specified
from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
from dlt_with_debug import pipeline_id, showoutput

if pipeline_id:
import dlt
@@ -286,9 +273,9 @@ As of now the following DLT APIs are covered for interactive use:

- `dlt.read`
- `dlt.read_stream`
- `dlt.create_table`
- `dlt.create_view`
- `dlt.table` <-- This one sometimes gets overridden with `spark.table` so use `dlt.create_table` instead.
- `dlt.table`
- `dlt.view`
- `dlt.expect`
- `dlt.expect_or_fail`
8 changes: 3 additions & 5 deletions demo_notebook/dlt_debug_hybrid_demo.py
@@ -6,7 +6,7 @@
from pyspark.sql.functions import *
from pyspark.sql.types import *

from dlt_with_debug import dltwithdebug, pipeline_id, showoutput
from dlt_with_debug import pipeline_id, showoutput

if pipeline_id:
import dlt
@@ -19,13 +19,12 @@

# COMMAND ----------

@dlt.create_table(
@dlt.table(
comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
@dltwithdebug(globals())
def clickstream_raw():
return (
spark.read.option("inferSchema", "true").json(json_path)
@@ -37,7 +36,7 @@ def clickstream_raw():

# COMMAND ----------

@dlt.create_table(
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
@@ -46,7 +45,6 @@ def clickstream_raw():
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
@dlt.expect_all({'valid_prev_page_id': "previous_page_id IS NOT NULL"})
@dltwithdebug(globals())
def clickstream_clean():
return (
dlt.read("clickstream_raw")
63 changes: 51 additions & 12 deletions dlt_with_debug/dlt_signatures.py
@@ -1,12 +1,25 @@
"""
This file contains the empty placeholder signatures of the dlt APIs
"""
import functools
from functools import wraps
from dlt_with_debug.helpers import undecorated
from inspect import getsource
from typing import Dict, Any

from dlt_with_debug.helpers import undecorated, remove_dltwithdebug_decorator
import builtins as orig

from pyspark.sql import SparkSession

spark = SparkSession.getActiveSession()
pipeline_id = spark.conf.get("pipelines.id", None)

g_ns_for_placeholders = globals()
addglobals = lambda x: g_ns_for_placeholders.update(x)


def add_globals(x: Dict[str, Any]):
g_ns_for_placeholders.update(x)


def read(arg):
return g_ns_for_placeholders[arg]()
@@ -16,6 +29,27 @@ def read_stream(arg):
return g_ns_for_placeholders[arg]()


@functools.lru_cache
def add_function_to_ns(f, alternative_name=None):
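    # Added in this PR in place of the old @dltwithdebug(globals()) step. Inside a
    # real pipeline (pipeline_id is set) it is a no-op. Interactively it:
    #   1. mirrors the notebook's IPython user namespace into g_ns_for_placeholders,
    #      so the placeholder dlt.read()/dlt.read_stream() can resolve functions by name,
    #   2. re-executes the function's source (with this package's debug decorator
    #      stripped) in that user namespace, and
    #   3. registers the function under the optional name= given to @dlt.table/@dlt.view.
    # lru_cache keeps this registration from re-running on every call of the wrapped function.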
    # IPython is available inside Databricks notebooks; grab the notebook's user namespace
import IPython
g_ns = IPython.get_ipython().user_ns
if pipeline_id:
return
elif f.__name__ in g_ns_for_placeholders or (
alternative_name is not None and alternative_name in g_ns_for_placeholders):
return
else:
f_undec = undecorated(f)
code = getsource(f_undec)
parsed_code = remove_dltwithdebug_decorator(code)
add_globals(g_ns)
exec(parsed_code, g_ns)
if alternative_name is not None:
g_ns_for_placeholders[alternative_name] = g_ns_for_placeholders[f.__name__]
return


def table(name=None,
comment=None,
spark_conf=None,
@@ -27,12 +61,14 @@ def table(name=None,
def true_decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
add_function_to_ns(f, name)
return f(*args, **kwargs)

return wrapped

return true_decorator


create_table = table


@@ -41,19 +77,22 @@ def view(name=None,
def true_decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
add_function_to_ns(f, name)
return f(*args, **kwargs)

return wrapped

return true_decorator


create_view = view

def get_name_inv_statement(f,name,inv):

def get_name_inv_statement(f, name, inv):
func = undecorated(f)
count = func().filter(inv).count()
total = func().count()
stmt = f"Expectation `{name}` will affect {total-count} records which is {orig.round(((total-count)/total)*100,2)}% of total {total} records"
stmt = f"Expectation `{name}` will affect {total - count} records which is {orig.round(((total - count) / total) * 100, 2)}% of total {total} records"
return stmt


@@ -63,7 +102,7 @@ def true_decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
if name:
stmt = "'expect' "+get_name_inv_statement(f,name,inv)
stmt = "'expect' " + get_name_inv_statement(f, name, inv)
print(stmt)
return f(*args, **kwargs)

@@ -78,7 +117,7 @@ def true_decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
if name:
stmt = "'expect_or_drop' "+get_name_inv_statement(f,name,inv)
stmt = "'expect_or_drop' " + get_name_inv_statement(f, name, inv)
print(stmt)
return f(*args, **kwargs)

@@ -93,7 +132,7 @@ def true_decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
if name:
stmt = "'expect_or_fail' "+get_name_inv_statement(f,name,inv)
stmt = "'expect_or_fail' " + get_name_inv_statement(f, name, inv)
print(stmt)
return f(*args, **kwargs)

@@ -102,15 +141,15 @@ def wrapped(*args, **kwargs):
return true_decorator


def get_expectations_statement(f,expectations):
def get_expectations_statement(f, expectations):
func = undecorated(f)
expec_lst = list(expectations.values())
expec_lst = ["(" + str(i) + ")" for i in expec_lst]
expec_cond = " AND ".join(expec_lst)
count = func().filter(expec_cond).count()
total = func().count()
expec_txt = " AND ".join(list(expectations.keys()))
stmt = f"Expectations `{expec_txt}` will affect {total-count} records which is {orig.round(((total-count) / total) * 100, 2)}% of total {total} records"
stmt = f"Expectations `{expec_txt}` will affect {total - count} records which is {orig.round(((total - count) / total) * 100, 2)}% of total {total} records"
return stmt


@@ -119,7 +158,7 @@ def true_decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
if expectations:
stmt = "'expect_all' "+get_expectations_statement(f,expectations)
stmt = "'expect_all' " + get_expectations_statement(f, expectations)
print(stmt)
return f(*args, **kwargs)

@@ -133,7 +172,7 @@ def true_decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
if expectations:
stmt = "'expect_all_or_drop' "+get_expectations_statement(f,expectations)
stmt = "'expect_all_or_drop' " + get_expectations_statement(f, expectations)
print(stmt)
return f(*args, **kwargs)

@@ -147,7 +186,7 @@ def true_decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
if expectations:
stmt = "'expect_all_or_fail' "+get_expectations_statement(f,expectations)
stmt = "'expect_all_or_fail' " + get_expectations_statement(f, expectations)
print(stmt)
return f(*args, **kwargs)
