using csv_path from the sampler to locate the buildstock.csv file
nmerket committed Oct 9, 2023
1 parent b911d9c commit aa4828f
Showing 4 changed files with 29 additions and 29 deletions.
buildstockbatch/base.py (3 additions, 1 deletion)
@@ -871,7 +871,9 @@ def process_results(self, skip_combine=False, force_upload=False):
 
         aws_conf = self.cfg.get('postprocessing', {}).get('aws', {})
         if 's3' in aws_conf or force_upload:
-            s3_bucket, s3_prefix = postprocessing.upload_results(aws_conf, self.output_dir, self.results_dir)
+            s3_bucket, s3_prefix = postprocessing.upload_results(
+                aws_conf, self.output_dir, self.results_dir, self.sampler.csv_path
+            )
             if 'athena' in aws_conf:
                 postprocessing.create_athena_tables(aws_conf, os.path.basename(self.output_dir), s3_bucket, s3_prefix)
buildstockbatch/postprocessing.py (14 additions, 17 deletions)
@@ -592,10 +592,9 @@ def remove_intermediate_files(fs, results_dir, keep_individual_timeseries=False):
         fs.rm(ts_in_dir, recursive=True)
 
 
-def upload_results(aws_conf, output_dir, results_dir):
+def upload_results(aws_conf, output_dir, results_dir, buildstock_csv_filename):
     logger.info("Uploading the parquet files to s3")
 
-    buildstock_dir = Path(results_dir).parent.joinpath('housing_characteristics')
     output_folder_name = Path(output_dir).name
     parquet_dir = Path(results_dir).joinpath('parquet')
     ts_dir = parquet_dir / 'timeseries'
@@ -608,9 +607,6 @@ def upload_results(aws_conf, output_dir, results_dir):
         all_files.append(file.relative_to(parquet_dir))
     for file in [*ts_dir.glob('_common_metadata'), *ts_dir.glob('_metadata')]:
         all_files.append(file.relative_to(parquet_dir))
-    buildstock_csv = ''
-    for file in buildstock_dir.glob('buildstock.csv'):
-        buildstock_csv = file.relative_to(buildstock_dir)
 
     s3_prefix = aws_conf.get('s3', {}).get('prefix', '').rstrip('/')
     s3_bucket = aws_conf.get('s3', {}).get('bucket', None)
@@ -626,23 +622,24 @@ def upload_results(aws_conf, output_dir, results_dir):
         logger.error(f"There are already {n_existing_files} files in the s3 folder {s3_bucket}/{s3_prefix_output}.")
         raise FileExistsError(f"s3://{s3_bucket}/{s3_prefix_output}")
 
-    def upload_file(filepath):
-        full_path = parquet_dir.joinpath(filepath)
+    def upload_file(filepath, s3key=None):
+        full_path = filepath if filepath.is_absolute() else parquet_dir.joinpath(filepath)
         s3 = boto3.resource('s3')
         bucket = s3.Bucket(s3_bucket)
-        s3key = Path(s3_prefix_output).joinpath(filepath).as_posix()
-        bucket.upload_file(str(full_path), str(s3key))
-
-    def upload_buildstock_csv(filepath):
-        full_path = buildstock_dir.joinpath(filepath)
-        s3 = boto3.resource('s3')
-        bucket = s3.Bucket(s3_bucket)
-        s3_prefix_output_new = s3_prefix_output+ '/' + 'buildstock_csv' + '/'
-        s3key = Path(s3_prefix_output_new).joinpath(filepath).as_posix()
+        if s3key is None:
+            s3key = Path(s3_prefix_output).joinpath(filepath).as_posix()
         bucket.upload_file(str(full_path), str(s3key))
 
     tasks = list(map(dask.delayed(upload_file), all_files))
-    tasks.append(dask.delayed(upload_buildstock_csv)(buildstock_csv))
+    if buildstock_csv_filename is not None:
+        buildstock_csv_filepath = Path(buildstock_csv_filename)
+        if buildstock_csv_filepath.exists():
+            tasks.append(dask.delayed(upload_file)(
+                buildstock_csv_filepath,
+                f"{s3_prefix_output}buildstock_csv/{buildstock_csv_filepath.name}"
+            ))
+        else:
+            logger.warning(f"{buildstock_csv_filename} doesn't exist, can't upload.")
    dask.compute(tasks)
     logger.info(f"Upload to S3 completed. The files are uploaded to: {s3_bucket}/{s3_prefix_output}")
     return s3_bucket, s3_prefix_output
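The refactor collapses the two upload helpers into one: upload_file now takes an optional explicit S3 key, so relative parquet paths keep their derived keys while the absolute buildstock.csv path gets an explicit key under buildstock_csv/. A rough sketch of just the key construction; the prefix and paths below are made up, and s3_prefix_output is assumed to already end with a slash, as it does when built from prefix + output folder name + '/':

    from pathlib import Path

    s3_prefix_output = 'my-prefix/my_run/'   # assumed to already end with '/'

    # Relative parquet file: key derived from the path relative to the parquet directory.
    rel_file = Path('baseline/results_up00.parquet')
    default_key = Path(s3_prefix_output).joinpath(rel_file).as_posix()
    print(default_key)   # my-prefix/my_run/baseline/results_up00.parquet

    # Absolute buildstock.csv: the caller passes an explicit key under buildstock_csv/.
    buildstock_csv = Path('/tmp/my_project/housing_characteristics/buildstock.csv')
    explicit_key = f"{s3_prefix_output}buildstock_csv/{buildstock_csv.name}"
    print(explicit_key)  # my-prefix/my_run/buildstock_csv/buildstock.csv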
buildstockbatch/test/conftest.py (1 addition, 5 deletions)
@@ -31,12 +31,8 @@ def _basic_residential_project_file(update_args={}, raw=False):
             shutil.move(os.path.join(output_directory, 'simulation_output', 'job0.json'),
                         os.path.join(output_directory, 'simulation_output', '..', '..', 'job0.json'))
 
-            #os.mkdir(os.path.join(output_directory, 'housing_characteristics'))
+            os.mkdir(os.path.join(output_directory, 'housing_characteristics'))
             os.mkdir(os.path.join(buildstock_directory, project_directory, 'housing_characteristics'))
-            shutil.copytree(
-                os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_results', 'housing_characteristics'),
-                os.path.join(Path(output_directory).parent.joinpath('housing_characteristics'))
-            )
             cfg = {
                 'buildstock_directory': buildstock_directory,
                 'project_directory': project_directory,
buildstockbatch/test/test_base.py (11 additions, 6 deletions)
@@ -20,7 +20,7 @@
 from buildstockbatch.local import LocalBatch
 from buildstockbatch.exc import ValidationError
 from buildstockbatch.postprocessing import write_dataframe_as_parquet
-from buildstockbatch.utils import read_csv
+from buildstockbatch.utils import read_csv, ContainerRuntime
 
 dask.config.set(scheduler='synchronous')
 here = os.path.dirname(os.path.abspath(__file__))
@@ -129,10 +129,16 @@ def test_upload_files(mocked_boto3, basic_residential_project_file):
     mocked_boto3.client = MagicMock(return_value=mocked_glueclient)
     mocked_boto3.resource().Bucket().objects.filter.side_effect = [[], ['a', 'b', 'c']]
     project_filename, results_dir = basic_residential_project_file(upload_config)
+    buildstock_csv_path = Path(results_dir).parent / 'openstudio_buildstock' / 'project_resstock_national' / 'housing_characteristics' / 'buildstock.csv'  # noqa: E501
+    shutil.copy2(
+        Path(__file__).parent / 'test_results' / 'housing_characteristics' / 'buildstock.csv',
+        buildstock_csv_path
+    )
     with patch.object(BuildStockBatchBase, 'weather_dir', None), \
             patch.object(BuildStockBatchBase, 'output_dir', results_dir), \
             patch.object(BuildStockBatchBase, 'get_dask_client') as get_dask_client_mock, \
-            patch.object(BuildStockBatchBase, 'results_dir', results_dir):
+            patch.object(BuildStockBatchBase, 'results_dir', results_dir), \
+            patch.object(BuildStockBatchBase, 'CONTAINER_RUNTIME', ContainerRuntime.LOCAL_OPENSTUDIO):
         bsb = BuildStockBatchBase(project_filename)
         bsb.process_results()
         get_dask_client_mock.assert_called_once()
@@ -169,7 +175,6 @@ def test_upload_files(mocked_boto3, basic_residential_project_file):
 
     # check if all the files are properly uploaded
     source_path = os.path.join(results_dir, 'parquet')
-    buildstock_dir = Path(results_dir).parent.joinpath('housing_characteristics')
     s3_path = s3_prefix + '/' + OUTPUT_FOLDER_NAME + '/'
 
     s3_file_path = s3_path + 'baseline/results_up00.parquet'
@@ -203,9 +208,9 @@ def test_upload_files(mocked_boto3, basic_residential_project_file):
     files_uploaded.remove((source_file_path, s3_file_path))
 
     s3_file_path = s3_path + 'buildstock_csv/buildstock.csv'
-    source_file_path_tst = os.path.join(buildstock_dir, 'buildstock.csv')
-    assert (source_file_path_tst, s3_file_path) in files_uploaded
-    files_uploaded.remove((source_file_path_tst, s3_file_path))
+    source_file_path = str(buildstock_csv_path)
+    assert (source_file_path, s3_file_path) in files_uploaded
+    files_uploaded.remove((source_file_path, s3_file_path))
 
     assert len(files_uploaded) == 0, f"These files shouldn't have been uploaded: {files_uploaded}"
 
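For context on how assertions like the ones above can run without touching AWS: with boto3 patched by a MagicMock, every bucket.upload_file(...) call is recorded on the mock and can be read back as (source, key) pairs. A self-contained illustration of that pattern; the paths and prefix below are made up and this is not the project's test code verbatim:

    from unittest.mock import MagicMock

    mocked_boto3 = MagicMock()

    # Code under test would do something like this through the patched boto3 module.
    bucket = mocked_boto3.resource('s3').Bucket('test_bucket')
    bucket.upload_file('/tmp/run1/results/parquet/baseline/results_up00.parquet',
                       'prefix/run1/baseline/results_up00.parquet')
    bucket.upload_file('/tmp/my_project/housing_characteristics/buildstock.csv',
                       'prefix/run1/buildstock_csv/buildstock.csv')

    # A MagicMock returns the same child object for resource(...) / Bucket(...) regardless of
    # arguments, so resource().Bucket().upload_file sees both recorded calls.
    files_uploaded = [call.args for call in mocked_boto3.resource().Bucket().upload_file.call_args_list]
    assert ('/tmp/my_project/housing_characteristics/buildstock.csv',
            'prefix/run1/buildstock_csv/buildstock.csv') in files_uploaded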
