diff --git a/buildstockbatch/base.py b/buildstockbatch/base.py index b646267d..1cf31033 100644 --- a/buildstockbatch/base.py +++ b/buildstockbatch/base.py @@ -871,7 +871,9 @@ def process_results(self, skip_combine=False, force_upload=False): aws_conf = self.cfg.get('postprocessing', {}).get('aws', {}) if 's3' in aws_conf or force_upload: - s3_bucket, s3_prefix = postprocessing.upload_results(aws_conf, self.output_dir, self.results_dir) + s3_bucket, s3_prefix = postprocessing.upload_results( + aws_conf, self.output_dir, self.results_dir, self.sampler.csv_path + ) if 'athena' in aws_conf: postprocessing.create_athena_tables(aws_conf, os.path.basename(self.output_dir), s3_bucket, s3_prefix) diff --git a/buildstockbatch/postprocessing.py b/buildstockbatch/postprocessing.py index 8962b8b0..9d5e02f0 100644 --- a/buildstockbatch/postprocessing.py +++ b/buildstockbatch/postprocessing.py @@ -592,10 +592,9 @@ def remove_intermediate_files(fs, results_dir, keep_individual_timeseries=False) fs.rm(ts_in_dir, recursive=True) -def upload_results(aws_conf, output_dir, results_dir): +def upload_results(aws_conf, output_dir, results_dir, buildstock_csv_filename): logger.info("Uploading the parquet files to s3") - buildstock_dir = Path(results_dir).parent.joinpath('housing_characteristics') output_folder_name = Path(output_dir).name parquet_dir = Path(results_dir).joinpath('parquet') ts_dir = parquet_dir / 'timeseries' @@ -608,9 +607,6 @@ def upload_results(aws_conf, output_dir, results_dir): all_files.append(file.relative_to(parquet_dir)) for file in [*ts_dir.glob('_common_metadata'), *ts_dir.glob('_metadata')]: all_files.append(file.relative_to(parquet_dir)) - buildstock_csv = '' - for file in buildstock_dir.glob('buildstock.csv'): - buildstock_csv = file.relative_to(buildstock_dir) s3_prefix = aws_conf.get('s3', {}).get('prefix', '').rstrip('/') s3_bucket = aws_conf.get('s3', {}).get('bucket', None) @@ -626,23 +622,24 @@ def upload_results(aws_conf, output_dir, results_dir): logger.error(f"There are already {n_existing_files} files in the s3 folder {s3_bucket}/{s3_prefix_output}.") raise FileExistsError(f"s3://{s3_bucket}/{s3_prefix_output}") - def upload_file(filepath): - full_path = parquet_dir.joinpath(filepath) + def upload_file(filepath, s3key=None): + full_path = filepath if filepath.is_absolute() else parquet_dir.joinpath(filepath) s3 = boto3.resource('s3') bucket = s3.Bucket(s3_bucket) - s3key = Path(s3_prefix_output).joinpath(filepath).as_posix() - bucket.upload_file(str(full_path), str(s3key)) - - def upload_buildstock_csv(filepath): - full_path = buildstock_dir.joinpath(filepath) - s3 = boto3.resource('s3') - bucket = s3.Bucket(s3_bucket) - s3_prefix_output_new = s3_prefix_output+ '/' + 'buildstock_csv' + '/' - s3key = Path(s3_prefix_output_new).joinpath(filepath).as_posix() + if s3key is None: + s3key = Path(s3_prefix_output).joinpath(filepath).as_posix() bucket.upload_file(str(full_path), str(s3key)) tasks = list(map(dask.delayed(upload_file), all_files)) - tasks.append(dask.delayed(upload_buildstock_csv)(buildstock_csv)) + if buildstock_csv_filename is not None: + buildstock_csv_filepath = Path(buildstock_csv_filename) + if buildstock_csv_filepath.exists(): + tasks.append(dask.delayed(upload_file)( + buildstock_csv_filepath, + f"{s3_prefix_output}buildstock_csv/{buildstock_csv_filepath.name}" + )) + else: + logger.warning(f"{buildstock_csv_filename} doesn't exist, can't upload.") dask.compute(tasks) logger.info(f"Upload to S3 completed. The files are uploaded to: {s3_bucket}/{s3_prefix_output}") return s3_bucket, s3_prefix_output diff --git a/buildstockbatch/test/conftest.py b/buildstockbatch/test/conftest.py index b42cca05..5a1240bd 100644 --- a/buildstockbatch/test/conftest.py +++ b/buildstockbatch/test/conftest.py @@ -31,12 +31,8 @@ def _basic_residential_project_file(update_args={}, raw=False): shutil.move(os.path.join(output_directory, 'simulation_output', 'job0.json'), os.path.join(output_directory, 'simulation_output', '..', '..', 'job0.json')) - #os.mkdir(os.path.join(output_directory, 'housing_characteristics')) + os.mkdir(os.path.join(output_directory, 'housing_characteristics')) os.mkdir(os.path.join(buildstock_directory, project_directory, 'housing_characteristics')) - shutil.copytree( - os.path.join(os.path.dirname(os.path.abspath(__file__)), 'test_results', 'housing_characteristics'), - os.path.join(Path(output_directory).parent.joinpath('housing_characteristics')) - ) cfg = { 'buildstock_directory': buildstock_directory, 'project_directory': project_directory, diff --git a/buildstockbatch/test/test_base.py b/buildstockbatch/test/test_base.py index 8658db8a..5c9c0dfe 100644 --- a/buildstockbatch/test/test_base.py +++ b/buildstockbatch/test/test_base.py @@ -20,7 +20,7 @@ from buildstockbatch.local import LocalBatch from buildstockbatch.exc import ValidationError from buildstockbatch.postprocessing import write_dataframe_as_parquet -from buildstockbatch.utils import read_csv +from buildstockbatch.utils import read_csv, ContainerRuntime dask.config.set(scheduler='synchronous') here = os.path.dirname(os.path.abspath(__file__)) @@ -129,10 +129,16 @@ def test_upload_files(mocked_boto3, basic_residential_project_file): mocked_boto3.client = MagicMock(return_value=mocked_glueclient) mocked_boto3.resource().Bucket().objects.filter.side_effect = [[], ['a', 'b', 'c']] project_filename, results_dir = basic_residential_project_file(upload_config) + buildstock_csv_path = Path(results_dir).parent / 'openstudio_buildstock' / 'project_resstock_national' / 'housing_characteristics' / 'buildstock.csv' # noqa: E501 + shutil.copy2( + Path(__file__).parent / 'test_results' / 'housing_characteristics' / 'buildstock.csv', + buildstock_csv_path + ) with patch.object(BuildStockBatchBase, 'weather_dir', None), \ patch.object(BuildStockBatchBase, 'output_dir', results_dir), \ patch.object(BuildStockBatchBase, 'get_dask_client') as get_dask_client_mock, \ - patch.object(BuildStockBatchBase, 'results_dir', results_dir): + patch.object(BuildStockBatchBase, 'results_dir', results_dir), \ + patch.object(BuildStockBatchBase, 'CONTAINER_RUNTIME', ContainerRuntime.LOCAL_OPENSTUDIO): bsb = BuildStockBatchBase(project_filename) bsb.process_results() get_dask_client_mock.assert_called_once() @@ -169,7 +175,6 @@ def test_upload_files(mocked_boto3, basic_residential_project_file): # check if all the files are properly uploaded source_path = os.path.join(results_dir, 'parquet') - buildstock_dir = Path(results_dir).parent.joinpath('housing_characteristics') s3_path = s3_prefix + '/' + OUTPUT_FOLDER_NAME + '/' s3_file_path = s3_path + 'baseline/results_up00.parquet' @@ -203,9 +208,9 @@ def test_upload_files(mocked_boto3, basic_residential_project_file): files_uploaded.remove((source_file_path, s3_file_path)) s3_file_path = s3_path + 'buildstock_csv/buildstock.csv' - source_file_path_tst = os.path.join(buildstock_dir, 'buildstock.csv') - assert (source_file_path_tst, s3_file_path) in files_uploaded - files_uploaded.remove((source_file_path_tst, s3_file_path)) + source_file_path = str(buildstock_csv_path) + assert (source_file_path, s3_file_path) in files_uploaded + files_uploaded.remove((source_file_path, s3_file_path)) assert len(files_uploaded) == 0, f"These files shouldn't have been uploaded: {files_uploaded}"