Skip to content

Commit

Permalink
feat: add stac-arrow
Browse files Browse the repository at this point in the history
  • Loading branch information
gadomski committed May 31, 2024
1 parent c1c2668 commit ded75c7
Show file tree
Hide file tree
Showing 7 changed files with 291 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ jobs:
- "-p stac -F reqwest"
- "-p stac-api"
- "-p stac -p stac-api -F geo"
- "-p stac-arrow"
- "-p stac-async"
- "-p stac-cli --no-default-features"
- "-p stac-server --no-default-features"
Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"stac",
"pgstac",
"stac-api",
"stac-arrow",
"stac-async",
"stac-cli",
"stac-server",
Expand All @@ -12,6 +13,7 @@ members = [
default-members = [
"stac",
"stac-api",
"stac-arrow",
"stac-async",
"stac-cli",
"stac-server",
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ This monorepo contains several crates:
| [stac](./stac/README.md) | Core data structures and synchronous I/O | [![docs.rs](https://img.shields.io/docsrs/stac?style=flat-square)](https://docs.rs/stac/latest/stac/) <br> [![Crates.io](https://img.shields.io/crates/v/stac?style=flat-square)](https://crates.io/crates/stac) |
| [pgstac](./pgstac/README.md) | Bindings for [pgstac](https://github.com/stac-utils/pgstac) | [![docs.rs](https://img.shields.io/docsrs/pgstac?style=flat-square)](https://docs.rs/pgstac/latest/pgstac/) <br> [![Crates.io](https://img.shields.io/crates/v/pgstac?style=flat-square)](https://crates.io/crates/pgstac) |
| [stac-api](./stac-api/README.md) | Data structures for the [STAC API](https://github.com/radiantearth/stac-api-spec) specification | [![docs.rs](https://img.shields.io/docsrs/stac-api?style=flat-square)](https://docs.rs/stac-api/latest/stac_api/) <br> [![Crates.io](https://img.shields.io/crates/v/stac-api?style=flat-square)](https://crates.io/crates/stac-api) |
| [stac-arrow](./stac-arrow/README.md) | Read STAC data stored in [arrow](https://arrow.apache.org/) | [![docs.rs](https://img.shields.io/docsrs/stac-arrow?style=flat-square)](https://docs.rs/stac-arrow/latest/stac_arrow/) <br> [![Crates.io](https://img.shields.io/crates/v/stac-arrow?style=flat-square)](https://crates.io/crates/stac-arrow) |
| [stac-async](./stac-async/README.md) | Asynchronous I/O with [tokio](https://tokio.rs/) | [![docs.rs](https://img.shields.io/docsrs/stac-async?style=flat-square)](https://docs.rs/stac-async/latest/stac_async/) <br> [![Crates.io](https://img.shields.io/crates/v/stac-async?style=flat-square)](https://crates.io/crates/stac-async) |
| [stac-cli](./stac-cli/README.md)| Command line interface | [![docs.rs](https://img.shields.io/docsrs/stac-cli?style=flat-square)](https://docs.rs/stac-cli/latest/stac_cli/) <br> [![Crates.io](https://img.shields.io/crates/v/stac-cli?style=flat-square)](https://crates.io/crates/stac-cli) |
| [stac-server](./stac-server/README.md)| STAC API server with multiple backends | [![docs.rs](https://img.shields.io/docsrs/stac-server?style=flat-square)](https://docs.rs/stac-server/latest/stac_server/) <br> [![Crates.io](https://img.shields.io/crates/v/stac-server?style=flat-square)](https://crates.io/crates/stac-server) |
Expand Down
23 changes: 23 additions & 0 deletions stac-arrow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "stac-arrow"
version = "0.1.0"
authors = ["Pete Gadomski <pete.gadomski@gmail.com>"]
edition = "2021"
description = "Read STAC data stored in Apache Arrow"
homepage = "https://github.com/stac-utils/stac-rs"
repository = "https://github.com/stac-utils/stac-rs"
keywords = ["geospatial", "stac", "metadata", "geo", "arrow"]
categories = ["science", "data-structures"]

[dependencies]
arrow = { version = "51", features = ["chrono-tz"] }
arrow-json = "51"
log = "0.4"
serde_json = "1"
stac = { version = "0.7", path = "../stac" }
thiserror = "1"
wkb = "0.7"

[dev-dependencies]
parquet = "51"
stac-validate = { version = "0.1", path = "../stac-validate" }
42 changes: 42 additions & 0 deletions stac-arrow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# stac-arrow

[![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/stac-utils/stac-rs/ci.yml?branch=main&style=for-the-badge)](https://github.com/stac-utils/stac-rs/actions/workflows/ci.yml)
[![docs.rs](https://img.shields.io/docsrs/stac-arrow?style=for-the-badge)](https://docs.rs/stac-arrow/latest/stac_arrow/)
[![Crates.io](https://img.shields.io/crates/v/stac-arrow?style=for-the-badge)](https://crates.io/crates/stac-arrow)
![Crates.io](https://img.shields.io/crates/l/stac-arrow?style=for-the-badge)
[![Contributor Covenant](https://img.shields.io/badge/Contributor%20Covenant-2.1-4baaaa.svg?style=for-the-badge)](./CODE_OF_CONDUCT)

Read [STAC](https://stacspec.org/) data stored in [Apache Arrow](https://arrow.apache.org/).

## Usage

To use the library in your project:

```toml
[dependencies]
stac-arrow = "0.1"
```

## Examples

```rust
use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

let file = File::open("data/naip.parquet").unwrap();
let reader = ParquetRecordBatchReaderBuilder::try_new(file)
.unwrap()
.build()
.unwrap();
let mut items = Vec::new();
for result in reader {
items.extend(stac_arrow::record_batch_to_items(result.unwrap()).unwrap());
}
assert_eq!(items.len(), 5);
```

Please see the [documentation](https://docs.rs/stac-arrow) for more usage examples.

## Other info

This crate is part of the [stac-rs](https://github.com/stac-utils/stac-rs) monorepo, see its README for contributing and license information.
Binary file added stac-arrow/data/naip.parquet
Binary file not shown.
222 changes: 222 additions & 0 deletions stac-arrow/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
//! Read STAC from [arrow](https://arrow.apache.org/).
//!
//! The arrow data are expected to be formatted per the [stac-geoparquet
//! spec](https://github.com/stac-utils/stac-geoparquet/blob/main/spec/stac-geoparquet-spec.md).
#![deny(
elided_lifetimes_in_paths,
explicit_outlives_requirements,
keyword_idents,
macro_use_extern_crate,
meta_variable_misuse,
missing_abi,
missing_debug_implementations,
missing_docs,
non_ascii_idents,
noop_method_call,
pointer_structural_match,
rust_2021_incompatible_closure_captures,
rust_2021_incompatible_or_patterns,
rust_2021_prefixes_incompatible_syntax,
rust_2021_prelude_collisions,
single_use_lifetimes,
trivial_casts,
trivial_numeric_casts,
unreachable_pub,
unsafe_code,
unsafe_op_in_unsafe_fn,
unused_crate_dependencies,
unused_extern_crates,
unused_import_braces,
unused_lifetimes,
unused_qualifications,
unused_results,
warnings
)]

use arrow::array::{AsArray, RecordBatch};
use arrow_json::ArrayWriter;
use serde_json::{Map, Value};
use stac::Item;
use std::io::Cursor;
use thiserror::Error;

/// Crate-specific error enum.
///
/// Every fallible function in this crate returns this type through the
/// crate-level [Result] alias.
#[derive(Debug, Error)]
pub enum Error {
/// An error reported by arrow, passed through unchanged
/// ([arrow::error::ArrowError]).
///
/// Produced, for example, when the record batch schema has no `geometry`
/// column or when writing the batch out as JSON fails.
#[error(transparent)]
Arrow(#[from] arrow::error::ArrowError),

/// A required field is missing.
///
/// The payload is the name of the missing field.
#[error("missing required field: {0}")]
MissingField(&'static str),

/// The geometry column is not binary.
///
/// Returned when the `geometry` column cannot be viewed as a binary array
/// with `i32` offsets.
#[error("non-binary geometry column")]
NonBinaryGeometryColumn,

/// A JSON (de)serialization error, passed through unchanged
/// ([serde_json::Error]).
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),

/// A geometry could not be decoded from WKB ([wkb::WKBReadError]).
///
/// The wrapped error is rendered with its `Debug` representation, and the
/// conversion is provided by a hand-written `From` impl rather than
/// `#[from]`.
#[error("wkb read error: {0:?}")]
WkbRead(wkb::WKBReadError),
}

/// Crate-specific result type.
///
/// Shorthand for [std::result::Result] with the error type fixed to the
/// crate's [Error].
pub type Result<T> = std::result::Result<T, Error>;

/// Converts a [RecordBatch] into a vector of [Items](Item).
///
/// # Examples
///
/// ```
/// use std::fs::File;
/// use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
///
/// let file = File::open("data/naip.parquet").unwrap();
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)
/// .unwrap()
/// .build()
/// .unwrap();
/// let mut items = Vec::new();
/// for result in reader {
/// items.extend(stac_arrow::record_batch_to_items(result.unwrap()).unwrap());
/// }
/// assert_eq!(items.len(), 5);
/// ```
pub fn record_batch_to_items(mut record_batch: RecordBatch) -> Result<Vec<Item>> {
let index = record_batch.schema().index_of("geometry")?;
let geometry = record_batch.remove_column(index);
// TODO allow for i64 offsets
let geometry = geometry
.as_binary_opt::<i32>()
.ok_or_else(|| Error::NonBinaryGeometryColumn)?;
let mut writer = ArrayWriter::new(Vec::new());
writer.write(&record_batch.into())?;
writer.finish()?;
let items: Vec<Map<String, Value>> = serde_json::from_reader(writer.into_inner().as_slice())?;
items
.into_iter()
.enumerate()
.map(|(i, item)| {
let mut item = map_to_item(item)?;
// TODO handle null geometries
item.geometry = Some((&wkb::wkb_to_geom(&mut Cursor::new(geometry.value(i)))?).into());
Ok(item)
})
.collect()
}

fn map_to_item(mut map: Map<String, Value>) -> Result<Item> {
let _ = map.remove("type");
let mut item = Item::new(map.remove("id").ok_or_else(|| Error::MissingField("id"))?);
if let Some(stac_extensions) = map.remove("stac_extensions") {
if let Value::Array(array) = stac_extensions {
for value in array {
if let Value::String(stac_extension) = value {
item.extensions.push(stac_extension);
} else {
log::warn!(
"stac_extension value not a string, discarding: {}",
value.to_string()
);
}
}
} else {
log::warn!(
"stac_extensions value not an array, discarding: {}",
stac_extensions.to_string()
);
}
}
if let Some(bbox) = map.remove("bbox") {
if let Value::Array(bbox) = bbox {
let original_length = bbox.len();
let bbox: Vec<_> = bbox
.into_iter()
.map(|value| value.as_f64())
.flatten()
.collect();
if bbox.len() != original_length {
log::warn!("some bbox values were not floats, discarding")
} else if bbox.len() != 4 && bbox.len() != 6 {
log::warn!("bbox is invalid length, discarding")
} else {
item.bbox = Some(bbox);
}
}
}
item.links = serde_json::from_value(
map.remove("links")
.ok_or_else(|| Error::MissingField("links"))?,
)?;
item.assets = serde_json::from_value(
map.remove("assets")
.ok_or_else(|| Error::MissingField("assets"))?,
)?;
if let Some(collection) = map.remove("collection") {
if let Value::String(collection) = collection {
item.collection = Some(collection)
} else {
log::warn!(
"collection is not a string, discarding: {}",
collection.to_string()
);
}
};
item.properties = serde_json::from_value(Value::Object(map))?;
Ok(item)
}

/// Wraps a WKB decoding failure in [Error::WkbRead], so that `?` can be used
/// directly on results coming from the `wkb` crate.
impl From<wkb::WKBReadError> for Error {
    fn from(err: wkb::WKBReadError) -> Self {
        Self::WkbRead(err)
    }
}

#[cfg(test)]
mod tests {
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use stac_validate::Validate;
    use std::fs::File;

    /// Converts the first record batch of the bundled NAIP fixture and checks
    /// that every resulting item is fully populated and validates.
    #[test]
    fn record_batch_to_items() {
        let fixture = File::open("data/naip.parquet").unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(fixture).unwrap();
        let mut batches = builder.build().unwrap();

        // Only the first batch is inspected.
        let first_batch = batches.next().unwrap().unwrap();
        let items = super::record_batch_to_items(first_batch).unwrap();

        assert_eq!(items.len(), 5);
        items.into_iter().for_each(|item| {
            assert_eq!(item.extensions.len(), 2);
            assert!(item.geometry.is_some());
            assert!(item.bbox.is_some());
            assert!(!item.links.is_empty());
            assert!(!item.assets.is_empty());
            assert!(item.collection.is_some());
            item.validate().unwrap();
        });
    }
}

// Compiles the code examples in the README as doctests, so they cannot
// silently go stale.
//
// From https://github.com/rust-lang/cargo/issues/383#issuecomment-720873790,
// may they be forever blessed.
#[cfg(doctest)]
mod readme {
    // `include_str!` is accepted directly in attribute position since Rust
    // 1.54, so the helper macro the original trick relied on is no longer
    // needed. The empty extern block only exists to carry the documentation.
    #[doc = include_str!("../README.md")]
    extern "C" {}
}

0 comments on commit ded75c7

Please sign in to comment.