- Python Testing for Databricks
- Installation
- Ecosystem
- PyTest Fixtures
- Logging
- debug_env_name fixture
- debug_env fixture
- env_or_skip fixture
- ws fixture
- make_run_as fixture
- acc fixture
- spark fixture
- sql_backend fixture
- sql_exec fixture
- sql_fetch_all fixture
- make_random fixture
- make_instance_pool fixture
- make_instance_pool_permissions fixture
- make_job fixture
- make_job_permissions fixture
- make_cluster fixture
- make_cluster_permissions fixture
- make_cluster_policy fixture
- make_cluster_policy_permissions fixture
- make_pipeline fixture
- make_warehouse fixture
- make_group fixture
- make_acc_group fixture
- make_user fixture
- make_pipeline_permissions fixture
- make_notebook fixture
- make_notebook_permissions fixture
- make_workspace_file fixture
- make_directory fixture
- make_directory_permissions fixture
- make_repo fixture
- make_repo_permissions fixture
- make_workspace_file_permissions fixture
- make_workspace_file_path_permissions fixture
- make_secret_scope fixture
- make_secret_scope_acl fixture
- make_authorization_permissions fixture
- make_udf fixture
- make_catalog fixture
- make_schema fixture
- make_table fixture
- make_storage_credential fixture
- make_volume fixture
- product_info fixture
- make_model fixture
- make_experiment fixture
- make_experiment_permissions fixture
- make_warehouse_permissions fixture
- make_lakeview_dashboard_permissions fixture
- log_workspace_link fixture
- log_account_link fixture
- make_dashboard_permissions fixture
- make_alert_permissions fixture
- make_query fixture
- make_query_permissions fixture
- make_registered_model_permissions fixture
- make_serving_endpoint fixture
- make_serving_endpoint_permissions fixture
- make_feature_table fixture
- make_feature_table_permissions fixture
- watchdog_remove_after fixture
- watchdog_purge_suffix fixture
- is_in_debug fixture
- Project Support
Add a databricks-labs-pytester dependency to your pyproject.toml file (or legacy requirements.txt file). You can also install it directly from the command line:
pip install databricks-labs-pytester
If you use hatch as a build system, make sure to add databricks-labs-pytester as a test-time dependency and not as a compile-time dependency, otherwise your wheels will transitively depend on pytest, which is not usually something you need.
[project]
name = "name-of-your-project"
# ...
dependencies = [
    "databricks-sdk~=0.30",
    # ... dependencies required for your code to execute
]
[tool.hatch.envs.default]
dependencies = [
    # ... dependencies required to test/validate/format your code:
    "black~=24.3.0",
    "coverage[toml]~=7.4.4",
    "mypy~=1.9.0",
    "pylint~=3.2.2",
    "pylint-pytest==2.0.0a0",
    "databricks-labs-pylint~=0.4.0",
    "databricks-labs-pytester~=0.2", # <= this library
    "pytest~=8.3.3",
    "pytest-cov~=4.1.0",
    "pytest-mock~=3.14.0",
    "pytest-timeout~=2.3.1",
    "pytest-xdist~=3.5.0",
    "python-lsp-server>=1.9.0",
    "ruff~=0.3.4",
    "types-PyYAML~=6.0.12",
    "types-requests~=2.31.0",
]
Built on top of Databricks SDK for Python, this library is part of the Databricks Labs Python ecosystem, which includes the following projects:
- PyLint Plugin for Databricks for static code analysis and early bug detection.
- Blueprint for Python-native pathlib.Path-like interfaces, Managing Python App installations within Databricks Workspaces, Application Migrations, and Building Wheels.
- LSQL for lightweight SQL handling and dashboards-as-code.
- UCX for automated migrations into Unity Catalog and LSP plugin for static code analysis for UC compatibility.
See this video for a quick overview of the Databricks Labs Python ecosystem.
PyTest Fixtures are a powerful way to manage test setup and teardown in Python. This library provides a set of fixtures to help you write integration tests for Databricks. These fixtures were incubated within the Unity Catalog Automated Migrations project for more than a year and are now available for other projects to simplify integration testing with Databricks.
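Most fixtures in this library return a function that creates objects and then removes everything it created once the test finishes. The sketch below illustrates that general pattern with the standard library only. The helper name `factory`, its parameters, and the reverse-order cleanup are assumptions made for illustration, not a description of the library's internals.

```python
from typing import Any, Callable, Generator, List


def factory(
    name: str,
    create: Callable[..., Any],
    remove: Callable[[Any], None],
) -> Generator[Callable[..., Any], None, None]:
    """Yield a function that creates objects, then remove every created object."""
    created: List[Any] = []

    def inner(**kwargs: Any) -> Any:
        obj = create(**kwargs)
        created.append(obj)
        return obj

    try:
        yield inner
    finally:
        # Teardown: remove in reverse creation order, even if the test failed.
        for obj in reversed(created):
            try:
                remove(obj)
            except Exception as err:  # keep cleaning up the remaining objects
                print(f"could not remove {name} fixture {obj!r}: {err}")


# Drive the generator by hand, the way pytest drives a yield-fixture.
removed: List[str] = []
gen = factory(
    "user",
    create=lambda **kw: f"dummy-{kw.get('suffix', 'x')}",  # hypothetical creator
    remove=removed.append,
)
make_user = next(gen)
first = make_user(suffix="a")
second = make_user(suffix="b")
gen.close()  # runs the finally block, like the end of a test
```

In a real conftest.py, a generator like this would typically be wrapped in a function decorated with `@pytest.fixture` that does `yield from factory(...)`.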
This library is built on years of debugging integration tests for Databricks and its ecosystem.
That's why it comes with a built-in logger that traces creation and deletion of dummy entities through links in the Databricks Workspace UI. If you run the following code:
from databricks.sdk.service.workspace import ObjectType
def test_new_user(make_user, ws):
    new_user = make_user()
    home_dir = ws.workspace.get_status(f"/Users/{new_user.user_name}")
    assert home_dir.object_type == ObjectType.DIRECTORY
You will see the following output, where the first line is clickable and will take you to the user's profile in the Databricks Workspace UI:
12:30:53 INFO [d.l.p.fixtures.baseline] Created dummy-xwuq-...@example.com: https://.....azuredatabricks.net/#settings/workspace/identity-and-access/users/735...
12:30:53 DEBUG [d.l.p.fixtures.baseline] added workspace user fixture: User(active=True, display_name='dummy-xwuq-...@example.com', ...)
12:30:58 DEBUG [d.l.p.fixtures.baseline] clearing 1 workspace user fixtures
12:30:58 DEBUG [d.l.p.fixtures.baseline] removing workspace user fixture: User(active=True, display_name='dummy-xwuq-...@example.com', ...)
You may need to add the following to your conftest.py file to enable this:
import logging
from databricks.labs.blueprint.logger import install_logger
install_logger()
logging.getLogger('databricks.labs.pytester').setLevel(logging.DEBUG)
Specify the name of the debug environment. By default, it is set to .env, which will try to find a file named .env in any of the parent directories of the current working directory and load the environment variables from it via the debug_env fixture.
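The parent-directory lookup described above can be sketched with pathlib. This is a minimal illustration under stated assumptions: the helper names `find_dotenv` and `parse_dotenv` are hypothetical, the search also checks the starting directory itself, and only plain KEY=VALUE lines are parsed. The library's own loader may behave differently.

```python
import tempfile
from pathlib import Path
from typing import Optional


def find_dotenv(start: Path, filename: str = ".env") -> Optional[Path]:
    """Return the nearest `filename` in `start` or one of its parents, else None."""
    start = start.resolve()
    for folder in (start, *start.parents):
        candidate = folder / filename
        if candidate.is_file():
            return candidate
    return None


def parse_dotenv(text: str) -> dict:
    """Parse simple KEY=VALUE lines, ignoring blank lines and # comments."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        env[key.strip()] = value.strip().strip('"')
    return env


# Demo in a throwaway directory tree: the .env file sits two levels above.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp).resolve()
    (root / ".env").write_text("# local settings\nDATABRICKS_HOST=https://example.test\n")
    nested = root / "tests" / "integration"
    nested.mkdir(parents=True)
    found = find_dotenv(nested)
    found_in_root = found == root / ".env"
    loaded = parse_dotenv(found.read_text())
```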
Alternatively, if you are concerned about the risk of .env files getting checked into version control, we recommend using the ~/.databricks/debug-env.json file to store different sets of environment variables. The file cannot be checked into version control by design, because it is stored in the user's home directory.
This file is used for local debugging and integration tests in IDEs like PyCharm, VSCode, and IntelliJ IDEA while developing Databricks Platform Automation Stack, which includes Databricks SDKs for Python, Go, and Java, as well as Databricks Terraform Provider and Databricks CLI. This file enables multi-environment and multi-cloud testing with a single set of integration tests.
The file is typically structured as follows:
$ cat ~/.databricks/debug-env.json
{
  "ws": {
    "CLOUD_ENV": "azure",
    "DATABRICKS_HOST": "....azuredatabricks.net",
    "DATABRICKS_CLUSTER_ID": "0708-200540-...",
    "DATABRICKS_WAREHOUSE_ID": "33aef...",
    ...
  },
  "acc": {
    "CLOUD_ENV": "aws",
    "DATABRICKS_HOST": "accounts.cloud.databricks.com",
    "DATABRICKS_CLIENT_ID": "....",
    "DATABRICKS_CLIENT_SECRET": "....",
    ...
  }
}
And you can load it in your conftest.py file as follows:
import pytest
@pytest.fixture
def debug_env_name():
    return "ws"
This will load the ws environment from the ~/.databricks/debug-env.json file.
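To make the file layout concrete, here is a small sketch of selecting one named environment from a JSON file shaped like the one above. The function name `load_debug_env` and the sample values are hypothetical, and the demo writes to a temporary file rather than the real ~/.databricks/debug-env.json.

```python
import json
import tempfile
from pathlib import Path


def load_debug_env(name: str, path: Path) -> dict:
    """Return the variables stored under `name` (for example "ws" or "acc")."""
    conf = json.loads(path.read_text())
    if name not in conf:
        raise KeyError(f"environment {name!r} not found in {path}")
    return {key: str(value) for key, value in conf[name].items()}


# Demo with a temporary file; all values are made up for illustration.
sample = {
    "ws": {"CLOUD_ENV": "azure", "DATABRICKS_WAREHOUSE_ID": "hypothetical-id"},
    "acc": {"CLOUD_ENV": "aws"},
}
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "debug-env.json"
    path.write_text(json.dumps(sample))
    ws_env = load_debug_env("ws", path)
    acc_env = load_debug_env("acc", path)
```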
If any of the environment variables are not found, the env_or_skip fixture will gracefully skip the execution of tests.
See also debug_env.
Loads environment variables specified in the debug_env_name fixture from a file for local debugging in IDEs, otherwise allowing the tests to run with the default environment variables specified in the CI/CD pipeline.
See also acc, env_or_skip, ws, debug_env_name, is_in_debug.
Fixture to get environment variables or skip tests.
It is extremely useful to skip tests if the required environment variables are not set.
In the following example, test_something would only run if the environment variable SOME_EXTERNAL_SERVICE_TOKEN is set:
def test_something(env_or_skip):
    token = env_or_skip("SOME_EXTERNAL_SERVICE_TOKEN")
    assert token is not None
See also acc, make_run_as, make_udf, sql_backend, debug_env, is_in_debug.
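The "get or skip" behaviour of env_or_skip can be illustrated without pytest. The sketch below is a standard-library stand-in, not the fixture's implementation: it raises `unittest.SkipTest` where the real fixture presumably skips through pytest, and the variable names in the demo are hypothetical.

```python
import os
import unittest


def env_or_skip(name: str) -> str:
    """Return the value of an environment variable, or signal that the test is skipped."""
    value = os.environ.get(name)
    if value is None:
        # Assumption: a skip exception stands in for whatever the plugin raises.
        raise unittest.SkipTest(f"Environment variable {name} is missing")
    return value


# Demo: one variable is set, the other is not.
os.environ["SOME_EXTERNAL_SERVICE_TOKEN"] = "hypothetical-token"
os.environ.pop("SOME_MISSING_VARIABLE", None)
token = env_or_skip("SOME_EXTERNAL_SERVICE_TOKEN")
try:
    env_or_skip("SOME_MISSING_VARIABLE")
    skipped = False
except unittest.SkipTest:
    skipped = True
```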
Create and provide a Databricks WorkspaceClient object.
This fixture initializes a Databricks WorkspaceClient object, which can be used to interact with the Databricks workspace API. The created instance of WorkspaceClient is shared across all test functions within the test session.
See detailed documentation for the list of environment variables that can be used to authenticate the WorkspaceClient.
In your test functions, include this fixture as an argument to use the WorkspaceClient:
def test_workspace_operations(ws):
    # clusters.list() returns an iterator, so materialize it before calling len()
    clusters = list(ws.clusters.list())
    assert len(clusters) >= 0
See also log_workspace_link, make_alert_permissions, make_authorization_permissions, make_catalog, make_cluster, make_cluster_permissions, make_cluster_policy, make_cluster_policy_permissions, make_dashboard_permissions, make_directory, make_directory_permissions, make_experiment, make_experiment_permissions, make_feature_table, make_feature_table_permissions, make_group, make_instance_pool, make_instance_pool_permissions, make_job, make_job_permissions, make_lakeview_dashboard_permissions, make_model, make_notebook, make_notebook_permissions, make_pipeline, make_pipeline_permissions, make_query, make_query_permissions, make_registered_model_permissions, make_repo, make_repo_permissions, make_run_as, make_secret_scope, make_secret_scope_acl, make_serving_endpoint, make_serving_endpoint_permissions, make_storage_credential, make_udf, make_user, make_volume, make_warehouse, make_warehouse_permissions, make_workspace_file, make_workspace_file_path_permissions, make_workspace_file_permissions, spark, sql_backend, debug_env, product_info.
This fixture provides a function to create an account service principal via the acc fixture and assign it to a workspace. The service principal is removed after the test is complete. The service principal is created with a random display name and assigned to the workspace with the default permissions.
Use the account_groups argument to assign the service principal to account groups, which have the required permissions to perform a specific action.
Example:
def test_run_as_lower_privilege_user(make_run_as, ws):
    run_as = make_run_as(account_groups=['account.group.name'])
    through_query = next(run_as.sql_fetch_all("SELECT CURRENT_USER() AS my_name"))
    me = ws.current_user.me()
    assert me.user_name != through_query.my_name
Returned object has the following properties:
- ws: Workspace client that is authenticated as the ephemeral service principal.
- sql_backend: SQL backend that is authenticated as the ephemeral service principal.
- sql_exec: Function to execute a SQL statement on behalf of the ephemeral service principal.
- sql_fetch_all: Function to fetch all rows from a SQL statement on behalf of the ephemeral service principal.
- display_name: Display name of the ephemeral service principal.
- application_id: Application ID of the ephemeral service principal.
If you want to have other fixtures available in the context of the ephemeral service principal, you can override the ws fixture on the file level, which makes all workspace fixtures provided by this plugin run as the lower-privilege ephemeral service principal. You cannot combine it with the account-admin-level principal you're using to create the ephemeral principal.
Example:
from pytest import fixture
@fixture
def ws(make_run_as):
    run_as = make_run_as(account_groups=['account.group.used.for.all.tests.in.this.file'])
    return run_as.ws
def test_creating_notebook_on_behalf_of_ephemeral_principal(make_notebook):
    notebook = make_notebook()
    assert notebook.exists()
This fixture currently doesn't work with Databricks Metadata Service authentication on Azure Databricks.
See also acc, ws, make_random, env_or_skip, log_account_link, is_in_debug.
Create and provide a Databricks AccountClient object.
This fixture initializes a Databricks AccountClient object, which can be used to interact with the Databricks account API. The created instance of AccountClient is shared across all test functions within the test session.
Requires the DATABRICKS_ACCOUNT_ID environment variable to be set. If DATABRICKS_HOST points to a workspace host, the fixture would automatically determine the account host from it.
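One way to picture the host detection is a mapping from the workspace host suffix to the account console host of the matching cloud. The sketch below is an assumption about how such a mapping could look. The function name `account_host_for` is hypothetical, and the fixture's actual logic may differ.

```python
def account_host_for(workspace_host: str) -> str:
    """Guess the account console host from a workspace host (illustrative heuristic)."""
    host = workspace_host.strip().rstrip("/")
    if host.startswith("https://"):
        host = host[len("https://"):]
    if host.endswith(".azuredatabricks.net"):
        return "https://accounts.azuredatabricks.net"
    if host.endswith(".gcp.databricks.com"):
        return "https://accounts.gcp.databricks.com"
    # Assumption: anything else is treated as an AWS-hosted workspace.
    return "https://accounts.cloud.databricks.com"


azure = account_host_for("https://adb-123.4.azuredatabricks.net/")
gcp = account_host_for("https://123.4.gcp.databricks.com")
aws = account_host_for("https://example.cloud.databricks.com")
```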
See detailed documentation for the list of environment variables that can be used to authenticate the AccountClient.
In your test functions, include this fixture as an argument to use the AccountClient:
def test_listing_workspaces(acc):
    # workspaces.list() returns an iterator, so materialize it before calling len()
    workspaces = list(acc.workspaces.list())
    assert len(workspaces) >= 1
See also log_account_link, make_acc_group, make_run_as, debug_env, product_info, env_or_skip.
Get Databricks Connect Spark session. Requires the databricks-connect package to be installed.
Usage:
def test_databricks_connect(spark):
    rows = spark.sql("SELECT 1").collect()
    assert rows[0][0] == 1
See also ws.
Create and provide a SQL backend for executing statements.
Requires the environment variable DATABRICKS_WAREHOUSE_ID to be set.
See also make_schema, make_table, make_udf, sql_exec, sql_fetch_all, ws, env_or_skip.
Execute SQL statement and don't return any results.
See also sql_backend.
Fetch all rows from a SQL statement.
See also sql_backend.
Fixture to generate random strings.
This fixture provides a function to generate random strings of a specified length. The generated strings are created using a character set consisting of uppercase letters, lowercase letters, and digits.
To generate a random string with default length of 16 characters:
random_string = make_random()
assert len(random_string) == 16
To generate a random string with a specified length:
random_string = make_random(k=8)
assert len(random_string) == 8
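For readers who want to see what such a generator amounts to, here is a standalone sketch that matches the description above (uppercase letters, lowercase letters, and digits, default length 16). It is an illustration written for this document, not the fixture's source.

```python
import random
import string


def make_random(k: int = 16) -> str:
    """Return a random string of length k made of ASCII letters and digits."""
    charset = string.ascii_uppercase + string.ascii_lowercase + string.digits
    return "".join(random.choices(charset, k=k))


default_name = make_random()
short_name = make_random(k=8)
```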
See also make_acc_group, make_catalog, make_cluster, make_cluster_policy, make_directory, make_experiment, make_feature_table, make_group, make_instance_pool, make_job, make_model, make_notebook, make_pipeline, make_query, make_repo, make_run_as, make_schema, make_secret_scope, make_serving_endpoint, make_table, make_udf, make_user, make_volume, make_warehouse, make_workspace_file.
Create a Databricks instance pool and clean it up after the test. Returns a function to create instance pools.
Use the instance_pool_id attribute from the returned object to get an ID of the pool.
Keyword Arguments:
- instance_pool_name (str, optional): The name of the instance pool. If not provided, a random name will be generated.
- node_type_id (str, optional): The node type ID of the instance pool. If not provided, a node type with local disk and 16GB memory will be used.
- other arguments are passed to the WorkspaceClient.instance_pools.create method.
Usage:
def test_instance_pool(make_instance_pool):
    logger.info(f"created {make_instance_pool()}")
See also ws, make_random, watchdog_remove_after.
No description yet.
See also ws.
Create a Databricks job and clean it up after the test. Returns a function to create jobs, which returns a Job instance.
Keyword Arguments:
- name (str, optional): The name of the job. If not provided, a random name will be generated.
- path (str, optional): The path to the notebook or file used in the job. If not provided, a random notebook or file will be created.
- notebook_path (str, optional): [DEPRECATED: Use path instead] The path to the notebook. If not provided, a random notebook will be created.
- content (str | bytes, optional): The content of the notebook or file used in the job. If not provided, default content of make_notebook will be used.
- task_type (type[NotebookTask] | type[SparkPythonTask], optional): The type of task. If not provided, type[NotebookTask] will be used.
- instance_pool_id (str, optional): The instance pool id to add to the job cluster. If not provided, no instance pool will be used.
- spark_conf (dict, optional): The Spark configuration of the job. If not provided, Spark configuration is not explicitly set.
- libraries (list, optional): The list of libraries to install on the job.
- tags (list[str], optional): A list of job tags. If not provided, no additional tags will be set on the job.
- tasks (list[Task], optional): A list of job tasks. If not provided, a single notebook task will be created, along with a disposable notebook. The latest Spark version and a single-worker cluster will be used to run this ephemeral job.
Usage:
def test_job(make_job):
    logger.info(f"created {make_job()}")
See also ws, make_random, make_notebook, make_workspace_file, watchdog_remove_after.
No description yet.
See also ws.
Create a Databricks cluster, wait for it to start, and clean it up after the test.
Returns a function to create clusters. You can get the cluster_id attribute from the returned object.
Keyword Arguments:
- single_node (bool, optional): Whether to create a single-node cluster. Defaults to False.
- cluster_name (str, optional): The name of the cluster. If not provided, a random name will be generated.
- spark_version (str, optional): The Spark version of the cluster. If not provided, the latest version will be used.
- autotermination_minutes (int, optional): The number of minutes before the cluster is automatically terminated. Defaults to 10.
Usage:
def test_cluster(make_cluster):
    logger.info(f"created {make_cluster(single_node=True)}")
See also ws, make_random, watchdog_remove_after.
No description yet.
See also ws.
Create a Databricks cluster policy and clean it up after the test. Returns a function to create cluster policies, which returns a CreatePolicyResponse instance.
Keyword Arguments:
- name (str, optional): The name of the cluster policy. If not provided, a random name will be generated.
Usage:
def test_cluster_policy(make_cluster_policy):
    logger.info(f"created {make_cluster_policy()}")
See also ws, make_random, watchdog_purge_suffix.
No description yet.
See also ws.
Create a Delta Live Tables pipeline and clean it up after the test. Returns a function to create pipelines, which returns a CreatePipelineResponse instance.
Keyword Arguments:
- name (str, optional): The name of the pipeline. If not provided, a random name will be generated.
- libraries (list, optional): The list of libraries to install on the pipeline. If not provided, a random disposable notebook will be created.
- clusters (list, optional): The list of clusters to use for the pipeline. If not provided, a single node cluster will be created with 16GB memory and local disk.
Usage:
def test_pipeline(make_pipeline, make_pipeline_permissions, make_group):
    group = make_group()
    pipeline = make_pipeline()
    make_pipeline_permissions(
        object_id=pipeline.pipeline_id,
        permission_level=PermissionLevel.CAN_MANAGE,
        group_name=group.display_name,
    )
See also ws, make_random, make_notebook, watchdog_remove_after, watchdog_purge_suffix.
Create a Databricks warehouse and clean it up after the test. Returns a function to create warehouses.
Keyword Arguments:
- warehouse_name (str, optional): The name of the warehouse. If not provided, a random name will be generated.
- warehouse_type (CreateWarehouseRequestWarehouseType, optional): The type of the warehouse. Defaults to PRO.
- cluster_size (str, optional): The size of the cluster. Defaults to 2X-Small.
Usage:
def test_warehouse_has_remove_after_tag(ws, make_warehouse):
    new_warehouse = make_warehouse()
    created_warehouse = ws.warehouses.get(new_warehouse.response.id)
    warehouse_tags = created_warehouse.tags.as_dict()
    assert warehouse_tags["custom_tags"][0]["key"] == "RemoveAfter"
See also ws, make_random, watchdog_remove_after.
This fixture provides a function to manage Databricks workspace groups. Groups can be created with specified
members and roles, and they will be deleted after the test is complete. Deals with eventual consistency issues by
retrying the creation process for 30 seconds and then waiting for up to 3 minutes for the group to be provisioned.
Returns an instance of Group.
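The retry behaviour that make_group relies on for eventual consistency can be pictured as a retry-until-deadline loop. The sketch below is a generic standard-library illustration. The helper `retry_for`, its parameters, and the simulated `flaky_create` call are hypothetical and do not reflect how the library implements its retries.

```python
import time
from typing import Callable, Type, TypeVar

T = TypeVar("T")


def retry_for(seconds: float, action: Callable[[], T], retry_on: Type[Exception], pause: float = 0.1) -> T:
    """Call `action` until it succeeds, re-raising once `seconds` have elapsed."""
    deadline = time.monotonic() + seconds
    while True:
        try:
            return action()
        except retry_on:
            if time.monotonic() >= deadline:
                raise
            time.sleep(pause)


# Simulated backend that only "sees" the group on the third attempt.
attempts = {"count": 0}


def flaky_create() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise LookupError("group not visible yet")
    return "dummy-group"


group_name = retry_for(5, flaky_create, retry_on=LookupError, pause=0.01)
```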
Keyword arguments:
- members (list of strings): A list of user IDs to add to the group.
- roles (list of strings): A list of roles to assign to the group.
- display_name (str): The display name of the group.
- entitlements (list of strings): A list of entitlements to assign to the group.
The following example creates a group with a single member and independently verifies that the group was created:
def test_new_group(make_group, make_user, ws):
    user = make_user()
    group = make_group(members=[user.id])
    loaded = ws.groups.get(group.id)
    assert group.display_name == loaded.display_name
    assert group.members == loaded.members
See also ws, make_random, watchdog_purge_suffix.
This fixture provides a function to manage Databricks account groups. Groups can be created with specified members and roles, and they will be deleted after the test is complete.
Has the same arguments and behavior as the make_group fixture but uses the account client instead of the workspace client.
Example usage:
def test_new_account_group(make_acc_group, acc):
    group = make_acc_group()
    loaded = acc.groups.get(group.id)
    assert group.display_name == loaded.display_name
See also acc, make_random, watchdog_purge_suffix.
This fixture returns a function that creates a Databricks workspace user and removes it after the test is complete. In case of random naming conflicts, the fixture will retry the creation process for 30 seconds. Returns an instance of User.
Usage:
def test_new_user(make_user, ws):
    new_user = make_user()
    home_dir = ws.workspace.get_status(f"/Users/{new_user.user_name}")
    assert home_dir.object_type == ObjectType.DIRECTORY
See also ws, make_random, watchdog_purge_suffix.
No description yet.
See also ws.
Returns a function to create Databricks Notebooks and clean them up after the test.
The function returns an os.PathLike object.
Keyword arguments:
- path (str, optional): The path of the notebook. Defaults to a dummy-* notebook in the current user's home folder.
- content (str | bytes | io.BinaryIO, optional): The content of the notebook. Defaults to print(1) for Python and SELECT 1 for SQL.
- language (Language, optional): The language of the notebook. Defaults to Language.PYTHON.
- encoding (str, optional): The file encoding. Defaults to sys.getdefaultencoding().
- format (ImportFormat, optional): The format of the notebook. Defaults to ImportFormat.SOURCE.
- overwrite (bool, optional): Whether to overwrite the notebook if it already exists. Defaults to False.
This example creates a notebook and verifies that print(1) is in the content:
def test_creates_some_notebook(make_notebook):
    notebook = make_notebook()
    assert "print(1)" in notebook.read_text()
See also make_job, make_pipeline, ws, make_random, watchdog_purge_suffix.
No description yet.
See also ws.
Returns a function to create Databricks workspace files and clean them up after the test.
The function returns an os.PathLike object.
Keyword arguments:
- path (str, optional): The path of the file. Defaults to a dummy-* file in the current user's home folder.
- content (str | bytes, optional): The content of the file. Defaults to print(1) for Python and SELECT 1 for SQL.
- language (Language, optional): The language of the file. Defaults to Language.PYTHON.
- encoding (str, optional): The file encoding. Defaults to sys.getdefaultencoding().
This example creates a workspace file and verifies that the workspace path is an existing file with contents print(1):
def test_create_file(make_workspace_file):
    workspace_file = make_workspace_file()
    assert workspace_file.is_file()
    assert "print(1)" in workspace_file.read_text()
TODO: Merge functionality with make_notebook if WorkspacePath supports creating notebooks.
See also make_job, ws, make_random, watchdog_purge_suffix.
Returns a function to create Databricks Workspace Folders and clean them up after the test.
The function returns an os.PathLike object.
Keyword arguments:
- path (str, optional): The path of the folder. Defaults to a dummy-* folder in the current user's home folder.
This example creates a folder and verifies that it contains a notebook:
def test_creates_some_folder_with_a_notebook(make_directory, make_notebook):
    folder = make_directory()
    notebook = make_notebook(path=folder / 'foo.py')
    files = [_.name for _ in folder.iterdir()]
    assert ['foo.py'] == files
    assert notebook.parent == folder
See also make_experiment, ws, make_random, watchdog_purge_suffix.
No description yet.
See also ws.
Returns a function to create Databricks Repos and clean them up after the test.
The function returns a RepoInfo object.
Keyword arguments:
- url (str, optional): The URL of the repository.
- provider (str, optional): The provider of the repository.
- path (str, optional): The path of the repository. Defaults to /Repos/{current_user}/sdk-{random}-{purge_suffix}.
Usage:
def test_repo(make_repo):
    logger.info(f"created {make_repo()}")
See also ws, make_random, watchdog_purge_suffix.
No description yet.
See also ws.
No description yet.
See also ws.
No description yet.
See also ws.
This fixture provides a function to create secret scopes. The created secret scope will be deleted after the test is complete. Returns the name of the secret scope.
To create a secret scope and use it within a test function:
def test_secret_scope_creation(make_secret_scope):
    secret_scope_name = make_secret_scope()
    assert secret_scope_name.startswith("dummy-")
See also ws, make_random.
This fixture provides a function to manage access control lists (ACLs) for secret scopes. ACLs define permissions for principals (users or groups) on specific secret scopes.
Arguments:
- scope: The name of the secret scope.
- principal: The name of the principal (user or group).
- permission: The permission level for the principal on the secret scope.
Returns a tuple containing the secret scope name and the principal name.
To manage secret scope ACLs using the make_secret_scope_acl fixture:
from databricks.sdk.service.workspace import AclPermission
def test_secret_scope_acl_management(make_user, make_secret_scope, make_secret_scope_acl):
    scope_name = make_secret_scope()
    principal_name = make_user().display_name
    permission = AclPermission.READ
    acl_info = make_secret_scope_acl(
        scope=scope_name,
        principal=principal_name,
        permission=permission,
    )
    assert acl_info == (scope_name, principal_name)
See also ws.
No description yet.
See also ws.
Create a UDF and return its info. Remove it after the test. Returns an instance of FunctionInfo.
Keyword Arguments:
- catalog_name (str): The name of the catalog where the UDF will be created. Default is hive_metastore.
- schema_name (str): The name of the schema where the UDF will be created. Default is a random string.
- name (str): The name of the UDF. Default is a random string.
- hive_udf (bool): If True, the UDF will be created as a Hive UDF. Default is False.
Usage:
def test_make_some_udfs(make_schema, make_udf):
    schema_a = make_schema(catalog_name="hive_metastore")
    make_udf(schema_name=schema_a.name)
    make_udf(schema_name=schema_a.name, hive_udf=True)
See also ws, env_or_skip, sql_backend, make_schema, make_random.
Create a catalog and return its info. Remove it after the test.
Returns an instance of CatalogInfo.
Keyword Arguments:
- name (str): The name of the catalog. Default is a random string.
Usage:
def test_catalog_fixture(make_catalog, make_schema, make_table):
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    logger.info(f"Created new table: {from_table_1}")
See also make_volume, ws, make_random, watchdog_remove_after.
Create a schema and return its info. Remove it after the test. Returns an instance of SchemaInfo.
Keyword Arguments:
- catalog_name (str): The name of the catalog where the schema will be created. Default is hive_metastore.
- name (str): The name of the schema. Default is a random string.
- location (str): The path to the location if it should be a managed schema.
Usage:
def test_catalog_fixture(make_catalog, make_schema, make_table):
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    logger.info(f"Created new table: {from_table_1}")
See also make_table, make_udf, make_volume, sql_backend, make_random, watchdog_remove_after.
Create a table and return its info. Remove it after the test. Returns an instance of TableInfo.
Keyword Arguments:
- catalog_name (str): The name of the catalog where the table will be created. Default is hive_metastore.
- schema_name (str): The name of the schema where the table will be created. Default is a random string.
- name (str): The name of the table. Default is a random string.
- ctas (str): The CTAS statement to create the table. Default is None.
- non_delta (bool): If True, the table will be created as a non-delta table. Default is False.
- external (bool): If True, the table will be created as an external table. Default is False.
- external_csv (str): The location of the external CSV table. Default is None.
- external_delta (str): The location of the external Delta table. Default is None.
- view (bool): If True, the table will be created as a view. Default is False.
- tbl_properties (dict): The table properties. Default is None.
- hiveserde_ddl (str): The DDL statement to create the table. Default is None.
- storage_override (str): The storage location override. Default is None.
- columns (list): The list of columns. Default is None.
Usage:
def test_catalog_fixture(make_catalog, make_schema, make_table):
    from_catalog = make_catalog()
    from_schema = make_schema(catalog_name=from_catalog.name)
    from_table_1 = make_table(catalog_name=from_catalog.name, schema_name=from_schema.name)
    logger.info(f"Created new table: {from_table_1}")
See also make_query, sql_backend, make_schema, make_random, watchdog_remove_after.
Create a storage credential and return its info. Remove it after the test. Returns an instance of StorageCredentialInfo.
Keyword Arguments:
- credential_name (str): The name of the storage credential. Default is a random string.
- application_id (str): The application ID for the Azure service principal. Default is an empty string.
- client_secret (str): The client secret for the Azure service principal. Default is an empty string.
- directory_id (str): The directory ID for the Azure service principal. Default is an empty string.
- aws_iam_role_arn (str): The ARN of the AWS IAM role. Default is an empty string.
- read_only (bool): If True, the storage credential will be read-only. Default is False.
Usage:
def test_storage_credential(env_or_skip, make_storage_credential, make_random):
    random = make_random(6).lower()
    credential_name = f"dummy-{random}"
    make_storage_credential(
        credential_name=credential_name,
        aws_iam_role_arn=env_or_skip("TEST_UBER_ROLE_ID"),
    )
See also ws, watchdog_remove_after.
Create a volume and return its info. Remove it after the test. Returns an instance of VolumeInfo.
Keyword Arguments:
- catalog_name (str): The name of the catalog where the schema and the volume are.
- schema_name (str): The name of the schema where the volume is.
- name (str): The name of the volume.
- comment (str, optional): The comment attached to the volume.
Usage:
def test_volume_creation(make_catalog, make_schema, make_volume, make_random):
    # Create a catalog
    catalog = make_catalog()
    # Create a schema in the catalog
    schema = make_schema(catalog_name=catalog.name)
    # Generate a random name for the volume
    volume_name = f"dummy_vol_{make_random(6).lower()}"
    # Create the volume
    volume = make_volume(
        catalog_name=catalog.name,
        schema_name=schema.name,
        name=volume_name
    )
See also ws, make_catalog, make_schema, make_random.
No description yet.
Returns a function to create Databricks Models and clean them up after the test.
The function returns a GetModelResponse object.
Keyword arguments:
- model_name (str, optional): The name of the model. Defaults to dummy-*.
Usage:
from databricks.sdk.service.iam import PermissionLevel
def test_models(make_group, make_model, make_registered_model_permissions):
    group = make_group()
    model = make_model()
    make_registered_model_permissions(
        object_id=model.id,
        permission_level=PermissionLevel.CAN_MANAGE,
        group_name=group.display_name,
    )
See also make_serving_endpoint, ws, make_random, watchdog_remove_after.
Returns a function to create Databricks Experiments and clean them up after the test.
The function returns a CreateExperimentResponse object.
Keyword arguments:
- path (str, optional): The path of the experiment. Defaults to a dummy-* experiment in the current user's home folder.
- experiment_name (str, optional): The name of the experiment. Defaults to dummy-*.
Usage:
from databricks.sdk.service.iam import PermissionLevel
def test_experiments(make_group, make_experiment, make_experiment_permissions):
    group = make_group()
    experiment = make_experiment()
    make_experiment_permissions(
        object_id=experiment.experiment_id,
        permission_level=PermissionLevel.CAN_MANAGE,
        group_name=group.display_name,
    )
See also ws, make_random, make_directory, watchdog_purge_suffix.
No description yet.
See also ws.
No description yet.
See also ws.
No description yet.
See also ws.
Returns a function to log a workspace link.
See also ws.
Returns a function to log an account link.
See also make_run_as, acc.
No description yet.
See also ws.
No description yet.
See also ws.
Create a query and remove it after the test is done. Returns the LegacyQuery object.
Keyword Arguments:
- sql_query: The query to be stored. Default is SELECT * FROM <newly created random table>.
Usage:
from databricks.sdk.service.sql import PermissionLevel
def test_permissions_for_redash(
    make_user,
    make_query,
    make_query_permissions,
):
    user = make_user()
    query = make_query()
    make_query_permissions(
        object_id=query.id,
        permission_level=PermissionLevel.CAN_EDIT,
        user_name=user.display_name,
    )
See also ws, make_table, make_random, watchdog_remove_after.
No description yet.
See also ws.
No description yet.
See also ws.
Returns a function to create Databricks Serving Endpoints and clean them up after the test.
The function returns a ServingEndpointDetailed object.
Under the covers, this fixture also creates a model to serve on a small workload size.
Usage:
def test_endpoints(make_group, make_serving_endpoint, make_serving_endpoint_permissions):
    group = make_group()
    endpoint = make_serving_endpoint()
    make_serving_endpoint_permissions(
        object_id=endpoint.response.id,
        permission_level=PermissionLevel.CAN_QUERY,
        group_name=group.display_name,
    )
See also ws, make_random, make_model, watchdog_remove_after.
No description yet.
See also ws.
No description yet.
See also ws, make_random.
No description yet.
See also ws.
Purge time for test objects, representing the (UTC-based) hour from which objects may be purged.
See also make_catalog, make_cluster, make_instance_pool, make_job, make_model, make_pipeline, make_query, make_schema, make_serving_endpoint, make_storage_credential, make_table, make_warehouse, watchdog_purge_suffix.
HEX-encoded purge time suffix for test objects.
See also make_acc_group, make_cluster_policy, make_directory, make_experiment, make_group, make_notebook, make_pipeline, make_repo, make_user, make_workspace_file, watchdog_remove_after.
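To make the two watchdog values more tangible, the sketch below derives a UTC purge hour and a hex-encoded suffix from it. Everything specific here is an assumption for illustration: the one-hour timeout, the YYYYMMDDHH format, the "ra" prefix, and both function names. The library's actual encoding may differ.

```python
from datetime import datetime, timedelta, timezone


def remove_after(now: datetime, timeout: timedelta = timedelta(hours=1)) -> str:
    """UTC hour (formatted as YYYYMMDDHH) from which a test object may be purged."""
    return (now.astimezone(timezone.utc) + timeout).strftime("%Y%m%d%H")


def purge_suffix(remove_after_hour: str) -> str:
    """Hex-encode the purge hour as a Unix timestamp so it fits into object names."""
    purge_time = datetime.strptime(remove_after_hour, "%Y%m%d%H").replace(tzinfo=timezone.utc)
    return f"ra{int(purge_time.timestamp()):x}"  # "ra" prefix is an assumption


now = datetime(2024, 9, 1, 12, 30, tzinfo=timezone.utc)
hour = remove_after(now)
suffix = purge_suffix(hour)
```

A cleanup job could then decode the suffix of an object name, or read a tag holding the purge hour, and delete anything whose purge time has passed.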
Returns true if the test is running from a debugger in an IDE, otherwise false.
The following IDEs are supported: IntelliJ IDEA (including Community Edition), PyCharm (including Community Edition), and Visual Studio Code.
See also debug_env, env_or_skip, make_run_as.
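One plausible way to detect an IDE test run is to look at the script that launched the interpreter. The sketch below assumes two launcher file names, one used by JetBrains IDEs and one by the Visual Studio Code Python extension. Both names and the function signature are assumptions for illustration, and the fixture may rely on other signals.

```python
import os
import sys

# Assumed launcher script names; the ones the library checks may differ.
DEBUG_LAUNCHERS = {"_jb_pytest_runner.py", "testlauncher.py"}


def is_in_debug(argv0: str = "") -> bool:
    """Return True if the launching script looks like an IDE test runner."""
    script = argv0 or sys.argv[0]
    return os.path.basename(script) in DEBUG_LAUNCHERS


from_ide = is_in_debug("/opt/ide/helpers/pycharm/_jb_pytest_runner.py")
from_cli = is_in_debug("/usr/local/bin/pytest")
```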
Please note that this project is provided for your exploration only and is not formally supported by Databricks with Service Level Agreements (SLAs). It is provided AS-IS, and we do not make any guarantees of any kind. Please do not submit a support ticket relating to any issues arising from the use of this project.
Any issues discovered through the use of this project should be filed as GitHub Issues on this repository. They will be reviewed as time permits, but no formal SLAs for support exist.