dltflow
dltflow is a Python package that provides authoring utilities and CD patterns for Databricks' DLT product. It intends to make writing and deploying DLT code and pipelines to Databricks as easy as possible.

NOTE: This project is in early development. APIs and features are subject to change. Use with caution.
Why use dltflow? Valid question. Here are a few reasons:
- DLT Pipelines in Python Modules:
  - DLT Pipelines are a newer Databricks feature that brings data quality, lineage, and observability to your data pipelines.
  - DLT, as documented, can only be instrumented via Notebooks.
  - dltflow provides a way to author DLT pipelines in Python modules - leveraging metaprogramming patterns (via configuration) - and deploy them to Databricks.
- Deployment of DLT Pipelines:
  - dltflow provides a way to deploy Python modules as DLT pipelines to Databricks.
  - It builds on the shoulders of the dbx and dlt-meta projects to provide a seamless deployment experience.
This project is heavily inspired by the dbx and dlt-meta projects. The reason for a separate project:
- Generally, DLT pipelines are only SQL or Python, and have to live in Notebooks.
- dbx is a great tool for deploying Python modules to Databricks, but it doesn't support DLT pipelines for Python modules.
- dlt-meta has some deployment features that are adopted into this repo.
- dab is a new deployment tool by Databricks, but suffers from the same problem as dbx.
pip install dltflow
dltflow's audience is developers who are familiar with Databricks, write PySpark, and want to instrument their data assets via DLT pipelines.
Project Initialization and Templating:
dltflow provides a CLI command to initialize a project. This command will create a dltflow.yml file in the root of your project. Optionally, you can start your project with a template.
dltflow init --help
>>> Usage: dltflow init [OPTIONS]
Initialize the project with a dltflow.yml file.
Optionally start your project with a template.
Options:
-p, --profile TEXT Databricks profile to use
-n, --project-name TEXT Name of the project
-c, --config-path TEXT Path to configuration directory
-w, --workflows-path TEXT Path to workflows directory
-t, --build-template Create a templated project?
-o, --overwrite Overwrite existing config file [default: True]
-d, --shared DBFS location to store the project [default:
True]
--help Show this message and exit.
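The options above can also be supplied directly as flags; the values below are purely illustrative:
dltflow init -p DEFAULT -n my_project -c conf -w workflows -t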
Simply running dltflow init will bring up a set of prompts to help you fill out the options listed above. As a final question in the prompts, you will be asked if you want to start your project with a template. If you answer yes, a template project will be created in the current working directory.
The structure will be as follows:
git-root/
my_project/ # code goes here.
conf/ # configuration to drive your pipelines.
workflows/ # json or yml definitions for workflows in databricks.
dltflow.yml # dltflow config file.
setup.py # setup file for python packages.
pyproject.toml # pyproject file for python packages.
Writing DLT pipelines in Python modules is as simple as writing a Python module. dltflow is currently tailored to PySpark, where pipelines are represented as Python classes. It integrates relatively seamlessly with OOP-style authoring, as it exposes a DLTMetaMixin class. What really powers dltflow is the pattern of configuration-driven metaprogramming. The configuration approach, along with the ease of integrating the DLTMetaMixin class, is intended to let developers keep code simple without having to worry about the complexity of DLT.
Let's take a look at an example; we'll break things down after taking a quick look.
pipeline-code-config.json
{
"reader": {
"configuration": "goes_here"
},
"writer": {
"configuration": "goes_here"
},
"dlt": {
"func_name": "orchestrate",
"kind": "table",
"expectations": [
{
"name": "check_addition",
"constraint": "result < 10"
}
],
"expectation_action": "allow"
}
}
The above JSON file is intended to represent a VCS-controlled parameter file. It highlights specific components that we can use to drive our codebase. For the purposes of this repo, we'll be focusing on the dlt section of the configuration. There are a few key notables:
- func_name: This tells dltflow what function to decorate with DLT instrumentation.
- kind: This tells dltflow what kind of DLT object to materialize. Expected values are table and view.
- expectations: This is a list of expectations that you want to enforce on your data.
- expectation_action: This is the action to take if an expectation is violated. Expected values are allow, drop, and fail.
Not listed above are a few other options:
- is_streaming_table: This is a boolean flag that tells dltflow if the DLT object is a streaming table.
- append_config: This is a dictionary that allows you to perform append-based streaming DLT instrumentation patterns.
- apply_cdc_config: This is a dictionary that allows you to perform CDC-based streaming DLT instrumentation patterns.
For more details on dlt itself, please refer to the Databricks documentation on DLT.
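To make the configuration-driven metaprogramming concrete: with the example configuration above (a kind of table and an expectation_action of allow), the wrapping that DLTMetaMixin applies is conceptually similar to the hand-written native DLT code below. This is only an illustrative sketch of the pattern - not dltflow's actual internals - and the target table name is made up for the example; it assumes the dlt module that is available inside a Databricks DLT pipeline.

import dlt  # provided by the Databricks DLT runtime

@dlt.table(name="my_table")  # kind: table (illustrative name)
@dlt.expect_all({"check_addition": "result < 10"})  # "allow": log violations, keep rows
def orchestrate():
    ...

A drop or fail expectation_action would correspond to dlt.expect_all_or_drop or dlt.expect_all_or_fail, respectively.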
Now that we have our configuration, we'll write a very simple pipeline that reads a DataFrame, transforms it, and writes it to a table. We'll use the configuration to drive the DLT instrumentation.
Based on the configuration, our code has the following technical requirements:
- The pipeline must have a method called orchestrate. DLTMetaMixin will decorate this method.
- The pipeline expects a column called result to be present in the DataFrame returned by orchestrate.
simple-dltflow-pipeline.py
import sys
import pathlib
from typing import Dict, Any
from argparse import ArgumentParser

import yaml
from pyspark.sql import DataFrame

from dltflow.quality import DLTMetaMixin


class MyPipeline(DLTMetaMixin):
    def __init__(self, spark):
        self.spark = spark

    def read(self) -> DataFrame:
        # Source data; a real pipeline would read from tables or files.
        return self.spark.createDataFrame([(1, 2), (5, 8), (1, 5)], ["col1", "col2"])

    def transform(self, df: DataFrame) -> DataFrame:
        # Adds the `result` column referenced by the configured expectation.
        return df.withColumn("result", df["col1"] + df["col2"])

    def write(self, df: DataFrame) -> None:
        df.write.saveAsTable("my_table")

    def orchestrate(self) -> DataFrame:
        # Decorated by DLTMetaMixin according to the `dlt` section of the
        # configuration; the returned DataFrame is what DLT materializes.
        df = self.read()
        df = self.transform(df)
        return df

    @staticmethod
    def _get_conf_file():
        """Uses the arg parser to extract the config location from cli."""
        p = ArgumentParser()
        p.add_argument("--conf-file", required=False, type=str)
        namespace = p.parse_known_args(sys.argv[1:])[0]
        return namespace.conf_file

    @staticmethod
    def _read_config(conf_file) -> Dict[str, Any]:
        """Loads the JSON/YAML pipeline configuration from disk."""
        config = yaml.safe_load(pathlib.Path(conf_file).read_text())
        return config
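The module above doesn't include an entry point. For running the script directly (as in the command below), a minimal sketch could look like the following; note that exactly how DLTMetaMixin consumes the --conf-file configuration is internal to the library, so treat the wiring here as an assumption rather than the definitive pattern.

if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # The --conf-file argument (see the command below) is resolved by the
    # helper methods defined on the class; DLTMetaMixin uses the `dlt`
    # section of that configuration to decorate `orchestrate`.
    pipeline = MyPipeline(spark=spark)
    pipeline.orchestrate()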
Now that we have our configuration and our pipeline, we can try to execute the pipeline.
NOTE:
- The above code is a simple example. It is not intended to be a complete example.
- DLT pipelines can only be executed in Databricks DLT Pipelines.
- Running locally is possible with the databricks-dlt stub package; however, there are some caveats to this approach - specifically, any transformations where we rely on DLT to enforce expectations like expect_or_drop or expect_all_or_drop.
python pipelines/simple-dltflow-pipeline.py --conf-file conf/pipeline-code-config.json
In dltflow, workflows are deployed to the Databricks workspace using a modified version of the Databricks Pipeline API definition. The workflow spec defined in dltflow is parsed and converted to a Databricks Pipeline for upload by the dbx Workflow Deployment Manager.
The following is an example of a workflow spec:
{
"dev": {
"workflows": [
{
"name": "dlflow-example-pipeline",
"storage": "/mnt/datalake/experiment/dltflow-samples/dlt/simple",
"target": "dltflow-samples",
"development": "true",
"edition": "ADVANCED",
"continuous": "false",
"clusters": [
{
"label": "default",
"node_type_id": "Standard_DS3_v2",
"autoscale": {
"min_workers": 1,
"max_workers": 2,
"mode": "ENHANCED"
}
}
],
"pipeline_type": "WORKSPACE",
"data_sampling": false,
"tasks": {
"items": [
{
"python_file": "samples/pipelines/simple-dltflow-pipeline.py",
"parameters": [
"--conf",
"conf/pipeline-code-config.json"
]
}
],
"dependencies": [
{
"pypi": {
"package": "pyspark"
}
}
]
}
}
]
}
}
The tasks key is what's custom to this job spec definition. You can define n Python files and dependencies per workflow. When materialized, each workflow translates to a single Notebook being generated and deployed to the Databricks workspace.
The same configuration can also be expressed as a yaml file:
dev:
workflows:
- name: "dltflow-example-pipeline"
storage: "/mnt/datalake/experiment/dltflow-samples/dlt/simple"
target: "dltflow-samples"
development: "true"
edition: "ADVANCED"
continuous: "false"
clusters:
- label: "default"
node_type_id: Standard_DS3_v2"
autoscale:
min_workers: 1
max_workers: 2
mode: "ENHANCED"
pipeline_type: "WORKSPACE"
data_sampling: false
tasks:
items:
- python_file: "samples/pipelines/simple-dltflow-pipeline.py"
parameters:
- "--conf"
- "conf/pipeline-code-config.json"
dependencies:
- whl: "/dbfs/private-site-packages/dltflow-0.0.1b0-py3-none-any.whl"
- pypi:
package: "pyspark"
Now that we have our pipeline, our pipeline configuration, and our DLT pipeline job spec, we can deploy the pipeline to Databricks. To do so, we need to use dltflow's deploy-py-dlt command from the CLI.
Generally, dltflow tries to follow the same pattern as dbx for deployment. The deploy-py-dlt command will look for a dltflow.yml file in the root of the project. This file should contain the necessary configurations for deployment. See the Initialization docs for more information on the topic.
dltflow deploy-py-dlt --help
>>> Usage: dltflow deploy-py-dlt [OPTIONS]
Deploy a DLT pipeline.
Options:
--deployment-file TEXT [required]
--environment TEXT [required]
--as-individual      Overrides project settings. Useful for developers as
                     they're experimenting with getting their code fully
                     functional. The impact of this flag is that any
                     derived DLT pipelines created will have a prefix
                     name of [{profile}_{user_name}] -- this is to not
                     overwrite any existing pipelines with logic that is
                     not yet fully baked.
--help Show this message and exit.
And to tie together the full example, here's how we can deploy our example pipeline.
dltflow deploy-py-dlt --deployment-file conf/dlt/test.json --environment dev
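While iterating, you can add the --as-individual flag described above so that the derived pipeline gets a [{profile}_{user_name}] prefix and doesn't overwrite the shared deployment:
dltflow deploy-py-dlt --deployment-file conf/dlt/test.json --environment dev --as-individual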