Commit 0aca320

new method for osm crop (#8) (#9)

nicholac authored Mar 1, 2023
1 parent 2a5ddbe commit 0aca320
Showing 7 changed files with 251 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_dev.yaml
@@ -8,7 +8,7 @@ on:
- develop

env:
-  VERSION: 0.2.3
+  VERSION: 0.2.4
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}

2 changes: 1 addition & 1 deletion .github/workflows/build_prod_and_release.yaml
@@ -8,7 +8,7 @@ on:
- master

env:
-  VERSION: 0.2.3
+  VERSION: 0.2.4
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}

151 changes: 145 additions & 6 deletions dataproc/helpers.py
@@ -2,14 +2,18 @@
Helper methods / classes
"""
import inspect
-from typing import List
+from typing import Generator, List
from types import ModuleType
import os
import requests
import zipfile
import json
from subprocess import check_output, CalledProcessError, check_call
import shutil
+from collections import OrderedDict
+from time import time
+import csv
+import warnings

from dataproc.processors.internal.base import BaseProcessorABC, BaseMetadataABC
from dataproc.backends import StorageBackend
@@ -530,6 +534,146 @@ def gpkg_layer_name(pg_table_name: str, boundary: Boundary) -> str:
return f"{pg_table_name}_{boundary['name']}"


def copy_from_pg_table(pg_uri: str, sql: str, output_csv_fpath: str) -> int:
    """
    Execute a COPY (sql) TO STDOUT for the given pg uri and SQL statement,
    writing the result as CSV to output_csv_fpath
    ::returns int number of data rows written (excluding the CSV header)
    """
    import psycopg2
    sql = f"""COPY ({sql}) TO STDOUT WITH CSV HEADER"""
    with psycopg2.connect(dsn=pg_uri) as conn:
        with open(output_csv_fpath, 'w') as fptr:
            with conn.cursor() as cur:
                cur.copy_expert(sql, fptr)
    with open(output_csv_fpath, 'rb') as fptr:
        total_lines = sum(1 for _ in fptr) - 1  # Exclude the header line
    return total_lines
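A minimal usage sketch (hypothetical DSN, query and path — none of these appear in this commit):

result_rows = copy_from_pg_table(
    "postgresql://user:pass@localhost:5432/osm",  # hypothetical connection URI
    "SELECT geom, properties FROM features LIMIT 100",  # hypothetical query
    "/tmp/features_sample.csv",
)
print(f"COPY produced {result_rows} data rows")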


def crop_osm_to_geopkg(
    boundary: Boundary,
    pg_uri: str,
    pg_table: str,
    output_fpath: str,
    geometry_column: str = 'geom',
    extract_type: str = "clip",
    limit: int = None,
    batch_size: int = 1000,
) -> Generator:
    """
    Crop a PostGIS table to the given boundary, writing the result to a GeoPackage
    via an intermediate CSV generated with COPY
    GEOPKG supports only a single geometry column per table: https://github.com/opengeospatial/geopackage/issues/77
    __NOTE__: We assume the input and output CRS is EPSG:4326
    __NOTE__: PG doesn't permit EXCEPT syntax with field selection,
        so all fields will be output (minus the original geometries if using "clip")
    ::kwarg extract_type str
        Either "intersect" - keep the entire intersecting feature in the output,
        or "clip" - include only the clipped geometry in the output
    ::returns Generator of progress tuples:
        (csv_line_count, current_idx, lines_success, lines_skipped, lines_failed)
    """
    import fiona
    from fiona.crs import from_epsg as crs_from_epsg
    from shapely import from_wkb, to_geojson

    geojson = json.dumps(boundary["geojson"])
    if extract_type == "intersect":
        stmt = f"SELECT {geometry_column}, properties FROM {pg_table} WHERE ST_Intersects(ST_GeomFromGeoJSON('{geojson}'), {geometry_column})"
    else:
        # Clip - remembering the geometry inside properties is the entire geometry, not the clipped one
        stmt = f"""
            WITH clip_geom AS (
                SELECT st_geomfromgeojson('{geojson}') AS geometry
            )
            SELECT (ST_Dump(ST_Intersection(clip_geom.geometry, {pg_table}.{geometry_column}))).geom AS {geometry_column}, properties
            FROM {pg_table}, clip_geom
            WHERE ST_Intersects({pg_table}.{geometry_column}, clip_geom.geometry)
        """
    if limit is not None and int(limit):
        stmt = f'{stmt} LIMIT {limit}'
    # Initialise progress counters ahead of the try block so the finally clause can always yield
    csv_line_count = 0
    lines_success = 0
    lines_skipped = 0
    lines_failed = 0
    idx = -1
    tmp_csv_fpath = os.path.join(os.path.dirname(output_fpath), f'{time()}_tmp.csv')
    try:
        # Generate CSV using COPY command
        csv_line_count = copy_from_pg_table(pg_uri, stmt, tmp_csv_fpath)
        # Load CSV to geopkg
        crs = crs_from_epsg(4326)
        schema = {
            'geometry': 'LineString',
            'properties': OrderedDict({
                'asset_id': 'float:16',
                'osm_way_id': 'str',
                'asset_type': 'str',
                'paved': 'bool',
                'material': 'str',
                'lanes': 'int',
                '_asset_type': 'str',
                'rehab_cost_USD_per_km': 'float:16',
                'sector': 'str',
                'subsector': 'str',
                'tag_bridge': 'str',
                'bridge': 'bool',
                'wkt': 'str'
            })
        }
        # Template used to null-fill properties missing from a given row
        template = {_k: None for _k in schema['properties']}
        with fiona.open(output_fpath, 'w', driver='GPKG', crs=crs, schema=schema) as output:
            with open(tmp_csv_fpath, newline='') as csvfile:
                reader = csv.reader(csvfile, delimiter=',', quotechar='"')
                next(reader, None)  # Skip header
                batch = []
                for idx, row in enumerate(reader):
                    try:
                        data = json.loads(row[1])
                        outrow = {}
                        geom = from_wkb(row[0])
                        if geom.geom_type != 'LineString':
                            lines_skipped += 1
                            continue
                        outrow['geometry'] = json.loads(to_geojson(geom))
                        # Null missing fields
                        outrow['properties'] = OrderedDict(template | data)
                        batch.append(outrow)
                        if len(batch) >= batch_size:
                            output.writerecords(batch)
                            output.flush()
                            lines_success += len(batch)
                            batch = []
                            yield csv_line_count, idx + 1, lines_success, lines_skipped, lines_failed
                    except Exception as err:
                        warnings.warn(f'failed to load rows due to: {err}')
                        # Attempt to load everything in the batch apart from the failed row
                        for outrow in batch:
                            try:
                                output.write(outrow)
                                output.flush()
                                lines_success += 1
                            except Exception as rowerr:
                                warnings.warn(f"failed to load row: {outrow} due to {rowerr}")
                                lines_failed += 1
                        batch = []
                # Final batch leftover
                if len(batch) > 0:
                    output.writerecords(batch)
                    lines_success += len(batch)
                    yield csv_line_count, idx + 1, lines_success, lines_skipped, lines_failed
    finally:
        # Cleanup
        if os.path.exists(tmp_csv_fpath):
            os.remove(tmp_csv_fpath)
        yield csv_line_count, idx + 1, lines_success, lines_skipped, lines_failed
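A sketch of how a caller can drive the progress generator (hypothetical arguments; the gri_osm processor below does exactly this via update_progress):

for total, done, ok, skipped, failed in crop_osm_to_geopkg(
    boundary, pg_uri, "features", "/tmp/cropped.gpkg"
):
    print(f"{done}/{total} rows read: {ok} written, {skipped} skipped, {failed} failed")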

def gdal_crop_pg_table_to_geopkg(
boundary: Boundary,
pg_uri: str,
@@ -543,14 +687,10 @@ def gdal_crop_pg_table_to_geopkg(
    """
    Uses GDAL interface to crop table to geopkg
    https://gis.stackexchange.com/questions/397023/issue-to-convert-from-postgresql-input-to-gpkg-using-python-gdal-api-function-gd
-    GEOPKG Supports only a single Geometry column per table: https://github.com/opengeospatial/geopackage/issues/77
-    __NOTE__: We assume the input and output CRS is 4326
-    __NOTE__: PG doesnt permit EXCEPT syntax with field selection,
-        so all fields will be output (minus the original geometries if using "clip")
    ::kwarg extract_type str
        Either "intersect" - keep the entire intersecting feature in the output
        or "clip" - (Default) - includes only the clipped geometry in the output
@@ -587,7 +727,6 @@ def gdal_crop_pg_table_to_geopkg(
)
gdal.VectorTranslate(output_fpath, ds, options=vector_options)
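Note: for the OSM roads/rail processor this GDAL-based helper is superseded by crop_osm_to_geopkg above (see the import swap in roads_and_rail_version_1.py below), which adds batched writes, per-row error recovery and incremental progress reporting.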


def gp_crop_file_to_geopkg(
input_fpath: str,
boundary: Boundary,
Expand Down
8 changes: 6 additions & 2 deletions dataproc/processors/core/__init__.py
@@ -1,8 +1,12 @@

 import pkgutil
+import warnings

 __all__ = []
 for loader, module_name, is_pkg in pkgutil.walk_packages(__path__):
     __all__.append(module_name)
-    _module = loader.find_module(module_name).load_module(module_name)
-    globals()[module_name] = _module
+    try:
+        _module = loader.find_module(module_name).load_module(module_name)
+        globals()[module_name] = _module
+    except Exception as err:
+        warnings.warn(f"failed to load module {module_name} due to {err}")
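Side note: loader.find_module(module_name).load_module(module_name) is deprecated in recent Python versions. A hypothetical equivalent of this loop using importlib (an assumption, not part of this commit) would be:

import importlib
import pkgutil
import warnings

__all__ = []
for _, module_name, _ in pkgutil.walk_packages(__path__):
    __all__.append(module_name)
    try:
        # Import the submodule by its fully-qualified name and expose it on the package
        globals()[module_name] = importlib.import_module(f"{__name__}.{module_name}")
    except Exception as err:
        warnings.warn(f"failed to load module {module_name} due to {err}")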
18 changes: 12 additions & 6 deletions dataproc/processors/core/gri_osm/roads_and_rail_version_1.py
@@ -9,7 +9,7 @@
 from dataproc.processors.internal.base import BaseProcessorABC, BaseMetadataABC
 from dataproc.helpers import (
     version_name_from_file,
-    gdal_crop_pg_table_to_geopkg,
+    crop_osm_to_geopkg,
     data_file_hash,
     data_file_size,
     processor_name_from_file,
@@ -60,8 +60,7 @@ class Processor(BaseProcessorABC):
     input_pg_table = "features"
     input_geometry_column = "geom"
     output_geometry_operation = "clip"  # Clip or intersect
-    output_geometry_column = "clipped_geometry"
-    output_layer_name = "gri-osm"
+    osm_crop_batch_size = 1000

     def exists(self):
         """Whether all output files for a given processor & boundary exist on the FS or not"""
@@ -87,7 +86,7 @@ def generate(self):
         # Crop to given boundary
         self.update_progress(10, "cropping source")
         self.log.debug("%s - cropping to geopkg", self.metadata.name)
-        gdal_crop_pg_table_to_geopkg(
+        gen = crop_osm_to_geopkg(
             self.boundary,
             str(get_db_uri_ogr(
                 dbname=os.getenv(self.pg_osm_dbname_env),
@@ -100,11 +99,17 @@ def generate(self):
             output_fpath,
             geometry_column=self.input_geometry_column,
             extract_type=self.output_geometry_operation,
-            clipped_geometry_column_name=self.output_geometry_column
+            batch_size=self.osm_crop_batch_size
         )
+        while True:
+            try:
+                progress = next(gen)
+                self.update_progress(10 + int((progress[1]/progress[0])*80), "cropping source")
+            except StopIteration:
+                break
         self.provenance_log[f"{self.metadata.name} - crop completed"] = True
         # Move cropped data to backend
-        self.update_progress(50, "moving result")
+        self.update_progress(90, "moving result")
         self.log.debug("%s - moving cropped data to backend", {self.metadata.name})
         result_uri = self.storage_backend.put_processor_data(
             output_fpath,
@@ -127,6 +132,7 @@ def generate(self):
         self.provenance_log["datapackage"] = datapkg
         self.log.debug("%s generated datapackage in log: %s", self.metadata.name, datapkg)
         # Cleanup as required
+        os.remove(output_fpath)
         return self.provenance_log

     def generate_documentation(self):
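One caveat in the progress loop above, not addressed by this commit: if the crop yields zero rows, progress[0] is 0 and the percentage calculation raises ZeroDivisionError. A hypothetical defensive variant of the update line:

total, done = progress[0], progress[1]
pct = 10 + int((done / total) * 80) if total else 90  # fall back to 90% on an empty crop
self.update_progress(pct, "cropping source")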
11 changes: 7 additions & 4 deletions docker-compose.yaml
@@ -42,7 +42,7 @@ services:
CELERY_RESULT_BACKEND: redis://redis

dataproc:
-    image: ghcr.io/nismod/irv-autopkg:0.2.3-dev
+    image: ghcr.io/nismod/irv-autopkg:0.2.4-dev
user: autopkg
build: .
volumes:
@@ -53,9 +53,12 @@
env_file:
- envs/.api_and_dataproc.env
command: celery --app dataproc.tasks worker
+    cpus: 2
+    mem_reservation: "500M"
+    mem_limit: "1G"

api:
-    image: ghcr.io/nismod/irv-autopkg:0.2.3-dev
+    image: ghcr.io/nismod/irv-autopkg:0.2.4-dev
build: .
volumes:
- ./data/packages:/data/packages
@@ -73,15 +76,15 @@
# WARNING - THESE TESTS WILL WIPE THE CONFIGURED TEST HARNESS DB from anything in MODELS
# To run tests change AUTOPKG_DEPLOYMENT_ENV=test in .api_and_dataproc.env, then reboot api and dataproc services
test-api:
-    image: ghcr.io/nismod/irv-autopkg:0.2.3-dev
+    image: ghcr.io/nismod/irv-autopkg:0.2.4-dev
volumes:
- ./tests/data:/usr/src/app/tests/data
env_file:
- envs/.api_and_dataproc.env
command: python3 -m unittest discover /usr/src/app/tests/api

test-dataproc:
-    image: ghcr.io/nismod/irv-autopkg:0.2.3-dev
+    image: ghcr.io/nismod/irv-autopkg:0.2.4-dev
volumes:
- ./tests/data:/usr/src/app/tests/data
env_file: