CC0 - CC0 enables scientists, educators, artists and other creators and owners of copyright- or
+ database-protected content to waive those interests in their works and thereby place them as completely as
+ possible in the public domain, so that others may freely build upon, enhance and reuse the works for any
+ purposes without restriction under copyright or database law. https://creativecommons.org/share-your-work/public-domain/cc0/
+
+
+
+
+
\ No newline at end of file
diff --git a/dataproc/processors/core/isimp_drought/templates/version_1/license.html b/dataproc/processors/core/isimp_drought/templates/version_1/license.html
new file mode 100644
index 0000000..fc55292
--- /dev/null
+++ b/dataproc/processors/core/isimp_drought/templates/version_1/license.html
@@ -0,0 +1,20 @@
+
+
+
+
+World Resource Institute - Aqueduct Flood Hazard Maps are made available under the Creative Commons Attribution
+ License International (4.0): ./license.html or https://creativecommons.org/licenses/by/4.0/
+ + + + \ No newline at end of file diff --git a/dataproc/processors/core/isimp_drought/version_1.py b/dataproc/processors/core/isimp_drought/version_1.py new file mode 100644 index 0000000..d89b005 --- /dev/null +++ b/dataproc/processors/core/isimp_drought/version_1.py @@ -0,0 +1,297 @@ +""" +ISIMP Drought V1 Processor +""" + +import os +import inspect +import shutil +from typing import List + +from dataproc import DataPackageLicense +from dataproc.exceptions import ProcessorDatasetExists +from dataproc.processors.internal.base import BaseProcessorABC, BaseMetadataABC +from dataproc.helpers import ( + processor_name_from_file, + tiffs_in_folder, + version_name_from_file, + crop_raster, + assert_geotiff, + data_file_hash, + data_file_size, + generate_index_file, + generate_datapackage, + generate_license_file, + fetch_zenodo_doi, + output_filename, + unpack_zip, +) +from .helpers import VERSION_1_SOURCE_FILES + + +class Metadata(BaseMetadataABC): + """ + Processor metadata + """ + + name = processor_name_from_file( + inspect.stack()[1].filename + ) # this must follow snakecase formatting, without special chars + description = "ISIMP Drought v1 processor" # Longer processor description + version = version_name_from_file( + inspect.stack()[1].filename + ) # Version of the Processor + dataset_name = "ISIMP Drought" # The dataset this processor targets + data_author = "Lange, S., Volkholz, J., Geiger, T., Zhao, F., Vega, I., Veldkamp, T., et al. (2020)" + data_title = "ISIMP Drought" + data_title_long = "Annual probability of extreme heat and drought events, derived from Lange et al 2020" + data_summary = """ +The time series of extreme events given by Lange et al has been processed into an annual probability of occurrence by researchers at the University of Oxford, using the pipeline available online at https://github.com/nismod/infra-risk-vis/blob/45d8974c311067141ee6fcaa1321c7ecdaa59752/etl/pipelines/isimip/Snakefile - this is a draft dataset, used for visualisation in https://global.infrastructureresilience.org/ but not otherwise reviewed or published. + +If you use this, please cite: Lange, S., Volkholz, J., Geiger, T., Zhao, F., Vega, I., Veldkamp, T., et al. (2020). Projecting exposure to extreme climate impact events across six event categories and three spatial scales. Earth's Future, 8, e2020EF001616. DOI 10.1029/2020EF001616 + +This is shared under a CC0 1.0 Universal Public Domain Dedication (CC0 1.0) When using ISIMIP data for your research, please appropriately credit the data providers, e.g. either by citing the DOI for the dataset, or by appropriate acknowledgment. + +Annual probability of drought (soil moisture below a baseline threshold) or extreme heat (temperature and humidity-based indicators over a threshold) events on a 0.5° grid. 8 hydrological models forced by 4 GCMs under baseline, RCP 2.6 & 6.0 emission scenarios. Current and future maps in 2030, 2050 and 2080. + +The ISIMIP2b climate input data and impact model output data analyzed in this study are available in the ISIMIP data repository at ESGF, see https://esg.pik-potsdam.de/search/isimip/?project=ISIMIP2b&product=input and https://esg.pik-potsdam.de/search/isimip/?project=ISIMIP2b&product=output, respectively. More information about the GHM, GGCM, and GVM output data is provided by Gosling et al. (2020), Arneth et al. (2020), and Reyer et al. (2019), respectively. + +Event definitions are given in Lange et al, table 1. 
Land area is exposed to drought if monthly soil moisture falls below the 2.5th percentile of the preindustrial baseline distribution for at least seven consecutive months. Land area is exposed to extreme heat if both a relative indicator based on temperature (Russo et al 2015, 2017) and an absolute indicator based on temperature and relative humidity (Masterton & Richardson, 1979) exceed their respective threshold value. + """ + data_citation = """ +Lange, S., Volkholz, J., Geiger, T., Zhao, F., Vega, I., Veldkamp, T., et al. (2020). Projecting exposure to extreme climate impact events across six event categories and three spatial scales. Earth's Future, 8, e2020EF001616. DOI 10.1029/2020EF001616 + """ + data_license = DataPackageLicense( + name="CC0", + title="CC0", + path="https://creativecommons.org/share-your-work/public-domain/cc0/", + ) + data_origin_url = "https://doi.org/10.5281/zenodo.7732393" + data_formats = ["GeoTIFF"] + + +class Processor(BaseProcessorABC): + """A Processor for ISIMP Drought V1""" + + zenodo_doi = "10.5281/zenodo.7732393" + source_files = VERSION_1_SOURCE_FILES + total_expected_files = len(source_files) + index_filename = "index.html" + license_filename = "license.html" + + def exists(self): + """Whether all output files for a given processor & boundary exist on the FS on not""" + try: + count_on_backend = self.storage_backend.count_boundary_data_files( + self.boundary["name"], + self.metadata.name, + self.metadata.version, + datafile_ext=".tif", + ) + except FileNotFoundError: + return False + return count_on_backend == self.total_expected_files + + def generate(self): + """Generate files for a given processor""" + if self.exists() is True: + raise ProcessorDatasetExists() + else: + # Ensure we start with a blank output folder on the storage backend + try: + self.storage_backend.remove_boundary_data_files( + self.boundary["name"], + self.metadata.name, + self.metadata.version, + ) + except FileNotFoundError: + pass + # Check if the source TIFF exists and fetch it if not + self.update_progress(10, "fetching and verifying source") + source_fpaths = self._fetch_source() + + self.log.debug("%s - cropping source", self.metadata.name) + results_fpaths = [] + for idx, source_fpath in enumerate(source_fpaths): + self.update_progress( + 10 + int(idx * (80 / len(source_fpaths))), "cropping source" + ) + + subfilename = os.path.splitext(os.path.basename(source_fpath))[0] + file_format = os.path.splitext(os.path.basename(source_fpath))[1] + + output_fpath = os.path.join( + self.tmp_processing_folder, + output_filename( + self.metadata.name, + self.metadata.version, + self.boundary["name"], + file_format, + dataset_subfilename=subfilename, + ), + ) + crop_success = crop_raster(source_fpath, output_fpath, self.boundary) + + self.log.debug( + "%s crop %s - success: %s", + self.metadata.name, + os.path.basename(source_fpath), + crop_success, + ) + if crop_success: + results_fpaths.append( + { + "fpath": output_fpath, + "hash": data_file_hash(output_fpath), + "size": data_file_size(output_fpath), + } + ) + # Check results look sensible + assert ( + len(results_fpaths) == self.total_expected_files + ), f"{self.metadata.name} - number of successfully cropped files {len(results_fpaths)} do not match expected {self.total_expected_files}" + + self.update_progress(85, "moving result") + self.log.debug("%s - moving cropped data to backend", self.metadata.name) + result_uris = [] + for result in results_fpaths: + result_uri = self.storage_backend.put_processor_data( + 
result["fpath"], + self.boundary["name"], + self.metadata.name, + self.metadata.version, + ) + result_uris.append(result_uri) + self.provenance_log[f"{self.metadata.name} - move to storage success"] = ( + len(result_uris) == self.total_expected_files + ) + self.provenance_log[f"{self.metadata.name} - result URIs"] = ",".join( + result_uris + ) + + # Generate documentation on backend + self.update_progress(90, "generate documentation & datapackage") + self.generate_documentation() + + # Generate datapackage in log (using directory for URI) + datapkg = generate_datapackage( + self.metadata, + result_uris, + "GeoTIFF", + [i["size"] for i in results_fpaths], + [i["hash"] for i in results_fpaths], + ) + self.provenance_log["datapackage"] = datapkg + self.log.debug( + "%s generated datapackage in log: %s", self.metadata.name, datapkg + ) + + return self.provenance_log + + def generate_documentation(self): + """Generate documentation for the processor + on the result backend""" + # Generate Documentation + index_fpath = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "templates", + self.metadata.version, + self.index_filename, + ) + index_create = generate_index_file( + self.storage_backend, index_fpath, self.boundary["name"], self.metadata + ) + self.provenance_log[ + f"{self.metadata.name} - created index documentation" + ] = index_create + license_fpath = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "templates", + self.metadata.version, + self.license_filename, + ) + license_create = generate_license_file( + self.storage_backend, license_fpath, self.boundary["name"], self.metadata + ) + self.provenance_log[ + f"{self.metadata.name} - created license documentation" + ] = license_create + self.log.debug("%s generated documentation on backend", self.metadata.name) + + def _fetch_source(self) -> List[str]: + """ + Fetch and unpack the required source data if required. 
+ + ::returns source_fpaths List[str] Filepaths of all source data + """ + # Build Source Path + os.makedirs(self.source_folder, exist_ok=True) + if self._all_source_exists(): + self.log.debug( + "%s - all source files appear to exist and are valid", + self.metadata.name, + ) + return [ + os.path.join(self.source_folder, _file) for _file in self.source_files + ] + else: + downloaded_files = fetch_zenodo_doi( + self.zenodo_doi, self.source_folder, return_only_tifs=False + ) + # Should be only one zip + try: + downloaded_zip = [ + i + for i in downloaded_files + if os.path.basename(i) == "lange2020_expected_occurrence.zip" + ][0] + except IndexError: + self.log.error( + "after %s download - required zip file lange2020_expected_occurrence.zip was not present", + self.metadata.name, + ) + raise Exception(f"{self.metadata.name} download failed") + # Unpack zip + unpack_zip(downloaded_zip, self.source_folder) + # Moved nested tiffs up to source folder + for tiff_fpath in tiffs_in_folder( + os.path.join(self.source_folder, "lange2020_expected_occurrence"), + full_paths=True, + ): + shutil.move(tiff_fpath, self.source_folder) + shutil.rmtree( + os.path.join(self.source_folder, "lange2020_expected_occurrence"), + ignore_errors=True, + ) + # Count the Tiffs + self.log.debug("%s - Download Complete", self.metadata.name) + assert ( + self._all_source_exists() + ), f"after {self.metadata.name} download - not all source files were present" + # Filter to just the files we support + return [ + os.path.join(self.source_folder, _file) for _file in self.source_files + ] + + def _all_source_exists(self, remove_invalid=True) -> bool: + """ + Check if all source files exist and are valid + If not source will be removed + """ + source_valid = [True for _ in range(len(self.source_files))] + for idx, _file in enumerate(self.source_files): + fpath = os.path.join(self.source_folder, _file) + try: + assert_geotiff(fpath, check_compression=False, check_crs=None) + except Exception as err: + # remove the file and flag we should need to re-fetch, then move on + self.log.warning( + "%s source file %s appears to be invalid due to %s", + self.metadata.name, + fpath, + err, + ) + if remove_invalid: + if os.path.exists(fpath): + os.remove(fpath) + source_valid[idx] = False + return all(source_valid) diff --git a/dataproc/processors/core/jrc_ghsl_built_c/r2022_epoch2018_10m_mszfun.py b/dataproc/processors/core/jrc_ghsl_built_c/r2022_epoch2018_10m_mszfun.py index afb9d03..72f9434 100644 --- a/dataproc/processors/core/jrc_ghsl_built_c/r2022_epoch2018_10m_mszfun.py +++ b/dataproc/processors/core/jrc_ghsl_built_c/r2022_epoch2018_10m_mszfun.py @@ -8,6 +8,7 @@ from typing import List from celery.app import task +from dataproc.exceptions import ProcessorDatasetExists from dataproc.processors.internal.base import ( BaseProcessorABC, @@ -25,8 +26,8 @@ generate_index_file, generate_license_file, generate_datapackage, + output_filename ) -from dataproc.exceptions import FolderNotFoundException from dataproc.processors.core.jrc_ghsl_built_c.helpers import JRCBuiltCFetcher @@ -39,20 +40,58 @@ class Metadata(BaseMetadataABC): inspect.stack()[1].filename ) # this must follow snakecase formatting, without special chars description = """ - A Processor for JRC GHSL Built-Up Characteristics - - R2022 release, Epoch 2018, 10m resolution, Morphological Settlement Zone and Functional classification +A Processor for JRC GHSL Built-Up Characteristics - +R2022 release, Epoch 2018, 10m resolution, Morphological Settlement Zone and Functional 
classification """ # Longer processor description version = version_name_from_file( inspect.stack()[1].filename ) # Version of the Processor dataset_name = "r2022_epoch2018_10m_mszfun" # The dataset this processor targets data_author = "Joint Research Centre" + data_title = "GHS-BUILT-C MSZ and FC, R2022 E2018 10m" + data_title_long = "JRC Global Human Settlement Layer - Built-Up Characteristics (GHS-BUILT-C - MSZ & FC) - Release 2022 - Epoch 2018 - 10m resolution - Morphological Settlement Zone & Functional Classification" + data_summary = """ +The spatial raster dataset delineates the boundaries of the human settlements at +10m resolution, and describe their inner characteristics in terms of the +morphology of the built environment and the functional use. The Morphological +Settlement Zone (MSZ) delineates the spatial domain of all the human settlements +at the neighboring scale of approx. 100m, based on the spatial generalization of +the built-up surface fraction (BUFRAC) function. The objective is to fill the +open spaces that are surrounded by large patches of built space. MSZ, open +spaces, and built spaces basic class abstractions are derived by mathematical +morphology spatial filtering (opening, closing, regional maxima) from the BUFRAC +function. They are further classified according to the information regarding +vegetation intensity (GHS-BUILT-C_VEG_GLOBE_R2022A), water surfaces +(GHS_LAND_GLOBE_R2022A), road surfaces (OSM highways), functional use +(GHS-BUILT-C_FUN_GLOBE_R2022A), and building height (GHS-BUILT-H_GLOBE_R2022A). + +The main characteristics of this dataset are listed below. The complete +information about the GHSL main products can be found in the GHSL Data Package +2022 report (10.33 MB): +https://ghsl.jrc.ec.europa.eu/documents/GHSL_Data_Package_2022.pdf + """ + data_citation = """ +Dataset: + +Pesaresi M., P. Panagiotis (2022): GHS-BUILT-C R2022A - GHS Settlement +Characteristics, derived from Sentinel2 composite (2018) and other GHS R2022A +data.European Commission, Joint Research Centre (JRC) PID: +http://data.europa.eu/89h/dde11594-2a66-4c1b-9a19-821382aed36e, +doi:10.2905/DDE11594-2A66-4C1B-9A19-821382AED36E + +Concept & Methodology: + +Schiavina M., Melchiorri M., Pesaresi M., Politis P., Freire S., Maffenini L., +Florio P., Ehrlich D., Goch K., Tommasi P., Kemper T. 
GHSL Data Package 2022, +JRC 129516, ISBN 978-92-76-53071-8 doi:10.2760/19817 + """ data_license = DataPackageLicense( name="CC-BY-4.0", title="Creative Commons Attribution 4.0", path="https://creativecommons.org/licenses/by/4.0/", ) data_origin_url = "https://ghsl.jrc.ec.europa.eu/download.php?ds=builtC" + data_formats = ["GeoTIFF"] class Processor(BaseProcessorABC): @@ -88,15 +127,14 @@ def exists(self): self.metadata.version, datafile_ext=".tif", ) - except FolderNotFoundException: + except FileNotFoundError: return False return count_on_backend == self.total_expected_files def generate(self): """Generate files for a given processor""" if self.exists() is True: - self.provenance_log[self.metadata.name] = "exists" - return self.provenance_log + raise ProcessorDatasetExists() else: # Ensure we start with a blank output folder on the storage backend try: @@ -105,10 +143,8 @@ def generate(self): self.metadata.name, self.metadata.version, ) - except FolderNotFoundException: + except FileNotFoundError: pass - # Cleanup anything in tmp processing - self._clean_tmp_processing() # Check if the source TIFF exists and fetch it if not self.log.debug( "%s - collecting source geotiffs into %s", @@ -122,14 +158,27 @@ def generate(self): results_fpaths = [] for idx, source_fpath in enumerate(source_fpaths): self.update_progress( - 10 + int(idx * (80 / len(source_fpaths))), "cropping source" + 20 + int(idx * (80 / len(source_fpaths))), "cropping source" ) output_fpath = os.path.join( self.tmp_processing_folder, os.path.basename(source_fpath) ) - # Crop Source - preserve Molleweide + output_fpath = os.path.join( + self.tmp_processing_folder, + output_filename( + self.metadata.name, + self.metadata.version, + self.boundary["name"], + 'tif', + dataset_subfilename=os.path.splitext(os.path.basename(source_fpath))[0] + ) + ) + # Crop Source - preserve Molleweide, assume we'll need BIGTIFF for this dataset crop_success = crop_raster( - source_fpath, output_fpath, self.boundary, preserve_raster_crs=True + source_fpath, + output_fpath, + self.boundary, + creation_options=["COMPRESS=DEFLATE", "PREDICTOR=2", "ZLEVEL=6", "BIGTIFF=YES"], ) self.log.debug( "%s %s - success: %s", @@ -137,6 +186,9 @@ def generate(self): os.path.basename(source_fpath), crop_success, ) + self.update_progress( + 20 + int(idx * (80 / len(source_fpaths))), "generating hash" + ) if crop_success: results_fpaths.append( { @@ -221,14 +273,6 @@ def generate_documentation(self): ] = license_create self.log.debug("%s generated documentation on backend", self.metadata.name) - def _clean_tmp_processing(self): - """Remove the tmp processing folder and recreate""" - # Remove partial previous tmp results if they exist - if os.path.exists(self.tmp_processing_folder): - shutil.rmtree(self.tmp_processing_folder) - # Generate the tmp output directory - os.makedirs(self.tmp_processing_folder, exist_ok=True) - def _fetch_source(self) -> List[str]: """ Fetch and unpack the required source data if required. 
diff --git a/dataproc/processors/core/jrc_ghsl_population/r2022_epoch2020_1km.py b/dataproc/processors/core/jrc_ghsl_population/r2022_epoch2020_1km.py index 7d0b921..7c36a17 100644 --- a/dataproc/processors/core/jrc_ghsl_population/r2022_epoch2020_1km.py +++ b/dataproc/processors/core/jrc_ghsl_population/r2022_epoch2020_1km.py @@ -5,6 +5,7 @@ import os import inspect import shutil +from dataproc.exceptions import ProcessorDatasetExists from dataproc.processors.internal.base import ( BaseProcessorABC, @@ -21,8 +22,8 @@ generate_datapackage, generate_index_file, generate_license_file, + output_filename ) -from dataproc.exceptions import FolderNotFoundException from dataproc.processors.core.jrc_ghsl_population.helpers import JRCPopFetcher @@ -40,12 +41,43 @@ class Metadata(BaseMetadataABC): ) # Version of the Processor dataset_name = "r2022_epoch2020_1km" # The dataset this processor targets data_author = "Joint Research Centre" + data_title = "GHS-POP - R2022A" + data_title_long = "GHS-POP R2022A - extract from GHS population grid for 2020, 1km resolution" + data_summary = """ +The spatial raster dataset depicts the distribution of residential population, +expressed as the number of people per cell. Residential population estimates +between 1975 and 2020 in 5-year intervals and projections to 2025 and 2030 +derived from CIESIN GPWv4.11 were disaggregated from census or administrative +units to grid cells, informed by the distribution, density, and classification +of built-up as mapped in the Global Human Settlement Layer (GHSL) global layer +per corresponding epoch. + +The complete information about the GHSL main products can be found in the GHSL +Data Package 2022 report (10.33 MB): +https://ghsl.jrc.ec.europa.eu/documents/GHSL_Data_Package_2022.pdf + """ + data_citation = """ +Dataset: + +Schiavina M., Freire S., MacManus K. (2022): GHS-POP R2022A - GHS population +grid multitemporal (1975-2030).European Commission, Joint Research Centre (JRC) +PID: http://data.europa.eu/89h/d6d86a90-4351-4508-99c1-cb074b022c4a, +doi:10.2905/D6D86A90-4351-4508-99C1-CB074B022C4A + +Concept & Methodology: + +Freire S., MacManus K., Pesaresi M., Doxsey-Whitfield E., Mills J. (2016) +Development of new open and free multi-temporal global population grids at 250 m +resolution. 
Geospatial Data in a Changing World; Association of Geographic +Information Laboratories in Europe (AGILE), AGILE 2016 + """ data_license = DataPackageLicense( name="CC-BY-4.0", title="Creative Commons Attribution 4.0", path="https://creativecommons.org/licenses/by/4.0/", ) data_origin_url = "https://ghsl.jrc.ec.europa.eu/download.php?ds=pop" + data_formats = ["GeoTIFF"] class Processor(BaseProcessorABC): @@ -66,15 +98,14 @@ def exists(self): self.metadata.version, datafile_ext=".tif", ) - except FolderNotFoundException: + except FileNotFoundError: return False return count_on_backend == self.total_expected_files def generate(self): """Generate files for a given processor""" if self.exists() is True: - self.provenance_log[self.metadata.name] = "exists" - return self.provenance_log + raise ProcessorDatasetExists() else: # Ensure we start with a blank output folder on the storage backend try: @@ -83,10 +114,8 @@ def generate(self): self.metadata.name, self.metadata.version, ) - except FolderNotFoundException: + except FileNotFoundError: pass - # Cleanup anything in tmp processing - self._clean_tmp_processing() # Check if the source TIFF exists and fetch it if not self.log.debug( "%s - collecting source geotiffs into %s", @@ -96,13 +125,15 @@ def generate(self): self.update_progress(10, "fetching and verifying source") source_fpath = self._fetch_source() output_fpath = os.path.join( - self.tmp_processing_folder, os.path.basename(source_fpath) + self.tmp_processing_folder, + output_filename(self.metadata.name, self.metadata.version, self.boundary["name"], 'tif') ) # Crop Source - preserve Molleweide self.update_progress(50, "cropping source") self.log.debug("%s - cropping source", self.metadata.name) crop_success = crop_raster( - source_fpath, output_fpath, self.boundary, preserve_raster_crs=True + source_fpath, output_fpath, self.boundary, + creation_options=["COMPRESS=PACKBITS"] # This fails for jrc pop with higher compression ) self.log.debug( "%s %s - success: %s", @@ -170,14 +201,6 @@ def generate_documentation(self): ] = license_create self.log.debug("%s generated documentation on backend", self.metadata.name) - def _clean_tmp_processing(self): - """Remove the tmp processing folder and recreate""" - # Remove partial previous tmp results if they exist - if os.path.exists(self.tmp_processing_folder): - shutil.rmtree(self.tmp_processing_folder) - # Generate the tmp output directory - os.makedirs(self.tmp_processing_folder, exist_ok=True) - def _fetch_source(self) -> str: """ Fetch and unpack the required source data if required. 
diff --git a/dataproc/processors/core/natural_earth_raster/version_1.py b/dataproc/processors/core/natural_earth_raster/version_1.py index e1ed5e0..429be64 100644 --- a/dataproc/processors/core/natural_earth_raster/version_1.py +++ b/dataproc/processors/core/natural_earth_raster/version_1.py @@ -6,6 +6,7 @@ import inspect from dataproc import DataPackageLicense +from dataproc.exceptions import ProcessorDatasetExists from dataproc.processors.internal.base import BaseProcessorABC, BaseMetadataABC from dataproc.helpers import ( processor_name_from_file, @@ -19,6 +20,7 @@ generate_license_file, data_file_hash, data_file_size, + output_filename ) @@ -38,12 +40,17 @@ class Metadata(BaseMetadataABC): ) # Version of the Processor dataset_name = "natural_earth_raster" # The dataset this processor targets data_author = "Natural Earth Data" + data_title = "" + data_title_long = "" + data_summary = "" + data_citation = "" data_license = DataPackageLicense( name="CC-BY-4.0", title="Creative Commons Attribution 4.0", path="https://creativecommons.org/licenses/by/4.0/", ) data_origin_url = "https://www.naturalearthdata.com/downloads/50m-natural-earth-2/50m-natural-earth-ii-with-shaded-relief/" + data_formats = ["GeoTIFF"] class Processor(BaseProcessorABC): @@ -62,28 +69,22 @@ def exists(self): self.boundary["name"], self.metadata.name, self.metadata.version, - f"{self.boundary['name']}.tif", + output_filename(self.metadata.name, self.metadata.version, self.boundary["name"], 'tif') ) def generate(self): """Generate files for a given processor""" if self.exists() is True: - self.provenance_log[self.metadata.name] = "exists" - return self.provenance_log + raise ProcessorDatasetExists() # Check if the source TIFF exists and fetch it if not self.update_progress(10,"fetching and verifying source") geotiff_fpath = self._fetch_source() # Crop to given boundary self.update_progress(50,"cropping source") - output_folder = self.paths_helper.build_absolute_path( - self.boundary["name"], - "natural_earth_raster", - self.metadata.version, - "outputs", + output_fpath = os.path.join( + self.tmp_processing_folder, + output_filename(self.metadata.name, self.metadata.version, self.boundary["name"], 'tif') ) - os.makedirs(output_folder, exist_ok=True) - - output_fpath = os.path.join(output_folder, f"{self.boundary['name']}.tif") self.log.debug("Natural earth raster - cropping geotiff") crop_success = crop_raster(geotiff_fpath, output_fpath, self.boundary) self.provenance_log[f"{self.metadata.name} - crop success"] = crop_success diff --git a/dataproc/processors/core/natural_earth_vector/version_1.py b/dataproc/processors/core/natural_earth_vector/version_1.py index 85bcfc5..8ff5188 100644 --- a/dataproc/processors/core/natural_earth_vector/version_1.py +++ b/dataproc/processors/core/natural_earth_vector/version_1.py @@ -8,6 +8,7 @@ import sqlalchemy as sa from dataproc import DataPackageLicense +from dataproc.exceptions import ProcessorDatasetExists from dataproc.processors.internal.base import BaseProcessorABC, BaseMetadataABC from dataproc.helpers import ( processor_name_from_file, @@ -20,7 +21,8 @@ generate_datapackage, generate_index_file, data_file_hash, - data_file_size + data_file_size, + output_filename ) from config import ( get_db_uri_ogr, @@ -45,7 +47,10 @@ class Metadata(BaseMetadataABC): "natural_earth_vector_roads" # The dataset this processor targets ) data_author = "Natural Earth Data" - data_license = "" + data_title = "" + data_title_long = "" + data_summary = "" + data_citation = "" data_license = 
DataPackageLicense( name="Natural Earth", title="Natural Earth", @@ -54,6 +59,7 @@ class Metadata(BaseMetadataABC): data_origin_url = ( "https://www.naturalearthdata.com/downloads/10m-cultural-vectors/roads/" ) + data_formats = ["Geopackage"] class Processor(BaseProcessorABC): @@ -81,23 +87,21 @@ def exists(self): self.boundary["name"], self.metadata.name, self.metadata.version, - f"{self.boundary['name']}.gpkg", + output_filename(self.metadata.name, self.metadata.version, self.boundary["name"], 'gpkg'), ) def generate(self): """Generate files for a given processor""" if self.exists() is True: - self.provenance_log[self.metadata.name] = "exists" - return self.provenance_log + raise ProcessorDatasetExists() # Check if the source exists and fetch it if not self.update_progress(10, "fetching and verifying source") pg_table_name = self._fetch_source() # Crop to given boundary - output_folder = self.paths_helper.build_absolute_path( - self.boundary["name"], self.metadata.name, self.metadata.version, "outputs" + output_fpath = os.path.join( + self.tmp_processing_folder, + output_filename(self.metadata.name, self.metadata.version, self.boundary["name"], 'gpkg') ) - os.makedirs(output_folder, exist_ok=True) - output_fpath = os.path.join(output_folder, f"{self.boundary['name']}.gpkg") self.update_progress(20, "cropping source") self.log.debug("Natural earth vector - cropping Roads to geopkg") diff --git a/dataproc/processors/core/storm/global_mosaics_version_1.py b/dataproc/processors/core/storm/global_mosaics_version_1.py index a2a3cf2..fb1ba66 100644 --- a/dataproc/processors/core/storm/global_mosaics_version_1.py +++ b/dataproc/processors/core/storm/global_mosaics_version_1.py @@ -5,6 +5,7 @@ import os import inspect from typing import List +from dataproc.exceptions import ProcessorDatasetExists from dataproc.processors.internal.base import ( BaseProcessorABC, @@ -23,8 +24,8 @@ generate_license_file, fetch_zenodo_doi, tiffs_in_folder, + output_filename ) -from dataproc.exceptions import FolderNotFoundException class Metadata(BaseMetadataABC): @@ -41,12 +42,72 @@ class Metadata(BaseMetadataABC): ) # Version of the Processor dataset_name = "STORM Global Mosaics 10.5281/zenodo.7438145" # The dataset this processor targets data_author = "University of Oxford" + data_title = "STORM tropical cyclone wind speed maps" + data_title_long = "STORM tropical cyclone wind speed return period maps as global GeoTIFFs" + data_summary = """ +Global tropical cyclone wind speed return period maps. + +This dataset is derived with minimal processing from the following datasets +created by Bloemendaal et al, which are released with a CC0 license: + +[1] Bloemendaal, Nadia; de Moel, H. (Hans); Muis, S; Haigh, I.D. (Ivan); Aerts, +J.C.J.H. (Jeroen) (2020): STORM tropical cyclone wind speed return periods. +4TU.ResearchData. Dataset. https://doi.org/10.4121/12705164.v3 + +[2] Bloemendaal, Nadia; de Moel, Hans; Dullaart, Job; Haarsma, R.J. (Reindert); +Haigh, I.D. (Ivan); Martinez, Andrew B.; et al. (2022): STORM climate change +tropical cyclone wind speed return periods. 4TU.ResearchData. Dataset. +https://doi.org/10.4121/14510817.v3 + +Datasets containing tropical cyclone maximum wind speed (in m/s) return periods, +generated using the STORM datasets (see +https://www.nature.com/articles/s41597-020-0381-2) and STORM climate change +datasets (see https://figshare.com/s/397aff8631a7da2843fc). Return periods were +empirically calculated using Weibull's plotting formula. 
The +STORM_FIXED_RETURN_PERIOD dataset contains maximum wind speeds for a fixed set +of return periods at 10 km resolution in every basin and for every climate model +used here (see below). + +The GeoTIFFs provided in the datasets linked above have been mosaicked into +single files with global extent for each climate model/return period using the +following code: + +https://github.com/nismod/open-gira/blob/219315e57cba54bb18f033844cff5e48dd5979d7/workflow/rules/download/storm-ibtracs.smk#L126-L151 + +Files are named on the pattern: +STORM_FIXED_RETURN_PERIODS_{STORM_MODEL}_{STORM_RP}_YR_RP.tif + +STORM_MODEL is be one of constant, CMCC-CM2-VHR4, CNRM-CM6-1-HR, EC-Earth3P-HR +or HadGEM3-GC31-HM. The "constant" files are for the present day, baseline +climate scenario as explained in dataset [1]. The other files are for 2050, +RCP8.5 under different models as explained in the paper linked from dataset [2]. + +STORM_RP is one of 10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 200, 300, 400, 500, +600, 700, 800, 900, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000 or +10000. +""" + data_citation = """ +Russell, Tom. (2022). STORM tropical cyclone wind speed return periods as global +GeoTIFFs (1.0.0) [Data set]. Zenodo. https://doi.org/10.5281/zenodo.7438145 + +Derived from: + +[1] Bloemendaal, Nadia; de Moel, H. (Hans); Muis, S; Haigh, I.D. (Ivan); Aerts, +J.C.J.H. (Jeroen) (2020): STORM tropical cyclone wind speed return periods. +4TU.ResearchData. Dataset. https://doi.org/10.4121/12705164.v3 + +[2] Bloemendaal, Nadia; de Moel, Hans; Dullaart, Job; Haarsma, R.J. (Reindert); +Haigh, I.D. (Ivan); Martinez, Andrew B.; et al. (2022): STORM climate change +tropical cyclone wind speed return periods. 4TU.ResearchData. Dataset. +https://doi.org/10.4121/14510817.v3 + """ data_license = DataPackageLicense( name="CC0", title="CC0", path="https://creativecommons.org/share-your-work/public-domain/cc0/", ) - data_origin_url = "https://zenodo.org/record/7438145#.Y-S6cS-l30o" + data_origin_url = "https://doi.org/10.5281/zenodo.7438145" + data_formats = ["GeoTIFF"] class Processor(BaseProcessorABC): @@ -66,15 +127,14 @@ def exists(self): self.metadata.version, datafile_ext=".tif", ) - except FolderNotFoundException: + except FileNotFoundError: return False return count_on_backend == self.total_expected_files def generate(self): """Generate files for a given processor""" if self.exists() is True: - self.provenance_log[self.metadata.name] = "exists" - return self.provenance_log + raise ProcessorDatasetExists() else: # Ensure we start with a blank output folder on the storage backend try: @@ -83,7 +143,7 @@ def generate(self): self.metadata.name, self.metadata.version, ) - except FolderNotFoundException: + except FileNotFoundError: pass # Check if the source TIFF exists and fetch it if not self.update_progress(10, "fetching and verifying source") @@ -95,8 +155,16 @@ def generate(self): self.update_progress( 10 + int(idx * (80 / len(source_fpaths))), "cropping source" ) + subfilename = os.path.splitext(os.path.basename(source_fpath))[0] output_fpath = os.path.join( - self.tmp_processing_folder, os.path.basename(source_fpath) + self.tmp_processing_folder, + output_filename( + self.metadata.name, + self.metadata.version, + self.boundary["name"], + 'tif', + dataset_subfilename=subfilename + ) ) crop_success = crop_raster(source_fpath, output_fpath, self.boundary) self.log.debug( diff --git a/dataproc/processors/core/test_fail_processor/__init__.py b/dataproc/processors/core/test_fail_processor/__init__.py new file 
mode 100644 index 0000000..e69de29 diff --git a/dataproc/processors/core/test_fail_processor/version_1.py b/dataproc/processors/core/test_fail_processor/version_1.py new file mode 100644 index 0000000..ed31050 --- /dev/null +++ b/dataproc/processors/core/test_fail_processor/version_1.py @@ -0,0 +1,66 @@ +""" +Test Failing Processor +""" + +from time import sleep +import os +import inspect + +from dataproc import DataPackageLicense +from dataproc.processors.internal.base import ( + BaseProcessorABC, + BaseMetadataABC, +) +from dataproc.helpers import ( + version_name_from_file, + create_test_file, + data_file_hash, + datapackage_resource, + processor_name_from_file, +) + + +class Metadata(BaseMetadataABC): + """Processor metadata""" + + name = processor_name_from_file( + inspect.stack()[1].filename + ) # this must follow snakecase formatting, without special chars + description = "A test processor that fails" # Longer processor description + version = version_name_from_file( + inspect.stack()[1].filename + ) # Version of the Processor + dataset_name = "" # The dataset this processor targets + data_author = "" + data_title = "" + data_title_long = "" + data_summary = "" + data_citation = "" + data_license = DataPackageLicense( + name="CC-BY-4.0", + title="Creative Commons Attribution 4.0", + path="https://creativecommons.org/licenses/by/4.0/", + ) + data_origin_url = "http://url" + data_formats = ["GeoTIFF"] + + +class Processor(BaseProcessorABC): + """A Test Failing Processor""" + + def generate(self): + """Generate files for a given processor""" + # Pause to allow inspection + sleep(1) + self.update_progress(30,"waiting") + assert(0==1), "test-fail-processor failed as expected" + return self.provenance_log + + def exists(self): + """Whether all files for a given processor exist on the FS on not""" + return self.storage_backend.processor_file_exists( + self.boundary["name"], + self.metadata.name, + self.metadata.version, + f"{self.boundary['name']}_test.tif", + ) diff --git a/dataproc/processors/core/test_processor/version_1.py b/dataproc/processors/core/test_processor/version_1.py index 1b8eba9..e711feb 100644 --- a/dataproc/processors/core/test_processor/version_1.py +++ b/dataproc/processors/core/test_processor/version_1.py @@ -7,6 +7,7 @@ import inspect from dataproc import DataPackageLicense +from dataproc.exceptions import ProcessorDatasetExists from dataproc.processors.internal.base import ( BaseProcessorABC, BaseMetadataABC, @@ -32,12 +33,17 @@ class Metadata(BaseMetadataABC): ) # Version of the Processor dataset_name = "nightlights" # The dataset this processor targets data_author = "Nightlights Author" + data_title = "" + data_title_long = "" + data_summary = "" + data_citation = "" data_license = DataPackageLicense( name="CC-BY-4.0", title="Creative Commons Attribution 4.0", path="https://creativecommons.org/licenses/by/4.0/", ) data_origin_url = "http://url" + data_formats = ["GeoTIFF"] class Processor(BaseProcessorABC): @@ -53,8 +59,7 @@ def generate(self): ) output_fpath = os.path.join(output_folder, f"{self.boundary['name']}_test.tif") if self.exists() is True: - self.update_progress(100,"waiting") - self.provenance_log[f"{self.metadata.name}"] = "exists" + raise ProcessorDatasetExists() else: # Generate a blank tests dataset create_test_file(output_fpath) diff --git a/dataproc/processors/core/wri_aqueduct/version_2.py b/dataproc/processors/core/wri_aqueduct/version_2.py index 11540b5..8952053 100644 --- a/dataproc/processors/core/wri_aqueduct/version_2.py +++ 
b/dataproc/processors/core/wri_aqueduct/version_2.py @@ -7,6 +7,7 @@ import shutil from celery.app import task +from dataproc.exceptions import ProcessorDatasetExists from dataproc.processors.internal.base import ( BaseProcessorABC, @@ -24,9 +25,9 @@ generate_license_file, generate_datapackage, generate_index_file, + output_filename ) from dataproc.processors.core.wri_aqueduct.helpers import HazardAqueduct -from dataproc.exceptions import FolderNotFoundException class Metadata(BaseMetadataABC): @@ -42,7 +43,18 @@ class Metadata(BaseMetadataABC): inspect.stack()[1].filename ) # Version of the Processor dataset_name = "wri_aqueduct" # The dataset this processor targets - data_author = "WRI" + data_title = "Aqueduct Flood Hazard Maps" + data_title_long = "World Resource Institute - Aqueduct Flood Hazard Maps (Version 2, updated October 20, 2020)" + data_author = "Ward, P.J., H.C. Winsemius, S. Kuzma, M.F.P. Bierkens, A. Bouwman, H. de Moel, A. DÃaz Loaiza, et al." + data_summary = """World Resource Institute - Aqueduct Flood Hazard Maps (Version 2 (updated +October 20, 2020)). Inundation depth in meters for coastal and riverine +floods over 1km grid squares. 1 in 2 to 1 in 1000 year return periods. +Baseline, RCP 4.5 & 8.5 emission scenarios. Current and future maps in 2030, +2050 and 2080.""" + data_citation = """ +Ward, P.J., H.C. Winsemius, S. Kuzma, M.F.P. Bierkens, A. Bouwman, H. de Moel, A. DÃaz Loaiza, et al. 2020. +Aqueduct Floods Methodology. Technical Note. Washington, D.C.: World Resources Institute. Available online at: +www.wri.org/publication/aqueduct-floods-methodology.""" data_license = DataPackageLicense( name="CC-BY-4.0", title="Creative Commons Attribution 4.0", @@ -51,6 +63,7 @@ class Metadata(BaseMetadataABC): data_origin_url = ( "http://wri-projects.s3.amazonaws.com/AqueductFloodTool/download/v2/index.html" ) + data_formats = ["GeoTIFF"] class Processor(BaseProcessorABC): @@ -73,15 +86,14 @@ def exists(self): self.metadata.version, datafile_ext=".tif", ) - except FolderNotFoundException: + except FileNotFoundError: return False return count_on_backend == self.total_expected_files def generate(self): """Generate files for a given processor""" if self.exists() is True: - self.provenance_log[self.metadata.name] = "exists" - return self.provenance_log + raise ProcessorDatasetExists() else: # Ensure we start with a blank output folder on the storage backend try: @@ -90,18 +102,12 @@ def generate(self): self.metadata.name, self.metadata.version, ) - except FolderNotFoundException: + except FileNotFoundError: pass # Check if the source TIFF exists and fetch it if not self.update_progress(10, "fetching and verifying source") self._fetch_source() - # Remove partial previous tmp results if they exist - if os.path.exists(self.tmp_processing_folder): - shutil.rmtree(self.tmp_processing_folder) - # Generate the tmp output directory - os.makedirs(self.tmp_processing_folder, exist_ok=True) - self.log.debug("WRI Aqueduct - cropping geotiffs") results_fpaths = [] for idx, fileinfo in enumerate(os.scandir(self.source_folder)): @@ -114,7 +120,19 @@ def generate(self): 10 + int(idx * (80 / self.total_expected_files)), "cropping source" ) geotiff_fpath = os.path.join(self.source_folder, fileinfo.name) - output_fpath = os.path.join(self.tmp_processing_folder, fileinfo.name) + + subfilename = os.path.splitext(fileinfo.name)[0] + output_fpath = os.path.join( + self.tmp_processing_folder, + output_filename( + self.metadata.name, + self.metadata.version, + self.boundary["name"], + 'tif', + 
dataset_subfilename=subfilename + ) + ) + assert_geotiff(geotiff_fpath) crop_success = crop_raster(geotiff_fpath, output_fpath, self.boundary) self.log.debug( diff --git a/dataproc/processors/core/wri_powerplants/version_130.py b/dataproc/processors/core/wri_powerplants/version_130.py index d60de02..ccbe0d6 100644 --- a/dataproc/processors/core/wri_powerplants/version_130.py +++ b/dataproc/processors/core/wri_powerplants/version_130.py @@ -6,6 +6,7 @@ import inspect from dataproc import DataPackageLicense +from dataproc.exceptions import ProcessorDatasetExists from dataproc.processors.internal.base import BaseProcessorABC, BaseMetadataABC from dataproc.helpers import ( version_name_from_file, @@ -18,8 +19,9 @@ generate_datapackage, unpack_zip, csv_to_gpkg, - gp_crop_file_to_geopkg, + fiona_crop_file_to_geopkg, assert_vector_file, + output_filename ) @@ -33,12 +35,24 @@ class Metadata(BaseMetadataABC): version = version_name_from_file(inspect.stack()[1].filename) dataset_name = "wri_powerplants" data_author = "World Resources Institute" + data_title = "WRI Global Power Plant Database" + data_title_long = "World Resources Institute Global Power Plant Database" + data_summary = """The Global Power Plant Database is a comprehensive, open source database of power plants around the world. It +centralizes power plant data to make it easier to navigate, compare and draw insights for one’s own analysis. +The database covers approximately 35,000 power plants from 167 countries and includes thermal plants (e.g. coal, +gas, oil, nuclear, biomass, waste, geothermal) and renewables (e.g. hydro, wind, solar). Each power plant is +geolocated and entries contain information on plant capacity, generation, ownership, and fuel type. It will be +continuously updated as data becomes available.""" + data_citation = """Global Energy Observatory, Google, KTH Royal Institute of Technology in Stockholm, Enipedia, World Resources +Institute. 2018. Global Power Plant Database. 
Published on Resource Watch and Google Earth Engine; +http://resourcewatch.org/ https://earthengine.google.com/""" data_license = DataPackageLicense( name="CC-BY-4.0", title="Creative Commons Attribution 4.0", path="https://creativecommons.org/licenses/by/4.0/", ) data_origin_url = "https://datasets.wri.org/dataset/globalpowerplantdatabase" + data_formats = ["Geopackage"] class Processor(BaseProcessorABC): @@ -50,6 +64,47 @@ class Processor(BaseProcessorABC): source_zip_url = "https://wri-dataportal-prod.s3.amazonaws.com/manual/global_power_plant_database_v_1_3.zip" expected_zip_hash = "083f11452efc1ed0e8fb1494f0ce49e5c37718e2" source_file = "global_power_plant_database.gpkg" + output_schema = { + "properties": { + "country": "str", + "country_long": "str", + "name": "str", + "gppd_idnr": "str", + "capacity_mw": "float", + "latitude": "float", + "longitude": "float", + "primary_fuel": "str", + "other_fuel1": "str", + "other_fuel2": "str", + "other_fuel3": "str", + "commissioning_year": "float", + "owner": "str", + "source": "str", + "url": "str", + "geolocation_source": "str", + "wepp_id": "str", + "year_of_capacity_data": "float", + "generation_gwh_2013": "float", + "generation_gwh_2014": "float", + "generation_gwh_2015": "float", + "generation_gwh_2016": "float", + "generation_gwh_2017": "float", + "generation_gwh_2018": "float", + "generation_gwh_2019": "float", + "generation_data_source": "str", + "estimated_generation_gwh_2013": "float", + "estimated_generation_gwh_2014": "float", + "estimated_generation_gwh_2015": "float", + "estimated_generation_gwh_2016": "float", + "estimated_generation_gwh_2017": "float", + "estimated_generation_note_2013": "str", + "estimated_generation_note_2014": "str", + "estimated_generation_note_2015": "str", + "estimated_generation_note_2016": "str", + "estimated_generation_note_2017": "str", + }, + "geometry": "Point", + } expected_source_gpkg_shape = (34936, 37) def exists(self): @@ -58,20 +113,18 @@ def exists(self): self.boundary["name"], self.metadata.name, self.metadata.version, - f"{self.boundary['name']}.gpkg", + output_filename(self.metadata.name, self.metadata.version, self.boundary["name"], 'gpkg') ) def generate(self): """Generate files for a given processor""" if self.exists() is True: - self.provenance_log[self.metadata.name] = "exists" - return self.provenance_log - # Setup output path in the processing backend - output_folder = self.paths_helper.build_absolute_path( - self.boundary["name"], self.metadata.name, self.metadata.version, "outputs" + raise ProcessorDatasetExists() + + output_fpath = os.path.join( + self.tmp_processing_folder, + output_filename(self.metadata.name, self.metadata.version, self.boundary["name"], 'gpkg') ) - os.makedirs(output_folder, exist_ok=True) - output_fpath = os.path.join(output_folder, f"{self.boundary['name']}.gpkg") # Fetch source as required self.update_progress(10, "fetching and verifying source") @@ -80,8 +133,11 @@ def generate(self): # Crop to given boundary self.update_progress(50, "cropping source") self.log.debug("%s - cropping to geopkg", self.metadata.name) - crop_result = gp_crop_file_to_geopkg( - source_gpkg_fpath, self.boundary, output_fpath, mask_type="boundary" + crop_result = fiona_crop_file_to_geopkg( + source_gpkg_fpath, + self.boundary, + output_fpath, + self.output_schema ) self.provenance_log[f"{self.metadata.name} - crop completed"] = crop_result @@ -107,7 +163,9 @@ def generate(self): self.metadata, [result_uri], "GEOPKG", sizes, hashes ) self.provenance_log["datapackage"] = datapkg - 
self.log.debug("%s generated datapackage in log: %s", self.metadata.name, datapkg) + self.log.debug( + "%s generated datapackage in log: %s", self.metadata.name, datapkg + ) # Cleanup as required return self.provenance_log @@ -151,14 +209,17 @@ def _fetch_source(self) -> str: os.makedirs(self.source_folder, exist_ok=True) if self._all_source_exists(): self.log.debug( - "%s - all source files appear to exist and are valid", self.metadata.name + "%s - all source files appear to exist and are valid", + self.metadata.name, ) return os.path.join(self.source_folder, self.source_file) # Fetch the source zip self.log.debug("%s - fetching zip", self.metadata.name) local_zip_fpath = self._fetch_zip() self.log.debug("%s - fetched zip to %s", self.metadata.name, local_zip_fpath) - self.provenance_log[f"{self.metadata.name} - zip download path"] = local_zip_fpath + self.provenance_log[ + f"{self.metadata.name} - zip download path" + ] = local_zip_fpath # Unpack self.log.debug("%s - unpacking zip", self.metadata.name) unpack_zip(local_zip_fpath, self.tmp_processing_folder) @@ -177,7 +238,9 @@ def _fetch_source(self) -> str: longitude_col="longitude", ) self.log.info( - "%s - CSV conversion to source GPKG success: %s", self.metadata.name, converted + "%s - CSV conversion to source GPKG success: %s", + self.metadata.name, + converted, ) return gpkg_fpath diff --git a/dataproc/processors/internal/base.py b/dataproc/processors/internal/base.py index 6846ba1..48c4be2 100644 --- a/dataproc/processors/internal/base.py +++ b/dataproc/processors/internal/base.py @@ -17,7 +17,11 @@ class BaseMetadataABC(ABC): description: str = "" # Longer processor description version: str = "" # Version of the Processor dataset_name: str = "" # The dataset this processor targets + data_title: str = "" # Short one-liner title for dataset, ~30 characters is good + data_title_long: str = "" # Long title for dataset data_author: str = "" + data_summary: str = "" # 1-3 paragraph prose summary of the dataset + data_citation: str = "" # Suggested citation, e.g. "Nicholas, C (2023) irv-autopkg. 
[Software] Available at: https://github.com/nismod/irv-autopkg" data_license: DataPackageLicense = None data_origin_url: str = "" @@ -45,7 +49,7 @@ def __init__( self.source_folder = self.paths_helper.build_absolute_path("source_data") os.makedirs(self.source_folder, exist_ok=True) # Tmp Processing data will be cleaned between processor runs - self.tmp_processing_folder = self.paths_helper.build_absolute_path("tmp") + self.tmp_processing_folder = self.paths_helper.build_absolute_path("tmp", self.boundary['name']) os.makedirs(self.tmp_processing_folder, exist_ok=True) def __enter__(self): @@ -60,7 +64,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): exc_tb, ) try: - shutil.rmtree(self.tmp_processing_folder) + shutil.rmtree(self.tmp_processing_folder, ignore_errors=True) except FileNotFoundError: pass diff --git a/dataproc/tasks.py b/dataproc/tasks.py index c9c79f6..874653f 100644 --- a/dataproc/tasks.py +++ b/dataproc/tasks.py @@ -5,7 +5,7 @@ from contextlib import contextmanager import logging -from celery import signals +from celery import signals, states from celery.utils.log import get_task_logger from redis import Redis @@ -14,7 +14,6 @@ CELERY_APP, TASK_LOCK_TIMEOUT, STORAGE_BACKEND, - LOCALFS_STORAGE_BACKEND_ROOT, LOCALFS_PROCESSING_BACKEND_ROOT, REDIS_HOST, ) @@ -24,22 +23,25 @@ BoundaryProcessor, ProvenanceProcessor, ) -from dataproc.exceptions import ProcessorAlreadyExecutingException +from dataproc.exceptions import ProcessorAlreadyExecutingException, ProcessorDatasetExists, ProcessorExecutionFailed from dataproc.backends.storage import init_storage_backend # Setup Configured Storage Backend -storage_backend = init_storage_backend(STORAGE_BACKEND)(LOCALFS_STORAGE_BACKEND_ROOT) +storage_backend = init_storage_backend(STORAGE_BACKEND) # Used for guarding against parallel execution of duplicate tasks redis_client = Redis(host=REDIS_HOST) +def task_sig_exists(task_sig) -> bool: + """Check a task signature in Redis""" + return redis_client.exists(task_sig) != 0 @contextmanager def redis_lock(task_sig: str): """ Manage Task execution lock within redis """ - if redis_client.exists(task_sig): + if task_sig_exists(task_sig) is True: raise ProcessorAlreadyExecutingException() yield redis_client.setex(task_sig, TASK_LOCK_TIMEOUT, value="") @@ -114,8 +116,24 @@ def processor_task( ::param sink Any Sink for result of previous processor in the group """ + retry_countdown = 5 logger = get_task_logger(__name__) task_sig = task_signature(boundary["name"], processor_name_version) + # There can be cases where two dup tasks are submitted - one runs the boundary processors and the other ends up running the actual processing + # In this case there is a chance the boundary processor does not complete before the processor runs (as it ends up running in parallel). 
+ # So here we ensure the boundary step is complete for external tasks before continuing + # NOTE: This is the ONLY retry condition for a Dataset Processor + boundary_task_sig = task_signature(boundary["name"], "boundary_setup") + try: + if task_sig_exists(boundary_task_sig) is True: + raise ProcessorAlreadyExecutingException("boundary setup for this processor executing") + except ProcessorAlreadyExecutingException as err: + logger.warning( + "boundary task with signature %s is currently executing for processor %s - will retry processor in %s secs", + boundary_task_sig, task_sig, retry_countdown + ) + raise self.retry(exc=err, countdown=retry_countdown) + # Run the processor try: with redis_lock(task_sig) as acquired: if acquired: @@ -132,13 +150,17 @@ def processor_task( result = proc.generate() # Update sink for this processor sink[processor_name_version] = result + return sink + except ProcessorDatasetExists: + sink[processor_name_version] = {"skipped": f"{task_sig} exists"} + return sink except Exception as err: logger.exception("") # Update sink for this processor - sink[processor_name_version] = {"failed": type(err).__name__} + sink[processor_name_version] = {"failed": f"{type(err).__name__} - {err}"} + return sink finally: _ = redis_client.getdel(task_sig) - return sink else: raise ProcessorAlreadyExecutingException() except ProcessorAlreadyExecutingException: @@ -168,18 +190,20 @@ def generate_provenance(self, sink: Any, boundary: Boundary): if isinstance(sink, dict): sink = [sink] proc = ProvenanceProcessor(boundary, storage_backend) - res = proc.generate(sink) + return proc.generate(sink) except Exception as err: logger.exception("") # Update sink for this processor - sink["generate_provenance"] = {"failed": type(err).__name__} + if isinstance(sink, dict): + sink["generate_provenance"] = {"failed": type(err).__name__} + else: + sink.append({"generate_provenance failed": type(err).__name__}) finally: _ = redis_client.getdel(task_sig) else: raise ProcessorAlreadyExecutingException() - except ProcessorAlreadyExecutingException: + except ProcessorAlreadyExecutingException as err: logger.warning( "task with signature %s skipped because it was already executing", task_sig ) - self.retry(countdown=5) - return res + raise self.retry(exc=err, countdown=5) diff --git a/docker-compose.yaml b/docker-compose.yaml index af10aec..2b351de 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -21,13 +21,18 @@ services: # - ./tests/data/packages:/www/data/packages # Testing redis: - image: redis:6.2-alpine + image: ghcr.io/nismod/irv-autopkg-redis:0.1 + build: ./redis restart: always + user: autopkg ports: - 6379:6379 command: redis-server --save 20 1 --loglevel debug volumes: - ./data/redis:/data + cpus: "0.25" + mem_reservation: "50M" + mem_limit: "250M" flower: image: mher/flower @@ -42,14 +47,13 @@ services: CELERY_RESULT_BACKEND: redis://redis dataproc: - image: ghcr.io/nismod/irv-autopkg:0.2.4-dev + image: ghcr.io/nismod/irv-autopkg:0.2.7-dev user: autopkg build: . volumes: - ./data/packages:/data/packages - ./data/processing:/data/processing - - ./tests/data/packages:/usr/src/app/tests/data/packages - - ./tests/data/tmp:/usr/src/app/tests/data/tmp + - ./tests:/usr/src/app/tests env_file: - envs/.api_and_dataproc.env command: celery --app dataproc.tasks worker @@ -58,35 +62,38 @@ services: mem_limit: "1G" api: - image: ghcr.io/nismod/irv-autopkg:0.2.4-dev + image: ghcr.io/nismod/irv-autopkg:0.2.7-dev build: . 
volumes: - ./data/packages:/data/packages - ./data/processing:/data/processing - - ./tests/data/packages:/usr/src/app/tests/data/packages - - ./tests/data/tmp:/usr/src/app/tests/data/tmp + - ./tests:/usr/src/app/tests ports: - 8000:8000 env_file: - envs/.api_and_dataproc.env - command: uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload + command: uvicorn api.main:app --host 0.0.0.0 --port 8000 + cpus: "0.25" + mem_reservation: "50M" + mem_limit: "250M" - # These test-harness containers require API and Dataproc to be running + # API test-harness requires API and Dataproc to be running # WARNING - THESE TESTS WILL WIPE THE CONFIGURED TEST HARNESS DB from anything in MODELS # To run tests change AUTOPKG_DEPLOYMENT_ENV=test in .api_and_dataproc.env, then reboot api and dataproc services test-api: - image: ghcr.io/nismod/irv-autopkg:0.2.4-dev + image: ghcr.io/nismod/irv-autopkg:0.2.7-dev volumes: - - ./tests/data:/usr/src/app/tests/data + - ./tests:/usr/src/app/tests env_file: - envs/.api_and_dataproc.env - command: python3 -m unittest discover /usr/src/app/tests/api + command: python -m unittest discover /usr/src/app/tests/api + # Dataproc test-harness only requires DB test-dataproc: - image: ghcr.io/nismod/irv-autopkg:0.2.4-dev + image: ghcr.io/nismod/irv-autopkg:0.2.7-dev volumes: - - ./tests/data:/usr/src/app/tests/data + - ./tests:/usr/src/app/tests env_file: - envs/.api_and_dataproc.env command: python -m unittest discover /usr/src/app/tests/dataproc diff --git a/docs/architecture.png b/docs/architecture.png new file mode 100644 index 0000000..5b9f599 Binary files /dev/null and b/docs/architecture.png differ diff --git a/docs/package_structure.png b/docs/package_structure.png new file mode 100644 index 0000000..36265db Binary files /dev/null and b/docs/package_structure.png differ diff --git a/redis/Dockerfile b/redis/Dockerfile new file mode 100644 index 0000000..5f28e98 --- /dev/null +++ b/redis/Dockerfile @@ -0,0 +1,5 @@ +FROM redis:6.2-alpine + +RUN addgroup -g 1002 autopkg && adduser -SHD autopkg -u 1002 -G autopkg + +USER autopkg \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 6e23d19..63f142f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,6 @@ shapely==2.0.0 pyproj==3.4.1 datapackage==1.15.2 zenodo_get==1.3.4 -geopandas==0.12.2 \ No newline at end of file +geopandas==0.12.2 +pyarrow==11.0.0 +fiona==1.9.1 \ No newline at end of file diff --git a/tests/api/integration/test_boundaries.py b/tests/api/integration/test_boundaries.py index a8a2953..f784f2c 100644 --- a/tests/api/integration/test_boundaries.py +++ b/tests/api/integration/test_boundaries.py @@ -100,8 +100,8 @@ def test_search_boundary_by_name(self): """ Retrieve boundaries by searching for a name """ - search_name = 'gh' - expected_names = ['ghana', 'guinea'] + search_name = 'mbi' + expected_names = ['mozambique', 'gambia', 'zambia'] route = build_route(f"{BOUNDARY_SEARCH_ROUTE}?name={search_name}") response = requests.get(route) self.assert_boundary_summary(response, expected_count=len(expected_names)) @@ -129,6 +129,16 @@ def test_search_boundary_by_coords(self): self.assert_boundary_summary(response, expected_count=len(expected_names)) self.assertCountEqual( [item["name"] for item in response.json()], expected_names) + + def test_search_boundary_by_coords_zero(self): + search_latitude = 28.2 + search_longitude = 0.0 + expected_names = ['algeria'] + route = build_route(f"{BOUNDARY_SEARCH_ROUTE}?latitude={search_latitude}&longitude={search_longitude}") + response 
= requests.get(route) + self.assert_boundary_summary(response, expected_count=len(expected_names)) + self.assertCountEqual( + [item["name"] for item in response.json()], expected_names) def test_search_boundary_by_coords_nothing_found(self): search_latitude = 128.2 diff --git a/tests/api/integration/test_jobs.py b/tests/api/integration/test_jobs.py index 21c619a..c22b67d 100644 --- a/tests/api/integration/test_jobs.py +++ b/tests/api/integration/test_jobs.py @@ -9,6 +9,7 @@ from uuid import uuid4 from time import time, sleep import json +import shutil import requests @@ -21,7 +22,19 @@ from tests.dataproc.integration.processors import ( LOCAL_FS_PACKAGE_DATA_TOP_DIR, ) -from config import PACKAGES_HOST_URL +from tests.helpers import ( + clean_packages, + assert_package_awss3, +) +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import AWSS3StorageBackend, S3Manager +from config import ( + STORAGE_BACKEND, + S3_BUCKET, + S3_REGION, + PACKAGES_HOST_URL, +) + JOB_SUBMIT_DATA_BOUNDARY_NOEXIST = { "boundary_name": "noexist", @@ -44,23 +57,29 @@ JOB_SUBMIT_DATA_GAMBIA_TEST_PROC = { "boundary_name": "gambia", "processors": ["test_processor.version_1"], -} # Awaits 5 secs +} + +JOB_SUBMIT_DATA_ZIMBABWE_TEST_PROC = { + "boundary_name": "zimbabwe", + "processors": ["test_fail_processor.version_1"], +} JOB_SUBMIT_DATA_ZAMBIA_TEST_PROC = { "boundary_name": "zambia", "processors": ["test_processor.version_1"], -} # Awaits 5 secs +} JOB_SUBMIT_DATA_GHANA_TEST_PROC = { "boundary_name": "ghana", "processors": ["test_processor.version_1"], -} # Awaits 5 secs +} JOB_SUBMIT_DATA_SSUDAN_NE_VECTOR_PROC = { "boundary_name": "ssudan", "processors": ["natural_earth_vector.version_1"], -} # Awaits 5 secs +} +PACAKGES_USED = ["gambia", "zambia", "ssudan", "zimbabwe"] class TestProcessingJobs(unittest.TestCase): @@ -68,8 +87,37 @@ class TestProcessingJobs(unittest.TestCase): These tests require API and Celery Worker to be running (with redis) """ + @classmethod + def setUpClass(cls): + cls.max_job_await = 20 # secs + cls.storage_backend = init_storage_backend(STORAGE_BACKEND) + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=PACAKGES_USED, + ) + + @classmethod + def tearDownClass(cls): + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=PACAKGES_USED, + ) + def setUp(self): - self.max_job_await = 6 # secs + clean_packages( + STORAGE_BACKEND, + self.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=PACAKGES_USED, + ) def test_get_job_no_exist(self): """""" @@ -115,13 +163,9 @@ def test_submit_job_duplicate_processor(self): response.json()["detail"][0]["msg"], "duplicate processors not allowed" ) - # __NOTE__: These submission tests use different bounaries - # so results do not overlap in the backend queue - def test_submit_job(self): """Simple submission and await completion of a job""" # Ensure the package tree is clean - remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=["zambia"]) expected_code = 202 route = build_route(JOBS_BASE_ROUTE) response = requests.post(route, json=JOB_SUBMIT_DATA_GAMBIA_TEST_PROC) @@ -134,29 +178,68 @@ def test_submit_job(self): route = build_route(JOB_STATUS_ROUTE.format(job_id=job_id)) response = requests.get(route) if response.json()["job_group_processors"]: - self.assertEqual(response.json()["job_group_processors"][0]["job_id"], job_id) + self.assertEqual( + 
response.json()["job_group_processors"][0]["job_id"], job_id + ) if not response.json()["job_group_status"] == "PENDING": + # Final await for any S3 refreshing backend + sleep(1.0) break - sleep(0.2) + sleep(1.0) if (time() - start) > self.max_job_await: self.fail("max await reached") self.assertEqual(response.json()["job_group_status"], "COMPLETE") # Assert the package integrity, including submitted processor - assert_package( - LOCAL_FS_PACKAGE_DATA_TOP_DIR, - "gambia", - ) - remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=["gambia"]) + if STORAGE_BACKEND == "localfs": + assert_package( + LOCAL_FS_PACKAGE_DATA_TOP_DIR, + "gambia", + ) + elif STORAGE_BACKEND == "awss3": + assert_package_awss3( + self.storage_backend, + "gambia", + expected_processor_versions=JOB_SUBMIT_DATA_GAMBIA_TEST_PROC["processors"], + ) - def test_submit_job_already_processing_using_test_processor(self): + def test_submit_failing_job(self): + """Submission of a job that fails""" + # Ensure the package tree is clean + expected_code = 202 + route = build_route(JOBS_BASE_ROUTE) + response = requests.post(route, json=JOB_SUBMIT_DATA_ZIMBABWE_TEST_PROC) + self.assertEqual(response.status_code, expected_code) + self.assertIn("job_id", response.json().keys()) + job_id = response.json()["job_id"] + # Await job completion + start = time() + while True: + route = build_route(JOB_STATUS_ROUTE.format(job_id=job_id)) + response = requests.get(route) + if response.json()["job_group_processors"]: + self.assertEqual( + response.json()["job_group_processors"][0]["job_id"], job_id + ) + if not response.json()["job_group_status"] == "PENDING": + # Final await for any S3 refreshing backend + sleep(1.0) + break + sleep(1.0) + if (time() - start) > self.max_job_await: + self.fail("max await reached") + self.assertEqual(response.json()["job_group_status"], "COMPLETE") + # Job Statuses show failed + self.assertEqual(response.json()["job_group_processors"][0]['job_status'], "FAILURE") + + + def test_submit_job_already_executing_using_test_processor(self): """ - Submission of a second job containing - the same boundary and processor while one is already executing + Submission of multiple jobs containing the same boundary and + processor while one is already executing (test processor) """ - max_wait = 20 # secs + max_wait = 60 # secs dup_processors_to_submit = 8 expected_responses = [202 for i in range(dup_processors_to_submit)] - remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=["gambia"]) route = build_route(JOBS_BASE_ROUTE) responses = [] for _ in range(dup_processors_to_submit): @@ -191,6 +274,13 @@ def test_submit_job_already_processing_using_test_processor(self): sleep(0.5) # Jobs completed successfully self.assertEqual(statuses, ["COMPLETE" for i in range(dup_processors_to_submit)]) + # Job Statuses show skipped and success + expected_msgs = ["SKIPPED" for _ in range(dup_processors_to_submit-1)] + expected_msgs.append("SUCCESS") + self.assertCountEqual( + [i['job_status'] for i in results], + expected_msgs + ) test_proc_results = [] for result in results: test_proc_results.append(result["job_result"]) @@ -218,24 +308,28 @@ def test_submit_job_already_processing_using_test_processor(self): test_proc_results, ) # Processor success only reported once - self.assertTrue(len(set([json.dumps(i) for i in test_proc_results]))) + self.assertEqual(len(set([json.dumps(i) for i in test_proc_results])), 1) # Assert we only get a single package output - assert_package( - LOCAL_FS_PACKAGE_DATA_TOP_DIR, - "zambia", - ) -
remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=["zambia"]) + if STORAGE_BACKEND == "localfs": + assert_package( + LOCAL_FS_PACKAGE_DATA_TOP_DIR, + "zambia", + ) + elif STORAGE_BACKEND == "awss3": + assert_package_awss3( + self.storage_backend, + "zambia", + expected_processor_versions=JOB_SUBMIT_DATA_ZAMBIA_TEST_PROC["processors"], + ) def test_submit_job_already_processing_using_ne_vector_processor(self): """ - Submission of a second job containing - the same boundary and processor while one is already executing + Submission of a second job containing the same boundary and processor while one is already executing (ne vector) """ - max_wait = 60 # secs - dup_processors_to_submit = 8 + max_wait = 30 # secs + dup_processors_to_submit = 2 expected_responses = [202 for i in range(dup_processors_to_submit)] - remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=["ssudan"]) route = build_route(JOBS_BASE_ROUTE) responses = [] for _ in range(dup_processors_to_submit): @@ -270,14 +364,28 @@ def test_submit_job_already_processing_using_ne_vector_processor(self): sleep(0.5) # Jobs completed successfully self.assertEqual(statuses, ["COMPLETE" for i in range(dup_processors_to_submit)]) + # Job Statuses show skipped and success + expected_msgs = ["SKIPPED" for _ in range(dup_processors_to_submit-1)] + expected_msgs.append("SUCCESS") + self.assertCountEqual( + [i['job_status'] for i in results], + expected_msgs + ) # Between the two sets of results there should be success for # both boundaries and test_processor test_proc_results = [] for result in results: test_proc_results.append(result["job_result"]) - # Correct total processing results - including 7 exists + # Correct total processing results self.assertEqual(len(test_proc_results), dup_processors_to_submit) - # Test Processor Success + # Should have only ran fully once - the rest should be exists + count_processed_e2e = 0 + count_processed_e2e_key = "natural_earth_vector - loaded NE Roads to PG" + for i in test_proc_results: + if count_processed_e2e_key in i.keys(): + count_processed_e2e+=1 + self.assertEqual(count_processed_e2e, 1) + # Test Processor Success keys all exist self.assertIn( sorted([ "natural_earth_vector - zip download path", @@ -294,9 +402,14 @@ def test_submit_job_already_processing_using_ne_vector_processor(self): # Processor success only reported once self.assertTrue(len(set([json.dumps(i) for i in test_proc_results]))) - # Assert we only get a single package output - assert_package( - LOCAL_FS_PACKAGE_DATA_TOP_DIR, - "ssudan", - ) - remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=["ssudan"]) + if STORAGE_BACKEND == "localfs": + assert_package( + LOCAL_FS_PACKAGE_DATA_TOP_DIR, + "ssudan", + ) + elif STORAGE_BACKEND == "awss3": + assert_package_awss3( + self.storage_backend, + "ssudan", + expected_processor_versions=JOB_SUBMIT_DATA_SSUDAN_NE_VECTOR_PROC["processors"], + ) diff --git a/tests/api/integration/test_packages.py b/tests/api/integration/test_packages.py index ddf96cf..425890c 100644 --- a/tests/api/integration/test_packages.py +++ b/tests/api/integration/test_packages.py @@ -6,19 +6,33 @@ import sys import inspect import unittest +import shutil import requests -from tests.helpers import build_route, create_tree, remove_tree, assert_datapackage_resource - current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) parent_dir = os.path.dirname(current_dir) sys.path.insert(0, parent_dir) from api.routes import PACKAGE_ROUTE, PACKAGES_BASE_ROUTE +from tests.helpers import ( + build_route, + 
create_tree, + remove_tree, + assert_datapackage_resource, + create_tree_awss3, + clean_packages, +) from tests.dataproc.integration.processors import ( LOCAL_FS_PACKAGE_DATA_TOP_DIR, ) +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from config import ( + STORAGE_BACKEND, + S3_BUCKET, + S3_REGION +) class TestPackages(unittest.TestCase): @@ -27,66 +41,101 @@ class TestPackages(unittest.TestCase): These tests require API and Celery Worker to be running (with redis) """ - def assert_package(self, response, expected_boundary_name: str, expected_dataset_names_versions: list): + @classmethod + def setUpClass(cls): + cls.backend = init_storage_backend(STORAGE_BACKEND) + + @classmethod + def tearDownClass(cls): + # Package data + clean_packages( + STORAGE_BACKEND, + cls.backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) + + def assert_package( + self, + response, + expected_boundary_name: str, + expected_dataset_names_versions: list, + ): """ Check the package response is valid ::param expected_dataset_names_versions list ["natural_earth_raster.version_1", ...] """ self.assertEqual(response.status_code, 200) - self.assertEqual(response.json()['boundary_name'], expected_boundary_name) + self.assertEqual(response.json()["boundary_name"], expected_boundary_name) name_versions = [] # Check the processors for dataset in response.json()["processors"]: - for version in dataset['versions']: + for version in dataset["versions"]: name_versions.append(f'{dataset["name"]}.{version["version"]}') - self.assertListEqual( - name_versions, - expected_dataset_names_versions - ) + self.assertListEqual(name_versions, expected_dataset_names_versions) # Ensure we have a nested datapackage self.assertIn("datapackage", response.json().keys()) - for dp_resource in response.json()['datapackage']['resources']: + for dp_resource in response.json()["datapackage"]["resources"]: assert_datapackage_resource(dp_resource) def test_get_all_packages(self): """ Retrieve all packages """ - if not LOCAL_FS_PACKAGE_DATA_TOP_DIR: - raise Exception("localfs storage root not set in env") - create_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR) + if STORAGE_BACKEND == "localfs": + create_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR) + elif STORAGE_BACKEND == "awss3": + with S3Manager( + *self.backend._parse_env(), region=self.backend.s3_region + ) as s3_fs: + create_tree_awss3( + s3_fs, + S3_BUCKET, + ) route = build_route(PACKAGES_BASE_ROUTE) response = requests.get(route) # Ensure we can find at least the fake packages we created self.assertIn( - 'zambia', - [boundary['boundary_name'] for boundary in response.json()] + "zambia", [boundary["boundary_name"] for boundary in response.json()] ) self.assertIn( - 'gambia', - [boundary['boundary_name'] for boundary in response.json()] + "gambia", [boundary["boundary_name"] for boundary in response.json()] ) remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR) def test_get_package_by_name_not_found(self): """Attempt to retrieve details of a package which does not exist""" - route = build_route(PACKAGE_ROUTE.format(boundary_name='noexist')) + route = build_route(PACKAGE_ROUTE.format(boundary_name="noexist")) response = requests.get(route) self.assertEqual(response.status_code, 404) - self.assertDictEqual(response.json(), {'detail': 'Package noexist not found'}) + self.assertDictEqual(response.json(), {"detail": "Package noexist not found"}) def test_get_package_by_name_no_valid_datasets(self): """ - Attempt to Retrieve details of
a package by boundary name, + Attempt to Retrieve details of a package by boundary name, where there are no datasets which have applicable processors """ - create_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=['gambia'], datasets=['noexist']) - route = build_route(PACKAGE_ROUTE.format(boundary_name='gambia')) + if STORAGE_BACKEND == "localfs": + create_tree( + LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=["gambia"], datasets=["noexist"] + ) + elif STORAGE_BACKEND == "awss3": + with S3Manager( + *self.backend._parse_env(), region=self.backend.s3_region + ) as s3_fs: + create_tree_awss3( + s3_fs, S3_BUCKET, packages=["gambia"], datasets=["noexist"] + ) + route = build_route(PACKAGE_ROUTE.format(boundary_name="gambia")) response = requests.get(route) self.assertEqual(response.status_code, 404) - self.assertDictEqual(response.json(), {'detail': 'Package gambia has no existing or executing datasets'}) - remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=['gambia']) + self.assertDictEqual( + response.json(), + {"detail": "Package gambia has no existing or executing datasets"}, + ) + remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=["gambia"]) def test_get_package_by_name(self): """ @@ -94,9 +143,23 @@ def test_get_package_by_name(self): Package is created within the test, but the processor must exist and be valid (natural_earth_raster.version_1) """ - create_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=['gambia'], datasets=['natural_earth_raster']) - route = build_route(PACKAGE_ROUTE.format(boundary_name='gambia')) + if STORAGE_BACKEND == "localfs": + create_tree( + LOCAL_FS_PACKAGE_DATA_TOP_DIR, + packages=["gambia"], + datasets=["natural_earth_raster"], + ) + elif STORAGE_BACKEND == "awss3": + with S3Manager( + *self.backend._parse_env(), region=self.backend.s3_region + ) as s3_fs: + create_tree_awss3( + s3_fs, + S3_BUCKET, + packages=["gambia"], + datasets=["natural_earth_raster"], + ) + route = build_route(PACKAGE_ROUTE.format(boundary_name="gambia")) response = requests.get(route) self.assert_package(response, "gambia", ["natural_earth_raster.version_1"]) - remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=['gambia']) - + remove_tree(LOCAL_FS_PACKAGE_DATA_TOP_DIR, packages=["gambia"]) diff --git a/tests/api/integration/test_processors.py b/tests/api/integration/test_processors.py new file mode 100644 index 0000000..4f4f2e0 --- /dev/null +++ b/tests/api/integration/test_processors.py @@ -0,0 +1,105 @@ +""" +Tests for Processor Endpoints +""" + +import os +import sys +import inspect +import unittest + +import requests + +current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) +parent_dir = os.path.dirname(current_dir) +sys.path.insert(0, parent_dir) + +from api.routes import ( + PROCESSORS_BASE_ROUTE, + PROCESSORS_NAME_ROUTE, + PROCESSORS_VERSION_ROUTE, +) +from tests.helpers import build_route + +EXPECTED_PROCESSOR_VERSION = { + "name": "test_processor.version_1", + "description": "A test processor for nightlights", + "version": "version_1", + "data_author": "Nightlights Author", + "data_title": "", + "data_title_long": "", + "data_summary": "", + "data_citation": "", + "data_license": { + "name": "CC-BY-4.0", + "path": "https://creativecommons.org/licenses/by/4.0/", + "title": "Creative Commons Attribution 4.0", + }, + "data_origin_url": "http://url", + "data_formats" : ["GeoTIFF"] +} + + +class TestProcessors(unittest.TestCase): + + """ + These tests require API to be running + """ + + @classmethod + def setUpClass(cls): + pass + + @classmethod + def 
tearDownClass(cls): + pass + + def test_get_all_processors(self): + """ + Retrieve all Processors + """ + route = build_route(PROCESSORS_BASE_ROUTE) + response = requests.get(route) + self.assertEqual(response.status_code, 200) + self.assertTrue(len(response.json()) > 0) + self.assertIn("test_processor", [proc['name'] for proc in response.json()]) + + def test_get_processor_name_noexist(self): + """ + Retrieve a processor by name which does not exist + """ + route = build_route(PROCESSORS_NAME_ROUTE.format(name="noexist")) + response = requests.get(route) + self.assertEqual(response.status_code, 404) + + def test_get_processor_name_version_noexist(self): + """ + Retrieve a processor version which does not exist + """ + route = build_route( + PROCESSORS_VERSION_ROUTE.format(name="test_processor", version="noexist") + ) + response = requests.get(route) + self.assertEqual(response.status_code, 404) + + def test_get_processor_by_name(self): + """ + Retrieve a processor by name + """ + route = build_route(PROCESSORS_NAME_ROUTE.format(name="test_processor")) + response = requests.get(route) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["name"], "test_processor") + self.assertEqual(len(response.json()["versions"]), 1) + self.assertDictEqual(response.json()["versions"][0], EXPECTED_PROCESSOR_VERSION) + + def test_get_processor_version(self): + """ + Retrieve a processor version + """ + route = build_route( + PROCESSORS_VERSION_ROUTE.format( + name="test_processor", version=EXPECTED_PROCESSOR_VERSION["version"] + ) + ) + response = requests.get(route) + self.assertDictEqual(response.json(), EXPECTED_PROCESSOR_VERSION) diff --git a/tests/data/isimp_drought_v1/lange2020_clm45_gfdl-esm2m_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2030_occurrence.tif b/tests/data/isimp_drought_v1/lange2020_clm45_gfdl-esm2m_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2030_occurrence.tif new file mode 100644 index 0000000..5a312e8 Binary files /dev/null and b/tests/data/isimp_drought_v1/lange2020_clm45_gfdl-esm2m_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2030_occurrence.tif differ diff --git a/tests/data/isimp_drought_v1/lange2020_clm45_miroc5_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2080_occurrence.tif b/tests/data/isimp_drought_v1/lange2020_clm45_miroc5_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2080_occurrence.tif new file mode 100644 index 0000000..6f75db0 Binary files /dev/null and b/tests/data/isimp_drought_v1/lange2020_clm45_miroc5_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2080_occurrence.tif differ diff --git a/tests/data/isimp_drought_v1/lange2020_lpjml_miroc5_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2080_occurrence.tif b/tests/data/isimp_drought_v1/lange2020_lpjml_miroc5_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2080_occurrence.tif new file mode 100644 index 0000000..fd444ce Binary files /dev/null and b/tests/data/isimp_drought_v1/lange2020_lpjml_miroc5_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2080_occurrence.tif differ diff --git a/tests/data/load_boundaries.py b/tests/data/load_boundaries.py index 20daa07..2d8ed0a 100644 --- a/tests/data/load_boundaries.py +++ b/tests/data/load_boundaries.py @@ -65,7 +65,8 @@ def load_boundaries( long_name_column="name_long", admin_level="0", wipe_table=True, - skip_names=['-99'] + skip_names=['-99'], + setup_tables=True ) -> Tuple[bool, List]: """ Load a geojson file of multipolygons into Boundaries table @@ -75,6 +76,8 @@ def load_boundaries( 
db_uri = get_db_uri_sync(API_POSTGRES_DB) # Init DB and Load via SA engine = sa.create_engine(db_uri, pool_pre_ping=True) + if setup_tables is True: + db.Base.metadata.create_all(engine) if wipe_table is True: for tbl in reversed(db.Base.metadata.sorted_tables): engine.execute(tbl.delete()) diff --git a/tests/data/notafile.gpkg b/tests/data/notafile.gpkg new file mode 100644 index 0000000..e69de29 diff --git a/tests/data/tmp/.gitignore b/tests/data/tmp/.gitignore deleted file mode 100644 index 86d0cb2..0000000 --- a/tests/data/tmp/.gitignore +++ /dev/null @@ -1,4 +0,0 @@ -# Ignore everything in this directory -* -# Except this file -!.gitignore \ No newline at end of file diff --git a/tests/dataproc/integration/processors/test_gri_osm.py b/tests/dataproc/integration/processors/test_gri_osm.py index ac26dfc..350b440 100644 --- a/tests/dataproc/integration/processors/test_gri_osm.py +++ b/tests/dataproc/integration/processors/test_gri_osm.py @@ -5,22 +5,31 @@ import unittest import shutil -from dataproc.backends.storage.localfs import LocalFSStorageBackend -from dataproc import Boundary -from dataproc.processors.core.gri_osm.roads_and_rail_version_1 import ( - Processor, - Metadata, -) from tests.helpers import ( + assert_exists_awss3, load_country_geojson, assert_datapackage_resource, + clean_packages ) from tests.dataproc.integration.processors import ( LOCAL_FS_PROCESSING_DATA_TOP_DIR, LOCAL_FS_PACKAGE_DATA_TOP_DIR, DummyTaskExecutor ) -from config import PACKAGES_HOST_URL, TEST_GRI_OSM +from dataproc import Boundary +from dataproc.processors.core.gri_osm.roads_and_rail_version_1 import ( + Processor, + Metadata, +) +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from config import ( + PACKAGES_HOST_URL, + S3_REGION, + STORAGE_BACKEND, + S3_BUCKET, + TEST_GRI_OSM +) class TestGRIOSMProcessor(unittest.TestCase): @@ -34,14 +43,31 @@ def setUpClass(cls): os.makedirs(cls.test_processing_data_dir, exist_ok=True) gambia_geojson, envelope_geojson = load_country_geojson("gambia") cls.boundary = Boundary("gambia", gambia_geojson, envelope_geojson) - cls.storage_backend = LocalFSStorageBackend(LOCAL_FS_PACKAGE_DATA_TOP_DIR) + cls.storage_backend = init_storage_backend(STORAGE_BACKEND) + # Ensure clean test-env + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) @classmethod def tearDownClass(cls): # Tmp and Source data - shutil.rmtree(cls.test_processing_data_dir, ignore_errors=True) + shutil.rmtree(cls.test_processing_data_dir) # Package data - shutil.rmtree(os.path.join(cls.storage_backend.top_level_folder_path, "gambia"), ignore_errors=True) + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) def setUp(self): self.task_executor = DummyTaskExecutor() @@ -96,20 +122,29 @@ def test_generate(self): # Remove the final package artifacts (but keep the test data artifacts if they exist) if TEST_GRI_OSM is False: self.skipTest(f"Skipping GRI OSM due to TEST_GRI_OSM == {TEST_GRI_OSM}") - try: - shutil.rmtree( - os.path.join(self.storage_backend.top_level_folder_path, "gambia") - ) - except FileNotFoundError: - pass + clean_packages( + STORAGE_BACKEND, + self.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) prov_log = self.proc.generate() # # Assert the 
log contains a succesful entries self.assertTrue(prov_log[f"{self.proc.metadata.name} - crop completed"]) self.assertTrue(prov_log[f"{self.proc.metadata.name} - move to storage success"]) # # Collect the URI for the final Raster final_uri = prov_log[f"{self.proc.metadata.name} - result URI"] - # Assert the file exists (replacing the uri for local FS) - self.assertTrue(os.path.exists(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR))) + if STORAGE_BACKEND == "localfs": + self.assertTrue(os.path.exists(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR))) + elif STORAGE_BACKEND == "awss3": + with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs: + assert_exists_awss3( + s3_fs, + final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET), + ) + else: + pass # Check the datapackage thats included in the prov log self.assertIn("datapackage", prov_log.keys()) assert_datapackage_resource(prov_log["datapackage"]) diff --git a/tests/dataproc/integration/processors/test_gridfinder.py b/tests/dataproc/integration/processors/test_gridfinder.py index c92427c..d2b72d2 100644 --- a/tests/dataproc/integration/processors/test_gridfinder.py +++ b/tests/dataproc/integration/processors/test_gridfinder.py @@ -5,24 +5,32 @@ import unittest import shutil -from dataproc.backends import LocalFSStorageBackend -from dataproc import Boundary -from dataproc.processors.core.gridfinder.version_1 import ( - Processor, - Metadata, -) -from dataproc.helpers import assert_geotiff, assert_vector_file + from tests.helpers import ( load_country_geojson, - assert_raster_bounds_correct, + assert_vector_output, + assert_raster_output, assert_datapackage_resource, + clean_packages ) from tests.dataproc.integration.processors import ( LOCAL_FS_PROCESSING_DATA_TOP_DIR, LOCAL_FS_PACKAGE_DATA_TOP_DIR, DummyTaskExecutor, ) -from config import PACKAGES_HOST_URL +from dataproc import Boundary +from dataproc.processors.core.gridfinder.version_1 import ( + Processor, + Metadata, +) +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from config import ( + PACKAGES_HOST_URL, + S3_REGION, + STORAGE_BACKEND, + S3_BUCKET, +) TEST_DATA_DIR = os.path.join( os.path.dirname( @@ -44,16 +52,30 @@ def setUpClass(cls): os.makedirs(cls.test_processing_data_dir, exist_ok=True) gambia_geojson, envelope_geojson = load_country_geojson("gambia") cls.boundary = Boundary("gambia", gambia_geojson, envelope_geojson) - cls.storage_backend = LocalFSStorageBackend(LOCAL_FS_PACKAGE_DATA_TOP_DIR) + cls.storage_backend = init_storage_backend(STORAGE_BACKEND) + # Ensure clean test-env + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) @classmethod def tearDownClass(cls): # Tmp and Source data shutil.rmtree(cls.test_processing_data_dir, ignore_errors=True) # Package data - shutil.rmtree( - os.path.join(cls.storage_backend.top_level_folder_path, "gambia"), - ignore_errors=True, + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], ) def setUp(self): @@ -111,16 +133,21 @@ def test_generate(self): We are using locally sourced test-files. 
""" expected_crs = { - "grid.gpkg": "EPSG:4326", - "lv.tif": "ESRI:54009", - "targets.tif": "EPSG:4326", + "gridfinder-version_1-grid-gambia.gpkg": "EPSG:4326", + "gridfinder-version_1-lv-gambia.tif": "ESRI:54009", + "gridfinder-version_1-targets-gambia.tif": "EPSG:4326", } - try: - shutil.rmtree( - os.path.join(self.storage_backend.top_level_folder_path, "gambia") - ) - except FileNotFoundError: - pass + result_source_map = { + "gridfinder-version_1-lv-gambia.tif": "lv.tif", + "gridfinder-version_1-targets-gambia.tif": "targets.tif" + } + clean_packages( + STORAGE_BACKEND, + self.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) # Move test-data into the expected source folder for _file in os.scandir(TEST_DATA_DIR): shutil.copy( @@ -136,25 +163,46 @@ def test_generate(self): # Collect the URIs for the final Raster final_uris = prov_log[f"{self.proc.metadata.name} - result URIs"] self.assertEqual(len(final_uris.split(",")), self.proc.total_expected_files) + # Collect the original source fpaths for pixel assertion for final_uri in final_uris.split(","): fname = os.path.basename(final_uri) if os.path.splitext(fname)[1] == ".tif": - # # Assert the geotiffs are valid - assert_geotiff( - final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), - check_crs=expected_crs[fname], - ) - # # Assert the envelopes - assert_raster_bounds_correct( - final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), - self.boundary["envelope_geojson"], - ) + # Match original source raster for pixel assertion + if STORAGE_BACKEND == "localfs": + assert_raster_output( + self.boundary["envelope_geojson"], + final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), + check_crs=expected_crs[fname], + pixel_check_raster_fpath=os.path.join(self.proc.source_folder, result_source_map[fname]) + ) + elif STORAGE_BACKEND == "awss3": + with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs: + assert_raster_output( + self.boundary["envelope_geojson"], + s3_fs=s3_fs, + s3_raster_fpath=final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET), + check_crs=expected_crs[fname], + pixel_check_raster_fpath=os.path.join(self.proc.source_folder, result_source_map[fname]) + ) + else: + pass else: - assert_vector_file( - final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), - (163, 2), - expected_crs=expected_crs[fname], - ) + if STORAGE_BACKEND == "localfs": + assert_vector_output( + (84, 2), + expected_crs[fname], + local_vector_fpath=final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), + ) + elif STORAGE_BACKEND == "awss3": + with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs: + assert_vector_output( + (84, 2), + expected_crs[fname], + s3_fs=s3_fs, + s3_vector_fpath=final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET), + ) + else: + pass # Check the datapackage thats included in the prov log self.assertIn("datapackage", prov_log.keys()) assert_datapackage_resource(prov_log["datapackage"]) diff --git a/tests/dataproc/integration/processors/test_isimp_drought.py b/tests/dataproc/integration/processors/test_isimp_drought.py new file mode 100644 index 0000000..d827661 --- /dev/null +++ b/tests/dataproc/integration/processors/test_isimp_drought.py @@ -0,0 +1,176 @@ +""" +Unit tests for ISIMP Drought +""" +import os +import unittest +import shutil + +from tests.helpers import ( + load_country_geojson, + assert_raster_bounds_correct, + assert_datapackage_resource, + clean_packages, + assert_raster_output 
+) +from tests.dataproc.integration.processors import ( + LOCAL_FS_PROCESSING_DATA_TOP_DIR, + LOCAL_FS_PACKAGE_DATA_TOP_DIR, + DummyTaskExecutor +) +from dataproc import Boundary +from dataproc.processors.core.isimp_drought.version_1 import ( + Processor, + Metadata, +) +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from config import ( + PACKAGES_HOST_URL, + S3_REGION, + STORAGE_BACKEND, + S3_BUCKET, +) + +TEST_VERSION_1_SOURCE_FILES = [ + "lange2020_clm45_miroc5_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2080_occurrence.tif", + "lange2020_lpjml_miroc5_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2080_occurrence.tif", + "lange2020_clm45_gfdl-esm2m_ewembi_rcp60_2005soc_co2_led_global_annual_2006_2099_2030_occurrence.tif" +] + +TEST_DATA_DIR = os.path.join( + os.path.dirname( + os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + ), + "data", + "isimp_drought_v1", +) + +class TestISIMPDroughtV1Processor(unittest.TestCase): + """ + """ + + @classmethod + def setUpClass(cls): + cls.test_processing_data_dir = os.path.join( + LOCAL_FS_PROCESSING_DATA_TOP_DIR, Metadata().name, Metadata().version + ) + os.makedirs(cls.test_processing_data_dir, exist_ok=True) + gambia_geojson, envelope_geojson = load_country_geojson("gambia") + cls.boundary = Boundary("gambia", gambia_geojson, envelope_geojson) + cls.storage_backend = init_storage_backend(STORAGE_BACKEND) + # Ensure clean test-env + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) + + @classmethod + def tearDownClass(cls): + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir, ignore_errors=True) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) + + def setUp(self): + self.task_executor = DummyTaskExecutor() + self.meta = Metadata() + self.proc = Processor( + self.meta, + self.boundary, + self.storage_backend, + self.task_executor, + LOCAL_FS_PROCESSING_DATA_TOP_DIR, + ) + + def test_processor_init(self): + """""" + self.assertIsInstance(self.proc, Processor) + + def test_context_manager(self): + """""" + with Processor( + self.meta, + self.boundary, + self.storage_backend, + self.task_executor, + self.test_processing_data_dir, + ) as proc: + self.assertIsInstance(proc, Processor) + + def test_context_manager_cleanup_on_error(self): + """""" + with Processor( + self.meta, + self.boundary, + self.storage_backend, + self.task_executor, + self.test_processing_data_dir, + ) as proc: + test_fpath = os.path.join(proc.tmp_processing_folder, "testfile") + # Add a file into the tmp processing backend + with open(test_fpath, "w") as fptr: + fptr.write("data") + self.assertFalse(os.path.exists(test_fpath)) + + def test_meta_init(self): + """""" + self.assertIsInstance(self.meta, Metadata) + self.assertNotEqual(self.meta.name, "") + self.assertNotEqual(self.meta.version, "") + self.assertNotEqual(self.meta.dataset_name, "") + + def test_generate(self): + """E2E generate test - fetch, crop, push""" + # Move test-data into the expected source folder + for _file in os.scandir(TEST_DATA_DIR): + shutil.copy( + os.path.join(TEST_DATA_DIR, _file.name), + os.path.join(self.proc.source_folder, _file.name), + ) + # Limit expected source files + self.proc.source_files = 
TEST_VERSION_1_SOURCE_FILES + self.proc.total_expected_files = len(TEST_VERSION_1_SOURCE_FILES) + prov_log = self.proc.generate() + # Assert the log contains successful entries + self.assertTrue(prov_log[f"{self.proc.metadata.name} - move to storage success"]) + # Collect the URIs for the final Rasters + final_uris = prov_log[f"{self.proc.metadata.name} - result URIs"] + self.assertEqual(len(final_uris.split(",")), self.proc.total_expected_files) + # Collect the original source fpaths for pixel assertion + source_fpaths = self.proc._fetch_source() + for idx, final_uri in enumerate(final_uris.split(",")): + if STORAGE_BACKEND == "localfs": + assert_raster_output( + self.boundary["envelope_geojson"], + final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), + check_crs="EPSG:4326", + tolerence=0.5, + pixel_check_raster_fpath=source_fpaths[idx] + ) + elif STORAGE_BACKEND == "awss3": + with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs: + assert_raster_output( + self.boundary["envelope_geojson"], + s3_fs=s3_fs, + s3_raster_fpath=final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET), + check_crs="EPSG:4326", + tolerence=0.5, + pixel_check_raster_fpath=source_fpaths[idx] + ) + else: + pass + # Check the datapackage thats included in the prov log + self.assertIn("datapackage", prov_log.keys()) + assert_datapackage_resource(prov_log['datapackage']) diff --git a/tests/dataproc/integration/processors/test_jrc_ghsl_built_c.py b/tests/dataproc/integration/processors/test_jrc_ghsl_built_c.py index 99c0896..c19fc5e 100644 --- a/tests/dataproc/integration/processors/test_jrc_ghsl_built_c.py +++ b/tests/dataproc/integration/processors/test_jrc_ghsl_built_c.py @@ -5,24 +5,31 @@ import unittest import shutil -from dataproc.backends import LocalFSStorageBackend -from dataproc import Boundary -from dataproc.processors.core.jrc_ghsl_built_c.r2022_epoch2018_10m_mszfun import ( - Processor, - Metadata, -) -from dataproc.helpers import assert_geotiff from tests.helpers import ( load_country_geojson, assert_raster_bounds_correct, - assert_datapackage_resource + assert_datapackage_resource, + clean_packages, + assert_raster_output ) from tests.dataproc.integration.processors import ( LOCAL_FS_PROCESSING_DATA_TOP_DIR, LOCAL_FS_PACKAGE_DATA_TOP_DIR, DummyTaskExecutor ) -from config import PACKAGES_HOST_URL +from dataproc import Boundary +from dataproc.processors.core.jrc_ghsl_built_c.r2022_epoch2018_10m_mszfun import ( + Processor, + Metadata, +) +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from config import ( + PACKAGES_HOST_URL, + S3_REGION, + STORAGE_BACKEND, + S3_BUCKET, +) class TestJRCGHSLBuiltCR2022Processor(unittest.TestCase): @@ -36,14 +43,31 @@ def setUpClass(cls): os.makedirs(cls.test_processing_data_dir, exist_ok=True) gambia_geojson, envelope_geojson = load_country_geojson("gambia") cls.boundary = Boundary("gambia", gambia_geojson, envelope_geojson) - cls.storage_backend = LocalFSStorageBackend(LOCAL_FS_PACKAGE_DATA_TOP_DIR) + cls.storage_backend = init_storage_backend(STORAGE_BACKEND) + # Ensure clean test-env + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) @classmethod def tearDownClass(cls): # Tmp and Source data shutil.rmtree(cls.test_processing_data_dir, ignore_errors=True) # Package data - 
shutil.rmtree(os.path.join(cls.storage_backend.top_level_folder_path, "gambia"), ignore_errors=True) + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) def setUp(self): self.task_executor = DummyTaskExecutor() @@ -100,13 +124,6 @@ def test_meta_init(self): def test_generate(self): """E2E generate test - fetch, crop, push""" - self.skipTest(f"Skipping JRC BUILT-C due to WIP") - try: - shutil.rmtree( - os.path.join(self.storage_backend.top_level_folder_path, "gambia") - ) - except FileNotFoundError: - pass # Limit the files to be downloaded in the fetcher self.proc.total_expected_files = 2 prov_log = self.proc.generate() @@ -115,11 +132,29 @@ def test_generate(self): # Collect the URIs for the final Rasters final_uris = prov_log[f"{self.proc.metadata.name} - result URIs"] self.assertEqual(len(final_uris.split(",")), self.proc.total_expected_files) - for final_uri in final_uris.split(","): - # # Assert the geotiffs are valid - assert_geotiff(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), check_crs="ESRI:54009") - # # Assert the envelopes - assert_raster_bounds_correct(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), self.boundary["envelope_geojson"]) + # Collect the original source fpaths for pixel assertion + source_fpaths = self.proc._fetch_source() + for idx, final_uri in enumerate(final_uris.split(",")): + if STORAGE_BACKEND == "localfs": + assert_raster_output( + self.boundary["envelope_geojson"], + final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), + check_crs="ESRI:54009", + check_is_bigtiff=True, + pixel_check_raster_fpath=source_fpaths[idx] + ) + elif STORAGE_BACKEND == "awss3": + with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs: + assert_raster_output( + self.boundary["envelope_geojson"], + s3_fs=s3_fs, + s3_raster_fpath=final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET), + check_crs="ESRI:54009", + check_is_bigtiff=True, + pixel_check_raster_fpath=source_fpaths[idx] + ) + else: + pass # Check the datapackage thats included in the prov log self.assertIn("datapackage", prov_log.keys()) assert_datapackage_resource(prov_log['datapackage']) diff --git a/tests/dataproc/integration/processors/test_jrc_ghsl_population.py b/tests/dataproc/integration/processors/test_jrc_ghsl_population.py index 5351e25..89455d8 100644 --- a/tests/dataproc/integration/processors/test_jrc_ghsl_population.py +++ b/tests/dataproc/integration/processors/test_jrc_ghsl_population.py @@ -5,24 +5,30 @@ import unittest import shutil -from dataproc.backends import LocalFSStorageBackend -from dataproc import Boundary -from dataproc.processors.core.jrc_ghsl_population.r2022_epoch2020_1km import ( - Processor, - Metadata, -) -from dataproc.helpers import assert_geotiff from tests.helpers import ( load_country_geojson, - assert_raster_bounds_correct, + assert_raster_output, assert_datapackage_resource, + clean_packages ) from tests.dataproc.integration.processors import ( LOCAL_FS_PROCESSING_DATA_TOP_DIR, LOCAL_FS_PACKAGE_DATA_TOP_DIR, DummyTaskExecutor, ) -from config import PACKAGES_HOST_URL +from dataproc import Boundary +from dataproc.processors.core.jrc_ghsl_population.r2022_epoch2020_1km import ( + Processor, + Metadata, +) +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from config import ( + PACKAGES_HOST_URL, + S3_REGION, + STORAGE_BACKEND, + S3_BUCKET, +) class 
TestJRCGHSLPopR2022E20201KMProcessor(unittest.TestCase): @@ -36,16 +42,30 @@ def setUpClass(cls): os.makedirs(cls.test_processing_data_dir, exist_ok=True) gambia_geojson, envelope_geojson = load_country_geojson("gambia") cls.boundary = Boundary("gambia", gambia_geojson, envelope_geojson) - cls.storage_backend = LocalFSStorageBackend(LOCAL_FS_PACKAGE_DATA_TOP_DIR) + cls.storage_backend = init_storage_backend(STORAGE_BACKEND) + # Ensure clean test-env + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) @classmethod def tearDownClass(cls): # Tmp and Source data shutil.rmtree(cls.test_processing_data_dir, ignore_errors=True) # Package data - shutil.rmtree( - os.path.join(cls.storage_backend.top_level_folder_path, "gambia"), - ignore_errors=True, + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], ) def setUp(self): @@ -103,12 +123,13 @@ def test_meta_init(self): def test_generate(self): """E2E generate test - fetch, crop, push""" - try: - shutil.rmtree( - os.path.join(self.storage_backend.top_level_folder_path, "gambia") - ) - except FileNotFoundError: - pass + clean_packages( + STORAGE_BACKEND, + self.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) # Limit the files to be downloaded in the fetcher self.proc.total_expected_files = 1 prov_log = self.proc.generate() @@ -118,16 +139,24 @@ def test_generate(self): ) # Collect the URIs for the final Raster final_uri = prov_log[f"{self.proc.metadata.name} - result URI"] - # # Assert the geotiffs are valid - assert_geotiff( - final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), - check_crs="ESRI:54009", - ) - # # Assert the envelopes - NOTE: this will assert the Molleweide bounds - assert_raster_bounds_correct( - final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), - self.boundary["envelope_geojson"], - ) + if STORAGE_BACKEND == "localfs": + assert_raster_output( + self.boundary["envelope_geojson"], + final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), + check_crs="ESRI:54009", + pixel_check_raster_fpath=self.proc._fetch_source() + ) + elif STORAGE_BACKEND == "awss3": + with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs: + assert_raster_output( + self.boundary["envelope_geojson"], + s3_fs=s3_fs, + s3_raster_fpath=final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET), + check_crs="ESRI:54009", + pixel_check_raster_fpath=self.proc._fetch_source() + ) + else: + pass # Check the datapackage thats included in the prov log self.assertIn("datapackage", prov_log.keys()) assert_datapackage_resource(prov_log["datapackage"]) diff --git a/tests/dataproc/integration/processors/test_natural_earth_raster.py b/tests/dataproc/integration/processors/test_natural_earth_raster.py index 4b6cc23..4729e27 100644 --- a/tests/dataproc/integration/processors/test_natural_earth_raster.py +++ b/tests/dataproc/integration/processors/test_natural_earth_raster.py @@ -5,24 +5,32 @@ import unittest import shutil -from dataproc.backends.storage.localfs import LocalFSStorageBackend + from tests.helpers import ( load_country_geojson, - assert_raster_bounds_correct, + assert_raster_output, assert_datapackage_resource, + clean_packages +) +from tests.dataproc.integration.processors import ( + LOCAL_FS_PROCESSING_DATA_TOP_DIR, + 
LOCAL_FS_PACKAGE_DATA_TOP_DIR, + DummyTaskExecutor, ) from dataproc import Boundary +from dataproc.helpers import sample_geotiff from dataproc.processors.core.natural_earth_raster.version_1 import ( Processor, Metadata, ) -from dataproc.helpers import assert_geotiff -from tests.dataproc.integration.processors import ( - LOCAL_FS_PROCESSING_DATA_TOP_DIR, - LOCAL_FS_PACKAGE_DATA_TOP_DIR, - DummyTaskExecutor, +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from config import ( + PACKAGES_HOST_URL, + S3_REGION, + STORAGE_BACKEND, + S3_BUCKET, ) -from config import PACKAGES_HOST_URL class TestNaturalEarthRasterProcessor(unittest.TestCase): @@ -36,14 +44,18 @@ def setUpClass(cls): os.makedirs(cls.test_processing_data_dir, exist_ok=True) gambia_geojson, envelope_geojson = load_country_geojson("gambia") cls.boundary = Boundary("gambia", gambia_geojson, envelope_geojson) - cls.storage_backend = LocalFSStorageBackend(LOCAL_FS_PACKAGE_DATA_TOP_DIR) - - @classmethod - def tearDownClass(cls): + cls.storage_backend = init_storage_backend(STORAGE_BACKEND) + # Ensure clean test-env # Tmp and Source data shutil.rmtree(cls.test_processing_data_dir) # Package data - shutil.rmtree(os.path.join(cls.storage_backend.top_level_folder_path, "gambia")) + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) def setUp(self): self.task_executor = DummyTaskExecutor() @@ -56,6 +68,19 @@ def setUp(self): self.test_processing_data_dir, ) + @classmethod + def tearDownClass(cls): + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) + def test_processor_init(self): """""" self.assertIsInstance(self.proc, Processor) @@ -100,22 +125,35 @@ def test_fetch_source(self): def test_generate(self): """E2E generate test - fetch, crop, push""" - try: - shutil.rmtree( - os.path.join(self.storage_backend.top_level_folder_path, "gambia") - ) - except FileNotFoundError: - pass + clean_packages( + STORAGE_BACKEND, + self.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) prov_log = self.proc.generate() # Assert the log contains a succesful entries self.assertTrue(prov_log[f"{self.proc.metadata.name} - crop success"]) - self.assertTrue(prov_log[f"{self.proc.metadata.name} - move to storage success"]) + self.assertTrue( + prov_log[f"{self.proc.metadata.name} - move to storage success"] + ) # Collect the URI for the final Raster final_uri = prov_log[f"{self.proc.metadata.name} - result URI"] - # Assert the geotiff is valid - assert_geotiff(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR)) - # Assert the envelope - assert_raster_bounds_correct(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), self.boundary["envelope_geojson"]) + if STORAGE_BACKEND == "localfs": + assert_raster_output( + self.boundary["envelope_geojson"], + final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), + pixel_check_raster_fpath=self.proc._fetch_source() + ) + elif STORAGE_BACKEND == "awss3": + with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs: + assert_raster_output( + self.boundary["envelope_geojson"], + s3_fs=s3_fs, + s3_raster_fpath=final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET), + pixel_check_raster_fpath=self.proc._fetch_source() + ) # 
Check the datapackage thats included in the prov log self.assertIn("datapackage", prov_log.keys()) assert_datapackage_resource(prov_log["datapackage"]) diff --git a/tests/dataproc/integration/processors/test_natural_earth_vector.py b/tests/dataproc/integration/processors/test_natural_earth_vector.py index 5a8f914..b12e14f 100644 --- a/tests/dataproc/integration/processors/test_natural_earth_vector.py +++ b/tests/dataproc/integration/processors/test_natural_earth_vector.py @@ -5,25 +5,27 @@ import unittest import shutil -from dataproc.backends.storage.localfs import LocalFSStorageBackend -from dataproc import Boundary -from dataproc.processors.core.natural_earth_vector.version_1 import ( - Processor, - Metadata, -) -from config import get_db_uri_sync, API_POSTGRES_DB from tests.helpers import ( load_country_geojson, assert_table_in_pg, drop_natural_earth_roads_from_pg, + assert_exists_awss3, assert_datapackage_resource, + clean_packages ) from tests.dataproc.integration.processors import ( LOCAL_FS_PROCESSING_DATA_TOP_DIR, LOCAL_FS_PACKAGE_DATA_TOP_DIR, DummyTaskExecutor ) -from config import PACKAGES_HOST_URL +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from dataproc import Boundary +from dataproc.processors.core.natural_earth_vector.version_1 import ( + Processor, + Metadata, +) +from config import get_db_uri_sync, API_POSTGRES_DB, PACKAGES_HOST_URL, S3_REGION, STORAGE_BACKEND, S3_BUCKET class TestNaturalEarthVectorProcessor(unittest.TestCase): @@ -38,20 +40,32 @@ def setUpClass(cls): cls.test_data_dir = None gambia_geojson, envelope_geojson = load_country_geojson("gambia") cls.boundary = Boundary("gambia", gambia_geojson, envelope_geojson) - cls.storage_backend = LocalFSStorageBackend(LOCAL_FS_PACKAGE_DATA_TOP_DIR) + cls.storage_backend = init_storage_backend(STORAGE_BACKEND) + # Ensure clean test-env + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) @classmethod def tearDownClass(cls): # Cleans processing data - try: - # Tmp and Source data - shutil.rmtree(cls.test_processing_data_dir) - # Package data - shutil.rmtree( - os.path.join(cls.storage_backend.top_level_folder_path, "gambia") - ) - except FileNotFoundError: - print("Skipped removing test data tree for", cls.__name__) + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) try: drop_natural_earth_roads_from_pg() except: @@ -118,20 +132,29 @@ def test_fetch_source(self): def test_generate(self): """E2E generate test - fetch, crop, push""" # Remove the final package artifacts (but keep the test data artifacts if they exist) - try: - shutil.rmtree( - os.path.join(self.storage_backend.top_level_folder_path, "gambia") - ) - except FileNotFoundError: - pass + clean_packages( + STORAGE_BACKEND, + self.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) prov_log = self.proc.generate() # # Assert the log contains a succesful entries self.assertTrue(prov_log[f"{self.proc.metadata.name} - crop completed"]) self.assertTrue(prov_log[f"{self.proc.metadata.name} - move to storage success"]) # # Collect the URI for the final Raster final_uri = prov_log[f"{self.proc.metadata.name} - result URI"] - # Assert the 
file exists - self.assertTrue(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR)) + if STORAGE_BACKEND == "localfs": + self.assertTrue(os.path.exists(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR))) + elif STORAGE_BACKEND == "awss3": + with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs: + assert_exists_awss3( + s3_fs, + final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET), + ) + else: + pass # Check the datapackage thats included in the prov log self.assertIn("datapackage", prov_log.keys()) assert_datapackage_resource(prov_log["datapackage"]) diff --git a/tests/dataproc/integration/processors/test_storm.py b/tests/dataproc/integration/processors/test_storm.py index 5a00318..4d8342d 100644 --- a/tests/dataproc/integration/processors/test_storm.py +++ b/tests/dataproc/integration/processors/test_storm.py @@ -5,24 +5,31 @@ import unittest import shutil -from dataproc.backends import LocalFSStorageBackend -from dataproc import Boundary -from dataproc.processors.core.storm.global_mosaics_version_1 import ( - Processor, - Metadata, -) -from dataproc.helpers import assert_geotiff, download_file from tests.helpers import ( load_country_geojson, - assert_raster_bounds_correct, + assert_raster_output, assert_datapackage_resource, + clean_packages ) from tests.dataproc.integration.processors import ( LOCAL_FS_PROCESSING_DATA_TOP_DIR, LOCAL_FS_PACKAGE_DATA_TOP_DIR, DummyTaskExecutor, ) -from config import PACKAGES_HOST_URL +from dataproc import Boundary +from dataproc.processors.core.storm.global_mosaics_version_1 import ( + Processor, + Metadata, +) +from dataproc.helpers import assert_geotiff, download_file +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from config import ( + PACKAGES_HOST_URL, + S3_REGION, + STORAGE_BACKEND, + S3_BUCKET, +) TEST_TIF_URL = "https://zenodo.org/record/7438145/files/STORM_FIXED_RETURN_PERIODS_CMCC-CM2-VHR4_10000_YR_RP.tif" @@ -38,14 +45,29 @@ def setUpClass(cls): os.makedirs(cls.test_processing_data_dir, exist_ok=True) gambia_geojson, envelope_geojson = load_country_geojson("gambia") cls.boundary = Boundary("gambia", gambia_geojson, envelope_geojson) - cls.storage_backend = LocalFSStorageBackend(LOCAL_FS_PACKAGE_DATA_TOP_DIR) + cls.storage_backend = init_storage_backend(STORAGE_BACKEND) + # Ensure clean test-env + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) @classmethod def tearDownClass(cls): - # Tmp and Source data - shutil.rmtree(cls.test_processing_data_dir) # Package data - shutil.rmtree(os.path.join(cls.storage_backend.top_level_folder_path, "gambia")) + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) def setUp(self): self.task_executor = DummyTaskExecutor() @@ -97,12 +119,13 @@ def test_meta_init(self): def test_generate(self): """E2E generate test - fetch, crop, push""" - try: - shutil.rmtree( - os.path.join(self.storage_backend.top_level_folder_path, "gambia") - ) - except FileNotFoundError: - pass + clean_packages( + STORAGE_BACKEND, + self.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) # Fetch a single file into the source folder then limit expected files _ = download_file( TEST_TIF_URL, @@ -116,16 +139,26 @@ def test_generate(self): # 
Collect the URIs for the final Raster final_uris = prov_log[f"{self.proc.metadata.name} - result URIs"] self.assertEqual(len(final_uris.split(",")), self.proc.total_expected_files) - for final_uri in final_uris.split(","): + # Collect the original source fpaths for pixel assertion + source_fpaths = self.proc._fetch_source() + for idx, final_uri in enumerate(final_uris.split(",")): # # Assert the geotiffs are valid - assert_geotiff( - final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR) - ) - # # Assert the envelopes - assert_raster_bounds_correct( - final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), - self.boundary["envelope_geojson"], - ) + if STORAGE_BACKEND == "localfs": + assert_raster_output( + self.boundary["envelope_geojson"], + final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), + pixel_check_raster_fpath=source_fpaths[idx] + ) + elif STORAGE_BACKEND == "awss3": + with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs: + assert_raster_output( + self.boundary["envelope_geojson"], + s3_fs=s3_fs, + s3_raster_fpath=final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET), + pixel_check_raster_fpath=source_fpaths[idx] + ) + else: + pass # Check the datapackage thats included in the prov log self.assertIn("datapackage", prov_log.keys()) assert_datapackage_resource(prov_log["datapackage"]) diff --git a/tests/dataproc/integration/processors/test_wri_aqueduct.py b/tests/dataproc/integration/processors/test_wri_aqueduct.py index b17e5fd..cee7bc2 100644 --- a/tests/dataproc/integration/processors/test_wri_aqueduct.py +++ b/tests/dataproc/integration/processors/test_wri_aqueduct.py @@ -5,25 +5,33 @@ import unittest import shutil -from dataproc.backends import LocalFSStorageBackend -from dataproc import Boundary -from dataproc.processors.core.wri_aqueduct.version_2 import ( - Processor, - Metadata, -) -from dataproc.helpers import assert_geotiff from tests.helpers import ( load_country_geojson, assert_raster_bounds_correct, setup_test_data_paths, - assert_datapackage_resource + assert_raster_output, + assert_datapackage_resource, + clean_packages ) from tests.dataproc.integration.processors import ( LOCAL_FS_PROCESSING_DATA_TOP_DIR, LOCAL_FS_PACKAGE_DATA_TOP_DIR, DummyTaskExecutor ) -from config import PACKAGES_HOST_URL +from dataproc import Boundary +from dataproc.helpers import tiffs_in_folder +from dataproc.processors.core.wri_aqueduct.version_2 import ( + Processor, + Metadata, +) +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from config import ( + PACKAGES_HOST_URL, + S3_REGION, + STORAGE_BACKEND, + S3_BUCKET, +) class TestWRIAqueductProcessor(unittest.TestCase): @@ -37,14 +45,31 @@ def setUpClass(cls): os.makedirs(cls.test_processing_data_dir, exist_ok=True) gambia_geojson, envelope_geojson = load_country_geojson("gambia") cls.boundary = Boundary("gambia", gambia_geojson, envelope_geojson) - cls.storage_backend = LocalFSStorageBackend(LOCAL_FS_PACKAGE_DATA_TOP_DIR) + cls.storage_backend = init_storage_backend(STORAGE_BACKEND) + # Ensure clean test-env + # Tmp and Source data + shutil.rmtree(cls.test_processing_data_dir) + # Package data + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) @classmethod def tearDownClass(cls): # Tmp and Source data shutil.rmtree(cls.test_processing_data_dir) # Package data - shutil.rmtree(os.path.join(cls.storage_backend.top_level_folder_path, 
"gambia")) + clean_packages( + STORAGE_BACKEND, + cls.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) def setUp(self): self.task_executor = DummyTaskExecutor() @@ -96,12 +121,13 @@ def test_meta_init(self): def test_generate(self): """E2E generate test - fetch, crop, push""" - try: - shutil.rmtree( - os.path.join(self.storage_backend.top_level_folder_path, "gambia") - ) - except FileNotFoundError: - pass + clean_packages( + STORAGE_BACKEND, + self.storage_backend, + s3_bucket=S3_BUCKET, + s3_region=S3_REGION, + packages=["gambia"], + ) # Limit the files to be downloaded in the fetcher self.proc.total_expected_files = 1 prov_log = self.proc.generate() @@ -110,11 +136,25 @@ def test_generate(self): # Collect the URIs for the final Raster final_uris = prov_log[f"{self.proc.metadata.name} - result URIs"] self.assertEqual(len(final_uris.split(",")), self.proc.total_expected_files) - for final_uri in final_uris.split(","): - # # Assert the geotiffs are valid - assert_geotiff(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR)) - # # Assert the envelopes - assert_raster_bounds_correct(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), self.boundary["envelope_geojson"]) + # Collect the original source fpaths for pixel assertion + source_tiffs = tiffs_in_folder(self.proc.source_folder) + for idx, final_uri in enumerate(final_uris.split(",")): + if STORAGE_BACKEND == "localfs": + assert_raster_output( + self.boundary["envelope_geojson"], + final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR), + pixel_check_raster_fpath=os.path.join(self.proc.source_folder, source_tiffs[idx]) + ) + elif STORAGE_BACKEND == "awss3": + with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs: + assert_raster_output( + self.boundary["envelope_geojson"], + s3_fs=s3_fs, + s3_raster_fpath=final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET), + pixel_check_raster_fpath=os.path.join(self.proc.source_folder, source_tiffs[idx]) + ) + else: + pass # Check the datapackage thats included in the prov log self.assertIn("datapackage", prov_log.keys()) assert_datapackage_resource(prov_log['datapackage']) diff --git a/tests/dataproc/integration/processors/test_wri_powerplants.py b/tests/dataproc/integration/processors/test_wri_powerplants.py index a0e9511..9eb7e19 100644 --- a/tests/dataproc/integration/processors/test_wri_powerplants.py +++ b/tests/dataproc/integration/processors/test_wri_powerplants.py @@ -5,23 +5,32 @@ import unittest import shutil -from dataproc.backends import LocalFSStorageBackend -from dataproc import Boundary -from dataproc.processors.core.wri_powerplants.version_130 import ( - Processor, - Metadata, -) -from dataproc.helpers import assert_vector_file from tests.helpers import ( + assert_exists_awss3, load_country_geojson, assert_datapackage_resource, + clean_packages, + assert_vector_output ) from tests.dataproc.integration.processors import ( LOCAL_FS_PROCESSING_DATA_TOP_DIR, LOCAL_FS_PACKAGE_DATA_TOP_DIR, DummyTaskExecutor ) -from config import PACKAGES_HOST_URL +from dataproc import Boundary +from dataproc.processors.core.wri_powerplants.version_130 import ( + Processor, + Metadata, +) +from dataproc.backends.storage import init_storage_backend +from dataproc.backends.storage.awss3 import S3Manager +from config import ( + PACKAGES_HOST_URL, + S3_REGION, + STORAGE_BACKEND, + S3_BUCKET, + TEST_GRI_OSM +) class TestWRIPowerplantsProcessor(unittest.TestCase): @@ -35,14 +44,31 @@ def setUpClass(cls): 
diff --git a/tests/dataproc/integration/processors/test_wri_powerplants.py b/tests/dataproc/integration/processors/test_wri_powerplants.py
index a0e9511..9eb7e19 100644
--- a/tests/dataproc/integration/processors/test_wri_powerplants.py
+++ b/tests/dataproc/integration/processors/test_wri_powerplants.py
@@ -5,23 +5,32 @@
 import unittest
 import shutil

-from dataproc.backends import LocalFSStorageBackend
-from dataproc import Boundary
-from dataproc.processors.core.wri_powerplants.version_130 import (
-    Processor,
-    Metadata,
-)
-from dataproc.helpers import assert_vector_file
 from tests.helpers import (
+    assert_exists_awss3,
     load_country_geojson,
     assert_datapackage_resource,
+    clean_packages,
+    assert_vector_output
 )
 from tests.dataproc.integration.processors import (
     LOCAL_FS_PROCESSING_DATA_TOP_DIR,
     LOCAL_FS_PACKAGE_DATA_TOP_DIR,
     DummyTaskExecutor
 )
-from config import PACKAGES_HOST_URL
+from dataproc import Boundary
+from dataproc.processors.core.wri_powerplants.version_130 import (
+    Processor,
+    Metadata,
+)
+from dataproc.backends.storage import init_storage_backend
+from dataproc.backends.storage.awss3 import S3Manager
+from config import (
+    PACKAGES_HOST_URL,
+    S3_REGION,
+    STORAGE_BACKEND,
+    S3_BUCKET,
+    TEST_GRI_OSM
+)

 class TestWRIPowerplantsProcessor(unittest.TestCase):
@@ -35,14 +44,31 @@ def setUpClass(cls):
         os.makedirs(cls.test_processing_data_dir, exist_ok=True)
         gambia_geojson, envelope_geojson = load_country_geojson("gambia")
         cls.boundary = Boundary("gambia", gambia_geojson, envelope_geojson)
-        cls.storage_backend = LocalFSStorageBackend(LOCAL_FS_PACKAGE_DATA_TOP_DIR)
+        cls.storage_backend = init_storage_backend(STORAGE_BACKEND)
+        # Ensure clean test-env
+        # Tmp and Source data
+        shutil.rmtree(cls.test_processing_data_dir, ignore_errors=True)
+        # Package data
+        clean_packages(
+            STORAGE_BACKEND,
+            cls.storage_backend,
+            s3_bucket=S3_BUCKET,
+            s3_region=S3_REGION,
+            packages=["gambia"],
+        )

     @classmethod
     def tearDownClass(cls):
         # Tmp and Source data
-        shutil.rmtree(cls.test_processing_data_dir)
+        shutil.rmtree(cls.test_processing_data_dir, ignore_errors=True)
         # Package data
-        shutil.rmtree(os.path.join(cls.storage_backend.top_level_folder_path, "gambia"))
+        clean_packages(
+            STORAGE_BACKEND,
+            cls.storage_backend,
+            s3_bucket=S3_BUCKET,
+            s3_region=S3_REGION,
+            packages=["gambia"],
+        )

     def setUp(self):
         self.task_executor = DummyTaskExecutor()
@@ -94,12 +120,13 @@ def test_meta_init(self):

     def test_generate(self):
         """E2E generate test - fetch, crop, push"""
-        try:
-            shutil.rmtree(
-                os.path.join(self.storage_backend.top_level_folder_path, "gambia")
-            )
-        except FileNotFoundError:
-            pass
+        clean_packages(
+            STORAGE_BACKEND,
+            self.storage_backend,
+            s3_bucket=S3_BUCKET,
+            s3_region=S3_REGION,
+            packages=["gambia"],
+        )
         # Limit the files to be downloaded in the fetcher
         self.proc.total_expected_files = 1
         prov_log = self.proc.generate()
@@ -112,14 +139,23 @@ def test_generate(self):
         #
         # Collect the URI for the final Raster
         final_uri = prov_log[f"{self.proc.metadata.name} - result URI"]
         # Assert the file exists
-        self.assertTrue(
-            final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR)
-        )
-        assert_vector_file(
-            final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR),
-            expected_shape=(2, 37),
-            expected_crs="EPSG:4326",
-        )
+        if STORAGE_BACKEND == "localfs":
+            self.assertTrue(os.path.exists(final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR)))
+            assert_vector_output(
+                expected_shape=(2, 37),
+                expected_crs="EPSG:4326",
+                local_vector_fpath=final_uri.replace(PACKAGES_HOST_URL, LOCAL_FS_PACKAGE_DATA_TOP_DIR),
+            )
+        elif STORAGE_BACKEND == "awss3":
+            with S3Manager(*self.storage_backend._parse_env(), region=S3_REGION) as s3_fs:
+                assert_vector_output(
+                    expected_shape=(2, 37),
+                    expected_crs="EPSG:4326",
+                    s3_fs=s3_fs,
+                    s3_vector_fpath=final_uri.replace(PACKAGES_HOST_URL, S3_BUCKET)
+                )
+        else:
+            pass
         # Check the datapackage thats included in the prov log
         self.assertIn("datapackage", prov_log.keys())
         assert_datapackage_resource(prov_log["datapackage"])
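In assert_vector_output (added to tests/helpers.py below) expected_shape is passed straight through to assert_vector_file, so (2, 37) is presumably the (rows, columns) shape of the cropped GeoDataFrame: two power plants within the Gambia boundary, each with 37 attribute columns. A hedged local-backend sketch; the file path is illustrative, not taken from the diff:

    assert_vector_output(
        expected_shape=(2, 37),      # 2 features x 37 columns expected after the crop
        expected_crs="EPSG:4326",
        local_vector_fpath="/data/packages/gambia/example_powerplants.gpkg",  # illustrative path
    )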
diff --git a/tests/dataproc/unit/processors/test_env.py b/tests/dataproc/unit/processors/test_env.py
index 249f8b9..c4c5ddd 100644
--- a/tests/dataproc/unit/processors/test_env.py
+++ b/tests/dataproc/unit/processors/test_env.py
@@ -1,7 +1,9 @@
 """
 Test Processor Python environment
 """
+import os
 import unittest
+from subprocess import check_call

 class TestProcessorEnv(unittest.TestCase):
     """"""
@@ -14,4 +16,12 @@ def test_imports(self):
         import celery
         import shapely
         import pyproj
-        import rasterio
\ No newline at end of file
+        import rasterio
+        import pyarrow as pa
+        import geopandas as gp
+        import psycopg2
+
+    def test_commands(self):
+        """"""
+        self.assertEqual(check_call(['gdalwarp', '--version']), 0)
+        self.assertEqual(check_call(['openssl', 'sha1', f'{os.path.abspath(__file__)}']), 0)
\ No newline at end of file
diff --git a/tests/dataproc/unit/test_localfs_backend.py b/tests/dataproc/unit/test_localfs_backend.py
deleted file mode 100644
index 16df41e..0000000
--- a/tests/dataproc/unit/test_localfs_backend.py
+++ /dev/null
@@ -1,50 +0,0 @@
-"""
-Unit tests for Dataproc classes
-"""
-import os
-import unittest
-
-from dataproc.backends.storage.localfs import LocalFSStorageBackend
-from tests.helpers import create_tree, remove_tree
-
-LOCAL_FS_DATA_TOP_DIR = os.path.join(
-    os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
-    "data",
-    "packages",
-)
-
-
-class TestLocalFSBackend(unittest.TestCase):
-    """"""
-
-    def setUp(self):
-        self.backend = LocalFSStorageBackend(LOCAL_FS_DATA_TOP_DIR)
-
-
-    def expected_fs_structure(self):
-        """
-        The expected initial FS structure
-        """
-        return {
-            "gambia": {
-                "aqueduct": ["0.1"],
-                "biodiversity": ["version_1"],
-                "osm_roads": ["20221201"],
-            },
-            "zambia": {"osm_roads": ["20230401"]},
-        }
-
-    def test_init(self):
-        """Initialisation of the backend and methods available"""
-        self.assertIsInstance(self.backend, LocalFSStorageBackend)
-        self.assertEqual(self.backend.top_level_folder_path, LOCAL_FS_DATA_TOP_DIR)
-        self.assertTrue(
-            hasattr(self.backend, "tree") and callable(getattr(self.backend, "tree"))
-        )
-
-    def test_tree(self):
-        """Test Generation of the package / dataset / version structure"""
-        create_tree(LOCAL_FS_DATA_TOP_DIR)
-        tree = self.backend.tree()
-        self.assertDictEqual(tree, self.expected_fs_structure())
-        remove_tree(LOCAL_FS_DATA_TOP_DIR)
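The deleted unit test covered the localfs backend only; the helpers added to tests/helpers.py below are meant to exercise either backend. A sketch of how the same tree check might now be written, assuming the S3 backend exposes the same tree() interface the deleted test relied on (hypothetical test code, not part of this diff):

    backend = init_storage_backend(STORAGE_BACKEND)   # localfs or awss3, per config
    create_tree(backend.top_level_folder_path)        # or create_tree_awss3(s3_fs, S3_BUCKET) under an S3Manager context
    assert "gambia" in backend.tree()                 # same structural assertion as the deleted test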
diff --git a/tests/helpers.py b/tests/helpers.py
index 9b4f25f..e0c42bb 100644
--- a/tests/helpers.py
+++ b/tests/helpers.py
@@ -7,15 +7,21 @@
 import json
 from typing import Any, List, Tuple
 import shutil
+from time import sleep, time

 import sqlalchemy as sa
 import rasterio
 import shapely
 from shapely.ops import transform
 import pyproj
+from pyarrow import fs
+from pyarrow.fs import S3FileSystem, LocalFileSystem
+import numpy as np

 from config import get_db_uri_sync, API_POSTGRES_DB, INTEGRATION_TEST_ENDPOINT
 from api import db
+from dataproc.helpers import assert_geotiff, assert_vector_file, sample_geotiff, sample_geotiff_coords
+from dataproc.backends.storage.awss3 import S3Manager, AWSS3StorageBackend

 current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
 parent_dir = os.path.dirname(os.path.dirname(current_dir))
@@ -28,11 +34,13 @@
 engine = sa.create_engine(db_uri, pool_pre_ping=True)

-def wipe_db():
+def wipe_db(setup_tables=True):
     """Wipe all SQLA Tables in the DB"""
     db_uri = get_db_uri_sync(API_POSTGRES_DB)
     # Init DB and Load via SA
     engine = sa.create_engine(db_uri, pool_pre_ping=True)
+    if setup_tables:
+        db.Base.metadata.create_all(engine)
     for tbl in reversed(db.Base.metadata.sorted_tables):
         engine.execute(tbl.delete())
@@ -113,10 +121,10 @@ def create_tree(
     top_level_path: str,
     packages: list = ["gambia", "zambia"],
     datasets: list = ["aqueduct", "biodiversity", "osm_roads"],
-    wipe_existing: bool = True
+    wipe_existing: bool = True,
 ):
     """
-    Create a fake tree so we can check reading packages
+    Create a fake tree in local FS so we can check reading packages
     """
     # Generate the datapackage.jsons
     for package in packages:
@@ -177,12 +185,209 @@ def remove_tree(top_level_path: str, packages=["gambia", "zambia"]):
     """
-    Cleanup the test tree
+    Cleanup the test tree from local FS
     """
     for package in packages:
         shutil.rmtree(os.path.join(top_level_path, package), ignore_errors=True)

+def create_tree_awss3(
+    s3_fs: S3FileSystem,
+    bucket: str,
+    packages: list = ["gambia", "zambia"],
+    datasets: list = ["aqueduct", "biodiversity", "osm_roads"],
+    wipe_existing: bool = True,
+):
+    """
+    Create a fake tree in the AWS S3 bucket so we can check reading packages
+    """
+    # Generate the datapackage.jsons
+    for package in packages:
+        if wipe_existing is True:
+            try:
+                s3_fs.delete_dir(os.path.join(bucket, package))
+            except FileNotFoundError:
+                pass
+        s3_fs.create_dir(os.path.join(bucket, package))
+        dp = gen_datapackage(package, datasets)
+        dp_fpath = os.path.join(bucket, package, "datapackage.json")
+        with s3_fs.open_output_stream(dp_fpath) as stream:
+            stream.write(json.dumps(dp).encode())
+
+    if "gambia" in packages:
+        if "noexist" in datasets:
+            # An invalid processor or dataset was placed in the tree
+            s3_fs.create_dir(os.path.join(bucket, "gambia", "datasets", "noexist"))
+        if "aqueduct" in datasets:
+            s3_fs.create_dir(
+                os.path.join(bucket, "gambia", "datasets", "aqueduct", "0.1")
+            )
+        if "biodiversity" in datasets:
+            s3_fs.create_dir(
+                os.path.join(bucket, "gambia", "datasets", "biodiversity", "version_1")
+            )
+        if "osm_roads" in datasets:
+            s3_fs.create_dir(
+                os.path.join(bucket, "gambia", "datasets", "osm_roads", "20221201")
+            )
+        if "natural_earth_raster" in datasets:
+            s3_fs.create_dir(
+                os.path.join(
+                    bucket,
+                    "gambia",
+                    "datasets",
+                    "natural_earth_raster",
+                    "version_1",
+                )
+            )
+    if "zambia" in packages:
+        if "osm_roads" in datasets:
+            s3_fs.create_dir(
+                os.path.join(bucket, "zambia", "datasets", "osm_roads", "20230401")
+            )
+
+
+def remove_tree_awss3(
+    s3_fs: S3FileSystem, bucket: str, packages: list = ["gambia", "zambia"]
+):
+    """Remove a tree from the AWS S3 backend"""
+    for package in packages:
+        s3_fs.delete_dir(os.path.join(bucket, package))
+
+
+def clean_packages(
+    backend_type: str,
+    storage_backend: Any,
+    s3_bucket: str = None,
+    s3_region="eu-west-2",
+    packages=["gambia"]
+):
+    """Remove packages used in a test"""
+    max_wait = 60
+    start = time()
+    try:
+        if backend_type == "awss3":
+            with S3Manager(*storage_backend._parse_env(), region=s3_region) as s3_fs:
+                remove_tree_awss3(s3_fs, s3_bucket, packages=packages)
+            while True:
+                existing_packages = storage_backend.packages()
+                if any([True for i in existing_packages if i in packages]):
+                    sleep(0.5)
+                else:
+                    break
+                if (time()-start) > max_wait:
+                    raise Exception("timed out waiting for packages to be deleted")
+        elif backend_type == "localfs":
+            remove_tree(storage_backend.top_level_folder_path, packages=packages)
+        else:
+            print("unknown backend type:", backend_type)
+    except FileNotFoundError:
+        pass
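clean_packages wraps the two removal paths above and, for S3, polls storage_backend.packages() until the prefixes are gone (presumably to allow for delayed/eventual deletion on the object store). A usage sketch mirroring the calls in the processor tests earlier in this diff:

    clean_packages(
        STORAGE_BACKEND,          # "localfs" or "awss3"
        storage_backend,          # as returned by init_storage_backend(STORAGE_BACKEND)
        s3_bucket=S3_BUCKET,
        s3_region=S3_REGION,
        packages=["gambia"],
    )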
+
+def assert_vector_output(
+    expected_shape: tuple,
+    expected_crs: str,
+    local_vector_fpath: str=None,
+    s3_fs: S3FileSystem = None,
+    s3_vector_fpath: str = None,
+    tmp_folder: str = None,
+):
+    """
+    Wrapper for assert_vector_file with support for fetching from S3
+    """
+    if s3_fs and s3_vector_fpath:
+        if not tmp_folder:
+            local_vector_fpath = os.path.join(
+                os.path.dirname(os.path.abspath(__file__)),
+                "data",
+                "processing",
+                os.path.basename(s3_vector_fpath),
+            )
+        else:
+            local_vector_fpath = os.path.join(
+                tmp_folder, os.path.basename(s3_vector_fpath)
+            )
+        fs.copy_files(
+            s3_vector_fpath,
+            local_vector_fpath,
+            source_filesystem=s3_fs,
+            destination_filesystem=fs.LocalFileSystem(),
+        )
+    assert_vector_file(
+        local_vector_fpath,
+        expected_shape,
+        expected_crs=expected_crs,
+    )
+
+
+def assert_raster_output(
+    envelope: dict,
+    localfs_raster_fpath: str = None,
+    s3_fs: S3FileSystem = None,
+    s3_raster_fpath: str = None,
+    check_crs: str = "EPSG:4326",
+    check_compression=True,
+    tolerence: float = 0.1,
+    tmp_folder: str = None,
+    check_is_bigtiff: bool=False,
+    pixel_check_raster_fpath: str = None,
+    pixel_check_num_samples: int = 100
+):
+    """
+    Wrapper for assert_geotiff and assert_raster_bounds_correct
+    which asserts either local or S3 source results.
+    If localfs_raster_fpath is provided then a local source is assumed.
+
+    If s3_fs and s3_raster_fpath are provided then the requested source
+    will be pulled locally before assertions.
+
+    ::kwarg pixel_check_raster_fpath str
+        If this kwarg is set then pixels will be sampled from the raster at localfs_raster_fpath
+        and compared to pixels in the raster at pixel_check_raster_fpath
+    """
+    try:
+        if s3_fs and s3_raster_fpath:
+            if not tmp_folder:
+                localfs_raster_fpath = os.path.join(
+                    os.path.dirname(os.path.abspath(__file__)),
+                    "data",
+                    "processing",
+                    os.path.basename(s3_raster_fpath),
+                )
+            else:
+                localfs_raster_fpath = os.path.join(
+                    tmp_folder, os.path.basename(s3_raster_fpath)
+                )
+            fs.copy_files(
+                s3_raster_fpath,
+                localfs_raster_fpath,
+                source_filesystem=s3_fs,
+                destination_filesystem=fs.LocalFileSystem(),
+            )
+        if pixel_check_raster_fpath is not None:
+            # Collect sample and coords from the first raster, then sample second raster
+            src_coords = sample_geotiff_coords(localfs_raster_fpath, pixel_check_num_samples)
+            _, expected_samples = sample_geotiff(pixel_check_raster_fpath, coords=src_coords)
+        else:
+            src_coords = None
+            expected_samples = None
+        assert_geotiff(
+            localfs_raster_fpath,
+            check_crs=check_crs,
+            check_compression=check_compression,
+            check_is_bigtiff=check_is_bigtiff,
+            check_pixel_coords=src_coords,
+            check_pixel_expected_samples=expected_samples
+        )
+        assert_raster_bounds_correct(
+            localfs_raster_fpath, envelope, tolerence=tolerence
+        )
+    finally:
+        # Clean local S3 artifacts
+        if s3_fs and s3_raster_fpath:
+            if os.path.exists(localfs_raster_fpath):
+                os.remove(localfs_raster_fpath)
+
+
 def assert_raster_bounds_correct(
     raster_fpath: str, envelope: dict, tolerence: float = 0.1
 ):
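A usage sketch for assert_raster_output covering both branches, mirroring the processor tests above (paths are illustrative, and the positional S3Manager credentials are an assumption about what storage_backend._parse_env() returns):

    # local backend: the cropped GeoTIFF is already on disk
    assert_raster_output(
        envelope_geojson,
        localfs_raster_fpath="/data/packages/gambia/example_crop.tif",       # illustrative
        pixel_check_raster_fpath="/data/processing/example_source.tif",      # illustrative
    )
    # S3 backend: the object is copied down via pyarrow.fs before the same checks run, then removed
    with S3Manager(access_key, secret_key, region=S3_REGION) as s3_fs:        # credentials illustrative
        assert_raster_output(
            envelope_geojson,
            s3_fs=s3_fs,
            s3_raster_fpath=f"{S3_BUCKET}/gambia/example_crop.tif",           # illustrative
        )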
@@ -222,12 +427,20 @@ def assert_raster_bounds_correct(
         ), f"bounds {src.bounds.bottom} did not match expected {min(y_coords)} within tolerence {tolerence}"


-def assert_package(
-    top_level_fpath: str, boundary_name: str
-):
+def assert_exists_awss3(s3_fs: S3FileSystem, s3_raster_fpath: str):
+    """
+    Check if a given file exists on the s3 filesystem
+    """
+    chk = s3_fs.get_file_info(s3_raster_fpath)
+    assert (
+        chk.type != fs.FileType.NotFound
+    ), f"file was not found on S3 {s3_raster_fpath}"
+
+
+def assert_package(top_level_fpath: str, boundary_name: str):
     """Assert integrity of a package and datasets contained within
-        This does not assert the integrity of actualy data files (raster/vector);
-        just the folder structure
+    This does not assert the integrity of actual data files (raster/vector);
+    just the folder structure
     """
     required_top_level_docs = [
         "index.html",
         "license.html",
         "version.html",
         "provenance.json",
         "datapackage.json",
     ]
@@ -256,6 +469,34 @@ def assert_package(
             os.path.join(top_level_fpath, boundary_name, doc)
         ), f"top-level {doc} missing"

+def assert_package_awss3(awss3_backend: AWSS3StorageBackend, boundary_name: str, expected_processor_versions: List=[]):
+    """Assert integrity of a package and datasets contained within (on S3)
+    This does not assert the integrity of actual data files (raster/vector);
+    just the folder structure
+    """
+    required_top_level_docs = [
+        "index.html",
+        "license.html",
+        "version.html",
+        "provenance.json",
+        "datapackage.json",
+    ]
+    packages = awss3_backend._list_directories(awss3_backend._build_absolute_path(""))
+    assert (
+        boundary_name in packages
+    ), f"{boundary_name} missing in package S3 root: {packages}"
+
+    # Ensure the top-level index and other docs exist
+    for doc in required_top_level_docs:
+        assert awss3_backend.boundary_file_exists(
+            boundary_name, doc
+        ), f"package {boundary_name} is missing a top-level file: {doc}"
+
+    # Check we have folders for the expected processor versions
+    for proc_version in expected_processor_versions:
+        proc, version = proc_version.split('.')
+        s3_versions = awss3_backend.dataset_versions(boundary_name, proc)
+        assert version in s3_versions, f"{version} not found in dataset {s3_versions} for processor {proc}"

 def assert_table_in_pg(db_uri: str, tablename: str):
     """Check a given table exists in PG"""
@@ -284,10 +525,11 @@ def assert_datapackage_resource(dp_resource: dict):
     assert "name" in dp_resource.keys(), "datapackage missing name"
     assert isinstance(dp_resource["path"], list), "datapackage path not a list"
     assert isinstance(dp_resource["hashes"], list), "datapackage hashes not a list"
-    assert isinstance(dp_resource["bytes"], int), f"datapackage bytes {dp_resource['bytes']} not a int was {type(dp_resource['bytes'])}"
-    assert (
-        len(dp_resource["path"])
-        == len(dp_resource["hashes"])
+    assert isinstance(
+        dp_resource["bytes"], int
+    ), f"datapackage bytes {dp_resource['bytes']} is not an int; was {type(dp_resource['bytes'])}"
+    assert len(dp_resource["path"]) == len(
+        dp_resource["hashes"]
     ), f"datapackage path and hashes must be the same length {len(dp_resource['path'])}, {len(dp_resource['hashes'])}"
     assert isinstance(dp_resource["license"], dict), "datapackage license must be dict"
     assert (