Skip to content

Commit

Permalink
organised versioning into functions
Browse files Browse the repository at this point in the history
  • Loading branch information
James Hodson committed Sep 12, 2024
1 parent 5cdaed6 commit 8613524
Showing 1 changed file with 47 additions and 37 deletions.
84 changes: 47 additions & 37 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,55 @@
from src.workflow import S3IngestionWorkflow, LocalIngestionWorkflow, WorkflowManager
from src.utils import read_shot_file
import subprocess
from pathlib import Path

def main():

# Initialize the MPI communicator
def initialize_lakefs_branch():
    """Create the lakeFS ``ingestion`` branch from ``main`` on MPI rank 0.

    Every rank other than 0 returns immediately without touching lakeFS.
    """
    if MPI.COMM_WORLD.Get_rank() != 0:
        return

    ingestion_branch = lakefs.repository("example-repo").branch("ingestion")
    ingestion_branch.create(source_reference="main")


def execute_lakectl_command(command, error_message):
    """Run a lakectl command and report whether it succeeded.

    Args:
        command: Argument list passed to ``subprocess.run`` (no shell).
        error_message: Message logged at error level when the command fails.

    Returns:
        True when the process exits with status 0. False when it exits with
        a non-zero status or when the process could not be started at all.
    """
    try:
        result = subprocess.run(command, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        logging.error(error_message)
        logging.error("Error message: %s", e.stderr)
        return False
    except OSError as e:
        # Raised when the executable cannot be launched (e.g. it is not on
        # PATH or is not executable). Report it like any other failure
        # instead of letting it escape a helper that promises a bool.
        logging.error(error_message)
        logging.error("Error message: %s", e)
        return False

    logging.info("Command executed successfully.")
    logging.info("Output: %s", result.stdout)
    return True


def upload_shot_to_lakefs(shot, dataset_path, file_format):
    """Upload one shot to the ``ingestion`` branch of ``example-repo``.

    The local source is ``{dataset_path}/{shot}.{file_format}`` and the
    destination is ``lakefs://example-repo/ingestion/{shot}.{file_format}``.

    Args:
        shot: Shot identifier used to build both source and destination names.
        dataset_path: Local directory that holds the shot output.
        file_format: Extension of the shot output, without the leading dot.

    Returns:
        True when the upload command succeeded, False otherwise, so the
        caller can decide whether a follow-up commit makes sense.
    """
    file_path = f"{dataset_path}/{shot}.{file_format}"
    upload_command = [
        "lakectl", "fs", "upload",
        f"lakefs://example-repo/ingestion/{shot}.{file_format}",
        "-s", file_path, "--recursive",
    ]
    uploaded = execute_lakectl_command(
        upload_command, f"Failed to upload shot {shot} to lakeFS"
    )
    if uploaded:
        logging.info("Uploaded shot %s to lakeFS.", shot)
    return uploaded


def commit_shot_to_lakefs(shot):
    """Commit the ``ingestion`` branch of ``example-repo`` for one shot.

    Args:
        shot: Shot identifier, used only in the commit message and log lines.

    Returns:
        True when the commit command succeeded, False otherwise.
    """
    commit_command = [
        "lakectl", "commit",
        "lakefs://example-repo/ingestion/",
        "-m", f"Commit shot {shot}",
    ]
    committed = execute_lakectl_command(
        commit_command, f"Failed to commit shot {shot} to lakeFS"
    )
    if committed:
        logging.info("Committed shot %s to lakeFS.", shot)
    return committed

def main():

initialize_lakefs_branch()
initialize()
logging.basicConfig(level=logging.INFO)

Expand All @@ -43,10 +82,7 @@ def main():
args = parser.parse_args()

if args.upload:
bucket_path = args.bucket_path
# Bucket path must have trailing slash
bucket_path = bucket_path + '/' if not bucket_path.endswith('/') else bucket_path

bucket_path = args.bucket_path.rstrip('/') + '/'
config = UploadConfig(
credentials_file=args.credentials_file,
endpoint_url=args.endpoint_url,
Expand All @@ -57,7 +93,6 @@ def main():
config = None
workflow_cls = LocalIngestionWorkflow


shot_list = read_shot_file(args.shot_file)

for source in args.source_names:
Expand All @@ -75,39 +110,14 @@ def main():
)

workflow_manager = WorkflowManager(workflow)
workflow_manager.run_workflows(shot_list, parallel = not args.serial)
workflow_manager.run_workflows(shot_list, parallel=not args.serial)
logging.info(f"Finished source {source}")

# Upload and commit each shot to lakeFS
for shot in shot_list:
file_path = args.dataset_path + f"/{shot}.{args.file_format}"
command = [
"lakectl", "fs", "upload",
f"lakefs://example-repo/ingestion/{shot}.{args.file_format}",
"-s", str(file_path), "--recursive"
]

try:
result = subprocess.run(command, check=True, capture_output=True, text=True)
print("Command executed successfully.")
print("Output:", result.stdout)
except subprocess.CalledProcessError as e:
print("An error occurred while executing the command.")
print("Error message:", e.stderr)

command = [
"lakectl", "commit",
f"lakefs://example-repo/ingestion/",
"-m", f"Commit shot {shot}"
]

try:
result = subprocess.run(command, check=True, capture_output=True, text=True)
print("Command executed successfully.")
print("Output:", result.stdout)
except subprocess.CalledProcessError as e:
print("An error occurred while executing the command.")
print("Error message:", e.stderr)
upload_shot_to_lakefs(shot, args.dataset_path, args.file_format)
commit_shot_to_lakefs(shot)


if __name__ == "__main__":
main()
main()

0 comments on commit 8613524

Please sign in to comment.