Implementation of LakeFS #14

Draft
wants to merge 34 commits into base: main

34 commits
7cf606e
lakefs python code for branch creation, committing and merging
jameshod5 Sep 3, 2024
e0b0fe4
initial test of lakefs workflow
jameshod5 Sep 3, 2024
eb69a9d
added crude versioning to local ingestion
Sep 5, 2024
06889bf
data versioning task added
Sep 5, 2024
1ae88df
removed config requirements, reads from lakectl config now
Sep 5, 2024
aa87edd
removed cleanup function and versioning from workflow file
Sep 6, 2024
27ffe0b
improved versioning to get past mpi processes overlapping
Sep 6, 2024
bb327df
separate lakefs versioning task, run external to ingestion
Sep 6, 2024
190fa49
removed mpi versioning, now doing separate versioning process
Sep 9, 2024
0e0437f
added extra validation for data dir, lakectl install
Sep 9, 2024
781df1e
added cleanup functionality
Sep 9, 2024
30831e8
removed cleanup from workflow, moved to lake_fs.py
Sep 9, 2024
d866f06
removed unused versioning data task
Sep 9, 2024
fd80c08
create branch for versioning at beginning
Sep 12, 2024
a3b586f
commit shot after it is created to branch ingestion
Sep 12, 2024
207074e
removed versioning from workflow
Sep 12, 2024
5cdaed6
working example of versioning, committing each shot at the end of the…
Sep 12, 2024
8613524
organised versioning into functions
Sep 12, 2024
a425809
added option to turn on or off versioning
Sep 16, 2024
2313959
removed args for versioning for now
Sep 16, 2024
42c32f8
added merge and branch delete
Sep 17, 2024
306f868
rework of ingestion workflow to use lakefs instead, does not work ful…
Sep 17, 2024
7629c80
added committing to the branch and cleanup to remove local files afte…
Sep 18, 2024
5190edd
included merge at the end of versioning run
Sep 18, 2024
885508c
upload and commit the shot name instead of the local_path prefix
Sep 18, 2024
ade3c38
WIP file. Location of merge and execute functions used in main.
Sep 18, 2024
a270e28
now able to use --upload arg as an option to enable lakefs versioning…
Sep 18, 2024
950f186
added branch to config to enable merging of ingestion branch into mai…
Sep 18, 2024
d813606
removed excess class
Sep 18, 2024
5a8919e
better info messages
Sep 20, 2024
239e971
removed logging info
Sep 20, 2024
426e6f9
ruff fixes
Sep 20, 2024
5ea3f84
ruff fixes
Sep 20, 2024
fe93dce
changed job to work with lakefs command
Oct 7, 2024
11 changes: 5 additions & 6 deletions jobs/ingest.csd3.slurm.sh
@@ -2,22 +2,21 @@
#SBATCH -A UKAEA-AP002-CPU
#SBATCH -p icelake
#SBATCH --job-name=fair-mast-ingest
#SBATCH --output=fair-mast-ingest_%A.out
#SBATCH --output=outputs/fair-mast-ingest_%A.out
#SBATCH --time=5:00:00
#SBATCH --mem=250G
#SBATCH --ntasks=128
#SBATCH --mem=40G
#SBATCH --ntasks=8
#SBATCH -N 2


summary_file=$1
bucket_path=$2
num_workers=$SLURM_NTASKS

random_string=$(head /dev/urandom | tr -dc A-Za-z0-9 | head -c 16)
temp_dir="/rds/project/rds-sPGbyCAPsJI/local_cache/$random_string"
temp_dir="data/local"
metadata_dir="/rds/project/rds-sPGbyCAPsJI/data/uda"

source /rds/project/rds-sPGbyCAPsJI/uda-ssl.sh

mpirun -np $num_workers \
python3 -m src.main $temp_dir $summary_file --metadata_dir $metadata_dir --bucket_path $bucket_path --upload --force --source_names ${@:3}
python3 -m src.main $temp_dir $summary_file --metadata_dir $metadata_dir --upload example-repo --force --source_names ${@:3}
32 changes: 32 additions & 0 deletions src/lake_fs.py
@@ -0,0 +1,32 @@
import logging
import subprocess
from datetime import datetime
import uuid

def execute_command(command):
try:
result = subprocess.run(command, check=True, capture_output=True, text=True)
logging.info("Command output: %s", result.stdout)
return result
except subprocess.CalledProcessError as e:
logging.error("Error executing command: %s", e.stderr)
return e

def create_branch(repo):
branch_name = f"branch-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{uuid.uuid4().hex[:8]}"
command = [
"lakectl", "branch", "create",
f"lakefs://{repo}/{branch_name}/",
"-s", f"lakefs://{repo}/main/"
]
execute_command(command)
return branch_name

def lakefs_merge_into_main(repo, branch):
command = [
"lakectl", "merge",
f"lakefs://{repo}/{branch}/",
f"lakefs://{repo}/main/",
"-m", f"Merge {branch} branch into main"
]
execute_command(command)
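
For context, a minimal sketch of how these two helpers are meant to bracket an ingestion run (the repository name example-repo is illustrative; in practice it comes from the --upload argument handled in src/main.py):

from src.lake_fs import create_branch, lakefs_merge_into_main

# Create a uniquely named working branch off main,
# e.g. branch-20240918-104501-1a2b3c4d.
branch = create_branch("example-repo")

# ... per-shot uploads and commits land on this branch (see src/task.py) ...

# Fold the finished branch back into main with a merge commit.
lakefs_merge_into_main("example-repo", branch)
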
32 changes: 15 additions & 17 deletions src/main.py
Expand Up @@ -2,12 +2,13 @@
import logging
from functools import partial
from dask_mpi import initialize
from src.uploader import UploadConfig
from src.workflow import S3IngestionWorkflow, LocalIngestionWorkflow, WorkflowManager
from src.lake_fs import lakefs_merge_into_main, create_branch
from src.uploader import LakeFSUploadConfig
from src.workflow import LakeFSIngestionWorkflow, LocalIngestionWorkflow, WorkflowManager
from src.utils import read_shot_file


def main():

initialize()
logging.basicConfig(level=logging.INFO)

@@ -18,11 +19,10 @@ def main():

parser.add_argument("dataset_path")
parser.add_argument("shot_file")
parser.add_argument("--bucket_path")
parser.add_argument("--credentials_file", default=".s5cfg.stfc")
parser.add_argument("--credentials_file", default="lakectl.cfg")
parser.add_argument("--serial", default=False, action='store_true')
parser.add_argument("--endpoint_url", default="https://s3.echo.stfc.ac.uk")
parser.add_argument("--upload", default=False, action="store_true")
parser.add_argument("--endpoint_url", default="http://localhost:8000")
parser.add_argument("--upload", nargs='?', const=False, default=False)
parser.add_argument("--metadata_dir", default="data/uda")
parser.add_argument("--force", action="store_true")
parser.add_argument("--signal_names", nargs="*", default=[])
@@ -31,23 +31,19 @@ def main():
parser.add_argument("--facility", choices=['MAST', 'MASTU'], default='MAST')

args = parser.parse_args()

if args.upload:
bucket_path = args.bucket_path
# Bucket path must have trailing slash
bucket_path = bucket_path + '/' if not bucket_path.endswith('/') else bucket_path

config = UploadConfig(
new_branch = create_branch(args.upload)
config = LakeFSUploadConfig(
credentials_file=args.credentials_file,
endpoint_url=args.endpoint_url,
url=bucket_path,
repository=args.upload,
branch=new_branch
)
workflow_cls = partial(S3IngestionWorkflow, upload_config=config)
workflow_cls = partial(LakeFSIngestionWorkflow, upload_config=config)
else:
config = None
workflow_cls = LocalIngestionWorkflow


shot_list = read_shot_file(args.shot_file)

for source in args.source_names:
@@ -65,9 +61,11 @@
)

workflow_manager = WorkflowManager(workflow)
workflow_manager.run_workflows(shot_list, parallel = not args.serial)
workflow_manager.run_workflows(shot_list, parallel=not args.serial)
logging.info(f"Finished source {source}")

if args.upload:
lakefs_merge_into_main(args.upload, new_branch)

if __name__ == "__main__":
main()
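
Note that --upload now doubles as the repository selector rather than a plain boolean switch; a small illustration of the intended parsing, assuming the argparse configuration above (the repository name is a placeholder):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--upload", nargs="?", const=False, default=False)

print(parser.parse_args([]).upload)                            # False: versioning disabled
print(parser.parse_args(["--upload", "example-repo"]).upload)  # "example-repo": repository to version into
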
77 changes: 54 additions & 23 deletions src/task.py
@@ -11,11 +11,11 @@
from src.mast import MASTClient
from src.reader import DatasetReader, SignalMetadataReader, SourceMetadataReader
from src.writer import DatasetWriter
from src.uploader import UploadConfig

from src.uploader import LakeFSUploadConfig
logging.basicConfig(level=logging.INFO)



class CleanupDatasetTask:

def __init__(self, path: str) -> None:
@@ -28,43 +28,74 @@ def __call__(self):
logging.warning(f"Cannot remove path: {self.path}")


class UploadDatasetTask:
class LakeFSUploadDatasetTask:

def __init__(self, local_file: Path, config: UploadConfig):
def __init__(self, local_file: Path, shot_name: Path, config: LakeFSUploadConfig):
self.config = config
self.local_file = local_file
self.shot_name = shot_name

def __call__(self):
logging.info(f"Uploading {self.local_file} to {self.config.url}")

if not Path(self.config.credentials_file).exists():
raise RuntimeError(f"Credentials file {self.config.credentials_file} does not exist!")

env = os.environ.copy()

logging.info(f"Attempting to upload {self.local_file} to repository: {self.config.repository}...")
args = [
"s5cmd",
"--credentials-file",
self.config.credentials_file,
"--endpoint-url",
self.config.endpoint_url,
"cp",
"--acl",
"public-read",
str(self.local_file),
self.config.url,
"lakectl", "fs", "upload",
f"lakefs://{self.config.repository}/{self.config.branch}/{self.shot_name}",
"-s", str(self.local_file), "--recursive"
]

logging.debug(' ' .join(args))

subprocess.run(
args,
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
env=env,
check=True,
)
try:
result = subprocess.run(
args,
capture_output=True,
env=env,
check=True
)
logging.info(f"Successfully uploaded {self.local_file} to repository: {self.config.repository}.")
logging.debug(f"Command output: {result.stdout.decode()}")
except subprocess.CalledProcessError as e:
logging.error(f"Failed to upload {self.local_file}: {e.stderr.decode()}")
raise

class LakeFSCommitDatasetTask:

def __init__(self, local_file: Path, config: LakeFSUploadConfig):
self.config = config
self.local_file = local_file

def __call__(self):

if not Path(self.config.credentials_file).exists():
raise RuntimeError(f"Credentials file {self.config.credentials_file} does not exist!")

env = os.environ.copy()
logging.info(f"Attempting to commit {self.local_file} to branch: {self.config.branch}...")
args = [
"lakectl", "commit",
f"lakefs://{self.config.repository}/{self.config.branch}/",
"-m", f"Commit file {self.local_file}"
]

logging.debug(' ' .join(args))

try:
result = subprocess.run(
args,
capture_output=True,
env=env,
check=True
)
logging.info(f"Successfully committed {self.local_file} to branch: {self.config.branch}...")
logging.debug(f"Command output: {result.stdout.decode()}")
except subprocess.CalledProcessError as e:
logging.error(f"Failed to commit {self.local_file}: {e.stderr.decode()}")
raise

class CreateDatasetTask:

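
A rough usage sketch for the two new tasks, mirroring how src/workflow.py drives them per shot (the endpoint, credentials file, repository, branch, and shot file name are placeholders):

from pathlib import Path
from src.task import LakeFSUploadDatasetTask, LakeFSCommitDatasetTask
from src.uploader import LakeFSUploadConfig

config = LakeFSUploadConfig(
    endpoint_url="http://localhost:8000",      # --endpoint_url default in src/main.py
    credentials_file="lakectl.cfg",            # --credentials_file default in src/main.py
    repository="example-repo",                 # value passed to --upload
    branch="branch-20240918-104501-1a2b3c4d",  # name produced by create_branch()
)

local_path = Path("data/local/30421.zarr")  # hypothetical shot dataset on disk
shot_name = "30421.zarr"                    # object path inside the branch

# Upload the dataset to the branch, then record it with a commit.
LakeFSUploadDatasetTask(local_path, shot_name, config)()
LakeFSCommitDatasetTask(local_path, config)()
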
7 changes: 7 additions & 0 deletions src/uploader.py
@@ -6,3 +6,10 @@ class UploadConfig:
url: str
endpoint_url: str
credentials_file: str

@dataclass
class LakeFSUploadConfig:
endpoint_url: str
credentials_file: str
repository: str
branch: str
32 changes: 14 additions & 18 deletions src/workflow.py
@@ -1,15 +1,15 @@
import s3fs
import logging
from pathlib import Path
from dask.distributed import Client, as_completed
from src.task import (
CreateDatasetTask,
UploadDatasetTask,
CleanupDatasetTask,
LakeFSUploadDatasetTask,
LakeFSCommitDatasetTask,
CreateSignalMetadataTask,
CreateSourceMetadataTask,
CleanupDatasetTask
)
from src.uploader import UploadConfig
from src.uploader import LakeFSUploadConfig

logging.basicConfig(level=logging.INFO)

@@ -33,13 +33,13 @@ def __call__(self, shot: int):
logging.error(f"Could not parse source metadata for shot {shot}: {e}")


class S3IngestionWorkflow:
class LakeFSIngestionWorkflow:

def __init__(
self,
metadata_dir: str,
data_dir: str,
upload_config: UploadConfig,
upload_config: LakeFSUploadConfig,
force: bool = True,
signal_names: list[str] = [],
source_names: list[str] = [],
@@ -52,14 +52,12 @@ def __init__(
self.force = force
self.signal_names = signal_names
self.source_names = source_names
self.fs = s3fs.S3FileSystem(
anon=True, client_kwargs={"endpoint_url": self.upload_config.endpoint_url}
)
self.file_format = file_format
self.facility = facility

def __call__(self, shot: int):
local_path = self.data_dir / f"{shot}.{self.file_format}"
shot_name = f"{shot}.{self.file_format}"
create = CreateDatasetTask(
self.metadata_dir,
self.data_dir,
@@ -70,21 +68,18 @@ def __call__(self, shot: int):
self.facility
)

upload = UploadDatasetTask(local_path, self.upload_config)
upload = LakeFSUploadDatasetTask(local_path, shot_name, self.upload_config)
commit = LakeFSCommitDatasetTask(local_path, self.upload_config)
cleanup = CleanupDatasetTask(local_path)

try:
url = self.upload_config.url + f"{shot}.{self.file_format}"
if self.force or not self.fs.exists(url):
create()
upload()
else:
logging.info(f"Skipping shot {shot} as it already exists")
create()
upload()
commit()
cleanup()
except Exception as e:
logging.error(f"Failed to run workflow with error {type(e)}: {e}")

cleanup()

class LocalIngestionWorkflow:

def __init__(
@@ -120,6 +115,7 @@ def __call__(self, shot: int):

try:
create()

except Exception as e:
import traceback
trace = traceback.format_exc()