Skip to content

Commit

Permalink
✨ Add Genome Project datasets
Browse files Browse the repository at this point in the history
  • Loading branch information
agmangas committed Jan 27, 2024
1 parent a28b91c commit f92b8ca
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 4 deletions.
3 changes: 2 additions & 1 deletion moderate/moderate/datasets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from moderate.datasets.building_stock import (
building_stock,
platform_asset_building_stock,
platform_asset_building_stock_pv,
platform_asset_building_stock_epc,
platform_asset_building_stock_pv,
)
from moderate.datasets.genome_project import genome_project_datasets
92 changes: 92 additions & 0 deletions moderate/moderate/datasets/genome_project.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import glob
import os
import pprint
from typing import List

import pandas as pd
from dagster import Config, MetadataValue, Output, asset, get_dagster_logger

from moderate.datasets.enums import DataFormats
from moderate.datasets.utils import (
GitAssetForPlatform,
GitRepo,
ListOfGitAssetForPlatformDagsterType,
clone_git_repo,
dataset_name_from_file_path,
)


class GenomeProjectDatasetsConfig(Config):
git_url: str = "https://github.com/buds-lab/building-data-genome-project-2.git"
git_treeish: str = "master"

dataset_paths: List[str] = [
"data/meters/cleaned/*.csv",
"data/weather/*.csv",
"data/metadata/*.csv",
]


@asset(dagster_type=ListOfGitAssetForPlatformDagsterType)
def genome_project_datasets(
config: GenomeProjectDatasetsConfig,
) -> Output[List[GitAssetForPlatform]]:
logger = get_dagster_logger()
git_repo = GitRepo(repo_url=config.git_url, tree_ish=config.git_treeish)

with clone_git_repo(
config=git_repo, lfs=True, lfs_globs=config.dataset_paths
) as cloned_repo:
file_paths = [
os.path.abspath(file_path)
for dataset_path in config.dataset_paths
for file_path in glob.glob(os.path.join(cloned_repo.repo_dir, dataset_path))
if os.path.exists(os.path.abspath(file_path))
]

logger.debug("Found the following files:\n%s", pprint.pformat(file_paths))
read_csv_kwargs = {}

dfs = {
fpath: pd.read_csv(fpath, **read_csv_kwargs.get(fpath, {}))
for fpath in file_paths
}

for fpath, df in dfs.items():
logger.debug("Sample of DataFrame '%s':\n%s", fpath, df.head())

parquet_bytes = {fpath: df.to_parquet(path=None) for fpath, df in dfs.items()}

output_metadata = {
"repo_url": cloned_repo.repo_url,
"commit_sha": cloned_repo.commit_sha,
}

output_value = []
output_metadata = {}

for fpath, data in parquet_bytes.items():
dataset_name = dataset_name_from_file_path(
file_path=fpath, sibling_paths=file_paths
)

output_value.append(
GitAssetForPlatform(
data=data,
name=dataset_name,
format=DataFormats.PARQUET,
metadata=output_metadata,
series_id=dataset_name,
)
)

output_metadata.update(
{
f"{dataset_name}_size_MiB": len(data) / (1024.0**2),
f"{dataset_name}_preview": MetadataValue.md(
dfs[fpath].head().to_markdown()
),
}
)

return Output(output_value, metadata=output_metadata)
41 changes: 38 additions & 3 deletions moderate/moderate/datasets/utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import io
import json
import os
import pprint
import shutil
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from typing import IO, Any, ContextManager, Dict, Optional
from typing import IO, Any, ContextManager, Dict, List, Optional

import requests
from dagster import Output, get_dagster_logger, usable_as_dagster_type
from dagster import DagsterType, Output, get_dagster_logger, usable_as_dagster_type
from sh import git
from slugify import slugify

Expand Down Expand Up @@ -50,8 +51,32 @@ def file_name_slug(self) -> str:
GitAssetForPlatformDagsterType = usable_as_dagster_type(GitAssetForPlatform)


def is_list_of_git_asset_for_platform(_, value):
return isinstance(value, list) and all(
isinstance(i, GitAssetForPlatform) for i in value
)


ListOfGitAssetForPlatformDagsterType = DagsterType(
name="ListOfGitAssetForPlatform",
type_check_fn=is_list_of_git_asset_for_platform,
description="A list of GitAssetForPlatform objects",
)


def dataset_name_from_file_path(
file_path: str, sibling_paths: Optional[List[str]] = None
) -> str:
common_prefix = os.path.commonprefix(sibling_paths) if sibling_paths else ""
file_path = file_path.replace(common_prefix, "", 1)
path_part, _ = os.path.splitext(file_path)
return slugify(path_part)


@contextmanager
def clone_git_repo(config: GitRepo) -> ContextManager[ClonedRepo]:
def clone_git_repo(
config: GitRepo, lfs: bool = False, lfs_globs: Optional[List[str]] = None
) -> ContextManager[ClonedRepo]:
"""Context manager that clones a git repository and yields the temporal directory."""

git_dir = str(uuid.uuid4())
Expand All @@ -65,6 +90,16 @@ def clone_git_repo(config: GitRepo) -> ContextManager[ClonedRepo]:
git.reset("--hard", config.tree_ish, _cwd=git_dir)
commit_sha = git("rev-parse", "HEAD", _cwd=git_dir).strip()

if lfs and not lfs_globs:
logger.warning("LFS is enabled but no globs are provided")

if lfs and lfs_globs:
git.lfs.install(_cwd=git_dir)
logger.info("Fetching LFS objects: %s", lfs_globs)
git.lfs.fetch("-I", ",".join(lfs_globs), _cwd=git_dir)
logger.info("Checking out LFS objects: %s", lfs_globs)
git.checkout(config.tree_ish, "--", *lfs_globs, _cwd=git_dir)

yield ClonedRepo(
repo_url=config.repo_url, repo_dir=git_dir, commit_sha=commit_sha
)
Expand Down

0 comments on commit f92b8ca

Please sign in to comment.