From c51d531f08575685f79195467b7b50477e3ea68b Mon Sep 17 00:00:00 2001
From: Ricky Schools
Date: Wed, 8 May 2024 12:07:47 -0400
Subject: [PATCH] Update deploy.py

---
 dltflow/cli/deploy.py | 79 ++++++++++++++++++++++++-------------------
 1 file changed, 44 insertions(+), 35 deletions(-)

diff --git a/dltflow/cli/deploy.py b/dltflow/cli/deploy.py
index 733ba91..1432426 100644
--- a/dltflow/cli/deploy.py
+++ b/dltflow/cli/deploy.py
@@ -47,7 +47,7 @@ import sys
 import yaml
 import pathlib
 
-sys.path.append('{package_namespace}')
+sys.path.append('/dbfs{package_namespace}')
 """
 
 _CODE_IMPORT_AND_LAUNCH_CELLS = """
@@ -67,12 +67,13 @@
 
 
 def upload_py_package_to_workspace(
-    run_id: str,
-    dbfs_path: pathlib.Path,
-    package_path: pathlib.Path,
-    files: list,
-    profile: str,
-    ws_client: WorkspaceClient = None,
+        run_id: str,
+        dbfs_path: pathlib.Path,
+        package_path: pathlib.Path,
+        config_path: pathlib.Path,
+        files: list,
+        profile: str,
+        ws_client: WorkspaceClient = None,
 ):
     """
     Upload a python package to the Databricks workspace. The logic being leveraged by
@@ -108,13 +109,18 @@ def upload_py_package_to_workspace(
     dbx_echo(f"Upload directory: [yellow]`{upload_dbfs_path}`")
     ws_client.dbfs.mkdirs(str(upload_dbfs_path))
 
-    for file in files:
-        file = pathlib.Path(file).absolute()
-        ws_client.dbfs.copy(
-            f"file://{file}/",
-            str(upload_dbfs_path),
-            recursive=True,
-        )
+    ws_client.dbfs.copy(f"file://{package_path}/", str(upload_dbfs_path), recursive=True)
+    ws_client.dbfs.copy(f'file://{config_path}/', str(upload_dbfs_path), recursive=True)
+
+    # for file in files:
+    #     file = pathlib.Path(file).absolute()
+    #     parent_dir = file.parent
+    #     ws_client.dbfs.mkdirs(str(parent_dir))
+    #     ws_client.dbfs.copy(
+    #         f"file://{file}/",
+    #         str(upload_dbfs_path),
+    #         recursive=True,
+    #     )
 
     dbx_echo(f"📦📤 [green]Upload complete!")
     return "/dbfs" + str(upload_dbfs_path)
@@ -160,10 +166,10 @@ def get_module_info_from_path_name(project_name, project_path, module_path: str)
 
 
 def create_notebook_and_upload(
-    ws_client: WorkspaceClient,
-    dbfs_path: pathlib.Path,
-    notebook_name: str,
-    notebook_src: str,
+        ws_client: WorkspaceClient,
+        dbfs_path: pathlib.Path,
+        notebook_name: str,
+        notebook_src: str,
 ):
     """
     This function creates a DLT pipeline notebook and uploads it to the Databricks workspace.
@@ -206,15 +212,16 @@ def create_notebook_and_upload(
 
 
 def deploy_py_module_as_notebook(
-    run_id: str,
-    ws_client: WorkspaceClient,
-    project_name: str,
-    workflow_name: str,
-    dbfs_path: pathlib.Path,
-    package_path: pathlib.Path,
-    items: list,
-    dependencies: list = None,
-    launch_method: str = "launch",
+        run_id: str,
+        ws_client: WorkspaceClient,
+        project_name: str,
+        workflow_name: str,
+        dbfs_path: pathlib.Path,
+        package_path: pathlib.Path,
+        config_path: pathlib.Path,
+        items: list,
+        dependencies: list = None,
+        launch_method: str = "launch",
 ):
     """
     This function deploys a python module as a DLT pipeline to the Databricks workspace.
@@ -253,7 +260,7 @@ def deploy_py_module_as_notebook(
     )
 
     for item in items:
-        cfg = f'config_path = "{str(package_namespace.joinpath(item.get("config_path")))}"'
+        cfg = f'config_path = "/dbfs{str(package_namespace.joinpath(item.get("config_path")))}"'
         class_name, import_path = get_module_info_from_path_name(
             project_name, package_path, item.get("python_file")
         )
@@ -268,6 +275,7 @@ def deploy_py_module_as_notebook(
             run_id=run_id,
             dbfs_path=dbfs_path,
             package_path=package_path,
+            config_path=config_path,
            files=item.values(),
             profile=ws_client.config.profile,
             ws_client=ws_client,
@@ -287,11 +295,11 @@
 
 
 def register_notebook_as_dlt_pipeline(
-    ws_client: WorkspaceClient,
-    notebook_path: str,
-    pipeline_payload: DLTFlowPipeline,
-    project_conf,
-    as_individual: bool,
+        ws_client: WorkspaceClient,
+        notebook_path: str,
+        pipeline_payload: DLTFlowPipeline,
+        project_conf,
+        as_individual: bool,
 ):  # pragma: no cover
     """
     Register a notebook as a DLT pipeline in the Databricks workspace.
@@ -329,7 +337,7 @@ def register_notebook_as_dlt_pipeline(
 
 
 def handle_deployment_per_file_config_dependency(
-    workflow_name, items, dependencies, deployment_config, profile, as_individual: bool
+        workflow_name, items, dependencies, deployment_config, profile, as_individual: bool
 ):
     """
     A helper function to handle the deployment of a python module as a DLT pipeline.
@@ -353,7 +361,7 @@ def handle_deployment_per_file_config_dependency(
     config: ProjectConfig = get_config()
     project_path = pathlib.Path(".")
     _code_path = project_path.joinpath(config.include.code.name).resolve()
-    _cfg_path = project_path.joinpath(config.include.conf.name)
+    _cfg_path = project_path.joinpath(config.include.conf.name).resolve()
     assert profile in config.targets, f"Profile {profile} not found in config file."
     _target_config = config.targets.get(profile)
     _dbfs_path = pathlib.Path(config.targets.get(profile).workspace.root_path)
@@ -367,6 +375,7 @@
         workflow_name=workflow_name,
         dbfs_path=_dbfs_path,
         package_path=_code_path,
+        config_path=_cfg_path,
         items=items,
         dependencies=dependencies,
         launch_method=config.include.code.dict().get("launch_method", "launch"),
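
A note on the pattern this patch converges on, for anyone reading the diff in isolation: dltflow stages the built package and its config directory into DBFS through the Databricks SDK, then generates a notebook whose first cell appends the staged path to sys.path. Cluster-local Python reaches DBFS only through the /dbfs FUSE mount, which is why the patch prefixes both the generated sys.path entry and the generated config_path with /dbfs. Below is a minimal sketch of that flow, assuming an illustrative dist/ and conf/ layout, a hypothetical "dev" profile, and a made-up target directory; dltflow derives all of these from its project config rather than hard-coding them.

    import pathlib

    from databricks.sdk import WorkspaceClient

    # Hypothetical local layout and DBFS target -- illustrative names only.
    package_path = pathlib.Path("dist").resolve()
    config_path = pathlib.Path("conf").resolve()
    upload_dbfs_path = pathlib.Path("/Shared/dltflow/my_project")

    ws_client = WorkspaceClient(profile="dev")  # assumed profile name

    # Stage both directories, mirroring upload_py_package_to_workspace:
    # dbfs.copy with a file:// source uploads from the local filesystem.
    ws_client.dbfs.mkdirs(str(upload_dbfs_path))
    ws_client.dbfs.copy(f"file://{package_path}/", str(upload_dbfs_path), recursive=True)
    ws_client.dbfs.copy(f"file://{config_path}/", str(upload_dbfs_path), recursive=True)

    # The generated notebook's first cell then makes the staged package
    # importable through the FUSE mount -- the reason for the patch's
    # `/dbfs` prefixes:
    #
    #     import sys
    #     sys.path.append("/dbfs/Shared/dltflow/my_project")

Copying the package and config directories wholesale, rather than iterating over item.values() as the now commented-out loop did, preserves the relative layout on DBFS, so package_namespace.joinpath(item.get("config_path")) resolves the same way in the workspace as it does locally.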