diff --git a/src/main.py b/src/main.py index 4901801..1305104 100644 --- a/src/main.py +++ b/src/main.py @@ -8,16 +8,55 @@ from src.workflow import S3IngestionWorkflow, LocalIngestionWorkflow, WorkflowManager from src.utils import read_shot_file import subprocess +from pathlib import Path -def main(): - # Initialize the MPI communicator +def initialize_lakefs_branch(): + """Initialize the lakeFS ingestion branch if on rank 0.""" comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank == 0: lakefs.repository("example-repo").branch("ingestion").create(source_reference="main") +def execute_lakectl_command(command, error_message): + """Helper function to execute lakectl commands with error handling.""" + try: + result = subprocess.run(command, check=True, capture_output=True, text=True) + logging.info("Command executed successfully.") + logging.info("Output: %s", result.stdout) + return True + except subprocess.CalledProcessError as e: + logging.error(error_message) + logging.error("Error message: %s", e.stderr) + return False + + +def upload_shot_to_lakefs(shot, dataset_path, file_format): + """Upload a specific shot to lakeFS.""" + file_path = f"{dataset_path}/{shot}.{file_format}" + upload_command = [ + "lakectl", "fs", "upload", + f"lakefs://example-repo/ingestion/{shot}.{file_format}", + "-s", str(file_path), "--recursive" + ] + if execute_lakectl_command(upload_command, f"Failed to upload shot {shot} to lakeFS"): + logging.info(f"Uploaded shot {shot} to lakeFS.") + + +def commit_shot_to_lakefs(shot): + """Commit the uploaded shot to lakeFS.""" + commit_command = [ + "lakectl", "commit", + "lakefs://example-repo/ingestion/", + "-m", f"Commit shot {shot}" + ] + if execute_lakectl_command(commit_command, f"Failed to commit shot {shot} to lakeFS"): + logging.info(f"Committed shot {shot} to lakeFS.") + +def main(): + + initialize_lakefs_branch() initialize() logging.basicConfig(level=logging.INFO) @@ -43,10 +82,7 @@ def main(): args = parser.parse_args() if args.upload: - bucket_path = args.bucket_path - # Bucket path must have trailing slash - bucket_path = bucket_path + '/' if not bucket_path.endswith('/') else bucket_path - + bucket_path = args.bucket_path.rstrip('/') + '/' config = UploadConfig( credentials_file=args.credentials_file, endpoint_url=args.endpoint_url, @@ -57,7 +93,6 @@ def main(): config = None workflow_cls = LocalIngestionWorkflow - shot_list = read_shot_file(args.shot_file) for source in args.source_names: @@ -75,39 +110,14 @@ def main(): ) workflow_manager = WorkflowManager(workflow) - workflow_manager.run_workflows(shot_list, parallel = not args.serial) + workflow_manager.run_workflows(shot_list, parallel=not args.serial) logging.info(f"Finished source {source}") + # Upload and commit each shot to lakeFS for shot in shot_list: - file_path = args.dataset_path + f"/{shot}.{args.file_format}" - command = [ - "lakectl", "fs", "upload", - f"lakefs://example-repo/ingestion/{shot}.{args.file_format}", - "-s", str(file_path), "--recursive" - ] - - try: - result = subprocess.run(command, check=True, capture_output=True, text=True) - print("Command executed successfully.") - print("Output:", result.stdout) - except subprocess.CalledProcessError as e: - print("An error occurred while executing the command.") - print("Error message:", e.stderr) - - command = [ - "lakectl", "commit", - f"lakefs://example-repo/ingestion/", - "-m", f"Commit shot {shot}" - ] - - try: - result = subprocess.run(command, check=True, capture_output=True, text=True) - print("Command executed successfully.") - print("Output:", result.stdout) - except subprocess.CalledProcessError as e: - print("An error occurred while executing the command.") - print("Error message:", e.stderr) + upload_shot_to_lakefs(shot, args.dataset_path, args.file_format) + commit_shot_to_lakefs(shot) if __name__ == "__main__": - main() \ No newline at end of file + main()