Add NAWQA data pull demo (#151)
Add a demo that uses Lithops to query NAWQA data.


---------

Co-authored-by: thodson <thodson@usgs.gov>
kjdoore and thodson-usgs authored Aug 8, 2024
1 parent d3865a2 commit c9fbab8
Showing 5 changed files with 237 additions and 0 deletions.
59 changes: 59 additions & 0 deletions demos/nawqa_data_pull/Dockerfile_dataretrieval
@@ -0,0 +1,59 @@
# Python 3.11
FROM python:3.11-slim-buster


RUN apt-get update \
    # Install aws-lambda-cpp build dependencies
    && apt-get install -y \
      g++ \
      make \
      cmake \
      unzip \
    # cleanup package lists, they are not used anymore in this image
    && rm -rf /var/lib/apt/lists/* \
    && apt-cache search linux-headers-generic

ARG FUNCTION_DIR="/function"

# Copy function code
RUN mkdir -p ${FUNCTION_DIR}

# Update pip
# NB botocore/boto3 are pinned due to https://github.com/boto/boto3/issues/3648
# using versions from https://github.com/aio-libs/aiobotocore/blob/72b8dd5d7d4ef2f1a49a0ae0c37b47e5280e2070/setup.py
# due to s3fs dependency
RUN pip install --upgrade --ignore-installed pip wheel six setuptools \
    && pip install --upgrade --no-cache-dir --ignore-installed \
        awslambdaric \
        botocore==1.29.76 \
        boto3==1.26.76 \
        redis \
        httplib2 \
        requests \
        numpy \
        scipy \
        pandas \
        pika \
        kafka-python \
        cloudpickle \
        ps-mem \
        tblib

# Set working directory to function root directory
WORKDIR ${FUNCTION_DIR}

# Add Lithops
COPY lithops_lambda.zip ${FUNCTION_DIR}
RUN unzip lithops_lambda.zip \
    && rm lithops_lambda.zip \
    && mkdir handler \
    && touch handler/__init__.py \
    && mv entry_point.py handler/

# Put your dependencies here, using RUN pip install... or RUN apt install...

COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

ENTRYPOINT [ "/usr/local/bin/python", "-m", "awslambdaric" ]
CMD [ "handler.entry_point.lambda_handler" ]
44 changes: 44 additions & 0 deletions demos/nawqa_data_pull/README.md
@@ -0,0 +1,44 @@
# Retrieve data from the National Water Quality Assessment Program (NAWQA)

This example walks through using Lithops to retrieve data from every NAWQA
monitoring site, then writes the results to Parquet files on S3. Each
retrieval also searches the NLDI for neighboring sites with NAWQA data and
merges those data, assuming the monitoring site was relocated.

1. Set up a Python environment
```bash
conda create --name dataretrieval-lithops -y python=3.11
conda activate dataretrieval-lithops
pip install -r requirements.txt
```

1. Configure compute and storage backends for [lithops](https://lithops-cloud.github.io/docs/source/configuration.html).
The configuration in `lithops.yaml` uses AWS Lambda for [compute](https://lithops-cloud.github.io/docs/source/compute_config/aws_lambda.html) and AWS S3 for [storage](https://lithops-cloud.github.io/docs/source/storage_config/aws_s3.html).
To use those backends, simply edit `lithops.yaml` with your `bucket` and `execution_role`.
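   For reference, the fields to edit look like the following (the ARN and bucket name here are placeholders, not working values; the full file is included in this demo):
   ```yaml
   aws_lambda:
       execution_role: arn:aws:iam::<account-id>:role/<lithops-execution-role>
       runtime: dataretrieval-runtime

   aws_s3:
       bucket: arn:aws:s3:::<your-bucket>
   ```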

1. Build a runtime image for Lithops
```bash
export LITHOPS_CONFIG_FILE=$(pwd)/lithops.yaml
lithops runtime build -b aws_lambda -f Dockerfile_dataretrieval dataretrieval-runtime
```

1. Download site list
```bash
wget https://www.sciencebase.gov/catalog/file/get/655d2063d34ee4b6e05cc9e6?f=__disk__b3%2F3e%2F5b%2Fb33e5b0038f004c2a48818d0fcc88a0921f3f689 -O NWQN_sites.csv
```

1. Create an S3 bucket for the output, then set it as an environment variable
```bash
export DESTINATION_BUCKET=<path/to/bucket>
```

1. Run the script
```bash
python retrieve_nawqa_with_lithops.py
```
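
   Once the run completes, the partitioned Parquet output can be spot-checked directly from S3. The snippet below is a minimal sketch; it assumes the same environment (with `pyarrow` and `s3fs` installed) and configured AWS credentials:
   ```python
   import os

   import pandas as pd

   # Read the dataset written by the Lambda workers, partitioned by
   # MonitoringLocationIdentifier (all columns were cast to str before writing).
   bucket = os.environ["DESTINATION_BUCKET"]
   samples = pd.read_parquet(f"s3://{bucket}/nwqn-samples.parquet", engine="pyarrow")

   print(samples["MonitoringLocationIdentifier"].nunique(), "sites retrieved")
   print(samples.head())
   ```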

## Cleaning up
To rebuild the Lithops image, delete the existing one by running
```bash
lithops runtime delete -b aws_lambda -d dataretrieval-runtime
```
14 changes: 14 additions & 0 deletions demos/nawqa_data_pull/lithops.yaml
@@ -0,0 +1,14 @@
lithops:
    backend: aws_lambda
    storage: aws_s3

aws:
    region: us-west-2

aws_lambda:
    execution_role: arn:aws:iam::807615458658:role/lambdaLithopsExecutionRole
    runtime: dataretrieval-runtime
    runtime_memory: 2000

aws_s3:
    bucket: arn:aws:s3:::cubed-thodson-temp
8 changes: 8 additions & 0 deletions demos/nawqa_data_pull/requirements.txt
@@ -0,0 +1,8 @@
boto3
dataretrieval[nldi]
lithops
pika
ps_mem
pyarrow
s3fs
tblib
112 changes: 112 additions & 0 deletions demos/nawqa_data_pull/retrieve_nawqa_with_lithops.py
@@ -0,0 +1,112 @@
# Retrieve data from the National Water Quality Assessment Program (NAWQA)

import lithops
import math
import os
import pandas as pd

from dataretrieval import nldi, nwis, wqp

DESTINATION_BUCKET = os.environ.get('DESTINATION_BUCKET')
PROJECT = "National Water Quality Assessment Program (NAWQA)"


def map_retrieval(site):
    """Map function to pull data from NWIS and WQP"""
    site_list = find_neighboring_sites(site)
    # reformat for wqp
    site_list = [f"USGS-{site}" for site in site_list]

    df, _ = wqp.get_results(siteid=site_list,
                            project=PROJECT,
                            )

    # merge sites
    df['MonitoringLocationIdentifier'] = f"USGS-{site}"

    if len(df) != 0:
        df.astype(str).to_parquet(f's3://{DESTINATION_BUCKET}/nwqn-samples.parquet',
                                  engine='pyarrow',
                                  partition_cols=['MonitoringLocationIdentifier'],
                                  compression='zstd')
        # optionally, `return df` for further processing


def find_neighboring_sites(site, search_factor=0.05):
    """Find sites upstream and downstream of the given site within a certain distance.

    Parameters
    ----------
    site : str
        8-digit site number.
    search_factor : float, optional
        Fraction of the estimated watershed length to search upstream and
        downstream; also the minimum drainage-area ratio used to keep a
        neighboring site. Default is 0.05.
    """
    site_df, _ = nwis.get_info(sites=site)
    drain_area_sq_mi = site_df["drain_area_va"].values[0]
    length = _estimate_watershed_length_km(drain_area_sq_mi)
    search_distance = length * search_factor
    # clip between 1 and 9999 km
    search_distance = max(1.0, min(9999.0, search_distance))

    upstream_gdf = nldi.get_features(
        feature_source="WQP",
        feature_id=f"USGS-{site}",
        navigation_mode="UM",
        distance=search_distance,
        data_source="nwissite",
    )

    downstream_gdf = nldi.get_features(
        feature_source="WQP",
        feature_id=f"USGS-{site}",
        navigation_mode="DM",
        distance=search_distance,
        data_source="nwissite",
    )

    features = pd.concat([upstream_gdf, downstream_gdf], ignore_index=True)

    df, _ = nwis.get_info(sites=list(features.identifier.str.strip('USGS-')))
    # drop sites with dissimilar drainage areas
    df = df.where(
        (df["drain_area_va"] / drain_area_sq_mi) > search_factor,
    ).dropna(how="all")

    return df["site_no"].to_list()


def _estimate_watershed_length_km(drain_area_sq_mi):
    """Estimate the diameter assuming a circular watershed.

    Parameters
    ----------
    drain_area_sq_mi : float
        The drainage area in square miles.

    Returns
    -------
    float
        The diameter of the watershed in kilometers.
    """
    # assume a circular watershed
    length_miles = 2 * (drain_area_sq_mi / math.pi) ** 0.5
    # convert to km
    return length_miles * 1.60934


if __name__ == "__main__":
    project = "National Water Quality Assessment Program (NAWQA)"

    site_df = pd.read_csv(
        'NWQN_sites.csv',
        comment='#',
        dtype={'SITE_QW_ID': str, 'SITE_FLOW_ID': str},
    )

    site_list = site_df['SITE_QW_ID'].to_list()
    # site_list = site_list[:4]  # prune for testing

    fexec = lithops.FunctionExecutor(config_file="lithops.yaml")
    futures = fexec.map(map_retrieval, site_list)

    futures.get_result()
